Commit 8b17b0cb authored by Giuseppe Lo Presti's avatar Giuseppe Lo Presti
Browse files

Properly implemented gridftp transfers via the diskmanagerd. Still

not tested as gridftp transfers are actually broken in SLC6 for long...
parent 83167cf2
......@@ -46,12 +46,19 @@ typedef struct globus_l_gfs_CASTOR2_handle_s {
char *uuid; /* must be pointers to environment variables */
char *fullDestPath; /* we do not allocate or free memory here */
char *access_mode;
/* we have to save all blocs checksums for on the fly calculation */
/* we have to save all blocs checksums for the on the fly calculation */
checksum_block_list_t * checksum_list;
checksum_block_list_t *checksum_list_p;
unsigned long number_of_blocks;
long long fileSize;
} globus_l_gfs_CASTOR2_handle_t;
/* a function to wrap all is needed to close a file */
static void globus_castor_close(const char* func,
globus_l_gfs_CASTOR2_handle_t* CASTOR2_handle,
const char* ckSumbuf,
const char* error_msg);
static void globus_l_gfs_file_net_read_cb(globus_gfs_operation_t,
globus_result_t,
globus_byte_t *,
......
......@@ -28,20 +28,18 @@
#include <sys/types.h>
#include <dirent.h>
#include <string.h>
#include "globus_gridftp_server.h"
#include "dsi_CASTOR2.h"
#include <stdio.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <zlib.h>
#include <sys/xattr.h>
#include "globus_gridftp_server.h"
#include "dsi_CASTOR2.h"
#include "getconfent.h"
#include "serrno.h"
#include "ceph/ceph_posix.h"
#include "movers/moverclose.h"
......@@ -160,6 +158,43 @@ static void ceph_logfunc_wrapper (char *format, va_list argp) {
free(logstr);
}
/* a function to wrap all is needed to close a file */
static void globus_castor_close(const char* func,
globus_l_gfs_CASTOR2_handle_t* CASTOR2_handle,
const char* ckSumbuf,
const char* error_msg) {
int rc = 0;
char* errorBuf = NULL;
CASTOR2_handle->done = GLOBUS_TRUE;
ceph_posix_close(CASTOR2_handle->fd);
if(error_msg) {
CASTOR2_handle->cached_res = GLOBUS_FAILURE;
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "%s: terminating transfer on error: %s\n", func, error_msg);
errorBuf = strdup(error_msg);
rc = SEINTERNAL;
}
else {
CASTOR2_handle->cached_res = GLOBUS_SUCCESS;
}
/* tell diskmanagerd to close the file */
int port = MOVERHANDLERPORT;
if(getconfent("DiskManager", "MoverHandlerPort", 0) != NULL) {
port = atoi(getconfent("DiskManager", "MoverHandlerPort", 0));
}
if (mover_close_file(port, CASTOR2_handle->uuid, CASTOR2_handle->fileSize, "AD", ckSumbuf, &rc, &errorBuf) != 0) {
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "%s: mover_close_file failed for transferid=%s: %s\n",
func, CASTOR2_handle->uuid, errorBuf);
CASTOR2_handle->cached_res = GLOBUS_FAILURE;
free(errorBuf);
}
else {
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO,
"%s: mover_close_file succeeded for transferid=%s, fileSize=%ld, checkSum=0x%s\n",
func, CASTOR2_handle->uuid, CASTOR2_handle->fileSize, (ckSumbuf ? ckSumbuf : "0"));
}
}
/*************************************************************************
* start
* -----
......@@ -332,12 +367,13 @@ int CASTOR2_handle_open(char *path,
/* if clients uuid is the same as internal uuid we will access fullDestPath file then */
globus_gfs_log_message(GLOBUS_GFS_LOG_DUMP,"%s: open file \"%s\"\n",
func,CASTOR2_handle->fullDestPath);
rc = ceph_posix_open(CASTOR2_handle->fullDestPath,flags,mode);
rc = ceph_posix_open(CASTOR2_handle->fullDestPath, flags, mode);
CASTOR2_handle->fileSize = 0;
return (rc);
}
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO,
"%s: client and server uuids do not match \"%s\" != \"%s\"\n",
func, uuid_path,CASTOR2_handle->uuid);
func, uuid_path, CASTOR2_handle->uuid);
errno = EINVAL;
return (-1);
}
......@@ -409,6 +445,7 @@ static void globus_l_gfs_file_net_read_cb(globus_gfs_operation_t op,
free_checksum_list(CASTOR2_handle->checksum_list);
} else {
globus_gridftp_server_update_bytes_written(op,offset,nbytes);
CASTOR2_handle->fileSize += bytes_written;
}
}
}
......@@ -418,17 +455,14 @@ static void globus_l_gfs_file_net_read_cb(globus_gfs_operation_t op,
if(!CASTOR2_handle->done) globus_l_gfs_CASTOR2_read_from_net(CASTOR2_handle);
/* if done and there are no outstanding callbacks finish */
else if(CASTOR2_handle->outstanding == 0) {
long long fileSize = 0;
if (CASTOR2_handle->number_of_blocks > 0) {
/* checksum calculation */
checksum_array=(checksum_block_list_t**)
globus_calloc(CASTOR2_handle->number_of_blocks,sizeof(checksum_block_list_t*));
if (checksum_array == NULL) {
free_checksum_list(CASTOR2_handle->checksum_list);
CASTOR2_handle->cached_res = GLOBUS_FAILURE;
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,"%s: malloc error \n",func);
CASTOR2_handle->done = GLOBUS_TRUE;
ceph_posix_close(CASTOR2_handle->fd);
CASTOR2_handle->fileSize = 0;
globus_castor_close(func, CASTOR2_handle, NULL, "Internal error (malloc)");
globus_mutex_unlock(&CASTOR2_handle->mutex);
return;
}
......@@ -466,14 +500,12 @@ static void globus_l_gfs_file_net_read_cb(globus_gfs_operation_t op,
} else {
// overlapping chunks. This is not supported, fail the transfer
free_checksum_list(CASTOR2_handle->checksum_list);
CASTOR2_handle->cached_res = GLOBUS_FAILURE;
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,"%s: Overlapping chunks detected while handling 0x%x-0x%x. The overlap starts at 0x%x\n",
func,
checksum_array[i]->offset,
checksum_array[i]->offset+checksum_array[i]->size,
chkOffset);
CASTOR2_handle->done = GLOBUS_TRUE;
ceph_posix_close(CASTOR2_handle->fd);
char errorBuf[1024];
sprintf(errorBuf, "Overlapping chunks detected while handling 0x%lx-0x%lx. The overlap starts at 0x%lx\n",
(unsigned long int)checksum_array[i]->offset,
(unsigned long int)checksum_array[i]->offset+checksum_array[i]->size,
(unsigned long int)chkOffset);
globus_castor_close(func, CASTOR2_handle, NULL, errorBuf);
globus_mutex_unlock(&CASTOR2_handle->mutex);
return;
}
......@@ -483,8 +515,7 @@ static void globus_l_gfs_file_net_read_cb(globus_gfs_operation_t op,
checksum_array[i]->size);
chkOffset += checksum_array[i]->size;
}
fileSize = (long long)chkOffset;
sprintf(ckSumbuf,"%lx",file_checksum);
sprintf(ckSumbuf, "%lx", file_checksum);
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "%s: checksum for %s : AD 0x%lx\n",
func,CASTOR2_handle->fullDestPath, file_checksum);
globus_free(checksum_array);
......@@ -499,25 +530,7 @@ static void globus_l_gfs_file_net_read_cb(globus_gfs_operation_t op,
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,"%s: unable to store checksum value as xattr\n", func);
}
}
ceph_posix_close(CASTOR2_handle->fd);
/* tell diskmanagerd to close the file */
int port = MOVERHANDLERPORT;
if(getconfent("DiskManager", "MoverHandlerPort", 0) != NULL) {
port = atoi(getconfent("DiskManager", "MoverHandlerPort", 0));
}
int rc = 0;
char* error_msg = NULL;
if (mover_close_file(port, CASTOR2_handle->uuid, fileSize, "AD", ckSumbuf, &rc, &error_msg) != 0) {
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,"%s: mover_close_file failed for transferid=%s: %s\n",
func, CASTOR2_handle->uuid, error_msg);
CASTOR2_handle->cached_res = GLOBUS_FAILURE;
CASTOR2_handle->done = GLOBUS_TRUE;
free(error_msg);
}
else {
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "%s: mover_close_file succeeded for transferid=%s, fileSize=%ld, checkSum=0x%s\n",
func, CASTOR2_handle->uuid, fileSize, ckSumbuf);
}
globus_castor_close(func, CASTOR2_handle, ckSumbuf, NULL);
globus_gridftp_server_finished_transfer(op, CASTOR2_handle->cached_res);
}
}
......@@ -542,7 +555,7 @@ static void globus_l_gfs_CASTOR2_read_from_net
CASTOR2_handle->cached_res = result;
CASTOR2_handle->done = GLOBUS_TRUE;
if (CASTOR2_handle->outstanding == 0) {
ceph_posix_close(CASTOR2_handle->fd);
globus_castor_close(func, CASTOR2_handle, NULL, "malloc failed");
globus_gridftp_server_finished_transfer(CASTOR2_handle->op,
CASTOR2_handle->cached_res);
}
......@@ -556,13 +569,13 @@ static void globus_l_gfs_CASTOR2_read_from_net
if(result != GLOBUS_SUCCESS) {
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,
"%s: register read has finished with a bad result \n",
"%s: register read has finished with a bad result\n",
func);
globus_free(buffer);
CASTOR2_handle->cached_res = result;
CASTOR2_handle->done = GLOBUS_TRUE;
if (CASTOR2_handle->outstanding == 0) {
ceph_posix_close(CASTOR2_handle->fd);
globus_castor_close(func, CASTOR2_handle, NULL, "register read has finished with a bad result");
globus_gridftp_server_finished_transfer(CASTOR2_handle->op,
CASTOR2_handle->cached_res);
}
......@@ -796,9 +809,7 @@ static globus_bool_t globus_l_gfs_CASTOR2_send_next_to_client
&CASTOR2_handle->blk_length);
if(CASTOR2_handle->blk_length == 0) {
result = GLOBUS_SUCCESS;
ceph_posix_close(CASTOR2_handle->fd);
CASTOR2_handle->cached_res = result;
CASTOR2_handle->done = GLOBUS_TRUE;
globus_castor_close(func, CASTOR2_handle, ckSumbuf, NULL);
if (CASTOR2_handle->outstanding == 0) {
globus_gridftp_server_finished_transfer(CASTOR2_handle->op,
CASTOR2_handle->cached_res);
......@@ -818,9 +829,8 @@ static globus_bool_t globus_l_gfs_CASTOR2_send_next_to_client
/* verify that it worked */
if (start_offset != CASTOR2_handle->blk_offset) {
result = globus_l_gfs_make_error("seek");
ceph_posix_close(CASTOR2_handle->fd);
globus_castor_close(func, CASTOR2_handle, NULL, "failed to seek");
CASTOR2_handle->cached_res = result;
CASTOR2_handle->done = GLOBUS_TRUE;
if (CASTOR2_handle->outstanding == 0) {
globus_gridftp_server_finished_transfer(CASTOR2_handle->op,
CASTOR2_handle->cached_res);
......@@ -831,9 +841,8 @@ static globus_bool_t globus_l_gfs_CASTOR2_send_next_to_client
buffer = globus_malloc(read_length);
if (buffer == NULL) {
result = GlobusGFSErrorGeneric("error: malloc failed");
ceph_posix_close(CASTOR2_handle->fd);
globus_castor_close(func, CASTOR2_handle, NULL, "malloc failed");
CASTOR2_handle->cached_res = result;
CASTOR2_handle->done = GLOBUS_TRUE;
if (CASTOR2_handle->outstanding == 0) {
globus_gridftp_server_finished_transfer(CASTOR2_handle->op,
CASTOR2_handle->cached_res);
......@@ -851,16 +860,12 @@ static globus_bool_t globus_l_gfs_CASTOR2_send_next_to_client
(checksum_block_list_t *)globus_malloc(sizeof(checksum_block_list_t));
if (CASTOR2_handle->checksum_list_p->next==NULL) {
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,"%s: malloc error \n",func);
globus_free(buffer);
ceph_posix_close(CASTOR2_handle->fd);
CASTOR2_handle->cached_res = GLOBUS_FAILURE;
CASTOR2_handle->done = GLOBUS_TRUE;
globus_castor_close(func, CASTOR2_handle, NULL, "malloc error");
if (CASTOR2_handle->outstanding == 0) {
globus_gridftp_server_finished_transfer(CASTOR2_handle->op,
CASTOR2_handle->cached_res);
}
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,"%s: finished (error)\n",func);
return CASTOR2_handle->done;
}
CASTOR2_handle->checksum_list_p->next->next=NULL;
......@@ -880,15 +885,11 @@ static globus_bool_t globus_l_gfs_CASTOR2_send_next_to_client
sizeof(checksum_block_list_t*));
if (checksum_array==NULL){
free_checksum_list(CASTOR2_handle->checksum_list);
CASTOR2_handle->cached_res = GLOBUS_FAILURE;
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,"%s: malloc error \n",func);
CASTOR2_handle->done = GLOBUS_TRUE;
if (CASTOR2_handle->outstanding == 0) {
globus_gridftp_server_finished_transfer(CASTOR2_handle->op,
CASTOR2_handle->cached_res);
}
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO,"%s: finished (error)\n",func);
ceph_posix_close(CASTOR2_handle->fd);
globus_castor_close(func, CASTOR2_handle, NULL, "malloc error");
return CASTOR2_handle->done;
}
checksum_list_pp=CASTOR2_handle->checksum_list;
......@@ -938,26 +939,23 @@ static globus_bool_t globus_l_gfs_CASTOR2_send_next_to_client
}
if (useCksum) { /* we have disks and on the fly checksums here */
sprintf(ckSumbuf,"%lx", file_checksum);
sprintf(ckSumbuf, "%lx", file_checksum);
if (strncmp(ckSumbufdisk,ckSumbuf,CA_MAXCKSUMLEN)==0) {
globus_gfs_log_message(GLOBUS_GFS_LOG_DUMP,"%s: checksums OK! \n",func);
} else {
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,
"%s: checksum error detected reading file: %s (recorded checksum: 0x%s calculated checksum: 0x%s)\n",
func,
CASTOR2_handle->fullDestPath,
ckSumbufdisk,
ckSumbuf);
char errorBuf[1024];
sprintf(errorBuf, "checksum error detected reading file: %s (recorded checksum: 0x%s calculated checksum: 0x%s)\n",
CASTOR2_handle->fullDestPath,
ckSumbufdisk,
ckSumbuf);
/* to do something in error case */
CASTOR2_handle->cached_res =
globus_error_put (globus_object_construct (GLOBUS_ERROR_TYPE_BAD_DATA));
CASTOR2_handle->done = GLOBUS_TRUE;
if (CASTOR2_handle->outstanding == 0) {
globus_gridftp_server_finished_transfer(CASTOR2_handle->op,
CASTOR2_handle->cached_res);
}
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO,"%s: finished (error)\n",func);
ceph_posix_close(CASTOR2_handle->fd);
globus_castor_close(func, CASTOR2_handle, NULL, errorBuf);
return CASTOR2_handle->done;
}
} else {
......@@ -965,10 +963,7 @@ static globus_bool_t globus_l_gfs_CASTOR2_send_next_to_client
"%s: ADLER32 checksum has not been found in extended attributes\n",
func);
}
ceph_posix_close(CASTOR2_handle->fd);
CASTOR2_handle->cached_res = result;
CASTOR2_handle->done = GLOBUS_TRUE;
globus_castor_close(func, CASTOR2_handle, ckSumbuf, NULL);
if (CASTOR2_handle->outstanding == 0) {
globus_gridftp_server_finished_transfer(CASTOR2_handle->op,
CASTOR2_handle->cached_res);
......@@ -979,16 +974,13 @@ static globus_bool_t globus_l_gfs_CASTOR2_send_next_to_client
if (nbread < 0) { /* error */
result = globus_l_gfs_make_error("read");
globus_free(buffer);
ceph_posix_close(CASTOR2_handle->fd);
globus_castor_close(func, CASTOR2_handle, NULL, "error reading from disk");
CASTOR2_handle->cached_res = result;
CASTOR2_handle->done = GLOBUS_TRUE;
if (CASTOR2_handle->outstanding == 0) {
globus_gridftp_server_finished_transfer(CASTOR2_handle->op,
CASTOR2_handle->cached_res);
}
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,"%s: finished (error)\n",func);
return CASTOR2_handle->done;
}
if (read_length>=nbread) {
......@@ -1014,9 +1006,8 @@ static globus_bool_t globus_l_gfs_CASTOR2_send_next_to_client
if(res != GLOBUS_SUCCESS) {
globus_free(buffer);
ceph_posix_close(CASTOR2_handle->fd);
globus_castor_close(func, CASTOR2_handle, NULL, "error writing to network");
CASTOR2_handle->cached_res = res;
CASTOR2_handle->done = GLOBUS_TRUE;
if (CASTOR2_handle->outstanding == 0) {
globus_gridftp_server_finished_transfer(CASTOR2_handle->op,
CASTOR2_handle->cached_res);
......@@ -1050,7 +1041,8 @@ static void globus_l_gfs_net_write_cb(globus_gfs_operation_t op,
if (!CASTOR2_handle->done) {
globus_l_gfs_CASTOR2_send_next_to_client(CASTOR2_handle);
} else if (CASTOR2_handle->outstanding == 0) {
ceph_posix_close(CASTOR2_handle->fd);
/* this is a read, we don't care about the checksum */
globus_castor_close(func, CASTOR2_handle, "0", NULL);
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO,"%s: finished transfer\n",func);
globus_gridftp_server_finished_transfer(op, CASTOR2_handle->cached_res);
}
......
......@@ -25,6 +25,8 @@ extern "C" {
<< ' ' << cksumvalue << ' ' << *errorcode;
if(*errorcode) {
writeBuf << ' ' << *errormsg;
free(*errormsg);
*errormsg = NULL;
}
try {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment