# Source code for sensors.GUIclasses.single_save

import numpy as np
import scipy.signal as sg

from bokeh.plotting import figure
from bokeh.models import Panel, Button, ColumnDataSource, Slider, Div, PrintfTickFormatter
from bokeh.models.widgets import TextInput
from bokeh.events import ButtonClick
from bokeh.layouts import layout, gridplot, column, row, Spacer
from bokeh.document import without_document_lock

from sensors.Utilities.signal_to_png import create_png_from_signal
from sensors.GUIclasses.taxonomy_autocomplete_widget import AutocompleteTaxonomy
from sensors.Utilities.write_meta_data import WriteMetaData

from colorama import Fore

from tornado import gen
import time
from datetime import datetime
import shutil
import os

import uuid
import json


class SingleSaveTab:
    """Bokeh tab showing a live wingbeat stream and the last triggered signal.

    The tab displays the latest audio chunk, the last triggered signal and
    its PSD. The duration of the triggered signal can be indirectly specified
    with the `number_of_saved_chunks`, `chunk_size` and `sampling_rate`
    parameters in the :ref:`config.yaml` file. To trigger a signal the user
    can adjust a threshold and a gain. The signal ``wav`` file can be saved to
    a user specified path together with a png of the signal and its PSD.

    :param config: configuration including :ref:`config.yaml` dictionary and
        current Bokeh document
    :type config: object
    """

    def __init__(self, config):
        self.config = config
        self.curdoc = config.curdoc
        self.autocomplete_tax = AutocompleteTaxonomy(config)
        self.chunk_size = config.chunk_size
        self.sampling_rate = config.sampling_rate
        self.timeframe = config.timeframe
        self.number_of_saved_chunks = config.number_of_saved_chunks
        self.decimation_factor = config.decimation_factor

        # derived parameters
        self.max_freq_khz = config.sampling_rate/self.decimation_factor/2 * 0.001
        self.timeslice = self.chunk_size/self.sampling_rate * 1000
        self.multiples = int(np.ceil(self.timeframe/self.timeslice))

        # HTML snippets for the status boxes (template_2 additionally sets the width)
        self.template_1 = (
            """<div class='content' style="background-color: {background};color: {colour};">"""
            """ <div class='info'> {status_text} </div> </div>"""
        )
        self.template_2 = (
            """<div class='content' style="background-color: {background};color: {colour};width: {w};">"""
            """ <div class='info'> {status_text} </div> </div>"""
        )

        # expanduser honours $HOME when it is set and falls back to the
        # platform's home directory instead of raising KeyError when it is not
        self.root = os.path.expanduser('~')

        # Runtime state is initialised BEFORE the tab is built, so that the
        # callbacks registered in make_tab never see a half-constructed object.
        self.executor = config.executor
        self.update_data = False
        self.trigger = False
        self.check_update = None
        self.datetime_folder = ""

        # call tab method to create tab
        self._tab = self.make_tab()

    @property
    def tab(self):
        """Return the Bokeh panel that :meth:`make_tab` built during construction."""
        return self._tab

    def make_tab(self):
        """Create and arrange the elements of the corresponding Bokeh tab.

        :return: Bokeh panel for the visualization of a single triggered signal
            and customization of file location
        :rtype: Bokeh object
        """
        # --- buttons and text input -------------------------------------
        self.btn_wingbeats = Button(label='Start Stream', width=310)
        self.btn_wingbeats.on_event(ButtonClick, self.do_update)

        self.btn_save = Button(label='Save Signal', width=200, button_type="success")
        self.btn_save.disabled = True
        self.btn_save.on_event(ButtonClick, self.rename_signal)

        self.btn_delete = Button(label='Delete Signal', width=200, button_type="danger")
        self.btn_delete.disabled = True
        self.btn_delete.on_event(ButtonClick, self.delete_signal)

        self.foldername = TextInput(title='Folder name', width=200, value='choose_folder')
        self.foldername.on_change('value', self.create_new_filename)

        # --- status boxes -----------------------------------------------
        self.text_status = Div(
            text=self.template_1.format(status_text='Wingbeat stream inactive',
                                        background='#DC524C', colour='#FFFFFF'),
            render_as_text=False)
        self.save_status = Div(
            text=self.template_2.format(
                status_text='{} {}'.format('Signals will be saved to',
                                           os.path.join(self.root, self.foldername.value)),
                w='410px', background='#eaeaea', colour='#000000'),
            render_as_text=False)
        self.overdrive_status = Div(
            text=self.template_2.format(status_text='Wingbeat Overdrive Status', w='410px',
                                        background='#eaeaea', colour='#000000'),
            render_as_text=False)

        PLOTARGS = dict(tools="", toolbar_location=None, outline_line_color='#595959')

        # --- live chunk plot --------------------------------------------
        self.signal_source = ColumnDataSource(data=dict(t=[], y=[]))
        self.signal_plot = figure(plot_width=600, plot_height=200, title="Chunk signal",
                                  x_range=[0, self.timeslice], y_range=[-1, 1],
                                  x_axis_label='discrete time [ms]',
                                  y_axis_label='gain*normalized input [ ]', **PLOTARGS)
        self.signal_plot.background_fill_color = "#eaeaea"
        self.signal_plot.line(x="t", y="y", line_color="#024768", source=self.signal_source)

        # --- triggered signal plot --------------------------------------
        trig_span = self.number_of_saved_chunks*self.timeslice
        self.trig_signal_source = ColumnDataSource(data=dict(t=[], y=[]))
        self.trig_signal_plot = figure(plot_width=600, plot_height=250, title="Triggered signal",
                                       x_range=[0, trig_span], y_range=[-1, 1],
                                       x_axis_label='discrete time [ms]',
                                       y_axis_label='normalized input [ ]', **PLOTARGS)
        self.trig_signal_plot.yaxis[0].formatter = PrintfTickFormatter(format="%.0e")
        self.trig_signal_plot.background_fill_color = "#eaeaea"
        self.trig_signal_plot.line(x="t", y="y", line_color="#024768",
                                   source=self.trig_signal_source)

        # red frame at +/-1.0 marking the overdrive limit
        overdrive_source = ColumnDataSource(data=dict(t=[0, trig_span + 5, trig_span + 5, 0],
                                                      y=[1.0, 1.0, -1.0, -1.0]))
        self.trig_signal_plot.line(x="t", y="y", line_color="#DC524C", line_width=3,
                                   source=overdrive_source)

        # --- PSD plot ---------------------------------------------------
        self.spectrum_source = ColumnDataSource(data=dict(f=[], y=[]))
        self.spectrum_plot = figure(plot_width=600, plot_height=250,
                                    title="PSD of triggered signal",
                                    y_range=[-130, -90], x_range=[0, self.max_freq_khz],
                                    x_axis_label='discrete frequency [kHz]',
                                    y_axis_label='[dB]', **PLOTARGS)
        self.spectrum_plot.background_fill_color = "#eaeaea"
        self.spectrum_plot.line(x="f", y="y", line_color="#024768", source=self.spectrum_source)

        # sliders for gain and threshold
        self.gain = Slider(start=1, end=300, value=200, step=10, title="Gain", default_size=200)
        self.threshold = Slider(start=0, end=1, value=0.5, step=0.025, title="Threshold",
                                bar_color="#DC524C", orientation='vertical',
                                css_classes=["custom-slider2"], direction='rtl',
                                default_size=148)
        self.threshold.on_change('value', self.update_threshold)

        # red frame at +/-threshold drawn into the chunk plot
        level = self.threshold.value
        self.threshold_source = ColumnDataSource(
            data=dict(t=[0, self.timeslice + 5, self.timeslice + 5, 0],
                      y=[level, level, -level, -level]))
        self.signal_plot.line(x="t", y="y", line_color="#DC524C", source=self.threshold_source)

        # define tab layout
        widgets_chunk = column(
            row(self.btn_wingbeats, self.text_status),
            row(Spacer(width=47), self.gain),
            row(self.signal_plot, Spacer(width=10), column(Spacer(height=5), self.threshold)))

        # define layout
        _layout = layout(
            widgets_chunk,
            row(column(self.trig_signal_plot, self.spectrum_plot),
                column(Spacer(height=24),
                       self.autocomplete_tax.widget,
                       self.foldername,
                       self.save_status,
                       row(self.btn_save, self.btn_delete),
                       self.overdrive_status),
                spacing=15)
        )

        return Panel(child=_layout, title='WB - Single Save')

    @gen.coroutine
    def signal_update(self, data):
        """Update the chunk plot and, on a new trigger, the triggered-signal plots.

        Targeted by next tick callback from
        :meth:`sensors.Utilities.pyaudiohandler.Pyaudiohandler.update_audio_data`.
        A PSD of the triggered signal gets computed and plotted in the
        corresponding figure.

        :param data: dictionary whose ``'values'`` entry holds the resampled
            chunk signal, PSD of chunk signal, resampled ringbuffer, raw
            triggered signal and filename provided by
            :meth:`Pyaudiohandler.update_audio_data`
        :type data: 1D float array, 1D float array, 1D float array, 1D int16
            array and string
        """
        if not self.update_data or data['values'] is None:
            return

        chunk_signal, _, _, triggered_signal, filename = data['values']

        # a new triggered signal is recognised by a filename not handled before
        if triggered_signal is not None and self.check_update != filename:
            self.check_update = filename
            self.old_filename = filename
            self.create_new_datetime('value', '', '')

            # scale raw samples to [-1, 1) once and reuse the result
            normalized = triggered_signal / 32768.0
            triggered_signal_resamp = sg.resample_poly(normalized, 1, self.decimation_factor,
                                                       padtype='edge')
            y_range = np.max(np.abs(normalized))

            # warning if sensor overdrives
            if y_range >= 1.0:
                self.overdrive_status.text = self.template_2.format(
                    status_text='Wingbeat Overdrive Status: OVERDRIVE!',
                    colour='#FFFFFF', w='410px', background='#DC524C')
            else:
                self.overdrive_status.text = self.template_2.format(
                    status_text='Wingbeat Overdrive Status: Ok!',
                    colour='#FFFFFF', w='410px', background='#3EA639')

            self.trig_signal_plot.y_range.start = -y_range
            self.trig_signal_plot.y_range.end = y_range

            _, pxx = sg.welch(triggered_signal_resamp,
                              fs=self.sampling_rate/self.decimation_factor,
                              window='hann', nperseg=512, noverlap=256+128)
            psd_db = 10*np.log10(pxx)
            self.spectrum_plot.y_range.start = np.floor(np.min(psd_db))
            self.spectrum_plot.y_range.end = np.ceil(np.max(psd_db))

            t = np.linspace(0, self.number_of_saved_chunks*self.timeslice,
                            len(triggered_signal_resamp))
            self.trig_signal_source.data = dict(t=t, y=triggered_signal_resamp)

            f = np.linspace(0, self.max_freq_khz, len(psd_db))
            self.spectrum_source.data = dict(f=f, y=psd_db)

            # stop stream after single triggered save
            self.do_update()

        # the if-else below are small optimization: avoid computing and sending
        # all the x-values, if the length has not changed
        if len(chunk_signal) == len(self.signal_source.data['y']):
            self.signal_source.data['y'] = chunk_signal*self.gain.value
        else:
            t = np.linspace(0, self.timeslice, len(chunk_signal))
            self.signal_source.data = dict(t=t, y=chunk_signal*self.gain.value)

    def do_update(self):
        """Toggle the chunk-signal preview.

        Gets called on button click and by :meth:`signal_update` after a
        signal has been triggered. Starts respectively ends the preview of the
        chunk signal and changes the text status corresponding to the button
        label. Triggered signal only gets saved when preview is active.
        """
        # refresh the target path once; the stop branch used to repeat this call
        self.create_new_filename('value', '', '')

        if self.btn_wingbeats.label == 'Start Stream':
            self.btn_wingbeats.label = 'Stop Stream'
            self.text_status.text = self.template_1.format(status_text='Wingbeat stream active',
                                                           colour='#FFFFFF',
                                                           background='#3EA639')
            self.check_update = None
            self.btn_save.disabled = True
            self.btn_delete.disabled = True
            self.update_data = True
            self.trigger = True
            self.datetime_folder = ""
        else:
            self.btn_wingbeats.label = 'Start Stream'
            self.text_status.text = self.template_1.format(status_text='Wingbeat stream inactive',
                                                           colour='#FFFFFF',
                                                           background='#DC524C')
            # a signal is pending: lock the stream button until the user has
            # saved or deleted it
            if self.check_update is not None:
                self.btn_wingbeats.disabled = True
                self.btn_save.disabled = False
                self.btn_delete.disabled = False
            self.update_data = False
            self.trigger = False

    def update_threshold(self, attr, old, new):
        """Redraw the threshold frame in the chunk signal figure.

        Gets called on change of value of the threshold slider. The arguments
        are not used; the current value is read from the slider itself.
        """
        level = self.threshold.value
        self.threshold_source.data['y'] = [level, level, -level, -level]

    def create_new_datetime(self, attr, new, old):
        """Remember the trigger time and derive the datetime folder name.

        Gets called on :meth:`signal_update` after a new signal has been
        triggered. The folder name is ``<sensor_id>_<timestamp>`` and is used
        to store the triggered signal and corresponding PSD png. The arguments
        are not used.
        """
        self.trigger_time = datetime.now()
        timestamp = self.trigger_time.strftime("%Y-%m-%d_%H-%M-%S")
        self.datetime_folder = '{}_{}'.format(self.config.sensor_id, timestamp)

    def create_new_filename(self, attr, new, old):
        """Build the user specified target path for the triggered signal.

        Gets called on change of value of the folder name text input widget
        and on :meth:`do_update`. A fresh UUID based ``wav`` filename is
        generated on every call. The arguments are not used.
        """
        target_folder = os.path.join(self.root, self.foldername.value)
        self.new_filename = os.path.join(target_folder, self.datetime_folder,
                                         '{}.{}'.format(uuid.uuid1(), 'wav'))
        self.save_status.text = self.template_2.format(
            status_text='{} {}'.format('Signal will be saved to', target_folder),
            w='410px', background='#eaeaea', colour='#000000')

    def delete_signal(self):
        """Delete the last triggered signal.

        Gets called on button click. Clears the triggered-signal plots and
        removes the directory that contains the triggered ``wav`` file.
        """
        self.trig_signal_source.data = dict(t=[], y=[])
        self.spectrum_source.data = dict(f=[], y=[])
        self.btn_wingbeats.disabled = False
        self.btn_save.disabled = True
        self.btn_delete.disabled = True

        old_directory = os.path.dirname(self.old_filename)
        try:
            shutil.rmtree(old_directory)
        except OSError as err:
            # only filesystem errors are expected here; anything else is a bug
            # and must not be hidden behind the status message
            self.save_status.text = self.template_2.format(status_text='File not found!',
                                                           w='410px', background='#DC524C',
                                                           colour='#FFFFFF')
            print(Fore.RED + 'Deleting signal failed: {}'.format(err) + Fore.WHITE)
        else:
            self.save_status.text = self.template_2.format(status_text='File deleted!',
                                                           w='410px', background='#DC524C',
                                                           colour='#FFFFFF')
            print(Fore.RED
                  + "Signal '{}' has been deleted by user!".format(os.path.basename(old_directory))
                  + Fore.WHITE)

    def rename_signal(self):
        """Move the triggered signal to the user specified path and write its metadata.

        Gets called on button click. Renames the file to the one created by
        :meth:`create_new_filename`, removes the old directory, starts the png
        creation and writes ``data.json`` next to the ``wav`` file.
        """
        path = os.path.dirname(self.new_filename)
        self.btn_wingbeats.disabled = False
        self.btn_save.disabled = True
        self.btn_delete.disabled = True

        try:
            os.makedirs(path, exist_ok=True)
            old_directory = os.path.dirname(self.old_filename)
            shutil.move(self.old_filename, self.new_filename)
            self.save_status.text = self.template_2.format(
                status_text='{} {}'.format('Saved to',
                                           os.path.join(self.root, self.foldername.value,
                                                        self.datetime_folder)),
                w='410px', background='#3EA639', colour='#FFFFFF')
            print(Fore.RED
                  + "Signal '{}' has been moved to /{} and file renamed to '{}'".format(
                      os.path.basename(old_directory),
                      os.path.basename(path),
                      os.path.basename(self.new_filename))
                  + Fore.WHITE)
            shutil.rmtree(old_directory)

            # UUID for png file
            png_filename = os.path.join(path, '{}.{}'.format(uuid.uuid1(), 'png'))
            # returns immediately; the png is rendered on the executor
            self.create_single_png(png_filename)

            self.data_dict_temp = self.autocomplete_tax.write_tax_to_data()
            # NOTE(review): trigger_time comes from a naive datetime.now(), so this
            # is local time although the trailing "Z" denotes UTC - confirm intent.
            self.data_dict_temp["date_time"] = self.trigger_time.strftime("%Y-%m-%dT%H:%M:%SZ")
            wingbeat = self.data_dict_temp["wingbeat"]
            wingbeat["filename"] = os.path.basename(self.new_filename)
            wingbeat["sample_rate"] = self.sampling_rate
            wingbeat["png_filename"] = os.path.basename(png_filename)
            wingbeat["classifications"] = self.data_dict_temp["main_classifications"]

            # measure meta data and write into json
            Metadata = WriteMetaData(self.config, self.data_dict_temp)
            Metadata.write_all_data()
            self.data_dict_temp = Metadata.data_json

            # save json in datetime path; explicit UTF-8 because ensure_ascii=False
            # emits non-ASCII characters verbatim
            json_path = os.path.join(self.root, self.foldername.value, self.datetime_folder,
                                     'data.json')
            with open(json_path, 'w', encoding='utf-8') as outfile:
                json.dump(self.data_dict_temp, outfile, indent=4, ensure_ascii=False)
        except Exception as err:
            # best effort: keep the GUI alive, but do not swallow
            # KeyboardInterrupt/SystemExit and do report the real cause
            self.save_status.text = self.template_2.format(status_text='File not found!',
                                                           w='410px', background='#DC524C',
                                                           colour='#FFFFFF')
            print(Fore.RED + 'Saving signal failed: {}'.format(err) + Fore.WHITE)

    @gen.coroutine
    @without_document_lock
    def create_single_png(self, png_filename):
        """Create a ``png`` of the triggered signal and its PSD.

        Gets called by :meth:`rename_signal`. The work is submitted to the
        executor so the visualization is not blocked.

        :param png_filename: path of the png file to create
        :type png_filename: str
        :return: future
        :rtype: object
        """
        # create a future so button updates don't get blocked
        yield self.executor.submit(create_png_from_signal, self.new_filename, png_filename,
                                   self.decimation_factor)