From 82e69df6a97d9cfd3e68b95d6284ffdbf947f341 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 14 Oct 2020 15:44:43 +0200 Subject: [PATCH] change to high resolution clock, broker returns sorted list of substreams --- broker/src/asapo_broker/database/database.go | 1 + broker/src/asapo_broker/database/mongodb.go | 85 ++---------- .../database/mongodb_substreams.go | 131 ++++++++++++++++++ .../src/asapo_broker/database/mongodb_test.go | 5 +- .../asapo_broker/database/substreams_test.go | 72 ++++++++++ broker/src/asapo_broker/server/server.go | 11 +- common/cpp/include/common/data_structs.h | 18 +-- common/cpp/include/request/request.h | 4 +- common/cpp/src/data_structs/data_structs.cpp | 98 +++---------- common/cpp/src/database/mongodb_client.cpp | 2 +- common/cpp/src/system_io/system_io_linux.cpp | 2 +- .../cpp/src/system_io/system_io_linux_mac.cpp | 6 +- common/cpp/src/system_io/system_io_mac.cpp | 2 +- .../cpp/src/system_io/system_io_windows.cpp | 10 +- .../data_structs/test_data_structs.cpp | 72 +++++----- consumer/api/cpp/src/server_data_broker.cpp | 12 +- .../api/cpp/unittests/test_server_broker.cpp | 8 +- consumer/api/python/asapo_consumer.pxd | 6 +- consumer/api/python/asapo_consumer.pyx.in | 7 +- .../folder_to_db/src/folder_db_importer.cpp | 8 +- .../getnext_broker/getnext_broker.cpp | 2 +- examples/pipeline/in_to_out/in_to_out.cpp | 14 +- .../dummy_data_producer.cpp | 8 +- producer/api/cpp/src/producer_impl.cpp | 2 +- .../api/cpp/src/request_handler_filesystem.h | 2 +- producer/api/cpp/src/request_handler_tcp.cpp | 4 +- producer/api/cpp/src/request_handler_tcp.h | 4 +- producer/api/python/asapo_producer.pxd | 4 +- producer/api/python/asapo_producer.pyx.in | 2 +- .../src/shared_event_list.cpp | 8 +- .../src/shared_event_list.h | 2 +- receiver/src/data_cache.cpp | 2 +- .../request_handler_authorize.cpp | 6 +- .../request_handler_authorize.h | 2 +- .../request_handler_db_stream_info.cpp | 2 +- .../request_handler_db_write.cpp | 2 +- .../src/statistics/receiver_statistics.cpp | 6 +- receiver/src/statistics/receiver_statistics.h | 2 +- receiver/src/statistics/statistics.cpp | 6 +- receiver/src/statistics/statistics.h | 2 +- .../test_request_handler_db_stream_info.cpp | 2 +- .../test_request_handler_db_writer.cpp | 2 +- .../consumer/consumer_api/check_linux.sh | 6 +- .../consumer/consumer_api/check_windows.bat | 4 +- .../consumer/consumer_api/consumer_api.cpp | 15 +- .../consumer_api_python/check_linux.sh | 6 +- .../consumer_api_python/check_windows.bat | 6 +- .../consumer_api_python/consumer_api.py | 10 +- .../insert_retrieve_mongodb.cpp | 2 +- .../insert_retrieve_dataset_mongodb.cpp | 2 +- .../beamtime_metadata/beamtime_metadata.cpp | 2 +- .../getlast_broker.cpp | 10 +- 52 files changed, 398 insertions(+), 311 deletions(-) create mode 100644 broker/src/asapo_broker/database/mongodb_substreams.go create mode 100644 broker/src/asapo_broker/database/substreams_test.go diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go index 7f31bf120..61ec11360 100644 --- a/broker/src/asapo_broker/database/database.go +++ b/broker/src/asapo_broker/database/database.go @@ -10,6 +10,7 @@ type Agent interface { type DBSettings struct { ReadFromInprocessPeriod int + UpdateSubstreamCachePeriodMs int } type DBError struct { diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index a7ece20ab..576655bb4 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -12,7 +12,6 @@ import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "math" - "sort" "strconv" "strings" "sync" @@ -53,16 +52,6 @@ type LastAck struct { ID int `bson:"_id" json:"lastAckId"` } -type SubstreamInfo struct { - LastId int `bson:"lastId" json:"lastId"` - Name string `bson:"name" json:"name"` - Timestamp int64 `bson:"timestamp" json:"timestamp"` -} - -type SubstreamsRecord struct { - Substreams []SubstreamInfo `bson:"substreams" json:"substreams"` -} - type LocationPointer struct { GroupID string `bson:"_id"` Value int `bson:"current_pointer"` @@ -81,7 +70,9 @@ const already_connected_msg = "already connected" const finish_substream_keyword = "asapo_finish_substream" const no_next_substream_keyword = "asapo_no_next" -var dbSessionLock sync.RWMutex +var dbSessionLock sync.Mutex + + type SizeRecord struct { Size int `bson:"size" json:"size"` @@ -90,7 +81,6 @@ type SizeRecord struct { type Mongodb struct { client *mongo.Client timeout time.Duration - parent_db *Mongodb settings DBSettings lastReadFromInprocess int64 } @@ -269,9 +259,10 @@ func (db *Mongodb) getRecordByIDRow(dbname string, collection_name string, id, i func (db *Mongodb) getEarliestRecord(dbname string, collection_name string) (map[string]interface{}, error) { var res map[string]interface{} c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name) - opts := options.FindOne().SetSort(bson.M{"timestemp": 1}) + opts := options.FindOne().SetSort(bson.M{"timestamp": 1}) var q bson.M = nil err := c.FindOne(context.TODO(), q, opts).Decode(&res) + if err != nil { if err == mongo.ErrNoDocuments { return map[string]interface{}{}, nil @@ -334,14 +325,6 @@ func (db *Mongodb) ackRecord(dbname string, collection_name string, group_id str return []byte(""), err } -func (db *Mongodb) getParentDB() *Mongodb { - if db.parent_db == nil { - return db - } else { - return db.parent_db - } -} - func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, collection_name string, group_id string) error { if db.client == nil { return &DBError{utils.StatusServiceUnavailable, no_session_msg} @@ -648,55 +631,6 @@ func (db *Mongodb) queryImages(dbname string, collection_name string, query stri } } -func (db *Mongodb) getSubstreams(db_name string, from string) ([]byte, error) { - database := db.client.Database(db_name) - - result, err := database.ListCollectionNames(context.TODO(), bson.D{}) - if err != nil { - return db.processQueryError("get substreams", db_name, err) - } - - var rec = SubstreamsRecord{[]SubstreamInfo{}} - for _, coll := range result { - if strings.HasPrefix(coll, data_collection_name_prefix) { - si := SubstreamInfo{Name: strings.TrimPrefix(coll, data_collection_name_prefix)} - rec.Substreams = append(rec.Substreams, si) - } - } - - for i,record := range rec.Substreams { - res,err := db.getEarliestRecord(db_name,record.Name) - if err==nil { - ts,ok := res["timestamp"].(int64) - if ok { - rec.Substreams[i].Timestamp = ts - } - } - _, dataset:= res["images"] - max, err := db.getMaxIndex(db_name, record.Name, dataset) - if err!=nil { - rec.Substreams[i].LastId = max - } - } - - sort.Slice(rec.Substreams[:], func(i, j int) bool { - return rec.Substreams[i].Timestamp < rec.Substreams[j].Timestamp - }) - - if from!="" { - ind:=len(rec.Substreams) - for i,rec:= range rec.Substreams { - if rec.Name == from { - ind = i - break - } - } - rec.Substreams=rec.Substreams[ind:] - } - - return json.Marshal(&rec) -} - func makeRange(min, max int) []int { a := make([]int, max-min+1) for i := range a { @@ -821,6 +755,15 @@ func (db *Mongodb) getNacks(db_name string, collection_name string, group_id str return resp[0].Numbers, nil } +func (db *Mongodb) getSubstreams(db_name string, from string) ([]byte, error) { + rec, err := substreams.getSubstreams(db,db_name,from) + if err != nil { + return db.processQueryError("get substreams", db_name, err) + } + return json.Marshal(&rec) +} + + func (db *Mongodb) ProcessRequest(db_name string, collection_name string, group_id string, op string, extra_param string) (answer []byte, err error) { dataset := false if strings.HasSuffix(op, "_dataset") { diff --git a/broker/src/asapo_broker/database/mongodb_substreams.go b/broker/src/asapo_broker/database/mongodb_substreams.go new file mode 100644 index 000000000..b1c23104a --- /dev/null +++ b/broker/src/asapo_broker/database/mongodb_substreams.go @@ -0,0 +1,131 @@ +//+build !test + +package database + +import ( + "asapo_common/utils" + "context" + "errors" + "go.mongodb.org/mongo-driver/bson" + "sort" + "strings" + "sync" + "time" +) + +type SubstreamInfo struct { + Name string `bson:"name" json:"name"` + Timestamp int64 `bson:"timestamp" json:"timestamp"` +} + +type SubstreamsRecord struct { + Substreams []SubstreamInfo `bson:"substreams" json:"substreams"` +} + +type Substreams struct { + records map[string]SubstreamsRecord + lastUpdated int64 +} + +var substreams = Substreams{lastUpdated: 0, records: make(map[string]SubstreamsRecord, 0)} +var substreamsLock sync.Mutex + +func (ss *Substreams) tryGetFromCache(db_name string, updatePeriodMs int) (SubstreamsRecord, error) { + if ss.lastUpdated < time.Now().UnixNano()-int64(updatePeriodMs*1000000) { + return SubstreamsRecord{}, errors.New("cache expired") + } + rec, ok := ss.records[db_name] + if !ok { + return SubstreamsRecord{}, errors.New("no records for " + db_name) + } + return rec, nil +} + +func readSubstreams(db *Mongodb, db_name string) (SubstreamsRecord, error) { + database := db.client.Database(db_name) + result, err := database.ListCollectionNames(context.TODO(), bson.D{}) + if err != nil { + return SubstreamsRecord{}, err + } + var rec = SubstreamsRecord{[]SubstreamInfo{}} + for _, coll := range result { + if strings.HasPrefix(coll, data_collection_name_prefix) { + si := SubstreamInfo{Name: strings.TrimPrefix(coll, data_collection_name_prefix)} + rec.Substreams = append(rec.Substreams, si) + } + } + return rec, nil +} + +func updateTimestamps(db *Mongodb, db_name string, rec *SubstreamsRecord) { + ss,dbFound :=substreams.records[db_name] + currentSubstreams := []SubstreamInfo{} + if dbFound { + // sort substreams by name + currentSubstreams=ss.Substreams + sort.Slice(currentSubstreams,func(i, j int) bool { + return currentSubstreams[i].Name>=currentSubstreams[j].Name + }) + } + for i, record := range rec.Substreams { + ind := sort.Search(len(currentSubstreams),func(i int) bool { + return currentSubstreams[i].Name>=record.Name + }) + if ind < len(currentSubstreams) && currentSubstreams[ind].Name == record.Name { // record found, just skip it + rec.Substreams[i].Timestamp = currentSubstreams[ind].Timestamp + continue + } + res, err := db.getEarliestRecord(db_name, record.Name) + if err == nil { + var si SubstreamInfo + err_convert := utils.MapToStruct(res,&si) + if err_convert == nil { + rec.Substreams[i].Timestamp = si.Timestamp + } + } + } +} + +func sortRecords(rec *SubstreamsRecord) { + sort.Slice(rec.Substreams[:], func(i, j int) bool { + return rec.Substreams[i].Timestamp < rec.Substreams[j].Timestamp + }) +} + +func (ss *Substreams) updateFromDb(db *Mongodb, db_name string) (SubstreamsRecord, error) { + rec, err := readSubstreams(db, db_name) + if err != nil { + return SubstreamsRecord{}, err + } + updateTimestamps(db, db_name, &rec) + sortRecords(&rec) + if len(rec.Substreams)>0 { + ss.records[db_name] = rec + ss.lastUpdated = time.Now().UnixNano() + } + return rec, nil +} + +func (ss *Substreams) getSubstreams(db *Mongodb, db_name string, from string) (SubstreamsRecord, error) { + substreamsLock.Lock() + rec, err := ss.tryGetFromCache(db_name,db.settings.UpdateSubstreamCachePeriodMs) + if err != nil { + rec, err = ss.updateFromDb(db, db_name) + } + substreamsLock.Unlock() + if err != nil { + return SubstreamsRecord{}, err + } + + if from != "" { + ind := len(rec.Substreams) + for i, rec := range rec.Substreams { + if rec.Name == from { + ind = i + break + } + } + rec.Substreams = rec.Substreams[ind:] + } + return rec, nil +} diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index de42e1505..8e2d97295 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -664,8 +664,8 @@ var testsSubstreams = []struct { ok bool }{ {"",[]Substream{},SubstreamsRecord{[]SubstreamInfo{}}, "no substreams", true}, - {"",[]Substream{{"ss1",[]TestRecord{rec1,rec2}}},SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss1",Timestamp: 0}}}, "one substream", true}, - {"",[]Substream{{"ss1",[]TestRecord{rec1,rec2}},{"ss2",[]TestRecord{rec2,rec3}}},SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss1",Timestamp: 0},SubstreamInfo{Name: "ss2",Timestamp: 1}}}, "two substreams", true}, + {"",[]Substream{{"ss1",[]TestRecord{rec2,rec1}}},SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss1",Timestamp: 0}}}, "one substream", true}, + {"",[]Substream{{"ss1",[]TestRecord{rec2,rec1}},{"ss2",[]TestRecord{rec2,rec3}}},SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss1",Timestamp: 0},SubstreamInfo{Name: "ss2",Timestamp: 1}}}, "two substreams", true}, {"ss2",[]Substream{{"ss1",[]TestRecord{rec1,rec2}},{"ss2",[]TestRecord{rec2,rec3}}},SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss2",Timestamp: 1}}}, "with from", true}, } @@ -687,7 +687,6 @@ func TestMongoDBListSubstreams(t *testing.T) { } cleanup() } - } func TestMongoDBAckImage(t *testing.T) { diff --git a/broker/src/asapo_broker/database/substreams_test.go b/broker/src/asapo_broker/database/substreams_test.go new file mode 100644 index 000000000..6c3ed9be0 --- /dev/null +++ b/broker/src/asapo_broker/database/substreams_test.go @@ -0,0 +1,72 @@ +// +build integration_tests + +package database + +import ( + "github.com/stretchr/testify/suite" + "testing" + "time" +) + +type SubstreamsTestSuite struct { + suite.Suite +} + +func (suite *SubstreamsTestSuite) SetupTest() { + db.Connect(dbaddress) +} + +func (suite *SubstreamsTestSuite) TearDownTest() { + cleanup() + substreams.records= map[string]SubstreamsRecord{} +} + +func TestSubstreamsTestSuite(t *testing.T) { + suite.Run(t, new(SubstreamsTestSuite)) +} + +func (suite *SubstreamsTestSuite) TestSubstreamsEmpty() { + rec, err := substreams.getSubstreams(&db, "test", "") + suite.Nil(err) + suite.Empty(rec.Substreams, 0) +} + +func (suite *SubstreamsTestSuite) TestSubstreamsNotUsesCacheWhenEmpty() { + db.settings.UpdateSubstreamCachePeriodMs = 1000 + substreams.getSubstreams(&db, dbname, "") + db.insertRecord(dbname, collection, &rec1) + rec, err := substreams.getSubstreams(&db, dbname, "") + suite.Nil(err) + suite.Equal(1, len(rec.Substreams)) +} + +func (suite *SubstreamsTestSuite) TestSubstreamsUsesCache() { + db.settings.UpdateSubstreamCachePeriodMs = 1000 + db.insertRecord(dbname, collection, &rec2) + substreams.getSubstreams(&db, dbname, "") + db.insertRecord(dbname, collection, &rec1) + rec, err := substreams.getSubstreams(&db, dbname, "") + suite.Nil(err) + suite.Equal(int64(1), rec.Substreams[0].Timestamp) +} + +func (suite *SubstreamsTestSuite) TestSubstreamsNotUsesCacheWhenExpired() { + db.settings.UpdateSubstreamCachePeriodMs = 10 + db.insertRecord(dbname, collection, &rec2) + substreams.getSubstreams(&db, dbname, "") + db.insertRecord(dbname, collection, &rec1) + time.Sleep(time.Millisecond * 100) + rec, err := substreams.getSubstreams(&db, dbname, "") + suite.Nil(err) + suite.Equal(int64(1), rec.Substreams[0].Timestamp) +} + +func (suite *SubstreamsTestSuite) TestSubstreamRemovesDatabase() { + db.settings.UpdateSubstreamCachePeriodMs = 0 + db.insertRecord(dbname, collection, &rec1) + substreams.getSubstreams(&db, dbname, "") + db.dropDatabase(dbname) + rec, err := substreams.getSubstreams(&db, dbname, "") + suite.Nil(err) + suite.Empty(rec.Substreams, 0) +} diff --git a/broker/src/asapo_broker/server/server.go b/broker/src/asapo_broker/server/server.go index 288202d8d..957b8006c 100644 --- a/broker/src/asapo_broker/server/server.go +++ b/broker/src/asapo_broker/server/server.go @@ -10,6 +10,7 @@ import ( ) const kDefaultresendInterval = 10 +const kDefaultSubstreamCacheUpdateIntervalMs = 100 var db database.Agent @@ -23,6 +24,7 @@ type serverSettings struct { LogLevel string discoveredDbAddress string CheckResendInterval *int + SubstreamCacheUpdateIntervalMs *int } func (s *serverSettings) GetResendInterval() int { @@ -32,6 +34,13 @@ func (s *serverSettings) GetResendInterval() int { return *s.CheckResendInterval } +func (s *serverSettings) GetSubstreamCacheUpdateInterval() int { + if s.SubstreamCacheUpdateIntervalMs==nil { + return kDefaultSubstreamCacheUpdateIntervalMs + } + return *s.SubstreamCacheUpdateIntervalMs +} + func (s *serverSettings) GetDatabaseServer() string { if s.DatabaseServer == "auto" { return s.discoveredDbAddress @@ -82,7 +91,7 @@ func InitDB(dbAgent database.Agent) (err error) { log.Debug("Got mongodb server: " + settings.discoveredDbAddress) } - db.SetSettings(database.DBSettings{ReadFromInprocessPeriod: settings.GetResendInterval()}) + db.SetSettings(database.DBSettings{ReadFromInprocessPeriod: settings.GetResendInterval(),UpdateSubstreamCachePeriodMs: settings.GetSubstreamCacheUpdateInterval()}) return db.Connect(settings.GetDatabaseServer()) } diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h index cae1362b0..cf7d061d3 100644 --- a/common/cpp/include/common/data_structs.h +++ b/common/cpp/include/common/data_structs.h @@ -13,16 +13,17 @@ namespace asapo { class JsonStringParser; -std::string IsoDateFromEpochNanosecs(uint64_t time_from_epoch_nanosec); -uint64_t NanosecsEpochFromISODate(std::string date_time); -uint64_t NanosecsEpochFromTimePoint(std::chrono::system_clock::time_point); +uint64_t NanosecsEpochFromTimePoint(std::chrono::high_resolution_clock::time_point); uint64_t EpochNanosecsFromNow(); -bool TimeFromJson(const JsonStringParser& parser, const std::string& name, std::chrono::system_clock::time_point* val); +std::chrono::high_resolution_clock::time_point TimePointfromNanosec(uint64_t nanoseconds_from_epoch); + + +bool TimeFromJson(const JsonStringParser& parser, const std::string& name, std::chrono::high_resolution_clock::time_point* val); class FileInfo { public: std::string name; - std::chrono::system_clock::time_point timestamp; + std::chrono::high_resolution_clock::time_point timestamp; uint64_t size{0}; uint64_t id{0}; std::string source; @@ -37,9 +38,9 @@ class FileInfo { struct StreamInfo { uint64_t last_id{0}; std::string name; - std::chrono::system_clock::time_point timestamp; - std::string Json() const; - bool SetFromJson(const std::string& json_string); + std::chrono::high_resolution_clock::time_point timestamp; + std::string Json(bool add_last_id) const; + bool SetFromJson(const std::string& json_string,bool read_last_id); }; using StreamInfos = std::vector<StreamInfo>; @@ -62,7 +63,6 @@ struct DataSet { uint64_t id; FileInfos content; bool SetFromJson(const std::string& json_string); - }; using SubDirList = std::vector<std::string>; diff --git a/common/cpp/include/request/request.h b/common/cpp/include/request/request.h index 0dfb3e548..836b9a744 100644 --- a/common/cpp/include/request/request.h +++ b/common/cpp/include/request/request.h @@ -26,7 +26,7 @@ class GenericRequest { if (timeout_ms_ == 0) { return false; } - uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - + uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - created_at_).count(); return elapsed_ms > timeout_ms_; } @@ -34,7 +34,7 @@ class GenericRequest { private: uint64_t retry_counter_ = 0; uint64_t timeout_ms_ = 0; - std::chrono::system_clock::time_point created_at_ = std::chrono::system_clock::now(); + std::chrono::high_resolution_clock::time_point created_at_ = std::chrono::high_resolution_clock::now(); }; using GenericRequestPtr = std::unique_ptr<GenericRequest>; diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp index 2361ccb75..01e515d1c 100644 --- a/common/cpp/src/data_structs/data_structs.cpp +++ b/common/cpp/src/data_structs/data_structs.cpp @@ -10,7 +10,7 @@ #include "json_parser/json_parser.h" #include "preprocessor/definitions.h" -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; #ifdef _WIN32 #define timegm _mkgmtime @@ -70,16 +70,18 @@ std::string FileInfo::Json() const { return s; } -bool TimeFromJson(const JsonStringParser& parser, const std::string& name, std::chrono::system_clock::time_point* val) { +std::chrono::high_resolution_clock::time_point TimePointfromNanosec(uint64_t nanoseconds_from_epoch){ + std::chrono::nanoseconds ns = std::chrono::nanoseconds {nanoseconds_from_epoch}; + return std::chrono::high_resolution_clock::time_point + {std::chrono::duration_cast<std::chrono::high_resolution_clock::duration>(ns)}; +} + +bool TimeFromJson(const JsonStringParser& parser, const std::string& name, std::chrono::high_resolution_clock::time_point* val) { uint64_t nanoseconds_from_epoch; if (parser.GetUInt64(name, &nanoseconds_from_epoch)) { return false; } - - std::chrono::nanoseconds ns = std::chrono::nanoseconds {nanoseconds_from_epoch}; - *val = std::chrono::system_clock::time_point - {std::chrono::duration_cast<std::chrono::system_clock::duration>(ns)}; - + *val = TimePointfromNanosec(nanoseconds_from_epoch); return true; } @@ -134,95 +136,27 @@ std::string FileInfo::FullName(const std::string& base_path) const { return full_name + name; } -std::string IsoDateFromEpochNanosecs(uint64_t time_from_epoch_nanosec) { - std::chrono::nanoseconds ns = std::chrono::nanoseconds {time_from_epoch_nanosec}; - auto tp = std::chrono::system_clock::time_point - {std::chrono::duration_cast<std::chrono::system_clock::duration>(ns)}; - std::time_t time = std::chrono::system_clock::to_time_t(tp); - std::tm timetm = *std::gmtime(&time); - std::stringstream ssTp; - auto zz = time_from_epoch_nanosec % 1000000000; - - char buff[100]; - - sprintf(buff, "%.4d-%.2d-%.2dT%.2d:%.2d:%.2d", timetm.tm_year + 1900, timetm.tm_mon + 1, timetm.tm_mday, - timetm.tm_hour, timetm.tm_min, timetm.tm_sec); - if (zz > 0) { - sprintf(buff + 19, ".%.9ld", zz); - } - std::string res{buff}; - return res+"Z"; -} - -uint64_t NanosecsEpochFromISODate(std::string date_time) { - double frac = 0; - size_t pos = date_time.find_last_of("Z"); - if (pos != std::string::npos) { - date_time = date_time.substr(0,pos); - } else { - return 0; - } - - pos = date_time.find_first_of('.'); - if (pos != std::string::npos) { - std::string frac_str = date_time.substr(pos); - if (sscanf(frac_str.c_str(), "%lf", &frac) != 1) { - return 0; - } - date_time = date_time.substr(0, pos); - } - - std::tm tm{}; - - int year, month, day, hour, minute, second; - hour = 0; - minute = 0; - second = 0; - - auto n = sscanf(date_time.c_str(), "%d-%d-%dT%d:%d:%d", &year, &month, &day, &hour, &minute, &second); - if (!(year >= 1970 && month >= 1 && month <= 12 && day >= 1 && day <= 31) || (n != 3 && n != 6)) { - return 0; - } - if ((n == 3 && date_time.size() != 10) || (n == 6 && date_time.size() != 19)) { - return 0; - } - - tm.tm_sec = second; - tm.tm_min = minute; - tm.tm_hour = hour; - tm.tm_mday = day; - tm.tm_mon = month - 1; - tm.tm_year = year - 1900; - - system_clock::time_point tp = system_clock::from_time_t (timegm(&tm)); - uint64_t ns = NanosecsEpochFromTimePoint(tp); - - ns = ns + uint64_t(frac * 1000000000) ; - - return ns > 0 ? ns : 1; -} - uint64_t EpochNanosecsFromNow() { - return NanosecsEpochFromTimePoint(system_clock::now()); + return NanosecsEpochFromTimePoint(high_resolution_clock::now()); } -uint64_t NanosecsEpochFromTimePoint(std::chrono::system_clock::time_point time_point) { - return (uint64_t) std::chrono::time_point_cast<std::chrono::nanoseconds>(time_point).time_since_epoch().count(); +uint64_t NanosecsEpochFromTimePoint(std::chrono::high_resolution_clock::time_point time_point) { + return (uint64_t) std::chrono::duration_cast<std::chrono::nanoseconds>(time_point.time_since_epoch()).count(); } -std::string StreamInfo::Json() const { +std::string StreamInfo::Json(bool add_last_id) const { auto nanoseconds_from_epoch = NanosecsEpochFromTimePoint(timestamp); - return "{\"lastId\":" + std::to_string(last_id) + "," + return (add_last_id?"{\"lastId\":" + std::to_string(last_id) + ",":"{")+ "\"name\":\"" + name + "\"," "\"timestamp\":" + std::to_string(nanoseconds_from_epoch) + "}"; } -bool StreamInfo::SetFromJson(const std::string& json_string) { +bool StreamInfo::SetFromJson(const std::string& json_string,bool read_last_id) { auto old = *this; JsonStringParser parser(json_string); uint64_t id; - if (parser.GetUInt64("lastId", &last_id) || + if ((read_last_id?parser.GetUInt64("lastId", &last_id):nullptr) || parser.GetString("name", &name) || !TimeFromJson(parser, "timestamp", ×tamp)) { *this = old; diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index 5b78a0bf9..f2cf1ca45 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -378,7 +378,7 @@ Error StreamInfoFromDbResponse(const std::string& last_record_str,const std::str } auto parser2 = JsonStringParser(earliest_record_str); - std::chrono::system_clock::time_point timestamp; + std::chrono::high_resolution_clock::time_point timestamp; auto ok = TimeFromJson(parser2, "timestamp", ×tamp); if (!ok) { return DBErrorTemplates::kJsonParseError.Generate("StreamInfoFromDbResponse: cannot parse timestamp in response: " + earliest_record_str); diff --git a/common/cpp/src/system_io/system_io_linux.cpp b/common/cpp/src/system_io/system_io_linux.cpp index 2c97e377f..92e47b301 100644 --- a/common/cpp/src/system_io/system_io_linux.cpp +++ b/common/cpp/src/system_io/system_io_linux.cpp @@ -18,7 +18,7 @@ using std::string; using std::vector; -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; namespace asapo { diff --git a/common/cpp/src/system_io/system_io_linux_mac.cpp b/common/cpp/src/system_io/system_io_linux_mac.cpp index 0277406c6..88bc7e7d8 100644 --- a/common/cpp/src/system_io/system_io_linux_mac.cpp +++ b/common/cpp/src/system_io/system_io_linux_mac.cpp @@ -17,7 +17,7 @@ using std::string; using std::vector; -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; namespace asapo { @@ -92,8 +92,8 @@ void SetModifyDate(const struct stat& t_stat, FileInfo* file_info) { #undef st_mtim #endif - file_info->timestamp = system_clock::time_point - {std::chrono::duration_cast<system_clock::duration>(d)}; + file_info->timestamp = high_resolution_clock::time_point + {std::chrono::duration_cast<high_resolution_clock::duration>(d)}; } void SetFileSize(const struct stat& t_stat, FileInfo* file_info) { diff --git a/common/cpp/src/system_io/system_io_mac.cpp b/common/cpp/src/system_io/system_io_mac.cpp index 2e240568e..3e370689b 100644 --- a/common/cpp/src/system_io/system_io_mac.cpp +++ b/common/cpp/src/system_io/system_io_mac.cpp @@ -17,7 +17,7 @@ using std::string; using std::vector; -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; namespace asapo { ListSocketDescriptors SystemIO::WaitSocketsActivity(SocketDescriptor master_socket, diff --git a/common/cpp/src/system_io/system_io_windows.cpp b/common/cpp/src/system_io/system_io_windows.cpp index cb9c6d43a..7a53a540a 100644 --- a/common/cpp/src/system_io/system_io_windows.cpp +++ b/common/cpp/src/system_io/system_io_windows.cpp @@ -11,7 +11,7 @@ using std::string; using std::vector; -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; namespace asapo { @@ -95,11 +95,11 @@ uint64_t GetLinuxNanosecFromWindowsEpoch(ULARGE_INTEGER ull) { return (uint64_t)(ull.QuadPart % k100nsInSec) * 100; } -std::chrono::system_clock::time_point FileTime2TimePoint(const FILETIME& ft, Error* err) { +std::chrono::high_resolution_clock::time_point FileTime2TimePoint(const FILETIME& ft, Error* err) { *err = CheckFileTime(ft); if (*err) { - return std::chrono::system_clock::time_point{}; + return std::chrono::high_resolution_clock::time_point{}; } // number of seconds @@ -113,8 +113,8 @@ std::chrono::system_clock::time_point FileTime2TimePoint(const FILETIME& ft, Err std::chrono::nanoseconds d = std::chrono::nanoseconds{nsec} + std::chrono::seconds{sec}; - auto tp = system_clock::time_point - {std::chrono::duration_cast<std::chrono::system_clock::duration>(d)}; + auto tp = high_resolution_clock::time_point + {std::chrono::duration_cast<std::chrono::high_resolution_clock::duration>(d)}; *err = nullptr; return tp; diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp index f6a0a0176..1d8d409c5 100644 --- a/common/cpp/unittests/data_structs/test_data_structs.cpp +++ b/common/cpp/unittests/data_structs/test_data_structs.cpp @@ -32,7 +32,7 @@ FileInfo PrepareFileInfo() { finfo.name = std::string("folder") + asapo::kPathSeparator + "test"; finfo.source = "host:1234"; finfo.buf_id = big_uint; - finfo.timestamp = std::chrono::time_point<std::chrono::system_clock>(std::chrono::milliseconds(1)); + finfo.timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(std::chrono::milliseconds(1)); finfo.metadata = "{\"bla\":10}"; return finfo; } @@ -132,54 +132,30 @@ struct TestEpochFromISODate { uint64_t ns; }; -auto tests = std::vector<TestEpochFromISODate> { - TestEpochFromISODate{"1970-01-01T00:00:00.0Z", 1}, // 0 reserved for errors - TestEpochFromISODate{"1970-01-01Z", 1}, - TestEpochFromISODate{"1970-01-01T00:00:00.000000002Z", 2}, - TestEpochFromISODate{"2019-07-25T15:38:11.100010002Z", 1564069091100010002}, -//errors - TestEpochFromISODate{"1970-13-01T00:00:00.000000002", 0}, - TestEpochFromISODate{"1970-12-01T00:00:00.", 0}, - TestEpochFromISODate{"1970-01-01T00:00:00.000000002", 0}, -}; - -TEST(FileInFo, NanosecsEpochFromISODate) { - for (auto test : tests) { - auto res = asapo::NanosecsEpochFromISODate(test.iso); - ASSERT_THAT(res, Eq(test.ns)); - } -} - -auto tests2 = std::vector<TestEpochFromISODate> { - TestEpochFromISODate{"1970-01-01T00:00:00Z", 0}, - TestEpochFromISODate{"1970-01-01T00:00:00.000000002Z", 2}, - TestEpochFromISODate{"2019-07-25T15:38:11.100010002Z", 1564069091100010002}, -}; - -TEST(FileInFo, ISODateFromNanosecsEpoch) { - for (auto test : tests2) { - auto res = asapo::IsoDateFromEpochNanosecs(test.ns); - ASSERT_THAT(res, Eq(test.iso)); - } -} - StreamInfo PrepareStreamInfo() { StreamInfo sinfo; sinfo.last_id = 123; sinfo.name = "test"; - sinfo.timestamp = std::chrono::time_point<std::chrono::system_clock>(std::chrono::milliseconds(1)); + sinfo.timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(std::chrono::milliseconds(1)); return sinfo; } +TEST(FileInFo, TimeFromNanosec) { + auto tp = asapo::TimePointfromNanosec(1); + auto res = asapo::NanosecsEpochFromTimePoint(tp); + ASSERT_THAT(res, Eq(1)); +} + + TEST(StreamInfo, ConvertFromJson) { StreamInfo result; auto sinfo = PrepareStreamInfo(); - std::string json = sinfo.Json(); + std::string json = sinfo.Json(true); - auto ok = result.SetFromJson(json); + auto ok = result.SetFromJson(json,true); ASSERT_THAT(ok, Eq(true)); ASSERT_THAT(result.last_id, sinfo.last_id); @@ -187,12 +163,25 @@ TEST(StreamInfo, ConvertFromJson) { ASSERT_THAT(result.timestamp, sinfo.timestamp); } +TEST(StreamInfo, ConvertFromJsonWithoutID) { + StreamInfo result; + + auto sinfo = PrepareStreamInfo(); + std::string json = sinfo.Json(false); + + auto ok = result.SetFromJson(json,false); + + ASSERT_THAT(ok, Eq(true)); + ASSERT_THAT(result.name, sinfo.name); + ASSERT_THAT(result.timestamp, sinfo.timestamp); +} + TEST(StreamInfo, ConvertFromJsonErr) { StreamInfo result; std::string json = R"({"lastId":123)"; - auto ok = result.SetFromJson(json); + auto ok = result.SetFromJson(json,true); ASSERT_THAT(ok, Eq(false)); ASSERT_THAT(result.last_id, Eq(0)); @@ -202,7 +191,16 @@ TEST(StreamInfo, ConvertToJson) { auto sinfo = PrepareStreamInfo(); std::string expected_json = R"({"lastId":123,"name":"test","timestamp":1000000})"; - auto json = sinfo.Json(); + auto json = sinfo.Json(true); + + ASSERT_THAT(expected_json, Eq(json)); +} + +TEST(StreamInfo, ConvertToJsonWithoutID) { + auto sinfo = PrepareStreamInfo(); + + std::string expected_json = R"({"name":"test","timestamp":1000000})"; + auto json = sinfo.Json(false); ASSERT_THAT(expected_json, Eq(json)); } diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index d72c5502a..0c7a9a6f2 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -12,7 +12,7 @@ #include "fabric_consumer_client.h" #include "rds_response_error.h" -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; namespace asapo { @@ -214,7 +214,7 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g uint64_t elapsed_ms = 0; Error no_data_error; while (true) { - auto start = system_clock::now(); + auto start = high_resolution_clock::now(); auto err = DiscoverService(kBrokerServiceName, ¤t_broker_uri_); if (err == nullptr) { auto ri = PrepareRequestInfo(request_api + request_suffix, dataset); @@ -247,7 +247,7 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g return no_data_error ? std::move(no_data_error) : std::move(err); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); - elapsed_ms += std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count(); + elapsed_ms += std::chrono::duration_cast<std::chrono::milliseconds>(high_resolution_clock::now() - start).count(); } return nullptr; } @@ -413,7 +413,7 @@ Error ServerDataBroker::ServiceRequestWithTimeout(const std::string& service_nam uint64_t elapsed_ms = 0; Error err; while (elapsed_ms <= timeout_ms_) { - auto start = system_clock::now(); + auto start = high_resolution_clock::now(); err = DiscoverService(service_name, service_uri); if (err == nullptr) { request.host = *service_uri; @@ -423,7 +423,7 @@ Error ServerDataBroker::ServiceRequestWithTimeout(const std::string& service_nam } } std::this_thread::sleep_for(std::chrono::milliseconds(100)); - elapsed_ms += std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count(); + elapsed_ms += std::chrono::duration_cast<std::chrono::milliseconds>(high_resolution_clock::now() - start).count(); } return err; } @@ -641,7 +641,7 @@ StreamInfos ParseSubstreamsFromResponse(std::string response, Error* err) { } for (auto substream_encoded : substreams_endcoded) { StreamInfo si; - auto ok = si.SetFromJson(substream_encoded); + auto ok = si.SetFromJson(substream_encoded,false); if (!ok) { *err = TextError("cannot parse "+substream_encoded); return StreamInfos {}; diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 5ffc3457d..8e9c17a6d 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -162,7 +162,7 @@ class ServerDataBrokerTests : public Test { fi.id = 1; fi.buf_id = buf_id; fi.name = expected_filename; - fi.timestamp = std::chrono::system_clock::now(); + fi.timestamp = std::chrono::high_resolution_clock::now(); return fi; } }; @@ -1027,7 +1027,7 @@ TEST_F(ServerDataBrokerTests, GetDatasetByIdUsesCorrectUri) { TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUri) { MockGetBrokerUri(); - std::string return_substreams = R"({"substreams":[{"lastId":123,"name":"test","timestamp":1000000},{"lastId":124,"name":"test1","timestamp":2000000}]})"; + std::string return_substreams = R"({"substreams":[{"lastId":123,"name":"test","timestamp":1000000},{"name":"test1","timestamp":2000000}]})"; EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/0/substreams" + "?token=" + expected_token+"&from=stream_from", _, _)).WillOnce(DoAll( @@ -1040,8 +1040,8 @@ TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUri) { ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(substreams.size(), Eq(2)); ASSERT_THAT(substreams.size(), 2); - ASSERT_THAT(substreams[0].Json(), R"({"lastId":123,"name":"test","timestamp":1000000})"); - ASSERT_THAT(substreams[1].Json(), R"({"lastId":124,"name":"test1","timestamp":2000000})"); + ASSERT_THAT(substreams[0].Json(false), R"({"name":"test","timestamp":1000000})"); + ASSERT_THAT(substreams[1].Json(false), R"({"name":"test1","timestamp":2000000})"); } diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 986cc0822..692fb425b 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -45,8 +45,8 @@ cdef extern from "asapo_consumer.h" namespace "asapo": string stream string user_token cppclass StreamInfo: - string Json() - bool SetFromJson(string json_str) + string Json(bool add_last_id) + bool SetFromJson(string json_str, bool read_last_id) cdef extern from "asapo_consumer.h" namespace "asapo": cppclass NetworkConnectionType: @@ -78,7 +78,7 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil: DataSet GetLastDataset(string group_id, string substream, Error* err) DataSet GetDatasetById(uint64_t id, string group_id, string substream, Error* err) Error RetrieveData(FileInfo* info, FileData* data) - vector[StreamInfo] GetSubstreamList(Error* err) + vector[StreamInfo] GetSubstreamList(string from_substream, Error* err) void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 28d4ad925..179816357 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -203,16 +203,17 @@ cdef class PyDataBroker: if err: throw_exception(err) return _str(group_id) - def get_substream_list(self): + def get_substream_list(self,from_substream): cdef Error err cdef vector[StreamInfo] substreams + cdef string b_from_substream = _bytes(from_substream) with nogil: - substreams = self.c_broker.GetSubstreamList(&err) + substreams = self.c_broker.GetSubstreamList(b_from_substream,&err) if err: throw_exception(err) list = [] for substream in substreams: - list.append(json.loads(_str(substream.Json()))) + list.append(json.loads(_str(substream.Json(False)))) return list def acknowledge(self, group_id, uint64_t id, substream = "default"): cdef string b_group_id = _bytes(group_id) diff --git a/consumer/tools/folder_to_db/src/folder_db_importer.cpp b/consumer/tools/folder_to_db/src/folder_db_importer.cpp index 30be63bcf..51a186d63 100644 --- a/consumer/tools/folder_to_db/src/folder_db_importer.cpp +++ b/consumer/tools/folder_to_db/src/folder_db_importer.cpp @@ -9,7 +9,7 @@ namespace asapo { -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; FolderToDbImporter::FolderToDbImporter() : io__{GenerateDefaultIO()}, db_factory__{new asapo::DatabaseFactory} { @@ -111,7 +111,7 @@ Error FolderToDbImporter::Convert(const std::string& uri, const std::string& fol FolderImportStatistics* statistics) const { db_uri_ = uri; db_name_ = db_name; - auto time_begin = system_clock::now(); + auto time_begin = high_resolution_clock::now(); Error err; auto file_list = GetFilesInFolder(folder, &err); @@ -119,11 +119,11 @@ Error FolderToDbImporter::Convert(const std::string& uri, const std::string& fol return err; } - auto time_end_read_folder = system_clock::now(); + auto time_end_read_folder = high_resolution_clock::now(); err = ImportFilelist(file_list); - auto time_end_import = system_clock::now(); + auto time_end_import = high_resolution_clock::now(); if (err == nullptr && statistics) { statistics->n_files_converted = file_list.size(); diff --git a/examples/consumer/getnext_broker/getnext_broker.cpp b/examples/consumer/getnext_broker/getnext_broker.cpp index 05a15a02d..6d8d96400 100644 --- a/examples/consumer/getnext_broker/getnext_broker.cpp +++ b/examples/consumer/getnext_broker/getnext_broker.cpp @@ -11,7 +11,7 @@ #include "asapo_consumer.h" -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; using asapo::Error; std::string group_id = ""; diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index 9001d9983..6411283ba 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -14,7 +14,7 @@ #include "asapo_producer.h" #include "preprocessor/definitions.h" -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; using asapo::Error; using BrokerPtr = std::unique_ptr<asapo::DataBroker>; using ProducerPtr = std::unique_ptr<asapo::Producer>; @@ -23,8 +23,8 @@ std::mutex lock_in, lock_out; int files_sent; bool streamout_timer_started; -system_clock::time_point streamout_start; -system_clock::time_point streamout_finish; +high_resolution_clock::time_point streamout_start; +high_resolution_clock::time_point streamout_finish; struct Args { std::string server; @@ -46,7 +46,7 @@ void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) { } lock_out.lock(); files_sent++; - streamout_finish = max(streamout_finish, system_clock::now()); + streamout_finish = max(streamout_finish, high_resolution_clock::now()); lock_out.unlock(); } @@ -112,7 +112,7 @@ void SendDataDownstreamThePipeline(const Args& args, const asapo::FileInfo& fi, lock_out.lock(); if (!streamout_timer_started) { streamout_timer_started = true; - streamout_start = system_clock::now(); + streamout_start = high_resolution_clock::now(); } lock_out.unlock(); @@ -168,7 +168,7 @@ std::vector<std::thread> StartConsumerThreads(const Args& args, const ProducerPt int ProcessAllData(const Args& args, const ProducerPtr& producer, uint64_t* duration_ms, int* nerrors) { asapo::FileInfo fi; - system_clock::time_point t1 = system_clock::now(); + high_resolution_clock::time_point t1 = high_resolution_clock::now(); std::vector<int> nfiles(args.nthreads, 0); std::vector<int> errors(args.nthreads, 0); @@ -179,7 +179,7 @@ int ProcessAllData(const Args& args, const ProducerPtr& producer, uint64_t* dura int n_total = std::accumulate(nfiles.begin(), nfiles.end(), 0); *nerrors = std::accumulate(errors.begin(), errors.end(), 0); - system_clock::time_point t2 = system_clock::now(); + high_resolution_clock::time_point t2 = high_resolution_clock::now(); auto duration_read = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1); *duration_ms = duration_read.count(); return n_total; diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index d5c522d30..9dbeadce7 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -10,7 +10,7 @@ #include "asapo_producer.h" #include "preprocessor/definitions.h" -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; std::mutex mutex; int iterations_remained; @@ -199,8 +199,8 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { return producer; } -void PrintOutput(const Args& args, const system_clock::time_point& start) { - system_clock::time_point t2 = system_clock::now(); +void PrintOutput(const Args& args, const high_resolution_clock::time_point& start) { + high_resolution_clock::time_point t2 = high_resolution_clock::now(); double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - start ).count() / 1000.0; double size_gb = double(args.number_of_bytes) * args.iterations / 1000.0 / 1000.0 / 1000.0 * 8.0; double rate = args.iterations / duration_sec; @@ -221,7 +221,7 @@ int main (int argc, char* argv[]) { iterations_remained = args.iterations * args.images_in_set; } - system_clock::time_point start_time = system_clock::now(); + high_resolution_clock::time_point start_time = high_resolution_clock::now(); if(!SendDummyData(producer.get(), args.number_of_bytes, args.iterations, args.images_in_set, args.stream, (args.mode %100) / 10 == 0,args.mode / 100 == 0 ?asapo::SourceType::kProcessed:asapo::SourceType::kRaw)) { diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index e6054e30a..27f6ab00e 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -277,7 +277,7 @@ void ActivatePromise(std::shared_ptr<std::promise<StreamInfoResult>> promise, Re Error err) { StreamInfoResult res; if (err == nullptr) { - auto ok = res.sinfo.SetFromJson(payload.response); + auto ok = res.sinfo.SetFromJson(payload.response,true); res.err = ok ? nullptr : ProducerErrorTemplates::kInternalServerError.Generate( std::string("cannot read JSON string from server response: ") + payload.response).release(); } else { diff --git a/producer/api/cpp/src/request_handler_filesystem.h b/producer/api/cpp/src/request_handler_filesystem.h index 66b1ca980..b828b1d20 100644 --- a/producer/api/cpp/src/request_handler_filesystem.h +++ b/producer/api/cpp/src/request_handler_filesystem.h @@ -10,7 +10,7 @@ #include "request/request_handler.h" #include "logger/logger.h" -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; namespace asapo { diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp index befcd52d4..c621311fa 100644 --- a/producer/api/cpp/src/request_handler_tcp.cpp +++ b/producer/api/cpp/src/request_handler_tcp.cpp @@ -143,7 +143,7 @@ void RequestHandlerTcp::UpdateIfNewConnection() { bool RequestHandlerTcp::UpdateReceiversList() { auto thread_receivers_new = discovery_service__->RotatedUriList(thread_id_); - last_receivers_uri_update_ = system_clock::now(); + last_receivers_uri_update_ = high_resolution_clock::now(); if (thread_receivers_new != receivers_list_) { receivers_list_ = thread_receivers_new; return true; @@ -152,7 +152,7 @@ bool RequestHandlerTcp::UpdateReceiversList() { } bool RequestHandlerTcp::TimeToUpdateReceiverList() { - uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( system_clock::now() - + uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( high_resolution_clock::now() - last_receivers_uri_update_).count(); return elapsed_ms > discovery_service__->UpdateFrequency(); } diff --git a/producer/api/cpp/src/request_handler_tcp.h b/producer/api/cpp/src/request_handler_tcp.h index 822b75da2..e4518d52f 100644 --- a/producer/api/cpp/src/request_handler_tcp.h +++ b/producer/api/cpp/src/request_handler_tcp.h @@ -12,7 +12,7 @@ #include "request/request_handler.h" #include "producer_request.h" -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; namespace asapo { @@ -49,7 +49,7 @@ class RequestHandlerTcp: public RequestHandler { bool CanCreateNewConnections(); bool ProcessErrorFromReceiver(const Error& error, const ProducerRequest* request, const std::string& receiver_uri); ReceiversList receivers_list_; - system_clock::time_point last_receivers_uri_update_; + high_resolution_clock::time_point last_receivers_uri_update_; void ProcessRequestCallback(Error err, ProducerRequest* request, std::string response, bool* retry); uint64_t thread_id_; uint64_t* ncurrent_connections_; diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index d08450fc3..400b0f0b2 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -28,8 +28,8 @@ cdef extern from "asapo_producer.h" namespace "asapo": cppclass FileData: unique_ptr[uint8_t[]] release() cppclass StreamInfo: - string Json() - bool SetFromJson(string json_str) + string Json(bool add_last_id) + bool SetFromJson(string json_str, bool read_last_id) cdef extern from "asapo_producer.h" namespace "asapo": cppclass RequestHandlerType: diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 7a991b6b8..f039e9611 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -216,7 +216,7 @@ cdef class PyProducer: info = self.c_producer.get().GetStreamInfo(b_substream,timeout_ms,&err) if err: throw_exception(err) - return json.loads(_str(info.Json())) + return json.loads(_str(info.Json(True))) def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, subset=None, substream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None): """ diff --git a/producer/event_monitor_producer/src/shared_event_list.cpp b/producer/event_monitor_producer/src/shared_event_list.cpp index db29ff318..286165cd7 100644 --- a/producer/event_monitor_producer/src/shared_event_list.cpp +++ b/producer/event_monitor_producer/src/shared_event_list.cpp @@ -3,7 +3,7 @@ #include <algorithm> -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; namespace asapo { @@ -11,7 +11,7 @@ FilesToSend SharedEventList::GetAndClearEvents() { std::lock_guard<std::mutex> lock(mutex_); FilesToSend events; for (auto it = events_.begin(); it != events_.end(); /* NOTHING */) { - uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( system_clock::now() - + uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( high_resolution_clock::now() - it->time).count(); if (!it->apply_delay || elapsed_ms > kFileDelayMs) { GetDefaultEventMonLogger()->Debug("file considered closed or file moved: " + it->file_name); @@ -29,9 +29,9 @@ void SharedEventList::AddEvent(std::string event, bool apply_delay) { return e.file_name == event; }); if ( events_.end() == findIter ) { - events_.emplace_back(SingleEvent{std::move(event), system_clock::now(), apply_delay}); + events_.emplace_back(SingleEvent{std::move(event), high_resolution_clock::now(), apply_delay}); } else { - findIter->time = system_clock::now(); + findIter->time = high_resolution_clock::now(); } } } diff --git a/producer/event_monitor_producer/src/shared_event_list.h b/producer/event_monitor_producer/src/shared_event_list.h index 02e2aa421..3db154f0b 100644 --- a/producer/event_monitor_producer/src/shared_event_list.h +++ b/producer/event_monitor_producer/src/shared_event_list.h @@ -15,7 +15,7 @@ const uint64_t kFileDelayMs = 500; struct SingleEvent { std::string file_name; - std::chrono::system_clock::time_point time; + std::chrono::high_resolution_clock::time_point time; bool apply_delay; }; diff --git a/receiver/src/data_cache.cpp b/receiver/src/data_cache.cpp index dbcb01fd2..f038705d7 100644 --- a/receiver/src/data_cache.cpp +++ b/receiver/src/data_cache.cpp @@ -59,7 +59,7 @@ void* DataCache::GetFreeSlotAndLock(uint64_t size, CacheMeta** meta) { uint64_t DataCache::GetNextId() { counter_++; - std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); + std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); uint32_t timeMillis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count(); return (uint64_t) timeMillis << 32 | counter_; } diff --git a/receiver/src/request_handler/request_handler_authorize.cpp b/receiver/src/request_handler/request_handler_authorize.cpp index 3b5f7fc59..a141dfe3d 100644 --- a/receiver/src/request_handler/request_handler_authorize.cpp +++ b/receiver/src/request_handler/request_handler_authorize.cpp @@ -5,7 +5,7 @@ #include "json_parser/json_parser.h" -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; namespace asapo { @@ -58,7 +58,7 @@ Error RequestHandlerAuthorize::Authorize(Request* request, const char* source_cr beamline_ + ", beamtime id: " + beamtime_id_ + ", stream: " + stream_); } - last_updated_ = system_clock::now(); + last_updated_ = high_resolution_clock::now(); cached_source_credentials_ = source_credentials; return nullptr; @@ -88,7 +88,7 @@ Error RequestHandlerAuthorize::ProcessReAuthorization(Request* request) const { bool RequestHandlerAuthorize::NeedReauthorize() const { uint64_t elapsed_ms = (uint64_t) std::chrono::duration_cast<std::chrono::milliseconds> - (system_clock::now() - last_updated_).count(); + (high_resolution_clock::now() - last_updated_).count(); return elapsed_ms >= GetReceiverConfig()->authorization_interval_ms; } diff --git a/receiver/src/request_handler/request_handler_authorize.h b/receiver/src/request_handler/request_handler_authorize.h index 7d6af9aab..f808d59cb 100644 --- a/receiver/src/request_handler/request_handler_authorize.h +++ b/receiver/src/request_handler/request_handler_authorize.h @@ -27,7 +27,7 @@ class RequestHandlerAuthorize final: public ReceiverRequestHandler { mutable std::string online_path_; mutable SourceType source_type_; mutable std::string cached_source_credentials_; - mutable std::chrono::system_clock::time_point last_updated_; + mutable std::chrono::high_resolution_clock::time_point last_updated_; Error ProcessAuthorizationRequest(Request* request) const; Error ProcessOtherRequest(Request* request) const; Error Authorize(Request* request, const char* source_credentials) const; diff --git a/receiver/src/request_handler/request_handler_db_stream_info.cpp b/receiver/src/request_handler/request_handler_db_stream_info.cpp index 648967082..82689f7d6 100644 --- a/receiver/src/request_handler/request_handler_db_stream_info.cpp +++ b/receiver/src/request_handler/request_handler_db_stream_info.cpp @@ -21,7 +21,7 @@ Error RequestHandlerDbStreamInfo::ProcessRequest(Request* request) const { log__->Debug(std::string{"get stream info from "} + col_name + " in " + db_name_ + " at " + GetReceiverConfig()->database_uri); info.name = request->GetSubstream(); - request->SetResponseMessage(info.Json(), ResponseMessageType::kInfo); + request->SetResponseMessage(info.Json(true), ResponseMessageType::kInfo); } return err; } diff --git a/receiver/src/request_handler/request_handler_db_write.cpp b/receiver/src/request_handler/request_handler_db_write.cpp index fdeefa68c..76eb907c4 100644 --- a/receiver/src/request_handler/request_handler_db_write.cpp +++ b/receiver/src/request_handler/request_handler_db_write.cpp @@ -86,7 +86,7 @@ FileInfo RequestHandlerDbWrite::PrepareFileInfo(const Request* request) const { file_info.buf_id = request->GetSlotId(); file_info.source = GetReceiverConfig()->dataserver.advertise_uri; file_info.metadata = request->GetMetaData(); - file_info.timestamp = std::chrono::system_clock::now(); + file_info.timestamp = std::chrono::high_resolution_clock::now(); return file_info; } diff --git a/receiver/src/statistics/receiver_statistics.cpp b/receiver/src/statistics/receiver_statistics.cpp index 9f4458a81..406334e08 100644 --- a/receiver/src/statistics/receiver_statistics.cpp +++ b/receiver/src/statistics/receiver_statistics.cpp @@ -2,7 +2,7 @@ namespace asapo { -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; @@ -31,13 +31,13 @@ void ReceiverStatistics::ResetStatistics() noexcept { void ReceiverStatistics::StartTimer(const StatisticEntity& entity) noexcept { current_statistic_entity_ = entity; - current_timer_last_timepoint_ = system_clock::now(); + current_timer_last_timepoint_ = high_resolution_clock::now(); } void ReceiverStatistics::StopTimer() noexcept { auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds> - (system_clock::now() - current_timer_last_timepoint_); + (high_resolution_clock::now() - current_timer_last_timepoint_); time_counters_[current_statistic_entity_] += elapsed; } diff --git a/receiver/src/statistics/receiver_statistics.h b/receiver/src/statistics/receiver_statistics.h index cbd30bd5a..877eb5453 100644 --- a/receiver/src/statistics/receiver_statistics.h +++ b/receiver/src/statistics/receiver_statistics.h @@ -23,7 +23,7 @@ class ReceiverStatistics : public Statistics { StatisticsToSend PrepareStatisticsToSend() const noexcept override; void ResetStatistics() noexcept override; uint64_t GetElapsedMs(StatisticEntity entity) const noexcept; - std::chrono::system_clock::time_point current_timer_last_timepoint_; + std::chrono::high_resolution_clock::time_point current_timer_last_timepoint_; StatisticEntity current_statistic_entity_ = StatisticEntity::kDatabase; std::chrono::nanoseconds time_counters_[kNStatisticEntities]; }; diff --git a/receiver/src/statistics/statistics.cpp b/receiver/src/statistics/statistics.cpp index b35da64b1..e33421385 100644 --- a/receiver/src/statistics/statistics.cpp +++ b/receiver/src/statistics/statistics.cpp @@ -4,7 +4,7 @@ #include <algorithm> -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; namespace asapo { @@ -34,7 +34,7 @@ StatisticsToSend Statistics::PrepareStatisticsToSend() const noexcept { uint64_t Statistics::GetTotalElapsedMs() const noexcept { return std::chrono::duration_cast<std::chrono::milliseconds> - ( system_clock::now() - last_timepoint_).count(); + ( high_resolution_clock::now() - last_timepoint_).count(); } @@ -43,7 +43,7 @@ void Statistics::SetWriteInterval(uint64_t interval_ms) { } void Statistics::ResetStatistics() noexcept { - last_timepoint_ = system_clock::now(); + last_timepoint_ = high_resolution_clock::now(); nrequests_ = 0; volume_counter_ = 0; } diff --git a/receiver/src/statistics/statistics.h b/receiver/src/statistics/statistics.h index b7e23a615..05a2c6878 100644 --- a/receiver/src/statistics/statistics.h +++ b/receiver/src/statistics/statistics.h @@ -42,7 +42,7 @@ class Statistics { void Send() noexcept; uint64_t GetTotalElapsedMs() const noexcept; uint64_t nrequests_; - std::chrono::system_clock::time_point last_timepoint_; + std::chrono::high_resolution_clock::time_point last_timepoint_; uint64_t volume_counter_; unsigned int write_interval_; std::vector<std::pair<std::string, std::string>> tags_; diff --git a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp index 03388c8d0..13e9d213b 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp @@ -70,7 +70,7 @@ class DbMetaStreamInfoTests : public Test { GenericRequestHeader request_header; expected_stream_info.last_id = 10; expected_stream_info.name = expected_substream; - expected_stream_info.timestamp = std::chrono::time_point<std::chrono::system_clock>(std::chrono::milliseconds(1)); + expected_stream_info.timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(std::chrono::milliseconds(1)); request_header.data_id = 0; handler.db_client__ = std::unique_ptr<asapo::Database> {&mock_db}; handler.log__ = &mock_logger; diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp index 66d7e9afc..b21270ce0 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp @@ -121,7 +121,7 @@ MATCHER_P(CompareFileInfo, file, "") { if (arg.id != file.id) return false; if (arg.metadata != file.metadata) return false; - if (arg.timestamp<std::chrono::system_clock::now()-std::chrono::seconds (5)) { + if (arg.timestamp<std::chrono::high_resolution_clock::now()-std::chrono::seconds (5)) { return false; } diff --git a/tests/automatic/consumer/consumer_api/check_linux.sh b/tests/automatic/consumer/consumer_api/check_linux.sh index 8d58e5cda..47eb27314 100644 --- a/tests/automatic/consumer/consumer_api/check_linux.sh +++ b/tests/automatic/consumer/consumer_api/check_linux.sh @@ -28,7 +28,7 @@ sleep 1 for i in `seq 1 10`; do - echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'$i'","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} done for i in `seq 1 5`; @@ -38,15 +38,15 @@ done for i in `seq 1 5`; do - echo 'db.data_stream2.insert({"_id":'$i',"size":6,"name":"'2$i'","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data_stream2.insert({"_id":'$i',"size":6,"name":"'2$i'","timestamp":2,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} done echo hello1 > 1 - $@ 127.0.0.1:8400 $beamtime_id $token_test_run single + #check datasets echo "db.dropDatabase()" | mongo ${database_name} diff --git a/tests/automatic/consumer/consumer_api/check_windows.bat b/tests/automatic/consumer/consumer_api/check_windows.bat index 227c53f19..e11cbff14 100644 --- a/tests/automatic/consumer/consumer_api/check_windows.bat +++ b/tests/automatic/consumer/consumer_api/check_windows.bat @@ -8,11 +8,11 @@ set token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= call start_services.bat -for /l %%x in (1, 1, 10) do echo db.data_default.insert({"_id":%%x,"size":6,"name":"%%x","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error +for /l %%x in (1, 1, 10) do echo db.data_default.insert({"_id":%%x,"size":6,"name":"%%x","timestamp":0,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error for /l %%x in (1, 1, 5) do echo db.data_stream1.insert({"_id":%%x,"size":6,"name":"1%%x","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error -for /l %%x in (1, 1, 5) do echo db.data_stream2.insert({"_id":%%x,"size":6,"name":"2%%x","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error +for /l %%x in (1, 1, 5) do echo db.data_stream2.insert({"_id":%%x,"size":6,"name":"2%%x","timestamp":2,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error echo hello1 > 1 diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 80b8ae63a..e21e446e3 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -134,21 +134,18 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str M_AssertTrue(err == nullptr, "GetNext stream2 no error"); M_AssertTrue(fi.name == "21", "GetNext stream2 filename"); - auto substreams = broker->GetSubstreamList(&err); + auto substreams = broker->GetSubstreamList("",&err); M_AssertTrue(err == nullptr, "GetSubstreamList no error"); M_AssertTrue(substreams.size() == 3, "substreams.size"); M_AssertTrue(substreams[0].name == "default", "substreams0.name1"); M_AssertTrue(substreams[1].name == "stream1", "substreams1.name2"); M_AssertTrue(substreams[2].name == "stream2", "substreams2.name3"); - std::cout<<substreams[0].Json()<<std::endl; - std::cout<<substreams[1].Json()<<std::endl; - std::cout<<substreams[2].Json()<<std::endl; - M_AssertTrue(substreams[0].last_id == 10, "substreams0.lastid"); - M_AssertTrue(substreams[1].last_id == 10, "substreams1.lastid"); - M_AssertTrue(substreams[2].last_id == 10, "substreams2.lastid"); - M_AssertTrue(asapo::NanosecsEpochFromTimePoint(substreams[0].timestamp) == 1, "substreams0.timestamp"); + std::cout<<substreams[0].Json(false)<<std::endl; + std::cout<<substreams[1].Json(false)<<std::endl; + std::cout<<substreams[2].Json(false)<<std::endl; + M_AssertTrue(asapo::NanosecsEpochFromTimePoint(substreams[0].timestamp) == 0, "substreams0.timestamp"); M_AssertTrue(asapo::NanosecsEpochFromTimePoint(substreams[1].timestamp) == 1, "substreams1.timestamp"); - M_AssertTrue(asapo::NanosecsEpochFromTimePoint(substreams[2].timestamp) == 1, "substreams2.timestamp"); + M_AssertTrue(asapo::NanosecsEpochFromTimePoint(substreams[2].timestamp) == 2, "substreams2.timestamp"); // acknowledges auto id = broker->GetLastAcknowledgedTulpeId(group_id, &err); diff --git a/tests/automatic/consumer/consumer_api_python/check_linux.sh b/tests/automatic/consumer/consumer_api_python/check_linux.sh index f359bd4e2..26c738cae 100644 --- a/tests/automatic/consumer/consumer_api_python/check_linux.sh +++ b/tests/automatic/consumer/consumer_api_python/check_linux.sh @@ -35,19 +35,19 @@ echo -n hello1 > $source_path/1_1 for i in `seq 1 5`; do - echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'$i'","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} done echo 'db.data_streamfts.insert({"_id":'1',"size":0,"name":"'1'","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} for i in `seq 1 5`; do - echo 'db.data_stream1.insert({"_id":'$i',"size":6,"name":"'1$i'","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data_stream1.insert({"_id":'$i',"size":6,"name":"'1$i'","timestamp":2,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} done for i in `seq 1 5`; do - echo 'db.data_stream2.insert({"_id":'$i',"size":6,"name":"'2$i'","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data_stream2.insert({"_id":'$i',"size":6,"name":"'2$i'","timestamp":3,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} done sleep 1 diff --git a/tests/automatic/consumer/consumer_api_python/check_windows.bat b/tests/automatic/consumer/consumer_api_python/check_windows.bat index a611558d0..66e9c2572 100644 --- a/tests/automatic/consumer/consumer_api_python/check_windows.bat +++ b/tests/automatic/consumer/consumer_api_python/check_windows.bat @@ -12,13 +12,13 @@ set token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= call start_services.bat -for /l %%x in (1, 1, 5) do echo db.data_default.insert({"_id":%%x,"size":6,"name":"%%x","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error +for /l %%x in (1, 1, 5) do echo db.data_default.insert({"_id":%%x,"size":6,"name":"%%x","timestamp":0,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error echo db.data_streamfts.insert({"_id":1,"size":0,"name":"1","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error -for /l %%x in (1, 1, 5) do echo db.data_stream1.insert({"_id":%%x,"size":6,"name":"1%%x","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error +for /l %%x in (1, 1, 5) do echo db.data_stream1.insert({"_id":%%x,"size":6,"name":"1%%x","timestamp":2,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error -for /l %%x in (1, 1, 5) do echo db.data_stream2.insert({"_id":%%x,"size":6,"name":"2%%x","timestamp":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error +for /l %%x in (1, 1, 5) do echo db.data_stream2.insert({"_id":%%x,"size":6,"name":"2%%x","timestamp":3,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error mkdir %source_path% diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index 2fba3c8a2..3bbdc6dfd 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -123,11 +123,13 @@ def check_single(broker,group_id): _, meta = broker.get_next(group_id,"stream2", meta_only=True) assert_metaname(meta,"21","get next stream2") - substreams = broker.get_substream_list() + substreams = broker.get_substream_list("") assert_eq(len(substreams),4,"number of substreams") - assert_eq(substreams[0],"default","substreams_name1") - assert_eq(substreams[1],"stream1","substreams_name2") - assert_eq(substreams[2],"stream2","substreams_name3") + print(substreams) + assert_eq(substreams[0]["name"],"default","substreams_name1") + assert_eq(substreams[1]["name"],"streamfts","substreams_name2") + assert_eq(substreams[2]["name"],"stream1","substreams_name2") + assert_eq(substreams[3]["name"],"stream2","substreams_name3") #acks try: diff --git a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp index f36e4de35..22d90fb08 100644 --- a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp +++ b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp @@ -38,7 +38,7 @@ int main(int argc, char* argv[]) { fi.size = 100; fi.name = "relpath/1"; fi.id = args.file_id; - fi.timestamp = std::chrono::system_clock::now(); + fi.timestamp = std::chrono::high_resolution_clock::now(); fi.buf_id = 18446744073709551615ull; fi.source = "host:1234"; diff --git a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp index d54e63487..24c0413da 100644 --- a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp +++ b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp @@ -38,7 +38,7 @@ int main(int argc, char* argv[]) { fi.size = 100; fi.name = "relpath/1"; uint64_t subset_id = args.file_id; - fi.timestamp = std::chrono::system_clock::now(); + fi.timestamp = std::chrono::high_resolution_clock::now(); fi.buf_id = 18446744073709551615ull; fi.source = "host:1234"; fi.id = 10; diff --git a/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp b/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp index d8951bf79..1b7265fb6 100644 --- a/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp +++ b/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp @@ -4,7 +4,7 @@ #include <thread> -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; struct Args { diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index 658af3435..e4d635fa5 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -10,7 +10,7 @@ #include "asapo_consumer.h" -using std::chrono::system_clock; +using std::chrono::high_resolution_clock; using asapo::Error; std::string group_id = ""; @@ -77,8 +77,8 @@ std::vector<std::thread> StartThreads(const Args& params, lock.unlock(); - auto start = system_clock::now(); - while (std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count() < + auto start = high_resolution_clock::now(); + while (std::chrono::duration_cast<std::chrono::milliseconds>(high_resolution_clock::now() - start).count() < params.timeout_ms) { if (params.datasets) { auto dataset = broker->GetLastDataset(group_id, &err); @@ -118,7 +118,7 @@ std::vector<std::thread> StartThreads(const Args& params, int ReadAllData(const Args& params, uint64_t* duration_ms, int* nerrors, int* nbuf, int* nfiles_total, asapo::NetworkConnectionType* connection_type) { asapo::FileInfo fi; - system_clock::time_point t1 = system_clock::now(); + high_resolution_clock::time_point t1 = high_resolution_clock::now(); std::vector<int> nfiles(params.nthreads, 0); std::vector<int> errors(params.nthreads, 0); @@ -134,7 +134,7 @@ int ReadAllData(const Args& params, uint64_t* duration_ms, int* nerrors, int* nb *nbuf = std::accumulate(nfiles_frombuf.begin(), nfiles_frombuf.end(), 0); *nfiles_total = std::accumulate(nfiles_total_in_datasets.begin(), nfiles_total_in_datasets.end(), 0); - system_clock::time_point t2 = system_clock::now(); + high_resolution_clock::time_point t2 = high_resolution_clock::now(); auto duration_read = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1); *duration_ms = duration_read.count(); -- GitLab