Commit 5a9ad87c authored by Thomas White's avatar Thomas White Committed by Thomas White
Browse files

facetron: Use new thread pool

parent 33a8fc19
......@@ -10,5 +10,5 @@ EXTRA_DIST = configure src/cell.h src/hdf5-file.h src/image.h \
data/sfac/Ca.nff data/sfac/C.nff data/sfac/Fe.nff data/sfac/H.nff \
data/sfac/Mg.nff data/sfac/N.nff data/sfac/O.nff data/sfac/P.nff \
data/sfac/S.nff data/sfac/f0_WaasKirf.dat src/render_hkl.h \
src/stream.h
src/stream.h src/thread-pool.h
SUBDIRS = src data doc doc/examples scripts
......@@ -208,7 +208,7 @@ EXTRA_DIST = configure src/cell.h src/hdf5-file.h src/image.h \
data/sfac/Ca.nff data/sfac/C.nff data/sfac/Fe.nff data/sfac/H.nff \
data/sfac/Mg.nff data/sfac/N.nff data/sfac/O.nff data/sfac/P.nff \
data/sfac/S.nff data/sfac/f0_WaasKirf.dat src/render_hkl.h \
src/stream.h
src/stream.h src/thread-pool.h
SUBDIRS = src data doc doc/examples scripts
all: config.h
......
......@@ -57,7 +57,7 @@ calibrate_detector_SOURCES = calibrate_detector.c utils.c hdf5-file.c image.c \
calibrate_detector_LDADD = @LIBS@
facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \
image.c geometry.c reflections.c stream.c
image.c geometry.c reflections.c stream.c thread-pool.c
facetron_LDADD = @LIBS@
cubeit_SOURCES = cubeit.c cell.c hdf5-file.c utils.c detector.c render.c \
......
......@@ -73,7 +73,7 @@ cubeit_DEPENDENCIES =
am_facetron_OBJECTS = facetron.$(OBJEXT) cell.$(OBJEXT) \
hdf5-file.$(OBJEXT) utils.$(OBJEXT) detector.$(OBJEXT) \
peaks.$(OBJEXT) image.$(OBJEXT) geometry.$(OBJEXT) \
reflections.$(OBJEXT) stream.$(OBJEXT)
reflections.$(OBJEXT) stream.$(OBJEXT) thread-pool.$(OBJEXT)
facetron_OBJECTS = $(am_facetron_OBJECTS)
facetron_DEPENDENCIES =
am_get_hkl_OBJECTS = get_hkl.$(OBJEXT) sfac.$(OBJEXT) cell.$(OBJEXT) \
......@@ -298,7 +298,7 @@ calibrate_detector_SOURCES = calibrate_detector.c utils.c hdf5-file.c image.c \
calibrate_detector_LDADD = @LIBS@
facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \
image.c geometry.c reflections.c stream.c
image.c geometry.c reflections.c stream.c thread-pool.c
facetron_LDADD = @LIBS@
cubeit_SOURCES = cubeit.c cell.c hdf5-file.c utils.c detector.c render.c \
......@@ -458,6 +458,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/stream.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/symmetry.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/templates.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/thread-pool.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/utils.Po@am__quote@
.c.o:
......
......@@ -20,8 +20,6 @@
#include <string.h>
#include <unistd.h>
#include <getopt.h>
#include <pthread.h>
#include <sys/time.h>
#include <assert.h>
#include "utils.h"
......@@ -31,30 +29,7 @@
#include "stream.h"
#include "geometry.h"
#include "peaks.h"
#define MAX_THREADS (256)
struct process_args
{
struct image *image;
/* Thread control */
pthread_mutex_t control_mutex; /* Protects the scary stuff below */
int start;
int finish;
int done;
/* Analysis routine */
void (*func)(struct process_args *);
/* Analysis parameters */
const char *sym;
pthread_mutex_t *list_lock; /* Protects 'obs', 'i_full' and 'cts' */
ReflItemList *obs;
double *i_full;
unsigned int *cts;
};
#include "thread-pool.h"
static void show_help(const char *s)
......@@ -79,14 +54,38 @@ static void show_help(const char *s)
}
static void refine_image(struct process_args *pargs)
struct refine_args
{
const char *sym;
ReflItemList *obs;
double *i_full;
struct image *image;
};
static void refine_image(int mytask, void *tasks)
{
struct refine_args *all_args = tasks;
struct refine_args *pargs = &all_args[mytask];
/* Do, er, something. */
}
static void integrate_image(struct process_args *pargs)
struct integrate_args
{
const char *sym;
ReflItemList *obs;
double *i_full;
unsigned int *cts;
pthread_mutex_t *list_lock;
struct image *image;
};
static void integrate_image(int mytask, void *tasks)
{
struct integrate_args *all_args = tasks;
struct integrate_args *pargs = &all_args[mytask];
struct reflhit *spots;
int j, n;
struct hdfile *hdfile;
......@@ -158,177 +157,27 @@ static void integrate_image(struct process_args *pargs)
}
static void *worker_thread(void *pargsv)
{
struct process_args *pargs = pargsv;
int finish;
do {
int wakeup;
/* Acknowledge start */
pthread_mutex_lock(&pargs->control_mutex);
pargs->start = 0;
pthread_mutex_unlock(&pargs->control_mutex);
pargs->func(pargs);
pthread_mutex_lock(&pargs->control_mutex);
pargs->done = 1;
pthread_mutex_unlock(&pargs->control_mutex);
/* Go to sleep until told to exit or process next image */
do {
pthread_mutex_lock(&pargs->control_mutex);
/* Either of these can result in the thread waking up */
wakeup = pargs->start || pargs->finish;
finish = pargs->finish;
pthread_mutex_unlock(&pargs->control_mutex);
usleep(20000);
} while ( !wakeup );
} while ( !pargs->finish );
return NULL;
}
static void munch_threads(struct image *images, int n_total_patterns,
struct detector *det, const char *sym,
ReflItemList *obs, double *i_full, unsigned int *cts,
int nthreads, void (*func)(struct process_args *),
const char *text)
static void refine_all(struct image *images, int n_total_patterns,
struct detector *det, const char *sym,
ReflItemList *obs, double *i_full, int nthreads)
{
pthread_t workers[MAX_THREADS];
struct process_args *worker_args[MAX_THREADS];
pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
int worker_active[MAX_THREADS];
struct refine_args *tasks;
int i;
int n_done = 0;
int n_started = 0;
/* Initialise worker arguments with the unchanging data */
for ( i=0; i<nthreads; i++ ) {
worker_args[i] = malloc(sizeof(struct process_args));
worker_active[i] = 0;
pthread_mutex_init(&worker_args[i]->control_mutex, NULL);
worker_args[i]->sym = sym;
worker_args[i]->obs = obs;
worker_args[i]->i_full = i_full;
worker_args[i]->cts = cts;
worker_args[i]->list_lock = &list_lock;
worker_args[i]->func = func;
}
/* Start threads off */
for ( i=0; i<nthreads; i++ ) {
struct process_args *pargs;
int r;
if ( n_started == n_total_patterns ) break;
pargs = worker_args[i];
pargs->image = &images[n_started++];
pthread_mutex_lock(&pargs->control_mutex);
pargs->done = 0;
pargs->start = 1;
pargs->finish = 0;
pthread_mutex_unlock(&pargs->control_mutex);
worker_active[i] = 1;
r = pthread_create(&workers[i], NULL, worker_thread, pargs);
if ( r != 0 ) {
worker_active[i] = 0;
ERROR("Couldn't start thread %i\n", i);
}
}
/* Keep threads busy until the end of the data */
do {
int i;
for ( i=0; i<nthreads; i++ ) {
struct process_args *pargs;
int done;
/* Spend time working, not managing threads */
usleep(100000);
/* Are we using this thread record at all? */
if ( !worker_active[i] ) continue;
/* Has the thread finished yet? */
pargs = worker_args[i];
pthread_mutex_lock(&pargs->control_mutex);
done = pargs->done;
pthread_mutex_unlock(&pargs->control_mutex);
if ( !done ) continue;
/* Reset "done" flag */
pthread_mutex_lock(&pargs->control_mutex);
pargs->done = 0;
pthread_mutex_unlock(&pargs->control_mutex);
n_done++;
progress_bar(n_done, n_total_patterns, text);
/* If there are no more patterns, "done" will remain
* zero, so the last pattern will not be re-counted. */
if ( n_started == n_total_patterns ) break;
/* Start work on the next pattern */
pargs->image = &images[n_started++];
pthread_mutex_lock(&pargs->control_mutex);
pargs->start = 1;
pthread_mutex_unlock(&pargs->control_mutex);
}
} while ( n_started < n_total_patterns );
/* Join threads */
for ( i=0; i<nthreads; i++ ) {
if ( !worker_active[i] ) continue;
/* Tell the thread to exit */
struct process_args *pargs = worker_args[i];
pthread_mutex_lock(&pargs->control_mutex);
pargs->finish = 1;
pthread_mutex_unlock(&pargs->control_mutex);
/* Wait for it to join */
pthread_join(workers[i], NULL);
if ( pargs->done ) {
n_done++;
progress_bar(n_done, n_total_patterns, text);
} /* else this thread was not busy */
tasks = malloc(n_total_patterns * sizeof(struct refine_args));
for ( i=0; i<n_total_patterns; i++ ) {
}
tasks[i].sym = sym;
tasks[i].obs = obs;
tasks[i].i_full = i_full;
tasks[i].image = &images[i];
for ( i=0; i<nthreads; i++ ) {
free(worker_args[i]);
}
}
munch_threads(n_total_patterns, nthreads, "Refining",
refine_image, tasks);
static void refine_all(struct image *images, int n_total_patterns,
struct detector *det, const char *sym,
ReflItemList *obs, double *i_full, int nthreads)
{
munch_threads(images, n_total_patterns, det, sym, obs, i_full, NULL,
nthreads, refine_image, "Refining");
free(tasks);
}
......@@ -338,12 +187,28 @@ static void estimate_full(struct image *images, int n_total_patterns,
{
int i;
unsigned int *cts;
struct integrate_args *tasks;
pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
cts = new_list_count();
clear_items(obs);
munch_threads(images, n_total_patterns, det, sym, obs, i_full, cts,
nthreads, integrate_image, "Integrating");
tasks = malloc(n_total_patterns * sizeof(struct integrate_args));
for ( i=0; i<n_total_patterns; i++ ) {
tasks[i].sym = sym;
tasks[i].obs = obs;
tasks[i].i_full = i_full;
tasks[i].cts = cts;
tasks[i].list_lock = &list_lock;
tasks[i].image = &images[i];
}
munch_threads(n_total_patterns, nthreads, "Integrating",
integrate_image, tasks);
free(tasks);
/* Divide the totals to get the means */
for ( i=0; i<num_items(obs); i++ ) {
......
/*
* thread-pool.c
*
* A thread pool implementation
*
* (c) 2006-2010 Thomas White <taw@physics.org>
*
* Part of CrystFEL - crystallography with a FEL
*
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include "utils.h"
struct task_queue
{
pthread_mutex_t lock;
int n_tasks;
int *done;
int n_done;
void (*work)(int, void *);
void *work_args;
const char *text;
};
static void *worker_thread(void *pargsv)
{
struct task_queue *q = pargsv;
do {
int i;
int found = 0;
int mytask = -1;
/* Get a task */
pthread_mutex_lock(&q->lock);
for ( i=0; i<q->n_tasks; i++ ) {
if ( q->done[i] == 0 ) {
mytask = i;
found = 1;
}
}
pthread_mutex_unlock(&q->lock);
/* No more tasks? */
if ( !found ) break;
q->work(mytask, q->work_args);
/* Mark this task as done, update totals etc */
pthread_mutex_lock(&q->lock);
q->done[mytask] = 1;
q->n_done++;
progress_bar(q->n_done, q->n_tasks, q->text);
pthread_mutex_unlock(&q->lock);
} while ( 1 );
return NULL;
}
void munch_threads(int n_tasks, int n_threads, const char *text,
void (*work)(int, void *), void *work_args)
{
pthread_t *workers;
int i;
struct task_queue q;
/* The nation of CrystFEL prides itself on having 0% unemployment. */
if ( n_threads > n_tasks ) n_threads = n_tasks;
workers = malloc(n_threads * sizeof(pthread_t));
q.done = malloc(n_tasks * sizeof(int));
pthread_mutex_init(&q.lock, NULL);
q.n_tasks = n_tasks;
q.work = work;
q.work_args = work_args;
q.n_done = 0;
q.text = text;
for ( i=0; i<n_tasks; i++ ) {
q.done[i] = 0;
}
/* Start threads */
for ( i=0; i<n_threads; i++ ) {
if ( pthread_create(&workers[i], NULL, worker_thread, &q) ) {
ERROR("Couldn't start thread %i\n", i);
n_threads = i;
break;
}
}
/* Join threads */
for ( i=0; i<n_threads; i++ ) {
pthread_join(workers[i], NULL);
}
free(q.done);
free(workers);
}
/*
* thread-pool.h
*
* A thread pool implementation
*
* (c) 2006-2010 Thomas White <taw@physics.org>
*
* Part of CrystFEL - crystallography with a FEL
*
*/
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
extern void munch_threads(int n_tasks, int n_threads, const char *text,
void (*work)(int, void *), void *work_args);
#endif /* THREAD_POOL_H */
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment