FD.io VPP  v19.01.1-17-ge106252
Vector Packet Processing
message_queue.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Unidirectional shared-memory multi-ring message queue
18  */
19 
20 #ifndef SRC_SVM_MESSAGE_QUEUE_H_
21 #define SRC_SVM_MESSAGE_QUEUE_H_
22 
23 #include <vppinfra/clib.h>
24 #include <vppinfra/error.h>
25 #include <svm/queue.h>
26 
27 typedef struct svm_msg_q_ring_
28 {
29  volatile u32 cursize; /**< current size of the ring */
30  u32 nitems; /**< max size of the ring */
31  volatile u32 head; /**< current head (for dequeue) */
32  volatile u32 tail; /**< current tail (for enqueue) */
33  u32 elsize; /**< size of an element */
34  u8 *data; /**< chunk of memory for msg data */
35 } __clib_packed svm_msg_q_ring_t;
36 
37 typedef struct svm_msg_q_
38 {
39  svm_queue_t *q; /**< queue for exchanging messages */
40  svm_msg_q_ring_t *rings; /**< rings with message data*/
41 } __clib_packed svm_msg_q_t;
42 
43 typedef struct svm_msg_q_ring_cfg_
44 {
47  void *data;
49 
50 typedef struct svm_msg_q_cfg_
51 {
52  int consumer_pid; /**< pid of msg consumer */
53  u32 q_nitems; /**< msg queue size (not rings) */
54  u32 n_rings; /**< number of msg rings */
55  svm_msg_q_ring_cfg_t *ring_cfgs; /**< array of ring cfgs */
57 
58 typedef union
59 {
60  struct
61  {
62  u32 ring_index; /**< ring index, could be u8 */
63  u32 elt_index; /**< index in ring */
64  };
67 
68 #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
69 /**
70  * Allocate message queue
71  *
72  * Allocates a message queue on the heap. Based on the configuration options,
73  * apart from the message queue this also allocates (one or multiple)
74  * shared-memory rings for the messages.
75  *
76  * @param cfg configuration options: queue len, consumer pid,
77  * ring configs
78  * @return message queue
79  */
81 
82 /**
83  * Free message queue
84  *
85  * @param mq message queue to be freed
86  */
87 void svm_msg_q_free (svm_msg_q_t * mq);
88 
89 /**
90  * Allocate message buffer
91  *
92  * Message is allocated on the first available ring capable of holding
93  * the requested number of bytes.
94  *
95  * @param mq message queue
96  * @param nbytes number of bytes needed for message
97  * @return message structure pointing to the ring and position
98  * allocated
99  */
101 
102 /**
103  * Allocate message buffer on ring
104  *
105  * Message is allocated, on requested ring. The caller MUST check that
106  * the ring is not full.
107  *
108  * @param mq message queue
109  * @param ring_index ring on which the allocation should occur
110  * @return message structure pointing to the ring and position
111  * allocated
112  */
114 
115 /**
116  * Lock message queue and allocate message buffer on ring
117  *
118  * This should be used when multiple writers/readers are expected to
119  * compete for the rings/queue. Message should be enqueued by calling
120  * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
121  * the message in enqueued.
122  *
123  * @param mq message queue
124  * @param ring_index ring on which the allocation should occur
125  * @param noblock flag that indicates if request should block
126  * @param msg pointer to message to be filled in
127  * @return 0 on success, negative number otherwise
128  */
130  u8 noblock, svm_msg_q_msg_t * msg);
131 
132 /**
133  * Free message buffer
134  *
135  * Marks message buffer on ring as free.
136  *
137  * @param mq message queue
138  * @param msg message to be freed
139  */
141 
142 /**
143  * Producer enqueue one message to queue
144  *
145  * Prior to calling this, the producer should've obtained a message buffer
146  * from one of the rings by calling @ref svm_msg_q_alloc_msg.
147  *
148  * @param mq message queue
149  * @param msg message (pointer to ring position) to be enqueued
150  * @param nowait flag to indicate if request is blocking or not
151  * @return success status
152  */
153 int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
154 
155 /**
156  * Producer enqueue one message to queue with mutex held
157  *
158  * Prior to calling this, the producer should've obtained a message buffer
159  * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
160  * the queue mutex is held.
161  *
162  * @param mq message queue
163  * @param msg message (pointer to ring position) to be enqueued
164  * @return success status
165  */
167 
168 /**
169  * Consumer dequeue one message from queue
170  *
171  * This returns the message pointing to the data in the message rings.
172  * The consumer is expected to call @ref svm_msg_q_free_msg once it
173  * finishes processing/copies the message data.
174  *
175  * @param mq message queue
176  * @param msg pointer to structure where message is to be received
177  * @param cond flag that indicates if request should block or not
178  * @param time time to wait if condition it SVM_Q_TIMEDWAIT
179  * @return success status
180  */
181 int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
182  svm_q_conditional_wait_t cond, u32 time);
183 
184 /**
185  * Consumer dequeue one message from queue with mutex held
186  *
187  * Returns the message pointing to the data in the message rings under the
188  * assumption that the message queue lock is already held. The consumer is
189  * expected to call @ref svm_msg_q_free_msg once it finishes
190  * processing/copies the message data.
191  *
192  * @param mq message queue
193  * @param msg pointer to structure where message is to be received
194  * @return success status
195  */
197 
198 /**
199  * Get data for message in queue
200  *
201  * @param mq message queue
202  * @param msg message for which the data is requested
203  * @return pointer to data
204  */
206 
207 /**
208  * Get message queue ring
209  *
210  * @param mq message queue
211  * @param ring_index index of ring
212  * @return pointer to ring
213  */
214 svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
215 
216 /**
217  * Set event fd for queue consumer
218  *
219  * If set, queue will exclusively use eventfds for signaling. Moreover,
220  * afterwards, the queue should only be used in non-blocking mode. Waiting
221  * for events should be done externally using something like epoll.
222  *
223  * @param mq message queue
224  * @param fd consumer eventfd
225  */
226 void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd);
227 
228 /**
229  * Set event fd for queue producer
230  *
231  * If set, queue will exclusively use eventfds for signaling. Moreover,
232  * afterwards, the queue should only be used in non-blocking mode. Waiting
233  * for events should be done externally using something like epoll.
234  *
235  * @param mq message queue
236  * @param fd producer eventfd
237  */
238 void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd);
239 
240 /**
241  * Allocate event fd for queue consumer
242  */
244 
245 /**
246  * Allocate event fd for queue consumer
247  */
249 
250 /**
251  * Check if message queue is full
252  */
253 static inline u8
255 {
256  return (mq->q->cursize == mq->q->maxsize);
257 }
258 
259 static inline u8
261 {
262  ASSERT (ring_index < vec_len (mq->rings));
263  return (mq->rings[ring_index].cursize == mq->rings[ring_index].nitems);
264 }
265 
266 /**
267  * Check if message queue is empty
268  */
269 static inline u8
271 {
272  return (mq->q->cursize == 0);
273 }
274 
275 /**
276  * Check length of message queue
277  */
278 static inline u32
280 {
281  return mq->q->cursize;
282 }
283 
284 /**
285  * Check if message is invalid
286  */
287 static inline u8
289 {
290  return (msg->as_u64 == (u64) ~ 0);
291 }
292 
293 /**
294  * Try locking message queue
295  */
296 static inline int
298 {
299  return pthread_mutex_trylock (&mq->q->mutex);
300 }
301 
302 /**
303  * Lock, or block trying, the message queue
304  */
305 static inline int
307 {
308  return pthread_mutex_lock (&mq->q->mutex);
309 }
310 
311 /**
312  * Unlock message queue
313  */
314 static inline void
316 {
317  /* The other side of the connection is not polling */
318  if (mq->q->cursize < (mq->q->maxsize / 8))
319  (void) pthread_cond_broadcast (&mq->q->condvar);
320  pthread_mutex_unlock (&mq->q->mutex);
321 }
322 
323 /**
324  * Wait for message queue event
325  *
326  * Must be called with mutex held. The queue only works non-blocking
327  * with eventfds, so handle blocking calls as an exception here.
328  */
329 static inline void
331 {
332  svm_queue_wait (mq->q);
333 }
334 
335 /**
336  * Timed wait for message queue event
337  *
338  * Must be called with mutex held.
339  *
340  * @param mq message queue
341  * @param timeout time in seconds
342  */
343 static inline int
344 svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
345 {
346  return svm_queue_timedwait (mq->q, timeout);
347 }
348 
349 static inline int
351 {
352  return mq->q->consumer_evtfd;
353 }
354 
355 static inline int
357 {
358  return mq->q->producer_evtfd;
359 }
360 
361 #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
362 
363 /*
364  * fd.io coding-style-patch-verification: ON
365  *
366  * Local Variables:
367  * eval: (c-set-style "gnu")
368  * End:
369  */
static u8 svm_msg_q_msg_is_invalid(svm_msg_q_msg_t *msg)
Check if message is invalid.
void svm_msg_q_sub_w_lock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Consumer dequeue one message from queue with mutex held.
svm_msg_q_ring_t * rings
rings with message data
Definition: message_queue.h:40
static u8 svm_msg_q_ring_is_full(svm_msg_q_t *mq, u32 ring_index)
unsigned long u64
Definition: types.h:89
void svm_msg_q_set_consumer_eventfd(svm_msg_q_t *mq, int fd)
Set event fd for queue consumer.
static int svm_msg_q_get_producer_eventfd(svm_msg_q_t *mq)
static u8 svm_msg_q_is_empty(svm_msg_q_t *mq)
Check if message queue is empty.
volatile u32 head
current head (for dequeue)
Definition: message_queue.h:31
svm_msg_q_msg_t svm_msg_q_alloc_msg(svm_msg_q_t *mq, u32 nbytes)
Allocate message buffer.
unsigned char u8
Definition: types.h:56
void svm_msg_q_set_producer_eventfd(svm_msg_q_t *mq, int fd)
Set event fd for queue producer.
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
volatile u32 tail
current tail (for enqueue)
Definition: message_queue.h:32
svm_msg_q_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
Definition: message_queue.c:40
unsigned int u32
Definition: types.h:88
void svm_queue_wait(svm_queue_t *q)
Wait for queue event.
Definition: queue.c:151
struct svm_msg_q_cfg_ svm_msg_q_cfg_t
static void svm_msg_q_wait(svm_msg_q_t *mq)
Wait for message queue event.
volatile u32 cursize
current size of the ring
Definition: message_queue.h:29
struct svm_msg_q_ring_ svm_msg_q_ring_t
struct svm_msg_q_ring_cfg_ svm_msg_q_ring_cfg_t
u32 n_rings
number of msg rings
Definition: message_queue.h:54
svm_q_conditional_wait_t
Definition: queue.h:40
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
static int svm_msg_q_timedwait(svm_msg_q_t *mq, double timeout)
Timed wait for message queue event.
svm_queue_t * q
queue for exchanging messages
Definition: message_queue.h:39
int svm_msg_q_sub(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, svm_q_conditional_wait_t cond, u32 time)
Consumer dequeue one message from queue.
u32 elt_index
index in ring
Definition: message_queue.h:63
void svm_msg_q_add_and_unlock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Producer enqueue one message to queue with mutex held.
static int svm_msg_q_try_lock(svm_msg_q_t *mq)
Try locking message queue.
u32 ring_index
ring index, could be u8
Definition: message_queue.h:62
static int svm_msg_q_get_consumer_eventfd(svm_msg_q_t *mq)
#define ASSERT(truth)
int svm_msg_q_alloc_producer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue consumer.
struct svm_msg_q_ svm_msg_q_t
u8 * data
chunk of memory for msg data
Definition: message_queue.h:34
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index)
Allocate message buffer on ring.
int svm_msg_q_add(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, int nowait)
Producer enqueue one message to queue.
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
Definition: message_queue.h:55
u32 q_nitems
msg queue size (not rings)
Definition: message_queue.h:53
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
int svm_msg_q_alloc_consumer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue consumer.
void svm_msg_q_free_msg(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Free message buffer.
struct _svm_queue svm_queue_t
u32 elsize
size of an element
Definition: message_queue.h:33
int svm_msg_q_lock_and_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index, u8 noblock, svm_msg_q_msg_t *msg)
Lock message queue and allocate message buffer on ring.
static u32 svm_msg_q_size(svm_msg_q_t *mq)
Check length of message queue.
svm_msg_q_ring_t * svm_msg_q_ring(svm_msg_q_t *mq, u32 ring_index)
Get message queue ring.
Definition: message_queue.c:27
int svm_queue_timedwait(svm_queue_t *q, double timeout)
Timed wait for queue event.
Definition: queue.c:183
int consumer_pid
pid of msg consumer
Definition: message_queue.h:52
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
static u8 svm_msg_q_is_full(svm_msg_q_t *mq)
Check if message queue is full.
void svm_msg_q_free(svm_msg_q_t *mq)
Free message queue.
Definition: message_queue.c:95
u32 nitems
max size of the ring
Definition: message_queue.h:30