Skip to content

StreamRaster#

Typed stream-cell raster — produced by Accumulation.streams(...). Carries the threshold and routing tags of its source pipeline so consumers stay aware of how the network was extracted.

Top-level surface:

  • order(method=...) — Strahler / Shreve / Horton / Hack (W-1) / Topological (W-2) ordering.
  • to_vector(flow_direction, dem=None) — vectorise the network into a GeoDataFrame of LineString links with link_id, from_node, to_node, length_m, drop_m, mean_slope, and sinuosity (W-3) columns.
  • main_stem(flow_direction, outlet=None) — binary mask of cells on the longest source-to-outlet path (W-4).
  • prune_short(flow_direction, min_length_m) — drop headwater links below the threshold (W-5).
  • subbasins(flow_direction, method="link") — partition the basin into one sub-basin per stream link.

digitalrivers.stream_raster.StreamRaster #

Bases: Dataset

Boolean/int stream-network raster tagged with extraction threshold.

Parameters:

Name Type Description Default
src Dataset

GDAL dataset wrapping the stream raster.

required
access str

"read_only" (default) or "write".

'read_only'
threshold float | int

Accumulation threshold used to extract this stream network. Stored for provenance and round-trip persistence. Required keyword-only.

required
routing str

Routing scheme of the FlowDirection that produced the upstream accumulation. Required keyword-only. Must be in _SUPPORTED_ROUTING.

required

Raises:

Type Description
ValueError

If routing is not a recognised value at all.

TypeError

If routing is a multi-direction scheme. Convert the FlowDirection to D8 first.

Source code in src/digitalrivers/stream_raster.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
class StreamRaster(Dataset):
    """Boolean/int stream-network raster tagged with extraction threshold.

    Args:
        src: GDAL dataset wrapping the stream raster.
        access: `"read_only"` (default) or `"write"`.
        threshold: Accumulation threshold used to extract this stream
            network. Stored for provenance and round-trip persistence.
            Required keyword-only.
        routing: Routing scheme of the `FlowDirection` that produced the
            upstream accumulation. Required keyword-only. Must be in
            `_SUPPORTED_ROUTING`.

    Raises:
        ValueError: If `routing` is not a recognised value at all.
        TypeError: If `routing` is a multi-direction scheme. Convert the
            `FlowDirection` to D8 first.
    """

    threshold: float | int
    routing: str

    _SUPPORTED_ROUTING: frozenset[str] = frozenset({"d8", "rho8"})

    def __init__(
        self,
        src: gdal.Dataset,
        access: str = "read_only",
        *,
        threshold: float | int,
        routing: str,
    ):
        if routing not in VALID_ROUTING:
            raise ValueError(
                f"routing must be one of {sorted(VALID_ROUTING)}; got {routing!r}"
            )
        if routing not in self._SUPPORTED_ROUTING:
            raise TypeError(
                f"StreamRaster currently supports only single-direction routing "
                f"({sorted(self._SUPPORTED_ROUTING)}); got {routing!r}. "
                f"Convert the FlowDirection to D8 first."
            )
        super().__init__(src, access)
        self.threshold = threshold
        self.routing = routing

    @classmethod
    def from_dataset(
        cls,
        ds: Dataset,
        *,
        threshold: float | int,
        routing: str,
    ) -> StreamRaster:
        """Promote a plain `Dataset` into a `StreamRaster`."""
        return cls(ds.raster, threshold=threshold, routing=routing)

    def to_dataset(self) -> Dataset:
        """Drop the typed wrapper and return the underlying `Dataset`."""
        return Dataset(self.raster)

    def persist_metadata(self) -> None:
        """Write `routing` and `threshold` to the raster's metadata tags."""
        self.meta_data = {
            META_CLASS: type(self).__name__,
            META_ROUTING: self.routing,
            META_THRESHOLD: str(self.threshold),
        }

    @classmethod
    def open(
        cls,
        path: str,
        *,
        threshold: float | int | None = None,
        routing: str | None = None,
    ) -> StreamRaster:
        """Open a `StreamRaster` GeoTIFF.

        Resolution order: explicit kwargs > `DR_*` metadata tags > raise.
        `threshold` is parsed from the tag as a float (it was written via
        `str(self.threshold)`).

        Raises:
            ValueError: If either `routing` or `threshold` cannot be
                resolved from kwargs or metadata tags.
        """
        ds = Dataset.read_file(path)
        md = ds.meta_data or {}
        resolved_routing = routing or md.get(META_ROUTING)
        if resolved_routing is None:
            raise ValueError(
                f"{path!r} carries no DR_ROUTING tag and no routing= was passed. "
                f"Pass routing= explicitly (one of {sorted(VALID_ROUTING)})."
            )
        if threshold is None:
            tag = md.get(META_THRESHOLD)
            if tag is None:
                raise ValueError(
                    f"{path!r} carries no DR_THRESHOLD tag and no threshold= was "
                    f"passed."
                )
            threshold = float(tag)
        return cls(ds.raster, threshold=threshold, routing=resolved_routing)

    def subbasins(
        self,
        flow_direction,
        method: str = "link",
    ) -> WatershedRaster:
        """Partition the basin into one sub-basin per stream link.

        Each cell is labelled with the ID of the first downstream stream
        link it joins. Confluence cells belong to the new downstream link
        (WhiteboxTools / TauDEM convention). Off-stream cells inherit the
        link ID of the first stream cell their flow path reaches.

        Args:
            flow_direction: Single-direction `FlowDirection` aligned to this
                stream raster.
            method: `"link"` (default) — one sub-basin per link. The
                `"min_order"` and `"isobasin"` modes from the spec are
                deferred.

        Returns:
            :class:`WatershedRaster` tagged with this stream raster's
            `routing` (via the FlowDirection). Background cells (those that
            never reach a stream) are 0.

        Raises:
            ValueError: If `method` is not `"link"` or
                `flow_direction` is multi-direction.
        """
        import numpy as np

        from digitalrivers.flow_direction import FlowDirection
        from digitalrivers.watershed_raster import WatershedRaster

        if method != "link":
            raise ValueError(
                f"method must be 'link' (other modes deferred); got {method!r}"
            )
        if not isinstance(flow_direction, FlowDirection):
            raise ValueError("flow_direction must be a FlowDirection instance")
        if flow_direction.routing not in ("d8", "rho8"):
            raise ValueError(
                f"subbasins currently supports single-direction routing only; "
                f"got {flow_direction.routing!r}"
            )

        stream_mask = self.read_array().astype(bool, copy=False)
        fdir = flow_direction.read_array().astype(np.int32, copy=False)
        if stream_mask.shape != fdir.shape:
            raise ValueError(
                f"flow_direction shape {fdir.shape} != stream raster shape "
                f"{stream_mask.shape}"
            )

        d_row = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int32)
        d_col = np.array([0, -1, -1, -1, 0, 1, 1, 1], dtype=np.int32)
        inv_dir = np.array([4, 5, 6, 7, 0, 1, 2, 3], dtype=np.int32)
        rows, cols = stream_mask.shape

        # Incoming-stream count per stream cell (for confluence detection).
        nup = np.zeros(stream_mask.shape, dtype=np.int8)
        for k in range(8):
            dr = int(d_row[k])
            dc = int(d_col[k])
            src_r = slice(max(0, dr), min(rows, rows + dr))
            src_c = slice(max(0, dc), min(cols, cols + dc))
            dst_r = slice(max(0, -dr), min(rows, rows - dr))
            dst_c = slice(max(0, -dc), min(cols, cols - dc))
            sm_src = stream_mask[src_r, src_c]
            fd_src = fdir[src_r, src_c]
            inflow = sm_src & (fd_src == inv_dir[k]) & stream_mask[dst_r, dst_c]
            nup[dst_r, dst_c] += inflow.astype(np.int8)

        link_id = np.zeros((rows, cols), dtype=np.int32)
        next_id = 1
        # Heads and downstream-of-confluence cells start new links.
        # Build link IDs by walking from every head/confluence-downstream entry.
        starts = stream_mask & ((nup == 0) | (nup >= 2))
        for r0, c0 in np.argwhere(starts):
            r0 = int(r0)
            c0 = int(c0)
            if link_id[r0, c0] != 0:
                continue
            current = next_id
            next_id += 1
            link_id[r0, c0] = current
            r, c = r0, c0
            while True:
                d = int(fdir[r, c])
                if d < 0 or d > 7:
                    break
                nr = r + int(d_row[d])
                nc = c + int(d_col[d])
                if not (0 <= nr < rows and 0 <= nc < cols):
                    break
                if not stream_mask[nr, nc]:
                    break
                if nup[nr, nc] >= 2:
                    break  # confluence — handled by its own iteration
                if link_id[nr, nc] != 0:
                    break  # already assigned by an earlier walk
                link_id[nr, nc] = current
                r, c = nr, nc

        # Off-stream cells: walk downstream until hitting a labelled cell.
        out = link_id.copy()
        for r0 in range(rows):
            for c0 in range(cols):
                if out[r0, c0] != 0:
                    continue
                path: list[tuple[int, int]] = []
                r, c = r0, c0
                tail_id = 0
                while True:
                    if out[r, c] != 0:
                        tail_id = int(out[r, c])
                        break
                    path.append((r, c))
                    d = int(fdir[r, c])
                    if d < 0 or d > 7:
                        break
                    nr = r + int(d_row[d])
                    nc = c + int(d_col[d])
                    if not (0 <= nr < rows and 0 <= nc < cols):
                        break
                    r, c = nr, nc
                if tail_id != 0:
                    for pr, pc in path:
                        out[pr, pc] = tail_id

        plain = Dataset.create_from_array(
            out, geo=self.geotransform, epsg=self.epsg, no_data_value=0,
        )

        import geopandas as gpd
        unique_ids = sorted({int(v) for v in np.unique(out) if v != 0})
        # For each link, the outlet is the cell whose D8 successor either
        # belongs to a different basin or falls off the grid — i.e., the
        # most-downstream cell of the link. Pick the first such cell we
        # find; ties don't matter because all such cells map to the same
        # outlet point cluster on a well-formed flow graph.
        x0, dx, _, y0, _, dy = self.geotransform
        outlet_xs: list[float] = []
        outlet_ys: list[float] = []
        for bid in unique_ids:
            rs, cs = np.where(out == bid)
            chosen_r, chosen_c = int(rs[0]), int(cs[0])
            for r, c in zip(rs.tolist(), cs.tolist()):
                d = int(fdir[r, c])
                if d < 0 or d > 7:
                    chosen_r, chosen_c = r, c
                    break
                nr = r + int(d_row[d])
                nc = c + int(d_col[d])
                if not (0 <= nr < rows and 0 <= nc < cols):
                    chosen_r, chosen_c = r, c
                    break
                if int(out[nr, nc]) != bid:
                    chosen_r, chosen_c = r, c
                    break
            outlet_xs.append(float(x0 + (chosen_c + 0.5) * dx))
            outlet_ys.append(float(y0 + (chosen_r + 0.5) * dy))
        outlets_gdf = gpd.GeoDataFrame(
            {"basin_id": unique_ids,
             "cell_count": [int((out == bid).sum()) for bid in unique_ids]},
            geometry=gpd.points_from_xy(outlet_xs, outlet_ys),
            crs=self.epsg,
        )
        return WatershedRaster.from_dataset(
            plain, routing=flow_direction.routing, outlets=outlets_gdf,
        )

    def order(
        self,
        method: str = "strahler",
        flow_direction=None,
    ) -> "StreamRaster":
        """Compute a stream-order raster: Strahler, Shreve, Horton, Hack, or Topological.

        Args:
            method: `"strahler"` (default), `"shreve"`, `"horton"`,
                `"hack"`, or `"topological"`.
            flow_direction: Single-direction (`d8` / `rho8`) FlowDirection
                aligned to this stream raster. Required — the topology walks
                the flow-direction edges.

        Returns:
            A new `StreamRaster` whose underlying raster holds the stream
            order; non-stream cells hold `0`. dtype is uint16 for most
            methods and uint32 for `shreve` and `topological`. The returned
            object preserves this raster's `threshold` and `routing` tags
            for downstream consumers.

        Raises:
            ValueError: If `method` is unknown or `flow_direction` is
                missing / multi-direction.
        """
        import numpy as np

        from digitalrivers._streams.order import (
            hack,
            horton,
            shreve,
            strahler,
            topological,
        )
        from digitalrivers.flow_direction import FlowDirection

        if method not in ("strahler", "shreve", "horton", "hack", "topological"):
            raise ValueError(
                f"method must be one of 'strahler', 'shreve', 'horton', "
                f"'hack', 'topological'; got {method!r}"
            )
        if not isinstance(flow_direction, FlowDirection):
            raise ValueError(
                "flow_direction is required and must be a FlowDirection"
            )
        if flow_direction.routing not in ("d8", "rho8"):
            raise ValueError(
                f"order currently supports single-direction routing only; got "
                f"{flow_direction.routing!r}"
            )
        stream_mask = self.read_array().astype(bool, copy=False)
        fdir = flow_direction.read_array().astype(np.int32, copy=False)
        if stream_mask.shape != fdir.shape:
            raise ValueError(
                f"flow_direction shape {fdir.shape} != stream raster shape "
                f"{stream_mask.shape}"
            )
        if method == "strahler":
            arr = strahler(stream_mask, fdir)
        elif method == "shreve":
            arr = shreve(stream_mask, fdir)
        elif method == "horton":
            arr = horton(stream_mask, fdir)
        elif method == "hack":
            arr = hack(stream_mask, fdir)
        else:
            arr = topological(stream_mask, fdir)
        plain = Dataset.create_from_array(
            arr, geo=self.geotransform, epsg=self.epsg, no_data_value=0,
        )
        return StreamRaster.from_dataset(
            plain, threshold=self.threshold, routing=self.routing
        )

    def prune_short(
        self,
        flow_direction,
        min_length_m: float,
    ) -> "StreamRaster":
        """Drop headwater links shorter than `min_length_m`.

        A **headwater link** runs from a head (in-degree 0) downstream until
        it hits either a confluence (in-degree ≥ 2) or an outlet (no
        downstream stream neighbour). The link's planimetric length is the
        sum of per-step distances using `cell_size` for cardinal moves and
        `cell_size * sqrt(2)` for diagonals.

        Internal links (between two confluences) are left untouched — pruning
        them would create dangles and break the network topology. To prune
        deep into the network repeatedly, call `prune_short` multiple times.

        Args:
            flow_direction: Single-direction (`d8` / `rho8`) `FlowDirection`
                aligned to this stream raster.
            min_length_m: Length threshold in map units. Headwater links with
                planimetric length strictly less than this value are removed.

        Returns:
            A new `StreamRaster` carrying the same `routing` and `threshold`
            tags; the underlying raster is `uint8` (1 at surviving stream
            cells, 0 elsewhere).

        Raises:
            TypeError: If `flow_direction` is not a `FlowDirection`.
            ValueError: If `flow_direction` is multi-direction or shape
                mismatches, or `min_length_m` is negative.
        """
        import numpy as np

        from digitalrivers._streams.order import (
            _DIR_DR,
            _DIR_DC,
            _build_topology,
        )
        from digitalrivers.flow_direction import FlowDirection

        if not isinstance(flow_direction, FlowDirection):
            raise TypeError(
                f"flow_direction must be a FlowDirection; got "
                f"{type(flow_direction).__name__}"
            )
        if flow_direction.routing not in ("d8", "rho8"):
            raise ValueError(
                f"prune_short supports single-direction routing only; got "
                f"{flow_direction.routing!r}"
            )
        if min_length_m < 0:
            raise ValueError(
                f"min_length_m must be non-negative; got {min_length_m!r}"
            )

        sm = self.read_array().astype(bool, copy=False).copy()
        fdir = flow_direction.read_array().astype(np.int32, copy=False)
        if sm.shape != fdir.shape:
            raise ValueError(
                f"flow_direction shape {fdir.shape} != stream raster shape "
                f"{sm.shape}"
            )

        indeg, _ds_idx = _build_topology(sm, fdir)
        rows, cols = sm.shape
        cs = float(self.cell_size)
        diag = cs * (2.0 ** 0.5)

        # Trace every headwater link and remove its cells if too short.
        head_locs = list(zip(*np.where(sm & (indeg == 0))))
        for r0, c0 in head_locs:
            path: list[tuple[int, int]] = [(int(r0), int(c0))]
            length = 0.0
            r, c = int(r0), int(c0)
            while True:
                d = int(fdir[r, c])
                if d < 0 or d > 7:
                    break
                nr, nc = r + int(_DIR_DR[d]), c + int(_DIR_DC[d])
                if not (0 <= nr < rows and 0 <= nc < cols):
                    break
                if not sm[nr, nc]:
                    break
                step = diag if int(_DIR_DR[d]) and int(_DIR_DC[d]) else cs
                length += step
                if indeg[nr, nc] >= 2:
                    # (nr, nc) is the downstream confluence — link ends BEFORE
                    # this cell so the confluence stays in the network.
                    break
                path.append((nr, nc))
                r, c = nr, nc
            if length < min_length_m:
                for pr, pc in path:
                    sm[pr, pc] = False

        plain = Dataset.create_from_array(
            sm.astype(np.uint8), geo=self.geotransform, epsg=self.epsg,
            no_data_value=0,
        )
        return StreamRaster.from_dataset(
            plain, threshold=self.threshold, routing=self.routing,
        )

    def main_stem(
        self,
        flow_direction,
        outlet: tuple[int, int] | None = None,
    ) -> np.ndarray:
        """Return a binary mask of cells along the catchment's main stem.

        The main stem is the longest flow path from any outlet to any head.
        Starting at an outlet, the walk picks the upstream stream neighbour
        with the greatest upstream flow length at every junction; ties are
        broken by lower row-major linear index for determinism.

        When `outlet` is `None`, the catchment's main outlet is the stream
        outlet carrying the longest upstream flow length (i.e. the basin
        whose longest source-to-outlet path is largest).

        Args:
            flow_direction: Single-direction (`d8` / `rho8`) `FlowDirection`
                aligned to this stream raster.
            outlet: Optional `(row, col)` of a specific outlet cell. The cell
                must be a stream cell — typically with `fdir = -1` or whose
                D8 receiver is non-stream. If `None`, the longest outlet is
                used.

        Returns:
            `(rows, cols)` bool array — True for cells on the main stem,
            False elsewhere.

        Raises:
            TypeError: If `flow_direction` is not a `FlowDirection`.
            ValueError: If `flow_direction` is multi-direction, shape
                mismatches, or `outlet` is not a stream cell.
        """
        import numpy as np

        from digitalrivers._streams.order import (
            _DIR_DR,
            _DIR_DC,
            _INV_DIR,
            _build_topology,
            _stream_outlets,
            _upstream_length_from_head,
        )
        from digitalrivers.flow_direction import FlowDirection

        if not isinstance(flow_direction, FlowDirection):
            raise TypeError(
                f"flow_direction must be a FlowDirection; got "
                f"{type(flow_direction).__name__}"
            )
        if flow_direction.routing not in ("d8", "rho8"):
            raise ValueError(
                f"main_stem supports single-direction routing only; got "
                f"{flow_direction.routing!r}"
            )

        sm = self.read_array().astype(bool, copy=False)
        fdir = flow_direction.read_array().astype(np.int32, copy=False)
        if sm.shape != fdir.shape:
            raise ValueError(
                f"flow_direction shape {fdir.shape} != stream raster shape "
                f"{sm.shape}"
            )

        indeg, _ds_idx = _build_topology(sm, fdir)
        length = _upstream_length_from_head(sm, fdir, indeg)

        if outlet is None:
            outlets = _stream_outlets(sm, fdir)
            if not outlets:
                return np.zeros_like(sm)
            outlet = max(outlets, key=lambda rc: int(length[rc]))
        else:
            r0, c0 = outlet
            if not (0 <= r0 < sm.shape[0] and 0 <= c0 < sm.shape[1]):
                raise ValueError(f"outlet {outlet!r} is outside the raster")
            if not sm[r0, c0]:
                raise ValueError(f"outlet {outlet!r} is not a stream cell")

        rows, cols = sm.shape
        mask = np.zeros_like(sm)
        r, c = int(outlet[0]), int(outlet[1])
        while True:
            mask[r, c] = True
            # Find inflow neighbours (cells whose D8 receiver is (r, c)).
            best: tuple[int, int, int, int] | None = None  # (length, lin, r, c)
            for k in range(8):
                dr = int(_DIR_DR[k])
                dc = int(_DIR_DC[k])
                ur, uc = r + dr, c + dc
                if not (0 <= ur < rows and 0 <= uc < cols):
                    continue
                if not sm[ur, uc]:
                    continue
                if int(fdir[ur, uc]) != int(_INV_DIR[k]):
                    continue
                cand = (int(length[ur, uc]), -(ur * cols + uc), ur, uc)
                if best is None or cand > best:
                    best = cand
            if best is None:
                break
            r, c = best[2], best[3]
        return mask

    def to_vector(
        self,
        flow_direction,
        dem=None,
        single_direction: str = "max",
    ):
        """Vectorise the stream raster into a `GeoDataFrame` of LineString links.

        Walks the flow-direction raster from every head and every cell
        downstream of a confluence until it reaches a confluence or an outlet.
        Each resulting LineString is one stream link.

        Args:
            flow_direction: `FlowDirection` raster aligned to this stream
                raster. Must be a single-direction routing (`d8` or `rho8`)
                — multi-direction inputs raise. (D∞ / MFD inputs would need a
                separate dominant-direction collapse, deferred.)
            dem: Optional `DEM` aligned to the stream raster. When supplied,
                the link attributes include `drop_m` and `mean_slope`.
            single_direction: Reserved for future multi-direction collapse
                (`"max"` for argmax-of-fractions; `"weighted"` for
                weighted-mean direction). Ignored when `flow_direction` is
                already single-direction.

        Returns:
            `geopandas.GeoDataFrame` with columns:
              - `link_id` (int64): 0-based sequential link identifier.
              - `from_node` (int64): node ID at the upstream end (head /
                confluence).
              - `to_node` (int64): node ID at the downstream end.
              - `length_m` (float64): sum of per-step distances using the
                D8 grid-lengths lookup (`cell_size` cardinal,
                `cell_size * sqrt(2)` diagonal).
              - `drop_m` (float64): `z[from] - z[to]` (positive if the
                link descends; clamped to 0 otherwise). NaN if `dem` is
                None.
              - `mean_slope` (float64): `drop_m / length_m` (m/m). NaN if
                `dem` is None or `length_m == 0`.
              - `sinuosity` (float64): `length_m / straight_line_distance`
                between the link's two endpoints (≥ 1.0 for non-degenerate
                links; exactly 1.0 for straight links and single-cell
                degenerate links).
              - `geometry`: shapely `LineString` in the dataset's CRS,
                vertices at cell centres.

        Raises:
            ValueError: If `flow_direction` is multi-direction.
            ValueError: If shapes do not match.
        """
        import geopandas as gpd
        import numpy as np
        from shapely.geometry import LineString

        from digitalrivers.flow_direction import FlowDirection  # for type-narrow

        if not isinstance(flow_direction, FlowDirection):
            raise TypeError(
                f"flow_direction must be a FlowDirection; got {type(flow_direction).__name__}"
            )
        if flow_direction.routing not in ("d8", "rho8"):
            raise ValueError(
                f"to_vector currently supports single-direction routing only; "
                f"got {flow_direction.routing!r}. Collapse the flow direction to "
                f"D8 first."
            )

        fdir = flow_direction.read_array().astype(np.int32, copy=False)
        stream_mask = self.read_array().astype(bool, copy=False)
        if fdir.shape != stream_mask.shape:
            raise ValueError(
                f"flow_direction shape {fdir.shape} != stream raster shape "
                f"{stream_mask.shape}"
            )

        if dem is not None:
            z = dem.read_array().astype(np.float64, copy=False)
            no_val = dem.no_data_value[0] if dem.no_data_value else None
            if no_val is not None:
                z = np.where(z == no_val, np.nan, z)
            if z.shape != stream_mask.shape:
                raise ValueError(
                    f"dem shape {z.shape} != stream raster shape {stream_mask.shape}"
                )
        else:
            z = None

        # 8-direction offsets matching DIR_OFFSETS (0=S, 1=SW, ..., 7=SE).
        d_row = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int32)
        d_col = np.array([0, -1, -1, -1, 0, 1, 1, 1], dtype=np.int32)
        # Inverse direction: if cell at offset (dr, dc) has direction = inv[k],
        # it is flowing INTO us.
        inv_dir = np.array([4, 5, 6, 7, 0, 1, 2, 3], dtype=np.int32)
        grid_lengths = np.array(
            [1.0, np.sqrt(2.0), 1.0, np.sqrt(2.0),
             1.0, np.sqrt(2.0), 1.0, np.sqrt(2.0)],
            dtype=np.float64,
        ) * float(self.cell_size)

        rows, cols = stream_mask.shape

        # Step 1 — incoming-stream count per stream cell.
        nup = np.zeros(stream_mask.shape, dtype=np.int8)
        for k in range(8):
            dr = int(d_row[k])
            dc = int(d_col[k])
            src_r = slice(max(0, dr), min(rows, rows + dr))
            src_c = slice(max(0, dc), min(cols, cols + dc))
            dst_r = slice(max(0, -dr), min(rows, rows - dr))
            dst_c = slice(max(0, -dc), min(cols, cols - dc))
            sm_src = stream_mask[src_r, src_c]
            fd_src = fdir[src_r, src_c]
            # A neighbour at (src) points into (dst) iff its direction equals inv[k].
            inflow = sm_src & (fd_src == inv_dir[k]) & stream_mask[dst_r, dst_c]
            nup[dst_r, dst_c] += inflow.astype(np.int8)

        # Step 2 — find link starts.
        heads_or_confluences_mask = stream_mask & ((nup == 0) | (nup >= 2))

        # Step 3 — walk each link.
        def _trace(start_r: int, start_c: int):
            path = [(start_r, start_c)]
            r, c = start_r, start_c
            length = 0.0
            while True:
                d = int(fdir[r, c])
                if d < 0 or d > 7:
                    break  # sink / outlet
                nr = r + int(d_row[d])
                nc = c + int(d_col[d])
                if not (0 <= nr < rows and 0 <= nc < cols):
                    break
                if not stream_mask[nr, nc]:
                    break
                length += float(grid_lengths[d])
                path.append((nr, nc))
                if nup[nr, nc] >= 2:
                    break
                r, c = nr, nc
            return path, length

        # Assign node IDs: every distinct head / confluence / link-end is a node.
        node_id_grid = np.full(stream_mask.shape, -1, dtype=np.int64)
        next_node_id = 0

        def _get_node_id(r: int, c: int) -> int:
            nonlocal next_node_id
            if node_id_grid[r, c] < 0:
                node_id_grid[r, c] = next_node_id
                next_node_id += 1
            return int(node_id_grid[r, c])

        gt = self.geotransform
        records: list[dict] = []
        link_id = 0
        starts = np.argwhere(heads_or_confluences_mask)
        for r0, c0 in starts:
            r0 = int(r0)
            c0 = int(c0)
            # Skip a confluence cell if it has no outgoing direction (it's an outlet
            # confluence — handled when its upstream link arrives, no link begins here).
            d_start = int(fdir[r0, c0])
            if d_start < 0 or d_start > 7:
                continue
            path, length_m = _trace(r0, c0)
            if len(path) < 2:
                continue
            from_node = _get_node_id(r0, c0)
            r_end, c_end = path[-1]
            to_node = _get_node_id(r_end, c_end)

            xs = [gt[0] + (c + 0.5) * gt[1] + (r + 0.5) * gt[2] for r, c in path]
            ys = [gt[3] + (c + 0.5) * gt[4] + (r + 0.5) * gt[5] for r, c in path]
            geom = LineString(zip(xs, ys))

            # Sinuosity = traced length / straight-line distance between endpoints.
            # Degenerate (single-cell or zero-length) links default to 1.0 — straight.
            straight_m = float(((xs[-1] - xs[0]) ** 2 + (ys[-1] - ys[0]) ** 2) ** 0.5)
            if straight_m > 0.0:
                sinuosity = length_m / straight_m
            else:
                sinuosity = 1.0

            if z is not None:
                z_from = float(z[r0, c0])
                z_to = float(z[r_end, c_end])
                drop_m = max(0.0, z_from - z_to)
                mean_slope = drop_m / length_m if length_m > 0 else np.nan
            else:
                drop_m = np.nan
                mean_slope = np.nan

            records.append({
                "link_id": link_id,
                "from_node": from_node,
                "to_node": to_node,
                "length_m": length_m,
                "drop_m": drop_m,
                "mean_slope": mean_slope,
                "sinuosity": sinuosity,
                "geometry": geom,
            })
            link_id += 1

        crs = self.epsg
        return gpd.GeoDataFrame(records, geometry="geometry", crs=crs)

    def __repr__(self) -> str:
        return (
            f"<StreamRaster rows={self.rows} cols={self.columns} "
            f"threshold={self.threshold!r} routing={self.routing!r}>"
        )

from_dataset(ds, *, threshold, routing) classmethod #

Promote a plain Dataset into a StreamRaster.

Source code in src/digitalrivers/stream_raster.py
@classmethod
def from_dataset(
    cls,
    ds: Dataset,
    *,
    threshold: float | int,
    routing: str,
) -> StreamRaster:
    """Promote a plain `Dataset` into a `StreamRaster`."""
    return cls(ds.raster, threshold=threshold, routing=routing)

to_dataset() #

Drop the typed wrapper and return the underlying Dataset.

Source code in src/digitalrivers/stream_raster.py
def to_dataset(self) -> Dataset:
    """Drop the typed wrapper and return the underlying `Dataset`."""
    return Dataset(self.raster)

persist_metadata() #

Write routing and threshold to the raster's metadata tags.

Source code in src/digitalrivers/stream_raster.py
def persist_metadata(self) -> None:
    """Write `routing` and `threshold` to the raster's metadata tags."""
    self.meta_data = {
        META_CLASS: type(self).__name__,
        META_ROUTING: self.routing,
        META_THRESHOLD: str(self.threshold),
    }

open(path, *, threshold=None, routing=None) classmethod #

Open a StreamRaster GeoTIFF.

Resolution order: explicit kwargs > DR_* metadata tags > raise. threshold is parsed from the tag as a float (it was written via str(self.threshold)).

Raises:

Type Description
ValueError

If either routing or threshold cannot be resolved from kwargs or metadata tags.

Source code in src/digitalrivers/stream_raster.py
@classmethod
def open(
    cls,
    path: str,
    *,
    threshold: float | int | None = None,
    routing: str | None = None,
) -> StreamRaster:
    """Open a `StreamRaster` GeoTIFF.

    Resolution order: explicit kwargs > `DR_*` metadata tags > raise.
    `threshold` is parsed from the tag as a float (it was written via
    `str(self.threshold)`).

    Raises:
        ValueError: If either `routing` or `threshold` cannot be
            resolved from kwargs or metadata tags.
    """
    ds = Dataset.read_file(path)
    md = ds.meta_data or {}
    resolved_routing = routing or md.get(META_ROUTING)
    if resolved_routing is None:
        raise ValueError(
            f"{path!r} carries no DR_ROUTING tag and no routing= was passed. "
            f"Pass routing= explicitly (one of {sorted(VALID_ROUTING)})."
        )
    if threshold is None:
        tag = md.get(META_THRESHOLD)
        if tag is None:
            raise ValueError(
                f"{path!r} carries no DR_THRESHOLD tag and no threshold= was "
                f"passed."
            )
        threshold = float(tag)
    return cls(ds.raster, threshold=threshold, routing=resolved_routing)

subbasins(flow_direction, method='link') #

Partition the basin into one sub-basin per stream link.

Each cell is labelled with the ID of the first downstream stream link it joins. Confluence cells belong to the new downstream link (WhiteboxTools / TauDEM convention). Off-stream cells inherit the link ID of the first stream cell their flow path reaches.

Parameters:

Name Type Description Default
flow_direction

Single-direction FlowDirection aligned to this stream raster.

required
method str

"link" (default) — one sub-basin per link. The "min_order" and "isobasin" modes from the spec are deferred.

'link'

Returns:

Type Description
WatershedRaster

class:WatershedRaster tagged with this stream raster's

WatershedRaster

routing (via the FlowDirection). Background cells (those that

WatershedRaster

never reach a stream) are 0.

Raises:

Type Description
ValueError

If method is not "link" or flow_direction is multi-direction.

Source code in src/digitalrivers/stream_raster.py
def subbasins(
    self,
    flow_direction,
    method: str = "link",
) -> WatershedRaster:
    """Partition the basin into one sub-basin per stream link.

    Each cell is labelled with the ID of the first downstream stream
    link it joins. Confluence cells belong to the new downstream link
    (WhiteboxTools / TauDEM convention). Off-stream cells inherit the
    link ID of the first stream cell their flow path reaches.

    Args:
        flow_direction: Single-direction `FlowDirection` aligned to this
            stream raster.
        method: `"link"` (default) — one sub-basin per link. The
            `"min_order"` and `"isobasin"` modes from the spec are
            deferred.

    Returns:
        :class:`WatershedRaster` tagged with this stream raster's
        `routing` (via the FlowDirection). Background cells (those that
        never reach a stream) are 0.

    Raises:
        ValueError: If `method` is not `"link"` or
            `flow_direction` is multi-direction.
    """
    import numpy as np

    from digitalrivers.flow_direction import FlowDirection
    from digitalrivers.watershed_raster import WatershedRaster

    if method != "link":
        raise ValueError(
            f"method must be 'link' (other modes deferred); got {method!r}"
        )
    if not isinstance(flow_direction, FlowDirection):
        raise ValueError("flow_direction must be a FlowDirection instance")
    if flow_direction.routing not in ("d8", "rho8"):
        raise ValueError(
            f"subbasins currently supports single-direction routing only; "
            f"got {flow_direction.routing!r}"
        )

    stream_mask = self.read_array().astype(bool, copy=False)
    fdir = flow_direction.read_array().astype(np.int32, copy=False)
    if stream_mask.shape != fdir.shape:
        raise ValueError(
            f"flow_direction shape {fdir.shape} != stream raster shape "
            f"{stream_mask.shape}"
        )

    d_row = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int32)
    d_col = np.array([0, -1, -1, -1, 0, 1, 1, 1], dtype=np.int32)
    inv_dir = np.array([4, 5, 6, 7, 0, 1, 2, 3], dtype=np.int32)
    rows, cols = stream_mask.shape

    # Incoming-stream count per stream cell (for confluence detection).
    nup = np.zeros(stream_mask.shape, dtype=np.int8)
    for k in range(8):
        dr = int(d_row[k])
        dc = int(d_col[k])
        src_r = slice(max(0, dr), min(rows, rows + dr))
        src_c = slice(max(0, dc), min(cols, cols + dc))
        dst_r = slice(max(0, -dr), min(rows, rows - dr))
        dst_c = slice(max(0, -dc), min(cols, cols - dc))
        sm_src = stream_mask[src_r, src_c]
        fd_src = fdir[src_r, src_c]
        inflow = sm_src & (fd_src == inv_dir[k]) & stream_mask[dst_r, dst_c]
        nup[dst_r, dst_c] += inflow.astype(np.int8)

    link_id = np.zeros((rows, cols), dtype=np.int32)
    next_id = 1
    # Heads and downstream-of-confluence cells start new links.
    # Build link IDs by walking from every head/confluence-downstream entry.
    starts = stream_mask & ((nup == 0) | (nup >= 2))
    for r0, c0 in np.argwhere(starts):
        r0 = int(r0)
        c0 = int(c0)
        if link_id[r0, c0] != 0:
            continue
        current = next_id
        next_id += 1
        link_id[r0, c0] = current
        r, c = r0, c0
        while True:
            d = int(fdir[r, c])
            if d < 0 or d > 7:
                break
            nr = r + int(d_row[d])
            nc = c + int(d_col[d])
            if not (0 <= nr < rows and 0 <= nc < cols):
                break
            if not stream_mask[nr, nc]:
                break
            if nup[nr, nc] >= 2:
                break  # confluence — handled by its own iteration
            if link_id[nr, nc] != 0:
                break  # already assigned by an earlier walk
            link_id[nr, nc] = current
            r, c = nr, nc

    # Off-stream cells: walk downstream until hitting a labelled cell.
    out = link_id.copy()
    for r0 in range(rows):
        for c0 in range(cols):
            if out[r0, c0] != 0:
                continue
            path: list[tuple[int, int]] = []
            r, c = r0, c0
            tail_id = 0
            while True:
                if out[r, c] != 0:
                    tail_id = int(out[r, c])
                    break
                path.append((r, c))
                d = int(fdir[r, c])
                if d < 0 or d > 7:
                    break
                nr = r + int(d_row[d])
                nc = c + int(d_col[d])
                if not (0 <= nr < rows and 0 <= nc < cols):
                    break
                r, c = nr, nc
            if tail_id != 0:
                for pr, pc in path:
                    out[pr, pc] = tail_id

    plain = Dataset.create_from_array(
        out, geo=self.geotransform, epsg=self.epsg, no_data_value=0,
    )

    import geopandas as gpd
    unique_ids = sorted({int(v) for v in np.unique(out) if v != 0})
    # For each link, the outlet is the cell whose D8 successor either
    # belongs to a different basin or falls off the grid — i.e., the
    # most-downstream cell of the link. Pick the first such cell we
    # find; ties don't matter because all such cells map to the same
    # outlet point cluster on a well-formed flow graph.
    x0, dx, _, y0, _, dy = self.geotransform
    outlet_xs: list[float] = []
    outlet_ys: list[float] = []
    for bid in unique_ids:
        rs, cs = np.where(out == bid)
        chosen_r, chosen_c = int(rs[0]), int(cs[0])
        for r, c in zip(rs.tolist(), cs.tolist()):
            d = int(fdir[r, c])
            if d < 0 or d > 7:
                chosen_r, chosen_c = r, c
                break
            nr = r + int(d_row[d])
            nc = c + int(d_col[d])
            if not (0 <= nr < rows and 0 <= nc < cols):
                chosen_r, chosen_c = r, c
                break
            if int(out[nr, nc]) != bid:
                chosen_r, chosen_c = r, c
                break
        outlet_xs.append(float(x0 + (chosen_c + 0.5) * dx))
        outlet_ys.append(float(y0 + (chosen_r + 0.5) * dy))
    outlets_gdf = gpd.GeoDataFrame(
        {"basin_id": unique_ids,
         "cell_count": [int((out == bid).sum()) for bid in unique_ids]},
        geometry=gpd.points_from_xy(outlet_xs, outlet_ys),
        crs=self.epsg,
    )
    return WatershedRaster.from_dataset(
        plain, routing=flow_direction.routing, outlets=outlets_gdf,
    )

order(method='strahler', flow_direction=None) #

Compute a stream-order raster: Strahler, Shreve, Horton, Hack, or Topological.

Parameters:

Name Type Description Default
method str

"strahler" (default), "shreve", "horton", "hack", or "topological".

'strahler'
flow_direction

Single-direction (d8 / rho8) FlowDirection aligned to this stream raster. Required — the topology walks the flow-direction edges.

None

Returns:

Type Description
'StreamRaster'

A new StreamRaster whose underlying raster holds the stream

'StreamRaster'

order; non-stream cells hold 0. dtype is uint16 for most

'StreamRaster'

methods and uint32 for shreve and topological. The returned

'StreamRaster'

object preserves this raster's threshold and routing tags

'StreamRaster'

for downstream consumers.

Raises:

Type Description
ValueError

If method is unknown or flow_direction is missing / multi-direction.

Source code in src/digitalrivers/stream_raster.py
def order(
    self,
    method: str = "strahler",
    flow_direction=None,
) -> "StreamRaster":
    """Compute a stream-order raster: Strahler, Shreve, Horton, Hack, or Topological.

    Args:
        method: `"strahler"` (default), `"shreve"`, `"horton"`,
            `"hack"`, or `"topological"`.
        flow_direction: Single-direction (`d8` / `rho8`) FlowDirection
            aligned to this stream raster. Required — the topology walks
            the flow-direction edges.

    Returns:
        A new `StreamRaster` whose underlying raster holds the stream
        order; non-stream cells hold `0`. dtype is uint16 for most
        methods and uint32 for `shreve` and `topological`. The returned
        object preserves this raster's `threshold` and `routing` tags
        for downstream consumers.

    Raises:
        ValueError: If `method` is unknown or `flow_direction` is
            missing / multi-direction.
    """
    import numpy as np

    from digitalrivers._streams.order import (
        hack,
        horton,
        shreve,
        strahler,
        topological,
    )
    from digitalrivers.flow_direction import FlowDirection

    if method not in ("strahler", "shreve", "horton", "hack", "topological"):
        raise ValueError(
            f"method must be one of 'strahler', 'shreve', 'horton', "
            f"'hack', 'topological'; got {method!r}"
        )
    if not isinstance(flow_direction, FlowDirection):
        raise ValueError(
            "flow_direction is required and must be a FlowDirection"
        )
    if flow_direction.routing not in ("d8", "rho8"):
        raise ValueError(
            f"order currently supports single-direction routing only; got "
            f"{flow_direction.routing!r}"
        )
    stream_mask = self.read_array().astype(bool, copy=False)
    fdir = flow_direction.read_array().astype(np.int32, copy=False)
    if stream_mask.shape != fdir.shape:
        raise ValueError(
            f"flow_direction shape {fdir.shape} != stream raster shape "
            f"{stream_mask.shape}"
        )
    if method == "strahler":
        arr = strahler(stream_mask, fdir)
    elif method == "shreve":
        arr = shreve(stream_mask, fdir)
    elif method == "horton":
        arr = horton(stream_mask, fdir)
    elif method == "hack":
        arr = hack(stream_mask, fdir)
    else:
        arr = topological(stream_mask, fdir)
    plain = Dataset.create_from_array(
        arr, geo=self.geotransform, epsg=self.epsg, no_data_value=0,
    )
    return StreamRaster.from_dataset(
        plain, threshold=self.threshold, routing=self.routing
    )

prune_short(flow_direction, min_length_m) #

Drop headwater links shorter than min_length_m.

A headwater link runs from a head (in-degree 0) downstream until it hits either a confluence (in-degree ≥ 2) or an outlet (no downstream stream neighbour). The link's planimetric length is the sum of per-step distances using cell_size for cardinal moves and cell_size * sqrt(2) for diagonals.

Internal links (between two confluences) are left untouched — pruning them would create dangles and break the network topology. To prune deep into the network repeatedly, call prune_short multiple times.

Parameters:

Name Type Description Default
flow_direction

Single-direction (d8 / rho8) FlowDirection aligned to this stream raster.

required
min_length_m float

Length threshold in map units. Headwater links with planimetric length strictly less than this value are removed.

required

Returns:

Type Description
'StreamRaster'

A new StreamRaster carrying the same routing and threshold

'StreamRaster'

tags; the underlying raster is uint8 (1 at surviving stream

'StreamRaster'

cells, 0 elsewhere).

Raises:

Type Description
TypeError

If flow_direction is not a FlowDirection.

ValueError

If flow_direction is multi-direction or shape mismatches, or min_length_m is negative.

Source code in src/digitalrivers/stream_raster.py
def prune_short(
    self,
    flow_direction,
    min_length_m: float,
) -> "StreamRaster":
    """Drop headwater links shorter than `min_length_m`.

    A **headwater link** runs from a head (in-degree 0) downstream until
    it hits either a confluence (in-degree ≥ 2) or an outlet (no
    downstream stream neighbour). The link's planimetric length is the
    sum of per-step distances using `cell_size` for cardinal moves and
    `cell_size * sqrt(2)` for diagonals.

    Internal links (between two confluences) are left untouched — pruning
    them would create dangles and break the network topology. To prune
    deep into the network repeatedly, call `prune_short` multiple times.

    Args:
        flow_direction: Single-direction (`d8` / `rho8`) `FlowDirection`
            aligned to this stream raster.
        min_length_m: Length threshold in map units. Headwater links with
            planimetric length strictly less than this value are removed.

    Returns:
        A new `StreamRaster` carrying the same `routing` and `threshold`
        tags; the underlying raster is `uint8` (1 at surviving stream
        cells, 0 elsewhere).

    Raises:
        TypeError: If `flow_direction` is not a `FlowDirection`.
        ValueError: If `flow_direction` is multi-direction or shape
            mismatches, or `min_length_m` is negative.
    """
    import numpy as np

    from digitalrivers._streams.order import (
        _DIR_DR,
        _DIR_DC,
        _build_topology,
    )
    from digitalrivers.flow_direction import FlowDirection

    if not isinstance(flow_direction, FlowDirection):
        raise TypeError(
            f"flow_direction must be a FlowDirection; got "
            f"{type(flow_direction).__name__}"
        )
    if flow_direction.routing not in ("d8", "rho8"):
        raise ValueError(
            f"prune_short supports single-direction routing only; got "
            f"{flow_direction.routing!r}"
        )
    if min_length_m < 0:
        raise ValueError(
            f"min_length_m must be non-negative; got {min_length_m!r}"
        )

    sm = self.read_array().astype(bool, copy=False).copy()
    fdir = flow_direction.read_array().astype(np.int32, copy=False)
    if sm.shape != fdir.shape:
        raise ValueError(
            f"flow_direction shape {fdir.shape} != stream raster shape "
            f"{sm.shape}"
        )

    indeg, _ds_idx = _build_topology(sm, fdir)
    rows, cols = sm.shape
    cs = float(self.cell_size)
    diag = cs * (2.0 ** 0.5)

    # Trace every headwater link and remove its cells if too short.
    head_locs = list(zip(*np.where(sm & (indeg == 0))))
    for r0, c0 in head_locs:
        path: list[tuple[int, int]] = [(int(r0), int(c0))]
        length = 0.0
        r, c = int(r0), int(c0)
        while True:
            d = int(fdir[r, c])
            if d < 0 or d > 7:
                break
            nr, nc = r + int(_DIR_DR[d]), c + int(_DIR_DC[d])
            if not (0 <= nr < rows and 0 <= nc < cols):
                break
            if not sm[nr, nc]:
                break
            step = diag if int(_DIR_DR[d]) and int(_DIR_DC[d]) else cs
            length += step
            if indeg[nr, nc] >= 2:
                # (nr, nc) is the downstream confluence — link ends BEFORE
                # this cell so the confluence stays in the network.
                break
            path.append((nr, nc))
            r, c = nr, nc
        if length < min_length_m:
            for pr, pc in path:
                sm[pr, pc] = False

    plain = Dataset.create_from_array(
        sm.astype(np.uint8), geo=self.geotransform, epsg=self.epsg,
        no_data_value=0,
    )
    return StreamRaster.from_dataset(
        plain, threshold=self.threshold, routing=self.routing,
    )

main_stem(flow_direction, outlet=None) #

Return a binary mask of cells along the catchment's main stem.

The main stem is the longest flow path from any outlet to any head. Starting at an outlet, the walk picks the upstream stream neighbour with the greatest upstream flow length at every junction; ties are broken by lower row-major linear index for determinism.

When outlet is None, the catchment's main outlet is the stream outlet carrying the longest upstream flow length (i.e. the basin whose longest source-to-outlet path is largest).

Parameters:

Name Type Description Default
flow_direction

Single-direction (d8 / rho8) FlowDirection aligned to this stream raster.

required
outlet tuple[int, int] | None

Optional (row, col) of a specific outlet cell. The cell must be a stream cell — typically with fdir = -1 or whose D8 receiver is non-stream. If None, the longest outlet is used.

None

Returns:

Type Description
ndarray

(rows, cols) bool array — True for cells on the main stem,

ndarray

False elsewhere.

Raises:

Type Description
TypeError

If flow_direction is not a FlowDirection.

ValueError

If flow_direction is multi-direction, shape mismatches, or outlet is not a stream cell.

Source code in src/digitalrivers/stream_raster.py
def main_stem(
    self,
    flow_direction,
    outlet: tuple[int, int] | None = None,
) -> np.ndarray:
    """Return a binary mask of cells along the catchment's main stem.

    The main stem is the longest flow path from any outlet to any head.
    Starting at an outlet, the walk picks the upstream stream neighbour
    with the greatest upstream flow length at every junction; ties are
    broken by lower row-major linear index for determinism.

    When `outlet` is `None`, the catchment's main outlet is the stream
    outlet carrying the longest upstream flow length (i.e. the basin
    whose longest source-to-outlet path is largest).

    Args:
        flow_direction: Single-direction (`d8` / `rho8`) `FlowDirection`
            aligned to this stream raster.
        outlet: Optional `(row, col)` of a specific outlet cell. The cell
            must be a stream cell — typically with `fdir = -1` or whose
            D8 receiver is non-stream. If `None`, the longest outlet is
            used.

    Returns:
        `(rows, cols)` bool array — True for cells on the main stem,
        False elsewhere.

    Raises:
        TypeError: If `flow_direction` is not a `FlowDirection`.
        ValueError: If `flow_direction` is multi-direction, shape
            mismatches, or `outlet` is not a stream cell.
    """
    import numpy as np

    from digitalrivers._streams.order import (
        _DIR_DR,
        _DIR_DC,
        _INV_DIR,
        _build_topology,
        _stream_outlets,
        _upstream_length_from_head,
    )
    from digitalrivers.flow_direction import FlowDirection

    if not isinstance(flow_direction, FlowDirection):
        raise TypeError(
            f"flow_direction must be a FlowDirection; got "
            f"{type(flow_direction).__name__}"
        )
    if flow_direction.routing not in ("d8", "rho8"):
        raise ValueError(
            f"main_stem supports single-direction routing only; got "
            f"{flow_direction.routing!r}"
        )

    sm = self.read_array().astype(bool, copy=False)
    fdir = flow_direction.read_array().astype(np.int32, copy=False)
    if sm.shape != fdir.shape:
        raise ValueError(
            f"flow_direction shape {fdir.shape} != stream raster shape "
            f"{sm.shape}"
        )

    indeg, _ds_idx = _build_topology(sm, fdir)
    length = _upstream_length_from_head(sm, fdir, indeg)

    if outlet is None:
        outlets = _stream_outlets(sm, fdir)
        if not outlets:
            return np.zeros_like(sm)
        outlet = max(outlets, key=lambda rc: int(length[rc]))
    else:
        r0, c0 = outlet
        if not (0 <= r0 < sm.shape[0] and 0 <= c0 < sm.shape[1]):
            raise ValueError(f"outlet {outlet!r} is outside the raster")
        if not sm[r0, c0]:
            raise ValueError(f"outlet {outlet!r} is not a stream cell")

    rows, cols = sm.shape
    mask = np.zeros_like(sm)
    r, c = int(outlet[0]), int(outlet[1])
    while True:
        mask[r, c] = True
        # Find inflow neighbours (cells whose D8 receiver is (r, c)).
        best: tuple[int, int, int, int] | None = None  # (length, lin, r, c)
        for k in range(8):
            dr = int(_DIR_DR[k])
            dc = int(_DIR_DC[k])
            ur, uc = r + dr, c + dc
            if not (0 <= ur < rows and 0 <= uc < cols):
                continue
            if not sm[ur, uc]:
                continue
            if int(fdir[ur, uc]) != int(_INV_DIR[k]):
                continue
            cand = (int(length[ur, uc]), -(ur * cols + uc), ur, uc)
            if best is None or cand > best:
                best = cand
        if best is None:
            break
        r, c = best[2], best[3]
    return mask

to_vector(flow_direction, dem=None, single_direction='max') #

Vectorise the stream raster into a GeoDataFrame of LineString links.

Walks the flow-direction raster from every head and every cell downstream of a confluence until it reaches a confluence or an outlet. Each resulting LineString is one stream link.

Parameters:

Name Type Description Default
flow_direction

FlowDirection raster aligned to this stream raster. Must be a single-direction routing (d8 or rho8) — multi-direction inputs raise. (D∞ / MFD inputs would need a separate dominant-direction collapse, deferred.)

required
dem

Optional DEM aligned to the stream raster. When supplied, the link attributes include drop_m and mean_slope.

None
single_direction str

Reserved for future multi-direction collapse ("max" for argmax-of-fractions; "weighted" for weighted-mean direction). Ignored when flow_direction is already single-direction.

'max'

Returns:

Type Description

geopandas.GeoDataFrame with columns: - link_id (int64): 0-based sequential link identifier. - from_node (int64): node ID at the upstream end (head / confluence). - to_node (int64): node ID at the downstream end. - length_m (float64): sum of per-step distances using the D8 grid-lengths lookup (cell_size cardinal, cell_size * sqrt(2) diagonal). - drop_m (float64): z[from] - z[to] (positive if the link descends; clamped to 0 otherwise). NaN if dem is None. - mean_slope (float64): drop_m / length_m (m/m). NaN if dem is None or length_m == 0. - sinuosity (float64): length_m / straight_line_distance between the link's two endpoints (≥ 1.0 for non-degenerate links; exactly 1.0 for straight links and single-cell degenerate links). - geometry: shapely LineString in the dataset's CRS, vertices at cell centres.

Raises:

Type Description
ValueError

If flow_direction is multi-direction.

ValueError

If shapes do not match.

Source code in src/digitalrivers/stream_raster.py
def to_vector(
    self,
    flow_direction,
    dem=None,
    single_direction: str = "max",
):
    """Vectorise the stream raster into a `GeoDataFrame` of LineString links.

    Walks the flow-direction raster from every head and every cell
    downstream of a confluence until it reaches a confluence or an outlet.
    Each resulting LineString is one stream link.

    Args:
        flow_direction: `FlowDirection` raster aligned to this stream
            raster. Must be a single-direction routing (`d8` or `rho8`)
            — multi-direction inputs raise. (D∞ / MFD inputs would need a
            separate dominant-direction collapse, deferred.)
        dem: Optional `DEM` aligned to the stream raster. When supplied,
            the link attributes include `drop_m` and `mean_slope`.
        single_direction: Reserved for future multi-direction collapse
            (`"max"` for argmax-of-fractions; `"weighted"` for
            weighted-mean direction). Ignored when `flow_direction` is
            already single-direction.

    Returns:
        `geopandas.GeoDataFrame` with columns:
          - `link_id` (int64): 0-based sequential link identifier.
          - `from_node` (int64): node ID at the upstream end (head /
            confluence).
          - `to_node` (int64): node ID at the downstream end.
          - `length_m` (float64): sum of per-step distances using the
            D8 grid-lengths lookup (`cell_size` cardinal,
            `cell_size * sqrt(2)` diagonal).
          - `drop_m` (float64): `z[from] - z[to]` (positive if the
            link descends; clamped to 0 otherwise). NaN if `dem` is
            None.
          - `mean_slope` (float64): `drop_m / length_m` (m/m). NaN if
            `dem` is None or `length_m == 0`.
          - `sinuosity` (float64): `length_m / straight_line_distance`
            between the link's two endpoints (≥ 1.0 for non-degenerate
            links; exactly 1.0 for straight links and single-cell
            degenerate links).
          - `geometry`: shapely `LineString` in the dataset's CRS,
            vertices at cell centres.

    Raises:
        ValueError: If `flow_direction` is multi-direction.
        ValueError: If shapes do not match.
    """
    import geopandas as gpd
    import numpy as np
    from shapely.geometry import LineString

    from digitalrivers.flow_direction import FlowDirection  # for type-narrow

    if not isinstance(flow_direction, FlowDirection):
        raise TypeError(
            f"flow_direction must be a FlowDirection; got {type(flow_direction).__name__}"
        )
    if flow_direction.routing not in ("d8", "rho8"):
        raise ValueError(
            f"to_vector currently supports single-direction routing only; "
            f"got {flow_direction.routing!r}. Collapse the flow direction to "
            f"D8 first."
        )

    fdir = flow_direction.read_array().astype(np.int32, copy=False)
    stream_mask = self.read_array().astype(bool, copy=False)
    if fdir.shape != stream_mask.shape:
        raise ValueError(
            f"flow_direction shape {fdir.shape} != stream raster shape "
            f"{stream_mask.shape}"
        )

    if dem is not None:
        z = dem.read_array().astype(np.float64, copy=False)
        no_val = dem.no_data_value[0] if dem.no_data_value else None
        if no_val is not None:
            z = np.where(z == no_val, np.nan, z)
        if z.shape != stream_mask.shape:
            raise ValueError(
                f"dem shape {z.shape} != stream raster shape {stream_mask.shape}"
            )
    else:
        z = None

    # 8-direction offsets matching DIR_OFFSETS (0=S, 1=SW, ..., 7=SE).
    d_row = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int32)
    d_col = np.array([0, -1, -1, -1, 0, 1, 1, 1], dtype=np.int32)
    # Inverse direction: if cell at offset (dr, dc) has direction = inv[k],
    # it is flowing INTO us.
    inv_dir = np.array([4, 5, 6, 7, 0, 1, 2, 3], dtype=np.int32)
    grid_lengths = np.array(
        [1.0, np.sqrt(2.0), 1.0, np.sqrt(2.0),
         1.0, np.sqrt(2.0), 1.0, np.sqrt(2.0)],
        dtype=np.float64,
    ) * float(self.cell_size)

    rows, cols = stream_mask.shape

    # Step 1 — incoming-stream count per stream cell.
    nup = np.zeros(stream_mask.shape, dtype=np.int8)
    for k in range(8):
        dr = int(d_row[k])
        dc = int(d_col[k])
        src_r = slice(max(0, dr), min(rows, rows + dr))
        src_c = slice(max(0, dc), min(cols, cols + dc))
        dst_r = slice(max(0, -dr), min(rows, rows - dr))
        dst_c = slice(max(0, -dc), min(cols, cols - dc))
        sm_src = stream_mask[src_r, src_c]
        fd_src = fdir[src_r, src_c]
        # A neighbour at (src) points into (dst) iff its direction equals inv[k].
        inflow = sm_src & (fd_src == inv_dir[k]) & stream_mask[dst_r, dst_c]
        nup[dst_r, dst_c] += inflow.astype(np.int8)

    # Step 2 — find link starts.
    heads_or_confluences_mask = stream_mask & ((nup == 0) | (nup >= 2))

    # Step 3 — walk each link.
    def _trace(start_r: int, start_c: int):
        path = [(start_r, start_c)]
        r, c = start_r, start_c
        length = 0.0
        while True:
            d = int(fdir[r, c])
            if d < 0 or d > 7:
                break  # sink / outlet
            nr = r + int(d_row[d])
            nc = c + int(d_col[d])
            if not (0 <= nr < rows and 0 <= nc < cols):
                break
            if not stream_mask[nr, nc]:
                break
            length += float(grid_lengths[d])
            path.append((nr, nc))
            if nup[nr, nc] >= 2:
                break
            r, c = nr, nc
        return path, length

    # Assign node IDs: every distinct head / confluence / link-end is a node.
    node_id_grid = np.full(stream_mask.shape, -1, dtype=np.int64)
    next_node_id = 0

    def _get_node_id(r: int, c: int) -> int:
        nonlocal next_node_id
        if node_id_grid[r, c] < 0:
            node_id_grid[r, c] = next_node_id
            next_node_id += 1
        return int(node_id_grid[r, c])

    gt = self.geotransform
    records: list[dict] = []
    link_id = 0
    starts = np.argwhere(heads_or_confluences_mask)
    for r0, c0 in starts:
        r0 = int(r0)
        c0 = int(c0)
        # Skip a confluence cell if it has no outgoing direction (it's an outlet
        # confluence — handled when its upstream link arrives, no link begins here).
        d_start = int(fdir[r0, c0])
        if d_start < 0 or d_start > 7:
            continue
        path, length_m = _trace(r0, c0)
        if len(path) < 2:
            continue
        from_node = _get_node_id(r0, c0)
        r_end, c_end = path[-1]
        to_node = _get_node_id(r_end, c_end)

        xs = [gt[0] + (c + 0.5) * gt[1] + (r + 0.5) * gt[2] for r, c in path]
        ys = [gt[3] + (c + 0.5) * gt[4] + (r + 0.5) * gt[5] for r, c in path]
        geom = LineString(zip(xs, ys))

        # Sinuosity = traced length / straight-line distance between endpoints.
        # Degenerate (single-cell or zero-length) links default to 1.0 — straight.
        straight_m = float(((xs[-1] - xs[0]) ** 2 + (ys[-1] - ys[0]) ** 2) ** 0.5)
        if straight_m > 0.0:
            sinuosity = length_m / straight_m
        else:
            sinuosity = 1.0

        if z is not None:
            z_from = float(z[r0, c0])
            z_to = float(z[r_end, c_end])
            drop_m = max(0.0, z_from - z_to)
            mean_slope = drop_m / length_m if length_m > 0 else np.nan
        else:
            drop_m = np.nan
            mean_slope = np.nan

        records.append({
            "link_id": link_id,
            "from_node": from_node,
            "to_node": to_node,
            "length_m": length_m,
            "drop_m": drop_m,
            "mean_slope": mean_slope,
            "sinuosity": sinuosity,
            "geometry": geom,
        })
        link_id += 1

    crs = self.epsg
    return gpd.GeoDataFrame(records, geometry="geometry", crs=crs)