Skip to content

Spatial Operations#

Spatial indexing (KD-tree, STRtree), point-in-face queries, mesh clipping by polygon, and bounding box subsetting.

pyramids.netcdf.ugrid.MeshSpatialIndex #

Lazy-built spatial index for mesh elements.

Uses scipy.spatial.cKDTree for nearest-neighbor queries and shapely.STRtree for point-in-polygon queries. Both indexes are built on demand and cached.

Attributes:

Name Type Description
_mesh

Reference to the Mesh2d topology.

_node_tree Any

KD-tree on node coordinates (lazy).

_face_tree Any

KD-tree on face centroids (lazy).

_face_strtree Any

STRtree on face polygons (lazy).

_face_polygons list[Any] | None

List of Shapely polygons (lazy).

Source code in src/pyramids/netcdf/ugrid/spatial.py
class MeshSpatialIndex:
    """Lazy-built spatial index for mesh elements.

    Uses scipy.spatial.cKDTree for nearest-neighbor queries and
    shapely.STRtree for point-in-polygon queries. Both indexes
    are built on demand and cached.

    Attributes:
        _mesh: Reference to the Mesh2d topology.
        _node_tree: KD-tree on node coordinates (lazy).
        _face_tree: KD-tree on face centroids (lazy).
        _face_strtree: STRtree on face polygons (lazy).
        _face_polygons: List of Shapely polygons (lazy).
    """

    def __init__(self, mesh: Mesh2d):
        self._mesh = mesh
        self._node_tree: Any = None
        self._face_tree: Any = None
        self._face_strtree: Any = None
        self._face_polygons: list[Any] | None = None

    @property
    def node_tree(self) -> Any:
        """KD-tree on node coordinates. Lazy-built on first access."""
        if self._node_tree is None:
            from scipy.spatial import cKDTree
            self._node_tree = cKDTree(
                np.column_stack([self._mesh.node_x, self._mesh.node_y])
            )
        return self._node_tree

    @property
    def face_tree(self) -> Any:
        """KD-tree on face centroids. Lazy-built on first access."""
        if self._face_tree is None:
            from scipy.spatial import cKDTree
            cx, cy = self._mesh.face_centroids
            self._face_tree = cKDTree(np.column_stack([cx, cy]))
        return self._face_tree

    @property
    def face_strtree(self) -> Any:
        """Shapely STRtree on face polygons. Lazy-built on first access."""
        if self._face_strtree is None:
            self._face_polygons = self._build_face_polygons()
            self._face_strtree = STRtree(self._face_polygons)
        return self._face_strtree

    @property
    def face_polygons(self) -> list[Any]:
        """List of Shapely Polygon objects for all faces."""
        if self._face_polygons is None:
            self._face_polygons = self._build_face_polygons()
        return self._face_polygons

    def _build_face_polygons(self) -> list[Any]:
        """Build Shapely Polygon objects for all mesh faces.

        Returns:
            List of Shapely Polygon objects, one per face.
        """
        polygons = []
        for i in range(self._mesh.n_face):
            coords = self._mesh.get_face_polygon(i)
            closed = np.vstack([coords, coords[0:1]])
            polygons.append(Polygon(closed))
        return polygons

    def locate_nearest_node(
        self,
        x: float | np.ndarray,
        y: float | np.ndarray,
        k: int = 1,
    ) -> np.ndarray:
        """Find k nearest nodes to query point(s).

        Args:
            x: Query x-coordinate(s) (scalar or array).
            y: Query y-coordinate(s) (scalar or array).
            k: Number of nearest neighbors to find.

        Returns:
            Array of node indices. Shape: (k,) for scalar input,
            (n_queries, k) for array input.
        """
        points = np.column_stack([np.atleast_1d(x), np.atleast_1d(y)])
        _, indices = self.node_tree.query(points, k=k)
        result = np.asarray(indices)
        return result

    def locate_nearest_face(
        self,
        x: float | np.ndarray,
        y: float | np.ndarray,
        k: int = 1,
    ) -> np.ndarray:
        """Find k nearest face centroids to query point(s).

        Args:
            x: Query x-coordinate(s) (scalar or array).
            y: Query y-coordinate(s) (scalar or array).
            k: Number of nearest neighbors to find.

        Returns:
            Array of face indices. Shape: (k,) for scalar input,
            (n_queries, k) for array input.
        """
        points = np.column_stack([np.atleast_1d(x), np.atleast_1d(y)])
        _, indices = self.face_tree.query(points, k=k)
        result = np.asarray(indices)
        return result

    def locate_nodes_in_bounds(
        self,
        xmin: float,
        ymin: float,
        xmax: float,
        ymax: float,
    ) -> np.ndarray:
        """Find all nodes within a bounding box.

        Args:
            xmin: Minimum x-coordinate.
            ymin: Minimum y-coordinate.
            xmax: Maximum x-coordinate.
            ymax: Maximum y-coordinate.

        Returns:
            Array of node indices within the bounding box.
        """
        mask = (
            (self._mesh.node_x >= xmin)
            & (self._mesh.node_x <= xmax)
            & (self._mesh.node_y >= ymin)
            & (self._mesh.node_y <= ymax)
        )
        result = np.where(mask)[0]
        return result

    def locate_faces_in_bounds(
        self,
        xmin: float,
        ymin: float,
        xmax: float,
        ymax: float,
    ) -> np.ndarray:
        """Find all faces whose centroids fall within a bounding box.

        Args:
            xmin: Minimum x-coordinate.
            ymin: Minimum y-coordinate.
            xmax: Maximum x-coordinate.
            ymax: Maximum y-coordinate.

        Returns:
            Array of face indices within the bounding box.
        """
        cx, cy = self._mesh.face_centroids
        mask = (cx >= xmin) & (cx <= xmax) & (cy >= ymin) & (cy <= ymax)
        result = np.where(mask)[0]
        return result

    def locate_faces(
        self,
        x: np.ndarray,
        y: np.ndarray,
    ) -> np.ndarray:
        """Find which face contains each query point.

        Uses Shapely STRtree for exact containment testing.
        Returns -1 for points outside all faces.

        Args:
            x: Query x-coordinates (array).
            y: Query y-coordinates (array).

        Returns:
            Array of face indices, -1 for points outside mesh.
        """
        x = np.atleast_1d(x)
        y = np.atleast_1d(y)
        result = np.full(len(x), -1, dtype=np.intp)

        points = shapely.points(x, y)
        strtree = self.face_strtree
        input_idx, tree_idx = strtree.query(points, predicate="within")

        for pt_i, face_i in zip(input_idx, tree_idx):
            if result[pt_i] == -1:
                result[pt_i] = face_i

        return result

node_tree property #

KD-tree on node coordinates. Lazy-built on first access.

face_tree property #

KD-tree on face centroids. Lazy-built on first access.

face_strtree property #

Shapely STRtree on face polygons. Lazy-built on first access.

face_polygons property #

List of Shapely Polygon objects for all faces.

locate_nearest_node(x, y, k=1) #

Find k nearest nodes to query point(s).

Parameters:

Name Type Description Default
x float | ndarray

Query x-coordinate(s) (scalar or array).

required
y float | ndarray

Query y-coordinate(s) (scalar or array).

required
k int

Number of nearest neighbors to find.

1

Returns:

Type Description
ndarray

Array of node indices. Shape: (k,) for scalar input,

ndarray

(n_queries, k) for array input.

Source code in src/pyramids/netcdf/ugrid/spatial.py
def locate_nearest_node(
    self,
    x: float | np.ndarray,
    y: float | np.ndarray,
    k: int = 1,
) -> np.ndarray:
    """Find k nearest nodes to query point(s).

    Args:
        x: Query x-coordinate(s) (scalar or array).
        y: Query y-coordinate(s) (scalar or array).
        k: Number of nearest neighbors to find.

    Returns:
        Array of node indices. Shape: (k,) for scalar input,
        (n_queries, k) for array input.
    """
    points = np.column_stack([np.atleast_1d(x), np.atleast_1d(y)])
    _, indices = self.node_tree.query(points, k=k)
    result = np.asarray(indices)
    return result

locate_nearest_face(x, y, k=1) #

Find k nearest face centroids to query point(s).

Parameters:

Name Type Description Default
x float | ndarray

Query x-coordinate(s) (scalar or array).

required
y float | ndarray

Query y-coordinate(s) (scalar or array).

required
k int

Number of nearest neighbors to find.

1

Returns:

Type Description
ndarray

Array of face indices. Shape: (k,) for scalar input,

ndarray

(n_queries, k) for array input.

Source code in src/pyramids/netcdf/ugrid/spatial.py
def locate_nearest_face(
    self,
    x: float | np.ndarray,
    y: float | np.ndarray,
    k: int = 1,
) -> np.ndarray:
    """Find k nearest face centroids to query point(s).

    Args:
        x: Query x-coordinate(s) (scalar or array).
        y: Query y-coordinate(s) (scalar or array).
        k: Number of nearest neighbors to find.

    Returns:
        Array of face indices. Shape: (k,) for scalar input,
        (n_queries, k) for array input.
    """
    points = np.column_stack([np.atleast_1d(x), np.atleast_1d(y)])
    _, indices = self.face_tree.query(points, k=k)
    result = np.asarray(indices)
    return result

locate_nodes_in_bounds(xmin, ymin, xmax, ymax) #

Find all nodes within a bounding box.

Parameters:

Name Type Description Default
xmin float

Minimum x-coordinate.

required
ymin float

Minimum y-coordinate.

required
xmax float

Maximum x-coordinate.

required
ymax float

Maximum y-coordinate.

required

Returns:

Type Description
ndarray

Array of node indices within the bounding box.

Source code in src/pyramids/netcdf/ugrid/spatial.py
def locate_nodes_in_bounds(
    self,
    xmin: float,
    ymin: float,
    xmax: float,
    ymax: float,
) -> np.ndarray:
    """Find all nodes within a bounding box.

    Args:
        xmin: Minimum x-coordinate.
        ymin: Minimum y-coordinate.
        xmax: Maximum x-coordinate.
        ymax: Maximum y-coordinate.

    Returns:
        Array of node indices within the bounding box.
    """
    mask = (
        (self._mesh.node_x >= xmin)
        & (self._mesh.node_x <= xmax)
        & (self._mesh.node_y >= ymin)
        & (self._mesh.node_y <= ymax)
    )
    result = np.where(mask)[0]
    return result

locate_faces_in_bounds(xmin, ymin, xmax, ymax) #

Find all faces whose centroids fall within a bounding box.

Parameters:

Name Type Description Default
xmin float

Minimum x-coordinate.

required
ymin float

Minimum y-coordinate.

required
xmax float

Maximum x-coordinate.

required
ymax float

Maximum y-coordinate.

required

Returns:

Type Description
ndarray

Array of face indices within the bounding box.

Source code in src/pyramids/netcdf/ugrid/spatial.py
def locate_faces_in_bounds(
    self,
    xmin: float,
    ymin: float,
    xmax: float,
    ymax: float,
) -> np.ndarray:
    """Find all faces whose centroids fall within a bounding box.

    Args:
        xmin: Minimum x-coordinate.
        ymin: Minimum y-coordinate.
        xmax: Maximum x-coordinate.
        ymax: Maximum y-coordinate.

    Returns:
        Array of face indices within the bounding box.
    """
    cx, cy = self._mesh.face_centroids
    mask = (cx >= xmin) & (cx <= xmax) & (cy >= ymin) & (cy <= ymax)
    result = np.where(mask)[0]
    return result

locate_faces(x, y) #

Find which face contains each query point.

Uses Shapely STRtree for exact containment testing. Returns -1 for points outside all faces.

Parameters:

Name Type Description Default
x ndarray

Query x-coordinates (array).

required
y ndarray

Query y-coordinates (array).

required

Returns:

Type Description
ndarray

Array of face indices, -1 for points outside mesh.

Source code in src/pyramids/netcdf/ugrid/spatial.py
def locate_faces(
    self,
    x: np.ndarray,
    y: np.ndarray,
) -> np.ndarray:
    """Find which face contains each query point.

    Uses Shapely STRtree for exact containment testing.
    Returns -1 for points outside all faces.

    Args:
        x: Query x-coordinates (array).
        y: Query y-coordinates (array).

    Returns:
        Array of face indices, -1 for points outside mesh.
    """
    x = np.atleast_1d(x)
    y = np.atleast_1d(y)
    result = np.full(len(x), -1, dtype=np.intp)

    points = shapely.points(x, y)
    strtree = self.face_strtree
    input_idx, tree_idx = strtree.query(points, predicate="within")

    for pt_i, face_i in zip(input_idx, tree_idx):
        if result[pt_i] == -1:
            result[pt_i] = face_i

    return result

pyramids.netcdf.ugrid.spatial.clip_mesh(dataset, mask, touch=True) #

Clip a UGRID dataset to a polygon mask.

Selects faces that intersect (touch=True) or are fully contained within (touch=False) the mask polygon. Renumbers nodes and edges to produce a compact, self-consistent mesh.

Parameters:

Name Type Description Default
dataset Any

Source UgridDataset.

required
mask Any

Polygon mask (GeoDataFrame, FeatureCollection, or Shapely geometry).

required
touch bool

If True, include faces that touch the mask boundary. If False, only include faces fully inside.

True

Returns:

Type Description
Any

New UgridDataset with clipped mesh and subset data.

Source code in src/pyramids/netcdf/ugrid/spatial.py
def clip_mesh(
    dataset: Any,
    mask: Any,
    touch: bool = True,
) -> Any:
    """Clip a UGRID dataset to a polygon mask.

    Selects faces that intersect (touch=True) or are fully contained
    within (touch=False) the mask polygon. Renumbers nodes and edges
    to produce a compact, self-consistent mesh.

    Args:
        dataset: Source UgridDataset.
        mask: Polygon mask (GeoDataFrame, FeatureCollection, or Shapely geometry).
        touch: If True, include faces that touch the mask boundary.
            If False, only include faces fully inside.

    Returns:
        New UgridDataset with clipped mesh and subset data.
    """
    mesh = dataset.mesh

    if hasattr(mask, "_gdf"):
        mask_geom = unary_union(mask._gdf.geometry)
    elif hasattr(mask, "geometry"):
        mask_geom = unary_union(mask.geometry)
    else:
        mask_geom = mask

    spatial_idx = MeshSpatialIndex(mesh)
    face_polys = spatial_idx.face_polygons
    tree = STRtree(face_polys)

    predicate = "intersects" if touch else "contains"
    candidates = tree.query(mask_geom, predicate=predicate)

    selected_faces = sorted(int(c) for c in candidates)

    result = _subset_mesh_by_face_indices(dataset, selected_faces)
    return result

pyramids.netcdf.ugrid.spatial.subset_by_bounds(dataset, xmin, ymin, xmax, ymax) #

Subset mesh to faces whose centroids fall within a bounding box.

Faster than clip_mesh because it only checks face centroids against the box without building Shapely polygons or doing intersection tests. Uses vectorized numpy comparisons.

Parameters:

Name Type Description Default
dataset Any

Source UgridDataset.

required
xmin float

Minimum x-coordinate.

required
ymin float

Minimum y-coordinate.

required
xmax float

Maximum x-coordinate.

required
ymax float

Maximum y-coordinate.

required

Returns:

Type Description
Any

New UgridDataset with faces whose centroids are within the box.

Source code in src/pyramids/netcdf/ugrid/spatial.py
def subset_by_bounds(
    dataset: Any,
    xmin: float,
    ymin: float,
    xmax: float,
    ymax: float,
) -> Any:
    """Subset mesh to faces whose centroids fall within a bounding box.

    Faster than clip_mesh because it only checks face centroids
    against the box without building Shapely polygons or doing
    intersection tests. Uses vectorized numpy comparisons.

    Args:
        dataset: Source UgridDataset.
        xmin: Minimum x-coordinate.
        ymin: Minimum y-coordinate.
        xmax: Maximum x-coordinate.
        ymax: Maximum y-coordinate.

    Returns:
        New UgridDataset with faces whose centroids are within the box.
    """
    mesh = dataset.mesh
    cx, cy = mesh.face_centroids
    mask = (cx >= xmin) & (cx <= xmax) & (cy >= ymin) & (cy <= ymax)
    selected_faces = np.where(mask)[0].tolist()

    result = _subset_mesh_by_face_indices(dataset, selected_faces)
    return result