Hybrid ICN (hICN) plugin  v21.06-rc0-4-g18fa668
production_protocol.h
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include <hicn/transport/interfaces/callbacks.h>
19 #include <hicn/transport/interfaces/socket_producer.h>
20 #include <hicn/transport/interfaces/statistics.h>
21 #include <hicn/transport/utils/object_pool.h>
22 #include <implementation/socket.h>
23 #include <protocols/fec_base.h>
24 #include <protocols/fec_utils.h>
25 #include <utils/content_store.h>
26 
27 #include <atomic>
28 #include <thread>
29 
30 namespace transport {
31 
32 namespace protocol {
33 
34 using namespace core;
35 
37  public:
39  virtual ~ProductionProtocol();
40 
41  bool isRunning() { return is_running_; }
42 
43  virtual int start();
44  virtual void stop();
45 
46  virtual void produce(ContentObject &content_object);
47  virtual uint32_t produceStream(const Name &content_name,
48  std::unique_ptr<utils::MemBuf> &&buffer,
49  bool is_last = true,
50  uint32_t start_offset = 0) = 0;
51  virtual uint32_t produceStream(const Name &content_name,
52  const uint8_t *buffer, size_t buffer_size,
53  bool is_last = true,
54  uint32_t start_offset = 0) = 0;
55  virtual uint32_t produceDatagram(const Name &content_name,
56  std::unique_ptr<utils::MemBuf> &&buffer) = 0;
57  virtual uint32_t produceDatagram(const Name &content_name,
58  const uint8_t *buffer,
59  size_t buffer_size) = 0;
60 
61  void setOutputBufferSize(std::size_t size) { output_buffer_.setLimit(size); }
62  std::size_t getOutputBufferSize() { return output_buffer_.getLimit(); }
63 
64  virtual void registerNamespaceWithNetwork(const Prefix &producer_namespace);
65  const std::list<Prefix> &getNamespaces() const { return served_namespaces_; }
66 
67  protected:
68  // Producer callback
69  virtual void onInterest(core::Interest &i) override = 0;
70  virtual void onError(std::error_code ec) override{};
71 
72  template <typename FECHandler, typename AllocatorHandler>
73  void enableFEC(FECHandler &&fec_handler,
74  AllocatorHandler &&allocator_handler) {
75  if (!fec_encoder_) {
76  // Try to get FEC from environment
77  if (const char *fec_str = std::getenv("TRANSPORT_FEC_TYPE")) {
78  LOG(INFO) << "Using FEC " << fec_str;
79  fec_type_ = fec::FECUtils::fecTypeFromString(fec_str);
80  }
81 
82  if (fec_type_ == fec::FECType::UNKNOWN) {
83  return;
84  }
85 
86  fec_encoder_ = fec::FECUtils::getEncoder(fec_type_, 1);
87  fec_encoder_->setFECCallback(std::forward<FECHandler>(fec_handler));
88  fec_encoder_->setBufferCallback(
89  std::forward<AllocatorHandler>(allocator_handler));
90  }
91  }
92 
93  protected:
95 
96  // Thread pool responsible for IO operations (send data / receive interests)
97  std::vector<utils::EventThread> io_threads_;
98 
99  // TODO remove this thread
100  std::thread listening_thread_;
101  std::shared_ptr<Portal> portal_;
102  std::atomic<bool> is_running_;
104  std::unique_ptr<fec::ProducerFEC> fec_encoder_;
105 
106  // Callbacks
107  interface::ProducerInterestCallback *on_interest_input_;
108  interface::ProducerInterestCallback *on_interest_dropped_input_buffer_;
109  interface::ProducerInterestCallback *on_interest_inserted_input_buffer_;
110  interface::ProducerInterestCallback *on_interest_satisfied_output_buffer_;
111  interface::ProducerInterestCallback *on_interest_process_;
112 
113  interface::ProducerContentObjectCallback *on_new_segment_;
114  interface::ProducerContentObjectCallback *on_content_object_to_sign_;
115  interface::ProducerContentObjectCallback *on_content_object_in_output_buffer_;
116  interface::ProducerContentObjectCallback *on_content_object_output_;
117  interface::ProducerContentObjectCallback
118  *on_content_object_evicted_from_output_buffer_;
119 
120  interface::ProducerContentCallback *on_content_produced_;
121 
122  // Output buffer
123  utils::ContentStore output_buffer_;
124 
125  // List ot routes served by current producer protocol
126  std::list<Prefix> served_namespaces_;
127 
128  // Signature and manifest
129  std::shared_ptr<auth::Signer> signer_;
130  bool making_manifest_;
131 
132  bool is_async_;
133  fec::FECType fec_type_;
134 };
135 
136 } // end namespace protocol
137 } // end namespace transport
transport::core::Name
Definition: name.h:45
transport::protocol::ProductionProtocol
Definition: production_protocol.h:36
utils::ContentStore
Definition: content_store.h:47
transport::interface::ProducerSocket
Definition: socket_producer.h:37
transport::core::ContentObject
Definition: content_object.h:29
transport::core::Interest
Definition: interest.h:30
transport
Definition: forwarder_config.h:32
transport::interface::Portal::ProducerCallback
Definition: portal.h:79
transport::core::Prefix
Definition: prefix.h:24
transport::interface::ProductionStatistics
Definition: statistics.h:34