28 #include <qb/qbarray.h>
30 struct mainloop_child_s {
53 crm_trigger_prepare(GSource * source, gint * timeout)
80 crm_trigger_check(GSource * source)
88 crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
97 trig->trigger = FALSE;
100 rc = callback(trig->user_data);
102 crm_trace(
"Trigger handler %p not yet complete", trig);
103 trig->running = TRUE;
111 crm_trigger_finalize(GSource * source)
113 crm_trace(
"Trigger %p destroyed", source);
119 gpointer callback_data;
120 GSourceCallbackFuncs *callback_funcs;
122 const GSourceFuncs *source_funcs;
125 GMainContext *context;
142 g_source_refcount(GSource * source)
146 struct _GSourceCopy *evil = (
struct _GSourceCopy*)source;
147 return evil->ref_count;
152 static int g_source_refcount(GSource * source)
158 static GSourceFuncs crm_trigger_funcs = {
161 crm_trigger_dispatch,
162 crm_trigger_finalize,
166 mainloop_setup_trigger(GSource * source,
int priority,
int (*dispatch) (gpointer user_data),
174 trigger->trigger = FALSE;
175 trigger->user_data = userdata;
178 g_source_set_callback(source, dispatch, trigger, NULL);
181 g_source_set_priority(source, priority);
182 g_source_set_can_recurse(source, FALSE);
184 crm_trace(
"Setup %p with ref-count=%u", source, g_source_refcount(source));
185 trigger->id = g_source_attach(source, NULL);
186 crm_trace(
"Attached %p with ref-count=%u", source, g_source_refcount(source));
194 crm_trace(
"Trigger handler %p complete", trig);
195 trig->running = FALSE;
206 GSource *source = NULL;
209 source = g_source_new(&crm_trigger_funcs,
sizeof(
crm_trigger_t));
212 return mainloop_setup_trigger(source, priority, dispatch, userdata);
219 source->trigger = TRUE;
232 gs = (GSource *)source;
234 if(g_source_refcount(gs) > 2) {
235 crm_info(
"Trigger %p is still referenced %u times", gs, g_source_refcount(gs));
238 g_source_destroy(gs);
253 typedef struct signal_s {
255 void (*handler) (
int sig);
274 crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
278 if(sig->signal != SIGCHLD) {
280 strsignal(sig->signal), sig->signal,
281 (sig->handler?
"invoking" :
"no"));
284 sig->trigger.trigger = FALSE;
286 sig->handler(sig->signal);
301 mainloop_signal_handler(
int sig)
303 if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
309 static GSourceFuncs crm_signal_funcs = {
313 crm_trigger_finalize,
333 struct sigaction old;
335 if (sigemptyset(&mask) < 0) {
336 crm_err(
"Could not set handler for signal %d: %s",
341 memset(&sa, 0,
sizeof(
struct sigaction));
342 sa.sa_handler = dispatch;
343 sa.sa_flags = SA_RESTART;
346 if (sigaction(sig, &sa, &old) < 0) {
347 crm_err(
"Could not set handler for signal %d: %s",
351 return old.sa_handler;
365 mainloop_destroy_signal_entry(
int sig)
369 crm_signals[sig] = NULL;
389 GSource *source = NULL;
390 int priority = G_PRIORITY_HIGH - 1;
392 if (sig == SIGTERM) {
400 if (sig >= NSIG || sig < 0) {
401 crm_err(
"Signal %d is out of range", sig);
404 }
else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
405 crm_trace(
"Signal handler for %d is already installed", sig);
408 }
else if (crm_signals[sig] != NULL) {
409 crm_err(
"Different signal handler for %d is already installed", sig);
414 source = g_source_new(&crm_signal_funcs,
sizeof(
crm_signal_t));
416 crm_signals[sig] = (
crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
419 crm_signals[sig]->handler = dispatch;
420 crm_signals[sig]->signal = sig;
423 mainloop_destroy_signal_entry(sig);
432 if (siginterrupt(sig, 1) < 0) {
433 crm_perror(LOG_INFO,
"Could not enable system call interruptions for signal %d", sig);
443 if (sig >= NSIG || sig < 0) {
444 crm_err(
"Signal %d is out of range", sig);
448 crm_perror(LOG_ERR,
"Could not uninstall signal handler for signal %d", sig);
451 }
else if (crm_signals[sig] == NULL) {
454 mainloop_destroy_signal_entry(sig);
458 static qb_array_t *gio_map = NULL;
464 qb_array_free(gio_map);
467 for (
int sig = 0; sig < NSIG; ++sig) {
468 mainloop_destroy_signal_entry(sig);
475 struct gio_to_qb_poll {
480 qb_ipcs_dispatch_fn_t fn;
481 enum qb_loop_priority p;
485 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer
data)
487 struct gio_to_qb_poll *adaptor = (
struct gio_to_qb_poll *)
data;
488 gint fd = g_io_channel_unix_get_fd(gio);
496 return (adaptor->fn(fd, condition, adaptor->data) == 0);
500 gio_poll_destroy(gpointer
data)
502 struct gio_to_qb_poll *adaptor = (
struct gio_to_qb_poll *)
data;
507 if (adaptor->is_used == 0) {
508 crm_trace(
"Marking adaptor %p unused", adaptor);
522 conv_prio_libqb2glib(
enum qb_loop_priority prio)
524 gint ret = G_PRIORITY_DEFAULT;
527 ret = G_PRIORITY_LOW;
530 ret = G_PRIORITY_HIGH;
533 crm_trace(
"Invalid libqb's loop priority %d, assuming QB_LOOP_MED",
550 static enum qb_ipcs_rate_limit
551 conv_libqb_prio2ratelimit(
enum qb_loop_priority prio)
554 enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL;
557 ret = QB_IPCS_RATE_SLOW;
560 ret = QB_IPCS_RATE_FAST;
563 crm_trace(
"Invalid libqb's loop priority %d, assuming QB_LOOP_MED",
573 gio_poll_dispatch_update(
enum qb_loop_priority p, int32_t fd, int32_t evts,
574 void *
data, qb_ipcs_dispatch_fn_t fn, int32_t add)
576 struct gio_to_qb_poll *adaptor;
580 res = qb_array_index(gio_map, fd, (
void **)&adaptor);
582 crm_err(
"Array lookup failed for fd=%d: %d", fd, res);
586 crm_trace(
"Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
588 if (add && adaptor->source) {
589 crm_err(
"Adaptor for descriptor %d is still in-use", fd);
592 if (!add && !adaptor->is_used) {
593 crm_err(
"Adaptor for descriptor %d is not in-use", fd);
598 channel = g_io_channel_unix_new(fd);
600 crm_err(
"No memory left to add fd=%d", fd);
604 if (adaptor->source) {
605 g_source_remove(adaptor->source);
610 evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
613 adaptor->events = evts;
614 adaptor->data =
data;
618 g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
619 gio_read_socket, adaptor, gio_poll_destroy);
630 g_io_channel_unref(channel);
632 crm_trace(
"Added to mainloop with gsource id=%d", adaptor->source);
633 if (adaptor->source > 0) {
641 gio_poll_dispatch_add(
enum qb_loop_priority p, int32_t fd, int32_t evts,
642 void *
data, qb_ipcs_dispatch_fn_t fn)
644 return gio_poll_dispatch_update(p, fd, evts,
data, fn, QB_TRUE);
648 gio_poll_dispatch_mod(
enum qb_loop_priority p, int32_t fd, int32_t evts,
649 void *
data, qb_ipcs_dispatch_fn_t fn)
651 return gio_poll_dispatch_update(p, fd, evts,
data, fn, QB_FALSE);
655 gio_poll_dispatch_del(int32_t fd)
657 struct gio_to_qb_poll *adaptor;
660 if (qb_array_index(gio_map, fd, (
void **)&adaptor) == 0) {
661 if (adaptor->source) {
662 g_source_remove(adaptor->source);
671 .dispatch_add = gio_poll_dispatch_add,
672 .dispatch_mod = gio_poll_dispatch_mod,
673 .dispatch_del = gio_poll_dispatch_del,
676 static enum qb_ipc_type
677 pick_ipc_type(
enum qb_ipc_type requested)
679 const char *env = getenv(
"PCMK_ipc_type");
681 if (env && strcmp(
"shared-mem", env) == 0) {
683 }
else if (env && strcmp(
"socket", env) == 0) {
684 return QB_IPC_SOCKET;
685 }
else if (env && strcmp(
"posix", env) == 0) {
686 return QB_IPC_POSIX_MQ;
687 }
else if (env && strcmp(
"sysv", env) == 0) {
688 return QB_IPC_SYSV_MQ;
689 }
else if (requested == QB_IPC_NATIVE) {
703 struct qb_ipcs_service_handlers *callbacks)
710 struct qb_ipcs_service_handlers *callbacks,
711 enum qb_loop_priority prio)
714 qb_ipcs_service_t *server = NULL;
716 if (gio_map == NULL) {
717 gio_map = qb_array_create_2(64,
sizeof(
struct gio_to_qb_poll), 1);
721 server = qb_ipcs_create(name, 0, pick_ipc_type(
type), callbacks);
723 if (server == NULL) {
728 if (prio != QB_LOOP_MED) {
729 qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
732 #ifdef HAVE_IPCS_GET_BUFFER_SIZE
739 rc = qb_ipcs_run(server);
752 qb_ipcs_destroy(server);
756 struct mainloop_io_s {
765 int (*dispatch_fn_ipc) (
const char *buffer, ssize_t length, gpointer userdata);
766 int (*dispatch_fn_io) (gpointer userdata);
767 void (*destroy_fn) (gpointer userdata);
772 mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer
data)
774 gboolean keep = TRUE;
777 CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));
779 if (condition & G_IO_IN) {
787 crm_trace(
"Message acquisition from %s[%p] failed: %s (%ld)",
790 }
else if (client->dispatch_fn_ipc) {
793 crm_trace(
"New message from %s[%p] = %ld (I/O condition=%d)", client->name, client, rc, condition);
794 if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
795 crm_trace(
"Connection to %s no longer required", client->name);
800 }
while (keep && rc > 0 && --max > 0);
803 crm_trace(
"New message from %s[%p] %u", client->name, client, condition);
804 if (client->dispatch_fn_io) {
805 if (client->dispatch_fn_io(client->userdata) < 0) {
806 crm_trace(
"Connection to %s no longer required", client->name);
814 crm_err(
"Connection to %s closed " CRM_XS "client=%p condition=%d",
815 client->name, client, condition);
818 }
else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
819 crm_trace(
"The connection %s[%p] has been closed (I/O condition=%d)",
820 client->name, client, condition);
823 }
else if ((condition & G_IO_IN) == 0) {
851 crm_err(
"Strange condition: %d", condition);
861 mainloop_gio_destroy(gpointer c)
864 char *c_name = strdup(client->name);
869 crm_trace(
"Destroying client %s[%p]", c_name, c);
875 if (client->destroy_fn) {
876 void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
878 client->destroy_fn = NULL;
879 destroy_fn(client->userdata);
889 crm_trace(
"Destroyed client %s[%p]", c_name, c);
891 free(client->name); client->name = NULL;
910 if (client == NULL) {
920 client->destroy_fn = callbacks->
destroy;
921 client->dispatch_fn_ipc = callbacks->
dispatch;
948 if (client == NULL) {
951 client->name = strdup(name);
952 client->userdata = userdata;
955 client->destroy_fn = callbacks->
destroy;
956 client->dispatch_fn_io = callbacks->
dispatch;
960 client->channel = g_io_channel_unix_new(fd);
962 g_io_add_watch_full(client->channel, priority,
963 (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
964 client, mainloop_gio_destroy);
975 g_io_channel_unref(client->channel);
976 crm_trace(
"Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
987 if (client != NULL) {
988 crm_trace(
"Removing client %s[%p]", client->name, client);
989 if (client->source) {
993 g_source_remove(client->source);
1015 return child->timeout;
1021 return child->privatedata;
1027 child->privatedata = NULL;
1034 if (child->timerid != 0) {
1035 crm_trace(
"Removing timer %d", child->timerid);
1036 g_source_remove(child->timerid);
1049 crm_debug(
"Kill pid %d only. leave group intact.", child->pid);
1050 rc = kill(child->pid, SIGKILL);
1052 crm_debug(
"Kill pid %d's group", child->pid);
1053 rc = kill(-child->pid, SIGKILL);
1057 if (errno != ESRCH) {
1058 crm_perror(LOG_ERR,
"kill(%d, KILL) failed", child->pid);
1066 child_timeout_callback(gpointer p)
1072 if (child->timeout) {
1073 crm_crit(
"%s process (PID %d) will not die!", child->desc, (
int)child->pid);
1077 rc = child_kill_helper(child);
1083 child->timeout = TRUE;
1084 crm_warn(
"%s process (PID %d) timed out", child->desc, (
int)child->pid);
1086 child->timerid = g_timeout_add(5000, child_timeout_callback, child);
1098 bool callback_needed =
true;
1100 rc = waitpid(child->pid, &status,
flags);
1102 crm_trace(
"Child process %d (%s) still active",
1103 child->pid, child->desc);
1104 callback_needed =
false;
1106 }
else if (rc != child->pid) {
1118 crm_notice(
"Wait for child process %d (%s) interrupted: %s",
1121 }
else if (WIFEXITED(status)) {
1122 exitcode = WEXITSTATUS(status);
1123 crm_trace(
"Child process %d (%s) exited with status %d",
1124 child->pid, child->desc, exitcode);
1126 }
else if (WIFSIGNALED(status)) {
1127 signo = WTERMSIG(status);
1128 crm_trace(
"Child process %d (%s) exited with signal %d (%s)",
1129 child->pid, child->desc, signo, strsignal(signo));
1131 #ifdef WCOREDUMP // AIX, SunOS, maybe others
1132 }
else if (WCOREDUMP(status)) {
1134 crm_err(
"Child process %d (%s) dumped core",
1135 child->pid, child->desc);
1139 crm_trace(
"Child process %d (%s) stopped or continued",
1140 child->pid, child->desc);
1141 callback_needed =
false;
1144 if (callback_needed && child->callback) {
1145 child->callback(child, child->pid, core, signo, exitcode);
1147 return callback_needed;
1151 child_death_dispatch(
int signal)
1153 for (GList *iter = child_list; iter; ) {
1154 GList *saved = iter;
1158 if (child_waitpid(child, WNOHANG)) {
1159 crm_trace(
"Removing completed process %d from child list",
1161 child_list = g_list_remove_link(child_list, saved);
1169 child_signal_init(gpointer p)
1176 child_death_dispatch(SIGCHLD);
1188 int waitflags = 0, rc = 0;
1190 for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1192 if (
pid == child->pid) {
1197 if (match == NULL) {
1201 rc = child_kill_helper(match);
1208 crm_trace(
"Waiting for signal that child process %d completed",
1212 }
else if(rc != 0) {
1216 waitflags = WNOHANG;
1219 if (!child_waitpid(match, waitflags)) {
1224 child_list = g_list_remove(child_list, match);
1240 static bool need_init = TRUE;
1245 child->timeout = FALSE;
1246 child->privatedata = privatedata;
1247 child->callback = callback;
1248 child->flags =
flags;
1251 child->desc = strdup(desc);
1255 child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
1258 child_list = g_list_append(child_list, child);
1266 g_timeout_add(1, child_signal_init, NULL);
1277 struct mainloop_timer_s {
1286 static gboolean mainloop_timer_cb(gpointer user_data)
1289 bool repeat = FALSE;
1290 struct mainloop_timer_s *t = user_data;
1300 crm_trace(
"Invoking callbacks for timer %s", t->name);
1302 if(t->cb(t->userdata) == FALSE) {
1303 crm_trace(
"Timer %s complete", t->name);
1318 if(t && t->id != 0) {
1327 if(t && t->period_ms > 0) {
1328 crm_trace(
"Starting timer %s", t->name);
1329 t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
1335 if(t && t->id != 0) {
1336 crm_trace(
"Stopping timer %s", t->name);
1337 g_source_remove(t->id);
1347 last = t->period_ms;
1348 t->period_ms = period_ms;
1351 if(t && t->id != 0 && last != t->period_ms) {
1370 t->period_ms = period_ms;
1373 t->userdata = userdata;
1374 crm_trace(
"Created timer %s with %p %p", t->name, userdata, t->userdata);
1383 crm_trace(
"Destroying timer %s", t->name);
1395 drain_timeout_cb(gpointer user_data)
1397 bool *timeout_popped = (
bool*) user_data;
1399 *timeout_popped = TRUE;
1418 bool timeout_popped = FALSE;
1420 GMainContext *ctx = NULL;
1424 ctx = g_main_loop_get_context(mloop);
1426 time_t start_time = time(NULL);
1428 timer = g_timeout_add(timer_ms, drain_timeout_cb, &timeout_popped);
1429 while (!timeout_popped
1430 && check(timer_ms - (time(NULL) - start_time) * 1000)) {
1431 g_main_context_iteration(ctx, TRUE);
1434 if (!timeout_popped && (timer > 0)) {
1435 g_source_remove(timer);