LASP 1.0
Library for Acoustic Signal Processing
Loading...
Searching...
No Matches
lasp_record.py
Go to the documentation of this file.
1#!/usr/bin/python3.8
2# -*- coding: utf-8 -*-
3"""
4Read data from stream and record sound and video at the same time
5"""
6import dataclasses, logging, os, time, h5py, threading
7import numpy as np
8
9from .lasp_atomic import Atomic
10from .lasp_cpp import (LASP_VERSION_MAJOR, LASP_VERSION_MINOR, InDataHandler,
11 StreamMgr)
12
13
14@dataclasses.dataclass
16 curT: float = 0
17 done: bool = False
18
19
21 """
22 Class used to perform a recording. Recording data can come in from a
23 different thread, that is supposed to call the `inCallback` method, with
24 audio data as an argument.
25 """
26
28 self,
29 fn: str,
30 streammgr: StreamMgr,
31 rectime: float = None,
32 wait: bool = True,
33 progressCallback=None,
34 startDelay: float = 0,
35 ):
36 """
37 Start a recording. Blocks if wait is set to True.
38
39 Args:
40 fn: Filename to record to. Extension is automatically added if not
41 provided.
42 stream: AvStream instance to record from. Should have input
43 channels!
44 rectime: Recording time [s], None for infinite, in seconds. If set
45 to None, or np.inf, the recording continues indefintely.
46 progressCallback: callable that is called with an instance of
47 RecordStatus instance as argument.
48 startDelay: Optional delay added before the recording is *actually*
49 started in [s].
50 """
51 ext = ".h5"
52 if ext not in fn:
53 fn += ext
54
55 self.smgr = streammgr
56 self.metadata = None
57
58 if startDelay < 0:
59 raise RuntimeError("Invalid start delay value. Should be >= 0")
60
61 self.startDelay = startDelay
62
63 # Flag used to indicate that we have passed the start delay
64 self.startDelay_passed = False
65
66 # The amount of seconds (float) that is to be recorded
67 self.rectime = rectime
68
69 # The file name to store data to
70 self.fn = fn
71
73
74 # Counter of the number of blocks
75 self.ablockno = Atomic(0)
76
77 # Stop flag, set when recording is finished.
78 self.stop = Atomic(False)
79
80 # Mutex, on who is working with the H5py data
81 self.file_mtx = threading.Lock()
82
83 self.progressCallback = progressCallback
84
85 try:
86 # Open the file
87 self.f = h5py.File(self.fn, "w", 'stdio')
88 self.f.flush()
89 except Exception as e:
90 logging.error(f'Error creating measurement file {e}')
91 raise
92
93 # This flag is used to delete the file on finish(), and can be used
94 # when a recording is canceled.
95 self.deleteFile = False
96
97 # Try to obtain stream metadata
98 streamstatus = streammgr.getStreamStatus(StreamMgr.StreamType.input)
99 if not streamstatus.runningOK():
100 raise RuntimeError(
101 "Stream is not running properly. Please first start the stream"
102 )
103
104 self.ad = None
105
106 logging.debug("Starting record....")
107
108 self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback)
109
110 if wait:
111 logging.debug("Stop recording with CTRL-C")
112 try:
113 while not self.stop():
114 time.sleep(0.01)
115 except KeyboardInterrupt:
116 logging.debug("Keyboard interrupt on record")
117 finally:
118 self.finish()
119
def resetCallback(self, daq):
    """
    Store the stream parameters and write the file-level metadata.

    This method is handed to the InDataHandler as its reset callback. It
    caches the block size, number of enabled input channels and sample
    rate on the instance, and writes them, together with per-channel
    information, as attributes of the HDF5 file.

    Args:
        daq: Object describing the running stream. It is queried for its
            enabled input channels, frames per block, number of enabled
            input channels and sample rate.
    """
    with self.file_mtx:
        channels = daq.enabledInChannels()
        frames_per_block = daq.framesPerBlock()

        # Cached for later use when the dataset is created and when the
        # recording time is computed.
        self.blocksize = frames_per_block
        self.nchannels = daq.neninchannels()
        self.fs = daq.samplerate()

        attrs = self.f.attrs

        # Version of LASP that produced this file
        attrs["LASP_VERSION_MAJOR"] = LASP_VERSION_MAJOR
        attrs["LASP_VERSION_MINOR"] = LASP_VERSION_MINOR

        # Stream parameters and per-channel settings
        attrs["samplerate"] = daq.samplerate()
        attrs["nchannels"] = daq.neninchannels()
        attrs["blocksize"] = frames_per_block
        attrs["sensitivity"] = [chan.sensitivity for chan in channels]
        attrs["channelNames"] = [chan.name for chan in channels]

        # time.time() gives seconds since the epoch as a float. The start
        # delay is added so that the stored time stamp is offset by the
        # configured delay.
        attrs["time"] = time.time() + self.startDelay

        # Physical quantities are stored as enumeration indices, not as
        # JSON metadata (the latter was used in LASP versions < 1.0).
        attrs["qtys_enum_idx"] = [chan.qty.value for chan in channels]

        self.f.flush()
def firstFrames(self, adata):
    """
    Create the "audio" dataset in the recording file and store its handle
    in `self.ad`.

    The dataset starts with a single block and can grow without limit
    along its first axis. The other two axes are fixed to the block size
    and the number of channels. Data is stored gzip-compressed, using the
    data type of the first block received.

    Args:
        adata: Numpy array with data from DAQ. Only its dtype is used
            here.
    """
    # Shape of a single block of audio data
    block_shape = (self.blocksize, self.nchannels)

    self.ad = self.f.create_dataset(
        "audio",
        (1,) + block_shape,
        dtype=np.dtype(adata.dtype),
        # None on the first axis: blocks can be appended indefinitely
        maxshape=(None,) + block_shape,
        compression="gzip",
    )
    self.f.flush()
def inCallback(self, adata):
    """
    Handle one block of audio data coming from the stream.

    While the stop flag is not set, the block is stored in the recording
    file. The dataset is created first if it does not exist yet. Once the
    stop flag is set, incoming data is ignored.

    Args:
        adata: Block of audio data from the stream.

    Returns:
        Always True. The original note on this callback states that
        returning False would stop the stream; this method never does.
    """
    if not self.stop():
        # File access is serialized through the mutex.
        with self.file_mtx:
            if self.ad is None:
                # First block: the dataset still has to be created.
                self.firstFrames(adata)

            self.__addTimeData(adata)
        return True

    # Stop flag is raised. We do not add any data anymore.
    logging.debug('Stop flag set, early return in inCallback')
    return True
def setDelete(self, val: bool):
    """
    Set or clear the delete flag.

    When the flag is set, finish() removes the measurement file after
    closing it. This is typically used to clean up after a canceled
    recording.

    Args:
        val: New value of the delete flag.
    """
    # The flag is written while holding the file mutex.
    self.file_mtx.acquire()
    try:
        self.deleteFile = val
    finally:
        self.file_mtx.release()
def finish(self):
    """
    Finish the recording and close the recording file.

    Raises the stop flag, flushes the file, drops the input data handler
    and the dataset handle, closes the file, and finally deletes the file
    when the delete flag is set.
    """
    logging.debug("Recording::finish()")

    # Raise the stop flag first, such that inCallback() ignores any data
    # that still comes in.
    self.stop <<= True

    with self.file_mtx:
        self.f.flush()

        # Drop the indata handler, which should also remove the callback
        # function from the StreamMgr. This does not have to happen
        # instantaneously, hence the mutex guards in this class.
        delattr(self, "indh")
        self.indh = None

        # Drop the handle to the dataset, otherwise the h5 file is not
        # closed properly.
        delattr(self, "ad")
        self.ad = None

        try:
            # Close the recording file and drop its handle
            self.f.close()
            delattr(self, "f")
        except Exception as e:
            logging.error(f"Error closing file: {e}")

        logging.debug("Recording ended")

        # Clean up the file when the recording was marked for deletion
        if self.deleteFile:
            self.__deleteFile()
def __deleteFile(self):
    """
    Remove the recording file from disk.

    Failure to remove the file is logged as an error and not propagated.
    """
    path = self.fn
    try:
        os.remove(path)
    except Exception as e:
        logging.error(f"Error deleting file: {self.fn}: {str(e)}")
def __addTimeData(self, indata):
    """
    Append one block of time data to the storage file.

    Blocks arriving before the start delay has passed are counted but not
    stored. Once the delay has passed, the block counter and the recording
    time are reset. The progress callback, if any, is called for every
    block from then on. When the recording time exceeds `rectime`, the
    stop flag is raised and the block is not stored.

    Args:
        indata: Block of audio data to store.
    """
    # Time at the start of this block, from the number of blocks so far
    elapsed = self.ablockno() * self.blocksize / self.fs

    # Increase the block counter
    self.ablockno += 1

    if not self.startDelay_passed:
        if elapsed < self.startDelay:
            # Start delay has not been passed
            return
        if elapsed >= 0:
            # Start delay passed, switch the flag!
            self.startDelay_passed = True

            # Reset the audio block counter and the recording time
            self.ablockno = Atomic(1)
            elapsed = 0

    nblocks = self.ablockno()
    status = RecordStatus(curT=elapsed, done=False)

    if self.progressCallback is not None:
        self.progressCallback(status)

    # Console feedback: a dot per block, and the number of seconds each
    # time a new whole second is reached.
    curT_rounded_to_seconds = int(elapsed)
    if curT_rounded_to_seconds <= self.curT_rounded_to_seconds:
        print(".", end="", flush=True)
    else:
        self.curT_rounded_to_seconds = curT_rounded_to_seconds
        print(f"{curT_rounded_to_seconds}", end="", flush=True)

    if self.rectime is not None and elapsed > self.rectime:
        # We are done! Report it, raise the stop flag and skip this block.
        if self.progressCallback is not None:
            status.done = True
            self.progressCallback(status)
        self.stop <<= True
        return

    # Grow the dataset to hold this block and write it in the last slot
    last_slot = nblocks - 1
    self.ad.resize(nblocks, axis=0)
    self.ad[last_slot, :, :] = indata
    self.f.flush()
Implementation of atomic operations on integers and booleans.
Class used to perform a recording.
firstFrames(self, adata)
Set up the dataset in which to store the audio data.
inCallback(self, adata)
This method is called when a block of audio data from the stream is available.
resetCallback(self, daq)
Function called with initial stream data.
__init__(self, str fn, StreamMgr streammgr, float rectime=None, bool wait=True, progressCallback=None, float startDelay=0)
Start a recording.
finish(self)
This method should be called to finish and a close a recording file, remove the queue from the stream...
setDelete(self, bool val)
Set the delete flag.