FD.io VPP  v16.12-rc0-308-g931be3a
Vector Packet Processing
unix_shared_memory_queue.c
Go to the documentation of this file.
1 /*
2  *------------------------------------------------------------------
3  * unix_shared_memory_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19 
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <pthread.h>
24 #include <vppinfra/mem.h>
25 #include <vppinfra/format.h>
26 #include <vppinfra/cache.h>
28 #include <signal.h>
29 
30 /*
31  * unix_shared_memory_queue_init
32  *
33  * nels = number of elements on the queue
34  * elsize = element size, presumably 4 and cacheline-size will
35  * be popular choices.
36  * coid = consumer coid, from ChannelCreate
37  * pid = consumer pid
38  * pulse_code = pulse code consumer expects
39  * pulse_value = pulse value consumer expects
40  * consumer_prio = consumer's priority, so pulses won't change
41  * the consumer's priority.
42  *
43  * The idea is to call this function in the queue consumer,
44  * and e-mail the queue pointer to the producer(s).
45  *
46  * The spp process / main thread allocates one of these
47  * at startup; its main input queue. The spp main input queue
48  * has a pointer to it in the shared memory segment header.
49  *
50  * You probably want to be on an svm data heap before calling this
51  * function.
52  */
55  int elsize,
56  int consumer_pid,
57  int signal_when_queue_non_empty)
58 {
60  pthread_mutexattr_t attr;
61  pthread_condattr_t cattr;
62 
64  + nels * elsize, CLIB_CACHE_LINE_BYTES);
65  memset (q, 0, sizeof (*q));
66 
67  q->elsize = elsize;
68  q->maxsize = nels;
69  q->consumer_pid = consumer_pid;
70  q->signal_when_queue_non_empty = signal_when_queue_non_empty;
71 
72  memset (&attr, 0, sizeof (attr));
73  memset (&cattr, 0, sizeof (attr));
74 
75  if (pthread_mutexattr_init (&attr))
76  clib_unix_warning ("mutexattr_init");
77  if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
78  clib_unix_warning ("pthread_mutexattr_setpshared");
79  if (pthread_mutex_init (&q->mutex, &attr))
80  clib_unix_warning ("mutex_init");
81  if (pthread_mutexattr_destroy (&attr))
82  clib_unix_warning ("mutexattr_destroy");
83  if (pthread_condattr_init (&cattr))
84  clib_unix_warning ("condattr_init");
85  /* prints funny-looking messages in the Linux target */
86  if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
87  clib_unix_warning ("condattr_setpshared");
88  if (pthread_cond_init (&q->condvar, &cattr))
89  clib_unix_warning ("cond_init1");
90  if (pthread_condattr_destroy (&cattr))
91  clib_unix_warning ("cond_init2");
92 
93  return (q);
94 }
95 
96 /*
97  * unix_shared_memory_queue_free
98  */
99 void
101 {
102  (void) pthread_mutex_destroy (&q->mutex);
103  (void) pthread_cond_destroy (&q->condvar);
104  clib_mem_free (q);
105 }
106 
107 void
109 {
110  pthread_mutex_lock (&q->mutex);
111 }
112 
113 void
115 {
116  pthread_mutex_unlock (&q->mutex);
117 }
118 
119 int
121 {
122  return q->cursize == q->maxsize;
123 }
124 
125 /*
126  * unix_shared_memory_queue_add_nolock
127  */
128 int
130  u8 * elem)
131 {
132  i8 *tailp;
133  int need_broadcast = 0;
134 
135  if (PREDICT_FALSE (q->cursize == q->maxsize))
136  {
137  while (q->cursize == q->maxsize)
138  {
139  (void) pthread_cond_wait (&q->condvar, &q->mutex);
140  }
141  }
142 
143  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
144  clib_memcpy (tailp, elem, q->elsize);
145 
146  q->tail++;
147  q->cursize++;
148 
149  need_broadcast = (q->cursize == 1);
150 
151  if (q->tail == q->maxsize)
152  q->tail = 0;
153 
154  if (need_broadcast)
155  {
156  (void) pthread_cond_broadcast (&q->condvar);
157  if (q->signal_when_queue_non_empty)
158  kill (q->consumer_pid, q->signal_when_queue_non_empty);
159  }
160  return 0;
161 }
162 
163 int
165 {
166  i8 *tailp;
167 
168  if (PREDICT_FALSE (q->cursize == q->maxsize))
169  {
170  while (q->cursize == q->maxsize)
171  ;
172  }
173 
174  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
175  clib_memcpy (tailp, elem, q->elsize);
176 
177  q->tail++;
178  q->cursize++;
179 
180  if (q->tail == q->maxsize)
181  q->tail = 0;
182  return 0;
183 }
184 
185 
186 /*
187  * unix_shared_memory_queue_add
188  */
189 int
191  u8 * elem, int nowait)
192 {
193  i8 *tailp;
194  int need_broadcast = 0;
195 
196  if (nowait)
197  {
198  /* zero on success */
199  if (pthread_mutex_trylock (&q->mutex))
200  {
201  return (-1);
202  }
203  }
204  else
205  pthread_mutex_lock (&q->mutex);
206 
207  if (PREDICT_FALSE (q->cursize == q->maxsize))
208  {
209  if (nowait)
210  {
211  pthread_mutex_unlock (&q->mutex);
212  return (-2);
213  }
214  while (q->cursize == q->maxsize)
215  {
216  (void) pthread_cond_wait (&q->condvar, &q->mutex);
217  }
218  }
219 
220  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
221  clib_memcpy (tailp, elem, q->elsize);
222 
223  q->tail++;
224  q->cursize++;
225 
226  need_broadcast = (q->cursize == 1);
227 
228  if (q->tail == q->maxsize)
229  q->tail = 0;
230 
231  if (need_broadcast)
232  {
233  (void) pthread_cond_broadcast (&q->condvar);
234  if (q->signal_when_queue_non_empty)
235  kill (q->consumer_pid, q->signal_when_queue_non_empty);
236  }
237  pthread_mutex_unlock (&q->mutex);
238 
239  return 0;
240 }
241 
242 /*
243  * unix_shared_memory_queue_sub
244  */
245 int
247  u8 * elem, int nowait)
248 {
249  i8 *headp;
250  int need_broadcast = 0;
251 
252  if (nowait)
253  {
254  /* zero on success */
255  if (pthread_mutex_trylock (&q->mutex))
256  {
257  return (-1);
258  }
259  }
260  else
261  pthread_mutex_lock (&q->mutex);
262 
263  if (PREDICT_FALSE (q->cursize == 0))
264  {
265  if (nowait)
266  {
267  pthread_mutex_unlock (&q->mutex);
268  return (-2);
269  }
270  while (q->cursize == 0)
271  {
272  (void) pthread_cond_wait (&q->condvar, &q->mutex);
273  }
274  }
275 
276  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
277  clib_memcpy (elem, headp, q->elsize);
278 
279  q->head++;
280  if (q->cursize == q->maxsize)
281  need_broadcast = 1;
282 
283  q->cursize--;
284 
285  if (q->head == q->maxsize)
286  q->head = 0;
287 
288  if (need_broadcast)
289  (void) pthread_cond_broadcast (&q->condvar);
290 
291  pthread_mutex_unlock (&q->mutex);
292 
293  return 0;
294 }
295 
296 int
298 {
299  i8 *headp;
300 
301  if (PREDICT_FALSE (q->cursize == 0))
302  {
303  while (q->cursize == 0)
304  ;
305  }
306 
307  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
308  clib_memcpy (elem, headp, q->elsize);
309 
310  q->head++;
311  q->cursize--;
312 
313  if (q->head == q->maxsize)
314  q->head = 0;
315  return 0;
316 }
317 
318 /*
319  * fd.io coding-style-patch-verification: ON
320  *
321  * Local Variables:
322  * eval: (c-set-style "gnu")
323  * End:
324  */
int unix_shared_memory_queue_is_full(unix_shared_memory_queue_t *q)
void unix_shared_memory_queue_free(unix_shared_memory_queue_t *q)
char i8
Definition: types.h:45
int unix_shared_memory_queue_add_nolock(unix_shared_memory_queue_t *q, u8 *elem)
int unix_shared_memory_queue_add(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
#define PREDICT_FALSE(x)
Definition: clib.h:97
int unix_shared_memory_queue_sub_raw(unix_shared_memory_queue_t *q, u8 *elem)
void unix_shared_memory_queue_lock(unix_shared_memory_queue_t *q)
int unix_shared_memory_queue_sub(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
#define clib_memcpy(a, b, c)
Definition: string.h:64
#define clib_unix_warning(format, args...)
Definition: error.h:68
unix_shared_memory_queue_t * unix_shared_memory_queue_init(int nels, int elsize, int consumer_pid, int signal_when_queue_non_empty)
static void clib_mem_free(void *p)
Definition: mem.h:176
int unix_shared_memory_queue_add_raw(unix_shared_memory_queue_t *q, u8 *elem)
void unix_shared_memory_queue_unlock(unix_shared_memory_queue_t *q)
unsigned char u8
Definition: types.h:56
static void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:117
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:67
struct _unix_shared_memory_queue unix_shared_memory_queue_t