Commit 894f2241 authored by Thomas White's avatar Thomas White
Browse files

Remove run_thread_range()

parent 86b8641c
......@@ -71,40 +71,76 @@ struct refine_args
};
static void refine_image(int mytask, void *tasks)
struct queue_args
{
struct refine_args *all_args = tasks;
struct refine_args *pargs = &all_args[mytask];
int n;
int n_done;
int n_total_patterns;
struct image *images;
struct refine_args task_defaults;
};
static void refine_image(void *task, int id)
{
struct refine_args *pargs = task;
struct image *image = pargs->image;
image->id = id;
pr_refine(image, pargs->full, pargs->sym);
}
static void refine_all(struct image *images, int n_total_patterns,
struct detector *det, const char *sym,
ReflItemList *obs, RefList *full, int nthreads,
FILE *graph, FILE *pgraph)
static void *get_image(void *vqargs)
{
struct refine_args *tasks;
int i;
struct refine_args *task;
struct queue_args *qargs = vqargs;
tasks = malloc(n_total_patterns * sizeof(struct refine_args));
for ( i=0; i<n_total_patterns; i++ ) {
task = malloc(sizeof(struct refine_args));
memcpy(task, &qargs->task_defaults, sizeof(struct refine_args));
tasks[i].sym = sym;
tasks[i].obs = obs;
tasks[i].full = full;
tasks[i].image = &images[i];
tasks[i].graph = graph;
tasks[i].pgraph = pgraph;
task->image = &qargs->images[qargs->n];
}
qargs->n++;
run_thread_range(n_total_patterns, nthreads, "Refining",
refine_image, tasks, 0, 0, 0);
return task;
}
static void done_image(void *vqargs, void *task)
{
struct queue_args *qargs = vqargs;
qargs->n_done++;
progress_bar(qargs->n_done, qargs->n_total_patterns, "Refining");
free(task);
}
free(tasks);
static void refine_all(struct image *images, int n_total_patterns,
struct detector *det, const char *sym,
ReflItemList *obs, RefList *full, int nthreads,
FILE *graph, FILE *pgraph)
{
struct refine_args task_defaults;
struct queue_args qargs;
task_defaults.sym = sym;
task_defaults.obs = obs;
task_defaults.full = full;
task_defaults.image = NULL;
task_defaults.graph = graph;
task_defaults.pgraph = pgraph;
qargs.task_defaults = task_defaults;
qargs.n = 0;
qargs.n_done = 0;
qargs.n_total_patterns = n_total_patterns;
qargs.images = images;
run_threads(nthreads, refine_image, get_image, done_image,
&qargs, n_total_patterns, 0, 0, 0);
}
......
......@@ -114,149 +114,6 @@ signed int get_status_label()
}
/* ---------------------------------- Range --------------------------------- */
enum {
TASK_READY,
TASK_RUNNING,
TASK_FINISHED,
};
struct task_queue_range
{
pthread_mutex_t lock;
int n_tasks;
int *status;
int n_done;
void (*work)(int, void *);
void *work_args;
const char *text;
};
static void *range_worker(void *pargsv)
{
struct worker_args *w = pargsv;
struct task_queue_range *q = w->tqr;
int *cookie;
set_affinity(w->id, w->cpu_num, w->cpu_groupsize, w->cpu_offset);
cookie = malloc(sizeof(int));
*cookie = w->id;
pthread_setspecific(status_label_key, cookie);
free(w);
do {
int i;
int found = 0;
int mytask = -1;
/* Get a task */
pthread_mutex_lock(&q->lock);
for ( i=0; i<q->n_tasks; i++ ) {
if ( q->status[i] == TASK_READY ) {
mytask = i;
found = 1;
q->status[i] = TASK_RUNNING;
break;
}
}
pthread_mutex_unlock(&q->lock);
/* No more tasks? */
if ( !found ) break;
q->work(mytask, q->work_args);
/* Mark this task as done, update totals etc */
pthread_mutex_lock(&q->lock);
q->status[mytask] = TASK_FINISHED;
q->n_done++;
if ( q->text != NULL ) {
progress_bar(q->n_done, q->n_tasks, q->text);
}
pthread_mutex_unlock(&q->lock);
} while ( 1 );
free(cookie);
return NULL;
}
void run_thread_range(int n_tasks, int n_threads, const char *text,
void (*work)(int, void *), void *work_args,
int cpu_num, int cpu_groupsize, int cpu_offset)
{
pthread_t *workers;
int i;
struct task_queue_range q;
/* The nation of CrystFEL prides itself on having 0% unemployment. */
if ( n_threads > n_tasks ) n_threads = n_tasks;
pthread_key_create(&status_label_key, NULL);
workers = malloc(n_threads * sizeof(pthread_t));
q.status = malloc(n_tasks * sizeof(int));
pthread_mutex_init(&q.lock, NULL);
q.n_tasks = n_tasks;
q.work = work;
q.work_args = work_args;
q.n_done = 0;
q.text = text;
for ( i=0; i<n_tasks; i++ ) {
q.status[i] = TASK_READY;
}
/* Now it's safe to start using the status labels */
if ( n_threads > 1 ) use_status_labels = 1;
/* Start threads */
for ( i=0; i<n_threads; i++ ) {
struct worker_args *w;
w = malloc(sizeof(struct worker_args));
w->tqr = &q;
w->tq = NULL;
w->id = i;
w->cpu_num = cpu_num;
w->cpu_groupsize = cpu_groupsize;
w->cpu_offset = cpu_offset;
if ( pthread_create(&workers[i], NULL, range_worker, w) ) {
/* Not ERROR() here */
fprintf(stderr, "Couldn't start thread %i\n", i);
n_threads = i;
break;
}
}
/* Join threads */
for ( i=0; i<n_threads; i++ ) {
pthread_join(workers[i], NULL);
}
use_status_labels = 0;
free(q.status);
free(workers);
}
/* ---------------------------- Custom get_task() --------------------------- */
struct task_queue
......
......@@ -24,14 +24,6 @@ extern pthread_mutex_t stderr_lock;
extern signed int get_status_label(void);
/* work() will be called with a number and work_args. The number will be
* unique and in the range 0..n_tasks. A progress bar will be shown using
* "text" and the progress through the tasks, unless "text" is NULL. */
extern void run_thread_range(int n_tasks, int n_threads, const char *text,
void (*work)(int, void *), void *work_args,
int cpu_num, int cpu_groupsize, int cpu_offset);
/* get_task() will be called every time a worker is idle. It returns either
* NULL, indicating that no further work is available, or a pointer which will
* be passed to work(). Work will stop after 'max' tasks have been processed.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment