OpenDNSSEC-signer
1.3.8
|
00001 /* 00002 * $Id: worker.c 6290 2012-04-25 07:42:07Z matthijs $ 00003 * 00004 * Copyright (c) 2009 NLNet Labs. All rights reserved. 00005 * 00006 * Redistribution and use in source and binary forms, with or without 00007 * modification, are permitted provided that the following conditions 00008 * are met: 00009 * 1. Redistributions of source code must retain the above copyright 00010 * notice, this list of conditions and the following disclaimer. 00011 * 2. Redistributions in binary form must reproduce the above copyright 00012 * notice, this list of conditions and the following disclaimer in the 00013 * documentation and/or other materials provided with the distribution. 00014 * 00015 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 00016 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00017 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00018 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY 00019 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 00020 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE 00021 * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00022 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER 00023 * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 00024 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN 00025 * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00026 * 00027 */ 00028 00034 #include "adapter/adapi.h" 00035 #include "daemon/engine.h" 00036 #include "daemon/worker.h" 00037 #include "shared/allocator.h" 00038 #include "scheduler/schedule.h" 00039 #include "scheduler/task.h" 00040 #include "shared/hsm.h" 00041 #include "shared/locks.h" 00042 #include "shared/log.h" 00043 #include "shared/status.h" 00044 #include "shared/util.h" 00045 #include "signer/tools.h" 00046 #include "signer/zone.h" 00047 #include "signer/zonedata.h" 00048 00049 #include <time.h> /* time() */ 00050 00051 ods_lookup_table worker_str[] = { 00052 { WORKER_WORKER, "worker" }, 00053 { WORKER_DRUDGER, "drudger" }, 00054 { 0, NULL } 00055 }; 00056 00057 00062 worker_type* 00063 worker_create(allocator_type* allocator, int num, worker_id type) 00064 { 00065 worker_type* worker; 00066 00067 if (!allocator) { 00068 return NULL; 00069 } 00070 ods_log_assert(allocator); 00071 00072 worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type)); 00073 if (!worker) { 00074 return NULL; 00075 } 00076 00077 ods_log_debug("create worker[%i]", num +1); 00078 lock_basic_init(&worker->worker_lock); 00079 lock_basic_set(&worker->worker_alarm); 00080 lock_basic_lock(&worker->worker_lock); 00081 worker->allocator = allocator; 00082 worker->thread_num = num +1; 00083 worker->engine = NULL; 00084 worker->task = NULL; 00085 worker->working_with = TASK_NONE; 00086 worker->need_to_exit = 0; 00087 worker->type = type; 00088 worker->clock_in = 0; 00089 worker->jobs_appointed = 0; 00090 worker->jobs_completed = 0; 00091 worker->jobs_failed = 0; 00092 worker->sleeping = 0; 00093 worker->waiting = 0; 00094 lock_basic_unlock(&worker->worker_lock); 00095 return worker; 00096 } 00097 00098 00103 static const char* 00104 worker2str(worker_id type) 00105 { 00106 ods_lookup_table *lt = ods_lookup_by_id(worker_str, type); 00107 if (lt) { 00108 return lt->name; 00109 } 00110 return NULL; 00111 } 00112 00113 00118 static int 00119 worker_fulfilled(worker_type* worker) 00120 { 00121 return (worker->jobs_completed + worker->jobs_failed) == 00122 worker->jobs_appointed; 00123 } 00124 00125 00130 static void 00131 worker_perform_task(worker_type* worker) 00132 { 00133 engine_type* engine = NULL; 00134 zone_type* zone = NULL; 00135 task_type* task = NULL; 00136 task_id what = TASK_NONE; 00137 time_t when = 0; 00138 time_t never = (3600*24*365); 00139 ods_status status = ODS_STATUS_OK; 00140 int fallthrough = 0; 00141 int backup = 0; 00142 char* working_dir = NULL; 00143 char* cfg_filename = NULL; 00144 uint32_t tmpserial = 0; 00145 time_t start = 0; 00146 time_t end = 0; 00147 00148 /* sanity checking */ 00149 if (!worker || !worker->task || !worker->task->zone || !worker->engine) { 00150 return; 00151 } 00152 ods_log_assert(worker); 00153 ods_log_assert(worker->task); 00154 ods_log_assert(worker->task->zone); 00155 00156 engine = (engine_type*) worker->engine; 00157 task = (task_type*) worker->task; 00158 zone = (zone_type*) worker->task->zone; 00159 ods_log_debug("[%s[%i]] perform task %s for zone %s at %u", 00160 worker2str(worker->type), worker->thread_num, task_what2str(task->what), 00161 task_who2str(task->who), (uint32_t) worker->clock_in); 00162 00163 /* do what you have been told to do */ 00164 switch (task->what) { 00165 case TASK_SIGNCONF: 00166 worker->working_with = TASK_SIGNCONF; 00167 /* perform 'load signconf' task */ 00168 ods_log_verbose("[%s[%i]] load signconf for zone %s", 00169 worker2str(worker->type), worker->thread_num, 00170 task_who2str(task->who)); 00171 status = zone_load_signconf(zone, &what); 00172 if (status == ODS_STATUS_UNCHANGED) { 00173 if (!zone->signconf->last_modified) { 00174 ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet", 00175 worker2str(worker->type), worker->thread_num, 00176 task_who2str(task->who)); 00177 status = ODS_STATUS_ERR; 00178 } 00179 } 00180 00181 /* what to do next */ 00182 when = time_now(); 00183 if (status == ODS_STATUS_UNCHANGED) { 00184 if (task->halted != TASK_NONE) { 00185 goto task_perform_continue; 00186 } else { 00187 status = ODS_STATUS_OK; 00188 } 00189 } 00190 00191 if (status == ODS_STATUS_OK) { 00196 lhsm_check_connection((void*)engine); 00197 status = zone_publish_dnskeys(zone, 0); 00198 } 00199 if (status == ODS_STATUS_OK) { 00200 status = zone_prepare_nsec3(zone, 0); 00201 } 00202 if (status == ODS_STATUS_OK) { 00203 status = zonedata_commit(zone->zonedata); 00204 } 00205 00206 if (status == ODS_STATUS_OK) { 00207 zone->prepared = 1; 00208 task->interrupt = TASK_NONE; 00209 task->halted = TASK_NONE; 00210 } else { 00211 if (task->halted == TASK_NONE) { 00212 goto task_perform_fail; 00213 } 00214 goto task_perform_continue; 00215 } 00216 fallthrough = 0; 00217 break; 00218 case TASK_READ: 00219 worker->working_with = TASK_READ; 00220 /* perform 'read input adapter' task */ 00221 ods_log_verbose("[%s[%i]] read zone %s", 00222 worker2str(worker->type), worker->thread_num, 00223 task_who2str(task->who)); 00224 if (!zone->prepared) { 00225 ods_log_debug("[%s[%i]] no valid signconf.xml for zone %s yet", 00226 worker2str(worker->type), worker->thread_num, 00227 task_who2str(task->who)); 00228 status = ODS_STATUS_ERR; 00229 } else { 00230 status = tools_input(zone); 00231 } 00232 00233 /* what to do next */ 00234 what = TASK_NSECIFY; 00235 when = time_now(); 00236 if (status != ODS_STATUS_OK) { 00237 if (task->halted == TASK_NONE) { 00238 goto task_perform_fail; 00239 } 00240 goto task_perform_continue; 00241 } 00242 fallthrough = 1; 00243 case TASK_NSECIFY: 00244 worker->working_with = TASK_NSECIFY; 00245 ods_log_verbose("[%s[%i]] nsecify zone %s", 00246 worker2str(worker->type), worker->thread_num, 00247 task_who2str(task->who)); 00248 status = tools_nsecify(zone); 00249 00250 /* what to do next */ 00251 what = TASK_SIGN; 00252 when = time_now(); 00253 if (status == ODS_STATUS_OK) { 00254 if (task->interrupt > TASK_SIGNCONF) { 00255 task->interrupt = TASK_NONE; 00256 task->halted = TASK_NONE; 00257 } 00258 } else { 00259 if (task->halted == TASK_NONE) { 00260 goto task_perform_fail; 00261 } 00262 goto task_perform_continue; 00263 } 00264 fallthrough = 1; 00265 case TASK_SIGN: 00266 worker->working_with = TASK_SIGN; 00267 ods_log_verbose("[%s[%i]] sign zone %s", 00268 worker2str(worker->type), worker->thread_num, 00269 task_who2str(task->who)); 00270 tmpserial = zone->zonedata->internal_serial; 00271 status = zone_update_serial(zone); 00272 if (status != ODS_STATUS_OK) { 00273 ods_log_error("[%s[%i]] unable to sign zone %s: " 00274 "failed to increment serial", 00275 worker2str(worker->type), worker->thread_num, 00276 task_who2str(task->who)); 00277 } else { 00278 /* start timer */ 00279 start = time(NULL); 00280 if (zone->stats) { 00281 lock_basic_lock(&zone->stats->stats_lock); 00282 if (!zone->stats->start_time) { 00283 zone->stats->start_time = start; 00284 } 00285 zone->stats->sig_count = 0; 00286 zone->stats->sig_soa_count = 0; 00287 zone->stats->sig_reuse = 0; 00288 zone->stats->sig_time = 0; 00289 lock_basic_unlock(&zone->stats->stats_lock); 00290 } 00291 /* check the HSM connection before queuing sign operations */ 00292 lhsm_check_connection((void*)engine); 00293 /* queue menial, hard signing work */ 00294 status = zonedata_queue(zone->zonedata, engine->signq, worker); 00295 ods_log_debug("[%s[%i]] wait until drudgers are finished " 00296 "signing zone %s, %u signatures queued", 00297 worker2str(worker->type), worker->thread_num, 00298 task_who2str(task->who), worker->jobs_appointed); 00299 /* sleep until work is done */ 00300 if (!worker->need_to_exit) { 00301 worker_sleep_unless(worker, 0); 00302 } 00303 if (worker->jobs_failed) { 00304 ods_log_error("[%s[%i]] sign zone %s failed: %u of %u " 00305 "signatures failed", worker2str(worker->type), 00306 worker->thread_num, task_who2str(task->who), 00307 worker->jobs_failed, worker->jobs_appointed); 00308 status = ODS_STATUS_ERR; 00309 } else if (!worker_fulfilled(worker)) { 00310 ods_log_error("[%s[%i]] sign zone %s failed: %u of %u " 00311 "signatures completed", worker2str(worker->type), 00312 worker->thread_num, task_who2str(task->who), 00313 worker->jobs_completed, worker->jobs_appointed); 00314 status = ODS_STATUS_ERR; 00315 } else if (worker->need_to_exit) { 00316 ods_log_debug("[%s[%i]] sign zone %s failed: worker " 00317 "needs to exit", worker2str(worker->type), 00318 worker->thread_num, task_who2str(task->who)); 00319 status = ODS_STATUS_ERR; 00320 } else { 00321 ods_log_debug("[%s[%i]] sign zone %s ok: %u of %u " 00322 "signatures succeeded", worker2str(worker->type), 00323 worker->thread_num, task_who2str(task->who), 00324 worker->jobs_completed, worker->jobs_appointed); 00325 ods_log_assert(worker->jobs_appointed == 00326 worker->jobs_completed); 00327 } 00328 worker->jobs_appointed = 0; 00329 worker->jobs_completed = 0; 00330 worker->jobs_failed = 0; 00331 /* stop timer */ 00332 end = time(NULL); 00333 if (status == ODS_STATUS_OK && zone->stats) { 00334 lock_basic_lock(&zone->stats->stats_lock); 00335 zone->stats->sig_time = (end-start); 00336 lock_basic_unlock(&zone->stats->stats_lock); 00337 } 00338 } 00339 00340 /* what to do next */ 00341 if (status != ODS_STATUS_OK) { 00342 /* rollback serial */ 00343 zone->zonedata->internal_serial = tmpserial; 00344 if (task->halted == TASK_NONE) { 00345 goto task_perform_fail; 00346 } 00347 goto task_perform_continue; 00348 } else { 00349 if (task->interrupt > TASK_SIGNCONF) { 00350 task->interrupt = TASK_NONE; 00351 task->halted = TASK_NONE; 00352 } 00353 } 00354 what = TASK_AUDIT; 00355 when = time_now(); 00356 fallthrough = 1; 00357 case TASK_AUDIT: 00358 worker->working_with = TASK_AUDIT; 00359 if (zone->signconf->audit) { 00360 ods_log_verbose("[%s[%i]] audit zone %s", 00361 worker2str(worker->type), worker->thread_num, 00362 task_who2str(task->who)); 00363 working_dir = strdup(engine->config->working_dir); 00364 cfg_filename = strdup(engine->config->cfg_filename); 00365 status = tools_audit(zone, working_dir, cfg_filename); 00366 if (working_dir) { free((void*)working_dir); } 00367 if (cfg_filename) { free((void*)cfg_filename); } 00368 working_dir = NULL; 00369 cfg_filename = NULL; 00370 } else { 00371 status = ODS_STATUS_OK; 00372 } 00373 00374 /* what to do next */ 00375 if (status != ODS_STATUS_OK) { 00376 if (task->halted == TASK_NONE) { 00377 goto task_perform_fail; 00378 } 00379 goto task_perform_continue; 00380 } 00381 what = TASK_WRITE; 00382 when = time_now(); 00383 fallthrough = 1; 00384 case TASK_WRITE: 00385 worker->working_with = TASK_WRITE; 00386 ods_log_verbose("[%s[%i]] write zone %s", 00387 worker2str(worker->type), worker->thread_num, 00388 task_who2str(task->who)); 00389 00390 status = tools_output(zone); 00391 zone->processed = 1; 00392 00393 /* what to do next */ 00394 if (status != ODS_STATUS_OK) { 00395 if (task->halted == TASK_NONE) { 00396 goto task_perform_fail; 00397 } 00398 goto task_perform_continue; 00399 } else { 00400 if (task->interrupt > TASK_SIGNCONF) { 00401 task->interrupt = TASK_NONE; 00402 task->halted = TASK_NONE; 00403 } 00404 } 00405 if (duration2time(zone->signconf->sig_resign_interval)) { 00406 what = TASK_SIGN; 00407 when = time_now() + 00408 duration2time(zone->signconf->sig_resign_interval); 00409 } else { 00410 what = TASK_NONE; 00411 when = time_now() + never; 00412 } 00413 backup = 1; 00414 fallthrough = 0; 00415 break; 00416 case TASK_NONE: 00417 worker->working_with = TASK_NONE; 00418 ods_log_warning("[%s[%i]] none task for zone %s", 00419 worker2str(worker->type), worker->thread_num, 00420 task_who2str(task->who)); 00421 when = time_now() + never; 00422 fallthrough = 0; 00423 break; 00424 default: 00425 ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s", 00426 worker2str(worker->type), worker->thread_num, 00427 task_who2str(task->who)); 00428 what = TASK_SIGNCONF; 00429 when = time_now(); 00430 fallthrough = 0; 00431 break; 00432 } 00433 00434 /* no error, reset backoff */ 00435 task->backoff = 0; 00436 00437 /* set next task */ 00438 if (fallthrough == 0 && task->interrupt != TASK_NONE && 00439 task->interrupt != what) { 00440 ods_log_debug("[%s[%i]] interrupt task %s for zone %s", 00441 worker2str(worker->type), worker->thread_num, 00442 task_what2str(what), task_who2str(task->who)); 00443 00444 task->what = task->interrupt; 00445 task->when = time_now(); 00446 task->halted = what; 00447 } else { 00448 ods_log_debug("[%s[%i]] next task %s for zone %s", 00449 worker2str(worker->type), worker->thread_num, 00450 task_what2str(what), task_who2str(task->who)); 00451 00452 task->what = what; 00453 task->when = when; 00454 if (!fallthrough) { 00455 task->interrupt = TASK_NONE; 00456 task->halted = TASK_NONE; 00457 } 00458 } 00459 00460 /* backup the last successful run */ 00461 if (backup) { 00462 status = zone_backup(zone); 00463 if (status != ODS_STATUS_OK) { 00464 ods_log_warning("[%s[%i]] unable to backup zone %s: %s", 00465 worker2str(worker->type), worker->thread_num, 00466 task_who2str(task->who), ods_status2str(status)); 00467 /* just a warning */ 00468 status = ODS_STATUS_OK; 00469 } 00470 backup = 0; 00471 } 00472 return; 00473 00474 task_perform_fail: 00475 /* in case of failure, also mark zone processed (for single run usage) */ 00476 zone->processed = 1; 00477 00478 if (task->backoff) { 00479 task->backoff *= 2; 00480 if (task->backoff > ODS_SE_MAX_BACKOFF) { 00481 task->backoff = ODS_SE_MAX_BACKOFF; 00482 } 00483 } else { 00484 task->backoff = 60; 00485 } 00486 ods_log_info("[%s[%i]] backoff task %s for zone %s with %u seconds", 00487 worker2str(worker->type), worker->thread_num, 00488 task_what2str(task->what), task_who2str(task->who), task->backoff); 00489 00490 task->when = time_now() + task->backoff; 00491 return; 00492 00493 task_perform_continue: 00494 ods_log_info("[%s[%i]] continue task %s for zone %s", 00495 worker2str(worker->type), worker->thread_num, 00496 task_what2str(task->halted), task_who2str(task->who)); 00497 00498 what = task->halted; 00499 task->what = what; 00500 task->when = time_now(); 00501 task->interrupt = TASK_NONE; 00502 task->halted = TASK_NONE; 00503 if (zone->processed) { 00504 task->when += duration2time(zone->signconf->sig_resign_interval); 00505 } 00506 return; 00507 } 00508 00509 00514 static void 00515 worker_work(worker_type* worker) 00516 { 00517 time_t now, timeout = 1; 00518 zone_type* zone = NULL; 00519 00520 ods_log_assert(worker); 00521 ods_log_assert(worker->type == WORKER_WORKER); 00522 00523 while (worker->need_to_exit == 0) { 00524 ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type), 00525 worker->thread_num); 00526 lock_basic_lock(&worker->engine->taskq->schedule_lock); 00527 /* [LOCK] schedule */ 00528 worker->task = schedule_pop_task(worker->engine->taskq); 00529 /* [UNLOCK] schedule */ 00530 if (worker->task) { 00531 worker->working_with = worker->task->what; 00532 lock_basic_unlock(&worker->engine->taskq->schedule_lock); 00533 00534 zone = worker->task->zone; 00535 lock_basic_lock(&zone->zone_lock); 00536 /* [LOCK] zone */ 00537 ods_log_debug("[%s[%i]] start working on zone %s", 00538 worker2str(worker->type), worker->thread_num, zone->name); 00539 00540 worker->clock_in = time(NULL); 00541 worker_perform_task(worker); 00542 00543 zone->task = worker->task; 00544 00545 ods_log_debug("[%s[%i]] finished working on zone %s", 00546 worker2str(worker->type), worker->thread_num, zone->name); 00547 /* [UNLOCK] zone */ 00548 00549 lock_basic_lock(&worker->engine->taskq->schedule_lock); 00550 /* [LOCK] zone, schedule */ 00551 worker->task = NULL; 00552 worker->working_with = TASK_NONE; 00553 (void) schedule_task(worker->engine->taskq, zone->task, 1); 00554 /* [UNLOCK] zone, schedule */ 00555 lock_basic_unlock(&worker->engine->taskq->schedule_lock); 00556 lock_basic_unlock(&zone->zone_lock); 00557 00558 timeout = 1; 00559 } else { 00560 ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type), 00561 worker->thread_num); 00562 00563 /* [LOCK] schedule */ 00564 worker->task = schedule_get_first_task(worker->engine->taskq); 00565 /* [UNLOCK] schedule */ 00566 lock_basic_unlock(&worker->engine->taskq->schedule_lock); 00567 00568 now = time_now(); 00569 if (worker->task && !worker->engine->taskq->loading) { 00570 timeout = (worker->task->when - now); 00571 } else { 00572 timeout *= 2; 00573 if (timeout > ODS_SE_MAX_BACKOFF) { 00574 timeout = ODS_SE_MAX_BACKOFF; 00575 } 00576 } 00577 worker->task = NULL; 00578 worker_sleep(worker, timeout); 00579 } 00580 } 00581 /* stop worker, wipe queue */ 00582 fifoq_wipe(worker->engine->signq); 00583 return; 00584 } 00585 00586 00591 static void 00592 worker_drudge(worker_type* worker) 00593 { 00594 zone_type* zone = NULL; 00595 rrset_type* rrset = NULL; 00596 ods_status status = ODS_STATUS_OK; 00597 worker_type* chief = NULL; 00598 hsm_ctx_t* ctx = NULL; 00599 00600 ods_log_assert(worker); 00601 ods_log_assert(worker->type == WORKER_DRUDGER); 00602 00603 ods_log_debug("[%s[%i]] create hsm context", 00604 worker2str(worker->type), worker->thread_num); 00605 ctx = hsm_create_context(); 00606 if (!ctx) { 00607 ods_log_crit("[%s[%i]] error creating libhsm context", 00608 worker2str(worker->type), worker->thread_num); 00609 } 00610 00611 while (worker->need_to_exit == 0) { 00612 ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type), 00613 worker->thread_num); 00614 chief = NULL; 00615 zone = NULL; 00616 00617 lock_basic_lock(&worker->engine->signq->q_lock); 00618 /* [LOCK] schedule */ 00619 rrset = (rrset_type*) fifoq_pop(worker->engine->signq, &chief); 00620 /* [UNLOCK] schedule */ 00621 lock_basic_unlock(&worker->engine->signq->q_lock); 00622 if (rrset) { 00623 /* set up the work */ 00624 if (chief && chief->task) { 00625 zone = chief->task->zone; 00626 } 00627 if (!zone) { 00628 ods_log_error("[%s[%i]] unable to drudge: no zone reference", 00629 worker2str(worker->type), worker->thread_num); 00630 } 00631 if (zone && ctx) { 00632 ods_log_assert(rrset); 00633 ods_log_assert(zone->dname); 00634 ods_log_assert(zone->signconf); 00635 00636 worker->clock_in = time(NULL); 00637 status = rrset_sign(ctx, rrset, zone->dname, zone->signconf, 00638 chief->clock_in, zone->stats); 00639 } else { 00640 status = ODS_STATUS_ASSERT_ERR; 00641 } 00642 00643 if (chief) { 00644 lock_basic_lock(&chief->worker_lock); 00645 if (status == ODS_STATUS_OK) { 00646 chief->jobs_completed += 1; 00647 } else { 00648 chief->jobs_failed += 1; 00649 /* destroy context? */ 00650 } 00651 lock_basic_unlock(&chief->worker_lock); 00652 00653 if (worker_fulfilled(chief) && chief->sleeping) { 00654 ods_log_debug("[%s[%i]] wake up chief[%u], work is done", 00655 worker2str(worker->type), worker->thread_num, 00656 chief->thread_num); 00657 worker_wakeup(chief); 00658 chief = NULL; 00659 } 00660 } 00661 rrset = NULL; 00662 } else { 00663 ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type), 00664 worker->thread_num); 00665 00666 worker_wait(&worker->engine->signq->q_lock, 00667 &worker->engine->signq->q_threshold); 00668 } 00669 } 00670 /* stop drudger */ 00671 00672 if (chief && chief->sleeping) { 00673 /* wake up chief */ 00674 ods_log_debug("[%s[%i]] wake up chief[%u], i am exiting", 00675 worker2str(worker->type), worker->thread_num, chief->thread_num); 00676 worker_wakeup(chief); 00677 } 00678 if (ctx) { 00679 /* cleanup open HSM sessions */ 00680 ods_log_debug("[%s[%i]] destroy hsm context", 00681 worker2str(worker->type), worker->thread_num); 00682 hsm_destroy_context(ctx); 00683 } 00684 return; 00685 } 00686 00687 00692 void 00693 worker_start(worker_type* worker) 00694 { 00695 ods_log_assert(worker); 00696 switch (worker->type) { 00697 case WORKER_DRUDGER: 00698 worker_drudge(worker); 00699 break; 00700 case WORKER_WORKER: 00701 worker_work(worker); 00702 break; 00703 default: 00704 ods_log_error("[worker] illegal worker (id=%i)", worker->type); 00705 return; 00706 } 00707 return; 00708 } 00709 00710 00715 void 00716 worker_sleep(worker_type* worker, time_t timeout) 00717 { 00718 ods_log_assert(worker); 00719 lock_basic_lock(&worker->worker_lock); 00720 /* [LOCK] worker */ 00721 worker->sleeping = 1; 00722 lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock, 00723 timeout); 00724 /* [UNLOCK] worker */ 00725 lock_basic_unlock(&worker->worker_lock); 00726 return; 00727 } 00728 00729 00734 void 00735 worker_sleep_unless(worker_type* worker, time_t timeout) 00736 { 00737 ods_log_assert(worker); 00738 lock_basic_lock(&worker->worker_lock); 00739 /* [LOCK] worker */ 00740 while (!worker->need_to_exit && !worker_fulfilled(worker)) { 00741 worker->sleeping = 1; 00742 lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock, 00743 timeout); 00744 00745 ods_log_debug("[%s[%i]] somebody poked me, check completed jobs %u " 00746 "appointed, %u completed, %u failed", worker2str(worker->type), 00747 worker->thread_num, worker->jobs_appointed, worker->jobs_completed, 00748 worker->jobs_failed); 00749 } 00750 /* [UNLOCK] worker */ 00751 lock_basic_unlock(&worker->worker_lock); 00752 return; 00753 } 00754 00755 00760 void 00761 worker_wakeup(worker_type* worker) 00762 { 00763 ods_log_assert(worker); 00764 if (worker->sleeping) { 00765 ods_log_debug("[%s[%i]] wake up", worker2str(worker->type), 00766 worker->thread_num); 00767 lock_basic_lock(&worker->worker_lock); 00768 /* [LOCK] worker */ 00769 lock_basic_alarm(&worker->worker_alarm); 00770 worker->sleeping = 0; 00771 /* [UNLOCK] worker */ 00772 lock_basic_unlock(&worker->worker_lock); 00773 } 00774 return; 00775 } 00776 00777 00782 void 00783 worker_wait_timeout(lock_basic_type* lock, cond_basic_type* condition, 00784 time_t timeout) 00785 { 00786 lock_basic_lock(lock); 00787 /* [LOCK] worker */ 00788 lock_basic_sleep(condition, lock, timeout); 00789 /* [UNLOCK] worker */ 00790 lock_basic_unlock(lock); 00791 return; 00792 } 00793 00794 00799 void 00800 worker_wait_timeout_locked(lock_basic_type* lock, cond_basic_type* condition, 00801 time_t timeout) 00802 { 00803 lock_basic_sleep(condition, lock, timeout); 00804 return; 00805 } 00806 00807 00812 void 00813 worker_wait(lock_basic_type* lock, cond_basic_type* condition) 00814 { 00815 worker_wait_timeout(lock, condition, 0); 00816 return; 00817 } 00818 00819 00824 void 00825 worker_notify(lock_basic_type* lock, cond_basic_type* condition) 00826 { 00827 lock_basic_lock(lock); 00828 /* [LOCK] lock */ 00829 lock_basic_alarm(condition); 00830 /* [UNLOCK] lock */ 00831 lock_basic_unlock(lock); 00832 return; 00833 } 00834 00835 00840 void 00841 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition) 00842 { 00843 lock_basic_lock(lock); 00844 /* [LOCK] lock */ 00845 lock_basic_broadcast(condition); 00846 /* [UNLOCK] lock */ 00847 lock_basic_unlock(lock); 00848 return; 00849 } 00850 00851 00856 void 00857 worker_cleanup(worker_type* worker) 00858 { 00859 allocator_type* allocator; 00860 cond_basic_type worker_cond; 00861 lock_basic_type worker_lock; 00862 00863 if (!worker) { 00864 return; 00865 } 00866 allocator = worker->allocator; 00867 worker_cond = worker->worker_alarm; 00868 worker_lock = worker->worker_lock; 00869 00870 allocator_deallocate(allocator, (void*) worker); 00871 lock_basic_destroy(&worker_lock); 00872 lock_basic_off(&worker_cond); 00873 return; 00874 }