|
3 | 3 | from node.logger import ops_logger |
4 | 4 | from node.settings import CONFIG |
5 | 5 |
|
| 6 | + |
6 | 7 | def send_to_rabbitmq(data): |
7 | | - connection = None |
8 | | - try: |
9 | | - ops_logger.info("Attempting to connect to RabbitMQ") |
10 | | - if CONFIG["rabbitmq_user"] and CONFIG["rabbitmq_password"]: |
11 | | - credentials = pika.PlainCredentials(CONFIG["rabbitmq_user"], CONFIG["rabbitmq_password"]) |
12 | | - else: |
13 | | - credentials = pika.PlainCredentials("guest", "guest") |
14 | | - connection = pika.BlockingConnection(pika.ConnectionParameters(host=CONFIG["rabbitmq_host"], port=CONFIG["rabbitmq_port"], credentials=credentials)) |
15 | | - channel = connection.channel() |
16 | | - ops_logger.info("Connected to RabbitMQ") |
| 8 | + connection = None |
| 9 | + try: |
| 10 | + ops_logger.info("Attempting to connect to RabbitMQ") |
| 11 | + if CONFIG["rabbitmq_user"] and CONFIG["rabbitmq_password"]: |
| 12 | + credentials = pika.PlainCredentials( |
| 13 | + CONFIG["rabbitmq_user"], CONFIG["rabbitmq_password"] |
| 14 | + ) |
| 15 | + else: |
| 16 | + credentials = pika.PlainCredentials("guest", "guest") |
| 17 | + connection = pika.BlockingConnection( |
| 18 | + pika.ConnectionParameters( |
| 19 | + host=CONFIG["rabbitmq_host"], |
| 20 | + port=CONFIG["rabbitmq_port"], |
| 21 | + credentials=credentials, |
| 22 | + ) |
| 23 | + ) |
| 24 | + channel = connection.channel() |
| 25 | + ops_logger.info("Connected to RabbitMQ") |
17 | 26 |
|
18 | | - ops_logger.info(f"Declaring queue: {CONFIG['rabbitmq_queue']}") |
19 | | - channel.queue_declare(queue=CONFIG["rabbitmq_queue"], durable=True) |
| 27 | + ops_logger.info(f"Declaring queue: {CONFIG['rabbitmq_queue']}") |
| 28 | + channel.queue_declare(queue=CONFIG["rabbitmq_queue"], durable=True) |
20 | 29 |
|
21 | | - ops_logger.info(f"Publishing data to queue: {CONFIG['rabbitmq_queue']}") |
22 | | - channel.basic_publish( |
23 | | - exchange='', |
24 | | - routing_key=CONFIG["rabbitmq_queue"], |
25 | | - body=json.dumps(data) |
26 | | - ) |
| 30 | + ops_logger.info(f"Publishing data to queue: {CONFIG['rabbitmq_queue']}") |
| 31 | + channel.basic_publish( |
| 32 | + exchange="", |
| 33 | + routing_key=CONFIG["rabbitmq_queue"], |
| 34 | + body=json.dumps(data), |
| 35 | + ) |
27 | 36 |
|
28 | | - ops_logger.info(f"Sent data to RabbitMQ: {data}") |
29 | | - except pika.exceptions.AMQPConnectionError as e: |
30 | | - ops_logger.error(f"RabbitMQ connection error") |
31 | | - except pika.exceptions.AMQPChannelError as e: |
32 | | - ops_logger.error(f"RabbitMQ channel error") |
33 | | - except Exception as e: |
34 | | - ops_logger.error(f"RabbitMQ error") |
35 | | - finally: |
36 | | - try: |
37 | | - connection.close() |
38 | | - ops_logger.info("Closed RabbitMQ connection") |
| 37 | + ops_logger.info(f"Sent data to RabbitMQ: {data}") |
| 38 | + except pika.exceptions.AMQPConnectionError as e: |
| 39 | + ops_logger.error("RabbitMQ connection error") |
| 40 | + except pika.exceptions.AMQPChannelError as e: |
| 41 | + ops_logger.error("RabbitMQ channel error") |
39 | 42 | except Exception as e: |
40 | | - ops_logger.error(f"Error closing RabbitMQ connection: {e}") |
| 43 | + ops_logger.error("RabbitMQ error") |
| 44 | + finally: |
| 45 | + try: |
| 46 | + connection.close() |
| 47 | + ops_logger.info("Closed RabbitMQ connection") |
| 48 | + except Exception as e: |
| 49 | + ops_logger.error(f"Error closing RabbitMQ connection: {e}") |
0 commit comments