from bokeh.models import Button, TextInput, Panel, Div, NumericInput, Dropdown, RadioButtonGroup
from bokeh.events import ButtonClick
from bokeh.layouts import layout, column, row, gridplot
from bokeh.document import without_document_lock
from tornado import gen
import os
import re
from datetime import datetime
import zipfile
from tkinter import filedialog
import tkinter as tk
import ruamel.yaml
import json
from colorama import Fore
class ExportTab:
    """Bokeh "Export" tab: zip recorded data sets and manage custom config files.

    The tab is made of three widget groups: export of data folders into zip archives
    (split by size, zipped in a background executor), selection of the config file to
    load on the next start, and editing/saving of a custom config file.

    :param config: configuration including current Bokeh document and paths to data
    :type config: object
    """

    # Folder holding the custom YAML config files and ``load_custom_config.json``.
    # Paths are built by plain concatenation, so the trailing slash is required.
    CONFIG_DIR = 'sensors/custom_config_files/'

    def __init__(self, config):
        """Store the configuration, build the widgets and set up the YAML round-tripper.

        :param config: object providing at least ``curdoc``, ``executor`` and the
            attributes read in :meth:`make_tab` and :meth:`save_config_to_file`
        :type config: object
        """
        self.config = config
        self.curdoc = config.curdoc
        # HOME is not guaranteed to be set (e.g. on Windows); fall back to the user directory.
        self.root = os.environ.get('HOME') or os.path.expanduser('~')
        self.template = (
            """<div class='content' style="background-color: {background};color: {colour};width: {w};">\n"""
            """<div class='info'> {status_text} </div> </div>"""
        )
        self.template_status = (
            """<div class='content' style="width: {w}; background-color:\n"""
            """{background};color: {colour};">\n"""
            """<div class='info'> {status_text} </div> </div>"""
        )
        self.executor = config.executor
        self.list_of_lists_folders = []
        self.zip_counter = 0
        self.zip_callback = None
        self.in_progress = False
        # set by init_create_zips once an export is started
        self.folder_selected = ''
        self.timestamp = None
        self.output_path = None
        self._tab = self.make_tab()
        # make_tab() has to run first because it creates self.export_path
        self.new_filename = self._export_dir()
        # size to split data in MB
        self.split_size_MB = 200
        # size to split data in bytes
        self.split_size_bytes = 1024 ** 2 * self.split_size_MB
        self.yaml = ruamel.yaml.YAML()
        ruamel.yaml.representer.RoundTripRepresenter.add_representer(
            tuple, ruamel.yaml.representer.Representer.represent_tuple)
        self.yaml.default_flow_style = None
        self.yaml.representer.add_representer(type(None), self.my_represent_none)
        self.yaml.explicit_start = False
        self.yaml.indent(mapping=3)
        self.yaml.preserve_quotes = True
        # suppress opening of second tkinter window
        try:
            root = tk.Tk()
            root.withdraw()
        except tk.TclError:
            # raised by tkinter when no display is available
            print('Enable X11 forwarding on your remote system!')

    @property
    def tab(self):
        """Return the panel that was built once by :meth:`make_tab` in the constructor.
        """
        return self._tab

    def _export_dir(self):
        """Return the export folder: the text input value joined onto ``self.root``."""
        return os.path.join(self.root, self.export_path.value)

    def _format_info(self, status_text, w):
        """Render ``self.template`` with the neutral grey/black colour scheme."""
        return self.template.format(status_text=status_text, w=w,
                                    background='#eaeaea', colour='#000000')

    def _set_progression(self, status_text, background='#eaeaea', colour='#000000'):
        """Replace the text of the export progression ``Div`` (fixed width of 320px)."""
        self.progression.text = self.template_status.format(status_text=status_text, background=background,
                                                            colour=colour, w='320px')

    @staticmethod
    def _make_label(text, width):
        """Create a disabled ``TextInput`` that is only used as a caption for another widget."""
        label = TextInput(width=width, value=text, css_classes=["custom-input"])
        label.disabled = True
        return label

    def make_tab(self):
        """ Creates the Export widget, including text inputs to specify paths to saved zip files.

        All widgets that other methods need later are stored as attributes on ``self``.

        :return: Bokeh widget for export zips
        :rtype: Bokeh object
        """
        WIDTH_INPUT = 200
        WIDTH_TEXTS = 100

        # define button to start export
        self.btn_export = Button(label='Export Data', width=310)
        self.btn_export.on_event(ButtonClick, self.init_create_zips)
        # define button to save config file
        self.btn_save_config = Button(label='Save config file', width=310)
        self.btn_save_config.on_event(ButtonClick, self.save_config_to_file)
        # define button to enable/disable editing of the multisensor information
        self.btn_edit_info = Button(label='Enable information editing', width=310)
        self.btn_edit_info.on_event(ButtonClick, self.edit_info)
        # define text input for custom config file
        self.config_filename = TextInput(width=WIDTH_INPUT, value=self.config.custom_config_filename)

        # editable multisensor information, locked until btn_edit_info is clicked
        self.user_id = NumericInput(width=WIDTH_INPUT//4, value=self.config.user_id,
                                    low=1, high=100, mode='int')
        self.user_id.disabled = True
        self.sensor_id = NumericInput(width=WIDTH_INPUT//4, value=self.config.sensor_id,
                                      low=1, high=100, mode='int')
        self.sensor_id.disabled = True
        self.postcode = NumericInput(width=WIDTH_INPUT*33//100, value=self.config.postcode,
                                     low=1000, high=99999, mode='int')
        self.postcode.disabled = True
        self.name = TextInput(width=WIDTH_INPUT, value=self.config.name, css_classes=["custom-input"])
        self.name.disabled = True
        self.latitude = NumericInput(width=WIDTH_INPUT*475//1000, value=self.config.latitude,
                                     low=-90, high=90, mode='float')
        self.latitude.disabled = True
        self.longitude = NumericInput(width=WIDTH_INPUT*475//1000, value=self.config.longitude,
                                      low=-180, high=180, mode='float')
        self.longitude.disabled = True
        LABELS = ['SingleSave', 'TriggeredSave']
        self.wb_mode = RadioButtonGroup(labels=LABELS, active=int(self.config.wb_triggered_save),
                                        width=WIDTH_INPUT)

        # captions shown to the left of the inputs
        filename_Text = self._make_label('Config filename:', WIDTH_TEXTS)
        user_id_Text = self._make_label('User ID:', WIDTH_TEXTS)
        sensor_id_Text = self._make_label('Sensor ID:', WIDTH_TEXTS)
        postcode_Text = self._make_label('Postcode:', WIDTH_TEXTS)
        name_Text = self._make_label('Name:', WIDTH_TEXTS)
        gps_Text = self._make_label('Lat. / Long.:', WIDTH_TEXTS)
        wb_mode_Text = self._make_label('Wingbeat mode:', WIDTH_TEXTS)

        multisensor_info = column(row(user_id_Text, self.user_id),
                                  row(sensor_id_Text, self.sensor_id),
                                  row(postcode_Text, self.postcode),
                                  row(name_Text, self.name),
                                  row(gps_Text, self.latitude, self.longitude),
                                  row(wb_mode_Text, self.wb_mode),)
        save_config = column(self.btn_edit_info, multisensor_info,
                             row(filename_Text, self.config_filename),
                             self.btn_save_config,
                             )

        # selection of the config file that is loaded on the next start
        self.config_load_status = Div(text=self._format_info('Select startup configuration file', '200px'),
                                      render_as_text=False)
        self.config_dropdown = Dropdown(label=self.config.custom_config_filename, width=200,
                                        button_type="warning", menu=self.get_all_config_files())
        self.config_dropdown.on_click(self.get_dropdown_item)
        load_config_layout = column(self.config_load_status, self.config_dropdown)

        # define text input for path to saved zips
        self.export_path = TextInput(width=190, value='export_folder')
        self.export_path.disabled = False
        self.export_path.on_change('value', self.create_new_filename)
        self.save_status = Div(
            text=self._format_info('{} {}'.format('Data will be exported to', self._export_dir()), '410px'),
            render_as_text=False)
        self.progression = Div(text=self.template_status.format(status_text='Export data to zip file(s)',
                                                                background='#eaeaea', colour='#000000', w='320px'))
        export_folder_Text = self._make_label('Export foldername:', WIDTH_TEXTS+10)
        export_layout = column(self.progression, self.save_status,
                               row(export_folder_Text, self.export_path),
                               self.btn_export,
                               )
        return Panel(child=layout(row(export_layout, load_config_layout, save_config)),
                     title='Export')

    @staticmethod
    def my_represent_none(self, data_dict):
        """YAML representer that writes ``None`` as the literal ``null``.

        Registered on the YAML representer in the constructor. Although this is a static
        method, the first parameter is called ``self``: it receives the representer
        instance that invokes the callback, not an :class:`ExportTab`.

        :param self: representer calling this function
        :param data_dict: value being represented (unused)
        :return: scalar node with the YAML null tag
        """
        return self.represent_scalar(u'tag:yaml.org,2002:null', u'null')

    def edit_info(self):
        """Toggle whether the multisensor information inputs can be edited.

        The current state is read from the label of ``btn_edit_info``, which is flipped
        together with the ``disabled`` flag of the six information inputs.
        """
        enable = self.btn_edit_info.label == 'Enable information editing'
        if enable:
            self.btn_edit_info.label = 'Disable information editing'
        else:
            self.btn_edit_info.label = 'Enable information editing'
        for widget in (self.user_id, self.sensor_id, self.postcode,
                       self.name, self.latitude, self.longitude):
            widget.disabled = not enable

    @staticmethod
    def get_all_config_files():
        """List the selectable config files in the custom config folder.

        Only the files directly inside the folder are considered; ``.json`` files
        are left out.

        :return: sorted file names, or an empty list if the folder does not exist
        :rtype: list of str
        """
        if not os.path.exists(ExportTab.CONFIG_DIR):
            return []
        # first os.walk() entry = top level only; the default covers a path that is not a directory
        files = next(os.walk(ExportTab.CONFIG_DIR), (None, [], []))[2]
        # build a new list instead of removing from the list that is being iterated
        return sorted(f for f in files if not f.endswith('.json'))

    def get_dropdown_item(self, event):
        """Callback of the config dropdown, forwards the clicked item.

        :param event: Bokeh menu click event, ``event.item`` is the chosen file name
        """
        load_config_filename = event.item
        self.change_load_config_file(load_config_filename)

    def change_load_config_file(self, load_config_filename):
        """Remember which config file is to be loaded when a new Bokeh session starts.

        Refreshes the dropdown and writes the file name to ``load_custom_config.json``
        inside the custom config folder. If the folder is missing only a message is printed.

        :param load_config_filename: name of the config file to load on restart
        :type load_config_filename: str
        """
        self.config_dropdown.menu = self.get_all_config_files()
        self.config_dropdown.label = load_config_filename
        dict_filename = {"custom_config_filename": load_config_filename}
        if os.path.exists(self.CONFIG_DIR):
            with open(self.CONFIG_DIR + 'load_custom_config.json', 'w') as outfile:
                json.dump(dict_filename, outfile, indent=4, ensure_ascii=False)
            print('Loading config file ' + Fore.GREEN + load_config_filename + Fore.WHITE +
                  ' when restarting a new Bokeh session!')
        else:
            print(Fore.RED + 'The folder sensors/custom_config_files does not exist!\n'
                  'Please start a new Bokeh session!' + Fore.WHITE)

    def save_config_to_file(self):
        """Write the current widget values into a (new) custom config file.

        The currently used config file is loaded as base, the edited values are put on
        top and the result is dumped under the name given in the config filename input.
        Afterwards that file is selected for the next start and editing is locked again.
        """
        with open(self.CONFIG_DIR + self.config.custom_config_filename) as fp:
            config = self.yaml.load(fp)
        camera_settings = self.config.visual_camera.camera_settings
        config['user_id'] = self.user_id.value
        config['sensor_id'] = self.sensor_id.value
        config['postcode'] = self.postcode.value
        config['name'] = self.name.value
        # limit coordinates to six decimal places
        config['latitude'] = float("{:3.6f}".format(self.latitude.value))
        config['longitude'] = float("{:3.6f}".format(self.longitude.value))
        config['analog_gain'] = camera_settings.analog_gain.value
        config['digital_gain'] = camera_settings.digital_gain.value
        config['white_balance'] = [camera_settings.white_balance_1.value,
                                   camera_settings.white_balance_2.value]
        config['wb_triggered_save'] = bool(self.wb_mode.active)
        with open(self.CONFIG_DIR + self.config_filename.value, 'wb') as f:
            self.yaml.dump(config, f)
        print('Saved new config file ' + Fore.GREEN + self.config_filename.value + Fore.WHITE + '!\n' +
              Fore.RED + 'Restart Bokeh session for changes to take effect!' + Fore.WHITE)
        self.change_load_config_file(self.config_filename.value)
        if self.btn_edit_info.label == 'Disable information editing':
            self.edit_info()

    def create_new_filename(self, attr, new, old):
        """Gets called on change of value of customized folder name text input widget.
        Creates a user specified path to save export zips.

        The three parameters are required by the Bokeh ``on_change`` signature but are
        not used; the value is read from the widget. Note that Bokeh passes them
        positionally as ``(attr, old, new)``, so the names ``new``/``old`` are swapped.
        """
        self.new_filename = self._export_dir()
        self.save_status.text = self._format_info(
            '{} {}'.format('Data will be exported to', self.new_filename), '410px')

    def init_create_zips(self):
        """Gets called on click of the export button and once more after the last zip is done.

        While the button shows 'Export Data' a new export is started, otherwise (export
        running, button shows the abort label) the export is stopped and cleaned up.
        """
        if self.btn_export.label == 'Export Data':
            self._start_export()
        else:
            self._finish_export()

    def _start_export(self):
        """Ask for the data folder, split it into zip-sized chunks and start the periodic zipping."""
        self.btn_export.label = 'Abort (progression not lost)'
        root = os.path.dirname(os.path.dirname(__file__))
        init_path = os.path.join(root, os.path.dirname(self.config.img_path))
        self.folder_selected = filedialog.askdirectory(title='Select parent folder of data directories',
                                                       initialdir=init_path)
        # a cancelled dialog returns an empty string or an empty tuple
        if not self.folder_selected:
            self._set_progression('Export aborted!', background='#DC524C')
            print('No data selected, export aborted!')
            self.btn_export.label = 'Export Data'
            return
        self._set_progression('Preparing for data export...')
        self.list_of_lists_folders = self.get_list_of_lists_folders()
        if self.list_of_lists_folders == [[]]:
            # no data sets found below the selection, zip the selected folder itself
            self.list_of_lists_folders = [[self.folder_selected]]
        print('{} {} {}!'.format('Initializing thread to create',
                                 str(len(self.list_of_lists_folders)), 'zip file(s)'))
        self.timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        self.output_path = os.path.join(self.new_filename, '{}_{}'.format('export', self.timestamp))
        os.makedirs(self.output_path)
        self.zip_counter = 0
        self.in_progress = False
        self.zip_callback = self.curdoc.add_periodic_callback(self.periodic_callback, 500)

    def _finish_export(self):
        """Clean up after saves are done or user abortion and report the number of zips."""
        self.btn_export.label = 'Export Data'
        if self.zip_callback is not None:
            self.curdoc.remove_periodic_callback(self.zip_callback)
            self.zip_callback = None
        self._set_progression(str(self.zip_counter) + ' zip(s) saved!',
                              background='#3EA639', colour='#ffffff')
        print('{} zip(s) saved to {}!'.format(str(self.zip_counter), self.new_filename))
        self.zip_counter = 0

    def periodic_callback(self):
        """Targeted by periodic callback from :meth:`init_create_zips`. Method checks if zip is in progress and
        initiate the creation of a new archive if false.
        """
        if self.in_progress:
            # zipping in progress, do nothing so js callbacks aren't blocked
            return
        if self.zip_counter >= len(self.list_of_lists_folders):
            # every archive has been started already; cleanup is triggered by update_progression
            return
        self.in_progress = True
        self.curdoc.add_next_tick_callback(self.create_single_zip)

    @gen.coroutine
    def update_progression(self):
        """Targeted by next tick callback from :meth:`create_single_zip`. Method checks if all zips are done and exits
        if true or updates the progression if false.
        """
        total = len(self.list_of_lists_folders)
        if self.zip_counter == total:
            # all zips done, reset all
            self.curdoc.add_next_tick_callback(self.init_create_zips)
        else:
            self._set_progression(str(self.zip_counter) + ' of ' + str(total) + ' zip(s) saved!')

    @gen.coroutine
    @without_document_lock
    def create_single_zip(self):
        """Targeted by next tick callback from :meth:`periodic_callback`. The method creates one zip in a parallel
        thread that is not blocking the main Bokeh visualization thread.

        :return: future
        :rtype: object
        """
        if self.zip_callback is None:
            # export was aborted or finished between scheduling and running this callback
            self.in_progress = False
            return
        self.zip_counter += 1
        folders = self.list_of_lists_folders[self.zip_counter-1]
        filename = '{}_{}_{}.zip'.format('part', str(self.zip_counter), self.timestamp)
        output_filename = os.path.join(self.output_path, filename).replace(os.sep, '/')
        print(os.path.basename(output_filename))
        # create a future so png callback doesn't get blocked;
        # zipit returns False, which marks the zipping as no longer in progress
        self.in_progress = yield self.executor.submit(self.zipit, folders, output_filename)
        if self.zip_callback is not None:
            self.curdoc.add_next_tick_callback(self.update_progression)

    def get_list_of_lists_folders(self):
        """Method creates a list of folder lists from the selected starting path, whereby a new folder list is
        started as soon as the accumulated file size reaches ``self.split_size_bytes``. To check that folder
        lists only contain data paths in import/export format, each folder is checked for the existence of a
        ``data.json`` file and no existing subdirectories.

        A single data folder that is larger than the split size gets a folder list of its own.

        :return: folder lists, one per zip archive; ``[[]]`` if no data folder was found
        :rtype: list of lists of paths
        """
        list_of_lists = []
        folder_list = []
        total_size = 0
        data_folder_format = re.compile(r'.*_.*-.*-.*_.*-.*-.*')
        for dirpath, dirnames, filenames in os.walk(self.folder_selected):
            # if data.json not in dirpath, continue loop
            if not os.path.isfile(os.path.join(dirpath, 'data.json')):
                folder_name = os.path.basename(dirpath)
                # only warn for folders that are named like a data set
                if data_folder_format.match(folder_name) is not None:
                    print(Fore.RED + 'No data.json found in directory {}!'.format(folder_name) + Fore.WHITE)
                continue
            # if subdirectory exists, continue loop
            if dirnames:
                continue
            file_paths = (os.path.join(dirpath, f) for f in filenames)
            # skip symbolic links when summing up the folder size
            size_folder = sum(os.path.getsize(fp) for fp in file_paths if not os.path.islink(fp))
            total_size += size_folder
            folder = dirpath.replace(os.sep, '/')
            if total_size < self.split_size_bytes:
                folder_list.append(folder)
            else:
                # close the current chunk; it is empty if the very first folder already exceeds the limit
                if folder_list:
                    list_of_lists.append(folder_list)
                total_size = size_folder
                folder_list = [folder]
        # add last folder_list
        list_of_lists.append(folder_list)
        folders_count = sum(len(chunk) for chunk in list_of_lists)
        print('{} {} {}'.format(folders_count, 'data sets found within', self.folder_selected))
        return list_of_lists

    @staticmethod
    def zipit(folder_list, output_filename):
        """Creates a zip archive of a list of folders and saves it to the specified output path

        Inside the archive every file is stored relative to the parent of its folder, so
        each folder shows up under its own name.

        :param folder_list: list of folders to add to archive
        :type folder_list: list of paths
        :param output_filename: filename of archive
        :type output_filename: path
        :return: always ``False``; :meth:`create_single_zip` stores it in ``in_progress``
        :rtype: bool
        """
        # the context manager closes the archive even if adding a file fails
        with zipfile.ZipFile(output_filename, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for folder in folder_list:
                parent = os.path.join(folder, '..')
                for dirpath, dirnames, filenames in os.walk(folder):
                    for filename in filenames:
                        file_path = os.path.join(dirpath, filename)
                        zip_file.write(file_path, os.path.relpath(file_path, parent))
        return False