Commit e26dfdbd authored by David COME's avatar David COME
Browse files

Added NULL as sentinel for the writing process

And removed functions that were previously removed from DiskWriteTaskInterface
parent 2f3cc5e3
......@@ -72,21 +72,9 @@ public:
* @param mm: memory manager of the session
*/
DiskWriteTask(tape::tapegateway::FileToRecallStruct* file,MemoryManager& mm):
m_blockCount(blockID(*file)),m_fifo(m_blockCount),m_recallingFile(file),m_memManager(mm){
mm.addClient(&m_fifo);
m_recallingFile(file),m_memManager(mm){
}
/**
* Return the number of files to write to disk
* @return always 1
*/
virtual int files() { return 1; };
/**
* @return the number of memory blocks to be used
*/
virtual int blocks() { return m_blockCount; }
/**
* Main routine: takes each memory block in the fifo and writes it to disk
* @return true if the file has been successfully written false otherwise.
......@@ -98,23 +86,27 @@ public:
tape::diskFile::WriteFile ourFile(m_recallingFile->path());
int blockId = 0;
while(!m_fifo.finished()) {
MemBlock *mb = m_fifo.popDataBlock();
MemBlock *mb = m_fifo.pop();
while(NULL!=mb){
AutoReleaseBlock releaser(mb,m_memManager);
if(m_recallingFile->fileid() != static_cast<unsigned int>(mb->m_fileid)
|| blockId != mb->m_fileBlock){
|| blockId != mb->m_fileBlock || mb->m_failled ){
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(lc, Param("expectedFILEID",m_recallingFile->fileid())),
LogContext::ScopedParam(lc, Param("receivedFILEID", mb->m_fileid)),
LogContext::ScopedParam(lc, Param("expectedFBLOCKId", blockId)),
LogContext::ScopedParam(lc, Param("receivedFBLOCKId", mb->m_fileBlock)),
};
tape::utils::suppresUnusedVariable(sp);
LogContext::ScopedParam(lc, Param("expected_NSFILEID",m_recallingFile->fileid())),
LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)),
LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", blockId)),
LogContext::ScopedParam(lc, Param("received_NSFBLOCKId", mb->m_fileBlock)),
LogContext::ScopedParam(lc, Param("failed_Status", mb->m_failled))
};
tape::utils::suppresUnusedVariable(sp);
lc.log(LOG_ERR,"received a bad block for writing");
throw castor::tape::Exception("received a bad block for writing");
}
mb->m_payload.write(ourFile);
mb->m_payload.write(ourFile);
mb = m_fifo.pop();
blockId++;
}
ourFile.close();
reporter.reportCompletedJob(*m_recallingFile);
......@@ -130,7 +122,7 @@ public:
* Allows client code to return a reusable memory block
* @return the pointer to the memory block that can be reused
*/
virtual MemBlock *getFreeBlock() { return m_fifo.getFreeBlock(); }
virtual MemBlock *getFreeBlock() { return m_fifo.pop(); }
/**
* Function used to enqueue a new memory block holding data to be written to disk
......@@ -138,16 +130,9 @@ public:
*/
virtual void pushDataBlock(MemBlock *mb) {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_fifo.pushDataBlock(mb);
}
/**
* Function used to wait until the end of the write
*/
virtual void waitCompletion() {
volatile castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_fifo.push(mb);
}
/**
* Destructor (also waiting for the end of the write operation)
*/
......@@ -156,16 +141,11 @@ public:
}
private:
/**
* Number of blocks in the fifo
*/
const int m_blockCount;
/**
* The fifo containing the memory blocks holding data to be written to disk
*/
DataFifo m_fifo;
castor::tape::threading::BlockingQueue<MemBlock *> m_fifo;
/**
* All we need to know about the file we are currently recalling
*/
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment