From 5377fa8e078e93f2d1e7e5f0211450aad323d270 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Tue, 22 Dec 2020 13:14:20 +0100 Subject: [PATCH] switch to ms --- CHANGELOG.md | 7 ++-- broker/src/asapo_broker/database/mongodb.go | 32 +++++++++---------- .../src/asapo_broker/database/mongodb_test.go | 16 +++++----- .../asapo_broker/server/get_commands_test.go | 2 +- broker/src/asapo_broker/server/get_next.go | 4 +-- .../api/cpp/include/asapo/consumer/consumer.h | 8 ++--- consumer/api/cpp/src/consumer_impl.cpp | 12 +++---- consumer/api/cpp/src/consumer_impl.h | 6 ++-- .../api/cpp/unittests/test_consumer_impl.cpp | 8 ++--- consumer/api/python/asapo_consumer.pxd | 4 +-- consumer/api/python/asapo_consumer.pyx.in | 8 ++--- .../pipeline/in_to_out_python/in_to_out.py | 2 +- .../dummy_data_producer.cpp | 10 +++--- examples/producer/simple-producer/produce.cpp | 2 +- .../api/cpp/include/asapo/producer/producer.h | 12 +++---- producer/api/cpp/src/producer.cpp | 4 +-- producer/api/cpp/src/producer_impl.cpp | 30 ++++++++--------- producer/api/cpp/src/producer_impl.h | 12 +++---- producer/api/cpp/unittests/test_producer.cpp | 12 +++---- .../api/cpp/unittests/test_producer_impl.cpp | 12 +++---- producer/api/python/asapo_producer.pxd | 6 ++-- producer/api/python/asapo_producer.pyx.in | 22 ++++++------- .../src/main_eventmon.cpp | 2 +- .../bugfix_callback.py | 2 +- .../send_recv_streams/send_recv_streams.cpp | 2 +- .../send_recv_streams.py | 2 +- tests/automatic/producer/aai/producer_aai.py | 2 +- .../beamtime_metadata/beamtime_metadata.cpp | 2 +- .../producer/python_api/producer_api.py | 2 +- tests/manual/producer_cpp/producer.cpp | 2 +- .../producer_wait_bug_mongo/test.py | 2 +- .../producer_wait_threads/producer_api.py | 4 +-- .../producer_wait_threads/test.py | 2 +- 33 files changed, 128 insertions(+), 127 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b7f06a6d2..6b7122210 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,11 +14,12 @@ FEATURES BREAKING CHANGES * Consumer API - get_next_dataset, get_last_dataset, get_dataset_by_id return dictionary with 'id','expected_size','content' fields, not tuple (id,content) as before * Consumer API - remove group_id argument from get_last/get_by_id/get_last_dataset/get_dataset_by_id functions -* Producer API - changed meaning of subsets (subset_id replaced with id_in_subset and this means now id of the image within a subset (e.g. module number for multi-module detector)), file_id is now a global id of a multi-set data (i.g. multi-image id) - #### renaming - Producer API +* Producer API - changed meaning of subsets (subset_id replaced with id_in_subset and this means now id of the image within a subset (e.g. module number for multi-module detector)), file_id is now a global id of a multi-set data (i.g. multi-image id) + #### renaming - general * stream -> data_source, substream -> stream +* use millisecond everywhere for timeout/delay + #### renaming - Producer API #### renaming - Consumer API -* stream -> data_source, substream -> stream * broker -> consumer BUG FIXES diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index d0005e7df..478139680 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -35,14 +35,14 @@ type InProcessingRecord struct { ID int `bson:"_id" json:"_id"` MaxResendAttempts int `bson:"maxResendAttempts" json:"maxResendAttempts"` ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"` - DelaySec int64 `bson:"delaySec" json:"delaySec"` + DelayMs int64 `bson:"delayMs" json:"delayMs"` } type NegAckParamsRecord struct { ID int `bson:"_id" json:"_id"` MaxResendAttempts int `bson:"maxResendAttempts" json:"maxResendAttempts"` ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"` - DelaySec int64 `bson:"delaySec" json:"delaySec"` + DelayMs int64 `bson:"delayMs" json:"delayMs"` } @@ -321,7 +321,7 @@ func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { input := struct { Id int Params struct { - DelaySec int + DelayMs int } }{} @@ -330,7 +330,7 @@ func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - err = db.InsertRecordToInprocess(request.DbName,inprocess_collection_name_prefix+request.GroupId,input.Id,input.Params.DelaySec, 1) + err = db.InsertRecordToInprocess(request.DbName,inprocess_collection_name_prefix+request.GroupId,input.Id,input.Params.DelayMs, 1) return []byte(""), err } @@ -386,18 +386,18 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err return curPointer, max_ind, nil } -func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delaySec int,nResendAttempts int) (int, error) { +func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delayMs int,nResendAttempts int) (int, error) { var res InProcessingRecord opts := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After) - tNow := time.Now().Unix() + tNow := time.Now().UnixNano() var update bson.M if nResendAttempts==0 { - update = bson.M{"$set": bson.M{"delaySec": tNow + int64(delaySec) ,"maxResendAttempts":math.MaxInt32}, "$inc": bson.M{"resendAttempts": 1}} + update = bson.M{"$set": bson.M{"delayMs": tNow + int64(delayMs*1e6) ,"maxResendAttempts":math.MaxInt32}, "$inc": bson.M{"resendAttempts": 1}} } else { - update = bson.M{"$set": bson.M{"delaySec": tNow + int64(delaySec) ,"maxResendAttempts":nResendAttempts}, "$inc": bson.M{"resendAttempts": 1}} + update = bson.M{"$set": bson.M{"delayMs": tNow + int64(delayMs*1e6) ,"maxResendAttempts":nResendAttempts}, "$inc": bson.M{"resendAttempts": 1}} } - q := bson.M{"delaySec": bson.M{"$lte": tNow},"$expr": bson.M{"$lt": []string{"$resendAttempts","$maxResendAttempts"}}} + q := bson.M{"delayMs": bson.M{"$lte": tNow},"$expr": bson.M{"$lt": []string{"$resendAttempts","$maxResendAttempts"}}} c := db.client.Database(dbname).Collection(collection_name) err := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(&res) if err != nil { @@ -412,9 +412,9 @@ func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delay return res.ID, nil } -func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name string,id int,delaySec int, nResendAttempts int) error { +func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name string,id int,delayMs int, nResendAttempts int) error { record := InProcessingRecord{ - id, nResendAttempts, 0,time.Now().Unix()+int64(delaySec), + id, nResendAttempts, 0,time.Now().UnixNano()+int64(delayMs*1e6), } c := db.client.Database(db_name).Collection(collection_name) @@ -429,20 +429,20 @@ func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name str if len(extra_param) == 0 { return nil } - delaySec, nResendAttempts, err := extractsTwoIntsFromString(extra_param) + delayMs, nResendAttempts, err := extractsTwoIntsFromString(extra_param) if err != nil { return err } - return db.InsertRecordToInprocess(db_name,collection_name,id,delaySec, nResendAttempts) + return db.InsertRecordToInprocess(db_name,collection_name,id,delayMs, nResendAttempts) } func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTimeout bool) (int, int, error) { - var record_ind, max_ind, delaySec, nResendAttempts int + var record_ind, max_ind, delayMs, nResendAttempts int var err error if len(request.ExtraParam) != 0 { - delaySec, nResendAttempts, err = extractsTwoIntsFromString(request.ExtraParam) + delayMs, nResendAttempts, err = extractsTwoIntsFromString(request.ExtraParam) if err != nil { return 0, 0, err } @@ -451,7 +451,7 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi } tNow := time.Now().Unix() if (atomic.LoadInt64(&db.lastReadFromInprocess) <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { - record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.GroupId, delaySec,nResendAttempts) + record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.GroupId, delayMs,nResendAttempts) if err != nil { log_str := "error getting unprocessed id " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error() logger.Debug(log_str) diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index d9fdf51c4..3acab2a69 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -902,10 +902,10 @@ func TestMongoDBGetNextUsesInprocessedAfterTimeout(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) - res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) time.Sleep(time.Second) - res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) + res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -920,10 +920,10 @@ func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) time.Sleep(time.Second) - res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) - res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) + res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -987,11 +987,11 @@ func TestMongoDBNegAck(t *testing.T) { inputParams := struct { Id int Params struct { - DelaySec int + DelayMs int } }{} inputParams.Id = 1 - inputParams.Params.DelaySec = 0 + inputParams.Params.DelayMs = 0 db.insertRecord(dbname, collection, &rec1) db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index a464c8184..e4db0514b 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -46,7 +46,7 @@ var testsGetCommand = []struct { {"nacks", expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/nacks","","0_0"}, {"next", expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next","",""}, {"next", expectedStream, expectedGroupID, expectedStream + "/" + - expectedGroupID + "/next","&resend_nacks=true&delay_sec=10&resend_attempts=3","10_3"}, + expectedGroupID + "/next","&resend_nacks=true&delay_ms=10000&resend_attempts=3","10000_3"}, {"size", expectedStream, "", expectedStream + "/size","","0"}, {"streams", "0", "", "0/streams","",""}, {"lastack", expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/lastack","",""}, diff --git a/broker/src/asapo_broker/server/get_next.go b/broker/src/asapo_broker/server/get_next.go index 9e588fb9a..3f051f57e 100644 --- a/broker/src/asapo_broker/server/get_next.go +++ b/broker/src/asapo_broker/server/get_next.go @@ -7,11 +7,11 @@ import ( func extractResend(r *http.Request) (string) { keys := r.URL.Query() resend := keys.Get("resend_nacks") - delay_sec := keys.Get("delay_sec") + delay_ms := keys.Get("delay_ms") resend_attempts := keys.Get("resend_attempts") resend_params := "" if len(resend)!=0 { - resend_params=delay_sec+"_"+resend_attempts + resend_params=delay_ms+"_"+resend_attempts } return resend_params } diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 92531d380..12a07f4c5 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -38,11 +38,11 @@ class Consumer { /*! \param group_id - group id to use. \param id - data tuple id - \param delay_sec - data tuple will be redelivered after delay, 0 to redeliver immediately + \param delay_ms - data tuple will be redelivered after delay, 0 to redeliver immediately \param stream (optional) - stream \return nullptr of command was successful, otherwise error. */ - virtual Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_sec, + virtual Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_ms, std::string stream = kDefaultStream) = 0; @@ -191,10 +191,10 @@ class Consumer { //! Configure resending nonacknowledged data /*! \param resend - where to resend - \param delay_sec - how many seconds to wait before resending + \param delay_ms - how many milliseconds to wait before resending \param resend_attempts - how many resend attempts to make */ - virtual void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) = 0; + virtual void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) = 0; //! Will try to interrupt current long runnung operations (mainly needed to exit waiting loop in C from Python) virtual void InterruptCurrentOperation() = 0; diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index f6523a137..575460028 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -260,8 +260,8 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group if (err == nullptr) { auto ri = PrepareRequestInfo(request_api + "/" + group_id + "/" + request_suffix, dataset, min_size); if (request_suffix == "next" && resend_) { - ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&delay_sec=" + - std::to_string(delay_sec_) + "&resend_attempts=" + std::to_string(resend_attempts_); + ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&delay_ms=" + + std::to_string(delay_ms_) + "&resend_attempts=" + std::to_string(resend_attempts_); } RequestOutput output; err = ProcessRequest(&output, ri, ¤t_broker_uri_); @@ -832,22 +832,22 @@ uint64_t ConsumerImpl::GetLastAcknowledgedTulpeId(std::string group_id, Error* e return GetLastAcknowledgedTulpeId(std::move(group_id), kDefaultStream, error); } -void ConsumerImpl::SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) { +void ConsumerImpl::SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) { resend_ = resend; - delay_sec_ = delay_sec; + delay_ms_ = delay_ms; resend_attempts_ = resend_attempts; } Error ConsumerImpl::NegativeAcknowledge(std::string group_id, uint64_t id, - uint64_t delay_sec, + uint64_t delay_ms, std::string stream) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(stream) + "/" + std::move(group_id) + "/" + std::to_string(id); ri.post = true; - ri.body = R"({"Op":"negackimage","Params":{"DelaySec":)" + std::to_string(delay_sec) + "}}"; + ri.body = R"({"Op":"negackimage","Params":{"DelayMs":)" + std::to_string(delay_ms) + "}}"; Error err; BrokerRequestWithTimeout(ri, &err); diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 3d37a7962..59266f2ce 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -57,7 +57,7 @@ class ConsumerImpl final : public asapo::Consumer { SourceCredentials source); Error Acknowledge(std::string group_id, uint64_t id, std::string stream = kDefaultStream) override; - Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_sec, + Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_ms, std::string stream = kDefaultStream) override; IdList GetUnacknowledgedTupleIds(std::string group_id, @@ -112,7 +112,7 @@ class ConsumerImpl final : public asapo::Consumer { Error RetrieveData(FileInfo* info, FileData* data) override; StreamInfos GetStreamList(std::string from, Error* err) override; - void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) override; + void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) override; virtual void InterruptCurrentOperation() override; @@ -168,7 +168,7 @@ class ConsumerImpl final : public asapo::Consumer { RequestInfo CreateFileTransferRequest(const FileInfo* info) const; uint64_t resend_timout_ = 0; bool resend_ = false; - uint64_t delay_sec_; + uint64_t delay_ms_; uint64_t resend_attempts_; std::atomic<bool> interrupt_flag_{ false}; }; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 6ad5265a3..9fb1918c8 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -1341,19 +1341,19 @@ TEST_F(ConsumerImplTests, ResendNacks) { EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + expected_group_id + "/next?token=" - + expected_token + "&resend_nacks=true&delay_sec=10&resend_attempts=3", _, + + expected_token + "&resend_nacks=true&delay_ms=10000&resend_attempts=3", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); - consumer->SetResendNacs(true, 10, 3); + consumer->SetResendNacs(true, 10000, 3); consumer->GetNext(&info, expected_group_id, nullptr); } TEST_F(ConsumerImplTests, NegativeAcknowledgeUsesCorrectUri) { MockGetBrokerUri(); - auto expected_neg_acknowledge_command = R"({"Op":"negackimage","Params":{"DelaySec":10}})"; + auto expected_neg_acknowledge_command = R"({"Op":"negackimage","Params":{"DelayMs":10000}})"; EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_stream + "/" + expected_group_id @@ -1364,7 +1364,7 @@ TEST_F(ConsumerImplTests, NegativeAcknowledgeUsesCorrectUri) { SetArgPointee<4>(nullptr), Return(""))); - auto err = consumer->NegativeAcknowledge(expected_group_id, expected_dataset_id, 10, expected_stream); + auto err = consumer->NegativeAcknowledge(expected_group_id, expected_dataset_id, 10000, expected_stream); ASSERT_THAT(err, Eq(nullptr)); } diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index c855a961b..ea868f8cb 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -69,7 +69,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: Error SetLastReadMarker(uint64_t value, string group_id, string stream) Error ResetLastReadMarker(string group_id, string stream) Error Acknowledge(string group_id, uint64_t id, string stream) - Error NegativeAcknowledge(string group_id, uint64_t id, uint64_t delay_sec, string stream) + Error NegativeAcknowledge(string group_id, uint64_t id, uint64_t delay_ms, string stream) uint64_t GetLastAcknowledgedTulpeId(string group_id, string stream, Error* error) IdList GetUnacknowledgedTupleIds(string group_id, string stream, uint64_t from_id, uint64_t to_id, Error* error) string GenerateNewGroupId(Error* err) @@ -80,7 +80,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: DataSet GetDatasetById(uint64_t id, string stream, uint64_t min_size, Error* err) Error RetrieveData(FileInfo* info, FileData* data) vector[StreamInfo] GetStreamList(string from_stream, Error* err) - void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) + void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) void InterruptCurrentOperation() cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 214e3b63e..f56c264bf 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -229,17 +229,17 @@ cdef class PyConsumer: err = self.c_consumer.get().Acknowledge(b_group_id,id,b_stream) if err: throw_exception(err) - def neg_acknowledge(self, group_id, uint64_t id, uint64_t delay_sec, stream = "default"): + def neg_acknowledge(self, group_id, uint64_t id, uint64_t delay_ms, stream = "default"): cdef string b_group_id = _bytes(group_id) cdef string b_stream = _bytes(stream) cdef Error err with nogil: - err = self.c_consumer.get().NegativeAcknowledge(b_group_id,id,delay_sec,b_stream) + err = self.c_consumer.get().NegativeAcknowledge(b_group_id,id,delay_ms,b_stream) if err: throw_exception(err) - def set_resend_nacs(self,bool resend, uint64_t delay_sec, uint64_t resend_attempts): + def set_resend_nacs(self,bool resend, uint64_t delay_ms, uint64_t resend_attempts): with nogil: - self.c_consumer.get().SetResendNacs(resend,delay_sec,resend_attempts) + self.c_consumer.get().SetResendNacs(resend,delay_ms,resend_attempts) def get_last_acknowledged_tuple_id(self, group_id, stream = "default"): cdef string b_group_id = _bytes(group_id) diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index 7534a5fdc..45943a99b 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -28,7 +28,7 @@ transfer_data=int(transfer_data)>0 consumer = asapo_consumer.create_consumer(source,path, True,beamtime,stream_in,token,timeout_s*1000) -producer = asapo_producer.create_producer(source,'processed',beamtime,'auto', stream_out, token, nthreads, 600) +producer = asapo_producer.create_producer(source,'processed',beamtime,'auto', stream_out, token, nthreads, 600000) group_id = consumer.generate_group_id() diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index ed391bfc5..3e67b04ce 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -24,7 +24,7 @@ struct Args { uint64_t iterations; uint64_t nthreads; uint64_t mode; - uint64_t timeout_sec; + uint64_t timeout_ms; uint64_t images_in_set; }; @@ -38,7 +38,7 @@ void PrintCommandArguments(const Args& args) { << "Write files: " << ((args.mode %100) / 10 == 1) << std::endl << "Tcp mode: " << ((args.mode % 10) ==0 ) << std::endl << "Raw: " << (args.mode / 100 == 1)<< std::endl - << "timeout: " << args.timeout_sec << std::endl + << "timeout: " << args.timeout_ms << std::endl << "images in set: " << args.images_in_set << std::endl << std::endl; } @@ -86,7 +86,7 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) { args->iterations = std::stoull(argv[4]); args->nthreads = std::stoull(argv[5]); args->mode = std::stoull(argv[6]); - args->timeout_sec = std::stoull(argv[7]); + args->timeout_ms = std::stoull(argv[7])*1000; if (argc == 9) { args->images_in_set = std::stoull(argv[8]); } else { @@ -188,7 +188,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { asapo::Error err; auto producer = asapo::Producer::Create(args.discovery_service_endpoint, args.nthreads, args.mode % 10 == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem, - asapo::SourceCredentials{args.mode / 100 == 0 ?asapo::SourceType::kProcessed:asapo::SourceType::kRaw,args.beamtime_id, "", args.data_source, args.token }, 3600, &err); + asapo::SourceCredentials{args.mode / 100 == 0 ?asapo::SourceType::kProcessed:asapo::SourceType::kRaw,args.beamtime_id, "", args.data_source, args.token }, 3600000, &err); if(err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); @@ -228,7 +228,7 @@ int main (int argc, char* argv[]) { return EXIT_FAILURE; } - auto err = producer->WaitRequestsFinished(args.timeout_sec * 1000); + auto err = producer->WaitRequestsFinished(args.timeout_ms); if (err) { std::cerr << "Producer exit on timeout " << std::endl; exit(EXIT_FAILURE); diff --git a/examples/producer/simple-producer/produce.cpp b/examples/producer/simple-producer/produce.cpp index 7986c4bfa..5ddfe5d41 100644 --- a/examples/producer/simple-producer/produce.cpp +++ b/examples/producer/simple-producer/produce.cpp @@ -26,7 +26,7 @@ int main(int argc, char* argv[]) { auto beamtime = "asapo_test"; auto producer = asapo::Producer::Create(source, 1, asapo::RequestHandlerType::kTcp, - asapo::SourceCredentials{beamtime, "", "", ""}, 60, &err); + asapo::SourceCredentials{beamtime, "", "", ""}, 60000, &err); exit_if_error("Cannot start producer", err); std::string to_send = "hello"; diff --git a/producer/api/cpp/include/asapo/producer/producer.h b/producer/api/cpp/include/asapo/producer/producer.h index 272a4f75b..334b4ae72 100644 --- a/producer/api/cpp/include/asapo/producer/producer.h +++ b/producer/api/cpp/include/asapo/producer/producer.h @@ -18,7 +18,7 @@ class Producer { */ static std::unique_ptr<Producer> Create(const std::string& endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type, SourceCredentials source_cred, - uint64_t timeout_sec, + uint64_t timeout_ms, Error* err); virtual ~Producer() = default; @@ -26,18 +26,18 @@ class Producer { //! Get stream information from receiver /*! \param stream (optional) - stream - \param timeout_sec - operation timeout in seconds + \param timeout_ms - operation timeout in milliseconds \return StreamInfo - a structure with stream information */ - virtual StreamInfo GetStreamInfo(std::string stream, uint64_t timeout_sec, Error* err) const = 0; - virtual StreamInfo GetStreamInfo(uint64_t timeout_sec, Error* err) const = 0; + virtual StreamInfo GetStreamInfo(std::string stream, uint64_t timeout_ms, Error* err) const = 0; + virtual StreamInfo GetStreamInfo(uint64_t timeout_ms, Error* err) const = 0; //! Get stream that has the newest ingested data /*! - \param timeout_ms - operation timeout in seconds + \param timeout_ms - operation timeout in milliseconds \return StreamInfo - a structure with stream information */ - virtual StreamInfo GetLastStream(uint64_t timeout_sec, Error* err) const = 0; + virtual StreamInfo GetLastStream(uint64_t timeout_ms, Error* err) const = 0; //! Sends data to the receiver diff --git a/producer/api/cpp/src/producer.cpp b/producer/api/cpp/src/producer.cpp index 63a2d488b..cb94f8d0c 100644 --- a/producer/api/cpp/src/producer.cpp +++ b/producer/api/cpp/src/producer.cpp @@ -3,7 +3,7 @@ #include "asapo/producer/producer_error.h" std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endpoint, uint8_t n_processing_threads, - asapo::RequestHandlerType type, SourceCredentials source_cred, uint64_t timeout_sec, Error* err) { + asapo::RequestHandlerType type, SourceCredentials source_cred, uint64_t timeout_ms, Error* err) { if (n_processing_threads > kMaxProcessingThreads || n_processing_threads == 0) { *err = ProducerErrorTemplates::kWrongInput.Generate("Set number of processing threads > 0 and <= " + std::to_string( @@ -13,7 +13,7 @@ std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endp std::unique_ptr<asapo::Producer> producer; try { - producer.reset(new ProducerImpl(endpoint, n_processing_threads, timeout_sec, type)); + producer.reset(new ProducerImpl(endpoint, n_processing_threads, timeout_ms, type)); } catch (const std::exception& ex) { *err = TextError(ex.what()); return nullptr; diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index bd826db67..24bdea3c4 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -19,9 +19,9 @@ const std::string ProducerImpl::kFinishStreamKeyword = "asapo_finish_stream"; const std::string ProducerImpl::kNoNextStreamKeyword = "asapo_no_next"; -ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, uint64_t timeout_sec, +ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, uint64_t timeout_ms, asapo::RequestHandlerType type): - log__{GetDefaultProducerLogger()}, timeout_sec_{timeout_sec} { + log__{GetDefaultProducerLogger()}, timeout_ms_{timeout_ms} { switch (type) { case RequestHandlerType::kTcp: discovery_service_.reset(new ReceiverDiscoveryService{endpoint, ProducerImpl::kDiscoveryServiceUpdateFrequencyMs}); @@ -110,7 +110,7 @@ Error ProducerImpl::Send(const EventHeader& event_header, auto request_header = GenerateNextSendRequest(event_header, std::move(stream), ingest_mode); return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), - std::move(data), std::move(event_header.user_metadata), std::move(full_path), callback, manage_data_memory, timeout_sec_ * 1000} + std::move(data), std::move(event_header.user_metadata), std::move(full_path), callback, manage_data_memory, timeout_ms_} }); } @@ -221,7 +221,7 @@ Error ProducerImpl::SendMetaData(const std::string& metadata, RequestCallback ca FileData data{new uint8_t[metadata.size()]}; strncpy((char*)data.get(), metadata.c_str(), metadata.size()); return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), - std::move(data), "", "", callback, true, timeout_sec_} + std::move(data), "", "", callback, true, timeout_ms_} }); } @@ -302,9 +302,9 @@ void ActivatePromise(std::shared_ptr<std::promise<StreamInfoResult>> promise, Re } catch(...) {} } -StreamInfo GetInfoFromCallback(std::future<StreamInfoResult>* promiseResult, uint64_t timeout_sec, Error* err) { +StreamInfo GetInfoFromCallback(std::future<StreamInfoResult>* promiseResult, uint64_t timeout_ms, Error* err) { try { - auto status = promiseResult->wait_for(std::chrono::milliseconds(timeout_sec * 1000)); + auto status = promiseResult->wait_for(std::chrono::milliseconds(timeout_ms)); if (status == std::future_status::ready) { auto res = promiseResult->get(); if (res.err == nullptr) { @@ -330,7 +330,7 @@ GenericRequestHeader CreateRequestHeaderFromOp(StreamRequestOp op,std::string st } } -StreamInfo ProducerImpl::StreamRequest(StreamRequestOp op,std::string stream, uint64_t timeout_sec, Error* err) const { +StreamInfo ProducerImpl::StreamRequest(StreamRequestOp op,std::string stream, uint64_t timeout_ms, Error* err) const { auto header = CreateRequestHeaderFromOp(op,stream); std::unique_ptr<std::promise<StreamInfoResult>> promise {new std::promise<StreamInfoResult>}; std::future<StreamInfoResult> promiseResult = promise->get_future(); @@ -338,25 +338,25 @@ StreamInfo ProducerImpl::StreamRequest(StreamRequestOp op,std::string stream, ui *err = request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(header), nullptr, "", "", unwrap_callback(ActivatePromise, std::move(promise)), true, - timeout_sec * 1000} + timeout_ms} }, true); if (*err) { return StreamInfo{}; } - return GetInfoFromCallback(&promiseResult, timeout_sec + 2, + return GetInfoFromCallback(&promiseResult, timeout_ms + 2000, err); // we give two more sec for request to exit by timeout } -StreamInfo ProducerImpl::GetStreamInfo(std::string stream, uint64_t timeout_sec, Error* err) const { - return StreamRequest(StreamRequestOp::kStreamInfo,stream,timeout_sec,err); +StreamInfo ProducerImpl::GetStreamInfo(std::string stream, uint64_t timeout_ms, Error* err) const { + return StreamRequest(StreamRequestOp::kStreamInfo,stream,timeout_ms,err); } -StreamInfo ProducerImpl::GetStreamInfo(uint64_t timeout_sec, Error* err) const { - return GetStreamInfo(kDefaultStream, timeout_sec, err); +StreamInfo ProducerImpl::GetStreamInfo(uint64_t timeout_ms, Error* err) const { + return GetStreamInfo(kDefaultStream, timeout_ms, err); } -StreamInfo ProducerImpl::GetLastStream(uint64_t timeout_sec, Error* err) const { - return StreamRequest(StreamRequestOp::kLastStream,"",timeout_sec,err); +StreamInfo ProducerImpl::GetLastStream(uint64_t timeout_ms, Error* err) const { + return StreamRequest(StreamRequestOp::kLastStream,"",timeout_ms,err); } uint64_t ProducerImpl::GetRequestsQueueVolumeMb() { diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index 71a524dbf..a2380040f 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -27,14 +27,14 @@ class ProducerImpl : public Producer { static const std::string kFinishStreamKeyword; static const std::string kNoNextStreamKeyword; - explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads, uint64_t timeout_sec, + explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads, uint64_t timeout_ms, asapo::RequestHandlerType type); ProducerImpl(const ProducerImpl &) = delete; ProducerImpl &operator=(const ProducerImpl &) = delete; - StreamInfo GetStreamInfo(std::string stream, uint64_t timeout_sec, Error* err) const override; - StreamInfo GetStreamInfo(uint64_t timeout_sec, Error* err) const override; - StreamInfo GetLastStream(uint64_t timeout_sec, Error* err) const override; + StreamInfo GetStreamInfo(std::string stream, uint64_t timeout_ms, Error* err) const override; + StreamInfo GetStreamInfo(uint64_t timeout_ms, Error* err) const override; + StreamInfo GetLastStream(uint64_t timeout_ms, Error* err) const override; void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; @@ -69,14 +69,14 @@ class ProducerImpl : public Producer { uint64_t GetRequestsQueueVolumeMb() override; void SetRequestsQueueLimits(uint64_t size, uint64_t volume) override; private: - StreamInfo StreamRequest(StreamRequestOp op, std::string stream, uint64_t timeout_sec, Error* err) const; + StreamInfo StreamRequest(StreamRequestOp op, std::string stream, uint64_t timeout_ms, Error* err) const; Error Send(const EventHeader &event_header, std::string stream, FileData data, std::string full_path, uint64_t ingest_mode, RequestCallback callback, bool manage_data_memory); GenericRequestHeader GenerateNextSendRequest(const EventHeader &event_header, std::string stream, uint64_t ingest_mode); std::string source_cred_string_; - uint64_t timeout_sec_; + uint64_t timeout_ms_; }; struct StreamInfoResult { diff --git a/producer/api/cpp/unittests/test_producer.cpp b/producer/api/cpp/unittests/test_producer.cpp index da219ba91..4fc2319b1 100644 --- a/producer/api/cpp/unittests/test_producer.cpp +++ b/producer/api/cpp/unittests/test_producer.cpp @@ -15,7 +15,7 @@ namespace { TEST(CreateProducer, TcpProducer) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, - SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600, &err); + SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600000, &err); ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr)); ASSERT_THAT(err, Eq(nullptr)); } @@ -24,7 +24,7 @@ TEST(CreateProducer, ErrorBeamtime) { asapo::Error err; std::string expected_beamtimeid(asapo::kMaxMessageSize * 10, 'a'); std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, - SourceCredentials{asapo::SourceType::kRaw,expected_beamtimeid, "", "", ""}, 3600, &err); + SourceCredentials{asapo::SourceType::kRaw,expected_beamtimeid, "", "", ""}, 3600000, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -33,7 +33,7 @@ TEST(CreateProducer, ErrorOnBothAutoBeamlineBeamtime) { asapo::SourceCredentials creds{asapo::SourceType::kRaw,"auto", "auto", "subname", "token"}; asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, - creds, 3600, &err); + creds, 3600000, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -41,7 +41,7 @@ TEST(CreateProducer, ErrorOnBothAutoBeamlineBeamtime) { TEST(CreateProducer, TooManyThreads) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1, - asapo::RequestHandlerType::kTcp, SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600, &err); + asapo::RequestHandlerType::kTcp, SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600000, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -50,7 +50,7 @@ TEST(CreateProducer, TooManyThreads) { TEST(CreateProducer, ZeroThreads) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", 0, - asapo::RequestHandlerType::kTcp, SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600, &err); + asapo::RequestHandlerType::kTcp, SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600000, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -59,7 +59,7 @@ TEST(CreateProducer, ZeroThreads) { TEST(Producer, SimpleWorkflowWihoutConnection) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, asapo::RequestHandlerType::kTcp, - SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600, + SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600000, &err); asapo::EventHeader event_header{1, 1, "test"}; diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 5cd3bf882..81c86cc3c 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -52,7 +52,7 @@ MATCHER_P10(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_ } TEST(ProducerImpl, Constructor) { - asapo::ProducerImpl producer{"", 4, 3600, asapo::RequestHandlerType::kTcp}; + asapo::ProducerImpl producer{"", 4, 3600000, asapo::RequestHandlerType::kTcp}; ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::RequestPool*>(producer.request_pool__.get()), Ne(nullptr)); } @@ -63,7 +63,7 @@ class ProducerImplTests : public testing::Test { asapo::ProducerRequestHandlerFactory factory{&service}; testing::NiceMock<asapo::MockLogger> mock_logger; testing::NiceMock<MockRequestPull> mock_pull{&factory, &mock_logger}; - asapo::ProducerImpl producer{"", 1, 3600, asapo::RequestHandlerType::kTcp}; + asapo::ProducerImpl producer{"", 1, 3600000, asapo::RequestHandlerType::kTcp}; uint64_t expected_size = 100; uint64_t expected_id = 10; uint64_t expected_subset_id = 100; @@ -476,14 +476,14 @@ TEST_F(ProducerImplTests, GetStreamInfoMakesCorerctRequest) { Return(nullptr)); asapo::Error err; - producer.GetStreamInfo(expected_stream, 1, &err); + producer.GetStreamInfo(expected_stream, 1000, &err); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } TEST(GetStreamInfoTest, GetStreamInfoTimeout) { - asapo::ProducerImpl producer1{"", 1, 10, asapo::RequestHandlerType::kTcp}; + asapo::ProducerImpl producer1{"", 1, 10000, asapo::RequestHandlerType::kTcp}; asapo::Error err; - auto sinfo = producer1.GetStreamInfo(5, &err); + auto sinfo = producer1.GetStreamInfo(5000, &err); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); ASSERT_THAT(err->Explain(), HasSubstr("opcode: 4")); @@ -497,7 +497,7 @@ TEST_F(ProducerImplTests, GetLastStreamMakesCorerctRequest) { Return(nullptr)); asapo::Error err; - producer.GetLastStream(1, &err); + producer.GetLastStream(1000, &err); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index a87583117..892766c40 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -103,7 +103,7 @@ cdef extern from "asapo_wrappers.h" namespace "asapo": cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: cppclass Producer: @staticmethod - unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,uint64_t timeout_sec, Error* error) + unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,uint64_t timeout_ms, Error* error) Error SendFile(const EventHeader& event_header, string stream, string full_path, uint64_t ingest_mode,RequestCallback callback) Error SendData__(const EventHeader& event_header, string stream, void* data, uint64_t ingest_mode,RequestCallback callback) void StopThreads__() @@ -111,8 +111,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: uint64_t GetRequestsQueueSize() Error WaitRequestsFinished(uint64_t timeout_ms) Error SendStreamFinishedFlag(string stream, uint64_t last_id, string next_stream, RequestCallback callback) - StreamInfo GetStreamInfo(string stream, uint64_t timeout_sec, Error* err) - StreamInfo GetLastStream(uint64_t timeout_sec, Error* err) + StreamInfo GetStreamInfo(string stream, uint64_t timeout_ms, Error* err) + StreamInfo GetLastStream(uint64_t timeout_ms, Error* err) cdef extern from "asapo/asapo_producer.h" namespace "asapo": diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index e77bafcb7..a0814c3aa 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -198,11 +198,11 @@ cdef class PyProducer: if err: throw_exception(err) - def stream_info(self, stream = 'default', uint64_t timeout_sec = 1): + def stream_info(self, stream = 'default', uint64_t timeout_ms = 1000): """ :param stream: stream name :type stream: string - :param timeout_sec: timeout in seconds + :param timeout_ms: timeout in milliseconds :type timeout_ms: int :raises: AsapoWrongInputError: wrong input (authorization, ...) @@ -213,12 +213,12 @@ cdef class PyProducer: cdef StreamInfo info cdef string b_stream = _bytes(stream) with nogil: - info = self.c_producer.get().GetStreamInfo(b_stream,timeout_sec,&err) + info = self.c_producer.get().GetStreamInfo(b_stream,timeout_ms,&err) if err: throw_exception(err) return json.loads(_str(info.Json(True))) - def last_stream(self, uint64_t timeout_sec = 1): + def last_stream(self, uint64_t timeout_ms = 1000): """ :param timeout_ms: timeout in seconds :type timeout_ms: int @@ -230,7 +230,7 @@ cdef class PyProducer: cdef Error err cdef StreamInfo info with nogil: - info = self.c_producer.get().GetLastStream(timeout_sec,&err) + info = self.c_producer.get().GetLastStream(timeout_ms,&err) if err: throw_exception(err) return json.loads(_str(info.Json(True))) @@ -320,7 +320,7 @@ cdef class PyProducer: if self.c_producer.get() is not NULL: self.c_producer.get().StopThreads__() @staticmethod - def __create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_sec): + def __create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_ms): pyProd = PyProducer() cdef Error err cdef SourceType source_type @@ -333,12 +333,12 @@ cdef class PyProducer: source.user_token = token source.data_source = data_source source.type = source_type - pyProd.c_producer = Producer.Create(endpoint,nthreads,RequestHandlerType_Tcp,source,timeout_sec,&err) + pyProd.c_producer = Producer.Create(endpoint,nthreads,RequestHandlerType_Tcp,source,timeout_ms,&err) if err: throw_exception(err) return pyProd -def create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_sec): +def create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_ms): """ :param endpoint: server endpoint (url:port) :type endpoint: string @@ -354,13 +354,13 @@ def create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthread :type token: string :param nthreads: ingest mode flag :type nthreads: int - :param timeout_sec: send requests timeout - :type timeout_sec: int + :param timeout_ms: send requests timeout in milliseconds + :type timeout_ms: int :raises: AsapoWrongInputError: wrong input (number of threads, ,,,) AsapoProducerError: actually should not happen """ - return PyProducer.__create_producer(_bytes(endpoint),_bytes(type),_bytes(beamtime_id),_bytes(beamline),_bytes(data_source),_bytes(token),nthreads,timeout_sec) + return PyProducer.__create_producer(_bytes(endpoint),_bytes(type),_bytes(beamtime_id),_bytes(beamline),_bytes(data_source),_bytes(token),nthreads,timeout_ms) __version__ = "@PYTHON_ASAPO_VERSION@@ASAPO_VERSION_COMMIT@" diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp index 9f2b9eac2..4436f6dca 100644 --- a/producer/event_monitor_producer/src/main_eventmon.cpp +++ b/producer/event_monitor_producer/src/main_eventmon.cpp @@ -39,7 +39,7 @@ std::unique_ptr<Producer> CreateProducer() { Error err; auto producer = Producer::Create(config->asapo_endpoint, (uint8_t) config->nthreads, - config->mode, asapo::SourceCredentials{asapo::SourceType::kProcessed,config->beamtime_id, "", config->data_source, ""}, 3600, &err); + config->mode, asapo::SourceCredentials{asapo::SourceType::kProcessed,config->beamtime_id, "", config->data_source, ""}, 3600000, &err); if(err) { std::cerr << "cannot create producer: " << err << std::endl; exit(EXIT_FAILURE); diff --git a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py index 060f2edf4..f99966537 100644 --- a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py +++ b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py @@ -25,7 +25,7 @@ class AsapoSender: def _callback(self, header, err): print ("hello self callback") -producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600000) producer.set_log_level("debug") sender = AsapoSender(producer) diff --git a/tests/automatic/full_chain/send_recv_streams/send_recv_streams.cpp b/tests/automatic/full_chain/send_recv_streams/send_recv_streams.cpp index ed96f1484..4ebb3a884 100644 --- a/tests/automatic/full_chain/send_recv_streams/send_recv_streams.cpp +++ b/tests/automatic/full_chain/send_recv_streams/send_recv_streams.cpp @@ -61,7 +61,7 @@ ProducerPtr CreateProducer(const Args& args) { auto producer = asapo::Producer::Create(args.server, 1, asapo::RequestHandlerType::kTcp, asapo::SourceCredentials{asapo::SourceType::kProcessed, - args.beamtime_id, "", "", args.token }, 60, &err); + args.beamtime_id, "", "", args.token }, 60000, &err); if(err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); diff --git a/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py b/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py index eea7495bc..3949f00b2 100644 --- a/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py +++ b/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py @@ -26,7 +26,7 @@ def callback(header,err): source, beamtime, token = sys.argv[1:] consumer = asapo_consumer.create_consumer(source,".",True, beamtime,"",token,timeout) -producer = asapo_producer.create_producer(source,'processed',beamtime,'auto', "", token, 1, 600) +producer = asapo_producer.create_producer(source,'processed',beamtime,'auto', "", token, 1, 600000) producer.set_log_level("debug") group_id = consumer.generate_group_id() diff --git a/tests/automatic/producer/aai/producer_aai.py b/tests/automatic/producer/aai/producer_aai.py index 29e405738..4c4cdf41f 100644 --- a/tests/automatic/producer/aai/producer_aai.py +++ b/tests/automatic/producer/aai/producer_aai.py @@ -26,7 +26,7 @@ def callback(header,err): lock.release() -producer = asapo_producer.create_producer(endpoint,'processed','auto',beamline, data_source, token, nthreads, 60) +producer = asapo_producer.create_producer(endpoint,'processed','auto',beamline, data_source, token, nthreads, 60000) producer.set_log_level("debug") diff --git a/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp b/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp index 4d9fff49c..15ac3fbeb 100644 --- a/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp +++ b/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp @@ -70,7 +70,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { args.mode == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem, asapo::SourceCredentials{asapo::SourceType::kProcessed, - args.beamtime_id, "", "", ""}, 60, &err); + args.beamtime_id, "", "", ""}, 60000, &err); if (err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 0c484944d..3f6e54555 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -37,7 +37,7 @@ def callback(payload, err): lock.release() -producer = asapo_producer.create_producer(endpoint,'processed', beamtime, 'auto', data_source, token, nthreads, 60) +producer = asapo_producer.create_producer(endpoint,'processed', beamtime, 'auto', data_source, token, nthreads, 60000) producer.set_log_level("debug") diff --git a/tests/manual/producer_cpp/producer.cpp b/tests/manual/producer_cpp/producer.cpp index cea384460..25c7b06e4 100644 --- a/tests/manual/producer_cpp/producer.cpp +++ b/tests/manual/producer_cpp/producer.cpp @@ -51,7 +51,7 @@ int main(int argc, char* argv[]) { auto beamtime = "asapo_test"; auto producer = asapo::Producer::Create(endpoint, 1,asapo::RequestHandlerType::kTcp, - asapo::SourceCredentials{asapo::SourceType::kProcessed,beamtime, "", "", ""}, 60, &err); + asapo::SourceCredentials{asapo::SourceType::kProcessed,beamtime, "", "", ""}, 60000, &err); exit_if_error("Cannot start producer", err); uint32_t eventid = 1; diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/test.py b/tests/manual/python_tests/producer_wait_bug_mongo/test.py index 32bbff83b..145c27e04 100644 --- a/tests/manual/python_tests/producer_wait_bug_mongo/test.py +++ b/tests/manual/python_tests/producer_wait_bug_mongo/test.py @@ -27,7 +27,7 @@ def assert_err(err): print(err) sys.exit(1) -producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600000) producer.set_log_level("debug") diff --git a/tests/manual/python_tests/producer_wait_threads/producer_api.py b/tests/manual/python_tests/producer_wait_threads/producer_api.py index c6011bbc1..0175bb386 100644 --- a/tests/manual/python_tests/producer_wait_threads/producer_api.py +++ b/tests/manual/python_tests/producer_wait_threads/producer_api.py @@ -22,7 +22,7 @@ def callback(header,err): print ("successfuly sent: ",header) lock.release() -producer = asapo_producer.create_producer(endpoint,'processed',beamtime, 'auto', data_source, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,'processed',beamtime, 'auto', data_source, token, nthreads, 600000) producer.set_log_level("info") @@ -63,7 +63,7 @@ if n!=0: # create with error try: - producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, 0, 600) + producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, 0, 600000) except Exception as Asapo: print(e) else: diff --git a/tests/manual/python_tests/producer_wait_threads/test.py b/tests/manual/python_tests/producer_wait_threads/test.py index b832ef275..bd6cdcc23 100644 --- a/tests/manual/python_tests/producer_wait_threads/test.py +++ b/tests/manual/python_tests/producer_wait_threads/test.py @@ -22,7 +22,7 @@ def callback(header,err): print ("successfuly sent: ",header) lock.release() -producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600000) producer.set_log_level("info") -- GitLab