Commit 706a9a92 authored by Giuseppe Lo Presti's avatar Giuseppe Lo Presti
Browse files

Fixed gridftp transfers handling with diskmanagerd

parent a8353a3c
......@@ -198,7 +198,7 @@ class ActivityControlThread(threading.Thread):
if transfer.protocol != 'xroot':
# prepare the listening port for the client to connect. The mover is executed only after the client connects back
moverPort = self.clientsListener.createSocketForMover(qTransfer, self.startMover)
destPath = transfer.reqId
destPath = transfer.transferId
else:
# xroot is special here: we don't create a socket for the mover, but instead we tell
# the xroot server (through the redirector) to use our mover handler port for telling
......
......@@ -124,7 +124,7 @@ class ClientsReplierThread(threading.Thread):
dlf.writenotice(msgs.CLIENTSREPLIEREXCEPTION, error=str(e), subreqId=qTransfer.transfer.transferId,
clientHost=clientHost, clientPort=clientPort)
# now we should fail the outstanding transfer, as the client will never connect,
# but we rely on the clientsListener thread, which fails those transfers on timeout.
# but we rely on the manager thread, which fails those transfers on timeout.
def stop(self):
'''stops processing in this thread'''
......
......@@ -359,9 +359,9 @@ static void globus_l_gfs_file_net_read_cb(globus_gfs_operation_t op,
unsigned long index;
unsigned long i;
unsigned long file_checksum;
char ckSumbuf[CA_MAXCKSUMLEN+1];
char * ckSumalg="ADLER32"; /* we only support Adler32 for gridftp */
char * func="globus_l_gfs_file_net_read_cb";
char ckSumbuf[CA_MAXCKSUMLEN+1] = "0";
char * ckSumalg = "ADLER32"; /* we only support Adler32 for gridftp */
char * func = "globus_l_gfs_file_net_read_cb";
CASTOR2_handle = (globus_l_gfs_CASTOR2_handle_t *) user_arg;
......@@ -423,7 +423,7 @@ static void globus_l_gfs_file_net_read_cb(globus_gfs_operation_t op,
/* checksum calculation */
checksum_array=(checksum_block_list_t**)
globus_calloc(CASTOR2_handle->number_of_blocks,sizeof(checksum_block_list_t*));
if (checksum_array==NULL){
if (checksum_array == NULL) {
free_checksum_list(CASTOR2_handle->checksum_list);
CASTOR2_handle->cached_res = GLOBUS_FAILURE;
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,"%s: malloc error \n",func);
......@@ -473,7 +473,7 @@ static void globus_l_gfs_file_net_read_cb(globus_gfs_operation_t op,
checksum_array[i]->offset+checksum_array[i]->size,
chkOffset);
CASTOR2_handle->done = GLOBUS_TRUE;
close(CASTOR2_handle->fd);
ceph_posix_close(CASTOR2_handle->fd);
globus_mutex_unlock(&CASTOR2_handle->mutex);
return;
}
......@@ -484,19 +484,19 @@ static void globus_l_gfs_file_net_read_cb(globus_gfs_operation_t op,
chkOffset += checksum_array[i]->size;
}
fileSize = (long long)chkOffset;
globus_gfs_log_message(GLOBUS_GFS_LOG_DUMP,"%s: checksum for %s : AD 0x%lx\n",
func,CASTOR2_handle->fullDestPath,file_checksum);
sprintf(ckSumbuf,"%lx",file_checksum);
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "%s: checksum for %s : AD 0x%lx\n",
func,CASTOR2_handle->fullDestPath, file_checksum);
globus_free(checksum_array);
free_checksum_list(CASTOR2_handle->checksum_list);
/* set extended attributes */
sprintf(ckSumbuf,"%lx",file_checksum);
if (ceph_posix_fsetxattr(CASTOR2_handle->fd,"user.castor.checksum.type",
ckSumalg,strlen(ckSumalg),0)) {
;/* ignore all errors */
} else if (ceph_posix_fsetxattr(CASTOR2_handle->fd,"user.castor.checksum.value",
ckSumbuf,strlen(ckSumbuf),0)) {
; /* ignore all errors */
ckSumalg, strlen(ckSumalg), 0)) {
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,"%s: unable to store checksum type as xattr\n", func);
}
else if (ceph_posix_fsetxattr(CASTOR2_handle->fd,"user.castor.checksum.value",
ckSumbuf, strlen(ckSumbuf), 0)) {
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,"%s: unable to store checksum value as xattr\n", func);
}
}
ceph_posix_close(CASTOR2_handle->fd);
......@@ -510,9 +510,14 @@ static void globus_l_gfs_file_net_read_cb(globus_gfs_operation_t op,
if (mover_close_file(port, CASTOR2_handle->uuid, fileSize, "AD", ckSumbuf, &rc, &error_msg) != 0) {
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR,"%s: mover_close_file failed for transferid=%s: %s\n",
func, CASTOR2_handle->uuid, error_msg);
CASTOR2_handle->cached_res = GLOBUS_FAILURE;
CASTOR2_handle->done = GLOBUS_TRUE;
free(error_msg);
}
else {
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "%s: mover_close_file succeeded for transferid=%s, fileSize=%ld, checkSum=0x%s\n",
func, CASTOR2_handle->uuid, fileSize, ckSumbuf);
}
globus_gridftp_server_finished_transfer(op, CASTOR2_handle->cached_res);
}
}
......@@ -903,7 +908,7 @@ static globus_bool_t globus_l_gfs_CASTOR2_send_next_to_client
file_checksum=adler32_combine_(file_checksum,checksum_array[i]->csumvalue,
checksum_array[i]->size);
}
globus_gfs_log_message(GLOBUS_GFS_LOG_DUMP,"%s: checksum for %s : AD 0x%lx\n",
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO,"%s: checksum for %s : AD 0x%lx\n",
func,CASTOR2_handle->fullDestPath,file_checksum);
globus_free(checksum_array);
free_checksum_list(CASTOR2_handle->checksum_list);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment