52 int elsize,
int consumer_pid,
int signal_when_queue_non_empty)
55 pthread_mutexattr_t attr;
56 pthread_condattr_t cattr;
60 memset (q, 0,
sizeof (*q));
64 q->consumer_pid = consumer_pid;
65 q->signal_when_queue_non_empty = signal_when_queue_non_empty;
67 memset (&attr, 0,
sizeof (attr));
68 memset (&cattr, 0,
sizeof (cattr));
70 if (pthread_mutexattr_init (&attr))
72 if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
74 if (pthread_mutex_init (&q->mutex, &attr))
76 if (pthread_mutexattr_destroy (&attr))
78 if (pthread_condattr_init (&cattr))
81 if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
83 if (pthread_cond_init (&q->condvar, &cattr))
85 if (pthread_condattr_destroy (&cattr))
97 (void) pthread_mutex_destroy (&q->mutex);
98 (void) pthread_cond_destroy (&q->condvar);
105 pthread_mutex_lock (&q->mutex);
111 pthread_mutex_unlock (&q->mutex);
117 return q->cursize == q->maxsize;
127 int need_broadcast = 0;
131 while (q->cursize == q->maxsize)
133 (void) pthread_cond_wait (&q->condvar, &q->mutex);
137 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
143 need_broadcast = (q->cursize == 1);
145 if (q->tail == q->maxsize)
150 (void) pthread_cond_broadcast (&q->condvar);
151 if (q->signal_when_queue_non_empty)
152 kill (q->consumer_pid, q->signal_when_queue_non_empty);
162 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
165 q->tail = (q->tail + 1) % q->maxsize;
177 int need_broadcast = 0;
182 if (pthread_mutex_trylock (&q->mutex))
188 pthread_mutex_lock (&q->mutex);
194 pthread_mutex_unlock (&q->mutex);
197 while (q->cursize == q->maxsize)
199 (void) pthread_cond_wait (&q->condvar, &q->mutex);
203 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
209 need_broadcast = (q->cursize == 1);
211 if (q->tail == q->maxsize)
216 (void) pthread_cond_broadcast (&q->condvar);
217 if (q->signal_when_queue_non_empty)
218 kill (q->consumer_pid, q->signal_when_queue_non_empty);
220 pthread_mutex_unlock (&q->mutex);
232 int need_broadcast = 0;
237 if (pthread_mutex_trylock (&q->mutex))
243 pthread_mutex_lock (&q->mutex);
249 pthread_mutex_unlock (&q->mutex);
252 while (q->cursize + 1 == q->maxsize)
254 (void) pthread_cond_wait (&q->condvar, &q->mutex);
258 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
264 if (q->tail == q->maxsize)
267 need_broadcast = (q->cursize == 1);
269 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
275 if (q->tail == q->maxsize)
280 (void) pthread_cond_broadcast (&q->condvar);
281 if (q->signal_when_queue_non_empty)
282 kill (q->consumer_pid, q->signal_when_queue_non_empty);
284 pthread_mutex_unlock (&q->mutex);
297 int need_broadcast = 0;
303 if (pthread_mutex_trylock (&q->mutex))
309 pthread_mutex_lock (&q->mutex);
315 pthread_mutex_unlock (&q->mutex);
323 while (q->cursize == 0 && rc == 0)
325 rc = pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
329 pthread_mutex_unlock (&q->mutex);
335 while (q->cursize == 0)
337 (void) pthread_cond_wait (&q->condvar, &q->mutex);
342 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
347 if (q->cursize == q->maxsize)
352 if (q->head == q->maxsize)
356 (void) pthread_cond_broadcast (&q->condvar);
358 pthread_mutex_unlock (&q->mutex);
369 pthread_mutex_lock (&q->mutex);
372 pthread_mutex_unlock (&q->mutex);
376 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
380 need_broadcast = (q->cursize == q->maxsize / 2);
385 pthread_mutex_unlock (&q->mutex);
388 (void) pthread_cond_broadcast (&q->condvar);
400 while (q->cursize == 0)
404 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
407 q->head = (q->head + 1) % q->maxsize;
void svm_queue_add_raw(svm_queue_t *q, u8 *elem)
Add element to queue with mutex held.
int svm_queue_is_full(svm_queue_t *q)
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
int svm_queue_sub2(svm_queue_t *q, u8 *elem)
void svm_queue_unlock(svm_queue_t *q)
svm_queue_t * svm_queue_init(int nels, int elsize, int consumer_pid, int signal_when_queue_non_empty)
blocking call, return on signal or time-out
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
static f64 unix_time_now(void)
int svm_queue_add2(svm_queue_t *q, u8 *elem, u8 *elem2, int nowait)
void svm_queue_lock(svm_queue_t *q)
#define clib_memcpy(a, b, c)
void svm_queue_free(svm_queue_t *q)
static void clib_mem_free(void *p)
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
#define clib_unix_warning(format, args...)
int svm_queue_add_nolock(svm_queue_t *q, u8 *elem)
struct _svm_queue svm_queue_t
static void * clib_mem_alloc_aligned(uword size, uword align)
#define CLIB_CACHE_LINE_BYTES