Skip to content

Hydrological Methods#

statista.time_series.hydrological #

Hydrological methods mixin for TimeSeries.

Hydrological #

Bases: _TimeSeriesStub

Hydrology-specific analysis methods for TimeSeries.

Implements flow duration curves, baseflow separation, annual extremes, and hydrological indices commonly used in water resources engineering.

Source code in src\statista\time_series\hydrological.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
class Hydrological(_TimeSeriesStub):
    """Hydrology-specific analysis methods for TimeSeries.

    Implements flow duration curves, baseflow separation, annual extremes, and
    hydrological indices commonly used in water resources engineering.
    """

    def flow_duration_curve(
        self,
        log_scale: bool = True,
        method: str = "weibull",
        column: str = None,
        plot: bool = True,
        **kwargs: Any,
    ) -> tuple[DataFrame, tuple[Figure, Axes] | None]:
        """Compute and plot the flow duration curve (FDC).

        The FDC is the most widely used plot in hydrology. It shows the percentage
        of time a given flow value is equalled or exceeded.

        Args:
            log_scale: Use log scale for the y-axis. Default True.
            method: Plotting position formula.
                - "weibull": i / (n+1). Default.
                - "gringorten": (i - 0.44) / (n + 0.12). Recommended for Gumbel/GEV.
                - "cunnane": (i - 0.4) / (n + 0.2).
            column: Column to analyze. If None, overlays all columns.
            plot: Whether to produce a plot. Default True.
            **kwargs: Passed to ``_adjust_axes_labels``.

        Returns:
            tuple: (fdc_df, (fig, ax)) or (fdc_df, None).
                fdc_df has columns: value, exceedance_pct for single column,
                or one value column per series.

        Examples:
            Compute a flow duration curve from absolute random flow data:

            >>> import numpy as np
            >>> from statista.time_series import TimeSeries
            >>> np.random.seed(42)
            >>> ts = TimeSeries(np.abs(np.random.randn(365)) * 100)
            >>> fdc, _ = ts.flow_duration_curve(plot=False)
            >>> list(fdc.columns)
            ['value', 'exceedance_pct']
            >>> round(float(fdc["value"].iloc[0]), 4)
            385.2731
            >>> round(float(fdc["exceedance_pct"].iloc[0]), 4)
            0.2732

            Use the Gringorten plotting position for GEV-distributed extremes:

            >>> fdc2, _ = ts.flow_duration_curve(plot=False, method="gringorten")
            >>> round(float(fdc2["exceedance_pct"].iloc[0]), 4)
            0.1534

        References:
            Vogel, R.M. and Fennessey, N.M. (1994). Flow-Duration Curves. I: New Interpretation
            and Confidence Intervals. Journal of Water Resources Planning and Management, 120(4).
        """
        cols = [column] if column is not None else list(self.columns)

        all_results = []
        for col in cols:
            data = np.sort(self[col].dropna().values)[::-1]  # Sort descending
            n = len(data)

            # Validate non-empty data after removing NaN values
            if n == 0:
                raise ValueError(
                    f"Column '{col}' contains no valid data after removing NaN values"
                )

            # Warn about negative flows (physically invalid in hydrology)
            if (data < 0).any():
                warnings.warn(
                    f"Column '{col}' contains negative values, which may be invalid for flow data",
                    UserWarning
                )

            ranks = np.arange(1, n + 1)

            if method == "weibull":
                exceedance = ranks / (n + 1) * 100
            elif method == "gringorten":
                # Gringorten (1963): a=0.44, optimized for Gumbel/GEV distributions
                exceedance = (ranks - 0.44) / (n + 0.12) * 100  # type: ignore[assignment]
            elif method == "cunnane":
                # Cunnane (1978): a=0.4, approximately unbiased for most distributions
                exceedance = (ranks - 0.4) / (n + 0.2) * 100  # type: ignore[assignment]
            else:
                raise ValueError(
                    f"Unknown method '{method}'. Choose from 'weibull', 'gringorten', 'cunnane'."
                )

            all_results.append((col, data, exceedance))

        # Build result DataFrame
        if len(cols) == 1:
            col, data, exceedance = all_results[0]
            fdc_df = DataFrame({"value": data, "exceedance_pct": exceedance})
        else:
            # Each column may have different lengths after dropna, so build per-column
            frames = []
            for col, col_data, exc in all_results:
                frames.append(DataFrame({col: col_data, f"{col}_exceedance_pct": exc}))
            fdc_df = pd.concat(frames, axis=1)

        fig_ax: tuple[Figure, Axes] | None = None
        if plot:
            fig, ax = self._get_ax_fig(**kwargs)
            kwargs.pop("fig", None)
            kwargs.pop("ax", None)

            for col, data, exceedance in all_results:
                ax.plot(exceedance, data, linewidth=1.2, label=col)

            if log_scale:
                ax.set_yscale("log")

            if "title" not in kwargs:
                kwargs["title"] = "Flow Duration Curve"
            if "xlabel" not in kwargs:
                kwargs["xlabel"] = "Exceedance Probability (%)"
            if "ylabel" not in kwargs:
                kwargs["ylabel"] = "Value"
            if len(cols) > 1 and "legend" not in kwargs:
                kwargs["legend"] = cols

            ax = self._adjust_axes_labels(ax, **kwargs)
            plt.show()
            fig_ax = (fig, ax)

        return fdc_df, fig_ax

    def annual_extremes(
        self,
        kind: str = "max",
        water_year_start: str = "YE-OCT",
        column: str = None,
    ) -> Any:
        """Extract annual maxima or minima series.

        Args:
            kind: "max" for annual maxima, "min" for annual minima. Default "max".
            water_year_start: Pandas offset alias for resampling rule defining the
                water year. Default "YE-OCT" (Oct-Sep water year).
            column: Column to extract. If None, extracts from all columns.

        Returns:
            TimeSeries: New TimeSeries with one value per year.

        Raises:
            ValueError: If kind is not "max" or "min".

        Examples:
            Extract annual maximum series from two years of daily data:

            >>> import numpy as np
            >>> import pandas as pd
            >>> from statista.time_series import TimeSeries
            >>> np.random.seed(42)
            >>> idx = pd.date_range("2000-01-01", periods=730, freq="D")
            >>> ts = TimeSeries(np.random.randn(730), index=idx)
            >>> ams = ts.annual_extremes(kind="max")
            >>> ams.shape[0]
            3
            >>> [round(float(v), 4) for v in ams.values.flatten()]
            [3.8527, 3.0789, 1.7548]

            Extract annual minimum series:

            >>> amn = ts.annual_extremes(kind="min")
            >>> [round(float(v), 4) for v in amn.values.flatten()]
            [-3.2413, -2.6969, -2.0819]
        """
        from statista.time_series import TimeSeries

        if column is not None:
            data = self[column]
        else:
            data = self

        if kind == "max":
            result = data.resample(water_year_start).max()
        elif kind == "min":
            result = data.resample(water_year_start).min()
        else:
            raise ValueError(f"kind must be 'max' or 'min', got '{kind}'.")

        result_vals = (
            result.values if result.values.ndim == 2 else result.values.reshape(-1, 1)
        )
        cols = (
            list(result.columns)
            if hasattr(result, "columns")
            else [column or self.columns[0]]
        )
        ts_result = TimeSeries(result_vals, index=result.index, columns=cols)
        return ts_result

    def exceedance_probability(
        self,
        method: str = "weibull",
        column: str = None,
    ) -> DataFrame:
        """Compute empirical exceedance probability for each value.

        Args:
            method: Plotting position formula — "weibull", "gringorten", or "cunnane".
                Default "weibull".
            column: Column to analyze. If None, analyzes all columns.

        Returns:
            pandas.DataFrame: Sorted by value (descending) with columns:
                value, exceedance_probability, return_period.

        Examples:
            Compute exceedance probability and return periods using Weibull:

            >>> import numpy as np
            >>> from statista.time_series import TimeSeries
            >>> ts = TimeSeries(np.array([10.0, 20.0, 30.0, 40.0, 50.0]))
            >>> result = ts.exceedance_probability()
            >>> [round(float(v), 4) for v in result["exceedance_probability"].values]
            [0.1667, 0.3333, 0.5, 0.6667, 0.8333]
            >>> [round(float(v), 1) for v in result["return_period"].values]
            [6.0, 3.0, 2.0, 1.5, 1.2]

            Use Gringorten plotting positions (better for GEV):

            >>> result2 = ts.exceedance_probability(method="gringorten")
            >>> [round(float(v), 4) for v in result2["exceedance_probability"].values]
            [0.1094, 0.3047, 0.5, 0.6953, 0.8906]
        """
        cols = [column] if column is not None else list(self.columns)
        frames = []

        for col in cols:
            data = np.sort(self[col].dropna().values)[::-1]
            n = len(data)
            ranks = np.arange(1, n + 1)

            if method == "weibull":
                exc = ranks / (n + 1)
            elif method == "gringorten":
                exc = (ranks - 0.44) / (n + 0.12)  # type: ignore[assignment]
            elif method == "cunnane":
                exc = (ranks - 0.4) / (n + 0.2)  # type: ignore[assignment]
            else:
                raise ValueError(f"Unknown method '{method}'.")

            rp = 1.0 / exc

            frame = DataFrame(
                {
                    "column": col,
                    "value": data,
                    "exceedance_probability": exc,
                    "return_period": rp,
                }
            )
            frames.append(frame)

        result = frames[0] if len(frames) == 1 else pd.concat(frames, ignore_index=True)
        return result

    def baseflow_separation(
        self,
        method: str = "lyne_hollick",
        alpha: float = 0.925,
        column: str = None,
        plot: bool = True,
        **kwargs: Any,
    ) -> tuple[Any, tuple[Figure, Axes] | None]:
        """Separate streamflow into baseflow and quickflow.

        Args:
            method: Separation method.
                - "lyne_hollick": Digital filter (Lyne & Hollick, 1979).
                  ``b_t = alpha * b_{t-1} + (1-alpha)/2 * (q_t + q_{t-1})``
                - "eckhardt": Two-parameter filter (Eckhardt, 2005).
                  Extra param via kwargs: bfi_max (default 0.80).
                - "chapman_maxwell": One-parameter filter (Chapman & Maxwell, 1996).
                  ``b_t = k/(2-k) * b_{t-1} + (1-k)/(2-k) * q_t``
            alpha: Filter coefficient. Default 0.925.
            column: Column to analyze. If None, uses first column.
            plot: Whether to produce a hydrograph with baseflow shading. Default True.
            **kwargs: bfi_max for Eckhardt, or passed to ``_adjust_axes_labels``.

        Returns:
            tuple: (separation_df, (fig, ax)) or (separation_df, None).
                separation_df has columns: total_flow, baseflow, quickflow.

        Examples:
            Separate baseflow using Lyne-Hollick digital filter:

            >>> import numpy as np
            >>> from statista.time_series import TimeSeries
            >>> np.random.seed(42)
            >>> ts = TimeSeries(np.abs(np.random.randn(200)) * 10 + 5)
            >>> result, _ = ts.baseflow_separation(plot=False)
            >>> list(result.columns)
            ['total_flow', 'baseflow', 'quickflow']
            >>> round(float(result["baseflow"].mean()), 4)
            7.5446
            >>> round(float(result["quickflow"].mean()), 4)
            4.8504

            First time step has zero quickflow (baseflow equals total flow):

            >>> round(float(result["quickflow"].iloc[0]), 4)
            0.0

        References:
            Lyne, V. and Hollick, M. (1979). Stochastic time-variable rainfall-runoff
            modelling. Inst. Eng. Aust. Natl. Conf.

            Eckhardt, K. (2005). How to construct recursive digital filters for baseflow
            separation. Hydrological Processes, 19(2), 507-515.
        """
        if column is None:
            column = self.columns[0]

        # Check raw input for negative values BEFORE dropna so the warning is not silenced
        # by NaN values that happen to coincide with negative entries.
        raw = self[column].values
        if np.any(raw[~np.isnan(raw)] < 0):
            warnings.warn(
                f"Column '{column}' contains negative values, which may be invalid for flow data",
                UserWarning
            )

        q = self[column].dropna().values.astype(float)
        idx = self[column].dropna().index

        if method == "lyne_hollick":
            baseflow = _lyne_hollick(q, alpha)
        elif method == "eckhardt":
            bfi_max = kwargs.pop("bfi_max", 0.80)
            baseflow = _eckhardt(q, alpha, bfi_max)
        elif method == "chapman_maxwell":
            baseflow = _chapman_maxwell(q, alpha)
        else:
            raise ValueError(
                f"Unknown method '{method}'. Choose from 'lyne_hollick', 'eckhardt', 'chapman_maxwell'."
            )

        quickflow = q - baseflow

        result_df = DataFrame(
            {"total_flow": q, "baseflow": baseflow, "quickflow": quickflow},
            index=idx,
        )

        fig_ax: tuple[Figure, Axes] | None = None
        if plot:
            fig, ax = self._get_ax_fig(**kwargs)
            kwargs.pop("fig", None)
            kwargs.pop("ax", None)

            ax.plot(idx, q, color="steelblue", linewidth=0.8, label="Total flow")
            ax.fill_between(
                idx, 0, baseflow, color="lightblue", alpha=0.6, label="Baseflow"
            )

            if "title" not in kwargs:
                kwargs["title"] = f"Baseflow Separation ({method}) — {column}"
            if "xlabel" not in kwargs:
                kwargs["xlabel"] = "Time"
            if "ylabel" not in kwargs:
                kwargs["ylabel"] = "Flow"

            ax = self._adjust_axes_labels(ax, **kwargs)
            plt.show()
            fig_ax = (fig, ax)

        return result_df, fig_ax

    def baseflow_index(
        self,
        method: str = "lyne_hollick",
        alpha: float = 0.925,
        column: str = None,
    ) -> DataFrame:
        """Compute the Baseflow Index (BFI) — ratio of baseflow to total flow.

        BFI = sum(baseflow) / sum(total_flow). Values near 1 indicate
        groundwater-dominated systems; near 0 indicate flashy systems.

        Args:
            method: Separation method (see ``baseflow_separation``). Default "lyne_hollick".
            alpha: Filter coefficient. Default 0.925.
            column: Column to analyze. If None, analyzes all columns.

        Returns:
            pandas.DataFrame: One row per column with: bfi value.

        Examples:
            Compute baseflow index using the Lyne-Hollick filter:

            >>> import numpy as np
            >>> from statista.time_series import TimeSeries
            >>> np.random.seed(42)
            >>> ts = TimeSeries(np.abs(np.random.randn(200)) * 10 + 5)
            >>> result = ts.baseflow_index()
            >>> round(float(result.loc["Series1", "bfi"]), 4)
            0.6087

            Compare with Eckhardt two-parameter filter:

            >>> result2 = ts.baseflow_index(method="eckhardt")
            >>> round(float(result2.loc["Series1", "bfi"]), 4)
            0.6909
        """
        cols = [column] if column is not None else list(self.columns)
        rows = []

        for col in cols:
            sep_df, _ = self.baseflow_separation(
                method=method, alpha=alpha, column=col, plot=False
            )
            bfi = float(sep_df["baseflow"].sum() / sep_df["total_flow"].sum())
            rows.append({"column": col, "bfi": bfi})

        result = DataFrame(rows).set_index("column")
        return result

    def flashiness_index(self, column: str = None) -> DataFrame:
        """Richards-Baker Flashiness Index.

        Measures the oscillations in flow relative to total flow:
        FI = sum(|Q_t - Q_{t-1}|) / sum(Q_t)

        Higher values indicate flashier (more variable) flow regimes.

        Args:
            column: Column to analyze. If None, analyzes all columns.

        Returns:
            pandas.DataFrame: One row per column with: flashiness value.

        Examples:
            Highly flashy (alternating) flow pattern:

            >>> import numpy as np
            >>> from statista.time_series import TimeSeries
            >>> ts = TimeSeries(np.array([10.0, 50.0, 10.0, 50.0, 10.0]))
            >>> result = ts.flashiness_index()
            >>> round(float(result.loc["Series1", "flashiness"]), 4)
            1.2308

            Steady flow has zero flashiness:

            >>> ts2 = TimeSeries(np.array([10.0, 10.0, 10.0, 10.0, 10.0]))
            >>> result2 = ts2.flashiness_index()
            >>> round(float(result2.loc["Series1", "flashiness"]), 4)
            0.0

            Random flow with moderate flashiness:

            >>> np.random.seed(42)
            >>> ts3 = TimeSeries(np.abs(np.random.randn(100)) * 10 + 5)
            >>> result3 = ts3.flashiness_index()
            >>> round(float(result3.loc["Series1", "flashiness"]), 4)
            0.5231

        References:
            Baker, D.B. et al. (2004). A new flashiness index: characteristics and
            applications to midwestern rivers and streams. JAWRA, 40(2), 503-522.
        """
        cols = [column] if column is not None else list(self.columns)
        rows = []

        for col in cols:
            data = self[col].dropna().values
            fi = (
                float(np.sum(np.abs(np.diff(data))) / np.sum(data))
                if np.sum(data) > 0
                else 0.0
            )
            rows.append({"column": col, "flashiness": fi})

        result = DataFrame(rows).set_index("column")
        return result

    def recession_analysis(
        self,
        min_length: int = 5,
        column: str = None,
        plot: bool = True,
        **kwargs: Any,
    ) -> tuple[DataFrame, tuple[Figure, Axes] | None]:
        """Extract recession segments and fit a master recession curve.

        Identifies periods of monotonically decreasing flow (recession limbs)
        and fits an exponential recession model: Q(t) = Q0 * exp(-t / k),
        where k is the recession constant.

        Args:
            min_length: Minimum number of consecutive decreasing steps to
                qualify as a recession segment. Default 5.
            column: Column to analyze. If None, uses first column.
            plot: Whether to produce a log(Q) vs t recession plot. Default True.
            **kwargs: Passed to ``_adjust_axes_labels``.

        Returns:
            tuple: (recession_df, (fig, ax)) or (recession_df, None).
                recession_df has columns: recession_id, start_index, end_index, length,
                recession_constant_k, r_squared.

        Examples:
            Fit a recession curve to exponential decay with small noise:

            >>> import numpy as np
            >>> from statista.time_series import TimeSeries
            >>> np.random.seed(42)
            >>> q = 100 * np.exp(-np.arange(50) / 15.0) + np.random.randn(50) * 0.5
            >>> ts = TimeSeries(np.abs(q))
            >>> result, _ = ts.recession_analysis(min_length=3, plot=False)
            >>> len(result)
            5
            >>> round(float(result.iloc[0]["recession_constant_k"]), 4)
            14.8596
            >>> round(float(result.iloc[0]["r_squared"]), 4)
            0.9996

            The first segment spans from the start of the decay:

            >>> int(result.iloc[0]["start_index"])
            0
            >>> int(result.iloc[0]["length"])
            31

        References:
            Tallaksen, L.M. (1995). A review of baseflow recession analysis.
            Journal of Hydrology, 165(1-4), 349-370.
        """
        if column is None:
            column = self.columns[0]

        data = self[column].dropna().values.astype(float)
        n = len(data)

        # Identify recession segments (monotonically decreasing runs)
        segments = []
        start = None
        for i in range(1, n):
            if data[i] < data[i - 1]:
                if start is None:
                    start = i - 1
            else:
                if start is not None:
                    length = i - start
                    if length >= min_length:
                        segments.append((start, i - 1))
                    start = None
        if start is not None and (n - start) >= min_length:
            segments.append((start, n - 1))

        # Fit exponential decay to each segment
        rows = []
        for seg_id, (s, e) in enumerate(segments):
            seg = data[s : e + 1]
            seg_positive = seg[seg > 0]
            if len(seg_positive) < 3:
                continue

            t = np.arange(len(seg_positive), dtype=float)
            log_q = np.log(seg_positive)

            # Linear fit: log(Q) = log(Q0) - t/k  =>  slope = -1/k
            coeffs = np.polyfit(t, log_q, 1)
            slope = coeffs[0]
            k = -1.0 / slope if slope != 0 else np.inf

            # R-squared
            predicted = np.polyval(coeffs, t)
            ss_res = np.sum((log_q - predicted) ** 2)
            ss_tot = np.sum((log_q - np.mean(log_q)) ** 2)
            r_squared = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0

            rows.append(
                {
                    "recession_id": seg_id,
                    "start_index": s,
                    "end_index": e,
                    "length": e - s + 1,
                    "recession_constant_k": float(k),
                    "r_squared": float(r_squared),
                }
            )

        recession_df = (
            DataFrame(rows)
            if rows
            else DataFrame(
                columns=[
                    "recession_id",
                    "start_index",
                    "end_index",
                    "length",
                    "recession_constant_k",
                    "r_squared",
                ]
            )
        )

        fig_ax: tuple[Figure, Axes] | None = None
        if plot and len(segments) > 0:
            fig, ax = self._get_ax_fig(**kwargs)
            kwargs.pop("fig", None)
            kwargs.pop("ax", None)

            for s, e in segments:
                seg = data[s : e + 1]
                seg_positive = seg[seg > 0]
                if len(seg_positive) >= 3:
                    t = np.arange(len(seg_positive))
                    ax.plot(
                        t, seg_positive, "o-", markersize=3, linewidth=0.8, alpha=0.6
                    )

            ax.set_yscale("log")

            if "title" not in kwargs:
                kwargs["title"] = f"Recession Analysis — {column}"
            if "xlabel" not in kwargs:
                kwargs["xlabel"] = "Time steps from recession start"
            if "ylabel" not in kwargs:
                kwargs["ylabel"] = "Flow (log scale)"

            ax = self._adjust_axes_labels(ax, **kwargs)
            plt.show()
            fig_ax = (fig, ax)

        return recession_df, fig_ax
flow_duration_curve(log_scale=True, method='weibull', column=None, plot=True, **kwargs) #

Compute and plot the flow duration curve (FDC).

The FDC is the most widely used plot in hydrology. It shows the percentage of time a given flow value is equalled or exceeded.

Parameters:

Name Type Description Default
log_scale bool

Use log scale for the y-axis. Default True.

True
method str

Plotting position formula. - "weibull": i / (n+1). Default. - "gringorten": (i - 0.44) / (n + 0.12). Recommended for Gumbel/GEV. - "cunnane": (i - 0.4) / (n + 0.2).

'weibull'
column str

Column to analyze. If None, overlays all columns.

None
plot bool

Whether to produce a plot. Default True.

True
**kwargs Any

Passed to _adjust_axes_labels.

{}

Returns:

Name Type Description
tuple tuple[DataFrame, tuple[Figure, Axes] | None]

(fdc_df, (fig, ax)) or (fdc_df, None). fdc_df has columns: value, exceedance_pct for single column, or one value column per series.

Examples:

Compute a flow duration curve from absolute random flow data:

>>> import numpy as np
>>> from statista.time_series import TimeSeries
>>> np.random.seed(42)
>>> ts = TimeSeries(np.abs(np.random.randn(365)) * 100)
>>> fdc, _ = ts.flow_duration_curve(plot=False)
>>> list(fdc.columns)
['value', 'exceedance_pct']
>>> round(float(fdc["value"].iloc[0]), 4)
385.2731
>>> round(float(fdc["exceedance_pct"].iloc[0]), 4)
0.2732

Use the Gringorten plotting position for GEV-distributed extremes:

>>> fdc2, _ = ts.flow_duration_curve(plot=False, method="gringorten")
>>> round(float(fdc2["exceedance_pct"].iloc[0]), 4)
0.1534
References

Vogel, R.M. and Fennessey, N.M. (1994). Flow-Duration Curves. I: New Interpretation and Confidence Intervals. Journal of Water Resources Planning and Management, 120(4).

Source code in src\statista\time_series\hydrological.py
def flow_duration_curve(
    self,
    log_scale: bool = True,
    method: str = "weibull",
    column: str = None,
    plot: bool = True,
    **kwargs: Any,
) -> tuple[DataFrame, tuple[Figure, Axes] | None]:
    """Compute and plot the flow duration curve (FDC).

    The FDC is the most widely used plot in hydrology. It shows the percentage
    of time a given flow value is equalled or exceeded.

    Args:
        log_scale: Use log scale for the y-axis. Default True.
        method: Plotting position formula.
            - "weibull": i / (n+1). Default.
            - "gringorten": (i - 0.44) / (n + 0.12). Recommended for Gumbel/GEV.
            - "cunnane": (i - 0.4) / (n + 0.2).
        column: Column to analyze. If None, overlays all columns.
        plot: Whether to produce a plot. Default True.
        **kwargs: Passed to ``_adjust_axes_labels``.

    Returns:
        tuple: (fdc_df, (fig, ax)) or (fdc_df, None).
            fdc_df has columns: value, exceedance_pct for single column,
            or one value column per series.

    Examples:
        Compute a flow duration curve from absolute random flow data:

        >>> import numpy as np
        >>> from statista.time_series import TimeSeries
        >>> np.random.seed(42)
        >>> ts = TimeSeries(np.abs(np.random.randn(365)) * 100)
        >>> fdc, _ = ts.flow_duration_curve(plot=False)
        >>> list(fdc.columns)
        ['value', 'exceedance_pct']
        >>> round(float(fdc["value"].iloc[0]), 4)
        385.2731
        >>> round(float(fdc["exceedance_pct"].iloc[0]), 4)
        0.2732

        Use the Gringorten plotting position for GEV-distributed extremes:

        >>> fdc2, _ = ts.flow_duration_curve(plot=False, method="gringorten")
        >>> round(float(fdc2["exceedance_pct"].iloc[0]), 4)
        0.1534

    References:
        Vogel, R.M. and Fennessey, N.M. (1994). Flow-Duration Curves. I: New Interpretation
        and Confidence Intervals. Journal of Water Resources Planning and Management, 120(4).
    """
    cols = [column] if column is not None else list(self.columns)

    all_results = []
    for col in cols:
        data = np.sort(self[col].dropna().values)[::-1]  # Sort descending
        n = len(data)

        # Validate non-empty data after removing NaN values
        if n == 0:
            raise ValueError(
                f"Column '{col}' contains no valid data after removing NaN values"
            )

        # Warn about negative flows (physically invalid in hydrology)
        if (data < 0).any():
            warnings.warn(
                f"Column '{col}' contains negative values, which may be invalid for flow data",
                UserWarning
            )

        ranks = np.arange(1, n + 1)

        if method == "weibull":
            exceedance = ranks / (n + 1) * 100
        elif method == "gringorten":
            # Gringorten (1963): a=0.44, optimized for Gumbel/GEV distributions
            exceedance = (ranks - 0.44) / (n + 0.12) * 100  # type: ignore[assignment]
        elif method == "cunnane":
            # Cunnane (1978): a=0.4, approximately unbiased for most distributions
            exceedance = (ranks - 0.4) / (n + 0.2) * 100  # type: ignore[assignment]
        else:
            raise ValueError(
                f"Unknown method '{method}'. Choose from 'weibull', 'gringorten', 'cunnane'."
            )

        all_results.append((col, data, exceedance))

    # Build result DataFrame
    if len(cols) == 1:
        col, data, exceedance = all_results[0]
        fdc_df = DataFrame({"value": data, "exceedance_pct": exceedance})
    else:
        # Each column may have different lengths after dropna, so build per-column
        frames = []
        for col, col_data, exc in all_results:
            frames.append(DataFrame({col: col_data, f"{col}_exceedance_pct": exc}))
        fdc_df = pd.concat(frames, axis=1)

    fig_ax: tuple[Figure, Axes] | None = None
    if plot:
        fig, ax = self._get_ax_fig(**kwargs)
        kwargs.pop("fig", None)
        kwargs.pop("ax", None)

        for col, data, exceedance in all_results:
            ax.plot(exceedance, data, linewidth=1.2, label=col)

        if log_scale:
            ax.set_yscale("log")

        if "title" not in kwargs:
            kwargs["title"] = "Flow Duration Curve"
        if "xlabel" not in kwargs:
            kwargs["xlabel"] = "Exceedance Probability (%)"
        if "ylabel" not in kwargs:
            kwargs["ylabel"] = "Value"
        if len(cols) > 1 and "legend" not in kwargs:
            kwargs["legend"] = cols

        ax = self._adjust_axes_labels(ax, **kwargs)
        plt.show()
        fig_ax = (fig, ax)

    return fdc_df, fig_ax
annual_extremes(kind='max', water_year_start='YE-OCT', column=None) #

Extract annual maxima or minima series.

Parameters:

Name Type Description Default
kind str

"max" for annual maxima, "min" for annual minima. Default "max".

'max'
water_year_start str

Pandas offset alias for resampling rule defining the water year. Default "YE-OCT" (Oct-Sep water year).

'YE-OCT'
column str

Column to extract. If None, extracts from all columns.

None

Returns:

Name Type Description
TimeSeries Any

New TimeSeries with one value per year.

Raises:

Type Description
ValueError

If kind is not "max" or "min".

Examples:

Extract annual maximum series from two years of daily data:

>>> import numpy as np
>>> import pandas as pd
>>> from statista.time_series import TimeSeries
>>> np.random.seed(42)
>>> idx = pd.date_range("2000-01-01", periods=730, freq="D")
>>> ts = TimeSeries(np.random.randn(730), index=idx)
>>> ams = ts.annual_extremes(kind="max")
>>> ams.shape[0]
3
>>> [round(float(v), 4) for v in ams.values.flatten()]
[3.8527, 3.0789, 1.7548]

Extract annual minimum series:

>>> amn = ts.annual_extremes(kind="min")
>>> [round(float(v), 4) for v in amn.values.flatten()]
[-3.2413, -2.6969, -2.0819]
Source code in src\statista\time_series\hydrological.py
def annual_extremes(
    self,
    kind: str = "max",
    water_year_start: str = "YE-OCT",
    column: str = None,
) -> Any:
    """Extract annual maxima or minima series.

    Args:
        kind: "max" for annual maxima, "min" for annual minima. Default "max".
        water_year_start: Pandas offset alias for resampling rule defining the
            water year. Default "YE-OCT" (Oct-Sep water year).
        column: Column to extract. If None, extracts from all columns.

    Returns:
        TimeSeries: New TimeSeries with one value per year.

    Raises:
        ValueError: If kind is not "max" or "min".

    Examples:
        Extract annual maximum series from two years of daily data:

        >>> import numpy as np
        >>> import pandas as pd
        >>> from statista.time_series import TimeSeries
        >>> np.random.seed(42)
        >>> idx = pd.date_range("2000-01-01", periods=730, freq="D")
        >>> ts = TimeSeries(np.random.randn(730), index=idx)
        >>> ams = ts.annual_extremes(kind="max")
        >>> ams.shape[0]
        3
        >>> [round(float(v), 4) for v in ams.values.flatten()]
        [3.8527, 3.0789, 1.7548]

        Extract annual minimum series:

        >>> amn = ts.annual_extremes(kind="min")
        >>> [round(float(v), 4) for v in amn.values.flatten()]
        [-3.2413, -2.6969, -2.0819]
    """
    from statista.time_series import TimeSeries

    if column is not None:
        data = self[column]
    else:
        data = self

    if kind == "max":
        result = data.resample(water_year_start).max()
    elif kind == "min":
        result = data.resample(water_year_start).min()
    else:
        raise ValueError(f"kind must be 'max' or 'min', got '{kind}'.")

    result_vals = (
        result.values if result.values.ndim == 2 else result.values.reshape(-1, 1)
    )
    cols = (
        list(result.columns)
        if hasattr(result, "columns")
        else [column or self.columns[0]]
    )
    ts_result = TimeSeries(result_vals, index=result.index, columns=cols)
    return ts_result
exceedance_probability(method='weibull', column=None) #

Compute empirical exceedance probability for each value.

Parameters:

Name Type Description Default
method str

Plotting position formula — "weibull", "gringorten", or "cunnane". Default "weibull".

'weibull'
column str

Column to analyze. If None, analyzes all columns.

None

Returns:

Type Description
DataFrame

pandas.DataFrame: Sorted by value (descending) with columns: value, exceedance_probability, return_period.

Examples:

Compute exceedance probability and return periods using Weibull:

>>> import numpy as np
>>> from statista.time_series import TimeSeries
>>> ts = TimeSeries(np.array([10.0, 20.0, 30.0, 40.0, 50.0]))
>>> result = ts.exceedance_probability()
>>> [round(float(v), 4) for v in result["exceedance_probability"].values]
[0.1667, 0.3333, 0.5, 0.6667, 0.8333]
>>> [round(float(v), 1) for v in result["return_period"].values]
[6.0, 3.0, 2.0, 1.5, 1.2]

Use Gringorten plotting positions (better for GEV):

>>> result2 = ts.exceedance_probability(method="gringorten")
>>> [round(float(v), 4) for v in result2["exceedance_probability"].values]
[0.1094, 0.3047, 0.5, 0.6953, 0.8906]
Source code in src\statista\time_series\hydrological.py
def exceedance_probability(
    self,
    method: str = "weibull",
    column: str = None,
) -> DataFrame:
    """Compute empirical exceedance probability for each value.

    Args:
        method: Plotting position formula — "weibull", "gringorten", or "cunnane".
            Default "weibull".
        column: Column to analyze. If None, analyzes all columns.

    Returns:
        pandas.DataFrame: Sorted by value (descending) with columns:
            value, exceedance_probability, return_period.

    Examples:
        Compute exceedance probability and return periods using Weibull:

        >>> import numpy as np
        >>> from statista.time_series import TimeSeries
        >>> ts = TimeSeries(np.array([10.0, 20.0, 30.0, 40.0, 50.0]))
        >>> result = ts.exceedance_probability()
        >>> [round(float(v), 4) for v in result["exceedance_probability"].values]
        [0.1667, 0.3333, 0.5, 0.6667, 0.8333]
        >>> [round(float(v), 1) for v in result["return_period"].values]
        [6.0, 3.0, 2.0, 1.5, 1.2]

        Use Gringorten plotting positions (better for GEV):

        >>> result2 = ts.exceedance_probability(method="gringorten")
        >>> [round(float(v), 4) for v in result2["exceedance_probability"].values]
        [0.1094, 0.3047, 0.5, 0.6953, 0.8906]
    """
    cols = [column] if column is not None else list(self.columns)
    frames = []

    for col in cols:
        data = np.sort(self[col].dropna().values)[::-1]
        n = len(data)
        ranks = np.arange(1, n + 1)

        if method == "weibull":
            exc = ranks / (n + 1)
        elif method == "gringorten":
            exc = (ranks - 0.44) / (n + 0.12)  # type: ignore[assignment]
        elif method == "cunnane":
            exc = (ranks - 0.4) / (n + 0.2)  # type: ignore[assignment]
        else:
            raise ValueError(f"Unknown method '{method}'.")

        rp = 1.0 / exc

        frame = DataFrame(
            {
                "column": col,
                "value": data,
                "exceedance_probability": exc,
                "return_period": rp,
            }
        )
        frames.append(frame)

    result = frames[0] if len(frames) == 1 else pd.concat(frames, ignore_index=True)
    return result
baseflow_separation(method='lyne_hollick', alpha=0.925, column=None, plot=True, **kwargs) #

Separate streamflow into baseflow and quickflow.

Parameters:

Name Type Description Default
method str

Separation method. - "lyne_hollick": Digital filter (Lyne & Hollick, 1979). b_t = alpha * b_{t-1} + (1-alpha)/2 * (q_t + q_{t-1}) - "eckhardt": Two-parameter filter (Eckhardt, 2005). Extra param via kwargs: bfi_max (default 0.80). - "chapman_maxwell": One-parameter filter (Chapman & Maxwell, 1996). b_t = k/(2-k) * b_{t-1} + (1-k)/(2-k) * q_t

'lyne_hollick'
alpha float

Filter coefficient. Default 0.925.

0.925
column str

Column to analyze. If None, uses first column.

None
plot bool

Whether to produce a hydrograph with baseflow shading. Default True.

True
**kwargs Any

bfi_max for Eckhardt, or passed to _adjust_axes_labels.

{}

Returns:

Name Type Description
tuple tuple[Any, tuple[Figure, Axes] | None]

(separation_df, (fig, ax)) or (separation_df, None). separation_df has columns: total_flow, baseflow, quickflow.

Examples:

Separate baseflow using Lyne-Hollick digital filter:

>>> import numpy as np
>>> from statista.time_series import TimeSeries
>>> np.random.seed(42)
>>> ts = TimeSeries(np.abs(np.random.randn(200)) * 10 + 5)
>>> result, _ = ts.baseflow_separation(plot=False)
>>> list(result.columns)
['total_flow', 'baseflow', 'quickflow']
>>> round(float(result["baseflow"].mean()), 4)
7.5446
>>> round(float(result["quickflow"].mean()), 4)
4.8504

First time step has zero quickflow (baseflow equals total flow):

>>> round(float(result["quickflow"].iloc[0]), 4)
0.0
References

Lyne, V. and Hollick, M. (1979). Stochastic time-variable rainfall-runoff modelling. Inst. Eng. Aust. Natl. Conf.

Eckhardt, K. (2005). How to construct recursive digital filters for baseflow separation. Hydrological Processes, 19(2), 507-515.

Source code in src\statista\time_series\hydrological.py
def baseflow_separation(
    self,
    method: str = "lyne_hollick",
    alpha: float = 0.925,
    column: str = None,
    plot: bool = True,
    **kwargs: Any,
) -> tuple[Any, tuple[Figure, Axes] | None]:
    """Separate streamflow into baseflow and quickflow.

    Args:
        method: Separation method.
            - "lyne_hollick": Digital filter (Lyne & Hollick, 1979).
              ``b_t = alpha * b_{t-1} + (1-alpha)/2 * (q_t + q_{t-1})``
            - "eckhardt": Two-parameter filter (Eckhardt, 2005).
              Extra param via kwargs: bfi_max (default 0.80).
            - "chapman_maxwell": One-parameter filter (Chapman & Maxwell, 1996).
              ``b_t = k/(2-k) * b_{t-1} + (1-k)/(2-k) * q_t``
        alpha: Filter coefficient. Default 0.925.
        column: Column to analyze. If None, uses first column.
        plot: Whether to produce a hydrograph with baseflow shading. Default True.
        **kwargs: bfi_max for Eckhardt, or passed to ``_adjust_axes_labels``.

    Returns:
        tuple: (separation_df, (fig, ax)) or (separation_df, None).
            separation_df has columns: total_flow, baseflow, quickflow.

    Examples:
        Separate baseflow using Lyne-Hollick digital filter:

        >>> import numpy as np
        >>> from statista.time_series import TimeSeries
        >>> np.random.seed(42)
        >>> ts = TimeSeries(np.abs(np.random.randn(200)) * 10 + 5)
        >>> result, _ = ts.baseflow_separation(plot=False)
        >>> list(result.columns)
        ['total_flow', 'baseflow', 'quickflow']
        >>> round(float(result["baseflow"].mean()), 4)
        7.5446
        >>> round(float(result["quickflow"].mean()), 4)
        4.8504

        First time step has zero quickflow (baseflow equals total flow):

        >>> round(float(result["quickflow"].iloc[0]), 4)
        0.0

    References:
        Lyne, V. and Hollick, M. (1979). Stochastic time-variable rainfall-runoff
        modelling. Inst. Eng. Aust. Natl. Conf.

        Eckhardt, K. (2005). How to construct recursive digital filters for baseflow
        separation. Hydrological Processes, 19(2), 507-515.
    """
    if column is None:
        column = self.columns[0]

    # Check raw input for negative values BEFORE dropna so the warning is not silenced
    # by NaN values that happen to coincide with negative entries.
    raw = self[column].values
    if np.any(raw[~np.isnan(raw)] < 0):
        warnings.warn(
            f"Column '{column}' contains negative values, which may be invalid for flow data",
            UserWarning
        )

    q = self[column].dropna().values.astype(float)
    idx = self[column].dropna().index

    if method == "lyne_hollick":
        baseflow = _lyne_hollick(q, alpha)
    elif method == "eckhardt":
        bfi_max = kwargs.pop("bfi_max", 0.80)
        baseflow = _eckhardt(q, alpha, bfi_max)
    elif method == "chapman_maxwell":
        baseflow = _chapman_maxwell(q, alpha)
    else:
        raise ValueError(
            f"Unknown method '{method}'. Choose from 'lyne_hollick', 'eckhardt', 'chapman_maxwell'."
        )

    quickflow = q - baseflow

    result_df = DataFrame(
        {"total_flow": q, "baseflow": baseflow, "quickflow": quickflow},
        index=idx,
    )

    fig_ax: tuple[Figure, Axes] | None = None
    if plot:
        fig, ax = self._get_ax_fig(**kwargs)
        kwargs.pop("fig", None)
        kwargs.pop("ax", None)

        ax.plot(idx, q, color="steelblue", linewidth=0.8, label="Total flow")
        ax.fill_between(
            idx, 0, baseflow, color="lightblue", alpha=0.6, label="Baseflow"
        )

        if "title" not in kwargs:
            kwargs["title"] = f"Baseflow Separation ({method}) — {column}"
        if "xlabel" not in kwargs:
            kwargs["xlabel"] = "Time"
        if "ylabel" not in kwargs:
            kwargs["ylabel"] = "Flow"

        ax = self._adjust_axes_labels(ax, **kwargs)
        plt.show()
        fig_ax = (fig, ax)

    return result_df, fig_ax
baseflow_index(method='lyne_hollick', alpha=0.925, column=None) #

Compute the Baseflow Index (BFI) — ratio of baseflow to total flow.

BFI = sum(baseflow) / sum(total_flow). Values near 1 indicate groundwater-dominated systems; near 0 indicate flashy systems.

Parameters:

Name Type Description Default
method str

Separation method (see baseflow_separation). Default "lyne_hollick".

'lyne_hollick'
alpha float

Filter coefficient. Default 0.925.

0.925
column str

Column to analyze. If None, analyzes all columns.

None

Returns:

Type Description
DataFrame

pandas.DataFrame: One row per column with: bfi value.

Examples:

Compute baseflow index using the Lyne-Hollick filter:

>>> import numpy as np
>>> from statista.time_series import TimeSeries
>>> np.random.seed(42)
>>> ts = TimeSeries(np.abs(np.random.randn(200)) * 10 + 5)
>>> result = ts.baseflow_index()
>>> round(float(result.loc["Series1", "bfi"]), 4)
0.6087

Compare with Eckhardt two-parameter filter:

>>> result2 = ts.baseflow_index(method="eckhardt")
>>> round(float(result2.loc["Series1", "bfi"]), 4)
0.6909
Source code in src\statista\time_series\hydrological.py
def baseflow_index(
    self,
    method: str = "lyne_hollick",
    alpha: float = 0.925,
    column: str = None,
) -> DataFrame:
    """Compute the Baseflow Index (BFI) — ratio of baseflow to total flow.

    BFI = sum(baseflow) / sum(total_flow). Values near 1 indicate
    groundwater-dominated systems; near 0 indicate flashy systems.

    Args:
        method: Separation method (see ``baseflow_separation``). Default "lyne_hollick".
        alpha: Filter coefficient. Default 0.925.
        column: Column to analyze. If None, analyzes all columns.

    Returns:
        pandas.DataFrame: One row per column with: bfi value.

    Examples:
        Compute baseflow index using the Lyne-Hollick filter:

        >>> import numpy as np
        >>> from statista.time_series import TimeSeries
        >>> np.random.seed(42)
        >>> ts = TimeSeries(np.abs(np.random.randn(200)) * 10 + 5)
        >>> result = ts.baseflow_index()
        >>> round(float(result.loc["Series1", "bfi"]), 4)
        0.6087

        Compare with Eckhardt two-parameter filter:

        >>> result2 = ts.baseflow_index(method="eckhardt")
        >>> round(float(result2.loc["Series1", "bfi"]), 4)
        0.6909
    """
    cols = [column] if column is not None else list(self.columns)
    rows = []

    for col in cols:
        sep_df, _ = self.baseflow_separation(
            method=method, alpha=alpha, column=col, plot=False
        )
        bfi = float(sep_df["baseflow"].sum() / sep_df["total_flow"].sum())
        rows.append({"column": col, "bfi": bfi})

    result = DataFrame(rows).set_index("column")
    return result
flashiness_index(column=None) #

Richards-Baker Flashiness Index.

Measures the oscillations in flow relative to total flow: FI = sum(|Q_t - Q_{t-1}|) / sum(Q_t)

Higher values indicate flashier (more variable) flow regimes.

Parameters:

Name Type Description Default
column str

Column to analyze. If None, analyzes all columns.

None

Returns:

Type Description
DataFrame

pandas.DataFrame: One row per column with: flashiness value.

Examples:

Highly flashy (alternating) flow pattern:

>>> import numpy as np
>>> from statista.time_series import TimeSeries
>>> ts = TimeSeries(np.array([10.0, 50.0, 10.0, 50.0, 10.0]))
>>> result = ts.flashiness_index()
>>> round(float(result.loc["Series1", "flashiness"]), 4)
1.2308

Steady flow has zero flashiness:

>>> ts2 = TimeSeries(np.array([10.0, 10.0, 10.0, 10.0, 10.0]))
>>> result2 = ts2.flashiness_index()
>>> round(float(result2.loc["Series1", "flashiness"]), 4)
0.0

Random flow with moderate flashiness:

>>> np.random.seed(42)
>>> ts3 = TimeSeries(np.abs(np.random.randn(100)) * 10 + 5)
>>> result3 = ts3.flashiness_index()
>>> round(float(result3.loc["Series1", "flashiness"]), 4)
0.5231
References

Baker, D.B. et al. (2004). A new flashiness index: characteristics and applications to midwestern rivers and streams. JAWRA, 40(2), 503-522.

Source code in src\statista\time_series\hydrological.py
def flashiness_index(self, column: str = None) -> DataFrame:
    """Richards-Baker Flashiness Index.

    Measures the oscillations in flow relative to total flow:
    FI = sum(|Q_t - Q_{t-1}|) / sum(Q_t)

    Higher values indicate flashier (more variable) flow regimes.

    Args:
        column: Column to analyze. If None, analyzes all columns.

    Returns:
        pandas.DataFrame: One row per column with: flashiness value.

    Examples:
        Highly flashy (alternating) flow pattern:

        >>> import numpy as np
        >>> from statista.time_series import TimeSeries
        >>> ts = TimeSeries(np.array([10.0, 50.0, 10.0, 50.0, 10.0]))
        >>> result = ts.flashiness_index()
        >>> round(float(result.loc["Series1", "flashiness"]), 4)
        1.2308

        Steady flow has zero flashiness:

        >>> ts2 = TimeSeries(np.array([10.0, 10.0, 10.0, 10.0, 10.0]))
        >>> result2 = ts2.flashiness_index()
        >>> round(float(result2.loc["Series1", "flashiness"]), 4)
        0.0

        Random flow with moderate flashiness:

        >>> np.random.seed(42)
        >>> ts3 = TimeSeries(np.abs(np.random.randn(100)) * 10 + 5)
        >>> result3 = ts3.flashiness_index()
        >>> round(float(result3.loc["Series1", "flashiness"]), 4)
        0.5231

    References:
        Baker, D.B. et al. (2004). A new flashiness index: characteristics and
        applications to midwestern rivers and streams. JAWRA, 40(2), 503-522.
    """
    cols = [column] if column is not None else list(self.columns)
    rows = []

    for col in cols:
        data = self[col].dropna().values
        fi = (
            float(np.sum(np.abs(np.diff(data))) / np.sum(data))
            if np.sum(data) > 0
            else 0.0
        )
        rows.append({"column": col, "flashiness": fi})

    result = DataFrame(rows).set_index("column")
    return result
recession_analysis(min_length=5, column=None, plot=True, **kwargs) #

Extract recession segments and fit a master recession curve.

Identifies periods of monotonically decreasing flow (recession limbs) and fits an exponential recession model: Q(t) = Q0 * exp(-t / k), where k is the recession constant.

Parameters:

Name Type Description Default
min_length int

Minimum number of consecutive decreasing steps to qualify as a recession segment. Default 5.

5
column str

Column to analyze. If None, uses first column.

None
plot bool

Whether to produce a log(Q) vs t recession plot. Default True.

True
**kwargs Any

Passed to _adjust_axes_labels.

{}

Returns:

Name Type Description
tuple tuple[DataFrame, tuple[Figure, Axes] | None]

(recession_df, (fig, ax)) or (recession_df, None). recession_df has columns: recession_id, start_index, end_index, length, recession_constant_k, r_squared.

Examples:

Fit a recession curve to exponential decay with small noise:

>>> import numpy as np
>>> from statista.time_series import TimeSeries
>>> np.random.seed(42)
>>> q = 100 * np.exp(-np.arange(50) / 15.0) + np.random.randn(50) * 0.5
>>> ts = TimeSeries(np.abs(q))
>>> result, _ = ts.recession_analysis(min_length=3, plot=False)
>>> len(result)
5
>>> round(float(result.iloc[0]["recession_constant_k"]), 4)
14.8596
>>> round(float(result.iloc[0]["r_squared"]), 4)
0.9996

The first segment spans from the start of the decay:

>>> int(result.iloc[0]["start_index"])
0
>>> int(result.iloc[0]["length"])
31
References

Tallaksen, L.M. (1995). A review of baseflow recession analysis. Journal of Hydrology, 165(1-4), 349-370.

Source code in src\statista\time_series\hydrological.py
def recession_analysis(
    self,
    min_length: int = 5,
    column: str = None,
    plot: bool = True,
    **kwargs: Any,
) -> tuple[DataFrame, tuple[Figure, Axes] | None]:
    """Extract recession segments and fit a master recession curve.

    Identifies periods of monotonically decreasing flow (recession limbs)
    and fits an exponential recession model: Q(t) = Q0 * exp(-t / k),
    where k is the recession constant.

    Args:
        min_length: Minimum number of consecutive decreasing steps to
            qualify as a recession segment. Default 5.
        column: Column to analyze. If None, uses first column.
        plot: Whether to produce a log(Q) vs t recession plot. Default True.
        **kwargs: Passed to ``_adjust_axes_labels``.

    Returns:
        tuple: (recession_df, (fig, ax)) or (recession_df, None).
            recession_df has columns: recession_id, start_index, end_index, length,
            recession_constant_k, r_squared.

    Examples:
        Fit a recession curve to exponential decay with small noise:

        >>> import numpy as np
        >>> from statista.time_series import TimeSeries
        >>> np.random.seed(42)
        >>> q = 100 * np.exp(-np.arange(50) / 15.0) + np.random.randn(50) * 0.5
        >>> ts = TimeSeries(np.abs(q))
        >>> result, _ = ts.recession_analysis(min_length=3, plot=False)
        >>> len(result)
        5
        >>> round(float(result.iloc[0]["recession_constant_k"]), 4)
        14.8596
        >>> round(float(result.iloc[0]["r_squared"]), 4)
        0.9996

        The first segment spans from the start of the decay:

        >>> int(result.iloc[0]["start_index"])
        0
        >>> int(result.iloc[0]["length"])
        31

    References:
        Tallaksen, L.M. (1995). A review of baseflow recession analysis.
        Journal of Hydrology, 165(1-4), 349-370.
    """
    if column is None:
        column = self.columns[0]

    data = self[column].dropna().values.astype(float)
    n = len(data)

    # Identify recession segments (monotonically decreasing runs)
    segments = []
    start = None
    for i in range(1, n):
        if data[i] < data[i - 1]:
            if start is None:
                start = i - 1
        else:
            if start is not None:
                length = i - start
                if length >= min_length:
                    segments.append((start, i - 1))
                start = None
    if start is not None and (n - start) >= min_length:
        segments.append((start, n - 1))

    # Fit exponential decay to each segment
    rows = []
    for seg_id, (s, e) in enumerate(segments):
        seg = data[s : e + 1]
        seg_positive = seg[seg > 0]
        if len(seg_positive) < 3:
            continue

        t = np.arange(len(seg_positive), dtype=float)
        log_q = np.log(seg_positive)

        # Linear fit: log(Q) = log(Q0) - t/k  =>  slope = -1/k
        coeffs = np.polyfit(t, log_q, 1)
        slope = coeffs[0]
        k = -1.0 / slope if slope != 0 else np.inf

        # R-squared
        predicted = np.polyval(coeffs, t)
        ss_res = np.sum((log_q - predicted) ** 2)
        ss_tot = np.sum((log_q - np.mean(log_q)) ** 2)
        r_squared = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0

        rows.append(
            {
                "recession_id": seg_id,
                "start_index": s,
                "end_index": e,
                "length": e - s + 1,
                "recession_constant_k": float(k),
                "r_squared": float(r_squared),
            }
        )

    recession_df = (
        DataFrame(rows)
        if rows
        else DataFrame(
            columns=[
                "recession_id",
                "start_index",
                "end_index",
                "length",
                "recession_constant_k",
                "r_squared",
            ]
        )
    )

    fig_ax: tuple[Figure, Axes] | None = None
    if plot and len(segments) > 0:
        fig, ax = self._get_ax_fig(**kwargs)
        kwargs.pop("fig", None)
        kwargs.pop("ax", None)

        for s, e in segments:
            seg = data[s : e + 1]
            seg_positive = seg[seg > 0]
            if len(seg_positive) >= 3:
                t = np.arange(len(seg_positive))
                ax.plot(
                    t, seg_positive, "o-", markersize=3, linewidth=0.8, alpha=0.6
                )

        ax.set_yscale("log")

        if "title" not in kwargs:
            kwargs["title"] = f"Recession Analysis — {column}"
        if "xlabel" not in kwargs:
            kwargs["xlabel"] = "Time steps from recession start"
        if "ylabel" not in kwargs:
            kwargs["ylabel"] = "Flow (log scale)"

        ax = self._adjust_axes_labels(ax, **kwargs)
        plt.show()
        fig_ax = (fig, ax)

    return recession_df, fig_ax