diff --git a/lambdas/producer/producer_lambda.py b/lambdas/producer/producer_lambda.py index b8ca5d1..2bfb4c7 100644 --- a/lambdas/producer/producer_lambda.py +++ b/lambdas/producer/producer_lambda.py @@ -3,11 +3,12 @@ from datetime import datetime from typing import Any -import aioboto3 -import aio_pika +import boto3 +import pika import httpx import redis import redis.exceptions +from pika.exceptions import AMQPConnectionError, UnroutableError from rdflib import Graph from tqdm import tqdm import xmltodict @@ -16,19 +17,23 @@ from hml_reader.settings import Settings -async def get_rabbitmq_creds(): +def get_rabbitmq_creds() -> tuple[str, str, str]: secret_arn = os.getenv("RABBITMQ_SECRET_ARN") rabbit_mq_endpoint = os.getenv("RABBITMQ_ENDPOINT") - region = os.getenv("AWS_REGION", "us-east-1") + region = os.getenv("AWS_REGION", "us-east-1") - session = aioboto3.Session() - async with session.client("secretsmanager", region_name=region) as client: # type: ignore - secret_value = await client.get_secret_value(SecretId=secret_arn) - secret = json.loads(secret_value["SecretString"]) - return secret["username"], secret["password"], rabbit_mq_endpoint + if secret_arn is None: + raise ValueError("Cannot find RABBITMQ_SECRET_ARN") + if rabbit_mq_endpoint is None: + raise ValueError("Cannot find RABBITMQ_ENDPOINT") + client = boto3.client("secretsmanager", region_name=region) + secret_value = client.get_secret_value(SecretId=secret_arn) + secret = json.loads(secret_value["SecretString"]) + return secret["username"], secret["password"], rabbit_mq_endpoint -async def get_settings(): + +def get_settings(): return Settings() def fetch_weather_products() -> list[Any]: @@ -70,56 +75,65 @@ def fetch_weather_products() -> list[Any]: else: raise httpx.HTTPError(f"Error fetching data: {response.status_code}") -async def publish(channel: aio_pika.channel, hml: HML, settings: Settings) -> None: +def publish(channel, hml, settings) -> None: if not channel: raise RuntimeError( "Message could not be sent as there is no RabbitMQ Connection" ) - async with channel.transaction(): - msg = hml.json().encode() - try: - await channel.default_exchange.publish( - aio_pika.Message(body=msg), - routing_key=settings.flooded_data_queue, - mandatory=True - ) - except aio_pika.exceptions.DeliveryError as e: - raise e("Message rejected") + + msg = hml.model_dump_json().encode() + try: + channel.basic_publish( + exchange='', + routing_key=settings.flooded_data_queue, + body=msg, + properties=pika.BasicProperties( + delivery_mode=2, + ), + mandatory=True + ) + except UnroutableError as e: + raise RuntimeError("Message rejected") from e -async def lambda_handler(event, context): +def lambda_handler(event, context): print("Producer Lambda triggered") - user, pwd, rabbit_mq_endpoint = await get_rabbitmq_creds() - - connection = await aio_pika.connect_robust( - f"amqp://{user}:{pwd}@{rabbit_mq_endpoint}/", - heartbeat=30 - ) - settings = await get_settings() + user, pwd, rabbit_mq_endpoint = get_rabbitmq_creds() + settings = get_settings() - async with connection: - channel = await connection.channel(publisher_confirms=False) - await channel.declare_queue( + creds = pika.PlainCredentials(user, pwd) + try: + conn = pika.BlockingConnection(pika.ConnectionParameters( + host=rabbit_mq_endpoint, + credentials=creds, + heartbeat=30, + blocked_connection_timeout=300, + )) + channel = conn.channel() + channel.queue_declare( settings.flooded_data_queue, durable=True ) - print("Successfully connected to RabbitMQ") - hml_data = fetch_weather_products() - try: - r = redis.Redis( - host=settings.redis_url, - port=settings.redis_port, - decode_responses=True - ) - hml_data = sorted(hml_data, key=lambda x: datetime.fromisoformat(x["issuanceTime"])) - for hml in tqdm(hml_data, desc="reading through api.weather.gov HML outputs"): - hml_id = hml["id"] - if r.get(hml_id) is None: - hml_obj = HML(**hml) - await publish(channel, hml_obj, settings) - r.set(hml_id, hml_obj.json()) - r.expire(hml_id, 604800) # exires after a week - except redis.exceptions.ConnectionError as e: - raise e("Cannot run Redis service") + except AMQPConnectionError as e: + print(f"RabbitMQ connection error: {e}") + raise RuntimeError("Cannot connect to RabbitMQ service") from e + print("Successfully connected to RabbitMQ") + hml_data = fetch_weather_products() + try: + r = redis.Redis( + host=settings.redis_url, + port=settings.redis_port, + decode_responses=True + ) + hml_data = sorted(hml_data, key=lambda x: datetime.fromisoformat(x["issuanceTime"])) + for hml in tqdm(hml_data, desc="reading through api.weather.gov HML outputs"): + hml_id = hml["id"] + if r.get(hml_id) is None: + hml_obj = HML(**hml) + publish(channel, hml_obj, settings) + r.set(hml_id, hml_obj.model_dump_json()) + r.expire(hml_id, 604800) # exires after a week + except redis.exceptions.ConnectionError as e: + raise e("Cannot run Redis service") return {"status": "ok"} diff --git a/lambdas/producer/pyproject.toml b/lambdas/producer/pyproject.toml index 6c6c4e2..2c5552c 100644 --- a/lambdas/producer/pyproject.toml +++ b/lambdas/producer/pyproject.toml @@ -8,7 +8,7 @@ authors = [ dependencies = [ "pydantic==2.7.1", "httpx==0.27.0", - "aio-pika==9.4.3", + "pika==1.3.2", "pydantic-settings==2.3.4", "aioboto3==15.1.0", "aio-pika==9.4.3",