diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fc28876fe5513b5153ad7706cf0e5320a364636..1b04c08a883546bb55ed6daf6289f6bb9da1ea14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,12 @@ ## 21.03.1 +IMPROVEMENTS +* Consumer API - retry file delivery/reading with timeout (can be useful for the case file arrives after metadta ingested asapo, e.g. for slow NFS transfer,...) + BUG FIXES -* fix LDAP authorization for raw data type Producers +* Core services: fix LDAP authorization for raw data type Producers +* Consumer API: fix race condition in GetStreamList/get_stream_list +* Producer API: fix segfault in send_stream_finished_flag ## 21.03.0 diff --git a/broker/src/asapo_broker/database/mongodb_streams.go b/broker/src/asapo_broker/database/mongodb_streams.go index ce1ce2cdac11aa19bba2c9fec166d4997a63b6f2..4d32341c6044570d9c3028f5770e7f08e8c72c63 100644 --- a/broker/src/asapo_broker/database/mongodb_streams.go +++ b/broker/src/asapo_broker/database/mongodb_streams.go @@ -28,14 +28,14 @@ type StreamsRecord struct { type Streams struct { records map[string]StreamsRecord - lastUpdated int64 + lastUpdated map[string]int64 } -var streams = Streams{lastUpdated: 0, records: make(map[string]StreamsRecord, 0)} +var streams = Streams{lastUpdated: make(map[string]int64, 0), records: make(map[string]StreamsRecord, 0)} var streamsLock sync.Mutex func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) { - if ss.lastUpdated < time.Now().UnixNano()-int64(updatePeriodMs*1000000) { + if ss.lastUpdated[db_name] < time.Now().UnixNano()-int64(updatePeriodMs*1000000) { return StreamsRecord{}, errors.New("cache expired") } rec, ok := ss.records[db_name] @@ -173,7 +173,7 @@ func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, err res :=StreamsRecord{} utils.DeepCopy(rec,&res) ss.records[db_name] = res - ss.lastUpdated = time.Now().UnixNano() + ss.lastUpdated[db_name] = time.Now().UnixNano() } return rec, nil } diff --git a/broker/src/asapo_broker/database/streams_test.go b/broker/src/asapo_broker/database/streams_test.go index c972ba789f8d4b77e44f3c9d8b5a26416db770c9..fef6ff3afb5d5f5f74bcf85975c5e22748d2388e 100644 --- a/broker/src/asapo_broker/database/streams_test.go +++ b/broker/src/asapo_broker/database/streams_test.go @@ -54,6 +54,27 @@ func (suite *StreamsTestSuite) TestStreamsUsesCache() { suite.Equal(int64(1), rec.Streams[0].TimestampLast) } +func (suite *StreamsTestSuite) TestStreamsCacheexpires() { + db.settings.UpdateStreamCachePeriodMs = 100 + var res1 StreamsRecord + go func() { + db.insertRecord(dbname, collection, &rec1) + streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + db.insertRecord(dbname, collection, &rec_finished) + res1,_ = streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + }() + db.insertRecord(dbname, collection+"1", &rec1_later) + res2,_ := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + db.insertRecord(dbname, collection+"1", &rec_finished) + time.Sleep(time.Second) + res3, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + suite.Nil(err) + suite.Equal(true, res3.Streams[0].Finished) + fmt.Println(res1,res2) +// suite.Equal(true, rec.Streams[1].Finished) +} + + func (suite *StreamsTestSuite) TestStreamsGetFinishedInfo() { db.settings.UpdateStreamCachePeriodMs = 1000 db.insertRecord(dbname, collection, &rec1) diff --git a/common/cpp/include/asapo/request/request.h b/common/cpp/include/asapo/request/request.h index 594c03c85b002d12a50b76a7bc1921bed341f8e8..83774bc6eb095b07199ca2125ababbcbe98e0700 100644 --- a/common/cpp/include/asapo/request/request.h +++ b/common/cpp/include/asapo/request/request.h @@ -15,6 +15,9 @@ class GenericRequest { GenericRequest(GenericRequestHeader h, uint64_t timeout_ms): header{std::move(h)}, timeout_ms_{timeout_ms} {}; GenericRequestHeader header; virtual ~GenericRequest() = default; + virtual bool ContainsData() { + return true; + }; uint64_t GetRetryCounter() { return retry_counter_; } diff --git a/common/cpp/include/asapo/unittests/MockIO.h b/common/cpp/include/asapo/unittests/MockIO.h index d6c10de678c3cfa3ad8f4d9c0c1220c96b6e7514..9c80d4ab6bd7205ed45c06f6f12b19e35fc5c438 100644 --- a/common/cpp/include/asapo/unittests/MockIO.h +++ b/common/cpp/include/asapo/unittests/MockIO.h @@ -208,13 +208,17 @@ class MockIO : public IO { MOCK_CONST_METHOD2(CreateNewDirectory_t, void(const std::string& directory_name, ErrorInterface** err)); MessageData GetDataFromFile(const std::string& fname, uint64_t* fsize, Error* err) const override { - ErrorInterface* error = nullptr; + std::function<ErrorInterface*()> error; auto data = GetDataFromFile_t(fname, fsize, &error); - err->reset(error); + if (error!=nullptr) { + err->reset(error()); + } else { + err->reset(nullptr); + } return MessageData(data); } - MOCK_CONST_METHOD3(GetDataFromFile_t, uint8_t* (const std::string& fname, uint64_t* fsize, ErrorInterface** err)); + MOCK_CONST_METHOD3(GetDataFromFile_t, uint8_t* (const std::string& fname, uint64_t* fsize, std::function<ErrorInterface*()>* err_gen)); Error GetLastError() const override { diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index 7cab88b4e40ae6f5e9b3547c65bb784213e6abf8..8a35d66cff0345d372b42ea078933351860e6256 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -417,7 +417,7 @@ Error StreamInfoFromDbResponse(const std::string &last_record_str, return err; } - return UpdateStreamInfoFromEarliestRecord(last_record_str,info); + return UpdateStreamInfoFromEarliestRecord(earliest_record_str,info); } diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp index 5df251934d7b8ca16ced26beda4d7312cfb865f2..dc2573b67c47bac923fe8ff8843eeed50a38fe44 100644 --- a/common/cpp/src/request/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -28,10 +28,11 @@ Error RequestPool::CanAddRequests(const GenericRequests &requests) { uint64_t total_size = 0; for (auto &request : requests) { - total_size += request->header.data_size; + if (request->ContainsData()) { + total_size += request->header.data_size; + } } - if (memory_used_ + total_size > limits_.max_memory_mb * 1000000) { return IOErrorTemplates::kNoSpaceLeft.Generate( "reached maximum memory capacity of " + std::to_string(limits_.max_memory_mb) + " MB"); @@ -49,6 +50,10 @@ Error RequestPool::CanAddRequest(const GenericRequestPtr &request, bool top_prio "reached maximum number of " + std::to_string(limits_.max_requests) + " requests"); } + if (!request->ContainsData()) { + return nullptr; + } + if (limits_.max_memory_mb > 0 && memory_used_ + request->header.data_size > limits_.max_memory_mb * 1000000) { return IOErrorTemplates::kNoSpaceLeft.Generate( "reached maximum memory capacity of " + std::to_string(limits_.max_memory_mb) + " MB"); @@ -66,7 +71,9 @@ Error RequestPool::AddRequest(GenericRequestPtr request, bool top_priority) { return err; } - memory_used_ += request->header.data_size; + if (request->ContainsData()) { + memory_used_ += request->header.data_size; + } if (top_priority) { request_queue_.emplace_front(std::move(request)); @@ -118,7 +125,9 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler> &request_ std::this_thread::sleep_for(std::chrono::milliseconds(1000)); thread_info->lock.lock(); } else { - memory_used_ -= request->header.data_size; + if (request->ContainsData()) { + memory_used_ -= request->header.data_size; + } } } @@ -157,7 +166,9 @@ Error RequestPool::AddRequests(GenericRequests requests) { uint64_t total_size = 0; for (auto &elem : requests) { - total_size += elem->header.data_size; + if (elem->ContainsData()) { + total_size += elem->header.data_size; + } request_queue_.emplace_front(std::move(elem)); } memory_used_ += total_size; diff --git a/common/cpp/unittests/request/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp index ccda29729425b60a4a914e7d03ca8e060e93deff..44bfbb8c410578c7ae4be49d03ad8238ad6d15b1 100644 --- a/common/cpp/unittests/request/test_request_pool.cpp +++ b/common/cpp/unittests/request/test_request_pool.cpp @@ -37,42 +37,45 @@ using asapo::ErrorInterface; using asapo::GenericRequest; using asapo::GenericRequestHeader; - - class MockRequestHandlerFactory : public asapo::RequestHandlerFactory { - public: - MockRequestHandlerFactory(RequestHandler* request_handler): - RequestHandlerFactory() { - request_handler_ = request_handler; - } - std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter) override { - return std::unique_ptr<RequestHandler> {request_handler_}; - } - private: - RequestHandler* request_handler_; + public: + MockRequestHandlerFactory(RequestHandler* request_handler) : + RequestHandlerFactory() { + request_handler_ = request_handler; + } + std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter) override { + return std::unique_ptr<RequestHandler>{request_handler_}; + } + private: + RequestHandler* request_handler_; }; class TestRequest : public GenericRequest { - public: - TestRequest(GenericRequestHeader header, uint64_t timeout): GenericRequest(header, timeout) {}; + public: + TestRequest(GenericRequestHeader header, uint64_t timeout, bool contains_data = true) : GenericRequest(header, + timeout) { + contains_data_ = contains_data; + }; + bool ContainsData() { return contains_data_; }; + private: + bool contains_data_; }; - class RequestPoolTests : public testing::Test { - public: - NiceMock<MockRequestHandler>* mock_request_handler = new testing::NiceMock<MockRequestHandler>; - NiceMock<asapo::MockLogger> mock_logger; - MockRequestHandlerFactory request_handler_factory{mock_request_handler}; - const uint8_t nthreads = 1; - asapo::RequestPool pool {nthreads, &request_handler_factory, &mock_logger}; - std::unique_ptr<GenericRequest> request{new TestRequest{GenericRequestHeader{asapo::kOpcodeUnknownOp,0,1000000}, 0}}; - void SetUp() override { - } - void TearDown() override { - } + public: + NiceMock<MockRequestHandler>* mock_request_handler = new testing::NiceMock<MockRequestHandler>; + NiceMock<asapo::MockLogger> mock_logger; + MockRequestHandlerFactory request_handler_factory{mock_request_handler}; + const uint8_t nthreads = 1; + asapo::RequestPool pool{nthreads, &request_handler_factory, &mock_logger}; + std::unique_ptr<GenericRequest> + request{new TestRequest{GenericRequestHeader{asapo::kOpcodeUnknownOp, 0, 1000000}, 0}}; + void SetUp() override { + } + void TearDown() override { + } }; - TEST(RequestPool, Constructor) { NiceMock<asapo::MockLogger> mock_logger; MockRequestHandlerFactory factory(nullptr); @@ -118,9 +121,9 @@ void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) { EXPECT_CALL(*mock_handler, ReadyProcessRequest()).Times(ntimes).WillRepeatedly(Return(true)); EXPECT_CALL(*mock_handler, PrepareProcessingRequestLocked()).Times(ntimes); EXPECT_CALL(*mock_handler, ProcessRequestUnlocked_t(_, _)).Times(ntimes).WillRepeatedly( - DoAll( testing::SetArgPointee<1>(false), - Return(true) - )); + DoAll(testing::SetArgPointee<1>(false), + Return(true) + )); EXPECT_CALL(*mock_handler, TearDownProcessingRequestLocked(true)).Times(ntimes); } @@ -128,14 +131,12 @@ void ExpectFailProcessRequest(MockRequestHandler* mock_handler) { EXPECT_CALL(*mock_handler, ReadyProcessRequest()).Times(AtLeast(1)).WillRepeatedly(Return(true)); EXPECT_CALL(*mock_handler, PrepareProcessingRequestLocked()).Times(AtLeast(1)); EXPECT_CALL(*mock_handler, ProcessRequestUnlocked_t(_, _)).Times(AtLeast(1)).WillRepeatedly( - DoAll( testing::SetArgPointee<1>(true), - Return(false) - )); + DoAll(testing::SetArgPointee<1>(true), + Return(false) + )); EXPECT_CALL(*mock_handler, TearDownProcessingRequestLocked(false)).Times(AtLeast(1)); } - - TEST_F(RequestPoolTests, AddRequestIncreasesRetryCounter) { ExpectFailProcessRequest(mock_request_handler); @@ -147,7 +148,6 @@ TEST_F(RequestPoolTests, AddRequestIncreasesRetryCounter) { ASSERT_THAT(mock_request_handler->retry_counter, Gt(0)); } - TEST_F(RequestPoolTests, AddRequestCallsSend) { ExpectSend(mock_request_handler); @@ -182,7 +182,6 @@ TEST_F(RequestPoolTests, NRequestsInPoolAccountsForRequestsInProgress) { ASSERT_THAT(nreq2, Eq(0)); } - TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { TestRequest* request2 = new TestRequest{GenericRequestHeader{}, 0}; @@ -201,7 +200,7 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { TEST_F(RequestPoolTests, RefuseAddRequestIfHitSizeLimitation) { TestRequest* request2 = new TestRequest{GenericRequestHeader{}, 0}; - pool.SetLimits(asapo::RequestPoolLimits({1,0})); + pool.SetLimits(asapo::RequestPoolLimits({1, 0})); pool.AddRequest(std::move(request)); request.reset(request2); auto err = pool.AddRequest(std::move(request)); @@ -219,7 +218,7 @@ TEST_F(RequestPoolTests, RefuseAddRequestIfHitMemoryLimitation) { header.data_size = 100; TestRequest* request2 = new TestRequest{header, 0}; - pool.SetLimits(asapo::RequestPoolLimits({0,1})); + pool.SetLimits(asapo::RequestPoolLimits({0, 1})); pool.AddRequest(std::move(request)); request.reset(request2); auto err = pool.AddRequest(std::move(request)); @@ -233,15 +232,31 @@ TEST_F(RequestPoolTests, RefuseAddRequestIfHitMemoryLimitation) { } +TEST_F(RequestPoolTests, OkAddRequestIfSendingFile) { + auto header = GenericRequestHeader{}; + header.data_size = 100; + TestRequest* request2 = new TestRequest{header, 0, false}; + + pool.SetLimits(asapo::RequestPoolLimits({0, 1})); + pool.AddRequest(std::move(request)); + request.reset(request2); + auto err = pool.AddRequest(std::move(request)); + + auto nreq = pool.NRequestsInPool(); + + ASSERT_THAT(nreq, Eq(2)); + ASSERT_THAT(err, Eq(nullptr)); +} + TEST_F(RequestPoolTests, RefuseAddRequestsIfHitSizeLimitation) { TestRequest* request2 = new TestRequest{GenericRequestHeader{}, 0}; std::vector<std::unique_ptr<GenericRequest>> requests; requests.push_back(std::move(request)); - requests.push_back(std::unique_ptr<GenericRequest> {request2}); + requests.push_back(std::unique_ptr<GenericRequest>{request2}); - pool.SetLimits(asapo::RequestPoolLimits({1,0})); + pool.SetLimits(asapo::RequestPoolLimits({1, 0})); auto err = pool.AddRequests(std::move(requests)); auto nreq = pool.NRequestsInPool(); @@ -249,7 +264,6 @@ TEST_F(RequestPoolTests, RefuseAddRequestsIfHitSizeLimitation) { ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kNoSpaceLeft)); } - TEST_F(RequestPoolTests, RefuseAddRequestsIfHitMemoryLimitation) { auto header = GenericRequestHeader{}; @@ -259,9 +273,9 @@ TEST_F(RequestPoolTests, RefuseAddRequestsIfHitMemoryLimitation) { std::vector<std::unique_ptr<GenericRequest>> requests; requests.push_back(std::move(request)); - requests.push_back(std::unique_ptr<GenericRequest> {request2}); + requests.push_back(std::unique_ptr<GenericRequest>{request2}); - pool.SetLimits(asapo::RequestPoolLimits({0,1})); + pool.SetLimits(asapo::RequestPoolLimits({0, 1})); auto err = pool.AddRequests(std::move(requests)); auto nreq = pool.NRequestsInPool(); @@ -269,7 +283,6 @@ TEST_F(RequestPoolTests, RefuseAddRequestsIfHitMemoryLimitation) { ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kNoSpaceLeft)); } - TEST_F(RequestPoolTests, AddRequestsOk) { TestRequest* request2 = new TestRequest{GenericRequestHeader{}, 0}; @@ -278,7 +291,7 @@ TEST_F(RequestPoolTests, AddRequestsOk) { std::vector<std::unique_ptr<GenericRequest>> requests; requests.push_back(std::move(request)); - requests.push_back(std::unique_ptr<GenericRequest> {request2}); + requests.push_back(std::unique_ptr<GenericRequest>{request2}); auto err = pool.AddRequests(std::move(requests)); @@ -328,5 +341,4 @@ TEST_F(RequestPoolTests, StopThreads) { Mock::VerifyAndClearExpectations(&mock_logger); } - } diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 7d3d565e5db6a80a6382e31b6c39adaf7d9f504c..32ecf6a3671b00e337f0bc78c0887712714c4404 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -371,10 +371,24 @@ Error ConsumerImpl::GetMessageFromServer(GetMessageServerOperation op, uint64_t } Error ConsumerImpl::GetDataFromFile(MessageMeta* info, MessageData* data) { - Error error; - *data = io__->GetDataFromFile(info->FullName(source_path_), &info->size, &error); - if (error) { - return ConsumerErrorTemplates::kLocalIOError.Generate(error->Explain()); + interrupt_flag_ = false; + uint64_t elapsed_ms = 0; + Error err; + while (elapsed_ms <= timeout_ms_) { + if (interrupt_flag_) { + err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request"); + break; + } + auto start = system_clock::now(); + *data = io__->GetDataFromFile(info->FullName(source_path_), &info->size, &err); + if (err == nullptr) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + elapsed_ms += std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count(); + } + if (err != nullptr) { + return ConsumerErrorTemplates::kLocalIOError.Generate(err->Explain()); } return nullptr; } diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 6ad5448e5d336702803094aa454d40e4292e9555..be641e76cd2c11787ba1f763aa4a75592300e762 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -58,6 +58,10 @@ TEST(FolderDataBroker, Constructor) { const uint8_t expected_value = 1; +asapo::ErrorInterface* new_error(){ + return new asapo::SimpleError{"s"}; +}; + class ConsumerImplTests : public Test { public: std::unique_ptr<ConsumerImpl> consumer, fts_consumer; @@ -167,9 +171,14 @@ class ConsumerImplTests : public Test { return; } - EXPECT_CALL(mock_io, GetDataFromFile_t(expected_full_path, testing::Pointee(100), _)).Times(times). - WillRepeatedly(DoAll(SetArgPointee<2>(new asapo::SimpleError{"s"}), testing::Return(nullptr))); + auto simple_error = []{ + return new asapo::SimpleError{"s"}; + }; + + EXPECT_CALL(mock_io, GetDataFromFile_t(expected_full_path, testing::Pointee(100), _)).Times(AtLeast(times)). + WillRepeatedly(DoAll(SetArgPointee<2>(simple_error), testing::Return(nullptr))); } + MessageMeta CreateFI(uint64_t buf_id = expected_buf_id) { MessageMeta fi; fi.size = expected_message_size; @@ -564,6 +573,23 @@ TEST_F(ConsumerImplTests, GetMessageCallsReadFromFileIfZeroBufId) { consumer->GetNext(expected_group_id, &info, &data, expected_stream); } +TEST_F(ConsumerImplTests, GetMessageCallsRetriesReadFromFile) { + MockGetBrokerUri(); + auto to_send = CreateFI(0); + auto json = to_send.Json(); + MockGet(json); + consumer->SetTimeout(200); + + MessageData data; + + EXPECT_CALL(mock_netclient, GetData_t(_, _)).Times(0); + + MockReadDataFromFile(2); + + consumer->GetNext(expected_group_id, &info, &data, expected_stream); +} + + TEST_F(ConsumerImplTests, GenerateNewGroupIdReturnsErrorCreateGroup) { MockGetBrokerUri(); diff --git a/file_transfer/src/asapo_file_transfer/server/transfer.go b/file_transfer/src/asapo_file_transfer/server/transfer.go index 8e6817007b75dc25ab7b589814c13a0788ea42f6..e6528ff34e280ccd9510d1e910f3038bb3ccb2b9 100644 --- a/file_transfer/src/asapo_file_transfer/server/transfer.go +++ b/file_transfer/src/asapo_file_transfer/server/transfer.go @@ -21,7 +21,7 @@ type fileTransferRequest struct { func Exists(name string) bool { fi, err := os.Stat(name) - return !os.IsNotExist(err) && !fi.IsDir() + return err==nil && !fi.IsDir() } @@ -42,7 +42,7 @@ func checkFileExists(r *http.Request,name string) (int,error) { if !Exists(name) { err_txt := "file "+name+" does not exist" log.Error("cannot transfer file: "+err_txt) - return http.StatusBadRequest,errors.New(err_txt) + return http.StatusNotFound,errors.New(err_txt) } return http.StatusOK,nil diff --git a/file_transfer/src/asapo_file_transfer/server/transfer_test.go b/file_transfer/src/asapo_file_transfer/server/transfer_test.go index 378426407d26265af17b2ee6e522d023a540b207..d4117dcfd9a71939914f3f7227c8d18b148320ab 100644 --- a/file_transfer/src/asapo_file_transfer/server/transfer_test.go +++ b/file_transfer/src/asapo_file_transfer/server/transfer_test.go @@ -60,7 +60,7 @@ var transferFileTests = [] struct { message string }{ {"folder","exists", prepareToken("folder"),http.StatusOK,"file transferred"}, - {"folder","not_exists", prepareToken("folder"),http.StatusBadRequest,"file not exists"}, + {"folder","not_exists", prepareToken("folder"),http.StatusNotFound,"file not exists"}, {"wrong_folder","p07", prepareToken("folder"),http.StatusUnauthorized,"wrong folder"}, {"folder","p07", "wrong token",http.StatusUnauthorized,"wrong token"}, } diff --git a/producer/api/cpp/src/producer_request.h b/producer/api/cpp/src/producer_request.h index 4d0b73fc1eec36fcb270221f62a570d36f31230d..f15d2510330d4c1530f018cea718f6479dd6f5a7 100644 --- a/producer/api/cpp/src/producer_request.h +++ b/producer/api/cpp/src/producer_request.h @@ -18,6 +18,9 @@ class ProducerRequest : public GenericRequest { RequestCallback callback, bool manage_data_memory, uint64_t timeout_ms); + virtual bool ContainsData() override { + return !DataFromFile(); + }; std::string source_credentials; std::string metadata; MessageData data; diff --git a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp index 9c666a0561cac488cd3d0826e8b773bc52df4994..d0b042aa09729ccb700e780c201f8e1639969e1f 100644 --- a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp @@ -130,11 +130,14 @@ TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { TEST_F(RequestHandlerFilesystemTests, FileRequestErrorOnReadData) { + auto unknown_error = []{ + return asapo::IOErrorTemplates::kUnknownIOError.Generate().release(); + }; EXPECT_CALL(mock_io, GetDataFromFile_t(expected_origin_fullpath, testing::Pointee(expected_file_size), _)) .WillOnce( DoAll( - testing::SetArgPointee<2>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + testing::SetArgPointee<2>(unknown_error), Return(nullptr) )); diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 5391c8ce8ea41e3eada8ad182b8e4cc755d3feb1..3b3609cc4a533390ee56d4ff31eecef79f6a5090 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -220,6 +220,8 @@ cdef class PyProducer: unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL)) if err: throw_exception(err) + if callback != None: + Py_XINCREF(<PyObject*>callback) def stream_info(self, stream = 'default', uint64_t timeout_ms = 1000): """ diff --git a/tests/automatic/bug_fixes/CMakeLists.txt b/tests/automatic/bug_fixes/CMakeLists.txt index 17a66a5243a33d9371e73664d479faca25d20793..6cc3dbaa7a47a0f8687396b687db47de0129ec79 100644 --- a/tests/automatic/bug_fixes/CMakeLists.txt +++ b/tests/automatic/bug_fixes/CMakeLists.txt @@ -2,6 +2,7 @@ if (UNIX) add_subdirectory(receiver_cpu_usage) if (BUILD_PYTHON) add_subdirectory(consumer_python_memleak) + add_subdirectory(streamlist_python_multithread) add_subdirectory(error-sending-data-using-callback-method) endif() endif() diff --git a/tests/automatic/bug_fixes/streamlist_python_multithread/CMakeLists.txt b/tests/automatic/bug_fixes/streamlist_python_multithread/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..704002fc8e41064709fb34552514a79c2db1cc82 --- /dev/null +++ b/tests/automatic/bug_fixes/streamlist_python_multithread/CMakeLists.txt @@ -0,0 +1,20 @@ +set(TARGET_NAME streamlist_python) + + +prepare_asapo() + +find_package (Python3 REQUIRED) + + +if (UNIX) + get_target_property(PYTHON_LIBS python-lib-consumer BINARY_DIR) + get_target_property(PYTHON_LIBS_PRODUCER python-lib-producer BINARY_DIR) +else() + get_target_property(PYTHON_LIBS asapo_consumer BINARY_DIR) + get_target_property(PYTHON_LIBS_PRODUCER asapo_producer BINARY_DIR) +endif() + +add_script_test("${TARGET_NAME}" "${PYTHON_LIBS} ${PYTHON_LIBS_PRODUCER} ${Python3_EXECUTABLE}" nomem) + +configure_file(magic_producer.py magic_producer.py COPYONLY) + diff --git a/tests/automatic/bug_fixes/streamlist_python_multithread/check_linux.sh b/tests/automatic/bug_fixes/streamlist_python_multithread/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..678925935e45b108d355dd5755ddc33f24dc247f --- /dev/null +++ b/tests/automatic/bug_fixes/streamlist_python_multithread/check_linux.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +set -e + +trap Cleanup EXIT + +endpoint=127.0.0.1:8400 +beamtime_id=asapo_test +token=$ASAPO_TEST_RW_TOKEN + +Cleanup() { + echo cleanup + nomad stop nginx + nomad run nginx_kill.nmd && nomad stop -yes -purge nginx_kill + nomad stop broker + nomad stop discovery + nomad stop authorizer + nomad stop receiver + echo "db.dropDatabase()" | mongo ${beamtime_id}_source_1 + echo "db.dropDatabase()" | mongo ${beamtime_id}_source_2 +} + +nomad run nginx.nmd +nomad run discovery.nmd +nomad run broker.nmd +nomad run receiver_tcp.nmd +nomad run authorizer.nmd + +sleep 1 + +export PYTHONPATH=$1:$2:${PYTHONPATH} +export Python3_EXECUTABLE=$3 + +$Python3_EXECUTABLE magic_producer.py $endpoint $beamtime_id $token > out +cat out +cat out | grep "5 : number of streams source_1: 5" +cat out | grep "5 : number of streams source_2: 5" diff --git a/tests/automatic/bug_fixes/streamlist_python_multithread/magic_producer.py b/tests/automatic/bug_fixes/streamlist_python_multithread/magic_producer.py new file mode 100644 index 0000000000000000000000000000000000000000..97f38c68ed0dbc651f5609e3529604351e30730b --- /dev/null +++ b/tests/automatic/bug_fixes/streamlist_python_multithread/magic_producer.py @@ -0,0 +1,42 @@ +from concurrent.futures import ThreadPoolExecutor +import threading +import asapo_producer +import asapo_consumer +from time import sleep +from datetime import datetime +import sys + +endpoint, beamtime, token = sys.argv[1:] + + +def dummy_producer(data_source): + def callback(header, err): + if err is not None: + print("could not sent: ", header, err) + # else: + # print("successfuly sent: ", header) + producer = asapo_producer.create_producer(endpoint, 'processed', beamtime, 'auto', data_source, '', 5, + 60000) # source type 'processed' to write to the core filesystem + producer.set_log_level("none") + consumer = asapo_consumer.create_consumer(endpoint, "", False, beamtime, data_source, token, 3000) + + for j in range(5): + stream = datetime.now().strftime('%Y%m%dT_%H%M%S') + for i in range(5): + producer.send(i + 1, data_source + "_" + stream + "_" + str(i), None, + callback=callback, + ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, + stream=stream) + sleep(0.6) + producer.send_stream_finished_flag(stream=stream, last_id=i + 1) + print(j + 1, ": number of streams", data_source + ": ", len(consumer.get_stream_list())) + + +def main(): + with ThreadPoolExecutor(max_workers=3) as executor: + task1 = executor.submit(dummy_producer, "source_1") + task2 = executor.submit(dummy_producer, "source_2") + + +if __name__ == '__main__': + main() diff --git a/tests/automatic/producer/python_api/check_linux.sh b/tests/automatic/producer/python_api/check_linux.sh index 41564e5e5bf04551ec57bea271459074accdb875..957e6ff0dbeff68f1f56d880ffcfcc8a07f6b314 100644 --- a/tests/automatic/producer/python_api/check_linux.sh +++ b/tests/automatic/producer/python_api/check_linux.sh @@ -41,12 +41,12 @@ sleep 10 $1 $3 $data_source $beamtime_id "127.0.0.1:8400" &> out || cat out cat out -echo count successfully send, expect 13 -cat out | grep "successfuly sent" | wc -l | tee /dev/stderr | grep 13 +echo count successfully send, expect 15 +cat out | grep "successfuly sent" | wc -l | tee /dev/stderr | grep 15 echo count same id, expect 4 cat out | grep "already have record with same id" | wc -l | tee /dev/stderr | grep 4 -echo count duplicates, expect 4 -cat out | grep "duplicate" | wc -l | tee /dev/stderr | grep 4 +echo count duplicates, expect 6 +cat out | grep "duplicate" | wc -l | tee /dev/stderr | grep 6 echo count data in callback, expect 3 cat out | grep "'data':" | wc -l | tee /dev/stderr | grep 3 echo check found local io error diff --git a/tests/automatic/producer/python_api/check_windows.bat b/tests/automatic/producer/python_api/check_windows.bat index c115da36fd80a77128f2248ac1202c360336c9ae..844e51b378243114eed735074a11e6dbb81e02dc 100644 --- a/tests/automatic/producer/python_api/check_windows.bat +++ b/tests/automatic/producer/python_api/check_windows.bat @@ -22,13 +22,13 @@ set PYTHONPATH=%2 type out set NUM=0 for /F %%N in ('find /C "successfuly sent" ^< "out"') do set NUM=%%N -echo %NUM% | findstr 13 || goto error +echo %NUM% | findstr 15 || goto error for /F %%N in ('find /C "} wrong input: Bad request: already have record with same id" ^< "out"') do set NUM=%%N echo %NUM% | findstr 2 || goto error for /F %%N in ('find /C "} server warning: ignoring duplicate record" ^< "out"') do set NUM=%%N -echo %NUM% | findstr 1 || goto error +echo %NUM% | findstr 2 || goto error for /F %%N in ('find /C "} server warning: duplicated request" ^< "out"') do set NUM=%%N echo %NUM% | findstr 1 || goto error diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 2b419865037cb5cc806da97d3bff01743045c5df..879053cd9c71ab79e3fa5c8eeeebbce32df673a0 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -25,6 +25,11 @@ def assert_eq(val, expected, name): print('val: ', val, ' expected: ', expected) sys.exit(1) +class CallBackClass: + def callback(self, payload, err): + callback(payload,err) + +callback_object = CallBackClass() def callback(payload, err): lock.acquire() # to print @@ -159,6 +164,13 @@ else: print("should be AsapoRequestsPoolIsFull error ") sys.exit(1) +#stream_finished +producer.wait_requests_finished(10000) +producer.send_stream_finished_flag("stream", 2, next_stream = "next_stream", callback = callback) +# check callback_object.callback works, will be duplicated request +producer.send_stream_finished_flag("stream", 2, next_stream = "next_stream", callback = callback_object.callback) +producer.wait_requests_finished(10000) + #stream infos info = producer.stream_info() @@ -172,7 +184,8 @@ print("created: ",datetime.utcfromtimestamp(info['timestampCreated']/1000000000) print("last record: ",datetime.utcfromtimestamp(info['timestampLast']/1000000000).strftime('%Y-%m-%d %H:%M:%S.%f')) info = producer.stream_info('stream') -assert_eq(info['lastId'], 2, "last id from different stream") +assert_eq(info['lastId'], 3, "last id from different stream") +assert_eq(info['finished'], True, "stream finished") info_last = producer.last_stream() assert_eq(info_last['name'], "stream", "last stream") diff --git a/tests/manual/mongodb_performance/go.mod b/tests/manual/mongodb_performance/go.mod new file mode 100644 index 0000000000000000000000000000000000000000..eb828f99573d51e8b9bfa728127e5466617f4c50 --- /dev/null +++ b/tests/manual/mongodb_performance/go.mod @@ -0,0 +1,5 @@ +module perf + +go 1.16 + +require go.mongodb.org/mongo-driver v1.5.1 diff --git a/tests/manual/mongodb_performance/go.sum b/tests/manual/mongodb_performance/go.sum new file mode 100644 index 0000000000000000000000000000000000000000..1ec00898ead19b91f2b3f8ba6106c18a2c51205d --- /dev/null +++ b/tests/manual/mongodb_performance/go.sum @@ -0,0 +1,125 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aws/aws-sdk-go v1.34.28 h1:sscPpn/Ns3i0F4HPEWAVcwdIRaZZCuL7llJ2/60yPIk= +github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0= +github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY= +github.com/gobuffalo/depgen v0.1.0/go.mod h1:+ifsuy7fhi15RWncXQQKjWS9JPkdah5sZvtHc2RXGlg= +github.com/gobuffalo/envy v1.6.15/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSCx4T5NI= +github.com/gobuffalo/envy v1.7.0/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSCx4T5NI= +github.com/gobuffalo/flect v0.1.0/go.mod h1:d2ehjJqGOH/Kjqcoz+F7jHTBbmDb38yXA598Hb50EGs= +github.com/gobuffalo/flect v0.1.1/go.mod h1:8JCgGVbRjJhVgD6399mQr4fx5rRfGKVzFjbj6RE/9UI= +github.com/gobuffalo/flect v0.1.3/go.mod h1:8JCgGVbRjJhVgD6399mQr4fx5rRfGKVzFjbj6RE/9UI= +github.com/gobuffalo/genny v0.0.0-20190329151137-27723ad26ef9/go.mod h1:rWs4Z12d1Zbf19rlsn0nurr75KqhYp52EAGGxTbBhNk= +github.com/gobuffalo/genny v0.0.0-20190403191548-3ca520ef0d9e/go.mod h1:80lIj3kVJWwOrXWWMRzzdhW3DsrdjILVil/SFKBzF28= +github.com/gobuffalo/genny v0.1.0/go.mod h1:XidbUqzak3lHdS//TPu2OgiFB+51Ur5f7CSnXZ/JDvo= +github.com/gobuffalo/genny v0.1.1/go.mod h1:5TExbEyY48pfunL4QSXxlDOmdsD44RRq4mVZ0Ex28Xk= +github.com/gobuffalo/gitgen v0.0.0-20190315122116-cc086187d211/go.mod h1:vEHJk/E9DmhejeLeNt7UVvlSGv3ziL+djtTr3yyzcOw= +github.com/gobuffalo/gogen v0.0.0-20190315121717-8f38393713f5/go.mod h1:V9QVDIxsgKNZs6L2IYiGR8datgMhB577vzTDqypH360= +github.com/gobuffalo/gogen v0.1.0/go.mod h1:8NTelM5qd8RZ15VjQTFkAW6qOMx5wBbW4dSCS3BY8gg= +github.com/gobuffalo/gogen v0.1.1/go.mod h1:y8iBtmHmGc4qa3urIyo1shvOD8JftTtfcKi+71xfDNE= +github.com/gobuffalo/logger v0.0.0-20190315122211-86e12af44bc2/go.mod h1:QdxcLw541hSGtBnhUc4gaNIXRjiDppFGaDqzbrBd3v8= +github.com/gobuffalo/mapi v1.0.1/go.mod h1:4VAGh89y6rVOvm5A8fKFxYG+wIW6LO1FMTG9hnKStFc= +github.com/gobuffalo/mapi v1.0.2/go.mod h1:4VAGh89y6rVOvm5A8fKFxYG+wIW6LO1FMTG9hnKStFc= +github.com/gobuffalo/packd v0.0.0-20190315124812-a385830c7fc0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWeG2RIxq4= +github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWeG2RIxq4= +github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ= +github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0= +github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= +github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4= +github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA= +github.com/klauspost/compress v1.9.5 h1:U+CaK85mrNNb4k8BNOfgJtJ/gr6kswUCFj6miSzVC6M= +github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE= +github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.0.2 h1:akYIkZ28e6A96dkWNJQu3nmCzH3YfwMPQExUYDaRv7w= +github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= +github.com/xdg-go/stringprep v1.0.2 h1:6iq84/ryjjeRmMJwxutI51F2GIPlP5BfTvXHeYjyhBc= +github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +go.mongodb.org/mongo-driver v1.5.1 h1:9nOVLGDfOaZ9R0tBumx/BcuqkbFpyTCU2r/Po7A2azI= +go.mongodb.org/mongo-driver v1.5.1/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 h1:xMPOj6Pz6UipU1wXLkrtqpHbR0AVFnyPEQq/wRWz9lM= +golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190531175056-4c3a928424d2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190329151228-23e29df326fe/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190416151739-9c9e1878f421/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190420181800-aa740d480789/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tests/manual/mongodb_performance/perf b/tests/manual/mongodb_performance/perf new file mode 100755 index 0000000000000000000000000000000000000000..40103d1481d0a828eff4a7908ab4cc626316595a Binary files /dev/null and b/tests/manual/mongodb_performance/perf differ diff --git a/tests/manual/mongodb_performance/perf.go b/tests/manual/mongodb_performance/perf.go new file mode 100644 index 0000000000000000000000000000000000000000..947aded3e8229ed76d6c7d4dda3cebdd22eb3753 --- /dev/null +++ b/tests/manual/mongodb_performance/perf.go @@ -0,0 +1,96 @@ +package main + +import ( + "context" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/writeconcern" + "go.mongodb.org/mongo-driver/x/bsonx" + "log" + "os" + "strconv" + "time" +) + +var client *mongo.Client + +var address = "localhost:27017" + +func connectDb() error { + opts := options.Client().SetConnectTimeout(20 * time.Second). + ApplyURI("mongodb://" + address).SetWriteConcern(writeconcern.New(writeconcern.J(false))) + var err error + client, err = mongo.NewClient(opts) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + err = client.Connect(ctx) + if err != nil { + client = nil + return err + } + return nil +} + +type MessageRecord struct { + ID int `bson:"id" json:"id"` + Timestamp int `bson:"timestamp" json:"timestamp"` + Name string `bson:"name" json:"name"` + Meta map[string]interface{} `json:"meta"` + NextStream string + FinishedStream bool +} + +func getRecord(id int) *MessageRecord { + var rec MessageRecord + rec.ID = id + rec.Name = "rec_" + strconv.Itoa(id) + rec.NextStream = "ns" + rec.FinishedStream = true + return &rec +} + + +func main() { + if err := connectDb(); err != nil { + log.Fatal(err) + } + ctx := context.Background() + + dbName := "test" + client.Database(dbName).Drop(ctx) + + nRecords, _ := strconv.Atoi(os.Args[1]) + nCollections, _ := strconv.Atoi(os.Args[2]) + + keysDoc := bsonx.Doc{} + keysDoc = keysDoc.Append("name", bsonx.Int32(1)). + Append("id", bsonx.Int32(1)) + + mod := mongo.IndexModel{ + Keys: keysDoc, Options: nil, + } + + _, err := client.Database(dbName).Collection("data").Indexes().CreateOne(ctx, mod) + if err != nil { + log.Fatal(err) + } + + start := time.Now() + for col := 0; col < nCollections; col++ { + collection := "col" + strconv.Itoa(col+1) + for i := 0; i < nRecords/nCollections; i++ { + rec := getRecord(i+1) + rec.Name = collection + _, err := client.Database(dbName).Collection("data").InsertOne(ctx, rec) + if err != nil { + log.Fatal(err) + } + } + } + elapsed := time.Since(start) + log.Printf("write %d records/sec", int(float64(nRecords)/elapsed.Seconds())) + +}