24 #define MSG_ID_DEBUG 0 60 name_copy =
format (0,
"%s%c", msg_name, 0);
77 .format =
"tx-msg: stream %d local seq %d attempt %d",
78 .format_args =
"i4i4i4",
83 u32 stream_id, local_sequence, retry_count;
86 ed->stream_id = stream_id;
87 ed->local_sequence = local_sequence;
88 ed->retry_count = retry_count;
121 return (
void *) b->
data;
126 uword index,
int notify_application)
140 .format =
"delete peer %s from all_peer_bitmap",
169 memset (p, 0,
sizeof (p[0]));
182 .format =
"get_or_create %s peer %s stream %d seq %d",
183 .format_args =
"t4T4i4i4",
192 u32 is_new, peer, stream_index, rx_sequence;
196 ed->is_new = q ? 0 : 1;
198 ed->stream_index = s->
index;
253 .format =
"resend-retired: search for local seq %d",
262 ed->local_sequence = local_sequence;
268 if (retry->local_sequence == local_sequence)
270 elog_tx_msg (mcm, s->index, retry-> local_sequence, -13);
271 mcm->transport.tx_buffer (mcm->transport.opaque,
272 MC_TRANSPORT_USER_REQUEST_TO_RELAY,
273 retry->buffer_index);
284 .format =
"resend-retired: FAILED search for local seq %d",
293 ed->local_sequence = local_sequence;
306 uword pi = p - stream->peers;
307 uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
310 dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
312 if (MC_EVENT_LOGGING > 0)
314 ELOG_TYPE_DECLARE (e) = {
315 .format =
"delete_retry_fifo_elt: peer %s is %s",
316 .format_args =
"T4t4",
318 .enum_strings = {
"alive",
"dead", },
320 struct { u32 peer, is_alive; } * ed;
321 ed = ELOG_DATA (mcm->elog_main, e);
322 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
323 ed->is_alive = is_alive;
328 hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
331 return dead_peer_bitmap;
372 uword *dead_peer_bitmap = 0;
399 .format =
"resend local seq %d attempt %d",
400 .format_args =
"i4i4",
406 if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
408 ELOG_TYPE_DECLARE (ev) = {
409 .format =
"resend: needed by peer %s local seq %d",
410 .format_args =
"T4i4",
412 struct { u32 peer, rx_sequence; } * ed;
413 ed = ELOG_DATA (mcm->elog_main, ev);
414 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
415 ed->rx_sequence = r->local_sequence;
454 r->unacked_by_peer_bitmap =
455 clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
482 if (s->
state != MC_STREAM_STATE_invalid)
493 mc_msg_join_or_leave_request_t *mp;
497 memset (mp, 0,
sizeof (*mp));
498 mp->type = MC_MSG_TYPE_join_or_leave_request;
500 mp->stream_index = stream_index;
501 mp->is_join = is_join;
528 if (s->
state != MC_STREAM_STATE_join_in_progress)
533 s->
state = MC_STREAM_STATE_ready;
540 .format =
"stream %d join timeout",
574 .format =
"stream %d resend join request",
592 char *
name = va_arg (*va,
char *);
600 buf[n_buf_bytes - 1] = 0;
620 .format =
"stream index %d already named %s",
621 .format_args =
"i4s16",
630 ed->stream_index = p[0];
640 s->
state = MC_STREAM_STATE_name_known;
649 .format =
"stream index %d named %s",
650 .format_args =
"i4s16",
659 ed->stream_index = s->
index;
680 .name =
"mc_register_stream_name",
697 u32 last_global_sequence_processed)
742 if (s->
state == MC_STREAM_STATE_ready)
749 == MC_STREAM_STATE_invalid))
752 .name =
"mc-internal",
774 &mc_register_stream_name_msg, config->
name);
837 s->
state = MC_STREAM_STATE_join_in_progress;
850 .format =
"stream index %d join request %s",
851 .format_args =
"i4s16",
860 ed->stream_index = s->
index;
871 ELOG_TYPE (e,
"join complete stream %d");
897 .format =
"leave-stream: %d",.format_args =
"i4",
905 ed->index = stream_index;
910 s->
state = MC_STREAM_STATE_name_known;
915 mc_msg_join_or_leave_request_t * req,
919 mc_msg_join_reply_t *rep;
925 if (!s || s->
state != MC_STREAM_STATE_ready)
940 if (this_s->
state != MC_STREAM_STATE_ready
941 && this_s->
state != MC_STREAM_STATE_name_known)
952 memset (rep, 0,
sizeof (rep[0]));
953 rep->type = MC_MSG_TYPE_join_reply;
954 rep->stream_index = req->stream_index;
972 mc_msg_join_reply_t * mp,
u32 buffer_index)
980 if (!s || s->
state != MC_STREAM_STATE_join_in_progress)
985 s->
state = MC_STREAM_STATE_catchup;
989 mp->stream_index, mp->catchup_peer_id);
1006 if (s->
state == MC_STREAM_STATE_catchup
1007 || s->
state == MC_STREAM_STATE_ready)
1021 mc_msg_user_request_t *mp;
1028 if (s->
state != MC_STREAM_STATE_ready)
1058 mp->global_sequence = 0xdeadbeef;
1059 mp->stream_index = s->
index;
1097 static int once = 0;
1104 if (!s || s->
state != MC_STREAM_STATE_ready)
1114 seq_cmp_result =
mc_seq_cmp (mp->local_sequence,
1122 .format =
"rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
1123 .format_args =
"T4i4i4i4",
1128 u32 peer, stream_index, rx_sequence;
1133 ed->stream_index = mp->stream_index;
1134 ed->rx_sequence = mp->local_sequence;
1135 ed->seq_cmp_result = seq_cmp_result;
1138 if (0 && mp->stream_index == 1 && once == 0)
1141 ELOG_TYPE (e,
"FAKE lost msg on stream 1");
1149 if (seq_cmp_result > 0)
1155 mc_msg_user_ack_t *rp;
1160 rp->stream_index = s->
index;
1161 rp->local_sequence = mp->local_sequence;
1162 rp->seq_cmp_result = seq_cmp_result;
1169 .format =
"tx-ack: stream %d local seq %d",
1170 .format_args =
"i4i4",
1179 ed->stream_index = rp->stream_index;
1180 ed->local_sequence = rp->local_sequence;
1187 if (seq_cmp_result < 0)
1194 if (seq_cmp_result == 0)
1199 case MC_STREAM_STATE_ready:
1210 case MC_STREAM_STATE_catchup:
1231 int peer_created = 0;
1242 .format =
"rx-ack: local seq %d peer %s seq_cmp_result %d",
1243 .format_args =
"i4T4i4",
1254 ed->local_sequence = mp->local_sequence;
1256 ed->seq_cmp_result = mp->seq_cmp_result;
1271 if (mp->seq_cmp_result > 0)
1274 mp->seq_cmp_result);
1300 .format =
"ack: for seq %d from peer %s no fifo elt",
1301 .format_args =
"i4T4",
1311 ed->seq = mp->local_sequence;
1337 if (!peer_created &&
1345 .format =
"dup-ack: for seq %d from peer %s",
1346 .format_args =
"i4T4",
1367 .format =
"ack: for seq %d from peer %s",
1368 .format_args =
"i4T4",
1377 ed->seq = mp->local_sequence;
1394 .format =
"ack: retire fifo elt loc seq %d after %d acks",
1395 .format_args =
"i4i4",
1414 #define EVENT_MC_SEND_CATCHUP_DATA 0 1421 uword *event_data = 0;
1428 _vec_len (event_data) = 0;
1432 for (i = 0; i <
vec_len (event_data); i++)
1457 u8 * x = serialize_get (m, sizeof (p->id));
1458 clib_memcpy (x, p->id.as_u8, sizeof (p->id));
1459 serialize_integer (m, p->last_sequence_received,
1460 sizeof (p->last_sequence_received));
1475 for (i = 0; i < n_peers; i++)
1495 mc_msg_catchup_request_t * req,
1505 if (!s || s->
state != MC_STREAM_STATE_ready)
1513 .format =
"catchup-request: from %s stream %d",
1514 .format_args =
"T4i4",
1523 ed->stream = req->stream_index;
1545 mc_msg_catchup_reply_t *rep;
1552 rep->peer_id = req->peer_id;
1553 rep->stream_index = req->stream_index;
1580 #define EVENT_MC_UNSERIALIZE_BUFFER 0 1581 #define EVENT_MC_UNSERIALIZE_CATCHUP 1 1604 if (!s || s->
state == MC_STREAM_STATE_ready)
1623 if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
1624 s->our_local_sequence = p->last_sequence_received + 1;
1634 mp->n_data_bytes - n_stream_bytes);
1645 mc_msg_user_request_t *gp;
1655 seq_cmp_result =
mc_seq_cmp (gp->global_sequence,
1656 mp->last_global_sequence_included);
1658 if (seq_cmp_result > 0)
1669 .format =
"catchup replay local sequence 0x%x",
1670 .format_args =
"i4",
1678 ed->local_sequence = gp->local_sequence;
1688 .format =
"catchup discard local sequence 0x%x",
1689 .format_args =
"i4",
1697 ed->local_sequence = gp->local_sequence;
1704 s->
state = MC_STREAM_STATE_ready;
1720 mc_msg_master_assert_t *mp;
1725 f64 now, time_last_master_assert = -1;
1742 if (now >= time_last_master_assert + 1)
1744 time_last_master_assert = now;
1759 .format =
"tx-massert: peer %s global seq %u",
1760 .format_args =
"T4i4",
1765 u32 peer, global_sequence;
1769 ed->global_sequence = mp->global_sequence;
1787 if (!is_master && timeouts++ > 2)
1794 ELOG_TYPE (e,
"become master (was maybe_master)");
1805 ELOG_TYPE (e,
"become slave (was maybe_master)");
1840 ELOG_TYPE (e,
"timeouts; negoitate mastership");
1896 u8 signal_slave = 0;
1897 u8 update_global_sequence = 0;
1901 his_peer_id = mp->peer_id;
1905 seq_cmp_result =
mc_seq_cmp (mp->global_sequence,
1911 && seq_cmp_result >= 0)
1920 if (seq_cmp_result > 0)
1923 update_global_sequence = 1;
1952 .format =
"rx-massert: peer %s global seq %u upd %d slave %d",
1953 .format_args =
"T4i4i1i1",
1960 u32 global_sequence;
1966 ed->global_sequence = mp->global_sequence;
1967 ed->update_sequence = update_global_sequence;
1968 ed->slave = signal_slave;
1988 m = m->next_registration;
1995 u32 multiple_messages_per_vlib_buffer,
2002 u32 bi, n_before, n_after, n_total, n_this_msg;
2007 sbm->
tx.max_n_data_bytes_per_chain = 4096;
2017 gi = msg->global_index;
2035 .format =
"serialize-msg: %s index %d",
2036 .format_args =
"T4i4",
2051 n_this_msg = n_after - n_before;
2052 n_total = n_after +
sizeof (mc_msg_user_request_t);
2056 msg->max_n_bytes_serialized =
2057 clib_max (msg->max_n_bytes_serialized, n_this_msg);
2059 if (!multiple_messages_per_vlib_buffer
2061 || n_total + msg->max_n_bytes_serialized >
2078 u32 multiple_messages_per_vlib_buffer,
2085 if (stream_index == ~0)
2095 multiple_messages_per_vlib_buffer, msg, &va);
2125 .format =
"unserialize-msg: %s rx index %d",
2126 .format_args =
"T4i4",
2167 .format =
"msg-bind: stream %d %s to index %d",
2168 .format_args =
"i4T4i4",
2176 ed->c[0] = s->
index;
2189 .format =
"msg-id-ERROR: %s index %d expected %d",
2190 .format_args =
"T4i4i4",
2234 stream_and_buffer_index);
2242 static u8 *contents;
2281 uword event_type, *event_data = 0;
2287 _vec_len (event_data) = 0;
2294 for (i = 0; i <
vec_len (event_data); i++)
2299 for (i = 0; i <
vec_len (event_data); i++)
2343 u32 i, n_streams, n_stream_msgs;
2349 for (i = 0; i < n_streams; i++)
2359 s->
state = MC_STREAM_STATE_name_known;
2387 .format =
"catchup-bind: %s to %d global index %d stream %d",
2388 .format_args =
"T4i4i4i4",
2400 ed->c[3] = s->
index;
2433 memset (&r, 0,
sizeof (r));
2438 r.runtime_data = &mcm;
2439 r.runtime_data_bytes =
sizeof (&mcm);
2441 r.name = (
char *)
format (0,
"mc-mastership-%s", tag);
2445 r.name = (
char *)
format (0,
"mc-join-ager-%s", tag);
2449 r.name = (
char *)
format (0,
"mc-retry-%s", tag);
2453 r.name = (
char *)
format (0,
"mc-catchup-%s", tag);
2457 r.name = (
char *)
format (0,
"mc-unserialize-%s", tag);
2488 return format (s,
"unknown 0x%x", state);
2491 return format (s,
"%s", t);
2501 #define _(f) case MC_STREAM_STATE_##f: t = #f; break; 2505 return format (s,
"unknown 0x%x", state);
2508 return format (s,
"%s", t);
2528 s =
format (s,
"MC state %U, %d streams joined, global sequence 0x%x",
2535 s =
format (s,
"\n%UMost recent mastership peers:",
2539 s =
format (s,
"\n%U%-30U%.4e",
2548 s =
format (s,
"\n%Ustream `%s' index %d",
2551 s =
format (s,
"\n%Ustate %U",
2557 "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
2562 s =
format (s,
"\n%U%Ld/%Ld user requests sent/received",
2566 s =
format (s,
"\n%U%d peers, local/global sequence 0x%x/0x%x",
2575 if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
2576 vec_add1 (ps, p[0]);
2580 s =
format (s,
"\n%U%=30s%10s%16s%16s",
2582 "Peer",
"Last seq",
"Retries",
"Future");
2586 s =
format (s,
"\n%U%-30U0x%08x%16Ld%16Ld%s",
static mc_stream_t * mc_stream_by_index(mc_main_t *m, u32 i)
u32 last_global_sequence_processed
void mc_main_init(mc_main_t *mcm, char *tag)
void mc_stream_join_process_hold(void)
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
#define EVENT_MC_SEND_CATCHUP_DATA
static mc_main_t * mc_node_get_main(vlib_node_runtime_t *node)
mhash_t mastership_peer_index_by_id
static void delete_peer_with_index(mc_main_t *mcm, mc_stream_t *s, uword index, int notify_application)
#define hash_set(h, key, value)
void mc_unserialize(mc_main_t *mcm, mc_stream_t *s, u32 buffer_index)
sll srl srl sll sra u16x4 i
static void mc_stream_init(mc_stream_t *s)
static mc_retry_t * prev_retry(mc_stream_t *s, mc_retry_t *r)
static void vlib_signal_one_time_waiting_process(vlib_main_t *vm, vlib_one_time_waiting_process_t *p)
static void mc_stream_free(mc_stream_t *s)
static u64 unserialize_likely_small_unsigned_integer(serialize_main_t *m)
u64 user_requests_received
void mc_rx_buffer_unserialize(mc_main_t *mcm, mc_stream_t *stream, mc_peer_id_t peer_id, u32 buffer_index)
static f64 vlib_process_wait_for_event_or_clock(vlib_main_t *vm, f64 dt)
Suspend a cooperative multi-tasking thread Waits for an event, or for the indicated number of seconds...
static void elog_stream_name(char *buf, int n_buf_bytes, char *v)
#define hash_unset(h, key)
static uword * vlib_process_wait_for_event(vlib_main_t *vm)
static void mc_byte_swap_msg_join_reply(mc_msg_join_reply_t *r)
void serialize_bitmap(serialize_main_t *m, uword *b)
static vlib_main_t * vlib_get_main(void)
static uword clib_fifo_elts(void *v)
vlib_one_time_waiting_process_t * procs_waiting_for_join_done
static uword mc_retry_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
vlib_one_time_waiting_process_t * procs_waiting_for_mc_stream_join
static void maybe_send_window_open_event(vlib_main_t *vm, mc_stream_t *stream)
u32 last_sequence_received
u32 mc_stream_send(mc_main_t *mcm, u32 stream_index, u32 buffer_index)
static f64 vlib_time_now(vlib_main_t *vm)
struct vlib_serialize_buffer_main_t::@22::@24 tx
struct vlib_main_t * vlib_main
void mc_msg_master_assert_handler(mc_main_t *mcm, mc_msg_master_assert_t *mp, u32 buffer_index)
#define ELOG(em, f, data)
static void mc_byte_swap_msg_user_request(mc_msg_user_request_t *r)
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
void mc_msg_user_ack_handler(mc_main_t *mcm, mc_msg_user_ack_t *mp, u32 buffer_index)
void(* rx_buffer)(struct mc_main_t *mc_main, struct mc_stream_t *stream, mc_peer_id_t peer_id, u32 buffer_index)
mc_serialize_msg_t ** global_msgs
void * mc_get_vlib_buffer(vlib_main_t *vm, u32 n_bytes, u32 *bi_return)
struct _vlib_node_registration vlib_node_registration_t
u32 serialize_close_vlib_buffer(serialize_main_t *m)
#define vec_add2(V, P, N)
Add N elements to end of vector V, return pointer to new elements in P.
#define hash_set_mem(h, key, value)
clib_error_t * mc_serialize_internal(mc_main_t *mc, u32 stream_index, u32 multiple_messages_per_vlib_buffer, mc_serialize_msg_t *msg,...)
#define clib_error_report(e)
void unserialize_mc_main(serialize_main_t *m, va_list *va)
add_epi add_epi sub_epi sub_epi adds_epu subs_epu i16x8 y
static void mc_byte_swap_msg_catchup_reply(mc_msg_catchup_reply_t *r)
void mc_msg_join_reply_handler(mc_main_t *mcm, mc_msg_join_reply_t *mp, u32 buffer_index)
#define pool_get(P, E)
Allocate an object E from a pool P (unspecified alignment).
struct vlib_serialize_buffer_main_t::@22::@25 rx
u32 mc_stream_join(mc_main_t *mcm, mc_stream_config_t *config)
static u32 serialize_vlib_buffer_n_bytes(serialize_main_t *m)
#define vec_reset_length(v)
Reset vector length to zero NULL-pointer tolerant.
u32 relay_global_sequence
u32 vlib_register_node(vlib_main_t *vm, vlib_node_registration_t *r)
static void elog_tx_msg(mc_main_t *m, u32 stream_id, u32 local_sequence, u32 retry_count)
#define mc_serialize_stream(mc, si, msg, args...)
void(* peer_died)(struct mc_main_t *mc_main, struct mc_stream_t *stream, mc_peer_id_t peer_id)
static uword vlib_process_suspend(vlib_main_t *vm, f64 dt)
Suspend a vlib cooperative multi-tasking thread for a period of time.
void mc_msg_catchup_reply_handler(mc_main_t *mcm, mc_msg_catchup_reply_t *mp, u32 catchup_opaque)
static u8 * format_mc_relay_state(u8 *s, va_list *args)
#define pool_foreach(VAR, POOL, BODY)
Iterate through pool.
mc_stream_config_t config
void unserialize_close_vlib_buffer(serialize_main_t *m)
static uword vlib_process_get_events(vlib_main_t *vm, uword **data_vector)
Return the first event type which has occurred and a vector of per-event data of that type...
void unserialize_open_data(serialize_main_t *m, u8 *data, uword n_data_bytes)
static void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
static uword clib_bitmap_is_zero(uword *ai)
predicate function; is an entire bitmap empty?
static mc_retry_t * next_retry(mc_stream_t *s, mc_retry_t *r)
static void perform_catchup(mc_main_t *mcm, mc_msg_catchup_reply_t *mp)
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
serialize_main_t serialize_mains[VLIB_N_RX_TX]
#define MC_STREAM_INDEX_INTERNAL
static int mc_peer_comp(void *a1, void *a2)
#define clib_warning(format, args...)
mc_peer_id_t our_catchup_peer_id
#define vec_resize(V, N)
Resize a vector (no header, unspecified alignment) Add N elements to end of given vector V...
uword * stream_index_by_name
uword * procs_waiting_for_stream_name_by_name
static void check_retry(mc_main_t *mcm, mc_stream_t *s)
u8 *(* catchup_snapshot)(struct mc_main_t *mc_main, u8 *snapshot_vector, u32 last_global_sequence_included)
static uword pointer_to_uword(const void *p)
#define hash_create_string(elts, value_bytes)
mc_mastership_peer_t * mastership_peers
#define clib_bitmap_foreach(i, ai, body)
Macro to iterate across set bits in a bitmap.
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
#define hash_unset_mem(h, key)
#define clib_fifo_sub1(f, e)
u16 current_length
Nbytes between current data and the end of this buffer.
static void vlib_process_signal_event(vlib_main_t *vm, uword node_index, uword type_opaque, uword data)
vlib_one_time_waiting_process_t * procs_waiting_for_open_window
static void serialize_likely_small_unsigned_integer(serialize_main_t *m, u64 x)
uword * unacked_by_peer_bitmap
static void mc_byte_swap_msg_master_assert(mc_msg_master_assert_t *r)
#define pool_put(P, E)
Free an object E in pool P.
f64 time_last_master_assert_received
void serialize_open_vlib_buffer(serialize_main_t *m, struct vlib_main_t *vm, vlib_serialize_buffer_main_t *sm)
mc_catchup_process_arg_t * catchup_process_args
void unserialize_mc_stream(serialize_main_t *m, va_list *va)
void(* catchup)(struct mc_main_t *mc_main, u8 *snapshot_data, u32 n_snapshot_data_bytes)
static uword mc_mastership_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
static void * unserialize_get(serialize_main_t *m, uword n_bytes)
static uword mhash_set(mhash_t *h, void *key, uword new_value, uword *old_value)
#define EVENT_MC_UNSERIALIZE_BUFFER
#define clib_fifo_foreach(v, f, body)
void unserialize_cstring(serialize_main_t *m, char **s)
#define uword_to_pointer(u, type)
static uword vlib_buffer_contents(vlib_main_t *vm, u32 buffer_index, u8 *contents)
Copy buffer contents to memory.
mc_peer_id_t our_ack_peer_id
static format_function_t format_mc_stream_state
static void mc_serialize_init(mc_main_t *mcm)
MC_SERIALIZE_MSG(mc_register_stream_name_msg, static)
static uword mc_catchup_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
serialize_stream_t stream
clib_error_t * serialize(serialize_main_t *m,...)
void mhash_init(mhash_t *h, uword n_value_bytes, uword n_key_bytes)
vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX]
uword mc_unserialize_message(mc_main_t *mcm, mc_stream_t *s, serialize_main_t *m)
static uword * delete_retry_fifo_elt(mc_main_t *mcm, mc_stream_t *stream, mc_retry_t *r, uword *dead_peer_bitmap)
void serialize_open_vector(serialize_main_t *m, u8 *vector)
#define vec_free(V)
Free vector's memory (no header).
void serialize_mc_main(serialize_main_t *m, va_list *va)
#define clib_memcpy(a, b, c)
static void vlib_buffer_advance(vlib_buffer_t *b, word l)
Advance current data pointer by the supplied (signed!) amount.
void mc_unserialize_internal(mc_main_t *mcm, u32 stream_and_buffer_index)
mhash_t elog_id_by_peer_id
u8 * format_mc_main(u8 *s, va_list *args)
static uword * mhash_get(mhash_t *h, void *key)
static uword mc_unserialize_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
static void unserialize_integer(serialize_main_t *m, void *x, u32 n_bytes)
static void serialize_integer(serialize_main_t *m, u64 x, u32 n_bytes)
#define ELOG_TYPE(f, fmt)
static int mc_peer_id_compare(mc_peer_id_t a, mc_peer_id_t b)
mc_retry_t * retired_fifo
static uword clib_bitmap_get(uword *ai, uword i)
Gets the ith bit value from a bitmap.
#define ELOG_TYPE_DECLARE(f)
void mc_enable_disable_mastership(mc_main_t *mcm, int we_can_be_master)
static void this_node_slave(mc_main_t *mcm)
void mc_msg_user_request_handler(mc_main_t *mcm, mc_msg_user_request_t *mp, u32 buffer_index)
static void remove_retry_from_pool(mc_stream_t *s, mc_retry_t *r)
#define VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX
mc_stream_stats_t stats_last_clear
uword(* catchup_request_fun)(void *opaque, u32 stream_index, mc_peer_id_t catchup_peer_id)
static uword mc_join_ager_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
#define pool_put_index(p, i)
Free pool element with given index.
void mc_stream_leave(mc_main_t *mcm, u32 stream_index)
static void mc_byte_swap_msg_user_ack(mc_msg_user_ack_t *r)
static mc_stream_t * mc_stream_by_name(mc_main_t *m, char *name)
void mc_msg_join_or_leave_request_handler(mc_main_t *mcm, mc_msg_join_or_leave_request_t *req, u32 buffer_index)
static void serialize_mc_stream(serialize_main_t *m, va_list *va)
mc_serialize_stream_msg_t * stream_msgs
static u32 elog_id_for_msg_name(mc_main_t *m, char *msg_name)
mc_stream_peer_stats_t stats
vhost_vring_state_t state
void(* save_snapshot)(struct mc_main_t *mc_main, u32 is_catchup, u8 *snapshot_data, u32 n_snapshot_data_bytes)
mc_serialize_msg_t * mc_msg_registrations
void unserialize_open_vlib_buffer(serialize_main_t *m, struct vlib_main_t *vm, vlib_serialize_buffer_main_t *sm)
clib_error_t * unserialize(serialize_main_t *m,...)
#define clib_bitmap_free(v)
Free a bitmap.
static uword vlib_buffer_index_length_in_chain(vlib_main_t *vm, u32 bi)
Get length in bytes of the buffer index buffer chain.
u32 we_can_be_relay_master
u32 vlib_buffer_alloc(vlib_main_t *vm, u32 *buffers, u32 n_buffers)
Allocate buffers into supplied array.
#define EVENT_MC_UNSERIALIZE_CATCHUP
vlib_one_time_waiting_process_t ** procs_waiting_for_stream_name_pool
static void vlib_current_process_wait_for_one_time_event_vector(vlib_main_t *vm, vlib_one_time_waiting_process_t **wps)
void serialize_cstring(serialize_main_t *m, char *s)
#define vec_elt(v, i)
Get vector value at index i.
uword * global_msg_index_by_name
static void mc_internal_catchup(mc_main_t *mcm, u8 *data, u32 n_data_bytes)
void mc_wait_for_stream_ready(mc_main_t *m, char *stream_name)
#define clib_fifo_add1(f, e)
#define vec_copy(DST, SRC)
Copy a vector, memcpy wrapper.
uword * unserialize_bitmap(serialize_main_t *m)
u32 elog_string(elog_main_t *em, char *fmt,...)
mc_relay_state_t relay_state
struct _mc_serialize_msg mc_serialize_msg_t
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
mc_stream_peer_stats_t stats_last_clear
uword * retry_index_by_local_sequence
static void mc_byte_swap_msg_join_or_leave_request(mc_msg_join_or_leave_request_t *r)
static uword vlib_process_wait_for_event_with_type(vlib_main_t *vm, uword **data_vector, uword with_type_opaque)
uword * elog_id_by_msg_name
clib_error_t * va_serialize(serialize_main_t *sm, va_list *va)
#define vec_sort_with_function(vec, f)
Sort a vector using the supplied element comparison function.
mc_stream_and_buffer_t * mc_unserialize_stream_and_buffers
static mc_stream_peer_t * get_or_create_peer_with_id(mc_main_t *mcm, mc_stream_t *s, mc_peer_id_t id, int *created)
static u32 elog_id_for_peer_id(mc_main_t *m, u64 peer_id)
void(* catchup_send_fun)(void *opaque, uword catchup_opaque, u8 *data_vector)
format_function_t * format_peer_id
static u8 * mc_internal_catchup_snapshot(mc_main_t *mcm, u8 *data_vector, u32 last_global_sequence_processed)
#define hash_get_mem(h, key)
clib_error_t *(* tx_buffer)(void *opaque, mc_transport_type_t type, u32 buffer_index)
static uword vlib_in_process_context(vlib_main_t *vm)
#define clib_fifo_add2(f, p)
static void this_node_maybe_master(mc_main_t *mcm)
static u32 mc_stream_join_helper(mc_main_t *mcm, mc_stream_config_t *config, u32 is_internal)
clib_error_t *(* tx_ack)(void *opaque, mc_peer_id_t peer_id, u32 buffer_index)
void mc_msg_catchup_request_handler(mc_main_t *mcm, mc_msg_catchup_request_t *req, u32 catchup_opaque)
#define vec_foreach(var, vec)
Vector iterator.
void * serialize_close_vector(serialize_main_t *m)
static void vlib_buffer_free_one(vlib_main_t *vm, u32 buffer_index)
Free one buffer Shorthand to free a single buffer chain.
static void unserialize_mc_register_stream_name(serialize_main_t *m, va_list *va)
clib_error_t * mc_serialize_va(mc_main_t *mc, u32 stream_index, u32 multiple_messages_per_vlib_buffer, mc_serialize_msg_t *msg, va_list *va)
#define vec_validate_init_empty(V, I, INIT)
Make sure vector is long enough for given index and initialize empty space (no header, unspecified alignment)
static i32 mc_seq_cmp(u32 x, u32 y)
static void mc_retry_free(mc_main_t *mcm, mc_stream_t *s, mc_retry_t *r)
u32 * stream_msg_index_by_global_index
struct vlib_main_t * vlib_main
static void send_join_or_leave_request(mc_main_t *mcm, u32 stream_index, u32 is_join)
static vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
static void mc_resend_retired(mc_main_t *mcm, mc_stream_t *s, u32 local_sequence)
mc_stream_t * stream_vector
static void vlib_current_process_wait_for_one_time_event(vlib_main_t *vm, vlib_one_time_waiting_process_t *p)
static void mc_byte_swap_msg_catchup_request(mc_msg_catchup_request_t *r)
static u32 unserialize_vlib_buffer_n_bytes(serialize_main_t *m)
static void serialize_mc_register_stream_name(serialize_main_t *m, va_list *va)
static uword pool_elts(void *v)
Number of active elements in a pool.