Source code for geoprofile.profile

import logging
from typing import Any, Dict, List, Optional, Union

import numpy as np
from numpy.typing import NDArray
from plotly import graph_objects as go
from plotly.graph_objs import Figure
from plotly.subplots import make_subplots
from python_tsp.exact import solve_tsp_dynamic_programming
from scipy import spatial
from shapely.geometry import LineString, Point
from skspatial.objects import Line
from tqdm import tqdm

from geoprofile.column import Column

# optional imports
try:
    import contextily as ctx
    import geopandas as gpd
    import matplotlib.pyplot as plt
    from matplotlib.pyplot import Axes
except ImportError:
    ctx = None
    gpd = None
    plt = None
    Axes = None


[docs]class Section:
[docs] def __init__( self, data_list: List[Column], profile_line: LineString, buffer: float = 10, sorting_algorithm: str = "nearest_neighbor", reproject: bool = True, ) -> None: """ Class that filters, sorts and plot columns based on a profile line. Parameters ---------- data_list: list List with Column classes. profile_line: LineString profile line of the scoss section. buffer: float, optional Default is 10 Buffer distance use to include columns in the profile [m] sorting_algorithm: str, optional Default is nearest_neighbor. Define the sorting algorithm use to sort the data points. Can be one of the following: - tsp (traveling salesman problem) - nearest_neighbor - custom reproject: bool, optional Default is True Reproject points of the column onto the profile line. """ if buffer <= 0: raise ValueError("Buffer distance cannot be less than zero") if sorting_algorithm not in ["tsp", "custom", "nearest_neighbor"]: raise ValueError("Sorting option not available") self._data_list = data_list self._profile_line = profile_line self._buffer = buffer self._sorting_algorithm = sorting_algorithm self._reproject = reproject # validate data if len(data_list) != len(set(map(lambda x: x.name, data_list))): logging.warning("No unique names in columns") if len(data_list) != len(set(map(lambda x: frozenset((x.x, x.y)), data_list))): logging.warning("No unique coordinates in columns")
@property def reproject(self) -> bool: """reproject points of the column onto the profile line.""" return self._reproject @property def profile_line(self) -> LineString: """profile line of the scoss section.""" return self._profile_line @property def data_list_all(self) -> List[Column]: """all columns provided by initializing the class""" return self._data_list @property def buffer(self) -> float: """buffer distance use to include columns in the profile [m]""" return self._buffer @property def sorting_algorithm(self) -> str: """sorting algorithm used to sort columns to profile line""" return self._sorting_algorithm @property def profile_polygon(self) -> LineString: """polygon create based on the profile line and the buffer argument""" return self.profile_line.buffer(self.buffer, cap_style=2, join_style=1) @property def coordinates_all(self) -> NDArray[np.floating]: """list of coordinates of the all the column locations""" return np.array( list( map( lambda item: (item.x, item.y), self.data_list_all, ) ) ) @property def coordinates_include(self) -> NDArray[np.floating]: """list of coordinates of the selected column locations""" return np.array( list( map( lambda item: (item.x, item.y), self.data_list_include, ) ) ) @property def data_list_include(self) -> List[Column]: """selected columns based on profile polygon""" data_list_include = [] for item in self.data_list_all: if Point(item.x, item.y).covered_by(self.profile_polygon): data_list_include.append(item) if len(data_list_include) < 1: raise ValueError( "No data points are selected. Change the profile line or increase the buffer distance." ) return data_list_include @property def distance_matrix_include(self) -> NDArray[np.floating]: """Compute the distance matrix. Returns the matrix of all pair-wise distances [m].""" distance_matrix = spatial.distance_matrix( self.coordinates_include, self.coordinates_include, ) return distance_matrix @property def distance_matrix_include_reprojection(self) -> NDArray[np.floating]: """Compute the distance matrix. Returns the matrix of all pair-wise distances [m].""" # make sure the data list order is consistent coordinates = [ self.coordinates_include_reprojection[i] for i in range(len(self.data_list_include)) ] distance_matrix = spatial.distance_matrix( np.array(coordinates), np.array(coordinates), ) return distance_matrix @property def start_node(self) -> int: """Closed column point on the start of the profile line""" return spatial.distance_matrix( self.coordinates_include, list(zip(*self.profile_line.xy)), ).argmin(axis=0)[0] @property def end_node(self) -> int: """Closed column point on the end of the profile line""" return spatial.distance_matrix( self.coordinates_include, list(zip(*self.profile_line.xy)), ).argmin(axis=0)[-1] @property def coordinates_include_reprojection(self) -> Dict[Union[int, str], List[float]]: """list of coordinates of the selected column locations reprojected on the profile line""" nodes = list(zip(*self.profile_line.xy)) projected_point: Dict[Union[int, str], List[float]] = {} for sigment in range(len(nodes) - 1): for i, item in enumerate(self.coordinates_include): if Point(item[0], item[1]).covered_by( LineString((nodes[sigment], nodes[sigment + 1])).buffer( self.buffer, cap_style=2, join_style=1 ) ): line = Line.from_points( point_a=nodes[sigment], point_b=nodes[sigment + 1] ) point = line.project_point((item[0], item[1])) projected_point[i] = [point[0], point[1]] return projected_point @property def sorting(self) -> tuple: """ Based on the soring algorithm the columns are sorted. Returns ------- permutation A permutation of nodes from 0 to n that produces the least total distance distance The total distance the optimal permutation produces """ if self.sorting_algorithm == "tsp": # place start column first start_node = ( spatial.distance_matrix( self.coordinates_all, list(zip(*self.profile_line.xy)), ) .argmin(axis=0)[0] .copy() ) self._data_list.insert(0, self._data_list[start_node]) self._data_list.pop(start_node) if self.reproject: distance_matrix = self.distance_matrix_include_reprojection else: distance_matrix = self.distance_matrix_include # change the cost function such that every arc to # the depot has cost 0. To create an open tsp distance_matrix[:, self.start_node] = 0 return solve_tsp_dynamic_programming(distance_matrix, maxsize=128) elif self.sorting_algorithm == "nearest_neighbor": if self.reproject: distance_matrix = self.distance_matrix_include_reprojection else: distance_matrix = self.distance_matrix_include # replace all self correlated distances to nan distance_matrix[np.diag_indices(len(self.data_list_include))] = np.nan permutation = [self.start_node] while len(permutation) != len(self.coordinates_include): # add next column to permutation list permutation.append( np.nanargmin(distance_matrix, axis=0)[permutation[-1]] ) # set inf to used columns in distance_matrix distance_matrix[:, permutation[-2]] = np.inf distance_matrix[permutation[-2], :] = np.inf return ( permutation, LineString( [self.coordinates_include[i].tolist() for i in permutation] ).length, ) elif self.sorting_algorithm == "custom": if self.reproject: return ( list(range(0, len(self.data_list_include))), LineString(self.coordinates_include_reprojection).length, ) else: return ( list(range(0, len(self.data_list_include))), LineString(self.coordinates_include).length, ) else: raise ValueError
[docs] def plot_map( self, axis: Optional[Axes] = None, add_basemap: bool = False, add_tags: bool = True, debug: bool = False, ) -> Axes: """ Create a map that contain the following: - location of the columns (gray point) - location of the selected columns (black point) - profile line (gray line) - profile polygon (light gray area) - re-projection (black line) Parameters ---------- axis : plt.Axes, optional plt.Axes used to create the map add_basemap : bool, optional default is False Flag that includes basemap in figure. add_tags : bool, optional default is True Show the CTP names as tags on the map Returns ------- plt.Axes """ if gpd is None or plt is None: raise ImportError("No module named 'geopandas' or matplotlib") if axis is None: # plot all column locations axis = gpd.GeoDataFrame( geometry=gpd.points_from_xy(*zip(*self.coordinates_all)), crs="EPSG:28992", ).plot(color="grey") else: # plot all column locations gpd.GeoDataFrame( geometry=gpd.points_from_xy(*zip(*self.coordinates_all)), crs="EPSG:28992", ).plot(ax=axis, color="grey") # add the use defined profile line gpd.GeoSeries( [self.profile_line], crs="EPSG:28992", ).plot(ax=axis, color="grey") # add the use defined profile line gpd.GeoSeries( [self.profile_polygon], crs="EPSG:28992", ).plot(ax=axis, color="grey", alpha=0.3) # plot all column locations that are included gpd.GeoDataFrame( geometry=gpd.points_from_xy(*zip(*self.coordinates_include)), crs="EPSG:28992", ).plot(ax=axis, color="black") # add the use sorting defined profile line x = [] y = [] for node in self.sorting[0]: x.append(self.data_list_include[node].x) y.append(self.data_list_include[node].y) axis.plot(x, y, "-") # add re-projection of point to line if self.reproject: for key, value in self.coordinates_include_reprojection.items(): plt.annotate( "", xy=self.coordinates_include[key], xycoords="data", xytext=value, textcoords="data", arrowprops=dict(arrowstyle="-", connectionstyle="arc3,rad=0."), ) # add labels (column names) to map if add_tags: for i, item in enumerate( self.data_list_include if debug else self.data_list_all ): axis.annotate( i if debug else item.name, xy=(item.x, item.y), xytext=(3, 3), textcoords="offset points", ) # add base map if add_basemap: if ctx is None: raise ImportError("No module named 'contextily'") ctx.add_basemap( axis, crs="EPSG:28992", source=ctx.providers.OpenStreetMap.Mapnik ) axis.ticklabel_format(useOffset=False, style="plain") plt.tight_layout() return axis
[docs] def plot( self, figure: Optional[Figure] = None, x0: float = 0.0, groundwater_level: bool = False, surface_level: bool = False, **kwargs: Any, ) -> Figure: """ Create profile based on the column's location, sorting algorithm and profile line. Parameters ---------- figure: Figure, optional A plotly Figure to add the profile to. x0: float, optional Default is 0.0 Start of the profile line. groundwater_level: bool, optional Default is False Flag that indicated if groundwater level is added to the profile surface_level: bool, optional Default is False Flag that indicated if groundwater level is added to the profile kwargs: Any kwargs past to the column plot function. Returns ------- plotly.graph_objs.Figure """ if kwargs is None: kwargs = {} # get sorting list and profile distance permutation, distance = self.sorting # get the coordinates of the columns if self.reproject: coordinates_dict = self.coordinates_include_reprojection coordinates_dict["start_line"] = [ self.profile_line.xy[0][0], self.profile_line.xy[1][0], ] coordinates_dict["end_line"] = [ self.profile_line.xy[0][-1], self.profile_line.xy[1][-1], ] distance = self.profile_line.length else: coordinates_dict = dict( zip( permutation, [self.coordinates_include[i].tolist() for i in permutation], ) ) coordinates_dict["start_line"] = [ coordinates_dict[permutation[0]][0], coordinates_dict[permutation[0]][1], ] coordinates_dict["end_line"] = [ coordinates_dict[permutation[-1]][0], coordinates_dict[permutation[-1]][1], ] if figure is None: # initialize plot figure = make_subplots( rows=1, cols=1, y_title="Depth [m REF]", shared_yaxes=True, horizontal_spacing=0.01, column_widths=[distance], ) # duplicate start and endpoint permutation.insert(0, "start_line") permutation.append("end_line") # placeholders groundwater_level_list = [] surface_level_list = [] center_list = [] # loop over permutation for i in tqdm(range(1, len(permutation) - 1), desc="Add column to profile"): # right side d_right = ( LineString( ( ( coordinates_dict[permutation[i]][0], coordinates_dict[permutation[i]][1], ), ( coordinates_dict[permutation[i + 1]][0], coordinates_dict[permutation[i + 1]][1], ), ) ).length / 2 ) # left side d_left = ( LineString( ( ( coordinates_dict[permutation[i]][0], coordinates_dict[permutation[i]][1], ), ( coordinates_dict[permutation[i - 1]][0], coordinates_dict[permutation[i - 1]][1], ), ) ).length / 2 ) # add columns to profile self.data_list_include[permutation[i]].plot( figure, **kwargs, x0=x0, d_left=d_left, d_right=d_right, profile=True, ) # fill list center_list.append(x0 + d_left) groundwater_level_list.append( self.data_list_include[permutation[i]].groundwater_level ) surface_level_list.append(self.data_list_include[permutation[i]].z) # set the starting point of the next column x0 += d_left + d_right # Add dashed blue line representing phreatic level if groundwater_level: figure.add_trace( go.Scatter( name="Groundwater level [m REF]", x=center_list, y=groundwater_level_list, line=dict(color="Blue", dash="dash", width=1), showlegend=True, ) ) # Add dashed brown line representing ground level if surface_level: figure.add_trace( go.Scatter( name="Surface level [m REF]", x=center_list, y=surface_level_list, line=dict(color="Brown", dash="dash", width=1), showlegend=True, ) ) return figure