diff --git a/CHANGELOG.md b/CHANGELOG.md index b7f06a6d28ce77b426bc3cc1e060750660e8c355..6b7122210c8609028f9b47802223379855e81761 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,11 +14,12 @@ FEATURES BREAKING CHANGES * Consumer API - get_next_dataset, get_last_dataset, get_dataset_by_id return dictionary with 'id','expected_size','content' fields, not tuple (id,content) as before * Consumer API - remove group_id argument from get_last/get_by_id/get_last_dataset/get_dataset_by_id functions -* Producer API - changed meaning of subsets (subset_id replaced with id_in_subset and this means now id of the image within a subset (e.g. module number for multi-module detector)), file_id is now a global id of a multi-set data (i.g. multi-image id) - #### renaming - Producer API +* Producer API - changed meaning of subsets (subset_id replaced with id_in_subset and this means now id of the image within a subset (e.g. module number for multi-module detector)), file_id is now a global id of a multi-set data (i.g. multi-image id) + #### renaming - general * stream -> data_source, substream -> stream +* use millisecond everywhere for timeout/delay + #### renaming - Producer API #### renaming - Consumer API -* stream -> data_source, substream -> stream * broker -> consumer BUG FIXES diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index d0005e7dffb3066659c4cfedd4d6967ad083429b..47813968057f50bd2af934fc74477f7917d59f18 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -35,14 +35,14 @@ type InProcessingRecord struct { ID int `bson:"_id" json:"_id"` MaxResendAttempts int `bson:"maxResendAttempts" json:"maxResendAttempts"` ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"` - DelaySec int64 `bson:"delaySec" json:"delaySec"` + DelayMs int64 `bson:"delayMs" json:"delayMs"` } type NegAckParamsRecord struct { ID int `bson:"_id" json:"_id"` MaxResendAttempts int `bson:"maxResendAttempts" json:"maxResendAttempts"` ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"` - DelaySec int64 `bson:"delaySec" json:"delaySec"` + DelayMs int64 `bson:"delayMs" json:"delayMs"` } @@ -321,7 +321,7 @@ func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { input := struct { Id int Params struct { - DelaySec int + DelayMs int } }{} @@ -330,7 +330,7 @@ func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - err = db.InsertRecordToInprocess(request.DbName,inprocess_collection_name_prefix+request.GroupId,input.Id,input.Params.DelaySec, 1) + err = db.InsertRecordToInprocess(request.DbName,inprocess_collection_name_prefix+request.GroupId,input.Id,input.Params.DelayMs, 1) return []byte(""), err } @@ -386,18 +386,18 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err return curPointer, max_ind, nil } -func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delaySec int,nResendAttempts int) (int, error) { +func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delayMs int,nResendAttempts int) (int, error) { var res InProcessingRecord opts := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After) - tNow := time.Now().Unix() + tNow := time.Now().UnixNano() var update bson.M if nResendAttempts==0 { - update = bson.M{"$set": bson.M{"delaySec": tNow + int64(delaySec) ,"maxResendAttempts":math.MaxInt32}, "$inc": bson.M{"resendAttempts": 1}} + update = bson.M{"$set": bson.M{"delayMs": tNow + int64(delayMs*1e6) ,"maxResendAttempts":math.MaxInt32}, "$inc": bson.M{"resendAttempts": 1}} } else { - update = bson.M{"$set": bson.M{"delaySec": tNow + int64(delaySec) ,"maxResendAttempts":nResendAttempts}, "$inc": bson.M{"resendAttempts": 1}} + update = bson.M{"$set": bson.M{"delayMs": tNow + int64(delayMs*1e6) ,"maxResendAttempts":nResendAttempts}, "$inc": bson.M{"resendAttempts": 1}} } - q := bson.M{"delaySec": bson.M{"$lte": tNow},"$expr": bson.M{"$lt": []string{"$resendAttempts","$maxResendAttempts"}}} + q := bson.M{"delayMs": bson.M{"$lte": tNow},"$expr": bson.M{"$lt": []string{"$resendAttempts","$maxResendAttempts"}}} c := db.client.Database(dbname).Collection(collection_name) err := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(&res) if err != nil { @@ -412,9 +412,9 @@ func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delay return res.ID, nil } -func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name string,id int,delaySec int, nResendAttempts int) error { +func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name string,id int,delayMs int, nResendAttempts int) error { record := InProcessingRecord{ - id, nResendAttempts, 0,time.Now().Unix()+int64(delaySec), + id, nResendAttempts, 0,time.Now().UnixNano()+int64(delayMs*1e6), } c := db.client.Database(db_name).Collection(collection_name) @@ -429,20 +429,20 @@ func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name str if len(extra_param) == 0 { return nil } - delaySec, nResendAttempts, err := extractsTwoIntsFromString(extra_param) + delayMs, nResendAttempts, err := extractsTwoIntsFromString(extra_param) if err != nil { return err } - return db.InsertRecordToInprocess(db_name,collection_name,id,delaySec, nResendAttempts) + return db.InsertRecordToInprocess(db_name,collection_name,id,delayMs, nResendAttempts) } func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTimeout bool) (int, int, error) { - var record_ind, max_ind, delaySec, nResendAttempts int + var record_ind, max_ind, delayMs, nResendAttempts int var err error if len(request.ExtraParam) != 0 { - delaySec, nResendAttempts, err = extractsTwoIntsFromString(request.ExtraParam) + delayMs, nResendAttempts, err = extractsTwoIntsFromString(request.ExtraParam) if err != nil { return 0, 0, err } @@ -451,7 +451,7 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi } tNow := time.Now().Unix() if (atomic.LoadInt64(&db.lastReadFromInprocess) <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { - record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.GroupId, delaySec,nResendAttempts) + record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.GroupId, delayMs,nResendAttempts) if err != nil { log_str := "error getting unprocessed id " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error() logger.Debug(log_str) diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index d9fdf51c4550b220b32f5412ecf2d0e7a7b45a61..3acab2a690a422d73902c93a28c7df548332128b 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -902,10 +902,10 @@ func TestMongoDBGetNextUsesInprocessedAfterTimeout(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) - res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) time.Sleep(time.Second) - res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) + res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -920,10 +920,10 @@ func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) time.Sleep(time.Second) - res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) - res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) + res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -987,11 +987,11 @@ func TestMongoDBNegAck(t *testing.T) { inputParams := struct { Id int Params struct { - DelaySec int + DelayMs int } }{} inputParams.Id = 1 - inputParams.Params.DelaySec = 0 + inputParams.Params.DelayMs = 0 db.insertRecord(dbname, collection, &rec1) db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index a464c8184181398a65f1ed28ca5e37ddcf1ef710..e4db0514b53c85393ce47e55c7f7de02e99da6ae 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -46,7 +46,7 @@ var testsGetCommand = []struct { {"nacks", expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/nacks","","0_0"}, {"next", expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next","",""}, {"next", expectedStream, expectedGroupID, expectedStream + "/" + - expectedGroupID + "/next","&resend_nacks=true&delay_sec=10&resend_attempts=3","10_3"}, + expectedGroupID + "/next","&resend_nacks=true&delay_ms=10000&resend_attempts=3","10000_3"}, {"size", expectedStream, "", expectedStream + "/size","","0"}, {"streams", "0", "", "0/streams","",""}, {"lastack", expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/lastack","",""}, diff --git a/broker/src/asapo_broker/server/get_next.go b/broker/src/asapo_broker/server/get_next.go index 9e588fb9a73da679beffef9598aa9aa8d5df61d1..3f051f57e72398d1eeb8222fecbed005de4dc3f0 100644 --- a/broker/src/asapo_broker/server/get_next.go +++ b/broker/src/asapo_broker/server/get_next.go @@ -7,11 +7,11 @@ import ( func extractResend(r *http.Request) (string) { keys := r.URL.Query() resend := keys.Get("resend_nacks") - delay_sec := keys.Get("delay_sec") + delay_ms := keys.Get("delay_ms") resend_attempts := keys.Get("resend_attempts") resend_params := "" if len(resend)!=0 { - resend_params=delay_sec+"_"+resend_attempts + resend_params=delay_ms+"_"+resend_attempts } return resend_params } diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 92531d380b119e0b199e7ac3aaa1ff3487a7e311..12a07f4c5e7ee098b1f9c6802e52b91244a3c100 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -38,11 +38,11 @@ class Consumer { /*! \param group_id - group id to use. \param id - data tuple id - \param delay_sec - data tuple will be redelivered after delay, 0 to redeliver immediately + \param delay_ms - data tuple will be redelivered after delay, 0 to redeliver immediately \param stream (optional) - stream \return nullptr of command was successful, otherwise error. */ - virtual Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_sec, + virtual Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_ms, std::string stream = kDefaultStream) = 0; @@ -191,10 +191,10 @@ class Consumer { //! Configure resending nonacknowledged data /*! \param resend - where to resend - \param delay_sec - how many seconds to wait before resending + \param delay_ms - how many milliseconds to wait before resending \param resend_attempts - how many resend attempts to make */ - virtual void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) = 0; + virtual void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) = 0; //! Will try to interrupt current long runnung operations (mainly needed to exit waiting loop in C from Python) virtual void InterruptCurrentOperation() = 0; diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index f6523a137cdfa4c2c1af61244495c0adf3cf94c2..575460028eb76cb27a3056abc0af5e4f8b9b5b49 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -260,8 +260,8 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group if (err == nullptr) { auto ri = PrepareRequestInfo(request_api + "/" + group_id + "/" + request_suffix, dataset, min_size); if (request_suffix == "next" && resend_) { - ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&delay_sec=" + - std::to_string(delay_sec_) + "&resend_attempts=" + std::to_string(resend_attempts_); + ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&delay_ms=" + + std::to_string(delay_ms_) + "&resend_attempts=" + std::to_string(resend_attempts_); } RequestOutput output; err = ProcessRequest(&output, ri, ¤t_broker_uri_); @@ -832,22 +832,22 @@ uint64_t ConsumerImpl::GetLastAcknowledgedTulpeId(std::string group_id, Error* e return GetLastAcknowledgedTulpeId(std::move(group_id), kDefaultStream, error); } -void ConsumerImpl::SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) { +void ConsumerImpl::SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) { resend_ = resend; - delay_sec_ = delay_sec; + delay_ms_ = delay_ms; resend_attempts_ = resend_attempts; } Error ConsumerImpl::NegativeAcknowledge(std::string group_id, uint64_t id, - uint64_t delay_sec, + uint64_t delay_ms, std::string stream) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(stream) + "/" + std::move(group_id) + "/" + std::to_string(id); ri.post = true; - ri.body = R"({"Op":"negackimage","Params":{"DelaySec":)" + std::to_string(delay_sec) + "}}"; + ri.body = R"({"Op":"negackimage","Params":{"DelayMs":)" + std::to_string(delay_ms) + "}}"; Error err; BrokerRequestWithTimeout(ri, &err); diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 3d37a79624c2c92dae6085bde9ccb6587f7e0b46..59266f2ce36cbb2754055ecf3e2b9c227106f572 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -57,7 +57,7 @@ class ConsumerImpl final : public asapo::Consumer { SourceCredentials source); Error Acknowledge(std::string group_id, uint64_t id, std::string stream = kDefaultStream) override; - Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_sec, + Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_ms, std::string stream = kDefaultStream) override; IdList GetUnacknowledgedTupleIds(std::string group_id, @@ -112,7 +112,7 @@ class ConsumerImpl final : public asapo::Consumer { Error RetrieveData(FileInfo* info, FileData* data) override; StreamInfos GetStreamList(std::string from, Error* err) override; - void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) override; + void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) override; virtual void InterruptCurrentOperation() override; @@ -168,7 +168,7 @@ class ConsumerImpl final : public asapo::Consumer { RequestInfo CreateFileTransferRequest(const FileInfo* info) const; uint64_t resend_timout_ = 0; bool resend_ = false; - uint64_t delay_sec_; + uint64_t delay_ms_; uint64_t resend_attempts_; std::atomic<bool> interrupt_flag_{ false}; }; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 6ad5265a35f289ee6b4da8103dfbe5883fce7ca8..9fb1918c89673d5d18c7750a700fc553f0d19de6 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -1341,19 +1341,19 @@ TEST_F(ConsumerImplTests, ResendNacks) { EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + expected_group_id + "/next?token=" - + expected_token + "&resend_nacks=true&delay_sec=10&resend_attempts=3", _, + + expected_token + "&resend_nacks=true&delay_ms=10000&resend_attempts=3", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); - consumer->SetResendNacs(true, 10, 3); + consumer->SetResendNacs(true, 10000, 3); consumer->GetNext(&info, expected_group_id, nullptr); } TEST_F(ConsumerImplTests, NegativeAcknowledgeUsesCorrectUri) { MockGetBrokerUri(); - auto expected_neg_acknowledge_command = R"({"Op":"negackimage","Params":{"DelaySec":10}})"; + auto expected_neg_acknowledge_command = R"({"Op":"negackimage","Params":{"DelayMs":10000}})"; EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_stream + "/" + expected_group_id @@ -1364,7 +1364,7 @@ TEST_F(ConsumerImplTests, NegativeAcknowledgeUsesCorrectUri) { SetArgPointee<4>(nullptr), Return(""))); - auto err = consumer->NegativeAcknowledge(expected_group_id, expected_dataset_id, 10, expected_stream); + auto err = consumer->NegativeAcknowledge(expected_group_id, expected_dataset_id, 10000, expected_stream); ASSERT_THAT(err, Eq(nullptr)); } diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index c855a961bcad42948a0a7828de19a8767b6ea039..ea868f8cb387d6caf7f6c841914b31b022bfe753 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -69,7 +69,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: Error SetLastReadMarker(uint64_t value, string group_id, string stream) Error ResetLastReadMarker(string group_id, string stream) Error Acknowledge(string group_id, uint64_t id, string stream) - Error NegativeAcknowledge(string group_id, uint64_t id, uint64_t delay_sec, string stream) + Error NegativeAcknowledge(string group_id, uint64_t id, uint64_t delay_ms, string stream) uint64_t GetLastAcknowledgedTulpeId(string group_id, string stream, Error* error) IdList GetUnacknowledgedTupleIds(string group_id, string stream, uint64_t from_id, uint64_t to_id, Error* error) string GenerateNewGroupId(Error* err) @@ -80,7 +80,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: DataSet GetDatasetById(uint64_t id, string stream, uint64_t min_size, Error* err) Error RetrieveData(FileInfo* info, FileData* data) vector[StreamInfo] GetStreamList(string from_stream, Error* err) - void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) + void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) void InterruptCurrentOperation() cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 214e3b63ebbe5766c78633330090a1670447f587..f56c264bf66fd1e3cb04132941535b5ff4a18264 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -229,17 +229,17 @@ cdef class PyConsumer: err = self.c_consumer.get().Acknowledge(b_group_id,id,b_stream) if err: throw_exception(err) - def neg_acknowledge(self, group_id, uint64_t id, uint64_t delay_sec, stream = "default"): + def neg_acknowledge(self, group_id, uint64_t id, uint64_t delay_ms, stream = "default"): cdef string b_group_id = _bytes(group_id) cdef string b_stream = _bytes(stream) cdef Error err with nogil: - err = self.c_consumer.get().NegativeAcknowledge(b_group_id,id,delay_sec,b_stream) + err = self.c_consumer.get().NegativeAcknowledge(b_group_id,id,delay_ms,b_stream) if err: throw_exception(err) - def set_resend_nacs(self,bool resend, uint64_t delay_sec, uint64_t resend_attempts): + def set_resend_nacs(self,bool resend, uint64_t delay_ms, uint64_t resend_attempts): with nogil: - self.c_consumer.get().SetResendNacs(resend,delay_sec,resend_attempts) + self.c_consumer.get().SetResendNacs(resend,delay_ms,resend_attempts) def get_last_acknowledged_tuple_id(self, group_id, stream = "default"): cdef string b_group_id = _bytes(group_id) diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index 7534a5fdc1b9806324bdade16c0ab479b0e7ddfe..45943a99b129afa6b3c39ef4b104022d5954ea4d 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -28,7 +28,7 @@ transfer_data=int(transfer_data)>0 consumer = asapo_consumer.create_consumer(source,path, True,beamtime,stream_in,token,timeout_s*1000) -producer = asapo_producer.create_producer(source,'processed',beamtime,'auto', stream_out, token, nthreads, 600) +producer = asapo_producer.create_producer(source,'processed',beamtime,'auto', stream_out, token, nthreads, 600000) group_id = consumer.generate_group_id() diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index ed391bfc52cd2025cb6932737c8907f18e5b42b3..3e67b04ce4513720409949d1e6bde569a3213523 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -24,7 +24,7 @@ struct Args { uint64_t iterations; uint64_t nthreads; uint64_t mode; - uint64_t timeout_sec; + uint64_t timeout_ms; uint64_t images_in_set; }; @@ -38,7 +38,7 @@ void PrintCommandArguments(const Args& args) { << "Write files: " << ((args.mode %100) / 10 == 1) << std::endl << "Tcp mode: " << ((args.mode % 10) ==0 ) << std::endl << "Raw: " << (args.mode / 100 == 1)<< std::endl - << "timeout: " << args.timeout_sec << std::endl + << "timeout: " << args.timeout_ms << std::endl << "images in set: " << args.images_in_set << std::endl << std::endl; } @@ -86,7 +86,7 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) { args->iterations = std::stoull(argv[4]); args->nthreads = std::stoull(argv[5]); args->mode = std::stoull(argv[6]); - args->timeout_sec = std::stoull(argv[7]); + args->timeout_ms = std::stoull(argv[7])*1000; if (argc == 9) { args->images_in_set = std::stoull(argv[8]); } else { @@ -188,7 +188,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { asapo::Error err; auto producer = asapo::Producer::Create(args.discovery_service_endpoint, args.nthreads, args.mode % 10 == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem, - asapo::SourceCredentials{args.mode / 100 == 0 ?asapo::SourceType::kProcessed:asapo::SourceType::kRaw,args.beamtime_id, "", args.data_source, args.token }, 3600, &err); + asapo::SourceCredentials{args.mode / 100 == 0 ?asapo::SourceType::kProcessed:asapo::SourceType::kRaw,args.beamtime_id, "", args.data_source, args.token }, 3600000, &err); if(err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); @@ -228,7 +228,7 @@ int main (int argc, char* argv[]) { return EXIT_FAILURE; } - auto err = producer->WaitRequestsFinished(args.timeout_sec * 1000); + auto err = producer->WaitRequestsFinished(args.timeout_ms); if (err) { std::cerr << "Producer exit on timeout " << std::endl; exit(EXIT_FAILURE); diff --git a/examples/producer/simple-producer/produce.cpp b/examples/producer/simple-producer/produce.cpp index 7986c4bfa0b5d53f5dbeedabcd8ac0446b84b2a3..5ddfe5d41bd3ad88db69722541ef8c7e19b213fb 100644 --- a/examples/producer/simple-producer/produce.cpp +++ b/examples/producer/simple-producer/produce.cpp @@ -26,7 +26,7 @@ int main(int argc, char* argv[]) { auto beamtime = "asapo_test"; auto producer = asapo::Producer::Create(source, 1, asapo::RequestHandlerType::kTcp, - asapo::SourceCredentials{beamtime, "", "", ""}, 60, &err); + asapo::SourceCredentials{beamtime, "", "", ""}, 60000, &err); exit_if_error("Cannot start producer", err); std::string to_send = "hello"; diff --git a/producer/api/cpp/include/asapo/producer/producer.h b/producer/api/cpp/include/asapo/producer/producer.h index 272a4f75b86123f635774f0bf8c3b08262746aa2..334b4ae72e55d8293184c75464e290ad48a3d404 100644 --- a/producer/api/cpp/include/asapo/producer/producer.h +++ b/producer/api/cpp/include/asapo/producer/producer.h @@ -18,7 +18,7 @@ class Producer { */ static std::unique_ptr<Producer> Create(const std::string& endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type, SourceCredentials source_cred, - uint64_t timeout_sec, + uint64_t timeout_ms, Error* err); virtual ~Producer() = default; @@ -26,18 +26,18 @@ class Producer { //! Get stream information from receiver /*! \param stream (optional) - stream - \param timeout_sec - operation timeout in seconds + \param timeout_ms - operation timeout in milliseconds \return StreamInfo - a structure with stream information */ - virtual StreamInfo GetStreamInfo(std::string stream, uint64_t timeout_sec, Error* err) const = 0; - virtual StreamInfo GetStreamInfo(uint64_t timeout_sec, Error* err) const = 0; + virtual StreamInfo GetStreamInfo(std::string stream, uint64_t timeout_ms, Error* err) const = 0; + virtual StreamInfo GetStreamInfo(uint64_t timeout_ms, Error* err) const = 0; //! Get stream that has the newest ingested data /*! - \param timeout_ms - operation timeout in seconds + \param timeout_ms - operation timeout in milliseconds \return StreamInfo - a structure with stream information */ - virtual StreamInfo GetLastStream(uint64_t timeout_sec, Error* err) const = 0; + virtual StreamInfo GetLastStream(uint64_t timeout_ms, Error* err) const = 0; //! Sends data to the receiver diff --git a/producer/api/cpp/src/producer.cpp b/producer/api/cpp/src/producer.cpp index 63a2d488baff067f4cce2862ad0475ba11bf4fee..cb94f8d0c08d1cdd8abb4deefcde65ca1f780f7b 100644 --- a/producer/api/cpp/src/producer.cpp +++ b/producer/api/cpp/src/producer.cpp @@ -3,7 +3,7 @@ #include "asapo/producer/producer_error.h" std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endpoint, uint8_t n_processing_threads, - asapo::RequestHandlerType type, SourceCredentials source_cred, uint64_t timeout_sec, Error* err) { + asapo::RequestHandlerType type, SourceCredentials source_cred, uint64_t timeout_ms, Error* err) { if (n_processing_threads > kMaxProcessingThreads || n_processing_threads == 0) { *err = ProducerErrorTemplates::kWrongInput.Generate("Set number of processing threads > 0 and <= " + std::to_string( @@ -13,7 +13,7 @@ std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endp std::unique_ptr<asapo::Producer> producer; try { - producer.reset(new ProducerImpl(endpoint, n_processing_threads, timeout_sec, type)); + producer.reset(new ProducerImpl(endpoint, n_processing_threads, timeout_ms, type)); } catch (const std::exception& ex) { *err = TextError(ex.what()); return nullptr; diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index bd826db67832a6e9c9c6213ee8d23699dabdcd80..24bdea3c47e31c90c5154ccb178d4da9a5bea75e 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -19,9 +19,9 @@ const std::string ProducerImpl::kFinishStreamKeyword = "asapo_finish_stream"; const std::string ProducerImpl::kNoNextStreamKeyword = "asapo_no_next"; -ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, uint64_t timeout_sec, +ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, uint64_t timeout_ms, asapo::RequestHandlerType type): - log__{GetDefaultProducerLogger()}, timeout_sec_{timeout_sec} { + log__{GetDefaultProducerLogger()}, timeout_ms_{timeout_ms} { switch (type) { case RequestHandlerType::kTcp: discovery_service_.reset(new ReceiverDiscoveryService{endpoint, ProducerImpl::kDiscoveryServiceUpdateFrequencyMs}); @@ -110,7 +110,7 @@ Error ProducerImpl::Send(const EventHeader& event_header, auto request_header = GenerateNextSendRequest(event_header, std::move(stream), ingest_mode); return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), - std::move(data), std::move(event_header.user_metadata), std::move(full_path), callback, manage_data_memory, timeout_sec_ * 1000} + std::move(data), std::move(event_header.user_metadata), std::move(full_path), callback, manage_data_memory, timeout_ms_} }); } @@ -221,7 +221,7 @@ Error ProducerImpl::SendMetaData(const std::string& metadata, RequestCallback ca FileData data{new uint8_t[metadata.size()]}; strncpy((char*)data.get(), metadata.c_str(), metadata.size()); return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), - std::move(data), "", "", callback, true, timeout_sec_} + std::move(data), "", "", callback, true, timeout_ms_} }); } @@ -302,9 +302,9 @@ void ActivatePromise(std::shared_ptr<std::promise<StreamInfoResult>> promise, Re } catch(...) {} } -StreamInfo GetInfoFromCallback(std::future<StreamInfoResult>* promiseResult, uint64_t timeout_sec, Error* err) { +StreamInfo GetInfoFromCallback(std::future<StreamInfoResult>* promiseResult, uint64_t timeout_ms, Error* err) { try { - auto status = promiseResult->wait_for(std::chrono::milliseconds(timeout_sec * 1000)); + auto status = promiseResult->wait_for(std::chrono::milliseconds(timeout_ms)); if (status == std::future_status::ready) { auto res = promiseResult->get(); if (res.err == nullptr) { @@ -330,7 +330,7 @@ GenericRequestHeader CreateRequestHeaderFromOp(StreamRequestOp op,std::string st } } -StreamInfo ProducerImpl::StreamRequest(StreamRequestOp op,std::string stream, uint64_t timeout_sec, Error* err) const { +StreamInfo ProducerImpl::StreamRequest(StreamRequestOp op,std::string stream, uint64_t timeout_ms, Error* err) const { auto header = CreateRequestHeaderFromOp(op,stream); std::unique_ptr<std::promise<StreamInfoResult>> promise {new std::promise<StreamInfoResult>}; std::future<StreamInfoResult> promiseResult = promise->get_future(); @@ -338,25 +338,25 @@ StreamInfo ProducerImpl::StreamRequest(StreamRequestOp op,std::string stream, ui *err = request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(header), nullptr, "", "", unwrap_callback(ActivatePromise, std::move(promise)), true, - timeout_sec * 1000} + timeout_ms} }, true); if (*err) { return StreamInfo{}; } - return GetInfoFromCallback(&promiseResult, timeout_sec + 2, + return GetInfoFromCallback(&promiseResult, timeout_ms + 2000, err); // we give two more sec for request to exit by timeout } -StreamInfo ProducerImpl::GetStreamInfo(std::string stream, uint64_t timeout_sec, Error* err) const { - return StreamRequest(StreamRequestOp::kStreamInfo,stream,timeout_sec,err); +StreamInfo ProducerImpl::GetStreamInfo(std::string stream, uint64_t timeout_ms, Error* err) const { + return StreamRequest(StreamRequestOp::kStreamInfo,stream,timeout_ms,err); } -StreamInfo ProducerImpl::GetStreamInfo(uint64_t timeout_sec, Error* err) const { - return GetStreamInfo(kDefaultStream, timeout_sec, err); +StreamInfo ProducerImpl::GetStreamInfo(uint64_t timeout_ms, Error* err) const { + return GetStreamInfo(kDefaultStream, timeout_ms, err); } -StreamInfo ProducerImpl::GetLastStream(uint64_t timeout_sec, Error* err) const { - return StreamRequest(StreamRequestOp::kLastStream,"",timeout_sec,err); +StreamInfo ProducerImpl::GetLastStream(uint64_t timeout_ms, Error* err) const { + return StreamRequest(StreamRequestOp::kLastStream,"",timeout_ms,err); } uint64_t ProducerImpl::GetRequestsQueueVolumeMb() { diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index 71a524dbf7bb523d698982e5f50b73ec7a0b2bd6..a2380040f1a054f0d497f3981b8e77054aab5144 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -27,14 +27,14 @@ class ProducerImpl : public Producer { static const std::string kFinishStreamKeyword; static const std::string kNoNextStreamKeyword; - explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads, uint64_t timeout_sec, + explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads, uint64_t timeout_ms, asapo::RequestHandlerType type); ProducerImpl(const ProducerImpl &) = delete; ProducerImpl &operator=(const ProducerImpl &) = delete; - StreamInfo GetStreamInfo(std::string stream, uint64_t timeout_sec, Error* err) const override; - StreamInfo GetStreamInfo(uint64_t timeout_sec, Error* err) const override; - StreamInfo GetLastStream(uint64_t timeout_sec, Error* err) const override; + StreamInfo GetStreamInfo(std::string stream, uint64_t timeout_ms, Error* err) const override; + StreamInfo GetStreamInfo(uint64_t timeout_ms, Error* err) const override; + StreamInfo GetLastStream(uint64_t timeout_ms, Error* err) const override; void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; @@ -69,14 +69,14 @@ class ProducerImpl : public Producer { uint64_t GetRequestsQueueVolumeMb() override; void SetRequestsQueueLimits(uint64_t size, uint64_t volume) override; private: - StreamInfo StreamRequest(StreamRequestOp op, std::string stream, uint64_t timeout_sec, Error* err) const; + StreamInfo StreamRequest(StreamRequestOp op, std::string stream, uint64_t timeout_ms, Error* err) const; Error Send(const EventHeader &event_header, std::string stream, FileData data, std::string full_path, uint64_t ingest_mode, RequestCallback callback, bool manage_data_memory); GenericRequestHeader GenerateNextSendRequest(const EventHeader &event_header, std::string stream, uint64_t ingest_mode); std::string source_cred_string_; - uint64_t timeout_sec_; + uint64_t timeout_ms_; }; struct StreamInfoResult { diff --git a/producer/api/cpp/unittests/test_producer.cpp b/producer/api/cpp/unittests/test_producer.cpp index da219ba91417e7b98928bb36abb93567b697c2a3..4fc2319b1cc3de2dc56f639f36898d40aa3a199b 100644 --- a/producer/api/cpp/unittests/test_producer.cpp +++ b/producer/api/cpp/unittests/test_producer.cpp @@ -15,7 +15,7 @@ namespace { TEST(CreateProducer, TcpProducer) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, - SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600, &err); + SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600000, &err); ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr)); ASSERT_THAT(err, Eq(nullptr)); } @@ -24,7 +24,7 @@ TEST(CreateProducer, ErrorBeamtime) { asapo::Error err; std::string expected_beamtimeid(asapo::kMaxMessageSize * 10, 'a'); std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, - SourceCredentials{asapo::SourceType::kRaw,expected_beamtimeid, "", "", ""}, 3600, &err); + SourceCredentials{asapo::SourceType::kRaw,expected_beamtimeid, "", "", ""}, 3600000, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -33,7 +33,7 @@ TEST(CreateProducer, ErrorOnBothAutoBeamlineBeamtime) { asapo::SourceCredentials creds{asapo::SourceType::kRaw,"auto", "auto", "subname", "token"}; asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, - creds, 3600, &err); + creds, 3600000, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -41,7 +41,7 @@ TEST(CreateProducer, ErrorOnBothAutoBeamlineBeamtime) { TEST(CreateProducer, TooManyThreads) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1, - asapo::RequestHandlerType::kTcp, SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600, &err); + asapo::RequestHandlerType::kTcp, SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600000, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -50,7 +50,7 @@ TEST(CreateProducer, TooManyThreads) { TEST(CreateProducer, ZeroThreads) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", 0, - asapo::RequestHandlerType::kTcp, SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600, &err); + asapo::RequestHandlerType::kTcp, SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600000, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -59,7 +59,7 @@ TEST(CreateProducer, ZeroThreads) { TEST(Producer, SimpleWorkflowWihoutConnection) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, asapo::RequestHandlerType::kTcp, - SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600, + SourceCredentials{asapo::SourceType::kRaw,"bt", "", "", ""}, 3600000, &err); asapo::EventHeader event_header{1, 1, "test"}; diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 5cd3bf8820ba02742e86468a5bbb21824b42cc84..81c86cc3ca61a7f636d10578e8c300e9d76fd69c 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -52,7 +52,7 @@ MATCHER_P10(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_ } TEST(ProducerImpl, Constructor) { - asapo::ProducerImpl producer{"", 4, 3600, asapo::RequestHandlerType::kTcp}; + asapo::ProducerImpl producer{"", 4, 3600000, asapo::RequestHandlerType::kTcp}; ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::RequestPool*>(producer.request_pool__.get()), Ne(nullptr)); } @@ -63,7 +63,7 @@ class ProducerImplTests : public testing::Test { asapo::ProducerRequestHandlerFactory factory{&service}; testing::NiceMock<asapo::MockLogger> mock_logger; testing::NiceMock<MockRequestPull> mock_pull{&factory, &mock_logger}; - asapo::ProducerImpl producer{"", 1, 3600, asapo::RequestHandlerType::kTcp}; + asapo::ProducerImpl producer{"", 1, 3600000, asapo::RequestHandlerType::kTcp}; uint64_t expected_size = 100; uint64_t expected_id = 10; uint64_t expected_subset_id = 100; @@ -476,14 +476,14 @@ TEST_F(ProducerImplTests, GetStreamInfoMakesCorerctRequest) { Return(nullptr)); asapo::Error err; - producer.GetStreamInfo(expected_stream, 1, &err); + producer.GetStreamInfo(expected_stream, 1000, &err); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } TEST(GetStreamInfoTest, GetStreamInfoTimeout) { - asapo::ProducerImpl producer1{"", 1, 10, asapo::RequestHandlerType::kTcp}; + asapo::ProducerImpl producer1{"", 1, 10000, asapo::RequestHandlerType::kTcp}; asapo::Error err; - auto sinfo = producer1.GetStreamInfo(5, &err); + auto sinfo = producer1.GetStreamInfo(5000, &err); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); ASSERT_THAT(err->Explain(), HasSubstr("opcode: 4")); @@ -497,7 +497,7 @@ TEST_F(ProducerImplTests, GetLastStreamMakesCorerctRequest) { Return(nullptr)); asapo::Error err; - producer.GetLastStream(1, &err); + producer.GetLastStream(1000, &err); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index a8758311749ce8ea27574648430897b3cddf920f..892766c4079c2db86d59fba92f40f57a536ee8de 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -103,7 +103,7 @@ cdef extern from "asapo_wrappers.h" namespace "asapo": cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: cppclass Producer: @staticmethod - unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,uint64_t timeout_sec, Error* error) + unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,uint64_t timeout_ms, Error* error) Error SendFile(const EventHeader& event_header, string stream, string full_path, uint64_t ingest_mode,RequestCallback callback) Error SendData__(const EventHeader& event_header, string stream, void* data, uint64_t ingest_mode,RequestCallback callback) void StopThreads__() @@ -111,8 +111,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: uint64_t GetRequestsQueueSize() Error WaitRequestsFinished(uint64_t timeout_ms) Error SendStreamFinishedFlag(string stream, uint64_t last_id, string next_stream, RequestCallback callback) - StreamInfo GetStreamInfo(string stream, uint64_t timeout_sec, Error* err) - StreamInfo GetLastStream(uint64_t timeout_sec, Error* err) + StreamInfo GetStreamInfo(string stream, uint64_t timeout_ms, Error* err) + StreamInfo GetLastStream(uint64_t timeout_ms, Error* err) cdef extern from "asapo/asapo_producer.h" namespace "asapo": diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index e77bafcb7e14d8adf4e08f12e2678cd5f2a3802c..a0814c3aaafc171d1e375e4d955e6adf62188819 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -198,11 +198,11 @@ cdef class PyProducer: if err: throw_exception(err) - def stream_info(self, stream = 'default', uint64_t timeout_sec = 1): + def stream_info(self, stream = 'default', uint64_t timeout_ms = 1000): """ :param stream: stream name :type stream: string - :param timeout_sec: timeout in seconds + :param timeout_ms: timeout in milliseconds :type timeout_ms: int :raises: AsapoWrongInputError: wrong input (authorization, ...) @@ -213,12 +213,12 @@ cdef class PyProducer: cdef StreamInfo info cdef string b_stream = _bytes(stream) with nogil: - info = self.c_producer.get().GetStreamInfo(b_stream,timeout_sec,&err) + info = self.c_producer.get().GetStreamInfo(b_stream,timeout_ms,&err) if err: throw_exception(err) return json.loads(_str(info.Json(True))) - def last_stream(self, uint64_t timeout_sec = 1): + def last_stream(self, uint64_t timeout_ms = 1000): """ :param timeout_ms: timeout in seconds :type timeout_ms: int @@ -230,7 +230,7 @@ cdef class PyProducer: cdef Error err cdef StreamInfo info with nogil: - info = self.c_producer.get().GetLastStream(timeout_sec,&err) + info = self.c_producer.get().GetLastStream(timeout_ms,&err) if err: throw_exception(err) return json.loads(_str(info.Json(True))) @@ -320,7 +320,7 @@ cdef class PyProducer: if self.c_producer.get() is not NULL: self.c_producer.get().StopThreads__() @staticmethod - def __create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_sec): + def __create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_ms): pyProd = PyProducer() cdef Error err cdef SourceType source_type @@ -333,12 +333,12 @@ cdef class PyProducer: source.user_token = token source.data_source = data_source source.type = source_type - pyProd.c_producer = Producer.Create(endpoint,nthreads,RequestHandlerType_Tcp,source,timeout_sec,&err) + pyProd.c_producer = Producer.Create(endpoint,nthreads,RequestHandlerType_Tcp,source,timeout_ms,&err) if err: throw_exception(err) return pyProd -def create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_sec): +def create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_ms): """ :param endpoint: server endpoint (url:port) :type endpoint: string @@ -354,13 +354,13 @@ def create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthread :type token: string :param nthreads: ingest mode flag :type nthreads: int - :param timeout_sec: send requests timeout - :type timeout_sec: int + :param timeout_ms: send requests timeout in milliseconds + :type timeout_ms: int :raises: AsapoWrongInputError: wrong input (number of threads, ,,,) AsapoProducerError: actually should not happen """ - return PyProducer.__create_producer(_bytes(endpoint),_bytes(type),_bytes(beamtime_id),_bytes(beamline),_bytes(data_source),_bytes(token),nthreads,timeout_sec) + return PyProducer.__create_producer(_bytes(endpoint),_bytes(type),_bytes(beamtime_id),_bytes(beamline),_bytes(data_source),_bytes(token),nthreads,timeout_ms) __version__ = "@PYTHON_ASAPO_VERSION@@ASAPO_VERSION_COMMIT@" diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp index 9f2b9eac26f8a46663695f7364f69dd01b312600..4436f6dca6c41187773ec53f70004f6b723532f9 100644 --- a/producer/event_monitor_producer/src/main_eventmon.cpp +++ b/producer/event_monitor_producer/src/main_eventmon.cpp @@ -39,7 +39,7 @@ std::unique_ptr<Producer> CreateProducer() { Error err; auto producer = Producer::Create(config->asapo_endpoint, (uint8_t) config->nthreads, - config->mode, asapo::SourceCredentials{asapo::SourceType::kProcessed,config->beamtime_id, "", config->data_source, ""}, 3600, &err); + config->mode, asapo::SourceCredentials{asapo::SourceType::kProcessed,config->beamtime_id, "", config->data_source, ""}, 3600000, &err); if(err) { std::cerr << "cannot create producer: " << err << std::endl; exit(EXIT_FAILURE); diff --git a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py index 060f2edf42416651fd84fa9a6b5b9944b1d86b28..f999665371057705c38946bfd4d009ccf71e5358 100644 --- a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py +++ b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py @@ -25,7 +25,7 @@ class AsapoSender: def _callback(self, header, err): print ("hello self callback") -producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600000) producer.set_log_level("debug") sender = AsapoSender(producer) diff --git a/tests/automatic/full_chain/send_recv_streams/send_recv_streams.cpp b/tests/automatic/full_chain/send_recv_streams/send_recv_streams.cpp index ed96f1484db175d3684b21176977914a1555cef0..4ebb3a8847cdc1962dfd083ea257679df24cd823 100644 --- a/tests/automatic/full_chain/send_recv_streams/send_recv_streams.cpp +++ b/tests/automatic/full_chain/send_recv_streams/send_recv_streams.cpp @@ -61,7 +61,7 @@ ProducerPtr CreateProducer(const Args& args) { auto producer = asapo::Producer::Create(args.server, 1, asapo::RequestHandlerType::kTcp, asapo::SourceCredentials{asapo::SourceType::kProcessed, - args.beamtime_id, "", "", args.token }, 60, &err); + args.beamtime_id, "", "", args.token }, 60000, &err); if(err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); diff --git a/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py b/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py index eea7495bcd63a299f5910e3e8b2dc9257e7de92c..3949f00b2027540839ea983ec7032cb0ec8fd406 100644 --- a/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py +++ b/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py @@ -26,7 +26,7 @@ def callback(header,err): source, beamtime, token = sys.argv[1:] consumer = asapo_consumer.create_consumer(source,".",True, beamtime,"",token,timeout) -producer = asapo_producer.create_producer(source,'processed',beamtime,'auto', "", token, 1, 600) +producer = asapo_producer.create_producer(source,'processed',beamtime,'auto', "", token, 1, 600000) producer.set_log_level("debug") group_id = consumer.generate_group_id() diff --git a/tests/automatic/producer/aai/producer_aai.py b/tests/automatic/producer/aai/producer_aai.py index 29e40573840b8f4d684e77b21a62313f9ad0755f..4c4cdf41f56bade8bd7ad983263b09e8b125641a 100644 --- a/tests/automatic/producer/aai/producer_aai.py +++ b/tests/automatic/producer/aai/producer_aai.py @@ -26,7 +26,7 @@ def callback(header,err): lock.release() -producer = asapo_producer.create_producer(endpoint,'processed','auto',beamline, data_source, token, nthreads, 60) +producer = asapo_producer.create_producer(endpoint,'processed','auto',beamline, data_source, token, nthreads, 60000) producer.set_log_level("debug") diff --git a/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp b/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp index 4d9fff49c703f95563ade6db55f22788791c33cb..15ac3fbeb3d0dc2302abc1b61c33f8609978296e 100644 --- a/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp +++ b/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp @@ -70,7 +70,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { args.mode == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem, asapo::SourceCredentials{asapo::SourceType::kProcessed, - args.beamtime_id, "", "", ""}, 60, &err); + args.beamtime_id, "", "", ""}, 60000, &err); if (err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 0c484944dacca101220b122743bcd5fa6d6c41f9..3f6e545556547912b38aaedb5cf6184eaf7f2555 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -37,7 +37,7 @@ def callback(payload, err): lock.release() -producer = asapo_producer.create_producer(endpoint,'processed', beamtime, 'auto', data_source, token, nthreads, 60) +producer = asapo_producer.create_producer(endpoint,'processed', beamtime, 'auto', data_source, token, nthreads, 60000) producer.set_log_level("debug") diff --git a/tests/manual/producer_cpp/producer.cpp b/tests/manual/producer_cpp/producer.cpp index cea38446054e1821dfdca84eaedbe0389a19b754..25c7b06e499c1f916258e1b05f739d95eaf6f4a0 100644 --- a/tests/manual/producer_cpp/producer.cpp +++ b/tests/manual/producer_cpp/producer.cpp @@ -51,7 +51,7 @@ int main(int argc, char* argv[]) { auto beamtime = "asapo_test"; auto producer = asapo::Producer::Create(endpoint, 1,asapo::RequestHandlerType::kTcp, - asapo::SourceCredentials{asapo::SourceType::kProcessed,beamtime, "", "", ""}, 60, &err); + asapo::SourceCredentials{asapo::SourceType::kProcessed,beamtime, "", "", ""}, 60000, &err); exit_if_error("Cannot start producer", err); uint32_t eventid = 1; diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/test.py b/tests/manual/python_tests/producer_wait_bug_mongo/test.py index 32bbff83b816a49ee886479b4bc826c8f0384311..145c27e04e644560248a9d423be425784fa3a9e4 100644 --- a/tests/manual/python_tests/producer_wait_bug_mongo/test.py +++ b/tests/manual/python_tests/producer_wait_bug_mongo/test.py @@ -27,7 +27,7 @@ def assert_err(err): print(err) sys.exit(1) -producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600000) producer.set_log_level("debug") diff --git a/tests/manual/python_tests/producer_wait_threads/producer_api.py b/tests/manual/python_tests/producer_wait_threads/producer_api.py index c6011bbc173bd69ecbb8a68eea48a33f952154e0..0175bb3861f5b69be4860da15425ff9b8766d8ec 100644 --- a/tests/manual/python_tests/producer_wait_threads/producer_api.py +++ b/tests/manual/python_tests/producer_wait_threads/producer_api.py @@ -22,7 +22,7 @@ def callback(header,err): print ("successfuly sent: ",header) lock.release() -producer = asapo_producer.create_producer(endpoint,'processed',beamtime, 'auto', data_source, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,'processed',beamtime, 'auto', data_source, token, nthreads, 600000) producer.set_log_level("info") @@ -63,7 +63,7 @@ if n!=0: # create with error try: - producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, 0, 600) + producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, 0, 600000) except Exception as Asapo: print(e) else: diff --git a/tests/manual/python_tests/producer_wait_threads/test.py b/tests/manual/python_tests/producer_wait_threads/test.py index b832ef27515cb75723a860b84e95d8f9862b4c2a..bd6cdcc23f939163f2ace6eda1f8763d07fc25f3 100644 --- a/tests/manual/python_tests/producer_wait_threads/test.py +++ b/tests/manual/python_tests/producer_wait_threads/test.py @@ -22,7 +22,7 @@ def callback(header,err): print ("successfuly sent: ",header) lock.release() -producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600000) producer.set_log_level("info")