from bokeh.plotting import figure
from bokeh.models import Panel, Button, Div, ColumnDataSource, Slider, Spacer, LogColorMapper, PrintfTickFormatter
from bokeh.palettes import Blues256
from bokeh.events import ButtonClick
from bokeh.layouts import layout, column, row, gridplot
from bokeh.document import without_document_lock
from sensors.Utilities.signal_to_png import create_png_from_signal
from sensors.GUIclasses.taxonomy_autocomplete_widget import AutocompleteTaxonomy
import scipy.signal as sg
from scipy.io import wavfile
import numpy as np
import pandas as pd
import os
import tkinter as tk
from tkinter import filedialog
import json
from collections import OrderedDict
from colorama import Fore
from time import sleep
from tornado import gen
[docs]class DataViewerTab:
"""Class to create a Bokeh tab that enables the display of measured images and wingbeat signals. Additional
ViewerTabs can be created on button click that calls :meth:`sensors.main.add_viewer_tab`.
:param config: configuration including :ref:`config.yaml` dictionary and current Bokeh document
:type config: object
"""
def __init__(self, config):
    """Store the configuration-derived state, probe for a usable display and build the tab.

    :param config: configuration object; this constructor reads ``curdoc``, ``decimation_factor``,
        ``sampling_rate`` and ``executor`` from it
    :type config: object
    """
    self.config = config
    self.curdoc = config.curdoc
    self.autocomplete_tax = AutocompleteTaxonomy(config)
    # content of the data.json that was loaded last; stays None until a file has been loaded
    self.data = None
    self.viewer_counter = config.curdoc.session_context.request.viewer_counter
    # HTML snippets that are filled via str.format before being assigned to Div widgets
    self.template_status = """<div class='content' style="width: {w}; background-color:
{background};color: {colour};">
<div class='info'> {status_text} </div> </div>"""
    self.template_png = """<div class='image_content'>"""
    self.template_alert = """<div class='warning'><div class='info'> {status_text} </div> </div>"""
    self.decimation_factor = config.decimation_factor
    # upper frequency limit of the plots: half of the decimated sampling rate, truncated to whole kHz
    self.max_freq_khz = int(self.config.sampling_rate/self.decimation_factor/2/1000)
    # signal to png variables
    self.executor = config.executor
    # suppress opening of second tkinter window
    try:
        root = tk.Tk()
        root.withdraw()
        self.x11_needed = False
    except tk.TclError:
        # Tk signals a missing/unreachable display with TclError; anything else (e.g. KeyboardInterrupt)
        # must not be mistaken for a display problem and is therefore allowed to propagate.
        print(Fore.RED + 'Enable X11 forwarding on your remote system!', Fore.WHITE)
        self.x11_needed = True
    # call tab method to create tab
    self._tab = self.make_tab()
@property
def tab(self):
    """Return the Bokeh panel that was stored in ``self._tab`` by the constructor.

    The panel is not rebuilt on access; :meth:`make_tab` is called once from ``__init__``.
    """
    return self._tab
def make_tab(self):
    """Build all widgets and figures of the tab and wrap the resulting layout in a Bokeh panel.

    If the display probe in the constructor failed (``self.x11_needed``), the tab only contains an alert
    text instead of the viewer elements.

    :return: Bokeh panel for the visualization of measured data
    :rtype: Bokeh object
    """
    if self.x11_needed:
        X11_warning = 'Enable X11 forwarding on your remote system!<br/>Data Viewer not available!'
        alert_html = self.template_alert.format(status_text=X11_warning, w='620px')
        self.text_alert = Div(text=alert_html)
        _layout = layout(self.text_alert)
    else:
        # status texts start in a neutral grey look until a file has been opened
        neutral_look = dict(background='#eaeaea', colour='#000000', w='320px')
        self.text_status_img = Div(
            text=self.template_status.format(status_text='Open an image file', **neutral_look))
        self.text_status_sig = Div(
            text=self.template_status.format(status_text='Open a signal file', **neutral_look))

        # container that later receives the <img> tag
        self.image_div = Div(text=self.template_png)

        # buttons and their callbacks
        self.btn_load_img = Button(label='Load Image', width=310)
        self.btn_load_img.on_event(ButtonClick, self.load_img)

        self.btn_load_sig = Button(label='Load Wingbeat Signal', width=310)
        self.btn_load_sig.on_event(ButtonClick, self.load_signal)

        self.btn_change_class = Button(label='Change Classification', width=310, button_type="danger")
        self.btn_change_class.on_event(ButtonClick, self.change_classification)
        self.btn_change_class.disabled = True
        for tax_widget in (self.autocomplete_tax.confidence, self.autocomplete_tax.auto_comp):
            tax_widget.on_change('value', self.enable_disable_change_class_button)

        # settings shared by the three figures
        figure_kwargs = dict(tools="", toolbar_location=None, outline_line_color='#595959')
        fill_colour = "#eaeaea"
        line_colour = "#024768"

        # time signal
        self.signal_source = ColumnDataSource(data=dict(t=[], y=[]))
        self.signal_plot = figure(plot_width=710, plot_height=200,
                                  x_range=[0, 500], y_range=[-1, 1],
                                  x_axis_label='discrete time [ms]',
                                  y_axis_label='normalized input [ ]',
                                  **figure_kwargs)
        self.signal_plot.background_fill_color = fill_colour
        self.signal_plot.yaxis[0].formatter = PrintfTickFormatter(format="%.0e")
        self.signal_plot.line(x="t", y="y", line_color=line_colour, source=self.signal_source)

        # power spectral density
        self.PSD_source = ColumnDataSource(data=dict(f=[], y=[]))
        self.PSD_plot = figure(plot_width=710, plot_height=220, title="PSD",
                               x_range=[0, self.max_freq_khz], y_range=[-50, 0],
                               x_axis_label='discrete frequency [kHz]', y_axis_label='[dB]',
                               **figure_kwargs)
        self.PSD_plot.background_fill_color = fill_colour
        self.PSD_plot.line(x="f", y="y", line_color=line_colour, source=self.PSD_source)

        # spectrogram, drawn as one coloured rectangle per time/frequency bin
        self.PALETTE = Blues256
        self.spec_source = ColumnDataSource(data=dict(t=[], f=[], amp=[], w=[], h=[]))
        self.spec_plot = figure(plot_width=710, plot_height=220, title="Spectrogram",
                                y_range=[0, self.max_freq_khz], x_range=[0, 500],
                                x_axis_label='time [ms]', y_axis_label='frequency [kHz]',
                                **figure_kwargs)
        self.spec_plot.background_fill_color = fill_colour
        self.spec_plot.rect(x='t', y='f', width=0, height=0, source=self.spec_source)
        self.mapper = LogColorMapper(palette=self.PALETTE, low=10**(-4), high=1)
        amp_colour = {'field': 'amp', 'transform': self.mapper}
        self.spec_plot.rect(x='t', y='f', width='w', height='h', source=self.spec_source,
                            fill_color=amp_colour, line_color=amp_colour)

        # tab layout: image related elements on the left, signal related elements on the right
        img_header = row(self.btn_load_img, row(self.text_status_img, sizing_mode='stretch_both'))
        img_column = column(img_header, self.image_div, self.autocomplete_tax.widget,
                            self.btn_change_class)
        sig_header = row(Spacer(width=57), self.btn_load_sig,
                         row(self.text_status_sig, sizing_mode='stretch_both'))
        sig_column = column(sig_header, self.signal_plot, self.PSD_plot, self.spec_plot)
        _layout = layout(row(img_column, sig_column, sizing_mode='stretch_both'))

    # only first ViewerTab is not closable
    if self.viewer_counter == 0:
        return Panel(child=_layout, title='Data Viewer')
    return Panel(child=_layout, title='Data Viewer({})'.format(self.viewer_counter), closable=True)
def load_img(self):
    """ Button callback that lets the user pick a ``png`` file and shows it in the image container.

    A file dialog is opened (X11 forwarding has to be enabled if the server is controlled via SSH tunnel).
    If the dialog is cancelled nothing changes; otherwise the classification stored next to the image is
    displayed, the status text shows the selected file and the image gets loaded into the div container.
    """
    package_root = os.path.dirname(os.path.dirname(__file__))
    img_dir = os.path.join(package_root, self.config.img_path)
    chosen = filedialog.askopenfilename(title='Open Image File', initialdir=img_dir,
                                        filetypes=[('Image Files', '.png')])
    # a cancelled dialog is reported as an empty string or an empty tuple
    if chosen in ('', ()):
        print('Loading aborted!')
        return

    # data.json is looked up in the folder of the selected image
    self.data_json_path = os.path.dirname(chosen)
    self.show_classification('Image Classification')

    # image source: the part of the path that follows the parent directory of the package root
    img_src = chosen.split(os.path.dirname(package_root))[-1]
    img_html = '<div class="image_content"><img src="{}" width="100%" height="100%"></div>'.format(img_src)

    # status text: the part of the path that follows the configured image directory
    shown_name = chosen.split(img_dir)[-1]
    self.text_status_img.text = self.template_status.format(status_text=shown_name, w='320px',
                                                            background='#3EA639', colour='#ffffff')
    self.image_div.text = img_html
def load_signal(self):
    """ Button callback that lets the user pick a ``wav`` file and plots it.

    A file dialog is opened (X11 forwarding has to be enabled if the server is controlled via SSH tunnel).
    If the dialog is cancelled nothing changes; otherwise the signal is resampled by the decimation factor
    and the signal, its PSD and its spectrogram get plotted in the corresponding figures.
    """
    package_root = os.path.dirname(os.path.dirname(__file__))
    wav_dir = os.path.join(package_root, self.config.wav_path)
    chosen = filedialog.askopenfilename(title='Open Wingbeat File', initialdir=wav_dir,
                                        filetypes=[('Signal Files', '.wav')])
    # a cancelled dialog is reported as an empty string or an empty tuple
    if chosen in ('', ()):
        print('Loading aborted!')
        return

    # data.json is looked up in the folder of the selected signal
    self.data_json_path = os.path.dirname(chosen)
    self.show_classification('WB Classification')

    samplerate, raw_signal = wavfile.read(chosen)
    duration_ms = len(raw_signal)/samplerate * 1000
    shown_name = chosen.split(wav_dir)[-1]
    self.text_status_sig.text = self.template_status.format(status_text=shown_name, w='320px',
                                                            background='#3EA639', colour='#ffffff')

    # resample signal before plotting
    decimated_rate = samplerate/self.decimation_factor
    decimated = sg.resample_poly(raw_signal, 1, self.decimation_factor, padtype='edge')
    t_signal = np.linspace(0, duration_ms, len(decimated))

    # PSD in dB over a frequency axis from 0 to half the decimated rate (in kHz)
    _, pxx = sg.welch(decimated, fs=decimated_rate, window='hann', nperseg=512, noverlap=256+128)
    psd_db = 10*np.log10(pxx)
    f_max_khz = decimated_rate/2/1000
    f_psd = np.linspace(0, f_max_khz, len(psd_db))

    # spectrogram, flattened to one row per (frequency, time) bin
    f, t, Sxx = sg.spectrogram(decimated, decimated_rate, window='hann', nperseg=256, noverlap=128+64)
    bins = pd.DataFrame(data=Sxx, index=list(f), columns=list(t))
    bins = pd.DataFrame(bins.stack(), columns=['amp']).reset_index()
    f_spec = bins['level_0'].values/1000
    t_spec = bins['level_1'].values*1000
    amp_spec = bins['amp'].values
    # every rectangle spans one frequency step (kHz) times one time step (ms)
    n_bins = len(amp_spec)
    rec_height = np.full(n_bins, np.diff(f)[0]/1000)
    rec_width = np.full(n_bins, np.diff(t)[0]*1000)

    # update all plots at same time
    normalized = decimated/32768.0
    peak = np.max(np.abs(normalized))
    self.signal_plot.y_range.start = -peak
    self.signal_plot.y_range.end = peak
    self.spec_plot.x_range.end = duration_ms
    self.signal_plot.x_range.end = duration_ms
    self.PSD_plot.y_range.start = np.min(psd_db)
    self.PSD_plot.y_range.end = np.max(psd_db)
    self.PSD_plot.x_range.end = f_max_khz
    self.spec_plot.y_range.end = f_max_khz
    self.signal_source.data = dict(t=t_signal, y=normalized)
    self.PSD_source.data = dict(f=f_psd, y=psd_db)
    self.mapper.high = bins['amp'].max()
    self.spec_source.data = dict(t=t_spec, f=f_spec, amp=amp_spec, w=rec_width, h=rec_height)
def show_classification(self, data_type):
    """ Reads ``data.json`` from ``self.data_json_path`` and outputs the first main classification in the
    taxonomy div container.

    Gets called by :meth:`load_img`, :meth:`load_signal` and :meth:`change_classification`. The parsed file
    content is kept in ``self.data``. If the folder contains no ``data.json``, ``self.data`` is reset to
    ``None`` so that content loaded earlier from another folder cannot be written into this one.

    :param data_type: label that is put in front of the taxonomy text; the callers pass
        "Image Classification", "WB Classification" or "New Classification"
    :type data_type: string
    """
    json_path = os.path.join(self.data_json_path, 'data.json')
    if os.path.exists(json_path):
        with open(json_path) as f:
            self.data = json.load(f, object_pairs_hook=OrderedDict)
        if self.data['main_classifications']:
            classification = self.data['main_classifications'][0]
            genus = classification['genus']['name']
            # order and family are always shown
            text_classification = '{}: {} {}'.format(data_type,
                                                     classification['order']['name'],
                                                     classification['family']['name'])
            if genus is not None:
                # second line shows the species name if there is one, otherwise the genus
                species = classification['species']['name']
                text_classification += '<br/>{}'.format(genus if species is None else species)
            if data_type != 'New Classification':
                print('Loaded main classification from data.json!')
        else:
            text_classification = 'No classification available!'
            print('No main classification for this signal available!')
    else:
        # drop stale content of a previously loaded folder (see docstring)
        self.data = None
        text_classification = 'No data.json file available!'
        print('No data.json file found!')
    self.autocomplete_tax.text_class.text = \
        self.template_status.format(status_text=text_classification,
                                    background='#3EA639', colour='#ffffff', w='320px')
def change_classification(self):
    """ Gets called on button click. Replaces the ``main_classifications`` entry of the loaded data with the
    one returned by the taxonomy autocomplete widget and writes the result back to ``data.json`` in the
    folder of the selected data.

    Nothing is written if no data has been loaded. After writing, the confidence widget is reset to 0 and
    the new classification is displayed via :meth:`show_classification`.
    """
    if self.data is None:
        print('Load data first!')
        return
    self.data['main_classifications'] = self.autocomplete_tax.write_tax_to_data()['main_classifications']
    # the with-statement closes the file, no explicit close() required
    with open(os.path.join(self.data_json_path, 'data.json'), 'w') as outfile:
        json.dump(self.data, outfile, indent=4)
    self.autocomplete_tax.confidence.value = 0
    print('Classification changed!')
    self.show_classification('New Classification')