# Source code for sensors.GUIclasses.triggered_save

import numpy as np
import scipy.signal as sg

from bokeh.plotting import figure
from bokeh.models import Panel, Button, ColumnDataSource, Slider, Div, PrintfTickFormatter
from bokeh.events import ButtonClick
from bokeh.layouts import layout, gridplot, column, row, Spacer
from bokeh.document import without_document_lock

from sensors.Utilities.signal_to_png import create_png_from_signal
from sensors.GUIclasses.taxonomy_autocomplete_widget import AutocompleteTaxonomy
from sensors.Utilities.write_meta_data import WriteMetaData

import os
import shutil
from tornado import gen
from datetime import datetime
from colorama import Fore

import uuid
import json


class TriggeredSaveTab:
    """Bokeh tab with a live wingbeat stream and the last four triggered signals.

    The tab displays a live wingbeat stream of the latest audio chunk, the last
    four triggered signals and their PSD. The duration of the triggered signal
    can be indirectly specified with the `number_of_saved_chunks`, `chunk_size`
    and `sampling_rate` parameters in the :ref:`config.yaml` file. To trigger a
    signal the user can adjust a threshold and a gain.

    :param config: configuration including :ref:`config.yaml` dictionary and
        current Bokeh document
    :type config: object
    """

    # number of (triggered signal, PSD) figure pairs shown in the grid
    _NUM_TRIGGER_SLOTS = 4

    def __init__(self, config):
        """Read the configuration, derive plot parameters and build the tab.

        :param config: configuration object; the attributes ``chunk_size``,
            ``sampling_rate``, ``timeframe``, ``number_of_saved_chunks``,
            ``decimation_factor`` and ``executor`` are read here
        :type config: object
        """
        self.config = config
        self.autocomplete_tax = AutocompleteTaxonomy(config)

        self.chunk_size = config.chunk_size
        self.sampling_rate = config.sampling_rate
        self.timeframe = config.timeframe
        self.number_of_saved_chunks = config.number_of_saved_chunks
        self.decimation_factor = config.decimation_factor

        # derived parameters
        # half of the decimated sampling rate, scaled by 0.001 (Hz -> kHz)
        self.max_freq_khz = config.sampling_rate/self.decimation_factor/2 * 0.001
        # duration of one chunk, scaled by 1000 (s -> ms)
        self.timeslice = self.chunk_size/self.sampling_rate * 1000
        self.multiples = int(np.ceil(self.timeframe/self.timeslice))

        self.template = ("""<div class='content' style="background-color: {background};color: {colour};"> <div class='info'> {status_text} </div> </div>""")

        # runtime state; set before the widgets (and their callbacks) exist
        self.executor = config.executor
        self.plot_counter = 0
        self.update_data = False
        self.trigger = False
        self.check_update = None

        # call tab method to create tab
        self._tab = self.make_tab()

        # create source/plot lists to iterate through:
        # indices 0..3 are the triggered signals, 4..7 the matching PSDs
        slots = range(1, self._NUM_TRIGGER_SLOTS + 1)
        self.source_list = (
            tuple(getattr(self, 'trig_signal_source_{}'.format(i)) for i in slots)
            + tuple(getattr(self, 'spectrum_source_{}'.format(i)) for i in slots)
        )
        self.plot_list = (
            tuple(getattr(self, 'trig_signal_plot_{}'.format(i)) for i in slots)
            + tuple(getattr(self, 'spectrum_plot_{}'.format(i)) for i in slots)
        )

    @property
    def tab(self):
        """Return the Bokeh panel that :meth:`make_tab` built during initialisation."""
        return self._tab

    def _make_triggered_signal_plot(self, index, plot_args):
        """Create data source and figure for the triggered signal number ``index``.

        :param index: 1-based number used in the figure title
        :param plot_args: keyword arguments shared by all figures of the tab
        :return: tuple ``(source, plot)``
        """
        source = ColumnDataSource(data=dict(t=[], y=[]))
        plot = figure(plot_width=400, plot_height=200,
                      title="Triggered signal {}".format(index),
                      x_range=[0, self.number_of_saved_chunks*self.timeslice],
                      y_range=[-1, 1],
                      x_axis_label='discrete time [ms]',
                      y_axis_label='normalized input [ ]',
                      **plot_args)
        plot.yaxis[0].formatter = PrintfTickFormatter(format="%.0e")
        plot.background_fill_color = "#eaeaea"
        plot.line(x="t", y="y", line_color="#024768", source=source)
        return source, plot

    def _make_spectrum_plot(self, index, plot_args):
        """Create data source and figure for the PSD of triggered signal ``index``.

        :param index: 1-based number used in the figure title
        :param plot_args: keyword arguments shared by all figures of the tab
        :return: tuple ``(source, plot)``
        """
        source = ColumnDataSource(data=dict(f=[], y=[]))
        plot = figure(plot_width=300, plot_height=200,
                      title="PSD of triggered signal {}".format(index),
                      y_range=[-130, -80],
                      x_range=[0, self.max_freq_khz],
                      x_axis_label='discrete frequency [kHz]',
                      y_axis_label='[dB]',
                      **plot_args)
        plot.background_fill_color = "#eaeaea"
        plot.line(x="f", y="y", line_color="#024768", source=source)
        return source, plot

    def make_tab(self):
        """Create and arrange the elements of the corresponding Bokeh tab.

        Besides returning the panel, this sets the widget, data source and
        figure attributes on the instance (``btn_wingbeats``, ``text_status``,
        ``signal_source``, ``signal_plot``, ``gain``, ``threshold``,
        ``threshold_source`` and, for N = 1..4, ``trig_signal_source_N``,
        ``trig_signal_plot_N``, ``spectrum_source_N``, ``spectrum_plot_N``).

        :return: Bokeh panel for the visualization of four triggered signals
        :rtype: Bokeh object
        """
        self.btn_wingbeats = Button(label='Start Stream', width=310)
        self.btn_wingbeats.on_event(ButtonClick, self.do_update)
        self.text_status = Div(
            text=self.template.format(status_text='Wingbeat stream inactive',
                                      background='#DC524C',
                                      colour='#FFFFFF'),
            render_as_text=False)

        plot_args = dict(tools="", toolbar_location=None, outline_line_color='#595959')

        # live chunk signal
        self.signal_source = ColumnDataSource(data=dict(t=[], y=[]))
        self.signal_plot = figure(plot_width=600, plot_height=200,
                                  title="Chunk signal",
                                  x_range=[0, self.timeslice],
                                  y_range=[-1, 1],
                                  x_axis_label='discrete time [ms]',
                                  y_axis_label='gain*normalized input [ ]',
                                  **plot_args)
        self.signal_plot.background_fill_color = "#eaeaea"
        self.signal_plot.line(x="t", y="y", line_color="#024768", source=self.signal_source)

        # one (triggered signal, PSD) figure pair per slot; the numbered
        # attributes are kept so existing code can still address them by name
        grid_plots = []
        for index in range(1, self._NUM_TRIGGER_SLOTS + 1):
            trig_source, trig_plot = self._make_triggered_signal_plot(index, plot_args)
            spec_source, spec_plot = self._make_spectrum_plot(index, plot_args)
            setattr(self, 'trig_signal_source_{}'.format(index), trig_source)
            setattr(self, 'trig_signal_plot_{}'.format(index), trig_plot)
            setattr(self, 'spectrum_source_{}'.format(index), spec_source)
            setattr(self, 'spectrum_plot_{}'.format(index), spec_plot)
            grid_plots.extend((trig_plot, spec_plot))

        # sliders for gain and threshold
        self.gain = Slider(start=1, end=300, value=200, step=10, title="Gain",
                           default_size=200)
        self.threshold = Slider(start=0, end=1, value=0.5, step=0.025,
                                title="Threshold", bar_color="#DC524C",
                                orientation='vertical',
                                css_classes=["custom-slider2"],
                                direction='rtl', default_size=148)
        self.threshold.on_change('value', self.update_threshold)

        # closed outline at +/- threshold drawn on top of the chunk signal
        level = self.threshold.value
        self.threshold_source = ColumnDataSource(
            data=dict(t=[0, self.timeslice + 5, self.timeslice + 5, 0],
                      y=[level, level, -level, -level]))
        self.signal_plot.line(x="t", y="y", line_color="#DC524C",
                              source=self.threshold_source)

        # define tab layout
        grid = gridplot(grid_plots, ncols=4, merge_tools=False)

        widgets_chunk = column(
            row(self.btn_wingbeats, self.text_status),
            row(Spacer(width=47), self.gain),
            row(self.signal_plot, Spacer(width=10),
                column(Spacer(height=5), self.threshold)))

        # added because vertical orientation broken atm
        # NOTE(review): this Div is never added to the layout below, so it
        # currently has no effect - confirm whether it should be attached.
        style = Div(text="""<style> """)

        _layout = layout(
            row(widgets_chunk, Spacer(width=47),
                column(Spacer(height=48), self.autocomplete_tax.widget)),
            grid,)

        return Panel(child=_layout, title='WB - Triggered Save')

    def _show_triggered_signal(self, triggered_signal):
        """Plot a triggered signal and its PSD in the next free figure pair.

        The slot index ``plot_counter`` is advanced afterwards and wraps around
        after the last slot.

        :param triggered_signal: raw triggered signal; it is divided by 32768.0
            before plotting (presumably int16 samples, as stated for
            :meth:`signal_update`)
        """
        # normalise once and reuse for both resampling and peak detection
        normalized = triggered_signal / 32768.0
        resampled = sg.resample_poly(normalized, 1, self.decimation_factor,
                                     padtype='edge')
        peak = np.max(np.abs(normalized))

        # warning if sensor overdrives
        if peak >= 1.0:
            print('Wingbeat Overdrive Status: OVERDRIVE!')

        psd = 10*np.log10(sg.welch(resampled,
                                   fs=self.sampling_rate/self.decimation_factor,
                                   window='hann', nperseg=512,
                                   noverlap=256+128)[1])

        slot = self.plot_counter
        trig_plot = self.plot_list[slot]
        trig_source = self.source_list[slot]
        # scale the y axis symmetrically to the peak amplitude
        trig_plot.y_range.start = -peak
        trig_plot.y_range.end = peak
        t = np.linspace(0, self.number_of_saved_chunks*self.timeslice, len(resampled))
        trig_source.data = dict(t=t, y=resampled)

        # the PSD figures follow the triggered-signal figures in the lists
        spectrum_plot = self.plot_list[slot + self._NUM_TRIGGER_SLOTS]
        spectrum_source = self.source_list[slot + self._NUM_TRIGGER_SLOTS]
        spectrum_plot.y_range.start = np.floor(np.min(psd))
        spectrum_plot.y_range.end = np.ceil(np.max(psd))
        f = np.linspace(0, self.max_freq_khz, len(psd))
        spectrum_source.data = dict(f=f, y=psd)

        self.plot_counter = (slot + 1) % self._NUM_TRIGGER_SLOTS

    @gen.coroutine
    def signal_update(self, data):
        """Update the chunk and the triggered signal figures.

        Targeted by next tick callback from
        :meth:`sensors.Utilities.pyaudiohandler.Pyaudiohandler.update_audio_data`.
        The PSD of the triggered signals get computed and plotted in the
        corresponding figures. Nothing happens while the stream is inactive
        or when ``data['values']`` is ``None``.

        :param data: mapping whose ``'values'`` entry holds resampled chunk
            signal, PSD of chunk signal, resampled ringbuffer, raw triggered
            signal and filename provided by
            :meth:`Pyaudiohandler.update_audio_data`
        :type data: 1D float array, 1D float array, 1D float array,
            1D int16 array and string
        """
        if not self.update_data:
            return
        if data['values'] is None:
            return

        chunk_signal, _, _, triggered_signal, filename = data['values']

        # a triggered signal is only processed once: the filename of the last
        # handled signal is remembered in check_update
        if triggered_signal is not None and self.check_update != filename:
            self.check_update = filename
            self._show_triggered_signal(triggered_signal)
            self.rename_signal()

        # the if-else below are small optimization: avoid computing and sending
        # all the x-values, if the length has not changed
        scaled_chunk = chunk_signal*self.gain.value
        if len(chunk_signal) == len(self.signal_source.data['y']):
            self.signal_source.data['y'] = scaled_chunk
        else:
            t = np.linspace(0, self.timeslice, len(chunk_signal))
            self.signal_source.data = dict(t=t, y=scaled_chunk)

    def do_update(self):
        """Toggle the preview of the chunk signal; gets called on button click.

        Starts respectively ends the preview of chunk signal and changes the
        text status corresponding to the button label. Triggered signals only
        get saved when preview is active.
        """
        starting = self.btn_wingbeats.label == 'Start Stream'
        if starting:
            label = 'Stop Stream'
            status_text = 'Wingbeat stream active'
            background = '#3EA639'
        else:
            label = 'Start Stream'
            status_text = 'Wingbeat stream inactive'
            background = '#DC524C'

        self.btn_wingbeats.label = label
        self.text_status.text = self.template.format(status_text=status_text,
                                                     colour='#FFFFFF',
                                                     background=background)
        self.update_data = starting
        self.trigger = starting

    def update_threshold(self, attr, old, new):
        """Redraw the threshold line in the chunk signal figure.

        Gets called on change of value of threshold slider.

        :param attr: name of the changed attribute (unused)
        :param old: previous value (unused)
        :param new: new value (unused; the slider is read directly)
        """
        level = self.threshold.value
        self.threshold_source.data['y'] = [level, level, -level, -level]

    def rename_signal(self):
        """Move the triggered recording into a timestamped folder with metadata.

        Gets called by :meth:`signal_update` when save of wingbeat was
        triggered. The file referenced by ``check_update`` is moved to a new
        ``<sensor_id>_<timestamp>`` folder next to its old directory and
        renamed to a UUID; the old directory is removed. A ``png`` is requested
        via :meth:`create_single_png` and a ``data.json`` with the taxonomy and
        meta data is written into the new folder.
        """
        trigger_time = datetime.now()
        timestamp = trigger_time.strftime("%Y-%m-%d_%H-%M-%S")
        datetime_folder = '{}_{}'.format(self.config.sensor_id, timestamp)

        old_directory = os.path.dirname(self.check_update)
        new_path = os.path.join(os.path.dirname(old_directory), datetime_folder)
        self.new_filename = os.path.join(new_path, '{}.{}'.format(uuid.uuid1(), 'wav'))

        # exist_ok avoids the check-then-create race of a separate exists() test
        os.makedirs(new_path, exist_ok=True)

        shutil.move(self.check_update, self.new_filename)
        # os.path is used for the path components so the message does not
        # depend on '/' being the path separator
        print(Fore.RED + 'Signal \'' + os.path.basename(old_directory)
              + '\' has been moved to /' + datetime_folder
              + ' and file renamed to \'' + os.path.basename(self.new_filename)
              + '\'' + Fore.WHITE)
        shutil.rmtree(old_directory)

        # UUID for png file
        png_filename = os.path.join(new_path, '{}.{}'.format(uuid.uuid1(), 'png'))
        # returns a future that is intentionally not awaited here
        self.create_single_png(png_filename)

        self.data_dict_temp = self.autocomplete_tax.write_tax_to_data()
        # NOTE(review): trigger_time is naive local time from datetime.now(),
        # yet the string carries a 'Z' suffix - confirm the clock runs in UTC.
        self.data_dict_temp["date_time"] = trigger_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        wingbeat = self.data_dict_temp["wingbeat"]
        wingbeat["filename"] = os.path.basename(self.new_filename)
        wingbeat["sample_rate"] = self.sampling_rate
        wingbeat["png_filename"] = os.path.basename(png_filename)
        wingbeat["classifications"] = self.data_dict_temp["main_classifications"]

        # measure meta data and write into json
        metadata = WriteMetaData(self.config, self.data_dict_temp)
        metadata.write_all_data()
        self.data_dict_temp = metadata.data_json

        # save json in datetime path; ensure_ascii=False emits non-ASCII
        # characters verbatim, so the file encoding is fixed to UTF-8 instead
        # of relying on the platform default
        with open(os.path.join(new_path, 'data.json'), 'w', encoding='utf-8') as outfile:
            json.dump(self.data_dict_temp, outfile, indent=4, ensure_ascii=False)

    @gen.coroutine
    @without_document_lock
    def create_single_png(self, png_filename):
        """Create a ``png`` of the triggered signal without blocking the visualization.

        Gets called by :meth:`rename_signal`; submits
        ``create_png_from_signal`` for ``new_filename`` to the executor.

        :param png_filename: path of the ``png`` file to create
        :return: future
        :rtype: object
        """
        # create a future so button updates don't get blocked
        yield self.executor.submit(create_png_from_signal, self.new_filename,
                                   png_filename, self.decimation_factor)