diff --git a/infra/openshift-manifests/analytics-service/feedbacks-trigger.yaml b/infra/openshift-manifests/analytics-service/feedbacks-trigger.yaml new file mode 100644 index 00000000..dc4b3321 --- /dev/null +++ b/infra/openshift-manifests/analytics-service/feedbacks-trigger.yaml @@ -0,0 +1,16 @@ +apiVersion: eventing.knative.dev/v1 +kind: Trigger +metadata: + name: analytics-service-feedbacks-trigger + namespace: ai-demo +spec: + broker: analytics-broker + filter: + attributes: + type: com.knative.dev.feedback.event + subscriber: + ref: + apiVersion: serving.knative.dev/v1 + kind: Service + name: analytics-service + uri: /feedbacks diff --git a/infra/openshift-manifests/analytics-service/kustomization.yaml b/infra/openshift-manifests/analytics-service/kustomization.yaml index abeebf57..5eb609d6 100644 --- a/infra/openshift-manifests/analytics-service/kustomization.yaml +++ b/infra/openshift-manifests/analytics-service/kustomization.yaml @@ -9,3 +9,4 @@ resources: - analytics-broker.yaml - analytics-service.yaml - predictions-trigger.yaml + - feedbacks-trigger.yaml diff --git a/infra/openshift-manifests/analytics-service/predictions-trigger.yaml b/infra/openshift-manifests/analytics-service/predictions-trigger.yaml index fa3cf4a4..ca8a84e6 100644 --- a/infra/openshift-manifests/analytics-service/predictions-trigger.yaml +++ b/infra/openshift-manifests/analytics-service/predictions-trigger.yaml @@ -1,10 +1,13 @@ apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: - name: analytics-service + name: analytics-service-predictions-trigger namespace: ai-demo spec: broker: predictions-broker + filter: + attributes: + type: com.knative.dev.prediction.event subscriber: ref: apiVersion: serving.knative.dev/v1 diff --git a/services/analytics-service/README.md b/services/analytics-service/README.md index 7c3ebb0c..abec7a58 100644 --- a/services/analytics-service/README.md +++ b/services/analytics-service/README.md @@ -43,7 +43,15 @@ python app.py Test sending feedbacks: ```shell -curl -v -X POST -H "Content-Type: application/json" localhost:8086/feedbacks -d '{"uploadId": "xyz", "feedback": 1}' +curl -v "localhost:8086/feedbacks" \ + -X POST \ + -H "Ce-Specversion: 1.0" \ + -H "Ce-Type: does.not.matter" \ + -H "Content-Type: application/json" \ + -H "Ce-Source: knative://foo.bar" \ + -H "Ce-time: 2020-12-02T13:49:13.77Z" \ + -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \ + -d '{"uploadId": "xyz", "feedback": 1}' ``` Check the DB: @@ -58,7 +66,15 @@ docker exec -ti postgres psql postgresql://postgres:postgres@localhost:5432/ai-d Test sending predictions: ```shell -curl -v -X POST -H "Content-Type: application/json" localhost:8086/predictions -d '{"uploadId": "xyz", "probability": 0.999, "x0": 0.24543, "x1": 0.24543, "y0": 0.24543, "y1": 0.24543}' +curl -v "localhost:8086/predictions" \ + -X POST \ + -H "Ce-Specversion: 1.0" \ + -H "Ce-Type: does.not.matter" \ + -H "Content-Type: application/json" \ + -H "Ce-Source: knative://foo.bar" \ + -H "Ce-time: 2020-12-02T13:49:13.77Z" \ + -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \ + -d '{"uploadId": "xyz", "probability": 0.999, "x0": 0.24543, "x1": 0.24543, "y0": 0.24543, "y1": 0.24543}' ``` Check the DB: @@ -84,19 +100,17 @@ Build the image: docker build . -t ${DOCKER_REPO_OVERRIDE}/analytics-service ``` - -# Sending feedbacks - +Run the image: ```shell -analytics_service_url=$(k get ksvc -n ai-demo analytics-service -o=jsonpath='{.status.url}') -curl -v -X POST -H "Content-Type: application/json" ${analytics_service_url}/feedbacks -d '{"uploadId": "xyz", "feedback": 1}' +docker run --rm \ +-p 8086:8086 \ +-e DB_HOST="192.168.2.160" \ +-e DB_PORT="5432" \ +-e DB_DATABASE="ai-demo" \ +-e DB_USERNAME="postgres" \ +-e DB_PASSWORD="postgres" \ +-e MAX_ITEMS_IN_CACHES="1000000" \ +-e CACHE_ITEM_TTL_IN_SECONDS="300" \ +${DOCKER_REPO_OVERRIDE}/analytics-service ``` -# Sending predictions - -```shell -analytics_service_url=$(k get ksvc -n ai-demo analytics-service -o=jsonpath='{.status.url}') -curl -v -X POST -H "Content-Type: application/json" ${analytics_service_url}/predictions -d '{"uploadId": "xyz", "probability": 0.999, "x0": 0.24543, "x1": 0.24543, "y0": 0.24543, "y1": 0.24543}' -``` - - diff --git a/services/analytics-service/app.py b/services/analytics-service/app.py index c9eacb8e..43788c6b 100644 --- a/services/analytics-service/app.py +++ b/services/analytics-service/app.py @@ -4,6 +4,7 @@ import psycopg2 from flask import Flask, request +from cloudevents.http import from_http def init_feedbacks_table(): @@ -56,27 +57,30 @@ def handler(signal, frame): @app.route("/feedbacks", methods=["POST"]) def receive_feedbacks(): + print(f"Received request") + # request body looks like this: # { # "feedback": # "uploadId": # } - print(f"Received request: {request.json}") + event = from_http(request.headers, request.get_data()) - body = request.json + data = event.data required_fields = ['feedback', 'uploadId'] for x in required_fields: - if x not in body: + if x not in data: return f"'{x}' not in request", 400 cur = conn.cursor() + print(f"Inserting feedback {data['feedback']} for upload {data['uploadId']}") cur.execute('INSERT INTO feedbacks (feedback, upload_id) ' 'VALUES (%s, %s) ' 'ON CONFLICT (upload_id) DO NOTHING', - (body['feedback'], body['uploadId'])) + (data['feedback'], data['uploadId'])) conn.commit() cur.close() @@ -96,22 +100,25 @@ def receive_predictions(): # "y1": "0.556647" # } - print(f"Received request: {request.json}") + print(f"Received request") + + event = from_http(request.headers, request.get_data()) - body = request.json + data = event.data required_fields = ['probability', 'uploadId', 'x0', 'x1', 'y0', 'y1'] for x in required_fields: - if x not in body: + if x not in data: return f"'{x}' not in request", 400 cur = conn.cursor() + print(f"Inserting prediction {data['probability']} for upload {data['uploadId']}") cur.execute( 'INSERT INTO predictions (probability, upload_id, x0, x1, y0, y1) ' 'VALUES (%s, %s, %s, %s, %s, %s) ' 'ON CONFLICT (upload_id) DO NOTHING', - (body['probability'], body['uploadId'], body['x0'], body['x1'], body['y0'], body['y1'])) + (data['probability'], data['uploadId'], data['x0'], data['x1'], data['y0'], data['y1'])) conn.commit() cur.close() diff --git a/services/feedback-service/app.py b/services/feedback-service/app.py index 2e85b139..623a40cb 100644 --- a/services/feedback-service/app.py +++ b/services/feedback-service/app.py @@ -71,8 +71,12 @@ def receive_feedback(): headers, body = to_binary(event) try: - print(f"Sending event with payload {str(ce_data)} to {K_SINK}") - requests.post(K_SINK, data=body, headers=headers) + print(f"Sending event with payload {str(ce_data)} and headers {str(headers)} to {K_SINK}") + response = requests.post(K_SINK, data=body, headers=headers) + print(f"Received response with status code: {response.status_code}") + if response.status_code >= 300: + print(f"Failed to send event to {K_SINK} with status code: {response.status_code}") + return f"Failed to send event to {K_SINK}", 500 except Exception as e: print(f"Failed to send event to {K_SINK} with error: {e}") return f"Failed to send event to {K_SINK}", 500 diff --git a/services/minio-webhook-source/app.py b/services/minio-webhook-source/app.py index fa2d0613..dd79eaf0 100644 --- a/services/minio-webhook-source/app.py +++ b/services/minio-webhook-source/app.py @@ -132,7 +132,10 @@ def receive_event(): try: print(f"Sending event for object {ce_data_records[0]['bucket']}/{ce_data_records[0]['object']} to {K_SINK}") - requests.post(K_SINK, data=body, headers=headers) + response = requests.post(K_SINK, data=body, headers=headers) + if response.status_code >= 300: + print(f"Failed to send event to {K_SINK} with status code: {response.status_code}") + return f"Failed to send event to {K_SINK}", 500 except Exception as e: print(f"Failed to send event to {K_SINK} with error: {e}") return f"Failed to send event to {K_SINK}", 500 diff --git a/services/prediction-service/app.py b/services/prediction-service/app.py index 40697ff5..808335f6 100644 --- a/services/prediction-service/app.py +++ b/services/prediction-service/app.py @@ -140,7 +140,10 @@ def prediction_request(): payload = {'instances': [image_np.tolist()]} try: - call = requests.post(URL, json=payload) + response = requests.post(URL, json=payload) + if response.status_code >= 300: + print(f"Failed to call inference service for uploadId {upload_id} with status code: {response.status_code}") + return f"Failed to call inference service", 500 except Exception as e: print(f"Failed to call inference service for uploadId {upload_id}") print(e) @@ -149,7 +152,7 @@ def prediction_request(): call_end_time = time.time() print('Inference call took {} seconds'.format(call_end_time - call_start_time)) - inference = call.json() + inference = response.json() if "predictions" not in inference: print(f"Failed to get predictions from inference service for uploadId {upload_id}")