Commit fc963e95 authored by Eric Cano's avatar Eric Cano
Browse files

Added recording and reporting of job failure reasons.

The error(s) gets recorded at failure time, and the list is printed at
job deletion time.
parent 65c592f7
......@@ -85,7 +85,7 @@ bool cta::objectstore::ArchiveRequest::setJobSuccessful(uint16_t copyNumber) {
}
bool cta::objectstore::ArchiveRequest::addJobFailure(uint16_t copyNumber,
uint64_t mountId, log::LogContext & lc) {
uint64_t mountId, const std::string & failureReason, log::LogContext & lc) {
checkPayloadWritable();
// Find the job and update the number of failures
// (and return the job status: failed (true) or to be retried (false))
......@@ -99,6 +99,7 @@ bool cta::objectstore::ArchiveRequest::addJobFailure(uint16_t copyNumber,
j.set_lastmountwithfailure(mountId);
}
j.set_totalretries(j.totalretries() + 1);
* j.mutable_failurelogs()->Add() = failureReason;
}
if (j.totalretries() >= j.maxtotalretries()) {
j.set_status(serializers::AJS_Failed);
......@@ -617,8 +618,12 @@ bool ArchiveRequest::finishIfNecessary(log::LogContext & lc) {
for (auto & j: jl)
if (!finishedStatuses.count(j.status()))
return false;
remove();
log::ScopedParamContainer params(lc);
size_t failureNumber = 0;
for (auto failure: getFailures()) {
params.add(std::string("failure")+std::to_string(failureNumber), failure);
}
remove();
params.add("archiveRequestObject", getAddressIfSet());
for (auto & j: jl) {
params.add(std::string("statusForCopyNb")+std::to_string(j.copynb()), statusToString(j.status()));
......@@ -637,5 +642,17 @@ std::string ArchiveRequest::dump() {
return headerDump;
}
std::list<std::string> ArchiveRequest::getFailures() {
checkPayloadReadable();
std::list<std::string> ret;
for (auto &j: m_payload.jobs()) {
for (auto &f: j.failurelogs()) {
ret.push_back(f);
}
}
return ret;
}
}} // namespace cta::objectstore
......@@ -48,7 +48,7 @@ public:
void setJobSelected(uint16_t copyNumber, const std::string & owner);
void setJobPending(uint16_t copyNumber);
bool setJobSuccessful(uint16_t copyNumber); //< returns true if this is the last job
bool addJobFailure(uint16_t copyNumber, uint64_t sessionId, log::LogContext &lc); //< returns true the job is failed
bool addJobFailure(uint16_t copyNumber, uint64_t sessionId, const std::string & failureReason, log::LogContext &lc); //< returns true the job is failed
struct RetryStatus {
uint64_t retriesWithinMount = 0;
uint64_t maxRetriesWithinMount = 0;
......@@ -56,6 +56,7 @@ public:
uint64_t maxTotalRetries = 0;
};
RetryStatus getRetryStatus(uint16_t copyNumber);
std::list<std::string> getFailures();
serializers::ArchiveJobStatus getJobStatus(uint16_t copyNumber);
std::string statusToString(const serializers::ArchiveJobStatus & status);
bool finishIfNecessary(log::LogContext & lc);/**< Handling of the consequences of a job status change for the entire request.
......
......@@ -314,7 +314,8 @@ auto RetrieveRequest::getJobs() -> std::list<JobDump> {
//------------------------------------------------------------------------------
// RetrieveRequest::addJobFailure()
//------------------------------------------------------------------------------
bool RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId, log::LogContext & lc) {
bool RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId,
const std::string & failureReason, log::LogContext & lc) {
checkPayloadWritable();
// Find the job and update the number of failures
// (and return the full request status: failed (true) or to be retried (false))
......@@ -329,6 +330,7 @@ bool RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId, log::
j.set_lastmountwithfailure(mountId);
}
j.set_totalretries(j.totalretries() + 1);
* j.mutable_failurelogs()->Add() = failureReason;
}
if (j.totalretries() >= j.maxtotalretries()) {
j.set_status(serializers::RJS_Failed);
......@@ -580,5 +582,20 @@ void RetrieveRequest::AsyncJobDeleter::wait() {
m_backendDeleter->wait();
}
//------------------------------------------------------------------------------
// RetrieveRequest::getFailures()
//------------------------------------------------------------------------------
std::list<std::string> RetrieveRequest::getFailures() {
checkPayloadReadable();
std::list<std::string> ret;
for (auto &j: m_payload.jobs()) {
for (auto &f: j.failurelogs()) {
ret.push_back(f);
}
}
return ret;
}
}} // namespace cta::objectstore
......@@ -64,7 +64,8 @@ public:
AsyncJobDeleter * asyncDeleteJob();
JobDump getJob(uint16_t copyNb);
std::list<JobDump> getJobs();
bool addJobFailure(uint16_t copyNumber, uint64_t mountId, log::LogContext & lc); /**< Returns true is the request is completely failed
bool addJobFailure(uint16_t copyNumber, uint64_t mountId, const std::string & failureReason, log::LogContext & lc);
/**< Returns true is the request is completely failed
(telling wheather we should requeue or not). */
struct RetryStatus {
uint64_t retriesWithinMount = 0;
......@@ -73,6 +74,7 @@ public:
uint64_t maxTotalRetries = 0;
};
RetryStatus getRetryStatus(uint16_t copyNumber);
std::list<std::string> getFailures();
std::string statusToString(const serializers::RetrieveJobStatus & status);
bool finishIfNecessary(log::LogContext & lc); /**< Handling of the consequences of a job status change for the entire request.
* This function returns true if the request got finished. */
......
......@@ -315,6 +315,7 @@ message ArchiveJob {
required uint64 lastmountwithfailure = 4407;
required uint32 maxtotalretries = 4408;
required uint32 maxretrieswithinmount = 4409;
repeated string failurelogs = 4410;
}
message ArchiveRequest {
......@@ -354,6 +355,7 @@ message RetrieveJob {
required uint32 totalretries = 9204;
required RetrieveJobStatus status = 9205;
required uint64 lastmountwithfailure = 9206;
repeated string failurelogs = 9207;
}
message RetrieveRequest {
......
......@@ -115,12 +115,12 @@ std::string cta::ArchiveJob::reportURL() {
//------------------------------------------------------------------------------
// failed
//------------------------------------------------------------------------------
void cta::ArchiveJob::failed(const cta::exception::Exception &ex, log::LogContext & lc) {
if (m_dbJob->fail(lc)) {
void cta::ArchiveJob::failed(const std::string &failureReason, log::LogContext & lc) {
if (m_dbJob->fail(failureReason, lc)) {
std::string base64ErrorReport;
// Construct a pipe: msg -> sign -> Base64 encode -> result goes into ret.
const bool noNewLineInBase64Output = false;
CryptoPP::StringSource ss1(ex.getMessageValue(), true,
CryptoPP::StringSource ss1(failureReason, true,
new CryptoPP::Base64Encoder(
new CryptoPP::StringSink(base64ErrorReport), noNewLineInBase64Output));
std::string fullReportURL = m_dbJob->errorReportURL + base64ErrorReport;
......@@ -136,7 +136,7 @@ void cta::ArchiveJob::failed(const cta::exception::Exception &ex, log::LogConte
.add("diskInstance", m_dbJob->archiveFile.diskInstance)
.add("diskFileId", m_dbJob->archiveFile.diskFileId)
.add("fullReportURL", fullReportURL)
.add("errorReport", ex.getMessageValue())
.add("errorReport", failureReason)
.add("reportTime", t.secs());
lc.log(log::INFO, "In ArchiveJob::failed(): reported error to client.");
} catch (cta::exception::Exception & ex) {
......@@ -144,7 +144,7 @@ void cta::ArchiveJob::failed(const cta::exception::Exception &ex, log::LogConte
params.add("fileId", m_dbJob->archiveFile.archiveFileID)
.add("diskInstance", m_dbJob->archiveFile.diskInstance)
.add("diskFileId", m_dbJob->archiveFile.diskFileId)
.add("errorReport", ex.getMessageValue())
.add("errorReport", failureReason)
.add("exceptionMsg", ex.getMessageValue())
.add("reportTime", t.secs());
lc.log(log::ERR, "In ArchiveJob::failed(): failed to report error to client.");
......
......@@ -102,16 +102,10 @@ public:
virtual catalogue::TapeFileWritten validateAndGetTapeFileWritten();
/**
* Triggers a scheduler update following the failure of the job.
* The reason for the failure should have been set beforehand by calling
* setFailureReason(), but failure to do it is non-fatal (a standard error
* reason will be used)
* This 2 step approach allows the reason to be recorded fast in the
* tape writing thread, and the slow(er) update of the DB to be executed
* in a second thread.
*
* Triggers a scheduler update following the failure of the job. Retry policy will
* be applied by the scheduler.
*/
virtual void failed(const cta::exception::Exception &ex, log::LogContext & lc);
virtual void failed(const std::string &failureReason, log::LogContext & lc);
/**
* Get the URL used for reporting
......
......@@ -2703,14 +2703,14 @@ std::set<cta::SchedulerDatabase::ArchiveJob*> OStoreDB::ArchiveMount::setJobBatc
//------------------------------------------------------------------------------
// OStoreDB::ArchiveJob::fail()
//------------------------------------------------------------------------------
bool OStoreDB::ArchiveJob::fail(log::LogContext & lc) {
bool OStoreDB::ArchiveJob::fail(const std::string& failureReason, log::LogContext& lc) {
if (!m_jobOwned)
throw JobNowOwned("In OStoreDB::ArchiveJob::fail: cannot fail a job not owned");
// Lock the archive request. Fail the job.
objectstore::ScopedExclusiveLock arl(m_archiveRequest);
m_archiveRequest.fetch();
// Add a job failure. If the job is failed, we will delete it.
if (m_archiveRequest.addJobFailure(tapeFile.copyNb, m_mountId, lc)) {
if (m_archiveRequest.addJobFailure(tapeFile.copyNb, m_mountId, failureReason, lc)) {
// The job will not be retried. Either another jobs for the same request is
// queued and keeps the request referenced or the request has been deleted.
// In any case, we can forget it.
......@@ -2819,14 +2819,14 @@ OStoreDB::RetrieveJob::RetrieveJob(const std::string& jobAddress, OStoreDB & oSt
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::fail()
//------------------------------------------------------------------------------
bool OStoreDB::RetrieveJob::fail(log::LogContext &logContext) {
bool OStoreDB::RetrieveJob::fail(const std::string& failureReason, log::LogContext& logContext) {
if (!m_jobOwned)
throw JobNowOwned("In OStoreDB::RetrieveJob::fail: cannot fail a job not owned");
// Lock the retrieve request. Fail the job.
objectstore::ScopedExclusiveLock rrl(m_retrieveRequest);
m_retrieveRequest.fetch();
// Add a job failure. If the job is failed, we will delete it.
if (m_retrieveRequest.addJobFailure(selectedCopyNb, m_mountId, logContext)) {
if (m_retrieveRequest.addJobFailure(selectedCopyNb, m_mountId, failureReason, logContext)) {
// The job will not be retried. Either another jobs for the same request is
// queued and keeps the request referenced or the request has been deleted.
// In any case, we can forget it.
......@@ -2834,6 +2834,10 @@ bool OStoreDB::RetrieveJob::fail(log::LogContext &logContext) {
m_jobOwned = false;
log::ScopedParamContainer params(logContext);
params.add("object", m_retrieveRequest.getAddressIfSet());
size_t failureNumber=0;
for (auto failure: m_retrieveRequest.getFailures()) {
params.add(std::string("failure")+std::to_string(failureNumber++), failure);
}
logContext.log(log::ERR, "In OStoreDB::RetrieveJob::fail(): request was definitely failed and deleted.");
return true;
} else {
......@@ -2881,7 +2885,7 @@ bool OStoreDB::RetrieveJob::fail(log::LogContext &logContext) {
objectstore::ScopedExclusiveLock rql;
objectstore::Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq, rql, *m_oStoreDB.m_agentReference, bestVid, logContext);
auto rfqc = m_retrieveRequest.getRetrieveFileQueueCriteria();
auto & af=rfqc.archiveFile;
auto & af = rfqc.archiveFile;
auto & tf = af.tapeFiles.at(bestCopyNb);
auto sr = m_retrieveRequest.getSchedulerRequest();
std::list<objectstore::RetrieveQueue::JobToAdd> jta;
......
......@@ -168,7 +168,7 @@ public:
public:
CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
bool fail(log::LogContext & lc) override;
bool fail(const std::string& failureReason, log::LogContext& lc) override;
private:
void asyncSucceed();
bool waitAsyncSucceed();
......@@ -215,7 +215,7 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
virtual void asyncSucceed() override;
virtual void checkSucceed() override;
virtual bool fail(log::LogContext &) override; ///< Returns true if this failure is final (we will not retry).
bool fail(const std::string& failureReason, log::LogContext&) override;
virtual ~RetrieveJob() override;
private:
RetrieveJob(const std::string &, OStoreDB &, RetrieveMount &);
......
......@@ -61,12 +61,12 @@ void cta::RetrieveJob::checkComplete() {
//------------------------------------------------------------------------------
// failed
//------------------------------------------------------------------------------
void cta::RetrieveJob::failed(const std::string & errorReport, log::LogContext &lc) {
if (m_dbJob->fail(lc)) {
void cta::RetrieveJob::failed(const std::string & failureReason, log::LogContext &lc) {
if (m_dbJob->fail(failureReason, lc)) {
std::string base64ErrorReport;
// Construct a pipe: msg -> sign -> Base64 encode -> result goes into ret.
const bool noNewLineInBase64Output = false;
CryptoPP::StringSource ss1(errorReport, true,
CryptoPP::StringSource ss1(failureReason, true,
new CryptoPP::Base64Encoder(
new CryptoPP::StringSink(base64ErrorReport), noNewLineInBase64Output));
std::string fullReportURL = m_dbJob->retrieveRequest.errorReportURL + base64ErrorReport;
......@@ -81,7 +81,7 @@ void cta::RetrieveJob::failed(const std::string & errorReport, log::LogContext &
params.add("fileId", m_dbJob->archiveFile.archiveFileID)
.add("diskInstance", m_dbJob->archiveFile.diskInstance)
.add("diskFileId", m_dbJob->archiveFile.diskFileId)
.add("errorReport", errorReport)
.add("errorReport", failureReason)
.add("reportTime", t.secs());
lc.log(log::INFO, "In RetrieveJob::failed(): reported error to client.");
} catch (cta::exception::Exception & ex) {
......@@ -89,7 +89,7 @@ void cta::RetrieveJob::failed(const std::string & errorReport, log::LogContext &
params.add("fileId", m_dbJob->archiveFile.archiveFileID)
.add("diskInstance", m_dbJob->archiveFile.diskInstance)
.add("diskFileId", m_dbJob->archiveFile.diskFileId)
.add("errorReport", errorReport)
.add("errorReport", failureReason)
.add("exceptionMsg", ex.getMessageValue())
.add("reportTime", t.secs());
lc.log(log::ERR, "In RetrieveJob::failed(): failed to report error to client.");
......
......@@ -88,11 +88,10 @@ public:
virtual void checkComplete();
/**
* Indicates that the job failed. Like for complete(), reason for failure
* should already be recorded in the object beforehand. Retry policy will
* Indicates that the job failed. Reason for failure is indicated. Retry policy will
* be applied by the scheduler.
*/
virtual void failed(const std::string & errorReport, cta::log::LogContext &);
virtual void failed(const std::string &failureReason, cta::log::LogContext &);
/**
* Helper function returning a reference to the currently selected tape file.
......@@ -147,11 +146,6 @@ public:
*/
uint64_t transferredSize;
/**
* The error string. This should be set before calling failed().
*/
std::string failureMessage;
}; // class RetrieveJob
} // namespace cta
......@@ -183,7 +183,7 @@ public:
std::string errorReportURL;
cta::common::dataStructures::ArchiveFile archiveFile;
cta::common::dataStructures::TapeFile tapeFile;
virtual bool fail(log::LogContext & lc) = 0;
virtual bool fail(const std::string & failureReason, log::LogContext & lc) = 0;
virtual void bumpUpTapeFileCount(uint64_t newFileCount) = 0;
virtual ~ArchiveJob() {}
};
......@@ -334,7 +334,7 @@ public:
uint64_t selectedCopyNb;
virtual void asyncSucceed() = 0;
virtual void checkSucceed() = 0;
virtual bool fail(log::LogContext &) = 0; ///< Returns true if this failure is final (we will not retry).
virtual bool fail(const std::string & failureReason, log::LogContext &) = 0;
virtual ~RetrieveJob() {}
};
......
......@@ -620,7 +620,7 @@ TEST_P(SchedulerTest, retry_archive_until_max_reached) {
ASSERT_NE(0, archiveJobList.size());
// Validate we got the right file
ASSERT_EQ(archiveFileId, archiveJobList.front()->archiveFile.archiveFileID);
archiveJobList.front()->failed(cta::exception::Exception("Archive failed"), lc);
archiveJobList.front()->failed("Archive failed", lc);
}
// Then the request should be gone
ASSERT_EQ(0, archiveMount->getNextJobBatch(1,1,lc).size());
......
......@@ -34,7 +34,7 @@ namespace cta {
~MockArchiveJob() throw() {}
void failed(const cta::exception::Exception& ex, log::LogContext & lc) override {
void failed(const std::string& failureReason, log::LogContext& lc) override {
failures++;
}
......
......@@ -35,7 +35,7 @@ namespace cta {
}
virtual void asyncComplete() override { completes++; }
virtual void checkComplete() override {}
virtual void failed(const std::string & errorReport, cta::log::LogContext &) override { failures++; };
void failed(const std::string& failureReason, cta::log::LogContext&) override { failures++; };
~MockRetrieveJob() throw() {}
};
......
......@@ -51,11 +51,12 @@ namespace unitTests{
using namespace castor::tape::diskFile;
struct MockMigrationReportPacker : public MigrationReportPacker {
void reportCompletedJob(std::unique_ptr<cta::ArchiveJob> successfulArchiveJob) {}
void reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob, const cta::exception::Exception& ex) {}
void reportEndOfSession() {}
void reportEndOfSessionWithErrors(const std::string msg, int error_code) {}
void disableBulk() {}
void reportCompletedJob(std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, cta::log::LogContext & lc) override {}
void reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob,
const cta::exception::Exception& ex, cta::log::LogContext & lc) override {}
void reportEndOfSession(cta::log::LogContext & lc) override {}
void reportEndOfSessionWithErrors(const std::string msg, int error_code, cta::log::LogContext & lc) override {}
void disableBulk() override {}
MockMigrationReportPacker(cta::ArchiveMount *rm,cta::log::LogContext lc):
MigrationReportPacker(rm,lc) {}
};
......
......@@ -36,9 +36,7 @@ namespace daemon {
// constructor
//------------------------------------------------------------------------------
DiskWriteTask::DiskWriteTask(cta::RetrieveJob *retrieveJob, RecallMemoryManager& mm):
m_retrieveJob(retrieveJob),m_memManager(mm){
}
m_retrieveJob(retrieveJob),m_memManager(mm){}
//------------------------------------------------------------------------------
// DiskWriteTask::execute
......@@ -168,8 +166,7 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,cta::log::LogContext&
params.add("errorMessage", e.getMessageValue());
logWithStat(cta::log::ERR, "File writing to disk failed.", lc);
lc.logBacktrace(cta::log::ERR, e.backtrace());
m_retrieveJob->failureMessage = e.getMessageValue();
reporter.reportFailedJob(std::move(m_retrieveJob));
reporter.reportFailedJob(std::move(m_retrieveJob), e);
watchdog.deleteParameter("stillOpenFileForThread"+
std::to_string((long long)threadID));
......
......@@ -68,20 +68,20 @@ namespace unitTests{
using namespace castor::tape::tapeserver::client;
using namespace castor::tape::diskFile;
struct MockRecallReportPacker : public RecallReportPacker {
void reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob) {
void reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob) override {
cta::threading::MutexLocker ml(m_mutex);
completeJobs++;
}
void reportFailedJob(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob) {
void reportFailedJob(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob, const cta::exception::Exception & ex) override {
cta::threading::MutexLocker ml(m_mutex);
failedJobs++;
}
void disableBulk() {}
void reportEndOfSession() {
void disableBulk() override {}
void reportEndOfSession() override {
cta::threading::MutexLocker ml(m_mutex);
endSessions++;
}
void reportEndOfSessionWithErrors(const std::string msg, int error_code) {
void reportEndOfSessionWithErrors(const std::string msg, int error_code) override {
cta::threading::MutexLocker ml(m_mutex);
endSessionsWithError++;
}
......
......@@ -25,6 +25,7 @@
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "catalogue/TapeFileWritten.hpp"
#include "common/utils/utils.hpp"
#include <memory>
#include <numeric>
......@@ -68,7 +69,9 @@ std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, cta::log::LogContext & lc
//------------------------------------------------------------------------------
void MigrationReportPacker::reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob,
const cta::exception::Exception &ex, cta::log::LogContext & lc){
std::unique_ptr<Report> rep(new ReportError(std::move(failedArchiveJob),ex));
std::string failureLog = cta::utils::getCurrentLocalTime() + " " + cta::utils::getShortHostname() +
" " + ex.what();
std::unique_ptr<Report> rep(new ReportError(std::move(failedArchiveJob), failureLog));
cta::log::ScopedParamContainer params(lc);
params.add("type", "ReportError");
lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportFailedJob(), pushing a report.");
......@@ -285,12 +288,12 @@ void MigrationReportPacker::ReportError::execute(MigrationReportPacker& reportPa
reportPacker.m_errorHappened=true;
{
cta::log::ScopedParamContainer params(reportPacker.m_lc);
params.add("ExceptionMSG", m_ex.getMessageValue())
params.add("failureLog", m_failureLog)
.add("fileId", m_failedArchiveJob->archiveFile.archiveFileID);
reportPacker.m_lc.log(cta::log::ERR,"In MigrationReportPacker::ReportError::execute(): failing archive job after exception.");
}
try {
m_failedArchiveJob->failed(cta::exception::Exception(m_ex.getMessageValue()), reportPacker.m_lc);
m_failedArchiveJob->failed(m_failureLog, reportPacker.m_lc);
} catch (cta::exception::Exception & ex) {
cta::log::ScopedParamContainer params(reportPacker.m_lc);
params.add("ExceptionMSG", ex.getMessageValue())
......
......@@ -172,15 +172,15 @@ private:
void execute(MigrationReportPacker& reportPacker) override;
};
class ReportError : public Report {
const cta::exception::Exception m_ex;
const std::string m_failureLog;
/**
* The failed archive job to be reported immediately
*/
std::unique_ptr<cta::ArchiveJob> m_failedArchiveJob;
public:
ReportError(std::unique_ptr<cta::ArchiveJob> failedArchiveJob, const cta::exception::Exception &ex):
m_ex(ex), m_failedArchiveJob(std::move(failedArchiveJob)){}
ReportError(std::unique_ptr<cta::ArchiveJob> failedArchiveJob, std::string &failureLog):
m_failureLog(failureLog), m_failedArchiveJob(std::move(failedArchiveJob)){}
void execute(MigrationReportPacker& reportPacker) override;
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment