diff --git a/catalogue/catalogue_common_schema.sql b/catalogue/catalogue_common_schema.sql index 3184c2009c24b568e924faacf3ebb98d52f6b95f..d8e7eca8a47ce7af60bdd4bd6d8c772093bcffaf 100644 --- a/catalogue/catalogue_common_schema.sql +++ b/catalogue/catalogue_common_schema.sql @@ -192,6 +192,7 @@ CREATE TABLE TAPE_FILE( REFERENCES ARCHIVE_FILE(ARCHIVE_FILE_ID), CONSTRAINT TAPE_FILE_VID_BLOCK_ID_UN UNIQUE(VID, BLOCK_ID) ); +CREATE INDEX TAPE_FILE_ARCHIVE_FILE_ID_FK ON TAPE_FILE(ARCHIVE_FILE_ID); INSERT INTO CTA_CATALOGUE( SCHEMA_VERSION_MAJOR, SCHEMA_VERSION_MINOR) diff --git a/common/log/LogContext.cpp b/common/log/LogContext.cpp index 258cc5655ecadae51d28cf0bb4a349fbf47ac4f1..335d3c0ff9ff4ac150cfd02268d31e5eee2c1ccc 100644 --- a/common/log/LogContext.cpp +++ b/common/log/LogContext.cpp @@ -41,6 +41,17 @@ void LogContext::pushOrReplace(const Param& param) throw() { } } +void LogContext::moveToTheEndIfPresent(const std::string& paramName) throw() { + ParamNameMatcher match(paramName); + std::list<Param>::iterator i = + std::find_if(m_params.begin(), m_params.end(), match); + if (i != m_params.end()) { + const Param param(paramName,i->getValue()); + m_params.erase(i); + m_params.push_back(param); + } +} + void LogContext::erase(const std::string& paramName) throw() { ParamNameMatcher match(paramName); m_params.erase(std::remove_if(m_params.begin(), m_params.end(), match), m_params.end()); diff --git a/common/log/LogContext.hpp b/common/log/LogContext.hpp index 09b13e0814071b59cf57393d13c790a7ab135227..92be7db1151b9f02a52c96bf42f829d626ebd1e2 100644 --- a/common/log/LogContext.hpp +++ b/common/log/LogContext.hpp @@ -59,6 +59,14 @@ public: * @param param */ void pushOrReplace(const Param & param) throw(); + + /** + * Move a parameter with a given name to the end of the container if it is + * present. + * + * @param paramName The name of the parameter to check and move. 
+ */ + void moveToTheEndIfPresent(const std::string &paramName) throw(); /** * Removes a parameter from the list. diff --git a/continuousintegration/buildtree_runner/generate_mhvtl_config.sh b/continuousintegration/buildtree_runner/generate_mhvtl_config.sh index 29a557e5b6e9fc9dff732964bc1131c5a1ab0fc5..6c0f1e4da7468ec4b7e2c57c7bf02663d71fd537 100755 --- a/continuousintegration/buildtree_runner/generate_mhvtl_config.sh +++ b/continuousintegration/buildtree_runner/generate_mhvtl_config.sh @@ -19,6 +19,8 @@ Library: ${libid} CHANNEL: $(printf %.2d ${lib}) TARGET: 00 LUN: 00 Product identification: VLSTK${libid} Unit serial number: LIB${libid} NAA: 30:22:33:44:ab:$(printf %.2x ${lib}):00:00 + Compression: factor 1 enabled 1 + Compression type: lzo Home directory: /opt/mhvtl PERSIST: False Backoff: 400 diff --git a/continuousintegration/docker/ctafrontend/cc7/etc/yum/pluginconf.d/versionlock.list b/continuousintegration/docker/ctafrontend/cc7/etc/yum/pluginconf.d/versionlock.list index d382864a86b66f605f6d6a2a352cbc29bf9283e3..dbf1ebc4edbd2f611ee77586c77855a8831c5719 100644 --- a/continuousintegration/docker/ctafrontend/cc7/etc/yum/pluginconf.d/versionlock.list +++ b/continuousintegration/docker/ctafrontend/cc7/etc/yum/pluginconf.d/versionlock.list @@ -1,19 +1,54 @@ -0:eos-archive-4.1.25-20170717164915gitbd36415.el7.cern.x86_64 -0:eos-cleanup-4.1.25-20170717164915gitbd36415.el7.cern.x86_64 -0:eos-client-4.1.25-20170717164915gitbd36415.el7.cern.x86_64 -0:eos-debuginfo-4.1.25-20170717164915gitbd36415.el7.cern.x86_64 -0:eos-fuse-4.1.25-20170717164915gitbd36415.el7.cern.x86_64 -0:eos-fuse-core-4.1.25-20170717164915gitbd36415.el7.cern.x86_64 -0:eos-fuse-sysv-4.1.25-20170717164915gitbd36415.el7.cern.x86_64 -0:eos-server-4.1.25-20170717164915gitbd36415.el7.cern.x86_64 -0:eos-srm-4.1.25-20170717164915gitbd36415.el7.cern.x86_64 -0:eos-test-4.1.25-20170717164915gitbd36415.el7.cern.x86_64 -0:eos-testkeytab-4.1.25-20170717164915gitbd36415.el7.cern.x86_64 -1:xrootd-4.4.1-1.el7.x86_64 
-1:xrootd-debuginfo-4.4.1-1.el7.x86_64 -1:xrootd-client-libs-4.4.1-1.el7.x86_64 -1:xrootd-client-4.4.1-1.el7.x86_64 -1:xrootd-selinux-4.4.1-1.el7.noarch -1:xrootd-server-4.4.1-1.el7.x86_64 -1:xrootd-libs-4.4.1-1.el7.x86_64 -1:xrootd-server-libs-4.4.1-1.el7.x86_64 +0:eos-archive-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 +0:eos-cleanup-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 +0:eos-client-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 +0:eos-debuginfo-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 +0:eos-fuse-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 +0:eos-fuse-core-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 +0:eos-fuse-sysv-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 +0:eos-server-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 +0:eos-srm-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 +0:eos-test-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 +0:eos-testkeytab-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 +1:xrootd-4.7.0-1.el7.x86_64 +1:xrootd-client-4.7.0-1.el7.x86_64 +1:xrootd-client-libs-4.7.0-1.el7.x86_64 +1:xrootd-debuginfo-4.7.0-1.el7.x86_64 +1:xrootd-libs-4.7.0-1.el7.x86_64 +1:xrootd-selinux-4.7.0-1.el7.noarch +1:xrootd-server-4.7.0-1.el7.x86_64 +1:xrootd-server-libs-4.7.0-1.el7.x86_64 +1:ceph-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:ceph-base-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:ceph-common-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:ceph-debuginfo-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:cephfs-java-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:ceph-fuse-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:ceph-mds-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:ceph-mon-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:ceph-osd-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:ceph-radosgw-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:ceph-selinux-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:ceph-test-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:libcephfs1-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:libcephfs-devel-11.0.0-2590.g08becd3.el7.cern.x86_64 
+1:libcephfs_jni1-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:libcephfs_jni-devel-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:librados2-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:librados-devel-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:libradosstriper1-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:libradosstriper-devel-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:librbd1-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:librbd-devel-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:librgw2-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:librgw-devel-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:python34-ceph-argparse-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:python34-cephfs-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:python34-rados-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:python34-rbd-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:python-ceph-compat-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:python-cephfs-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:python-rados-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:python-rbd-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:rbd-fuse-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:rbd-mirror-11.0.0-2590.g08becd3.el7.cern.x86_64 +1:rbd-nbd-11.0.0-2590.g08becd3.el7.cern.x86_64 diff --git a/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctaeos-mgm.sh b/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctaeos-mgm.sh index 1cefbdc3f6e1373a38bfc4bd5368a52fbc699dc4..d0edfd645f9aa018a09be91a99bfef8392fff411 100755 --- a/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctaeos-mgm.sh +++ b/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctaeos-mgm.sh @@ -8,7 +8,7 @@ yum-config-manager --enable eos-citrine-depend yum-config-manager --enable eos-citrine # Install missing RPMs -yum -y install eos-client eos-server xrootd-client xrootd-debuginfo xrootd-server cta-cli cta-debuginfo +yum -y install eos-client eos-server xrootd-client xrootd-debuginfo xrootd-server cta-cli cta-debuginfo sudo # create local users as the mgm is the only one doing the uid/user/group mapping in the full infrastructure groupadd 
--gid 1100 eosusers @@ -98,6 +98,33 @@ for ((i=0;i<600;i++)); do done test -f /CANSTART && echo OK || exit 1 +# setting higher OS limits for EOS processes +maxproc=$(ulimit -u) +echo "Setting nproc for user daemon to ${maxproc}" +cat >> /etc/security/limits.conf <<EOF +daemon soft nproc ${maxproc} +daemon hard nproc ${maxproc} +EOF +echo "Checking limits..." +echo -n "nproc..." +if [ "${maxproc}" -eq "$(sudo -u daemon bash -c 'ulimit -u')" ]; then + echo OK +else + echo FAILED +fi +echo +echo "Limits summary for user daemon:" +sudo -u daemon bash -c 'ulimit -a' + +# Using jemalloc as specified in +# it-puppet-module-eos: +# code/templates/etc_sysconfig_mgm.erb +# code/templates/etc_sysconfig_mgm_env.erb +# code/templates/etc_sysconfig_fst.erb +# code/templates/etc_sysconfig_fst_env.erb +test -e /usr/lib64/libjemalloc.so.1 && echo "Using jemalloc for EOS processes" +test -e /usr/lib64/libjemalloc.so.1 && export LD_PRELOAD=/usr/lib64/libjemalloc.so.1 + # start and setup eos for xrdcp to the ${CTA_TEST_DIR} #/etc/init.d/eos start /usr/bin/xrootd -n mq -c /etc/xrd.cf.mq -l /var/log/eos/xrdlog.mq -b -Rdaemon @@ -155,7 +182,9 @@ test -f /CANSTART && echo OK || exit 1 # enable eos workflow engine eos space config default space.wfe=on # set the thread-pool size of concurrently running workflows - eos space config default space.wfe.ntx=3 + #eos space config default space.wfe.ntx=10 + # set interval in which the WFE engine is running + #eos space config default space.wfe.interval=1 # ATTENTION # for sss authorisation unix has to be replaced by sss diff --git a/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctafrontend.sh b/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctafrontend.sh index 7a63e301ca0b5702f7ed886b0311e9aa35d7a832..718e70cdcc3a5866e7d83f9b448743f8fd27cce8 100755 --- a/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctafrontend.sh +++ b/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctafrontend.sh @@ -15,7 +15,7 @@ yes | 
cp -r /opt/ci/ctafrontend/etc / . /tmp/objectstore-rc.sh echo "ObjectStore BackendPath $OBJECTSTOREURL" > /etc/cta/cta-frontend.conf -echo "Catalogue NumberOfConnections 1" >>/etc/cta/cta-frontend.conf +echo "Catalogue NumberOfConnections 10" >>/etc/cta/cta-frontend.conf echo "Log URL file:/var/log/cta/cta-frontend.log" >>/etc/cta/cta-frontend.conf @@ -52,6 +52,9 @@ touch /var/log/cta/cta-frontend.log chmod a+w /var/log/cta/cta-frontend.log tail -F /var/log/cta/cta-frontend.log & +echo "Generating core file in /var/log/cta directory so that those are available as artifacts" +echo '/var/log/cta/core_%e.%p' > /proc/sys/kernel/core_pattern + echo "Launching frontend" runuser --shell='/bin/bash' --session-command='cd ~cta; xrootd -n cta -c /etc/xrootd/xrootd-cta.cfg -I v4' cta diff --git a/continuousintegration/orchestration/tests/client_ar.sh b/continuousintegration/orchestration/tests/client_ar.sh index 7d135d2387a13411499bc146af8f9dd6ac58d03d..a80433ca7b60f9f0765baeb90664e9d9494524d3 100644 --- a/continuousintegration/orchestration/tests/client_ar.sh +++ b/continuousintegration/orchestration/tests/client_ar.sh @@ -3,6 +3,7 @@ EOSINSTANCE=ctaeos EOS_BASEDIR=/eos/ctaeos/cta TEST_FILE_NAME_BASE=test +DATA_SOURCE=/dev/urandom NB_PROCS=1 NB_FILES=1 @@ -18,12 +19,12 @@ die() { usage() { cat <<EOF 1>&2 -Usage: $0 [-n <nb_files>] [-s <file_kB_size>] [-p <# parallel procs>] [-v] [-d <eos_dest_dir>] [-e <eos_instance>] +Usage: $0 [-n <nb_files>] [-s <file_kB_size>] [-p <# parallel procs>] [-v] [-d <eos_dest_dir>] [-e <eos_instance>] [-S <data_source_file>] EOF exit 1 } -while getopts "d:e:n:s:p:v" o; do +while getopts "d:e:n:s:p:vS:" o; do case "${o}" in e) EOSINSTANCE=${OPTARG} @@ -43,6 +44,9 @@ while getopts "d:e:n:s:p:v" o; do v) VERBOSE=1 ;; + S) + DATA_SOURCE=${OPTARG} + ;; *) usage ;; @@ -82,7 +86,7 @@ eos root://${EOSINSTANCE} ls ${EOS_DIR} | egrep "${TEST_FILE_NAME_BASE}[0-9]+" | echo "Waiting for files to be on tape:" SECONDS_PASSED=0 
-WAIT_FOR_ARCHIVED_FILE_TIMEOUT=$((40+${NB_FILES}/10)) +WAIT_FOR_ARCHIVED_FILE_TIMEOUT=$((40+${NB_FILES}/5)) while test 0 != $(grep -c copied$ ${STATUS_FILE}); do echo "Waiting for files to be archived to tape: Seconds passed = ${SECONDS_PASSED}" sleep 1 @@ -135,7 +139,7 @@ grep tapeonly$ ${STATUS_FILE} | sed -e 's/ .*$//' | XrdSecPROTOCOL=sss xargs --m # Wait for the copy to appear on disk SECONDS_PASSED=0 -WAIT_FOR_RETRIEVED_FILE_TIMEOUT=$((40+${NB_FILES}/10)) +WAIT_FOR_RETRIEVED_FILE_TIMEOUT=$((40+${NB_FILES}/5)) while test 0 != $(grep -c tapeonly$ ${STATUS_FILE}); do echo "Waiting for files to be retrieved from tape: Seconds passed = ${SECONDS_PASSED}" sleep 1 diff --git a/eos/EOSReporter.cpp b/eos/EOSReporter.cpp index 4905da0f3933d37c979c580591bad15ed263c80e..3bcfa56dfda1285cec0278026415a73dd9a8bb15 100644 --- a/eos/EOSReporter.cpp +++ b/eos/EOSReporter.cpp @@ -42,38 +42,29 @@ void EOSReporter::asyncReportArchiveFullyComplete() { auto qcOpaque = XrdCl::QueryCode::OpaqueFile; XrdCl::Buffer arg (m_query.size()); arg.FromString(m_query); - AsyncQueryHandler *handler = new AsyncQueryHandler(m_reporterState); - XrdCl::XRootDStatus status=m_fs.Query( qcOpaque, arg, handler, CTA_EOS_QUERY_TIMEOUT); + XrdCl::XRootDStatus status=m_fs.Query( qcOpaque, arg, this, CTA_EOS_QUERY_TIMEOUT); cta::exception::XrootCl::throwOnError(status, "In EOSReporter::asyncReportArchiveFullyComplete(): failed to XrdCl::FileSystem::Query()"); } -//------------------------------------------------------------------------------ -//EOSReporter::AsyncQueryHandler::AsyncQueryHandler -//------------------------------------------------------------------------------ -EOSReporter::AsyncQueryHandler::AsyncQueryHandler(std::promise<void> &handlerPromise): - m_handlerPromise(handlerPromise) {} - //------------------------------------------------------------------------------ //EOSReporter::AsyncQueryHandler::HandleResponse //------------------------------------------------------------------------------ 
-void EOSReporter::AsyncQueryHandler::HandleResponse(XrdCl::XRootDStatus *status, - XrdCl::AnyObject *response) { +void EOSReporter::HandleResponse(XrdCl::XRootDStatus *status, + XrdCl::AnyObject *response) { try { cta::exception::XrootCl::throwOnError(*status, "In EOSReporter::AsyncQueryHandler::HandleResponse(): failed to XrdCl::FileSystem::Query()"); + m_reporterState.set_value(); } catch (...) { try { // store anything thrown in the promise - m_handlerPromise.set_exception(std::current_exception()); + m_reporterState.set_exception(std::current_exception()); } catch(...) { // set_exception() may throw too } } - - m_handlerPromise.set_value(); delete response; delete status; - delete this; } }} // namespace cta::disk diff --git a/eos/EOSReporter.hpp b/eos/EOSReporter.hpp index c35fe5490910891653a18aa2145809bed9ef0ed2..f85569f40f6b916f06e700e8b8b8267da5fd3172 100644 --- a/eos/EOSReporter.hpp +++ b/eos/EOSReporter.hpp @@ -26,7 +26,7 @@ namespace cta { namespace eos { const uint16_t CTA_EOS_QUERY_TIMEOUT = 15; // Timeout in seconds that is rounded up to the nearest 15 seconds -class EOSReporter: public DiskReporter { +class EOSReporter: public DiskReporter, public XrdCl::ResponseHandler { public: EOSReporter(const std::string & hostURL, const std::string & queryValue, std::promise<void> &reporterState); void reportArchiveFullyComplete() override; @@ -35,14 +35,8 @@ private: XrdCl::FileSystem m_fs; std::string m_query; std::promise<void> &m_reporterState; - class AsyncQueryHandler: public XrdCl::ResponseHandler { - public: - AsyncQueryHandler(std::promise<void> &handlerPromise); - virtual void HandleResponse(XrdCl::XRootDStatus *status, - XrdCl::AnyObject *response); - private: - std::promise<void> &m_handlerPromise; - }; + virtual void HandleResponse(XrdCl::XRootDStatus *status, + XrdCl::AnyObject *response); }; }} // namespace cta::disk diff --git a/eos_wfe_scripts/Makefile b/eos_wfe_scripts/Makefile new file mode 100644 index 
0000000000000000000000000000000000000000..8cc3bc2ab2ee2b7cc65d061e9cabff74b2a4a791 --- /dev/null +++ b/eos_wfe_scripts/Makefile @@ -0,0 +1,24 @@ +SPECFILE = $(shell find -maxdepth 1 -type f -name *.spec) +PACKAGE = $(shell awk '$$1 == "Name:" { print $$2 }' $(SPECFILE) ) +VERSION = $(shell awk '$$1 == "Version:" { print $$2 }' $(SPECFILE) ) +RELEASE = $(shell awk '$$1 == "Release:" { print $$2 }' $(SPECFILE) ) +TARFILE = $(PACKAGE)-$(VERSION).tar.gz +RPMTOPDIR = $(shell rpm --eval '%{_topdir}') +BUILDARCH = $(shell awk '$$1 == "BuildArch:" { print $$2 }' $(SPECFILE) ) +BUILTRPM = $(RPMTOPDIR)/RPMS/$(BUILDARCH)/$(PACKAGE)-$(VERSION)-$(RELEASE).$(BUILDARCH).rpm + +all: $(TARFILE) + + +$(TARFILE): + tar cvzf $(TARFILE) --hard-dereference --dereference --exclude-vcs --transform 's,^,$(PACKAGE)-$(VERSION)/,' * + +clean: + rm $(TARFILE) + +build: $(TARFILE) $(SPECFILE) + mv $(TARFILE) $(RPMTOPDIR)/SOURCES + rpmbuild -ba $(SPECFILE) + +deb_build: build + sudo alien $(BUILTRPM) --scripts -k diff --git a/eos_wfe_scripts/cta_eos_wfe_scripts.spec b/eos_wfe_scripts/cta_eos_wfe_scripts.spec new file mode 100644 index 0000000000000000000000000000000000000000..cb6620974bfc903da9bfbdabfb74d7998b142270 --- /dev/null +++ b/eos_wfe_scripts/cta_eos_wfe_scripts.spec @@ -0,0 +1,42 @@ +Summary: CERN Tape Archive workflow scripts for EOS +Name: cta_eos_wfe_scripts +Version: 0.1 +Release: 1 +License: GPL +Group: Applications/System +Buildroot: %{_tmppath}/%{name}-%{version} +Source: %{name}-%{version}.tar.gz +Group: Applications/System +BuildArch: noarch +requires: eos-server + +%description +eos_wfe_scripts contains all the workflows needed for archival from EOS to CTA and for retrieval from CTA to EOS. +This version contains all the files for the so-called *preproduction* workflows. 
+ + +%prep +%setup -n %{name}-%{version} + + +%build + + +%install +[ -d %{buildroot} ] && rm -rf %{buildroot} + +mkdir -p %{buildroot}/var/eos/wfe/bash +install -m 755 create_tape_drop_disk_replicas %{buildroot}/var/eos/wfe/bash/create_tape_drop_disk_replicas +install -m 755 delete_archive_file %{buildroot}/var/eos/wfe/bash/delete_archive_file +install -m 755 retrieve_archive_file %{buildroot}/var/eos/wfe/bash/retrieve_archive_file + + +%clean +rm -rf %{buildroot} + + +%files +%defattr(-,daemon,daemon) +/var/eos/wfe/bash/create_tape_drop_disk_replicas +/var/eos/wfe/bash/delete_archive_file +/var/eos/wfe/bash/retrieve_archive_file diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 6f586b18a4c35b002f101a996e83fb8668736365..a24b9c2afb78567fc1f9c1e99813b251aa37f9cc 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -408,6 +408,7 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16 ret->m_updaterCallback= [this, copyNumber, owner, previousOwner, &retRef](const std::string &in)->std::string { // We have a locked and fetched object, so we just need to work on its representation. + retRef.m_timingReport.lockFetchTime = retRef.m_timer.secs(utils::Timer::resetCounter); serializers::ObjectHeader oh; if (!oh.ParseFromString(in)) { // Use a the tolerant parser to assess the situation. 
@@ -455,6 +456,7 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16 retRef.m_archiveReportURL = payload.archivereporturl(); retRef.m_srcURL = payload.srcurl(); oh.set_payload(payload.SerializePartialAsString()); + retRef.m_timingReport.processTime = retRef.m_timer.secs(utils::Timer::resetCounter); return oh.SerializeAsString(); } } @@ -467,8 +469,14 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16 void ArchiveRequest::AsyncJobOwnerUpdater::wait() { m_backendUpdater->wait(); + m_timingReport.commitUnlockTime = m_timer.secs(); } +ArchiveRequest::AsyncJobOwnerUpdater::TimingsReport ArchiveRequest::AsyncJobOwnerUpdater::getTimeingsReport() { + return m_timingReport; +} + + const common::dataStructures::ArchiveFile& ArchiveRequest::AsyncJobOwnerUpdater::getArchiveFile() { return m_archiveFile; } diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index d56d3d0998065c60773f53258c2052eb105fb9c2..ebe78be3be94da038f3d1cd8b304e1739f6fa4ce 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -24,6 +24,7 @@ #include "common/dataStructures/MountPolicy.hpp" #include "common/dataStructures/UserIdentity.hpp" #include "common/dataStructures/ArchiveFile.hpp" +#include "common/Timer.hpp" #include "ObjectOps.hpp" #include "objectstore/cta.pb.h" #include <list> @@ -66,12 +67,20 @@ public: const common::dataStructures::ArchiveFile & getArchiveFile(); const std::string & getSrcURL(); const std::string & getArchiveReportURL(); + struct TimingsReport { + double lockFetchTime = 0; + double processTime = 0; + double commitUnlockTime = 0; + }; + TimingsReport getTimeingsReport(); private: std::function<std::string(const std::string &)> m_updaterCallback; std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater; common::dataStructures::ArchiveFile m_archiveFile; std::string m_srcURL; std::string m_archiveReportURL; + utils::Timer m_timer; + TimingsReport m_timingReport; }; 
// An job owner updater factory. The owner MUST be previousOwner for the update to be executed. CTA_GENERATE_EXCEPTION_CLASS(WrongPreviousOwner); diff --git a/objectstore/Helpers.cpp b/objectstore/Helpers.cpp index 242274acd28300be4ff25c336147fcb45164e62d..db5761cfcbb647df965346857877f2f665dca543 100644 --- a/objectstore/Helpers.cpp +++ b/objectstore/Helpers.cpp @@ -41,9 +41,7 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue, // Try and find an existing one first, create if needed Backend & be = archiveQueue.m_objectStore; for (size_t i=0; i<5; i++) { - double rootLockSharedTime = 0; - double rootFetchTime = 0; - double rootUnlockSharedTime = 0; + double rootFetchNoLockTime = 0; double rootRelockExclusiveTime = 0; double rootUnlockExclusiveTime = 0; double rootRefetchTime = 0; @@ -52,16 +50,12 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue, double queueFetchTime = 0; utils::Timer t; { - RootEntry re (be); - ScopedSharedLock rel(re); - rootLockSharedTime = t.secs(utils::Timer::resetCounter); - re.fetch(); - rootFetchTime = t.secs(utils::Timer::resetCounter); + RootEntry re(be); + re.fetchNoLock(); + rootFetchNoLockTime = t.secs(utils::Timer::resetCounter); try { archiveQueue.setAddress(re.getArchiveQueueAddress(tapePool)); } catch (cta::exception::Exception & ex) { - rel.release(); - rootUnlockSharedTime = t.secs(utils::Timer::resetCounter); ScopedExclusiveLock rexl(re); rootRelockExclusiveTime = t.secs(utils::Timer::resetCounter); re.fetch(); @@ -70,9 +64,7 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue, addOrGetQueueandCommitTime = t.secs(utils::Timer::resetCounter); } } - if (!rootUnlockSharedTime) - rootUnlockSharedTime = t.secs(utils::Timer::resetCounter); - else + if (rootRelockExclusiveTime) rootUnlockExclusiveTime = t.secs(utils::Timer::resetCounter); try { archiveQueueLock.lock(archiveQueue); @@ -82,9 +74,7 @@ void 
Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue, log::ScopedParamContainer params(lc); params.add("attemptNb", i+1) .add("queueObject", archiveQueue.getAddressIfSet()) - .add("rootLockSharedTime", rootLockSharedTime) - .add("rootFetchTime", rootFetchTime) - .add("rootUnlockSharedTime", rootUnlockSharedTime) + .add("rootFetchNoLockTime", rootFetchNoLockTime) .add("rootRelockExclusiveTime", rootRelockExclusiveTime) .add("rootRefetchTime", rootRefetchTime) .add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime) @@ -105,15 +95,13 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue, params.add("attemptNb", i+1) .add("exceptionMessage", ex.getMessageValue()) .add("queueObject", archiveQueue.getAddressIfSet()) - .add("rootLockSharedTime", rootLockSharedTime) - .add("rootFetchTime", rootFetchTime) - .add("rootUnlockSharedTime", rootUnlockSharedTime) + .add("rootFetchNoLockTime", rootFetchNoLockTime) .add("rootRefetchTime", rootRefetchTime) .add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime) .add("rootUnlockExclusiveTime", rootUnlockExclusiveTime) .add("queueLockTime", queueLockTime) .add("queueFetchTime", queueFetchTime); - lc.log(log::ERR, "In Helpers::getLockedAndFetchedQueue<ArchiveQueue>(): failed to fetch an existing queue. Retrying."); + lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<ArchiveQueue>(): failed to fetch an existing queue. Retrying."); archiveQueue.resetAddress(); continue; } catch (...) 
{ @@ -145,31 +133,79 @@ void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQue // Try and find an existing one first, create if needed Backend & be = retrieveQueue.m_objectStore; for (size_t i=0; i<5; i++) { + double rootFetchNoLockTime = 0; + double rootRelockExclusiveTime = 0; + double rootUnlockExclusiveTime = 0; + double rootRefetchTime = 0; + double addOrGetQueueandCommitTime = 0; + double queueLockTime = 0; + double queueFetchTime = 0; + utils::Timer t; { RootEntry re (be); - ScopedSharedLock rel(re); - re.fetch(); + re.fetchNoLock(); + rootFetchNoLockTime = t.secs(utils::Timer::resetCounter); try { retrieveQueue.setAddress(re.getRetrieveQueueAddress(vid)); } catch (cta::exception::Exception & ex) { - rel.release(); ScopedExclusiveLock rexl(re); + rootRelockExclusiveTime = t.secs(utils::Timer::resetCounter); re.fetch(); + rootRefetchTime = t.secs(utils::Timer::resetCounter); retrieveQueue.setAddress(re.addOrGetRetrieveQueueAndCommit(vid, agentReference)); + addOrGetQueueandCommitTime = t.secs(utils::Timer::resetCounter); } } + if (rootRelockExclusiveTime) + rootUnlockExclusiveTime = t.secs(utils::Timer::resetCounter); try { retrieveQueueLock.lock(retrieveQueue); + queueLockTime = t.secs(utils::Timer::resetCounter); retrieveQueue.fetch(); + queueFetchTime = t.secs(utils::Timer::resetCounter); + log::ScopedParamContainer params(lc); + params.add("attemptNb", i+1) + .add("queueObject", retrieveQueue.getAddressIfSet()) + .add("rootFetchNoLockTime", rootFetchNoLockTime) + .add("rootRelockExclusiveTime", rootRelockExclusiveTime) + .add("rootRefetchTime", rootRefetchTime) + .add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime) + .add("rootUnlockExclusiveTime", rootUnlockExclusiveTime) + .add("queueLockTime", queueLockTime) + .add("queueFetchTime", queueFetchTime); + lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<RetrieveQueue>(): Successfully found and locked a retrieve queue."); return; } catch (cta::exception::Exception 
& ex) { // We have a (rare) opportunity for a race condition, where we identify the // queue and it gets deleted before we manage to lock it. // The locking of fetching will fail in this case. // We hence allow ourselves to retry a couple times. + // We also need to make sure the lock on the queue is released (it is in + // an object and hence not scoped). + if (retrieveQueueLock.isLocked()) retrieveQueueLock.release(); + log::ScopedParamContainer params(lc); + params.add("attemptNb", i+1) + .add("exceptionMessage", ex.getMessageValue()) + .add("queueObject", retrieveQueue.getAddressIfSet()) + .add("rootFetchNoLockTime", rootFetchNoLockTime) + .add("rootRefetchTime", rootRefetchTime) + .add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime) + .add("rootUnlockExclusiveTime", rootUnlockExclusiveTime) + .add("queueLockTime", queueLockTime) + .add("queueFetchTime", queueFetchTime); + lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<RetrieveQueue>(): failed to fetch an existing queue. Retrying."); + retrieveQueue.resetAddress(); continue; + } catch (...) { + // Also release the lock if needed here. + if (retrieveQueueLock.isLocked()) retrieveQueueLock.release(); + retrieveQueue.resetAddress(); + throw; } } + // Also release the lock if needed here. 
+ if (retrieveQueueLock.isLocked()) retrieveQueueLock.release(); + retrieveQueue.resetAddress(); throw cta::exception::Exception(std::string( "In OStoreDB::getLockedAndFetchedRetrieveQueue(): failed to find or create and lock archive queue after 5 retries for vid: ") + vid); diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index 1c71cc2dbc994efdf4c43988bc185531c35218af..36f55db894c0b0e0a47d07099a14cf5e5e78d4ed 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -330,7 +330,7 @@ bool RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId) { return false; } } - throw NoSuchJob ("In ArchiveRequest::addJobFailure(): could not find job"); + throw NoSuchJob ("In RetrieveRequest::addJobFailure(): could not find job"); } bool RetrieveRequest::finishIfNecessary() { @@ -417,7 +417,7 @@ auto RetrieveRequest::asyncUpdateOwner(uint16_t copyNumber, const std::string& o } } // If we do not find the copy, return not owned as well... 
- throw WrongPreviousOwner("In ArchiveRequest::asyncUpdateJobOwner()::lambda(): copyNb not found."); + throw WrongPreviousOwner("In RetrieveRequest::asyncUpdateJobOwner()::lambda(): copyNb not found."); }; ret->m_backendUpdater.reset(m_objectStore.asyncUpdate(getAddressIfSet(), ret->m_updaterCallback)); return ret.release(); diff --git a/objectstore/cta-objectstore-collect-orphaned-object.cpp b/objectstore/cta-objectstore-collect-orphaned-object.cpp index 98a85ffec62b9c846523738b88bc34781912bc0c..334530224454541d1f004c93ad0240a225079c84 100644 --- a/objectstore/cta-objectstore-collect-orphaned-object.cpp +++ b/objectstore/cta-objectstore-collect-orphaned-object.cpp @@ -30,6 +30,7 @@ #include "RetrieveRequest.hpp" #include "GenericObject.hpp" #include "common/log/StringLogger.hpp" +#include "common/Configuration.hpp" #include "catalogue/CatalogueFactory.hpp" #include <iostream> #include <stdexcept> @@ -41,14 +42,24 @@ int main(int argc, char ** argv) { std::unique_ptr<cta::catalogue::Catalogue> catalogue; cta::log::StringLogger sl("cta-objectstore-collect-orphaned", cta::log::DEBUG); cta::log::LogContext lc(sl); + std::string objectName; if (4 == argc) { be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release()); const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile(argv[2]); const uint64_t nbConns = 1; const uint64_t nbArchiveFileListingConns = 0; catalogue=std::move(cta::catalogue::CatalogueFactory::create(sl, catalogueLogin, nbConns, nbArchiveFileListingConns)); + objectName = argv[3]; + } else if (2 == argc) { + cta::common::Configuration m_ctaConf("/etc/cta/cta-frontend.conf"); + be = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr))); + const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile("/etc/cta/cta_catalogue_db.conf"); + const uint64_t nbConns = 1; + const uint64_t nbArchiveFileListingConns = 0; + catalogue = 
std::move(cta::catalogue::CatalogueFactory::create(sl, catalogueLogin, nbConns, nbArchiveFileListingConns)); + objectName = argv[1]; } else { - throw std::runtime_error("Wrong number of arguments: expected 3: <objectstoreURL> <catalogue login file> <objectname>"); + throw std::runtime_error("Wrong number of arguments: expected 1 or 3: [objectstoreURL catalogueLoginFile] objectname"); } // If the backend is a VFS, make sure we don't delete it on exit. // If not, nevermind. @@ -57,7 +68,7 @@ int main(int argc, char ** argv) { } catch (std::bad_cast &){} std::cout << "Object store path: " << be->getParams()->toURL() << std::endl; // Try and open the object. - cta::objectstore::GenericObject go(argv[3], *be); + cta::objectstore::GenericObject go(objectName, *be); cta::objectstore::ScopedExclusiveLock gol(go); std::cout << "Object address: " << go.getAddressIfSet() << std::endl; go.fetch(); @@ -74,7 +85,7 @@ int main(int argc, char ** argv) { gol.release(); bool someGcDone=false; gcpass: - cta::objectstore::ArchiveRequest ar(argv[3], *be); + cta::objectstore::ArchiveRequest ar(objectName, *be); cta::objectstore::ScopedExclusiveLock arl(ar); ar.fetch(); for (auto & j: ar.dumpJobs()) { @@ -100,7 +111,7 @@ int main(int argc, char ** argv) { // Reopen the object as an ArchiveRequest std::cout << "The object is an RetrieveRequest" << std::endl; gol.release(); - cta::objectstore::RetrieveRequest rr(argv[3], *be); + cta::objectstore::RetrieveRequest rr(objectName, *be); cta::objectstore::ScopedExclusiveLock rrl(rr); rr.fetch(); if (!be->exists(rr.getOwner())) { diff --git a/objectstore/cta-objectstore-dereference-removed-queues.cpp b/objectstore/cta-objectstore-dereference-removed-queues.cpp index a4667c4368b5d05c20120a1e9a473ad513d65473..560522493c2431fa27e253d4be4e10ada94b1748 100644 --- a/objectstore/cta-objectstore-dereference-removed-queues.cpp +++ b/objectstore/cta-objectstore-dereference-removed-queues.cpp @@ -22,6 +22,7 @@ * unblock tape servers after changing the 
ArchiveQueue schema during development. */ +#include "common/Configuration.hpp" #include "BackendFactory.hpp" #include "BackendVFS.hpp" #include "Agent.hpp" @@ -35,8 +36,11 @@ int main(int argc, char ** argv) { std::unique_ptr<cta::objectstore::Backend> be; if (2 == argc) { be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release()); + } else if (1 == argc) { + cta::common::Configuration m_ctaConf("/etc/cta/cta-frontend.conf"); + be = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr))); } else { - throw std::runtime_error("Wrong number of arguments: expected 1"); + throw std::runtime_error("Wrong number of arguments: expected 0 or 1: [objectstoreURL]"); } // If the backend is a VFS, make sure we don't delete it on exit. // If not, nevermind. diff --git a/objectstore/cta-objectstore-dump-object.cpp b/objectstore/cta-objectstore-dump-object.cpp index d12ff6a6dc65669b9c4147fd16dd70a6371a1ef7..7e3899ef311a39ad22517fa9852359052a48d813 100644 --- a/objectstore/cta-objectstore-dump-object.cpp +++ b/objectstore/cta-objectstore-dump-object.cpp @@ -22,6 +22,7 @@ * the path the backend store and exit */ +#include "common/Configuration.hpp" #include "BackendFactory.hpp" #include "BackendVFS.hpp" #include "GenericObject.hpp" @@ -31,19 +32,25 @@ int main(int argc, char ** argv) { try { std::unique_ptr<cta::objectstore::Backend> be; + std::string objectName; if (3 == argc) { be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release()); + objectName = argv[2]; + } else if (2 == argc ){ + cta::common::Configuration m_ctaConf("/etc/cta/cta-frontend.conf"); + be=std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr))); + objectName = argv[1]; } else { - throw std::runtime_error("Wrong number of arguments: expected 2"); + throw std::runtime_error("Wrong number of arguments: expected 1 or 2: [objectstoreURL] 
objectname"); } // If the backend is a VFS, make sure we don't delete it on exit. // If not, nevermind. try { dynamic_cast<cta::objectstore::BackendVFS &>(*be).noDeleteOnExit(); } catch (std::bad_cast &){} - std::cout << "Object store path: " << be->getParams()->toURL() - << " object name=" << argv[2] << std::endl; - cta::objectstore::GenericObject ge(argv[2], *be); + std::cout << "Object store path: " << be->getParams()->toURL() << std::endl + << "Object name: " << objectName << std::endl; + cta::objectstore::GenericObject ge(objectName, *be); cta::objectstore::ScopedSharedLock gel(ge); ge.fetch(); std::cout << ge.dump(gel) << std::endl; diff --git a/objectstore/cta-objectstore-list.cpp b/objectstore/cta-objectstore-list.cpp index 14f6eb55e7961e556c8b063d889db07168f9ab81..e0ddc4e82bdc0615cb2edd2083723fc7ffa51839 100644 --- a/objectstore/cta-objectstore-list.cpp +++ b/objectstore/cta-objectstore-list.cpp @@ -22,6 +22,7 @@ * the path the backend store and exit */ +#include "common/Configuration.hpp" #include "BackendVFS.hpp" #include "BackendFactory.hpp" #include "RootEntry.hpp" @@ -33,12 +34,12 @@ int main(int argc, char ** argv) { try { std::unique_ptr<cta::objectstore::Backend> be; if (1 == argc) { - be.reset(new cta::objectstore::BackendVFS); - + cta::common::Configuration m_ctaConf("/etc/cta/cta-frontend.conf"); + be = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr))); } else if (2 == argc) { be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release()); } else { - throw std::runtime_error("Wrong number of arguments: expected 0 or 1"); + throw std::runtime_error("Wrong number of arguments: expected 0 or 1: [objectstoreURL]"); } // If the backend is a VFS, make sure we don't delete it on exit. // If not, nevermind. 
@@ -51,7 +52,7 @@ int main(int argc, char ** argv) { std::cout << *o << std::endl; } } catch (std::exception & e) { - std::cerr << "Failed to initialise the root entry in a new VFS backend store" + std::cerr << "Failed to list backend store: " << std::endl << e.what() << std::endl; } } \ No newline at end of file diff --git a/objectstore/cta-objectstore-unfollow-agent.cpp b/objectstore/cta-objectstore-unfollow-agent.cpp index 1ab5273a891b6ccde4c1877f6c19d4296f73bcd1..1a61b3404e6bc88d6a0363d446fda6d7dbef3f95 100644 --- a/objectstore/cta-objectstore-unfollow-agent.cpp +++ b/objectstore/cta-objectstore-unfollow-agent.cpp @@ -22,6 +22,7 @@ * the path the backend store and exit */ +#include "common/Configuration.hpp" #include "BackendFactory.hpp" #include "BackendVFS.hpp" #include "Agent.hpp" @@ -33,11 +34,17 @@ int main(int argc, char ** argv) { try { + std::string agentName; std::unique_ptr<cta::objectstore::Backend> be; if (3 == argc) { be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release()); + agentName = argv[2]; + } else if (2 == argc) { + cta::common::Configuration m_ctaConf("/etc/cta/cta-frontend.conf"); + be=std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr))); + agentName = argv[1]; } else { - throw std::runtime_error("Wrong number of arguments: expected 2"); + throw std::runtime_error("Wrong number of arguments: expected 1 or 2: [objectstoreURL] agentName"); } // If the backend is a VFS, make sure we don't delete it on exit. // If not, nevermind. @@ -45,8 +52,8 @@ int main(int argc, char ** argv) { dynamic_cast<cta::objectstore::BackendVFS &>(*be).noDeleteOnExit(); } catch (std::bad_cast &){} std::cout /* << "Object store path: " << be->getParams()->toURL() - << " agent */<< "name=" << argv[2] << std::endl; - if (!be->exists(argv[2])) { + << " agent */<< "name=" << agentName << std::endl; + if (!be->exists(agentName)) { // Agent does not exist: remove from registry. 
cta::objectstore::RootEntry re (*be); cta::objectstore::ScopedSharedLock rel(re); @@ -55,12 +62,12 @@ int main(int argc, char ** argv) { rel.release(); cta::objectstore::ScopedExclusiveLock arl(ar); ar.fetch(); - ar.removeAgent(argv[2]); + ar.removeAgent(agentName); ar.commit(); std::cout << "De-listed a non-existing agent." << std::endl; exit (EXIT_SUCCESS); } - cta::objectstore::Agent ag(argv[2], *be); + cta::objectstore::Agent ag(agentName, *be); cta::objectstore::ScopedExclusiveLock agl(ag); try { ag.fetch(); diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp index de37ff600db33e566f1e239887d37165f73eb88d..627dc38c9ba2d3bb916596121eccd3fb2a638b3d 100644 --- a/scheduler/ArchiveJob.cpp +++ b/scheduler/ArchiveJob.cpp @@ -52,8 +52,8 @@ void cta::ArchiveJob::asyncSetJobSucceed() { //------------------------------------------------------------------------------ bool cta::ArchiveJob::checkAndAsyncReportComplete() { if (m_dbJob->checkSucceed()) { - std::unique_ptr<eos::DiskReporter> reporter(m_mount.createDiskReporter(m_dbJob->archiveReportURL, m_reporterState)); - reporter->asyncReportArchiveFullyComplete(); + m_reporter.reset(m_mount.createDiskReporter(m_dbJob->archiveReportURL, m_reporterState)); + m_reporter->asyncReportArchiveFullyComplete(); return true; } return false; diff --git a/scheduler/ArchiveJob.hpp b/scheduler/ArchiveJob.hpp index d94afd831da7e09e2ab34307928e344036c782fa..30ca2c4790dda3a7757e864156f5007eeed7918a 100644 --- a/scheduler/ArchiveJob.hpp +++ b/scheduler/ArchiveJob.hpp @@ -22,6 +22,7 @@ #include "common/remoteFS/RemotePathAndStatus.hpp" #include "scheduler/SchedulerDatabase.hpp" #include "catalogue/Catalogue.hpp" +#include "eos/DiskReporter.hpp" #include <stdint.h> #include <string> @@ -122,6 +123,11 @@ public: private: std::unique_ptr<cta::SchedulerDatabase::ArchiveJob> m_dbJob; + /** + * The reporter for the job. 
+ */ + std::unique_ptr<cta::eos::DiskReporter> m_reporter; + /** * The mount that generated this job */ diff --git a/scheduler/OStoreDB/MemQueues.hpp b/scheduler/OStoreDB/MemQueues.hpp index 24833e1918372e648be867fb0d583b447bc68cbc..682fe93dbed7958f032b52e5939f033b1cb63791 100644 --- a/scheduler/OStoreDB/MemQueues.hpp +++ b/scheduler/OStoreDB/MemQueues.hpp @@ -326,6 +326,13 @@ std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::share } { log::ScopedParamContainer params(logContext); + if (typeid(Queue) == typeid(objectstore::ArchiveQueue)) { + params.add("type", "Archive") + .add("tapepool", queueIndex); + } else if (typeid(Queue) == typeid(objectstore::RetrieveQueue)) { + params.add("type", "Retrieve") + .add("vid", queueIndex); + } params.add("objectQueue", queue.getAddressIfSet()) .add("jobsBefore", qJobsBefore) .add("jobsAfter", qJobsAfter) @@ -361,8 +368,12 @@ std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::share log::ScopedParamContainer params(logContext); params.add("message", ex.getMessageValue()); logContext.log(log::ERR, "In MemQueue::sharedAddToNewQueue(): got an exception writing. Will propagate to other threads."); + } catch (std::exception & ex) { + log::ScopedParamContainer params(logContext); + params.add("exceptionWhat", ex.what()); + logContext.log(log::ERR, "In MemQueue::sharedAddToNewQueue(): got a standard exception writing. Will propagate to other threads."); } catch (...) { - logContext.log(log::ERR, "In MemQueue::sharedAddToNewQueue(): got a non cta exception writing. Will propagate to other threads."); + logContext.log(log::ERR, "In MemQueue::sharedAddToNewQueue(): got an unknown exception writing. Will propagate to other threads."); } size_t exceptionsNotPassed = 0; // Something went wrong. 
We should inform the other threads diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 92977384f3967d5a33cf9c81c2dc3786bfa7c825..a49cec8c12cf9b257effa3881b5748dc9adf4509 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -776,6 +776,8 @@ std::list<cta::common::dataStructures::ArchiveJob> ret.back().request.checksumValue = osar.getArchiveFile().checksumValue; ret.back().request.creationLog = osar.getEntryLog(); ret.back().request.diskFileID = osar.getArchiveFile().diskFileId; + ret.back().request.diskFileInfo = osar.getArchiveFile().diskFileInfo; + ret.back().request.fileSize = osar.getArchiveFile().fileSize; ret.back().instanceName = osar.getArchiveFile().diskInstance; ret.back().request.requester = osar.getRequester(); ret.back().request.srcURL = osar.getSrcURL(); @@ -1643,22 +1645,33 @@ const SchedulerDatabase::ArchiveMount::MountInfo& OStoreDB::ArchiveMount::getMou //------------------------------------------------------------------------------ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMount::getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) { + utils::Timer t; + double driveRegisterCheckTime = 0; + double findQueueTime = 0; + double lockFetchQueueTime = 0; + double emptyQueueCleanupTime = 0; + double jobSelectionTime = 0; + double ownershipAdditionTime = 0; + double asyncUpdateLaunchTime = 0; + double jobsUpdateTime = 0; + double queueProcessTime = 0; + double ownershipRemovalTime = 0; + double queueCommitTime = 0; // Find the next files to archive // First, check we should not forcibly go down. In such an occasion, we just find noting to do. 
// Get drive register { // Get the archive queue objectstore::RootEntry re(m_objectStore); - objectstore::ScopedSharedLock rel(re); - re.fetch(); + re.fetchNoLock(); objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore); - ScopedSharedLock drl(dr); - dr.fetch(); + dr.fetchNoLock(); auto drs = dr.getDriveState(mountInfo.drive); if (!drs.desiredDriveState.up && drs.desiredDriveState.forceDown) { logContext.log(log::INFO, "In OStoreDB::ArchiveMount::getNextJobBatch(): returning no job as we are forcibly going down."); return std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> >(); } + driveRegisterCheckTime = t.secs(utils::Timer::resetCounter); } // Now, we should repeatedly fetch jobs from the queue until we fulfilled the request or there is nothing to get form the // queue anymore. @@ -1675,8 +1688,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun uint64_t beforeFiles=currentFiles; // Try and get access to a queue. objectstore::RootEntry re(m_objectStore); - objectstore::ScopedSharedLock rel(re); - re.fetch(); + re.fetchNoLock(); std::string aqAddress; auto aql = re.dumpArchiveQueues(); for (auto & aqp : aql) { @@ -1687,16 +1699,16 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun // try and lock the archive queue. Any failure from here on means the end of the getting jobs. objectstore::ArchiveQueue aq(aqAddress, m_objectStore); objectstore::ScopedExclusiveLock aqlock; + findQueueTime += t.secs(utils::Timer::resetCounter); try { try { aqlock.lock(aq); - rel.release(); aq.fetch(); + lockFetchQueueTime += t.secs(utils::Timer::resetCounter); } catch (cta::exception::Exception & ex) { // The queue is now absent. We can remove its reference in the root entry. // A new queue could have been added in the mean time, and be non-empty. // We will then fail to remove from the RootEntry (non-fatal). 
- if (rel.isLocked()) rel.release(); ScopedExclusiveLock rexl(re); re.fetch(); try { @@ -1713,6 +1725,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun .add("Message", ex.getMessageValue()); logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry"); } + emptyQueueCleanupTime += t.secs(utils::Timer::resetCounter); continue; } // We now have the queue. @@ -1754,17 +1767,20 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun .add("requestedBytes", bytesRequested); logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): will process a set of candidate jobs."); } + jobSelectionTime += t.secs(utils::Timer::resetCounter); // We now have a batch of jobs to try and dequeue. Should not be empty. // First add the jobs to the owned list of the agent. std::list<std::string> addedJobs; for (const auto &j: candidateJobs) addedJobs.emplace_back(j->m_archiveRequest.getAddressIfSet()); m_agentReference.addBatchToOwnership(addedJobs, m_objectStore); + ownershipAdditionTime += t.secs(utils::Timer::resetCounter); // We can now attempt to switch the ownership of the jobs. Depending on the type of failure (if any) we // will adapt the rest. // First, start the parallel updates of jobs std::list<std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater>> jobUpdates; for (const auto &j: candidateJobs) jobUpdates.emplace_back( j->m_archiveRequest.asyncUpdateJobOwner(j->tapeFile.copyNb, m_agentReference.getAgentAddress(), aqAddress)); + asyncUpdateLaunchTime += t.secs(utils::Timer::resetCounter); // Now run through the results of the asynchronous updates. Non-success results come in the form of exceptions. std::list<std::string> jobsToForget; // The jobs either absent or not owned, for which we should just remove references (agent). std::list<std::string> jobsToDequeue; // The jobs that should not be queued anymore. 
All of them indeed (invalid or successfully poped). @@ -1789,10 +1805,14 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun (*j)->m_mountId = mountInfo.mountId; (*j)->m_tapePool = mountInfo.tapePool; log::ScopedParamContainer params(logContext); + auto timingsReport = (*ju)->getTimeingsReport(); params.add("tapepool", mountInfo.tapePool) .add("queueObject", aq.getAddressIfSet()) .add("requestObject", (*j)->m_archiveRequest.getAddressIfSet()) - .add("fileId", (*j)->archiveFile.archiveFileID); + .add("fileId", (*j)->archiveFile.archiveFileID) + .add("lockFetchTime", timingsReport.lockFetchTime) + .add("processTime", timingsReport.processTime) + .add("commitUnlockTime", timingsReport.commitUnlockTime); logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): popped one job"); validatedJobs.emplace_back(std::move(*j)); } catch (cta::exception::Exception & e) { @@ -1855,15 +1875,21 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun currentFiles--; currentBytes-=(*j)->archiveFile.fileSize; } + jobsUpdateTime += t.secs(utils::Timer::resetCounter); // In all cases: move to the nexts. ju=jobUpdates.erase(ju); j=candidateJobs.erase(j); } // All (most) jobs are now officially owned by our agent. We can hence remove them from the queue. for (const auto &j: jobsToDequeue) aq.removeJob(j); - if (jobsToForget.size()) m_agentReference.removeBatchFromOwnership(addedJobs, m_objectStore); + queueProcessTime += t.secs(utils::Timer::resetCounter); + if (jobsToForget.size()) { + m_agentReference.removeBatchFromOwnership(addedJobs, m_objectStore); + ownershipRemovalTime += t.secs(utils::Timer::resetCounter); + } // (Possibly intermediate) commit of the queue. We keep the lock for the moment. aq.commit(); + queueCommitTime += t.secs(utils::Timer::resetCounter); // We can now add the validated jobs to the return value. 
auto vj = validatedJobs.begin(); while (vj != validatedJobs.end()) { @@ -1940,7 +1966,18 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun log::ScopedParamContainer params(logContext); params.add("tapepool", mountInfo.tapePool) .add("files", nFiles) - .add("bytes", nBytes); + .add("bytes", nBytes) + .add("driveRegisterCheckTime", driveRegisterCheckTime) + .add("findQueueTime", findQueueTime) + .add("lockFetchQueueTime", lockFetchQueueTime) + .add("emptyQueueCleanupTime", emptyQueueCleanupTime) + .add("jobSelectionTime", jobSelectionTime) + .add("ownershipAdditionTime", ownershipAdditionTime) + .add("asyncUpdateLaunchTime", asyncUpdateLaunchTime) + .add("jobsUpdateTime", jobsUpdateTime) + .add("queueProcessTime", queueProcessTime) + .add("ownershipRemovalTime", ownershipRemovalTime) + .add("queueCommitTime", queueCommitTime); logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): jobs retrieval complete."); } // We can construct the return value. @@ -2161,7 +2198,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo params.add("tapepool", mountInfo.tapePool) .add("queueObject", rq.getAddressIfSet()) .add("requestObject", (*j)->m_retrieveRequest.getAddressIfSet()); - logContext.log(log::WARNING, "In ArchiveMount::getNextJobBatch(): skipped job not owned or not present."); + logContext.log(log::WARNING, "In RetrieveMount::getNextJobBatch(): skipped job not owned or not present."); } else if (typeid(e) == typeid(Backend::CouldNotUnlock)) { // We failed to unlock the object. The request was successfully updated, so we do own it. This is a non-fatal // situation, so we just issue a warning. 
Removing the request from our agent ownership would @@ -2180,7 +2217,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo free(exceptionTypeStr); exceptionTypeStr = nullptr; params.add("message", e.getMessageValue()); - logContext.log(log::WARNING, "In ArchiveMount::getNextJobBatch(): Failed to unlock the request (lock expiration). Request remains selected."); + logContext.log(log::WARNING, "In RetrieveMount::getNextJobBatch(): Failed to unlock the request (lock expiration). Request remains selected."); validatedJobs.emplace_back(std::move(*j)); } else { // This is not a success, yet we could not confirm the job status due to an unexpected error. @@ -2199,7 +2236,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo free(exceptionTypeStr); exceptionTypeStr = nullptr; params.add("message", e.getMessageValue()); - logContext.log(log::ERR, "In ArchiveMount::getNextJobBatch(): unexpected error. Leaving the job queued."); + logContext.log(log::ERR, "In RetrieveMount::getNextJobBatch(): unexpected error. 
Leaving the job queued."); jobsToForget.emplace_back((*j)->m_retrieveRequest.getAddressIfSet()); retrieveRequestsToSkip.insert((*j)->m_retrieveRequest.getAddressIfSet()); } @@ -2237,13 +2274,13 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo log::ScopedParamContainer params(logContext); params.add("tapepool", mountInfo.tapePool) .add("queueObject", rq.getAddressIfSet()); - logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): deleted empty queue"); + logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): deleted empty queue"); } catch (cta::exception::Exception &ex) { log::ScopedParamContainer params(logContext); params.add("tapepool", mountInfo.tapePool) .add("queueObject", rq.getAddressIfSet()) .add("Message", ex.getMessageValue()); - logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not delete a presumably empty queue"); + logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): could not delete a presumably empty queue"); } } // We can now summarize this round @@ -2258,7 +2295,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo .add("filesAfter", currentFiles) .add("bytesAfter", currentBytes) .add("iterationCount", iterationCount); - logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): did one round of jobs retrieval."); + logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): did one round of jobs retrieval."); } // We could be done now. if (currentBytes >= bytesRequested || currentFiles >= filesRequested) @@ -2270,16 +2307,16 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo } catch (cta::exception::Exception & ex) { log::ScopedParamContainer params (logContext); params.add("exceptionMessage", ex.getMessageValue()); - logContext.log(log::ERR, "In OStoreDB::ArchiveMount::getNextJobBatch(): error (CTA exception) getting more jobs. 
Backtrace follows."); + logContext.log(log::ERR, "In OStoreDB::RetrieveMount::getNextJobBatch(): error (CTA exception) getting more jobs. Backtrace follows."); logContext.logBacktrace(log::ERR, ex.backtrace()); break; } catch (std::exception & e) { log::ScopedParamContainer params (logContext); params.add("exceptionWhat", e.what()); - logContext.log(log::ERR, "In OStoreDB::ArchiveMount::getNextJobBatch(): error (std exception) getting more jobs."); + logContext.log(log::ERR, "In OStoreDB::RetrieveMount::getNextJobBatch(): error (std exception) getting more jobs."); break; } catch (...) { - logContext.log(log::ERR, "In OStoreDB::ArchiveMount::getNextJobBatch(): error (unknown exception) getting more jobs."); + logContext.log(log::ERR, "In OStoreDB::RetrieveMount::getNextJobBatch(): error (unknown exception) getting more jobs."); break; } } @@ -2295,7 +2332,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo params.add("tapepool", mountInfo.tapePool) .add("files", nFiles) .add("bytes", nBytes); - logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): jobs retrieval complete."); + logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): jobs retrieval complete."); } // We can construct the return value. 
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > ret; diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 347d9478f60c56c5616efeb871d26d807199e22d..f4263436dc68613b39355e08c0e7edef12791272 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -379,39 +379,65 @@ void Scheduler::setDesiredDriveState(const common::dataStructures::SecurityIdent //------------------------------------------------------------------------------ // setDesiredDriveState //------------------------------------------------------------------------------ -void Scheduler::reportDriveStatus(const common::dataStructures::DriveInfo& driveInfo, common::dataStructures::MountType type, common::dataStructures::DriveStatus status) { +void Scheduler::reportDriveStatus(const common::dataStructures::DriveInfo& driveInfo, common::dataStructures::MountType type, common::dataStructures::DriveStatus status, log::LogContext & lc) { // TODO: mount type should be transmitted too. + utils::Timer t; m_db.reportDriveStatus(driveInfo, type, status, time(NULL)); + auto schedulerDbTime = t.secs(); + log::ScopedParamContainer spc(lc); + spc.add("drive", driveInfo.driveName) + .add("schedulerDbTime", schedulerDbTime); + lc.log(log::INFO, "In Scheduler::reportDriveStatus(): success."); } //------------------------------------------------------------------------------ // getPendingArchiveJobs //------------------------------------------------------------------------------ -std::map<std::string, std::list<common::dataStructures::ArchiveJob> > Scheduler::getPendingArchiveJobs() const { - return m_db.getArchiveJobs(); +std::map<std::string, std::list<common::dataStructures::ArchiveJob> > Scheduler::getPendingArchiveJobs(log::LogContext & lc) const { + utils::Timer t; + auto ret = m_db.getArchiveJobs(); + auto schedulerDbTime = t.secs(); + log::ScopedParamContainer spc(lc); + spc.add("schedulerDbTime", schedulerDbTime); + lc.log(log::INFO, "In Scheduler::getPendingArchiveJobs(): 
success."); + return ret; } //------------------------------------------------------------------------------ // getPendingArchiveJobs //------------------------------------------------------------------------------ -std::list<common::dataStructures::ArchiveJob> Scheduler::getPendingArchiveJobs(const std::string &tapePoolName) const { +std::list<common::dataStructures::ArchiveJob> Scheduler::getPendingArchiveJobs(const std::string &tapePoolName, log::LogContext & lc) const { + utils::Timer t; if(!m_catalogue.tapePoolExists(tapePoolName)) { throw exception::Exception(std::string("Tape pool ") + tapePoolName + " does not exist"); } - return m_db.getArchiveJobs(tapePoolName); + auto catalogueTime = t.secs(utils::Timer::resetCounter); + auto ret = m_db.getArchiveJobs(tapePoolName); + auto schedulerDbTime = t.secs(); + log::ScopedParamContainer spc(lc); + spc.add("catalogueTime", catalogueTime) + .add("schedulerDbTime", schedulerDbTime); + lc.log(log::INFO, "In Scheduler::getPendingArchiveJobs(tapePool): success."); + return ret; } //------------------------------------------------------------------------------ // getPendingRetrieveJobs //------------------------------------------------------------------------------ -std::map<std::string, std::list<common::dataStructures::RetrieveJob> > Scheduler::getPendingRetrieveJobs() const { - return m_db.getRetrieveJobs(); +std::map<std::string, std::list<common::dataStructures::RetrieveJob> > Scheduler::getPendingRetrieveJobs(log::LogContext & lc) const { + utils::Timer t; + auto ret = m_db.getRetrieveJobs(); + auto schedulerDbTime = t.secs(); + log::ScopedParamContainer spc(lc); + spc.add("schedulerDbTime", schedulerDbTime); + lc.log(log::INFO, "In Scheduler::getPendingRetrieveJobs(): success."); + return ret; } //------------------------------------------------------------------------------ // getPendingRetrieveJobs //------------------------------------------------------------------------------ 
-std::list<common::dataStructures::RetrieveJob> Scheduler::getPendingRetrieveJobs(const std::string& vid) const { +std::list<common::dataStructures::RetrieveJob> Scheduler::getPendingRetrieveJobs(const std::string& vid, log::LogContext &lc) const { throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__); } @@ -443,9 +469,24 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib // Many steps for this logic are not specific for the database and are hence // implemented in the scheduler itself. // First, get the mount-related info from the DB + utils::Timer timer; + double getMountInfoTime = 0; + double queueTrimingTime = 0; + double getTapeInfoTime = 0; + double candidateSortingTime = 0; + double getTapeForWriteTime = 0; + double decisionTime = 0; + double mountCreationTime = 0; + double driveStatusSetTime = 0; + double schedulerDbTime = 0; + double catalogueTime = 0; std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo; mountInfo = m_db.getMountInfo(lc); - if (mountInfo->queueTrimRequired) m_db.trimEmptyQueues(lc); + getMountInfoTime = timer.secs(utils::Timer::resetCounter); + if (mountInfo->queueTrimRequired) { + m_db.trimEmptyQueues(lc); + queueTrimingTime = timer.secs(utils::Timer::resetCounter); + } __attribute__((unused)) SchedulerDatabase::TapeMountDecisionInfo & debugMountInfo = *mountInfo; // The library information is not know for the tapes involved in retrieves. 
We @@ -457,6 +498,7 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib } if (tapeSet.size()) { auto tapesInfo=m_catalogue.getTapesByVid(tapeSet); + getTapeInfoTime = timer.secs(utils::Timer::resetCounter); for (auto &m:mountInfo->potentialMounts) { if (m.type==common::dataStructures::MountType::Retrieve) { m.logicalLibrary=tapesInfo[m.vid].logicalLibraryName; @@ -525,8 +567,11 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib mountPassesACriteria = true; if (!mountPassesACriteria || existingMounts >= m->maxDrivesAllowed) { log::ScopedParamContainer params(lc); - params.add("tapepool", m->tapePool) - .add("mountType", common::dataStructures::toString(m->type)) + params.add("tapepool", m->tapePool); + if ( m->type == common::dataStructures::MountType::Retrieve) { + params.add("VID", m->vid); + } + params.add("mountType", common::dataStructures::toString(m->type)) .add("existingMounts", existingMounts) .add("bytesQueued", m->bytesQueued) .add("minBytesToWarrantMount", m_minBytesToWarrantAMount) @@ -542,8 +587,11 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib // populate the mount with a weight m->ratioOfMountQuotaUsed = 1.0L * existingMounts / m->maxDrivesAllowed; log::ScopedParamContainer params(lc); - params.add("tapepool", m->tapePool) - .add("mountType", common::dataStructures::toString(m->type)) + params.add("tapepool", m->tapePool); + if ( m->type == common::dataStructures::MountType::Retrieve) { + params.add("VID", m->vid); + } + params.add("mountType", common::dataStructures::toString(m->type)) .add("existingMounts", existingMounts) .add("bytesQueued", m->bytesQueued) .add("minBytesToWarrantMount", m_minBytesToWarrantAMount) @@ -565,6 +613,8 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib std::sort(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end()); std::reverse(mountInfo->potentialMounts.begin(), 
mountInfo->potentialMounts.end()); + candidateSortingTime = timer.secs(utils::Timer::resetCounter); + // Find out if we have any potential archive mount in the list. If so, get the // list of tapes from the catalogue. std::list<catalogue::TapeForWriting> tapeList; @@ -572,6 +622,7 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib mountInfo->potentialMounts.cbegin(), mountInfo->potentialMounts.cend(), [](decltype(*mountInfo->potentialMounts.cbegin())& m){ return m.type == common::dataStructures::MountType::Archive; } )) { tapeList = m_catalogue.getTapesForWriting(logicalLibraryName); + getTapeForWriteTime = timer.secs(utils::Timer::resetCounter); } // Remove from the tape list the ones already or soon to be mounted @@ -600,18 +651,23 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib std::unique_ptr<ArchiveMount> internalRet(new ArchiveMount(m_catalogue)); // Get the db side of the session try { + decisionTime += timer.secs(utils::Timer::resetCounter); internalRet->m_dbMount.reset(mountInfo->createArchiveMount(t, driveName, logicalLibraryName, utils::getShortHostname(), time(NULL)).release()); + mountCreationTime += timer.secs(utils::Timer::resetCounter); internalRet->m_sessionRunning = true; internalRet->setDriveStatus(common::dataStructures::DriveStatus::Starting); + driveStatusSetTime += timer.secs(utils::Timer::resetCounter); log::ScopedParamContainer params(lc); uint32_t existingMounts = 0; try { existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type)); } catch (...) 
{} + schedulerDbTime = getMountInfoTime + queueTrimingTime + mountCreationTime + driveStatusSetTime; + catalogueTime = getTapeInfoTime + getTapeForWriteTime; params.add("tapepool", m->tapePool) .add("vid", t.vid) .add("mountType", common::dataStructures::toString(m->type)) @@ -621,7 +677,17 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib .add("filesQueued", m->filesQueued) .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) - .add("minArchiveRequestAge", m->minArchiveRequestAge); + .add("minArchiveRequestAge", m->minArchiveRequestAge) + .add("getMountInfoTime", getMountInfoTime) + .add("queueTrimingTime", queueTrimingTime) + .add("getTapeInfoTime", getTapeInfoTime) + .add("candidateSortingTime", candidateSortingTime) + .add("getTapeForWriteTime", getTapeForWriteTime) + .add("decisionTime", decisionTime) + .add("mountCreationTime", mountCreationTime) + .add("driveStatusSetTime", driveStatusSetTime) + .add("schedulerDbTime", schedulerDbTime) + .add("catalogueTime", catalogueTime); lc.log(log::DEBUG, "In Scheduler::getNextMount(): Selected next mount (archive)"); return std::unique_ptr<TapeMount> (internalRet.release()); } catch (cta::exception::Exception & ex) { @@ -639,6 +705,7 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib if (tapesInUse.count(m->vid)) continue; try { // create the mount, and populate its DB side. 
+ decisionTime += timer.secs(utils::Timer::resetCounter); std::unique_ptr<RetrieveMount> internalRet ( new RetrieveMount(mountInfo->createRetrieveMount(m->vid, m->tapePool, @@ -646,15 +713,19 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib logicalLibraryName, utils::getShortHostname(), time(NULL)))); + mountCreationTime += timer.secs(utils::Timer::resetCounter); internalRet->m_sessionRunning = true; internalRet->m_diskRunning = true; internalRet->m_tapeRunning = true; internalRet->setDriveStatus(common::dataStructures::DriveStatus::Starting); + driveStatusSetTime += timer.secs(utils::Timer::resetCounter); log::ScopedParamContainer params(lc); uint32_t existingMounts = 0; try { existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type)); } catch (...) {} + schedulerDbTime = getMountInfoTime + queueTrimingTime + mountCreationTime + driveStatusSetTime; + catalogueTime = getTapeInfoTime + getTapeForWriteTime; params.add("tapepool", m->tapePool) .add("vid", m->vid) .add("mountType", common::dataStructures::toString(m->type)) @@ -664,7 +735,17 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib .add("filesQueued", m->filesQueued) .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) - .add("minArchiveRequestAge", m->minArchiveRequestAge); + .add("minArchiveRequestAge", m->minArchiveRequestAge) + .add("getMountInfoTime", getMountInfoTime) + .add("queueTrimingTime", queueTrimingTime) + .add("getTapeInfoTime", getTapeInfoTime) + .add("candidateSortingTime", candidateSortingTime) + .add("getTapeForWriteTime", getTapeForWriteTime) + .add("decisionTime", decisionTime) + .add("mountCreationTime", mountCreationTime) + .add("driveStatusSetTime", driveStatusSetTime) + .add("schedulerDbTime", schedulerDbTime) + .add("catalogueTime", catalogueTime); lc.log(log::DEBUG, "In Scheduler::getNextMount(): Selected next mount (retrieve)"); return 
std::unique_ptr<TapeMount> (internalRet.release()); } catch (exception::Exception & ex) { @@ -677,6 +758,19 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib throw std::runtime_error("In Scheduler::getNextMount unexpected mount type"); } } + schedulerDbTime = getMountInfoTime + queueTrimingTime + mountCreationTime + driveStatusSetTime; + catalogueTime = getTapeInfoTime + getTapeForWriteTime; + log::ScopedParamContainer params(lc); + params.add("getMountInfoTime", getMountInfoTime) + .add("queueTrimingTime", queueTrimingTime) + .add("getTapeInfoTime", getTapeInfoTime) + .add("candidateSortingTime", candidateSortingTime) + .add("getTapeForWriteTime", getTapeForWriteTime) + .add("decisionTime", decisionTime) + .add("mountCreationTime", mountCreationTime) + .add("driveStatusSetTime", driveStatusSetTime) + .add("schedulerDbTime", schedulerDbTime) + .add("catalogueTime", catalogueTime); lc.log(log::DEBUG, "In Scheduler::getNextMount(): No valid mount found."); return std::unique_ptr<TapeMount>(); } diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index 5d633f7c7d5c5fdcdee96868be30dbb709e82cfd..49a9f93ee29cce89fd462bc83feb474400cca052 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -189,10 +189,10 @@ public: const cta::common::dataStructures::TestSourceType testSourceType, const std::string &tag) const; - std::map<std::string, std::list<cta::common::dataStructures::ArchiveJob> > getPendingArchiveJobs() const; - std::list<cta::common::dataStructures::ArchiveJob> getPendingArchiveJobs(const std::string &tapePoolName) const; - std::map<std::string, std::list<cta::common::dataStructures::RetrieveJob> > getPendingRetrieveJobs() const; - std::list<cta::common::dataStructures::RetrieveJob> getPendingRetrieveJobs(const std::string &vid) const; + std::map<std::string, std::list<cta::common::dataStructures::ArchiveJob> > getPendingArchiveJobs(log::LogContext &lc) const; + 
std::list<cta::common::dataStructures::ArchiveJob> getPendingArchiveJobs(const std::string &tapePoolName, log::LogContext &lc) const; + std::map<std::string, std::list<cta::common::dataStructures::RetrieveJob> > getPendingRetrieveJobs(log::LogContext &lc) const; + std::list<cta::common::dataStructures::RetrieveJob> getPendingRetrieveJobs(const std::string &vid, log::LogContext &lc) const; /*============== Drive state management ====================================*/ CTA_GENERATE_EXCEPTION_CLASS(NoSuchDrive); @@ -227,7 +227,7 @@ public: * error encountered by the drive. */ void reportDriveStatus(const common::dataStructures::DriveInfo& driveInfo, cta::common::dataStructures::MountType type, - cta::common::dataStructures::DriveStatus status); + cta::common::dataStructures::DriveStatus status, log::LogContext & lc); /** * Dumps the states of all drives for display diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 646084f2c8d8e6594a5b0a0f9d9512f6a50ed556..a9070717b6dd7b635afef30cd331dcab3783f8cc 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -265,7 +265,7 @@ TEST_P(SchedulerTest, archive_to_new_file) { scheduler.queueArchive(s_diskInstance, request, lc); { - auto rqsts = scheduler.getPendingArchiveJobs(); + auto rqsts = scheduler.getPendingArchiveJobs(lc); ASSERT_EQ(1, rqsts.size()); auto poolItor = rqsts.cbegin(); ASSERT_FALSE(poolItor == rqsts.cend()); @@ -401,7 +401,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) { // (otherwise we miss the selected ones). 
// Could also be limited to querying by ID (global index needed) bool found=false; - for (auto & tp: scheduler.getPendingArchiveJobs()) { + for (auto & tp: scheduler.getPendingArchiveJobs(lc)) { for (auto & req: tp.second) { if (req.archiveFileID == archiveFileId) found = true; @@ -437,8 +437,8 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) { std::unique_ptr<cta::TapeMount> mount; // This first initialization is normally done by the dataSession function. cta::common::dataStructures::DriveInfo driveInfo = { driveName, "myHost", s_libraryName }; - scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); - scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); ASSERT_NE((cta::TapeMount*)NULL, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Archive, mount.get()->getMountType()); @@ -490,7 +490,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) { // Check that the retrieve request is queued { - auto rqsts = scheduler.getPendingRetrieveJobs(); + auto rqsts = scheduler.getPendingRetrieveJobs(lc); // We expect 1 tape with queued jobs ASSERT_EQ(1, rqsts.size()); // We expect the queue to contain 1 job diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp index b97d8e5ec4c168cd5ebed4022c9639f404f15e21..c81e4f4643a1b3d7f9bb5e8271f0651255d388e2 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp +++ 
b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp @@ -86,6 +86,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction cta::log::LogContext lc(m_log); // Create a sticky thread name, which will be overridden by the other threads lc.pushOrReplace(cta::log::Param("thread", "MainThread")); + lc.pushOrReplace(cta::log::Param("unitName", m_driveConfig.unitName)); // 2a) Determine if we want to mount at all (for now) // This variable will allow us to see if we switched from down to up and start a @@ -99,7 +100,7 @@ schedule: downUpTransition = true; // We wait a bit before polling the scheduler again. // TODO: parametrize the duration? - m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); sleep (5); } else { break; @@ -107,7 +108,7 @@ schedule: } catch (cta::Scheduler::NoSuchDrive & e) { // The object store does not even know about this drive. We will report our state // (default status is down). - m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); } } // If we get here after seeing a down desired state, we are transitioning from @@ -117,7 +118,7 @@ schedule: castor::tape::tapeserver::daemon::EmptyDriveProbe emptyDriveProbe(m_log, m_driveConfig, m_sysWrapper); lc.log(cta::log::INFO, "Transition from down to up detected. 
Will check if a tape is in the drive."); if (!emptyDriveProbe.driveIsEmpty()) { - m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); cta::common::dataStructures::SecurityIdentity securityIdentity; m_scheduler.setDesiredDriveState(securityIdentity, m_driveConfig.unitName, false, false, lc); lc.log(cta::log::ERR, "A tape was detected in the drive. Putting the drive back down."); @@ -135,7 +136,7 @@ schedule: localParams.add("errorMessage", e.getMessageValue()); lc.log(cta::log::ERR, "Error while scheduling new mount. Putting the drive down. Stack trace follows."); lc.logBacktrace(cta::log::ERR, e.backtrace()); - m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); cta::common::dataStructures::SecurityIdentity cliId; m_scheduler.setDesiredDriveState(cliId, m_driveConfig.unitName, false, false, lc); return MARK_DRIVE_AS_DOWN; @@ -143,7 +144,7 @@ schedule: // No mount to be done found, that was fast... if (!tapeMount.get()) { lc.log(cta::log::DEBUG, "No new mount found. (sleeping 10 seconds)"); - m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); sleep (10); goto schedule; // return MARK_DRIVE_AS_UP; @@ -154,7 +155,8 @@ schedule: // 2c) ... and log. // Make the DGN and TPVID parameter permanent. 
cta::log::ScopedParamContainer params(lc); - params.add("TPVID", m_volInfo.vid); + params.add("vid", m_volInfo.vid) + .add("mountId", tapeMount->getMountTransactionId()); { cta::log::ScopedParamContainer localParams(lc); localParams.add("tapebridgeTransId", tapeMount->getMountTransactionId()) diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp index f934c54acfdcb25758a5ea6a18c997bae5c36a21..d7aa73109d54e8d1205d892e9976960b3de7ff23 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp @@ -433,7 +433,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionGooddayRecall) { driveInfo.logicalLibrary=driveConfig.logicalLibrary; driveInfo.host=="host"; // We need to create the drive in the registry before being able to put it up. - scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, logContext); scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false, logContext); // 7) Create the data transfer session @@ -626,7 +626,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongRecall) { driveInfo.logicalLibrary=driveConfig.logicalLibrary; driveInfo.host=="host"; // We need to create the drive in the registry before being able to put it up. 
- scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, logContext); scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false, logContext); // 7) Create the data transfer session @@ -829,7 +829,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecall) { driveInfo.driveName=driveConfig.unitName; driveInfo.logicalLibrary=driveConfig.rawLibrarySlot; // We need to create the drive in the registry before being able to put it up. - scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, logContext); scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false, logContext); // 7) Create the data transfer session @@ -1003,7 +1003,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionNoSuchDrive) { driveInfo.logicalLibrary=driveConfig.logicalLibrary; driveInfo.host=="host"; // We need to create the drive in the registry before being able to put it up. 
- scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, logContext); scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false, logContext); // 8) Create the data transfer session @@ -1147,7 +1147,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionFailtoMount) { driveInfo.logicalLibrary=driveConfig.logicalLibrary; driveInfo.host=="host"; // We need to create the drive in the registry before being able to put it up. - scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, logContext); scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false, logContext); // 8) Create the data transfer session @@ -1274,7 +1274,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionGooddayMigration) { driveInfo.logicalLibrary=driveConfig.logicalLibrary; driveInfo.host=="host"; // We need to create the drive in the registry before being able to put it up. 
- scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, logContext); scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false, logContext); // Create the data transfer session @@ -1416,7 +1416,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionMissingFilesMigration) { driveInfo.logicalLibrary=driveConfig.logicalLibrary; driveInfo.host=="host"; // We need to create the drive in the registry before being able to put it up. - scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, logContext); scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false, logContext); // Create the data transfer session @@ -1552,7 +1552,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullMigration) { driveInfo.logicalLibrary=driveConfig.logicalLibrary; driveInfo.host=="host"; // We need to create the drive in the registry before being able to put it up. 
- scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, logContext); scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false, logContext); // Create the data transfer session @@ -1702,7 +1702,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullOnFlushMigration) { driveInfo.logicalLibrary=driveConfig.logicalLibrary; driveInfo.host=="host"; // We need to create the drive in the registry before being able to put it up. - scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, logContext); scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false, logContext); // Create the data transfer session diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp index 0e004b22eeafe02fe01561efe3d5811b770b2cbe..c6f1ab7cedb84463b1c1076ed6023c131750b10f 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp @@ -304,7 +304,8 @@ void MigrationReportPacker::ReportFlush::checkAndAsyncReportCompletedJobs( try { job->waitForReporting(); // should not be a deadWait as soon as we have a timeout on the xroot query cta::log::ScopedParamContainer params(logContext); - params.add("reportURL", job->reportURL()); + params.add("reportURL", job->reportURL()) + .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path); logContext.log(cta::log::INFO,"Reported to the client a full file archival"); } 
catch(cta::exception::Exception &ex) { cta::log::ScopedParamContainer params(logContext); diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp index e0ccfbf1ce6a3389b4909f9d4100289f974e2997..563bb7cf16df62618d7275275c654697c39f387e 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp @@ -394,6 +394,7 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::logWithStat( /1000/1000/m_stats.totalTime:0.0) .add("driveTransferSpeedMBps", m_stats.totalTime?1.0*(m_stats.dataVolume+m_stats.headerVolume) /1000/1000/m_stats.totalTime:0.0); + m_logContext.moveToTheEndIfPresent("status"); m_logContext.log(level,msg); } diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.cpp index 07e153302017cccc32659bf554c70747fed05e98..c4bfb0452b4a76829b46a04dcffa66f005615505 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.cpp @@ -529,6 +529,7 @@ int level,const std::string& msg, cta::log::ScopedParamContainer& params){ /1000/1000/m_stats.totalTime:0.0) .add("driveTransferSpeedMBps", m_stats.totalTime?1.0*(m_stats.dataVolume+m_stats.headerVolume) /1000/1000/m_stats.totalTime:0.0); + m_logContext.moveToTheEndIfPresent("status"); m_logContext.log(level, msg); } diff --git a/tapeserver/daemon/DriveHandler.cpp b/tapeserver/daemon/DriveHandler.cpp index 13c2d121c04971eaba80e87b03b59e42a5200949..b315078a4dc9a41b8de1c7fb2bc0fd7757536cda 100644 --- a/tapeserver/daemon/DriveHandler.cpp +++ b/tapeserver/daemon/DriveHandler.cpp @@ -165,7 +165,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::fork() { } } catch (cta::exception::Exception & ex) { cta::log::ScopedParamContainer 
params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName) + params.add("unitName", m_configLine.unitName) .add("Error", ex.getMessageValue()); m_processManager.logContext().log(log::ERR, "Failed to fork drive process. Initiating shutdown with SIGTERM."); // Wipe all previous states as we are shutting down @@ -231,7 +231,7 @@ void DriveHandler::kill() { // If we have a subprocess, kill it and wait for completion (if needed). We do not need to keep // track of the exit state as kill() means we will not be called anymore. log::ScopedParamContainer params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); if (m_pid != -1) { params.add("SubProcessId", m_pid); // The socket pair will be reopened on the next fork. Clean it up. @@ -255,6 +255,7 @@ void DriveHandler::kill() { m_sessionEndContext.pushOrReplace({"Error_sessionKilled", "1"}); m_sessionEndContext.pushOrReplace({"killSignal", WTERMSIG(status)}); m_sessionEndContext.pushOrReplace({"status", "failure"}); + m_sessionEndContext.pushOrReplace({"unitName",m_configLine.unitName}); m_sessionEndContext.log(cta::log::INFO, "Tape session finished"); m_sessionEndContext.clear(); m_pid=-1; @@ -272,7 +273,7 @@ void DriveHandler::kill() { //------------------------------------------------------------------------------ SubprocessHandler::ProcessingStatus DriveHandler::processEvent() { log::ScopedParamContainer params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); // Read from the socket pair try { serializers::WatchdogMessage message; @@ -373,7 +374,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processStartingUp(serializers: // We expect to reach this state from pending fork. This is the first signal from the new process. // Check the transition is expected. This is non-fatal as the drive session has the last word anyway. 
log::ScopedParamContainer params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); if (m_sessionState!=SessionState::PendingFork || m_sessionType!=SessionType::Undetermined) { params.add("ExpectedState", session::toString(SessionState::PendingFork)) .add("ActualState", session::toString(m_sessionState)) @@ -402,7 +403,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processScheduling(serializers: // We are either going to schedule // Check the transition is expected. This is non-fatal as the drive session has the last word anyway. log::ScopedParamContainer params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); std::set<SessionState> expectedStates = { SessionState::StartingUp, SessionState::Scheduling }; if (!expectedStates.count(m_sessionState) || m_sessionType != SessionType::Undetermined || @@ -438,7 +439,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processChecking(serializers::W // We expect to come from statup/undefined and to get into checking/cleanup // As usual, subprocess has the last word. log::ScopedParamContainer params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); if (m_sessionState!=SessionState::StartingUp || m_sessionType!=SessionType::Undetermined|| (SessionType)message.sessiontype()!=SessionType::Cleanup) { params.add("PreviousState", session::toString(m_sessionState)) @@ -472,7 +473,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processMounting(serializers::W // The only transition expected is from scheduling. Several sessions types are possible // As usual, subprocess has the last word. 
log::ScopedParamContainer params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); std::set<SessionType> expectedNewTypes= { SessionType::Archive, SessionType::Retrieve, SessionType::Label }; if (m_sessionState!=SessionState::Scheduling || m_sessionType!=SessionType::Undetermined|| @@ -510,7 +511,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processRunning(serializers::Wa // We expect the type not to change (and to be in the right range) // As usual, subprocess has the last word. log::ScopedParamContainer params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); std::set<SessionState> expectedStates = { SessionState::Mounting, SessionState::Running }; std::set<SessionType> expectedTypes = { SessionType::Archive, SessionType::Retrieve, SessionType::Label }; if (!expectedStates.count(m_sessionState) || @@ -554,7 +555,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processUnmounting(serializers: // of checking in the case of the cleanup session. // As usual, subprocess has the last word. log::ScopedParamContainer params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); std::set<std::tuple<SessionState, SessionType>> expectedStateTypes = { std::make_tuple( SessionState::Running, SessionType::Archive ), @@ -587,7 +588,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processDrainingToDisk(serializ // This status transition is expected from unmounting, and only for retrieve sessions. // As usual, subprocess has the last word. 
log::ScopedParamContainer params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); if (SessionState::Unmounting != m_sessionState || SessionType::Retrieve != m_sessionType) { params.add("PreviousState", session::toString(m_sessionState)) @@ -613,7 +614,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processShutingDown(serializers // This status transition is expected from unmounting, and only for retrieve sessions. // As usual, subprocess has the last word. log::ScopedParamContainer params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); std::set<SessionState> expectedStates = { SessionState::Unmounting, SessionState::DrainingToDisk }; if (!expectedStates.count(m_sessionState)) { params.add("PreviousState", session::toString(m_sessionState)) @@ -639,7 +640,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processFatal(serializers::Watc // This status indicates that the session cannot be run and the server should // shut down (central storage unavailable). log::ScopedParamContainer params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); m_sessionState=(SessionState)message.sessionstate(); m_sessionType=(SessionType)message.sessiontype(); m_sessionVid=""; @@ -701,7 +702,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processSigChild() { // be picked up) and -1 if the process is entirely gone. // Of course we might not have a child process to begin with. 
log::ScopedParamContainer params(m_processManager.logContext()); - params.add("DriveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); if (-1 == m_pid) return m_processingStatus; int processStatus; int rc=::waitpid(m_pid, &processStatus, WNOHANG); @@ -711,7 +712,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processSigChild() { } catch (exception::Exception &ex) { cta::log::ScopedParamContainer params(m_processManager.logContext()); params.add("pid", m_pid) - .add("driveUnit", m_configLine.unitName) + .add("unitName", m_configLine.unitName) .add("Message", ex.getMessageValue()) .add("SessionState", session::toString(m_sessionState)) .add("SessionType", toString(m_sessionType)); @@ -764,6 +765,8 @@ SubprocessHandler::ProcessingStatus DriveHandler::processSigChild() { m_sessionEndContext.pushOrReplace({"status", "failure"}); } // In all cases we log the end of the session. + m_sessionEndContext.pushOrReplace({"unitName",m_configLine.unitName}); + m_sessionEndContext.moveToTheEndIfPresent("status"); m_sessionEndContext.log(cta::log::INFO, "Tape session finished"); m_sessionEndContext.clear(); // And record we do not have a process anymore. @@ -779,7 +782,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processTimeout() { // Process manager found that we timed out. Let's log why and kill the child process, // if any (there should be one). 
log::ScopedParamContainer params(m_processManager.logContext()); - params.add("driveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); if (-1 == m_pid) { m_processManager.logContext().log(log::ERR, "In DriveHandler::processTimeout(): Received timeout without child process present."); m_processManager.logContext().log(log::INFO, "Re-launching child process."); @@ -937,7 +940,7 @@ int DriveHandler::runChild() { // 1) Special case first, if we crashed in a cleaner session, we put the drive down if (m_previousSession == PreviousSession::Crashed && m_previousType == SessionType::Cleanup) { log::ScopedParamContainer params(lc); - params.add("driveUnit", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); lc.log(log::ERR, "In DriveHandler::runChild(): the cleaner session crashed. Putting the drive down."); // Get hold of the scheduler. try { @@ -945,7 +948,7 @@ int DriveHandler::runChild() { driveInfo.driveName=m_configLine.unitName; driveInfo.logicalLibrary=m_configLine.logicalLibrary; driveInfo.host=hostname; - scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); cta::common::dataStructures::SecurityIdentity securityIdentity; scheduler.setDesiredDriveState(securityIdentity, m_configLine.unitName, false, false, lc); return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN; @@ -971,7 +974,7 @@ int DriveHandler::runChild() { driveInfo.driveName=m_configLine.unitName; driveInfo.logicalLibrary=m_configLine.logicalLibrary; driveInfo.host=hostname; - scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, 
cta::common::dataStructures::DriveStatus::Down, lc); cta::common::dataStructures::SecurityIdentity securityIdentity; scheduler.setDesiredDriveState(securityIdentity, m_configLine.unitName, false, false, lc); return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN; @@ -988,7 +991,7 @@ int DriveHandler::runChild() { { log::ScopedParamContainer params(lc); params.add("VID", m_previousVid) - .add("driveUnit", m_configLine.unitName) + .add("unitName", m_configLine.unitName) .add("PreviousState", session::toString(m_sessionState)) .add("PreviousType", session::toString(m_sessionType)); lc.log(log::INFO, "In DriveHandler::runChild(): starting cleaner after crash with tape potentially loaded."); @@ -1055,7 +1058,7 @@ int DriveHandler::runChild() { if (m_previousSession == PreviousSession::Initiating) { // Log that we put the drive's desired state to down and do it. log::ScopedParamContainer params(lc); - params.add("Drive", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); lc.log(log::INFO, "Setting the drive down at daemon startup"); try { // Before setting the desired state as down, we have to make sure the drive exists in the registry. 
@@ -1064,7 +1067,7 @@ int DriveHandler::runChild() { driveInfo.driveName=m_configLine.unitName; driveInfo.logicalLibrary=m_configLine.logicalLibrary; driveInfo.host=hostname; - scheduler.reportDriveStatus(driveInfo, common::dataStructures::MountType::NoMount, common::dataStructures::DriveStatus::Down); + scheduler.reportDriveStatus(driveInfo, common::dataStructures::MountType::NoMount, common::dataStructures::DriveStatus::Down, lc); cta::common::dataStructures::SecurityIdentity securityIdentity; scheduler.setDesiredDriveState(securityIdentity, m_configLine.unitName, false /* down */, false /* no force down*/, lc); } catch (cta::exception::Exception & ex) { @@ -1099,7 +1102,7 @@ int DriveHandler::runChild() { SubprocessHandler::ProcessingStatus DriveHandler::shutdown() { // TODO: improve in the future (preempt the child process) log::ScopedParamContainer params(m_processManager.logContext()); - params.add("Drive", m_configLine.unitName); + params.add("unitName", m_configLine.unitName); m_processManager.logContext().log(log::INFO, "In DriveHandler::shutdown(): simply killing the process."); kill(); m_sessionState = SessionState::Shutdown; diff --git a/xroot_plugins/XrdCtaFile.cpp b/xroot_plugins/XrdCtaFile.cpp index 213848637b915835d87ad0bdf62cb23a51f22e2e..b545229f9954487dd8a2779ca57f72f85c6a8031 100644 --- a/xroot_plugins/XrdCtaFile.cpp +++ b/xroot_plugins/XrdCtaFile.cpp @@ -24,6 +24,7 @@ #include "catalogue/TapeFileSearchCriteria.hpp" #include "common/Configuration.hpp" #include "common/utils/utils.hpp" +#include "common/utils/Regex.hpp" #include "common/Timer.hpp" #include "common/utils/GetOptThreadSafe.hpp" #include "common/exception/UserError.hpp" @@ -254,8 +255,7 @@ int XrdCtaFile::fctl(const int cmd, const char *args, XrdOucErrInfo &eInfo) { // FName //------------------------------------------------------------------------------ const char* XrdCtaFile::FName() { - error.setErrInfo(ENOTSUP, "Not supported."); - return nullptr; + return ""; } 
//------------------------------------------------------------------------------ @@ -686,6 +686,9 @@ std::string XrdCtaFile::xCom_admin() { << "\tch --username/-u <user_name> --comment/-m <\"comment\">" << std::endl << "\trm --username/-u <user_name>" << std::endl << "\tls [--header/-h]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -740,6 +743,9 @@ std::string XrdCtaFile::xCom_adminhost() { << "\tch --name/-n <host_name> --comment/-m <\"comment\">" << std::endl << "\trm --name/-n <host_name>" << std::endl << "\tls [--header/-h]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -794,6 +800,9 @@ std::string XrdCtaFile::xCom_tapepool() { << "\tch --name/-n <tapepool_name> [--partialtapesnumber/-p <number_of_partial_tapes>] [--encrypted/-e <\"true\" or \"false\">] [--comment/-m <\"comment\">]" << std::endl << "\trm --name/-n <tapepool_name>" << std::endl << "\tls [--header/-h]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -861,6 +870,9 @@ std::string XrdCtaFile::xCom_archiveroute() { << "\tch --instance/-i <instance_name> --storageclass/-s <storage_class_name> --copynb/-c <copy_number> [--tapepool/-t <tapepool_name>] [--comment/-m <\"comment\">]" << std::endl << "\trm --instance/-i <instance_name> --storageclass/-s <storage_class_name> --copynb/-c <copy_number>" << std::endl << "\tls [--header/-h]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -926,6 +938,9 @@ std::string XrdCtaFile::xCom_logicallibrary() { << "\tch --name/-n <logical_library_name> --comment/-m <\"comment\">" << std::endl << "\trm --name/-n <logical_library_name>" << 
std::endl << "\tls [--header/-h]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -987,6 +1002,9 @@ std::string XrdCtaFile::xCom_tape() { << "\tlabel --vid/-v <vid> [--force/-f <\"true\" or \"false\">] [--lbp/-l <\"true\" or \"false\">] [--tag/-t <tag_name>]" << std::endl << "Where" << std::endl << "\tencryption_key Is the name of the encryption key used to encrypt the tape" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -1131,6 +1149,9 @@ std::string XrdCtaFile::xCom_storageclass() { << "\tch --instance/-i <instance_name> --name/-n <storage_class_name> [--copynb/-c <number_of_tape_copies>] [--comment/-m <\"comment\">]" << std::endl << "\trm --instance/-i <instance_name> --name/-n <storage_class_name>" << std::endl << "\tls [--header/-h]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -1199,6 +1220,9 @@ std::string XrdCtaFile::xCom_requestermountrule() { << "\tch --instance/-i <instance_name> --name/-n <user_name> [--mountpolicy/-u <policy_name>] [--comment/-m <\"comment\">]" << std::endl << "\trm --instance/-i <instance_name> --name/-n <user_name>" << std::endl << "\tls [--header/-h]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -1263,6 +1287,9 @@ std::string XrdCtaFile::xCom_groupmountrule() { << "\tch --instance/-i <instance_name> --name/-n <user_name> [--mountpolicy/-u <policy_name>] [--comment/-m <\"comment\">]" << std::endl << "\trm --instance/-i <instance_name> --name/-n <user_name>" << std::endl << "\tls [--header/-h]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { 
throw cta::exception::UserError(help.str()); } @@ -1329,6 +1356,9 @@ std::string XrdCtaFile::xCom_mountpolicy() { << "\t [--minretrieverequestage/--ra <minRequestAge>] [--maxdrivesallowed/-d <maxDrivesAllowed>] [--comment/-m <\"comment\">]" << std::endl << "\trm --name/-n <mountpolicy_name>" << std::endl << "\tls [--header/-h]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -1415,6 +1445,9 @@ std::string XrdCtaFile::xCom_repack() { << "\trm --vid/-v <vid>" << std::endl << "\tls [--header/-h] [--vid/-v <vid>]" << std::endl << "\terr --vid/-v <vid>" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -1518,6 +1551,9 @@ std::string XrdCtaFile::xCom_shrink() { std::stringstream cmdlineOutput; std::stringstream help; help << m_requestTokens.at(0) << " sh/shrink --tapepool/-t <tapepool_name>" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } optional<std::string> tapepool = getOptionStringValue("-t", "--tapepool", true, false); checkOptions(help.str()); m_scheduler->shrink(m_cliIdentity, tapepool.value()); @@ -1535,6 +1571,9 @@ std::string XrdCtaFile::xCom_verify() { << "\trm --vid/-v <vid>" << std::endl << "\tls [--header/-h] [--vid/-v <vid>]" << std::endl << "\terr --vid/-v <vid>" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -1612,6 +1651,9 @@ std::string XrdCtaFile::xCom_archivefile() { std::stringstream help; help << m_requestTokens.at(0) << " af/archivefile ls [--header/-h] [--id/-I <archive_file_id>] [--diskid/-d <disk_id>] [--copynb/-c <copy_no>] [--vid/-v <vid>] [--tapepool/-t <tapepool>] " "[--owner/-o <owner>] [--group/-g <group>] [--storageclass/-s <class>] [--path/-p <fullpath>] [--instance/-i <instance>] 
[--summary/-S] [--all/-a] (default gives error)" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -1695,6 +1737,9 @@ std::string XrdCtaFile::xCom_test() { << "\tread --drive/-d <drive_name> --vid/-v <vid> --firstfseq/-f <first_fseq> --lastfseq/-l <last_fseq> --checkchecksum/-c --output/-o <\"null\" or output_dir> [--tag/-t <tag_name>]" << std::endl << "\twrite --drive/-d <drive_name> --vid/-v <vid> --file/-f <filename> [--tag/-t <tag_name>]" << std::endl << "\twrite_auto --drive/-d <drive_name> --vid/-v <vid> --number/-n <number_of_files> --size/-s <file_size> --input/-i <\"zero\" or \"urandom\"> [--tag/-t <tag_name>]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } if(m_requestTokens.size() < 3) { throw cta::exception::UserError(help.str()); } @@ -1791,14 +1836,17 @@ std::string XrdCtaFile::xCom_drive() { std::stringstream cmdlineOutput; std::stringstream help; help << m_requestTokens.at(0) << " dr/drive up/down/ls (it is a synchronous command):" << std::endl - << "Set the requested state of the drive. The drive will complete any running mount" << std::endl + << "Set the requested state of the drives. The drives will complete any running mount" << std::endl << "unless it is preempted with the --force option." << std::endl - << "\tup <drive_name>" << std::endl - << "\tdown <drive_name> [--force/-f]" << std::endl + << "\tup <drives_name>" << std::endl + << "\tdown <drives_name> [--force/-f]" << std::endl << "" << std::endl << "List the states for one or all drives" << std::endl << "\tls [<drive_name>]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } // We should have at least one sub command. {"cta", "dr/drive", "up/down/ls"}. 
if (m_requestTokens.size() < 3) throw cta::exception::UserError(help.str()); @@ -1806,8 +1854,7 @@ std::string XrdCtaFile::xCom_drive() { // Here the drive name is required in addition if (m_requestTokens.size() != 4) throw cta::exception::UserError(help.str()); - m_scheduler->setDesiredDriveState(m_cliIdentity, m_requestTokens.at(3), true, false, lc); - cmdlineOutput << "Drive " << m_requestTokens.at(3) << " set UP." << std::endl; + changeStateForDrivesByRegex(m_requestTokens.at(3), lc, cmdlineOutput, true, false); } else if ("down" == m_requestTokens.at(2)) { // Parse the command line for option and drive name. cta::utils::GetOpThreadSafe::Request req; @@ -1823,12 +1870,7 @@ std::string XrdCtaFile::xCom_drive() { } // Check if the force option was present. bool force=reply.options.size() && (reply.options.at(0).option == "f"); - m_scheduler->setDesiredDriveState(m_cliIdentity, reply.remainder.at(0), false, force, lc); - cmdlineOutput << "Drive " << reply.remainder.at(0) << " set DOWN"; - if (force) { - cmdlineOutput << " (forced down)"; - } - cmdlineOutput << "." << std::endl; + changeStateForDrivesByRegex(m_requestTokens.at(3), lc, cmdlineOutput, false, force); } else if ("ls" == m_requestTokens.at(2)) { if ((m_requestTokens.size() == 3) || (m_requestTokens.size() == 4)) { // We will dump all the drives, and select the one asked for if needed. 
@@ -1923,21 +1965,50 @@ std::string XrdCtaFile::xCom_drive() { return cmdlineOutput.str(); } +//------------------------------------------------------------------------------ +// changeStateForDrivesByRegex +//------------------------------------------------------------------------------ +void XrdCtaFile::changeStateForDrivesByRegex(const std::string &regex, + log::LogContext &lc, std::stringstream &cmdlineOutput, const bool toMakeUp, + const bool isForceSet) { + cta::utils::Regex driveNameRegex(regex.c_str()); + auto driveStates = m_scheduler->getDriveStates(m_cliIdentity, lc); + bool drivesNotFound = true; + for (auto driveState: driveStates) { + const auto regexResult = driveNameRegex.exec(driveState.driveName); + if (regexResult.size()) { + m_scheduler->setDesiredDriveState(m_cliIdentity, driveState.driveName, toMakeUp, isForceSet, lc); + cmdlineOutput << "Drive " << driveState.driveName << " set " + << (toMakeUp?"Up":"Down") + << (isForceSet?" (forced)":"") + << "." << std::endl; + drivesNotFound = false; + } + } + if (drivesNotFound) { + cmdlineOutput << "Drives not found by regex: " << regex << std::endl; + } +} + //------------------------------------------------------------------------------ // xCom_listpendingarchives //------------------------------------------------------------------------------ std::string XrdCtaFile::xCom_listpendingarchives() { + log::LogContext lc(m_log); std::stringstream cmdlineOutput; std::stringstream help; help << m_requestTokens.at(0) << " lpa/listpendingarchives [--header/-h] [--tapepool/-t <tapepool_name>] [--extended/-x]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } optional<std::string> tapepool = getOptionStringValue("-t", "--tapepool", false, false); bool extended = hasOption("-x", "--extended"); std::map<std::string, std::list<cta::common::dataStructures::ArchiveJob> > result; if(!tapepool) { - result = m_scheduler->getPendingArchiveJobs(); + result = m_scheduler->getPendingArchiveJobs(lc); } 
else { - std::list<cta::common::dataStructures::ArchiveJob> list = m_scheduler->getPendingArchiveJobs(tapepool.value()); + std::list<cta::common::dataStructures::ArchiveJob> list = m_scheduler->getPendingArchiveJobs(tapepool.value(), lc); if(list.size()>0) { result[tapepool.value()] = list; } @@ -1985,6 +2056,7 @@ std::string XrdCtaFile::xCom_listpendingarchives() { cmdlineOutput << formatResponse(responseTable, hasOption("-h", "--header")); } } + m_suppressOptionalOptionsWarning = true; return cmdlineOutput.str(); } @@ -1994,15 +2066,19 @@ std::string XrdCtaFile::xCom_listpendingarchives() { std::string XrdCtaFile::xCom_listpendingretrieves() { std::stringstream cmdlineOutput; std::stringstream help; + log::LogContext lc(m_log); help << m_requestTokens.at(0) << " lpr/listpendingretrieves [--header/-h] [--vid/-v <vid>] [--extended/-x]" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } optional<std::string> vid = getOptionStringValue("-v", "--vid", false, false); bool extended = hasOption("-x", "--extended"); std::map<std::string, std::list<cta::common::dataStructures::RetrieveJob> > result; if(!vid) { - result = m_scheduler->getPendingRetrieveJobs(); + result = m_scheduler->getPendingRetrieveJobs(lc); } else { - std::list<cta::common::dataStructures::RetrieveJob> list = m_scheduler->getPendingRetrieveJobs(vid.value()); + std::list<cta::common::dataStructures::RetrieveJob> list = m_scheduler->getPendingRetrieveJobs(vid.value(), lc); if(list.size()>0) { result[vid.value()] = list; } @@ -2049,6 +2125,7 @@ std::string XrdCtaFile::xCom_listpendingretrieves() { cmdlineOutput << formatResponse(responseTable, hasOption("-h", "--header")); } } + m_suppressOptionalOptionsWarning = true; return cmdlineOutput.str(); } @@ -2059,6 +2136,9 @@ std::string XrdCtaFile::xCom_showqueues() { std::stringstream cmdlineOutput; std::stringstream help; help << m_requestTokens.at(0) << " sq/showqueues [--header/-h]" << std::endl; + if (hasOption("-?", "--help")) { + 
return help.str(); + } log::LogContext lc(m_log); auto queuesAndMounts=m_scheduler->getQueuesAndMountSummaries(lc); if (queuesAndMounts.size()) { @@ -2118,6 +2198,9 @@ std::string XrdCtaFile::xCom_archive() { << "\t--checksumvalue <checksum_value> --storageclass <storage_class> --diskfilepath <disk_filepath> --diskfileowner <disk_fileowner>" << std::endl << "\t--diskfilegroup <disk_filegroup> --recoveryblob <recovery_blob> --reportURL <reportURL>" << std::endl << "\tNote: apply the postfix \":base64\" to long option names whose values are base64 encoded" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } optional<std::string> user = getOptionStringValue("", "--user", true, false); optional<std::string> group = getOptionStringValue("", "--group", true, false); optional<std::string> diskid = getOptionStringValue("", "--diskid", true, false); @@ -2168,6 +2251,9 @@ std::string XrdCtaFile::xCom_retrieve() { help << m_requestTokens.at(0) << " r/retrieve --user <user> --group <group> --id <CTA_ArchiveFileID> --dsturl <dst_URL> --diskfilepath <disk_filepath>" << std::endl << "\t--diskfileowner <disk_fileowner> --diskfilegroup <disk_filegroup> --recoveryblob <recovery_blob>" << std::endl << "\tNote: apply the postfix \":base64\" to long option names whose values are base64 encoded" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } optional<std::string> user = getOptionStringValue("", "--user", true, false); optional<std::string> group = getOptionStringValue("", "--group", true, false); optional<uint64_t> id = getOptionUint64Value("", "--id", true, false); @@ -2205,6 +2291,9 @@ std::string XrdCtaFile::xCom_deletearchive() { std::stringstream help; help << m_requestTokens.at(0) << " da/deletearchive --user <user> --group <group> --id <CTA_ArchiveFileID>" << std::endl << "\tNote: apply the postfix \":base64\" to long option names whose values are base64 encoded" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); 
+ } optional<std::string> user = getOptionStringValue("", "--user", true, false); optional<std::string> group = getOptionStringValue("", "--group", true, false); optional<uint64_t> id = getOptionUint64Value("", "--id", true, false); @@ -2232,6 +2321,9 @@ std::string XrdCtaFile::xCom_cancelretrieve() { help << m_requestTokens.at(0) << " cr/cancelretrieve --user <user> --group <group> --id <CTA_ArchiveFileID> --dsturl <dst_URL> --diskfilepath <disk_filepath>" << std::endl << "\t--diskfileowner <disk_fileowner> --diskfilegroup <disk_filegroup> --recoveryblob <recovery_blob>" << std::endl << "\tNote: apply the postfix \":base64\" to long option names whose values are base64 encoded" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } optional<std::string> user = getOptionStringValue("", "--user", true, false); optional<std::string> group = getOptionStringValue("", "--group", true, false); optional<uint64_t> id = getOptionUint64Value("", "--id", true, false); @@ -2267,6 +2359,9 @@ std::string XrdCtaFile::xCom_updatefilestorageclass() { help << m_requestTokens.at(0) << " ufsc/updatefilestorageclass --user <user> --group <group> --id <CTA_ArchiveFileID> --storageclass <storage_class> --diskfilepath <disk_filepath>" << std::endl << "\t--diskfileowner <disk_fileowner> --diskfilegroup <disk_filegroup> --recoveryblob <recovery_blob>" << std::endl << "\tNote: apply the postfix \":base64\" to long option names whose values are base64 encoded" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } optional<std::string> user = getOptionStringValue("", "--user", true, false); optional<std::string> group = getOptionStringValue("", "--group", true, false); optional<uint64_t> id = getOptionUint64Value("", "--id", true, false); @@ -2302,6 +2397,9 @@ std::string XrdCtaFile::xCom_updatefileinfo() { help << m_requestTokens.at(0) << " ufi/updatefileinfo --id <CTA_ArchiveFileID> --diskfilepath <disk_filepath>" << std::endl << "\t--diskfileowner 
<disk_fileowner> --diskfilegroup <disk_filegroup> --recoveryblob <recovery_blob>" << std::endl << "\tNote: apply the postfix \":base64\" to long option names whose values are base64 encoded" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } optional<uint64_t> id = getOptionUint64Value("", "--id", true, false); optional<std::string> diskfilepath = getOptionStringValue("", "--diskfilepath", true, false); optional<std::string> diskfileowner = getOptionStringValue("", "--diskfileowner", true, false); @@ -2328,6 +2426,9 @@ std::string XrdCtaFile::xCom_liststorageclass() { std::stringstream help; help << m_requestTokens.at(0) << " lsc/liststorageclass --user <user> --group <group>" << std::endl << "\tNote: apply the postfix \":base64\" to long option names whose values are base64 encoded" << std::endl; + if (hasOption("-?", "--help")) { + return help.str(); + } optional<std::string> user = getOptionStringValue("", "--user", true, false); optional<std::string> group = getOptionStringValue("", "--group", true, false); checkOptions(help.str()); @@ -2380,6 +2481,8 @@ std::string XrdCtaFile::getGenericHelp(const std::string &programName) const { help << programName << " updatefileinfo/ufi" << std::endl; help << programName << " updatefilestorageclass/ufsc" << std::endl; help << "" << std::endl; + help << "Option for printing command usage: " << programName << " command -?/--help" << std::endl; + help << "" << std::endl; help << "Special option for running " << programName << " within the EOS worflow engine:" << std::endl; help << "" << std::endl; help << programName << " ... 
--stderr" << std::endl; diff --git a/xroot_plugins/XrdCtaFile.hpp b/xroot_plugins/XrdCtaFile.hpp index cef1f0cca3a74c09e50bc93c52441403ae4659b0..4b843874c1970127cd90002cf3afc8e616712632 100644 --- a/xroot_plugins/XrdCtaFile.hpp +++ b/xroot_plugins/XrdCtaFile.hpp @@ -362,7 +362,20 @@ protected: * Checks whether the user that issued the admin command is an authorized admin (throws an exception if it's not). */ void authorizeAdmin(); - + + /** + * Changes the state of the drives matching a given regex. + * + * @param regex The regex to be used to match drive names. + * @param lc The log context. + * @param cmdlineOutput The string stream to stream output. + * @param isMakeUp The desired state for the drives Up or Down. + * @param isForceSet The boolean value for force parameter. + */ + void changeStateForDrivesByRegex(const std::string &regex, + log::LogContext &lc, std::stringstream &cmdlineOutput, const bool isMakeUp, + const bool isForceSet); + /** * Returns the response string properly formatted in a table *