Skip to content

Downloadable Class Reference

Abstract Base Class

geefetch.data.downloadables.abc

DownloadableABC

Bases: ABC

Source code in src/geefetch/data/downloadables/abc.py
class DownloadableABC(ABC):
    """Interface shared by every downloadable object: a single `download` method."""

    @abstractmethod
    def download(
        self,
        out: Path,
        region: GeoBoundingBox,
        crs: CRS,
        bands: list[str],
        **kwargs: Any,
    ) -> None:
        """Download the data covering `region` into `out`.

        Parameters
        ----------
        out : Path
            Destination file for the downloaded data.
        region : GeoBoundingBox
            The area of interest.
        crs : CRS
            The CRS in which `region` is expressed and in which the downloaded
            data must be expressed.
        bands : list[str]
            Names of the bands (for images) or of the properties (for collections)
            to keep in the download.
        **kwargs : Any
            Extra arguments a concrete implementation may need.
        """
download(out, region, crs, bands, **kwargs) abstractmethod

Download data.

Parameters:

Name Type Description Default
out Path

The file to download the data to.

required
region GeoBoundingBox

The AOI.

required
crs CRS

The CRS in which region is expressed and in which to express the data.

required
bands list[str]

The bands (for images) or properties (for collections) to select for download.

required
**kwargs Any

Any additional necessary arguments.

{}
Source code in src/geefetch/data/downloadables/abc.py
@abstractmethod
def download(
    self,
    out: Path,
    region: GeoBoundingBox,
    crs: CRS,
    bands: list[str],
    **kwargs: Any,
) -> None:
    """Download the data covering `region` into `out`.

    Parameters
    ----------
    out : Path
        Destination file for the downloaded data.
    region : GeoBoundingBox
        The area of interest.
    crs : CRS
        The CRS in which `region` is expressed and in which the downloaded
        data must be expressed.
    bands : list[str]
        Names of the bands (for images) or of the properties (for collections)
        to keep in the download.
    **kwargs : Any
        Extra arguments a concrete implementation may need.
    """

Collection

geefetch.data.downloadables.collection

This module provides downloading utility functions for Google Earth Engine's FeatureCollection, similar to what geedim provides for Image and ImageCollection.

DownloadableGEECollection

Bases: DownloadableABC

Downloads feature collections from Google Earth Engine.

This class handles downloading Earth Engine FeatureCollections to local files in either GeoJSON or Parquet format. It implements automatic splitting of large collection requests to handle Earth Engine compute limits, with recursive retries when a download fails.

It is thread safe.

Parameters:

Name Type Description Default
collection FeatureCollection

The Earth Engine FeatureCollection to download.

required
Source code in src/geefetch/data/downloadables/collection.py
class DownloadableGEECollection(DownloadableABC):
    """Downloads feature collections from Google Earth Engine.

    This class handles downloading Earth Engine FeatureCollections to local files
    in either GeoJSON or Parquet format. It implements automatic splitting of large
    collection requests to handle Earth Engine compute limits, with recursive retries
    when a download fails.

    It is thread safe.

    Parameters
    ----------
    collection : FeatureCollection
        The Earth Engine FeatureCollection to download.
    """

    _lock = threading.Lock()

    def __init__(self, collection: FeatureCollection):
        self.collection = collection

    def _get_download_url(
        self, collection: FeatureCollection, format: Format
    ) -> tuple[requests.Response, str]:
        """Get tile download url and response."""
        with self._lock:
            url = collection.getDownloadURL(filetype=format.to_str())
            return requests.get(url, stream=True), url

    def download(
        self,
        out: Path,
        region: GeoBoundingBox,
        crs: CRS,
        bands: list[str],
        format: Format = Format.GEOJSON,
        **kwargs: Any,
    ) -> None:
        """Download a FeatureCollection in one go.
        It is up to the caller to make sure that the collection does not exceed
        Google Earth Engine compute limit.

        Parameters
        ----------
        out : Path
            Path to the geojson file to download the collection to.
        region : GeoBoundingBox
            The ROI.
        crs : CRS
            The CRS to use for the features' geometries.
        bands : list[str]
            Properties of the collection to select for download.
        format : Format
            The desired filetype.
        **kwargs : Any
            Accepted but ignored additional arguments.
        """
        for key in kwargs:
            if key not in ["scale", "progress", "max_tile_size"]:
                log.warning(f"Argument {key} is ignored.")
        tmp_out = out.with_suffix(f".tmp{out.suffix}")
        tmp_out.unlink(missing_ok=True)
        self._recursively_download(tmp_out, region, crs, bands, format)
        tmp_out.replace(out)

    def _recursively_download(
        self,
        out: Path,
        region: GeoBoundingBox,
        crs: CRS,
        bands: list[str],
        format: Format = Format.GEOJSON,
        _split_recursion_depth: int = 0,
        **kwargs: Any,
    ) -> None:
        for key in kwargs:
            if key not in ["scale", "progress", "max_tile_size"]:
                log.warning(f"Argument {key} is ignored.")
        old_crs = crs
        if format == Format.GEOJSON and crs != WGS84:
            log.warning(f".geojson files must be in WGS84. Ignoring argument {crs=}.")
            crs = WGS84
        elif format == Format.PARQUET:
            crs = WGS84

        # get image download url and response
        collection = (
            self.collection.filterBounds(region.to_ee_geometry())
            .select(bands)
            .map(lambda feature: feature.transform(f"EPSG:{crs.to_epsg()}"))
        )
        response, _ = self._get_download_url(
            collection, Format.GEOJSON if format == Format.PARQUET else format
        )

        def handle_error_response(response: requests.Response) -> None:
            resp_dict = response.json()
            if "error" in resp_dict and "message" in resp_dict["error"]:
                msg = resp_dict["error"]["message"]
                if msg == "Unable to compute table: java.io.IOException: No space left on device":
                    if _split_recursion_depth > 3:
                        log.error(
                            "Attempted to split the download regions 3 times. "
                            f"Still getting error: {msg}. Aborting."
                        )
                        raise OSError(msg)
                    log.debug(
                        f"Caught GEE exception '[black]{msg}[/]' for tile {out}. "
                        f"Attempting to split into smaller regions ({_split_recursion_depth=})."
                    )
                    self._split_then_download(
                        out,
                        region,
                        old_crs,
                        bands,
                        format,
                        _split_recursion_depth=_split_recursion_depth + 1,
                        **kwargs,
                    )
                    return
                ex_msg = f"Error downloading tile: {msg}"
            else:
                ex_msg = str(response.json())
            raise OSError(ex_msg)

        if not response.ok:
            handle_error_response(response)
            return

        if format == Format.PARQUET:
            with tempfile.NamedTemporaryFile(suffix=".geojson", delete=False) as tmp_file:
                for data in response.iter_content(chunk_size=1024):
                    tmp_file.write(data)
                tmp_file.flush()
                gdf = gpd.read_file(tmp_file.name).to_crs(old_crs)
                assert isinstance(gdf, gpd.GeoDataFrame)
                Path(tmp_file.name).unlink()
            gdf.reset_index(inplace=True, drop=True)
            gdf.to_parquet(out)
            return
        with out.open("wb") as geojsonfile:
            for data in response.iter_content(chunk_size=1024):
                geojsonfile.write(data)

    def _split_then_download(
        self,
        out: Path,
        region: GeoBoundingBox,
        crs: CRS,
        bands: list[str],
        format: Format = Format.GEOJSON,
        _split_recursion_depth: int = 0,
        **kwargs: Any,
    ) -> None:
        match format:
            case Format.GEOJSON | Format.PARQUET:
                pass
            case _:
                raise NotImplementedError(
                    f"Splitting and merging is not supported for download format {format}."
                )
        center_northing, center_easting = region.center
        northings = [region.bottom, center_northing, region.top]
        eastings = [region.left, center_easting, region.right]
        regions = [
            GeoBoundingBox(left, bottom, right, top, crs=region.crs)
            for left, right in zip(eastings[:-1], eastings[1:], strict=True)
            for bottom, top in zip(northings[:-1], northings[1:], strict=True)
        ]

        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_paths = []
            for i, region in enumerate(regions):
                tmp_path = Path(tmp_dir) / f"{i}.{format.to_str()}"
                tmp_paths.append(tmp_path)
                self._recursively_download(
                    tmp_path,
                    region,
                    WGS84,
                    bands,
                    Format.GEOJSON,
                    _split_recursion_depth,
                    **kwargs,
                )
                log.debug(f"Downloaded [{i + 1}/4] split for {out}.")
            gdf = merge_geojson(tmp_paths)
        if format == Format.PARQUET:
            gdf.to_crs(crs).to_parquet(out)
        else:
            gdf.to_file(out)
download(out, region, crs, bands, format=Format.GEOJSON, **kwargs)

Download a FeatureCollection in one go. It is up to the caller to make sure that the collection does not exceed Google Earth Engine's compute limits.

Parameters:

Name Type Description Default
out Path

Path to the file (GeoJSON or Parquet, depending on format) to download the collection to.

required
region GeoBoundingBox

The ROI.

required
crs CRS

The CRS to use for the features' geometries.

required
bands list[str]

Properties of the collection to select for download.

required
format Format

The desired filetype.

GEOJSON
**kwargs Any

Accepted but ignored additional arguments.

{}
Source code in src/geefetch/data/downloadables/collection.py
def download(
    self,
    out: Path,
    region: GeoBoundingBox,
    crs: CRS,
    bands: list[str],
    format: Format = Format.GEOJSON,
    **kwargs: Any,
) -> None:
    """Download the FeatureCollection to `out`.

    The data is written to a sibling ``.tmp`` file first, which then replaces
    `out` once `_recursively_download` has returned.

    Parameters
    ----------
    out : Path
        Path to the file to download the collection to.
    region : GeoBoundingBox
        The ROI.
    crs : CRS
        The CRS to use for the features' geometries.
    bands : list[str]
        Properties of the collection to select for download.
    format : Format
        The desired filetype.
    **kwargs : Any
        Accepted but ignored additional arguments.
    """
    silently_accepted = ("scale", "progress", "max_tile_size")
    for key in kwargs:
        if key in silently_accepted:
            continue
        log.warning(f"Argument {key} is ignored.")

    partial_out = out.with_suffix(f".tmp{out.suffix}")
    # Remove any leftover of a previous attempt before starting over.
    partial_out.unlink(missing_ok=True)
    self._recursively_download(partial_out, region, crs, bands, format)
    partial_out.replace(out)

Geedim Downloadable

geefetch.data.downloadables.geedim

DownloadableGeedimImage

Bases: DownloadableABC

Simple wrapper around geedim.PatchedBaseImage to adhere to the DownloadableABC interface.

Source code in src/geefetch/data/downloadables/geedim.py
class DownloadableGeedimImage(DownloadableABC):
    """Adapter exposing a `geedim.PatchedBaseImage` through the DownloadableABC
    interface."""

    def __init__(self, image: PatchedBaseImage):
        self.image = image

    def download(
        self,
        out: Path,
        region: GeoBoundingBox,
        crs: CRS,
        bands: list[str],
        max_tile_size: int | None = None,
        num_threads: int | None = None,
        scale: int | None = None,
        dtype: str = "float32",
        progress: Progress | None = None,
        **kwargs: Any,
    ) -> None:
        """Download the wrapped image to `out` by delegating to its own `download`.

        `region` is converted to an Earth Engine geometry and `crs` to an
        ``EPSG:<code>`` string; the other named arguments are forwarded unchanged.
        Any extra keyword argument is reported with a warning and dropped.
        """
        for key in kwargs:
            log.warning(f"Argument {key} is ignored.")

        ee_region = region.to_ee_geometry()
        epsg_string = f"EPSG:{crs.to_epsg()}"
        self.image.download(
            out,
            region=ee_region,
            crs=epsg_string,
            bands=bands,
            max_tile_size=max_tile_size,
            num_threads=num_threads,
            scale=scale,
            dtype=dtype,
            progress=progress,
        )

DownloadableGeedimImageCollection

Bases: DownloadableABC

Wrapper to download a collection of geedim images.

Source code in src/geefetch/data/downloadables/geedim.py
class DownloadableGeedimImageCollection(DownloadableABC):
    """Wrapper to download a collection of geedim images.

    Each image is written to ``<out>/<id>.tif``, where ``<id>`` is its key in
    `id_to_images`.
    """

    # Pattern an image id must fully match to be usable as a file name.
    IMAGE_ID_REGEXP = r"[a-zA-Z0-9_-]+"

    def __init__(self, id_to_images: dict[str, PatchedBaseImage]):
        self.id_to_images = id_to_images

    def download(
        self,
        out: Path,
        region: GeoBoundingBox,
        crs: CRS,
        bands: list[str],
        max_tile_size: int | None = None,
        num_threads: int | None = None,
        scale: int | None = None,
        dtype: str = "float32",
        progress: Progress | None = None,
        **kwargs: Any,
    ) -> None:
        """Download every image of the collection into the directory `out`.

        Images whose destination file already exists are skipped. When no
        `progress` is given, a default progress bar is created for the duration
        of the call.

        Parameters
        ----------
        out : Path
            Directory to download the images to. Created if missing (its parent
            must exist).
        region : GeoBoundingBox
            The ROI.
        crs : CRS
            The CRS of the downloaded images.
        bands : list[str]
            The bands to download.
        max_tile_size, num_threads, scale, dtype
            Forwarded unchanged to each image's `download`.
        progress : Progress | None
            Progress bar to report to.
        **kwargs : Any
            Accepted but ignored additional arguments.

        Raises
        ------
        ValueError
            If `out` exists but is not a directory, or if an image id does not
            match `IMAGE_ID_REGEXP`.
        """
        for key in kwargs:
            log.warning(f"Argument {key} is ignored.")
        if out.suffix != "":
            log.warning(f"Directory name for download has a suffix: {out.suffix}.")
        if not out.is_dir():
            try:
                # `exist_ok` tolerates another worker creating the directory
                # between the check above and this call.
                out.mkdir(exist_ok=True)
            except FileExistsError as err:
                raise ValueError(f"Path {out} was expected to be a directory.") from err

        with ExitStack() as stack:
            if progress is None:
                progress = stack.enter_context(default_bar())
            task = progress.add_task(
                f"[magenta]Downloading time series to [cyan]{out}[/]",
                total=len(self.id_to_images),
            )
            for id_, image in self.id_to_images.items():
                if not re.fullmatch(DownloadableGeedimImageCollection.IMAGE_ID_REGEXP, id_):
                    raise ValueError(
                        f"Image id {id_} is not valid "
                        "(should be alphanumeric, optionally using underscores/dashes)."
                    )
                dst_path = out / f"{id_}.tif"
                if dst_path.exists():
                    log.debug(f"Found existing {dst_path}. Skipping download.")
                    # Skipped images still count towards the task total,
                    # otherwise the bar never completes on a resumed download.
                    progress.advance(task)
                    continue
                image.download(
                    dst_path,
                    region=region.to_ee_geometry(),
                    crs=f"EPSG:{crs.to_epsg()}",
                    bands=bands,
                    max_tile_size=max_tile_size,
                    num_threads=num_threads,
                    scale=scale,
                    dtype=dtype,
                    progress=progress,
                )
                log.debug(f"Downloaded image to {dst_path}.")
                progress.advance(task)