diff --git a/distribution/pom.xml b/distribution/pom.xml index 38c4602d1135b..025a67b379d66 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -53,6 +53,12 @@ library-udf 2.0.7-SNAPSHOT + + org.apache.iotdb + mqtt + 2.0.7-SNAPSHOT + provided + @@ -74,6 +80,7 @@ src/assembly/confignode.xml src/assembly/cli.xml src/assembly/library-udf.xml + src/assembly/external-service-impl.xml apache-iotdb-${project.version} @@ -107,6 +114,7 @@ apache-iotdb-${project.version}-cli-bin.zip apache-iotdb-${project.version}-confignode-bin.zip apache-iotdb-${project.version}-library-udf-bin.zip + apache-iotdb-${project.version}-external-service-impl-bin.zip diff --git a/distribution/src/assembly/all.xml b/distribution/src/assembly/all.xml index b77f71a32dcd9..255912d75e611 100644 --- a/distribution/src/assembly/all.xml +++ b/distribution/src/assembly/all.xml @@ -92,6 +92,10 @@ conf 0755 + + ${maven.multiModuleProjectDirectory}/external-service-impl/mqtt/target/mqtt-${project.version}-jar-with-dependencies.jar + lib + common-files.xml diff --git a/distribution/src/assembly/datanode.xml b/distribution/src/assembly/datanode.xml index 016059a903a4f..8994667fb873a 100644 --- a/distribution/src/assembly/datanode.xml +++ b/distribution/src/assembly/datanode.xml @@ -74,6 +74,12 @@ 0755 + + + ${maven.multiModuleProjectDirectory}/external-service-impl/mqtt/target/mqtt-${project.version}-jar-with-dependencies.jar + lib + + common-files.xml diff --git a/distribution/src/assembly/external-service-impl.xml b/distribution/src/assembly/external-service-impl.xml new file mode 100644 index 0000000000000..273efe6eddfb5 --- /dev/null +++ b/distribution/src/assembly/external-service-impl.xml @@ -0,0 +1,51 @@ + + + + external-service-impl-bin + + dir + zip + + apache-iotdb-${project.version}-external-service-impl-bin + + + ${project.basedir}/../licenses + licenses + + + + + ${project.basedir}/../LICENSE-binary + licenses + LICENSE + + + ${project.basedir}/../NOTICE-binary + licenses + NOTICE + + + ${maven.multiModuleProjectDirectory}/external-service-impl/mqtt/target/mqtt-${project.version}-jar-with-dependencies.jar + / + + + diff --git a/example/mqtt-customize/pom.xml b/example/mqtt-customize/pom.xml index 2624571f4e87d..6d0ba2d0953d5 100644 --- a/example/mqtt-customize/pom.xml +++ b/example/mqtt-customize/pom.xml @@ -29,7 +29,11 @@ customize-mqtt-example IoTDB: Example: Customized MQTT - + + org.apache.iotdb + mqtt + ${project.version} + org.apache.iotdb iotdb-server diff --git a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java index 8c3a962173985..b16c4928a0b7a 100644 --- a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java +++ b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java @@ -19,9 +19,9 @@ package org.apache.iotdb.mqtt.server; -import org.apache.iotdb.db.protocol.mqtt.Message; -import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; -import org.apache.iotdb.db.protocol.mqtt.TreeMessage; +import org.apache.iotdb.mqtt.Message; +import org.apache.iotdb.mqtt.PayloadFormatter; +import org.apache.iotdb.mqtt.TreeMessage; import io.netty.buffer.ByteBuf; import org.apache.tsfile.external.commons.lang3.NotImplementedException; diff --git a/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter b/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.PayloadFormatter similarity index 100% rename from example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter rename to example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.PayloadFormatter diff --git a/external-service-impl/mqtt/pom.xml b/external-service-impl/mqtt/pom.xml new file mode 100644 index 0000000000000..5ec51069b212e --- /dev/null +++ b/external-service-impl/mqtt/pom.xml @@ -0,0 +1,177 @@ + + + + 4.0.0 + + org.apache.iotdb + external-service-impl + 2.0.7-SNAPSHOT + + mqtt + IoTDB: External-Service-Impl: MQTT + + 8 + 8 + UTF-8 + + + + com.github.moquette-io.moquette + moquette-broker + + + + org.slf4j + slf4j-api + + + io.netty + netty-buffer + + + io.netty + netty-common + + + io.netty + netty-transport + + + io.netty + netty-resolver + + + io.netty + netty-handler + + + io.netty + netty-transport-native-unix-common + + + io.netty + netty-codec + + + io.netty + netty-codec-http + + + + + org.apache.iotdb + iotdb-server + provided + 2.0.7-SNAPSHOT + + + com.google.code.gson + gson + provided + + + io.netty + netty-buffer + provided + + + org.apache.iotdb + iotdb-thrift-commons + 2.0.7-SNAPSHOT + provided + + + org.apache.tsfile + tsfile + ${tsfile.version} + provided + + + org.apache.tsfile + common + ${tsfile.version} + provided + + + com.google.guava + guava + provided + + + org.apache.iotdb + iotdb-thrift + 2.0.7-SNAPSHOT + provided + + + org.apache.iotdb + node-commons + 2.0.7-SNAPSHOT + provided + + + org.slf4j + slf4j-api + provided + + + org.apache.iotdb + service-rpc + 2.0.7-SNAPSHOT + provided + + + + + + maven-assembly-plugin + ${maven.assembly.version} + + + jar-with-dependencies + + + + + make-assembly + + + single + + + package + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + io.netty:netty-codec-mqtt + org.apache.tsfile:common + + + + + + diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/BrokerAuthenticator.java similarity index 97% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/BrokerAuthenticator.java index a05c8264b8225..14647a8582f04 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/BrokerAuthenticator.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.rpc.TSStatusCode; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/JSONPayloadFormatter.java similarity index 99% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/JSONPayloadFormatter.java index cc857b7295cf1..7a348c850b42d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/JSONPayloadFormatter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import com.google.common.collect.Lists; import com.google.gson.Gson; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/LinePayloadFormatter.java similarity index 99% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/LinePayloadFormatter.java index 8b596ee2c89ce..f80c3eb0b5e4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/LinePayloadFormatter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import io.netty.buffer.ByteBuf; import org.apache.tsfile.enums.TSDataType; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MPPPublishHandler.java similarity index 99% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MPPPublishHandler.java index 443b50160aa42..b13b1e89934a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MPPPublishHandler.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTService.java similarity index 84% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTService.java index c6cc3fa47ee10..3b2be5ac3f4f1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTService.java @@ -16,14 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.service; -import org.apache.iotdb.commons.service.IService; -import org.apache.iotdb.commons.service.ServiceType; +package org.apache.iotdb.mqtt; + import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator; -import org.apache.iotdb.db.protocol.mqtt.MPPPublishHandler; +import org.apache.iotdb.externalservice.api.IExternalService; import io.moquette.BrokerConstants; import io.moquette.broker.Server; @@ -40,12 +38,10 @@ import java.util.Properties; /** The IoTDB MQTT Service. */ -public class MQTTService implements IService { +public class MQTTService implements IExternalService { private static final Logger LOG = LoggerFactory.getLogger(MQTTService.class); private final Server server = new Server(); - private MQTTService() {} - @Override public void start() { startup(); @@ -106,20 +102,4 @@ private IConfig createBrokerConfig(IoTDBConfig iotDBConfig) { public void shutdown() { server.stopServer(); } - - @Override - public ServiceType getID() { - return ServiceType.MQTT_SERVICE; - } - - public static MQTTService getInstance() { - return MQTTServiceHolder.INSTANCE; - } - - private static class MQTTServiceHolder { - - private static final MQTTService INSTANCE = new MQTTService(); - - private MQTTServiceHolder() {} - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/Message.java similarity index 96% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/Message.java index ba31d86976035..9a4311754533f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/Message.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; /** Generic parsing of messages */ public class Message { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java similarity index 99% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java index c0b48539cd744..7bb051a852692 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.db.conf.IoTDBDescriptor; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatter.java similarity index 97% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatter.java index c86648ac16136..672c512d54a21 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import io.netty.buffer.ByteBuf; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TableMessage.java similarity index 98% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TableMessage.java index b8aec19da5873..5c8fcc5d708d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TableMessage.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import org.apache.tsfile.enums.TSDataType; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TreeMessage.java similarity index 97% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TreeMessage.java index 9416ea3c83805..07ff8ed0b3d84 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TreeMessage.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import org.apache.tsfile.enums.TSDataType; diff --git a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter b/external-service-impl/mqtt/src/main/resources/META-INF/services/org.apache.iotdb.mqtt.PayloadFormatter similarity index 87% rename from iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter rename to external-service-impl/mqtt/src/main/resources/META-INF/services/org.apache.iotdb.mqtt.PayloadFormatter index 488d6d02d5039..f42aa83ff8678 100644 --- a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter +++ b/external-service-impl/mqtt/src/main/resources/META-INF/services/org.apache.iotdb.mqtt.PayloadFormatter @@ -17,5 +17,5 @@ # under the License. # -org.apache.iotdb.db.protocol.mqtt.JSONPayloadFormatter -org.apache.iotdb.db.protocol.mqtt.LinePayloadFormatter +org.apache.iotdb.mqtt.JSONPayloadFormatter +org.apache.iotdb.mqtt.LinePayloadFormatter diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/BrokerAuthenticatorTest.java similarity index 77% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java rename to external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/BrokerAuthenticatorTest.java index 1f66b517e9196..6884a644fedc0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java +++ b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/BrokerAuthenticatorTest.java @@ -15,29 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; -import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.utils.EnvironmentUtils; +package org.apache.iotdb.mqtt; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -import java.io.IOException; - public class BrokerAuthenticatorTest { - @Before - public void before() { - EnvironmentUtils.envSetUp(); - } - - @After - public void after() throws IOException, StorageEngineException { - EnvironmentUtils.cleanEnv(); - } - @Test public void checkValid() { // In the previous implementation, the datanode will init a root file, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/JSONPayloadFormatterTest.java similarity index 99% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java rename to external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/JSONPayloadFormatterTest.java index deecf607d8162..eab4b42243d1e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java +++ b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/JSONPayloadFormatterTest.java @@ -15,7 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; + +package org.apache.iotdb.mqtt; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/LinePayloadFormatterTest.java similarity index 98% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java rename to external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/LinePayloadFormatterTest.java index 7bf9bce0702d3..16ed807c9ae69 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java +++ b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/LinePayloadFormatterTest.java @@ -15,7 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; + +package org.apache.iotdb.mqtt; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/PayloadFormatManagerTest.java similarity index 84% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java rename to external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/PayloadFormatManagerTest.java index 096f5d0d90d2e..afbe88cf047a2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java +++ b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/PayloadFormatManagerTest.java @@ -15,20 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; -import org.apache.iotdb.db.utils.EnvironmentUtils; +package org.apache.iotdb.mqtt; -import org.junit.After; import org.junit.Test; import static org.junit.Assert.assertNotNull; public class PayloadFormatManagerTest { - @After - public void tearDown() throws Exception { - EnvironmentUtils.cleanAllDir(); - } @Test(expected = IllegalArgumentException.class) public void getPayloadFormat() { diff --git a/external-service-impl/pom.xml b/external-service-impl/pom.xml new file mode 100644 index 0000000000000..986522e6275d2 --- /dev/null +++ b/external-service-impl/pom.xml @@ -0,0 +1,53 @@ + + + + 4.0.0 + + org.apache.iotdb + iotdb-parent + 2.0.7-SNAPSHOT + + external-service-impl + IoTDB: External-Service-Impl + pom + + mqtt + + + 8 + 8 + UTF-8 + + + + org.apache.iotdb + external-service-api + provided + 2.0.7-SNAPSHOT + + + junit + junit + test + + + diff --git a/integration-test/pom.xml b/integration-test/pom.xml index f566c8e5995b7..c82a63c3f3b15 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -227,6 +227,18 @@ httpcore test + + org.apache.iotdb + external-service-api + 2.0.7-SNAPSHOT + + + org.apache.iotdb + mqtt + 2.0.7-SNAPSHOT + + provided + diff --git a/integration-test/src/assembly/mpp-share.xml b/integration-test/src/assembly/mpp-share.xml index 01fce3555def2..96ac13e64d179 100644 --- a/integration-test/src/assembly/mpp-share.xml +++ b/integration-test/src/assembly/mpp-share.xml @@ -30,4 +30,10 @@ lib + + + ${project.basedir}/../external-service-impl/mqtt/target/mqtt-${project.version}-jar-with-dependencies.jar + lib + + diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index 72c4acba58615..b55fd8e5c6827 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -212,10 +212,6 @@ org.eclipse.jetty jetty-http - - io.netty - netty-codec-mqtt - org.apache.commons commons-pool2 @@ -267,18 +263,10 @@ org.glassfish.jersey.containers jersey-container-servlet-core - - com.github.moquette-io.moquette - moquette-broker - com.google.code.gson gson - - io.netty - netty-buffer - org.eclipse.jetty jetty-servlet diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 27a5ce3b7b795..d0eef468d4f94 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -1190,14 +1190,14 @@ private void getTriggerInformationList(List allTriggerInformation) { private void prepareExternalServiceResources() throws StartupException { long startTime = System.currentTimeMillis(); - if (resourcesInformationHolder.getExternalServiceEntryList() == null - || resourcesInformationHolder.getExternalServiceEntryList().isEmpty()) { - return; - } try { - ExternalServiceManagementService.getInstance() - .restoreUserDefinedServices(resourcesInformationHolder.getExternalServiceEntryList()); + if (resourcesInformationHolder.getExternalServiceEntryList() != null + && !resourcesInformationHolder.getExternalServiceEntryList().isEmpty()) { + ExternalServiceManagementService.getInstance() + .restoreUserDefinedServices(resourcesInformationHolder.getExternalServiceEntryList()); + } + ExternalServiceManagementService.getInstance().restoreRunningServiceInstance(); } catch (Exception e) { throw new StartupException(e); @@ -1290,6 +1290,7 @@ public void deleteDataNodeSystemProperties() { public void stop() { stopTriggerRelatedServices(); registerManager.deregisterAll(); + ExternalServiceManagementService.getInstance().stopRunningServices(); JMXService.deregisterMBean(mbeanName); MetricService.getInstance().stop(); if (schemaRegionConsensusStarted) { @@ -1310,9 +1311,6 @@ public void stop() { } private void initProtocols() throws StartupException { - if (config.isEnableMQTTService()) { - registerManager.register(MQTTService.getInstance()); - } if (IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) { registerManager.register(RestService.getInstance()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java index 7bed3ffe4711d..4dee890e65fd8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java @@ -19,17 +19,18 @@ package org.apache.iotdb.db.service.externalservice; +import org.apache.iotdb.db.conf.IoTDBDescriptor; + import java.util.function.Supplier; public enum BuiltinExternalServices { MQTT( "MQTT", - "org.apache.iotdb.externalservice.Mqtt", - // IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService - () -> false), + "org.apache.iotdb.mqtt.MQTTService", + IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService), REST( "REST", - "org.apache.iotdb.externalservice.Rest", + "org.apache.iotdb.rest.RestService", // IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService () -> false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java index 71f0df341a687..54cc576c048de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.externalservice.ServiceInfo; +import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; @@ -65,10 +66,25 @@ public class ExternalServiceManagementService { private static final Logger LOGGER = LoggerFactory.getLogger(ExternalServiceManagementService.class); + public static final String INSTANCE_NULL_ERROR_MSG = + "External Service instance is null when state is RUNNING!"; + private ExternalServiceManagementService(String libRoot) { this.serviceInfos = new HashMap<>(); restoreBuiltInServices(); this.libRoot = libRoot; + makeDir(libRoot); + } + + private static void makeDir(String dir) { + try { + SystemFileFactory.INSTANCE.makeDirIfNecessary(dir); + } catch (IOException e) { + LOGGER.error("Failed to make external service dir", e); + throw new ExternalServiceManagementException( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(e.getMessage())); + } } public Iterator showService(int dataNodeId) @@ -170,7 +186,10 @@ public void startService(String serviceName) throws ClientManagerException, TExc private IExternalService createExternalServiceInstance(String serviceName, String className) { // close ClassLoader automatically to release the file handle - try (ExternalServiceClassLoader classLoader = new ExternalServiceClassLoader(libRoot); ) { + try { + // Remind: this classLoader should be closed when service is dropped after user-defined + // service supported + ExternalServiceClassLoader classLoader = new ExternalServiceClassLoader(libRoot); return (IExternalService) Class.forName(className, true, classLoader).getDeclaredConstructor().newInstance(); } catch (IOException @@ -235,7 +254,7 @@ public void stopService(String serviceName) throws ClientManagerException, TExce private void stopService(ServiceInfo serviceInfo) { checkState( serviceInfo.getServiceInstance() != null, - "External Service instance is null when state is RUNNING!", + INSTANCE_NULL_ERROR_MSG, serviceInfo.getServiceName()); serviceInfo.getServiceInstance().stop(); } @@ -311,11 +330,34 @@ public void restoreRunningServiceInstance() { serviceInfo -> { // start services with RUNNING state if (serviceInfo.getState() == RUNNING) { - IExternalService serviceInstance = - createExternalServiceInstance( - serviceInfo.getServiceName(), serviceInfo.getClassName()); - serviceInfo.setServiceInstance(serviceInstance); - serviceInstance.start(); + + try { + IExternalService serviceInstance = + createExternalServiceInstance( + serviceInfo.getServiceName(), serviceInfo.getClassName()); + checkState(serviceInstance != null, INSTANCE_NULL_ERROR_MSG); + serviceInfo.setServiceInstance(serviceInstance); + serviceInstance.start(); + } finally { + // set STOPPED to avoid the case: service is RUNNING, but its instance is null + if (serviceInfo.getServiceInstance() == null) { + serviceInfo.setState(STOPPED); + } + } + } + }); + } + + public void stopRunningServices() { + serviceInfos + .values() + .forEach( + serviceInfo -> { + // stop services with RUNNING state + if (serviceInfo.getState() == RUNNING) { + IExternalService serviceInstance = serviceInfo.getServiceInstance(); + checkState(serviceInstance != null, INSTANCE_NULL_ERROR_MSG); + serviceInstance.stop(); } }); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java index 7267c79a66551..55fad14e5a10f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java @@ -25,7 +25,6 @@ public enum ServiceType { METRIC_SERVICE("Metrics ServerService", "MetricService"), RPC_SERVICE("RPC ServerService", "RPCService"), INFLUX_SERVICE("InfluxDB Protocol Service", "InfluxDB Protocol"), - MQTT_SERVICE("MQTTService", "MqttService"), AIR_GAP_SERVICE("AirGapService", "AirGapService"), MONITOR_SERVICE("Monitor ServerService", "Monitor"), STAT_MONITOR_SERVICE("Statistics ServerService", "StatMonitorService"), diff --git a/pom.xml b/pom.xml index 6696f783d804b..ab4b25e01034c 100644 --- a/pom.xml +++ b/pom.xml @@ -47,6 +47,7 @@ distribution example library-udf + external-service-impl