18 #include <hicn/transport/core/connector_stats.h>
19 #include <hicn/transport/core/content_object.h>
20 #include <hicn/transport/core/endpoint.h>
21 #include <hicn/transport/core/global_object_pool.h>
22 #include <hicn/transport/core/interest.h>
23 #include <hicn/transport/core/packet.h>
24 #include <hicn/transport/portability/platform.h>
25 #include <hicn/transport/utils/membuf.h>
26 #include <hicn/transport/utils/object_pool.h>
27 #include <hicn/transport/utils/ring_buffer.h>
28 #include <hicn/transport/utils/shared_ptr_utils.h>
37 class Connector :
public std::enable_shared_from_this<Connector> {
39 enum class Type : uint8_t {
45 enum class State : std::uint8_t {
51 enum class Role : std::uint8_t { CONSUMER, PRODUCER };
54 static constexpr std::size_t queue_size = 4096;
55 static constexpr std::uint32_t invalid_connector = ~0;
58 static constexpr std::uint16_t max_burst = 256;
61 using Ptr = std::shared_ptr<Connector>;
62 using PacketQueue = std::deque<Packet::Ptr>;
63 using PacketReceivedCallback = std::function<void(
65 using PacketSentCallback =
66 std::function<void(
Connector *,
const std::error_code &)>;
67 using OnCloseCallback = std::function<void(
Connector *)>;
68 using OnReconnectCallback = std::function<void(
Connector *)>;
69 using Id = std::uint64_t;
71 template <
typename ReceiveCallback,
typename SentCallback,
typename OnClose,
73 Connector(ReceiveCallback &&receive_callback, SentCallback &&packet_sent,
74 OnClose &&close_callback, OnReconnect &&on_reconnect)
75 : receive_callback_(std::forward<ReceiveCallback &&>(receive_callback)),
76 sent_callback_(std::forward<SentCallback &&>(packet_sent)),
77 on_close_callback_(std::forward<OnClose &&>(close_callback)),
78 on_reconnect_callback_(std::forward<OnReconnect &&>(on_reconnect)),
79 state_(State::CLOSED),
80 connector_id_(invalid_connector) {}
84 template <
typename ReceiveCallback>
85 void setReceiveCallback(ReceiveCallback &&callback) {
86 receive_callback_ = std::forward<ReceiveCallback &&>(callback);
89 template <
typename SentCallback>
90 void setSentCallback(SentCallback &&callback) {
91 sent_callback_ = std::forward<SentCallback &&>(callback);
94 template <
typename OnClose>
95 void setOnCloseCallback(OnClose &&callback) {
96 on_close_callback_ = std::forward<OnClose &&>(callback);
99 template <
typename OnReconnect>
100 void setReconnectCallback(
const OnReconnect &&callback) {
101 on_reconnect_callback_ = std::forward<OnReconnect>(callback);
104 const PacketReceivedCallback &getReceiveCallback()
const {
105 return receive_callback_;
108 const PacketSentCallback &getSentCallback() {
return sent_callback_; }
110 const OnCloseCallback &getOnCloseCallback() {
return on_close_callback_; }
112 const OnReconnectCallback &getOnReconnectCallback() {
113 return on_reconnect_callback_;
116 virtual void send(
Packet &packet) = 0;
118 virtual void send(
const uint8_t *packet, std::size_t len) = 0;
120 virtual void close() = 0;
122 virtual State state() {
return state_; };
124 virtual bool isConnected() {
return state_ == State::CONNECTED; }
126 void setConnectorId(Id connector_id) { connector_id_ = connector_id; }
128 Id getConnectorId() {
return connector_id_; }
130 void setConnectorName(std::string connector_name) {
131 connector_name_ = connector_name;
134 std::string getConnectorName() {
return connector_name_; }
136 Endpoint getLocalEndpoint() {
return local_endpoint_; }
138 Endpoint getRemoteEndpoint() {
return remote_endpoint_; }
140 void setRole(Role r) { role_ = r; }
142 Role getRole() {
return role_; }
144 static utils::MemBuf::Ptr getPacketFromBuffer(uint8_t *buffer,
146 utils::MemBuf::Ptr ret;
148 auto format = Packet::getFormatFromBuffer(buffer, size);
150 if (TRANSPORT_EXPECT_TRUE(format != HF_UNSPEC && !_is_icmp(format))) {
151 if (Packet::isInterest(buffer)) {
153 .getPacketFromExistingBuffer<
Interest>(buffer, size);
165 static std::pair<uint8_t *, std::size_t> getRawBuffer() {
171 stats_.tx_packets_.fetch_add(1, std::memory_order_relaxed);
172 stats_.tx_bytes_.fetch_add(packet.length(), std::memory_order_relaxed);
176 stats_.rx_packets_.fetch_add(1, std::memory_order_relaxed);
177 stats_.rx_bytes_.fetch_add(packet.length(), std::memory_order_relaxed);
180 inline void sendFailed() {
181 stats_.drops_.fetch_add(1, std::memory_order_relaxed);
185 PacketQueue output_buffer_;
188 PacketReceivedCallback receive_callback_;
189 PacketSentCallback sent_callback_;
190 OnCloseCallback on_close_callback_;
191 OnReconnectCallback on_reconnect_callback_;
194 std::atomic<State> state_;
202 std::string connector_name_;