LASP 1.0
Library for Acoustic Signal Processing
Loading...
Searching...
No Matches
lasp_octavefilter.py
Go to the documentation of this file.
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""!
4Author: J.A. de Jong - ASCEE
5
6Provides the implementations of (fractional) octave filter banks
7
8"""
9__all__ = ['FirOctaveFilterBank', 'FirThirdOctaveFilterBank',
10 'OverallFilterBank', 'SosOctaveFilterBank',
11 'SosThirdOctaveFilterBank']
12
13from .filter.filterbank_design import (OctaveBankDesigner,
14 ThirdOctaveBankDesigner)
15from .lasp_cpp import BiquadBank
16import numpy as np
17
18
20 """
21 Dummy type filter bank. Does nothing special, only returns output in a
22 sensible way
23 """
24
25 def __init__(self, fs):
26 """
27 Initialize overall filter bank
28 """
29 self.fs = fs
30 self.N = 0
31 self.xs = [0]
32
33 def filter_(self, data):
34 """
35 Filter input data
36 """
37 if data.ndim == 2 and data.shape[1] != 1:
38 raise RuntimeError("invalid number of channels, should be 1")
39
40 if data.shape[0] == 0:
41 # No new time samples
42 return {}
43
44 # Output given as a dictionary with x as the key
45 output = {}
46
47 tstart = self.N / self.fs
48 Ncur = data.shape[0]
49 tend = tstart + Ncur / self.fs
50
51 t = np.linspace(tstart, tend, Ncur, endpoint=False)
52 self.N += Ncur
53
54 output['Overall'] = {'t': t, 'data': data, 'x': 0}
55 return output
56
57 def decimation(self, x):
58 return [1]
59
60
62 """
63 Single channel (fractional) octave filter bank implementation, based on FIR
64 filters and sample rate decimation.
65 """
66
67 def __init__(self, fs, xmin, xmax):
68 """
69 Initialize a OctaveFilterBank object.
70
71 Args:
72 fs: Sampling frequency of base signal
73
74 """
75 assert np.isclose(fs, 48000), "Only sampling frequency" \
76 " available is 48 kHz"
77
78 self.fs = fs
79 self.xs = list(range(xmin, xmax + 1))
80
81 maxdecimation = self.designer.firDecimation(self.xs[0])
82 self.decimators = []
83 for dec in maxdecimation:
84 self.decimators.append(Decimator(1, dec))
85
86 xs_d1 = []
87 xs_d4 = []
88 xs_d16 = []
89 xs_d64 = []
90 xs_d256 = []
91
92 self.filterbanks = []
93 # Sort the x values in categories according to the required decimation
94 for x in self.xs:
95 dec = self.designer.firDecimation(x)
96 if len(dec) == 1 and dec[0] == 1:
97 xs_d1.append(x)
98 elif len(dec) == 1 and dec[0] == 4:
99 xs_d4.append(x)
100 elif len(dec) == 2:
101 xs_d16.append(x)
102 elif len(dec) == 3:
103 xs_d64.append(x)
104 elif len(dec) == 4:
105 xs_d256.append(x)
106 else:
107 raise ValueError(f'No decimation found for x={x}')
108
109 xs_all = [xs_d1, xs_d4, xs_d16, xs_d64, xs_d256]
110 for xs in xs_all:
111 nominals_txt = []
112 if len(xs) > 0:
113 firs = np.empty((self.designer.firFilterLength, len(xs)), order='F')
114 for i, x in enumerate(xs):
115 # These are the filters that do not require lasp_decimation
116 # prior to filtering
117 nominals_txt.append(self.designer.nominal_txt(x))
118 firs[:, i] = self.designer.createFirFilter(x)
119 filterbank = {'fb': pyxFilterBank(firs, 1024),
120 'xs': xs,
121 'nominals': nominals_txt}
122 self.filterbanks.append(filterbank)
123
124 # Sample input counter.
125 self.N = 0
126
127 self.dec = [1, 4, 16, 64, 256]
128
129 # Filter output counters
130 # These intial delays are found 'experimentally' using a toneburst
131 # response.
132 self.Nf = [915, 806, 780, 582, 338]
133
134 def filterd(self, dec_stage, data):
135 """
136 Filter data for a given decimation stage
137
138 Args:
139 dec_stage: decimation stage
140 data: Pre-filtered data
141 """
142 output = {}
143 if data.shape[0] == 0:
144 return output
145
146 filtered = self.filterbanks[dec_stage]['fb'].filter_(data)
147 Nf = filtered.shape[0]
148 if Nf > 0:
149 dec = self.dec[dec_stage]
150 fd = self.fs/dec
151
152 oldNf = self.Nf[dec_stage]
153 tstart = oldNf/fd
154 tend = tstart + Nf/fd
155
156 t = np.linspace(tstart, tend, Nf, endpoint=False)
157 self.Nf[dec_stage] += Nf
158 for i, nom_txt in enumerate(self.filterbanks[dec_stage]['nominals']):
159 x = self.designer.nominal_txt_tox(nom_txt)
160 output[nom_txt] = {'t': t, 'data': filtered[:, [i]], 'x': x}
161
162 return output
163
164 def filter_(self, data):
165 """
166 Filter input data
167 """
168 assert data.ndim == 2
169 assert data.shape[1] == 1, "invalid number of channels, should be 1"
170
171 if data.shape[0] == 0:
172 return {}
173
174 # Output given as a dictionary with x as the key
175 output = {}
176 output_unsorted = {}
177 self.N += data.shape[0]
178
179 output_unsorted = {**output_unsorted, **self.filterd(0, data)}
180
181 for i in range(len(self.decimators)):
182 dec_stage = i+1
183 if data.shape[0] > 0:
184 # Apply a decimation stage
185 data = self.decimators[i].decimate(data)
186 output_unsorted = {**output_unsorted,
187 **self.filterd(dec_stage, data)}
188
189 # Create sorted output
190 for x in self.xs:
191 nom_txt = self.designer.nominal_txt(x)
192 output[nom_txt] = output_unsorted[nom_txt]
193
194 return output
195
196 def decimation(self, x):
197 return self.designer.firDecimation(x)
198
199
201 """
202 Filter bank which uses FIR filtering for each octave frequency band
203 """
204
205 def __init__(self, fs, xmin, xmax):
207 FirFilterBank.__init__(self, fs, xmin, xmax)
208
209
211 """
212 Filter bank which uses FIR filtering for each one-third octave frequency
213 band.
214 """
215
216 def __init__(self, fs, xmin, xmax):
218 FirFilterBank.__init__(self, fs, xmin, xmax)
219
220
222 def __init__(self, fs, xmin, xmax):
223 """
224 Initialize a second order sections filterbank
225
226 Args:
227 fs: Sampling frequency [Hz]
228 xmin: Minimum value for the bands
229 xmax: Maximum value for the bands
230 """
231 if xmin is None:
232 xmin = self.designer.xs[0]
233 if xmax is None:
234 xmax = self.designer.xs[-1]
235
236 self.fs = fs
237 self.xs = list(range(xmin, xmax + 1))
238
239 # The number of parallel filters
240 nfilt = len(self.xs)
241
242 self.nfilt = nfilt
243
244 sos = None
245 for i, x in enumerate(self.xs):
246 channel = self.designer.createSOSFilter(x)
247 if sos is None:
248 sos = np.empty((channel.size, len(self.xs)))
249 sos[:, i] = channel.flatten()
250
251 self._fb = BiquadBank(sos)
252
253 self.xmin = xmin
254 self.xmax = xmax
255 self.N = 0
256
257 def filter_(self, data):
258 """
259 Filter input data
260 """
261
262 if data.ndim > 1 and data.shape[1] != 1:
263 raise RuntimeError("invalid number of channels, should be 1")
264
265 if data.shape[0] == 0:
266 return {}
267
268 filtered_data = self._fb.filter(data)
269 if filtered_data.ndim == 1:
270 filtered_data = filtered_data[:, None]
271
272 # Output given as a dictionary with nom_txt as the key
273 output = {}
274
275 tstart = self.N / self.fs
276 Ncur = data.shape[0]
277 tend = tstart + Ncur / self.fs
278
279 t = np.linspace(tstart, tend, Ncur, endpoint=False)
280 self.N += Ncur
281
282 for i, x in enumerate(self.xs):
283 # '31.5' to '16k'
284 nom_txt = self.designer.nominal_txt(x)
285 output[nom_txt] = {'t': t, 'data': filtered_data[:, [i]], 'x': x}
286
287 return output
288
289 def decimation(self, x):
290 return [1]
291
292
294 """
295 Filter bank which uses FIR filtering for each one-third octave frequency
296 band.
297 """
298
299 def __init__(self, fs, xmin=None, xmax=None):
300 """
301 Initialize a second order sections filterbank.
302
303 Args:
304 fs: Sampling frequency [Hz]
305 xmin: Minimum value for the bands
306 xmax: Maximum value for the bands
307 """
309 SosFilterBank.__init__(self, fs, xmin, xmax)
310
311
313 """
314 Filter bank which uses FIR filtering for each one-third octave frequency
315 band.
316 """
317
318 def __init__(self, fs, xmin=None, xmax=None):
319 """
320 Initialize a second order sections filterbank.
321
322 Args:
323 fs: Sampling frequency [Hz]
324 xmin: Minimum value for the bands, if not specified, use minimum
325 xmax: Maximum value for the bands, if not specified, use maximum
326 """
328 SosFilterBank.__init__(self, fs, xmin, xmax)
Single channel (fractional) octave filter bank implementation, based on FIR filters and sample rate d...
__init__(self, fs, xmin, xmax)
Initialize a OctaveFilterBank object.
filter_(self, data)
Filter input data.
filterd(self, dec_stage, data)
Filter data for a given decimation stage.
Filter bank which uses FIR filtering for each octave frequency band.
__init__(self, fs, xmin, xmax)
Initialize a OctaveFilterBank object.
Filter bank which uses FIR filtering for each one-third octave frequency band.
__init__(self, fs, xmin, xmax)
Initialize a OctaveFilterBank object.
filter_(self, data)
Filter input data.
__init__(self, fs)
Initialize overall filter bank.
filter_(self, data)
Filter input data.
__init__(self, fs, xmin, xmax)
Initialize a second order sections filterbank.
Filter bank which uses FIR filtering for each one-third octave frequency band.
__init__(self, fs, xmin=None, xmax=None)
Initialize a second order sections filterbank.
Filter bank which uses FIR filtering for each one-third octave frequency band.
__init__(self, fs, xmin=None, xmax=None)
Initialize a second order sections filterbank.