18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_ 19 #define IGNITION_TRANSPORT_DISCOVERY_HH_ 31 #include <sys/types.h> 33 #include <sys/socket.h> 37 #include <arpa/inet.h> 41 #include <netinet/in.h> 47 #pragma warning(push, 0) 53 #pragma warning(disable: 4503) 55 #pragma warning(disable: 4996) 59 #include <condition_variable> 86 template<
typename Pub>
96 const bool _verbose =
false)
100 silenceInterval(kDefSilenceInterval),
101 activityInterval(kDefActivityInterval),
102 heartbeatInterval(kDefHeartbeatInterval),
103 connectionCb(nullptr),
104 disconnectionCb(nullptr),
107 numHeartbeatsUninitialized(0),
112 if (
env(
"IGN_IP", ignIp) && !ignIp.empty())
113 this->hostInterfaces = {ignIp};
121 WORD wVersionRequested;
125 wVersionRequested = MAKEWORD(2, 2);
127 if (WSAStartup(wVersionRequested, &wsaData) != 0)
129 std::cerr <<
"Unable to load WinSock DLL" << std::endl;
133 for (
const auto &netIface : this->hostInterfaces)
135 auto succeed = this->RegisterNetIface(netIface);
140 if (netIface == this->hostAddr && !succeed)
142 this->RegisterNetIface(
"127.0.0.1");
143 std::cerr <<
"Did you set the environment variable IGN_IP with a " 144 <<
"correct IP address? " << std::endl
145 <<
" [" << netIface <<
"] seems an invalid local IP " 146 <<
"address." << std::endl
147 <<
" Using 127.0.0.1 as hostname." << std::endl;
148 this->hostAddr =
"127.0.0.1";
157 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
158 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
160 std::cerr <<
"Error setting socket option (SO_REUSEADDR)." 171 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
172 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
174 std::cerr <<
"Error setting socket option (SO_REUSEPORT)." 180 sockaddr_in localAddr;
181 memset(&localAddr, 0,
sizeof(localAddr));
182 localAddr.sin_family = AF_INET;
183 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
184 localAddr.sin_port = htons(static_cast<u_short>(this->port));
186 if (bind(this->sockets.at(0),
187 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
189 std::cerr <<
"Binding to a local port failed." << std::endl;
194 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
195 this->mcastAddr.sin_family = AF_INET;
196 this->mcastAddr.sin_addr.s_addr =
197 inet_addr(this->kMulticastGroup.c_str());
198 this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
201 this->PrintCurrentState();
208 this->exitMutex.lock();
210 this->exitMutex.unlock();
218 if (this->threadReception.joinable())
219 this->threadReception.join();
221 bool exitLoop =
false;
224 std::lock_guard<std::mutex> lock(this->exitMutex);
226 if (this->threadReceptionExiting)
231 std::this_thread::sleep_for(std::chrono::milliseconds(50));
240 for (
const auto &sock : this->sockets)
257 std::lock_guard<std::mutex> lock(this->mutex);
263 this->enabled =
true;
266 auto now = std::chrono::steady_clock::now();
267 this->timeNextHeartbeat = now;
268 this->timeNextActivity = now;
271 this->threadReception = std::thread(&Discovery::RecvMessages,
this);
274 this->threadReceptionExiting =
false;
275 this->threadReception.detach();
286 std::lock_guard<std::mutex> lock(this->mutex);
292 if (!this->info.AddPublisher(_publisher))
299 this->SendMsg(
AdvType, _publisher);
314 public:
bool Discover(
const std::string &_topic)
const 321 std::lock_guard<std::mutex> lock(this->mutex);
326 cb = this->connectionCb;
330 pub.SetTopic(_topic);
331 pub.SetPUuid(this->pUuid);
338 std::lock_guard<std::mutex> lock(this->mutex);
339 found = this->info.Publishers(_topic, addresses);
345 for (
const auto &proc : addresses)
347 for (
const auto &node : proc.second)
367 std::lock_guard<std::mutex> lock(this->mutex);
378 std::lock_guard<std::mutex> lock(this->mutex);
379 return this->info.Publishers(_topic, _publishers);
390 const std::string &_nUuid)
394 std::lock_guard<std::mutex> lock(this->mutex);
400 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
404 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
419 std::lock_guard<std::mutex> lock(this->mutex);
420 return this->hostAddr;
429 std::lock_guard<std::mutex> lock(this->mutex);
430 return this->activityInterval;
440 std::lock_guard<std::mutex> lock(this->mutex);
441 return this->heartbeatInterval;
450 std::lock_guard<std::mutex> lock(this->mutex);
451 return this->silenceInterval;
459 std::lock_guard<std::mutex> lock(this->mutex);
460 this->activityInterval = _ms;
468 std::lock_guard<std::mutex> lock(this->mutex);
469 this->heartbeatInterval = _ms;
477 std::lock_guard<std::mutex> lock(this->mutex);
478 this->silenceInterval = _ms;
487 std::lock_guard<std::mutex> lock(this->mutex);
488 this->connectionCb = _cb;
497 std::lock_guard<std::mutex> lock(this->mutex);
498 this->disconnectionCb = _cb;
504 std::lock_guard<std::mutex> lock(this->mutex);
506 std::cout <<
"---------------" << std::endl;
507 std::cout << std::boolalpha <<
"Enabled: " 508 << this->enabled << std::endl;
509 std::cout <<
"Discovery state" << std::endl;
510 std::cout <<
"\tUUID: " << this->pUuid << std::endl;
511 std::cout <<
"Settings" << std::endl;
512 std::cout <<
"\tActivity: " << this->activityInterval
513 <<
" ms." << std::endl;
514 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
515 <<
"ms." << std::endl;
516 std::cout <<
"\tSilence: " << this->silenceInterval
517 <<
" ms." << std::endl;
518 std::cout <<
"Known information: " << std::endl;
522 Timestamp now = std::chrono::steady_clock::now();
524 std::cout <<
"Activity" << std::endl;
525 if (this->activity.empty())
526 std::cout <<
"\t<empty>" << std::endl;
529 for (
auto &proc : this->activity)
532 std::chrono::duration<double> elapsed = now - proc.second;
534 std::cout <<
"\t" << proc.first << std::endl;
535 std::cout <<
"\t\t" <<
"Since: " << std::chrono::duration_cast<
536 std::chrono::milliseconds>(elapsed).count() <<
" ms. ago. " 540 std::cout <<
"---------------" << std::endl;
545 public:
void TopicList(std::vector<std::string> &_topics)
const 548 std::lock_guard<std::mutex> lock(this->mutex);
549 this->info.TopicList(_topics);
556 std::unique_lock<std::mutex> lk(this->mutex);
558 if (!this->initialized)
560 this->initializedCv.wait(lk, [
this]{
return this->initialized;});
567 private:
void UpdateActivity()
569 Timestamp now = std::chrono::steady_clock::now();
571 std::lock_guard<std::mutex> lock(this->mutex);
573 if (now < this->timeNextActivity)
576 for (
auto it = this->activity.cbegin(); it != this->activity.cend();)
579 auto elapsed = now - it->second;
582 if (std::chrono::duration_cast<std::chrono::milliseconds>
583 (elapsed).count() > this->silenceInterval)
586 this->info.DelPublishersByProc(it->first);
592 publisher.SetPUuid(it->first);
594 this->disconnectionCb(publisher);
597 this->activity.erase(it++);
603 this->timeNextActivity = std::chrono::steady_clock::now() +
604 std::chrono::milliseconds(this->activityInterval);
608 private:
void UpdateHeartbeat()
610 Timestamp now = std::chrono::steady_clock::now();
613 std::lock_guard<std::mutex> lock(this->mutex);
615 if (now < this->timeNextHeartbeat)
622 std::map<std::string, std::vector<Pub>> nodes;
624 std::lock_guard<std::mutex> lock(this->mutex);
627 this->info.PublishersByProc(this->pUuid, nodes);
630 for (
const auto &topic : nodes)
632 for (
const auto &node : topic.second)
637 std::lock_guard<std::mutex> lock(this->mutex);
638 if (!this->initialized)
640 ++this->numHeartbeatsUninitialized;
641 if (this->numHeartbeatsUninitialized == 2)
645 this->initialized =
true;
648 this->initializedCv.notify_all();
652 this->timeNextHeartbeat = std::chrono::steady_clock::now() +
653 std::chrono::milliseconds(this->heartbeatInterval);
666 private:
int NextTimeout()
const 668 auto now = std::chrono::steady_clock::now();
669 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
670 auto timeUntilNextActivity = this->timeNextActivity - now;
672 int t =
static_cast<int>(
673 std::chrono::duration_cast<std::chrono::milliseconds>
674 (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
675 int t2 = std::min(t, this->kTimeout);
676 return std::max(t2, 0);
680 private:
void RecvMessages()
682 bool timeToExit =
false;
686 zmq::pollitem_t items[] =
688 {0, this->sockets.at(0), ZMQ_POLLIN, 0},
692 int timeout = this->NextTimeout();
696 zmq::poll(&items[0],
sizeof(items) /
sizeof(items[0]), timeout);
704 if (items[0].revents & ZMQ_POLLIN)
706 this->RecvDiscoveryUpdate();
709 this->PrintCurrentState();
712 this->UpdateHeartbeat();
713 this->UpdateActivity();
717 std::lock_guard<std::mutex> lock(this->exitMutex);
723 std::lock_guard<std::mutex> lock(this->exitMutex);
724 this->threadReceptionExiting =
true;
729 private:
void RecvDiscoveryUpdate()
731 char rcvStr[Discovery::kMaxRcvStr];
734 sockaddr_in clntAddr;
735 socklen_t addrLen =
sizeof(clntAddr);
737 if ((recvfrom(this->sockets.at(0),
738 reinterpret_cast<raw_type *
>(rcvStr),
740 reinterpret_cast<sockaddr *>(&clntAddr),
741 reinterpret_cast<socklen_t *
>(&addrLen))) < 0)
743 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error" 747 srcAddr = inet_ntoa(clntAddr.sin_addr);
748 srcPort = ntohs(clntAddr.sin_port);
752 std::cout <<
"\nReceived discovery update from " << srcAddr <<
": " 753 << srcPort << std::endl;
756 this->DispatchDiscoveryMsg(srcAddr, rcvStr);
763 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
774 if (this->kWireVersion != header.
Version())
777 auto recvPUuid = header.
PUuid();
780 if (recvPUuid == this->pUuid)
787 std::lock_guard<std::mutex> lock(this->mutex);
788 this->activity[recvPUuid] = std::chrono::steady_clock::now();
789 connectCb = this->connectionCb;
790 disconnectCb = this->disconnectionCb;
793 switch (header.
Type())
804 _fromIp != this->hostAddr))
812 std::lock_guard<std::mutex> lock(this->mutex);
813 added = this->info.AddPublisher(advMsg.
Publisher());
816 if (added && connectCb)
829 auto recvTopic = subMsg.
Topic();
834 std::lock_guard<std::mutex> lock(this->mutex);
835 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
840 if (!this->info.Publishers(recvTopic, addresses))
844 for (
const auto &nodeInfo : addresses[this->pUuid])
849 _fromIp != this->hostAddr))
855 this->SendMsg(
AdvType, nodeInfo);
869 std::lock_guard<std::mutex> lock(this->mutex);
870 this->activity.erase(recvPUuid);
876 pub.SetPUuid(recvPUuid);
884 std::lock_guard<std::mutex> lock(this->mutex);
885 this->info.DelPublishersByProc(recvPUuid);
899 _fromIp != this->hostAddr))
912 std::lock_guard<std::mutex> lock(this->mutex);
913 this->info.DelPublisherByNode(advMsg.
Publisher().Topic(),
921 std::cerr <<
"Unknown message type [" << header.
Type() <<
"]\n";
933 private:
template<
typename T>
934 void SendMsg(
const uint8_t _type,
936 const uint16_t _flags = 0)
const 939 Header header(this->Version(), _pub.PUuid(), _type, _flags);
941 std::vector<char> buffer;
943 std::string topic = _pub.Topic();
955 advMsg.
Pack(reinterpret_cast<char*>(&buffer[0]));
956 msgLength =
static_cast<int>(advMsg.
MsgLength());
966 subMsg.
Pack(reinterpret_cast<char*>(&buffer[0]));
967 msgLength =
static_cast<int>(subMsg.
MsgLength());
974 buffer.resize(header.HeaderLength());
975 header.Pack(reinterpret_cast<char*>(&buffer[0]));
976 msgLength = header.HeaderLength();
980 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message" 981 <<
" type [" << _type <<
"]" << std::endl;
987 for (
const auto &sock : this->Sockets())
989 if (sendto(sock, reinterpret_cast<const raw_type *>(
990 reinterpret_cast<unsigned char*>(&buffer[0])),
992 reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
993 sizeof(*(this->MulticastAddr()))) != msgLength)
995 std::cerr <<
"Exception sending a message" << std::endl;
1000 if (this->Verbose())
1003 <<
" msg [" << topic <<
"]" << std::endl;
1009 private:
const std::vector<int> &Sockets()
const 1011 return this->sockets;
1016 private:
const sockaddr_in *MulticastAddr()
const 1018 return &this->mcastAddr;
1023 private:
bool Verbose()
const 1025 return this->verbose;
1030 private: uint8_t Version()
const 1032 return this->kWireVersion;
1039 private:
bool RegisterNetIface(
const std::string &_ip)
1042 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1045 std::cerr <<
"Socket creation failed." << std::endl;
1052 struct in_addr ifAddr;
1053 ifAddr.s_addr = inet_addr(_ip.c_str());
1054 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1055 reinterpret_cast<const char*>(&ifAddr),
sizeof(ifAddr)) != 0)
1057 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)." 1062 this->sockets.push_back(sock);
1067 struct ip_mreq group;
1068 group.imr_multiaddr.s_addr =
1069 inet_addr(this->kMulticastGroup.c_str());
1070 group.imr_interface.s_addr = inet_addr(_ip.c_str());
1071 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1072 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1074 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)." 1085 private:
static const unsigned int kDefActivityInterval = 100;
1090 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1095 private:
static const unsigned int kDefSilenceInterval = 3000;
1098 private:
const std::string kMulticastGroup =
"224.0.0.7";
1101 private:
const int kTimeout = 250;
1104 private:
static const int kMaxRcvStr = 65536;
1108 private:
static const uint8_t kWireVersion = 6;
1114 private: std::string hostAddr;
1117 private: std::vector<std::string> hostInterfaces;
1120 private: std::string pUuid;
1125 private:
unsigned int silenceInterval;
1130 private:
unsigned int activityInterval;
1135 private:
unsigned int heartbeatInterval;
1153 private:
bool verbose;
1156 private: std::vector<int> sockets;
1159 private: sockaddr_in mcastAddr;
1162 private:
mutable std::mutex mutex;
1165 private: std::thread threadReception;
1174 private: std::mutex exitMutex;
1180 private:
bool initialized;
1183 private:
unsigned int numHeartbeatsUninitialized;
1186 private:
mutable std::condition_variable initializedCv;
1192 private:
bool threadReceptionExiting =
true;
1197 private:
bool enabled;
std::string Topic() const
Get the topic.
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:205
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:417
Topic/service available to any subscriber (default scope).
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:94
static const std::vector< std::string > MsgTypesStr
Used for debugging the message type received/send.
Definition: Packet.hh:44
bool Discover(const std::string &_topic) const
Request discovery information about a topic.
Definition: Discovery.hh:314
ignition/transport/Publisher.hh
Definition: Publisher.hh:35
std::map< std::string, std::vector< T >> Addresses_M
Definition: TransportTypes.hh:52
Subscription packet used in the discovery protocol for requesting information about a given topic...
Definition: Packet.hh:158
size_t MsgLength() const
Get the total length of the message.
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:466
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events.
Definition: Discovery.hh:495
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:283
void raw_type
Definition: Discovery.hh:43
A discovery class that implements a distributed topic discovery protocol.
Definition: Discovery.hh:87
Topic/service only available to subscribers in the same process as the publisher. ...
static const uint8_t HeartbeatType
Definition: Packet.hh:38
size_t Pack(char *_buffer) const
Serialize the advertise message.
Definition: Packet.hh:287
void Start()
Start the discovery service.
Definition: Discovery.hh:254
static const uint8_t ByeType
Definition: Packet.hh:39
size_t Unpack(char *_buffer)
Unserialize a stream of bytes into an AdvertiseMessage.
Definition: Packet.hh:306
size_t Pack(char *_buffer) const
Serialize the subscription message.
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds...
Definition: Discovery.hh:427
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:545
static const uint8_t SubType
Definition: Packet.hh:36
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:475
Advertise packet used in the discovery protocol to broadcast information about the node advertising a...
Definition: Packet.hh:230
std::map< std::string, Timestamp > activity
Activity information.
Definition: Discovery.hh:1150
size_t Unpack(char *_buffer)
Unserialize a stream of bytes into a Sub.
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events.
Definition: Discovery.hh:485
std::function< void(const T &_publisher)> DiscoveryCallback
Definition: TransportTypes.hh:104
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:365
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:457
#define IGNITION_TRANSPORT_VISIBLE
Use to represent "symbol visible" if supported.
Definition: Helpers.hh:55
IGNITION_TRANSPORT_VISIBLE std::string determineHost()
Determine IP or hostname.
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:502
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:375
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:448
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:438
size_t MsgLength() const
Get the total length of the message.
Definition: Packet.hh:279
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:120
void WaitForInit() const
Check if ready/initialized.
Definition: Discovery.hh:554
Definition: AdvertiseOptions.hh:25
static const uint8_t AdvType
Definition: Packet.hh:35
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message.
Definition: Discovery.hh:389
T & Publisher()
Get the publisher of this message.
Definition: Packet.hh:256
IGNITION_TRANSPORT_VISIBLE std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine.
Topic/service only available to subscribers in the same machine as the publisher. ...
static const uint8_t UnadvType
Definition: Packet.hh:37
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.