Skip to content

Commit

Permalink
Merge pull request #84 from developmentseed/feat/stac-output
Browse files Browse the repository at this point in the history
Feat/stac output
  • Loading branch information
emileten authored May 7, 2024
2 parents 18fdde1 + 9eb77de commit 709b945
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 22 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pip install -e .

## Usage

### In a virtual environment

A command line interface is exposed with the package. For example, this code snippet will run a cut-down version of a "days above temperature" indicator and write the output to `$HOME/hazard_example` :

```
Expand All @@ -35,6 +37,23 @@ mkdir -p $HOME/hazard_example
os_climate_hazard days_tas_above_indicator --store $HOME/hazard_example
```

### In a docker container

First, build the image.

```
docker build -t os-hazard-indicator -f dockerfiles/Dockerfile .
```

Then, you can run an example the following way. In the example, we save the data locally to /data/hazard-test-container in the container. To have access to the output once the container finished running, we are mounting `/data` from the container to `$HOME/data` locally.

```
docker run -it -v $HOME/data:/data os-hazard-indicator os_climate_hazard days_tas_above_indicator --store /data/hazard-test-container
```

### In a CWL (Common Workflow Language) workflow


# Contributing

Patches may be contributed via pull requests from forks to
Expand Down
12 changes: 11 additions & 1 deletion src/hazard/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@ def days_tas_above_indicator(
bucket: Optional[str] = None,
prefix: Optional[str] = None,
store: Optional[str] = None,
inventory_format: Optional[str] = "osc",
extra_xarray_store: Optional[bool] = False
):

hazard_services.days_tas_above_indicator(
gcm_list, scenario_list, threshold_list, central_year_list, window_years, bucket, prefix, store, extra_xarray_store
gcm_list,
scenario_list,
threshold_list,
central_year_list,
window_years,
bucket,
prefix,
store,
extra_xarray_store,
inventory_format,
)


Expand Down
45 changes: 32 additions & 13 deletions src/hazard/docs_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@

from hazard.sources.osc_zarr import default_dev_bucket

from .inventory import HazardResource


class HazardResources(BaseModel):
resources: List[HazardResource]
from .inventory import HazardResource, HazardResources, resource_from_stac_item_dict


class DocStore:
Expand Down Expand Up @@ -60,14 +56,18 @@ def __init__(
else:
self._root = str(PurePosixPath(bucket, prefix))

def read_inventory(self) -> List[HazardResource]:
def read_inventory(self, format="osc") -> List[HazardResource]:
"""Read inventory at path provided and return HazardResources."""
path = self._full_path_inventory()
if not self._fs.exists(path):
return []
json_str = self.read_inventory_json()
models = parse_obj_as(HazardResources, json.loads(json_str)).resources
return models
if format == "stac":
return [resource_from_stac_item_dict(item) for item in json.loads(json_str)["features"]]
elif format == "osc":
return parse_obj_as(HazardResources, json.loads(json_str)).resources
else:
raise ValueError(f'JSON inventory file format must be one of "osc" or "stac", but got {format}')

def read_inventory_json(self) -> str:
"""Read inventory at path provided and return JSON."""
Expand All @@ -81,24 +81,43 @@ def write_inventory_json(self, json_str: str):
with self._fs.open(path, "w") as f:
f.write(json_str)

def write_new_empty_inventory(self):
def write_new_empty_inventory(self, format="osc"):
"""Write inventory."""
path = self._full_path_inventory()
os.makedirs(os.path.dirname(path), exist_ok=True)
models = HazardResources(resources=[])
json_str = json.dumps(models.dict(), indent=4) # pretty print
if format == "stac":
models = HazardResources(resources=[]).to_stac_items()
json_str = json.dumps(models, indent=4) # pretty print
elif format == "osc":
models = HazardResources(resources=[])
json_str = json.dumps(models.dict(), indent=4) # pretty print
else:
raise ValueError(f'JSON inventory file format must be one of "osc" or "stac", but got {format}')

with self._fs.open(path, "w") as f:
f.write(json_str)

def update_inventory(self, resources: Iterable[HazardResource], remove_existing: bool = False):
def update_inventory(self, resources: Iterable[HazardResource], remove_existing: bool = False, format="osc"):
"""Add the hazard models provided to the inventory. If a model with the same key
(hazard type and id) exists, replace."""

# if format == stac, we do a round trip, stac -> osc -> stac.
path = self._full_path_inventory()
combined = {} if remove_existing else dict((i.key(), i) for i in self.read_inventory())
combined = {} if remove_existing else dict((i.key(), i) for i in self.read_inventory(format=format))
for resource in resources:
combined[resource.key()] = resource
models = HazardResources(resources=list(combined.values()))
json_str = json.dumps(models.dict(), indent=4) # pretty print

if format == "stac":
models = HazardResources(resources=list(combined.values())).to_stac_items(items_as_dicts=True)
json_str = json.dumps(models, indent=4)
elif format == "osc":
models = HazardResources(resources=list(combined.values()))
json_str = json.dumps(models.dict(), indent=4)
else:
raise ValueError(f'JSON inventory file format must be one of "osc" or "stac", but got {format}')

with self._fs.open(path, "w") as f:
f.write(json_str)

Expand Down
71 changes: 70 additions & 1 deletion src/hazard/inventory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import datetime
import json
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union

import pystac
from pydantic import BaseModel, Field

# region HazardModel
Expand Down Expand Up @@ -39,6 +41,7 @@ class MapInfo(BaseModel):
[(-180.0, 85.0), (180.0, 85.0), (180.0, -85.0), (-180.0, -85.0)],
description="Bounds (top/left, top/right, bottom/right, bottom/left) as degrees. Note applied to map reprojected into Web Mercator CRS.", # noqa
)
bbox: List[float] = Field([-180.0, -85.0, 180.0, 85.0])
index_values: Optional[List[Any]] = Field(
None,
description="Index values to include in maps. If None, the last index value only is included.",
Expand Down Expand Up @@ -108,6 +111,72 @@ def key(self):
selects based on its own logic (e.g. selects a particular General Circulation Model)."""
return self.path

def to_stac_item(self, item_as_dict: bool = False) -> Union[pystac.Item, Dict]:
"""
converts hazard resource to a STAC item.
"""

osc_properties = self.model_dump()
osc_properties = {f"osc-hazard:{k}": osc_properties[k] for k in osc_properties.keys()}

asset = pystac.Asset(
href=self.path,
title="zarr directory",
description="directory containing indicators data as zarr arrays",
media_type=pystac.MediaType.ZARR,
roles=["data"],
)

link = pystac.Link(rel="collection", media_type="application/json", target="./collection.json")

stac_item = pystac.Item(
id=self.indicator_id,
geometry={"type": "Polygon", "coordinates": [self.map.bounds]},
bbox=self.map.bbox,
datetime=None,
start_datetime=datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc),
end_datetime=datetime.datetime(2100, 1, 1, tzinfo=datetime.timezone.utc),
properties=osc_properties,
collection="osc-hazard-indicators",
assets={"indicators": asset},
)

stac_item.add_link(link)

stac_item.validate()

if item_as_dict:
return stac_item.to_dict()
else:
return stac_item


class HazardResources(BaseModel):
resources: List[HazardResource]

def to_stac_items(self, items_as_dicts: bool = False) -> Dict[str, Union[str, pystac.Item, Dict]]:
"""
converts hazard resources to a list of STAC items.
"""
return {
"type": "FeatureCollection",
"features": [resource.to_stac_item(item_as_dict=items_as_dicts) for resource in self.resources],
}


def resource_from_stac_item_dict(stac_item: Dict) -> HazardResource:
"""
converts STAC item to HazardResource
"""

return HazardResource(
**{
k.replace("osc-hazard:", ""): stac_item["properties"][k]
for k in stac_item["properties"]
if "osc-hazard:" in k
}
)


def expand(item: str, key: str, param: str):
return item and item.replace("{" + key + "}", param)
Expand Down
1 change: 1 addition & 0 deletions src/hazard/models/days_tas_above.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def _resource(self) -> HazardResource:
units="days/year",
),
bounds=[(-180.0, 85.0), (180.0, 85.0), (180.0, -60.0), (-180.0, -60.0)],
bbox=[-180.0, -60.0, 180.0, 85.0],
index_values=None,
path="days_tas_above_{temp_c}c_{gcm}_{scenario}_{year}_map",
source="map_array",
Expand Down
5 changes: 3 additions & 2 deletions src/hazard/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def days_tas_above_indicator(
bucket: Optional[str] = None,
prefix: Optional[str] = None,
store: Optional[str] = None,
extra_xarray_store: Optional[bool] = False
extra_xarray_store: Optional[bool] = False,
inventory_format: Optional[str] = "osc"
):
"""
Run the days_tas_above indicator generation for a list of models,scenarios, thresholds,
Expand Down Expand Up @@ -58,6 +59,6 @@ def days_tas_above_indicator(
central_years=central_year_list,
)

docs_store.update_inventory(model.inventory())
docs_store.update_inventory(model.inventory(), format=inventory_format)

model.run_all(source, target, client=client)
36 changes: 36 additions & 0 deletions src/inventories/hazard/stac/collection.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"stac_version": "1.0.0",
"type": "Collection",
"stac_extensions": [],
"id": "osc-hazard-indicators",
"title": "OSC hazard indicators",
"description": "A STAC collection of OS-C hazard indicators.",
"license": "CC-BY-4.0",
"extent": {
"spatial": {
"bbox": [[-180, -90, 180, 90]]
},
"temporal": {
"interval": [["1950-01-01T00:00:00Z", "2100-12-31T23:59:59Z"]]
}
},
"providers": [
{
"name": "UKRI",
"roles": ["producer"],
"url": "https://www.ukri.org/"
}
],
"links": [
{
"rel": "self",
"type": "application/json",
"href": "./collection.json"
},
{
"rel": "items",
"type": "application/geo+json",
"href": "./items.json"
}
]
}
10 changes: 5 additions & 5 deletions graph.cwl → workflow.cwl
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ $graph:

requirements:
ResourceRequirement:
coresMax: 4
ramMax: 8192
coresMax: 2
ramMax: 4096

inputs:
gcm_list:
Expand All @@ -41,16 +41,16 @@ $graph:

requirements:
ResourceRequirement:
coresMax: 4
ramMax: 8192
coresMax: 2
ramMax: 4096
NetworkAccess:
networkAccess: true

hints:
DockerRequirement:
dockerPull: public.ecr.aws/c9k5s3u3/os-hazard-indicator

baseCommand: ["os_climate_hazard", "days_tas_above_indicator", "--store", "./indicator", "--"]
baseCommand: ["os_climate_hazard", "days_tas_above_indicator", "--inventory_format", "stac", "--store", "./indicator", "--"]

arguments: []

Expand Down

0 comments on commit 709b945

Please sign in to comment.