LASP 1.0
Library for Acoustic Signal Processing
Loading...
Searching...
No Matches
lasp_threadedindatahandler.cpp
Go to the documentation of this file.
1/* #define DEBUGTRACE_ENABLED */
3#include "debugtrace.hpp"
4#include "lasp_daqdata.h"
5#include "lasp_thread.h"
6#include <future>
7#include <optional>
8#include <queue>
9#include <thread>
10
11using namespace std::literals::chrono_literals;
12using lck = std::scoped_lock<std::mutex>;
13using rte = std::runtime_error;
14using std::cerr;
15using std::endl;
16using std::placeholders::_1;
17
18class SafeQueue {
19 std::queue<DaqData> _queue;
20 std::mutex _mtx;
21 std::atomic<uint32_t> _contents{0};
22
23public:
24 void push(const DaqData &d) {
25 DEBUGTRACE_ENTER;
26 lck lock(_mtx);
27 _queue.push(d);
28 _contents++;
29 assert(_contents == _queue.size());
30 }
32 DEBUGTRACE_ENTER;
33 if (empty()) {
34 throw rte("BUG: Pop on empty queue");
35 }
36 lck lock(_mtx);
37
38 /* DaqData d(std::move(_queue.front())); */
39 DaqData d(_queue.front());
40 _queue.pop();
41 _contents--;
42 assert(_contents == _queue.size());
43 return d;
44 }
51 bool empty() const { return _contents == 0; }
52};
53
56 InResetType reset)
57 : _indatahandler(
58 mgr,
59 std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this,
60 _1),
61 reset),
62 _queue(std::make_unique<SafeQueue>()), inCallback(cb) {
63
64 DEBUGTRACE_ENTER;
65
66}
68 DEBUGTRACE_ENTER;
69 _thread_can_safely_run = true;
70 _indatahandler.start();
71}
72
73void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler(
74 const DaqData &daqdata) {
75 DEBUGTRACE_ENTER;
76 std::scoped_lock lck(_mtx);
77
78 // Early return in case object is under DESTRUCTION
79 if (!_thread_can_safely_run)
80 return;
81
82 _queue->push(daqdata);
83 if (!_thread_running) {
84 DEBUGTRACE_PRINT("Pushing new thread in pool");
85 _pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this);
86 }
87}
88
90 DEBUGTRACE_ENTER;
91 // Make sure inCallback is no longer called
92 _thread_can_safely_run = false;
93 _indatahandler.stop();
94
95 std::scoped_lock lck(_mtx);
96
97 // Then wait in steps for the thread to stop running.
98 while (_thread_running) {
99 std::this_thread::sleep_for(10us);
100 }
101}
102
104
105 DEBUGTRACE_ENTER;
106 if (_thread_can_safely_run) {
107 stopThread();
108 cerr << "*** BUG: InDataHandlers have not been all stopped, while "
109 "StreamMgr destructor is called. This is a misuse BUG."
110 << endl;
111 abort();
112 }
113}
114
115void ThreadedInDataHandlerBase::threadFcn() {
116
117 DEBUGTRACE_ENTER;
118 _thread_running = true;
119
120 while (!_queue->empty() && _thread_can_safely_run) {
121
122 // Call inCallback_threaded
123 inCallback(_queue->pop());
124 }
125 _thread_running = false;
126}
Data coming from / going to DAQ. Non-interleaved format, which means data in buffer is ordered by cha...
void push_task(F &&task, A &&...args)
Wrapper around BS::thread_pool::push_task(...)
Definition lasp_thread.h:38
void push(const DaqData &d)
bool empty() const
Empty implemented using atomic var, safes some mutex lock/unlock cycles.
Threaded in data handler base. Buffers inCallback data and calls a callback with the same signature o...
ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset)
void startThread()
This method should be called from the derived class' constructor, to start the thread and data is inc...
void stopThread()
This method SHOULD be called from all classes that derive on ThreadedInDataHandler....
std::runtime_error rte
Definition lasp_daq.cpp:16
std::function< void(const Daq *)> InResetType
Function definition for the reset callback.
std::function< void(const DaqData &)> InCallbackType
The function definition of callbacks with incoming DAQ data.
std::scoped_lock< std::mutex > lock
std::shared_ptr< StreamMgr > SmgrHandle
std::mutex _mtx
Global mutex, used to restrict the pool creation to a single thread at once.
std::scoped_lock< std::mutex > lck
std::runtime_error rte