Commit 61b69a8c authored by Thomas White's avatar Thomas White
Browse files

Write stream without using a lock at all

parent fa3a72ba
......@@ -106,9 +106,7 @@ struct index_args
double ir_out;
/* Output stream */
FILE *ofh;
const struct copy_hdf5_field *copyme;
char *outfile;
};
......@@ -227,26 +225,30 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead,
char *line;
char *filename;
/* Get the next filename */
if ( *use_this_one_instead != NULL ) {
do {
line = *use_this_one_instead;
*use_this_one_instead = NULL;
/* Get the next filename */
if ( *use_this_one_instead != NULL ) {
} else {
line = *use_this_one_instead;
*use_this_one_instead = NULL;
char *rval;
} else {
char *rval;
line = malloc(1024*sizeof(char));
rval = fgets(line, 1023, fh);
if ( rval == NULL ) {
free(line);
return NULL;
}
line = malloc(1024*sizeof(char));
rval = fgets(line, 1023, fh);
if ( rval == NULL ) {
free(line);
return NULL;
}
}
chomp(line);
chomp(line);
} while ( strlen(line) == 0 );
if ( config_basename ) {
char *tmp;
......@@ -266,7 +268,8 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead,
static void process_image(const struct index_args *iargs,
struct pattern_args *pargs, int cookie)
struct pattern_args *pargs, FILE *ofh,
int cookie)
{
float *data_for_measurement;
size_t data_size;
......@@ -278,12 +281,7 @@ static void process_image(const struct index_args *iargs,
struct beam_params *beam = iargs->beam;
int r, check;
struct hdfile *hdfile;
char *outfile = iargs->outfile;
struct image image;
char *outfilename = iargs->outfile;
int fd;
FILE *fh;
struct flock fl;
image.features = NULL;
image.data = NULL;
......@@ -408,38 +406,9 @@ static void process_image(const struct index_args *iargs,
image.reflections = NULL;
}
/* Write Lock */
fl.l_type = F_WRLCK;
fl.l_whence = SEEK_SET;
fl.l_start = 0;
fl.l_len = 0; /* Means "lock the whole file" */
fd = open(outfile, O_WRONLY);
if ( fd == -1 ) {
ERROR("Couldn't open output stream ('%s').\n", outfile);
exit(1);
}
if ( fcntl(fd, F_SETLKW, &fl) == -1 ) {
ERROR("Couldn't get lock on output stream.\n");
exit(1);
}
fh = fdopen(fd, "a");
if ( fh == NULL ) {
ERROR("Couldn't fdopen() the output stream.\n");
exit(1);
}
write_chunk(fh, &image, hdfile, iargs->stream_flags);
fflush(fh);
/* Unlock stream for other processes */
fl.l_type = F_UNLCK; /* set to unlock same region */
if ( fcntl(fd, F_SETLK, &fl) == -1 ) {
ERROR("fcntl");
exit(1);
}
fclose(fh); /* close(fd) happens as well because fd was not dup'd */
write_chunk(ofh, &image, hdfile, iargs->stream_flags);
fprintf(ofh, "END\n");
fflush(ofh);
/* Only free cell if found */
cell_free(image.indexed_cell);
......@@ -454,7 +423,8 @@ static void process_image(const struct index_args *iargs,
static void run_work(const struct index_args *iargs,
int filename_pipe, int results_pipe, int cookie)
int filename_pipe, int results_pipe, FILE *ofh,
int cookie)
{
int allDone = 0;
FILE *fh;
......@@ -489,16 +459,25 @@ static void run_work(const struct index_args *iargs,
}
chomp(line);
pargs.filename = line;
pargs.indexable = 0;
process_image(iargs, &pargs, cookie);
if ( strlen(line) == 0 ) {
allDone = 1;
} else {
pargs.filename = line;
pargs.indexable = 0;
process_image(iargs, &pargs, ofh, cookie);
/* Request another image */
c = sprintf(buf, "%i\n", pargs.indexable);
w = write(results_pipe, buf, c);
if ( w < 0 ) {
ERROR("write P0\n");
}
/* Request another image */
c = sprintf(buf, "%i\n", pargs.indexable);
w = write(results_pipe, buf, c);
if ( w < 0 ) {
ERROR("write P0\n");
}
free(line);
......@@ -558,6 +537,118 @@ static int parse_cell_reduction(const char *scellr, int *err,
}
static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh)
{
int done = 0;
int *finished;
FILE **fhs;
int i;
int chunk_finished;
finished = calloc(n_proc, sizeof(int));
if ( finished == NULL ) {
ERROR("Couldn't allocate memory for flags!\n");
exit(1);
}
fhs = calloc(n_proc, sizeof(FILE *));
if ( fhs == NULL ) {
ERROR("Couldn't allocate memory for file handles!\n");
exit(1);
}
for ( i=0; i<n_proc; i++ ) {
fhs[i] = fdopen(stream_pipe_read[i], "r");
if ( fhs[i] == NULL ) {
ERROR("Couldn't fdopen() stream!\n");
exit(1);
}
}
while ( !done ) {
int r, i;
struct timeval tv;
fd_set fds;
int fdmax;
tv.tv_sec = 5;
tv.tv_usec = 0;
FD_ZERO(&fds);
fdmax = 0;
for ( i=0; i<n_proc; i++ ) {
int fd;
if ( finished[i] ) continue;
fd = stream_pipe_read[i];
FD_SET(fd, &fds);
if ( fd > fdmax ) fdmax = fd;
}
r = select(fdmax+1, &fds, NULL, NULL, &tv);
if ( r == -1 ) {
ERROR("select() failed: %s\n", strerror(errno));
continue;
}
if ( r == 0 ) continue; /* Nothing this time. Try again */
for ( i=0; i<n_proc; i++ ) {
if ( finished[i] ) continue;
if ( !FD_ISSET(stream_pipe_read[i], &fds) ) continue;
chunk_finished = 0;
do {
char line[1024];
char *rval;
rval = fgets(line, 1024, fhs[i]);
if ( rval == NULL ) {
if ( feof(fhs[i]) ) {
/* Process died */
finished[i] = 1;
} else {
ERROR("fgets() failed: %s\n",
strerror(errno));
}
continue;
}
if ( strcmp(line, "END\n") == 0 ) {
chunk_finished = 1;
} else {
fprintf(ofh, "%s", line);
}
} while ( !chunk_finished );
}
done = 1;
for ( i=0; i<n_proc; i++ ) {
if ( !finished[i] ) done = 0;
}
}
free(finished);
for ( i=0; i<n_proc; i++ ) {
fclose(fhs[i]);
}
}
int main(int argc, char *argv[])
{
int c;
......@@ -616,11 +707,14 @@ int main(int argc, char *argv[])
int t_last_stats;
pid_t *pids;
int *filename_pipes;
int *stream_pipe_read;
int *stream_pipe_write;
FILE **result_fhs;
fd_set fds;
int i;
int allDone;
int *finished;
pid_t pr;
copyme = new_copy_hdf5_field_list();
if ( copyme == NULL ) {
......@@ -1007,7 +1101,6 @@ int main(int argc, char *argv[])
iargs.indm = indm;
iargs.ipriv = ipriv;
iargs.peaks = peaks;
iargs.ofh = ofh;
iargs.beam = beam;
iargs.element = element;
iargs.stream_flags = stream_flags;
......@@ -1016,7 +1109,6 @@ int main(int argc, char *argv[])
iargs.ir_inn = ir_inn;
iargs.ir_mid = ir_mid;
iargs.ir_out = ir_out;
iargs.outfile = outfile;
n_indexable = 0;
n_processed = 0;
......@@ -1027,6 +1119,8 @@ int main(int argc, char *argv[])
FD_ZERO(&fds);
filename_pipes = calloc(n_proc, sizeof(int));
result_fhs = calloc(n_proc, sizeof(FILE *));
stream_pipe_read = calloc(n_proc, sizeof(int));
stream_pipe_write = calloc(n_proc, sizeof(int));
if ( filename_pipes == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return 1;
......@@ -1035,6 +1129,14 @@ int main(int argc, char *argv[])
ERROR("Couldn't allocate memory for pipe file handles.\n");
return 1;
}
if ( stream_pipe_read == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return 1;
}
if ( stream_pipe_write == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return 1;
}
pids = calloc(n_proc, sizeof(pid_t));
if ( pids == NULL ) {
......@@ -1048,6 +1150,28 @@ int main(int argc, char *argv[])
return 1;
}
for ( i=0; i<n_proc; i++ ) {
int stream_pipe[2];
if ( pipe(stream_pipe) == - 1 ) {
ERROR("pipe() failed!\n");
return 1;
}
stream_pipe_read[i] = stream_pipe[0];
stream_pipe_write[i] = stream_pipe[1];
}
pr = fork();
if ( pr == - 1 ) {
ERROR("fork() failed (for reader process)\n");
return 1;
}
if ( pr == 0 ) run_reader(stream_pipe_read, n_proc, ofh);
/* Fork the right number of times */
for ( i=0; i<n_proc; i++ ) {
......@@ -1072,11 +1196,19 @@ int main(int argc, char *argv[])
}
if ( p == 0 ) {
FILE *fh;
/* Child process gets the 'read' end of the filename
* pipe, and the 'write' end of the result pipe. */
close(filename_pipe[1]);
close(result_pipe[0]);
run_work(&iargs, filename_pipe[0], result_pipe[1], i);
fh = fdopen(stream_pipe_write[i], "w");
run_work(&iargs, filename_pipe[0], result_pipe[1],
fh, i);
fclose(fh);
exit(0);
}
......@@ -1190,7 +1322,10 @@ int main(int argc, char *argv[])
if ( nextImage == NULL ) {
/* No more images */
finished[i] = 1;
r = write(filename_pipes[i], "\n", 1);
if ( r < 0 ) {
ERROR("Write pipe\n");
}
} else {
r = write(filename_pipes[i], nextImage,
......@@ -1226,12 +1361,20 @@ int main(int argc, char *argv[])
}
for ( i=0; i<n_proc; i++ ) {
int status;
waitpid(pids[i], &status, 0);
}
for ( i=0; i<n_proc; i++ ) {
close(filename_pipes[i]);
fclose(result_fhs[i]);
}
free(filename_pipes);
free(result_fhs);
free(stream_pipe_read);
free(stream_pipe_write);
free(pids);
free(finished);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment