Hybrid ICN (hICN) plugin  v21.06-rc0-4-g18fa668
connector.h
1 /*
2  * Copyright (c) 2021 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include <hicn/transport/core/connector_stats.h>
19 #include <hicn/transport/core/content_object.h>
20 #include <hicn/transport/core/endpoint.h>
21 #include <hicn/transport/core/global_object_pool.h>
22 #include <hicn/transport/core/interest.h>
23 #include <hicn/transport/core/packet.h>
24 #include <hicn/transport/portability/platform.h>
25 #include <hicn/transport/utils/membuf.h>
26 #include <hicn/transport/utils/object_pool.h>
27 #include <hicn/transport/utils/ring_buffer.h>
28 #include <hicn/transport/utils/shared_ptr_utils.h>
29 
30 #include <deque>
31 #include <functional>
32 
33 namespace transport {
34 
35 namespace core {
36 
37 class Connector : public std::enable_shared_from_this<Connector> {
38  public:
39  enum class Type : uint8_t {
40  SOCKET_CONNECTOR,
41  MEMIF_CONNECTOR,
42  LOOPBACK_CONNECTOR,
43  };
44 
45  enum class State : std::uint8_t {
46  CLOSED,
47  CONNECTING,
48  CONNECTED,
49  };
50 
51  enum class Role : std::uint8_t { CONSUMER, PRODUCER };
52 
53  public:
54  static constexpr std::size_t queue_size = 4096;
55  static constexpr std::uint32_t invalid_connector = ~0;
56 
57 #ifdef LINUX
58  static constexpr std::uint16_t max_burst = 256;
59 #endif
60 
61  using Ptr = std::shared_ptr<Connector>;
62  using PacketQueue = std::deque<Packet::Ptr>;
63  using PacketReceivedCallback = std::function<void(
64  Connector *, utils::MemBuf &, const std::error_code &)>;
65  using PacketSentCallback =
66  std::function<void(Connector *, const std::error_code &)>;
67  using OnCloseCallback = std::function<void(Connector *)>;
68  using OnReconnectCallback = std::function<void(Connector *)>;
69  using Id = std::uint64_t;
70 
71  template <typename ReceiveCallback, typename SentCallback, typename OnClose,
72  typename OnReconnect>
73  Connector(ReceiveCallback &&receive_callback, SentCallback &&packet_sent,
74  OnClose &&close_callback, OnReconnect &&on_reconnect)
75  : receive_callback_(std::forward<ReceiveCallback &&>(receive_callback)),
76  sent_callback_(std::forward<SentCallback &&>(packet_sent)),
77  on_close_callback_(std::forward<OnClose &&>(close_callback)),
78  on_reconnect_callback_(std::forward<OnReconnect &&>(on_reconnect)),
79  state_(State::CLOSED),
80  connector_id_(invalid_connector) {}
81 
82  virtual ~Connector(){};
83 
84  template <typename ReceiveCallback>
85  void setReceiveCallback(ReceiveCallback &&callback) {
86  receive_callback_ = std::forward<ReceiveCallback &&>(callback);
87  }
88 
89  template <typename SentCallback>
90  void setSentCallback(SentCallback &&callback) {
91  sent_callback_ = std::forward<SentCallback &&>(callback);
92  }
93 
94  template <typename OnClose>
95  void setOnCloseCallback(OnClose &&callback) {
96  on_close_callback_ = std::forward<OnClose &&>(callback);
97  }
98 
99  template <typename OnReconnect>
100  void setReconnectCallback(const OnReconnect &&callback) {
101  on_reconnect_callback_ = std::forward<OnReconnect>(callback);
102  }
103 
104  const PacketReceivedCallback &getReceiveCallback() const {
105  return receive_callback_;
106  }
107 
108  const PacketSentCallback &getSentCallback() { return sent_callback_; }
109 
110  const OnCloseCallback &getOnCloseCallback() { return on_close_callback_; }
111 
112  const OnReconnectCallback &getOnReconnectCallback() {
113  return on_reconnect_callback_;
114  }
115 
116  virtual void send(Packet &packet) = 0;
117 
118  virtual void send(const uint8_t *packet, std::size_t len) = 0;
119 
120  virtual void close() = 0;
121 
122  virtual State state() { return state_; };
123 
124  virtual bool isConnected() { return state_ == State::CONNECTED; }
125 
126  void setConnectorId(Id connector_id) { connector_id_ = connector_id; }
127 
128  Id getConnectorId() { return connector_id_; }
129 
130  void setConnectorName(std::string connector_name) {
131  connector_name_ = connector_name;
132  }
133 
134  std::string getConnectorName() { return connector_name_; }
135 
136  Endpoint getLocalEndpoint() { return local_endpoint_; }
137 
138  Endpoint getRemoteEndpoint() { return remote_endpoint_; }
139 
140  void setRole(Role r) { role_ = r; }
141 
142  Role getRole() { return role_; }
143 
144  static utils::MemBuf::Ptr getPacketFromBuffer(uint8_t *buffer,
145  std::size_t size) {
146  utils::MemBuf::Ptr ret;
147 
148  auto format = Packet::getFormatFromBuffer(buffer, size);
149 
150  if (TRANSPORT_EXPECT_TRUE(format != HF_UNSPEC && !_is_icmp(format))) {
151  if (Packet::isInterest(buffer)) {
153  .getPacketFromExistingBuffer<Interest>(buffer, size);
154  } else {
156  .getPacketFromExistingBuffer<ContentObject>(buffer, size);
157  }
158  } else {
159  ret = core::PacketManager<>::getInstance().getMemBuf(buffer, size);
160  }
161 
162  return ret;
163  }
164 
165  static std::pair<uint8_t *, std::size_t> getRawBuffer() {
166  return core::PacketManager<>::getInstance().getRawBuffer();
167  }
168 
169  protected:
170  inline void sendSuccess(const utils::MemBuf &packet) {
171  stats_.tx_packets_.fetch_add(1, std::memory_order_relaxed);
172  stats_.tx_bytes_.fetch_add(packet.length(), std::memory_order_relaxed);
173  }
174 
175  inline void receiveSuccess(const utils::MemBuf &packet) {
176  stats_.rx_packets_.fetch_add(1, std::memory_order_relaxed);
177  stats_.rx_bytes_.fetch_add(packet.length(), std::memory_order_relaxed);
178  }
179 
180  inline void sendFailed() {
181  stats_.drops_.fetch_add(1, std::memory_order_relaxed);
182  }
183 
184  protected:
185  PacketQueue output_buffer_;
186 
187  // Connector events
188  PacketReceivedCallback receive_callback_;
189  PacketSentCallback sent_callback_;
190  OnCloseCallback on_close_callback_;
191  OnReconnectCallback on_reconnect_callback_;
192 
193  // Connector state
194  std::atomic<State> state_;
195  Id connector_id_;
196 
197  // Endpoints
198  Endpoint local_endpoint_;
199  Endpoint remote_endpoint_;
200 
201  // Connector name
202  std::string connector_name_;
203 
204  // Connector role
205  Role role_;
206 
207  // Stats
208  AtomicConnectorStats stats_;
209 };
210 
211 } // namespace core
212 } // namespace transport
transport::core::AtomicConnectorStats
Definition: connector_stats.h:26
transport::core::Packet
Definition: packet.h:51
transport::core::Endpoint
Definition: endpoint.h:26
transport::core::PacketManager
Definition: global_object_pool.h:30
transport::core::ContentObject
Definition: content_object.h:29
transport::core::Interest
Definition: interest.h:30
transport::core::Connector
Definition: connector.h:37
transport
Definition: forwarder_config.h:32
utils::MemBuf
Definition: membuf.h:45