ECMWF

ECMWF Climate Data Store data source via cdsapi.

earthlens.ecmwf.ECMWF

Bases: AbstractDataSource

ECMWF / Copernicus Climate Data Store backend.

Downloads ERA5 reanalysis (and ERA5-Land where the catalog indicates) via cdsapi.Client. The user-friendly variable short codes (e.g. "2m-temperature", "total-precipitation") are resolved through Catalog, which loads the per-variable metadata from cds_data_catalog.yaml.
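To make the short-code resolution concrete, here is a minimal sketch of a catalog lookup. It is not the package's `Catalog` class: `VariableRow`, `CATALOG`, and this standalone `get_variable` are hypothetical stand-ins, and the `total_precipitation` mapping is an assumption. Only the `2m_temperature` name and the output filename pattern are taken from the docstrings on this page.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class VariableRow:
    """Hypothetical stand-in for one catalog row."""

    cds_dataset: str
    cds_variable: str


# Hypothetical in-memory catalog; the package loads its rows from
# cds_data_catalog.yaml instead.
CATALOG = {
    "reanalysis-era5-single-levels": {
        "2m-temperature": "2m_temperature",
        "total-precipitation": "total_precipitation",  # assumed mapping
    }
}


def get_variable(dataset: str, code: str) -> VariableRow:
    """Resolve a (dataset, short code) pair; KeyError if either is unknown."""
    return VariableRow(cds_dataset=dataset, cds_variable=CATALOG[dataset][code])


row = get_variable("reanalysis-era5-single-levels", "2m-temperature")
filename = f"{row.cds_variable}_{row.cds_dataset}.nc"
print(filename)  # prints "2m_temperature_reanalysis-era5-single-levels.nc"
```

An unknown dataset or short code raises `KeyError` in this sketch, which mirrors the failure mode the `download` docstring describes for catalog lookups.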

The download pipeline (per variable) is a single step:

  • _api: builds the cdsapi request dict (branching on temporal_resolution for daily / monthly) and submits it via client.retrieve(dataset, request, target). Returns the absolute path to the NetCDF that CDS wrote.
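The daily / monthly branch can be sketched as a standalone function. This is an illustration modelled on the `_build_request` source listed on this page, not the package's code: the function name `build_request`, its parameters, and the `"reanalysis"` default for `product_type` are assumptions.

```python
from datetime import date, timedelta
from typing import Any


def build_request(
    cds_variable: str,
    dates: list[date],
    area: list[float],
    temporal_resolution: str = "daily",
    product_type: str = "reanalysis",  # assumed default, for illustration only
) -> dict[str, Any]:
    """Assemble a CDS-style request dict for one variable."""
    request: dict[str, Any] = {
        "variable": [cds_variable],
        "year": sorted({str(d.year) for d in dates}),
        "month": sorted({f"{d.month:02d}" for d in dates}),
        "data_format": "netcdf",
        "area": area,  # ordered [north, west, south, east]
        "product_type": product_type,
    }
    if temporal_resolution == "monthly":
        # Monthly requests pin a single time slot and carry no "day" key.
        request["time"] = ["00:00"]
    else:
        # Daily requests list the days plus four six-hourly time slots.
        request["day"] = sorted({f"{d.day:02d}" for d in dates})
        request["time"] = ["00:00", "06:00", "12:00", "18:00"]
    return request


bbox = [5.0, -75.0, 4.0, -74.0]
days = [date(2022, 1, 1) + timedelta(days=i) for i in range(3)]
daily = build_request("2m_temperature", days, bbox)
monthly = build_request("2m_temperature", [date(2022, 1, 1)], bbox, "monthly")
print(daily["day"])      # prints "['01', '02', '03']"
print("day" in monthly)  # prints "False"
```

The resulting dict is what would be passed as the second argument to `client.retrieve(dataset, request, target)`.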

Per-date GeoTIFF post-processing (time-window mean, flux scaling, raster output) is intentionally not part of the package. See examples/post_process_ecmwf_netcdf.py for a runnable script that consumes the NetCDF that _api writes.
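As a rough illustration of the time-window reduction, the sketch below follows the `op="auto"` rule described in the `download` docstring (sum for flux variables, mean for state variables) and reuses its worked evaporation example. The helper names `resolve_op` and `reduce_window` are hypothetical and are not part of the package or the example script.

```python
import math
from collections.abc import Sequence


def resolve_op(op: str, is_flux: bool) -> str:
    """Map op="auto" to "sum" for flux variables and "mean" for state ones."""
    if op != "auto":
        return op  # an explicit op bypasses auto-routing
    return "sum" if is_flux else "mean"


def reduce_window(samples: Sequence[float], op: str) -> float:
    """Reduce one window of samples with an already-resolved op."""
    if op == "sum":
        return math.fsum(samples)
    if op == "mean":
        return math.fsum(samples) / len(samples)
    raise ValueError(f"unsupported op: {op!r}")


# Four 6-hourly evaporation accumulations for one pixel, in metres.
slots = [0.001, 0.002, 0.005, 0.004]
daily_total = reduce_window(slots, resolve_op("auto", is_flux=True))
slot_mean = reduce_window(slots, "mean")
print(f"{daily_total:.3f} {slot_mean:.3f}")  # prints "0.012 0.003"
```

Summing gives the day's total, while the mean gives the average 6-hour accumulation, which is why flux variables need the sum.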

The valid temporal_resolution values are "daily" and "monthly". _check_input_dates raises ValueError for anything else; that is the authoritative gate. Spatial cell size lives on SpatialExtent.resolution (populated by _create_grid) and is sourced from ERA5_GRID_DEGREES.
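Both rules in the paragraph above are small enough to sketch with the standard library. The helper names `frequency_alias` and `snap_limits` are hypothetical. The 0.125 cell size, the `"D"` / `"MS"` aliases, and the error message are taken from the source listed on this page.

```python
import math

ERA5_GRID_DEGREES = 0.125  # cell size quoted in the _create_grid docstring

_FREQ_ALIASES = {"daily": "D", "monthly": "MS"}


def frequency_alias(temporal_resolution: str) -> str:
    """Return the pandas frequency alias, or raise ValueError if unsupported."""
    try:
        return _FREQ_ALIASES[temporal_resolution]
    except KeyError:
        raise ValueError(
            "temporal_resolution should be either 'daily' or 'monthly'"
        ) from None


def snap_limits(limits, cell=ERA5_GRID_DEGREES):
    """Floor the lower limit and ceil the upper limit to multiples of cell."""
    low, high = limits
    return (math.floor(low / cell) * cell, math.ceil(high / cell) * cell)


print(frequency_alias("monthly"))  # prints "MS"
print(snap_limits([4.19, 4.64]))   # prints "(4.125, 4.75)"
print(snap_limits([0.05, 0.95]))   # prints "(0.0, 1.0)"
```

Because the lower bound is floored and the upper bound is ceiled, the snapped box can only grow, so it always covers the requested area.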

Source code in src/earthlens/ecmwf/backend.py
class ECMWF(AbstractDataSource):
    """ECMWF / Copernicus Climate Data Store backend.

    Downloads ERA5 reanalysis (and ERA5-Land where the catalog
    indicates) via :class:`cdsapi.Client`. The user-friendly variable
    short codes (e.g. `"2m-temperature"`, `"total-precipitation"`) are resolved through
    :class:`Catalog`, which loads the per-variable metadata from
    `cds_data_catalog.yaml`.

    The download pipeline (per variable) is a single step:

    * :meth:`_api` — build the cdsapi request dict (daily / monthly
      branch on `temporal_resolution`) and submit it via
      `client.retrieve(dataset, request, target)`. Returns the
      absolute path to the NetCDF that CDS wrote.

    Per-date GeoTIFF post-processing (time-window mean, flux
    scaling, raster output) is intentionally not part of the
    package — see `examples/post_process_ecmwf_netcdf.py` for a
    runnable script that consumes the NetCDF this method writes.

    The valid `temporal_resolution` values are `"daily"` and
    `"monthly"`. `_check_input_dates` raises `ValueError` for
    anything else; that is the authoritative gate. Spatial cell
    size lives on :attr:`SpatialExtent.resolution` (populated by
    :meth:`_create_grid`) and is sourced from
    :data:`ERA5_GRID_DEGREES`.
    """

    def __init__(
        self,
        start: str,
        end: str,
        variables: dict[str, list[str]],
        lat_lim: list[float],
        lon_lim: list[float],
        temporal_resolution: str = "daily",
        path: Path | str = "",
        fmt: str = "%Y-%m-%d",
        skip_constraints: bool = False,
    ):
        """Initialize an ECMWF backend instance.

        Forwards every argument except `skip_constraints` to
        :class:`AbstractDataSource`, which captures the cdsapi client
        into `self.client` and the spatial / temporal extents into
        `self.space` / `self.time`.

        Args:
            start: Inclusive start date as a string (parsed with
                `fmt`). Required.
            end: Inclusive end date as a string. Required.
            variables: Mapping from CDS dataset short name to a list
                of variable codes drawn from that dataset, e.g.
                `{"reanalysis-era5-single-levels": ["2m-temperature",
                "total-precipitation"]}`. The dataset name must be a
                key of :attr:`Catalog.datasets`; each variable name
                must appear under that dataset's `variables:` block.
                See `cds_data_catalog.yaml` for the registered keys.
                Required.
            lat_lim: `[lat_min, lat_max]`. Required.
            lon_lim: `[lon_min, lon_max]`. Required.
            temporal_resolution: Either `"daily"` or `"monthly"`.
                Defaults to `"daily"`.
            path: Output directory. Created by the parent if it does
                not exist. Defaults to `""` (the current working
                directory).
            fmt: `strptime` format for `start` / `end`.
                Defaults to `"%Y-%m-%d"`.
            skip_constraints: When `True`, every CDS pre-flight
                validation phase (date / area sanity, variable typo
                check, required-fields check, combinatorial cover
                check) is bypassed and the request is sent to CDS
                unchecked. Useful when CDS's published
                `constraints.json` is stale or wrong for the
                dataset, or when running offline. Defaults to `False`.
        """
        self.skip_constraints = skip_constraints
        super().__init__(
            start=start,
            end=end,
            variables=variables,
            temporal_resolution=temporal_resolution,
            lat_lim=lat_lim,
            lon_lim=lon_lim,
            fmt=fmt,
            path=path,
        )

    def _check_input_dates(
        self, start: str, end: str, temporal_resolution: str, fmt: str
    ):
        """Parse the date range and produce the iteration index.

        The returned :class:`TemporalExtent` is captured by
        :meth:`AbstractDataSource.__init__` into `self.time` so
        :meth:`_api` can access the parsed bounds and the per-date
        pandas range without re-parsing.

        Args:
            start: Inclusive start date as a string.
            end: Inclusive end date as a string.
            temporal_resolution: `"daily"` (uses `freq="D"`) or
                `"monthly"` (uses `freq="MS"`).
            fmt: `strptime` format applied to `start` and `end`.

        Returns:
            TemporalExtent: Frozen pydantic model with `start_date`,
            `end_date`, `resolution` (pandas frequency alias —
            `"D"` for daily, `"MS"` for month-start), and
            `dates` (the :class:`pandas.DatetimeIndex` the
            download loop iterates).

        Raises:
            ValueError: If `temporal_resolution` is neither
                `"daily"` nor `"monthly"`, or if the parsed
                `start` is later than the parsed `end`.
        """
        start = dt.datetime.strptime(start, fmt)
        end = dt.datetime.strptime(end, fmt)

        if temporal_resolution == "daily":
            dates = pd.date_range(start, end, freq="D")
            resolution = "D"
        elif temporal_resolution == "monthly":
            dates = pd.date_range(start, end, freq="MS")
            resolution = "MS"
        else:
            raise ValueError(
                "temporal_resolution should be either 'daily' or 'monthly'"
            )

        return TemporalExtent(
            start_date=start,
            end_date=end,
            resolution=resolution,
            dates=dates,
        )

    def _initialize(self):
        """Construct the :class:`cdsapi.Client` for talking to CDS.

        Credentials come from the `CDSAPI_URL` / `CDSAPI_KEY`
        environment variables when both are set; otherwise cdsapi
        reads them from `~/.cdsapirc`. If neither is configured, the
        underlying cdsapi exception is wrapped in
        :class:`AuthenticationError` with a message that tells the user
        exactly where to put their Personal Access Token.

        Returns:
            cdsapi.Client: Authenticated CDS client. Calls to
            `client.retrieve(...)` use this connection.

        Raises:
            AuthenticationError: If cdsapi cannot construct a Client —
                typically because `~/.cdsapirc` is missing,
                malformed, or contains an old-API-style `email` line.

        Examples:
            - Construct a client when credentials are properly
              configured. Marked `# doctest: +SKIP` because it
              requires a real `~/.cdsapirc`:

                ```python
                >>> ecmwf = ECMWF(  # doctest: +SKIP
                ...     start="2022-01-01",
                ...     end="2022-01-01",
                ...     variables={
                ...         "reanalysis-era5-single-levels": ["2m-temperature"],
                ...     },
                ...     lat_lim=[4.0, 5.0],
                ...     lon_lim=[-75.0, -74.0],
                ...     path="examples/data/era5",
                ... )

                ```
        """
        try:
            client = cdsapi.Client()
        except Exception as exc:  # noqa: BLE001 - cdsapi raises a variety of types; classify here and re-raise as AuthenticationError
            if _looks_like_missing_credentials(exc):
                raise AuthenticationError(
                    "cdsapi could not authenticate against the Climate "
                    "Data Store. Create ~/.cdsapirc (Windows: "
                    "C:\\Users\\<USER>\\.cdsapirc) with:\n"
                    "    url: https://cds.climate.copernicus.eu/api\n"
                    "    key: <YOUR-PERSONAL-ACCESS-TOKEN>\n"
                    "Generate a Personal Access Token at "
                    "https://cds.climate.copernicus.eu/profile and "
                    "accept the licence for each dataset you intend to "
                    "download. See https://cds.climate.copernicus.eu/how-to-api for "
                    "the full setup guide."
                ) from exc
            raise

        return client

    def _create_grid(self, lat_lim: list, lon_lim: list):
        """Snap a lat/lon bounding box to ERA5 grid edges.

        Floors the south/west limits and ceils the north/east limits to
        the nearest multiple of :data:`ERA5_GRID_DEGREES` (0.125°), so
        every CDS retrieve aligns with the ERA5 native grid and no
        cell straddles the requested area boundary.

        Args:
            lat_lim: `[lat_min, lat_max]` in degrees north.
            lon_lim: `[lon_min, lon_max]` in degrees east.

        Returns:
            SpatialExtent: Grid-aligned bounding box with
            `resolution` set to :data:`ERA5_GRID_DEGREES`.

        Examples:
            - Snap a sub-degree box to the ERA5 grid:

                ```python
                >>> ecmwf = ECMWF.__new__(ECMWF)
                >>> extent = ecmwf._create_grid([4.19, 4.64], [-75.65, -74.73])
                >>> round(extent.resolution, 3)
                0.125
                >>> round(extent.latitude_min, 3), round(extent.latitude_max, 3)
                (4.125, 4.75)

                ```
            - The bbox always grows out to grid edges:

                ```python
                >>> ecmwf = ECMWF.__new__(ECMWF)
                >>> extent = ecmwf._create_grid([0.05, 0.95], [0.05, 0.95])
                >>> round(extent.latitude_min, 3), round(extent.latitude_max, 3)
                (0.0, 1.0)
                >>> round(extent.longitude_min, 3), round(extent.longitude_max, 3)
                (0.0, 1.0)

                ```
        """
        cell_size = ERA5_GRID_DEGREES
        lat_lim_floor = np.floor(lat_lim[0] / cell_size) * cell_size
        lat_lim_ceil = np.ceil(lat_lim[1] / cell_size) * cell_size
        lat_lim = [lat_lim_floor, lat_lim_ceil]

        lon_lim_floor = np.floor(lon_lim[0] / cell_size) * cell_size
        lon_lim_ceil = np.ceil(lon_lim[1] / cell_size) * cell_size
        lon_lim = [lon_lim_floor, lon_lim_ceil]
        return SpatialExtent.from_pairs(
            lat_lim=lat_lim, lon_lim=lon_lim, resolution=cell_size
        )

    def download(
        self,
        progress_bar: bool = True,
        aggregate: AggregationConfig | None = None,
    ):
        """Download every `(dataset, variable)` pair in `self.vars` from CDS.

        Iterates the user-supplied `variables` mapping (CDS dataset
        short name → list of variable codes) and, for each pair,
        looks the variable up in the CDS :class:`Catalog` and
        delegates to :meth:`_download_dataset`.

        Args:
            progress_bar: Reserved; currently unused since the
                slicing pipeline that previously consumed it has
                been moved out of the package. Defaults to `True`
                so existing callers keep working.
            aggregate: Optional :class:`earthlens.aggregate.AggregationConfig`.
                When provided, every retrieved NetCDF is fed through
                :func:`earthlens.aggregate.aggregate_netcdf` immediately
                after `_api()` returns. When the config's `out_dir`
                is `None`, it is defaulted to
                `<self.root_dir>/aggregated/`. Aggregation failures
                surface in the per-variable failure summary alongside
                retrieve failures, so a single bad variable does not
                abort the rest of the loop.

                **`op="auto"` semantics.** When the config's `op` is
                left at its default `"auto"`, the reducer is picked
                per-variable from the catalog row's `types` field
                (`Variable.is_flux`):

                * **State** (`types` unset or `"state"` — e.g.
                  `2m-temperature`, `surface-pressure`,
                  `relative-humidity`). Each NetCDF sample is the
                  instantaneous value at that timestamp. `auto` →
                  `"mean"`. The window mean is the natural daily /
                  monthly summary.
                * **Flux** (`types: flux` — e.g.
                  `total-precipitation`, `evaporation`,
                  `surface-runoff`, radiation accumulations). Each
                  NetCDF sample is the accumulation since the
                  previous post-processing step (a 6-hour
                  accumulation in legacy daily ERA5, 1-hour in
                  CDS-Beta). `auto` → `"sum"`. The per-slot
                  accumulations are summed inside the window to
                  recover the actual window total.

                Worked example — daily `evaporation` for one pixel
                with the four 6-hourly slots
                `[0.001, 0.002, 0.005, 0.004]` m of water
                equivalent. `op="auto"` resolves to `"sum"` and
                writes `0.012 m` (the day's total evaporation) to
                the GeoTIFF. A plain `op="mean"` would write
                `0.003 m` (the average 6-hour accumulation, **not**
                a daily total).

                Pass an explicit `op="mean"` / `"sum"` / `"min"` /
                `"max"` / `"std"` to bypass auto-routing — for
                example, on pre-aggregated CDS datasets like
                `derived-era5-single-levels-daily-statistics` where
                each NetCDF sample is already a daily aggregate and
                summing four of them would multiply by 4. See
                `docs/reference/aggregation.md` for the full
                walkthrough.

        Returns:
            None. Per-variable NetCDFs land at
            `<self.root_dir>/<cds_variable>_<cds_dataset>.nc`. When
            `aggregate` is set, per-window GeoTIFFs land at
            `<aggregate.out_dir or self.root_dir/aggregated>/<cds_variable>_<freq>_<window>.tif`.

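        Note:
            Per-variable errors do not propagate out of this method.
            A `KeyError` from the catalog lookup (a dataset key in
            `self.vars` that is not a curated CDS dataset, or a
            variable not declared under that dataset) and any error
            :meth:`_api` propagates from
            :meth:`cdsapi.Client.retrieve` are caught, logged, and
            listed in the end-of-run failure summary.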

        Examples:
            - End-to-end download via the user-facing
              :class:`EarthLens` facade. Marked
              `# doctest: +SKIP` because it requires a configured
              `~/.cdsapirc` and several minutes of CDS queue time:

                ```python
                >>> from earthlens.earthlens import EarthLens
                >>> earthlens = EarthLens(  # doctest: +SKIP
                ...     data_source="ecmwf",
                ...     temporal_resolution="daily",
                ...     start="2022-01-01",
                ...     end="2022-01-01",
                ...     variables={
                ...         "reanalysis-era5-single-levels": [
                ...             "2m-temperature", "total-precipitation"
                ...         ],
                ...     },
                ...     lat_lim=[4.0, 5.0],
                ...     lon_lim=[-75.0, -74.0],
                ...     path="examples/data/era5",
                ... )
                >>> earthlens.download()  # doctest: +SKIP

                ```

        See Also:
            :meth:`_download_dataset`: Per-variable download; a thin
                wrapper around :meth:`_api`.
            :meth:`_api`: Builds and submits the cdsapi request.
            :class:`Catalog`: Resolves `(dataset, code)` pairs to
                per-variable metadata.
        """
        catalog = Catalog()
        succeeded: list[tuple[str, str]] = []
        failed: list[tuple[tuple[str, str], BaseException]] = []

        effective_aggregate: AggregationConfig | None = None
        if aggregate is not None:
            if aggregate.out_dir is None:
                effective_aggregate = aggregate.model_copy(
                    update={"out_dir": self.root_dir / "aggregated"}
                )
            else:
                effective_aggregate = aggregate

        for dataset_name, var_codes in self.vars.items():
            for var in var_codes:
                start = self.time.start_date
                end = self.time.end_date
                logger.info(
                    f"Download ECMWF {dataset_name}/{var} data for "
                    f"period {start} till {end}"
                )
                try:
                    var_info = catalog.get_variable(dataset_name, var)
                    nc_path = self._download_dataset(
                        var_info, progress_bar=progress_bar
                    )
                except Exception as exc:  # noqa: BLE001 - log + continue so one bad variable doesn't kill the batch
                    logger.error(
                        f"ECMWF download for {dataset_name}/{var} failed: "
                        f"{type(exc).__name__}: {exc}"
                    )
                    failed.append(((dataset_name, var), exc))
                    continue

                if effective_aggregate is not None:
                    try:
                        aggregate_netcdf(nc_path, var_info, effective_aggregate)
                    except Exception as exc:  # noqa: BLE001 - log + continue so one bad aggregate doesn't kill the batch
                        logger.error(
                            f"ECMWF aggregate for {dataset_name}/{var} failed: "
                            f"{type(exc).__name__}: {exc}"
                        )
                        failed.append(((dataset_name, var), exc))
                        continue

                succeeded.append((dataset_name, var))

        if failed:
            failed_summary = ", ".join(
                f"{ds}/{var} ({type(exc).__name__})" for (ds, var), exc in failed
            )
            logger.warning(
                f"ECMWF download summary: {len(succeeded)} succeeded "
                f"({succeeded}), {len(failed)} failed ({failed_summary})"
            )
        else:
            logger.info(
                f"ECMWF download summary: all {len(succeeded)} "
                f"variables succeeded ({succeeded})"
            )

    def _download_dataset(
        self,
        var_info: Variable,
        progress_bar: bool = True,
    ):
        """Download a single variable from CDS.

        Thin wrapper around :meth:`_api` — builds the cdsapi request,
        submits it, and returns the absolute :class:`pathlib.Path`
        to the NetCDF that CDS wrote.

        Per-date GeoTIFF slicing is **not** done here. Users who
        want per-date `.tif` outputs can run
        `examples/post_process_ecmwf_netcdf.py` against the
        returned NetCDF.

        Args:
            var_info: Catalog row for the variable. See :meth:`_api`
                for the attributes consumed.
            progress_bar: Reserved; currently unused since the
                slicing pipeline that previously consumed it has
                been moved out of the package. Defaults to `True`
                so existing callers keep working.

        Returns:
            pathlib.Path: Absolute path to the downloaded NetCDF.

        See Also:
            :meth:`_api`: Builds and submits the CDS request, returns
                the path to the NetCDF.
            :class:`Catalog`: Loads `Variable` instances from
                `cds_data_catalog.yaml`.
        """
        return self._api(var_info)

    def _api(self, var_info: Variable):
        """Submit a CDS retrieve request for one variable and return the path.

        Five-stage pipeline:

        1. Derive the dataset name from `var_info.cds_dataset`.
        2. Delegate request-dict assembly to :meth:`_build_request`.
        3. Pre-flight the request via
           :class:`earthlens.ecmwf.constraints.RequestValidator`
           (skipped when the constructor was given
           `skip_constraints=True`).
        4. Submit via :meth:`cdsapi.Client.retrieve`. The call blocks
           until CDS has served the request and written the NetCDF
           — typically minutes due to CDS queue times.
        5. On failure, rewrite licence-not-accepted exceptions into a
           :class:`PermissionError` carrying the dataset's licence
           page URL. All other exceptions propagate untouched.

        Output filename:
        `<self.root_dir>/<cds_variable>_<cds_dataset>.nc`.

        Args:
            var_info: Catalog row resolved by :class:`Catalog`.
                See :meth:`_build_request` for the full list of
                fields consumed during request assembly. `_api`
                itself reads `cds_dataset` (the retrieve target)
                and `cds_variable` (the output filename stem).

        Returns:
            pathlib.Path: Absolute path to the downloaded NetCDF
            file.

        Raises:
            PermissionError: When CDS rejects the request because
                the dataset's licence has not been accepted on the
                user's CDS account. Message links to the dataset's
                licence page.
            ValueError: Propagated from
                :class:`earthlens.ecmwf.constraints.RequestValidator`
                when the assembled request fails the pre-flight
                check (variable typo, unknown extras, malformed
                date / area, ...). Skipped entirely when
                `skip_constraints=True`.
            Exception: Other transport-level errors from
                :meth:`cdsapi.Client.retrieve` (authentication
                failures, transient CDS 5xx, network drops)
                propagate untouched.

        Examples:
            - Inspect the variable + filename pattern this method
              produces (no network access — pure catalog read):

                ```python
                >>> from earthlens.ecmwf import Catalog
                >>> spec = Catalog().get_variable(
                ...     "reanalysis-era5-single-levels", "2m-temperature"
                ... )
                >>> spec.cds_dataset
                'reanalysis-era5-single-levels'
                >>> f"{spec.cds_variable}_{spec.cds_dataset}.nc"
                '2m_temperature_reanalysis-era5-single-levels.nc'

                ```
            - Submit the request through the user-facing
              :class:`EarthLens` facade. Marked
              `# doctest: +SKIP` because it requires a configured
              `~/.cdsapirc` and several minutes of CDS queue time:

                ```python
                >>> from earthlens.earthlens import EarthLens  # doctest: +SKIP
                >>> earthlens = EarthLens(  # doctest: +SKIP
                ...     data_source="ecmwf",
                ...     temporal_resolution="daily",
                ...     start="2022-01-01",
                ...     end="2022-01-01",
                ...     variables={
                ...         "reanalysis-era5-single-levels": ["2m-temperature"],
                ...     },
                ...     lat_lim=[4.0, 5.0],
                ...     lon_lim=[-75.0, -74.0],
                ...     path="examples/data/era5",
                ... )
                >>> earthlens.download()  # doctest: +SKIP

                ```

        See Also:
            :meth:`_build_request`: Assembles the CDS request dict
                this method submits — the pure-builder collaborator.
            :class:`earthlens.ecmwf.constraints.RequestValidator`: The
                pre-flight check applied to the assembled request.
            :meth:`_download_dataset`: Thin pass-through wrapper —
                calls this method and returns the same path.
            :class:`Catalog`: Resolves `(dataset, variable)` pairs
                to :class:`Variable` rows.
            :class:`earthlens.earthlens.EarthLens`: User-facing facade
                that wires this method into the `download()` flow.
        """
        dataset = var_info.cds_dataset
        request = self._build_request(var_info)

        # Pre-flight check the assembled request against the CDS
        # `constraints.json` for this dataset. Catches typos and
        # invalid extras combinations client-side before they
        # consume a CDS queue slot. Pass `skip_constraints=True`
        # to `ECMWF(...)` to bypass.
        RequestValidator(dataset, request, skip=self.skip_constraints).check()

        target = self.root_dir / f"{var_info.cds_variable}_{dataset}.nc"
        logger.info(f"Requesting {dataset} from CDS; this may take several minutes")
        try:
            self.client.retrieve(dataset, request, str(target))
        except Exception as exc:  # noqa: BLE001 - cdsapi raises a variety of types; classify here and re-raise as PermissionError when licence-related
            if _looks_like_licence_not_accepted(exc):
                raise PermissionError(
                    f"CDS rejected the request for {dataset!r}: licence "
                    "not accepted. Open the dataset page at "
                    f"https://cds.climate.copernicus.eu/datasets/{dataset} "
                    "and tick the licence at the bottom of the "
                    "'Download' tab. The acceptance is permanent and "
                    "tied to your CDS account."
                ) from exc
            raise
        _unwrap_zipped_netcdf(target)
        return target

    def _build_request(self, var_info: Variable) -> dict[str, Any]:
        """Assemble the CDS retrieve-request dict for one variable.

        Pure function over `var_info`, `self.time.dates`,
        `self.space`, and `self.temporal_resolution`. No I/O, no
        validation, no client calls — just dictionary assembly.
        :meth:`_api` consumes the result and submits it via
        :meth:`cdsapi.Client.retrieve`.

        Build order (later steps override earlier ones):

        1. Template defaults (`variable`, `year`, `month`,
           `data_format`, `area`, `product_type`).
        2. Daily / monthly branch — daily adds `day` plus four
           six-hourly `time` slots; monthly pins `time=["00:00"]`
           and omits `day` (CDS monthly-means datasets reject
           `day`).
        3. Pressure-level forward — `cds_pressure_level` becomes
           `pressure_level` on the request.
        4. `var_info.extras` merge — per-row catalog overrides win
           over the template defaults.
        5. `request_kind` strip — drop template-default keys the
           dataset family rejects (e.g. ORAS5 rejects
           `day`/`time`/`area`). Done after the extras merge so a
           user can re-introduce a stripped key by setting it
           explicitly in extras.
        6. Per-row `None` opt-outs — any `extras` key set to `None`
           is dropped from the request, the per-row escape hatch
           for datasets that reject the default bbox without
           forcing a new `request_kind`.

        Args:
            var_info: Catalog row for the variable being requested.
                Drives every field on the request except `area` /
                `year` / `month` / `day` / `time` (which come from
                `self.space` and `self.time`).

        Returns:
            dict[str, Any]: Request dict ready to pass as the
            second positional argument to
            :meth:`cdsapi.Client.retrieve`.
        """
        dates = self.time.dates
        request: dict[str, Any] = {
            "variable": [var_info.cds_variable],
            "year": sorted({str(d.year) for d in dates}),
            "month": sorted({f"{d.month:02d}" for d in dates}),
            "data_format": "netcdf",
            "area": [
                self.space.north,
                self.space.west,
                self.space.south,
                self.space.east,
            ],
            "product_type": var_info.product_type,
        }

        if self.temporal_resolution == "monthly":
            request["time"] = ["00:00"]
        else:
            request["day"] = sorted({f"{d.day:02d}" for d in dates})
            request["time"] = ["00:00", "06:00", "12:00", "18:00"]

        if var_info.cds_pressure_level is not None:
            request["pressure_level"] = var_info.cds_pressure_level

        request.update(var_info.extras)

        for stripped in _REQUEST_KIND_STRIPS.get(var_info.request_kind, ()):
            if stripped not in var_info.extras:
                request.pop(stripped, None)

        for key, value in list(var_info.extras.items()):
            if value is None:
                request.pop(key, None)

        return request
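
The ordering of steps 4 to 6 is easier to see on plain dicts. The sketch below is illustrative only: `apply_overrides` is a hypothetical helper (not part of earthlens), and the strip tuple passed in stands in for whatever `_REQUEST_KIND_STRIPS` holds for a given `request_kind`. It mirrors just the merge, strip, and `None` opt-out logic shown above.

```python
from typing import Any


def apply_overrides(
    defaults: dict[str, Any],
    extras: dict[str, Any],
    strips: tuple[str, ...],
) -> dict[str, Any]:
    """Hypothetical stand-in for steps 4-6 of the request build."""
    request = dict(defaults)
    # Step 4: per-row extras win over the template defaults.
    request.update(extras)
    # Step 5: drop keys the dataset family rejects, unless the row
    # re-introduced them explicitly through extras.
    for key in strips:
        if key not in extras:
            request.pop(key, None)
    # Step 6: an extras value of None removes the key entirely.
    for key, value in extras.items():
        if value is None:
            request.pop(key, None)
    return request


defaults = {
    "variable": ["x"],
    "day": ["01"],
    "time": ["00:00"],
    "area": [5, -75, 4, -74],
}
family_strips = ("day", "time", "area")  # assumed strip set, for illustration

stripped = apply_overrides(defaults, {}, family_strips)
reintroduced = apply_overrides(defaults, {"time": ["12:00"]}, family_strips)
opted_out = apply_overrides(defaults, {"area": None}, ())
```

Because the strip runs after the merge and skips keys present in `extras`, `reintroduced` keeps its explicit `time`, while `opted_out` loses `area` even though no strip set applies.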

__init__(start, end, variables, lat_lim, lon_lim, temporal_resolution='daily', path='', fmt='%Y-%m-%d', skip_constraints=False) #

Initialize an ECMWF backend instance.

Stores skip_constraints on the instance, then forwards the remaining arguments to :class:AbstractDataSource, which captures the cdsapi client into self.client and the bbox/date dict into self.space/self.time.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| start | str | Inclusive start date as a string (parsed with fmt). Required. | required |
| end | str | Inclusive end date as a string. Required. | required |
| variables | dict[str, list[str]] | Mapping from CDS dataset short name to a list of variable codes drawn from that dataset, e.g. {"reanalysis-era5-single-levels": ["2m-temperature", "total-precipitation"]}. The dataset name must be a key of :attr:Catalog.datasets; each variable name must appear under that dataset's variables: block. See cds_data_catalog.yaml for the registered keys. Required. | required |
| lat_lim | list[float] | [lat_min, lat_max]. Required. | required |
| lon_lim | list[float] | [lon_min, lon_max]. Required. | required |
| temporal_resolution | str | Either "daily" or "monthly". Defaults to "daily". | 'daily' |
| path | Path \| str | Output directory. Created by the parent if it does not exist. Defaults to "" (the current working directory). | '' |
| fmt | str | strptime format for start / end. Defaults to "%Y-%m-%d". | '%Y-%m-%d' |
| skip_constraints | bool | When True, every CDS pre-flight validation phase (date / area sanity, variable typo check, required-fields check, combinatorial cover check) is bypassed and the request is sent to CDS unchecked. Useful when CDS's published constraints.json is stale or wrong for the dataset, or when running offline. Defaults to False. | False |

Source code in src/earthlens/ecmwf/backend.py
def __init__(
    self,
    start: str,
    end: str,
    variables: dict[str, list[str]],
    lat_lim: list[float],
    lon_lim: list[float],
    temporal_resolution: str = "daily",
    path: Path | str = "",
    fmt: str = "%Y-%m-%d",
    skip_constraints: bool = False,
):
    """Initialize an ECMWF backend instance.

    Stores `skip_constraints` on the instance, then forwards the
    remaining arguments to :class:`AbstractDataSource`, which
    captures the cdsapi client into `self.client` and the
    bbox/date dict into `self.space`/`self.time`.

    Args:
        start: Inclusive start date as a string (parsed with
            `fmt`). Required.
        end: Inclusive end date as a string. Required.
        variables: Mapping from CDS dataset short name to a list
            of variable codes drawn from that dataset, e.g.
            `{"reanalysis-era5-single-levels": ["2m-temperature",
            "total-precipitation"]}`. The dataset name must be a
            key of :attr:`Catalog.datasets`; each variable name
            must appear under that dataset's `variables:` block.
            See `cds_data_catalog.yaml` for the registered keys.
            Required.
        lat_lim: `[lat_min, lat_max]`. Required.
        lon_lim: `[lon_min, lon_max]`. Required.
        temporal_resolution: Either `"daily"` or `"monthly"`.
            Defaults to `"daily"`.
        path: Output directory. Created by the parent if it does
            not exist. Defaults to `""` (the current working
            directory).
        fmt: `strptime` format for `start` / `end`.
            Defaults to `"%Y-%m-%d"`.
        skip_constraints: When `True`, every CDS pre-flight
            validation phase (date / area sanity, variable typo
            check, required-fields check, combinatorial cover
            check) is bypassed and the request is sent to CDS
            unchecked. Useful when CDS's published
            `constraints.json` is stale or wrong for the
            dataset, or when running offline. Defaults to `False`.
    """
    self.skip_constraints = skip_constraints
    super().__init__(
        start=start,
        end=end,
        variables=variables,
        temporal_resolution=temporal_resolution,
        lat_lim=lat_lim,
        lon_lim=lon_lim,
        fmt=fmt,
        path=path,
    )
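
As a rough illustration of how the constructor arguments end up on the CDS request, the sketch below derives the `year` / `month` / `day` lists and the `area` list the same way the request builder shown earlier does. The function `request_time_and_area` is hypothetical. Two things are assumed here rather than taken from the source: that the parent class expands `start`..`end` into one date per day, and that `north`/`south` correspond to `lat_max`/`lat_min` and `west`/`east` to `lon_min`/`lon_max`.

```python
from datetime import datetime, timedelta


def request_time_and_area(
    start: str,
    end: str,
    lat_lim: list[float],
    lon_lim: list[float],
    fmt: str = "%Y-%m-%d",
) -> dict[str, list]:
    """Hypothetical helper: constructor arguments -> request fields."""
    first = datetime.strptime(start, fmt).date()
    last = datetime.strptime(end, fmt).date()
    if last < first:
        raise ValueError("end must not precede start")
    # Both ends are inclusive, hence the +1 (assumed daily stepping).
    dates = [first + timedelta(days=i) for i in range((last - first).days + 1)]
    lat_min, lat_max = lat_lim
    lon_min, lon_max = lon_lim
    return {
        "year": sorted({str(d.year) for d in dates}),
        "month": sorted({f"{d.month:02d}" for d in dates}),
        "day": sorted({f"{d.day:02d}" for d in dates}),
        # Same order as the request builder: north, west, south, east.
        "area": [lat_max, lon_min, lat_min, lon_max],
    }


fields = request_time_and_area(
    "2022-01-30", "2022-02-02", [4.0, 5.0], [-75.0, -74.0]
)
```

Note that the three date lists are built independently of each other, so a range crossing a month boundary (as here) yields the union of day numbers across both months rather than four specific dates.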

download(progress_bar=True, aggregate=None) #

Download every (dataset, variable) pair in self.vars from CDS.

Iterates the user-supplied variables mapping (CDS dataset short name → list of variable codes) and, for each pair, looks the variable up in the CDS :class:Catalog and delegates to :meth:_download_dataset.

Parameters:

Name Type Description Default
progress_bar bool

Reserved; currently unused since the slicing pipeline that previously consumed it has been moved out of the package. Defaults to True so existing callers keep working.

True
aggregate AggregationConfig | None

Optional :class:earthlens.aggregate.AggregationConfig. When provided, every retrieved NetCDF is fed through :func:earthlens.aggregate.aggregate_netcdf immediately after _api() returns. When the config's out_dir is None, it is defaulted to <self.root_dir>/aggregated/. Aggregation failures surface in the per-variable failure summary alongside retrieve failures, so a single bad variable does not abort the rest of the loop.

op="auto" semantics. When the config's op is left at its default "auto", the reducer is picked per-variable from the catalog row's types field (Variable.is_flux):

  • State (types unset or "state", e.g. 2m-temperature, surface-pressure, relative-humidity). Each NetCDF sample is the instantaneous value at that timestamp. auto → "mean". The window mean is the natural daily / monthly summary.
  • Flux (types: flux, e.g. total-precipitation, evaporation, surface-runoff, radiation accumulations). Each NetCDF sample is the accumulation since the previous post-processing step (a 6-hour accumulation in legacy daily ERA5, 1-hour in CDS-Beta). auto → "sum". The per-slot accumulations are summed inside the window to recover the actual window total.

Worked example — daily evaporation for one pixel with the four 6-hourly slots [0.001, 0.002, 0.005, 0.004] m of water equivalent. op="auto" resolves to "sum" and writes 0.012 m (the day's total evaporation) to the GeoTIFF. A plain op="mean" would write 0.003 m (the average 6-hour accumulation, not a daily total).

Pass an explicit op="mean" / "sum" / "min" / "max" / "std" to bypass auto-routing — for example, on pre-aggregated CDS datasets like derived-era5-single-levels-daily-statistics where each NetCDF sample is already a daily aggregate and summing four of them would multiply by 4. See docs/reference/aggregation.md for the full walkthrough.

None
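
The op="auto" routing and the evaporation example can be reproduced with a few lines of plain Python. This is a minimal sketch, not the package's reducer: `resolve_op` and `reduce_window` are hypothetical names, and only the "sum" and "mean" reducers are implemented.

```python
import math


def resolve_op(op: str, is_flux: bool) -> str:
    """Pick the reducer: explicit ops pass through, "auto" routes by type."""
    if op != "auto":
        return op
    return "sum" if is_flux else "mean"


def reduce_window(samples: list[float], op: str) -> float:
    """Reduce the samples that fall inside one aggregation window."""
    if op == "sum":
        return math.fsum(samples)
    if op == "mean":
        return math.fsum(samples) / len(samples)
    raise ValueError(f"reducer not covered by this sketch: {op!r}")


# Four 6-hourly evaporation accumulations for one pixel, in metres.
slots = [0.001, 0.002, 0.005, 0.004]

# Flux variable with op="auto": the slots are summed (about 0.012 m).
daily_total = reduce_window(slots, resolve_op("auto", is_flux=True))

# Forcing op="mean" gives the average slot instead (about 0.003 m).
mean_slot = reduce_window(slots, resolve_op("mean", is_flux=True))

# A state variable with op="auto" resolves to the mean.
state_op = resolve_op("auto", is_flux=False)
```

The same arithmetic shows why an explicit op matters on pre-aggregated datasets: summing four samples that are each already a daily value would overstate the total by a factor of four.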

Returns: None. Per-variable NetCDFs land at <self.root_dir>/<cds_variable>_<cds_dataset>.nc. When aggregate is set, per-window GeoTIFFs land at <aggregate.out_dir or self.root_dir/aggregated>/<cds_variable>_<freq>_<window>.tif.

Raises:

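| Type | Description |
| --- | --- |
| KeyError | Raised internally if any dataset key in self.vars is not a curated CDS dataset, or if a listed variable is not declared under that dataset. The error is caught per variable, logged, and listed in the failure summary rather than propagated to the caller. |
| Exception | Any error :meth:_api surfaces from :meth:cdsapi.Client.retrieve is handled the same way: logged and recorded in the failure summary, not re-raised. |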

Examples:

  • End-to-end download via the user-facing :class:EarthLens facade. Marked # doctest: +SKIP because it requires a configured ~/.cdsapirc and several minutes of CDS queue time:

    >>> from earthlens.earthlens import EarthLens
    >>> earthlens = EarthLens(  # doctest: +SKIP
    ...     data_source="ecmwf",
    ...     temporal_resolution="daily",
    ...     start="2022-01-01",
    ...     end="2022-01-01",
    ...     variables={
    ...         "reanalysis-era5-single-levels": [
    ...             "2m-temperature", "total-precipitation"
    ...         ],
    ...     },
    ...     lat_lim=[4.0, 5.0],
    ...     lon_lim=[-75.0, -74.0],
    ...     path="examples/data/era5",
    ... )
    >>> earthlens.download()  # doctest: +SKIP
    
See Also

  • :meth:_download_dataset: Per-variable download + post-processing.
  • :meth:_api: Builds and submits the cdsapi request.
  • :class:Catalog: Resolves (dataset, code) pairs to per-variable metadata.

Source code in src/earthlens/ecmwf/backend.py
def download(
    self,
    progress_bar: bool = True,
    aggregate: AggregationConfig | None = None,
):
    """Download every `(dataset, variable)` pair in `self.vars` from CDS.

    Iterates the user-supplied `variables` mapping (CDS dataset
    short name → list of variable codes) and, for each pair,
    looks the variable up in the CDS :class:`Catalog` and
    delegates to :meth:`_download_dataset`.

    Args:
        progress_bar: Reserved; currently unused since the
            slicing pipeline that previously consumed it has
            been moved out of the package. Defaults to `True`
            so existing callers keep working.
        aggregate: Optional :class:`earthlens.aggregate.AggregationConfig`.
            When provided, every retrieved NetCDF is fed through
            :func:`earthlens.aggregate.aggregate_netcdf` immediately
            after `_api()` returns. When the config's `out_dir`
            is `None`, it is defaulted to
            `<self.root_dir>/aggregated/`. Aggregation failures
            surface in the per-variable failure summary alongside
            retrieve failures, so a single bad variable does not
            abort the rest of the loop.

            **`op="auto"` semantics.** When the config's `op` is
            left at its default `"auto"`, the reducer is picked
            per-variable from the catalog row's `types` field
            (`Variable.is_flux`):

            * **State** (`types` unset or `"state"` — e.g.
              `2m-temperature`, `surface-pressure`,
              `relative-humidity`). Each NetCDF sample is the
              instantaneous value at that timestamp. `auto` →
              `"mean"`. The window mean is the natural daily /
              monthly summary.
            * **Flux** (`types: flux` — e.g.
              `total-precipitation`, `evaporation`,
              `surface-runoff`, radiation accumulations). Each
              NetCDF sample is the accumulation since the
              previous post-processing step (a 6-hour
              accumulation in legacy daily ERA5, 1-hour in
              CDS-Beta). `auto` → `"sum"`. The per-slot
              accumulations are summed inside the window to
              recover the actual window total.

            Worked example — daily `evaporation` for one pixel
            with the four 6-hourly slots
            `[0.001, 0.002, 0.005, 0.004]` m of water
            equivalent. `op="auto"` resolves to `"sum"` and
            writes `0.012 m` (the day's total evaporation) to
            the GeoTIFF. A plain `op="mean"` would write
            `0.003 m` (the average 6-hour accumulation, **not**
            a daily total).

            Pass an explicit `op="mean"` / `"sum"` / `"min"` /
            `"max"` / `"std"` to bypass auto-routing — for
            example, on pre-aggregated CDS datasets like
            `derived-era5-single-levels-daily-statistics` where
            each NetCDF sample is already a daily aggregate and
            summing four of them would multiply by 4. See
            `docs/reference/aggregation.md` for the full
            walkthrough.

    Returns:
        None. Per-variable NetCDFs land at
        `<self.root_dir>/<cds_variable>_<cds_dataset>.nc`. When
        `aggregate` is set, per-window GeoTIFFs land at
        `<aggregate.out_dir or self.root_dir/aggregated>/<cds_variable>_<freq>_<window>.tif`.

    Raises:
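        KeyError: Raised internally if any dataset key in
            `self.vars` is not a curated CDS dataset, or if a
            listed variable is not declared under that dataset.
            Caught per variable, logged, and listed in the failure
            summary rather than propagated.
        Exception: Any error :meth:`_api` surfaces from
            :meth:`cdsapi.Client.retrieve` is handled the same
            way: logged and recorded, not re-raised.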

    Examples:
        - End-to-end download via the user-facing
          :class:`EarthLens` facade. Marked
          `# doctest: +SKIP` because it requires a configured
          `~/.cdsapirc` and several minutes of CDS queue time:

            ```python
            >>> from earthlens.earthlens import EarthLens
            >>> earthlens = EarthLens(  # doctest: +SKIP
            ...     data_source="ecmwf",
            ...     temporal_resolution="daily",
            ...     start="2022-01-01",
            ...     end="2022-01-01",
            ...     variables={
            ...         "reanalysis-era5-single-levels": [
            ...             "2m-temperature", "total-precipitation"
            ...         ],
            ...     },
            ...     lat_lim=[4.0, 5.0],
            ...     lon_lim=[-75.0, -74.0],
            ...     path="examples/data/era5",
            ... )
            >>> earthlens.download()  # doctest: +SKIP

            ```

    See Also:
        :meth:`_download_dataset`: Per-variable download +
            post-processing.
        :meth:`_api`: Builds and submits the cdsapi request.
        :class:`Catalog`: Resolves `(dataset, code)` pairs to
            per-variable metadata.
    """
    catalog = Catalog()
    succeeded: list[tuple[str, str]] = []
    failed: list[tuple[tuple[str, str], BaseException]] = []

    effective_aggregate: AggregationConfig | None = None
    if aggregate is not None:
        if aggregate.out_dir is None:
            effective_aggregate = aggregate.model_copy(
                update={"out_dir": self.root_dir / "aggregated"}
            )
        else:
            effective_aggregate = aggregate

    for dataset_name, var_codes in self.vars.items():
        for var in var_codes:
            start = self.time.start_date
            end = self.time.end_date
            logger.info(
                f"Download ECMWF {dataset_name}/{var} data for "
                f"period {start} till {end}"
            )
            try:
                var_info = catalog.get_variable(dataset_name, var)
                nc_path = self._download_dataset(
                    var_info, progress_bar=progress_bar
                )
            except Exception as exc:  # noqa: BLE001 - log + continue so one bad variable doesn't kill the batch
                logger.error(
                    f"ECMWF download for {dataset_name}/{var} failed: "
                    f"{type(exc).__name__}: {exc}"
                )
                failed.append(((dataset_name, var), exc))
                continue

            if effective_aggregate is not None:
                try:
                    aggregate_netcdf(nc_path, var_info, effective_aggregate)
                except Exception as exc:  # noqa: BLE001 - log + continue so one bad aggregate doesn't kill the batch
                    logger.error(
                        f"ECMWF aggregate for {dataset_name}/{var} failed: "
                        f"{type(exc).__name__}: {exc}"
                    )
                    failed.append(((dataset_name, var), exc))
                    continue

            succeeded.append((dataset_name, var))

    if failed:
        failed_summary = ", ".join(
            f"{ds}/{var} ({type(exc).__name__})" for (ds, var), exc in failed
        )
        logger.warning(
            f"ECMWF download summary: {len(succeeded)} succeeded "
            f"({succeeded}), {len(failed)} failed ({failed_summary})"
        )
    else:
        logger.info(
            f"ECMWF download summary: all {len(succeeded)} "
            f"variables succeeded ({succeeded})"
        )
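
The log-and-continue structure of the loop above can be isolated into a small standalone sketch. `run_batch` and the two task functions are hypothetical stand-ins for the catalog lookup and `_download_dataset` call; only the bookkeeping pattern is taken from the source.

```python
import logging
from typing import Callable

logger = logging.getLogger("batch_sketch")

Key = tuple[str, str]


def run_batch(
    tasks: dict[Key, Callable[[], object]],
) -> tuple[list[Key], list[tuple[Key, BaseException]]]:
    """Run every task; record failures instead of aborting the batch."""
    succeeded: list[Key] = []
    failed: list[tuple[Key, BaseException]] = []
    for key, task in tasks.items():
        try:
            task()
        except Exception as exc:  # one bad variable must not kill the batch
            logger.error("%s/%s failed: %s: %s", *key, type(exc).__name__, exc)
            failed.append((key, exc))
            continue
        succeeded.append(key)
    return succeeded, failed


def ok() -> str:
    return "variable.nc"


def unknown_variable() -> str:
    raise KeyError("not-a-variable")


succeeded, failed = run_batch(
    {("ds", "a"): ok, ("ds", "b"): unknown_variable, ("ds", "c"): ok}
)
failed_summary = ", ".join(
    f"{ds}/{var} ({type(exc).__name__})" for (ds, var), exc in failed
)
```

The trade-off of this design is that a caller cannot detect failures through exceptions; the failure list (or, in the package, the logged summary) is the only signal.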

earthlens.ecmwf.Catalog #

Bases: AbstractCatalog

Variable catalog for the CDS-backed ECMWF data source.

Reads cds_data_catalog.yaml (shipped as package data) and exposes its consumed top-level sections as typed pydantic fields. Instantiate with no arguments (Catalog()) — :func:model_post_init parses the YAML and populates every field in one pass.

Variables are addressed by the (dataset_name, variable_name) pair via :meth:get_variable; there is no flat per-code lookup. The same short code can legitimately appear under more than one dataset (e.g. "2m-temperature" lives in both reanalysis-era5-single-levels and reanalysis-era5-land), so the dataset name is part of the identity.
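
A toy model of the two-level addressing may help show why a flat per-code lookup would be ambiguous. Everything in this sketch is a hypothetical stand-in (`VariableRow`, `CATALOG`, and both functions); it does not use the real Catalog or its YAML.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class VariableRow:
    """Hypothetical, stripped-down stand-in for a catalog row."""

    cds_dataset: str


# Dataset name -> variable short code -> row.
CATALOG: dict[str, dict[str, VariableRow]] = {
    "reanalysis-era5-single-levels": {
        "2m-temperature": VariableRow("reanalysis-era5-single-levels"),
    },
    "reanalysis-era5-land": {
        "2m-temperature": VariableRow("reanalysis-era5-land"),
    },
}


def get_variable(dataset_name: str, variable_name: str) -> VariableRow:
    """Two-step lookup; an unknown dataset or variable raises KeyError."""
    return CATALOG[dataset_name][variable_name]


def datasets_declaring(variable_name: str) -> list[str]:
    """List every dataset that declares the given short code."""
    return sorted(
        name for name, variables in CATALOG.items() if variable_name in variables
    )


land = get_variable("reanalysis-era5-land", "2m-temperature")
owners = datasets_declaring("2m-temperature")
```

Since `owners` contains two datasets for the same short code, the code alone cannot identify a row; the dataset name has to be part of the key.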

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| available_datasets | list[str] | Informational list of every CDS dataset short name. Mirrors the available_datasets: block in the YAML; runtime code does not consume it. |
| datasets | dict[str, Dataset] | Structural map keyed by CDS dataset short name. Each value is a :class:Dataset carrying that dataset's monthly-aggregate variant and its per-variable map. The authoritative store: every catalog lookup goes through it. |

Examples:

  • Look up a variable by (dataset_name, variable_name):

    >>> from earthlens.ecmwf import Catalog
    >>> spec = Catalog().get_variable(
    ...     "reanalysis-era5-single-levels", "2m-temperature"
    ... )
    >>> spec.cds_dataset
    'reanalysis-era5-single-levels'
    >>> spec.nc_variable
    't2m'
    
  • The same short code under a different dataset is a different :class:Variable:

    >>> from earthlens.ecmwf import Catalog
    >>> Catalog().get_variable(
    ...     "reanalysis-era5-land", "2m-temperature"
    ... ).cds_dataset
    'reanalysis-era5-land'
    
  • Iterate variables grouped by dataset (structural):

    >>> from earthlens.ecmwf import Catalog
    >>> cat = Catalog()
    >>> cat.get_dataset("reanalysis-era5-pressure-levels").monthly
    'reanalysis-era5-pressure-levels-monthly-means'
    >>> sorted(cat.get_dataset("reanalysis-era5-pressure-levels").variables)[:3]
    ['divergence', 'fraction-of-cloud-cover', 'geopotential']
    
  • Inspect what CDS hosts overall:

    >>> from earthlens.ecmwf import Catalog
    >>> len(Catalog().available_datasets)
    134
    
Source code in src/earthlens/ecmwf/catalog.py
class Catalog(AbstractCatalog):
    """Variable catalog for the CDS-backed ECMWF data source.

    Reads `cds_data_catalog.yaml` (shipped as package data) and
    exposes its consumed top-level sections as typed pydantic fields.
    Instantiate with no arguments (`Catalog()`) — :func:`model_post_init`
    parses the YAML and populates every field in one pass.

    Variables are addressed by the `(dataset_name, variable_name)`
    pair via :meth:`get_variable`; there is no flat per-code lookup.
    The same short code can legitimately appear under more than one
    dataset (e.g. `"2m-temperature"` lives in both
    `reanalysis-era5-single-levels` and `reanalysis-era5-land`), so
    the dataset name is part of the identity.

    Attributes:
        available_datasets: Informational list of every CDS dataset
            short name. Mirrors the `available_datasets:` block in
            the YAML; runtime code does not consume it.
        datasets: Structural map keyed by CDS dataset short name. Each
            value is a :class:`Dataset` carrying that dataset's
            monthly-aggregate variant and its per-variable map. The
            authoritative store: every catalog lookup goes through
            it.

    Examples:
        - Look up a variable by `(dataset_name, variable_name)`:

            ```python
            >>> from earthlens.ecmwf import Catalog
            >>> spec = Catalog().get_variable(
            ...     "reanalysis-era5-single-levels", "2m-temperature"
            ... )
            >>> spec.cds_dataset
            'reanalysis-era5-single-levels'
            >>> spec.nc_variable
            't2m'

            ```
        - The same short code under a different dataset is a
          different :class:`Variable`:

            ```python
            >>> from earthlens.ecmwf import Catalog
            >>> Catalog().get_variable(
            ...     "reanalysis-era5-land", "2m-temperature"
            ... ).cds_dataset
            'reanalysis-era5-land'

            ```
        - Iterate variables grouped by dataset (structural):

            ```python
            >>> from earthlens.ecmwf import Catalog
            >>> cat = Catalog()
            >>> cat.get_dataset("reanalysis-era5-pressure-levels").monthly
            'reanalysis-era5-pressure-levels-monthly-means'
            >>> sorted(cat.get_dataset("reanalysis-era5-pressure-levels").variables)[:3]
            ['divergence', 'fraction-of-cloud-cover', 'geopotential']

            ```
        - Inspect what CDS hosts overall:

            ```python
            >>> from earthlens.ecmwf import Catalog
            >>> len(Catalog().available_datasets)
            134

            ```
    """

    _catalog_kind: str = "CDS catalog"

    available_datasets: list[str] = Field(default_factory=list)
    datasets: dict[str, Dataset] = Field(default_factory=dict)
    providers: dict[str, Provider] = Field(default_factory=dict)

    def model_post_init(self, __context: Any) -> None:
        """Auto-load `cds_data_catalog.yaml` when the user didn't supply one.

        `Catalog()` with no args is sugar for `Catalog.load()` — it
        reads the bundled YAML through the `(path, mtime_ns)`-keyed
        cache so repeated construction is ~1 ms. If the caller passed
        `datasets=...`, the disk read is skipped (test path; see
        :meth:`load` for the heavy-lifting classmethod).

        Raises:
            ValueError: When auto-loading, propagates the same errors
                as :meth:`load`.
        """
        if self.datasets:
            return
        loaded = Catalog.load()
        self.available_datasets = loaded.available_datasets
        self.datasets = loaded.datasets
        self.providers = loaded.providers

    @classmethod
    def load(
        cls,
        catalog_path: Path | None = None,
        providers_path: Path | None = None,
    ) -> Catalog:
        """Read the CDS catalog + providers registry from disk (cached).

        Mirrors :meth:`earthlens.gee.Catalog.load` so the two backends
        feel identical. Validates that every `Dataset.provider` slug
        is in the registry; an unregistered slug is a load-time error.

        Args:
            catalog_path: Path to a `cds_data_catalog.yaml`-shaped
                file. Defaults to module-level :data:`CATALOG_PATH`.
            providers_path: Path to `providers.yaml`. Defaults to
                module-level :data:`PROVIDERS_PATH`.

        Returns:
            A fully-populated :class:`Catalog`.

        Raises:
            ValueError: Propagated from :func:`_load_catalog_data` or
                :func:`earthlens.base.providers.load_providers`, plus
                an unregistered-slug error if the YAML references a
                provider not in the registry.
        """
        catalog_path = catalog_path if catalog_path is not None else CATALOG_PATH
        providers_path = providers_path if providers_path is not None else PROVIDERS_PATH
        available_datasets, datasets = _load_catalog_data(catalog_path)
        providers = load_providers(providers_path)
        unknown = sorted(
            {d.provider for d in datasets.values() if d.provider and d.provider not in providers}
        )
        if unknown:
            raise ValueError(
                f"the following provider slugs are referenced by "
                f"`{catalog_path.name}` but missing from {providers_path}: "
                f"{unknown}. Add them to providers.yaml or fix the typo."
            )
        return cls(
            available_datasets=list(available_datasets),
            datasets=dict(datasets),
            providers=dict(providers),
        )

    def get_catalog(self) -> dict[str, Dataset]:
        """Return the structural per-dataset map.

        Satisfies the abstract base's contract; the actual parsing
        is done in :func:`model_post_init`.

        Returns:
            dict[str, Dataset]: One entry per CDS dataset. Same
            object as :attr:`datasets`.

        Examples:
            - Inspect the dataset count and a sample:

                ```python
                >>> from earthlens.ecmwf import Catalog
                >>> mapping = Catalog().get_catalog()
                >>> "reanalysis-era5-single-levels" in mapping
                True
                >>> mapping["reanalysis-era5-single-levels"].monthly
                'reanalysis-era5-single-levels-monthly-means'

                ```
        """
        return self.datasets

    def get_variable(self, dataset_name: str, variable_name: str) -> Variable:
        """Return the :class:`Variable` for a `(dataset, code)` pair.

        Args:
            dataset_name: CDS dataset short name as it appears as a
                key in :attr:`datasets` (e.g.
                `"reanalysis-era5-single-levels"`).
            variable_name: Short variable code as it appears as a
                YAML key under that dataset (e.g.
                `"2m-temperature"`, `"total-precipitation"`).

        Returns:
            Variable: Per-variable metadata loaded from
            `cds_data_catalog.yaml`.

        Raises:
            KeyError: If `dataset_name` is not curated, or if
                `variable_name` is not declared under that dataset.

        Examples:
            - Look up a single-level ERA5 variable and read its CDS
              dataset and NetCDF short name:

                ```python
                >>> from earthlens.ecmwf import Catalog
                >>> spec = Catalog().get_variable(
                ...     "reanalysis-era5-single-levels", "2m-temperature"
                ... )
                >>> spec.cds_dataset
                'reanalysis-era5-single-levels'
                >>> spec.nc_variable, spec.units
                ('t2m', 'K')

                ```
            - Pressure-level variables expose `cds_pressure_level`:

                ```python
                >>> from earthlens.ecmwf import Catalog
                >>> spec = Catalog().get_variable(
                ...     "reanalysis-era5-pressure-levels", "temperature"
                ... )
                >>> spec.cds_pressure_level
                ['1000']

                ```
            - The same short code under a different dataset is a
              different Variable:

                ```python
                >>> from earthlens.ecmwf import Catalog
                >>> Catalog().get_variable(
                ...     "reanalysis-era5-land", "2m-temperature"
                ... ).cds_dataset
                'reanalysis-era5-land'

                ```
            - Unknown dataset or variable raises `KeyError`:

                ```python
                >>> from earthlens.ecmwf import Catalog
                >>> Catalog().get_variable(
                ...     "reanalysis-era5-single-levels", "not-a-variable"
                ... )
                Traceback (most recent call last):
                    ...
                KeyError: 'not-a-variable'

                ```
        """
        return self.datasets[dataset_name].variables[variable_name]

    # `get_dataset(name)` (with the did-you-mean hint) and the dict-like
    # `__getitem__` / `__contains__` / `__iter__` / `__len__` / `__repr__`
    # / `__str__` dunders are inherited from
    # :class:`earthlens.base.AbstractCatalog` (M1 in
    # planning/catalog-cross-backend-comparison.md).

    def health(self) -> dict[str, list[str]]:
        """Report structural hygiene issues across the loaded catalog (L1).

        Returns a mapping `check_name -> list of offenders`. All four
        keys are always present; an empty list means that check is
        currently passing, and the catalog is clean when every list
        is empty. Most schema-level invariants (duplicate keys,
        unknown fields, missing required fields, legacy MARS keys in
        `extras`) are already enforced at load time; this method
        covers the residual data-quality checks that can't be
        expressed in the pydantic schema.

        Checks reported:

        * `variable_missing_nc_variable`: variables, reported as
          `"<dataset>/<variable>"`, whose `nc_variable` is empty or
          whitespace-only (would break downstream NetCDF reads).
        * `dataset_without_variables`: datasets carrying zero
          curated variables. Should always be `[]` since the loader
          rejects these; included for defence in depth.
        * `unregistered_provider`: datasets whose `provider` slug is
          missing from the providers registry. :meth:`load` rejects
          these, so this is normally `[]`.
        * `unused_provider`: registry slugs that no dataset
          references.
        """
        missing_nc: list[str] = []
        empty_dataset: list[str] = []
        unregistered_provider: list[str] = []
        used_providers: set[str] = set()
        for ds_name, ds in self.datasets.items():
            if not ds.variables:
                empty_dataset.append(ds_name)
                continue
            for var_code, var in ds.variables.items():
                if not var.nc_variable or not var.nc_variable.strip():
                    missing_nc.append(f"{ds_name}/{var_code}")
            if ds.provider:
                used_providers.add(ds.provider)
                if ds.provider not in self.providers:
                    unregistered_provider.append(ds_name)
        unused_provider = sorted(set(self.providers) - used_providers)
        return {
            "variable_missing_nc_variable": sorted(missing_nc),
            "dataset_without_variables": sorted(empty_dataset),
            "unregistered_provider": sorted(unregistered_provider),
            "unused_provider": unused_provider,
        }

    def describe(self, dataset_name: str) -> dict[str, Any]:
        """Return a structured introspection record for a CDS dataset.

        Useful for "what variables and extras does dataset X expose?"
        questions at runtime — the CLI / notebook caller can dump
        the result without needing to walk the YAML themselves.

        Args:
            dataset_name: CDS dataset short name as it appears as a
                key in :attr:`datasets` (e.g.
                `"reanalysis-era5-land"`).

        Returns:
            dict with keys `dataset` (the short name), `monthly`
            (the monthly-aggregate dataset name or `None`),
            `pressure_level` (the default level list or `None`),
            `extras` (the parent-level request defaults), and
            `variables` (sorted list of the variable short codes
            available under this dataset).

        Raises:
            KeyError: If `dataset_name` is not a curated dataset
                (i.e. not present in :attr:`datasets`).

        Examples:
            - Describe ERA5-Land at a glance:

                ```python
                >>> from earthlens.ecmwf import Catalog
                >>> info = Catalog().describe("reanalysis-era5-land")
                >>> info["dataset"]
                'reanalysis-era5-land'
                >>> info["monthly"]
                'reanalysis-era5-land-monthly-means'
                >>> len(info["variables"]) == 60
                True
                >>> "2m-temperature" in info["variables"]
                True

                ```
        """
        ds = self.get_dataset(dataset_name)
        return {
            "dataset": dataset_name,
            "monthly": ds.monthly,
            "pressure_level": ds.pressure_level,
            "extras": dict(ds.extras),
            "variables": sorted(ds.variables),
        }

    def minimal_valid_request(self, dataset_name: str) -> dict[str, Any]:
        """Return a known-valid minimal request for `dataset_name`.

        Walks the dataset's published `constraints.json` (cached
        per-process) and returns the first entry expanded into a
        request dict with one value per selector. Useful for:

        * verifying a CDS account is set up correctly (submit the
          returned dict via :meth:`cdsapi.Client.retrieve` and watch
          for a NetCDF rather than a 400),
        * seeing what a valid extras schema looks like for a new
          dataset before authoring catalog rows,
        * starting points for tests.

        The returned request always carries `data_format: netcdf`;
        the rest is whatever the first constraint entry enumerates.

        Args:
            dataset_name: CDS dataset short name. Does not need to be
                in :attr:`datasets` — the constraints endpoint is
                hit directly so any addressable dataset works.

        Returns:
            dict[str, Any]: A request dict ready to pass to
            :meth:`cdsapi.Client.retrieve`. Empty dict (besides
            `data_format`) when the dataset's constraints are
            empty / unreachable.

        Examples:
            - Inspect ECMWF's published shape for a new dataset
              before authoring rows. Marked `# doctest: +SKIP`
              because it requires network access:

                ```python
                >>> from earthlens.ecmwf import Catalog
                >>> req = Catalog().minimal_valid_request(  # doctest: +SKIP
                ...     "reanalysis-cerra-land",
                ... )
                >>> sorted(req.keys())  # doctest: +SKIP
                ['data_format', 'day', 'leadtime_hour', 'level_type', ...]

                ```
        """
        constraints = fetch_constraints(dataset_name)
        request: dict[str, Any] = {"data_format": "netcdf"}
        if not constraints:
            return request
        # Pick the first entry that has at least one variable —
        # entries with empty `variable` lists are dataset-form
        # placeholders that don't make a usable retrieve request.
        for entry in constraints:
            if entry.get("variable"):
                for key, value in entry.items():
                    if isinstance(value, list) and value:
                        request[key] = value[:1]
                    else:
                        request[key] = value
                return request
        # No entry had variables — fall back to the first one anyway
        # (some datasets identify the data column via an extra rather
        # than a `variable` list).
        first = constraints[0]
        for key, value in first.items():
            if isinstance(value, list) and value:
                request[key] = value[:1]
            else:
                request[key] = value
        return request

    def list_recent_jobs(
        self,
        status: str | None = None,
        max_age_min: int = 60,
        limit: int = 50,
    ) -> list[dict[str, Any]]:
        """Return the user's recent CDS retrieval jobs.

        Thin wrapper that delegates to
        :func:`earthlens.ecmwf.jobs.list_recent_jobs` (N3); see that
        for the full docstring. Kept on `Catalog` as a convenience so
        `Catalog().list_recent_jobs(...)` keeps working.
        """
        return _list_recent_jobs_impl(
            status=status, max_age_min=max_age_min, limit=limit
        )

    def download_job(
        self,
        job_id: str,
        target: Path | str,
        chunk_size: int = 1 << 20,
    ) -> Path:
        """Download the result asset of a successful CDS job.

        Thin wrapper that delegates to
        :func:`earthlens.ecmwf.jobs.download_job` (N3); see that for
        the full docstring.
        """
        return _download_job_impl(job_id, target, chunk_size=chunk_size)

describe(dataset_name) #

Return a structured introspection record for a CDS dataset.

Useful for "what variables and extras does dataset X expose?" questions at runtime — the CLI / notebook caller can dump the result without needing to walk the YAML themselves.

Parameters:

Name Type Description Default
dataset_name str

CDS dataset short name as it appears as a key in :attr:datasets (e.g. "reanalysis-era5-land").

required

Returns:

Type Description
dict[str, Any]

dict with keys dataset (the short name), monthly (the monthly-aggregate dataset name or None), pressure_level (the default level list or None), extras (the parent-level request defaults), and variables (sorted list of the variable short codes available under this dataset).

Raises:

Type Description
KeyError

If dataset_name is not a curated dataset (i.e. not present in :attr:datasets).

Examples:

  • Describe ERA5-Land at a glance:

    >>> from earthlens.ecmwf import Catalog
    >>> info = Catalog().describe("reanalysis-era5-land")
    >>> info["dataset"]
    'reanalysis-era5-land'
    >>> info["monthly"]
    'reanalysis-era5-land-monthly-means'
    >>> len(info["variables"]) == 60
    True
    >>> "2m-temperature" in info["variables"]
    True
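
Because the record is a plain dict, it can be dumped directly from a CLI or notebook. The sketch below is illustrative only: `info` is a hand-written stand-in with the documented keys (its values are not read from the real catalog), and `summarize` is a hypothetical helper, not part of earthlens.

```python
import json

# Hypothetical stand-in for the dict that Catalog().describe(...) returns.
# The keys follow the documented shape; the values are illustrative only.
info = {
    "dataset": "reanalysis-era5-land",
    "monthly": "reanalysis-era5-land-monthly-means",
    "pressure_level": None,
    "extras": {},
    "variables": ["2m-temperature", "total-precipitation"],
}


def summarize(record: dict) -> str:
    """Render a describe()-shaped record as indented, key-sorted JSON."""
    return json.dumps(record, indent=2, sort_keys=True)


report = summarize(info)
print(report)
```

With a real catalog, the same helper would presumably accept `Catalog().describe("reanalysis-era5-land")` unchanged, provided `extras` holds only JSON-serializable values.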
    
Source code in src/earthlens/ecmwf/catalog.py
def describe(self, dataset_name: str) -> dict[str, Any]:
    """Return a structured introspection record for a CDS dataset.

    Useful for "what variables and extras does dataset X expose?"
    questions at runtime — the CLI / notebook caller can dump
    the result without needing to walk the YAML themselves.

    Args:
        dataset_name: CDS dataset short name as it appears as a
            key in :attr:`datasets` (e.g.
            `"reanalysis-era5-land"`).

    Returns:
        dict with keys `dataset` (the short name), `monthly`
        (the monthly-aggregate dataset name or `None`),
        `pressure_level` (the default level list or `None`),
        `extras` (the parent-level request defaults), and
        `variables` (sorted list of the variable short codes
        available under this dataset).

    Raises:
        KeyError: If `dataset_name` is not a curated dataset
            (i.e. not present in :attr:`datasets`).

    Examples:
        - Describe ERA5-Land at a glance:

            ```python
            >>> from earthlens.ecmwf import Catalog
            >>> info = Catalog().describe("reanalysis-era5-land")
            >>> info["dataset"]
            'reanalysis-era5-land'
            >>> info["monthly"]
            'reanalysis-era5-land-monthly-means'
            >>> len(info["variables"]) == 60
            True
            >>> "2m-temperature" in info["variables"]
            True

            ```
    """
    ds = self.get_dataset(dataset_name)
    return {
        "dataset": dataset_name,
        "monthly": ds.monthly,
        "pressure_level": ds.pressure_level,
        "extras": dict(ds.extras),
        "variables": sorted(ds.variables),
    }

download_job(job_id, target, chunk_size=1 << 20) #

Download the result asset of a successful CDS job.

Thin wrapper that delegates to :func:earthlens.ecmwf.jobs.download_job (N3); see that for the full docstring.

Source code in src/earthlens/ecmwf/catalog.py
def download_job(
    self,
    job_id: str,
    target: Path | str,
    chunk_size: int = 1 << 20,
) -> Path:
    """Download the result asset of a successful CDS job.

    Thin wrapper that delegates to
    :func:`earthlens.ecmwf.jobs.download_job` (N3); see that for
    the full docstring.
    """
    return _download_job_impl(job_id, target, chunk_size=chunk_size)

get_catalog() #

Return the structural per-dataset map.

Satisfies the abstract base's contract; the actual parsing is done in :func:model_post_init.

Returns:

Type Description
dict[str, Dataset]

One entry per CDS dataset. Same object as :attr:datasets.

Examples:

  • Inspect the dataset count and a sample:

    >>> from earthlens.ecmwf import Catalog
    >>> mapping = Catalog().get_catalog()
    >>> "reanalysis-era5-single-levels" in mapping
    True
    >>> mapping["reanalysis-era5-single-levels"].monthly
    'reanalysis-era5-single-levels-monthly-means'
    
Source code in src/earthlens/ecmwf/catalog.py
def get_catalog(self) -> dict[str, Dataset]:
    """Return the structural per-dataset map.

    Satisfies the abstract base's contract; the actual parsing
    is done in :func:`model_post_init`.

    Returns:
        dict[str, Dataset]: One entry per CDS dataset. Same
        object as :attr:`datasets`.

    Examples:
        - Inspect the dataset count and a sample:

            ```python
            >>> from earthlens.ecmwf import Catalog
            >>> mapping = Catalog().get_catalog()
            >>> "reanalysis-era5-single-levels" in mapping
            True
            >>> mapping["reanalysis-era5-single-levels"].monthly
            'reanalysis-era5-single-levels-monthly-means'

            ```
    """
    return self.datasets

get_variable(dataset_name, variable_name) #

Return the :class:Variable for a (dataset, code) pair.

Parameters:

Name Type Description Default
dataset_name str

CDS dataset short name as it appears as a key in :attr:datasets (e.g. "reanalysis-era5-single-levels").

required
variable_name str

Short variable code as it appears as a YAML key under that dataset (e.g. "2m-temperature", "total-precipitation").

required

Returns:

Type Description
Variable

Per-variable metadata loaded from cds_data_catalog.yaml.

Raises:

Type Description
KeyError

If dataset_name is not curated, or if variable_name is not declared under that dataset.

Examples:

  • Look up a single-level ERA5 variable and read its CDS dataset and NetCDF short name:

    >>> from earthlens.ecmwf import Catalog
    >>> spec = Catalog().get_variable(
    ...     "reanalysis-era5-single-levels", "2m-temperature"
    ... )
    >>> spec.cds_dataset
    'reanalysis-era5-single-levels'
    >>> spec.nc_variable, spec.units
    ('t2m', 'K')
    
  • Pressure-level variables expose cds_pressure_level:

    >>> from earthlens.ecmwf import Catalog
    >>> spec = Catalog().get_variable(
    ...     "reanalysis-era5-pressure-levels", "temperature"
    ... )
    >>> spec.cds_pressure_level
    ['1000']
    
  • The same short code under a different dataset is a different Variable:

    >>> from earthlens.ecmwf import Catalog
    >>> Catalog().get_variable(
    ...     "reanalysis-era5-land", "2m-temperature"
    ... ).cds_dataset
    'reanalysis-era5-land'
    
  • Unknown dataset or variable raises KeyError:

    >>> from earthlens.ecmwf import Catalog
    >>> Catalog().get_variable(
    ...     "reanalysis-era5-single-levels", "not-a-variable"
    ... )
    Traceback (most recent call last):
        ...
    KeyError: 'not-a-variable'
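
Both an unknown dataset and an unknown variable surface as a bare KeyError, so a caller may want to report which of the two keys failed. The following is a minimal sketch of that pattern on plain dicts; `DATASETS` and `find_variable` are hypothetical stand-ins for illustration and do not exist in earthlens.

```python
from typing import Any

# Hypothetical stand-in: dataset name -> {variable code -> metadata}.
DATASETS: dict[str, dict[str, dict[str, Any]]] = {
    "reanalysis-era5-single-levels": {
        "2m-temperature": {"nc_variable": "t2m", "units": "K"},
    },
}


def find_variable(dataset_name: str, variable_name: str) -> dict[str, Any]:
    """Two-level lookup that reports which of the two keys was unknown."""
    try:
        variables = DATASETS[dataset_name]
    except KeyError:
        raise KeyError(f"unknown dataset: {dataset_name}") from None
    try:
        return variables[variable_name]
    except KeyError:
        raise KeyError(
            f"unknown variable {variable_name!r} in {dataset_name!r}"
        ) from None


spec = find_variable("reanalysis-era5-single-levels", "2m-temperature")
print(spec["nc_variable"], spec["units"])
```

The same two-step try/except could wrap a real `Catalog().get_variable(...)` call by first checking membership in `datasets`.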
    
Source code in src/earthlens/ecmwf/catalog.py
def get_variable(self, dataset_name: str, variable_name: str) -> Variable:
    """Return the :class:`Variable` for a `(dataset, code)` pair.

    Args:
        dataset_name: CDS dataset short name as it appears as a
            key in :attr:`datasets` (e.g.
            `"reanalysis-era5-single-levels"`).
        variable_name: Short variable code as it appears as a
            YAML key under that dataset (e.g.
            `"2m-temperature"`, `"total-precipitation"`).

    Returns:
        Variable: Per-variable metadata loaded from
        `cds_data_catalog.yaml`.

    Raises:
        KeyError: If `dataset_name` is not curated, or if
            `variable_name` is not declared under that dataset.

    Examples:
        - Look up a single-level ERA5 variable and read its CDS
          dataset and NetCDF short name:

            ```python
            >>> from earthlens.ecmwf import Catalog
            >>> spec = Catalog().get_variable(
            ...     "reanalysis-era5-single-levels", "2m-temperature"
            ... )
            >>> spec.cds_dataset
            'reanalysis-era5-single-levels'
            >>> spec.nc_variable, spec.units
            ('t2m', 'K')

            ```
        - Pressure-level variables expose `cds_pressure_level`:

            ```python
            >>> from earthlens.ecmwf import Catalog
            >>> spec = Catalog().get_variable(
            ...     "reanalysis-era5-pressure-levels", "temperature"
            ... )
            >>> spec.cds_pressure_level
            ['1000']

            ```
        - The same short code under a different dataset is a
          different Variable:

            ```python
            >>> from earthlens.ecmwf import Catalog
            >>> Catalog().get_variable(
            ...     "reanalysis-era5-land", "2m-temperature"
            ... ).cds_dataset
            'reanalysis-era5-land'

            ```
        - Unknown dataset or variable raises `KeyError`:

            ```python
            >>> from earthlens.ecmwf import Catalog
            >>> Catalog().get_variable(
            ...     "reanalysis-era5-single-levels", "not-a-variable"
            ... )
            Traceback (most recent call last):
                ...
            KeyError: 'not-a-variable'

            ```
    """
    return self.datasets[dataset_name].variables[variable_name]

health() #

Report structural hygiene issues across the loaded catalog (L1).

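Returns a mapping check_name -> list of offenders. Offenders are "<dataset>/<variable>" strings for the variable-level check, dataset names for the dataset-level checks, and provider slugs for unused_provider. All four keys are always present; an empty list means the check is currently passing, and the catalog is clean when every list is empty. Most schema-level invariants (duplicate keys, unknown fields, missing required fields, legacy MARS keys in extras) are already enforced at load time; this method covers the residual data-quality checks that can't be expressed in the pydantic schema.

Checks reported:

  • variable_missing_nc_variable: variables whose nc_variable is empty or whitespace-only (would break downstream NetCDF reads).
  • dataset_without_variables: datasets carrying zero curated variables. Should always be [] since the loader rejects these; included for defence in depth.
  • unregistered_provider: datasets whose provider slug is not in the providers registry. Should be [] for catalogs built through :meth:load, which rejects unregistered slugs. Datasets without variables are skipped before this check runs.
  • unused_provider: provider slugs present in the registry but not referenced by any dataset that has variables.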
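
The checks can be sketched outside the package as follows. This mirrors the logic of the source listed below on hypothetical stand-in dataclasses (`Var`, `Ds`) rather than the real pydantic models, and the sample catalog is invented for illustration.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Var:
    """Stand-in for a catalog variable; only the checked field is modelled."""
    nc_variable: str


@dataclass
class Ds:
    """Stand-in for a catalog dataset."""
    variables: dict = field(default_factory=dict)
    provider: Optional[str] = None


def health(datasets: dict, providers: set) -> dict:
    """Return the four check lists, mirroring the documented behaviour."""
    missing_nc, empty, unregistered, used = [], [], [], set()
    for name, ds in datasets.items():
        if not ds.variables:
            empty.append(name)
            continue  # empty datasets skip the provider checks
        for code, var in ds.variables.items():
            if not var.nc_variable or not var.nc_variable.strip():
                missing_nc.append(f"{name}/{code}")
        if ds.provider:
            used.add(ds.provider)
            if ds.provider not in providers:
                unregistered.append(name)
    return {
        "variable_missing_nc_variable": sorted(missing_nc),
        "dataset_without_variables": sorted(empty),
        "unregistered_provider": sorted(unregistered),
        "unused_provider": sorted(providers - used),
    }


sample = {
    "a": Ds({"t": Var("t2m"), "p": Var("  ")}, provider="ecmwf"),
    "b": Ds({}, provider="ecmwf"),
    "c": Ds({"u": Var("u10")}, provider="typo"),
}
report = health(sample, {"ecmwf", "spare"})
# Keep only failing checks, e.g. to fail a CI job when anything is listed.
failing = {check: items for check, items in report.items() if items}
print(failing)
```

Filtering for non-empty lists, as in the last lines, is one way to turn the report into a pass/fail gate.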
Source code in src/earthlens/ecmwf/catalog.py
def health(self) -> dict[str, list[str]]:
    """Report structural hygiene issues across the loaded catalog (L1).

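    Returns a mapping `check_name -> list of offenders`. Offenders
    are `"<dataset>/<variable>"` strings for the variable-level
    check, dataset names for the dataset-level checks, and provider
    slugs for `unused_provider`. All four keys are always present;
    an empty list means the check is currently passing, and the
    catalog is clean when every list is empty. Most schema-level
    invariants (duplicate keys, unknown fields, missing required
    fields, legacy MARS keys in `extras`) are already enforced at
    load time; this method covers the residual data-quality checks
    that can't be expressed in the pydantic schema.

    Checks reported:

    * `variable_missing_nc_variable`: variables whose
      `nc_variable` is empty or whitespace-only (would break
      downstream NetCDF reads).
    * `dataset_without_variables`: datasets carrying zero
      curated variables. Should always be `[]` since the loader
      rejects these; included for defence in depth.
    * `unregistered_provider`: datasets whose `provider` slug is
      not in the providers registry. Datasets without variables
      are skipped before this check runs.
    * `unused_provider`: provider slugs present in the registry
      but not referenced by any dataset that has variables.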
    """
    missing_nc: list[str] = []
    empty_dataset: list[str] = []
    unregistered_provider: list[str] = []
    used_providers: set[str] = set()
    for ds_name, ds in self.datasets.items():
        if not ds.variables:
            empty_dataset.append(ds_name)
            continue
        for var_code, var in ds.variables.items():
            if not var.nc_variable or not var.nc_variable.strip():
                missing_nc.append(f"{ds_name}/{var_code}")
        if ds.provider:
            used_providers.add(ds.provider)
            if ds.provider not in self.providers:
                unregistered_provider.append(ds_name)
    unused_provider = sorted(set(self.providers) - used_providers)
    return {
        "variable_missing_nc_variable": sorted(missing_nc),
        "dataset_without_variables": sorted(empty_dataset),
        "unregistered_provider": sorted(unregistered_provider),
        "unused_provider": unused_provider,
    }

list_recent_jobs(status=None, max_age_min=60, limit=50) #

Return the user's recent CDS retrieval jobs.

Thin wrapper that delegates to :func:earthlens.ecmwf.jobs.list_recent_jobs (N3); see that for the full docstring. Kept on Catalog as a convenience so Catalog().list_recent_jobs(...) keeps working.

Source code in src/earthlens/ecmwf/catalog.py
def list_recent_jobs(
    self,
    status: str | None = None,
    max_age_min: int = 60,
    limit: int = 50,
) -> list[dict[str, Any]]:
    """Return the user's recent CDS retrieval jobs.

    Thin wrapper that delegates to
    :func:`earthlens.ecmwf.jobs.list_recent_jobs` (N3); see that
    for the full docstring. Kept on `Catalog` as a convenience so
    `Catalog().list_recent_jobs(...)` keeps working.
    """
    return _list_recent_jobs_impl(
        status=status, max_age_min=max_age_min, limit=limit
    )

load(catalog_path=None, providers_path=None) classmethod #

Read the CDS catalog + providers registry from disk (cached).

Mirrors :meth:earthlens.gee.Catalog.load so the two backends feel identical. Validates that every Dataset.provider slug is in the registry; an unregistered slug is a load-time error.

Parameters:

Name Type Description Default
catalog_path Path | None

Path to a cds_data_catalog.yaml-shaped file. Defaults to module-level :data:CATALOG_PATH.

None
providers_path Path | None

Path to providers.yaml. Defaults to module-level :data:PROVIDERS_PATH.

None

Returns:

Type Description
Catalog

A fully-populated :class:Catalog.

Raises:

Type Description
ValueError

Propagated from :func:_load_catalog_data or :func:earthlens.base.providers.load_providers, plus an unregistered-slug error if the YAML references a provider not in the registry.
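
The unregistered-slug validation amounts to a set difference followed by a ValueError. A minimal sketch on plain dicts, assuming a mapping of dataset name to provider slug; `find_unknown_slugs` and `validate_providers` are hypothetical names, not earthlens functions.

```python
from typing import Optional


def find_unknown_slugs(
    dataset_providers: dict[str, Optional[str]],
    registry: set[str],
) -> list[str]:
    """Return the sorted provider slugs that the registry does not know."""
    return sorted(
        {slug for slug in dataset_providers.values() if slug and slug not in registry}
    )


def validate_providers(
    dataset_providers: dict[str, Optional[str]],
    registry: set[str],
) -> None:
    """Raise ValueError when any dataset references an unregistered slug."""
    unknown = find_unknown_slugs(dataset_providers, registry)
    if unknown:
        raise ValueError(f"provider slugs missing from the registry: {unknown}")


# Datasets with no provider (None) are ignored; "ecmfw" is a deliberate typo.
providers_by_dataset = {"a": "ecmwf", "b": "ecmfw", "c": None}
print(find_unknown_slugs(providers_by_dataset, {"ecmwf"}))
```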

Source code in src/earthlens/ecmwf/catalog.py
@classmethod
def load(
    cls,
    catalog_path: Path | None = None,
    providers_path: Path | None = None,
) -> Catalog:
    """Read the CDS catalog + providers registry from disk (cached).

    Mirrors :meth:`earthlens.gee.Catalog.load` so the two backends
    feel identical. Validates that every `Dataset.provider` slug
    is in the registry; an unregistered slug is a load-time error.

    Args:
        catalog_path: Path to a `cds_data_catalog.yaml`-shaped
            file. Defaults to module-level :data:`CATALOG_PATH`.
        providers_path: Path to `providers.yaml`. Defaults to
            module-level :data:`PROVIDERS_PATH`.

    Returns:
        A fully-populated :class:`Catalog`.

    Raises:
        ValueError: Propagated from :func:`_load_catalog_data` or
            :func:`earthlens.base.providers.load_providers`, plus
            an unregistered-slug error if the YAML references a
            provider not in the registry.
    """
    catalog_path = catalog_path if catalog_path is not None else CATALOG_PATH
    providers_path = providers_path if providers_path is not None else PROVIDERS_PATH
    available_datasets, datasets = _load_catalog_data(catalog_path)
    providers = load_providers(providers_path)
    unknown = sorted(
        {d.provider for d in datasets.values() if d.provider and d.provider not in providers}
    )
    if unknown:
        raise ValueError(
            f"the following provider slugs are referenced by "
            f"`{catalog_path.name}` but missing from {providers_path}: "
            f"{unknown}. Add them to providers.yaml or fix the typo."
        )
    return cls(
        available_datasets=list(available_datasets),
        datasets=dict(datasets),
        providers=dict(providers),
    )

minimal_valid_request(dataset_name) #

Return a known-valid minimal request for dataset_name.

Walks the dataset's published constraints.json (cached per-process) and returns the first entry expanded into a request dict with one value per selector. Useful for:

  • verifying a CDS account is set up correctly (submit the returned dict via :meth:cdsapi.Client.retrieve and watch for a NetCDF rather than a 400),
  • seeing what a valid extras schema looks like for a new dataset before authoring catalog rows,
  • starting points for tests.

The returned request always carries data_format: netcdf; the rest is whatever the first constraint entry enumerates.

Parameters:

Name Type Description Default
dataset_name str

CDS dataset short name. Does not need to be in :attr:datasets — the constraints endpoint is hit directly so any addressable dataset works.

required

Returns:

Type Description
dict[str, Any]

A request dict ready to pass to :meth:cdsapi.Client.retrieve. Empty dict (besides data_format) when the dataset's constraints are empty / unreachable.

Examples:

  • Inspect ECMWF's published shape for a new dataset before authoring rows. Marked # doctest: +SKIP because it requires network access:

    >>> from earthlens.ecmwf import Catalog
    >>> req = Catalog().minimal_valid_request(  # doctest: +SKIP
    ...     "reanalysis-cerra-land",
    ... )
    >>> sorted(req.keys())  # doctest: +SKIP
    ['data_format', 'day', 'leadtime_hour', 'level_type', ...]
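
The expansion step can be tried offline. The sketch below applies the same selection rule as the source listed below to a hand-written constraints list; `first_usable_request` is a hypothetical standalone function, and the sample entries are invented rather than taken from any real constraints.json.

```python
from typing import Any


def first_usable_request(constraints: list[dict[str, Any]]) -> dict[str, Any]:
    """Expand one constraint entry into a request with one value per selector."""
    request: dict[str, Any] = {"data_format": "netcdf"}
    if not constraints:
        return request
    # Prefer the first entry with a non-empty `variable` list; otherwise
    # fall back to the very first entry.
    chosen = next((e for e in constraints if e.get("variable")), constraints[0])
    for key, value in chosen.items():
        # Non-empty lists are trimmed to their first element; everything
        # else (scalars, empty lists) is copied through unchanged.
        request[key] = value[:1] if isinstance(value, list) and value else value
    return request


sample_constraints = [
    {"variable": [], "year": ["1990"]},  # placeholder entry, skipped
    {"variable": ["a", "b"], "year": ["2000", "2001"], "level_type": "surface"},
]
request = first_usable_request(sample_constraints)
print(request)
```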
    
Source code in src/earthlens/ecmwf/catalog.py
def minimal_valid_request(self, dataset_name: str) -> dict[str, Any]:
    """Return a known-valid minimal request for `dataset_name`.

    Walks the dataset's published `constraints.json` (cached
    per-process) and returns the first entry expanded into a
    request dict with one value per selector. Useful for:

    * verifying a CDS account is set up correctly (submit the
      returned dict via :meth:`cdsapi.Client.retrieve` and watch
      for a NetCDF rather than a 400),
    * seeing what a valid extras schema looks like for a new
      dataset before authoring catalog rows,
    * starting points for tests.

    The returned request always carries `data_format: netcdf`;
    the rest is whatever the first constraint entry enumerates.

    Args:
        dataset_name: CDS dataset short name. Does not need to be
            in :attr:`datasets` — the constraints endpoint is
            hit directly so any addressable dataset works.

    Returns:
        dict[str, Any]: A request dict ready to pass to
        :meth:`cdsapi.Client.retrieve`. Empty dict (besides
        `data_format`) when the dataset's constraints are
        empty / unreachable.

    Examples:
        - Inspect ECMWF's published shape for a new dataset
          before authoring rows. Marked `# doctest: +SKIP`
          because it requires network access:

            ```python
            >>> from earthlens.ecmwf import Catalog
            >>> req = Catalog().minimal_valid_request(  # doctest: +SKIP
            ...     "reanalysis-cerra-land",
            ... )
            >>> sorted(req.keys())  # doctest: +SKIP
            ['data_format', 'day', 'leadtime_hour', 'level_type', ...]

            ```
    """
    constraints = fetch_constraints(dataset_name)
    request: dict[str, Any] = {"data_format": "netcdf"}
    if not constraints:
        return request
    # Pick the first entry that has at least one variable —
    # entries with empty `variable` lists are dataset-form
    # placeholders that don't make a usable retrieve request.
    for entry in constraints:
        if entry.get("variable"):
            for key, value in entry.items():
                if isinstance(value, list) and value:
                    request[key] = value[:1]
                else:
                    request[key] = value
            return request
    # No entry had variables — fall back to the first one anyway
    # (some datasets identify the data column via an extra rather
    # than a `variable` list).
    first = constraints[0]
    for key, value in first.items():
        if isinstance(value, list) and value:
            request[key] = value[:1]
        else:
            request[key] = value
    return request

model_post_init(__context) #

Auto-load cds_data_catalog.yaml when the user didn't supply one.

Catalog() with no args is sugar for Catalog.load() — it reads the bundled YAML through the (path, mtime_ns)-keyed cache so repeated construction is ~1 ms. If the caller passed datasets=..., the disk read is skipped (test path; see :meth:load for the heavy-lifting classmethod).

Raises:

Type Description
ValueError

When auto-loading, propagates the same errors as :meth:load.

Source code in src/earthlens/ecmwf/catalog.py
def model_post_init(self, __context: Any) -> None:
    """Auto-load `cds_data_catalog.yaml` when the user didn't supply one.

    `Catalog()` with no args is sugar for `Catalog.load()` — it
    reads the bundled YAML through the `(path, mtime_ns)`-keyed
    cache so repeated construction is ~1 ms. If the caller passed
    `datasets=...`, the disk read is skipped (test path; see
    :meth:`load` for the heavy-lifting classmethod).

    Raises:
        ValueError: When auto-loading, propagates the same errors
            as :meth:`load`.
    """
    if self.datasets:
        return
    loaded = Catalog.load()
    self.available_datasets = loaded.available_datasets
    self.datasets = loaded.datasets
    self.providers = loaded.providers