FD.io VPP  v16.12-rc0-308-g931be3a
Vector Packet Processing
mc.c
Go to the documentation of this file.
1 /*
2  * mc.c: vlib reliable sequenced multicast distributed applications
3  *
4  * Copyright (c) 2010 Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #include <vlib/vlib.h>
19 
20 /*
21  * 1 to enable msg id training wheels, which are useful for tracking
22  * down catchup and/or partitioned network problems
23  */
24 #define MSG_ID_DEBUG 0
25 
27 
28 static u32
30 {
31  uword *p, r;
32  mhash_t *h = &m->elog_id_by_peer_id;
33 
34  if (!m->elog_id_by_peer_id.hash)
35  mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t));
36 
37  p = mhash_get (h, &peer_id);
38  if (p)
39  return p[0];
40  r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id);
41  mhash_set (h, &peer_id, r, /* old_value */ 0);
42  return r;
43 }
44 
45 static u32
46 elog_id_for_msg_name (mc_main_t * m, char *msg_name)
47 {
48  uword *p, r;
49  uword *h = m->elog_id_by_msg_name;
50  u8 *name_copy;
51 
52  if (!h)
53  h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword));
54 
55  p = hash_get_mem (h, msg_name);
56  if (p)
57  return p[0];
58  r = elog_string (m->elog_main, "%s", msg_name);
59 
60  name_copy = format (0, "%s%c", msg_name, 0);
61 
62  hash_set_mem (h, name_copy, r);
63  m->elog_id_by_msg_name = h;
64 
65  return r;
66 }
67 
68 static void
69 elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence,
70  u32 retry_count)
71 {
72  if (MC_EVENT_LOGGING > 0)
73  {
74  /* *INDENT-OFF* */
75  ELOG_TYPE_DECLARE (e) =
76  {
77  .format = "tx-msg: stream %d local seq %d attempt %d",
78  .format_args = "i4i4i4",
79  };
80  /* *INDENT-ON* */
81  struct
82  {
83  u32 stream_id, local_sequence, retry_count;
84  } *ed;
85  ed = ELOG_DATA (m->elog_main, e);
86  ed->stream_id = stream_id;
87  ed->local_sequence = local_sequence;
88  ed->retry_count = retry_count;
89  }
90 }
91 
92 /*
93  * seq_cmp
94  * correctly compare two unsigned sequence numbers.
95  * This function works so long as x and y are within 2**(n-1) of each
96  * other, where n = bits(x, y).
97  *
98  * Magic decoder ring:
99  * seq_cmp == 0 => x and y are equal
100  * seq_cmp < 0 => x is "in the past" with respect to y
101  * seq_cmp > 0 => x is "in the future" with respect to y
102  */
105 {
106  return (i32) x - (i32) y;
107 }
108 
109 void *
110 mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
111 {
112  u32 n_alloc, bi;
113  vlib_buffer_t *b;
114 
115  n_alloc = vlib_buffer_alloc (vm, &bi, 1);
116  ASSERT (n_alloc == 1);
117 
118  b = vlib_get_buffer (vm, bi);
119  b->current_length = n_bytes;
120  *bi_return = bi;
121  return (void *) b->data;
122 }
123 
124 static void
126  uword index, int notify_application)
127 {
128  mc_stream_peer_t *p = pool_elt_at_index (s->peers, index);
129  ASSERT (p != 0);
130  if (s->config.peer_died && notify_application)
131  s->config.peer_died (mcm, s, p->id);
132 
133  s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers);
134 
135  if (MC_EVENT_LOGGING > 0)
136  {
137  /* *INDENT-OFF* */
138  ELOG_TYPE_DECLARE (e) =
139  {
140  .format = "delete peer %s from all_peer_bitmap",
141  .format_args = "T4",
142  };
143  /* *INDENT-ON* */
144  struct
145  {
146  u32 peer;
147  } *ed = 0;
148 
149  ed = ELOG_DATA (mcm->elog_main, e);
150  ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
151  }
152  /* Do not delete the pool / hash table entries, or we lose sequence number state */
153 }
154 
155 static mc_stream_peer_t *
157  mc_stream_t * s, mc_peer_id_t id, int *created)
158 {
159  uword *q = mhash_get (&s->peer_index_by_id, &id);
160  mc_stream_peer_t *p;
161 
162  if (q)
163  {
164  p = pool_elt_at_index (s->peers, q[0]);
165  goto done;
166  }
167 
168  pool_get (s->peers, p);
169  memset (p, 0, sizeof (p[0]));
170  p->id = id;
171  p->last_sequence_received = ~0;
172  mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0);
173  if (created)
174  *created = 1;
175 
176 done:
177  if (MC_EVENT_LOGGING > 0)
178  {
179  /* *INDENT-OFF* */
180  ELOG_TYPE_DECLARE (e) =
181  {
182  .format = "get_or_create %s peer %s stream %d seq %d",
183  .format_args = "t4T4i4i4",
184  .n_enum_strings = 2,
185  .enum_strings = {
186  "old", "new",
187  },
188  };
189  /* *INDENT-ON* */
190  struct
191  {
192  u32 is_new, peer, stream_index, rx_sequence;
193  } *ed = 0;
194 
195  ed = ELOG_DATA (mcm->elog_main, e);
196  ed->is_new = q ? 0 : 1;
197  ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
198  ed->stream_index = s->index;
199  ed->rx_sequence = p->last_sequence_received;
200  }
201  /* $$$$ Enable or reenable this peer */
202  s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers);
203  return p;
204 }
205 
206 static void
208 {
210 
211  if (pool_elts (stream->retry_pool) >= stream->config.window_size)
212  return;
213 
216 
217  if (stream->procs_waiting_for_open_window)
218  _vec_len (stream->procs_waiting_for_open_window) = 0;
219 }
220 
221 static void
223 {
224  mc_retry_t record, *retp;
225 
226  if (r->unacked_by_peer_bitmap)
227  _vec_len (r->unacked_by_peer_bitmap) = 0;
228 
229  if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size)
230  {
231  clib_fifo_sub1 (s->retired_fifo, record);
233  }
234 
235  clib_fifo_add2 (s->retired_fifo, retp);
236 
237  retp->buffer_index = r->buffer_index;
238  retp->local_sequence = r->local_sequence;
239 
240  r->buffer_index = ~0; /* poison buffer index in this retry */
241 }
242 
243 static void
244 mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence)
245 {
246  mc_retry_t *retry;
247 
248  if (MC_EVENT_LOGGING > 0)
249  {
250  /* *INDENT-OFF* */
251  ELOG_TYPE_DECLARE (e) =
252  {
253  .format = "resend-retired: search for local seq %d",
254  .format_args = "i4",
255  };
256  /* *INDENT-ON* */
257  struct
258  {
259  u32 local_sequence;
260  } *ed;
261  ed = ELOG_DATA (mcm->elog_main, e);
262  ed->local_sequence = local_sequence;
263  }
264 
265  /* *INDENT-OFF* */
266  clib_fifo_foreach (retry, s->retired_fifo,
267  ({
268  if (retry->local_sequence == local_sequence)
269  {
270  elog_tx_msg (mcm, s->index, retry-> local_sequence, -13);
271  mcm->transport.tx_buffer (mcm->transport.opaque,
272  MC_TRANSPORT_USER_REQUEST_TO_RELAY,
273  retry->buffer_index);
274  return;
275  }
276  }));
277  /* *INDENT-ON* */
278 
279  if (MC_EVENT_LOGGING > 0)
280  {
281  /* *INDENT-OFF* */
282  ELOG_TYPE_DECLARE (e) =
283  {
284  .format = "resend-retired: FAILED search for local seq %d",
285  .format_args = "i4",
286  };
287  /* *INDENT-ON* */
288  struct
289  {
290  u32 local_sequence;
291  } *ed;
292  ed = ELOG_DATA (mcm->elog_main, e);
293  ed->local_sequence = local_sequence;
294  }
295 }
296 
297 static uword *
299  mc_stream_t * stream,
300  mc_retry_t * r, uword * dead_peer_bitmap)
301 {
302  mc_stream_peer_t *p;
303 
304  /* *INDENT-OFF* */
305  pool_foreach (p, stream->peers, ({
306  uword pi = p - stream->peers;
307  uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
308 
309  if (! is_alive)
310  dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
311 
312  if (MC_EVENT_LOGGING > 0)
313  {
314  ELOG_TYPE_DECLARE (e) = {
315  .format = "delete_retry_fifo_elt: peer %s is %s",
316  .format_args = "T4t4",
317  .n_enum_strings = 2,
318  .enum_strings = { "alive", "dead", },
319  };
320  struct { u32 peer, is_alive; } * ed;
321  ed = ELOG_DATA (mcm->elog_main, e);
322  ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
323  ed->is_alive = is_alive;
324  }
325  }));
326  /* *INDENT-ON* */
327 
328  hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
329  mc_retry_free (mcm, stream, r);
330 
331  return dead_peer_bitmap;
332 }
333 
336 {
337  return (r->prev_index != ~0
338  ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0);
339 }
340 
343 {
344  return (r->next_index != ~0
345  ? pool_elt_at_index (s->retry_pool, r->next_index) : 0);
346 }
347 
348 always_inline void
350 {
351  mc_retry_t *p = prev_retry (s, r);
352  mc_retry_t *n = next_retry (s, r);
353 
354  if (p)
355  p->next_index = r->next_index;
356  else
358  if (n)
359  n->prev_index = r->prev_index;
360  else
362 
363  pool_put_index (s->retry_pool, r - s->retry_pool);
364 }
365 
366 static void
368 {
369  mc_retry_t *r;
370  vlib_main_t *vm = mcm->vlib_main;
371  f64 now = vlib_time_now (vm);
372  uword *dead_peer_bitmap = 0;
373  u32 ri, ri_next;
374 
375  for (ri = s->retry_head_index; ri != ~0; ri = ri_next)
376  {
377  r = pool_elt_at_index (s->retry_pool, ri);
378  ri_next = r->next_index;
379 
380  if (now < r->sent_at + s->config.retry_interval)
381  continue;
382 
383  r->n_retries += 1;
384  if (r->n_retries > s->config.retry_limit)
385  {
386  dead_peer_bitmap =
387  delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap);
388  remove_retry_from_pool (s, r);
389  }
390  else
391  {
392  if (MC_EVENT_LOGGING > 0)
393  {
394  mc_stream_peer_t *p;
395 
396  /* *INDENT-OFF* */
397  ELOG_TYPE_DECLARE (t) =
398  {
399  .format = "resend local seq %d attempt %d",
400  .format_args = "i4i4",
401  };
402  /* *INDENT-ON* */
403 
404  /* *INDENT-OFF* */
405  pool_foreach (p, s->peers, ({
406  if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
407  {
408  ELOG_TYPE_DECLARE (ev) = {
409  .format = "resend: needed by peer %s local seq %d",
410  .format_args = "T4i4",
411  };
412  struct { u32 peer, rx_sequence; } * ed;
413  ed = ELOG_DATA (mcm->elog_main, ev);
414  ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
415  ed->rx_sequence = r->local_sequence;
416  }
417  }));
418  /* *INDENT-ON* */
419 
420  struct
421  {
422  u32 sequence;
423  u32 trail;
424  } *ed;
425  ed = ELOG_DATA (mcm->elog_main, t);
426  ed->sequence = r->local_sequence;
427  ed->trail = r->n_retries;
428  }
429 
430  r->sent_at = vlib_time_now (vm);
431  s->stats.n_retries += 1;
432 
433  elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries);
434 
435  mcm->transport.tx_buffer
436  (mcm->transport.opaque,
438  }
439  }
440 
441  maybe_send_window_open_event (mcm->vlib_main, s);
442 
443  /* Delete any dead peers we've found. */
444  if (!clib_bitmap_is_zero (dead_peer_bitmap))
445  {
446  uword i;
447 
448  /* *INDENT-OFF* */
449  clib_bitmap_foreach (i, dead_peer_bitmap, ({
450  delete_peer_with_index (mcm, s, i, /* notify_application */ 1);
451 
452  /* Delete any references to just deleted peer in retry pool. */
453  pool_foreach (r, s->retry_pool, ({
454  r->unacked_by_peer_bitmap =
455  clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
456  }));
457  }));
458 /* *INDENT-ON* */
459  clib_bitmap_free (dead_peer_bitmap);
460  }
461 }
462 
465 {
466  mc_main_t **p = (void *) node->runtime_data;
467  return p[0];
468 }
469 
470 static uword
472  vlib_node_runtime_t * node, vlib_frame_t * f)
473 {
474  mc_main_t *mcm = mc_node_get_main (node);
475  mc_stream_t *s;
476 
477  while (1)
478  {
479  vlib_process_suspend (vm, 1.0);
480  vec_foreach (s, mcm->stream_vector)
481  {
482  if (s->state != MC_STREAM_STATE_invalid)
483  check_retry (mcm, s);
484  }
485  }
486  return 0; /* not likely */
487 }
488 
489 static void
490 send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
491 {
492  vlib_main_t *vm = mcm->vlib_main;
493  mc_msg_join_or_leave_request_t *mp;
494  u32 bi;
495 
496  mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
497  memset (mp, 0, sizeof (*mp));
498  mp->type = MC_MSG_TYPE_join_or_leave_request;
499  mp->peer_id = mcm->transport.our_ack_peer_id;
500  mp->stream_index = stream_index;
501  mp->is_join = is_join;
502 
504 
505  /*
506  * These msgs are unnumbered, unordered so send on the from-relay
507  * channel.
508  */
510 }
511 
512 static uword
514  vlib_node_runtime_t * node, vlib_frame_t * f)
515 {
516  mc_main_t *mcm = mc_node_get_main (node);
517 
518  while (1)
519  {
520  if (mcm->joins_in_progress)
521  {
522  mc_stream_t *s;
524  f64 now = vlib_time_now (vm);
525 
526  vec_foreach (s, mcm->stream_vector)
527  {
528  if (s->state != MC_STREAM_STATE_join_in_progress)
529  continue;
530 
531  if (now > s->join_timeout)
532  {
533  s->state = MC_STREAM_STATE_ready;
534 
535  if (MC_EVENT_LOGGING > 0)
536  {
537  /* *INDENT-OFF* */
538  ELOG_TYPE_DECLARE (e) =
539  {
540  .format = "stream %d join timeout",
541  };
542  /* *INDENT-ON* */
543  ELOG (mcm->elog_main, e, s->index);
544  }
545  /* Make sure that this app instance exists as a stream peer,
546  or we may answer a catchup request with a NULL
547  all_peer_bitmap... */
549  (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
550 
554  _vec_len (s->procs_waiting_for_join_done) = 0;
555 
556  mcm->joins_in_progress--;
557  ASSERT (mcm->joins_in_progress >= 0);
558  }
559  else
560  {
561  /* Resend the join request, which may have been lost. */
562  send_join_or_leave_request (mcm, s->index, 1 /* is_join */ );
563 
564  /* We're *not* alone, retry for as long as it takes */
565  if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
566  s->join_timeout = vlib_time_now (vm) + 2.0;
567 
568 
569  if (MC_EVENT_LOGGING > 0)
570  {
571  /* *INDENT-OFF* */
572  ELOG_TYPE_DECLARE (e) =
573  {
574  .format = "stream %d resend join request",
575  };
576  /* *INDENT-ON* */
577  ELOG (mcm->elog_main, e, s->index);
578  }
579  }
580  }
581  }
582 
583  vlib_process_suspend (vm, .5);
584  }
585 
586  return 0; /* not likely */
587 }
588 
589 static void
591 {
592  char *name = va_arg (*va, char *);
593  serialize_cstring (m, name);
594 }
595 
/*
 * Copy a stream name into a fixed-size event-log field.
 *
 * buf          destination field.
 * n_buf_bytes  total size of buf in bytes; nothing is written if <= 0.
 * v            NUL-terminated name, or 0 for an empty name.
 *
 * At most n_buf_bytes - 1 characters are copied, and every remaining byte
 * of buf is set to 0.  The result is therefore always NUL-terminated
 * directly after the name, and a short name leaves no stale bytes in the
 * field.
 *
 * The name's length is found by scanning for its terminator rather than
 * from vector metadata, because callers may pass a plain C string (for
 * example a stream config name set from a string literal).
 */
static void
elog_stream_name (char *buf, int n_buf_bytes, char *v)
{
  int i = 0;

  if (n_buf_bytes <= 0)
    return;

  /* Copy the name, always leaving room for a terminator. */
  if (v)
    for (; i < n_buf_bytes - 1 && v[i] != 0; i++)
      buf[i] = v[i];

  /* Terminate and clear the unused tail of the field. */
  for (; i < n_buf_bytes; i++)
    buf[i] = 0;
}
602 
603 static void
605 {
606  mc_main_t *mcm = va_arg (*va, mc_main_t *);
607  char *name;
608  mc_stream_t *s;
609  uword *p;
610 
611  unserialize_cstring (m, &name);
612 
613  if ((p = hash_get_mem (mcm->stream_index_by_name, name)))
614  {
615  if (MC_EVENT_LOGGING > 0)
616  {
617  /* *INDENT-OFF* */
618  ELOG_TYPE_DECLARE (e) =
619  {
620  .format = "stream index %d already named %s",
621  .format_args = "i4s16",
622  };
623  /* *INDENT-ON* */
624  struct
625  {
626  u32 stream_index;
627  char name[16];
628  } *ed;
629  ed = ELOG_DATA (mcm->elog_main, e);
630  ed->stream_index = p[0];
631  elog_stream_name (ed->name, sizeof (ed->name), name);
632  }
633 
634  vec_free (name);
635  return;
636  }
637 
638  vec_add2 (mcm->stream_vector, s, 1);
639  mc_stream_init (s);
640  s->state = MC_STREAM_STATE_name_known;
641  s->index = s - mcm->stream_vector;
642  s->config.name = name;
643 
644  if (MC_EVENT_LOGGING > 0)
645  {
646  /* *INDENT-OFF* */
647  ELOG_TYPE_DECLARE (e) =
648  {
649  .format = "stream index %d named %s",
650  .format_args = "i4s16",
651  };
652  /* *INDENT-ON* */
653  struct
654  {
655  u32 stream_index;
656  char name[16];
657  } *ed;
658  ed = ELOG_DATA (mcm->elog_main, e);
659  ed->stream_index = s->index;
660  elog_stream_name (ed->name, sizeof (ed->name), name);
661  }
662 
663  hash_set_mem (mcm->stream_index_by_name, name, s->index);
664 
666  if (p)
667  {
670  vec_foreach (wp, w[0])
674  }
675 }
676 
677 /* *INDENT-OFF* */
678 MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) =
679 {
680  .name = "mc_register_stream_name",
683 };
684 /* *INDENT-ON* */
685 
686 void
688  mc_stream_t * stream,
689  mc_peer_id_t peer_id, u32 buffer_index)
690 {
691  return mc_unserialize (mcm, stream, buffer_index);
692 }
693 
694 static u8 *
696  u8 * data_vector,
697  u32 last_global_sequence_processed)
698 {
700 
701  /* Append serialized data to data vector. */
702  serialize_open_vector (&m, data_vector);
703  m.stream.current_buffer_index = vec_len (data_vector);
704 
705  serialize (&m, serialize_mc_main, mcm);
706  return serialize_close_vector (&m);
707 }
708 
709 static void
710 mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes)
711 {
713 
714  unserialize_open_data (&s, data, n_data_bytes);
715 
716  unserialize (&s, unserialize_mc_main, mcm);
717 }
718 
719 /* Overridden from the application layer, not actually used here */
720 void mc_stream_join_process_hold (void) __attribute__ ((weak));
721 void
723 {
724 }
725 
726 static u32
728  mc_stream_config_t * config, u32 is_internal)
729 {
730  mc_stream_t *s;
731  vlib_main_t *vm = mcm->vlib_main;
732 
733  s = 0;
734  if (!is_internal)
735  {
736  uword *p;
737 
738  /* Already have a stream with given name? */
739  if ((s = mc_stream_by_name (mcm, config->name)))
740  {
741  /* Already joined and ready? */
742  if (s->state == MC_STREAM_STATE_ready)
743  return s->index;
744  }
745 
746  /* First join MC internal stream. */
747  if (!mcm->stream_vector
749  == MC_STREAM_STATE_invalid))
750  {
751  static mc_stream_config_t c = {
752  .name = "mc-internal",
753  .rx_buffer = mc_rx_buffer_unserialize,
754  .catchup = mc_internal_catchup,
755  .catchup_snapshot = mc_internal_catchup_snapshot,
756  };
757 
758  c.save_snapshot = config->save_snapshot;
759 
760  mc_stream_join_helper (mcm, &c, /* is_internal */ 1);
761  }
762 
763  /* If stream is still unknown register this name and wait for
764  sequenced message to name stream. This way all peers agree
765  on stream name to index mappings. */
766  s = mc_stream_by_name (mcm, config->name);
767  if (!s)
768  {
770  u8 *name_copy = format (0, "%s", config->name);
771 
772  mc_serialize_stream (mcm,
774  &mc_register_stream_name_msg, config->name);
775 
776  /* Wait for this stream to be named. */
777  p =
779  name_copy);
780  if (p)
781  w =
783  p[0]);
784  else
785  {
788  mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */
789  sizeof
790  (uword));
792  name_copy,
794  w[0] = 0;
795  }
796 
797  vec_add2 (w[0], wp, 1);
799  vec_free (name_copy);
800  }
801 
802  /* Name should be known now. */
803  s = mc_stream_by_name (mcm, config->name);
804  ASSERT (s != 0);
805  ASSERT (s->state == MC_STREAM_STATE_name_known);
806  }
807 
808  if (!s)
809  {
810  vec_add2 (mcm->stream_vector, s, 1);
811  mc_stream_init (s);
812  s->index = s - mcm->stream_vector;
813  }
814 
815  {
816  /* Save name since we could have already used it as hash key. */
817  char *name_save = s->config.name;
818 
819  s->config = config[0];
820 
821  if (name_save)
822  s->config.name = name_save;
823  }
824 
825  if (s->config.window_size == 0)
826  s->config.window_size = 8;
827 
828  if (s->config.retry_interval == 0.0)
829  s->config.retry_interval = 1.0;
830 
831  /* Sanity. */
832  ASSERT (s->config.retry_interval < 30);
833 
834  if (s->config.retry_limit == 0)
835  s->config.retry_limit = 7;
836 
837  s->state = MC_STREAM_STATE_join_in_progress;
838  if (!s->peer_index_by_id.hash)
839  mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
840 
841  /* If we don't hear from someone in 5 seconds, we're alone */
842  s->join_timeout = vlib_time_now (vm) + 5.0;
843  mcm->joins_in_progress++;
844 
845  if (MC_EVENT_LOGGING > 0)
846  {
847  /* *INDENT-OFF* */
848  ELOG_TYPE_DECLARE (e) =
849  {
850  .format = "stream index %d join request %s",
851  .format_args = "i4s16",
852  };
853  /* *INDENT-ON* */
854  struct
855  {
856  u32 stream_index;
857  char name[16];
858  } *ed;
859  ed = ELOG_DATA (mcm->elog_main, e);
860  ed->stream_index = s->index;
861  elog_stream_name (ed->name, sizeof (ed->name), s->config.name);
862  }
863 
864  send_join_or_leave_request (mcm, s->index, 1 /* join */ );
865 
867  (vm, &s->procs_waiting_for_join_done);
868 
869  if (MC_EVENT_LOGGING)
870  {
871  ELOG_TYPE (e, "join complete stream %d");
872  ELOG (mcm->elog_main, e, s->index);
873  }
874 
875  return s->index;
876 }
877 
878 u32
880 {
881  return mc_stream_join_helper (mcm, config, /* is_internal */ 0);
882 }
883 
884 void
885 mc_stream_leave (mc_main_t * mcm, u32 stream_index)
886 {
887  mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
888 
889  if (!s)
890  return;
891 
892  if (MC_EVENT_LOGGING)
893  {
894  /* *INDENT-OFF* */
895  ELOG_TYPE_DECLARE (t) =
896  {
897  .format = "leave-stream: %d",.format_args = "i4",
898  };
899  /* *INDENT-ON* */
900  struct
901  {
902  u32 index;
903  } *ed;
904  ed = ELOG_DATA (mcm->elog_main, t);
905  ed->index = stream_index;
906  }
907 
908  send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ );
909  mc_stream_free (s);
910  s->state = MC_STREAM_STATE_name_known;
911 }
912 
913 void
915  mc_msg_join_or_leave_request_t * req,
916  u32 buffer_index)
917 {
918  mc_stream_t *s;
919  mc_msg_join_reply_t *rep;
920  u32 bi;
921 
923 
924  s = mc_stream_by_index (mcm, req->stream_index);
925  if (!s || s->state != MC_STREAM_STATE_ready)
926  return;
927 
928  /* If the peer is joining, create it */
929  if (req->is_join)
930  {
931  mc_stream_t *this_s;
932 
933  /* We're not in a position to catch up a peer until all
934  stream joins are complete. */
935  if (0)
936  {
937  /* XXX This is hard to test, so this check is disabled by the if (0) above. */
938  vec_foreach (this_s, mcm->stream_vector)
939  {
940  if (this_s->state != MC_STREAM_STATE_ready
941  && this_s->state != MC_STREAM_STATE_name_known)
942  return;
943  }
944  }
945  else if (mcm->joins_in_progress > 0)
946  return;
947 
948  (void) get_or_create_peer_with_id (mcm, s, req->peer_id,
949  /* created */ 0);
950 
951  rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi);
952  memset (rep, 0, sizeof (rep[0]));
953  rep->type = MC_MSG_TYPE_join_reply;
954  rep->stream_index = req->stream_index;
955 
957  /* These two are already in network byte order... */
958  rep->peer_id = mcm->transport.our_ack_peer_id;
959  rep->catchup_peer_id = mcm->transport.our_catchup_peer_id;
960 
962  }
963  else
964  {
965  if (s->config.peer_died)
966  s->config.peer_died (mcm, s, req->peer_id);
967  }
968 }
969 
970 void
972  mc_msg_join_reply_t * mp, u32 buffer_index)
973 {
974  mc_stream_t *s;
975 
977 
978  s = mc_stream_by_index (mcm, mp->stream_index);
979 
980  if (!s || s->state != MC_STREAM_STATE_join_in_progress)
981  return;
982 
983  /* Switch to catchup state; next join reply
984  for this stream will be ignored. */
985  s->state = MC_STREAM_STATE_catchup;
986 
987  mcm->joins_in_progress--;
989  mp->stream_index, mp->catchup_peer_id);
990 }
991 
992 void
993 mc_wait_for_stream_ready (mc_main_t * m, char *stream_name)
994 {
995  mc_stream_t *s;
996 
997  while (1)
998  {
999  s = mc_stream_by_name (m, stream_name);
1000  if (s)
1001  break;
1003  }
1004 
1005  /* It's OK to send a message in catchup and ready states. */
1006  if (s->state == MC_STREAM_STATE_catchup
1007  || s->state == MC_STREAM_STATE_ready)
1008  return;
1009 
1010  /* Otherwise we are waiting for a join to finish. */
1013 }
1014 
1015 u32
1016 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
1017 {
1018  mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
1019  vlib_main_t *vm = mcm->vlib_main;
1020  mc_retry_t *r;
1021  mc_msg_user_request_t *mp;
1022  vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
1023  u32 ri;
1024 
1025  if (!s)
1026  return 0;
1027 
1028  if (s->state != MC_STREAM_STATE_ready)
1030  (vm, &s->procs_waiting_for_join_done);
1031 
1032  while (pool_elts (s->retry_pool) >= s->config.window_size)
1033  {
1036  }
1037 
1038  pool_get (s->retry_pool, r);
1039  ri = r - s->retry_pool;
1040 
1041  r->prev_index = s->retry_tail_index;
1042  r->next_index = ~0;
1043  s->retry_tail_index = ri;
1044 
1045  if (r->prev_index == ~0)
1046  s->retry_head_index = ri;
1047  else
1048  {
1050  p->next_index = ri;
1051  }
1052 
1053  vlib_buffer_advance (b, -sizeof (mp[0]));
1054  mp = vlib_buffer_get_current (b);
1055 
1056  mp->peer_id = mcm->transport.our_ack_peer_id;
1057  /* mp->transport.global_sequence set by relay agent. */
1058  mp->global_sequence = 0xdeadbeef;
1059  mp->stream_index = s->index;
1060  mp->local_sequence = s->our_local_sequence++;
1061  mp->n_data_bytes =
1062  vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
1063 
1064  r->buffer_index = buffer_index;
1065  r->local_sequence = mp->local_sequence;
1066  r->sent_at = vlib_time_now (vm);
1067  r->n_retries = 0;
1068 
1069  /* Retry will be freed when all currently known peers have acked. */
1072 
1074  r - s->retry_pool);
1075 
1076  elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries);
1077 
1079 
1080  mcm->transport.tx_buffer (mcm->transport.opaque,
1081  MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index);
1082 
1083  s->user_requests_sent++;
1084 
1085  /* return amount of window remaining */
1086  return s->config.window_size - pool_elts (s->retry_pool);
1087 }
1088 
1089 void
1090 mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp,
1091  u32 buffer_index)
1092 {
1093  vlib_main_t *vm = mcm->vlib_main;
1094  mc_stream_t *s;
1095  mc_stream_peer_t *peer;
1096  i32 seq_cmp_result;
1097  static int once = 0;
1098 
1100 
1101  s = mc_stream_by_index (mcm, mp->stream_index);
1102 
1103  /* Not signed up for this stream? Turf-o-matic */
1104  if (!s || s->state != MC_STREAM_STATE_ready)
1105  {
1106  vlib_buffer_free_one (vm, buffer_index);
1107  return;
1108  }
1109 
1110  /* Find peer, including ourselves. */
1111  peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
1112  /* created */ 0);
1113 
1114  seq_cmp_result = mc_seq_cmp (mp->local_sequence,
1115  peer->last_sequence_received + 1);
1116 
1117  if (MC_EVENT_LOGGING > 0)
1118  {
1119  /* *INDENT-OFF* */
1120  ELOG_TYPE_DECLARE (e) =
1121  {
1122  .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
1123  .format_args = "T4i4i4i4",
1124  };
1125  /* *INDENT-ON* */
1126  struct
1127  {
1128  u32 peer, stream_index, rx_sequence;
1129  i32 seq_cmp_result;
1130  } *ed;
1131  ed = ELOG_DATA (mcm->elog_main, e);
1132  ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1133  ed->stream_index = mp->stream_index;
1134  ed->rx_sequence = mp->local_sequence;
1135  ed->seq_cmp_result = seq_cmp_result;
1136  }
1137 
1138  if (0 && mp->stream_index == 1 && once == 0)
1139  {
1140  once = 1;
1141  ELOG_TYPE (e, "FAKE lost msg on stream 1");
1142  ELOG (mcm->elog_main, e, 0);
1143  return;
1144  }
1145 
1146  peer->last_sequence_received += seq_cmp_result == 0;
1148 
1149  if (seq_cmp_result > 0)
1150  peer->stats.n_msgs_from_future += 1;
1151 
1152  /* Send ack even if msg from future */
1153  if (1)
1154  {
1155  mc_msg_user_ack_t *rp;
1156  u32 bi;
1157 
1158  rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi);
1159  rp->peer_id = mcm->transport.our_ack_peer_id;
1160  rp->stream_index = s->index;
1161  rp->local_sequence = mp->local_sequence;
1162  rp->seq_cmp_result = seq_cmp_result;
1163 
1164  if (MC_EVENT_LOGGING > 0)
1165  {
1166  /* *INDENT-OFF* */
1167  ELOG_TYPE_DECLARE (e) =
1168  {
1169  .format = "tx-ack: stream %d local seq %d",
1170  .format_args = "i4i4",
1171  };
1172  /* *INDENT-ON* */
1173  struct
1174  {
1175  u32 stream_index;
1176  u32 local_sequence;
1177  } *ed;
1178  ed = ELOG_DATA (mcm->elog_main, e);
1179  ed->stream_index = rp->stream_index;
1180  ed->local_sequence = rp->local_sequence;
1181  }
1182 
1184 
1185  mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi);
1186  /* Msg from past? If so, free the buffer... */
1187  if (seq_cmp_result < 0)
1188  {
1189  vlib_buffer_free_one (vm, buffer_index);
1190  peer->stats.n_msgs_from_past += 1;
1191  }
1192  }
1193 
1194  if (seq_cmp_result == 0)
1195  {
1196  vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
1197  switch (s->state)
1198  {
1199  case MC_STREAM_STATE_ready:
1200  vlib_buffer_advance (b, sizeof (mp[0]));
1201  s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index);
1202 
1203  /* Stream vector can change address via rx callback for mc-internal
1204  stream. */
1205  s = mc_stream_by_index (mcm, mp->stream_index);
1206  ASSERT (s != 0);
1207  s->last_global_sequence_processed = mp->global_sequence;
1208  break;
1209 
1210  case MC_STREAM_STATE_catchup:
1211  clib_fifo_add1 (s->catchup_fifo, buffer_index);
1212  break;
1213 
1214  default:
1215  clib_warning ("stream in unknown state %U",
1217  break;
1218  }
1219  }
1220 }
1221 
1222 void
1223 mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp,
1224  u32 buffer_index)
1225 {
1226  vlib_main_t *vm = mcm->vlib_main;
1227  uword *p;
1228  mc_stream_t *s;
1229  mc_stream_peer_t *peer;
1230  mc_retry_t *r;
1231  int peer_created = 0;
1232 
1234 
1235  s = mc_stream_by_index (mcm, mp->stream_index);
1236 
1237  if (MC_EVENT_LOGGING > 0)
1238  {
1239  /* *INDENT-OFF* */
1240  ELOG_TYPE_DECLARE (t) =
1241  {
1242  .format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
1243  .format_args = "i4T4i4",
1244  };
1245  /* *INDENT-ON* */
1246 
1247  struct
1248  {
1249  u32 local_sequence;
1250  u32 peer;
1251  i32 seq_cmp_result;
1252  } *ed;
1253  ed = ELOG_DATA (mcm->elog_main, t);
1254  ed->local_sequence = mp->local_sequence;
1255  ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1256  ed->seq_cmp_result = mp->seq_cmp_result;
1257  }
1258 
1259  /* Unknown stream? */
1260  if (!s)
1261  return;
1262 
1263  /* Find the peer which just ack'ed. */
1264  peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
1265  /* created */ &peer_created);
1266 
1267  /*
1268  * Peer reports message from the future. If it's not in the retry
1269  * fifo, look for a retired message.
1270  */
1271  if (mp->seq_cmp_result > 0)
1272  {
1273  p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence -
1274  mp->seq_cmp_result);
1275  if (p == 0)
1276  mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
1277 
1278  /* Normal retry should fix it... */
1279  return;
1280  }
1281 
1282  /*
1283  * Pointer to the indicated retry fifo entry.
1284  * Worth hashing because we could use a window size of 100 or 1000.
1285  */
1286  p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence);
1287 
1288  /*
1289  * Is this a duplicate ACK, received after we've retired the
1290  * fifo entry. This can happen when learning about new
1291  * peers.
1292  */
1293  if (p == 0)
1294  {
1295  if (MC_EVENT_LOGGING > 0)
1296  {
1297  /* *INDENT-OFF* */
1298  ELOG_TYPE_DECLARE (t) =
1299  {
1300  .format = "ack: for seq %d from peer %s no fifo elt",
1301  .format_args = "i4T4",
1302  };
1303  /* *INDENT-ON* */
1304 
1305  struct
1306  {
1307  u32 seq;
1308  u32 peer;
1309  } *ed;
1310  ed = ELOG_DATA (mcm->elog_main, t);
1311  ed->seq = mp->local_sequence;
1312  ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1313  }
1314 
1315  return;
1316  }
1317 
1318  r = pool_elt_at_index (s->retry_pool, p[0]);
1319 
1320  /* Make sure that this new peer ACKs our msgs from now on */
1321  if (peer_created)
1322  {
1323  mc_retry_t *later_retry = next_retry (s, r);
1324 
1325  while (later_retry)
1326  {
1327  later_retry->unacked_by_peer_bitmap =
1328  clib_bitmap_ori (later_retry->unacked_by_peer_bitmap,
1329  peer - s->peers);
1330  later_retry = next_retry (s, later_retry);
1331  }
1332  }
1333 
1334  ASSERT (mp->local_sequence == r->local_sequence);
1335 
1336  /* If we weren't expecting to hear from this peer */
1337  if (!peer_created &&
1338  !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
1339  {
1340  if (MC_EVENT_LOGGING > 0)
1341  {
1342  /* *INDENT-OFF* */
1343  ELOG_TYPE_DECLARE (t) =
1344  {
1345  .format = "dup-ack: for seq %d from peer %s",
1346  .format_args = "i4T4",
1347  };
1348  /* *INDENT-ON* */
1349  struct
1350  {
1351  u32 seq;
1352  u32 peer;
1353  } *ed;
1354  ed = ELOG_DATA (mcm->elog_main, t);
1355  ed->seq = r->local_sequence;
1356  ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1357  }
1359  return;
1360  }
1361 
1362  if (MC_EVENT_LOGGING > 0)
1363  {
1364  /* *INDENT-OFF* */
1365  ELOG_TYPE_DECLARE (t) =
1366  {
1367  .format = "ack: for seq %d from peer %s",
1368  .format_args = "i4T4",
1369  };
1370  /* *INDENT-ON* */
1371  struct
1372  {
1373  u32 seq;
1374  u32 peer;
1375  } *ed;
1376  ed = ELOG_DATA (mcm->elog_main, t);
1377  ed->seq = mp->local_sequence;
1378  ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1379  }
1380 
1382  clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers);
1383 
1384  /* Not all clients have ack'ed */
1386  {
1387  return;
1388  }
1389  if (MC_EVENT_LOGGING > 0)
1390  {
1391  /* *INDENT-OFF* */
1392  ELOG_TYPE_DECLARE (t) =
1393  {
1394  .format = "ack: retire fifo elt loc seq %d after %d acks",
1395  .format_args = "i4i4",
1396  };
1397  /* *INDENT-ON* */
1398  struct
1399  {
1400  u32 seq;
1401  u32 npeers;
1402  } *ed;
1403  ed = ELOG_DATA (mcm->elog_main, t);
1404  ed->seq = r->local_sequence;
1405  ed->npeers = pool_elts (s->peers);
1406  }
1407 
1408  hash_unset (s->retry_index_by_local_sequence, mp->local_sequence);
1409  mc_retry_free (mcm, s, r);
1410  remove_retry_from_pool (s, r);
1412 }
1413 
/* Event type code for "send catchup data" requests.
   NOTE(review): presumably the type the catchup process waits on and the
   catchup request handler signals; the lines naming it at those sites are
   absent from this listing -- confirm against the real file. */
1414 #define EVENT_MC_SEND_CATCHUP_DATA 0
1415 
/*
 * Process-node body that ships catchup snapshots.
 *
 * Loops forever: waits for events, treats every event datum as an index
 * into mcm->catchup_process_args, hands that entry's catchup_opaque and
 * catchup_snapshot to a send call, then returns the entry to the pool.
 *
 * NOTE(review): this is an extracted listing; lines 1417 (function name
 * and first parameter), 1422 (declaration of `args`), 1430 (event type
 * argument) and 1436 (callee of the send call) are missing.  The function
 * is presumably mc_catchup_process, which mc_main_init registers -- confirm.
 */
1416 static uword
1418  vlib_node_runtime_t * node, vlib_frame_t * f)
1419 {
1420  mc_main_t *mcm = mc_node_get_main (node);
1421  uword *event_data = 0;
1423  int i;
1424 
1425  while (1)
1426  {
/* Reuse the event vector across iterations: reset its length, keep storage. */
1427  if (event_data)
1428  _vec_len (event_data) = 0;
1429  vlib_process_wait_for_event_with_type (vm, &event_data,
1431 
1432  for (i = 0; i < vec_len (event_data); i++)
1433  {
/* Each event datum is a pool index into catchup_process_args. */
1434  args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]);
1435 
1437  args->catchup_opaque,
1438  args->catchup_snapshot);
1439 
1440  /* Send function will free snapshot data vector. */
1441  pool_put (mcm->catchup_process_args, args);
1442  }
1443  }
1444 
1445  return 0; /* not likely */
1446 }
1447 
/*
 * Serializer for per-stream peer state.
 *
 * Takes the mc_stream_t pointer from the va_list, writes the peer count as
 * a u32, then for every peer in s->peers writes the raw peer id bytes
 * followed by last_sequence_received.
 *
 * NOTE(review): line 1449 (function name and parameters `m`, `va`) and
 * line 1463 (a statement after the peer loop) are missing from this
 * listing.  Presumably this is serialize_mc_stream, which the catchup
 * request handler passes to serialize() -- confirm.
 */
1448 static void
1450 {
1451  mc_stream_t *s = va_arg (*va, mc_stream_t *);
1452  mc_stream_peer_t *p;
1453 
1454  serialize_integer (m, pool_elts (s->peers), sizeof (u32));
1455  /* *INDENT-OFF* */
1456  pool_foreach (p, s->peers, ({
/* Reserve sizeof (p->id) bytes in the output and copy the id verbatim. */
1457  u8 * x = serialize_get (m, sizeof (p->id));
1458  clib_memcpy (x, p->id.as_u8, sizeof (p->id));
1459  serialize_integer (m, p->last_sequence_received,
1460  sizeof (p->last_sequence_received));
1461  }));
1462 /* *INDENT-ON* */
1464 }
1465 
/*
 * Unserializer for per-stream peer state (counterpart of the stream
 * serializer above it in the file).
 *
 * Reads the peer count, (re)initializes s->peer_index_by_id, then for each
 * peer allocates a pool entry, copies the id bytes out of the input and
 * records the pool index under that id.  Warns if s->all_peer_bitmap is
 * NULL once done.
 *
 * NOTE(review): lines 1467 (function name and parameters `m`, `va`),
 * 1481 (callee reading last_sequence_received) and 1486 (a statement after
 * the loop, presumably the one that fills all_peer_bitmap) are missing
 * from this listing -- confirm against the real file.
 */
1466 void
1468 {
1469  mc_stream_t *s = va_arg (*va, mc_stream_t *);
1470  u32 i, n_peers;
1471  mc_stream_peer_t *p;
1472 
1473  unserialize_integer (m, &n_peers, sizeof (u32));
1474  mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
1475  for (i = 0; i < n_peers; i++)
1476  {
1477  u8 *x;
1478  pool_get (s->peers, p);
1479  x = unserialize_get (m, sizeof (p->id));
1480  clib_memcpy (p->id.as_u8, x, sizeof (p->id));
1482  sizeof (p->last_sequence_received));
/* Map peer id -> pool index so later lookups by id find this entry. */
1483  mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */
1484  0);
1485  }
1487 
1488  /* This is really bad. */
1489  if (!s->all_peer_bitmap)
1490  clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name);
1491 }
1492 
/*
 * Handle a catchup request from a peer.
 *
 * Ignores the request unless the named stream exists and is in the ready
 * state.  Otherwise allocates an entry in mcm->catchup_process_args,
 * records the stream index and catchup_opaque, and builds the snapshot
 * vector: a mc_msg_catchup_reply_t header (peer id, stream index, last
 * global sequence processed) followed by the serialized stream state and
 * application data.  n_data_bytes is set to the vector length minus the
 * header size.
 *
 * NOTE(review): lines 1494 (function name and first parameter), 1500
 * (declaration of `args`), 1502, 1557-1558, 1562, 1565 (callee of the
 * snapshot call), 1571 and 1575-1576 (callee that hands the pool index to
 * the sending process) are missing from this listing.  Presumably this is
 * the catchup request handler -- confirm against the real file.
 */
1493 void
1495  mc_msg_catchup_request_t * req,
1496  u32 catchup_opaque)
1497 {
1498  vlib_main_t *vm = mcm->vlib_main;
1499  mc_stream_t *s;
1501 
1503 
1504  s = mc_stream_by_index (mcm, req->stream_index);
/* Only a stream that is itself caught up can serve a catchup. */
1505  if (!s || s->state != MC_STREAM_STATE_ready)
1506  return;
1507 
1508  if (MC_EVENT_LOGGING > 0)
1509  {
1510  /* *INDENT-OFF* */
1511  ELOG_TYPE_DECLARE (t) =
1512  {
1513  .format = "catchup-request: from %s stream %d",
1514  .format_args = "T4i4",
1515  };
1516  /* *INDENT-ON* */
1517  struct
1518  {
1519  u32 peer, stream;
1520  } *ed;
1521  ed = ELOG_DATA (mcm->elog_main, t);
1522  ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64);
1523  ed->stream = req->stream_index;
1524  }
1525 
1526  /*
1527  * The application has to snapshot its data structures right
1528  * here, right now. If we process any messages after
1529  * noting the last global sequence we've processed, the client
1530  * won't be able to accurately reconstruct our data structures.
1531  *
1532  * Once the data structures are e.g. vec_dup()'ed, we
1533  * send the resulting messages from a separate process, to
1534  * make sure that we don't cause a bunch of message retransmissions
1535  */
1536  pool_get (mcm->catchup_process_args, args);
1537 
1538  args->stream_index = s - mcm->stream_vector;
1539  args->catchup_opaque = catchup_opaque;
1540  args->catchup_snapshot = 0;
1541 
1542  /* Construct catchup reply and snapshot state for stream to send as
1543  catchup reply payload. */
1544  {
1545  mc_msg_catchup_reply_t *rep;
1546  serialize_main_t m;
1547 
/* Start the snapshot vector with room for the reply header. */
1548  vec_resize (args->catchup_snapshot, sizeof (rep[0]));
1549 
1550  rep = (void *) args->catchup_snapshot;
1551 
1552  rep->peer_id = req->peer_id;
1553  rep->stream_index = req->stream_index;
1554  rep->last_global_sequence_included = s->last_global_sequence_processed;
1555 
1556  /* Setup for serialize to append to catchup snapshot. */
1559 
1560  serialize (&m, serialize_mc_stream, s);
1561 
1563 
1564  /* Actually copy internal state */
1566  (mcm, args->catchup_snapshot, rep->last_global_sequence_included);
1567 
/* Re-derive rep from the vector before writing through it again. */
1568  rep = (void *) args->catchup_snapshot;
1569  rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]);
1570 
1572  }
1573 
1574  /* now go send it... */
1577  args - mcm->catchup_process_args);
1578 }
1579 
/* Event type codes for the unserialize process.
   NOTE(review): presumably BUFFER carries a stream/buffer pool index and
   CATCHUP carries a pointer to a catchup reply; the case labels and signal
   arguments that would confirm this are absent from this listing. */
1580 #define EVENT_MC_UNSERIALIZE_BUFFER 0
1581 #define EVENT_MC_UNSERIALIZE_CATCHUP 1
1582 
/*
 * Handle a catchup reply: forwards the message pointer (as a uword) to
 * mcm->unserialize_process rather than processing it here.
 *
 * catchup_opaque is not referenced in the lines visible here.
 *
 * NOTE(review): lines 1587 (callee and first argument) and 1589 (event
 * type argument) are missing from this listing -- confirm against the
 * real file.
 */
1583 void
1584 mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp,
1585  u32 catchup_opaque)
1586 {
1588  mcm->unserialize_process,
1590  pointer_to_uword (mp));
1591 }
1592 
/*
 * Apply a catchup reply to a stream that is not yet ready.
 *
 * Steps visible in this listing:
 *  - look up the stream; return if unknown or already in the ready state;
 *  - optionally hand the whole payload to config.save_snapshot;
 *  - open mp->data for unserialization, then set our_local_sequence to
 *    one past the last sequence recorded for our own peer id;
 *  - pass the bytes after the stream state to config.catchup;
 *  - drain catchup_fifo, delivering buffers whose global sequence is newer
 *    than the snapshot and freeing the rest;
 *  - mark the stream ready and reset procs_waiting_for_join_done.
 *
 * NOTE(review): lines 1599, 1618 (presumably the call that unserializes the
 * stream state) and 1708-1711 (presumably the loop that wakes the waiting
 * processes) are missing from this listing -- confirm against the real
 * file.
 */
1593 static void
1594 perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
1595 {
1596  mc_stream_t *s;
1597  i32 seq_cmp_result;
1598 
1600 
1601  s = mc_stream_by_index (mcm, mp->stream_index);
1602 
1603  /* Never heard of this stream or already caught up. */
1604  if (!s || s->state == MC_STREAM_STATE_ready)
1605  return;
1606 
1607  {
1608  serialize_main_t m;
1609  mc_stream_peer_t *p;
1610  u32 n_stream_bytes;
1611 
1612  /* For offline sim replay: save the entire catchup snapshot... */
1613  if (s->config.save_snapshot)
1614  s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data,
1615  mp->n_data_bytes);
1616 
1617  unserialize_open_data (&m, mp->data, mp->n_data_bytes);
1619 
1620  /* Make sure we start numbering our messages as expected */
1621  /* *INDENT-OFF* */
1622  pool_foreach (p, s->peers, ({
1623  if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
1624  s->our_local_sequence = p->last_sequence_received + 1;
1625  }));
1626 /* *INDENT-ON* */
1627 
/* Bytes consumed so far; the application's data starts right after. */
1628  n_stream_bytes = m.stream.current_buffer_index;
1629 
1630  /* No need to unserialize close; nothing to free. */
1631 
1632  /* After serialized stream is user's catchup data. */
1633  s->config.catchup (mcm, mp->data + n_stream_bytes,
1634  mp->n_data_bytes - n_stream_bytes);
1635  }
1636 
1637  /* Vector could have been moved by catchup.
1638  This can only happen for mc-internal stream. */
1639  s = mc_stream_by_index (mcm, mp->stream_index);
1640 
1641  s->last_global_sequence_processed = mp->last_global_sequence_included;
1642 
1643  while (clib_fifo_elts (s->catchup_fifo))
1644  {
1645  mc_msg_user_request_t *gp;
1646  u32 bi;
1647  vlib_buffer_t *b;
1648 
1649  clib_fifo_sub1 (s->catchup_fifo, bi);
1650 
1651  b = vlib_get_buffer (mcm->vlib_main, bi);
1652  gp = vlib_buffer_get_current (b);
1653 
1654  /* Make sure we're replaying "new" news */
1655  seq_cmp_result = mc_seq_cmp (gp->global_sequence,
1656  mp->last_global_sequence_included);
1657 
1658  if (seq_cmp_result > 0)
1659  {
/* Skip the request header, then hand the buffer to the stream's
   rx_buffer callback.
   NOTE(review): gp is still read after rx_buffer receives bi; this
   assumes the callback does not free the buffer synchronously -- confirm. */
1660  vlib_buffer_advance (b, sizeof (gp[0]));
1661  s->config.rx_buffer (mcm, s, gp->peer_id, bi);
1662  s->last_global_sequence_processed = gp->global_sequence;
1663 
1664  if (MC_EVENT_LOGGING)
1665  {
1666  /* *INDENT-OFF* */
1667  ELOG_TYPE_DECLARE (t) =
1668  {
1669  .format = "catchup replay local sequence 0x%x",
1670  .format_args = "i4",
1671  };
1672  /* *INDENT-ON* */
1673  struct
1674  {
1675  u32 local_sequence;
1676  } *ed;
1677  ed = ELOG_DATA (mcm->elog_main, t);
1678  ed->local_sequence = gp->local_sequence;
1679  }
1680  }
1681  else
1682  {
/* Not newer than the snapshot (already included or older): log, then drop. */
1683  if (MC_EVENT_LOGGING)
1684  {
1685  /* *INDENT-OFF* */
1686  ELOG_TYPE_DECLARE (t) =
1687  {
1688  .format = "catchup discard local sequence 0x%x",
1689  .format_args = "i4",
1690  };
1691  /* *INDENT-ON* */
1692  struct
1693  {
1694  u32 local_sequence;
1695  } *ed;
1696  ed = ELOG_DATA (mcm->elog_main, t);
1697  ed->local_sequence = gp->local_sequence;
1698  }
1699 
1700  vlib_buffer_free_one (mcm->vlib_main, bi);
1701  }
1702  }
1703 
1704  s->state = MC_STREAM_STATE_ready;
1705 
1706  /* Now that we are caught up wake up joining process. */
1707  {
1712  _vec_len (s->procs_waiting_for_join_done) = 0;
1713  }
1714 }
1715 
/*
 * Mastership loop used while this node is (or may become) relay master.
 *
 * Each iteration:
 *  - returns at once if we_can_be_relay_master is clear;
 *  - when at least one time unit has passed since the last one, builds a
 *    master-assert message with our ack peer id and relay_global_sequence
 *    and transmits it through transport.tx_buffer, reporting any error;
 *  - fetches the pending event type: on ~0 a non-master counts a timeout
 *    and returns once `timeouts++ > 2` holds; on MC_RELAY_STATE_SLAVE it
 *    returns.
 *
 * NOTE(review): lines 1717 (function name and parameter), 1732, 1772,
 * 1776, 1781 (presumably the wait before polling events), 1789, 1791 and
 * 1802-1803 are missing from this listing; they presumably include the
 * relay_state assignments.  The name is presumably this_node_maybe_master
 * (called from the mastership process) -- confirm.
 */
1716 static void
1718 {
1719  vlib_main_t *vm = mcm->vlib_main;
1720  mc_msg_master_assert_t *mp;
1721  uword event_type;
1722  int timeouts = 0;
1723  int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
1724  clib_error_t *error;
/* -1 guarantees the first pass through the loop sends an assert. */
1725  f64 now, time_last_master_assert = -1;
1726  u32 bi;
1727 
1728  while (1)
1729  {
1730  if (!mcm->we_can_be_relay_master)
1731  {
1733  if (MC_EVENT_LOGGING)
1734  {
1735  ELOG_TYPE (e, "become slave (config)");
1736  ELOG (mcm->elog_main, e, 0);
1737  }
1738  return;
1739  }
1740 
1741  now = vlib_time_now (vm);
1742  if (now >= time_last_master_assert + 1)
1743  {
1744  time_last_master_assert = now;
1745  mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi);
1746 
1747  mp->peer_id = mcm->transport.our_ack_peer_id;
1748  mp->global_sequence = mcm->relay_global_sequence;
1749 
1750  /*
1751  * these messages clog the event log, set MC_EVENT_LOGGING higher
1752  * if you want them
1753  */
1754  if (MC_EVENT_LOGGING > 1)
1755  {
1756  /* *INDENT-OFF* */
1757  ELOG_TYPE_DECLARE (e) =
1758  {
1759  .format = "tx-massert: peer %s global seq %u",
1760  .format_args = "T4i4",
1761  };
1762  /* *INDENT-ON* */
1763  struct
1764  {
1765  u32 peer, global_sequence;
1766  } *ed;
1767  ed = ELOG_DATA (mcm->elog_main, e);
1768  ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1769  ed->global_sequence = mp->global_sequence;
1770  }
1771 
1773 
1774  error =
1775  mcm->transport.tx_buffer (mcm->transport.opaque,
1777  if (error)
1778  clib_error_report (error);
1779  }
1780 
1782  event_type = vlib_process_get_events (vm, /* no event data */ 0);
1783 
1784  switch (event_type)
1785  {
/* ~0: no event was pending. */
1786  case ~0:
1787  if (!is_master && timeouts++ > 2)
1788  {
1790  mcm->relay_master_peer_id =
1792  if (MC_EVENT_LOGGING)
1793  {
1794  ELOG_TYPE (e, "become master (was maybe_master)");
1795  ELOG (mcm->elog_main, e, 0);
1796  }
1797  return;
1798  }
1799  break;
1800 
1801  case MC_RELAY_STATE_SLAVE:
1804  {
1805  ELOG_TYPE (e, "become slave (was maybe_master)");
1806  ELOG (mcm->elog_main, e, 0);
1807  }
1808  return;
1809  }
1810  }
1811 }
1812 
/*
 * Mastership loop used while this node is a relay slave.
 *
 * Logs "become slave", then loops polling events: a MC_RELAY_STATE_SLAVE
 * event resets the timeout counter; a ~0 result counts a timeout and,
 * once `timeouts++ > 2` holds, clears relay_master_peer_id to ~0ULL and
 * returns.
 *
 * NOTE(review): lines 1814 (function name and parameter), 1828 (presumably
 * the wait before polling), 1836 (presumably the relay_state change) and
 * 1848 are missing from this listing.  The name is presumably
 * this_node_slave (called from the mastership process) -- confirm.
 * NOTE(review): the event string below spells "negoitate"; it is a runtime
 * string and is left untouched here.
 */
1813 static void
1815 {
1816  vlib_main_t *vm = mcm->vlib_main;
1817  uword event_type;
1818  int timeouts = 0;
1819 
1820  if (MC_EVENT_LOGGING)
1821  {
1822  ELOG_TYPE (e, "become slave");
1823  ELOG (mcm->elog_main, e, 0);
1824  }
1825 
1826  while (1)
1827  {
1829  event_type = vlib_process_get_events (vm, /* no event data */ 0);
1830 
1831  switch (event_type)
1832  {
/* ~0: no event was pending. */
1833  case ~0:
1834  if (timeouts++ > 2)
1835  {
1837  mcm->relay_master_peer_id = ~0ULL;
1838  if (MC_EVENT_LOGGING)
1839  {
1840  ELOG_TYPE (e, "timeouts; negoitate mastership");
1841  ELOG (mcm->elog_main, e, 0);
1842  }
1843  return;
1844  }
1845  break;
1846 
1847  case MC_RELAY_STATE_SLAVE:
1849  timeouts = 0;
1850  break;
1851  }
1852  }
1853 }
1854 
/*
 * Process-node body driving relay mastership.
 *
 * Loops forever, dispatching on mcm->relay_state: the MASTER case (and
 * whatever case label precedes it) runs this_node_maybe_master, the SLAVE
 * case runs this_node_slave.  Each helper returns when the state should be
 * re-evaluated.
 *
 * NOTE(review): lines 1856 (function name and first parameter) and 1865
 * (a case label, presumably the negotiate state falling through to
 * MASTER) are missing from this listing.  Presumably this is
 * mc_mastership_process, which mc_main_init registers -- confirm.
 */
1855 static uword
1857  vlib_node_runtime_t * node, vlib_frame_t * f)
1858 {
1859  mc_main_t *mcm = mc_node_get_main (node);
1860 
1861  while (1)
1862  {
1863  switch (mcm->relay_state)
1864  {
1866  case MC_RELAY_STATE_MASTER:
1867  this_node_maybe_master (mcm);
1868  break;
1869 
1870  case MC_RELAY_STATE_SLAVE:
1871  this_node_slave (mcm);
1872  break;
1873  }
1874  }
1875  return 0; /* not likely */
1876 }
1877 
/*
 * Set whether this node may become relay master.
 *
 * Does nothing when the requested value equals the current
 * we_can_be_relay_master; otherwise stores it and makes a call involving
 * mcm->mastership_process.
 *
 * NOTE(review): lines 1884 (callee) and 1886 (remaining arguments) are
 * missing from this listing; presumably an event is signalled to the
 * mastership process so it re-checks the flag -- confirm.
 */
1878 void
1879 mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
1880 {
1881  if (we_can_be_master != mcm->we_can_be_relay_master)
1882  {
1883  mcm->we_can_be_relay_master = we_can_be_master;
1885  mcm->mastership_process,
1887  }
1888 }
1889 
/*
 * Handle a master-assert message received from another node.
 *
 *  - Compares the sender's global sequence with mcm->relay_global_sequence.
 *  - If the sender's peer id compares lower than ours and its sequence is
 *    not behind ours, makes a call involving mcm->mastership_process and
 *    sets signal_slave.
 *  - If the sender's sequence is ahead, adopts it as relay_global_sequence.
 *  - Finds the sender in mastership_peers via mastership_peer_index_by_id,
 *    appending a new entry on first sight.
 *
 * buffer_index is not referenced in the lines visible here.
 *
 * NOTE(review): lines 1899, 1913 and 1915 (callee and trailing argument of
 * the signalling call), 1928 (declaration of `p`), 1936 (callee recording
 * the new index) and 1940 (presumably a per-peer timestamp update) are
 * missing from this listing -- confirm against the real file.
 */
1890 void
1891 mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
1892  u32 buffer_index)
1893 {
1894  mc_peer_id_t his_peer_id, our_peer_id;
1895  i32 seq_cmp_result;
1896  u8 signal_slave = 0;
1897  u8 update_global_sequence = 0;
1898 
1900 
1901  his_peer_id = mp->peer_id;
1902  our_peer_id = mcm->transport.our_ack_peer_id;
1903 
1904  /* compare the incoming global sequence with ours */
1905  seq_cmp_result = mc_seq_cmp (mp->global_sequence,
1906  mcm->relay_global_sequence);
1907 
1908  /* If the sender has a lower peer id and the sender's sequence >=
1909  our global sequence, we become a slave. Otherwise we are master. */
1910  if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0
1911  && seq_cmp_result >= 0)
1912  {
1914  mcm->mastership_process,
1916  signal_slave = 1;
1917  }
1918 
1919  /* Update our global sequence. */
1920  if (seq_cmp_result > 0)
1921  {
1922  mcm->relay_global_sequence = mp->global_sequence;
1923  update_global_sequence = 1;
1924  }
1925 
1926  {
1927  uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
1929 
1930  if (q)
1931  p = vec_elt_at_index (mcm->mastership_peers, q[0]);
1932  else
1933  {
/* First assert seen from this peer: append it to the peer vector. */
1934  vec_add2 (mcm->mastership_peers, p, 1);
1935  p->peer_id = his_peer_id;
1937  p - mcm->mastership_peers,
1938  /* old_value */ 0);
1939  }
1941  }
1942 
1943  /*
1944  * these messages clog the event log, set MC_EVENT_LOGGING higher
1945  * if you want them.
1946  */
1947  if (MC_EVENT_LOGGING > 1)
1948  {
1949  /* *INDENT-OFF* */
1950  ELOG_TYPE_DECLARE (e) =
1951  {
1952  .format = "rx-massert: peer %s global seq %u upd %d slave %d",
1953  .format_args = "T4i4i1i1",
1954  };
1955  /* *INDENT-ON* */
1956 
1957  struct
1958  {
1959  u32 peer;
1960  u32 global_sequence;
1961  u8 update_sequence;
1962  u8 slave;
1963  } *ed;
1964  ed = ELOG_DATA (mcm->elog_main, e);
1965  ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64);
1966  ed->global_sequence = mp->global_sequence;
1967  ed->update_sequence = update_global_sequence;
1968  ed->slave = signal_slave;
1969  }
1970 }
1971 
/*
 * Build the global message table from the registration list.
 *
 * Creates a string-keyed hash, then walks vm->mc_msg_registrations via
 * next_registration: each message gets the next free global_index, is
 * entered in global_msg_index_by_name under its name, and is appended to
 * mcm->global_msgs.
 *
 * NOTE(review): lines 1973 (function name and parameter) and 1978 (the
 * left-hand side of the hash creation, presumably
 * mcm->global_msg_index_by_name) are missing from this listing.
 * Presumably this is mc_serialize_init, which mc_main_init calls -- confirm.
 */
1972 static void
1974 {
1975  mc_serialize_msg_t *m;
1976  vlib_main_t *vm = vlib_get_main ();
1977 
1979  = hash_create_string ( /* elts */ 0, sizeof (uword));
1980 
1981  m = vm->mc_msg_registrations;
1982 
1983  while (m)
1984  {
/* global_index is the message's position in global_msgs. */
1985  m->global_index = vec_len (mcm->global_msgs);
1986  hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index);
1987  vec_add1 (mcm->global_msgs, m);
1988  m = m->next_registration;
1989  }
1990 }
1991 
/*
 * Serialize one message (arguments supplied as a va_list) into the
 * stream's outgoing vlib buffer and possibly send it.
 *
 * Opens a serialize buffer if none is in progress, serializes the message
 * name when the stream-local index `si` is still ~0 (or MSG_ID_DEBUG is
 * set), then serializes the arguments with va_serialize.  The buffer is
 * closed when batching is off, when the message has no stream-local index
 * yet, or when a size limit would be exceeded.  On close it is sent with
 * mc_stream_send on success, or freed on error if it has a valid index.
 *
 * Returns the error from va_serialize, or 0.
 *
 * NOTE(review): lines 1993 (function name and first parameter), 2000-2001
 * (declarations of `m` and `sbm`), 2021-2022 and 2024 (lookup and
 * serialization of `si`) and 2062 (the size limit compared against) are
 * missing from this listing.  The name is presumably mc_serialize_va (the
 * call in the variadic wrapper) -- confirm.
 */
1992 clib_error_t *
1994  u32 stream_index,
1995  u32 multiple_messages_per_vlib_buffer,
1996  mc_serialize_msg_t * msg, va_list * va)
1997 {
1998  mc_stream_t *s;
1999  clib_error_t *error;
2002  u32 bi, n_before, n_after, n_total, n_this_msg;
2003  u32 si, gi;
2004 
/* First use of this serialize-buffer state: set transmit defaults. */
2005  if (!sbm->vlib_main)
2006  {
2007  sbm->tx.max_n_data_bytes_per_chain = 4096;
2008  sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;
2009  }
2010 
2011  if (sbm->first_buffer == 0)
2012  serialize_open_vlib_buffer (m, mc->vlib_main, sbm);
2013 
2014  n_before = serialize_vlib_buffer_n_bytes (m);
2015 
2016  s = mc_stream_by_index (mc, stream_index);
2017  gi = msg->global_index;
2018  ASSERT (msg == vec_elt (mc->global_msgs, gi));
2019 
2020  si = ~0;
2023 
2025 
2026  /* For first time message is sent, use name to identify message. */
2027  if (si == ~0 || MSG_ID_DEBUG)
2028  serialize_cstring (m, msg->name);
2029 
2030  if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
2031  {
2032  /* *INDENT-OFF* */
2033  ELOG_TYPE_DECLARE (e) =
2034  {
2035  .format = "serialize-msg: %s index %d",
2036  .format_args = "T4i4",
2037  };
2038  /* *INDENT-ON* */
2039  struct
2040  {
2041  u32 c[2];
2042  } *ed;
2043  ed = ELOG_DATA (mc->elog_main, e);
2044  ed->c[0] = elog_id_for_msg_name (mc, msg->name);
2045  ed->c[1] = si;
2046  }
2047 
2048  error = va_serialize (m, va);
2049 
2050  n_after = serialize_vlib_buffer_n_bytes (m);
2051  n_this_msg = n_after - n_before;
2052  n_total = n_after + sizeof (mc_msg_user_request_t);
2053 
2054  /* For max message size ignore first message where string name is sent. */
2055  if (si != ~0)
2056  msg->max_n_bytes_serialized =
2057  clib_max (msg->max_n_bytes_serialized, n_this_msg);
2058 
2059  if (!multiple_messages_per_vlib_buffer
2060  || si == ~0
2061  || n_total + msg->max_n_bytes_serialized >
2063  {
2064  bi = serialize_close_vlib_buffer (m);
2065  sbm->first_buffer = 0;
2066  if (!error)
2067  mc_stream_send (mc, stream_index, bi);
2068  else if (bi != ~0)
2069  vlib_buffer_free_one (mc->vlib_main, bi);
2070  }
2071 
2072  return error;
2073 }
2074 
/*
 * Variadic front end to mc_serialize_va.
 *
 * When stream_index is ~0, the stream index is taken from
 * vm->mc_stream_index; otherwise the caller's value is used.  The variadic
 * arguments after `msg` are forwarded as a va_list, and the result of
 * mc_serialize_va is returned.
 *
 * NOTE(review): lines 2076 (function name and first parameter) and
 * 2088-2089 (body of the inner `if`, presumably a wait for the stream to
 * be joined) are missing from this listing -- confirm against the real
 * file.
 */
2075 clib_error_t *
2077  u32 stream_index,
2078  u32 multiple_messages_per_vlib_buffer,
2079  mc_serialize_msg_t * msg, ...)
2080 {
2081  vlib_main_t *vm = mc->vlib_main;
2082  va_list va;
2083  clib_error_t *error;
2084 
2085  if (stream_index == ~0)
2086  {
2087  if (vm->mc_main && vm->mc_stream_index == ~0)
2090  stream_index = vm->mc_stream_index;
2091  }
2092 
2093  va_start (va, msg);
2094  error = mc_serialize_va (mc, stream_index,
2095  multiple_messages_per_vlib_buffer, msg, &va);
2096  va_end (va);
2097  return error;
2098 }
2099 
/*
 * Unserialize one message from `m` and dispatch it to its handler.
 *
 * If a stream-local index `si` is known (and MSG_ID_DEBUG is off), the
 * global index comes straight from s->stream_msgs.  Otherwise the message
 * name is read and looked up in global_msg_index_by_name.  For a known
 * name, a stream-local slot is created when `si` is still ~0; when `si`
 * is already bound, a mismatch is only logged.  The message's unserialize
 * function is then invoked.
 *
 * Returns non-zero when the message was recognized (gi != ~0), zero when
 * the name was unknown.
 *
 * NOTE(review): lines 2101 (function name and first parameter), 2104
 * (declaration of `sm`), 2107 (how `si` is read), 2150-2151, 2160, 2202
 * and 2204 are missing from this listing.  The name is presumably
 * mc_unserialize_message (called from mc_unserialize_internal) -- confirm.
 */
2100 uword
2102  mc_stream_t * s, serialize_main_t * m)
2103 {
2105  u32 gi, si;
2106 
2108 
2109  if (!(si == ~0 || MSG_ID_DEBUG))
2110  {
2111  sm = vec_elt_at_index (s->stream_msgs, si);
2112  gi = sm->global_index;
2113  }
2114  else
2115  {
2116  char *name;
2117 
2118  unserialize_cstring (m, &name);
2119 
2120  if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
2121  {
2122  /* *INDENT-OFF* */
2123  ELOG_TYPE_DECLARE (e) =
2124  {
2125  .format = "unserialize-msg: %s rx index %d",
2126  .format_args = "T4i4",
2127  };
2128  /* *INDENT-ON* */
2129  struct
2130  {
2131  u32 c[2];
2132  } *ed;
2133  ed = ELOG_DATA (mcm->elog_main, e);
2134  ed->c[0] = elog_id_for_msg_name (mcm, name);
2135  ed->c[1] = si;
2136  }
2137 
2138  {
2139  uword *p = hash_get_mem (mcm->global_msg_index_by_name, name);
2140  gi = p ? p[0] : ~0;
2141  }
2142 
2143  /* Unknown message? */
2144  if (gi == ~0)
2145  {
/* Release the name before bailing out. */
2146  vec_free (name);
2147  goto done;
2148  }
2149 
2152 
2153  /* Stream local index unknown? Create it. */
2154  if (si == ~0)
2155  {
2156  vec_add2 (s->stream_msgs, sm, 1);
2157 
2158  si = sm - s->stream_msgs;
2159  sm->global_index = gi;
2161 
2162  if (MC_EVENT_LOGGING > 0)
2163  {
2164  /* *INDENT-OFF* */
2165  ELOG_TYPE_DECLARE (e) =
2166  {
2167  .format = "msg-bind: stream %d %s to index %d",
2168  .format_args = "i4T4i4",
2169  };
2170  /* *INDENT-ON* */
2171  struct
2172  {
2173  u32 c[3];
2174  } *ed;
2175  ed = ELOG_DATA (mcm->elog_main, e);
2176  ed->c[0] = s->index;
2177  ed->c[1] = elog_id_for_msg_name (mcm, name);
2178  ed->c[2] = si;
2179  }
2180  }
2181  else
2182  {
/* Index already bound: a global-index mismatch is logged, not corrected. */
2183  sm = vec_elt_at_index (s->stream_msgs, si);
2184  if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
2185  {
2186  /* *INDENT-OFF* */
2187  ELOG_TYPE_DECLARE (e) =
2188  {
2189  .format = "msg-id-ERROR: %s index %d expected %d",
2190  .format_args = "T4i4i4",
2191  };
2192  /* *INDENT-ON* */
2193  struct
2194  {
2195  u32 c[3];
2196  } *ed;
2197  ed = ELOG_DATA (mcm->elog_main, e);
2198  ed->c[0] = elog_id_for_msg_name (mcm, name);
2199  ed->c[1] = si;
2200  ed->c[2] = ~0;
2201  if (sm->global_index <
2203  ed->c[2] =
2205  }
2206  }
2207 
2208  vec_free (name);
2209  }
2210 
2211  if (gi != ~0)
2212  {
2213  mc_serialize_msg_t *msg;
2214  msg = vec_elt (mcm->global_msgs, gi);
2215  unserialize (m, msg->unserialize, mcm);
2216  }
2217 
2218 done:
2219  return gi != ~0;
2220 }
2221 
/*
 * Unserialize every message contained in one received buffer.
 *
 * stream_and_buffer_index selects a record `sb` that names the stream and
 * the buffer.  If the stream has a save_snapshot callback, the buffer
 * chain is first copied into a static scratch vector and passed to it with
 * is_catchup = 0.  The buffer is then opened for unserialization and
 * mc_unserialize_message is called until no bytes remain.
 *
 * NOTE(review): lines 2226-2228 (declarations of `m`, `sbm`, `sb`), 2233
 * (the lookup that yields `sb`), 2237, 2250 and 2260 (the call that frees
 * the buffer) are missing from this listing -- confirm against the real
 * file.
 * NOTE(review): if n_bytes were 0, `n_bytes - 1` below would wrap as a
 * u32; presumably buffers here are never empty -- confirm.
 */
2222 void
2223 mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
2224 {
2225  vlib_main_t *vm = mcm->vlib_main;
2229  mc_stream_t *stream;
2230  u32 buffer_index;
2231 
2232  sb =
2234  stream_and_buffer_index);
2235  buffer_index = sb->buffer_index;
2236  stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index);
2238 
2239  if (stream->config.save_snapshot)
2240  {
2241  u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index);
/* Static scratch vector: reused across calls instead of reallocated. */
2242  static u8 *contents;
2243  vec_reset_length (contents);
2244  vec_validate (contents, n_bytes - 1);
2245  vlib_buffer_contents (vm, buffer_index, contents);
2246  stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents,
2247  n_bytes);
2248  }
2249 
2251 
2252  unserialize_open_vlib_buffer (m, vm, sbm);
2253 
2254  clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index);
2255 
2256  while (unserialize_vlib_buffer_n_bytes (m) > 0)
2257  mc_unserialize_message (mcm, stream, m);
2258 
2259  /* Frees buffer. */
2261 }
2262 
/*
 * Queue a received buffer for unserialization.
 *
 * Fills a record `sb` with the stream's index and the buffer index.
 *
 * NOTE(review): lines 2267-2268 (declaration and allocation of `sb`) and
 * 2271-2273 (presumably the call that hands the record to the unserialize
 * process) are missing from this listing -- confirm against the real file.
 */
2263 void
2264 mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index)
2265 {
2266  vlib_main_t *vm = mcm->vlib_main;
2269  sb->stream_index = s->index;
2270  sb->buffer_index = buffer_index;
2274 }
2275 
/*
 * Process-node body that performs unserialization work.
 *
 * Loops forever fetching events.  One event type passes every datum to
 * mc_unserialize_internal; another treats every datum as a pointer to a
 * catchup reply, runs perform_catchup on it and then frees it with
 * vec_free.  Other event types are ignored.
 *
 * NOTE(review): lines 2277 (function name and first parameter), 2289
 * (presumably the wait before fetching events) and the two case labels
 * 2293 and 2298 (presumably EVENT_MC_UNSERIALIZE_BUFFER and
 * EVENT_MC_UNSERIALIZE_CATCHUP) are missing from this listing.
 * Presumably this is mc_unserialize_process, which mc_main_init
 * registers -- confirm.
 */
2276 static uword
2278  vlib_node_runtime_t * node, vlib_frame_t * f)
2279 {
2280  mc_main_t *mcm = mc_node_get_main (node);
2281  uword event_type, *event_data = 0;
2282  int i;
2283 
2284  while (1)
2285  {
/* Reuse the event vector across iterations: reset its length, keep storage. */
2286  if (event_data)
2287  _vec_len (event_data) = 0;
2288 
2290  event_type = vlib_process_get_events (vm, &event_data);
2291  switch (event_type)
2292  {
2294  for (i = 0; i < vec_len (event_data); i++)
2295  mc_unserialize_internal (mcm, event_data[i]);
2296  break;
2297 
2299  for (i = 0; i < vec_len (event_data); i++)
2300  {
/* The datum is the catchup reply pointer; this process frees it when done. */
2301  u8 *mp = uword_to_pointer (event_data[i], u8 *);
2302  perform_catchup (mcm, (void *) mp);
2303  vec_free (mp);
2304  }
2305  break;
2306 
2307  default:
2308  break;
2309  }
2310  }
2311 
2312  return 0; /* not likely */
2313 }
2314 
/*
 * Serializer for the mc_main_t stream table.
 *
 * Writes the number of streams, then for each stream its name, the number
 * of stream-local messages, and the global name of each such message (in
 * stream-local index order).
 *
 * NOTE(review): lines 2316 (function name and parameters `m`, `va`) and
 * 2320 (declaration of `sm`) are missing from this listing -- confirm
 * against the real file.
 */
2315 void
2317 {
2318  mc_main_t *mcm = va_arg (*va, mc_main_t *);
2319  mc_stream_t *s;
2321  mc_serialize_msg_t *msg;
2322 
2323  serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32));
2324  vec_foreach (s, mcm->stream_vector)
2325  {
2326  /* Stream name. */
2327  serialize_cstring (m, s->config.name);
2328 
2329  /* Serialize global names for all sent messages. */
2330  serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
2331  vec_foreach (sm, s->stream_msgs)
2332  {
2333  msg = vec_elt (mcm->global_msgs, sm->global_index);
2334  serialize_cstring (m, msg->name);
2335  }
2336  }
2337 }
2338 
/*
 * Unserializer for the mc_main_t stream table (counterpart of the
 * serializer above it in the file).
 *
 * For each serialized stream: reads its name and, unless it is the
 * internal stream or already known by name, creates a stream entry at
 * index i in the name_known state.  It then discards the stream's
 * existing stream_msgs and rebuilds them from the serialized message
 * names, resolving each name to a global index (~0 if unknown).
 *
 * NOTE(review): lines 2340 (function name and parameters `m`, `va`), 2346
 * (declaration of `sm`), 2360, 2368, 2408 (callee taking `gi, ~0`) and
 * 2410 are missing from this listing -- confirm against the real file.
 * NOTE(review): the lookup below uses hash_get on the string-keyed table,
 * whereas other code in this file uses hash_get_mem on it; presumably
 * equivalent for pointer keys -- confirm.
 */
2339 void
2341 {
2342  mc_main_t *mcm = va_arg (*va, mc_main_t *);
2343  u32 i, n_streams, n_stream_msgs;
2344  char *name;
2345  mc_stream_t *s;
2347 
2348  unserialize_integer (m, &n_streams, sizeof (u32));
2349  for (i = 0; i < n_streams; i++)
2350  {
2351  unserialize_cstring (m, &name);
2352  if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name))
2353  {
2354  vec_validate (mcm->stream_vector, i);
2355  s = vec_elt_at_index (mcm->stream_vector, i);
2356  mc_stream_init (s);
2357  s->index = s - mcm->stream_vector;
/* The new stream keeps `name`, so it is not freed on this path. */
2358  s->config.name = name;
2359  s->state = MC_STREAM_STATE_name_known;
2361  }
2362  else
2363  vec_free (name);
2364 
2365  s = vec_elt_at_index (mcm->stream_vector, i);
2366 
2367  vec_free (s->stream_msgs);
2369 
2370  unserialize_integer (m, &n_stream_msgs, sizeof (u32));
2371  vec_resize (s->stream_msgs, n_stream_msgs);
2372  vec_foreach (sm, s->stream_msgs)
2373  {
2374  uword *p;
2375  u32 si, gi;
2376 
2377  unserialize_cstring (m, &name);
2378  p = hash_get (mcm->global_msg_index_by_name, name);
2379  gi = p ? p[0] : ~0;
2380  si = sm - s->stream_msgs;
2381 
2382  if (MC_EVENT_LOGGING > 0)
2383  {
2384  /* *INDENT-OFF* */
2385  ELOG_TYPE_DECLARE (e) =
2386  {
2387  .format = "catchup-bind: %s to %d global index %d stream %d",
2388  .format_args = "T4i4i4i4",
2389  };
2390  /* *INDENT-ON* */
2391 
2392  struct
2393  {
2394  u32 c[4];
2395  } *ed;
2396  ed = ELOG_DATA (mcm->elog_main, e);
2397  ed->c[0] = elog_id_for_msg_name (mcm, name);
2398  ed->c[1] = si;
2399  ed->c[2] = gi;
2400  ed->c[3] = s->index;
2401  }
2402 
2403  vec_free (name);
2404 
2405  sm->global_index = gi;
2406  if (gi != ~0)
2407  {
2409  gi, ~0);
2411  }
2412  }
2413  }
2414 }
2415 
/*
 * Initialize an mc_main_t instance.
 *
 * Records the vlib main and its event log, clears relay_master_peer_id to
 * ~0ULL, creates a string-keyed hash, and registers five process nodes
 * whose names carry `tag` as a suffix: mastership, join-ager, retry,
 * catchup and unserialize.  One registration struct is reused for all
 * five; only the name and function change between registrations.
 * Finally it initializes the peer-id hashes and calls mc_serialize_init.
 *
 * NOTE(review): lines 2425, 2427 (left-hand side of the hash creation) and
 * 2431 (declaration of `r`) are missing from this listing -- confirm
 * against the real file.
 */
2416 void
2417 mc_main_init (mc_main_t * mcm, char *tag)
2418 {
2419  vlib_main_t *vm = vlib_get_main ();
2420 
2421  mcm->vlib_main = vm;
2422  mcm->elog_main = &vm->elog_main;
2423 
2424  mcm->relay_master_peer_id = ~0ULL;
2426 
2428  = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword));
2429 
2430  {
2432 
2433  memset (&r, 0, sizeof (r));
2434 
2435  r.type = VLIB_NODE_TYPE_PROCESS;
2436 
2437  /* Point runtime data to main instance. */
/* The runtime data is the pointer value itself (pointer-sized), not the
   mc_main_t contents; presumably vlib_register_node copies these bytes
   -- confirm. */
2438  r.runtime_data = &mcm;
2439  r.runtime_data_bytes = sizeof (&mcm);
2440 
2441  r.name = (char *) format (0, "mc-mastership-%s", tag);
2442  r.function = mc_mastership_process;
2443  mcm->mastership_process = vlib_register_node (vm, &r);
2444 
2445  r.name = (char *) format (0, "mc-join-ager-%s", tag);
2446  r.function = mc_join_ager_process;
2447  mcm->join_ager_process = vlib_register_node (vm, &r);
2448 
2449  r.name = (char *) format (0, "mc-retry-%s", tag);
2450  r.function = mc_retry_process;
2451  mcm->retry_process = vlib_register_node (vm, &r);
2452 
2453  r.name = (char *) format (0, "mc-catchup-%s", tag);
2454  r.function = mc_catchup_process;
2455  mcm->catchup_process = vlib_register_node (vm, &r);
2456 
2457  r.name = (char *) format (0, "mc-unserialize-%s", tag);
2458  r.function = mc_unserialize_process;
2459  mcm->unserialize_process = vlib_register_node (vm, &r);
2460  }
2461 
2462  if (MC_EVENT_LOGGING > 0)
2463  mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword),
2464  sizeof (mc_peer_id_t));
2465 
2466  mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword),
2467  sizeof (mc_peer_id_t));
2468  mc_serialize_init (mcm);
2469 }
2470 
/*
 * format() helper: append the name of an mc_relay_state_t to `s`.
 *
 * Takes the state from the va_list.  Known states print as "negotiate",
 * "master" or "slave"; anything else prints as "unknown 0x<hex>".
 * Returns the extended vector.
 *
 * NOTE(review): line 2478 (the case label for "negotiate", presumably
 * MC_RELAY_STATE_NEGOTIATE) is missing from this listing -- confirm.
 */
2471 static u8 *
2472 format_mc_relay_state (u8 * s, va_list * args)
2473 {
2474  mc_relay_state_t state = va_arg (*args, mc_relay_state_t);
2475  char *t = 0;
2476  switch (state)
2477  {
2479  t = "negotiate";
2480  break;
2481  case MC_RELAY_STATE_MASTER:
2482  t = "master";
2483  break;
2484  case MC_RELAY_STATE_SLAVE:
2485  t = "slave";
2486  break;
2487  default:
2488  return format (s, "unknown 0x%x", state);
2489  }
2490 
2491  return format (s, "%s", t);
2492 }
2493 
/*
 * format() helper: append the name of an mc_stream_state_t to `s`.
 *
 * The `_` macro generates one case per state, mapping
 * MC_STREAM_STATE_<f> to the string "<f>".  Unmatched values print as
 * "unknown 0x<hex>".  Returns the extended vector.
 *
 * NOTE(review): line 2502 (the list macro that expands `_` for every
 * state) is missing from this listing -- confirm against the real file.
 */
2494 static u8 *
2495 format_mc_stream_state (u8 * s, va_list * args)
2496 {
2497  mc_stream_state_t state = va_arg (*args, mc_stream_state_t);
2498  char *t = 0;
2499  switch (state)
2500  {
2501 #define _(f) case MC_STREAM_STATE_##f: t = #f; break;
2503 #undef _
2504  default:
2505  return format (s, "unknown 0x%x", state);
2506  }
2507 
2508  return format (s, "%s", t);
2509 }
2510 
2511 static int
2512 mc_peer_comp (void *a1, void *a2)
2513 {
2514  mc_stream_peer_t *p1 = a1;
2515  mc_stream_peer_t *p2 = a2;
2516 
2517  return mc_peer_id_compare (p1->id, p2->id);
2518 }
2519 
/*
 * format() helper: append a multi-line description of an mc_main_t to `s`.
 *
 * Prints a summary line, the list of recent mastership peers, and then for
 * each stream its name and index, state, retry statistics, request
 * counters, peer count and sequence numbers, followed by a table of the
 * peers whose bit is set in all_peer_bitmap.  Nested lines are indented
 * relative to the indent in effect when the function was entered.
 * Returns the extended vector.
 *
 * NOTE(review): lines 2529-2530, 2533 (declaration of `mp`), 2542, 2553,
 * 2559-2560, 2564, 2569, 2579 (presumably a sort of `ps`, given
 * mc_peer_comp is defined just before), 2589 and 2591-2594 are missing
 * from this listing; they hold the remaining format() arguments --
 * confirm against the real file.
 */
2520 u8 *
2521 format_mc_main (u8 * s, va_list * args)
2522 {
2523  mc_main_t *mcm = va_arg (*args, mc_main_t *);
2524  mc_stream_t *t;
2525  mc_stream_peer_t *p, *ps;
2526  uword indent = format_get_indent (s);
2527 
2528  s = format (s, "MC state %U, %d streams joined, global sequence 0x%x",
2531 
2532  {
2534  f64 now = vlib_time_now (mcm->vlib_main);
2535  s = format (s, "\n%UMost recent mastership peers:",
2536  format_white_space, indent + 2);
2537  vec_foreach (mp, mcm->mastership_peers)
2538  {
2539  s = format (s, "\n%U%-30U%.4e",
2540  format_white_space, indent + 4,
2541  mcm->transport.format_peer_id, mp->peer_id,
2543  }
2544  }
2545 
2546  vec_foreach (t, mcm->stream_vector)
2547  {
2548  s = format (s, "\n%Ustream `%s' index %d",
2549  format_white_space, indent + 2, t->config.name, t->index);
2550 
2551  s = format (s, "\n%Ustate %U",
2552  format_white_space, indent + 4,
2554 
2555  s =
2556  format (s,
2557  "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
2558  format_white_space, indent + 4, t->config.retry_interval,
2561 
2562  s = format (s, "\n%U%Ld/%Ld user requests sent/received",
2563  format_white_space, indent + 4,
2565 
2566  s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
2567  format_white_space, indent + 4,
2568  pool_elts (t->peers),
2570 
/* Collect copies of the peers flagged in all_peer_bitmap into a temporary
   vector, which is freed at the end of this iteration. */
2571  ps = 0;
2572  /* *INDENT-OFF* */
2573  pool_foreach (p, t->peers,
2574  ({
2575  if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
2576  vec_add1 (ps, p[0]);
2577  }));
2578  /* *INDENT-ON* */
2580  s = format (s, "\n%U%=30s%10s%16s%16s",
2581  format_white_space, indent + 6,
2582  "Peer", "Last seq", "Retries", "Future");
2583 
2584  vec_foreach (p, ps)
2585  {
2586  s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
2587  format_white_space, indent + 6,
2588  mcm->transport.format_peer_id, p->id.as_u64,
2590  p->stats.n_msgs_from_past -
2595  p->id.as_u64 ? " (self)" : ""));
2596  }
2597  vec_free (ps);
2598  }
2599 
2600  return s;
2601 }
2602 
2603 /*
2604  * fd.io coding-style-patch-verification: ON
2605  *
2606  * Local Variables:
2607  * eval: (c-set-style "gnu")
2608  * End:
2609  */
static mc_stream_t * mc_stream_by_index(mc_main_t *m, u32 i)
Definition: mc.h:596
u32 last_global_sequence_processed
Definition: mc.h:461
u32 buffer_index
Definition: mc.h:324
void mc_main_init(mc_main_t *mcm, char *tag)
Definition: mc.c:2417
void mc_stream_join_process_hold(void)
Definition: mc.c:722
mc_stream_state_t state
Definition: mc.h:407
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:396
#define EVENT_MC_SEND_CATCHUP_DATA
Definition: mc.c:1414
static mc_main_t * mc_node_get_main(vlib_node_runtime_t *node)
Definition: mc.c:464
mhash_t mastership_peer_index_by_id
Definition: mc.h:534
Definition: mhash.h:46
static void delete_peer_with_index(mc_main_t *mcm, mc_stream_t *s, uword index, int notify_application)
Definition: mc.c:125
#define hash_set(h, key, value)
Definition: hash.h:254
void mc_unserialize(mc_main_t *mcm, mc_stream_t *s, u32 buffer_index)
Definition: mc.c:2264
sll srl srl sll sra u16x4 i
Definition: vector_sse2.h:343
static void mc_stream_init(mc_stream_t *s)
Definition: mc.h:489
mc_relay_state_t
Definition: mc.h:502
#define clib_min(x, y)
Definition: clib.h:326
static mc_retry_t * prev_retry(mc_stream_t *s, mc_retry_t *r)
Definition: mc.c:335
static void vlib_signal_one_time_waiting_process(vlib_main_t *vm, vlib_one_time_waiting_process_t *p)
Definition: node_funcs.h:961
static void mc_stream_free(mc_stream_t *s)
Definition: mc.h:477
elog_main_t * elog_main
Definition: mc.h:577
uword * all_peer_bitmap
Definition: mc.h:441
static u64 unserialize_likely_small_unsigned_integer(serialize_main_t *m)
Definition: serialize.h:254
u64 user_requests_received
Definition: mc.h:473
f64 sent_at
Definition: mc.h:339
u32 mc_stream_index
Definition: main.h:136
void mc_rx_buffer_unserialize(mc_main_t *mcm, mc_stream_t *stream, mc_peer_id_t peer_id, u32 buffer_index)
Definition: mc.c:687
static f64 vlib_process_wait_for_event_or_clock(vlib_main_t *vm, f64 dt)
Suspend a cooperative multi-tasking thread Waits for an event, or for the indicated number of seconds...
Definition: node_funcs.h:684
static void elog_stream_name(char *buf, int n_buf_bytes, char *v)
Definition: mc.c:597
#define hash_unset(h, key)
Definition: hash.h:260
u8 runtime_data[0]
Definition: node.h:472
static uword * vlib_process_wait_for_event(vlib_main_t *vm)
Definition: node_funcs.h:604
static void mc_byte_swap_msg_join_reply(mc_msg_join_reply_t *r)
Definition: mc.h:126
void serialize_bitmap(serialize_main_t *m, uword *b)
Definition: serialize.c:359
static vlib_main_t * vlib_get_main(void)
Definition: global_funcs.h:23
static uword clib_fifo_elts(void *v)
Definition: fifo.h:66
mhash_t peer_index_by_id
Definition: mc.h:444
vlib_one_time_waiting_process_t * procs_waiting_for_join_done
Definition: mc.h:449
static uword mc_retry_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
Definition: mc.c:471
u64 n_msgs_from_future
Definition: mc.h:308
vlib_one_time_waiting_process_t * procs_waiting_for_mc_stream_join
Definition: main.h:138
static void maybe_send_window_open_event(vlib_main_t *vm, mc_stream_t *stream)
Definition: mc.c:207
u32 last_sequence_received
Definition: mc.h:317
u32 mc_stream_send(mc_main_t *mcm, u32 stream_index, u32 buffer_index)
Definition: mc.c:1016
static f64 vlib_time_now(vlib_main_t *vm)
Definition: main.h:182
#define MSG_ID_DEBUG
Definition: mc.c:24
struct vlib_serialize_buffer_main_t::@22::@24 tx
struct vlib_main_t * vlib_main
Definition: buffer.h:330
void mc_msg_master_assert_handler(mc_main_t *mcm, mc_msg_master_assert_t *mp, u32 buffer_index)
Definition: mc.c:1891
#define ELOG(em, f, data)
Definition: elog.h:382
static void mc_byte_swap_msg_user_request(mc_msg_user_request_t *r)
Definition: mc.h:152
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
Definition: vec.h:482
void mc_msg_user_ack_handler(mc_main_t *mcm, mc_msg_user_ack_t *mp, u32 buffer_index)
Definition: mc.c:1223
void(* rx_buffer)(struct mc_main_t *mc_main, struct mc_stream_t *stream, mc_peer_id_t peer_id, u32 buffer_index)
Definition: mc.h:366
mc_serialize_msg_t ** global_msgs
Definition: mc.h:565
void * mc_get_vlib_buffer(vlib_main_t *vm, u32 n_bytes, u32 *bi_return)
Definition: mc.c:110
struct _vlib_node_registration vlib_node_registration_t
u32 serialize_close_vlib_buffer(serialize_main_t *m)
Definition: dpdk_buffer.c:1246
#define vec_add2(V, P, N)
Add N elements to end of vector V, return pointer to new elements in P.
Definition: vec.h:521
mc_stream_state_t
Definition: mc.h:396
u32 * catchup_fifo
Definition: mc.h:433
#define hash_set_mem(h, key, value)
Definition: hash.h:274
clib_error_t * mc_serialize_internal(mc_main_t *mc, u32 stream_index, u32 multiple_messages_per_vlib_buffer, mc_serialize_msg_t *msg,...)
Definition: mc.c:2076
#define clib_error_report(e)
Definition: error.h:125
void unserialize_mc_main(serialize_main_t *m, va_list *va)
Definition: mc.c:2340
add_epi add_epi sub_epi sub_epi adds_epu subs_epu i16x8 y
Definition: vector_sse2.h:299
u64 relay_master_peer_id
Definition: mc.h:529
static void mc_byte_swap_msg_catchup_reply(mc_msg_catchup_reply_t *r)
Definition: mc.h:213
void mc_msg_join_reply_handler(mc_main_t *mcm, mc_msg_join_reply_t *mp, u32 buffer_index)
Definition: mc.c:971
#define pool_get(P, E)
Allocate an object E from a pool P (unspecified alignment).
Definition: pool.h:200
struct vlib_serialize_buffer_main_t::@22::@25 rx
u32 mc_stream_join(mc_main_t *mcm, mc_stream_config_t *config)
Definition: mc.c:879
mc_stream_stats_t stats
Definition: mc.h:435
u32 index
Definition: mc.h:410
mc_main_t * mc_main
Definition: main.h:133
static u32 serialize_vlib_buffer_n_bytes(serialize_main_t *m)
Definition: buffer.h:374
#define vec_reset_length(v)
Reset vector length to zero. NULL-pointer tolerant.
f64 join_timeout
Definition: mc.h:447
u32 relay_global_sequence
Definition: mc.h:540
u32 vlib_register_node(vlib_main_t *vm, vlib_node_registration_t *r)
Definition: node.c:452
static void elog_tx_msg(mc_main_t *m, u32 stream_id, u32 local_sequence, u32 retry_count)
Definition: mc.c:69
mc_retry_t * retry_pool
Definition: mc.h:415
#define mc_serialize_stream(mc, si, msg, args...)
Definition: mc.h:648
void(* peer_died)(struct mc_main_t *mc_main, struct mc_stream_t *stream, mc_peer_id_t peer_id)
Definition: mc.h:385
static uword vlib_process_suspend(vlib_main_t *vm, f64 dt)
Suspend a vlib cooperative multi-tasking thread for a period of time.
Definition: node_funcs.h:432
u32 retry_tail_index
Definition: mc.h:418
u32 n_retries
Definition: mc.h:330
void mc_msg_catchup_reply_handler(mc_main_t *mcm, mc_msg_catchup_reply_t *mp, u32 catchup_opaque)
Definition: mc.c:1584
static u8 * format_mc_relay_state(u8 *s, va_list *args)
Definition: mc.c:2472
#define pool_foreach(VAR, POOL, BODY)
Iterate through pool.
Definition: pool.h:348
mc_stream_config_t config
Definition: mc.h:405
void unserialize_close_vlib_buffer(serialize_main_t *m)
Definition: dpdk_buffer.c:1271
static uword vlib_process_get_events(vlib_main_t *vm, uword **data_vector)
Return the first event type which has occurred and a vector of per-event data of that type...
Definition: node_funcs.h:527
void unserialize_open_data(serialize_main_t *m, u8 *data, uword n_data_bytes)
Definition: serialize.c:890
#define always_inline
Definition: clib.h:84
static void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
Definition: buffer.h:190
static uword clib_bitmap_is_zero(uword *ai)
predicate function; is an entire bitmap empty?
Definition: bitmap.h:57
u8 * format_white_space(u8 *s, va_list *va)
Definition: std-formats.c:113
static mc_retry_t * next_retry(mc_stream_t *s, mc_retry_t *r)
Definition: mc.c:342
int i32
Definition: types.h:81
static void perform_catchup(mc_main_t *mcm, mc_msg_catchup_reply_t *mp)
Definition: mc.c:1594
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
void * opaque
Definition: mc.h:291
serialize_main_t serialize_mains[VLIB_N_RX_TX]
Definition: mc.h:571
#define MC_STREAM_INDEX_INTERNAL
Definition: mc.h:413
static int mc_peer_comp(void *a1, void *a2)
Definition: mc.c:2512
#define clib_warning(format, args...)
Definition: error.h:59
unsigned long u64
Definition: types.h:89
mc_peer_id_t our_catchup_peer_id
Definition: mc.h:294
#define vec_resize(V, N)
Resize a vector (no header, unspecified alignment). Add N elements to end of given vector V...
Definition: vec.h:201
u32 retry_head_index
Definition: mc.h:418
uword * stream_index_by_name
Definition: mc.h:546
uword * procs_waiting_for_stream_name_by_name
Definition: mc.h:548
static void check_retry(mc_main_t *mcm, mc_stream_t *s)
Definition: mc.c:367
u8 *(* catchup_snapshot)(struct mc_main_t *mc_main, u8 *snapshot_vector, u32 last_global_sequence_included)
Definition: mc.h:371
static uword pointer_to_uword(const void *p)
Definition: types.h:131
u32 join_ager_process
Definition: mc.h:559
#define hash_create_string(elts, value_bytes)
Definition: hash.h:652
mc_mastership_peer_t * mastership_peers
Definition: mc.h:531
char * name
Definition: main.h:98
#define hash_get(h, key)
Definition: hash.h:248
u32 next_index
Definition: mc.h:333
#define clib_bitmap_foreach(i, ai, body)
Macro to iterate across set bits in a bitmap.
Definition: bitmap.h:361
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
Definition: pool.h:369
#define hash_unset_mem(h, key)
Definition: hash.h:280
static uword format_get_indent(u8 *s)
Definition: format.h:72
#define clib_fifo_sub1(f, e)
Definition: fifo.h:224
u64 as_u64
Definition: mc.h:43
u16 current_length
Nbytes between current data and the end of this buffer.
Definition: buffer.h:82
static void vlib_process_signal_event(vlib_main_t *vm, uword node_index, uword type_opaque, uword data)
Definition: node_funcs.h:931
vlib_one_time_waiting_process_t * procs_waiting_for_open_window
Definition: mc.h:451
static void serialize_likely_small_unsigned_integer(serialize_main_t *m, u64 x)
Definition: serialize.h:218
uword * unacked_by_peer_bitmap
Definition: mc.h:336
u32 prev_index
Definition: mc.h:333
static void mc_byte_swap_msg_master_assert(mc_msg_master_assert_t *r)
Definition: mc.h:70
#define pool_put(P, E)
Free an object E in pool P.
Definition: pool.h:214
f64 time_last_master_assert_received
Definition: mc.h:513
u32 mastership_process
Definition: mc.h:558
#define ELOG_DATA(em, f)
Definition: elog.h:392
void serialize_open_vlib_buffer(serialize_main_t *m, struct vlib_main_t *vm, vlib_serialize_buffer_main_t *sm)
Definition: dpdk_buffer.c:1232
mc_catchup_process_arg_t * catchup_process_args
Definition: mc.h:554
void unserialize_mc_stream(serialize_main_t *m, va_list *va)
Definition: mc.c:1467
u32 catchup_process
Definition: mc.h:561
void(* catchup)(struct mc_main_t *mc_main, u8 *snapshot_data, u32 n_snapshot_data_bytes)
Definition: mc.h:376
static uword mc_mastership_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
Definition: mc.c:1856
mc_peer_id_t peer_id
Definition: mc.h:511
static void * unserialize_get(serialize_main_t *m, uword n_bytes)
Definition: serialize.h:171
static uword mhash_set(mhash_t *h, void *key, uword new_value, uword *old_value)
Definition: mhash.h:117
#define EVENT_MC_UNSERIALIZE_BUFFER
Definition: mc.c:1580
#define clib_fifo_foreach(v, f, body)
Definition: fifo.h:279
u32 retry_process
Definition: mc.h:560
void unserialize_cstring(serialize_main_t *m, char **s)
Definition: serialize.c:178
#define uword_to_pointer(u, type)
Definition: types.h:136
static uword vlib_buffer_contents(vlib_main_t *vm, u32 buffer_index, u8 *contents)
Copy buffer contents to memory.
Definition: buffer_funcs.h:143
mc_peer_id_t our_ack_peer_id
Definition: mc.h:293
static format_function_t format_mc_stream_state
Definition: mc.c:26
static void mc_serialize_init(mc_main_t *mcm)
Definition: mc.c:1973
MC_SERIALIZE_MSG(mc_register_stream_name_msg, static)
static uword mc_catchup_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
Definition: mc.c:1417
serialize_stream_t stream
Definition: serialize.h:147
clib_error_t * serialize(serialize_main_t *m,...)
Definition: serialize.c:671
void mhash_init(mhash_t *h, uword n_value_bytes, uword n_key_bytes)
Definition: mhash.c:168
vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX]
Definition: mc.h:573
u32 current_buffer_index
Definition: serialize.h:62
svmdb_client_t * c
u32 retry_limit
Definition: mc.h:363
uword mc_unserialize_message(mc_main_t *mcm, mc_stream_t *s, serialize_main_t *m)
Definition: mc.c:2101
static uword * delete_retry_fifo_elt(mc_main_t *mcm, mc_stream_t *stream, mc_retry_t *r, uword *dead_peer_bitmap)
Definition: mc.c:298
void serialize_open_vector(serialize_main_t *m, u8 *vector)
Definition: serialize.c:908
#define vec_free(V)
Free vector's memory (no header).
Definition: vec.h:300
Definition: mc.h:322
void serialize_mc_main(serialize_main_t *m, va_list *va)
Definition: mc.c:2316
u32 window_size
Definition: mc.h:357
#define clib_memcpy(a, b, c)
Definition: string.h:64
static void vlib_buffer_advance(vlib_buffer_t *b, word l)
Advance current data pointer by the supplied (signed!) amount.
Definition: buffer.h:203
void mc_unserialize_internal(mc_main_t *mcm, u32 stream_and_buffer_index)
Definition: mc.c:2223
f64 retry_interval
Definition: mc.h:360
mhash_t elog_id_by_peer_id
Definition: mc.h:580
u8 * format_mc_main(u8 *s, va_list *args)
Definition: mc.c:2521
elog_main_t elog_main
Definition: main.h:141
static uword * mhash_get(mhash_t *h, void *key)
Definition: mhash.h:110
static uword mc_unserialize_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
Definition: mc.c:2277
static void unserialize_integer(serialize_main_t *m, void *x, u32 n_bytes)
Definition: serialize.h:201
static void serialize_integer(serialize_main_t *m, u64 x, u32 n_bytes)
Definition: serialize.h:185
#define ELOG_TYPE(f, fmt)
Definition: elog.h:370
static int mc_peer_id_compare(mc_peer_id_t a, mc_peer_id_t b)
Definition: mc.h:54
mc_retry_t * retired_fifo
Definition: mc.h:426
static uword clib_bitmap_get(uword *ai, uword i)
Gets the ith bit value from a bitmap.
Definition: bitmap.h:197
#define ELOG_TYPE_DECLARE(f)
Definition: elog.h:350
void mc_enable_disable_mastership(mc_main_t *mcm, int we_can_be_master)
Definition: mc.c:1879
static void this_node_slave(mc_main_t *mcm)
Definition: mc.c:1814
void mc_msg_user_request_handler(mc_main_t *mcm, mc_msg_user_request_t *mp, u32 buffer_index)
Definition: mc.c:1090
u32 max_packet_size
Definition: mc.h:298
u8 as_u8[8]
Definition: mc.h:42
static void remove_retry_from_pool(mc_stream_t *s, mc_retry_t *r)
Definition: mc.c:349
#define VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX
Definition: buffer.h:306
mc_stream_stats_t stats_last_clear
Definition: mc.h:435
uword(* catchup_request_fun)(void *opaque, u32 stream_index, mc_peer_id_t catchup_peer_id)
Definition: mc.h:284
Definition: mc.h:522
static uword mc_join_ager_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
Definition: mc.c:513
#define pool_put_index(p, i)
Free pool element with given index.
Definition: pool.h:228
#define ASSERT(truth)
void mc_stream_leave(mc_main_t *mcm, u32 stream_index)
Definition: mc.c:885
unsigned int u32
Definition: types.h:88
static void mc_byte_swap_msg_user_ack(mc_msg_user_ack_t *r)
Definition: mc.h:173
char * name
Definition: mc.h:354
static mc_stream_t * mc_stream_by_name(mc_main_t *m, char *name)
Definition: mc.h:589
void mc_msg_join_or_leave_request_handler(mc_main_t *mcm, mc_msg_join_or_leave_request_t *req, u32 buffer_index)
Definition: mc.c:914
static void serialize_mc_stream(serialize_main_t *m, va_list *va)
Definition: mc.c:1449
mc_serialize_stream_msg_t * stream_msgs
Definition: mc.h:464
static u32 elog_id_for_msg_name(mc_main_t *m, char *msg_name)
Definition: mc.c:46
mc_stream_peer_stats_t stats
Definition: mc.h:319
vhost_vring_state_t state
Definition: vhost-user.h:80
void(* save_snapshot)(struct mc_main_t *mc_main, u32 is_catchup, u8 *snapshot_data, u32 n_snapshot_data_bytes)
Definition: mc.h:380
mc_serialize_msg_t * mc_msg_registrations
Definition: main.h:169
void unserialize_open_vlib_buffer(serialize_main_t *m, struct vlib_main_t *vm, vlib_serialize_buffer_main_t *sm)
Definition: dpdk_buffer.c:1239
clib_error_t * unserialize(serialize_main_t *m,...)
Definition: serialize.c:683
#define clib_bitmap_free(v)
Free a bitmap.
Definition: bitmap.h:92
static uword vlib_buffer_index_length_in_chain(vlib_main_t *vm, u32 bi)
Get length in bytes of the buffer index buffer chain.
Definition: buffer_funcs.h:129
u32 we_can_be_relay_master
Definition: mc.h:527
u32 vlib_buffer_alloc(vlib_main_t *vm, u32 *buffers, u32 n_buffers)
Allocate buffers into supplied array.
Definition: dpdk_buffer.c:643
uword * hash
Definition: mhash.h:69
u8 *( format_function_t)(u8 *s, va_list *args)
Definition: format.h:48
#define EVENT_MC_UNSERIALIZE_CATCHUP
Definition: mc.c:1581
vlib_one_time_waiting_process_t ** procs_waiting_for_stream_name_pool
Definition: mc.h:550
#define clib_max(x, y)
Definition: clib.h:319
u64 uword
Definition: types.h:112
static void vlib_current_process_wait_for_one_time_event_vector(vlib_main_t *vm, vlib_one_time_waiting_process_t **wps)
Definition: node_funcs.h:993
void serialize_cstring(serialize_main_t *m, char *s)
Definition: serialize.c:164
#define vec_elt(v, i)
Get vector value at index i.
uword * global_msg_index_by_name
Definition: mc.h:568
static void mc_internal_catchup(mc_main_t *mcm, u8 *data, u32 n_data_bytes)
Definition: mc.c:710
void mc_wait_for_stream_ready(mc_main_t *m, char *stream_name)
Definition: mc.c:993
Definition: defs.h:47
#define clib_fifo_add1(f, e)
Definition: fifo.h:192
int joins_in_progress
Definition: mc.h:552
#define vec_copy(DST, SRC)
Copy a vector, memcpy wrapper.
Definition: vec.h:351
uword * unserialize_bitmap(serialize_main_t *m)
Definition: serialize.c:377
u32 unserialize_process
Definition: mc.h:562
u32 elog_string(elog_main_t *em, char *fmt,...)
Definition: elog.c:525
mc_relay_state_t relay_state
Definition: mc.h:524
struct _mc_serialize_msg mc_serialize_msg_t
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
double f64
Definition: types.h:142
unsigned char u8
Definition: types.h:56
mc_stream_peer_stats_t stats_last_clear
Definition: mc.h:319
uword * retry_index_by_local_sequence
Definition: mc.h:429
mc_stream_peer_t * peers
Definition: mc.h:438
static void mc_byte_swap_msg_join_or_leave_request(mc_msg_join_or_leave_request_t *r)
Definition: mc.h:106
u64 user_requests_sent
Definition: mc.h:472
static uword vlib_process_wait_for_event_with_type(vlib_main_t *vm, uword **data_vector, uword with_type_opaque)
Definition: node_funcs.h:649
uword * elog_id_by_msg_name
Definition: mc.h:582
mc_peer_id_t id
Definition: mc.h:314
u64 n_msgs_from_past
Definition: mc.h:307
clib_error_t * va_serialize(serialize_main_t *sm, va_list *va)
Definition: serialize.c:650
u64 n_retries
Definition: mc.h:345
#define vec_sort_with_function(vec, f)
Sort a vector using the supplied element comparison function.
Definition: vec.h:920
mc_stream_and_buffer_t * mc_unserialize_stream_and_buffers
Definition: mc.h:585
static mc_stream_peer_t * get_or_create_peer_with_id(mc_main_t *mcm, mc_stream_t *s, mc_peer_id_t id, int *created)
Definition: mc.c:156
static u32 elog_id_for_peer_id(mc_main_t *m, u64 peer_id)
Definition: mc.c:29
void(* catchup_send_fun)(void *opaque, uword catchup_opaque, u8 *data_vector)
Definition: mc.h:287
format_function_t * format_peer_id
Definition: mc.h:300
static u8 * mc_internal_catchup_snapshot(mc_main_t *mcm, u8 *data_vector, u32 last_global_sequence_processed)
Definition: mc.c:695
#define hash_get_mem(h, key)
Definition: hash.h:268
u32 our_local_sequence
Definition: mc.h:454
clib_error_t *(* tx_buffer)(void *opaque, mc_transport_type_t type, u32 buffer_index)
Definition: mc.h:277
static uword vlib_in_process_context(vlib_main_t *vm)
Definition: node_funcs.h:404
#define clib_fifo_add2(f, p)
Definition: fifo.h:200
static void this_node_maybe_master(mc_main_t *mcm)
Definition: mc.c:1717
static u32 mc_stream_join_helper(mc_main_t *mcm, mc_stream_config_t *config, u32 is_internal)
Definition: mc.c:727
u8 * format(u8 *s, const char *fmt,...)
Definition: format.c:418
#define MC_EVENT_LOGGING
Definition: mc.h:27
clib_error_t *(* tx_ack)(void *opaque, mc_peer_id_t peer_id, u32 buffer_index)
Definition: mc.h:280
void mc_msg_catchup_request_handler(mc_main_t *mcm, mc_msg_catchup_request_t *req, u32 catchup_opaque)
Definition: mc.c:1494
u8 data[0]
Packet data.
Definition: buffer.h:154
#define vec_foreach(var, vec)
Vector iterator.
void * serialize_close_vector(serialize_main_t *m)
Definition: serialize.c:918
static void vlib_buffer_free_one(vlib_main_t *vm, u32 buffer_index)
Free one buffer. Shorthand to free a single buffer chain.
Definition: buffer_funcs.h:322
static void unserialize_mc_register_stream_name(serialize_main_t *m, va_list *va)
Definition: mc.c:604
u32 local_sequence
Definition: mc.h:327
clib_error_t * mc_serialize_va(mc_main_t *mc, u32 stream_index, u32 multiple_messages_per_vlib_buffer, mc_serialize_msg_t *msg, va_list *va)
Definition: mc.c:1993
#define vec_validate_init_empty(V, I, INIT)
Make sure vector is long enough for given index and initialize empty space (no header, unspecified alignment)
Definition: vec.h:445
static i32 mc_seq_cmp(u32 x, u32 y)
Definition: mc.c:104
static void mc_retry_free(mc_main_t *mcm, mc_stream_t *s, mc_retry_t *r)
Definition: mc.c:222
u32 * stream_msg_index_by_global_index
Definition: mc.h:467
struct vlib_main_t * vlib_main
Definition: mc.h:576
static void send_join_or_leave_request(mc_main_t *mcm, u32 stream_index, u32 is_join)
Definition: mc.c:490
static vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
Definition: buffer_funcs.h:69
mc_transport_t transport
Definition: mc.h:537
static void mc_resend_retired(mc_main_t *mcm, mc_stream_t *s, u32 local_sequence)
Definition: mc.c:244
mc_stream_t * stream_vector
Definition: mc.h:543
Definition: defs.h:46
static void vlib_current_process_wait_for_one_time_event(vlib_main_t *vm, vlib_one_time_waiting_process_t *p)
Definition: node_funcs.h:980
static void mc_byte_swap_msg_catchup_request(mc_msg_catchup_request_t *r)
Definition: mc.h:192
static u32 unserialize_vlib_buffer_n_bytes(serialize_main_t *m)
Definition: buffer_funcs.h:566
static void serialize_mc_register_stream_name(serialize_main_t *m, va_list *va)
Definition: mc.c:590
static uword pool_elts(void *v)
Number of active elements in a pool.
Definition: pool.h:109