-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Decouple mqtt from iotdb-server using ExternalService framework #17042
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Signed-off-by: Weihao Li <18110526956@163.com>
| "org.apache.iotdb.externalservice.Mqtt", | ||
| // IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService | ||
| () -> false), | ||
| MQTT("MQTT", "org.apache.iotdb.mqtt.MQTTService", () -> true), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| MQTT("MQTT", "org.apache.iotdb.mqtt.MQTTService", () -> true), | |
| MQTT("MQTT", "org.apache.iotdb.mqtt.MQTTService", IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request refactors the MQTT service from a built-in component of iotdb-server to an external service using the ExternalService framework. This change improves modularity by separating MQTT functionality into its own deployable module.
Changes:
- Created new
external-service-impl/mqttmodule with MQTT service implementation - Removed MQTT dependencies from iotdb-core/datanode module
- Updated package structure from
org.apache.iotdb.db.protocol.mqtttoorg.apache.iotdb.mqtt - Added lifecycle management for external services in DataNode
- Updated build and distribution configurations to package MQTT as a separate jar
Reviewed changes
Copilot reviewed 30 out of 31 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| pom.xml | Added external-service-impl module to the reactor build |
| ServiceType.java | Removed MQTT_SERVICE enum value as MQTT is now external |
| ExternalServiceManagementService.java | Enhanced service lifecycle management with directory creation, service restoration, and stop functionality |
| BuiltinExternalServices.java | Updated MQTT and REST service class names for external service framework |
| DataNode.java | Modified initialization to use external service framework and added service cleanup on shutdown |
| iotdb-core/datanode/pom.xml | Removed MQTT-related dependencies (netty-codec-mqtt, moquette-broker) |
| external-service-impl/pom.xml | New parent POM for external service implementations |
| external-service-impl/mqtt/pom.xml | New MQTT module with dependencies and jar-with-dependencies packaging |
| external-service-impl/mqtt/src/main/java/**/*.java | Moved and refactored MQTT implementation classes to new package structure |
| external-service-impl/mqtt/src/test/java/**/*.java | Migrated and cleaned up MQTT test classes |
| distribution/*.xml | Updated assembly descriptors to include MQTT jar in distributions |
| example/mqtt-customize/** | Updated example to reference new MQTT package structure |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| try { | ||
| ExternalServiceClassLoader classLoader = new ExternalServiceClassLoader(libRoot); |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ClassLoader is no longer being closed, which could lead to a resource leak. The original code used try-with-resources to automatically close the ExternalServiceClassLoader. Removing this will prevent the file handles from being released properly. Consider either reverting to try-with-resources or explicitly closing the classLoader in a finally block.
| try { | |
| ExternalServiceClassLoader classLoader = new ExternalServiceClassLoader(libRoot); | |
| try (ExternalServiceClassLoader classLoader = new ExternalServiceClassLoader(libRoot)) { |
| makDir(libRoot); | ||
| } | ||
|
|
||
| private static void makDir(String dir) { |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method name 'makDir' is misspelled. It should be 'makeDir' or 'mkDir' to follow standard naming conventions.
| makDir(libRoot); | |
| } | |
| private static void makDir(String dir) { | |
| makeDir(libRoot); | |
| } | |
| private static void makeDir(String dir) { |
| } finally { | ||
| // set STOPPED to avoid the case: service is RUNNING, but its instance is null | ||
| if (serviceInfo.getServiceInstance() == null) { | ||
| serviceInfo.setState(STOPPED); | ||
| } | ||
| } | ||
|
|
||
| serviceInfo.getServiceInstance().start(); |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a logic issue where serviceInstance.start() is called outside the try-finally block. If serviceInfo.getServiceInstance() is null (which is explicitly handled in the finally block by setting state to STOPPED), this will throw a NullPointerException when trying to call start() on line 344. The start() call should be inside the try block or checked for null before calling.
| } finally { | |
| // set STOPPED to avoid the case: service is RUNNING, but its instance is null | |
| if (serviceInfo.getServiceInstance() == null) { | |
| serviceInfo.setState(STOPPED); | |
| } | |
| } | |
| serviceInfo.getServiceInstance().start(); | |
| if (serviceInstance != null) { | |
| serviceInstance.start(); | |
| } | |
| } finally { | |
| // set STOPPED to avoid the case: service is RUNNING, but its instance is null | |
| if (serviceInfo.getServiceInstance() == null) { | |
| serviceInfo.setState(STOPPED); | |
| } | |
| } |
| .values() | ||
| .forEach( | ||
| serviceInfo -> { | ||
| // start services with RUNNING state |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment says 'start services with RUNNING state' but the method actually stops services. This should be 'stop services with RUNNING state' to accurately describe what the code does.
| // start services with RUNNING state | |
| // stop services with RUNNING state |
| "org.apache.iotdb.externalservice.Mqtt", | ||
| // IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService | ||
| () -> false), | ||
| MQTT("MQTT", "org.apache.iotdb.mqtt.MQTTService", () -> true), |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The MQTT service is now hardcoded to always be enabled (supplier returns true), ignoring the previous configuration check for isEnableMQTTService. This changes the behavior from the commented-out line and may cause the MQTT service to start even when users have it disabled in their configuration. The configuration check should be restored or this change should be documented as an intentional behavior change.
| <groupId>com.github.moquette-io.moquette</groupId> | ||
| <artifactId>moquette-broker</artifactId> | ||
| <exclusions> | ||
| <!-- exclude dependencies has included by iotdb-server --> |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Grammar issue in the comment: 'dependencies has included' should be 'dependencies already included' or 'dependencies that have been included'.
| <!-- exclude dependencies has included by iotdb-server --> | |
| <!-- exclude dependencies already included by iotdb-server --> |
| makDir(libRoot); | ||
| } | ||
|
|
||
| private static void makDir(String dir) { |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method name 'makDir' is misspelled. It should be 'makeDir' or 'mkDir' to follow standard naming conventions.
| makDir(libRoot); | |
| } | |
| private static void makDir(String dir) { | |
| makeDir(libRoot); | |
| } | |
| private static void makeDir(String dir) { |
| REST( | ||
| "REST", | ||
| "org.apache.iotdb.externalservice.Rest", | ||
| "org.apache.iotdb.mqtt.RestService", |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The REST service class name 'org.apache.iotdb.mqtt.RestService' appears to be incorrectly placed in the mqtt package. REST service should have its own package structure, not be part of the MQTT package. This is misleading and violates package organization principles.
| "org.apache.iotdb.mqtt.RestService", | |
| "org.apache.iotdb.rest.RestService", |
Signed-off-by: Weihao Li <18110526956@163.com>
Signed-off-by: Weihao Li <18110526956@163.com>
Signed-off-by: Weihao Li <18110526956@163.com>
Signed-off-by: Weihao Li <18110526956@163.com>
No description provided.