Source code for sensors.Utilities.pyaudiohandler

from threading import Thread
import numpy as np
import scipy.signal as sg
import scipy.fft as sp
from scipy.io.wavfile import write
import pyaudio
from sensors.Utilities.numpy_ringbuffer import RingBuffer
from datetime import datetime
import os
from colorama import Fore, Back, Style
from functools import partial
import time
import ctypes


[docs]class PyAudioHandler(Thread): """Class to handle USB dongle input by reading chunks of mono data from the wingbeat device and sending data to the Bokeh application and saving it. :param visual_trigger: object created by :class:`sensors.GUIclasses.single_save.SingleSaveTab` :type visual_trigger: Bokeh object :param visual_ringbuffer: object created by :class:`sensors.GUIclasses.audio_stream.StreamAudioTab` :type visual_ringbuffer: Bokeh object """ def __init__(self, visual_trigger, visual_ringbuffer): Thread.__init__(self) self.visual_trigger = visual_trigger self.visual_ringbuffer = visual_ringbuffer self.config = visual_trigger.config self.doc = self.config.curdoc self.sampling_rate = self.config.sampling_rate self.chunk_size = self.config.chunk_size self.decimation_factor = self.config.decimation_factor self.number_of_saved_chunks = self.config.number_of_saved_chunks self.number_of_chunks_to_wait = self.config.number_of_chunks_to_wait self.timeslice = visual_trigger.timeslice # number of chunks within timeframe self.multiples = visual_trigger.multiples self.threshold = visual_trigger.threshold self.gain = visual_trigger.gain self.circularbuffer_save = RingBuffer(self.multiples*self.chunk_size) self.circularbuffer_vis = RingBuffer(self.multiples*self.chunk_size//self.decimation_factor) self.data = {'values': None} self.delay = 0 self.save_in_progress = False self.running = False self.thread = None
[docs] def start(self): """Starts the audio thread. """ self.running = True self.thread = Thread(target=self.update_audio_data, args=()) self.thread.setDaemon(True) self.thread.start() print('PyAudiohandler thread initialized!')
[docs] def kill(self): """Ends the audio thread. """ self.running = False self.thread.join()
[docs] def py_error_handler(self, filename, line, function, err, fmt): """Function to mute ALSA warnings on creation of pyaudio object. """ pass
[docs] def update_audio_data(self): """Main thread function that gets initialized with the thread generation :func:`PyAudioHandler.start()`. """ # mute ALSA debugging print('Muted ALSA warnings!') ERROR_HANDLER_FUNC = ctypes.CFUNCTYPE(None, ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p) c_error_handler = ERROR_HANDLER_FUNC(self.py_error_handler) asound = ctypes.cdll.LoadLibrary('libasound.so.2') asound.snd_lib_error_set_handler(c_error_handler) pa = pyaudio.PyAudio() stream = pa.open( format=pyaudio.paInt16, channels=1, rate=int(self.sampling_rate), input=True, frames_per_buffer=self.chunk_size, ) while self.running: try: raw_data = np.fromstring(stream.read(self.chunk_size), dtype=np.int16) # whole raw data recorded within given timeframe extended into ringbuffer self.circularbuffer_save.extend(raw_data) chunk_signal = raw_data / 32768.0 # PSD of chunk PSD_chunk_signal = 10*np.log10(sg.welch(chunk_signal, fs=self.sampling_rate, window='hann', nperseg=256, noverlap=128+64)[1]) # down sample signal chunks signal_resamp = sg.resample_poly(chunk_signal, 1, self.decimation_factor, padtype='edge') # fill ringbuffer used for visualization self.circularbuffer_vis.extend(signal_resamp) buffer_resamp = self.circularbuffer_vis.get() # check if triggered save is active if self.visual_trigger.trigger: if max(chunk_signal*self.gain.value) > self.threshold.value or self.save_in_progress: self.save_in_progress = True self.delay += 1 if self.delay == self.number_of_chunks_to_wait: self.delay = 0 self.save_in_progress = False # save signal and return raw triggered signal and filename triggered_signal, filename = self.save_trig_signal() else: triggered_signal, filename = None, None else: triggered_signal, filename = None, None else: triggered_signal, filename = None, None if self.visual_ringbuffer.save_all: self.save_all() self.visual_ringbuffer.save_all = False # save data in dict self.data['values'] = signal_resamp, PSD_chunk_signal, buffer_resamp, triggered_signal, filename # update corresponding visualization object if self.visual_trigger.trigger: self.doc.add_next_tick_callback(partial(self.visual_trigger.signal_update, self.data)) else: self.doc.add_next_tick_callback(partial(self.visual_ringbuffer.signal_update, self.data)) except: continue
[docs] def save_all(self): """Method to save the ringbuffer to a ``wav`` file with the corresponding time stamp. """ ringbuffer_data = self.circularbuffer_save.get() timestamp = datetime.now().strftime("_%Y-%m-%d_%H-%M-%S") pre = 'all' path = os.path.join(os.path.dirname(os.path.dirname(__file__)), self.config.wav_path, pre + timestamp) os.makedirs(path) file_name = f'ringbuffer_{int(self.multiples*self.timeslice)}ms.wav' write(os.path.join(path, file_name), self.sampling_rate, ringbuffer_data.astype(np.int16)) print(Fore.RED + 'Saved ringbuffer to ' + self.config.wav_path + '/' + timestamp + Fore.WHITE)
[docs] def save_trig_signal(self): """Method to save a triggered signal to a ``wav`` file with the corresponding time stamp. :return: triggered raw signal, filename :rtype: 1D float array, string """ ringbuffer_data = self.circularbuffer_save.get() triggered_signal = ringbuffer_data[-self.number_of_saved_chunks*self.chunk_size:] pre = 'trig' dt = datetime.now() timestamp = dt.strftime(f"_%Y-%m-%d_%H-%M-%S") + '-' + str(int(dt.microsecond/1000)) path = os.path.join(os.path.dirname(os.path.dirname(__file__)), self.config.wav_path, pre + timestamp) os.makedirs(path) file_name = f'signal_{int(self.number_of_saved_chunks*self.timeslice)}ms.wav' write(os.path.join(path, file_name), self.sampling_rate, triggered_signal.astype(np.int16)) print(Fore.RED + 'Saved signal to ' + self.config.wav_path + '/' + pre + timestamp + Fore.WHITE) return triggered_signal, os.path.join(path, file_name)