import numpy as np
import scipy.signal as sg
from bokeh.plotting import figure
from bokeh.models import Panel, Button, ColumnDataSource, Slider, Div, PrintfTickFormatter
from bokeh.models.widgets import TextInput
from bokeh.events import ButtonClick
from bokeh.layouts import layout, gridplot, column, row, Spacer
from bokeh.document import without_document_lock
from sensors.Utilities.signal_to_png import create_png_from_signal
from sensors.GUIclasses.taxonomy_autocomplete_widget import AutocompleteTaxonomy
from sensors.Utilities.write_meta_data import WriteMetaData
from colorama import Fore
from tornado import gen
import time
from datetime import datetime
import shutil
import os
import uuid
import json
class SingleSaveTab:
    """Class to create a Bokeh tab that displays a live wingbeat stream of the latest audio chunk and the last
    triggered signal and its PSD. The duration of the triggered signal can be indirectly specified with the
    `number_of_saved_chunks`, `chunk_size` and `sampling_rate` parameters in the :ref:`config.yaml` file. To trigger a
    signal the user can adjust a threshold and a gain. The signal ``wav`` file can be saved to a user specified path
    together with a png of the signal and its PSD.

    :param config: configuration including :ref:`config.yaml` dictionary and current Bokeh document
    :type config: object
    """

    # Divisor used to normalise the raw int16 samples of a triggered signal.
    _INT16_SCALE = 32768.0
    # Largest normalised magnitude a positive int16 sample can reach (32767 / 32768). Anything at or
    # above this level means the recording touched the int16 rails, i.e. the sensor is overdriven.
    _CLIP_LEVEL = 32767 / 32768.0
    # All status boxes below the text inputs share this width.
    _STATUS_WIDTH = '410px'

    def __init__(self, config):
        self.config = config
        self.curdoc = config.curdoc
        self.autocomplete_tax = AutocompleteTaxonomy(config)
        self.chunk_size = config.chunk_size
        self.sampling_rate = config.sampling_rate
        self.timeframe = config.timeframe
        self.number_of_saved_chunks = config.number_of_saved_chunks
        self.decimation_factor = config.decimation_factor
        # derived parameters
        self.max_freq_khz = config.sampling_rate/self.decimation_factor/2 * 0.001  # Nyquist after decimation
        self.timeslice = self.chunk_size/self.sampling_rate * 1000  # duration of one chunk [ms]
        self.multiples = int(np.ceil(self.timeframe/self.timeslice))
        self.template_1 = """<div class='content' style="background-color: {background};color: {colour};">
        <div class='info'> {status_text} </div> </div>"""
        self.template_2 = """<div class='content' style="background-color: {background};color: {colour};width: {w};">
        <div class='info'> {status_text} </div> </div>"""
        # expanduser('~') resolves to $HOME when it is set, but does not raise if it is missing
        self.root = os.path.expanduser('~')
        # State is initialised before the widgets are built so that every callback registered in
        # make_tab() finds the attributes it reads.
        self.executor = config.executor
        self.update_data = False
        self.trigger = False
        self.check_update = None  # filename of the last triggered signal that has been displayed
        self.datetime_folder = ""
        self.old_filename = None  # location of the triggered wav as delivered to signal_update
        self.new_filename = None  # user specified destination of the wav
        self.trigger_time = None  # datetime of the last trigger
        # call tab method to create tab
        self._tab = self.make_tab()

    @property
    def tab(self):
        """Returns the Bokeh panel that was created once by :meth:`make_tab` during initialization.
        """
        return self._tab

    def make_tab(self):
        """ Creates and arranges the elements of the corresponding Bokeh tab.

        :return: Bokeh panel for the visualization of a single triggered signal and customization of file location
        :rtype: Bokeh object
        """
        self.btn_wingbeats = Button(label='Start Stream', width=310)
        self.btn_wingbeats.on_event(ButtonClick, self.do_update)

        self.btn_save = Button(label='Save Signal', width=200, button_type="success")
        self.btn_save.disabled = True
        self.btn_save.on_event(ButtonClick, self.rename_signal)

        self.btn_delete = Button(label='Delete Signal', width=200, button_type="danger")
        self.btn_delete.disabled = True
        self.btn_delete.on_event(ButtonClick, self.delete_signal)

        self.foldername = TextInput(title='Folder name', width=200, value='choose_folder')
        self.foldername.on_change('value', self.create_new_filename)

        self.text_status = Div(text=self.template_1.format(status_text='Wingbeat stream inactive',
                                                           background='#DC524C', colour='#FFFFFF'),
                               render_as_text=False)
        self.save_status = Div(text=self.template_2.format(
            status_text='{} {}'.format('Signals will be saved to',
                                       os.path.join(self.root, self.foldername.value)),
            w=self._STATUS_WIDTH, background='#eaeaea', colour='#000000'), render_as_text=False)
        self.overdrive_status = Div(text=self.template_2.format(
            status_text='Wingbeat Overdrive Status', w=self._STATUS_WIDTH,
            background='#eaeaea', colour='#000000'), render_as_text=False)

        plot_args = dict(tools="", toolbar_location=None, outline_line_color='#595959')
        trig_duration = self.number_of_saved_chunks*self.timeslice

        # live view of the latest chunk
        self.signal_source = ColumnDataSource(data=dict(t=[], y=[]))
        self.signal_plot = figure(plot_width=600, plot_height=200, title="Chunk signal",
                                  x_range=[0, self.timeslice], y_range=[-1, 1],
                                  x_axis_label='discrete time [ms]',
                                  y_axis_label='gain*normalized input [ ]', **plot_args)
        self.signal_plot.background_fill_color = "#eaeaea"
        self.signal_plot.line(x="t", y="y", line_color="#024768", source=self.signal_source)

        # last triggered signal
        self.trig_signal_source = ColumnDataSource(data=dict(t=[], y=[]))
        self.trig_signal_plot = figure(plot_width=600, plot_height=250, title="Triggered signal",
                                       x_range=[0, trig_duration], y_range=[-1, 1],
                                       x_axis_label='discrete time [ms]',
                                       y_axis_label='normalized input [ ]', **plot_args)
        self.trig_signal_plot.yaxis[0].formatter = PrintfTickFormatter(format="%.0e")
        self.trig_signal_plot.background_fill_color = "#eaeaea"
        self.trig_signal_plot.line(x="t", y="y", line_color="#024768", source=self.trig_signal_source)
        # red frame at +-1 marking the overdrive limits of the triggered signal
        overdrive_source = ColumnDataSource(data=dict(t=[0, trig_duration + 5, trig_duration + 5, 0],
                                                      y=[1.0, 1.0, -1.0, -1.0]))
        self.trig_signal_plot.line(x="t", y="y", line_color="#DC524C", line_width=3, source=overdrive_source)

        # PSD of the last triggered signal
        self.spectrum_source = ColumnDataSource(data=dict(f=[], y=[]))
        self.spectrum_plot = figure(plot_width=600, plot_height=250, title="PSD of triggered signal",
                                    y_range=[-130, -90], x_range=[0, self.max_freq_khz],
                                    x_axis_label='discrete frequency [kHz]', y_axis_label='[dB]', **plot_args)
        self.spectrum_plot.background_fill_color = "#eaeaea"
        self.spectrum_plot.line(x="f", y="y", line_color="#024768", source=self.spectrum_source)

        # sliders for gain and threshold
        self.gain = Slider(start=1, end=300, value=200, step=10, title="Gain", default_size=200)
        self.threshold = Slider(start=0, end=1, value=0.5, step=0.025, title="Threshold", bar_color="#DC524C",
                                orientation='vertical', css_classes=["custom-slider2"], direction='rtl',
                                default_size=148)
        self.threshold.on_change('value', self.update_threshold)
        # red frame at +-threshold drawn into the chunk plot
        self.threshold_source = ColumnDataSource(data=dict(t=[0, self.timeslice + 5, self.timeslice + 5, 0],
                                                           y=[self.threshold.value, self.threshold.value,
                                                              -self.threshold.value, -self.threshold.value]))
        self.signal_plot.line(x="t", y="y", line_color="#DC524C", source=self.threshold_source)

        # define tab layout
        widgets_chunk = column(row(self.btn_wingbeats, self.text_status), row(Spacer(width=47), self.gain),
                               row(self.signal_plot, Spacer(width=10), column(Spacer(height=5), self.threshold)))
        _layout = layout(
            widgets_chunk,
            row(column(self.trig_signal_plot, self.spectrum_plot),
                column(Spacer(height=24), self.autocomplete_tax.widget, self.foldername, self.save_status,
                       row(self.btn_save, self.btn_delete), self.overdrive_status), spacing=15)
        )
        return Panel(child=_layout, title='WB - Single Save')

    @gen.coroutine
    def signal_update(self, data):
        """ Updates the chunk and the triggered signal. Targeted by next tick callback from \
        :meth:`sensors.Utilities.pyaudiohandler.Pyaudiohandler.update_audio_data`. A PSD of the \
        triggered signal gets computed and plotted in the corresponding figure.

        :param data: resampled chunk signal, PSD of chunk signal, resampled ringbuffer, raw triggered signal and \
        filename provided by :meth:`Pyaudiohandler.update_audio_data`
        :type data: 1D float array, 1D float array, 1D float array, 1D int16 array and string
        """
        if not self.update_data or data['values'] is None:
            return
        chunk_signal, _, _, triggered_signal, filename = data['values']

        # a triggered signal is only processed once, identified by its filename
        if triggered_signal is not None and self.check_update != filename:
            self.check_update = filename
            self.old_filename = filename
            self.create_new_datetime('value', '', '')
            self._show_triggered_signal(triggered_signal)
            # stop stream after single triggered save
            self.do_update()

        # small optimization: avoid computing and sending all the x-values if the length has not changed
        scaled_chunk = chunk_signal*self.gain.value
        if len(chunk_signal) == len(self.signal_source.data['y']):
            self.signal_source.data['y'] = scaled_chunk
        else:
            t = np.linspace(0, self.timeslice, len(chunk_signal))
            self.signal_source.data = dict(t=t, y=scaled_chunk)

    def _show_triggered_signal(self, triggered_signal):
        """Plots the triggered signal and its PSD and updates the overdrive status box.

        :param triggered_signal: raw triggered signal
        :type triggered_signal: 1D int16 array
        """
        normalized = triggered_signal / self._INT16_SCALE
        resampled = sg.resample_poly(normalized, 1, self.decimation_factor, padtype='edge')
        peak = np.max(np.abs(normalized))

        # Warning if sensor overdrives. The comparison uses the positive int16 rail: 32767/32768 is
        # below 1.0, so a test against 1.0 would only ever detect clipping on the negative side.
        if peak >= self._CLIP_LEVEL:
            self.overdrive_status.text = self.template_2.format(
                status_text='Wingbeat Overdrive Status: OVERDRIVE!', colour='#FFFFFF',
                w=self._STATUS_WIDTH, background='#DC524C')
        else:
            self.overdrive_status.text = self.template_2.format(
                status_text='Wingbeat Overdrive Status: Ok!', colour='#FFFFFF',
                w=self._STATUS_WIDTH, background='#3EA639')

        # scale the y axis of the triggered signal plot to the peak of the signal
        self.trig_signal_plot.y_range.start = -peak
        self.trig_signal_plot.y_range.end = peak

        # sg.welch returns (frequencies, Pxx); only the power values are needed
        psd = 10*np.log10(sg.welch(resampled, fs=self.sampling_rate/self.decimation_factor,
                                   window='hann', nperseg=512, noverlap=256+128)[1])
        self.spectrum_plot.y_range.start = np.floor(psd.min())
        self.spectrum_plot.y_range.end = np.ceil(psd.max())

        t = np.linspace(0, self.number_of_saved_chunks*self.timeslice, len(resampled))
        self.trig_signal_source.data = dict(t=t, y=resampled)
        f = np.linspace(0, self.max_freq_khz, len(psd))
        self.spectrum_source.data = dict(f=f, y=psd)

    def do_update(self):
        """ Gets called on button click and by :meth:`signal_update` after a signal has been triggered.
        Starts respectively ends the preview of chunk signal and changes the text status corresponding
        to the button label. Triggered signal only gets saved when preview is active.
        """
        # refresh the destination path; after a trigger this picks up the new datetime folder
        self.create_new_filename('value', '', '')
        if self.btn_wingbeats.label == 'Start Stream':
            self.btn_wingbeats.label = 'Stop Stream'
            self.text_status.text = self.template_1.format(status_text='Wingbeat stream active', colour='#FFFFFF',
                                                           background='#3EA639')
            self.check_update = None
            self.btn_save.disabled = True
            self.btn_delete.disabled = True
            self.update_data = True
            self.trigger = True
            self.datetime_folder = ""
        else:
            self.btn_wingbeats.label = 'Start Stream'
            self.text_status.text = self.template_1.format(status_text='Wingbeat stream inactive', colour='#FFFFFF',
                                                           background='#DC524C')
            if self.check_update is not None:
                # a signal is pending: the stream stays locked until the user saves or deletes it
                self.btn_wingbeats.disabled = True
                self.btn_save.disabled = False
                self.btn_delete.disabled = False
            self.update_data = False
            self.trigger = False

    def update_threshold(self, attr, old, new):
        """Gets called on change of value of threshold slider and updates the threshold line in the chunk signal figure.
        """
        level = self.threshold.value
        self.threshold_source.data['y'] = [level, level, -level, -level]

    def create_new_datetime(self, attr, new, old):
        """Gets called on :meth:`signal_update` after a new signal has been triggered.
        Stores the trigger time and derives the name of the datetime folder (``<sensor_id>_<timestamp>``) in which
        the triggered signal and corresponding PSD png get stored. The arguments are not used.
        """
        self.trigger_time = datetime.now()
        timestamp = self.trigger_time.strftime("%Y-%m-%d_%H-%M-%S")
        self.datetime_folder = '{}_{}'.format(self.config.sensor_id, timestamp)

    def create_new_filename(self, attr, new, old):
        """Gets called on change of value of customized folder name text input widget and on :meth:`do_update`.
        Creates a user specified path with a fresh UUID file name to save the triggered signal and shows the
        target folder in the save status box. The arguments are not used; the folder name is read from the widget.
        """
        target_folder = os.path.join(self.root, self.foldername.value)
        self.new_filename = os.path.join(target_folder, self.datetime_folder,
                                         '{}.{}'.format(uuid.uuid1(), 'wav'))
        self.save_status.text = self.template_2.format(
            status_text='{} {}'.format('Signal will be saved to', target_folder),
            w=self._STATUS_WIDTH, background='#eaeaea', colour='#000000')

    def delete_signal(self):
        """Gets called on button click. Deletes the last triggered signal together with its directory, clears
        the triggered signal and PSD plots and unlocks the stream button.
        """
        self.trig_signal_source.data = dict(t=[], y=[])
        self.spectrum_source.data = dict(f=[], y=[])
        self.btn_wingbeats.disabled = False
        self.btn_save.disabled = True
        self.btn_delete.disabled = True
        old_directory = os.path.dirname(self.old_filename)
        try:
            shutil.rmtree(old_directory)
        except OSError as err:
            self.save_status.text = self.template_2.format(status_text='File not found!', w=self._STATUS_WIDTH,
                                                           background='#DC524C', colour='#FFFFFF')
            print(Fore.RED + 'Deleting signal failed: {}'.format(err) + Fore.WHITE)
        else:
            self.save_status.text = self.template_2.format(status_text='File deleted!', w=self._STATUS_WIDTH,
                                                           background='#DC524C', colour='#FFFFFF')
            print(Fore.RED + 'Signal \'' + os.path.basename(old_directory) +
                  '\' has been deleted by user!' + Fore.WHITE)

    def rename_signal(self):
        """Gets called on button click. Moves the triggered ``wav`` file to the path created by
        :meth:`create_new_filename`, removes its old directory, starts the creation of the png and writes the
        meta data to ``data.json`` next to the moved file. Any failure is reported in the save status box and
        on the console instead of being raised.
        """
        target_dir = os.path.dirname(self.new_filename)
        self.btn_wingbeats.disabled = False
        self.btn_save.disabled = True
        self.btn_delete.disabled = True
        try:
            os.makedirs(target_dir, exist_ok=True)
            old_directory = os.path.dirname(self.old_filename)
            shutil.move(self.old_filename, self.new_filename)
            self.save_status.text = self.template_2.format(
                status_text='{} {}'.format('Saved to', os.path.join(self.root, self.foldername.value,
                                                                    self.datetime_folder)),
                w=self._STATUS_WIDTH, background='#3EA639', colour='#FFFFFF')
            print(Fore.RED + 'Signal \'' + os.path.basename(old_directory) + '\' has been moved to /'
                  + os.path.basename(target_dir) + ' and file renamed to \''
                  + os.path.basename(self.new_filename) + '\'' + Fore.WHITE)
            shutil.rmtree(old_directory)
            # UUID for png file
            png_filename = os.path.join(target_dir, '{}.{}'.format(uuid.uuid1(), 'png'))
            self.create_single_png(png_filename)
            self._write_meta_data(target_dir, png_filename)
        except Exception as err:
            # Best effort: a failed save must not take the GUI down, but the cause is shown on the console.
            self.save_status.text = self.template_2.format(status_text='File not found!', w=self._STATUS_WIDTH,
                                                           background='#DC524C', colour='#FFFFFF')
            print(Fore.RED + 'Saving signal failed: {!r}'.format(err) + Fore.WHITE)

    def _write_meta_data(self, target_dir, png_filename):
        """Collects taxonomy and wingbeat meta data and dumps them to ``data.json`` in ``target_dir``.

        :param target_dir: directory that holds the saved ``wav`` file
        :type target_dir: str
        :param png_filename: path of the png belonging to the saved signal
        :type png_filename: str
        """
        self.data_dict_temp = self.autocomplete_tax.write_tax_to_data()
        # NOTE(review): trigger_time comes from a naive datetime.now() (local time) while the format
        # string appends 'Z' (UTC designator) - confirm whether the sensors run on UTC.
        self.data_dict_temp["date_time"] = self.trigger_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        wingbeat = self.data_dict_temp["wingbeat"]
        wingbeat["filename"] = os.path.basename(self.new_filename)
        wingbeat["sample_rate"] = self.sampling_rate
        wingbeat["png_filename"] = os.path.basename(png_filename)
        wingbeat["classifications"] = self.data_dict_temp["main_classifications"]
        # measure meta data and write into json
        metadata = WriteMetaData(self.config, self.data_dict_temp)
        metadata.write_all_data()
        self.data_dict_temp = metadata.data_json
        # save json in datetime path
        with open(os.path.join(target_dir, 'data.json'), 'w') as outfile:
            json.dump(self.data_dict_temp, outfile, indent=4, ensure_ascii=False)

    @gen.coroutine
    @without_document_lock
    def create_single_png(self, png_filename):
        """Gets called by :meth:`rename_signal` and creates a ``png`` of the triggered signal and its PSD
        at user specified path without blocking the visualization.

        :param png_filename: path of the png file to create
        :type png_filename: str
        :return: future
        :rtype: object
        """
        # run in the executor so button updates don't get blocked
        yield self.executor.submit(create_png_from_signal, self.new_filename, png_filename, self.decimation_factor)