Hybrid ICN (hICN) plugin  v21.06-rc0-4-g18fa668
portal.h
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include <core/pending_interest.h>
19 #include <glog/logging.h>
20 #include <hicn/transport/config.h>
21 #include <hicn/transport/core/asio_wrapper.h>
22 #include <hicn/transport/core/content_object.h>
23 #include <hicn/transport/core/interest.h>
24 #include <hicn/transport/core/io_module.h>
25 #include <hicn/transport/core/name.h>
26 #include <hicn/transport/core/prefix.h>
27 #include <hicn/transport/errors/errors.h>
28 #include <hicn/transport/interfaces/global_conf_interface.h>
29 #include <hicn/transport/interfaces/portal.h>
30 #include <hicn/transport/portability/portability.h>
31 #include <hicn/transport/utils/fixed_block_allocator.h>
32 
33 #include <future>
34 #include <memory>
35 #include <queue>
36 #include <unordered_map>
37 
38 namespace libconfig {
39 class Setting;
40 }
41 
42 namespace transport {
43 namespace core {
44 
45 namespace portal_details {
46 
// Number of PendingInterest objects created each time the pool grows; also
// the capacity initially reserved for the portal's pending-interest table.
static constexpr uint32_t pit_size{1024};
48 
50 #ifdef __vpp__
51  public:
52  HandlerMemory() {}
53 
54  HandlerMemory(const HandlerMemory &) = delete;
55  HandlerMemory &operator=(const HandlerMemory &) = delete;
56 
57  TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
59  }
60 
61  TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
63  pointer);
64  }
65 #else
66  public:
67  HandlerMemory() {}
68 
69  HandlerMemory(const HandlerMemory &) = delete;
70  HandlerMemory &operator=(const HandlerMemory &) = delete;
71 
72  TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
73  return ::operator new(size);
74  }
75 
76  TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
77  ::operator delete(pointer);
78  }
79 #endif
80 };
81 
82 // The allocator to be associated with the handler objects. This allocator only
83 // needs to satisfy the C++11 minimal allocator requirements.
84 template <typename T>
86  public:
87  using value_type = T;
88 
89  explicit HandlerAllocator(HandlerMemory &mem) : memory_(mem) {}
90 
91  template <typename U>
92  HandlerAllocator(const HandlerAllocator<U> &other) noexcept
93  : memory_(other.memory_) {}
94 
95  TRANSPORT_ALWAYS_INLINE bool operator==(
96  const HandlerAllocator &other) const noexcept {
97  return &memory_ == &other.memory_;
98  }
99 
100  TRANSPORT_ALWAYS_INLINE bool operator!=(
101  const HandlerAllocator &other) const noexcept {
102  return &memory_ != &other.memory_;
103  }
104 
105  TRANSPORT_ALWAYS_INLINE T *allocate(std::size_t n) const {
106  return static_cast<T *>(memory_.allocate(sizeof(T) * n));
107  }
108 
109  TRANSPORT_ALWAYS_INLINE void deallocate(T *p, std::size_t /*n*/) const {
110  return memory_.deallocate(p);
111  }
112 
113  private:
114  template <typename>
115  friend class HandlerAllocator;
116 
117  // The underlying memory.
118  HandlerMemory &memory_;
119 };
120 
121 // Wrapper class template for handler objects to allow handler memory
122 // allocation to be customised. The allocator_type type and get_allocator()
123 // member function are used by the asynchronous operations to obtain the
124 // allocator. Calls to operator() are forwarded to the encapsulated handler.
125 template <typename Handler>
127  public:
129 
131  : memory_(m), handler_(h) {}
132 
133  allocator_type get_allocator() const noexcept {
134  return allocator_type(memory_);
135  }
136 
137  template <typename... Args>
138  void operator()(Args &&... args) {
139  handler_(std::forward<Args>(args)...);
140  }
141 
142  private:
143  HandlerMemory &memory_;
144  Handler handler_;
145 };
146 
147 // Helper function to wrap a handler object to add custom allocation.
148 template <typename Handler>
149 inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler(
150  HandlerMemory &m, Handler h) {
151  return CustomAllocatorHandler<Handler>(m, h);
152 }
153 
154 class Pool {
155  public:
156  Pool(asio::io_service &io_service) : io_service_(io_service) {
157  increasePendingInterestPool();
158  }
159 
160  TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() {
161  // Create pool of pending interests to reuse
162  for (uint32_t i = 0; i < pit_size; i++) {
163  pending_interests_pool_.add(new PendingInterest(
164  Interest::Ptr(nullptr),
165  std::make_unique<asio::steady_timer>(io_service_)));
166  }
167  }
168  PendingInterest::Ptr getPendingInterest() {
169  auto res = pending_interests_pool_.get();
170  while (TRANSPORT_EXPECT_FALSE(!res.first)) {
171  increasePendingInterestPool();
172  res = pending_interests_pool_.get();
173  }
174 
175  return std::move(res.second);
176  }
177 
178  private:
179  utils::ObjectPool<PendingInterest> pending_interests_pool_;
180  asio::io_service &io_service_;
181 };
182 
183 } // namespace portal_details
184 
185 class PortalConfiguration;
186 
187 using PendingInterestHashTable =
188  std::unordered_map<uint32_t, PendingInterest::Ptr>;
189 
191 
213 class Portal {
214  public:
217 
218  friend class PortalConfiguration;
219 
220  Portal() : Portal(internal_io_service_) {}
221 
222  Portal(asio::io_service &io_service)
223  : io_module_(nullptr, [](IoModule *module) { IoModule::unload(module); }),
224  io_service_(io_service),
225  packet_pool_(io_service),
226  app_name_("libtransport_application"),
227  consumer_callback_(nullptr),
228  producer_callback_(nullptr),
229  is_consumer_(false) {
237  }
243  void setConsumerCallback(ConsumerCallback *consumer_callback) {
244  consumer_callback_ = consumer_callback;
245  }
246 
252  void setProducerCallback(ProducerCallback *producer_callback) {
253  producer_callback_ = producer_callback;
254  }
255 
264  TRANSPORT_ALWAYS_INLINE void setOutputInterface(
265  const std::string &output_interface) {
266  io_module_->setOutputInterface(output_interface);
267  }
268 
275  TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) {
276  if (!io_module_) {
277  pending_interest_hash_table_.reserve(portal_details::pit_size);
278  io_module_.reset(IoModule::load(io_module_path_.c_str()));
279 
280  CHECK(io_module_);
281 
282  io_module_->init(std::bind(&Portal::processIncomingMessages, this,
283  std::placeholders::_1, std::placeholders::_2,
284  std::placeholders::_3),
285  std::bind(&Portal::setLocalRoutes, this), io_service_,
286  app_name_);
287  io_module_->connect(is_consumer);
288  is_consumer_ = is_consumer;
289  }
290  }
291 
296 
300  TRANSPORT_ALWAYS_INLINE uint32_t getHash(const Name &name) {
301  return name.getHash32(false) + name.getSuffix();
302  }
303 
309  TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
310  auto it = pending_interest_hash_table_.find(getHash(name));
311  if (it != pending_interest_hash_table_.end()) {
312  return true;
313  }
314 
315  return false;
316  }
317 
332  TRANSPORT_ALWAYS_INLINE void sendInterest(
333  Interest::Ptr &&interest,
334  OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK,
335  OnInterestTimeoutCallback &&on_interest_timeout_callback =
336  UNSET_CALLBACK) {
337  // Send it
338  interest->encodeSuffixes();
339  io_module_->send(*interest);
340 
341  uint32_t initial_hash = interest->getName().getHash32(false);
342  auto hash = initial_hash + interest->getName().getSuffix();
343  uint32_t seq = interest->getName().getSuffix();
344  uint32_t *suffix = interest->firstSuffix();
345  auto n_suffixes = interest->numberOfSuffixes();
346  uint32_t counter = 0;
347  // Set timers
348  do {
349  auto pending_interest = packet_pool_.getPendingInterest();
350  pending_interest->setInterest(interest);
351  pending_interest->setOnContentObjectCallback(
352  std::move(on_content_object_callback));
353  pending_interest->setOnTimeoutCallback(
354  std::move(on_interest_timeout_callback));
355 
356  pending_interest->startCountdown(
357  portal_details::makeCustomAllocatorHandler(
358  async_callback_memory_,
359  std::bind(&Portal::timerHandler, this, std::placeholders::_1,
360  hash, seq)));
361 
362  auto it = pending_interest_hash_table_.find(hash);
363  if (it != pending_interest_hash_table_.end()) {
364  it->second->cancelTimer();
365 
366  // Get reference to interest packet in order to have it destroyed.
367  auto _int = it->second->getInterest();
368  it->second = std::move(pending_interest);
369  } else {
370  pending_interest_hash_table_[hash] = std::move(pending_interest);
371  }
372 
373  if (suffix) {
374  hash = initial_hash + *suffix;
375  seq = *suffix;
376  suffix++;
377  }
378 
379  } while (counter++ < n_suffixes);
380  }
381 
390  TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec,
391  uint32_t hash, uint32_t seq) {
392  bool is_stopped = io_service_.stopped();
393  if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
394  return;
395  }
396 
397  if (TRANSPORT_EXPECT_TRUE(!ec)) {
398  PendingInterestHashTable::iterator it =
399  pending_interest_hash_table_.find(hash);
400  if (it != pending_interest_hash_table_.end()) {
401  PendingInterest::Ptr ptr = std::move(it->second);
402  pending_interest_hash_table_.erase(it);
403  auto _int = ptr->getInterest();
404  Name &name = const_cast<Name &>(_int->getName());
405  name.setSuffix(seq);
406 
407  if (ptr->getOnTimeoutCallback() != UNSET_CALLBACK) {
408  ptr->on_interest_timeout_callback_(_int, name);
409  } else if (consumer_callback_) {
410  consumer_callback_->onTimeout(_int, name);
411  }
412  }
413  }
414  }
415 
422  TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) {
423  assert(io_module_);
424  io_module_->setContentStoreSize(config.csReserved());
425  served_namespaces_.push_back(config.prefix());
426  setLocalRoutes();
427  }
428 
433  TRANSPORT_ALWAYS_INLINE void runEventsLoop() {
434  if (io_service_.stopped()) {
435  io_service_.reset(); // ensure that run()/poll() will do some work
436  }
437 
438  io_service_.run();
439  }
440 
444  TRANSPORT_ALWAYS_INLINE void runOneEvent() {
445  if (io_service_.stopped()) {
446  io_service_.reset(); // ensure that run()/poll() will do some work
447  }
448 
449  io_service_.run_one();
450  }
451 
458  TRANSPORT_ALWAYS_INLINE void sendContentObject(
459  ContentObject &content_object) {
460  io_module_->send(content_object);
461  }
462 
469  TRANSPORT_ALWAYS_INLINE void stopEventsLoop() {
470  if (!io_service_.stopped()) {
471  io_service_.dispatch([this]() {
472  clear();
473  io_service_.stop();
474  });
475  }
476  }
477 
481  TRANSPORT_ALWAYS_INLINE void killConnection() {
482  io_module_->closeConnection();
483  }
484 
488  TRANSPORT_ALWAYS_INLINE void clear() {
489  if (!io_service_.stopped()) {
490  io_service_.dispatch(std::bind(&Portal::doClear, this));
491  } else {
492  doClear();
493  }
494  }
495 
499  TRANSPORT_ALWAYS_INLINE void clearOne(const Name &name) {
500  if (!io_service_.stopped()) {
501  io_service_.dispatch(std::bind(&Portal::doClearOne, this, name));
502  } else {
503  doClearOne(name);
504  }
505  }
506 
510  TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() {
511  return io_service_;
512  }
513 
517  TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) {
518  served_namespaces_.push_back(prefix);
519  if (io_module_->isConnected()) {
520  io_module_->registerRoute(prefix);
521  }
522  }
523 
527  TRANSPORT_ALWAYS_INLINE bool isConnectedToFwd() {
528  std::string mod = io_module_path_.substr(0, io_module_path_.find("."));
529  if (mod == "forwarder_module") return false;
530  return true;
531  }
532 
533  private:
537  TRANSPORT_ALWAYS_INLINE void doClear() {
538  for (auto &pend_interest : pending_interest_hash_table_) {
539  pend_interest.second->cancelTimer();
540 
541  // Get interest packet from pending interest and do nothing with it. It
542  // will get destroyed as it goes out of scope.
543  auto _int = pend_interest.second->getInterest();
544  }
545 
546  pending_interest_hash_table_.clear();
547  }
548 
552  TRANSPORT_ALWAYS_INLINE void doClearOne(const Name &name) {
553  auto it = pending_interest_hash_table_.find(getHash(name));
554 
555  if (it != pending_interest_hash_table_.end()) {
556  it->second->cancelTimer();
557 
558  // Get interest packet from pending interest and do nothing with it. It
559  // will get destroyed as it goes out of scope.
560  auto _int = it->second->getInterest();
561 
562  pending_interest_hash_table_.erase(it);
563  }
564  }
565 
572  TRANSPORT_ALWAYS_INLINE void processIncomingMessages(
573  Connector *c, utils::MemBuf &buffer, const std::error_code &ec) {
574  bool is_stopped = io_service_.stopped();
575  if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
576  return;
577  }
578 
579  if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer.data()))) {
580  processControlMessage(buffer);
581  return;
582  }
583 
584  // The buffer is a base class for an interest or a content object
585  Packet &packet_buffer = static_cast<Packet &>(buffer);
586 
587  auto format = packet_buffer.getFormat();
588  if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) {
589  if (is_consumer_) {
590  processContentObject(static_cast<ContentObject &>(packet_buffer));
591  } else {
592  processInterest(static_cast<Interest &>(packet_buffer));
593  }
594  } else {
595  LOG(ERROR) << "Received not supported packet. Ignoring it.";
596  }
597  }
598 
604  TRANSPORT_ALWAYS_INLINE void setLocalRoutes() {
605  for (auto &prefix : served_namespaces_) {
606  if (io_module_->isConnected()) {
607  io_module_->registerRoute(prefix);
608  }
609  }
610  }
611 
612  TRANSPORT_ALWAYS_INLINE void processInterest(Interest &interest) {
613  // Interest for a producer
614  DLOG_IF(INFO, VLOG_IS_ON(3)) << "processInterest " << interest.getName();
615  if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) {
616  producer_callback_->onInterest(interest);
617  }
618  }
619 
628  TRANSPORT_ALWAYS_INLINE void processContentObject(
629  ContentObject &content_object) {
630  DLOG_IF(INFO, VLOG_IS_ON(3))
631  << "processContentObject " << content_object.getName();
632  uint32_t hash = getHash(content_object.getName());
633 
634  auto it = pending_interest_hash_table_.find(hash);
635  if (it != pending_interest_hash_table_.end()) {
636  DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest.";
637 
638  PendingInterest::Ptr interest_ptr = std::move(it->second);
639  pending_interest_hash_table_.erase(it);
640  interest_ptr->cancelTimer();
641  auto _int = interest_ptr->getInterest();
642 
643  if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) {
644  interest_ptr->on_content_object_callback_(*_int, content_object);
645  } else if (consumer_callback_) {
646  consumer_callback_->onContentObject(*_int, content_object);
647  }
648  } else {
649  DLOG_IF(INFO, VLOG_IS_ON(3))
650  << "No interest pending for received content object.";
651  }
652  }
653 
659  TRANSPORT_ALWAYS_INLINE void processControlMessage(
660  utils::MemBuf &packet_buffer) {
661  io_module_->processControlMessageReply(packet_buffer);
662  }
663 
664  private:
665  portal_details::HandlerMemory async_callback_memory_;
666  std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_;
667 
668  asio::io_service &io_service_;
669  asio::io_service internal_io_service_;
670  portal_details::Pool packet_pool_;
671 
672  std::string app_name_;
673 
674  PendingInterestHashTable pending_interest_hash_table_;
675  std::list<Prefix> served_namespaces_;
676 
677  ConsumerCallback *consumer_callback_;
678  ProducerCallback *producer_callback_;
679 
680  bool is_consumer_;
681 
682  private:
683  static std::string defaultIoModule();
684  static void parseIoModuleConfiguration(const libconfig::Setting &io_config,
685  std::error_code &ec);
686  static void getModuleConfiguration(
687  interface::global_config::ConfigurationObject &conf, std::error_code &ec);
688  static void setModuleConfiguration(
689  const interface::global_config::ConfigurationObject &conf,
690  std::error_code &ec);
691  static interface::global_config::IoModuleConfiguration conf_;
692  static std::string io_module_path_;
693 };
694 
695 } // namespace core
696 
697 } // end namespace transport
transport::core::Portal::setProducerCallback
void setProducerCallback(ProducerCallback *producer_callback)
Definition: portal.h:252
transport::core::Portal::setConsumerCallback
void setConsumerCallback(ConsumerCallback *consumer_callback)
Definition: portal.h:243
transport::core::Portal::sendContentObject
TRANSPORT_ALWAYS_INLINE void sendContentObject(ContentObject &content_object)
Definition: portal.h:458
transport::core::Portal::clearOne
TRANSPORT_ALWAYS_INLINE void clearOne(const Name &name)
Definition: portal.h:499
transport::core::IoModule
Definition: io_module.h:43
transport::core::Portal::stopEventsLoop
TRANSPORT_ALWAYS_INLINE void stopEventsLoop()
Definition: portal.h:469
transport::core::Portal::bind
TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config)
Definition: portal.h:422
transport::core::portal_details::Pool
Definition: portal.h:154
transport::core::Name
Definition: name.h:45
utils::FixedBlockAllocator
Definition: fixed_block_allocator.h:18
transport::core::hash
Definition: name.h:118
transport::core::Portal::interestIsPending
TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name)
Definition: portal.h:309
transport::core::PacketManager
Definition: global_object_pool.h:30
transport::core::portal_details::CustomAllocatorHandler
Definition: portal.h:126
transport::core::Portal::runEventsLoop
TRANSPORT_ALWAYS_INLINE void runEventsLoop()
Definition: portal.h:433
transport::core::ContentObject
Definition: content_object.h:29
transport::core::PendingInterest
Definition: pending_interest.h:40
transport::interface::BasicBindConfig
Definition: portal.h:32
transport::core::portal_details::HandlerMemory
Definition: portal.h:49
transport::core::Portal::getHash
TRANSPORT_ALWAYS_INLINE uint32_t getHash(const Name &name)
Definition: portal.h:300
transport::core::Portal::runOneEvent
TRANSPORT_ALWAYS_INLINE void runOneEvent()
Definition: portal.h:444
transport::core::portal_details::HandlerAllocator
Definition: portal.h:85
transport::core::Portal::timerHandler
TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec, uint32_t hash, uint32_t seq)
Definition: portal.h:390
transport
Definition: forwarder_config.h:32
transport::core::Portal::Portal
Portal(asio::io_service &io_service)
Definition: portal.h:222
transport::interface::Portal::ProducerCallback
Definition: portal.h:79
transport::core::Portal::getIoService
TRANSPORT_ALWAYS_INLINE asio::io_service & getIoService()
Definition: portal.h:510
transport::core::Portal
Definition: portal.h:213
transport::core::Portal::sendInterest
TRANSPORT_ALWAYS_INLINE void sendInterest(Interest::Ptr &&interest, OnContentObjectCallback &&on_content_object_callback=UNSET_CALLBACK, OnInterestTimeoutCallback &&on_interest_timeout_callback=UNSET_CALLBACK)
Definition: portal.h:332
transport::core::Portal::~Portal
~Portal()
Definition: portal.h:295
transport::core::Portal::clear
TRANSPORT_ALWAYS_INLINE void clear()
Definition: portal.h:488
transport::core::Portal::registerRoute
TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix)
Definition: portal.h:517
transport::core::Portal::isConnectedToFwd
TRANSPORT_ALWAYS_INLINE bool isConnectedToFwd()
Definition: portal.h:527
transport::core::Portal::connect
TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer=true)
Definition: portal.h:275
utils::ObjectPool
Definition: object_pool.h:28
transport::core::Prefix
Definition: prefix.h:24
transport::core::Portal::setOutputInterface
TRANSPORT_ALWAYS_INLINE void setOutputInterface(const std::string &output_interface)
Definition: portal.h:264
transport::interface::Portal::ConsumerCallback
Definition: portal.h:68
utils::MemBuf
Definition: membuf.h:45
transport::core::Portal::killConnection
TRANSPORT_ALWAYS_INLINE void killConnection()
Definition: portal.h:481