Commit 105f3c5b authored by Daniele Kruse's avatar Daniele Kruse
Browse files

Fixed 4 bugs that are preventing DataTransferSessionGooddayRecall test to...

Fixed 4 bugs that are preventing DataTransferSessionGooddayRecall test to succeed. One more left: next commit will do
parent 66d77dbe
......@@ -26,6 +26,7 @@ namespace cta {
ArchiveFile::ArchiveFile():
fileId(0),
size(0),
checksum(0),
lastModificationTime(0) {
}
......@@ -33,7 +34,7 @@ ArchiveFile::ArchiveFile():
// constructor
//------------------------------------------------------------------------------
ArchiveFile::ArchiveFile(const std::string & path, const std::string & nsHostName, uint64_t fileId,
uint64_t size, const Checksum & checksum, const time_t lastModificationTime):
uint64_t size, const uint32_t checksum, const time_t lastModificationTime):
path(path),
nsHostName(nsHostName),
fileId(fileId),
......
......@@ -44,7 +44,7 @@ public:
* @param lastModificationTime The last modification time of the file
*/
ArchiveFile(const std::string & path, const std::string & nsHostName, uint64_t fileId,
uint64_t size, const Checksum & checksum, const time_t lastModificationTime);
uint64_t size, const uint32_t checksum, const time_t lastModificationTime);
/**
* The location of the file at NS lookup time
......@@ -69,7 +69,7 @@ public:
/**
* The file checksum
*/
Checksum checksum;
uint32_t checksum;
/**
* The last modification time of the file
......
......@@ -22,6 +22,7 @@
// constructor
//------------------------------------------------------------------------------
cta::ArchiveFileStatus::ArchiveFileStatus():
fileId(0),
mode(0),
size(0) {
}
......@@ -31,11 +32,13 @@ cta::ArchiveFileStatus::ArchiveFileStatus():
//------------------------------------------------------------------------------
cta::ArchiveFileStatus::ArchiveFileStatus(
const UserIdentity &owner,
const uint64_t fileId,
const mode_t mode,
const uint64_t size,
const Checksum &checksum,
const std::string &storageClassName):
owner(owner),
fileId(fileId),
mode(mode),
size(size),
checksum(checksum),
......
......@@ -50,6 +50,7 @@ struct ArchiveFileStatus {
*/
ArchiveFileStatus(
const UserIdentity &owner,
const uint64_t fileId,
const mode_t mode,
const uint64_t size,
const Checksum &checksum,
......@@ -59,7 +60,12 @@ struct ArchiveFileStatus {
* The identity of the owner.
*/
UserIdentity owner;
/**
* The nameserver file ID
*/
uint64_t fileId;
/**
* The mode bits of the directory entry.
*/
......@@ -67,8 +73,6 @@ struct ArchiveFileStatus {
/**
* Returns the size of the file in bytes.
*
* @return The size of the file in bytes.
*/
uint64_t size;
......
......@@ -269,7 +269,7 @@ std::unique_ptr<cta::ArchiveFileStatus> cta::CastorNameServer::statFile(
exception::Serrnum::throwOnMinusOne(Cns_queryclass(m_server.c_str(), statbuf.fileclass, NULL, &cns_fileclass), __FUNCTION__);
const std::string storageClassName(cns_fileclass.name);
return std::unique_ptr<ArchiveFileStatus>(
new ArchiveFileStatus(owner, mode, size, checksum, storageClassName));
new ArchiveFileStatus(owner, statbuf.fileid, mode, size, checksum, storageClassName));
}
//------------------------------------------------------------------------------
......@@ -335,7 +335,7 @@ cta::ArchiveDirEntry cta::CastorNameServer::getArchiveDirEntry(const SecurityIde
const UserIdentity owner(statbuf.uid, statbuf.gid);
const Checksum checksum(Checksum::CHECKSUMTYPE_ADLER32, std::string(statbuf.csumvalue));
const uint64_t size(statbuf.filesize);
ArchiveFileStatus status(owner, statbuf.filemode, size, checksum, storageClassName);
ArchiveFileStatus status(owner, statbuf.fileid, statbuf.filemode, size, checksum, storageClassName);
const std::list<NameServerTapeFile> tapeCopies = getTapeFiles(requester, path);
return ArchiveDirEntry(entryType, name, status, tapeCopies);
......
......@@ -289,7 +289,7 @@ void cta::MockNameServer::deleteTapeFile(const SecurityIdentity &requester, cons
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::MockNameServer::MockNameServer() {
cta::MockNameServer::MockNameServer(): m_fileIdCounter(0) {
umask(0);
char path[100];
strncpy(path, "/tmp/CTATmpFsXXXXXX", 100);
......@@ -387,6 +387,12 @@ void cta::MockNameServer::createFile(
}
Utils::setXattr(fsPath.c_str(), "user.CTATapeFileCopyOne", "");
Utils::setXattr(fsPath.c_str(), "user.CTATapeFileCopyTwo", "");
std::stringstream sizeString;
sizeString << size;
Utils::setXattr(fsPath.c_str(), "user.CTASize", sizeString.str());
std::stringstream fileIDString;
fileIDString << ++m_fileIdCounter;
Utils::setXattr(fsPath.c_str(), "user.CTAFileID", fileIDString.str());
}
//------------------------------------------------------------------------------
......@@ -623,9 +629,10 @@ cta::ArchiveDirEntry cta::MockNameServer::getArchiveDirEntry(
const UserIdentity owner = getOwner(requester, path);
const Checksum checksum;
const uint64_t size = 1234;
ArchiveFileStatus status(owner, statResult.st_mode, size, checksum,
storageClassName);
const std::string fsPath = m_fsDir + path;
const uint64_t size = atol(Utils::getXattr(fsPath, "user.CTASize").c_str());
const uint64_t fileId = atol(Utils::getXattr(fsPath, "user.CTAFileID").c_str());
ArchiveFileStatus status(owner, fileId, statResult.st_mode, size, checksum, storageClassName);
return ArchiveDirEntry(entryType, name, status, tapeCopies);
}
......
......@@ -23,6 +23,7 @@
#include "nameserver/NameServer.hpp"
#include "common/SecurityIdentity.hpp"
#include <cstdatomic>
#include <list>
#include <string>
......@@ -221,6 +222,11 @@ private:
* @return The next unique numeric identifier for a new storage class.
*/
uint32_t getNextStorageClassId() const;
/**
* Counter for file ID of new files
*/
std::atomic<uint64_t> m_fileIdCounter;
}; // class MockNameServer
......
......@@ -19,6 +19,7 @@
#include "RetrieveToFileRequest.hpp"
#include "GenericObject.hpp"
#include "CreationLog.hpp"
#include "objectstore/cta.pb.h"
cta::objectstore::RetrieveToFileRequest::RetrieveToFileRequest(
const std::string& address, Backend& os):
......@@ -54,14 +55,26 @@ void cta::objectstore::RetrieveToFileRequest::addJob(const cta::TapeFileLocation
}
void cta::objectstore::RetrieveToFileRequest::setArchiveFile(
const std::string& archiveFile) {
const cta::ArchiveFile& archiveFile) {
checkPayloadWritable();
m_payload.set_archivefile(archiveFile);
auto *af = m_payload.mutable_archivefile();
af->set_checksum(archiveFile.checksum);
af->set_fileid(archiveFile.fileId);
af->set_lastmodificationtime(archiveFile.lastModificationTime);
af->set_nshostname(archiveFile.nsHostName);
af->set_path(archiveFile.path);
af->set_size(archiveFile.size);
}
std::string cta::objectstore::RetrieveToFileRequest::getArchiveFile() {
cta::ArchiveFile cta::objectstore::RetrieveToFileRequest::getArchiveFile() {
checkPayloadReadable();
return m_payload.archivefile();
auto checksum = m_payload.archivefile().checksum();
auto fileId = m_payload.archivefile().fileid();
auto lastModificationTime = m_payload.archivefile().lastmodificationtime();
auto nsHostName = m_payload.archivefile().nshostname();
auto path = m_payload.archivefile().path();
auto size = m_payload.archivefile().size();
return ArchiveFile{path, nsHostName, fileId, size, checksum, lastModificationTime};
}
void cta::objectstore::RetrieveToFileRequest::setRemoteFile(
......
......@@ -21,6 +21,7 @@
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
#include <list>
#include "common/archiveNS/ArchiveFile.hpp"
#include "common/archiveNS/TapeFileLocation.hpp"
namespace cta { namespace objectstore {
......@@ -71,8 +72,8 @@ public:
void setSuccessful();
void setFailed();
// ===========================================================================
void setArchiveFile(const std::string & archiveFile);
std::string getArchiveFile();
void setArchiveFile(const cta::ArchiveFile & archiveFile);
cta::ArchiveFile getArchiveFile();
void setRemoteFile (const std::string & remoteFile);
std::string getRemoteFile();
void setPriority (uint64_t priority);
......
......@@ -342,9 +342,18 @@ message RetrieveJobEntry {
required uint32 retrieswithinmount = 4707;
}
message ArchiveFile {
required string path = 4750;
required string nsHostName = 4751;
required uint64 fileId = 4752;
required uint64 size = 4753;
required uint32 checksum = 4754;
required uint64 lastModificationTime = 4755;
}
message RetrieveToFileRequest {
required string remotefile = 4800;
required string archivefile = 4801;
required ArchiveFile archivefile = 4801;
required uint64 size = 4802;
repeated RetrieveJobEntry jobs = 4803;
required uint64 priority = 4804;
......
......@@ -961,7 +961,7 @@ void OStoreDB::queue(const cta::RetrieveToFileRequest& rqst) {
rtfr.setRemoteFile(rqst.getRemoteFile());
rtfr.setPriority(rqst.priority);
rtfr.setCreationLog(rqst.creationLog);
rtfr.setSize(rqst.getSize());
rtfr.setSize(rqst.getArchiveFile().size);
// We will need to identity tapes is order to construct the request.
// First load all the tapes information in a memory map
std::map<std::string, std::string> vidToAddress;
......@@ -998,11 +998,15 @@ void OStoreDB::queue(const cta::RetrieveToFileRequest& rqst) {
uint16_t selectedCopyNumber;
uint64_t bestTapeQueuedBytes;
std::string selectedVid;
uint64_t selectedFseq;
uint64_t selectedBlockid;
{
// First tape copy is always better than nothing.
auto tc=rqst.getTapeCopies().begin();
selectedCopyNumber = tc->copyNb;
selectedVid = tc->vid;
selectedFseq = tc->fSeq;
selectedBlockid = tc->blockId;
// Get info for the tape.
{
objectstore::Tape t(vidToAddress.at(tc->vid), m_objectStore);
......@@ -1020,6 +1024,8 @@ void OStoreDB::queue(const cta::RetrieveToFileRequest& rqst) {
bestTapeQueuedBytes = t.getJobsSummary().bytes;
selectedCopyNumber = tc->copyNb;
selectedVid = tc->vid;
selectedFseq = tc->fSeq;
selectedBlockid = tc->blockId;
}
}
}
......@@ -1032,7 +1038,9 @@ void OStoreDB::queue(const cta::RetrieveToFileRequest& rqst) {
jd.copyNb = selectedCopyNumber;
jd.tape = selectedVid;
jd.tapeAddress = vidToAddress.at(selectedVid);
tp.addJob(jd, rtfr.getAddressIfSet(), rqst.getSize(), rqst.priority, rqst.creationLog.time);
jd.fseq = selectedFseq;
jd.blockid = selectedBlockid;
tp.addJob(jd, rtfr.getAddressIfSet(), rqst.getArchiveFile().size, rqst.priority, rqst.creationLog.time);
tp.commit();
}
// The request is now fully set. It belongs to the tape.
......@@ -1508,10 +1516,9 @@ auto OStoreDB::RetrieveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase:
// We can commit and release the tape pool lock, we will only fill up
// memory structure from here on.
t.commit();
privateRet->archiveFile.path = privateRet->m_rtfr.getArchiveFile();
privateRet->archiveFile = privateRet->m_rtfr.getArchiveFile();
privateRet->remoteFile = privateRet->m_rtfr.getRemoteFile();
objectstore::RetrieveToFileRequest::JobDump jobDump=
privateRet->m_rtfr.getJob(privateRet->m_copyNb);
objectstore::RetrieveToFileRequest::JobDump jobDump = privateRet->m_rtfr.getJob(privateRet->m_copyNb);
privateRet->nameServerTapeFile.tapeFileLocation.fSeq = jobDump.fseq;
privateRet->nameServerTapeFile.tapeFileLocation.blockId = jobDump.blockid;
privateRet->nameServerTapeFile.tapeFileLocation.copyNb = privateRet->m_copyNb;
......@@ -1525,7 +1532,32 @@ auto OStoreDB::RetrieveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase:
}
void OStoreDB::RetrieveMount::complete(time_t completionTime) {
throw NotImplemented("In OStoreDB::RetrieveMount::complete: not implemented");
// When the session is complete, we can reset the status of the tape and the
// drive
// Reset the drive
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
objectstore::ScopedExclusiveLock drl(dr);
dr.fetch();
// Reset the drive state.
dr.reportDriveStatus(mountInfo.drive, mountInfo.logicalLibrary,
objectstore::DriveRegister::DriveStatus::Up, completionTime,
objectstore::DriveRegister::MountType::NoMount, 0,
0, 0, 0, "", "");
dr.commit();
// Find the tape and unbusy it.
objectstore::TapePool tp (re.getTapePoolAddress(mountInfo.tapePool), m_objectStore);
rel.release();
objectstore::ScopedSharedLock tpl(tp);
tp.fetch();
objectstore::Tape t(tp.getTapeAddress(mountInfo.vid), m_objectStore);
objectstore::ScopedExclusiveLock tl(t);
tpl.release();
t.fetch();
t.releaseBusy();
t.commit();
}
......
......@@ -87,7 +87,10 @@ std::unique_ptr<cta::RetrieveJob> cta::RetrieveMount::getNextJob() {
// complete
//------------------------------------------------------------------------------
void cta::RetrieveMount::complete() {
throw NotImplemented(std::string(__FUNCTION__) + ": Not implemented");
// Just set the session as complete in the DB.
m_dbMount->complete(time(NULL));
// and record we are done with the mount
m_sessionRunning = false;
}
//------------------------------------------------------------------------------
......
......@@ -18,6 +18,7 @@
#pragma once
#include "common/archiveNS/ArchiveFile.hpp"
#include "common/archiveNS/TapeFileLocation.hpp"
#include "common/CreationLog.hpp"
......@@ -33,7 +34,7 @@ namespace cta {
struct RetrieveRequestDump {
uint64_t priority; /**< The priority of the request. */
CreationLog creationLog; /**< The time at which the request was created. */
std::string archiveFile; /**< he full path of the source archive file. */
cta::ArchiveFile archiveFile; /**< he full path of the source archive file. */
uint64_t activeCopyNb; /**< The tape copy number currenty considered for retrieve. */
std::list<TapeFileLocation> tapeCopies; /**<The location of the copies on tape. */
std::string remoteFile; /**< The URL of the destination remote file. */
......
......@@ -35,15 +35,13 @@ cta::RetrieveToFileRequest::~RetrieveToFileRequest() throw() {
// constructor
//------------------------------------------------------------------------------
cta::RetrieveToFileRequest::RetrieveToFileRequest(
const std::string &archiveFile,
const uint64_t size,
const cta::ArchiveFile &archiveFile,
const std::list<cta::TapeFileLocation> &tapeCopies,
const std::string &remoteFile,
const uint64_t priority,
const CreationLog & creationLog):
RetrieveRequest(priority, creationLog),
m_archiveFile(archiveFile),
m_size(size),
m_tapeCopies(tapeCopies),
m_remoteFile(remoteFile){
}
......@@ -51,17 +49,10 @@ cta::RetrieveToFileRequest::RetrieveToFileRequest(
//------------------------------------------------------------------------------
// getArchiveFile
//------------------------------------------------------------------------------
const std::string &cta::RetrieveToFileRequest::getArchiveFile() const throw() {
const cta::ArchiveFile &cta::RetrieveToFileRequest::getArchiveFile() const throw() {
return m_archiveFile;
}
//------------------------------------------------------------------------------
// getSize
//------------------------------------------------------------------------------
uint64_t cta::RetrieveToFileRequest::getSize() const throw() {
return m_size;
}
//------------------------------------------------------------------------------
// getTapeCopies
//------------------------------------------------------------------------------
......
......@@ -18,6 +18,7 @@
#pragma once
#include "common/archiveNS/ArchiveFile.hpp"
#include "scheduler/RetrieveRequest.hpp"
#include <list>
......@@ -48,35 +49,25 @@ public:
/**
* Constructor.
*
* @param archiveFile The full path of the source archive file.
* @param archiveFile The source archive file.
* @param tapeCopies The physical location(s) of the archive file on tape.
* @param remoteFile The URL of the destination remote file.
* @param priority The priority of the request.
* @param user The identity of the user who made the request.
* @param creationTime Optionally the absolute time at which the user request
* was created. If no value is given then the current time is used.
* @param creationLog The creation log parameters
*/
RetrieveToFileRequest(
const std::string &archiveFile,
const uint64_t size,
const cta::ArchiveFile &archiveFile,
const std::list<TapeFileLocation> &tapeCopies,
const std::string &remoteFile,
const uint64_t priority,
const CreationLog & creationLog);
/**
* Returns the full path of the source archive file.
* Returns the source archive file.
*
* @return The full path of the source archive file.
* @return The source archive file.
*/
const std::string &getArchiveFile() const throw();
/**
* Returns the size of the source archive file.
*
* @return The size of the source archive file.
*/
uint64_t getSize() const throw();
const cta::ArchiveFile &getArchiveFile() const throw();
/**
* Returns the physical location(s) of the archive file on tape.
......@@ -97,12 +88,8 @@ private:
/**
* The full path of the source archive file.
*/
std::string m_archiveFile;
/**
* The file size
*/
uint64_t m_size;
cta::ArchiveFile m_archiveFile;
/**
* The physical location(s) of the archive file on tape.
*/
......
......@@ -773,7 +773,8 @@ void cta::Scheduler::queueRetrieveRequest(
for (auto nstf = tapeCopies.begin(); nstf != tapeCopies.end(); nstf++) {
tcl.push_back(nstf->tapeFileLocation);
}
RetrieveToFileRequest rtfr (archiveFiles.front(), sourceStat->size,
cta::ArchiveFile archiveFile(archiveFiles.front(), "", sourceStat->fileId, sourceStat->size, *((uint32_t*)(sourceStat->checksum.getByteArray().getBytes())), 0); //nshostname is set to "" and lastmodificationtime is 0 because we don't need this info for retrieving the file
RetrieveToFileRequest rtfr (archiveFile,
tcl, remoteFile, 0, cl);
m_db.queue(rtfr);
}
......
......@@ -488,7 +488,8 @@ TEST_P(SchedulerDatabaseTest, getMountInfo) {
tcl.back().fSeq = 5;
tcl.back().vid = "Tape3";
tcl.back().copyNb = 2;
ASSERT_NO_THROW(db.queue(cta::RetrieveToFileRequest("cta:://cta/myfile", 1234, tcl, "eos://myeos/myeosfile", 10, cl)));
cta::ArchiveFile af("cta:://cta/myfile", "", 0, 1234, 0, 0);
ASSERT_NO_THROW(db.queue(cta::RetrieveToFileRequest(af, tcl, "eos://myeos/myeosfile", 10, cl)));
mountCandidates.reset(NULL);
ASSERT_NO_THROW(mountCandidates = db.getMountInfo());
{
......@@ -532,7 +533,8 @@ TEST_P(SchedulerDatabaseTest, getMountInfo) {
tcl2.back().fSeq = 3;
tcl2.back().vid = "Tape2";
tcl2.back().copyNb = 2;
db.queue(cta::RetrieveToFileRequest("cta:://cta/myfile2", 1234, tcl2, "eos://myeos/myeosfile2", 10, cl));
cta::ArchiveFile af2("cta:://cta/myfile2", "", 0, 1234, 0, 0);
db.queue(cta::RetrieveToFileRequest(af2, tcl2, "eos://myeos/myeosfile2", 10, cl));
mountCandidates.reset(NULL);
ASSERT_NO_THROW(mountCandidates = db.getMountInfo());
{
......
......@@ -2468,7 +2468,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
for(auto rqstItor = tapeRqsts.cbegin(); rqstItor != tapeRqsts.cend();
rqstItor++) {
remoteFiles.insert(rqstItor->remoteFile);
archiveFiles.insert(rqstItor->archiveFile);
archiveFiles.insert(rqstItor->archiveFile.path);
}
ASSERT_EQ(1, remoteFiles.size());
ASSERT_FALSE(remoteFiles.find(s_remoteTargetRawPath1) == remoteFiles.end());
......
......@@ -228,7 +228,7 @@ TEST_F(castor_tape_tapeserver_daemon_DataTransferSessionTest, DataTransferSessio
for (int fseq=1; fseq <= 10 ; fseq ++) {
// Create a path to a remote destination file
std::ostringstream remoteFilePath;
remoteFilePath << "file:" << m_tmpDir << "/test" << fseq;
remoteFilePath << "file://" << m_tmpDir << "/test" << fseq;
remoteFilePaths.push_back(remoteFilePath.str());
// Create an archive file entry in the archive namespace
......@@ -241,16 +241,17 @@ TEST_F(castor_tape_tapeserver_daemon_DataTransferSessionTest, DataTransferSessio
archiveFilePath.str(),
archiveFileMode,
archiveFileSize));
std::unique_ptr<cta::ArchiveFileStatus> status = ns.statFile(requester, archiveFilePath.str());
// Write the file to tape
std::unique_ptr<cta::RetrieveJob> ftr(new MockRetrieveJob());
std::unique_ptr<cta::ArchiveJob> ftm(new MockArchiveJob());
ftr->nameServerTapeFile.tapeFileLocation.fSeq = fseq;
ftm->nameServerTapeFile.tapeFileLocation.fSeq = fseq;
ftr->archiveFile.fileId = 1000 + fseq;
ftm->archiveFile.fileId = 1000 + fseq;
ftr->archiveFile.fileId = status->fileId;
ftm->archiveFile.fileId = status->fileId;
castor::tape::tapeFile::WriteFile wf(&ws, *ftm, archiveFileSize);
ftr->nameServerTapeFile.tapeFileLocation.blockId = wf.getPosition();
ftr->nameServerTapeFile.tapeFileLocation.blockId = wf.getBlockId();
ftr->remotePath = remoteFilePath.str();
// Write the data (one block)
wf.write(data, sizeof(data));
......@@ -261,7 +262,7 @@ TEST_F(castor_tape_tapeserver_daemon_DataTransferSessionTest, DataTransferSessio
cta::NameServerTapeFile tapeFile;
tapeFile.copyNb = 1;
tapeFile.tapeFileLocation.fSeq = fseq;
tapeFile.tapeFileLocation.blockId = wf.getPosition();
tapeFile.tapeFileLocation.blockId = wf.getBlockId();
tapeFile.tapeFileLocation.vid = volInfo.vid;
tapeFile.tapeFileLocation.copyNb = 1;
tapeFile.size = archiveFileSize;
......
......@@ -89,7 +89,7 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
// Synchronise the counter with the open time counter.
currentErrorToCount = "Error_diskOpenForWrite";
transferTime = localTime;
writeFile.reset(fileFactory.createWriteFile(m_retrieveJob->archiveFile.path));
writeFile.reset(fileFactory.createWriteFile(m_retrieveJob->remotePath));
URLcontext.add("actualURL", writeFile->URL());
lc.log(LOG_INFO, "Opened disk file for writing");
m_stats.openingTime+=localTime.secs(castor::utils::Timer::resetCounter);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment