Commit 62c4ccc

merge from main

Kenneth Kehl committed Nov 11, 2024
2 parents c5b2274 + 1d5e54e
Showing 11 changed files with 359 additions and 282 deletions.
4 changes: 4 additions & 0 deletions app/__init__.py
@@ -4,6 +4,7 @@
import time
import uuid
from contextlib import contextmanager
+from multiprocessing import Manager
from time import monotonic

from celery import Celery, Task, current_task
@@ -119,6 +120,9 @@ def create_app(application):
    redis_store.init_app(application)
    document_download_client.init_app(application)

+    manager = Manager()
+    application.config["job_cache"] = manager.dict()

    register_blueprint(application)

    # avoid circular imports by importing this file later
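For context, the pattern this hunk introduces — a multiprocessing.Manager dict parked on the Flask config so every worker process shares one job cache — looks roughly like this as a standalone sketch (the bare app factory is illustrative):

```python
# Minimal sketch: a Manager-backed dict is a proxy to a dict living in a
# separate manager process, so it stays shared across forked workers,
# unlike a plain module-level dict.
from multiprocessing import Manager

from flask import Flask


def create_app():
    app = Flask(__name__)
    manager = Manager()  # starts the manager process
    app.config["job_cache"] = manager.dict()  # shared DictProxy, not a plain dict
    return app
```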
53 changes: 36 additions & 17 deletions app/aws/s3.py
@@ -2,7 +2,6 @@
import re
import time
from concurrent.futures import ThreadPoolExecutor
-from multiprocessing import Manager

import botocore
from boto3 import Session
@@ -16,26 +15,47 @@

# Temporarily extend cache to 7 days
ttl = 60 * 60 * 24 * 7
-manager = Manager()
-job_cache = manager.dict()


# Global variable
s3_client = None
s3_resource = None


-def set_job_cache(job_cache, key, value):
+def set_job_cache(key, value):
    current_app.logger.info(f"Setting {key} in the job_cache.")
+    job_cache = current_app.config["job_cache"]
    job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)


+def get_job_cache(key):
+    job_cache = current_app.config["job_cache"]
+    ret = job_cache.get(key)
+    if ret is None:
+        current_app.logger.warning(f"Could not find {key} in the job_cache.")
+    else:
+        current_app.logger.info(f"Got {key} from job_cache.")
+    return ret


+def len_job_cache():
+    job_cache = current_app.config["job_cache"]
+    ret = len(job_cache)
+    current_app.logger.info(f"Length of job_cache is {ret}")
+    return ret


def clean_cache():
+    job_cache = current_app.config["job_cache"]
    current_time = time.time()
    keys_to_delete = []
    for key, (_, expiry_time) in job_cache.items():
        if expiry_time < current_time:
            keys_to_delete.append(key)

    current_app.logger.info(
        f"Deleting the following keys from the job_cache: {keys_to_delete}"
    )
    for key in keys_to_delete:
        del job_cache[key]
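Taken together, these helpers behave like a small TTL cache addressed through current_app. A hedged usage sketch (values are invented; the calls need an app context because each helper reads current_app.config["job_cache"]):

```python
with application.app_context():  # "application" is the Flask app from create_app
    set_job_cache("job-123", "phone number\n+15555550100")  # stored as (value, expiry)
    hit = get_job_cache("job-123")
    if hit is not None:
        csv_body, expires_at = hit  # expiry is time.time() + 8 days
    len_job_cache()   # logs and returns the number of entries
    clean_cache()     # drops every entry whose expiry has passed
```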

@@ -162,17 +182,16 @@ def read_s3_file(bucket_name, object_key, s3res):
"""
try:
job_id = get_job_id_from_s3_object_key(object_key)
if job_cache.get(job_id) is None:
if get_job_cache(job_id) is None:
object = (
s3res.Object(bucket_name, object_key)
.get()["Body"]
.read()
.decode("utf-8")
)
set_job_cache(job_cache, job_id, object)
set_job_cache(job_cache, f"{job_id}_phones", extract_phones(object))
set_job_cache(job_id, object)
set_job_cache(f"{job_id}_phones", extract_phones(object))
set_job_cache(
job_cache,
f"{job_id}_personalisation",
extract_personalisation(object),
)
@@ -192,7 +211,7 @@ def get_s3_files():

    s3res = get_s3_resource()
    current_app.logger.info(
-        f"job_cache length before regen: {len(job_cache)} #notify-admin-1200"
+        f"job_cache length before regen: {len_job_cache()} #notify-admin-1200"
    )
    try:
        with ThreadPoolExecutor() as executor:
@@ -201,7 +220,7 @@ def get_s3_files():
current_app.logger.exception("Connection pool issue")

current_app.logger.info(
f"job_cache length after regen: {len(job_cache)} #notify-admin-1200"
f"job_cache length after regen: {len_job_cache()} #notify-admin-1200"
)
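The executor body is elided in this diff, so only the shape of the fan-out is visible. A sketch of what get_s3_files() plausibly does with it (the object listing is an assumption):

```python
from concurrent.futures import ThreadPoolExecutor


def warm_job_cache(bucket_name, object_keys, s3res):
    # read_s3_file primes the job_cache with each job's CSV, phones, and
    # personalisation; threads help because the work is I/O-bound.
    with ThreadPoolExecutor() as executor:
        for key in object_keys:
            executor.submit(read_s3_file, bucket_name, key, s3res)
```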


@@ -424,12 +443,12 @@ def extract_personalisation(job):


def get_phone_number_from_s3(service_id, job_id, job_row_number):
-    job = job_cache.get(job_id)
+    job = get_job_cache(job_id)
    if job is None:
        current_app.logger.info(f"job {job_id} was not in the cache")
        job = get_job_from_s3(service_id, job_id)
        # Even if it is None, put it here to avoid KeyErrors
-        set_job_cache(job_cache, job_id, job)
+        set_job_cache(job_id, job)
    else:
        # skip expiration date from cache, we don't need it here
        job = job[0]
@@ -441,7 +460,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
return "Unavailable"

phones = extract_phones(job)
set_job_cache(job_cache, f"{job_id}_phones", phones)
set_job_cache(f"{job_id}_phones", phones)

# If we can find the quick dictionary, use it
phone_to_return = phones[job_row_number]
@@ -458,12 +477,12 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
    # We don't want to constantly pull down a job from s3 every time we need the personalisation.
    # At the same time we don't want to store it in redis or the db
    # So this is a little recycling mechanism to reduce the number of downloads.
-    job = job_cache.get(job_id)
+    job = get_job_cache(job_id)
    if job is None:
        current_app.logger.info(f"job {job_id} was not in the cache")
        job = get_job_from_s3(service_id, job_id)
        # Even if it is None, put it here to avoid KeyErrors
-        set_job_cache(job_cache, job_id, job)
+        set_job_cache(job_id, job)
    else:
        # skip expiration date from cache, we don't need it here
        job = job[0]
@@ -478,9 +497,9 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
        )
        return {}

set_job_cache(job_cache, f"{job_id}_personalisation", extract_personalisation(job))
set_job_cache(f"{job_id}_personalisation", extract_personalisation(job))

return job_cache.get(f"{job_id}_personalisation")[0].get(job_row_number)
return get_job_cache(f"{job_id}_personalisation")[0].get(job_row_number)


def get_job_metadata_from_s3(service_id, job_id):
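Both getters follow the same read path: try the shared cache, fall back to S3 on a miss, then re-prime the cache. A hypothetical call site, with placeholder identifiers:

```python
# For a notification built from row 2 of an uploaded job CSV:
phone = get_phone_number_from_s3(service_id, job_id, 2)  # "Unavailable" if the job is gone
personalisation = get_personalisation_from_s3(service_id, job_id, 2)  # {} if the job is gone
```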
4 changes: 3 additions & 1 deletion app/clients/__init__.py
@@ -16,7 +16,9 @@
    # This is the default but just for doc sake
    # there may come a time when increasing this helps
    # with job cache management.
-    max_pool_connections=10,
+    # max_pool_connections=10,
+    # Reducing to 7 connections due to BrokenPipeErrors
+    max_pool_connections=7,
)


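For reference, this is the shared botocore client configuration; a sketch of the resulting object after this commit (10 is botocore's documented default pool size):

```python
from botocore.config import Config

# Smaller urllib3 connection pool: fewer concurrent S3 connections,
# intended to curb the BrokenPipeErrors mentioned in the comment above.
AWS_CLIENT_CONFIG = Config(max_pool_connections=7)
```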
2 changes: 1 addition & 1 deletion app/dao/services_dao.py
@@ -474,7 +474,7 @@ def dao_fetch_stats_for_service_from_days(service_id, start_date, end_date):
func.date_trunc("day", NotificationAllTimeView.created_at),
)
)
return db.session.execute(stmt).scalars().all()
return db.session.execute(stmt).all()


def dao_fetch_stats_for_service_from_days_for_user(
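The one-line change matters because the statement selects several expressions (the truncated day plus aggregate columns). `.scalars()` keeps only the first column of each row, silently discarding the rest, while `.all()` returns full Row tuples. A toy illustration (SQLAlchemy 2.x; the Notification model and values are assumed):

```python
from sqlalchemy import func, select

stmt = select(Notification.status, func.count()).group_by(Notification.status)

db.session.execute(stmt).scalars().all()  # ['delivered', 'failed'] -- counts dropped
db.session.execute(stmt).all()            # [('delivered', 10), ('failed', 2)]
```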
2 changes: 1 addition & 1 deletion app/service/rest.py
@@ -373,7 +373,7 @@ def add_user_to_service(service_id, user_id):
    service = dao_fetch_service_by_id(service_id)
    user = get_user_by_id(user_id=user_id)
    if user in service.users:
-        error = "User id: {} already part of service id: {}".format(user_id, service_id)
+        error = f"User id: {user_id} already part of service id: {service_id}"
        raise InvalidRequest(error, status_code=400)

    data = request.get_json()
37 changes: 29 additions & 8 deletions app/service_invite/rest.py
@@ -1,6 +1,7 @@
import base64
import json
import os
+from urllib.parse import unquote

from flask import Blueprint, current_app, jsonify, request
from itsdangerous import BadData, SignatureExpired
@@ -32,7 +33,7 @@
register_errors(service_invite)


-def _create_service_invite(invited_user, nonce):
+def _create_service_invite(invited_user, nonce, state):

    template_id = current_app.config["INVITATION_EMAIL_TEMPLATE_ID"]

@@ -52,13 +53,14 @@ def _create_service_invite(invited_user, nonce):
data["invited_user_id"] = str(invited_user.id)
data["invited_user_email"] = invited_user.email_address

invite_redis_key = f"invite-data-{unquote(state)}"
redis_store.set(invite_redis_key, get_user_data_url_safe(data))

url = os.environ["LOGIN_DOT_GOV_REGISTRATION_URL"]

url = url.replace("NONCE", nonce) # handed from data sent from admin.

user_data_url_safe = get_user_data_url_safe(data)

url = url.replace("STATE", user_data_url_safe)
url = url.replace("STATE", state)

    personalisation = {
        "user_name": invited_user.from_user.name,
@@ -85,6 +87,8 @@ def _create_service_invite(invited_user, nonce):
    )
    send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY)

+    return data
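In other words, the invite payload no longer travels inside the login.gov URL; only the opaque state does, and the payload is parked in redis under a key derived from that state. A sketch with made-up values:

```python
# Hypothetical values, for illustration only.
nonce, state = "abc123", "xyz789"
url = "https://idp.example.gov/sign_up?nonce=NONCE&state=STATE"  # stand-in template
url = url.replace("NONCE", nonce).replace("STATE", state)
# redis now holds:  invite-data-xyz789  ->  url-safe base64 invite payload
```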


@service_invite.route("/service/<service_id>/invite", methods=["POST"])
def create_invited_user(service_id):
@@ -94,13 +98,18 @@ def create_invited_user(service_id):
    except KeyError:
        current_app.logger.exception("nonce not found in submitted data.")
        raise
+    try:
+        state = request_json.pop("state")
+    except KeyError:
+        current_app.logger.exception("state not found in submitted data.")
+        raise

    invited_user = invited_user_schema.load(request_json)
    save_invited_user(invited_user)

-    _create_service_invite(invited_user, nonce)
+    invite_data = _create_service_invite(invited_user, nonce, state)

-    return jsonify(data=invited_user_schema.dump(invited_user)), 201
+    return jsonify(data=invited_user_schema.dump(invited_user), invite=invite_data), 201


@service_invite.route("/service/<service_id>/invite/expired", methods=["GET"])
@@ -148,6 +157,18 @@ def resend_service_invite(service_id, invited_user_id):
    Note:
        This ignores the POST data entirely, except for the "nonce" and "state" fields.
    """
+    request_json = request.get_json()
+    try:
+        nonce = request_json.pop("nonce")
+    except KeyError:
+        current_app.logger.exception("nonce not found in submitted data.")
+        raise
+    try:
+        state = request_json.pop("state")
+    except KeyError:
+        current_app.logger.exception("state not found in submitted data.")
+        raise

    fetched = get_expired_invite_by_service_and_id(
        service_id=service_id,
        invited_user_id=invited_user_id,
@@ -161,9 +182,9 @@

    save_invited_user(update_dict)

-    _create_service_invite(fetched, current_app.config["ADMIN_BASE_URL"])
+    invite_data = _create_service_invite(fetched, nonce, state)

-    return jsonify(data=invited_user_schema.dump(fetched)), 200
+    return jsonify(data=invited_user_schema.dump(fetched), invite=invite_data), 200


def invited_user_url(invited_user_id, invite_link_host=None):
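A hypothetical admin-side call against the updated create endpoint — both routes now require nonce and state in the POST body and return the invite payload alongside the invited user (host, auth, and the invited-user fields are assumptions):

```python
import requests

resp = requests.post(
    f"{api_host}/service/{service_id}/invite",
    json={
        "email_address": "invitee@example.gov",  # assumed invited-user fields
        "from_user": str(inviter_id),
        "nonce": nonce,  # substituted into LOGIN_DOT_GOV_REGISTRATION_URL
        "state": state,  # also keys the redis entry: invite-data-<state>
    },
    headers=auth_headers,
)
invited = resp.json()["data"]   # serialized invited user
invite = resp.json()["invite"]  # new in this commit: the invite data payload
```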
4 changes: 2 additions & 2 deletions deploy-config/production.yml
@@ -1,7 +1,7 @@
env: production
web_instances: 2
-web_memory: 2G
-worker_instances: 2
+web_memory: 3G
+worker_instances: 4
worker_memory: 2G
scheduler_memory: 256M
public_api_route: notify-api.app.cloud.gov