17 #ifndef IGN_TRANSPORT_NODE_HH_
18 #define IGN_TRANSPORT_NODE_HH_
25 #include <unordered_set>
30 #pragma warning(push, 0)
32 #include <ignition/msgs.hh>
66 class PublisherPrivate;
96 public:
operator bool();
101 public:
bool Valid()
const;
106 public:
bool Publish(
const ProtoMsg &_msg);
111 private:
bool UpdateThrottling();
115 public:
bool HasConnections()
const;
122 private: std::shared_ptr<PublisherPrivate> dataPtr;
130 public:
virtual ~
Node();
141 const std::string &_topic,
144 return this->Advertise(_topic, T().GetTypeName(), _options);
159 const std::string &_msgTypeName,
162 std::string fullyQualifiedTopic;
164 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
166 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
170 auto currentTopics = this->AdvertisedTopics();
172 if (std::find(currentTopics.begin(), currentTopics.end(),
173 fullyQualifiedTopic) != currentTopics.end())
175 std::cerr <<
"Topic [" << _topic <<
"] already advertised. You cannot"
176 <<
" advertise the same topic twice on the same node."
177 <<
" If you want to advertise the same topic with different"
178 <<
" types, use separate nodes" << std::endl;
182 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
186 this->Shared()->myAddress,
187 this->Shared()->myControlAddress,
188 this->Shared()->pUuid, this->NodeUuid(), _msgTypeName, _options);
190 if (!this->Shared()->msgDiscovery->Advertise(publisher))
192 std::cerr <<
"Node::Advertise(): Error advertising a topic. "
193 <<
"Did you forget to start the discovery service?"
203 public: std::vector<std::string> AdvertisedTopics()
const;
215 const std::string &_topic,
216 void(*_cb)(
const T &_msg),
219 std::function<void(const T &, const MessageInfo &)> f =
222 (*_cb)(_internalMsg);
225 return this->Subscribe<T>(_topic, f, _opts);
237 const std::string &_topic,
238 std::function<
void(
const T &_msg)> &_cb,
241 std::function<void(const T &, const MessageInfo &)> f =
247 return this->Subscribe<T>(_topic, f, _opts);
260 public:
template<
typename C,
typename T>
bool Subscribe(
261 const std::string &_topic,
262 void(C::*_cb)(
const T &_msg),
266 std::function<void(const T &, const MessageInfo &)> f =
267 [_cb, _obj](
const T & _internalMsg,
270 auto cb = std::bind(_cb, _obj, std::placeholders::_1);
274 return this->Subscribe<T>(_topic, f, _opts);
288 const std::string &_topic,
289 void(*_cb)(
const T &_msg,
const MessageInfo &_info),
292 std::function<void(const T &, const MessageInfo &)> f =
293 [_cb](
const T & _internalMsg,
const MessageInfo &_internalInfo)
295 (*_cb)(_internalMsg, _internalInfo);
298 return this->Subscribe<T>(_topic, f, _opts);
311 const std::string &_topic,
312 std::function<
void(
const T &_msg,
const MessageInfo &_info)> &_cb,
315 std::string fullyQualifiedTopic;
317 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
319 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
324 std::shared_ptr<SubscriptionHandler<T>> subscrHandlerPtr(
328 subscrHandlerPtr->SetCallback(_cb);
330 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
336 this->Shared()->localSubscriptions.AddHandler(
337 fullyQualifiedTopic, this->NodeUuid(), subscrHandlerPtr);
340 this->TopicsSubscribed().insert(fullyQualifiedTopic);
343 if (!this->Shared()->msgDiscovery->Discover(fullyQualifiedTopic))
345 std::cerr <<
"Node::Subscribe(): Error discovering a topic. "
346 <<
"Did you forget to start the discovery service?"
365 public:
template<
typename C,
typename T>
bool Subscribe(
366 const std::string &_topic,
367 void(C::*_cb)(
const T &_msg,
const MessageInfo &_info),
371 std::function<void(const T &, const MessageInfo &)> f =
372 [_cb, _obj](
const T & _internalMsg,
const MessageInfo &_internalInfo)
374 auto cb = std::bind(_cb, _obj, std::placeholders::_1,
375 std::placeholders::_2);
376 cb(_internalMsg, _internalInfo);
379 return this->Subscribe<T>(_topic, f, _opts);
387 public: std::vector<std::string> SubscribedTopics()
const;
392 public:
bool Unsubscribe(
const std::string &_topic);
406 public:
template<
typename T1,
typename T2>
bool Advertise(
407 const std::string &_topic,
408 void(*_cb)(
const T1 &_req, T2 &_rep,
bool &_result),
411 std::function<void(const T1 &, T2 &, bool &)> f =
412 [_cb](
const T1 &_internalReq, T2 &_internalRep,
bool &_internalResult)
414 (*_cb)(_internalReq, _internalRep, _internalResult);
417 return this->Advertise<T1, T2>(_topic, f, _options);
432 const std::string &_topic,
433 void(*_cb)(T &_rep,
bool &_result),
436 std::function<void(const msgs::Empty &, T &, bool &)> f =
437 [_cb](
const msgs::Empty &, T &_internalRep,
438 bool &_internalResult)
440 (*_cb)(_internalRep, _internalResult);
442 return this->Advertise<msgs::Empty, T>(_topic, f, _options);
456 const std::string &_topic,
457 void(*_cb)(
const T &_req),
460 std::function<void(const T &, ignition::msgs::Empty &, bool &)> f =
461 [_cb](
const T &_internalReq, ignition::msgs::Empty &,
464 (*_cb)(_internalReq);
467 return this->Advertise<T, ignition::msgs::Empty>(_topic, f, _options);
482 public:
template<
typename T1,
typename T2>
bool Advertise(
483 const std::string &_topic,
484 std::function<
void(
const T1 &_req, T2 &_rep,
bool &_result)> &_cb,
487 std::string fullyQualifiedTopic;
489 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
491 std::cerr <<
"Service [" << _topic <<
"] is not valid." << std::endl;
496 std::shared_ptr<RepHandler<T1, T2>> repHandlerPtr(
500 repHandlerPtr->SetCallback(_cb);
502 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
505 this->SrvsAdvertised().insert(fullyQualifiedTopic);
511 this->Shared()->repliers.AddHandler(
512 fullyQualifiedTopic, this->NodeUuid(), repHandlerPtr);
516 this->Shared()->myReplierAddress,
517 this->Shared()->replierId.ToString(),
518 this->Shared()->pUuid, this->NodeUuid(),
519 T1().GetTypeName(), T2().GetTypeName(), _options);
521 if (!this->Shared()->srvDiscovery->Advertise(publisher))
523 std::cerr <<
"Node::Advertise(): Error advertising a service. "
524 <<
"Did you forget to start the discovery service?"
544 const std::string &_topic,
545 std::function<
void(T &_rep,
bool &_result)> &_cb,
548 std::function<void(const msgs::Empty &, T &, bool &)> f =
549 [_cb](
const msgs::Empty &, T &_internalRep,
550 bool &_internalResult)
552 (_cb)(_internalRep, _internalResult);
554 return this->Advertise<msgs::Empty, T>(_topic, f, _options);
568 const std::string &_topic,
569 std::function<
void(
const T &_req)> &_cb,
572 std::function<void(const T &, ignition::msgs::Empty &, bool &)> f =
573 [_cb](
const T &_internalReq, ignition::msgs::Empty &,
579 return this->Advertise<T, ignition::msgs::Empty>(_topic, f, _options);
595 public:
template<
typename C,
typename T1,
typename T2>
bool Advertise(
596 const std::string &_topic,
597 void(C::*_cb)(
const T1 &_req, T2 &_rep,
bool &_result),
601 std::function<void(const T1 &, T2 &, bool &)> f =
602 [_cb, _obj](
const T1 &_internalReq,
604 bool &_internalResult)
606 auto cb = std::bind(_cb, _obj, std::placeholders::_1,
607 std::placeholders::_2, std::placeholders::_3);
608 cb(_internalReq, _internalRep, _internalResult);
611 return this->Advertise<T1, T2>(_topic, f, _options);
626 public:
template<
typename C,
typename T>
bool Advertise(
627 const std::string &_topic,
628 void(C::*_cb)(T &_rep,
bool &_result),
632 std::function<void(const msgs::Empty &, T &, bool &)> f =
633 [_cb, _obj](
const msgs::Empty &, T &_internalRep,
634 bool &_internalResult)
636 auto cb = std::bind(_cb, _obj, std::placeholders::_1,
637 std::placeholders::_2);
638 cb(_internalRep, _internalResult);
641 return this->Advertise<msgs::Empty, T>(_topic, f, _options);
655 public:
template<
typename C,
typename T>
bool Advertise(
656 const std::string &_topic,
657 void(C::*_cb)(
const T &_req),
661 std::function<void(const T &, ignition::msgs::Empty &, bool &)> f =
662 [_cb, _obj](
const T &_internalReq,
663 ignition::msgs::Empty &,
666 auto cb = std::bind(_cb, _obj, std::placeholders::_1);
670 return this->Advertise<T, ignition::msgs::Empty>(_topic, f, _options);
675 public: std::vector<std::string> AdvertisedServices()
const;
687 public:
template<
typename T1,
typename T2>
bool Request(
688 const std::string &_topic,
690 void(*_cb)(
const T2 &_rep,
const bool _result))
692 std::function<void(const T2 &, const bool)> f =
693 [_cb](
const T2 &_internalRep,
const bool _internalResult)
695 (*_cb)(_internalRep, _internalResult);
698 return this->Request<T1, T2>(_topic, _req, f);
712 const std::string &_topic,
713 void(*_cb)(
const T &_rep,
const bool _result))
716 return this->Request(_topic, req, _cb);
729 public:
template<
typename T1,
typename T2>
bool Request(
730 const std::string &_topic,
732 std::function<
void(
const T2 &_rep,
const bool _result)> &_cb)
734 std::string fullyQualifiedTopic;
736 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
738 std::cerr <<
"Service [" << _topic <<
"] is not valid." << std::endl;
742 bool localResponserFound;
745 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
746 localResponserFound = this->Shared()->repliers.FirstHandler(
747 fullyQualifiedTopic, T1().GetTypeName(), T2().GetTypeName(),
752 if (localResponserFound)
757 repHandler->RunLocalCallback(_req, rep, result);
764 std::shared_ptr<ReqHandler<T1, T2>> reqHandlerPtr(
768 reqHandlerPtr->SetMessage(&_req);
771 reqHandlerPtr->SetCallback(_cb);
774 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
777 this->Shared()->requests.AddHandler(
778 fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
782 if (this->Shared()->srvDiscovery->Publishers(
783 fullyQualifiedTopic, addresses))
785 this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
786 T1().GetTypeName(), T2().GetTypeName());
791 if (!this->Shared()->srvDiscovery->Discover(fullyQualifiedTopic))
793 std::cerr <<
"Node::Request(): Error discovering a service. "
794 <<
"Did you forget to start the discovery service?"
815 const std::string &_topic,
816 std::function<
void(
const T &_rep,
const bool _result)> &_cb)
819 return this->Request(_topic, req, _cb);
833 public:
template<
typename C,
typename T1,
typename T2>
bool Request(
834 const std::string &_topic,
836 void(C::*_cb)(
const T2 &_rep,
const bool _result),
839 std::function<void(const T2 &, const bool)> f =
840 [_cb, _obj](
const T2 &_internalRep,
const bool _internalResult)
842 auto cb = std::bind(_cb, _obj, std::placeholders::_1,
843 std::placeholders::_2);
844 cb(_internalRep, _internalResult);
847 return this->Request<T1, T2>(_topic, _req, f);
861 public:
template<
typename C,
typename T>
bool Request(
862 const std::string &_topic,
863 void(C::*_cb)(
const T &_rep,
const bool _result),
867 return this->Request(_topic, req, _cb, _obj);
878 public:
template<
typename T1,
typename T2>
bool Request(
879 const std::string &_topic,
881 const unsigned int &_timeout,
885 std::string fullyQualifiedTopic;
887 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
889 std::cerr <<
"Service [" << _topic <<
"] is not valid." << std::endl;
894 std::shared_ptr<ReqHandler<T1, T2>> reqHandlerPtr(
898 reqHandlerPtr->SetMessage(&_req);
899 reqHandlerPtr->SetResponse(&_rep);
901 std::unique_lock<std::recursive_mutex> lk(this->Shared()->mutex);
905 if (this->Shared()->repliers.FirstHandler(fullyQualifiedTopic,
906 _req.GetTypeName(), _rep.GetTypeName(), repHandler))
909 repHandler->RunLocalCallback(_req, _rep, _result);
914 this->Shared()->requests.AddHandler(
915 fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
919 if (this->Shared()->srvDiscovery->Publishers(
920 fullyQualifiedTopic, addresses))
922 this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
923 _req.GetTypeName(), _rep.GetTypeName());
928 if (!this->Shared()->srvDiscovery->Discover(fullyQualifiedTopic))
930 std::cerr <<
"Node::Request(): Error discovering a service. "
931 <<
"Did you forget to start the discovery service?"
938 bool executed = reqHandlerPtr->WaitUntil(lk, _timeout);
945 if (!reqHandlerPtr->Result())
952 if (!_rep.ParseFromString(reqHandlerPtr->Response()))
954 std::cerr <<
"Node::Request(): Error Parsing the response"
973 const std::string &_topic,
974 const unsigned int &_timeout,
979 return this->Request(_topic, req, _timeout, _rep, _result);
986 public:
template<
typename T>
bool Request(
const std::string &_topic,
991 std::function<void(const ignition::msgs::Empty &, const bool)> f =
992 [](
const ignition::msgs::Empty &,
const bool)
996 return this->Request<T, ignition::msgs::Empty>(_topic, _req, f);
1002 public:
bool UnadvertiseSrv(
const std::string &_topic);
1010 public:
void TopicList(std::vector<std::string> &_topics)
const;
1016 public:
bool TopicInfo(
const std::string &_topic,
1017 std::vector<MessagePublisher> &_publishers)
const;
1025 public:
void ServiceList(std::vector<std::string> &_services)
const;
1031 public:
bool ServiceInfo(
const std::string &_service,
1032 std::vector<ServicePublisher> &_publishers)
const;
1036 private:
const std::string &Partition()
const;
1040 private:
const std::string &NameSpace()
const;
1049 private:
const std::string &NodeUuid()
const;
1053 private: std::unordered_set<std::string> &TopicsSubscribed()
const;
1057 private: std::unordered_set<std::string> &SrvsAdvertised()
const;
1065 private: std::unique_ptr<transport::NodePrivate> dataPtr;
static bool FullyQualifiedName(const std::string &_partition, const std::string &_ns, const std::string &_topic, std::string &_name)
Get the full topic path given a namespace and a topic name.
bool Request(const std::string &_topic, std::function< void(const T &_rep, const bool _result)> &_cb)
Request a new service without input parameter using a non-blocking call.
Definition: Node.hh:814
A class for customizing the publication options for a topic advertised.
Definition: AdvertiseOptions.hh:136
bool Request(const std::string &_topic, void(C::*_cb)(const T &_rep, const bool _result), C *_obj)
Request a new service without input parameter using a non-blocking call.
Definition: Node.hh:861
bool Request(const std::string &_topic, const T1 &_req, void(*_cb)(const T2 &_rep, const bool _result))
Request a new service using a non-blocking call.
Definition: Node.hh:687
A class for customizing the behavior of the Node.
Definition: NodeOptions.hh:35
bool Advertise(const std::string &_topic, void(C::*_cb)(const T1 &_req, T2 &_rep, bool &_result), C *_obj, const AdvertiseServiceOptions &_options=AdvertiseServiceOptions())
Advertise a new service.
Definition: Node.hh:595
bool Advertise(const std::string &_topic, std::function< void(const T &_req)> &_cb, const AdvertiseServiceOptions &_options=AdvertiseServiceOptions())
Advertise a new service without any output parameter.
Definition: Node.hh:567
It creates a reply handler for the specific protobuf messages used.
Definition: ReqHandler.hh:175
A class that is used to store information about an advertised publisher.
Definition: Node.hh:81
IGNITION_TRANSPORT_VISIBLE void waitForShutdown()
Block the current thread until a SIGINT or SIGTERM is received.
bool Advertise(const std::string &_topic, void(C::*_cb)(const T &_req), C *_obj, const AdvertiseServiceOptions &_options=AdvertiseServiceOptions())
Advertise a new service without any output parameter.
Definition: Node.hh:655
bool Subscribe(const std::string &_topic, void(*_cb)(const T &_msg), const SubscribeOptions &_opts=SubscribeOptions())
Subscribe to a topic registering a callback.
Definition: Node.hh:214
ignition/transport/SubscribeOptions.hh
Definition: SubscribeOptions.hh:35
Node::Publisher Advertise(const std::string &_topic, const AdvertiseMessageOptions &_options=AdvertiseMessageOptions())
Advertise a new topic.
Definition: Node.hh:140
bool Advertise(const std::string &_topic, std::function< void(T &_rep, bool &_result)> &_cb, const AdvertiseServiceOptions &_options=AdvertiseServiceOptions())
Advertise a new service without input parameter.
Definition: Node.hh:543
bool Subscribe(const std::string &_topic, std::function< void(const T &_msg)> &_cb, const SubscribeOptions &_opts=SubscribeOptions())
Subscribe to a topic registering a callback.
Definition: Node.hh:236
Private data for the Node class.
Definition: NodeShared.hh:54
Addresses_M< ServicePublisher > SrvAddresses_M
Definition: TransportTypes.hh:61
A class that provides information about the message received.
Definition: MessageInfo.hh:33
bool Request(const std::string &_topic, void(*_cb)(const T &_rep, const bool _result))
Request a new service without input parameter using a non-blocking call.
Definition: Node.hh:711
bool Advertise(const std::string &_topic, void(*_cb)(const T &_req), const AdvertiseServiceOptions &_options=AdvertiseServiceOptions())
Advertise a new service without any output parameter.
Definition: Node.hh:455
google::protobuf::Message ProtoMsg
Definition: TransportTypes.hh:65
std::shared_ptr< IRepHandler > IRepHandlerPtr
Definition: TransportTypes.hh:85
bool Request(const std::string &_topic, const T1 &_req, std::function< void(const T2 &_rep, const bool _result)> &_cb)
Request a new service using a non-blocking call.
Definition: Node.hh:729
bool Advertise(const std::string &_topic, void(*_cb)(T &_rep, bool &_result), const AdvertiseServiceOptions &_options=AdvertiseServiceOptions())
Advertise a new service without input parameter.
Definition: Node.hh:431
bool Advertise(const std::string &_topic, void(C::*_cb)(T &_rep, bool &_result), C *_obj, const AdvertiseServiceOptions &_options=AdvertiseServiceOptions())
Advertise a new service without input parameter.
Definition: Node.hh:626
bool Subscribe(const std::string &_topic, void(*_cb)(const T &_msg, const MessageInfo &_info), const SubscribeOptions &_opts=SubscribeOptions())
Subscribe to a topic registering a callback.
Definition: Node.hh:287
A class for customizing the publication options for a service advertised.
Definition: AdvertiseOptions.hh:224
bool Request(const std::string &_topic, const unsigned int &_timeout, T &_rep, bool &_result)
Request a new service without input parameter using a blocking call.
Definition: Node.hh:972
with the service response.
Definition: RepHandler.hh:102
#define IGNITION_TRANSPORT_VISIBLE
Use to represent "symbol visible" if supported.
Definition: Helpers.hh:57
A class that allows a client to communicate with other peers.
Definition: Node.hh:64
ignition/transport/Publisher.hh
Definition: Publisher.hh:198
ignition/transport/Publisher.hh
Definition: Publisher.hh:314
bool Subscribe(const std::string &_topic, void(C::*_cb)(const T &_msg, const MessageInfo &_info), C *_obj, const SubscribeOptions &_opts=SubscribeOptions())
Subscribe to a topic registering a callback.
Definition: Node.hh:365
bool Request(const std::string &_topic, const T &_req)
Request a new service without waiting for response.
Definition: Node.hh:986
bool Advertise(const std::string &_topic, std::function< void(const T1 &_req, T2 &_rep, bool &_result)> &_cb, const AdvertiseServiceOptions &_options=AdvertiseServiceOptions())
Advertise a new service.
Definition: Node.hh:482
It creates a subscription handler for a specific protobuf message.
Definition: SubscriptionHandler.hh:150
bool Advertise(const std::string &_topic, void(*_cb)(const T1 &_req, T2 &_rep, bool &_result), const AdvertiseServiceOptions &_options=AdvertiseServiceOptions())
Advertise a new service.
Definition: Node.hh:406
bool Request(const std::string &_topic, const T1 &_req, const unsigned int &_timeout, T2 &_rep, bool &_result)
Request a new service using a blocking call.
Definition: Node.hh:878
bool Request(const std::string &_topic, const T1 &_req, void(C::*_cb)(const T2 &_rep, const bool _result), C *_obj)
Request a new service using a non-blocking call.
Definition: Node.hh:833
bool Subscribe(const std::string &_topic, std::function< void(const T &_msg, const MessageInfo &_info)> &_cb, const SubscribeOptions &_opts=SubscribeOptions())
Subscribe to a topic registering a callback.
Definition: Node.hh:310
bool Subscribe(const std::string &_topic, void(C::*_cb)(const T &_msg), C *_obj, const SubscribeOptions &_opts=SubscribeOptions())
Subscribe to a topic registering a callback.
Definition: Node.hh:260
Node::Publisher Advertise(const std::string &_topic, const std::string &_msgTypeName, const AdvertiseMessageOptions &_options=AdvertiseMessageOptions())
Advertise a new topic.
Definition: Node.hh:158