Source code for in2Trans.shpConversion

"""
    Conversion functions to read/ write Shape files
"""


import pathlib
import shapefile
import copy
import numpy as np
import logging

# create local logger
log = logging.getLogger(__name__)


[docs]def SHP2Array(infile, defname=None): """ Read shapefile and convert it to a python dictionary The dictionary contains the name of the paths in the shape file, the np array with the coordinates of the feature points (all stacked in the same array) and information about the starting index and length of each feature Parameters ---------- infile: str path to shape file defname: str name to give to the feature in the shape file Returns ------- SHPdata : dict sks : projection information Name : str list of feature names x : 1D numpy array np array of the x coord of the points in the features y : 1D numpy array np array of the y coord of the points in the features z : 1D numpy array np array of zeros and same size as the x an y coordinates array Start : 1D numpy array np array with the starting index of each feature in the coordinates arrays (as many indexes as features) Length : 1D numpy array np array with the length of each feature in the coordinates arrays (as many indexes as features) thickness (optional) : 1D numpy array np array with the (release or entrainment) thickness of each feature (as many values as features) id : list list of oid as string for each feature ci95: list list of 95% confidence interval of thickness value nParts: list list of parts of polygon (added the total number of points as list item, so if multiple parts len>2) nFeatures: int number of features per line (parts) """ # Input shapefile sf = shapefile.Reader(str(infile)) infile = pathlib.Path(infile) # set defaults for variables layername = None thickness = None slope = None rho = None sks = None iso = None id = None ci95 = None layerN = None # get coordinate system sks = getSHPProjection(infile) # Start reading the shapefile records = sf.shapeRecords() shps = sf.shapes() SHPdata = {} SHPdata['sks'] = sks Name = [] thicknessList = [] idList = [] ci95List = [] layerNameList = [] Length = np.empty((0)) Start = np.empty((0)) Coordx = np.empty((0)) Coordy = np.empty((0)) Coordz = np.empty((0)) start = 0 nParts = [] for n, item in enumerate(shps): pts = item.points # is there a z coordinate? try: # for dams zs = item.z except AttributeError: zs = [0.0] * len(pts) # add info on number of parts nParts.append(list(item.parts) + [len(item.points)]) # check if records are available and extract if records: # loop through fields for (name, typ, size, deci), value in zip(sf.fields[1:], records[n].record): # get entity name name = name.lower() if (name == 'name'): layername = str(value) if (name == 'thickness') or (name == 'd0'): thickness = value if (name == 'ci95'): ci95 = value if (name == 'slope'): # for dams slope = value if (name == 'rho'): rho = value if (name == 'sks'): sks = value if (name == 'iso'): iso = value if (name == 'layer'): layerN = value # if name is still empty go through file again and take Layer instead if ((type(layername) is bytes) or (layername is None)): for (name, typ, size, deci), value in zip(sf.fields[1:], records[n].record): if (name == 'Layer'): layername = value # if layer still not defined, use generic if layername is None: layername = defname Name.append(layername) log.debug('SHPConv: Found layer %s', layername) thicknessList.append(str(thickness)) ci95List.append(str(ci95)) layerNameList.append(layerN) Start = np.append(Start, start) length = len(pts) Length = np.append(Length, length) start += length for (pt, z) in zip(pts, zs): Coordx = np.append(Coordx, pt[0]) Coordy = np.append(Coordy, pt[1]) Coordz = np.append(Coordz, z) # get unique ID of features in shapefile for rec in sf.records(): id = rec.oid idList.append(str(id)) SHPdata['Name'] = Name SHPdata['thickness'] = thicknessList SHPdata['slope'] = slope SHPdata['Start'] = Start SHPdata['Length'] = Length SHPdata['x'] = Coordx SHPdata['y'] = Coordy SHPdata['z'] = Coordz SHPdata['id'] = idList SHPdata['ci95'] = ci95List SHPdata['layerName'] = layerNameList SHPdata['nParts'] = nParts SHPdata['nFeatures'] = len(Start) sf.close() return SHPdata
[docs]def getSHPProjection(infile): """ Fetch projection from shp file Parameters ---------- infile: str path to shape file Returns ------- sks: str projection string (if available, None if not) """ # get coordinate system prjfile = infile.with_suffix('.prj') if prjfile.is_file(): prjf = open(prjfile, 'r') sks = prjf.readline() prjf.close() return sks else: message = ('No projection layer for shp file %s' % infile) log.warning(message) return None
[docs]def readThickness(infile, defname=None): """ Read shapefile and fetch info on features' ids and thickness values Parameters ---------- infile: str path to shape file defname: str name to give to the feature in the shape file Returns ------- thickness: list list of strings with the (release or entrainment) thickness of each feature (as many values as features) id : list list of strings for oid of each feature in shp file ci95: list list of all ci95 values if provided """ # Input shapefile sf = shapefile.Reader(str(infile)) infile = pathlib.Path(infile) thickness = None id = None ci95 = None # Start reading the shapefile records = sf.shapeRecords() shps = sf.shapes() thicknessList = [] idList = [] ci95List = [] for n, item in enumerate(shps): pts = item.points zs = [0.0] * len(pts) # check if records are available and extract if records: # loop through fields for (name, typ, size, deci), value in zip(sf.fields[1:], records[n].record): # get entity name name = name.lower() if (name == 'thickness') or (name == 'd0'): thickness = value if (name == 'ci95'): ci95 = value thicknessList.append(str(thickness)) ci95List.append(str(ci95)) # get unique ID of features in shapefile for rec in sf.records(): id = rec.oid idList.append(str(id)) sf.close() return thicknessList, idList, ci95List
[docs]def readLine(fname, defname, dem): """ Read line from .shp Use SHP2Array to read the shape file. Check if the lines are laying inside the dem extend Parameters ---------- fname: str path to shape file defname: str name to give to the line in the shape file dem: dict dem dictionary Returns ------- Line : dict Line['Name'] : list of lines names Line['Coord'] : np array of the coords of points in lines Line['Start'] : list of starting index of each line in Coord Line['Length'] : list of length of each line in Coord """ log.debug('Reading avalanche path : %s ', str(fname)) header = dem['header'] rasterDEM = dem['rasterData'] Line = SHP2Array(fname, defname) coordx = Line['x'] coordy = Line['y'] for i in range(len(coordx)): Lx = (coordx[i] - header['xllcenter']) / header['cellsize'] Ly = (coordy[i] - header['yllcenter']) / header['cellsize'] if ((Ly < 0) or (Ly > header['nrows']-1) or (Lx < 0) or (Lx > header['ncols']-1)): log.error(fname) raise ValueError('This shape file exceeds dem extent. This is not allowed') elif np.isnan(rasterDEM[int(np.floor(Ly)), int(np.floor(Lx))]): log.error(fname) raise ValueError('This shape file is at least partially outside of DEM, this is not allowed!') return Line
[docs]def readPoints(fname, dem): """ Read points from .shp Use SHP2Array to read the shape file. Check if the points are laying inside the dem extend Parameters ---------- fname: str path to shape file defname: str name to give to the points in the shape file dem: dict dem dictionary Returns ------- Line : dict Line['Name'] : list of points names Line['Coord'] : np array of the coords of points in points Line['Start'] : list of starting index of each point in Coord Line['Length'] : list of length of each point in Coord """ log.debug('Reading split point : %s ', str(fname)) header = dem['header'] rasterDEM = dem['rasterData'] defname = 'SHP' Points = SHP2Array(fname, defname) Pointx = Points['x'] Pointy = Points['y'] for i in range(len(Pointx)): Lx = (Pointx[i] - header['xllcenter']) / header['cellsize'] Ly = (Pointy[i] - header['yllcenter']) / header['cellsize'] if (Ly < 0 or Ly > header['nrows']-1 or Lx < 0 or Lx > header['ncols']-1): raise ValueError('The split point is not on the dem. Try with another split point') elif np.isnan(rasterDEM[int(np.floor(Ly)), int(np.floor(Lx))]): raise ValueError('Nan Value encountered. Try with another split point') return Points
[docs]def removeFeature(featureIn, nFeature2Remove): """ Remove feature number nFeature2Remove from featureIn Parameters ---------- featureIn: dict shape file dicionary (structure produced by SHP2Array, readLine or readPoint) nFeature2Remove: int index of feature to remove from featureIn Returns ------- featureOut : dict shape file dicionary without feature nFeature2Remove """ StartRel = featureIn['Start'] LengthRel = featureIn['Length'] thickness = featureIn['thickness'] featureOut = copy.deepcopy(featureIn) start = StartRel[nFeature2Remove] end = start + LengthRel[nFeature2Remove] # remove feature featureOut['x'] = np.delete(featureIn['x'], np.arange(int(start), int(end))) featureOut['y'] = np.delete(featureIn['y'], np.arange(int(start), int(end))) if 'z' in featureIn.keys(): featureOut['z'] = np.delete(featureIn['z'], np.arange(int(start), int(end))) del featureOut['Name'][nFeature2Remove] StartRel = featureOut['Start'] StartRel[nFeature2Remove:] = StartRel[nFeature2Remove:] - LengthRel[nFeature2Remove] featureOut['Start'] = np.delete(StartRel, nFeature2Remove) featureOut['Length'] = np.delete(LengthRel, nFeature2Remove) featureOut['thickness'] = np.delete(thickness, nFeature2Remove) return featureOut
[docs]def extractFeature(featureIn, nFeature2Extract): """ Extract feature nFeature2Extract from featureIn Parameters ---------- featureIn: dict shape file dicionary (structure produced by SHP2Array, readLine or readPoint) nFeature2Extract: int index of feature to extract from featureIn Returns ------- featureOut : dict shape file dicionary with feature nFeature2Extract """ NameRel = featureIn['Name'] StartRel = featureIn['Start'] LengthRel = featureIn['Length'] thickness = featureIn['thickness'] featureOut = copy.deepcopy(featureIn) # extract feature featureOut['Name'] = [NameRel[nFeature2Extract]] featureOut['Start'] = np.array([0]) featureOut['Length'] = np.array([LengthRel[nFeature2Extract]]) featureOut['thickness'] = np.array([thickness[nFeature2Extract]]) start = StartRel[nFeature2Extract] end = start + LengthRel[nFeature2Extract] featureOut['x'] = featureIn['x'][int(start):int(end)] featureOut['y'] = featureIn['y'][int(start):int(end)] if 'z' in featureIn.keys(): featureOut['z'] = featureIn['z'][int(start):int(end)] return featureOut
[docs]def writeLine2SHPfile(lineDict, lineName, fileName, header=''): """ write a line to shapefile Parameters ---------- lineDict: dict line dict lineName: str line name fileName: str or pathlib path path where the line will be saved line name header: dict optional argument ('' by default). If provided, header dictionary with 'xllcenter' and 'yllcenter' to add to the line Returns ------- fileName : str path where the line has been saved """ fileName = str(fileName) line = np.zeros((np.size(lineDict['x']), 2)) line[:, 0] = lineDict['x'] line[:, 1] = lineDict['y'] if header: line[:, 0] = line[:, 0] + header['xllcenter'] line[:, 1] = line[:, 1] + header['yllcenter'] w = shapefile.Writer(fileName) w.field('name', 'C') w.line([line]) w.record(lineName) w.close() return fileName
[docs]def writePoint2SHPfile(pointDict, pointName, fileName): """ write a point to shapefile Parameters ---------- pointDict: dict point dict pointName: str point name fileName: str or pathlib path path where the point will be saved Returns ------- fileName : str path where the point has been saved """ fileName = str(fileName) w = shapefile.Writer(fileName) w.field('name', 'C') if len(pointDict['x']) > 1 or len(pointDict['y']) > 1: message = 'Length of pointDict is not allowed to exceed one' log.error(message) raise ValueError(message) w.point(pointDict['x'][0], pointDict['y'][0]) w.record(pointName) w.close() return fileName