Commit 412a3212 authored by Thomas White's avatar Thomas White
Browse files

WIP on tidy-up

parent de56cd77
......@@ -81,7 +81,7 @@ enum {
/* Information about the indexing process which is common to all patterns */
struct static_index_args
struct index_args
{
UnitCell *cell;
int config_cmfilter;
......@@ -110,7 +110,6 @@ struct static_index_args
double ir_out;
/* Output stream */
pthread_mutex_t *output_mutex; /* Protects the output stream */
FILE *ofh;
const struct copy_hdf5_field *copyme;
char *outfile;
......@@ -118,36 +117,16 @@ struct static_index_args
/* Information about the indexing process for one pattern */
struct index_args
struct pattern_args
{
/* "Input" */
char *filename;
struct static_index_args static_args;
/* "Output" */
int indexable;
};
/* Information needed to choose the next task and dispatch it */
struct queue_args
{
FILE *fh;
char *prefix;
int config_basename;
struct static_index_args static_args;
char *use_this_one_instead;
int n_indexable;
int n_processed;
int n_indexable_last_stats;
int n_processed_last_stats;
int t_last_stats;
int updateReader;
};
static void show_help(const char *s)
{
printf("Syntax: %s [options]\n\n", s);
......@@ -246,16 +225,16 @@ static void show_help(const char *s)
}
// Get next pattern in .lst
char* get_pattern(FILE *fh) {
char *rval;
char line[LINE_LENGTH];
rval = fgets(line, LINE_LENGTH - 1, fh);
if (ferror(fh)) {
printf("Read error\n");
rval = NULL;
}
return rval;
static char *get_pattern(FILE *fh)
{
char *rval;
char line[LINE_LENGTH];
rval = fgets(line, LINE_LENGTH - 1, fh);
if ( ferror(fh) ) {
ERROR("Failed to get next filename from list.\n");
rval = NULL;
}
return rval;
}
......@@ -416,11 +395,11 @@ static void process_image(void *qp, void *pp, int cookie)
outfilename = malloc(strlen(outfile) + 1);
snprintf(outfilename, LINE_LENGTH - 1, "%s", outfile);
if ((fd = open(outfilename, O_WRONLY)) == -1) {
perror("Error on opening\n");
ERROR("Error on opening\n");
exit(1);
}
if (fcntl(fd, F_SETLKW, &fl) == -1) {
perror("Error on setting lock wait\n");
ERROR("Error on setting lock wait\n");
exit(1);
}
......@@ -428,7 +407,7 @@ static void process_image(void *qp, void *pp, int cookie)
FILE *fh;
fh = fopen(outfilename, "a");
if (fh == NULL) {
perror("Error inside lock\n");
ERROR("Error inside lock\n");
}
write_chunk(fh, &image, hdfile, qargs->static_args.stream_flags);
fclose(fh);
......@@ -436,7 +415,7 @@ static void process_image(void *qp, void *pp, int cookie)
/* Unlock stream for other processes */
fl.l_type = F_UNLCK; /* set to unlock same region */
if (fcntl(fd, F_SETLK, &fl) == -1) {
perror("fcntl");
ERROR("fcntl");
exit(1);
}
close(fd);
......@@ -456,6 +435,36 @@ static void process_image(void *qp, void *pp, int cookie)
}
static void run_work(const struct index_args *iargs,
int filename_pipe, int results_pipe)
{
int allDone = 0;
while ( !allDone ) {
/* read from pipe and return number of bytes read */
if ((buff_count=read(fd_pipeOut[batchNum-1][0],&buffR,BUFFER))<0) {
ERROR("read1");
} else if (buff_count > 0) {
/* process image */
pargs.filename = buffR;
pargs.indexable = 0;
process_image(&qargs, &pargs, batchNum);
/* request another image */
buff_count = sprintf(buffW, "%d\n", pargs.indexable);
if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0)
ERROR("write P0");
} else if (buff_count == 0) {
allDone = 1;
}
}
/* close my pipes */
close(filename_pipe);
close(results_pipe);
}
#ifdef HAVE_CLOCK_GETTIME
static time_t get_monotonic_seconds()
......@@ -542,7 +551,7 @@ int main(int argc, char *argv[])
float tols[4] = {5.0, 5.0, 5.0, 1.5}; /* a,b,c,angles (%,%,%,deg) */
int cellr;
int peaks;
int nProcesses = 1;
int n_proc = 1;
char *prepare_line;
char prepare_filename[LINE_LENGTH];
struct queue_args qargs;
......@@ -558,6 +567,12 @@ int main(int argc, char *argv[])
float ir_inn = 4.0;
float ir_mid = 5.0;
float ir_out = 7.0;
int n_indexable, n_processed, n_indexable_last_stats;
int n_processed_last_stats;
int t_last_stats;
pid_t *pids;
int *filename_pipes;
int *result_pipes;
copyme = new_copy_hdf5_field_list();
if ( copyme == NULL ) {
......@@ -638,7 +653,7 @@ int main(int argc, char *argv[])
break;
case 'j' :
nProcesses = atoi(optarg);
n_proc = atoi(optarg);
break;
case 'g' :
......@@ -909,7 +924,7 @@ int main(int argc, char *argv[])
prepare_line = tmp;
}
snprintf(prepare_filename, LINE_LENGTH-1, "%s%s", prefix, prepare_line);
qargs.use_this_one_instead = prepare_line;
rewind(fh);
/* Prepare the indexer */
if ( indm != NULL ) {
......@@ -925,223 +940,147 @@ int main(int argc, char *argv[])
gsl_set_error_handler_off();
qargs.static_args.cell = cell;
qargs.static_args.config_cmfilter = config_cmfilter;
qargs.static_args.config_noisefilter = config_noisefilter;
qargs.static_args.config_verbose = config_verbose;
qargs.static_args.config_satcorr = config_satcorr;
qargs.static_args.config_closer = config_closer;
qargs.static_args.config_insane = config_insane;
qargs.static_args.config_bgsub = config_bgsub;
qargs.static_args.cellr = cellr;
qargs.static_args.tols[0] = tols[0];
qargs.static_args.tols[1] = tols[1];
qargs.static_args.tols[2] = tols[2];
qargs.static_args.tols[3] = tols[3];
qargs.static_args.threshold = threshold;
qargs.static_args.min_gradient = min_gradient;
qargs.static_args.min_snr = min_snr;
qargs.static_args.min_int_snr = min_int_snr;
qargs.static_args.det = det;
qargs.static_args.indm = indm;
qargs.static_args.ipriv = ipriv;
qargs.static_args.peaks = peaks;
qargs.static_args.ofh = ofh;
qargs.static_args.beam = beam;
qargs.static_args.element = element;
qargs.static_args.stream_flags = stream_flags;
qargs.static_args.hdf5_peak_path = hdf5_peak_path;
qargs.static_args.copyme = copyme;
qargs.static_args.ir_inn = ir_inn;
qargs.static_args.ir_mid = ir_mid;
qargs.static_args.ir_out = ir_out;
qargs.fh = fh;
qargs.prefix = prefix;
qargs.config_basename = config_basename;
qargs.n_indexable = 0;
qargs.n_processed = 0;
qargs.n_indexable_last_stats = 0;
qargs.n_processed_last_stats = 0;
qargs.updateReader = 0; /* first process updates */
qargs.t_last_stats = get_monotonic_seconds();
/* Read .lst file */
register int i;
rewind(fh); /* make sure to read from start */
/* Clear output file content */
char *myOutfilename = NULL;
chomp(prefix);
chomp(outfile);
myOutfilename = malloc(strlen(outfile) + 1);
snprintf(myOutfilename, LINE_LENGTH - 1, "%s", outfile);
FILE *tfh;
tfh = fopen(myOutfilename, "a+");
if (tfh == NULL) {
ERROR("No output filename\n");
}
fclose(tfh);
qargs.static_args.outfile = outfile;
int ready_fd;
int buff_count;
fd_set fdset,tmpset;
char buffR[BUFFER], buffW[BUFFER];
int fd_pipeIn[nProcesses][2]; /* Process0 In */
int fd_pipeOut[nProcesses][2]; /* Process0 Out */
unsigned int opts;
FD_ZERO(&fdset); /* clear the fd_set */
/* set pipeIn as non-blocking */
for ( i=0; i<nProcesses; i++ ) {
opts = fcntl(fd_pipeIn[i][0], F_GETFL);
fcntl(fd_pipeIn[i][0], F_SETFL, opts | O_NONBLOCK);
}
/* Static worker args */
iargs.cell = cell;
iargs.config_cmfilter = config_cmfilter;
iargs.config_noisefilter = config_noisefilter;
iargs.config_verbose = config_verbose;
iargs.config_satcorr = config_satcorr;
iargs.config_closer = config_closer;
iargs.config_insane = config_insane;
iargs.config_bgsub = config_bgsub;
iargs.cellr = cellr;
iargs.tols[0] = tols[0];
iargs.tols[1] = tols[1];
iargs.tols[2] = tols[2];
iargs.tols[3] = tols[3];
iargs.threshold = threshold;
iargs.min_gradient = min_gradient;
iargs.min_snr = min_snr;
iargs.min_int_snr = min_int_snr;
iargs.det = det;
iargs.indm = indm;
iargs.ipriv = ipriv;
iargs.peaks = peaks;
iargs.ofh = ofh;
iargs.beam = beam;
iargs.element = element;
iargs.stream_flags = stream_flags;
iargs.hdf5_peak_path = hdf5_peak_path;
iargs.copyme = copyme;
iargs.ir_inn = ir_inn;
iargs.ir_mid = ir_mid;
iargs.ir_out = ir_out;
iargs.outfile = outfile;
n_indexable = 0;
n_processed = 0;
n_indexable_last_stats = 0;
n_processed_last_stats = 0;
t_last_stats = get_monotonic_seconds();
/* Fork the right number of times */
for ( i=0; i<n_proc; i++ ) {
pid_t p;
int filename_pipe[2];
int result_pipe[2];
if ( pipe(filename_pipe) == - 1 ) {
ERROR("pipe() failed!\n");
return 1;
}
/**** PIPING ****/
for ( i=0; i<nProcesses; i++ ) {
pipe(fd_pipeIn[i]);
pipe(fd_pipeOut[i]);
}
if ( pipe(results_pipe) == - 1 ) {
ERROR("pipe() failed!\n");
return 1;
}
int max_fd = 0;
for ( i=0; i<nProcesses; i++ ) {
FD_SET(fd_pipeIn[i][0], &fdset);
if (fd_pipeIn[i][0] > max_fd) { /* find max_fd */
max_fd = fd_pipeIn[i][0];
p = fork();
if ( p == -1 ) {
ERROR("fork() failed!\n");
return 1;
}
}
max_fd = max_fd+1;
/* copy file set to tmpset */
memcpy((void *) &tmpset,(void *) &fdset, sizeof(fd_set));
/**** FORKING ****/
int power = 10; /* 2^power must be larger than nProcesses */
int pid[power];
double num = 0;
int batchNum = 0;
/* Fork 2^power times */
for ( i=0; i<power; i++ ) {
pid[i] = fork();
}
/* Assign id */
for ( i=0; i<power; i++ ) {
if (pid[i] == 0) { /* keep parents and kill off children */
num += pow(2, i);
if ( p == 0 ) {
/* Child process gets the 'read' end of the filename
* pipe, and the 'write' end of the result pipe. */
close(filename_pipe[1]);
close(result_pipe[0]);
run_work(&iargs, filename_pipe[0], result_pipe[1]);
exit(0);
}
/* Parent process gets the 'write' end of the filename pipe
* and the 'read' end of the result pipe. */
pid[i] = p;
close(filename_pipe[0]);
close(result_pipe[1]);
filename_pipes[i] = filename_pipe[1];
result_pipes[i] = result_pipe[0];
}
/* Kill if batchNum too high */
if (num >= nProcesses + 1) {
exit(0); /* kill */
}
batchNum = (int) num;
/**** PLUMBING ****/
if (batchNum == qargs.updateReader) {
for ( i=0; i<nProcesses; i++ ) {
close(fd_pipeIn[i][1]); /* close all write pipes In */
close(fd_pipeOut[i][0]); /* close all read pipes Out */
}
} else {
for ( i=0; i<nProcesses; i++ ) {
if (i == batchNum - 1) { /* batchNum = 1,2,3 ... */
close(fd_pipeIn[i][0]); /* close read pipe In */
close(fd_pipeOut[i][1]); /* close write pipe Out */
} else {
close(fd_pipeIn[i][0]); // close remaining pipes In
close(fd_pipeIn[i][1]);
close(fd_pipeOut[i][0]); // close remaining pipes Out
close(fd_pipeOut[i][1]);
}
}
/* Send first image to all children */
char *nextImage = NULL;
for ( i=0; i<nProcesses; i++ ) {
nextImage = get_pattern(fh);
buff_count = sprintf(buffW, "%s",nextImage);
write (fd_pipeOut[i][1], buffW, buff_count);
}
/**** INDEXING ****/
double tStart, tEnd;
tStart = get_monotonic_seconds();
int allDone = 0;
if (batchNum == qargs.updateReader){
char *nextImage = NULL;
for ( i=0; i<nProcesses; i++ ) { /* Send out image to all processes*/
nextImage = get_pattern(fh);
buff_count = sprintf(buffW, "%s",nextImage);
write (fd_pipeOut[i][1], buffW, buff_count);
}
int nFinished = 0;
while (!allDone) {
/* select from file set for reading */
if ((ready_fd = select(max_fd,&fdset,NULL,NULL,NULL)) < 0)
perror("select");
if (ready_fd > 0) {
for ( i=0; i<nProcesses; i++ ) {
/* is in file set that raised flag? */
if (FD_ISSET(fd_pipeIn[i][0],&fdset)) {
/* read from pipe and return number of bytes read */
if ((buff_count=read(fd_pipeIn[i][0],&buffR,BUFFER))<0) {
perror("read");
int nFinished = 0;
while (!allDone) {
/* select from file set for reading */
if ((ready_fd = select(max_fd,&fdset,NULL,NULL,NULL)) < 0)
ERROR("select");
if (ready_fd > 0) {
for ( i=0; i<nProcesses; i++ ) {
/* is in file set that raised flag? */
if (FD_ISSET(fd_pipeIn[i][0],&fdset)) {
/* read from pipe and return number of bytes read */
if ((buff_count=read(fd_pipeIn[i][0],&buffR,BUFFER))<0) {
ERROR("read");
} else {
qargs.n_indexable += atoi(buffR);
qargs.n_processed++;
/* write to pipe */
if ((nextImage = get_pattern(fh)) == NULL){
nFinished++; /* no more images */
if ( nFinished == nProcesses )
allDone = 1; /* EXIT */
} else {
qargs.n_indexable += atoi(buffR);
qargs.n_processed++;
/* write to pipe */
if ((nextImage = get_pattern(fh)) == NULL){
nFinished++; /* no more images */
if ( nFinished == nProcesses )
allDone = 1; /* EXIT */
} else {
/* send out image */
buff_count = sprintf(buffW, "%s",nextImage);
if (write (fd_pipeOut[i][1], buffW, buff_count)<0)
perror("write pipe");
}
/* send out image */
buff_count = sprintf(buffW, "%s",nextImage);
if (write (fd_pipeOut[i][1], buffW, buff_count)<0)
ERROR("write pipe");
}
}
}
}
/* file set is modified, so copy original from tmpset */
memcpy((void *) &fdset,(void *) &tmpset, sizeof(fd_set));
/* Update to screen */
double tNow = get_monotonic_seconds();
if ( tNow >= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) {
STATUS("%i out of %i indexed so far,"
" %i out of %i since the last message.\n\n",
qargs.n_indexable, qargs.n_processed,
qargs.n_indexable - qargs.n_indexable_last_stats,
qargs.n_processed - qargs.n_processed_last_stats);
qargs.n_indexable_last_stats = qargs.n_indexable;
qargs.n_processed_last_stats = qargs.n_processed;
qargs.t_last_stats = tNow;
}
}
/* close my pipes */
for ( i=0; i<nProcesses; i++ ) {
close(fd_pipeIn[i][0]);
close(fd_pipeOut[i][1]);
/* file set is modified, so copy original from tmpset */
memcpy((void *) &fdset,(void *) &tmpset, sizeof(fd_set));
/* Update to screen */
double tNow = get_monotonic_seconds();
if ( tNow >= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) {
STATUS("%i out of %i indexed so far,"
" %i out of %i since the last message.\n\n",
qargs.n_indexable, qargs.n_processed,
qargs.n_indexable - qargs.n_indexable_last_stats,
qargs.n_processed - qargs.n_processed_last_stats);
qargs.n_indexable_last_stats = qargs.n_indexable;
qargs.n_processed_last_stats = qargs.n_processed;
qargs.t_last_stats = tNow;
}
tEnd = get_monotonic_seconds();
printf("Compute Time: %.2fs\n", tEnd - tStart);
} else {
while(!allDone){
/* read from pipe and return number of bytes read */
if ((buff_count=read(fd_pipeOut[batchNum-1][0],&buffR,BUFFER))<0) {
perror("read1");
} else if (buff_count > 0) {
/* process image */
pargs.filename = buffR;
pargs.indexable = 0;
process_image(&qargs, &pargs, batchNum);
/* request another image */
buff_count = sprintf(buffW, "%d\n", pargs.indexable);
if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0)
perror("write P0");
} else if (buff_count == 0) {
allDone = 1; /* EXIT */
}
}
/* close my pipes */
close(fd_pipeIn[batchNum-1][1]);
close(fd_pipeOut[batchNum-1][0]);
}
/* close my pipes */
for ( i=0; i<nProcesses; i++ ) {
close(fd_pipeIn[i][0]);
close(fd_pipeOut[i][1]);
}
tEnd = get_monotonic_seconds();
cleanup_indexing(ipriv);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment