Source code for com4FlowPy.flowClass

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
    Functions for calculations at the 'cell level' (vgl. D'Amboise et al., 2022)
"""

import numpy as np
import math


[docs]class Cell:
[docs] def __init__( self, rowindex, colindex, dem_ng, cellsize, flux, z_delta, parent, alpha, exp, flux_threshold, max_z_delta, startcell, fluxDistOldVersionBool=False, FSI=None, forestParams=None, ): """constructor for the Cell class that describes a raster cell that is hit by the GMF. the constructor function is called every time a new instance of type 'Cell' is initialized. NOTE/TODO: parent can be of different data types still, maybe split into two separate variables * bool --> isStart * Cell --> startCell """ self.rowindex = rowindex # index of the Cell in row-direction (i.e. local y-index in the calculation domain) self.colindex = colindex # index of the Cell in column-direction (i.e. local x-index in the calculation domain) self.dem_ng = dem_ng # elevation values in the 3x3 neigbourhood around the Cell self.altitude = dem_ng[1, 1] # elevation value of the cell (central cell of 3x3 neighbourhood) self.cellsize = cellsize # cellsize in meters self.tan_beta = np.zeros_like(self.dem_ng) self.dist = np.zeros_like(self.dem_ng) self.persistence = np.zeros_like(self.dem_ng) self.r_t = np.zeros_like(self.dem_ng) self.no_flow = np.ones_like(self.dem_ng) self.flux = flux self.fluxDep = 0 self.z_delta = z_delta self.alpha = float(alpha) self.exp = int(exp) self.max_z_delta = float(max_z_delta) self.flux_threshold = float(flux_threshold) self.fluxDistOldVersionBool = fluxDistOldVersionBool self.tanAlpha = np.tan(np.deg2rad(self.alpha)) # moved to constructor, so this doesn't have to be calculated on # every iteration of calc_z_delta(self) self.min_distance = 0 # minimal distance to start-cell (i.e. along shortest path) min_distance >= self.minDistXYZ = 0 # minimal distance to start-cell (Actual 3D lenght, not projected!!) self.max_distance = 0 # NOTE: self.max_distance is never used - maybe remove!? self.min_gamma = 0 # NOTE: self.min_gamma (assumingly minimal travel angle to cell) never used - maybe remove!? self.max_gamma = 0 self.sl_gamma = 0 self._SQRT2 = np.sqrt(2.0) self._RAD90 = np.deg2rad(90.0) # NOTE: Forest Interaction included here # if FSI != None AND forestParams != None - then self.ForestBool = True and forestParams and # FSI are accordingly initialized if (FSI is not None) and (forestParams is not None): self.forestBool = True self.forestModule = forestParams["forestModule"] self.skipForestDist = forestParams["skipForestDist"] # forestInteraction: self.forestInteraction = forestParams["forestInteraction"] if self.forestInteraction: if FSI > 0: self.isForest = 1 else: self.isForest = 0 self.forestIntCount = self.isForest else: self.forestInteraction = False if (self.forestModule == "forestFriction") or (self.forestModule == "forestDetrainment"): self.FSI = FSI self.maxAddedFrictionForest = forestParams["maxAddedFriction"] self.minAddedFrictionForest = forestParams["minAddedFriction"] self.noFrictionEffectV = forestParams["velThForFriction"] self.maxAddedDetrainmentForest = forestParams["maxDetrainment"] self.minAddedDetrainmentForest = forestParams["minDetrainment"] self.noDetrainmentEffectV = forestParams["velThForDetrain"] _vThFr = self.noFrictionEffectV _vThDe = self.noDetrainmentEffectV _sqrt2xG = self._SQRT2 * 9.81 self.noFrictionEffectZDelta = (_vThFr * _vThFr) / _sqrt2xG self.noDetrainmentEffectZdelta = (_vThDe * _vThDe) / _sqrt2xG elif self.forestModule == "forestFrictionLayer": if forestParams["fFrLayerType"] == "absolute": self.AlphaFor = FSI elif forestParams["fFrLayerType"] == "relative": self.AlphaFor = self.alpha + FSI self.AlphaFor = max(self.AlphaFor, self.alpha) # Friction in Forest can't be lower than without forest self.tanAlphaFor = np.tan(np.deg2rad(self.AlphaFor)) # NOTE: This is a quick hack to check if all values for Detrainment are set to 0 (as provided in the # .ini file) # if this is the case, then the self.forest_detrainment function does not have to be called inside # self.calc_distribution # TO-DO: clean this up and handle it better if (self.forestModule == "forestFriction") or (self.forestModule == "forestFrictionLayer"): self.forestDetrainmentBool = False elif ( (self.maxAddedDetrainmentForest == 0) and (self.minAddedDetrainmentForest == 0) and (self.noDetrainmentEffectV == 0) ): self.forestDetrainmentBool = False else: self.forestDetrainmentBool = True else: self.forestBool = False self.forestInteraction = False self.FSI = 0.0 self.maxAddedFrictionForest = 0 self.minAddedFrictionForest = 0 self.noFrictionEffectV = 0 self.maxAddedDetrainmentForest = 0 self.minAddedDetrainmentForest = 0 self.noDetrainmentEffectV = 0 self.noFrictionEffectZDelta = 0 self.noDetrainmentEffectZdelta = 0 if type(startcell) == bool: # if a boolean variable (i.e.'True') is passed to the constructor self.is_start = True # set is_start to True else: self.startcell = startcell # give startcell to cell self.is_start = False # set is_start to False self.lOfParents = [] if type(parent) == Cell: self.lOfParents.append(parent) if self.forestInteraction: self.forestIntCount += parent.forestIntCount
[docs] def add_os(self, flux): """ Adds flux to 'existing' flux Parameters ----------- flux: float added flux """ self.flux += flux
[docs] def add_parent(self, parent): """ Adds parent to parents list and optionally the forest interaction value of the parent Parameters ----------- parent: class Cell Cell object to add to the list of parent cells 'lOfParents' """ self.lOfParents.append(parent) if self.forestInteraction: # check if new/ younger parent has a lower forest interaction number # than the older one -> take minimum! if parent.forestIntCount < (self.forestIntCount - self.isForest): self.forestIntCount = parent.forestIntCount + self.isForest
[docs] def calcDistMin(self, calc3D=False): """ function calculates the projected horizontal (self.min_distance) and 3D (self.minDistXYZ) length of the shortest flow path from the start-cell to the current cell. Parameters ----------- calc3D: bool if True, the 3D distance is computed additionally (default: False) """ if calc3D: _ldistMin = [] _lDistMinXYZ = [] for parent in self.lOfParents: _dx = abs(parent.colindex - self.colindex) * self.cellsize _dy = abs(parent.rowindex - self.rowindex) * self.cellsize _dz = abs(parent.altitude - self.altitude) _ldistMin.append(math.sqrt(_dx * _dx + _dy * _dy) + parent.min_distance) _lDistMinXYZ.append(math.sqrt(_dy*_dy + _dy*_dy + _dz*_dz) + parent.minDistXYZ) self.min_distance = np.amin(_ldistMin) self.minDistXYZ = np.amin(_lDistMinXYZ) else: _ldistMin = [] for parent in self.lOfParents: _dx = abs(parent.colindex - self.colindex) * self.cellsize _dy = abs(parent.rowindex - self.rowindex) * self.cellsize _ldistMin.append(math.sqrt(_dx * _dx + _dy * _dy) + parent.min_distance) self.min_distance = np.amin(_ldistMin)
[docs] def calc_fp_travelangle(self): """function calculates the travel-angle along the shortest flow-path from the start-cell to the current cell. The travel-angle along the shortest flow-path is equivalent to the maximum travel angle along all paths from the startcell to this cell. """ _dh = self.startcell.altitude - self.altitude # elevation difference from cell to start-cell self.max_gamma = np.rad2deg(np.arctan(_dh / self.min_distance))
[docs] def calc_sl_travelangle(self): """function calculates the travel-angle between the start cell and the current cell using the shortest connection or straight line (sl) between the two cells. The travel angle calculated with this shortest horizontal distance 'sl_gamma' is always larger or equal to the travel angle 'max_gamma', which is calculated along the (potentially curved) flow path (fp). (vgl. Meißl, 1998) """ _dx = abs(self.startcell.colindex - self.colindex) _dy = abs(self.startcell.rowindex - self.rowindex) _dh = self.startcell.altitude - self.altitude _ds = math.sqrt(_dx * _dx + _dy * _dy) * self.cellsize self.sl_gamma = np.rad2deg(np.arctan(_dh / _ds))
[docs] def calc_z_delta(self): """ function calculates zDelta (velocity or energy line height) to the eligible neighbours NOTE: forestFriction related mechanics are implemented here! """ self.z_delta_neighbour = np.zeros((3, 3)) self.z_gamma = self.altitude - self.dem_ng ds = np.array([[self._SQRT2, 1, self._SQRT2], [1, 0, 1], [self._SQRT2, 1, self._SQRT2]]) if (not self.is_start): if (not self.forestBool): self.calcDistMin() else: self.calcDistMin(calc3D=True) if self.forestBool: if self.forestModule == "forestFrictionLayer": if (not self.is_start) and (self.skipForestDist < self.minDistXYZ): _tanAlpha = self.tanAlphaFor else: _tanAlpha = self.tanAlpha if self.forestModule in ["forestFriction", "forestDetrainment"]: if (not self.is_start) and (self.FSI > 0.) and (self.skipForestDist < self.minDistXYZ): # if forestBool, we assume that forestFriciton is activated # and if FSI > 0 then we also calculate _tanAlpha with forestEffect # NOTE: We also don't assume a forest Effect on potential Start Zells, since this should # ideally be handled by a separate release-area algorithm in the pre-processing # NOTE-TODO: The rest of this implementation is also just copy+pasted from 'foreste_detraiment' # branch and not yet fully tested!! if self.z_delta < self.noFrictionEffectZDelta: # friction at rest v=0 would be applied to start cells _rest = self.maxAddedFrictionForest * self.FSI # rise over run _slope = (_rest - self.minAddedFrictionForest) / (0 - self.noFrictionEffectZDelta) # y = mx + b, shere z_delta is the x friction = max(self.minAddedFrictionForest, _slope * self.z_delta + _rest) _alpha_calc = self.alpha + max(0, friction) # NOTE: not sure what this does, seems redundant! else: _alpha_calc = self.alpha + self.minAddedFrictionForest _tanAlpha = np.tan(np.deg2rad(_alpha_calc)) else: _tanAlpha = self.tanAlpha else: # else simply use tanAlpha _tanAlpha = self.tanAlpha self.z_alpha = ds * self.cellsize * _tanAlpha self.z_delta_neighbour = self.z_delta + self.z_gamma - self.z_alpha self.z_delta_neighbour[self.z_delta_neighbour < 0] = 0 self.z_delta_neighbour[self.z_delta_neighbour > self.max_z_delta] = self.max_z_delta
[docs] def calc_tanbeta(self): """calculates the normalized terrain based routing """ _ds = np.array([[self._SQRT2, 1, self._SQRT2], [1, 1, 1], [self._SQRT2, 1, self._SQRT2]]) _distance = _ds * self.cellsize _beta = np.arctan((self.altitude - self.dem_ng) / _distance) + self._RAD90 self.tan_beta = np.tan(_beta / 2) self.tan_beta[self.z_delta_neighbour <= 0] = 0 self.tan_beta[self.persistence <= 0] = 0 self.tan_beta[1, 1] = 0 if abs(np.sum(self.tan_beta)) > 0: self.r_t = self.tan_beta**self.exp / np.sum(self.tan_beta**self.exp)
[docs] def calc_persistence(self): """ calculates persistence-based routing """ self.persistence = np.zeros_like(self.dem_ng) if self.is_start: self.persistence += 1 elif self.lOfParents[0].is_start: self.persistence += 1 else: for parent in self.lOfParents: dx = parent.colindex - self.colindex dy = parent.rowindex - self.rowindex self.no_flow[dy + 1, dx + 1] = 0 # 3x3 Matrix of ones, every parent gets a 0, no flow to a parent field maxweight = parent.z_delta # Old Calculation if dx == -1: if dy == -1: self.persistence[2, 2] += maxweight self.persistence[2, 1] += 0.707 * maxweight self.persistence[1, 2] += 0.707 * maxweight if dy == 0: self.persistence[1, 2] += maxweight self.persistence[2, 2] += 0.707 * maxweight self.persistence[0, 2] += 0.707 * maxweight if dy == 1: self.persistence[0, 2] += maxweight self.persistence[0, 1] += 0.707 * maxweight self.persistence[1, 2] += 0.707 * maxweight if dx == 0: if dy == -1: self.persistence[2, 1] += maxweight self.persistence[2, 0] += 0.707 * maxweight self.persistence[2, 2] += 0.707 * maxweight if dy == 1: self.persistence[0, 1] += maxweight self.persistence[0, 0] += 0.707 * maxweight self.persistence[0, 2] += 0.707 * maxweight if dx == 1: if dy == -1: self.persistence[2, 0] += maxweight self.persistence[1, 0] += 0.707 * maxweight self.persistence[2, 1] += 0.707 * maxweight if dy == 0: self.persistence[1, 0] += maxweight self.persistence[0, 0] += 0.707 * maxweight self.persistence[2, 0] += 0.707 * maxweight if dy == 1: self.persistence[0, 0] += maxweight self.persistence[0, 1] += 0.707 * maxweight self.persistence[1, 0] += 0.707 * maxweight
[docs] def calc_distribution(self): """ calculates flux and zdelta that is distributed to the adjacent cells Returns ----------- tuple list of row, col, flux, zdelta of adjacent cells that receive flux/zdelta """ self.calc_z_delta() self.calc_persistence() self.persistence *= self.no_flow self.calc_tanbeta() # print(self.persistence) if not self.is_start: # FOREST-Detrainment --> we only assume detrainment if the cell is not a start-cell if self.forestBool and self.forestDetrainmentBool: self.forest_detrainment() self.calc_fp_travelangle() self.calc_sl_travelangle() # FOREST-Detrainment # here we subtract the detrainment from the flux before moving flux to new cells. if self.forestBool and self.forestDetrainmentBool: # NOTE-TODO: check/test what the hard-coded 0.0003 does here or if this should be # substituted by self.flux_threshold???? self.flux = max(0.0003, self.flux - self.detrainment) threshold = self.flux_threshold if np.sum(self.r_t) > 0: self.dist = (self.persistence * self.r_t) / np.sum(self.persistence * self.r_t) * self.flux # This handles (local) flux re-distribution if n cells are below threshold, but lager 0 and m cells are # still above threshold # NOTE: this only works if "0 < n < 8" AND "0 < m < 8", the case where # "0<n<8" AND "m=0" is not handled!!! (in this case flux is "lost") if self.fluxDistOldVersionBool: """ legacy version of code (pre 01-2025) WITH BUG (is used if self.fluxDistOldVersionBool) here 'count' returns the number of neighbor/child cells that receive flux > 0, but below the flux_threshold. """ count = ((0 < self.dist) & (self.dist < threshold)).sum() else: """ default/correct version with FIXED BUG (used if self.fluxDistOldVersionBool == False) her 'count' returns the number of neighbor/child cells which receive flux >= the flux_threshold. """ count = (self.dist >= threshold).sum() # massToDistribute ~ sum of flux of child cells below the flux_threshold mass_to_distribute = np.sum(self.dist[self.dist < threshold]) if mass_to_distribute > 0 and count > 0: # local flux redistribution to eligible child cells self.dist[self.dist >= threshold] += mass_to_distribute / count self.dist[self.dist < threshold] = 0 if np.sum(self.dist) != self.flux and count > 0: # correction/flux conservation for potential rounding losses or gains # (self.flux - np.sum(self.dist)) will either be negative or positive # depending on the direction of the rounding error self.dist[self.dist >= threshold] += (self.flux - np.sum(self.dist)) / count if count == 0: # if all child cells are below flux_threshold, the flux is deposited # TODO: check alternatives (e.g. 'global' redistribution or within generations) self.fluxDep = self.flux row_local, col_local = np.where(self.dist >= threshold) return ( self.rowindex - 1 + row_local, self.colindex - 1 + col_local, self.dist[row_local, col_local], self.z_delta_neighbour[row_local, col_local], )
[docs] def forest_detrainment(self): """ linear decrease of forest effect with regard to alpha increase and kinetic energy height This is the detrainment routine for forest. It should reduce the routing flux of the avalanche. NOTE: This is more or less copied+pasted from 'foreste_detrainment' branch in avaframe/FlowPy repo TODO: Definitely re-check/test this function!! """ _noDetrainmentEffectZdelta = self.noDetrainmentEffectZdelta # detrainment effect scaled to forest, 0 for non-forest _rest = (self.maxAddedDetrainmentForest * self.FSI) # rise over run (should be negative slope) slope = (_rest - self.minAddedDetrainmentForest) / (0 - _noDetrainmentEffectZdelta) # y=mx+b, where zDelta is x self.detrainment = max(self.minAddedDetrainmentForest, slope * self.z_delta + _rest)