diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index b219654128adefd675da8199549614cdb9e8ed15..c55161c5ff191d779dbb741e044749d96b62720b 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -59,26 +59,10 @@ type SizeRecord struct { type Mongodb struct { client *mongo.Client timeout time.Duration - databases []string parent_db *Mongodb db_pointers_created map[string]bool } -func (db *Mongodb) databaseInList(dbname string) bool { - dbListLock.RLock() - defer dbListLock.RUnlock() - return utils.StringInSlice(dbname, db.databases) -} - -func (db *Mongodb) updateDatabaseList() (err error) { - dbListLock.Lock() - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - db.databases, err = db.client.ListDatabaseNames(ctx, bson.M{}) - dbListLock.Unlock() - return err -} - func (db *Mongodb) Ping() (err error) { if db.client == nil { return &DBError{utils.StatusServiceUnavailable, no_session_msg} @@ -88,22 +72,6 @@ func (db *Mongodb) Ping() (err error) { return db.client.Ping(ctx, nil) } -func (db *Mongodb) dataBaseExist(dbname string) (err error) { - if db.databaseInList(dbname) { - return nil - } - - if err := db.updateDatabaseList(); err != nil { - return err - } - - if !db.databaseInList(dbname) { - return &DBError{utils.StatusWrongInput, "stream not found: " + dbname} - } - - return nil -} - func (db *Mongodb) Connect(address string) (err error) { if db.client != nil { return &DBError{utils.StatusServiceUnavailable, already_connected_msg} @@ -122,13 +90,7 @@ func (db *Mongodb) Connect(address string) (err error) { } // db.client.SetSafe(&mgo.Safe{J: true}) - - if err := db.updateDatabaseList(); err != nil { - db.Close() - return err - } - - return + return db.Ping() } func (db *Mongodb) Close() { diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index c0e13537896aafd3af6fb30d312d04d4244a2df1..db3bfda62dad0f795fbb5bbd971a2bbd02e8411e 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -62,7 +62,7 @@ func cleanup() { // go_integration_test(${TARGET_NAME}-connectdb "./..." "MongoDBConnect") func TestMongoDBConnectFails(t *testing.T) { err := db.Connect("blabla") - defer cleanup() + defer db.Close() assert.NotNil(t, err) } diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 39e5590f058130bda5dc16da5f81652526d8aec9..84cdc1a1f5760ed58a22c954505e832709e5aa11 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -70,7 +70,8 @@ struct GenericRequestHeader { char substream[kMaxMessageSize]; std::string Json() { std::string s = "{\"id\":" + std::to_string(data_id) + "," - "\"buffer\":\"" + std::string(message) + "\"" + "\"buffer\":\"" + std::string(message) + "\"" + "," + "\"substream\":\"" + std::string(substream) + "\"" + "}"; return s; }; diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 38597a5547752b0e7694766b185a81931cac70a9..b90bf1b446b88e65239800129f5ddb1b32c1cd26 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -165,6 +165,8 @@ cdef class PyProducer: :type user_meta: JSON string :param subset: a tuple with two int values (subset id, subset size), default None :type subset: tuple + :param substream: substream name, default "default" + :type substream: string :param ingest_mode: ingest mode flag :type ingest_mode: int :param callback: callback function, default None @@ -210,6 +212,8 @@ cdef class PyProducer: :type user_meta: JSON string :param subset: a tuple with two int values (subset id, subset size), default None :type subset: tuple + :param substream: substream name, default "default" + :type substream: string :param ingest_mode: ingest mode flag :type ingest_mode: int :param callback: callback function, default None diff --git a/tests/automatic/producer/python_api/check_linux.sh b/tests/automatic/producer/python_api/check_linux.sh index 6cd5fa90823e713b6ab0461db3aa7981e714dc4c..6110cbc43e223615111ca0fc7f84d724e3ebaa98 100644 --- a/tests/automatic/producer/python_api/check_linux.sh +++ b/tests/automatic/producer/python_api/check_linux.sh @@ -41,7 +41,7 @@ sleep 1 $1 $3 $stream $beamtime_id "127.0.0.1:8400" > out || cat out cat out -cat out | grep "successfuly sent" | wc -l | grep 10 +cat out | grep "successfuly sent" | wc -l | grep 11 cat out | grep "local i/o error" cat out | grep "already have record with same id" | wc -l | grep 4 cat out | grep "duplicate" | wc -l | grep 4 diff --git a/tests/automatic/producer/python_api/check_windows.bat b/tests/automatic/producer/python_api/check_windows.bat index aec0dd80126a6b8a868105c866f68d3396f72b06..352cbfd3c653ae66f805c44a67ccd45e18a686a2 100644 --- a/tests/automatic/producer/python_api/check_windows.bat +++ b/tests/automatic/producer/python_api/check_windows.bat @@ -22,7 +22,7 @@ set PYTHONPATH=%2 type out set NUM=0 for /F %%N in ('find /C "successfuly sent" ^< "out"') do set NUM=%%N -echo %NUM% | findstr 10 || goto error +echo %NUM% | findstr 11 || goto error for /F %%N in ('find /C "} wrong input: Bad request: already have record with same id" ^< "out"') do set NUM=%%N echo %NUM% | findstr 2 || goto error diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index a0f559f6f50a69e4e9bf17ea8af062308c3e8212..3146c57b82bcf9e77ac93e5e4e18d5b8b4115287 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -85,6 +85,9 @@ producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", producer.send_data(6, stream+"/"+"file8",None, ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) +#send to another substream +producer.send_data(1, stream+"/"+"file9",None, + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, substream="substream", callback = callback) producer.wait_requests_finished(50000)