Skip to content

Decomposition & Smoothing#

statista.time_series.decomposition #

Decomposition and smoothing mixin for TimeSeries.

Decomposition #

Bases: _TimeSeriesStub

Time series decomposition and smoothing methods.

Source code in src/statista/time_series/decomposition.py
class Decomposition(_TimeSeriesStub):
    """Time series decomposition and smoothing methods."""

    def classical_decompose(
        self,
        period: int = None,
        model: str = "additive",
        column: str = None,
        plot: bool = True,
        **kwargs: Any,
    ) -> tuple[DataFrame, tuple[Figure, Axes] | None]:
        """Classical seasonal decomposition using moving averages.

        Decomposes the time series into trend, seasonal, and residual components.

        - Additive: Y(t) = Trend(t) + Seasonal(t) + Residual(t)
        - Multiplicative: Y(t) = Trend(t) * Seasonal(t) * Residual(t)

        Implemented from scratch (no statsmodels dependency).

        Args:
            period: Length of the seasonal cycle (e.g., 12 for monthly data with
                annual seasonality, 7 for daily data with weekly seasonality).
                **Required** — there is no auto-detection.
            model: "additive" or "multiplicative". Default "additive".
            column: Column to decompose. If None, uses first column.
            plot: Whether to produce a 4-panel decomposition plot. Default True.
            **kwargs: Passed to ``_adjust_axes_labels``.

        Returns:
            tuple: (decomposition_df, (fig, axes)) or (decomposition_df, None).
                decomposition_df has columns: observed, trend, seasonal, residual.

        Raises:
            ValueError: If period is None or data length < 2 * period.

        Examples:
            Decompose a synthetic monthly series with trend and seasonality:

            >>> import numpy as np
            >>> from statista.time_series import TimeSeries
            >>> np.random.seed(42)
            >>> t = np.arange(120)
            >>> seasonal = 5 * np.sin(2 * np.pi * t / 12)
            >>> trend = 0.1 * t
            >>> data = trend + seasonal + np.random.randn(120) * 0.5
            >>> ts = TimeSeries(data)
            >>> result, _ = ts.classical_decompose(period=12, plot=False)
            >>> list(result.columns)
            ['observed', 'trend', 'seasonal', 'residual']
            >>> result.shape
            (120, 4)
            >>> round(float(result["trend"].dropna().mean()), 4)
            5.8866

            Verify seasonal component captures the pattern:

            >>> round(float(result["seasonal"].std()), 4)
            3.495

            Decompose a shorter series with stronger trend:

            >>> np.random.seed(42)
            >>> t2 = np.arange(48)
            >>> data2 = 10 + 0.5 * t2 + 3 * np.sin(2 * np.pi * t2 / 12) + np.random.randn(48) * 0.3
            >>> ts2 = TimeSeries(data2)
            >>> result2, _ = ts2.classical_decompose(period=12, plot=False)
            >>> round(float(result2["trend"].iloc[24]), 4)
            21.3941

        References:
            Persons, W.M. (1919). Indices of business conditions.
            Review of Economics and Statistics, 1(1), 5-107.
        """
        if period is None:
            raise ValueError(
                "period must be specified (e.g., 12 for monthly, 7 for daily)."
            )

        if column is None:
            column = self.columns[0]

        data = self[column].dropna().values.astype(float)
        n = len(data)
        idx = self[column].dropna().index

        if n < 2 * period:
            raise ValueError(f"Data length ({n}) must be >= 2 * period ({2 * period}).")

        # Step 1: Trend via centered moving average
        trend = _centered_moving_average(data, period)

        # Step 2: Detrended series
        if model == "additive":
            detrended = data - trend
        elif model == "multiplicative":
            detrended = data / np.where(trend != 0, trend, np.nan)
        else:
            raise ValueError(
                f"model must be 'additive' or 'multiplicative', got '{model}'."
            )

        # Step 3: Seasonal component (average of detrended values at each position in the cycle)
        seasonal = np.zeros(n)
        for i in range(period):
            indices = np.arange(i, n, period)
            valid = detrended[indices]
            valid = valid[~np.isnan(valid)]
            season_mean = np.mean(valid) if len(valid) > 0 else 0.0
            seasonal[indices] = season_mean

        # Center the seasonal component (should sum to ~0 for additive)
        if model == "additive":
            seasonal -= np.mean(seasonal[:period])

        # Step 4: Residual
        if model == "additive":
            residual = data - trend - seasonal
        else:
            residual = data / (
                np.where(trend != 0, trend, np.nan)
                * np.where(seasonal != 0, seasonal, np.nan)
            )

        result_df = DataFrame(
            {
                "observed": data,
                "trend": trend,
                "seasonal": seasonal,
                "residual": residual,
            },
            index=idx,
        )

        fig_ax: tuple[Figure, Axes] | None = None
        if plot:
            fig_ax = _plot_decomposition(result_df, column, model, **kwargs)

        return result_df, fig_ax

    def smooth(
        self,
        method: str = "moving_average",
        window: int = 10,
        **params: Any,
    ) -> Any:
        """Apply smoothing to the time series.

        Args:
            method: Smoothing method.
                - "moving_average": Centered moving average via pandas rolling.
                - "exponential": Exponential weighted moving average via pandas ewm.
                - "savgol": Savitzky-Golay filter (preserves peaks better than MA).
                  Extra param: polyorder (default 2).
            window: Window size. For savgol, must be odd. Default 10.
            **params: Method-specific parameters (e.g., polyorder for savgol).

        Returns:
            TimeSeries: New TimeSeries with smoothed values. Same index as original.

        Examples:
            Moving average smoothing (NaN at edges where window is incomplete):

            >>> import numpy as np
            >>> from statista.time_series import TimeSeries
            >>> np.random.seed(42)
            >>> ts = TimeSeries(np.random.randn(100))
            >>> smoothed = ts.smooth(method="moving_average", window=10)
            >>> smoothed.shape
            (100, 1)
            >>> round(float(smoothed.dropna().iloc[0, 0]), 4)
            0.4481

            Exponential weighted moving average (no NaN values):

            >>> np.random.seed(42)
            >>> ts2 = TimeSeries(np.random.randn(100))
            >>> smoothed2 = ts2.smooth(method="exponential", window=10)
            >>> round(float(smoothed2.iloc[0, 0]), 4)
            0.4967
            >>> round(float(smoothed2.iloc[2, 0]), 4)
            0.3486

            Savitzky-Golay filter preserves peaks better (reduces std less aggressively):

            >>> np.random.seed(42)
            >>> ts3 = TimeSeries(np.random.randn(100))
            >>> smoothed3 = ts3.smooth(method="savgol", window=11, polyorder=2)
            >>> round(float(smoothed3.iloc[0, 0]), 4)
            0.2353
            >>> round(float(smoothed3.values.std() / ts3.values.std()), 4)
            0.4354
        """
        from statista.time_series import TimeSeries

        if method == "moving_average":
            smoothed_df = self.rolling(window, center=True).mean()
            result = TimeSeries(
                smoothed_df.values,
                index=self.index,
                columns=list(self.columns),
            )
        elif method == "exponential":
            smoothed_df = self.ewm(span=window).mean()
            result = TimeSeries(
                smoothed_df.values,
                index=self.index,
                columns=list(self.columns),
            )
        elif method == "savgol":
            polyorder = params.get("polyorder", 2)
            # savgol requires odd window
            win = window if window % 2 == 1 else window + 1
            result_data = np.empty_like(self.values, dtype=float)
            for i, col in enumerate(self.columns):
                data = self[col].values.astype(float)
                result_data[:, i] = savgol_filter(
                    data, window_length=win, polyorder=polyorder
                )
            result = TimeSeries(
                result_data,
                index=self.index,
                columns=list(self.columns),
            )
        else:
            raise ValueError(
                f"Unknown method '{method}'. Choose from 'moving_average', 'exponential', 'savgol'."
            )

        return result

    def envelope(
        self,
        window: int = 30,
        lower_pct: float = 5,
        upper_pct: float = 95,
        column: str = None,
        **kwargs: Any,
    ) -> tuple[Figure, Axes]:
        """Plot the time series with rolling percentile envelope bands.

        Shows the natural variability range of the data over a rolling window.

        Args:
            window: Rolling window size. Default 30.
            lower_pct: Lower percentile for the band (0-100). Default 5.
            upper_pct: Upper percentile for the band (0-100). Default 95.
            column: Column to plot. If None, uses first column.
            **kwargs: Passed to ``_adjust_axes_labels``.

        Returns:
            tuple: (Figure, Axes)

        Examples:
            >>> import numpy as np  # doctest: +SKIP
            >>> from statista.time_series import TimeSeries  # doctest: +SKIP
            >>> ts = TimeSeries(np.random.randn(200))  # doctest: +SKIP
            >>> fig, ax = ts.envelope(window=20)  # doctest: +SKIP
        """
        if column is None:
            column = self.columns[0]

        series = self[column]
        rolling_lower = series.rolling(window).quantile(lower_pct / 100.0)
        rolling_upper = series.rolling(window).quantile(upper_pct / 100.0)
        rolling_median = series.rolling(window).median()

        fig, ax = self._get_ax_fig(**kwargs)
        kwargs.pop("fig", None)
        kwargs.pop("ax", None)

        ax.plot(
            series.index,
            series.values,
            color="steelblue",
            alpha=0.4,
            linewidth=0.5,
            label="Data",
        )
        ax.plot(
            series.index,
            rolling_median.values,
            color="darkblue",
            linewidth=1.2,
            label="Rolling median",
        )
        ax.fill_between(
            series.index,
            rolling_lower.values,
            rolling_upper.values,
            color="lightblue",
            alpha=0.4,
            label=f"{lower_pct}%-{upper_pct}% envelope",
        )

        if "title" not in kwargs:
            kwargs["title"] = f"Envelope — {column}"
        if "xlabel" not in kwargs:
            kwargs["xlabel"] = "Index"
        if "ylabel" not in kwargs:
            kwargs["ylabel"] = "Value"

        ax = self._adjust_axes_labels(ax, **kwargs)
        plt.show()
        return fig, ax
classical_decompose(period=None, model='additive', column=None, plot=True, **kwargs) #

Classical seasonal decomposition using moving averages.

Decomposes the time series into trend, seasonal, and residual components.

  • Additive: Y(t) = Trend(t) + Seasonal(t) + Residual(t)
  • Multiplicative: Y(t) = Trend(t) * Seasonal(t) * Residual(t)

Implemented from scratch (no statsmodels dependency).

Parameters:

Name Type Description Default
period int

Length of the seasonal cycle (e.g., 12 for monthly data with annual seasonality, 7 for daily data with weekly seasonality). Required — there is no auto-detection.

None
model str

"additive" or "multiplicative". Default "additive".

'additive'
column str

Column to decompose. If None, uses first column.

None
plot bool

Whether to produce a 4-panel decomposition plot. Default True.

True
**kwargs Any

Passed to _adjust_axes_labels.

{}

Returns:

Name Type Description
tuple tuple[DataFrame, tuple[Figure, Axes] | None]

(decomposition_df, (fig, axes)) or (decomposition_df, None). decomposition_df has columns: observed, trend, seasonal, residual.

Raises:

Type Description
ValueError

If period is None or data length < 2 * period.

Examples:

Decompose a synthetic monthly series with trend and seasonality:

>>> import numpy as np
>>> from statista.time_series import TimeSeries
>>> np.random.seed(42)
>>> t = np.arange(120)
>>> seasonal = 5 * np.sin(2 * np.pi * t / 12)
>>> trend = 0.1 * t
>>> data = trend + seasonal + np.random.randn(120) * 0.5
>>> ts = TimeSeries(data)
>>> result, _ = ts.classical_decompose(period=12, plot=False)
>>> list(result.columns)
['observed', 'trend', 'seasonal', 'residual']
>>> result.shape
(120, 4)
>>> round(float(result["trend"].dropna().mean()), 4)
5.8866

Verify seasonal component captures the pattern:

>>> round(float(result["seasonal"].std()), 4)
3.495

Decompose a shorter series with stronger trend:

>>> np.random.seed(42)
>>> t2 = np.arange(48)
>>> data2 = 10 + 0.5 * t2 + 3 * np.sin(2 * np.pi * t2 / 12) + np.random.randn(48) * 0.3
>>> ts2 = TimeSeries(data2)
>>> result2, _ = ts2.classical_decompose(period=12, plot=False)
>>> round(float(result2["trend"].iloc[24]), 4)
21.3941
References

Persons, W.M. (1919). Indices of business conditions. Review of Economics and Statistics, 1(1), 5-107.

Source code in src/statista/time_series/decomposition.py
def classical_decompose(
    self,
    period: int = None,
    model: str = "additive",
    column: str = None,
    plot: bool = True,
    **kwargs: Any,
) -> tuple[DataFrame, tuple[Figure, Axes] | None]:
    """Classical seasonal decomposition using moving averages.

    Decomposes the time series into trend, seasonal, and residual components.

    - Additive: Y(t) = Trend(t) + Seasonal(t) + Residual(t)
    - Multiplicative: Y(t) = Trend(t) * Seasonal(t) * Residual(t)

    Implemented from scratch (no statsmodels dependency).

    Args:
        period: Length of the seasonal cycle (e.g., 12 for monthly data with
            annual seasonality, 7 for daily data with weekly seasonality).
            **Required** — there is no auto-detection.
        model: "additive" or "multiplicative". Default "additive".
        column: Column to decompose. If None, uses first column.
        plot: Whether to produce a 4-panel decomposition plot. Default True.
        **kwargs: Passed to ``_adjust_axes_labels``.

    Returns:
        tuple: (decomposition_df, (fig, axes)) or (decomposition_df, None).
            decomposition_df has columns: observed, trend, seasonal, residual.

    Raises:
        ValueError: If period is None or data length < 2 * period.

    Examples:
        Decompose a synthetic monthly series with trend and seasonality:

        >>> import numpy as np
        >>> from statista.time_series import TimeSeries
        >>> np.random.seed(42)
        >>> t = np.arange(120)
        >>> seasonal = 5 * np.sin(2 * np.pi * t / 12)
        >>> trend = 0.1 * t
        >>> data = trend + seasonal + np.random.randn(120) * 0.5
        >>> ts = TimeSeries(data)
        >>> result, _ = ts.classical_decompose(period=12, plot=False)
        >>> list(result.columns)
        ['observed', 'trend', 'seasonal', 'residual']
        >>> result.shape
        (120, 4)
        >>> round(float(result["trend"].dropna().mean()), 4)
        5.8866

        Verify seasonal component captures the pattern:

        >>> round(float(result["seasonal"].std()), 4)
        3.495

        Decompose a shorter series with stronger trend:

        >>> np.random.seed(42)
        >>> t2 = np.arange(48)
        >>> data2 = 10 + 0.5 * t2 + 3 * np.sin(2 * np.pi * t2 / 12) + np.random.randn(48) * 0.3
        >>> ts2 = TimeSeries(data2)
        >>> result2, _ = ts2.classical_decompose(period=12, plot=False)
        >>> round(float(result2["trend"].iloc[24]), 4)
        21.3941

    References:
        Persons, W.M. (1919). Indices of business conditions.
        Review of Economics and Statistics, 1(1), 5-107.
    """
    if period is None:
        raise ValueError(
            "period must be specified (e.g., 12 for monthly, 7 for daily)."
        )

    if column is None:
        column = self.columns[0]

    data = self[column].dropna().values.astype(float)
    n = len(data)
    idx = self[column].dropna().index

    if n < 2 * period:
        raise ValueError(f"Data length ({n}) must be >= 2 * period ({2 * period}).")

    # Step 1: Trend via centered moving average
    trend = _centered_moving_average(data, period)

    # Step 2: Detrended series
    if model == "additive":
        detrended = data - trend
    elif model == "multiplicative":
        detrended = data / np.where(trend != 0, trend, np.nan)
    else:
        raise ValueError(
            f"model must be 'additive' or 'multiplicative', got '{model}'."
        )

    # Step 3: Seasonal component (average of detrended values at each position in the cycle)
    seasonal = np.zeros(n)
    for i in range(period):
        indices = np.arange(i, n, period)
        valid = detrended[indices]
        valid = valid[~np.isnan(valid)]
        season_mean = np.mean(valid) if len(valid) > 0 else 0.0
        seasonal[indices] = season_mean

    # Center the seasonal component (should sum to ~0 for additive)
    if model == "additive":
        seasonal -= np.mean(seasonal[:period])

    # Step 4: Residual
    if model == "additive":
        residual = data - trend - seasonal
    else:
        residual = data / (
            np.where(trend != 0, trend, np.nan)
            * np.where(seasonal != 0, seasonal, np.nan)
        )

    result_df = DataFrame(
        {
            "observed": data,
            "trend": trend,
            "seasonal": seasonal,
            "residual": residual,
        },
        index=idx,
    )

    fig_ax: tuple[Figure, Axes] | None = None
    if plot:
        fig_ax = _plot_decomposition(result_df, column, model, **kwargs)

    return result_df, fig_ax
smooth(method='moving_average', window=10, **params) #

Apply smoothing to the time series.

Parameters:

Name Type Description Default
method str

Smoothing method. - "moving_average": Centered moving average via pandas rolling. - "exponential": Exponential weighted moving average via pandas ewm. - "savgol": Savitzky-Golay filter (preserves peaks better than MA). Extra param: polyorder (default 2).

'moving_average'
window int

Window size. For savgol, must be odd. Default 10.

10
**params Any

Method-specific parameters (e.g., polyorder for savgol).

{}

Returns:

Name Type Description
TimeSeries Any

New TimeSeries with smoothed values. Same index as original.

Examples:

Moving average smoothing (NaN at edges where window is incomplete):

>>> import numpy as np
>>> from statista.time_series import TimeSeries
>>> np.random.seed(42)
>>> ts = TimeSeries(np.random.randn(100))
>>> smoothed = ts.smooth(method="moving_average", window=10)
>>> smoothed.shape
(100, 1)
>>> round(float(smoothed.dropna().iloc[0, 0]), 4)
0.4481

Exponential weighted moving average (no NaN values):

>>> np.random.seed(42)
>>> ts2 = TimeSeries(np.random.randn(100))
>>> smoothed2 = ts2.smooth(method="exponential", window=10)
>>> round(float(smoothed2.iloc[0, 0]), 4)
0.4967
>>> round(float(smoothed2.iloc[2, 0]), 4)
0.3486

Savitzky-Golay filter preserves peaks better (reduces std less aggressively):

>>> np.random.seed(42)
>>> ts3 = TimeSeries(np.random.randn(100))
>>> smoothed3 = ts3.smooth(method="savgol", window=11, polyorder=2)
>>> round(float(smoothed3.iloc[0, 0]), 4)
0.2353
>>> round(float(smoothed3.values.std() / ts3.values.std()), 4)
0.4354
Source code in src/statista/time_series/decomposition.py
def smooth(
    self,
    method: str = "moving_average",
    window: int = 10,
    **params: Any,
) -> Any:
    """Apply smoothing to the time series.

    Args:
        method: Smoothing method.
            - "moving_average": Centered moving average via pandas rolling.
            - "exponential": Exponential weighted moving average via pandas ewm.
            - "savgol": Savitzky-Golay filter (preserves peaks better than MA).
              Extra param: polyorder (default 2).
        window: Window size. For savgol, must be odd. Default 10.
        **params: Method-specific parameters (e.g., polyorder for savgol).

    Returns:
        TimeSeries: New TimeSeries with smoothed values. Same index as original.

    Examples:
        Moving average smoothing (NaN at edges where window is incomplete):

        >>> import numpy as np
        >>> from statista.time_series import TimeSeries
        >>> np.random.seed(42)
        >>> ts = TimeSeries(np.random.randn(100))
        >>> smoothed = ts.smooth(method="moving_average", window=10)
        >>> smoothed.shape
        (100, 1)
        >>> round(float(smoothed.dropna().iloc[0, 0]), 4)
        0.4481

        Exponential weighted moving average (no NaN values):

        >>> np.random.seed(42)
        >>> ts2 = TimeSeries(np.random.randn(100))
        >>> smoothed2 = ts2.smooth(method="exponential", window=10)
        >>> round(float(smoothed2.iloc[0, 0]), 4)
        0.4967
        >>> round(float(smoothed2.iloc[2, 0]), 4)
        0.3486

        Savitzky-Golay filter preserves peaks better (reduces std less aggressively):

        >>> np.random.seed(42)
        >>> ts3 = TimeSeries(np.random.randn(100))
        >>> smoothed3 = ts3.smooth(method="savgol", window=11, polyorder=2)
        >>> round(float(smoothed3.iloc[0, 0]), 4)
        0.2353
        >>> round(float(smoothed3.values.std() / ts3.values.std()), 4)
        0.4354
    """
    from statista.time_series import TimeSeries

    if method == "moving_average":
        smoothed_df = self.rolling(window, center=True).mean()
        result = TimeSeries(
            smoothed_df.values,
            index=self.index,
            columns=list(self.columns),
        )
    elif method == "exponential":
        smoothed_df = self.ewm(span=window).mean()
        result = TimeSeries(
            smoothed_df.values,
            index=self.index,
            columns=list(self.columns),
        )
    elif method == "savgol":
        polyorder = params.get("polyorder", 2)
        # savgol requires odd window
        win = window if window % 2 == 1 else window + 1
        result_data = np.empty_like(self.values, dtype=float)
        for i, col in enumerate(self.columns):
            data = self[col].values.astype(float)
            result_data[:, i] = savgol_filter(
                data, window_length=win, polyorder=polyorder
            )
        result = TimeSeries(
            result_data,
            index=self.index,
            columns=list(self.columns),
        )
    else:
        raise ValueError(
            f"Unknown method '{method}'. Choose from 'moving_average', 'exponential', 'savgol'."
        )

    return result
envelope(window=30, lower_pct=5, upper_pct=95, column=None, **kwargs) #

Plot the time series with rolling percentile envelope bands.

Shows the natural variability range of the data over a rolling window.

Parameters:

Name Type Description Default
window int

Rolling window size. Default 30.

30
lower_pct float

Lower percentile for the band (0-100). Default 5.

5
upper_pct float

Upper percentile for the band (0-100). Default 95.

95
column str

Column to plot. If None, uses first column.

None
**kwargs Any

Passed to _adjust_axes_labels.

{}

Returns:

Name Type Description
tuple tuple[Figure, Axes]

(Figure, Axes)

Examples:

>>> import numpy as np
>>> from statista.time_series import TimeSeries
>>> ts = TimeSeries(np.random.randn(200))
>>> fig, ax = ts.envelope(window=20)
Source code in src/statista/time_series/decomposition.py
def envelope(
    self,
    window: int = 30,
    lower_pct: float = 5,
    upper_pct: float = 95,
    column: str = None,
    **kwargs: Any,
) -> tuple[Figure, Axes]:
    """Plot the time series with rolling percentile envelope bands.

    Shows the natural variability range of the data over a rolling window.

    Args:
        window: Rolling window size. Default 30.
        lower_pct: Lower percentile for the band (0-100). Default 5.
        upper_pct: Upper percentile for the band (0-100). Default 95.
        column: Column to plot. If None, uses first column.
        **kwargs: Passed to ``_adjust_axes_labels``.

    Returns:
        tuple: (Figure, Axes)

    Examples:
        >>> import numpy as np  # doctest: +SKIP
        >>> from statista.time_series import TimeSeries  # doctest: +SKIP
        >>> ts = TimeSeries(np.random.randn(200))  # doctest: +SKIP
        >>> fig, ax = ts.envelope(window=20)  # doctest: +SKIP
    """
    if column is None:
        column = self.columns[0]

    series = self[column]
    rolling_lower = series.rolling(window).quantile(lower_pct / 100.0)
    rolling_upper = series.rolling(window).quantile(upper_pct / 100.0)
    rolling_median = series.rolling(window).median()

    fig, ax = self._get_ax_fig(**kwargs)
    kwargs.pop("fig", None)
    kwargs.pop("ax", None)

    ax.plot(
        series.index,
        series.values,
        color="steelblue",
        alpha=0.4,
        linewidth=0.5,
        label="Data",
    )
    ax.plot(
        series.index,
        rolling_median.values,
        color="darkblue",
        linewidth=1.2,
        label="Rolling median",
    )
    ax.fill_between(
        series.index,
        rolling_lower.values,
        rolling_upper.values,
        color="lightblue",
        alpha=0.4,
        label=f"{lower_pct}%-{upper_pct}% envelope",
    )

    if "title" not in kwargs:
        kwargs["title"] = f"Envelope — {column}"
    if "xlabel" not in kwargs:
        kwargs["xlabel"] = "Index"
    if "ylabel" not in kwargs:
        kwargs["ylabel"] = "Value"

    ax = self._adjust_axes_labels(ax, **kwargs)
    plt.show()
    return fig, ax