|
3 | 3 | from datetime import datetime |
4 | 4 | from typing import Any |
5 | 5 |
|
6 | | -import aioboto3 |
7 | | -import aio_pika |
| 6 | +import boto3 |
| 7 | +import pika |
8 | 8 | import httpx |
9 | 9 | import redis |
10 | 10 | import redis.exceptions |
| 11 | +from pika.exceptions import AMQPConnectionError, UnroutableError |
11 | 12 | from rdflib import Graph |
12 | 13 | from tqdm import tqdm |
13 | 14 | import xmltodict |
|
16 | 17 | from hml_reader.settings import Settings |
17 | 18 |
|
18 | 19 |
|
19 | | -async def get_rabbitmq_creds(): |
| 20 | +def get_rabbitmq_creds() -> tuple[str, str, str]: |
20 | 21 | secret_arn = os.getenv("RABBITMQ_SECRET_ARN") |
21 | 22 | rabbit_mq_endpoint = os.getenv("RABBITMQ_ENDPOINT") |
22 | | - region = os.getenv("AWS_REGION", "us-east-1") |
| 23 | + region = os.getenv("AWS_REGION", "us-east-1") |
23 | 24 |
|
24 | | - session = aioboto3.Session() |
25 | | - async with session.client("secretsmanager", region_name=region) as client: # type: ignore |
26 | | - secret_value = await client.get_secret_value(SecretId=secret_arn) |
27 | | - secret = json.loads(secret_value["SecretString"]) |
28 | | - return secret["username"], secret["password"], rabbit_mq_endpoint |
| 25 | + if secret_arn is None: |
| 26 | + raise ValueError("Cannot find RABBITMQ_SECRET_ARN") |
| 27 | + if rabbit_mq_endpoint is None: |
| 28 | + raise ValueError("Cannot find RABBITMQ_ENDPOINT") |
29 | 29 |
|
| 30 | + client = boto3.client("secretsmanager", region_name=region) |
| 31 | + secret_value = client.get_secret_value(SecretId=secret_arn) |
| 32 | + secret = json.loads(secret_value["SecretString"]) |
| 33 | + return secret["username"], secret["password"], rabbit_mq_endpoint |
30 | 34 |
|
31 | | -async def get_settings(): |
| 35 | + |
| 36 | +def get_settings(): |
32 | 37 | return Settings() |
33 | 38 |
|
34 | 39 | def fetch_weather_products() -> list[Any]: |
@@ -70,56 +75,65 @@ def fetch_weather_products() -> list[Any]: |
70 | 75 | else: |
71 | 76 | raise httpx.HTTPError(f"Error fetching data: {response.status_code}") |
72 | 77 |
|
73 | | -async def publish(channel: aio_pika.channel, hml: HML, settings: Settings) -> None: |
| 78 | +def publish(channel, hml, settings) -> None: |
74 | 79 | if not channel: |
75 | 80 | raise RuntimeError( |
76 | 81 | "Message could not be sent as there is no RabbitMQ Connection" |
77 | 82 | ) |
78 | | - async with channel.transaction(): |
79 | | - msg = hml.json().encode() |
80 | | - try: |
81 | | - await channel.default_exchange.publish( |
82 | | - aio_pika.Message(body=msg), |
83 | | - routing_key=settings.flooded_data_queue, |
84 | | - mandatory=True |
85 | | - ) |
86 | | - except aio_pika.exceptions.DeliveryError as e: |
87 | | - raise e("Message rejected") |
| 83 | + |
| 84 | + msg = hml.model_dump_json().encode() |
| 85 | + try: |
| 86 | + channel.basic_publish( |
| 87 | + exchange='', |
| 88 | + routing_key=settings.flooded_data_queue, |
| 89 | + body=msg, |
| 90 | + properties=pika.BasicProperties( |
| 91 | + delivery_mode=2, |
| 92 | + ), |
| 93 | + mandatory=True |
| 94 | + ) |
| 95 | + except UnroutableError as e: |
| 96 | + raise RuntimeError("Message rejected") from e |
88 | 97 |
|
89 | 98 |
|
90 | | -async def lambda_handler(event, context): |
| 99 | +def lambda_handler(event, context): |
91 | 100 | print("Producer Lambda triggered") |
92 | 101 |
|
93 | | - user, pwd, rabbit_mq_endpoint = await get_rabbitmq_creds() |
94 | | - |
95 | | - connection = await aio_pika.connect_robust( |
96 | | - f"amqp://{user}:{pwd}@{rabbit_mq_endpoint}/", |
97 | | - heartbeat=30 |
98 | | - ) |
99 | | - settings = await get_settings() |
| 102 | + user, pwd, rabbit_mq_endpoint = get_rabbitmq_creds() |
| 103 | + settings = get_settings() |
100 | 104 |
|
101 | | - async with connection: |
102 | | - channel = await connection.channel(publisher_confirms=False) |
103 | | - await channel.declare_queue( |
| 105 | + creds = pika.PlainCredentials(user, pwd) |
| 106 | + try: |
| 107 | + conn = pika.BlockingConnection(pika.ConnectionParameters( |
| 108 | + host=rabbit_mq_endpoint, |
| 109 | + credentials=creds, |
| 110 | + heartbeat=30, |
| 111 | + blocked_connection_timeout=300, |
| 112 | + )) |
| 113 | + channel = conn.channel() |
| 114 | + channel.queue_declare( |
104 | 115 | settings.flooded_data_queue, |
105 | 116 | durable=True |
106 | 117 | ) |
107 | | - print("Successfully connected to RabbitMQ") |
108 | | - hml_data = fetch_weather_products() |
109 | | - try: |
110 | | - r = redis.Redis( |
111 | | - host=settings.redis_url, |
112 | | - port=settings.redis_port, |
113 | | - decode_responses=True |
114 | | - ) |
115 | | - hml_data = sorted(hml_data, key=lambda x: datetime.fromisoformat(x["issuanceTime"])) |
116 | | - for hml in tqdm(hml_data, desc="reading through api.weather.gov HML outputs"): |
117 | | - hml_id = hml["id"] |
118 | | - if r.get(hml_id) is None: |
119 | | - hml_obj = HML(**hml) |
120 | | - await publish(channel, hml_obj, settings) |
121 | | - r.set(hml_id, hml_obj.json()) |
122 | | - r.expire(hml_id, 604800) # exires after a week |
123 | | - except redis.exceptions.ConnectionError as e: |
124 | | - raise e("Cannot run Redis service") |
| 118 | + except AMQPConnectionError as e: |
| 119 | + print(f"RabbitMQ connection error: {e}") |
| 120 | + raise RuntimeError("Cannot connect to RabbitMQ service") from e |
| 121 | + print("Successfully connected to RabbitMQ") |
| 122 | + hml_data = fetch_weather_products() |
| 123 | + try: |
| 124 | + r = redis.Redis( |
| 125 | + host=settings.redis_url, |
| 126 | + port=settings.redis_port, |
| 127 | + decode_responses=True |
| 128 | + ) |
| 129 | + hml_data = sorted(hml_data, key=lambda x: datetime.fromisoformat(x["issuanceTime"])) |
| 130 | + for hml in tqdm(hml_data, desc="reading through api.weather.gov HML outputs"): |
| 131 | + hml_id = hml["id"] |
| 132 | + if r.get(hml_id) is None: |
| 133 | + hml_obj = HML(**hml) |
| 134 | + publish(channel, hml_obj, settings) |
| 135 | + r.set(hml_id, hml_obj.model_dump_json()) |
| 136 | + r.expire(hml_id, 604800) # exires after a week |
| 137 | + except redis.exceptions.ConnectionError as e: |
| 138 | + raise e("Cannot run Redis service") |
125 | 139 | return {"status": "ok"} |
0 commit comments