diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 2ccfd229f1da583bd8a325e007422fe3da1d1dcd..dc3b81ace0e6fd64c528a8029d92af660e433377 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -73,7 +73,7 @@ func (db *Mongodb) dataBaseExist(dbname string) (err error) { } if !db.databaseInList(dbname) { - return errors.New("dataset not found: " + dbname) + return &DBError{utils.StatusWrongInput, "stream not found: " + dbname} } return nil @@ -81,7 +81,7 @@ func (db *Mongodb) dataBaseExist(dbname string) (err error) { func (db *Mongodb) Connect(address string) (err error) { if db.session != nil { - return errors.New(already_connected_msg) + return &DBError{utils.StatusServiceUnavailable, already_connected_msg} } db.session, err = mgo.DialWithTimeout(address, time.Second) @@ -108,14 +108,14 @@ func (db *Mongodb) Close() { func (db *Mongodb) DeleteAllRecords(dbname string) (err error) { if db.session == nil { - return errors.New(no_session_msg) + return &DBError{utils.StatusServiceUnavailable, no_session_msg} } return db.session.DB(dbname).DropDatabase() } func (db *Mongodb) InsertRecord(dbname string, s interface{}) error { if db.session == nil { - return errors.New(no_session_msg) + return &DBError{utils.StatusServiceUnavailable, no_session_msg} } c := db.session.DB(dbname).C(data_collection_name) @@ -125,7 +125,7 @@ func (db *Mongodb) InsertRecord(dbname string, s interface{}) error { func (db *Mongodb) InsertMeta(dbname string, s interface{}) error { if db.session == nil { - return errors.New(no_session_msg) + return &DBError{utils.StatusServiceUnavailable, no_session_msg} } c := db.session.DB(dbname).C(meta_collection_name) @@ -133,7 +133,7 @@ func (db *Mongodb) InsertMeta(dbname string, s interface{}) error { return c.Insert(s) } -func (db *Mongodb) getMaxIndex(dbname string, dataset bool) (max_id int) { +func (db *Mongodb) getMaxIndex(dbname string, dataset bool) (max_id int,err error) { c := db.session.DB(dbname).C(data_collection_name) var id Pointer var q bson.M @@ -142,11 +142,11 @@ func (db *Mongodb) getMaxIndex(dbname string, dataset bool) (max_id int) { } else { q = nil } - err := c.Find(q).Sort("-_id").Select(bson.M{"_id": 1}).One(&id) - if err != nil { - return 0 + err = c.Find(q).Sort("-_id").Select(bson.M{"_id": 1}).One(&id) + if err == mgo.ErrNotFound { + return 0,nil } - return id.ID + return id.ID,err } func (db *Mongodb) createLocationPointers(dbname string, group_id string) (err error) { @@ -179,8 +179,10 @@ func (db *Mongodb) incrementField(dbname string, group_id string, max_ind int, r _, err = c.Find(q).Apply(change, res) if err == mgo.ErrNotFound { return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind)} + } else if err !=nil { // we do not know if counter was updated + return &DBError{utils.StatusTransactionInterrupted, err.Error()} } - return err + return nil } func encodeAnswer(id, id_max int) string { @@ -218,17 +220,20 @@ func (db *Mongodb) GetRecordByIDRow(dbname string, id, id_max int, dataset bool) func (db *Mongodb) GetRecordByID(dbname string, group_id string, id_str string, dataset bool) ([]byte, error) { id, err := strconv.Atoi(id_str) if err != nil { - return nil, err + return nil, &DBError{utils.StatusWrongInput, err.Error()} } if err := db.checkDatabaseOperationPrerequisites(dbname, group_id); err != nil { return nil, err } - max_ind := db.getMaxIndex(dbname, dataset) - res, err := db.GetRecordByIDRow(dbname, id, max_ind, dataset) + max_ind,err := db.getMaxIndex(dbname, dataset) + if err != nil { + return nil,err + } + + return db.GetRecordByIDRow(dbname, id, max_ind, dataset) - return res, err } func (db *Mongodb) needCreateLocationPointersInDb(group_id string) bool { @@ -264,11 +269,11 @@ func (db *Mongodb) getParentDB() *Mongodb { func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, group_id string) error { if db.session == nil { - return &DBError{utils.StatusError, no_session_msg} + return &DBError{utils.StatusServiceUnavailable, no_session_msg} } if err := db.getParentDB().dataBaseExist(db_name); err != nil { - return &DBError{utils.StatusWrongInput, err.Error()} + return err } if len(group_id) > 0 { @@ -278,10 +283,13 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, group_id } func (db *Mongodb) getCurrentPointer(db_name string, group_id string, dataset bool) (Pointer, int, error) { - max_ind := db.getMaxIndex(db_name, dataset) + max_ind,err := db.getMaxIndex(db_name, dataset) + if err != nil { + return Pointer{}, 0, err + } var curPointer Pointer - err := db.incrementField(db_name, group_id, max_ind, &curPointer) + err = db.incrementField(db_name, group_id, max_ind, &curPointer) if err != nil { return Pointer{}, 0, err } @@ -303,16 +311,19 @@ func (db *Mongodb) GetNextRecord(db_name string, group_id string, dataset bool) log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + ", groupid: " + group_id logger.Debug(log_str) return db.GetRecordByIDRow(db_name, curPointer.Value, max_ind, dataset) - } + func (db *Mongodb) GetLastRecord(db_name string, group_id string, dataset bool) ([]byte, error) { if err := db.checkDatabaseOperationPrerequisites(db_name, group_id); err != nil { return nil, err } - max_ind := db.getMaxIndex(db_name, dataset) + max_ind,err := db.getMaxIndex(db_name, dataset) + if err !=nil { + return nil,err + } res, err := db.GetRecordByIDRow(db_name, max_ind, max_ind, dataset) db.setCounter(db_name, group_id, max_ind) diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 12a4ba197e58b3479604c39b09881db5b6dd689c..92e84b252713d000c9bd0a3352ca8a471472eb00 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -67,7 +67,7 @@ func TestMongoDBConnectOK(t *testing.T) { func TestMongoDBGetNextErrorWhenNotConnected(t *testing.T) { _, err := db.GetNextRecord("", groupId, false) - assert.Equal(t, utils.StatusError, err.(*DBError).Code) + assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBGetNextErrorWhenWrongDatabasename(t *testing.T) { @@ -287,7 +287,7 @@ func TestMongoDBGetSizeNoDatabase(t *testing.T) { func TestMongoDBGetRecordByIDNotConnected(t *testing.T) { _, err := db.GetRecordByID(dbname, "", "2", false) - assert.Equal(t, utils.StatusError, err.(*DBError).Code) + assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBResetCounter(t *testing.T) { diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index a912998bc3d4f697338209ab58352ce6addb53d0..e7a3df8597262afbc856d31d1afd387a25789cd6 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -68,8 +68,8 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par } func returnError(err error, log_str string) (answer []byte, code int) { + code = utils.StatusServiceUnavailable err_db, ok := err.(*database.DBError) - code = utils.StatusError if ok { code = err_db.Code } diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go index 65816108f40575b1e4c3c830dfc0d52381e0e985..c5c6c8cb9351a042bbc4afff811c3a26260454b5 100644 --- a/broker/src/asapo_broker/server/process_request_test.go +++ b/broker/src/asapo_broker/server/process_request_test.go @@ -136,7 +136,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { ExpectCopyClose(suite.mock_db) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) - suite.Equal(http.StatusInternalServerError, w.Code, "internal error") + suite.Equal(http.StatusNotFound, w.Code, "internal error") } func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() { diff --git a/common/go/src/asapo_common/utils/status_codes.go b/common/go/src/asapo_common/utils/status_codes.go index 58fef4da3eba0fb3d6962487a88b0c2fc8a4d393..9f6e061622fe87e82f779e6bc871cca099b29dc9 100644 --- a/common/go/src/asapo_common/utils/status_codes.go +++ b/common/go/src/asapo_common/utils/status_codes.go @@ -8,7 +8,8 @@ const ( ) const ( //error codes - StatusError = http.StatusInternalServerError - StatusWrongInput = http.StatusBadRequest - StatusNoData = http.StatusConflict + StatusTransactionInterrupted = http.StatusInternalServerError + StatusServiceUnavailable = http.StatusNotFound + StatusWrongInput = http.StatusBadRequest + StatusNoData = http.StatusConflict ) diff --git a/consumer/api/cpp/include/consumer/consumer_error.h b/consumer/api/cpp/include/consumer/consumer_error.h index f751b0622d535496d227a48b0ef07f70f93e8db9..62f2a2f73e403a8e83ce95ef40db03789aee5cc0 100644 --- a/consumer/api/cpp/include/consumer/consumer_error.h +++ b/consumer/api/cpp/include/consumer/consumer_error.h @@ -9,9 +9,9 @@ namespace asapo { enum class ConsumerErrorType { kNoData, kEndOfStream, - kBrokerServersNotFound, - kBrokerServerError, - kIOError, + kUnavailableService, + kInterruptedTransaction, + kLocalIOError, kWrongInput }; @@ -27,8 +27,8 @@ class ConsumerErrorData : public CustomErrorData { namespace ConsumerErrorTemplates { -auto const kIOError = ConsumerErrorTemplate{ - "i/o error", ConsumerErrorType::kIOError +auto const kLocalIOError = ConsumerErrorTemplate{ + "local i/o error", ConsumerErrorType::kLocalIOError }; auto const kEndOfStream = ConsumerErrorTemplate{ @@ -43,12 +43,12 @@ auto const kWrongInput = ConsumerErrorTemplate{ "wrong input", ConsumerErrorType::kWrongInput }; -auto const kBrokerServerError = ConsumerErrorTemplate{ - "error from broker server", ConsumerErrorType::kBrokerServerError +auto const kInterruptedTransaction = ConsumerErrorTemplate{ + "error from broker server", ConsumerErrorType::kInterruptedTransaction }; -auto const kBrokerServersNotFound = ConsumerErrorTemplate{ - "cannot find brokers", ConsumerErrorType::kBrokerServersNotFound +auto const kUnavailableService = ConsumerErrorTemplate{ + "cannot find brokers", ConsumerErrorType::kUnavailableService }; diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index 7fe7ce71fe4d7ec48cfaf5502cd9c14c2fab013b..fda7b50803b09037f2997c1ed49e5d94fa51a6fa 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -27,7 +27,7 @@ Error ErrorFromNoDataResponse(const std::string& response) { uint64_t id, id_max; auto parse_error = GetIDsFromJson(response, &id, &id_max); if (parse_error) { - return ConsumerErrorTemplates::kBrokerServerError.Generate("malformed response - " + response); + return ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response); } Error err; if (id >= id_max ) { @@ -53,13 +53,13 @@ Error ErrorFromServerResponce(const std::string& response, const HttpCode& code) case HttpCode::Unauthorized: return ConsumerErrorTemplates::kWrongInput.Generate(response); case HttpCode::InternalServerError: - return ConsumerErrorTemplates::kBrokerServerError.Generate(response); + return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response); case HttpCode::NotFound: - return ConsumerErrorTemplates::kBrokerServersNotFound.Generate(response); + return ConsumerErrorTemplates::kUnavailableService.Generate(response); case HttpCode::Conflict: return ErrorFromNoDataResponse(response); default: - return ConsumerErrorTemplates::kBrokerServerError.Generate(response); + return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response); } } @@ -96,7 +96,7 @@ Error ServerDataBroker::ProcessRequest(std::string* response, const RequestInfo& } if (err != nullptr) { current_broker_uri_ = ""; - return ConsumerErrorTemplates::kBrokerServerError.Generate("error processing request: " + err->Explain()); + return ConsumerErrorTemplates::kInterruptedTransaction.Generate("error processing request: " + err->Explain()); } return ErrorFromServerResponce(*response, code); } @@ -114,7 +114,7 @@ Error ServerDataBroker::GetBrokerUri() { err = ProcessRequest(¤t_broker_uri_, ri); if (err != nullptr || current_broker_uri_.empty()) { current_broker_uri_ = ""; - return ConsumerErrorTemplates::kBrokerServersNotFound.Generate(" on " + server_uri_ + return ConsumerErrorTemplates::kUnavailableService.Generate(" on " + server_uri_ + (err != nullptr ? ": " + err->Explain() : "")); } @@ -125,7 +125,7 @@ void ServerDataBroker::ProcessServerError(Error* err, const std::string& respons if (*err == ConsumerErrorTemplates::kNoData) { auto error_data = static_cast<const ConsumerErrorData*>((*err)->GetCustomData()); if (error_data == nullptr) { - *err = ConsumerErrorTemplates::kBrokerServerError.Generate("malformed response - " + response); + *err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response); return; } *op = std::to_string(error_data->id); @@ -156,7 +156,7 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g ProcessServerError(&err, *response, &request_suffix); - if (err == ConsumerErrorTemplates::kBrokerServerError && request_suffix == "next") { + if (err == ConsumerErrorTemplates::kInterruptedTransaction && request_suffix == "next") { return err; } @@ -207,7 +207,7 @@ Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, uint64_t } if (!info->SetFromJson(response)) { - return ConsumerErrorTemplates::kBrokerServerError.Generate(std::string("malformed response:") + response); + return ConsumerErrorTemplates::kInterruptedTransaction.Generate(std::string("malformed response:") + response); } return GetDataIfNeeded(info, data); @@ -229,7 +229,7 @@ Error ServerDataBroker::RetrieveData(FileInfo* info, FileData* data) { Error error; *data = io__->GetDataFromFile(info->FullName(source_path_), &info->size, &error); if (error) { - return ConsumerErrorTemplates::kIOError.Generate(error->Explain()); + return ConsumerErrorTemplates::kLocalIOError.Generate(error->Explain()); } return nullptr; @@ -347,7 +347,7 @@ DataSet ServerDataBroker::DecodeDatasetFromResponse(std::string response, Error* (parse_err = parser.GetArrayRawStrings("images", &vec_fi_endcoded)) || (parse_err = parser.GetUInt64("_id", &id)); if (parse_err) { - *err = ConsumerErrorTemplates::kBrokerServerError.Generate("malformed response:" + parse_err->Explain()); + *err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response:" + parse_err->Explain()); return {0, FileInfos{}}; } @@ -355,7 +355,7 @@ DataSet ServerDataBroker::DecodeDatasetFromResponse(std::string response, Error* for (auto fi_encoded : vec_fi_endcoded) { FileInfo fi; if (!fi.SetFromJson(fi_encoded)) { - *err = ConsumerErrorTemplates::kBrokerServerError.Generate("malformed response:" + fi_encoded); + *err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response:" + fi_encoded); return {0, FileInfos{}}; } res.emplace_back(fi); diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 487546662f3966ccf3d1fc55e7caa8974bc78a92..5301f836df316b6ef6bea4072fce3b7de29de7f3 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -239,7 +239,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsWrongResponseFromHttpClient) { auto err = data_broker->GetNext(&info, expected_group_id, nullptr); - ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kBrokerServerError)); + ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); ASSERT_THAT(err->Explain(), HasSubstr("malformed")); } @@ -320,7 +320,7 @@ TEST_F(ServerDataBrokerTests, GetNextImageReturnsImmediatelyOnServerError) { data_broker->SetTimeout(300); auto err = data_broker->GetNext(&info, expected_group_id, nullptr); - ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kBrokerServerError)); + ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); ASSERT_THAT(err->Explain(), HasSubstr("sss")); } @@ -349,7 +349,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsParseError) { auto err = data_broker->GetNext(&info, expected_group_id, nullptr); - ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kBrokerServerError)); + ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); } TEST_F(ServerDataBrokerTests, GetImageReturnsIfNoDataNeeded) { @@ -778,7 +778,7 @@ TEST_F(ServerDataBrokerTests, GetDataSetReturnsParseError) { asapo::Error err; auto dataset = data_broker->GetNextDataset(expected_group_id, &err); - ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kBrokerServerError)); + ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); ASSERT_THAT(dataset.content.size(), Eq(0)); ASSERT_THAT(dataset.id, Eq(0)); diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 811d1099f1b0eb46b158c2a9fe2792630c83237a..29ab7d0bcf52c324c62c0fb4481ae1aed7e461a3 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -71,9 +71,9 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil: cdef extern from "asapo_consumer.h" namespace "asapo": ErrorTemplateInterface kNoData "asapo::ConsumerErrorTemplates::kNoData" ErrorTemplateInterface kEndOfStream "asapo::ConsumerErrorTemplates::kEndOfStream" - ErrorTemplateInterface kBrokerServersNotFound "asapo::ConsumerErrorTemplates::kBrokerServersNotFound" - ErrorTemplateInterface kBrokerServerError "asapo::ConsumerErrorTemplates::kBrokerServerError" - ErrorTemplateInterface kIOError "asapo::ConsumerErrorTemplates::kIOError" + ErrorTemplateInterface kUnavailableService "asapo::ConsumerErrorTemplates::kUnavailableService" + ErrorTemplateInterface kInterruptedTransaction "asapo::ConsumerErrorTemplates::kInterruptedTransaction" + ErrorTemplateInterface kLocalIOError "asapo::ConsumerErrorTemplates::kLocalIOError" ErrorTemplateInterface kWrongInput "asapo::ConsumerErrorTemplates::kWrongInput" cdef cppclass ConsumerErrorData: uint64_t id diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index fd7d3047e61011eef2497c2276f77c3fd4f2d2e6..b4d3d413cbb04f324b0885d17a6c8fb8b80d7dee 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -34,6 +34,15 @@ class AsapoConsumerError(Exception): class AsapoWrongInputError(AsapoConsumerError): pass +class AsapoInterruptedTransactionError(AsapoConsumerError): + pass + +class AsapoUnavailableServiceError(AsapoConsumerError): + pass + +class AsapoLocalIOError(AsapoConsumerError): + pass + class AsapoEndOfStreamError(AsapoConsumerError): def __init__(self,message,id_max=None): AsapoConsumerError.__init__(self,message) @@ -45,7 +54,6 @@ class AsapoNoDataError(AsapoConsumerError): self.id_max = id_max self.id = id - cdef throw_exception(Error& err): cdef ConsumerErrorData* data if err == kEndOfStream: @@ -62,10 +70,15 @@ cdef throw_exception(Error& err): raise AsapoNoDataError(err.get().Explain()) elif err == kWrongInput: raise AsapoWrongInputError(err.get().Explain()) + elif err == kLocalIOError: + raise AsapoLocalIOError(err.get().Explain()) + elif err == kUnavailableService: + raise AsapoUnavailableServiceError(err.get().Explain()) + elif err == kInterruptedTransaction: + raise AsapoInterruptedTransactionError(err.get().Explain()) else: raise AsapoConsumerError(err.get().Explain()) - cdef class PyDataBroker: cdef DataBroker* c_broker def _op(self, op, group_id, meta_only, uint64_t id): diff --git a/deploy/docker/cluster/asapo-start b/deploy/docker/cluster/asapo-start index daa182133aae7af5c75729bb66710bcfb9773098..6b93af566596adcd14a643e6f4a1daafab0942c4 100755 --- a/deploy/docker/cluster/asapo-start +++ b/deploy/docker/cluster/asapo-start @@ -1,12 +1,9 @@ #!/usr/bin/env bash - -if [ ! -f /var/nomad/token_all ]; then +if [ ! -f /var/nomad/token ]; then nomad acl bootstrap > /var/nomad/bootstrap cat /var/nomad/bootstrap | grep Secret | awk '{print $4}' > /var/nomad/token cp /var/nomad/token $TF_VAR_service_dir/nomad_token fi -#export NOMAD_TOKEN=`cat /var/nomad/token ` - cd /var/run/asapo && terraform apply -auto-approve "$@" \ No newline at end of file diff --git a/tests/automatic/consumer/consumer_api_python/check_linux.sh b/tests/automatic/consumer/consumer_api_python/check_linux.sh index 2851220aaefacaf8378080f8fd2b055ccc5c2f58..a5007b554e13677f6ae23ded0989db4b21db4c08 100644 --- a/tests/automatic/consumer/consumer_api_python/check_linux.sh +++ b/tests/automatic/consumer/consumer_api_python/check_linux.sh @@ -7,19 +7,48 @@ database_name=${beamtime_id}_${stream} token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= set -e +trap Cleanup EXIT +function wait_mongo { +NEXT_WAIT_TIME=0 +until mongo --port 27016 --eval "db.version()" | tail -2 | grep version || [ $NEXT_WAIT_TIME -eq 30 ]; do + echo "Wait for mongo" + NEXT_WAIT_TIME=$(( NEXT_WAIT_TIME++ )) + sleep 1 +done +if (( NEXT_WAIT_TIME == 30 )); then + echo "Timeout" + exit -1 +fi +} + + +function kill_mongo { + kill -2 `ps xa | grep mongod | grep 27016 | awk '{print $1;}'` +} + + +function start_mongo { + mongod --dbpath /tmp/mongo --port 27016 --logpath /tmp/mongolog --fork +} -trap Cleanup EXIT Cleanup() { set +e - nomad stop nginx - nomad stop discovery - nomad stop broker - echo "db.dropDatabase()" | mongo ${database_name} + nomad stop nginx >/dev/null + nomad stop discovery >/dev/null + nomad stop broker >/dev/null + echo "db.dropDatabase()" | mongo --port 27016 ${database_name} >/dev/null rm 1 1_1 + kill_mongo } +sed -i 's/27017/27016/g' discovery.json.tpl + + +start_mongo +wait_mongo + nomad run nginx.nmd nomad run discovery.nmd nomad run broker.nmd @@ -27,21 +56,26 @@ nomad run broker.nmd echo hello1 > 1 echo hello1 > 1_1 - for i in `seq 1 5`; do - echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo --port 27016 ${database_name} >/dev/null done sleep 1 export PYTHONPATH=$1:${PYTHONPATH} +kill_mongo +#python consumer_api.py 127.0.0.1:8400 $source_path $beamtime_id $token_test_run broker_server_error +sleep 1 +start_mongo +wait_mongo python consumer_api.py 127.0.0.1:8400 $source_path $beamtime_id $token_test_run single + #check datasets -echo "db.dropDatabase()" | mongo ${database_name} +echo "db.dropDatabase()" | mongo --port 27016 ${database_name} > /dev/null sleep 1 @@ -53,7 +87,7 @@ do images="$images,{"_id":$j,"size":6,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}" done images=${images#?} - echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo ${database_name} + echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo --port 27016 ${database_name} >/dev/null done diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index dcbf21bd35406ee694796e9baed776f76c125abb..a41756a876a87623e083cf823d2e3e75e052c038 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -29,6 +29,16 @@ def assert_eq(val,expected,name): print ('val: ', val,' expected: ',expected) sys.exit(1) + +def check_broker_server_error(broker,group_id_new): + try: + broker.get_last(group_id_new, meta_only=True) + except asapo_consumer.AsapoUnavailableServiceError as err: + print(err) + pass + else: + exit_on_noerr("AsapoBrokerServerError") + def check_single(broker,group_id_new): _, meta = broker.get_next(group_id_new, meta_only=True) @@ -46,9 +56,17 @@ def check_single(broker,group_id_new): assert_metaname(meta,"5","get last1") assert_usermetadata(meta,"get last1") + try: + broker.get_by_id(30, group_id_new, meta_only=True) + except asapo_consumer.AsapoEndOfStreamError: + pass + else: + exit_on_noerr("get_by_id no data") + + try: _, meta = broker.get_next(group_id_new, meta_only=True) - except: + except asapo_consumer.AsapoEndOfStreamError: pass else: exit_on_noerr("get_next3") @@ -79,6 +97,22 @@ def check_single(broker,group_id_new): assert_metaname(meta,"5","get next6") assert_usermetadata(meta,"get next6") + try: + broker.get_next("bla", meta_only=True) + except asapo_consumer.AsapoWrongInputError as err: + print(err) + pass + else: + exit_on_noerr("wrong input") + + + try: + broker.get_last(group_id_new, meta_only=False) + except asapo_consumer.AsapoLocalIOError as err: + print(err) + pass + else: + exit_on_noerr("io error") images = broker.query_images("meta.test = 10") assert_eq(len(images),5,"size of query answer 1") @@ -102,6 +136,17 @@ def check_single(broker,group_id_new): else: exit_on_noerr("wrong query") + broker = asapo_consumer.create_server_broker("bla",path, beamtime,"",token,1000) + try: + broker.get_last(group_id_new, meta_only=True) + except asapo_consumer.AsapoUnavailableServiceError as err: + print(err) + pass + else: + exit_on_noerr("AsapoBrokerServersNotFound") + + + def check_dataset(broker,group_id_new): id, metas = broker.get_next_dataset(group_id_new) assert_eq(id,1,"get_next_dataset1") @@ -146,6 +191,9 @@ broker = asapo_consumer.create_server_broker(source,path, beamtime,"",token,1000 group_id_new = broker.generate_group_id() +if mode == "broker_server_error": + check_broker_server_error(broker,group_id_new) + if mode == "single": check_single(broker,group_id_new)