3#include "debugtrace.hpp"
11using namespace std::literals::chrono_literals;
12using lck = std::scoped_lock<std::mutex>;
13using rte = std::runtime_error;
16using std::placeholders::_1;
19 std::queue<DaqData> _queue;
21 std::atomic<uint32_t> _contents{0};
29 assert(_contents == _queue.size());
34 throw rte(
"BUG: Pop on empty queue");
42 assert(_contents == _queue.size());
51 bool empty()
const {
return _contents == 0; }
62 _queue(std::make_unique<
SafeQueue>()), inCallback(cb) {
69 _thread_can_safely_run =
true;
70 _indatahandler.start();
73void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler(
76 std::scoped_lock
lck(_mtx);
79 if (!_thread_can_safely_run)
82 _queue->push(daqdata);
83 if (!_thread_running) {
84 DEBUGTRACE_PRINT(
"Pushing new thread in pool");
85 _pool.
push_task(&ThreadedInDataHandlerBase::threadFcn,
this);
92 _thread_can_safely_run =
false;
93 _indatahandler.stop();
95 std::scoped_lock
lck(_mtx);
98 while (_thread_running) {
99 std::this_thread::sleep_for(10us);
106 if (_thread_can_safely_run) {
108 cerr <<
"*** BUG: InDataHandlers have not been all stopped, while "
109 "StreamMgr destructor is called. This is a misuse BUG."
115void ThreadedInDataHandlerBase::threadFcn() {
118 _thread_running =
true;
120 while (!_queue->empty() && _thread_can_safely_run) {
123 inCallback(_queue->pop());
125 _thread_running =
false;
Data coming from / going to DAQ. Non-interleaved format, which means data in buffer is ordered by cha...
void push_task(F &&task, A &&...args)
Wrapper around BS::thread_pool::push_task(...)
void push(const DaqData &d)
bool empty() const
Empty implemented using an atomic variable, which saves some mutex lock/unlock cycles.
Threaded in data handler base. Buffers inCallback data and calls a callback with the same signature o...
ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset)
void startThread()
This method should be called from the derived class' constructor, to start the thread and data is inc...
~ThreadedInDataHandlerBase()
void stopThread()
This method SHOULD be called from all classes that derive from ThreadedInDataHandler....
std::function< void(const Daq *)> InResetType
Function definition for the reset callback.
std::function< void(const DaqData &)> InCallbackType
The function definition of callbacks with incoming DAQ data.
std::scoped_lock< std::mutex > lock
std::shared_ptr< StreamMgr > SmgrHandle
std::mutex _mtx
Global mutex, used to restrict the pool creation to a single thread at once.
std::scoped_lock< std::mutex > lck