cctools
work_queue.h File Reference

A master-worker library.

#include "timestamp.h"

Data Structures

struct  work_queue_task
 A task description.
struct  work_queue_stats
 Statistics describing a work queue.

Macros

#define WORK_QUEUE_DEFAULT_PORT   9123
 Default Work Queue port number.
#define WORK_QUEUE_RANDOM_PORT   0
 Indicates that any port number may be chosen.
#define WORK_QUEUE_WAITFORTASK   -1
 Wait for a task to complete before returning.
#define WORK_QUEUE_SCHEDULE_FCFS   1
 Select worker on a first-come-first-serve basis.
#define WORK_QUEUE_SCHEDULE_FILES   2
 Select worker that has the most data required by the task.
#define WORK_QUEUE_SCHEDULE_TIME   3
 Select worker that has the fastest execution time on previous tasks.
#define WORK_QUEUE_SCHEDULE_RAND   4
 Select a random worker.
#define WORK_QUEUE_TASK_ORDER_FIFO   0
 Retrieve tasks based on first-in-first-out order.
#define WORK_QUEUE_TASK_ORDER_LIFO   1
 Retrieve tasks based on last-in-first-out order.
#define WORK_QUEUE_INPUT   0
 Specify an input object.
#define WORK_QUEUE_OUTPUT   1
 Specify an output object.
#define WORK_QUEUE_NOCACHE   0
 Do not cache file at execution site.
#define WORK_QUEUE_CACHE   1
 Cache file at execution site for later use.
#define WORK_QUEUE_MASTER_MODE_STANDALONE   0
 Work Queue master does not report to the catalog server.
#define WORK_QUEUE_MASTER_MODE_CATALOG   1
 Work Queue master reports to catalog server.

Functions

Functions - Tasks
struct work_queue_task * work_queue_task_create (const char *full_command)
 Create a new task object.
void work_queue_task_specify_file (struct work_queue_task *t, const char *local_name, const char *remote_name, int type, int flags)
 Add a file to a task.
void work_queue_task_specify_buffer (struct work_queue_task *t, const char *data, int length, const char *remote_name, int flags)
 Add an input buffer to a task.
void work_queue_task_specify_file_command (struct work_queue_task *t, const char *remote_name, const char *cmd, int type, int flags)
void work_queue_task_specify_tag (struct work_queue_task *t, const char *tag)
 Attach a user defined string tag to the task.
void work_queue_task_specify_algorithm (struct work_queue_task *t, int algo)
 Select the scheduling algorithm for a single task.
void work_queue_task_delete (struct work_queue_task *t)
 Delete a task.
Functions - Queues
struct work_queue * work_queue_create (int port)
 Create a new work queue.
int work_queue_submit (struct work_queue *q, struct work_queue_task *t)
 Submit a task to a queue.
struct work_queue_task * work_queue_wait (struct work_queue *q, int timeout)
 Wait for a task to complete.
int work_queue_hungry (struct work_queue *q)
 Determine whether the queue is 'hungry' for more tasks.
int work_queue_empty (struct work_queue *q)
 Determine whether the queue is empty.
int work_queue_port (struct work_queue *q)
 Get the listening port of the queue.
const char * work_queue_name (struct work_queue *q)
 Get the project name of the queue.
void work_queue_get_stats (struct work_queue *q, struct work_queue_stats *s)
 Get queue statistics.
char * work_queue_get_worker_summary (struct work_queue *q)
 Summarize workers.
int work_queue_activate_fast_abort (struct work_queue *q, double multiplier)
 Turn on or off fast abort functionality for a given queue.
void work_queue_specify_algorithm (struct work_queue *q, int algo)
 Change the worker selection algorithm.
void work_queue_specify_task_order (struct work_queue *q, int order)
 Specify how the submitted tasks should be ordered.
void work_queue_specify_name (struct work_queue *q, const char *name)
 Change the project name for a given queue.
void work_queue_specify_priority (struct work_queue *q, int priority)
 Change the priority for a given queue.
void work_queue_specify_master_mode (struct work_queue *q, int mode)
 Specify the master mode for a given queue.
struct work_queue_task * work_queue_cancel_by_taskid (struct work_queue *q, int id)
 Cancel a submitted task using its task id and remove it from queue.
struct work_queue_task * work_queue_cancel_by_tasktag (struct work_queue *q, const char *tag)
 Cancel a submitted task using its tag and remove it from queue.
int work_queue_shut_down_workers (struct work_queue *q, int n)
 Shut down workers connected to the work_queue system.
void work_queue_delete (struct work_queue *q)
 Delete a work queue.
Functions - Deprecated
void work_queue_task_specify_input_buf (struct work_queue_task *t, const char *buf, int length, const char *rname)
 Add an input buffer to a task.
void work_queue_task_specify_input_file (struct work_queue_task *t, const char *fname, const char *rname)
 Add an input file to a task.
void work_queue_task_specify_input_file_do_not_cache (struct work_queue_task *t, const char *fname, const char *rname)
 Add an input file to a task, without caching.
void work_queue_task_specify_output_file (struct work_queue_task *t, const char *rname, const char *fname)
 Add an output file to a task.
void work_queue_task_specify_output_file_do_not_cache (struct work_queue_task *t, const char *rname, const char *fname)
 Add an output file to a task without caching.

Variables

double wq_option_fast_abort_multiplier
 Initial setting for fast abort multiplier upon creating queue.
int wq_option_scheduler
 Initial setting for the algorithm that assigns tasks to workers upon creating a queue.

Detailed Description

A master-worker library.

The work queue provides an implementation of the master-worker computing model using TCP sockets, Unix applications, and files as intermediate buffers. A master process uses work_queue_create to create a queue, then work_queue_submit to submit tasks. Once tasks are running, call work_queue_wait to wait for completion. The generic worker program can be run on any machine, and simply needs to be told the host and port of the master.
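
The create/submit/wait cycle described above can be sketched as follows. So that the sketch compiles without cctools installed, it defines tiny in-memory stand-ins for the calls it uses; these stand-ins are assumptions for illustration only, they complete every task immediately, and they are not the real library. In a real master, delete the stand-in section and include work_queue.h instead. The helper name run_master and the command string are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>

/* ---- Stand-ins (NOT the real library): remove and #include "work_queue.h" ---- */
#define WORK_QUEUE_DEFAULT_PORT 9123
#define WORK_QUEUE_WAITFORTASK  -1
#define MAX_TASKS 64

struct work_queue_task { const char *command; int result; int return_status; };
struct work_queue { int port; int count; struct work_queue_task *pending[MAX_TASKS]; };

static struct work_queue *work_queue_create(int port) {
    struct work_queue *q = calloc(1, sizeof *q);
    if (q) q->port = port;
    return q;
}
static struct work_queue_task *work_queue_task_create(const char *full_command) {
    struct work_queue_task *t = calloc(1, sizeof *t);
    if (t) t->command = full_command;
    return t;
}
static int work_queue_submit(struct work_queue *q, struct work_queue_task *t) {
    assert(q->count < MAX_TASKS);
    q->pending[q->count] = t;
    return ++q->count;  /* stand-in task id */
}
static struct work_queue_task *work_queue_wait(struct work_queue *q, int timeout) {
    (void)timeout;      /* the stand-in never blocks */
    return q->count > 0 ? q->pending[--q->count] : NULL;
}
static int work_queue_empty(struct work_queue *q) { return q->count == 0; }
static void work_queue_task_delete(struct work_queue_task *t) { free(t); }
static void work_queue_delete(struct work_queue *q) { free(q); }
/* ---- End of stand-ins ---- */

/* Submit n tasks, then drain the queue.
   Returns the number of tasks that came back, or -1 if no queue could be created. */
int run_master(int n) {
    struct work_queue *q = work_queue_create(WORK_QUEUE_DEFAULT_PORT);
    if (!q) return -1;

    for (int i = 0; i < n; i++) {
        struct work_queue_task *t = work_queue_task_create("/bin/echo hello");
        if (!t) break;
        work_queue_submit(q, t);
    }

    int completed = 0;
    while (!work_queue_empty(q)) {
        struct work_queue_task *t = work_queue_wait(q, WORK_QUEUE_WAITFORTASK);
        if (!t) continue;           /* nothing returned yet; keep waiting */
        completed++;
        work_queue_task_delete(t);  /* the caller disposes of returned tasks */
    }
    work_queue_delete(q);           /* only after work_queue_empty is true */
    return completed;
}
```

The loop keeps calling work_queue_wait until work_queue_empty reports true, and only then deletes the queue, which matches the ordering the function documentation asks for.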


Macro Definition Documentation

#define WORK_QUEUE_DEFAULT_PORT   9123

Default Work Queue port number.

#define WORK_QUEUE_RANDOM_PORT   0

Indicates that any port number may be chosen.

#define WORK_QUEUE_WAITFORTASK   -1

Wait for a task to complete before returning.

#define WORK_QUEUE_SCHEDULE_FCFS   1

Select worker on a first-come-first-serve basis.

#define WORK_QUEUE_SCHEDULE_FILES   2

Select worker that has the most data required by the task.

#define WORK_QUEUE_SCHEDULE_TIME   3

Select worker that has the fastest execution time on previous tasks.

#define WORK_QUEUE_SCHEDULE_RAND   4

Select a random worker.

#define WORK_QUEUE_TASK_ORDER_FIFO   0

Retrieve tasks based on first-in-first-out order.

#define WORK_QUEUE_TASK_ORDER_LIFO   1

Retrieve tasks based on last-in-first-out order.

#define WORK_QUEUE_INPUT   0

Specify an input object.

#define WORK_QUEUE_OUTPUT   1

Specify an output object.

#define WORK_QUEUE_NOCACHE   0

Do not cache file at execution site.

#define WORK_QUEUE_CACHE   1

Cache file at execution site for later use.

#define WORK_QUEUE_MASTER_MODE_STANDALONE   0

Work Queue master does not report to the catalog server.

#define WORK_QUEUE_MASTER_MODE_CATALOG   1

Work Queue master reports to catalog server.


Function Documentation

struct work_queue_task* work_queue_task_create ( const char *  full_command)

Create a new task object.

Once created and elaborated with functions such as work_queue_task_specify_file and work_queue_task_specify_buffer, the task should be passed to work_queue_submit.

Parameters:
full_command  The shell command line to be executed by the task.
Returns:
A new task object.

Referenced by work_queue.Task::__init__().

void work_queue_task_specify_file ( struct work_queue_task *  t,
const char *  local_name,
const char *  remote_name,
int  type,
int  flags 
)

Add a file to a task.

Parameters:
t  A task object.
local_name  The name of the file on local disk or shared filesystem.
remote_name  The name of the file at the remote execution site.
type  Must be one of the following values:
  • WORK_QUEUE_INPUT to specify an input file.
  • WORK_QUEUE_OUTPUT to specify an output file.
flags  May be zero to indicate no special handling, or any of the following OR'd together:
  • WORK_QUEUE_CACHE indicates that the file should be cached for later tasks (recommended).
  • WORK_QUEUE_NOCACHE indicates that the file should not be cached for later tasks.

Referenced by work_queue.Task::specify_file().

void work_queue_task_specify_buffer ( struct work_queue_task *  t,
const char *  data,
int  length,
const char *  remote_name,
int  flags 
)

Add an input buffer to a task.

Parameters:
t  A task object.
data  The data to be passed as an input file.
length  The length of the buffer, in bytes.
remote_name  The name of the remote file to create.
flags  May be zero to indicate no special handling, or any of the following OR'd together:
  • WORK_QUEUE_CACHE indicates that the file should be cached for later tasks (recommended).
  • WORK_QUEUE_NOCACHE indicates that the file should not be cached for later tasks.

Referenced by work_queue.Task::specify_buffer().

void work_queue_task_specify_tag ( struct work_queue_task *  t,
const char *  tag 
)

Attach a user defined string tag to the task.

This field is not interpreted by the work queue, but is provided for the user's convenience in identifying tasks when they complete.

Parameters:
t  A task object.
tag  The tag to attach to task t.

Referenced by work_queue.Task::specify_tag().

void work_queue_task_specify_algorithm ( struct work_queue_task *  t,
int  algo 
)

Select the scheduling algorithm for a single task.

To change the scheduling algorithm for all tasks, use work_queue_specify_algorithm instead.

Parameters:
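t  A task object.
algo  The algorithm to use in assigning this task to a worker:
  • WORK_QUEUE_SCHEDULE_FCFS selects a worker on a first-come-first-served basis.
  • WORK_QUEUE_SCHEDULE_FILES selects the worker that has the most data required by the task.
  • WORK_QUEUE_SCHEDULE_TIME selects the worker that has the fastest execution time on previous tasks.
  • WORK_QUEUE_SCHEDULE_RAND selects a random worker.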

Referenced by work_queue.Task::specify_algorithm().

void work_queue_task_delete ( struct work_queue_task *  t)

Delete a task.

This may be called on tasks after they are returned from work_queue_wait.

Parameters:
t  The task to delete.

Referenced by work_queue.Task::__init__().

struct work_queue* work_queue_create ( int  port)

Create a new work queue.

Users may modify the behavior of work_queue_create by setting the following environmental variables before calling the function:

  • WORK_QUEUE_PORT: This sets the default port of the queue (if unset, the default is 9123).
  • WORK_QUEUE_LOW_PORT: If the user requests a random port, then this sets the first port number in the scan range (if unset, the default is 1024).
  • WORK_QUEUE_HIGH_PORT: If the user requests a random port, then this sets the last port number in the scan range (if unset, the default is 32767).
  • WORK_QUEUE_NAME: This sets the project name of the queue, which is reported to a catalog server (by default this is unset).
  • WORK_QUEUE_PRIORITY: This sets the priority of the queue, which is used by workers to sort masters such that higher priority masters will be served first (if unset, the default is 10).

If the queue has a project name, then queue statistics and information will be reported to a catalog server. To specify the catalog server, the user may set the CATALOG_HOST and CATALOG_PORT environmental variables as described in catalog_query_create.

Parameters:
port  The port number to listen on. If zero is specified, then the default is chosen, and if -1 is specified, a random port is chosen.
Returns:
A new work queue, or null if it could not be created.

Referenced by work_queue.WorkQueue::__init__().
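
To make the documented defaults concrete, the sketch below resolves a port setting from an environment-variable value and falls back to a default when the variable is unset or does not hold a valid port. It illustrates the rules in the list above and is not the library's own parsing code; the helper names port_or_default and scan_range_from_env are hypothetical, as is the choice to reject values outside 1..65535.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

/* Return the port encoded in value, or fallback if value is NULL,
   empty, not a decimal number, or outside 1..65535. */
int port_or_default(const char *value, int fallback) {
    if (value == NULL || *value == '\0') return fallback;
    char *end = NULL;
    errno = 0;
    long n = strtol(value, &end, 10);
    if (errno != 0 || *end != '\0' || n < 1 || n > 65535) return fallback;
    return (int)n;
}

/* Read the random-port scan range, using the defaults stated in the
   documentation (1024 and 32767) when the variables are unset. */
void scan_range_from_env(int *low, int *high) {
    assert(low != NULL && high != NULL);
    *low  = port_or_default(getenv("WORK_QUEUE_LOW_PORT"), 1024);
    *high = port_or_default(getenv("WORK_QUEUE_HIGH_PORT"), 32767);
}
```

Whatever the settings resolve to, a program should still call work_queue_port after creating the queue rather than assume which port was chosen.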

int work_queue_submit ( struct work_queue *  q,
struct work_queue_task *  t 
)

Submit a task to a queue.

Once a task is submitted to a queue, it is no longer under the user's control and should not be inspected until returned via work_queue_wait. Once returned, it is safe to re-submit the same task object via work_queue_submit.

Parameters:
q  A work queue object.
t  A task object returned from work_queue_task_create.
Returns:
An integer taskid assigned to the submitted task.

Referenced by work_queue.WorkQueue::submit().

struct work_queue_task* work_queue_wait ( struct work_queue *  q,
int  timeout 
)

Wait for a task to complete.

This call will block until either a task has completed, the timeout has expired, or the queue is empty. If a task has completed, the corresponding task object will be returned by this function. The caller may examine the task and then dispose of it using work_queue_task_delete.

If the task ran to completion, then the result field will be zero and the return_status field will contain the Unix exit code of the task. If the task could not run to completion, then the result field will be non-zero and the return_status field will be undefined.

Parameters:
q  A work queue object.
timeout  The number of seconds to wait for a completed task before returning. Use an integer time to set the timeout or the constant WORK_QUEUE_WAITFORTASK to block until a task has completed.
Returns:
A completed task description, or null if the queue is empty or the timeout was reached without a completed task.

Referenced by work_queue.WorkQueue::wait().
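
The convention for the result and return_status fields can be captured in a small helper. This sketch takes the two field values as plain integers so that it compiles without the library; the enum and the function name classify_outcome are hypothetical, and treating exit code 0 as success is the usual Unix convention rather than something this page states.

```c
/* Possible outcomes of a task returned by work_queue_wait. */
enum task_outcome {
    TASK_SUCCEEDED,        /* ran to completion with exit code 0 */
    TASK_EXITED_NONZERO,   /* ran to completion with a non-zero exit code */
    TASK_DID_NOT_COMPLETE  /* result != 0, so return_status is undefined */
};

/* Classify a task from the values of its result and return_status fields. */
enum task_outcome classify_outcome(int result, int return_status) {
    if (result != 0) return TASK_DID_NOT_COMPLETE;  /* return_status is not read here */
    return return_status == 0 ? TASK_SUCCEEDED : TASK_EXITED_NONZERO;
}
```

Checking result first matters: when it is non-zero, return_status holds no meaningful value and should not influence the decision.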

int work_queue_hungry ( struct work_queue *  q)

Determine whether the queue is 'hungry' for more tasks.

While the Work Queue can handle a very large number of tasks, it runs most efficiently when the number of tasks is slightly larger than the number of active workers. This function gives the user of a flexible application a hint about whether it would be better to submit more tasks via work_queue_submit or wait for some to complete via work_queue_wait.

Parameters:
q  A work queue object.
Returns:
The number of additional tasks that can be efficiently submitted, or zero if the queue has enough to work with right now.

Referenced by work_queue.WorkQueue::hungry().

int work_queue_empty ( struct work_queue *  q)

Determine whether the queue is empty.

When all of the desired tasks have been submitted to the queue, the user should continue to call work_queue_wait until this function returns true.

Parameters:
q  A work queue object.
Returns:
True if the queue is completely empty, false otherwise.

Referenced by work_queue.WorkQueue::empty().

int work_queue_port ( struct work_queue *  q)

Get the listening port of the queue.

As noted in work_queue_create, there are many controls that affect what TCP port the queue will listen on. Rather than assuming a specific port, the user should simply call this function to determine what port was selected.

Parameters:
q  A work queue object.
Returns:
The port the queue is listening on.

Referenced by work_queue.WorkQueue::__init__().

const char* work_queue_name ( struct work_queue *  q)

Get the project name of the queue.

Parameters:
q  A work queue object.
Returns:
The project name of the queue.

Referenced by work_queue.WorkQueue::__init__().

void work_queue_get_stats ( struct work_queue *  q,
struct work_queue_stats *  s 
)

Get queue statistics.

Parameters:
q  A work queue object.
s  A pointer to a buffer that will be filled with statistics.

Referenced by work_queue.WorkQueue::__init__().

char* work_queue_get_worker_summary ( struct work_queue *  q)

Summarize workers.

This function summarizes the workers currently connected to the master, indicating how many from each worker pool are attached.

Parameters:
q  A work queue object.
Returns:
A newly allocated string describing the distribution of workers by pool. The caller must release this string via free().

int work_queue_activate_fast_abort ( struct work_queue *  q,
double  multiplier 
)

Turn on or off fast abort functionality for a given queue.

Parameters:
q  A work queue object.
multiplier  The multiplier of the average task time at which point to abort; if negative (the default), fast abort is deactivated.
Returns:
0 if activated or deactivated with an appropriate multiplier, 1 if deactivated due to inappropriate multiplier.

Referenced by work_queue.WorkQueue::activate_fast_abort().
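
The multiplier rule can be read as a simple threshold test. The sketch below illustrates the documented semantics only and is not the library's internal logic; the function name should_fast_abort, the strict greater-than comparison, and the handling of a missing average are all assumptions.

```c
#include <stdbool.h>

/* True if a task that has run for `elapsed` seconds exceeds
   `multiplier` times the average task time. */
bool should_fast_abort(double average_task_time, double elapsed, double multiplier) {
    if (multiplier < 0.0) return false;          /* negative multiplier: fast abort is off */
    if (average_task_time <= 0.0) return false;  /* no baseline yet (assumption) */
    return elapsed > multiplier * average_task_time;
}
```

For example, with an average task time of 10 seconds and a multiplier of 3, a task would be considered for abort once it has run for more than 30 seconds.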

void work_queue_specify_algorithm ( struct work_queue *  q,
int  algo 
)

Change the worker selection algorithm.

Note that this function controls which worker will be selected for a given task while work_queue_specify_task_order controls which task will be executed next.

Parameters:
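q  A work queue object.
algo  The algorithm to use in assigning a task to a worker:
  • WORK_QUEUE_SCHEDULE_FCFS selects a worker on a first-come-first-served basis.
  • WORK_QUEUE_SCHEDULE_FILES selects the worker that has the most data required by the task.
  • WORK_QUEUE_SCHEDULE_TIME selects the worker that has the fastest execution time on previous tasks.
  • WORK_QUEUE_SCHEDULE_RAND selects a random worker.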

Referenced by work_queue.WorkQueue::specify_algorithm().

void work_queue_specify_task_order ( struct work_queue *  q,
int  order 
)

Specify how the submitted tasks should be ordered.

Note that this function controls which task to execute next, while work_queue_specify_algorithm controls which worker it should be assigned to.

Parameters:
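q  A work queue object.
order  The ordering to use for dispatching submitted tasks:
  • WORK_QUEUE_TASK_ORDER_FIFO retrieves tasks in first-in-first-out order.
  • WORK_QUEUE_TASK_ORDER_LIFO retrieves tasks in last-in-first-out order.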

Referenced by work_queue.WorkQueue::specify_task_order().

void work_queue_specify_name ( struct work_queue *  q,
const char *  name 
)

Change the project name for a given queue.

Parameters:
q  A work queue object.
name  The new project name.

Referenced by work_queue.WorkQueue::__init__(), and work_queue.WorkQueue::specify_name().

void work_queue_specify_priority ( struct work_queue *  q,
int  priority 
)

Change the priority for a given queue.

Parameters:
q  A work queue object.
priority  The new priority of the queue. Higher priority masters will attract workers first.

Referenced by work_queue.WorkQueue::specify_priority().

void work_queue_specify_master_mode ( struct work_queue *  q,
int  mode 
)

Specify the master mode for a given queue.

Parameters:
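q  A work queue object.
mode  One of the following values:
  • WORK_QUEUE_MASTER_MODE_STANDALONE indicates that the master does not report to the catalog server.
  • WORK_QUEUE_MASTER_MODE_CATALOG indicates that the master reports to the catalog server.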

Referenced by work_queue.WorkQueue::__init__(), and work_queue.WorkQueue::specify_master_mode().

struct work_queue_task* work_queue_cancel_by_taskid ( struct work_queue *  q,
int  id 
)

Cancel a submitted task using its task id and remove it from queue.

Parameters:
q  A work queue object.
id  The taskid returned from work_queue_submit.
Returns:
The task description of the cancelled task, or null if the task was not found in queue. The returned task must be deleted with work_queue_task_delete or resubmitted with work_queue_submit.

Referenced by work_queue.WorkQueue::cancel_by_taskid().

struct work_queue_task* work_queue_cancel_by_tasktag ( struct work_queue *  q,
const char *  tag 
)

Cancel a submitted task using its tag and remove it from queue.

Parameters:
q  A work queue object.
tag  The tag name assigned to the task using work_queue_task_specify_tag.
Returns:
The task description of the cancelled task, or null if the task was not found in queue. The returned task must be deleted with work_queue_task_delete or resubmitted with work_queue_submit.

Referenced by work_queue.WorkQueue::cancel_by_tasktag().

int work_queue_shut_down_workers ( struct work_queue *  q,
int  n 
)

Shut down workers connected to the work_queue system.

Makes a best effort, then returns the number of workers that were sent the shutdown order.

Parameters:
q  A work queue object.
n  The number of workers to shut down. If 0, all workers are shut down.

Referenced by work_queue.WorkQueue::shutdown_workers().

void work_queue_delete ( struct work_queue *  q)

Delete a work queue.

This function should only be called after work_queue_empty returns true.

Parameters:
q  A work queue to delete.

Referenced by work_queue.WorkQueue::__init__().

void work_queue_task_specify_input_buf ( struct work_queue_task *  t,
const char *  buf,
int  length,
const char *  rname 
)

Add an input buffer to a task.

Parameters:
t  The task to which to add parameters.
buf  A pointer to the data buffer to send to the worker to be available to the commands.
length  The number of bytes of data in the buffer.
rname  The name of the file in which to store the buffer data on the worker.
Deprecated:
Use work_queue_task_specify_buffer instead.

void work_queue_task_specify_input_file ( struct work_queue_task *  t,
const char *  fname,
const char *  rname 
)

Add an input file to a task.

Parameters:
t  The task to which to add parameters.
fname  The name of the data file to send to the worker to be available to the commands.
rname  The name of the file in which to store the buffer data on the worker.
Deprecated:
See work_queue_task_specify_file instead.

void work_queue_task_specify_input_file_do_not_cache ( struct work_queue_task *  t,
const char *  fname,
const char *  rname 
)

Add an input file to a task, without caching.

Parameters:
t  The task to which to add parameters.
fname  The name of the data file to send to the worker to be available to the commands.
rname  The name of the file in which to store the buffer data on the worker.
Deprecated:
See work_queue_task_specify_file instead.

void work_queue_task_specify_output_file ( struct work_queue_task *  t,
const char *  rname,
const char *  fname 
)

Add an output file to a task.

Parameters:
t  The task to which to add parameters.
rname  The name of a file created by the program when it runs.
fname  The name of the local target file to which rname is copied back.
Deprecated:
See work_queue_task_specify_file instead.

void work_queue_task_specify_output_file_do_not_cache ( struct work_queue_task *  t,
const char *  rname,
const char *  fname 
)

Add an output file to a task without caching.

Parameters:
t  The task to which to add parameters.
rname  The name of a file created by the program when it runs.
fname  The name of the local target file to which rname is copied back.
Deprecated:
See work_queue_task_specify_file instead.

Variable Documentation

double wq_option_fast_abort_multiplier

Initial setting for fast abort multiplier upon creating queue.

Turned off if less than 0. Change this variable prior to calling work_queue_create; after the queue is created, it is no longer considered and changes must be made through the API calls (see work_queue_activate_fast_abort).

int wq_option_scheduler

Initial setting for the algorithm that assigns tasks to workers upon creating a queue.

Change this variable prior to calling work_queue_create; after the queue is created, it is no longer considered and changes must be made through the API calls (see work_queue_specify_algorithm).