FD.io VPP  v19.08.2-294-g37e99c22d
Vector Packet Processing
message_queue.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <svm/message_queue.h>
17 #include <vppinfra/mem.h>
18 #include <sys/eventfd.h>
19 
20 static inline svm_msg_q_ring_t *
22 {
23  return vec_elt_at_index (mq->rings, ring_index);
24 }
25 
27 svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index)
28 {
29  return svm_msg_q_ring_inline (mq, ring_index);
30 }
31 
32 static inline void *
34 {
35  ASSERT (elt_index < ring->nitems);
36  return (ring->data + elt_index * ring->elsize);
37 }
38 
41 {
42  svm_msg_q_ring_cfg_t *ring_cfg;
43  uword rings_sz = 0, mq_sz;
44  svm_msg_q_ring_t *ring;
45  u8 *base, *rings_ptr;
46  vec_header_t *vh;
47  u32 vec_sz, q_sz;
48  svm_msg_q_t *mq;
49  int i;
50 
51  ASSERT (cfg);
52 
53  vec_sz = vec_header_bytes (0) + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
54  for (i = 0; i < cfg->n_rings; i++)
55  {
56  if (cfg->ring_cfgs[i].data)
57  continue;
58  ring_cfg = &cfg->ring_cfgs[i];
59  rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
60  }
61 
62  q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
63  mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz;
65  if (!base)
66  return 0;
67 
68  mq = (svm_msg_q_t *) base;
69  mq->q = svm_queue_init (base + sizeof (svm_msg_q_t), cfg->q_nitems,
70  sizeof (svm_msg_q_msg_t));
71  mq->q->consumer_pid = cfg->consumer_pid;
72  vh = (vec_header_t *) ((u8 *) mq->q + q_sz);
73  vh->len = cfg->n_rings;
74  mq->rings = (svm_msg_q_ring_t *) (vh + 1);
75  rings_ptr = (u8 *) mq->rings + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
76  for (i = 0; i < cfg->n_rings; i++)
77  {
78  ring = &mq->rings[i];
79  ring->elsize = cfg->ring_cfgs[i].elsize;
80  ring->nitems = cfg->ring_cfgs[i].nitems;
81  ring->cursize = ring->head = ring->tail = 0;
82  if (cfg->ring_cfgs[i].data)
83  ring->data = cfg->ring_cfgs[i].data;
84  else
85  {
86  ring->data = rings_ptr;
87  rings_ptr += (uword) ring->nitems * ring->elsize;
88  }
89  }
90 
91  return mq;
92 }
93 
94 void
96 {
97  svm_queue_free (mq->q);
98  clib_mem_free (mq);
99 }
100 
103 {
104  svm_msg_q_msg_t msg;
105  svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
106 
107  ASSERT (ring->cursize < ring->nitems);
108  msg.ring_index = ring - mq->rings;
109  msg.elt_index = ring->tail;
110  ring->tail = (ring->tail + 1) % ring->nitems;
111  clib_atomic_fetch_add (&ring->cursize, 1);
112  return msg;
113 }
114 
115 int
117  u8 noblock, svm_msg_q_msg_t * msg)
118 {
119  if (noblock)
120  {
121  if (svm_msg_q_try_lock (mq))
122  return -1;
124  || svm_msg_q_ring_is_full (mq, ring_index)))
125  {
126  svm_msg_q_unlock (mq);
127  return -2;
128  }
129  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
130  }
131  else
132  {
133  svm_msg_q_lock (mq);
134  while (svm_msg_q_is_full (mq)
135  || svm_msg_q_ring_is_full (mq, ring_index))
136  svm_msg_q_wait (mq);
137  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
138  }
139  return 0;
140 }
141 
144 {
145  svm_msg_q_msg_t msg = {.as_u64 = ~0 };
146  svm_msg_q_ring_t *ring;
147 
148  vec_foreach (ring, mq->rings)
149  {
150  if (ring->elsize < nbytes || ring->cursize == ring->nitems)
151  continue;
152  msg.ring_index = ring - mq->rings;
153  msg.elt_index = ring->tail;
154  ring->tail = (ring->tail + 1) % ring->nitems;
155  clib_atomic_fetch_add (&ring->cursize, 1);
156  break;
157  }
158  return msg;
159 }
160 
161 void *
163 {
165  return svm_msg_q_ring_data (ring, msg->elt_index);
166 }
167 
168 void
170 {
171  svm_msg_q_ring_t *ring;
172  int need_signal;
173 
174  ASSERT (vec_len (mq->rings) > msg->ring_index);
175  ring = &mq->rings[msg->ring_index];
176  if (msg->elt_index == ring->head)
177  {
178  ring->head = (ring->head + 1) % ring->nitems;
179  }
180  else
181  {
182  clib_warning ("message out of order");
183  /* for now, expect messages to be processed in order */
184  ASSERT (0);
185  }
186 
187  need_signal = ring->cursize == ring->nitems;
188  clib_atomic_fetch_sub (&ring->cursize, 1);
189 
190  if (PREDICT_FALSE (need_signal))
191  svm_queue_send_signal (mq->q, 0);
192 }
193 
194 static int
196 {
197  u32 dist1, dist2, tail, head;
198  svm_msg_q_ring_t *ring;
199 
200  if (vec_len (mq->rings) <= msg->ring_index)
201  return 0;
202  ring = &mq->rings[msg->ring_index];
203  tail = ring->tail;
204  head = ring->head;
205 
206  dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems;
207  if (tail == head)
208  dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
209  else
210  dist2 = ((ring->nitems + tail) - head) % ring->nitems;
211  return (dist1 < dist2);
212 }
213 
214 int
215 svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
216 {
217  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
218  return svm_queue_add (mq->q, (u8 *) msg, nowait);
219 }
220 
221 void
223 {
224  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
225  svm_queue_add_raw (mq->q, (u8 *) msg);
226  svm_msg_q_unlock (mq);
227 }
228 
229 int
231  svm_q_conditional_wait_t cond, u32 time)
232 {
233  return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
234 }
235 
236 void
238 {
239  svm_queue_sub_raw (mq->q, (u8 *) msg);
240 }
241 
242 void
244 {
245  mq->q->consumer_evtfd = fd;
246 }
247 
248 void
250 {
251  mq->q->producer_evtfd = fd;
252 }
253 
254 int
256 {
257  int fd;
258  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
259  return -1;
261  return 0;
262 }
263 
264 int
266 {
267  int fd;
268  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
269  return -1;
271  return 0;
272 }
273 
274 /*
275  * fd.io coding-style-patch-verification: ON
276  *
277  * Local Variables:
278  * eval: (c-set-style "gnu")
279  * End:
280  */
void svm_queue_add_raw(svm_queue_t *q, u8 *elem)
Add element to queue with mutex held.
Definition: queue.c:228
svm_msg_q_ring_t * rings
rings with message data
Definition: message_queue.h:40
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
static u8 svm_msg_q_ring_is_full(svm_msg_q_t *mq, u32 ring_index)
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:247
svm_msg_q_msg_t svm_msg_q_alloc_msg(svm_msg_q_t *mq, u32 nbytes)
Allocate message buffer.
#define clib_atomic_fetch_sub(a, b)
Definition: atomics.h:24
for(i=1;i<=collision_buckets;i++)
int i
volatile u32 head
current head (for dequeue)
Definition: message_queue.h:31
unsigned char u8
Definition: types.h:56
static svm_msg_q_ring_t * svm_msg_q_ring_inline(svm_msg_q_t *mq, u32 ring_index)
Definition: message_queue.c:21
int svm_msg_q_lock_and_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index, u8 noblock, svm_msg_q_msg_t *msg)
Lock message queue and allocate message buffer on ring.
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:356
static uword vec_header_bytes(uword header_bytes)
Definition: vec_bootstrap.h:80
svm_msg_q_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
Definition: message_queue.c:40
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
volatile u32 tail
current tail (for enqueue)
Definition: message_queue.h:32
unsigned int u32
Definition: types.h:88
int svm_msg_q_sub(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, svm_q_conditional_wait_t cond, u32 time)
Consumer dequeue one message from queue.
void svm_msg_q_free(svm_msg_q_t *mq)
Free message queue.
Definition: message_queue.c:95
int svm_msg_q_alloc_consumer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue consumer.
void svm_msg_q_set_producer_eventfd(svm_msg_q_t *mq, int fd)
Set event fd for queue producer.
#define PREDICT_FALSE(x)
Definition: clib.h:112
svm_msg_q_ring_t * svm_msg_q_ring(svm_msg_q_t *mq, u32 ring_index)
Get message queue ring.
Definition: message_queue.c:27
static int svm_msq_q_msg_is_valid(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
static void svm_msg_q_wait(svm_msg_q_t *mq)
Wait for message queue event.
volatile u32 cursize
current size of the ring
Definition: message_queue.h:29
struct svm_msg_q_ring_ svm_msg_q_ring_t
u32 n_rings
number of msg rings
Definition: message_queue.h:54
svm_q_conditional_wait_t
Definition: queue.h:40
Unidirectional shared-memory multi-ring message queue.
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
#define clib_warning(format, args...)
Definition: error.h:59
svm_queue_t * q
queue for exchanging messages
Definition: message_queue.h:39
int svm_msg_q_add(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, int nowait)
Producer enqueue one message to queue.
u32 elt_index
index in ring
Definition: message_queue.h:63
u32 len
Number of elements in vector (NOT its allocated length).
Definition: vec_bootstrap.h:60
static int svm_msg_q_try_lock(svm_msg_q_t *mq)
Try locking message queue.
void svm_queue_free(svm_queue_t *q)
Definition: queue.c:89
u32 ring_index
ring index, could be u8
Definition: message_queue.h:62
#define ASSERT(truth)
void svm_msg_q_add_and_unlock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Producer enqueue one message to queue with mutex held.
static void clib_mem_free(void *p)
Definition: mem.h:226
vector header structure
Definition: vec_bootstrap.h:55
struct svm_msg_q_ svm_msg_q_t
u8 * data
chunk of memory for msg data
Definition: message_queue.h:34
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
Definition: queue.c:451
void svm_msg_q_set_consumer_eventfd(svm_msg_q_t *mq, int fd)
Set event fd for queue consumer.
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
Definition: message_queue.h:55
#define clib_atomic_fetch_add(a, b)
Definition: atomics.h:23
u32 q_nitems
msg queue size (not rings)
Definition: message_queue.h:53
void svm_msg_q_free_msg(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Free message buffer.
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
u64 uword
Definition: types.h:112
svm_queue_t * svm_queue_init(void *base, int nels, int elsize)
Definition: queue.c:33
struct _svm_queue svm_queue_t
u32 elsize
size of an element
Definition: message_queue.h:33
static void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:161
#define vec_foreach(var, vec)
Vector iterator.
int consumer_pid
pid of msg consumer
Definition: message_queue.h:52
int svm_msg_q_alloc_producer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue consumer.
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:59
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
static u8 svm_msg_q_is_full(svm_msg_q_t *mq)
Check if message queue is full.
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index)
Allocate message buffer on ring.
static void * svm_msg_q_ring_data(svm_msg_q_ring_t *ring, u32 elt_index)
Definition: message_queue.c:33
void svm_queue_send_signal(svm_queue_t *q, u8 is_prod)
Definition: queue.c:134
void svm_msg_q_sub_w_lock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Consumer dequeue one message from queue with mutex held.
u32 nitems
max size of the ring
Definition: message_queue.h:30