XrdxCastor2Ofs.cpp 27.9 KB
Newer Older
1
/*******************************************************************************
2
 *                      XrdxCastor2Ofs.cc
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2012  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *
21
 * @author Castor Dev team, castor-dev@cern.ch
22
 *
23
24
25
26
27
28
 ******************************************************************************/

/*-----------------------------------------------------------------------------*/
#include <sys/types.h>
#include <sys/stat.h>
#include <grp.h>
29
#include <fcntl.h>
30
31
32
#include "h/Cns_api.h"
#include "h/serrno.h"
#include <string.h>
33
#include <zlib.h>
34
#include "ceph/ceph_posix.h"
35
#include "movers/moverclose.h"
36
/*----------------------------------------------------------------------------*/
37
#include "XrdVersion.hh"
38
39
#include "XrdAcc/XrdAccAuthorize.hh"
#include "XrdOfs/XrdOfs.hh"
40
#include "XrdSys/XrdSysDNS.hh"
41
#include "XrdSys/XrdSysTimer.hh"
42
43
#include "XrdOss/XrdOssApi.hh"
#include "XrdOuc/XrdOucTrace.hh"
44
#include "XrdNet/XrdNetSocket.hh"
45
#include "XrdSec/XrdSecEntity.hh"
46
/*----------------------------------------------------------------------------*/
47
48
#include "XrdxCastor2Ofs.hpp"
#include "XrdxCastor2FsConstants.hpp"
49
#include "XrdxCastor2Timing.hpp"
50
/*----------------------------------------------------------------------------*/
51

52
XrdxCastor2Ofs* gSrv; ///< global diskserver OFS handle
53

54
// Extern symbols
55
extern XrdOfs*     XrdOfsFS;
56
extern XrdSysError OfsEroute;
57
extern XrdOssSys*  XrdOfsOss;
58
extern XrdOss*     XrdOssGetSS(XrdSysLogger*, const char*, const char*);
59

60
XrdVERSIONINFO(XrdSfsGetFileSystem, xCastor2Ofs);
61

62
63
// One minute for destination to contact us for tpc.key rendez-vous
const int XrdxCastor2OfsFile::sKeyExpiry = 60;
64

65

66
67
68
//------------------------------------------------------------------------------
// SfsGetFileSystem
//------------------------------------------------------------------------------
69
70
extern "C"
{
71
72
73
  XrdSfsFileSystem* XrdSfsGetFileSystem(XrdSfsFileSystem* native_fs,
                                        XrdSysLogger*     lp,
                                        const char*       configfn)
74
  {
75
    static XrdxCastor2Ofs myFS;
76
    // Do the herald thing
77
78
    OfsEroute.SetPrefix("castor2ofs_");
    OfsEroute.logger(lp);
79
    OfsEroute.Say("++++++ (c) 2014 CERN/IT-DSS xCastor2Ofs v1.0");
80

81
    // Initialize the subsystems
82
    gSrv = &myFS;
83
    gSrv->ConfigFN = (configfn && *configfn ? strdup(configfn) : 0);
84

85
    if (gSrv->Configure(OfsEroute)) return 0;
86

87
    // All done, we can return the callout vector to these routines
88
89
    XrdOfsFS = static_cast<XrdOfs*>(gSrv);
    return XrdOfsFS;
90
  }
91
92
}

93

94
/******************************************************************************/
95
/*                         x C a s t o r O f s                                */
96
/******************************************************************************/
97

98
99
100
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
101
102
103
104
105
XrdxCastor2Ofs::XrdxCastor2Ofs():
  XrdOfs(),
  LogId(),
  mLogLevel(LOG_INFO)
{ }
106
107


108
109
110
//------------------------------------------------------------------------------
// Configure
//------------------------------------------------------------------------------
111
int XrdxCastor2Ofs::Configure(XrdSysError& Eroute)
112
{
113
114
  char* var;
  const char* val;
115
  int  cfgFD;
116
  // Extract the manager from the config file
117
  XrdOucStream config_stream(&Eroute, getenv("XRDINSTANCE"));
118

119
120
  if (ConfigFN && *ConfigFN)
  {
121
    // Try to open the configuration file.
122
123
    if ((cfgFD = open(ConfigFN, O_RDONLY, 0)) < 0)
      return Eroute.Emsg("Config", errno, "open config file fn=", ConfigFN);
124

125
    config_stream.Attach(cfgFD);
126
127

    // Now start reading records until eof.
128
129
130
131
    while ((var = config_stream.GetMyFirstWord()))
    {
      if (!strncmp(var, "xcastor2.", 9))
      {
132
133
        var += 9;

134
        // Get the log level
135
136
137
138
139
        if (!strcmp("loglevel", var))
        {
          if (!(val = config_stream.GetWord()))
          {
            Eroute.Emsg("Config", "argument for debug level invalid set to INFO.");
140
            mLogLevel = LOG_INFO;
141
          }
142
143
          else
          {
144
            long int log_level = Logging::GetPriorityByString(val);
145

146
147
148
149
150
151
            if (log_level == -1)
            {
              // Maybe the log level is specified as an int from 0 to 7
              errno = 0;
              char* end;
              log_level = (int) strtol(val, &end, 10);
152

153
154
155
156
157
              if ((errno == ERANGE && ((log_level == LONG_MIN) || (log_level == LONG_MAX))) ||
                  ((errno != 0) && (log_level == 0)) ||
                  (end == val))
              {
                // There was an error default to LOG_INFO
158
                log_level = 6;
159
160
161
              }
            }

162
            mLogLevel = log_level;
163
164
            Eroute.Say("=====> xcastor2.loglevel: ",
                       Logging::GetPriorityString(mLogLevel), "");
165
          }
166
        }
167

168
169
170
171
172
173
        // Get any debug filter name
        if (!strcmp("debugfilter", var))
        {
          if (!(val = config_stream.GetWord()))
          {
            Eroute.Emsg("Config", "argument for debug filter invalid set to none.");
174
          }
175
176
          else
          {
177
            Logging::SetFilter(val);
178
            Eroute.Say("=====> xcastor2.debugfileter: ", val, "");
179
180
          }
        }
181
182
      }
    }
183

184
    config_stream.Close();
185
186
  }

187
  // Setup the circular in-memory logging buffer
188
189
190
191
  XrdOucString unit = "rdr@";
  unit += XrdSysDNS::getHostName();
  unit += ":1094";
  Logging::Init();
192
193
  Logging::SetLogPriority(mLogLevel);
  Logging::SetUnit(unit.c_str());
194
  xcastor_info("logging configured");
195
196
  // Parse the default XRootD directives
  int rc = XrdOfs::Configure(Eroute);
197
  // Set the effective user for all the XrdClients used to issue 'prepares'
198
199
  // to redirector
  setenv("XrdClientEUSER", "stage", 1);
200
201
202
203
  return rc;
}


204
//------------------------------------------------------------------------------
205
// Set the log level for the XRootD server daemon
206
//------------------------------------------------------------------------------
207
208
void
XrdxCastor2Ofs::SetLogLevel(int logLevel)
209
{
210
211
212
213
214
215
216
  if (mLogLevel != logLevel)
  {
    xcastor_notice("update log level from=%s to=%s",
                   Logging::GetPriorityString(mLogLevel),
                   Logging::GetPriorityString(logLevel));
    mLogLevel = logLevel;
    Logging::SetLogPriority(mLogLevel);
217
218
219
220
  }
}


221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
//------------------------------------------------------------------------------
// Stat file path
//------------------------------------------------------------------------------
int
XrdxCastor2Ofs::stat(const char* path,
                     struct stat* buf,
                     XrdOucErrInfo& einfo,
                     const XrdSecEntity* client,
                     const char* opaque)
{
  xcastor_debug("path=%s, opaque=%s", path, opaque);
  return XrdOfs::stat(path, buf, einfo, client, opaque);
}



237
238
239
/******************************************************************************/
/*                         x C a s t o r O f s F i l e                        */
/******************************************************************************/
240
241
242
243
244


//------------------------------------------------------------------------------
// Constuctor
//------------------------------------------------------------------------------
245
XrdxCastor2OfsFile::XrdxCastor2OfsFile(const char* user, int MonID) :
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
  XrdOfsFile(user, MonID),
  mEnvOpaque(NULL),
  mIsRW(false),
  mHasWrite(false),
  mViaDestructor(false),
  mReqId("0"),
  mHasAdlerErr(false),
  mHasAdler(true),
  mAdlerOffset(0),
  mXsValue(""),
  mXsType(""),
  mIsClosed(false),
  mTpcKey(""),
  mTpcFlag(TpcFlag::kTpcNone),
  mDiskMgrPort(0)
261
{
262
  mAdlerXs = adler32(0L, Z_NULL, 0);
263
264
265
266
267
268
269
270
}


//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
XrdxCastor2OfsFile::~XrdxCastor2OfsFile()
{
271
  mViaDestructor = true;
272
  close();
273

274
275
276
277
278
  if (mEnvOpaque)
  {
    delete mEnvOpaque;
    mEnvOpaque = NULL;
  }
279
}
280
281


282
//------------------------------------------------------------------------------
283
// Open file
284
285
//------------------------------------------------------------------------------
int
286
287
288
289
290
XrdxCastor2OfsFile::open(const char*         path,
                         XrdSfsFileOpenMode  open_mode,
                         mode_t              create_mode,
                         const XrdSecEntity* client,
                         const char*         opaque)
291
{
292
293
  xcastor::Timing open_timing("open");
  TIMING("START", &open_timing);
294
  XrdOucString spath = path;
295
  xcastor_debug("path=%s", path);
296
  XrdOucString newpath = "";
297
  XrdOucString newopaque = opaque;
298

299
  // If there is explicit user opaque information we find two ?,
300
  // so we just replace it with a seperator.
301
302
  while (newopaque.replace("?", "&")) { };
  while (newopaque.replace("&&", "&")) { };
303
304

  // Check if there are several redirection tokens
305
306
307
  int firstpos = 0;
  int lastpos = 0;
  int newpos = 0;
308
  firstpos = newopaque.find("castor2fs.sfn", 0);
309
310
  lastpos = firstpos + 1;

311
  while ((newpos = newopaque.find("castor2fs.sfn", lastpos)) != STR_NPOS)
312
    lastpos = newpos + 1;
313

314
  // Erase from the beginning to the last token start
315
316
  if (lastpos > (firstpos + 1))
    newopaque.erase(firstpos, lastpos - 2 - firstpos);
317

318
  // Erase the tried parameter from the opaque information
319
320
  int tried_pos = newopaque.find("tried=");
  int amp_pos = newopaque.find('&', tried_pos);
321

322
  if (tried_pos != STR_NPOS)
323
324
325
326
327
328
  {
    if (amp_pos != STR_NPOS)
      newopaque.erase(tried_pos, amp_pos - tried_pos);
    else
      newopaque.erase(tried_pos, newopaque.length() - tried_pos);
  }
329

330
  // Set the open flags and type of operation rd_only/rdwr
331
332
  mEnvOpaque = new XrdOucEnv(newopaque.c_str());
  newpath = mEnvOpaque->Get("castor2fs.pfn1");
333
  open_mode |= SFS_O_MKPTH;
334
  create_mode |= SFS_O_MKPTH;
335

336
  if (open_mode & (SFS_O_CREAT | SFS_O_TRUNC | SFS_O_WRONLY | SFS_O_RDWR))
337
    mIsRW = true;
338
339
340

  xcastor_info("path=%s, opaque=%s isRW=%d, open_mode=%x", path, opaque,
               mIsRW, open_mode);
341

342
  // Deal with native TPC transfers which are passed directly to the OFS layer
343
  if (newopaque.find("tpc.") != STR_NPOS)
344
  {
345
346
    if (PrepareTPC(newpath, newopaque, client))
      return SFS_ERROR;
347
  }
348
  else
349
  {
350
    if (ExtractTransferInfo(*mEnvOpaque))
351
      return SFS_ERROR;
352
353
  }

354
  TIMING("OFS_OPEN", &open_timing);
355
356
  int rc = XrdOfsFile::open(newpath.c_str(), open_mode, create_mode, client,
                            newopaque.c_str());
357

358
359
360
361
  if (rc == SFS_OK)
  {
    // Try to get the file checksum from the filesystem
    int nattr = 0;
362
363
364
365
366
367
    mXsValue.reserve(32 + 1);
    mXsValue[0] = '\0';
    mXsType.reserve(32);
    mXsType[0] = '\0';

    // Deal with ceph pool if any
368
    XrdOucString poolAndPath = newpath;
369
370

    if (mEnvOpaque->Get("castor2fs.pool"))
371
      poolAndPath = mEnvOpaque->Get("castor2fs.pool") + '/' + newpath;
372

373
    // Get existing checksum - we don't check errors here
374
375
    nattr = ceph_posix_getxattr(poolAndPath.c_str(), "user.castor.checksum.type",
                                (void*)&mXsType[0], mXsType.length());
376

377
378
379
    mXsType[nattr] = '\0';
    nattr = ceph_posix_getxattr(poolAndPath.c_str(), "user.castor.checksum.value",
                                (void*)&mXsValue[0], mXsValue.length());
380

381
    mXsValue[nattr] = '\0';
382
    xcastor_debug("xs_type=%s, xs_val=%s", mXsType.c_str(),  mXsValue.c_str());
383
384

    // Get also the size of the file
385
    if (XrdOfsOss->Stat(newpath.c_str(), &mStatInfo, 0, mEnvOpaque))
386
387
388
    {
      xcastor_err("error: getting file stat information");
      rc = SFS_ERROR;
389
    }
390
  }
391

392
  if (gSrv->mLogLevel == LOG_DEBUG)
393
394
395
396
  {
    TIMING("DONE", &open_timing);
    open_timing.Print();
  }
397

398
  return rc;
399
400
401
}


402
//------------------------------------------------------------------------------
403
// Close file
404
//------------------------------------------------------------------------------
405
406
407
408
int
XrdxCastor2OfsFile::close()
{
  int rc = SFS_OK;
409

410
  if (mIsClosed)
411
    return SFS_OK;
412

413
  mIsClosed = true;
414
415
  xcastor::Timing close_timing("close");
  TIMING("START", &close_timing);
416

417
418
419
  // If this is a TPC transfer then we can drop the key from the map
  if (mTpcKey != "")
  {
420
    xcastor_debug("drop from map tpc.key=%s", mTpcKey.c_str());
421
    XrdSysMutexHelper tpc_lock(gSrv->mTpcMapMutex);
422
    gSrv->mTpcMap.erase(mTpcKey);
423
424
    mTpcKey = "";
    // Remove keys which are older than one hour
425
    std::map<std::string, struct TpcInfo>::iterator iter = gSrv->mTpcMap.begin();
426
    time_t now = time(NULL);
427

428
    while (iter != gSrv->mTpcMap.end())
429
430
431
    {
      if (now - iter->second.expire > 3600)
      {
432
        xcastor_info("expire tpc.key=%s", iter->first.c_str());
433
        gSrv->mTpcMap.erase(iter++);
434
      }
435
      else
436
437
      {
        ++iter;
438
439
      }
    }
440
  }
441

442
  TIMING("CLEAN_TPC", &close_timing);
443
  char ckSumbuf[32 + 1];
444
  sprintf(ckSumbuf, "%x", mAdlerXs);
445
  char* ckSumalg = "ADLER32";
446
  std::string newpath = mEnvOpaque->Get("castor2fs.pfn1");
447

448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
  if (mHasWrite)
  {
    if (!mHasAdler)
    {
      // Rescan the file and compute checksum
      xcastor_debug("rescan file and compute checksum");
      char blk_xs_buf[64 * 1024];
      mAdlerXs = adler32(0L, Z_NULL, 0);
      mAdlerOffset = 0;
      XrdSfsFileOffset xs_offset = 0;
      XrdSfsXferSize xs_size = 0;

      while ((xs_size = read(xs_offset, blk_xs_buf, sizeof(blk_xs_buf))) > 0)
      {
        mAdlerXs = adler32(mAdlerXs, (const Bytef*)blk_xs_buf, xs_size);
        xs_offset += xs_size;
464
      }
465
466

      mHasAdler = true;
467
    }
468

469
    // Deal with ceph pool if any
470
471
    std::string poolAndPath = newpath;
    char* pool = mEnvOpaque->Get("castor2fs.pool");
472
473
474

    if (pool)
    {
475
476
477
      poolAndPath = pool;
      poolAndPath += '/';
      poolAndPath += newpath;
478
    }
479

480
481
    sprintf(ckSumbuf, "%x", mAdlerXs);
    xcastor_debug("file xs=%s", ckSumbuf);
482

483
484
    if (ceph_posix_setxattr(poolAndPath.c_str(), "user.castor.checksum.type",
                            ckSumalg, strlen(ckSumalg), 0))
485
    {
486
487
      xcastor_err("path=%s unable to set xs type", newpath.c_str());
      rc = gSrv->Emsg("close", error, EIO, "set checksum type");
488
489
490
    }
    else
    {
491
492
      if (ceph_posix_setxattr(poolAndPath.c_str(), "user.castor.checksum.value",
                              ckSumbuf, strlen(ckSumbuf), 0))
493
      {
494
495
        xcastor_err("path=%s unable to set xs value", newpath.c_str());
        rc = gSrv->Emsg("close", error, EIO, "set checksum");
496
497
      }
    }
498
499
  }

500
501
  TIMING("DONE_XS", &close_timing);

502
503
  if ((rc = XrdOfsFile::close()))
  {
504
505
    xcastor_err("path=%s failed closing ofs file", newpath.c_str());
    rc = gSrv->Emsg("close", error, EIO, "closing ofs file");
506
  }
507

508
509
  // This means the file was not properly closed
  if (mViaDestructor)
510
  {
511
512
    xcastor_err("path=%s closed via destructor", newpath.c_str());
    rc = gSrv->Emsg("close", error, EIO, "close file - delete via destructor");
513
  }
514

515
516
517
  // If we have contact info for the diskmanager then we contact it to pass the
  // status otherwise this is a d2d or an rtcpd transfer and we don't need to
  // contact the diskmanager.
518
  if (mDiskMgrPort)
519
  {
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
    TIMING("DISKMGR_STATUS", &close_timing);
    uint64_t sz_file = 0;

    // Get the size of the file for writes
    if (mHasWrite)
    {
      if (XrdOfsOss->Stat(newpath.c_str(), &mStatInfo, 0, mEnvOpaque))
      {
        xcastor_err("path=%s failed stat", newpath.c_str());
        rc = gSrv->Emsg("close", error, EIO, "stat file");
      }
      else
        sz_file = mStatInfo.st_size;
    }

535
    int errc = (rc ? error.getErrInfo() : rc);
536
    char* errmsg = (rc ? strdup(error.getErrText()) : (char*)0);
537
    xcastor_info("send diskmgr errc=%i, errmsg=%s", errc, (errmsg ? errmsg : ""));
538
539
540
    int dm_errno = mover_close_file(mDiskMgrPort, mReqId.c_str(), sz_file,
                                    const_cast<const char*>(ckSumalg),
                                    const_cast<const char*>(ckSumbuf),
541
                                    &errc, &errmsg);
542
543
544

    // If failed to commit to diskmanager then return error
    if (dm_errno)
545
    {
546
      xcastor_err("path=%s failed closing file: %s", newpath.c_str(), errmsg);
547
      rc = gSrv->Emsg("close", error, dm_errno, "send status to diskmanager");
548
    }
549
550

    // Free errmsg memory
551
552
    if (errmsg)
      free(errmsg);
553
554
  }

555
  if (gSrv->mLogLevel == LOG_DEBUG)
556
  {
557
558
    TIMING("DONE", &close_timing);
    close_timing.Print();
559
  }
560

561
562
563
  return rc;
}

564

565
566
567
//------------------------------------------------------------------------------
// Prepare TPC transfer - save info in the global TPC map, do checks etc.
//------------------------------------------------------------------------------
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
int
XrdxCastor2OfsFile::PrepareTPC(XrdOucString& path,
                               XrdOucString& opaque,
                               const XrdSecEntity* client)
{
  char* val;
  xcastor_info("path=%s", path.c_str());

  // Get the current state of the TPC transfer
  if ((val = mEnvOpaque->Get("tpc.stage")))
  {
    if (strncmp(val, "placement", 9) == 0)
    {
      mTpcFlag = TpcFlag::kTpcSrcCanDo;
    }
    else if (strncmp(val, "copy", 4) == 0)
584
    {
585
586
587
588
      if (!(val = mEnvOpaque->Get("tpc.key")))
      {
        xcastor_err("missing tpc.key information");
        return gSrv->Emsg("open", error, EACCES, "open - missing tpc.key info");
589
590
      }

591
592
593
594
595
596
597
598
599
600
601
602
603
      if ((val = mEnvOpaque->Get("tpc.lfn")))
      {
        mTpcFlag = TpcFlag::kTpcDstSetup;
      }
      else if ((val = mEnvOpaque->Get("tpc.dst")))
      {
        mTpcFlag = TpcFlag::kTpcSrcSetup;
        mTpcKey = mEnvOpaque->Get("tpc.key"); // save key only at this stage
      }
      else
      {
        xcastor_err("missing tpc options in copy stage");
        return gSrv->Emsg("open", error, EACCES, "open - missing tpc options",
604
                          "in copy stage");
605
606
607
608
609
610
      }
    }
    else
    {
      xcastor_err("unknown tpc.stage=", val);
      return gSrv->Emsg("open", error, EACCES, "open - unknown tpc.stage=", val);
611
612
    }
  }
613
614
  else if ((val = mEnvOpaque->Get("tpc.key")) &&
           (val = mEnvOpaque->Get("tpc.org")))
615
  {
616
    mTpcFlag = TpcFlag::kTpcSrcRead;
617
  }
618

619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
  if (mTpcFlag == TpcFlag::kTpcSrcSetup)
  {
    // This is a source TPC file and we just received the second open reuqest
    // from the initiator. We save the mapping between the tpc.key and the
    // castor pfn for future open requests from the destination of the TPC transfer.
    std::string castor_pfn = mEnvOpaque->Get("castor2fs.pfn1");
    std::string tpc_org = client->tident;
    tpc_org.erase(tpc_org.find(":"));
    tpc_org += "@";
    tpc_org += client->addrInfo->Name();;
    struct TpcInfo transfer = (struct TpcInfo)
    {
      castor_pfn,
      tpc_org,
      std::string(opaque.c_str()),
      time(NULL) + sKeyExpiry
    };
    {
      // Insert new key-transfer pair in the map
      XrdSysMutexHelper tpc_lock(gSrv->mTpcMapMutex);
      std::pair< std::map<std::string, struct TpcInfo>::iterator, bool> pair =
640
        gSrv->mTpcMap.insert(std::make_pair(mTpcKey, transfer));
641
642
643
644
645
646
647
648

      if (pair.second == false)
      {
        xcastor_err("tpc.key:%s is already in the map", mTpcKey.c_str());
        gSrv->mTpcMap.erase(pair.first);
        return gSrv->Emsg("open", error, EINVAL,
                          "tpc.key already in the map for file:  ",
                          path.c_str());
649
650
651
      }
    }
  }
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
  else if (mTpcFlag == TpcFlag::kTpcSrcRead)
  {
    // This is a source TPC file and we just received the third open request which
    // comes directly from the destination of the transfer. Here we need to retrieve
    // the castor lfn from the map which we saved previously as the destination has
    // no knowledge of the castor physical file name.
    std::string check_key =  mEnvOpaque->Get("tpc.key");
    // Get init opaque just for stager job information and don't touch the current
    // opaque info as it contains the tpc.key and tpc.org info needed by the OFS
    // layer to perform the actual tpc transfer
    std::string init_opaque;
    {
      bool found_key = false;
      int num_tries = 150;
      XrdSysMutexHelper tpc_lock(gSrv->mTpcMapMutex);
      std::map<std::string, struct TpcInfo>::iterator iter =
668
        gSrv->mTpcMap.find(check_key);
669

670
671
672
      while ((num_tries > 0) && !found_key)
      {
        num_tries--;
673

674
675
676
677
678
679
        if (iter != gSrv->mTpcMap.end())
        {
          // Check if the key is still valid
          if (iter->second.expire > time(NULL))
          {
            std::string tpc_org = mEnvOpaque->Get("tpc.org");
680

681
682
683
684
685
686
687
            if (tpc_org != iter->second.org)
            {
              xcastor_err("tpc.org from destination=%s, not matching initiator orig.=%s",
                          tpc_org.c_str(), iter->second.org.c_str());
              return gSrv->Emsg("open", error, EPERM, "PrepareTPC - destination "
                                "tpc.org not matching initiator origin");
            }
688

689
690
691
692
693
694
695
696
            path = iter->second.path.c_str();
            init_opaque = iter->second.opaque;
            found_key = true;
          }
          else
          {
            xcastor_err("tpc key expired");
            return gSrv->Emsg("open", error, EKEYEXPIRED, "PrepareTPC - "
697
                              "tpc key expired");
698
          }
699
        }
700
701
702
703
704
705
706
707
708
709
        else
        {
          // Wait for the initiator to show up with the proper key and unlock
          // the map so that he can add the key
          gSrv->mTpcMapMutex.UnLock(); // <--
          XrdSysTimer timer;
          timer.Wait(100);
          xcastor_debug("wait for initator to come with the tpc.key");
          gSrv->mTpcMapMutex.Lock();  // -->
        }
710
711
      }

712
713
714
715
716
      if (!found_key)
      {
        xcastor_err("tpc key from destination not in map");
        return gSrv->Emsg("open", error, EINVAL, "PrepareTPC - can not find "
                          "tpc.key in map");
717
718
      }
    }
719
    // Now we can extract the transfer info
720
    XrdOucEnv env_opaque(init_opaque.c_str());
721

722
    if (ExtractTransferInfo(env_opaque))
723
      return SFS_ERROR;
724
  }
725
726
727
728
729
730
731
  else if (mTpcFlag == TpcFlag::kTpcDstSetup)
  {
    // This is a TPC destination and we force the recomputation of the checksum
    // at the end since all writes go directly to the file without passing through
    // the write method in OFS.
    mHasWrite = true;
    mHasAdler = false;
732
    // Now we can extract the transfer info
733
    XrdOucEnv env_opaque(opaque.c_str());
734

735
    if (ExtractTransferInfo(env_opaque))
736
      return SFS_ERROR;
737
738
  }

739
  return SFS_OK;
740
741
}

742

743
//------------------------------------------------------------------------------
744
// Extract diskmanager info from the opaque data
745
//------------------------------------------------------------------------------
746
int
747
XrdxCastor2OfsFile::ExtractTransferInfo(XrdOucEnv& env_opaque)
748
{
749
  char* val = env_opaque.Get("castor2fs.pfn2");
750

751
  if (!val)
752
  {
753
    // If no diskmanager contact info is present in the opaque info, it means
754
    // that this is an internal d2d or an rtcpd transfer and we don't try to
755
    // contact the diskmanager but we allow it to pass through. The authorization
756
    // plugin makes sure we only allow trusted hosts to do transfers - it
757
    // verifies the signature.
758
    xcastor_debug("no diskmanager opaque infomation - this is either a d2d "
759
                  "transfer or an rtcpd request");
760
    return SFS_OK;
761
762
  }

763
  std::string connect_info = val;
764
  int pos1, pos2;
765

766
767
  // Syntax for request is: <id:diskmanagerport:subReqId> out of which only the
  // last two values are used for the moment
768
769
770
  if ((pos1 = connect_info.find(":")) != STR_NPOS)
  {
    if ((pos2 = connect_info.find(":", pos1 + 1)) != STR_NPOS)
771
    {
772
      std::string sport(connect_info, pos1 + 1, pos2 - 1);
773
774
      mDiskMgrPort = atoi(sport.c_str());
      mReqId.assign(connect_info, pos2 + 1, std::string::npos);
775
776
777
    }
  }

778
  return SFS_OK;
779
780
}

781
782

//------------------------------------------------------------------------------
783
// Verify checksum
784
//------------------------------------------------------------------------------
785
bool
786
XrdxCastor2OfsFile::VerifyChecksum()
787
{
788
  bool rc = true;
789
790
  std::string xs_val;
  std::string xs_type;
791

792
  if ((mXsValue != "") && (mXsType != ""))
793
794
  {
    char ckSumbuf[32 + 1];
795
796
797
    sprintf(ckSumbuf, "%x", mAdlerXs);
    xs_val = ckSumbuf;
    xs_type = "ADLER32";
798

799
800
801
    if (xs_val != mXsValue)
    {
      gSrv->Emsg("VerifyChecksum", error, EIO, "checksum value wrong");
802
803
      rc = false;
    }
804

805
806
807
    if (xs_type != mXsType)
    {
      gSrv->Emsg("VerifyChecksum", error, EIO, "wrong checksum type");
808
809
      rc = false;
    }
810

811
812
    if (!rc)
    {
813
      xcastor_err("error: checksum %s != %s with algorithms [ %s <=> %s ]",
814
815
816
817
818
819
                  xs_val.c_str(), mXsValue.c_str(),
                  xs_type.c_str(), mXsType.c_str());
    }
    else
    {
      xcastor_debug("checksum OK: %s", xs_val.c_str());
820
821
    }
  }
822

823
824
825
  return rc;
}

826
827
828
829

//------------------------------------------------------------------------------
// Read
//------------------------------------------------------------------------------
830
int
831
832
XrdxCastor2OfsFile::read(XrdSfsFileOffset fileOffset,   // Preread only
                         XrdSfsXferSize   amount)
833
{
834
835
  int rc = XrdOfsFile::read(fileOffset, amount);
  mHasAdler = false;
836
837
838
  return rc;
}

839
840
841
842
843

//------------------------------------------------------------------------------
// Read
//------------------------------------------------------------------------------
XrdSfsXferSize
844
845
846
XrdxCastor2OfsFile::read(XrdSfsFileOffset fileOffset,
                         char*            buffer,
                         XrdSfsXferSize   buffer_size)
847
{
848
  xcastor_debug("off=%llu, len=%i", fileOffset, buffer_size);
849

850
  // If we once got an adler checksum error, we fail all reads
851
852
  if (mHasAdlerErr)
  {
853
    xcastor_err("error: found xs error, fail read at off=%ll, size=%i",
854
                fileOffset, buffer_size);
855
856
857
    return SFS_ERROR;
  }

858
  int rc = XrdOfsFile::read(fileOffset, buffer, buffer_size);
859

860
  // Computation of adler checksum - we disable it if seeks happen
861
862
  if (fileOffset != mAdlerOffset)
    mHasAdler = false;
863

864
865
866
  if (mHasAdler)
  {
    mAdlerXs = adler32(mAdlerXs, (const Bytef*) buffer, rc);
867

868
    if (rc > 0)
869
    {
870
      mAdlerOffset += rc;
871

872
873
874
875
876
877
878
879
880
      if (fileOffset + rc >= mStatInfo.st_size)
      {
        // Invoke the checksum verification
        if (!VerifyChecksum())
        {
          mHasAdlerErr = true;
          rc = SFS_ERROR;
        }
      }
881
882
    }
  }
883

884
885
  return rc;
}
886
887
888
889
890


//------------------------------------------------------------------------------
// Read
//------------------------------------------------------------------------------
891
int
892
XrdxCastor2OfsFile::read(XrdSfsAio* aioparm)
893
{
894
895
  int rc = XrdOfsFile::read(aioparm);
  mHasAdler = false;
896
897
  return rc;
}
898
899
900
901
902


//------------------------------------------------------------------------------
// Write
//------------------------------------------------------------------------------
903
XrdSfsXferSize
904
905
906
XrdxCastor2OfsFile::write(XrdSfsFileOffset fileOffset,
                          const char*      buffer,
                          XrdSfsXferSize   buffer_size)
907
{
908
  int rc = XrdOfsFile::write(fileOffset, buffer, buffer_size);
909

910
  // Computation of adler checksum - we disable it if seek happened
911
912
  if (fileOffset != mAdlerOffset)
    mHasAdler = false;
913