FD.io VPP  v18.01-8-g0eacf49
Vector Packet Processing
client.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <stddef.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <sys/mman.h>
21 #include <sys/stat.h>
22 #include <netinet/in.h>
23 #include <netdb.h>
24 #include <signal.h>
25 #include <stdbool.h>
26 #include <vnet/vnet.h>
27 #include <vlib/vlib.h>
28 #include <vlib/unix/unix.h>
29 #include <vlibapi/api.h>
30 #include <vlibmemory/api.h>
31 
32 #include <vpp/api/vpe_msg_enum.h>
33 
34 #include "vppapiclient.h"
35 
36 /*
37  * Asynchronous mode:
38  * Client registers a callback. All messages are sent to the callback.
39  * Synchronous mode:
40  * Client calls blocking read().
41  * Clients are expected to collate events on a queue.
42  * vac_write() -> suspends RX thread
43  * vac_read() -> resumes RX thread
44  */
45 
46 #define vl_typedefs /* define message structures */
47 #include <vpp/api/vpe_all_api_h.h>
48 #undef vl_typedefs
49 
50 #define vl_endianfun /* define message structures */
51 #include <vpp/api/vpe_all_api_h.h>
52 #undef vl_endianfun
53 
56 
57 typedef struct {
59  pthread_t rx_thread_handle;
61  pthread_mutex_t queue_lock;
62  pthread_cond_t suspend_cv;
63  pthread_cond_t resume_cv;
64  pthread_mutex_t timeout_lock;
65  pthread_cond_t timeout_cv;
66  pthread_cond_t timeout_cancel_cv;
67  pthread_cond_t terminate_cv;
68 } vac_main_t;
69 
73 bool rx_is_running = false;
74 
75 static void
76 init (void)
77 {
78  vac_main_t *pm = &vac_main;
79  memset(pm, 0, sizeof(*pm));
80  pthread_mutex_init(&pm->queue_lock, NULL);
81  pthread_cond_init(&pm->suspend_cv, NULL);
82  pthread_cond_init(&pm->resume_cv, NULL);
83  pthread_mutex_init(&pm->timeout_lock, NULL);
84  pthread_cond_init(&pm->timeout_cv, NULL);
85  pthread_cond_init(&pm->timeout_cancel_cv, NULL);
86  pthread_cond_init(&pm->terminate_cv, NULL);
87 }
88 
89 static void
90 cleanup (void)
91 {
92  vac_main_t *pm = &vac_main;
93  pthread_cond_destroy(&pm->suspend_cv);
94  pthread_cond_destroy(&pm->resume_cv);
95  pthread_cond_destroy(&pm->timeout_cv);
96  pthread_cond_destroy(&pm->timeout_cancel_cv);
97  pthread_cond_destroy(&pm->terminate_cv);
98  pthread_mutex_destroy(&pm->queue_lock);
99  pthread_mutex_destroy(&pm->timeout_lock);
100  memset (pm, 0, sizeof (*pm));
101 }
102 
/*
 * Stub that lets references to vlib_cli_output() resolve when -lvlib is
 * not available. Both arguments are ignored; a fixed warning is logged.
 */
void
vlib_cli_output (struct vlib_main_t * vm, char * fmt, ...)
{
  (void) vm;
  (void) fmt;
  clib_warning ("vlib_cli_output called...");
}
110 
/* Release an API message buffer by handing it to vl_msg_api_free(). */
void
vac_free (void * msg)
{
  void *buffer = msg;

  vl_msg_api_free (buffer);
}
116 
117 static void
118 vac_api_handler (void *msg)
119 {
120  u16 id = ntohs(*((u16 *)msg));
121  msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
122  int l = ntohl(msgbuf->data_len);
123  if (l == 0)
124  clib_warning("Message ID %d has wrong length: %d\n", id, l);
125 
126  /* Call Python callback */
127  ASSERT(vac_callback);
128  (vac_callback)(msg, l);
129  vac_free(msg);
130 }
131 
132 static void *
133 vac_rx_thread_fn (void *arg)
134 {
137  vl_api_memclnt_keepalive_reply_t *rmp;
138  vac_main_t *pm = &vac_main;
139  api_main_t *am = &api_main;
141  uword msg;
142 
143  q = am->vl_input_queue;
144 
145  while (1)
146  while (!unix_shared_memory_queue_sub(q, (u8 *)&msg, 0))
147  {
148  u16 id = ntohs(*((u16 *)msg));
149  switch (id) {
150  case VL_API_RX_THREAD_EXIT:
151  vl_msg_api_free((void *) msg);
152  /* signal waiting threads that this thread is about to terminate */
153  pthread_mutex_lock(&pm->queue_lock);
154  pthread_cond_signal(&pm->terminate_cv);
155  pthread_mutex_unlock(&pm->queue_lock);
156  pthread_exit(0);
157  return 0;
158  break;
159 
160  case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
161  vl_msg_api_free((void * )msg);
162  /* Suspend thread and signal reader */
163  pthread_mutex_lock(&pm->queue_lock);
164  pthread_cond_signal(&pm->suspend_cv);
165  /* Wait for the resume signal */
166  pthread_cond_wait (&pm->resume_cv, &pm->queue_lock);
167  pthread_mutex_unlock(&pm->queue_lock);
168  break;
169 
170  case VL_API_MEMCLNT_READ_TIMEOUT:
171  clib_warning("Received read timeout in async thread\n");
172  vl_msg_api_free((void *) msg);
173  break;
174 
175  case VL_API_MEMCLNT_KEEPALIVE:
176  mp = (void *)msg;
177  rmp = vl_msg_api_alloc (sizeof (*rmp));
178  memset (rmp, 0, sizeof (*rmp));
179  rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
180  rmp->context = mp->context;
181  shmem_hdr = am->shmem_hdr;
182  vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
183  vl_msg_api_free((void *) msg);
184  break;
185 
186  default:
187  vac_api_handler((void *)msg);
188  }
189  }
190 }
191 
192 static void *
194 {
196  vac_main_t *pm = &vac_main;
197  api_main_t *am = &api_main;
198  struct timespec ts;
199  struct timeval tv;
200  u16 timeout;
201  int rv;
202 
203  while (1)
204  {
205  /* Wait for poke */
206  pthread_mutex_lock(&pm->timeout_lock);
207  pthread_cond_wait (&pm->timeout_cv, &pm->timeout_lock);
208  timeout = read_timeout;
209  gettimeofday(&tv, NULL);
210  ts.tv_sec = tv.tv_sec + timeout;
211  ts.tv_nsec = 0;
212  rv = pthread_cond_timedwait (&pm->timeout_cancel_cv,
213  &pm->timeout_lock, &ts);
214  pthread_mutex_unlock(&pm->timeout_lock);
215  if (rv == ETIMEDOUT)
216  {
217  ep = vl_msg_api_alloc (sizeof (*ep));
218  ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_READ_TIMEOUT);
220  }
221  }
222  pthread_exit(0);
223 }
224 
225 void
227 {
228  api_main_t *am = &api_main;
229  vac_main_t *pm = &vac_main;
231 
232  if (!pm->rx_thread_handle) return;
233  pthread_mutex_lock(&pm->queue_lock);
234  if (rx_is_running)
235  {
236  ep = vl_msg_api_alloc (sizeof (*ep));
237  ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_RX_THREAD_SUSPEND);
239  /* Wait for RX thread to tell us it has suspendend */
240  pthread_cond_wait(&pm->suspend_cv, &pm->queue_lock);
241  rx_is_running = false;
242  }
243  pthread_mutex_unlock(&pm->queue_lock);
244 }
245 
246 void
248 {
249  vac_main_t *pm = &vac_main;
250  if (!pm->rx_thread_handle) return;
251  pthread_mutex_lock(&pm->queue_lock);
252  if (rx_is_running) goto unlock;
253  pthread_cond_signal(&pm->resume_cv);
254  rx_is_running = true;
255  unlock:
256  pthread_mutex_unlock(&pm->queue_lock);
257 }
258 
259 static uword *
261 {
262  api_main_t *am = &api_main;
263  return (am->msg_index_by_name_and_crc);
264 }
265 
266 int
268 {
269  api_main_t *am = &api_main;
271 }
272 
273 int
274 vac_connect (char * name, char * chroot_prefix, vac_callback_t cb,
275  int rx_qlen)
276 {
277  int rv = 0;
278  vac_main_t *pm = &vac_main;
279 
280  init();
281  if (chroot_prefix != NULL)
282  vl_set_memory_root_path (chroot_prefix);
283 
284  if ((rv = vl_client_api_map("/vpe-api"))) {
285  clib_warning ("vl_client_api map rv %d", rv);
286  return rv;
287  }
288 
289  if (vl_client_connect(name, 0, rx_qlen) < 0) {
291  return (-1);
292  }
293 
294  if (cb) {
295  /* Start the rx queue thread */
296  rv = pthread_create(&pm->rx_thread_handle, NULL, vac_rx_thread_fn, 0);
297  if (rv) {
298  clib_warning("pthread_create returned %d", rv);
300  return (-1);
301  }
302  vac_callback = cb;
303  rx_is_running = true;
304  }
305 
306  /* Start read timeout thread */
307  rv = pthread_create(&pm->timeout_thread_handle, NULL,
309  if (rv) {
310  clib_warning("pthread_create returned %d", rv);
312  return (-1);
313  }
314 
315  pm->connected_to_vlib = 1;
316 
317  return (0);
318 }
319 
320 int
322 {
323  api_main_t *am = &api_main;
324  vac_main_t *pm = &vac_main;
325 
326  if (!pm->connected_to_vlib) return 0;
327 
328  if (pm->rx_thread_handle) {
330  uword junk;
331  ep = vl_msg_api_alloc (sizeof (*ep));
332  ep->_vl_msg_id = ntohs(VL_API_RX_THREAD_EXIT);
334 
335  /* wait (with timeout) until RX thread has finished */
336  struct timespec ts;
337  struct timeval tv;
338  gettimeofday(&tv, NULL);
339  ts.tv_sec = tv.tv_sec + 5;
340  ts.tv_nsec = 0;
341  pthread_mutex_lock(&pm->queue_lock);
342  int rv = pthread_cond_timedwait(&pm->terminate_cv, &pm->queue_lock, &ts);
343  pthread_mutex_unlock(&pm->queue_lock);
344  /* now join so we wait until thread has -really- finished */
345  if (rv == ETIMEDOUT)
346  pthread_cancel(pm->rx_thread_handle);
347  else
348  pthread_join(pm->rx_thread_handle, (void **) &junk);
349  }
350  if (pm->timeout_thread_handle)
351  pthread_cancel(pm->timeout_thread_handle);
352 
355  vac_callback = 0;
356 
357  cleanup();
358 
359  return (0);
360 }
361 
362 static void
363 set_timeout (unsigned short timeout)
364 {
365  vac_main_t *pm = &vac_main;
366  pthread_mutex_lock(&pm->timeout_lock);
367  read_timeout = timeout;
368  pthread_cond_signal(&pm->timeout_cv);
369  pthread_mutex_unlock(&pm->timeout_lock);
370 }
371 
372 static void
374 {
375  vac_main_t *pm = &vac_main;
376  pthread_mutex_lock(&pm->timeout_lock);
377  pthread_cond_signal(&pm->timeout_cancel_cv);
378  pthread_mutex_unlock(&pm->timeout_lock);
379 }
380 
381 int
382 vac_read (char **p, int *l, u16 timeout)
383 {
385  api_main_t *am = &api_main;
386  vac_main_t *pm = &vac_main;
388  vl_api_memclnt_keepalive_reply_t *rmp;
389  uword msg;
390  msgbuf_t *msgbuf;
391  int rv;
393 
394  if (!pm->connected_to_vlib) return -1;
395 
396  *l = 0;
397 
398  if (am->our_pid == 0) return (-1);
399 
400  /* Poke timeout thread */
401  if (timeout)
402  set_timeout(timeout);
403 
404  q = am->vl_input_queue;
405 
406  again:
407  rv = unix_shared_memory_queue_sub(q, (u8 *)&msg, 0);
408  if (rv == 0) {
409  u16 msg_id = ntohs(*((u16 *)msg));
410  switch (msg_id) {
411  case VL_API_RX_THREAD_EXIT:
412  printf("Received thread exit\n");
413  return -1;
414  case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
415  printf("Received thread suspend\n");
416  goto error;
417  case VL_API_MEMCLNT_READ_TIMEOUT:
418  printf("Received read timeout %ds\n", timeout);
419  goto error;
420  case VL_API_MEMCLNT_KEEPALIVE:
421  /* Handle an alive-check ping from vpp. */
422  mp = (void *)msg;
423  rmp = vl_msg_api_alloc (sizeof (*rmp));
424  memset (rmp, 0, sizeof (*rmp));
425  rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
426  rmp->context = mp->context;
427  shmem_hdr = am->shmem_hdr;
428  vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
429  vl_msg_api_free((void *) msg);
430  /*
431  * Python code is blissfully unaware of these pings, so
432  * act as if it never happened...
433  */
434  goto again;
435 
436  default:
437  msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
438  *l = ntohl(msgbuf->data_len);
439  if (*l == 0) {
440  printf("Unregistered API message: %d\n", msg_id);
441  goto error;
442  }
443  }
444  *p = (char *)msg;
445 
446  /* Let timeout notification thread know we're done */
447  unset_timeout();
448 
449  } else {
450  printf("Read failed with %d\n", rv);
451  }
452  return (rv);
453 
454  error:
455  vl_msg_api_free((void *) msg);
456  /* Client might forget to resume RX thread on failure */
457  vac_rx_resume ();
458  return -1;
459 }
460 
461 /*
462  * XXX: Makes the assumption that client_index is the first member
463  */
464 typedef VL_API_PACKED(struct _vl_api_header {
465  u16 _vl_msg_id;
466  u32 client_index;
467 }) vl_api_header_t;
468 
469 static unsigned int
470 vac_client_index (void)
471 {
472  return (api_main.my_client_index);
473 }
474 
475 int
476 vac_write (char *p, int l)
477 {
478  int rv = -1;
479  api_main_t *am = &api_main;
480  vl_api_header_t *mp = vl_msg_api_alloc(l);
482  vac_main_t *pm = &vac_main;
483 
484  if (!pm->connected_to_vlib) return -1;
485  if (!mp) return (-1);
486 
487  memcpy(mp, p, l);
488  mp->client_index = vac_client_index();
489  q = am->shmem_hdr->vl_input_queue;
490  rv = unix_shared_memory_queue_add(q, (u8 *)&mp, 0);
491  if (rv != 0) {
492  clib_warning("vpe_api_write fails: %d\n", rv);
493  /* Clear message */
494  vac_free(mp);
495  }
496  return (rv);
497 }
498 
/* Look up a message index by name; thin wrapper over vl_api_get_msg_index(). */
int
vac_get_msg_index (unsigned char * name)
{
  int msg_index = vl_api_get_msg_index (name);

  return msg_index;
}
504 
505 int
507 {
508  int max = 0;
509  hash_pair_t *hp;
511  hash_foreach_pair (hp, h,
512  ({
513  if (hp->value[0] > max)
514  max = hp->value[0];
515  }));
516 
517  return max;
518 }
519 
520 void
522 {
523  if (cb) clib_error_register_handler (cb, 0);
524 }
void vac_rx_resume(void)
Definition: client.c:247
int vac_get_msg_index(unsigned char *name)
Definition: client.c:500
void vl_client_disconnect(void)
pthread_t timeout_thread_handle
Definition: client.c:60
unix_shared_memory_queue_t * vl_input_queue
Definition: api_common.h:68
int my_client_index
All VLIB-side message handlers use my_client_index to identify the queue / client.
Definition: api_common.h:306
#define NULL
Definition: clib.h:55
typedef VL_API_PACKED(struct _vl_api_header{u16 _vl_msg_id;u32 client_index;})
Definition: client.c:464
static void set_timeout(unsigned short timeout)
Definition: client.c:363
pthread_t rx_thread_handle
Definition: client.c:59
int vac_msg_table_max_index(void)
Definition: client.c:506
void vlib_cli_output(struct vlib_main_t *vm, char *fmt,...)
Definition: client.c:106
int vac_write(char *p, int l)
Definition: client.c:476
void clib_error_register_handler(clib_error_handler_func_t func, void *arg)
Definition: error.c:75
bool rx_is_running
Definition: client.c:73
unix_shared_memory_queue_t * vl_input_queue
Peer input queue pointer.
Definition: api_common.h:300
uword value[0]
Definition: hash.h:164
int our_pid
Current process PID.
Definition: api_common.h:248
u8 connected_to_vlib
Definition: client.c:58
pthread_cond_t timeout_cancel_cv
Definition: client.c:66
int vl_client_connect(const char *name, int ctx_quota, int input_queue_size)
pthread_mutex_t queue_lock
Definition: client.c:61
vlib_main_t ** vlib_mains
Definition: client.c:55
void vl_client_api_unmap(void)
struct vl_shmem_hdr_ * shmem_hdr
Binary API shared-memory segment header pointer.
Definition: api_common.h:261
void vac_rx_suspend(void)
Definition: client.c:226
void * vl_msg_api_alloc(int nbytes)
u32 vl_api_get_msg_index(u8 *name_and_crc)
vac_callback_t vac_callback
Definition: client.c:71
static void unset_timeout(void)
Definition: client.c:373
vlib_main_t vlib_global_main
Definition: client.c:54
vl_shmem_hdr_t * shmem_hdr
int unix_shared_memory_queue_add(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
int vac_connect(char *name, char *chroot_prefix, vac_callback_t cb, int rx_qlen)
Definition: client.c:274
int vac_disconnect(void)
Definition: client.c:321
void(* vac_error_callback_t)(void *, unsigned char *, int)
Definition: vppapiclient.h:21
void vl_msg_api_free(void *)
static void cleanup(void)
Definition: client.c:90
vac_main_t vac_main
Definition: client.c:70
static void * vac_timeout_thread_fn(void *arg)
Definition: client.c:193
u16 read_timeout
Definition: client.c:72
API main structure, used by both vpp and binary API clients.
Definition: api_common.h:198
int unix_shared_memory_queue_sub(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
api_main_t api_main
Definition: api_shared.c:35
vlib_main_t * vm
Definition: buffer.c:283
vec_header_t h
Definition: buffer.c:282
#define clib_warning(format, args...)
Definition: error.h:59
pthread_cond_t suspend_cv
Definition: client.c:62
static uword * vac_msg_table_get_hash(void)
Definition: client.c:260
void vl_msg_api_send_shmem(unix_shared_memory_queue_t *q, u8 *elem)
static uword hash_elts(void *v)
Definition: hash.h:117
#define ASSERT(truth)
unsigned int u32
Definition: types.h:88
u32 data_len
message length not including header
Definition: api_common.h:137
Message header structure.
Definition: api_common.h:134
int vac_read(char **p, int *l, u16 timeout)
Definition: client.c:382
static void init(void)
Definition: client.c:76
pthread_cond_t terminate_cv
Definition: client.c:67
u64 uword
Definition: types.h:112
pthread_cond_t timeout_cv
Definition: client.c:65
unsigned short u16
Definition: types.h:57
unsigned char u8
Definition: types.h:56
pthread_cond_t resume_cv
Definition: client.c:63
#define hash_foreach_pair(p, v, body)
Iterate over hash pairs.
Definition: hash.h:372
void vac_set_error_handler(vac_error_callback_t cb)
Definition: client.c:521
int vac_msg_table_size(void)
Definition: client.c:267
void(* vac_callback_t)(unsigned char *data, int len)
Definition: vppapiclient.h:20
pthread_mutex_t timeout_lock
Definition: client.c:64
void vac_free(void *msg)
Definition: client.c:112
static void vac_api_handler(void *msg)
Definition: client.c:118
int vl_client_api_map(const char *region_name)
void vl_set_memory_root_path(const char *root_path)
uword * msg_index_by_name_and_crc
client message index hash table
Definition: api_common.h:321
struct _unix_shared_memory_queue unix_shared_memory_queue_t
static void * vac_rx_thread_fn(void *arg)
Definition: client.c:133