Commit 8cb69e84 authored by Thomas White's avatar Thomas White
Browse files

indexamajig: Rework threading not to use pthread_timedjoin_np()

parent 6aee185f
......@@ -42,6 +42,7 @@
struct process_args
{
/* Input */
char *filename;
int id;
pthread_mutex_t *output_mutex; /* Protects stdout */
......@@ -66,8 +67,17 @@ struct process_args
const double *intensities;
const unsigned int *counts;
struct gpu_context *gctx;
/* Thread control and output */
pthread_mutex_t control_mutex; /* Protects the scary stuff below */
int start;
int finish;
int done;
int hit;
int peaks_sane;
};
struct process_result
{
int hit;
......@@ -237,13 +247,13 @@ static void simulate_and_write(struct image *simage, struct gpu_context **gctx,
}
static void *process_image(void *pargsv)
static struct process_result process_image(struct process_args *pargs)
{
struct process_args *pargs = pargsv;
struct hdfile *hdfile;
struct image image;
struct image *simage;
float *data_for_measurement;
struct process_result result;
size_t data_size;
const char *filename = pargs->filename;
UnitCell *cell = pargs->cell;
......@@ -262,7 +272,6 @@ static void *process_image(void *pargsv)
const double *intensities = pargs->intensities;
const unsigned int *counts = pargs->counts;
struct gpu_context *gctx = pargs->gctx;
struct process_result *result;
image.features = NULL;
image.data = NULL;
......@@ -281,10 +290,8 @@ static void *process_image(void *pargsv)
STATUS("Processing '%s'\n", image.filename);
result = malloc(sizeof(*result));
if ( result == NULL ) return NULL;
result->peaks_sane = 0;
result->hit = 0;
result.peaks_sane = 0;
result.hit = 0;
hdfile = hdfile_open(filename);
if ( hdfile == NULL ) {
......@@ -344,7 +351,7 @@ static void *process_image(void *pargsv)
STATUS("Failed peak sanity check.\n");
goto done;
} else {
result->peaks_sane = 1;
result.peaks_sane = 1;
}
/* Measure intensities if requested */
......@@ -385,14 +392,51 @@ done:
hdfile_close(hdfile);
if ( image.indexed_cell == NULL ) {
result->hit = 0;
result.hit = 0;
} else {
result->hit = 1;
result.hit = 1;
}
return result;
}
static void *worker_thread(void *pargsv)
{
struct process_args *pargs = pargsv;
int finish;
do {
struct process_result result;
int wakeup;
result = process_image(pargs);
pthread_mutex_lock(&pargs->control_mutex);
pargs->hit = result.hit;
pargs->peaks_sane = result.peaks_sane;
pargs->done = 1;
pargs->start = 0;
pthread_mutex_unlock(&pargs->control_mutex);
/* Go to sleep until told to exit or process next image */
do {
pthread_mutex_lock(&pargs->control_mutex);
/* Either of these can result in the thread waking up */
wakeup = pargs->start || pargs->finish;
finish = pargs->finish;
pthread_mutex_unlock(&pargs->control_mutex);
usleep(20000);
} while ( !wakeup );
} while ( !pargs->finish );
return NULL;
}
int main(int argc, char *argv[])
{
int c;
......@@ -590,7 +634,7 @@ int main(int argc, char *argv[])
worker_active[i] = 0;
}
/* Initially, fire off the full number of threads */
/* Start threads off */
for ( i=0; i<nthreads; i++ ) {
char line[1024];
......@@ -608,6 +652,7 @@ int main(int argc, char *argv[])
pargs->output_mutex = &output_mutex;
pargs->gpu_mutex = &gpu_mutex;
pthread_mutex_init(&pargs->control_mutex, NULL);
pargs->config_cmfilter = config_cmfilter;
pargs->config_noisefilter = config_noisefilter;
pargs->config_writedrx = config_writedrx;
......@@ -629,9 +674,14 @@ int main(int argc, char *argv[])
pargs->counts = counts;
pargs->gctx = gctx;
pargs->id = i;
pthread_mutex_lock(&pargs->control_mutex);
pargs->done = 0;
pargs->start = 1;
pargs->finish = 0;
pthread_mutex_unlock(&pargs->control_mutex);
worker_active[i] = 1;
r = pthread_create(&workers[i], NULL, process_image, pargs);
r = pthread_create(&workers[i], NULL, worker_thread, pargs);
if ( r != 0 ) {
worker_active[i] = 0;
ERROR("Couldn't start thread %i\n", i);
......@@ -639,7 +689,7 @@ int main(int argc, char *argv[])
}
/* Start new threads as old ones finish */
/* Keep threads busy until the end of the data */
do {
int i;
......@@ -647,65 +697,59 @@ int main(int argc, char *argv[])
for ( i=0; i<nthreads; i++ ) {
char line[1024];
int r;
struct process_result *result = NULL;
struct timespec t;
struct timeval tv;
struct process_args *pargs;
int done;
/* Spend CPU time indexing, not checking results */
usleep(100000);
/* Are we using this thread record at all? */
if ( !worker_active[i] ) continue;
/* Has the thread finished yet? */
pargs = worker_args[i];
pthread_mutex_lock(&pargs->control_mutex);
done = pargs->done;
pthread_mutex_unlock(&pargs->control_mutex);
if ( !done ) continue;
gettimeofday(&tv, NULL);
t.tv_sec = tv.tv_sec;
t.tv_nsec = tv.tv_usec * 1000 + 20000;
r = pthread_timedjoin_np(workers[i], (void *)&result,
&t);
if ( r != 0 ) continue; /* Not ready yet */
worker_active[i] = 0;
if ( result != NULL ) {
n_hits += result->hit;
n_sane += result->peaks_sane;
free(result);
}
/* Record the result */
n_hits += pargs->hit;
n_sane += pargs->peaks_sane;
/* Get next filename */
rval = fgets(line, 1023, fh);
if ( rval == NULL ) break;
chomp(line);
snprintf(pargs->filename, 1023, "%s%s", prefix, line);
worker_active[i] = 1;
r = pthread_create(&workers[i], NULL, process_image,
pargs);
if ( r != 0 ) {
worker_active[i] = 0;
ERROR("Couldn't start thread %i\n", i);
}
n_images++;
/* Wake the thread up ... */
pthread_mutex_lock(&pargs->control_mutex);
pargs->done = 0;
pargs->start = 1;
pthread_mutex_unlock(&pargs->control_mutex);
}
} while ( rval != NULL );
/* Catch all remaining threads */
/* Join threads */
for ( i=0; i<nthreads; i++ ) {
struct process_result *result = NULL;
if ( !worker_active[i] ) goto free;
pthread_join(workers[i], (void *)&result);
/* Tell the thread to exit */
struct process_args *pargs = worker_args[i];
pargs->finish = 1;
/* Wait for it to join */
pthread_join(workers[i], NULL);
worker_active[i] = 0;
if ( result != NULL ) {
n_hits += result->hit;
free(result);
}
n_hits += pargs->hit;
n_sane += pargs->peaks_sane;
free:
if ( worker_args[i]->filename != NULL ) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment