# Source code for floodsimilarity.event_container

import floodsimilarity.event as event
import json
import jsonschema
import sys
from datetime import datetime, timedelta
from pkg_resources import resource_filename


class EventContainer(event.Event):
    """
    A class that is defined by a "gid" (the unique number for a gauging
    station) and a start and end date that defines the event.
    """

    def __init__(self, gid, event_start, event_end, function_list):
        """
        Parameters
        ----------
        gid : unique identifier of the gauging station (passed to
            ``event.Event``).
        event_start, event_end : str
            Timestamps in ``%Y-%m-%dT%H:%M:%S.%fZ`` format delimiting the
            event.
        function_list : list of dict
            User-supplied method configuration; each entry is validated
            against ``json/fl_schema.json`` and entries named "default"
            are dropped.
        """
        # Invoke the __init__ of the parent class.
        event.Event.__init__(self, gid)

        # ----------------------------
        # Parse the function list.
        schema_file = resource_filename(__name__, "json/fl_schema.json")
        with open(schema_file) as json_file:
            fl_schema = json.load(json_file)

        # Validate each entry against the schema. NOTE: jsonschema.validate
        # only raises an error if the validation fails.
        # BUGFIX: the original `map(jsonschema.validate, function_list,
        # fl_schema)` never ran (map is lazy in Python 3) and, even if
        # consumed, would have paired entries with the schema dict's *keys*
        # instead of the schema itself.
        for entry in function_list:
            jsonschema.validate(entry, fl_schema)

        # Remove the methods named "default".
        self.function_list = [x for x in function_list if x["name"] != "default"]
        # ----------------------------

        self.event_start = datetime.strptime(event_start,
                                             "%Y-%m-%dT%H:%M:%S.%fZ")
        self.event_end = datetime.strptime(event_end,
                                           "%Y-%m-%dT%H:%M:%S.%fZ")

        # Load the results template shipped with the package; the
        # compute_*/get_* methods below fill in its "value" slots.
        with open(resource_filename(__name__, 'json/ec_results.json')) as f:
            self.results = json.load(f)

        # Number of days between the event start and the event end.
        ed = (self.event_end - self.event_start).days
        self.results["ed"]["data"]["ed"]["value"] = ed

        # The query time span is the event start - 30 days and the event
        # end + 10 days.
        self.query_start = self.event_start - timedelta(days=30)
        self.query_end = self.event_end + timedelta(days=10)

    def __scale_to_basin(self, value, dim="km"):
        """
        Scale `value` by the basin area.

        Parameters
        ----------
        value : numeric
            The quantity to normalize.
        dim : {"km", "m"}, optional
            Whether to divide by the catchment area in square kilometers
            (default) or square meters.

        Raises
        ------
        ValueError
            If `dim` is neither "km" nor "m".  (Previously this called
            sys.exit(), which raised an uncatchable-by-convention
            SystemExit for a plain argument error.)
        """
        # Reproject to an equal-area CRS before computing the area.
        basin = self.basin.to_crs("epsg:3035")
        if dim == "km":
            # Catchment area in square kilometers.
            basin_area = basin['geometry'].area / (1000 * 1000)
        elif dim == "m":
            # Catchment area in square meters.
            basin_area = basin['geometry'].area
        else:
            raise ValueError(
                "Dimension `dim` must be either given in `m` or `km`, "
                "representing square meters and square kilometers "
                "respectively.")
        return value / basin_area

    def __q_volume2height(self, value):
        """
        Convert a discharge in m^3/second to a water height in mm/day.
        """
        # 1 m^3 equates to 1000 liters equates to 1000 mm; multiply by the
        # seconds per day and normalize by the basin area in m^2.
        return self.__scale_to_basin(value * (60 * 60 * 24) * 1000, dim="m")

    # -------------------------------------------------------------------------

    def get_event_peak_properties(self):
        """
        Extract the event peak flow and derived peak properties
        (normalized peak flow, peak height in mm/day, peak date).
        """
        temp = event.ts_json2pd(self.q_ts)
        # Slice the data to event start and event end.
        temp = temp.loc[(temp["date"] >= self.event_start)
                        & (temp["date"] <= self.event_end)]
        max_value = temp["field"].max()
        # Peak flow.
        self.results["epfp"]["data"]["epf"]["value"] = max_value
        # Peak flow normalized with the basin area.
        nepf = self.__scale_to_basin(max_value)
        self.results["epfp"]["data"]["nepf"]["value"] = nepf.values[0]
        # Peak flow in mm/day.
        epfh = self.__q_volume2height(max_value)
        self.results["epfp"]["data"]["eph"]["value"] = epfh.values[0]
        # Date of peak runoff.
        max_date = temp.loc[temp["field"].idxmax()]["date"].strftime("%Y-%m-%d")
        self.results["epfp"]["data"]["epd"]["value"] = max_date

    def get_event_peak_rp(self):
        """
        Extract the peak return period of the event.
        """
        temp = event.ts_json2pd(self.rp_ts)
        # Slice the data to event start and event end.
        temp = temp.loc[(temp["date"] >= self.event_start)
                        & (temp["date"] <= self.event_end)]
        # Peak return period.
        eprp = temp["field"].max()
        self.results["eprp"]["data"]["eprp"]["value"] = int(eprp)

    def get_event_start_flow(self):
        """
        Extract the flow at the event start (raw, area-normalized, and as
        mm/day).
        """
        temp = event.ts_json2pd(self.q_ts)
        # Slice the data to the event start date.
        # NOTE(review): this assumes the time series contains an entry whose
        # timestamp exactly equals event_start — verify against the data
        # source.
        temp = temp.loc[(temp["date"] == self.event_start)]
        # Flow value.
        self.results["esfp"]["data"]["esf"]["value"] = temp["field"].values[0]
        # Flow normalized with the basin area.
        nesf = self.__scale_to_basin(temp["field"].values)
        self.results["esfp"]["data"]["nesf"]["value"] = nesf.values[0]
        # Flow in mm/day.
        esh = self.__q_volume2height(temp["field"].values[0])
        self.results["esfp"]["data"]["esh"]["value"] = esh.values[0]

    def compute_event_api(self, days=10, k=0.95):
        """
        A method that computes the antecedent precipitation index.

        Parameters
        ----------
        days : int, optional
            Number of days to be considered prior the event -
            {"default": 10, "min": 1, "max": 30}
        k : float, optional
            Decay factor between 0 and 1 -
            {"default": 0.95, "min": 0, "max": 1}

        Returns
        -------
        compute_event_api
            API as a real number and api as spatial distributed value at
            event start date
        """
        # ----------------------------------------------------------------
        # Event generation, that's why the event start is the last day of
        # the data we consider for that index.
        end = self.event_start
        start = self.event_start - timedelta(days=days)
        temp_xr = self.precip_data.sel(date=slice(start, end))
        # ----------------------------------------------------------------
        # Apply the api function.
        api_grid = event.api(precip=temp_xr["precipitation"], k=k)
        self.results["api"]["data"]["api_index"]["value"] = \
            api_grid.mean().item()
        # ----------------------------------------------------------------
        self.results["api"]["data"]["api_map"]["value"] = api_grid

    def compute_event_precip_max(self, days=3):
        """
        A method that computes the event precipitation as the maximum of
        accumulated precipitation in a x-day period.

        Parameters
        ----------
        days : int, optional
            Number of days for accumulated precipitation -
            {"default": 3, "min": 1, "max": 10}

        Returns
        -------
        compute_event_precip_max
            Maximum x-day sum as a real number representing the event
            precipitation in mm.
        """
        # ----------------------------------------------------------------
        temp_xr = self.precip_data.sel(date=slice(self.event_start,
                                                  self.event_end))
        # ----------------------------------------------------------------
        # Apply the ep function.
        precip_sum_max = event.ep_max(precip=temp_xr["precipitation"],
                                      w=days)
        self.results["ep_max"]["data"]["epm"]["value"] = precip_sum_max.item()

    def compute_event_precip_vol(self):
        """
        A method that computes the total event precipitation between event
        start and event end.

        Returns
        -------
        compute_event_precip_vol
            Total precipitation during the event in mm.
        """
        # ----------------------------------------------------------------
        temp_xr = self.precip_data.sel(date=slice(self.event_start,
                                                  self.event_end))
        # ----------------------------------------------------------------
        # Total event precipitation.
        precip_vol = event.ep_volume(precip=temp_xr["precipitation"])
        self.results["ep_vol"]["data"]["epv"]["value"] = precip_vol.item()

    def get_event_start_sm(self):
        """
        A method that returns the soilmoisture at the event start.

        Why all the fuss with the date and time below: we had the case in
        which the timestamp of the data we query is different from the
        timestamp of `self.event_start`. Even though they are on the same
        day, the slicing did not return anything. Like this, we query all
        times on the same day, effectively ignoring the timestamp.
        """
        year = self.event_start.year
        month = self.event_start.month
        day = self.event_start.day
        # Cover the whole calendar day of the event start.
        start = datetime(year, month, day, 0, 0, 0, 0)
        end = datetime(year, month, day, 23, 59, 59, 999999)
        # ----------------------------------------------------------------
        temp_xr = self.soilmoisture_data.sel(date=slice(start, end))
        # Compute the spatial mean.
        sm_mean = temp_xr["soilmoisture"].mean()
        self.results["sm"]["data"]["sm"]["value"] = sm_mean.item()
        # ----------------------------------------------------------------
        self.results["sm"]["data"]["sm_map"]["value"] = temp_xr["soilmoisture"]