All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
SubscriptionHandler.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef IGN_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
19 #define IGN_TRANSPORT_SUBSCRIPTIONHANDLER_HH_
20 
21 #ifdef _MSC_VER
22 #pragma warning(push, 0)
23 #endif
24 #include <google/protobuf/message.h>
25 #ifdef _MSC_VER
26 #pragma warning(pop)
27 #endif
28 
29 #include <chrono>
30 #include <iostream>
31 #include <memory>
32 #include <string>
33 #include <utility>
34 #include <ignition/msgs/Factory.hh>
35 
41 
42 namespace ignition
43 {
44  namespace transport
45  {
50  {
54  public: explicit ISubscriptionHandler(const std::string &_nUuid,
55  const SubscribeOptions &_opts = SubscribeOptions())
56  : hUuid(Uuid().ToString()),
57  opts(_opts),
58  lastCbTimestamp(std::chrono::seconds{0}),
59  periodNs(0.0),
60  nUuid(_nUuid)
61  {
62  if (this->opts.Throttled())
63  this->periodNs = 1e9 / this->opts.MsgsPerSec();
64  }
65 
67  public: virtual ~ISubscriptionHandler()
68  {
69  }
70 
75  public: virtual bool RunLocalCallback(const ProtoMsg &_msg,
76  const MessageInfo &_info) = 0;
77 
82  public: virtual const std::shared_ptr<ProtoMsg> CreateMsg(
83  const std::string &_data,
84  const std::string &_type) const = 0;
85 
89  public: virtual std::string TypeName() = 0;
90 
93  public: std::string NodeUuid() const
94  {
95  return this->nUuid;
96  }
97 
100  public: std::string HandlerUuid() const
101  {
102  return this->hUuid;
103  }
104 
108  protected: bool UpdateThrottling()
109  {
110  if (!this->opts.Throttled())
111  return true;
112 
113  Timestamp now = std::chrono::steady_clock::now();
114 
115  // Elapsed time since the last callback execution.
116  auto elapsed = now - this->lastCbTimestamp;
117 
118  if (std::chrono::duration_cast<std::chrono::nanoseconds>(
119  elapsed).count() < this->periodNs)
120  {
121  return false;
122  }
123 
124  // Update the last callback execution.
125  this->lastCbTimestamp = now;
126  return true;
127  }
128 
130  protected: std::string hUuid;
131 
133  protected: SubscribeOptions opts;
134 
137 
140  protected: double periodNs;
141 
143  private: std::string nUuid;
144  };
145 
150  template <typename T> class SubscriptionHandler
151  : public ISubscriptionHandler
152  {
153  // Documentation inherited.
154  public: explicit SubscriptionHandler(const std::string &_nUuid,
155  const SubscribeOptions &_opts = SubscribeOptions())
156  : ISubscriptionHandler(_nUuid, _opts)
157  {
158  }
159 
160  // Documentation inherited.
161  public: const std::shared_ptr<ProtoMsg> CreateMsg(
162  const std::string &_data,
163  const std::string &/*_type*/) const
164  {
165  // Instantiate a specific protobuf message
166  auto msgPtr = std::make_shared<T>();
167 
168  // Create the message using some serialized data
169  if (!msgPtr->ParseFromString(_data))
170  {
171  std::cerr << "SubscriptionHandler::CreateMsg() error: ParseFromString"
172  << " failed" << std::endl;
173  }
174 
175  return msgPtr;
176  }
177 
178  // Documentation inherited.
179  public: std::string TypeName()
180  {
181  return T().GetTypeName();
182  }
183 
186  public: void SetCallback(const MsgCallback<T> &_cb)
187  {
188  this->cb = _cb;
189  }
190 
191  // Documentation inherited.
192  public: bool RunLocalCallback(const ProtoMsg &_msg,
193  const MessageInfo &_info)
194  {
195  // No callback stored.
196  if (!this->cb)
197  {
198  std::cerr << "SubscriptionHandler::RunLocalCallback() error: "
199  << "Callback is NULL" << std::endl;
200  return false;
201  }
202 
203  // Check the subscription throttling option.
204  if (!this->UpdateThrottling())
205  return true;
206 
207 #if GOOGLE_PROTOBUF_VERSION > 2999999
208  auto msgPtr = google::protobuf::down_cast<const T*>(&_msg);
209 #else
210  auto msgPtr = google::protobuf::internal::down_cast<const T*>(&_msg);
211 #endif
212 
213  this->cb(*msgPtr, _info);
214  return true;
215  }
216 
218  private: MsgCallback<T> cb;
219  };
220 
223  template <> class SubscriptionHandler<ProtoMsg>
224  : public ISubscriptionHandler
225  {
226  // Documentation inherited.
227  public: explicit SubscriptionHandler(const std::string &_nUuid,
228  const SubscribeOptions &_opts = SubscribeOptions())
229  : ISubscriptionHandler(_nUuid, _opts)
230  {
231  }
232 
233  // Documentation inherited.
234  public: const std::shared_ptr<ProtoMsg> CreateMsg(
235  const std::string &_data,
236  const std::string &_type) const
237  {
238  std::shared_ptr<google::protobuf::Message> msgPtr;
239 
240  const google::protobuf::Descriptor *desc =
241  google::protobuf::DescriptorPool::generated_pool()
242  ->FindMessageTypeByName(_type);
243 
244  // First, check if we have the descriptor from the generated proto
245  // classes.
246  if (desc)
247  {
248  msgPtr.reset(google::protobuf::MessageFactory::generated_factory()
249  ->GetPrototype(desc)->New());
250  }
251  else
252  {
253  // Fallback on Ignition Msgs if the message type is not found.
254  msgPtr = ignition::msgs::Factory::New(_type);
255  }
256 
257  if (!msgPtr)
258  return nullptr;
259 
260  // Create the message using some serialized data
261  if (!msgPtr->ParseFromString(_data))
262  {
263  std::cerr << "CreateMsg() error: ParseFromString failed" << std::endl;
264  return nullptr;
265  }
266 
267  return std::move(msgPtr);
268  }
269 
270  // Documentation inherited.
271  public: std::string TypeName()
272  {
273  return kGenericMessageType;
274  }
275 
278  public: void SetCallback(const MsgCallback<ProtoMsg> &_cb)
279  {
280  this->cb = _cb;
281  }
282 
283  // Documentation inherited.
284  public: bool RunLocalCallback(const ProtoMsg &_msg,
285  const MessageInfo &_info)
286  {
287  // No callback stored.
288  if (!this->cb)
289  {
290  std::cerr << "SubscriptionHandler::RunLocalCallback() "
291  << "error: Callback is NULL" << std::endl;
292  return false;
293  }
294 
295  // Check the subscription throttling option.
296  if (!this->UpdateThrottling())
297  return true;
298 
299  this->cb(_msg, _info);
300  return true;
301  }
302 
304  private: MsgCallback<ProtoMsg> cb;
305  };
306  }
307 }
308 
309 #endif
std::string NodeUuid() const
Get the node UUID.
Definition: SubscriptionHandler.hh:93
bool UpdateThrottling()
Check if message subscription is throttled.
Definition: SubscriptionHandler.hh:108
std::string HandlerUuid() const
Get the unique UUID of this handler.
Definition: SubscriptionHandler.hh:100
bool RunLocalCallback(const ProtoMsg &_msg, const MessageInfo &_info)
Executes the local callback registered for this handler.
Definition: SubscriptionHandler.hh:284
Timestamp lastCbTimestamp
Timestamp of the last callback executed.
Definition: SubscriptionHandler.hh:136
SubscriptionHandler(const std::string &_nUuid, const SubscribeOptions &_opts=SubscribeOptions())
Definition: SubscriptionHandler.hh:227
ISubscriptionHandler(const std::string &_nUuid, const SubscribeOptions &_opts=SubscribeOptions())
Constructor.
Definition: SubscriptionHandler.hh:54
void SetCallback(const MsgCallback< T > &_cb)
Set the callback for this handler.
Definition: SubscriptionHandler.hh:186
ignition/transport/SubscribeOptions.hh
Definition: SubscribeOptions.hh:35
double periodNs
If throttling is enabled, the minimum period for receiving a message in nanoseconds.
Definition: SubscriptionHandler.hh:140
std::function< void(const T &_msg, const MessageInfo &_info)> MsgCallback
Definition: TransportTypes.hh:125
SubscriptionHandler(const std::string &_nUuid, const SubscribeOptions &_opts=SubscribeOptions())
Definition: SubscriptionHandler.hh:154
SubscribeOptions opts
Subscribe options.
Definition: SubscriptionHandler.hh:133
std::string hUuid
Unique handler's UUID.
Definition: SubscriptionHandler.hh:130
A portable class for representing a Universally Unique Identifier.
Definition: Uuid.hh:41
A class that provides information about the message received.
Definition: MessageInfo.hh:33
google::protobuf::Message ProtoMsg
Definition: TransportTypes.hh:65
std::string TypeName()
Get the type of the messages to which this subscription handler is subscribed.
Definition: SubscriptionHandler.hh:179
const std::shared_ptr< ProtoMsg > CreateMsg(const std::string &_data, const std::string &) const
Create a specific protobuf message given its serialized data.
Definition: SubscriptionHandler.hh:161
#define IGNITION_TRANSPORT_VISIBLE
Use to represent "symbol visible" if supported.
Definition: Helpers.hh:57
const std::shared_ptr< ProtoMsg > CreateMsg(const std::string &_data, const std::string &_type) const
Create a specific protobuf message given its serialized data.
Definition: SubscriptionHandler.hh:234
It creates a subscription handler for a specific protobuf message.
Definition: SubscriptionHandler.hh:150
std::string TypeName()
Get the type of the messages to which this subscription handler is subscribed.
Definition: SubscriptionHandler.hh:271
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:129
const std::string kGenericMessageType
The string type used for generic messages.
Definition: TransportTypes.hh:132
bool RunLocalCallback(const ProtoMsg &_msg, const MessageInfo &_info)
Executes the local callback registered for this handler.
Definition: SubscriptionHandler.hh:192
ignition/transport/SubscriptionHandler.hh
Definition: SubscriptionHandler.hh:49
virtual ~ISubscriptionHandler()
Destructor.
Definition: SubscriptionHandler.hh:67
void SetCallback(const MsgCallback< ProtoMsg > &_cb)
Set the callback for this handler.
Definition: SubscriptionHandler.hh:278