18 #include <core/pending_interest.h>
19 #include <glog/logging.h>
20 #include <hicn/transport/config.h>
21 #include <hicn/transport/core/asio_wrapper.h>
22 #include <hicn/transport/core/content_object.h>
23 #include <hicn/transport/core/interest.h>
24 #include <hicn/transport/core/io_module.h>
25 #include <hicn/transport/core/name.h>
26 #include <hicn/transport/core/prefix.h>
27 #include <hicn/transport/errors/errors.h>
28 #include <hicn/transport/interfaces/global_conf_interface.h>
29 #include <hicn/transport/interfaces/portal.h>
30 #include <hicn/transport/portability/portability.h>
31 #include <hicn/transport/utils/fixed_block_allocator.h>
36 #include <unordered_map>
45 namespace portal_details {
// Sizing constant for pending-interest bookkeeping: the pool refill loop runs
// this many iterations, and connect() reserves room for this many entries in
// the pending-interest hash table.
47 static constexpr uint32_t pit_size = 1024;
// Raw allocate/deallocate pair. Only the signatures are visible in this
// excerpt. NOTE(review): confirm the bodies (and which build configuration
// selects this variant) against the full file.
57 TRANSPORT_ALWAYS_INLINE
void *allocate(std::size_t size) {
61 TRANSPORT_ALWAYS_INLINE
void deallocate(
void *pointer) {
// Raw allocate/deallocate pair backed by the global heap.
// allocate(size): returns `size` bytes obtained from the global operator new.
72 TRANSPORT_ALWAYS_INLINE
void *allocate(std::size_t size) {
73 return ::operator
new(size);
// deallocate(pointer): releases the block through the global operator delete.
76 TRANSPORT_ALWAYS_INLINE
void deallocate(
void *pointer) {
77 ::operator
delete(pointer);
// Converting/copy-style constructor tail: the new object refers to the same
// memory_ as `other`.
93 : memory_(other.memory_) {}
// Two allocators compare equal only when they refer to the very same memory_
// object (address comparison, not value comparison).
95 TRANSPORT_ALWAYS_INLINE
bool operator==(
97 return &memory_ == &other.memory_;
// Inverse of operator==: different memory_ objects => not equal.
100 TRANSPORT_ALWAYS_INLINE
bool operator!=(
102 return &memory_ != &other.memory_;
// Typed allocation: asks memory_ for sizeof(T) * n bytes and returns the
// result as T*. No objects are constructed here.
105 TRANSPORT_ALWAYS_INLINE T *allocate(std::size_t n)
const {
106 return static_cast<T *
>(memory_.allocate(
sizeof(T) * n));
// Typed deallocation: hands the pointer back to memory_. The element count is
// accepted for interface compatibility but intentionally left unnamed/unused.
109 TRANSPORT_ALWAYS_INLINE
void deallocate(T *p, std::size_t )
const {
110 return memory_.deallocate(p);
// Wrapper that pairs a completion handler with the memory object it should
// use. NOTE(review): the class header is not visible in this excerpt.
125 template <
typename Handler>
// Stores the memory reference/handle `m` and the wrapped handler `h`.
131 : memory_(m), handler_(h) {}
// Call operator: perfectly forwards any arguments to the wrapped handler.
137 template <
typename... Args>
138 void operator()(Args &&... args) {
139 handler_(std::forward<Args>(args)...);
148 template <
typename Handler>
// Keeps a reference to the caller's io_service and immediately performs one
// refill of the pending-interest pool.
156 Pool(asio::io_service &io_service) : io_service_(io_service) {
157 increasePendingInterestPool();
// Refills the pool: runs pit_size iterations, each one building an entry from
// a null Interest::Ptr and a freshly created asio::steady_timer bound to
// io_service_. The call that stores the entry in the pool is not visible in
// this excerpt.
160 TRANSPORT_ALWAYS_INLINE
void increasePendingInterestPool() {
162 for (uint32_t i = 0; i < pit_size; i++) {
164 Interest::Ptr(
nullptr),
165 std::make_unique<asio::steady_timer>(io_service_)));
// Takes one pending-interest object from the pool.
// pending_interests_pool_.get() yields a pair; while its `first` member is
// false (expected to be rare) the pool is refilled and the request retried.
// Returns the object held in `second`, moved out to the caller.
168 PendingInterest::Ptr getPendingInterest() {
169 auto res = pending_interests_pool_.get();
170 while (TRANSPORT_EXPECT_FALSE(!res.first)) {
171 increasePendingInterestPool();
172 res = pending_interests_pool_.get();
175 return std::move(res.second);
180 asio::io_service &io_service_;
185 class PortalConfiguration;
// Table of in-flight interests. Keys are the 32-bit values computed as
// name hash + name suffix; values own the corresponding PendingInterest.
187 using PendingInterestHashTable =
188 std::unordered_map<uint32_t, PendingInterest::Ptr>;
218 friend class PortalConfiguration;
// Constructor initializer list.
// - io_module_ starts empty; its deleter calls IoModule::unload on the module.
// - io_service_ refers to the caller-supplied service, which also backs
//   packet_pool_.
// - app_name_ defaults to "libtransport_application".
// - No consumer/producer callback is installed and is_consumer_ starts false.
223 : io_module_(nullptr, [](
IoModule *module) { IoModule::unload(module); }),
224 io_service_(io_service),
225 packet_pool_(io_service),
226 app_name_(
"libtransport_application"),
227 consumer_callback_(
nullptr),
228 producer_callback_(
nullptr),
229 is_consumer_(
false) {
// Installs the consumer callback; it is the fallback target for content-object
// and timeout notifications when an interest carries no callback of its own.
244 consumer_callback_ = consumer_callback;
// Installs the producer callback; processInterest() forwards incoming
// interests to it when it is non-null.
253 producer_callback_ = producer_callback;
// Forwards the output interface name to the loaded IO module.
// NOTE(review): io_module_ is dereferenced without a null check; it is only
// populated by connect() in the visible code.
265 const std::string &output_interface) {
266 io_module_->setOutputInterface(output_interface);
// Loads the IO module and connects it.
// @param is_consumer  role passed to the IO module's connect(); defaults to
//                     true and is remembered in is_consumer_.
275 TRANSPORT_ALWAYS_INLINE
void connect(
bool is_consumer =
true) {
// Pre-size the pending-interest table to avoid rehashing during early traffic.
277 pending_interest_hash_table_.reserve(portal_details::pit_size);
// Load the module named by the static io_module_path_; the unique_ptr deleter
// takes care of unloading it.
278 io_module_.reset(IoModule::load(io_module_path_.c_str()));
// Hand the module its callbacks: processIncomingMessages (three arguments) for
// received buffers and setLocalRoutes, plus the io_service. Any further init
// arguments are not visible in this excerpt.
282 io_module_->init(std::bind(&Portal::processIncomingMessages,
this,
283 std::placeholders::_1, std::placeholders::_2,
284 std::placeholders::_3),
285 std::bind(&Portal::setLocalRoutes,
this), io_service_,
287 io_module_->connect(is_consumer);
288 is_consumer_ = is_consumer;
// Key used in the pending-interest table: the name's 32-bit hash (requested
// with `false`) plus the name's suffix. The sum is 32-bit arithmetic per the
// table's key type.
301 return name.getHash32(
false) + name.getSuffix();
// Looks the name up in the pending-interest table by its computed hash; the
// branch is taken when an entry exists. The code inside the branch is not
// visible in this excerpt.
310 auto it = pending_interest_hash_table_.find(
getHash(name));
311 if (it != pending_interest_hash_table_.end()) {
// Sends an interest and registers pending state for it.
// @param interest                      interest to send (taken by rvalue ref).
// @param on_content_object_callback    optional per-interest data callback;
//                                      defaults to UNSET_CALLBACK.
// @param on_interest_timeout_callback  optional per-interest timeout callback.
// NOTE(review): the function name and the loop opener are not visible in this
// excerpt; comments below describe only the visible statements.
333 Interest::Ptr &&interest,
334 OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK,
335 OnInterestTimeoutCallback &&on_interest_timeout_callback =
// Encode the suffix list into the packet, then transmit it once.
338 interest->encodeSuffixes();
339 io_module_->send(*interest);
// Table key for the interest's own name = base hash + suffix. The base hash
// is reused below to derive keys for the additional suffixes.
341 uint32_t initial_hash = interest->getName().getHash32(
false);
342 auto hash = initial_hash + interest->getName().getSuffix();
343 uint32_t seq = interest->getName().getSuffix();
344 uint32_t *suffix = interest->firstSuffix();
345 auto n_suffixes = interest->numberOfSuffixes();
346 uint32_t counter = 0;
// Take a pooled pending-interest object and attach the interest and both
// callbacks to it.
349 auto pending_interest = packet_pool_.getPendingInterest();
350 pending_interest->setInterest(interest);
351 pending_interest->setOnContentObjectCallback(
352 std::move(on_content_object_callback));
353 pending_interest->setOnTimeoutCallback(
354 std::move(on_interest_timeout_callback));
// Arm the timeout timer; the handler is wrapped so that it uses
// async_callback_memory_. The handler body is not visible in this excerpt.
356 pending_interest->startCountdown(
357 portal_details::makeCustomAllocatorHandler(
358 async_callback_memory_,
// If an entry already exists under this key, cancel its timer and replace it;
// otherwise insert a new entry.
362 auto it = pending_interest_hash_table_.find(
hash);
363 if (it != pending_interest_hash_table_.end()) {
364 it->second->cancelTimer();
// _int receives the old entry's interest before the slot is overwritten; it is
// not otherwise used in the visible lines.
367 auto _int = it->second->getInterest();
368 it->second = std::move(pending_interest);
370 pending_interest_hash_table_[
hash] = std::move(pending_interest);
// Key for the next iteration: base hash + the suffix currently pointed at.
374 hash = initial_hash + *suffix;
379 }
// Repeat while counter (post-incremented) is below the number of suffixes.
while (counter++ < n_suffixes);
// Timeout handler for a pending interest identified by `hash` (and `seq`).
// NOTE(review): the function name and leading parameters are not visible in
// this excerpt.
391 uint32_t
hash, uint32_t seq) {
// Nothing to do once the io_service has been stopped (branch body not visible).
392 bool is_stopped = io_service_.stopped();
393 if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
// Only a clean expiry (no error code) is treated as a timeout.
397 if (TRANSPORT_EXPECT_TRUE(!ec)) {
398 PendingInterestHashTable::iterator it =
399 pending_interest_hash_table_.find(
hash);
400 if (it != pending_interest_hash_table_.end()) {
// Take ownership of the entry and remove it from the table before invoking
// any callback.
401 PendingInterest::Ptr ptr = std::move(it->second);
402 pending_interest_hash_table_.erase(it);
403 auto _int = ptr->getInterest();
// The callbacks take a non-const Name&, hence the const_cast on the interest's
// name.
404 Name &name =
const_cast<Name &
>(_int->getName());
// Prefer the per-interest timeout callback; fall back to the portal-wide
// consumer callback when one is installed.
407 if (ptr->getOnTimeoutCallback() != UNSET_CALLBACK) {
408 ptr->on_interest_timeout_callback_(_int, name);
409 }
else if (consumer_callback_) {
410 consumer_callback_->onTimeout(_int, name);
// Binds the portal to a served prefix.
// @param config  supplies the content-store size (csReserved()) passed to the
//                IO module and the prefix appended to served_namespaces_.
// Other statements of this function may be missing from this excerpt.
422 TRANSPORT_ALWAYS_INLINE
void bind(
const BindConfig &config) {
424 io_module_->setContentStoreSize(config.csReserved());
425 served_namespaces_.push_back(config.prefix());
// Event-loop entry points (signatures not visible in this excerpt). Each one
// first checks whether io_service_ is stopped; what the guarded branches do is
// not visible here.
434 if (io_service_.stopped()) {
445 if (io_service_.stopped()) {
// Delegates to io_service_.run_one().
449 io_service_.run_one();
// Transmits the content object through the IO module.
460 io_module_->send(content_object);
// When the io_service is still running, the stop work is dispatched onto it
// through a lambda capturing `this` (lambda body not visible in this excerpt).
470 if (!io_service_.stopped()) {
471 io_service_.dispatch([
this]() {
// Asks the IO module to close its connection.
482 io_module_->closeConnection();
// Clears all pending interests. The actual work (doClear) is dispatched onto
// io_service_, and only if the service has not been stopped; otherwise the
// visible code does nothing.
488 TRANSPORT_ALWAYS_INLINE
void clear() {
489 if (!io_service_.stopped()) {
490 io_service_.dispatch(std::bind(&Portal::doClear,
this));
// Single-name variant of clear: dispatches doClearOne(name) onto io_service_
// when the service has not been stopped. `name` is bound by std::bind.
500 if (!io_service_.stopped()) {
501 io_service_.dispatch(std::bind(&Portal::doClearOne,
this, name));
// Remembers the prefix in served_namespaces_ and, if the IO module is already
// connected, registers the route right away. Prefixes recorded while
// disconnected are registered later by setLocalRoutes().
518 served_namespaces_.push_back(prefix);
519 if (io_module_->isConnected()) {
520 io_module_->registerRoute(prefix);
// Takes the part of io_module_path_ before the first '.' (the whole string if
// there is no '.') and reports false when it equals "forwarder_module".
// The return for the other case is not visible in this excerpt.
528 std::string mod = io_module_path_.substr(0, io_module_path_.find(
"."));
529 if (mod ==
"forwarder_module")
return false;
// Cancels the timer of every pending interest and then empties the table.
537 TRANSPORT_ALWAYS_INLINE
void doClear() {
538 for (
auto &pend_interest : pending_interest_hash_table_) {
539 pend_interest.second->cancelTimer();
// _int receives the entry's interest; it is not otherwise used in the visible
// lines.
543 auto _int = pend_interest.second->getInterest();
546 pending_interest_hash_table_.clear();
// Removes the pending interest registered for `name`, if any: its timer is
// cancelled and the entry erased. A name with no entry is ignored.
552 TRANSPORT_ALWAYS_INLINE
void doClearOne(
const Name &name) {
553 auto it = pending_interest_hash_table_.find(
getHash(name));
555 if (it != pending_interest_hash_table_.end()) {
556 it->second->cancelTimer();
// _int receives the entry's interest; it is not otherwise used in the visible
// lines.
560 auto _int = it->second->getInterest();
562 pending_interest_hash_table_.erase(it);
// Receive callback registered with the IO module in connect().
// @param c       connector the buffer arrived on (unused in the visible lines).
// @param buffer  received bytes; reinterpreted as a Packet below.
// @param ec      error code reported by the IO module.
572 TRANSPORT_ALWAYS_INLINE
void processIncomingMessages(
573 Connector *c,
utils::MemBuf &buffer,
const std::error_code &ec) {
// Guard on a stopped io_service (branch body not visible in this excerpt).
574 bool is_stopped = io_service_.stopped();
575 if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
// Control messages are the uncommon case and go to processControlMessage().
579 if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer.data()))) {
580 processControlMessage(buffer);
// Treat the buffer as a Packet. NOTE(review): this static_cast assumes the
// MemBuf really is a Packet — confirm against the IO module contract.
585 Packet &packet_buffer =
static_cast<Packet &
>(buffer);
// Dispatch on packet format: the _is_tcp case (expected to be the common one)
// is handled as a content object. The condition selecting the interest branch
// is not visible in this excerpt; anything else is logged and dropped.
587 auto format = packet_buffer.getFormat();
588 if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) {
590 processContentObject(
static_cast<ContentObject &
>(packet_buffer));
592 processInterest(
static_cast<Interest &
>(packet_buffer));
595 LOG(ERROR) <<
"Received not supported packet. Ignoring it.";
// Registers every prefix in served_namespaces_ with the IO module. The
// connection state is re-checked for each prefix, so nothing is registered
// while the module is disconnected.
604 TRANSPORT_ALWAYS_INLINE
void setLocalRoutes() {
605 for (
auto &prefix : served_namespaces_) {
606 if (io_module_->isConnected()) {
607 io_module_->registerRoute(prefix);
// Handles a received interest: logs its name at verbosity 3 (debug logging)
// and forwards it to the producer callback when one is installed.
612 TRANSPORT_ALWAYS_INLINE
void processInterest(Interest &interest) {
614 DLOG_IF(INFO, VLOG_IS_ON(3)) <<
"processInterest " << interest.getName();
615 if (TRANSPORT_EXPECT_TRUE(producer_callback_ !=
nullptr)) {
616 producer_callback_->onInterest(interest);
// Handles a received content object by matching it against the pending
// interest registered under the same name hash.
628 TRANSPORT_ALWAYS_INLINE
void processContentObject(
629 ContentObject &content_object) {
630 DLOG_IF(INFO, VLOG_IS_ON(3))
631 <<
"processContentObject " << content_object.getName();
632 uint32_t hash =
getHash(content_object.getName());
634 auto it = pending_interest_hash_table_.find(hash);
635 if (it != pending_interest_hash_table_.end()) {
636 DLOG_IF(INFO, VLOG_IS_ON(3)) <<
"Found pending interest.";
// Take ownership of the entry, remove it from the table and stop its timeout
// timer before any callback runs.
638 PendingInterest::Ptr interest_ptr = std::move(it->second);
639 pending_interest_hash_table_.erase(it);
640 interest_ptr->cancelTimer();
641 auto _int = interest_ptr->getInterest();
// Prefer the per-interest data callback; fall back to the portal-wide consumer
// callback when one is installed.
643 if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) {
644 interest_ptr->on_content_object_callback_(*_int, content_object);
645 }
else if (consumer_callback_) {
646 consumer_callback_->onContentObject(*_int, content_object);
// No matching entry: the content object is only logged (debug, verbosity 3).
649 DLOG_IF(INFO, VLOG_IS_ON(3))
650 <<
"No interest pending for received content object.";
// Passes a control-message buffer to the IO module's reply handler.
// The parameter list is not visible in this excerpt.
659 TRANSPORT_ALWAYS_INLINE
void processControlMessage(
661 io_module_->processControlMessageReply(packet_buffer);
// Memory object handed to makeCustomAllocatorHandler for timer handlers.
665 portal_details::HandlerMemory async_callback_memory_;
// Loaded IO module; the function-pointer deleter is set in the constructor to
// call IoModule::unload.
666 std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_;
// Caller-owned io_service on which all portal work is run/dispatched.
668 asio::io_service &io_service_;
// NOTE(review): not referenced in any line visible in this excerpt.
669 asio::io_service internal_io_service_;
// Pool that supplies PendingInterest objects for outgoing interests.
670 portal_details::Pool packet_pool_;
// Application name; defaults to "libtransport_application".
672 std::string app_name_;
// In-flight interests keyed by name hash + suffix.
674 PendingInterestHashTable pending_interest_hash_table_;
// Prefixes to (re)register with the IO module in setLocalRoutes().
675 std::list<Prefix> served_namespaces_;
// Non-owning fallback callbacks; either may be null.
677 ConsumerCallback *consumer_callback_;
678 ProducerCallback *producer_callback_;
// Static IO-module configuration hooks and state. Only the declarations are
// visible here; the definitions live elsewhere.
// Each configuration function reports failure through its std::error_code
// out-parameter.
683 static std::string defaultIoModule();
684 static void parseIoModuleConfiguration(
const libconfig::Setting &io_config,
685 std::error_code &ec);
686 static void getModuleConfiguration(
687 interface::global_config::ConfigurationObject &conf, std::error_code &ec);
688 static void setModuleConfiguration(
689 const interface::global_config::ConfigurationObject &conf,
690 std::error_code &ec);
691 static interface::global_config::IoModuleConfiguration conf_;
// Path of the IO module loaded by connect().
692 static std::string io_module_path_;