Commit 8048e69b authored by Daniele Kruse's avatar Daniele Kruse
Browse files

Fixed bugs that prevented archivals to be reported

parent 3c603051
......@@ -26,6 +26,7 @@
#include <memory>
#include <shift/serrno.h>
#include <sstream>
#include <iostream>
#include <stdlib.h>
#include <strings.h>
#include <sys/types.h>
......
......@@ -68,7 +68,7 @@ struct TapeFileLocation {
/**
* The copy number of the tape file. TODO: to be put in the mount object in the future
*/
uint8_t copyNb;
uint16_t copyNb;
}; // struct TapeFileLocation
......
......@@ -233,8 +233,7 @@ cta::NameServerTapeFile cta::MockNameServer::fromStringToNameServerTapeFile(cons
// addTapeFile
//------------------------------------------------------------------------------
void cta::MockNameServer::addTapeFile(const SecurityIdentity &requester, const std::string &path, const NameServerTapeFile &tapeFile) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_mutex);
Utils::assertAbsolutePathSyntax(path);
const std::string fsPath = m_fsDir + path;
assertFsFileExists(fsPath);
......@@ -326,6 +325,7 @@ cta::MockNameServer::MockNameServer(): m_deleteOnExit(true) {
// constructor
//------------------------------------------------------------------------------
cta::MockNameServer::MockNameServer(const std::string &path): m_fsDir(path), m_deleteOnExit(false) {
umask(0);
assertBasePathAccessible();
Utils::assertAbsolutePathSyntax(path);
assertFsDirExists(path);
......@@ -398,7 +398,7 @@ void cta::MockNameServer::createFile(
const std::string fsPath = m_fsDir + path;
assertFsPathDoesNotExist(fsPath);
SmartFd fd(open(fsPath.c_str(), O_WRONLY|O_CREAT|O_NOCTTY|O_NONBLOCK, 0600));
SmartFd fd(open(fsPath.c_str(), O_WRONLY|O_CREAT|O_NOCTTY|O_NONBLOCK, 0666));
if(0 > fd.get()) {
const int savedErrno = errno;
std::ostringstream msg;
......@@ -507,7 +507,7 @@ void cta::MockNameServer::createDir(const SecurityIdentity &requester,
inheritedStorageClass = getDirStorageClass(requester, enclosingPath);
const std::string fsPath = m_fsDir + path;
if(mkdir(fsPath.c_str(), 0755)) {
if(mkdir(fsPath.c_str(), 0777)) {
const int savedErrno = errno;
std::ostringstream msg;
msg << __FUNCTION__ << " - mkdir " << path << " error. Reason: \n" <<
......
......@@ -93,7 +93,13 @@ void cta::objectstore::Agent::removeAndUnregisterSelf() {
checkPayloadWritable();
// Check that we are not empty
if (!isEmpty()) {
throw AgentStillOwnsObjects("In Agent::deleteAndUnregisterSelf: agent still owns objects");
std::list<std::string> ownershipList = getOwnershipList();
std::stringstream exSs;
exSs << "In Agent::deleteAndUnregisterSelf: agent still owns objects. Here's the list:";
for(auto i=ownershipList.begin(); i!=ownershipList.end(); i++) {
exSs << " " << *i;
}
throw AgentStillOwnsObjects(exSs.str());
}
// First delete ourselves
remove();
......
......@@ -1456,6 +1456,10 @@ void OStoreDB::ArchiveMount::complete(time_t completionTime) {
t.fetch();
t.releaseBusy();
t.commit();
objectstore::ScopedExclusiveLock agl(m_agent);
m_agent.fetch();
m_agent.removeFromOwnership(t.getAddressIfSet());
m_agent.commit();
}
OStoreDB::ArchiveJob::ArchiveJob(const std::string& jobAddress,
......@@ -1577,6 +1581,10 @@ void OStoreDB::RetrieveMount::complete(time_t completionTime) {
t.fetch();
t.releaseBusy();
t.commit();
objectstore::ScopedExclusiveLock agl(m_agent);
m_agent.fetch();
m_agent.removeFromOwnership(t.getAddressIfSet());
m_agent.commit();
}
......
......@@ -85,8 +85,8 @@ void MigrationReportPacker::reportFlush(drive::compressionStats compressStats){
//reportEndOfSession
//------------------------------------------------------------------------------
void MigrationReportPacker::reportEndOfSession() {
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSession());
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSession());
}
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
......@@ -111,44 +111,11 @@ void MigrationReportPacker::synchronousReportEndWithErrors(const std::string msg
void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& reportPacker){
reportPacker.m_successfulArchiveJobs.push(std::move(m_successfulArchiveJob));
}
//------------------------------------------------------------------------------
//ReportFlush::computeCompressedSize
//------------------------------------------------------------------------------
//void MigrationReportPacker::ReportFlush::computeCompressedSize(
//std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator beg,
//std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator end
//)
//{
// //lets pray for C++11 lamba and std accumulate
// uint64_t rawSize = 0;
// for(std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator it = beg;
// it != end ;++it){
// rawSize+=(*it)->fileSize();
// }
//
// uint64_t nbByteWritenWithCompression = m_compressStats.toTape;
// //we dont want compressionRatio to be equal to zero not to have a division by zero
// double compressionRatio = nbByteWritenWithCompression>0 && rawSize >0 ?
// 1.0*nbByteWritenWithCompression/rawSize : 1.;
//
// for(std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator it = beg;
// it != end ;++it){
// const uint64_t compressedFileSize =
// static_cast<uint64_t>((*it)->fileSize() * compressionRatio);
//
// // The compressed file size should never be reported as being less than 1
// // byte
// uint64_t validCompressedFileSize = 0 < compressedFileSize ? compressedFileSize : 1;
//
// (*it)->setCompressedFileSize(validCompressedFileSize);
// }
//}
//------------------------------------------------------------------------------
//ReportFlush::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPacker){
if(!reportPacker.m_errorHappened){
// We can receive double flushes when the periodic flush happens
// right before the end of session (which triggers also a flush)
......@@ -221,7 +188,6 @@ void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& r
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationReportPacker& reportPacker){
if(reportPacker.m_errorHappened) {
reportPacker.m_archiveMount->complete();
log::ScopedParamContainer sp(reportPacker.m_lc);
......@@ -275,7 +241,7 @@ void MigrationReportPacker::WorkerThread::run(){
rep->execute(m_parent);
}
catch(const failedMigrationRecallResult& e){
//here we catch a failed report MigrationResult. We try to close and it that fails too
//here we catch a failed report MigrationResult. We try to close and if that fails too
//we end up in the catch below
m_parent.m_archiveMount->complete();
m_parent.m_lc.log(LOG_INFO,"Successfully closed client's session after the failed report MigrationResult");
......@@ -290,7 +256,39 @@ void MigrationReportPacker::WorkerThread::run(){
catch(const castor::exception::Exception& e){
//we get there because to tried to close the connection and it failed
//either from the catch a few lines above or directly from rep->execute
m_parent.m_lc.log(LOG_ERR,"tried to report endOfSession(WithError) and got an exception, cant do much more");
std::stringstream ssEx;
ssEx << "Tried to report endOfSession or endofSessionWithErrors and got a castor exception, cant do much more. The exception is the following: " << e.getMessageValue();
m_parent.m_lc.log(LOG_ERR, ssEx.str());
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(log::Param("status","failure"));
}
} catch(const cta::exception::Exception& e){
//we get there because to tried to close the connection and it failed
//either from the catch a few lines above or directly from rep->execute
std::stringstream ssEx;
ssEx << "Tried to report endOfSession or endofSessionWithErrors and got a CTA exception, cant do much more. The exception is the following: " << e.getMessageValue();
m_parent.m_lc.log(LOG_ERR, ssEx.str());
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(log::Param("status","failure"));
}
} catch(const std::exception& e){
//we get there because to tried to close the connection and it failed
//either from the catch a few lines above or directly from rep->execute
std::stringstream ssEx;
ssEx << "Tried to report endOfSession or endofSessionWithErrors and got a standard exception, cant do much more. The exception is the following: " << e.what();
m_parent.m_lc.log(LOG_ERR, ssEx.str());
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(log::Param("status","failure"));
}
} catch(...){
//we get there because to tried to close the connection and it failed
//either from the catch a few lines above or directly from rep->execute
std::stringstream ssEx;
ssEx << "Tried to report endOfSession or endofSessionWithErrors and got an unknown exception, cant do much more.";
m_parent.m_lc.log(LOG_ERR, ssEx.str());
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(log::Param("status","failure"));
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment