XrdSsiCtaRequestMessage.cpp 78.1 KB
Newer Older
1
2
3
/*!
 * @project        The CERN Tape Archive (CTA)
 * @brief          XRootD EOS Notification handler
 * @copyright      Copyright 2019 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include <XrdSsiPbException.hpp>
20
21
using XrdSsiPb::PbException;

22
#include "common/utils/Regex.hpp"
23
#include <cmdline/CtaAdminCmdParse.hpp>
24
#include "XrdSsiCtaRequestMessage.hpp"
25
#include "XrdCtaAdminLs.hpp"
26
#include "XrdCtaArchiveFileLs.hpp"
27
#include "XrdCtaArchiveRouteLs.hpp"
28
#include "XrdCtaDriveLs.hpp"
29
#include "XrdCtaFailedRequestLs.hpp"
30
#include "XrdCtaGroupMountRuleLs.hpp"
31
#include "XrdCtaListPendingQueue.hpp"
32
#include "XrdCtaLogicalLibraryLs.hpp"
33
#include "XrdCtaMountPolicyLs.hpp"
34
#include "XrdCtaMediaTypeLs.hpp"
35
#include "XrdCtaRepackLs.hpp"
36
#include "XrdCtaRequesterMountRuleLs.hpp"
37
#include "XrdCtaShowQueues.hpp"
38
#include "XrdCtaTapeLs.hpp"
39
#include "XrdCtaTapeFileLs.hpp"
40
#include "XrdCtaStorageClassLs.hpp"
41
#include "XrdCtaTapePoolLs.hpp"
42
#include "XrdCtaDiskSystemLs.hpp"
43
#include "XrdCtaVirtualOrganizationLs.hpp"
44
#include "XrdCtaVersion.hpp"
45

46
47
48
#include <limits>
#include <sstream>

49
50
namespace cta {
namespace xrd {
51

52
53
54
55
56
// ANSI escape sequences used to change colours for console output (when sending a response to
// cta-admin).
// TEXT_RED selects bold red text (SGR parameters 31 and 1).
const char* const TEXT_RED    = "\x1b[31;1m";
// TEXT_NORMAL resets all text attributes (SGR 0). Note that the literal also ends with a newline.
const char* const TEXT_NORMAL = "\x1b[0m\n";


57
58
59
/*
 * Convert AdminCmd <Cmd, SubCmd> pair to an integer so that it can be used in a switch statement
 */
60
constexpr unsigned int cmd_pair(cta::admin::AdminCmd::Cmd cmd, cta::admin::AdminCmd::SubCmd subcmd) {
61
62
63
64
   return (cmd << 16) + subcmd;
}


65
/*!
 * Dispatch a request to the handler which matches its payload type
 *
 * Admin commands must be authenticated with Kerberos 5 and authorized by the scheduler. They are
 * then routed on their <Cmd, SubCmd> pair and logged. EOS notifications are checked against the
 * identity of the client and routed on their workflow event type.
 *
 * @param[in]     request     Request received from the client
 * @param[out]    response    Response, filled in by the selected handler
 * @param[out]    stream      Stream pointer, passed on to the handlers which take one
 *
 * @throws cta::exception::UserError if an admin command was not authenticated using Kerberos 5
 * @throws PbException if the request is not set, not recognized or not implemented, or if a
 *         notification fails the instance name or path checks
 */
void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Response &response, XrdSsiStream* &stream)
{
   // Branch on the Request payload type
   switch(request.request_case())
   {
      using namespace cta::xrd;

      case Request::kAdmincmd: {
         // Only Kerberos-authenticated clients which the scheduler authorizes may run admin commands
         if(m_protocol != Protocol::KRB5) {
            throw cta::exception::UserError("[ERROR] Admin commands must be authenticated using the Kerberos 5 protocol.");
         }
         m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);

         cta::utils::Timer adminTimer;

         const auto &adminCmd = request.admincmd();

         // Validate the Protocol Buffer and import options into maps
         importOptions(adminCmd);

         // Record the versions reported by the client
         m_client_versions.ctaVersion = request.client_cta_version();
         m_client_versions.xrootdSsiProtoIntVersion = request.client_xrootd_ssi_protobuf_interface_version();

         // Map the <Cmd, SubCmd> to a method
         switch(cmd_pair(adminCmd.cmd(), adminCmd.subcmd())) {
            using namespace cta::admin;

            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_ADD): processAdmin_Add(response); break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_CH):  processAdmin_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_RM):  processAdmin_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_LS):  processAdmin_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_ARCHIVEFILE, AdminCmd::SUBCMD_LS): processArchiveFile_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_ADD): processArchiveRoute_Add(response); break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_CH):  processArchiveRoute_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_RM):  processArchiveRoute_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_LS):  processArchiveRoute_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_UP):   processDrive_Up(response); break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_DOWN): processDrive_Down(response); break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_CH):   processDrive_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_LS):   processDrive_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_RM):   processDrive_Rm(response); break;

            case cmd_pair(AdminCmd::CMD_FAILEDREQUEST, AdminCmd::SUBCMD_LS): processFailedRequest_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_ADD): processGroupMountRule_Add(response); break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_CH):  processGroupMountRule_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_RM):  processGroupMountRule_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_LS):  processGroupMountRule_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_LISTPENDINGARCHIVES, AdminCmd::SUBCMD_NONE):  processListPendingArchives(response, stream); break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGRETRIEVES, AdminCmd::SUBCMD_NONE): processListPendingRetrieves(response, stream); break;

            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_ADD): processLogicalLibrary_Add(response); break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_CH):  processLogicalLibrary_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_RM):  processLogicalLibrary_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_LS):  processLogicalLibrary_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_ADD): processMediaType_Add(response); break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_CH):  processMediaType_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_RM):  processMediaType_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_LS):  processMediaType_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_ADD): processMountPolicy_Add(response); break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_CH):  processMountPolicy_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_RM):  processMountPolicy_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS):  processMountPolicy_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD): processRepack_Add(response); break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM):  processRepack_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS):  processRepack_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR): processRepack_Err(response); break;

            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_ADD): processRequesterMountRule_Add(response); break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_CH):  processRequesterMountRule_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_RM):  processRequesterMountRule_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_LS):  processRequesterMountRule_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_SHOWQUEUES, AdminCmd::SUBCMD_NONE): processShowQueues(response, stream); break;

            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_ADD): processStorageClass_Add(response); break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_CH):  processStorageClass_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_RM):  processStorageClass_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_LS):  processStorageClass_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_ADD):     processTape_Add(response); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_CH):      processTape_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RM):      processTape_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RECLAIM): processTape_Reclaim(response); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LS):      processTape_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LABEL):   processTape_Label(response); break;

            case cmd_pair(AdminCmd::CMD_TAPEFILE, AdminCmd::SUBCMD_LS): processTapeFile_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_ADD): processTapePool_Add(response); break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_CH):  processTapePool_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_RM):  processTapePool_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_LS):  processTapePool_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_LS):  processDiskSystem_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_ADD): processDiskSystem_Add(response); break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_RM):  processDiskSystem_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_CH):  processDiskSystem_Ch(response); break;

            case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_ADD): processVirtualOrganization_Add(response); break;
            case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_CH):  processVirtualOrganization_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_RM):  processVirtualOrganization_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_LS):  processVirtualOrganization_Ls(response, stream); break;

            case cmd_pair(AdminCmd::CMD_VERSION, AdminCmd::SUBCMD_NONE): processVersion(response, stream); break;

            default:
               throw PbException("Admin command pair <" +
                     AdminCmd_Cmd_Name(adminCmd.cmd()) + ", " +
                     AdminCmd_SubCmd_Name(adminCmd.subcmd()) +
                     "> is not implemented.");
         } // end switch

         // Log the admin command
         logAdminCmd(__FUNCTION__, adminCmd, adminTimer);
         break;
      } // end case Request::kAdmincmd

      case Request::kNotification: {
         const auto &notification = request.notification();
         const auto &workflow     = notification.wf();
         const auto &instanceName = workflow.instance().name();
         const auto &lpath        = notification.file().lpath();
         const auto  event        = workflow.event();

         // Validate that instance name in key used to authenticate matches instance name in Protocol buffer
         if(m_cliIdentity.username != instanceName) {
            // Special case: allow KRB5 authentication for CLOSEW and PREPARE events, to allow operators
            // to use a command line tool to resubmit failed archive or prepare requests. This is NOT
            // permitted for DELETE events as we don't want files removed from the catalogue to be left
            // in the EOS namespace.
            const bool isOperatorResubmit = m_protocol == Protocol::KRB5 &&
               (event == cta::eos::Workflow::CLOSEW || event == cta::eos::Workflow::PREPARE);
            if(!isOperatorResubmit) {
               throw PbException("Instance name \"" + instanceName +
                                 "\" does not match key identifier \"" + m_cliIdentity.username + "\"");
            }
            m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);
            m_cliIdentity.username = instanceName;
         }

         // Enforce that the long EOS instance name starts with the string "eos"
         if(instanceName.compare(0, 3, "eos") != 0) {
            throw PbException("Instance name does not start with eos: instance=" + instanceName);
         }

         // Refuse any workflow events for files in /eos/INSTANCE_NAME/proc/
         const std::string shortInstanceName = instanceName.substr(3);
         if(shortInstanceName.empty()) {
            throw PbException("Instance name only contains the string eos");
         }
         const std::string procFullPath = "/eos/" + shortInstanceName + "/proc/";
         if(lpath.compare(0, procFullPath.size(), procFullPath) == 0) {
            throw PbException("Cannot process a workflow event for a file in " + procFullPath +
                              " instance=" + instanceName +
                              " event=" + Workflow_EventType_Name(event) +
                              " lpath=" + lpath);
         }

         // Map the Workflow Event to a method
         switch(event) {
            using namespace cta::eos;

            case Workflow::OPENW:         processOPENW(notification, response); break;
            case Workflow::CREATE:        processCREATE(notification, response); break;
            case Workflow::CLOSEW:        processCLOSEW(notification, response); break;
            case Workflow::PREPARE:       processPREPARE(notification, response); break;
            case Workflow::ABORT_PREPARE: processABORT_PREPARE(notification, response); break;
            case Workflow::DELETE:        processDELETE(notification, response); break;
            case Workflow::UPDATE_FID:    processUPDATE_FID(notification, response); break;

            default:
               throw PbException("Workflow event " +
                     Workflow_EventType_Name(event) +
                     " is not implemented.");
         }
         break;
      } // end case Request::kNotification

      case Request::REQUEST_NOT_SET:
         throw PbException("Request message has not been set.");

      default:
         throw PbException("Unrecognized Request message. "
                           "Possible Protocol Buffer version mismatch between client and server.");
   }
}



390
391
// EOS Workflow commands

392
/*!
 * Handle an OPENW workflow event
 *
 * The event is ignored: it is logged at INFO level and a success response is returned.
 *
 * @param[in]     notification    Notification received from EOS (not read by this handler)
 * @param[out]    response        Response, whose type is set to RSP_SUCCESS
 */
void RequestMessage::processOPENW(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Log that the event was received and ignored. No parameters are added to the scoped container.
   cta::log::ScopedParamContainer scopedParams(m_lc);
   m_lc.log(cta::log::INFO, "In RequestMessage::processOPENW(): ignoring OPENW event.");

   // Nothing else to do: report success
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

404
405


406
/*!
 * Handle a CREATE workflow event
 *
 * Obtains an archive file ID for the new file from the scheduler and returns it, together with
 * the storage class, as extended attributes in the response.
 *
 * @param[in]     notification    Notification received from EOS
 * @param[out]    response        Response, which receives the "sys.archive.file_id" and
 *                                "sys.archive.storage_class" extended attributes
 *
 * @throws PbException if the storage class extended attribute is missing or empty
 */
void RequestMessage::processCREATE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   using XattrPair = google::protobuf::MapPair<std::string,std::string>;

   const auto &user   = notification.cli().user();
   const auto &file   = notification.file();
   const auto &xattrs = file.xattr();

   // Validate received protobuf
   checkIsNotEmptyString(user.username(),  "notification.cli.user.username");
   checkIsNotEmptyString(user.groupname(), "notification.cli.user.groupname");

   // Unpack message
   cta::common::dataStructures::RequesterIdentity requester;
   requester.name  = user.username();
   requester.group = user.groupname();

   // Look up the storage class, falling back to the old xattr format if the new one is not set
   auto storageClassItor = xattrs.find("sys.archive.storage_class");
   if(storageClassItor == xattrs.end()) {
     storageClassItor = xattrs.find("CTA_StorageClass");
   }
   if(storageClassItor == xattrs.end()) {
     throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is not set");
   }
   const std::string &storageClass = storageClassItor->second;
   if(storageClass.empty()) {
     throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is set to an empty string");
   }

   cta::utils::Timer schedulerTimer;

   // For testing, the "fail_on_closew_test" storage class will always fail on CLOSEW. Allow it to
   // pass CREATE and don't allocate an archive Id from the pool.
   const uint64_t archiveFileId = (storageClass == "fail_on_closew_test")
     ? std::numeric_limits<uint64_t>::max()
     : m_scheduler.checkAndGetNextArchiveFileId(m_cliIdentity.username, storageClass, requester, m_lc);

   // Create a log entry
   cta::log::ScopedParamContainer logParams(m_lc);
   logParams.add("diskFileId", std::to_string(file.fid()));
   logParams.add("diskFilePath", file.lpath());
   logParams.add("fileId", archiveFileId);
   logParams.add("schedulerTime", schedulerTimer.secs());
   m_lc.log(cta::log::INFO, "In RequestMessage::processCREATE(): assigning new archive file ID.");

   // Return the archive file ID and the storage class to the client as xattrs
   auto *responseXattrs = response.mutable_xattr();
   responseXattrs->insert(XattrPair("sys.archive.file_id", std::to_string(archiveFileId)));
   responseXattrs->insert(XattrPair("sys.archive.storage_class", storageClass));

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

458
459


460
/*!
 * Handle a CLOSEW workflow event
 *
 * Builds an archive request from the notification and queues it with the scheduler under the
 * archive file ID stored in the "sys.archive.file_id" extended attribute. Zero-length files are
 * not queued. When a request is queued, its identifier is returned in the response as the
 * "sys.cta.objectstore.id" extended attribute.
 *
 * @param[in]     notification    Notification received from EOS
 * @param[out]    response        Response
 *
 * @throws PbException if a required extended attribute is missing, if the archive file ID is
 *         not a valid non-zero number, or if the file is in the "fail_on_closew_test" storage
 *         class
 * @throws exception::UserError if the file is larger than the maximum allowed archive file size
 */
void RequestMessage::processCLOSEW(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.wf().instance().url(),      "notification.wf.instance.url");
   checkIsNotEmptyString(notification.transport().report_url(),   "notification.transport.report_url");

   // Unpack message
   const auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class");
   if(notification.file().xattr().end() == storageClassItor) {
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.storage_class");
   }

   // For testing: this storage class will always fail
   if(storageClassItor->second == "fail_on_closew_test") {
      throw PbException("File is in fail_on_closew_test storage class, which always fails.");
   }

   // Disallow archival of files above the specified limit
   if(notification.file().size() > m_archiveFileMaxSize) {
      throw exception::UserError("Archive request rejected: file size (" + std::to_string(notification.file().size()) +
                                 " bytes) exceeds maximum allowed size (" + std::to_string(m_archiveFileMaxSize) + " bytes)");
   }

   cta::common::dataStructures::ArchiveRequest request;
   checksum::ProtobufToChecksumBlob(notification.file().csb(), request.checksumBlob);
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.diskFileID             = std::to_string(notification.file().fid());
   request.fileSize               = notification.file().size();
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.srcURL                 = notification.wf().instance().url();
   request.storageClass           = storageClassItor->second;
   request.archiveReportURL       = notification.transport().report_url();
   request.archiveErrorReportURL  = notification.transport().error_report_url();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t
   const auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
   if(notification.file().xattr().end() == archiveFileIdItor) {
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   // strtoull() stops at the first character which is not part of a number and negates the result
   // if the number has a leading minus sign. Reject the value unless the whole string was consumed
   // and it contains no minus sign, so that "123abc" or "-1" is not silently taken as an ID.
   char *parseEnd = nullptr;
   uint64_t archiveFileId = strtoull(archiveFileIdStr.c_str(), &parseEnd, 10);
   if(archiveFileId == 0 || *parseEnd != '\0' || archiveFileIdStr.find('-') != std::string::npos)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }

   cta::utils::Timer t;

   cta::log::ScopedParamContainer params(m_lc);
   std::string logMessage = "In RequestMessage::processCLOSEW(): ";
   if(request.fileSize > 0) {
     // Queue the request
     std::string archiveRequestAddr = m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc);
     logMessage += "queued file for archive.";
     params.add("schedulerTime", t.secs());
     params.add("archiveRequestId", archiveRequestAddr);

     // Add archive request reference to response as an extended attribute
     response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", archiveRequestAddr));
   } else {
     logMessage += "ignoring zero-length file.";
   }

   // Create a log entry
   params.add("fileId", archiveFileId);
   params.add("requesterInstance", notification.wf().requester_instance());
   m_lc.log(cta::log::INFO, logMessage);

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}


543
/*!
 * Handle a PREPARE workflow event
 *
 * Builds a retrieve request from the notification and queues it with the scheduler. The
 * identifier of the queued request is returned in the response as the "sys.cta.objectstore.id"
 * extended attribute.
 *
 * @param[in]     notification    Notification received from EOS
 * @param[out]    response        Response
 *
 * @throws PbException if the archive file ID extended attribute is missing or is not a valid
 *         non-zero number
 */
void RequestMessage::processPREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.transport().dst_url(),      "notification.transport.dst_url");

   // Unpack message
   cta::common::dataStructures::RetrieveRequest request;
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.dstURL                 = notification.transport().dst_url();
   request.errorReportURL         = notification.transport().error_report_url();
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
   if(notification.file().xattr().end() == archiveFileIdItor) {
     // Fall back to the old xattr format
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   // strtoull() stops at the first character which is not part of a number and negates the result
   // if the number has a leading minus sign. Reject the value unless the whole string was consumed
   // and it contains no minus sign, so that "123abc" or "-1" is not silently taken as an ID.
   char *parseEnd = nullptr;
   request.archiveFileID = strtoull(archiveFileIdStr.c_str(), &parseEnd, 10);
   if(request.archiveFileID == 0 || *parseEnd != '\0' || archiveFileIdStr.find('-') != std::string::npos)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }

   // Activity value is a string. The parameter might be present or not.
   const auto activityItor = notification.file().xattr().find("activity");
   if(activityItor != notification.file().xattr().end()) {
     request.activity = activityItor->second;
   }

   cta::utils::Timer t;

   // Queue the request
   std::string retrieveReqId = m_scheduler.queueRetrieve(m_cliIdentity.username, request, m_lc);

   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", request.archiveFileID).add("schedulerTime", t.secs())
         .add("retrieveReqId", retrieveReqId);
   if(static_cast<bool>(request.activity)) {
     params.add("activity", request.activity.value());
   }
   m_lc.log(cta::log::INFO, "In RequestMessage::processPREPARE(): queued file for retrieve.");

   // Set response type and add retrieve request reference as an extended attribute.
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId));
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



606
607
608
609
610
611
612
/*!
 * Process an ABORT_PREPARE notification: cancel a previously queued retrieve request.
 *
 * @param[in]     notification    EOS notification protobuf. The requester user/group names, the
 *                                archive file ID extended attribute ("sys.archive.file_id", or the
 *                                legacy "CTA_ArchiveFileId") and the "sys.cta.objectstore.id"
 *                                extended attribute are read from it.
 * @param[out]    response        Response protobuf. On success an empty "sys.cta.objectstore.id"
 *                                extended attribute is inserted and the type is set to RSP_SUCCESS.
 *
 * @throws PbException if a required extended attribute is missing or if the archive file ID
 *                     converts to zero
 */
void RequestMessage::processABORT_PREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");

   // Unpack message
   cta::common::dataStructures::CancelRetrieveRequest request;
   request.requester.name   = notification.cli().user().username();
   request.requester.group  = notification.cli().user().groupname();

   const auto &xattrs = notification.file().xattr();

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
   auto archiveFileIdItor = xattrs.find("sys.archive.file_id");
   if(xattrs.end() == archiveFileIdItor) {
     // Fall back to the old xattr format
     archiveFileIdItor = xattrs.find("CTA_ArchiveFileId");
     if(xattrs.end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
   }
   const std::string &archiveFileIdStr = archiveFileIdItor->second;
   // strtoul() yields 0 when no digits could be converted; 0 is treated as an invalid ID
   request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10);
   if(0 == request.archiveFileID) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }

   // The request Id should be stored as an extended attribute
   const auto retrieveRequestIdItor = xattrs.find("sys.cta.objectstore.id");
   if(xattrs.end() == retrieveRequestIdItor) {
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.cta.objectstore.id");
   }
   request.retrieveRequestId = retrieveRequestIdItor->second;

   // Start the timer BEFORE calling the scheduler so that "schedulerTime" in the log entry below
   // actually covers the abortRetrieve() call
   cta::utils::Timer t;

   // Ask the scheduler to cancel the retrieve request
   m_scheduler.abortRetrieve(m_cliIdentity.username, request, m_lc);

   // Create a log entry
   // NOTE(review): request.diskFileInfo.path is never assigned in this function, so "diskFilePath"
   // logs whatever value CancelRetrieveRequest starts with -- confirm whether
   // notification.file().lpath() was meant to be copied into it.
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", request.archiveFileID)
         .add("schedulerTime", t.secs())
         .add("retrieveRequestId", request.retrieveRequestId)
         .add("diskFilePath", cta::utils::midEllipsis(request.diskFileInfo.path, 100));
   m_lc.log(cta::log::INFO, "In RequestMessage::processABORT_PREPARE(): canceled retrieve request.");

   // Set response type and remove reference to retrieve request in EOS extended attributes.
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", ""));
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



661
/*!
 * Process a DELETE notification: ask the scheduler to delete an archive file.
 *
 * @param[in]     notification    EOS notification protobuf. The requester user/group names, the
 *                                logical path, the disk file ID, the archive file ID extended
 *                                attribute ("sys.archive.file_id", or the legacy
 *                                "CTA_ArchiveFileId") and, when present and non-empty, the
 *                                "sys.cta.archive.objectstore.id" extended attribute are read from it.
 * @param[out]    response        Response protobuf; its type is set to RSP_SUCCESS on success.
 *
 * @throws PbException if the archive file ID extended attribute is missing or converts to zero
 */
void RequestMessage::processDELETE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Check the fields this handler relies on
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");

   // Build the delete request from the notification
   cta::common::dataStructures::DeleteArchiveRequest request;
   request.requester.name  = notification.cli().user().username();
   request.requester.group = notification.cli().user().groupname();

   const uint64_t fid   = notification.file().fid();
   request.diskFilePath = notification.file().lpath();
   request.diskFileId   = std::to_string(fid);
   request.diskInstance = m_cliIdentity.username;

   const auto &xattrs = notification.file().xattr();

   // The archive file ID arrives as a string-valued extended attribute: look under the current
   // key first, then under the legacy key
   auto idEntry = xattrs.find("sys.archive.file_id");
   if(xattrs.end() == idEntry) {
      idEntry = xattrs.find("CTA_ArchiveFileId");
   }
   if(xattrs.end() == idEntry) {
      throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
   }

   // Convert to an integer; a result of zero is rejected as invalid
   const std::string &idText = idEntry->second;
   request.archiveFileID = strtoul(idText.c_str(), nullptr, 10);
   if(0 == request.archiveFileID) {
      throw PbException("Invalid archiveFileID " + idText);
   }

   // Pass on the ArchiveRequest's objectstore address when the attribute exists and is not empty
   const auto addressEntry = xattrs.find("sys.cta.archive.objectstore.id");
   if(xattrs.end() != addressEntry && !addressEntry->second.empty()) {
      request.address = addressEntry->second;
   }

   // Delete the file from the catalogue or from the objectstore if archive request is created
   cta::utils::Timer t;
   m_scheduler.deleteArchive(m_cliIdentity.username, request, m_lc);

   // Record the outcome in the log
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", request.archiveFileID)
         .add("address", (request.address ? request.address.value() : "null"))
         .add("filePath",request.diskFilePath)
         .add("schedulerTime", t.secs());
   m_lc.log(cta::log::INFO, "In RequestMessage::processDELETE(): archive file deleted.");

   // Report success
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

719
720


721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
/*!
 * Process an UPDATE_FID notification: record a new disk file ID for an archive file in the
 * catalogue.
 *
 * @param[in]     notification    EOS notification protobuf. The logical path, the disk file ID and
 *                                the archive file ID extended attribute ("sys.archive.file_id", or
 *                                the legacy "CTA_ArchiveFileId") are read from it.
 * @param[out]    response        Response protobuf; its type is set to RSP_SUCCESS on success.
 *
 * @throws PbException if the archive file ID extended attribute is missing or converts to zero
 */
void RequestMessage::processUPDATE_FID(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // The logical path must be present
   checkIsNotEmptyString(notification.file().lpath(),  "notification.file.lpath");

   // Values taken from the client identity and the notification
   const std::string &instanceName = m_cliIdentity.username;
   const std::string &lpath        = notification.file().lpath();
   const std::string  fidText      = std::to_string(notification.file().fid());

   const auto &xattrs = notification.file().xattr();

   // The archive file ID arrives as a string-valued extended attribute: look under the current
   // key first, then under the legacy key
   auto idEntry = xattrs.find("sys.archive.file_id");
   if(xattrs.end() == idEntry) {
      idEntry = xattrs.find("CTA_ArchiveFileId");
   }
   if(xattrs.end() == idEntry) {
      throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
   }

   // Convert to an integer; a result of zero is rejected as invalid
   const std::string &idText = idEntry->second;
   const uint64_t archiveFileId = strtoul(idText.c_str(), nullptr, 10);
   if(archiveFileId == 0) {
      throw PbException("Invalid archiveFileID " + idText);
   }

   // Apply the change in the catalogue, timing the call
   cta::utils::Timer t;
   m_catalogue.updateDiskFileId(archiveFileId, instanceName, fidText);

   // Record the outcome in the log
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", archiveFileId)
         .add("schedulerTime", t.secs())
         .add("diskInstance", instanceName)
         .add("diskFilePath", lpath)
         .add("diskFileId", fidText);
   m_lc.log(cta::log::INFO, "In RequestMessage::processUPDATE_FID(): updated disk file ID.");

   // Report success
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



766
767
// Admin commands

768
769
770
771
/*!
 * Write an INFO log entry reporting that an admin command succeeded.
 *
 * The message names the calling function followed by the command and subcommand strings, which
 * are found by reverse lookup in cmdLookup and subcmdLookup.
 *
 * @param[in]     function    Name of the calling function, embedded in the log message
 * @param[in]     admincmd    Admin command protobuf supplying the command and subcommand values
 * @param[in]     t           Timer whose elapsed seconds are logged as "adminTime"
 */
void RequestMessage::logAdminCmd(const std::string &function, const cta::admin::AdminCmd &admincmd, cta::utils::Timer &t)
{
   using namespace cta::admin;

   std::string message = "In RequestMessage::" + function + "(): Admin command succeeded: ";

   // Command: take the first entry matching the command value whose string is the long form
   // (more than 3 characters)
   for(const auto &cmdEntry : cmdLookup) {
      if(cmdEntry.first.length() > 3 && admincmd.cmd() == cmdEntry.second) {
         message += cmdEntry.first;
         message += ' ';
         break;
      }
   }

   // Subcommand: take the first entry matching the subcommand value
   for(const auto &subcmdEntry : subcmdLookup) {
      if(admincmd.subcmd() == subcmdEntry.second) {
         message += subcmdEntry.first;
         break;
      }
   }

   // Emit the log line together with the elapsed time
   cta::log::ScopedParamContainer params(m_lc);
   params.add("adminTime", t.secs());
   m_lc.log(cta::log::INFO, message);
}



797
/*!
 * Handle the "admin add" command: create an admin user in the catalogue.
 *
 * @param[out]    response    Response protobuf; its type is set to RSP_SUCCESS on success
 */
void RequestMessage::processAdmin_Add(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // Both options are mandatory for this command
   const auto &adminName    = getRequired(OptionString::USERNAME);
   const auto &adminComment = getRequired(OptionString::COMMENT);

   m_catalogue.createAdminUser(m_cliIdentity, adminName, adminComment);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



811
/*!
 * Handle the "admin ch" command: change the comment of an admin user in the catalogue.
 *
 * @param[out]    response    Response protobuf; its type is set to RSP_SUCCESS on success
 */
void RequestMessage::processAdmin_Ch(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // Both options are mandatory for this command
   const auto &adminName  = getRequired(OptionString::USERNAME);
   const auto &newComment = getRequired(OptionString::COMMENT);

   m_catalogue.modifyAdminUserComment(m_cliIdentity, adminName, newComment);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



825
/*!
 * Handle the "admin rm" command: delete an admin user from the catalogue.
 *
 * @param[out]    response    Response protobuf; its type is set to RSP_SUCCESS on success
 */
void RequestMessage::processAdmin_Rm(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // The user to remove is identified by the mandatory USERNAME option
   m_catalogue.deleteAdminUser(getRequired(OptionString::USERNAME));

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



838
/*!
 * Handle the "admin ls" command: results are returned through a stream object.
 *
 * @param[out]    response    Response protobuf; the ADMIN_LS header is selected and the type is
 *                            set to RSP_SUCCESS
 * @param[out]    stream      Set to a newly allocated AdminLsStream
 */
void RequestMessage::processAdmin_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
{
  using namespace cta::admin;

  // The listing itself is produced by the stream object
  stream = new AdminLsStream(*this, m_catalogue, m_scheduler);

  // Select the column header and report success
  response.set_show_header(HeaderType::ADMIN_LS);
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



851
/*!
 * Handle the "archivefile ls" command: results are returned through a stream object.
 *
 * @param[out]    response    Response protobuf; the header is ARCHIVEFILE_LS_SUMMARY when the
 *                            SUMMARY flag is set and ARCHIVEFILE_LS otherwise, and the type is set
 *                            to RSP_SUCCESS
 * @param[out]    stream      Set to a newly allocated ArchiveFileLsStream
 */
void RequestMessage::processArchiveFile_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
{
  using namespace cta::admin;

  // The listing itself is produced by the stream object
  stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler);

  // The summary view uses its own set of column headers
  if(has_flag(OptionBoolean::SUMMARY)) {
    response.set_show_header(HeaderType::ARCHIVEFILE_LS_SUMMARY);
  } else {
    response.set_show_header(HeaderType::ARCHIVEFILE_LS);
  }

  response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



867
/*!
 * Handle the "archiveroute add" command: create an archive route in the catalogue.
 *
 * @param[out]    response    Response protobuf; its type is set to RSP_SUCCESS on success
 */
void RequestMessage::processArchiveRoute_Add(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // All four options are mandatory for this command
   const auto &storageClassName = getRequired(OptionString::STORAGE_CLASS);
   const auto &copyNumber       = getRequired(OptionUInt64::COPY_NUMBER);
   const auto &tapePoolName     = getRequired(OptionString::TAPE_POOL);
   const auto &routeComment     = getRequired(OptionString::COMMENT);

   m_catalogue.createArchiveRoute(m_cliIdentity, storageClassName, copyNumber, tapePoolName, routeComment);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



883
void RequestMessage::processArchiveRoute_Ch(cta::xrd::Response &response)
884
885
886
{
   using namespace cta::admin;

887
888
889
890
   auto &scn      = getRequired(OptionString::STORAGE_CLASS);
   auto &cn       = getRequired(OptionUInt64::COPY_NUMBER);
   auto  tapepool = getOptional(OptionString::TAPE_POOL);
   auto  comment