OpenDNSSEC-signer  1.3.8
/build/buildd/opendnssec-1.3.8/signer/src/daemon/worker.c
Go to the documentation of this file.
00001 /*
00002  * $Id: worker.c 6290 2012-04-25 07:42:07Z matthijs $
00003  *
00004  * Copyright (c) 2009 NLNet Labs. All rights reserved.
00005  *
00006  * Redistribution and use in source and binary forms, with or without
00007  * modification, are permitted provided that the following conditions
00008  * are met:
00009  * 1. Redistributions of source code must retain the above copyright
00010  *    notice, this list of conditions and the following disclaimer.
00011  * 2. Redistributions in binary form must reproduce the above copyright
00012  *    notice, this list of conditions and the following disclaimer in the
00013  *    documentation and/or other materials provided with the distribution.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
00016  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00017  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
00019  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00020  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
00021  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
00023  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
00024  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
00025  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00026  *
00027  */
00028 
00034 #include "adapter/adapi.h"
00035 #include "daemon/engine.h"
00036 #include "daemon/worker.h"
00037 #include "shared/allocator.h"
00038 #include "scheduler/schedule.h"
00039 #include "scheduler/task.h"
00040 #include "shared/hsm.h"
00041 #include "shared/locks.h"
00042 #include "shared/log.h"
00043 #include "shared/status.h"
00044 #include "shared/util.h"
00045 #include "signer/tools.h"
00046 #include "signer/zone.h"
00047 #include "signer/zonedata.h"
00048 
00049 #include <time.h> /* time() */
00050 
/*
 * Maps worker_id values to the names used by worker2str() in log
 * messages. The table is terminated by the { 0, NULL } sentinel entry.
 */
ods_lookup_table worker_str[] = {
    { WORKER_WORKER, "worker" },
    { WORKER_DRUDGER, "drudger" },
    { 0, NULL }
};
00056 
00057 
00062 worker_type*
00063 worker_create(allocator_type* allocator, int num, worker_id type)
00064 {
00065     worker_type* worker;
00066 
00067     if (!allocator) {
00068         return NULL;
00069     }
00070     ods_log_assert(allocator);
00071 
00072     worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type));
00073     if (!worker) {
00074         return NULL;
00075     }
00076 
00077     ods_log_debug("create worker[%i]", num +1);
00078     lock_basic_init(&worker->worker_lock);
00079     lock_basic_set(&worker->worker_alarm);
00080     lock_basic_lock(&worker->worker_lock);
00081     worker->allocator = allocator;
00082     worker->thread_num = num +1;
00083     worker->engine = NULL;
00084     worker->task = NULL;
00085     worker->working_with = TASK_NONE;
00086     worker->need_to_exit = 0;
00087     worker->type = type;
00088     worker->clock_in = 0;
00089     worker->jobs_appointed = 0;
00090     worker->jobs_completed = 0;
00091     worker->jobs_failed = 0;
00092     worker->sleeping = 0;
00093     worker->waiting = 0;
00094     lock_basic_unlock(&worker->worker_lock);
00095     return worker;
00096 }
00097 
00098 
00103 static const char*
00104 worker2str(worker_id type)
00105 {
00106     ods_lookup_table *lt = ods_lookup_by_id(worker_str, type);
00107     if (lt) {
00108         return lt->name;
00109     }
00110     return NULL;
00111 }
00112 
00113 
00118 static int
00119 worker_fulfilled(worker_type* worker)
00120 {
00121     return (worker->jobs_completed + worker->jobs_failed) ==
00122         worker->jobs_appointed;
00123 }
00124 
00125 
/**
 * Perform the task currently assigned to the worker (worker->task) for its
 * zone, then record in the task what should happen next.
 *
 * task->what selects where processing starts:
 *   TASK_SIGNCONF : load the signconf, publish DNSKEYs, prepare NSEC3 and
 *                   commit the zone data. The next task is whatever
 *                   zone_load_signconf() stored in 'what'.
 *   TASK_READ -> TASK_NSECIFY -> TASK_SIGN -> TASK_AUDIT -> TASK_WRITE :
 *                   on success each stage falls through into the next case.
 *                   After TASK_WRITE the next task is TASK_SIGN, one resign
 *                   interval later. If that interval is zero, the next task
 *                   is TASK_NONE, one year later.
 *
 * On return, task->what and task->when have been updated, together with
 * task->interrupt, task->halted and task->backoff as applicable. The caller
 * (worker_work) puts the task back on the schedule.
 *
 * When a stage fails:
 *   - if no task is halted, control goes to task_perform_fail (retry the
 *     same task with exponential backoff);
 *   - otherwise it goes to task_perform_continue (resume the halted task).
 *
 * A successful TASK_WRITE also triggers zone_backup().
 *
 * @param worker the worker. The function returns silently if worker, its
 *               task, the task's zone or the engine is NULL.
 */
static void
worker_perform_task(worker_type* worker)
{
    engine_type* engine = NULL;
    zone_type* zone = NULL;
    task_type* task = NULL;
    /* next task to run; zone_load_signconf() may also set it */
    task_id what = TASK_NONE;
    /* time at which 'what' should run */
    time_t when = 0;
    /* one year (365 days) in seconds, used when no next run is planned */
    time_t never = (3600*24*365);
    ods_status status = ODS_STATUS_OK;
    /* set to 1 by a stage that falls through into the next case */
    int fallthrough = 0;
    /* set after a successful write; triggers zone_backup() below */
    int backup = 0;
    char* working_dir = NULL;
    char* cfg_filename = NULL;
    /* internal serial before zone_update_serial(); restored if signing fails */
    uint32_t tmpserial = 0;
    /* wall-clock bounds of the signing stage, stored in zone->stats->sig_time */
    time_t start = 0;
    time_t end = 0;

    /* sanity checking */
    if (!worker || !worker->task || !worker->task->zone || !worker->engine) {
        return;
    }
    ods_log_assert(worker);
    ods_log_assert(worker->task);
    ods_log_assert(worker->task->zone);

    engine = (engine_type*) worker->engine;
    task = (task_type*) worker->task;
    zone = (zone_type*) worker->task->zone;
    ods_log_debug("[%s[%i]] perform task %s for zone %s at %u",
       worker2str(worker->type), worker->thread_num, task_what2str(task->what),
       task_who2str(task->who), (uint32_t) worker->clock_in);

    /* do what you have been told to do */
    switch (task->what) {
        case TASK_SIGNCONF:
            worker->working_with = TASK_SIGNCONF;
            /* perform 'load signconf' task */
            ods_log_verbose("[%s[%i]] load signconf for zone %s",
                worker2str(worker->type), worker->thread_num,
                task_who2str(task->who));
            status = zone_load_signconf(zone, &what);
            if (status == ODS_STATUS_UNCHANGED) {
                /* "unchanged" while no signconf was ever loaded is an error */
                if (!zone->signconf->last_modified) {
                    ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
                        worker2str(worker->type), worker->thread_num,
                        task_who2str(task->who));
                    status = ODS_STATUS_ERR;
                }
            }

            /* what to do next */
            when = time_now();
            if (status == ODS_STATUS_UNCHANGED) {
                if (task->halted != TASK_NONE) {
                    /* nothing new to apply: resume the halted task */
                    goto task_perform_continue;
                } else {
                    status = ODS_STATUS_OK;
                }
            }

            if (status == ODS_STATUS_OK) {
                /* check the HSM connection before the key operations below */
                lhsm_check_connection((void*)engine);
                status = zone_publish_dnskeys(zone, 0);
            }
            if (status == ODS_STATUS_OK) {
                status = zone_prepare_nsec3(zone, 0);
            }
            if (status == ODS_STATUS_OK) {
                status = zonedata_commit(zone->zonedata);
            }

            if (status == ODS_STATUS_OK) {
                /* TASK_READ refuses to run until this flag is set */
                zone->prepared = 1;
                task->interrupt = TASK_NONE;
                task->halted = TASK_NONE;
            } else {
                if (task->halted == TASK_NONE) {
                    goto task_perform_fail;
                }
                goto task_perform_continue;
            }
            fallthrough = 0;
            break;
        case TASK_READ:
            worker->working_with = TASK_READ;
            /* perform 'read input adapter' task */
            ods_log_verbose("[%s[%i]] read zone %s",
                worker2str(worker->type), worker->thread_num,
                task_who2str(task->who));
            if (!zone->prepared) {
                ods_log_debug("[%s[%i]] no valid signconf.xml for zone %s yet",
                    worker2str(worker->type), worker->thread_num,
                    task_who2str(task->who));
                status = ODS_STATUS_ERR;
            } else {
                status = tools_input(zone);
            }

            /* what to do next */
            what = TASK_NSECIFY;
            when = time_now();
            if (status != ODS_STATUS_OK) {
                if (task->halted == TASK_NONE) {
                    goto task_perform_fail;
                }
                goto task_perform_continue;
            }
            fallthrough = 1;
            /* fallthrough */
        case TASK_NSECIFY:
            worker->working_with = TASK_NSECIFY;
            ods_log_verbose("[%s[%i]] nsecify zone %s",
                worker2str(worker->type), worker->thread_num,
                task_who2str(task->who));
            status = tools_nsecify(zone);

            /* what to do next */
            what = TASK_SIGN;
            when = time_now();
            if (status == ODS_STATUS_OK) {
                if (task->interrupt > TASK_SIGNCONF) {
                    task->interrupt = TASK_NONE;
                    task->halted = TASK_NONE;
                }
            } else {
                if (task->halted == TASK_NONE) {
                    goto task_perform_fail;
                }
                goto task_perform_continue;
            }
            fallthrough = 1;
            /* fallthrough */
        case TASK_SIGN:
            worker->working_with = TASK_SIGN;
            ods_log_verbose("[%s[%i]] sign zone %s",
                worker2str(worker->type), worker->thread_num,
                task_who2str(task->who));
            tmpserial = zone->zonedata->internal_serial;
            status = zone_update_serial(zone);
            if (status != ODS_STATUS_OK) {
                ods_log_error("[%s[%i]] unable to sign zone %s: "
                    "failed to increment serial",
                    worker2str(worker->type), worker->thread_num,
                    task_who2str(task->who));
            } else {
                /* start timer */
                start = time(NULL);
                if (zone->stats) {
                    lock_basic_lock(&zone->stats->stats_lock);
                    if (!zone->stats->start_time) {
                        zone->stats->start_time = start;
                    }
                    zone->stats->sig_count = 0;
                    zone->stats->sig_soa_count = 0;
                    zone->stats->sig_reuse = 0;
                    zone->stats->sig_time = 0;
                    lock_basic_unlock(&zone->stats->stats_lock);
                }
                /* check the HSM connection before queuing sign operations */
                lhsm_check_connection((void*)engine);
                /* queue menial, hard signing work */
                /* a non-OK status here is kept unless overwritten below */
                status = zonedata_queue(zone->zonedata, engine->signq, worker);
                ods_log_debug("[%s[%i]] wait until drudgers are finished "
                    "signing zone %s, %u signatures queued",
                    worker2str(worker->type), worker->thread_num,
                    task_who2str(task->who), worker->jobs_appointed);
                /* sleep until work is done */
                /*
                 * NOTE(review): the jobs_* counters below are read and reset
                 * without holding worker->worker_lock. Drudgers update them
                 * under that lock (worker_drudge()). If need_to_exit is
                 * already set, the wait is skipped and drudgers may still be
                 * updating these counters.
                 */
                if (!worker->need_to_exit) {
                    worker_sleep_unless(worker, 0);
                }
                if (worker->jobs_failed) {
                    ods_log_error("[%s[%i]] sign zone %s failed: %u of %u "
                        "signatures failed", worker2str(worker->type),
                        worker->thread_num, task_who2str(task->who),
                        worker->jobs_failed, worker->jobs_appointed);
                    status = ODS_STATUS_ERR;
                } else if (!worker_fulfilled(worker)) {
                    ods_log_error("[%s[%i]] sign zone %s failed: %u of %u "
                        "signatures completed", worker2str(worker->type),
                        worker->thread_num, task_who2str(task->who),
                        worker->jobs_completed, worker->jobs_appointed);
                    status = ODS_STATUS_ERR;
                } else if (worker->need_to_exit) {
                    ods_log_debug("[%s[%i]] sign zone %s failed: worker "
                        "needs to exit", worker2str(worker->type),
                        worker->thread_num, task_who2str(task->who));
                    status = ODS_STATUS_ERR;
                } else {
                    ods_log_debug("[%s[%i]] sign zone %s ok: %u of %u "
                        "signatures succeeded", worker2str(worker->type),
                        worker->thread_num, task_who2str(task->who),
                        worker->jobs_completed, worker->jobs_appointed);
                    ods_log_assert(worker->jobs_appointed ==
                        worker->jobs_completed);
                }
                worker->jobs_appointed = 0;
                worker->jobs_completed = 0;
                worker->jobs_failed = 0;
                /* stop timer */
                end = time(NULL);
                if (status == ODS_STATUS_OK && zone->stats) {
                    lock_basic_lock(&zone->stats->stats_lock);
                    zone->stats->sig_time = (end-start);
                    lock_basic_unlock(&zone->stats->stats_lock);
                }
            }

            /* what to do next */
            if (status != ODS_STATUS_OK) {
                /* rollback serial */
                zone->zonedata->internal_serial = tmpserial;
                if (task->halted == TASK_NONE) {
                    goto task_perform_fail;
                }
                goto task_perform_continue;
            } else {
                if (task->interrupt > TASK_SIGNCONF) {
                    task->interrupt = TASK_NONE;
                    task->halted = TASK_NONE;
                }
            }
            what = TASK_AUDIT;
            when = time_now();
            fallthrough = 1;
            /* fallthrough */
        case TASK_AUDIT:
            worker->working_with = TASK_AUDIT;
            if (zone->signconf->audit) {
                ods_log_verbose("[%s[%i]] audit zone %s",
                    worker2str(worker->type), worker->thread_num,
                    task_who2str(task->who));
                /*
                 * NOTE(review): the strdup() results are not checked, so
                 * tools_audit() may receive NULL on allocation failure.
                 */
                working_dir = strdup(engine->config->working_dir);
                cfg_filename = strdup(engine->config->cfg_filename);
                status = tools_audit(zone, working_dir, cfg_filename);
                if (working_dir)  { free((void*)working_dir); }
                if (cfg_filename) { free((void*)cfg_filename); }
                working_dir = NULL;
                cfg_filename = NULL;
            } else {
                /* auditing disabled in the signconf: treat as success */
                status = ODS_STATUS_OK;
            }

            /* what to do next */
            if (status != ODS_STATUS_OK) {
                if (task->halted == TASK_NONE) {
                    goto task_perform_fail;
                }
                goto task_perform_continue;
            }
            what = TASK_WRITE;
            when = time_now();
            fallthrough = 1;
            /* fallthrough */
        case TASK_WRITE:
            worker->working_with = TASK_WRITE;
            ods_log_verbose("[%s[%i]] write zone %s",
                worker2str(worker->type), worker->thread_num,
                task_who2str(task->who));

            status = tools_output(zone);
            zone->processed = 1;

            /* what to do next */
            if (status != ODS_STATUS_OK) {
                if (task->halted == TASK_NONE) {
                    goto task_perform_fail;
                }
                goto task_perform_continue;
            } else {
                if (task->interrupt > TASK_SIGNCONF) {
                    task->interrupt = TASK_NONE;
                    task->halted = TASK_NONE;
                }
            }
            /* resign after the configured interval, or effectively never */
            if (duration2time(zone->signconf->sig_resign_interval)) {
                what = TASK_SIGN;
                when = time_now() +
                    duration2time(zone->signconf->sig_resign_interval);
            } else {
                what = TASK_NONE;
                when = time_now() + never;
            }
            backup = 1;
            fallthrough = 0;
            break;
        case TASK_NONE:
            worker->working_with = TASK_NONE;
            ods_log_warning("[%s[%i]] none task for zone %s",
                worker2str(worker->type), worker->thread_num,
                task_who2str(task->who));
            when = time_now() + never;
            fallthrough = 0;
            break;
        default:
            /* unknown task: restart from the signconf stage */
            ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s",
                worker2str(worker->type), worker->thread_num,
                task_who2str(task->who));
            what = TASK_SIGNCONF;
            when = time_now();
            fallthrough = 0;
            break;
    }

    /* no error, reset backoff */
    task->backoff = 0;

    /* set next task */
    /*
     * NOTE(review): every case that reaches this point through 'break' has
     * set fallthrough to 0, so the fallthrough tests below always behave as
     * if fallthrough were 0.
     */
    if (fallthrough == 0 && task->interrupt != TASK_NONE &&
        task->interrupt != what) {
        /*
         * An interrupt is pending: run it now and remember the task that
         * would have run next in task->halted.
         */
        ods_log_debug("[%s[%i]] interrupt task %s for zone %s",
            worker2str(worker->type), worker->thread_num,
            task_what2str(what), task_who2str(task->who));

        task->what = task->interrupt;
        task->when = time_now();
        task->halted = what;
    } else {
        ods_log_debug("[%s[%i]] next task %s for zone %s",
            worker2str(worker->type), worker->thread_num,
            task_what2str(what), task_who2str(task->who));

        task->what = what;
        task->when = when;
        if (!fallthrough) {
            task->interrupt = TASK_NONE;
            task->halted = TASK_NONE;
        }
    }

    /* backup the last successful run */
    if (backup) {
        status = zone_backup(zone);
        if (status != ODS_STATUS_OK) {
            ods_log_warning("[%s[%i]] unable to backup zone %s: %s",
            worker2str(worker->type), worker->thread_num,
            task_who2str(task->who), ods_status2str(status));
            /* just a warning */
            status = ODS_STATUS_OK;
        }
        backup = 0;
    }
    return;

task_perform_fail:
    /* in case of failure, also mark zone processed (for single run usage) */
    zone->processed = 1;

    /*
     * Exponential backoff: 60 seconds on the first failure, doubled on each
     * consecutive failure (task->backoff is reset to 0 after a successful
     * run above), capped at ODS_SE_MAX_BACKOFF. task->what is left
     * unchanged, so the same task is retried.
     */
    if (task->backoff) {
        task->backoff *= 2;
        if (task->backoff > ODS_SE_MAX_BACKOFF) {
            task->backoff = ODS_SE_MAX_BACKOFF;
        }
    } else {
        task->backoff = 60;
    }
    ods_log_info("[%s[%i]] backoff task %s for zone %s with %u seconds",
        worker2str(worker->type), worker->thread_num,
        task_what2str(task->what), task_who2str(task->who), task->backoff);

    task->when = time_now() + task->backoff;
    return;

task_perform_continue:
    /*
     * Resume the task that an interrupt halted. It runs immediately, or one
     * resign interval from now if the zone has already been processed.
     */
    ods_log_info("[%s[%i]] continue task %s for zone %s",
        worker2str(worker->type), worker->thread_num,
        task_what2str(task->halted), task_who2str(task->who));

    what = task->halted;
    task->what = what;
    task->when = time_now();
    task->interrupt = TASK_NONE;
    task->halted = TASK_NONE;
    if (zone->processed) {
        task->when += duration2time(zone->signconf->sig_resign_interval);
    }
    return;
}
00508 
00509 
00514 static void
00515 worker_work(worker_type* worker)
00516 {
00517     time_t now, timeout = 1;
00518     zone_type* zone = NULL;
00519 
00520     ods_log_assert(worker);
00521     ods_log_assert(worker->type == WORKER_WORKER);
00522 
00523     while (worker->need_to_exit == 0) {
00524         ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
00525             worker->thread_num);
00526         lock_basic_lock(&worker->engine->taskq->schedule_lock);
00527         /* [LOCK] schedule */
00528         worker->task = schedule_pop_task(worker->engine->taskq);
00529         /* [UNLOCK] schedule */
00530         if (worker->task) {
00531             worker->working_with = worker->task->what;
00532             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00533 
00534             zone = worker->task->zone;
00535             lock_basic_lock(&zone->zone_lock);
00536             /* [LOCK] zone */
00537             ods_log_debug("[%s[%i]] start working on zone %s",
00538                 worker2str(worker->type), worker->thread_num, zone->name);
00539 
00540             worker->clock_in = time(NULL);
00541             worker_perform_task(worker);
00542 
00543             zone->task = worker->task;
00544 
00545             ods_log_debug("[%s[%i]] finished working on zone %s",
00546                 worker2str(worker->type), worker->thread_num, zone->name);
00547             /* [UNLOCK] zone */
00548 
00549             lock_basic_lock(&worker->engine->taskq->schedule_lock);
00550             /* [LOCK] zone, schedule */
00551             worker->task = NULL;
00552             worker->working_with = TASK_NONE;
00553             (void) schedule_task(worker->engine->taskq, zone->task, 1);
00554             /* [UNLOCK] zone, schedule */
00555             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00556             lock_basic_unlock(&zone->zone_lock);
00557 
00558             timeout = 1;
00559         } else {
00560             ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
00561                 worker->thread_num);
00562 
00563             /* [LOCK] schedule */
00564             worker->task = schedule_get_first_task(worker->engine->taskq);
00565             /* [UNLOCK] schedule */
00566             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00567 
00568             now = time_now();
00569             if (worker->task && !worker->engine->taskq->loading) {
00570                 timeout = (worker->task->when - now);
00571             } else {
00572                 timeout *= 2;
00573                 if (timeout > ODS_SE_MAX_BACKOFF) {
00574                     timeout = ODS_SE_MAX_BACKOFF;
00575                 }
00576             }
00577             worker->task = NULL;
00578             worker_sleep(worker, timeout);
00579         }
00580     }
00581     /* stop worker, wipe queue */
00582     fifoq_wipe(worker->engine->signq);
00583     return;
00584 }
00585 
00586 
00591 static void
00592 worker_drudge(worker_type* worker)
00593 {
00594     zone_type* zone = NULL;
00595     rrset_type* rrset = NULL;
00596     ods_status status = ODS_STATUS_OK;
00597     worker_type* chief = NULL;
00598     hsm_ctx_t* ctx = NULL;
00599 
00600     ods_log_assert(worker);
00601     ods_log_assert(worker->type == WORKER_DRUDGER);
00602 
00603     ods_log_debug("[%s[%i]] create hsm context",
00604         worker2str(worker->type), worker->thread_num);
00605     ctx = hsm_create_context();
00606     if (!ctx) {
00607         ods_log_crit("[%s[%i]] error creating libhsm context",
00608             worker2str(worker->type), worker->thread_num);
00609     }
00610 
00611     while (worker->need_to_exit == 0) {
00612         ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
00613             worker->thread_num);
00614         chief = NULL;
00615         zone = NULL;
00616 
00617         lock_basic_lock(&worker->engine->signq->q_lock);
00618         /* [LOCK] schedule */
00619         rrset = (rrset_type*) fifoq_pop(worker->engine->signq, &chief);
00620         /* [UNLOCK] schedule */
00621         lock_basic_unlock(&worker->engine->signq->q_lock);
00622         if (rrset) {
00623             /* set up the work */
00624             if (chief && chief->task) {
00625                 zone = chief->task->zone;
00626             }
00627             if (!zone) {
00628                 ods_log_error("[%s[%i]] unable to drudge: no zone reference",
00629                     worker2str(worker->type), worker->thread_num);
00630             }
00631             if (zone && ctx) {
00632                 ods_log_assert(rrset);
00633                 ods_log_assert(zone->dname);
00634                 ods_log_assert(zone->signconf);
00635 
00636                 worker->clock_in = time(NULL);
00637                 status = rrset_sign(ctx, rrset, zone->dname, zone->signconf,
00638                     chief->clock_in, zone->stats);
00639             } else {
00640                 status = ODS_STATUS_ASSERT_ERR;
00641             }
00642 
00643             if (chief) {
00644                 lock_basic_lock(&chief->worker_lock);
00645                 if (status == ODS_STATUS_OK) {
00646                     chief->jobs_completed += 1;
00647                 } else {
00648                     chief->jobs_failed += 1;
00649                     /* destroy context? */
00650                 }
00651                 lock_basic_unlock(&chief->worker_lock);
00652 
00653                 if (worker_fulfilled(chief) && chief->sleeping) {
00654                     ods_log_debug("[%s[%i]] wake up chief[%u], work is done",
00655                         worker2str(worker->type), worker->thread_num,
00656                         chief->thread_num);
00657                     worker_wakeup(chief);
00658                     chief = NULL;
00659                 }
00660             }
00661             rrset = NULL;
00662         } else {
00663             ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
00664                 worker->thread_num);
00665 
00666             worker_wait(&worker->engine->signq->q_lock,
00667                 &worker->engine->signq->q_threshold);
00668         }
00669     }
00670     /* stop drudger */
00671 
00672     if (chief && chief->sleeping) {
00673         /* wake up chief */
00674         ods_log_debug("[%s[%i]] wake up chief[%u], i am exiting",
00675             worker2str(worker->type), worker->thread_num, chief->thread_num);
00676          worker_wakeup(chief);
00677     }
00678     if (ctx) {
00679         /* cleanup open HSM sessions */
00680         ods_log_debug("[%s[%i]] destroy hsm context",
00681             worker2str(worker->type), worker->thread_num);
00682         hsm_destroy_context(ctx);
00683     }
00684     return;
00685 }
00686 
00687 
00692 void
00693 worker_start(worker_type* worker)
00694 {
00695     ods_log_assert(worker);
00696     switch (worker->type) {
00697         case WORKER_DRUDGER:
00698             worker_drudge(worker);
00699             break;
00700         case WORKER_WORKER:
00701             worker_work(worker);
00702             break;
00703         default:
00704             ods_log_error("[worker] illegal worker (id=%i)", worker->type);
00705             return;
00706     }
00707     return;
00708 }
00709 
00710 
00715 void
00716 worker_sleep(worker_type* worker, time_t timeout)
00717 {
00718     ods_log_assert(worker);
00719     lock_basic_lock(&worker->worker_lock);
00720     /* [LOCK] worker */
00721     worker->sleeping = 1;
00722     lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
00723         timeout);
00724     /* [UNLOCK] worker */
00725     lock_basic_unlock(&worker->worker_lock);
00726     return;
00727 }
00728 
00729 
00734 void
00735 worker_sleep_unless(worker_type* worker, time_t timeout)
00736 {
00737     ods_log_assert(worker);
00738     lock_basic_lock(&worker->worker_lock);
00739     /* [LOCK] worker */
00740     while (!worker->need_to_exit && !worker_fulfilled(worker)) {
00741         worker->sleeping = 1;
00742         lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
00743             timeout);
00744 
00745         ods_log_debug("[%s[%i]] somebody poked me, check completed jobs %u "
00746            "appointed, %u completed, %u failed", worker2str(worker->type),
00747            worker->thread_num, worker->jobs_appointed, worker->jobs_completed,
00748            worker->jobs_failed);
00749     }
00750     /* [UNLOCK] worker */
00751     lock_basic_unlock(&worker->worker_lock);
00752     return;
00753 }
00754 
00755 
00760 void
00761 worker_wakeup(worker_type* worker)
00762 {
00763     ods_log_assert(worker);
00764     if (worker->sleeping) {
00765         ods_log_debug("[%s[%i]] wake up", worker2str(worker->type),
00766            worker->thread_num);
00767         lock_basic_lock(&worker->worker_lock);
00768         /* [LOCK] worker */
00769         lock_basic_alarm(&worker->worker_alarm);
00770         worker->sleeping = 0;
00771         /* [UNLOCK] worker */
00772         lock_basic_unlock(&worker->worker_lock);
00773     }
00774     return;
00775 }
00776 
00777 
00782 void
00783 worker_wait_timeout(lock_basic_type* lock, cond_basic_type* condition,
00784     time_t timeout)
00785 {
00786     lock_basic_lock(lock);
00787     /* [LOCK] worker */
00788     lock_basic_sleep(condition, lock, timeout);
00789     /* [UNLOCK] worker */
00790     lock_basic_unlock(lock);
00791     return;
00792 }
00793 
00794 
00799 void
00800 worker_wait_timeout_locked(lock_basic_type* lock, cond_basic_type* condition,
00801     time_t timeout)
00802 {
00803     lock_basic_sleep(condition, lock, timeout);
00804     return;
00805 }
00806 
00807 
00812 void
00813 worker_wait(lock_basic_type* lock, cond_basic_type* condition)
00814 {
00815     worker_wait_timeout(lock, condition, 0);
00816     return;
00817 }
00818 
00819 
00824 void
00825 worker_notify(lock_basic_type* lock, cond_basic_type* condition)
00826 {
00827     lock_basic_lock(lock);
00828     /* [LOCK] lock */
00829     lock_basic_alarm(condition);
00830     /* [UNLOCK] lock */
00831     lock_basic_unlock(lock);
00832     return;
00833 }
00834 
00835 
00840 void
00841 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition)
00842 {
00843     lock_basic_lock(lock);
00844     /* [LOCK] lock */
00845     lock_basic_broadcast(condition);
00846     /* [UNLOCK] lock */
00847     lock_basic_unlock(lock);
00848     return;
00849 }
00850 
00851 
00856 void
00857 worker_cleanup(worker_type* worker)
00858 {
00859     allocator_type* allocator;
00860     cond_basic_type worker_cond;
00861     lock_basic_type worker_lock;
00862 
00863     if (!worker) {
00864         return;
00865     }
00866     allocator = worker->allocator;
00867     worker_cond = worker->worker_alarm;
00868     worker_lock = worker->worker_lock;
00869 
00870     allocator_deallocate(allocator, (void*) worker);
00871     lock_basic_destroy(&worker_lock);
00872     lock_basic_off(&worker_cond);
00873     return;
00874 }