Hybrid ICN (hICN) plugin  v21.06-rc0-4-g18fa668
transport_protocol.h
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include <hicn/transport/interfaces/callbacks.h>
19 #include <hicn/transport/interfaces/socket_consumer.h>
20 #include <hicn/transport/interfaces/statistics.h>
21 #include <hicn/transport/utils/object_pool.h>
22 #include <implementation/socket.h>
23 #include <protocols/data_processing_events.h>
24 #include <protocols/fec_base.h>
25 #include <protocols/indexer.h>
26 #include <protocols/reassembly.h>
27 
28 #include <array>
29 #include <atomic>
30 
31 namespace transport {
32 
33 namespace protocol {
34 
35 using namespace core;
36 
37 class IndexVerificationManager;
38 
39 using ReadCallback = interface::ConsumerSocket::ReadCallback;
40 
43  static constexpr std::size_t interest_pool_size = 4096;
44 
45  friend class ManifestIndexManager;
46 
47  public:
49  Indexer *indexer, Reassembly *reassembly);
50 
51  virtual ~TransportProtocol() = default;
52 
53  TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; }
54 
55  virtual int start();
56 
57  virtual void stop();
58 
59  virtual void resume();
60 
67  virtual std::size_t transportHeaderLength() { return 0; }
68 
69  virtual void scheduleNextInterests() = 0;
70 
71  // Events generated by the indexing
72  virtual void onContentReassembled(std::error_code ec);
73  virtual void onPacketDropped(Interest &interest,
74  ContentObject &content_object,
75  const std::error_code &ec) override = 0;
76  virtual void onReassemblyFailed(std::uint32_t missing_segment) override = 0;
77 
78  protected:
79  virtual void onContentObjectReceived(Interest &i, ContentObject &c,
80  std::error_code &ec) = 0;
81  virtual void onInterestTimeout(Interest::Ptr &i, const Name &n) = 0;
82 
83  virtual void sendInterest(const Name &interest_name,
84  std::array<uint32_t, MAX_AGGREGATED_INTEREST>
85  *additional_suffixes = nullptr,
86  uint32_t len = 0);
87 
88  template <typename FECHandler, typename AllocatorHandler>
89  void enableFEC(FECHandler &&fec_handler,
90  AllocatorHandler &&allocator_handler) {
91  if (!fec_decoder_) {
92  // Try to get FEC from environment
93  if (const char *fec_str = std::getenv("TRANSPORT_FEC_TYPE")) {
94  LOG(INFO) << "Using FEC " << fec_str;
95  fec_type_ = fec::FECUtils::fecTypeFromString(fec_str);
96  }
97 
98  if (fec_type_ == fec::FECType::UNKNOWN) {
99  return;
100  }
101 
102  fec_decoder_ = fec::FECUtils::getDecoder(
103  fec_type_, indexer_verifier_->getFirstSuffix());
104  fec_decoder_->setFECCallback(std::forward<FECHandler>(fec_handler));
105  fec_decoder_->setBufferCallback(
106  std::forward<AllocatorHandler>(allocator_handler));
107  indexer_verifier_->enableFec(fec_type_);
108  }
109  }
110 
111  virtual void reset();
112 
113  private:
114  // Consumer Callback
115  void onContentObject(Interest &i, ContentObject &c) override;
116  void onTimeout(Interest::Ptr &i, const Name &n) override;
117  void onError(std::error_code ec) override {}
118 
119  protected:
120  implementation::ConsumerSocket *socket_;
121  std::unique_ptr<Indexer> indexer_verifier_;
122  std::unique_ptr<Reassembly> reassembly_;
123  std::unique_ptr<fec::ConsumerFEC> fec_decoder_;
124  std::shared_ptr<core::Portal> portal_;
125  // True if it si the first time we schedule an interest
126  std::atomic<bool> is_first_;
127  interface::TransportStatistics *stats_;
128 
129  // Callbacks
130  interface::ConsumerInterestCallback *on_interest_retransmission_;
131  interface::ConsumerInterestCallback *on_interest_output_;
132  interface::ConsumerInterestCallback *on_interest_timeout_;
133  interface::ConsumerInterestCallback *on_interest_satisfied_;
134  interface::ConsumerContentObjectCallback *on_content_object_input_;
135  interface::ConsumerContentObjectCallback *on_content_object_;
136  interface::ConsumerTimerCallback *stats_summary_;
137  ReadCallback *on_payload_;
138 
139  bool is_async_;
140 
141  fec::FECType fec_type_;
142 
143  private:
144  std::atomic<bool> is_running_;
145 };
146 
147 } // end namespace protocol
148 } // end namespace transport
transport::interface::ConsumerSocket
Main interface for consumer applications.
Definition: socket_consumer.h:48
transport::core::Name
Definition: name.h:45
transport::core::ContentObject
Definition: content_object.h:29
transport::protocol::ContentObjectProcessingEventCallback
Definition: data_processing_events.h:24
transport::core::Interest
Definition: interest.h:30
transport
Definition: forwarder_config.h:32
transport::protocol::Indexer
Definition: indexer.h:35
transport::protocol::TransportProtocol
Definition: transport_protocol.h:41
transport::protocol::Reassembly
Definition: reassembly.h:35
transport::protocol::TransportProtocol::transportHeaderLength
virtual std::size_t transportHeaderLength()
Get the size of any additional header added by the specific transport implementation.
Definition: transport_protocol.h:67
transport::interface::Portal::ConsumerCallback
Definition: portal.h:68