Commit e6dbcd53 authored by Cedric CAFFY

Changed the logging of successful Retrieve and Archive jobs to differentiate between User and Repack jobs
parent e71ba128
......@@ -234,7 +234,8 @@ while test 0 != ${ARCHIVING}; do
ARCHIVING=$((${TO_BE_ARCHIVED} - ${ARCHIVED}))
NB_TAPE_NOT_FULL=`admin_cta --json ta ls --all | jq "[.[] | select(.full == false)] | length"`
if test ${NB_TAPE_NOT_FULL} == 0; then
if [[ ${NB_TAPE_NOT_FULL} == 0 ]]
then
echo "$(date +%s): All tapes are full, exiting archiving loop"
break
fi
......
......@@ -108,7 +108,7 @@ done
nbDestinationTape=${#destinationTapes[@]}
# Compute the number of files to copy and the size of each file
fileSizeToCopy=`perl -e "print int((${SIZE_OF_TAPES} / ${NB_FILES_PER_TAPE}) * 1000)"`
fileSizeToCopy=`perl -e "use POSIX; print int( ceil((( (${SIZE_OF_TAPES} * 1000) - ((6 * 80) / 1000)) / ${NB_FILES_PER_TAPE})) )"`
nbFilesToCopy=$(($NB_FILES_PER_TAPE * $nbTapesToRepack))
echo
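
For context, the revised per-file size can be checked with a standalone computation. The constants below mirror the values visible in this diff (SIZE_OF_TAPES=10, NB_FILES_PER_TAPE=1000), and the unit assumption (tape size in GB, file size in MB, as the * 1000 factors suggest) is inferred rather than stated by the scripts shown here; the 6 * 80 term is presumably the tape-label overhead subtracted by the new one-liner.

// Illustrative standalone check of the revised formula, not part of the commit.
#include <cmath>
#include <cstdio>

int main() {
  const double sizeOfTapes = 10.0;      // SIZE_OF_TAPES after this commit
  const double nbFilesPerTape = 1000.0; // NB_FILES_PER_TAPE
  const double labelOverhead = (6.0 * 80.0) / 1000.0; // six 80-byte records, scaled as in the perl one-liner
  const double fileSizeToCopy =
      std::ceil((sizeOfTapes * 1000.0 - labelOverhead) / nbFilesPerTape);
  std::printf("fileSizeToCopy = %.0f\n", fileSizeToCopy); // prints 10
  return 0;
}
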
......@@ -127,9 +127,8 @@ allPid=()
for vid in ${tapesToRepack[@]}
do
echo "Launching repack requests on vid $vid"
bash /root/repack_systemtest.sh -v $vid -b ${REPACK_BUFFER_URL} &
bash /root/repack_systemtest.sh -v $vid -b ${REPACK_BUFFER_URL} -t 500 &
allPid+=($!)
#admin_cta repack add --vid $vid -m -b root://ctaeos//eos/ctaeos/repack
done
oneRepackFailed=0
......
......@@ -47,7 +47,7 @@ kubectl -n ${NAMESPACE} cp repack_generate_report.sh client:/root/repack_generat
NB_FILES_PER_TAPE=1000
SIZE_OF_TAPES=2
SIZE_OF_TAPES=10
REPACK_BUFFER_URL=/eos/ctaeos/repack
echo "Creating the repack buffer URL directory (${REPACK_BUFFER_URL})"
......
......@@ -12,7 +12,7 @@ die() {
}
usage() { cat <<EOF 1>&2
Usage: $0 -v <vid> -b <bufferURL> [-e <eosinstance>] [-t <timeout>] [-a] [m]
Usage: $0 -v <vid> -b <bufferURL> [-e <eosinstance>] [-t <timeout>] [-a] [-m] [-d]
(bufferURL example : /eos/ctaeos/repack)
eosinstance : the name of the ctaeos instance to be used (default ctaeos)
timeout : the timeout in seconds to wait for the repack to be done
......
......@@ -48,7 +48,7 @@ class ArchiveJob {
*/
friend class ArchiveMount;
friend class Scheduler;
protected:
/**
* Constructor.
......
......@@ -169,6 +169,12 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct
job = std::move(successfulArchiveJobs.front());
successfulArchiveJobs.pop();
if (!job.get()) continue;
cta::log::ScopedParamContainer params(logContext);
params.add("tapeVid",job->tapeFile.vid)
.add("mountType",cta::common::dataStructures::toString(job->m_mount->getMountType()))
.add("fileId",job->archiveFile.archiveFileID)
.add("type", "ReportSuccessful");
logContext.log(cta::log::INFO, "In cta::ArchiveMount::reportJobsBatchTransferred(), archive job successful.");
tapeItemsWritten.emplace(job->validateAndGetTapeFileWritten().release());
files++;
bytes+=job->archiveFile.fileSize;
......
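
The block added above uses the cta::log::ScopedParamContainer idiom: parameters registered on the container are appended to every message logged through the context while it is in scope, so the success line now carries mountType next to tapeVid and fileId, which is what differentiates User from Repack archive jobs in the logs. The sketch below imitates that behaviour with simplified stand-in types; it is illustrative only and does not use the real cta::log classes, and the parameter values are invented examples.

// Simplified stand-ins for the logging idiom shown in the hunk above; the
// real cta::log::ScopedParamContainer / LogContext classes are richer.
#include <iostream>
#include <map>
#include <string>

struct MiniLogContext {
  std::map<std::string, std::string> params;
  void log(const std::string &level, const std::string &msg) const {
    std::cout << level << ": " << msg;
    for (const auto &p : params)
      std::cout << " " << p.first << "=\"" << p.second << "\"";
    std::cout << "\n";
  }
};

struct MiniScopedParams {
  MiniLogContext &ctx;
  explicit MiniScopedParams(MiniLogContext &c) : ctx(c) {}
  MiniScopedParams &add(const std::string &k, const std::string &v) {
    ctx.params[k] = v;
    return *this;
  }
  ~MiniScopedParams() { ctx.params.clear(); } // params live only for this scope
};

int main() {
  MiniLogContext lc;
  {
    MiniScopedParams params(lc);
    params.add("tapeVid", "V01007")             // example VID, not from the commit
          .add("mountType", "ArchiveForRepack") // example value; CTA derives it from toString(getMountType())
          .add("fileId", "123456")
          .add("type", "ReportSuccessful");
    lc.log("INFO", "In cta::ArchiveMount::reportJobsBatchTransferred(), archive job successful.");
  }
  return 0;
}
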
......@@ -3630,6 +3630,14 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD
if (osdbJob->isRepack) {
try {
osdbJob->m_jobSucceedForRepackReporter->wait();
{
cta::log::ScopedParamContainer spc(lc);
std::string vid = osdbJob->archiveFile.tapeFiles.at(osdbJob->selectedCopyNb).vid;
spc.add("tapeVid",vid)
.add("mountType","RetrieveForRepack")
.add("fileId",osdbJob->archiveFile.archiveFileID);
lc.log(cta::log::INFO,"In OStoreDB::RetrieveMount::flushAsyncSuccessReports(), retrieve job successful");
}
mountPolicy = osdbJob->m_jobSucceedForRepackReporter->m_MountPolicy;
jobsToRequeueForRepackMap[osdbJob->m_repackInfo.repackRequestAddress].emplace_back(osdbJob);
} catch (cta::exception::Exception & ex) {
......@@ -3644,6 +3652,15 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD
} else {
try {
osdbJob->m_jobDelete->wait();
{
//Log for monitoring
cta::log::ScopedParamContainer spc(lc);
std::string vid = osdbJob->archiveFile.tapeFiles.at(osdbJob->selectedCopyNb).vid;
spc.add("tapeVid",vid)
.add("mountType","RetrieveForUser")
.add("fileId",osdbJob->archiveFile.archiveFileID);
lc.log(cta::log::INFO,"In OStoreDB::RetrieveMount::flushAsyncSuccessReports(), retrieve job successful");
}
osdbJob->retrieveRequest.lifecycleTimings.completed_time = time(nullptr);
std::string requestAddress = osdbJob->m_retrieveRequest.getAddressIfSet();
......@@ -3652,7 +3669,7 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD
cta::common::dataStructures::LifecycleTimings requestTimings = osdbJob->retrieveRequest.lifecycleTimings;
log::ScopedParamContainer params(lc);
params.add("requestAddress",requestAddress)
.add("archiveFileID",osdbJob->archiveFile.archiveFileID)
.add("fileId",osdbJob->archiveFile.archiveFileID)
.add("vid",osdbJob->m_retrieveMount->mountInfo.vid)
.add("timeForSelection",requestTimings.getTimeForSelection())
.add("timeForCompletion", requestTimings.getTimeForCompletion());
......
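
Both branches above log the same message but tag it with a different mountType string, which is what lets monitoring tell repack-driven retrieves apart from user-driven ones; renaming archiveFileID to fileId in the pre-existing log line keeps the key consistent with the new entries. A trivial helper capturing the branch-to-label mapping is sketched below; it is illustrative only, the commit itself writes these strings inline in each branch.

// Illustrative only: no such helper exists in the CTA code shown here.
#include <iostream>
#include <string>

std::string retrieveMountTypeLabel(bool isRepack) {
  return isRepack ? "RetrieveForRepack" : "RetrieveForUser";
}

int main() {
  std::cout << retrieveMountTypeLabel(true) << "\n";  // repack branch
  std::cout << retrieveMountTypeLabel(false) << "\n"; // user branch
  return 0;
}
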
......@@ -485,7 +485,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
}
}
double elapsedTime = 0;
bool stopExpansion = false;
bool expansionTimeReached = false;
std::list<common::dataStructures::StorageClass> storageClasses;
if(repackInfo.type == RepackType::AddCopiesOnly || repackInfo.type == RepackType::MoveAndAddCopies)
......@@ -494,12 +494,11 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
repackRequest->m_dbReq->setExpandStartedAndChangeStatus();
uint64_t nbRetrieveSubrequestsQueued = 0;
while(archiveFilesForCatalogue.hasMore() && !stopExpansion) {
while(archiveFilesForCatalogue.hasMore() && !expansionTimeReached) {
size_t filesCount = 0;
uint64_t maxAddedFSeq = 0;
std::list<SchedulerDatabase::RepackRequest::Subrequest> retrieveSubrequests;
while(filesCount < c_defaultMaxNbFilesForRepack && !stopExpansion && archiveFilesForCatalogue.hasMore())
{
while(filesCount < c_defaultMaxNbFilesForRepack && !expansionTimeReached && archiveFilesForCatalogue.hasMore()){
filesCount++;
fSeq++;
retrieveSubrequests.push_back(cta::SchedulerDatabase::RepackRequest::Subrequest());
......@@ -617,7 +616,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
retrieveSubRequest.fileBufferURL = dirBufferURL.str() + fileName.str();
}
}
stopExpansion = (elapsedTime >= m_repackRequestExpansionTimeLimit);
expansionTimeReached = (elapsedTime >= m_repackRequestExpansionTimeLimit);
}
// Note: the highest fSeq will be recorded internally in the following call.
// We know that the fSeq processed on the tape are >= initial fSeq + filesCount - 1 (or fSeq - 1 as we counted).
......@@ -631,7 +630,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
}
timingList.insertAndReset("addSubrequestsAndUpdateStatsTime",t);
{
if(!stopExpansion && archiveFilesForCatalogue.hasMore()){
if(!expansionTimeReached && archiveFilesForCatalogue.hasMore()){
log::ScopedParamContainer params(lc);
params.add("tapeVid",repackInfo.vid);
timingList.addToLog(params);
......@@ -643,13 +642,12 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
params.add("tapeVid",repackInfo.vid);
timingList.addToLog(params);
if(archiveFilesForCatalogue.hasMore()){
if(stopExpansion){
repackRequest->m_dbReq->requeueInToExpandQueue(lc);
lc.log(log::INFO,"Expansion time reached, Repack Request requeued in ToExpand queue.");
}
repackRequest->m_dbReq->requeueInToExpandQueue(lc);
lc.log(log::INFO,"Repack Request requeued in ToExpand queue.");
} else {
if(totalStatsFile.totalFilesToRetrieve == 0 || nbRetrieveSubrequestsQueued == 0){
//If no files have been retrieved, the repack buffer will have to be deleted
//TODO : in case of Repack tape repair, we should not try to delete the buffer
deleteRepackBuffer(std::move(dir));
}
repackRequest->m_dbReq->expandDone();
......
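
Renaming stopExpansion to expansionTimeReached makes the reason for stopping early explicit, and the requeue branch no longer re-checks that flag: after this change a repack request is requeued in the ToExpand queue whenever the catalogue iterator still has files, regardless of why the current expansion round ended. A minimal standalone sketch of that control flow follows; the names are illustrative, not the actual Scheduler members.

// Standalone sketch of the expansion-round control flow after this commit.
#include <chrono>
#include <iostream>

int main() {
  using Clock = std::chrono::steady_clock;
  const double timeLimitSeconds = 0.001; // stand-in for m_repackRequestExpansionTimeLimit
  const auto start = Clock::now();
  const int filesInCatalogue = 100000;   // stand-in for archiveFilesForCatalogue
  int filesExpanded = 0;
  bool expansionTimeReached = false;
  while (filesExpanded < filesInCatalogue && !expansionTimeReached) {
    ++filesExpanded; // stand-in for queueing one retrieve subrequest
    const double elapsedTime =
        std::chrono::duration<double>(Clock::now() - start).count();
    expansionTimeReached = (elapsedTime >= timeLimitSeconds);
  }
  // Requeue depends only on whether files remain, not on the time flag.
  if (filesExpanded < filesInCatalogue)
    std::cout << "Repack Request requeued in ToExpand queue.\n";
  else
    std::cout << "Expansion done.\n";
  return 0;
}
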
......@@ -56,7 +56,7 @@ MigrationReportPacker::~MigrationReportPacker(){
//reportCompletedJob
//------------------------------------------------------------------------------
void MigrationReportPacker::reportCompletedJob(
std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, cta::log::LogContext & lc) {
std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, cta::log::LogContext & lc) {
std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulArchiveJob)));
cta::log::ScopedParamContainer params(lc);
params.add("type", "ReportSuccessful");
......