17 #ifndef IGN_TRANSPORT_NODE_HH_ 18 #define IGN_TRANSPORT_NODE_HH_ 21 #pragma warning(push, 0) 23 #include <google/protobuf/message.h> 34 #include <unordered_set> 39 #pragma warning(push, 0) 41 #include <ignition/msgs.hh> 94 public:
explicit PublisherId(
const std::string &_topic);
99 public:
operator bool();
105 public:
bool Valid()
const;
110 public: std::string Topic()
const;
113 private: std::string topic =
"";
121 public:
virtual ~
Node();
132 const std::string &_topic,
135 return this->Advertise(_topic, T().GetTypeName(), _options);
150 const std::string &_msgTypeName,
153 std::string fullyQualifiedTopic;
155 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
157 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
161 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
163 auto currentTopics = this->TopicsAdvertised();
165 if (currentTopics.find(fullyQualifiedTopic) != currentTopics.end())
167 std::cerr <<
"Topic [" << _topic <<
"] already advertised. You cannot" 168 <<
" advertise the same topic twice on the same node." 169 <<
" If you want to advertise the same topic with different" 170 <<
" types, use separate nodes" << std::endl;
175 this->TopicsAdvertised().insert(fullyQualifiedTopic);
179 this->Shared()->myAddress,
180 this->Shared()->myControlAddress,
181 this->Shared()->pUuid, this->NodeUuid(), _options.Scope(),
184 if (!this->Shared()->msgDiscovery->Advertise(publisher))
186 std::cerr <<
"Node::Advertise(): Error advertising a topic. " 187 <<
"Did you forget to start the discovery service?" 197 public: std::vector<std::string> AdvertisedTopics()
const;
202 public:
bool Unadvertise(
const std::string &_topic);
208 public:
bool Publish(
const std::string &_topic,
227 const std::string &_topic,
228 void(*_cb)(
const T &_msg))
230 std::function<void(const T &)> f = [_cb](
const T & _internalMsg)
232 (*_cb)(_internalMsg);
235 return this->Subscribe<T>(_topic, f);
245 const std::string &_topic,
246 std::function<
void(
const T &_msg)> &_cb)
248 std::string fullyQualifiedTopic;
250 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
252 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
257 std::shared_ptr<SubscriptionHandler<T>> subscrHandlerPtr(
261 subscrHandlerPtr->SetCallback(_cb);
263 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
269 this->Shared()->localSubscriptions.AddHandler(
270 fullyQualifiedTopic, this->NodeUuid(), subscrHandlerPtr);
273 this->TopicsSubscribed().insert(fullyQualifiedTopic);
276 if (!this->Shared()->msgDiscovery->Discover(fullyQualifiedTopic))
278 std::cerr <<
"Node::Subscribe(): Error discovering a topic. " 279 <<
"Did you forget to start the discovery service?" 295 public:
template<
typename C,
typename T>
bool Subscribe(
296 const std::string &_topic,
297 void(C::*_cb)(
const T &_msg),
300 std::function<void(const T &)> f = [_cb, _obj](
const T & _internalMsg)
302 auto cb = std::bind(_cb, _obj, std::placeholders::_1);
306 return this->Subscribe<T>(_topic, f);
314 public: std::vector<std::string> SubscribedTopics()
const;
319 public:
bool Unsubscribe(
const std::string &_topic);
333 public:
template<
typename T1,
typename T2>
bool Advertise(
334 const std::string &_topic,
335 void(*_cb)(
const T1 &_req, T2 &_rep,
bool &_result),
338 std::function<void(const T1 &, T2 &, bool &)> f =
339 [_cb](
const T1 &_internalReq, T2 &_internalRep,
bool &_internalResult)
341 (*_cb)(_internalReq, _internalRep, _internalResult);
344 return this->Advertise<T1, T2>(_topic, f, _options);
359 const std::string &_topic,
360 void(*_cb)(T &_rep,
bool &_result),
363 std::function<void(const msgs::Empty &, T &, bool &)> f =
364 [_cb](
const msgs::Empty &, T &_internalRep,
365 bool &_internalResult)
367 (*_cb)(_internalRep, _internalResult);
369 return this->Advertise<msgs::Empty, T>(_topic, f, _options);
383 const std::string &_topic,
384 void(*_cb)(
const T &_req),
387 std::function<void(const T &, ignition::msgs::Empty &, bool &)> f =
388 [_cb](
const T &_internalReq, ignition::msgs::Empty &,
391 (*_cb)(_internalReq);
394 return this->Advertise<T, ignition::msgs::Empty>(_topic, f, _options);
409 public:
template<
typename T1,
typename T2>
bool Advertise(
410 const std::string &_topic,
411 std::function<
void(
const T1 &_req, T2 &_rep,
bool &_result)> &_cb,
414 std::string fullyQualifiedTopic;
416 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
418 std::cerr <<
"Service [" << _topic <<
"] is not valid." << std::endl;
423 std::shared_ptr<RepHandler<T1, T2>> repHandlerPtr(
427 repHandlerPtr->SetCallback(_cb);
429 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
432 this->SrvsAdvertised().insert(fullyQualifiedTopic);
438 this->Shared()->repliers.AddHandler(
439 fullyQualifiedTopic, this->NodeUuid(), repHandlerPtr);
443 this->Shared()->myReplierAddress,
444 this->Shared()->replierId.ToString(),
445 this->Shared()->pUuid, this->NodeUuid(), _options.Scope(),
446 T1().GetTypeName(), T2().GetTypeName());
448 if (!this->Shared()->srvDiscovery->Advertise(publisher))
450 std::cerr <<
"Node::Advertise(): Error advertising a service. " 451 <<
"Did you forget to start the discovery service?" 471 const std::string &_topic,
472 std::function<
void(T &_rep,
bool &_result)> &_cb,
475 std::function<void(const msgs::Empty &, T &, bool &)> f =
476 [_cb](
const msgs::Empty &, T &_internalRep,
477 bool &_internalResult)
479 (_cb)(_internalRep, _internalResult);
481 return this->Advertise<msgs::Empty, T>(_topic, f, _options);
495 const std::string &_topic,
496 std::function<
void(
const T &_req)> &_cb,
499 std::function<void(const T &, ignition::msgs::Empty &, bool &)> f =
500 [_cb](
const T &_internalReq, ignition::msgs::Empty &,
506 return this->Advertise<T, ignition::msgs::Empty>(_topic, f, _options);
522 public:
template<
typename C,
typename T1,
typename T2>
bool Advertise(
523 const std::string &_topic,
524 void(C::*_cb)(
const T1 &_req, T2 &_rep,
bool &_result),
528 std::function<void(const T1 &, T2 &, bool &)> f =
529 [_cb, _obj](
const T1 &_internalReq,
531 bool &_internalResult)
533 auto cb = std::bind(_cb, _obj, std::placeholders::_1,
534 std::placeholders::_2, std::placeholders::_3);
535 cb(_internalReq, _internalRep, _internalResult);
538 return this->Advertise<T1, T2>(_topic, f, _options);
553 public:
template<
typename C,
typename T>
bool Advertise(
554 const std::string &_topic,
555 void(C::*_cb)(T &_rep,
bool &_result),
559 std::function<void(const msgs::Empty &, T &, bool &)> f =
560 [_cb, _obj](
const msgs::Empty &, T &_internalRep,
561 bool &_internalResult)
563 auto cb = std::bind(_cb, _obj, std::placeholders::_1,
564 std::placeholders::_2);
565 cb(_internalRep, _internalResult);
568 return this->Advertise<msgs::Empty, T>(_topic, f, _options);
582 public:
template<
typename C,
typename T>
bool Advertise(
583 const std::string &_topic,
584 void(C::*_cb)(
const T &_req),
588 std::function<void(const T &, ignition::msgs::Empty &, bool &)> f =
589 [_cb, _obj](
const T &_internalReq,
590 ignition::msgs::Empty &,
593 auto cb = std::bind(_cb, _obj, std::placeholders::_1);
597 return this->Advertise<T, ignition::msgs::Empty>(_topic, f, _options);
602 public: std::vector<std::string> AdvertisedServices()
const;
614 public:
template<
typename T1,
typename T2>
bool Request(
615 const std::string &_topic,
617 void(*_cb)(
const T2 &_rep,
const bool _result))
619 std::function<void(const T2 &, const bool)> f =
620 [_cb](
const T2 &_internalRep,
const bool _internalResult)
622 (*_cb)(_internalRep, _internalResult);
625 return this->Request<T1, T2>(_topic, _req, f);
639 const std::string &_topic,
640 void(*_cb)(
const T &_rep,
const bool _result))
643 return this->Request(_topic, req, _cb);
656 public:
template<
typename T1,
typename T2>
bool Request(
657 const std::string &_topic,
659 std::function<
void(
const T2 &_rep,
const bool _result)> &_cb)
661 std::string fullyQualifiedTopic;
663 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
665 std::cerr <<
"Service [" << _topic <<
"] is not valid." << std::endl;
669 bool localResponserFound;
672 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
673 localResponserFound = this->Shared()->repliers.FirstHandler(
674 fullyQualifiedTopic, T1().GetTypeName(), T2().GetTypeName(),
679 if (localResponserFound)
684 repHandler->RunLocalCallback(_req, rep, result);
691 std::shared_ptr<ReqHandler<T1, T2>> reqHandlerPtr(
695 reqHandlerPtr->SetMessage(_req);
698 reqHandlerPtr->SetCallback(_cb);
701 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
704 this->Shared()->requests.AddHandler(
705 fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
709 if (this->Shared()->srvDiscovery->Publishers(
710 fullyQualifiedTopic, addresses))
712 this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
713 T1().GetTypeName(), T2().GetTypeName());
718 if (!this->Shared()->srvDiscovery->Discover(fullyQualifiedTopic))
720 std::cerr <<
"Node::Request(): Error discovering a service. " 721 <<
"Did you forget to start the discovery service?" 742 const std::string &_topic,
743 std::function<
void(
const T &_rep,
const bool _result)> &_cb)
746 return this->Request(_topic, req, _cb);
760 public:
template<
typename C,
typename T1,
typename T2>
bool Request(
761 const std::string &_topic,
763 void(C::*_cb)(
const T2 &_rep,
const bool _result),
766 std::function<void(const T2 &, const bool)> f =
767 [_cb, _obj](
const T2 &_internalRep,
const bool _internalResult)
769 auto cb = std::bind(_cb, _obj, std::placeholders::_1,
770 std::placeholders::_2);
771 cb(_internalRep, _internalResult);
774 return this->Request<T1, T2>(_topic, _req, f);
788 public:
template<
typename C,
typename T>
bool Request(
789 const std::string &_topic,
790 void(C::*_cb)(
const T &_rep,
const bool _result),
794 return this->Request(_topic, req, _cb, _obj);
805 public:
template<
typename T1,
typename T2>
bool Request(
806 const std::string &_topic,
808 const unsigned int &_timeout,
812 std::string fullyQualifiedTopic;
814 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
816 std::cerr <<
"Service [" << _topic <<
"] is not valid." << std::endl;
821 std::shared_ptr<ReqHandler<T1, T2>> reqHandlerPtr(
825 reqHandlerPtr->SetMessage(_req);
827 std::unique_lock<std::recursive_mutex> lk(this->Shared()->mutex);
831 if (this->Shared()->repliers.FirstHandler(fullyQualifiedTopic,
832 T1().GetTypeName(), T2().GetTypeName(), repHandler))
835 repHandler->RunLocalCallback(_req, _rep, _result);
840 this->Shared()->requests.AddHandler(
841 fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
845 if (this->Shared()->srvDiscovery->Publishers(
846 fullyQualifiedTopic, addresses))
848 this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
849 T1().GetTypeName(), T2().GetTypeName());
854 if (!this->Shared()->srvDiscovery->Discover(fullyQualifiedTopic))
856 std::cerr <<
"Node::Request(): Error discovering a service. " 857 <<
"Did you forget to start the discovery service?" 864 bool executed = reqHandlerPtr->WaitUntil(lk, _timeout);
871 if (!reqHandlerPtr->Result())
878 if (!_rep.ParseFromString(reqHandlerPtr->Response()))
880 std::cerr <<
"Node::Request(): Error Parsing the response" 899 const std::string &_topic,
900 const unsigned int &_timeout,
905 return this->Request(_topic, req, _timeout, _rep, _result);
912 public:
template<
typename T>
bool Request(
const std::string &_topic,
917 std::function<void(const ignition::msgs::Empty &, const bool)> f =
918 [](
const ignition::msgs::Empty &,
const bool)
922 return this->Request<T, ignition::msgs::Empty>(_topic, _req, f);
928 public:
bool UnadvertiseSrv(
const std::string &_topic);
936 public:
void TopicList(std::vector<std::string> &_topics)
const;
942 public:
bool TopicInfo(
const std::string &_topic,
943 std::vector<MessagePublisher> &_publishers)
const;
951 public:
void ServiceList(std::vector<std::string> &_services)
const;
957 public:
bool ServiceInfo(
const std::string &_service,
958 std::vector<ServicePublisher> &_publishers)
const;
962 private:
const std::string &Partition()
const;
966 private:
const std::string &NameSpace()
const;
975 private:
const std::string &NodeUuid()
const;
979 private: std::unordered_set<std::string> &TopicsAdvertised()
const;
983 private: std::unordered_set<std::string> &TopicsSubscribed()
const;
987 private: std::unordered_set<std::string> &SrvsAdvertised()
const;
998 private:
bool PublishHelper(
const std::string &_topic,
1003 protected: std::unique_ptr<transport::NodePrivate>
dataPtr;
bool Advertise(const std::string &_topic, void(*_cb)(T &_rep, bool &_result), const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service without an input parameter.
Definition: Node.hh:358
static bool FullyQualifiedName(const std::string &_partition, const std::string &_ns, const std::string &_topic, std::string &_name)
Get the full topic path given a namespace and a topic name.
bool Request(const std::string &_topic, std::function< void(const T &_rep, const bool _result)> &_cb)
Request a new service without an input parameter using a non-blocking call.
Definition: Node.hh:741
bool Advertise(const std::string &_topic, void(*_cb)(const T &_req), const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service without any output parameter.
Definition: Node.hh:382
Node::PublisherId Advertise(const std::string &_topic, const std::string &_msgTypeName, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new topic.
Definition: Node.hh:149
bool Request(const std::string &_topic, void(C::*_cb)(const T &_rep, const bool _result), C *_obj)
Request a new service without an input parameter using a non-blocking call.
Definition: Node.hh:788
bool Request(const std::string &_topic, const T1 &_req, void(*_cb)(const T2 &_rep, const bool _result))
Request a new service using a non-blocking call.
Definition: Node.hh:614
A class for customizing the behavior of the Node.
Definition: NodeOptions.hh:35
It creates a request handler (it stores the request and receives the reply) for the specific protobuf messages used.
Definition: ReqHandler.hh:175
Node::PublisherId Advertise(const std::string &_topic, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new topic.
Definition: Node.hh:131
IGNITION_TRANSPORT_VISIBLE void waitForShutdown()
Block the current thread until a SIGINT or SIGTERM is received.
bool Advertise(const std::string &_topic, void(C::*_cb)(const T &_req), C *_obj, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service without any output parameter.
Definition: Node.hh:582
bool Subscribe(const std::string &_topic, void(*_cb)(const T &_msg))
Subscribe to a topic registering a callback.
Definition: Node.hh:226
ignition/transport/AdvertiseOptions.hh
Definition: AdvertiseOptions.hh:50
bool Advertise(const std::string &_topic, std::function< void(const T1 &_req, T2 &_rep, bool &_result)> &_cb, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:409
Private data for the Node class.
Definition: NodeShared.hh:53
Addresses_M< ServicePublisher > SrvAddresses_M
Definition: TransportTypes.hh:60
bool Request(const std::string &_topic, void(*_cb)(const T &_rep, const bool _result))
Request a new service without an input parameter using a non-blocking call.
Definition: Node.hh:638
google::protobuf::Message ProtoMsg
Definition: TransportTypes.hh:64
bool Advertise(const std::string &_topic, void(C::*_cb)(T &_rep, bool &_result), C *_obj, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service without an input parameter.
Definition: Node.hh:553
std::shared_ptr< IRepHandler > IRepHandlerPtr
Definition: TransportTypes.hh:84
bool Request(const std::string &_topic, const T1 &_req, std::function< void(const T2 &_rep, const bool _result)> &_cb)
Request a new service using a non-blocking call.
Definition: Node.hh:656
bool Request(const std::string &_topic, const unsigned int &_timeout, T &_rep, bool &_result)
Request a new service without an input parameter using a blocking call.
Definition: Node.hh:898
bool Advertise(const std::string &_topic, std::function< void(T &_rep, bool &_result)> &_cb, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service without an input parameter.
Definition: Node.hh:470
bool Advertise(const std::string &_topic, void(C::*_cb)(const T1 &_req, T2 &_rep, bool &_result), C *_obj, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:522
bool Subscribe(const std::string &_topic, std::function< void(const T &_msg)> &_cb)
Subscribe to a topic registering a callback.
Definition: Node.hh:244
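It creates a service reply handler for a pair of protobuf messages: the request and the reply that is filled with the service response.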
Definition: RepHandler.hh:98
std::unique_ptr< transport::NodePrivate > dataPtr
Definition: Node.hh:1003
#define IGNITION_TRANSPORT_VISIBLE
Use to represent "symbol visible" if supported.
Definition: Helpers.hh:55
A class that allows a client to communicate with other peers.
Definition: Node.hh:72
ignition/transport/Publisher.hh
Definition: Publisher.hh:175
ignition/transport/Publisher.hh
Definition: Publisher.hh:264
A class that is used to store information about an advertised publisher.
Definition: Node.hh:86
bool Advertise(const std::string &_topic, std::function< void(const T &_req)> &_cb, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service without any output parameter.
Definition: Node.hh:494
bool Request(const std::string &_topic, const T &_req)
Request a new service without waiting for a response.
Definition: Node.hh:912
bool Advertise(const std::string &_topic, void(*_cb)(const T1 &_req, T2 &_rep, bool &_result), const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:333
It creates a subscription handler for a specific protobuf message.
Definition: SubscriptionHandler.hh:102
bool Request(const std::string &_topic, const T1 &_req, const unsigned int &_timeout, T2 &_rep, bool &_result)
Request a new service using a blocking call.
Definition: Node.hh:805
Definition: AdvertiseOptions.hh:25
bool Request(const std::string &_topic, const T1 &_req, void(C::*_cb)(const T2 &_rep, const bool _result), C *_obj)
Request a new service using a non-blocking call.
Definition: Node.hh:760
bool Subscribe(const std::string &_topic, void(C::*_cb)(const T &_msg), C *_obj)
Subscribe to a topic registering a callback.
Definition: Node.hh:295