# Source code for floodsimilarity.floodsimilarity

import inspect
import json
from joblib import Parallel, delayed
from docstring_parser import parse
from pkg_resources import resource_filename
import re
import floodsimilarity.event_container as ec
import floodsimilarity.event_extraction as ee
# ----------------------------------------------------------------------------


def process_docstring_params_description(para_description):
    """
    Extract additional info from a docstring parameter description.

    The extra info is provided as a JSON object embedded in the description
    text; everything between curly parentheses, e.g. ``{"bla": blub}``, is
    parsed and returned as a dictionary.

    Parameters
    ----------
    para_description : str
        Free-text description of a single docstring parameter.

    Returns
    -------
    dict or None
        The embedded dictionary, or ``None`` when the description contains
        no JSON object.
    """
    # re.search returns None when nothing matches; .group then raises
    # AttributeError, which we treat as "no extra info present".
    try:
        info = re.search(r'\{\"(.*?)\}', para_description, re.S).group(0)
    except AttributeError:
        return None
    return json.loads(info)

def process_docstring_params(docstring):
    """
    Extract the properties of the parameters given by the docstring.

    We use only the attributes that are provided by the parser of the
    docstring module, plus any extra JSON info embedded in each parameter
    description.

    Parameters
    ----------
    docstring : docstring_parser.Docstring
        A parsed docstring with a ``params`` attribute.

    Returns
    -------
    list of dict
        One dictionary per documented parameter with at least the keys
        "name", "type" and "optional".
    """
    para_list = []
    for item in docstring.params:
        doc_dict = {
                "name": item.arg_name,
                "type": item.type_name,
                "optional": item.is_optional
                }
        # Merge the additional info from the description, if any was found.
        extra = process_docstring_params_description(item.description)
        if extra is not None:
            doc_dict.update(extra)
        para_list.append(doc_dict)
    return para_list


def get_processing_options_params(poption):
    """
    Find the method referenced by a processing option and attach the
    parameter metadata extracted from its docstring.

    Parameters
    ----------
    poption : dict
        A single processing option containing a "method" sub-dict with the
        keys "module", "class" and "name".

    Returns
    -------
    dict
        The same ``poption``, enriched in place with a
        ``poption["method"]["params"]`` entry when parameter metadata was
        found.
    """
    temp = poption["method"]
    module = __import__(temp["module"], fromlist=[temp["class"]])
    mod_class = getattr(module, temp["class"])
    f_list = inspect.getmembers(mod_class, predicate=inspect.isfunction)
    # The method name is unique within the class, so at most one member
    # of f_list matches.
    method = next((x for x in f_list if x[0] == temp["name"]), None)
    if method is None:
        # Characteristics that are computed by default, like event
        # duration (ed), have no dedicated method to document.
        return poption
    docstring = parse(inspect.getdoc(method[1]))
    params = process_docstring_params(docstring)
    if params:
        poption["method"]["params"] = params
    return poption


def get_processing_options():
    '''
    Function that returns metadata about computed flood characteristics and
    the methods used to compute them.

    Returns
    -------
    poptions : dict
        A dictionary containing the information about possible processing
        options.

    Examples
    --------
    >>> from floodsimilarity.floodsimilarity import get_processing_options

    The keys of the return value lists the indicators that are available.

    >>> poptions = get_processing_options()
    >>> poptions.keys()
    dict_keys(['api', 'ep_max', 'ep_vol', 'ed', 'epfp', 'esfp', 'eprp', 'sm'])

    Lets assume we need the method to compute the indicator `antecedent
    precipitation index (api)`. The method name for the given indicator is
    given in the `[method][name]` key.

    >>> poptions['api']['method']['name']
    'compute_event_api'
    '''
    with open(resource_filename(__name__, "json/ec_results.json")) as f:
        poptions = json.load(f)
    # Dictionaries are passed by reference, so each option is enriched
    # in place:
    # https://stackoverflow.com/questions/15078519/python-dictionary-passed-as-an-input-to-a-function-acts-like-a-global-in-that-fu
    for option in poptions.values():
        get_processing_options_params(option)
    return poptions
# ----------------------------------------------------------------------------
def init_event_extraction(event_dict, config_file):
    """
    Build an :class:`floodsimilarity.event_extraction.EventExtraction`
    object for the given event and configure it from a JSON config file.
    """
    extraction = ee.EventExtraction(gid=event_dict['gid'],
                                    start_date=event_dict['start_date'],
                                    end_date=event_dict['end_date'])
    # Read the JSON configuration and hand it to the extraction object.
    with open(config_file) as cfg:
        extraction.load_config(json.load(cfg))
    return extraction
def process_event_extraction(event_dict, config_file='./config.json'):
    '''
    Extract flood events based on direct runoff for a given gauging station
    and time period.

    Parameters
    ----------
    event_dict : dict
        Dictionary with the keys ('gid', 'start_date', 'end_date').
    config_file : str, optional
        Path to the JSON configuration file {default: './config.json'}.

    Returns
    -------
    event_dict : :class:`floodsimilarity.event_extraction.EventExtraction`

    Examples
    --------
    >>> from floodsimilarity.floodsimilarity import process_event_extraction

    Lets define the event were are interested in.

    >>> event = {
    ...     'gid': 460,
    ...     'start_date': '2008-01-01T00:00:00.00Z',
    ...     'end_date': '2011-12-31T00:00:00.00Z',
    ... }

    Call the function.

    >>> ee_object = process_event_extraction(event)
    >>> type(ee_object)
    <class 'floodsimilarity.event_extraction.EventExtraction'>

    The results a given in the `results` attribute, a dictionary with
    additional meta-data. For the given example, we have 12 events
    detected-so we have 12 entries where each represent a single event in
    the `ts_json` format.

    >>> len(ee_object.results['events']['data']['events']['value'])
    12
    '''
    ee_object = init_event_extraction(event_dict, config_file)
    ee_object.apply_event_extraction()
    return ee_object
# ----------------------------------------------------------------------------
def init_event_container(event_dict, config_file):
    """
    Build an :class:`floodsimilarity.event_container.EventContainer` for
    the given event and configure it from a JSON config file.
    """
    container = ec.EventContainer(gid=event_dict['gid'],
                                  event_start=event_dict['event_start'],
                                  event_end=event_dict['event_end'],
                                  function_list=event_dict['function_list'])
    # Read the JSON configuration and hand it to the container.
    with open(config_file) as cfg:
        container.load_config(json.load(cfg))
    return container


def event_container_process_data(ec_object):
    """
    Run every requested method on the event container.

    NOTE(review): if lazy properties (class properties loaded on demand)
    are introduced, this loop must stay sequential — parallelizing it would
    likely end in race conditions.
    """
    for entry in ec_object.function_list:
        # getattr yields a bound method of ec_object; calling it records
        # the computed characteristic on the container.
        getattr(ec_object, entry['name'])()
    return ec_object


def process_event(event, config_file):
    '''
    Wrapper function to link the processing steps for a single event.
    '''
    return event_container_process_data(init_event_container(event,
                                                             config_file))
def process_request(fe_request, parallel=True, config_file='./config.json'):
    '''
    Extract flood events properties for a given gauging station and time
    period.

    Parameters
    ----------
    fe_request : list
        A list of dictionaries (keys 'gid', 'event_start', 'event_end',
        'function_list')
    parallel : bool, optional
        Processing the list of requests in parallel {default: True}.
    config_file : str, optional
        Path to the JSON configuration file {default: './config.json'}.

    Returns
    -------
    response_list : list
        Same number of items as provided by the `fe_request`. Each item is
        of class :class:`floodsimilarity.event_container.EventContainer`

    Examples
    --------
    >>> from floodsimilarity.floodsimilarity import process_request

    >>> fl = [
    ...     {
    ...         'name': 'get_event_peak_properties'
    ...     },
    ...     {
    ...         'name': 'compute_event_api',
    ...         'params': {
    ...             'days': 10,
    ...             'k': 0.95
    ...         }
    ...     }
    ... ]

    The request from the front-end (fe).

    >>> fe_request = [
    ...     {
    ...         'gid': 383,
    ...         'event_start': '2013-05-31T00:00:00.00Z',
    ...         'event_end': '2013-06-10T00:00:00.00Z',
    ...         'function_list': fl
    ...     }
    ... ]

    Call the function.

    >>> ec_object_list = process_request(fe_request, parallel=False)
    >>> type(ec_object_list)
    <class 'list'>
    >>> type(ec_object_list[0])
    <class 'floodsimilarity.event_container.EventContainer'>
    >>> ec_object_list[0].results['epfp']['data']['epf']['value']
    275.0
    '''
    # joblib raises for n_jobs == 0, so short-circuit an empty request.
    if not fe_request:
        return []
    if parallel:
        # One worker per requested event.
        num_cores = len(fe_request)
        response_list = Parallel(n_jobs=num_cores)(
            delayed(process_event)(event=event, config_file=config_file)
            for event in fe_request)
    else:
        response_list = [
            process_event(event=event, config_file=config_file)
            for event in fe_request
        ]
    return response_list