XrdSsiCtaRequestMessage.cpp 62.4 KB
Newer Older
1
2
3
/*!
 * @project        The CERN Tape Archive (CTA)
 * @brief          XRootD EOS Notification handler
 * @copyright      Copyright 2019 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include <XrdSsiPbException.hpp>
20
21
using XrdSsiPb::PbException;

22
#include <cmdline/CtaAdminCmdParse.hpp>
23
#include "XrdSsiCtaRequestMessage.hpp"
24
#include "XrdCtaAdminLs.hpp"
25
#include "XrdCtaArchiveFileLs.hpp"
26
#include "XrdCtaArchiveRouteLs.hpp"
27
#include "XrdCtaDriveLs.hpp"
28
#include "XrdCtaFailedRequestLs.hpp"
29
#include "XrdCtaGroupMountRuleLs.hpp"
30
#include "XrdCtaListPendingQueue.hpp"
31
#include "XrdCtaLogicalLibraryLs.hpp"
32
#include "XrdCtaMountPolicyLs.hpp"
33
#include "XrdCtaRepackLs.hpp"
34
#include "XrdCtaRequesterMountRuleLs.hpp"
35
#include "XrdCtaShowQueues.hpp"
36
#include "XrdCtaTapeLs.hpp"
37
#include "XrdCtaTapeFileLs.hpp"
38
#include "XrdCtaStorageClassLs.hpp"
39
#include "XrdCtaTapePoolLs.hpp"
40
#include "XrdCtaDiskSystemLs.hpp"
41

42
43
namespace cta {
namespace xrd {
44

45
46
47
48
49
// Codes to change colours for console output (when sending a response to cta-admin)
// These are ANSI escape sequences: "\x1b[31;1m" selects red (31) with the bold attribute (1).
const char* const TEXT_RED    = "\x1b[31;1m";
// "\x1b[0m" resets all text attributes. Note that this constant also ends with a newline.
const char* const TEXT_NORMAL = "\x1b[0m\n";


50
51
52
/*
 * Convert AdminCmd <Cmd, SubCmd> pair to an integer so that it can be used in a switch statement
 */
53
constexpr unsigned int cmd_pair(cta::admin::AdminCmd::Cmd cmd, cta::admin::AdminCmd::SubCmd subcmd) {
54
55
56
57
   return (cmd << 16) + subcmd;
}


58
/*!
 * Dispatch a Request to the method which handles it.
 *
 * Admin commands are dispatched on their <Cmd, SubCmd> pair; EOS notifications are dispatched on
 * their Workflow event type.
 *
 * @param[in]     request     Request message received from the client
 * @param[out]    response    Response message, filled in by the handler which is called
 * @param[out]    stream      Passed to the handlers of commands which return their results as a stream
 *
 * @throws cta::exception::UserError if an admin command was not authenticated using Kerberos 5
 * @throws PbException if the request is not set, is not recognised, has no handler, or if the
 *         instance name of a notification does not match the client identity
 */
void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Response &response, XrdSsiStream* &stream)
{
   // The payload type of the Request selects the top-level branch
   switch(request.request_case())
   {
      using namespace cta::xrd;

      case Request::kAdmincmd: {
         // Admin commands are only accepted from Kerberos-authenticated, authorized CTA Admin users
         if(m_protocol != Protocol::KRB5) {
            throw cta::exception::UserError("[ERROR] Admin commands must be authenticated using the Kerberos 5 protocol.");
         }
         m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);

         // Times everything from option parsing to completion of the command
         cta::utils::Timer timer;

         const auto &admincmd = request.admincmd();

         // Validate the Protocol Buffer and import options into maps
         importOptions(admincmd);

         // Select the handler for this <Cmd, SubCmd> pair
         switch(cmd_pair(admincmd.cmd(), admincmd.subcmd())) {
            using namespace cta::admin;

            // admin
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_ADD):
               processAdmin_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_CH):
               processAdmin_Ch(response);
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_RM):
               processAdmin_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_LS):
               processAdmin_Ls(response, stream);
               break;

            // archivefile
            case cmd_pair(AdminCmd::CMD_ARCHIVEFILE, AdminCmd::SUBCMD_LS):
               processArchiveFile_Ls(response, stream);
               break;

            // archiveroute
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_ADD):
               processArchiveRoute_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_CH):
               processArchiveRoute_Ch(response);
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_RM):
               processArchiveRoute_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_LS):
               processArchiveRoute_Ls(response, stream);
               break;

            // drive
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_UP):
               processDrive_Up(response);
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_DOWN):
               processDrive_Down(response);
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_LS):
               processDrive_Ls(response, stream);
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_RM):
               processDrive_Rm(response);
               break;

            // failedrequest
            case cmd_pair(AdminCmd::CMD_FAILEDREQUEST, AdminCmd::SUBCMD_LS):
               processFailedRequest_Ls(response, stream);
               break;

            // groupmountrule
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_ADD):
               processGroupMountRule_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_CH):
               processGroupMountRule_Ch(response);
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_RM):
               processGroupMountRule_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_LS):
               processGroupMountRule_Ls(response, stream);
               break;

            // listpendingarchives / listpendingretrieves
            case cmd_pair(AdminCmd::CMD_LISTPENDINGARCHIVES, AdminCmd::SUBCMD_NONE):
               processListPendingArchives(response, stream);
               break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGRETRIEVES, AdminCmd::SUBCMD_NONE):
               processListPendingRetrieves(response, stream);
               break;

            // logicallibrary
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_ADD):
               processLogicalLibrary_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_CH):
               processLogicalLibrary_Ch(response);
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_RM):
               processLogicalLibrary_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_LS):
               processLogicalLibrary_Ls(response, stream);
               break;

            // mountpolicy
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_ADD):
               processMountPolicy_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_CH):
               processMountPolicy_Ch(response);
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_RM):
               processMountPolicy_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS):
               processMountPolicy_Ls(response, stream);
               break;

            // repack
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD):
               processRepack_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM):
               processRepack_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS):
               processRepack_Ls(response, stream);
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR):
               processRepack_Err(response);
               break;

            // requestermountrule
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_ADD):
               processRequesterMountRule_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_CH):
               processRequesterMountRule_Ch(response);
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_RM):
               processRequesterMountRule_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_LS):
               processRequesterMountRule_Ls(response, stream);
               break;

            // showqueues
            case cmd_pair(AdminCmd::CMD_SHOWQUEUES, AdminCmd::SUBCMD_NONE):
               processShowQueues(response, stream);
               break;

            // storageclass
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_ADD):
               processStorageClass_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_CH):
               processStorageClass_Ch(response);
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_RM):
               processStorageClass_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_LS):
               processStorageClass_Ls(response, stream);
               break;

            // tape
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_ADD):
               processTape_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_CH):
               processTape_Ch(response);
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RM):
               processTape_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RECLAIM):
               processTape_Reclaim(response);
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LS):
               processTape_Ls(response, stream);
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LABEL):
               processTape_Label(response);
               break;

            // tapefile
            case cmd_pair(AdminCmd::CMD_TAPEFILE, AdminCmd::SUBCMD_LS):
               processTapeFile_Ls(response, stream);
               break;

            // tapepool
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_ADD):
               processTapePool_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_CH):
               processTapePool_Ch(response);
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_RM):
               processTapePool_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_LS):
               processTapePool_Ls(response, stream);
               break;

            // disksystem
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_LS):
               processDiskSystem_Ls(response, stream);
               break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_ADD):
               processDiskSystem_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_RM):
               processDiskSystem_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_CH):
               processDiskSystem_Ch(response);
               break;

            // No handler exists for this pair
            default:
               throw PbException("Admin command pair <" +
                     AdminCmd_Cmd_Name(admincmd.cmd()) + ", " +
                     AdminCmd_SubCmd_Name(admincmd.subcmd()) +
                     "> is not implemented.");
         } // end of <Cmd, SubCmd> switch

         // Only reached when the handler returned normally: record the command in the log
         logAdminCmd(__FUNCTION__, admincmd, timer);
      } // end case Request::kAdmincmd
         break;

      case Request::kNotification: {
         const auto &notification = request.notification();
         const auto &workflow     = notification.wf();

         // The instance name in the Protocol Buffer has to be the same as the client identity
         // (the instance name in the SSS key)
         const std::string &instanceName = workflow.instance().name();
         if(m_cliIdentity.username != instanceName) {
            throw PbException("Instance name \"" + instanceName +
                              "\" does not match key identifier \"" + m_cliIdentity.username + "\"");
         }

         // Select the handler for this Workflow event
         switch(workflow.event()) {
            using namespace cta::eos;

            case Workflow::OPENW:
               processOPENW(notification, response);
               break;
            case Workflow::CREATE:
               processCREATE(notification, response);
               break;
            case Workflow::CLOSEW:
               processCLOSEW(notification, response);
               break;
            case Workflow::PREPARE:
               processPREPARE(notification, response);
               break;
            case Workflow::ABORT_PREPARE:
               processABORT_PREPARE(notification, response);
               break;
            case Workflow::DELETE:
               processDELETE(notification, response);
               break;

            default:
               throw PbException("Workflow event " +
                     Workflow_EventType_Name(workflow.event()) +
                     " is not implemented.");
         }
      } // end case Request::kNotification
         break;

      case Request::REQUEST_NOT_SET:
         throw PbException("Request message has not been set.");

      default:
         throw PbException("Unrecognized Request message. "
                           "Possible Protocol Buffer version mismatch between client and server.");
   }
}



311
312
// EOS Workflow commands

313
/*!
 * Handle an OPENW workflow event: the event is ignored, apart from being logged.
 *
 * @param[in]     notification    Notification received from EOS (not used)
 * @param[out]    response        Response message, set to RSP_SUCCESS
 */
void RequestMessage::processOPENW(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Record that the event was received; no parameters are attached to the log entry
   cta::log::ScopedParamContainer params(m_lc);
   m_lc.log(cta::log::INFO, "In RequestMessage::processOPENW(): ignoring OPENW event.");

   // Nothing else to do: report success
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

325
326


327
/*!
 * Handle a CREATE workflow event: obtain an archive file ID for the new file and return it,
 * together with the storage class, as extended attributes in the response.
 *
 * @param[in]     notification    Notification received from EOS
 * @param[out]    response        Response message; receives the "sys.archive.file_id" and
 *                                "sys.archive.storage_class" xattrs and is set to RSP_SUCCESS
 *
 * @throws PbException if the storage class extended attribute is missing or empty
 */
void RequestMessage::processCREATE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Check the fields of the protobuf which are needed below
   checkIsNotEmptyString(notification.cli().user().username(),  "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(), "notification.cli.user.groupname");

   // Identity of the user who made the request
   cta::common::dataStructures::RequesterIdentity requester;
   requester.name  = notification.cli().user().username();
   requester.group = notification.cli().user().groupname();

   // Find the storage class, looking for the current xattr name first and the old one second
   const auto &xattrs = notification.file().xattr();
   auto storageClassIt = xattrs.find("sys.archive.storage_class");
   if(storageClassIt == xattrs.end()) {
      storageClassIt = xattrs.find("CTA_StorageClass");
      if(storageClassIt == xattrs.end()) {
         throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is not set");
      }
   }
   const std::string storageClass = storageClassIt->second;
   if(storageClass.empty()) {
      throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is set to an empty string");
   }

   cta::utils::Timer timer;

   // The "fail_on_closew_test" storage class exists for testing: it always fails on CLOSEW, so it
   // is let through CREATE with a placeholder ID rather than taking an archive ID from the pool
   const uint64_t archiveFileId = (storageClass == "fail_on_closew_test")
      ? std::numeric_limits<uint64_t>::max()
      : m_scheduler.checkAndGetNextArchiveFileId(m_cliIdentity.username, storageClass, requester, m_lc);

   // Log the assignment
   cta::log::ScopedParamContainer params(m_lc);
   params.add("diskFileId", std::to_string(notification.file().fid()));
   params.add("diskFilePath", notification.file().lpath());
   params.add("fileId", archiveFileId);
   params.add("schedulerTime", timer.secs());
   m_lc.log(cta::log::INFO, "In RequestMessage::processCREATE(): assigning new archive file ID.");

   // Return the archive file ID and the storage class as extended attributes
   using XattrPair = google::protobuf::MapPair<std::string,std::string>;
   response.mutable_xattr()->insert(XattrPair("sys.archive.file_id", std::to_string(archiveFileId)));
   response.mutable_xattr()->insert(XattrPair("sys.archive.storage_class", storageClass));

   // Report success
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

379
380


381
/*!
 * Handle a CLOSEW workflow event: queue the file for archival under the archive file ID which is
 * stored in its extended attributes.
 *
 * @param[in]     notification    Notification received from EOS
 * @param[out]    response        Response message, set to RSP_SUCCESS once the request is queued
 *
 * @throws PbException if a required extended attribute is missing, if the archive file ID does
 *         not parse to a non-zero number, or if the file is in the "fail_on_closew_test" storage class
 * @throws cta::exception::UserError if the file is larger than m_archiveFileMaxSize
 */
void RequestMessage::processCLOSEW(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Check the fields of the protobuf which are needed below
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.wf().instance().url(),      "notification.wf.instance.url");
   checkIsNotEmptyString(notification.transport().report_url(),   "notification.transport.report_url");

   const auto &xattrs = notification.file().xattr();

   // The storage class is mandatory
   const auto storageClassIt = xattrs.find("sys.archive.storage_class");
   if(storageClassIt == xattrs.end()) {
      throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.storage_class");
   }

   // Test hook: files in this storage class are always rejected
   if(storageClassIt->second == "fail_on_closew_test") {
      throw PbException("File is in fail_on_closew_test storage class, which always fails.");
   }

   // Files larger than the configured limit are not archived
   const auto fileSize = notification.file().size();
   if(fileSize > m_archiveFileMaxSize) {
      throw cta::exception::UserError("Archive request rejected: file size (" + std::to_string(fileSize) +
                                      " bytes) exceeds maximum allowed size (" + std::to_string(m_archiveFileMaxSize) + " bytes)");
   }

   // Build the archive request from the notification
   cta::common::dataStructures::ArchiveRequest request;
   checksum::ProtobufToChecksumBlob(notification.file().csb(), request.checksumBlob);
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.diskFileID             = std::to_string(notification.file().fid());
   request.fileSize               = fileSize;
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.srcURL                 = notification.wf().instance().url();
   request.storageClass           = storageClassIt->second;
   request.archiveReportURL       = notification.transport().report_url();
   request.archiveErrorReportURL  = notification.transport().error_report_url();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);

   // The CTA archive ID is held in an EOS extended attribute, so it arrives as a string and has
   // to be converted to a uint64_t. Zero (which is also what a failed conversion yields) is rejected.
   const auto archiveFileIdIt = xattrs.find("sys.archive.file_id");
   if(archiveFileIdIt == xattrs.end()) {
      throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
   }
   const std::string archiveFileIdStr = archiveFileIdIt->second;
   const uint64_t archiveFileId = strtoul(archiveFileIdStr.c_str(), nullptr, 10);
   if(archiveFileId == 0) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }

   cta::utils::Timer timer;

   // Hand the request over to the scheduler
   m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc);

   // Log the queued request
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", archiveFileId);
   params.add("schedulerTime", timer.secs());
   params.add("requesterInstance", notification.wf().requester_instance());
   m_lc.log(cta::log::INFO, "In RequestMessage::processCLOSEW(): queued file for archive.");

   // Report success
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



455
/*!
 * Handle a PREPARE workflow event: queue a retrieve request for the file and return the ID of
 * the queued request as an extended attribute in the response.
 *
 * @param[in]     notification    Notification received from EOS
 * @param[out]    response        Response message; receives the "sys.cta.objectstore.id" xattr
 *                                and is set to RSP_SUCCESS
 *
 * @throws PbException if the archive file ID extended attribute is missing or does not parse to
 *         a non-zero number
 */
void RequestMessage::processPREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Check the fields of the protobuf which are needed below
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.transport().dst_url(),      "notification.transport.dst_url");

   // Build the retrieve request from the notification
   cta::common::dataStructures::RetrieveRequest request;
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.dstURL                 = notification.transport().dst_url();
   request.errorReportURL         = notification.transport().error_report_url();
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);

   const auto &xattrs = notification.file().xattr();

   // The CTA archive ID is held in an EOS extended attribute, so it arrives as a string and has
   // to be converted to a uint64_t. The current xattr name is tried first, then the old one.
   auto archiveFileIdIt = xattrs.find("sys.archive.file_id");
   if(archiveFileIdIt == xattrs.end()) {
      archiveFileIdIt = xattrs.find("CTA_ArchiveFileId");
      if(archiveFileIdIt == xattrs.end()) {
         throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
      }
   }
   const std::string archiveFileIdStr = archiveFileIdIt->second;
   request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10);
   if(request.archiveFileID == 0) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }

   // The activity is an optional string attribute
   const auto activityIt = xattrs.find("activity");
   if(activityIt != xattrs.end()) {
      request.activity = activityIt->second;
   }

   cta::utils::Timer timer;

   // Hand the request over to the scheduler
   const std::string retrieveReqId = m_scheduler.queueRetrieve(m_cliIdentity.username, request, m_lc);

   // Log the queued request
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", request.archiveFileID);
   params.add("schedulerTime", timer.secs());
   params.add("retrieveReqId", retrieveReqId);
   if(request.activity) {
      params.add("activity", request.activity.value());
   }
   m_lc.log(cta::log::INFO, "In RequestMessage::processPREPARE(): queued file for retrieve.");

   // Return the reference to the retrieve request as an extended attribute and report success
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId));
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



518
519
520
521
522
523
524
/*!
 * Handle an ABORT_PREPARE workflow event: cancel a previously queued retrieve request.
 *
 * @param[in]     notification    Notification received from EOS
 * @param[out]    response        Response message; "sys.cta.objectstore.id" is set to the empty
 *                                string in its xattrs and it is set to RSP_SUCCESS
 *
 * @throws PbException if the archive file ID or retrieve request ID extended attribute is
 *         missing, or if the archive file ID does not parse to a non-zero number
 */
void RequestMessage::processABORT_PREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");

   // Unpack message
   cta::common::dataStructures::CancelRetrieveRequest request;
   request.requester.name   = notification.cli().user().username();
   request.requester.group  = notification.cli().user().groupname();

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
   if(notification.file().xattr().end() == archiveFileIdItor) {
     // Fall back to the old xattr format
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }

   // The request Id should be stored as an extended attribute
   const auto retrieveRequestIdItor = notification.file().xattr().find("sys.cta.objectstore.id");
   if(notification.file().xattr().end() == retrieveRequestIdItor) {
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.cta.objectstore.id");
   }
   const std::string retrieveRequestId = retrieveRequestIdItor->second;
   request.retrieveRequestId = retrieveRequestId;

   // Start the timer before calling the scheduler, so that the "schedulerTime" logged below
   // covers the call (as it does in the other workflow event handlers)
   cta::utils::Timer t;

   // Queue the request
   m_scheduler.abortRetrieve(m_cliIdentity.username, request, m_lc);

   // Create a log entry
   // NOTE(review): request.diskFileInfo.path is not assigned anywhere in this function, so unless
   // CancelRetrieveRequest or abortRetrieve() fills it in, "diskFilePath" is logged empty. Confirm
   // whether notification.file().lpath() was intended.
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", request.archiveFileID)
         .add("schedulerTime", t.secs())
         .add("retrieveRequestId", request.retrieveRequestId)
         .add("diskFilePath", cta::utils::midEllipsis(request.diskFileInfo.path, 100));
   m_lc.log(cta::log::INFO, "In RequestMessage::processABORT_PREPARE(): canceled retrieve request.");

   // Set response type and remove reference to retrieve request in EOS extended attributes.
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", ""));
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



573
/*!
 * Handle a DELETE workflow event: delete the archive file identified by the archive file ID
 * which is stored in the extended attributes of the disk file.
 *
 * @param[in]     notification    Notification received from EOS
 * @param[out]    response        Response message, set to RSP_SUCCESS once the file is deleted
 *
 * @throws PbException if the archive file ID extended attribute is missing or does not parse to
 *         a non-zero number
 */
void RequestMessage::processDELETE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Check the fields of the protobuf which are needed below
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");

   // Build the delete request from the notification
   cta::common::dataStructures::DeleteArchiveRequest request;
   request.requester.name    = notification.cli().user().username();
   request.requester.group   = notification.cli().user().groupname();

   // The CTA archive ID is held in an EOS extended attribute, so it arrives as a string and has
   // to be converted to a uint64_t. The current xattr name is tried first, then the old one.
   const auto &xattrs = notification.file().xattr();
   auto archiveFileIdIt = xattrs.find("sys.archive.file_id");
   if(archiveFileIdIt == xattrs.end()) {
      archiveFileIdIt = xattrs.find("CTA_ArchiveFileId");
      if(archiveFileIdIt == xattrs.end()) {
         throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
      }
   }
   const std::string archiveFileIdStr = archiveFileIdIt->second;
   request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10);
   if(request.archiveFileID == 0) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }

   // Remove the file from the catalogue, timing the call
   cta::utils::Timer timer;
   m_scheduler.deleteArchive(m_cliIdentity.username, request, m_lc);

   // Log the deletion
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", request.archiveFileID);
   params.add("schedulerTime", timer.secs());
   m_lc.log(cta::log::INFO, "In RequestMessage::processDELETE(): archive file deleted.");

   // Report success
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

614
615


616
617
// Admin commands

618
619
620
621
/*!
 * Log the successful completion of an admin command.
 *
 * The command and subcommand are written as the strings found by reverse lookup in cmdLookup
 * and subcmdLookup.
 *
 * @param[in]    function    Name of the calling method, included in the log message
 * @param[in]    admincmd    The admin command which was executed
 * @param[in]    t           Timer whose elapsed seconds are logged as "adminTime"
 */
void RequestMessage::logAdminCmd(const std::string &function, const cta::admin::AdminCmd &admincmd, cta::utils::Timer &t)
{
   using namespace cta::admin;

   std::string log_msg = "In RequestMessage::" + function + "(): Admin command succeeded: ";

   // Command: use the first entry for this command whose string is longer than 3 characters,
   // i.e. the long form of the name
   for(const auto &entry : cmdLookup) {
      if(admincmd.cmd() == entry.second && entry.first.length() > 3) {
         log_msg += entry.first + ' ';
         break;
      }
   }

   // Subcommand: use the first entry which matches
   for(const auto &entry : subcmdLookup) {
      if(admincmd.subcmd() == entry.second) {
         log_msg += entry.first;
         break;
      }
   }

   // Write the log entry, with the elapsed time attached as a parameter
   cta::log::ScopedParamContainer params(m_lc);
   params.add("adminTime", t.secs());
   m_lc.log(cta::log::INFO, log_msg);
}



647
/*!
 * Admin command "admin add": create an admin user in the catalogue.
 *
 * Uses the USERNAME and COMMENT options, both fetched with getRequired().
 *
 * @param[out]    response    Response message, set to RSP_SUCCESS
 */
void RequestMessage::processAdmin_Add(cta::xrd::Response &response)
{
   using namespace cta::admin;

   const auto &newAdminName = getRequired(OptionString::USERNAME);
   const auto &adminComment = getRequired(OptionString::COMMENT);

   m_catalogue.createAdminUser(m_cliIdentity, newAdminName, adminComment);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



661
// Handle the admin "ch" subcommand.
//
// Reads the required USERNAME and COMMENT options, passes them together with the client
// identity to m_catalogue.modifyAdminUserComment() and marks the response as RSP_SUCCESS.
void RequestMessage::processAdmin_Ch(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // Both options are mandatory; they are fetched in this order before touching the catalogue
   auto &adminName  = getRequired(OptionString::USERNAME);
   auto &newComment = getRequired(OptionString::COMMENT);

   m_catalogue.modifyAdminUserComment(m_cliIdentity, adminName, newComment);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



675
// Handle the admin "rm" subcommand.
//
// Reads the required USERNAME option, passes it to m_catalogue.deleteAdminUser() and marks
// the response as RSP_SUCCESS.
void RequestMessage::processAdmin_Rm(cta::xrd::Response &response)
{
   using namespace cta::admin;

   auto &adminName = getRequired(OptionString::USERNAME);

   m_catalogue.deleteAdminUser(adminName);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



688
// Handle the admin "ls" subcommand.
//
// response  Receives the ADMIN_LS header type and the RSP_SUCCESS response type
// stream    Set to a newly allocated AdminLsStream; the object is handed to the caller
//           through this out-parameter and is not released here
void RequestMessage::processAdmin_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
{
  using namespace cta::admin;

  // The results are returned through an XrdSsi stream object rather than in the response itself
  stream = new AdminLsStream(*this, m_catalogue, m_scheduler);

  // Column headers first, then the overall status
  response.set_show_header(HeaderType::ADMIN_LS);
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



701
// Handle the archivefile "ls" subcommand.
//
// response  Receives the header type (summary or full listing) and the RSP_SUCCESS response type
// stream    Set to a newly allocated ArchiveFileLsStream; the object is handed to the caller
//           through this out-parameter and is not released here
void RequestMessage::processArchiveFile_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
{
  using namespace cta::admin;

  // The results are returned through an XrdSsi stream object rather than in the response itself
  stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler);

  // The SUMMARY flag selects which set of column headers is shown
  if(has_flag(OptionBoolean::SUMMARY)) {
    response.set_show_header(HeaderType::ARCHIVEFILE_LS_SUMMARY);
  } else {
    response.set_show_header(HeaderType::ARCHIVEFILE_LS);
  }

  response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



717
// Handle the archiveroute "add" subcommand.
//
// Reads the required STORAGE_CLASS, COPY_NUMBER, TAPE_POOL and COMMENT options, passes them
// together with the client identity to m_catalogue.createArchiveRoute() and marks the
// response as RSP_SUCCESS.
void RequestMessage::processArchiveRoute_Add(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // All four options are mandatory; they are fetched in this order before touching the catalogue
   auto &storageClass = getRequired(OptionString::STORAGE_CLASS);
   auto &copyNumber   = getRequired(OptionUInt64::COPY_NUMBER);
   auto &tapePoolName = getRequired(OptionString::TAPE_POOL);
   auto &routeComment = getRequired(OptionString::COMMENT);

   m_catalogue.createArchiveRoute(m_cliIdentity, storageClass, copyNumber, tapePoolName, routeComment);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



733
// Handle the archiveroute "ch" subcommand.
//
// The route is identified by the required STORAGE_CLASS and COPY_NUMBER options. COMMENT and
// TAPE_POOL are optional: each one that is present triggers the corresponding catalogue update
// (comment first, then tape pool). The response is marked RSP_SUCCESS even when neither
// optional value was supplied.
void RequestMessage::processArchiveRoute_Ch(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // Key of the route to modify
   auto &storageClass = getRequired(OptionString::STORAGE_CLASS);
   auto &copyNumber   = getRequired(OptionUInt64::COPY_NUMBER);

   // Attributes that may be changed
   auto  newTapePool  = getOptional(OptionString::TAPE_POOL);
   auto  newComment   = getOptional(OptionString::COMMENT);

   if(newComment) {
      m_catalogue.modifyArchiveRouteComment(m_cliIdentity, storageClass, copyNumber, newComment.value());
   }

   if(newTapePool) {
      m_catalogue.modifyArchiveRouteTapePoolName(m_cliIdentity, storageClass, copyNumber, newTapePool.value());
   }

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



754
// Handle the archiveroute "rm" subcommand.
//
// Reads the required STORAGE_CLASS and COPY_NUMBER options, passes them to
// m_catalogue.deleteArchiveRoute() and marks the response as RSP_SUCCESS.
void RequestMessage::processArchiveRoute_Rm(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // Both options are mandatory; together they identify the route
   auto &storageClass = getRequired(OptionString::STORAGE_CLASS);
   auto &copyNumber   = getRequired(OptionUInt64::COPY_NUMBER);

   m_catalogue.deleteArchiveRoute(storageClass, copyNumber);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



768
// Handle the archiveroute "ls" subcommand.
//
// response  Receives the ARCHIVEROUTE_LS header type and the RSP_SUCCESS response type
// stream    Set to a newly allocated ArchiveRouteLsStream; the object is handed to the caller
//           through this out-parameter and is not released here
void RequestMessage::processArchiveRoute_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
{
  using namespace cta::admin;

  // The results are returned through an XrdSsi stream object rather than in the response itself
  stream = new ArchiveRouteLsStream(*this, m_catalogue, m_scheduler);

  // Column headers first, then the overall status
  response.set_show_header(HeaderType::ARCHIVEROUTE_LS);
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



781
// Handle the drive "up" subcommand.
//
// The required DRIVE option is wrapped in '^' ... '$' and passed to setDriveState() with the
// Up state. The text returned by setDriveState() becomes the response message and the
// response is marked RSP_SUCCESS.
void RequestMessage::processDrive_Up(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // Anchor the user-supplied value at both ends before handing it on as a pattern
   const std::string drivePattern = '^' + getRequired(OptionString::DRIVE) + '$';
   const std::string resultText   = setDriveState(drivePattern, Up);

   response.set_message_txt(resultText);
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



793
// Handle the drive "down" subcommand.
//
// The required DRIVE option is wrapped in '^' ... '$' and passed to setDriveState() with the
// Down state. The text returned by setDriveState() becomes the response message and the
// response is marked RSP_SUCCESS.
void RequestMessage::processDrive_Down(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // Anchor the user-supplied value at both ends before handing it on as a pattern
   const std::string drivePattern = '^' + getRequired(OptionString::DRIVE) + '$';
   const std::string resultText   = setDriveState(drivePattern, Down);

   response.set_message_txt(resultText);
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



805
// Handle the drive "ls" subcommand.
//
// response  Receives the DRIVE_LS header type and the RSP_SUCCESS response type
// stream    Set to a newly allocated DriveLsStream, which is additionally given the client
//           identity and the log context; the object is handed to the caller through this
//           out-parameter and is not released here
void RequestMessage::processDrive_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
{
  using namespace cta::admin;

  // The results are returned through an XrdSsi stream object rather than in the response itself
  stream = new DriveLsStream(*this, m_catalogue, m_scheduler, m_cliIdentity, m_lc);

  // Column headers first, then the overall status
  response.set_show_header(HeaderType::DRIVE_LS);
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



818
void RequestMessage::processDrive_Rm(cta::xrd::Response &response)
819
820
821
822
823
824
{
   using namespace cta::admin;

   std::stringstream cmdlineOutput;

   auto regex = getRequired(OptionString::DRIVE);
825
   regex = '^' + regex + '$';
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
   cta::utils::Regex driveNameRegex(regex.c_str());
   auto driveStates = m_scheduler.getDriveStates(m_cliIdentity, m_lc);
   bool drivesFound = false;
   for(auto driveState: driveStates)
   {
      const auto regexResult = driveNameRegex.exec(driveState.driveName);
      if(!regexResult.empty())
      {
         if(driveState.driveStatus == cta::common::dataStructures::DriveStatus::Down     ||
            driveState.driveStatus == cta::common::dataStructures::DriveStatus::Shutdown ||
            driveState.driveStatus == cta::common::dataStructures::DriveStatus::Unknown  ||
            has_flag(OptionBoolean::FORCE))
         {
            m_scheduler.removeDrive(m_cliIdentity, driveState.driveName, m_lc);
            cmdlineOutput << "Drive " << driveState.driveName << " removed"
                          << (has_flag(OptionBoolean::FORCE) ? " (forced)." : ".") << std::endl;            
         } else {
            cmdlineOutput << "Drive " << driveState.driveName << " in state "
                          << cta::common::dataStructures::toString(driveState.driveStatus)
                          << " and force is not set (skipped)." << std::endl;
         }
         drivesFound = true;
      }
   }

   if(!drivesFound) {