XrdSsiCtaRequestMessage.cpp 75.2 KB
Newer Older
1
2
3
/*!
 * @project        The CERN Tape Archive (CTA)
 * @brief          XRootD EOS Notification handler
4
 * @copyright      Copyright 2019 CERN
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include <XrdSsiPbException.hpp>
20
21
using XrdSsiPb::PbException;

22
#include <cmdline/CtaAdminCmdParse.hpp>
23
#include "XrdSsiCtaRequestMessage.hpp"
24
#include "XrdCtaAdminLs.hpp"
25
#include "XrdCtaArchiveFileLs.hpp"
26
#include "XrdCtaArchiveRouteLs.hpp"
27
#include "XrdCtaDriveLs.hpp"
28
#include "XrdCtaFailedRequestLs.hpp"
29
#include "XrdCtaGroupMountRuleLs.hpp"
30
#include "XrdCtaListPendingQueue.hpp"
31
#include "XrdCtaLogicalLibraryLs.hpp"
32
#include "XrdCtaMountPolicyLs.hpp"
33
#include "XrdCtaMediaTypeLs.hpp"
34
#include "XrdCtaRepackLs.hpp"
35
#include "XrdCtaRequesterMountRuleLs.hpp"
36
#include "XrdCtaShowQueues.hpp"
37
#include "XrdCtaTapeLs.hpp"
38
#include "XrdCtaTapeFileLs.hpp"
39
#include "XrdCtaStorageClassLs.hpp"
40
#include "XrdCtaTapePoolLs.hpp"
41
#include "XrdCtaDiskSystemLs.hpp"
42
#include "XrdCtaVirtualOrganizationLs.hpp"
43
#include "XrdCtaVersion.hpp"
44

45
46
47
#include <limits>
#include <sstream>

48
49
namespace cta {
namespace xrd {
50

51
52
53
54
55
// Codes to change colours for console output (when sending a response to cta-admin)
const char* const TEXT_RED    = "\x1b[31;1m";
const char* const TEXT_NORMAL = "\x1b[0m\n";


56
57
58
/*
 * Convert AdminCmd <Cmd, SubCmd> pair to an integer so that it can be used in a switch statement
 */
59
constexpr unsigned int cmd_pair(cta::admin::AdminCmd::Cmd cmd, cta::admin::AdminCmd::SubCmd subcmd) {
60
61
62
63
   return (cmd << 16) + subcmd;
}


64
void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Response &response, XrdSsiStream* &stream)
65
66
{
   // Branch on the Request payload type
67

68
69
70
   switch(request.request_case())
   {
      using namespace cta::xrd;
71

72
      case Request::kAdmincmd: {
73
        
74
75
         // Validate that the Kerberos user is an authorized CTA Admin user
         if(m_protocol != Protocol::KRB5) {
76
            throw cta::exception::UserError("[ERROR] Admin commands must be authenticated using the Kerberos 5 protocol.");
77
78
         }
         m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);
79
         
80
81
         cta::utils::Timer t;

82
83
         // Validate the Protocol Buffer and import options into maps
         importOptions(request.admincmd());
84
85
86
87
         
         m_client_versions.ctaVersion = request.client_cta_version();
         m_client_versions.xrootdSsiProtoIntVersion = request.client_xrootd_ssi_protobuf_interface_version();
         
88
89
90
91
         // Map the <Cmd, SubCmd> to a method
         switch(cmd_pair(request.admincmd().cmd(), request.admincmd().subcmd())) {
            using namespace cta::admin;

92
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_ADD):
93
               processAdmin_Add(response);
94
95
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_CH):
96
               processAdmin_Ch(response);
97
98
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_RM):
99
               processAdmin_Rm(response);
100
101
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_LS):
102
               processAdmin_Ls(response, stream);
103
104
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEFILE, AdminCmd::SUBCMD_LS):
105
               processArchiveFile_Ls(response, stream);
106
107
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_ADD):
108
               processArchiveRoute_Add(response);
109
110
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_CH):
111
               processArchiveRoute_Ch(response);
112
113
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_RM):
114
               processArchiveRoute_Rm(response);
115
116
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_LS):
117
               processArchiveRoute_Ls(response, stream);
118
119
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_UP):
120
               processDrive_Up(response);
121
122
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_DOWN):
123
               processDrive_Down(response);
124
               break;
125
126
127
           case cmd_pair(AdminCmd::CMD_DRIVE,AdminCmd::SUBCMD_CH):
             processDrive_Ch(response);
             break;
128
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_LS):
129
               processDrive_Ls(response, stream);
130
               break;
131
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_RM):
132
               processDrive_Rm(response);
133
               break;
134
            case cmd_pair(AdminCmd::CMD_FAILEDREQUEST, AdminCmd::SUBCMD_LS):
135
               processFailedRequest_Ls(response, stream);
136
               break;
137
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_ADD):
138
               processGroupMountRule_Add(response);
139
140
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_CH):
141
               processGroupMountRule_Ch(response);
142
143
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_RM):
144
               processGroupMountRule_Rm(response);
145
               break;
146
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_LS):
147
               processGroupMountRule_Ls(response, stream);
148
149
               break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGARCHIVES, AdminCmd::SUBCMD_NONE):
150
               processListPendingArchives(response, stream);
151
152
               break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGRETRIEVES, AdminCmd::SUBCMD_NONE):
153
               processListPendingRetrieves(response, stream);
154
155
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_ADD):
156
               processLogicalLibrary_Add(response);
157
158
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_CH):
159
               processLogicalLibrary_Ch(response);
160
161
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_RM):
162
               processLogicalLibrary_Rm(response);
163
164
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_LS):
165
               processLogicalLibrary_Ls(response, stream);
166
               break;
167
168
169
170
171
172
173
174
175
176
177
178
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_ADD):
               processMediaType_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_CH):
               processMediaType_Ch(response);
               break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_RM):
               processMediaType_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_LS):
               processMediaType_Ls(response, stream);
               break;
179
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_ADD):
180
               processMountPolicy_Add(response);
181
182
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_CH):
183
               processMountPolicy_Ch(response);
184
185
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_RM):
186
               processMountPolicy_Rm(response);
187
188
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS):
189
               processMountPolicy_Ls(response, stream);
190
191
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD):
192
               processRepack_Add(response);
193
194
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM):
195
               processRepack_Rm(response);
196
197
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS):
198
               processRepack_Ls(response, stream);
199
200
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR):
201
               processRepack_Err(response);
202
203
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_ADD):
204
               processRequesterMountRule_Add(response);
205
206
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_CH):
207
               processRequesterMountRule_Ch(response);
208
209
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_RM):
210
               processRequesterMountRule_Rm(response);
211
212
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_LS):
213
               processRequesterMountRule_Ls(response, stream);
214
215
               break;
            case cmd_pair(AdminCmd::CMD_SHOWQUEUES, AdminCmd::SUBCMD_NONE):
216
               processShowQueues(response, stream);
217
218
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_ADD):
219
               processStorageClass_Add(response);
220
221
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_CH):
222
               processStorageClass_Ch(response);
223
224
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_RM):
225
               processStorageClass_Rm(response);
226
227
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_LS):
228
               processStorageClass_Ls(response, stream);
229
230
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_ADD):
231
               processTape_Add(response);
232
233
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_CH):
234
               processTape_Ch(response);
235
236
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RM):
237
               processTape_Rm(response);
238
239
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RECLAIM):
240
               processTape_Reclaim(response);
241
242
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LS):
243
               processTape_Ls(response, stream);
244
245
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LABEL):
246
               processTape_Label(response);
247
               break;
248
249
250
            case cmd_pair(AdminCmd::CMD_TAPEFILE, AdminCmd::SUBCMD_LS):
               processTapeFile_Ls(response, stream);
               break;
251
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_ADD):
252
               processTapePool_Add(response);
253
254
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_CH):
255
               processTapePool_Ch(response);
256
257
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_RM):
258
               processTapePool_Rm(response);
259
260
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_LS):
261
               processTapePool_Ls(response, stream);
262
               break;
263
264
265
266
267
268
269
270
271
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_LS):
               processDiskSystem_Ls(response, stream);
               break;   
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_ADD):
               processDiskSystem_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_RM):
               processDiskSystem_Rm(response);
               break;  
272
273
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_CH):
               processDiskSystem_Ch(response);
274
275
276
277
278
279
280
281
282
283
284
285
286
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_ADD):
               processVirtualOrganization_Add(response);
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_CH):
               processVirtualOrganization_Ch(response);
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_RM):
               processVirtualOrganization_Rm(response);
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION,AdminCmd::SUBCMD_LS):
               processVirtualOrganization_Ls(response, stream);
               break;
287
288
289
           case cmd_pair(AdminCmd::CMD_VERSION, AdminCmd::SUBCMD_NONE):
             processVersion(response, stream);
             break;
290
291
292
293
294
            default:
               throw PbException("Admin command pair <" +
                     AdminCmd_Cmd_Name(request.admincmd().cmd()) + ", " +
                     AdminCmd_SubCmd_Name(request.admincmd().subcmd()) +
                     "> is not implemented.");
295
296
297
298
299
            } // end switch

            // Log the admin command
            logAdminCmd(__FUNCTION__, request.admincmd(), t);
         } // end case Request::kAdmincmd
300
         break;
301

302
      case Request::kNotification:
303
         // Validate that instance name in key used to authenticate matches instance name in Protocol buffer
304
         if(m_cliIdentity.username != request.notification().wf().instance().name()) {
305
306
307
308
309
310
311
312
313
314
315
316
317
            // Special case: allow KRB5 authentication for CLOSEW and PREPARE events, to allow operators
            // to use a command line tool to resubmit failed archive or prepare requests. This is NOT
            // permitted for DELETE events as we don't want files removed from the catalogue to be left
            // in the EOS namespace.
            if(m_protocol == Protocol::KRB5 &&
               (request.notification().wf().event() == cta::eos::Workflow::CLOSEW ||
                request.notification().wf().event() == cta::eos::Workflow::PREPARE)) {
               m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);
               m_cliIdentity.username = request.notification().wf().instance().name();
            } else {
               throw PbException("Instance name \"" + request.notification().wf().instance().name() +
                                 "\" does not match key identifier \"" + m_cliIdentity.username + "\"");
            }
318
319
         }

320
         // Map the Workflow Event to a method
321
322
323
         switch(request.notification().wf().event()) {
            using namespace cta::eos;

324
325
326
            case Workflow::OPENW:
               processOPENW (request.notification(), response);
               break;
327
328
329
            case Workflow::CREATE:
               processCREATE (request.notification(), response);
               break;
330
331
332
333
334
335
            case Workflow::CLOSEW:
               processCLOSEW (request.notification(), response);
               break;
            case Workflow::PREPARE:
               processPREPARE(request.notification(), response);
               break;
336
337
338
            case Workflow::ABORT_PREPARE:
               processABORT_PREPARE(request.notification(), response);
               break;
339
340
341
            case Workflow::DELETE:
               processDELETE (request.notification(), response);
               break;
342
343
344
            case Workflow::UPDATE_FID:
               processUPDATE_FID (request.notification(), response);
               break;
345

346
347
348
349
350
            default:
               throw PbException("Workflow event " +
                     Workflow_EventType_Name(request.notification().wf().event()) +
                     " is not implemented.");
         }
351
         break;
352

353
354
      case Request::REQUEST_NOT_SET:
         throw PbException("Request message has not been set.");
355
356

      default:
357
358
         throw PbException("Unrecognized Request message. "
                           "Possible Protocol Buffer version mismatch between client and server.");
359
360
361
362
363
   }
}



364
365
// EOS Workflow commands

366
void RequestMessage::processOPENW(const cta::eos::Notification &notification, cta::xrd::Response &response)
367
368
369
370
{
   // Create a log entry

   cta::log::ScopedParamContainer params(m_lc);
371
   m_lc.log(cta::log::INFO, "In RequestMessage::processOPENW(): ignoring OPENW event.");
372
373
374
375
376
377

   // Set response type

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

378
379


380
void RequestMessage::processCREATE(const cta::eos::Notification &notification, cta::xrd::Response &response)
381
{
382
   // Validate received protobuf
383
384
   checkIsNotEmptyString(notification.cli().user().username(),  "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(), "notification.cli.user.groupname");
385

386
   // Unpack message
387
388
389
   cta::common::dataStructures::RequesterIdentity requester;
   requester.name  = notification.cli().user().username();
   requester.group = notification.cli().user().groupname();
390

391
   auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class");
392
   if(notification.file().xattr().end() == storageClassItor) {
393
394
395
396
397
     // Fall back to old xattr format
     storageClassItor = notification.file().xattr().find("CTA_StorageClass");
     if(notification.file().xattr().end() == storageClassItor) {
       throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is not set");
     }
398
399
   }
   const std::string storageClass = storageClassItor->second;
400
   if(storageClass.empty()) {
401
     throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is set to an empty string");
402
   }
403
404

   cta::utils::Timer t;
405
   uint64_t archiveFileId;
406

407
408
409
410
411
412
   // For testing, this storage class will always fail on CLOSEW. Allow it to pass CREATE and don't allocate an archive Id from the pool.
   if(storageClassItor->second == "fail_on_closew_test") {
     archiveFileId = std::numeric_limits<uint64_t>::max();
   } else {
     archiveFileId = m_scheduler.checkAndGetNextArchiveFileId(m_cliIdentity.username, storageClass, requester, m_lc);
   }
413

414
415
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
416
417
   params.add("diskFileId", std::to_string(notification.file().fid()))
         .add("diskFilePath", notification.file().lpath())
418
         .add("fileId", archiveFileId)
419
         .add("schedulerTime", t.secs());
420
   m_lc.log(cta::log::INFO, "In RequestMessage::processCREATE(): assigning new archive file ID.");
421

422
   // Set ArchiveFileId in xattrs
423
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.file_id", std::to_string(archiveFileId)));
424
425
   
   // Set the storage class in xattrs
426
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.storage_class", storageClass));
427
428
429
430
431

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

432
433


434
void RequestMessage::processCLOSEW(const cta::eos::Notification &notification, cta::xrd::Response &response)
435
{
436
   // Validate received protobuf
437
438
439
440
441
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.wf().instance().url(),      "notification.wf.instance.url");
   checkIsNotEmptyString(notification.transport().report_url(),   "notification.transport.report_url");
442

443
   // Unpack message
444
   const auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class");
445
   if(notification.file().xattr().end() == storageClassItor) {
446
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.storage_class");
447
448
   }

449
   // For testing: this storage class will always fail
450
451
   if(storageClassItor->second == "fail_on_closew_test") {
      throw PbException("File is in fail_on_closew_test storage class, which always fails.");
452
453
   }

454
455
456
457
458
459
   // Disallow archival of files above the specified limit
   if(notification.file().size() > m_archiveFileMaxSize) {
      throw exception::UserError("Archive request rejected: file size (" + std::to_string(notification.file().size()) +
                                 " bytes) exceeds maximum allowed size (" + std::to_string(m_archiveFileMaxSize) + " bytes)");
   }

460
   cta::common::dataStructures::ArchiveRequest request;
461
   checksum::ProtobufToChecksumBlob(notification.file().csb(), request.checksumBlob);
462
463
464
465
466
467
468
469
470
471
472
473
474
475
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.diskFileID             = std::to_string(notification.file().fid());
   request.fileSize               = notification.file().size();
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.srcURL                 = notification.wf().instance().url();
   request.storageClass           = storageClassItor->second;
   request.archiveReportURL       = notification.transport().report_url();
   request.archiveErrorReportURL  = notification.transport().error_report_url();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);
476

477
478
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t
479
   const auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
480
   if(notification.file().xattr().end() == archiveFileIdItor) {
481
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
482
483
484
485
486
487
488
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   uint64_t archiveFileId = 0;
   if((archiveFileId = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
489

490
491
   cta::utils::Timer t;

492
493
494
495
496
497
498
499
500
501
502
503
504
505
   cta::log::ScopedParamContainer params(m_lc);
   std::string logMessage = "In RequestMessage::processCLOSEW(): ";
   if(request.fileSize > 0) {
     // Queue the request
     std::string archiveRequestAddr = m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc);
     logMessage += "queued file for archive.";
     params.add("schedulerTime", t.secs());
     params.add("archiveRequestId", archiveRequestAddr);

     // Add archive request reference to response as an extended attribute
     response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", archiveRequestAddr));
   } else {
     logMessage += "ignoring zero-length file.";
   }
506

507
   // Create a log entry
508
509
   params.add("fileId", archiveFileId);
   params.add("requesterInstance", notification.wf().requester_instance());
510
   m_lc.log(cta::log::INFO, logMessage);
511

512
   // Set response type
513
514
515
516
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}


517
void RequestMessage::processPREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
518
{
519
   // Validate received protobuf
520
521
522
523
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.transport().dst_url(),      "notification.transport.dst_url");
524

525
   // Unpack message
526
   cta::common::dataStructures::RetrieveRequest request;
527
528
529
530
531
532
533
534
535
536
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.dstURL                 = notification.transport().dst_url();
   request.errorReportURL         = notification.transport().error_report_url();
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);
537

538
539
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
540
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
541
   if(notification.file().xattr().end() == archiveFileIdItor) {
542
     // Fall back to the old xattr format
543
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
544
545
546
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
547
548
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
549
550
551
552
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
553
554
   
   // Activity value is a string. The parameter might be present or not.
555
   if(notification.file().xattr().find("activity") != notification.file().xattr().end()) {
556
     request.activity = notification.file().xattr().at("activity");
557
   }
558

559
560
   cta::utils::Timer t;

561
   // Queue the request
562
   std::string retrieveReqId = m_scheduler.queueRetrieve(m_cliIdentity.username, request, m_lc);
563

564
565
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
566
567
   params.add("fileId", request.archiveFileID).add("schedulerTime", t.secs())
         .add("retrieveReqId", retrieveReqId);
568
569
570
   if(static_cast<bool>(request.activity)) {
     params.add("activity", request.activity.value());
   }
571
   m_lc.log(cta::log::INFO, "In RequestMessage::processPREPARE(): queued file for retrieve.");
572

573
   // Set response type and add retrieve request reference as an extended attribute.
574
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId));
575
576
577
578
579
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



580
581
582
583
584
585
586
void RequestMessage::processABORT_PREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");

   // Unpack message
587
   cta::common::dataStructures::CancelRetrieveRequest request;
588
589
   request.requester.name   = notification.cli().user().username();
   request.requester.group  = notification.cli().user().groupname();
590
591
592

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
593
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
594
   if(notification.file().xattr().end() == archiveFileIdItor) {
595
     // Fall back to the old xattr format
596
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
597
598
599
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
600
601
602
603
604
605
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
606
607
   
   // The request Id should be stored as an extended attribute
608
   const auto retrieveRequestIdItor = notification.file().xattr().find("sys.cta.objectstore.id");
609
   if(notification.file().xattr().end() == retrieveRequestIdItor) {
610
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.cta.objectstore.id");
611
   }
612
613
   const std::string retrieveRequestId = retrieveRequestIdItor->second;
   request.retrieveRequestId = retrieveRequestId;
614
615

   // Queue the request
616
   m_scheduler.abortRetrieve(m_cliIdentity.username, request, m_lc);
617

618
619
   cta::utils::Timer t;

620
621
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
622
623
624
625
626
   params.add("fileId", request.archiveFileID)
         .add("schedulerTime", t.secs())
         .add("retrieveRequestId", request.retrieveRequestId)
         .add("diskFilePath", cta::utils::midEllipsis(request.diskFileInfo.path, 100));
   m_lc.log(cta::log::INFO, "In RequestMessage::processABORT_PREPARE(): canceled retrieve request.");
627

628
   // Set response type and remove reference to retrieve request in EOS extended attributes.
629
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", ""));
630
631
632
633
634
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



635
void RequestMessage::processDELETE(const cta::eos::Notification &notification, cta::xrd::Response &response)
636
{
637
   // Validate received protobuf
638
639
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
640
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
641

642
   // Unpack message
643
   cta::common::dataStructures::DeleteArchiveRequest request;
644
645
   request.requester.name    = notification.cli().user().username();
   request.requester.group   = notification.cli().user().groupname();
646
   
647
648
649
650
651
   std::string lpath         = notification.file().lpath();
   uint64_t diskFileId       = notification.file().fid();
   request.diskFilePath          = lpath;
   request.diskFileId = std::to_string(diskFileId);
   request.diskInstance = m_cliIdentity.username;
652
653
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t
654
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
655
   if(notification.file().xattr().end() == archiveFileIdItor) {
656
     // Fall back to the old xattr format
657
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
658
659
660
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
661
662
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
663
664
665
666
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
667
668
669
670
   
   auto archiveRequestAddrItor = notification.file().xattr().find("sys.cta.archive.objectstore.id");
   if(archiveRequestAddrItor != notification.file().xattr().end()){
     //We have the ArchiveRequest's objectstore address.
671
672
673
674
     std::string objectstoreAddress = archiveRequestAddrItor->second;
     if(!objectstoreAddress.empty()){
      request.address = archiveRequestAddrItor->second;
     }
675
   }
676

677
   // Delete the file from the catalogue or from the objectstore if archive request is created
678
   cta::utils::Timer t;
679
   m_scheduler.deleteArchive(m_cliIdentity.username, request, m_lc);
680
681
682

   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
683
684
   params.add("fileId", request.archiveFileID)
         .add("address", (request.address ? request.address.value() : "null"))
685
         .add("filePath",request.diskFilePath)
686
         .add("schedulerTime", t.secs());
687
   m_lc.log(cta::log::INFO, "In RequestMessage::processDELETE(): archive file deleted.");
688
689
690
691
692

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

693
694


695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
void RequestMessage::processUPDATE_FID(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.file().lpath(),  "notification.file.lpath");

   // Unpack message
   const std::string &diskInstance = m_cliIdentity.username;
   const std::string &diskFilePath = notification.file().lpath();
   const std::string diskFileId = std::to_string(notification.file().fid());

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
   if(notification.file().xattr().end() == archiveFileIdItor) {
     // Fall back to the old xattr format
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   const uint64_t archiveFileId = strtoul(archiveFileIdStr.c_str(), nullptr, 10);
   if(0 == archiveFileId) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
   
   // Update the disk file ID
   cta::utils::Timer t;
   m_catalogue.updateDiskFileId(archiveFileId, diskInstance, diskFileId);

   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", archiveFileId)
         .add("schedulerTime", t.secs())
         .add("diskInstance", diskInstance)
         .add("diskFilePath", diskFilePath)
         .add("diskFileId", diskFileId);
   m_lc.log(cta::log::INFO, "In RequestMessage::processUPDATE_FID(): updated disk file ID.");

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



740
741
// Admin commands

742
743
744
745
void RequestMessage::logAdminCmd(const std::string &function, const cta::admin::AdminCmd &admincmd, cta::utils::Timer &t)
{
   using namespace cta::admin;

746
   std::string log_msg = "In RequestMessage::" + function + "(): Admin command succeeded: ";
747
748
749

   // Reverse lookup of strings corresponding to <command,subcommand> pair
   for(auto cmd_it = cmdLookup.begin(); cmd_it != cmdLookup.end(); ++cmd_it) {
750
751
      // Return the matching long string (length > 3)
      if(admincmd.cmd() == cmd_it->second && cmd_it->first.length() > 3) {
752
753
754
755
756
757
758
759
760
761
762
763
764
         log_msg += cmd_it->first + ' ';
         break;
      }
   }
   for(auto subcmd_it = subcmdLookup.begin(); subcmd_it != subcmdLookup.end(); ++subcmd_it) {
      if(admincmd.subcmd() == subcmd_it->second) {
         log_msg += subcmd_it->first;
         break;
      }
   }

   // Add the log message
   cta::log::ScopedParamContainer params(m_lc);
765
   params.add("adminTime", t.secs());
766
767
768
769
770
   m_lc.log(cta::log::INFO, log_msg);
}



771
void RequestMessage::processAdmin_Add(cta::xrd::Response &response)
772
773
774
{
   using namespace cta::admin;

775
776
   auto &username = getRequired(OptionString::USERNAME);
   auto &comment  = getRequired(OptionString::COMMENT);
777
778

   m_catalogue.createAdminUser(m_cliIdentity, username, comment);
779
780
781
782
783
784

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



785
void RequestMessage::processAdmin_Ch(cta::xrd::Response &response)
786
787
788
{
   using namespace cta::admin;

789
790
   auto &username = getRequired(OptionString::USERNAME);
   auto &comment  = getRequired(OptionString::COMMENT);
791
792

   m_catalogue.modifyAdminUserComment(m_cliIdentity, username, comment);
793
794
795
796
797
798

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



799
void RequestMessage::processAdmin_Rm(cta::xrd::Response &response)
800
801
802
{
   using namespace cta::admin;

803
   auto &username = getRequired(OptionString::USERNAME);
804
805

   m_catalogue.deleteAdminUser(username);
806
807
808
809
810
811

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



812
void RequestMessage::processAdmin_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
813
{
814
  using namespace cta::admin;
815

816
817
  // Create a XrdSsi stream object to return the results
  stream = new AdminLsStream(*this, m_catalogue, m_scheduler);
818

819
  response.set_show_header(HeaderType::ADMIN_LS);
820
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
821
822
823
824
}



825
void RequestMessage::processArchiveFile_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
826
{
827
  using namespace cta::admin;
828

829
830
  // Create a XrdSsi stream object to return the results
  stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler);
831

832
833
834
  // Set correct column headers
  response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::ARCHIVEFILE_LS_SUMMARY
                                                            : HeaderType::ARCHIVEFILE_LS);
835

836
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
837
838
839
840
}



841
void RequestMessage::processArchiveRoute_Add(cta::xrd::Response &response)
842
843
844
{
   using namespace cta::admin;

845
846
847
848
   auto &scn      = getRequired(OptionString::STORAGE_CLASS);
   auto &cn       = getRequired(OptionUInt64::COPY_NUMBER);
   auto &tapepool = getRequired(OptionString::TAPE_POOL);
   auto &comment  = getRequired(OptionString::COMMENT);
849

850
   m_catalogue.createArchiveRoute(m_cliIdentity, scn, cn, tapepool, comment);
851

852
853
854
855
856
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



857
void RequestMessage::processArchiveRoute_Ch(cta::xrd::Response &response)
858
859
860
{
   using namespace cta::admin;

861
862
863
864
   auto &scn      = getRequired(OptionString::STORAGE_CLASS);
   auto &cn       = getRequired(OptionUInt64::COPY_NUMBER);
   auto  tapepool = getOptional(OptionString::TAPE_POOL);
   auto  comment  = getOptional(OptionString::COMMENT);
865

866
   if(comment) {
867
      m_catalogue.modifyArchiveRouteComment(m_cliIdentity, scn, cn, comment.value());
868
   }
869
   if(tapepool) {
870
      m_catalogue.modifyArchiveRouteTapePoolName(m_cliIdentity, scn, cn, tapepool.value());
871
   }
872
873
874
875
876
877

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



878
void RequestMessage::processArchiveRoute_Rm(cta::xrd::Response &response)
879
880
881
{
   using namespace cta::admin;

882
883
   auto &scn = getRequired(OptionString::STORAGE_CLASS);
   auto &cn  = getRequired(