XrdSsiCtaRequestMessage.cpp 61.8 KB
Newer Older
1
2
3
/*!
 * @project        The CERN Tape Archive (CTA)
 * @brief          XRootD EOS Notification handler
4
 * @copyright      Copyright 2019 CERN
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include <XrdSsiPbException.hpp>
20
21
using XrdSsiPb::PbException;

22
#include <cmdline/CtaAdminCmdParse.hpp>
23
#include "XrdSsiCtaRequestMessage.hpp"
24
#include "XrdCtaAdminLs.hpp"
25
#include "XrdCtaArchiveFileLs.hpp"
26
#include "XrdCtaArchiveRouteLs.hpp"
27
#include "XrdCtaDriveLs.hpp"
28
#include "XrdCtaFailedRequestLs.hpp"
29
#include "XrdCtaGroupMountRuleLs.hpp"
30
#include "XrdCtaListPendingQueue.hpp"
31
#include "XrdCtaLogicalLibraryLs.hpp"
32
#include "XrdCtaMountPolicyLs.hpp"
33
#include "XrdCtaRepackLs.hpp"
34
#include "XrdCtaRequesterMountRuleLs.hpp"
35
#include "XrdCtaShowQueues.hpp"
36
#include "XrdCtaTapeLs.hpp"
37
#include "XrdCtaTapeFileLs.hpp"
38
#include "XrdCtaStorageClassLs.hpp"
39
#include "XrdCtaTapePoolLs.hpp"
40
#include "XrdCtaDiskSystemLs.hpp"
41

42
43
namespace cta {
namespace xrd {
44

45
46
47
48
49
// Codes to change colours for console output (when sending a response to cta-admin)
const char* const TEXT_RED    = "\x1b[31;1m";
const char* const TEXT_NORMAL = "\x1b[0m\n";


50
51
52
/*
 * Convert AdminCmd <Cmd, SubCmd> pair to an integer so that it can be used in a switch statement
 */
53
constexpr unsigned int cmd_pair(cta::admin::AdminCmd::Cmd cmd, cta::admin::AdminCmd::SubCmd subcmd) {
54
55
56
57
   return (cmd << 16) + subcmd;
}


58
void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Response &response, XrdSsiStream* &stream)
59
60
{
   // Branch on the Request payload type
61

62
63
64
   switch(request.request_case())
   {
      using namespace cta::xrd;
65

66
      case Request::kAdmincmd: {
67
68
         // Validate that the Kerberos user is an authorized CTA Admin user
         if(m_protocol != Protocol::KRB5) {
69
            throw cta::exception::UserError("[ERROR] Admin commands must be authenticated using the Kerberos 5 protocol.");
70
71
72
         }
         m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);

73
74
         cta::utils::Timer t;

75
76
         // Validate the Protocol Buffer and import options into maps
         importOptions(request.admincmd());
77
78
79
80
81

         // Map the <Cmd, SubCmd> to a method
         switch(cmd_pair(request.admincmd().cmd(), request.admincmd().subcmd())) {
            using namespace cta::admin;

82
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_ADD):
83
               processAdmin_Add(response);
84
85
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_CH):
86
               processAdmin_Ch(response);
87
88
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_RM):
89
               processAdmin_Rm(response);
90
91
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_LS):
92
               processAdmin_Ls(response, stream);
93
94
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEFILE, AdminCmd::SUBCMD_LS):
95
               processArchiveFile_Ls(response, stream);
96
97
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_ADD):
98
               processArchiveRoute_Add(response);
99
100
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_CH):
101
               processArchiveRoute_Ch(response);
102
103
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_RM):
104
               processArchiveRoute_Rm(response);
105
106
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_LS):
107
               processArchiveRoute_Ls(response, stream);
108
109
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_UP):
110
               processDrive_Up(response);
111
112
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_DOWN):
113
               processDrive_Down(response);
114
115
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_LS):
116
               processDrive_Ls(response, stream);
117
               break;
118
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_RM):
119
               processDrive_Rm(response);
120
               break;
121
            case cmd_pair(AdminCmd::CMD_FAILEDREQUEST, AdminCmd::SUBCMD_LS):
122
               processFailedRequest_Ls(response, stream);
123
               break;
124
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_ADD):
125
               processGroupMountRule_Add(response);
126
127
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_CH):
128
               processGroupMountRule_Ch(response);
129
130
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_RM):
131
               processGroupMountRule_Rm(response);
132
               break;
133
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_LS):
134
               processGroupMountRule_Ls(response, stream);
135
136
               break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGARCHIVES, AdminCmd::SUBCMD_NONE):
137
               processListPendingArchives(response, stream);
138
139
               break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGRETRIEVES, AdminCmd::SUBCMD_NONE):
140
               processListPendingRetrieves(response, stream);
141
142
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_ADD):
143
               processLogicalLibrary_Add(response);
144
145
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_CH):
146
               processLogicalLibrary_Ch(response);
147
148
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_RM):
149
               processLogicalLibrary_Rm(response);
150
151
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_LS):
152
               processLogicalLibrary_Ls(response, stream);
153
154
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_ADD):
155
               processMountPolicy_Add(response);
156
157
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_CH):
158
               processMountPolicy_Ch(response);
159
160
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_RM):
161
               processMountPolicy_Rm(response);
162
163
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS):
164
               processMountPolicy_Ls(response, stream);
165
166
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD):
167
               processRepack_Add(response);
168
169
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM):
170
               processRepack_Rm(response);
171
172
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS):
173
               processRepack_Ls(response, stream);
174
175
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR):
176
               processRepack_Err(response);
177
178
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_ADD):
179
               processRequesterMountRule_Add(response);
180
181
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_CH):
182
               processRequesterMountRule_Ch(response);
183
184
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_RM):
185
               processRequesterMountRule_Rm(response);
186
187
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_LS):
188
               processRequesterMountRule_Ls(response, stream);
189
190
               break;
            case cmd_pair(AdminCmd::CMD_SHOWQUEUES, AdminCmd::SUBCMD_NONE):
191
               processShowQueues(response, stream);
192
193
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_ADD):
194
               processStorageClass_Add(response);
195
196
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_CH):
197
               processStorageClass_Ch(response);
198
199
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_RM):
200
               processStorageClass_Rm(response);
201
202
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_LS):
203
               processStorageClass_Ls(response, stream);
204
205
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_ADD):
206
               processTape_Add(response);
207
208
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_CH):
209
               processTape_Ch(response);
210
211
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RM):
212
               processTape_Rm(response);
213
214
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RECLAIM):
215
               processTape_Reclaim(response);
216
217
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LS):
218
               processTape_Ls(response, stream);
219
220
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LABEL):
221
               processTape_Label(response);
222
               break;
223
224
225
            case cmd_pair(AdminCmd::CMD_TAPEFILE, AdminCmd::SUBCMD_LS):
               processTapeFile_Ls(response, stream);
               break;
226
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_ADD):
227
               processTapePool_Add(response);
228
229
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_CH):
230
               processTapePool_Ch(response);
231
232
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_RM):
233
               processTapePool_Rm(response);
234
235
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_LS):
236
               processTapePool_Ls(response, stream);
237
               break;
238
239
240
241
242
243
244
245
246
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_LS):
               processDiskSystem_Ls(response, stream);
               break;   
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_ADD):
               processDiskSystem_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_RM):
               processDiskSystem_Rm(response);
               break;  
247
248
249
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_CH):
               processDiskSystem_Ch(response);
               break;  
250
               
251
252
253
254
255
            default:
               throw PbException("Admin command pair <" +
                     AdminCmd_Cmd_Name(request.admincmd().cmd()) + ", " +
                     AdminCmd_SubCmd_Name(request.admincmd().subcmd()) +
                     "> is not implemented.");
256
257
258
259
260
            } // end switch

            // Log the admin command
            logAdminCmd(__FUNCTION__, request.admincmd(), t);
         } // end case Request::kAdmincmd
261
         break;
262

263
      case Request::kNotification:
264
265
266
267
268
269
         // Validate that instance name in SSS key and instance name in Protocol buffer match
         if(m_cliIdentity.username != request.notification().wf().instance().name()) {
            throw PbException("Instance name \"" + request.notification().wf().instance().name() +
                              "\" does not match key identifier \"" + m_cliIdentity.username + "\"");
         }

270
         // Map the Workflow Event to a method
271
272
273
         switch(request.notification().wf().event()) {
            using namespace cta::eos;

274
275
276
            case Workflow::OPENW:
               processOPENW (request.notification(), response);
               break;
277
278
279
            case Workflow::CREATE:
               processCREATE (request.notification(), response);
               break;
280
281
282
283
284
285
            case Workflow::CLOSEW:
               processCLOSEW (request.notification(), response);
               break;
            case Workflow::PREPARE:
               processPREPARE(request.notification(), response);
               break;
286
287
288
            case Workflow::ABORT_PREPARE:
               processABORT_PREPARE(request.notification(), response);
               break;
289
290
291
            case Workflow::DELETE:
               processDELETE (request.notification(), response);
               break;
292

293
294
295
296
297
            default:
               throw PbException("Workflow event " +
                     Workflow_EventType_Name(request.notification().wf().event()) +
                     " is not implemented.");
         }
298
         break;
299

300
301
      case Request::REQUEST_NOT_SET:
         throw PbException("Request message has not been set.");
302
303

      default:
304
305
         throw PbException("Unrecognized Request message. "
                           "Possible Protocol Buffer version mismatch between client and server.");
306
307
308
309
310
   }
}



311
312
// EOS Workflow commands

313
void RequestMessage::processOPENW(const cta::eos::Notification &notification, cta::xrd::Response &response)
314
315
316
317
{
   // Create a log entry

   cta::log::ScopedParamContainer params(m_lc);
318
   m_lc.log(cta::log::INFO, "In RequestMessage::processOPENW(): ignoring OPENW event.");
319
320
321
322
323
324

   // Set response type

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

325
326


327
void RequestMessage::processCREATE(const cta::eos::Notification &notification, cta::xrd::Response &response)
328
{
329
   // Validate received protobuf
330
331
   checkIsNotEmptyString(notification.cli().user().username(),  "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(), "notification.cli.user.groupname");
332

333
   // Unpack message
334
335
336
   cta::common::dataStructures::RequesterIdentity requester;
   requester.name  = notification.cli().user().username();
   requester.group = notification.cli().user().groupname();
337

338
   auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class");
339
   if(notification.file().xattr().end() == storageClassItor) {
340
341
342
343
344
     // Fall back to old xattr format
     storageClassItor = notification.file().xattr().find("CTA_StorageClass");
     if(notification.file().xattr().end() == storageClassItor) {
       throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is not set");
     }
345
346
   }
   const std::string storageClass = storageClassItor->second;
347
   if(storageClass.empty()) {
348
     throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is set to an empty string");
349
   }
350
351
352

   cta::utils::Timer t;

353
   const uint64_t archiveFileId = m_scheduler.checkAndGetNextArchiveFileId(m_cliIdentity.username, storageClass, requester, m_lc);
354

355
356
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
357
358
   params.add("diskFileId", std::to_string(notification.file().fid()))
         .add("diskFilePath", notification.file().lpath())
359
         .add("fileId", archiveFileId)
360
         .add("schedulerTime", t.secs());
361
   m_lc.log(cta::log::INFO, "In RequestMessage::processCREATE(): assigning new archive file ID.");
362

363
   // Set ArchiveFileId in xattrs
364
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.file_id", std::to_string(archiveFileId)));
365
366
   
   // Set the storage class in xattrs
367
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.storage_class", storageClass));
368
369
370
371
372

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

373
374


375
void RequestMessage::processCLOSEW(const cta::eos::Notification &notification, cta::xrd::Response &response)
376
{
377
   // Validate received protobuf
378
379
380
381
382
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.wf().instance().url(),      "notification.wf.instance.url");
   checkIsNotEmptyString(notification.transport().report_url(),   "notification.transport.report_url");
383

384
   // Unpack message
385
   const auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class");
386
   if(notification.file().xattr().end() == storageClassItor) {
387
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.storage_class");
388
389
   }

390
   cta::common::dataStructures::ArchiveRequest request;
391
   checksum::ProtobufToChecksumBlob(notification.file().csb(), request.checksumBlob);
392
393
394
395
396
397
398
399
400
401
402
403
404
405
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.diskFileID             = std::to_string(notification.file().fid());
   request.fileSize               = notification.file().size();
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.srcURL                 = notification.wf().instance().url();
   request.storageClass           = storageClassItor->second;
   request.archiveReportURL       = notification.transport().report_url();
   request.archiveErrorReportURL  = notification.transport().error_report_url();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);
406

407
408
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t
409

410
   const auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
411
   if(notification.file().xattr().end() == archiveFileIdItor) {
412
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
413
414
415
416
417
418
419
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   uint64_t archiveFileId = 0;
   if((archiveFileId = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
420

421
422
   cta::utils::Timer t;

423
424
   // Queue the request
   m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc);
425

426
427
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
428
   params.add("fileId", archiveFileId).add("schedulerTime", t.secs());
429
   m_lc.log(cta::log::INFO, "In RequestMessage::processCLOSEW(): queued file for archive.");
430

431
   // Set response type
432
433
434
435
436
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



437
void RequestMessage::processPREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
438
{
439
   // Validate received protobuf
440
441
442
443
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.transport().dst_url(),      "notification.transport.dst_url");
444

445
   // Unpack message
446
   cta::common::dataStructures::RetrieveRequest request;
447
448
449
450
451
452
453
454
455
456
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.dstURL                 = notification.transport().dst_url();
   request.errorReportURL         = notification.transport().error_report_url();
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);
457

458
459
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
460
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
461
   if(notification.file().xattr().end() == archiveFileIdItor) {
462
     // Fall back to the old xattr format
463
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
464
465
466
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
467
468
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
469
470
471
472
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
473
474
   
   // Activity value is a string. The parameter might be present or not.
475
   if(notification.file().xattr().find("activity") != notification.file().xattr().end()) {
476
     request.activity = notification.file().xattr().at("activity");
477
   }
478

479
480
   cta::utils::Timer t;

481
   // Queue the request
482
   std::string retrieveReqId = m_scheduler.queueRetrieve(m_cliIdentity.username, request, m_lc);
483

484
485
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
486
487
   params.add("fileId", request.archiveFileID).add("schedulerTime", t.secs())
         .add("retrieveReqId", retrieveReqId);
488
489
490
   if(static_cast<bool>(request.activity)) {
     params.add("activity", request.activity.value());
   }
491
   m_lc.log(cta::log::INFO, "In RequestMessage::processPREPARE(): queued file for retrieve.");
492

493
   // Set response type and add retrieve request reference as an extended attribute.
494
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId));
495
496
497
498
499
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



500
501
502
503
504
505
506
void RequestMessage::processABORT_PREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");

   // Unpack message
507
   cta::common::dataStructures::CancelRetrieveRequest request;
508
509
   request.requester.name   = notification.cli().user().username();
   request.requester.group  = notification.cli().user().groupname();
510
511
512

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
513
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
514
   if(notification.file().xattr().end() == archiveFileIdItor) {
515
     // Fall back to the old xattr format
516
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
517
518
519
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
520
521
522
523
524
525
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
526
527
   
   // The request Id should be stored as an extended attribute
528
   const auto retrieveRequestIdItor = notification.file().xattr().find("sys.cta.objectstore.id");
529
   if(notification.file().xattr().end() == retrieveRequestIdItor) {
530
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.cta.objectstore.id");
531
   }
532
533
   const std::string retrieveRequestId = retrieveRequestIdItor->second;
   request.retrieveRequestId = retrieveRequestId;
534
535

   // Queue the request
536
   m_scheduler.abortRetrieve(m_cliIdentity.username, request, m_lc);
537

538
539
   cta::utils::Timer t;

540
541
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
542
543
544
545
546
   params.add("fileId", request.archiveFileID)
         .add("schedulerTime", t.secs())
         .add("retrieveRequestId", request.retrieveRequestId)
         .add("diskFilePath", cta::utils::midEllipsis(request.diskFileInfo.path, 100));
   m_lc.log(cta::log::INFO, "In RequestMessage::processABORT_PREPARE(): canceled retrieve request.");
547

548
   // Set response type and remove reference to retrieve request in EOS extended attributes.
549
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", ""));
550
551
552
553
554
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



555
void RequestMessage::processDELETE(const cta::eos::Notification &notification, cta::xrd::Response &response)
556
{
557
   // Validate received protobuf
558
559
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
560

561
   // Unpack message
562
   cta::common::dataStructures::DeleteArchiveRequest request;
563
564
   request.requester.name    = notification.cli().user().username();
   request.requester.group   = notification.cli().user().groupname();
565
566
567
568

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t

569
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
570
   if(notification.file().xattr().end() == archiveFileIdItor) {
571
     // Fall back to the old xattr format
572
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
573
574
575
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
576
577
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
578
579
580
581
582
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }

583
   // Delete the file from the catalogue
584
   cta::utils::Timer t;
585
   m_scheduler.deleteArchive(m_cliIdentity.username, request, m_lc);
586
587
588

   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
589
   params.add("fileId", request.archiveFileID).add("schedulerTime", t.secs());
590
   m_lc.log(cta::log::INFO, "In RequestMessage::processDELETE(): archive file deleted.");
591
592
593
594
595

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

596
597


598
599
// Admin commands

600
601
602
603
void RequestMessage::logAdminCmd(const std::string &function, const cta::admin::AdminCmd &admincmd, cta::utils::Timer &t)
{
   using namespace cta::admin;

604
   std::string log_msg = "In RequestMessage::" + function + "(): Admin command succeeded: ";
605
606
607

   // Reverse lookup of strings corresponding to <command,subcommand> pair
   for(auto cmd_it = cmdLookup.begin(); cmd_it != cmdLookup.end(); ++cmd_it) {
608
609
      // Return the matching long string (length > 3)
      if(admincmd.cmd() == cmd_it->second && cmd_it->first.length() > 3) {
610
611
612
613
614
615
616
617
618
619
620
621
622
         log_msg += cmd_it->first + ' ';
         break;
      }
   }
   for(auto subcmd_it = subcmdLookup.begin(); subcmd_it != subcmdLookup.end(); ++subcmd_it) {
      if(admincmd.subcmd() == subcmd_it->second) {
         log_msg += subcmd_it->first;
         break;
      }
   }

   // Add the log message
   cta::log::ScopedParamContainer params(m_lc);
623
   params.add("adminTime", t.secs());
624
625
626
627
628
   m_lc.log(cta::log::INFO, log_msg);
}



629
void RequestMessage::processAdmin_Add(cta::xrd::Response &response)
630
631
632
{
   using namespace cta::admin;

633
634
   auto &username = getRequired(OptionString::USERNAME);
   auto &comment  = getRequired(OptionString::COMMENT);
635
636

   m_catalogue.createAdminUser(m_cliIdentity, username, comment);
637
638
639
640
641
642

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



643
void RequestMessage::processAdmin_Ch(cta::xrd::Response &response)
644
645
646
{
   using namespace cta::admin;

647
648
   auto &username = getRequired(OptionString::USERNAME);
   auto &comment  = getRequired(OptionString::COMMENT);
649
650

   m_catalogue.modifyAdminUserComment(m_cliIdentity, username, comment);
651
652
653
654
655
656

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



657
void RequestMessage::processAdmin_Rm(cta::xrd::Response &response)
658
659
660
{
   using namespace cta::admin;

661
   auto &username = getRequired(OptionString::USERNAME);
662
663

   m_catalogue.deleteAdminUser(username);
664
665
666
667
668
669

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



670
void RequestMessage::processAdmin_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
671
{
672
  using namespace cta::admin;
673

674
675
  // Create a XrdSsi stream object to return the results
  stream = new AdminLsStream(*this, m_catalogue, m_scheduler);
676

677
  response.set_show_header(HeaderType::ADMIN_LS);
678
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
679
680
681
682
}



683
void RequestMessage::processArchiveFile_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
684
{
685
  using namespace cta::admin;
686

687
688
  // Create a XrdSsi stream object to return the results
  stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler);
689

690
691
692
  // Set correct column headers
  response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::ARCHIVEFILE_LS_SUMMARY
                                                            : HeaderType::ARCHIVEFILE_LS);
693

694
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
695
696
697
698
}



699
void RequestMessage::processArchiveRoute_Add(cta::xrd::Response &response)
700
701
702
{
   using namespace cta::admin;

703
704
705
706
707
   auto &in       = getRequired(OptionString::INSTANCE);
   auto &scn      = getRequired(OptionString::STORAGE_CLASS);
   auto &cn       = getRequired(OptionUInt64::COPY_NUMBER);
   auto &tapepool = getRequired(OptionString::TAPE_POOL);
   auto &comment  = getRequired(OptionString::COMMENT);
708

709
   m_catalogue.createArchiveRoute(m_cliIdentity, in, scn, cn, tapepool, comment);
710

711
712
713
714
715
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



716
void RequestMessage::processArchiveRoute_Ch(cta::xrd::Response &response)
717
718
719
{
   using namespace cta::admin;

720
721
722
723
724
   auto &in       = getRequired(OptionString::INSTANCE);
   auto &scn      = getRequired(OptionString::STORAGE_CLASS);
   auto &cn       = getRequired(OptionUInt64::COPY_NUMBER);
   auto  tapepool = getOptional(OptionString::TAPE_POOL);
   auto  comment  = getOptional(OptionString::COMMENT);
725

726
727
   if(comment) {
      m_catalogue.modifyArchiveRouteComment(m_cliIdentity, in, scn, cn, comment.value());
728
   }
729
730
   if(tapepool) {
      m_catalogue.modifyArchiveRouteTapePoolName(m_cliIdentity, in, scn, cn, tapepool.value());
731
   }
732
733
734
735
736
737

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



738
void RequestMessage::processArchiveRoute_Rm(cta::xrd::Response &response)
739
740
741
{
   using namespace cta::admin;

742
743
744
   auto &in  = getRequired(OptionString::INSTANCE);
   auto &scn = getRequired(OptionString::STORAGE_CLASS);
   auto &cn  = getRequired(OptionUInt64::COPY_NUMBER);
745
746

   m_catalogue.deleteArchiveRoute(in, scn, cn);
747
748
749
750
751
752

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



753
void RequestMessage::processArchiveRoute_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
754
{
755
  using namespace cta::admin;
756

757
758
  // Create a XrdSsi stream object to return the results
  stream = new ArchiveRouteLsStream(*this, m_catalogue, m_scheduler);
759

760
761
  response.set_show_header(HeaderType::ARCHIVEROUTE_LS);
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
762
763
764
765
}



766
void RequestMessage::processDrive_Up(cta::xrd::Response &response)
767
768
769
{
   using namespace cta::admin;

770
   std::string cmdlineOutput = setDriveState('^' + getRequired(OptionString::DRIVE) + '$', Up);
771

772
   response.set_message_txt(cmdlineOutput);
773
774
775
776
777
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



778
void RequestMessage::processDrive_Down(cta::xrd::Response &response)
779
780
781
{
   using namespace cta::admin;

782
   std::string cmdlineOutput = setDriveState('^' + getRequired(OptionString::DRIVE) + '$', Down);
783

784
   response.set_message_txt(cmdlineOutput);
785
786
787
788
789
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



790
void RequestMessage::processDrive_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
791
{
792
  using namespace cta::admin;
793

794
795
  // Create a XrdSsi stream object to return the results
  stream = new DriveLsStream(*this, m_catalogue, m_scheduler, m_cliIdentity, m_lc);
796

797
798
  response.set_show_header(HeaderType::DRIVE_LS);
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
799
800
801
802
}



803
void RequestMessage::processDrive_Rm(cta::xrd::Response &response)
804
805
806
807
808
809
{
   using namespace cta::admin;

   std::stringstream cmdlineOutput;

   auto regex = getRequired(OptionString::DRIVE);
810
   regex = '^' + regex + '$';
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
   cta::utils::Regex driveNameRegex(regex.c_str());
   auto driveStates = m_scheduler.getDriveStates(m_cliIdentity, m_lc);
   bool drivesFound = false;
   for(auto driveState: driveStates)
   {
      const auto regexResult = driveNameRegex.exec(driveState.driveName);
      if(!regexResult.empty())
      {
         if(driveState.driveStatus == cta::common::dataStructures::DriveStatus::Down     ||
            driveState.driveStatus == cta::common::dataStructures::DriveStatus::Shutdown ||
            driveState.driveStatus == cta::common::dataStructures::DriveStatus::Unknown  ||
            has_flag(OptionBoolean::FORCE))
         {
            m_scheduler.removeDrive(m_cliIdentity, driveState.driveName, m_lc);
            cmdlineOutput << "Drive " << driveState.driveName << " removed"
                          << (has_flag(OptionBoolean::FORCE) ? " (forced)." : ".") << std::endl;            
         } else {
            cmdlineOutput << "Drive " << driveState.driveName << " in state "
                          << cta::common::dataStructures::toString(driveState.driveStatus)
                          << " and force is not set (skipped)." << std::endl;
         }
         drivesFound = true;
      }
   }

   if(!drivesFound) {
837
      cmdlineOutput << "No drives match \"" << regex << "\". No drives were removed." << std::endl;
838
839
840
841
842
843
844
845
   }

   response.set_message_txt(cmdlineOutput.str());
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



846
void RequestMessage::processFailedRequest_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
847
{
848
  using namespace cta::admin;
849

850
  stream = new FailedRequestLsStream(*this, m_catalogue, m_scheduler, m_scheddb, m_lc);
851

852
853