XrdSsiCtaRequestMessage.cpp 64.7 KB
Newer Older
1
2
3
/*!
 * @project        The CERN Tape Archive (CTA)
 * @brief          XRootD EOS Notification handler
4
 * @copyright      Copyright 2019 CERN
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include <XrdSsiPbException.hpp>
20
21
using XrdSsiPb::PbException;

22
#include <cmdline/CtaAdminCmdParse.hpp>
23
#include "XrdSsiCtaRequestMessage.hpp"
24
#include "XrdCtaAdminLs.hpp"
25
#include "XrdCtaArchiveFileLs.hpp"
26
#include "XrdCtaArchiveRouteLs.hpp"
27
#include "XrdCtaDriveLs.hpp"
28
#include "XrdCtaFailedRequestLs.hpp"
29
#include "XrdCtaGroupMountRuleLs.hpp"
30
#include "XrdCtaListPendingQueue.hpp"
31
#include "XrdCtaLogicalLibraryLs.hpp"
32
#include "XrdCtaMountPolicyLs.hpp"
33
#include "XrdCtaRepackLs.hpp"
34
#include "XrdCtaRequesterMountRuleLs.hpp"
35
#include "XrdCtaShowQueues.hpp"
36
#include "XrdCtaTapeLs.hpp"
37
#include "XrdCtaTapeFileLs.hpp"
38
#include "XrdCtaStorageClassLs.hpp"
39
#include "XrdCtaTapePoolLs.hpp"
40
#include "XrdCtaDiskSystemLs.hpp"
41
#include "XrdCtaVirtualOrganizationLs.hpp"
42

43
44
namespace cta {
namespace xrd {
45

46
47
48
49
50
// Codes to change colours for console output (when sending a response to cta-admin)
const char* const TEXT_RED    = "\x1b[31;1m";
const char* const TEXT_NORMAL = "\x1b[0m\n";


51
52
53
/*
 * Convert AdminCmd <Cmd, SubCmd> pair to an integer so that it can be used in a switch statement
 */
54
constexpr unsigned int cmd_pair(cta::admin::AdminCmd::Cmd cmd, cta::admin::AdminCmd::SubCmd subcmd) {
55
56
57
58
   return (cmd << 16) + subcmd;
}


59
void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Response &response, XrdSsiStream* &stream)
60
61
{
   // Branch on the Request payload type
62

63
64
65
   switch(request.request_case())
   {
      using namespace cta::xrd;
66

67
      case Request::kAdmincmd: {
68
69
         // Validate that the Kerberos user is an authorized CTA Admin user
         if(m_protocol != Protocol::KRB5) {
70
            throw cta::exception::UserError("[ERROR] Admin commands must be authenticated using the Kerberos 5 protocol.");
71
72
73
         }
         m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);

74
75
         cta::utils::Timer t;

76
77
         // Validate the Protocol Buffer and import options into maps
         importOptions(request.admincmd());
78
79
80
81
82

         // Map the <Cmd, SubCmd> to a method
         switch(cmd_pair(request.admincmd().cmd(), request.admincmd().subcmd())) {
            using namespace cta::admin;

83
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_ADD):
84
               processAdmin_Add(response);
85
86
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_CH):
87
               processAdmin_Ch(response);
88
89
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_RM):
90
               processAdmin_Rm(response);
91
92
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_LS):
93
               processAdmin_Ls(response, stream);
94
95
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEFILE, AdminCmd::SUBCMD_LS):
96
               processArchiveFile_Ls(response, stream);
97
98
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_ADD):
99
               processArchiveRoute_Add(response);
100
101
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_CH):
102
               processArchiveRoute_Ch(response);
103
104
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_RM):
105
               processArchiveRoute_Rm(response);
106
107
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_LS):
108
               processArchiveRoute_Ls(response, stream);
109
110
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_UP):
111
               processDrive_Up(response);
112
113
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_DOWN):
114
               processDrive_Down(response);
115
116
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_LS):
117
               processDrive_Ls(response, stream);
118
               break;
119
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_RM):
120
               processDrive_Rm(response);
121
               break;
122
            case cmd_pair(AdminCmd::CMD_FAILEDREQUEST, AdminCmd::SUBCMD_LS):
123
               processFailedRequest_Ls(response, stream);
124
               break;
125
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_ADD):
126
               processGroupMountRule_Add(response);
127
128
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_CH):
129
               processGroupMountRule_Ch(response);
130
131
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_RM):
132
               processGroupMountRule_Rm(response);
133
               break;
134
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_LS):
135
               processGroupMountRule_Ls(response, stream);
136
137
               break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGARCHIVES, AdminCmd::SUBCMD_NONE):
138
               processListPendingArchives(response, stream);
139
140
               break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGRETRIEVES, AdminCmd::SUBCMD_NONE):
141
               processListPendingRetrieves(response, stream);
142
143
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_ADD):
144
               processLogicalLibrary_Add(response);
145
146
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_CH):
147
               processLogicalLibrary_Ch(response);
148
149
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_RM):
150
               processLogicalLibrary_Rm(response);
151
152
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_LS):
153
               processLogicalLibrary_Ls(response, stream);
154
155
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_ADD):
156
               processMountPolicy_Add(response);
157
158
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_CH):
159
               processMountPolicy_Ch(response);
160
161
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_RM):
162
               processMountPolicy_Rm(response);
163
164
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS):
165
               processMountPolicy_Ls(response, stream);
166
167
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD):
168
               processRepack_Add(response);
169
170
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM):
171
               processRepack_Rm(response);
172
173
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS):
174
               processRepack_Ls(response, stream);
175
176
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR):
177
               processRepack_Err(response);
178
179
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_ADD):
180
               processRequesterMountRule_Add(response);
181
182
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_CH):
183
               processRequesterMountRule_Ch(response);
184
185
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_RM):
186
               processRequesterMountRule_Rm(response);
187
188
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_LS):
189
               processRequesterMountRule_Ls(response, stream);
190
191
               break;
            case cmd_pair(AdminCmd::CMD_SHOWQUEUES, AdminCmd::SUBCMD_NONE):
192
               processShowQueues(response, stream);
193
194
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_ADD):
195
               processStorageClass_Add(response);
196
197
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_CH):
198
               processStorageClass_Ch(response);
199
200
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_RM):
201
               processStorageClass_Rm(response);
202
203
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_LS):
204
               processStorageClass_Ls(response, stream);
205
206
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_ADD):
207
               processTape_Add(response);
208
209
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_CH):
210
               processTape_Ch(response);
211
212
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RM):
213
               processTape_Rm(response);
214
215
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RECLAIM):
216
               processTape_Reclaim(response);
217
218
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LS):
219
               processTape_Ls(response, stream);
220
221
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LABEL):
222
               processTape_Label(response);
223
               break;
224
225
226
            case cmd_pair(AdminCmd::CMD_TAPEFILE, AdminCmd::SUBCMD_LS):
               processTapeFile_Ls(response, stream);
               break;
227
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_ADD):
228
               processTapePool_Add(response);
229
230
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_CH):
231
               processTapePool_Ch(response);
232
233
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_RM):
234
               processTapePool_Rm(response);
235
236
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_LS):
237
               processTapePool_Ls(response, stream);
238
               break;
239
240
241
242
243
244
245
246
247
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_LS):
               processDiskSystem_Ls(response, stream);
               break;   
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_ADD):
               processDiskSystem_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_RM):
               processDiskSystem_Rm(response);
               break;  
248
249
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_CH):
               processDiskSystem_Ch(response);
250
251
252
253
254
255
256
257
258
259
260
261
262
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_ADD):
               processVirtualOrganization_Add(response);
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_CH):
               processVirtualOrganization_Ch(response);
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_RM):
               processVirtualOrganization_Rm(response);
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION,AdminCmd::SUBCMD_LS):
               processVirtualOrganization_Ls(response, stream);
               break;
263
264
265
266
267
            default:
               throw PbException("Admin command pair <" +
                     AdminCmd_Cmd_Name(request.admincmd().cmd()) + ", " +
                     AdminCmd_SubCmd_Name(request.admincmd().subcmd()) +
                     "> is not implemented.");
268
269
270
271
272
            } // end switch

            // Log the admin command
            logAdminCmd(__FUNCTION__, request.admincmd(), t);
         } // end case Request::kAdmincmd
273
         break;
274

275
      case Request::kNotification:
276
277
278
279
280
281
         // Validate that instance name in SSS key and instance name in Protocol buffer match
         if(m_cliIdentity.username != request.notification().wf().instance().name()) {
            throw PbException("Instance name \"" + request.notification().wf().instance().name() +
                              "\" does not match key identifier \"" + m_cliIdentity.username + "\"");
         }

282
         // Map the Workflow Event to a method
283
284
285
         switch(request.notification().wf().event()) {
            using namespace cta::eos;

286
287
288
            case Workflow::OPENW:
               processOPENW (request.notification(), response);
               break;
289
290
291
            case Workflow::CREATE:
               processCREATE (request.notification(), response);
               break;
292
293
294
295
296
297
            case Workflow::CLOSEW:
               processCLOSEW (request.notification(), response);
               break;
            case Workflow::PREPARE:
               processPREPARE(request.notification(), response);
               break;
298
299
300
            case Workflow::ABORT_PREPARE:
               processABORT_PREPARE(request.notification(), response);
               break;
301
302
303
            case Workflow::DELETE:
               processDELETE (request.notification(), response);
               break;
304

305
306
307
308
309
            default:
               throw PbException("Workflow event " +
                     Workflow_EventType_Name(request.notification().wf().event()) +
                     " is not implemented.");
         }
310
         break;
311

312
313
      case Request::REQUEST_NOT_SET:
         throw PbException("Request message has not been set.");
314
315

      default:
316
317
         throw PbException("Unrecognized Request message. "
                           "Possible Protocol Buffer version mismatch between client and server.");
318
319
320
321
322
   }
}



323
324
// EOS Workflow commands

325
void RequestMessage::processOPENW(const cta::eos::Notification &notification, cta::xrd::Response &response)
326
327
328
329
{
   // Create a log entry

   cta::log::ScopedParamContainer params(m_lc);
330
   m_lc.log(cta::log::INFO, "In RequestMessage::processOPENW(): ignoring OPENW event.");
331
332
333
334
335
336

   // Set response type

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

337
338


339
void RequestMessage::processCREATE(const cta::eos::Notification &notification, cta::xrd::Response &response)
340
{
341
   // Validate received protobuf
342
343
   checkIsNotEmptyString(notification.cli().user().username(),  "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(), "notification.cli.user.groupname");
344

345
   // Unpack message
346
347
348
   cta::common::dataStructures::RequesterIdentity requester;
   requester.name  = notification.cli().user().username();
   requester.group = notification.cli().user().groupname();
349

350
   auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class");
351
   if(notification.file().xattr().end() == storageClassItor) {
352
353
354
355
356
     // Fall back to old xattr format
     storageClassItor = notification.file().xattr().find("CTA_StorageClass");
     if(notification.file().xattr().end() == storageClassItor) {
       throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is not set");
     }
357
358
   }
   const std::string storageClass = storageClassItor->second;
359
   if(storageClass.empty()) {
360
     throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is set to an empty string");
361
   }
362
363

   cta::utils::Timer t;
364
   uint64_t archiveFileId;
365

366
367
368
369
370
371
   // For testing, this storage class will always fail on CLOSEW. Allow it to pass CREATE and don't allocate an archive Id from the pool.
   if(storageClassItor->second == "fail_on_closew_test") {
     archiveFileId = std::numeric_limits<uint64_t>::max();
   } else {
     archiveFileId = m_scheduler.checkAndGetNextArchiveFileId(m_cliIdentity.username, storageClass, requester, m_lc);
   }
372

373
374
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
375
376
   params.add("diskFileId", std::to_string(notification.file().fid()))
         .add("diskFilePath", notification.file().lpath())
377
         .add("fileId", archiveFileId)
378
         .add("schedulerTime", t.secs());
379
   m_lc.log(cta::log::INFO, "In RequestMessage::processCREATE(): assigning new archive file ID.");
380

381
   // Set ArchiveFileId in xattrs
382
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.file_id", std::to_string(archiveFileId)));
383
384
   
   // Set the storage class in xattrs
385
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.storage_class", storageClass));
386
387
388
389
390

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

391
392


393
void RequestMessage::processCLOSEW(const cta::eos::Notification &notification, cta::xrd::Response &response)
394
{
395
   // Validate received protobuf
396
397
398
399
400
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.wf().instance().url(),      "notification.wf.instance.url");
   checkIsNotEmptyString(notification.transport().report_url(),   "notification.transport.report_url");
401

402
   // Unpack message
403
   const auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class");
404
   if(notification.file().xattr().end() == storageClassItor) {
405
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.storage_class");
406
407
   }

408
   // For testing: this storage class will always fail
409
410
   if(storageClassItor->second == "fail_on_closew_test") {
      throw PbException("File is in fail_on_closew_test storage class, which always fails.");
411
412
   }

413
414
415
416
417
418
   // Disallow archival of files above the specified limit
   if(notification.file().size() > m_archiveFileMaxSize) {
      throw exception::UserError("Archive request rejected: file size (" + std::to_string(notification.file().size()) +
                                 " bytes) exceeds maximum allowed size (" + std::to_string(m_archiveFileMaxSize) + " bytes)");
   }

419
   cta::common::dataStructures::ArchiveRequest request;
420
   checksum::ProtobufToChecksumBlob(notification.file().csb(), request.checksumBlob);
421
422
423
424
425
426
427
428
429
430
431
432
433
434
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.diskFileID             = std::to_string(notification.file().fid());
   request.fileSize               = notification.file().size();
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.srcURL                 = notification.wf().instance().url();
   request.storageClass           = storageClassItor->second;
   request.archiveReportURL       = notification.transport().report_url();
   request.archiveErrorReportURL  = notification.transport().error_report_url();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);
435

436
437
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t
438
   const auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
439
   if(notification.file().xattr().end() == archiveFileIdItor) {
440
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
441
442
443
444
445
446
447
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   uint64_t archiveFileId = 0;
   if((archiveFileId = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
448

449
450
   cta::utils::Timer t;

451
452
   // Queue the request
   m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc);
453

454
455
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
456
457
458
   params.add("fileId", archiveFileId);
   params.add("schedulerTime", t.secs());
   params.add("requesterInstance", notification.wf().requester_instance());
459
   m_lc.log(cta::log::INFO, "In RequestMessage::processCLOSEW(): queued file for archive.");
460

461
   // Set response type
462
463
464
465
466
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



467
void RequestMessage::processPREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
468
{
469
   // Validate received protobuf
470
471
472
473
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.transport().dst_url(),      "notification.transport.dst_url");
474

475
   // Unpack message
476
   cta::common::dataStructures::RetrieveRequest request;
477
478
479
480
481
482
483
484
485
486
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.dstURL                 = notification.transport().dst_url();
   request.errorReportURL         = notification.transport().error_report_url();
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);
487

488
489
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
490
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
491
   if(notification.file().xattr().end() == archiveFileIdItor) {
492
     // Fall back to the old xattr format
493
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
494
495
496
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
497
498
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
499
500
501
502
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
503
504
   
   // Activity value is a string. The parameter might be present or not.
505
   if(notification.file().xattr().find("activity") != notification.file().xattr().end()) {
506
     request.activity = notification.file().xattr().at("activity");
507
   }
508

509
510
   cta::utils::Timer t;

511
   // Queue the request
512
   std::string retrieveReqId = m_scheduler.queueRetrieve(m_cliIdentity.username, request, m_lc);
513

514
515
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
516
517
   params.add("fileId", request.archiveFileID).add("schedulerTime", t.secs())
         .add("retrieveReqId", retrieveReqId);
518
519
520
   if(static_cast<bool>(request.activity)) {
     params.add("activity", request.activity.value());
   }
521
   m_lc.log(cta::log::INFO, "In RequestMessage::processPREPARE(): queued file for retrieve.");
522

523
   // Set response type and add retrieve request reference as an extended attribute.
524
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId));
525
526
527
528
529
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



530
531
532
533
534
535
536
void RequestMessage::processABORT_PREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");

   // Unpack message
537
   cta::common::dataStructures::CancelRetrieveRequest request;
538
539
   request.requester.name   = notification.cli().user().username();
   request.requester.group  = notification.cli().user().groupname();
540
541
542

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
543
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
544
   if(notification.file().xattr().end() == archiveFileIdItor) {
545
     // Fall back to the old xattr format
546
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
547
548
549
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
550
551
552
553
554
555
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
556
557
   
   // The request Id should be stored as an extended attribute
558
   const auto retrieveRequestIdItor = notification.file().xattr().find("sys.cta.objectstore.id");
559
   if(notification.file().xattr().end() == retrieveRequestIdItor) {
560
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.cta.objectstore.id");
561
   }
562
563
   const std::string retrieveRequestId = retrieveRequestIdItor->second;
   request.retrieveRequestId = retrieveRequestId;
564
565

   // Queue the request
566
   m_scheduler.abortRetrieve(m_cliIdentity.username, request, m_lc);
567

568
569
   cta::utils::Timer t;

570
571
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
572
573
574
575
576
   params.add("fileId", request.archiveFileID)
         .add("schedulerTime", t.secs())
         .add("retrieveRequestId", request.retrieveRequestId)
         .add("diskFilePath", cta::utils::midEllipsis(request.diskFileInfo.path, 100));
   m_lc.log(cta::log::INFO, "In RequestMessage::processABORT_PREPARE(): canceled retrieve request.");
577

578
   // Set response type and remove reference to retrieve request in EOS extended attributes.
579
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", ""));
580
581
582
583
584
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



585
void RequestMessage::processDELETE(const cta::eos::Notification &notification, cta::xrd::Response &response)
586
{
587
   // Validate received protobuf
588
589
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
590

591
   // Unpack message
592
   cta::common::dataStructures::DeleteArchiveRequest request;
593
594
   request.requester.name    = notification.cli().user().username();
   request.requester.group   = notification.cli().user().groupname();
595
596
597
598

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t

599
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
600
   if(notification.file().xattr().end() == archiveFileIdItor) {
601
     // Fall back to the old xattr format
602
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
603
604
605
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
606
607
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
608
609
610
611
612
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }

613
   // Delete the file from the catalogue
614
   cta::utils::Timer t;
615
   m_scheduler.deleteArchive(m_cliIdentity.username, request, m_lc);
616
617
618

   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
619
   params.add("fileId", request.archiveFileID).add("schedulerTime", t.secs());
620
   m_lc.log(cta::log::INFO, "In RequestMessage::processDELETE(): archive file deleted.");
621
622
623
624
625

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

626
627


628
629
// Admin commands

630
631
632
633
void RequestMessage::logAdminCmd(const std::string &function, const cta::admin::AdminCmd &admincmd, cta::utils::Timer &t)
{
   using namespace cta::admin;

634
   std::string log_msg = "In RequestMessage::" + function + "(): Admin command succeeded: ";
635
636
637

   // Reverse lookup of strings corresponding to <command,subcommand> pair
   for(auto cmd_it = cmdLookup.begin(); cmd_it != cmdLookup.end(); ++cmd_it) {
638
639
      // Return the matching long string (length > 3)
      if(admincmd.cmd() == cmd_it->second && cmd_it->first.length() > 3) {
640
641
642
643
644
645
646
647
648
649
650
651
652
         log_msg += cmd_it->first + ' ';
         break;
      }
   }
   for(auto subcmd_it = subcmdLookup.begin(); subcmd_it != subcmdLookup.end(); ++subcmd_it) {
      if(admincmd.subcmd() == subcmd_it->second) {
         log_msg += subcmd_it->first;
         break;
      }
   }

   // Add the log message
   cta::log::ScopedParamContainer params(m_lc);
653
   params.add("adminTime", t.secs());
654
655
656
657
658
   m_lc.log(cta::log::INFO, log_msg);
}



659
void RequestMessage::processAdmin_Add(cta::xrd::Response &response)
660
661
662
{
   using namespace cta::admin;

663
664
   auto &username = getRequired(OptionString::USERNAME);
   auto &comment  = getRequired(OptionString::COMMENT);
665
666

   m_catalogue.createAdminUser(m_cliIdentity, username, comment);
667
668
669
670
671
672

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



673
void RequestMessage::processAdmin_Ch(cta::xrd::Response &response)
674
675
676
{
   using namespace cta::admin;

677
678
   auto &username = getRequired(OptionString::USERNAME);
   auto &comment  = getRequired(OptionString::COMMENT);
679
680

   m_catalogue.modifyAdminUserComment(m_cliIdentity, username, comment);
681
682
683
684
685
686

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



687
void RequestMessage::processAdmin_Rm(cta::xrd::Response &response)
688
689
690
{
   using namespace cta::admin;

691
   auto &username = getRequired(OptionString::USERNAME);
692
693

   m_catalogue.deleteAdminUser(username);
694
695
696
697
698
699

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



700
void RequestMessage::processAdmin_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
701
{
702
  using namespace cta::admin;
703

704
705
  // Create a XrdSsi stream object to return the results
  stream = new AdminLsStream(*this, m_catalogue, m_scheduler);
706

707
  response.set_show_header(HeaderType::ADMIN_LS);
708
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
709
710
711
712
}



713
void RequestMessage::processArchiveFile_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
714
{
715
  using namespace cta::admin;
716

717
718
  // Create a XrdSsi stream object to return the results
  stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler);
719

720
721
722
  // Set correct column headers
  response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::ARCHIVEFILE_LS_SUMMARY
                                                            : HeaderType::ARCHIVEFILE_LS);
723

724
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
725
726
727
728
}



729
void RequestMessage::processArchiveRoute_Add(cta::xrd::Response &response)
730
731
732
{
   using namespace cta::admin;

733
734
735
736
   auto &scn      = getRequired(OptionString::STORAGE_CLASS);
   auto &cn       = getRequired(OptionUInt64::COPY_NUMBER);
   auto &tapepool = getRequired(OptionString::TAPE_POOL);
   auto &comment  = getRequired(OptionString::COMMENT);
737

738
   m_catalogue.createArchiveRoute(m_cliIdentity, scn, cn, tapepool, comment);
739

740
741
742
743
744
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



745
void RequestMessage::processArchiveRoute_Ch(cta::xrd::Response &response)
746
747
748
{
   using namespace cta::admin;

749
750
751
752
   auto &scn      = getRequired(OptionString::STORAGE_CLASS);
   auto &cn       = getRequired(OptionUInt64::COPY_NUMBER);
   auto  tapepool = getOptional(OptionString::TAPE_POOL);
   auto  comment  = getOptional(OptionString::COMMENT);
753

754
   if(comment) {
755
      m_catalogue.modifyArchiveRouteComment(m_cliIdentity, scn, cn, comment.value());
756
   }
757
   if(tapepool) {
758
      m_catalogue.modifyArchiveRouteTapePoolName(m_cliIdentity, scn, cn, tapepool.value());
759
   }
760
761
762
763
764
765

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



766
void RequestMessage::processArchiveRoute_Rm(cta::xrd::Response &response)
767
768
769
{
   using namespace cta::admin;

770
771
   auto &scn = getRequired(OptionString::STORAGE_CLASS);
   auto &cn  = getRequired(OptionUInt64::COPY_NUMBER);
772

773
   m_catalogue.deleteArchiveRoute(scn, cn);
774
775
776
777
778
779

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



780
void RequestMessage::processArchiveRoute_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
781
{
782
  using namespace cta::admin;
783

784
785
  // Create a XrdSsi stream object to return the results
  stream = new ArchiveRouteLsStream(*this, m_catalogue, m_scheduler);
786

787
788
  response.set_show_header(HeaderType::ARCHIVEROUTE_LS);
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
789
790
791
792
}



793
void RequestMessage::processDrive_Up(cta::xrd::Response &response)
794
795
796
{
   using namespace cta::admin;

797
   std::string cmdlineOutput = setDriveState('^' + getRequired(OptionString::DRIVE) + '$', Up);
798

799
   response.set_message_txt(cmdlineOutput);
800
801
802
803
804
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



805
void RequestMessage::processDrive_Down(cta::xrd::Response &response)
806
807
808
{
   using namespace cta::admin;

809
   std::string cmdlineOutput = setDriveState('^' + getRequired(OptionString::DRIVE) + '$', Down);
810

811
   response.set_message_txt(cmdlineOutput);
812
813
814
815
816
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



817
void RequestMessage::processDrive_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
818
{
819
  using namespace cta::admin;
820

821
822
  // Create a XrdSsi stream object to return the results
  stream = new DriveLsStream(*this, m_catalogue, m_scheduler, m_cliIdentity, m_lc);
823

824
825
  response.set_show_header(HeaderType::DRIVE_LS);
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
826
827
828
829
}



830
void RequestMessage::processDrive_Rm(cta::xrd::Response &response)
831
832
833
834
835
836
{
   using namespace cta::admin;

   std::stringstream cmdlineOutput;

   auto regex = getRequired(OptionString::DRIVE);
837
   regex = '^' + regex + '$';
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
   cta::utils::Regex driveNameRegex(regex.c_str());
   auto driveStates = m_scheduler.getDriveStates(m_cliIdentity, m_lc);
   bool drivesFound = false;
   for(auto driveState: driveStates)
   {
      const auto regexResult = driveNameRegex.exec(driveState.driveName);
      if(!regexResult.empty())
      {
         if(driveState<