/*!
 * @project        The CERN Tape Archive (CTA)
 * @brief          XRootD EOS Notification handler
 * @copyright      Copyright 2019 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include <XrdSsiPbException.hpp>
using XrdSsiPb::PbException;

#include "common/utils/Regex.hpp"
#include <cmdline/CtaAdminCmdParse.hpp>
#include "XrdSsiCtaRequestMessage.hpp"
#include "XrdCtaAdminLs.hpp"
#include "XrdCtaArchiveRouteLs.hpp"
#include "XrdCtaDriveLs.hpp"
#include "XrdCtaFailedRequestLs.hpp"
#include "XrdCtaGroupMountRuleLs.hpp"
#include "XrdCtaListPendingQueue.hpp"
#include "XrdCtaLogicalLibraryLs.hpp"
#include "XrdCtaMountPolicyLs.hpp"
#include "XrdCtaMediaTypeLs.hpp"
#include "XrdCtaRepackLs.hpp"
#include "XrdCtaRequesterMountRuleLs.hpp"
#include "XrdCtaShowQueues.hpp"
#include "XrdCtaTapeLs.hpp"
#include "XrdCtaTapeFileLs.hpp"
#include "XrdCtaStorageClassLs.hpp"
#include "XrdCtaTapePoolLs.hpp"
#include "XrdCtaDiskSystemLs.hpp"
#include "XrdCtaVirtualOrganizationLs.hpp"
#include "XrdCtaVersion.hpp"
#include "XrdCtaSchedulingInfosLs.hpp"

#include <cerrno>
#include <cstdlib>
#include <limits>
#include <sstream>

49
50
namespace cta {
namespace xrd {
51

52
53
54
55
56
// ANSI escape sequences for colouring console output (when sending a response to cta-admin).
// TEXT_RED switches to bold red text. TEXT_NORMAL resets all text attributes and, note, also
// terminates the line: the string ends with a newline character.
const char* const TEXT_RED    = "\x1b[31;1m";
const char* const TEXT_NORMAL = "\x1b[0m\n";


57
58
59
/*
 * Convert AdminCmd <Cmd, SubCmd> pair to an integer so that it can be used in a switch statement
 */
60
constexpr unsigned int cmd_pair(cta::admin::AdminCmd::Cmd cmd, cta::admin::AdminCmd::SubCmd subcmd) {
61
62
63
64
   return (cmd << 16) + subcmd;
}


65
void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Response &response, XrdSsiStream* &stream)
{
   // Dispatch on the kind of payload carried by the Request: an admin command or an EOS
   // workflow notification. Anything else is rejected with a PbException.
   switch(request.request_case())
   {
      using namespace cta::xrd;

      case Request::kAdmincmd: {
         const auto &adminCmd = request.admincmd();

         // Admin commands are accepted only over Kerberos 5, and only from a user that the
         // scheduler recognises as a CTA admin
         if(m_protocol != Protocol::KRB5) {
            throw cta::exception::UserError("[ERROR] Admin commands must be authenticated using the Kerberos 5 protocol.");
         }
         m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);

         // Started once the caller is authorized; passed to logAdminCmd() at the end of this case
         cta::utils::Timer t;

         // Validate the Protocol Buffer and load the command options
         importOptions(adminCmd);

         // Record the versions reported by the client
         m_client_versions.ctaVersion = request.client_cta_version();
         m_client_versions.xrootdSsiProtoIntVersion = request.client_xrootd_ssi_protobuf_interface_version();

         // Route the <Cmd, SubCmd> pair to its handler. Handlers which take the stream
         // argument are the listing-style commands.
         switch(cmd_pair(adminCmd.cmd(), adminCmd.subcmd())) {
            using namespace cta::admin;

            // admin
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_ADD):
               processAdmin_Add(response); break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_CH):
               processAdmin_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_RM):
               processAdmin_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_LS):
               processAdmin_Ls(response, stream); break;
#if 0
            case cmd_pair(AdminCmd::CMD_ARCHIVEFILE, AdminCmd::SUBCMD_LS):
               processArchiveFile_Ls(response, stream); break;
#endif
            // archiveroute
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_ADD):
               processArchiveRoute_Add(response); break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_CH):
               processArchiveRoute_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_RM):
               processArchiveRoute_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_LS):
               processArchiveRoute_Ls(response, stream); break;

            // drive
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_UP):
               processDrive_Up(response); break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_DOWN):
               processDrive_Down(response); break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_CH):
               processDrive_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_LS):
               processDrive_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_RM):
               processDrive_Rm(response); break;

            // failedrequest
            case cmd_pair(AdminCmd::CMD_FAILEDREQUEST, AdminCmd::SUBCMD_LS):
               processFailedRequest_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_FAILEDREQUEST, AdminCmd::SUBCMD_RM):
               processFailedRequest_Rm(response); break;

            // groupmountrule
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_ADD):
               processGroupMountRule_Add(response); break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_CH):
               processGroupMountRule_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_RM):
               processGroupMountRule_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_LS):
               processGroupMountRule_Ls(response, stream); break;

            // pending queues
            case cmd_pair(AdminCmd::CMD_LISTPENDINGARCHIVES, AdminCmd::SUBCMD_NONE):
               processListPendingArchives(response, stream); break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGRETRIEVES, AdminCmd::SUBCMD_NONE):
               processListPendingRetrieves(response, stream); break;

            // logicallibrary
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_ADD):
               processLogicalLibrary_Add(response); break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_CH):
               processLogicalLibrary_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_RM):
               processLogicalLibrary_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_LS):
               processLogicalLibrary_Ls(response, stream); break;

            // mediatype
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_ADD):
               processMediaType_Add(response); break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_CH):
               processMediaType_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_RM):
               processMediaType_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_LS):
               processMediaType_Ls(response, stream); break;

            // mountpolicy
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_ADD):
               processMountPolicy_Add(response); break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_CH):
               processMountPolicy_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_RM):
               processMountPolicy_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS):
               processMountPolicy_Ls(response, stream); break;

            // repack
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD):
               processRepack_Add(response); break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM):
               processRepack_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS):
               processRepack_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR):
               processRepack_Err(response); break;

            // requestermountrule
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_ADD):
               processRequesterMountRule_Add(response); break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_CH):
               processRequesterMountRule_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_RM):
               processRequesterMountRule_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_LS):
               processRequesterMountRule_Ls(response, stream); break;

            // showqueues
            case cmd_pair(AdminCmd::CMD_SHOWQUEUES, AdminCmd::SUBCMD_NONE):
               processShowQueues(response, stream); break;

            // storageclass
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_ADD):
               processStorageClass_Add(response); break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_CH):
               processStorageClass_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_RM):
               processStorageClass_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_LS):
               processStorageClass_Ls(response, stream); break;

            // tape
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_ADD):
               processTape_Add(response); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_CH):
               processTape_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RM):
               processTape_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RECLAIM):
               processTape_Reclaim(response); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LS):
               processTape_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LABEL):
               processTape_Label(response); break;

            // tapefile
            case cmd_pair(AdminCmd::CMD_TAPEFILE, AdminCmd::SUBCMD_LS):
               processTapeFile_Ls(response, stream); break;

            // tapepool
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_ADD):
               processTapePool_Add(response); break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_CH):
               processTapePool_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_RM):
               processTapePool_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_LS):
               processTapePool_Ls(response, stream); break;

            // disksystem
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_LS):
               processDiskSystem_Ls(response, stream); break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_ADD):
               processDiskSystem_Add(response); break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_RM):
               processDiskSystem_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_CH):
               processDiskSystem_Ch(response); break;

            // virtualorganization
            case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_ADD):
               processVirtualOrganization_Add(response); break;
            case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_CH):
               processVirtualOrganization_Ch(response); break;
            case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_RM):
               processVirtualOrganization_Rm(response); break;
            case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_LS):
               processVirtualOrganization_Ls(response, stream); break;

            // version
            case cmd_pair(AdminCmd::CMD_VERSION, AdminCmd::SUBCMD_NONE):
               processVersion(response, stream); break;

            // schedulinginfos
            case cmd_pair(AdminCmd::CMD_SCHEDULINGINFOS, AdminCmd::SUBCMD_LS):
               processSchedulingInfos_Ls(response, stream); break;

            default:
               throw PbException("Admin command pair <" +
                     AdminCmd_Cmd_Name(adminCmd.cmd()) + ", " +
                     AdminCmd_SubCmd_Name(adminCmd.subcmd()) +
                     "> is not implemented.");
         } // end switch on <Cmd, SubCmd>

         // Reached only when the handler returned without throwing
         logAdminCmd(__FUNCTION__, adminCmd, t);
         break;
      } // end case Request::kAdmincmd

      case Request::kNotification: {
         const auto &notification = request.notification();
         const std::string &instanceName = notification.wf().instance().name();
         const auto event = notification.wf().event();

         // The identity used to authenticate must match the instance name in the Protocol Buffer
         if(m_cliIdentity.username != instanceName) {
            // Exception to the rule: a Kerberos-authenticated admin may submit CLOSEW and PREPARE
            // events, so that operators can resubmit failed archive or prepare requests from a
            // command-line tool. DELETE is deliberately left out, so that files removed from the
            // catalogue cannot be left behind in the EOS namespace.
            const bool isResubmittableEvent = event == cta::eos::Workflow::CLOSEW ||
                                              event == cta::eos::Workflow::PREPARE;
            if(m_protocol != Protocol::KRB5 || !isResubmittableEvent) {
               throw PbException("Instance name \"" + instanceName +
                                 "\" does not match key identifier \"" + m_cliIdentity.username + "\"");
            }
            m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);
            m_cliIdentity.username = instanceName;
         }

         // Workflow events for files under /eos/INSTANCE_NAME/proc/ are refused. The short
         // instance name is the instance name with any leading "eos" stripped.
         const bool hasEosPrefix = instanceName.compare(0, 3, "eos") == 0;
         const std::string shortInstanceName = hasEosPrefix ? instanceName.substr(3) : instanceName;
         if(shortInstanceName.empty()) {
            throw PbException("Short instance name is an empty string: instance=" + instanceName);
         }
         const std::string procFullPath = "/eos/" + shortInstanceName + "/proc/";
         const std::string &lpath = notification.file().lpath();
         if(lpath.compare(0, procFullPath.size(), procFullPath) == 0) {
            throw PbException("Cannot process a workflow event for a file in " + procFullPath +
                              " instance=" + instanceName +
                              " event=" + Workflow_EventType_Name(event) +
                              " lpath=" + lpath);
         }

         // Route the workflow event to its handler
         switch(event) {
            using namespace cta::eos;

            case Workflow::OPENW:         processOPENW(notification, response);         break;
            case Workflow::CREATE:        processCREATE(notification, response);        break;
            case Workflow::CLOSEW:        processCLOSEW(notification, response);        break;
            case Workflow::PREPARE:       processPREPARE(notification, response);       break;
            case Workflow::ABORT_PREPARE: processABORT_PREPARE(notification, response); break;
            case Workflow::DELETE:        processDELETE(notification, response);        break;
            case Workflow::UPDATE_FID:    processUPDATE_FID(notification, response);    break;

            default:
               throw PbException("Workflow event " +
                     Workflow_EventType_Name(event) +
                     " is not implemented.");
         }
         break;
      } // end case Request::kNotification

      case Request::REQUEST_NOT_SET:
         throw PbException("Request message has not been set.");

      default:
         throw PbException("Unrecognized Request message. "
                           "Possible Protocol Buffer version mismatch between client and server.");
   }
}



394
395
// EOS Workflow commands

396
void RequestMessage::processOPENW(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // OPENW events need no action: the notification content is not inspected. The event is
   // only noted in the log and acknowledged.
   cta::log::ScopedParamContainer params(m_lc);
   m_lc.log(cta::log::INFO, "In RequestMessage::processOPENW(): ignoring OPENW event.");

   // Always report success to the caller
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

408
409


410
void RequestMessage::processCREATE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   const auto &user   = notification.cli().user();
   const auto &xattrs = notification.file().xattr();

   // The requesting user and group must both be present in the notification
   checkIsNotEmptyString(user.username(),  "notification.cli.user.username");
   checkIsNotEmptyString(user.groupname(), "notification.cli.user.groupname");

   cta::common::dataStructures::RequesterIdentity requester;
   requester.name  = user.username();
   requester.group = user.groupname();

   // The storage class is taken from the file's extended attributes: the current attribute
   // name is tried first, then the old-format name
   auto storageClassItor = xattrs.find("sys.archive.storage_class");
   if(storageClassItor == xattrs.end()) {
      storageClassItor = xattrs.find("CTA_StorageClass");
   }
   if(storageClassItor == xattrs.end()) {
      throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is not set");
   }
   const std::string storageClass = storageClassItor->second;
   if(storageClass.empty()) {
      throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is set to an empty string");
   }

   // Measures the time spent obtaining the archive file ID (logged as schedulerTime)
   cta::utils::Timer t;

   // The "fail_on_closew_test" storage class exists for testing: it always fails on CLOSEW.
   // Let it through CREATE with a placeholder ID instead of allocating a real one from the pool.
   const uint64_t archiveFileId = (storageClass == "fail_on_closew_test")
      ? std::numeric_limits<uint64_t>::max()
      : m_scheduler.checkAndGetNextArchiveFileId(m_cliIdentity.username, storageClass, requester, m_lc);

   // Log the assignment
   cta::log::ScopedParamContainer params(m_lc);
   params.add("diskFileId", std::to_string(notification.file().fid()));
   params.add("diskFilePath", notification.file().lpath());
   params.add("fileId", archiveFileId);
   params.add("schedulerTime", t.secs());
   m_lc.log(cta::log::INFO, "In RequestMessage::processCREATE(): assigning new archive file ID.");

   // Return the archive file ID and the storage class to the caller as extended attributes
   using XattrPair = google::protobuf::MapPair<std::string,std::string>;
   auto &responseXattrs = *response.mutable_xattr();
   responseXattrs.insert(XattrPair("sys.archive.file_id", std::to_string(archiveFileId)));
   responseXattrs.insert(XattrPair("sys.archive.storage_class", storageClass));

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

462
463


464
/*
 * Handle an EOS CLOSEW workflow event: queue the closed file for archival.
 *
 * Zero-length files are not queued; the event is logged and acknowledged with RSP_SUCCESS.
 * On success for a non-empty file, the address of the queued archive request is returned to
 * the caller in the "sys.cta.objectstore.id" extended attribute of the response.
 *
 * @param notification    the EOS notification which triggered the event
 * @param response        the response to be sent back to the caller
 *
 * @throws PbException            if a required extended attribute is missing, if the archive
 *                                file ID is not a valid non-zero decimal number, or if the file
 *                                is in the "fail_on_closew_test" storage class
 * @throws exception::UserError   if the file is larger than m_archiveFileMaxSize
 */
void RequestMessage::processCLOSEW(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.wf().instance().url(),      "notification.wf.instance.url");
   checkIsNotEmptyString(notification.transport().report_url(),   "notification.transport.report_url");

   // Unpack message
   const auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class");
   if(notification.file().xattr().end() == storageClassItor) {
      throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.storage_class");
   }

   // For testing: this storage class will always fail
   if(storageClassItor->second == "fail_on_closew_test") {
      throw PbException("File is in fail_on_closew_test storage class, which always fails.");
   }

   // Disallow archival of files above the specified limit
   if(notification.file().size() > m_archiveFileMaxSize) {
      throw exception::UserError("Archive request rejected: file size (" + std::to_string(notification.file().size()) +
                                 " bytes) exceeds maximum allowed size (" + std::to_string(m_archiveFileMaxSize) + " bytes)");
   }

   cta::common::dataStructures::ArchiveRequest request;
   checksum::ProtobufToChecksumBlob(notification.file().csb(), request.checksumBlob);
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.diskFileID             = std::to_string(notification.file().fid());
   request.fileSize               = notification.file().size();
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.srcURL                 = notification.wf().instance().url();
   request.storageClass           = storageClassItor->second;
   request.archiveReportURL       = notification.transport().report_url();
   request.archiveErrorReportURL  = notification.transport().error_report_url();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t
   const auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
   if(notification.file().xattr().end() == archiveFileIdItor) {
      throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
   }
   const std::string &archiveFileIdStr = archiveFileIdItor->second;

   // Accept only a plain, non-zero decimal number that fits the integer type. processCREATE()
   // writes this attribute with std::to_string(), so a well-formed value consists of digits
   // only. Anything else (empty string, sign, whitespace, trailing characters, overflow) is
   // rejected rather than being silently converted to some other ID.
   const bool isDecimalNumber = !archiveFileIdStr.empty() &&
                                archiveFileIdStr.find_first_not_of("0123456789") == std::string::npos;
   unsigned long long parsedArchiveFileId = 0;
   if(isDecimalNumber) {
      errno = 0; // strtoull() reports overflow only through errno
      parsedArchiveFileId = std::strtoull(archiveFileIdStr.c_str(), nullptr, 10);
      if(ERANGE == errno) {
         parsedArchiveFileId = 0;
      }
   }
   if(0 == parsedArchiveFileId) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
   const uint64_t archiveFileId = parsedArchiveFileId;

   // Measures the time spent queueing the request (logged as schedulerTime)
   cta::utils::Timer t;

   cta::log::ScopedParamContainer params(m_lc);
   std::string logMessage = "In RequestMessage::processCLOSEW(): ";
   if(request.fileSize > 0) {
      // Queue the request
      std::string archiveRequestAddr = m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc);
      logMessage += "queued file for archive.";
      params.add("schedulerTime", t.secs());
      params.add("archiveRequestId", archiveRequestAddr);

      // Add archive request reference to response as an extended attribute
      response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", archiveRequestAddr));
   } else {
      // Nothing to archive: the event is only logged
      logMessage += "ignoring zero-length file.";
   }

   // Create a log entry
   params.add("fileId", archiveFileId);
   params.add("requesterInstance", notification.wf().requester_instance());
   m_lc.log(cta::log::INFO, logMessage);

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}


547
/*
 * Handle an EOS PREPARE workflow event: queue a retrieve request for the file.
 *
 * On success, the ID of the queued retrieve request is returned to the caller in the
 * "sys.cta.objectstore.id" extended attribute of the response.
 *
 * @param notification    the EOS notification which triggered the event
 * @param response        the response to be sent back to the caller
 *
 * @throws PbException    if the archive file ID extended attribute is missing or is not a
 *                        valid non-zero decimal number
 */
void RequestMessage::processPREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.transport().dst_url(),      "notification.transport.dst_url");

   // Unpack message
   cta::common::dataStructures::RetrieveRequest request;
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.dstURL                 = notification.transport().dst_url();
   request.errorReportURL         = notification.transport().error_report_url();
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
   const auto &xattrs = notification.file().xattr();
   auto archiveFileIdItor = xattrs.find("sys.archive.file_id");
   if(xattrs.end() == archiveFileIdItor) {
      // Fall back to the old xattr format
      archiveFileIdItor = xattrs.find("CTA_ArchiveFileId");
      if(xattrs.end() == archiveFileIdItor) {
         throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
      }
   }
   const std::string &archiveFileIdStr = archiveFileIdItor->second;

   // Accept only a plain, non-zero decimal number that fits the integer type. processCREATE()
   // writes the sys.archive.file_id attribute with std::to_string(), so a well-formed value
   // consists of digits only. Anything else (empty string, sign, whitespace, trailing
   // characters, overflow) is rejected rather than being silently converted to some other ID.
   const bool isDecimalNumber = !archiveFileIdStr.empty() &&
                                archiveFileIdStr.find_first_not_of("0123456789") == std::string::npos;
   unsigned long long parsedArchiveFileId = 0;
   if(isDecimalNumber) {
      errno = 0; // strtoull() reports overflow only through errno
      parsedArchiveFileId = std::strtoull(archiveFileIdStr.c_str(), nullptr, 10);
      if(ERANGE == errno) {
         parsedArchiveFileId = 0;
      }
   }
   if(0 == parsedArchiveFileId) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
   request.archiveFileID = parsedArchiveFileId;

   // Activity value is a string. The parameter might be present or not.
   const auto activityItor = xattrs.find("activity");
   if(xattrs.end() != activityItor) {
      request.activity = activityItor->second;
   }

   // Measures the time spent queueing the request (logged as schedulerTime)
   cta::utils::Timer t;

   // Queue the request
   std::string retrieveReqId = m_scheduler.queueRetrieve(m_cliIdentity.username, request, m_lc);

   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", request.archiveFileID).add("schedulerTime", t.secs())
         .add("retrieveReqId", retrieveReqId);
   if(static_cast<bool>(request.activity)) {
      params.add("activity", request.activity.value());
   }
   m_lc.log(cta::log::INFO, "In RequestMessage::processPREPARE(): queued file for retrieve.");

   // Set response type and add retrieve request reference as an extended attribute.
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId));
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



610
611
612
613
614
615
616
/*!
 * Process an ABORT_PREPARE notification: cancel a retrieve request.
 *
 * The archive file ID is read from the "sys.archive.file_id" extended attribute (falling back
 * to the old "CTA_ArchiveFileId" name) and the retrieve request ID from "sys.cta.objectstore.id".
 * On success the "sys.cta.objectstore.id" attribute in the response is set to the empty string
 * and the response type is set to RSP_SUCCESS.
 *
 * @param[in]     notification    EOS notification protobuf
 * @param[out]    response        Response protobuf to fill in
 *
 * @throws PbException if a required extended attribute is missing or the archive file ID
 *                     does not convert to a non-zero integer
 */
void RequestMessage::processABORT_PREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");

   // Unpack message
   cta::common::dataStructures::CancelRetrieveRequest request;
   request.requester.name   = notification.cli().user().username();
   request.requester.group  = notification.cli().user().groupname();

   const auto &xattrs = notification.file().xattr();

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
   auto archiveFileIdItor = xattrs.find("sys.archive.file_id");
   if(xattrs.end() == archiveFileIdItor) {
     // Fall back to the old xattr format
     archiveFileIdItor = xattrs.find("CTA_ArchiveFileId");
     if(xattrs.end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   // strtoul() returns 0 when no conversion can be performed; 0 is treated as an invalid ID
   request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10);
   if(0 == request.archiveFileID) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }

   // The request Id should be stored as an extended attribute
   const auto retrieveRequestIdItor = xattrs.find("sys.cta.objectstore.id");
   if(xattrs.end() == retrieveRequestIdItor) {
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.cta.objectstore.id");
   }
   request.retrieveRequestId = retrieveRequestIdItor->second;

   // Start the timer BEFORE calling the scheduler, so that the logged "schedulerTime" covers
   // the abort call (as is done in the other notification handlers)
   cta::utils::Timer t;

   // Cancel the retrieve request
   m_scheduler.abortRetrieve(m_cliIdentity.username, request, m_lc);

   // Create a log entry
   // NOTE(review): request.diskFileInfo.path is not set anywhere in this function; confirm
   // whether it is expected to be empty in this log line.
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", request.archiveFileID)
         .add("schedulerTime", t.secs())
         .add("retrieveRequestId", request.retrieveRequestId)
         .add("diskFilePath", cta::utils::midEllipsis(request.diskFileInfo.path, 100));
   m_lc.log(cta::log::INFO, "In RequestMessage::processABORT_PREPARE(): canceled retrieve request.");

   // Set response type and remove reference to retrieve request in EOS extended attributes.
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", ""));
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



665
/*!
 * Process a DELETE notification: ask the scheduler to delete an archive file.
 *
 * The archive file ID comes from the "sys.archive.file_id" extended attribute (or the old
 * "CTA_ArchiveFileId" name). If a non-empty "sys.cta.archive.objectstore.id" attribute is
 * present, it is passed along as the address of the archive request.
 *
 * @param[in]     notification    EOS notification protobuf
 * @param[out]    response        Response protobuf; its type is set to RSP_SUCCESS
 *
 * @throws PbException if the archive file ID attribute is missing or does not convert to a
 *                     non-zero integer
 */
void RequestMessage::processDELETE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   const auto &user = notification.cli().user();
   const auto &file = notification.file();

   // Validate received protobuf
   checkIsNotEmptyString(user.username(),    "notification.cli.user.username");
   checkIsNotEmptyString(user.groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(file.lpath(),       "notification.file.lpath");

   // Unpack message
   cta::common::dataStructures::DeleteArchiveRequest request;
   request.requester.name  = user.username();
   request.requester.group = user.groupname();

   const uint64_t fid   = file.fid();
   request.diskFilePath = file.lpath();
   request.diskFileId   = std::to_string(fid);
   request.diskInstance = m_cliIdentity.username;

   // The CTA archive ID is stored in EOS as a string extended attribute: look it up under the
   // current name first, then under the old name
   const auto &xattrs = file.xattr();
   auto idIt = xattrs.find("sys.archive.file_id");
   if(idIt == xattrs.end()) {
     idIt = xattrs.find("CTA_ArchiveFileId");
   }
   if(idIt == xattrs.end()) {
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
   }

   // Convert the string to an integer; a result of 0 is rejected as invalid
   const std::string idText = idIt->second;
   request.archiveFileID = strtoul(idText.c_str(), nullptr, 10);
   if(request.archiveFileID == 0) {
      throw PbException("Invalid archiveFileID " + idText);
   }

   // Pass on the objectstore address of the ArchiveRequest, if we were given a non-empty one
   const auto addressIt = xattrs.find("sys.cta.archive.objectstore.id");
   if(addressIt != xattrs.end() && !addressIt->second.empty()) {
     request.address = addressIt->second;
   }

   // Delete the file from the catalogue or from the objectstore if archive request is created
   cta::utils::Timer t;
   m_scheduler.deleteArchive(m_cliIdentity.username, request, m_lc);

   // Log the outcome
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", request.archiveFileID);
   params.add("address", (request.address ? request.address.value() : "null"));
   params.add("filePath", request.diskFilePath);
   params.add("schedulerTime", t.secs());
   m_lc.log(cta::log::INFO, "In RequestMessage::processDELETE(): archive file deleted.");

   // Report success to the caller
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

723
724


725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
/*!
 * Process an UPDATE_FID notification: update the disk file ID recorded in the catalogue for
 * an archive file.
 *
 * The archive file ID comes from the "sys.archive.file_id" extended attribute (or the old
 * "CTA_ArchiveFileId" name); the new disk file ID is the notification's fid as a decimal string.
 *
 * @param[in]     notification    EOS notification protobuf
 * @param[out]    response        Response protobuf; its type is set to RSP_SUCCESS
 *
 * @throws PbException if the archive file ID attribute is missing or does not convert to a
 *                     non-zero integer
 */
void RequestMessage::processUPDATE_FID(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   const auto &file = notification.file();

   // Validate received protobuf
   checkIsNotEmptyString(file.lpath(),  "notification.file.lpath");

   // Unpack message
   const std::string &diskInstance = m_cliIdentity.username;
   const std::string &diskFilePath = file.lpath();
   const std::string diskFileId    = std::to_string(file.fid());

   // The CTA archive ID is stored in EOS as a string extended attribute: look it up under the
   // current name first, then under the old name
   const auto &xattrs = file.xattr();
   auto idIt = xattrs.find("sys.archive.file_id");
   if(idIt == xattrs.end()) {
     idIt = xattrs.find("CTA_ArchiveFileId");
   }
   if(idIt == xattrs.end()) {
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
   }

   // Convert the string to an integer; a result of 0 is rejected as invalid
   const std::string idText = idIt->second;
   const uint64_t archiveFileId = strtoul(idText.c_str(), nullptr, 10);
   if(archiveFileId == 0) {
      throw PbException("Invalid archiveFileID " + idText);
   }

   // Update the disk file ID
   cta::utils::Timer t;
   m_catalogue.updateDiskFileId(archiveFileId, diskInstance, diskFileId);

   // Log the outcome
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", archiveFileId);
   params.add("schedulerTime", t.secs());
   params.add("diskInstance", diskInstance);
   params.add("diskFilePath", diskFilePath);
   params.add("diskFileId", diskFileId);
   m_lc.log(cta::log::INFO, "In RequestMessage::processUPDATE_FID(): updated disk file ID.");

   // Report success to the caller
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



770
771
// Admin commands

772
773
774
775
/*!
 * Write an INFO log line recording that an admin command succeeded.
 *
 * The message names the calling function followed by the command and subcommand strings,
 * obtained by reverse lookup in cmdLookup and subcmdLookup. The elapsed time read from the
 * timer is attached as the "adminTime" log parameter.
 *
 * @param[in]     function    Name of the calling function, inserted into the log message
 * @param[in]     admincmd    The admin command protobuf
 * @param[in]     t           Timer whose elapsed seconds are logged
 */
void RequestMessage::logAdminCmd(const std::string &function, const cta::admin::AdminCmd &admincmd, cta::utils::Timer &t)
{
   using namespace cta::admin;

   std::string log_msg = "In RequestMessage::" + function + "(): Admin command succeeded: ";

   // Reverse lookup of the command string: take the first entry matching the command whose
   // string is the long form (length > 3)
   for(const auto &cmdEntry : cmdLookup) {
      if(admincmd.cmd() != cmdEntry.second || cmdEntry.first.length() <= 3) {
         continue;
      }
      log_msg += cmdEntry.first;
      log_msg += ' ';
      break;
   }

   // Reverse lookup of the subcommand string: take the first matching entry
   for(const auto &subcmdEntry : subcmdLookup) {
      if(admincmd.subcmd() != subcmdEntry.second) {
         continue;
      }
      log_msg += subcmdEntry.first;
      break;
   }

   // Add the log message
   cta::log::ScopedParamContainer params(m_lc);
   params.add("adminTime", t.secs());
   m_lc.log(cta::log::INFO, log_msg);
}



801
/*!
 * Admin command "admin add": create an admin user in the catalogue.
 *
 * @param[out]    response    Response protobuf; its type is set to RSP_SUCCESS
 */
void RequestMessage::processAdmin_Add(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // Both options are required; they are fetched in this order (username, then comment)
   const auto &adminUsername = getRequired(OptionString::USERNAME);
   const auto &adminComment  = getRequired(OptionString::COMMENT);

   m_catalogue.createAdminUser(m_cliIdentity, adminUsername, adminComment);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



815
/*!
 * Admin command "admin ch": modify the comment of an admin user in the catalogue.
 *
 * @param[out]    response    Response protobuf; its type is set to RSP_SUCCESS
 */
void RequestMessage::processAdmin_Ch(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // Both options are required; they are fetched in this order (username, then comment)
   const auto &adminUsername = getRequired(OptionString::USERNAME);
   const auto &newComment    = getRequired(OptionString::COMMENT);

   m_catalogue.modifyAdminUserComment(m_cliIdentity, adminUsername, newComment);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



829
/*!
 * Admin command "admin rm": delete an admin user from the catalogue.
 *
 * @param[out]    response    Response protobuf; its type is set to RSP_SUCCESS
 */
void RequestMessage::processAdmin_Rm(cta::xrd::Response &response)
{
   using namespace cta::admin;

   // The username option is required
   const auto &adminUsername = getRequired(OptionString::USERNAME);

   m_catalogue.deleteAdminUser(adminUsername);

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



842
/*!
 * Admin command "admin ls": return the results as a stream.
 *
 * @param[out]    response    Response protobuf; header set to ADMIN_LS and type to RSP_SUCCESS
 * @param[out]    stream      Set to point to a newly allocated AdminLsStream
 */
void RequestMessage::processAdmin_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
{
  using namespace cta::admin;

  // The results are returned through an XrdSsi stream object. The response is only filled in
  // once the stream has been created.
  stream = new AdminLsStream(*this, m_catalogue, m_scheduler);

  // Tell the client which column headers to show, then flag success
  response.set_show_header(HeaderType::ADMIN_LS);
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



855
// Disabled code: everything up to the matching #endif is excluded from compilation.
#if 0
void RequestMessage::processArchiveFile_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
{
  using namespace cta::admin;

  // The results are returned through an XrdSsi stream object
  stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler);

  // Pick the column headers: summary layout if the SUMMARY flag is set, full listing otherwise
  if(has_flag(OptionBoolean::SUMMARY)) {
    response.set_show_header(HeaderType::ARCHIVEFILE_LS_SUMMARY);
  } else {
    response.set_show_header(HeaderType::ARCHIVEFILE_LS);
  }

  response.set_type(cta::xrd::Response::RSP_SUCCESS);
}
#endif
870
871
872



873
void RequestMessage::processArchiveRoute_Add(cta::xrd::Response &response)
874
875
876
{
   using namespace cta::admin;