Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 64 additions & 50 deletions lambdas/producer/producer_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
from datetime import datetime
from typing import Any

import aioboto3
import aio_pika
import boto3
import pika
import httpx
import redis
import redis.exceptions
from pika.exceptions import AMQPConnectionError, UnroutableError
from rdflib import Graph
from tqdm import tqdm
import xmltodict
Expand All @@ -16,19 +17,23 @@
from hml_reader.settings import Settings


async def get_rabbitmq_creds():
def get_rabbitmq_creds() -> tuple[str, str, str]:
secret_arn = os.getenv("RABBITMQ_SECRET_ARN")
rabbit_mq_endpoint = os.getenv("RABBITMQ_ENDPOINT")
region = os.getenv("AWS_REGION", "us-east-1")
region = os.getenv("AWS_REGION", "us-east-1")

session = aioboto3.Session()
async with session.client("secretsmanager", region_name=region) as client: # type: ignore
secret_value = await client.get_secret_value(SecretId=secret_arn)
secret = json.loads(secret_value["SecretString"])
return secret["username"], secret["password"], rabbit_mq_endpoint
if secret_arn is None:
raise ValueError("Cannot find RABBITMQ_SECRET_ARN")
if rabbit_mq_endpoint is None:
raise ValueError("Cannot find RABBITMQ_ENDPOINT")

client = boto3.client("secretsmanager", region_name=region)
secret_value = client.get_secret_value(SecretId=secret_arn)
secret = json.loads(secret_value["SecretString"])
return secret["username"], secret["password"], rabbit_mq_endpoint

async def get_settings():

def get_settings():
return Settings()

def fetch_weather_products() -> list[Any]:
Expand Down Expand Up @@ -70,56 +75,65 @@ def fetch_weather_products() -> list[Any]:
else:
raise httpx.HTTPError(f"Error fetching data: {response.status_code}")

async def publish(channel: aio_pika.channel, hml: HML, settings: Settings) -> None:
def publish(channel, hml, settings) -> None:
if not channel:
raise RuntimeError(
"Message could not be sent as there is no RabbitMQ Connection"
)
async with channel.transaction():
msg = hml.json().encode()
try:
await channel.default_exchange.publish(
aio_pika.Message(body=msg),
routing_key=settings.flooded_data_queue,
mandatory=True
)
except aio_pika.exceptions.DeliveryError as e:
raise e("Message rejected")

msg = hml.model_dump_json().encode()
try:
channel.basic_publish(
exchange='',
routing_key=settings.flooded_data_queue,
body=msg,
properties=pika.BasicProperties(
delivery_mode=2,
),
mandatory=True
)
except UnroutableError as e:
raise RuntimeError("Message rejected") from e


async def lambda_handler(event, context):
def lambda_handler(event, context):
print("Producer Lambda triggered")

user, pwd, rabbit_mq_endpoint = await get_rabbitmq_creds()

connection = await aio_pika.connect_robust(
f"amqp://{user}:{pwd}@{rabbit_mq_endpoint}/",
heartbeat=30
)
settings = await get_settings()
user, pwd, rabbit_mq_endpoint = get_rabbitmq_creds()
settings = get_settings()

async with connection:
channel = await connection.channel(publisher_confirms=False)
await channel.declare_queue(
creds = pika.PlainCredentials(user, pwd)
try:
conn = pika.BlockingConnection(pika.ConnectionParameters(
host=rabbit_mq_endpoint,
credentials=creds,
heartbeat=30,
blocked_connection_timeout=300,
))
channel = conn.channel()
channel.queue_declare(
settings.flooded_data_queue,
durable=True
)
print("Successfully connected to RabbitMQ")
hml_data = fetch_weather_products()
try:
r = redis.Redis(
host=settings.redis_url,
port=settings.redis_port,
decode_responses=True
)
hml_data = sorted(hml_data, key=lambda x: datetime.fromisoformat(x["issuanceTime"]))
for hml in tqdm(hml_data, desc="reading through api.weather.gov HML outputs"):
hml_id = hml["id"]
if r.get(hml_id) is None:
hml_obj = HML(**hml)
await publish(channel, hml_obj, settings)
r.set(hml_id, hml_obj.json())
r.expire(hml_id, 604800) # exires after a week
except redis.exceptions.ConnectionError as e:
raise e("Cannot run Redis service")
except AMQPConnectionError as e:
print(f"RabbitMQ connection error: {e}")
raise RuntimeError("Cannot connect to RabbitMQ service") from e
print("Successfully connected to RabbitMQ")
hml_data = fetch_weather_products()
try:
r = redis.Redis(
host=settings.redis_url,
port=settings.redis_port,
decode_responses=True
)
hml_data = sorted(hml_data, key=lambda x: datetime.fromisoformat(x["issuanceTime"]))
for hml in tqdm(hml_data, desc="reading through api.weather.gov HML outputs"):
hml_id = hml["id"]
if r.get(hml_id) is None:
hml_obj = HML(**hml)
publish(channel, hml_obj, settings)
r.set(hml_id, hml_obj.model_dump_json())
r.expire(hml_id, 604800) # exires after a week
except redis.exceptions.ConnectionError as e:
raise e("Cannot run Redis service")
return {"status": "ok"}
2 changes: 1 addition & 1 deletion lambdas/producer/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ authors = [
dependencies = [
"pydantic==2.7.1",
"httpx==0.27.0",
"aio-pika==9.4.3",
"pika==1.3.2",
"pydantic-settings==2.3.4",
"aioboto3==15.1.0",
"aio-pika==9.4.3",
Expand Down