18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19 #define IGNITION_TRANSPORT_DISCOVERY_HH_
31 #include <sys/types.h>
33 #include <sys/socket.h>
37 #include <arpa/inet.h>
41 #include <netinet/in.h>
47 #pragma warning(push, 0)
53 #pragma warning(disable: 4503)
55 #pragma warning(disable: 4996)
59 #include <condition_variable>
86 template<
typename Pub>
96 const bool _verbose =
false)
100 silenceInterval(kDefSilenceInterval),
101 activityInterval(kDefActivityInterval),
102 heartbeatInterval(kDefHeartbeatInterval),
103 connectionCb(nullptr),
104 disconnectionCb(nullptr),
107 numHeartbeatsUninitialized(0),
112 if (
env(
"IGN_IP", ignIp) && !ignIp.empty())
113 this->hostInterfaces = {ignIp};
121 WORD wVersionRequested;
125 wVersionRequested = MAKEWORD(2, 2);
127 if (WSAStartup(wVersionRequested, &wsaData) != 0)
129 std::cerr <<
"Unable to load WinSock DLL" << std::endl;
133 for (
const auto &netIface : this->hostInterfaces)
135 auto succeed = this->RegisterNetIface(netIface);
140 if (netIface == this->hostAddr && !succeed)
142 this->RegisterNetIface(
"127.0.0.1");
143 std::cerr <<
"Did you set the environment variable IGN_IP with a "
144 <<
"correct IP address? " << std::endl
145 <<
" [" << netIface <<
"] seems an invalid local IP "
146 <<
"address." << std::endl
147 <<
" Using 127.0.0.1 as hostname." << std::endl;
148 this->hostAddr =
"127.0.0.1";
157 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
158 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
160 std::cerr <<
"Error setting socket option (SO_REUSEADDR)."
171 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
172 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
174 std::cerr <<
"Error setting socket option (SO_REUSEPORT)."
180 sockaddr_in localAddr;
181 memset(&localAddr, 0,
sizeof(localAddr));
182 localAddr.sin_family = AF_INET;
183 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
184 localAddr.sin_port = htons(static_cast<u_short>(this->port));
186 if (bind(this->sockets.at(0),
187 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
189 std::cerr <<
"Binding to a local port failed." << std::endl;
194 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
195 this->mcastAddr.sin_family = AF_INET;
196 this->mcastAddr.sin_addr.s_addr =
197 inet_addr(this->kMulticastGroup.c_str());
198 this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
201 this->PrintCurrentState();
208 this->exitMutex.lock();
210 this->exitMutex.unlock();
218 if (this->threadReception.joinable())
219 this->threadReception.join();
221 bool exitLoop =
false;
224 std::lock_guard<std::mutex> lock(this->exitMutex);
226 if (this->threadReceptionExiting)
231 std::this_thread::sleep_for(std::chrono::milliseconds(50));
240 for (
const auto &sock : this->sockets)
257 std::lock_guard<std::mutex> lock(this->mutex);
263 this->enabled =
true;
266 auto now = std::chrono::steady_clock::now();
267 this->timeNextHeartbeat = now;
268 this->timeNextActivity = now;
271 this->threadReception = std::thread(&Discovery::RecvMessages,
this);
274 this->threadReceptionExiting =
false;
275 this->threadReception.detach();
286 std::lock_guard<std::mutex> lock(this->mutex);
292 if (!this->info.AddPublisher(_publisher))
299 this->SendMsg(
AdvType, _publisher);
314 public:
bool Discover(
const std::string &_topic)
const
321 std::lock_guard<std::mutex> lock(this->mutex);
326 cb = this->connectionCb;
330 pub.SetTopic(_topic);
331 pub.SetPUuid(this->pUuid);
337 std::lock_guard<std::mutex> lock(this->mutex);
338 found = this->info.Publishers(_topic, addresses);
344 for (
const auto &proc : addresses)
346 for (
const auto &node : proc.second)
366 std::lock_guard<std::mutex> lock(this->mutex);
377 std::lock_guard<std::mutex> lock(this->mutex);
378 return this->info.Publishers(_topic, _publishers);
389 const std::string &_nUuid)
393 std::lock_guard<std::mutex> lock(this->mutex);
399 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
403 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
418 std::lock_guard<std::mutex> lock(this->mutex);
419 return this->hostAddr;
428 std::lock_guard<std::mutex> lock(this->mutex);
429 return this->activityInterval;
439 std::lock_guard<std::mutex> lock(this->mutex);
440 return this->heartbeatInterval;
449 std::lock_guard<std::mutex> lock(this->mutex);
450 return this->silenceInterval;
458 std::lock_guard<std::mutex> lock(this->mutex);
459 this->activityInterval = _ms;
467 std::lock_guard<std::mutex> lock(this->mutex);
468 this->heartbeatInterval = _ms;
476 std::lock_guard<std::mutex> lock(this->mutex);
477 this->silenceInterval = _ms;
486 std::lock_guard<std::mutex> lock(this->mutex);
487 this->connectionCb = _cb;
496 std::lock_guard<std::mutex> lock(this->mutex);
497 this->disconnectionCb = _cb;
503 std::lock_guard<std::mutex> lock(this->mutex);
505 std::cout <<
"---------------" << std::endl;
506 std::cout << std::boolalpha <<
"Enabled: "
507 << this->enabled << std::endl;
508 std::cout <<
"Discovery state" << std::endl;
509 std::cout <<
"\tUUID: " << this->pUuid << std::endl;
510 std::cout <<
"Settings" << std::endl;
511 std::cout <<
"\tActivity: " << this->activityInterval
512 <<
" ms." << std::endl;
513 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
514 <<
"ms." << std::endl;
515 std::cout <<
"\tSilence: " << this->silenceInterval
516 <<
" ms." << std::endl;
517 std::cout <<
"Known information: " << std::endl;
521 Timestamp now = std::chrono::steady_clock::now();
523 std::cout <<
"Activity" << std::endl;
524 if (this->activity.empty())
525 std::cout <<
"\t<empty>" << std::endl;
528 for (
auto &proc : this->activity)
531 std::chrono::duration<double> elapsed = now - proc.second;
533 std::cout <<
"\t" << proc.first << std::endl;
534 std::cout <<
"\t\t" <<
"Since: " << std::chrono::duration_cast<
535 std::chrono::milliseconds>(elapsed).count() <<
" ms. ago. "
539 std::cout <<
"---------------" << std::endl;
544 public:
void TopicList(std::vector<std::string> &_topics)
const
547 std::lock_guard<std::mutex> lock(this->mutex);
548 this->info.TopicList(_topics);
555 std::unique_lock<std::mutex> lk(this->mutex);
557 if (!this->initialized)
559 this->initializedCv.wait(lk, [
this]{
return this->initialized;});
566 private:
void UpdateActivity()
568 Timestamp now = std::chrono::steady_clock::now();
570 std::lock_guard<std::mutex> lock(this->mutex);
572 if (now < this->timeNextActivity)
575 for (
auto it = this->activity.cbegin(); it != this->activity.cend();)
578 auto elapsed = now - it->second;
581 if (std::chrono::duration_cast<std::chrono::milliseconds>
582 (elapsed).count() > this->silenceInterval)
585 this->info.DelPublishersByProc(it->first);
591 publisher.SetPUuid(it->first);
592 this->disconnectionCb(publisher);
595 this->activity.erase(it++);
601 this->timeNextActivity = std::chrono::steady_clock::now() +
602 std::chrono::milliseconds(this->activityInterval);
606 private:
void UpdateHeartbeat()
608 Timestamp now = std::chrono::steady_clock::now();
611 std::lock_guard<std::mutex> lock(this->mutex);
613 if (now < this->timeNextHeartbeat)
617 Publisher pub(
"",
"", this->pUuid,
"", AdvertiseOptions());
620 std::map<std::string, std::vector<Pub>> nodes;
622 std::lock_guard<std::mutex> lock(this->mutex);
625 this->info.PublishersByProc(this->pUuid, nodes);
628 for (
const auto &topic : nodes)
630 for (
const auto &node : topic.second)
635 std::lock_guard<std::mutex> lock(this->mutex);
636 if (!this->initialized)
638 ++this->numHeartbeatsUninitialized;
639 if (this->numHeartbeatsUninitialized == 2)
643 this->initialized =
true;
646 this->initializedCv.notify_all();
650 this->timeNextHeartbeat = std::chrono::steady_clock::now() +
651 std::chrono::milliseconds(this->heartbeatInterval);
664 private:
int NextTimeout()
const
666 auto now = std::chrono::steady_clock::now();
667 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
668 auto timeUntilNextActivity = this->timeNextActivity - now;
670 int t =
static_cast<int>(
671 std::chrono::duration_cast<std::chrono::milliseconds>
672 (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
673 int t2 = std::min(t, this->kTimeout);
674 return std::max(t2, 0);
678 private:
void RecvMessages()
680 bool timeToExit =
false;
684 zmq::pollitem_t items[] =
686 {0, this->sockets.at(0), ZMQ_POLLIN, 0},
690 int timeout = this->NextTimeout();
694 zmq::poll(&items[0],
sizeof(items) /
sizeof(items[0]), timeout);
702 if (items[0].revents & ZMQ_POLLIN)
704 this->RecvDiscoveryUpdate();
707 this->PrintCurrentState();
710 this->UpdateHeartbeat();
711 this->UpdateActivity();
715 std::lock_guard<std::mutex> lock(this->exitMutex);
721 std::lock_guard<std::mutex> lock(this->exitMutex);
722 this->threadReceptionExiting =
true;
727 private:
void RecvDiscoveryUpdate()
729 char rcvStr[Discovery::kMaxRcvStr];
732 sockaddr_in clntAddr;
733 socklen_t addrLen =
sizeof(clntAddr);
735 if ((recvfrom(this->sockets.at(0),
736 reinterpret_cast<raw_type *
>(rcvStr),
738 reinterpret_cast<sockaddr *>(&clntAddr),
739 reinterpret_cast<socklen_t *
>(&addrLen))) < 0)
741 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error"
745 srcAddr = inet_ntoa(clntAddr.sin_addr);
746 srcPort = ntohs(clntAddr.sin_port);
750 std::cout <<
"\nReceived discovery update from " << srcAddr <<
": "
751 << srcPort << std::endl;
754 this->DispatchDiscoveryMsg(srcAddr, rcvStr);
761 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
769 pBody += header.HeaderLength();
772 if (this->kWireVersion != header.Version())
775 auto recvPUuid = header.PUuid();
778 if (recvPUuid == this->pUuid)
782 DiscoveryCallback<Pub> connectCb;
783 DiscoveryCallback<Pub> disconnectCb;
785 std::lock_guard<std::mutex> lock(this->mutex);
786 this->activity[recvPUuid] = std::chrono::steady_clock::now();
787 connectCb = this->connectionCb;
788 disconnectCb = this->disconnectionCb;
791 switch (header.Type())
796 transport::AdvertiseMessage<Pub> advMsg;
797 advMsg.Unpack(pBody);
802 _fromIp != this->hostAddr))
810 std::lock_guard<std::mutex> lock(this->mutex);
811 added = this->info.AddPublisher(advMsg.Publisher());
814 if (added && connectCb)
817 connectCb(advMsg.Publisher());
825 SubscriptionMsg subMsg;
826 subMsg.Unpack(pBody);
827 auto recvTopic = subMsg.Topic();
830 Addresses_M<Pub> addresses;
832 std::lock_guard<std::mutex> lock(this->mutex);
833 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
838 if (!this->info.Publishers(recvTopic, addresses))
842 for (
const auto &nodeInfo : addresses[this->pUuid])
847 _fromIp != this->hostAddr))
853 this->SendMsg(
AdvType, nodeInfo);
867 std::lock_guard<std::mutex> lock(this->mutex);
868 this->activity.erase(recvPUuid);
874 pub.SetPUuid(recvPUuid);
881 std::lock_guard<std::mutex> lock(this->mutex);
882 this->info.DelPublishersByProc(recvPUuid);
890 transport::AdvertiseMessage<Pub> advMsg;
891 advMsg.Unpack(pBody);
896 _fromIp != this->hostAddr))
904 disconnectCb(advMsg.Publisher());
909 std::lock_guard<std::mutex> lock(this->mutex);
910 this->info.DelPublisherByNode(advMsg.Publisher().Topic(),
911 advMsg.Publisher().PUuid(), advMsg.Publisher().NUuid());
918 std::cerr <<
"Unknown message type [" << header.Type() <<
"]\n";
930 private:
template<
typename T>
931 void SendMsg(
const uint8_t _type,
933 const uint16_t _flags = 0)
const
936 Header header(this->Version(), _pub.PUuid(), _type, _flags);
938 std::vector<char> buffer;
940 std::string topic = _pub.Topic();
948 transport::AdvertiseMessage<T> advMsg(header, _pub);
951 buffer.resize(advMsg.MsgLength());
952 advMsg.Pack(reinterpret_cast<char*>(&buffer[0]));
953 msgLength =
static_cast<int>(advMsg.MsgLength());
959 SubscriptionMsg subMsg(header, topic);
962 buffer.resize(subMsg.MsgLength());
963 subMsg.Pack(reinterpret_cast<char*>(&buffer[0]));
964 msgLength =
static_cast<int>(subMsg.MsgLength());
971 buffer.resize(header.HeaderLength());
972 header.Pack(reinterpret_cast<char*>(&buffer[0]));
973 msgLength = header.HeaderLength();
977 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message"
978 <<
" type [" << _type <<
"]" << std::endl;
984 for (
const auto &sock : this->Sockets())
986 if (sendto(sock, reinterpret_cast<const raw_type *>(
987 reinterpret_cast<unsigned char*>(&buffer[0])),
989 reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
990 sizeof(*(this->MulticastAddr()))) != msgLength)
992 std::cerr <<
"Exception sending a message" << std::endl;
1000 <<
" msg [" << topic <<
"]" << std::endl;
1006 private:
const std::vector<int> &Sockets()
const
1008 return this->sockets;
1013 private:
const sockaddr_in *MulticastAddr()
const
1015 return &this->mcastAddr;
1020 private:
bool Verbose()
const
1022 return this->verbose;
1027 private: uint8_t Version()
const
1029 return this->kWireVersion;
1036 private:
bool RegisterNetIface(
const std::string &_ip)
1039 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1042 std::cerr <<
"Socket creation failed." << std::endl;
1049 struct in_addr ifAddr;
1050 ifAddr.s_addr = inet_addr(_ip.c_str());
1051 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1052 reinterpret_cast<const char*>(&ifAddr),
sizeof(ifAddr)) != 0)
1054 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)."
1059 this->sockets.push_back(sock);
1064 struct ip_mreq group;
1065 group.imr_multiaddr.s_addr =
1066 inet_addr(this->kMulticastGroup.c_str());
1067 group.imr_interface.s_addr = inet_addr(_ip.c_str());
1068 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1069 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1071 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)."
1082 private:
static const unsigned int kDefActivityInterval = 100;
1087 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1092 private:
static const unsigned int kDefSilenceInterval = 3000;
1095 private:
const std::string kMulticastGroup =
"224.0.0.7";
1098 private:
const int kTimeout = 250;
1101 private:
static const int kMaxRcvStr = 65536;
1105 private:
static const uint8_t kWireVersion = 8;
1111 private: std::string hostAddr;
1114 private: std::vector<std::string> hostInterfaces;
1117 private: std::string pUuid;
1122 private:
unsigned int silenceInterval;
1127 private:
unsigned int activityInterval;
1132 private:
unsigned int heartbeatInterval;
1135 private: DiscoveryCallback<Pub> connectionCb;
1138 private: DiscoveryCallback<Pub> disconnectionCb;
1141 private: TopicStorage<Pub> info;
1150 private:
bool verbose;
1153 private: std::vector<int> sockets;
1156 private: sockaddr_in mcastAddr;
1159 private:
mutable std::mutex mutex;
1162 private: std::thread threadReception;
1171 private: std::mutex exitMutex;
1177 private:
bool initialized;
1180 private:
unsigned int numHeartbeatsUninitialized;
1183 private:
mutable std::condition_variable initializedCv;
1189 private:
bool threadReceptionExiting =
true;
1194 private:
bool enabled;
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:205
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:416
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:94
static const std::vector< std::string > MsgTypesStr
Used for debugging the type of message received/sent.
Definition: Packet.hh:44
bool Discover(const std::string &_topic) const
Request discovery information about a topic.
Definition: Discovery.hh:314
ignition/transport/Publisher.hh
Definition: Publisher.hh:37
std::map< std::string, std::vector< T >> Addresses_M
Definition: TransportTypes.hh:53
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:465
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events.
Definition: Discovery.hh:494
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:283
void raw_type
Definition: Discovery.hh:43
A discovery class that implements a distributed topic discovery protocol.
Definition: Discovery.hh:87
Topic/service only available to subscribers in the same process as the publisher. ...
static const uint8_t HeartbeatType
Definition: Packet.hh:38
void Start()
Start the discovery service.
Definition: Discovery.hh:254
static const uint8_t ByeType
Definition: Packet.hh:39
ignition/transport/AdvertiseOptions.hh
Definition: AdvertiseOptions.hh:54
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds...
Definition: Discovery.hh:426
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:544
static const uint8_t SubType
Definition: Packet.hh:36
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:474
std::map< std::string, Timestamp > activity
Activity information.
Definition: Discovery.hh:1147
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events.
Definition: Discovery.hh:484
std::function< void(const T &_publisher)> DiscoveryCallback
Definition: TransportTypes.hh:105
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:364
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:456
#define IGNITION_TRANSPORT_VISIBLE
Used to represent "symbol visible" if supported.
Definition: Helpers.hh:57
IGNITION_TRANSPORT_VISIBLE std::string determineHost()
Determine IP or hostname.
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:501
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:374
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:447
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:437
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:129
void WaitForInit() const
Wait until the discovery service is initialized; returns immediately if it already is.
Definition: Discovery.hh:553
static const uint8_t AdvType
Definition: Packet.hh:35
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a topic previously advertised by a node of this process.
Definition: Discovery.hh:388
IGNITION_TRANSPORT_VISIBLE std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine.
Topic/service only available to subscribers in the same machine as the publisher. ...
static const uint8_t UnadvType
Definition: Packet.hh:37
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.