понедельник, 19 мая 2014 г.

Timed event queue

Sometimes one need a worker thread which executes tasks not just after they were added, but after some delay. Tasks can be pushed out of order. The code below uses GAsyncQueue to both passing tasks to worker thread and waiting for next event (new task or timer expire). Internal GQueue is used to keep tasks sorted by time they should be run at.



#include <glib.h>
#include <stdio.h>
#include <stdlib.h>
 
struct task_s {
    struct timespec when;
    int number_to_print;
};
 
 
gint
compare_func(gconstpointer a, gconstpointer b, gpointer user_data)
{
    const struct task_s *task_a = a;
    const struct task_s *task_b = b;
 
    if (task_a->when.tv_sec < task_b->when.tv_sec)
        return -1;
    else if (task_a->when.tv_sec > task_b->when.tv_sec)
        return 1;
    else if (task_a->when.tv_nsec < task_b->when.tv_nsec)
        return -1;
    else if (task_a->when.tv_nsec > task_b->when.tv_nsec)
        return 1;
    else
        return 0;
}
 
gpointer
event_handler_thread(gpointer data)
{
    struct timespec now;
    GAsyncQueue *async_q = data;
    GQueue *int_q = g_queue_new();
 
    while (1) {
        struct task_s *task = g_queue_peek_head(int_q);
        gint64 timeout;
        if (task) {
            clock_gettime(CLOCK_REALTIME, &now);
            timeout = (task->when.tv_sec - now.tv_sec) * 1000 * 1000 +
                      (task->when.tv_nsec - now.tv_nsec) / 1000;
            if (timeout <= 0) {
                // remove task from queue
                g_queue_pop_head(int_q);
                // run task
                printf("now = %d.%03d, number = %d\n", (int)now.tv_sec,
                       (int)(now.tv_nsec/(1000*1000)), task->number_to_print);
                if (task->number_to_print == 999) {
                    free(task);
                    break;
                }
                free(task);
                // go to start
                continue;
            }
        }
 
        task = g_async_queue_timeout_pop(async_q, timeout);
 
        if (task)
            g_queue_insert_sorted(int_q, task, compare_func, NULL);
    }
 
    g_queue_free(int_q);
    return NULL;
}
 
void
push_work(GAsyncQueue *aq, int delay_ms, int number_to_print)
{
    delay_ms = delay_ms < 0 ? 0 : delay_ms;
 
    struct timespec now = { 0 };
    clock_gettime(CLOCK_REALTIME, &now);
 
    struct timespec then = now;
    then.tv_sec += delay_ms / 1000;
    then.tv_nsec += (delay_ms % 1000) * 1000 * 1000;
    while (then.tv_nsec >= 1000 * 1000 * 1000) {
        then.tv_sec += 1;
        then.tv_nsec -= 1000 * 1000 * 1000;
    }
 
    struct task_s *task = malloc(sizeof(*task));
    task->when = then;
    task->number_to_print = number_to_print;
 
    g_async_queue_push(aq, task);
}
 
int
main(void)
{
    GAsyncQueue *aq = g_async_queue_new();
    GThread *thread = g_thread_new("event_handler_thread", event_handler_thread, aq);
 
    push_work(aq, 2500, 999);
 
    push_work(aq, 0, 1);
    push_work(aq, 1000, 2);
    push_work(aq, -10, 3);
    push_work(aq, 500, 4);
    push_work(aq, 300, 5);
    push_work(aq, 600, 6);
 
    g_thread_join(thread);
    g_async_queue_unref(aq);
 
    return 0;
}
 

Комментариев нет:

Отправить комментарий