rtcpd_Tape.c 101 KB
Newer Older
Olof Barring's avatar
Olof Barring committed
1
/*
2
 * Copyright (C) 1999-2004 by CERN IT
Olof Barring's avatar
Olof Barring committed
3
4
5
6
 * All rights reserved
 */

/*
7
 * rtcpd_Tape.c - RTCOPY server tape IO thread
Olof Barring's avatar
Olof Barring committed
8
9
 */

10
11
#include <stdlib.h>
#include <time.h>
Olof Barring's avatar
Olof Barring committed
12
13
14
15
16
#include <sys/param.h>
#include <sys/types.h>                  /* Standard data types          */
#include <netdb.h>                      /* Network "data base"          */
#include <sys/socket.h>
#include <netinet/in.h>                 /* Internet data types          */
17
#include <sys/time.h>
Benjamin Couturier's avatar
Benjamin Couturier committed
18
#include <sys/times.h>
Olof Barring's avatar
Olof Barring committed
19
20
21
22
23
24
25
26
27
28
29
30
31
32

#include <errno.h>
#include <stdarg.h>
#include <sys/stat.h>

#include <pwd.h>
#include <Castor_limits.h>
#include <Cglobals.h>
#include <log.h>
#include <osdep.h>
#include <net.h>
#include <Cthread_api.h>
#include <vdqm_api.h>
#include <Ctape_api.h>
33
#include <Cuuid.h>
Olof Barring's avatar
Olof Barring committed
34
35
36
37
#include <rtcp_constants.h>
#include <rtcp.h>
#include <rtcp_server.h>
#include <serrno.h>
38
#include "tplogger_api.h"
39
char *getconfent (char *, char *, int);
Olof Barring's avatar
Olof Barring committed
40

41
#include "h/tapeBridgeClientInfo2MsgBody.h"
42
#include "h/tapebridge_constants.h"
43
44
#include "h/tapebridge_recvTapeBridgeFlushedToTapeAck.h"
#include "h/tapebridge_sendTapeBridgeFlushedToTape.h"
45
#include "h/tapebridge_tapeFlushModeToStr.h"
Benjamin Couturier's avatar
Benjamin Couturier committed
46

47
#include <fcntl.h>
48
#include <stdint.h>
49
50
#include <sys/types.h>
#include <sys/stat.h>
51

Olof Barring's avatar
Olof Barring committed
52
53
/*
 * TP_STATUS(X) - store X as the tape IO thread's current activity in the
 *                global processing status (proc_stat).
 */
#define TP_STATUS(X) (proc_stat.tapeIOstatus.current_activity = (X))
/*
 * TP_SIZE(X)   - add X (converted to u_signed64) to the tape IO byte counter
 *                kept in the global processing status (proc_stat).
 */
#define TP_SIZE(X)   (proc_stat.tapeIOstatus.nbbytes += (u_signed64)(X))
/*
 * DEBUG_PRINT(X) - call rtcp_log with the parenthesised argument list X, but
 *                  only if a variable named `debug`, visible at the point of
 *                  use, equals TRUE.  Use as: DEBUG_PRINT((LOG_DEBUG,"...",a));
 *
 * NOTE(review): the expansion is a bare { } block, so writing
 * "if (c) DEBUG_PRINT((...)); else ..." would not compile.  Put the macro
 * inside braces when it is the body of an if statement.
 */
#define DEBUG_PRINT(X) {if ( debug == TRUE ) rtcp_log X ;}
Olof Barring's avatar
Olof Barring committed
55
56

typedef struct thread_arg {
57
    int client_socket;
Olof Barring's avatar
Olof Barring committed
58
    tape_list_t *tape;
59
    rtcpClientInfo_t *client;
60

61
62
63
64
65
    /**
     * Boolean value specifying whether or not the client of the rtcpd daemon is
     * tapebridged.
     */
    int clientIsTapeBridge;
66

67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
    /**
     * The mode of tape-flush behaviour to be used when writing to tape.
     *
     * TAPEBRIDGE_N_FLUSHES_PER_FILE
     * TAPEBRIDGE_ONE_FLUSH_PER_N_FILES
     */
    uint32_t tapeFlushMode;

    /**
     * The maximum number of bytes to be written to tape before a flush to
     * tape.
     *
     * The value of maxBytesBeforeFlush is only applicable if tapeFlushMode is
     * TAPEBRIDGE_ONE_FLUSH_PER_N_FILES.
     */
82
83
    uint64_t maxBytesBeforeFlush;

84
85
86
87
88
89
90
    /**
     * The maximum number of files to be written to tape before a flush to
     * tape.
     *
     * The value of maxFilesBeforeFlush is only applicable if tapeFlushMode is
     * TAPEBRIDGE_ONE_FLUSH_PER_N_FILES.
     */
91
    uint64_t maxFilesBeforeFlush;
92
93
94
95
96
97
98
99

    /**
     * Pointer to a boolean indicating whether or not the tape still needs to
     * be released from the drive after all of the work of rtcpd has been
     * finished, in other words after the client has sent the "end of entire
     * session" message (RTCP_ENDOF_REQ) on the main socket.
     */
    int *tapeNeedsToBeReleasedAtEndOfSession;
Olof Barring's avatar
Olof Barring committed
100
101
} thread_arg_t;

102
103
extern int Debug;

Olof Barring's avatar
Olof Barring committed
104
105
106
107
108
109
110
111
112
113
114
115
116
extern int nb_bufs;

extern int bufsz;

extern processing_cntl_t proc_cntl;
extern processing_status_t proc_stat;

extern buffer_table_t **databufs;

extern rtcpClientInfo_t *client;

extern int success;
extern int failure;
117
extern int AbortFlag;
Olof Barring's avatar
Olof Barring committed
118
119

static int last_block_done = 0;
120
static int WaitToJoin = FALSE;
Olof Barring's avatar
Olof Barring committed
121

122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/*
 * rtcpd_SignalFilePositioned() - wake up every thread waiting on the
 * processing control lock (proc_cntl.cntl_lock).  Per the original note this
 * tells the disk IO thread that the file has been positioned (tape read with
 * stager only).
 *
 * tape, file : must both be non-NULL; they are only validated, not used.
 *
 * Returns 0 on success.  Returns -1 with serrno = EINVAL for a NULL argument,
 * or -1 if locking, broadcasting or unlocking fails (serrno as left by the
 * failing Cthread call).
 */
int rtcpd_SignalFilePositioned(tape_list_t *tape, file_list_t *file) {
    rtcp_log(LOG_DEBUG,"rtcpd_SignalFilePositioned() called\n");
    tl_rtcpd.tl_log( &tl_rtcpd, 11, 2,
                     "func"   , TL_MSG_PARAM_STR, "rtcpd_SignalFilePositioned",
                     "Message", TL_MSG_PARAM_STR, "called" );

    /* Argument validation happens after the entry trace, as before. */
    if ( !(tape != NULL && file != NULL) ) {
        serrno = EINVAL;
        return(-1);
    }

    /* Take the control lock. */
    if ( Cthread_mutex_lock_ext(proc_cntl.cntl_lock) == -1 ) {
        rtcp_log(LOG_ERR,"rtcpd_SignalFilePositioned(): Cthread_mutex_lock_ext(proc_cntl): %s\n",
                 sstrerror(serrno));
        tl_rtcpd.tl_log( &tl_rtcpd, 11, 3,
                         "func"   , TL_MSG_PARAM_STR, "rtcpd_SignalFilePositioned",
                         "Message", TL_MSG_PARAM_STR, "Cthread_mutex_lock_ext(proc_cntl)",
                         "Error"  , TL_MSG_PARAM_STR, sstrerror(serrno) );
        return(-1);
    }

    /* Notify all waiters; on failure the lock is still released. */
    if ( Cthread_cond_broadcast_ext(proc_cntl.cntl_lock) == -1 ) {
        rtcp_log(LOG_ERR,"rtcpd_SignalFilePositioned(): Cthread_cond_broadcast_ext(proc_cntl): %s\n",
                 sstrerror(serrno));
        tl_rtcpd.tl_log( &tl_rtcpd, 11, 3,
                         "func"   , TL_MSG_PARAM_STR, "rtcpd_SignalFilePositioned",
                         "Message", TL_MSG_PARAM_STR, "Cthread_cond_broadcast_ext(proc_cntl)",
                         "Error"  , TL_MSG_PARAM_STR, sstrerror(serrno) );
        (void)Cthread_mutex_unlock_ext(proc_cntl.cntl_lock);
        return(-1);
    }

    /* Give the control lock back. */
    if ( Cthread_mutex_unlock_ext(proc_cntl.cntl_lock) == -1 ) {
        rtcp_log(LOG_ERR,"rtcpd_SignalFilePositioned(): Cthread_mutex_unlock_ext(proc_cntl): %s\n",
                 sstrerror(serrno));
        tl_rtcpd.tl_log( &tl_rtcpd, 11, 3,
                         "func"   , TL_MSG_PARAM_STR, "rtcpd_SignalFilePositioned",
                         "Message", TL_MSG_PARAM_STR, "Cthread_mutex_unlock_ext(proc_cntl)",
                         "Error"  , TL_MSG_PARAM_STR, sstrerror(serrno) );
        return(-1);
    }

    rtcp_log(LOG_DEBUG,"rtcpd_SignalFilePositioned() returns\n");
    tl_rtcpd.tl_log( &tl_rtcpd, 11, 2,
                     "func"   , TL_MSG_PARAM_STR, "rtcpd_SignalFilePositioned",
                     "Message", TL_MSG_PARAM_STR, "returns" );
    return(0);
}

176
177
178
179
180
181
182
183
static int WaitDiskIO() {
    int rc;

    rc = Cthread_mutex_lock_ext(proc_cntl.cntl_lock);
    if ( rc == -1 ) return(-1);

    rtcp_log(LOG_DEBUG,"WaitDiskIO(): diskIOfinished=%d, nb_diskIOactive=%d\n",
             proc_cntl.diskIOfinished,proc_cntl.nb_diskIOactive);
184
185
186
187
    tl_rtcpd.tl_log( &tl_rtcpd, 11, 3, 
                     "func"           , TL_MSG_PARAM_STR, "WaitDiskIO",
                     "diskIOfinished" , TL_MSG_PARAM_INT, proc_cntl.diskIOfinished,
                     "nb_diskIOactive", TL_MSG_PARAM_INT, proc_cntl.nb_diskIOactive );
188
189
190
191
192
    while ( proc_cntl.diskIOfinished == 0 || proc_cntl.nb_diskIOactive > 0 ) {
        rc = Cthread_cond_wait_ext(proc_cntl.cntl_lock);
        if ( rc == -1 ) return(-1);
        rtcp_log(LOG_DEBUG,"WaitDiskIO(): diskIOfinished=%d, nb_diskIOactive=%d\n",
                 proc_cntl.diskIOfinished,proc_cntl.nb_diskIOactive);
193
194
195
196
        tl_rtcpd.tl_log( &tl_rtcpd, 11, 3, 
                         "func"           , TL_MSG_PARAM_STR, "WaitDiskIO",
                         "diskIOfinished" , TL_MSG_PARAM_INT, proc_cntl.diskIOfinished,
                         "nb_diskIOactive", TL_MSG_PARAM_INT, proc_cntl.nb_diskIOactive );
197
198
199
200
201
    }
    rc = Cthread_mutex_unlock_ext(proc_cntl.cntl_lock);
    return(rc);
}

202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
/*
 * TapeIOstarted() - publish "tape IO started" in the processing control
 * block: sets tapeIOstarted to 1 and tapeIOfinished to 0 while holding
 * proc_cntl.cntl_lock, then wakes every thread waiting on that lock.
 * Failures of the Cthread calls are deliberately ignored.
 */
static void TapeIOstarted() {
    (void)Cthread_mutex_lock_ext(proc_cntl.cntl_lock);

    proc_cntl.tapeIOstarted  = 1;
    proc_cntl.tapeIOfinished = 0;

    (void)Cthread_cond_broadcast_ext(proc_cntl.cntl_lock);
    (void)Cthread_mutex_unlock_ext(proc_cntl.cntl_lock);
}

/*
 * TapeIOfinished() - publish "tape IO finished" in the processing control
 * block: sets tapeIOstarted to 0 and tapeIOfinished to 1 while holding
 * proc_cntl.cntl_lock, then wakes every thread waiting on that lock.
 * Failures of the Cthread calls are deliberately ignored.
 */
static void TapeIOfinished() {
    (void)Cthread_mutex_lock_ext(proc_cntl.cntl_lock);

    proc_cntl.tapeIOstarted  = 0;
    proc_cntl.tapeIOfinished = 1;

    (void)Cthread_cond_broadcast_ext(proc_cntl.cntl_lock);
    (void)Cthread_mutex_unlock_ext(proc_cntl.cntl_lock);
}

220
221
222
223
224
225
226
227
228
/*
 * rtcpd_nbFullBufs() - adjust and read the count of full data buffers.
 *
 * Adds `increment` (which may be negative, or zero to just read the value) to
 * proc_cntl.nbFullBufs under proc_cntl.cntl_lock, wakes every thread waiting
 * on that lock, and returns the updated count.  Failures of the Cthread calls
 * are deliberately ignored.
 */
int rtcpd_nbFullBufs(int increment) {
    int updated;

    (void)Cthread_mutex_lock_ext(proc_cntl.cntl_lock);
    updated = (proc_cntl.nbFullBufs += increment);
    (void)Cthread_cond_broadcast_ext(proc_cntl.cntl_lock);
    (void)Cthread_mutex_unlock_ext(proc_cntl.cntl_lock);

    return(updated);
}
229

Olof Barring's avatar
Olof Barring committed
230
231
232
/*
 * Read from tape to memory
 */
233
234
235
236
237
238
239
240
241
242
243
244
static int MemoryToTape(
    const int          tape_fd,
    int *const         indxp,
    int *const         firstblk,
    int *const         diskIOfinished,
    tape_list_t *const tape, 
    file_list_t *const file,
    const uint32_t     tapeFlushMode,
    uint64_t *const    nbBytesWrittenWithoutFlush,
    uint64_t *const    nbFilesWrittenWithoutFlush,
    const uint64_t     maxBytesBeforeFlush,
    const uint64_t     maxFilesBeforeFlush,
245
    int *const         flushedToTapeAfterNFiles,
246
    uint64_t *const    batchBytesToTape) {
247
248
    int rc = 0;
    int nb_bytes, i, j, last_sz, blksiz, severity, lrecl;
249
    int end_of_tpfile, buf_done, proc_err;
250
251
    int pileupWaitTime = 0, nbFullBufs;
    char *p;
252
    register int debug = Debug;
253
    register int NoSyncAccess;
Olof Barring's avatar
Olof Barring committed
254
255
256
257
    rtcpTapeRequest_t *tapereq = NULL;
    rtcpFileRequest_t *filereq = NULL;

    if ( tape_fd < 0 || indxp == NULL || firstblk == NULL ||
258
         diskIOfinished == NULL || tape == NULL || file == NULL ) {
Olof Barring's avatar
Olof Barring committed
259
260
261
262
263
264
265
        serrno = EINVAL;
        return(-1);
    }
    tapereq = &tape->tapereq;
    filereq = &file->filereq;
    if ( last_block_done != 0 ) {
        rtcp_log(LOG_ERR,"MemoryToTape() called beyond last block!!!\n");
266
267
268
        tl_rtcpd.tl_log( &tl_rtcpd, 3, 2, 
                         "func"   , TL_MSG_PARAM_STR, "MemoryToTape",
                         "Message", TL_MSG_PARAM_STR, "called beyond last block" );
Olof Barring's avatar
Olof Barring committed
269
270
271
272
273
        return(-1);
    }

    blksiz = filereq->blocksize;
    lrecl = filereq->recordlength;
274
275
276
    if ( lrecl <= 0 ) {
        lrecl = blksiz;
    }
277

278
    NoSyncAccess = *diskIOfinished;
279

280
281
282
283
    if ( ((p = getenv("RTCPD_PILEUP_WAIT_TIME")) != NULL) ||
         ((p = getconfent("RTCPD","PILEUP_WAIT_TIME",0)) != NULL) )
      pileupWaitTime = atoi(p);

Olof Barring's avatar
Olof Barring committed
284
285
286
287
    /*
     * Write loop to break out of
     */
    end_of_tpfile = FALSE;
288
    proc_err = 0;
Olof Barring's avatar
Olof Barring committed
289
290
291
292
293
294
    for (;;) {
        i = *indxp;
        buf_done = FALSE;
        /*
         * Synchronize access to next buffer
         */
295
        if ( NoSyncAccess == 0 ) {
296
297
298
299
300
301
302
303
            TP_STATUS(RTCP_PS_WAITMTX);
            nbFullBufs = rtcpd_nbFullBufs(0);
            TP_STATUS(RTCP_PS_WAITMTX);
            if ( (nbFullBufs < 1) && (pileupWaitTime > 0) ) {
              rtcp_log(LOG_INFO,"Tape IO protection against disk IO slack: index=%d, nb full buffers=%d, wait %d seconds\n",
                       i,
                       nbFullBufs,
                       pileupWaitTime);
304
305
306
307
308
309
              tl_rtcpd.tl_log( &tl_rtcpd, 10, 5, 
                               "func"           , TL_MSG_PARAM_STR, "MemoryToTape",
                               "Message"        , TL_MSG_PARAM_STR, "Tape IO protection against disk IO slack",
                               "index"          , TL_MSG_PARAM_INT, i,
                               "nb full buffers", TL_MSG_PARAM_INT, nbFullBufs,
                               "wait seconds"   , TL_MSG_PARAM_INT, pileupWaitTime );
310
              rtcp_log(LOG_INFO,"path=%s\n",filereq->file_path);
311
312
313
              tl_rtcpd.tl_log( &tl_rtcpd, 10, 2, 
                               "func", TL_MSG_PARAM_STR, "MemoryToTape",
                               "path", TL_MSG_PARAM_STR, filereq->file_path);
314
315
              sleep(pileupWaitTime);
            }
316
317
318
319
320
321
            TP_STATUS(RTCP_PS_WAITMTX);
            rc = Cthread_mutex_lock_ext(databufs[i]->lock);
            TP_STATUS(RTCP_PS_NOBLOCKING);
            if ( rc == -1 ) {
                rtcp_log(LOG_ERR,"MemoryToTape() Cthread_mutex_lock_ext(): %s\n",
                    sstrerror(serrno));
322
323
324
325
                tl_rtcpd.tl_log( &tl_rtcpd, 3, 3, 
                                 "func"    , TL_MSG_PARAM_STR, "MemoryToTape",
                                 "Message" , TL_MSG_PARAM_STR, "Cthread_mutex_lock_ext",
                                 "Error"   , TL_MSG_PARAM_STR, sstrerror(serrno));
326
                return(-1);
327
            }
Olof Barring's avatar
Olof Barring committed
328
329
330
331
332
        }
        /*
         * Wait until it is full
         */
        while (databufs[i]->flag == BUFFER_EMPTY) {
333
334
            rtcpd_CheckReqStatus(NULL,file,NULL,&severity);
            if ( (proc_err = ((severity | rtcpd_CheckProcError()) & 
335
                  RTCP_FAILED)) != 0 ) {
336
337
338
339
340
                (void)Cthread_cond_broadcast_ext(databufs[i]->lock);
                (void)Cthread_mutex_unlock_ext(databufs[i]->lock);
                break;
            }

341
342
            if ( NoSyncAccess == 1 ) {
                rtcp_log(LOG_ERR,"MemoryToTape() unexpected empty buffer %d after diskIO finished\n",i);
343
344
345
346
                tl_rtcpd.tl_log( &tl_rtcpd, 3, 3, 
                                 "func"   , TL_MSG_PARAM_STR, "MemoryToTape",
                                 "Message", TL_MSG_PARAM_STR, "unexpected empty buffer after diskIO finished",
                                 "buffer" , TL_MSG_PARAM_INT, i );
347
348
349
350
351
352
                (void)rtcpd_SetReqStatus(NULL,file,SEINTERNAL,
                                         RTCP_FAILED|RTCP_UNERR);
                (void)rtcpd_AppendClientMsg(NULL,file,"Internal error: %s\n",
                                           "unexpected empty buffer");
                return(-1);
            }
353
            databufs[i]->nb_waiters++;
Olof Barring's avatar
Olof Barring committed
354
355
356
357
358
359
            TP_STATUS(RTCP_PS_WAITCOND);
            rc = Cthread_cond_wait_ext(databufs[i]->lock);
            TP_STATUS(RTCP_PS_NOBLOCKING);
            if ( rc == -1 ) {
                rtcp_log(LOG_ERR,"MemoryToTape() Cthread_cond_wait_ext(): %s\n",
                    sstrerror(serrno));
360
361
362
363
                tl_rtcpd.tl_log( &tl_rtcpd, 3, 3, 
                                 "func"   , TL_MSG_PARAM_STR, "MemoryToTape",
                                 "Message", TL_MSG_PARAM_STR, "Cthread_cond_wait_ext",
                                 "Error"  , TL_MSG_PARAM_STR, sstrerror(serrno));
Olof Barring's avatar
Olof Barring committed
364
                (void)Cthread_mutex_unlock_ext(databufs[i]->lock);
Olof Barring's avatar
Olof Barring committed
365
366
                return(-1);
            }
367
            databufs[i]->nb_waiters--;
368
369
370
        } /*  while (databufs[i]->flag == BUFFER_EMPTY) */

        if ( databufs[i]->flag == BUFFER_FULL ) {
371
            rtcpd_CheckReqStatus(NULL,file,NULL,&severity);
372
            if ( (proc_err = ((severity | rtcpd_CheckProcError()) & 
373
                  RTCP_FAILED)) != 0 ) {
Olof Barring's avatar
Olof Barring committed
374
375
376
                (void)Cthread_cond_broadcast_ext(databufs[i]->lock);
                (void)Cthread_mutex_unlock_ext(databufs[i]->lock);
            }
377
        }
378
        if ( proc_err != 0 ) break;
379
        DEBUG_PRINT((LOG_DEBUG,"MemoryToTape() buffer %d full\n",i));
Olof Barring's avatar
Olof Barring committed
380
381
382
383
384
385

        /*
         * Verify that actual buffer size matches block size
         */
        if ( (databufs[i]->length % blksiz) != 0 ) {
            rtcp_log(LOG_ERR,"MemoryToTape() blocksize mismatch\n");
386
387
388
            tl_rtcpd.tl_log( &tl_rtcpd, 3, 2, 
                             "func"    , TL_MSG_PARAM_STR, "MemoryToTape",
                             "Messsage", TL_MSG_PARAM_STR, "blocksize mismatch" );
389
390
391
392
            if ( NoSyncAccess == 0 ) {
                (void)Cthread_cond_broadcast_ext(databufs[i]->lock);
                (void)Cthread_mutex_unlock_ext(databufs[i]->lock);
            }
393
394
395
396
            (void)rtcpd_SetReqStatus(NULL,file,SEINTERNAL,
                                     RTCP_FAILED|RTCP_UNERR);
            (void)rtcpd_AppendClientMsg(NULL,file,"Internal error: %s\n",
                                        "MemoryToTape() blocksize mismatch");
Olof Barring's avatar
Olof Barring committed
397
398
399
400
401
402
403
            return(-1);
        }
        /*
         * Check if this is the last buffer of the tape file
         */
        end_of_tpfile = databufs[i]->end_of_tpfile;
        if ( end_of_tpfile == TRUE ) 
404
            DEBUG_PRINT((LOG_DEBUG,"MemoryToTape() end of tape file in buffer %d\n",i));
Olof Barring's avatar
Olof Barring committed
405
406
407
408
        /*
         * Copy the data to tape
         */
        last_sz = 0;
409
        if ( filereq->maxnbrec > 0 ) {
Olof Barring's avatar
Olof Barring committed
410
411
412
413
            /*
             * Check if we reached the number of records limit
             * and, if so, reduce the data_length accordingly.
             */
414
415
416
417
                if ( filereq->nbrecs + databufs[i]->data_length/lrecl >
                    filereq->maxnbrec ) {
                    databufs[i]->data_length = (int)(filereq->maxnbrec -
                        filereq->nbrecs) * lrecl;
418
                    rtcpd_SetReqStatus(NULL,file,ERTLIMBYSZ,RTCP_OK | RTCP_LIMBYSZ);
419
                }
Olof Barring's avatar
Olof Barring committed
420
        }
421
422
        DEBUG_PRINT((LOG_DEBUG,"MemoryToTape() write %d bytes to tape\n",
            databufs[i]->data_length));
423
424
425
426
427
        j = *firstblk;
        for (;;) {
                    /*
                     * Have we read all data in this buffer yet?
                     */
428
429
430
431
432
433
                    if ( j*blksiz >= databufs[i]->data_length + 
                                     (*firstblk) * blksiz ) {
                        DEBUG_PRINT((LOG_DEBUG,"MemoryToTape() reached EOB (buffer %d), j=%d, blksiz=%d, data_length=%d, firstblk=%d\n",
                                i,j,blksiz,databufs[i]->data_length,*firstblk));
                        break;
                    }
434
435
436
437
                    /*
                     * Normal no record padding format.
                     * Note: last block of file may be partial.
                     */
438
                    nb_bytes = databufs[i]->data_length - (j-*firstblk)*blksiz;
439
                    if ( nb_bytes > blksiz ) nb_bytes = blksiz;
Olof Barring's avatar
Olof Barring committed
440
441
442
443
444
445

            /*
             * >>>>>>>>>>> write to tape <<<<<<<<<<<<<
             */
            TP_STATUS(RTCP_PS_WRITE);
            rc = twrite(tape_fd,
446
                        databufs[i]->buffer + j*blksiz,
Olof Barring's avatar
Olof Barring committed
447
448
                        nb_bytes,
                        tape,
449
450
                        file,
                        tapeFlushMode);
Olof Barring's avatar
Olof Barring committed
451
            TP_STATUS(RTCP_PS_NOBLOCKING);
Benjamin Couturier's avatar
Benjamin Couturier committed
452
453
            
	    if ( rc == -1 ) {
454
                rtcp_log(LOG_ERR,"MemoryToTape() tape write error\n");
455
                tl_rtcpd.tl_log( &tl_rtcpd, 3, 3, 
456
                                 "func"   , TL_MSG_PARAM_STR, "MemoryToTape",
457
458
                                 "Message", TL_MSG_PARAM_STR, "tape write error",
                                 "Drive"  , TL_MSG_PARAM_STR, tapereq->unit );
459
460
461
462
                if ( NoSyncAccess == 0 ) {
                    (void)Cthread_cond_broadcast_ext(databufs[i]->lock);
                    (void)Cthread_mutex_unlock_ext(databufs[i]->lock);
                }
Olof Barring's avatar
Olof Barring committed
463
464
                return(-1);
            }
Benjamin Couturier's avatar
Benjamin Couturier committed
465
466

	    if ( rc == 0 ) {
Olof Barring's avatar
Olof Barring committed
467
468
469
470
471
472
473
                /*
                 * EOV reached
                 */
                if ( file->eovflag == 1 ) *firstblk = j;
                break;
            }
            last_sz += rc;
474
475
476
477
                file->tapebytes_sofar += (u_signed64)rc;
                if ( lrecl > 0 && rc/lrecl > 0 ) 
                    filereq->nbrecs += (u_signed64)rc/lrecl;
                else if ( rc > 0 ) filereq->nbrecs++;
Benjamin Couturier's avatar
Benjamin Couturier committed
478

Olof Barring's avatar
Olof Barring committed
479
            TP_SIZE(rc);
480
            j++;
Olof Barring's avatar
Olof Barring committed
481
        } /* End of for (j=...) */
482
483
        DEBUG_PRINT((LOG_DEBUG,"MemoryToTape() wrote %d bytes to tape\n",
            last_sz));
484
485
486
487
488
489
490
491
        /*
         * Subtract what we've just written. If we have reblocked
         * the data the last_sz is larger than the input data size
         * because records have been padded. In that case 
         * rtcpd_VarToFix() returns 0 if all input data has been
         * converted.
         */
            databufs[i]->data_length -= last_sz;
Olof Barring's avatar
Olof Barring committed
492
493
494
495
496

        /*
         * Reset the buffer semaphore only if the
         * full buffer has been succesfully written.
         */
497
        if ( databufs[i]->data_length <= 0 ) {
Olof Barring's avatar
Olof Barring committed
498
499
500
501
502
            /*
             * Check if this is the request end.
             */
            last_block_done = databufs[i]->last_buffer;
            if ( last_block_done != 0 )
503
504
                DEBUG_PRINT((LOG_DEBUG,"MemoryToTape() last block done = %d\n",
                    last_block_done));
Olof Barring's avatar
Olof Barring committed
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
            /*
             * Reset all switches so that the buffer can be reused
             */
            databufs[i]->bufendp = 0;
            databufs[i]->data_length = 0;
            databufs[i]->end_of_tpfile = FALSE;
            databufs[i]->last_buffer = FALSE;
            databufs[i]->flag = BUFFER_EMPTY;
            buf_done = TRUE;
            /*
             * Increment the circular data buffer index and reset the
             * tape block index.
             */
            *indxp = (*indxp+1) % nb_bufs;
            *firstblk = 0;
        }
        /*
         * Release lock on this buffer
         */
524
        if ( databufs[i]->nb_waiters > 0 && NoSyncAccess == 0 ) {
525
526
527
528
            rc = Cthread_cond_broadcast_ext(databufs[i]->lock);
            if ( rc == -1 ) {
                rtcp_log(LOG_ERR,"MemoryToTape() Cthread_cond_broadcast_ext(): %s\n",
                    sstrerror(serrno));
529
530
531
532
                tl_rtcpd.tl_log( &tl_rtcpd, 3, 3, 
                                 "func"   , TL_MSG_PARAM_STR, "MemoryToTape",
                                 "Message", TL_MSG_PARAM_STR, "Cthread_cond_broadcast_ext",
                                 "Error"  , TL_MSG_PARAM_STR, sstrerror(serrno));
533
                (void)Cthread_mutex_unlock_ext(databufs[i]->lock);
534
                return(-1);
535
            }
Olof Barring's avatar
Olof Barring committed
536
        }
537
538
539
540
541
        if ( NoSyncAccess == 0 ) {
            rc = Cthread_mutex_unlock_ext(databufs[i]->lock);
            if ( rc == -1 ) {
                rtcp_log(LOG_ERR,"MemoryToTape() Cthread_mutex_unlock_ext(): %s\n",
                    sstrerror(serrno));
542
543
544
545
                tl_rtcpd.tl_log( &tl_rtcpd, 3, 3, 
                                 "func"   , TL_MSG_PARAM_STR, "MemoryToTape",
                                 "Message", TL_MSG_PARAM_STR, "Cthread_mutex_unlock_ext",
                                 "Error"  , TL_MSG_PARAM_STR, sstrerror(serrno));
546
                return(-1);
547
            }
Olof Barring's avatar
Olof Barring committed
548
549
        }

550
        if ( buf_done == TRUE && NoSyncAccess == 0 ) {
551
            (void)rtcpd_nbFullBufs(-1);
Olof Barring's avatar
Olof Barring committed
552
553
554
555
556
557
558
            /*
             * Decrement the number of reserved buffers.
             */
            rc = Cthread_mutex_lock_ext(proc_cntl.cntl_lock);
            if ( rc == -1 ) {
                rtcp_log(LOG_ERR,"MemoryToTape() Cthread_mutex_lock_ext(): %s\n",
                    sstrerror(serrno));
559
560
561
562
                tl_rtcpd.tl_log( &tl_rtcpd, 3, 3, 
                                 "func"   , TL_MSG_PARAM_STR, "MemoryToTape",
                                 "Message", TL_MSG_PARAM_STR, "Cthread_mutex_lock_ext",
                                 "Error"  , TL_MSG_PARAM_STR, sstrerror(serrno));
Olof Barring's avatar
Olof Barring committed
563
564
565
                return(-1);
            }
            proc_cntl.nb_reserved_bufs--;
566
567
            /*
             * Check if disk IO has finished. If so, we don't need to sync.
568
569
570
571
572
             * buffers anymore. The test on diskIOstarted may seem a bit
             * unlogical but in fact this flag should have been set by the
             * thread started before we can allow no sync. access - there
             * is a window where disk IO starter can exit before the last
             * disk IO thread has started. 
573
574
             */
            if ( proc_cntl.diskIOfinished == 1 && 
575
                 proc_cntl.diskIOstarted == 1 &&
576
577
                 proc_cntl.nb_diskIOactive == 0 ) {
                rtcp_log(LOG_DEBUG,"MemoryToTape() disk IO has finished! Sync. not needed anymore\n");
578
579
580
                tl_rtcpd.tl_log( &tl_rtcpd, 10, 2, 
                                 "func"   , TL_MSG_PARAM_STR, "MemoryToTape",
                                 "Message", TL_MSG_PARAM_STR, "disk IO has finished! Sync. not needed anymore" );
581
582
                NoSyncAccess = *diskIOfinished = 1;
            }
Olof Barring's avatar
Olof Barring committed
583
584
585
586

            /*
             * Signal to main thread that we changed cntl info
             */
587
588
589
590
591
            if ( proc_cntl.nb_reserved_bufs < nb_bufs ) {
                rc = Cthread_cond_broadcast_ext(proc_cntl.cntl_lock);
                if ( rc == -1 ) {
                    rtcp_log(LOG_ERR,"MemoryToTape() Cthread_cond_broadcast_ext(proc_cntl): %s\n",
                        sstrerror(serrno));
592
593
594
595
                    tl_rtcpd.tl_log( &tl_rtcpd, 3, 3, 
                                     "func"    , TL_MSG_PARAM_STR, "MemoryToTape",
                                     "Message" , TL_MSG_PARAM_STR, "Cthread_cond_broadcast_ext(proc_cntl)",
                                     "Error"   , TL_MSG_PARAM_STR, sstrerror(serrno));
596
                    (void)Cthread_mutex_unlock_ext(proc_cntl.cntl_lock);
597
                    return(-1);
598
                }
Olof Barring's avatar
Olof Barring committed
599
600
601
602
603
604
            }

            rc = Cthread_mutex_unlock_ext(proc_cntl.cntl_lock);
            if ( rc == -1 ) {
                rtcp_log(LOG_ERR,"MemoryToTape() Cthread_mutex_unlock_ext(): %s\n",
                    sstrerror(serrno));
605
606
607
608
                tl_rtcpd.tl_log( &tl_rtcpd, 3, 3, 
                                 "func"    , TL_MSG_PARAM_STR, "MemoryToTape",
                                 "Message" , TL_MSG_PARAM_STR, "Cthread_mutex_unlock_ext",
                                 "Error"   , TL_MSG_PARAM_STR, sstrerror(serrno));
Olof Barring's avatar
Olof Barring committed
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
                return(-1);
            }
        }

        /*
         * Break out to main tape IO loop if we have hit 
         * the end of the current tape file
         */
        if ( end_of_tpfile == TRUE ) break;
        /*
         * Check if we reached end of volume. If so, return to
         * let the main tapeIO loop iterate one step in the tape
         * request list. This must be the last file request (in
         * a multi-volume request only the last file can span). Note
         * that we have to start on the same indxp since the last
         * buffer has not been completely written out.
         */
        if ( file->eovflag == 1 ) break;
        /*
         * Check if last block
         */
        if ( last_block_done != 0 ) break;
        /*
         * Has something fatal happened while we were occupied
         * writing to the tape?
         */
635
        rtcpd_CheckReqStatus(NULL,file,NULL,&severity);
636
        if ( (proc_err = ((severity | rtcpd_CheckProcError()) & 
637
              RTCP_FAILED)) != 0 ) {
638
            break;
639
        }
Olof Barring's avatar
Olof Barring committed
640
641
    } /* End of for (;;) */

642
    TP_STATUS(RTCP_PS_CLOSE);
643
644
645
646
647
648
649
    if ( proc_err == 0 ) rc = tclose(tape_fd,tape,file,tapeFlushMode,
        nbBytesWrittenWithoutFlush,
        nbFilesWrittenWithoutFlush,
        maxBytesBeforeFlush,
        maxFilesBeforeFlush,
        flushedToTapeAfterNFiles,
        batchBytesToTape);
Olof Barring's avatar
Olof Barring committed
650
    TP_STATUS(RTCP_PS_NOBLOCKING);
651

Olof Barring's avatar
Olof Barring committed
652
653
654
655
656
657
658
    return(rc);
}

static int TapeToMemory(int tape_fd, int *indxp, int *firstblk,
                        tape_list_t *tape, 
                        file_list_t *file) {
    int nb_bytes, rc, i, j, last_sz, blksiz, current_bufsz;
659
    int lrecl, end_of_tpfile, break_and_return, severity, proc_err;
Olof Barring's avatar
Olof Barring committed
660
    int nb_skipped_blks = 0;
661
    register int debug = Debug;
Olof Barring's avatar
Olof Barring committed
662
    rtcpFileRequest_t *filereq = NULL;
663
    extern char *u64tostr (u_signed64, char *, int);
Olof Barring's avatar
Olof Barring committed
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702

    if ( tape_fd < 0 || indxp == NULL || firstblk == NULL ||
         tape == NULL || file == NULL ) {
        serrno = EINVAL;
        return(-1);
    }
    filereq = &file->filereq;

    /*
     * In the case when startsize > maxsize, i.e. we are concatenating
     * tape files to a single disk file and the max size has been
     * exceeded, we still need to enter the loop in order to flag a
     * buffer full to wake up the disk IO thread. However no data
     * will be read from tape since tapebytes_sofar exceeds maxsize
     * already from the beginning. See notice further down (immediately
     * before the tread()).
     */
    file->tapebytes_sofar = filereq->startsize;
    blksiz = filereq->blocksize;
    lrecl = filereq->recordlength;

    /*
     * Calculate new actual buffer length
     */
    current_bufsz = rtcpd_CalcBufSz(tape,file);
    if ( current_bufsz == -1 ) return(-1);
    /*
     * Tape read loop to break out of. We must ensure that there
     * is at least one empty buffer available for the next disk IO
     * thread before signaling end of tape file to the main control
     * thread. The reason for this is to avoid conflicting access
     * between two disk IO threads on the same buffer in case the
     * network or remote disk is much slower than the tape IO. Thus
     * we must ensure that the buffer following the last tape file
     * buffer is empty, which means that the currently active disk IO
     * thread has finished with it.
     */
    end_of_tpfile = FALSE;
    break_and_return = FALSE;
703
    proc_err = 0;
Olof Barring's avatar
Olof Barring committed
704
705
706
707
708
709
710
711
712
713
714
    for (;;) {
        i = *indxp;
        /*
         * Synchronize access to next buffer
         */
        TP_STATUS(RTCP_PS_WAITMTX);
        rc = Cthread_mutex_lock_ext(databufs[i]->lock);
        TP_STATUS(RTCP_PS_NOBLOCKING);
        if ( rc == -1 ) {
            rtcp_log(LOG_ERR,"TapeToMemory() Cthread_mutex_lock_ext(): %s\n",
                sstrerror(serrno));
715
716
717
718
            tl_rtcpd.tl_log( &tl_rtcpd, 3, 3, 
                             "func"   , TL_MSG_PARAM_STR, "TapeToMemory",
                             "Message", TL_MSG_PARAM_STR, "Cthread_mutex_lock_ext",
                             "Error"  , TL_MSG_PARAM_STR, sstrerror(serrno));
Olof Barring's avatar
Olof Barring committed
719
720
721
722
723
724
            return(-1);
        }
        /*
         * Wait until it is empty.
         */
        while (databufs[i]->flag == BUFFER_FULL) {
725
726
            rtcpd_CheckReqStatus(NULL,file,NULL,&severity);
            if ( (proc_err = ((severity | rtcpd_CheckProcError()) & 
727
                  RTCP_FAILED)) != 0 ) {
728
729
730
731
732
                (void)Cthread_cond_broadcast_ext(databufs[i]->lock);
                (void)Cthread_mutex_unlock_ext(databufs[i]->lock);
                break;
            }

733
734
            DEBUG_PRINT((LOG_DEBUG,"TapeToMemory() wait on buffer[%d]->flag=%d\n",
                i,databufs[i]->flag));
735
            databufs[i]->nb_waiters++;
Olof Barring's avatar
Olof Barring committed
736
737
738
739
740
741
            TP_STATUS(RTCP_PS_WAITCOND);
            rc = Cthread_cond_wait_ext(databufs[i]->lock);
            TP_STATUS(RTCP_PS_NOBLOCKING);
            if ( rc == -1 ) {
                rtcp_log(LOG_ERR,"TapeToMemory() Cthread_cond_wait_ext(): %s\n",
                    sstrerror(serrno));
742
743
744
745
                tl_rtcpd.tl_log( &tl_rtcpd, 3, 3, 
                                 "func"   , TL_MSG_PARAM_STR, "TapeToMemory",
                                 "Message", TL_MSG_PARAM_STR, "Cthread_cond_wait_ext",
                                 "Error"  , TL_MSG_PARAM_STR, sstrerror(serrno));
746
                (void)Cthread_mutex_unlock_ext(databufs[i]->lock);
Olof Barring's avatar
Olof Barring committed
747
748
                return(-1);
            }
749
            databufs[i]->nb_waiters--;
750
751
752
        } /* while (databufs[i]->flag == BUFFER_FULL) */

        if ( databufs[i]->flag == BUFFER_EMPTY ) {
753
            rtcpd_CheckReqStatus(NULL,file,NULL,&severity);
754
            if ( (proc_err = ((severity | rtcpd_CheckProcError()) & 
755
                  RTCP_FAILED)) != 0 ) {
Olof Barring's avatar
Olof Barring committed
756
757
758
                (void)Cthread_cond_broadcast_ext(databufs[i]->lock);
                (void)Cthread_mutex_unlock_ext(databufs[i]->lock);
            }
759
        }
760
        if ( proc_err != 0 ) break;
761
        DEBUG_PRINT((LOG_DEBUG,"TapeToMemory() buffer %d empty\n",i));
Olof Barring's avatar
Olof Barring committed
762
763
764
765
766
767
768
769
770
771
772
773
774
775
        if ( end_of_tpfile == FALSE ) {
            /*
             * Set the actual buffer size to match current block size
             */
            databufs[i]->length = current_bufsz;
            /*
             * Reset switches (may remain old settings from a previous file).
             */
            databufs[i]->end_of_tpfile = FALSE;
            databufs[i]->last_buffer = FALSE;
            /*
             * Copy the data from tape
             */
            last_sz = 0;
776

777
            for (j=*firstblk; last_sz+blksiz < databufs[i]->length; j++) {
Olof Barring's avatar
Olof Barring committed
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
                /*
                 * Max size may have been exceeded even before we
                 * read any data at all. This happens when we are 
                 * concatenating on disk and max size is already
                 * exceeded at entry (i.e. startsize > maxsize).
                 * We still need to flag the buffer full to wake up
                 * the disk IO thread.
                 * Note that in the normal case (where we already have 
                 * read some data) the max size limit is tested after 
                 * the read. In that case the following test is always
                 * false.
                 */
                if ( (filereq->maxsize > 0) && 
                     (file->tapebytes_sofar >= filereq->maxsize) ) {
                    /*
                     * Nothing will be copied!
                     */
                    filereq->bytes_out = 0;
                    end_of_tpfile = TRUE;
797
                    break;
Olof Barring's avatar
Olof Barring committed
798
799
800
801
802
803
804
805
806
807
808
809
810
                }

                /*
                 * Read a block
                 */
                end_of_tpfile = FALSE;
                nb_bytes = blksiz;

                /*
                 * >>>>>>>>>>> read from tape <<<<<<<<<<<<<
                 */
                TP_STATUS(RTCP_PS_READ);
                rc = tread(tape_fd,
811
                           databufs[i]->buffer + last_sz,
Olof Barring's avatar
Olof Barring committed
812
813
814
                           nb_bytes,
                           tape,
                           file);
Benjamin Couturier's avatar
Benjamin Couturier committed
815

Olof Barring's avatar
Olof Barring committed
816
                TP_STATUS(RTCP_PS_NOBLOCKING);
817
818
                if ( rc == -1 ) {
                    rtcp_log(LOG_ERR,"TapeToMemory() tape read error\n");
819
820
821
                    tl_rtcpd.tl_log( &tl_rtcpd, 3, 2, 
                                     "func"   , TL_MSG_PARAM_STR, "TapeToMemory",
                                     "Message", TL_MSG_PARAM_STR, "tape read error" );
Olof Barring's avatar
Olof Barring committed
822
823
824
825
826
827
828
829
830
831
832
833
834
                    (void)Cthread_cond_broadcast_ext(databufs[i]->lock);
                    (void)Cthread_mutex_unlock_ext(databufs[i]->lock);
                    return(-1);
                }
                if ( rc == 0 ) {
                    rtcpd_CheckReqStatus(NULL,file,NULL,&severity);
                    if ( severity & RTCP_NEXTRECORD ) {
                        /*
                         * Bad block skipped
                         */
                        nb_skipped_blks++;
                    } else if ( file->eovflag == 1 ) {
                        /*
835
836
837
838
839
840
841
842
843
                         * EOV reached. Note that this is the normal EOD
                         * status for a non-labeled volume (since there is
                         * no way to tell the difference in that case). We
                         * allow it also for labelled tapes (strictly this
                         * is an error) to allow users to read only first part
                         * of a spanned volume without getting an error. 
                         * Re-assigning it to an EOF instead of EOV allows
                         * common treatment of all EOD conditions when
                         * trying to position to the next file.
Olof Barring's avatar
Olof Barring committed
844
                         */
845
846
847
848
849
                        if ( (filereq->concat & VOLUME_SPANNING) == 0 ) {
                            DEBUG_PRINT((LOG_DEBUG,"TapeToMemory() EOV condition without spanning volume\n"));
                            end_of_tpfile = TRUE;
                            file->eovflag = 0;
                        }
Olof Barring's avatar
Olof Barring committed
850
851
852
853
854
855
856
857
                    } else {
                        /*
                         * EOF reached
                         */
                        end_of_tpfile = TRUE;
                    }
                    break;
                }
858
859
860
                /*
                 * The record length was not given by the user
                 */
861
                if ( lrecl <= 0 ) {
862
863
864
                    lrecl = rc;
                    filereq->recordlength = lrecl;
                }
865

866
                last_sz += rc;
Olof Barring's avatar
Olof Barring committed
867
                file->tapebytes_sofar += rc;
868
869
                if ( lrecl > 0 && rc/lrecl > 0 ) filereq->nbrecs += rc/lrecl;
                else if ( rc > 0 ) filereq->nbrecs++;
Benjamin Couturier's avatar
Benjamin Couturier committed
870

Olof Barring's avatar
Olof Barring committed
871
872
873
874
875
876
877
878
879
                TP_SIZE(rc);
            } /* End of for (j=...) */
            /*  
             * Set the actual data length. Should be the
             * same as the current buffer length except for
             * the last buffer of the file.
             */
            databufs[i]->data_length += last_sz;
            databufs[i]->bufendp = last_sz;
880
881
882
883
884
885
886
887

            /*
             * Check if we already reached max number of
             * records. If so we subtract buffer sizes accordingly
             * and signal end of file with this buffer
             */
            if ( (filereq->maxnbrec > 0) &&
                 (filereq->nbrecs > filereq->maxnbrec) ) {
888
                if ( lrecl > 0 ) {
Olof Barring's avatar
Olof Barring committed
889
890
891
892
893
                    databufs[i]->data_length = databufs[i]->data_length -
                        (int)(filereq->nbrecs - filereq->maxnbrec)*lrecl;
                    file->tapebytes_sofar -= 
                        (filereq->nbrecs - filereq->maxnbrec)*lrecl;
                }
894
895
896
                filereq->nbrecs = filereq->maxnbrec;
                end_of_tpfile = TRUE;
                severity = RTCP_OK | RTCP_LIMBYSZ;
897
                rtcpd_SetReqStatus(NULL,file,ERTLIMBYSZ,severity);
Olof Barring's avatar
Olof Barring committed
898
899
900
901
902
903
904
            }
            /*
             * Check if maxsize has been reached for this file
             */
            if ((filereq->maxsize > 0) &&
                (filereq->maxsize <= file->tapebytes_sofar) ) {
                severity = RTCP_OK | RTCP_LIMBYSZ;
905
                rtcpd_SetReqStatus(NULL,file,ERTLIMBYSZ,severity);
Olof Barring's avatar
Olof Barring committed
906
907
908
909
910
911
912
913
914
915
916
                end_of_tpfile = TRUE;
            }

            if ( end_of_tpfile == TRUE ) {
                databufs[i]->end_of_tpfile = TRUE;
                /*
                 * Need to close here to get compression
                 * statistics etc. before the disk IO
                 * thread sends the info to the client
                 */
                TP_STATUS(RTCP_PS_CLOSE);
917
                {
918
                    /* all following variables are ignored for reads */
919
920
                    const uint32_t tapeFlushMode =
                        TAPEBRIDGE_N_FLUSHES_PER_FILE;
921
922
923
924
925
926
927
928
929
930
931
932
933
934
                    uint64_t       nbBytesWrittenWithoutFlush = 0;
                    uint64_t       nbFilesWrittenWithoutFlush = 0;
                    const uint64_t maxBytesBeforeFlush = 0;
                    const uint64_t maxFilesBeforeFlush = 0;
                    int            flushedToTapeAfterNFiles = 0;
                    uint64_t       batchBytesToTape = 0;

                    rc = tclose(tape_fd,tape,file,tapeFlushMode,
                        &nbBytesWrittenWithoutFlush,
                        &nbFilesWrittenWithoutFlush,
                        maxBytesBeforeFlush,
                        maxFilesBeforeFlush,
                        &flushedToTapeAfterNFiles,
                        &batchBytesToTape);
935
                }
Olof Barring's avatar
Olof Barring committed
936
937
938
939
                TP_STATUS(RTCP_PS_NOBLOCKING);
                if ( rc == -1 ) {
                    rtcp_log(LOG_ERR,"TapeToMemory() tclose(): %s\n",
                        sstrerror(serrno));
940
941
942
943
                    tl_rtcpd.tl_log( &tl_rtcpd, 3, 3, 
                                     "func"   , TL_MSG_PARAM_STR, "TapeToMemory",
                                     "Message", TL_MSG_PARAM_STR, "tclose",
                                     "Error"  , TL_MSG_PARAM_STR, sstrerror(serrno));
Olof Barring's avatar
Olof Barring committed
944
945
946
947
948
                    (void)Cthread_cond_broadcast_ext(databufs[i]->lock);
                    (void)Cthread_mutex_unlock_ext(databufs[i]->lock);
                    return(-1);
                }
                tape_fd = -1;
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966

                /*
                 * Note down the actual file size to
                 * allow for start up of next disk IO thread.
                 */
                if ((filereq->maxsize > 0) &&
                    (filereq->maxsize <= file->tapebytes_sofar) ) {
                    /*
                     * Truncated due to user specified max size.
                     * Re-calculate number of bytes out and number
                     * of tape records.
                     */
                    filereq->bytes_out = filereq->maxsize - filereq->startsize;
                    if ( lrecl > 0 )
                        filereq->nbrecs = filereq->bytes_out / lrecl;
                } else
                    filereq->bytes_out = file->tapebytes_sofar -
                        filereq->startsize;
Olof Barring's avatar
Olof Barring committed
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
            }

            /*
             * Set the buffer semaphore
             */
            databufs[i]->flag = BUFFER_FULL;
            /*
             * Always start next iteration with a new buffer boundary
             */
            *indxp = (*indxp+1) % nb_bufs;
            *firstblk = 0;
        } else {
            /*
             * End of tape file was reached in previous iteration.
             * At least one empty buffer follows the last written
             * buffer for this tape file. We can therefore signal
             * to the control thread that a new disk IO thread can
             * be started.
             */
986
            DEBUG_PRINT((LOG_DEBUG,
987
               "TapeToMemory() tapebytes_sofar=%d, max=%d, start=%d, bytes_out=%d\n",
988
                (int)file->tapebytes_sofar,(int)filereq->maxsize,
989
                (int)filereq->startsize,(int)filereq->bytes_out));
990
991
992
993
994
995
996
997
998
999
1000

            /*
             * Tell main control thread that we finished by
             * resetting the number of reserved buffers.
             */
            TP_STATUS(RTCP_PS_WAITMTX);
            rc = Cthread_mutex_lock_ext(proc_cntl.cntl_lock);
            TP_STATUS(RTCP_PS_NOBLOCKING);
            if ( rc == -1 ) {
                rtcp_log(LOG_ERR,"TapeToMemory() Cthread_mutex_lock_ext(proc_cntl): %s\n",
                    sstrerror(serrno));