Example: Summing integersΒΆ

This example uses a Varon-T disruptor queue to produce one million integers and sum them up. The queue uses a single producer and a single consumer, each using the vrt_yield_strategy_threaded() described in yield strategies.

Recall that Varon-T depends on the libcork library, which is included in the following block.

#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>

#include <libcork/core.h>
#include <libcork/helpers/errors.h>
#include <vrt.h>

The first coding task is to define the integer value and value types based on vrt_value and vrt_value_type. The following code demonstrates “subclassing vrt_value as an embedded C struct. The value type, however, is provided as a static instance of vrt_value_type, which has an interface to two functions responsible for allocating and deallocating value instances. vrt_integer_value_type(void)() is a helper function for accessing the static value type _vrt_integer_value_type.

/* --------------------------------------------------------------
 * Integer value and type
 */

struct vrt_integer_value {
    struct vrt_value  parent;
    int32_t  value;
};

static struct vrt_value *
vrt_integer_value_new(struct vrt_value_type *type)
{
    struct vrt_integer_value  *self = cork_new(struct vrt_integer_value);
    return &self->parent;
}

static void
vrt_integer_value_free(struct vrt_value_type *type, struct vrt_value *vself)
{
    struct vrt_integer_value  *iself =
        cork_container_of(vself, struct vrt_integer_value, parent);
    free(iself);
}

static struct vrt_value_type  _vrt_integer_value_type = {
    vrt_integer_value_new,
    vrt_integer_value_free
};

static struct vrt_value_type *
vrt_integer_value_type(void)
{
    return &_vrt_integer_value_type;
}

An integer generator is a straightforward implementation with an embedded producer pointer and a field to store the number of integers to generate. The for() loop simply iterates over the number of integers to produce, claims a value instance from the queue, populates the value() field of the value instance with the current integer, and publishes the value instance back to the queue. After all integers are published, the generator then pushes an EOF() signal to the queue to indicate it has finished.

/* --------------------------------------------------------------
 * Integer producer
 */

struct integer_generator {
    struct vrt_producer  *p;
    int64_t  count;
};

void *
generate_integers(void *ud)
{
    struct integer_generator  *c = ud;
    int32_t  i;
    for (i = 0; i < c->count; i++) {
        struct vrt_value  *vvalue;
        struct vrt_integer_value  *ivalue;
        rpi_check(vrt_producer_claim(c->p, &vvalue));
        ivalue = cork_container_of
                    (vvalue, struct vrt_integer_value, parent);
        ivalue->value = i;
        rpi_check(vrt_producer_publish(c->p));
    }
    rpi_check(vrt_producer_eof(c->p));
    return NULL;
}

A summing consumer is similar to the generator producer in a straightforward implementation. A “summer” is comprised of a consumer client and field for tracking the sum. The consumer iterates over the available value instances in the queue until an EOF is encountered. The value from each value instance is added to the current sum.

/* --------------------------------------------------------------
 * Integer consumer
 */

struct integer_summer {
    struct vrt_consumer  *c;
    int64_t  *sum;
};

void *
sum_integers(void *ud)
{
    int rc;
    struct integer_summer  *c = ud;
    struct vrt_value  *vvalue;
    int64_t  sum = 0;
    while ((rc = vrt_consumer_next(c->c, &vvalue)) != VRT_QUEUE_EOF) {
        if (rc == 0) {
            struct vrt_integer_value  *ivalue =
                cork_container_of(vvalue, struct vrt_integer_value, parent);
            sum += ivalue->value;
        }
    }
    if (rc == VRT_QUEUE_EOF) {
        *c->sum = sum;
    }
    return NULL;
}

The disruptor queue is implemented where each client (producer and consumer) executes in a separate thread. The vrt_queue_client structure is a wrapper around queue clients that generalizes vrt_queue_threaded(), and it is demonstrated as a design pattern. The critical steps are thread management (create and join) and configuration of the appropriate yield strategies for producers and consumers.

/* --------------------------------------------------------------
 * Threaded queue
 */

struct vrt_queue_client {
    void *(*run)(void *);
    void *ud;
};

int
vrt_queue_threaded(struct vrt_queue *q, struct vrt_queue_client *clients)
{
    size_t  i;
    size_t  client_count = 0;
    struct vrt_queue_client  *client;
    for (client = clients; client->run != NULL; client++) {
        client_count++;
    }

    pthread_t  *tids;
    tids = cork_calloc(client_count, sizeof(pthread_t));

    /* Choose a yield strategy */
    for (i = 0; i < cork_array_size(&q->producers); i++) {
        struct vrt_producer  *p = cork_array_at(&q->producers, i);
        p->yield = vrt_yield_strategy_threaded();
    }

    for (i = 0; i < cork_array_size(&q->consumers); i++) {
        struct vrt_consumer  *c = cork_array_at(&q->consumers, i);
        c->yield = vrt_yield_strategy_threaded();
    }

    /* Create the client threads */
    for (i = 0; i < client_count; i++) {
        pthread_create(&tids[i], NULL, clients[i].run, clients[i].ud);
    }

    for (i = 0; i < client_count; i++) {
        pthread_join(tids[i], NULL);
    }

    free(tids);
    return 0;
}

The main function drives the disuptor queue. After successful allocation of the queue, producer, and consumer, the generator and summer are configured and added to the queue as the clients. Recall that each application client (generator and summer) has an embedded queue-specific client (producer and consumer, respectively). The disruptor queue is invoked through the call vrt_queue_threaded(). Note that result corresponds to sum in integer_summer.

int
main(int argc, const char **argv)
{
    struct vrt_queue  *q;
    struct vrt_producer  *p;
    struct vrt_consumer  *c;
    int64_t  result;
    size_t  QUEUE_SIZE = 64;

    /* Note that the parameter for queue size is a power of 2. */
    rip_check(q = vrt_queue_new("queue_sum", vrt_integer_value_type(),
                                QUEUE_SIZE));
    rip_check(p = vrt_producer_new("generator", 4, q));
    rip_check(c = vrt_consumer_new("summer", q));

    struct integer_generator  integer_generator = {
        p, 1000000
    };

    struct integer_summer  integer_summer = {
        c, &result
    };

    struct vrt_queue_client  clients[] = {
        { generate_integers, &integer_generator },
        { sum_integers, &integer_summer },
        { NULL, NULL }
    };

    rii_check(vrt_queue_threaded(q, clients));

    fprintf(stdout, "Result: %" PRId64 "\n", result);
    vrt_queue_free(q);
    return 0;
}