Commit 16b97e9e authored by Thomas White's avatar Thomas White
Browse files

Sandboxy stuff

parent 9ff3210f
......@@ -75,10 +75,31 @@
struct sandbox
{
int n_indexable;
int n_processed;
int n_indexable_last_stats;
int n_processed_last_stats;
int t_last_stats;
struct index_args *iargs;
int n_proc;
pid_t *pids;
FILE *ofh;
int *running;
FILE **result_fhs;
int *filename_pipes;
int *stream_pipe_read;
int *stream_pipe_write;
char **last_filename;
};
/* Horrible global variable for signal handler */
struct sandbox *sb;
static char *get_pattern(FILE *fh, char **use_this_one_instead,
int config_basename, const char *prefix)
{
......@@ -285,6 +306,7 @@ static void run_work(const struct index_args *iargs,
{
int allDone = 0;
FILE *fh;
int w;
fh = fdopen(filename_pipe, "r");
if ( fh == NULL ) {
......@@ -292,13 +314,18 @@ static void run_work(const struct index_args *iargs,
return;
}
w = write(results_pipe, "\n", 1);
if ( w < 0 ) {
ERROR("Failed to send request for first filename.\n");
}
while ( !allDone ) {
struct pattern_args pargs;
int w, c;
char buf[1024];
int c;
char *line;
char *rval;
char buf[1024];
line = malloc(1024*sizeof(char));
rval = fgets(line, 1023, fh);
......@@ -306,6 +333,7 @@ static void run_work(const struct index_args *iargs,
free(line);
if ( feof(fh) ) {
allDone = 1;
STATUS("Exiting!\n");
continue;
} else {
ERROR("Read error!\n");
......@@ -377,7 +405,7 @@ static time_t get_monotonic_seconds()
#endif
static void pump_chunk(FILE *fh, int *finished, FILE *ofh)
static int pump_chunk(FILE *fh, FILE *ofh)
{
int chunk_started = 0;
int chunk_finished = 0;
......@@ -392,11 +420,11 @@ static void pump_chunk(FILE *fh, int *finished, FILE *ofh)
if ( feof(fh) ) {
/* Process died */
*finished = 1;
if ( chunk_started ) {
ERROR("EOF during chunk!\n");
fprintf(ofh, "Chunk is unfinished!\n");
}
return 1;
} else {
ERROR("fgets() failed: %s\n", strerror(errno));
}
......@@ -413,33 +441,29 @@ static void pump_chunk(FILE *fh, int *finished, FILE *ofh)
}
} while ( !chunk_finished );
return 0;
}
static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh)
static void *run_reader(void *sbv)
{
struct sandbox *sb = sbv;
int done = 0;
int *finished;
FILE **fhs;
int i;
finished = calloc(n_proc, sizeof(int));
if ( finished == NULL ) {
ERROR("Couldn't allocate memory for flags!\n");
exit(1);
}
fhs = calloc(n_proc, sizeof(FILE *));
fhs = calloc(sb->n_proc, sizeof(FILE *));
if ( fhs == NULL ) {
ERROR("Couldn't allocate memory for file handles!\n");
exit(1);
return NULL;
}
for ( i=0; i<n_proc; i++ ) {
fhs[i] = fdopen(stream_pipe_read[i], "r");
for ( i=0; i<sb->n_proc; i++ ) {
fhs[i] = fdopen(sb->stream_pipe_read[i], "r");
if ( fhs[i] == NULL ) {
ERROR("Couldn't fdopen() stream!\n");
exit(1);
return NULL;
}
}
......@@ -455,13 +479,13 @@ static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh)
FD_ZERO(&fds);
fdmax = 0;
for ( i=0; i<n_proc; i++ ) {
for ( i=0; i<sb->n_proc; i++ ) {
int fd;
if ( finished[i] ) continue;
if ( !sb->running[i] ) continue;
fd = stream_pipe_read[i];
fd = sb->stream_pipe_read[i];
FD_SET(fd, &fds);
if ( fd > fdmax ) fdmax = fd;
......@@ -471,45 +495,176 @@ static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh)
r = select(fdmax+1, &fds, NULL, NULL, &tv);
if ( r == -1 ) {
ERROR("select() failed: %s\n", strerror(errno));
if ( errno != EINTR ) {
ERROR("select() failed: %s\n", strerror(errno));
} /* Otherwise no big deal */
continue;
}
if ( r == 0 ) continue; /* Nothing this time. Try again */
for ( i=0; i<n_proc; i++ ) {
for ( i=0; i<sb->n_proc; i++ ) {
if ( finished[i] ) continue;
if ( !sb->running[i] ) continue;
if ( !FD_ISSET(stream_pipe_read[i], &fds) ) continue;
if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) continue;
pump_chunk(fhs[i], &finished[i], ofh);
if ( pump_chunk(fhs[i], sb->ofh) ) {
sb->running[i] = 0;
}
}
done = 1;
for ( i=0; i<n_proc; i++ ) {
if ( !finished[i] ) done = 0;
for ( i=0; i<sb->n_proc; i++ ) {
if ( sb->running[i] ) done = 0;
}
}
free(finished);
for ( i=0; i<n_proc; i++ ) {
for ( i=0; i<sb->n_proc; i++ ) {
fclose(fhs[i]);
}
free(fhs);
if ( ofh != stdout ) fclose(ofh);
return NULL;
}
static void start_worker_process(struct sandbox *sb, int slot)
{
pid_t p;
int filename_pipe[2];
int result_pipe[2];
if ( pipe(filename_pipe) == - 1 ) {
ERROR("pipe() failed!\n");
return;
}
if ( pipe(result_pipe) == - 1 ) {
ERROR("pipe() failed!\n");
return;
}
p = fork();
if ( p == -1 ) {
ERROR("fork() failed!\n");
return;
}
if ( p == 0 ) {
FILE *sfh;
int j;
struct sigaction sa;
int r;
/* First, disconnect the signal handler */
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
sa.sa_handler = SIG_DFL;
r = sigaction(SIGCHLD, &sa, NULL);
if ( r == -1 ) {
ERROR("Failed to set signal handler!\n");
return;
}
/* Free resources which will not be needed by worker */
for ( j=0; j<sb->n_proc; j++ ) {
if ( (j != slot) && (sb->running[j]) ) {
close(sb->stream_pipe_write[j]);
}
}
for ( j=0; j<sb->n_proc; j++ ) {
if ( (j != slot) && (sb->running[j]) ) {
fclose(sb->result_fhs[j]);
close(sb->filename_pipes[j]);
}
}
free(sb->filename_pipes);
free(sb->result_fhs);
free(sb->pids);
/* Also prefix, use_this_one_instead and fh */
/* Child process gets the 'read' end of the filename
* pipe, and the 'write' end of the result pipe. */
close(filename_pipe[1]);
close(result_pipe[0]);
sfh = fdopen(sb->stream_pipe_write[slot], "w");
run_work(sb->iargs, filename_pipe[0], result_pipe[1],
sfh, slot);
fclose(sfh);
free(sb->stream_pipe_write);
close(filename_pipe[0]);
close(result_pipe[1]);
exit(0);
}
/* Parent process gets the 'write' end of the filename pipe
* and the 'read' end of the result pipe. */
sb->pids[slot] = p;
sb->running[slot] = 1;
close(filename_pipe[0]);
close(result_pipe[1]);
sb->filename_pipes[slot] = filename_pipe[1];
sb->result_fhs[slot] = fdopen(result_pipe[0], "r");
if ( sb->result_fhs[slot] == NULL ) {
ERROR("fdopen() failed.\n");
return;
}
}
static void signal_handler(int sig, siginfo_t *si, void *uc_v)
{
struct ucontext_t *uc = uc_v;
int i, found;
STATUS("Signal!\n");
if ( si->si_signo != SIGCHLD ) {
ERROR("Unhandled signal %i?\n", si->si_signo);
return;
}
found = 0;
for ( i=0; i<sb->n_proc; i++ ) {
if ( (sb->running[i]) && (sb->pids[i] == si->si_pid) ) {
found = 1;
break;
}
}
if ( !found ) {
ERROR("SIGCHLD from unknown child %i?\n", si->si_pid);
return;
}
if ( (si->si_code == CLD_TRAPPED) || (si->si_code == CLD_STOPPED)
|| (si->si_code == CLD_CONTINUED) ) return;
if ( si->si_code == CLD_EXITED )
{
sb->running[i] = 0;
STATUS("Worker process %i exited normally.\n", i);
return;
}
if ( (si->si_code != CLD_DUMPED) && (si->si_code != CLD_KILLED) ) {
ERROR("Unhandled si_code %i (worker process %i).\n",
si->si_code, i);
return;
}
ERROR("Worker process %i exited abnormally!\n", i);
ERROR(" -> Signal %i, last filename %s.\n",
si->si_signo, sb->last_filename[i]);
sb->running[i] = 0;
//start_worker_process(sb, i);
}
......@@ -517,34 +672,34 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh, char *use_this_one_instead,
FILE *ofh)
{
int n_indexable, n_processed, n_indexable_last_stats;
int n_processed_last_stats;
int t_last_stats;
pid_t *pids;
int *filename_pipes;
int *stream_pipe_read;
int *stream_pipe_write;
FILE **result_fhs;
int i;
int allDone;
int *finished;
pid_t pr;
struct sigaction sa;
int r;
pthread_t reader_thread;
n_indexable = 0;
n_processed = 0;
n_indexable_last_stats = 0;
n_processed_last_stats = 0;
t_last_stats = get_monotonic_seconds();
sb = calloc(1, sizeof(struct sandbox));
if ( sb == NULL ) {
ERROR("Couldn't allocate memory for sandbox.\n");
return;
}
stream_pipe_read = calloc(n_proc, sizeof(int));
stream_pipe_write = calloc(n_proc, sizeof(int));
if ( stream_pipe_read == NULL ) {
sb->n_indexable = 0;
sb->n_processed = 0;
sb->n_indexable_last_stats = 0;
sb->n_processed_last_stats = 0;
sb->t_last_stats = get_monotonic_seconds();
sb->n_proc = n_proc;
sb->ofh = ofh;
sb->iargs = iargs;
sb->stream_pipe_read = calloc(n_proc, sizeof(int));
sb->stream_pipe_write = calloc(n_proc, sizeof(int));
if ( sb->stream_pipe_read == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return;
}
if ( stream_pipe_write == NULL ) {
if ( sb->stream_pipe_write == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return;
}
......@@ -558,46 +713,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
stream_pipe_read[i] = stream_pipe[0];
stream_pipe_write[i] = stream_pipe[1];
sb->stream_pipe_read[i] = stream_pipe[0];
sb->stream_pipe_write[i] = stream_pipe[1];
}
pr = fork();
if ( pr == - 1 ) {
ERROR("fork() failed (for reader process)\n");
if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) {
ERROR("Failed to create reader thread.\n");
return;
}
if ( pr == 0 ) {
/* Free resources not needed by reader
* (but which will be needed by worker or master) */
for ( i=0; i<n_proc; i++ ) {
close(stream_pipe_write[i]);
}
free(prefix);
free(use_this_one_instead);
free(stream_pipe_write);
cleanup_indexing(iargs->ipriv);
free(iargs->indm);
free(iargs->ipriv);
free_detector_geometry(iargs->det);
free(iargs->beam);
free(iargs->element);
free(iargs->hdf5_peak_path);
free_copy_hdf5_field_list(iargs->copyme);
cell_free(iargs->cell);
fclose(fh);
run_reader(stream_pipe_read, n_proc, ofh);
free(stream_pipe_read);
exit(0);
}
/* Set up signal handler to take action if any children die */
sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP;
sigemptyset(&sa.sa_mask);
......@@ -608,119 +733,37 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
/* Free resources needed by reader only */
if ( ofh != stdout ) fclose(ofh);
for ( i=0; i<n_proc; i++ ) {
close(stream_pipe_read[i]);
}
free(stream_pipe_read);
filename_pipes = calloc(n_proc, sizeof(int));
result_fhs = calloc(n_proc, sizeof(FILE *));
pids = calloc(n_proc, sizeof(pid_t));
if ( filename_pipes == NULL ) {
sb->filename_pipes = calloc(n_proc, sizeof(int));
sb->result_fhs = calloc(n_proc, sizeof(FILE *));
sb->pids = calloc(n_proc, sizeof(pid_t));
sb->running = calloc(n_proc, sizeof(int));
if ( sb->filename_pipes == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return;
}
if ( result_fhs == NULL ) {
if ( sb->result_fhs == NULL ) {
ERROR("Couldn't allocate memory for pipe file handles.\n");
return;
}
if ( pids == NULL ) {
if ( sb->pids == NULL ) {
ERROR("Couldn't allocate memory for PIDs.\n");
return;
}
if ( sb->running == NULL ) {
ERROR("Couldn't allocate memory for process flags.\n");
return;
}
/* Fork the right number of times */
for ( i=0; i<n_proc; i++ ) {
pid_t p;
int filename_pipe[2];
int result_pipe[2];
if ( pipe(filename_pipe) == - 1 ) {
ERROR("pipe() failed!\n");
return;
}
if ( pipe(result_pipe) == - 1 ) {
ERROR("pipe() failed!\n");
return;
}
p = fork();
if ( p == -1 ) {
ERROR("fork() failed!\n");
return;
}
if ( p == 0 ) {
FILE *sfh;
int j;
/* Free resources which will not be needed by worker */
for ( j=0; j<n_proc; j++ ) {
if ( i != j ) close(stream_pipe_write[j]);
}
for ( j=0; j<i-1; j++ ) {
fclose(result_fhs[j]);
close(filename_pipes[j]);
}
free(prefix);
free(use_this_one_instead);
free(filename_pipes);
free(result_fhs);
fclose(fh);
free(pids);
/* Child process gets the 'read' end of the filename
* pipe, and the 'write' end of the result pipe. */
close(filename_pipe[1]);
close(result_pipe[0]);
sfh = fdopen(stream_pipe_write[i], "w");
run_work(iargs, filename_pipe[0], result_pipe[1],
sfh, i);
fclose(sfh);
free(stream_pipe_write);
close(filename_pipe[0]);
close(result_pipe[1]);
exit(0);
}
/* Parent process gets the 'write' end of the filename pipe
* and the 'read' end of the result pipe. */
pids[i] = p;
close(filename_pipe[0]);
close(result_pipe[1]);
filename_pipes[i] = filename_pipe[1];
result_fhs[i] = fdopen(result_pipe[0], "r");
if ( result_fhs[i] == NULL ) {
ERROR("fdopen() failed.\n");
return;
}
sb->last_filename = calloc(n_proc, sizeof(char *));
if ( sb->last_filename == NULL ) {
ERROR("Couldn't allocate memory for last filename list.\n");
return;
}
/* Free resources which will not be used by the main thread */
cleanup_indexing(iargs->ipriv);
free(iargs->indm);
free(iargs->ipriv);
free_detector_geometry(iargs->det);
free(iargs->beam);
free(iargs->element);
free(iargs->hdf5_peak_path);
free_copy_hdf5_field_list(iargs->copyme);
cell_free(iargs->cell);
/* Fork the right number of times */
for ( i=0; i<n_proc; i++ ) {
close(stream_pipe_write[i]);
start_worker_process(sb, i);
}
free(stream_pipe_write);
/* Send first image to all children */
for ( i=0; i<n_proc; i++ ) {
......@@ -732,8 +775,12 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
if ( nextImage != NULL ) {
write(filename_pipes[i], nextImage, strlen(nextImage));
write(filename_pipes[i], "\n", 1);
free(sb->last_filename[i]);
sb->last_filename[i] = strdup(nextImage);
write(sb->filename_pipes[i], nextImage,
strlen(nextImage));
write(sb->filename_pipes[i], "\n", 1);
free(nextImage);
......@@ -742,7 +789,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int r;
/* No more files to process.. already? */
r = write(filename_pipes[i], "\n", 1);
r = write(sb->filename_pipes[i], "\n", 1);
if ( r < 0 ) {
ERROR("Write pipe\n");
}
......@@ -751,12 +798,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
}
finished = calloc(n_proc, sizeof(int));
if ( finished == NULL ) {
ERROR("Couldn't allocate memory for process flags.\n");
return;
}
allDone = 0;
while ( !allDone ) {
......@@ -775,9 +816,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int fd;
if ( finished[i] ) continue;
if ( !sb->running[i] ) continue;
fd = fileno(result_fhs[i]);
fd = fileno(sb->result_fhs[i]);
FD_SET(fd, &fds);
if ( fd > fdmax ) fdmax = fd;
......@@ -786,7 +827,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
r = select(fdmax+1, &fds, NULL, NULL, &tv);