LASP 1.0
Library for Acoustic Signal Processing
Loading...
Searching...
No Matches
lasp_streammgr.cpp
Go to the documentation of this file.
1/* #define DEBUGTRACE_ENABLED */
2#include "lasp_streammgr.h"
3#include "debugtrace.hpp"
4#include "lasp_biquadbank.h"
6#include "lasp_thread.h"
7#include <algorithm>
8#include <assert.h>
9#include <functional>
10#include <iostream>
11#include <memory>
12#include <mutex>
13
14using std::cerr;
15using std::endl;
16using rte = std::runtime_error;
17
24std::weak_ptr<StreamMgr> _mgr;
25std::mutex _mgr_mutex;
26
27using Lck = std::scoped_lock<std::mutex>;
28
36 DEBUGTRACE_ENTER;
37
38 auto mgr = _mgr.lock();
39 if (!mgr) {
40 // Double Check Locking Pattern, if two threads would simultaneously
41 // instantiate the singleton instance.
43
44 auto mgr = _mgr.lock();
45 if (mgr) {
46 return mgr;
47 }
48
49 mgr = SmgrHandle(new StreamMgr());
50 if (!mgr) {
51 throw rte("Fatal: could not allocate stream manager!");
52 }
53 // Update global weak pointer
54 _mgr = mgr;
55 return mgr;
56 }
57#if LASP_DEBUG == 1
58 // Make sure we never ask for a new SmgrHandle from a different thread.
59 assert(std::this_thread::get_id() == mgr->main_thread_id);
60#endif
61
62 return mgr;
63}
64
65StreamMgr::StreamMgr()
66#if LASP_DEBUG == 1
67 : main_thread_id(std::this_thread::get_id())
68#endif
69{
70 DEBUGTRACE_ENTER;
71 // Trigger a scan for the available devices, in the background.
72 rescanDAQDevices(true);
73}
#if LASP_DEBUG == 1
/**
 * @brief Debug-only guard: asserts that the calling thread is the one whose
 * id was recorded in main_thread_id.
 */
void StreamMgr::checkRightThread() const {
  [[maybe_unused]] const auto callerId = std::this_thread::get_id();
  assert(callerId == main_thread_id);
}
#endif
79
80void StreamMgr::rescanDAQDevices(bool background,
81 std::function<void()> callback) {
82 DEBUGTRACE_ENTER;
83 DEBUGTRACE_PRINT(background);
84
85 checkRightThread();
86 if (_inputStream || _outputStream) {
87 throw rte("Rescanning DAQ devices only possible when no stream is running");
88 }
89 if (!_devices_mtx.try_lock()) {
90 throw rte("A background DAQ device scan is probably already running");
91 }
92 _devices_mtx.unlock();
93
94 std::scoped_lock lck(_devices_mtx);
95 _devices.clear();
96 if (!background) {
97 rescanDAQDevices_impl(callback);
98 } else {
99 DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread...");
100 _pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback);
101 }
102}
103void StreamMgr::rescanDAQDevices_impl(std::function<void()> callback) {
104 DEBUGTRACE_ENTER;
105 std::scoped_lock lck(_devices_mtx);
106 _devices = DeviceInfo::getDeviceInfo();
107 if (callback) {
108 callback();
109 }
110}
111void StreamMgr::inCallback(const DaqData &data) {
112
113 DEBUGTRACE_ENTER;
114
115 std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
116
117 assert(_inputFilters.size() == data.nchannels);
118
119 if (std::count_if(_inputFilters.cbegin(), _inputFilters.cend(),
120 [](const auto &a) { return bool(a); }) > 0) {
121
124
125 DaqData input_filtered(data.nframes, data.nchannels, data.dtype);
126
127 for (us ch = 0; ch < data.nchannels; ch++) {
128 if (_inputFilters[ch]) {
129 DEBUGTRACE_PRINT("Filter ch:");
130 DEBUGTRACE_PRINT(ch);
131 vd inout = data.toFloat(ch);
132 _inputFilters[ch]->filter(inout);
133 input_filtered.fromFloat(ch, inout);
134 } else {
135 DEBUGTRACE_PRINT("No filter ch:");
136 DEBUGTRACE_PRINT(ch);
137 input_filtered.copyInFromRaw(ch, data.raw_ptr(0, ch));
138 }
139 }
140
141 for (auto &handler : _inDataHandlers) {
142 handler->inCallback(input_filtered);
143 }
144
145 } else {
147 for (auto &handler : _inDataHandlers) {
148 handler->inCallback(data);
149 }
150 }
151}
152
153void StreamMgr::setSiggen(std::shared_ptr<Siggen> siggen) {
154
155 DEBUGTRACE_ENTER;
156 checkRightThread();
157
158 std::scoped_lock<std::mutex> lck(_siggen_mtx);
159
160 // If not set to nullptr, and a stream is running, we update the signal
161 // generator by resetting it.
162 if (isStreamRunningOK(StreamType::output) && siggen) {
163 const Daq *daq = getDaq(StreamType::output);
164 assert(daq != nullptr);
165 // Reset the signal generator.
166 siggen->reset(daq->samplerate());
167 }
168 _siggen = siggen;
169}
170
171#define DEBUG_FILLDATA 0
182template <typename T> bool fillData(DaqData &data, const vd &signal) {
183 /* DEBUGTRACE_ENTER; */
184 assert(data.nframes == signal.size());
185
186 T *res = reinterpret_cast<T *>(data.raw_ptr());
187 if (std::is_floating_point<T>()) {
188 for (us ch = 0; ch < data.nchannels; ch++) {
189 for (us frame = 0; frame < data.nframes; frame++) {
190#if DEBUG_FILLDATA == 1
191 DEBUGTRACE_PRINT("SLOW flt");
192 data.setSlow(frame, ch,
193 reinterpret_cast<const int8_t *>(&signal[frame]));
194#else
195 res[ch * data.nframes + frame] = signal[frame];
196#endif
197 }
198 }
199 } else {
200 for (us ch = 0; ch < data.nchannels; ch++) {
201 for (us frame = 0; frame < data.nframes; frame++) {
202 const T val = (signal[frame] * std::numeric_limits<T>::max());
203#if DEBUG_FILLDATA == 1
204 data.setSlow(frame, ch, reinterpret_cast<const int8_t *>(&val));
205#else
206 res[ch * data.nframes + frame] = val;
207#endif
208 }
209 }
210 }
211
212 return true;
213}
214void StreamMgr::outCallback(DaqData &data) {
215
216 /* DEBUGTRACE_ENTER; */
217
218 std::scoped_lock<std::mutex> lck(_siggen_mtx);
219
220 if (_siggen) {
221 vd signal = _siggen->genSignal(data.nframes);
222 switch (data.dtype) {
224 fillData<float>(data, signal);
225 break;
227 fillData<double>(data, signal);
228 break;
230 fillData<int8_t>(data, signal);
231 break;
233 fillData<int16_t>(data, signal);
234 break;
236 fillData<int32_t>(data, signal);
237 break;
238 }
239 } else {
240 // Set all values to 0.
241 std::fill(data.raw_ptr(), data.raw_ptr() + data.size_bytes(), 0);
242 }
243}
244
246 DEBUGTRACE_ENTER;
247 checkRightThread();
248 // Stream manager now handled by shared pointer. Each indata handler gets a
249 // shared pointer to the stream manager, and stores a weak pointer to it.
250 // Hence, we do not have to do any cleanup here. It also makes sure that the
251 // order in which destructors are called does not matter anymore. As soon as
252 // the stream manager is destructed, the weak pointers lose their ref, and do
253 // not have to removeInDataHandler() anymore.
254
255 // Stop the streams in this phase, otherwise it might happen during the
256 // destruction of the Siggen, in which case we might get calls to pure
257 // virtual methods. This was really a bug.
258 _inputStream.reset();
259 _outputStream.reset();
260
261
262}
264 DEBUGTRACE_ENTER;
265 checkRightThread();
266 _inputStream.reset();
267 _outputStream.reset();
268}
269
271 DEBUGTRACE_ENTER;
272 checkRightThread();
273
274 bool isInput = std::count_if(config.inchannel_config.cbegin(),
275 config.inchannel_config.cend(),
276 [](auto &i) { return i.enabled; });
277
278 bool isOutput = std::count_if(config.outchannel_config.cbegin(),
279 config.outchannel_config.cend(),
280 [](auto &i) { return i.enabled; });
281
282 // Find the first device that matches with the configuration
283 std::scoped_lock lck(_devices_mtx);
284
285 DeviceInfo *devinfo = nullptr;
286
287 // Match configuration to a device in the list of devices
288 for (auto &devinfoi : _devices) {
289 if (config.match(*devinfoi)) {
290 devinfo = devinfoi.get();
291 break;
292 }
293 }
294 if (devinfo == nullptr) {
295 throw rte("Could not find a device with name " + config.device_name +
296 " in list of devices.");
297 }
298
299 isInput |= (config.monitorOutput && devinfo->hasInternalOutputMonitor);
300 DEBUGTRACE_PRINT(isInput);
301
302 bool isDuplex = isInput && isOutput;
303
304 if (!isInput && !isOutput) {
305 throw rte("Neither input, nor output channels enabled for "
306 "stream. Cannot start.");
307 }
308
309 if (isInput && _inputStream) {
310 throw rte("Error: an input stream is already running. Please "
311 "first stop existing stream");
312 } else if (isOutput && _outputStream) {
313 throw rte("Error: output stream is already running. Please "
314 "first stop existing stream");
315 } else if (_inputStream) {
316 if (_inputStream->duplexMode() && isOutput) {
317 throw rte("Error: output stream is already running (in duplex mode). "
318 "Please "
319 "first stop existing stream");
320 }
321 }
322
323 if (_outputStream && isInput && _outputStream->duplexModeForced &&
324 config.match(*_outputStream)) {
325 throw rte("This device is already opened for output. If input is also "
326 "required, please enable duplex mode for this device");
327 }
328
329 if (_inputStream && isOutput && _inputStream->duplexModeForced &&
330 config.match(*_inputStream)) {
331 throw rte("This device is already opened for input. If output is also "
332 "required, please enable duplex mode for this device");
333 }
334
335 InDaqCallback inCallback;
336 OutDaqCallback outCallback;
337
338 using namespace std::placeholders;
339 std::unique_ptr<Daq> daq = Daq::createDaq(*devinfo, config);
340
341 assert(daq);
342
343 if (isInput) {
345 inCallback = std::bind(&StreamMgr::inCallback, this, _1);
346
348 for (auto &handler : _inDataHandlers) {
349 handler->reset(daq.get());
350 }
351
352 d fs = daq->samplerate();
354 _inputFilters.clear();
355
358 if (config.monitorOutput && devinfo->hasInternalOutputMonitor) {
359 _inputFilters.push_back(nullptr);
360 }
361
362 for (auto &ch : daq->inchannel_config) {
363 if (ch.enabled) {
364 if (ch.digitalHighPassCutOn < 0) {
365 _inputFilters.push_back(nullptr);
366 } else if (ch.digitalHighPassCutOn == 0) {
367 throw rte("Digital highpass cuton should be > 0 if activated");
368 } else {
369 // Put in a digital high-pass filter.
370 _inputFilters.emplace_back(std::make_unique<SeriesBiquad>(
371 SeriesBiquad::firstOrderHighPass(fs, ch.digitalHighPassCutOn)));
372 }
373 }
374 } // End of input filter creation
375 }
376
377 if (isOutput) {
379 outCallback = std::bind(&StreamMgr::outCallback, this, _1);
380
382 if (_siggen) {
383 DEBUGTRACE_PRINT("Resetting _siggen with new samplerate of ");
384 DEBUGTRACE_PRINT(daq->samplerate());
385 _siggen->reset(daq->samplerate());
386 }
387 }
388
391 daq->start(inCallback, outCallback);
392
393 // Move daq ptr to right place
394 if (isInput) {
395 _inputStream = std::move(daq);
396 } else {
397 _outputStream = std::move(daq);
398 }
399}
401
402 DEBUGTRACE_ENTER;
403 checkRightThread();
404
405 if (t == StreamType::input) {
406 if (!_inputStream) {
407 throw rte("Input stream is not running");
408 }
410 _inputStream.reset();
412 for (auto &handler : _inDataHandlers) {
413 handler->reset(nullptr);
414 }
415 } else {
417
419 if (_inputStream && _inputStream->duplexMode()) {
420 _inputStream.reset();
421 } else {
422 if (!_outputStream) {
423 throw rte("Output stream is not running");
424 }
425 _outputStream.reset();
426 } // end else
427 }
428}
429
430void StreamMgr::addInDataHandler(InDataHandler *handler) {
431 DEBUGTRACE_ENTER;
432 checkRightThread();
433 assert(handler);
434 std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
435 handler->reset(_inputStream.get());
436
437 if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) !=
438 _inDataHandlers.cend()) {
439 throw std::runtime_error("Error: handler already added. Probably start() "
440 "is called more than once on a handler object");
441 }
442 _inDataHandlers.push_back(handler);
443 DEBUGTRACE_PRINT(_inDataHandlers.size());
444}
445
446void StreamMgr::removeInDataHandler(InDataHandler &handler) {
447 DEBUGTRACE_ENTER;
448 checkRightThread();
449 std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
450 _inDataHandlers.remove(&handler);
451
452 DEBUGTRACE_PRINT(_inDataHandlers.size());
453}
454
456 /* DEBUGTRACE_ENTER; */
457
458 checkRightThread();
459 // Default constructor, says stream is not running, but also no errors
460
461 const Daq *daq = getDaq(type);
462 if (daq) {
463 return daq->getStreamStatus();
464 } else {
465 return Daq::StreamStatus();
466 }
467}
468
469const Daq *StreamMgr::getDaq(StreamType type) const {
470
471 checkRightThread();
472
473 if (type == StreamType::input) {
474 return _inputStream.get();
475 } else {
476 // Output stream. If input runs in duplex mode, this is also the output
477 // stream. In that case, we return the input stream
478 if (_inputStream && _inputStream->duplexMode()) {
479 return _inputStream.get();
480 } else {
481 return _outputStream.get();
482 }
483 }
484}
Information regarding a stream.
Definition lasp_daq.h:39
Configuration of a DAQ device.
std::vector< DaqChannel > inchannel_config
Channel configuration for input channels.
bool monitorOutput
If set to true and if the device has this capability, the output channels are added as input channels...
string device_name
The internal device name this DAQ configuration applies to.
std::vector< DaqChannel > outchannel_config
Channel configuration for output channels.
bool match(const DeviceInfo &devinfo) const
Check to see whether the DAQ configuration matches with the device. This means, some basic checks are...
Data coming from / going to DAQ. Non-interleaved format, which means data in buffer is ordered by cha...
arma::Mat< d > toFloat() const
Convert samples to floating point values and return a nframes x nchannels array of floats....
DataTypeDescriptor::DataType dtype
The data type corresponding to a sample.
byte_t * raw_ptr(const us frame=0, const us channel=0)
Return pointer to the raw data corresponding to a certain sample (frame, channel combo).
us nchannels
The number of channels.
us size_bytes() const
Return the total number of bytes.
us nframes
The number of frames in this block of data.
Base class for all DAQ (Data Acquisition) interfaces. A DAQ can be a custom device,...
Definition lasp_daq.h:29
static std::unique_ptr< Daq > createDaq(const DeviceInfo &devinfo, const DaqConfiguration &config)
Create a Daq based on given device info and configuration.
Definition lasp_daq.cpp:20
double samplerate() const
Returns current sample rate.
Definition lasp_daq.cpp:78
virtual StreamStatus getStreamStatus() const =0
Get stream status corresponding to current DAQ.
Structure containing device info parameters.
static DeviceInfoList getDeviceInfo()
Create a list of the DeviceInfo's that are available at call time.
bool hasInternalOutputMonitor
Whether the device has an internal monitor of the output signal. If true, the device is able to monit...
void push_task(F &&task, A &&...args)
Wrapper around BS::thread_pool::push_task(...)
Definition lasp_thread.h:38
const InResetType reset
static SeriesBiquad firstOrderHighPass(const d fs, const d cuton_Hz)
Create a SeriesBiquad object for a first order high-pass filter.
Stream manager. Used to manage the input and output streams. Implemented as a singleton: only one str...
static std::shared_ptr< StreamMgr > getInstance()
Get access to stream manager instance.
Daq::StreamStatus getStreamStatus(const StreamType type) const
Get the streamstatus object corresponding to a given stream.
void rescanDAQDevices(bool background=false, std::function< void()> callback=std::function< void()>())
Triggers a background scan of the DAQ devices, which updates the internally stored list of devices....
void startStream(const DaqConfiguration &config)
Start a stream based on given configuration.
const Daq * getDaq(StreamType type) const
Get DAQ pointer for a given stream. Gives a nullptr if stream is not running.
void setSiggen(std::shared_ptr< Siggen > s)
Set active signal generator for output streams. Only one ‘Siggen’ is active at the same time....
void stopAllStreams()
Stop and delete all streams. Also called on destruction of the StreamMgr.
@ output
Output stream.
@ input
Input stream.
void stopStream(const StreamType stype)
Stop stream of given type (input / output / duplex).
bool isStreamRunningOK(const StreamType type) const
Check if a certain stream is running. If running with no errors, it returns true. If an error occurred...
std::runtime_error rte
Definition lasp_daq.cpp:16
std::mutex _mgr_mutex
std::weak_ptr< StreamMgr > _mgr
The main global handle to a stream, stored in a weak pointer, if it does not yet exist,...
std::scoped_lock< std::mutex > Lck
bool fillData(DaqData &data, const vd &signal)
Converts from double precision floating point to output signal in non-interleaving format.
std::function< void(DaqData &)> OutDaqCallback
Definition lasp_daq.h:23
std::function< void(const DaqData &)> InDaqCallback
Definition lasp_daq.h:18
std::shared_ptr< StreamMgr > SmgrHandle
arma::Col< d > vd
std::scoped_lock< std::mutex > lck
size_t us
We often use boolean values.
Definition lasp_types.h:29