18 #include <hicn/transport/auth/signer.h>
19 #include <hicn/transport/utils/event_thread.h>
20 #include <implementation/socket.h>
21 #include <protocols/prod_protocol_bytestream.h>
22 #include <protocols/prod_protocol_rtc.h>
23 #include <utils/content_store.h>
24 #include <utils/suffix_strategy.h>
28 #include <condition_variable>
33 #define REGISTRATION_NOT_ATTEMPTED 0
34 #define REGISTRATION_SUCCESS 1
35 #define REGISTRATION_FAILURE 2
36 #define REGISTRATION_IN_PROGRESS 3
39 namespace implementation {
42 using namespace interface;
47 std::shared_ptr<core::Portal> &&portal)
48 :
Socket(std::move(portal)),
49 producer_interface_(producer_socket),
50 data_packet_size_(default_values::content_object_packet_size),
51 content_object_expiry_time_(default_values::content_object_expiry_time),
53 making_manifest_(
false),
54 hash_algorithm_(auth::CryptoHashType::SHA256),
55 suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL),
56 on_interest_input_(VOID_HANDLER),
57 on_interest_dropped_input_buffer_(VOID_HANDLER),
58 on_interest_inserted_input_buffer_(VOID_HANDLER),
59 on_interest_satisfied_output_buffer_(VOID_HANDLER),
60 on_interest_process_(VOID_HANDLER),
61 on_new_segment_(VOID_HANDLER),
62 on_content_object_to_sign_(VOID_HANDLER),
63 on_content_object_in_output_buffer_(VOID_HANDLER),
64 on_content_object_output_(VOID_HANDLER),
65 on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
66 on_content_produced_(VOID_HANDLER) {
68 case ProductionProtocolAlgorithms::RTC_PROD:
69 production_protocol_ =
70 std::make_unique<protocol::RTCProductionProtocol>(
this);
72 case ProductionProtocolAlgorithms::BYTE_STREAM:
74 production_protocol_ =
75 std::make_unique<protocol::ByteStreamProductionProtocol>(
this);
82 :
ProducerSocket(producer, protocol, std::make_shared<core::Portal>()) {}
85 asio::io_service &io_service)
87 std::make_shared<core::Portal>(io_service)) {
94 return producer_interface_;
98 producer_interface_ = producer_socket;
101 void connect()
override {
102 portal_->connect(
false);
103 production_protocol_->start();
106 bool isRunning()
override {
return !production_protocol_->isRunning(); };
108 virtual void asyncProduce(
Name content_name,
109 std::unique_ptr<utils::MemBuf> &&buffer,
110 bool is_last, uint32_t offset,
111 uint32_t **last_segment =
nullptr) {
112 if (!async_thread_.stopped()) {
113 auto a = buffer.release();
114 async_thread_.add([
this, content_name, a, is_last, offset,
116 auto buf = std::unique_ptr<utils::MemBuf>(a);
117 if (last_segment != NULL) {
118 **last_segment = offset + produceStream(content_name, std::move(buf),
121 produceStream(content_name, std::move(buf), is_last, offset);
// Produce a stream of content from an owned memory buffer.
//
// Forwards content_name, the moved-from buffer, is_last and start_offset to
// production_protocol_->produceStream() and returns whatever it returns.
// start_offset defaults to 0.
// NOTE(review): the declaration of the is_last parameter is not visible in
// this excerpt; confirm its type and default against the full file.
127 virtual uint32_t produceStream(
const Name &content_name,
128 std::unique_ptr<utils::MemBuf> &&buffer,
130 uint32_t start_offset = 0) {
131 return production_protocol_->produceStream(content_name, std::move(buffer),
132 is_last, start_offset);
// Produce a stream of content from a raw byte buffer.
//
// Forwards content_name, buffer, buffer_size, is_last and start_offset to
// production_protocol_->produceStream() and returns whatever it returns.
// start_offset defaults to 0.
// NOTE(review): the declaration of the is_last parameter is not visible in
// this excerpt; confirm its type and default against the full file.
135 virtual uint32_t produceStream(
const Name &content_name,
136 const uint8_t *buffer,
size_t buffer_size,
138 uint32_t start_offset = 0) {
139 return production_protocol_->produceStream(
140 content_name, buffer, buffer_size, is_last, start_offset);
143 virtual uint32_t produceDatagram(
const Name &content_name,
144 std::unique_ptr<utils::MemBuf> &&buffer) {
145 return production_protocol_->produceDatagram(content_name,
149 virtual uint32_t produceDatagram(
const Name &content_name,
150 const uint8_t *buffer,
size_t buffer_size) {
151 return production_protocol_->produceDatagram(content_name, buffer,
156 production_protocol_->produce(content_object);
159 void registerPrefix(
const Prefix &producer_namespace) {
160 production_protocol_->registerNamespaceWithNetwork(producer_namespace);
163 void stop() { production_protocol_->stop(); }
// Set a 32-bit unsigned socket option.
//
// Visible keys:
//  - DATA_PACKET_SIZE: stored in data_packet_size_ only when the value is
//    greater than 0 and not larger than
//    default_values::max_content_object_size.
//  - OUTPUT_BUFFER_SIZE: forwarded to the production protocol.
//  - CONTENT_OBJECT_EXPIRY_TIME: stored in content_object_expiry_time_.
// Returns SOCKET_OPTION_SET or SOCKET_OPTION_NOT_SET.
// NOTE(review): the control flow after a rejected DATA_PACKET_SIZE value is
// not visible in this excerpt; confirm what is returned in that case.
165 virtual int setSocketOption(
int socket_option_key,
166 uint32_t socket_option_value) {
167 switch (socket_option_key) {
168 case GeneralTransportOptions::DATA_PACKET_SIZE:
// Accept only sizes in the range (0, max_content_object_size].
169 if (socket_option_value <= default_values::max_content_object_size &&
170 socket_option_value > 0) {
171 data_packet_size_ = socket_option_value;
175 case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
176 production_protocol_->setOutputBufferSize(socket_option_value);
179 case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
180 content_object_expiry_time_ = socket_option_value;
184 return SOCKET_OPTION_NOT_SET;
187 return SOCKET_OPTION_SET;
// Clear a callback by passing nullptr as the option value.
//
// The work is routed through rescheduleOnIOService(). For each visible key
// the matching callback member is assigned VOID_HANDLER when the received
// value compares equal to VOID_HANDLER.
// Returns SOCKET_OPTION_SET or SOCKET_OPTION_NOT_SET.
// NOTE(review): no case for ProducerCallbacksOptions::CONTENT_PRODUCED is
// visible here although on_content_produced_ has a setter elsewhere in this
// class; confirm whether that callback can be cleared through this overload.
190 virtual int setSocketOption(
int socket_option_key,
191 std::nullptr_t socket_option_value) {
194 return rescheduleOnIOService(
195 socket_option_key, socket_option_value,
196 [
this](
int socket_option_key,
// The nullptr argument is received as a ProducerContentObjectCallback.
197 ProducerContentObjectCallback socket_option_value) ->
int {
198 switch (socket_option_key) {
199 case ProducerCallbacksOptions::INTEREST_INPUT:
200 if (socket_option_value == VOID_HANDLER) {
201 on_interest_input_ = VOID_HANDLER;
205 case ProducerCallbacksOptions::INTEREST_DROP:
206 if (socket_option_value == VOID_HANDLER) {
207 on_interest_dropped_input_buffer_ = VOID_HANDLER;
211 case ProducerCallbacksOptions::INTEREST_PASS:
212 if (socket_option_value == VOID_HANDLER) {
213 on_interest_inserted_input_buffer_ = VOID_HANDLER;
217 case ProducerCallbacksOptions::CACHE_HIT:
218 if (socket_option_value == VOID_HANDLER) {
219 on_interest_satisfied_output_buffer_ = VOID_HANDLER;
223 case ProducerCallbacksOptions::CACHE_MISS:
224 if (socket_option_value == VOID_HANDLER) {
225 on_interest_process_ = VOID_HANDLER;
229 case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
230 if (socket_option_value == VOID_HANDLER) {
231 on_new_segment_ = VOID_HANDLER;
235 case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
236 if (socket_option_value == VOID_HANDLER) {
237 on_content_object_in_output_buffer_ = VOID_HANDLER;
241 case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
242 if (socket_option_value == VOID_HANDLER) {
243 on_content_object_output_ = VOID_HANDLER;
247 case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN:
248 if (socket_option_value == VOID_HANDLER) {
249 on_content_object_to_sign_ = VOID_HANDLER;
254 return SOCKET_OPTION_NOT_SET;
257 return SOCKET_OPTION_SET;
261 virtual int setSocketOption(
int socket_option_key,
bool socket_option_value) {
262 switch (socket_option_key) {
263 case GeneralTransportOptions::MAKE_MANIFEST:
264 making_manifest_ = socket_option_value;
268 return SOCKET_OPTION_NOT_SET;
271 return SOCKET_OPTION_SET;
274 virtual int setSocketOption(
int socket_option_key,
275 Name *socket_option_value) {
276 return SOCKET_OPTION_NOT_SET;
279 virtual int setSocketOption(
280 int socket_option_key,
281 interface::ProducerContentObjectCallback socket_option_value) {
284 return rescheduleOnIOService(
285 socket_option_key, socket_option_value,
286 [
this](
int socket_option_key,
287 ProducerContentObjectCallback socket_option_value) ->
int {
288 switch (socket_option_key) {
289 case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
290 on_new_segment_ = socket_option_value;
293 case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
294 on_content_object_in_output_buffer_ = socket_option_value;
297 case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
298 on_content_object_output_ = socket_option_value;
301 case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN:
302 on_content_object_to_sign_ = socket_option_value;
306 return SOCKET_OPTION_NOT_SET;
309 return SOCKET_OPTION_SET;
313 virtual int setSocketOption(
314 int socket_option_key,
315 interface::ProducerInterestCallback socket_option_value) {
318 return rescheduleOnIOService(
319 socket_option_key, socket_option_value,
320 [
this](
int socket_option_key,
321 ProducerInterestCallback socket_option_value) ->
int {
322 switch (socket_option_key) {
323 case ProducerCallbacksOptions::INTEREST_INPUT:
324 on_interest_input_ = socket_option_value;
327 case ProducerCallbacksOptions::INTEREST_DROP:
328 on_interest_dropped_input_buffer_ = socket_option_value;
331 case ProducerCallbacksOptions::INTEREST_PASS:
332 on_interest_inserted_input_buffer_ = socket_option_value;
335 case ProducerCallbacksOptions::CACHE_HIT:
336 on_interest_satisfied_output_buffer_ = socket_option_value;
339 case ProducerCallbacksOptions::CACHE_MISS:
340 on_interest_process_ = socket_option_value;
344 return SOCKET_OPTION_NOT_SET;
347 return SOCKET_OPTION_SET;
351 virtual int setSocketOption(
352 int socket_option_key,
353 interface::ProducerContentCallback socket_option_value) {
356 return rescheduleOnIOService(
357 socket_option_key, socket_option_value,
358 [
this](
int socket_option_key,
359 ProducerContentCallback socket_option_value) ->
int {
360 switch (socket_option_key) {
361 case ProducerCallbacksOptions::CONTENT_PRODUCED:
362 on_content_produced_ = socket_option_value;
366 return SOCKET_OPTION_NOT_SET;
369 return SOCKET_OPTION_SET;
373 virtual int setSocketOption(
int socket_option_key,
374 auth::CryptoHashType socket_option_value) {
375 switch (socket_option_key) {
376 case GeneralTransportOptions::HASH_ALGORITHM:
377 hash_algorithm_ = socket_option_value;
380 return SOCKET_OPTION_NOT_SET;
383 return SOCKET_OPTION_SET;
386 virtual int setSocketOption(
387 int socket_option_key,
388 core::NextSegmentCalculationStrategy socket_option_value) {
389 switch (socket_option_key) {
390 case GeneralTransportOptions::SUFFIX_STRATEGY:
391 suffix_strategy_ = socket_option_value;
394 return SOCKET_OPTION_NOT_SET;
397 return SOCKET_OPTION_SET;
// Set the signer used by this producer.
//
// For GeneralTransportOptions::SIGNER the shared pointer is copied into
// signer_. Returns SOCKET_OPTION_SET or SOCKET_OPTION_NOT_SET.
// NOTE(review): the SIGNER case opens its own scope and some of its
// statements are not visible in this excerpt; check the full file for any
// locking or reset performed around the assignment.
400 virtual int setSocketOption(
401 int socket_option_key,
402 const std::shared_ptr<auth::Signer> &socket_option_value) {
403 switch (socket_option_key) {
404 case GeneralTransportOptions::SIGNER: {
407 signer_ = socket_option_value;
410 return SOCKET_OPTION_NOT_SET;
413 return SOCKET_OPTION_SET;
416 virtual int getSocketOption(
int socket_option_key,
417 uint32_t &socket_option_value) {
418 switch (socket_option_key) {
419 case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
420 socket_option_value = production_protocol_->getOutputBufferSize();
423 case GeneralTransportOptions::DATA_PACKET_SIZE:
424 socket_option_value = (uint32_t)data_packet_size_;
427 case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
428 socket_option_value = content_object_expiry_time_;
432 return SOCKET_OPTION_NOT_SET;
435 return SOCKET_OPTION_GET;
438 virtual int getSocketOption(
int socket_option_key,
439 bool &socket_option_value) {
440 switch (socket_option_key) {
441 case GeneralTransportOptions::MAKE_MANIFEST:
442 socket_option_value = making_manifest_;
445 case GeneralTransportOptions::ASYNC_MODE:
446 socket_option_value = is_async_;
450 return SOCKET_OPTION_NOT_GET;
453 return SOCKET_OPTION_GET;
456 virtual int getSocketOption(
457 int socket_option_key,
458 interface::ProducerContentObjectCallback **socket_option_value) {
461 return rescheduleOnIOService(
462 socket_option_key, socket_option_value,
463 [
this](
int socket_option_key,
464 ProducerContentObjectCallback **socket_option_value) ->
int {
465 switch (socket_option_key) {
466 case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
467 *socket_option_value = &on_new_segment_;
470 case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
471 *socket_option_value = &on_content_object_in_output_buffer_;
474 case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
475 *socket_option_value = &on_content_object_output_;
478 case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN:
479 *socket_option_value = &on_content_object_to_sign_;
483 return SOCKET_OPTION_NOT_GET;
486 return SOCKET_OPTION_GET;
490 virtual int getSocketOption(
491 int socket_option_key,
492 interface::ProducerContentCallback **socket_option_value) {
495 return rescheduleOnIOService(
496 socket_option_key, socket_option_value,
497 [
this](
int socket_option_key,
498 ProducerContentCallback **socket_option_value) ->
int {
499 switch (socket_option_key) {
500 case ProducerCallbacksOptions::CONTENT_PRODUCED:
501 *socket_option_value = &on_content_produced_;
505 return SOCKET_OPTION_NOT_GET;
508 return SOCKET_OPTION_GET;
512 virtual int getSocketOption(
513 int socket_option_key,
514 interface::ProducerInterestCallback **socket_option_value) {
517 return rescheduleOnIOService(
518 socket_option_key, socket_option_value,
519 [
this](
int socket_option_key,
520 ProducerInterestCallback **socket_option_value) ->
int {
521 switch (socket_option_key) {
522 case ProducerCallbacksOptions::INTEREST_INPUT:
523 *socket_option_value = &on_interest_input_;
526 case ProducerCallbacksOptions::INTEREST_DROP:
527 *socket_option_value = &on_interest_dropped_input_buffer_;
530 case ProducerCallbacksOptions::INTEREST_PASS:
531 *socket_option_value = &on_interest_inserted_input_buffer_;
534 case ProducerCallbacksOptions::CACHE_HIT:
535 *socket_option_value = &on_interest_satisfied_output_buffer_;
538 case ProducerCallbacksOptions::CACHE_MISS:
539 *socket_option_value = &on_interest_process_;
543 return SOCKET_OPTION_NOT_GET;
546 return SOCKET_OPTION_GET;
// Retrieve the portal used by this socket.
//
// On a matching key the shared pointer portal_ is copied into
// socket_option_value. Returns SOCKET_OPTION_GET or SOCKET_OPTION_NOT_GET.
// NOTE(review): the case label selecting the portal option is not visible in
// this excerpt; confirm the accepted key against the full file.
550 virtual int getSocketOption(
551 int socket_option_key,
552 std::shared_ptr<core::Portal> &socket_option_value) {
553 switch (socket_option_key) {
555 socket_option_value = portal_;
558 return SOCKET_OPTION_NOT_GET;
562 return SOCKET_OPTION_GET;
565 virtual int getSocketOption(
int socket_option_key,
566 auth::CryptoHashType &socket_option_value) {
567 switch (socket_option_key) {
568 case GeneralTransportOptions::HASH_ALGORITHM:
569 socket_option_value = hash_algorithm_;
572 return SOCKET_OPTION_NOT_GET;
575 return SOCKET_OPTION_GET;
578 virtual int getSocketOption(
579 int socket_option_key,
580 core::NextSegmentCalculationStrategy &socket_option_value) {
581 switch (socket_option_key) {
582 case GeneralTransportOptions::SUFFIX_STRATEGY:
583 socket_option_value = suffix_strategy_;
586 return SOCKET_OPTION_NOT_GET;
588 return SOCKET_OPTION_GET;
// Retrieve the signer used by this producer.
//
// For GeneralTransportOptions::SIGNER the shared pointer signer_ is copied
// into socket_option_value. Returns SOCKET_OPTION_GET or
// SOCKET_OPTION_NOT_GET.
// NOTE(review): the SIGNER case opens its own scope and one of its
// statements is not visible in this excerpt; check the full file for any
// locking performed around the read.
591 virtual int getSocketOption(
592 int socket_option_key,
593 std::shared_ptr<auth::Signer> &socket_option_value) {
594 switch (socket_option_key) {
595 case GeneralTransportOptions::SIGNER: {
597 socket_option_value = signer_;
600 return SOCKET_OPTION_NOT_GET;
603 return SOCKET_OPTION_GET;
606 virtual int setSocketOption(
int socket_option_key,
607 const std::string &socket_option_value) {
608 return SOCKET_OPTION_NOT_SET;
// Run an option handler, passing the option value by reference.
//
// lambda_func is first converted to std::function<int(int, arg2 &)>, so it
// must be callable with that signature. When a production protocol exists
// and reports that it is running, the call is dispatched on the portal's
// io_service; the dispatched closure takes a lock on mtx before storing the
// handler's result, and the calling side takes the same lock afterwards.
// The result variable starts as SOCKET_OPTION_SET and is overwritten by the
// handler's return value.
// NOTE(review): the declarations of mtx and done, the wait on cv and the
// final return are not visible in this excerpt; confirm the hand-off logic
// against the full file.
613 template <
typename Lambda,
typename arg2>
614 int rescheduleOnIOServiceWithReference(
int socket_option_key,
615 arg2 &socket_option_value,
616 Lambda lambda_func) {
618 std::function<int(
int, arg2 &)> func = lambda_func;
619 int result = SOCKET_OPTION_SET;
620 if (production_protocol_ && production_protocol_->isRunning()) {
623 std::condition_variable cv;
// Everything is captured by reference, so the closure must finish before
// this function returns.
626 portal_->getIoService().dispatch([&socket_option_key,
627 &socket_option_value, &mtx, &cv,
628 &result, &done, &func]() {
629 std::unique_lock<std::mutex> lck(mtx);
631 result = func(socket_option_key, socket_option_value);
634 std::unique_lock<std::mutex> lck(mtx);
// Presumably the branch taken when the protocol is not running: the handler
// is invoked directly by the caller -- confirm against the full file.
639 result = func(socket_option_key, socket_option_value);
// Run an option handler, passing the option value by value.
//
// lambda_func is first converted to std::function<int(int, arg2)>, so it
// must be callable with that signature. When a production protocol exists
// and reports that it is running, the call is dispatched on the portal's
// io_service; the dispatched closure takes a lock on mtx before storing the
// handler's result, and the calling side takes the same lock afterwards.
// The result variable starts as SOCKET_OPTION_SET and is overwritten by the
// handler's return value.
// NOTE(review): the declarations of mtx and done, the wait on cv and the
// final return are not visible in this excerpt; confirm the hand-off logic
// against the full file.
647 template <
typename Lambda,
typename arg2>
648 int rescheduleOnIOService(
int socket_option_key, arg2 socket_option_value,
649 Lambda lambda_func) {
651 std::function<int(
int, arg2)> func = lambda_func;
652 int result = SOCKET_OPTION_SET;
653 if (production_protocol_ && production_protocol_->isRunning()) {
656 std::condition_variable cv;
// Everything is captured by reference, so the closure must finish before
// this function returns.
658 portal_->getIoService().dispatch([&socket_option_key,
659 &socket_option_value, &mtx, &cv,
660 &result, &done, &func]() {
661 std::unique_lock<std::mutex> lck(mtx);
663 result = func(socket_option_key, socket_option_value);
666 std::unique_lock<std::mutex> lck(mtx);
// Presumably the branch taken when the protocol is not running: the handler
// is invoked directly by the caller -- confirm against the full file.
671 result = func(socket_option_key, socket_option_value);
680 asio::io_service io_service_;
681 std::atomic<size_t> data_packet_size_;
682 std::atomic<uint32_t> content_object_expiry_time_;
686 std::atomic<bool> making_manifest_;
687 std::atomic<auth::CryptoHashType> hash_algorithm_;
688 std::atomic<auth::CryptoSuite> crypto_suite_;
690 std::shared_ptr<auth::Signer> signer_;
691 core::NextSegmentCalculationStrategy suffix_strategy_;
693 std::unique_ptr<protocol::ProductionProtocol> production_protocol_;
696 ProducerInterestCallback on_interest_input_;
697 ProducerInterestCallback on_interest_dropped_input_buffer_;
698 ProducerInterestCallback on_interest_inserted_input_buffer_;
699 ProducerInterestCallback on_interest_satisfied_output_buffer_;
700 ProducerInterestCallback on_interest_process_;
702 ProducerContentObjectCallback on_new_segment_;
703 ProducerContentObjectCallback on_content_object_to_sign_;
704 ProducerContentObjectCallback on_content_object_in_output_buffer_;
705 ProducerContentObjectCallback on_content_object_output_;
706 ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_;
708 ProducerContentCallback on_content_produced_;