All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19 #define IGNITION_TRANSPORT_DISCOVERY_HH_
20 
21 #ifdef _WIN32
22  // For socket(), connect(), send(), and recv().
23  #include <Winsock2.h>
24  #include <Ws2def.h>
25  #include <Ws2ipdef.h>
26  #include <Ws2tcpip.h>
27  // Type used for raw data on this platform.
28  using raw_type = char;
29 #else
30  // For data types
31  #include <sys/types.h>
32  // For socket(), connect(), send(), and recv()
33  #include <sys/socket.h>
34  // For gethostbyname()
35  #include <netdb.h>
36  // For inet_addr()
37  #include <arpa/inet.h>
38  // For close()
39  #include <unistd.h>
40  // For sockaddr_in
41  #include <netinet/in.h>
42  // Type used for raw data on this platform
43  using raw_type = void;
44 #endif
45 
46 #ifdef _WIN32
47  #pragma warning(push, 0)
48 #endif
49 #include <zmq.hpp>
50 #ifdef _WIN32
51  #pragma warning(pop)
52  // Suppress "decorated name length exceed" warning in STL.
53  #pragma warning(disable: 4503)
54  // Suppress "depreted API warnings" in WINSOCK.
55  #pragma warning(disable: 4996)
56 #endif
57 
58 #include <algorithm>
59 #include <condition_variable>
60 #include <map>
61 #include <memory>
62 #include <mutex>
63 #include <string>
64 #include <thread>
65 #include <vector>
66 
73 
74 namespace ignition
75 {
76  namespace transport
77  {
86  template<typename Pub>
88  {
94  public: Discovery(const std::string &_pUuid,
95  const int _port,
96  const bool _verbose = false)
97  : port(_port),
98  hostAddr(determineHost()),
99  pUuid(_pUuid),
100  silenceInterval(kDefSilenceInterval),
101  activityInterval(kDefActivityInterval),
102  heartbeatInterval(kDefHeartbeatInterval),
103  connectionCb(nullptr),
104  disconnectionCb(nullptr),
105  verbose(_verbose),
106  initialized(false),
107  numHeartbeatsUninitialized(0),
108  exit(false),
109  enabled(false)
110  {
111  std::string ignIp;
112  if (env("IGN_IP", ignIp) && !ignIp.empty())
113  this->hostInterfaces = {ignIp};
114  else
115  {
116  // Get the list of network interfaces in this host.
117  this->hostInterfaces = determineInterfaces();
118  }
119 
120 #ifdef _WIN32
121  WORD wVersionRequested;
122  WSADATA wsaData;
123 
124  // Request WinSock v2.2.
125  wVersionRequested = MAKEWORD(2, 2);
126  // Load WinSock DLL.
127  if (WSAStartup(wVersionRequested, &wsaData) != 0)
128  {
129  std::cerr << "Unable to load WinSock DLL" << std::endl;
130  return;
131  }
132 #endif
133  for (const auto &netIface : this->hostInterfaces)
134  {
135  auto succeed = this->RegisterNetIface(netIface);
136 
137  // If the IP address that we're selecting as the main IP address of
138  // the host is invalid, we change it to 127.0.0.1 .
139  // This is probably because IGN_IP is set to a wrong value.
140  if (netIface == this->hostAddr && !succeed)
141  {
142  this->RegisterNetIface("127.0.0.1");
143  std::cerr << "Did you set the environment variable IGN_IP with a "
144  << "correct IP address? " << std::endl
145  << " [" << netIface << "] seems an invalid local IP "
146  << "address." << std::endl
147  << " Using 127.0.0.1 as hostname." << std::endl;
148  this->hostAddr = "127.0.0.1";
149  }
150  }
151 
152  // Socket option: SO_REUSEADDR. This options is used only for receiving
153  // data. We can reuse the same socket for receiving multicast data from
154  // multiple interfaces. We will use the socket at position 0 for
155  // receiving data.
156  int reuseAddr = 1;
157  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
158  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
159  {
160  std::cerr << "Error setting socket option (SO_REUSEADDR)."
161  << std::endl;
162  return;
163  }
164 
165 #ifdef SO_REUSEPORT
166  // Socket option: SO_REUSEPORT. This options is used only for receiving
167  // data. We can reuse the same socket for receiving multicast data from
168  // multiple interfaces. We will use the socket at position 0 for
169  // receiving data.
170  int reusePort = 1;
171  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
172  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
173  {
174  std::cerr << "Error setting socket option (SO_REUSEPORT)."
175  << std::endl;
176  return;
177  }
178 #endif
179  // Bind the first socket to the discovery port.
180  sockaddr_in localAddr;
181  memset(&localAddr, 0, sizeof(localAddr));
182  localAddr.sin_family = AF_INET;
183  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
184  localAddr.sin_port = htons(static_cast<u_short>(this->port));
185 
186  if (bind(this->sockets.at(0),
187  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
188  {
189  std::cerr << "Binding to a local port failed." << std::endl;
190  return;
191  }
192 
193  // Set 'mcastAddr' to the multicast discovery group.
194  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
195  this->mcastAddr.sin_family = AF_INET;
196  this->mcastAddr.sin_addr.s_addr =
197  inet_addr(this->kMulticastGroup.c_str());
198  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
199 
200  if (this->verbose)
201  this->PrintCurrentState();
202  }
203 
205  public: virtual ~Discovery()
206  {
207  // Tell the service thread to terminate.
208  this->exitMutex.lock();
209  this->exit = true;
210  this->exitMutex.unlock();
211 
212  // Don't join on Windows, because it can hang when this object
213  // is destructed on process exit (e.g., when it's a global static).
214  // I think that it's due to this bug:
215  // https://connect.microsoft.com/VisualStudio/feedback/details/747145/std-thread-join-hangs-if-called-after-main-exits-when-using-vs2012-rc
216 #ifndef _WIN32
217  // Wait for the service threads to finish before exit.
218  if (this->threadReception.joinable())
219  this->threadReception.join();
220 #else
221  bool exitLoop = false;
222  while (!exitLoop)
223  {
224  std::lock_guard<std::mutex> lock(this->exitMutex);
225  {
226  if (this->threadReceptionExiting)
227  {
228  exitLoop = true;
229  }
230  }
231  std::this_thread::sleep_for(std::chrono::milliseconds(50));
232  }
233 #endif
234  // Broadcast a BYE message to trigger the remote cancellation of
235  // all our advertised topics.
236  this->SendMsg(ByeType,
237  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
238 
239  // Close sockets.
240  for (const auto &sock : this->sockets)
241  {
242 #ifdef _WIN32
243  closesocket(sock);
244  WSACleanup();
245 #else
246  close(sock);
247 #endif
248  }
249  }
250 
254  public: void Start()
255  {
256  {
257  std::lock_guard<std::mutex> lock(this->mutex);
258 
259  // The service is already running.
260  if (this->enabled)
261  return;
262 
263  this->enabled = true;
264  }
265 
266  auto now = std::chrono::steady_clock::now();
267  this->timeNextHeartbeat = now;
268  this->timeNextActivity = now;
269 
270  // Start the thread that receives discovery information.
271  this->threadReception = std::thread(&Discovery::RecvMessages, this);
272 
273 #ifdef _WIN32
274  this->threadReceptionExiting = false;
275  this->threadReception.detach();
276 #endif
277  }
278 
283  public: bool Advertise(const Pub &_publisher)
284  {
285  {
286  std::lock_guard<std::mutex> lock(this->mutex);
287 
288  if (!this->enabled)
289  return false;
290 
291  // Add the addressing information (local publisher).
292  if (!this->info.AddPublisher(_publisher))
293  return false;
294  }
295 
296  // Only advertise a message outside this process if the scope
297  // is not 'Process'
298  if (_publisher.Options().Scope() != Scope_t::PROCESS)
299  this->SendMsg(AdvType, _publisher);
300 
301  return true;
302  }
303 
314  public: bool Discover(const std::string &_topic) const
315  {
317  bool found;
318  Addresses_M<Pub> addresses;
319 
320  {
321  std::lock_guard<std::mutex> lock(this->mutex);
322 
323  if (!this->enabled)
324  return false;
325 
326  cb = this->connectionCb;
327  }
328 
329  Pub pub;
330  pub.SetTopic(_topic);
331  pub.SetPUuid(this->pUuid);
332 
333  // Send a discovery request.
334  this->SendMsg(SubType, pub);
335 
336  {
337  std::lock_guard<std::mutex> lock(this->mutex);
338  found = this->info.Publishers(_topic, addresses);
339  }
340 
341  if (found)
342  {
343  // I already have information about this topic.
344  for (const auto &proc : addresses)
345  {
346  for (const auto &node : proc.second)
347  {
348  if (cb)
349  {
350  // Execute the user's callback for a service request. Notice
351  // that we only execute one callback for preventing receive
352  // multiple service responses for a single request.
353  cb(node);
354  }
355  }
356  }
357  }
358 
359  return true;
360  }
361 
364  public: const TopicStorage<Pub> &Info() const
365  {
366  std::lock_guard<std::mutex> lock(this->mutex);
367  return this->info;
368  }
369 
374  public: bool Publishers(const std::string &_topic,
375  Addresses_M<Pub> &_publishers) const
376  {
377  std::lock_guard<std::mutex> lock(this->mutex);
378  return this->info.Publishers(_topic, _publishers);
379  }
380 
388  public: bool Unadvertise(const std::string &_topic,
389  const std::string &_nUuid)
390  {
391  Pub inf;
392  {
393  std::lock_guard<std::mutex> lock(this->mutex);
394 
395  if (!this->enabled)
396  return false;
397 
398  // Don't do anything if the topic is not advertised by any of my nodes
399  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
400  return true;
401 
402  // Remove the topic information.
403  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
404  }
405 
406  // Only unadvertise a message outside this process if the scope
407  // is not 'Process'.
408  if (inf.Options().Scope() != Scope_t::PROCESS)
409  this->SendMsg(UnadvType, inf);
410 
411  return true;
412  }
413 
416  public: std::string HostAddr() const
417  {
418  std::lock_guard<std::mutex> lock(this->mutex);
419  return this->hostAddr;
420  }
421 
426  public: unsigned int ActivityInterval() const
427  {
428  std::lock_guard<std::mutex> lock(this->mutex);
429  return this->activityInterval;
430  }
431 
437  public: unsigned int HeartbeatInterval() const
438  {
439  std::lock_guard<std::mutex> lock(this->mutex);
440  return this->heartbeatInterval;
441  }
442 
447  public: unsigned int SilenceInterval() const
448  {
449  std::lock_guard<std::mutex> lock(this->mutex);
450  return this->silenceInterval;
451  }
452 
456  public: void SetActivityInterval(const unsigned int _ms)
457  {
458  std::lock_guard<std::mutex> lock(this->mutex);
459  this->activityInterval = _ms;
460  }
461 
465  public: void SetHeartbeatInterval(const unsigned int _ms)
466  {
467  std::lock_guard<std::mutex> lock(this->mutex);
468  this->heartbeatInterval = _ms;
469  }
470 
474  public: void SetSilenceInterval(const unsigned int _ms)
475  {
476  std::lock_guard<std::mutex> lock(this->mutex);
477  this->silenceInterval = _ms;
478  }
479 
484  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
485  {
486  std::lock_guard<std::mutex> lock(this->mutex);
487  this->connectionCb = _cb;
488  }
489 
494  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
495  {
496  std::lock_guard<std::mutex> lock(this->mutex);
497  this->disconnectionCb = _cb;
498  }
499 
501  public: void PrintCurrentState() const
502  {
503  std::lock_guard<std::mutex> lock(this->mutex);
504 
505  std::cout << "---------------" << std::endl;
506  std::cout << std::boolalpha << "Enabled: "
507  << this->enabled << std::endl;
508  std::cout << "Discovery state" << std::endl;
509  std::cout << "\tUUID: " << this->pUuid << std::endl;
510  std::cout << "Settings" << std::endl;
511  std::cout << "\tActivity: " << this->activityInterval
512  << " ms." << std::endl;
513  std::cout << "\tHeartbeat: " << this->heartbeatInterval
514  << "ms." << std::endl;
515  std::cout << "\tSilence: " << this->silenceInterval
516  << " ms." << std::endl;
517  std::cout << "Known information: " << std::endl;
518  this->info.Print();
519 
520  // Used to calculate the elapsed time.
521  Timestamp now = std::chrono::steady_clock::now();
522 
523  std::cout << "Activity" << std::endl;
524  if (this->activity.empty())
525  std::cout << "\t<empty>" << std::endl;
526  else
527  {
528  for (auto &proc : this->activity)
529  {
530  // Elapsed time since the last update from this publisher.
531  std::chrono::duration<double> elapsed = now - proc.second;
532 
533  std::cout << "\t" << proc.first << std::endl;
534  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
535  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
536  << std::endl;
537  }
538  }
539  std::cout << "---------------" << std::endl;
540  }
541 
544  public: void TopicList(std::vector<std::string> &_topics) const
545  {
546  this->WaitForInit();
547  std::lock_guard<std::mutex> lock(this->mutex);
548  this->info.TopicList(_topics);
549  }
550 
553  public: void WaitForInit() const
554  {
555  std::unique_lock<std::mutex> lk(this->mutex);
556 
557  if (!this->initialized)
558  {
559  this->initializedCv.wait(lk, [this]{return this->initialized;});
560  }
561  }
562 
566  private: void UpdateActivity()
567  {
568  Timestamp now = std::chrono::steady_clock::now();
569 
570  std::lock_guard<std::mutex> lock(this->mutex);
571 
572  if (now < this->timeNextActivity)
573  return;
574 
575  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
576  {
577  // Elapsed time since the last update from this publisher.
578  auto elapsed = now - it->second;
579 
580  // This publisher has expired.
581  if (std::chrono::duration_cast<std::chrono::milliseconds>
582  (elapsed).count() > this->silenceInterval)
583  {
584  // Remove all the info entries for this process UUID.
585  this->info.DelPublishersByProc(it->first);
586 
587  // Notify without topic information. This is useful to inform the
588  // client that a remote node is gone, even if we were not
589  // interested in its topics.
590  Pub publisher;
591  publisher.SetPUuid(it->first);
592  this->disconnectionCb(publisher);
593 
594  // Remove the activity entry.
595  this->activity.erase(it++);
596  }
597  else
598  ++it;
599  }
600 
601  this->timeNextActivity = std::chrono::steady_clock::now() +
602  std::chrono::milliseconds(this->activityInterval);
603  }
604 
606  private: void UpdateHeartbeat()
607  {
608  Timestamp now = std::chrono::steady_clock::now();
609 
610  {
611  std::lock_guard<std::mutex> lock(this->mutex);
612 
613  if (now < this->timeNextHeartbeat)
614  return;
615  }
616 
617  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
618  this->SendMsg(HeartbeatType, pub);
619 
620  std::map<std::string, std::vector<Pub>> nodes;
621  {
622  std::lock_guard<std::mutex> lock(this->mutex);
623 
624  // Re-advertise topics that are advertised inside this process.
625  this->info.PublishersByProc(this->pUuid, nodes);
626  }
627 
628  for (const auto &topic : nodes)
629  {
630  for (const auto &node : topic.second)
631  this->SendMsg(AdvType, node);
632  }
633 
634  {
635  std::lock_guard<std::mutex> lock(this->mutex);
636  if (!this->initialized)
637  {
638  ++this->numHeartbeatsUninitialized;
639  if (this->numHeartbeatsUninitialized == 2)
640  {
641  // We consider the discovery initialized after two cycles of
642  // heartbeats sent.
643  this->initialized = true;
644 
645  // Notify anyone waiting for the initialization phase to finish.
646  this->initializedCv.notify_all();
647  }
648  }
649 
650  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
651  std::chrono::milliseconds(this->heartbeatInterval);
652  }
653  }
654 
664  private: int NextTimeout() const
665  {
666  auto now = std::chrono::steady_clock::now();
667  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
668  auto timeUntilNextActivity = this->timeNextActivity - now;
669 
670  int t = static_cast<int>(
671  std::chrono::duration_cast<std::chrono::milliseconds>
672  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
673  int t2 = std::min(t, this->kTimeout);
674  return std::max(t2, 0);
675  }
676 
678  private: void RecvMessages()
679  {
680  bool timeToExit = false;
681  while (!timeToExit)
682  {
683  // Poll socket for a reply, with timeout.
684  zmq::pollitem_t items[] =
685  {
686  {0, this->sockets.at(0), ZMQ_POLLIN, 0},
687  };
688 
689  // Calculate the timeout.
690  int timeout = this->NextTimeout();
691 
692  try
693  {
694  zmq::poll(&items[0], sizeof(items) / sizeof(items[0]), timeout);
695  }
696  catch(...)
697  {
698  continue;
699  }
700 
701  // If we got a reply, process it.
702  if (items[0].revents & ZMQ_POLLIN)
703  {
704  this->RecvDiscoveryUpdate();
705 
706  if (this->verbose)
707  this->PrintCurrentState();
708  }
709 
710  this->UpdateHeartbeat();
711  this->UpdateActivity();
712 
713  // Is it time to exit?
714  {
715  std::lock_guard<std::mutex> lock(this->exitMutex);
716  if (this->exit)
717  timeToExit = true;
718  }
719  }
720 #ifdef _WIN32
721  std::lock_guard<std::mutex> lock(this->exitMutex);
722  this->threadReceptionExiting = true;
723 #endif
724  }
725 
727  private: void RecvDiscoveryUpdate()
728  {
729  char rcvStr[Discovery::kMaxRcvStr];
730  std::string srcAddr;
731  uint16_t srcPort;
732  sockaddr_in clntAddr;
733  socklen_t addrLen = sizeof(clntAddr);
734 
735  if ((recvfrom(this->sockets.at(0),
736  reinterpret_cast<raw_type *>(rcvStr),
737  this->kMaxRcvStr, 0,
738  reinterpret_cast<sockaddr *>(&clntAddr),
739  reinterpret_cast<socklen_t *>(&addrLen))) < 0)
740  {
741  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
742  << std::endl;
743  return;
744  }
745  srcAddr = inet_ntoa(clntAddr.sin_addr);
746  srcPort = ntohs(clntAddr.sin_port);
747 
748  if (this->verbose)
749  {
750  std::cout << "\nReceived discovery update from " << srcAddr << ": "
751  << srcPort << std::endl;
752  }
753 
754  this->DispatchDiscoveryMsg(srcAddr, rcvStr);
755  }
756 
757 
761  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
762  char *_msg)
763  {
764  Header header;
765  char *pBody = _msg;
766 
767  // Create the header from the raw bytes.
768  header.Unpack(_msg);
769  pBody += header.HeaderLength();
770 
771  // Discard the message if the wire protocol is different than mine.
772  if (this->kWireVersion != header.Version())
773  return;
774 
775  auto recvPUuid = header.PUuid();
776 
777  // Discard our own discovery messages.
778  if (recvPUuid == this->pUuid)
779  return;
780 
781  // Update timestamp and cache the callbacks.
782  DiscoveryCallback<Pub> connectCb;
783  DiscoveryCallback<Pub> disconnectCb;
784  {
785  std::lock_guard<std::mutex> lock(this->mutex);
786  this->activity[recvPUuid] = std::chrono::steady_clock::now();
787  connectCb = this->connectionCb;
788  disconnectCb = this->disconnectionCb;
789  }
790 
791  switch (header.Type())
792  {
793  case AdvType:
794  {
795  // Read the rest of the fields.
796  transport::AdvertiseMessage<Pub> advMsg;
797  advMsg.Unpack(pBody);
798 
799  // Check scope of the topic.
800  if ((advMsg.Publisher().Options().Scope() == Scope_t::PROCESS) ||
801  (advMsg.Publisher().Options().Scope() == Scope_t::HOST &&
802  _fromIp != this->hostAddr))
803  {
804  return;
805  }
806 
807  // Register an advertised address for the topic.
808  bool added;
809  {
810  std::lock_guard<std::mutex> lock(this->mutex);
811  added = this->info.AddPublisher(advMsg.Publisher());
812  }
813 
814  if (added && connectCb)
815  {
816  // Execute the client's callback.
817  connectCb(advMsg.Publisher());
818  }
819 
820  break;
821  }
822  case SubType:
823  {
824  // Read the rest of the fields.
825  SubscriptionMsg subMsg;
826  subMsg.Unpack(pBody);
827  auto recvTopic = subMsg.Topic();
828 
829  // Check if at least one of my nodes advertises the topic requested.
830  Addresses_M<Pub> addresses;
831  {
832  std::lock_guard<std::mutex> lock(this->mutex);
833  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
834  {
835  break;
836  }
837 
838  if (!this->info.Publishers(recvTopic, addresses))
839  break;
840  }
841 
842  for (const auto &nodeInfo : addresses[this->pUuid])
843  {
844  // Check scope of the topic.
845  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
846  (nodeInfo.Options().Scope() == Scope_t::HOST &&
847  _fromIp != this->hostAddr))
848  {
849  continue;
850  }
851 
852  // Answer an ADVERTISE message.
853  this->SendMsg(AdvType, nodeInfo);
854  }
855 
856  break;
857  }
858  case HeartbeatType:
859  {
860  // The timestamp has already been updated.
861  break;
862  }
863  case ByeType:
864  {
865  // Remove the activity entry for this publisher.
866  {
867  std::lock_guard<std::mutex> lock(this->mutex);
868  this->activity.erase(recvPUuid);
869  }
870 
871  if (disconnectCb)
872  {
873  Pub pub;
874  pub.SetPUuid(recvPUuid);
875  // Notify the new disconnection.
876  disconnectCb(pub);
877  }
878 
879  // Remove the address entry for this topic.
880  {
881  std::lock_guard<std::mutex> lock(this->mutex);
882  this->info.DelPublishersByProc(recvPUuid);
883  }
884 
885  break;
886  }
887  case UnadvType:
888  {
889  // Read the address.
890  transport::AdvertiseMessage<Pub> advMsg;
891  advMsg.Unpack(pBody);
892 
893  // Check scope of the topic.
894  if ((advMsg.Publisher().Options().Scope() == Scope_t::PROCESS) ||
895  (advMsg.Publisher().Options().Scope() == Scope_t::HOST &&
896  _fromIp != this->hostAddr))
897  {
898  return;
899  }
900 
901  if (disconnectCb)
902  {
903  // Notify the new disconnection.
904  disconnectCb(advMsg.Publisher());
905  }
906 
907  // Remove the address entry for this topic.
908  {
909  std::lock_guard<std::mutex> lock(this->mutex);
910  this->info.DelPublisherByNode(advMsg.Publisher().Topic(),
911  advMsg.Publisher().PUuid(), advMsg.Publisher().NUuid());
912  }
913 
914  break;
915  }
916  default:
917  {
918  std::cerr << "Unknown message type [" << header.Type() << "]\n";
919  break;
920  }
921  }
922  }
923 
930  private: template<typename T>
931  void SendMsg(const uint8_t _type,
932  const T &_pub,
933  const uint16_t _flags = 0) const
934  {
935  // Create the header.
936  Header header(this->Version(), _pub.PUuid(), _type, _flags);
937  auto msgLength = 0;
938  std::vector<char> buffer;
939 
940  std::string topic = _pub.Topic();
941 
942  switch (_type)
943  {
944  case AdvType:
945  case UnadvType:
946  {
947  // Create the [UN]ADVERTISE message.
948  transport::AdvertiseMessage<T> advMsg(header, _pub);
949 
950  // Allocate a buffer and serialize the message.
951  buffer.resize(advMsg.MsgLength());
952  advMsg.Pack(reinterpret_cast<char*>(&buffer[0]));
953  msgLength = static_cast<int>(advMsg.MsgLength());
954  break;
955  }
956  case SubType:
957  {
958  // Create the [UN]SUBSCRIBE message.
959  SubscriptionMsg subMsg(header, topic);
960 
961  // Allocate a buffer and serialize the message.
962  buffer.resize(subMsg.MsgLength());
963  subMsg.Pack(reinterpret_cast<char*>(&buffer[0]));
964  msgLength = static_cast<int>(subMsg.MsgLength());
965  break;
966  }
967  case HeartbeatType:
968  case ByeType:
969  {
970  // Allocate a buffer and serialize the message.
971  buffer.resize(header.HeaderLength());
972  header.Pack(reinterpret_cast<char*>(&buffer[0]));
973  msgLength = header.HeaderLength();
974  break;
975  }
976  default:
977  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
978  << " type [" << _type << "]" << std::endl;
979  return;
980  }
981 
982  // Send the discovery message to the multicast group through all the
983  // sockets.
984  for (const auto &sock : this->Sockets())
985  {
986  if (sendto(sock, reinterpret_cast<const raw_type *>(
987  reinterpret_cast<unsigned char*>(&buffer[0])),
988  msgLength, 0,
989  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
990  sizeof(*(this->MulticastAddr()))) != msgLength)
991  {
992  std::cerr << "Exception sending a message" << std::endl;
993  return;
994  }
995  }
996 
997  if (this->Verbose())
998  {
999  std::cout << "\t* Sending " << MsgTypesStr[_type]
1000  << " msg [" << topic << "]" << std::endl;
1001  }
1002  }
1003 
1006  private: const std::vector<int> &Sockets() const
1007  {
1008  return this->sockets;
1009  }
1010 
1013  private: const sockaddr_in *MulticastAddr() const
1014  {
1015  return &this->mcastAddr;
1016  }
1017 
1020  private: bool Verbose() const
1021  {
1022  return this->verbose;
1023  }
1024 
1027  private: uint8_t Version() const
1028  {
1029  return this->kWireVersion;
1030  }
1031 
1036  private: bool RegisterNetIface(const std::string &_ip)
1037  {
1038  // Make a new socket for sending discovery information.
1039  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1040  if (sock < 0)
1041  {
1042  std::cerr << "Socket creation failed." << std::endl;
1043  return false;
1044  }
1045 
1046  // Socket option: IP_MULTICAST_IF.
1047  // This socket option needs to be applied to each socket used to send
1048  // data. This option selects the source interface for outgoing messages.
1049  struct in_addr ifAddr;
1050  ifAddr.s_addr = inet_addr(_ip.c_str());
1051  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1052  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1053  {
1054  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1055  << std::endl;
1056  return false;
1057  }
1058 
1059  this->sockets.push_back(sock);
1060 
1061  // Join the multicast group. We have to do it for each network interface
1062  // but we can do it on the same socket. We will use the socket at
1063  // position 0 for receiving multicast information.
1064  struct ip_mreq group;
1065  group.imr_multiaddr.s_addr =
1066  inet_addr(this->kMulticastGroup.c_str());
1067  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1068  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1069  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1070  {
1071  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1072  << std::endl;
1073  return false;
1074  }
1075 
1076  return true;
1077  }
1078 
1082  private: static const unsigned int kDefActivityInterval = 100;
1083 
1087  private: static const unsigned int kDefHeartbeatInterval = 1000;
1088 
1092  private: static const unsigned int kDefSilenceInterval = 3000;
1093 
1095  private: const std::string kMulticastGroup = "224.0.0.7";
1096 
1098  private: const int kTimeout = 250;
1099 
1101  private: static const int kMaxRcvStr = 65536;
1102 
1105  private: static const uint8_t kWireVersion = 8;
1106 
1108  private: int port;
1109 
1111  private: std::string hostAddr;
1112 
1114  private: std::vector<std::string> hostInterfaces;
1115 
1117  private: std::string pUuid;
1118 
1122  private: unsigned int silenceInterval;
1123 
1127  private: unsigned int activityInterval;
1128 
1132  private: unsigned int heartbeatInterval;
1133 
1135  private: DiscoveryCallback<Pub> connectionCb;
1136 
1138  private: DiscoveryCallback<Pub> disconnectionCb;
1139 
1141  private: TopicStorage<Pub> info;
1142 
1147  protected: std::map<std::string, Timestamp> activity;
1148 
1150  private: bool verbose;
1151 
1153  private: std::vector<int> sockets;
1154 
1156  private: sockaddr_in mcastAddr;
1157 
1159  private: mutable std::mutex mutex;
1160 
1162  private: std::thread threadReception;
1163 
1165  private: Timestamp timeNextHeartbeat;
1166 
1168  private: Timestamp timeNextActivity;
1169 
1171  private: std::mutex exitMutex;
1172 
1177  private: bool initialized;
1178 
1180  private: unsigned int numHeartbeatsUninitialized;
1181 
1183  private: mutable std::condition_variable initializedCv;
1184 
1186  private: bool exit;
1187 
1188 #ifdef _WIN32
1189  private: bool threadReceptionExiting = true;
1191 #endif
1192 
1194  private: bool enabled;
1195  };
1196 
1200 
1204  }
1205 }
1206 
1207 #endif
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:205
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:416
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:94
static const std::vector< std::string > MsgTypesStr
Used for debugging the message type received/send.
Definition: Packet.hh:44
bool Discover(const std::string &_topic) const
Request discovery information about a topic.
Definition: Discovery.hh:314
ignition/transport/Publisher.hh
Definition: Publisher.hh:37
std::map< std::string, std::vector< T >> Addresses_M
Definition: TransportTypes.hh:53
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:465
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events.
Definition: Discovery.hh:494
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:283
void raw_type
Definition: Discovery.hh:43
A discovery class that implements a distributed topic discovery protocol.
Definition: Discovery.hh:87
Topic/service only available to subscribers in the same process as the publisher. ...
static const uint8_t HeartbeatType
Definition: Packet.hh:38
void Start()
Start the discovery service.
Definition: Discovery.hh:254
static const uint8_t ByeType
Definition: Packet.hh:39
ignition/transport/AdvertiseOptions.hh
Definition: AdvertiseOptions.hh:54
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds...
Definition: Discovery.hh:426
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:544
static const uint8_t SubType
Definition: Packet.hh:36
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:474
std::map< std::string, Timestamp > activity
Activity information.
Definition: Discovery.hh:1147
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events.
Definition: Discovery.hh:484
std::function< void(const T &_publisher)> DiscoveryCallback
Definition: TransportTypes.hh:105
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:364
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:456
#define IGNITION_TRANSPORT_VISIBLE
Use to represent "symbol visible" if supported.
Definition: Helpers.hh:57
IGNITION_TRANSPORT_VISIBLE std::string determineHost()
Determine IP or hostname.
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:501
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:374
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:447
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:437
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:129
void WaitForInit() const
Check if ready/initialized.
Definition: Discovery.hh:553
static const uint8_t AdvType
Definition: Packet.hh:35
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message.
Definition: Discovery.hh:388
IGNITION_TRANSPORT_VISIBLE std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine.
Topic/service only available to subscribers in the same machine as the publisher. ...
static const uint8_t UnadvType
Definition: Packet.hh:37
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.