from threading import Thread
import numpy as np
import scipy.signal as sg
import scipy.fft as sp
from scipy.io.wavfile import write
import pyaudio
from sensors.Utilities.numpy_ringbuffer import RingBuffer
from datetime import datetime
import os
from colorama import Fore, Back, Style
from functools import partial
import time
import ctypes
[docs]class PyAudioHandler(Thread):
"""Class to handle USB dongle input by reading chunks of mono data from the wingbeat device and sending data to the
Bokeh application and saving it.
:param visual_trigger: object created by :class:`sensors.GUIclasses.single_save.SingleSaveTab`
:type visual_trigger: Bokeh object
:param visual_ringbuffer: object created by :class:`sensors.GUIclasses.audio_stream.StreamAudioTab`
:type visual_ringbuffer: Bokeh object
"""
def __init__(self, visual_trigger, visual_ringbuffer):
Thread.__init__(self)
self.visual_trigger = visual_trigger
self.visual_ringbuffer = visual_ringbuffer
self.config = visual_trigger.config
self.doc = self.config.curdoc
self.sampling_rate = self.config.sampling_rate
self.chunk_size = self.config.chunk_size
self.decimation_factor = self.config.decimation_factor
self.number_of_saved_chunks = self.config.number_of_saved_chunks
self.number_of_chunks_to_wait = self.config.number_of_chunks_to_wait
self.timeslice = visual_trigger.timeslice
# number of chunks within timeframe
self.multiples = visual_trigger.multiples
self.threshold = visual_trigger.threshold
self.gain = visual_trigger.gain
self.circularbuffer_save = RingBuffer(self.multiples*self.chunk_size)
self.circularbuffer_vis = RingBuffer(self.multiples*self.chunk_size//self.decimation_factor)
self.data = {'values': None}
self.delay = 0
self.save_in_progress = False
self.running = False
self.thread = None
[docs] def start(self):
"""Starts the audio thread.
"""
self.running = True
self.thread = Thread(target=self.update_audio_data, args=())
self.thread.setDaemon(True)
self.thread.start()
print('PyAudiohandler thread initialized!')
[docs] def kill(self):
"""Ends the audio thread.
"""
self.running = False
self.thread.join()
[docs] def py_error_handler(self, filename, line, function, err, fmt):
"""Function to mute ALSA warnings on creation of pyaudio object.
"""
pass
[docs] def update_audio_data(self):
"""Main thread function that gets initialized with the thread generation :func:`PyAudioHandler.start()`.
"""
# mute ALSA debugging
print('Muted ALSA warnings!')
ERROR_HANDLER_FUNC = ctypes.CFUNCTYPE(None, ctypes.c_char_p, ctypes.c_int,
ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p)
c_error_handler = ERROR_HANDLER_FUNC(self.py_error_handler)
asound = ctypes.cdll.LoadLibrary('libasound.so.2')
asound.snd_lib_error_set_handler(c_error_handler)
pa = pyaudio.PyAudio()
stream = pa.open(
format=pyaudio.paInt16,
channels=1,
rate=int(self.sampling_rate),
input=True,
frames_per_buffer=self.chunk_size,
)
while self.running:
try:
raw_data = np.fromstring(stream.read(self.chunk_size), dtype=np.int16)
# whole raw data recorded within given timeframe extended into ringbuffer
self.circularbuffer_save.extend(raw_data)
chunk_signal = raw_data / 32768.0
# PSD of chunk
PSD_chunk_signal = 10*np.log10(sg.welch(chunk_signal, fs=self.sampling_rate,
window='hann', nperseg=256, noverlap=128+64)[1])
# down sample signal chunks
signal_resamp = sg.resample_poly(chunk_signal, 1, self.decimation_factor, padtype='edge')
# fill ringbuffer used for visualization
self.circularbuffer_vis.extend(signal_resamp)
buffer_resamp = self.circularbuffer_vis.get()
# check if triggered save is active
if self.visual_trigger.trigger:
if max(chunk_signal*self.gain.value) > self.threshold.value or self.save_in_progress:
self.save_in_progress = True
self.delay += 1
if self.delay == self.number_of_chunks_to_wait:
self.delay = 0
self.save_in_progress = False
# save signal and return raw triggered signal and filename
triggered_signal, filename = self.save_trig_signal()
else:
triggered_signal, filename = None, None
else:
triggered_signal, filename = None, None
else:
triggered_signal, filename = None, None
if self.visual_ringbuffer.save_all:
self.save_all()
self.visual_ringbuffer.save_all = False
# save data in dict
self.data['values'] = signal_resamp, PSD_chunk_signal, buffer_resamp, triggered_signal, filename
# update corresponding visualization object
if self.visual_trigger.trigger:
self.doc.add_next_tick_callback(partial(self.visual_trigger.signal_update, self.data))
else:
self.doc.add_next_tick_callback(partial(self.visual_ringbuffer.signal_update, self.data))
except:
continue
[docs] def save_all(self):
"""Method to save the ringbuffer to a ``wav`` file with the corresponding time stamp.
"""
ringbuffer_data = self.circularbuffer_save.get()
timestamp = datetime.now().strftime("_%Y-%m-%d_%H-%M-%S")
pre = 'all'
path = os.path.join(os.path.dirname(os.path.dirname(__file__)), self.config.wav_path, pre + timestamp)
os.makedirs(path)
file_name = f'ringbuffer_{int(self.multiples*self.timeslice)}ms.wav'
write(os.path.join(path, file_name), self.sampling_rate, ringbuffer_data.astype(np.int16))
print(Fore.RED + 'Saved ringbuffer to ' + self.config.wav_path + '/' + timestamp + Fore.WHITE)
[docs] def save_trig_signal(self):
"""Method to save a triggered signal to a ``wav`` file with the corresponding time stamp.
:return: triggered raw signal, filename
:rtype: 1D float array, string
"""
ringbuffer_data = self.circularbuffer_save.get()
triggered_signal = ringbuffer_data[-self.number_of_saved_chunks*self.chunk_size:]
pre = 'trig'
dt = datetime.now()
timestamp = dt.strftime(f"_%Y-%m-%d_%H-%M-%S") + '-' + str(int(dt.microsecond/1000))
path = os.path.join(os.path.dirname(os.path.dirname(__file__)), self.config.wav_path, pre + timestamp)
os.makedirs(path)
file_name = f'signal_{int(self.number_of_saved_chunks*self.timeslice)}ms.wav'
write(os.path.join(path, file_name), self.sampling_rate, triggered_signal.astype(np.int16))
print(Fore.RED + 'Saved signal to ' + self.config.wav_path + '/' + pre + timestamp + Fore.WHITE)
return triggered_signal, os.path.join(path, file_name)