Skip to content

Commit

Permalink
Connect analytics service (#33)
Browse files Browse the repository at this point in the history
* Check response status when posting something to the broker via K_SINK

* Make analytics service receive CloudEvents

* Create feedback trigger for analytics service

* Make analytics service consume only prediction events from predictions-broker
  • Loading branch information
aliok authored Oct 4, 2023
1 parent 27aa2c3 commit a5347b6
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 29 deletions.
16 changes: 16 additions & 0 deletions infra/openshift-manifests/analytics-service/feedbacks-trigger.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: analytics-service-feedbacks-trigger
namespace: ai-demo
spec:
broker: analytics-broker
filter:
attributes:
type: com.knative.dev.feedback.event
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: analytics-service
uri: /feedbacks
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ resources:
- analytics-broker.yaml
- analytics-service.yaml
- predictions-trigger.yaml
- feedbacks-trigger.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: analytics-service
name: analytics-service-predictions-trigger
namespace: ai-demo
spec:
broker: predictions-broker
filter:
attributes:
type: com.knative.dev.prediction.event
subscriber:
ref:
apiVersion: serving.knative.dev/v1
Expand Down
44 changes: 29 additions & 15 deletions services/analytics-service/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,15 @@ python app.py

Test sending feedbacks:
```shell
curl -v -X POST -H "Content-Type: application/json" localhost:8086/feedbacks -d '{"uploadId": "xyz", "feedback": 1}'
curl -v "localhost:8086/feedbacks" \
-X POST \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: does.not.matter" \
-H "Content-Type: application/json" \
-H "Ce-Source: knative://foo.bar" \
-H "Ce-time: 2020-12-02T13:49:13.77Z" \
-H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \
-d '{"uploadId": "xyz", "feedback": 1}'
```

Check the DB:
Expand All @@ -58,7 +66,15 @@ docker exec -ti postgres psql postgresql://postgres:postgres@localhost:5432/ai-d

Test sending predictions:
```shell
curl -v -X POST -H "Content-Type: application/json" localhost:8086/predictions -d '{"uploadId": "xyz", "probability": 0.999, "x0": 0.24543, "x1": 0.24543, "y0": 0.24543, "y1": 0.24543}'
curl -v "localhost:8086/predictions" \
-X POST \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: does.not.matter" \
-H "Content-Type: application/json" \
-H "Ce-Source: knative://foo.bar" \
-H "Ce-time: 2020-12-02T13:49:13.77Z" \
-H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \
-d '{"uploadId": "xyz", "probability": 0.999, "x0": 0.24543, "x1": 0.24543, "y0": 0.24543, "y1": 0.24543}'
```

Check the DB:
Expand All @@ -84,19 +100,17 @@ Build the image:
docker build . -t ${DOCKER_REPO_OVERRIDE}/analytics-service
```


# Sending feedbacks

Run the image:
```shell
analytics_service_url=$(k get ksvc -n ai-demo analytics-service -o=jsonpath='{.status.url}')
curl -v -X POST -H "Content-Type: application/json" ${analytics_service_url}/feedbacks -d '{"uploadId": "xyz", "feedback": 1}'
docker run --rm \
-p 8086:8086 \
-e DB_HOST="192.168.2.160" \
-e DB_PORT="5432" \
-e DB_DATABASE="ai-demo" \
-e DB_USERNAME="postgres" \
-e DB_PASSWORD="postgres" \
-e MAX_ITEMS_IN_CACHES="1000000" \
-e CACHE_ITEM_TTL_IN_SECONDS="300" \
${DOCKER_REPO_OVERRIDE}/analytics-service
```

# Sending predictions

```shell
analytics_service_url=$(k get ksvc -n ai-demo analytics-service -o=jsonpath='{.status.url}')
curl -v -X POST -H "Content-Type: application/json" ${analytics_service_url}/predictions -d '{"uploadId": "xyz", "probability": 0.999, "x0": 0.24543, "x1": 0.24543, "y0": 0.24543, "y1": 0.24543}'
```


23 changes: 15 additions & 8 deletions services/analytics-service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import psycopg2
from flask import Flask, request
from cloudevents.http import from_http


def init_feedbacks_table():
Expand Down Expand Up @@ -56,27 +57,30 @@ def handler(signal, frame):

@app.route("/feedbacks", methods=["POST"])
def receive_feedbacks():
print(f"Received request")

# request body looks like this:
# {
# "feedback": <int>
# "uploadId": <string>
# }

print(f"Received request: {request.json}")
event = from_http(request.headers, request.get_data())

body = request.json
data = event.data

required_fields = ['feedback', 'uploadId']
for x in required_fields:
if x not in body:
if x not in data:
return f"'{x}' not in request", 400

cur = conn.cursor()

print(f"Inserting feedback {data['feedback']} for upload {data['uploadId']}")
cur.execute('INSERT INTO feedbacks (feedback, upload_id) '
'VALUES (%s, %s) '
'ON CONFLICT (upload_id) DO NOTHING',
(body['feedback'], body['uploadId']))
(data['feedback'], data['uploadId']))

conn.commit()
cur.close()
Expand All @@ -96,22 +100,25 @@ def receive_predictions():
# "y1": "0.556647"
# }

print(f"Received request: {request.json}")
print(f"Received request")

event = from_http(request.headers, request.get_data())

body = request.json
data = event.data

required_fields = ['probability', 'uploadId', 'x0', 'x1', 'y0', 'y1']
for x in required_fields:
if x not in body:
if x not in data:
return f"'{x}' not in request", 400

cur = conn.cursor()

print(f"Inserting prediction {data['probability']} for upload {data['uploadId']}")
cur.execute(
'INSERT INTO predictions (probability, upload_id, x0, x1, y0, y1) '
'VALUES (%s, %s, %s, %s, %s, %s) '
'ON CONFLICT (upload_id) DO NOTHING',
(body['probability'], body['uploadId'], body['x0'], body['x1'], body['y0'], body['y1']))
(data['probability'], data['uploadId'], data['x0'], data['x1'], data['y0'], data['y1']))

conn.commit()
cur.close()
Expand Down
8 changes: 6 additions & 2 deletions services/feedback-service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,12 @@ def receive_feedback():
headers, body = to_binary(event)

try:
print(f"Sending event with payload {str(ce_data)} to {K_SINK}")
requests.post(K_SINK, data=body, headers=headers)
print(f"Sending event with payload {str(ce_data)} and headers {str(headers)} to {K_SINK}")
response = requests.post(K_SINK, data=body, headers=headers)
print(f"Received response with status code: {response.status_code}")
if response.status_code >= 300:
print(f"Failed to send event to {K_SINK} with status code: {response.status_code}")
return f"Failed to send event to {K_SINK}", 500
except Exception as e:
print(f"Failed to send event to {K_SINK} with error: {e}")
return f"Failed to send event to {K_SINK}", 500
Expand Down
5 changes: 4 additions & 1 deletion services/minio-webhook-source/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ def receive_event():

try:
print(f"Sending event for object {ce_data_records[0]['bucket']}/{ce_data_records[0]['object']} to {K_SINK}")
requests.post(K_SINK, data=body, headers=headers)
response = requests.post(K_SINK, data=body, headers=headers)
if response.status_code >= 300:
print(f"Failed to send event to {K_SINK} with status code: {response.status_code}")
return f"Failed to send event to {K_SINK}", 500
except Exception as e:
print(f"Failed to send event to {K_SINK} with error: {e}")
return f"Failed to send event to {K_SINK}", 500
Expand Down
7 changes: 5 additions & 2 deletions services/prediction-service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ def prediction_request():
payload = {'instances': [image_np.tolist()]}

try:
call = requests.post(URL, json=payload)
response = requests.post(URL, json=payload)
if response.status_code >= 300:
print(f"Failed to call inference service for uploadId {upload_id} with status code: {response.status_code}")
return f"Failed to call inference service", 500
except Exception as e:
print(f"Failed to call inference service for uploadId {upload_id}")
print(e)
Expand All @@ -149,7 +152,7 @@ def prediction_request():
call_end_time = time.time()
print('Inference call took {} seconds'.format(call_end_time - call_start_time))

inference = call.json()
inference = response.json()

if "predictions" not in inference:
print(f"Failed to get predictions from inference service for uploadId {upload_id}")
Expand Down

0 comments on commit a5347b6

Please sign in to comment.