EII Message Bus C Reference
msgbus.hpp
Go to the documentation of this file.
1 // Copyright (c) 2021 Intel Corporation.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a copy
4 // of this software and associated documentation files (the "Software"), to
5 // deal in the Software without restriction, including without limitation the
6 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 // sell copies of the Software, and to permit persons to whom the Software is
8 // furnished to do so, subject to the following conditions:
9 //
10 // The above copyright notice and this permission notice shall be included in
11 // all copies or substantial portions of the Software.
12 //
13 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 // FROM,OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 // IN THE SOFTWARE.
20 
26 #ifndef _EII_MSGBUS_HPP
27 #define _EII_MSGBUS_HPP
28 
29 #include <atomic>
30 #include <string>
31 #include <chrono>
32 #include <thread>
33 #include <condition_variable>
34 #include <eii/utils/profiling.h>
35 #include <eii/utils/thread_safe_queue.h>
36 #include "eii/msgbus/msgbus.h"
38 
39 namespace eii {
40 namespace msgbus {
41 
42 // Forward declarations
43 class ReceiveContext;
44 class Publisher;
45 class Service;
47 
55 
60 private:
61  // Underlying message bus context
62  void* m_msgbus_ctx;
63 
67  MsgbusContext(const MsgbusContext& src);
68 
72  MsgbusContext& operator=(const MsgbusContext& src);
73 
74 public:
82  MsgbusContext(config_t* config);
83 
88 
101  Publisher* new_publisher(const std::string topic);
102 
117  const std::string topic, user_data_t* user_data=NULL);
118 
132  Service* new_service(std::string service_name, void* user_data=NULL);
133 
148  std::string service_name, void* user_data=NULL);
149 };
150 
156 protected:
157  // Message bus context which the receive context belongs to
158  void* m_msgbus_ctx;
159 
160  // Internal EII Message Bus receive context
161  recv_ctx_t* m_recv_ctx;
162 
169  ReceiveContext(void* msgbus_ctx, recv_ctx_t* recv_ctx);
170 
171 public:
175  ~ReceiveContext();
176 
188 
201  template<class Rep, class Period>
203  const std::chrono::duration<Rep, Period>& timeout) {
204  int timeout_ms = std::chrono::milliseconds(timeout).count();
205  msg_envelope_t* msg = NULL;
206 
208  m_msgbus_ctx, m_recv_ctx, timeout_ms, &msg);
209  if (ret == MSG_RECV_NO_MESSAGE) {
210  return NULL;
211  } else if (ret != MSG_SUCCESS) {
212  throw MsgbusException(ret, "Failed to receive message");
213  }
214 
215  return new MsgEnvelope(msg);
216  }
217 
227 
239 
240 
241  // MsgbusContext class friend so it can call the private constructor
242  friend class MsgbusContext;
243 };
244 
248 class Service : public ReceiveContext {
249 private:
256  Service(void* msgbus_Ctx, recv_ctx_t* recv_ctx);
257 
258 public:
262  ~Service();
263 
275 
276  // MsgbusContext class friend so it can call the private constructor
277  friend class MsgbusContext;
278 };
279 
284 private:
291  ServiceRequester(void* msgbus_ctx, recv_ctx_t* recv_ctx);
292 
293 public:
298 
309  void request(MsgEnvelope* request);
310 
311  // MsgbusContext class friend so it can call the private constructor
312  friend class MsgbusContext;
313 };
314 
318 class Publisher {
319 private:
320  // Publisher's message bus context
321  void* m_msgbus_ctx;
322 
323  // Internal publisher context
324  publisher_ctx_t* m_pub_ctx;
325 
326 protected:
333  Publisher(void* msgbus_ctx, publisher_ctx_t* pub_ctx);
334 
335 public:
339  ~Publisher();
340 
351  void publish(MsgEnvelope* msg);
352 
353  // MsgbusContext class friend so it can call the private constructor
354  friend class MsgbusContext;
355 };
356 
360 typedef eii::utils::ThreadSafeQueue<eii::msgbus::Serializable*> MessageQueue;
361 
366 private:
367  // Publisher thread handler
368  std::thread* m_th;
369 
370 protected:
371  // Message bus context
372  MsgbusContext* m_ctx;
373 
374  // Stop flag
375  std::atomic<bool> m_stop;
376 
377  // Error condition variable to notify users of an error
378  std::condition_variable& m_err_cv;
379 
383  virtual void run() = 0;
384 
385 public:
395  BaseMsgbusThread(config_t* msgbus_config, std::condition_variable& err_cv);
396 
402  virtual ~BaseMsgbusThread();
403 
407  virtual void start();
408 
415  virtual void join();
416 
420  virtual void stop();
421 };
422 
428 private:
429  // Publisher context
430  Publisher* m_pub;
431 
432  // Input message queue
433  MessageQueue* m_input_queue;
434 
435  // AppName variable
436  std::string m_service_name;
437 
438  // Profiling variable
439  eii::utils::Profiling* m_profile;
440 
444  PublisherThread(const PublisherThread& src);
445 
446  PublisherThread& operator=(const PublisherThread& src);
447 
448 protected:
452  void run() override;
453 
454 public:
471  config_t* msgbus_config, std::condition_variable& err_cv,
472  std::string topic, MessageQueue* input_queue,
473  std::string service_name);
474 
479 };
480 
485 template<class T>
487 private:
488  // Subscriber context
489  ReceiveContext* m_recv_ctx;
490 
491  // Output message queue
492  MessageQueue* m_output_queue;
493 
494  // Timeout for time to wait between receiving and checking if the stop
495  // signal has been sent
496  std::chrono::milliseconds m_timeout;
497 
498  // AppName variable
499  std::string m_service_name;
500 
501  // Profiling variable
502  eii::utils::Profiling* m_profile;
503 
504  SubscriberThread& operator=(const SubscriberThread& src) { return *this; };
505 
506 protected:
510  void run() override {
511  LOG_DEBUG_0("Subscriber thread started");
512 
513  msg_envelope_t* msg = NULL;
514  MsgEnvelope* env = NULL;
515  T* received = NULL;
516  utils::QueueRetCode ret_queue = utils::QueueRetCode::SUCCESS;
517 
518  // Profiling related variables
519  std::string subscriber_ts = m_service_name + "_subscriber_ts";
520  std::string subscriber_exit_ts =
521  m_service_name + "_subscriber_exit_ts";
522  std::string subscriber_blocked_ts =
523  m_service_name + "_subscriber_blocked_ts";
524 
525  try {
526  while (!m_stop.load()) {
527  env = m_recv_ctx->recv_timedwait(m_timeout);
528  if (env == NULL) {
529  // Timeout...
530  continue;
531  }
532 
533  // Received message
534  msg = env->get_msg_envelope();
535 
536  // Add timestamp after message is received if profiling is
537  // enabled
538  DO_PROFILING(this->m_profile, msg, subscriber_ts.c_str());
539 
540  // Parse the message into user's provided object
541  received = new T(msg);
542 
543  // env is no longer needed after this point, delete it
544  delete env;
545  env = NULL;
546 
547  ret_queue = m_output_queue->push(received);
548  if (ret_queue == utils::QueueRetCode::QUEUE_FULL) {
549  ret_queue = m_output_queue->push_wait(received);
550  if(ret_queue != utils::QueueRetCode::SUCCESS) {
551  LOG_ERROR_0("Failed to enqueue received message, "
552  "message dropped");
553  m_err_cv.notify_all();
554  break;
555  } else {
556  // Dropping pointer to message here because the memory for
557  // the envelope is now owned by the received variable
558  msg = NULL;
559  received = NULL;
560  }
561  } else {
562  msg = NULL;
563  received = NULL;
564  }
565  }
566  } catch (const std::exception& ex) {
567  LOG_ERROR("Error in subscriber thread: %s", ex.what());
568  }
569 
570  // When the while loop exits, if received is not NULL then it has been
571  // initialized with the underlying msg_envelope_t and now owns the
572  // deletion of its memory. If msg is not NULL and received is NULL,
573  // then this thread owns the msg_envelope_t* structure and must free
574  // it.
575  if (received != NULL) {
576  delete received;
577  } else if (msg != NULL) {
579  }
580 
581  // Even if the above are true, if the MsgEnvelope (env) is not NULL it
582  // must be deleted. If received is NULL, msg is NULL, and env is not
583  // NULL then it currently owns a msg_envelope_t and will approprietly
584  // dispose of it.
585  if (env != NULL) {
586  delete env;
587  }
588 
589  LOG_DEBUG_0("Subscriber thread stopped");
590  };
591 
592 public:
609  SubscriberThread(config_t* msgbus_config, std::condition_variable& err_cv,
610  std::string topic, MessageQueue* output_queue,
611  std::string service_name,
612  std::chrono::milliseconds timeout=
613  std::chrono::milliseconds(250)) :
614  BaseMsgbusThread(msgbus_config, err_cv) {
615 
616  static_assert(std::is_base_of<Serializable, T>::value,
617  "Template must be subclass of eii::msgbus::Serializable");
618 
619  m_recv_ctx = m_ctx->new_subscriber(topic);
620  m_output_queue = output_queue;
621  m_timeout = timeout;
622  m_service_name = service_name;
623  m_profile = new eii::utils::Profiling();
624  };
625 
630  this->stop();
631  delete m_recv_ctx;
632  delete m_ctx;
633  delete m_profile;
634  };
635 };
636 
637 } // namespace msgbus
638 } // namespace eii
639 
640 #endif // _EII_MSGBUS_HPP
eii::msgbus::PublisherThread
Definition: msgbus.hpp:427
eii::msgbus::Publisher::publish
void publish(MsgEnvelope *msg)
eii::msgbus::SubscriberThread::SubscriberThread
SubscriberThread(config_t *msgbus_config, std::condition_variable &err_cv, std::string topic, MessageQueue *output_queue, std::string service_name, std::chrono::milliseconds timeout=std::chrono::milliseconds(250))
Definition: msgbus.hpp:609
eii::msgbus::MsgbusContext::new_publisher
Publisher * new_publisher(const std::string topic)
user_data_t
Definition: msgbus.h:43
eii::msgbus::ServiceRequester::~ServiceRequester
~ServiceRequester()
eii::msgbus::BaseMsgbusThread::start
virtual void start()
eii::msgbus::ReceiveContext::ReceiveContext
ReceiveContext(void *msgbus_ctx, recv_ctx_t *recv_ctx)
msgbus_recv_timedwait
msgbus_ret_t msgbus_recv_timedwait(void *ctx, recv_ctx_t *recv_ctx, int timeout, msg_envelope_t **message)
eii::msgbus::ReceiveContext::recv_wait
MsgEnvelope * recv_wait()
eii::msgbus::MessageQueue
eii::utils::ThreadSafeQueue< eii::msgbus::Serializable * > MessageQueue
Definition: msgbus.hpp:360
eii::msgbus::MsgEnvelope
Definition: msg_envelope.hpp:203
eii::msgbus::BaseMsgbusThread::join
virtual void join()
eii::msgbus::SubscriberThread::~SubscriberThread
~SubscriberThread()
Definition: msgbus.hpp:629
eii::msgbus::MsgbusContext::get_service
ServiceRequester * get_service(std::string service_name, void *user_data=NULL)
msgbus_ret_t
msgbus_ret_t
Definition: msgbusret.h:36
eii::msgbus::PublisherThread::run
void run() override
msg_envelope_t
Definition: msg_envelope.h:111
eii::msgbus::MsgbusContext::new_service
Service * new_service(std::string service_name, void *user_data=NULL)
eii::msgbus::Service::~Service
~Service()
eii::msgbus::BaseMsgbusThread::BaseMsgbusThread
BaseMsgbusThread(config_t *msgbus_config, std::condition_variable &err_cv)
publisher_ctx_t
void * publisher_ctx_t
Definition: msgbus.h:70
eii::msgbus::MsgEnvelope::get_msg_envelope
msg_envelope_t * get_msg_envelope()
eii::msgbus::Service::response
void response(MsgEnvelope *response)
eii::msgbus::PublisherThread::~PublisherThread
~PublisherThread()
eii::msgbus::SubscriberThread::run
void run() override
Definition: msgbus.hpp:510
msgbus.h
Messaging abstraction interface.
eii::msgbus::SubscriberThread
Definition: msgbus.hpp:486
eii::msgbus::BaseMsgbusThread::stop
virtual void stop()
eii::msgbus::BaseMsgbusThread::run
virtual void run()=0
eii::utils::Profiling
Definition: profiling.h:38
eii::msgbus::MsgbusException
Definition: msg_envelope.hpp:46
msg_envelope.hpp
MsgEnvelope interface.
eii::msgbus::Publisher::~Publisher
~Publisher()
eii::msgbus::ReceiveContext::recv_nowait
MsgEnvelope * recv_nowait()
eii::msgbus::ReceiveContext::recv_timedwait
MsgEnvelope * recv_timedwait(const std::chrono::duration< Rep, Period > &timeout)
Definition: msgbus.hpp:202
eii::msgbus::ReceiveContext
Definition: msgbus.hpp:155
eii::msgbus::MsgbusContext::~MsgbusContext
~MsgbusContext()
eii::msgbus::Publisher
Definition: msgbus.hpp:318
eii::msgbus::MsgbusContext
Definition: msgbus.hpp:59
eii::msgbus::ReceiveContext::get_user_data
user_data_t * get_user_data()
eii::msgbus::Service
Definition: msgbus.hpp:248
msgbus_msg_envelope_destroy
void msgbus_msg_envelope_destroy(msg_envelope_t *msg)
eii::msgbus::BaseMsgbusThread::~BaseMsgbusThread
virtual ~BaseMsgbusThread()
eii::msgbus::BaseMsgbusThread
Definition: msgbus.hpp:365
profiling.h
C++ Profiling Library.
recv_ctx_t
Definition: msgbus.h:52
eii::msgbus::ServiceRequester::request
void request(MsgEnvelope *request)
eii::msgbus::Publisher::Publisher
Publisher(void *msgbus_ctx, publisher_ctx_t *pub_ctx)
eii::msgbus::ReceiveContext::~ReceiveContext
~ReceiveContext()
eii::msgbus::ServiceRequester
Definition: msgbus.hpp:283
eii::msgbus::MsgbusContext::new_subscriber
Subscriber * new_subscriber(const std::string topic, user_data_t *user_data=NULL)