FD.io VPP  v19.08.3-2-gbabecb413
Vector Packet Processing
session.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19 
20 #include <vnet/session/session.h>
23 #include <vnet/dpo/load_balance.h>
24 #include <vnet/fib/ip4_fib.h>
25 
27 
28 static inline int
29 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
30  session_evt_type_t evt_type)
31 {
32  session_event_t *evt;
33  svm_msg_q_msg_t msg;
34  svm_msg_q_t *mq;
35 
36  mq = session_main_get_vpp_event_queue (thread_index);
37  if (PREDICT_FALSE (svm_msg_q_lock (mq)))
38  return -1;
41  {
42  svm_msg_q_unlock (mq);
43  return -2;
44  }
45  switch (evt_type)
46  {
49  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
50  evt->rpc_args.fp = data;
51  evt->rpc_args.arg = args;
52  break;
53  case SESSION_IO_EVT_RX:
54  case SESSION_IO_EVT_TX:
58  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
59  evt->session_index = *(u32 *) data;
60  break;
65  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
66  evt->session_handle = session_handle ((session_t *) data);
67  break;
68  default:
69  clib_warning ("evt unhandled!");
70  svm_msg_q_unlock (mq);
71  return -1;
72  }
73  evt->event_type = evt_type;
74 
75  svm_msg_q_add_and_unlock (mq, &msg);
76  return 0;
77 }
78 
79 int
81 {
82  return session_send_evt_to_thread (&f->master_session_index, 0,
83  f->master_thread_index, evt_type);
84 }
85 
86 int
88  session_evt_type_t evt_type)
89 {
90  return session_send_evt_to_thread (data, 0, thread_index, evt_type);
91 }
92 
93 int
95 {
96  /* only events supported are disconnect and reset */
97  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
98  || evt_type == SESSION_CTRL_EVT_RESET);
99  return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
100 }
101 
102 void
104  void *rpc_args)
105 {
106  session_send_evt_to_thread (fp, rpc_args, thread_index,
108 }
109 
110 void
111 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
112 {
113  if (thread_index != vlib_get_thread_index ())
114  session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
115  else
116  {
117  void (*fnp) (void *) = fp;
118  fnp (rpc_args);
119  }
120 }
121 
122 void
124 {
125  session_t *s;
126 
127  s = session_get (tc->s_index, tc->thread_index);
129  ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
130  if (!(s->flags & SESSION_F_CUSTOM_TX))
131  {
133  if (svm_fifo_set_event (s->tx_fifo))
134  {
135  session_worker_t *wrk;
136  session_evt_elt_t *elt;
137  wrk = session_main_get_worker (tc->thread_index);
138  if (has_prio)
139  elt = session_evt_alloc_new (wrk);
140  else
141  elt = session_evt_alloc_old (wrk);
142  elt->evt.session_index = tc->s_index;
143  elt->evt.event_type = SESSION_IO_EVT_TX;
144  }
145  }
146 }
147 
148 static void
150 {
151  u32 thread_index = vlib_get_thread_index ();
152  session_evt_elt_t *elt;
153  session_worker_t *wrk;
154 
155  /* If we are in the handler thread, or being called with the worker barrier
156  * held, just append a new event to pending disconnects vector. */
157  if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
158  {
160  elt = session_evt_alloc_ctrl (wrk);
161  clib_memset (&elt->evt, 0, sizeof (session_event_t));
162  elt->evt.session_handle = session_handle (s);
163  elt->evt.event_type = evt;
164  }
165  else
167 }
168 
169 session_t *
170 session_alloc (u32 thread_index)
171 {
172  session_worker_t *wrk = &session_main.wrk[thread_index];
173  session_t *s;
174  u8 will_expand = 0;
175  pool_get_aligned_will_expand (wrk->sessions, will_expand,
177  /* If we have peekers, let them finish */
178  if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
179  {
183  }
184  else
185  {
187  }
188  clib_memset (s, 0, sizeof (*s));
189  s->session_index = s - wrk->sessions;
190  s->thread_index = thread_index;
192  return s;
193 }
194 
195 void
197 {
198  if (CLIB_DEBUG)
199  {
200  u8 thread_index = s->thread_index;
201  clib_memset (s, 0xFA, sizeof (*s));
202  pool_put (session_main.wrk[thread_index].sessions, s);
203  return;
204  }
205  SESSION_EVT (SESSION_EVT_FREE, s);
206  pool_put (session_main.wrk[s->thread_index].sessions, s);
207 }
208 
209 u8
210 session_is_valid (u32 si, u8 thread_index)
211 {
212  session_t *s;
214 
215  s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
216 
217  if (!s)
218  return 1;
219 
220  if (s->thread_index != thread_index || s->session_index != si)
221  return 0;
222 
223  if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
224  || s->session_state <= SESSION_STATE_LISTENING)
225  return 1;
226 
227  tc = session_get_transport (s);
228  if (s->connection_index != tc->c_index
229  || s->thread_index != tc->thread_index || tc->s_index != si)
230  return 0;
231 
232  return 1;
233 }
234 
235 static void
237 {
238  app_worker_t *app_wrk;
239 
240  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
241  if (!app_wrk)
242  return;
243  app_worker_cleanup_notify (app_wrk, s, ntf);
244 }
245 
246 void
248 {
251  session_free (s);
252 }
253 
254 /**
255  * Cleans up session and lookup table.
256  *
257  * Transport connection must still be valid.
258  */
259 static void
261 {
262  int rv;
263 
264  /* Delete from the main lookup table. */
265  if ((rv = session_lookup_del_session (s)))
266  clib_warning ("session %u hash delete rv %d", s->session_index, rv);
267 
269 }
270 
271 static session_t *
273 {
274  session_t *s;
275  u32 thread_index = tc->thread_index;
276 
277  ASSERT (thread_index == vlib_get_thread_index ()
278  || transport_protocol_is_cl (tc->proto));
279 
280  s = session_alloc (thread_index);
281  s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
282  s->session_state = SESSION_STATE_CLOSED;
283 
284  /* Attach transport to session and vice versa */
285  s->connection_index = tc->c_index;
286  tc->s_index = s->session_index;
287  return s;
288 }
289 
290 /**
291  * Discards bytes from buffer chain
292  *
293  * It discards n_bytes_to_drop starting at first buffer after chain_b
294  */
295 always_inline void
297  vlib_buffer_t ** chain_b,
298  u32 n_bytes_to_drop)
299 {
300  vlib_buffer_t *next = *chain_b;
301  u32 to_drop = n_bytes_to_drop;
302  ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
303  while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
304  {
305  next = vlib_get_buffer (vm, next->next_buffer);
306  if (next->current_length > to_drop)
307  {
308  vlib_buffer_advance (next, to_drop);
309  to_drop = 0;
310  }
311  else
312  {
313  to_drop -= next->current_length;
314  next->current_length = 0;
315  }
316  }
317  *chain_b = next;
318 
319  if (to_drop == 0)
320  b->total_length_not_including_first_buffer -= n_bytes_to_drop;
321 }
322 
323 /**
324  * Enqueue buffer chain tail
325  */
326 always_inline int
328  u32 offset, u8 is_in_order)
329 {
330  vlib_buffer_t *chain_b;
331  u32 chain_bi, len, diff;
333  u8 *data;
334  u32 written = 0;
335  int rv = 0;
336 
337  if (is_in_order && offset)
338  {
339  diff = offset - b->current_length;
341  return 0;
342  chain_b = b;
343  session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
344  chain_bi = vlib_get_buffer_index (vm, chain_b);
345  }
346  else
347  chain_bi = b->next_buffer;
348 
349  do
350  {
351  chain_b = vlib_get_buffer (vm, chain_bi);
352  data = vlib_buffer_get_current (chain_b);
353  len = chain_b->current_length;
354  if (!len)
355  continue;
356  if (is_in_order)
357  {
358  rv = svm_fifo_enqueue (s->rx_fifo, len, data);
359  if (rv == len)
360  {
361  written += rv;
362  }
363  else if (rv < len)
364  {
365  return (rv > 0) ? (written + rv) : written;
366  }
367  else if (rv > len)
368  {
369  written += rv;
370 
371  /* written more than what was left in chain */
373  return written;
374 
375  /* drop the bytes that have already been delivered */
376  session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
377  }
378  }
379  else
380  {
381  rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
382  if (rv)
383  {
384  clib_warning ("failed to enqueue multi-buffer seg");
385  return -1;
386  }
387  offset += len;
388  }
389  }
390  while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
391  ? chain_b->next_buffer : 0));
392 
393  if (is_in_order)
394  return written;
395 
396  return 0;
397 }
398 
399 /*
400  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
401  * event but on request can queue notification events for later delivery by
402  * calling stream_server_flush_enqueue_events().
403  *
404  * @param tc Transport connection which is to be enqueued data
405  * @param b Buffer to be enqueued
406  * @param offset Offset at which to start enqueueing if out-of-order
407  * @param queue_event Flag to indicate if peer is to be notified or if event
408  * is to be queued. The former is useful when more data is
409  * enqueued and only one event is to be generated.
410  * @param is_in_order Flag to indicate if data is in order
411  * @return Number of bytes enqueued or a negative value if enqueueing failed.
412  */
413 int
415  vlib_buffer_t * b, u32 offset,
416  u8 queue_event, u8 is_in_order)
417 {
418  session_t *s;
419  int enqueued = 0, rv, in_order_off;
420 
421  s = session_get (tc->s_index, tc->thread_index);
422 
423  if (is_in_order)
424  {
425  enqueued = svm_fifo_enqueue (s->rx_fifo,
426  b->current_length,
428  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
429  && enqueued >= 0))
430  {
431  in_order_off = enqueued > b->current_length ? enqueued : 0;
432  rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
433  if (rv > 0)
434  enqueued += rv;
435  }
436  }
437  else
438  {
439  rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
440  b->current_length,
442  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
443  session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
444  /* if something was enqueued, report even this as success for ooo
445  * segment handling */
446  return rv;
447  }
448 
449  if (queue_event)
450  {
451  /* Queue RX event on this fifo. Eventually these will need to be flushed
452  * by calling stream_server_flush_enqueue_events () */
453  session_worker_t *wrk;
454 
456  if (!(s->flags & SESSION_F_RX_EVT))
457  {
458  s->flags |= SESSION_F_RX_EVT;
459  vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
460  }
461  }
462 
463  return enqueued;
464 }
465 
466 int
468  session_dgram_hdr_t * hdr,
469  vlib_buffer_t * b, u8 proto, u8 queue_event)
470 {
471  int enqueued = 0, rv, in_order_off;
472 
474  >= b->current_length + sizeof (*hdr));
475 
476  svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr);
477  enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length,
479  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
480  {
481  in_order_off = enqueued > b->current_length ? enqueued : 0;
482  rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
483  if (rv > 0)
484  enqueued += rv;
485  }
486  if (queue_event)
487  {
488  /* Queue RX event on this fifo. Eventually these will need to be flushed
489  * by calling stream_server_flush_enqueue_events () */
490  session_worker_t *wrk;
491 
493  if (!(s->flags & SESSION_F_RX_EVT))
494  {
495  s->flags |= SESSION_F_RX_EVT;
496  vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
497  }
498  }
499  return enqueued;
500 }
501 
502 int
504  u32 offset, u32 max_bytes)
505 {
506  session_t *s = session_get (tc->s_index, tc->thread_index);
507  return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
508 }
509 
510 u32
512 {
513  session_t *s = session_get (tc->s_index, tc->thread_index);
514  u32 rv;
515 
516  rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
517 
518  if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
520 
521  return rv;
522 }
523 
524 static inline int
526  svm_fifo_t * f, session_evt_type_t evt_type)
527 {
528  app_worker_t *app_wrk;
529  application_t *app;
530  int i;
531 
532  app = application_get (app_index);
533  if (!app)
534  return -1;
535 
536  for (i = 0; i < f->n_subscribers; i++)
537  {
538  app_wrk = application_get_worker (app, f->subscribers[i]);
539  if (!app_wrk)
540  continue;
541  if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
542  return -1;
543  }
544 
545  return 0;
546 }
547 
548 /**
549  * Notify session peer that new data has been enqueued.
550  *
551  * @param s Stream session for which the event is to be generated.
552  * @param lock Flag to indicate if call should lock message queue.
553  *
554  * @return 0 on success or negative number if failed to send notification.
555  */
556 static inline int
558 {
559  app_worker_t *app_wrk;
560  u32 session_index;
561  u8 n_subscribers;
562 
563  session_index = s->session_index;
564  n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
565 
566  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
567  if (PREDICT_FALSE (!app_wrk))
568  {
569  SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
570  return 0;
571  }
572 
573  SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
574 
575  s->flags &= ~SESSION_F_RX_EVT;
576 
577  /* Application didn't confirm accept yet */
578  if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
579  return 0;
580 
583  return -1;
584 
585  if (PREDICT_FALSE (n_subscribers))
586  {
587  s = session_get (session_index, vlib_get_thread_index ());
588  return session_notify_subscribers (app_wrk->app_index, s,
590  }
591 
592  return 0;
593 }
594 
595 int
597 {
599 }
600 
601 static void
603 {
604  u32 session_index = pointer_to_uword (arg);
605  session_t *s;
606 
607  s = session_get_if_valid (session_index, vlib_get_thread_index ());
608  if (!s)
609  return;
610 
612 }
613 
614 /**
615  * Like session_enqueue_notify, but can be called from a thread that does not
616  * own the session.
617  */
618 void
620 {
621  u32 thread_index = session_thread_from_handle (sh);
622  u32 session_index = session_index_from_handle (sh);
623 
624  /*
625  * Pass session index (u32) as opposed to handle (u64) in case pointers
626  * are not 64-bit.
627  */
628  session_send_rpc_evt_to_thread (thread_index,
630  uword_to_pointer (session_index, void *));
631 }
632 
633 int
635 {
636  app_worker_t *app_wrk;
637 
639 
640  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
641  if (PREDICT_FALSE (!app_wrk))
642  return -1;
643 
646  return -1;
647 
648  if (PREDICT_FALSE (s->tx_fifo->n_subscribers))
649  return session_notify_subscribers (app_wrk->app_index, s,
651 
652  return 0;
653 }
654 
655 /**
656  * Flushes queue of sessions that are to be notified of new data
657  * enqueued events.
658  *
659  * @param thread_index Thread index for which the flush is to be performed.
660  * @return 0 on success or a positive number indicating the number of
661  * failures due to API queue being full.
662  */
663 int
664 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
665 {
666  session_worker_t *wrk = session_main_get_worker (thread_index);
667  session_t *s;
668  int i, errors = 0;
669  u32 *indices;
670 
671  indices = wrk->session_to_enqueue[transport_proto];
672 
673  for (i = 0; i < vec_len (indices); i++)
674  {
675  s = session_get_if_valid (indices[i], thread_index);
676  if (PREDICT_FALSE (!s))
677  {
678  errors++;
679  continue;
680  }
681 
683  errors++;
684  }
685 
686  vec_reset_length (indices);
687  wrk->session_to_enqueue[transport_proto] = indices;
688 
689  return errors;
690 }
691 
692 int
694 {
696  int i, errors = 0;
697  for (i = 0; i < 1 + vtm->n_threads; i++)
698  errors += session_main_flush_enqueue_events (transport_proto, i);
699  return errors;
700 }
701 
702 static inline int
704  session_state_t opened_state)
705 {
706  u32 opaque = 0, new_ti, new_si;
707  app_worker_t *app_wrk;
708  session_t *s = 0;
709  u64 ho_handle;
710 
711  /*
712  * Find connection handle and cleanup half-open table
713  */
714  ho_handle = session_lookup_half_open_handle (tc);
715  if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
716  {
717  SESSION_DBG ("half-open was removed!");
718  return -1;
719  }
721 
722  /* Get the app's index from the handle we stored when opening connection
723  * and the opaque (api_context for external apps) from transport session
724  * index */
725  app_wrk = app_worker_get_if_valid (ho_handle >> 32);
726  if (!app_wrk)
727  return -1;
728 
729  opaque = tc->s_index;
730 
731  if (is_fail)
732  return app_worker_connect_notify (app_wrk, s, opaque);
733 
735  s->session_state = SESSION_STATE_CONNECTING;
736  s->app_wrk_index = app_wrk->wrk_index;
737  new_si = s->session_index;
738  new_ti = s->thread_index;
739 
740  if (app_worker_init_connected (app_wrk, s))
741  {
742  session_free (s);
743  app_worker_connect_notify (app_wrk, 0, opaque);
744  return -1;
745  }
746 
747  s = session_get (new_si, new_ti);
748  s->session_state = opened_state;
750 
751  if (app_worker_connect_notify (app_wrk, s, opaque))
752  {
754  /* Avoid notifying app about rejected session cleanup */
755  s = session_get (new_si, new_ti);
757  session_free (s);
758  return -1;
759  }
760 
761  return 0;
762 }
763 
764 int
766 {
767  return session_stream_connect_notify_inline (tc, is_fail,
768  SESSION_STATE_READY);
769 }
770 
771 int
773 {
774  return session_stream_connect_notify_inline (tc, is_fail,
775  SESSION_STATE_OPENED);
776 }
777 
778 typedef struct _session_switch_pool_args
779 {
780  u32 session_index;
781  u32 thread_index;
782  u32 new_thread_index;
783  u32 new_session_index;
785 
786 /**
787  * Notify old thread of the session pool switch
788  */
789 static void
790 session_switch_pool (void *cb_args)
791 {
793  app_worker_t *app_wrk;
794  session_t *s;
795 
796  ASSERT (args->thread_index == vlib_get_thread_index ());
797  s = session_get (args->session_index, args->thread_index);
798  s->tx_fifo->master_session_index = args->new_session_index;
799  s->tx_fifo->master_thread_index = args->new_thread_index;
801  s->thread_index);
802 
803  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
804  if (app_wrk)
805  {
806  session_handle_t new_sh;
807  new_sh = session_make_handle (args->new_session_index,
808  args->new_thread_index);
809  app_worker_migrate_notify (app_wrk, s, new_sh);
810 
811  /* Trigger app read on the new thread */
813  }
814 
815  session_free (s);
816  clib_mem_free (cb_args);
817 }
818 
819 /**
820  * Move dgram session to the right thread
821  */
822 int
824  u32 old_thread_index, session_t ** new_session)
825 {
826  session_t *new_s;
827  session_switch_pool_args_t *rpc_args;
828 
829  /*
830  * Clone half-open session to the right thread.
831  */
832  new_s = session_clone_safe (tc->s_index, old_thread_index);
833  new_s->connection_index = tc->c_index;
834  new_s->rx_fifo->master_session_index = new_s->session_index;
835  new_s->rx_fifo->master_thread_index = new_s->thread_index;
836  new_s->session_state = SESSION_STATE_READY;
838 
839  /*
840  * Ask thread owning the old session to clean it up and make us the tx
841  * fifo owner
842  */
843  rpc_args = clib_mem_alloc (sizeof (*rpc_args));
844  rpc_args->new_session_index = new_s->session_index;
845  rpc_args->new_thread_index = new_s->thread_index;
846  rpc_args->session_index = tc->s_index;
847  rpc_args->thread_index = old_thread_index;
848  session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
849  rpc_args);
850 
851  tc->s_index = new_s->session_index;
852  new_s->connection_index = tc->c_index;
853  *new_session = new_s;
854  return 0;
855 }
856 
857 /**
858  * Notification from transport that connection is being closed.
859  *
860  * A disconnect is sent to application but state is not removed. Once
861  * disconnect is acknowledged by application, session disconnect is called.
862  * Ultimately this leads to close being called on transport (passive close).
863  */
864 void
866 {
867  app_worker_t *app_wrk;
868  session_t *s;
869 
870  s = session_get (tc->s_index, tc->thread_index);
871  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
872  return;
873  s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
874  app_wrk = app_worker_get (s->app_wrk_index);
875  app_worker_close_notify (app_wrk, s);
876 }
877 
878 /**
879  * Notification from transport that connection is being deleted
880  *
881  * This removes the session if it is still valid. It should be called only on
882  * previously fully established sessions. For instance failed connects should
883  * call stream_session_connect_notify and indicate that the connect has
884  * failed.
885  */
886 void
888 {
889  session_t *s;
890 
891  /* App might've been removed already */
892  if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
893  return;
894 
895  switch (s->session_state)
896  {
897  case SESSION_STATE_CREATED:
898  /* Session was created but accept notification was not yet sent to the
899  * app. Cleanup everything. */
902  session_free (s);
903  break;
904  case SESSION_STATE_ACCEPTING:
905  case SESSION_STATE_TRANSPORT_CLOSING:
906  case SESSION_STATE_CLOSING:
907  case SESSION_STATE_TRANSPORT_CLOSED:
908  /* If transport finishes or times out before we get a reply
909  * from the app, mark transport as closed and wait for reply
910  * before removing the session. Cleanup session table in advance
911  * because transport will soon be closed and closed sessions
912  * are assumed to have been removed from the lookup table */
914  s->session_state = SESSION_STATE_TRANSPORT_DELETED;
917  break;
918  case SESSION_STATE_APP_CLOSED:
919  /* Cleanup lookup table as transport needs to still be valid.
920  * Program transport close to ensure that all session events
921  * have been cleaned up. Once transport close is called, the
922  * session is just removed because both transport and app have
923  * confirmed the close*/
925  s->session_state = SESSION_STATE_TRANSPORT_DELETED;
929  break;
930  case SESSION_STATE_TRANSPORT_DELETED:
931  break;
932  case SESSION_STATE_CLOSED:
934  session_delete (s);
935  break;
936  default:
937  clib_warning ("session state %u", s->session_state);
939  session_delete (s);
940  break;
941  }
942 }
943 
944 /**
945  * Notification from transport that it is closed
946  *
947  * Should be called by transport, prior to calling delete notify, once it
948  * knows that no more data will be exchanged. This could serve as an
949  * early acknowledgment of an active close especially if transport delete
950  * can be delayed a long time, e.g., tcp time-wait.
951  */
952 void
954 {
955  app_worker_t *app_wrk;
956  session_t *s;
957 
958  if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
959  return;
960 
961  /* Transport thinks that app requested close but it actually didn't.
962  * Can happen for tcp if fin and rst are received in close succession. */
963  if (s->session_state == SESSION_STATE_READY)
964  {
967  s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
968  }
969  /* If app close has not been received or has not yet resulted in
970  * a transport close, only mark the session transport as closed */
971  else if (s->session_state <= SESSION_STATE_CLOSING)
972  {
973  s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
974  }
975  /* If app also closed, switch to closed */
976  else if (s->session_state == SESSION_STATE_APP_CLOSED)
977  s->session_state = SESSION_STATE_CLOSED;
978 
979  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
980  if (app_wrk)
982 }
983 
984 /**
985  * Notify application that connection has been reset.
986  */
987 void
989 {
990  app_worker_t *app_wrk;
991  session_t *s;
992 
993  s = session_get (tc->s_index, tc->thread_index);
995  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
996  return;
997  s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
998  app_wrk = app_worker_get (s->app_wrk_index);
999  app_worker_reset_notify (app_wrk, s);
1000 }
1001 
1002 int
1004 {
1005  app_worker_t *app_wrk;
1006  session_t *s;
1007 
1008  s = session_get (tc->s_index, tc->thread_index);
1009  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1010  if (!app_wrk)
1011  return -1;
1012  s->session_state = SESSION_STATE_ACCEPTING;
1013  return app_worker_accept_notify (app_wrk, s);
1014 }
1015 
1016 /**
1017  * Accept a stream session. Optionally ping the server by callback.
1018  */
1019 int
1021  u32 thread_index, u8 notify)
1022 {
1023  session_t *s;
1024  int rv;
1025 
1027  s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1028  s->session_state = SESSION_STATE_CREATED;
1029 
1030  if ((rv = app_worker_init_accepted (s)))
1031  return rv;
1032 
1034 
1035  /* Shoulder-tap the server */
1036  if (notify)
1037  {
1038  app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1039  return app_worker_accept_notify (app_wrk, s);
1040  }
1041 
1042  return 0;
1043 }
1044 
1045 int
1046 session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1047 {
1050  app_worker_t *app_wrk;
1051  session_handle_t sh;
1052  session_t *s;
1053  int rv;
1054 
1056  rv = transport_connect (rmt->transport_proto, tep);
1057  if (rv < 0)
1058  {
1059  SESSION_DBG ("Transport failed to open connection.");
1060  return VNET_API_ERROR_SESSION_CONNECT;
1061  }
1062 
1063  tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1064 
1065  /* For dgram type of service, allocate session and fifos now */
1066  app_wrk = app_worker_get (app_wrk_index);
1068  s->app_wrk_index = app_wrk->wrk_index;
1069  s->session_state = SESSION_STATE_OPENED;
1070  if (app_worker_init_connected (app_wrk, s))
1071  {
1072  session_free (s);
1073  return -1;
1074  }
1075 
1076  sh = session_handle (s);
1078  return app_worker_connect_notify (app_wrk, s, opaque);
1079 }
1080 
1081 int
1082 session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1083 {
1086  u64 handle;
1087  int rv;
1088 
1090  rv = transport_connect (rmt->transport_proto, tep);
1091  if (rv < 0)
1092  {
1093  SESSION_DBG ("Transport failed to open connection.");
1094  return VNET_API_ERROR_SESSION_CONNECT;
1095  }
1096 
1097  tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1098 
1099  /* If transport offers a stream service, only allocate session once the
1100  * connection has been established.
1101  * Add connection to half-open table and save app and tc index. The
1102  * latter is needed to help establish the connection while the former
1103  * is needed when the connect notify comes and we have to notify the
1104  * external app
1105  */
1106  handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
1107  session_lookup_add_half_open (tc, handle);
1108 
1109  /* Store api_context (opaque) for when the reply comes. Not the nicest
1110  * thing but better than allocating a separate half-open pool.
1111  */
1112  tc->s_index = opaque;
1113  if (transport_half_open_has_fifos (rmt->transport_proto))
1114  return session_ho_stream_connect_notify (tc, 0 /* is_fail */ );
1115  return 0;
1116 }
1117 
1118 int
1119 session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1120 {
1123 
1124  sep->app_wrk_index = app_wrk_index;
1125  sep->opaque = opaque;
1126 
1127  return transport_connect (rmt->transport_proto, tep_cfg);
1128 }
1129 
1131 
1132 /* *INDENT-OFF* */
1137 };
1138 /* *INDENT-ON* */
1139 
1140 /**
1141  * Ask transport to open connection to remote transport endpoint.
1142  *
1143  * Stores handle for matching request with reply since the call can be
1144  * asynchronous. For instance, for TCP the 3-way handshake must complete
1145  * before reply comes. Session is only created once connection is established.
1146  *
1147  * @param app_index Index of the application requesting the connect
1148  * @param st Session type requested.
1149  * @param tep Remote transport endpoint
1150  * @param opaque Opaque data (typically, api_context) the application expects
1151  * on open completion.
1152  */
1153 int
1154 session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1155 {
1157  tst = transport_protocol_service_type (rmt->transport_proto);
1158  return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
1159 }
1160 
1161 /**
1162  * Ask transport to listen on session endpoint.
1163  *
1164  * @param s Session for which listen will be called. Note that unlike
1165  * established sessions, listen sessions are not associated to a
1166  * thread.
1167  * @param sep Local endpoint to be listened on.
1168  */
1169 int
1171 {
1172  transport_endpoint_t *tep;
1173  u32 tc_index, s_index;
1174 
1175  /* Transport bind/listen */
1176  tep = session_endpoint_to_transport (sep);
1177  s_index = ls->session_index;
1179  s_index, tep);
1180 
1181  if (tc_index == (u32) ~ 0)
1182  return -1;
1183 
1184  /* Attach transport to session. Lookup tables are populated by the app
1185  * worker because local tables (for ct sessions) are not backed by a fib */
1186  ls = listen_session_get (s_index);
1187  ls->connection_index = tc_index;
1188 
1189  return 0;
1190 }
1191 
1192 /**
1193  * Ask transport to stop listening on local transport endpoint.
1194  *
1195  * @param s Session to stop listening on. It must be in state LISTENING.
1196  */
1197 int
1199 {
1202 
1203  if (s->session_state != SESSION_STATE_LISTENING)
1204  return -1;
1205 
1207  if (!tc)
1208  return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
1209 
1210  if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1213  return 0;
1214 }
1215 
1216 /**
1217  * Initialize session closing procedure.
1218  *
1219  * Request is always sent to session node to ensure that all outstanding
1220  * requests are served before transport is notified.
1221  */
1222 void
1224 {
1225  if (!s)
1226  return;
1227 
1228  if (s->session_state >= SESSION_STATE_CLOSING)
1229  {
1230  /* Session will only be removed once both app and transport
1231  * acknowledge the close */
1232  if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1233  || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1235  return;
1236  }
1237 
1238  s->session_state = SESSION_STATE_CLOSING;
1240 }
1241 
1242 /**
1243  * Force a close without waiting for data to be flushed
1244  */
1245 void
1247 {
1248  if (s->session_state >= SESSION_STATE_CLOSING)
1249  return;
1250  /* Drop all outstanding tx data */
1252  s->session_state = SESSION_STATE_CLOSING;
1254 }
1255 
1256 /**
1257  * Notify transport the session can be disconnected. This should eventually
1258  * result in a delete notification that allows us to cleanup session state.
1259  * Called for both active/passive disconnects.
1260  *
1261  * Must be called from the session's thread.
1262  */
1263 void
1265 {
1266  if (s->session_state >= SESSION_STATE_APP_CLOSED)
1267  {
1268  if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1269  s->session_state = SESSION_STATE_CLOSED;
1270  /* If transport is already deleted, just free the session */
1271  else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1273  return;
1274  }
1275 
1276  /* If the tx queue wasn't drained, the transport can continue to try
1277  * sending the outstanding data (in closed state it cannot). It MUST however
1278  * at one point, either after sending everything or after a timeout, call
1279  * delete notify. This will finally lead to the complete cleanup of the
1280  * session.
1281  */
1282  s->session_state = SESSION_STATE_APP_CLOSED;
1283 
1285  s->thread_index);
1286 }
1287 
1288 /**
1289  * Force transport close
1290  */
1291 void
1293 {
1294  if (s->session_state >= SESSION_STATE_APP_CLOSED)
1295  {
1296  if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1297  s->session_state = SESSION_STATE_CLOSED;
1298  else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1300  return;
1301  }
1302 
1303  s->session_state = SESSION_STATE_APP_CLOSED;
1305  s->thread_index);
1306 }
1307 
1308 /**
1309  * Cleanup transport and session state.
1310  *
1311  * Notify transport of the cleanup and free the session. This should
1312  * be called only if transport reported some error and is already
1313  * closed.
1314  */
1315 void
1317 {
1318  /* Delete from main lookup table before we axe the the transport */
1320  if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1322  s->thread_index);
1323  /* Since we called cleanup, no delete notification will come. So, make
1324  * sure the session is properly freed. */
1326 }
1327 
1328 /**
1329  * Allocate event queues in the shared-memory segment
1330  *
1331  * That can either be a newly created memfd segment, that will need to be
1332  * mapped by all stack users, or the binary api's svm region. The latter is
1333  * assumed to be already mapped. NOTE that this assumption DOES NOT hold if
1334  * api clients bootstrap shm api over sockets (i.e. use memfd segments) and
1335  * vpp uses api svm region for event queues.
1336  */
1337 void
1339 {
1340  u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
1341  ssvm_private_t *eqs = &smm->evt_qs_segment;
1342  api_main_t *am = &api_main;
1343  uword eqs_size = 64 << 20;
1344  pid_t vpp_pid = getpid ();
1345  void *oldheap;
1346  int i;
1347 
1349  evt_q_length = smm->configured_event_queue_length;
1350 
1351  if (smm->evt_qs_use_memfd_seg)
1352  {
1353  if (smm->evt_qs_segment_size)
1354  eqs_size = smm->evt_qs_segment_size;
1355 
1356  eqs->ssvm_size = eqs_size;
1357  eqs->i_am_master = 1;
1358  eqs->my_pid = vpp_pid;
1359  eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
1360  eqs->requested_va = smm->session_baseva;
1361 
1363  {
1364  clib_warning ("failed to initialize queue segment");
1365  return;
1366  }
1367  }
1368 
1369  if (smm->evt_qs_use_memfd_seg)
1370  oldheap = ssvm_push_heap (eqs->sh);
1371  else
1372  oldheap = svm_push_data_heap (am->vlib_rp);
1373 
1374  for (i = 0; i < vec_len (smm->wrk); i++)
1375  {
1376  svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1378  {evt_q_length, evt_size, 0}
1379  ,
1380  {evt_q_length >> 1, 256, 0}
1381  };
1382  cfg->consumer_pid = 0;
1383  cfg->n_rings = 2;
1384  cfg->q_nitems = evt_q_length;
1385  cfg->ring_cfgs = rc;
1386  smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
1387  if (smm->evt_qs_use_memfd_seg)
1388  {
1390  clib_warning ("eventfd returned");
1391  }
1392  }
1393 
1394  if (smm->evt_qs_use_memfd_seg)
1395  ssvm_pop_heap (oldheap);
1396  else
1397  svm_pop_heap (oldheap);
1398 }
1399 
1402 {
1403  session_main_t *smm = &session_main;
1404  if (smm->evt_qs_use_memfd_seg)
1405  return &smm->evt_qs_segment;
1406  return 0;
1407 }
1408 
1409 u64
1411 {
1412  svm_fifo_t *f;
1413 
1414  if (!s->rx_fifo)
1415  return SESSION_INVALID_HANDLE;
1416 
1417  f = s->rx_fifo;
1418  return segment_manager_make_segment_handle (f->segment_manager,
1419  f->segment_index);
1420 }
1421 
1422 /* *INDENT-OFF* */
1427  session_tx_fifo_dequeue_and_snd
1428 };
1429 /* *INDENT-ON* */
1430 
1431 /**
1432  * Initialize session layer for given transport proto and ip version
1433  *
1434  * Allocates per session type (transport proto + ip version) data structures
1435  * and adds arc from session queue node to session type output node.
1436  */
1437 void
1439  const transport_proto_vft_t * vft, u8 is_ip4,
1440  u32 output_node)
1441 {
1442  session_main_t *smm = &session_main;
1443  session_type_t session_type;
1444  u32 next_index = ~0;
1445 
1446  session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1447 
1448  vec_validate (smm->session_type_to_next, session_type);
1449  vec_validate (smm->session_tx_fns, session_type);
1450 
1451  /* *INDENT-OFF* */
1452  if (output_node != ~0)
1453  {
1454  foreach_vlib_main (({
1455  next_index = vlib_node_add_next (this_vlib_main,
1456  session_queue_node.index,
1457  output_node);
1458  }));
1459  }
1460  /* *INDENT-ON* */
1461 
1462  smm->session_type_to_next[session_type] = next_index;
1463  smm->session_tx_fns[session_type] =
1464  session_tx_fns[vft->transport_options.tx_type];
1465 }
1466 
1469 {
1470  if (s->session_state != SESSION_STATE_LISTENING)
1473  else
1475  s->connection_index);
1476 }
1477 
1478 void
1480 {
1481  if (s->session_state != SESSION_STATE_LISTENING)
1483  s->connection_index, s->thread_index, tep,
1484  is_lcl);
1485  else
1487  s->connection_index, tep, is_lcl);
1488 }
1489 
1492 {
1494  s->connection_index);
1495 }
1496 
1497 void
1499 {
1500  ASSERT (vlib_get_thread_index () == 0);
1503 }
1504 
1505 static clib_error_t *
1507 {
1508  segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args;
1509  session_main_t *smm = &session_main;
1511  u32 num_threads, preallocated_sessions_per_worker;
1512  session_worker_t *wrk;
1513  int i;
1514 
1515  num_threads = 1 /* main thread */ + vtm->n_threads;
1516 
1517  if (num_threads < 1)
1518  return clib_error_return (0, "n_thread_stacks not set");
1519 
1520  /* Allocate cache line aligned worker contexts */
1521  vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1522 
1523  for (i = 0; i < num_threads; i++)
1524  {
1525  wrk = &smm->wrk[i];
1526  wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1527  wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1528  wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1529  wrk->vm = vlib_mains[i];
1530  wrk->last_vlib_time = vlib_time_now (vm);
1532 
1533  if (num_threads > 1)
1535  }
1536 
1537  /* Allocate vpp event queues segment and queue */
1539 
1540  /* Initialize fifo segment main baseva and timeout */
1541  sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size;
1542  sm_args->size = smm->session_va_space_size;
1543  segment_manager_main_init (sm_args);
1544 
1545  /* Preallocate sessions */
1546  if (smm->preallocated_sessions)
1547  {
1548  if (num_threads == 1)
1549  {
1551  }
1552  else
1553  {
1554  int j;
1555  preallocated_sessions_per_worker =
1556  (1.1 * (f64) smm->preallocated_sessions /
1557  (f64) (num_threads - 1));
1558 
1559  for (j = 1; j < num_threads; j++)
1560  {
1561  pool_init_fixed (smm->wrk[j].sessions,
1562  preallocated_sessions_per_worker);
1563  }
1564  }
1565  }
1566 
1569  transport_init ();
1570 
1571  smm->is_enabled = 1;
1572 
1573  /* Enable transports */
1574  transport_enable_disable (vm, 1);
1575  return 0;
1576 }
1577 
1578 void
1580 {
1581  u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1583  u8 have_workers = vtm->n_threads != 0;
1584 
1585  /* *INDENT-OFF* */
1586  foreach_vlib_main (({
1587  if (have_workers && ii == 0)
1588  {
1589  vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
1590  state);
1591  if (is_en)
1592  {
1593  vlib_node_t *n = vlib_get_node (this_vlib_main,
1595  vlib_start_process (this_vlib_main, n->runtime_index);
1596  }
1597  else
1598  {
1599  vlib_process_signal_event_mt (this_vlib_main,
1602  }
1603 
1604  continue;
1605  }
1606  vlib_node_set_state (this_vlib_main, session_queue_node.index,
1607  state);
1608  }));
1609  /* *INDENT-ON* */
1610 }
1611 
1612 clib_error_t *
1614 {
1615  clib_error_t *error = 0;
1616  if (is_en)
1617  {
1618  if (session_main.is_enabled)
1619  return 0;
1620 
1621  error = session_manager_main_enable (vm);
1623  }
1624  else
1625  {
1626  session_main.is_enabled = 0;
1628  }
1629 
1630  return error;
1631 }
1632 
1633 clib_error_t *
1635 {
1636  session_main_t *smm = &session_main;
1638 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
1639  smm->session_va_space_size = 128ULL << 30;
1640  smm->evt_qs_segment_size = 64 << 20;
1641 #else
1642  smm->session_va_space_size = 128 << 20;
1643  smm->evt_qs_segment_size = 1 << 20;
1644 #endif
1645  smm->is_enabled = 0;
1646  smm->session_enable_asap = 0;
1647  return 0;
1648 }
1649 
1650 static clib_error_t *
1652 {
1653  session_main_t *smm = &session_main;
1654  if (smm->session_enable_asap)
1655  {
1657  vnet_session_enable_disable (vm, 1 /* is_en */ );
1659  }
1660  return 0;
1661 }
1662 
1665 
1666 static clib_error_t *
1668 {
1669  session_main_t *smm = &session_main;
1670  u32 nitems;
1671  uword tmp;
1672 
1673  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1674  {
1675  if (unformat (input, "event-queue-length %d", &nitems))
1676  {
1677  if (nitems >= 2048)
1678  smm->configured_event_queue_length = nitems;
1679  else
1680  clib_warning ("event queue length %d too small, ignored", nitems);
1681  }
1682  else if (unformat (input, "preallocated-sessions %d",
1683  &smm->preallocated_sessions))
1684  ;
1685  else if (unformat (input, "v4-session-table-buckets %d",
1687  ;
1688  else if (unformat (input, "v4-halfopen-table-buckets %d",
1690  ;
1691  else if (unformat (input, "v6-session-table-buckets %d",
1693  ;
1694  else if (unformat (input, "v6-halfopen-table-buckets %d",
1696  ;
1697  else if (unformat (input, "v4-session-table-memory %U",
1698  unformat_memory_size, &tmp))
1699  {
1700  if (tmp >= 0x100000000)
1701  return clib_error_return (0, "memory size %llx (%lld) too large",
1702  tmp, tmp);
1704  }
1705  else if (unformat (input, "v4-halfopen-table-memory %U",
1706  unformat_memory_size, &tmp))
1707  {
1708  if (tmp >= 0x100000000)
1709  return clib_error_return (0, "memory size %llx (%lld) too large",
1710  tmp, tmp);
1712  }
1713  else if (unformat (input, "v6-session-table-memory %U",
1714  unformat_memory_size, &tmp))
1715  {
1716  if (tmp >= 0x100000000)
1717  return clib_error_return (0, "memory size %llx (%lld) too large",
1718  tmp, tmp);
1720  }
1721  else if (unformat (input, "v6-halfopen-table-memory %U",
1722  unformat_memory_size, &tmp))
1723  {
1724  if (tmp >= 0x100000000)
1725  return clib_error_return (0, "memory size %llx (%lld) too large",
1726  tmp, tmp);
1728  }
1729  else if (unformat (input, "local-endpoints-table-memory %U",
1730  unformat_memory_size, &tmp))
1731  {
1732  if (tmp >= 0x100000000)
1733  return clib_error_return (0, "memory size %llx (%lld) too large",
1734  tmp, tmp);
1735  smm->local_endpoints_table_memory = tmp;
1736  }
1737  else if (unformat (input, "local-endpoints-table-buckets %d",
1739  ;
1740  else if (unformat (input, "evt_qs_memfd_seg"))
1741  smm->evt_qs_use_memfd_seg = 1;
1742  else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
1743  &smm->evt_qs_segment_size))
1744  ;
1745  else if (unformat (input, "enable"))
1746  smm->session_enable_asap = 1;
1747  else
1748  return clib_error_return (0, "unknown input `%U'",
1749  format_unformat_error, input);
1750  }
1751  return 0;
1752 }
1753 
1755 
1756 /*
1757  * fd.io coding-style-patch-verification: ON
1758  *
1759  * Local Variables:
1760  * eval: (c-set-style "gnu")
1761  * End:
1762  */
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:439
u32 flags
buffer flags: VLIB_BUFFER_FREE_LIST_INDEX_MASK: bits used to store free list index, VLIB_BUFFER_IS_TRACED: trace this buffer.
Definition: buffer.h:124
#define session_endpoint_to_transport_cfg(_sep)
Definition: session_types.h:91
u32 preallocated_sessions
Preallocate session config parameter.
Definition: session.h:196
int app_worker_lock_and_send_event(app_worker_t *app, session_t *s, u8 evt_type)
Send event to application.
void transport_close(transport_proto_t tp, u32 conn_index, u8 thread_index)
Definition: transport.c:295
u64 ssvm_size
Definition: ssvm.h:85
static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES]
Definition: session.c:1133
u32 connection_index
Index of the transport connection associated to the session.
uword evt_qs_segment_size
Definition: session.h:178
void session_flush_frames_main_thread(vlib_main_t *vm)
Definition: session.c:1498
int app_worker_init_accepted(session_t *s)
void session_transport_reset(session_t *s)
Force transport close.
Definition: session.c:1292
int session_lookup_del_connection(transport_connection_t *tc)
Delete transport connection from session table.
int app_worker_reset_notify(app_worker_t *app_wrk, session_t *s)
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
static u8 svm_msg_q_ring_is_full(svm_msg_q_t *mq, u32 ring_index)
int session_open(u32 app_wrk_index, session_endpoint_t *rmt, u32 opaque)
Ask transport to open connection to remote transport endpoint.
Definition: session.c:1154
static void svm_pop_heap(void *oldheap)
Definition: svm.h:94
session_type_t session_type
Type built from transport and network protocol types.
uword requested_va
Definition: ssvm.h:88
#define SESSION_Q_PROCESS_FLUSH_FRAMES
Definition: session.h:205
svm_msg_q_t * vpp_event_queue
vpp event message queue for worker
Definition: session.h:84
void transport_get_listener_endpoint(transport_proto_t tp, u32 conn_index, transport_endpoint_t *tep, u8 is_lcl)
Definition: transport.c:363
static u32 svm_fifo_max_enqueue_prod(svm_fifo_t *f)
Maximum number of bytes that can be enqueued into fifo.
Definition: svm_fifo.h:618
static void clib_rwlock_writer_lock(clib_rwlock_t *p)
Definition: lock.h:173
svm_fifo_t * tx_fifo
session_main_t session_main
Definition: session.c:26
int session_tx_fifo_peek_bytes(transport_connection_t *tc, u8 *buffer, u32 offset, u32 max_bytes)
Definition: session.c:503
#define VLIB_MAIN_LOOP_ENTER_FUNCTION(x)
Definition: init.h:176
u32 session_index
Index in thread pool where session was allocated.
int session_stop_listen(session_t *s)
Ask transport to stop listening on local transport endpoint.
Definition: session.c:1198
unsigned long u64
Definition: types.h:89
transport_connection_t * listen_session_get_transport(session_t *s)
Definition: session.c:1491
static svm_msg_q_t * session_main_get_vpp_event_queue(u32 thread_index)
Definition: session.h:607
u32 configured_v4_halfopen_table_buckets
Definition: session.h:184
u8 session_enable_asap
Enable session manager at startup.
Definition: session.h:170
int session_ho_stream_connect_notify(transport_connection_t *tc, u8 is_fail)
Definition: session.c:772
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
void session_transport_delete_notify(transport_connection_t *tc)
Notification from transport that connection is being deleted.
Definition: session.c:887
transport_connection_t * session_get_transport(session_t *s)
Definition: session.c:1468
static f64 vlib_time_now(vlib_main_t *vm)
Definition: main.h:279
svm_fifo_t * rx_fifo
Pointers to rx/tx buffers.
session_worker_t * wrk
Worker contexts.
Definition: session.h:147
static session_t * session_get_if_valid(u64 si, u32 thread_index)
Definition: session.h:302
void session_add_self_custom_tx_evt(transport_connection_t *tc, u8 has_prio)
Definition: session.c:123
#define pool_get_aligned_will_expand(P, YESNO, A)
See if pool_get will expand the pool or not.
Definition: pool.h:242
int session_enqueue_dgram_connection(session_t *s, session_dgram_hdr_t *hdr, vlib_buffer_t *b, u8 proto, u8 queue_event)
Definition: session.c:467
u16 current_length
Nbytes between current data and the end of this buffer.
Definition: buffer.h:113
static int session_enqueue_notify_inline(session_t *s)
Notify session peer that new data has been enqueued.
Definition: session.c:557
int session_main_flush_enqueue_events(u8 transport_proto, u32 thread_index)
Flushes queue of sessions that are to be notified of new data enqueued events.
Definition: session.c:664
session_evt_type_t
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
Definition: vec.h:522
static transport_proto_t session_get_transport_proto(session_t *s)
void session_send_rpc_evt_to_thread(u32 thread_index, void *fp, void *rpc_args)
Definition: session.c:111
int svm_fifo_peek(svm_fifo_t *f, u32 offset, u32 len, u8 *dst)
Peek data from fifo.
Definition: svm_fifo.c:1007
void session_transport_reset_notify(transport_connection_t *tc)
Notify application that connection has been reset.
Definition: session.c:988
int i
int session_open_vc(u32 app_wrk_index, session_endpoint_t *rmt, u32 opaque)
Definition: session.c:1082
void svm_fifo_dequeue_drop_all(svm_fifo_t *f)
Dequeue and drop all bytes from fifo.
Definition: svm_fifo.c:1061
ssvm_shared_header_t * sh
Definition: ssvm.h:84
u32 transport_start_listen(transport_proto_t tp, u32 session_index, transport_endpoint_t *tep)
Definition: transport.c:310
static session_t * session_get(u32 si, u32 thread_index)
Definition: session.h:295
u8 * format(u8 *s, const char *fmt,...)
Definition: format.c:424
session_evt_elt_t * event_elts
Pool of session event list elements.
Definition: session.h:105
u8 data[128]
Definition: ipsec.api:251
#define vec_validate_aligned(V, I, A)
Make sure vector is long enough for given index (no header, specified alignment)
Definition: vec.h:450
u32 flags
Session flags.
int session_enqueue_stream_connection(transport_connection_t *tc, vlib_buffer_t *b, u32 offset, u8 queue_event, u8 is_in_order)
Definition: session.c:414
u64 session_lookup_half_open_handle(transport_connection_t *tc)
static session_t * session_clone_safe(u32 session_index, u32 thread_index)
Definition: session.h:392
u32 local_endpoints_table_memory
Transport table (preallocation) size parameters.
Definition: session.h:192
void session_node_enable_disable(u8 is_en)
Definition: session.c:1579
vlib_node_registration_t session_queue_node
(constructor) VLIB_REGISTER_NODE (session_queue_node)
vlib_main_t ** vlib_mains
Definition: buffer.c:332
static uword vlib_node_add_next(vlib_main_t *vm, uword node, uword next_node)
Definition: node_funcs.h:1092
unsigned char u8
Definition: types.h:56
app_worker_t * application_get_worker(application_t *app, u32 wrk_map_index)
Definition: application.c:644
session_fifo_rx_fn session_tx_fifo_peek_and_snd
session_t * sessions
Worker session pool.
Definition: session.h:81
#define vec_reset_length(v)
Reset vector length to zero NULL-pointer tolerant.
double f64
Definition: types.h:142
static session_fifo_rx_fn * session_tx_fns[TRANSPORT_TX_N_FNS]
Definition: session.c:1423
#define vlib_worker_thread_barrier_sync(X)
Definition: threads.h:204
static session_handle_t session_handle(session_t *s)
struct _svm_fifo svm_fifo_t
void session_get_endpoint(session_t *s, transport_endpoint_t *tep, u8 is_lcl)
Definition: session.c:1479
u8 session_type_t
void session_transport_closing_notify(transport_connection_t *tc)
Notification from transport that connection is being closed.
Definition: session.c:865
u64 segment_manager_make_segment_handle(u32 segment_manager_index, u32 segment_index)
int session_open_cl(u32 app_wrk_index, session_endpoint_t *rmt, u32 opaque)
Definition: session.c:1046
#define clib_llist_make_head(LP, name)
Initialize llist head.
Definition: llist.h:106
static session_worker_t * session_main_get_worker(u32 thread_index)
Definition: session.h:593
void session_free_w_fifos(session_t *s)
Definition: session.c:247
#define session_endpoint_to_transport(_sep)
Definition: session_types.h:90
void app_namespaces_init(void)
vlib_main_t * vm
Convenience pointer to this worker&#39;s vlib_main.
Definition: session.h:93
static clib_error_t * session_config_fn(vlib_main_t *vm, unformat_input_t *input)
Definition: session.c:1667
svm_msg_q_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
Definition: message_queue.c:40
u8 transport_half_open_has_fifos(transport_proto_t tp)
Definition: transport.c:265
int session_send_ctrl_evt_to_thread(session_t *s, session_evt_type_t evt_type)
Definition: session.c:94
#define VLIB_INIT_FUNCTION(x)
Definition: init.h:173
static void * svm_push_data_heap(svm_region_t *rp)
Definition: svm.h:86
void segment_manager_dealloc_fifos(svm_fifo_t *rx_fifo, svm_fifo_t *tx_fifo)
#define always_inline
Definition: clib.h:99
clib_llist_index_t new_head
Head of list of elements.
Definition: session.h:114
u32 * session_type_to_next
Per session type output nodes.
Definition: session.h:161
static int session_stream_connect_notify_inline(transport_connection_t *tc, u8 is_fail, session_state_t opened_state)
Definition: session.c:703
static int session_enqueue_chain_tail(session_t *s, vlib_buffer_t *b, u32 offset, u8 is_in_order)
Enqueue buffer chain tail.
Definition: session.c:327
static void * ssvm_push_heap(ssvm_shared_header_t *sh)
Definition: ssvm.h:143
#define clib_error_return(e, args...)
Definition: error.h:99
svm_region_t * vlib_rp
Current binary api segment descriptor.
Definition: api_common.h:256
uword session_baseva
Session ssvm segment configs.
Definition: session.h:176
vhost_vring_state_t state
Definition: vhost_user.h:146
unsigned int u32
Definition: types.h:88
int session_send_io_evt_to_thread(svm_fifo_t *f, session_evt_type_t evt_type)
Definition: session.c:80
int ssvm_master_init(ssvm_private_t *ssvm, ssvm_segment_type_t type)
Definition: ssvm.c:425
u8 session_is_valid(u32 si, u8 thread_index)
Definition: session.c:210
static void ssvm_pop_heap(void *oldheap)
Definition: ssvm.h:151
void transport_reset(transport_proto_t tp, u32 conn_index, u8 thread_index)
Definition: transport.c:301
#define HALF_OPEN_LOOKUP_INVALID_VALUE
#define SESSION_INVALID_HANDLE
Definition: session_types.h:23
int session_lookup_del_half_open(transport_connection_t *tc)
session_state_t
static u32 vlib_get_buffer_index(vlib_main_t *vm, void *p)
Translate buffer pointer into buffer index.
Definition: buffer_funcs.h:257
static clib_error_t * session_manager_main_enable(vlib_main_t *vm)
Definition: session.c:1506
struct _transport_proto_vft transport_proto_vft_t
int session_dequeue_notify(session_t *s)
Definition: session.c:634
struct _session_endpoint_cfg session_endpoint_cfg_t
u32 configured_v6_halfopen_table_memory
Definition: session.h:189
u32 configured_v6_session_table_buckets
Definition: session.h:186
static session_type_t session_type_from_proto_and_ip(transport_proto_t proto, u8 is_ip4)
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
Definition: pool.h:514
static void clib_rwlock_init(clib_rwlock_t *p)
Definition: lock.h:133
#define CLIB_US_TIME_FREQ
Definition: time.h:207
int svm_msg_q_alloc_consumer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue consumer.
session_event_t evt
Definition: session.h:68
transport_service_type_t transport_protocol_service_type(transport_proto_t tp)
Definition: transport.c:271
uword session_va_space_size
Definition: session.h:177
void session_send_rpc_evt_to_thread_force(u32 thread_index, void *fp, void *rpc_args)
Definition: session.c:103
u32 configured_v4_session_table_buckets
Session table size parameters.
Definition: session.h:182
void transport_get_endpoint(transport_proto_t tp, u32 conn_index, u32 thread_index, transport_endpoint_t *tep, u8 is_lcl)
Definition: transport.c:347
int app_worker_accept_notify(app_worker_t *app_wrk, session_t *s)
u32 app_index
Index of application that owns the listener.
struct _unformat_input_t unformat_input_t
u32 configured_v6_halfopen_table_buckets
Definition: session.h:188
u32 configured_event_queue_length
vpp fifo event queue configured length
Definition: session.h:173
u8 is_enabled
Session manager is enabled.
Definition: session.h:168
static void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
Definition: buffer.h:229
#define pool_put(P, E)
Free an object E in pool P.
Definition: pool.h:286
#define APP_INVALID_INDEX
Definition: application.h:167
int svm_fifo_enqueue(svm_fifo_t *f, u32 len, const u8 *src)
Enqueue data to fifo.
Definition: svm_fifo.c:888
enum transport_service_type_ transport_service_type_t
#define PREDICT_FALSE(x)
Definition: clib.h:112
#define VLIB_CONFIG_FUNCTION(x, n,...)
Definition: init.h:182
int session_lookup_del_session(session_t *s)
u32 wrk_index
Worker index in global worker pool.
Definition: application.h:37
clib_error_t * session_manager_main_init(vlib_main_t *vm)
Definition: session.c:1634
app_worker_t * app_worker_get_if_valid(u32 wrk_index)
static u32 svm_fifo_max_dequeue_prod(svm_fifo_t *f)
Fifo max bytes to dequeue optimized for producer.
Definition: svm_fifo.h:512
session_fifo_rx_fn ** session_tx_fns
Per transport rx function that can either dequeue or peek.
Definition: session.h:156
u64 session_segment_handle(session_t *s)
Definition: session.c:1410
#define foreach_vlib_main(body)
Definition: threads.h:241
static session_evt_elt_t * session_evt_alloc_old(session_worker_t *wrk)
Definition: session.h:280
void transport_cleanup(transport_proto_t tp, u32 conn_index, u8 thread_index)
Definition: transport.c:283
clib_error_t * vnet_session_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: session.c:1613
u32 transport_stop_listen(transport_proto_t tp, u32 conn_index)
Definition: transport.c:317
struct _session_switch_pool_args session_switch_pool_args_t
u8 len
Definition: ip_types.api:90
API main structure, used by both vpp and binary API clients.
Definition: api_common.h:203
clib_rwlock_t peekers_rw_locks
Peekers rw lock.
Definition: session.h:120
#define pool_get_aligned(P, E, A)
Allocate an object E from a pool P with alignment A.
Definition: pool.h:230
#define SESSION_DBG(_fmt, _args...)
static void clib_rwlock_writer_unlock(clib_rwlock_t *p)
Definition: lock.h:187
static u8 svm_fifo_set_event(svm_fifo_t *f)
Set fifo event flag.
Definition: svm_fifo.h:748
u32 n_rings
number of msg rings
Definition: message_queue.h:54
u8 evt_qs_use_memfd_seg
Definition: session.h:179
void transport_init(void)
Definition: transport.c:759
int app_worker_transport_closed_notify(app_worker_t *app_wrk, session_t *s)
session_fifo_rx_fn session_tx_fifo_dequeue_and_snd
ssvm_private_t evt_qs_segment
Event queues memfd segment initialized only if so configured.
Definition: session.h:150
static u8 svm_fifo_needs_deq_ntf(svm_fifo_t *f, u32 n_last_deq)
Check if fifo needs dequeue notification.
Definition: svm_fifo.h:839
#define UNFORMAT_END_OF_INPUT
Definition: format.h:145
u32 runtime_index
Definition: node.h:283
session_handle_t listener_handle
Parent listener session index if the result of an accept.
static_always_inline uword vlib_get_thread_index(void)
Definition: threads.h:218
int transport_connect(transport_proto_t tp, transport_endpoint_cfg_t *tep)
Definition: transport.c:289
vlib_main_t * vm
Definition: buffer.c:323
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
static transport_connection_t * transport_get_connection(transport_proto_t tp, u32 conn_index, u8 thread_index)
Definition: transport.h:117
static void vlib_process_signal_event_mt(vlib_main_t *vm, uword node_index, uword type_opaque, uword data)
Signal event to process from any thread.
Definition: node_funcs.h:958
static void session_cleanup_notify(session_t *s, session_cleanup_ntf_t ntf)
Definition: session.c:236
void session_free(session_t *s)
Definition: session.c:196
int app_worker_cleanup_notify(app_worker_t *app_wrk, session_t *s, session_cleanup_ntf_t ntf)
#define clib_warning(format, args...)
Definition: error.h:59
static int session_send_evt_to_thread(void *data, void *args, u32 thread_index, session_evt_type_t evt_type)
Definition: session.c:29
int session_dgram_connect_notify(transport_connection_t *tc, u32 old_thread_index, session_t **new_session)
Move dgram session to the right thread.
Definition: session.c:823
Don&#39;t register connection in lookup Does not apply to local apps and transports using the network lay...
struct _transport_connection transport_connection_t
#define HIGH_SEGMENT_BASEVA
Definition: svm_common.h:84
int app_worker_init_connected(app_worker_t *app_wrk, session_t *s)
static session_evt_elt_t * session_evt_alloc_new(session_worker_t *wrk)
Definition: session.h:270
u32 my_pid
Definition: ssvm.h:86
void transport_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: transport.c:748
static u8 vlib_thread_is_main_w_barrier(void)
Definition: threads.h:517
#define pool_init_fixed(pool, max_elts)
initialize a fixed-size, preallocated pool
Definition: pool.h:86
application_t * application_get(u32 app_index)
Definition: application.c:418
static u32 session_thread_from_handle(session_handle_t handle)
#define SESSION_Q_PROCESS_STOP
Definition: session.h:206
int session_listen(session_t *ls, session_endpoint_cfg_t *sep)
Ask transport to listen on session endpoint.
Definition: session.c:1170
static u32 session_index_from_handle(session_handle_t handle)
#define uword_to_pointer(u, type)
Definition: types.h:136
void session_vpp_event_queues_allocate(session_main_t *smm)
Allocate event queues in the shared-memory segment.
Definition: session.c:1338
static session_handle_t session_make_handle(u32 session_index, u32 thread_index)
#define ASSERT(truth)
static int session_notify_subscribers(u32 app_index, session_t *s, svm_fifo_t *f, session_evt_type_t evt_type)
Definition: session.c:525
void svm_msg_q_add_and_unlock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Producer enqueue one message to queue with mutex held.
int session_main_flush_all_enqueue_events(u8 transport_proto)
Definition: session.c:693
void segment_manager_main_init(segment_manager_main_init_args_t *a)
static void clib_mem_free(void *p)
Definition: mem.h:226
enum _transport_proto transport_proto_t
static void svm_fifo_clear_deq_ntf(svm_fifo_t *f)
Clear the want notification flag and set has notification.
Definition: svm_fifo.h:806
int svm_fifo_enqueue_with_offset(svm_fifo_t *f, u32 offset, u32 len, u8 *src)
Enqueue a future segment.
Definition: svm_fifo.c:931
static void vlib_buffer_advance(vlib_buffer_t *b, word l)
Advance current data pointer by the supplied (signed!) amount.
Definition: buffer.h:248
static void vlib_node_set_state(vlib_main_t *vm, u32 node_index, vlib_node_state_t new_state)
Set node dispatch state.
Definition: node_funcs.h:148
clib_time_type_t last_vlib_time
vlib_time_now last time around the track
Definition: session.h:87
u32 configured_v4_halfopen_table_memory
Definition: session.h:185
static session_t * session_alloc_for_connection(transport_connection_t *tc)
Definition: session.c:272
int app_worker_migrate_notify(app_worker_t *app_wrk, session_t *s, session_handle_t new_sh)
u32 configured_v4_session_table_memory
Definition: session.h:183
int session_stream_accept_notify(transport_connection_t *tc)
Definition: session.c:1003
static void * clib_mem_alloc(uword size)
Definition: mem.h:153
void session_enqueue_notify_thread(session_handle_t sh)
Like session_enqueue_notify, but can be called from a thread that does not own the session...
Definition: session.c:619
static uword pointer_to_uword(const void *p)
Definition: types.h:131
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
Definition: message_queue.h:55
static vlib_main_t * vlib_get_main(void)
Definition: global_funcs.h:23
u8 * name
Definition: ssvm.h:87
int() session_fifo_rx_fn(session_worker_t *wrk, vlib_node_runtime_t *node, session_evt_elt_t *e, int *n_tx_packets)
Definition: session.h:134
u8 thread_index
Index of the thread that allocated the session.
session_t * session_alloc(u32 thread_index)
Definition: session.c:170
void session_transport_cleanup(session_t *s)
Cleanup transport and session state.
Definition: session.c:1316
static void session_enqueue_discard_chain_bytes(vlib_main_t *vm, vlib_buffer_t *b, vlib_buffer_t **chain_b, u32 n_bytes_to_drop)
Discards bytes from buffer chain.
Definition: session.c:296
template key/value backing page structure
Definition: bihash_doc.h:44
static void session_switch_pool(void *cb_args)
Notify old thread of the session pool switch.
Definition: session.c:790
static transport_connection_t * transport_get_half_open(transport_proto_t tp, u32 conn_index)
Definition: transport.h:130
u32 local_endpoints_table_buckets
Definition: session.h:193
app_worker_t * app_worker_get(u32 wrk_index)
u32 q_nitems
msg queue size (not rings)
Definition: message_queue.h:53
u64 session_handle_t
void session_lookup_init(void)
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
session_fifo_rx_fn session_tx_fifo_dequeue_internal
clib_llist_index_t old_head
Head of list of pending events.
Definition: session.h:117
u32 next_buffer
Next buffer for this linked-list of buffers.
Definition: buffer.h:140
volatile u8 session_state
State in session layer state machine.
int session_enqueue_notify(session_t *s)
Definition: session.c:596
void session_transport_closed_notify(transport_connection_t *tc)
Notification from transport that it is closed.
Definition: session.c:953
u32 * session_to_enqueue[TRANSPORT_N_PROTO]
Per-proto vector of sessions to enqueue.
Definition: session.h:96
void session_close(session_t *s)
Initialize session closing procedure.
Definition: session.c:1223
VLIB buffer representation.
Definition: buffer.h:102
u64 uword
Definition: types.h:112
void session_reset(session_t *s)
Force a close without waiting for data to be flushed.
Definition: session.c:1246
ssvm_private_t * session_main_get_evt_q_segment(void)
Definition: session.c:1401
session_cleanup_ntf_t
int session_send_io_evt_to_thread_custom(void *data, u32 thread_index, session_evt_type_t evt_type)
Definition: session.c:87
unformat_function_t unformat_memory_size
Definition: format.h:296
static session_evt_elt_t * session_evt_alloc_ctrl(session_worker_t *wrk)
Definition: session.h:246
u32 app_index
Index of owning app.
Definition: application.h:43
static u8 svm_fifo_n_subscribers(svm_fifo_t *f)
Definition: svm_fifo.h:680
int session_lookup_add_connection(transport_connection_t *tc, u64 value)
Add transport connection to a session table.
u8 * format_unformat_error(u8 *s, va_list *va)
Definition: unformat.c:91
void vlib_worker_thread_barrier_release(vlib_main_t *vm)
Definition: threads.c:1520
u32 configured_v6_session_table_memory
Definition: session.h:187
clib_us_time_t last_vlib_us_time
vlib_time_now rounded to us precision and as u64
Definition: session.h:90
int session_stream_connect_notify(transport_connection_t *tc, u8 is_fail)
Definition: session.c:765
static vlib_thread_main_t * vlib_get_thread_main()
Definition: global_funcs.h:32
vlib_node_registration_t session_queue_process_node
(constructor) VLIB_REGISTER_NODE (session_queue_process_node)
static u32 vlib_num_workers()
Definition: threads.h:372
static vlib_node_t * vlib_get_node(vlib_main_t *vm, u32 i)
Get vlib node by index.
Definition: node_funcs.h:59
int(* session_open_service_fn)(u32, session_endpoint_t *, u32)
Definition: session.c:1130
u32 app_wrk_index
Index of the app worker that owns the session.
u32 session_tx_fifo_dequeue_drop(transport_connection_t *tc, u32 max_bytes)
Definition: session.c:511
static void session_delete(session_t *s)
Cleans up session and lookup table.
Definition: session.c:260
void session_transport_close(session_t *s)
Notify transport the session can be disconnected.
Definition: session.c:1264
static clib_error_t * session_main_init(vlib_main_t *vm)
Definition: session.c:1651
clib_llist_index_t ctrl_head
Head of control events list.
Definition: session.h:111
int app_worker_close_notify(app_worker_t *app_wrk, session_t *s)
struct _session_endpoint session_endpoint_t
int session_stream_accept(transport_connection_t *tc, u32 listener_index, u32 thread_index, u8 notify)
Accept a stream session.
Definition: session.c:1020
int consumer_pid
pid of msg consumer
Definition: message_queue.h:52
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:59
static transport_connection_t * transport_get_listener(transport_proto_t tp, u32 conn_index)
Definition: transport.h:124
u32 total_length_not_including_first_buffer
Only valid for first buffer in chain.
Definition: buffer.h:167
int svm_fifo_dequeue_drop(svm_fifo_t *f, u32 len)
Dequeue and drop bytes from fifo.
Definition: svm_fifo.c:1029
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
static void session_enqueue_notify_rpc(void *arg)
Definition: session.c:602
int session_open_app(u32 app_wrk_index, session_endpoint_t *rmt, u32 opaque)
Definition: session.c:1119
void vlib_start_process(vlib_main_t *vm, uword process_index)
Definition: main.c:1587
void session_register_transport(transport_proto_t transport_proto, const transport_proto_vft_t *vft, u8 is_ip4, u32 output_node)
Initialize session layer for given transport proto and ip version.
Definition: session.c:1438
int session_lookup_add_half_open(transport_connection_t *tc, u64 value)
api_main_t api_main
Definition: api_shared.c:35
u8 transport_protocol_is_cl(transport_proto_t tp)
Definition: transport.c:323
static u8 svm_msg_q_is_full(svm_msg_q_t *mq)
Check if message queue is full.
int app_worker_connect_notify(app_worker_t *app_wrk, session_t *s, u32 opaque)
static vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
Definition: buffer_funcs.h:85
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index)
Allocate message buffer on ring.
#define SESSION_EVT(_evt, _args...)
int i_am_master
Definition: ssvm.h:89
static void session_program_transport_ctrl_evt(session_t *s, session_evt_type_t evt)
Definition: session.c:149
static session_t * listen_session_get(u32 ls_index)
Definition: session.h:569
uword unformat(unformat_input_t *i, const char *fmt,...)
Definition: unformat.c:978
vl_api_fib_path_nh_proto_t proto
Definition: fib_types.api:125
static uword unformat_check_input(unformat_input_t *i)
Definition: format.h:171