Source code for dipplanner.main

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011-2012 Thomas Chiroux
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.
# If not, see <http://www.gnu.org/licenses/gpl.html>
#
# This module is part of dipplanner, a Dive planning Tool written in python

"""main dipplanner module for command line usage.

This module is used by the only "executable" of the project:
bin/dipplanner (which is an empty shell)

runs in command line and output resulting dive profile
also initiate log files
"""

__version__ = "0.3nightly"

__authors__ = [
    # alphabetical order by last name
    'Thomas Chiroux', ]

import sys
import logging
from collections import OrderedDict

# dependencies imports
from jinja2 import Environment, PackageLoader

# local imports
from dipplanner import settings
from dipplanner.dive import Dive
from dipplanner.tank import Tank
from dipplanner.segment import SegmentDive
from dipplanner.tools import altitude_to_pressure

LOGGER = logging.getLogger("dipplanner")


[docs]def activate_debug(): """setup the default debug parameters it's mainly used for test cases who needs also logging to be set *Keyword Arguments:* <none> *Return:* <nothing> *Raise:* <nothing> """ LOGGER.setLevel(logging.DEBUG) # create file handler which logs even debug messages file_handler = logging.FileHandler("dipplanner.log") file_handler.setLevel(logging.INFO) # create console handler with a higher log level stream_handler = logging.StreamHandler() stream_handler.setLevel(logging.WARNING) # create formatter and add it to the handlers formatter = logging.Formatter( "%(asctime)s - %(levelname)s - %(name)s - %(message)s") file_handler.setFormatter(formatter) stream_handler.setFormatter(formatter) # add the handlers to the logger LOGGER.addHandler(file_handler) LOGGER.addHandler(stream_handler)
[docs]def activate_debug_for_tests(): """setup the default debug parameters it's mainly used for test cases who needs also logging to be set *Keyword Arguments:* <none> *Return:* <nothing> *Raise:* <nothing> """ LOGGER.setLevel(logging.CRITICAL) stream_handler = logging.StreamHandler() stream_handler.setLevel(logging.INFO) formatter = logging.Formatter( "%(asctime)s - %(levelname)s - %(name)s - %(message)s") stream_handler.setFormatter(formatter) LOGGER.addHandler(stream_handler)
[docs]def parse_config_file(filenames): """parse a config file and change default settings values *Keyword Arguments:* :filename: (str) -- name (and path) of the config file to be parsed *Returns:* <nothing> *Raise:* Nothing, but can exit """ from ConfigParser import SafeConfigParser if filenames is not None: config = SafeConfigParser() filesread = config.read(filenames) else: LOGGER.info("No config file found: skip config from files") return {} missing = set(filenames) - set(filesread) if len(filesread) == 0: LOGGER.info("No config file found: skip config from files") return {} if len(missing) > 0: if len(missing) == 1: LOGGER.warning( "Config file : %s not found, skip it" % list(missing)[0]) else: LOGGER.warning("Config files : %s not found, skip them" % ', '.join(str(n) for n in list(missing))) # now try to find each parameter and set the new setting if config.has_section('advanced'): section = 'advanced' if config.has_option(section, 'fresh_water_density'): settings.FRESH_WATER_DENSITY = float( config.get(section, 'fresh_water_density')) if config.has_option(section, 'sea_water_density'): settings.SEA_WATER_DENSITY = float( config.get(section, 'sea_water_density')) if config.has_option(section, 'absolute_max_ppo2'): settings.ABSOLUTE_MAX_PPO2 = float( config.get(section, 'absolute_max_ppo2')) if config.has_option(section, 'absolute_min_ppo2'): settings.ABSOLUTE_MIN_PPO2 = float( config.get(section, 'absolute_min_ppo2')) if config.has_option(section, 'absolute_max_tank_pressure'): settings.ABSOLUTE_MAX_TANK_PRESSURE = float( config.get(section, 'absolute_max_tank_pressure')) if config.has_option(section, 'absolute_max_tank_size'): settings.ABSOLUTE_MAX_TANK_SIZE = float( config.get(section, 'absolute_max_tank_size')) if config.has_option(section, 'surface_temp'): settings.SURFACE_TEMP = float(config.get(section, 'surface_temp')) if config.has_option(section, 'he_narcotic_value'): settings.HE_NARCOTIC_VALUE = float( config.get(section, 'he_narcotic_value')) if config.has_option(section, 'n2_narcotic_value'): settings.N2_NARCOTIC_VALUE = float( config.get(section, 'n2_narcotic_value')) if config.has_option(section, 'o2_narcotic_value'): settings.O2_NARCOTIC_VALUE = float( config.get(section, 'o2_narcotic_value')) if config.has_option(section, 'ar_narcotic_value'): settings.AR_NARCOTIC_VALUE = float( config.get(section, 'ar_narcotic_value')) if config.has_option(section, 'stop_depth_increment'): settings.STOP_DEPTH_INCREMENT = float( config.get(section, 'stop_depth_increment')) if config.has_option(section, 'last_stop_depth'): settings.LAST_STOP_DEPTH = float( config.get(section, 'last_stop_depth')) if config.has_option(section, 'stop_time_increment'): settings.STOP_TIME_INCREMENT = float( config.get(section, 'stop_time_increment')) if config.has_option(section, 'force_all_stops'): settings.FORCE_ALL_STOPS = eval( ''.join(config.get(section, 'force_all_stops')).title()) if config.has_option(section, 'ambiant_pressure_sea_level'): settings.AMBIANT_PRESSURE_SEA_LEVEL = float( config.get(section, 'ambiant_pressure_sea_level')) if config.has_option(section, 'method_for_depth_calculation'): method = config.get(section, 'method_for_depth_calculation') if method == 'simple' or method == 'complex': settings.METHOD_FOR_DEPTH_CALCULATION = method if config.has_option(section, 'travel_switch'): travel = config.get(section, 'travel_switch') if travel == 'late' or travel == 'early': settings.TRAVEL_SWITCH = travel if config.has_option(section, 'flight_altitude'): settings.FLIGHT_ALTITUDE = float( config.get(section, 'flight_altitude')) if config.has_section('output'): section = 'output' if config.has_option(section, 'template'): settings.TEMPLATE = config.get(section, 'template') if config.has_section('general'): section = 'general' if config.has_option(section, 'deco_model'): model = config.get(section, 'deco_model') if model == "ZHL16b" or model == "ZHL16c": settings.DECO_MODEL = model if config.has_option(section, 'max_ppo2'): settings.DEFAULT_MAX_PPO2 = float(config.get(section, 'max_ppo2')) if config.has_option(section, 'min_ppo2'): settings.DEFAULT_MIN_PPO2 = float(config.get(section, 'min_ppo2')) if config.has_option(section, 'max_end'): settings.DEFAULT_MAX_END = float(config.get(section, 'max_end')) if config.has_option(section, 'descent_rate'): settings.DESCENT_RATE = float( config.get(section, 'descent_rate')) / 60 if config.has_option(section, 'ascent_rate'): settings.ASCENT_RATE = float( config.get(section, 'ascent_rate')) / 60 if config.has_option(section, 'dive_consumption_rate'): settings.DIVE_CONSUMPTION_RATE = \ float(config.get(section, 'dive_consumption_rate')) / 60 if config.has_option(section, 'deco_consumption_rate'): settings.DIVE_CONSUMPTION_RATE = \ float(config.get(section, 'deco_consumption_rate')) / 60 if config.has_option(section, 'gf_low'): settings.GF_LOW = float( eval(''.join(config.get(section, 'gf_low')).strip('%'))) / 100 if config.has_option(section, 'gf_high'): settings.GF_HIGH = float( eval(''.join(config.get(section, 'gf_high')).strip('%'))) / 100 if config.has_option(section, 'water'): if config.get(section, 'water') == 'sea': settings.WATER_DENSITY = settings.SEA_WATER_DENSITY elif config.get(section, 'water') == 'fresh': settings.WATER_DENSITY = settings.FRESH_WATER_DENSITY if config.has_option(section, 'altitude'): settings.AMBIANT_PRESSURE_SURFACE = \ altitude_to_pressure(float(config.get(section, 'altitude'))) if config.has_option(section, 'run_time'): settings.RUN_TIME = eval( ''.join(config.get(section, 'run_time')).title()) if config.has_option(section, 'use_oc_deco'): settings.USE_OC_DECO = eval( ''.join(config.get(section, 'use_oc_deco')).title()) if config.has_option(section, 'multilevel_mode'): settings.MULTILEVEL_MODE = eval( ''.join(config.get(section, 'multilevel_mode')).title()) if config.has_option(section, 'automatic_tank_refill'): settings.AUTOMATIC_TANK_REFILL = eval( ''.join(config.get(section, 'automatic_tank_refill')).title()) #dives = { 'dive1': { 'tanks': {}, 'segments': {}, 'surface_interval':0} } dives = {} dive_number = 1 # initialization while config.has_section('dive%s' % dive_number): section = 'dive%s' % dive_number dives[section] = {'tanks': {}, 'segments': {}, 'surface_interval': 0} for parameter_name, parameter_value in config.items(section): if parameter_name == 'surface_interval': dives[section]['surface_interval'] = eval(parameter_value) elif parameter_name[0:4] == 'tank': #number = parameter_name[4:] (name, f_o2, f_he, volume, pressure, rule) =\ parameter_value.split(";") dives[section]['tanks'][name] = Tank( float(f_o2), float(f_he), max_ppo2=settings.DEFAULT_MAX_PPO2, tank_vol=float(eval(volume)), tank_pressure=float(eval(pressure)), tank_rule=rule) if dives[section]['tanks'] == {}: # no tank provided, try to get the previous tanks try: dives[section]['tanks'] = \ dives['dive%s' % (dive_number - 1)]['tanks'] except KeyError: print "Error : no tank provided for this dive !" sys.exit(0) for parameter_name, parameter_value in config.items(section): if parameter_name[0:7] == 'segment': #number = parameter_name[4:] (depth, time, tankname, setpoint) = parameter_value.split(";") try: dives[section]['segments'][parameter_name] = SegmentDive( float(eval(depth)), float(eval(time)), dives[section]['tanks'][tankname], float(setpoint)) except KeyError: print("Error : tank name (%s) in not found in tank list " "!" % tankname) sys.exit(0) except: raise dives[section]['segments'] = OrderedDict( sorted(dives[section]['segments'].items(), key=lambda t: t[0])) dive_number += 1 return OrderedDict(sorted(dives.items(), key=lambda t: t[0]))
[docs]def parse_arguments(): """parse all command lines options could also exit from program because of wrong arguments *Keyword Arguments:* <none> *Returns:* a tuple of two dicts: :args: :dives: * args is the result of argpaser * dives dict is in the following form: .. code-block:: python dives = { 'dive1': { 'tanks': {}, 'segments': {}, 'surface_interval':0 }, 'dive2': { 'tanks': {}, 'segments': {}, 'surface_interval':60 }} *Raise:* Nothing, but can exit """ import argparse usage = """%(prog)s [options]""" description = """%(prog)s calculates and output dive profile Thomas Chiroux, 2011-2012 - see http://dipplanner.org """ epilog = "" parser = argparse.ArgumentParser( description=description, usage=usage, epilog=epilog, formatter_class=argparse.RawTextHelpFormatter) group1 = parser.add_argument_group( "Mandatory Options", """Either presence of tank and segment inside a config file or this mandatory options (at leat one of each) are needed for this program to run""") group1.add_argument( "-c", "--config", dest="config_files", action="append", type=str, metavar="STRING", help="""path for config file. Default : ./config.cfg see manual for more infos on config file""") group1.add_argument( "-t", "--tank", dest="tanks", action="append", type=str, metavar="STRING", help="""Tank used for the dive Format: "tank_name;f_o2;f_he;Volume(l);Pressure(bar);Minimum gas rule" Example: "airtank;0.21;0.0;12;200,50b" Minimum gas rule in format : xxxb of 1/x """) group1.add_argument( "-s", "--segment", dest="segments", action="append", type=str, metavar="STRING", help="""Input segments used for the dive Format: "depth;duration;tank;setpoint" Example: "30;20*60;airtank;0.0" * depth: in meter * duration : in seconds (operations are allowed like: '30*60') * tank : name of the tank (same name as tank options) * setpoint : 0 if OC, setpoint if CCR You can specify multiple args like: %(prog)s [other_options] -s "30;1000;airtank;0.0" -s "20;800;airtank;0.0" """) group1.add_argument( "--surfaceinterval", dest="surfaceinterval", type=str, metavar="SECONDS", help="""Optional Surface Interval in seconds""") group2 = parser.add_argument_group("Dive Parameters") group2.add_argument( "--model", metavar="VAL", type=str, default="ZHL16c", choices=['ZHL16b', 'ZHL16c'], help="""Decompression model: either ZHL16b or ZHL16c """) group2.add_argument( "--gflow", metavar="VAL", type=str, help="""GF low, in """) group2.add_argument( "--gfhigh", metavar="VAL", type=str, help="""GF high, in """) group2.add_argument( "--water", metavar="VAL", type=str, help="""type of water : sea or fresh""") group2.add_argument( "--altitude", metavar="VAL", type=int, help="""altitude of the dive in meter.""") group2.add_argument( "--diveconsrate", metavar="VAL", type=str, help="""gas consumption rate during dive (in l/minute).""") group2.add_argument( "--decoconsrate", metavar="VAL", type=str, help="""gas consumption rate during deco (in l/minute).""") group2.add_argument( "--descentrate", metavar="VAL", type=str, help="""descent rate (in m/minute).""") group2.add_argument( "--ascentrate", metavar="VAL", type=str, help="""ascent rate (in m/minute).""") group2.add_argument( "--maxppo2", metavar="VAL", type=float, help="max allowed ppo2 for this dive.") group2.add_argument( "--minppo2", metavar="VAL", type=float, help="minimum allowed ppo2 for this dive.") group2.add_argument( "--maxend", metavar="VAL", type=float, help="max END allowed for this dive.") group2.add_argument( "--samegasfordeco", action="store_true", help="if set, do not use deco tanks (or bailout) for decompressions") group2.add_argument( "--forcesegmenttime", action="store_true", help="""if set, each input segment will be dove at the full time of the segment. By default the segment time is shortened by descent or ascent time """) group3 = parser.add_argument_group("Advanced Parameters") group3.add_argument( "--depthcalcmethod", metavar="simple|complex", type=str, help="""method used for pressure from depth calculation. Simple method uses only +10m = +1bar Complex methods uses real water density""") group3.add_argument( "--travelswitch", metavar="late|early", type=str, help="""Travel switch method (late or early). if late, it will keep the travel as long as possible if early, it will switch to bottom tank as soon as is it breathable""") group3.add_argument( "--surfacetemp", metavar="VAL", type=float, help="""Temperature at surface in celcius""") group3.add_argument( "--ambiantpressureatsea", metavar="VAL", type=float, help="""Change ambiant pressure at sea level (in bar)""") group4 = parser.add_argument_group("Output Parameters") group4.add_argument( "--template", metavar="TEMPLATE", type=str, help="""Name of the template to be used The template file should be present in ./templates""") # parse the options args = parser.parse_args() dives = parse_config_file(args.config_files) if dives is None: dives = {} if args.gflow: try: settings.GF_LOW = float(eval(args.gflow.strip('%'))) / 100 except ValueError: parser.error("Error while parsing option gflow : %s" % args.gflow) if args.gfhigh: try: settings.GF_HIGH = float(eval(args.gfhigh.strip('%'))) / 100 except ValueError: parser.error("Error while parsing option gfhigh: %s" % args.gfhigh) if args.water: if args.water.lower() == "sea": settings.WATER_DENSITY = settings.SEA_WATER_DENSITY if args.water.lower() == "fresh": settings.WATER_DENSITY = settings.FRESH_WATER_DENSITY if args.altitude: settings.AMBIANT_PRESSURE_SURFACE = altitude_to_pressure(args.altitude) if args.diveconsrate: try: settings.DIVE_CONSUMPTION_RATE = \ float(eval(args.diveconsrate)) / 60 except ValueError: parser.error("Error while parsing option diveconsrate : %s" % args.diveconsrate) if args.decoconsrate: try: settings.DECO_CONSUMPTION_RATE = \ float(eval(args.decoconsrate)) / 60 except ValueError: parser.error("Error while parsing option decoconsrate : %s" % args.decoconsrate) if args.descentrate: try: settings.DESCENT_RATE = float(eval(args.descentrate)) / 60 except ValueError: parser.error("Error while parsing option descentrate : %s" % args.descentrate) if args.ascentrate: try: settings.ASCENT_RATE = float(eval(args.ascentrate)) / 60 except ValueError: parser.error("Error while parsing option ascentrate : %s" % args.ascentrate) if args.model: settings.DECO_MODEL = args.model if args.maxppo2: settings.DEFAULT_MAX_PPO2 = args.maxppo2 if args.minppo2: settings.DEFAULT_MIN_PPO2 = args.minppo2 if args.maxend: settings.DEFAULT_MAX_END = args.maxend if args.samegasfordeco: settings.USE_OC_DECO = False if args.forcesegmenttime: settings.RUN_TIME = False if args.depthcalcmethod == 'simple' or args.depthcalcmethod == 'complex': settings.METHOD_FOR_DEPTH_CALCULATION = args.depthcalcmethod if args.travelswitch == 'late' or args.travelswitch == 'early': settings.TRAVEL_SWITCH = args.travelswitch if args.surfacetemp is not None: settings.SURFACE_TEMP = args.surfacetemp if args.ambiantpressureatsea: print "---------------- %s ---------------------" % \ args.ambiantpressureatsea settings.AMBIANT_PRESSURE_SEA_LEVEL = args.ambiantpressureatsea # try to find tank(s) and segment(s). # if found, add this dive to the (eventually) other dives defined in config # files. # this will be the last dive tanks = {} if args.tanks: for tank in args.tanks: (name, f_o2, f_he, volume, pressure, rule) = tank.split(";") tanks[name] = Tank( float(f_o2), float(f_he), max_ppo2=settings.DEFAULT_MAX_PPO2, tank_vol=float(eval(volume)), tank_pressure=float(eval(pressure)), tank_rule=rule) if tanks == {}: # no tank provided, try to get the previous tanks try: tanks = dives[dives.items()[-1][0]]['tanks'] except (KeyError, IndexError): print "Error : no tank provided for this dive !" sys.exit(0) segments = {} if args.segments: num_seg = 1 for seg in args.segments: (depth, time, tankname, setpoint) = seg.split(";") # looks for tank in tanks try: #seg_name = 'segment%s' % num_seg #print seg_name segments['segment%s' % num_seg] = SegmentDive( float(eval(depth)), float(eval(time)), tanks[tankname], float(setpoint)) except KeyError: parser.error( "Error : tank name (%s) in not found in tank list !" % tankname) except: pass num_seg += 1 segments = OrderedDict(sorted(segments.items(), key=lambda t: t[0])) if args.surfaceinterval: dives['diveCLI'] = {'tanks': tanks, 'segments': segments, 'surface_interval': eval(args.surfaceinterval)} else: dives['diveCLI'] = {'tanks': tanks, 'segments': segments, 'surface_interval': 0} if args.template: settings.TEMPLATE = args.template # returns return (args, dives)
[docs]def main(): """main main uses the parameters, tanks and dives given in config file(s) and/or command line, calculates the dives and return the output in stdout. *Keyword Arguments:* <none> *Return:* <nothing> *Raise:* <nothing> """ if sys.version_info < (2, 7): raise SystemExit("ERROR: This programm needs python 2.7 or greater") activate_debug() settings.__VERSION__ = __version__ (args, dives) = parse_arguments() profiles = [] previous_dive = None for dive in dives: if previous_dive is None: current_dive = Dive( dives[dive]['segments'].values(), dives[dive]['tanks'].values()) else: current_dive = Dive( dives[dive]['segments'].values(), dives[dive]['tanks'].values(), previous_dive ) if dives[dive]['surface_interval']: current_dive.do_surface_interval(dives[dive]['surface_interval']) current_dive.do_dive_without_exceptions() profiles.append(current_dive) previous_dive = current_dive # now, dive exceptins do not stop the program anymore, but can be # displayed in the output template instead. The used MUST take care of # the result. # now calculate no flight time based on the last dive try: current_dive.no_flight_time_wo_exception() except Exception, unhandled_exc: LOGGER.error("Exception while calculating no flight time: %s" % unhandled_exc) # now Prepare the output env = Environment(loader=PackageLoader('dipplanner', 'templates')) tpl = env.get_template(settings.TEMPLATE) text = tpl.render(settings=settings, dives=profiles) print text

Project Versions