XrdSsiCtaRequestMessage.cpp 67.7 KB
Newer Older
1
2
3
/*!
 * @project        The CERN Tape Archive (CTA)
 * @brief          XRootD EOS Notification handler
4
 * @copyright      Copyright 2019 CERN
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include <XrdSsiPbException.hpp>
20
21
using XrdSsiPb::PbException;

22
#include <cmdline/CtaAdminCmdParse.hpp>
23
#include "XrdSsiCtaRequestMessage.hpp"
24
#include "XrdCtaAdminLs.hpp"
25
#include "XrdCtaArchiveFileLs.hpp"
26
#include "XrdCtaArchiveRouteLs.hpp"
27
#include "XrdCtaDriveLs.hpp"
28
#include "XrdCtaFailedRequestLs.hpp"
29
#include "XrdCtaGroupMountRuleLs.hpp"
30
#include "XrdCtaListPendingQueue.hpp"
31
#include "XrdCtaLogicalLibraryLs.hpp"
32
#include "XrdCtaMountPolicyLs.hpp"
33
#include "XrdCtaRepackLs.hpp"
34
#include "XrdCtaRequesterMountRuleLs.hpp"
35
#include "XrdCtaShowQueues.hpp"
36
#include "XrdCtaTapeLs.hpp"
37
#include "XrdCtaTapeFileLs.hpp"
38
#include "XrdCtaStorageClassLs.hpp"
39
#include "XrdCtaTapePoolLs.hpp"
40
#include "XrdCtaDiskSystemLs.hpp"
41
#include "XrdCtaVirtualOrganizationLs.hpp"
42
#include "XrdCtaVersion.hpp"
43

44
45
namespace cta {
namespace xrd {
46

47
48
49
50
51
// Codes to change colours for console output (when sending a response to cta-admin)
const char* const TEXT_RED    = "\x1b[31;1m";
const char* const TEXT_NORMAL = "\x1b[0m\n";


52
53
54
/*
 * Convert AdminCmd <Cmd, SubCmd> pair to an integer so that it can be used in a switch statement
 */
55
constexpr unsigned int cmd_pair(cta::admin::AdminCmd::Cmd cmd, cta::admin::AdminCmd::SubCmd subcmd) {
56
57
58
59
   return (cmd << 16) + subcmd;
}


60
void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Response &response, XrdSsiStream* &stream)
61
62
{
   // Branch on the Request payload type
63

64
65
66
   switch(request.request_case())
   {
      using namespace cta::xrd;
67

68
      case Request::kAdmincmd: {
69
        
70
71
         // Validate that the Kerberos user is an authorized CTA Admin user
         if(m_protocol != Protocol::KRB5) {
72
            throw cta::exception::UserError("[ERROR] Admin commands must be authenticated using the Kerberos 5 protocol.");
73
74
         }
         m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);
75
         
76
77
         cta::utils::Timer t;

78
79
         // Validate the Protocol Buffer and import options into maps
         importOptions(request.admincmd());
80
81
82
83
         
         m_client_versions.ctaVersion = request.client_cta_version();
         m_client_versions.xrootdSsiProtoIntVersion = request.client_xrootd_ssi_protobuf_interface_version();
         
84
85
86
87
         // Map the <Cmd, SubCmd> to a method
         switch(cmd_pair(request.admincmd().cmd(), request.admincmd().subcmd())) {
            using namespace cta::admin;

88
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_ADD):
89
               processAdmin_Add(response);
90
91
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_CH):
92
               processAdmin_Ch(response);
93
94
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_RM):
95
               processAdmin_Rm(response);
96
97
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_LS):
98
               processAdmin_Ls(response, stream);
99
100
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEFILE, AdminCmd::SUBCMD_LS):
101
               processArchiveFile_Ls(response, stream);
102
103
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_ADD):
104
               processArchiveRoute_Add(response);
105
106
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_CH):
107
               processArchiveRoute_Ch(response);
108
109
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_RM):
110
               processArchiveRoute_Rm(response);
111
112
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_LS):
113
               processArchiveRoute_Ls(response, stream);
114
115
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_UP):
116
               processDrive_Up(response);
117
118
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_DOWN):
119
               processDrive_Down(response);
120
               break;
121
122
123
           case cmd_pair(AdminCmd::CMD_DRIVE,AdminCmd::SUBCMD_CH):
             processDrive_Ch(response);
             break;
124
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_LS):
125
               processDrive_Ls(response, stream);
126
               break;
127
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_RM):
128
               processDrive_Rm(response);
129
               break;
130
            case cmd_pair(AdminCmd::CMD_FAILEDREQUEST, AdminCmd::SUBCMD_LS):
131
               processFailedRequest_Ls(response, stream);
132
               break;
133
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_ADD):
134
               processGroupMountRule_Add(response);
135
136
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_CH):
137
               processGroupMountRule_Ch(response);
138
139
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_RM):
140
               processGroupMountRule_Rm(response);
141
               break;
142
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_LS):
143
               processGroupMountRule_Ls(response, stream);
144
145
               break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGARCHIVES, AdminCmd::SUBCMD_NONE):
146
               processListPendingArchives(response, stream);
147
148
               break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGRETRIEVES, AdminCmd::SUBCMD_NONE):
149
               processListPendingRetrieves(response, stream);
150
151
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_ADD):
152
               processLogicalLibrary_Add(response);
153
154
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_CH):
155
               processLogicalLibrary_Ch(response);
156
157
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_RM):
158
               processLogicalLibrary_Rm(response);
159
160
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_LS):
161
               processLogicalLibrary_Ls(response, stream);
162
163
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_ADD):
164
               processMountPolicy_Add(response);
165
166
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_CH):
167
               processMountPolicy_Ch(response);
168
169
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_RM):
170
               processMountPolicy_Rm(response);
171
172
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS):
173
               processMountPolicy_Ls(response, stream);
174
175
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD):
176
               processRepack_Add(response);
177
178
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM):
179
               processRepack_Rm(response);
180
181
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS):
182
               processRepack_Ls(response, stream);
183
184
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR):
185
               processRepack_Err(response);
186
187
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_ADD):
188
               processRequesterMountRule_Add(response);
189
190
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_CH):
191
               processRequesterMountRule_Ch(response);
192
193
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_RM):
194
               processRequesterMountRule_Rm(response);
195
196
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_LS):
197
               processRequesterMountRule_Ls(response, stream);
198
199
               break;
            case cmd_pair(AdminCmd::CMD_SHOWQUEUES, AdminCmd::SUBCMD_NONE):
200
               processShowQueues(response, stream);
201
202
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_ADD):
203
               processStorageClass_Add(response);
204
205
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_CH):
206
               processStorageClass_Ch(response);
207
208
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_RM):
209
               processStorageClass_Rm(response);
210
211
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_LS):
212
               processStorageClass_Ls(response, stream);
213
214
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_ADD):
215
               processTape_Add(response);
216
217
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_CH):
218
               processTape_Ch(response);
219
220
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RM):
221
               processTape_Rm(response);
222
223
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RECLAIM):
224
               processTape_Reclaim(response);
225
226
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LS):
227
               processTape_Ls(response, stream);
228
229
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LABEL):
230
               processTape_Label(response);
231
               break;
232
233
234
            case cmd_pair(AdminCmd::CMD_TAPEFILE, AdminCmd::SUBCMD_LS):
               processTapeFile_Ls(response, stream);
               break;
235
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_ADD):
236
               processTapePool_Add(response);
237
238
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_CH):
239
               processTapePool_Ch(response);
240
241
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_RM):
242
               processTapePool_Rm(response);
243
244
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_LS):
245
               processTapePool_Ls(response, stream);
246
               break;
247
248
249
250
251
252
253
254
255
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_LS):
               processDiskSystem_Ls(response, stream);
               break;   
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_ADD):
               processDiskSystem_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_RM):
               processDiskSystem_Rm(response);
               break;  
256
257
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_CH):
               processDiskSystem_Ch(response);
258
259
260
261
262
263
264
265
266
267
268
269
270
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_ADD):
               processVirtualOrganization_Add(response);
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_CH):
               processVirtualOrganization_Ch(response);
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_RM):
               processVirtualOrganization_Rm(response);
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION,AdminCmd::SUBCMD_LS):
               processVirtualOrganization_Ls(response, stream);
               break;
271
272
273
           case cmd_pair(AdminCmd::CMD_VERSION, AdminCmd::SUBCMD_NONE):
             processVersion(response, stream);
             break;
274
275
276
277
278
            default:
               throw PbException("Admin command pair <" +
                     AdminCmd_Cmd_Name(request.admincmd().cmd()) + ", " +
                     AdminCmd_SubCmd_Name(request.admincmd().subcmd()) +
                     "> is not implemented.");
279
280
281
282
283
            } // end switch

            // Log the admin command
            logAdminCmd(__FUNCTION__, request.admincmd(), t);
         } // end case Request::kAdmincmd
284
         break;
285

286
      case Request::kNotification:
287
288
289
290
291
292
         // Validate that instance name in SSS key and instance name in Protocol buffer match
         if(m_cliIdentity.username != request.notification().wf().instance().name()) {
            throw PbException("Instance name \"" + request.notification().wf().instance().name() +
                              "\" does not match key identifier \"" + m_cliIdentity.username + "\"");
         }

293
         // Map the Workflow Event to a method
294
295
296
         switch(request.notification().wf().event()) {
            using namespace cta::eos;

297
298
299
            case Workflow::OPENW:
               processOPENW (request.notification(), response);
               break;
300
301
302
            case Workflow::CREATE:
               processCREATE (request.notification(), response);
               break;
303
304
305
306
307
308
            case Workflow::CLOSEW:
               processCLOSEW (request.notification(), response);
               break;
            case Workflow::PREPARE:
               processPREPARE(request.notification(), response);
               break;
309
310
311
            case Workflow::ABORT_PREPARE:
               processABORT_PREPARE(request.notification(), response);
               break;
312
313
314
            case Workflow::DELETE:
               processDELETE (request.notification(), response);
               break;
315

316
317
318
319
320
            default:
               throw PbException("Workflow event " +
                     Workflow_EventType_Name(request.notification().wf().event()) +
                     " is not implemented.");
         }
321
         break;
322

323
324
      case Request::REQUEST_NOT_SET:
         throw PbException("Request message has not been set.");
325
326

      default:
327
328
         throw PbException("Unrecognized Request message. "
                           "Possible Protocol Buffer version mismatch between client and server.");
329
330
331
332
333
   }
}



334
335
// EOS Workflow commands

336
void RequestMessage::processOPENW(const cta::eos::Notification &notification, cta::xrd::Response &response)
337
338
339
340
{
   // Create a log entry

   cta::log::ScopedParamContainer params(m_lc);
341
   m_lc.log(cta::log::INFO, "In RequestMessage::processOPENW(): ignoring OPENW event.");
342
343
344
345
346
347

   // Set response type

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

348
349


350
void RequestMessage::processCREATE(const cta::eos::Notification &notification, cta::xrd::Response &response)
351
{
352
   // Validate received protobuf
353
354
   checkIsNotEmptyString(notification.cli().user().username(),  "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(), "notification.cli.user.groupname");
355

356
   // Unpack message
357
358
359
   cta::common::dataStructures::RequesterIdentity requester;
   requester.name  = notification.cli().user().username();
   requester.group = notification.cli().user().groupname();
360

361
   auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class");
362
   if(notification.file().xattr().end() == storageClassItor) {
363
364
365
366
367
     // Fall back to old xattr format
     storageClassItor = notification.file().xattr().find("CTA_StorageClass");
     if(notification.file().xattr().end() == storageClassItor) {
       throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is not set");
     }
368
369
   }
   const std::string storageClass = storageClassItor->second;
370
   if(storageClass.empty()) {
371
     throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is set to an empty string");
372
   }
373
374

   cta::utils::Timer t;
375
   uint64_t archiveFileId;
376

377
378
379
380
381
382
   // For testing, this storage class will always fail on CLOSEW. Allow it to pass CREATE and don't allocate an archive Id from the pool.
   if(storageClassItor->second == "fail_on_closew_test") {
     archiveFileId = std::numeric_limits<uint64_t>::max();
   } else {
     archiveFileId = m_scheduler.checkAndGetNextArchiveFileId(m_cliIdentity.username, storageClass, requester, m_lc);
   }
383

384
385
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
386
387
   params.add("diskFileId", std::to_string(notification.file().fid()))
         .add("diskFilePath", notification.file().lpath())
388
         .add("fileId", archiveFileId)
389
         .add("schedulerTime", t.secs());
390
   m_lc.log(cta::log::INFO, "In RequestMessage::processCREATE(): assigning new archive file ID.");
391

392
   // Set ArchiveFileId in xattrs
393
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.file_id", std::to_string(archiveFileId)));
394
395
   
   // Set the storage class in xattrs
396
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.storage_class", storageClass));
397
398
399
400
401

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

402
403


404
void RequestMessage::processCLOSEW(const cta::eos::Notification &notification, cta::xrd::Response &response)
405
{
406
   // Validate received protobuf
407
408
409
410
411
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.wf().instance().url(),      "notification.wf.instance.url");
   checkIsNotEmptyString(notification.transport().report_url(),   "notification.transport.report_url");
412

413
   // Unpack message
414
   const auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class");
415
   if(notification.file().xattr().end() == storageClassItor) {
416
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.storage_class");
417
418
   }

419
   // For testing: this storage class will always fail
420
421
   if(storageClassItor->second == "fail_on_closew_test") {
      throw PbException("File is in fail_on_closew_test storage class, which always fails.");
422
423
   }

424
425
426
427
428
429
   // Disallow archival of files above the specified limit
   if(notification.file().size() > m_archiveFileMaxSize) {
      throw exception::UserError("Archive request rejected: file size (" + std::to_string(notification.file().size()) +
                                 " bytes) exceeds maximum allowed size (" + std::to_string(m_archiveFileMaxSize) + " bytes)");
   }

430
   cta::common::dataStructures::ArchiveRequest request;
431
   checksum::ProtobufToChecksumBlob(notification.file().csb(), request.checksumBlob);
432
433
434
435
436
437
438
439
440
441
442
443
444
445
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.diskFileID             = std::to_string(notification.file().fid());
   request.fileSize               = notification.file().size();
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.srcURL                 = notification.wf().instance().url();
   request.storageClass           = storageClassItor->second;
   request.archiveReportURL       = notification.transport().report_url();
   request.archiveErrorReportURL  = notification.transport().error_report_url();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);
446

447
448
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t
449
   const auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
450
   if(notification.file().xattr().end() == archiveFileIdItor) {
451
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
452
453
454
455
456
457
458
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   uint64_t archiveFileId = 0;
   if((archiveFileId = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
459

460
461
   cta::utils::Timer t;

462
   // Queue the request
463
   std::string archiveRequestAddr = m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc);
464

465
466
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
467
468
469
   params.add("fileId", archiveFileId);
   params.add("schedulerTime", t.secs());
   params.add("requesterInstance", notification.wf().requester_instance());
470
   params.add("archiveRequestId",archiveRequestAddr);
471
   m_lc.log(cta::log::INFO, "In RequestMessage::processCLOSEW(): queued file for archive.");
472

473
   // Set response type and add archive request reference as an extended attribute.
474
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", archiveRequestAddr));
475
476
477
478
479
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



480
void RequestMessage::processPREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
481
{
482
   // Validate received protobuf
483
484
485
486
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.transport().dst_url(),      "notification.transport.dst_url");
487

488
   // Unpack message
489
   cta::common::dataStructures::RetrieveRequest request;
490
491
492
493
494
495
496
497
498
499
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.dstURL                 = notification.transport().dst_url();
   request.errorReportURL         = notification.transport().error_report_url();
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);
500

501
502
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
503
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
504
   if(notification.file().xattr().end() == archiveFileIdItor) {
505
     // Fall back to the old xattr format
506
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
507
508
509
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
510
511
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
512
513
514
515
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
516
517
   
   // Activity value is a string. The parameter might be present or not.
518
   if(notification.file().xattr().find("activity") != notification.file().xattr().end()) {
519
     request.activity = notification.file().xattr().at("activity");
520
   }
521

522
523
   cta::utils::Timer t;

524
   // Queue the request
525
   std::string retrieveReqId = m_scheduler.queueRetrieve(m_cliIdentity.username, request, m_lc);
526

527
528
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
529
530
   params.add("fileId", request.archiveFileID).add("schedulerTime", t.secs())
         .add("retrieveReqId", retrieveReqId);
531
532
533
   if(static_cast<bool>(request.activity)) {
     params.add("activity", request.activity.value());
   }
534
   m_lc.log(cta::log::INFO, "In RequestMessage::processPREPARE(): queued file for retrieve.");
535

536
   // Set response type and add retrieve request reference as an extended attribute.
537
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId));
538
539
540
541
542
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



543
544
545
546
547
548
549
void RequestMessage::processABORT_PREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");

   // Unpack message
550
   cta::common::dataStructures::CancelRetrieveRequest request;
551
552
   request.requester.name   = notification.cli().user().username();
   request.requester.group  = notification.cli().user().groupname();
553
554
555

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
556
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
557
   if(notification.file().xattr().end() == archiveFileIdItor) {
558
     // Fall back to the old xattr format
559
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
560
561
562
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
563
564
565
566
567
568
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
569
570
   
   // The request Id should be stored as an extended attribute
571
   const auto retrieveRequestIdItor = notification.file().xattr().find("sys.cta.objectstore.id");
572
   if(notification.file().xattr().end() == retrieveRequestIdItor) {
573
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.cta.objectstore.id");
574
   }
575
576
   const std::string retrieveRequestId = retrieveRequestIdItor->second;
   request.retrieveRequestId = retrieveRequestId;
577
578

   // Queue the request
579
   m_scheduler.abortRetrieve(m_cliIdentity.username, request, m_lc);
580

581
582
   cta::utils::Timer t;

583
584
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
585
586
587
588
589
   params.add("fileId", request.archiveFileID)
         .add("schedulerTime", t.secs())
         .add("retrieveRequestId", request.retrieveRequestId)
         .add("diskFilePath", cta::utils::midEllipsis(request.diskFileInfo.path, 100));
   m_lc.log(cta::log::INFO, "In RequestMessage::processABORT_PREPARE(): canceled retrieve request.");
590

591
   // Set response type and remove reference to retrieve request in EOS extended attributes.
592
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", ""));
593
594
595
596
597
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



598
void RequestMessage::processDELETE(const cta::eos::Notification &notification, cta::xrd::Response &response)
599
{
600
   // Validate received protobuf
601
602
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
603

604
   // Unpack message
605
   cta::common::dataStructures::DeleteArchiveRequest request;
606
607
   request.requester.name    = notification.cli().user().username();
   request.requester.group   = notification.cli().user().groupname();
608
   
609
610
611
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t

612
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
613
   if(notification.file().xattr().end() == archiveFileIdItor) {
614
     // Fall back to the old xattr format
615
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
616
617
618
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
619
620
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
621
622
623
624
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
625
626
627
628
629
630
   
   auto archiveRequestAddrItor = notification.file().xattr().find("sys.cta.archive.objectstore.id");
   if(archiveRequestAddrItor != notification.file().xattr().end()){
     //We have the ArchiveRequest's objectstore address.
     request.address = archiveRequestAddrItor->second;
   }
631

632
   // Delete the file from the catalogue or from the objectstore if archive request is created
633
   cta::utils::Timer t;
634
   m_scheduler.deleteArchive(m_cliIdentity.username, request, m_lc);
635
636
637

   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
638
639
640
   params.add("fileId", request.archiveFileID)
         .add("address", (request.address ? request.address.value() : "null"))
         .add("schedulerTime", t.secs());
641
   m_lc.log(cta::log::INFO, "In RequestMessage::processDELETE(): archive file deleted.");
642
643
644
645
646

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

647
648


649
650
// Admin commands

651
652
653
654
void RequestMessage::logAdminCmd(const std::string &function, const cta::admin::AdminCmd &admincmd, cta::utils::Timer &t)
{
   using namespace cta::admin;

655
   std::string log_msg = "In RequestMessage::" + function + "(): Admin command succeeded: ";
656
657
658

   // Reverse lookup of strings corresponding to <command,subcommand> pair
   for(auto cmd_it = cmdLookup.begin(); cmd_it != cmdLookup.end(); ++cmd_it) {
659
660
      // Return the matching long string (length > 3)
      if(admincmd.cmd() == cmd_it->second && cmd_it->first.length() > 3) {
661
662
663
664
665
666
667
668
669
670
671
672
673
         log_msg += cmd_it->first + ' ';
         break;
      }
   }
   for(auto subcmd_it = subcmdLookup.begin(); subcmd_it != subcmdLookup.end(); ++subcmd_it) {
      if(admincmd.subcmd() == subcmd_it->second) {
         log_msg += subcmd_it->first;
         break;
      }
   }

   // Add the log message
   cta::log::ScopedParamContainer params(m_lc);
674
   params.add("adminTime", t.secs());
675
676
677
678
679
   m_lc.log(cta::log::INFO, log_msg);
}



680
void RequestMessage::processAdmin_Add(cta::xrd::Response &response)
681
682
683
{
   using namespace cta::admin;

684
685
   auto &username = getRequired(OptionString::USERNAME);
   auto &comment  = getRequired(OptionString::COMMENT);
686
687

   m_catalogue.createAdminUser(m_cliIdentity, username, comment);
688
689
690
691
692
693

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



694
void RequestMessage::processAdmin_Ch(cta::xrd::Response &response)
695
696
697
{
   using namespace cta::admin;

698
699
   auto &username = getRequired(OptionString::USERNAME);
   auto &comment  = getRequired(OptionString::COMMENT);
700
701

   m_catalogue.modifyAdminUserComment(m_cliIdentity, username, comment);
702
703
704
705
706
707

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



708
void RequestMessage::processAdmin_Rm(cta::xrd::Response &response)
709
710
711
{
   using namespace cta::admin;

712
   auto &username = getRequired(OptionString::USERNAME);
713
714

   m_catalogue.deleteAdminUser(username);
715
716
717
718
719
720

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



721
void RequestMessage::processAdmin_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
722
{
723
  using namespace cta::admin;
724

725
726
  // Create a XrdSsi stream object to return the results
  stream = new AdminLsStream(*this, m_catalogue, m_scheduler);
727

728
  response.set_show_header(HeaderType::ADMIN_LS);
729
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
730
731
732
733
}



734
void RequestMessage::processArchiveFile_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
735
{
736
  using namespace cta::admin;
737

738
739
  // Create a XrdSsi stream object to return the results
  stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler);
740

741
742
743
  // Set correct column headers
  response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::ARCHIVEFILE_LS_SUMMARY
                                                            : HeaderType::ARCHIVEFILE_LS);
744

745
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
746
747
748
749
}



750
void RequestMessage::processArchiveRoute_Add(cta::xrd::Response &response)
751
752
753
{
   using namespace cta::admin;

754
755
756
757
   auto &scn      = getRequired(OptionString::STORAGE_CLASS);
   auto &cn       = getRequired(OptionUInt64::COPY_NUMBER);
   auto &tapepool = getRequired(OptionString::TAPE_POOL);
   auto &comment  = getRequired(OptionString::COMMENT);
758

759
   m_catalogue.createArchiveRoute(m_cliIdentity, scn, cn, tapepool, comment);
760

761
762
763
764
765
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



766
void RequestMessage::processArchiveRoute_Ch(cta::xrd::Response &response)
767
768
769
{
   using namespace cta::admin;

770
771
772
773
   auto &scn      = getRequired(OptionString::STORAGE_CLASS);
   auto &cn       = getRequired(OptionUInt64::COPY_NUMBER);
   auto  tapepool = getOptional(OptionString::TAPE_POOL);
   auto  comment  = getOptional(OptionString::COMMENT);
774

775
   if(comment) {
776
      m_catalogue.modifyArchiveRouteComment(m_cliIdentity, scn, cn, comment.value());
777
   }
778
   if(tapepool) {
779
      m_catalogue.modifyArchiveRouteTapePoolName(m_cliIdentity, scn, cn, tapepool.value());
780
   }
781
782
783
784
785
786

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



787
void RequestMessage::processArchiveRoute_Rm(cta::xrd::Response &response)
788
789
790
{
   using namespace cta::admin;

791
792
   auto &scn = getRequired(OptionString::STORAGE_CLASS);
   auto &cn  = getRequired(OptionUInt64::COPY_NUMBER);
793

794
   m_catalogue.deleteArchiveRoute(scn, cn);
795
796
797
798
799
800

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



801
void RequestMessage::processArchiveRoute_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
802
{
803
  using namespace cta::admin;
804

805
806
  // Create a XrdSsi stream object to return the results
  stream = new ArchiveRouteLsStream(*this, m_catalogue, m_scheduler);
807

808
809
  response.set_show_header(HeaderType::ARCHIVEROUTE_LS);
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
810
811
812
813
}



814
void RequestMessage::processDrive_Up(cta::xrd::Response &response)
815
816
{
   using namespace cta::admin;
817
818
819
820
821
822
823
824
825
826
827
828
   
   cta::optional<std::string> reason = getOptional(OptionString::REASON);
   cta::common::dataStructures::DesiredDriveState desiredDS;
   desiredDS.up = true;
   desiredDS.forceDown = false;
   desiredDS.reason = reason;
   if(!desiredDS.reason){
     //If reason not provided while setting the drive up, we delete it, so we set it to an empty string
     desiredDS.reason = "";
   }
   
   std::string cmdlineOutput = setDriveState('^' + getRequired(OptionString::DRIVE) + '$', desiredDS);
829

830
   response.set_message_txt(cmdlineOutput);
831
832
833
834
835
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



836
void RequestMessage::processDrive_Down(cta::xrd::Response &response)
837
838
839
{
   using namespace cta::admin;

840
841
842
843
844
845
846
847
848
849
   std::string reason = getRequired(OptionString::REASON);
   if(utils::trimString(reason).empty()) {
     throw cta::exception::UserError("You must provide a reason in order to set the drive down");
   }
   cta::common::dataStructures::DesiredDriveState desiredDS;
   desiredDS.up = false;
   desiredDS.forceDown = has_flag(OptionBoolean::FORCE);
   desiredDS.reason = reason;
   
   std::string cmdlineOutput = setDriveState('^' + getRequired(OptionString::DRIVE) + '$', desiredDS);
850