XrdSsiCtaRequestMessage.cpp 77.4 KB
Newer Older
1
2
3
/*!
 * @project        The CERN Tape Archive (CTA)
 * @brief          XRootD EOS Notification handler
 * @copyright      Copyright 2019 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include <XrdSsiPbException.hpp>
using XrdSsiPb::PbException;

#include "common/utils/Regex.hpp"
#include <cmdline/CtaAdminCmdParse.hpp>
#include "XrdSsiCtaRequestMessage.hpp"
#include "XrdCtaAdminLs.hpp"
#include "XrdCtaArchiveFileLs.hpp"
#include "XrdCtaArchiveRouteLs.hpp"
#include "XrdCtaDriveLs.hpp"
#include "XrdCtaFailedRequestLs.hpp"
#include "XrdCtaGroupMountRuleLs.hpp"
#include "XrdCtaListPendingQueue.hpp"
#include "XrdCtaLogicalLibraryLs.hpp"
#include "XrdCtaMountPolicyLs.hpp"
#include "XrdCtaMediaTypeLs.hpp"
#include "XrdCtaRepackLs.hpp"
#include "XrdCtaRequesterMountRuleLs.hpp"
#include "XrdCtaShowQueues.hpp"
#include "XrdCtaTapeLs.hpp"
#include "XrdCtaTapeFileLs.hpp"
#include "XrdCtaStorageClassLs.hpp"
#include "XrdCtaTapePoolLs.hpp"
#include "XrdCtaDiskSystemLs.hpp"
#include "XrdCtaVirtualOrganizationLs.hpp"
#include "XrdCtaVersion.hpp"

#include <cerrno>
#include <cstdlib>
#include <limits>
#include <sstream>

49
50
namespace cta {
namespace xrd {
51

52
53
54
55
56
// Codes to change colours for console output (when sending a response to cta-admin).
// Both are ANSI escape sequences: TEXT_RED selects SGR attributes 31 and 1, TEXT_NORMAL is
// SGR 0 (reset all attributes). Note that TEXT_NORMAL also carries a trailing newline, so
// text terminated with it needs no separate line break.
const char* const TEXT_RED    = "\x1b[31;1m";
const char* const TEXT_NORMAL = "\x1b[0m\n";


57
58
59
/*
 * Convert AdminCmd <Cmd, SubCmd> pair to an integer so that it can be used in a switch statement
 */
60
constexpr unsigned int cmd_pair(cta::admin::AdminCmd::Cmd cmd, cta::admin::AdminCmd::SubCmd subcmd) {
61
62
63
64
   return (cmd << 16) + subcmd;
}


65
/*!
 * Dispatch a Request to the handler that matches its payload.
 *
 * Admin commands are accepted only when the connection was authenticated with Kerberos 5 and
 * m_scheduler.authorizeAdmin() accepts the client identity; the <Cmd, SubCmd> pair then selects
 * one of the process<Cmd>_<SubCmd>() methods and the command is logged with its elapsed time.
 *
 * Workflow notifications are accepted only when the authenticated user name equals the instance
 * name in the message (with one Kerberos exception for CLOSEW and PREPARE, see below) and the
 * file path is not under /eos/<instance>/proc/; the workflow event then selects one of the
 * process<EVENT>() methods.
 *
 * @param[in]     request     Request received from the client
 * @param[out]    response    Response filled in by the selected handler
 * @param[out]    stream      Passed through to the handlers that take a stream (listing commands)
 *
 * @throws cta::exception::UserError    if an admin command was not authenticated with Kerberos 5
 * @throws PbException                  if the payload is unset or unrecognised, the command pair or
 *                                      workflow event has no handler, the instance name does not
 *                                      match, or the file is under /eos/<instance>/proc/
 */
void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Response &response, XrdSsiStream* &stream)
{
   // Select the handler family from the type of payload carried by the Request
   switch(request.request_case())
   {
      using namespace cta::xrd;

      case Request::kAdmincmd: {
         // Only Kerberos-authenticated, authorized CTA admin users may run admin commands
         if(Protocol::KRB5 != m_protocol) {
            throw cta::exception::UserError("[ERROR] Admin commands must be authenticated using the Kerberos 5 protocol.");
         }
         m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);

         // Started after authorization: the logged time covers option import and command execution
         cta::utils::Timer timer;
         const auto &adminCmd = request.admincmd();

         // Check the Protocol Buffer and load its options into the option maps
         importOptions(adminCmd);

         // Remember which client versions sent the command
         m_client_versions.ctaVersion               = request.client_cta_version();
         m_client_versions.xrootdSsiProtoIntVersion = request.client_xrootd_ssi_protobuf_interface_version();

         // Dispatch table: <Cmd, SubCmd> -> handler
         switch(cmd_pair(adminCmd.cmd(), adminCmd.subcmd())) {
            using namespace cta::admin;

            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_ADD):               processAdmin_Add(response); break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_CH):                processAdmin_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_RM):                processAdmin_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_LS):                processAdmin_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_ARCHIVEFILE, AdminCmd::SUBCMD_LS):          processArchiveFile_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_ADD):        processArchiveRoute_Add(response); break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_CH):         processArchiveRoute_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_RM):         processArchiveRoute_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_LS):         processArchiveRoute_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_UP):                processDrive_Up(response); break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_DOWN):              processDrive_Down(response); break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_CH):                processDrive_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_LS):                processDrive_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_RM):                processDrive_Rm(response); break;

            case cmd_pair(AdminCmd::CMD_FAILEDREQUEST, AdminCmd::SUBCMD_LS):        processFailedRequest_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_ADD):      processGroupMountRule_Add(response); break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_CH):       processGroupMountRule_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_RM):       processGroupMountRule_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_LS):       processGroupMountRule_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_LISTPENDINGARCHIVES, AdminCmd::SUBCMD_NONE):  processListPendingArchives(response, stream); break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGRETRIEVES, AdminCmd::SUBCMD_NONE): processListPendingRetrieves(response, stream); break;

            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_ADD):      processLogicalLibrary_Add(response); break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_CH):       processLogicalLibrary_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_RM):       processLogicalLibrary_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_LS):       processLogicalLibrary_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_ADD):           processMediaType_Add(response); break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_CH):            processMediaType_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_RM):            processMediaType_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_LS):            processMediaType_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_ADD):         processMountPolicy_Add(response); break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_CH):          processMountPolicy_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_RM):          processMountPolicy_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS):          processMountPolicy_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD):              processRepack_Add(response); break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM):               processRepack_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS):               processRepack_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR):              processRepack_Err(response); break;

            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_ADD):  processRequesterMountRule_Add(response); break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_CH):   processRequesterMountRule_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_RM):   processRequesterMountRule_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_LS):   processRequesterMountRule_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_SHOWQUEUES, AdminCmd::SUBCMD_NONE):         processShowQueues(response, stream); break;

            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_ADD):        processStorageClass_Add(response); break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_CH):         processStorageClass_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_RM):         processStorageClass_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_LS):         processStorageClass_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_ADD):                processTape_Add(response); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_CH):                 processTape_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RM):                 processTape_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RECLAIM):            processTape_Reclaim(response); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LS):                 processTape_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LABEL):              processTape_Label(response); break;

            case cmd_pair(AdminCmd::CMD_TAPEFILE, AdminCmd::SUBCMD_LS):             processTapeFile_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_ADD):            processTapePool_Add(response); break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_CH):             processTapePool_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_RM):             processTapePool_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_LS):             processTapePool_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_LS):           processDiskSystem_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_ADD):          processDiskSystem_Add(response); break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_RM):           processDiskSystem_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_CH):           processDiskSystem_Ch(response); break;

            case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_ADD): processVirtualOrganization_Add(response); break;
            case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_CH):  processVirtualOrganization_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_RM):  processVirtualOrganization_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_LS):  processVirtualOrganization_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_VERSION, AdminCmd::SUBCMD_NONE):            processVersion(response, stream); break;

            default:
               throw PbException("Admin command pair <" +
                     AdminCmd_Cmd_Name(adminCmd.cmd()) + ", " +
                     AdminCmd_SubCmd_Name(adminCmd.subcmd()) +
                     "> is not implemented.");
         } // end switch

         // Record the executed admin command together with the elapsed time
         logAdminCmd(__FUNCTION__, adminCmd, timer);
         break;
      } // end case Request::kAdmincmd

      case Request::kNotification: {
         const auto &notification = request.notification();
         const auto &instanceName = notification.wf().instance().name();
         const auto &lpath        = notification.file().lpath();
         const auto  event        = notification.wf().event();

         // The instance name in the authentication key must match the one in the Protocol Buffer
         if(m_cliIdentity.username != instanceName) {
            // Special case: Kerberos-authenticated admins may send CLOSEW and PREPARE events, so that
            // operators can resubmit failed archive or prepare requests from a command line tool.
            // DELETE is deliberately excluded: files removed from the catalogue must not be left
            // behind in the EOS namespace.
            const bool isResubmittableEvent = event == cta::eos::Workflow::CLOSEW ||
                                              event == cta::eos::Workflow::PREPARE;
            if(m_protocol != Protocol::KRB5 || !isResubmittableEvent) {
               throw PbException("Instance name \"" + instanceName +
                                 "\" does not match key identifier \"" + m_cliIdentity.username + "\"");
            }
            m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);
            m_cliIdentity.username = instanceName;
         }

         // Workflow events for files under /eos/INSTANCE_NAME/proc/ are always refused
         {
            static const utils::Regex re("^/eos/[^/]+/proc/.*");
            if(re.exec(lpath).size() > 0) {
               std::ostringstream msg;
               msg << "Cannot process a workflow event for a file in /eos/INSTANCE/proc/: instance=" << instanceName
                   << " event=" << Workflow_EventType_Name(event)
                   << " lpath=" << lpath;
               throw PbException(msg.str());
            }
         }

         // Dispatch table: workflow event -> handler
         switch(event) {
            using namespace cta::eos;

            case Workflow::OPENW:         processOPENW(notification, response); break;
            case Workflow::CREATE:        processCREATE(notification, response); break;
            case Workflow::CLOSEW:        processCLOSEW(notification, response); break;
            case Workflow::PREPARE:       processPREPARE(notification, response); break;
            case Workflow::ABORT_PREPARE: processABORT_PREPARE(notification, response); break;
            case Workflow::DELETE:        processDELETE(notification, response); break;
            case Workflow::UPDATE_FID:    processUPDATE_FID(notification, response); break;

            default:
               throw PbException("Workflow event " +
                     Workflow_EventType_Name(event) +
                     " is not implemented.");
         }
         break;
      } // end case Request::kNotification

      case Request::REQUEST_NOT_SET:
         throw PbException("Request message has not been set.");

      default:
         throw PbException("Unrecognized Request message. "
                           "Possible Protocol Buffer version mismatch between client and server.");
   }
}



378
379
// EOS Workflow commands

380
/*!
 * Handle an OPENW workflow event. The event is only logged; no scheduler call is made and the
 * notification content is not read.
 *
 * @param[in]     notification    Notification received from the client (unused)
 * @param[out]    response        Response; its type is set to RSP_SUCCESS
 */
void RequestMessage::processOPENW(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Log that the event was received and deliberately ignored
   cta::log::ScopedParamContainer logParams(m_lc);
   m_lc.log(cta::log::INFO, "In RequestMessage::processOPENW(): ignoring OPENW event.");

   // Report success to the client
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

392
393


394
/*!
 * Handle a CREATE workflow event: obtain an archive file ID for the new file and return it,
 * together with the storage class, to the client as extended attributes.
 *
 * The storage class is read from the "sys.archive.storage_class" extended attribute, falling
 * back to the old "CTA_StorageClass" attribute name.
 *
 * @param[in]     notification    Notification received from the client
 * @param[out]    response        Response; receives the "sys.archive.file_id" and
 *                                "sys.archive.storage_class" xattrs and type RSP_SUCCESS
 *
 * @throws PbException    if the storage class attribute is missing or empty
 */
void RequestMessage::processCREATE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   const auto &user   = notification.cli().user();
   const auto &xattrs = notification.file().xattr();

   // The requester must be fully identified
   checkIsNotEmptyString(user.username(),  "notification.cli.user.username");
   checkIsNotEmptyString(user.groupname(), "notification.cli.user.groupname");

   cta::common::dataStructures::RequesterIdentity requester;
   requester.name  = user.username();
   requester.group = user.groupname();

   // Locate the storage class: current attribute name first, then the old one
   auto storageClassItor = xattrs.find("sys.archive.storage_class");
   if(storageClassItor == xattrs.end()) {
      storageClassItor = xattrs.find("CTA_StorageClass");
      if(storageClassItor == xattrs.end()) {
         throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is not set");
      }
   }
   const std::string storageClass = storageClassItor->second;
   if(storageClass.empty()) {
      throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is set to an empty string");
   }

   cta::utils::Timer timer;

   // Test hook: this storage class always fails on CLOSEW. Let it pass CREATE, but hand out a
   // placeholder ID so that no real archive ID is allocated from the pool.
   const uint64_t archiveFileId = (storageClass == "fail_on_closew_test")
      ? std::numeric_limits<uint64_t>::max()
      : m_scheduler.checkAndGetNextArchiveFileId(m_cliIdentity.username, storageClass, requester, m_lc);

   // Log the assignment
   cta::log::ScopedParamContainer logParams(m_lc);
   logParams.add("diskFileId", std::to_string(notification.file().fid()));
   logParams.add("diskFilePath", notification.file().lpath());
   logParams.add("fileId", archiveFileId);
   logParams.add("schedulerTime", timer.secs());
   m_lc.log(cta::log::INFO, "In RequestMessage::processCREATE(): assigning new archive file ID.");

   // Return the archive file ID and the storage class as extended attributes
   using XattrPair = google::protobuf::MapPair<std::string,std::string>;
   response.mutable_xattr()->insert(XattrPair("sys.archive.file_id", std::to_string(archiveFileId)));
   response.mutable_xattr()->insert(XattrPair("sys.archive.storage_class", storageClass));

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

446
447


448
/*!
 * Handle a CLOSEW workflow event: queue the closed file for archival under the archive file ID
 * stored in its "sys.archive.file_id" extended attribute.
 *
 * Zero-length files are not queued; the event is logged and success is returned.
 *
 * @param[in]     notification    Notification received from the client
 * @param[out]    response        Response; for queued files it receives the archive request
 *                                address in the "sys.cta.objectstore.id" xattr; type RSP_SUCCESS
 *
 * @throws PbException             if a required extended attribute is missing, the storage class is
 *                                 the always-failing test class, or the archive file ID is not a
 *                                 non-zero decimal number that fits the ID type
 * @throws exception::UserError    if the file is larger than m_archiveFileMaxSize
 */
void RequestMessage::processCLOSEW(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.wf().instance().url(),      "notification.wf.instance.url");
   checkIsNotEmptyString(notification.transport().report_url(),   "notification.transport.report_url");

   // Unpack message
   const auto &xattrs = notification.file().xattr();
   const auto storageClassItor = xattrs.find("sys.archive.storage_class");
   if(xattrs.end() == storageClassItor) {
      throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.storage_class");
   }

   // For testing: this storage class will always fail
   if(storageClassItor->second == "fail_on_closew_test") {
      throw PbException("File is in fail_on_closew_test storage class, which always fails.");
   }

   // Disallow archival of files above the specified limit
   if(notification.file().size() > m_archiveFileMaxSize) {
      throw exception::UserError("Archive request rejected: file size (" + std::to_string(notification.file().size()) +
                                 " bytes) exceeds maximum allowed size (" + std::to_string(m_archiveFileMaxSize) + " bytes)");
   }

   cta::common::dataStructures::ArchiveRequest request;
   checksum::ProtobufToChecksumBlob(notification.file().csb(), request.checksumBlob);
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.diskFileID             = std::to_string(notification.file().fid());
   request.fileSize               = notification.file().size();
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.srcURL                 = notification.wf().instance().url();
   request.storageClass           = storageClassItor->second;
   request.archiveReportURL       = notification.transport().report_url();
   request.archiveErrorReportURL  = notification.transport().error_report_url();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t
   const auto archiveFileIdItor = xattrs.find("sys.archive.file_id");
   if(xattrs.end() == archiveFileIdItor) {
      throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
   }
   const std::string &archiveFileIdStr = archiveFileIdItor->second;
   // Accept plain decimal digits only. A bare strtoul() would also accept trailing garbage
   // ("12abc" -> 12), a leading sign ("-1" wraps to a huge value) and out-of-range input
   // (clamped to the maximum), and unsigned long may be narrower than uint64_t.
   const bool isDecimal = !archiveFileIdStr.empty() &&
                          archiveFileIdStr.find_first_not_of("0123456789") == std::string::npos;
   errno = 0;
   const unsigned long long parsedId = isDecimal ? std::strtoull(archiveFileIdStr.c_str(), nullptr, 10) : 0ULL;
   if(parsedId == 0 || errno == ERANGE) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
   const uint64_t archiveFileId = parsedId;

   cta::utils::Timer t;

   cta::log::ScopedParamContainer params(m_lc);
   std::string logMessage = "In RequestMessage::processCLOSEW(): ";
   if(request.fileSize > 0) {
      // Queue the request
      const std::string archiveRequestAddr = m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc);
      logMessage += "queued file for archive.";
      params.add("schedulerTime", t.secs());
      params.add("archiveRequestId", archiveRequestAddr);

      // Add archive request reference to response as an extended attribute
      response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", archiveRequestAddr));
   } else {
      logMessage += "ignoring zero-length file.";
   }

   // Create a log entry
   params.add("fileId", archiveFileId);
   params.add("requesterInstance", notification.wf().requester_instance());
   m_lc.log(cta::log::INFO, logMessage);

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}


531
/*!
 * Handle a PREPARE workflow event: queue a retrieve request for the file identified by the
 * archive file ID stored in its extended attributes.
 *
 * The ID is read from "sys.archive.file_id", falling back to the old "CTA_ArchiveFileId"
 * attribute name. An optional "activity" attribute is copied into the request when present.
 *
 * @param[in]     notification    Notification received from the client
 * @param[out]    response        Response; receives the retrieve request ID in the
 *                                "sys.cta.objectstore.id" xattr and type RSP_SUCCESS
 *
 * @throws PbException    if the archive file ID attribute is missing or is not a non-zero decimal
 *                        number that fits the ID type
 */
void RequestMessage::processPREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.transport().dst_url(),      "notification.transport.dst_url");

   // Unpack message
   cta::common::dataStructures::RetrieveRequest request;
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.dstURL                 = notification.transport().dst_url();
   request.errorReportURL         = notification.transport().error_report_url();
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
   const auto &xattrs = notification.file().xattr();
   auto archiveFileIdItor = xattrs.find("sys.archive.file_id");
   if(xattrs.end() == archiveFileIdItor) {
      // Fall back to the old xattr format
      archiveFileIdItor = xattrs.find("CTA_ArchiveFileId");
      if(xattrs.end() == archiveFileIdItor) {
         throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
      }
   }
   const std::string &archiveFileIdStr = archiveFileIdItor->second;
   // Accept plain decimal digits only. A bare strtoul() would also accept trailing garbage
   // ("12abc" -> 12), a leading sign ("-1" wraps to a huge value) and out-of-range input
   // (clamped to the maximum), and unsigned long may be narrower than the ID type.
   const bool isDecimal = !archiveFileIdStr.empty() &&
                          archiveFileIdStr.find_first_not_of("0123456789") == std::string::npos;
   errno = 0;
   const unsigned long long parsedId = isDecimal ? std::strtoull(archiveFileIdStr.c_str(), nullptr, 10) : 0ULL;
   if(parsedId == 0 || errno == ERANGE) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
   request.archiveFileID = parsedId;

   // Activity value is a string. The parameter might be present or not.
   // A single lookup serves both the presence test and the read.
   const auto activityItor = xattrs.find("activity");
   if(activityItor != xattrs.end()) {
      request.activity = activityItor->second;
   }

   cta::utils::Timer t;

   // Queue the request
   const std::string retrieveReqId = m_scheduler.queueRetrieve(m_cliIdentity.username, request, m_lc);

   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", request.archiveFileID).add("schedulerTime", t.secs())
         .add("retrieveReqId", retrieveReqId);
   if(static_cast<bool>(request.activity)) {
      params.add("activity", request.activity.value());
   }
   m_lc.log(cta::log::INFO, "In RequestMessage::processPREPARE(): queued file for retrieve.");

   // Set response type and add retrieve request reference as an extended attribute.
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId));
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



594
595
596
597
598
599
600
/*!
 * Handle an ABORT_PREPARE workflow event: cancel the retrieve request referenced by the file's
 * "sys.cta.objectstore.id" extended attribute.
 *
 * The archive file ID is read from "sys.archive.file_id", falling back to the old
 * "CTA_ArchiveFileId" attribute name.
 *
 * @param[in]     notification    Notification received from the client
 * @param[out]    response        Response; "sys.cta.objectstore.id" is set to an empty string so
 *                                that the reference to the retrieve request is removed; type
 *                                RSP_SUCCESS
 *
 * @throws PbException    if the archive file ID or request ID attribute is missing, or the archive
 *                        file ID is not a non-zero decimal number that fits the ID type
 */
void RequestMessage::processABORT_PREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");

   // Unpack message
   cta::common::dataStructures::CancelRetrieveRequest request;
   request.requester.name   = notification.cli().user().username();
   request.requester.group  = notification.cli().user().groupname();

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
   const auto &xattrs = notification.file().xattr();
   auto archiveFileIdItor = xattrs.find("sys.archive.file_id");
   if(xattrs.end() == archiveFileIdItor) {
      // Fall back to the old xattr format
      archiveFileIdItor = xattrs.find("CTA_ArchiveFileId");
      if(xattrs.end() == archiveFileIdItor) {
         throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
      }
   }
   const std::string &archiveFileIdStr = archiveFileIdItor->second;
   // Accept plain decimal digits only. A bare strtoul() would also accept trailing garbage
   // ("12abc" -> 12), a leading sign ("-1" wraps to a huge value) and out-of-range input
   // (clamped to the maximum), and unsigned long may be narrower than the ID type.
   const bool isDecimal = !archiveFileIdStr.empty() &&
                          archiveFileIdStr.find_first_not_of("0123456789") == std::string::npos;
   errno = 0;
   const unsigned long long parsedId = isDecimal ? std::strtoull(archiveFileIdStr.c_str(), nullptr, 10) : 0ULL;
   if(parsedId == 0 || errno == ERANGE) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
   request.archiveFileID = parsedId;

   // The request Id should be stored as an extended attribute
   const auto retrieveRequestIdItor = xattrs.find("sys.cta.objectstore.id");
   if(xattrs.end() == retrieveRequestIdItor) {
      throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.cta.objectstore.id");
   }
   request.retrieveRequestId = retrieveRequestIdItor->second;

   // Start timing before the scheduler call so that the logged "schedulerTime" covers it
   cta::utils::Timer t;

   // Queue the request
   m_scheduler.abortRetrieve(m_cliIdentity.username, request, m_lc);

   // Create a log entry
   // NOTE(review): request.diskFileInfo.path is never assigned in this function, so
   // "diskFilePath" is presumably logged as an empty string; confirm whether
   // notification.file().lpath() should be copied into the request.
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", request.archiveFileID)
         .add("schedulerTime", t.secs())
         .add("retrieveRequestId", request.retrieveRequestId)
         .add("diskFilePath", cta::utils::midEllipsis(request.diskFileInfo.path, 100));
   m_lc.log(cta::log::INFO, "In RequestMessage::processABORT_PREPARE(): canceled retrieve request.");

   // Set response type and remove reference to retrieve request in EOS extended attributes.
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", ""));
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



649
/*!
 * Process a DELETE notification: ask the scheduler to delete an archive file.
 *
 * The notification is unpacked into a DeleteArchiveRequest, which is handed to
 * m_scheduler.deleteArchive(). A log entry is written and the response type is set to
 * RSP_SUCCESS once the scheduler call has returned.
 *
 * @param[in]     notification    Notification protobuf describing the file to delete
 * @param[out]    response        Response protobuf; only its type is set here
 *
 * @throws PbException if neither "sys.archive.file_id" nor "CTA_ArchiveFileId" is present in the
 *                     file's extended attributes, or if the value found is not a non-zero,
 *                     purely decimal number
 */
void RequestMessage::processDELETE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");

   // Unpack message
   const uint64_t diskFileId = notification.file().fid();

   cta::common::dataStructures::DeleteArchiveRequest request;
   request.requester.name    = notification.cli().user().username();
   request.requester.group   = notification.cli().user().groupname();
   request.diskFilePath      = notification.file().lpath();
   request.diskFileId        = std::to_string(diskFileId);
   request.diskInstance      = m_cliIdentity.username;

   const auto &xattrs = notification.file().xattr();

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t
   auto archiveFileIdItor = xattrs.find("sys.archive.file_id");
   if(xattrs.end() == archiveFileIdItor) {
     // Fall back to the old xattr format
     archiveFileIdItor = xattrs.find("CTA_ArchiveFileId");
     if(xattrs.end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
   }

   // strtoul() on its own skips leading whitespace, accepts a sign (so "-1" wraps around to a
   // huge ID) and silently stops at the first non-digit (so "12abc" becomes 12). As this ID
   // selects the file to be deleted, only a string made entirely of decimal digits is accepted.
   // NOTE: a value too large for unsigned long is still clamped by strtoul() rather than rejected.
   const std::string &archiveFileIdStr = archiveFileIdItor->second;
   const bool isDecimal = !archiveFileIdStr.empty() &&
                          archiveFileIdStr.find_first_not_of("0123456789") == std::string::npos;
   request.archiveFileID = isDecimal ? strtoul(archiveFileIdStr.c_str(), nullptr, 10) : 0;
   if(0 == request.archiveFileID) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }

   // If the ArchiveRequest's objectstore address is present and non-empty, pass it on;
   // otherwise request.address is left unset
   const auto archiveRequestAddrItor = xattrs.find("sys.cta.archive.objectstore.id");
   if(archiveRequestAddrItor != xattrs.end() && !archiveRequestAddrItor->second.empty()) {
      request.address = archiveRequestAddrItor->second;
   }

   // Delete the file from the catalogue or from the objectstore if archive request is created.
   // The timer is started immediately before the call so that schedulerTime covers only it.
   cta::utils::Timer t;
   m_scheduler.deleteArchive(m_cliIdentity.username, request, m_lc);

   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", request.archiveFileID)
         .add("address", (request.address ? request.address.value() : "null"))
         .add("filePath",request.diskFilePath)
         .add("schedulerTime", t.secs());
   m_lc.log(cta::log::INFO, "In RequestMessage::processDELETE(): archive file deleted.");

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

707
708


709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
/*!
 * Process an UPDATE_FID notification: record a new disk file ID for an archive file.
 *
 * The archive file is identified by the "sys.archive.file_id" extended attribute (or the older
 * "CTA_ArchiveFileId"). The disk file ID taken from the notification is passed to
 * m_catalogue.updateDiskFileId() together with the disk instance name.
 *
 * @param[in]     notification    Notification protobuf carrying the file's path, fid and xattrs
 * @param[out]    response        Response protobuf; only its type is set here
 *
 * @throws PbException if the archive file ID extended attribute is missing, or if its value is
 *                     not a non-zero, purely decimal number
 */
void RequestMessage::processUPDATE_FID(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.file().lpath(),  "notification.file.lpath");

   // Unpack message
   const std::string &diskInstance = m_cliIdentity.username;
   const std::string &diskFilePath = notification.file().lpath();
   const std::string diskFileId = std::to_string(notification.file().fid());

   const auto &xattrs = notification.file().xattr();

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
   auto archiveFileIdItor = xattrs.find("sys.archive.file_id");
   if(xattrs.end() == archiveFileIdItor) {
     // Fall back to the old xattr format
     archiveFileIdItor = xattrs.find("CTA_ArchiveFileId");
     if(xattrs.end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
   }

   // strtoul() on its own skips leading whitespace, accepts a sign (so "-1" wraps around to a
   // huge ID) and silently stops at the first non-digit (so "12abc" becomes 12). As this ID
   // selects the catalogue entry to modify, only a string made entirely of decimal digits is
   // accepted.
   // NOTE: a value too large for unsigned long is still clamped by strtoul() rather than rejected.
   const std::string &archiveFileIdStr = archiveFileIdItor->second;
   const bool isDecimal = !archiveFileIdStr.empty() &&
                          archiveFileIdStr.find_first_not_of("0123456789") == std::string::npos;
   const uint64_t archiveFileId = isDecimal ? strtoul(archiveFileIdStr.c_str(), nullptr, 10) : 0;
   if(0 == archiveFileId) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }

   // Update the disk file ID. The value logged as "schedulerTime" below is the duration of
   // this catalogue call.
   cta::utils::Timer t;
   m_catalogue.updateDiskFileId(archiveFileId, diskInstance, diskFileId);

   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", archiveFileId)
         .add("schedulerTime", t.secs())
         .add("diskInstance", diskInstance)
         .add("diskFilePath", diskFilePath)
         .add("diskFileId", diskFileId);
   m_lc.log(cta::log::INFO, "In RequestMessage::processUPDATE_FID(): updated disk file ID.");

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



754
755
// Admin commands

756
757
758
759
/*!
 * Write an INFO log line recording that an admin command succeeded.
 *
 * The message has the form "In RequestMessage::<function>(): Admin command succeeded: <cmd> <subcmd>",
 * where <cmd> and <subcmd> are recovered by reverse lookup in cmdLookup and subcmdLookup. If no
 * entry matches, the corresponding part is simply omitted. The elapsed time reported by the
 * timer is attached as the "adminTime" log parameter.
 *
 * @param[in]     function    Name of the calling method, inserted into the log message
 * @param[in]     admincmd    Admin command protobuf whose cmd() and subcmd() are looked up
 * @param[in]     t           Timer whose secs() value is logged
 */
void RequestMessage::logAdminCmd(const std::string &function, const cta::admin::AdminCmd &admincmd, cta::utils::Timer &t)
{
   using namespace cta::admin;

   std::string message = "In RequestMessage::" + function + "(): Admin command succeeded: ";

   // Command name: take the first entry for this command whose string is longer than three
   // characters, i.e. skip the short aliases
   for(const auto &cmdEntry : cmdLookup) {
      if(cmdEntry.first.length() <= 3 || !(admincmd.cmd() == cmdEntry.second)) {
         continue;
      }
      message += cmdEntry.first + ' ';
      break;
   }

   // Subcommand name: take the first entry which matches
   for(const auto &subcmdEntry : subcmdLookup) {
      if(!(admincmd.subcmd() == subcmdEntry.second)) {
         continue;
      }
      message += subcmdEntry.first;
      break;
   }

   // Emit the message together with the elapsed time
   cta::log::ScopedParamContainer logParams(m_lc);
   logParams.add("adminTime", t.secs());
   m_lc.log(cta::log::INFO, message);
}



785
/*!
 * Handle the "admin add" command: create an admin user in the catalogue.
 *
 * Reads the USERNAME and COMMENT options via getRequired() and passes them, together with the
 * identity of the caller, to m_catalogue.createAdminUser().
 *
 * @param[out]    response    Response protobuf; its type is set to RSP_SUCCESS
 */
void RequestMessage::processAdmin_Add(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // Both options are fetched before the catalogue is touched
   auto &adminName    = getRequired(OptionString::USERNAME);
   auto &adminComment = getRequired(OptionString::COMMENT);

   m_catalogue.createAdminUser(m_cliIdentity, adminName, adminComment);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



799
/*!
 * Handle the "admin ch" command: change the comment stored for an admin user.
 *
 * Reads the USERNAME and COMMENT options via getRequired() and passes them, together with the
 * identity of the caller, to m_catalogue.modifyAdminUserComment().
 *
 * @param[out]    response    Response protobuf; its type is set to RSP_SUCCESS
 */
void RequestMessage::processAdmin_Ch(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // Both options are fetched before the catalogue is touched
   auto &adminName  = getRequired(OptionString::USERNAME);
   auto &newComment = getRequired(OptionString::COMMENT);

   m_catalogue.modifyAdminUserComment(m_cliIdentity, adminName, newComment);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



813
/*!
 * Handle the "admin rm" command: delete an admin user from the catalogue.
 *
 * Reads the USERNAME option via getRequired() and passes it to m_catalogue.deleteAdminUser().
 *
 * @param[out]    response    Response protobuf; its type is set to RSP_SUCCESS
 */
void RequestMessage::processAdmin_Rm(cta::xrd::Response &response)
{
   using namespace cta::admin;

   auto &adminName = getRequired(OptionString::USERNAME);

   m_catalogue.deleteAdminUser(adminName);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



826
/*!
 * Handle the "admin ls" command: set up a stream which returns the list of admin users.
 *
 * @param[out]    response    Response protobuf; the ADMIN_LS header type and RSP_SUCCESS are set
 * @param[out]    stream      Set to a newly-allocated AdminLsStream
 *                            (NOTE(review): the stream is not freed here; presumably the caller
 *                            or the XrdSsi framework takes ownership -- confirm)
 */
void RequestMessage::processAdmin_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
{
  using namespace cta::admin;

  // The results are returned through an XrdSsi stream object rather than in the response itself
  stream = new AdminLsStream(*this, m_catalogue, m_scheduler);

  // Column headers for the streamed records, then the overall status
  response.set_show_header(HeaderType::ADMIN_LS);
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



839
/*!
 * Handle the "archivefile ls" command: set up a stream which returns the archive file listing.
 *
 * The header type placed in the response depends on whether the SUMMARY flag was given.
 *
 * @param[out]    response    Response protobuf; the header type and RSP_SUCCESS are set
 * @param[out]    stream      Set to a newly-allocated ArchiveFileLsStream
 *                            (NOTE(review): the stream is not freed here; presumably the caller
 *                            or the XrdSsi framework takes ownership -- confirm)
 */
void RequestMessage::processArchiveFile_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
{
  using namespace cta::admin;

  // The results are returned through an XrdSsi stream object rather than in the response itself
  stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler);

  // Pick the column headers: the summary view has its own header type
  const auto headerType = has_flag(OptionBoolean::SUMMARY) ? HeaderType::ARCHIVEFILE_LS_SUMMARY
                                                           : HeaderType::ARCHIVEFILE_LS;
  response.set_show_header(headerType);

  response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



855
/*!
 * Handle the "archiveroute add" command: create an archive route in the catalogue.
 *
 * Reads the STORAGE_CLASS, COPY_NUMBER, TAPE_POOL and COMMENT options via getRequired() and
 * passes them, together with the identity of the caller, to m_catalogue.createArchiveRoute().
 *
 * @param[out]    response    Response protobuf; its type is set to RSP_SUCCESS
 */
void RequestMessage::processArchiveRoute_Add(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // All four options are fetched, in this order, before the catalogue is touched
   auto &storageClassName = getRequired(OptionString::STORAGE_CLASS);
   auto &copyNumber       = getRequired(OptionUInt64::COPY_NUMBER);
   auto &tapePoolName     = getRequired(OptionString::TAPE_POOL);
   auto &routeComment     = getRequired(OptionString::COMMENT);

   m_catalogue.createArchiveRoute(m_cliIdentity, storageClassName, copyNumber, tapePoolName, routeComment);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



871
void RequestMessage::processArchiveRoute_Ch(cta::xrd::Response &response)
872
873
874
{
   using namespace cta::admin;

875
876
877
878
   auto &scn      = getRequired(OptionString::STORAGE_CLASS);
   auto &cn       = getRequired(OptionUInt64::COPY_NUMBER);
   auto  tapepool = getOptional(OptionString::TAPE_POOL);
   auto  comment  = getOptional(OptionString::COMMENT);
879

880
   if(comment) {
881
      m_catalogue.modifyArchiveRouteComment(m_cliIdentity, scn, cn, comment.value());
882
   }
883
   if(tapepool) {
884
      m_catalogue.modifyArchiveRouteTapePoolName(m_cliIdentity, scn, cn, ta