Commit a22d0dc8 authored by Thomas White's avatar Thomas White
Browse files

indexamajig: Use new thread pool

parent a7d2cab1
......@@ -24,7 +24,7 @@ process_hkl_LDADD = @LIBS@
indexamajig_SOURCES = indexamajig.c hdf5-file.c utils.c cell.c image.c \
peaks.c index.c filters.c diffraction.c detector.c \
sfac.c dirax.c reflections.c templates.c symmetry.c \
geometry.c
geometry.c thread-pool.c
indexamajig_LDADD = @LIBS@
if HAVE_OPENCL
indexamajig_SOURCES += diffraction-gpu.c cl-utils.c
......
......@@ -91,7 +91,7 @@ hdfsee_DEPENDENCIES =
am__indexamajig_SOURCES_DIST = indexamajig.c hdf5-file.c utils.c \
cell.c image.c peaks.c index.c filters.c diffraction.c \
detector.c sfac.c dirax.c reflections.c templates.c symmetry.c \
geometry.c diffraction-gpu.c cl-utils.c
geometry.c thread-pool.c diffraction-gpu.c cl-utils.c
@HAVE_OPENCL_TRUE@am__objects_1 = diffraction-gpu.$(OBJEXT) \
@HAVE_OPENCL_TRUE@ cl-utils.$(OBJEXT)
am_indexamajig_OBJECTS = indexamajig.$(OBJEXT) hdf5-file.$(OBJEXT) \
......@@ -99,7 +99,7 @@ am_indexamajig_OBJECTS = indexamajig.$(OBJEXT) hdf5-file.$(OBJEXT) \
index.$(OBJEXT) filters.$(OBJEXT) diffraction.$(OBJEXT) \
detector.$(OBJEXT) sfac.$(OBJEXT) dirax.$(OBJEXT) \
reflections.$(OBJEXT) templates.$(OBJEXT) symmetry.$(OBJEXT) \
geometry.$(OBJEXT) $(am__objects_1)
geometry.$(OBJEXT) thread-pool.$(OBJEXT) $(am__objects_1)
indexamajig_OBJECTS = $(am_indexamajig_OBJECTS)
indexamajig_DEPENDENCIES =
am__pattern_sim_SOURCES_DIST = pattern_sim.c diffraction.c utils.c \
......@@ -273,7 +273,7 @@ process_hkl_LDADD = @LIBS@
indexamajig_SOURCES = indexamajig.c hdf5-file.c utils.c cell.c image.c \
peaks.c index.c filters.c diffraction.c detector.c sfac.c \
dirax.c reflections.c templates.c symmetry.c geometry.c \
$(am__append_3)
thread-pool.c $(am__append_3)
indexamajig_LDADD = @LIBS@
@HAVE_GTK_TRUE@hdfsee_SOURCES = hdfsee.c displaywindow.c render.c hdf5-file.c utils.c image.c \
@HAVE_GTK_TRUE@ filters.c
......
......@@ -389,7 +389,7 @@ int main(int argc, char *argv[])
do {
n_done = run_threads(nthreads, add_image, get_image,
(void *)&qargs, chunk_size);
(void *)&qargs, NULL, chunk_size);
n_images += n_done;
......
......@@ -568,7 +568,7 @@ int main(int argc, char *argv[])
qargs.static_args.bes = &bes;
qargs.static_args.gas = &gas;
n_images = run_threads(nthreads, sum_image, get_image, &qargs, 0);
n_images = run_threads(nthreads, sum_image, get_image, NULL, &qargs, 0);
fclose(fh);
......
......@@ -35,9 +35,7 @@
#include "sfac.h"
#include "filters.h"
#include "reflections.h"
#define MAX_THREADS (96)
#include "thread-pool.h"
enum {
......@@ -46,11 +44,9 @@ enum {
};
struct process_args
/* Information about the indexing process which is common to all patterns */
struct static_index_args
{
/* Input */
char *filename;
int id;
pthread_mutex_t *gpu_mutex; /* Protects "gctx" */
UnitCell *cell;
int config_cmfilter;
......@@ -76,20 +72,37 @@ struct process_args
struct gpu_context *gctx;
int peaks;
/* Thread control and output */
pthread_mutex_t control_mutex; /* Protects the scary stuff below */
int start;
int finish;
int done;
int indexable;
int sane;
/* Output stream */
pthread_mutex_t *output_mutex; /* Protects the output stream */
FILE *ofh;
};
/* Information about the indexing process for one pattern */
struct index_args
{
/* "Input" */
char *filename;
struct static_index_args static_args;
/* "Output" */
int indexable;
int sane;
};
/* Information needed to choose the next task and dispatch it */
struct queue_args
{
FILE *fh;
char *prefix;
struct static_index_args static_args;
int n_indexable;
int n_sane;
};
static void show_help(const char *s)
{
printf("Syntax: %s [options]\n\n", s);
......@@ -100,6 +113,7 @@ static void show_help(const char *s)
"\n"
" -i, --input=<filename> Specify file containing list of images to process.\n"
" '-' means stdin, which is the default.\n"
" -o, --output=<filename> Write indexed stream to this file. '-' for stdout.\n"
"\n"
" --indexing=<method> Use 'method' for indexing. Choose from:\n"
" none : no indexing (default)\n"
......@@ -273,38 +287,39 @@ static void simulate_and_write(struct image *simage, struct gpu_context **gctx,
}
static void process_image(struct process_args *pargs)
static void process_image(void *pp, int cookie)
{
struct index_args *pargs = pp;
struct hdfile *hdfile;
struct image image;
struct image *simage;
float *data_for_measurement;
size_t data_size;
char *filename = pargs->filename;
UnitCell *cell = pargs->cell;
int config_cmfilter = pargs->config_cmfilter;
int config_noisefilter = pargs->config_noisefilter;
int config_writedrx = pargs->config_writedrx;
int config_dumpfound = pargs->config_dumpfound;
int config_verbose = pargs->config_verbose;
int config_alternate = pargs->config_alternate;
int config_nearbragg = pargs->config_nearbragg;
int config_gpu = pargs->config_gpu;
int config_simulate = pargs->config_simulate;
int config_nomatch = pargs->config_nomatch;
int config_polar = pargs->config_polar;
IndexingMethod indm = pargs->indm;
const double *intensities = pargs->intensities;
struct gpu_context *gctx = pargs->gctx;
UnitCell *cell = pargs->static_args.cell;
int config_cmfilter = pargs->static_args.config_cmfilter;
int config_noisefilter = pargs->static_args.config_noisefilter;
int config_writedrx = pargs->static_args.config_writedrx;
int config_dumpfound = pargs->static_args.config_dumpfound;
int config_verbose = pargs->static_args.config_verbose;
int config_alternate = pargs->static_args.config_alternate;
int config_nearbragg = pargs->static_args.config_nearbragg;
int config_gpu = pargs->static_args.config_gpu;
int config_simulate = pargs->static_args.config_simulate;
int config_nomatch = pargs->static_args.config_nomatch;
int config_polar = pargs->static_args.config_polar;
IndexingMethod indm = pargs->static_args.indm;
const double *intensities = pargs->static_args.intensities;
struct gpu_context *gctx = pargs->static_args.gctx;
image.features = NULL;
image.data = NULL;
image.indexed_cell = NULL;
image.id = pargs->id;
image.id = cookie;
image.filename = filename;
image.hits = NULL;
image.n_hits = 0;
image.det = pargs->det;
image.det = pargs->static_args.det;
/* View head-on (unit cell is tilted) */
image.orientation.w = 1.0;
......@@ -325,7 +340,7 @@ static void process_image(struct process_args *pargs)
return;
}
hdf5_read(hdfile, &image, pargs->config_satcorr);
hdf5_read(hdfile, &image, pargs->static_args.config_satcorr);
if ( config_cmfilter ) {
filter_cm(&image);
......@@ -342,7 +357,7 @@ static void process_image(struct process_args *pargs)
memcpy(data_for_measurement, image.data, data_size);
}
switch ( pargs->peaks )
switch ( pargs->static_args.peaks )
{
case PEAK_HDF5 :
/* Get peaks from HDF5 */
......@@ -352,7 +367,7 @@ static void process_image(struct process_args *pargs)
}
break;
case PEAK_ZAEF :
search_peaks(&image, pargs->threshold);
search_peaks(&image, pargs->static_args.threshold);
break;
}
......@@ -362,7 +377,8 @@ static void process_image(struct process_args *pargs)
image.data = data_for_measurement;
if ( config_dumpfound ) {
dump_peaks(&image, pargs->ofh, pargs->output_mutex);
dump_peaks(&image, pargs->static_args.ofh,
pargs->static_args.output_mutex);
}
/* Not indexing nor writing xfel.drx?
......@@ -374,7 +390,7 @@ static void process_image(struct process_args *pargs)
/* Calculate orientation matrix (by magic) */
if ( config_writedrx || (indm != INDEXING_NONE) ) {
index_pattern(&image, cell, indm, config_nomatch,
config_verbose, pargs->ipriv);
config_verbose, pargs->static_args.ipriv);
}
/* No cell at this point? Then we're done. */
......@@ -382,7 +398,7 @@ static void process_image(struct process_args *pargs)
pargs->indexable = 1;
/* Sanity check */
if ( pargs->config_sanity
if ( pargs->static_args.config_sanity
&& !peak_sanity_check(&image, image.indexed_cell, 0, 0.1) ) {
STATUS("Failed peak sanity check.\n");
goto done;
......@@ -393,9 +409,10 @@ static void process_image(struct process_args *pargs)
/* Measure intensities if requested */
if ( config_nearbragg ) {
output_intensities(&image, image.indexed_cell,
pargs->output_mutex, config_polar,
pargs->config_sa, pargs->config_closer,
pargs->ofh, 0, 0.1);
pargs->static_args.output_mutex,
config_polar, pargs->static_args.config_sa,
pargs->static_args.config_closer,
pargs->static_args.ofh, 0, 0.1);
}
simage = get_simage(&image, config_alternate);
......@@ -403,10 +420,10 @@ static void process_image(struct process_args *pargs)
/* Simulate if requested */
if ( config_simulate ) {
if ( config_gpu ) {
pthread_mutex_lock(pargs->gpu_mutex);
pthread_mutex_lock(pargs->static_args.gpu_mutex);
simulate_and_write(simage, &gctx, intensities,
image.indexed_cell);
pthread_mutex_unlock(pargs->gpu_mutex);
pthread_mutex_unlock(pargs->static_args.gpu_mutex);
} else {
simulate_and_write(simage, NULL, intensities,
image.indexed_cell);
......@@ -430,37 +447,40 @@ done:
}
static void *worker_thread(void *pargsv)
static void *get_image(void *qp)
{
struct process_args *pargs = pargsv;
int finish;
char line[1024];
struct index_args *pargs;
char *rval;
struct queue_args *qargs = qp;
do {
/* Get the next filename */
rval = fgets(line, 1023, qargs->fh);
if ( rval == NULL ) return NULL;
int wakeup;
pargs = malloc(sizeof(struct index_args));
process_image(pargs);
memcpy(&pargs->static_args, &qargs->static_args,
sizeof(struct static_index_args));
pthread_mutex_lock(&pargs->control_mutex);
pargs->done = 1;
pargs->start = 0;
pthread_mutex_unlock(&pargs->control_mutex);
chomp(line);
pargs->filename = malloc(strlen(qargs->prefix) + strlen(line) + 1);
snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line);
/* Go to sleep until told to exit or process next image */
do {
return pargs;
}
pthread_mutex_lock(&pargs->control_mutex);
/* Either of these can result in the thread waking up */
wakeup = pargs->start || pargs->finish;
finish = pargs->finish;
pthread_mutex_unlock(&pargs->control_mutex);
usleep(20000);
} while ( !wakeup );
static void finalise_image(void *qp, void *pp)
{
struct queue_args *qargs = qp;
struct index_args *pargs = pp;
} while ( !pargs->finish );
qargs->n_indexable += pargs->indexable;
qargs->n_sane += pargs->sane;
return NULL;
free(pargs->filename);
free(pargs);
}
......@@ -474,8 +494,6 @@ int main(int argc, char *argv[])
FILE *ofh;
char *rval = NULL;
int n_images;
int n_indexable;
int n_sane;
int config_noindex = 0;
int config_dumpfound = 0;
int config_nearbragg = 0;
......@@ -506,15 +524,13 @@ int main(int argc, char *argv[])
char *speaks = NULL;
int peaks;
int nthreads = 1;
pthread_t workers[MAX_THREADS];
struct process_args *worker_args[MAX_THREADS];
int worker_active[MAX_THREADS];
int i;
pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t gpu_mutex = PTHREAD_MUTEX_INITIALIZER;
char prepare_line[1024];
char prepare_filename[1024];
IndexingPrivate *ipriv;
struct queue_args qargs;
/* Long options */
const struct option longopts[] = {
......@@ -670,7 +686,7 @@ int main(int argc, char *argv[])
}
}
if ( (nthreads == 0) || (nthreads > MAX_THREADS) ) {
if ( nthreads == 0 ) {
ERROR("Invalid number of threads.\n");
return 1;
}
......@@ -740,158 +756,41 @@ int main(int argc, char *argv[])
}
gsl_set_error_handler_off();
n_images = 0;
n_indexable = 0;
n_sane = 0;
for ( i=0; i<nthreads; i++ ) {
worker_args[i] = malloc(sizeof(struct process_args));
worker_args[i]->filename = malloc(1024);
worker_args[i]->ofh = ofh;
worker_args[i]->peaks = peaks;
worker_active[i] = 0;
}
/* Start threads off */
for ( i=0; i<nthreads; i++ ) {
char line[1024];
struct process_args *pargs;
int r;
pargs = worker_args[i];
if ( strlen(prepare_line) > 0 ) {
strcpy(line, prepare_line);
prepare_line[0] = '\0';
} else {
rval = fgets(line, 1023, fh);
if ( rval == NULL ) continue;
}
chomp(line);
snprintf(pargs->filename, 1023, "%s%s", prefix, line);
n_images++;
pargs->output_mutex = &output_mutex;
pargs->gpu_mutex = &gpu_mutex;
pthread_mutex_init(&pargs->control_mutex, NULL);
pargs->config_cmfilter = config_cmfilter;
pargs->config_noisefilter = config_noisefilter;
pargs->config_writedrx = config_writedrx;
pargs->config_dumpfound = config_dumpfound;
pargs->config_verbose = config_verbose;
pargs->config_alternate = config_alternate;
pargs->config_nearbragg = config_nearbragg;
pargs->config_gpu = config_gpu;
pargs->config_simulate = config_simulate;
pargs->config_nomatch = config_nomatch;
pargs->config_polar = config_polar;
pargs->config_sanity = config_sanity;
pargs->config_satcorr = config_satcorr;
pargs->config_sa = config_sa;
pargs->config_closer = config_closer;
pargs->cell = cell;
pargs->det = det;
pargs->ipriv = ipriv;
pargs->indm = indm;
pargs->intensities = intensities;
pargs->gctx = gctx;
pargs->threshold = threshold;
pargs->id = i;
pthread_mutex_lock(&pargs->control_mutex);
pargs->done = 0;
pargs->start = 1;
pargs->finish = 0;
pthread_mutex_unlock(&pargs->control_mutex);
worker_active[i] = 1;
r = pthread_create(&workers[i], NULL, worker_thread, pargs);
if ( r != 0 ) {
worker_active[i] = 0;
ERROR("Couldn't start thread %i\n", i);
}
}
/* Keep threads busy until the end of the data */
do {
int i;
for ( i=0; i<nthreads; i++ ) {
char line[1024];
struct process_args *pargs;
int done;
/* Spend CPU time indexing, not checking results */
usleep(100000);
/* Are we using this thread record at all? */
if ( !worker_active[i] ) continue;
/* Has the thread finished yet? */
pargs = worker_args[i];
pthread_mutex_lock(&pargs->control_mutex);
done = pargs->done;
pthread_mutex_unlock(&pargs->control_mutex);
if ( !done ) continue;
/* Results will be processed after checking if
* there are any more images to process. */
/* Get next filename */
rval = fgets(line, 1023, fh);
/* In this case, the result of the last file
* file will be processed when the thread is
* joined. */
if ( rval == NULL ) break;
/* Record the result */
n_indexable += pargs->indexable;
n_sane += pargs->sane;
chomp(line);
snprintf(pargs->filename, 1023, "%s%s", prefix, line);
n_images++;
/* Wake the thread up ... */
pthread_mutex_lock(&pargs->control_mutex);
pargs->done = 0;
pargs->start = 1;
pthread_mutex_unlock(&pargs->control_mutex);
}
} while ( rval != NULL );
/* Join threads */
for ( i=0; i<nthreads; i++ ) {
if ( !worker_active[i] ) goto free;
/* Tell the thread to exit */
struct process_args *pargs = worker_args[i];
pthread_mutex_lock(&pargs->control_mutex);
pargs->finish = 1;
pthread_mutex_unlock(&pargs->control_mutex);
/* Wait for it to join */
pthread_join(workers[i], NULL);
worker_active[i] = 0;
n_indexable += pargs->indexable;
n_sane += pargs->sane;
free:
if ( worker_args[i]->filename != NULL ) {
free(worker_args[i]->filename);
}
free(worker_args[i]);
}
qargs.static_args.gpu_mutex = &gpu_mutex;
qargs.static_args.cell = cell;
qargs.static_args.config_cmfilter = config_cmfilter;
qargs.static_args.config_noisefilter = config_noisefilter;
qargs.static_args.config_writedrx = config_writedrx;
qargs.static_args.config_dumpfound = config_dumpfound;
qargs.static_args.config_verbose = config_verbose;
qargs.static_args.config_alternate = config_alternate;
qargs.static_args.config_nearbragg = config_nearbragg;
qargs.static_args.config_gpu = config_gpu;
qargs.static_args.config_simulate = config_simulate;
qargs.static_args.config_nomatch = config_nomatch;
qargs.static_args.config_polar = config_polar;
qargs.static_args.config_sanity = config_sanity;
qargs.static_args.config_satcorr = config_satcorr;
qargs.static_args.config_sa = config_sa;
qargs.static_args.config_closer = config_closer;
qargs.static_args.threshold = threshold;
qargs.static_args.det = det;
qargs.static_args.indm = indm;
qargs.static_args.ipriv = ipriv;
qargs.static_args.intensities = intensities;
qargs.static_args.gctx = gctx;
qargs.static_args.peaks = peaks;
qargs.static_args.output_mutex = &output_mutex;
qargs.static_args.ofh = ofh;
qargs.fh = fh;
qargs.prefix = prefix;
qargs.n_indexable = 0;
qargs.n_sane = 0;
n_images = run_threads(nthreads, process_image, get_image,
finalise_image, &qargs, 0);
cleanup_indexing(ipriv);
......@@ -902,7 +801,7 @@ int main(int argc, char *argv[])
fclose(fh);
STATUS("There were %i images. %i could be indexed, of which %i"
" looked sane.\n", n_images, n_indexable, n_sane);
" looked sane.\n", n_images, qargs.n_indexable, qargs.n_sane);
if ( gctx != NULL ) {
cleanup_gpu(gctx);
......
......@@ -214,7 +214,7 @@ static void integrate_all(int nthreads, struct detector *det, FILE *fh,
qargs.static_args.config_sanity = qargs.static_args.config_sanity;
qargs.static_args.output_mutex = &output_mutex;
run_threads(nthreads, process_image, get_image, &qargs, 0);
run_threads(nthreads, process_image, get_image, NULL, &qargs, 0);
}
......
......@@ -150,6 +150,7 @@ struct task_queue
int *cookies;
void *(*get_task)(void *);
void (*finalise)(void *, void *);
void *queue_args;
void (*work)(void *, int);
};
......@@ -200,6 +201,9 @@ static void *task_worker(void *pargsv)
pthread_mutex_lock(&q->lock);
q->n_completed++;
q->cookies[mycookie] = 0;
if ( q->finalise ) {
q->finalise(q, task);
}
pthread_mutex_unlock(&q->lock);
} while ( 1 );
......@@ -209,7 +213,8 @@ static void *task_worker(void *pargsv)
int run_threads(int n_threads, void (*work)(void *, int),
void *(*get_task)(void *), void *queue_args, int max)
void *(*get_task)(void *), void (*final)(void *, void *),
void *queue_args, int max)
{
pthread_t *workers;
int i;
......@@ -220,6 +225,7 @@ int run_threads(int n_threads, void (*work)(void *, int),
pthread_mutex_init(&q.lock, NULL);
q.work = work;
q.get_task = get_task;
q.finalise = final;
q.queue_args = queue_args;
q.n_started = 0;
q.n_completed = 0;
......
......@@ -20,7 +20,7 @@
/* work() will be called with a number and work_args. The number will be
* unique and in the range 0..n_tasks. A progress bar will be shown using
* "text" and the progress through the tasks. */
* "text" and the progress through the tasks, unless "text" is NULL. */
extern void run_thread_range(int n_tasks, int n_threads, const char *text,
void (*work)(int, void *), void *work_args);
......@@ -28,10 +28,14 @@ extern void run_thread_range(int n_tasks, int n_threads, const char *text,
/* get_task() will be called every time a worker is idle. It returns either
* NULL, indicating that no further work is available, or a pointer which will
* be passed to work(). Work will stop after 'max' tasks have been processed.
* get_task() does not need to be re-entrant.
* final() will be called once per image, and will be given both queue_args