From de9b8ae8a556590adc77228c8cfc15f86ead00e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Wed, 28 Jan 2026 16:48:52 +0100 Subject: [PATCH] feature/RabitMQ adapter tweaks --- .../resources/props/sample.props.template | 2 ++ .../rabbitmq/RabbitMQUtils.scala | 36 ++++++++++++++----- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/obp-api/src/main/resources/props/sample.props.template b/obp-api/src/main/resources/props/sample.props.template index c0f151d836..77a420868f 100644 --- a/obp-api/src/main/resources/props/sample.props.template +++ b/obp-api/src/main/resources/props/sample.props.template @@ -1047,6 +1047,8 @@ featured_apis=elasticSearchWarehouseV300 # rabbitmq_connector.username=obp # rabbitmq_connector.password=obp # rabbitmq_connector.virtual_host=/ +# rabbitmq_connector.request_queue=obp_rpc_queue +# rabbitmq_connector.response_queue_prefix=obp_reply_queue # -- RabbitMQ Adapter -------------------------------------------- #rabbitmq.adapter.enabled=false diff --git a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala index 52d0b1975e..d321379910 100644 --- a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala +++ b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala @@ -53,8 +53,8 @@ object RabbitMQUtils extends MdcLoggable{ private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats - val RPC_QUEUE_NAME: String = "obp_rpc_queue" - val RPC_REPLY_TO_QUEUE_NAME_PREFIX: String = "obp_reply_queue" + val RPC_QUEUE_NAME: String = APIUtil.getPropsValue("rabbitmq_connector.request_queue", "obp_rpc_queue") + val RPC_REPLY_TO_QUEUE_NAME_PREFIX: String = APIUtil.getPropsValue("rabbitmq_connector.response_queue_prefix", "obp_reply_queue") class ResponseCallback(val rabbitCorrelationId: String, channel: Channel) extends DeliverCallback { @@ -92,14 +92,30 @@ object RabbitMQUtils extends MdcLoggable{ val rabbitRequestJsonString: String = write(outBound) // convert OutBound to json string val connection = RabbitMQConnectionPool.borrowConnection() + // Check if queue already exists using a temporary channel (passive declare closes channel on failure) + val queueExists = try { + val tempChannel = connection.createChannel() + try { + tempChannel.queueDeclarePassive(RPC_QUEUE_NAME) + true + } finally { + if (tempChannel.isOpen) tempChannel.close() + } + } catch { + case _: java.io.IOException => false + } + val channel = connection.createChannel() // channel is not thread safe, so we always create new channel for each message. - channel.queueDeclare( - RPC_QUEUE_NAME, // Queue name - true, // durable: non-persis, here set durable = true - false, // exclusive: non-excl4, here set exclusive = false - false, // autoDelete: delete, here set autoDelete = false - rpcQueueArgs // extra arguments, - ) + // Only declare queue if it doesn't already exist (avoids argument conflicts with external adapters) + if (!queueExists) { + channel.queueDeclare( + RPC_QUEUE_NAME, // Queue name + true, // durable: non-persis, here set durable = true + false, // exclusive: non-excl4, here set exclusive = false + false, // autoDelete: delete, here set autoDelete = false + rpcQueueArgs // extra arguments, + ) + } val replyQueueName:String = channel.queueDeclare( s"${RPC_REPLY_TO_QUEUE_NAME_PREFIX}_${messageId.replace("obp_","")}_${UUID.randomUUID.toString}", // Queue name, it will be a unique name for each queue @@ -112,6 +128,7 @@ object RabbitMQUtils extends MdcLoggable{ val rabbitResponseJsonFuture = { try { logger.debug(s"${RabbitMQConnector_vOct2024.toString} outBoundJson: $messageId = $rabbitRequestJsonString") + logger.info(s"[RabbitMQ] Sending message to queue: $RPC_QUEUE_NAME, messageId: $messageId, replyTo: $replyQueueName") val rabbitMQCorrelationId = UUID.randomUUID().toString val rabbitMQProps = new BasicProperties.Builder() @@ -121,6 +138,7 @@ object RabbitMQUtils extends MdcLoggable{ .replyTo(replyQueueName) .build() channel.basicPublish("", RPC_QUEUE_NAME, rabbitMQProps, rabbitRequestJsonString.getBytes("UTF-8")) + logger.info(s"[RabbitMQ] Message published, correlationId: $rabbitMQCorrelationId, waiting for response on: $replyQueueName") val responseCallback = new ResponseCallback(rabbitMQCorrelationId, channel) channel.basicConsume(replyQueueName, true, responseCallback, cancelCallback)