FD.io VPP  v19.08-27-gf4dcae4
Vector Packet Processing
message_queue.c
Go to the documentation of this file.
/*
 * Copyright (c) 2018 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

16 #include <svm/message_queue.h>
17 #include <vppinfra/mem.h>
18 #include <sys/eventfd.h>
19 
20 static inline svm_msg_q_ring_t *
22 {
23  return vec_elt_at_index (mq->rings, ring_index);
24 }
25 
27 svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index)
28 {
29  return svm_msg_q_ring_inline (mq, ring_index);
30 }
31 
32 static inline void *
34 {
35  ASSERT (elt_index < ring->nitems);
36  return (ring->data + elt_index * ring->elsize);
37 }
38 
41 {
42  svm_msg_q_ring_cfg_t *ring_cfg;
43  uword rings_sz = 0, mq_sz;
44  svm_msg_q_ring_t *ring;
45  u8 *base, *rings_ptr;
46  vec_header_t *vh;
47  u32 vec_sz, q_sz;
48  svm_msg_q_t *mq;
49  int i;
50 
51  ASSERT (cfg);
52 
53  vec_sz = vec_header_bytes (0) + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
54  for (i = 0; i < cfg->n_rings; i++)
55  {
56  if (cfg->ring_cfgs[i].data)
57  continue;
58  ring_cfg = &cfg->ring_cfgs[i];
59  rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
60  }
61 
62  q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
63  mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz;
65  if (!base)
66  return 0;
67 
68  mq = (svm_msg_q_t *) base;
69  mq->q = svm_queue_init (base + sizeof (svm_msg_q_t), cfg->q_nitems,
70  sizeof (svm_msg_q_msg_t));
71  mq->q->consumer_pid = cfg->consumer_pid;
72  vh = (vec_header_t *) ((u8 *) mq->q + q_sz);
73  vh->len = cfg->n_rings;
74  mq->rings = (svm_msg_q_ring_t *) (vh + 1);
75  rings_ptr = (u8 *) mq->rings + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
76  for (i = 0; i < cfg->n_rings; i++)
77  {
78  ring = &mq->rings[i];
79  ring->elsize = cfg->ring_cfgs[i].elsize;
80  ring->nitems = cfg->ring_cfgs[i].nitems;
81  ring->cursize = ring->head = ring->tail = 0;
82  if (cfg->ring_cfgs[i].data)
83  ring->data = cfg->ring_cfgs[i].data;
84  else
85  {
86  ring->data = rings_ptr;
87  rings_ptr += (uword) ring->nitems * ring->elsize;
88  }
89  }
90 
91  return mq;
92 }
93 
94 void
96 {
97  svm_queue_free (mq->q);
98  clib_mem_free (mq);
99 }
100 
103 {
104  svm_msg_q_msg_t msg;
105  svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
106 
107  ASSERT (ring->cursize < ring->nitems);
108  msg.ring_index = ring - mq->rings;
109  msg.elt_index = ring->tail;
110  ring->tail = (ring->tail + 1) % ring->nitems;
111  clib_atomic_fetch_add (&ring->cursize, 1);
112  return msg;
113 }
114 
115 int
117  u8 noblock, svm_msg_q_msg_t * msg)
118 {
119  if (noblock)
120  {
121  if (svm_msg_q_try_lock (mq))
122  return -1;
124  || svm_msg_q_ring_is_full (mq, ring_index)))
125  {
126  svm_msg_q_unlock (mq);
127  return -2;
128  }
129  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
131  {
132  svm_msg_q_unlock (mq);
133  return -2;
134  }
135  }
136  else
137  {
138  svm_msg_q_lock (mq);
139  while (svm_msg_q_is_full (mq)
140  || svm_msg_q_ring_is_full (mq, ring_index))
141  svm_msg_q_wait (mq);
142  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
143  }
144  return 0;
145 }
146 
149 {
150  svm_msg_q_msg_t msg = {.as_u64 = ~0 };
151  svm_msg_q_ring_t *ring;
152 
153  vec_foreach (ring, mq->rings)
154  {
155  if (ring->elsize < nbytes || ring->cursize == ring->nitems)
156  continue;
157  msg.ring_index = ring - mq->rings;
158  msg.elt_index = ring->tail;
159  ring->tail = (ring->tail + 1) % ring->nitems;
160  clib_atomic_fetch_add (&ring->cursize, 1);
161  break;
162  }
163  return msg;
164 }
165 
166 void *
168 {
170  return svm_msg_q_ring_data (ring, msg->elt_index);
171 }
172 
173 void
175 {
176  svm_msg_q_ring_t *ring;
177 
178  ASSERT (vec_len (mq->rings) > msg->ring_index);
179  ring = &mq->rings[msg->ring_index];
180  if (msg->elt_index == ring->head)
181  {
182  ring->head = (ring->head + 1) % ring->nitems;
183  }
184  else
185  {
186  clib_warning ("message out of order");
187  /* for now, expect messages to be processed in order */
188  ASSERT (0);
189  }
190  clib_atomic_fetch_sub (&ring->cursize, 1);
191 }
192 
193 static int
195 {
196  u32 dist1, dist2, tail, head;
197  svm_msg_q_ring_t *ring;
198 
199  if (vec_len (mq->rings) <= msg->ring_index)
200  return 0;
201  ring = &mq->rings[msg->ring_index];
202  tail = ring->tail;
203  head = ring->head;
204 
205  dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems;
206  if (tail == head)
207  dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
208  else
209  dist2 = ((ring->nitems + tail) - head) % ring->nitems;
210  return (dist1 < dist2);
211 }
212 
213 int
214 svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
215 {
216  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
217  return svm_queue_add (mq->q, (u8 *) msg, nowait);
218 }
219 
220 void
222 {
223  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
224  svm_queue_add_raw (mq->q, (u8 *) msg);
225  svm_msg_q_unlock (mq);
226 }
227 
228 int
230  svm_q_conditional_wait_t cond, u32 time)
231 {
232  return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
233 }
234 
235 void
237 {
238  svm_queue_sub_raw (mq->q, (u8 *) msg);
239 }
240 
241 void
243 {
244  mq->q->consumer_evtfd = fd;
245 }
246 
247 void
249 {
250  mq->q->producer_evtfd = fd;
251 }
252 
253 int
255 {
256  int fd;
257  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
258  return -1;
260  return 0;
261 }
262 
263 int
265 {
266  int fd;
267  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
268  return -1;
270  return 0;
271 }
272 
273 /*
274  * fd.io coding-style-patch-verification: ON
275  *
276  * Local Variables:
277  * eval: (c-set-style "gnu")
278  * End:
279  */
void svm_queue_add_raw(svm_queue_t *q, u8 *elem)
Add element to queue with mutex held.
Definition: queue.c:220
static u8 svm_msg_q_msg_is_invalid(svm_msg_q_msg_t *msg)
Check if message is invalid.
svm_msg_q_ring_t * rings
rings with message data
Definition: message_queue.h:40
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
static u8 svm_msg_q_ring_is_full(svm_msg_q_t *mq, u32 ring_index)
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:239
svm_msg_q_msg_t svm_msg_q_alloc_msg(svm_msg_q_t *mq, u32 nbytes)
Allocate message buffer.
#define clib_atomic_fetch_sub(a, b)
Definition: atomics.h:24
for(i=1;i<=collision_buckets;i++)
int i
volatile u32 head
current head (for dequeue)
Definition: message_queue.h:31
unsigned char u8
Definition: types.h:56
static svm_msg_q_ring_t * svm_msg_q_ring_inline(svm_msg_q_t *mq, u32 ring_index)
Definition: message_queue.c:21
int svm_msg_q_lock_and_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index, u8 noblock, svm_msg_q_msg_t *msg)
Lock message queue and allocate message buffer on ring.
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:348
static uword vec_header_bytes(uword header_bytes)
Definition: vec_bootstrap.h:80
svm_msg_q_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
Definition: message_queue.c:40
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
volatile u32 tail
current tail (for enqueue)
Definition: message_queue.h:32
unsigned int u32
Definition: types.h:88
int svm_msg_q_sub(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, svm_q_conditional_wait_t cond, u32 time)
Consumer dequeue one message from queue.
void svm_msg_q_free(svm_msg_q_t *mq)
Free message queue.
Definition: message_queue.c:95
int svm_msg_q_alloc_consumer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue consumer.
void svm_msg_q_set_producer_eventfd(svm_msg_q_t *mq, int fd)
Set event fd for queue producer.
#define PREDICT_FALSE(x)
Definition: clib.h:111
svm_msg_q_ring_t * svm_msg_q_ring(svm_msg_q_t *mq, u32 ring_index)
Get message queue ring.
Definition: message_queue.c:27
static int svm_msq_q_msg_is_valid(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
static void svm_msg_q_wait(svm_msg_q_t *mq)
Wait for message queue event.
volatile u32 cursize
current size of the ring
Definition: message_queue.h:29
struct svm_msg_q_ring_ svm_msg_q_ring_t
u32 n_rings
number of msg rings
Definition: message_queue.h:54
svm_q_conditional_wait_t
Definition: queue.h:40
Unidirectional shared-memory multi-ring message queue.
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
#define clib_warning(format, args...)
Definition: error.h:59
svm_queue_t * q
queue for exchanging messages
Definition: message_queue.h:39
int svm_msg_q_add(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, int nowait)
Producer enqueue one message to queue.
u32 elt_index
index in ring
Definition: message_queue.h:63
u32 len
Number of elements in vector (NOT its allocated length).
Definition: vec_bootstrap.h:60
static int svm_msg_q_try_lock(svm_msg_q_t *mq)
Try locking message queue.
void svm_queue_free(svm_queue_t *q)
Definition: queue.c:89
u32 ring_index
ring index, could be u8
Definition: message_queue.h:62
#define ASSERT(truth)
void svm_msg_q_add_and_unlock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Producer enqueue one message to queue with mutex held.
static void clib_mem_free(void *p)
Definition: mem.h:226
vector header structure
Definition: vec_bootstrap.h:55
struct svm_msg_q_ svm_msg_q_t
u8 * data
chunk of memory for msg data
Definition: message_queue.h:34
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
Definition: queue.c:443
void svm_msg_q_set_consumer_eventfd(svm_msg_q_t *mq, int fd)
Set event fd for queue consumer.
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
Definition: message_queue.h:55
#define clib_atomic_fetch_add(a, b)
Definition: atomics.h:23
u32 q_nitems
msg queue size (not rings)
Definition: message_queue.h:53
void svm_msg_q_free_msg(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Free message buffer.
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
u64 uword
Definition: types.h:112
svm_queue_t * svm_queue_init(void *base, int nels, int elsize)
Definition: queue.c:33
struct _svm_queue svm_queue_t
u32 elsize
size of an element
Definition: message_queue.h:33
static void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:161
#define vec_foreach(var, vec)
Vector iterator.
int consumer_pid
pid of msg consumer
Definition: message_queue.h:52
int svm_msg_q_alloc_producer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue producer.
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:59
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
static u8 svm_msg_q_is_full(svm_msg_q_t *mq)
Check if message queue is full.
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index)
Allocate message buffer on ring.
static void * svm_msg_q_ring_data(svm_msg_q_ring_t *ring, u32 elt_index)
Definition: message_queue.c:33
void svm_msg_q_sub_w_lock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Consumer dequeue one message from queue with mutex held.
u32 nitems
max size of the ring
Definition: message_queue.h:30