Commit 30667345 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Created the queue for ArchiveRequests that come from RetrieveRequest

Started to queue the ArchiveRequest for Repack
parent c00e1253
......@@ -50,7 +50,7 @@ echo " Archiving ${NB_FILES} files of ${FILE_SIZE_KB}kB each"
echo " Archiving files: xrdcp as user1"
echo " Retrieving them as poweruser1"
kubectl -n ${NAMESPACE} cp client_ar.sh client:/root/client_ar.sh
kubectl -n ${NAMESPACE} exec client -- bash /root/client_ar.sh -n ${NB_FILES} -s ${FILE_SIZE_KB} -p 250 -d /eos/ctaeos/preprod -v -r || exit 1
kubectl -n ${NAMESPACE} exec client -- bash /root/client_ar.sh -n ${NB_FILES} -s ${FILE_SIZE_KB} -p 100 -d /eos/ctaeos/preprod -v -r || exit 1
kubectl -n ${NAMESPACE} exec ctaeos -- bash /root/grep_xrdlog_mgm_for_error.sh || exit 1
......
......@@ -149,5 +149,13 @@ class ArchiveQueueToReport: public ArchiveQueue {
class ArchiveQueueFailed: public ArchiveQueue {
using ArchiveQueue::ArchiveQueue;
};
class ArchiveQueueToTransferForRepack: public ArchiveQueue{
using ArchiveQueue::ArchiveQueue;
};
class ArchiveQueueToReportToRepackForSuccess : public ArchiveQueue{
using ArchiveQueue::ArchiveQueue;
};
}}
......@@ -493,4 +493,13 @@ struct ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::QueueType {
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToReportToUser;
};
template<>
struct ContainerTraits<ArchiveQueue, ArchiveQueueToTransferForRepack>::QueueType{
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToTransfer;
};
template<>
struct ContainerTraits<ArchiveQueue,ArchiveQueueToReportToRepackForSuccess>::QueueType{
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToReportToRepackForSuccess;
};
}} // namespace cta::objectstore
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ArchiveQueueAlgorithms.hpp"
namespace cta { namespace objectstore {
template<>
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToReportToRepackForSuccess>::c_containerTypeName = "ArchiveQueueToReportToRepackForSuccess";
}
}
\ No newline at end of file
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ArchiveQueueAlgorithms.hpp"
namespace cta { namespace objectstore {
template<>
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForRepack>::c_containerTypeName = "ArchiveQueueToTransferForRepack";
}
}
\ No newline at end of file
......@@ -887,4 +887,3 @@ std::string ArchiveRequest::getTapePoolForJob(uint16_t copyNumber) {
}} // namespace cta::objectstore
......@@ -66,11 +66,14 @@ add_library (ctaobjectstore SHARED
ArchiveQueueToTransferAlgorithms.cpp
ArchiveQueueToReportAlgorithms.cpp
ArchiveQueueFailedAlgorithms.cpp
ArchiveQueueToReportToRepackForSuccessAlgorithms.cpp
ArchiveQueueToTransferForRepackAlgorithms.cpp
RetrieveQueue.cpp
RetrieveQueueShard.cpp
RetrieveQueueToTransferAlgorithms.cpp
RetrieveQueueToReportAlgorithms.cpp
RetrieveQueueFailedAlgorithms.cpp
RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp
JobQueueType.cpp
ArchiveRequest.cpp
RetrieveRequest.cpp
......@@ -80,7 +83,6 @@ add_library (ctaobjectstore SHARED
RepackRequest.cpp
RepackQueue.cpp
RepackQueuePendingAlgorithms.cpp
RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp
RepackQueueToExpandAlgorithms.cpp
RepackQueueType.cpp
BackendVFS.cpp
......
......@@ -183,6 +183,7 @@ std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool
case JobQueueType::JobsToTransfer: archiveQueueNameHeader+="ToTransfer"; break;
case JobQueueType::JobsToReportToUser: archiveQueueNameHeader+="ToReport"; break;
case JobQueueType::FailedJobs: archiveQueueNameHeader+="Failed"; break;
case JobQueueType::JobsToReportToRepackForSuccess: archiveQueueNameHeader+="ToReportToRepackForSuccess"; break;
default: break;
}
std::string archiveQueueAddress = agentRef.nextId(archiveQueueNameHeader+"-"+tapePool);
......@@ -794,7 +795,7 @@ void RootEntry::clearRepackQueueAddress(RepackQueueType queueType) {
case RepackQueueType::Pending:
if (!m_payload.has_repackrequestspendingqueuepointer())
throw NoSuchRepackQueue("In RootEntry::clearRepackQueueAddress: pending queue no set.");
m_payload.mutable_repackrequestspendingqueuepointer()->Clear();
return m_payload.mutable_repackrequestspendingqueuepointer()->Clear();
case RepackQueueType::ToExpand:
if (!m_payload.has_repackrequeststoexpandqueuepointer())
throw NoSuchRepackQueue("In RootEntry::clearRepackQueueAddress: toExpand queue not set.");
......
......@@ -120,6 +120,7 @@ message RootEntry {
repeated RetrieveQueuePointer failedretrievejobsqueuepointers = 1065;
repeated RetrieveQueuePointer retrieve_queue_to_report_to_repack_for_success_pointers = 1066;
repeated ArchiveQueuePointer archivejobstoreportqueuepointers = 1068;
repeated ArchiveQueuePointer archive_jobs_to_transfer_for_repack_pointers = 1069;
optional DriveRegisterPointer driveregisterpointer = 1070;
optional AgentRegisterPointer agentregisterpointer = 1080;
optional RepackIndexPointer repackindexpointer = 1085;
......
......@@ -639,6 +639,25 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
logContext.log(log::INFO, "In OStoreDB::queueArchive(): recorded request for queueing (enqueueing posted to thread pool).");
}
void OStoreDB::queueArchiveForRepack(cta::objectstore::ArchiveRequest& request, log::LogContext& logContext){
// objectstore::ScopedExclusiveLock rReqL(request);
// request.fetch();
auto mutexForHelgrind = cta::make_unique<cta::threading::Mutex>();
cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
auto * mutexForHelgrindAddr = mutexForHelgrind.release();
std::unique_ptr<cta::objectstore::ArchiveRequest> aReqUniqPtr;
aReqUniqPtr.reset(&request);
cta::objectstore::ArchiveRequest *aReqPtr = aReqUniqPtr.release();
auto * et = new EnqueueingTask([aReqPtr, mutexForHelgrindAddr, this]{
//TODO : queue the ArchiveRequest
});
ANNOTATE_HAPPENS_BEFORE(et);
mlForHelgrind.unlock();
m_enqueueingTasksQueue.push(et);
//TODO Time measurement
logContext.log(log::INFO, "In OStoreDB::queueArchiveForRepack(): recorded request for queueing (enqueueing posted to thread pool).");
}
//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobs()
//------------------------------------------------------------------------------
......@@ -3176,7 +3195,6 @@ std::list<std::unique_ptr<cta::objectstore::RetrieveRequest>> OStoreDB::getNextS
if(jobs.elements.empty()) continue;
for(auto &j : jobs.elements)
{
//TODO : If the retrieve request has more than one job, it will be inserted. Should we filter it ?
ret.emplace_back(std::move(j.retrieveRequest));
}
return ret;
......
......@@ -267,6 +267,8 @@ public:
void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria, log::LogContext &logContext) override;
void queueArchiveForRepack(cta::objectstore::ArchiveRequest& request, log::LogContext& logContext) override;
std::map<std::string, std::list<common::dataStructures::ArchiveJob>> getArchiveJobs() const override;
......
......@@ -110,6 +110,10 @@ public:
return m_OStoreDB.queueArchive(instanceName, request, criteria, logContext);
}
void queueArchiveForRepack(cta::objectstore::ArchiveRequest& request, log::LogContext& lc) override {
return m_OStoreDB.queueArchiveForRepack(request,lc);
}
void deleteRetrieveRequest(const common::dataStructures::SecurityIdentity& cliIdentity, const std::string& remoteFile) override {
m_OStoreDB.deleteRetrieveRequest(cliIdentity, remoteFile);
}
......
......@@ -172,6 +172,30 @@ void Scheduler::queueArchiveWithGivenId(const uint64_t archiveFileId, const std:
lc.log(log::INFO, "Queued archive request");
}
void Scheduler::queueArchiveRequestForRepackBatch(std::list<cta::objectstore::ArchiveRequest> &archiveRequests,log::LogContext &lc)
{
for(auto& archiveRequest : archiveRequests){
objectstore::ScopedExclusiveLock rReqL(archiveRequest);
archiveRequest.fetch();
cta::common::dataStructures::ArchiveFile archiveFile = archiveRequest.getArchiveFile();
rReqL.release();
this->m_db.queueArchiveForRepack(archiveRequest,lc);
cta::log::TimingList tl;
utils::Timer t;
tl.insOrIncAndReset("schedulerDbTime", t);
log::ScopedParamContainer spc(lc);
spc.add("instanceName", archiveFile.diskInstance)
.add("storageClass", archiveFile.storageClass)
.add("diskFileID", archiveFile.diskFileId)
.add("fileSize", archiveFile.fileSize)
.add("fileId", archiveFile.archiveFileID);
tl.insertOrIncrement("schedulerDbTime",t.secs());
tl.addToLog(spc);
lc.log(log::INFO,"Queued repack archive request");
}
}
//------------------------------------------------------------------------------
// queueRetrieve
//------------------------------------------------------------------------------
......
......@@ -49,6 +49,7 @@
#include "scheduler/SchedulerDatabase.hpp"
#include "scheduler/RepackRequest.hpp"
#include "objectstore/RetrieveRequest.hpp"
#include "objectstore/ArchiveRequest.hpp"
#include "eos/DiskReporter.hpp"
#include "eos/DiskReporterFactory.hpp"
......@@ -135,6 +136,13 @@ public:
void queueArchiveWithGivenId(const uint64_t archiveFileId, const std::string &instanceName,
const cta::common::dataStructures::ArchiveRequest &request, log::LogContext &lc);
/**
* Queue the ArchiveRequests that have been transformed from Repack RetrieveRequests
* @param archiveRequests the list of the ArchiveRequests to queue into the ArchiveQueueToTransferForRepack queue.
* @param lc a log context allowing logging from within the scheduler routine.
*/
void queueArchiveRequestForRepackBatch(std::list<cta::objectstore::ArchiveRequest> &archiveRequests,log::LogContext &lc);
/**
* Queue a retrieve request.
* Throws a UserError exception in case of wrong request parameters (ex. unknown file id)
......
......@@ -76,6 +76,7 @@ class UserIdentity;
class RepackRequest;
namespace objectstore{
class RetrieveRequest;
class ArchiveRequest;
}
} // cta
......@@ -116,6 +117,8 @@ public:
virtual void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria,
log::LogContext &logContext) = 0;
virtual void queueArchiveForRepack(cta::objectstore::ArchiveRequest &request,log::LogContext &logContext) = 0;
/**
* Returns all of the queued archive jobs. The returned jobs are
......
......@@ -1404,29 +1404,34 @@ TEST_P(SchedulerTest, expandRepackRequest) {
//Now, we will transform the RetrieveRequests into ArchiveRequest
{
//Now, we will transform the RetrieveRequests into ArchiveRequest
std::list<std::unique_ptr<cta::objectstore::RetrieveRequest::AsyncRetrieveToArchiveTransformer>> transformers;
for (auto &retrieveRequest: listSucceededRetrieveRequests) {
std::unique_ptr<cta::objectstore::RetrieveRequest::AsyncRetrieveToArchiveTransformer> retrieveToArchiveTransformer;
retrieveRequestsAddresses.push_back(retrieveRequest->getAddressIfSet());
retrieveToArchiveTransformer.reset(retrieveRequest->asyncTransformToArchiveRequest(*agentReference));
retrieveToArchiveTransformer->wait();
transformers.emplace_back(retrieveRequest->asyncTransformToArchiveRequest(*agentReference));
}
//Wait for all the asynchronous transformations
for (auto &transformer: transformers) {
transformer->wait();
}
}
}
//Testing that the RetrieveQueueToReportToRepackForSuccess is empty
ASSERT_EQ(schedulerDB.getNextSucceededRetrieveRequestForRepackBatch(10,lc).size(),0);
ASSERT_EQ(schedulerDB.getNextSucceededRetrieveRequestForRepackBatch(nbArchiveFilesPerTape,lc).size(),0);
//Testing the new ArchiveRequests contains the same data as the previous RetrieveRequest
archiveFileId = 1;
int retrieveRequestAddressIndex = 0;
std::map<uint64_t,std::list<cta::objectstore::ArchiveRequest>> archiveRequestsPerTape;
for(uint64_t i = 1; i<= nbTapesForTest ;++i)
{
for(uint64_t j = 1; j <= nbArchiveFilesPerTape; ++j)
{
cta::objectstore::ArchiveRequest archiveRequest(retrieveRequestsAddresses.at(retrieveRequestAddressIndex),schedulerDB.getBackend());
archiveRequestsPerTape[i].emplace_back(archiveRequest);
cta::objectstore::ScopedExclusiveLock arLock(archiveRequest);
archiveRequest.fetch();
//initialize variable for the test
//Test the ArchiveRequest
common::dataStructures::ArchiveFile archiveFile = archiveRequest.getArchiveFile();
ASSERT_EQ(archiveFile.archiveFileID,archiveFileId);
ASSERT_EQ(archiveFile.checksumType,checksumType);
......@@ -1452,7 +1457,12 @@ TEST_P(SchedulerTest, expandRepackRequest) {
++retrieveRequestAddressIndex;
++archiveFileId;
}
}
}
//queue all the ArchiveRequests into the ArchiveQueueToTransferForRepack queue.
for(uint64_t i = 1; i <= nbTapesForTest; ++i){
scheduler.queueArchiveRequestForRepackBatch(archiveRequestsPerTape[i],lc);
scheduler.waitSchedulerDbSubthreadsComplete();
}
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment