XrdSsiCtaRequestMessage.cpp 78.2 KB
Newer Older
1
2
3
/*!
 * @project        The CERN Tape Archive (CTA)
 * @brief          XRootD EOS Notification handler
4
 * @copyright      Copyright 2019 CERN
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include <XrdSsiPbException.hpp>
20
21
using XrdSsiPb::PbException;

22
#include "common/utils/Regex.hpp"
23
#include <cmdline/CtaAdminCmdParse.hpp>
24
#include "XrdSsiCtaRequestMessage.hpp"
25
#include "XrdCtaAdminLs.hpp"
26
#include "XrdCtaArchiveFileLs.hpp"
27
#include "XrdCtaArchiveRouteLs.hpp"
28
#include "XrdCtaDriveLs.hpp"
29
#include "XrdCtaFailedRequestLs.hpp"
30
#include "XrdCtaGroupMountRuleLs.hpp"
31
#include "XrdCtaListPendingQueue.hpp"
32
#include "XrdCtaLogicalLibraryLs.hpp"
33
#include "XrdCtaMountPolicyLs.hpp"
34
#include "XrdCtaMediaTypeLs.hpp"
35
#include "XrdCtaRepackLs.hpp"
36
#include "XrdCtaRequesterMountRuleLs.hpp"
37
#include "XrdCtaShowQueues.hpp"
38
#include "XrdCtaTapeLs.hpp"
39
#include "XrdCtaTapeFileLs.hpp"
40
#include "XrdCtaStorageClassLs.hpp"
41
#include "XrdCtaTapePoolLs.hpp"
42
#include "XrdCtaDiskSystemLs.hpp"
43
#include "XrdCtaVirtualOrganizationLs.hpp"
44
#include "XrdCtaVersion.hpp"
45

46
47
48
#include <limits>
#include <sstream>

49
50
namespace cta {
namespace xrd {
51

52
53
54
55
56
// Codes to change colours for console output (when sending a response to cta-admin)
const char* const TEXT_RED    = "\x1b[31;1m";
const char* const TEXT_NORMAL = "\x1b[0m\n";


57
58
59
/*
 * Convert AdminCmd <Cmd, SubCmd> pair to an integer so that it can be used in a switch statement
 */
60
constexpr unsigned int cmd_pair(cta::admin::AdminCmd::Cmd cmd, cta::admin::AdminCmd::SubCmd subcmd) {
61
62
63
64
   return (cmd << 16) + subcmd;
}


65
void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Response &response, XrdSsiStream* &stream)
66
67
{
   // Branch on the Request payload type
68

69
70
71
   switch(request.request_case())
   {
      using namespace cta::xrd;
72

73
      case Request::kAdmincmd: {
74
        
75
76
         // Validate that the Kerberos user is an authorized CTA Admin user
         if(m_protocol != Protocol::KRB5) {
77
            throw cta::exception::UserError("[ERROR] Admin commands must be authenticated using the Kerberos 5 protocol.");
78
79
         }
         m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);
80
         
81
82
         cta::utils::Timer t;

83
84
         // Validate the Protocol Buffer and import options into maps
         importOptions(request.admincmd());
85
86
87
88
         
         m_client_versions.ctaVersion = request.client_cta_version();
         m_client_versions.xrootdSsiProtoIntVersion = request.client_xrootd_ssi_protobuf_interface_version();
         
89
90
91
92
         // Map the <Cmd, SubCmd> to a method
         switch(cmd_pair(request.admincmd().cmd(), request.admincmd().subcmd())) {
            using namespace cta::admin;

93
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_ADD):
94
               processAdmin_Add(response);
95
96
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_CH):
97
               processAdmin_Ch(response);
98
99
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_RM):
100
               processAdmin_Rm(response);
101
102
               break;
            case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_LS):
103
               processAdmin_Ls(response, stream);
104
105
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEFILE, AdminCmd::SUBCMD_LS):
106
               processArchiveFile_Ls(response, stream);
107
108
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_ADD):
109
               processArchiveRoute_Add(response);
110
111
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_CH):
112
               processArchiveRoute_Ch(response);
113
114
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_RM):
115
               processArchiveRoute_Rm(response);
116
117
               break;
            case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_LS):
118
               processArchiveRoute_Ls(response, stream);
119
120
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_UP):
121
               processDrive_Up(response);
122
123
               break;
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_DOWN):
124
               processDrive_Down(response);
125
               break;
126
127
128
           case cmd_pair(AdminCmd::CMD_DRIVE,AdminCmd::SUBCMD_CH):
             processDrive_Ch(response);
             break;
129
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_LS):
130
               processDrive_Ls(response, stream);
131
               break;
132
            case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_RM):
133
               processDrive_Rm(response);
134
               break;
135
            case cmd_pair(AdminCmd::CMD_FAILEDREQUEST, AdminCmd::SUBCMD_LS):
136
               processFailedRequest_Ls(response, stream);
137
               break;
138
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_ADD):
139
               processGroupMountRule_Add(response);
140
141
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_CH):
142
               processGroupMountRule_Ch(response);
143
144
               break;
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_RM):
145
               processGroupMountRule_Rm(response);
146
               break;
147
            case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_LS):
148
               processGroupMountRule_Ls(response, stream);
149
150
               break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGARCHIVES, AdminCmd::SUBCMD_NONE):
151
               processListPendingArchives(response, stream);
152
153
               break;
            case cmd_pair(AdminCmd::CMD_LISTPENDINGRETRIEVES, AdminCmd::SUBCMD_NONE):
154
               processListPendingRetrieves(response, stream);
155
156
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_ADD):
157
               processLogicalLibrary_Add(response);
158
159
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_CH):
160
               processLogicalLibrary_Ch(response);
161
162
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_RM):
163
               processLogicalLibrary_Rm(response);
164
165
               break;
            case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_LS):
166
               processLogicalLibrary_Ls(response, stream);
167
               break;
168
169
170
171
172
173
174
175
176
177
178
179
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_ADD):
               processMediaType_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_CH):
               processMediaType_Ch(response);
               break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_RM):
               processMediaType_Rm(response);
               break;
            case cmd_pair(AdminCmd::CMD_MEDIATYPE, AdminCmd::SUBCMD_LS):
               processMediaType_Ls(response, stream);
               break;
180
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_ADD):
181
               processMountPolicy_Add(response);
182
183
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_CH):
184
               processMountPolicy_Ch(response);
185
186
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_RM):
187
               processMountPolicy_Rm(response);
188
189
               break;
            case cmd_pair(AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS):
190
               processMountPolicy_Ls(response, stream);
191
192
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD):
193
               processRepack_Add(response);
194
195
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM):
196
               processRepack_Rm(response);
197
198
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS):
199
               processRepack_Ls(response, stream);
200
201
               break;
            case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR):
202
               processRepack_Err(response);
203
204
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_ADD):
205
               processRequesterMountRule_Add(response);
206
207
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_CH):
208
               processRequesterMountRule_Ch(response);
209
210
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_RM):
211
               processRequesterMountRule_Rm(response);
212
213
               break;
            case cmd_pair(AdminCmd::CMD_REQUESTERMOUNTRULE, AdminCmd::SUBCMD_LS):
214
               processRequesterMountRule_Ls(response, stream);
215
216
               break;
            case cmd_pair(AdminCmd::CMD_SHOWQUEUES, AdminCmd::SUBCMD_NONE):
217
               processShowQueues(response, stream);
218
219
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_ADD):
220
               processStorageClass_Add(response);
221
222
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_CH):
223
               processStorageClass_Ch(response);
224
225
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_RM):
226
               processStorageClass_Rm(response);
227
228
               break;
            case cmd_pair(AdminCmd::CMD_STORAGECLASS, AdminCmd::SUBCMD_LS):
229
               processStorageClass_Ls(response, stream);
230
231
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_ADD):
232
               processTape_Add(response);
233
234
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_CH):
235
               processTape_Ch(response);
236
237
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RM):
238
               processTape_Rm(response);
239
240
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_RECLAIM):
241
               processTape_Reclaim(response);
242
243
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LS):
244
               processTape_Ls(response, stream);
245
246
               break;
            case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LABEL):
247
               processTape_Label(response);
248
               break;
249
250
251
            case cmd_pair(AdminCmd::CMD_TAPEFILE, AdminCmd::SUBCMD_LS):
               processTapeFile_Ls(response, stream);
               break;
252
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_ADD):
253
               processTapePool_Add(response);
254
255
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_CH):
256
               processTapePool_Ch(response);
257
258
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_RM):
259
               processTapePool_Rm(response);
260
261
               break;
            case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_LS):
262
               processTapePool_Ls(response, stream);
263
               break;
264
265
266
267
268
269
270
271
272
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_LS):
               processDiskSystem_Ls(response, stream);
               break;   
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_ADD):
               processDiskSystem_Add(response);
               break;
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_RM):
               processDiskSystem_Rm(response);
               break;  
273
274
            case cmd_pair(AdminCmd::CMD_DISKSYSTEM, AdminCmd::SUBCMD_CH):
               processDiskSystem_Ch(response);
275
276
277
278
279
280
281
282
283
284
285
286
287
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_ADD):
               processVirtualOrganization_Add(response);
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_CH):
               processVirtualOrganization_Ch(response);
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION, AdminCmd::SUBCMD_RM):
               processVirtualOrganization_Rm(response);
               break;
           case cmd_pair(AdminCmd::CMD_VIRTUALORGANIZATION,AdminCmd::SUBCMD_LS):
               processVirtualOrganization_Ls(response, stream);
               break;
288
289
290
           case cmd_pair(AdminCmd::CMD_VERSION, AdminCmd::SUBCMD_NONE):
             processVersion(response, stream);
             break;
291
292
293
294
295
            default:
               throw PbException("Admin command pair <" +
                     AdminCmd_Cmd_Name(request.admincmd().cmd()) + ", " +
                     AdminCmd_SubCmd_Name(request.admincmd().subcmd()) +
                     "> is not implemented.");
296
297
298
299
300
            } // end switch

            // Log the admin command
            logAdminCmd(__FUNCTION__, request.admincmd(), t);
         } // end case Request::kAdmincmd
301
         break;
302

303
      case Request::kNotification:
304
         // Validate that instance name in key used to authenticate matches instance name in Protocol buffer
305
         if(m_cliIdentity.username != request.notification().wf().instance().name()) {
306
307
308
309
310
311
312
313
314
315
316
317
318
            // Special case: allow KRB5 authentication for CLOSEW and PREPARE events, to allow operators
            // to use a command line tool to resubmit failed archive or prepare requests. This is NOT
            // permitted for DELETE events as we don't want files removed from the catalogue to be left
            // in the EOS namespace.
            if(m_protocol == Protocol::KRB5 &&
               (request.notification().wf().event() == cta::eos::Workflow::CLOSEW ||
                request.notification().wf().event() == cta::eos::Workflow::PREPARE)) {
               m_scheduler.authorizeAdmin(m_cliIdentity, m_lc);
               m_cliIdentity.username = request.notification().wf().instance().name();
            } else {
               throw PbException("Instance name \"" + request.notification().wf().instance().name() +
                                 "\" does not match key identifier \"" + m_cliIdentity.username + "\"");
            }
319
320
         }

321
322
         // Refuse any workflow events for files in /eos/INSTANCE_NAME/proc/
         {
323
324
325
326
           const std::string &longInstanceName = request.notification().wf().instance().name();
           const bool longInstanceNameStartsWithEos = 0 == longInstanceName.find("eos");
           const std::string shortInstanceName =
             longInstanceNameStartsWithEos ? longInstanceName.substr(3) : longInstanceName;
327
           if(shortInstanceName.empty()) {
328
             std::ostringstream msg;
329
             msg << "Short instance name is an empty string: instance=" << longInstanceName;
330
             throw PbException(msg.str());
331
332
333
           }
           const std::string procFullPath = std::string("/eos/") + shortInstanceName + "/proc/";
           if(request.notification().file().lpath().find(procFullPath) == 0) {
334
             std::ostringstream msg;
335
336
             msg << "Cannot process a workflow event for a file in " << procFullPath << " instance=" << longInstanceName
               << " event=" << Workflow_EventType_Name(request.notification().wf().event()) << " lpath=" <<
337
338
               request.notification().file().lpath();
             throw PbException(msg.str());
339
340
341
           }
         }

342
         // Map the Workflow Event to a method
343
344
345
         switch(request.notification().wf().event()) {
            using namespace cta::eos;

346
347
348
            case Workflow::OPENW:
               processOPENW (request.notification(), response);
               break;
349
350
351
            case Workflow::CREATE:
               processCREATE (request.notification(), response);
               break;
352
353
354
355
356
357
            case Workflow::CLOSEW:
               processCLOSEW (request.notification(), response);
               break;
            case Workflow::PREPARE:
               processPREPARE(request.notification(), response);
               break;
358
359
360
            case Workflow::ABORT_PREPARE:
               processABORT_PREPARE(request.notification(), response);
               break;
361
362
363
            case Workflow::DELETE:
               processDELETE (request.notification(), response);
               break;
364
365
366
            case Workflow::UPDATE_FID:
               processUPDATE_FID (request.notification(), response);
               break;
367

368
369
370
371
372
            default:
               throw PbException("Workflow event " +
                     Workflow_EventType_Name(request.notification().wf().event()) +
                     " is not implemented.");
         }
373
         break;
374

375
376
      case Request::REQUEST_NOT_SET:
         throw PbException("Request message has not been set.");
377
378

      default:
379
380
         throw PbException("Unrecognized Request message. "
                           "Possible Protocol Buffer version mismatch between client and server.");
381
382
383
384
385
   }
}



386
387
// EOS Workflow commands

388
void RequestMessage::processOPENW(const cta::eos::Notification &notification, cta::xrd::Response &response)
389
390
391
392
{
   // Create a log entry

   cta::log::ScopedParamContainer params(m_lc);
393
   m_lc.log(cta::log::INFO, "In RequestMessage::processOPENW(): ignoring OPENW event.");
394
395
396
397
398
399

   // Set response type

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

400
401


402
void RequestMessage::processCREATE(const cta::eos::Notification &notification, cta::xrd::Response &response)
403
{
404
   // Validate received protobuf
405
406
   checkIsNotEmptyString(notification.cli().user().username(),  "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(), "notification.cli.user.groupname");
407

408
   // Unpack message
409
410
411
   cta::common::dataStructures::RequesterIdentity requester;
   requester.name  = notification.cli().user().username();
   requester.group = notification.cli().user().groupname();
412

413
   auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class");
414
   if(notification.file().xattr().end() == storageClassItor) {
415
416
417
418
419
     // Fall back to old xattr format
     storageClassItor = notification.file().xattr().find("CTA_StorageClass");
     if(notification.file().xattr().end() == storageClassItor) {
       throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is not set");
     }
420
421
   }
   const std::string storageClass = storageClassItor->second;
422
   if(storageClass.empty()) {
423
     throw PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is set to an empty string");
424
   }
425
426

   cta::utils::Timer t;
427
   uint64_t archiveFileId;
428

429
430
431
432
433
434
   // For testing, this storage class will always fail on CLOSEW. Allow it to pass CREATE and don't allocate an archive Id from the pool.
   if(storageClassItor->second == "fail_on_closew_test") {
     archiveFileId = std::numeric_limits<uint64_t>::max();
   } else {
     archiveFileId = m_scheduler.checkAndGetNextArchiveFileId(m_cliIdentity.username, storageClass, requester, m_lc);
   }
435

436
437
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
438
439
   params.add("diskFileId", std::to_string(notification.file().fid()))
         .add("diskFilePath", notification.file().lpath())
440
         .add("fileId", archiveFileId)
441
         .add("schedulerTime", t.secs());
442
   m_lc.log(cta::log::INFO, "In RequestMessage::processCREATE(): assigning new archive file ID.");
443

444
   // Set ArchiveFileId in xattrs
445
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.file_id", std::to_string(archiveFileId)));
446
447
   
   // Set the storage class in xattrs
448
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.storage_class", storageClass));
449
450
451
452
453

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

454
455


456
void RequestMessage::processCLOSEW(const cta::eos::Notification &notification, cta::xrd::Response &response)
457
{
458
   // Validate received protobuf
459
460
461
462
463
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.wf().instance().url(),      "notification.wf.instance.url");
   checkIsNotEmptyString(notification.transport().report_url(),   "notification.transport.report_url");
464

465
   // Unpack message
466
   const auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class");
467
   if(notification.file().xattr().end() == storageClassItor) {
468
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.storage_class");
469
470
   }

471
   // For testing: this storage class will always fail
472
473
   if(storageClassItor->second == "fail_on_closew_test") {
      throw PbException("File is in fail_on_closew_test storage class, which always fails.");
474
475
   }

476
477
478
479
480
481
   // Disallow archival of files above the specified limit
   if(notification.file().size() > m_archiveFileMaxSize) {
      throw exception::UserError("Archive request rejected: file size (" + std::to_string(notification.file().size()) +
                                 " bytes) exceeds maximum allowed size (" + std::to_string(m_archiveFileMaxSize) + " bytes)");
   }

482
   cta::common::dataStructures::ArchiveRequest request;
483
   checksum::ProtobufToChecksumBlob(notification.file().csb(), request.checksumBlob);
484
485
486
487
488
489
490
491
492
493
494
495
496
497
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.diskFileID             = std::to_string(notification.file().fid());
   request.fileSize               = notification.file().size();
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.srcURL                 = notification.wf().instance().url();
   request.storageClass           = storageClassItor->second;
   request.archiveReportURL       = notification.transport().report_url();
   request.archiveErrorReportURL  = notification.transport().error_report_url();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);
498

499
500
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t
501
   const auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
502
   if(notification.file().xattr().end() == archiveFileIdItor) {
503
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
504
505
506
507
508
509
510
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   uint64_t archiveFileId = 0;
   if((archiveFileId = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
511

512
513
   cta::utils::Timer t;

514
515
516
517
518
519
520
521
522
523
524
525
526
527
   cta::log::ScopedParamContainer params(m_lc);
   std::string logMessage = "In RequestMessage::processCLOSEW(): ";
   if(request.fileSize > 0) {
     // Queue the request
     std::string archiveRequestAddr = m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc);
     logMessage += "queued file for archive.";
     params.add("schedulerTime", t.secs());
     params.add("archiveRequestId", archiveRequestAddr);

     // Add archive request reference to response as an extended attribute
     response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", archiveRequestAddr));
   } else {
     logMessage += "ignoring zero-length file.";
   }
528

529
   // Create a log entry
530
531
   params.add("fileId", archiveFileId);
   params.add("requesterInstance", notification.wf().requester_instance());
532
   m_lc.log(cta::log::INFO, logMessage);
533

534
   // Set response type
535
536
537
538
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}


539
void RequestMessage::processPREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
540
{
541
   // Validate received protobuf
542
543
544
545
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
   checkIsNotEmptyString(notification.transport().dst_url(),      "notification.transport.dst_url");
546

547
   // Unpack message
548
   cta::common::dataStructures::RetrieveRequest request;
549
550
551
552
553
554
555
556
557
558
   request.requester.name         = notification.cli().user().username();
   request.requester.group        = notification.cli().user().groupname();
   request.dstURL                 = notification.transport().dst_url();
   request.errorReportURL         = notification.transport().error_report_url();
   request.diskFileInfo.owner_uid = notification.file().owner().uid();
   request.diskFileInfo.gid       = notification.file().owner().gid();
   request.diskFileInfo.path      = notification.file().lpath();
   request.creationLog.host       = m_cliIdentity.host;
   request.creationLog.username   = m_cliIdentity.username;
   request.creationLog.time       = time(nullptr);
559

560
561
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
562
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
563
   if(notification.file().xattr().end() == archiveFileIdItor) {
564
     // Fall back to the old xattr format
565
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
566
567
568
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
569
570
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
571
572
573
574
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
575
576
   
   // Activity value is a string. The parameter might be present or not.
577
   if(notification.file().xattr().find("activity") != notification.file().xattr().end()) {
578
     request.activity = notification.file().xattr().at("activity");
579
   }
580

581
582
   cta::utils::Timer t;

583
   // Queue the request
584
   std::string retrieveReqId = m_scheduler.queueRetrieve(m_cliIdentity.username, request, m_lc);
585

586
587
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
588
589
   params.add("fileId", request.archiveFileID).add("schedulerTime", t.secs())
         .add("retrieveReqId", retrieveReqId);
590
591
592
   if(static_cast<bool>(request.activity)) {
     params.add("activity", request.activity.value());
   }
593
   m_lc.log(cta::log::INFO, "In RequestMessage::processPREPARE(): queued file for retrieve.");
594

595
   // Set response type and add retrieve request reference as an extended attribute.
596
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId));
597
598
599
600
601
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



602
603
604
605
606
607
608
void RequestMessage::processABORT_PREPARE(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");

   // Unpack message
609
   cta::common::dataStructures::CancelRetrieveRequest request;
610
611
   request.requester.name   = notification.cli().user().username();
   request.requester.group  = notification.cli().user().groupname();
612
613
614

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
615
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
616
   if(notification.file().xattr().end() == archiveFileIdItor) {
617
     // Fall back to the old xattr format
618
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
619
620
621
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
622
623
624
625
626
627
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
628
629
   
   // The request Id should be stored as an extended attribute
630
   const auto retrieveRequestIdItor = notification.file().xattr().find("sys.cta.objectstore.id");
631
   if(notification.file().xattr().end() == retrieveRequestIdItor) {
632
     throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.cta.objectstore.id");
633
   }
634
635
   const std::string retrieveRequestId = retrieveRequestIdItor->second;
   request.retrieveRequestId = retrieveRequestId;
636
637

   // Queue the request
638
   m_scheduler.abortRetrieve(m_cliIdentity.username, request, m_lc);
639

640
641
   cta::utils::Timer t;

642
643
   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
644
645
646
647
648
   params.add("fileId", request.archiveFileID)
         .add("schedulerTime", t.secs())
         .add("retrieveRequestId", request.retrieveRequestId)
         .add("diskFilePath", cta::utils::midEllipsis(request.diskFileInfo.path, 100));
   m_lc.log(cta::log::INFO, "In RequestMessage::processABORT_PREPARE(): canceled retrieve request.");
649

650
   // Set response type and remove reference to retrieve request in EOS extended attributes.
651
   response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", ""));
652
653
654
655
656
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



657
void RequestMessage::processDELETE(const cta::eos::Notification &notification, cta::xrd::Response &response)
658
{
659
   // Validate received protobuf
660
661
   checkIsNotEmptyString(notification.cli().user().username(),    "notification.cli.user.username");
   checkIsNotEmptyString(notification.cli().user().groupname(),   "notification.cli.user.groupname");
662
   checkIsNotEmptyString(notification.file().lpath(),             "notification.file.lpath");
663

664
   // Unpack message
665
   cta::common::dataStructures::DeleteArchiveRequest request;
666
667
   request.requester.name    = notification.cli().user().username();
   request.requester.group   = notification.cli().user().groupname();
668
   
669
670
671
672
673
   std::string lpath         = notification.file().lpath();
   uint64_t diskFileId       = notification.file().fid();
   request.diskFilePath          = lpath;
   request.diskFileId = std::to_string(diskFileId);
   request.diskInstance = m_cliIdentity.username;
674
675
   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
   // must be converted to a valid uint64_t
676
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
677
   if(notification.file().xattr().end() == archiveFileIdItor) {
678
     // Fall back to the old xattr format
679
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
680
681
682
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
683
684
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
685
686
687
688
   if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
   {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
689
690
691
692
   
   auto archiveRequestAddrItor = notification.file().xattr().find("sys.cta.archive.objectstore.id");
   if(archiveRequestAddrItor != notification.file().xattr().end()){
     //We have the ArchiveRequest's objectstore address.
693
694
695
696
     std::string objectstoreAddress = archiveRequestAddrItor->second;
     if(!objectstoreAddress.empty()){
      request.address = archiveRequestAddrItor->second;
     }
697
   }
698

699
   // Delete the file from the catalogue or from the objectstore if archive request is created
700
   cta::utils::Timer t;
701
   m_scheduler.deleteArchive(m_cliIdentity.username, request, m_lc);
702
703
704

   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
705
706
   params.add("fileId", request.archiveFileID)
         .add("address", (request.address ? request.address.value() : "null"))
707
         .add("filePath",request.diskFilePath)
708
         .add("schedulerTime", t.secs());
709
   m_lc.log(cta::log::INFO, "In RequestMessage::processDELETE(): archive file deleted.");
710
711
712
713
714

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}

715
716


717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
void RequestMessage::processUPDATE_FID(const cta::eos::Notification &notification, cta::xrd::Response &response)
{
   // Validate received protobuf
   checkIsNotEmptyString(notification.file().lpath(),  "notification.file.lpath");

   // Unpack message
   const std::string &diskInstance = m_cliIdentity.username;
   const std::string &diskFilePath = notification.file().lpath();
   const std::string diskFileId = std::to_string(notification.file().fid());

   // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be
   // converted to a valid uint64_t
   auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id");
   if(notification.file().xattr().end() == archiveFileIdItor) {
     // Fall back to the old xattr format
     archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId");
     if(notification.file().xattr().end() == archiveFileIdItor) {
       throw PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id");
     }
   }
   const std::string archiveFileIdStr = archiveFileIdItor->second;
   const uint64_t archiveFileId = strtoul(archiveFileIdStr.c_str(), nullptr, 10);
   if(0 == archiveFileId) {
      throw PbException("Invalid archiveFileID " + archiveFileIdStr);
   }
   
   // Update the disk file ID
   cta::utils::Timer t;
   m_catalogue.updateDiskFileId(archiveFileId, diskInstance, diskFileId);

   // Create a log entry
   cta::log::ScopedParamContainer params(m_lc);
   params.add("fileId", archiveFileId)
         .add("schedulerTime", t.secs())
         .add("diskInstance", diskInstance)
         .add("diskFilePath", diskFilePath)
         .add("diskFileId", diskFileId);
   m_lc.log(cta::log::INFO, "In RequestMessage::processUPDATE_FID(): updated disk file ID.");

   // Set response type
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



762
763
// Admin commands

764
765
766
767
void RequestMessage::logAdminCmd(const std::string &function, const cta::admin::AdminCmd &admincmd, cta::utils::Timer &t)
{
   using namespace cta::admin;

768
   std::string log_msg = "In RequestMessage::" + function + "(): Admin command succeeded: ";
769
770
771

   // Reverse lookup of strings corresponding to <command,subcommand> pair
   for(auto cmd_it = cmdLookup.begin(); cmd_it != cmdLookup.end(); ++cmd_it) {
772
773
      // Return the matching long string (length > 3)
      if(admincmd.cmd() == cmd_it->second && cmd_it->first.length() > 3) {
774
775
776
777
778
779
780
781
782
783
784
785
786
         log_msg += cmd_it->first + ' ';
         break;
      }
   }
   for(auto subcmd_it = subcmdLookup.begin(); subcmd_it != subcmdLookup.end(); ++subcmd_it) {
      if(admincmd.subcmd() == subcmd_it->second) {
         log_msg += subcmd_it->first;
         break;
      }
   }

   // Add the log message
   cta::log::ScopedParamContainer params(m_lc);
787
   params.add("adminTime", t.secs());
788
789
790
791
792
   m_lc.log(cta::log::INFO, log_msg);
}



793
void RequestMessage::processAdmin_Add(cta::xrd::Response &response)
794
795
796
{
   using namespace cta::admin;

797
798
   auto &username = getRequired(OptionString::USERNAME);
   auto &comment  = getRequired(OptionString::COMMENT);
799
800

   m_catalogue.createAdminUser(m_cliIdentity, username, comment);
801
802
803
804
805
806

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



807
void RequestMessage::processAdmin_Ch(cta::xrd::Response &response)
808
809
810
{
   using namespace cta::admin;

811
812
   auto &username = getRequired(OptionString::USERNAME);
   auto &comment  = getRequired(OptionString::COMMENT);
813
814

   m_catalogue.modifyAdminUserComment(m_cliIdentity, username, comment);
815
816
817
818
819
820

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



821
void RequestMessage::processAdmin_Rm(cta::xrd::Response &response)
822
823
824
{
   using namespace cta::admin;

825
   auto &username = getRequired(OptionString::USERNAME);
826
827

   m_catalogue.deleteAdminUser(username);
828
829
830
831
832
833

   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



834
void RequestMessage::processAdmin_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
835
{
836
  using namespace cta::admin;
837

838
839
  // Create a XrdSsi stream object to return the results
  stream = new AdminLsStream(*this, m_catalogue, m_scheduler);
840

841
  response.set_show_header(HeaderType::ADMIN_LS);
842
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
843
844
845
846
}



847
void RequestMessage::processArchiveFile_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
848
{
849
  using namespace cta::admin;
850

851
852
  // Create a XrdSsi stream object to return the results
  stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler);
853

854
855
856
  // Set correct column headers
  response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::ARCHIVEFILE_LS_SUMMARY
                                                            : HeaderType::ARCHIVEFILE_LS);
857

858
  response.set_type(cta::xrd::Response::RSP_SUCCESS);
859
860
861
862
}



863
void RequestMessage::processArchiveRoute_Add(cta::xrd::Response &response)
864
865
866
{
   using namespace cta::admin;

867
868
869
870
   auto &scn      = getRequired(OptionString::STORAGE_CLASS);
   auto &cn       = getRequired(OptionUInt64::COPY_NUMBER);
   auto &tapepool = getRequired(OptionString::TAPE_POOL);
   auto &comment  = getRequired(OptionString::COMMENT);
871

872
   m_catalogue.createArchiveRoute(m_cliIdentity, scn, cn, tapepool, comment);
873

874
875
876
877
878
   response.set_type(cta::xrd::Response::RSP_SUCCESS);
}



879
void RequestMessage::processArchiveRoute_Ch(cta::xrd::Response &response)