diff --git a/app/__init__.py b/app/__init__.py
index 380964b53..16ffbd5a9 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -4,6 +4,7 @@
 import time
 import uuid
 from contextlib import contextmanager
+from multiprocessing import Manager
 from time import monotonic
 
 from celery import Celery, Task, current_task
@@ -119,6 +120,9 @@ def create_app(application):
     redis_store.init_app(application)
     document_download_client.init_app(application)
 
+    manager = Manager()
+    application.config["job_cache"] = manager.dict()
+
     register_blueprint(application)
 
     # avoid circular imports by importing this file later
diff --git a/app/aws/s3.py b/app/aws/s3.py
index 44785cf98..f83b9059d 100644
--- a/app/aws/s3.py
+++ b/app/aws/s3.py
@@ -2,7 +2,6 @@
 import re
 import time
 from concurrent.futures import ThreadPoolExecutor
-from multiprocessing import Manager
 
 import botocore
 from boto3 import Session
@@ -16,8 +15,6 @@
 # Temporarily extend cache to 7 days
 ttl = 60 * 60 * 24 * 7
 
-manager = Manager()
-job_cache = manager.dict()
 
 
 # Global variable
@@ -25,17 +22,40 @@
 s3_resource = None
 
 
-def set_job_cache(job_cache, key, value):
+def set_job_cache(key, value):
+    current_app.logger.info(f"Setting {key} in the job_cache.")
+    job_cache = current_app.config["job_cache"]
     job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)
 
 
+def get_job_cache(key):
+    job_cache = current_app.config["job_cache"]
+    ret = job_cache.get(key)
+    if ret is None:
+        current_app.logger.warning(f"Could not find {key} in the job_cache.")
+    else:
+        current_app.logger.info(f"Got {key} from job_cache.")
+    return ret
+
+
+def len_job_cache():
+    job_cache = current_app.config["job_cache"]
+    ret = len(job_cache)
+    current_app.logger.info(f"Length of job_cache is {ret}")
+    return ret
+
+
 def clean_cache():
+    job_cache = current_app.config["job_cache"]
     current_time = time.time()
     keys_to_delete = []
     for key, (_, expiry_time) in job_cache.items():
         if expiry_time < current_time:
             keys_to_delete.append(key)
 
+    current_app.logger.info(
+        f"Deleting the following keys from the job_cache: {keys_to_delete}"
+    )
     for key in keys_to_delete:
         del job_cache[key]
 
@@ -162,17 +182,16 @@ def read_s3_file(bucket_name, object_key, s3res):
     """
     try:
         job_id = get_job_id_from_s3_object_key(object_key)
-        if job_cache.get(job_id) is None:
+        if get_job_cache(job_id) is None:
             object = (
                 s3res.Object(bucket_name, object_key)
                 .get()["Body"]
                 .read()
                 .decode("utf-8")
             )
-            set_job_cache(job_cache, job_id, object)
-            set_job_cache(job_cache, f"{job_id}_phones", extract_phones(object))
+            set_job_cache(job_id, object)
+            set_job_cache(f"{job_id}_phones", extract_phones(object))
             set_job_cache(
-                job_cache,
                 f"{job_id}_personalisation",
                 extract_personalisation(object),
             )
@@ -192,7 +211,7 @@ def get_s3_files():
     s3res = get_s3_resource()
 
     current_app.logger.info(
-        f"job_cache length before regen: {len(job_cache)} #notify-admin-1200"
+        f"job_cache length before regen: {len_job_cache()} #notify-admin-1200"
    )
     try:
         with ThreadPoolExecutor() as executor:
@@ -201,7 +220,7 @@
         current_app.logger.exception("Connection pool issue")
 
     current_app.logger.info(
-        f"job_cache length after regen: {len(job_cache)} #notify-admin-1200"
+        f"job_cache length after regen: {len_job_cache()} #notify-admin-1200"
     )
 
 
@@ -424,12 +443,12 @@ def extract_personalisation(job):
 
 
 def get_phone_number_from_s3(service_id, job_id, job_row_number):
-    job = job_cache.get(job_id)
+    job = get_job_cache(job_id)
     if job is None:
         current_app.logger.info(f"job {job_id} was not in the cache")
         job = get_job_from_s3(service_id, job_id)
         # Even if it is None, put it here to avoid KeyErrors
-        set_job_cache(job_cache, job_id, job)
+        set_job_cache(job_id, job)
     else:
         # skip expiration date from cache, we don't need it here
         job = job[0]
@@ -441,7 +460,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
         return "Unavailable"
 
     phones = extract_phones(job)
-    set_job_cache(job_cache, f"{job_id}_phones", phones)
+    set_job_cache(f"{job_id}_phones", phones)
 
     # If we can find the quick dictionary, use it
     phone_to_return = phones[job_row_number]
@@ -458,12 +477,12 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
     # We don't want to constantly pull down a job from s3 every time we need the personalisation.
     # At the same time we don't want to store it in redis or the db
     # So this is a little recycling mechanism to reduce the number of downloads.
-    job = job_cache.get(job_id)
+    job = get_job_cache(job_id)
     if job is None:
         current_app.logger.info(f"job {job_id} was not in the cache")
         job = get_job_from_s3(service_id, job_id)
         # Even if it is None, put it here to avoid KeyErrors
-        set_job_cache(job_cache, job_id, job)
+        set_job_cache(job_id, job)
     else:
         # skip expiration date from cache, we don't need it here
         job = job[0]
@@ -478,9 +497,9 @@
         )
         return {}
 
-    set_job_cache(job_cache, f"{job_id}_personalisation", extract_personalisation(job))
+    set_job_cache(f"{job_id}_personalisation", extract_personalisation(job))
 
-    return job_cache.get(f"{job_id}_personalisation")[0].get(job_row_number)
+    return get_job_cache(f"{job_id}_personalisation")[0].get(job_row_number)
 
 
 def get_job_metadata_from_s3(service_id, job_id):
diff --git a/app/clients/__init__.py b/app/clients/__init__.py
index 7f1509896..88565bd22 100644
--- a/app/clients/__init__.py
+++ b/app/clients/__init__.py
@@ -17,8 +17,8 @@
     # there may come a time when increasing this helps
     # with job cache management.
     # max_pool_connections=10,
-    # Reducing to 4 connections due to BrokenPipeErrors
-    max_pool_connections=4,
+    # Limiting to 7 connections (down from the default of 10) due to BrokenPipeErrors
+    max_pool_connections=7,
 )
 
 
diff --git a/deploy-config/production.yml b/deploy-config/production.yml
index f593f63a2..d09084ac0 100644
--- a/deploy-config/production.yml
+++ b/deploy-config/production.yml
@@ -1,8 +1,8 @@
 env: production
 web_instances: 2
-web_memory: 2G
+web_memory: 3G
 worker_instances: 4
-worker_memory: 3G
+worker_memory: 2G
 scheduler_memory: 256M
 public_api_route: notify-api.app.cloud.gov
 admin_base_url: https://beta.notify.gov
diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py
index 6efe55fe2..e4a9c1c07 100644
--- a/tests/app/aws/test_s3.py
+++ b/tests/app/aws/test_s3.py
@@ -1,7 +1,7 @@
 import os
 from datetime import timedelta
 from os import getenv
-from unittest.mock import ANY, MagicMock, Mock, call, patch
+from unittest.mock import MagicMock, Mock, call, patch
 
 import botocore
 import pytest
@@ -70,7 +70,7 @@ def test_cleanup_old_s3_objects(mocker):
     mock_remove_csv_object.assert_called_once_with("A")
 
 
-def test_read_s3_file_success(mocker):
+def test_read_s3_file_success(client, mocker):
     mock_s3res = MagicMock()
     mock_extract_personalisation = mocker.patch("app.aws.s3.extract_personalisation")
     mock_extract_phones = mocker.patch("app.aws.s3.extract_phones")
@@ -89,16 +89,13 @@
     mock_extract_phones.return_value = ["1234567890"]
     mock_extract_personalisation.return_value = {"name": "John Doe"}
 
-    global job_cache
-    job_cache = {}
-
     read_s3_file(bucket_name, object_key, mock_s3res)
 
     mock_get_job_id.assert_called_once_with(object_key)
     mock_s3res.Object.assert_called_once_with(bucket_name, object_key)
     expected_calls = [
-        call(ANY, job_id, file_content),
-        call(ANY, f"{job_id}_phones", ["1234567890"]),
-        call(ANY, f"{job_id}_personalisation", {"name": "John Doe"}),
+        call(job_id, file_content),
+        call(f"{job_id}_phones", ["1234567890"]),
+        call(f"{job_id}_personalisation", {"name": "John Doe"}),
     ]
     mock_set_job_cache.assert_has_calls(expected_calls, any_order=True)
 
@@ -380,9 +377,12 @@ def test_file_exists_false(notify_api, mocker):
     get_s3_mock.assert_called_once()
 
 
-def test_get_s3_files_success(notify_api, mocker):
+def test_get_s3_files_success(client, mocker):
     mock_current_app = mocker.patch("app.aws.s3.current_app")
-    mock_current_app.config = {"CSV_UPLOAD_BUCKET": {"bucket": "test-bucket"}}
+    mock_current_app.config = {
+        "CSV_UPLOAD_BUCKET": {"bucket": "test-bucket"},
+        "job_cache": {},
+    }
     mock_thread_pool_executor = mocker.patch("app.aws.s3.ThreadPoolExecutor")
     mock_read_s3_file = mocker.patch("app.aws.s3.read_s3_file")
     mock_list_s3_objects = mocker.patch("app.aws.s3.list_s3_objects")
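
Note on the cache relocation: because job_cache now lives in application.config["job_cache"] instead of a module-level Manager().dict() in app/aws/s3.py, every helper resolves the dict through current_app, so callers need an application context. The snippet below is a minimal sketch, not part of this diff, of how the new helpers are expected to behave; the throwaway Flask app and the plain dict are illustrative stand-ins for the real app object and the Manager().dict() that create_app() installs.

    from flask import Flask

    from app.aws.s3 import clean_cache, get_job_cache, len_job_cache, set_job_cache

    app = Flask(__name__)
    app.config["job_cache"] = {}  # stand-in for the Manager().dict() installed by create_app()

    with app.app_context():
        set_job_cache("job-123", "phone number\r\n+15555550100")
        value, expires_at = get_job_cache("job-123")  # entries are stored as (value, expiry) tuples
        assert value.startswith("phone number")
        assert len_job_cache() == 1
        clean_cache()  # only removes entries whose expiry timestamp has already passed

Because the helpers look the dict up lazily, tests can swap in a plain dict (as test_get_s3_files_success does above) without touching multiprocessing at all.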