Merge pull request #1 from geospatial-jeff/dev

v0.2

geospatial-jeff authored Jun 28, 2019
2 parents 73d16fd + 09d7351 commit 1abb4c1
Showing 10 changed files with 332 additions and 36 deletions.
48 changes: 37 additions & 11 deletions README.md
@@ -17,7 +17,9 @@ Use the [stac-updater CLI](stac_updater/cli.py) to build and deploy your service
stac-updater new-service
# Build AWS resources to update collection
stac-updater update-collection --root https://stac.com/landsat-8-l1/catalog.json
stac-updater update-collection --root https://stac.com/landsat-8-l1/catalog.json \
--path {landsat:path}/{landsat:row} \
--row {date}/{id}
# Modify kickoff event source to s3:ObjectCreated
stac-updater modify-kickoff --type s3 --bucket_name stac-updater-kickoff
@@ -26,20 +28,14 @@ stac-updater modify-kickoff --type s3 --bucket_name stac-updater-kickoff
stac-updater deploy
```

Once deployed, any STAC Item uploaded to the `stac-updater-kickoff` bucket will be ingested by the service and added to the `https://stac.com/landsat-8-l1/catalog.json` collection. Regardless of event source, the service expects the following JSON payload:

| Field Name | Type | Description | Example |
| ---------- | ----- | ----------- | ------- |
| stac_item | dict | **REQUIRED.** [STAC Item](https://github.com/radiantearth/stac-spec/tree/master/item-spec) to ingest into collection. | [link](https://github.com/radiantearth/stac-spec/blob/dev/item-spec/examples/sample-full.json) |
| path | str | String pattern indicating subcatalogs. Used by [sat-stac](https://github.com/sat-utils/sat-stac/blob/master/tutorial-1.ipynb#Views) to automatically build sub catalogs from item properties. | '${landsat:path}/${landsat:row}' |
| filename | str | String pattern indicating filename. Used by [sat-stac](https://github.com/sat-utils/sat-stac/blob/master/tutorial-1.ipynb#Views) to automatically build item filename from item properties.| '${date}/${id}' |
Once deployed, any STAC Item uploaded to the `stac-updater-kickoff` bucket will be ingested by the service and added to the `https://stac.com/landsat-8-l1/catalog.json` collection. Regardless of event source, the service expects the payload to contain a [STAC Item](https://github.com/radiantearth/stac-spec/tree/master/item-spec).
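
With the default lambda event source, an ingest can be kicked off by invoking the function directly. A minimal sketch — the function name below is hypothetical (it depends on your service name and stage), and the Item is abbreviated:

```python
import json
import boto3

# Hypothetical function name — check `sls info` or the AWS console for yours.
KICKOFF_FUNCTION = "stac-updater-dev-kickoff"

# Abbreviated STAC Item; a real Item carries full geometry, assets, and links.
stac_item = {
    "type": "Feature",
    "id": "LC80370332018039LGN00",
    "collection": "landsat-8-l1",
    "properties": {"datetime": "2018-02-08T18:02:15Z"},
    "assets": {},
    "links": [],
}

lambda_client = boto3.client("lambda")
lambda_client.invoke(
    FunctionName=KICKOFF_FUNCTION,
    InvocationType="Event",  # fire-and-forget async invoke
    Payload=json.dumps(stac_item).encode("utf-8"),
)
```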

Each call to `update-collection` tells the service to update a single collection. Updating multiple collections within a single deployment is accomplished with multiple calls to `update-collection`. When updating multiple collections, the service uses an SNS fanout pattern to distribute messages across multiple queues (one queue per collection).

![update-collection](docs/images/update-collection.png)
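
Concretely, the kickoff function publishes each Item with a `collection` message attribute, and each per-collection queue subscribes to the topic with a filter policy on that attribute — a sketch of the policy for the collection above:

```python
# Filter policy attached to a queue's SNS subscription; only messages whose
# 'collection' attribute matches are delivered to this queue.
filter_policy = {"collection": ["landsat-8-l1"]}
```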

## SNS Notifications
You may additionally deploy a SNS topic which publishes messages whenever a STAC Item is succesfully uploaded to a collection.
You may deploy an SNS topic which publishes messages whenever a STAC Item is successfully uploaded to a collection.

```
# Add SNS notification
@@ -60,8 +56,38 @@ Once deployed, end-users may subscribe to the newly created SNS topic to be noti

![sns-notifications](docs/images/sns-notifications.png)
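
An end-user subscription might look like the following sketch (the topic ARN is hypothetical — substitute the ARN created by your deployment):

```python
import boto3

sns = boto3.client("sns")

sns.subscribe(
    # Hypothetical ARN; use the topic created by `add-notifications`.
    TopicArn="arn:aws:sns:us-east-1:123456789012:stac-item-notifications",
    Protocol="email",
    Endpoint="user@example.com",  # AWS sends a confirmation email first
)
```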

## Logging
You may pipe CloudWatch logs to a deployed instance of the AWS Elasticsearch Service for monitoring and visualization with Kibana.

```
# Add ES logging
stac-updater add-logging --es_host xxxxxxxxxx.region.es.amazonaws.com
```

Logs are saved to the `stac_updater_logs_YYYYMMDD` index (a new index is created each day) with the following schema:

| Field Name | Type | Description | Example |
| ---------- | ----- | ----------- | ------- |
| id | string | Unique ID of the CloudWatch log event. | 34819275800 |
| timestamp | date | Date of the lambda invocation. | June 23rd 2019, 21:25:26.649 |
| BilledDuration | number | Time (ms) billed for the invocation. | 87 |
| CollectionName | string | Name of the collection. | landsat8 |
| Duration | number | Runtime (ms) of the lambda function. | 442.49 |
| ItemCount | number | Number of STAC Items processed by the invocation. | 4 |
| ItemLinks | string array | URLs of STAC Items processed by the invocation. | ['https://stac.s3.amazonaws.com/landsat8/item.json'] |
| MemorySize | number | Memory limit (MB) of the lambda function. | 1024 |
| MaxMemoryUsed | number | Maximum memory (MB) consumed by the lambda function. | 87 |
| RequestId | string | Unique request ID of the lambda invocation. | 87 |
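
For reference, the daily index name follows the date pattern above — a sketch of how a client might derive it (assuming index dates are UTC):

```python
from datetime import datetime, timezone

# e.g. 'stac_updater_logs_20190623'
index_name = f"stac_updater_logs_{datetime.now(timezone.utc):%Y%m%d}"
```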

The following image is a Kibana time-series visualization showing the number of lambda invocations, binned into 15-second intervals, after 200 STAC Items were pushed into the queue. Notice how lambda scales up to handle the initial burst of messages.

![es-logging-1](docs/images/es-logging-invokes.png)

It took 86 total invocations to process the 200 STAC Items.

![es-logging-2](docs/images/es-logging-summary.png)

# TODOS
- Add support for dynamic catalogs ([sat-api](https://github.com/sat-utils/sat-api), [staccato](https://github.com/boundlessgeo/staccato)).
- Add aggregator service for metrics/logging etc on batch jobs.
- Add SNS event source.
Binary file added docs/images/es-logging-invokes.png
Binary file added docs/images/es-logging-summary.png
2 changes: 2 additions & 0 deletions requirements.txt
@@ -1,3 +1,5 @@
click==7.0
elasticsearch>=6.0.0,<7.0.0
requests-aws4auth==0.9
sat-stac==0.1.3
PyYAML==5.1.1
2 changes: 1 addition & 1 deletion setup.py
@@ -4,7 +4,7 @@
requirements = [line.rstrip() for line in reqs]

setup(name="STAC Updater",
version='0.1',
version='0.2',
author='Jeff Albrecht',
author_email='geospatialjeff@gmail.com',
packages=find_packages(exclude=['package']),
49 changes: 45 additions & 4 deletions stac_updater/cli.py
@@ -24,19 +24,24 @@ def new_service():
@click.option('--root', '-r', type=str, required=True, help="URL of collection.")
@click.option('--long-poll/--short-poll', default=False, help="Enable long polling.")
@click.option('--concurrency', type=int, default=1, help="Sets lambda concurrency limit when polling the queue.")
def update_collection(root, long_poll, concurrency):
@click.option('--path', type=str, help="Pattern used by sat-stac to build sub-catalogs.")
@click.option('--filename', type=str, help="Pattern used by sat-stac to build item name.")
def update_collection(root, long_poll, concurrency, path, filename):
# Create a SQS queue for the collection
# Subscribe SQS queue to SNS topic with filter policy on collection name
# Configure lambda function and attach to SQS queue (use ENV variables to pass state)

name = Collection.open(root).id
filter_rule = {'collection': [name]}

pattern = re.compile('[\W_]+')
name = pattern.sub('', name)
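# e.g. 'landsat-8-l1' -> 'landsat8l1'; non-word characters and underscores are
# stripped so the name can be embedded in generated AWS resource names.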

with open(sls_config_path, 'r') as f:
# Using unsafe load to preserve type.
sls_config = yaml.unsafe_load(f)

aws_resources = resources.update_collection(name, root, filter_rule, long_poll, concurrency)
aws_resources = resources.update_collection(name, root, filter_rule, long_poll, concurrency, path, filename)
sls_config['resources']['Resources'].update(aws_resources['resources'])
sls_config['functions'].update(aws_resources['functions'])

Expand All @@ -45,14 +50,17 @@ def update_collection(root, long_poll, concurrency):

@stac_updater.command(name='modify-kickoff', short_help="modify event source of kickoff")
@click.option('--type', '-t', type=str, default='lambda', help="Type of event source used by kickoff.")
@click.option('--bucket_name', '-n', type=str, help="Required if type=='s3'; creates new bucket used by event source.")
def modify_kickoff(type, bucket_name):
@click.option('--bucket_name', type=str, help="Required if type=='s3'; defines name of bucket used by event source.")
@click.option('--topic_name', type=str, help="Required if type=='sns'; defines name of SNS topic used by event source.")
def modify_kickoff(type, bucket_name, topic_name):
func_name = 'kickoff'

if type == 's3':
kickoff_func = resources.lambda_s3_trigger(func_name, bucket_name)
elif type == 'lambda':
kickoff_func = resources.lambda_invoke(func_name)
elif type == 'sns':
kickoff_func = resources.lambda_sns_trigger(func_name, topic_name)
else:
raise ValueError("The `type` parameter must be one of ['s3', 'lambda', 'sns'].")

@@ -64,6 +72,9 @@ def modify_kickoff(type, bucket_name):
sls_config = yaml.unsafe_load(f)
sls_config['functions']['kickoff'].update(kickoff_func)

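# When reverting to direct lambda invocation, drop any event triggers
# (s3/sns) left over from a previous kickoff configuration.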
if type == 'lambda' and 'events' in sls_config['functions']['kickoff']:
del(sls_config['functions']['kickoff']['events'])

with open(sls_config_path, 'w') as outf:
yaml.dump(sls_config, outf, indent=1)

@@ -87,6 +98,36 @@ def add_notifications(topic_name):
with open(sls_config_path, 'w') as outf:
yaml.dump(sls_config, outf, indent=1)

@stac_updater.command(name='add-logging', short_help="Pipe cloudwatch logs into elasticsearch.")
@click.option('--es_host', type=str, required=True, help="Domain name of elasticsearch instance.")
def add_logging(es_host):
# Add the ES_LOGGING lambda function (cloudwatch trigger).
# Add es_domain to ES_LOGGING lambda as environment variable.
# Update IAM permissions (es:*, arn:aws:es:*)
with open(sls_config_path, 'r') as f:
sls_config = yaml.unsafe_load(f)

# Create lambda function
service_name = sls_config['custom']['service-name']
service_stage = sls_config['custom']['stage']
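# Collection names were sanitized to alphanumerics, so the token before the
# first underscore in each per-collection function name recovers them; skip
# kickoff and the log shipper itself.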
collection_names = [x.split('_')[0] for x in list(sls_config['functions']) if x not in ['kickoff', 'es_log_ingest']]
func = resources.lambda_cloudwatch_trigger("es_log_ingest", service_name, service_stage, collection_names)
func.update({'environment': {'ES_HOST': es_host}})
sls_config['functions'].update({'es_log_ingest': func})

# Expanding IAM role
if 'es:*' not in sls_config['provider']['iamRoleStatements'][0]['Action']:
sls_config['provider']['iamRoleStatements'][0]['Action'].append('es:*')
if 'arn:aws:es:*' not in sls_config['provider']['iamRoleStatements'][0]['Resource']:
sls_config['provider']['iamRoleStatements'][0]['Resource'].append('arn:aws:es:*')

with open(sls_config_path, 'w') as outf:
yaml.dump(sls_config, outf, indent=1)

@stac_updater.command(name='deploy', short_help="deploy service to aws")
def deploy():
subprocess.call("docker build . -t stac-updater:latest", shell=True)
58 changes: 44 additions & 14 deletions stac_updater/handler.py
@@ -1,8 +1,10 @@
import os
import json
import base64
import gzip

from satstac import Collection, Item
import boto3
from satstac import Collection, Item

from stac_updater import utils

@@ -13,7 +15,6 @@
REGION = os.getenv('REGION')
NOTIFICATION_TOPIC = os.getenv('NOTIFICATION_TOPIC')


def kickoff(event, context):
event_source = os.getenv('EVENT_SOURCE')

@@ -24,18 +25,22 @@ def kickoff(event, context):
content_object = s3_res.Object(bucket, key)
file_content = content_object.get()['Body'].read().decode('utf-8')
payload = json.loads(file_content)
elif event_source == "sns":
payload = json.loads(event['Records'][0]['Sns']['Message'])
else:
# Default is lambda
payload = event

print(payload)

try:
coll_name = payload['stac_item']['properties']['collection']
coll_name = payload['properties']['collection']
except KeyError:
coll_name = payload['stac_item']['collection']
coll_name = payload['collection']

sns_client.publish(
TopicArn=f"arn:aws:sns:{REGION}:{ACCOUNT_ID}:newStacItemTopic",
Message=json.dumps(event),
Message=json.dumps(payload),
MessageAttributes={
'collection': {
'DataType': 'String',
@@ -44,26 +49,51 @@
}
)


def update_collection(event, context):
collection_root = os.getenv('COLLECTION_ROOT')
path = os.getenv('PATH')
filename = os.getenv('FILENAME')
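# n.b. 'PATH' shadows the standard system PATH environment variable; the
# deployment must set it to the sub-catalog pattern for this lookup to
# behave as intended.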

item_count = len(event['Records'])
stac_links = []

for record in event['Records']:
message = json.loads(record['body'])
stac_item = json.loads(record['body'])

print(stac_item)

col = Collection.open(collection_root)
kwargs = {'item': Item(message['stac_item'])}
if 'path' in message:
kwargs.update({'path': message['path']})
if 'filename' in message:
kwargs.update({'filename': message['filename']})
collection_name = col.id
kwargs = {'item': Item(stac_item)}
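# Convert the CLI's '{key}/{key}' patterns into sat-stac's '${key}/${key}'
# template form, e.g. '{landsat:path}/{landsat:row}' ->
# '${landsat:path}/${landsat:row}'.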
if path:
kwargs.update({'path': '$' + '/$'.join(path.split('/'))})
if filename:
kwargs.update({'filename': '$' + '/$'.join(filename.split('/'))})
print(kwargs)
col.add_item(**kwargs)
col.save()

stac_links.append(kwargs['item'].links('self')[0])

# Send message to SNS Topic if enabled
if NOTIFICATION_TOPIC:
kwargs = utils.stac_to_sns(message['stac_item'])
kwargs = utils.stac_to_sns(kwargs['item'].data)
kwargs.update({
'TopicArn': f"arn:aws:sns:{REGION}:{ACCOUNT_ID}:{NOTIFICATION_TOPIC}"
})
sns_client.publish(**kwargs)
sns_client.publish(**kwargs)


print(f"LOGS CollectionName: {collection_name}\tItemCount: {item_count}\tItemLinks: {stac_links}")


def es_log_ingest(event, context):
from stac_updater import logging

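# CloudWatch Logs delivers subscription data as base64-encoded, gzipped
# JSON under event['awslogs']['data'].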
cw_data = event['awslogs']['data']
compressed_payload = base64.b64decode(cw_data)
uncompressed_payload = gzip.decompress(compressed_payload)
payload = json.loads(uncompressed_payload)

# Index to ES
logging.index_logs(payload)