Source code for in3Utils.geoTrans

""" Opperations and transformations of rasters and lines
"""

import logging
import math
import numpy as np
import scipy as sp
import copy

# Local imports
import avaframe.in2Trans.ascUtils as IOf

# create local logger
log = logging.getLogger(__name__)


[docs]def projectOnRaster(dem, Points, interp='bilinear'): """ Projects the points Points on Raster using a bilinear or nearest interpolation and returns the z coord (no for loop) Input : Points: list of points (x,y) 2 rows as many columns as Points Output: PointsZ: list of points (x,y,z) 3 rows as many columns as Points """ header = dem['header'] rasterdata = dem['rasterData'] xllc = header.xllcenter yllc = header.yllcenter cellsize = header.cellsize xcoor = Points['x'] ycoor = Points['y'] zcoor, ioob = projectOnGrid(xcoor, ycoor, rasterdata, csz=cellsize, xllc=xllc, yllc=yllc, interp=interp) Points['z'] = zcoor return Points, ioob
[docs]def projectOnGrid(x, y, Z, csz=1, xllc=0, yllc=0, interp='bilinear'): """ Projects the points Points on Raster using a bilinear or nearest interpolation and returns the z coord Input : Points: (x, y) coord of the points Output: PointsZ: z coord of the points ioob number of out of bounds indexes """ nrow, ncol = np.shape(Z) # initialize outputs z = np.ones((np.shape(x)))*np.NaN dx = np.ones((np.shape(x)))*np.NaN dy = np.ones((np.shape(x)))*np.NaN f11 = np.ones((np.shape(x)))*np.NaN f12 = np.ones((np.shape(x)))*np.NaN f21 = np.ones((np.shape(x)))*np.NaN f22 = np.ones((np.shape(x)))*np.NaN # find coordinates in normalized ref (origin (0,0) and cellsize 1) Lxx = (x - xllc) / csz Lyy = (y - yllc) / csz Lx = copy.deepcopy(Lxx) Ly = copy.deepcopy(Lyy) # find out of bound indexes if interp == 'nearest': Lx[np.where((Lxx <= -0.5))] = np.NaN Ly[np.where((Lxx <= -0.5))] = np.NaN Lx[np.where(Lxx >= (ncol-0.5))] = np.NaN Ly[np.where(Lxx >= (ncol-0.5))] = np.NaN Lx[np.where(Lyy <= -0.5)] = np.NaN Ly[np.where(Lyy <= -0.5)] = np.NaN Lx[np.where(Lyy >= (nrow-0.5))] = np.NaN Ly[np.where(Lyy >= (nrow-0.5))] = np.NaN elif interp == 'bilinear': Lx[np.where((Lxx < 0))] = np.NaN Ly[np.where((Lxx < 0))] = np.NaN Lx[np.where(Lxx >= (ncol-1))] = np.NaN Ly[np.where(Lxx >= (ncol-1))] = np.NaN Lx[np.where(Lyy < 0)] = np.NaN Ly[np.where(Lyy < 0)] = np.NaN Lx[np.where(Lyy >= (nrow-1))] = np.NaN Ly[np.where(Lyy >= (nrow-1))] = np.NaN # find index of index of not nan value mask = ~np.isnan(Lx+Ly) maskInd = np.argwhere(~np.isnan(Lx+Ly))[:, 0] itot = len(Lx) iinb = len(maskInd) ioob = itot - iinb # find coordinates of the 4 nearest cornes on the raster Lx0 = np.floor(Lx).astype('int') Ly0 = np.floor(Ly).astype('int') Lx1 = Lx0 + 1 Ly1 = Ly0 + 1 # prepare for bilinear interpolation(do not take out of bound into account) if interp == 'nearest': dx[mask] = np.round(Lx[mask]) dy[mask] = np.round(Ly[mask]) z[mask] = Z[dy[mask].astype('int'), dx[mask].astype('int')] elif interp == 'bilinear': dx[mask] = Lx[mask] - Lx0[mask] dy[mask] = Ly[mask] - Ly0[mask] f11[mask] = Z[Ly0[mask], Lx0[mask]] f12[mask] = Z[Ly1[mask], Lx0[mask]] f21[mask] = Z[Ly0[mask], Lx1[mask]] f22[mask] = Z[Ly1[mask], Lx1[mask]] # using bilinear interpolation on the cell z = f11*(1-dx)*(1-dy) + f21*dx*(1-dy) + f12*(1-dx)*dy + f22*dx*dy return z, ioob
[docs]def resizeData(raster, rasterRef): """ Reproject raster on a grid of shape rasterRef Parameters ---------- raster : dict raster dictionary rasterRef : dict reference raster dictionary Returns ------- data : 2D numpy array reprojected data dataRef : 2D numpy array reference data """ if IOf.isEqualASCheader(raster['header'], rasterRef['header']): return raster['rasterData'], rasterRef['rasterData'] else: headerRef = rasterRef['header'] ncols = headerRef.ncols nrows = headerRef.nrows csz = headerRef.cellsize xllc = headerRef.xllcenter yllc = headerRef.yllcenter xgrid = np.linspace(xllc, xllc+(ncols-1)*csz, ncols) ygrid = np.linspace(yllc, yllc+(nrows-1)*csz, nrows) X, Y = np.meshgrid(xgrid, ygrid) Points = {'x': X, 'y': Y} Points, _ = projectOnRaster(raster, Points, interp='bilinear') raster['rasterData'] = Points['z'] return raster['rasterData'], rasterRef['rasterData']
[docs]def remeshDEM(cfg, dem): """ change DEM cell size by reprojecting on a new grid Interpolation is based on griddata with a cubic method. Here would be the place to change the order of the interpolation or to switch to another interpolation method. Parameters ---------- cfg : configParser meshCellSizeThreshold : threshold under which no remeshing is done meshCellSize : desired cell size dem : dict dem dictionary Returns ------- dem : dict reprjected dem dictionary """ cszThreshold = cfg.getfloat('meshCellSizeThreshold') cszNew = cfg.getfloat('meshCellSize') # read dem header headerDEM = dem['header'] nColsDEM = headerDEM.ncols nRowsDEM = headerDEM.nrows xllcenter = headerDEM.xllcenter yllcenter = headerDEM.yllcenter cszDEM = headerDEM.cellsize # remesh if input DEM size does not correspond to the computational cellSize if np.abs(cszNew - cszDEM) > cszThreshold: log.info('Remeshing the input DEM (of cell size %.4g m) to a cell size of %.4g m' % (cszDEM, cszNew)) x = np.linspace(0, (nColsDEM-1)*cszDEM, nColsDEM) + xllcenter y = np.linspace(0, (nRowsDEM-1)*cszDEM, nRowsDEM) + yllcenter xGrid, yGrid = np.meshgrid(x, y) xGrid = xGrid.flatten() yGrid = yGrid.flatten() z = dem['rasterData'] zCopy = np.copy(z) zCopy = zCopy.flatten() mask = np.where(~np.isnan(zCopy)) xGrid = xGrid[mask] yGrid = yGrid[mask] z = zCopy[mask] headerRemeshed = IOf.cASCheader() headerRemeshed.cellsize = cszNew nColsRemeshed = np.floor(nColsDEM * cszDEM / cszNew) nRowsRemeshed = np.floor(nRowsDEM * cszDEM / cszNew) headerRemeshed.ncols = int(nColsRemeshed) headerRemeshed.nrows = int(nRowsRemeshed) headerRemeshed.xllcenter = xllcenter headerRemeshed.yllcenter = yllcenter headerRemeshed.noDataValue = headerDEM.noDataValue dem['header'] = headerRemeshed xNew = np.linspace(0, (nColsRemeshed-1)*cszNew, int(nColsRemeshed)) + xllcenter yNew = np.linspace(0, (nRowsRemeshed-1)*cszNew, int(nRowsRemeshed)) + yllcenter xNewGrid, yNewGrid = np.meshgrid(xNew, yNew) zNew = sp.interpolate.griddata((xGrid, yGrid), z, (xNewGrid, yNewGrid), method='cubic', fill_value=headerDEM.noDataValue) dem['rasterData'] = zNew return dem
[docs]def prepareLine(dem, avapath, distance=10, Point=None): """ 1- Resample the avapath line with a max intervall of distance=10m between points (projected distance on the horizontal plane). 2- Make avalanche profile out of the path (affect a z value using the dem) 3- Get projection of points on the profil (closest point) Parameters ---------- - a dem dictionary - a avapath line dictionary - a resampling Distance - a point dictionary (optional, can contain several point) Returns ------- - the resampled avaprofile - the projection of the point on the profile (if several points were give in input, only the closest point to the profile is projected) """ xcoor = avapath['x'] ycoor = avapath['y'] xcoornew = np.array([xcoor[0]]) ycoornew = np.array([ycoor[0]]) s = np.array([0]) # curvilinear coordinate allong the path # loop on the points of the avapath for i in range(np.shape(xcoor)[0] - 1): Vx = xcoor[i + 1] - xcoor[i] Vy = ycoor[i + 1] - ycoor[i] D = np.sqrt(Vx**2 + Vy**2) nd = int(np.floor(D / distance) + 1) # Resample each segment S0 = s[-1] for j in range(1, nd + 1): xn = j / (nd) * Vx + xcoor[i] yn = j / (nd) * Vy + ycoor[i] xcoornew = np.append(xcoornew, xn) ycoornew = np.append(ycoornew, yn) s = np.append(s, S0 + D * j / nd) ResampAvaPath = avapath ResampAvaPath['x'] = xcoornew ResampAvaPath['y'] = ycoornew ResampAvaPath, _ = projectOnRaster(dem, ResampAvaPath) ResampAvaPath['s'] = s AvaProfile = ResampAvaPath # find split point by computing the distance to the line if Point: projPoint = findSplitPoint(AvaProfile, Point) else: projPoint = None return AvaProfile, projPoint
[docs]def findSplitPoint(AvaProfile, Points): """ Finds the closest point in Points to the AvaProfile and returns its projection on AvaProfile. """ xcoor = AvaProfile['x'] ycoor = AvaProfile['y'] Dist = np.empty((0)) IndSplit = np.empty((0)) for i in range(len(Points['x'])): dist = np.sqrt((xcoor - Points['x'][i])**2 + (ycoor - Points['y'][i])**2) indSplit = np.argmin(dist) IndSplit = np.append(IndSplit, indSplit) Dist = np.append(Dist, dist[indSplit]) ind = np.argmin(Dist) indSplit = int(IndSplit[ind]) projPoint = {} projPoint['x'] = AvaProfile['x'][indSplit] projPoint['y'] = AvaProfile['y'][indSplit] projPoint['z'] = AvaProfile['z'][indSplit] projPoint['s'] = AvaProfile['s'][indSplit] projPoint['indSplit'] = indSplit return projPoint
[docs]def checkProfile(AvaProfile, projSplitPoint=None): """ check that the avalanche profiles goes from top to bottom """ if projSplitPoint: indSplit = projSplitPoint['indSplit'] if AvaProfile['z'][-1] > AvaProfile['z'][0]: log.info('Profile reversed') AvaProfile['x'] = np.flip(AvaProfile['x']) AvaProfile['y'] = np.flip(AvaProfile['y']) AvaProfile['z'] = np.flip(AvaProfile['z']) try: L = AvaProfile['s'][-1] AvaProfile['s'] = L - np.flip(AvaProfile['s']) except KeyError: pass if projSplitPoint: indSplit = len(AvaProfile['x']) - indSplit - 1 projSplitPoint['indSplit'] = indSplit AvaProfile['indSplit'] = indSplit else: projSplitPoint = None AvaProfile['indSplit'] = None return projSplitPoint, AvaProfile
[docs]def findAngleProfile(tmp, ds, dsMin): """ Find the beta point: first point under the beta value given in prepareAngleProfile. Make sure that at least dsMin meters behind the point are also under the beta value otherwise keep searching Parameters ---------- tmp: 1D numpy array index array of point in profile with slope bellow the given beta angle and bellow the splitPoint ds: 1D numpy array distance between points discribed in tmp dsMin: float threshold distance [m] for looking for the beta point (at least dsMin meters below beta degres) Returns ------- idsAnglePoint: int index of beta point """ noBetaFoundMessage = 'No Beta point found. Check your pathAB.shp and splitPoint.shp.' i = 0 condition = True if np.size(tmp) == 0: raise IndexError(noBetaFoundMessage) while (i <= np.size(tmp) and condition): ind = tmp[i] j = 0 dist = 0 while dist < dsMin: try: condition = condition and (tmp[i+j+1] == ind+j+1) dist = dist + ds[i + j] except IndexError: raise IndexError(noBetaFoundMessage) if not condition: i = i + j + 1 break j = j + 1 if condition: idsAnglePoint = ind break condition = True return idsAnglePoint
[docs]def prepareAngleProfile(beta, AvaProfile): """Prepare inputs for findAngleProfile function Read profile (s, z), compute the slope Angle look for points for which the slope is under the given Beta value and that are located downstreem of the splitPoint Parameters ---------- beta: float beta angle in degrees AvaProfile: dict profile dictionary, s, z and a split point(optional) Returns ------- angle: 1D numpy array profile angle tmp: 1D numpy array index array of point in profile with slope bellow the given beta angle and bellow the splitPoint ds: 1D numpy array distance between points discribed in tmp """ s = AvaProfile['s'] z = AvaProfile['z'] try: indSplit = AvaProfile['indSplit'] CuSplit = s[indSplit] except KeyError: log.warning('No split Point given!') CuSplit = 0 ds = np.abs(s - np.roll(s, 1)) dz = np.roll(z, 1) - z ds[0] = 0.0 dz[0] = 0.0 angle = np.rad2deg(np.arctan2(dz, ds)) # get all values where Angle < 10 but >0 # get index of first occurance and go one back to get previous value # (i.e. last value above 10 deg) # tmp = x[(angle < 10.0) & (angle > 0.0) & (x > 450)] tmp = np.where((angle <= beta) & (s > CuSplit)) tmp = np.asarray(tmp).flatten() ds = ds[tmp] return angle, tmp, ds
[docs]def isCounterClockWise(path): """ Determines if a polygon path is mostly clockwise or counter clockwise Parameters ---------- path: matplotlib.path polygon path Returns ------- 1 if the path is counter clockwise, 0 otherwise https://stackoverflow.com/a/45986805/15887086 """ v = path.vertices-path.vertices[0, :] a = np.arctan2(v[1:, 1], v[1:, 0]) return (a[1:] >= a[:-1]).astype(int).mean() >= 0.5
[docs]def findCellsCrossedByLineBresenham(x0, y0, x1, y1, cs): # normalize Cellsize cs to 1 x0 = round(x0/cs) x1 = round(x1/cs) y0 = round(y0/cs) y1 = round(y1/cs) dx = abs(x1-x0) dy = abs(y1-y0) sx = np.sign(x1-x0) # step in x direction sy = np.sign(y1-y0) # step in y direction x = x0 y = y0 n = dx + dy ddx = 2 * dx ddy = 2 * dy if ddx >= ddy: err = dx N = dx else: err = dy N = dy z = [] n = 0 while n < N: errprev = err z.append([x*cs, y*cs]) if ddx >= ddy: x += sx err += ddy if err > ddx: y += sy err -= ddx if (err + errprev) < ddx: z.append([x*cs, (y-sy)*cs]) elif (err + errprev) > ddx: z.append([(x-sx)*cs, y*cs]) # else: # z.append([x*cs, (y-sy)*cs]) # z.append([(x-sx)*cs, y*cs]) else: y += sy err += ddx if err > ddy: x += sx err -= ddy if (err + errprev) < ddy: z.append([(x-sx)*cs, y*cs]) elif (err + errprev) > ddy: z.append([x*cs, (y-sy)*cs]) # else: # z.append([x*cs, (y-sy)*cs]) # z.append([(x-sx)*cs, y*cs]) n = n + 1 z.append([x*cs, y*cs]) return z
[docs]def path2domain(xyPath, rasterTransfo): """ path2domain Creates a domain (irregular raster) along a path, given the path polyline, a domain width and a raster cellsize Usage: [rasterTransfo] = path2domain(xyPath, rasterTransfo) Input: -xyPath: Polyline Coordinates -rasterTransfo['w']: Domain width -rasterTransfo['xllc']: xllc -rasterTransfo['yllc']: yllc -rasterTransfo['cellsize']: cellsize Output: xp, yp Arrays determining a path of width w along a polyline -rasterTransfo['DBXl']: x coord of the left boundary -rasterTransfo['DBXr']: x coord of the right boundary -rasterTransfo['DBYl']: y coord of the left boundary -rasterTransfo['DBYr']: y coord of the right boundary [Fischer2013] Fischer, Jan-Thomas. (2013). A novel approach to evaluate and compare computational snow avalanche simulation. Natural Hazards and Earth System Sciences. 13. 1655-. 10.5194/nhess-13-1655-2013. Uwe Schlifkowitz/ BFW, June 2011 """ xllc = rasterTransfo['xllc'] yllc = rasterTransfo['yllc'] csz = rasterTransfo['cellSize'] x = xyPath['x'] y = xyPath['y'] w = rasterTransfo['domainWidth']/2/csz # Shift grid origin to (0,0) x = x - xllc y = y - yllc # remove scaling due to cellsize x = x/csz y = y/csz # Difference between x- bzw. y-Coordinates of Polyline # first and last Vertex: Difference between this and the next # other vertices: Difference between previous and next dx = np.array((x[1]-x[0])) dy = np.array((y[1]-y[0])) n = len(x) for i in range(2, n): dx = np.append(dx, (x[i]-x[i-2])/2.) dy = np.append(dy, (y[i]-y[i-2])/2.) dx = np.append(dx, x[-1]-x[-2]) dy = np.append(dy, y[-1]-y[-2]) # Direction of normal vector of difference, # a.k.a. bisecting line of angle d = np.arctan2(dy, dx) + math.pi/2 # x- and y-Coordinates (left and right) of path edges, # total width w # x-KOO[left right] DBXl = np.array((x + w * np.cos(d))) DBXr = np.array((x + w * np.cos(d + math.pi))) # y-KOO[left right] DBYl = np.array((y + w * np.sin(d))) DBYr = np.array((y + w * np.sin(d + math.pi))) rasterTransfo['DBXl'] = DBXl rasterTransfo['DBXr'] = DBXr rasterTransfo['DBYl'] = DBYl rasterTransfo['DBYr'] = DBYr return rasterTransfo
[docs]def poly2maskSimple(xdep, ydep, ncols, nrows): """ poly2maskSimple Create a mask from a polyline Usage: mask = poly2maskSimple(ydep, xdep, ncols, nrows) Input: ydep, xdep: Polyline Coordinates ncols, nrows: Raster size Output: mask: Raster of the polyline mask """ mask = np.zeros((nrows, ncols)) xyframe = findCellsCrossedByLineBresenham(xdep[0], ydep[0], xdep[1], ydep[1], 1) xyframe = np.delete(xyframe, -1, 0) xyframe = np.transpose(xyframe) for i in range(1, len(xdep)-1): xyline = findCellsCrossedByLineBresenham(xdep[i], ydep[i], xdep[i+1], ydep[i+1], 1) # last point is first point of the next line xyline = np.delete(xyline, -1, 0) xyline = np.transpose(xyline) xyframe = np.hstack((xyframe, xyline)) xyline = findCellsCrossedByLineBresenham(xdep[-1], ydep[-1], xdep[0], ydep[0], 1) xyline = np.delete(xyline, -1, 0) xyline = np.transpose(xyline) xyframe = np.hstack((xyframe, xyline)) # filling the inside of the polygon with ones # i = xyframe[0] # j = xyframe[1] mv, nv = np.meshgrid(np.linspace(0, ncols-1, ncols), np.linspace(0, nrows-1, nrows)) # create index space # mask = inpolygon(mv, nv, i, j) mask = inpolygon(mv, nv, np.append(xdep, xdep[-1]), np.append(ydep, ydep[-1])) # TODO: decide if we add the margin or not. # for i in range(0, len(xyframe[0, :])): # mask[xyframe[1, i], xyframe[0, i]] = 1 return mask
[docs]def inpolygon(X, Y, xv, yv): """ inpolygon For a polygon defined by vertex points (xv, yv), returns a np array of size X with ones if the points (X, Y) are inside (or on the boundary) of the polygon; Otherwise, returns zeros. Usage: mask = inpolygon(X, Y, xv, yv) Input: X, Y: Set of points to check xv, yv: polygon vertex points Output: mask: np array of zeros and ones Octave Implementation [IN, ON] = inpolygon (X, Y, xv, yv) """ npol = len(xv) xv = np.floor(xv+0.5).astype('int') yv = np.floor(yv+0.5).astype('int') maxXv = np.ceil(np.max(xv)).astype('int') + 1 minXv = np.floor(np.min(xv)).astype('int') maxYv = np.ceil(np.max(yv)).astype('int') + 1 minYv = np.floor(np.min(yv)).astype('int') IN = np.zeros(np.shape(X)) j = npol-1 for i in range(npol-1): deltaxv = xv[j] - xv[i] deltayv = yv[j] - yv[i] # distance = [distance from (X,Y) to edge] * length(edge) distance = deltaxv*(Y-yv[i]) - (X-xv[i])*deltayv # is Y between the y-values of edge i,j # AND (X,Y) on the left of the edge ? for ii in range(minYv, maxYv, 1): for jj in range(minXv, maxXv, 1): if (((yv[i] <= Y[ii][jj] and Y[ii][jj] < yv[j]) or (yv[j] <= Y[ii][jj] and Y[ii][jj] < yv[i])) and (0 < distance[ii][jj]*deltayv)): if IN[ii][jj] == 0: IN[ii][jj] = 1 else: IN[ii][jj] = 0 j = i # for i in range(npol-1): # IN[yv[i]][xv[i]] = 0 return IN
[docs]def areaPoly(X, Y): """Gauss's area formula to calculate polygon area Parameters ---------- - X coord of the vertices - Y coord of the vertices (Without repeating the first vertex!!!) Returns ------- Area of the polygon """ X = np.append(X, X[0]) Y = np.append(Y, Y[0]) sum = 0 for i in range(np.size(X)-1): sum = sum + (X[i]*Y[i+1]-Y[i]*X[i+1])/2 return sum
[docs]def checkOverlap(toCheckRaster, refRaster, nameToCheck, nameRef, crop=False): """Check if two raster overlap Parameters ---------- toCheckRaster : 2D numpy array Raster to check refRaster : 2D numpy array refference Raster crop : boolean if True, remove overlaping part and send a warning Returns ------- if crop is True, return toCheckRaster without the overlaping part and send a warning if needed if crop is False, return error if Rasters overlap otherwise return toCheckRaster """ mask = (toCheckRaster > 0) & (refRaster > 0) if mask.any(): if crop: toCheckRaster[mask] = 0 message = '%s area feature overlapping with %s area - removing the overlapping part' % (nameToCheck, nameRef) log.warning(message) else: message = '%s area features overlapping with %s area - this is not allowed' % (nameToCheck, nameRef) log.error(message) AssertionError(message) return toCheckRaster