提交 26a9317dd6c3ac4fc51058bda0650af587c20f38

作者 LJH 李佳桓
1 个父辈 131499a3

add

正在显示 1 个修改的文件 包含 232 行增加0 行删除
  1 +/*
  2 + * event_msgqueue.c
  3 + *
  4 + * Libevent threaded message passing primitives
  5 + *
  6 + * Andrew Danforth <acd@weirdness.net>, October 2006
  7 + *
  8 + * Copyright (c) Andrew Danforth, 2006
  9 + *
  10 + */
  11 +
  12 +#include <stdlib.h>
  13 +#include <string.h>
  14 +#include <sys/types.h>
  15 +
  16 +#include "spthread.hpp"
  17 +
  18 +#include "spporting.hpp"
  19 +#include "event_msgqueue.h"
  20 +
  21 +struct circqueue {
  22 + unsigned int head;
  23 + unsigned int tail;
  24 + unsigned int count;
  25 + unsigned int max_entries;
  26 + unsigned int array_elements;
  27 + void **entries;
  28 +};
  29 +
  30 +#define DEFAULT_UNBOUNDED_QUEUE_SIZE 1024
  31 +
  32 +struct event_msgqueue {
  33 + int push_fd;
  34 + int pop_fd;
  35 + int unlock_between_callbacks;
  36 +
  37 + struct event queue_ev;
  38 +
  39 + sp_thread_mutex_t lock;
  40 + void (*callback)(void *, void *);
  41 + void *cbarg;
  42 + struct circqueue *queue;
  43 +};
  44 +
  45 +static unsigned int nextpow2(unsigned int num) {
  46 + --num;
  47 + num |= num >> 1;
  48 + num |= num >> 2;
  49 + num |= num >> 4;
  50 + num |= num >> 8;
  51 + num |= num >> 16;
  52 + return ++num;
  53 +}
  54 +
  55 +#define circqueue_get_length(q) ((q)->count)
  56 +#define circqueue_is_empty(q) (!circqueue_get_length(q))
  57 +#define circqueue_is_full(q) ((q)->count == (q)->array_elements)
  58 +
  59 +static struct circqueue *circqueue_new(unsigned int size) {
  60 + struct circqueue *cq;
  61 +
  62 + if (!(cq = calloc(1, sizeof(struct circqueue))))
  63 + return(NULL);
  64 +
  65 + cq->max_entries = size;
  66 + if (!size || !(cq->array_elements = nextpow2(size)))
  67 + cq->array_elements = DEFAULT_UNBOUNDED_QUEUE_SIZE;
  68 + cq->entries = malloc(sizeof(void *) * cq->array_elements);
  69 + if (!cq->entries) {
  70 + free(cq);
  71 + return(NULL);
  72 + }
  73 +
  74 + return(cq);
  75 +}
  76 +
  77 +static void circqueue_destroy(struct circqueue *cq) {
  78 + free(cq->entries);
  79 + free(cq);
  80 +}
  81 +
  82 +static int circqueue_grow(struct circqueue *cq) {
  83 + void **newents;
  84 + unsigned int newsize = cq->array_elements << 1;
  85 + unsigned int headchunklen = 0, tailchunklen = 0;
  86 +
  87 + if (!(newents = malloc(sizeof(void *) * newsize)))
  88 + return(-1);
  89 +
  90 + if (cq->head < cq->tail)
  91 + headchunklen = cq->tail - cq->head;
  92 + else {
  93 + headchunklen = cq->array_elements - cq->head;
  94 + tailchunklen = cq->tail;
  95 + }
  96 +
  97 + memcpy(newents, &cq->entries[cq->head], sizeof(void *) * headchunklen);
  98 + if (tailchunklen)
  99 + memcpy(&newents[headchunklen], cq->entries, sizeof(void *) * tailchunklen);
  100 +
  101 + cq->head = 0;
  102 + cq->tail = headchunklen + tailchunklen;
  103 + cq->array_elements = newsize;
  104 +
  105 + free(cq->entries);
  106 + cq->entries = newents;
  107 +
  108 + return(0);
  109 +}
  110 +
  111 +static int circqueue_push_tail(struct circqueue *cq, void *elem) {
  112 + if (cq->max_entries) {
  113 + if (cq->count == cq->max_entries)
  114 + return(-1);
  115 + } else if (circqueue_is_full(cq) && circqueue_grow(cq) != 0)
  116 + return(-1);
  117 +
  118 + cq->count++;
  119 + cq->entries[cq->tail++] = elem;
  120 + cq->tail &= cq->array_elements - 1;
  121 +
  122 + return(0);
  123 +}
  124 +
  125 +static void *circqueue_pop_head(struct circqueue *cq) {
  126 + void *data;
  127 +
  128 + if (!cq->count)
  129 + return(NULL);
  130 +
  131 + cq->count--;
  132 + data = cq->entries[cq->head++];
  133 + cq->head &= cq->array_elements - 1;
  134 +
  135 + return(data);
  136 +}
  137 +
  138 +static void msgqueue_pop(int fd, short flags, void *arg) {
  139 + struct event_msgqueue *msgq = arg;
  140 + char buf[64];
  141 +
  142 + recv(fd, buf, sizeof(buf),0);
  143 +
  144 + sp_thread_mutex_lock(&msgq->lock);
  145 + while(!circqueue_is_empty(msgq->queue)) {
  146 + void *qdata;
  147 +
  148 + qdata = circqueue_pop_head(msgq->queue);
  149 +
  150 + if (msgq->unlock_between_callbacks)
  151 + sp_thread_mutex_unlock(&msgq->lock);
  152 +
  153 + msgq->callback(qdata, msgq->cbarg);
  154 +
  155 + if (msgq->unlock_between_callbacks)
  156 + sp_thread_mutex_lock(&msgq->lock);
  157 + }
  158 + sp_thread_mutex_unlock(&msgq->lock);
  159 +}
  160 +
  161 +struct event_msgqueue *msgqueue_new(struct event_base *base, unsigned int max_size, void (*callback)(void *, void *), void *cbarg) {
  162 + struct event_msgqueue *msgq;
  163 + struct circqueue *cq;
  164 + int fds[2];
  165 +
  166 + if (!(cq = circqueue_new(max_size)))
  167 + return(NULL);
  168 +
  169 + if (sp_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) {
  170 + circqueue_destroy(cq);
  171 + return(NULL);
  172 + }
  173 +
  174 + if (!(msgq = malloc(sizeof(struct event_msgqueue)))) {
  175 + circqueue_destroy(cq);
  176 + sp_close(fds[0]);
  177 + sp_close(fds[1]);
  178 + return(NULL);
  179 + }
  180 +
  181 + msgq->push_fd = fds[0];
  182 + msgq->pop_fd = fds[1];
  183 + msgq->queue = cq;
  184 + msgq->callback = callback;
  185 + msgq->cbarg = cbarg;
  186 + sp_thread_mutex_init(&msgq->lock, NULL);
  187 + event_set(&msgq->queue_ev, msgq->pop_fd, EV_READ | EV_PERSIST, msgqueue_pop, msgq);
  188 + event_base_set(base, &msgq->queue_ev);
  189 + event_add(&msgq->queue_ev, NULL);
  190 +
  191 + msgq->unlock_between_callbacks = 1;
  192 +
  193 + return(msgq);
  194 +}
  195 +
  196 +void msgqueue_destroy(struct event_msgqueue *msgq)
  197 +{
  198 + for( ; msgqueue_length(msgq) > 0; ) {
  199 + sleep( 1 );
  200 + }
  201 +
  202 + event_del(&msgq->queue_ev);
  203 + circqueue_destroy(msgq->queue);
  204 + sp_close(msgq->push_fd);
  205 + sp_close(msgq->pop_fd);
  206 + free(msgq);
  207 +}
  208 +
  209 +int msgqueue_push(struct event_msgqueue *msgq, void *msg) {
  210 + const char buf[1] = { 0 };
  211 + int r = 0;
  212 +
  213 + sp_thread_mutex_lock(&msgq->lock);
  214 + if ((r = circqueue_push_tail(msgq->queue, msg)) == 0) {
  215 + if (circqueue_get_length(msgq->queue) == 1)
  216 + send(msgq->push_fd, buf, 1,0);
  217 + }
  218 + sp_thread_mutex_unlock(&msgq->lock);
  219 +
  220 + return(r);
  221 +}
  222 +
  223 +unsigned int msgqueue_length(struct event_msgqueue *msgq) {
  224 + unsigned int len;
  225 +
  226 + sp_thread_mutex_lock(&msgq->lock);
  227 + len = circqueue_get_length(msgq->queue);
  228 + sp_thread_mutex_unlock(&msgq->lock);
  229 +
  230 + return(len);
  231 +}
  232 +
注册登录 后发表评论