Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19 #define IGNITION_TRANSPORT_DISCOVERY_HH_
20 
21 #ifdef _WIN32
22  // For socket(), connect(), send(), and recv().
23  #include <Winsock2.h>
24  #include <Ws2def.h>
25  #include <Ws2ipdef.h>
26  #include <Ws2tcpip.h>
27  // Type used for raw data on this platform.
28  using raw_type = char;
29 #else
30  // For data types
31  #include <sys/types.h>
32  // For socket(), connect(), send(), and recv()
33  #include <sys/socket.h>
34  // For gethostbyname()
35  #include <netdb.h>
36  // For inet_addr()
37  #include <arpa/inet.h>
38  // For close()
39  #include <unistd.h>
40  // For sockaddr_in
41  #include <netinet/in.h>
42  // Type used for raw data on this platform
43  using raw_type = void;
44 #endif
45 
46 #ifdef _WIN32
47  #pragma warning(push, 0)
48 #endif
49 #include <zmq.hpp>
50 #ifdef _WIN32
51  #pragma warning(pop)
52  // Suppress "decorated name length exceed" warning in STL.
53  #pragma warning(disable: 4503)
54  // Suppress "deprecated API warnings" in WINSOCK.
55  #pragma warning(disable: 4996)
56 #endif
57 
58 #include <algorithm>
59 #include <condition_variable>
60 #include <map>
61 #include <memory>
62 #include <mutex>
63 #include <string>
64 #include <thread>
65 #include <vector>
66 
73 
74 namespace ignition
75 {
76  namespace transport
77  {
86  template<typename Pub>
88  {
94  public: Discovery(const std::string &_pUuid,
95  const int _port,
96  const bool _verbose = false)
97  : port(_port),
98  hostAddr(determineHost()),
99  pUuid(_pUuid),
100  silenceInterval(kDefSilenceInterval),
101  activityInterval(kDefActivityInterval),
102  heartbeatInterval(kDefHeartbeatInterval),
103  connectionCb(nullptr),
104  disconnectionCb(nullptr),
105  verbose(_verbose),
106  initialized(false),
107  numHeartbeatsUninitialized(0),
108  exit(false),
109  enabled(false)
110  {
111  std::string ignIp;
112  if (env("IGN_IP", ignIp) && !ignIp.empty())
113  this->hostInterfaces = {ignIp};
114  else
115  {
116  // Get the list of network interfaces in this host.
117  this->hostInterfaces = determineInterfaces();
118  }
119 
120 #ifdef _WIN32
121  WORD wVersionRequested;
122  WSADATA wsaData;
123 
124  // Request WinSock v2.2.
125  wVersionRequested = MAKEWORD(2, 2);
126  // Load WinSock DLL.
127  if (WSAStartup(wVersionRequested, &wsaData) != 0)
128  {
129  std::cerr << "Unable to load WinSock DLL" << std::endl;
130  return;
131  }
132 #endif
133  for (const auto &netIface : this->hostInterfaces)
134  {
135  auto succeed = this->RegisterNetIface(netIface);
136 
137  // If the IP address that we're selecting as the main IP address of
138  // the host is invalid, we change it to 127.0.0.1 .
139  // This is probably because IGN_IP is set to a wrong value.
140  if (netIface == this->hostAddr && !succeed)
141  {
142  this->RegisterNetIface("127.0.0.1");
143  std::cerr << "Did you set the environment variable IGN_IP with a "
144  << "correct IP address? " << std::endl
145  << " [" << netIface << "] seems an invalid local IP "
146  << "address." << std::endl
147  << " Using 127.0.0.1 as hostname." << std::endl;
148  this->hostAddr = "127.0.0.1";
149  }
150  }
151 
152  // Socket option: SO_REUSEADDR. This options is used only for receiving
153  // data. We can reuse the same socket for receiving multicast data from
154  // multiple interfaces. We will use the socket at position 0 for
155  // receiving data.
156  int reuseAddr = 1;
157  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
158  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
159  {
160  std::cerr << "Error setting socket option (SO_REUSEADDR)."
161  << std::endl;
162  return;
163  }
164 
165 #ifdef SO_REUSEPORT
166  // Socket option: SO_REUSEPORT. This options is used only for receiving
167  // data. We can reuse the same socket for receiving multicast data from
168  // multiple interfaces. We will use the socket at position 0 for
169  // receiving data.
170  int reusePort = 1;
171  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
172  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
173  {
174  std::cerr << "Error setting socket option (SO_REUSEPORT)."
175  << std::endl;
176  return;
177  }
178 #endif
179  // Bind the first socket to the discovery port.
180  sockaddr_in localAddr;
181  memset(&localAddr, 0, sizeof(localAddr));
182  localAddr.sin_family = AF_INET;
183  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
184  localAddr.sin_port = htons(static_cast<u_short>(this->port));
185 
186  if (bind(this->sockets.at(0),
187  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
188  {
189  std::cerr << "Binding to a local port failed." << std::endl;
190  return;
191  }
192 
193  // Set 'mcastAddr' to the multicast discovery group.
194  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
195  this->mcastAddr.sin_family = AF_INET;
196  this->mcastAddr.sin_addr.s_addr =
197  inet_addr(this->kMulticastGroup.c_str());
198  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
199 
200  if (this->verbose)
201  this->PrintCurrentState();
202  }
203 
205  public: virtual ~Discovery()
206  {
207  // Tell the service thread to terminate.
208  this->exitMutex.lock();
209  this->exit = true;
210  this->exitMutex.unlock();
211 
212  // Don't join on Windows, because it can hang when this object
213  // is destructed on process exit (e.g., when it's a global static).
214  // I think that it's due to this bug:
215  // https://connect.microsoft.com/VisualStudio/feedback/details/747145/std-thread-join-hangs-if-called-after-main-exits-when-using-vs2012-rc
216 #ifndef _WIN32
217  // Wait for the service threads to finish before exit.
218  if (this->threadReception.joinable())
219  this->threadReception.join();
220 #else
221  bool exitLoop = false;
222  while (!exitLoop)
223  {
224  std::lock_guard<std::mutex> lock(this->exitMutex);
225  {
226  if (this->threadReceptionExiting)
227  {
228  exitLoop = true;
229  }
230  }
231  std::this_thread::sleep_for(std::chrono::milliseconds(50));
232  }
233 #endif
234  // Broadcast a BYE message to trigger the remote cancellation of
235  // all our advertised topics.
236  this->SendMsg(ByeType,
237  Publisher("", "", this->pUuid, "", Scope_t::ALL));
238 
239  // Close sockets.
240  for (const auto &sock : this->sockets)
241  {
242 #ifdef _WIN32
243  closesocket(sock);
244  WSACleanup();
245 #else
246  close(sock);
247 #endif
248  }
249  }
250 
254  public: void Start()
255  {
256  {
257  std::lock_guard<std::mutex> lock(this->mutex);
258 
259  // The service is already running.
260  if (this->enabled)
261  return;
262 
263  this->enabled = true;
264  }
265 
266  auto now = std::chrono::steady_clock::now();
267  this->timeNextHeartbeat = now;
268  this->timeNextActivity = now;
269 
270  // Start the thread that receives discovery information.
271  this->threadReception = std::thread(&Discovery::RecvMessages, this);
272 
273 #ifdef _WIN32
274  this->threadReceptionExiting = false;
275  this->threadReception.detach();
276 #endif
277  }
278 
283  public: bool Advertise(const Pub &_publisher)
284  {
285  {
286  std::lock_guard<std::mutex> lock(this->mutex);
287 
288  if (!this->enabled)
289  return false;
290 
291  // Add the addressing information (local publisher).
292  if (!this->info.AddPublisher(_publisher))
293  return false;
294  }
295 
296  // Only advertise a message outside this process if the scope
297  // is not 'Process'
298  if (_publisher.Scope() != Scope_t::PROCESS)
299  this->SendMsg(AdvType, _publisher);
300 
301  return true;
302  }
303 
314  public: bool Discover(const std::string &_topic) const
315  {
317  bool found;
318  Addresses_M<Pub> addresses;
319 
320  {
321  std::lock_guard<std::mutex> lock(this->mutex);
322 
323  if (!this->enabled)
324  return false;
325 
326  cb = this->connectionCb;
327  }
328 
329  Pub pub;
330  pub.SetTopic(_topic);
331  pub.SetPUuid(this->pUuid);
332  pub.SetScope(Scope_t::ALL);
333 
334  // Send a discovery request.
335  this->SendMsg(SubType, pub);
336 
337  {
338  std::lock_guard<std::mutex> lock(this->mutex);
339  found = this->info.Publishers(_topic, addresses);
340  }
341 
342  if (found)
343  {
344  // I already have information about this topic.
345  for (const auto &proc : addresses)
346  {
347  for (const auto &node : proc.second)
348  {
349  if (cb)
350  {
351  // Execute the user's callback for a service request. Notice
352  // that we only execute one callback for preventing receive
353  // multiple service responses for a single request.
354  cb(node);
355  }
356  }
357  }
358  }
359 
360  return true;
361  }
362 
365  public: const TopicStorage<Pub> &Info() const
366  {
367  std::lock_guard<std::mutex> lock(this->mutex);
368  return this->info;
369  }
370 
375  public: bool Publishers(const std::string &_topic,
376  Addresses_M<Pub> &_publishers) const
377  {
378  std::lock_guard<std::mutex> lock(this->mutex);
379  return this->info.Publishers(_topic, _publishers);
380  }
381 
389  public: bool Unadvertise(const std::string &_topic,
390  const std::string &_nUuid)
391  {
392  Pub inf;
393  {
394  std::lock_guard<std::mutex> lock(this->mutex);
395 
396  if (!this->enabled)
397  return false;
398 
399  // Don't do anything if the topic is not advertised by any of my nodes
400  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
401  return true;
402 
403  // Remove the topic information.
404  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
405  }
406 
407  // Only unadvertise a message outside this process if the scope
408  // is not 'Process'.
409  if (inf.Scope() != Scope_t::PROCESS)
410  this->SendMsg(UnadvType, inf);
411 
412  return true;
413  }
414 
417  public: std::string HostAddr() const
418  {
419  std::lock_guard<std::mutex> lock(this->mutex);
420  return this->hostAddr;
421  }
422 
427  public: unsigned int ActivityInterval() const
428  {
429  std::lock_guard<std::mutex> lock(this->mutex);
430  return this->activityInterval;
431  }
432 
438  public: unsigned int HeartbeatInterval() const
439  {
440  std::lock_guard<std::mutex> lock(this->mutex);
441  return this->heartbeatInterval;
442  }
443 
448  public: unsigned int SilenceInterval() const
449  {
450  std::lock_guard<std::mutex> lock(this->mutex);
451  return this->silenceInterval;
452  }
453 
457  public: void SetActivityInterval(const unsigned int _ms)
458  {
459  std::lock_guard<std::mutex> lock(this->mutex);
460  this->activityInterval = _ms;
461  }
462 
466  public: void SetHeartbeatInterval(const unsigned int _ms)
467  {
468  std::lock_guard<std::mutex> lock(this->mutex);
469  this->heartbeatInterval = _ms;
470  }
471 
475  public: void SetSilenceInterval(const unsigned int _ms)
476  {
477  std::lock_guard<std::mutex> lock(this->mutex);
478  this->silenceInterval = _ms;
479  }
480 
485  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
486  {
487  std::lock_guard<std::mutex> lock(this->mutex);
488  this->connectionCb = _cb;
489  }
490 
495  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
496  {
497  std::lock_guard<std::mutex> lock(this->mutex);
498  this->disconnectionCb = _cb;
499  }
500 
502  public: void PrintCurrentState() const
503  {
504  std::lock_guard<std::mutex> lock(this->mutex);
505 
506  std::cout << "---------------" << std::endl;
507  std::cout << std::boolalpha << "Enabled: "
508  << this->enabled << std::endl;
509  std::cout << "Discovery state" << std::endl;
510  std::cout << "\tUUID: " << this->pUuid << std::endl;
511  std::cout << "Settings" << std::endl;
512  std::cout << "\tActivity: " << this->activityInterval
513  << " ms." << std::endl;
514  std::cout << "\tHeartbeat: " << this->heartbeatInterval
515  << "ms." << std::endl;
516  std::cout << "\tSilence: " << this->silenceInterval
517  << " ms." << std::endl;
518  std::cout << "Known information: " << std::endl;
519  this->info.Print();
520 
521  // Used to calculate the elapsed time.
522  Timestamp now = std::chrono::steady_clock::now();
523 
524  std::cout << "Activity" << std::endl;
525  if (this->activity.empty())
526  std::cout << "\t<empty>" << std::endl;
527  else
528  {
529  for (auto &proc : this->activity)
530  {
531  // Elapsed time since the last update from this publisher.
532  std::chrono::duration<double> elapsed = now - proc.second;
533 
534  std::cout << "\t" << proc.first << std::endl;
535  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
536  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
537  << std::endl;
538  }
539  }
540  std::cout << "---------------" << std::endl;
541  }
542 
545  public: void TopicList(std::vector<std::string> &_topics) const
546  {
547  this->WaitForInit();
548  std::lock_guard<std::mutex> lock(this->mutex);
549  this->info.TopicList(_topics);
550  }
551 
554  public: void WaitForInit() const
555  {
556  std::unique_lock<std::mutex> lk(this->mutex);
557 
558  if (!this->initialized)
559  {
560  this->initializedCv.wait(lk, [this]{return this->initialized;});
561  }
562  }
563 
567  private: void UpdateActivity()
568  {
569  Timestamp now = std::chrono::steady_clock::now();
570 
571  std::lock_guard<std::mutex> lock(this->mutex);
572 
573  if (now < this->timeNextActivity)
574  return;
575 
576  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
577  {
578  // Elapsed time since the last update from this publisher.
579  auto elapsed = now - it->second;
580 
581  // This publisher has expired.
582  if (std::chrono::duration_cast<std::chrono::milliseconds>
583  (elapsed).count() > this->silenceInterval)
584  {
585  // Remove all the info entries for this process UUID.
586  this->info.DelPublishersByProc(it->first);
587 
588  // Notify without topic information. This is useful to inform the
589  // client that a remote node is gone, even if we were not
590  // interested in its topics.
591  Pub publisher;
592  publisher.SetPUuid(it->first);
593  publisher.SetScope(Scope_t::ALL);
594  this->disconnectionCb(publisher);
595 
596  // Remove the activity entry.
597  this->activity.erase(it++);
598  }
599  else
600  ++it;
601  }
602 
603  this->timeNextActivity = std::chrono::steady_clock::now() +
604  std::chrono::milliseconds(this->activityInterval);
605  }
606 
608  private: void UpdateHeartbeat()
609  {
610  Timestamp now = std::chrono::steady_clock::now();
611 
612  {
613  std::lock_guard<std::mutex> lock(this->mutex);
614 
615  if (now < this->timeNextHeartbeat)
616  return;
617  }
618 
619  Publisher pub("", "", this->pUuid, "", Scope_t::ALL);
620  this->SendMsg(HeartbeatType, pub);
621 
622  std::map<std::string, std::vector<Pub>> nodes;
623  {
624  std::lock_guard<std::mutex> lock(this->mutex);
625 
626  // Re-advertise topics that are advertised inside this process.
627  this->info.PublishersByProc(this->pUuid, nodes);
628  }
629 
630  for (const auto &topic : nodes)
631  {
632  for (const auto &node : topic.second)
633  this->SendMsg(AdvType, node);
634  }
635 
636  {
637  std::lock_guard<std::mutex> lock(this->mutex);
638  if (!this->initialized)
639  {
640  ++this->numHeartbeatsUninitialized;
641  if (this->numHeartbeatsUninitialized == 2)
642  {
643  // We consider the discovery initialized after two cycles of
644  // heartbeats sent.
645  this->initialized = true;
646 
647  // Notify anyone waiting for the initialization phase to finish.
648  this->initializedCv.notify_all();
649  }
650  }
651 
652  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
653  std::chrono::milliseconds(this->heartbeatInterval);
654  }
655  }
656 
666  private: int NextTimeout() const
667  {
668  auto now = std::chrono::steady_clock::now();
669  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
670  auto timeUntilNextActivity = this->timeNextActivity - now;
671 
672  int t = static_cast<int>(
673  std::chrono::duration_cast<std::chrono::milliseconds>
674  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
675  int t2 = std::min(t, this->kTimeout);
676  return std::max(t2, 0);
677  }
678 
680  private: void RecvMessages()
681  {
682  bool timeToExit = false;
683  while (!timeToExit)
684  {
685  // Poll socket for a reply, with timeout.
686  zmq::pollitem_t items[] =
687  {
688  {0, this->sockets.at(0), ZMQ_POLLIN, 0},
689  };
690 
691  // Calculate the timeout.
692  int timeout = this->NextTimeout();
693 
694  try
695  {
696  zmq::poll(&items[0], sizeof(items) / sizeof(items[0]), timeout);
697  }
698  catch(...)
699  {
700  continue;
701  }
702 
703  // If we got a reply, process it.
704  if (items[0].revents & ZMQ_POLLIN)
705  {
706  this->RecvDiscoveryUpdate();
707 
708  if (this->verbose)
709  this->PrintCurrentState();
710  }
711 
712  this->UpdateHeartbeat();
713  this->UpdateActivity();
714 
715  // Is it time to exit?
716  {
717  std::lock_guard<std::mutex> lock(this->exitMutex);
718  if (this->exit)
719  timeToExit = true;
720  }
721  }
722 #ifdef _WIN32
723  std::lock_guard<std::mutex> lock(this->exitMutex);
724  this->threadReceptionExiting = true;
725 #endif
726  }
727 
729  private: void RecvDiscoveryUpdate()
730  {
731  char rcvStr[Discovery::kMaxRcvStr];
732  std::string srcAddr;
733  uint16_t srcPort;
734  sockaddr_in clntAddr;
735  socklen_t addrLen = sizeof(clntAddr);
736 
737  if ((recvfrom(this->sockets.at(0),
738  reinterpret_cast<raw_type *>(rcvStr),
739  this->kMaxRcvStr, 0,
740  reinterpret_cast<sockaddr *>(&clntAddr),
741  reinterpret_cast<socklen_t *>(&addrLen))) < 0)
742  {
743  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
744  << std::endl;
745  return;
746  }
747  srcAddr = inet_ntoa(clntAddr.sin_addr);
748  srcPort = ntohs(clntAddr.sin_port);
749 
750  if (this->verbose)
751  {
752  std::cout << "\nReceived discovery update from " << srcAddr << ": "
753  << srcPort << std::endl;
754  }
755 
756  this->DispatchDiscoveryMsg(srcAddr, rcvStr);
757  }
758 
759 
763  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
764  char *_msg)
765  {
766  Header header;
767  char *pBody = _msg;
768 
769  // Create the header from the raw bytes.
770  header.Unpack(_msg);
771  pBody += header.HeaderLength();
772 
773  // Discard the message if the wire protocol is different than mine.
774  if (this->kWireVersion != header.Version())
775  return;
776 
777  auto recvPUuid = header.PUuid();
778 
779  // Discard our own discovery messages.
780  if (recvPUuid == this->pUuid)
781  return;
782 
783  // Update timestamp and cache the callbacks.
784  DiscoveryCallback<Pub> connectCb;
785  DiscoveryCallback<Pub> disconnectCb;
786  {
787  std::lock_guard<std::mutex> lock(this->mutex);
788  this->activity[recvPUuid] = std::chrono::steady_clock::now();
789  connectCb = this->connectionCb;
790  disconnectCb = this->disconnectionCb;
791  }
792 
793  switch (header.Type())
794  {
795  case AdvType:
796  {
797  // Read the rest of the fields.
799  advMsg.Unpack(pBody);
800 
801  // Check scope of the topic.
802  if ((advMsg.Publisher().Scope() == Scope_t::PROCESS) ||
803  (advMsg.Publisher().Scope() == Scope_t::HOST &&
804  _fromIp != this->hostAddr))
805  {
806  return;
807  }
808 
809  // Register an advertised address for the topic.
810  bool added;
811  {
812  std::lock_guard<std::mutex> lock(this->mutex);
813  added = this->info.AddPublisher(advMsg.Publisher());
814  }
815 
816  if (added && connectCb)
817  {
818  // Execute the client's callback.
819  connectCb(advMsg.Publisher());
820  }
821 
822  break;
823  }
824  case SubType:
825  {
826  // Read the rest of the fields.
827  SubscriptionMsg subMsg;
828  subMsg.Unpack(pBody);
829  auto recvTopic = subMsg.Topic();
830 
831  // Check if at least one of my nodes advertises the topic requested.
832  Addresses_M<Pub> addresses;
833  {
834  std::lock_guard<std::mutex> lock(this->mutex);
835  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
836  {
837  break;
838  }
839 
840  if (!this->info.Publishers(recvTopic, addresses))
841  break;
842  }
843 
844  for (const auto &nodeInfo : addresses[this->pUuid])
845  {
846  // Check scope of the topic.
847  if ((nodeInfo.Scope() == Scope_t::PROCESS) ||
848  (nodeInfo.Scope() == Scope_t::HOST &&
849  _fromIp != this->hostAddr))
850  {
851  continue;
852  }
853 
854  // Answer an ADVERTISE message.
855  this->SendMsg(AdvType, nodeInfo);
856  }
857 
858  break;
859  }
860  case HeartbeatType:
861  {
862  // The timestamp has already been updated.
863  break;
864  }
865  case ByeType:
866  {
867  // Remove the activity entry for this publisher.
868  {
869  std::lock_guard<std::mutex> lock(this->mutex);
870  this->activity.erase(recvPUuid);
871  }
872 
873  if (disconnectCb)
874  {
875  Pub pub;
876  pub.SetPUuid(recvPUuid);
877  pub.SetScope(Scope_t::ALL);
878  // Notify the new disconnection.
879  disconnectCb(pub);
880  }
881 
882  // Remove the address entry for this topic.
883  {
884  std::lock_guard<std::mutex> lock(this->mutex);
885  this->info.DelPublishersByProc(recvPUuid);
886  }
887 
888  break;
889  }
890  case UnadvType:
891  {
892  // Read the address.
894  advMsg.Unpack(pBody);
895 
896  // Check scope of the topic.
897  if ((advMsg.Publisher().Scope() == Scope_t::PROCESS) ||
898  (advMsg.Publisher().Scope() == Scope_t::HOST &&
899  _fromIp != this->hostAddr))
900  {
901  return;
902  }
903 
904  if (disconnectCb)
905  {
906  // Notify the new disconnection.
907  disconnectCb(advMsg.Publisher());
908  }
909 
910  // Remove the address entry for this topic.
911  {
912  std::lock_guard<std::mutex> lock(this->mutex);
913  this->info.DelPublisherByNode(advMsg.Publisher().Topic(),
914  advMsg.Publisher().PUuid(), advMsg.Publisher().NUuid());
915  }
916 
917  break;
918  }
919  default:
920  {
921  std::cerr << "Unknown message type [" << header.Type() << "]\n";
922  break;
923  }
924  }
925  }
926 
933  private: template<typename T>
934  void SendMsg(const uint8_t _type,
935  const T &_pub,
936  const uint16_t _flags = 0) const
937  {
938  // Create the header.
939  Header header(this->Version(), _pub.PUuid(), _type, _flags);
940  auto msgLength = 0;
941  std::vector<char> buffer;
942 
943  std::string topic = _pub.Topic();
944 
945  switch (_type)
946  {
947  case AdvType:
948  case UnadvType:
949  {
950  // Create the [UN]ADVERTISE message.
951  transport::AdvertiseMessage<T> advMsg(header, _pub);
952 
953  // Allocate a buffer and serialize the message.
954  buffer.resize(advMsg.MsgLength());
955  advMsg.Pack(reinterpret_cast<char*>(&buffer[0]));
956  msgLength = static_cast<int>(advMsg.MsgLength());
957  break;
958  }
959  case SubType:
960  {
961  // Create the [UN]SUBSCRIBE message.
962  SubscriptionMsg subMsg(header, topic);
963 
964  // Allocate a buffer and serialize the message.
965  buffer.resize(subMsg.MsgLength());
966  subMsg.Pack(reinterpret_cast<char*>(&buffer[0]));
967  msgLength = static_cast<int>(subMsg.MsgLength());
968  break;
969  }
970  case HeartbeatType:
971  case ByeType:
972  {
973  // Allocate a buffer and serialize the message.
974  buffer.resize(header.HeaderLength());
975  header.Pack(reinterpret_cast<char*>(&buffer[0]));
976  msgLength = header.HeaderLength();
977  break;
978  }
979  default:
980  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
981  << " type [" << _type << "]" << std::endl;
982  return;
983  }
984 
985  // Send the discovery message to the multicast group through all the
986  // sockets.
987  for (const auto &sock : this->Sockets())
988  {
989  if (sendto(sock, reinterpret_cast<const raw_type *>(
990  reinterpret_cast<unsigned char*>(&buffer[0])),
991  msgLength, 0,
992  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
993  sizeof(*(this->MulticastAddr()))) != msgLength)
994  {
995  std::cerr << "Exception sending a message" << std::endl;
996  return;
997  }
998  }
999 
1000  if (this->Verbose())
1001  {
1002  std::cout << "\t* Sending " << MsgTypesStr[_type]
1003  << " msg [" << topic << "]" << std::endl;
1004  }
1005  }
1006 
1009  private: const std::vector<int> &Sockets() const
1010  {
1011  return this->sockets;
1012  }
1013 
1016  private: const sockaddr_in *MulticastAddr() const
1017  {
1018  return &this->mcastAddr;
1019  }
1020 
1023  private: bool Verbose() const
1024  {
1025  return this->verbose;
1026  }
1027 
1030  private: uint8_t Version() const
1031  {
1032  return this->kWireVersion;
1033  }
1034 
1039  private: bool RegisterNetIface(const std::string &_ip)
1040  {
1041  // Make a new socket for sending discovery information.
1042  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1043  if (sock < 0)
1044  {
1045  std::cerr << "Socket creation failed." << std::endl;
1046  return false;
1047  }
1048 
1049  // Socket option: IP_MULTICAST_IF.
1050  // This socket option needs to be applied to each socket used to send
1051  // data. This option selects the source interface for outgoing messages.
1052  struct in_addr ifAddr;
1053  ifAddr.s_addr = inet_addr(_ip.c_str());
1054  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1055  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1056  {
1057  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1058  << std::endl;
1059  return false;
1060  }
1061 
1062  this->sockets.push_back(sock);
1063 
1064  // Join the multicast group. We have to do it for each network interface
1065  // but we can do it on the same socket. We will use the socket at
1066  // position 0 for receiving multicast information.
1067  struct ip_mreq group;
1068  group.imr_multiaddr.s_addr =
1069  inet_addr(this->kMulticastGroup.c_str());
1070  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1071  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1072  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1073  {
1074  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1075  << std::endl;
1076  return false;
1077  }
1078 
1079  return true;
1080  }
1081 
/// \brief Default activity interval (ms); initial value of
/// 'activityInterval'.
1085  private: static const unsigned int kDefActivityInterval = 100;
1086 
/// \brief Default heartbeat interval (ms); initial value of
/// 'heartbeatInterval'.
1090  private: static const unsigned int kDefHeartbeatInterval = 1000;
1091 
/// \brief Default silence interval (ms); initial value of
/// 'silenceInterval'.
1095  private: static const unsigned int kDefSilenceInterval = 3000;
1096 
/// \brief IP address of the multicast group used for discovery.
1098  private: const std::string kMulticastGroup = "224.0.0.7";
1099 
/// \brief Upper bound (ms) of the timeout computed by NextTimeout().
1101  private: const int kTimeout = 250;
1102 
/// \brief Size in bytes of the buffer used to receive a discovery message.
1104  private: static const int kMaxRcvStr = 65536;
1105 
/// \brief Wire protocol version. Received messages carrying a different
/// version are discarded.
1108  private: static const uint8_t kWireVersion = 6;
1109 
/// \brief UDP port used for discovery (bind port and multicast port).
1111  private: int port;
1112 
/// \brief IP address of this host. Replaced by "127.0.0.1" in the
/// constructor when the selected address cannot be registered.
1114  private: std::string hostAddr;
1115 
/// \brief IP addresses of the network interfaces used by discovery.
1117  private: std::vector<std::string> hostInterfaces;
1118 
/// \brief UUID of the process in which this instance runs.
1120  private: std::string pUuid;
1121 
/// \brief Maximum time (ms) a remote process may stay silent before its
/// information is removed.
1125  private: unsigned int silenceInterval;
1126 
/// \brief Period (ms) between two checks for silent remote processes.
1130  private: unsigned int activityInterval;
1131 
/// \brief Period (ms) between two heartbeats sent by this process.
1135  private: unsigned int heartbeatInterval;
1136 
/// \brief Callback executed when a new publisher is discovered. May be
/// empty.
1138  private: DiscoveryCallback<Pub> connectionCb;
1139 
/// \brief Callback executed when a publisher is no longer available. May
/// be empty.
1141  private: DiscoveryCallback<Pub> disconnectionCb;
1142 
/// \brief Known publishers, both local and remote.
1144  private: TopicStorage<Pub> info;
1145 
/// \brief Activity information: for each remote process UUID, the time
/// at which its last discovery message was received.
1150  protected: std::map<std::string, Timestamp> activity;
1151 
/// \brief True when verbose output is enabled.
1153  private: bool verbose;
1154 
/// \brief Sockets used for discovery, one per registered interface. The
/// socket at position 0 is also the one used for receiving.
1156  private: std::vector<int> sockets;
1157 
/// \brief Destination address (multicast group and port) of outgoing
/// messages.
1159  private: sockaddr_in mcastAddr;
1160 
/// \brief Mutex guarding the discovery state (settings, callbacks, topic
/// and activity information).
1162  private: mutable std::mutex mutex;
1163 
/// \brief Thread running RecvMessages().
1165  private: std::thread threadReception;
1166 
/// \brief Time at which the next heartbeat is due.
1168  private: Timestamp timeNextHeartbeat;
1169 
/// \brief Time at which the next activity check is due.
1171  private: Timestamp timeNextActivity;
1172 
/// \brief Mutex guarding 'exit' (and 'threadReceptionExiting' on
/// Windows).
1174  private: std::mutex exitMutex;
1175 
/// \brief True once two heartbeat cycles have been sent; see
/// WaitForInit().
1180  private: bool initialized;
1181 
/// \brief Number of heartbeat cycles sent while not yet initialized.
1183  private: unsigned int numHeartbeatsUninitialized;
1184 
/// \brief Condition variable notified when 'initialized' becomes true.
1186  private: mutable std::condition_variable initializedCv;
1187 
/// \brief Set to true by the destructor to stop the reception thread.
1189  private: bool exit;
1190 
1191 #ifdef _WIN32
/// \brief True while no reception thread is running: set to false by
/// Start() and back to true by the reception thread right before it ends.
1192  private: bool threadReceptionExiting = true;
1194 #endif
1195 
/// \brief True once Start() has been called.
1197  private: bool enabled;
1198  };
1199 
1203 
1207  }
1208 }
1209 
1210 #endif
std::string Topic() const
Get the topic.
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:205
uint16_t Version() const
Get the discovery protocol version.
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:417
Topic/service available to any subscriber (default scope).
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:94
static const std::vector< std::string > MsgTypesStr
Used for debugging the message type received/send.
Definition: Packet.hh:44
bool Discover(const std::string &_topic) const
Request discovery information about a topic.
Definition: Discovery.hh:314
ignition/transport/Publisher.hh
Definition: Publisher.hh:35
Header included in each discovery message containing the version of the discovery protocol...
Definition: Packet.hh:54
std::map< std::string, std::vector< T >> Addresses_M
Definition: TransportTypes.hh:52
Subscription packet used in the discovery protocol for requesting information about a given topic...
Definition: Packet.hh:158
size_t MsgLength() const
Get the total length of the message.
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:466
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events.
Definition: Discovery.hh:495
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:283
void raw_type
Definition: Discovery.hh:43
std::string PUuid() const
Get the process uuid.
A discovery class that implements a distributed topic discovery protocol.
Definition: Discovery.hh:87
Topic/service only available to subscribers in the same process as the publisher. ...
static const uint8_t HeartbeatType
Definition: Packet.hh:38
size_t Pack(char *_buffer) const
Serialize the advertise message.
Definition: Packet.hh:287
void Start()
Start the discovery service.
Definition: Discovery.hh:254
static const uint8_t ByeType
Definition: Packet.hh:39
size_t Unpack(char *_buffer)
Unserialize a stream of bytes into an AdvertiseMessage.
Definition: Packet.hh:306
size_t Pack(char *_buffer) const
Serialize the subscription message.
size_t Unpack(const char *_buffer)
Unserialize the header.
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds...
Definition: Discovery.hh:427
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:545
static const uint8_t SubType
Definition: Packet.hh:36
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:475
Advertise packet used in the discovery protocol to broadcast information about the node advertising a...
Definition: Packet.hh:230
std::map< std::string, Timestamp > activity
Activity information.
Definition: Discovery.hh:1150
size_t Unpack(char *_buffer)
Unserialize a stream of bytes into a Sub.
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events.
Definition: Discovery.hh:485
std::function< void(const T &_publisher)> DiscoveryCallback
Definition: TransportTypes.hh:104
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:365
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:457
#define IGNITION_TRANSPORT_VISIBLE
Use to represent "symbol visible" if supported.
Definition: Helpers.hh:55
IGNITION_TRANSPORT_VISIBLE std::string determineHost()
Determine IP or hostname.
int HeaderLength() const
Get the header length.
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:502
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:375
uint8_t Type() const
Get the message type.
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:448
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:438
size_t MsgLength() const
Get the total length of the message.
Definition: Packet.hh:279
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:120
void WaitForInit() const
Check if ready/initialized.
Definition: Discovery.hh:554
Definition: AdvertiseOptions.hh:25
static const uint8_t AdvType
Definition: Packet.hh:35
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message.
Definition: Discovery.hh:389
T & Publisher()
Get the publisher of this message.
Definition: Packet.hh:256
IGNITION_TRANSPORT_VISIBLE std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine.
Topic/service only available to subscribers in the same machine as the publisher. ...
static const uint8_t UnadvType
Definition: Packet.hh:37
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.