From 11a09135fb706cb5e609f71092f637ef9cc84028 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Fri, 29 Mar 2019 16:03:12 +0100 Subject: [PATCH] groups for workfer --- CMakeModules/prepare_asapo.cmake | 2 +- broker/src/asapo_broker/server/get_image.go | 1 + broker/src/asapo_broker/server/listroutes.go | 3 +- .../cpp/src/http_client/curl_http_client.cpp | 3 +- .../worker/getnext_broker/getnext_broker.cpp | 18 ++++- .../getnext_broker_python/check_linux.sh | 13 ++- .../getnext_broker_python/check_windows.bat | 13 ++- .../worker/getnext_broker_python/getnext.py | 14 +++- .../worker/process_folder/process_folder.cpp | 2 +- .../next_multithread_broker.cpp | 4 +- .../next_multithread_folder.cpp | 2 +- .../getlast_broker.cpp | 19 ++++- worker/api/cpp/include/worker/data_broker.h | 14 +++- worker/api/cpp/src/folder_data_broker.cpp | 9 ++- worker/api/cpp/src/folder_data_broker.h | 6 +- worker/api/cpp/src/server_data_broker.cpp | 51 +++++++++--- worker/api/cpp/src/server_data_broker.h | 11 +-- .../api/cpp/unittests/test_folder_broker.cpp | 28 +++---- .../api/cpp/unittests/test_server_broker.cpp | 80 ++++++++++++++----- worker/api/python/asapo_worker.pxd | 6 +- worker/api/python/asapo_worker.pyx.in | 26 +++--- 21 files changed, 232 insertions(+), 93 deletions(-) diff --git a/CMakeModules/prepare_asapo.cmake b/CMakeModules/prepare_asapo.cmake index 99e4f0a58..f5beccfa7 100644 --- a/CMakeModules/prepare_asapo.cmake +++ b/CMakeModules/prepare_asapo.cmake @@ -20,7 +20,7 @@ function(prepare_asapo) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json.tpl.lin.in receiver.json.tpl @ONLY) endif() - configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver.nmd.in receiver.nmd @ONLY) + configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver.nmd.in receiver.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/discovery.nmd.in discovery.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/authorizer.nmd.in authorizer.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/broker.nmd.in broker.nmd @ONLY) diff --git a/broker/src/asapo_broker/server/get_image.go b/broker/src/asapo_broker/server/get_image.go index e809cc79e..eb576e249 100644 --- a/broker/src/asapo_broker/server/get_image.go +++ b/broker/src/asapo_broker/server/get_image.go @@ -23,6 +23,7 @@ func extractRequestParameters(r *http.Request, needGroupID bool) (string, string func getImage(w http.ResponseWriter, r *http.Request, op string, id int, needGroupID bool) { r.Header.Set("Content-type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") db_name, group_id, ok := extractRequestParameters(r, needGroupID) if !ok { w.WriteHeader(http.StatusBadRequest) diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index 202fa113f..486905245 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -24,12 +24,11 @@ var listRoutes = utils.Routes{ routeGetByID, }, utils.Route{ - "GetID", + "CreateGroup", "Post", "/creategroup", routeCreateGroupID, }, - utils.Route{ "Health", "Get", diff --git a/common/cpp/src/http_client/curl_http_client.cpp b/common/cpp/src/http_client/curl_http_client.cpp index 6111559f3..8e80571f6 100644 --- a/common/cpp/src/http_client/curl_http_client.cpp +++ b/common/cpp/src/http_client/curl_http_client.cpp @@ -38,8 +38,9 @@ void SetCurlOptions(CURL* curl, bool post, const std::string& data, const std::s if (post) { curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data.c_str()); + } else { + curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L); } - } HttpCode GetResponseCode(CURL* curl) { diff --git a/examples/worker/getnext_broker/getnext_broker.cpp b/examples/worker/getnext_broker/getnext_broker.cpp index c9a497d61..0410877de 100644 --- a/examples/worker/getnext_broker/getnext_broker.cpp +++ b/examples/worker/getnext_broker/getnext_broker.cpp @@ -6,12 +6,16 @@ #include <chrono> #include <iomanip> #include <numeric> +#include <mutex> #include "asapo_worker.h" using std::chrono::high_resolution_clock; using asapo::Error; +std::string group_id = ""; +std::mutex lock; + struct Params { std::string server; std::string file_path; @@ -46,8 +50,20 @@ std::vector<std::thread> StartThreads(const Params& params, broker->SetTimeout((uint64_t) params.timeout_ms); asapo::FileData data; + lock.lock(); + + if (group_id.empty()) { + group_id = broker->GenerateNewGroupId(&err); + if (err) { + (*errors)[i] += ProcessError(err); + return; + } + } + + lock.unlock(); + while (true) { - err = broker->GetNext(&fi, params.read_data ? &data : nullptr); + err = broker->GetNext(&fi, group_id, params.read_data ? &data : nullptr); if (err == nullptr) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) { diff --git a/examples/worker/getnext_broker_python/check_linux.sh b/examples/worker/getnext_broker_python/check_linux.sh index 8008fce0a..707c48bbc 100644 --- a/examples/worker/getnext_broker_python/check_linux.sh +++ b/examples/worker/getnext_broker_python/check_linux.sh @@ -3,7 +3,7 @@ source_path=dummy database_name=test_run token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= - +group_id=bif31l2uiddd4r0q6b40 #set -e @@ -31,20 +31,25 @@ sleep 1 export PYTHONPATH=$1:${PYTHONPATH} -python getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run > out +python getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run $group_id > out cat out cat out | grep '"size": 100' cat out | grep '"_id": 1' -python getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run > out +python getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run $group_id> out cat out cat out | grep '"_id": 2' -python3 getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run > out +python3 getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run $group_id> out cat out cat out | grep '"_id": 3' +python3 getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run new> out +cat out +cat out | grep '"_id": 1' + + #echo $? diff --git a/examples/worker/getnext_broker_python/check_windows.bat b/examples/worker/getnext_broker_python/check_windows.bat index 4f327539e..7bc838965 100644 --- a/examples/worker/getnext_broker_python/check_windows.bat +++ b/examples/worker/getnext_broker_python/check_windows.bat @@ -3,6 +3,7 @@ SET database_name=test_run SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" set token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= +set group_id=bif31l2uiddd4r0q6b40 c:\opt\consul\nomad run discovery.nmd c:\opt\consul\nomad run broker.nmd @@ -14,20 +15,26 @@ for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x set PYTHONPATH=%1 -python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% > out +python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% %group_id% > out type out type out | findstr /c:"100" || goto :error type out | findstr /c:"\"_id\": 1" || goto :error -python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% > out +python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% %group_id% > out type out type out | findstr /c:"\"_id\": 2" || goto :error -python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% > out +python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% %group_id% > out type out type out | findstr /c:"\"_id\": 3" || goto :error +python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% new > out +type out +type out | findstr /c:"100" || goto :error +type out | findstr /c:"\"_id\": 1" || goto :error + + goto :clean :error diff --git a/examples/worker/getnext_broker_python/getnext.py b/examples/worker/getnext_broker_python/getnext.py index b8d10262d..f124d484a 100644 --- a/examples/worker/getnext_broker_python/getnext.py +++ b/examples/worker/getnext_broker_python/getnext.py @@ -4,11 +4,21 @@ import asapo_worker import json import sys -source, path, beamtime, token = sys.argv[1:] +source, path, beamtime, token, group_id = sys.argv[1:] broker, err = asapo_worker.create_server_broker(source,path, beamtime,token,1000) -_, meta, err = broker.get_next(meta_only=True) + +if group_id == "new": + group_id_new, err = broker.generate_group_id() + if err != None: + print ('cannot generate group id, err: ', err) + else: + print ('generated group id: ', group_id_new) +else: + group_id_new = group_id + +_, meta, err = broker.get_next(group_id_new, meta_only=True) if err != None: print ('err: ', err) else: diff --git a/examples/worker/process_folder/process_folder.cpp b/examples/worker/process_folder/process_folder.cpp index 63a6955dd..9c390e945 100644 --- a/examples/worker/process_folder/process_folder.cpp +++ b/examples/worker/process_folder/process_folder.cpp @@ -57,7 +57,7 @@ void ReadAllData(std::unique_ptr<asapo::DataBroker>* broker, Statistics* statist int nfiles = 0; uint64_t size = 0; - while ((err = (*broker)->GetNext(&file_info, &file_data)) == nullptr) { + while ((err = (*broker)->GetNext(&file_info, "", &file_data)) == nullptr) { nfiles++; size += file_info.size; } diff --git a/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp b/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp index d45921882..ecacc919d 100644 --- a/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp +++ b/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp @@ -50,11 +50,11 @@ Args GetArgs(int argc, char* argv[]) { void GetAllFromBroker(const Args& args) { asapo::Error err; auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", args.run_name, args.token, &err); - + auto group_id = broker->GenerateNewGroupId(&err); std::vector<asapo::FileInfos>file_infos(args.nthreads); auto exec_next = [&](int i) { asapo::FileInfo fi; - while ((err = broker->GetNext(&fi, nullptr)) == nullptr) { + while ((err = broker->GetNext(&fi, group_id, nullptr)) == nullptr) { file_infos[i].emplace_back(fi); } printf("%s\n", err->Explain().c_str()); diff --git a/tests/automatic/worker/next_multithread_folder/next_multithread_folder.cpp b/tests/automatic/worker/next_multithread_folder/next_multithread_folder.cpp index 9c7fb9634..434fc7ea0 100644 --- a/tests/automatic/worker/next_multithread_folder/next_multithread_folder.cpp +++ b/tests/automatic/worker/next_multithread_folder/next_multithread_folder.cpp @@ -45,7 +45,7 @@ void GetAllFromBroker(const Args& args) { std::vector<asapo::FileInfo>file_infos(args.nthreads); auto exec_next = [&](int i) { - broker->GetNext(&file_infos[i], nullptr); + broker->GetNext(&file_infos[i], "", nullptr); }; std::vector<std::thread> threads; diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index 9bdb7e7ad..95c374790 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -9,6 +9,11 @@ #include "asapo_worker.h" +#include <mutex> + +std::string group_id = ""; +std::mutex lock; + using std::chrono::high_resolution_clock; using asapo::Error; @@ -46,10 +51,22 @@ std::vector<std::thread> StartThreads(const Params& params, broker->SetTimeout(params.timeout_ms); asapo::FileData data; + lock.lock(); + + if (group_id.empty()) { + group_id = broker->GenerateNewGroupId(&err); + if (err) { + (*errors)[i] += ProcessError(err); + return; + } + } + + lock.unlock(); + auto start = high_resolution_clock::now(); while (std::chrono::duration_cast<std::chrono::milliseconds>(high_resolution_clock::now() - start).count() < params.timeout_ms) { - err = broker->GetLast(&fi, params.read_data ? &data : nullptr); + err = broker->GetLast(&fi, group_id, params.read_data ? &data : nullptr); if (err == nullptr) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) { diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h index 6f6e468b1..50d6b90ce 100644 --- a/worker/api/cpp/include/worker/data_broker.h +++ b/worker/api/cpp/include/worker/data_broker.h @@ -34,19 +34,25 @@ class DataBroker { //! Set timeout for broker operations. Default - no timeout virtual void SetTimeout(uint64_t timeout_ms) = 0; //! Receive next image. + /*! + \param err - return nullptr of operation succeed, error otherwise. + \return group id if OK, "" otherwise. + */ + virtual std::string GenerateNewGroupId(Error* err) = 0; + //! Receive last available image. /*! \param info - where to store image metadata. Can be set to nullptr only image data is needed. \param data - where to store image data. Can be set to nullptr only image metadata is needed. - \return Error if both pointers are nullptr or data cannot be read, WorkerErrorCode::OK otherwise. + \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetNext(FileInfo* info, FileData* data) = 0; + virtual Error GetNext(FileInfo* info, std::string group_id, FileData* data) = 0; //! Receive last available image. /*! \param info - where to store image metadata. Can be set to nullptr only image data is needed. \param data - where to store image data. Can be set to nullptr only image metadata is needed. - \return Error if both pointers are nullptr or data cannot be read, WorkerErrorCode::OK otherwise. + \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetLast(FileInfo* info, FileData* data) = 0; + virtual Error GetLast(FileInfo* info, std::string group_id, FileData* data) = 0; virtual ~DataBroker() = default; // needed for unique_ptr to delete itself }; diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp index 6214c505f..d2a47b65e 100644 --- a/worker/api/cpp/src/folder_data_broker.cpp +++ b/worker/api/cpp/src/folder_data_broker.cpp @@ -61,7 +61,7 @@ Error FolderDataBroker::GetFileByIndex(uint64_t nfile_to_get, FileInfo* info, Fi } -Error FolderDataBroker::GetNext(FileInfo* info, FileData* data) { +Error FolderDataBroker::GetNext(FileInfo* info, std::string group_id, FileData* data) { // could probably use atomic here, but just to make sure (tests showed no performance difference) mutex_.lock(); uint64_t nfile_to_get = ++current_file_; @@ -70,9 +70,14 @@ Error FolderDataBroker::GetNext(FileInfo* info, FileData* data) { return GetFileByIndex(nfile_to_get, info, data); } -Error FolderDataBroker::GetLast(FileInfo* info, FileData* data) { +Error FolderDataBroker::GetLast(FileInfo* info, std::string group_id, FileData* data) { uint64_t nfile_to_get = filelist_.size() - 1; return GetFileByIndex(nfile_to_get, info, data); } +std::string FolderDataBroker::GenerateNewGroupId(Error* err) { + *err = nullptr; + return ""; +} + } diff --git a/worker/api/cpp/src/folder_data_broker.h b/worker/api/cpp/src/folder_data_broker.h index cbf2005a8..78a2591f8 100644 --- a/worker/api/cpp/src/folder_data_broker.h +++ b/worker/api/cpp/src/folder_data_broker.h @@ -14,9 +14,11 @@ class FolderDataBroker final : public asapo::DataBroker { public: explicit FolderDataBroker(const std::string& source_name); Error Connect() override; - Error GetNext(FileInfo* info, FileData* data) override; - Error GetLast(FileInfo* info, FileData* data) override; + Error GetNext(FileInfo* info, std::string group_id, FileData* data) override; + Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; void SetTimeout(uint64_t timeout_ms) override {}; // to timeout in this case + std::string GenerateNewGroupId(Error* err) + override; // return "0" always and no error - no group ids for folder datra broker std::unique_ptr<asapo::IO> io__; // modified in testings to mock system calls,otherwise do not touch private: diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index 99d11e598..db61b1a8c 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -36,7 +36,7 @@ Error HttpCodeToWorkerError(const HttpCode& code) { message = WorkerErrorMessage::kNoData; return TextErrorWithType(message, ErrorType::kEndOfFile); default: - message = WorkerErrorMessage::kErrorReadingSource; + message = WorkerErrorMessage::kUnknownIOError; break; } return Error{new HttpError(message, code)}; @@ -88,10 +88,14 @@ std::string ServerDataBroker::RequestWithToken(std::string uri) { return std::move(uri) + "?token=" + token_; } -Error ServerDataBroker::ProcessRequest(std::string* response, std::string request_uri) { +Error ServerDataBroker::ProcessRequest(std::string* response, std::string request_uri, bool post) { Error err; HttpCode code; - *response = httpclient__->Get(RequestWithToken(request_uri), &code, &err); + if (post) { + *response = httpclient__->Post(RequestWithToken(request_uri), "", &code, &err); + } else { + *response = httpclient__->Get(RequestWithToken(request_uri), &code, &err); + } if (err != nullptr) { current_broker_uri_ = ""; return err; @@ -106,7 +110,7 @@ Error ServerDataBroker::GetBrokerUri() { std::string request_uri = server_uri_ + "/discovery/broker"; Error err; - err = ProcessRequest(¤t_broker_uri_, request_uri); + err = ProcessRequest(¤t_broker_uri_, request_uri, false); if (err != nullptr || current_broker_uri_.empty()) { current_broker_uri_ = ""; return TextError("cannot get broker uri from " + server_uri_); @@ -115,15 +119,15 @@ Error ServerDataBroker::GetBrokerUri() { } -Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, GetImageServerOperation op) { - std::string request_suffix = OpToUriCmd(op); +Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, std::string group_id, GetImageServerOperation op) { + std::string request_suffix = std::move(group_id) + "/" + OpToUriCmd(op); uint64_t elapsed_ms = 0; std::string response; while (true) { auto err = GetBrokerUri(); if (err == nullptr) { std::string request_api = current_broker_uri_ + "/database/" + source_name_ + "/"; - err = ProcessRequest(&response, request_api + request_suffix); + err = ProcessRequest(&response, request_api + request_suffix, false); if (err == nullptr) { break; } @@ -145,12 +149,12 @@ Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, GetImageServerOper return nullptr; } -Error ServerDataBroker::GetNext(FileInfo* info, FileData* data) { - return GetImageFromServer(GetImageServerOperation::GetNext, info, data); +Error ServerDataBroker::GetNext(FileInfo* info, std::string group_id, FileData* data) { + return GetImageFromServer(GetImageServerOperation::GetNext, std::move(group_id), info, data); } -Error ServerDataBroker::GetLast(FileInfo* info, FileData* data) { - return GetImageFromServer(GetImageServerOperation::GetLast, info, data); +Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, FileData* data) { + return GetImageFromServer(GetImageServerOperation::GetLast, std::move(group_id), info, data); } std::string ServerDataBroker::OpToUriCmd(GetImageServerOperation op) { @@ -163,12 +167,13 @@ std::string ServerDataBroker::OpToUriCmd(GetImageServerOperation op) { return ""; } -Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, FileInfo* info, FileData* data) { +Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, std::string group_id, FileInfo* info, + FileData* data) { if (info == nullptr) { return TextError(WorkerErrorMessage::kWrongInput); } - auto err = GetFileInfoFromServer(info, op); + auto err = GetFileInfoFromServer(info, std::move(group_id), op); if (err != nullptr) { return err; } @@ -202,4 +207,24 @@ Error ServerDataBroker::TryGetDataFromBuffer(const FileInfo* info, FileData* dat return net_client__->GetData(info, data); } +std::string ServerDataBroker::GenerateNewGroupId(Error* err) { + uint64_t elapsed_ms = 0; + std::string response; + while (elapsed_ms <= timeout_ms_) { + *err = GetBrokerUri(); + if (*err == nullptr) { + std::string request = current_broker_uri_ + "/creategroup"; + *err = ProcessRequest(&response, request, true); + if (*err == nullptr) { + return response; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + elapsed_ms += 100; + } + + *err = TextErrorWithType("exit on timeout, last error: " + (*err)->Explain(), asapo::ErrorType::kTimeOut); + return ""; +} + } diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index 34f8eebd3..80dfe0d7c 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -19,8 +19,9 @@ class ServerDataBroker final : public asapo::DataBroker { public: explicit ServerDataBroker(std::string server_uri, std::string source_path, std::string source_name, std::string token); Error Connect() override; - Error GetNext(FileInfo* info, FileData* data) override; - Error GetLast(FileInfo* info, FileData* data) override; + Error GetNext(FileInfo* info, std::string group_id, FileData* data) override; + Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; + std::string GenerateNewGroupId(Error* err) override; void SetTimeout(uint64_t timeout_ms) override; std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch @@ -28,12 +29,12 @@ class ServerDataBroker final : public asapo::DataBroker { std::unique_ptr<NetClient> net_client__; private: std::string RequestWithToken(std::string uri); - Error GetFileInfoFromServer(FileInfo* info, GetImageServerOperation op); + Error GetFileInfoFromServer(FileInfo* info, std::string group_id, GetImageServerOperation op); Error GetDataIfNeeded(FileInfo* info, FileData* data); Error GetBrokerUri(); void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri); - Error ProcessRequest(std::string* response, std::string request_uri); - Error GetImageFromServer(GetImageServerOperation op, FileInfo* info, FileData* data); + Error ProcessRequest(std::string* response, std::string request_uri, bool post); + Error GetImageFromServer(GetImageServerOperation op, std::string group_id, FileInfo* info, FileData* data); bool DataCanBeInBuffer(const FileInfo* info); Error TryGetDataFromBuffer(const FileInfo* info, FileData* data); std::string OpToUriCmd(GetImageServerOperation op); diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp index f7e3de194..6d8beba96 100644 --- a/worker/api/cpp/unittests/test_folder_broker.cpp +++ b/worker/api/cpp/unittests/test_folder_broker.cpp @@ -134,7 +134,7 @@ TEST_F(FolderDataBrokerTests, ConnectReturnsUnknownIOError) { } TEST_F(FolderDataBrokerTests, GetNextWithoutConnectReturnsError) { - auto err = data_broker->GetNext(nullptr, nullptr); + auto err = data_broker->GetNext(nullptr, "", nullptr); ASSERT_THAT(err->Explain(), Eq(asapo::WorkerErrorMessage::kSourceNotConnected)); } @@ -142,7 +142,7 @@ TEST_F(FolderDataBrokerTests, GetNextWithoutConnectReturnsError) { TEST_F(FolderDataBrokerTests, GetNextWithNullPointersReturnsError) { data_broker->Connect(); - auto err = data_broker->GetNext(nullptr, nullptr); + auto err = data_broker->GetNext(nullptr, "", nullptr); ASSERT_THAT(err->Explain(), Eq(asapo::WorkerErrorMessage::kWrongInput)); } @@ -151,7 +151,7 @@ TEST_F(FolderDataBrokerTests, GetNextReturnsFileInfo) { data_broker->Connect(); FileInfo fi; - auto err = data_broker->GetNext(&fi, nullptr); + auto err = data_broker->GetNext(&fi, "", nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(fi.name, Eq("1")); @@ -164,7 +164,7 @@ TEST_F(FolderDataBrokerTests, GetLastReturnsFileInfo) { data_broker->Connect(); FileInfo fi; - auto err = data_broker->GetLast(&fi, nullptr); + auto err = data_broker->GetLast(&fi, "", nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(fi.name, Eq("3")); @@ -176,9 +176,9 @@ TEST_F(FolderDataBrokerTests, GetLastSecondTimeReturnsSameFileInfo) { data_broker->Connect(); FileInfo fi; - auto err = data_broker->GetLast(&fi, nullptr); + auto err = data_broker->GetLast(&fi, "", nullptr); ASSERT_THAT(err, Eq(nullptr)); - err = data_broker->GetLast(&fi, nullptr); + err = data_broker->GetLast(&fi, "", nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(fi.name, Eq("3")); @@ -191,9 +191,9 @@ TEST_F(FolderDataBrokerTests, GetLastSecondTimeReturnsSameFileInfo) { TEST_F(FolderDataBrokerTests, SecondNextReturnsAnotherFileInfo) { data_broker->Connect(); FileInfo fi; - data_broker->GetNext(&fi, nullptr); + data_broker->GetNext(&fi, "", nullptr); - auto err = data_broker->GetNext(&fi, nullptr); + auto err = data_broker->GetNext(&fi, "", nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(fi.name, Eq("2")); @@ -204,7 +204,7 @@ TEST_F(FolderDataBrokerTests, GetNextFromEmptyFolderReturnsError) { data_broker->Connect(); FileInfo fi; - auto err = data_broker->GetNext(&fi, nullptr); + auto err = data_broker->GetNext(&fi, "", nullptr); ASSERT_TRUE(asapo::ErrorTemplates::kEndOfFile == err); ASSERT_THAT(err->Explain(), Eq(asapo::WorkerErrorMessage::kNoData)); @@ -216,7 +216,7 @@ TEST_F(FolderDataBrokerTests, GetNextReturnsErrorWhenFilePermissionsDenied) { FileInfo fi; FileData data; - auto err = data_broker->GetNext(&fi, &data); + auto err = data_broker->GetNext(&fi, "", &data); ASSERT_THAT(err->Explain(), Eq(asapo::IOErrorTemplates::kPermissionDenied.Generate()->Explain())); } @@ -244,7 +244,7 @@ TEST_F(GetDataFromFileTests, GetNextCallsGetDataFileWithFileName) { EXPECT_CALL(mock, GetDataFromFile_t(std::string("/path/to/file") + asapo::kPathSeparator + "1", _, _)). WillOnce(DoAll(testing::SetArgPointee<2>(static_cast<SimpleError*>(nullptr)), testing::Return(nullptr))); - data_broker->GetNext(&fi, &data); + data_broker->GetNext(&fi, "", &data); } @@ -252,7 +252,7 @@ TEST_F(GetDataFromFileTests, GetNextReturnsDataAndInfo) { EXPECT_CALL(mock, GetDataFromFile_t(_, _, _)). WillOnce(DoAll(testing::SetArgPointee<2>(nullptr), testing::Return(new uint8_t[1] {'1'}))); - data_broker->GetNext(&fi, &data); + data_broker->GetNext(&fi, "", &data); ASSERT_THAT(data[0], Eq('1')); ASSERT_THAT(fi.name, Eq("1")); @@ -264,7 +264,7 @@ TEST_F(GetDataFromFileTests, GetNextReturnsErrorWhenCannotReadData) { WillOnce(DoAll(testing::SetArgPointee<2>(asapo::IOErrorTemplates::kReadError.Generate().release()), testing::Return(nullptr))); - auto err = data_broker->GetNext(&fi, &data); + auto err = data_broker->GetNext(&fi, "", &data); ASSERT_THAT(err->Explain(), Eq(asapo::IOErrorTemplates::kReadError.Generate()->Explain())); } @@ -274,7 +274,7 @@ TEST_F(GetDataFromFileTests, GetNextReturnsErrorWhenCannotAllocateData) { WillOnce(DoAll(testing::SetArgPointee<2>(asapo::ErrorTemplates::kMemoryAllocationError.Generate().release()), testing::Return(nullptr))); - auto err = data_broker->GetNext(&fi, &data); + auto err = data_broker->GetNext(&fi, "", &data); ASSERT_THAT(err->Explain(), Eq(asapo::ErrorTemplates::kMemoryAllocationError.Generate()->Explain())); } diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 036c08d42..7c7216e33 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -61,6 +61,8 @@ class ServerDataBrokerTests : public Test { std::string expected_path = "/tmp/beamline/beamtime"; std::string expected_filename = "filename"; std::string expected_full_path = std::string("/tmp/beamline/beamtime") + asapo::kPathSeparator + expected_filename; + std::string expected_group_id = "groupid"; + static const uint64_t expected_buf_id = 123; void SetUp() override { data_broker = std::unique_ptr<ServerDataBroker> { @@ -123,30 +125,32 @@ TEST_F(ServerDataBrokerTests, CanConnect) { } TEST_F(ServerDataBrokerTests, GetImageReturnsErrorOnWrongInput) { - auto return_code = data_broker->GetNext(nullptr, nullptr); + auto return_code = data_broker->GetNext(nullptr, "", nullptr); ASSERT_THAT(return_code->Explain(), Eq(asapo::WorkerErrorMessage::kWrongInput)); } TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/next?token=" + expected_token, _, + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/next?token=" + + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); - data_broker->GetNext(&info, nullptr); + data_broker->GetNext(&info, expected_group_id, nullptr); } TEST_F(ServerDataBrokerTests, GetLastUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/last?token=" + expected_token, _, + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/last?token=" + + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); - data_broker->GetLast(&info, nullptr); + data_broker->GetLast(&info, expected_group_id, nullptr); } TEST_F(ServerDataBrokerTests, GetImageReturnsEOFFromHttpClient) { @@ -157,7 +161,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsEOFFromHttpClient) { SetArgPointee<2>(nullptr), Return("{\"id\":1}"))); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err, Ne(nullptr)); ASSERT_THAT(err->Explain(), HasSubstr("timeout")); @@ -171,7 +175,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNotAuthorized) { SetArgPointee<2>(nullptr), Return(""))); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err, Ne(nullptr)); ASSERT_THAT(err->Explain(), HasSubstr("authorization")); @@ -186,7 +190,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsWrongResponseFromHttpClient) { SetArgPointee<2>(nullptr), Return("id"))); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err->Explain(), HasSubstr("Cannot parse")); } @@ -199,7 +203,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsIfBrokerAddressNotFound) { Return(""))); data_broker->SetTimeout(100); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err->Explain(), AllOf(HasSubstr("broker uri"), HasSubstr("cannot"))); } @@ -212,7 +216,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsIfBrokerUriEmpty) { Return(""))); data_broker->SetTimeout(100); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err->Explain(), AllOf(HasSubstr("broker uri"), HasSubstr("cannot"))); } @@ -222,12 +226,12 @@ TEST_F(ServerDataBrokerTests, GetDoNotCallBrokerUriIfAlreadyFound) { MockGet("error_response"); data_broker->SetTimeout(100); - data_broker->GetNext(&info, nullptr); + data_broker->GetNext(&info, expected_group_id, nullptr); Mock::VerifyAndClearExpectations(&mock_http_client); EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, _)).Times(0); MockGet("error_response"); - data_broker->GetNext(&info, nullptr); + data_broker->GetNext(&info, expected_group_id, nullptr); } TEST_F(ServerDataBrokerTests, GetBrokerUriAgainAfterConnectionError) { @@ -235,12 +239,12 @@ TEST_F(ServerDataBrokerTests, GetBrokerUriAgainAfterConnectionError) { MockGetError(); data_broker->SetTimeout(0); - data_broker->GetNext(&info, nullptr); + data_broker->GetNext(&info, expected_group_id, nullptr); Mock::VerifyAndClearExpectations(&mock_http_client); MockGetBrokerUri(); MockGet("error_response"); - data_broker->GetNext(&info, nullptr); + data_broker->GetNext(&info, expected_group_id, nullptr); } TEST_F(ServerDataBrokerTests, GetImageReturnsEOFFromHttpClientUntilTimeout) { @@ -258,7 +262,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsEOFFromHttpClientUntilTimeout) { Return("{\"id\":1}"))); data_broker->SetTimeout(100); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err->Explain(), HasSubstr("timeout")); } @@ -271,7 +275,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsFileInfo) { MockGet(json); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err, Eq(nullptr)); @@ -284,7 +288,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsFileInfo) { TEST_F(ServerDataBrokerTests, GetImageReturnsParseError) { MockGetBrokerUri(); MockGet("error_response"); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err->Explain(), Eq(asapo::WorkerErrorMessage::kErrorReadingSource)); } @@ -296,7 +300,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsIfNoDtataNeeded) { EXPECT_CALL(mock_netclient, GetData_t(_, _)).Times(0); EXPECT_CALL(mock_io, GetDataFromFile_t(_, _, _)).Times(0); - data_broker->GetNext(&info, nullptr); + data_broker->GetNext(&info, expected_group_id, nullptr); } TEST_F(ServerDataBrokerTests, GetImageTriesToGetDataFromMemoryCache) { @@ -309,7 +313,7 @@ TEST_F(ServerDataBrokerTests, GetImageTriesToGetDataFromMemoryCache) { EXPECT_CALL(mock_netclient, GetData_t(&info, &data)).WillOnce(Return(nullptr)); MockReadDataFromFile(0); - data_broker->GetNext(&info, &data); + data_broker->GetNext(&info, expected_group_id, &data); ASSERT_THAT(info.buf_id, Eq(expected_buf_id)); @@ -327,7 +331,7 @@ TEST_F(ServerDataBrokerTests, GetImageCallsReadFromFileIfCannotReadFromCache) { &data)).WillOnce(Return(asapo::IOErrorTemplates::kUnknownIOError.Generate().release())); MockReadDataFromFile(); - data_broker->GetNext(&info, &data); + data_broker->GetNext(&info, expected_group_id, &data); ASSERT_THAT(info.buf_id, Eq(0)); } @@ -344,7 +348,41 @@ TEST_F(ServerDataBrokerTests, GetImageCallsReadFromFileIfZeroBufId) { MockReadDataFromFile(); - data_broker->GetNext(&info, &data); + data_broker->GetNext(&info, expected_group_id, &data); +} + + +TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsErrorCreateGroup) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Post_t(HasSubstr("creategroup"), "", _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll( + SetArgPointee<2>(HttpCode::BadRequest), + SetArgPointee<3>(nullptr), + Return(""))); + + data_broker->SetTimeout(100); + asapo::Error err; + auto groupid = data_broker->GenerateNewGroupId(&err); + if (err != nullptr ) { + ASSERT_THAT(err->Explain(), HasSubstr("timeout")); + } + ASSERT_THAT(groupid, Eq("")); +} + + +TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsGroupID) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Post_t(HasSubstr("creategroup"), "", _, _)).WillOnce(DoAll( + SetArgPointee<2>(HttpCode::OK), + SetArgPointee<3>(nullptr), + Return(expected_group_id))); + + data_broker->SetTimeout(100); + asapo::Error err; + auto groupid = data_broker->GenerateNewGroupId(&err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(groupid, Eq(expected_group_id)); } diff --git a/worker/api/python/asapo_worker.pxd b/worker/api/python/asapo_worker.pxd index 06c2a9e5e..025c84eee 100644 --- a/worker/api/python/asapo_worker.pxd +++ b/worker/api/python/asapo_worker.pxd @@ -28,9 +28,9 @@ cdef extern from "asapo_worker.h" namespace "asapo": cdef cppclass DataBroker: DataBroker() except + void SetTimeout(uint64_t timeout_ms) - Error GetNext(FileInfo* info, FileData* data) - Error GetLast(FileInfo* info, FileData* data) - + Error GetNext(FileInfo* info, string group_id, FileData* data) + Error GetLast(FileInfo* info, string group_id, FileData* data) + string GenerateNewGroupId(Error* err) cdef extern from "asapo_worker.h" namespace "asapo": cdef cppclass DataBrokerFactory: diff --git a/worker/api/python/asapo_worker.pyx.in b/worker/api/python/asapo_worker.pyx.in index 8313f7539..2bb14cded 100644 --- a/worker/api/python/asapo_worker.pyx.in +++ b/worker/api/python/asapo_worker.pyx.in @@ -26,15 +26,15 @@ cdef bytes _bytes(s): cdef class PyDataBroker: cdef DataBroker* c_broker - def _op(self, op, meta_only): + def _op(self, op, group_id, meta_only): cdef FileInfo info cdef FileData data cdef Error err cdef np.npy_intp dims[1] if op == "next": - err = self.c_broker.GetNext(&info,<FileData*>NULL if meta_only else &data) + err = self.c_broker.GetNext(&info, _bytes(group_id), <FileData*>NULL if meta_only else &data) elif op == "last": - err = self.c_broker.GetLast(&info,<FileData*>NULL if meta_only else &data) + err = self.c_broker.GetLast(&info,_bytes(group_id), <FileData*>NULL if meta_only else &data) err_str = _str(GetErrorString(&err)) if err_str.strip(): return None,None,err_str @@ -49,13 +49,19 @@ cdef class PyDataBroker: del meta['lastchange'] arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr) return arr,meta,None - - - def get_next(self, meta_only = True): - return self._op("next",meta_only) - def get_last(self, meta_only = True): - return self._op("last",meta_only) - + def get_next(self, group_id, meta_only = True): + return self._op("next",group_id,meta_only) + def get_last(self, group_id, meta_only = True): + return self._op("last",group_id,meta_only) + def generate_group_id(self): + cdef Error err + cdef string group_id + group_id = self.c_broker.GenerateNewGroupId(&err) + err_str = _str(GetErrorString(&err)) + if err_str.strip(): + return None, err_str + else: + return _str(group_id), None cdef class PyDataBrokerFactory: cdef DataBrokerFactory c_factory -- GitLab