diff --git a/docs/stac.md b/docs/stac.md new file mode 100644 index 0000000000..ea1e2104b6 --- /dev/null +++ b/docs/stac.md @@ -0,0 +1,3 @@ +# stac module + +::: leafmap.stac diff --git a/leafmap/common.py b/leafmap/common.py index 1259306afd..84d4e49d16 100644 --- a/leafmap/common.py +++ b/leafmap/common.py @@ -14,6 +14,7 @@ import ipywidgets as widgets import whitebox from typing import Union, List, Dict, Tuple +from .stac import * try: from IPython.display import display, IFrame @@ -21,128 +22,6 @@ pass -class TitilerEndpoint: - """This class contains the methods for the titiler endpoint.""" - - def __init__( - self, - endpoint="https://titiler.xyz", - name="stac", - TileMatrixSetId="WebMercatorQuad", - ): - """Initialize the TitilerEndpoint object. - - Args: - endpoint (str, optional): The endpoint of the titiler server. Defaults to "https://titiler.xyz". - name (str, optional): The name to be used in the file path. Defaults to "stac". - TileMatrixSetId (str, optional): The TileMatrixSetId to be used in the file path. Defaults to "WebMercatorQuad". - """ - self.endpoint = endpoint - self.name = name - self.TileMatrixSetId = TileMatrixSetId - - def url_for_stac_item(self): - return f"{self.endpoint}/{self.name}/{self.TileMatrixSetId}/tilejson.json" - - def url_for_stac_assets(self): - return f"{self.endpoint}/{self.name}/assets" - - def url_for_stac_bounds(self): - return f"{self.endpoint}/{self.name}/bounds" - - def url_for_stac_info(self): - return f"{self.endpoint}/{self.name}/info" - - def url_for_stac_info_geojson(self): - return f"{self.endpoint}/{self.name}/info.geojson" - - def url_for_stac_statistics(self): - return f"{self.endpoint}/{self.name}/statistics" - - def url_for_stac_pixel_value(self, lon, lat): - return f"{self.endpoint}/{self.name}/point/{lon},{lat}" - - def url_for_stac_wmts(self): - return ( - f"{self.endpoint}/{self.name}/{self.TileMatrixSetId}/WMTSCapabilities.xml" - ) - - -class PlanetaryComputerEndpoint(TitilerEndpoint): - """This class contains the methods for the Microsoft Planetary Computer endpoint.""" - - def __init__( - self, - endpoint="https://planetarycomputer.microsoft.com/api/data/v1", - name="item", - TileMatrixSetId="WebMercatorQuad", - ): - """Initialize the PlanetaryComputerEndpoint object. - - Args: - endpoint (str, optional): The endpoint of the titiler server. Defaults to "https://planetarycomputer.microsoft.com/api/data/v1". - name (str, optional): The name to be used in the file path. Defaults to "item". - TileMatrixSetId (str, optional): The TileMatrixSetId to be used in the file path. Defaults to "WebMercatorQuad". - """ - super().__init__(endpoint, name, TileMatrixSetId) - - def url_for_stac_collection(self): - return f"{self.endpoint}/collection/{self.TileMatrixSetId}/tilejson.json" - - def url_for_collection_assets(self): - return f"{self.endpoint}/collection/assets" - - def url_for_collection_bounds(self): - return f"{self.endpoint}/collection/bounds" - - def url_for_collection_info(self): - return f"{self.endpoint}/collection/info" - - def url_for_collection_info_geojson(self): - return f"{self.endpoint}/collection/info.geojson" - - def url_for_collection_pixel_value(self, lon, lat): - return f"{self.endpoint}/collection/point/{lon},{lat}" - - def url_for_collection_wmts(self): - return f"{self.endpoint}/collection/{self.TileMatrixSetId}/WMTSCapabilities.xml" - - def url_for_collection_lat_lon_assets(self, lng, lat): - return f"{self.endpoint}/collection/{lng},{lat}/assets" - - def url_for_collection_bbox_assets(self, minx, miny, maxx, maxy): - return f"{self.endpoint}/collection/{minx},{miny},{maxx},{maxy}/assets" - - def url_for_stac_mosaic(self, searchid): - return f"{self.endpoint}/mosaic/{searchid}/{self.TileMatrixSetId}/tilejson.json" - - def url_for_mosaic_info(self, searchid): - return f"{self.endpoint}/mosaic/{searchid}/info" - - def url_for_mosaic_lat_lon_assets(self, searchid, lon, lat): - return f"{self.endpoint}/mosaic/{searchid}/{lon},{lat}/assets" - - -def check_titiler_endpoint(titiler_endpoint=None): - """Returns the default titiler endpoint. - - Returns: - object: A titiler endpoint. - """ - if titiler_endpoint is None: - if os.environ.get("TITILER_ENDPOINT") is not None: - titiler_endpoint = os.environ.get("TITILER_ENDPOINT") - - if titiler_endpoint == "planetary-computer": - titiler_endpoint = PlanetaryComputerEndpoint() - else: - titiler_endpoint = "https://titiler.xyz" - elif titiler_endpoint in ["planetary-computer", "pc"]: - titiler_endpoint = PlanetaryComputerEndpoint() - - return titiler_endpoint - - class WhiteboxTools(whitebox.WhiteboxTools): """This class inherits the whitebox WhiteboxTools class.""" @@ -1014,840 +893,6 @@ def create_code_cell(code="", where="below"): ) -def cog_tile(url, bands=None, titiler_endpoint=None, **kwargs): - """Get a tile layer from a Cloud Optimized GeoTIFF (COG). - Source code adapted from https://developmentseed.org/titiler/examples/notebooks/Working_with_CloudOptimizedGeoTIFF_simple/ - - Args: - url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif - bands (list, optional): List of bands to use. Defaults to None. - titiler_endpoint (str, optional): TiTiler endpoint. Defaults to "https://titiler.xyz". - **kwargs: Additional arguments to pass to the titiler endpoint. For more information about the available arguments, see https://developmentseed.org/titiler/endpoints/cog/#tiles. - For example, to apply a rescaling to multiple bands, use something like `rescale=["164,223","130,211","99,212"]`. - - Returns: - tuple: Returns the COG Tile layer URL and bounds. - """ - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - - kwargs["url"] = url - - band_names = cog_bands(url, titiler_endpoint) - - if isinstance(bands, str): - bands = [bands] - - if bands is None and "bidx" not in kwargs: - if len(band_names) >= 3: - kwargs["bidx"] = [1, 2, 3] - elif isinstance(bands, list) and "bidx" not in kwargs: - if all(isinstance(x, int) for x in bands): - if len(set(bands)) == 1: - bands = bands[0] - kwargs["bidx"] = bands - elif all(isinstance(x, str) for x in bands): - if len(set(bands)) == 1: - bands = bands[0] - kwargs["bidx"] = [band_names.index(x) + 1 for x in bands] - else: - raise ValueError("Bands must be a list of integers or strings.") - - if "palette" in kwargs: - kwargs["colormap_name"] = kwargs["palette"].lower() - del kwargs["palette"] - - if "bidx" not in kwargs: - kwargs["bidx"] = [1] - elif isinstance(kwargs["bidx"], int): - kwargs["bidx"] = [kwargs["bidx"]] - - if "rescale" not in kwargs: - stats = cog_stats(url, titiler_endpoint) - - if "message" not in stats: - try: - rescale = [] - for i in band_names: - rescale.append( - "{},{}".format( - stats[i]["percentile_2"], - stats[i]["percentile_98"], - ) - ) - kwargs["rescale"] = rescale - except Exception as e: - print(e) - - TileMatrixSetId = "WebMercatorQuad" - if "TileMatrixSetId" in kwargs.keys(): - TileMatrixSetId = kwargs["TileMatrixSetId"] - kwargs.pop("TileMatrixSetId") - - r = requests.get( - f"{titiler_endpoint}/cog/{TileMatrixSetId}/tilejson.json", params=kwargs - ).json() - return r["tiles"][0] - - -def cog_tile_vmin_vmax( - url, bands=None, titiler_endpoint=None, percentile=True, **kwargs -): - """Get a tile layer from a Cloud Optimized GeoTIFF (COG) and return the minimum and maximum values. - - Args: - url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif - bands (list, optional): List of bands to use. Defaults to None. - titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". - percentile (bool, optional): Whether to use percentiles or not. Defaults to True. - Returns: - tuple: Returns the minimum and maximum values. - """ - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - stats = cog_stats(url, titiler_endpoint) - - if isinstance(bands, str): - bands = [bands] - - if bands is not None: - stats = {s: stats[s] for s in stats if s in bands} - - if percentile: - vmin = min([stats[s]["percentile_2"] for s in stats]) - vmax = max([stats[s]["percentile_98"] for s in stats]) - else: - vmin = min([stats[s]["min"] for s in stats]) - vmax = max([stats[s]["max"] for s in stats]) - - return vmin, vmax - - -def cog_mosaic( - links, - titiler_endpoint=None, - username="anonymous", - layername=None, - overwrite=False, - verbose=True, - **kwargs, -): - """Creates a COG mosaic from a list of COG URLs. - - Args: - links (list): A list containing COG HTTP URLs. - titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". - username (str, optional): User name for the titiler endpoint. Defaults to "anonymous". - layername ([type], optional): Layer name to use. Defaults to None. - overwrite (bool, optional): Whether to overwrite the layer name if existing. Defaults to False. - verbose (bool, optional): Whether to print out descriptive information. Defaults to True. - - Raises: - Exception: If the COG mosaic fails to create. - - Returns: - str: The tile URL for the COG mosaic. - """ - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - if layername is None: - layername = "layer_" + random_string(5) - - try: - if verbose: - print("Creating COG masaic ...") - - # Create token - r = requests.post( - f"{titiler_endpoint}/tokens/create", - json={"username": username, "scope": ["mosaic:read", "mosaic:create"]}, - ).json() - token = r["token"] - - # Create mosaic - requests.post( - f"{titiler_endpoint}/mosaicjson/create", - json={ - "username": username, - "layername": layername, - "files": links, - # "overwrite": overwrite - }, - params={ - "access_token": token, - }, - ).json() - - r2 = requests.get( - f"{titiler_endpoint}/mosaicjson/{username}.{layername}/tilejson.json", - ).json() - - return r2["tiles"][0] - - except Exception as e: - raise Exception(e) - - -def cog_mosaic_from_file( - filepath, - skip_rows=0, - titiler_endpoint=None, - username="anonymous", - layername=None, - overwrite=False, - verbose=True, - **kwargs, -): - """Creates a COG mosaic from a csv/txt file stored locally for through HTTP URL. - - Args: - filepath (str): Local path or HTTP URL to the csv/txt file containing COG URLs. - skip_rows (int, optional): The number of rows to skip in the file. Defaults to 0. - titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". - username (str, optional): User name for the titiler endpoint. Defaults to "anonymous". - layername ([type], optional): Layer name to use. Defaults to None. - overwrite (bool, optional): Whether to overwrite the layer name if existing. Defaults to False. - verbose (bool, optional): Whether to print out descriptive information. Defaults to True. - - Returns: - str: The tile URL for the COG mosaic. - """ - import urllib - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - links = [] - if filepath.startswith("http"): - data = urllib.request.urlopen(filepath) - for line in data: - links.append(line.decode("utf-8").strip()) - - else: - with open(filepath) as f: - links = [line.strip() for line in f.readlines()] - - links = links[skip_rows:] - # print(links) - mosaic = cog_mosaic( - links, titiler_endpoint, username, layername, overwrite, verbose, **kwargs - ) - return mosaic - - -def cog_bounds(url, titiler_endpoint=None): - """Get the bounding box of a Cloud Optimized GeoTIFF (COG). - - Args: - url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif - titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". - - Returns: - list: A list of values representing [left, bottom, right, top] - """ - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - r = requests.get(f"{titiler_endpoint}/cog/bounds", params={"url": url}).json() - - if "bounds" in r.keys(): - bounds = r["bounds"] - else: - bounds = None - return bounds - - -def cog_center(url, titiler_endpoint=None): - """Get the centroid of a Cloud Optimized GeoTIFF (COG). - - Args: - url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif - titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". - - Returns: - tuple: A tuple representing (longitude, latitude) - """ - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - bounds = cog_bounds(url, titiler_endpoint) - center = ((bounds[0] + bounds[2]) / 2, (bounds[1] + bounds[3]) / 2) # (lat, lon) - return center - - -def cog_bands(url, titiler_endpoint=None): - """Get band names of a Cloud Optimized GeoTIFF (COG). - - Args: - url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif - titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". - - Returns: - list: A list of band names - """ - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - r = requests.get( - f"{titiler_endpoint}/cog/info", - params={ - "url": url, - }, - ).json() - - bands = [b[0] for b in r["band_descriptions"]] - return bands - - -def cog_stats(url, titiler_endpoint=None): - """Get band statistics of a Cloud Optimized GeoTIFF (COG). - - Args: - url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif - titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". - - Returns: - list: A dictionary of band statistics. - """ - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - r = requests.get( - f"{titiler_endpoint}/cog/statistics", - params={ - "url": url, - }, - ).json() - - return r - - -def cog_info(url, titiler_endpoint=None, return_geojson=False): - """Get band statistics of a Cloud Optimized GeoTIFF (COG). - - Args: - url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif - titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". - - Returns: - list: A dictionary of band info. - """ - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - info = "info" - if return_geojson: - info = "info.geojson" - - r = requests.get( - f"{titiler_endpoint}/cog/{info}", - params={ - "url": url, - }, - ).json() - - return r - - -def cog_pixel_value( - lon, - lat, - url, - bidx=None, - titiler_endpoint=None, - verbose=True, - **kwargs, -): - """Get pixel value from COG. - - Args: - lon (float): Longitude of the pixel. - lat (float): Latitude of the pixel. - url (str): HTTP URL to a COG, e.g., 'https://opendata.digitalglobe.com/events/california-fire-2020/pre-event/2018-02-16/pine-gulch-fire20/1030010076004E00.tif' - bidx (str, optional): Dataset band indexes (e.g bidx=1, bidx=1&bidx=2&bidx=3). Defaults to None. - titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. - verbose (bool, optional): Print status messages. Defaults to True. - - Returns: - list: A dictionary of band info. - """ - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - kwargs["url"] = url - if bidx is not None: - kwargs["bidx"] = bidx - - r = requests.get(f"{titiler_endpoint}/cog/point/{lon},{lat}", params=kwargs).json() - bands = cog_bands(url, titiler_endpoint) - # if isinstance(titiler_endpoint, str): - # r = requests.get(f"{titiler_endpoint}/cog/point/{lon},{lat}", params=kwargs).json() - # else: - # r = requests.get( - # titiler_endpoint.url_for_stac_pixel_value(lon, lat), params=kwargs - # ).json() - - if "detail" in r: - if verbose: - print(r["detail"]) - return None - else: - values = r["values"] - result = dict(zip(bands, values)) - return result - - -def stac_tile( - url=None, - collection=None, - item=None, - assets=None, - bands=None, - titiler_endpoint=None, - **kwargs, -): - - """Get a tile layer from a single SpatialTemporal Asset Catalog (STAC) item. - - Args: - url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json - collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. - item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. - assets (str | list): The Microsoft Planetary Computer STAC asset ID, e.g., ["SR_B7", "SR_B5", "SR_B4"]. - bands (list): A list of band names, e.g., ["SR_B7", "SR_B5", "SR_B4"] - titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "https://planetarycomputer.microsoft.com/api/data/v1", "planetary-computer", "pc". Defaults to None. - - Returns: - str: Returns the STAC Tile layer URL. - """ - - if url is None and collection is None: - raise ValueError("Either url or collection must be specified.") - - if collection is not None and titiler_endpoint is None: - titiler_endpoint = "planetary-computer" - - if url is not None: - kwargs["url"] = url - if collection is not None: - kwargs["collection"] = collection - if item is not None: - kwargs["item"] = item - - if "palette" in kwargs: - kwargs["colormap_name"] = kwargs["palette"].lower() - del kwargs["palette"] - - if isinstance(bands, list) and len(set(bands)) == 1: - bands = bands[0] - - if isinstance(assets, list) and len(set(assets)) == 1: - assets = assets[0] - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - - if isinstance(titiler_endpoint, PlanetaryComputerEndpoint): - if isinstance(bands, str): - bands = bands.split(",") - if isinstance(assets, str): - assets = assets.split(",") - if assets is None and (bands is not None): - assets = bands - else: - kwargs["bidx"] = bands - - kwargs["assets"] = assets - - # if ("expression" in kwargs) and ("rescale" not in kwargs): - # stats = stac_stats( - # collection=collection, - # item=item, - # expression=kwargs["expression"], - # titiler_endpoint=titiler_endpoint, - # ) - # kwargs[ - # "rescale" - # ] = f"{stats[0]['percentile_2']},{stats[0]['percentile_98']}" - - # if ("asset_expression" in kwargs) and ("rescale" not in kwargs): - # stats = stac_stats( - # collection=collection, - # item=item, - # expression=kwargs["asset_expression"], - # titiler_endpoint=titiler_endpoint, - # ) - # kwargs[ - # "rescale" - # ] = f"{stats[0]['percentile_2']},{stats[0]['percentile_98']}" - - if ( - (assets is not None) - and ("asset_expression" not in kwargs) - and ("expression" not in kwargs) - and ("rescale" not in kwargs) - ): - stats = stac_stats( - collection=collection, - item=item, - assets=assets, - titiler_endpoint=titiler_endpoint, - ) - if "detail" not in stats: - - try: - percentile_2 = min([stats[s]["percentile_2"] for s in stats]) - percentile_98 = max([stats[s]["percentile_98"] for s in stats]) - except: - percentile_2 = min( - [ - stats[s][list(stats[s].keys())[0]]["percentile_2"] - for s in stats - ] - ) - percentile_98 = max( - [ - stats[s][list(stats[s].keys())[0]]["percentile_98"] - for s in stats - ] - ) - kwargs["rescale"] = f"{percentile_2},{percentile_98}" - else: - print(stats["detail"]) # When operation times out. - - else: - if isinstance(bands, str): - bands = bands.split(",") - if isinstance(assets, str): - assets = assets.split(",") - - if assets is None and (bands is not None): - assets = bands - else: - kwargs["asset_bidx"] = bands - kwargs["assets"] = assets - - TileMatrixSetId = "WebMercatorQuad" - if "TileMatrixSetId" in kwargs.keys(): - TileMatrixSetId = kwargs["TileMatrixSetId"] - kwargs.pop("TileMatrixSetId") - - if isinstance(titiler_endpoint, str): - r = requests.get( - f"{titiler_endpoint}/stac/{TileMatrixSetId}/tilejson.json", - params=kwargs, - ).json() - else: - r = requests.get(titiler_endpoint.url_for_stac_item(), params=kwargs).json() - - return r["tiles"][0] - - -def stac_bounds(url=None, collection=None, item=None, titiler_endpoint=None, **kwargs): - """Get the bounding box of a single SpatialTemporal Asset Catalog (STAC) item. - - Args: - url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json - collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. - item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. - titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. - - Returns: - list: A list of values representing [left, bottom, right, top] - """ - - if url is None and collection is None: - raise ValueError("Either url or collection must be specified.") - - if collection is not None and titiler_endpoint is None: - titiler_endpoint = "planetary-computer" - - if url is not None: - kwargs["url"] = url - if collection is not None: - kwargs["collection"] = collection - if item is not None: - kwargs["item"] = item - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - if isinstance(titiler_endpoint, str): - r = requests.get(f"{titiler_endpoint}/stac/bounds", params=kwargs).json() - else: - r = requests.get(titiler_endpoint.url_for_stac_bounds(), params=kwargs).json() - - bounds = r["bounds"] - return bounds - - -def stac_center(url=None, collection=None, item=None, titiler_endpoint=None, **kwargs): - """Get the centroid of a single SpatialTemporal Asset Catalog (STAC) item. - - Args: - url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json - collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. - item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. - titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. - - Returns: - tuple: A tuple representing (longitude, latitude) - """ - bounds = stac_bounds(url, collection, item, titiler_endpoint, **kwargs) - center = ((bounds[0] + bounds[2]) / 2, (bounds[1] + bounds[3]) / 2) # (lon, lat) - return center - - -def stac_bands(url=None, collection=None, item=None, titiler_endpoint=None, **kwargs): - """Get band names of a single SpatialTemporal Asset Catalog (STAC) item. - - Args: - url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json - collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. - item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. - titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. - - Returns: - list: A list of band names - """ - - if url is None and collection is None: - raise ValueError("Either url or collection must be specified.") - - if collection is not None and titiler_endpoint is None: - titiler_endpoint = "planetary-computer" - - if url is not None: - kwargs["url"] = url - if collection is not None: - kwargs["collection"] = collection - if item is not None: - kwargs["item"] = item - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - if isinstance(titiler_endpoint, str): - r = requests.get(f"{titiler_endpoint}/stac/assets", params=kwargs).json() - else: - r = requests.get(titiler_endpoint.url_for_stac_assets(), params=kwargs).json() - - return r - - -def stac_stats( - url=None, collection=None, item=None, assets=None, titiler_endpoint=None, **kwargs -): - """Get band statistics of a STAC item. - - Args: - url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json - collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. - item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. - assets (str | list): The Microsoft Planetary Computer STAC asset ID, e.g., ["SR_B7", "SR_B5", "SR_B4"]. - titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. - - Returns: - list: A dictionary of band statistics. - """ - - if url is None and collection is None: - raise ValueError("Either url or collection must be specified.") - - if collection is not None and titiler_endpoint is None: - titiler_endpoint = "planetary-computer" - - if url is not None: - kwargs["url"] = url - if collection is not None: - kwargs["collection"] = collection - if item is not None: - kwargs["item"] = item - if assets is not None: - kwargs["assets"] = assets - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - if isinstance(titiler_endpoint, str): - r = requests.get(f"{titiler_endpoint}/stac/statistics", params=kwargs).json() - else: - r = requests.get( - titiler_endpoint.url_for_stac_statistics(), params=kwargs - ).json() - - return r - - -def stac_info( - url=None, collection=None, item=None, assets=None, titiler_endpoint=None, **kwargs -): - """Get band info of a STAC item. - - Args: - url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json - collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. - item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. - assets (str | list): The Microsoft Planetary Computer STAC asset ID, e.g., ["SR_B7", "SR_B5", "SR_B4"]. - titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. - - Returns: - list: A dictionary of band info. - """ - - if url is None and collection is None: - raise ValueError("Either url or collection must be specified.") - - if collection is not None and titiler_endpoint is None: - titiler_endpoint = "planetary-computer" - - if url is not None: - kwargs["url"] = url - if collection is not None: - kwargs["collection"] = collection - if item is not None: - kwargs["item"] = item - if assets is not None: - kwargs["assets"] = assets - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - if isinstance(titiler_endpoint, str): - r = requests.get(f"{titiler_endpoint}/stac/info", params=kwargs).json() - else: - r = requests.get(titiler_endpoint.url_for_stac_info(), params=kwargs).json() - - return r - - -def stac_info_geojson( - url=None, collection=None, item=None, assets=None, titiler_endpoint=None, **kwargs -): - """Get band info of a STAC item. - - Args: - url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json - collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. - item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. - assets (str | list): The Microsoft Planetary Computer STAC asset ID, e.g., ["SR_B7", "SR_B5", "SR_B4"]. - titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. - - Returns: - list: A dictionary of band info. - """ - - if url is None and collection is None: - raise ValueError("Either url or collection must be specified.") - - if collection is not None and titiler_endpoint is None: - titiler_endpoint = "planetary-computer" - - if url is not None: - kwargs["url"] = url - if collection is not None: - kwargs["collection"] = collection - if item is not None: - kwargs["item"] = item - if assets is not None: - kwargs["assets"] = assets - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - if isinstance(titiler_endpoint, str): - r = requests.get(f"{titiler_endpoint}/stac/info.geojson", params=kwargs).json() - else: - r = requests.get( - titiler_endpoint.url_for_stac_info_geojson(), params=kwargs - ).json() - - return r - - -def stac_assets(url=None, collection=None, item=None, titiler_endpoint=None, **kwargs): - """Get all assets of a STAC item. - - Args: - url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json - collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. - item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. - titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. - - Returns: - list: A list of assets. - """ - - if url is None and collection is None: - raise ValueError("Either url or collection must be specified.") - - if collection is not None and titiler_endpoint is None: - titiler_endpoint = "planetary-computer" - - if url is not None: - kwargs["url"] = url - if collection is not None: - kwargs["collection"] = collection - if item is not None: - kwargs["item"] = item - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - if isinstance(titiler_endpoint, str): - r = requests.get(f"{titiler_endpoint}/stac/assets", params=kwargs).json() - else: - r = requests.get(titiler_endpoint.url_for_stac_assets(), params=kwargs).json() - - return r - - -def stac_pixel_value( - lon, - lat, - url=None, - collection=None, - item=None, - assets=None, - titiler_endpoint=None, - verbose=True, - **kwargs, -): - """Get pixel value from STAC assets. - - Args: - lon (float): Longitude of the pixel. - lat (float): Latitude of the pixel. - url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json - collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. - item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. - assets (str | list): The Microsoft Planetary Computer STAC asset ID, e.g., ["SR_B7", "SR_B5", "SR_B4"]. - titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. - verbose (bool, optional): Print out the error message. Defaults to True. - - Returns: - list: A dictionary of pixel values for each asset. - """ - - if url is None and collection is None: - raise ValueError("Either url or collection must be specified.") - - if collection is not None and titiler_endpoint is None: - titiler_endpoint = "planetary-computer" - - if url is not None: - kwargs["url"] = url - if collection is not None: - kwargs["collection"] = collection - if item is not None: - kwargs["item"] = item - - if assets is None: - assets = stac_assets( - url=url, - collection=collection, - item=item, - titiler_endpoint=titiler_endpoint, - ) - assets = ",".join(assets) - kwargs["assets"] = assets - - titiler_endpoint = check_titiler_endpoint(titiler_endpoint) - if isinstance(titiler_endpoint, str): - r = requests.get(f"{titiler_endpoint}/stac/{lon},{lat}", params=kwargs).json() - else: - r = requests.get( - titiler_endpoint.url_for_stac_pixel_value(lon, lat), params=kwargs - ).json() - - if "detail" in r: - if verbose: - print(r["detail"]) - return None - else: - values = [v[0] for v in r["values"]] - result = dict(zip(assets.split(","), values)) - return result - - def local_tile_pixel_value( lon, lat, @@ -4232,6 +3277,29 @@ def geom_type(in_geojson, encoding="utf-8"): raise Exception(e) +def geojson_to_gdf(in_geojson, encoding="utf-8", **kwargs): + """Converts a GeoJSON object to a geopandas GeoDataFrame. + + Args: + in_geojson (str | dict): The input GeoJSON file or GeoJSON object as a dict. + encoding (str, optional): The encoding of the GeoJSON object. Defaults to "utf-8". + + Returns: + geopandas.GeoDataFrame: A geopandas GeoDataFrame containing the GeoJSON object. + """ + + import geopandas as gpd + + if isinstance(in_geojson, dict): + out_file = temp_file_path(extension="geojson") + with open(out_file, "w") as f: + json.dump(in_geojson, f) + in_geojson = out_file + + gdf = gpd.read_file(in_geojson, encoding=encoding, **kwargs) + return gdf + + def geojson_to_df(in_geojson, encoding="utf-8", drop_geometry=True): """Converts a GeoJSON object to a pandas DataFrame. @@ -6700,6 +5768,21 @@ def image_projection(image, **kwargs): return client.metadata()["Projection"] +def image_set_crs(image, epsg): + """Define the CRS of an image. + + Args: + image (str): The input image filepath + epsg (int): The EPSG code of the CRS to set. + """ + + from rasterio.crs import CRS + import rasterio + + with rasterio.open(image, "r+") as rds: + rds.crs = CRS.from_epsg(epsg) + + def image_geotransform(image, **kwargs): """Get the geotransform of an image. @@ -8307,7 +7390,7 @@ def vector_to_gif( open_args={}, plot_args={}, ): - """Convert a vector to a gif. This function was inspired by by Johannes Uhl's shapefile2gif repo at + """Convert a vector to a gif. This function was inspired by by Johannes Uhl's shapefile2gif repo at https://github.com/johannesuhl/shapefile2gif. Credits to Johannes Uhl. Args: @@ -8666,3 +7749,77 @@ def arc_zoom_to_extent(xmin, ymin, xmax, ymax): # if isinstance(zoom, int): # scale = 156543.04 * math.cos(0) / math.pow(2, zoom) # view.camera.scale = scale # Not working properly + + +def vector_to_raster( + vector, + output, + field="FID", + assign="last", + nodata=True, + cell_size=None, + base=None, + callback=None, + verbose=False, + to_epsg=None, +): + """Convert a vector to a raster. + + Args: + vector (str | GeoPandas.GeoDataFrame): The input vector data, can be a file path or a GeoDataFrame. + output (str): The output raster file path. + field (str, optional): Input field name in attribute table. Defaults to 'FID'. + assign (str, optional): Assignment operation, where multiple points are in the same grid cell; options + include 'first', 'last' (default), 'min', 'max', 'sum', 'number'. Defaults to 'last'. + nodata (bool, optional): Background value to set to NoData. Without this flag, it will be set to 0.0. + cell_size (float, optional): Optionally specified cell size of output raster. Not used when base raster is specified + base (str, optional): Optionally specified input base raster file. Not used when a cell size is specified. Defaults to None. + callback (fuct, optional): A callback function to report progress. Defaults to None. + verbose (bool, optional): Whether to print progress to the console. Defaults to False. + to_epsg (integer, optional): Optionally specified the EPSG code to reproject the raster to. Defaults to None. + + """ + import geopandas as gpd + import whitebox + + output = os.path.abspath(output) + + if isinstance(vector, str): + gdf = gpd.read_file(vector) + elif isinstance(vector, gpd.GeoDataFrame): + gdf = vector + else: + raise TypeError("vector must be a file path or a GeoDataFrame") + + if to_epsg is None: + to_epsg = 3857 + + if to_epsg == 4326: + raise ValueError("to_epsg cannot be 4326") + + if gdf.crs.is_geographic: + gdf = gdf.to_crs(epsg=to_epsg) + vector = temp_file_path(extension=".shp") + gdf.to_file(vector) + else: + to_epsg = gdf.crs.to_epsg() + + wbt = whitebox.WhiteboxTools() + wbt.verbose = verbose + + goem_type = gdf.geom_type[0] + + if goem_type == "LineString": + wbt.vector_lines_to_raster( + vector, output, field, nodata, cell_size, base, callback + ) + elif goem_type == "Polygon": + wbt.vector_polygons_to_raster( + vector, output, field, nodata, cell_size, base, callback + ) + else: + wbt.vector_points_to_raster( + vector, output, field, assign, nodata, cell_size, base, callback + ) + + image_set_crs(output, to_epsg) diff --git a/leafmap/stac.py b/leafmap/stac.py new file mode 100644 index 0000000000..d0cf2861d6 --- /dev/null +++ b/leafmap/stac.py @@ -0,0 +1,1255 @@ +import os +import pystac +import requests + + +class TitilerEndpoint: + """This class contains the methods for the titiler endpoint.""" + + def __init__( + self, + endpoint="https://titiler.xyz", + name="stac", + TileMatrixSetId="WebMercatorQuad", + ): + """Initialize the TitilerEndpoint object. + + Args: + endpoint (str, optional): The endpoint of the titiler server. Defaults to "https://titiler.xyz". + name (str, optional): The name to be used in the file path. Defaults to "stac". + TileMatrixSetId (str, optional): The TileMatrixSetId to be used in the file path. Defaults to "WebMercatorQuad". + """ + self.endpoint = endpoint + self.name = name + self.TileMatrixSetId = TileMatrixSetId + + def url_for_stac_item(self): + return f"{self.endpoint}/{self.name}/{self.TileMatrixSetId}/tilejson.json" + + def url_for_stac_assets(self): + return f"{self.endpoint}/{self.name}/assets" + + def url_for_stac_bounds(self): + return f"{self.endpoint}/{self.name}/bounds" + + def url_for_stac_info(self): + return f"{self.endpoint}/{self.name}/info" + + def url_for_stac_info_geojson(self): + return f"{self.endpoint}/{self.name}/info.geojson" + + def url_for_stac_statistics(self): + return f"{self.endpoint}/{self.name}/statistics" + + def url_for_stac_pixel_value(self, lon, lat): + return f"{self.endpoint}/{self.name}/point/{lon},{lat}" + + def url_for_stac_wmts(self): + return ( + f"{self.endpoint}/{self.name}/{self.TileMatrixSetId}/WMTSCapabilities.xml" + ) + + +class PlanetaryComputerEndpoint(TitilerEndpoint): + """This class contains the methods for the Microsoft Planetary Computer endpoint.""" + + def __init__( + self, + endpoint="https://planetarycomputer.microsoft.com/api/data/v1", + name="item", + TileMatrixSetId="WebMercatorQuad", + ): + """Initialize the PlanetaryComputerEndpoint object. + + Args: + endpoint (str, optional): The endpoint of the titiler server. Defaults to "https://planetarycomputer.microsoft.com/api/data/v1". + name (str, optional): The name to be used in the file path. Defaults to "item". + TileMatrixSetId (str, optional): The TileMatrixSetId to be used in the file path. Defaults to "WebMercatorQuad". + """ + super().__init__(endpoint, name, TileMatrixSetId) + + def url_for_stac_collection(self): + return f"{self.endpoint}/collection/{self.TileMatrixSetId}/tilejson.json" + + def url_for_collection_assets(self): + return f"{self.endpoint}/collection/assets" + + def url_for_collection_bounds(self): + return f"{self.endpoint}/collection/bounds" + + def url_for_collection_info(self): + return f"{self.endpoint}/collection/info" + + def url_for_collection_info_geojson(self): + return f"{self.endpoint}/collection/info.geojson" + + def url_for_collection_pixel_value(self, lon, lat): + return f"{self.endpoint}/collection/point/{lon},{lat}" + + def url_for_collection_wmts(self): + return f"{self.endpoint}/collection/{self.TileMatrixSetId}/WMTSCapabilities.xml" + + def url_for_collection_lat_lon_assets(self, lng, lat): + return f"{self.endpoint}/collection/{lng},{lat}/assets" + + def url_for_collection_bbox_assets(self, minx, miny, maxx, maxy): + return f"{self.endpoint}/collection/{minx},{miny},{maxx},{maxy}/assets" + + def url_for_stac_mosaic(self, searchid): + return f"{self.endpoint}/mosaic/{searchid}/{self.TileMatrixSetId}/tilejson.json" + + def url_for_mosaic_info(self, searchid): + return f"{self.endpoint}/mosaic/{searchid}/info" + + def url_for_mosaic_lat_lon_assets(self, searchid, lon, lat): + return f"{self.endpoint}/mosaic/{searchid}/{lon},{lat}/assets" + + +def check_titiler_endpoint(titiler_endpoint=None): + """Returns the default titiler endpoint. + + Returns: + object: A titiler endpoint. + """ + if titiler_endpoint is None: + if os.environ.get("TITILER_ENDPOINT") is not None: + titiler_endpoint = os.environ.get("TITILER_ENDPOINT") + + if titiler_endpoint == "planetary-computer": + titiler_endpoint = PlanetaryComputerEndpoint() + else: + titiler_endpoint = "https://titiler.xyz" + elif titiler_endpoint in ["planetary-computer", "pc"]: + titiler_endpoint = PlanetaryComputerEndpoint() + + return titiler_endpoint + + +def cog_tile(url, bands=None, titiler_endpoint=None, **kwargs): + """Get a tile layer from a Cloud Optimized GeoTIFF (COG). + Source code adapted from https://developmentseed.org/titiler/examples/notebooks/Working_with_CloudOptimizedGeoTIFF_simple/ + + Args: + url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif + bands (list, optional): List of bands to use. Defaults to None. + titiler_endpoint (str, optional): TiTiler endpoint. Defaults to "https://titiler.xyz". + **kwargs: Additional arguments to pass to the titiler endpoint. For more information about the available arguments, see https://developmentseed.org/titiler/endpoints/cog/#tiles. + For example, to apply a rescaling to multiple bands, use something like `rescale=["164,223","130,211","99,212"]`. + + Returns: + tuple: Returns the COG Tile layer URL and bounds. + """ + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + + kwargs["url"] = url + + band_names = cog_bands(url, titiler_endpoint) + + if isinstance(bands, str): + bands = [bands] + + if bands is None and "bidx" not in kwargs: + if len(band_names) >= 3: + kwargs["bidx"] = [1, 2, 3] + elif isinstance(bands, list) and "bidx" not in kwargs: + if all(isinstance(x, int) for x in bands): + if len(set(bands)) == 1: + bands = bands[0] + kwargs["bidx"] = bands + elif all(isinstance(x, str) for x in bands): + if len(set(bands)) == 1: + bands = bands[0] + kwargs["bidx"] = [band_names.index(x) + 1 for x in bands] + else: + raise ValueError("Bands must be a list of integers or strings.") + + if "palette" in kwargs: + kwargs["colormap_name"] = kwargs["palette"].lower() + del kwargs["palette"] + + if "bidx" not in kwargs: + kwargs["bidx"] = [1] + elif isinstance(kwargs["bidx"], int): + kwargs["bidx"] = [kwargs["bidx"]] + + if "rescale" not in kwargs: + stats = cog_stats(url, titiler_endpoint) + + if "message" not in stats: + try: + rescale = [] + for i in band_names: + rescale.append( + "{},{}".format( + stats[i]["percentile_2"], + stats[i]["percentile_98"], + ) + ) + kwargs["rescale"] = rescale + except Exception as e: + print(e) + + TileMatrixSetId = "WebMercatorQuad" + if "TileMatrixSetId" in kwargs.keys(): + TileMatrixSetId = kwargs["TileMatrixSetId"] + kwargs.pop("TileMatrixSetId") + + r = requests.get( + f"{titiler_endpoint}/cog/{TileMatrixSetId}/tilejson.json", params=kwargs + ).json() + return r["tiles"][0] + + +def cog_tile_vmin_vmax( + url, bands=None, titiler_endpoint=None, percentile=True, **kwargs +): + """Get a tile layer from a Cloud Optimized GeoTIFF (COG) and return the minimum and maximum values. + + Args: + url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif + bands (list, optional): List of bands to use. Defaults to None. + titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". + percentile (bool, optional): Whether to use percentiles or not. Defaults to True. + Returns: + tuple: Returns the minimum and maximum values. + """ + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + stats = cog_stats(url, titiler_endpoint) + + if isinstance(bands, str): + bands = [bands] + + if bands is not None: + stats = {s: stats[s] for s in stats if s in bands} + + if percentile: + vmin = min([stats[s]["percentile_2"] for s in stats]) + vmax = max([stats[s]["percentile_98"] for s in stats]) + else: + vmin = min([stats[s]["min"] for s in stats]) + vmax = max([stats[s]["max"] for s in stats]) + + return vmin, vmax + + +def cog_mosaic( + links, + titiler_endpoint=None, + username="anonymous", + layername=None, + overwrite=False, + verbose=True, + **kwargs, +): + """Creates a COG mosaic from a list of COG URLs. + + Args: + links (list): A list containing COG HTTP URLs. + titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". + username (str, optional): User name for the titiler endpoint. Defaults to "anonymous". + layername ([type], optional): Layer name to use. Defaults to None. + overwrite (bool, optional): Whether to overwrite the layer name if existing. Defaults to False. + verbose (bool, optional): Whether to print out descriptive information. Defaults to True. + + Raises: + Exception: If the COG mosaic fails to create. + + Returns: + str: The tile URL for the COG mosaic. + """ + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + if layername is None: + layername = "layer_X" + + try: + if verbose: + print("Creating COG masaic ...") + + # Create token + r = requests.post( + f"{titiler_endpoint}/tokens/create", + json={"username": username, "scope": ["mosaic:read", "mosaic:create"]}, + ).json() + token = r["token"] + + # Create mosaic + requests.post( + f"{titiler_endpoint}/mosaicjson/create", + json={ + "username": username, + "layername": layername, + "files": links, + # "overwrite": overwrite + }, + params={ + "access_token": token, + }, + ).json() + + r2 = requests.get( + f"{titiler_endpoint}/mosaicjson/{username}.{layername}/tilejson.json", + ).json() + + return r2["tiles"][0] + + except Exception as e: + raise Exception(e) + + +def cog_mosaic_from_file( + filepath, + skip_rows=0, + titiler_endpoint=None, + username="anonymous", + layername=None, + overwrite=False, + verbose=True, + **kwargs, +): + """Creates a COG mosaic from a csv/txt file stored locally for through HTTP URL. + + Args: + filepath (str): Local path or HTTP URL to the csv/txt file containing COG URLs. + skip_rows (int, optional): The number of rows to skip in the file. Defaults to 0. + titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". + username (str, optional): User name for the titiler endpoint. Defaults to "anonymous". + layername ([type], optional): Layer name to use. Defaults to None. + overwrite (bool, optional): Whether to overwrite the layer name if existing. Defaults to False. + verbose (bool, optional): Whether to print out descriptive information. Defaults to True. + + Returns: + str: The tile URL for the COG mosaic. + """ + import urllib + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + links = [] + if filepath.startswith("http"): + data = urllib.request.urlopen(filepath) + for line in data: + links.append(line.decode("utf-8").strip()) + + else: + with open(filepath) as f: + links = [line.strip() for line in f.readlines()] + + links = links[skip_rows:] + # print(links) + mosaic = cog_mosaic( + links, titiler_endpoint, username, layername, overwrite, verbose, **kwargs + ) + return mosaic + + +def cog_bounds(url, titiler_endpoint=None): + """Get the bounding box of a Cloud Optimized GeoTIFF (COG). + + Args: + url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif + titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". + + Returns: + list: A list of values representing [left, bottom, right, top] + """ + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + r = requests.get(f"{titiler_endpoint}/cog/bounds", params={"url": url}).json() + + if "bounds" in r.keys(): + bounds = r["bounds"] + else: + bounds = None + return bounds + + +def cog_center(url, titiler_endpoint=None): + """Get the centroid of a Cloud Optimized GeoTIFF (COG). + + Args: + url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif + titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". + + Returns: + tuple: A tuple representing (longitude, latitude) + """ + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + bounds = cog_bounds(url, titiler_endpoint) + center = ((bounds[0] + bounds[2]) / 2, (bounds[1] + bounds[3]) / 2) # (lat, lon) + return center + + +def cog_bands(url, titiler_endpoint=None): + """Get band names of a Cloud Optimized GeoTIFF (COG). + + Args: + url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif + titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". + + Returns: + list: A list of band names + """ + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + r = requests.get( + f"{titiler_endpoint}/cog/info", + params={ + "url": url, + }, + ).json() + + bands = [b[0] for b in r["band_descriptions"]] + return bands + + +def cog_stats(url, titiler_endpoint=None): + """Get band statistics of a Cloud Optimized GeoTIFF (COG). + + Args: + url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif + titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". + + Returns: + list: A dictionary of band statistics. + """ + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + r = requests.get( + f"{titiler_endpoint}/cog/statistics", + params={ + "url": url, + }, + ).json() + + return r + + +def cog_info(url, titiler_endpoint=None, return_geojson=False): + """Get band statistics of a Cloud Optimized GeoTIFF (COG). + + Args: + url (str): HTTP URL to a COG, e.g., https://opendata.digitalglobe.com/events/mauritius-oil-spill/post-event/2020-08-12/105001001F1B5B00/105001001F1B5B00.tif + titiler_endpoint (str, optional): Titiler endpoint. Defaults to "https://titiler.xyz". + + Returns: + list: A dictionary of band info. + """ + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + info = "info" + if return_geojson: + info = "info.geojson" + + r = requests.get( + f"{titiler_endpoint}/cog/{info}", + params={ + "url": url, + }, + ).json() + + return r + + +def cog_pixel_value( + lon, + lat, + url, + bidx=None, + titiler_endpoint=None, + verbose=True, + **kwargs, +): + """Get pixel value from COG. + + Args: + lon (float): Longitude of the pixel. + lat (float): Latitude of the pixel. + url (str): HTTP URL to a COG, e.g., 'https://opendata.digitalglobe.com/events/california-fire-2020/pre-event/2018-02-16/pine-gulch-fire20/1030010076004E00.tif' + bidx (str, optional): Dataset band indexes (e.g bidx=1, bidx=1&bidx=2&bidx=3). Defaults to None. + titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. + verbose (bool, optional): Print status messages. Defaults to True. + + Returns: + list: A dictionary of band info. + """ + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + kwargs["url"] = url + if bidx is not None: + kwargs["bidx"] = bidx + + r = requests.get(f"{titiler_endpoint}/cog/point/{lon},{lat}", params=kwargs).json() + bands = cog_bands(url, titiler_endpoint) + # if isinstance(titiler_endpoint, str): + # r = requests.get(f"{titiler_endpoint}/cog/point/{lon},{lat}", params=kwargs).json() + # else: + # r = requests.get( + # titiler_endpoint.url_for_stac_pixel_value(lon, lat), params=kwargs + # ).json() + + if "detail" in r: + if verbose: + print(r["detail"]) + return None + else: + values = r["values"] + result = dict(zip(bands, values)) + return result + + +def stac_tile( + url=None, + collection=None, + item=None, + assets=None, + bands=None, + titiler_endpoint=None, + **kwargs, +): + + """Get a tile layer from a single SpatialTemporal Asset Catalog (STAC) item. + + Args: + url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json + collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. + item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. + assets (str | list): The Microsoft Planetary Computer STAC asset ID, e.g., ["SR_B7", "SR_B5", "SR_B4"]. + bands (list): A list of band names, e.g., ["SR_B7", "SR_B5", "SR_B4"] + titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "https://planetarycomputer.microsoft.com/api/data/v1", "planetary-computer", "pc". Defaults to None. + + Returns: + str: Returns the STAC Tile layer URL. + """ + + if url is None and collection is None: + raise ValueError("Either url or collection must be specified.") + + if collection is not None and titiler_endpoint is None: + titiler_endpoint = "planetary-computer" + + if url is not None: + kwargs["url"] = url + if collection is not None: + kwargs["collection"] = collection + if item is not None: + kwargs["item"] = item + + if "palette" in kwargs: + kwargs["colormap_name"] = kwargs["palette"].lower() + del kwargs["palette"] + + if isinstance(bands, list) and len(set(bands)) == 1: + bands = bands[0] + + if isinstance(assets, list) and len(set(assets)) == 1: + assets = assets[0] + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + + if isinstance(titiler_endpoint, PlanetaryComputerEndpoint): + if isinstance(bands, str): + bands = bands.split(",") + if isinstance(assets, str): + assets = assets.split(",") + if assets is None and (bands is not None): + assets = bands + else: + kwargs["bidx"] = bands + + kwargs["assets"] = assets + + if ( + (assets is not None) + and ("asset_expression" not in kwargs) + and ("expression" not in kwargs) + and ("rescale" not in kwargs) + ): + stats = stac_stats( + collection=collection, + item=item, + assets=assets, + titiler_endpoint=titiler_endpoint, + ) + if "detail" not in stats: + + try: + percentile_2 = min([stats[s]["percentile_2"] for s in stats]) + percentile_98 = max([stats[s]["percentile_98"] for s in stats]) + except: + percentile_2 = min( + [ + stats[s][list(stats[s].keys())[0]]["percentile_2"] + for s in stats + ] + ) + percentile_98 = max( + [ + stats[s][list(stats[s].keys())[0]]["percentile_98"] + for s in stats + ] + ) + kwargs["rescale"] = f"{percentile_2},{percentile_98}" + else: + print(stats["detail"]) # When operation times out. + + else: + if isinstance(bands, str): + bands = bands.split(",") + if isinstance(assets, str): + assets = assets.split(",") + + if assets is None: + if bands is not None: + assets = bands + else: + bnames = stac_bands(url) + if len(bnames) >= 3: + assets = bnames[0:3] + else: + assets = bnames[0] + else: + kwargs["asset_bidx"] = bands + kwargs["assets"] = assets + + TileMatrixSetId = "WebMercatorQuad" + if "TileMatrixSetId" in kwargs.keys(): + TileMatrixSetId = kwargs["TileMatrixSetId"] + kwargs.pop("TileMatrixSetId") + + if isinstance(titiler_endpoint, str): + r = requests.get( + f"{titiler_endpoint}/stac/{TileMatrixSetId}/tilejson.json", + params=kwargs, + ).json() + else: + r = requests.get(titiler_endpoint.url_for_stac_item(), params=kwargs).json() + + return r["tiles"][0] + + +def stac_bounds(url=None, collection=None, item=None, titiler_endpoint=None, **kwargs): + """Get the bounding box of a single SpatialTemporal Asset Catalog (STAC) item. + + Args: + url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json + collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. + item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. + titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. + + Returns: + list: A list of values representing [left, bottom, right, top] + """ + + if url is None and collection is None: + raise ValueError("Either url or collection must be specified.") + + if collection is not None and titiler_endpoint is None: + titiler_endpoint = "planetary-computer" + + if url is not None: + kwargs["url"] = url + if collection is not None: + kwargs["collection"] = collection + if item is not None: + kwargs["item"] = item + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + if isinstance(titiler_endpoint, str): + r = requests.get(f"{titiler_endpoint}/stac/bounds", params=kwargs).json() + else: + r = requests.get(titiler_endpoint.url_for_stac_bounds(), params=kwargs).json() + + bounds = r["bounds"] + return bounds + + +def stac_center(url=None, collection=None, item=None, titiler_endpoint=None, **kwargs): + """Get the centroid of a single SpatialTemporal Asset Catalog (STAC) item. + + Args: + url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json + collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. + item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. + titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. + + Returns: + tuple: A tuple representing (longitude, latitude) + """ + bounds = stac_bounds(url, collection, item, titiler_endpoint, **kwargs) + center = ((bounds[0] + bounds[2]) / 2, (bounds[1] + bounds[3]) / 2) # (lon, lat) + return center + + +def stac_bands(url=None, collection=None, item=None, titiler_endpoint=None, **kwargs): + """Get band names of a single SpatialTemporal Asset Catalog (STAC) item. + + Args: + url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json + collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. + item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. + titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. + + Returns: + list: A list of band names + """ + + if url is None and collection is None: + raise ValueError("Either url or collection must be specified.") + + if collection is not None and titiler_endpoint is None: + titiler_endpoint = "planetary-computer" + + if url is not None: + kwargs["url"] = url + if collection is not None: + kwargs["collection"] = collection + if item is not None: + kwargs["item"] = item + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + if isinstance(titiler_endpoint, str): + r = requests.get(f"{titiler_endpoint}/stac/assets", params=kwargs).json() + else: + r = requests.get(titiler_endpoint.url_for_stac_assets(), params=kwargs).json() + + return r + + +def stac_stats( + url=None, collection=None, item=None, assets=None, titiler_endpoint=None, **kwargs +): + """Get band statistics of a STAC item. + + Args: + url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json + collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. + item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. + assets (str | list): The Microsoft Planetary Computer STAC asset ID, e.g., ["SR_B7", "SR_B5", "SR_B4"]. + titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. + + Returns: + list: A dictionary of band statistics. + """ + + if url is None and collection is None: + raise ValueError("Either url or collection must be specified.") + + if collection is not None and titiler_endpoint is None: + titiler_endpoint = "planetary-computer" + + if url is not None: + kwargs["url"] = url + if collection is not None: + kwargs["collection"] = collection + if item is not None: + kwargs["item"] = item + if assets is not None: + kwargs["assets"] = assets + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + if isinstance(titiler_endpoint, str): + r = requests.get(f"{titiler_endpoint}/stac/statistics", params=kwargs).json() + else: + r = requests.get( + titiler_endpoint.url_for_stac_statistics(), params=kwargs + ).json() + + return r + + +def stac_info( + url=None, collection=None, item=None, assets=None, titiler_endpoint=None, **kwargs +): + """Get band info of a STAC item. + + Args: + url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json + collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. + item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. + assets (str | list): The Microsoft Planetary Computer STAC asset ID, e.g., ["SR_B7", "SR_B5", "SR_B4"]. + titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. + + Returns: + list: A dictionary of band info. + """ + + if url is None and collection is None: + raise ValueError("Either url or collection must be specified.") + + if collection is not None and titiler_endpoint is None: + titiler_endpoint = "planetary-computer" + + if url is not None: + kwargs["url"] = url + if collection is not None: + kwargs["collection"] = collection + if item is not None: + kwargs["item"] = item + if assets is not None: + kwargs["assets"] = assets + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + if isinstance(titiler_endpoint, str): + r = requests.get(f"{titiler_endpoint}/stac/info", params=kwargs).json() + else: + r = requests.get(titiler_endpoint.url_for_stac_info(), params=kwargs).json() + + return r + + +def stac_info_geojson( + url=None, collection=None, item=None, assets=None, titiler_endpoint=None, **kwargs +): + """Get band info of a STAC item. + + Args: + url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json + collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. + item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. + assets (str | list): The Microsoft Planetary Computer STAC asset ID, e.g., ["SR_B7", "SR_B5", "SR_B4"]. + titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. + + Returns: + list: A dictionary of band info. + """ + + if url is None and collection is None: + raise ValueError("Either url or collection must be specified.") + + if collection is not None and titiler_endpoint is None: + titiler_endpoint = "planetary-computer" + + if url is not None: + kwargs["url"] = url + if collection is not None: + kwargs["collection"] = collection + if item is not None: + kwargs["item"] = item + if assets is not None: + kwargs["assets"] = assets + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + if isinstance(titiler_endpoint, str): + r = requests.get(f"{titiler_endpoint}/stac/info.geojson", params=kwargs).json() + else: + r = requests.get( + titiler_endpoint.url_for_stac_info_geojson(), params=kwargs + ).json() + + return r + + +def stac_assets(url=None, collection=None, item=None, titiler_endpoint=None, **kwargs): + """Get all assets of a STAC item. + + Args: + url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json + collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. + item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. + titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. + + Returns: + list: A list of assets. + """ + + if url is None and collection is None: + raise ValueError("Either url or collection must be specified.") + + if collection is not None and titiler_endpoint is None: + titiler_endpoint = "planetary-computer" + + if url is not None: + kwargs["url"] = url + if collection is not None: + kwargs["collection"] = collection + if item is not None: + kwargs["item"] = item + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + if isinstance(titiler_endpoint, str): + r = requests.get(f"{titiler_endpoint}/stac/assets", params=kwargs).json() + else: + r = requests.get(titiler_endpoint.url_for_stac_assets(), params=kwargs).json() + + return r + + +def stac_pixel_value( + lon, + lat, + url=None, + collection=None, + item=None, + assets=None, + titiler_endpoint=None, + verbose=True, + **kwargs, +): + """Get pixel value from STAC assets. + + Args: + lon (float): Longitude of the pixel. + lat (float): Latitude of the pixel. + url (str): HTTP URL to a STAC item, e.g., https://canada-spot-ortho.s3.amazonaws.com/canada_spot_orthoimages/canada_spot5_orthoimages/S5_2007/S5_11055_6057_20070622/S5_11055_6057_20070622.json + collection (str): The Microsoft Planetary Computer STAC collection ID, e.g., landsat-8-c2-l2. + item (str): The Microsoft Planetary Computer STAC item ID, e.g., LC08_L2SP_047027_20201204_02_T1. + assets (str | list): The Microsoft Planetary Computer STAC asset ID, e.g., ["SR_B7", "SR_B5", "SR_B4"]. + titiler_endpoint (str, optional): Titiler endpoint, e.g., "https://titiler.xyz", "planetary-computer", "pc". Defaults to None. + verbose (bool, optional): Print out the error message. Defaults to True. + + Returns: + list: A dictionary of pixel values for each asset. + """ + + if url is None and collection is None: + raise ValueError("Either url or collection must be specified.") + + if collection is not None and titiler_endpoint is None: + titiler_endpoint = "planetary-computer" + + if url is not None: + kwargs["url"] = url + if collection is not None: + kwargs["collection"] = collection + if item is not None: + kwargs["item"] = item + + if assets is None: + assets = stac_assets( + url=url, + collection=collection, + item=item, + titiler_endpoint=titiler_endpoint, + ) + assets = ",".join(assets) + kwargs["assets"] = assets + + titiler_endpoint = check_titiler_endpoint(titiler_endpoint) + if isinstance(titiler_endpoint, str): + r = requests.get(f"{titiler_endpoint}/stac/{lon},{lat}", params=kwargs).json() + else: + r = requests.get( + titiler_endpoint.url_for_stac_pixel_value(lon, lat), params=kwargs + ).json() + + if "detail" in r: + if verbose: + print(r["detail"]) + return None + else: + values = [v[0] for v in r["values"]] + result = dict(zip(assets.split(","), values)) + return result + + +def stac_object_type(url): + """Get the STAC object type. + + Args: + url (str): The STAC object URL. + + Returns: + str: The STAC object type, can be catalog, collection, or item. + """ + try: + obj = pystac.STACObject.from_file(url) + + if isinstance(obj, pystac.Collection): + return "collection" + elif isinstance(obj, pystac.Item): + return "item" + elif isinstance(obj, pystac.Catalog): + return "catalog" + + except Exception as e: + print(e) + return None + + +def stac_root_link(url, return_col_id=False): + """Get the root link of a STAC object. + + Args: + url (str): The STAC object URL. + return_col_id (bool, optional): Return the collection ID if the STAC object is a collection. Defaults to False. + + Returns: + str: The root link of the STAC object. + """ + collection_id = None + try: + obj = pystac.STACObject.from_file(url) + if isinstance(obj, pystac.Collection): + collection_id = obj.id + href = obj.get_root_link().get_href() + + if return_col_id: + return href, collection_id + else: + return href + + except Exception as e: + print(e) + return None + + +def stac_client( + url, headers=None, parameters=None, ignore_conformance=False, modifier=None, return_col_id=False +): + """Get the STAC client. It wraps the pystac.Client.open() method. See + https://pystac-client.readthedocs.io/en/stable/api.html#pystac_client.Client.open + + Args: + url (str): The URL of a STAC Catalog. + headers (dict, optional): A dictionary of additional headers to use in all requests + made to any part of this Catalog/API. Defaults to None. + parameters (dict, optional): Optional dictionary of query string parameters to include in all requests. + Defaults to None. + ignore_conformance (bool, optional): Ignore any advertised Conformance Classes in this Catalog/API. + This means that functions will skip checking conformance, and may throw an unknown error + if that feature is not supported, rather than a NotImplementedError. Defaults to False. + modifier (function, optional): A callable that modifies the children collection and items + returned by this Client. This can be useful for injecting authentication parameters + into child assets to access data from non-public sources. Defaults to None. + return_col_id (bool, optional): Return the collection ID. Defaults to False. + + Returns: + pystac.Client: The STAC client. + """ + from pystac_client import Client + + collection_id = None + + try: + root = stac_root_link(url, return_col_id=return_col_id) + + if return_col_id: + client = Client.open(root[0], headers, parameters, ignore_conformance, modifier) + collection_id = root[1] + return client, collection_id + else: + client = Client.open(root, headers, parameters, ignore_conformance, modifier) + return client + + except Exception as e: + print(e) + return None + + +def stac_collections(url, return_ids=False): + + """Get the collection IDs of a STAC catalog. + + Args: + url (str): The STAC catalog URL. + return_ids (bool, optional): Return collection IDs. Defaults to False. + + Returns: + list: A list of collection IDs. + """ + try: + client = stac_client(url) + collections = client.get_all_collections() + + if return_ids: + + return [c.id for c in collections] + else: + return collections + + except Exception as e: + print(e) + return None + + +def stac_search( + url, + method="POST", + max_items=None, + limit=100, + ids=None, + collections=None, + bbox=None, + intersects=None, + datetime=None, + query=None, + filter=None, + filter_lang=None, + sortby=None, + fields=None, + get_item_col=False, + get_items=False, + get_links=False, + get_gdf=False, + get_info=False, + **kwargs, +): + """Search a STAC API. The function wraps the pysatc_client.Client.search() method. See + https://pystac-client.readthedocs.io/en/stable/api.html#pystac_client.Client.search + + Args: + url (str): The STAC API URL. + method (str, optional): The HTTP method to use when making a request to the service. + This must be either "GET", "POST", or None. If None, this will default to "POST". + If a "POST" request receives a 405 status for the response, it will automatically + retry with "GET" for all subsequent requests. Defaults to "POST". + max_items (init, optional): The maximum number of items to return from the search, + even if there are more matching results. This client to limit the total number of + Items returned from the items(), item_collections(), and items_as_dicts methods(). + The client will continue to request pages of items until the number of max items + is reached. This parameter defaults to 100. Setting this to None will allow iteration + over a possibly very large number of results.. Defaults to None. + limit (int, optional): A recommendation to the service as to the number of items to + return per page of results. Defaults to 100. + ids (list, optional): List of one or more Item ids to filter on. Defaults to None. + collections (list, optional): List of one or more Collection IDs or pystac.Collection instances. + Only Items in one of the provided Collections will be searched. Defaults to None. + bbox (list | tuple, optional): A list, tuple, or iterator representing a bounding box of 2D + or 3D coordinates. Results will be filtered to only those intersecting the bounding box. + Defaults to None. + intersects (str | dict, optional): A string or dictionary representing a GeoJSON geometry, or + an object that implements a __geo_interface__ property, as supported by several + libraries including Shapely, ArcPy, PySAL, and geojson. Results filtered to only + those intersecting the geometry. Defaults to None. + datetime (str, optional): Either a single datetime or datetime range used to filter results. + You may express a single datetime using a datetime.datetime instance, a RFC 3339-compliant + timestamp, or a simple date string (see below). Instances of datetime.datetime may be either + timezone aware or unaware. Timezone aware instances will be converted to a UTC timestamp + before being passed to the endpoint. Timezone unaware instances are assumed to represent + UTC timestamps. You may represent a datetime range using a "/" separated string as described + in the spec, or a list, tuple, or iterator of 2 timestamps or datetime instances. + For open-ended ranges, use either ".." ('2020-01-01:00:00:00Z/..', ['2020-01-01:00:00:00Z', '..']) + or a value of None (['2020-01-01:00:00:00Z', None]). If using a simple date string, + the datetime can be specified in YYYY-mm-dd format, optionally truncating to + YYYY-mm or just YYYY. Simple date strings will be expanded to include the entire + time period. Defaults to None. + query (list, optional): List or JSON of query parameters as per the STAC API query extension. + such as {"eo:cloud_cover":{"lt":10}}. Defaults to None. + filter (dict, optional): JSON of query parameters as per the STAC API filter extension. Defaults to None. + filter_lang (str, optional): Language variant used in the filter body. If filter is a dictionary + or not provided, defaults to ‘cql2-json’. If filter is a string, defaults to cql2-text. Defaults to None. + sortby (str | list, optional): A single field or list of fields to sort the response by. + such as [{ 'field': 'properties.eo:cloud_cover', 'direction': 'asc' }]. Defaults to None. + fields (list, optional): A list of fields to include in the response. Note this may result in + invalid STAC objects, as they may not have required fields. Use items_as_dicts to avoid object + unmarshalling errors. Defaults to None. + get_item_col (bool, optional): True to return a pystac.ItemCollection. Defaults to False. + get_items (bool, optional): True to return a list of pystac.Item. Defaults to False. + get_links (bool, optional): True to return a list of links. Defaults to False. + get_gdf (bool, optional): True to return a GeoDataFrame. Defaults to False. + **kwargs: Additional keyword arguments to pass to the stac_client() function. + + Returns: + list | pystac.ItemCollection : The search results as a list of links or a pystac.ItemCollection. + """ + + client, collection_id = stac_client(url, return_col_id=True, **kwargs) + + if client is None: + return None + else: + + if isinstance(intersects, dict) and "geometry" in intersects: + intersects = intersects["geometry"] + + if collection_id is not None and collections is None: + collections = [collection_id] + + search = client.search( + method=method, + max_items=max_items, + limit=limit, + ids=ids, + collections=collections, + bbox=bbox, + intersects=intersects, + datetime=datetime, + query=query, + filter=filter, + filter_lang=filter_lang, + sortby=sortby, + fields=fields, + ) + + if get_item_col: + return search.item_collection() + elif get_items: + return list(search.item_collection()) + elif get_links: + return [item.get_self_href() for item in search.items()] + elif get_gdf: + import geopandas as gpd + + gdf = gpd.GeoDataFrame.from_features( + search.item_collection().to_dict(), crs="EPSG:4326" + ) + return gdf + elif get_info: + items = list(search.item_collection()) + info = {} + for item in items: + info[item.id] = {'id': item.id, 'href': item.get_self_href(), 'bands': list(item.get_assets().keys()), 'assets': item.get_assets()} + return info + else: + return search + + +def download_data_catalogs(out_dir=None, quiet=True, overwrite=False): + """Download geospatial data catalogs from https://github.com/giswqs/geospatial-data-catalogs. + + Args: + out_dir (str, optional): The output directory. Defaults to None. + quiet (bool, optional): Whether to suppress the download progress bar. Defaults to True. + overwrite (bool, optional): Whether to overwrite the existing data catalog. Defaults to False. + + Returns: + str: The path to the downloaded data catalog. + """ + import tempfile + import gdown + import zipfile + + if out_dir is None: + out_dir = tempfile.gettempdir() + elif not os.path.exists(out_dir): + os.makedirs(out_dir) + + url = "https://github.com/giswqs/geospatial-data-catalogs/archive/refs/heads/master.zip" + + out_file = os.path.join(out_dir, "geospatial-data-catalogs.zip") + work_dir = os.path.join(out_dir, "geospatial-data-catalogs-master") + + if os.path.exists(work_dir) and not overwrite: + return work_dir + else: + + gdown.download(url, out_file, quiet=quiet) + with zipfile.ZipFile(out_file, "r") as zip_ref: + zip_ref.extractall(out_dir) + return work_dir + + +def set_default_bands(bands): + + if len(bands) == 0: + return [None, None, None] + + if isinstance(bands, str): + bands = [bands] + + if not isinstance(bands, list): + raise ValueError("bands must be a list or a string.") + + if (set(['nir', 'red', 'green']) <= set(bands)): + return ['nir', 'red', 'green'] + elif (set(['red', 'green', 'blue']) <= set(bands)): + return ['red', 'green', 'blue'] + elif (set(["B3", "B2", "B1"]) <= set(bands)): + return ["B3", "B2", "B1"] + elif len(bands) < 3: + return bands[0] * 3 + else: + return bands[:3] + \ No newline at end of file diff --git a/leafmap/toolbar.py b/leafmap/toolbar.py index 568600286a..e50266dd2a 100644 --- a/leafmap/toolbar.py +++ b/leafmap/toolbar.py @@ -4439,12 +4439,61 @@ def stac_gui(m=None): ipywidgets: The tool GUI widget. """ from .pc import get_pc_collection_list + import pandas as pd widget_width = "450px" padding = "0px 0px 0px 5px" # upper, right, bottom, left style = {"description_width": "initial"} + MAX_ITEMS = 20 + if "MAX_ITEMS" in os.environ: + MAX_ITEMS = int(os.environ["MAX_ITEMS"]) + + catalog_path = download_data_catalogs() + aws_open_data_path = os.path.join(catalog_path, "aws_stac_catalogs.tsv") + gee_path = os.path.join(catalog_path, "gee_catalog.tsv") + pc_path = os.path.join(catalog_path, "pc_catalog.tsv") + nasa_path = os.path.join(catalog_path, "nasa_cmr_catalog.tsv") + stac_index_path = os.path.join(catalog_path, "stac_catalogs.tsv") + stac_data = [] + + stac_info = { + "AWS Open Data": { + "filename": aws_open_data_path, + "name": "Name", + "url": "Endpoint", + "description": "Description", + }, + "Google Earth Engine": { + "filename": gee_path, + "name": "id", + "url": "url", + "description": "title", + }, + "Microsoft Planetary Computer": { + "filename": pc_path, + "name": "title", + "url": "link", + "description": "description", + }, + "NASA Common Metadata Repository": { + "filename": nasa_path, + "name": "id", + "url": "url", + "description": "title", + }, + "STAC Index Catalogs": { + "filename": stac_index_path, + "name": "title", + "url": "url", + "description": "summary", + }, + } - output = widgets.Output(layout=widgets.Layout(width=widget_width, padding=padding)) + connections = list(stac_info.keys()) + + output = widgets.Output( + layout=widgets.Layout(width=widget_width, padding=padding, overflow="auto") + ) toolbar_button = widgets.ToggleButton( value=False, @@ -4461,10 +4510,36 @@ def stac_gui(m=None): layout=widgets.Layout(height="28px", width="28px", padding="0px 0px 0px 4px"), ) + connection = widgets.Dropdown( + options=connections, + value="AWS Open Data", + description="Catalog:", + style=style, + layout=widgets.Layout(width="454px", padding=padding), + ) + + df = pd.read_csv(stac_info[connection.value]["filename"], sep="\t") + datasets = df[stac_info[connection.value]["name"]].tolist() + + dataset = widgets.Dropdown( + options=datasets, + value="Sentinel-2 Cloud-Optimized GeoTIFFs", + description="Dataset:", + style=style, + layout=widgets.Layout(width="454px", padding=padding), + ) + + description = widgets.Text( + value="Sentinel-2 Level 2A scenes and metadata", + description="Description:", + style=style, + layout=widgets.Layout(width="454px", padding=padding), + ) + http_url = widgets.Text( - value="https://planetarycomputer.microsoft.com/api/stac/v1", - description="Catalog URL:", - tooltip="Enter an http URL to the STAC Catalog", + value="https://earth-search.aws.element84.com/v1/collections/sentinel-2-l2a", + description="URL:", + tooltip="STAC Catalog URL", style=style, layout=widgets.Layout(width="454px", padding=padding), ) @@ -4490,27 +4565,15 @@ def stac_gui(m=None): layout=widgets.Layout(width="454px", padding=padding), ) - col_name = collection.value.split(" - ")[0].strip() - band_names = get_pc_inventory()[col_name]["bands"] - # red.options = band_names - # green.options = band_names - # blue.options = band_names + band_names = ["red", "green", "blue"] item = widgets.Dropdown( - options=["LC08_L2SP_047027_20201204_02_T1"], + options=[], description="Item:", style=style, layout=widgets.Layout(width="454px", padding=padding), ) - # assets = widgets.Text( - # value=None, - # description="Bands:", - # tooltip="STAC Asset ID", - # style=style, - # layout=widgets.Layout(width="454px", padding=padding), - # ) - layer_name = widgets.Text( value="STAC Layer", description="Layer name:", @@ -4521,8 +4584,8 @@ def stac_gui(m=None): band_width = "149px" red = widgets.Dropdown( - value="SR_B5", options=band_names, + value="red", description="Red:", tooltip="Select a band for the red channel", style=style, @@ -4530,8 +4593,8 @@ def stac_gui(m=None): ) green = widgets.Dropdown( - value="SR_B4", options=band_names, + value="green", description="Green:", tooltip="Select a band for the green channel", style=style, @@ -4539,8 +4602,8 @@ def stac_gui(m=None): ) blue = widgets.Dropdown( - value="SR_B3", options=band_names, + value="blue", description="Blue:", tooltip="Select a band for the blue channel", style=style, @@ -4571,9 +4634,7 @@ def stac_gui(m=None): layout=widgets.Layout(width="150px", padding=padding), ) - # local_tile_palettes = list_palettes(add_extra=True) palette_options = list_palettes(lowercase=True) - # palette_options = local_tile_palettes palette = widgets.Dropdown( options=palette_options, value=None, @@ -4600,21 +4661,23 @@ def stac_gui(m=None): def reset_options(reset_url=True): """Reset the options to their default values.""" - if reset_url: - http_url.value = "https://planetarycomputer.microsoft.com/api/stac/v1" + connection.value = "AWS Open Data" + dataset.options = datasets + dataset.value = "Sentinel-2 Cloud-Optimized GeoTIFFs" + http_url.value = ( + "https://earth-search.aws.element84.com/v1/collections/sentinel-2-l2a" + ) start_date.value = None end_date.value = None - collection.options = [] - collection.value = None item.options = [] item.value = None layer_name.value = "" - red.options = [] - green.options = [] - blue.options = [] - red.value = None - green.value = None - blue.value = None + red.options = ["red", "green", "blue"] + green.options = ["red", "green", "blue"] + blue.options = ["red", "green", "blue"] + red.value = "red" + green.value = "green" + blue.value = "blue" vmin.value = "" vmax.value = "" nodata.value = "" @@ -4622,26 +4685,18 @@ def reset_options(reset_url=True): add_params.value = "" output.clear_output() - with output: - col_name = collection.value.split(" - ")[0].strip() - band_names = get_pc_inventory()[col_name]["bands"] - red.options = band_names - green.options = band_names - blue.options = band_names - - params_widget = widgets.HBox() + params_widget = widgets.VBox() raster_options = widgets.VBox() raster_options.children = [ widgets.HBox([red, green, blue]), - widgets.HBox([vmin, vmax, nodata]), widgets.HBox([palette, checkbox]), params_widget, ] buttons = widgets.ToggleButtons( value=None, - options=["Collections", "Items", "Display", "Reset", "Close"], + options=["Search", "Display", "Reset", "Close"], tooltips=["Get Collections", "Get Items", "Display Image", "Reset", "Close"], button_style="primary", ) @@ -4653,11 +4708,12 @@ def reset_options(reset_url=True): toolbar_header.children = [close_button, toolbar_button] toolbar_footer = widgets.VBox() toolbar_footer.children = [ + connection, + dataset, + description, http_url, widgets.HBox([start_date, end_date]), - collection, item, - layer_name, raster_options, buttons, output, @@ -4667,53 +4723,77 @@ def reset_options(reset_url=True): source=toolbar_widget, watched_events=["mouseenter", "mouseleave"] ) - def checkbox_changed(change): - if change["new"]: - params_widget.children = [add_params] + def update_bands(): + + if len(stac_data) > 0: + bnames = stac_data[0][item.value]["bands"] else: - params_widget.children = [] + bnames = [] - checkbox.observe(checkbox_changed, names="value") + red.options = bnames + green.options = bnames + blue.options = bnames + + default_bands = set_default_bands(bnames) + try: + red.value = default_bands[0] + green.value = default_bands[1] + blue.value = default_bands[2] + except Exception as e: + red.value = None + green.value = None + blue.value = None - def url_changed(change): - if change["new"] or http_url.value == "": - reset_options(reset_url=False) + def connection_changed(change): + if change["new"]: - http_url.observe(url_changed, names="value") + df = pd.read_csv(stac_info[connection.value]["filename"], sep="\t") + datasets = df[stac_info[connection.value]["name"]].tolist() + dataset.options = datasets + dataset.value = datasets[0] - def collection_changed(change): + connection.observe(connection_changed, names="value") + def dataset_changed(change): if change["new"]: + df = pd.read_csv(stac_info[connection.value]["filename"], sep="\t") + df = df[df[stac_info[connection.value]["name"]] == dataset.value] + description.value = df[stac_info[connection.value]["description"]].tolist()[ + 0 + ] + http_url.value = df[stac_info[connection.value]["url"]].tolist()[0] + item.options = [] + stac_data.clear() + update_bands() + + dataset.observe(dataset_changed, names="value") + + def item_changed(change): + if change["new"]: + layer_name.value = item.value with output: - if not hasattr(m, "pc_inventory"): - setattr(m, "pc_inventory", get_pc_inventory()) - col_name = change["new"].split(" - ")[0] - first_item = m.pc_inventory[col_name]["first_item"] - item.options = [first_item] - band_names = m.pc_inventory[col_name]["bands"] - red.options = band_names - green.options = band_names - blue.options = band_names - - if change["new"] == "landsat-8-c2-l2 - Landsat 8 Collection 2 Level-2": - red.value = "SR_B7" - green.value = "SR_B5" - blue.value = "SR_B4" - elif change["new"] == "sentinel-2-l2a - Sentinel-2 Level-2A": - red.value = "B08" - green.value = "B04" - blue.value = "B03" - else: - if len(band_names) > 2: - red.value = band_names[0] - green.value = band_names[1] - blue.value = band_names[2] - else: - red.value = band_names[0] - green.value = band_names[0] - blue.value = band_names[0] + update_bands() - collection.observe(collection_changed, names="value") + if dataset.value == "Sentinel-2 Cloud-Optimized GeoTIFFs": + vmin.value = "0" + vmax.value = "3000" + else: + vmin.value = "" + vmax.value = "" + + item.observe(item_changed, names="value") + + def checkbox_changed(change): + if change["new"]: + params_widget.children = [ + layer_name, + widgets.HBox([vmin, vmax, nodata]), + add_params, + ] + else: + params_widget.children = [] + + checkbox.observe(checkbox_changed, names="value") def handle_toolbar_event(event): @@ -4751,27 +4831,10 @@ def close_btn_click(change): def button_clicked(change): - if change["new"] == "Collections": - with output: - output.clear_output() - if http_url.value.startswith("http"): - if ( - http_url.value - == "https://planetarycomputer.microsoft.com/api/stac/v1" - ): - collection.options = get_pc_collection_list() - else: - print("Retrieving collections...") - collection.options = [ - x[0] for x in get_stac_collections(http_url.value) - ] - output.clear_output() - else: - print("Please enter a valid URL.") - elif change["new"] == "Items": + if change["new"] == "Search": with output: output.clear_output() - if collection.value is not None: + if http_url.value is not None: if start_date.value is not None and end_date.value is not None: datetime = str(start_date.value) + "/" + str(end_date.value) elif start_date.value is not None: @@ -4781,53 +4844,36 @@ def button_clicked(change): else: datetime = None - col_name = collection.value.split(" - ")[0].strip() - if m.user_roi is not None: + if m is not None and m.user_roi is not None: intersects = m.user_roi["geometry"] else: - print("Please draw a polygon to be used as an AOI.") - print( - "Since no AOI is specified, using the default sample AOI." - ) - intersects = { - "type": "Polygon", - "coordinates": [ - [ - [-122.27508544921875, 47.54687159892238], - [-121.96128845214844, 47.54687159892238], - [-121.96128845214844, 47.745787772920934], - [-122.27508544921875, 47.745787772920934], - [-122.27508544921875, 47.54687159892238], - ] - ], - } - print("Retrieving items...") + intersects = None - gdf = get_stac_items( - http_url.value, - col_name, - datetime=datetime, + print("Retrieving items...") + try: + search = stac_search( + url=http_url.value, + max_items=MAX_ITEMS, intersects=intersects, + datetime=datetime, + get_info=True, ) - if gdf is not None: - item.options = gdf["id"].tolist() - if not hasattr(m, "layers_control"): - layers_control = m.add_control( - ipyleaflet.LayersControl(position="topright") - ) - setattr(m, "layers_control", layers_control) - m.add_gdf(gdf, "Image footprints", style={"fill": False}) + item.options = list(search.keys()) + + stac_data.clear() + stac_data.append(search) + update_bands() output.clear_output() - print(f"{len(item.options)} items found.") - else: - print("Please select a valid collection.") + + except Exception as e: + print(e) elif change["new"] == "Display": with output: output.clear_output() - if red.value is not None: + if item.value and m is not None: print("Loading data...") @@ -4864,16 +4910,23 @@ def button_clicked(change): assets = red.value else: assets = f"{red.value},{green.value},{blue.value}" - m.add_stac_layer( - collection=col, - item=item.value, - assets=assets, - name=layer_name.value, - **vis_params, - ) - output.clear_output() + + try: + + m.add_stac_layer( + url=stac_data[0][item.value]["href"], + item=item.value, + assets=assets, + name=layer_name.value, + **vis_params, + ) + m.stac_data = stac_data[0][item.value] + output.clear_output() + except Exception as e: + print(e) + else: - print("Please select at least one band.") + print("Please click on the search button first.") buttons.value = None elif change["new"] == "Reset":