Skip to content

DEM#

DEM processing — sink filling, D8 flow direction, flow accumulation, slope, and basin filtering. Subclasses pyramids.dataset.Dataset, so all pyramids methods are inherited.

digitalrivers.dem.DEM #

Bases: Dataset

Digital Elevation Model processor.

Wraps a GDAL raster dataset and adds hydrological analysis methods: sink filling, D8 flow direction, flow accumulation, and slope computation.

Parameters:

Name Type Description Default
src Dataset

GDAL dataset containing a single-band elevation raster.

required
access str

"read_only" (default) or "write".

'read_only'
Source code in src/digitalrivers/dem.py
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
class DEM(Dataset):
    """Digital Elevation Model processor.

    Wraps a GDAL raster dataset and adds hydrological analysis methods:
    sink filling, D8 flow direction, flow accumulation, and slope
    computation.

    Args:
        src: GDAL dataset containing a single-band elevation raster.
        access: `"read_only"` (default) or `"write"`.
    """

    def __init__(self, src: gdal.Dataset, access: str = "read_only"):
        super().__init__(src, access)

    @property
    def values(self):
        """Elevation array with no-data cells replaced by `np.nan`.

        Reads band 0 as `float32` and masks every cell whose value is
        close to the raster's no-data value (relative tolerance 1e-5).

        Returns:
            np.ndarray: 2-D `float32` array of shape `(rows, columns)`.
        """
        values = self.read_array(band=0).astype(np.float32)
        # get the value stores in no data value cells
        no_val = self.no_data_value[0]
        values[np.isclose(values, no_val, rtol=0.00001)] = np.nan
        return values

    def fill_depressions(
        self,
        method: str = "priority_flood",
        epsilon: float = 0.0,
        inplace: bool = False,
    ) -> DEM | None:
        """Fill closed depressions in the DEM.

        Three algorithms are available via the `method` argument:

        * `"priority_flood"` (default) — Barnes, Lehman & Mulla (2014) Priority-Flood
          with the two-queue plateau optimisation. With `epsilon == 0` it produces flat
          fills; with `epsilon > 0` it produces a strictly monotonic surface (every cell
          has at least one strictly lower neighbour along the flood path) at the cost of
          a small elevation inflation proportional to plateau width.
        * `"wang_liu"` — Wang & Liu (2006). Flat fill, no epsilon. Equivalent in output
          to `priority_flood` with `epsilon == 0`; kept as a named alternative for
          callers who plan to resolve flats explicitly afterwards (P4).
        * `"planchon_darboux"` — Planchon & Darboux (2002). Iterative directional-sweep
          algorithm. Slower than Priority-Flood on large DEMs; kept as a low-relief
          reference. Requires `epsilon > 0`.

        No-data handling is uniform across methods: cells flagged no-data act as outlets
        (they cannot be filled, and data cells adjacent to them are seeded as drainage
        sources alongside the true raster boundary).

        **Precision note.** Priority-flood / planchon-darboux compute the cumulative lift
        in float64 but the output is cast back to the input dtype. For `float32` DEMs
        with `epsilon` in the `0.1`-class on wide plateaus, the accumulated lift can
        approach float32's relative precision near the spill elevation and very long
        plateaus may underflow to identical filled values. Prefer `float64` inputs
        when running with `epsilon > 0` and large depressions; `wang_liu` /
        `epsilon=0` are immune.

        Args:
            method: One of `"priority_flood"`, `"wang_liu"`, `"planchon_darboux"`.
            epsilon: Per-step elevation lift inside depressions. `0.0` (default for
                `priority_flood`) returns a non-strictly-decreasing surface — flats
                remain flat. Positive values guarantee a unique downhill path at the
                cost of slight elevation inflation. `planchon_darboux` requires
                `epsilon > 0`.
            inplace: If `True` the current instance is updated in place and `None`
                is returned. If `False` (default) a new `DEM` is returned.

        Returns:
            DEM | None: A new `DEM` containing the filled elevation, or `None` when
            `inplace` is `True`.

        Raises:
            ValueError: If `method` is unknown, or `planchon_darboux` is requested
                with `epsilon <= 0`.
        """
        elev = self.values
        nodata_mask = np.isnan(elev)
        z_fill = _fill_depressions_array(
            elev.astype(np.float64, copy=False),
            nodata_mask=nodata_mask,
            method=method,
            epsilon=epsilon,
        )
        # Restore the original raster's no-data sentinel (the array carries NaN; the
        # GeoTIFF needs the numeric sentinel).
        no_val = self.no_data_value[0]
        z_fill[nodata_mask] = no_val

        # Build a plain Dataset (cls=Dataset so we don't get a DEM via cls(...)), then
        # wrap with the typed DEM. This mirrors the pattern used in flow_direction().
        plain_ds = Dataset.dataset_like(self, z_fill.astype(elev.dtype, copy=False))
        if inplace:
            self._update_inplace(plain_ds.raster)
            return None
        return DEM(plain_ds.raster)

    def breach_depressions(
        self,
        method: str = "least_cost",
        max_depth: float | None = None,
        max_length: int | None = None,
        fill_remaining: bool = True,
        inplace: bool = False,
    ) -> DEM | None:
        """Breach depressions in the DEM (Lindsay 2016 family).

        Breaching is the structural alternative to filling: instead of raising the pit
        floor, it cuts a channel through the lowest barrier between the pit and an
        outlet. On LiDAR DEMs this is usually more realistic — most internal pits are
        data artefacts and the natural drainage path is preserved by cutting the artefact
        away rather than inflating the surrounding terrain.

        Three methods are available via the `method` argument:

        * `"single_cell"` — cheap O(n) preprocessing pass that resolves isolated 1-cell
          pits by lowering an intermediate first-order neighbour to the midpoint of the
          pit and a lower second-order cell. Does nothing if no such configuration exists.
        * `"least_cost"` (default) — Lindsay 2016 Dijkstra-from-each-pit. Carves a
          strictly monotonic channel from the pit to the nearest outlet. Optional
          `max_depth` and `max_length` constraints abort the breach for any pit whose
          channel would exceed them; aborted pits are left unresolved.
        * `"hybrid"` — try `least_cost` first; pits that fail their constraint fall
          back to the Priority-Flood depression fill (P2). The breach phase has already
          lowered parts of the DEM where partial breaching occurred, so the fill operates
          on a modified surface and produces less overall lift than fill-only.

        No-data cells act as free outlets — any Dijkstra path that reaches a no-data cell
        terminates the search.

        Args:
            method: One of `"single_cell"`, `"least_cost"`, `"hybrid"`.
            max_depth: Maximum cumulative `|Δz|` for a single breach path. `None`
                disables the constraint.
            max_length: Maximum path length in cells. `None` disables.
            fill_remaining: Only meaningful when `method="hybrid"`. If `True`
                (default), unresolved pits are passed to Priority-Flood with
                `epsilon=0`. If `False`, they are left as pits in the output.
            inplace: If `True` the current instance is updated in place and `None` is
                returned. If `False` (default) a new `DEM` is returned.

        Returns:
            DEM | None: A new `DEM` containing the breached elevation, or `None` when
            `inplace` is `True`.

        Raises:
            ValueError: If `method` is unknown.
        """
        elev = self.values
        nodata_mask = np.isnan(elev)
        z_out = _breach_depressions_array(
            elev.astype(np.float64, copy=False),
            nodata_mask=nodata_mask,
            method=method,
            max_depth=max_depth,
            max_length=max_length,
            fill_remaining=fill_remaining,
        )
        no_val = self.no_data_value[0]
        z_out[np.isnan(z_out)] = no_val
        plain_ds = Dataset.dataset_like(self, z_out.astype(elev.dtype, copy=False))
        if inplace:
            self._update_inplace(plain_ds.raster)
            return None
        return DEM(plain_ds.raster)

    def resolve_flats(
        self,
        max_iter: int = 1000,
        epsilon: float = 1e-5,
        connectivity: int = 8,
        inplace: bool = False,
    ) -> DEM | None:
        """Impose a deterministic gradient on every flat plateau in the DEM.

        After `fill_depressions(method="wang_liu")` (or `"priority_flood"` with
        `epsilon=0`), every closed depression is filled to its spill elevation — but the
        interior of each filled depression is a flat plateau with no defined steepest
        descent, so D8 flow direction over the result has `NO_FLOW` cells across every
        plateau. `resolve_flats` nudges those cells so each has a unique downhill
        neighbour: combined Garbrecht & Martz (1997) gradient — drain *towards* the
        nearest outlet (LEC) with a tiebreak that drains *away from* the nearest rim
        (HEC). The towards-lower gradient is weighted `2x` so it dominates and the
        away-from-higher gradient acts as a deterministic tiebreaker.

        Plateaus without a low-edge cell (closed depressions that survived the fill — they
        should not exist if you ran `fill_depressions` first) are left untouched.

        Args:
            max_iter: Safety cap on BFS levels per plateau. Real plateaus rarely exceed
                `max(rows, cols)`; the default `1000` is essentially unbounded.
            epsilon: Per-BFS-step elevation lift. Total lift over a plateau is at most
                `(2 * max_high_dist + max_low_dist) * epsilon`; choose small enough
                that this stays well below the minimum elevation step between adjacent
                non-plateau cells. Default `1e-5` is safe for ~1000-cell-wide plateaus.
            connectivity: 4 or 8. Controls plateau-labelling and BFS step direction;
                LEC/HEC classification always uses 8-connectivity (Garbrecht-Martz
                convention). Default is 8.
            inplace: If `True` the current instance is updated in place and `None` is
                returned. If `False` (default) a new `DEM` is returned.

        Returns:
            DEM | None: A new `DEM` with flat plateaus resolved, or `None` when
            `inplace` is `True`.

        Raises:
            ValueError: If `connectivity` is not 4 or 8.
        """
        elev = self.values
        nodata_mask = np.isnan(elev)
        z_out = _resolve_flats_array(
            elev.astype(np.float64, copy=False),
            nodata_mask=nodata_mask,
            epsilon=epsilon,
            connectivity=connectivity,
            max_iter=max_iter,
        )
        no_val = self.no_data_value[0]
        z_out[np.isnan(z_out)] = no_val
        plain_ds = Dataset.dataset_like(self, z_out.astype(elev.dtype, copy=False))
        if inplace:
            self._update_inplace(plain_ds.raster)
            return None
        return DEM(plain_ds.raster)

    def hand(
        self,
        streams,
        flow_direction=None,
        *,
        method: str = "d8",
    ) -> Dataset:
        """Compute Height Above Nearest Drainage (Rennó 2008 / Nobre 2011).

        Two methods are supported:

        * **`"d8"` (default)** — follows the D8 / Rho8 flow-direction raster
          downstream from every cell until it reaches a stream cell, and
          assigns `elev[cell] - elev[stream_cell]`. Orphans / sinks / no-data
          cells whose flow path never reaches a stream are NaN.
        * **`"euclidean"`** — for every cell, the nearest stream cell in 2-D
          space (Euclidean distance) is used as the reference. Cheaper than
          D8-HAND because there is no path tracing, but it does the wrong
          thing across ridges (a cell can be 2-D-closer to a stream in a
          different basin). Requires `scipy.ndimage`.

        Args:
            streams: `StreamRaster` aligned to this DEM. Only the underlying
                stream mask is read.
            flow_direction: Single-direction `FlowDirection` (`d8` /
                `rho8`) aligned to this DEM. Required for `method="d8"`;
                ignored for `method="euclidean"`.
            method: `"d8"` (default) or `"euclidean"`.

        Returns:
            `Dataset` containing the float32 HAND raster. No-data cells use
            this DEM's no-data sentinel.

        Raises:
            ValueError: If `method` is unknown, shapes do not match, or
                `flow_direction` is missing / multi-direction for the D8
                method.
        """
        from digitalrivers.stream_raster import StreamRaster

        if method not in ("d8", "euclidean"):
            raise ValueError(
                f"method must be 'd8' or 'euclidean'; got {method!r}"
            )
        if not isinstance(streams, StreamRaster):
            raise ValueError("streams must be a StreamRaster instance")

        if method == "d8":
            return self._hand_d8(streams, flow_direction)
        return self._hand_euclidean(streams)

    def _hand_d8(self, streams, flow_direction) -> Dataset:
        """D8-traced HAND — original Rennó-style implementation."""
        from digitalrivers._streams.hand import hand_d8
        from digitalrivers.flow_direction import FlowDirection

        if not isinstance(flow_direction, FlowDirection):
            raise ValueError(
                "flow_direction must be a FlowDirection instance for method='d8'"
            )
        if flow_direction.routing not in ("d8", "rho8"):
            raise ValueError(
                f"hand currently supports single-direction routing only; "
                f"got {flow_direction.routing!r}"
            )

        elev = self.values
        fdir = flow_direction.read_array().astype(np.int32, copy=False)
        stream_arr = streams.read_array().astype(bool, copy=False)
        if not (elev.shape == fdir.shape == stream_arr.shape):
            raise ValueError(
                f"Shape mismatch: dem={elev.shape}, flow_direction="
                f"{fdir.shape}, streams={stream_arr.shape}"
            )

        hand_arr = hand_d8(elev, fdir, stream_arr).astype(np.float32, copy=False)
        no_val = float(self.no_data_value[0])
        hand_arr = np.where(np.isnan(hand_arr), no_val, hand_arr)
        return Dataset.create_from_array(
            hand_arr,
            geo=self.geotransform,
            epsg=self.epsg,
            no_data_value=no_val,
        )

    def _hand_euclidean(self, streams) -> Dataset:
        """Euclidean-nearest-stream HAND — no flow direction required."""
        import warnings
        from scipy.ndimage import distance_transform_edt

        elev = self.values
        stream_arr = streams.read_array().astype(bool, copy=False)
        if elev.shape != stream_arr.shape:
            raise ValueError(
                f"Shape mismatch: dem={elev.shape}, streams={stream_arr.shape}"
            )
        # Drop stream cells that sit on no-data DEM positions. If we don't,
        # `distance_transform_edt` will happily point at them and the
        # resulting per-cell elevation lookup returns NaN — silently
        # corrupting HAND over a swath around the bad pixel.
        valid_elev = ~np.isnan(elev)
        dropped = int((stream_arr & ~valid_elev).sum())
        if dropped:
            warnings.warn(
                f"{dropped} stream cell(s) coincide with DEM no-data; "
                f"dropping them before the Euclidean nearest-stream lookup.",
                UserWarning,
                stacklevel=3,
            )
            stream_arr = stream_arr & valid_elev
        if not stream_arr.any():
            raise ValueError(
                "streams raster contains no stream cells with valid elevation "
                "— HAND is undefined"
            )
        # distance_transform_edt with return_indices returns (distance, indices),
        # where indices[*, r, c] is the (row, col) of the nearest True (stream)
        # cell from (r, c). We want the *complement* of stream_arr because
        # the EDT measures distance to the nearest False cell.
        _, (ri, ci) = distance_transform_edt(~stream_arr, return_indices=True)
        nearest_elev = elev[ri, ci]
        hand_arr = (elev - nearest_elev).astype(np.float32, copy=False)
        no_val = float(self.no_data_value[0])
        hand_arr = np.where(np.isnan(hand_arr), no_val, hand_arr)
        return Dataset.create_from_array(
            hand_arr,
            geo=self.geotransform,
            epsg=self.epsg,
            no_data_value=no_val,
        )

    def full_hydro_pipeline(
        self,
        *,
        fill_method: str = "priority_flood",
        flow_method: str = "d8",
        stream_threshold_cells: int | None = None,
    ) -> dict:
        """Composite: fill → flow_direction → accumulate (→ optional streams).

        Convenience entry point that chains the four most common steps of a
        DEM-hydrology pre-processing pipeline. Equivalent to:

        ```python
        filled = dem.fill_depressions(method=fill_method)
        fdir = filled.flow_direction(method=flow_method)
        acc = fdir.accumulate()
        streams = acc.streams(threshold=stream_threshold_cells)  # if provided
        ```

        Args:
            fill_method: Argument forwarded to `fill_depressions`. Defaults
                to `"priority_flood"`.
            flow_method: Argument forwarded to `flow_direction`. Defaults to
                `"d8"`.
            stream_threshold_cells: Optional accumulation threshold (in
                cells). When supplied, a `StreamRaster` is also returned in
                the result dict under the `"streams"` key. When None, the
                streams step is skipped.

        Returns:
            `dict` with keys `"filled_dem"` (DEM), `"flow_direction"`
            (FlowDirection), and `"accumulation"` (Accumulation); plus an
            optional `"streams"` (StreamRaster) when
            `stream_threshold_cells` is supplied.
        """
        filled = self.fill_depressions(method=fill_method)
        fdir = filled.flow_direction(method=flow_method)
        acc = fdir.accumulate()
        out: dict = {
            "filled_dem": filled,
            "flow_direction": fdir,
            "accumulation": acc,
        }
        if stream_threshold_cells is not None:
            out["streams"] = acc.streams(threshold=stream_threshold_cells)
        return out

    def stochastic_depressions(
        self,
        sigma: float,
        n_runs: int = 100,
        *,
        seed: int | None = None,
        method: str = "priority_flood",
    ) -> Dataset:
        """Per-cell depression-occurrence probability via Monte-Carlo.

        Adds Gaussian noise (zero-mean, supplied `sigma`) to the DEM, runs
        depression detection on each noisy realisation, and aggregates the
        per-cell probability across `n_runs` realisations. Cells with high
        probability are robust depressions; low probability cells are likely
        noise artefacts of a noisy DEM.

        Args:
            sigma: Standard deviation of the Gaussian noise in DEM elevation
                units. Must be non-negative. A reasonable choice is the DEM's
                stated vertical error.
            n_runs: Number of Monte-Carlo realisations. Must be positive.
                Defaults to 100.
            seed: Optional seed for the random number generator. Pass an
                integer for reproducible results.
            method: Fill-depressions algorithm passed through to
                `fill_depressions` for each realisation. Defaults to
                `"priority_flood"`.

        Returns:
            `Dataset` of float32 occurrence probabilities in `[0.0, 1.0]`,
            aligned to this DEM. No-data sentinel `-1.0`.

        Raises:
            ValueError: If `sigma` is negative or `n_runs` is not positive.
        """
        if sigma < 0:
            raise ValueError(f"sigma must be non-negative; got {sigma!r}")
        if n_runs <= 0:
            raise ValueError(f"n_runs must be positive; got {n_runs!r}")

        from digitalrivers._conditioning.pitremoval import (
            fill_depressions as _fill_depressions_kernel,
        )

        rng = np.random.default_rng(seed)
        elev = self.values
        # Pre-compute the no-data mask once — it doesn't change between
        # realisations since the noise only perturbs valid cells.
        nodata_mask = np.isnan(elev)
        prob = np.zeros(elev.shape, dtype=np.float32)
        for _ in range(int(n_runs)):
            noise = rng.normal(0.0, sigma, size=elev.shape).astype(
                elev.dtype, copy=False
            )
            noisy = elev + noise
            # Call the kernel directly — no GDAL Dataset wrapping inside the
            # Monte-Carlo loop. The kernel accepts a plain ndarray and an
            # optional nodata_mask.
            filled = _fill_depressions_kernel(
                noisy, nodata_mask=nodata_mask, method=method,
            )
            depr = (filled - noisy) > 0
            prob += depr.astype(np.float32)
        prob /= float(n_runs)
        return Dataset.create_from_array(
            prob,
            geo=self.geotransform,
            epsg=self.epsg,
            no_data_value=-1.0,
        )

    def burn_streams(
        self,
        streams,
        method: str = "fill_burn",
        *,
        sharp: float = 10.0,
        smooth: float = 2.0,
        buffer_cells: int = 5,
        constant_drop: float = 1.0,
        max_breach_depth: float | None = None,
        max_breach_length: int | None = None,
        inplace: bool = False,
    ) -> DEM | None:
        """Condition the DEM by burning a vector stream network into it.

        Three methods are specified by P20; this implementation ships
        `"fill_burn"` (Lindsay 2018 — used by WhiteboxTools' FillBurn) as
        the default. `"agree"` (Hellweger 1997) and
        `"topological_breach"` (Lindsay 2016) raise `NotImplementedError`.

        Fill-burn algorithm:

        1. Rasterise every LineString in `streams` onto a stream mask.
        2. Lower every stream cell's elevation by `constant_drop`.
        3. Run `fill_depressions(method="priority_flood")` so the
           surrounding cells drain naturally into the channel.

        Args:
            streams: `GeoDataFrame` of LineString geometries.
            method: `"fill_burn"` (default); `"agree"` and
                `"topological_breach"` raise `NotImplementedError`.
            sharp / smooth / buffer_cells: AGREE parameters (unused for
                fill_burn).
            constant_drop: Elevation drop applied to every stream cell
                in fill_burn (default 1.0 map unit).
            max_breach_depth / max_breach_length: topological_breach
                parameters (unused for fill_burn).
            inplace: If True, update the instance; else return a new DEM.

        Returns:
            DEM | None: New DEM with the conditioned surface, or None when
            `inplace=True`.

        Examples:
            - Fill-burn lowers the stream-row of a flat DEM:

                >>> import numpy as np
                >>> import geopandas as gpd
                >>> from shapely.geometry import LineString
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.full((5, 5), 10.0, dtype=np.float32)
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> dem = DEM(ds.raster)
                >>> # Horizontal stream down row 2 (y = -2.5).
                >>> line = LineString([(0.5, -2.5), (4.5, -2.5)])
                >>> layer = gpd.GeoDataFrame(geometry=[line], crs=4326)
                >>> burnt = dem.burn_streams(layer, constant_drop=2.0)
                >>> bool(float(burnt.values[2, 2]) < float(burnt.values[0, 0]))
                True
        """
        if method == "agree":
            # Rasterise the stream lines, then apply a gradient buffer:
            # stream cells drop by `sharp`, buffer cells drop linearly from
            # `sharp` at the stream to `0` at buffer_cells radius. The
            # cumulative drop is then offset by `smooth` so the buffer
            # perimeter sits `smooth` units lower than the original DEM.
            elev = self.values
            rows, cols = elev.shape
            gt = self.geotransform
            stream_mask = np.zeros((rows, cols), dtype=bool)
            streams = _reproject_if_needed(streams, self.epsg)
            for geom in streams.geometry:
                if geom is None or geom.is_empty:
                    continue
                if geom.geom_type == "MultiLineString":
                    for sub in geom.geoms:
                        self._rasterise_line(sub, stream_mask, gt)
                else:
                    self._rasterise_line(geom, stream_mask, gt)
            # Distance-from-stream within the buffer (cell-step BFS).
            dist = np.full((rows, cols), np.inf, dtype=np.float64)
            dist[stream_mask] = 0.0
            from collections import deque
            frontier: deque[tuple[int, int, int]] = deque(
                (int(r), int(c), 0) for r, c in zip(*np.where(stream_mask))
            )
            while frontier:
                r, c, d = frontier.popleft()
                if d >= buffer_cells:
                    continue
                for dr in (-1, 0, 1):
                    for dc in (-1, 0, 1):
                        if dr == 0 and dc == 0:
                            continue
                        nr = r + dr
                        nc = c + dc
                        if not (0 <= nr < rows and 0 <= nc < cols):
                            continue
                        if dist[nr, nc] > d + 1:
                            dist[nr, nc] = d + 1
                            frontier.append((nr, nc, d + 1))
            z = elev.astype(np.float64, copy=True)
            within_buffer = dist <= buffer_cells
            # Linear gradient: sharp at stream (dist=0) → 0 at perimeter.
            drop = np.where(
                within_buffer,
                sharp * (1.0 - dist / max(buffer_cells, 1)) + smooth,
                0.0,
            )
            z = z - drop
            no_val = self.no_data_value[0]
            z[np.isnan(z)] = no_val
            plain_ds = Dataset.dataset_like(
                self, z.astype(elev.dtype, copy=False)
            )
            if inplace:
                self._update_inplace(plain_ds.raster)
                return None
            return DEM(plain_ds.raster)

        if method == "topological_breach":
            # Lindsay 2016: rasterise the stream network onto the DEM
            # (like fill_burn but without the final priority-flood), then
            # invoke the Phase 1 least-cost breach engine so every internal
            # pit Dijkstras outward toward a stream cell. The burned stream
            # cells sit max_breach_depth-or-equivalent below their
            # surroundings, so the breach paths follow the vector topology
            # by construction.
            elev = self.values
            rows, cols = elev.shape
            gt = self.geotransform
            stream_mask = np.zeros((rows, cols), dtype=bool)
            streams = _reproject_if_needed(streams, self.epsg)
            for geom in streams.geometry:
                if geom is None or geom.is_empty:
                    continue
                if geom.geom_type == "MultiLineString":
                    for sub in geom.geoms:
                        self._rasterise_line(sub, stream_mask, gt)
                else:
                    self._rasterise_line(geom, stream_mask, gt)
            z = elev.astype(np.float64, copy=True)
            z[stream_mask] = z[stream_mask] - constant_drop
            nodata_mask = np.isnan(z)
            z = _breach_depressions_array(
                z, nodata_mask=nodata_mask, method="hybrid",
                max_depth=max_breach_depth,
                max_length=max_breach_length,
                fill_remaining=True,
            )
            no_val = self.no_data_value[0]
            z[np.isnan(z)] = no_val
            plain_ds = Dataset.dataset_like(
                self, z.astype(elev.dtype, copy=False)
            )
            if inplace:
                self._update_inplace(plain_ds.raster)
                return None
            return DEM(plain_ds.raster)

        if method != "fill_burn":
            raise NotImplementedError(
                f"method={method!r} not yet implemented (supported: "
                "'fill_burn', 'agree', 'topological_breach')"
            )

        elev = self.values
        rows, cols = elev.shape
        gt = self.geotransform
        x0, dx, _, y0, _, dy = gt
        stream_mask = np.zeros((rows, cols), dtype=bool)

        streams = _reproject_if_needed(streams, self.epsg)

        for geom in streams.geometry:
            if geom is None or geom.is_empty:
                continue
            if geom.geom_type == "MultiLineString":
                for sub in geom.geoms:
                    self._rasterise_line(sub, stream_mask, gt)
            else:
                # Delegate to the shared 2× oversampled floor-rasteriser so
                # burn_streams, enforce_breaklines, and enforce_culverts all
                # snap line samples to cells identically (I1 fix).
                self._rasterise_line(geom, stream_mask, gt)

        z = elev.astype(np.float64, copy=True)
        z[stream_mask] = z[stream_mask] - constant_drop
        nodata_mask = np.isnan(z)
        z = _fill_depressions_array(
            z, nodata_mask=nodata_mask, method="priority_flood", epsilon=0.0,
        )
        no_val = self.no_data_value[0]
        z[np.isnan(z)] = no_val
        plain_ds = Dataset.dataset_like(self, z.astype(elev.dtype, copy=False))
        if inplace:
            self._update_inplace(plain_ds.raster)
            return None
        return DEM(plain_ds.raster)

    def _rasterise_line(self, geom, mask, gt):
        """Rasterise a single LineString into `mask` using the supplied
        geotransform. Helper for `burn_streams` MultiLineString handling.
        """
        x0, dx, _, y0, _, dy = gt
        rows, cols = mask.shape
        coords = list(geom.coords)
        for (x1, y1), (x2, y2) in zip(coords[:-1], coords[1:]):
            c1 = (x1 - x0) / dx
            r1 = (y1 - y0) / dy
            c2 = (x2 - x0) / dx
            r2 = (y2 - y0) / dy
            # 2x oversampling avoids skipping cells when the line crosses
            # cell boundaries exactly between samples.
            steps = max(int(abs(r2 - r1)), int(abs(c2 - c1)), 1) * 2
            for i in range(steps + 1):
                t = i / steps
                r = int(np.floor(r1 + t * (r2 - r1)))
                c = int(np.floor(c1 + t * (c2 - c1)))
                if 0 <= r < rows and 0 <= c < cols:
                    mask[r, c] = True

    def enforce_culverts(
        self,
        roads,
        streams,
        culvert_drop: float = 0.5,
        inplace: bool = False,
    ) -> DEM | None:
        """Lower DEM cells at every stream-road intersection by
        `culvert_drop` so subsequent flow routing crosses roads instead of
        dead-ending against them. Simplified version of WhiteboxTools'
        `BurnStreamsAtRoads`.

        Args:
            roads: `GeoDataFrame` of LineString road geometries.
            streams: `GeoDataFrame` of LineString stream geometries.
            culvert_drop: Elevation drop applied to each intersection cell.
            inplace: If True, update the instance; else return a new DEM.

        Returns:
            DEM | None: New DEM with culverts enforced, or None when
            `inplace=True`.
        """
        elev = self.values
        rows, cols = elev.shape
        gt = self.geotransform
        road_mask = np.zeros((rows, cols), dtype=bool)
        stream_mask = np.zeros((rows, cols), dtype=bool)
        for layer, mask in ((roads, road_mask), (streams, stream_mask)):
            layer = _reproject_if_needed(layer, self.epsg)
            for geom in layer.geometry:
                if geom is None or geom.is_empty:
                    continue
                if geom.geom_type == "MultiLineString":
                    for sub in geom.geoms:
                        self._rasterise_line(sub, mask, gt)
                else:
                    self._rasterise_line(geom, mask, gt)

        crossings = road_mask & stream_mask
        z = elev.astype(np.float64, copy=True)
        z[crossings] = z[crossings] - culvert_drop
        no_val = self.no_data_value[0]
        z[np.isnan(z)] = no_val
        plain_ds = Dataset.dataset_like(self, z.astype(elev.dtype, copy=False))
        if inplace:
            self._update_inplace(plain_ds.raster)
            return None
        return DEM(plain_ds.raster)

    def _polygon_cell_indices(self, geom, gt, rows, cols):
        """Return `(rows_idx, cols_idx)` of cells whose centre is inside `geom`.

        Uses `shapely.contains_xy` for one batched point-in-polygon test
        per polygon — orders of magnitude faster than the per-cell
        `geom.intersects(Point)` loop that this helper replaces.

        Args:
            geom: Shapely Polygon / MultiPolygon. The bounding box is used to
                clip the candidate cell range; the polygon itself decides
                which of those candidates are kept.
            gt: Six-element GDAL geotransform of this raster.
            rows: Number of rows in the raster.
            cols: Number of columns in the raster.

        Returns:
            Tuple `(rs, cs)` of int ndarrays giving the row / column
            indices of every cell whose centre lies inside `geom`. Empty
            arrays when the polygon's bounding box does not overlap the
            raster envelope.

        Examples:
            - A polygon entirely inside a single cell returns one index:

                >>> import numpy as np
                >>> from shapely.geometry import Polygon
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> ds = Dataset.create_from_array(
                ...     np.zeros((5, 5), dtype=np.float32),
                ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> dem = DEM(ds.raster)
                >>> # Tight polygon around cell-centre (col=2, row=2) at (2.5, -2.5).
                >>> poly = Polygon(
                ...     [(2.4, -2.6), (2.6, -2.6), (2.6, -2.4), (2.4, -2.4)]
                ... )
                >>> rs, cs = dem._polygon_cell_indices(poly, dem.geotransform, 5, 5)
                >>> rs.tolist(), cs.tolist()
                ([2], [2])

            - A polygon entirely outside the raster envelope returns empty
              arrays:

                >>> import numpy as np
                >>> from shapely.geometry import Polygon
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> ds = Dataset.create_from_array(
                ...     np.zeros((5, 5), dtype=np.float32),
                ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> dem = DEM(ds.raster)
                >>> far = Polygon(
                ...     [(100, 100), (101, 100), (101, 101), (100, 101)]
                ... )
                >>> rs, cs = dem._polygon_cell_indices(far, dem.geotransform, 5, 5)
                >>> rs.size, cs.size
                (0, 0)

            - Returned indices are integer ndarrays suitable for fancy
              indexing into the raster:

                >>> import numpy as np
                >>> from shapely.geometry import Polygon
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> ds = Dataset.create_from_array(
                ...     np.zeros((5, 5), dtype=np.float32),
                ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> dem = DEM(ds.raster)
                >>> poly = Polygon([(0, 0), (3, 0), (3, -3), (0, -3)])
                >>> rs, cs = dem._polygon_cell_indices(poly, dem.geotransform, 5, 5)
                >>> np.issubdtype(rs.dtype, np.integer)
                True
        """
        import numpy as np
        import shapely

        x0, dx, _, y0, _, dy = gt
        minx, miny, maxx, maxy = geom.bounds
        c_lo = max(0, int((minx - x0) / dx))
        c_hi = min(cols, int((maxx - x0) / dx) + 1)
        r_lo = max(0, int((maxy - y0) / dy))
        r_hi = min(rows, int((miny - y0) / dy) + 1)
        if c_lo >= c_hi or r_lo >= r_hi:
            return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64)
        rs_idx, cs_idx = np.meshgrid(
            np.arange(r_lo, r_hi), np.arange(c_lo, c_hi), indexing="ij",
        )
        xs = x0 + (cs_idx + 0.5) * dx
        ys = y0 + (rs_idx + 0.5) * dy
        inside = shapely.contains_xy(geom, xs, ys)
        return rs_idx[inside], cs_idx[inside]

    def hydroflatten(
        self,
        water_polygons,
        method: str = "min",
        inplace: bool = False,
    ) -> DEM | None:
        """Flatten lake / pond surfaces to a single elevation per polygon.

        For each input polygon, sample the DEM cells the polygon covers
        and assign every cell in the polygon the per-polygon statistic
        (`"min"` by default — the most defensive choice for hydrology;
        `"mean"` and `"median"` are also supported).

        Args:
            water_polygons: `GeoDataFrame` of Polygon / MultiPolygon
                geometries.
            method: `"min"` (default), `"mean"`, or `"median"`.
            inplace: If True, update the instance; else return a new DEM.

        Returns:
            DEM | None: Hydroflattened DEM.
        """
        if method not in ("min", "mean", "median"):
            raise ValueError(
                f"method must be 'min', 'mean', or 'median'; got {method!r}"
            )

        elev = self.values
        gt = self.geotransform
        x0, dx, _, y0, _, dy = gt
        rows, cols = elev.shape

        water_polygons = _reproject_if_needed(water_polygons, self.epsg)

        z = elev.astype(np.float64, copy=True)
        for geom in water_polygons.geometry:
            if geom is None or geom.is_empty:
                continue
            rs, cs = self._polygon_cell_indices(geom, gt, rows, cols)
            if rs.size == 0:
                continue
            vals = z[rs, cs]
            vals = vals[np.isfinite(vals)]
            if vals.size == 0:
                continue
            if method == "min":
                target = float(vals.min())
            elif method == "mean":
                target = float(vals.mean())
            else:
                target = float(np.median(vals))
            z[rs, cs] = target

        no_val = self.no_data_value[0]
        z[np.isnan(z)] = no_val
        plain_ds = Dataset.dataset_like(self, z.astype(elev.dtype, copy=False))
        if inplace:
            self._update_inplace(plain_ds.raster)
            return None
        return DEM(plain_ds.raster)

    def burn_buildings(
        self,
        building_polygons,
        lift: float = 50.0,
        inplace: bool = False,
    ) -> DEM | None:
        """Lift building footprints above the DEM by `lift` map units so
        2D flood routing flows around them.

        Args:
            building_polygons: `GeoDataFrame` of Polygon geometries.
            lift: Elevation added to every cell whose centre falls inside a
                polygon.
            inplace: If True, update the instance; else return a new DEM.

        Returns:
            DEM | None: DEM with buildings raised.
        """
        elev = self.values
        gt = self.geotransform
        x0, dx, _, y0, _, dy = gt
        rows, cols = elev.shape

        building_polygons = _reproject_if_needed(building_polygons, self.epsg)

        z = elev.astype(np.float64, copy=True)
        for geom in building_polygons.geometry:
            if geom is None or geom.is_empty:
                continue
            rs, cs = self._polygon_cell_indices(geom, gt, rows, cols)
            if rs.size:
                z[rs, cs] = z[rs, cs] + lift

        no_val = self.no_data_value[0]
        z[np.isnan(z)] = no_val
        plain_ds = Dataset.dataset_like(self, z.astype(elev.dtype, copy=False))
        if inplace:
            self._update_inplace(plain_ds.raster)
            return None
        return DEM(plain_ds.raster)

    def enforce_breaklines(
        self,
        breaklines,
        lift: float = 5.0,
        inplace: bool = False,
    ) -> DEM | None:
        """Raise linear barriers (levees, walls, kerbs) above the surrounding DEM.

        Args:
            breaklines: `GeoDataFrame` of LineString geometries.
            lift: Elevation added at each rasterised cell along the lines.
            inplace: If True, update the instance; else return a new DEM.

        Returns:
            DEM | None: DEM with breaklines enforced.
        """
        elev = self.values
        gt = self.geotransform
        rows, cols = elev.shape
        mask = np.zeros((rows, cols), dtype=bool)

        breaklines = _reproject_if_needed(breaklines, self.epsg)

        for geom in breaklines.geometry:
            if geom is None or geom.is_empty:
                continue
            if geom.geom_type == "MultiLineString":
                for sub in geom.geoms:
                    self._rasterise_line(sub, mask, gt)
            else:
                self._rasterise_line(geom, mask, gt)

        z = elev.astype(np.float64, copy=True)
        z[mask] = z[mask] + lift
        no_val = self.no_data_value[0]
        z[np.isnan(z)] = no_val
        plain_ds = Dataset.dataset_like(self, z.astype(elev.dtype, copy=False))
        if inplace:
            self._update_inplace(plain_ds.raster)
            return None
        return DEM(plain_ds.raster)

    def subgrid_bathymetry(
        self,
        scale_factor: int,
        n_bins: int = 10,
    ) -> "pd.DataFrame":
        """Build per-coarse-cell bathymetry tables (SFINCS-style).

        For each coarse cell (`scale_factor × scale_factor` block of fine
        cells), compute a histogram-like table mapping a coarsened water-
        depth level to the wetted area within the block. This is the
        sub-grid representation SFINCS and similar reduced-order 2D models
        use to recover small-scale topography without resolving it on the
        coarse grid.

        Args:
            scale_factor: Integer aggregation factor (>= 2).
            n_bins: Number of depth bins per coarse cell.

        Returns:
            `pandas.DataFrame` indexed by coarse-cell `(row, col)` with
            `n_bins + 2` columns: `z_min`, `z_max`, plus
            `frac_below_<k>` for `k` in `[1, n_bins]` giving the
            fraction of fine cells at or below the `k`-th depth bin.
            For flat blocks (`z_max == z_min`) every `frac_below_<k>`
            is `1.0`.

        Raises:
            ValueError: For `scale_factor < 2` or `n_bins < 1`.

        Examples:
            - A flat block produces `frac_below_<k> == 1.0` for every bin
              (B1 regression — the columns are always present):

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> ds = Dataset.create_from_array(
                ...     np.full((4, 4), 5.0, dtype=np.float32),
                ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> df = DEM(ds.raster).subgrid_bathymetry(scale_factor=2, n_bins=3)
                >>> sorted(df.columns.tolist())
                ['frac_below_1', 'frac_below_2', 'frac_below_3', 'z_max', 'z_min']
                >>> float(df["frac_below_1"].iloc[0])
                1.0
        """
        import numpy as np
        import pandas as pd

        if scale_factor < 2:
            raise ValueError(
                f"scale_factor must be >= 2; got {scale_factor}"
            )
        if n_bins < 1:
            raise ValueError(f"n_bins must be >= 1; got {n_bins}")

        elev = self.values
        rows, cols = elev.shape
        out_rows = rows // scale_factor
        out_cols = cols // scale_factor

        records: list[dict] = []
        for br in range(out_rows):
            for bc in range(out_cols):
                block = elev[
                    br * scale_factor : (br + 1) * scale_factor,
                    bc * scale_factor : (bc + 1) * scale_factor,
                ].ravel()
                valid = block[np.isfinite(block)]
                if valid.size == 0:
                    continue
                z_min = float(valid.min())
                z_max = float(valid.max())
                rec = {"row": br, "col": bc, "z_min": z_min, "z_max": z_max}
                if z_max == z_min:
                    # Flat block: every bin is "below" the single value, so
                    # frac_below_k == 1.0 for every k. (The B1 review found
                    # that the original code computed this list but never
                    # wrote it into the record, dropping the frac columns
                    # entirely when every block was flat.)
                    for k in range(1, n_bins + 1):
                        rec[f"frac_below_{k}"] = 1.0
                else:
                    bin_edges = np.linspace(z_min, z_max, n_bins + 1)
                    for k, edge in enumerate(bin_edges[1:], start=1):
                        rec[f"frac_below_{k}"] = float(
                            (valid <= edge).sum() / valid.size
                        )
                records.append(rec)
        df = pd.DataFrame(records).set_index(["row", "col"])
        return df

    def export(
        self,
        path: str,
        target: str,
        *,
        breaklines=None,
        walls=None,
        buildings=None,
        manning_n=None,
        boundary_conditions=None,
        validate: bool = True,
        **kwargs,
    ) -> dict:
        """Export the DEM to a hydrodynamic-model format.

        Every target listed below ships with a working writer. Most were
        backfilled after the initial Phase-3 cut; `lisflood_fp` is the
        canonical Arc-ASCII writer and remains the only target that
        actually requires a sinks-free input.

        Args:
            path: Output file path.
            target: One of `"hec_ras"`, `"tuflow"`, `"sfincs"`,
                `"lisflood_fp"`, `"iber"`, `"gmsh"`.
            breaklines / walls / buildings / manning_n / boundary_conditions:
                Reserved for target-specific bundles. Currently ignored by
                the LISFLOOD-FP writer.
            validate: When `True` (default), refuse to export a DEM with
                internal sinks. **Only applied for** `target="lisflood_fp"`
                — the Arc-ASCII writer is the only target where downstream
                tooling actually requires sinks-free input. Other writers
                skip the sink scan even when `validate=True` so the
                `local_minima_8` pass does not run unnecessarily (I4
                fixup). Pass `validate=False` to also disable the
                lisflood_fp guard.
            **kwargs: Target-specific options.

        Returns:
            `dict` mapping artefact label → file path written.

        Raises:
            ValueError: For unknown `target`.
            RuntimeError: When `target == "lisflood_fp"`, `validate=True`,
                and the DEM has internal sinks.
        """
        valid_targets = {
            "hec_ras", "tuflow", "sfincs", "lisflood_fp", "iber", "gmsh",
        }
        if target not in valid_targets:
            raise ValueError(
                f"target must be one of {sorted(valid_targets)}; got {target!r}"
            )

        # Only run the (expensive) sink scan when we'll actually export to
        # an implemented target. The unimplemented targets raise
        # NotImplementedError further down and would otherwise pay the full
        # validation cost for nothing.
        if validate and target == "lisflood_fp":
            from digitalrivers._conditioning.pitremoval import local_minima_8
            sinks = local_minima_8(self.values)
            if int(sinks.sum()) > 0:
                raise RuntimeError(
                    f"DEM has {int(sinks.sum())} internal sinks; either fix "
                    "them (DEM.fill_depressions) or pass validate=False"
                )

        elev = self.values
        gt = self.geotransform
        x0, dx, _, y0, _, dy = gt
        rows, cols = elev.shape
        nodata = float(self.no_data_value[0])
        out = np.where(np.isnan(elev), nodata, elev)
        cell_size = abs(dx)
        yllcorner = y0 + rows * dy

        if target == "lisflood_fp":
            with open(path, "w", encoding="ascii", newline="\n") as fh:
                fh.write(f"ncols {cols}\n")
                fh.write(f"nrows {rows}\n")
                fh.write(f"xllcorner {x0}\n")
                fh.write(f"yllcorner {yllcorner}\n")
                fh.write(f"cellsize {cell_size}\n")
                fh.write(f"NODATA_value {nodata}\n")
                for r in range(rows):
                    fh.write(" ".join(f"{out[r, c]:.6f}" for c in range(cols)))
                    fh.write("\n")
            return {"dem_asc": path}

        if target == "hec_ras":
            # HEC-RAS Mapper expects a single-band float32 GeoTIFF in the
            # dataset CRS with consistent geotransform — exactly what
            # Dataset.create_from_array(driver_type="GTiff", path=...) writes.
            Dataset.create_from_array(
                out.astype(np.float32, copy=False),
                geo=gt, epsg=self.epsg,
                no_data_value=nodata,
                driver_type="GTiff", path=path,
            )
            return {"dem_tif": path}

        if target == "tuflow":
            # ESRI floating-point grid (.flt binary, row-major little-endian
            # float32, top-left first) + .hdr text header.
            flt_path = path if path.endswith(".flt") else path + ".flt"
            hdr_path = flt_path[:-4] + ".hdr"
            out.astype(np.float32, copy=False).tofile(flt_path)
            with open(hdr_path, "w") as fh:
                fh.write(f"ncols {cols}\n")
                fh.write(f"nrows {rows}\n")
                fh.write(f"xllcorner {x0}\n")
                fh.write(f"yllcorner {yllcorner}\n")
                fh.write(f"cellsize {cell_size}\n")
                fh.write(f"NODATA_value {nodata}\n")
                fh.write("byteorder LSBFIRST\n")
            return {"dem_flt": flt_path, "dem_hdr": hdr_path}

        if target == "sfincs":
            # SFINCS .dep: row-major little-endian float32, no header.
            # Companion .msk: 0 where no-data, 1 elsewhere.
            dep_path = path if path.endswith(".dep") else path + ".dep"
            msk_path = dep_path[:-4] + ".msk"
            out.astype(np.float32, copy=False).tofile(dep_path)
            mask = np.where(np.isnan(elev), 0, 1).astype(np.uint8)
            mask.tofile(msk_path)
            return {"dem_dep": dep_path, "dem_msk": msk_path}

        if target == "gmsh":
            # Minimal .geo script: define the DEM bounds as a rectangle
            # with a uniform characteristic length. Downstream meshers can
            # be run via `gmsh -2 <path>`.
            geo_path = path if path.endswith(".geo") else path + ".geo"
            ext_x_lo = x0
            ext_x_hi = x0 + cols * dx
            ext_y_hi = y0
            ext_y_lo = y0 + rows * dy
            cl = cell_size
            with open(geo_path, "w") as fh:
                fh.write(f"cl = {cl};\n")
                fh.write(f"Point(1) = {{{ext_x_lo}, {ext_y_lo}, 0, cl}};\n")
                fh.write(f"Point(2) = {{{ext_x_hi}, {ext_y_lo}, 0, cl}};\n")
                fh.write(f"Point(3) = {{{ext_x_hi}, {ext_y_hi}, 0, cl}};\n")
                fh.write(f"Point(4) = {{{ext_x_lo}, {ext_y_hi}, 0, cl}};\n")
                fh.write("Line(1) = {1, 2};\n")
                fh.write("Line(2) = {2, 3};\n")
                fh.write("Line(3) = {3, 4};\n")
                fh.write("Line(4) = {4, 1};\n")
                fh.write("Line Loop(1) = {1, 2, 3, 4};\n")
                fh.write("Plane Surface(1) = {1};\n")
            return {"geo": geo_path}

        if target == "iber":
            # Iber expects a .dat ascii mesh; pending mesh generation
            # (Phase 4 P33) we write a placeholder boundary file that the
            # user can refine in Iber's pre-processor.
            dat_path = path if path.endswith(".dat") else path + ".dat"
            with open(dat_path, "w") as fh:
                fh.write("# Iber mesh boundary (auto-generated)\n")
                fh.write(f"NCOLS {cols}\nNROWS {rows}\n")
                fh.write(f"XLLCORNER {x0}\nYLLCORNER {yllcorner}\n")
                fh.write(f"CELLSIZE {cell_size}\nNODATA {nodata}\n")
                for r in range(rows):
                    fh.write(" ".join(f"{out[r, c]:.6f}" for c in range(cols)))
                    fh.write("\n")
            return {"dem_dat": dat_path}

        # Unreachable — guarded by valid_targets check above.
        raise NotImplementedError(target)

    def anudem_interpolate(
        self,
        mask=None,
        max_iter: int = 200,
        tol: float = 1e-3,
        method: str = "laplacian",
        inplace: bool = False,
    ) -> DEM | None:
        """ANUDEM-lite: Laplacian-relaxation gap fill (P25).

        A pragmatic subset of Hutchinson 1989 ANUDEM that handles the
        common gap-filling case: a DEM with NaN holes (cloud shadows,
        survey gaps, vegetation occlusion) is filled by Gauss-Seidel
        Laplacian relaxation, holding the known cells fixed. Each
        iteration replaces every unknown cell with the mean of its four
        4-connected neighbours; iteration stops when the maximum change
        in a sweep drops below `tol` or after `max_iter` sweeps.

        Two solver methods are available:

        - `"laplacian"` (default): solves Δz = 0 via the 4-neighbour
          mean iteration. Fast, smooth interior, but only C⁰ continuity
          at the anchor cells — the surface has visible "kinks" at
          known points.
        - `"biharmonic"`: solves Δ²z = 0 by alternating two Laplacian
          sweeps (relax `u = Δz`, then relax `z` so `Δz = u`).
          C¹ continuity at anchors; closer to Hutchinson 1989 ANUDEM's
          tension-spline objective but still without multigrid
          acceleration or drainage enforcement.

        Limitations vs full ANUDEM:

        - No multigrid acceleration; iteration cost is O(N · max_iter).
        - No drainage enforcement. For stream-conditioned DEMs, combine
          with :meth:`burn_streams` before or after.
        - No tension parameter (Hutchinson's λ); the biharmonic mode
          is a fixed-λ approximation.

        Args:
            mask: Optional bool array same shape as the DEM. `True`
                marks cells whose values must be preserved (in addition
                to the existing finite cells). `None` keeps every
                finite cell fixed.
            max_iter: Maximum relaxation sweeps.
            tol: Convergence tolerance — stop when `max |Δz| < tol`.
            method: `"laplacian"` (default) or `"biharmonic"`.
            inplace: If True, update the instance; else return a new DEM.

        Returns:
            DEM | None: Filled DEM, or None when `inplace=True`.

        Raises:
            ValueError: If the input DEM has no finite cells or `method`
                is not `"laplacian"` / `"biharmonic"`.

        Examples:
            - Fill a single-cell NaN hole with the default Laplacian solver;
              the filled value sits inside the bracketing range:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.array(
                ...     [[1.0, 2.0, 3.0], [4.0, np.nan, 6.0], [7.0, 8.0, 9.0]],
                ...     dtype=np.float32,
                ... )
                >>> ds = Dataset.create_from_array(
                ...     np.where(np.isnan(z), -9999.0, z),
                ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> filled = DEM(ds.raster).anudem_interpolate(
                ...     method="laplacian", max_iter=200, tol=1e-6,
                ... )
                >>> out = filled.values
                >>> bool(1.0 <= out[1, 1] <= 9.0)
                True

            - The biharmonic mode (P32 backfill) approximates Hutchinson
              1989's Delta^2 z = 0 by alternating Laplacian sweeps:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.array(
                ...     [[1.0, 2.0, 3.0], [4.0, np.nan, 6.0], [7.0, 8.0, 9.0]],
                ...     dtype=np.float32,
                ... )
                >>> ds = Dataset.create_from_array(
                ...     np.where(np.isnan(z), -9999.0, z),
                ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> filled = DEM(ds.raster).anudem_interpolate(
                ...     method="biharmonic", max_iter=200, tol=1e-5,
                ... )
                >>> bool(np.isfinite(filled.values).all())
                True

            - Unknown method raises ValueError:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> ds = Dataset.create_from_array(
                ...     np.ones((2, 2), dtype=np.float32),
                ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> DEM(ds.raster).anudem_interpolate(method="bogus")
                Traceback (most recent call last):
                    ...
                ValueError: method must be 'laplacian' or 'biharmonic'; got 'bogus'

            - The 5-point stencil uses edge-replication (Neumann) boundary
              padding, so a NaN cell adjacent to the raster boundary is
              filled from its in-bounds neighbours only — no periodic-wrap
              contamination from the opposite edge:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> # Top-left NaN with very different anchor at bottom-right.
                >>> z = np.full((5, 5), 10.0, dtype=np.float32)
                >>> z[-1, -1] = -100.0
                >>> z[0, 0] = np.nan
                >>> ds = Dataset.create_from_array(
                ...     np.where(np.isnan(z), -9999.0, z),
                ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> filled = DEM(ds.raster).anudem_interpolate(
                ...     method="laplacian", max_iter=300, tol=1e-6,
                ... )
                >>> bool(abs(float(filled.values[0, 0]) - 10.0) < 5.0)
                True

        See Also:
            DEM.fill_depressions: hydrologic conditioning that removes sinks.
            DEM.burn_streams: stream-network drainage enforcement.
        """
        if method not in ("laplacian", "biharmonic"):
            raise ValueError(
                f"method must be 'laplacian' or 'biharmonic'; got {method!r}"
            )

        elev = self.values
        rows, cols = elev.shape
        z = elev.astype(np.float64, copy=True)
        fixed = np.isfinite(z)
        if mask is not None:
            fixed = fixed | mask.astype(bool, copy=False)
        if not fixed.any():
            raise ValueError(
                "anudem_interpolate needs at least one finite anchor cell"
            )
        # Seed unknown cells to the mean of known values to speed convergence.
        z = np.where(fixed, z, z[fixed].mean())

        def _edge_shifts(arr):
            """Return (north, south, east, west) views of `arr` with
            edge-replication boundary handling (no periodic wrap).

            Using `np.pad(..., mode="edge")` matches a Neumann (zero
            normal-derivative) boundary, which is the natural choice for
            an interpolation kernel — the original `np.roll` formed a
            torus and injected far-edge values into near-edge cells,
            corrupting anchors near the DEM boundary.
            """
            padded = np.pad(arr, 1, mode="edge")
            return (
                padded[:-2, 1:-1],
                padded[2:, 1:-1],
                padded[1:-1, 2:],
                padded[1:-1, :-2],
            )

        if method == "laplacian":
            for _ in range(max_iter):
                north, south, east, west = _edge_shifts(z)
                new_z = (north + south + east + west) / 4.0
                new_z[fixed] = z[fixed]
                diff = float(np.max(np.abs(new_z - z)))
                z = new_z
                if diff < tol:
                    break
        else:  # biharmonic
            # Alternate two Laplacian sweeps to approximate Δ²z = 0.
            # Step A: compute u = Δz on the current z.
            # Step B: relax z so Δz ≈ smoothed u (mean of neighbour u's).
            # Composed, this approximates a biharmonic relaxation with C¹
            # continuity at the anchors.
            for _ in range(max_iter):
                north, south, east, west = _edge_shifts(z)
                u = north + south + east + west - 4.0 * z
                un, us, ue, uw = _edge_shifts(u)
                u_smooth = (un + us + ue + uw) / 4.0
                # Solve Δz = u_smooth → new z[i,j] =
                # (sum of neighbours - u_smooth) / 4.
                new_z = (north + south + east + west - u_smooth) / 4.0
                new_z[fixed] = z[fixed]
                diff = float(np.max(np.abs(new_z - z)))
                z = new_z
                if diff < tol:
                    break

        no_val = self.no_data_value[0]
        z[~np.isfinite(z)] = no_val
        plain_ds = Dataset.dataset_like(self, z.astype(elev.dtype, copy=False))
        if inplace:
            self._update_inplace(plain_ds.raster)
            return None
        return DEM(plain_ds.raster)

    def fill_sinks(self, inplace: bool = False) -> DEM | None:
        """Deprecated alias for `fill_depressions(method="priority_flood", epsilon=0.1)`.

        The original implementation was a single-pass, single-cell sink fill that did
        not cascade through nested pits. Calls now route through the Priority-Flood +
        epsilon algorithm, which is correct on cascading depressions. The output
        differs from the historical algorithm in two ways:

        1. Cascading pits are fully resolved (each pit fills to the rim of its enclosing
           pit, not just to its immediate-neighbour minimum).
        2. Drainage paths within filled depressions inherit a 0.1-unit gradient — so
           D8 routing on the result avoids `NO_FLOW` cells inside the fill.

        Args:
            inplace: If `True` the instance is updated in place; otherwise a new
                `DEM` is returned.

        Returns:
            DEM | None: New `DEM` with the sink-free elevation, or `None` when
            `inplace` is `True`.
        """
        warnings.warn(
            "DEM.fill_sinks is deprecated; use DEM.fill_depressions(method='priority_flood', "
            "epsilon=0.1) for equivalent behaviour or method='wang_liu' for a flat fill.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.fill_depressions(method="priority_flood", epsilon=0.1, inplace=inplace)

    def _get_8_direction_slopes(self) -> np.ndarray:
        """Compute slopes to all eight neighbours for every cell.

        Uses a padded elevation array and vectorised NumPy slicing to
        calculate the elevation difference divided by the inter-cell
        distance (cell size for cardinal, cell size × √2 for diagonal)
        in each of the eight D8 directions.

        Returns:
            np.ndarray: 3-D `float32` array of shape
                `(rows, columns, 8)` where the third axis corresponds
                to the direction indices defined in `DIR_OFFSETS`.
        """
        elev = self.values
        cell_size = self.cell_size
        dist2 = cell_size * np.sqrt(2)
        distances = [
            cell_size,
            dist2,
            cell_size,
            dist2,
            cell_size,
            dist2,
            cell_size,
            dist2,
        ]
        rows, cols = elev.shape
        slopes = np.full((rows, cols, 8), np.nan, dtype=np.float32)

        # padding = 2
        # pad_1 = padding - 1
        # Create a padded elevation array for boundary conditions
        padded_elev = np.full((rows + 2, cols + 2), np.nan, dtype=np.float32)
        padded_elev[1:-1, 1:-1] = elev

        # Calculate elevation differences using slicing
        diff_right = padded_elev[1:-1, 1:-1] - padded_elev[1:-1, 2:]
        diff_top_right = padded_elev[1:-1, 1:-1] - padded_elev[:-2, 2:]
        diff_top = padded_elev[1:-1, 1:-1] - padded_elev[:-2, 1:-1]
        diff_top_left = padded_elev[1:-1, 1:-1] - padded_elev[:-2, :-2]
        diff_left = padded_elev[1:-1, 1:-1] - padded_elev[1:-1, :-2]
        diff_bottom_left = padded_elev[1:-1, 1:-1] - padded_elev[2:, :-2]
        diff_bottom = padded_elev[1:-1, 1:-1] - padded_elev[2:, 1:-1]
        diff_bottom_right = padded_elev[1:-1, 1:-1] - padded_elev[2:, 2:]

        # Calculate slopes
        slopes[:, :, 0] = diff_bottom / distances[0]
        slopes[:, :, 1] = diff_bottom_left / distances[1]
        slopes[:, :, 2] = diff_left / distances[2]
        slopes[:, :, 3] = diff_top_left / distances[3]
        slopes[:, :, 4] = diff_top / distances[4]
        slopes[:, :, 5] = diff_top_right / distances[5]
        slopes[:, :, 6] = diff_right / distances[6]
        slopes[:, :, 7] = diff_bottom_right / distances[7]

        return slopes

    def twi(
        self,
        accumulation,
        slope_deg: Dataset | None = None,
    ) -> Dataset:
        """Topographic Wetness Index (Beven & Kirkby 1979).

        TWI = ln(SCA / tan(slope)) where SCA = specific catchment area =
        `accumulation_cells * cell_size`. High TWI values mark cells likely
        to be wet (low slope and / or large upstream area). Slope is treated
        in radians internally; values below a small floor (≈ 0.06°) are
        clamped to avoid log-singularity at flat cells.

        Args:
            accumulation: `Accumulation` raster aligned to this DEM.
            slope_deg: Optional pre-computed slope raster in degrees. If
                None, `Terrain.slope`-equivalent is computed on the fly via
                `arctan` of `DEM.slope()`'s rise/run output.

        Returns:
            `Dataset` of float32 TWI values. No-data sentinel `-9999.0`.

        References:
            Beven, K. J. & Kirkby, M. J. (1979). "A physically based,
            variable contributing area model of basin hydrology."
            *Hydrological Sciences Bulletin* 24(1): 43-69.
        """
        return self._area_slope_index(accumulation, slope_deg, kind="twi")

    def spi(
        self,
        accumulation,
        slope_deg: Dataset | None = None,
    ) -> Dataset:
        """Stream Power Index (Moore et al. 1991).

        SPI = SCA * tan(slope). Proportional to the rate at which overland
        flow does work at a cell; useful as a proxy for erosion risk.

        Args:
            accumulation: `Accumulation` raster aligned to this DEM.
            slope_deg: Optional pre-computed slope raster in degrees. If
                None, computed on the fly.

        Returns:
            `Dataset` of float32 SPI values. No-data sentinel `-9999.0`.
        """
        return self._area_slope_index(accumulation, slope_deg, kind="spi")

    def sti(
        self,
        accumulation,
        slope_deg: Dataset | None = None,
    ) -> Dataset:
        """Sediment Transport Index (Moore & Burch 1986).

        STI = (SCA / 22.13)^0.6 * (sin(slope) / 0.0896)^1.3. The 22.13 m and
        0.0896 m/m constants come from the original USLE plot length and
        slope; STI predicts where overland flow will transport sediment
        rather than deposit it.

        Args:
            accumulation: `Accumulation` raster aligned to this DEM.
            slope_deg: Optional pre-computed slope raster in degrees.

        Returns:
            `Dataset` of float32 STI values. No-data sentinel `-9999.0`.
        """
        return self._area_slope_index(accumulation, slope_deg, kind="sti")

    def _area_slope_index(
        self,
        accumulation,
        slope_deg,
        *,
        kind: str,
    ) -> Dataset:
        """Shared kernel for the TWI / SPI / STI family.

        All three indices need `(SCA, slope_rad)`; the difference is the
        functional form applied to them.
        """
        if slope_deg is None:
            slope_ratio = self.slope().read_array().astype(np.float64, copy=False)
            slope_deg_arr = np.degrees(np.arctan(np.where(
                np.isfinite(slope_ratio), slope_ratio, 0.0
            )))
        else:
            slope_deg_arr = slope_deg.read_array().astype(np.float64, copy=False)
            no_val = (
                slope_deg.no_data_value[0] if slope_deg.no_data_value else None
            )
            if no_val is not None:
                slope_deg_arr = np.where(
                    slope_deg_arr == no_val, np.nan, slope_deg_arr
                )

        acc = accumulation.read_array().astype(np.float64, copy=False)
        if slope_deg_arr.shape != acc.shape:
            raise ValueError(
                f"slope shape {slope_deg_arr.shape} != accumulation shape "
                f"{acc.shape}"
            )

        slope_rad = np.deg2rad(slope_deg_arr)
        # Floor at ~0.001 rad (≈ 0.06°) to keep tan() / sin() bounded away
        # from zero on flats.
        slope_rad = np.where(
            np.isfinite(slope_rad), np.maximum(slope_rad, 1.0e-3), np.nan,
        )

        cs = float(abs(self.geotransform[1]))
        sca = acc * cs  # specific catchment area (m, since width ≈ cell_size)

        with np.errstate(divide="ignore", invalid="ignore"):
            if kind == "twi":
                arr = np.log(sca / np.tan(slope_rad))
            elif kind == "spi":
                arr = sca * np.tan(slope_rad)
            elif kind == "sti":
                arr = ((sca / 22.13) ** 0.6) * (
                    (np.sin(slope_rad) / 0.0896) ** 1.3
                )
            else:  # pragma: no cover — defensive
                raise ValueError(f"unknown kind {kind!r}")

        no_val = -9999.0
        arr = np.where(np.isfinite(arr), arr, no_val).astype(np.float32)
        return Dataset.create_from_array(
            arr,
            geo=self.geotransform,
            epsg=self.epsg,
            no_data_value=no_val,
        )

    def _focal_window_stats(
        self,
        window: int,
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Shared focal-window kernel for the terrain-index family.

        Returns `(z, focal_mean, focal_sd)` where the focal stats are
        rectangular `window×window` reductions, **no-data-aware**: the mean
        and SD at each cell are computed only over the valid (non-NaN)
        cells inside its window. Cells whose window contains zero valid
        neighbours receive NaN — the per-caller wrapping converts those to
        the DEM's no-data sentinel.

        Used by `tpi`, `deviation_from_mean`, and `elev_std`. `ruggedness`
        has its own per-shift kernel and does not call this helper.
        """
        from scipy.ndimage import uniform_filter
        if window < 1:
            raise ValueError(f"window must be >= 1; got {window!r}")
        z = self.values.astype(np.float64, copy=False)
        valid = ~np.isnan(z)
        valid_f = valid.astype(np.float64)
        z_filled = np.where(valid, z, 0.0)
        # No-data-aware focal mean: total signal / count of valid neighbours
        # in each window. Cells whose window has zero valid neighbours yield
        # NaN (0 / 0); the caller's no-data wrapping converts those to the
        # DEM's sentinel.
        sum_z = uniform_filter(z_filled, size=int(window), mode="reflect")
        sum_zz = uniform_filter(z_filled * z_filled, size=int(window), mode="reflect")
        # `uniform_filter` averages by default — multiply by the window area
        # to recover unscaled sums so the same divisor (count of valid cells
        # in the window) applies to both numerator and second moment.
        n_window = float(int(window) * int(window))
        sum_z *= n_window
        sum_zz *= n_window
        count = uniform_filter(valid_f, size=int(window), mode="reflect") * n_window
        with np.errstate(invalid="ignore", divide="ignore"):
            m = sum_z / count
            ex2 = sum_zz / count
            sd = np.sqrt(np.maximum(ex2 - m * m, 0.0))
        return z, m, sd

    def tpi(self, window: int = 3) -> Dataset:
        """Topographic Position Index (Guisan 1999).

        TPI = `z - focal_mean(z, window)`. Positive values mark ridges and
        upland positions; negative values mark valleys and depressions.

        Args:
            window: Side length of the focal window in cells (must be ≥ 1).
                Defaults to 3 (a 3×3 neighbourhood). Larger windows pick up
                regional / catchment-scale topography; smaller windows pick
                up local relief.

        Returns:
            `Dataset` of float32 TPI values. No-data cells use this DEM's
            no-data sentinel.

        References:
            Guisan, A., Weiss, S. B., & Weiss, A. D. (1999). "GLM versus
            CCA spatial modeling of plant species distribution." *Plant
            Ecology* 143(1): 107-122.

        Examples:
            - A flat surface has every cell at its focal mean → TPI = 0:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.full((5, 5), 10.0, dtype=np.float32)
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> tpi = DEM(ds.raster).tpi(window=3).read_array()
                >>> bool(np.allclose(tpi, 0.0))
                True

            - A single ridge cell on flat terrain reports positive TPI; a
              pit reports negative:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.zeros((5, 5), dtype=np.float32)
                >>> z[2, 2] = 9.0
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> bool(DEM(ds.raster).tpi(window=3).read_array()[2, 2] > 0)
                True
        """
        z, focal_mean, _focal_sd = self._focal_window_stats(window)
        out = (z - focal_mean).astype(np.float32)
        no_val = float(self.no_data_value[0])
        out = np.where(np.isnan(out), no_val, out)
        return Dataset.create_from_array(
            out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
        )

    def deviation_from_mean(self, window: int = 3) -> Dataset:
        """Deviation from mean elevation — standardised TPI.

        `(z - focal_mean) / focal_sd`. Dimensionless ridge / valley index;
        because it normalises by local roughness it allows comparing
        positions across regimes with very different relief.

        Args:
            window: Side length of the focal window in cells (≥ 1).

        Returns:
            `Dataset` of float32 deviation values. Flat cells (focal_sd ≈ 0)
            yield 0.0 by definition. No-data cells use this DEM's no-data
            sentinel.

        Examples:
            - Flat terrain yields zero deviation everywhere:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.full((4, 4), 5.0, dtype=np.float32)
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> bool(np.allclose(
                ...     DEM(ds.raster).deviation_from_mean(window=3).read_array(), 0.0
                ... ))
                True

            - A peak reports positive standardised deviation:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.zeros((5, 5), dtype=np.float32)
                >>> z[2, 2] = 10.0
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> out = DEM(ds.raster).deviation_from_mean(window=3).read_array()
                >>> bool(out[2, 2] > 0)
                True
        """
        z, focal_mean, focal_sd = self._focal_window_stats(window)
        out = (z - focal_mean) / np.where(focal_sd == 0.0, 1.0, focal_sd)
        out = out.astype(np.float32)
        no_val = float(self.no_data_value[0])
        out = np.where(np.isnan(out), no_val, out)
        return Dataset.create_from_array(
            out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
        )

    def elev_std(self, window: int = 3) -> Dataset:
        """Standard deviation of elevation in a focal window.

        Pure focal-window SD on the elevation raster. A roughness proxy:
        high values mark varied terrain, low values mark smooth terrain.

        Args:
            window: Side length of the focal window in cells (≥ 1).

        Returns:
            `Dataset` of float32 SD values. No-data cells use this DEM's
            no-data sentinel.

        Examples:
            - Flat terrain reports zero SD everywhere:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.full((4, 4), 5.0, dtype=np.float32)
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> bool(np.allclose(
                ...     DEM(ds.raster).elev_std(window=3).read_array(), 0.0
                ... ))
                True

            - A step in elevation produces non-zero SD along the boundary:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.zeros((5, 5), dtype=np.float32)
                >>> z[:, 3:] = 10.0
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> sd = DEM(ds.raster).elev_std(window=3).read_array()
                >>> bool((sd[:, 2] > 0).all())
                True
        """
        _z, _focal_mean, focal_sd = self._focal_window_stats(window)
        out = focal_sd.astype(np.float32)
        no_val = float(self.no_data_value[0])
        out = np.where(np.isnan(out), no_val, out)
        return Dataset.create_from_array(
            out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
        )

    def curvature(self, kind: str = "profile") -> Dataset:
        """Surface curvature (Zevenbergen & Thorne 1987).

        Fits a partial quartic polynomial `z = Ax²y² + Bx²y + Cxy² + Dx² +
        Ey² + Fxy + Gx + Hy + I` to the 3×3 neighbourhood of each cell and
        evaluates one of the five canonical curvature variants from the
        coefficient grid:

        * `"plan"` — curvature perpendicular to the slope direction;
          positive on diverging slopes, negative on converging ones.
        * `"profile"` — curvature parallel to the slope direction; positive
          on convex (decelerating) slopes, negative on concave (accelerating)
          ones.
        * `"total"` — `2 * (D + E)`; sign-independent total relief.
        * `"mean"` — average of the two principal curvatures.
        * `"gaussian"` — product of the two principal curvatures.

        Args:
            kind: One of `"plan"`, `"profile"`, `"total"`, `"mean"`,
                `"gaussian"`. Defaults to `"profile"`.

        Returns:
            `Dataset` of float32 curvature values. No-data cells use this
            DEM's no-data sentinel.

        Raises:
            ValueError: If `kind` is not one of the five recognised
                variants.

        References:
            Zevenbergen, L. W. & Thorne, C. R. (1987). "Quantitative
            analysis of land surface topography." *Earth Surface Processes
            and Landforms* 12(1): 47-56.

        Examples:
            - Every curvature variant is zero on a flat DEM:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.full((5, 5), 10.0, dtype=np.float32)
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> bool(np.allclose(
                ...     DEM(ds.raster).curvature(kind="total").read_array(), 0.0
                ... ))
                True

            - Mean curvature equals total / 2 on a paraboloid (interior):

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> x, y = np.meshgrid(np.arange(-3, 4), np.arange(-3, 4))
                >>> z = (-(x * x + y * y)).astype(np.float32)
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> dem = DEM(ds.raster)
                >>> total = dem.curvature(kind="total").read_array()[2:-2, 2:-2]
                >>> mean = dem.curvature(kind="mean").read_array()[2:-2, 2:-2]
                >>> bool(np.allclose(mean, total / 2.0, atol=1e-5))
                True
        """
        if kind not in ("plan", "profile", "total", "mean", "gaussian"):
            raise ValueError(
                f"kind must be one of 'plan', 'profile', 'total', 'mean', "
                f"'gaussian'; got {kind!r}"
            )
        z = self.values.astype(np.float64, copy=False)
        L = float(abs(self.geotransform[1]))
        # 3×3 stencil — `np.pad(mode="edge")` mirrors the boundary value so
        # finite differences stay defined at the raster edge.
        zp = np.pad(np.where(np.isnan(z), 0.0, z), 1, mode="edge")
        z1, z2, z3 = zp[:-2, :-2], zp[:-2, 1:-1], zp[:-2, 2:]
        z4, z5, z6 = zp[1:-1, :-2], zp[1:-1, 1:-1], zp[1:-1, 2:]
        z7, z8, z9 = zp[2:, :-2], zp[2:, 1:-1], zp[2:, 2:]
        # Zevenbergen-Thorne polynomial coefficients.
        D = ((z4 + z6) / 2.0 - z5) / (L * L)
        E = ((z2 + z8) / 2.0 - z5) / (L * L)
        F = (-z1 + z3 + z7 - z9) / (4.0 * L * L)
        G = (-z4 + z6) / (2.0 * L)
        H = (z2 - z8) / (2.0 * L)
        denom = G * G + H * H + 1.0e-12
        with np.errstate(invalid="ignore", divide="ignore"):
            if kind == "plan":
                arr = -2.0 * (D * H * H + E * G * G - F * G * H) / denom
            elif kind == "profile":
                arr = 2.0 * (D * G * G + E * H * H + F * G * H) / denom
            elif kind == "total":
                arr = 2.0 * (D + E)
            elif kind == "mean":
                arr = D + E
            else:  # gaussian
                arr = 4.0 * D * E - F * F
        out = arr.astype(np.float32)
        no_val = float(self.no_data_value[0])
        out = np.where(np.isnan(z) | ~np.isfinite(out), no_val, out)
        return Dataset.create_from_array(
            out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
        )

    def normal_vector_deviation(self, window: int = 3) -> Dataset:
        """Per-cell angular deviation of the surface normal from its focal mean.

        Computes each cell's outward-pointing surface normal from finite
        differences of the elevation grid, then takes the focal-mean of the
        unit-normal components in a `window×window` neighbourhood. The
        result at each cell is the angle (in radians) between the local
        normal and the focal-mean normal — a roughness metric that grows
        with how strongly the surface bends within the window.

        Args:
            window: Side length of the focal window in cells (≥ 1).

        Returns:
            `Dataset` of float32 angular deviations in radians,
            `[0, π/2]`. No-data cells use this DEM's no-data sentinel.

        Examples:
            - Flat terrain yields zero angular deviation:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.full((5, 5), 10.0, dtype=np.float32)
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> bool(np.allclose(
                ...     DEM(ds.raster).normal_vector_deviation(window=3).read_array(),
                ...     0.0, atol=1e-5,
                ... ))
                True

            - A constant-slope plane has identical normals in its deep
              interior, so deviation there is ~0:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> x, y = np.meshgrid(np.arange(7), np.arange(7))
                >>> z = (2.0 * x + y).astype(np.float32)
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> arr = DEM(ds.raster).normal_vector_deviation(window=3).read_array()
                >>> bool(np.allclose(arr[2:-2, 2:-2], 0.0, atol=1e-4))
                True
        """
        from scipy.ndimage import uniform_filter
        if window < 1:
            raise ValueError(f"window must be >= 1; got {window!r}")
        z = self.values.astype(np.float64, copy=False)
        L = float(abs(self.geotransform[1]))
        # Finite-difference partials with reflective edge padding.
        zp = np.pad(np.where(np.isnan(z), 0.0, z), 1, mode="edge")
        dzdx = (zp[1:-1, 2:] - zp[1:-1, :-2]) / (2.0 * L)
        dzdy = (zp[2:, 1:-1] - zp[:-2, 1:-1]) / (2.0 * L)
        # Outward-pointing normal `(-dz/dx, -dz/dy, 1)`; renormalise to unit.
        nx_raw = -dzdx
        ny_raw = -dzdy
        nz_raw = np.ones_like(z)
        nm = np.sqrt(nx_raw * nx_raw + ny_raw * ny_raw + nz_raw * nz_raw)
        nx = nx_raw / nm
        ny = ny_raw / nm
        nz = nz_raw / nm
        # Focal mean of each component, then renormalise to keep the mean
        # vector unit-length.
        mnx = uniform_filter(nx, size=int(window), mode="reflect")
        mny = uniform_filter(ny, size=int(window), mode="reflect")
        mnz = uniform_filter(nz, size=int(window), mode="reflect")
        mm = np.sqrt(mnx * mnx + mny * mny + mnz * mnz) + 1.0e-12
        mnx /= mm
        mny /= mm
        mnz /= mm
        cos_theta = np.clip(nx * mnx + ny * mny + nz * mnz, -1.0, 1.0)
        out = np.arccos(cos_theta).astype(np.float32)
        no_val = float(self.no_data_value[0])
        out = np.where(np.isnan(z), no_val, out)
        return Dataset.create_from_array(
            out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
        )

    def openness(
        self,
        *,
        search_radius: int = 10,
        kind: str = "positive",
    ) -> Dataset:
        """Topographic openness (Yokoyama 2002).

        For each cell, walks outward along 8 azimuths up to `search_radius`
        cells and records the maximum elevation angle (positive openness)
        or the minimum (negative openness) along each walk. The per-cell
        output is the mean of `(π/2 - horizon_angle)` across the 8
        directions, in radians.

        High positive openness marks exposed / high-relief locations; high
        negative openness marks deep depressions / valley floors.

        Args:
            search_radius: Maximum walk distance in cells. Must be ≥ 1.
                Defaults to 10.
            kind: `"positive"` (default) or `"negative"`. Negative openness
                flips the sign of the elevation difference internally —
                effectively measuring the local pit / depression depth.

        Returns:
            `Dataset` of float32 openness values in radians. No-data cells
            use this DEM's no-data sentinel.

        Raises:
            ValueError: If `kind` is not one of `"positive"` / `"negative"`
                or `search_radius < 1`.

        References:
            Yokoyama, R., Shirasawa, M., & Pike, R. J. (2002). "Visualizing
            topography by openness: A new application of image processing
            to digital elevation models." *Photogrammetric Engineering and
            Remote Sensing* 68(3): 257-265.

        Examples:
            - Flat terrain: every horizon angle is 0, so positive openness
              is `π/2` at every cell:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.full((5, 5), 10.0, dtype=np.float32)
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> arr = DEM(ds.raster).openness(search_radius=2).read_array()
                >>> bool(np.allclose(arr, np.pi / 2.0, atol=1e-5))
                True

            - A peak on flat terrain has strictly larger positive openness
              than its neighbours (which look up at it):

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.zeros((5, 5), dtype=np.float32)
                >>> z[2, 2] = 10.0
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> arr = DEM(ds.raster).openness(search_radius=3).read_array()
                >>> bool(arr[2, 2] > arr[1, 2])
                True
        """
        from digitalrivers._numba import horizon_walk_kernel
        if kind not in ("positive", "negative"):
            raise ValueError(
                f"kind must be 'positive' or 'negative'; got {kind!r}"
            )
        if search_radius < 1:
            raise ValueError(
                f"search_radius must be >= 1; got {search_radius!r}"
            )
        z = self.values.astype(np.float64, copy=False)
        # For negative openness, flip the elevation so the kernel's
        # "maximum upward angle" becomes "maximum downward angle" relative
        # to the original surface.
        z_in = (-z if kind == "negative" else z).astype(np.float64, copy=False)
        z_filled = np.where(np.isnan(z_in), 0.0, z_in)
        out = horizon_walk_kernel(
            z_filled, float(abs(self.geotransform[1])), int(search_radius), 0,
        ).astype(np.float32)
        no_val = float(self.no_data_value[0])
        out = np.where(np.isnan(z), no_val, out)
        return Dataset.create_from_array(
            out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
        )

    def sky_view_factor(
        self,
        *,
        search_radius: int = 10,
    ) -> Dataset:
        """Sky-view factor (Zakšek et al. 2011).

        For each cell, the fraction of the upper hemisphere that is visible
        from that cell. Walks along 8 azimuths up to `search_radius`,
        records the maximum elevation angle along each walk, and returns
        the mean of `(1 - sin(horizon_angle))` across the 8 directions —
        equivalent to the fraction of an isotropic sky dome not occluded
        by surrounding terrain.

        Shares the horizon-walk kernel with `openness` (W-27); the two
        differ only in the per-direction aggregation function.

        Args:
            search_radius: Maximum walk distance in cells. Must be ≥ 1.
                Defaults to 10.

        Returns:
            `Dataset` of float32 SVF values in `[0, 1]`. No-data cells use
            this DEM's no-data sentinel.

        Raises:
            ValueError: If `search_radius < 1`.

        References:
            Zakšek, K., Oštir, K., & Kokalj, Ž. (2011). "Sky-view factor as
            a relief visualization technique." *Remote Sensing* 3(2):
            398-415.

        Examples:
            - Flat terrain: nothing occludes the sky, SVF = 1 everywhere:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.full((5, 5), 10.0, dtype=np.float32)
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> arr = DEM(ds.raster).sky_view_factor(search_radius=2).read_array()
                >>> bool(np.allclose(arr, 1.0, atol=1e-5))
                True

            - A pit surrounded by higher cells reports SVF strictly less
              than 1 (the walls occlude part of the sky):

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.full((5, 5), 10.0, dtype=np.float32)
                >>> z[2, 2] = 0.0
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> arr = DEM(ds.raster).sky_view_factor(search_radius=2).read_array()
                >>> bool(arr[2, 2] < 1.0)
                True
        """
        from digitalrivers._numba import horizon_walk_kernel
        if search_radius < 1:
            raise ValueError(
                f"search_radius must be >= 1; got {search_radius!r}"
            )
        z = self.values.astype(np.float64, copy=False)
        z_filled = np.where(np.isnan(z), 0.0, z)
        out = horizon_walk_kernel(
            z_filled, float(abs(self.geotransform[1])), int(search_radius), 1,
        ).astype(np.float32)
        no_val = float(self.no_data_value[0])
        out = np.where(np.isnan(z), no_val, out)
        return Dataset.create_from_array(
            out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
        )

    def ruggedness(self, window: int = 3) -> Dataset:
        """Terrain Ruggedness Index (Riley et al. 1999).

        Per-cell mean of absolute elevation differences to every other cell
        in a `window×window` neighbourhood. Output unit is the DEM elevation
        unit (metres). Higher values mark rougher terrain; flat terrain is
        zero.

        Args:
            window: Side length of the focal window in cells (≥ 1).
                Defaults to 3 (Riley's original 3×3 neighbourhood).

        Returns:
            `Dataset` of float32 ruggedness values. No-data cells use this
            DEM's no-data sentinel.

        References:
            Riley, S. J., DeGloria, S. D., & Elliot, R. (1999). "A terrain
            ruggedness index that quantifies topographic heterogeneity."
            *Intermountain Journal of Sciences* 5(1-4): 23-27.

        Examples:
            - Flat terrain has zero ruggedness everywhere:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.full((4, 4), 5.0, dtype=np.float32)
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> bool(np.allclose(
                ...     DEM(ds.raster).ruggedness(window=3).read_array(), 0.0
                ... ))
                True

            - A peak surrounded by flat terrain contributes positive
              ruggedness at the peak and its 8-neighbours:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.zeros((5, 5), dtype=np.float32)
                >>> z[2, 2] = 9.0
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> tri = DEM(ds.raster).ruggedness(window=3).read_array()
                >>> bool(tri[2, 2] > 0 and tri[1, 2] > 0 and tri[3, 2] > 0)
                True
        """
        if window < 1:
            raise ValueError(f"window must be >= 1; got {window!r}")
        z = self.values.astype(np.float64, copy=False)
        z_filled = np.where(np.isnan(z), 0.0, z)
        total = np.zeros_like(z_filled)
        count = 0
        half = int(window) // 2
        for dr in range(-half, half + 1):
            for dc in range(-half, half + 1):
                if dr == 0 and dc == 0:
                    continue
                shifted = np.roll(z_filled, shift=(dr, dc), axis=(0, 1))
                total += np.abs(z_filled - shifted)
                count += 1
        out = (total / float(count)).astype(np.float32)
        no_val = float(self.no_data_value[0])
        out = np.where(np.isnan(z), no_val, out)
        return Dataset.create_from_array(
            out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
        )

    def slope(self) -> Dataset:
        """Compute the maximum downhill slope at every cell.

        Calculates slopes in all eight D8 directions via
        `_get_8_direction_slopes` and returns a raster whose cell
        values are the maximum slope across the eight neighbours.

        Returns:
            Dataset: Single-band raster with the same geometry as the
                DEM, containing the maximum slope value per cell.

        See Also:
            Terrain.slope: GDAL-based slope using Horn or
                Zevenbergen-Thorne algorithms.
        """
        slope = self._get_8_direction_slopes()
        max_slope = np.nanmax(slope, axis=2)

        src = self.dataset_like(self, max_slope)
        return src

    def set_outflow(
        self, outflow: GeoDataFrame, direction: int, inplace: bool = False
    ) -> Dataset:
        """Assign a fixed flow direction at the basin outfall cell.

        Args:
            outflow: GeoDataFrame with point geometry marking the
                outfall location.
            direction: D8 direction code (0–7) to force at the outfall.
            inplace: If `True` modify the current instance in place;
                otherwise return a new `Dataset`.

        Returns:
            Dataset with the outfall direction applied, or `None` when
            *inplace* is `True`.

        Raises:
            NotImplementedError: This method is not yet implemented.
        """
        raise NotImplementedError("set_outflow is not yet implemented.")

    def flow_direction(
        self,
        method: str = "d8",
        exponent: float = 1.0,
        forced: GeoDataFrame | None = None,
        seed: int | None = None,
        forced_direction: GeoDataFrame | None = None,
    ) -> FlowDirection:
        """Derive a flow-direction raster from the DEM under one of five routing schemes.

        Schemes:

        * `"d8"` (default) — O'Callaghan & Mark (1984). Single-direction steepest
          descent. Output: 1-band `int32` raster of direction codes 0–7 following
          `DIR_OFFSETS`.
        * `"dinf"` — Tarboton (1997). Output: 2-band `float32` raster. Band 0 is
          the aspect angle in radians CCW from east in `[0, 2π)`; band 1 is the
          slope magnitude along the chosen facet. `-1.0` in band 0 marks sinks /
          no-data.
        * `"mfd_quinn"` — Quinn et al. (1991). Multi-direction with contour-length
          weighting. Output: 8-band `float32` raster of partition fractions,
          ordered by `DIR_OFFSETS`. Per-cell fractions sum to 1.0 (or all zero
          for sinks).
        * `"mfd_holmgren"` — Holmgren (1994). Same family as Quinn but tunable
          `exponent` (default 1.0 mimics Quinn; 4–6 mimics D8). 8-band output.
        * `"rho8"` — Fairfield & Leymarie (1991). Stochastic single-direction;
          cardinal slopes are perturbed before the steepest-neighbour pick. Pass
          `seed` for reproducibility. 1-band `int32` output like D8.

        Args:
            method: Routing scheme — one of `"d8"`, `"dinf"`, `"mfd_quinn"`,
                `"mfd_holmgren"`, `"rho8"`.
            exponent: `p` for `mfd_holmgren` and `mfd_quinn`. Ignored otherwise.
            forced: Optional GeoDataFrame with columns `geometry` (point) and
                `direction` (int 0–7) — cells at the given locations are forced
                to that D8 direction code regardless of the computed slope. Only
                meaningful for `"d8"` and `"rho8"`.
            seed: Random seed for `"rho8"` reproducibility.
            forced_direction: Deprecated alias for `forced`. If both are given,
                `forced` wins.

        Returns:
            FlowDirection: typed wrapper carrying the routing scheme and encoding.

        Raises:
            ValueError: If `method` is unknown.
        """
        if forced is None and forced_direction is not None:
            forced = forced_direction

        valid_methods = {"d8", "dinf", "mfd_quinn", "mfd_holmgren", "rho8"}
        if method not in valid_methods:
            raise ValueError(
                f"method must be one of {sorted(valid_methods)}; got {method!r}"
            )

        elev = self.values
        valid_mask = ~np.isnan(elev)

        if method == "d8":
            slopes = self._get_8_direction_slopes()
            slope_valid = ~np.all(np.isnan(slopes), axis=2)
            valid_cells_mask = valid_mask & slope_valid
            arr = np.full(elev.shape, Dataset.default_no_data_value, dtype=np.int32)
            if valid_cells_mask.any():
                best_dir = np.nanargmax(slopes[valid_cells_mask], axis=1)
                # Only commit a direction where the steepest slope is strictly downhill;
                # cells whose best 8-neighbour is at equal or higher elevation are sinks
                # and stay at the no-data sentinel (spec P5: "max(s_k) ≤ 0 → sink").
                rr, cc = np.where(valid_cells_mask)
                max_slope = slopes[rr, cc, best_dir]
                downhill = max_slope > 0
                arr[rr[downhill], cc[downhill]] = best_dir[downhill]
            if forced is not None:
                indices = self.map_to_array_coordinates(forced)
                for i, ind in enumerate(indices):
                    arr[tuple(ind)] = forced.loc[i, "direction"]
            plain_ds = Dataset.create_from_array(
                arr, geo=self.geotransform, epsg=self.epsg,
                no_data_value=self.default_no_data_value,
            )
            return FlowDirection.from_dataset(plain_ds, routing="d8")

        if method == "rho8":
            slopes = self._get_8_direction_slopes()
            rng = np.random.default_rng(seed)
            arr = _rho8_flow_direction(slopes, valid_mask, rng=rng)
            # Replace -1 (sentinel from rho8 helper) with the dataset no-data value.
            arr[arr < 0] = Dataset.default_no_data_value
            if forced is not None:
                indices = self.map_to_array_coordinates(forced)
                for i, ind in enumerate(indices):
                    arr[tuple(ind)] = forced.loc[i, "direction"]
            plain_ds = Dataset.create_from_array(
                arr.astype(np.int32, copy=False),
                geo=self.geotransform, epsg=self.epsg,
                no_data_value=self.default_no_data_value,
            )
            return FlowDirection.from_dataset(plain_ds, routing="rho8")

        if method == "dinf":
            angle, magnitude = _dinf_flow_direction(elev, self.cell_size)
            stacked = np.stack([angle, magnitude], axis=0).astype(np.float32, copy=False)
            plain_ds = Dataset.create_from_array(
                stacked, geo=self.geotransform, epsg=self.epsg,
                no_data_value=self.default_no_data_value,
            )
            return FlowDirection.from_dataset(plain_ds, routing="dinf")

        # mfd_quinn or mfd_holmgren
        slopes = self._get_8_direction_slopes()
        weighting = "quinn" if method == "mfd_quinn" else "holmgren"
        fractions = _mfd_flow_direction(
            slopes, valid_mask, weighting=weighting, exponent=exponent,
        )
        # Transpose (rows, cols, 8) -> (8, rows, cols) for pyramids's band-first layout.
        bands = np.transpose(fractions, (2, 0, 1)).astype(np.float32, copy=False)
        plain_ds = Dataset.create_from_array(
            bands, geo=self.geotransform, epsg=self.epsg,
            no_data_value=self.default_no_data_value,
        )
        return FlowDirection.from_dataset(plain_ds, routing=method)

    def accumulate_flow(self, r, c, flow_dir, acc, dir_offsets) -> int:
        """Count upstream cells that drain into `(r, c)` (iterative).

        Uses an explicit stack to perform a depth-first traversal of the
        flow-direction grid backwards.  For every neighbour whose flow
        direction points toward the current cell, the neighbour is pushed
        onto the stack.  Results are cached in *acc* so each cell is
        computed at most once.

        Args:
            r: Row index of the target cell.
            c: Column index of the target cell.
            flow_dir: 2-D `int` array of D8 direction codes (0–7).
            acc: 2-D `int32` accumulation array.  Cells initialised to
                `-1` are unprocessed; non-negative values are cached
                results.
            dir_offsets: Direction-offset mapping (see `DIR_OFFSETS`).

        Returns:
            Number of upstream cells that drain into `(r, c)`
            (excluding the cell itself).
        """
        rows, cols = flow_dir.shape

        if not (0 <= r < rows and 0 <= c < cols):
            return 0
        if acc[r, c] >= 0:
            return acc[r, c]

        offsets_list = [
            (d_col, d_row, self.opposite_direction(d_row, d_col, dir_offsets))
            for d_col, d_row in dir_offsets.values()
        ]

        stack = [(r, c, 0, 0)]

        while stack:
            cr, cc, idx, total = stack[-1]

            if acc[cr, cc] >= 0:
                stack.pop()
                if stack:
                    pr, pc, pidx, ptotal = stack[-1]
                    stack[-1] = (pr, pc, pidx, ptotal + acc[cr, cc] + 1)
                continue

            # Advance through remaining neighbours.
            found_unprocessed = False
            while idx < len(offsets_list):
                d_col, d_row, opp = offsets_list[idx]
                idx += 1
                rr, rc = cr + d_row, cc + d_col
                if not (0 <= rr < rows and 0 <= rc < cols):
                    continue
                if flow_dir[rr, rc] != opp:
                    continue
                if opp is None:
                    continue
                # Neighbour already computed — just add its count.
                if acc[rr, rc] >= 0:
                    total += acc[rr, rc] + 1
                    continue
                # Neighbour needs processing — save our state and push it.
                stack[-1] = (cr, cc, idx, total)
                stack.append((rr, rc, 0, 0))
                found_unprocessed = True
                break

            if not found_unprocessed:
                # All neighbours processed — finalise this cell.
                acc[cr, cc] = total
                stack.pop()
                if stack:
                    pr, pc, pidx, ptotal = stack[-1]
                    stack[-1] = (pr, pc, pidx, ptotal + total + 1)

        return acc[r, c]

    @staticmethod
    def opposite_direction(dr, dc, dir_offsets):
        """Return the D8 direction code opposite to the given offset.

        Args:
            dr: Row offset component.
            dc: Column offset component.
            dir_offsets: Direction-offset mapping (see `DIR_OFFSETS`).

        Returns:
            int or None: Direction code whose offset is `(-dr, -dc)`,
            or `None` if no match is found.
        """
        for d, (d_col, d_row) in dir_offsets.items():
            if d_row == -dr and d_col == -dc:
                return d
        return None

    def flow_accumulation(
        self,
        flow_direction,
        weights: Dataset | None = None,
        dir_offsets: dict = None,
    ) -> Dataset:
        """Compute flow accumulation under the given routing scheme.

        Generalised dispatcher that delegates to `FlowDirection.accumulate(...)`
        and returns an `int32` cast for backwards compatibility with the
        previous D8-only output. For weighted or fractional accumulation, call
        `flow_direction.accumulate(weights)` directly to get the underlying
        `Accumulation` (float32) instead.

        Args:
            flow_direction: A `FlowDirection` (preferred — its routing tag
                dispatches the algorithm) or a bare `Dataset` (assumed to be
                a D8 direction-code raster for back-compat).
            weights: Optional per-cell weight raster aligned to the DEM.
            dir_offsets: Deprecated/ignored. Kept for signature compatibility.

        Returns:
            Dataset: `int32` accumulation raster. No-data cells retain
            `Dataset.default_no_data_value`. Cell values are the count of
            (or weighted sum over) strictly-upstream cells — the cell's own
            weight does not contribute to its own value.

        Warns:
            UserWarning: When `flow_direction.routing` produces fractional
                accumulations (`"dinf"`, `"mfd_quinn"`, `"mfd_holmgren"`).
                The legacy `int32` cast truncates these toward zero, which is
                almost always wrong; call `flow_direction.accumulate(...)`
                directly to get the fractional `Accumulation` raster.

        Examples:
            - Compute D8 cell-count accumulation on a small east-flowing DEM
              and inspect the outlet value:

                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.array(
                ...     [[9, 9, 9, 9], [9, 5, 4, 1], [9, 9, 9, 9]],
                ...     dtype=np.float32,
                ... )
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> dem = DEM(ds.raster)
                >>> fd = dem.flow_direction(method="d8")
                >>> acc = dem.flow_accumulation(fd)
                >>> int(acc.read_array().max()) > 0
                True

            - A D∞ `FlowDirection` triggers the truncation warning:

                >>> import warnings
                >>> import numpy as np
                >>> from pyramids.dataset import Dataset
                >>> from digitalrivers import DEM
                >>> z = np.array(
                ...     [[9, 9, 9, 9], [9, 5, 4, 1], [9, 9, 9, 9]],
                ...     dtype=np.float32,
                ... )
                >>> ds = Dataset.create_from_array(
                ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
                ...     epsg=4326, no_data_value=-9999.0,
                ... )
                >>> dem = DEM(ds.raster)
                >>> fd = dem.flow_direction(method="dinf")
                >>> with warnings.catch_warnings(record=True) as caught:
                ...     warnings.simplefilter("always")
                ...     _ = dem.flow_accumulation(fd)
                >>> any("int32" in str(w.message) for w in caught)
                True
        """
        del dir_offsets  # legacy positional kwarg, no longer used
        import warnings

        if not isinstance(flow_direction, FlowDirection):
            # Wrap a bare Dataset as D8 for back-compat callers.
            flow_direction = FlowDirection.from_dataset(flow_direction, routing="d8")

        if flow_direction.routing not in ("d8", "rho8"):
            warnings.warn(
                f"DEM.flow_accumulation casts to int32 and truncates fractional "
                f"accumulations for routing={flow_direction.routing!r}. Call "
                f"flow_direction.accumulate(...) directly to get the float32 "
                f"Accumulation raster.",
                UserWarning,
                stacklevel=2,
            )

        acc = flow_direction.accumulate(weights=weights)
        arr = acc.read_array().astype(np.int32, copy=False)
        # Restore the dataset no-data sentinel where the original DEM is no-data.
        elev = self.values
        nodata_mask = np.isnan(elev)
        arr[nodata_mask] = Dataset.default_no_data_value
        return Dataset.create_from_array(
            arr,
            geo=self.geotransform,
            epsg=self.epsg,
            no_data_value=self.default_no_data_value,
        )

    def convert_flow_direction_to_cell_indices(self) -> np.ndarray:
        """Convert D8 direction codes to downstream cell row/column indices.

        Computes the flow direction from the DEM and translates each
        direction code into the absolute row and column index of the
        downstream neighbour.

        Returns:
            np.ndarray: 3-D `float64` array of shape
                `(rows, columns, 2)`.  Layer 0 holds the downstream
                row index; layer 1 holds the downstream column index.
                Cells with no valid direction contain `np.nan`.
        """
        flow_direction = self.flow_direction()
        flow_dir = flow_direction.read_array(band=0).astype(np.float32)
        no_val = flow_direction.no_data_value[0]
        flow_dir[np.isclose(flow_dir, no_val, rtol=0.00001)] = np.nan

        rows, cols = flow_dir.shape
        valid = ~np.isnan(flow_dir)

        # Build lookup arrays from DIR_OFFSETS (index 0 = first tuple
        # element, index 1 = second tuple element, matching the
        # original loop: cell[i,j,0] = i + offset[0]).
        offset_0 = np.array([DIR_OFFSETS[d][0] for d in range(8)], dtype=np.float64)
        offset_1 = np.array([DIR_OFFSETS[d][1] for d in range(8)], dtype=np.float64)

        flow_direction_cell = np.full((rows, cols, 2), np.nan, dtype=np.float64)

        dir_idx = flow_dir[valid].astype(int)
        row_idx, col_idx = np.where(valid)
        flow_direction_cell[valid, 0] = row_idx + offset_0[dir_idx]
        flow_direction_cell[valid, 1] = col_idx + offset_1[dir_idx]

        return flow_direction_cell


    @staticmethod
    def delete_basins(basins: Dataset, path: str):
        """Keep only the basin with the lowest ID and discard the rest.

        Reads a basin-ID raster produced during catchment delineation,
        replaces every cell that does not belong to the lowest basin ID
        with the no-data value, and writes the result to *path*.

        Args:
            basins: Dataset whose cell values are basin IDs (integers).
                The lowest unique basin ID (excluding no-data) is
                retained.
            path: Output GeoTIFF file path (must end with `".tif"`).

        Raises:
            TypeError: If *path* is not a string.
        """
        if not isinstance(path, str):
            raise TypeError(f"path: {path} input should be string type")

        basins_a = basins.read_array()
        no_val = np.float32(basins.no_data_value[0])

        valid_mask = basins_a != no_val
        unique_basins = np.unique(basins_a[valid_mask]).astype(int)

        if len(unique_basins) > 0:
            keep = unique_basins[0]
            basins_a[valid_mask & (basins_a != keep)] = no_val

        Dataset.dataset_like(basins, basins_a, path)

values property #

Elevation array with no-data cells replaced by np.nan.

Reads band 0 as float32 and masks every cell whose value is close to the raster's no-data value (relative tolerance 1e-5).

Returns:

Type Description

np.ndarray: 2-D float32 array of shape (rows, columns).

fill_depressions(method='priority_flood', epsilon=0.0, inplace=False) #

Fill closed depressions in the DEM.

Three algorithms are available via the method argument:

  • "priority_flood" (default) — Barnes, Lehman & Mulla (2014) Priority-Flood with the two-queue plateau optimisation. With epsilon == 0 it produces flat fills; with epsilon > 0 it produces a strictly monotonic surface (every cell has at least one strictly lower neighbour along the flood path) at the cost of a small elevation inflation proportional to plateau width.
  • "wang_liu" — Wang & Liu (2006). Flat fill, no epsilon. Equivalent in output to priority_flood with epsilon == 0; kept as a named alternative for callers who plan to resolve flats explicitly afterwards (P4).
  • "planchon_darboux" — Planchon & Darboux (2002). Iterative directional-sweep algorithm. Slower than Priority-Flood on large DEMs; kept as a low-relief reference. Requires epsilon > 0.

No-data handling is uniform across methods: cells flagged no-data act as outlets (they cannot be filled, and data cells adjacent to them are seeded as drainage sources alongside the true raster boundary).

Precision note. Priority-flood / planchon-darboux compute the cumulative lift in float64 but the output is cast back to the input dtype. For float32 DEMs with epsilon in the 0.1-class on wide plateaus, the accumulated lift can approach float32's relative precision near the spill elevation and very long plateaus may underflow to identical filled values. Prefer float64 inputs when running with epsilon > 0 and large depressions; wang_liu / epsilon=0 are immune.

Parameters:

Name Type Description Default
method str

One of "priority_flood", "wang_liu", "planchon_darboux".

'priority_flood'
epsilon float

Per-step elevation lift inside depressions. 0.0 (default for priority_flood) returns a non-strictly-decreasing surface — flats remain flat. Positive values guarantee a unique downhill path at the cost of slight elevation inflation. planchon_darboux requires epsilon > 0.

0.0
inplace bool

If True the current instance is updated in place and None is returned. If False (default) a new DEM is returned.

False

Returns:

Type Description
DEM | None

DEM | None: A new DEM containing the filled elevation, or None when

DEM | None

inplace is True.

Raises:

Type Description
ValueError

If method is unknown, or planchon_darboux is requested with epsilon <= 0.

Source code in src/digitalrivers/dem.py
def fill_depressions(
    self,
    method: str = "priority_flood",
    epsilon: float = 0.0,
    inplace: bool = False,
) -> DEM | None:
    """Fill closed depressions in the DEM.

    Three algorithms are available via the `method` argument:

    * `"priority_flood"` (default) — Barnes, Lehman & Mulla (2014) Priority-Flood
      with the two-queue plateau optimisation. With `epsilon == 0` it produces flat
      fills; with `epsilon > 0` it produces a strictly monotonic surface (every cell
      has at least one strictly lower neighbour along the flood path) at the cost of
      a small elevation inflation proportional to plateau width.
    * `"wang_liu"` — Wang & Liu (2006). Flat fill, no epsilon. Equivalent in output
      to `priority_flood` with `epsilon == 0`; kept as a named alternative for
      callers who plan to resolve flats explicitly afterwards (P4).
    * `"planchon_darboux"` — Planchon & Darboux (2002). Iterative directional-sweep
      algorithm. Slower than Priority-Flood on large DEMs; kept as a low-relief
      reference. Requires `epsilon > 0`.

    No-data handling is uniform across methods: cells flagged no-data act as outlets
    (they cannot be filled, and data cells adjacent to them are seeded as drainage
    sources alongside the true raster boundary).

    **Precision note.** Priority-flood / planchon-darboux compute the cumulative lift
    in float64 but the output is cast back to the input dtype. For `float32` DEMs
    with `epsilon` in the `0.1`-class on wide plateaus, the accumulated lift can
    approach float32's relative precision near the spill elevation and very long
    plateaus may underflow to identical filled values. Prefer `float64` inputs
    when running with `epsilon > 0` and large depressions; `wang_liu` /
    `epsilon=0` are immune.

    Args:
        method: One of `"priority_flood"`, `"wang_liu"`, `"planchon_darboux"`.
        epsilon: Per-step elevation lift inside depressions. `0.0` (default for
            `priority_flood`) returns a non-strictly-decreasing surface — flats
            remain flat. Positive values guarantee a unique downhill path at the
            cost of slight elevation inflation. `planchon_darboux` requires
            `epsilon > 0`.
        inplace: If `True` the current instance is updated in place and `None`
            is returned. If `False` (default) a new `DEM` is returned.

    Returns:
        DEM | None: A new `DEM` containing the filled elevation, or `None` when
        `inplace` is `True`.

    Raises:
        ValueError: If `method` is unknown, or `planchon_darboux` is requested
            with `epsilon <= 0`.
    """
    elev = self.values
    nodata_mask = np.isnan(elev)
    z_fill = _fill_depressions_array(
        elev.astype(np.float64, copy=False),
        nodata_mask=nodata_mask,
        method=method,
        epsilon=epsilon,
    )
    # Restore the original raster's no-data sentinel (the array carries NaN; the
    # GeoTIFF needs the numeric sentinel).
    no_val = self.no_data_value[0]
    z_fill[nodata_mask] = no_val

    # Build a plain Dataset (cls=Dataset so we don't get a DEM via cls(...)), then
    # wrap with the typed DEM. This mirrors the pattern used in flow_direction().
    plain_ds = Dataset.dataset_like(self, z_fill.astype(elev.dtype, copy=False))
    if inplace:
        self._update_inplace(plain_ds.raster)
        return None
    return DEM(plain_ds.raster)

breach_depressions(method='least_cost', max_depth=None, max_length=None, fill_remaining=True, inplace=False) #

Breach depressions in the DEM (Lindsay 2016 family).

Breaching is the structural alternative to filling: instead of raising the pit floor, it cuts a channel through the lowest barrier between the pit and an outlet. On LiDAR DEMs this is usually more realistic — most internal pits are data artefacts and the natural drainage path is preserved by cutting the artefact away rather than inflating the surrounding terrain.

Three methods are available via the method argument:

  • "single_cell" — cheap O(n) preprocessing pass that resolves isolated 1-cell pits by lowering an intermediate first-order neighbour to the midpoint of the pit and a lower second-order cell. Does nothing if no such configuration exists.
  • "least_cost" (default) — Lindsay 2016 Dijkstra-from-each-pit. Carves a strictly monotonic channel from the pit to the nearest outlet. Optional max_depth and max_length constraints abort the breach for any pit whose channel would exceed them; aborted pits are left unresolved.
  • "hybrid" — try least_cost first; pits that fail their constraint fall back to the Priority-Flood depression fill (P2). The breach phase has already lowered parts of the DEM where partial breaching occurred, so the fill operates on a modified surface and produces less overall lift than fill-only.

No-data cells act as free outlets — any Dijkstra path that reaches a no-data cell terminates the search.

Parameters:

Name Type Description Default
method str

One of "single_cell", "least_cost", "hybrid".

'least_cost'
max_depth float | None

Maximum cumulative |Δz| for a single breach path. None disables the constraint.

None
max_length int | None

Maximum path length in cells. None disables.

None
fill_remaining bool

Only meaningful when method="hybrid". If True (default), unresolved pits are passed to Priority-Flood with epsilon=0. If False, they are left as pits in the output.

True
inplace bool

If True the current instance is updated in place and None is returned. If False (default) a new DEM is returned.

False

Returns:

Type Description
DEM | None

DEM | None: A new DEM containing the breached elevation, or None when

DEM | None

inplace is True.

Raises:

Type Description
ValueError

If method is unknown.

Source code in src/digitalrivers/dem.py
def breach_depressions(
    self,
    method: str = "least_cost",
    max_depth: float | None = None,
    max_length: int | None = None,
    fill_remaining: bool = True,
    inplace: bool = False,
) -> DEM | None:
    """Breach depressions in the DEM (Lindsay 2016 family).

    Breaching is the structural alternative to filling: instead of raising the pit
    floor, it cuts a channel through the lowest barrier between the pit and an
    outlet. On LiDAR DEMs this is usually more realistic — most internal pits are
    data artefacts and the natural drainage path is preserved by cutting the artefact
    away rather than inflating the surrounding terrain.

    Three methods are available via the `method` argument:

    * `"single_cell"` — cheap O(n) preprocessing pass that resolves isolated 1-cell
      pits by lowering an intermediate first-order neighbour to the midpoint of the
      pit and a lower second-order cell. Does nothing if no such configuration exists.
    * `"least_cost"` (default) — Lindsay 2016 Dijkstra-from-each-pit. Carves a
      strictly monotonic channel from the pit to the nearest outlet. Optional
      `max_depth` and `max_length` constraints abort the breach for any pit whose
      channel would exceed them; aborted pits are left unresolved.
    * `"hybrid"` — try `least_cost` first; pits that fail their constraint fall
      back to the Priority-Flood depression fill (P2). The breach phase has already
      lowered parts of the DEM where partial breaching occurred, so the fill operates
      on a modified surface and produces less overall lift than fill-only.

    No-data cells act as free outlets — any Dijkstra path that reaches a no-data cell
    terminates the search.

    Args:
        method: One of `"single_cell"`, `"least_cost"`, `"hybrid"`.
        max_depth: Maximum cumulative `|Δz|` for a single breach path. `None`
            disables the constraint.
        max_length: Maximum path length in cells. `None` disables.
        fill_remaining: Only meaningful when `method="hybrid"`. If `True`
            (default), unresolved pits are passed to Priority-Flood with
            `epsilon=0`. If `False`, they are left as pits in the output.
        inplace: If `True` the current instance is updated in place and `None` is
            returned. If `False` (default) a new `DEM` is returned.

    Returns:
        DEM | None: A new `DEM` containing the breached elevation, or `None` when
        `inplace` is `True`.

    Raises:
        ValueError: If `method` is unknown.
    """
    elev = self.values
    nodata_mask = np.isnan(elev)
    z_out = _breach_depressions_array(
        elev.astype(np.float64, copy=False),
        nodata_mask=nodata_mask,
        method=method,
        max_depth=max_depth,
        max_length=max_length,
        fill_remaining=fill_remaining,
    )
    no_val = self.no_data_value[0]
    z_out[np.isnan(z_out)] = no_val
    plain_ds = Dataset.dataset_like(self, z_out.astype(elev.dtype, copy=False))
    if inplace:
        self._update_inplace(plain_ds.raster)
        return None
    return DEM(plain_ds.raster)

resolve_flats(max_iter=1000, epsilon=1e-05, connectivity=8, inplace=False) #

Impose a deterministic gradient on every flat plateau in the DEM.

After fill_depressions(method="wang_liu") (or "priority_flood" with epsilon=0), every closed depression is filled to its spill elevation — but the interior of each filled depression is a flat plateau with no defined steepest descent, so D8 flow direction over the result has NO_FLOW cells across every plateau. resolve_flats nudges those cells so each has a unique downhill neighbour: combined Garbrecht & Martz (1997) gradient — drain towards the nearest outlet (LEC) with a tiebreak that drains away from the nearest rim (HEC). The towards-lower gradient is weighted 2x so it dominates and the away-from-higher gradient acts as a deterministic tiebreaker.

Plateaus without a low-edge cell (closed depressions that survived the fill — they should not exist if you ran fill_depressions first) are left untouched.

Parameters:

Name Type Description Default
max_iter int

Safety cap on BFS levels per plateau. Real plateaus rarely exceed max(rows, cols); the default 1000 is essentially unbounded.

1000
epsilon float

Per-BFS-step elevation lift. Total lift over a plateau is at most (2 * max_high_dist + max_low_dist) * epsilon; choose small enough that this stays well below the minimum elevation step between adjacent non-plateau cells. Default 1e-5 is safe for ~1000-cell-wide plateaus.

1e-05
connectivity int

4 or 8. Controls plateau-labelling and BFS step direction; LEC/HEC classification always uses 8-connectivity (Garbrecht-Martz convention). Default is 8.

8
inplace bool

If True the current instance is updated in place and None is returned. If False (default) a new DEM is returned.

False

Returns:

Type Description
DEM | None

DEM | None: A new DEM with flat plateaus resolved, or None when

DEM | None

inplace is True.

Raises:

Type Description
ValueError

If connectivity is not 4 or 8.

Source code in src/digitalrivers/dem.py
def resolve_flats(
    self,
    max_iter: int = 1000,
    epsilon: float = 1e-5,
    connectivity: int = 8,
    inplace: bool = False,
) -> DEM | None:
    """Impose a deterministic gradient on every flat plateau in the DEM.

    After `fill_depressions(method="wang_liu")` (or `"priority_flood"` with
    `epsilon=0`), every closed depression is filled to its spill elevation — but the
    interior of each filled depression is a flat plateau with no defined steepest
    descent, so D8 flow direction over the result has `NO_FLOW` cells across every
    plateau. `resolve_flats` nudges those cells so each has a unique downhill
    neighbour: combined Garbrecht & Martz (1997) gradient — drain *towards* the
    nearest outlet (LEC) with a tiebreak that drains *away from* the nearest rim
    (HEC). The towards-lower gradient is weighted `2x` so it dominates and the
    away-from-higher gradient acts as a deterministic tiebreaker.

    Plateaus without a low-edge cell (closed depressions that survived the fill — they
    should not exist if you ran `fill_depressions` first) are left untouched.

    Args:
        max_iter: Safety cap on BFS levels per plateau. Real plateaus rarely exceed
            `max(rows, cols)`; the default `1000` is essentially unbounded.
        epsilon: Per-BFS-step elevation lift. Total lift over a plateau is at most
            `(2 * max_high_dist + max_low_dist) * epsilon`; choose small enough
            that this stays well below the minimum elevation step between adjacent
            non-plateau cells. Default `1e-5` is safe for ~1000-cell-wide plateaus.
        connectivity: 4 or 8. Controls plateau-labelling and BFS step direction;
            LEC/HEC classification always uses 8-connectivity (Garbrecht-Martz
            convention). Default is 8.
        inplace: If `True` the current instance is updated in place and `None` is
            returned. If `False` (default) a new `DEM` is returned.

    Returns:
        DEM | None: A new `DEM` with flat plateaus resolved, or `None` when
        `inplace` is `True`.

    Raises:
        ValueError: If `connectivity` is not 4 or 8.
    """
    elev = self.values
    nodata_mask = np.isnan(elev)
    z_out = _resolve_flats_array(
        elev.astype(np.float64, copy=False),
        nodata_mask=nodata_mask,
        epsilon=epsilon,
        connectivity=connectivity,
        max_iter=max_iter,
    )
    no_val = self.no_data_value[0]
    z_out[np.isnan(z_out)] = no_val
    plain_ds = Dataset.dataset_like(self, z_out.astype(elev.dtype, copy=False))
    if inplace:
        self._update_inplace(plain_ds.raster)
        return None
    return DEM(plain_ds.raster)

hand(streams, flow_direction=None, *, method='d8') #

Compute Height Above Nearest Drainage (Rennó 2008 / Nobre 2011).

Two methods are supported:

  • "d8" (default) — follows the D8 / Rho8 flow-direction raster downstream from every cell until it reaches a stream cell, and assigns elev[cell] - elev[stream_cell]. Orphans / sinks / no-data cells whose flow path never reaches a stream are NaN.
  • "euclidean" — for every cell, the nearest stream cell in 2-D space (Euclidean distance) is used as the reference. Cheaper than D8-HAND because there is no path tracing, but it does the wrong thing across ridges (a cell can be 2-D-closer to a stream in a different basin). Requires scipy.ndimage.

Parameters:

Name Type Description Default
streams

StreamRaster aligned to this DEM. Only the underlying stream mask is read.

required
flow_direction

Single-direction FlowDirection (d8 / rho8) aligned to this DEM. Required for method="d8"; ignored for method="euclidean".

None
method str

"d8" (default) or "euclidean".

'd8'

Returns:

Type Description
Dataset

Dataset containing the float32 HAND raster. No-data cells use

Dataset

this DEM's no-data sentinel.

Raises:

Type Description
ValueError

If method is unknown, shapes do not match, or flow_direction is missing / multi-direction for the D8 method.

Source code in src/digitalrivers/dem.py
def hand(
    self,
    streams,
    flow_direction=None,
    *,
    method: str = "d8",
) -> Dataset:
    """Compute Height Above Nearest Drainage (Rennó 2008 / Nobre 2011).

    Two methods are supported:

    * **`"d8"` (default)** — follows the D8 / Rho8 flow-direction raster
      downstream from every cell until it reaches a stream cell, and
      assigns `elev[cell] - elev[stream_cell]`. Orphans / sinks / no-data
      cells whose flow path never reaches a stream are NaN.
    * **`"euclidean"`** — for every cell, the nearest stream cell in 2-D
      space (Euclidean distance) is used as the reference. Cheaper than
      D8-HAND because there is no path tracing, but it does the wrong
      thing across ridges (a cell can be 2-D-closer to a stream in a
      different basin). Requires `scipy.ndimage`.

    Args:
        streams: `StreamRaster` aligned to this DEM. Only the underlying
            stream mask is read.
        flow_direction: Single-direction `FlowDirection` (`d8` /
            `rho8`) aligned to this DEM. Required for `method="d8"`;
            ignored for `method="euclidean"`.
        method: `"d8"` (default) or `"euclidean"`.

    Returns:
        `Dataset` containing the float32 HAND raster. No-data cells use
        this DEM's no-data sentinel.

    Raises:
        ValueError: If `method` is unknown, shapes do not match, or
            `flow_direction` is missing / multi-direction for the D8
            method.
    """
    from digitalrivers.stream_raster import StreamRaster

    if method not in ("d8", "euclidean"):
        raise ValueError(
            f"method must be 'd8' or 'euclidean'; got {method!r}"
        )
    if not isinstance(streams, StreamRaster):
        raise ValueError("streams must be a StreamRaster instance")

    if method == "d8":
        return self._hand_d8(streams, flow_direction)
    return self._hand_euclidean(streams)

full_hydro_pipeline(*, fill_method='priority_flood', flow_method='d8', stream_threshold_cells=None) #

Composite: fill → flow_direction → accumulate (→ optional streams).

Convenience entry point that chains the four most common steps of a DEM-hydrology pre-processing pipeline. Equivalent to:

filled = dem.fill_depressions(method=fill_method)
fdir = filled.flow_direction(method=flow_method)
acc = fdir.accumulate()
streams = acc.streams(threshold=stream_threshold_cells)  # if provided

Parameters:

Name Type Description Default
fill_method str

Argument forwarded to fill_depressions. Defaults to "priority_flood".

'priority_flood'
flow_method str

Argument forwarded to flow_direction. Defaults to "d8".

'd8'
stream_threshold_cells int | None

Optional accumulation threshold (in cells). When supplied, a StreamRaster is also returned in the result dict under the "streams" key. When None, the streams step is skipped.

None

Returns:

Type Description
dict

dict with keys "filled_dem" (DEM), "flow_direction"

dict

(FlowDirection), and "accumulation" (Accumulation); plus an

dict

optional "streams" (StreamRaster) when

dict

stream_threshold_cells is supplied.

Source code in src/digitalrivers/dem.py
def full_hydro_pipeline(
    self,
    *,
    fill_method: str = "priority_flood",
    flow_method: str = "d8",
    stream_threshold_cells: int | None = None,
) -> dict:
    """Composite: fill → flow_direction → accumulate (→ optional streams).

    Convenience entry point that chains the four most common steps of a
    DEM-hydrology pre-processing pipeline. Equivalent to:

    ```python
    filled = dem.fill_depressions(method=fill_method)
    fdir = filled.flow_direction(method=flow_method)
    acc = fdir.accumulate()
    streams = acc.streams(threshold=stream_threshold_cells)  # if provided
    ```

    Args:
        fill_method: Argument forwarded to `fill_depressions`. Defaults
            to `"priority_flood"`.
        flow_method: Argument forwarded to `flow_direction`. Defaults to
            `"d8"`.
        stream_threshold_cells: Optional accumulation threshold (in
            cells). When supplied, a `StreamRaster` is also returned in
            the result dict under the `"streams"` key. When None, the
            streams step is skipped.

    Returns:
        `dict` with keys `"filled_dem"` (DEM), `"flow_direction"`
        (FlowDirection), and `"accumulation"` (Accumulation); plus an
        optional `"streams"` (StreamRaster) when
        `stream_threshold_cells` is supplied.
    """
    filled = self.fill_depressions(method=fill_method)
    fdir = filled.flow_direction(method=flow_method)
    acc = fdir.accumulate()
    out: dict = {
        "filled_dem": filled,
        "flow_direction": fdir,
        "accumulation": acc,
    }
    if stream_threshold_cells is not None:
        out["streams"] = acc.streams(threshold=stream_threshold_cells)
    return out

stochastic_depressions(sigma, n_runs=100, *, seed=None, method='priority_flood') #

Per-cell depression-occurrence probability via Monte-Carlo.

Adds Gaussian noise (zero-mean, supplied sigma) to the DEM, runs depression detection on each noisy realisation, and aggregates the per-cell probability across n_runs realisations. Cells with high probability are robust depressions; low probability cells are likely noise artefacts of a noisy DEM.

Parameters:

Name Type Description Default
sigma float

Standard deviation of the Gaussian noise in DEM elevation units. Must be non-negative. A reasonable choice is the DEM's stated vertical error.

required
n_runs int

Number of Monte-Carlo realisations. Must be positive. Defaults to 100.

100
seed int | None

Optional seed for the random number generator. Pass an integer for reproducible results.

None
method str

Fill-depressions algorithm passed through to fill_depressions for each realisation. Defaults to "priority_flood".

'priority_flood'

Returns:

Type Description
Dataset

Dataset of float32 occurrence probabilities in [0.0, 1.0],

Dataset

aligned to this DEM. No-data sentinel -1.0.

Raises:

Type Description
ValueError

If sigma is negative or n_runs is not positive.

Source code in src/digitalrivers/dem.py
def stochastic_depressions(
    self,
    sigma: float,
    n_runs: int = 100,
    *,
    seed: int | None = None,
    method: str = "priority_flood",
) -> Dataset:
    """Per-cell depression-occurrence probability via Monte-Carlo.

    Adds Gaussian noise (zero-mean, supplied `sigma`) to the DEM, runs
    depression detection on each noisy realisation, and aggregates the
    per-cell probability across `n_runs` realisations. Cells with high
    probability are robust depressions; low probability cells are likely
    noise artefacts of a noisy DEM.

    Args:
        sigma: Standard deviation of the Gaussian noise in DEM elevation
            units. Must be non-negative. A reasonable choice is the DEM's
            stated vertical error.
        n_runs: Number of Monte-Carlo realisations. Must be positive.
            Defaults to 100.
        seed: Optional seed for the random number generator. Pass an
            integer for reproducible results.
        method: Fill-depressions algorithm passed through to
            `fill_depressions` for each realisation. Defaults to
            `"priority_flood"`.

    Returns:
        `Dataset` of float32 occurrence probabilities in `[0.0, 1.0]`,
        aligned to this DEM. No-data sentinel `-1.0`.

    Raises:
        ValueError: If `sigma` is negative or `n_runs` is not positive.
    """
    if sigma < 0:
        raise ValueError(f"sigma must be non-negative; got {sigma!r}")
    if n_runs <= 0:
        raise ValueError(f"n_runs must be positive; got {n_runs!r}")

    from digitalrivers._conditioning.pitremoval import (
        fill_depressions as _fill_depressions_kernel,
    )

    rng = np.random.default_rng(seed)
    elev = self.values
    # Pre-compute the no-data mask once — it doesn't change between
    # realisations since the noise only perturbs valid cells.
    nodata_mask = np.isnan(elev)
    prob = np.zeros(elev.shape, dtype=np.float32)
    for _ in range(int(n_runs)):
        noise = rng.normal(0.0, sigma, size=elev.shape).astype(
            elev.dtype, copy=False
        )
        noisy = elev + noise
        # Call the kernel directly — no GDAL Dataset wrapping inside the
        # Monte-Carlo loop. The kernel accepts a plain ndarray and an
        # optional nodata_mask.
        filled = _fill_depressions_kernel(
            noisy, nodata_mask=nodata_mask, method=method,
        )
        depr = (filled - noisy) > 0
        prob += depr.astype(np.float32)
    prob /= float(n_runs)
    return Dataset.create_from_array(
        prob,
        geo=self.geotransform,
        epsg=self.epsg,
        no_data_value=-1.0,
    )

burn_streams(streams, method='fill_burn', *, sharp=10.0, smooth=2.0, buffer_cells=5, constant_drop=1.0, max_breach_depth=None, max_breach_length=None, inplace=False) #

Condition the DEM by burning a vector stream network into it.

Three methods are specified by P20; this implementation ships "fill_burn" (Lindsay 2018 — used by WhiteboxTools' FillBurn) as the default. "agree" (Hellweger 1997) and "topological_breach" (Lindsay 2016) raise NotImplementedError.

Fill-burn algorithm:

  1. Rasterise every LineString in streams onto a stream mask.
  2. Lower every stream cell's elevation by constant_drop.
  3. Run fill_depressions(method="priority_flood") so the surrounding cells drain naturally into the channel.

Parameters:

Name Type Description Default
streams

GeoDataFrame of LineString geometries.

required
method str

"fill_burn" (default); "agree" and "topological_breach" raise NotImplementedError.

'fill_burn'
sharp / smooth / buffer_cells

AGREE parameters (unused for fill_burn).

required
constant_drop float

Elevation drop applied to every stream cell in fill_burn (default 1.0 map unit).

1.0
max_breach_depth / max_breach_length

topological_breach parameters (unused for fill_burn).

required
inplace bool

If True, update the instance; else return a new DEM.

False

Returns:

Type Description
DEM | None

DEM | None: New DEM with the conditioned surface, or None when

DEM | None

inplace=True.

Examples:

  • Fill-burn lowers the stream-row of a flat DEM:

    import numpy as np import geopandas as gpd from shapely.geometry import LineString from pyramids.dataset import Dataset from digitalrivers import DEM z = np.full((5, 5), 10.0, dtype=np.float32) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) dem = DEM(ds.raster)

    Horizontal stream down row 2 (y = -2.5).#

    line = LineString([(0.5, -2.5), (4.5, -2.5)]) layer = gpd.GeoDataFrame(geometry=[line], crs=4326) burnt = dem.burn_streams(layer, constant_drop=2.0) bool(float(burnt.values[2, 2]) < float(burnt.values[0, 0])) True

Source code in src/digitalrivers/dem.py
def burn_streams(
    self,
    streams,
    method: str = "fill_burn",
    *,
    sharp: float = 10.0,
    smooth: float = 2.0,
    buffer_cells: int = 5,
    constant_drop: float = 1.0,
    max_breach_depth: float | None = None,
    max_breach_length: int | None = None,
    inplace: bool = False,
) -> DEM | None:
    """Condition the DEM by burning a vector stream network into it.

    Three methods are specified by P20; this implementation ships
    `"fill_burn"` (Lindsay 2018 — used by WhiteboxTools' FillBurn) as
    the default. `"agree"` (Hellweger 1997) and
    `"topological_breach"` (Lindsay 2016) raise `NotImplementedError`.

    Fill-burn algorithm:

    1. Rasterise every LineString in `streams` onto a stream mask.
    2. Lower every stream cell's elevation by `constant_drop`.
    3. Run `fill_depressions(method="priority_flood")` so the
       surrounding cells drain naturally into the channel.

    Args:
        streams: `GeoDataFrame` of LineString geometries.
        method: `"fill_burn"` (default); `"agree"` and
            `"topological_breach"` raise `NotImplementedError`.
        sharp / smooth / buffer_cells: AGREE parameters (unused for
            fill_burn).
        constant_drop: Elevation drop applied to every stream cell
            in fill_burn (default 1.0 map unit).
        max_breach_depth / max_breach_length: topological_breach
            parameters (unused for fill_burn).
        inplace: If True, update the instance; else return a new DEM.

    Returns:
        DEM | None: New DEM with the conditioned surface, or None when
        `inplace=True`.

    Examples:
        - Fill-burn lowers the stream-row of a flat DEM:

            >>> import numpy as np
            >>> import geopandas as gpd
            >>> from shapely.geometry import LineString
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.full((5, 5), 10.0, dtype=np.float32)
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> dem = DEM(ds.raster)
            >>> # Horizontal stream down row 2 (y = -2.5).
            >>> line = LineString([(0.5, -2.5), (4.5, -2.5)])
            >>> layer = gpd.GeoDataFrame(geometry=[line], crs=4326)
            >>> burnt = dem.burn_streams(layer, constant_drop=2.0)
            >>> bool(float(burnt.values[2, 2]) < float(burnt.values[0, 0]))
            True
    """
    if method == "agree":
        # Rasterise the stream lines, then apply a gradient buffer:
        # stream cells drop by `sharp`, buffer cells drop linearly from
        # `sharp` at the stream to `0` at buffer_cells radius. The
        # cumulative drop is then offset by `smooth` so the buffer
        # perimeter sits `smooth` units lower than the original DEM.
        elev = self.values
        rows, cols = elev.shape
        gt = self.geotransform
        stream_mask = np.zeros((rows, cols), dtype=bool)
        streams = _reproject_if_needed(streams, self.epsg)
        for geom in streams.geometry:
            if geom is None or geom.is_empty:
                continue
            if geom.geom_type == "MultiLineString":
                for sub in geom.geoms:
                    self._rasterise_line(sub, stream_mask, gt)
            else:
                self._rasterise_line(geom, stream_mask, gt)
        # Distance-from-stream within the buffer (cell-step BFS).
        dist = np.full((rows, cols), np.inf, dtype=np.float64)
        dist[stream_mask] = 0.0
        from collections import deque
        frontier: deque[tuple[int, int, int]] = deque(
            (int(r), int(c), 0) for r, c in zip(*np.where(stream_mask))
        )
        while frontier:
            r, c, d = frontier.popleft()
            if d >= buffer_cells:
                continue
            for dr in (-1, 0, 1):
                for dc in (-1, 0, 1):
                    if dr == 0 and dc == 0:
                        continue
                    nr = r + dr
                    nc = c + dc
                    if not (0 <= nr < rows and 0 <= nc < cols):
                        continue
                    if dist[nr, nc] > d + 1:
                        dist[nr, nc] = d + 1
                        frontier.append((nr, nc, d + 1))
        z = elev.astype(np.float64, copy=True)
        within_buffer = dist <= buffer_cells
        # Linear gradient: sharp at stream (dist=0) → 0 at perimeter.
        drop = np.where(
            within_buffer,
            sharp * (1.0 - dist / max(buffer_cells, 1)) + smooth,
            0.0,
        )
        z = z - drop
        no_val = self.no_data_value[0]
        z[np.isnan(z)] = no_val
        plain_ds = Dataset.dataset_like(
            self, z.astype(elev.dtype, copy=False)
        )
        if inplace:
            self._update_inplace(plain_ds.raster)
            return None
        return DEM(plain_ds.raster)

    if method == "topological_breach":
        # Lindsay 2016: rasterise the stream network onto the DEM
        # (like fill_burn but without the final priority-flood), then
        # invoke the Phase 1 least-cost breach engine so every internal
        # pit Dijkstras outward toward a stream cell. The burned stream
        # cells sit max_breach_depth-or-equivalent below their
        # surroundings, so the breach paths follow the vector topology
        # by construction.
        elev = self.values
        rows, cols = elev.shape
        gt = self.geotransform
        stream_mask = np.zeros((rows, cols), dtype=bool)
        streams = _reproject_if_needed(streams, self.epsg)
        for geom in streams.geometry:
            if geom is None or geom.is_empty:
                continue
            if geom.geom_type == "MultiLineString":
                for sub in geom.geoms:
                    self._rasterise_line(sub, stream_mask, gt)
            else:
                self._rasterise_line(geom, stream_mask, gt)
        z = elev.astype(np.float64, copy=True)
        z[stream_mask] = z[stream_mask] - constant_drop
        nodata_mask = np.isnan(z)
        z = _breach_depressions_array(
            z, nodata_mask=nodata_mask, method="hybrid",
            max_depth=max_breach_depth,
            max_length=max_breach_length,
            fill_remaining=True,
        )
        no_val = self.no_data_value[0]
        z[np.isnan(z)] = no_val
        plain_ds = Dataset.dataset_like(
            self, z.astype(elev.dtype, copy=False)
        )
        if inplace:
            self._update_inplace(plain_ds.raster)
            return None
        return DEM(plain_ds.raster)

    if method != "fill_burn":
        raise NotImplementedError(
            f"method={method!r} not yet implemented (supported: "
            "'fill_burn', 'agree', 'topological_breach')"
        )

    elev = self.values
    rows, cols = elev.shape
    gt = self.geotransform
    x0, dx, _, y0, _, dy = gt
    stream_mask = np.zeros((rows, cols), dtype=bool)

    streams = _reproject_if_needed(streams, self.epsg)

    for geom in streams.geometry:
        if geom is None or geom.is_empty:
            continue
        if geom.geom_type == "MultiLineString":
            for sub in geom.geoms:
                self._rasterise_line(sub, stream_mask, gt)
        else:
            # Delegate to the shared 2× oversampled floor-rasteriser so
            # burn_streams, enforce_breaklines, and enforce_culverts all
            # snap line samples to cells identically (I1 fix).
            self._rasterise_line(geom, stream_mask, gt)

    z = elev.astype(np.float64, copy=True)
    z[stream_mask] = z[stream_mask] - constant_drop
    nodata_mask = np.isnan(z)
    z = _fill_depressions_array(
        z, nodata_mask=nodata_mask, method="priority_flood", epsilon=0.0,
    )
    no_val = self.no_data_value[0]
    z[np.isnan(z)] = no_val
    plain_ds = Dataset.dataset_like(self, z.astype(elev.dtype, copy=False))
    if inplace:
        self._update_inplace(plain_ds.raster)
        return None
    return DEM(plain_ds.raster)

enforce_culverts(roads, streams, culvert_drop=0.5, inplace=False) #

Lower DEM cells at every stream-road intersection by culvert_drop so subsequent flow routing crosses roads instead of dead-ending against them. Simplified version of WhiteboxTools' BurnStreamsAtRoads.

Parameters:

Name Type Description Default
roads

GeoDataFrame of LineString road geometries.

required
streams

GeoDataFrame of LineString stream geometries.

required
culvert_drop float

Elevation drop applied to each intersection cell.

0.5
inplace bool

If True, update the instance; else return a new DEM.

False

Returns:

Type Description
DEM | None

DEM | None: New DEM with culverts enforced, or None when

DEM | None

inplace=True.

Source code in src/digitalrivers/dem.py
def enforce_culverts(
    self,
    roads,
    streams,
    culvert_drop: float = 0.5,
    inplace: bool = False,
) -> DEM | None:
    """Lower DEM cells at every stream-road intersection by
    `culvert_drop` so subsequent flow routing crosses roads instead of
    dead-ending against them. Simplified version of WhiteboxTools'
    `BurnStreamsAtRoads`.

    Args:
        roads: `GeoDataFrame` of LineString road geometries.
        streams: `GeoDataFrame` of LineString stream geometries.
        culvert_drop: Elevation drop applied to each intersection cell.
        inplace: If True, update the instance; else return a new DEM.

    Returns:
        DEM | None: New DEM with culverts enforced, or None when
        `inplace=True`.
    """
    elev = self.values
    rows, cols = elev.shape
    gt = self.geotransform
    road_mask = np.zeros((rows, cols), dtype=bool)
    stream_mask = np.zeros((rows, cols), dtype=bool)
    for layer, mask in ((roads, road_mask), (streams, stream_mask)):
        layer = _reproject_if_needed(layer, self.epsg)
        for geom in layer.geometry:
            if geom is None or geom.is_empty:
                continue
            if geom.geom_type == "MultiLineString":
                for sub in geom.geoms:
                    self._rasterise_line(sub, mask, gt)
            else:
                self._rasterise_line(geom, mask, gt)

    crossings = road_mask & stream_mask
    z = elev.astype(np.float64, copy=True)
    z[crossings] = z[crossings] - culvert_drop
    no_val = self.no_data_value[0]
    z[np.isnan(z)] = no_val
    plain_ds = Dataset.dataset_like(self, z.astype(elev.dtype, copy=False))
    if inplace:
        self._update_inplace(plain_ds.raster)
        return None
    return DEM(plain_ds.raster)

hydroflatten(water_polygons, method='min', inplace=False) #

Flatten lake / pond surfaces to a single elevation per polygon.

For each input polygon, sample the DEM cells the polygon covers and assign every cell in the polygon the per-polygon statistic ("min" by default — the most defensive choice for hydrology; "mean" and "median" are also supported).

Parameters:

Name Type Description Default
water_polygons

GeoDataFrame of Polygon / MultiPolygon geometries.

required
method str

"min" (default), "mean", or "median".

'min'
inplace bool

If True, update the instance; else return a new DEM.

False

Returns:

Type Description
DEM | None

DEM | None: Hydroflattened DEM.

Source code in src/digitalrivers/dem.py
def hydroflatten(
    self,
    water_polygons,
    method: str = "min",
    inplace: bool = False,
) -> DEM | None:
    """Flatten lake / pond surfaces to a single elevation per polygon.

    For each input polygon, sample the DEM cells the polygon covers
    and assign every cell in the polygon the per-polygon statistic
    (`"min"` by default — the most defensive choice for hydrology;
    `"mean"` and `"median"` are also supported).

    Args:
        water_polygons: `GeoDataFrame` of Polygon / MultiPolygon
            geometries.
        method: `"min"` (default), `"mean"`, or `"median"`.
        inplace: If True, update the instance; else return a new DEM.

    Returns:
        DEM | None: Hydroflattened DEM.
    """
    if method not in ("min", "mean", "median"):
        raise ValueError(
            f"method must be 'min', 'mean', or 'median'; got {method!r}"
        )

    elev = self.values
    gt = self.geotransform
    x0, dx, _, y0, _, dy = gt
    rows, cols = elev.shape

    water_polygons = _reproject_if_needed(water_polygons, self.epsg)

    z = elev.astype(np.float64, copy=True)
    for geom in water_polygons.geometry:
        if geom is None or geom.is_empty:
            continue
        rs, cs = self._polygon_cell_indices(geom, gt, rows, cols)
        if rs.size == 0:
            continue
        vals = z[rs, cs]
        vals = vals[np.isfinite(vals)]
        if vals.size == 0:
            continue
        if method == "min":
            target = float(vals.min())
        elif method == "mean":
            target = float(vals.mean())
        else:
            target = float(np.median(vals))
        z[rs, cs] = target

    no_val = self.no_data_value[0]
    z[np.isnan(z)] = no_val
    plain_ds = Dataset.dataset_like(self, z.astype(elev.dtype, copy=False))
    if inplace:
        self._update_inplace(plain_ds.raster)
        return None
    return DEM(plain_ds.raster)

burn_buildings(building_polygons, lift=50.0, inplace=False) #

Lift building footprints above the DEM by lift map units so 2D flood routing flows around them.

Parameters:

Name Type Description Default
building_polygons

GeoDataFrame of Polygon geometries.

required
lift float

Elevation added to every cell whose centre falls inside a polygon.

50.0
inplace bool

If True, update the instance; else return a new DEM.

False

Returns:

Type Description
DEM | None

DEM | None: DEM with buildings raised.

Source code in src/digitalrivers/dem.py
def burn_buildings(
    self,
    building_polygons,
    lift: float = 50.0,
    inplace: bool = False,
) -> DEM | None:
    """Lift building footprints above the DEM by `lift` map units so
    2D flood routing flows around them.

    Args:
        building_polygons: `GeoDataFrame` of Polygon geometries.
        lift: Elevation added to every cell whose centre falls inside a
            polygon.
        inplace: If True, update the instance; else return a new DEM.

    Returns:
        DEM | None: DEM with buildings raised.
    """
    elev = self.values
    gt = self.geotransform
    x0, dx, _, y0, _, dy = gt
    rows, cols = elev.shape

    building_polygons = _reproject_if_needed(building_polygons, self.epsg)

    z = elev.astype(np.float64, copy=True)
    for geom in building_polygons.geometry:
        if geom is None or geom.is_empty:
            continue
        rs, cs = self._polygon_cell_indices(geom, gt, rows, cols)
        if rs.size:
            z[rs, cs] = z[rs, cs] + lift

    no_val = self.no_data_value[0]
    z[np.isnan(z)] = no_val
    plain_ds = Dataset.dataset_like(self, z.astype(elev.dtype, copy=False))
    if inplace:
        self._update_inplace(plain_ds.raster)
        return None
    return DEM(plain_ds.raster)

enforce_breaklines(breaklines, lift=5.0, inplace=False) #

Raise linear barriers (levees, walls, kerbs) above the surrounding DEM.

Parameters:

Name Type Description Default
breaklines

GeoDataFrame of LineString geometries.

required
lift float

Elevation added at each rasterised cell along the lines.

5.0
inplace bool

If True, update the instance; else return a new DEM.

False

Returns:

Type Description
DEM | None

DEM | None: DEM with breaklines enforced.

Source code in src/digitalrivers/dem.py
def enforce_breaklines(
    self,
    breaklines,
    lift: float = 5.0,
    inplace: bool = False,
) -> DEM | None:
    """Raise linear barriers (levees, walls, kerbs) above the surrounding DEM.

    Args:
        breaklines: `GeoDataFrame` of LineString geometries.
        lift: Elevation added at each rasterised cell along the lines.
        inplace: If True, update the instance; else return a new DEM.

    Returns:
        DEM | None: DEM with breaklines enforced.
    """
    elev = self.values
    gt = self.geotransform
    rows, cols = elev.shape
    mask = np.zeros((rows, cols), dtype=bool)

    breaklines = _reproject_if_needed(breaklines, self.epsg)

    for geom in breaklines.geometry:
        if geom is None or geom.is_empty:
            continue
        if geom.geom_type == "MultiLineString":
            for sub in geom.geoms:
                self._rasterise_line(sub, mask, gt)
        else:
            self._rasterise_line(geom, mask, gt)

    z = elev.astype(np.float64, copy=True)
    z[mask] = z[mask] + lift
    no_val = self.no_data_value[0]
    z[np.isnan(z)] = no_val
    plain_ds = Dataset.dataset_like(self, z.astype(elev.dtype, copy=False))
    if inplace:
        self._update_inplace(plain_ds.raster)
        return None
    return DEM(plain_ds.raster)

subgrid_bathymetry(scale_factor, n_bins=10) #

Build per-coarse-cell bathymetry tables (SFINCS-style).

For each coarse cell (scale_factor × scale_factor block of fine cells), compute a histogram-like table mapping a coarsened water- depth level to the wetted area within the block. This is the sub-grid representation SFINCS and similar reduced-order 2D models use to recover small-scale topography without resolving it on the coarse grid.

Parameters:

Name Type Description Default
scale_factor int

Integer aggregation factor (>= 2).

required
n_bins int

Number of depth bins per coarse cell.

10

Returns:

Type Description
'pd.DataFrame'

pandas.DataFrame indexed by coarse-cell (row, col) with

'pd.DataFrame'

n_bins + 2 columns: z_min, z_max, plus

'pd.DataFrame'

frac_below_<k> for k in [1, n_bins] giving the

'pd.DataFrame'

fraction of fine cells at or below the k-th depth bin.

'pd.DataFrame'

For flat blocks (z_max == z_min) every frac_below_<k>

'pd.DataFrame'

is 1.0.

Raises:

Type Description
ValueError

For scale_factor < 2 or n_bins < 1.

Examples:

  • A flat block produces frac_below_<k> == 1.0 for every bin (B1 regression — the columns are always present):

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM ds = Dataset.create_from_array( ... np.full((4, 4), 5.0, dtype=np.float32), ... top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) df = DEM(ds.raster).subgrid_bathymetry(scale_factor=2, n_bins=3) sorted(df.columns.tolist()) ['frac_below_1', 'frac_below_2', 'frac_below_3', 'z_max', 'z_min'] float(df["frac_below_1"].iloc[0]) 1.0

Source code in src/digitalrivers/dem.py
def subgrid_bathymetry(
    self,
    scale_factor: int,
    n_bins: int = 10,
) -> "pd.DataFrame":
    """Build per-coarse-cell bathymetry tables (SFINCS-style).

    For each coarse cell (`scale_factor × scale_factor` block of fine
    cells), compute a histogram-like table mapping a coarsened water-
    depth level to the wetted area within the block. This is the
    sub-grid representation SFINCS and similar reduced-order 2D models
    use to recover small-scale topography without resolving it on the
    coarse grid.

    Args:
        scale_factor: Integer aggregation factor (>= 2).
        n_bins: Number of depth bins per coarse cell.

    Returns:
        `pandas.DataFrame` indexed by coarse-cell `(row, col)` with
        `n_bins + 2` columns: `z_min`, `z_max`, plus
        `frac_below_<k>` for `k` in `[1, n_bins]` giving the
        fraction of fine cells at or below the `k`-th depth bin.
        For flat blocks (`z_max == z_min`) every `frac_below_<k>`
        is `1.0`.

    Raises:
        ValueError: For `scale_factor < 2` or `n_bins < 1`.

    Examples:
        - A flat block produces `frac_below_<k> == 1.0` for every bin
          (B1 regression — the columns are always present):

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> ds = Dataset.create_from_array(
            ...     np.full((4, 4), 5.0, dtype=np.float32),
            ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> df = DEM(ds.raster).subgrid_bathymetry(scale_factor=2, n_bins=3)
            >>> sorted(df.columns.tolist())
            ['frac_below_1', 'frac_below_2', 'frac_below_3', 'z_max', 'z_min']
            >>> float(df["frac_below_1"].iloc[0])
            1.0
    """
    import numpy as np
    import pandas as pd

    if scale_factor < 2:
        raise ValueError(
            f"scale_factor must be >= 2; got {scale_factor}"
        )
    if n_bins < 1:
        raise ValueError(f"n_bins must be >= 1; got {n_bins}")

    elev = self.values
    rows, cols = elev.shape
    out_rows = rows // scale_factor
    out_cols = cols // scale_factor

    records: list[dict] = []
    for br in range(out_rows):
        for bc in range(out_cols):
            block = elev[
                br * scale_factor : (br + 1) * scale_factor,
                bc * scale_factor : (bc + 1) * scale_factor,
            ].ravel()
            valid = block[np.isfinite(block)]
            if valid.size == 0:
                continue
            z_min = float(valid.min())
            z_max = float(valid.max())
            rec = {"row": br, "col": bc, "z_min": z_min, "z_max": z_max}
            if z_max == z_min:
                # Flat block: every bin is "below" the single value, so
                # frac_below_k == 1.0 for every k. (The B1 review found
                # that the original code computed this list but never
                # wrote it into the record, dropping the frac columns
                # entirely when every block was flat.)
                for k in range(1, n_bins + 1):
                    rec[f"frac_below_{k}"] = 1.0
            else:
                bin_edges = np.linspace(z_min, z_max, n_bins + 1)
                for k, edge in enumerate(bin_edges[1:], start=1):
                    rec[f"frac_below_{k}"] = float(
                        (valid <= edge).sum() / valid.size
                    )
            records.append(rec)
    df = pd.DataFrame(records).set_index(["row", "col"])
    return df

export(path, target, *, breaklines=None, walls=None, buildings=None, manning_n=None, boundary_conditions=None, validate=True, **kwargs) #

Export the DEM to a hydrodynamic-model format.

Every target listed below ships with a working writer. Most were backfilled after the initial Phase-3 cut; lisflood_fp is the canonical Arc-ASCII writer and remains the only target that actually requires a sinks-free input.

Parameters:

Name Type Description Default
path str

Output file path.

required
target str

One of "hec_ras", "tuflow", "sfincs", "lisflood_fp", "iber", "gmsh".

required
breaklines / walls / buildings / manning_n / boundary_conditions

Reserved for target-specific bundles. Currently ignored by the LISFLOOD-FP writer.

required
validate bool

When True (default), refuse to export a DEM with internal sinks. Only applied for target="lisflood_fp" — the Arc-ASCII writer is the only target where downstream tooling actually requires sinks-free input. Other writers skip the sink scan even when validate=True so the local_minima_8 pass does not run unnecessarily (I4 fixup). Pass validate=False to also disable the lisflood_fp guard.

True
**kwargs

Target-specific options.

{}

Returns:

Type Description
dict

dict mapping artefact label → file path written.

Raises:

Type Description
ValueError

For unknown target.

RuntimeError

When target == "lisflood_fp", validate=True, and the DEM has internal sinks.

Source code in src/digitalrivers/dem.py
def export(
    self,
    path: str,
    target: str,
    *,
    breaklines=None,
    walls=None,
    buildings=None,
    manning_n=None,
    boundary_conditions=None,
    validate: bool = True,
    **kwargs,
) -> dict:
    """Export the DEM to a hydrodynamic-model format.

    Every target listed below ships with a working writer. Most were
    backfilled after the initial Phase-3 cut; `lisflood_fp` is the
    canonical Arc-ASCII writer and remains the only target that
    actually requires a sinks-free input.

    Args:
        path: Output file path.
        target: One of `"hec_ras"`, `"tuflow"`, `"sfincs"`,
            `"lisflood_fp"`, `"iber"`, `"gmsh"`.
        breaklines / walls / buildings / manning_n / boundary_conditions:
            Reserved for target-specific bundles. Currently ignored by
            the LISFLOOD-FP writer.
        validate: When `True` (default), refuse to export a DEM with
            internal sinks. **Only applied for** `target="lisflood_fp"`
            — the Arc-ASCII writer is the only target where downstream
            tooling actually requires sinks-free input. Other writers
            skip the sink scan even when `validate=True` so the
            `local_minima_8` pass does not run unnecessarily (I4
            fixup). Pass `validate=False` to also disable the
            lisflood_fp guard.
        **kwargs: Target-specific options.

    Returns:
        `dict` mapping artefact label → file path written.

    Raises:
        ValueError: For unknown `target`.
        RuntimeError: When `target == "lisflood_fp"`, `validate=True`,
            and the DEM has internal sinks.
    """
    valid_targets = {
        "hec_ras", "tuflow", "sfincs", "lisflood_fp", "iber", "gmsh",
    }
    if target not in valid_targets:
        raise ValueError(
            f"target must be one of {sorted(valid_targets)}; got {target!r}"
        )

    # Only run the (expensive) sink scan when we'll actually export to
    # an implemented target. The unimplemented targets raise
    # NotImplementedError further down and would otherwise pay the full
    # validation cost for nothing.
    if validate and target == "lisflood_fp":
        from digitalrivers._conditioning.pitremoval import local_minima_8
        sinks = local_minima_8(self.values)
        if int(sinks.sum()) > 0:
            raise RuntimeError(
                f"DEM has {int(sinks.sum())} internal sinks; either fix "
                "them (DEM.fill_depressions) or pass validate=False"
            )

    elev = self.values
    gt = self.geotransform
    x0, dx, _, y0, _, dy = gt
    rows, cols = elev.shape
    nodata = float(self.no_data_value[0])
    out = np.where(np.isnan(elev), nodata, elev)
    cell_size = abs(dx)
    yllcorner = y0 + rows * dy

    if target == "lisflood_fp":
        with open(path, "w", encoding="ascii", newline="\n") as fh:
            fh.write(f"ncols {cols}\n")
            fh.write(f"nrows {rows}\n")
            fh.write(f"xllcorner {x0}\n")
            fh.write(f"yllcorner {yllcorner}\n")
            fh.write(f"cellsize {cell_size}\n")
            fh.write(f"NODATA_value {nodata}\n")
            for r in range(rows):
                fh.write(" ".join(f"{out[r, c]:.6f}" for c in range(cols)))
                fh.write("\n")
        return {"dem_asc": path}

    if target == "hec_ras":
        # HEC-RAS Mapper expects a single-band float32 GeoTIFF in the
        # dataset CRS with consistent geotransform — exactly what
        # Dataset.create_from_array(driver_type="GTiff", path=...) writes.
        Dataset.create_from_array(
            out.astype(np.float32, copy=False),
            geo=gt, epsg=self.epsg,
            no_data_value=nodata,
            driver_type="GTiff", path=path,
        )
        return {"dem_tif": path}

    if target == "tuflow":
        # ESRI floating-point grid (.flt binary, row-major little-endian
        # float32, top-left first) + .hdr text header.
        flt_path = path if path.endswith(".flt") else path + ".flt"
        hdr_path = flt_path[:-4] + ".hdr"
        out.astype(np.float32, copy=False).tofile(flt_path)
        with open(hdr_path, "w") as fh:
            fh.write(f"ncols {cols}\n")
            fh.write(f"nrows {rows}\n")
            fh.write(f"xllcorner {x0}\n")
            fh.write(f"yllcorner {yllcorner}\n")
            fh.write(f"cellsize {cell_size}\n")
            fh.write(f"NODATA_value {nodata}\n")
            fh.write("byteorder LSBFIRST\n")
        return {"dem_flt": flt_path, "dem_hdr": hdr_path}

    if target == "sfincs":
        # SFINCS .dep: row-major little-endian float32, no header.
        # Companion .msk: 0 where no-data, 1 elsewhere.
        dep_path = path if path.endswith(".dep") else path + ".dep"
        msk_path = dep_path[:-4] + ".msk"
        out.astype(np.float32, copy=False).tofile(dep_path)
        mask = np.where(np.isnan(elev), 0, 1).astype(np.uint8)
        mask.tofile(msk_path)
        return {"dem_dep": dep_path, "dem_msk": msk_path}

    if target == "gmsh":
        # Minimal .geo script: define the DEM bounds as a rectangle
        # with a uniform characteristic length. Downstream meshers can
        # be run via `gmsh -2 <path>`.
        geo_path = path if path.endswith(".geo") else path + ".geo"
        ext_x_lo = x0
        ext_x_hi = x0 + cols * dx
        ext_y_hi = y0
        ext_y_lo = y0 + rows * dy
        cl = cell_size
        with open(geo_path, "w") as fh:
            fh.write(f"cl = {cl};\n")
            fh.write(f"Point(1) = {{{ext_x_lo}, {ext_y_lo}, 0, cl}};\n")
            fh.write(f"Point(2) = {{{ext_x_hi}, {ext_y_lo}, 0, cl}};\n")
            fh.write(f"Point(3) = {{{ext_x_hi}, {ext_y_hi}, 0, cl}};\n")
            fh.write(f"Point(4) = {{{ext_x_lo}, {ext_y_hi}, 0, cl}};\n")
            fh.write("Line(1) = {1, 2};\n")
            fh.write("Line(2) = {2, 3};\n")
            fh.write("Line(3) = {3, 4};\n")
            fh.write("Line(4) = {4, 1};\n")
            fh.write("Line Loop(1) = {1, 2, 3, 4};\n")
            fh.write("Plane Surface(1) = {1};\n")
        return {"geo": geo_path}

    if target == "iber":
        # Iber expects a .dat ascii mesh; pending mesh generation
        # (Phase 4 P33) we write a placeholder boundary file that the
        # user can refine in Iber's pre-processor.
        dat_path = path if path.endswith(".dat") else path + ".dat"
        with open(dat_path, "w") as fh:
            fh.write("# Iber mesh boundary (auto-generated)\n")
            fh.write(f"NCOLS {cols}\nNROWS {rows}\n")
            fh.write(f"XLLCORNER {x0}\nYLLCORNER {yllcorner}\n")
            fh.write(f"CELLSIZE {cell_size}\nNODATA {nodata}\n")
            for r in range(rows):
                fh.write(" ".join(f"{out[r, c]:.6f}" for c in range(cols)))
                fh.write("\n")
        return {"dem_dat": dat_path}

    # Unreachable — guarded by valid_targets check above.
    raise NotImplementedError(target)

anudem_interpolate(mask=None, max_iter=200, tol=0.001, method='laplacian', inplace=False) #

ANUDEM-lite: Laplacian-relaxation gap fill (P25).

A pragmatic subset of Hutchinson 1989 ANUDEM that handles the common gap-filling case: a DEM with NaN holes (cloud shadows, survey gaps, vegetation occlusion) is filled by Gauss-Seidel Laplacian relaxation, holding the known cells fixed. Each iteration replaces every unknown cell with the mean of its four 4-connected neighbours; iteration stops when the maximum change in a sweep drops below tol or after max_iter sweeps.

Two solver methods are available:

  • "laplacian" (default): solves Δz = 0 via the 4-neighbour mean iteration. Fast, smooth interior, but only C⁰ continuity at the anchor cells — the surface has visible "kinks" at known points.
  • "biharmonic": solves Δ²z = 0 by alternating two Laplacian sweeps (relax u = Δz, then relax z so Δz = u). C¹ continuity at anchors; closer to Hutchinson 1989 ANUDEM's tension-spline objective but still without multigrid acceleration or drainage enforcement.

Limitations vs full ANUDEM:

  • No multigrid acceleration; iteration cost is O(N · max_iter).
  • No drainage enforcement. For stream-conditioned DEMs, combine with :meth:burn_streams before or after.
  • No tension parameter (Hutchinson's λ); the biharmonic mode is a fixed-λ approximation.

Parameters:

Name Type Description Default
mask

Optional bool array same shape as the DEM. True marks cells whose values must be preserved (in addition to the existing finite cells). None keeps every finite cell fixed.

None
max_iter int

Maximum relaxation sweeps.

200
tol float

Convergence tolerance — stop when max |Δz| < tol.

0.001
method str

"laplacian" (default) or "biharmonic".

'laplacian'
inplace bool

If True, update the instance; else return a new DEM.

False

Returns:

Type Description
DEM | None

DEM | None: Filled DEM, or None when inplace=True.

Raises:

Type Description
ValueError

If the input DEM has no finite cells or method is not "laplacian" / "biharmonic".

Examples:

  • Fill a single-cell NaN hole with the default Laplacian solver; the filled value sits inside the bracketing range:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.array( ... [[1.0, 2.0, 3.0], [4.0, np.nan, 6.0], [7.0, 8.0, 9.0]], ... dtype=np.float32, ... ) ds = Dataset.create_from_array( ... np.where(np.isnan(z), -9999.0, z), ... top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) filled = DEM(ds.raster).anudem_interpolate( ... method="laplacian", max_iter=200, tol=1e-6, ... ) out = filled.values bool(1.0 <= out[1, 1] <= 9.0) True

  • The biharmonic mode (P32 backfill) approximates Hutchinson 1989's Delta^2 z = 0 by alternating Laplacian sweeps:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.array( ... [[1.0, 2.0, 3.0], [4.0, np.nan, 6.0], [7.0, 8.0, 9.0]], ... dtype=np.float32, ... ) ds = Dataset.create_from_array( ... np.where(np.isnan(z), -9999.0, z), ... top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) filled = DEM(ds.raster).anudem_interpolate( ... method="biharmonic", max_iter=200, tol=1e-5, ... ) bool(np.isfinite(filled.values).all()) True

  • Unknown method raises ValueError:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM ds = Dataset.create_from_array( ... np.ones((2, 2), dtype=np.float32), ... top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) DEM(ds.raster).anudem_interpolate(method="bogus") Traceback (most recent call last): ... ValueError: method must be 'laplacian' or 'biharmonic'; got 'bogus'

  • The 5-point stencil uses edge-replication (Neumann) boundary padding, so a NaN cell adjacent to the raster boundary is filled from its in-bounds neighbours only — no periodic-wrap contamination from the opposite edge:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM

    Top-left NaN with very different anchor at bottom-right.#

    z = np.full((5, 5), 10.0, dtype=np.float32) z[-1, -1] = -100.0 z[0, 0] = np.nan ds = Dataset.create_from_array( ... np.where(np.isnan(z), -9999.0, z), ... top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) filled = DEM(ds.raster).anudem_interpolate( ... method="laplacian", max_iter=300, tol=1e-6, ... ) bool(abs(float(filled.values[0, 0]) - 10.0) < 5.0) True

See Also

DEM.fill_depressions: hydrologic conditioning that removes sinks. DEM.burn_streams: stream-network drainage enforcement.

Source code in src/digitalrivers/dem.py
def anudem_interpolate(
    self,
    mask=None,
    max_iter: int = 200,
    tol: float = 1e-3,
    method: str = "laplacian",
    inplace: bool = False,
) -> DEM | None:
    """ANUDEM-lite: Laplacian-relaxation gap fill (P25).

    A pragmatic subset of Hutchinson 1989 ANUDEM that handles the
    common gap-filling case: a DEM with NaN holes (cloud shadows,
    survey gaps, vegetation occlusion) is filled by Gauss-Seidel
    Laplacian relaxation, holding the known cells fixed. Each
    iteration replaces every unknown cell with the mean of its four
    4-connected neighbours; iteration stops when the maximum change
    in a sweep drops below `tol` or after `max_iter` sweeps.

    Two solver methods are available:

    - `"laplacian"` (default): solves Δz = 0 via the 4-neighbour
      mean iteration. Fast, smooth interior, but only C⁰ continuity
      at the anchor cells — the surface has visible "kinks" at
      known points.
    - `"biharmonic"`: solves Δ²z = 0 by alternating two Laplacian
      sweeps (relax `u = Δz`, then relax `z` so `Δz = u`).
      C¹ continuity at anchors; closer to Hutchinson 1989 ANUDEM's
      tension-spline objective but still without multigrid
      acceleration or drainage enforcement.

    Limitations vs full ANUDEM:

    - No multigrid acceleration; iteration cost is O(N · max_iter).
    - No drainage enforcement. For stream-conditioned DEMs, combine
      with :meth:`burn_streams` before or after.
    - No tension parameter (Hutchinson's λ); the biharmonic mode
      is a fixed-λ approximation.

    Args:
        mask: Optional bool array same shape as the DEM. `True`
            marks cells whose values must be preserved (in addition
            to the existing finite cells). `None` keeps every
            finite cell fixed.
        max_iter: Maximum relaxation sweeps.
        tol: Convergence tolerance — stop when `max |Δz| < tol`.
        method: `"laplacian"` (default) or `"biharmonic"`.
        inplace: If True, update the instance; else return a new DEM.

    Returns:
        DEM | None: Filled DEM, or None when `inplace=True`.

    Raises:
        ValueError: If the input DEM has no finite cells or `method`
            is not `"laplacian"` / `"biharmonic"`.

    Examples:
        - Fill a single-cell NaN hole with the default Laplacian solver;
          the filled value sits inside the bracketing range:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.array(
            ...     [[1.0, 2.0, 3.0], [4.0, np.nan, 6.0], [7.0, 8.0, 9.0]],
            ...     dtype=np.float32,
            ... )
            >>> ds = Dataset.create_from_array(
            ...     np.where(np.isnan(z), -9999.0, z),
            ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> filled = DEM(ds.raster).anudem_interpolate(
            ...     method="laplacian", max_iter=200, tol=1e-6,
            ... )
            >>> out = filled.values
            >>> bool(1.0 <= out[1, 1] <= 9.0)
            True

        - The biharmonic mode (P32 backfill) approximates Hutchinson
          1989's Delta^2 z = 0 by alternating Laplacian sweeps:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.array(
            ...     [[1.0, 2.0, 3.0], [4.0, np.nan, 6.0], [7.0, 8.0, 9.0]],
            ...     dtype=np.float32,
            ... )
            >>> ds = Dataset.create_from_array(
            ...     np.where(np.isnan(z), -9999.0, z),
            ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> filled = DEM(ds.raster).anudem_interpolate(
            ...     method="biharmonic", max_iter=200, tol=1e-5,
            ... )
            >>> bool(np.isfinite(filled.values).all())
            True

        - Unknown method raises ValueError:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> ds = Dataset.create_from_array(
            ...     np.ones((2, 2), dtype=np.float32),
            ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> DEM(ds.raster).anudem_interpolate(method="bogus")
            Traceback (most recent call last):
                ...
            ValueError: method must be 'laplacian' or 'biharmonic'; got 'bogus'

        - The 5-point stencil uses edge-replication (Neumann) boundary
          padding, so a NaN cell adjacent to the raster boundary is
          filled from its in-bounds neighbours only — no periodic-wrap
          contamination from the opposite edge:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> # Top-left NaN with very different anchor at bottom-right.
            >>> z = np.full((5, 5), 10.0, dtype=np.float32)
            >>> z[-1, -1] = -100.0
            >>> z[0, 0] = np.nan
            >>> ds = Dataset.create_from_array(
            ...     np.where(np.isnan(z), -9999.0, z),
            ...     top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> filled = DEM(ds.raster).anudem_interpolate(
            ...     method="laplacian", max_iter=300, tol=1e-6,
            ... )
            >>> bool(abs(float(filled.values[0, 0]) - 10.0) < 5.0)
            True

    See Also:
        DEM.fill_depressions: hydrologic conditioning that removes sinks.
        DEM.burn_streams: stream-network drainage enforcement.
    """
    if method not in ("laplacian", "biharmonic"):
        raise ValueError(
            f"method must be 'laplacian' or 'biharmonic'; got {method!r}"
        )

    elev = self.values
    rows, cols = elev.shape
    z = elev.astype(np.float64, copy=True)
    fixed = np.isfinite(z)
    if mask is not None:
        fixed = fixed | mask.astype(bool, copy=False)
    if not fixed.any():
        raise ValueError(
            "anudem_interpolate needs at least one finite anchor cell"
        )
    # Seed unknown cells to the mean of known values to speed convergence.
    z = np.where(fixed, z, z[fixed].mean())

    def _edge_shifts(arr):
        """Return (north, south, east, west) views of `arr` with
        edge-replication boundary handling (no periodic wrap).

        Using `np.pad(..., mode="edge")` matches a Neumann (zero
        normal-derivative) boundary, which is the natural choice for
        an interpolation kernel — the original `np.roll` formed a
        torus and injected far-edge values into near-edge cells,
        corrupting anchors near the DEM boundary.
        """
        padded = np.pad(arr, 1, mode="edge")
        return (
            padded[:-2, 1:-1],
            padded[2:, 1:-1],
            padded[1:-1, 2:],
            padded[1:-1, :-2],
        )

    if method == "laplacian":
        for _ in range(max_iter):
            north, south, east, west = _edge_shifts(z)
            new_z = (north + south + east + west) / 4.0
            new_z[fixed] = z[fixed]
            diff = float(np.max(np.abs(new_z - z)))
            z = new_z
            if diff < tol:
                break
    else:  # biharmonic
        # Alternate two Laplacian sweeps to approximate Δ²z = 0.
        # Step A: compute u = Δz on the current z.
        # Step B: relax z so Δz ≈ smoothed u (mean of neighbour u's).
        # Composed, this approximates a biharmonic relaxation with C¹
        # continuity at the anchors.
        for _ in range(max_iter):
            north, south, east, west = _edge_shifts(z)
            u = north + south + east + west - 4.0 * z
            un, us, ue, uw = _edge_shifts(u)
            u_smooth = (un + us + ue + uw) / 4.0
            # Solve Δz = u_smooth → new z[i,j] =
            # (sum of neighbours - u_smooth) / 4.
            new_z = (north + south + east + west - u_smooth) / 4.0
            new_z[fixed] = z[fixed]
            diff = float(np.max(np.abs(new_z - z)))
            z = new_z
            if diff < tol:
                break

    no_val = self.no_data_value[0]
    z[~np.isfinite(z)] = no_val
    plain_ds = Dataset.dataset_like(self, z.astype(elev.dtype, copy=False))
    if inplace:
        self._update_inplace(plain_ds.raster)
        return None
    return DEM(plain_ds.raster)

fill_sinks(inplace=False) #

Deprecated alias for fill_depressions(method="priority_flood", epsilon=0.1).

The original implementation was a single-pass, single-cell sink fill that did not cascade through nested pits. Calls now route through the Priority-Flood + epsilon algorithm, which is correct on cascading depressions. The output differs from the historical algorithm in two ways:

  1. Cascading pits are fully resolved (each pit fills to the rim of its enclosing pit, not just to its immediate-neighbour minimum).
  2. Drainage paths within filled depressions inherit a 0.1-unit gradient — so D8 routing on the result avoids NO_FLOW cells inside the fill.

Parameters:

Name Type Description Default
inplace bool

If True the instance is updated in place; otherwise a new DEM is returned.

False

Returns:

Type Description
DEM | None

DEM | None: New DEM with the sink-free elevation, or None when

DEM | None

inplace is True.

Source code in src/digitalrivers/dem.py
def fill_sinks(self, inplace: bool = False) -> DEM | None:
    """Deprecated alias for `fill_depressions(method="priority_flood", epsilon=0.1)`.

    The original implementation was a single-pass, single-cell sink fill that did
    not cascade through nested pits. Calls now route through the Priority-Flood +
    epsilon algorithm, which is correct on cascading depressions. The output
    differs from the historical algorithm in two ways:

    1. Cascading pits are fully resolved (each pit fills to the rim of its enclosing
       pit, not just to its immediate-neighbour minimum).
    2. Drainage paths within filled depressions inherit a 0.1-unit gradient — so
       D8 routing on the result avoids `NO_FLOW` cells inside the fill.

    Args:
        inplace: If `True` the instance is updated in place; otherwise a new
            `DEM` is returned.

    Returns:
        DEM | None: New `DEM` with the sink-free elevation, or `None` when
        `inplace` is `True`.
    """
    warnings.warn(
        "DEM.fill_sinks is deprecated; use DEM.fill_depressions(method='priority_flood', "
        "epsilon=0.1) for equivalent behaviour or method='wang_liu' for a flat fill.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.fill_depressions(method="priority_flood", epsilon=0.1, inplace=inplace)

twi(accumulation, slope_deg=None) #

Topographic Wetness Index (Beven & Kirkby 1979).

TWI = ln(SCA / tan(slope)) where SCA = specific catchment area = accumulation_cells * cell_size. High TWI values mark cells likely to be wet (low slope and / or large upstream area). Slope is treated in radians internally; values below a small floor (≈ 0.06°) are clamped to avoid log-singularity at flat cells.

Parameters:

Name Type Description Default
accumulation

Accumulation raster aligned to this DEM.

required
slope_deg Dataset | None

Optional pre-computed slope raster in degrees. If None, Terrain.slope-equivalent is computed on the fly via arctan of DEM.slope()'s rise/run output.

None

Returns:

Type Description
Dataset

Dataset of float32 TWI values. No-data sentinel -9999.0.

References

Beven, K. J. & Kirkby, M. J. (1979). "A physically based, variable contributing area model of basin hydrology." Hydrological Sciences Bulletin 24(1): 43-69.

Source code in src/digitalrivers/dem.py
def twi(
    self,
    accumulation,
    slope_deg: Dataset | None = None,
) -> Dataset:
    """Topographic Wetness Index (Beven & Kirkby 1979).

    TWI = ln(SCA / tan(slope)) where SCA = specific catchment area =
    `accumulation_cells * cell_size`. High TWI values mark cells likely
    to be wet (low slope and / or large upstream area). Slope is treated
    in radians internally; values below a small floor (≈ 0.06°) are
    clamped to avoid log-singularity at flat cells.

    Args:
        accumulation: `Accumulation` raster aligned to this DEM.
        slope_deg: Optional pre-computed slope raster in degrees. If
            None, `Terrain.slope`-equivalent is computed on the fly via
            `arctan` of `DEM.slope()`'s rise/run output.

    Returns:
        `Dataset` of float32 TWI values. No-data sentinel `-9999.0`.

    References:
        Beven, K. J. & Kirkby, M. J. (1979). "A physically based,
        variable contributing area model of basin hydrology."
        *Hydrological Sciences Bulletin* 24(1): 43-69.
    """
    return self._area_slope_index(accumulation, slope_deg, kind="twi")

spi(accumulation, slope_deg=None) #

Stream Power Index (Moore et al. 1991).

SPI = SCA * tan(slope). Proportional to the rate at which overland flow does work at a cell; useful as a proxy for erosion risk.

Parameters:

Name Type Description Default
accumulation

Accumulation raster aligned to this DEM.

required
slope_deg Dataset | None

Optional pre-computed slope raster in degrees. If None, computed on the fly.

None

Returns:

Type Description
Dataset

Dataset of float32 SPI values. No-data sentinel -9999.0.

Source code in src/digitalrivers/dem.py
def spi(
    self,
    accumulation,
    slope_deg: Dataset | None = None,
) -> Dataset:
    """Stream Power Index (Moore et al. 1991).

    SPI = SCA * tan(slope). Proportional to the rate at which overland
    flow does work at a cell; useful as a proxy for erosion risk.

    Args:
        accumulation: `Accumulation` raster aligned to this DEM.
        slope_deg: Optional pre-computed slope raster in degrees. If
            None, computed on the fly.

    Returns:
        `Dataset` of float32 SPI values. No-data sentinel `-9999.0`.
    """
    return self._area_slope_index(accumulation, slope_deg, kind="spi")

sti(accumulation, slope_deg=None) #

Sediment Transport Index (Moore & Burch 1986).

STI = (SCA / 22.13)^0.6 * (sin(slope) / 0.0896)^1.3. The 22.13 m and 0.0896 m/m constants come from the original USLE plot length and slope; STI predicts where overland flow will transport sediment rather than deposit it.

Parameters:

Name Type Description Default
accumulation

Accumulation raster aligned to this DEM.

required
slope_deg Dataset | None

Optional pre-computed slope raster in degrees.

None

Returns:

Type Description
Dataset

Dataset of float32 STI values. No-data sentinel -9999.0.

Source code in src/digitalrivers/dem.py
def sti(
    self,
    accumulation,
    slope_deg: Dataset | None = None,
) -> Dataset:
    """Sediment Transport Index (Moore & Burch 1986).

    STI = (SCA / 22.13)^0.6 * (sin(slope) / 0.0896)^1.3. The 22.13 m and
    0.0896 m/m constants come from the original USLE plot length and
    slope; STI predicts where overland flow will transport sediment
    rather than deposit it.

    Args:
        accumulation: `Accumulation` raster aligned to this DEM.
        slope_deg: Optional pre-computed slope raster in degrees.

    Returns:
        `Dataset` of float32 STI values. No-data sentinel `-9999.0`.
    """
    return self._area_slope_index(accumulation, slope_deg, kind="sti")

tpi(window=3) #

Topographic Position Index (Guisan 1999).

TPI = z - focal_mean(z, window). Positive values mark ridges and upland positions; negative values mark valleys and depressions.

Parameters:

Name Type Description Default
window int

Side length of the focal window in cells (must be ≥ 1). Defaults to 3 (a 3×3 neighbourhood). Larger windows pick up regional / catchment-scale topography; smaller windows pick up local relief.

3

Returns:

Type Description
Dataset

Dataset of float32 TPI values. No-data cells use this DEM's

Dataset

no-data sentinel.

References

Guisan, A., Weiss, S. B., & Weiss, A. D. (1999). "GLM versus CCA spatial modeling of plant species distribution." Plant Ecology 143(1): 107-122.

Examples:

  • A flat surface has every cell at its focal mean → TPI = 0:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.full((5, 5), 10.0, dtype=np.float32) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) tpi = DEM(ds.raster).tpi(window=3).read_array() bool(np.allclose(tpi, 0.0)) True

  • A single ridge cell on flat terrain reports positive TPI; a pit reports negative:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.zeros((5, 5), dtype=np.float32) z[2, 2] = 9.0 ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) bool(DEM(ds.raster).tpi(window=3).read_array()[2, 2] > 0) True

Source code in src/digitalrivers/dem.py
def tpi(self, window: int = 3) -> Dataset:
    """Topographic Position Index (Guisan 1999).

    TPI = `z - focal_mean(z, window)`. Positive values mark ridges and
    upland positions; negative values mark valleys and depressions.

    Args:
        window: Side length of the focal window in cells (must be ≥ 1).
            Defaults to 3 (a 3×3 neighbourhood). Larger windows pick up
            regional / catchment-scale topography; smaller windows pick
            up local relief.

    Returns:
        `Dataset` of float32 TPI values. No-data cells use this DEM's
        no-data sentinel.

    References:
        Guisan, A., Weiss, S. B., & Weiss, A. D. (1999). "GLM versus
        CCA spatial modeling of plant species distribution." *Plant
        Ecology* 143(1): 107-122.

    Examples:
        - A flat surface has every cell at its focal mean → TPI = 0:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.full((5, 5), 10.0, dtype=np.float32)
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> tpi = DEM(ds.raster).tpi(window=3).read_array()
            >>> bool(np.allclose(tpi, 0.0))
            True

        - A single ridge cell on flat terrain reports positive TPI; a
          pit reports negative:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.zeros((5, 5), dtype=np.float32)
            >>> z[2, 2] = 9.0
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> bool(DEM(ds.raster).tpi(window=3).read_array()[2, 2] > 0)
            True
    """
    z, focal_mean, _focal_sd = self._focal_window_stats(window)
    out = (z - focal_mean).astype(np.float32)
    no_val = float(self.no_data_value[0])
    out = np.where(np.isnan(out), no_val, out)
    return Dataset.create_from_array(
        out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
    )

deviation_from_mean(window=3) #

Deviation from mean elevation — standardised TPI.

(z - focal_mean) / focal_sd. Dimensionless ridge / valley index; because it normalises by local roughness it allows comparing positions across regimes with very different relief.

Parameters:

Name Type Description Default
window int

Side length of the focal window in cells (≥ 1).

3

Returns:

Type Description
Dataset

Dataset of float32 deviation values. Flat cells (focal_sd ≈ 0)

Dataset

yield 0.0 by definition. No-data cells use this DEM's no-data

Dataset

sentinel.

Examples:

  • Flat terrain yields zero deviation everywhere:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.full((4, 4), 5.0, dtype=np.float32) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) bool(np.allclose( ... DEM(ds.raster).deviation_from_mean(window=3).read_array(), 0.0 ... )) True

  • A peak reports positive standardised deviation:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.zeros((5, 5), dtype=np.float32) z[2, 2] = 10.0 ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) out = DEM(ds.raster).deviation_from_mean(window=3).read_array() bool(out[2, 2] > 0) True

Source code in src/digitalrivers/dem.py
def deviation_from_mean(self, window: int = 3) -> Dataset:
    """Deviation from mean elevation — standardised TPI.

    `(z - focal_mean) / focal_sd`. Dimensionless ridge / valley index;
    because it normalises by local roughness it allows comparing
    positions across regimes with very different relief.

    Args:
        window: Side length of the focal window in cells (≥ 1).

    Returns:
        `Dataset` of float32 deviation values. Flat cells (focal_sd ≈ 0)
        yield 0.0 by definition. No-data cells use this DEM's no-data
        sentinel.

    Examples:
        - Flat terrain yields zero deviation everywhere:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.full((4, 4), 5.0, dtype=np.float32)
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> bool(np.allclose(
            ...     DEM(ds.raster).deviation_from_mean(window=3).read_array(), 0.0
            ... ))
            True

        - A peak reports positive standardised deviation:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.zeros((5, 5), dtype=np.float32)
            >>> z[2, 2] = 10.0
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> out = DEM(ds.raster).deviation_from_mean(window=3).read_array()
            >>> bool(out[2, 2] > 0)
            True
    """
    z, focal_mean, focal_sd = self._focal_window_stats(window)
    out = (z - focal_mean) / np.where(focal_sd == 0.0, 1.0, focal_sd)
    out = out.astype(np.float32)
    no_val = float(self.no_data_value[0])
    out = np.where(np.isnan(out), no_val, out)
    return Dataset.create_from_array(
        out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
    )

elev_std(window=3) #

Standard deviation of elevation in a focal window.

Pure focal-window SD on the elevation raster. A roughness proxy: high values mark varied terrain, low values mark smooth terrain.

Parameters:

Name Type Description Default
window int

Side length of the focal window in cells (≥ 1).

3

Returns:

Type Description
Dataset

Dataset of float32 SD values. No-data cells use this DEM's

Dataset

no-data sentinel.

Examples:

  • Flat terrain reports zero SD everywhere:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.full((4, 4), 5.0, dtype=np.float32) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) bool(np.allclose( ... DEM(ds.raster).elev_std(window=3).read_array(), 0.0 ... )) True

  • A step in elevation produces non-zero SD along the boundary:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.zeros((5, 5), dtype=np.float32) z[:, 3:] = 10.0 ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) sd = DEM(ds.raster).elev_std(window=3).read_array() bool((sd[:, 2] > 0).all()) True

Source code in src/digitalrivers/dem.py
def elev_std(self, window: int = 3) -> Dataset:
    """Standard deviation of elevation in a focal window.

    Pure focal-window SD on the elevation raster. A roughness proxy:
    high values mark varied terrain, low values mark smooth terrain.

    Args:
        window: Side length of the focal window in cells (≥ 1).

    Returns:
        `Dataset` of float32 SD values. No-data cells use this DEM's
        no-data sentinel.

    Examples:
        - Flat terrain reports zero SD everywhere:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.full((4, 4), 5.0, dtype=np.float32)
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> bool(np.allclose(
            ...     DEM(ds.raster).elev_std(window=3).read_array(), 0.0
            ... ))
            True

        - A step in elevation produces non-zero SD along the boundary:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.zeros((5, 5), dtype=np.float32)
            >>> z[:, 3:] = 10.0
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> sd = DEM(ds.raster).elev_std(window=3).read_array()
            >>> bool((sd[:, 2] > 0).all())
            True
    """
    _z, _focal_mean, focal_sd = self._focal_window_stats(window)
    out = focal_sd.astype(np.float32)
    no_val = float(self.no_data_value[0])
    out = np.where(np.isnan(out), no_val, out)
    return Dataset.create_from_array(
        out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
    )

curvature(kind='profile') #

Surface curvature (Zevenbergen & Thorne 1987).

Fits a partial quartic polynomial z = Ax²y² + Bx²y + Cxy² + Dx² + Ey² + Fxy + Gx + Hy + I to the 3×3 neighbourhood of each cell and evaluates one of the five canonical curvature variants from the coefficient grid:

  • "plan" — curvature perpendicular to the slope direction; positive on diverging slopes, negative on converging ones.
  • "profile" — curvature parallel to the slope direction; positive on convex (decelerating) slopes, negative on concave (accelerating) ones.
  • "total"2 * (D + E); sign-independent total relief.
  • "mean" — average of the two principal curvatures.
  • "gaussian" — product of the two principal curvatures.

Parameters:

Name Type Description Default
kind str

One of "plan", "profile", "total", "mean", "gaussian". Defaults to "profile".

'profile'

Returns:

Type Description
Dataset

Dataset of float32 curvature values. No-data cells use this

Dataset

DEM's no-data sentinel.

Raises:

Type Description
ValueError

If kind is not one of the five recognised variants.

References

Zevenbergen, L. W. & Thorne, C. R. (1987). "Quantitative analysis of land surface topography." Earth Surface Processes and Landforms 12(1): 47-56.

Examples:

  • Every curvature variant is zero on a flat DEM:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.full((5, 5), 10.0, dtype=np.float32) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) bool(np.allclose( ... DEM(ds.raster).curvature(kind="total").read_array(), 0.0 ... )) True

  • Mean curvature equals total / 2 on a paraboloid (interior):

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM x, y = np.meshgrid(np.arange(-3, 4), np.arange(-3, 4)) z = (-(x * x + y * y)).astype(np.float32) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) dem = DEM(ds.raster) total = dem.curvature(kind="total").read_array()[2:-2, 2:-2] mean = dem.curvature(kind="mean").read_array()[2:-2, 2:-2] bool(np.allclose(mean, total / 2.0, atol=1e-5)) True

Source code in src/digitalrivers/dem.py
def curvature(self, kind: str = "profile") -> Dataset:
    """Surface curvature (Zevenbergen & Thorne 1987).

    Fits a partial quartic polynomial `z = Ax²y² + Bx²y + Cxy² + Dx² +
    Ey² + Fxy + Gx + Hy + I` to the 3×3 neighbourhood of each cell and
    evaluates one of the five canonical curvature variants from the
    coefficient grid:

    * `"plan"` — curvature perpendicular to the slope direction;
      positive on diverging slopes, negative on converging ones.
    * `"profile"` — curvature parallel to the slope direction; positive
      on convex (decelerating) slopes, negative on concave (accelerating)
      ones.
    * `"total"` — `2 * (D + E)`; sign-independent total relief.
    * `"mean"` — average of the two principal curvatures.
    * `"gaussian"` — product of the two principal curvatures.

    Args:
        kind: One of `"plan"`, `"profile"`, `"total"`, `"mean"`,
            `"gaussian"`. Defaults to `"profile"`.

    Returns:
        `Dataset` of float32 curvature values. No-data cells use this
        DEM's no-data sentinel.

    Raises:
        ValueError: If `kind` is not one of the five recognised
            variants.

    References:
        Zevenbergen, L. W. & Thorne, C. R. (1987). "Quantitative
        analysis of land surface topography." *Earth Surface Processes
        and Landforms* 12(1): 47-56.

    Examples:
        - Every curvature variant is zero on a flat DEM:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.full((5, 5), 10.0, dtype=np.float32)
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> bool(np.allclose(
            ...     DEM(ds.raster).curvature(kind="total").read_array(), 0.0
            ... ))
            True

        - Mean curvature equals total / 2 on a paraboloid (interior):

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> x, y = np.meshgrid(np.arange(-3, 4), np.arange(-3, 4))
            >>> z = (-(x * x + y * y)).astype(np.float32)
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> dem = DEM(ds.raster)
            >>> total = dem.curvature(kind="total").read_array()[2:-2, 2:-2]
            >>> mean = dem.curvature(kind="mean").read_array()[2:-2, 2:-2]
            >>> bool(np.allclose(mean, total / 2.0, atol=1e-5))
            True
    """
    if kind not in ("plan", "profile", "total", "mean", "gaussian"):
        raise ValueError(
            f"kind must be one of 'plan', 'profile', 'total', 'mean', "
            f"'gaussian'; got {kind!r}"
        )
    z = self.values.astype(np.float64, copy=False)
    L = float(abs(self.geotransform[1]))
    # 3×3 stencil — `np.pad(mode="edge")` mirrors the boundary value so
    # finite differences stay defined at the raster edge.
    zp = np.pad(np.where(np.isnan(z), 0.0, z), 1, mode="edge")
    z1, z2, z3 = zp[:-2, :-2], zp[:-2, 1:-1], zp[:-2, 2:]
    z4, z5, z6 = zp[1:-1, :-2], zp[1:-1, 1:-1], zp[1:-1, 2:]
    z7, z8, z9 = zp[2:, :-2], zp[2:, 1:-1], zp[2:, 2:]
    # Zevenbergen-Thorne polynomial coefficients.
    D = ((z4 + z6) / 2.0 - z5) / (L * L)
    E = ((z2 + z8) / 2.0 - z5) / (L * L)
    F = (-z1 + z3 + z7 - z9) / (4.0 * L * L)
    G = (-z4 + z6) / (2.0 * L)
    H = (z2 - z8) / (2.0 * L)
    denom = G * G + H * H + 1.0e-12
    with np.errstate(invalid="ignore", divide="ignore"):
        if kind == "plan":
            arr = -2.0 * (D * H * H + E * G * G - F * G * H) / denom
        elif kind == "profile":
            arr = 2.0 * (D * G * G + E * H * H + F * G * H) / denom
        elif kind == "total":
            arr = 2.0 * (D + E)
        elif kind == "mean":
            arr = D + E
        else:  # gaussian
            arr = 4.0 * D * E - F * F
    out = arr.astype(np.float32)
    no_val = float(self.no_data_value[0])
    out = np.where(np.isnan(z) | ~np.isfinite(out), no_val, out)
    return Dataset.create_from_array(
        out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
    )

normal_vector_deviation(window=3) #

Per-cell angular deviation of the surface normal from its focal mean.

Computes each cell's outward-pointing surface normal from finite differences of the elevation grid, then takes the focal-mean of the unit-normal components in a window×window neighbourhood. The result at each cell is the angle (in radians) between the local normal and the focal-mean normal — a roughness metric that grows with how strongly the surface bends within the window.

Parameters:

Name Type Description Default
window int

Side length of the focal window in cells (≥ 1).

3

Returns:

Type Description
Dataset

Dataset of float32 angular deviations in radians,

Dataset

[0, π/2]. No-data cells use this DEM's no-data sentinel.

Examples:

  • Flat terrain yields zero angular deviation:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.full((5, 5), 10.0, dtype=np.float32) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) bool(np.allclose( ... DEM(ds.raster).normal_vector_deviation(window=3).read_array(), ... 0.0, atol=1e-5, ... )) True

  • A constant-slope plane has identical normals in its deep interior, so deviation there is ~0:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM x, y = np.meshgrid(np.arange(7), np.arange(7)) z = (2.0 * x + y).astype(np.float32) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) arr = DEM(ds.raster).normal_vector_deviation(window=3).read_array() bool(np.allclose(arr[2:-2, 2:-2], 0.0, atol=1e-4)) True

Source code in src/digitalrivers/dem.py
def normal_vector_deviation(self, window: int = 3) -> Dataset:
    """Per-cell angular deviation of the surface normal from its focal mean.

    Computes each cell's outward-pointing surface normal from finite
    differences of the elevation grid, then takes the focal-mean of the
    unit-normal components in a `window×window` neighbourhood. The
    result at each cell is the angle (in radians) between the local
    normal and the focal-mean normal — a roughness metric that grows
    with how strongly the surface bends within the window.

    Args:
        window: Side length of the focal window in cells (≥ 1).

    Returns:
        `Dataset` of float32 angular deviations in radians,
        `[0, π/2]`. No-data cells use this DEM's no-data sentinel.

    Examples:
        - Flat terrain yields zero angular deviation:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.full((5, 5), 10.0, dtype=np.float32)
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> bool(np.allclose(
            ...     DEM(ds.raster).normal_vector_deviation(window=3).read_array(),
            ...     0.0, atol=1e-5,
            ... ))
            True

        - A constant-slope plane has identical normals in its deep
          interior, so deviation there is ~0:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> x, y = np.meshgrid(np.arange(7), np.arange(7))
            >>> z = (2.0 * x + y).astype(np.float32)
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> arr = DEM(ds.raster).normal_vector_deviation(window=3).read_array()
            >>> bool(np.allclose(arr[2:-2, 2:-2], 0.0, atol=1e-4))
            True
    """
    from scipy.ndimage import uniform_filter
    if window < 1:
        raise ValueError(f"window must be >= 1; got {window!r}")
    z = self.values.astype(np.float64, copy=False)
    L = float(abs(self.geotransform[1]))
    # Finite-difference partials with reflective edge padding.
    zp = np.pad(np.where(np.isnan(z), 0.0, z), 1, mode="edge")
    dzdx = (zp[1:-1, 2:] - zp[1:-1, :-2]) / (2.0 * L)
    dzdy = (zp[2:, 1:-1] - zp[:-2, 1:-1]) / (2.0 * L)
    # Outward-pointing normal `(-dz/dx, -dz/dy, 1)`; renormalise to unit.
    nx_raw = -dzdx
    ny_raw = -dzdy
    nz_raw = np.ones_like(z)
    nm = np.sqrt(nx_raw * nx_raw + ny_raw * ny_raw + nz_raw * nz_raw)
    nx = nx_raw / nm
    ny = ny_raw / nm
    nz = nz_raw / nm
    # Focal mean of each component, then renormalise to keep the mean
    # vector unit-length.
    mnx = uniform_filter(nx, size=int(window), mode="reflect")
    mny = uniform_filter(ny, size=int(window), mode="reflect")
    mnz = uniform_filter(nz, size=int(window), mode="reflect")
    mm = np.sqrt(mnx * mnx + mny * mny + mnz * mnz) + 1.0e-12
    mnx /= mm
    mny /= mm
    mnz /= mm
    cos_theta = np.clip(nx * mnx + ny * mny + nz * mnz, -1.0, 1.0)
    out = np.arccos(cos_theta).astype(np.float32)
    no_val = float(self.no_data_value[0])
    out = np.where(np.isnan(z), no_val, out)
    return Dataset.create_from_array(
        out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
    )

openness(*, search_radius=10, kind='positive') #

Topographic openness (Yokoyama 2002).

For each cell, walks outward along 8 azimuths up to search_radius cells and records the maximum elevation angle (positive openness) or the minimum (negative openness) along each walk. The per-cell output is the mean of (π/2 - horizon_angle) across the 8 directions, in radians.

High positive openness marks exposed / high-relief locations; high negative openness marks deep depressions / valley floors.

Parameters:

Name Type Description Default
search_radius int

Maximum walk distance in cells. Must be ≥ 1. Defaults to 10.

10
kind str

"positive" (default) or "negative". Negative openness flips the sign of the elevation difference internally — effectively measuring the local pit / depression depth.

'positive'

Returns:

Type Description
Dataset

Dataset of float32 openness values in radians. No-data cells

Dataset

use this DEM's no-data sentinel.

Raises:

Type Description
ValueError

If kind is not one of "positive" / "negative" or search_radius < 1.

References

Yokoyama, R., Shirasawa, M., & Pike, R. J. (2002). "Visualizing topography by openness: A new application of image processing to digital elevation models." Photogrammetric Engineering and Remote Sensing 68(3): 257-265.

Examples:

  • Flat terrain: every horizon angle is 0, so positive openness is π/2 at every cell:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.full((5, 5), 10.0, dtype=np.float32) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) arr = DEM(ds.raster).openness(search_radius=2).read_array() bool(np.allclose(arr, np.pi / 2.0, atol=1e-5)) True

  • A peak on flat terrain has strictly larger positive openness than its neighbours (which look up at it):

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.zeros((5, 5), dtype=np.float32) z[2, 2] = 10.0 ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) arr = DEM(ds.raster).openness(search_radius=3).read_array() bool(arr[2, 2] > arr[1, 2]) True

Source code in src/digitalrivers/dem.py
def openness(
    self,
    *,
    search_radius: int = 10,
    kind: str = "positive",
) -> Dataset:
    """Topographic openness (Yokoyama 2002).

    For each cell, walks outward along 8 azimuths up to `search_radius`
    cells and records the maximum elevation angle (positive openness)
    or the minimum (negative openness) along each walk. The per-cell
    output is the mean of `(π/2 - horizon_angle)` across the 8
    directions, in radians.

    High positive openness marks exposed / high-relief locations; high
    negative openness marks deep depressions / valley floors.

    Args:
        search_radius: Maximum walk distance in cells. Must be ≥ 1.
            Defaults to 10.
        kind: `"positive"` (default) or `"negative"`. Negative openness
            flips the sign of the elevation difference internally —
            effectively measuring the local pit / depression depth.

    Returns:
        `Dataset` of float32 openness values in radians. No-data cells
        use this DEM's no-data sentinel.

    Raises:
        ValueError: If `kind` is not one of `"positive"` / `"negative"`
            or `search_radius < 1`.

    References:
        Yokoyama, R., Shirasawa, M., & Pike, R. J. (2002). "Visualizing
        topography by openness: A new application of image processing
        to digital elevation models." *Photogrammetric Engineering and
        Remote Sensing* 68(3): 257-265.

    Examples:
        - Flat terrain: every horizon angle is 0, so positive openness
          is `π/2` at every cell:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.full((5, 5), 10.0, dtype=np.float32)
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> arr = DEM(ds.raster).openness(search_radius=2).read_array()
            >>> bool(np.allclose(arr, np.pi / 2.0, atol=1e-5))
            True

        - A peak on flat terrain has strictly larger positive openness
          than its neighbours (which look up at it):

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.zeros((5, 5), dtype=np.float32)
            >>> z[2, 2] = 10.0
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> arr = DEM(ds.raster).openness(search_radius=3).read_array()
            >>> bool(arr[2, 2] > arr[1, 2])
            True
    """
    from digitalrivers._numba import horizon_walk_kernel
    if kind not in ("positive", "negative"):
        raise ValueError(
            f"kind must be 'positive' or 'negative'; got {kind!r}"
        )
    if search_radius < 1:
        raise ValueError(
            f"search_radius must be >= 1; got {search_radius!r}"
        )
    z = self.values.astype(np.float64, copy=False)
    # For negative openness, flip the elevation so the kernel's
    # "maximum upward angle" becomes "maximum downward angle" relative
    # to the original surface.
    z_in = (-z if kind == "negative" else z).astype(np.float64, copy=False)
    z_filled = np.where(np.isnan(z_in), 0.0, z_in)
    out = horizon_walk_kernel(
        z_filled, float(abs(self.geotransform[1])), int(search_radius), 0,
    ).astype(np.float32)
    no_val = float(self.no_data_value[0])
    out = np.where(np.isnan(z), no_val, out)
    return Dataset.create_from_array(
        out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
    )

sky_view_factor(*, search_radius=10) #

Sky-view factor (Zakšek et al. 2011).

For each cell, the fraction of the upper hemisphere that is visible from that cell. Walks along 8 azimuths up to search_radius, records the maximum elevation angle along each walk, and returns the mean of (1 - sin(horizon_angle)) across the 8 directions — equivalent to the fraction of an isotropic sky dome not occluded by surrounding terrain.

Shares the horizon-walk kernel with openness (W-27); the two differ only in the per-direction aggregation function.

Parameters:

Name Type Description Default
search_radius int

Maximum walk distance in cells. Must be ≥ 1. Defaults to 10.

10

Returns:

Type Description
Dataset

Dataset of float32 SVF values in [0, 1]. No-data cells use

Dataset

this DEM's no-data sentinel.

Raises:

Type Description
ValueError

If search_radius < 1.

References

Zakšek, K., Oštir, K., & Kokalj, Ž. (2011). "Sky-view factor as a relief visualization technique." Remote Sensing 3(2): 398-415.

Examples:

  • Flat terrain: nothing occludes the sky, SVF = 1 everywhere:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.full((5, 5), 10.0, dtype=np.float32) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) arr = DEM(ds.raster).sky_view_factor(search_radius=2).read_array() bool(np.allclose(arr, 1.0, atol=1e-5)) True

  • A pit surrounded by higher cells reports SVF strictly less than 1 (the walls occlude part of the sky):

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.full((5, 5), 10.0, dtype=np.float32) z[2, 2] = 0.0 ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) arr = DEM(ds.raster).sky_view_factor(search_radius=2).read_array() bool(arr[2, 2] < 1.0) True

Source code in src/digitalrivers/dem.py
def sky_view_factor(
    self,
    *,
    search_radius: int = 10,
) -> Dataset:
    """Sky-view factor (Zakšek et al. 2011).

    For each cell, the fraction of the upper hemisphere that is visible
    from that cell. Walks along 8 azimuths up to `search_radius`,
    records the maximum elevation angle along each walk, and returns
    the mean of `(1 - sin(horizon_angle))` across the 8 directions —
    equivalent to the fraction of an isotropic sky dome not occluded
    by surrounding terrain.

    Shares the horizon-walk kernel with `openness` (W-27); the two
    differ only in the per-direction aggregation function.

    Args:
        search_radius: Maximum walk distance in cells. Must be ≥ 1.
            Defaults to 10.

    Returns:
        `Dataset` of float32 SVF values in `[0, 1]`. No-data cells use
        this DEM's no-data sentinel.

    Raises:
        ValueError: If `search_radius < 1`.

    References:
        Zakšek, K., Oštir, K., & Kokalj, Ž. (2011). "Sky-view factor as
        a relief visualization technique." *Remote Sensing* 3(2):
        398-415.

    Examples:
        - Flat terrain: nothing occludes the sky, SVF = 1 everywhere:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.full((5, 5), 10.0, dtype=np.float32)
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> arr = DEM(ds.raster).sky_view_factor(search_radius=2).read_array()
            >>> bool(np.allclose(arr, 1.0, atol=1e-5))
            True

        - A pit surrounded by higher cells reports SVF strictly less
          than 1 (the walls occlude part of the sky):

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.full((5, 5), 10.0, dtype=np.float32)
            >>> z[2, 2] = 0.0
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> arr = DEM(ds.raster).sky_view_factor(search_radius=2).read_array()
            >>> bool(arr[2, 2] < 1.0)
            True
    """
    from digitalrivers._numba import horizon_walk_kernel
    if search_radius < 1:
        raise ValueError(
            f"search_radius must be >= 1; got {search_radius!r}"
        )
    z = self.values.astype(np.float64, copy=False)
    z_filled = np.where(np.isnan(z), 0.0, z)
    out = horizon_walk_kernel(
        z_filled, float(abs(self.geotransform[1])), int(search_radius), 1,
    ).astype(np.float32)
    no_val = float(self.no_data_value[0])
    out = np.where(np.isnan(z), no_val, out)
    return Dataset.create_from_array(
        out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
    )

ruggedness(window=3) #

Terrain Ruggedness Index (Riley et al. 1999).

Per-cell mean of absolute elevation differences to every other cell in a window×window neighbourhood. Output unit is the DEM elevation unit (metres). Higher values mark rougher terrain; flat terrain is zero.

Parameters:

Name Type Description Default
window int

Side length of the focal window in cells (≥ 1). Defaults to 3 (Riley's original 3×3 neighbourhood).

3

Returns:

Type Description
Dataset

Dataset of float32 ruggedness values. No-data cells use this

Dataset

DEM's no-data sentinel.

References

Riley, S. J., DeGloria, S. D., & Elliot, R. (1999). "A terrain ruggedness index that quantifies topographic heterogeneity." Intermountain Journal of Sciences 5(1-4): 23-27.

Examples:

  • Flat terrain has zero ruggedness everywhere:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.full((4, 4), 5.0, dtype=np.float32) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) bool(np.allclose( ... DEM(ds.raster).ruggedness(window=3).read_array(), 0.0 ... )) True

  • A peak surrounded by flat terrain contributes positive ruggedness at the peak and its 8-neighbours:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.zeros((5, 5), dtype=np.float32) z[2, 2] = 9.0 ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) tri = DEM(ds.raster).ruggedness(window=3).read_array() bool(tri[2, 2] > 0 and tri[1, 2] > 0 and tri[3, 2] > 0) True

Source code in src/digitalrivers/dem.py
def ruggedness(self, window: int = 3) -> Dataset:
    """Terrain Ruggedness Index (Riley et al. 1999).

    Per-cell mean of absolute elevation differences to every other cell
    in a `window×window` neighbourhood. Output unit is the DEM elevation
    unit (metres). Higher values mark rougher terrain; flat terrain is
    zero.

    Args:
        window: Side length of the focal window in cells (≥ 1).
            Defaults to 3 (Riley's original 3×3 neighbourhood).

    Returns:
        `Dataset` of float32 ruggedness values. No-data cells use this
        DEM's no-data sentinel.

    References:
        Riley, S. J., DeGloria, S. D., & Elliot, R. (1999). "A terrain
        ruggedness index that quantifies topographic heterogeneity."
        *Intermountain Journal of Sciences* 5(1-4): 23-27.

    Examples:
        - Flat terrain has zero ruggedness everywhere:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.full((4, 4), 5.0, dtype=np.float32)
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> bool(np.allclose(
            ...     DEM(ds.raster).ruggedness(window=3).read_array(), 0.0
            ... ))
            True

        - A peak surrounded by flat terrain contributes positive
          ruggedness at the peak and its 8-neighbours:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.zeros((5, 5), dtype=np.float32)
            >>> z[2, 2] = 9.0
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> tri = DEM(ds.raster).ruggedness(window=3).read_array()
            >>> bool(tri[2, 2] > 0 and tri[1, 2] > 0 and tri[3, 2] > 0)
            True
    """
    if window < 1:
        raise ValueError(f"window must be >= 1; got {window!r}")
    z = self.values.astype(np.float64, copy=False)
    z_filled = np.where(np.isnan(z), 0.0, z)
    total = np.zeros_like(z_filled)
    count = 0
    half = int(window) // 2
    for dr in range(-half, half + 1):
        for dc in range(-half, half + 1):
            if dr == 0 and dc == 0:
                continue
            shifted = np.roll(z_filled, shift=(dr, dc), axis=(0, 1))
            total += np.abs(z_filled - shifted)
            count += 1
    out = (total / float(count)).astype(np.float32)
    no_val = float(self.no_data_value[0])
    out = np.where(np.isnan(z), no_val, out)
    return Dataset.create_from_array(
        out, geo=self.geotransform, epsg=self.epsg, no_data_value=no_val,
    )

slope() #

Compute the maximum downhill slope at every cell.

Calculates slopes in all eight D8 directions via _get_8_direction_slopes and returns a raster whose cell values are the maximum slope across the eight neighbours.

Returns:

Name Type Description
Dataset Dataset

Single-band raster with the same geometry as the DEM, containing the maximum slope value per cell.

See Also

Terrain.slope: GDAL-based slope using Horn or Zevenbergen-Thorne algorithms.

Source code in src/digitalrivers/dem.py
def slope(self) -> Dataset:
    """Compute the maximum downhill slope at every cell.

    Calculates slopes in all eight D8 directions via
    `_get_8_direction_slopes` and returns a raster whose cell
    values are the maximum slope across the eight neighbours.

    Returns:
        Dataset: Single-band raster with the same geometry as the
            DEM, containing the maximum slope value per cell.

    See Also:
        Terrain.slope: GDAL-based slope using Horn or
            Zevenbergen-Thorne algorithms.
    """
    slope = self._get_8_direction_slopes()
    max_slope = np.nanmax(slope, axis=2)

    src = self.dataset_like(self, max_slope)
    return src

set_outflow(outflow, direction, inplace=False) #

Assign a fixed flow direction at the basin outfall cell.

Parameters:

Name Type Description Default
outflow GeoDataFrame

GeoDataFrame with point geometry marking the outfall location.

required
direction int

D8 direction code (0–7) to force at the outfall.

required
inplace bool

If True modify the current instance in place; otherwise return a new Dataset.

False

Returns:

Type Description
Dataset

Dataset with the outfall direction applied, or None when

Dataset

inplace is True.

Raises:

Type Description
NotImplementedError

This method is not yet implemented.

Source code in src/digitalrivers/dem.py
def set_outflow(
    self, outflow: GeoDataFrame, direction: int, inplace: bool = False
) -> Dataset:
    """Assign a fixed flow direction at the basin outfall cell.

    Args:
        outflow: GeoDataFrame with point geometry marking the
            outfall location.
        direction: D8 direction code (0–7) to force at the outfall.
        inplace: If `True` modify the current instance in place;
            otherwise return a new `Dataset`.

    Returns:
        Dataset with the outfall direction applied, or `None` when
        *inplace* is `True`.

    Raises:
        NotImplementedError: This method is not yet implemented.
    """
    raise NotImplementedError("set_outflow is not yet implemented.")

flow_direction(method='d8', exponent=1.0, forced=None, seed=None, forced_direction=None) #

Derive a flow-direction raster from the DEM under one of five routing schemes.

Schemes:

  • "d8" (default) — O'Callaghan & Mark (1984). Single-direction steepest descent. Output: 1-band int32 raster of direction codes 0–7 following DIR_OFFSETS.
  • "dinf" — Tarboton (1997). Output: 2-band float32 raster. Band 0 is the aspect angle in radians CCW from east in [0, 2π); band 1 is the slope magnitude along the chosen facet. -1.0 in band 0 marks sinks / no-data.
  • "mfd_quinn" — Quinn et al. (1991). Multi-direction with contour-length weighting. Output: 8-band float32 raster of partition fractions, ordered by DIR_OFFSETS. Per-cell fractions sum to 1.0 (or all zero for sinks).
  • "mfd_holmgren" — Holmgren (1994). Same family as Quinn but tunable exponent (default 1.0 mimics Quinn; 4–6 mimics D8). 8-band output.
  • "rho8" — Fairfield & Leymarie (1991). Stochastic single-direction; cardinal slopes are perturbed before the steepest-neighbour pick. Pass seed for reproducibility. 1-band int32 output like D8.

Parameters:

Name Type Description Default
method str

Routing scheme — one of "d8", "dinf", "mfd_quinn", "mfd_holmgren", "rho8".

'd8'
exponent float

p for mfd_holmgren and mfd_quinn. Ignored otherwise.

1.0
forced GeoDataFrame | None

Optional GeoDataFrame with columns geometry (point) and direction (int 0–7) — cells at the given locations are forced to that D8 direction code regardless of the computed slope. Only meaningful for "d8" and "rho8".

None
seed int | None

Random seed for "rho8" reproducibility.

None
forced_direction GeoDataFrame | None

Deprecated alias for forced. If both are given, forced wins.

None

Returns:

Name Type Description
FlowDirection FlowDirection

typed wrapper carrying the routing scheme and encoding.

Raises:

Type Description
ValueError

If method is unknown.

Source code in src/digitalrivers/dem.py
def flow_direction(
    self,
    method: str = "d8",
    exponent: float = 1.0,
    forced: GeoDataFrame | None = None,
    seed: int | None = None,
    forced_direction: GeoDataFrame | None = None,
) -> FlowDirection:
    """Derive a flow-direction raster from the DEM under one of five routing schemes.

    Schemes:

    * `"d8"` (default) — O'Callaghan & Mark (1984). Single-direction steepest
      descent. Output: 1-band `int32` raster of direction codes 0–7 following
      `DIR_OFFSETS`.
    * `"dinf"` — Tarboton (1997). Output: 2-band `float32` raster. Band 0 is
      the aspect angle in radians CCW from east in `[0, 2π)`; band 1 is the
      slope magnitude along the chosen facet. `-1.0` in band 0 marks sinks /
      no-data.
    * `"mfd_quinn"` — Quinn et al. (1991). Multi-direction with contour-length
      weighting. Output: 8-band `float32` raster of partition fractions,
      ordered by `DIR_OFFSETS`. Per-cell fractions sum to 1.0 (or all zero
      for sinks).
    * `"mfd_holmgren"` — Holmgren (1994). Same family as Quinn but tunable
      `exponent` (default 1.0 mimics Quinn; 4–6 mimics D8). 8-band output.
    * `"rho8"` — Fairfield & Leymarie (1991). Stochastic single-direction;
      cardinal slopes are perturbed before the steepest-neighbour pick. Pass
      `seed` for reproducibility. 1-band `int32` output like D8.

    Args:
        method: Routing scheme — one of `"d8"`, `"dinf"`, `"mfd_quinn"`,
            `"mfd_holmgren"`, `"rho8"`.
        exponent: `p` for `mfd_holmgren` and `mfd_quinn`. Ignored otherwise.
        forced: Optional GeoDataFrame with columns `geometry` (point) and
            `direction` (int 0–7) — cells at the given locations are forced
            to that D8 direction code regardless of the computed slope. Only
            meaningful for `"d8"` and `"rho8"`.
        seed: Random seed for `"rho8"` reproducibility.
        forced_direction: Deprecated alias for `forced`. If both are given,
            `forced` wins.

    Returns:
        FlowDirection: typed wrapper carrying the routing scheme and encoding.

    Raises:
        ValueError: If `method` is unknown.
    """
    if forced is None and forced_direction is not None:
        forced = forced_direction

    valid_methods = {"d8", "dinf", "mfd_quinn", "mfd_holmgren", "rho8"}
    if method not in valid_methods:
        raise ValueError(
            f"method must be one of {sorted(valid_methods)}; got {method!r}"
        )

    elev = self.values
    valid_mask = ~np.isnan(elev)

    if method == "d8":
        slopes = self._get_8_direction_slopes()
        slope_valid = ~np.all(np.isnan(slopes), axis=2)
        valid_cells_mask = valid_mask & slope_valid
        arr = np.full(elev.shape, Dataset.default_no_data_value, dtype=np.int32)
        if valid_cells_mask.any():
            best_dir = np.nanargmax(slopes[valid_cells_mask], axis=1)
            # Only commit a direction where the steepest slope is strictly downhill;
            # cells whose best 8-neighbour is at equal or higher elevation are sinks
            # and stay at the no-data sentinel (spec P5: "max(s_k) ≤ 0 → sink").
            rr, cc = np.where(valid_cells_mask)
            max_slope = slopes[rr, cc, best_dir]
            downhill = max_slope > 0
            arr[rr[downhill], cc[downhill]] = best_dir[downhill]
        if forced is not None:
            indices = self.map_to_array_coordinates(forced)
            for i, ind in enumerate(indices):
                arr[tuple(ind)] = forced.loc[i, "direction"]
        plain_ds = Dataset.create_from_array(
            arr, geo=self.geotransform, epsg=self.epsg,
            no_data_value=self.default_no_data_value,
        )
        return FlowDirection.from_dataset(plain_ds, routing="d8")

    if method == "rho8":
        slopes = self._get_8_direction_slopes()
        rng = np.random.default_rng(seed)
        arr = _rho8_flow_direction(slopes, valid_mask, rng=rng)
        # Replace -1 (sentinel from rho8 helper) with the dataset no-data value.
        arr[arr < 0] = Dataset.default_no_data_value
        if forced is not None:
            indices = self.map_to_array_coordinates(forced)
            for i, ind in enumerate(indices):
                arr[tuple(ind)] = forced.loc[i, "direction"]
        plain_ds = Dataset.create_from_array(
            arr.astype(np.int32, copy=False),
            geo=self.geotransform, epsg=self.epsg,
            no_data_value=self.default_no_data_value,
        )
        return FlowDirection.from_dataset(plain_ds, routing="rho8")

    if method == "dinf":
        angle, magnitude = _dinf_flow_direction(elev, self.cell_size)
        stacked = np.stack([angle, magnitude], axis=0).astype(np.float32, copy=False)
        plain_ds = Dataset.create_from_array(
            stacked, geo=self.geotransform, epsg=self.epsg,
            no_data_value=self.default_no_data_value,
        )
        return FlowDirection.from_dataset(plain_ds, routing="dinf")

    # mfd_quinn or mfd_holmgren
    slopes = self._get_8_direction_slopes()
    weighting = "quinn" if method == "mfd_quinn" else "holmgren"
    fractions = _mfd_flow_direction(
        slopes, valid_mask, weighting=weighting, exponent=exponent,
    )
    # Transpose (rows, cols, 8) -> (8, rows, cols) for pyramids's band-first layout.
    bands = np.transpose(fractions, (2, 0, 1)).astype(np.float32, copy=False)
    plain_ds = Dataset.create_from_array(
        bands, geo=self.geotransform, epsg=self.epsg,
        no_data_value=self.default_no_data_value,
    )
    return FlowDirection.from_dataset(plain_ds, routing=method)

accumulate_flow(r, c, flow_dir, acc, dir_offsets) #

Count upstream cells that drain into (r, c) (iterative).

Uses an explicit stack to perform a depth-first traversal of the flow-direction grid backwards. For every neighbour whose flow direction points toward the current cell, the neighbour is pushed onto the stack. Results are cached in acc so each cell is computed at most once.

Parameters:

Name Type Description Default
r

Row index of the target cell.

required
c

Column index of the target cell.

required
flow_dir

2-D int array of D8 direction codes (0–7).

required
acc

2-D int32 accumulation array. Cells initialised to -1 are unprocessed; non-negative values are cached results.

required
dir_offsets

Direction-offset mapping (see DIR_OFFSETS).

required

Returns:

Type Description
int

Number of upstream cells that drain into (r, c)

int

(excluding the cell itself).

Source code in src/digitalrivers/dem.py
def accumulate_flow(self, r, c, flow_dir, acc, dir_offsets) -> int:
    """Count upstream cells that drain into `(r, c)` (iterative).

    Uses an explicit stack to perform a depth-first traversal of the
    flow-direction grid backwards.  For every neighbour whose flow
    direction points toward the current cell, the neighbour is pushed
    onto the stack.  Results are cached in *acc* so each cell is
    computed at most once.

    Args:
        r: Row index of the target cell.
        c: Column index of the target cell.
        flow_dir: 2-D `int` array of D8 direction codes (0–7).
        acc: 2-D `int32` accumulation array.  Cells initialised to
            `-1` are unprocessed; non-negative values are cached
            results.
        dir_offsets: Direction-offset mapping (see `DIR_OFFSETS`).

    Returns:
        Number of upstream cells that drain into `(r, c)`
        (excluding the cell itself).
    """
    rows, cols = flow_dir.shape

    if not (0 <= r < rows and 0 <= c < cols):
        return 0
    if acc[r, c] >= 0:
        return acc[r, c]

    offsets_list = [
        (d_col, d_row, self.opposite_direction(d_row, d_col, dir_offsets))
        for d_col, d_row in dir_offsets.values()
    ]

    stack = [(r, c, 0, 0)]

    while stack:
        cr, cc, idx, total = stack[-1]

        if acc[cr, cc] >= 0:
            stack.pop()
            if stack:
                pr, pc, pidx, ptotal = stack[-1]
                stack[-1] = (pr, pc, pidx, ptotal + acc[cr, cc] + 1)
            continue

        # Advance through remaining neighbours.
        found_unprocessed = False
        while idx < len(offsets_list):
            d_col, d_row, opp = offsets_list[idx]
            idx += 1
            rr, rc = cr + d_row, cc + d_col
            if not (0 <= rr < rows and 0 <= rc < cols):
                continue
            if flow_dir[rr, rc] != opp:
                continue
            if opp is None:
                continue
            # Neighbour already computed — just add its count.
            if acc[rr, rc] >= 0:
                total += acc[rr, rc] + 1
                continue
            # Neighbour needs processing — save our state and push it.
            stack[-1] = (cr, cc, idx, total)
            stack.append((rr, rc, 0, 0))
            found_unprocessed = True
            break

        if not found_unprocessed:
            # All neighbours processed — finalise this cell.
            acc[cr, cc] = total
            stack.pop()
            if stack:
                pr, pc, pidx, ptotal = stack[-1]
                stack[-1] = (pr, pc, pidx, ptotal + total + 1)

    return acc[r, c]

opposite_direction(dr, dc, dir_offsets) staticmethod #

Return the D8 direction code opposite to the given offset.

Parameters:

Name Type Description Default
dr

Row offset component.

required
dc

Column offset component.

required
dir_offsets

Direction-offset mapping (see DIR_OFFSETS).

required

Returns:

Type Description

int or None: Direction code whose offset is (-dr, -dc),

or None if no match is found.

Source code in src/digitalrivers/dem.py
@staticmethod
def opposite_direction(dr, dc, dir_offsets):
    """Return the D8 direction code opposite to the given offset.

    Args:
        dr: Row offset component.
        dc: Column offset component.
        dir_offsets: Direction-offset mapping (see `DIR_OFFSETS`).

    Returns:
        int or None: Direction code whose offset is `(-dr, -dc)`,
        or `None` if no match is found.
    """
    for d, (d_col, d_row) in dir_offsets.items():
        if d_row == -dr and d_col == -dc:
            return d
    return None

flow_accumulation(flow_direction, weights=None, dir_offsets=None) #

Compute flow accumulation under the given routing scheme.

Generalised dispatcher that delegates to FlowDirection.accumulate(...) and returns an int32 cast for backwards compatibility with the previous D8-only output. For weighted or fractional accumulation, call flow_direction.accumulate(weights) directly to get the underlying Accumulation (float32) instead.

Parameters:

Name Type Description Default
flow_direction

A FlowDirection (preferred — its routing tag dispatches the algorithm) or a bare Dataset (assumed to be a D8 direction-code raster for back-compat).

required
weights Dataset | None

Optional per-cell weight raster aligned to the DEM.

None
dir_offsets dict

Deprecated/ignored. Kept for signature compatibility.

None

Returns:

Name Type Description
Dataset Dataset

int32 accumulation raster. No-data cells retain

Dataset

Dataset.default_no_data_value. Cell values are the count of

Dataset

(or weighted sum over) strictly-upstream cells — the cell's own

Dataset

weight does not contribute to its own value.

Warns:

Type Description
UserWarning

When flow_direction.routing produces fractional accumulations ("dinf", "mfd_quinn", "mfd_holmgren"). The legacy int32 cast truncates these toward zero, which is almost always wrong; call flow_direction.accumulate(...) directly to get the fractional Accumulation raster.

Examples:

  • Compute D8 cell-count accumulation on a small east-flowing DEM and inspect the outlet value:

    import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.array( ... [[9, 9, 9, 9], [9, 5, 4, 1], [9, 9, 9, 9]], ... dtype=np.float32, ... ) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) dem = DEM(ds.raster) fd = dem.flow_direction(method="d8") acc = dem.flow_accumulation(fd) int(acc.read_array().max()) > 0 True

  • A D∞ FlowDirection triggers the truncation warning:

    import warnings import numpy as np from pyramids.dataset import Dataset from digitalrivers import DEM z = np.array( ... [[9, 9, 9, 9], [9, 5, 4, 1], [9, 9, 9, 9]], ... dtype=np.float32, ... ) ds = Dataset.create_from_array( ... z, top_left_corner=(0.0, 0.0), cell_size=1.0, ... epsg=4326, no_data_value=-9999.0, ... ) dem = DEM(ds.raster) fd = dem.flow_direction(method="dinf") with warnings.catch_warnings(record=True) as caught: ... warnings.simplefilter("always") ... _ = dem.flow_accumulation(fd) any("int32" in str(w.message) for w in caught) True

Source code in src/digitalrivers/dem.py
def flow_accumulation(
    self,
    flow_direction,
    weights: Dataset | None = None,
    dir_offsets: dict = None,
) -> Dataset:
    """Compute flow accumulation under the given routing scheme.

    Generalised dispatcher that delegates to `FlowDirection.accumulate(...)`
    and returns an `int32` cast for backwards compatibility with the
    previous D8-only output. For weighted or fractional accumulation, call
    `flow_direction.accumulate(weights)` directly to get the underlying
    `Accumulation` (float32) instead.

    Args:
        flow_direction: A `FlowDirection` (preferred — its routing tag
            dispatches the algorithm) or a bare `Dataset` (assumed to be
            a D8 direction-code raster for back-compat).
        weights: Optional per-cell weight raster aligned to the DEM.
        dir_offsets: Deprecated/ignored. Kept for signature compatibility.

    Returns:
        Dataset: `int32` accumulation raster. No-data cells retain
        `Dataset.default_no_data_value`. Cell values are the count of
        (or weighted sum over) strictly-upstream cells — the cell's own
        weight does not contribute to its own value.

    Warns:
        UserWarning: When `flow_direction.routing` produces fractional
            accumulations (`"dinf"`, `"mfd_quinn"`, `"mfd_holmgren"`).
            The legacy `int32` cast truncates these toward zero, which is
            almost always wrong; call `flow_direction.accumulate(...)`
            directly to get the fractional `Accumulation` raster.

    Examples:
        - Compute D8 cell-count accumulation on a small east-flowing DEM
          and inspect the outlet value:

            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.array(
            ...     [[9, 9, 9, 9], [9, 5, 4, 1], [9, 9, 9, 9]],
            ...     dtype=np.float32,
            ... )
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> dem = DEM(ds.raster)
            >>> fd = dem.flow_direction(method="d8")
            >>> acc = dem.flow_accumulation(fd)
            >>> int(acc.read_array().max()) > 0
            True

        - A D∞ `FlowDirection` triggers the truncation warning:

            >>> import warnings
            >>> import numpy as np
            >>> from pyramids.dataset import Dataset
            >>> from digitalrivers import DEM
            >>> z = np.array(
            ...     [[9, 9, 9, 9], [9, 5, 4, 1], [9, 9, 9, 9]],
            ...     dtype=np.float32,
            ... )
            >>> ds = Dataset.create_from_array(
            ...     z, top_left_corner=(0.0, 0.0), cell_size=1.0,
            ...     epsg=4326, no_data_value=-9999.0,
            ... )
            >>> dem = DEM(ds.raster)
            >>> fd = dem.flow_direction(method="dinf")
            >>> with warnings.catch_warnings(record=True) as caught:
            ...     warnings.simplefilter("always")
            ...     _ = dem.flow_accumulation(fd)
            >>> any("int32" in str(w.message) for w in caught)
            True
    """
    del dir_offsets  # legacy positional kwarg, no longer used
    import warnings

    if not isinstance(flow_direction, FlowDirection):
        # Wrap a bare Dataset as D8 for back-compat callers.
        flow_direction = FlowDirection.from_dataset(flow_direction, routing="d8")

    if flow_direction.routing not in ("d8", "rho8"):
        warnings.warn(
            f"DEM.flow_accumulation casts to int32 and truncates fractional "
            f"accumulations for routing={flow_direction.routing!r}. Call "
            f"flow_direction.accumulate(...) directly to get the float32 "
            f"Accumulation raster.",
            UserWarning,
            stacklevel=2,
        )

    acc = flow_direction.accumulate(weights=weights)
    arr = acc.read_array().astype(np.int32, copy=False)
    # Restore the dataset no-data sentinel where the original DEM is no-data.
    elev = self.values
    nodata_mask = np.isnan(elev)
    arr[nodata_mask] = Dataset.default_no_data_value
    return Dataset.create_from_array(
        arr,
        geo=self.geotransform,
        epsg=self.epsg,
        no_data_value=self.default_no_data_value,
    )

convert_flow_direction_to_cell_indices() #

Convert D8 direction codes to downstream cell row/column indices.

Computes the flow direction from the DEM and translates each direction code into the absolute row and column index of the downstream neighbour.

Returns:

Type Description
ndarray

np.ndarray: 3-D float64 array of shape (rows, columns, 2). Layer 0 holds the downstream row index; layer 1 holds the downstream column index. Cells with no valid direction contain np.nan.

Source code in src/digitalrivers/dem.py
def convert_flow_direction_to_cell_indices(self) -> np.ndarray:
    """Convert D8 direction codes to downstream cell row/column indices.

    Computes the flow direction from the DEM and translates each
    direction code into the absolute row and column index of the
    downstream neighbour.

    Returns:
        np.ndarray: 3-D `float64` array of shape
            `(rows, columns, 2)`.  Layer 0 holds the downstream
            row index; layer 1 holds the downstream column index.
            Cells with no valid direction contain `np.nan`.
    """
    flow_direction = self.flow_direction()
    flow_dir = flow_direction.read_array(band=0).astype(np.float32)
    no_val = flow_direction.no_data_value[0]
    flow_dir[np.isclose(flow_dir, no_val, rtol=0.00001)] = np.nan

    rows, cols = flow_dir.shape
    valid = ~np.isnan(flow_dir)

    # Build lookup arrays from DIR_OFFSETS (index 0 = first tuple
    # element, index 1 = second tuple element, matching the
    # original loop: cell[i,j,0] = i + offset[0]).
    offset_0 = np.array([DIR_OFFSETS[d][0] for d in range(8)], dtype=np.float64)
    offset_1 = np.array([DIR_OFFSETS[d][1] for d in range(8)], dtype=np.float64)

    flow_direction_cell = np.full((rows, cols, 2), np.nan, dtype=np.float64)

    dir_idx = flow_dir[valid].astype(int)
    row_idx, col_idx = np.where(valid)
    flow_direction_cell[valid, 0] = row_idx + offset_0[dir_idx]
    flow_direction_cell[valid, 1] = col_idx + offset_1[dir_idx]

    return flow_direction_cell

delete_basins(basins, path) staticmethod #

Keep only the basin with the lowest ID and discard the rest.

Reads a basin-ID raster produced during catchment delineation, replaces every cell that does not belong to the lowest basin ID with the no-data value, and writes the result to path.

Parameters:

Name Type Description Default
basins Dataset

Dataset whose cell values are basin IDs (integers). The lowest unique basin ID (excluding no-data) is retained.

required
path str

Output GeoTIFF file path (must end with ".tif").

required

Raises:

Type Description
TypeError

If path is not a string.

Source code in src/digitalrivers/dem.py
@staticmethod
def delete_basins(basins: Dataset, path: str):
    """Keep only the basin with the lowest ID and discard the rest.

    Reads a basin-ID raster produced during catchment delineation,
    replaces every cell that does not belong to the lowest basin ID
    with the no-data value, and writes the result to *path*.

    Args:
        basins: Dataset whose cell values are basin IDs (integers).
            The lowest unique basin ID (excluding no-data) is
            retained.
        path: Output GeoTIFF file path (must end with `".tif"`).

    Raises:
        TypeError: If *path* is not a string.
    """
    if not isinstance(path, str):
        raise TypeError(f"path: {path} input should be string type")

    basins_a = basins.read_array()
    no_val = np.float32(basins.no_data_value[0])

    valid_mask = basins_a != no_val
    unique_basins = np.unique(basins_a[valid_mask]).astype(int)

    if len(unique_basins) > 0:
        keep = unique_basins[0]
        basins_a[valid_mask & (basins_a != keep)] = no_val

    Dataset.dataset_like(basins, basins_a, path)