From 0fa1e15fc37436c248fd18ecde818b1636b9db92 Mon Sep 17 00:00:00 2001 From: Nikita Kokitkar Date: Thu, 19 Mar 2026 17:06:58 -0700 Subject: [PATCH 1/3] Throw Runtime Exception when manual ack fails --- .../apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java | 2 +- .../main/java/org/apache/camel/component/paho/PahoConsumer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java index c129ccba17800..365eba01acef6 100644 --- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java +++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java @@ -167,7 +167,7 @@ public void onComplete(Exchange exchange) { try { PahoMqtt5Consumer.this.client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos()); } catch (MqttException e) { - LOG.warn("Failed to commit message with ID: {} due to {}", mqttMessage.getId(), e.getMessage(), e); + throw new RuntimeException(e); } } diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java index b2544c42e255d..252fce1d61a35 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java @@ -154,7 +154,7 @@ public void onComplete(Exchange exchange) { try { PahoConsumer.this.client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos()); } catch (MqttException e) { - LOG.warn("Failed to commit message with ID {} due to MqttException.", mqttMessage.getId()); + throw new RuntimeException(e); } } From 05ee1eb63d5a9d64be1c61f1901e51ae81e2938e Mon Sep 17 00:00:00 2001 From: Nikita Kokitkar Date: Wed, 1 Apr 2026 14:55:33 -0700 Subject: [PATCH 2/3] Resolve Pr comments --- .../paho/mqtt5/PahoMqtt5Consumer.java | 4 +- ...hoMqtt5ConsumerManualAckExceptionTest.java | 130 ++++++++++++++++++ .../camel/component/paho/PahoConsumer.java | 6 +- .../PahoConsumerManualAckExceptionTest.java | 130 ++++++++++++++++++ 4 files changed, 267 insertions(+), 3 deletions(-) create mode 100644 components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5ConsumerManualAckExceptionTest.java create mode 100644 components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoConsumerManualAckExceptionTest.java diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java index 365eba01acef6..ef6413031742f 100644 --- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java +++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java @@ -167,7 +167,9 @@ public void onComplete(Exchange exchange) { try { PahoMqtt5Consumer.this.client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos()); } catch (MqttException e) { - throw new RuntimeException(e); + getExceptionHandler().handleException( + "Error acknowledging MQTT message with ID: " + mqttMessage.getId(), + exchange, e); } } diff --git a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5ConsumerManualAckExceptionTest.java b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5ConsumerManualAckExceptionTest.java new file mode 100644 index 0000000000000..d791979998ca4 --- /dev/null +++ b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5ConsumerManualAckExceptionTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.paho.mqtt5; + +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.test.junit6.CamelTestSupport; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +public class PahoMqtt5ConsumerManualAckExceptionTest extends CamelTestSupport { + + @Test + public void testOnCompleteCallsExceptionHandlerOnMqttException() throws Exception { + PahoMqtt5Endpoint endpoint = context.getEndpoint( + "paho-mqtt5:test?manualAcksEnabled=true&brokerUrl=tcp://localhost:1883", PahoMqtt5Endpoint.class); + + Processor mockProcessor = mock(Processor.class); + PahoMqtt5Consumer consumer = new PahoMqtt5Consumer(endpoint, mockProcessor); + + MqttClient mockClient = mock(MqttClient.class); + doThrow(new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION)) + .when(mockClient).messageArrivedComplete(any(int.class), any(int.class)); + consumer.setClient(mockClient); + + ExceptionHandler mockExceptionHandler = mock(ExceptionHandler.class); + consumer.setExceptionHandler(mockExceptionHandler); + + MqttMessage mqttMessage = new MqttMessage("test".getBytes()); + mqttMessage.setQos(2); + + Exchange exchange = consumer.createExchange(mqttMessage, "test-topic"); + List completions = exchange.getExchangeExtension().handoverCompletions(); + + assertNotNull(completions); + assertFalse(completions.isEmpty()); + + // Trigger the onComplete callback (simulating successful exchange processing) + completions.get(0).onComplete(exchange); + + // Verify exception handler was called instead of throwing RuntimeException + verify(mockExceptionHandler).handleException( + contains("Error acknowledging MQTT message"), + eq(exchange), + any(MqttException.class)); + } + + @Test + public void testOnCompleteAcknowledgesSuccessfully() throws Exception { + PahoMqtt5Endpoint endpoint = context.getEndpoint( + "paho-mqtt5:test?manualAcksEnabled=true&brokerUrl=tcp://localhost:1883", PahoMqtt5Endpoint.class); + + Processor mockProcessor = mock(Processor.class); + PahoMqtt5Consumer consumer = new PahoMqtt5Consumer(endpoint, mockProcessor); + + MqttClient mockClient = mock(MqttClient.class); + consumer.setClient(mockClient); + + ExceptionHandler mockExceptionHandler = mock(ExceptionHandler.class); + consumer.setExceptionHandler(mockExceptionHandler); + + MqttMessage mqttMessage = new MqttMessage("test".getBytes()); + mqttMessage.setQos(2); + + Exchange exchange = consumer.createExchange(mqttMessage, "test-topic"); + List completions = exchange.getExchangeExtension().handoverCompletions(); + + assertNotNull(completions); + assertEquals(1, completions.size()); + + // Trigger the onComplete callback + completions.get(0).onComplete(exchange); + + // Verify messageArrivedComplete was called and no exception was handled + verify(mockClient).messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos()); + verifyNoInteractions(mockExceptionHandler); + } + + @Test + public void testNoCompletionRegisteredWhenManualAcksDisabled() throws Exception { + PahoMqtt5Endpoint endpoint = context.getEndpoint( + "paho-mqtt5:test?brokerUrl=tcp://localhost:1883", PahoMqtt5Endpoint.class); + + Processor mockProcessor = mock(Processor.class); + PahoMqtt5Consumer consumer = new PahoMqtt5Consumer(endpoint, mockProcessor); + + MqttClient mockClient = mock(MqttClient.class); + consumer.setClient(mockClient); + + MqttMessage mqttMessage = new MqttMessage("test".getBytes()); + + Exchange exchange = consumer.createExchange(mqttMessage, "test-topic"); + List completions = exchange.getExchangeExtension().handoverCompletions(); + + // No synchronization should be registered when manualAcks is disabled + assertTrue(completions == null || completions.isEmpty()); + } +} diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java index 252fce1d61a35..e33d624c77532 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java @@ -154,13 +154,15 @@ public void onComplete(Exchange exchange) { try { PahoConsumer.this.client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos()); } catch (MqttException e) { - throw new RuntimeException(e); + getExceptionHandler().handleException( + "Error acknowledging MQTT message with ID: " + mqttMessage.getId(), + exchange, e); } } @Override public void onFailure(Exchange exchange) { - LOG.error("Rollback due to error processing Exchange ID: {}", exchange.getExchangeId(), + LOG.debug("Rollback due to error processing Exchange ID: {}", exchange.getExchangeId(), exchange.getException()); } }); diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoConsumerManualAckExceptionTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoConsumerManualAckExceptionTest.java new file mode 100644 index 0000000000000..82accbb4c7f03 --- /dev/null +++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoConsumerManualAckExceptionTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.paho; + +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.test.junit6.CamelTestSupport; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +public class PahoConsumerManualAckExceptionTest extends CamelTestSupport { + + @Test + public void testOnCompleteCallsExceptionHandlerOnMqttException() throws Exception { + PahoEndpoint endpoint = context.getEndpoint( + "paho:test?manualAcksEnabled=true&brokerUrl=tcp://localhost:1883", PahoEndpoint.class); + + Processor mockProcessor = mock(Processor.class); + PahoConsumer consumer = new PahoConsumer(endpoint, mockProcessor); + + MqttClient mockClient = mock(MqttClient.class); + doThrow(new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION)) + .when(mockClient).messageArrivedComplete(any(int.class), any(int.class)); + consumer.setClient(mockClient); + + ExceptionHandler mockExceptionHandler = mock(ExceptionHandler.class); + consumer.setExceptionHandler(mockExceptionHandler); + + MqttMessage mqttMessage = new MqttMessage("test".getBytes()); + mqttMessage.setQos(2); + + Exchange exchange = consumer.createExchange(mqttMessage, "test-topic"); + List completions = exchange.getExchangeExtension().handoverCompletions(); + + assertNotNull(completions); + assertFalse(completions.isEmpty()); + + // Trigger the onComplete callback (simulating successful exchange processing) + completions.get(0).onComplete(exchange); + + // Verify exception handler was called instead of throwing RuntimeException + verify(mockExceptionHandler).handleException( + contains("Error acknowledging MQTT message"), + eq(exchange), + any(MqttException.class)); + } + + @Test + public void testOnCompleteAcknowledgesSuccessfully() throws Exception { + PahoEndpoint endpoint = context.getEndpoint( + "paho:test?manualAcksEnabled=true&brokerUrl=tcp://localhost:1883", PahoEndpoint.class); + + Processor mockProcessor = mock(Processor.class); + PahoConsumer consumer = new PahoConsumer(endpoint, mockProcessor); + + MqttClient mockClient = mock(MqttClient.class); + consumer.setClient(mockClient); + + ExceptionHandler mockExceptionHandler = mock(ExceptionHandler.class); + consumer.setExceptionHandler(mockExceptionHandler); + + MqttMessage mqttMessage = new MqttMessage("test".getBytes()); + mqttMessage.setQos(2); + + Exchange exchange = consumer.createExchange(mqttMessage, "test-topic"); + List completions = exchange.getExchangeExtension().handoverCompletions(); + + assertNotNull(completions); + assertEquals(1, completions.size()); + + // Trigger the onComplete callback + completions.get(0).onComplete(exchange); + + // Verify messageArrivedComplete was called and no exception was handled + verify(mockClient).messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos()); + verifyNoInteractions(mockExceptionHandler); + } + + @Test + public void testNoCompletionRegisteredWhenManualAcksDisabled() throws Exception { + PahoEndpoint endpoint = context.getEndpoint( + "paho:test?brokerUrl=tcp://localhost:1883", PahoEndpoint.class); + + Processor mockProcessor = mock(Processor.class); + PahoConsumer consumer = new PahoConsumer(endpoint, mockProcessor); + + MqttClient mockClient = mock(MqttClient.class); + consumer.setClient(mockClient); + + MqttMessage mqttMessage = new MqttMessage("test".getBytes()); + + Exchange exchange = consumer.createExchange(mqttMessage, "test-topic"); + List completions = exchange.getExchangeExtension().handoverCompletions(); + + // No synchronization should be registered when manualAcks is disabled + assertTrue(completions == null || completions.isEmpty()); + } +} From ec82c09830bc1ad3cd48bb25e7c5322d19cc285b Mon Sep 17 00:00:00 2001 From: Nikita Kokitkar Date: Fri, 3 Apr 2026 16:20:13 -0700 Subject: [PATCH 3/3] Fix: restore error log level in onFailure callback Revert the log level in onFailure from debug back to error, as this callback handles exchange processing failures which should not be silently swallowed at debug level. Addresses review comment from @Croway. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../main/java/org/apache/camel/component/paho/PahoConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java index e33d624c77532..2cfcfe1c526bc 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java @@ -162,7 +162,7 @@ public void onComplete(Exchange exchange) { @Override public void onFailure(Exchange exchange) { - LOG.debug("Rollback due to error processing Exchange ID: {}", exchange.getExchangeId(), + LOG.error("Rollback due to error processing Exchange ID: {}", exchange.getExchangeId(), exchange.getException()); } });