26 #ifndef _EII_MSGBUS_HPP
27 #define _EII_MSGBUS_HPP
33 #include <condition_variable>
35 #include <eii/utils/thread_safe_queue.h>
117 const std::string topic,
user_data_t* user_data=NULL);
148 std::string service_name,
void* user_data=NULL);
201 template<
class Rep,
class Period>
203 const std::chrono::duration<Rep, Period>& timeout) {
204 int timeout_ms = std::chrono::milliseconds(timeout).count();
208 m_msgbus_ctx, m_recv_ctx, timeout_ms, &msg);
209 if (ret == MSG_RECV_NO_MESSAGE) {
211 }
else if (ret != MSG_SUCCESS) {
// Alias for a ThreadSafeQueue whose elements are raw pointers to
// eii::msgbus::Serializable.
// NOTE(review): the queue stores raw pointers, so ownership of queued objects
// is not expressed by the type — confirm which side is expected to delete them.
360 typedef eii::utils::ThreadSafeQueue<eii::msgbus::Serializable*>
MessageQueue;
// Atomic boolean stop flag.
// NOTE(review): a loop elsewhere in this file runs `while (!m_stop.load())`;
// presumably that is this flag — confirm who sets it and when.
375 std::atomic<bool> m_stop;
// Reference to a std::condition_variable. As a reference member it is not
// owned by this object, so the referenced condition variable must outlive it.
// NOTE(review): `m_err_cv.notify_all()` is called elsewhere in this file after
// a failed enqueue; presumably this is used to signal errors — confirm.
378 std::condition_variable& m_err_cv;
// Pure virtual: every concrete subclass must provide its own run()
// implementation. Takes no arguments and returns nothing.
383 virtual void run() = 0;
// Virtual, so subclasses may override it; it is not pure, so a definition is
// expected outside this view.
// NOTE(review): presumably launches the thread that executes run() — confirm
// against the definition.
407 virtual void start();
// Service name stored by value as a std::string.
// NOTE(review): presumably set from the `service_name` constructor argument —
// confirm.
436 std::string m_service_name;
471 config_t* msgbus_config, std::condition_variable& err_cv,
473 std::string service_name);
// Timeout value expressed in milliseconds.
// NOTE(review): a constructor later in this file defaults its `timeout`
// parameter to std::chrono::milliseconds(250); presumably that parameter
// initializes this member — confirm.
496 std::chrono::milliseconds m_timeout;
// Service name stored by value as a std::string.
// NOTE(review): an `m_service_name` is concatenated with "_subscriber_ts",
// "_subscriber_exit_ts" and "_subscriber_blocked_ts" later in this file;
// presumably it is this member — confirm.
499 std::string m_service_name;
511 LOG_DEBUG_0(
"Subscriber thread started");
516 utils::QueueRetCode ret_queue = utils::QueueRetCode::SUCCESS;
519 std::string subscriber_ts = m_service_name +
"_subscriber_ts";
520 std::string subscriber_exit_ts =
521 m_service_name +
"_subscriber_exit_ts";
522 std::string subscriber_blocked_ts =
523 m_service_name +
"_subscriber_blocked_ts";
526 while (!m_stop.load()) {
538 DO_PROFILING(this->m_profile, msg, subscriber_ts.c_str());
541 received =
new T(msg);
547 ret_queue = m_output_queue->push(received);
548 if (ret_queue == utils::QueueRetCode::QUEUE_FULL) {
549 ret_queue = m_output_queue->push_wait(received);
550 if(ret_queue != utils::QueueRetCode::SUCCESS) {
551 LOG_ERROR_0(
"Failed to enqueue received message, "
553 m_err_cv.notify_all();
566 }
catch (
const std::exception& ex) {
567 LOG_ERROR(
"Error in subscriber thread: %s", ex.what());
575 if (received != NULL) {
577 }
else if (msg != NULL) {
589 LOG_DEBUG_0(
"Subscriber thread stopped");
611 std::string service_name,
612 std::chrono::milliseconds timeout=
613 std::chrono::milliseconds(250)) :
616 static_assert(std::is_base_of<Serializable, T>::value,
617 "Template must be subclass of eii::msgbus::Serializable");
620 m_output_queue = output_queue;
622 m_service_name = service_name;
640 #endif // _EII_MSGBUS_HPP