Sometimes one needs a worker thread that executes tasks not immediately after they are added, but after some delay. Tasks can be pushed out of order. The code below uses a GAsyncQueue both to pass tasks to the worker thread and to wait for the next event (a new task or a timer expiry). An internal GQueue keeps the tasks sorted by the time at which they should run.
#include <glib.h>
#include <stdio.h>
#include <stdlib.h>
/* One unit of delayed work passed from push_work() to the worker thread. */
struct task_s {
/* Absolute CLOCK_REALTIME time at which the task becomes due. */
struct timespec when;
/* Value printed when the task runs; 999 also tells the worker to exit. */
int number_to_print;
};
/*
 * GCompareDataFunc ordering two struct task_s by due time.
 *
 * a, b:      pointers to the struct task_s objects to compare.
 * user_data: unused.
 *
 * Returns -1 if a is due before b, 1 if a is due after b, and 0 when both
 * are due at exactly the same instant. Seconds are compared first and
 * nanoseconds only break ties.
 */
gint
compare_func(gconstpointer a, gconstpointer b, gpointer user_data)
{
    const struct task_s *lhs = a;
    const struct task_s *rhs = b;

    (void)user_data;

    if (lhs->when.tv_sec != rhs->when.tv_sec)
        return (lhs->when.tv_sec < rhs->when.tv_sec) ? -1 : 1;

    if (lhs->when.tv_nsec != rhs->when.tv_nsec)
        return (lhs->when.tv_nsec < rhs->when.tv_nsec) ? -1 : 1;

    return 0;
}
/*
 * Worker thread body: receives struct task_s objects through a GAsyncQueue
 * and runs each one once its due time (task->when, CLOCK_REALTIME) has
 * passed.
 *
 * data: the GAsyncQueue that producers push heap-allocated struct task_s
 *       pointers onto. The thread takes ownership of every task it pops
 *       and releases it with free().
 *
 * Received tasks are kept in a private GQueue sorted by due time, so the
 * head is always the next task to run. The thread exits after running a
 * task whose number_to_print is 999. Always returns NULL.
 */
gpointer
event_handler_thread(gpointer data)
{
    GAsyncQueue *async_q = data;
    GQueue *int_q = g_queue_new();
    gboolean running = TRUE;

    while (running) {
        struct task_s *task = g_queue_peek_head(int_q);

        if (task == NULL) {
            /*
             * Nothing is scheduled, so there is no deadline to wait for:
             * block until a producer hands over a task.
             */
            task = g_async_queue_pop(async_q);
            g_queue_insert_sorted(int_q, task, compare_func, NULL);
            continue;
        }

        struct timespec now;
        clock_gettime(CLOCK_REALTIME, &now);

        /*
         * Microseconds until the head task is due. The seconds difference
         * is widened first so the scaling cannot overflow a narrow time_t.
         */
        gint64 timeout =
            ((gint64)task->when.tv_sec - (gint64)now.tv_sec) * 1000 * 1000 +
            (task->when.tv_nsec - now.tv_nsec) / 1000;

        if (timeout <= 0) {
            /* Due: take the task off the queue and run it. */
            g_queue_pop_head(int_q);
            printf("now = %d.%03d, number = %d\n", (int)now.tv_sec,
                   (int)(now.tv_nsec / (1000 * 1000)), task->number_to_print);
            if (task->number_to_print == 999)
                running = FALSE;
            free(task);
            continue;
        }

        /*
         * Sleep until the head task is due, or wake early if a new task
         * arrives; a new task may be due sooner than the current head.
         */
        task = g_async_queue_timeout_pop(async_q, (guint64)timeout);
        if (task != NULL)
            g_queue_insert_sorted(int_q, task, compare_func, NULL);
    }

    /* Release any tasks that were still pending when the thread stopped. */
    g_queue_free_full(int_q, free);
    return NULL;
}
/*
 * Schedule a task to run delay_ms milliseconds from now.
 *
 * aq:              queue the task is pushed onto; the consumer takes
 *                  ownership and must release the task with free().
 * delay_ms:        delay in milliseconds; negative values are treated as 0.
 * number_to_print: payload stored in the task.
 *
 * The due time is computed from CLOCK_REALTIME. Aborts the process if the
 * task cannot be allocated.
 */
void
push_work(GAsyncQueue *aq, int delay_ms, int number_to_print)
{
    const long nsec_per_sec = 1000L * 1000L * 1000L;

    if (delay_ms < 0)
        delay_ms = 0;

    struct timespec then = { 0 };
    clock_gettime(CLOCK_REALTIME, &then);
    then.tv_sec += delay_ms / 1000;
    then.tv_nsec += (long)(delay_ms % 1000) * 1000L * 1000L;
    /* Both addends are below one second, so at most one carry is needed. */
    if (then.tv_nsec >= nsec_per_sec) {
        then.tv_sec += 1;
        then.tv_nsec -= nsec_per_sec;
    }

    /* Plain malloc (not g_malloc) because the consumer uses free(). */
    struct task_s *task = malloc(sizeof(*task));
    if (task == NULL) {
        perror("push_work: malloc");
        abort();
    }
    task->when = then;
    task->number_to_print = number_to_print;
    g_async_queue_push(aq, task);
}
/*
 * Demo driver: starts the worker thread, submits a handful of tasks with
 * out-of-order delays, then waits for the worker to finish. The task
 * numbered 999 is the one that makes the worker exit.
 */
int
main(void)
{
    /* { delay in milliseconds, number to print }, in submission order. */
    static const int jobs[][2] = {
        { 2500, 999 },
        {    0,   1 },
        { 1000,   2 },
        {  -10,   3 },
        {  500,   4 },
        {  300,   5 },
        {  600,   6 },
    };
    const size_t job_count = sizeof jobs / sizeof jobs[0];

    GAsyncQueue *queue = g_async_queue_new();
    GThread *worker = g_thread_new("event_handler_thread", event_handler_thread, queue);

    for (size_t i = 0; i < job_count; i++)
        push_work(queue, jobs[i][0], jobs[i][1]);

    g_thread_join(worker);
    g_async_queue_unref(queue);
    return 0;
}