diff --git a/continuousintegration/orchestration/tests/client_prepare_file.sh b/continuousintegration/orchestration/tests/client_prepare_file.sh new file mode 100644 index 0000000000000000000000000000000000000000..f39d5c0a7ca4f52e54df254cbaa004118fef2fa3 --- /dev/null +++ b/continuousintegration/orchestration/tests/client_prepare_file.sh @@ -0,0 +1,82 @@ +#!/bin/sh + +EOSINSTANCE=ctaeos +EOS_BASEDIR=/eos/ctaeos/cta + +die() { + echo "$@" 1>&2 + test -z $TAILPID || kill ${TAILPID} &> /dev/null + exit 1 +} + +usage() { cat <<EOF 1>&2 +Usage: $0 [-e EOSINSTANCE] +EOF +exit 1 +} + +FILES_LOCATION=( ) + +while getopts "e:f:" o; do + case "${o}" in + f) + FILES_LOCATION+=( "$OPTARG" ) + ;; + e) + EOSINSTANCE=${OPTARG} + ;; + esac +done +shift $((OPTIND-1)) + +if [ "x${FILES_LOCATION}" = "x" ]; then + die "Files location in a list should be provided" +fi + +# get some common useful helpers for krb5 +. /root/client_helper.sh + +nbFilesToRetrieve=${#FILES_LOCATION[@]} + +KRB5CCNAME=/tmp/${EOSPOWER_USER}/krb5cc_0 + +echo ${FILES_LOCATION[@]} | XrdSecPROTOCOL=krb5 KRB5CCNAME=/tmp/${EOSPOWER_USER}/krb5cc_0 xargs --max-procs=10 -n 40 xrdfs ${EOSINSTANCE} prepare + +nbFilesRetrieved=0 +SECONDS_PASSED=0 +WAIT_FOR_RETRIEVED_FILE_TIMEOUT=200 + +declare -A directoriesNbFiles +for filePath in ${FILES_LOCATION[@]} +do + directoriesNbFiles[$(dirname $filePath)]=0 +done + +while [[ $nbFilesRetrieved != $nbFilesToRetrieve ]] +do + nbFilesRetrieved=0 + for directory in ${!directoriesNbFiles[@]} + do + nbFilesRetrieved=$((nbFilesRetrieved + `eos root://${EOSINSTANCE} ls -y ${directory} | egrep '^d[1-9][0-9]*::t1' | wc -l`)) + done + sleep 1 + let SECONDS_PASSED=SECONDS_PASSED+1 + echo "Waiting for file to be retrieved. 
Seconds passed = $SECONDS_PASSED" + if test ${SECONDS_PASSED} == ${WAIT_FOR_RETRIEVED_FILE_TIMEOUT}; then + die "$(date +%s): Timed out after ${WAIT_FOR_RETRIEVED_FILE_TIMEOUT} seconds waiting for files to be retrieved from tape" + fi +done + +#fileRetrieved=0 +#SECONDS_PASSED=0 +#WAIT_FOR_RETRIEVED_FILE_TIMEOUT=50 +#while [[ $fileRetrieved != 1 ]] +#do +# fileRetrieved=`eos root://${EOSINSTANCE} ls -y ${FILE_LOCATION} | egrep '^d[1-9][0-9]*::t1' | wc -l` +# sleep 1 +# let SECONDS_PASSED=SECONDS_PASSED+1 +# echo "Waiting for file to be retrieved. Seconds passed = $SECONDS_PASSED" +# if test ${SECONDS_PASSED} == ${WAIT_FOR_RETRIEVED_FILE_TIMEOUT}; then +# die "$(date +%s): Timed out after ${WAIT_FOR_RETRIEVED_FILE_TIMEOUT} seconds waiting for file to be retrieved from tape" +# fi +#done \ No newline at end of file diff --git a/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh b/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh index c39b6487a872f5b3dcb158ab287e7ee1a4573b4d..d716209c1dbd9da7a12c311a9be10aed43e78920 100755 --- a/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh +++ b/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh @@ -32,17 +32,21 @@ fi echo "Preparing namespace for the tests" ./prepare_tests.sh -n ${NAMESPACE} -NB_FILES=1 -FILE_SIZE_KB=15 - kubectl -n ${NAMESPACE} cp client_helper.sh client:/root/client_helper.sh +kubectl -n ${NAMESPACE} cp client_prepare_file.sh client:/root/client_prepare_file.sh + +archiveFiles() { + NB_FILES=$1 + FILE_SIZE_KB=$2 + kubectl -n ${NAMESPACE} exec client -- bash /root/client_ar.sh -n ${NB_FILES} -s ${FILE_SIZE_KB} -p 100 -d /eos/ctaeos/preprod -v -A || exit 1 +} echo echo "Launching client_ar.sh on client pod" echo " Archiving ${NB_FILES} files of ${FILE_SIZE_KB}kB each" echo " Archiving files: xrdcp as user1" kubectl -n ${NAMESPACE} cp client_ar.sh client:/root/client_ar.sh -kubectl -n ${NAMESPACE} exec client -- bash 
/root/client_ar.sh -n ${NB_FILES} -s ${FILE_SIZE_KB} -p 100 -d /eos/ctaeos/preprod -v -A || exit 1 +archiveFiles 1 15 REPACK_BUFFER_URL=/eos/ctaeos/repack echo "Creating the repack buffer URL directory (${REPACK_BUFFER_URL})" @@ -126,11 +130,6 @@ repackDisableTape() { echo "*************************************************************" } -archiveFiles() { - NB_FILES=1152 - kubectl -n ${NAMESPACE} exec client -- bash /root/client_ar.sh -n ${NB_FILES} -s ${FILE_SIZE_KB} -p 100 -d /eos/ctaeos/preprod -v -A || exit 1 -} - repackJustMove() { echo echo "*********************************************" @@ -340,17 +339,116 @@ repackMoveAndAddCopies() { echo "Launching the repack \"Move and add copies\" test on VID ${VID_TO_REPACK}" kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -t 600 -r ${BASE_REPORT_DIRECTORY}/Step6-MoveAndAddCopies || exit 1 + + echo "Reclaimimg tape ${VID_TO_REPACK}" + kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tape reclaim --vid ${VID_TO_REPACK} + echo echo "***************************************************************" echo "STEP 6. Testing Repack \"Move and Add copies\" workflow TEST OK" echo "***************************************************************" } +repackTapeRepair() { + echo + echo "*******************************************************" + echo "STEP 7. 
Testing Repack \"Tape Repair\" workflow" + echo "*******************************************************" + + VID_TO_REPACK=$(getFirstVidContainingFiles) + if [ "$VID_TO_REPACK" == "null" ] + then + echo "No vid found to repack" + exit 1 + fi + + echo "Getting files to inject into the repack buffer directory" + + afls=`kubectl -n ${NAMESPACE} exec ctacli -- cta-admin --json archivefile ls --vid ${VID_TO_REPACK}` + nbFileToInject=10 + + if [[ $nbFileToInject != 0 ]] + then + echo "Will inject $nbFileToInject files into the repack buffer directory" + bufferDirectory=${REPACK_BUFFER_URL}/${VID_TO_REPACK} + echo "Creating buffer directory in \"$bufferDirectory\"" + kubectl -n ${NAMESPACE} exec ctaeos -- eos mkdir $bufferDirectory + kubectl -n ${NAMESPACE} exec ctaeos -- eos chmod 1777 $bufferDirectory + + echo "Retrieving files from the tape" + allPid=() + pathOfFilesToInject=() + diskIds=() + + for i in $(seq 0 $(( nbFileToInject - 1 )) ) + do + diskId=`echo $afls | jq -r ". [$i] | .af.diskId"` || break + diskIds[$i]=$diskId + pathFileToInject=`kubectl -n ${NAMESPACE} exec ctaeos -- eos fileinfo fid:$diskId --path | cut -d":" -f2 | tr -d " "` + pathOfFilesToInject[$i]=$pathFileToInject + done + + kubectl -n ${NAMESPACE} exec client -- bash /root/client_prepare_file.sh `for file in ${pathOfFilesToInject[@]}; do echo -n "-f $file "; done` + + echo "Copying the retrieved files into the repack buffer $bufferDirectory" + + for i in $(seq 0 $(( nbFileToInject - 1)) ) + do + fseqFile=`echo $afls | jq -r ". 
[] | select(.af.diskId == \"${diskIds[$i]}\") | .tf.fSeq"` || break + kubectl -n ${NAMESPACE} exec ctaeos -- eos cp ${pathOfFilesToInject[$i]} $bufferDirectory/`printf "%9d\n" $fseqFile | tr ' ' 0` + done + + echo "Launching a repack request on the vid ${VID_TO_REPACK}" + kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step7-RepackTapeRepair || exit 1 + + repackLsResult=`kubectl -n ${NAMESPACE} exec ctacli -- cta-admin --json repack ls --vid ${VID_TO_REPACK} | jq -r ". [0]"` + userProvidedFiles=`echo $repackLsResult | jq -r ".userProvidedFiles"` + archivedFiles=`echo $repackLsResult | jq -r ".archivedFiles"` + retrievedFiles=`echo $repackLsResult | jq -r ".retrievedFiles"` + totalFilesToRetrieve=`echo $repackLsResult | jq -r ".totalFilesToRetrieve"` + totalFilesToArchive=`echo $repackLsResult | jq -r ".totalFilesToArchive"` + + if [[ $totalFilesToRetrieve != $(( $totalFilesToArchive - $userProvidedFiles )) ]] + then + echo "totalFilesToRetrieve ($totalFilesToRetrieve) != totalFilesToArchive ($totalFilesToArchive) - userProvidedFiles ($userProvidedFiles), test FAILED" + exit 1 + else + echo "totalFilesToRetrieve ($totalFilesToRetrieve) == totalFilesToArchive ($totalFilesToArchive) - userProvidedFiles ($userProvidedFiles), OK" + fi + + if [[ $retrievedFiles != $totalFilesToRetrieve ]] + then + echo "retrievedFiles ($retrievedFiles) != totalFilesToRetrieve ($totalFilesToRetrieve) test FAILED" + exit 1 + else + echo "retrievedFiles ($retrievedFiles) == totalFilesToRetrieve ($totalFilesToRetrieve), OK" + fi + + if [[ $archivedFiles != $totalFilesToArchive ]] + then + echo "archivedFiles ($archivedFiles) != totalFilesToArchive ($totalFilesToArchive), test FAILED" + exit 1 + else + echo "archivedFiles ($archivedFiles) == totalFilesToArchive ($totalFilesToArchive), OK" + fi + + else + echo "No file to inject, test not OK" + exit 1 + fi + + echo + echo 
"*******************************************************" + echo "STEP 7. Testing Repack \"Tape Repair\" workflow TEST OK" + echo "*******************************************************" +} + #Execution of each tests roundTripRepack repackDisableTape -archiveFiles +archiveFiles 10 15 repackJustMove repackJustAddCopies repackCancellation repackMoveAndAddCopies +repackTapeRepair diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 3d42dd084c322636e8384dc92caf87ca90c8bac1..86e68f1b97763687b6e4546285cada3c180c0d86 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -461,6 +461,13 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer .add("commitUnlockQueueTime", commitUnlockQueueTime) .add("sleepTime", sleepTime); lc.log(log::INFO, "In ArchiveRequest::garbageCollect(): slept some time to not sit on the queue after GC requeueing."); + } catch (JobNotQueueable &ex){ + log::ScopedParamContainer params(lc); + params.add("jobObject", getAddressIfSet()) + .add("queueObject", queueObject) + .add("presumedOwner", presumedOwner) + .add("copyNb", j->copynb()); + lc.log(log::WARNING, "Job garbage collected with a status not queueable, nothing to do."); } catch (...) { // We could not requeue the job: fail it. 
j->set_status(serializers::AJS_Failed); @@ -772,8 +779,11 @@ JobQueueType ArchiveRequest::getQueueType(const serializers::ArchiveJobStatus& s return JobQueueType::JobsToReportToRepackForFailure; case ArchiveJobStatus::AJS_Failed: return JobQueueType::FailedJobs; + case ArchiveJobStatus::AJS_Complete: + case ArchiveJobStatus::AJS_Abandoned: + throw JobNotQueueable("In ArchiveRequest::getQueueType(): status is "+ArchiveRequest::statusToString(status)+ " therefore it is not queueable."); default: - throw cta::exception::Exception("In ArchiveRequest::getQueueType(): invalid status for queueing."); + throw cta::exception::Exception("In ArchiveRequest::getQueueType(): unknown status for queueing."); } } @@ -788,6 +798,10 @@ std::string ArchiveRequest::statusToString(const serializers::ArchiveJobStatus& return "ToReportForTransfer"; case serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure: return "ToReportForFailure"; + case serializers::ArchiveJobStatus::AJS_ToReportToRepackForFailure: + return "ToReportToRepackForFailure"; + case serializers::ArchiveJobStatus::AJS_ToReportToRepackForSuccess: + return "ToReportToRepackForSuccess"; case serializers::ArchiveJobStatus::AJS_Complete: return "Complete"; case serializers::ArchiveJobStatus::AJS_Failed: @@ -848,7 +862,7 @@ auto ArchiveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobE // Wrong status, but the context leaves no ambiguity. Just warn. log::ScopedParamContainer params(lc); params.add("event", eventToString(jobEvent)) - .add("status", statusToString(*currentStatus)) + .add("status", ArchiveRequest::statusToString(*currentStatus)) .add("fileId", m_payload.archivefileid()); lc.log(log::WARNING, "In ArchiveRequest::determineNextStep(): unexpected status. Assuming ToTransfer."); } @@ -858,7 +872,7 @@ auto ArchiveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobE // Wrong status, but end status will be the same anyway. 
log::ScopedParamContainer params(lc); params.add("event", eventToString(jobEvent)) - .add("status", statusToString(*currentStatus)) + .add("status", ArchiveRequest::statusToString(*currentStatus)) .add("fileId", m_payload.archivefileid()); lc.log(log::WARNING, "In ArchiveRequest::determineNextStep(): unexpected status. Failing the job."); } diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index 8d925e8150ec27b0a8633a900bc913bf6a0ec149..f14918b270f91c413a9844887d14bcc79fc1e2e6 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -64,7 +64,7 @@ public: serializers::ArchiveJobStatus getJobStatus(uint32_t copyNumber); void setJobStatus(uint32_t copyNumber, const serializers::ArchiveJobStatus & status); std::string getTapePoolForJob(uint32_t copyNumber); - std::string statusToString(const serializers::ArchiveJobStatus & status); + static std::string statusToString(const serializers::ArchiveJobStatus & status); enum class JobEvent { TransferFailed, ReportFailed diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index bdaebb2e70ad0c19a3a432b132323ddd60fa05d9..eea57c3cb7834db4bdafca8742fc6fd35b2d3fc5 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -319,7 +319,15 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std:: .add("fileId", ar->getArchiveFile().archiveFileID); lc.log(log::INFO, "Selected archive request for requeueing to the corresponding queue"); jobRequeued=true; - } catch (ArchiveRequest::JobNotQueueable &) {} + } catch (ArchiveRequest::JobNotQueueable &) { + log::ScopedParamContainer params3(lc); + params3.add("tapePool", j.tapePool) + .add("containerIdentifier", containerIdentifier) + .add("copynb", j.copyNb) + .add("status",ArchiveRequest::statusToString(j.status)) + .add("fileId", ar->getArchiveFile().archiveFileID); + lc.log(log::WARNING, "Job garbage collected with a status not queueable. 
Leaving it as is."); + } } } if (!jobRequeued) { diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp index 178e7a86a97318ba2f438134416df6b3fc803233..2659d30f5199783eca59dd8d1c08da094941c492 100644 --- a/objectstore/RepackRequest.cpp +++ b/objectstore/RepackRequest.cpp @@ -180,6 +180,7 @@ void RepackRequest::setTotalStats(const cta::SchedulerDatabase::RepackRequest::T setTotalFileToArchive(totalStatsFiles.totalFilesToArchive); setTotalBytesToArchive(totalStatsFiles.totalBytesToArchive); setTotalBytesToRetrieve(totalStatsFiles.totalBytesToRetrieve); + setUserProvidedFiles(totalStatsFiles.userProvidedFiles); } void RepackRequest::setMountPolicy(const common::dataStructures::MountPolicy& mp){ @@ -246,9 +247,10 @@ void RepackRequest::setStatus(){ return; } } - //Expand is finished or not, if we have retrieved files or not (first reporting), we are in Running, + //Expand is finished or not, if we have retrieved files or not (first reporting) or if we have archived files (repack tape repair workflow with all files + //provided by the user), we are in Running, //else we are in starting - if(m_payload.retrievedfiles() || m_payload.failedtoretrievefiles()){ + if(m_payload.retrievedfiles() || m_payload.failedtoretrievefiles() || m_payload.archivedfiles() || m_payload.failedtoarchivefiles()){ setStatus(common::dataStructures::RepackInfo::Status::Running); } else { setStatus(common::dataStructures::RepackInfo::Status::Starting); @@ -384,6 +386,11 @@ void RepackRequest::setTotalBytesToArchive(const uint64_t nbBytesToArchive) { m_payload.set_totalbytestoarchive(nbBytesToArchive); } +void RepackRequest::setUserProvidedFiles(const uint64_t userProvidedFiles){ + checkPayloadWritable(); + m_payload.set_userprovidedfiles(userProvidedFiles); +} + //------------------------------------------------------------------------------ // RepackRequest::getTotalStatsFile() //------------------------------------------------------------------------------ @@ -394,6 +401,7 @@ 
cta::SchedulerDatabase::RepackRequest::TotalStatsFiles RepackRequest::getTotalSt ret.totalBytesToArchive = m_payload.totalbytestoarchive(); ret.totalFilesToRetrieve = m_payload.totalfilestoretrieve(); ret.totalFilesToArchive = m_payload.totalfilestoarchive(); + ret.userProvidedFiles = m_payload.userprovidedfiles(); return ret; } @@ -411,8 +419,10 @@ void RepackRequest::reportRetriveSuccesses(SubrequestStatistics::List& retrieveS auto & p = pointerMap.at(rs.fSeq); if (!p.retrieveAccounted) { p.retrieveAccounted = true; - m_payload.set_retrievedbytes(m_payload.retrievedbytes() + rs.bytes); - m_payload.set_retrievedfiles(m_payload.retrievedfiles() + rs.files); + if(!rs.hasUserProvidedFile){ + m_payload.set_retrievedbytes(m_payload.retrievedbytes() + rs.bytes); + m_payload.set_retrievedfiles(m_payload.retrievedfiles() + rs.files); + } didUpdate = true; } } catch (std::out_of_range &) { diff --git a/objectstore/RepackRequest.hpp b/objectstore/RepackRequest.hpp index c049551a21cde43154d0bbdcb992d12b4f797b57..985b703da163ebc30d6ccc8a77e19d1603ca0113 100644 --- a/objectstore/RepackRequest.hpp +++ b/objectstore/RepackRequest.hpp @@ -105,6 +105,7 @@ public: void setTotalBytesToRetrieve(const uint64_t nbBytesToRetrieve); void setTotalFileToArchive(const uint64_t nbFilesToArchive); void setTotalBytesToArchive(const uint64_t nbBytesToArchive); + void setUserProvidedFiles(const uint64_t userProvidedFiles); struct SubrequestStatistics { uint64_t fSeq; @@ -113,6 +114,7 @@ public: /// CopyNb is needed to record archive jobs statistics (we can have several archive jobs for the same fSeq) uint32_t copyNb = 0; bool subrequestDeleted = false; + bool hasUserProvidedFile = false; typedef std::list<SubrequestStatistics> List; bool operator< (const SubrequestStatistics & o) const { return fSeq < o.fSeq; } }; diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index 2a77db5af1630508a78f6d9848e8c1d48389d107..0027e54dcd1bb49708f332c84826d060818c5340 100644 --- 
a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -652,6 +652,8 @@ void RetrieveRequest::setRepackInfo(const RepackInfo& repackInfo) { for (auto cntr: repackInfo.copyNbsToRearchive) { m_payload.mutable_repack_info()->mutable_copy_nbs_to_rearchive()->Add(cntr); } + + m_payload.mutable_repack_info()->set_has_user_provided_file(repackInfo.hasUserProvidedFile); m_payload.mutable_repack_info()->set_force_disabled_tape(repackInfo.forceDisabledTape); m_payload.mutable_repack_info()->set_file_buffer_url(repackInfo.fileBufferURL); m_payload.mutable_repack_info()->set_repack_request_address(repackInfo.repackRequestAddress); @@ -946,6 +948,9 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string for (auto cntr: payload.repack_info().copy_nbs_to_rearchive()) { ri.copyNbsToRearchive.insert(cntr); } + if(payload.repack_info().has_has_user_provided_file()){ + ri.hasUserProvidedFile = payload.repack_info().has_user_provided_file(); + } ri.fileBufferURL = payload.repack_info().file_buffer_url(); ri.isRepack = true; ri.repackRequestAddress = payload.repack_info().repack_request_address(); diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index 63c37f74a2efd7b3f7b8d76518723668bba00620..b1cea870fe343e8c5e53c50b25a03c5a135d790d 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -157,6 +157,7 @@ public: std::string repackRequestAddress; uint64_t fSeq; std::string fileBufferURL; + bool hasUserProvidedFile = false; }; void setRepackInfo(const RepackInfo & repackInfo); RepackInfo getRepackInfo(); @@ -175,6 +176,9 @@ public: rrri.set_repack_request_address(repackRequestAddress); rrri.set_fseq(fSeq); rrri.set_force_disabled_tape(forceDisabledTape); + if(rrri.has_has_user_provided_file()){ + rrri.set_has_user_provided_file(hasUserProvidedFile); + } } void deserialize(const cta::objectstore::serializers::RetrieveRequestRepackInfo & rrri) { @@ -185,6 +189,9 @@ public: 
repackRequestAddress = rrri.repack_request_address(); fSeq = rrri.fseq(); forceDisabledTape = rrri.force_disabled_tape(); + if(rrri.has_has_user_provided_file()){ + hasUserProvidedFile = rrri.has_user_provided_file(); + } } }; private: diff --git a/objectstore/Sorter.cpp b/objectstore/Sorter.cpp index 124cd089a0b70e2abd244d98a8d87b1a09967c05..f48a3f87080e057e4f71001840576dd3d773321f 100644 --- a/objectstore/Sorter.cpp +++ b/objectstore/Sorter.cpp @@ -103,7 +103,8 @@ void Sorter::insertArchiveRequest(std::shared_ptr<ArchiveRequest> archiveRequest } catch(const cta::exception::Exception &ex){ log::ScopedParamContainer params(lc); params.add("fileId", archiveRequest->getArchiveFile().archiveFileID) - .add("exceptionMessage", ex.getMessageValue()); + .add("exceptionMessage", ex.getMessageValue()) + .add("status", ArchiveRequest::statusToString(job.status)); lc.log(log::ERR,"In Sorter::insertArchiveJob() Failed to determine destination queue for Archive Job."); } } diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 25af2633accb4de8cdf1c05ed41ec7f6c58f889f..18130877b126b2865aede3c587c2dd71f998ce34 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -417,6 +417,7 @@ message RetrieveRequestRepackInfo { required string file_buffer_url = 9530; required uint64 fseq = 9540; required bool force_disabled_tape = 9560; + optional bool has_user_provided_file = 9163 [default = false]; } // The different timings of the lifecycle of a RetrieveRequest (creation time, first select time, request complete) diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp index e6a0ff4ef9db233245438d206bb24fcc551fa93f..a27b9ee5c71e8a5dbd253d29ef06c4f4ff147e51 100644 --- a/scheduler/ArchiveMount.cpp +++ b/scheduler/ArchiveMount.cpp @@ -176,7 +176,14 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct .add("fileId",job->archiveFile.archiveFileID) .add("type", "ReportSuccessful"); logContext.log(cta::log::INFO, "In 
cta::ArchiveMount::reportJobsBatchTransferred(), archive job succesful."); - tapeItemsWritten.emplace(job->validateAndGetTapeFileWritten().release()); + try { + tapeItemsWritten.emplace(job->validateAndGetTapeFileWritten().release()); + } catch (const cta::exception::Exception &ex){ + //We put the not validated job into this list in order to insert the job + //into the failedToReportArchiveJobs list in the exception catching block + validatedSuccessfulArchiveJobs.emplace_back(std::move(job)); + throw ex; + } files++; bytes+=job->archiveFile.fileSize; validatedSuccessfulArchiveJobs.emplace_back(std::move(job)); diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index c89bd8115843fdb4afa208f181890f8073dbfa05..454ed147a82e67cbc9ae5620d2d10859312d1ea1 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1942,6 +1942,7 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) { ssl.back().bytes = rr.archiveFile.fileSize; ssl.back().files = 1; ssl.back().fSeq = rr.repackInfo.fSeq; + ssl.back().hasUserProvidedFile = rr.repackInfo.hasUserProvidedFile; } // Record it. 
timingList.insertAndReset("successStatsPrepareTime", t); @@ -2386,6 +2387,9 @@ uint64_t OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequ rRRepackInfo.isRepack = true; rRRepackInfo.forceDisabledTape = forceDisabledTape; rRRepackInfo.repackRequestAddress = m_repackRequest.getAddressIfSet(); + if(rsr.hasUserProvidedFile){ + rRRepackInfo.hasUserProvidedFile = true; + } rr->setRepackInfo(rRRepackInfo); // Set the queueing parameters common::dataStructures::RetrieveFileQueueCriteria fileQueueCriteria; @@ -2421,6 +2425,7 @@ uint64_t OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequ failedCreationStats.bytes += rsr.archiveFile.fileSize; log::ScopedParamContainer params(lc); params.add("fileId", rsr.archiveFile.archiveFileID) + .add("forceDisabledTape",repackInfo.forceDisabledTape) .add("repackVid", repackInfo.vid); lc.log(log::ERR, "In OStoreDB::RepackRequest::addSubrequests(): could not queue a retrieve subrequest. Subrequest failed."); @@ -2446,6 +2451,16 @@ uint64_t OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequ continue; } copyNbFound:; + if(rsr.hasUserProvidedFile) { + /** + * As the user has provided the file through the Repack buffer folder, + * we will not Retrieve the file from the tape. We create the Retrieve + * Request but directly with the status RJS_ToReportToRepackForSuccess so that + * this retrieve request is queued in the RetrieveQueueToReportToRepackForSuccess + * and hence be transformed into an ArchiveRequest. + */ + rr->setJobStatus(activeCopyNumber,serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess); + } // We have the best VID. The request is ready to be created after comleting its information. 
rr->setOwner(m_oStoreDB.m_agentReference->getAgentAddress()); rr->setActiveCopyNumber(activeCopyNumber); @@ -2538,6 +2553,7 @@ uint64_t OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequ locks.clear(); sorter.flushAll(lc); } + //General m_repackRequest.setLastExpandedFSeq(fSeq); m_repackRequest.commit(); return nbRetrieveSubrequestsCreated; diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 051ca84a19d7ddd6870660ee4fe36e603c4fc496..4a013eee3ffb96f205255f07b155e452807532dd 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -487,8 +487,15 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques cta::disk::DirectoryFactory dirFactory; dir.reset(dirFactory.createDirectory(dirBufferURL.str())); if(dir->exist()){ - //TODO : Repack tape repair workflow - //filesInDirectory = dir->getFilesName(); + //Repack tape repair workflow + try{ + filesInDirectory = dir->getFilesName(); + } catch (const cta::exception::XrootCl &ex) { + log::ScopedParamContainer spc(lc); + spc.add("vid",repackInfo.vid); + spc.add("errorMessage",ex.getMessageValue()); + lc.log(log::WARNING,"In Scheduler::expandRepackRequest(), received XRootdException while listing files in the buffer"); + } } else { dir->mkdir(); } @@ -599,12 +606,33 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques cta::disk::DiskFileFactory fileFactory("",0,radosStriperPool); cta::disk::ReadFile *fileReader = fileFactory.createReadFile(dirBufferURL.str() + fileName.str()); if(fileReader->size() == archiveFile.fileSize){ - /*createArchiveSubrequest = true; - retrieveSubrequests.pop_back();*/ - //TODO : We don't want to retrieve the file again, create archive subrequest + createArchiveSubrequest = true; + } + } + if (!createArchiveSubrequest && retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) { + if(!createArchiveSubrequest){ + log::ScopedParamContainer params(lc); + 
params.add("fileId", retrieveSubRequest.archiveFile.archiveFileID) + .add("repackVid", repackInfo.vid); + lc.log(log::ERR, "In Scheduler::expandRepackRequest(): no fSeq found for this file on this tape."); + totalStatsFile.totalBytesToRetrieve -= retrieveSubRequest.archiveFile.fileSize; + totalStatsFile.totalFilesToRetrieve -= 1; + retrieveSubrequests.pop_back(); } + } else { + if(!createArchiveSubrequest){ + totalStatsFile.totalBytesToRetrieve += retrieveSubRequest.archiveFile.fileSize; + totalStatsFile.totalFilesToRetrieve += 1; + } else { + totalStatsFile.userProvidedFiles += 1; + retrieveSubRequest.hasUserProvidedFile = true; + } + // We found some copies to rearchive. We still have to decide which file path we are going to use. + // File path will be base URL + /<VID>/<fSeq> + maxAddedFSeq = std::max(maxAddedFSeq,retrieveSubRequest.fSeq); + retrieveSubRequest.fileBufferURL = dirBufferURL.str() + fileName.str(); } - if(!createArchiveSubrequest){ + /*if(!createArchiveSubrequest){ totalStatsFile.totalBytesToRetrieve += retrieveSubRequest.archiveFile.fileSize; totalStatsFile.totalFilesToRetrieve += 1; if (retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) { @@ -618,13 +646,10 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques } else { // We found some copies to rearchive. We still have to decide which file path we are going to use. // File path will be base URL + /<VID>/<fSeq> - /*std::stringstream fileBufferURL; - fileBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/" - << std::setw(9) << std::setfill('0') << rsr.fSeq;*/ maxAddedFSeq = std::max(maxAddedFSeq,retrieveSubRequest.fSeq); retrieveSubRequest.fileBufferURL = dirBufferURL.str() + fileName.str(); } - } + }*/ expansionTimeReached = (elapsedTime >= m_repackRequestExpansionTimeLimit); } // Note: the highest fSeq will be recorded internally in the following call. 
@@ -656,7 +681,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques repackRequest->m_dbReq->requeueInToExpandQueue(lc); lc.log(log::INFO,"Repack Request requeued in ToExpand queue."); } else { - if(totalStatsFile.totalFilesToRetrieve == 0 || nbRetrieveSubrequestsQueued == 0){ + if(totalStatsFile.totalFilesToArchive == 0 && (totalStatsFile.totalFilesToRetrieve == 0 || nbRetrieveSubrequestsQueued == 0)){ //If no files have been retrieve, the repack buffer will have to be deleted //TODO : in case of Repack tape repair, we should not try to delete the buffer deleteRepackBuffer(std::move(dir)); diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index ee7ba1c4b41745b805f3a0e5adf94577d6ce5a75..94595b20e09ee83665b60132639e2cc4aab97e0a 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -459,6 +459,7 @@ public: cta::common::dataStructures::ArchiveFile archiveFile; std::set<uint32_t> copyNbsToRearchive; std::string fileBufferURL; + bool hasUserProvidedFile = false; }; //Struct to hold the RepackRequest's total stats @@ -467,7 +468,7 @@ public: uint64_t totalBytesToArchive = 0; uint64_t totalFilesToRetrieve = 0; uint64_t totalBytesToRetrieve = 0; - //TODO : userprovidedfiles and userprovidedbytes + uint64_t userProvidedFiles = 0; }; /**