3#include "debugtrace.hpp"
16using rte = std::runtime_error;
24std::weak_ptr<StreamMgr>
_mgr;
27using Lck = std::scoped_lock<std::mutex>;
38 auto mgr =
_mgr.lock();
44 auto mgr =
_mgr.lock();
51 throw rte(
"Fatal: could not allocate stream manager!");
59 assert(std::this_thread::get_id() == mgr->main_thread_id);
67 : main_thread_id(std::this_thread::get_id())
72 rescanDAQDevices(
true);
75void StreamMgr::checkRightThread()
const {
76 assert(std::this_thread::get_id() == main_thread_id);
81 std::function<
void()> callback) {
83 DEBUGTRACE_PRINT(background);
86 if (_inputStream || _outputStream) {
87 throw rte(
"Rescanning DAQ devices only possible when no stream is running");
89 if (!_devices_mtx.try_lock()) {
90 throw rte(
"A background DAQ device scan is probably already running");
92 _devices_mtx.unlock();
94 std::scoped_lock
lck(_devices_mtx);
97 rescanDAQDevices_impl(callback);
99 DEBUGTRACE_PRINT(
"Rescanning DAQ devices on different thread...");
100 _pool.
push_task(&StreamMgr::rescanDAQDevices_impl,
this, callback);
103void StreamMgr::rescanDAQDevices_impl(std::function<
void()> callback) {
105 std::scoped_lock
lck(_devices_mtx);
111void StreamMgr::inCallback(
const DaqData &data) {
115 std::scoped_lock<std::mutex>
lck(_inDataHandler_mtx);
117 assert(_inputFilters.size() == data.
nchannels);
119 if (std::count_if(_inputFilters.cbegin(), _inputFilters.cend(),
120 [](
const auto &a) { return bool(a); }) > 0) {
128 if (_inputFilters[ch]) {
129 DEBUGTRACE_PRINT(
"Filter ch:");
130 DEBUGTRACE_PRINT(ch);
132 _inputFilters[ch]->filter(inout);
133 input_filtered.fromFloat(ch, inout);
135 DEBUGTRACE_PRINT(
"No filter ch:");
136 DEBUGTRACE_PRINT(ch);
137 input_filtered.copyInFromRaw(ch, data.
raw_ptr(0, ch));
141 for (
auto &handler : _inDataHandlers) {
142 handler->inCallback(input_filtered);
147 for (
auto &handler : _inDataHandlers) {
148 handler->inCallback(data);
158 std::scoped_lock<std::mutex>
lck(_siggen_mtx);
164 assert(daq !=
nullptr);
171#define DEBUG_FILLDATA 0
184 assert(data.
nframes == signal.size());
186 T *res =
reinterpret_cast<T *
>(data.
raw_ptr());
187 if (std::is_floating_point<T>()) {
189 for (
us frame = 0; frame < data.
nframes; frame++) {
190#if DEBUG_FILLDATA == 1
191 DEBUGTRACE_PRINT(
"SLOW flt");
192 data.setSlow(frame, ch,
193 reinterpret_cast<const int8_t *
>(&signal[frame]));
195 res[ch * data.
nframes + frame] = signal[frame];
201 for (
us frame = 0; frame < data.
nframes; frame++) {
202 const T val = (signal[frame] * std::numeric_limits<T>::max());
203#if DEBUG_FILLDATA == 1
204 data.setSlow(frame, ch,
reinterpret_cast<const int8_t *
>(&val));
206 res[ch * data.
nframes + frame] = val;
214void StreamMgr::outCallback(
DaqData &data) {
218 std::scoped_lock<std::mutex>
lck(_siggen_mtx);
221 vd signal = _siggen->genSignal(data.
nframes);
222 switch (data.
dtype) {
224 fillData<float>(data, signal);
227 fillData<double>(data, signal);
230 fillData<int8_t>(data, signal);
233 fillData<int16_t>(data, signal);
236 fillData<int32_t>(data, signal);
258 _inputStream.reset();
259 _outputStream.reset();
266 _inputStream.reset();
267 _outputStream.reset();
276 [](
auto &i) { return i.enabled; });
280 [](
auto &i) { return i.enabled; });
283 std::scoped_lock
lck(_devices_mtx);
288 for (
auto &devinfoi : _devices) {
289 if (config.
match(*devinfoi)) {
290 devinfo = devinfoi.get();
294 if (devinfo ==
nullptr) {
295 throw rte(
"Could not find a device with name " + config.
device_name +
296 " in list of devices.");
300 DEBUGTRACE_PRINT(isInput);
302 bool isDuplex = isInput && isOutput;
304 if (!isInput && !isOutput) {
305 throw rte(
"Neither input, nor output channels enabled for "
306 "stream. Cannot start.");
309 if (isInput && _inputStream) {
310 throw rte(
"Error: an input stream is already running. Please "
311 "first stop existing stream");
312 }
else if (isOutput && _outputStream) {
313 throw rte(
"Error: output stream is already running. Please "
314 "first stop existing stream");
315 }
else if (_inputStream) {
316 if (_inputStream->duplexMode() && isOutput) {
317 throw rte(
"Error: output stream is already running (in duplex mode). "
319 "first stop existing stream");
323 if (_outputStream && isInput && _outputStream->duplexModeForced &&
324 config.
match(*_outputStream)) {
325 throw rte(
"This device is already opened for output. If input is also "
326 "required, please enable duplex mode for this device");
329 if (_inputStream && isOutput && _inputStream->duplexModeForced &&
330 config.
match(*_inputStream)) {
331 throw rte(
"This device is already opened for input. If output is also "
332 "required, please enable duplex mode for this device");
338 using namespace std::placeholders;
345 inCallback = std::bind(&StreamMgr::inCallback,
this, _1);
348 for (
auto &handler : _inDataHandlers) {
349 handler->reset(daq.get());
352 d fs = daq->samplerate();
354 _inputFilters.clear();
359 _inputFilters.push_back(
nullptr);
362 for (
auto &ch : daq->inchannel_config) {
364 if (ch.digitalHighPassCutOn < 0) {
365 _inputFilters.push_back(
nullptr);
366 }
else if (ch.digitalHighPassCutOn == 0) {
367 throw rte(
"Digital highpass cuton should be > 0 if activated");
370 _inputFilters.emplace_back(std::make_unique<SeriesBiquad>(
379 outCallback = std::bind(&StreamMgr::outCallback,
this, _1);
383 DEBUGTRACE_PRINT(
"Resetting _siggen with new samplerate of ");
384 DEBUGTRACE_PRINT(daq->samplerate());
385 _siggen->reset(daq->samplerate());
391 daq->start(inCallback, outCallback);
395 _inputStream = std::move(daq);
397 _outputStream = std::move(daq);
407 throw rte(
"Input stream is not running");
410 _inputStream.reset();
412 for (
auto &handler : _inDataHandlers) {
413 handler->reset(
nullptr);
419 if (_inputStream && _inputStream->duplexMode()) {
420 _inputStream.reset();
422 if (!_outputStream) {
423 throw rte(
"Output stream is not running");
425 _outputStream.reset();
434 std::scoped_lock<std::mutex>
lck(_inDataHandler_mtx);
435 handler->
reset(_inputStream.get());
437 if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) !=
438 _inDataHandlers.cend()) {
439 throw std::runtime_error(
"Error: handler already added. Probably start() "
440 "is called more than once on a handler object");
442 _inDataHandlers.push_back(handler);
443 DEBUGTRACE_PRINT(_inDataHandlers.size());
449 std::scoped_lock<std::mutex>
lck(_inDataHandler_mtx);
450 _inDataHandlers.remove(&handler);
452 DEBUGTRACE_PRINT(_inDataHandlers.size());
474 return _inputStream.get();
478 if (_inputStream && _inputStream->duplexMode()) {
479 return _inputStream.get();
481 return _outputStream.get();
Information regarding a stream.
Configuration of a DAQ device.
std::vector< DaqChannel > inchannel_config
Channel configuration for input channels.
bool monitorOutput
If set to true and if the device has this capability, the output channels are added as input channels...
string device_name
The internal device name this DAQ configuration applies to.
std::vector< DaqChannel > outchannel_config
Channel configuration for output channels.
bool match(const DeviceInfo &devinfo) const
Check to see whether the DAQ configuration matches with the device. This means, some basic checks are...
Data coming from / going to DAQ. Non-interleaved format, which means data in buffer is ordered by cha...
arma::Mat< d > toFloat() const
Convert samples to floating point values and return a nframes x nchannels array of floats....
DataTypeDescriptor::DataType dtype
The data type corresponding to a sample.
byte_t * raw_ptr(const us frame=0, const us channel=0)
Return pointer to the raw data corresponding to a certain sample (frame, channel combo).
us nchannels
The number of channels.
us size_bytes() const
Return the total number of bytes.
us nframes
The number of frames in this block of data.
Base class for all DAQ (Data Acquisition) interfaces. A DAQ can be a custom device,...
static std::unique_ptr< Daq > createDaq(const DeviceInfo &devinfo, const DaqConfiguration &config)
Create a Daq based on given device info and configuration.
double samplerate() const
Returns current sample rate.
virtual StreamStatus getStreamStatus() const =0
Get stream status corresponding to current DAQ.
Structure containing device info parameters.
static DeviceInfoList getDeviceInfo()
Create a list of the DeviceInfo's that are available at call time.
bool hasInternalOutputMonitor
Whether the device has an internal monitor of the output signal. If true, the device is able to monit...
void push_task(F &&task, A &&...args)
Wrapper around BS::thread_pool::push_task(...)
static SeriesBiquad firstOrderHighPass(const d fs, const d cuton_Hz)
Create a SeriesBiquad object for a first order high-pass filter.
Stream manager. Used to manage the input and output streams. Implemented as a singleton: only one str...
static std::shared_ptr< StreamMgr > getInstance()
Get access to stream manager instance.
Daq::StreamStatus getStreamStatus(const StreamType type) const
Get the streamstatus object corresponding to a given stream.
void rescanDAQDevices(bool background=false, std::function< void()> callback=std::function< void()>())
Triggers a background scan of the DAQ devices, which updates the internally stored list of devices....
void startStream(const DaqConfiguration &config)
Start a stream based on given configuration.
const Daq * getDaq(StreamType type) const
Get DAQ pointer for a given stream. Gives a nullptr if stream is not running.
void setSiggen(std::shared_ptr< Siggen > s)
Set active signal generator for output streams. Only one ‘Siggen’ is active at the same time....
void stopAllStreams()
Stop and delete all streams. Also called on destruction of the StreamMgr.
void stopStream(const StreamType stype)
Stop stream of given type (input / output / duplex).
bool isStreamRunningOK(const StreamType type) const
Check if a certain stream is running. If running with no errors, it returns true. If an error occurred...
std::weak_ptr< StreamMgr > _mgr
The main global handle to a stream, stored in a weak pointer, if it does not yet exist,...
std::scoped_lock< std::mutex > Lck
bool fillData(DaqData &data, const vd &signal)
Converts from double precision floating point to output signal in non-interleaving format.
std::function< void(DaqData &)> OutDaqCallback
std::function< void(const DaqData &)> InDaqCallback
std::shared_ptr< StreamMgr > SmgrHandle
std::scoped_lock< std::mutex > lck
size_t us
We often use boolean values.