import numpy as np
import scipy.signal as sg
from bokeh.plotting import figure
from bokeh.models import Panel, Button, ColumnDataSource, Slider, Div, PrintfTickFormatter
from bokeh.events import ButtonClick
from bokeh.layouts import layout, gridplot, column, row, Spacer
from bokeh.document import without_document_lock
from sensors.Utilities.signal_to_png import create_png_from_signal
from sensors.GUIclasses.taxonomy_autocomplete_widget import AutocompleteTaxonomy
from sensors.Utilities.write_meta_data import WriteMetaData
import os
import shutil
from tornado import gen
from datetime import datetime
from colorama import Fore
import uuid
import json
[docs]class TriggeredSaveTab:
"""Class to create a Bokeh tab that displays a life wingbeat stream of the latest audio chunk, the last four
triggered signals and their PSD. The duration of the triggered signal can be indirectly specified with the
`number_of_saved_chunks`, `chunk_size` and `sampling_rate` parameters in the :ref:`config.yaml` file.
To trigger a signal the user can adjust a threshold and a gain.
:param config: configuration including :ref:`config.yaml` dictionary and current Bokeh document
:type config: object
"""
def __init__(self, config):
"""config - configuration object
"""
self.config = config
self.autocomplete_tax = AutocompleteTaxonomy(config)
self.chunk_size = config.chunk_size
self.sampling_rate = config.sampling_rate
self.timeframe = config.timeframe
self.number_of_saved_chunks = config.number_of_saved_chunks
self.decimation_factor = config.decimation_factor
# derived parameters
self.max_freq_khz = config.sampling_rate/self.decimation_factor/2 * 0.001
self.timeslice = self.chunk_size/self.sampling_rate * 1000
self.multiples = int(np.ceil(self.timeframe/self.timeslice))
self.template = ("""<div class='content' style="background-color: {background};color: {colour};">
<div class='info'> {status_text} </div> </div>""")
# call tab method to create tab
self._tab = self.make_tab()
# create plot list to iterate through
self.source_list = (self.trig_signal_source_1, self.trig_signal_source_2,
self.trig_signal_source_3, self.trig_signal_source_4,
self.spectrum_source_1, self.spectrum_source_2,
self.spectrum_source_3, self.spectrum_source_4)
self.plot_list = (self.trig_signal_plot_1, self.trig_signal_plot_2,
self.trig_signal_plot_3, self.trig_signal_plot_4,
self.spectrum_plot_1, self.spectrum_plot_2,
self.spectrum_plot_3, self.spectrum_plot_4)
self.executor = config.executor
self.plot_counter = 0
self.update_data = False
self.trigger = False
self.check_update = None
@property
def tab(self):
"""Calls method :meth:`make_tab`.
"""
return self._tab
[docs] def make_tab(self):
""" Creates and arranges the elements of the corresponding Bokeh tab.
:return: Bokeh panel for the visualization of four triggered signals
:rtype: Bokeh object
"""
self.btn_wingbeats = Button(label='Start Stream', width=310)
self.btn_wingbeats.on_event(ButtonClick, self.do_update)
self.text_status = Div(text=self.template.format(status_text='Wingbeat stream inactive',
background='#DC524C', colour='#FFFFFF'), render_as_text=False)
PLOTARGS = dict(tools="", toolbar_location=None, outline_line_color='#595959')
self.signal_source = ColumnDataSource(data=dict(t=[], y=[]))
self.signal_plot = figure(plot_width=600, plot_height=200, title="Chunk signal",
x_range=[0, self.timeslice], y_range=[-1, 1],
x_axis_label='discrete time [ms]',
y_axis_label='gain*normalized input [ ]', **PLOTARGS)
self.signal_plot.background_fill_color = "#eaeaea"
self.signal_plot.line(x="t", y="y", line_color="#024768", source=self.signal_source)
self.trig_signal_source_1 = ColumnDataSource(data=dict(t=[], y=[]))
self.trig_signal_plot_1 = figure(plot_width=400, plot_height=200, title="Triggered signal 1",
x_range=[0, self.number_of_saved_chunks*self.timeslice], y_range=[-1, 1],
x_axis_label='discrete time [ms]',
y_axis_label='normalized input [ ]', **PLOTARGS)
self.trig_signal_plot_1.yaxis[0].formatter = PrintfTickFormatter(format="%.0e")
self.trig_signal_plot_1.background_fill_color = "#eaeaea"
self.trig_signal_plot_1.line(x="t", y="y", line_color="#024768", source=self.trig_signal_source_1)
self.spectrum_source_1 = ColumnDataSource(data=dict(f=[], y=[]))
self.spectrum_plot_1 = figure(plot_width=300, plot_height=200, title="PSD of triggered signal 1",
y_range=[-130, -80], x_range=[0, self.max_freq_khz],
x_axis_label='discrete frequency [kHz]', y_axis_label='[dB]', **PLOTARGS)
self.spectrum_plot_1.background_fill_color = "#eaeaea"
self.spectrum_plot_1.line(x="f", y="y", line_color="#024768", source=self.spectrum_source_1)
self.trig_signal_source_2 = ColumnDataSource(data=dict(t=[], y=[]))
self.trig_signal_plot_2 = figure(plot_width=400, plot_height=200, title="Triggered signal 2",
x_range=[0, self.number_of_saved_chunks*self.timeslice], y_range=[-1, 1],
x_axis_label='discrete time [ms]',
y_axis_label='normalized input [ ]', **PLOTARGS)
self.trig_signal_plot_2.yaxis[0].formatter = PrintfTickFormatter(format="%.0e")
self.trig_signal_plot_2.background_fill_color = "#eaeaea"
self.trig_signal_plot_2.line(x="t", y="y", line_color="#024768", source=self.trig_signal_source_2)
self.spectrum_source_2 = ColumnDataSource(data=dict(f=[], y=[]))
self.spectrum_plot_2 = figure(plot_width=300, plot_height=200, title="PSD of triggered signal 2",
y_range=[-130, -80], x_range=[0, self.max_freq_khz],
x_axis_label='discrete frequency [kHz]', y_axis_label='[dB]', **PLOTARGS)
self.spectrum_plot_2.background_fill_color = "#eaeaea"
self.spectrum_plot_2.line(x="f", y="y", line_color="#024768", source=self.spectrum_source_2)
self.trig_signal_source_3 = ColumnDataSource(data=dict(t=[], y=[]))
self.trig_signal_plot_3 = figure(plot_width=400, plot_height=200, title="Triggered signal 3",
x_range=[0, self.number_of_saved_chunks*self.timeslice], y_range=[-1, 1],
x_axis_label='discrete time [ms]',
y_axis_label='normalized input [ ]', **PLOTARGS)
self.trig_signal_plot_3.yaxis[0].formatter = PrintfTickFormatter(format="%.0e")
self.trig_signal_plot_3.background_fill_color = "#eaeaea"
self.trig_signal_plot_3.line(x="t", y="y", line_color="#024768", source=self.trig_signal_source_3)
self.spectrum_source_3 = ColumnDataSource(data=dict(f=[], y=[]))
self.spectrum_plot_3 = figure(plot_width=300, plot_height=200, title="PSD of triggered signal 3",
y_range=[-130, -80], x_range=[0, self.max_freq_khz],
x_axis_label='discrete frequency [kHz]', y_axis_label='[dB]', **PLOTARGS)
self.spectrum_plot_3.background_fill_color = "#eaeaea"
self.spectrum_plot_3.line(x="f", y="y", line_color="#024768", source=self.spectrum_source_3)
self.trig_signal_source_4 = ColumnDataSource(data=dict(t=[], y=[]))
self.trig_signal_plot_4 = figure(plot_width=400, plot_height=200, title="Triggered signal 4",
x_range=[0, self.number_of_saved_chunks*self.timeslice], y_range=[-1, 1],
x_axis_label='discrete time [ms]',
y_axis_label='normalized input [ ]', **PLOTARGS)
self.trig_signal_plot_4.yaxis[0].formatter = PrintfTickFormatter(format="%.0e")
self.trig_signal_plot_4.background_fill_color = "#eaeaea"
self.trig_signal_plot_4.line(x="t", y="y", line_color="#024768", source=self.trig_signal_source_4)
self.spectrum_source_4 = ColumnDataSource(data=dict(f=[], y=[]))
self.spectrum_plot_4 = figure(plot_width=300, plot_height=200, title="PSD of triggered signal 4",
y_range=[-130, -80], x_range=[0, self.max_freq_khz],
x_axis_label='discrete frequency [kHz]', y_axis_label='[dB]', **PLOTARGS)
self.spectrum_plot_4.background_fill_color = "#eaeaea"
self.spectrum_plot_4.line(x="f", y="y", line_color="#024768", source=self.spectrum_source_4)
# sliders for gain and threshold
self.gain = Slider(start=1, end=300, value=200, step=10, title="Gain", default_size=200)
self.threshold = Slider(start=0, end=1, value=0.5, step=0.025, title="Threshold", bar_color="#DC524C",
orientation='vertical', css_classes=["custom-slider2"], direction='rtl',
default_size=148)
self.threshold.on_change('value', self.update_threshold)
self.threshold_source = ColumnDataSource(data=dict(t=[0, self.timeslice + 5, self.timeslice + 5, 0],
y=[self.threshold.value, self.threshold.value,
-self.threshold.value, -self.threshold.value]))
self.signal_plot.line(x="t", y="y", line_color="#DC524C", source=self.threshold_source)
# define tab layout
grid = gridplot([self.trig_signal_plot_1, self.spectrum_plot_1, self.trig_signal_plot_2,
self.spectrum_plot_2, self.trig_signal_plot_3, self.spectrum_plot_3,
self.trig_signal_plot_4, self.spectrum_plot_4], ncols=4, merge_tools=False)
widgets_chunk = column(row(self.btn_wingbeats, self.text_status), row(Spacer(width=47), self.gain),
row(self.signal_plot, Spacer(width=10), column(Spacer(height=5), self.threshold)))
# added because vertical orientation broken atm
style = Div(text="""<style> """)
_layout = layout(row(widgets_chunk, Spacer(width=47), column(Spacer(height=48), self.autocomplete_tax.widget)),
grid,)
return Panel(child=_layout, title='WB - Triggered Save')
[docs] @gen.coroutine
def signal_update(self, data):
""" Updates the chunk and the triggered signal figures. Targeted by next tick callback from \
:meth:`sensors.Utilities.pyaudiohandler.Pyaudiohandler.update_audio_data`. The PSD of the triggered signals \
get computed and plotted in the corresponding figures.
:param data: resampled chunk signal, PSD of chunk signal, resampled ringbuffer, raw triggered signal and \
filename provided by :meth:`Pyaudiohandler.update_audio_data`
:type data: 1D float array, 1D float array, 1D float array, 1D int16 array and string
"""
if self.update_data:
if data['values'] is None:
return
chunk_signal, _, _, triggered_signal, filename = data['values']
# iterate through triggered signal plots
if triggered_signal is not None and not self.check_update == filename:
self.check_update = filename
triggered_signal_resamp = sg.resample_poly(triggered_signal / 32768.0, 1, self.decimation_factor,
padtype='edge')
y_range = max(abs(triggered_signal / 32768.0))
# warning if sensor overdrives
if y_range >= 1.0:
print('Wingbeat Overdrive Status: OVERDRIVE!')
PSD = 10*np.log10(sg.welch(triggered_signal_resamp, fs=self.sampling_rate/self.decimation_factor,
window='hann', nperseg=512, noverlap=256+128)[1])
trig_plot = self.plot_list[self.plot_counter]
trig_source = self.source_list[self.plot_counter]
trig_plot.y_range.start = -y_range
trig_plot.y_range.end = y_range
t = np.linspace(0, self.number_of_saved_chunks*self.timeslice, len(triggered_signal_resamp))
trig_source.data = dict(t=t, y=triggered_signal_resamp)
spectrum_plot = self.plot_list[self.plot_counter + 4]
spectrum_source = self.source_list[self.plot_counter + 4]
spectrum_plot.y_range.start = np.floor(min(PSD))
spectrum_plot.y_range.end = np.ceil(max(PSD))
f = np.linspace(0, self.max_freq_khz, len(PSD))
spectrum_source.data = dict(f=f, y=PSD)
self.plot_counter += 1
if self.plot_counter == 4:
self.plot_counter = 0
self.rename_signal()
# the if-else below are small optimization: avoid computing and sending
# all the x-values, if the length has not changed
if len(chunk_signal) == len(self.signal_source.data['y']):
self.signal_source.data['y'] = chunk_signal*self.gain.value
else:
t = np.linspace(0, self.timeslice, len(chunk_signal))
self.signal_source.data = dict(t=t, y=chunk_signal*self.gain.value)
[docs] def do_update(self):
""" Gets called on button click. Starts respectively ends the preview of chunk signal and
changes the text status corresponding to the button label. Triggered signals only get saved
when preview is active.
"""
if self.btn_wingbeats.label == 'Start Stream':
self.btn_wingbeats.label = 'Stop Stream'
self.text_status.text = self.template.format(status_text='Wingbeat stream active', colour='#FFFFFF',
background='#3EA639')
self.update_data = True
self.trigger = True
else:
self.btn_wingbeats.label = 'Start Stream'
self.text_status.text = self.template.format(status_text='Wingbeat stream inactive', colour='#FFFFFF',
background='#DC524C')
self.update_data = False
self.trigger = False
[docs] def update_threshold(self, attr, old, new):
"""Gets called on change of value of threshold slider and updates the threshold line in the chunk signal figure.
"""
self.threshold_source.data['y'] = [self.threshold.value, self.threshold.value,
-self.threshold.value, -self.threshold.value]
[docs] def rename_signal(self):
"""Gets called by :meth:`signal_update` when save of wingbeat was triggered.
"""
trigger_time = datetime.now()
timestamp = trigger_time.strftime("%Y-%m-%d_%H-%M-%S")
datetime_folder = '{}_{}'.format(self.config.sensor_id, timestamp)
new_path = os.path.join(os.path.dirname(os.path.dirname(self.check_update)), datetime_folder)
self.new_filename = os.path.join(new_path, '{}.{}'.format(uuid.uuid1(), 'wav'))
if not os.path.exists(new_path):
os.makedirs(new_path)
old_directory = os.path.dirname(self.check_update)
shutil.move(self.check_update, self.new_filename)
print(Fore.RED + 'Signal \'' + self.check_update.split('/')[-2] + '\' has been moved to /'
+ self.new_filename.split('/')[-2] + ' and file renamed to \'' + self.new_filename.split('/')[-1]
+ '\'' + Fore.WHITE)
shutil.rmtree(old_directory)
# UUID for png file
png_filename = os.path.join(new_path, '{}.{}'.format(uuid.uuid1(), 'png'))
self.create_single_png(png_filename)
self.data_dict_temp = self.autocomplete_tax.write_tax_to_data()
self.data_dict_temp["date_time"] = trigger_time.strftime("%Y-%m-%dT%H:%M:%SZ")
self.data_dict_temp["wingbeat"]["filename"] = os.path.basename(self.new_filename)
self.data_dict_temp["wingbeat"]["sample_rate"] = self.sampling_rate
self.data_dict_temp["wingbeat"]["png_filename"] = os.path.basename(png_filename)
self.data_dict_temp["wingbeat"]["classifications"] = self.data_dict_temp["main_classifications"]
# measure meta data and write into json
Metadata = WriteMetaData(self.config, self.data_dict_temp)
Metadata.write_all_data()
self.data_dict_temp = Metadata.data_json
# save json in datetime path
with open(os.path.join(new_path, 'data.json'), 'w') as outfile:
json.dump(self.data_dict_temp, outfile, indent=4, ensure_ascii=False)
outfile.close()
[docs] @gen.coroutine
@without_document_lock
def create_single_png(self, png_filename):
"""Gets called by :meth:`rename_signal` and creates a ``png`` of the triggered signal and its PSD
at user specified path without blocking the visualization.
:return: future
:rtype: object
"""
# create a future so button updates don't get blocked
yield self.executor.submit(create_png_from_signal, self.new_filename, png_filename, self.decimation_factor)