diff --git a/LICENSE_COMMERCIAL b/LICENSE_COMMERCIAL
index 9347e9d..bba922f 100644
--- a/LICENSE_COMMERCIAL
+++ b/LICENSE_COMMERCIAL
@@ -1,5 +1,5 @@
COMMERCIAL LICENSE
- 21 June 2020
+ 16 April 2025
Preamble
@@ -20,10 +20,10 @@ program (sum of all instances) per month.
Programs are billed according to the following table
- <100’000messages/month 0EUR/month
- <1’000’000messages/month 50EUR/month
-<10’000’000messages/month 100EUR/month
->10’000’000messages/month 200EUR/month
+ <100’000messages/month 0EUR/month
+ <1’000’000messages/month 500EUR/month
+<10’000’000messages/month 1000EUR/month
+>10’000’000messages/month 2000EUR/month
(plus taxes if applicable)
@@ -38,10 +38,11 @@ of rows copied is 50'000rows per month.
fee is 0EUR according to above table. No contract needs to be signed,
no invoices will be created.
-Example 3: The program is started 20 times to process different Kafka
-topics and partitions at speed, processing 5m records per month in
-total. --> The program is used under this Commercial License and
-100EUR/month are to be invoiced.
+Example 3: The program is started in production 20 times to process
+different Kafka topics and partitions at speed, processing 5m records
+per month in total.
+--> The program is used under this Commercial License and
+1000EUR/month are to be invoiced.
Once an order is placed for a defined volume, rtdi.io GmbH (Austria,
Europe), as the copyright owner, will send invoices on a monthly basis
diff --git a/README.md b/README.md
index 3164f21..650c4ad 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,7 @@ Docker image here: [dockerhub](https://hub.docker.com/r/rtdi/rulesservice)
## Design Thinking goal
* As a business user I would like to validate the incoming data and cleanse it in realtime
-* Consumers have the choice to read the raw or the cleansed data
+* Hence consumers have the choice to read the raw or the cleansed topic
* Operational dashboards using the rule results provide information about the data quality
* Different types of rules should be supported, validation rules, cleansing rules, data augmentation, standardization rules,...
@@ -30,7 +30,7 @@ Docker image here: [dockerhub](https://hub.docker.com/r/rtdi/rulesservice)
## Requirements
* Payload (value) in Avro Format
-* Apache Kafka connection with the permissions to run as a KStream
+* Apache Kafka connection with the permissions to read input topic and produce the output topic
* Schema Registry connection to read (and write) schema definitions
@@ -45,7 +45,7 @@ Then start the image via docker run. For a quick test this command is sufficient
docker run -d -p 80:8080 --rm --name rulesservice rtdi/rulesservice
-to expose a webserver at port 80 on the host running the container. Make sure to open the web page via the http prefix, as https needs more configuration.
+to expose a webserver at port 80 on the host running the container. Make sure to open the web page via the http prefix, as https needs additional configuration.
For example [http://localhost:80/](http://localhost:80/) might do the trick if the container is hosted on the same computer.
The default login for this startup method is: **rtdi / rtdi!io**
@@ -61,6 +61,8 @@ The better start command is to mount two host directories into the container, th
For more details, especially https and security related, see the [ConnectorRootApp](https://github.com/rtdi/ConnectorRootApp) project, which this application is based on.
+Note:
+
if the `settings` or `definitions` directories should point to somewhere else, the `context.xml` of the tomcat webserver can get additional environments in the `` root node:
```
@@ -71,7 +73,7 @@ if the `settings` or `definitions` directories should point to somewhere else, t
### Step 1: Connect to Kafka
-The first step is to connect the application to a Kafka server and the schema registry. In the settings screen the normal Kafka properties file data can be pasted and saved. By default the file location is `/apps/rulesservice/settings/kafka.properties` from the container's point of view.
+The first step is to connect the application to a Kafka server and the schema registry. In the settings screen a regular Kafka properties file can be pasted and saved. By default the file location is `/apps/rulesservice/settings/kafka.properties` from the container's point of view.
@@ -91,11 +93,11 @@ sasl.mechanism=PLAIN
### Step 2: Define Rules
-A rule file applies logic to messages of a given schema/subject. It consumes the input message, applies the rules and creates an output message with a derived schema - a schema that contains the rule results as additional field. What the input and output topic is, will be configured in the next step.
+A rule file applies logic to messages of a given schema/subject. It consumes the input message, applies the rules and creates an output message. Which topic serves as input and which as output is configured in the next step.
Rule files are saved by default in the directory `/apps/rulesservice/definitions//inactive>/....` and the file name itself can contain sub-directories.
-To simplify entering rules, sample values can be entered and the result be recalculated. A quick way to provide sample values is by reading topics and creating sample files with the content - see below. These files, with messages of the same subject name, can be selected and hence used as input.
+To simplify entering rules, sample values can be entered and the result recalculated. A quick way to provide sample values is to read a topic and create sample files from its content - see below. These files, with messages of the same subject name, can then be selected and used as sample input.
Once a rule file is complete, it must be copied from the `inactive` to the `active` directory. The button `Activate` does that. The reason for this two-staged approach is to allow users to save intermediate definitions without impacting the currently running service.
@@ -105,7 +107,6 @@ Once a rule file is complete, it must be copied from the `inactive` to the `acti
### Step 3: Topics
An input topic can contain messages of different subjects, hence the dialog asks which rule files should be applied to which input topic (remember, a rule file works on a specific subject) and what the output topic should be.
-Scaling is achieved by increasing the number of KStream instances used for this topic or by spinning up more containers with the same settings.
The screen also allows copying the rule files being used into the active folder to simplify activating each from the rule file dialog.
@@ -113,14 +114,8 @@ The screen also allows to copy the rule files being used into the active folder
### Result
-If the input schema has an `_audit` field, it is assumed the schema contains the structure for the rule results already. This would be a preferred case, because input schema = output schema.
-In all other cases the input schema's latest version is read from the schema registry and the additional `_audit` structure is being created. This will be the output schema.
-
-The reason for using the latest is because of schema evolution scenarios. It might happen that schema id 2 has an additional field `NAME` compared to schema id 1. So the subject evolved from schema 1 to 2. The KStream does receive a message with schema 2 first, adds the `_audit` field and it is saved in the schema registry. The next message has an input schema 1 and if that would get registered as output schema next, it would fail due to the missing `NAME` field. Hence both must be outputted with the latest schema version always. This also explains why adding the `_audit` on the original input schema already is preferred.
-The overall rule result is stored (did it pass all tests?), a list of all rules executed and their individual results.
-Querying this data allows detailed reporting which records were processed by what rule and the results.
-
-The exact Avro schema field definition can be found [here](docs/audit-schema.md)
+If the input schema has an `_audit` field, it is assumed the schema contains the structure for the rule results already. This is the preferred case, because input schema = output schema.
+In all other cases the input schema's latest version is read from the schema registry and the additional `_audit` structure is added. This becomes the output schema. It should be considered a fallback option only; if possible, the `_audit` field should be added to the input schema already. The exact Avro schema field definition can be found [here](docs/audit-schema.md)
@@ -143,30 +138,37 @@ If no file name is specified, the name will be `partition__offset_ 10`.
-Formulas can also access other fields as well, e.g. for the field `lastname` the condition is `lastname != null || firstname != null` meaning, either lastname or firstname must be set.
+Formulas can also access other fields as well, e.g. for the field `lastname` the condition is `lastname != null || firstname != null` meaning, either `lastname` or `firstname` must be set.
-To access fields of a deeper level, e.g. when lastname is set, the first record in the addresses array must have addresstype HOME, many cases must be considered. What if the addresses array is empty? What if the addresses array is an empty array? What if the addresstype field is null for some records? This would lead to a very long condition but there are shortcuts: `lastname != null && 'HOME' == addresses?[0]?addresstype`. The `?` tells that it is okay if the field is null and it should return null then.
+To access fields at a deeper level, dot and array operators are used, e.g. when `lastname` is set, the first record in the `addresses` array must have `addresstype = HOME`.
+The formula might look like: `lastname != null && 'HOME' == addresses[0].addresstype`
-Formulas at a deeper level, e.g. a formula for `addresstype`, can access fields from the higher level via `parent`. Example: `('HOME' == addresstype && parent.parent.lastname != null) || 'HOME' != addresstype` tells that for addresstype HOME the lastname must be set as well, for all other addresstype values there is no such requirement.
+But this formula can throw null pointer exceptions: if `addresses` is null, accessing its element at index 0 will cause an error. Instead of adding explicit null checks everywhere, the easier method is to make the accessor optional via the `?` modifier.
+
+Example: `lastname != null && 'HOME' == addresses?[0]?.addresstype`. The `?` tells the engine that it is okay if the field is null; the accessor then returns `null`.
+
+Formulas at a deeper level, e.g. a formula for `addresstype`, can access fields from the higher level via `parent`. Example: `('HOME' == addresstype && parent.parent.lastname != null) || 'HOME' != addresstype` tells that for `addresstype` HOME the `lastname` must be set as well, for all other addresstype values there is no such requirement.
Note: In this example the customer record has an `addresses` array of address records. When within an address record, the parent is the array and its parent is the customer record with the `lastname` field.
These are just very basic examples, more below.
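For readers more familiar with JavaScript: the `?` modifier behaves much like optional chaining. A small sketch (illustrative analogy only, this is not the rule engine itself):

```javascript
// Illustrative analogy: JS optional chaining mirrors the rule engine's `?` modifier.
const record = { lastname: "Doe", addresses: null };

// Without the safe accessor, record.addresses[0] would throw a TypeError.
// With it, the whole chain short-circuits to null instead:
const addresstype = record.addresses?.[0]?.addresstype ?? null;

// The condition from the example above, evaluated on this record:
const rulePassed = record.lastname != null && addresstype === "HOME";
```

Here `addresstype` ends up `null` and the condition evaluates to `false` rather than raising an error.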
-If a condition returns false, maybe the correct value can be derived and then the rule as such has not been violated. The optional formula entered in `..change value to...` is executed only if the condition returns false and it overwrites the field value. Example: The rule for the field `addresstype` is `addresstype.upper() != addresstype` and the change-value formula is `addresstype.upper()`. This does change all values to upper case. In such a case the rule is considered to have been passed instead of failed, and that is accomplished via the `if test failed...` setting `=Pass`.
+If a condition returns false, maybe the correct value can be derived and then the rule as such has not been violated. The optional formula entered in `..change value to...` is executed only if the condition returns false and it overwrites the field value.
+
+Example: The rule for the field `addresstype` is `addresstype.upper() == addresstype` and the change-value formula is `addresstype.upper()`. A lower case value fails the condition and is changed to upper case. In such a case the rule is considered to have been passed instead of failed, and that is accomplished via the `if test failed...` setting `=Pass`.
Each test and its rule result is available in the audit structure and hence we can see that this test was executed and passed.
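The condition/change-value mechanics can be sketched as follows; `applyRule` is a hypothetical helper for illustration, not the actual engine API:

```javascript
// Hypothetical sketch of the rule mechanics: the change-value formula runs
// only when the condition returns false, and with "if test failed... =Pass"
// the corrected record still counts as passed.
function applyRule(value, condition, changeValueTo) {
  if (condition(value)) {
    return { value: value, result: "pass" }; // condition held, keep the value
  }
  return { value: changeValueTo(value), result: "pass" }; // derive the corrected value
}
```

For instance, with a condition requiring upper case and a change-value formula equivalent to `addresstype.upper()`, the value `"home"` comes out as `"HOME"` with result `pass`.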
### Rule Sets
-In above examples there was the test `('HOME' == addresstype && parent.parent.lastname != null) || 'HOME' != addresstype`. A more realistic formula would say: if HOME then the lastname must be set, if COMPANY the companyname field must be set and other allowed values are SHIPPING and BILLING. This would get quite a long formula.
+In the above examples there was the test `('HOME' == addresstype && parent.parent.lastname != null) || 'HOME' != addresstype`. A more realistic formula would say: if HOME then the lastname must be set, if COMPANY the companyname field must be set, and the other allowed values are SHIPPING and BILLING. This would become quite a long formula.
To simplify that, the Test-Set rule allows specifying multiple individual rules
@@ -198,7 +200,7 @@ Each record also has a `(more)` node to enter rules that do not belong to a sing
Another typical scenario is to standardize the values first, e.g. gender should be `M`, `F`, `X`, `?` only and then create rules based on the standardized values. In other words, rules build on each other. To enable that, the rule file consists of multiple tabs - the rule steps - and each tab is executed one after the other.
-### Rule syntax
+### Complete rule syntax
For more examples [see](docs/rule-syntax.md)
@@ -206,6 +208,17 @@ For more examples [see](docs/rule-syntax.md)
### FAQs
 * Can a new output column be created via a formula? No, the output schema is always derived from the input schema, for two reasons. First, if adding fields were possible, it might collide when the input subject evolves to a new version. The other reason is performance: it would require creating a new output message from scratch, copying the majority of the data even if nothing has changed. That would be too expensive. So the only option is to add the column to the input schema first.
+
+ * How does the solution scale? The straightforward implementation would have been a KStream: a message is received, parsed, rules applied, serialized back to Avro and sent to the output topic. The typical processing time is 100ms. That is not bad, but as everything is done sequentially in a KStream, it would mean only 10 messages/sec can be processed per partition. Therefore the received messages are put into a pool to be processed in parallel and written to the output topic in the same order as received. This ensures all CPUs can be utilized when processing a single partition. If multiple partitions should be processed in parallel, the instance count can be set to a value greater than one. And as usual with Kafka, deploying multiple containers allows parallel processing of partitions across multiple servers.
+
+ * Is the order preserved? Yes, see above. Messages are processed in parallel, but produced in the order they were received. And if the partition count in the source is the same as in the target topic, the data will be put in the same partition id as it was read from.
+
+ * If the input schema has no `__audit` structure, which is required for the rule results, the field is added. It is added not to the message's own schema version but to the latest subject version. Why is that? The reason for using the latest version is schema evolution. It might happen that schema id 2 has an additional field `NAME` compared to schema id 1, i.e. the subject evolved from version 1 to 2. The consumer receives a message with schema 2 first, adds the `__audit` field and saves it in the schema registry. The next message has input schema 1, and if that were registered as the output schema next, it would fail due to the missing `NAME` field. Hence both must always be output with the latest schema version. This also explains why adding the `__audit` field to the original input schema is preferred.
+
+ * How to debug? The log level can be set by adding `ruleservice.loglevel=DEBUG` to the Kafka properties file.
+
+ * The average processing time is 1 second, yet the throughput is 10 rows per second. How can that be? Parallel processing. The processing time is measured per message, even when 10 messages are transformed in parallel.
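The order-preserving parallel processing described in the FAQ can be sketched as follows (an assumption about the general approach, not the service's actual code):

```javascript
// Sketch: process messages concurrently, but emit results in input order.
// Promise.all resolves to results indexed by input position, regardless of
// which message finishes first - that is the ordering guarantee relied on here.
async function processPreservingOrder(messages, applyRules) {
  const pending = messages.map((msg) => applyRules(msg)); // start all at once (the pool)
  return Promise.all(pending); // collect results in input order
}
```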
+
diff --git a/WebContent/Controller.controller.js b/WebContent/Controller.controller.js
index a18ea70..7819dd7 100644
--- a/WebContent/Controller.controller.js
+++ b/WebContent/Controller.controller.js
@@ -4,6 +4,23 @@ sap.ui.define([ "sap/ui/core/mvc/Controller"], function(Controller) {
return Controller.extend("io.rtdi.bigdata.rulesservice.Controller", {
onInit : function() {
+ var oModel = new sap.ui.model.json.JSONModel();
+ var statusbox = this.getView().byId("systemstatus");
+ oModel.attachRequestCompleted(function(event) {
+ if (event.getParameter("success")) {
+ statusbox.setVisible(true);
+ }
+ });
+ oModel.attachRequestFailed(function(event) {
+ var text = event.getParameter("responseText");
+ sap.m.MessageToast.show("Reading status info failed: " + text);
+ });
+ this.getView().setModel(oModel);
+ oModel.loadData("./rest/config/service");
+ },
+ onRefresh : function(event) {
+ var model = this.getView().getModel();
+ model.loadData("./rest/config/service");
}
});
diff --git a/WebContent/View.view.xml b/WebContent/View.view.xml
index 3156da4..7d299a5 100644
--- a/WebContent/View.view.xml
+++ b/WebContent/View.view.xml
@@ -13,9 +13,11 @@
+
+
-
+
-
-
+
+
diff --git a/WebContent/ui5/Config.controller.js b/WebContent/ui5/Config.controller.js
index 821f63d..0234bc2 100644
--- a/WebContent/ui5/Config.controller.js
+++ b/WebContent/ui5/Config.controller.js
@@ -3,12 +3,18 @@ function(Controller) {"use strict";
return Controller.extend("io.rtdi.bigdata.rulesservice.ui5.Config", {
onInit : function() {
var model = new sap.ui.model.json.JSONModel();
+ var statusbox = this.getView().byId("systemstatus");
+ model.attachRequestCompleted(function(event) {
+ if (event.getParameter("success")) {
+ statusbox.setVisible(true);
+ }
+ });
model.attachRequestFailed(function(event) {
var text = event.getParameter("responseText");
sap.m.MessageToast.show("Reading config failed: " + text);
});
- model.loadData("../rest/config");
this.getView().setModel(model);
+ model.loadData("../rest/config");
},
onSave : function(event) {
var model = this.getView().getModel();
@@ -17,8 +23,11 @@ return Controller.extend("io.rtdi.bigdata.rulesservice.ui5.Config", {
var text = event.getParameter("responseText");
sap.m.MessageToast.show("Save failed: " + text);
});
- post.attachRequestCompleted(function() {
+ post.attachRequestCompleted(function(event) {
console.log(post.getProperty("/"));
+ if (event.getParameter("success")) {
+ sap.m.MessageToast.show("Saved");
+ }
});
var json = JSON.stringify(model.getProperty("/"));
var headers = {
@@ -27,6 +36,10 @@ return Controller.extend("io.rtdi.bigdata.rulesservice.ui5.Config", {
post.loadData("../rest/config", json, true, "POST", false, true, headers);
model.loadData("../rest/config");
},
+ onRefresh : function(event) {
+ var model = this.getView().getModel();
+ model.loadData("../rest/config");
+ }
});
});
diff --git a/WebContent/ui5/Config.view.xml b/WebContent/ui5/Config.view.xml
index d66e85b..fbb031a 100644
--- a/WebContent/ui5/Config.view.xml
+++ b/WebContent/ui5/Config.view.xml
@@ -6,13 +6,14 @@
+
-
+
@@ -137,9 +137,8 @@
@@ -164,7 +163,7 @@
-
+
diff --git a/WebContent/ui5/Rules.controller.js b/WebContent/ui5/Rules.controller.js
index d1a8b1d..11051b8 100644
--- a/WebContent/ui5/Rules.controller.js
+++ b/WebContent/ui5/Rules.controller.js
@@ -46,6 +46,10 @@ return Controller.extend("io.rtdi.bigdata.rulesservice.ui5.Rules", {
var ruleurl = url.replace("Rules.html", "Rule.html?subject=" + encodeURI(subjectname) + rulelink);
window.open(ruleurl, "_blank");
+ },
+ onRefresh : function(event) {
+ var model = this.getView().getModel();
+ model.loadData("../rest/subjects");
}
});
});
diff --git a/WebContent/ui5/Rules.view.xml b/WebContent/ui5/Rules.view.xml
index 8b58cb1..13ba70b 100644
--- a/WebContent/ui5/Rules.view.xml
+++ b/WebContent/ui5/Rules.view.xml
@@ -5,6 +5,8 @@
+
+
diff --git a/WebContent/ui5/Status.controller.js b/WebContent/ui5/Status.controller.js
index 750bb4c..b3f27a3 100644
--- a/WebContent/ui5/Status.controller.js
+++ b/WebContent/ui5/Status.controller.js
@@ -5,6 +5,39 @@ return Controller.extend("io.rtdi.bigdata.rulesservice.ui5.Status", {
var oModel = new sap.ui.model.json.JSONModel();
oModel.loadData("../rest/config/service");
this.getView().setModel(oModel);
+ },
+ onRefresh : function(event) {
+ var model = this.getView().getModel();
+ model.loadData("../rest/config/service");
+ },
+ onStartStop : function(event) {
+ var model = this.getView().getModel();
+ var source = event.getSource();
+ var path = source.getBindingContext().getPath();
+ var servicename = model.getProperty(path + "/topicname"); // path.substring(path.lastIndexOf("/") + 1);
+ var status = model.getProperty(path + "/status"); // path.substring(path.lastIndexOf("/") - 1, path.lastIndexOf("/"));
+ var newstatustext = status === true ? "stopping" : "starting";
+ var post = new sap.ui.model.json.JSONModel();
+ post.attachRequestFailed(function(event) {
+ var text = event.getParameter("responseText");
+ sap.m.MessageToast.show(newstatustext + " failed: " + text);
+ });
+ post.attachRequestCompleted(function(event) {
+ console.log(post.getProperty("/"));
+ if (event.getParameter("success")) {
+ sap.m.MessageToast.show(newstatustext + " completed");
+ }
+ model.loadData("../rest/config/service");
+ });
+ var json = JSON.stringify(model.getProperty("/"));
+ var headers = {
+ "Content-Type": "application/json;charset=utf-8"
+ }
+ if (status === true) {
+ post.loadData("../rest/topics/stop/" + servicename, json, true, "POST", false, true, headers);
+ } else {
+ post.loadData("../rest/topics/start/" + servicename, json, true, "POST", false, true, headers);
+ }
}
});
});
diff --git a/WebContent/ui5/Status.view.xml b/WebContent/ui5/Status.view.xml
index 43c16ef..369c9b9 100644
--- a/WebContent/ui5/Status.view.xml
+++ b/WebContent/ui5/Status.view.xml
@@ -6,6 +6,7 @@
+
@@ -25,7 +26,7 @@
items="{/topicstatus}"
width="100%"
height="100%">
-
+
+
+
+
diff --git a/WebContent/ui5/Topics.controller.js b/WebContent/ui5/Topics.controller.js
index 4179b8c..dd3d44e 100644
--- a/WebContent/ui5/Topics.controller.js
+++ b/WebContent/ui5/Topics.controller.js
@@ -19,17 +19,34 @@ return Controller.extend("io.rtdi.bigdata.rulesservice.ui5.Topics", {
},
onSave : function(event) {
var model = this.getView().getModel();
+ var post = new sap.ui.model.json.JSONModel();
+ post.attachRequestFailed(function(event) {
+ var text = event.getParameter("responseText");
+ sap.m.MessageToast.show("Save failed: " + text);
+ });
+ post.attachRequestCompleted(function(event) {
+ console.log(post.getProperty("/"));
+ if (event.getParameter("success")) {
+ sap.m.MessageToast.show("Saved");
+ }
+ });
var json = JSON.stringify(model.getProperty("/"));
var headers = {
"Content-Type": "application/json;charset=utf-8"
}
- model.loadData("../rest/topicrules", json, true, "POST", false, true, headers);
+ post.loadData("../rest/topicrules", json, true, "POST", false, true, headers);
},
onChange: function(event) {
var model = this.getView().getModel();
var context = event.getSource().getBindingContext();
var path = context.getPath();
model.setProperty(path + "/modified", true);
+ },
+ onRefresh : function(event) {
+ var model = this.getView().getModel();
+ model.loadData("../rest/topicrules");
+ var model2 = this.getView().getModel("rulegroups");
+ model2.loadData("../rest/rules");
}
});
diff --git a/WebContent/ui5/Topics.view.xml b/WebContent/ui5/Topics.view.xml
index 141ee5e..fa78af9 100644
--- a/WebContent/ui5/Topics.view.xml
+++ b/WebContent/ui5/Topics.view.xml
@@ -6,6 +6,7 @@
+
diff --git a/WebContent/ui5/lib/FocusInput.js b/WebContent/ui5/lib/FocusInput.js
index 7f173d8..d6d55dd 100644
--- a/WebContent/ui5/lib/FocusInput.js
+++ b/WebContent/ui5/lib/FocusInput.js
@@ -9,12 +9,16 @@ sap.ui.define([
aggregations : {
},
events : {
- "focusIn" : {"value": {type: "string"}}
+ "focusIn" : {"value": {type: "string"}},
+ "focusOut" : {"value": {type: "string"}},
}
},
renderer : {},
onfocusin : function() {
this.fireFocusIn({ "value" : this.getValue()});
},
+ onfocusout : function() {
+ this.fireFocusOut({ "value" : this.getValue()});
+ },
});
});
diff --git a/docs/audit-schema.md b/docs/audit-schema.md
index d81ad15..7e868d1 100644
--- a/docs/audit-schema.md
+++ b/docs/audit-schema.md
@@ -2,11 +2,11 @@ This is the `__audit` field definition within the Avro schema.
```
{
+ "name": "__audit",
"__internal": true,
"__originalname": "__audit",
"default": null,
"doc": "If data is transformed this information is recorded here",
- "name": "__audit",
"type": [
"null",
{
diff --git a/pom.xml b/pom.xml
index 345120b..fad6a52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,12 +48,6 @@
 			<version>5.11.2</version>
 			<scope>test</scope>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka-streams-test-utils</artifactId>
-			<version>${kafka.version}</version>
-			<scope>test</scope>
-		</dependency>
 		<dependency>
 			<groupId>org.glassfish.jersey.containers</groupId>
 			<artifactId>jersey-container-servlet</artifactId>
@@ -105,11 +99,6 @@
 			<artifactId>kafka-clients</artifactId>
 			<version>${kafka.version}</version>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka-streams</artifactId>
-			<version>${kafka.version}</version>
-		</dependency>
 		<dependency>
 			<groupId>io.confluent</groupId>
 			<artifactId>kafka-schema-registry-client</artifactId>
@@ -131,9 +120,9 @@
 			<version>3.1.8</version>
 		</dependency>
 		<dependency>
-			<groupId>io.rtdi.bigdata.kafka</groupId>
-			<artifactId>kafkaavro</artifactId>
-			<version>0.10.15</version>
+			<groupId>io.rtdi.bigdata.kafka</groupId>
+			<artifactId>kafkaavro</artifactId>
+			<version>0.10.17</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.logging.log4j</groupId>
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/LoggingUtil.java b/src/main/java/io/rtdi/bigdata/rulesservice/LoggingUtil.java
index dff36a4..644d867 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/LoggingUtil.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/LoggingUtil.java
@@ -1,5 +1,6 @@
package io.rtdi.bigdata.rulesservice;
+import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import io.rtdi.bigdata.rulesservice.rest.ErrorResponse;
@@ -8,54 +9,55 @@
import jakarta.ws.rs.core.Response.Status;
public class LoggingUtil {
+ public static final Logger LOGGER = LogManager.getLogger("io.rtdi.bigdata.rulesservice");
- public static void logRequestBegin(Logger log, HttpServletRequest request) {
- log.info("Execution started: <{}> called by <{}>",
+ public static void logRequestBegin(HttpServletRequest request) {
+ LOGGER.info("Execution started: <{}> called by <{}>",
request != null ? request.getRequestURI() : "?",
request != null && request.getUserPrincipal() != null ? request.getUserPrincipal().getName() : "unknown");
}
- public static void logRequestEnd(Logger log, HttpServletRequest request) {
- log.info("Execution completed: <{}> called by <{}>",
+ public static void logRequestEnd(HttpServletRequest request) {
+ LOGGER.info("Execution completed: <{}> called by <{}>",
request != null ? request.getRequestURI() : "?",
request != null && request.getUserPrincipal() != null ? request.getUserPrincipal().getName() : "unknown");
}
- public static Response requestEnd(Logger log, HttpServletRequest request, Object payload) {
+ public static Response requestEnd(HttpServletRequest request, Object payload) {
Response response = Response.ok(payload).build();
- logRequestEnd(log, request);
+ logRequestEnd(request);
return response;
}
- public static Response requestEndTechnicalError(Logger log, HttpServletRequest request, Exception e) {
+ public static Response requestEndTechnicalError(HttpServletRequest request, Exception e) {
Response response = ErrorResponse.createErrorResponse(e);
- log.error("Exception", e);
- logRequestEnd(log, request);
+ LOGGER.error("Exception", e);
+ logRequestEnd(request);
return response;
}
- public static Response requestEndInputError(Logger log, HttpServletRequest request, Exception e) {
+ public static Response requestEndInputError(HttpServletRequest request, Exception e) {
Response response = ErrorResponse.createErrorResponse(e, Status.BAD_REQUEST);
- log.warn("Exception", e);
- logRequestEnd(log, request);
+ LOGGER.warn("Exception", e);
+ logRequestEnd(request);
return response;
}
- public static Response requestEndInputError(Logger log, HttpServletRequest request, Exception e, Status status) {
+ public static Response requestEndInputError(HttpServletRequest request, Exception e, Status status) {
Response response = ErrorResponse.createErrorResponse(e, status);
- log.warn("Exception", e);
- logRequestEnd(log, request);
+ LOGGER.warn("Exception", e);
+ logRequestEnd(request);
return response;
}
- public static void logRequestEndTechnicalError(Logger log, HttpServletRequest request, Exception e) {
- log.error("Exception", e);
- logRequestEnd(log, request);
+ public static void logRequestEndTechnicalError(HttpServletRequest request, Exception e) {
+ LOGGER.error("Exception", e);
+ logRequestEnd(request);
}
- public static void logRequestEndInputError(Logger log, HttpServletRequest request, Exception e) {
- log.warn("Exception", e);
- logRequestEnd(log, request);
+ public static void logRequestEndInputError(HttpServletRequest request, Exception e) {
+ LOGGER.warn("Exception", e);
+ logRequestEnd(request);
}
}
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/RuleFileKStream.java b/src/main/java/io/rtdi/bigdata/rulesservice/RuleFileKStream.java
deleted file mode 100644
index dc679f6..0000000
--- a/src/main/java/io/rtdi/bigdata/rulesservice/RuleFileKStream.java
+++ /dev/null
@@ -1,245 +0,0 @@
-package io.rtdi.bigdata.rulesservice;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.AbstractMap.SimpleEntry;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.avro.Schema;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
-import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import io.rtdi.bigdata.kafka.avro.AvroDeserializer;
-import io.rtdi.bigdata.kafka.avro.AvroSerializer;
-import io.rtdi.bigdata.kafka.avro.recordbuilders.SchemaBuilder;
-import io.rtdi.bigdata.kafka.avro.recordbuilders.ValueSchema;
-import io.rtdi.bigdata.rulesservice.config.RuleFileDefinition;
-import io.rtdi.bigdata.rulesservice.jexl.JexlAvroDeserializer;
-import io.rtdi.bigdata.rulesservice.jexl.JexlRecord;
-
-public class RuleFileKStream extends Thread {
-
- protected final Logger log = LogManager.getLogger(this.getClass().getName());
-
- private Map ruledefinitions = new HashMap<>();
- private Map> inoutschema = new HashMap<>();
- private RulesService service;
- private String inputtopicname;
- private String outputtopicname;
- private AtomicLong rowsprocessed = new AtomicLong();
- private volatile long lastprocessedtimestamp = 0L;
- private AvroSchemaProvider provider = new AvroSchemaProvider();
- private Exception error;
- private Integer instances;
- private List rulefiles;
- private AtomicLong processingtime = new AtomicLong();
-
-
- public RuleFileKStream(List rulefiles, RulesService service, String inputtopicname, String outputtopicname, Integer instances) throws IOException {
- this.service = service;
- this.inputtopicname = inputtopicname;
- this.outputtopicname = outputtopicname;
- if (instances == null) {
- this.instances = 1;
- } else {
- this.instances = instances;
- }
- this.rulefiles = rulefiles;
- for (String location : rulefiles) {
- int pos = location.indexOf('/');
- if (pos <= 0) {
- throw new IOException("Rulefile path does not follow the convention <subjectname>/<path>");
- }
- String subjectname = location.substring(0, pos);
- String path = location.substring(pos+1);
- RuleFileDefinition rule = RuleFileDefinition.load(service.getRuleFileRootDir(), subjectname, Path.of(path), true);
- if (rule != null) {
- this.ruledefinitions.put(rule.schema().getFullName(), rule);
- }
- }
- }
-
- @Override
- public void run() {
- if (ruledefinitions.size() == 0) {
- this.error = new IOException("Stream has no active rule files");
- } else {
- StreamsBuilder builder = new StreamsBuilder();
- KStream<Bytes, Bytes> transformation = builder.stream(
- inputtopicname,
- Consumed.with(Serdes.Bytes(), Serdes.Bytes()));
- KStream<Bytes, Bytes> result = transformation.mapValues(value -> mapper(value));
- result.to(outputtopicname, Produced.with(Serdes.Bytes(), Serdes.Bytes()));
- HashMap<String, String> map = new HashMap<>(service.getKafkaProperties());
- map.put(StreamsConfig.APPLICATION_ID_CONFIG, "rulesservice_" + inputtopicname);
- map.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, String.valueOf(instances));
- StreamsConfig streamsConfig = new StreamsConfig(map);
- try (KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);) {
- streams.start();
- try {
- while (streams.state().isRunningOrRebalancing()) {
- Thread.sleep(Duration.ofMinutes(1));
- }
- } catch (InterruptedException e) {
- // exit loop and stop thread
- }
- } catch (Exception e) {
- this.error = e;
- }
- }
- }
-
- private Bytes mapper(Bytes value) {
- long startts = System.currentTimeMillis();
- byte[] data = value.get();
-
- int schemaid;
- try {
- schemaid = AvroDeserializer.getSchemaId(data);
- } catch (IOException e) {
- log.error("This input is not a valid Avro record with a magic byte", e);
- throw new KafkaException("This input is not a valid Avro record with a magic byte", e);
- }
-
- ParsedSchema writerparsedschema;
- try {
- writerparsedschema = service.getSchemaclient().getSchemaById(schemaid);
- } catch (IOException | RestClientException e) {
- log.error("Exception when reading the schema <{}> from the schema registry by id", schemaid, e);
- throw new KafkaException("Exception when reading the schema <" + schemaid + "> from the schema registry by id", e);
- }
- Schema writerschema = (Schema) writerparsedschema.rawSchema();
-
- RuleFileDefinition ruledefinition = ruledefinitions.get(writerschema.getFullName());
- if (ruledefinition == null) {
- /*
- * No transformations for that schema, hence a pass-through
- */
- return value;
- } else {
- JexlRecord inputrecord;
- if (writerschema.getField(ValueSchema.AUDIT) == null) {
- synchronized (this) {
- Entry<Integer, Schema> schemadata = inoutschema.get(schemaid);
- if (schemadata == null) {
- // if no, derive the new schema
- SchemaBuilder outputschemabuilder = new SchemaBuilder(writerschema.getName() + "_CLEANSED", writerschema, false);
- ValueSchema.addAuditField(outputschemabuilder);
- outputschemabuilder.build();
- Schema outputschema = outputschemabuilder.getSchema();
- io.confluent.kafka.schemaregistry.client.rest.entities.Schema schemaentity = new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(
- ruledefinition.getOutputsubjectname(), 0, 0, new SchemaString(outputschema.toString()));
- ParsedSchema parsed = provider.parseSchemaOrElseThrow(schemaentity , false, false);
- // register the schema - if already known the API call will not create it and just return the schema id
- int outputschemaid;
- try {
- outputschemaid = service.getSchemaclient().register(ruledefinition.getOutputsubjectname(), parsed);
- } catch (IOException | RestClientException e) {
- log.error("Exception when registering the schema for subject name <{}>\nschema: {}", ruledefinition.getOutputsubjectname(), outputschema.toString(true), e);
- throw new KafkaException("Exception when writing the schema for subject name <" + ruledefinition.getOutputsubjectname() + ">", e);
- }
- schemadata = new SimpleEntry<>(outputschemaid, outputschema);
- inoutschema.put(schemaid, schemadata);
- schemaid = outputschemaid;
- } else {
- schemaid = schemadata.getKey();
- }
- try {
- inputrecord = JexlAvroDeserializer.deserialize(data, writerschema, schemadata.getValue());
- } catch (IOException e) {
- log.error("Exception when deserializing the byte[] into an Avro record\ndata: {}\ninputschema: {}\noutputschema: {}", data, writerschema.toString(true), writerschema.toString(true), e);
- throw new KafkaException("Exception when deserializing the byte[] into an Avro record", e);
- }
- }
- } else {
- /*
- * The input has an AUDIT structure already, deserialize it into a JexRecord, apply the rules and serialize it into the same schema
- */
- try {
- inputrecord = JexlAvroDeserializer.deserialize(data, writerschema);
- } catch (IOException e) {
- log.error("Exception when deserializing the byte[] into an Avro record\ndata: {}\ninputschema: {}\noutputschema: {}", data, writerschema.toString(true), writerschema.toString(true), e);
- throw new KafkaException("Exception when deserializing the byte[] into an Avro record", e);
- }
- }
-
- try {
- ruledefinition.apply(inputrecord, false);
- } catch (IOException e) {
- log.error("Exception when applying the rules to the record\\nrulefile: {}\nrecord: {}", ruledefinition.getName(), inputrecord, e);
- throw new KafkaException("Exception when applying the rules to the record", e);
- }
-
- byte[] out;
- try {
- out = AvroSerializer.serialize(schemaid, inputrecord);
- rowsprocessed.incrementAndGet();
- lastprocessedtimestamp = System.currentTimeMillis();
- processingtime.addAndGet(lastprocessedtimestamp - startts);
- return Bytes.wrap(out);
- } catch (IOException e) {
- log.error("Exception when serializing the Avro record into a byte[]", e);
- throw new KafkaException("Exception when serializing the Avro record into a byte[]", e);
- }
- }
- }
-
- public long getRowsprocessed() {
- return rowsprocessed.get();
- }
-
- public long getLastprocessedtimestamp() {
- return lastprocessedtimestamp;
- }
-
- public Map<String, RuleFileDefinition> getRulefiledefinitions() {
- return ruledefinitions;
- }
-
- public String getInputtopicname() {
- return inputtopicname;
- }
-
- public String getOutputtopicname() {
- return outputtopicname;
- }
-
- public Exception getError() {
- return error;
- }
-
- public int getInstances() {
- return instances;
- }
-
- public List<String> getRulefiles() {
- return rulefiles;
- }
-
- public Long getAvgProcessingtime() {
- if (rowsprocessed.get() != 0) {
- return processingtime.get() / rowsprocessed.get();
- } else {
- return null;
- }
- }
-
-}
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/RuleFileTransformer.java b/src/main/java/io/rtdi/bigdata/rulesservice/RuleFileTransformer.java
new file mode 100644
index 0000000..ba2d31d
--- /dev/null
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/RuleFileTransformer.java
@@ -0,0 +1,454 @@
+package io.rtdi.bigdata.rulesservice;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.avro.Schema;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.rtdi.bigdata.kafka.avro.AvroDeserializer;
+import io.rtdi.bigdata.kafka.avro.AvroSerializer;
+import io.rtdi.bigdata.kafka.avro.recordbuilders.SchemaBuilder;
+import io.rtdi.bigdata.kafka.avro.recordbuilders.ValueSchema;
+import io.rtdi.bigdata.rulesservice.config.RuleFileDefinition;
+import io.rtdi.bigdata.rulesservice.jexl.JexlAvroDeserializer;
+import io.rtdi.bigdata.rulesservice.jexl.JexlRecord;
+
+/**
+ * The RuleFileTransformer is a pipeline with
+ * Thread(consumer --> Executor(apply rules) --> Future --> queue) --> queue reader --> producer
+ */
+public class RuleFileTransformer extends Thread {
+
+ private static final int MAX_QUEUE_SIZE = 1000;
+
+ private Map<String, RuleFileDefinition> ruledefinitions = new HashMap<>();
+ private Map<Integer, Entry<Integer, Schema>> inoutschema = new HashMap<>();
+ private RulesService service;
+ private String inputtopicname;
+ private String outputtopicname;
+ private AtomicLong rowsprocessed = new AtomicLong();
+ private volatile long lastprocessedtimestamp = 0L;
+ private AvroSchemaProvider provider = new AvroSchemaProvider();
+ private Exception error;
+ private Integer instances;
+ private List<String> rulefiles;
+ private AtomicLong processingtime = new AtomicLong();
+ private ArrayBlockingQueue<Future<Transformer>> queue;
+ private Long starttime = null;
+ private int numbercores;
+
+
+ public RuleFileTransformer(List<String> rulefiles, RulesService service, String inputtopicname, String outputtopicname, Integer instances) throws IOException {
+ this.service = service;
+ this.inputtopicname = inputtopicname;
+ this.outputtopicname = outputtopicname;
+ if (instances == null) {
+ this.instances = 1;
+ } else {
+ this.instances = instances;
+ }
+ this.rulefiles = rulefiles;
+ for (String location : rulefiles) {
+ int pos = location.indexOf('/');
+ if (pos <= 0) {
+ throw new IOException("Rulefile path does not follow the convention <subjectname>/<path>");
+ }
+ String subjectname = location.substring(0, pos);
+ String path = location.substring(pos+1);
+ RuleFileDefinition rule = RuleFileDefinition.load(service.getRuleFileRootDir(), subjectname, Path.of(path), true);
+ if (rule != null) {
+ this.ruledefinitions.put(rule.schema().getFullName(), rule);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ if (ruledefinitions.size() == 0) {
+ this.error = new IOException("Stream has no active rule files");
+ } else {
+ starttime = System.currentTimeMillis();
+ Thread reader = null;
+ try {
+ Properties consumerProperties = new Properties();
+ consumerProperties.putAll(service.getKafkaProperties());
+ consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "rulesservice_" + inputtopicname);
+ consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+ consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+ consumerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName());
+ consumerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName());
+
+
+ try (KafkaProducer<Bytes, Bytes> producer = new KafkaProducer<>(consumerProperties);
+ ) {
+ queue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
+ try {
+ KafkaReader kafkareader = new KafkaReader(queue, consumerProperties);
+ reader = new Thread(kafkareader);
+ reader.setName("Kafka Reader for " + inputtopicname);
+ reader.start();
+ int rowssent = 0;
+ int producerpartitioncount = producer.partitionsFor(outputtopicname).size();
+ long nextcommit = System.currentTimeMillis() + 60000L;
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ while (reader.isAlive()) {
+ while (!queue.isEmpty()) {
+ LoggingUtil.LOGGER.debug("Taking a row from the transformed queue");
+ Future<Transformer> future = queue.take();
+ Transformer transformer = future.get();
+ if (transformer.out != null) {
+ LoggingUtil.LOGGER.debug("Got record with offset {} of topic_partition {}-{}", transformer.offset, transformer.topic, transformer.partition);
+ /*
+ * Input partition should be preserved in case both have the same number of partitions
+ */
+ producer.send(new ProducerRecord<>(outputtopicname, producerpartitioncount == kafkareader.partitioncount ? transformer.partition : null, transformer.key, transformer.out));
+ offsets.put(new TopicPartition(transformer.topic, transformer.partition), new OffsetAndMetadata(transformer.offset+1L));
+ rowssent++; rowsprocessed.incrementAndGet();
+ lastprocessedtimestamp = System.currentTimeMillis();
+ processingtime.addAndGet(transformer.endts - transformer.startts);
+ if (rowssent > 10000 || System.currentTimeMillis() > nextcommit) {
+ LoggingUtil.LOGGER.debug("Committing consumer records");
+ producer.flush();
+ kafkareader.commit(offsets);
+ rowssent = 0;
+ nextcommit = System.currentTimeMillis() + 60000L;
+ offsets = new HashMap<>();
+ }
+ }
+ }
+ if (System.currentTimeMillis() > nextcommit && offsets.size() > 0) {
+ LoggingUtil.LOGGER.debug("Committing consumer records");
+ producer.flush();
+ kafkareader.commit(offsets);
+ rowssent = 0;
+ nextcommit = System.currentTimeMillis() + 60000L;
+ offsets = new HashMap<>();
+ } else if (offsets.size() == 0) {
+ Thread.sleep(1000); // if there is no data, wait a bit
+ }
+ }
+ if (kafkareader.e != null) {
+ this.error = kafkareader.e;
+ }
+ } finally {
+ if (reader != null) {
+ reader.interrupt();
+ try {
+ reader.join(5000);
+ } catch (InterruptedException e) {
+ LoggingUtil.LOGGER.warn("Exception when waiting for the reader thread to finish", e);
+ }
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ LoggingUtil.LOGGER.info("Service got stopped");
+ } catch (Exception e) {
+ LoggingUtil.LOGGER.error("Exception in Transformer thread", e);
+ this.error = e;
+ }
+ starttime = null;
+ }
+ }
+
+ /**
+ * The KafkaReader thread keeps putting records into the processing queue and, as the owner of the consumer, commits
+ * the offsets the producer has safely sent.
+ * In case of a rebalance, the offsets to commit might refer to a partition the consumer is no longer subscribed to.
+ * Such an error is caught, and the next commit of a fresh offset map takes care of the situation.
+ */
+ private class KafkaReader implements Runnable {
+ private ArrayBlockingQueue<Future<Transformer>> queue;
+ private Exception e;
+ private Properties consumerProperties;
+ private boolean commitnow = false;
+ private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); // initialized so a rebalance before the first commit request finds an empty map, not null
+ private int partitioncount = 0;
+ private int failedcommits;
+
+ public KafkaReader(ArrayBlockingQueue<Future<Transformer>> queue, Properties consumerProperties) {
+ this.queue = queue;
+ this.consumerProperties = consumerProperties;
+ }
+
+ @Override
+ public void run() {
+ numbercores = Runtime.getRuntime().availableProcessors();
+ try (KafkaConsumer<Bytes, Bytes> consumer = new KafkaConsumer<>(consumerProperties);
+ ExecutorService executor = Executors.newWorkStealingPool(numbercores);) {
+ ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ if (offsets.size() != 0) {
+ LoggingUtil.LOGGER.debug("Partitions revoked {}, hence committing the offsets", partitions);
+ commit(consumer);
+ }
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ }
+ };
+ consumer.subscribe(List.of(inputtopicname), listener );
+ this.partitioncount = consumer.partitionsFor(inputtopicname).size();
+ failedcommits = 0;
+ while (true) {
+ ConsumerRecords<Bytes, Bytes> records = consumer.poll(Duration.ofSeconds(1));
+ for (ConsumerRecord<Bytes, Bytes> record : records) {
+ queue.add(executor.submit(new Transformer(record.value(), record.key(), record.offset(), record.topic(), record.partition())));
+ LoggingUtil.LOGGER.debug("Queued record with offset {} of topic_partition {}-{}", record.offset(), record.topic(), record.partition());
+ }
+ if (commitnow) {
+ commit(consumer);
+ }
+ }
+ } catch (Exception ex) {
+ LoggingUtil.LOGGER.error("Exception when consuming the record", ex);
+ e = ex;
+ }
+ }
+
+ private void commit(KafkaConsumer<Bytes, Bytes> consumer) {
+ commitnow = false; // commit these offsets only once, as the map might contain partitions the consumer is no longer in charge of
+ if (offsets.size() != 0) {
+ try {
+ LoggingUtil.LOGGER.debug("Commit was requested for offsets {}", offsets);
+ consumer.commitSync(offsets);
+ LoggingUtil.LOGGER.debug("Commit was completed for offsets {}", offsets);
+ failedcommits = 0;
+ } catch (RebalanceInProgressException | CommitFailedException e) {
+ failedcommits++;
+ if (failedcommits > 5) {
+ LoggingUtil.LOGGER.error("Exception when committing the offsets due to rebalance related operations for the fifth time in a row, offsets = {}", offsets, e);
+ throw e;
+ } else {
+ LoggingUtil.LOGGER.warn("Exception when committing the offsets due to rebalance related operations - ignored, offsets = {}", offsets, e);
+ }
+ }
+ }
+ }
+
+ public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+ commitnow = true;
+ this.offsets = offsets;
+ }
+ }
+
+ private class Transformer implements Callable<Transformer> {
+ public Bytes in;
+ public Bytes out;
+ public Bytes key;
+ public long offset;
+ public long startts;
+ public long endts;
+ private String topic;
+ private int partition;
+
+ public Transformer(Bytes in, Bytes key, long offset, String topic, int partition) {
+ this.in = in;
+ this.key = key;
+ this.offset = offset;
+ this.topic = topic;
+ this.partition = partition;
+ }
+
+ @Override
+ public Transformer call() throws Exception {
+ startts = System.currentTimeMillis();
+ byte[] data = in.get();
+
+ int schemaid;
+ try {
+ schemaid = AvroDeserializer.getSchemaId(data);
+ } catch (IOException e) {
+ LoggingUtil.LOGGER.error("This input is not a valid Avro record with a magic byte", e);
+ throw new KafkaException("This input is not a valid Avro record with a magic byte", e);
+ }
+
+ ParsedSchema writerparsedschema;
+ try {
+ writerparsedschema = service.getSchemaclient().getSchemaById(schemaid);
+ } catch (IOException | RestClientException e) {
+ LoggingUtil.LOGGER.error("Exception when reading the schema <{}> from the schema registry by id", schemaid, e);
+ throw new KafkaException("Exception when reading the schema <" + schemaid + "> from the schema registry by id", e);
+ }
+ Schema writerschema = (Schema) writerparsedschema.rawSchema();
+
+ RuleFileDefinition ruledefinition = ruledefinitions.get(writerschema.getFullName());
+ if (ruledefinition == null) {
+ /*
+ * No transformations for that schema, hence a pass-through
+ */
+ out = in;
+ return this;
+ } else {
+ JexlRecord inputrecord;
+ if (writerschema.getField(ValueSchema.AUDIT) == null) {
+ synchronized (RuleFileTransformer.this) { // shared lock: every Transformer instance updates the same inoutschema map
+ Entry<Integer, Schema> schemadata = inoutschema.get(schemaid);
+ if (schemadata == null) {
+ // if no, derive the new schema
+ SchemaBuilder outputschemabuilder = new SchemaBuilder(writerschema.getName() + "_CLEANSED", writerschema, false);
+ ValueSchema.addAuditField(outputschemabuilder);
+ outputschemabuilder.build();
+ Schema outputschema = outputschemabuilder.getSchema();
+ io.confluent.kafka.schemaregistry.client.rest.entities.Schema schemaentity = new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(
+ ruledefinition.getOutputsubjectname(), 0, 0, new SchemaString(outputschema.toString()));
+ ParsedSchema parsed = provider.parseSchemaOrElseThrow(schemaentity , false, false);
+ // register the schema - if already known the API call will not create it and just return the schema id
+ int outputschemaid;
+ try {
+ outputschemaid = service.getSchemaclient().register(ruledefinition.getOutputsubjectname(), parsed);
+ } catch (IOException | RestClientException e) {
+ LoggingUtil.LOGGER.error("Exception when registering the schema for subject name <{}>\nschema: {}", ruledefinition.getOutputsubjectname(), outputschema.toString(true), e);
+ throw new KafkaException("Exception when writing the schema for subject name <" + ruledefinition.getOutputsubjectname() + ">", e);
+ }
+ schemadata = new SimpleEntry<>(outputschemaid, outputschema);
+ inoutschema.put(schemaid, schemadata);
+ schemaid = outputschemaid;
+ } else {
+ schemaid = schemadata.getKey();
+ }
+ try {
+ inputrecord = JexlAvroDeserializer.deserialize(data, writerschema, schemadata.getValue());
+ } catch (IOException e) {
+ LoggingUtil.LOGGER.error("Exception when deserializing the byte[] into an Avro record\ndata: {}\ninputschema: {}\noutputschema: {}", data, writerschema.toString(true), schemadata.getValue().toString(true), e);
+ throw new KafkaException("Exception when deserializing the byte[] into an Avro record", e);
+ }
+ }
+ } else {
+ /*
+ * The input has an AUDIT structure already, deserialize it into a JexlRecord, apply the rules and serialize it into the same schema
+ */
+ try {
+ inputrecord = JexlAvroDeserializer.deserialize(data, writerschema);
+ } catch (IOException e) {
+ LoggingUtil.LOGGER.error("Exception when deserializing the byte[] into an Avro record\ndata: {}\ninputschema: {}", data, writerschema.toString(true), e);
+ throw new KafkaException("Exception when deserializing the byte[] into an Avro record", e);
+ }
+ }
+
+ try {
+ ruledefinition.apply(inputrecord, false);
+ } catch (IOException e) {
+ LoggingUtil.LOGGER.error("Exception when applying the rules to the record\nrulefile: {}\nrecord: {}", ruledefinition.getName(), inputrecord, e);
+ throw new KafkaException("Exception when applying the rules to the record", e);
+ }
+
+ try {
+ out = Bytes.wrap(AvroSerializer.serialize(schemaid, inputrecord));
+ endts = System.currentTimeMillis();
+ return this;
+ } catch (IOException e) {
+ LoggingUtil.LOGGER.error("Exception when serializing the Avro record into a byte[]", e);
+ throw new KafkaException("Exception when serializing the Avro record into a byte[]", e);
+ }
+ }
+ }
+ }
+
+ public long getRowsprocessed() {
+ return rowsprocessed.get();
+ }
+
+ public long getLastprocessedtimestamp() {
+ return lastprocessedtimestamp;
+ }
+
+ public int getQueuedRecordCount() {
+ if (queue == null) {
+ return 0;
+ } else {
+ return queue.size();
+ }
+ }
+
+ public int getQueuecapacity() {
+ return MAX_QUEUE_SIZE;
+ }
+
+ public int getNumbercores() {
+ return numbercores;
+ }
+
+ public Float getRowspersecond() {
+ if (starttime == null) {
+ return null;
+ } else {
+ long elapsed = System.currentTimeMillis() - starttime;
+ if (elapsed == 0) {
+ return null;
+ }
+ float rowspersecond = rowsprocessed.get() / (elapsed / 1000.0f);
+ return rowspersecond;
+ }
+ }
+
+ public Map<String, RuleFileDefinition> getRulefiledefinitions() {
+ return ruledefinitions;
+ }
+
+ public String getInputtopicname() {
+ return inputtopicname;
+ }
+
+ public String getOutputtopicname() {
+ return outputtopicname;
+ }
+
+ public Exception getError() {
+ return error;
+ }
+
+ public int getInstances() {
+ return instances;
+ }
+
+ public List<String> getRulefiles() {
+ return rulefiles;
+ }
+
+ public Long getAvgProcessingtime() {
+ if (rowsprocessed.get() != 0) {
+ return processingtime.get() / rowsprocessed.get();
+ } else {
+ return null;
+ }
+ }
+
+}
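The ordering guarantee of the RuleFileTransformer pipeline above comes from queueing the Futures rather than the finished results: transformations run in parallel on the executor, but the queue reader takes the Futures in submission order and blocks on each `get()`, so output order matches input order. A minimal JDK-only sketch of that pattern (class and method names here are illustrative, not from the patch):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OrderedPipeline {

    /** Transforms n inputs in parallel but returns the results in input order. */
    public static List<String> transformAll(int n) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        // Futures are enqueued in submission order, so the reader below sees
        // results in input order even though execution is parallel.
        ArrayBlockingQueue<Future<String>> queue = new ArrayBlockingQueue<>(1000);
        for (int i = 0; i < n; i++) {
            final int record = i;
            queue.add(executor.submit(() -> "record-" + record)); // rule-application stub
        }
        List<String> results = new ArrayList<>();
        while (!queue.isEmpty()) {
            results.add(queue.take().get()); // blocks until this record is transformed
        }
        executor.shutdown();
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(transformAll(3)); // [record-0, record-1, record-2]
    }
}
```

The bounded queue also provides back-pressure: when the producer side falls behind, the reader thread blocks on `queue.add` once MAX_QUEUE_SIZE Futures are outstanding.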
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/RulesService.java b/src/main/java/io/rtdi/bigdata/rulesservice/RulesService.java
index 27ac9f8..a9f358f 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/RulesService.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/RulesService.java
@@ -35,8 +35,8 @@
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.common.KafkaFuture;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -64,7 +64,6 @@
*/
@WebListener
public class RulesService implements ServletContextListener {
- protected final Logger log = LogManager.getLogger(this.getClass().getName());
private ScheduledExecutorService executor;
private CachedSchemaRegistryClient schemaclient;
@@ -78,7 +77,7 @@ public class RulesService implements ServletContextListener {
/**
* Map<topicname, service>
*/
- private Map<String, RuleFileKStream> services = new HashMap<>();
+ private Map<String, RuleFileTransformer> services = new HashMap<>();
/**
* Default constructor.
@@ -102,7 +101,7 @@ public String getBootstrapServers() {
public long getRowsProduced() {
if (services != null) {
long count = 0L;
- for (RuleFileKStream service : services.values()) {
+ for (RuleFileTransformer service : services.values()) {
count += service.getRowsprocessed();
}
return count;
@@ -122,7 +121,7 @@ public String getState() {
public Long getLastDataTimestamp() {
if (services != null) {
long ts = 0L;
- for (RuleFileKStream service : services.values()) {
+ for (RuleFileTransformer service : services.values()) {
if (ts < service.getLastprocessedtimestamp()) {
ts = service.getLastprocessedtimestamp();
}
@@ -167,38 +166,38 @@ public void contextInitialized(ServletContextEvent sce) {
Object o = envCtx.lookup("rulesettings");
if (o != null) {
settingsdir = Path.of(o.toString());
- log.info("Found JNDI resource name <rulesettings>, hence the settings directory is <{}>", settingsdir);
+ LoggingUtil.LOGGER.info("Found JNDI resource name <rulesettings>, hence the settings directory is <{}>", settingsdir);
}
} catch (NamingException e) {
- log.info("No JNDI resource found in the context.xml for name <rulesettings>, hence using the default");
+ LoggingUtil.LOGGER.info("No JNDI resource found in the context.xml for name <rulesettings>, hence using the default");
}
try {
Object o = envCtx.lookup("rulegroups");
if (o != null) {
rulefiledir = Path.of(o.toString());
- log.info("Found JNDI resource name <rulegroups>, hence the rules root directory is <{}>", rulefiledir);
+ LoggingUtil.LOGGER.info("Found JNDI resource name <rulegroups>, hence the rules root directory is <{}>", rulefiledir);
}
} catch (NamingException e) {
- log.info("No JNDI resource found in the context.xml for name <rulegroups>, hence the default");
+ LoggingUtil.LOGGER.info("No JNDI resource found in the context.xml for name <rulegroups>, hence using the default");
}
} catch (Exception e) {
this.globalexception = e;
- log.info("Exception when reading the webserver settings", e);
+ LoggingUtil.LOGGER.info("Exception when reading the webserver settings", e);
}
try {
if (settingsdir == null) {
settingsdir = Path.of("/apps/rulesservice/settings");
- log.info("Settings directory is the default <{}>", settingsdir);
+ LoggingUtil.LOGGER.info("Settings directory is the default <{}>", settingsdir);
}
if (rulefiledir == null) {
rulefiledir = Path.of("/apps/rulesservice/definitions");
- log.info("Rulefile directory is the default <{}>", rulefiledir);
+ LoggingUtil.LOGGER.info("Rulefile directory is the default <{}>", rulefiledir);
}
configure();
startService();
} catch (Exception e) {
this.globalexception = e;
- log.info("Exception when reading the webserver settings", e);
+ LoggingUtil.LOGGER.info("Exception when reading the webserver settings", e);
}
}
@@ -207,13 +206,13 @@ public void startService() throws IOException {
Map<String, TopicRule> topicrulefiles = getTopicRuleFiles();
for (Entry<String, TopicRule> entry : topicrulefiles.entrySet()) {
TopicRule r = entry.getValue();
- RuleFileKStream rule = new RuleFileKStream(r.getRulefiles(), this, r.getInputtopicname(), r.getOutputtopicname(), r.getInstances());
+ RuleFileTransformer rule = new RuleFileTransformer(r.getRulefiles(), this, r.getInputtopicname(), r.getOutputtopicname(), r.getInstances());
if (rule.getRulefiledefinitions().size() > 0) {
services.put(r.getInputtopicname(), rule);
rule.start();
- log.info("Thread for topic {} started", r.getInputtopicname());
+ LoggingUtil.LOGGER.info("Thread for topic {} started", r.getInputtopicname());
} else {
- log.info("Topic {} has no active rules", r.getInputtopicname());
+ LoggingUtil.LOGGER.info("Topic {} has no active rules", r.getInputtopicname());
}
}
}
@@ -223,28 +222,28 @@ public void startService(String topicname) throws IOException, InterruptedExcept
Map<String, TopicRule> topicrules = getTopicRuleFiles();
TopicRule r = topicrules.get(topicname);
if (r != null) {
- RuleFileKStream rule = new RuleFileKStream(r.getRulefiles(), this, r.getInputtopicname(), r.getOutputtopicname(), r.getInstances());
+ RuleFileTransformer rule = new RuleFileTransformer(r.getRulefiles(), this, r.getInputtopicname(), r.getOutputtopicname(), r.getInstances());
if (rule.getRulefiledefinitions().size() > 0) {
services.put(r.getInputtopicname(), rule);
rule.start();
- log.info("Thread for topic {} started", r.getInputtopicname());
+ LoggingUtil.LOGGER.info("Thread for topic {} started", r.getInputtopicname());
} else {
- log.info("Topic {} has no active rules", r.getInputtopicname());
+ LoggingUtil.LOGGER.info("Topic {} has no active rules", r.getInputtopicname());
}
}
}
public void startFailedServices() throws IOException {
- for (Entry<String, RuleFileKStream> entry : services.entrySet()) {
- RuleFileKStream stream = entry.getValue();
+ for (Entry<String, RuleFileTransformer> entry : services.entrySet()) {
+ RuleFileTransformer stream = entry.getValue();
if (! stream.isAlive()) {
- RuleFileKStream rule = new RuleFileKStream(stream.getRulefiles(), this, stream.getInputtopicname(), stream.getOutputtopicname(), stream.getInstances());
+ RuleFileTransformer rule = new RuleFileTransformer(stream.getRulefiles(), this, stream.getInputtopicname(), stream.getOutputtopicname(), stream.getInstances());
if (rule.getRulefiledefinitions().size() > 0) {
services.put(rule.getInputtopicname(), rule);
rule.start();
- log.info("Thread for topic {} re-started", rule.getInputtopicname());
+ LoggingUtil.LOGGER.info("Thread for topic {} re-started", rule.getInputtopicname());
} else {
- log.info("Topic {} has no active rules", rule.getInputtopicname());
+ LoggingUtil.LOGGER.info("Topic {} has no active rules", rule.getInputtopicname());
}
}
}
@@ -258,7 +257,7 @@ public void stopService() throws IOException {
/*
* Signal all to stop
*/
- for (RuleFileKStream service : services.values()) {
+ for (RuleFileTransformer service : services.values()) {
service.interrupt();
}
/*
@@ -266,13 +265,13 @@ public void stopService() throws IOException {
*/
long until = System.currentTimeMillis() + 60000L;
while (System.currentTimeMillis() < until && services.size() > 0) {
- Iterator<Entry<String, RuleFileKStream>> iter = services.entrySet().iterator();
+ Iterator<Entry<String, RuleFileTransformer>> iter = services.entrySet().iterator();
while (iter.hasNext()) {
- Entry<String, RuleFileKStream> service = iter.next();
+ Entry<String, RuleFileTransformer> service = iter.next();
try {
if (service.getValue().join(Duration.ofSeconds(2))) {
iter.remove();
- log.info("Thread for topic {} stopped", service.getKey());
+ LoggingUtil.LOGGER.info("Thread for topic {} stopped", service.getKey());
}
} catch (InterruptedException e) {
// NOOP as we have to close all resources, no matter what
@@ -282,7 +281,7 @@ public void stopService() throws IOException {
}
public void stopService(String topicname) throws IOException, InterruptedException {
- RuleFileKStream service = services.get(topicname);
+ RuleFileTransformer service = services.get(topicname);
if (service != null) {
/*
* Signal all to stop
@@ -293,7 +292,7 @@ public void stopService(String topicname) throws IOException, InterruptedExcepti
*/
if (service.join(Duration.ofSeconds(60))) {
services.remove(topicname);
- log.info("Thread for topic {} stopped", topicname);
+ LoggingUtil.LOGGER.info("Thread for topic {} stopped", topicname);
}
}
}
@@ -306,15 +305,26 @@ private void configure() throws IOException, RestClientException {
this.globalexception = null;
File propertiesfile = new File(settingsdir.toFile(), "kafka.properties");
if (!propertiesfile.isFile()) {
- log.error("The mandatory kafka.properties file does not exist at <{}>", propertiesfile.toString());
+ LoggingUtil.LOGGER.error("The mandatory kafka.properties file does not exist at <{}>", propertiesfile.toString());
throw new IOException("The mandatory kafka.properties file does not exist at <" + propertiesfile.toString() + ">");
} else {
- log.info("Found the kafka.properties file as <{}>", propertiesfile.toString());
+ LoggingUtil.LOGGER.info("Found the kafka.properties file at <{}>", propertiesfile.toString());
}
Properties kafkaproperties = new Properties();
try (InputStream is = new FileInputStream(propertiesfile)) {
kafkaproperties.load(is);
}
+ String loglevel = kafkaproperties.getProperty("ruleservice.loglevel");
+ if (loglevel != null) {
+ Level level = Level.getLevel(loglevel);
+ if (level != null) {
+ LoggingUtil.LOGGER.info("Setting the log level for the rules service to <{}>", level);
+ Configurator.setLevel(LoggingUtil.LOGGER.getName(), level);
+ } else {
+ LoggingUtil.LOGGER.warn("The log level <{}> is not valid, using the default", loglevel);
+ }
+ }
+
this.properties = Files.readString(settingsdir.resolve("kafka.properties"), StandardCharsets.UTF_8);
String schemaurls = kafkaproperties.getProperty("schema.registry.url");
if (schemaurls == null) {
@@ -325,7 +335,7 @@ private void configure() throws IOException, RestClientException {
for (String url : urls) {
schemaRegistryUrls.add(url.trim());
}
- propertiesmap = kafkaproperties.entrySet().stream().collect(
+ propertiesmap = kafkaproperties.entrySet().stream().filter(e -> e.getKey().toString().startsWith("ruleservice.") == false).collect(
Collectors.toMap(
e -> e.getKey().toString(),
e -> e.getValue().toString()
@@ -385,7 +395,7 @@ public ServiceSettings getConfig(boolean isadmin) {
settings.setSchemaregsubjects(schemas.size());
settings.setSchemaregconnected(true);
} catch (IOException | RestClientException e) {
- log.error("getConfig() ran into an error", e);
+ LoggingUtil.LOGGER.error("getConfig() ran into an error", e);
globalexception = e;
}
}
@@ -396,7 +406,7 @@ public ServiceSettings getConfig(boolean isadmin) {
result.nodes().get(2, TimeUnit.SECONDS);
settings.setKafkaconnected(true);
} catch (Exception e) {
- log.error("getConfig() ran into an error", e);
+ LoggingUtil.LOGGER.error("getConfig() ran into an error", e);
globalexception = e;
}
}
@@ -475,13 +485,13 @@ public List<TopicRule> saveTopicRules(List<TopicRule> input) {
topicrule.setModified(null);
topicrule.setInfo("saved");
} catch (IOException e) {
- log.error("Failed to save the file for topic <{}>", topicrule.getInputtopicname(), e);
+ LoggingUtil.LOGGER.error("Failed to save the file for topic <{}>", topicrule.getInputtopicname(), e);
topicrule.setInfo("Failed to save the topic file (" + e.getMessage() + ")");
}
try {
startService(topicrule.getInputtopicname());
} catch (IOException | InterruptedException e) {
- log.error("Failed to start the service for the topic <{}>", topicrule.getInputtopicname(), e);
+ LoggingUtil.LOGGER.error("Failed to start the service for the topic <{}>", topicrule.getInputtopicname(), e);
topicrule.setInfo("Failed to start the service for the topic <" + topicrule.getInputtopicname() + ">: " + e.getMessage());
}
}
@@ -507,24 +517,24 @@ public void saveConfig(ServiceSettings input) throws IOException {
}
private void close() {
- log.info("Closing all resources");
+ LoggingUtil.LOGGER.info("Closing all resources");
try {
stopService();
} catch (IOException e) {
- log.error("Stopping the services failed - ignored", e);
+ LoggingUtil.LOGGER.error("Stopping the services failed - ignored", e);
}
if (admin != null) {
try {
admin.close(Duration.ofSeconds(10));
} catch (Exception e) {
- log.error("Closing the Kafka admin client within 10 seconds failed - ignored", e);
+ LoggingUtil.LOGGER.error("Closing the Kafka admin client within 10 seconds failed - ignored", e);
}
}
if (schemaclient != null) {
try {
schemaclient.close();
} catch (Exception e) {
- log.error("Closing the schema registry client failed - ignored", e);
+ LoggingUtil.LOGGER.error("Closing the schema registry client failed - ignored", e);
}
}
}
@@ -572,7 +582,7 @@ public CachedSchemaRegistryClient getSchemaclient() {
return schemaclient;
}
- public Map<String, RuleFileKStream> getServices() {
+ public Map<String, RuleFileTransformer> getServices() {
return services;
}
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/config/ServiceSettings.java b/src/main/java/io/rtdi/bigdata/rulesservice/config/ServiceSettings.java
index 31a4b6d..b3ffce4 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/config/ServiceSettings.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/config/ServiceSettings.java
@@ -11,7 +11,7 @@ public class ServiceSettings {
private boolean servicerunning;
private String errormessage;
private boolean adminuser;
- private static final String propertieshelp = "The properties must contain info to connect to the schema registry (read/write) and create a Kafka KStream";
+ private static final String propertieshelp = "The properties must contain info to connect to the schema registry and topics (read/write)";
public String getProperties() {
if (properties != null) {
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/config/ServiceStatus.java b/src/main/java/io/rtdi/bigdata/rulesservice/config/ServiceStatus.java
index 0043be0..62b9296 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/config/ServiceStatus.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/config/ServiceStatus.java
@@ -1,11 +1,13 @@
package io.rtdi.bigdata.rulesservice.config;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import io.rtdi.bigdata.rulesservice.RuleFileKStream;
+import io.rtdi.bigdata.rulesservice.RuleFileTransformer;
import io.rtdi.bigdata.rulesservice.RulesService;
public class ServiceStatus {
@@ -14,18 +16,30 @@ public class ServiceStatus {
private ServiceSettings config;
public ServiceStatus(RulesService service) {
- Map<String, RuleFileKStream> services = service.getServices();
+ Map rulefiles = null;
+ try {
+ rulefiles = service.getTopicRuleFiles();
+ } catch (IOException e) {
+ rulefiles = new HashMap<>();
+ }
+
+ Map<String, RuleFileTransformer> services = service.getServices();
config = service.getConfig(false);
if (services != null) {
topicstatus = new ArrayList<>();
- for (Entry<String, RuleFileKStream> entity : services.entrySet()) {
+ for (Entry<String, RuleFileTransformer> entity : services.entrySet()) {
ServiceStatusTopic s = new ServiceStatusTopic(entity.getValue());
if (s.getStatus() == Boolean.TRUE) {
activetopics++;
}
topicstatus.add(s);
+ rulefiles.remove(entity.getKey());
}
}
+ for (String topic : rulefiles.keySet()) {
+ ServiceStatusTopic s = new ServiceStatusTopic(topic);
+ topicstatus.add(s);
+ }
}
public List<ServiceStatusTopic> getTopicstatus() {
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/config/ServiceStatusTopic.java b/src/main/java/io/rtdi/bigdata/rulesservice/config/ServiceStatusTopic.java
index b401a16..7f87543 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/config/ServiceStatusTopic.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/config/ServiceStatusTopic.java
@@ -3,7 +3,7 @@
import java.util.ArrayList;
import java.util.List;
-import io.rtdi.bigdata.rulesservice.RuleFileKStream;
+import io.rtdi.bigdata.rulesservice.RuleFileTransformer;
public class ServiceStatusTopic {
private Boolean status;
@@ -14,15 +14,23 @@ public class ServiceStatusTopic {
private String topicname;
private String name;
private Integer rulecount = 0;
+ private Integer queuedrecords = 0;
+ private Float rowspersecond = null;
+ private int numbercores = 0;
+ private int queuecapacity;
- public ServiceStatusTopic(RuleFileKStream stream) {
+ public ServiceStatusTopic(RuleFileTransformer stream) {
topicname = stream.getInputtopicname();
name = stream.getName();
if (stream.isAlive()) {
status = Boolean.TRUE;
rowsprocessed = stream.getRowsprocessed();
lastprocessedtimestamp = stream.getLastprocessedtimestamp();
+ queuedrecords = stream.getQueuedRecordCount();
+ rowspersecond = stream.getRowspersecond();
avgtime = stream.getAvgProcessingtime();
+ numbercores = stream.getNumbercores();
+ queuecapacity = stream.getQueuecapacity();
activerules = new ArrayList<>();
for (RuleFileDefinition r : stream.getRulefiledefinitions().values()) {
activerules.add(r.getName());
@@ -33,6 +41,11 @@ public ServiceStatusTopic(RuleFileKStream stream) {
}
}
+ public ServiceStatusTopic(String topicname) {
+ status = Boolean.FALSE;
+ this.topicname = topicname;
+ }
+
public Boolean getStatus() {
return status;
}
@@ -64,4 +77,20 @@ public String getName() {
public Integer getRulecount() {
return rulecount;
}
+
+ public Integer getQueuedrecords() {
+ return queuedrecords;
+ }
+
+ public Float getRowspersecond() {
+ return rowspersecond;
+ }
+
+ public int getQueuecapacity() {
+ return queuecapacity;
+ }
+
+ public int getNumbercores() {
+ return numbercores;
+ }
}
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/jexl/AvroRuleUtils.java b/src/main/java/io/rtdi/bigdata/rulesservice/jexl/AvroRuleUtils.java
index 39379aa..b80ff80 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/jexl/AvroRuleUtils.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/jexl/AvroRuleUtils.java
@@ -3,11 +3,9 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilderException;
-import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceCalculate.java b/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceCalculate.java
index 5db5edf..7e985d0 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceCalculate.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceCalculate.java
@@ -1,8 +1,5 @@
package io.rtdi.bigdata.rulesservice.rest;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import io.rtdi.bigdata.rulesservice.LoggingUtil;
import io.rtdi.bigdata.rulesservice.RulesService;
import io.rtdi.bigdata.rulesservice.config.RuleFileDefinition;
@@ -30,8 +27,6 @@
public class RestServiceCalculate {
protected static final int SAMPLE_MAX_ROWS = 100;
- protected final Logger log = LogManager.getLogger(this.getClass().getName());
-
@Context
private Configuration configuration;
@@ -79,12 +74,12 @@ public Response processStep(
Integer stepindex,
@RequestBody RuleFileDefinition input) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
RuleStep ret = service.applySampleData(input, stepindex);
- return LoggingUtil.requestEnd(log, request, ret);
+ return LoggingUtil.requestEnd(request, ret);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -119,12 +114,12 @@ public Response processStep(
public Response processAll(
@RequestBody RuleFileDefinition input) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
RuleFileDefinition ret = service.applySampleFile(input);
- return LoggingUtil.requestEnd(log, request, ret);
+ return LoggingUtil.requestEnd(request, ret);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceConfig.java b/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceConfig.java
index fd03635..902776c 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceConfig.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceConfig.java
@@ -1,8 +1,5 @@
package io.rtdi.bigdata.rulesservice.rest;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import io.rtdi.bigdata.rulesservice.LoggingUtil;
import io.rtdi.bigdata.rulesservice.RulesService;
import io.rtdi.bigdata.rulesservice.config.ServiceSettings;
@@ -30,8 +27,6 @@
public class RestServiceConfig {
protected static final int SAMPLE_MAX_ROWS = 100;
- protected final Logger log = LogManager.getLogger(this.getClass().getName());
-
@Context
private Configuration configuration;
@@ -70,13 +65,13 @@ public class RestServiceConfig {
})
public Response getConfig() {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
boolean admin = request.isUserInRole(ServletSecurityConstants.ROLE_ADMIN);
ServiceSettings ret = service.getConfig(admin);
- return LoggingUtil.requestEnd(log, request, ret);
+ return LoggingUtil.requestEnd(request, ret);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -110,12 +105,12 @@ public Response getConfig() {
})
public Response postConfig(@RequestBody ServiceSettings input) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
service.saveConfig(input);
- return LoggingUtil.requestEnd(log, request, new SuccessResponse("Saved"));
+ return LoggingUtil.requestEnd(request, new SuccessResponse("Saved"));
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -149,12 +144,12 @@ public Response postConfig(@RequestBody ServiceSettings input) {
})
public Response startService() {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
service.startService();
- return LoggingUtil.requestEnd(log, request, new SuccessResponse("Started"));
+ return LoggingUtil.requestEnd(request, new SuccessResponse("Started"));
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -188,12 +183,12 @@ public Response startService() {
})
public Response stopService() {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
service.stopService();
- return LoggingUtil.requestEnd(log, request, new SuccessResponse("Started"));
+ return LoggingUtil.requestEnd(request, new SuccessResponse("Stopped"));
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -225,12 +220,12 @@ public Response stopService() {
})
public Response getServiceStatus() {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
ServiceStatus ret = service.getServiceStatus();
- return LoggingUtil.requestEnd(log, request, ret);
+ return LoggingUtil.requestEnd(request, ret);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceRules.java b/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceRules.java
index 7a4cfa2..4429122 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceRules.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceRules.java
@@ -2,9 +2,6 @@
import java.util.List;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import io.rtdi.bigdata.rulesservice.LoggingUtil;
import io.rtdi.bigdata.rulesservice.RulesService;
import io.rtdi.bigdata.rulesservice.config.RuleFileDefinition;
@@ -35,8 +32,6 @@
public class RestServiceRules {
protected static final int SAMPLE_MAX_ROWS = 100;
- protected final Logger log = LogManager.getLogger(this.getClass().getName());
-
@Context
private Configuration configuration;
@@ -82,12 +77,12 @@ public Response getRuleFiles(
)
String subjectname) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
List rulegroups = service.getRuleFiles(subjectname);
- return LoggingUtil.requestEnd(log, request, rulegroups);
+ return LoggingUtil.requestEnd(request, rulegroups);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -120,12 +115,12 @@ public Response getRuleFiles(
})
public Response getAllRuleFiles() {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
List rulegroups = service.getAllRuleFiles();
- return LoggingUtil.requestEnd(log, request, rulegroups);
+ return LoggingUtil.requestEnd(request, rulegroups);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -170,13 +165,13 @@ public Response getRuleFile(
)
String subjectname) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
java.nio.file.Path name = java.nio.file.Path.of(path);
RuleFileDefinition rg = RuleFileDefinition.load(service.getRuleFileRootDir(), subjectname, name, false);
- return LoggingUtil.requestEnd(log, request, rg);
+ return LoggingUtil.requestEnd(request, rg);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -223,15 +218,15 @@ public Response postRuleFile(
String subjectname,
@RequestBody RuleFileDefinition input) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
java.nio.file.Path name = java.nio.file.Path.of(path); // The name of the file to be saved, which is the old name
input.setActive(false);
input.setInputsubjectname(subjectname);
input.save(service.getRuleFileRootDir(), name);
- return LoggingUtil.requestEnd(log, request, SuccessResponse.SUCCESS);
+ return LoggingUtil.requestEnd(request, SuccessResponse.SUCCESS);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -277,12 +272,12 @@ public Response activateRuleFile(
)
String subjectname) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
RuleFileDefinition.copyToActivate(service.getRuleFileRootDir(), subjectname, path);
- return LoggingUtil.requestEnd(log, request, SuccessResponse.SUCCESS);
+ return LoggingUtil.requestEnd(request, SuccessResponse.SUCCESS);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -321,12 +316,12 @@ public Response createEmptyRuleFile(
)
String subjectname) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
RuleFileDefinition rg = RuleFileDefinition.createEmptyRuleFileDefinition(subjectname, service);
- return LoggingUtil.requestEnd(log, request, rg);
+ return LoggingUtil.requestEnd(request, rg);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceSample.java b/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceSample.java
index a1ba149..e4aebae 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceSample.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceSample.java
@@ -19,8 +19,6 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import org.glassfish.jersey.server.ChunkedOutput;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -55,8 +53,6 @@
public class RestServiceSample {
protected static final int SAMPLE_MAX_ROWS = 100;
- protected final Logger log = LogManager.getLogger(this.getClass().getName());
-
@Context
private Configuration configuration;
@@ -98,7 +94,7 @@ public class RestServiceSample {
public Response sample(@RequestBody List<String> topics) {
ObjectMapper om = new ObjectMapper();
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
ChunkedOutput<String> output = new ChunkedOutput<>(String.class);
if (topics == null || topics.size() == 0) {
return ErrorResponse.createErrorResponse(new IOException("No topics to read from have been provided"), Status.BAD_REQUEST);
@@ -165,9 +161,9 @@ public void run() {
}
}
}
- LoggingUtil.logRequestEnd(log, request);
+ LoggingUtil.logRequestEnd(request);
} catch (IOException | RestClientException e) {
- LoggingUtil.logRequestEndInputError(log, request, e);
+ LoggingUtil.logRequestEndInputError(request, e);
ErrorResponse response = new ErrorResponse(e);
try {
output.write(om.writeValueAsString(response));
@@ -185,7 +181,7 @@ public void run() {
return Response.ok(output).build();
}
} catch (Exception e) {
- LoggingUtil.logRequestEndInputError(log, request, e);
+ LoggingUtil.logRequestEndInputError(request, e);
return ErrorResponse.createErrorResponse(e);
}
}
@@ -227,12 +223,12 @@ public Response savesample(
String subject,
@RequestBody SampleData data) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
String filename = data.save(service.getRuleFileRootDir());
- return LoggingUtil.requestEnd(log, request, new SampleFileName(filename));
+ return LoggingUtil.requestEnd(request, new SampleFileName(filename));
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -272,12 +268,12 @@ public Response getSampleFiles(
String subject
) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
List samplefiles = SampleData.getFiles(service.getRuleFileRootDir(), subject);
- return LoggingUtil.requestEnd(log, request, samplefiles);
+ return LoggingUtil.requestEnd(request, samplefiles);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceSubjects.java b/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceSubjects.java
index a0c0298..7d9e424 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceSubjects.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceSubjects.java
@@ -4,9 +4,6 @@
import java.util.Collection;
import java.util.List;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.rtdi.bigdata.rulesservice.LoggingUtil;
import io.rtdi.bigdata.rulesservice.RulesService;
@@ -38,8 +35,6 @@
public class RestServiceSubjects {
protected static final int SAMPLE_MAX_ROWS = 100;
- protected final Logger log = LogManager.getLogger(this.getClass().getName());
-
@Context
private Configuration configuration;
@@ -78,16 +73,16 @@ public class RestServiceSubjects {
})
public Response getSubjects() {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
Collection<String> subjects = service.getSubjects();
List<SubjectName> ret = new ArrayList<>();
for (String s : subjects) {
ret.add(new SubjectName(s));
}
- return LoggingUtil.requestEnd(log, request, ret);
+ return LoggingUtil.requestEnd(request, ret);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -128,13 +123,13 @@ public Response updateRuleFile(
String subjectname,
@RequestBody RuleFileDefinition input) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
RuleFileDefinition empty = RuleFileDefinition.createEmptyRuleFileDefinition(subjectname, service);
input.update(empty);
- return LoggingUtil.requestEnd(log, request, input);
+ return LoggingUtil.requestEnd(request, input);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -174,19 +169,19 @@ public Response getSchemaDefiniton(
)
String subjectname) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
org.apache.avro.Schema schema = service.getLatestSchema(subjectname);
- return LoggingUtil.requestEnd(log, request, schema);
+ return LoggingUtil.requestEnd(request, schema);
} catch (RestClientException e) {
if (e.getStatus() == 404) {
// invalid schema
- return LoggingUtil.requestEndInputError(log, request, e, Status.NOT_FOUND);
+ return LoggingUtil.requestEndInputError(request, e, Status.NOT_FOUND);
} else {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceTopics.java b/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceTopics.java
index 3df6516..7c3983b 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceTopics.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/rest/RestServiceTopics.java
@@ -4,14 +4,12 @@
import java.util.Collection;
import java.util.List;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import io.rtdi.bigdata.rulesservice.LoggingUtil;
import io.rtdi.bigdata.rulesservice.RulesService;
import io.rtdi.bigdata.rulesservice.config.TopicName;
import io.rtdi.bigdata.rulesservice.config.TopicRule;
import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -24,6 +22,7 @@
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Configuration;
import jakarta.ws.rs.core.Context;
@@ -34,8 +33,6 @@
public class RestServiceTopics {
protected static final int SAMPLE_MAX_ROWS = 100;
- protected final Logger log = LogManager.getLogger(this.getClass().getName());
-
@Context
private Configuration configuration;
@@ -75,12 +72,12 @@ public class RestServiceTopics {
})
public Response getTopicRules() {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
Collection<TopicRule> topicrules = service.getTopicsAndRules();
- return LoggingUtil.requestEnd(log, request, topicrules);
+ return LoggingUtil.requestEnd(request, topicrules);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -114,12 +111,12 @@ public Response getTopicRules() {
})
public Response postRules(@RequestBody List<TopicRule> input) {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
List<TopicRule> ret = service.saveTopicRules(input);
- return LoggingUtil.requestEnd(log, request, ret);
+ return LoggingUtil.requestEnd(request, ret);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
@@ -152,17 +149,180 @@ public Response postRules(@RequestBody List<TopicRule> input) {
})
public Response getTopics() {
try {
- LoggingUtil.logRequestBegin(log, request);
+ LoggingUtil.logRequestBegin(request);
RulesService service = RulesService.getRulesService(servletContext);
Collection<String> topics = service.getTopics();
List<TopicName> topicnames = new ArrayList<>(topics.size());
for (String t : topics) {
topicnames.add(new TopicName(t));
}
- return LoggingUtil.requestEnd(log, request, topicnames);
+ return LoggingUtil.requestEnd(request, topicnames);
} catch (Exception e) {
- return LoggingUtil.requestEndTechnicalError(log, request, e);
+ return LoggingUtil.requestEndTechnicalError(request, e);
}
}
+ @POST
+ @Path("/topics/start")
+ @Produces(MediaType.APPLICATION_JSON)
+ @RolesAllowed(ServletSecurityConstants.ROLE_EDIT)
+ @Operation(
+ summary = "Start all services",
+ description = "All services are (re)started",
+ responses = {
+ @ApiResponse(
+ responseCode = "200",
+ description = "Success message",
+ content = {
+ @Content(
+ schema = @Schema(implementation = SuccessResponse.class)
+ )
+ }
+ ),
+ @ApiResponse(
+ responseCode = "500",
+ description = "Any exception thrown",
+ content = {
+ @Content(
+ schema = @Schema(implementation = ErrorResponse.class)
+ )
+ }
+ )
+ })
+ public Response startServices() {
+ try {
+ LoggingUtil.logRequestBegin(request);
+ RulesService service = RulesService.getRulesService(servletContext);
+ service.startService();
+ return LoggingUtil.requestEnd(request, SuccessResponse.SUCCESS);
+ } catch (Exception e) {
+ return LoggingUtil.requestEndTechnicalError(request, e);
+ }
+ }
+
+ @POST
+ @Path("/topics/stop")
+ @Produces(MediaType.APPLICATION_JSON)
+ @RolesAllowed(ServletSecurityConstants.ROLE_EDIT)
+ @Operation(
+ summary = "Stop all services",
+ description = "All running services are stopped",
+ responses = {
+ @ApiResponse(
+ responseCode = "200",
+ description = "Success message",
+ content = {
+ @Content(
+ schema = @Schema(implementation = SuccessResponse.class)
+ )
+ }
+ ),
+ @ApiResponse(
+ responseCode = "500",
+ description = "Any exception thrown",
+ content = {
+ @Content(
+ schema = @Schema(implementation = ErrorResponse.class)
+ )
+ }
+ )
+ })
+ public Response stopServices() {
+ try {
+ LoggingUtil.logRequestBegin(request);
+ RulesService service = RulesService.getRulesService(servletContext);
+ service.stopService();
+ return LoggingUtil.requestEnd(request, SuccessResponse.SUCCESS);
+ } catch (Exception e) {
+ return LoggingUtil.requestEndTechnicalError(request, e);
+ }
+ }
+
+ @POST
+ @Path("/topics/start/{inputtopic}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @RolesAllowed(ServletSecurityConstants.ROLE_EDIT)
+ @Operation(
+ summary = "Start a named service",
+ description = "(re)start a service for a specific topic",
+ responses = {
+ @ApiResponse(
+ responseCode = "200",
+ description = "Success message",
+ content = {
+ @Content(
+ schema = @Schema(implementation = SuccessResponse.class)
+ )
+ }
+ ),
+ @ApiResponse(
+ responseCode = "500",
+ description = "Any exception thrown",
+ content = {
+ @Content(
+ schema = @Schema(implementation = ErrorResponse.class)
+ )
+ }
+ )
+ })
+ public Response startService(
+ @PathParam("inputtopic")
+ @Parameter(
+ description = "name of input topic",
+ example = "order_data"
+ )
+ String inputtopic) {
+ try {
+ LoggingUtil.logRequestBegin(request);
+ RulesService service = RulesService.getRulesService(servletContext);
+ service.startService(inputtopic);
+ return LoggingUtil.requestEnd(request, SuccessResponse.SUCCESS);
+ } catch (Exception e) {
+ return LoggingUtil.requestEndTechnicalError(request, e);
+ }
+ }
+
+ @POST
+ @Path("/topics/stop/{inputtopic}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @RolesAllowed(ServletSecurityConstants.ROLE_EDIT)
+ @Operation(
+ summary = "Stop a named service",
+ description = "Stop a service for a specific topic",
+ responses = {
+ @ApiResponse(
+ responseCode = "200",
+ description = "Success message",
+ content = {
+ @Content(
+ schema = @Schema(implementation = SuccessResponse.class)
+ )
+ }
+ ),
+ @ApiResponse(
+ responseCode = "500",
+ description = "Any exception thrown",
+ content = {
+ @Content(
+ schema = @Schema(implementation = ErrorResponse.class)
+ )
+ }
+ )
+ })
+ public Response stopService(
+ @PathParam("inputtopic")
+ @Parameter(
+ description = "name of input topic",
+ example = "order_data"
+ )
+ String inputtopic) {
+ try {
+ LoggingUtil.logRequestBegin(request);
+ RulesService service = RulesService.getRulesService(servletContext);
+ service.stopService(inputtopic);
+ return LoggingUtil.requestEnd(request, SuccessResponse.SUCCESS);
+ } catch (Exception e) {
+ return LoggingUtil.requestEndTechnicalError(request, e);
+ }
+ }
}
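
For reference, the four endpoints added above ((re)start/stop all services, (re)start/stop a single topic's service) could be driven from a small client like the following sketch. The base URL and any authentication are deployment-specific assumptions, and `RulesServiceClient` is a hypothetical helper, not part of this PR.

```java
import java.net.URI;

// Hypothetical client-side helper that builds the request URIs for the
// endpoints added in this PR. All four are POST requests; the per-topic
// variants take the input topic name as the final path segment.
public class RulesServiceClient {
    private final String base; // e.g. "https://host:8443/rest" -- deployment-specific

    public RulesServiceClient(String base) {
        this.base = base;
    }

    // POST /topics/start -- (re)start all services
    public URI startAll() {
        return URI.create(base + "/topics/start");
    }

    // POST /topics/stop -- stop all running services
    public URI stopAll() {
        return URI.create(base + "/topics/stop");
    }

    // POST /topics/start/{inputtopic} -- (re)start the service for one topic
    public URI startTopic(String inputtopic) {
        return URI.create(base + "/topics/start/" + inputtopic);
    }

    // POST /topics/stop/{inputtopic} -- stop the service for one topic
    public URI stopTopic(String inputtopic) {
        return URI.create(base + "/topics/stop/" + inputtopic);
    }

    public static void main(String[] args) {
        RulesServiceClient c = new RulesServiceClient("https://localhost:8443/rest");
        System.out.println(c.startTopic("order_data"));
    }
}
```

Note that all four endpoints require the `ROLE_EDIT` role, so the actual HTTP call would need credentials for a user holding that role.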
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/rules/Mapping.java b/src/main/java/io/rtdi/bigdata/rulesservice/rules/Mapping.java
index 0fc22f5..4140e06 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/rules/Mapping.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/rules/Mapping.java
@@ -56,7 +56,7 @@ public Object evaluate(AvroContainer context) throws IOException {
return expression.execute(context);
} catch (JexlException e) {
StringBuilder msg = createErrorDetails(e);
- throw new PropertiesException("Cannot evaluate the Expression" + msg.toString(), e, "Validate the syntax", expression.getSourceText());
+ throw new PropertiesException("Cannot evaluate the Expression " + msg.toString(), e, "Validate the syntax", expression.getSourceText());
}
}
diff --git a/src/main/java/io/rtdi/bigdata/rulesservice/rules/PrimitiveRule.java b/src/main/java/io/rtdi/bigdata/rulesservice/rules/PrimitiveRule.java
index 9046d20..a61852c 100644
--- a/src/main/java/io/rtdi/bigdata/rulesservice/rules/PrimitiveRule.java
+++ b/src/main/java/io/rtdi/bigdata/rulesservice/rules/PrimitiveRule.java
@@ -41,6 +41,8 @@ public PrimitiveRule() {
@Override
public RuleResult apply(Object value, AvroContainer container, boolean test) throws IOException {
try {
+ setConditionerror(null);
+ setSubstituteerror(null);
setSampleValue(value, test);
RuleResult result = calculateResult(container, test);
if (result != null) {
@@ -76,7 +78,6 @@ public RuleResult apply(Object value, AvroContainer container, boolean test) thr
}
if (test) {
setSampleresult(result);
- setConditionerror(null);
}
return result;
} catch (JexlException e) {
@@ -92,7 +93,17 @@ public RuleResult apply(Object value, AvroContainer container, boolean test) thr
private RuleResult calculateResult(AvroContainer valuerecord, boolean test) throws IOException {
if (condition != null) {
RuleResult r = null;
- Object o = condition.evaluate(valuerecord);
+ Object o = null;
+ if (test) {
+ try {
+ o = condition.evaluate(valuerecord);
+ } catch (PropertiesException e) {
+ setConditionerror(e.getMessage());
+ return RuleResult.FAIL;
+ }
+ } else {
+ o = condition.evaluate(valuerecord);
+ }
if (o != null && o instanceof Boolean) {
if (((Boolean) o).booleanValue()) {
r = RuleResult.PASS;
@@ -107,12 +118,12 @@ private RuleResult calculateResult(AvroContainer valuerecord, boolean test) thro
valuerecord.addChangedvalue(getFieldname(), substitutevalue);
if (test) {
setSampleoutput(Rule.valueToJavaObject(substitutevalue, getDataType()));
- setSubstituteerror(null);
}
} catch (IOException e) {
if (test) {
setSampleoutput(null);
setSubstituteerror(getResultString(substitute.getExpression(), e));
+ return RuleResult.FAIL;
} else {
throw e;
}
@@ -122,6 +133,7 @@ private RuleResult calculateResult(AvroContainer valuerecord, boolean test) thro
} else {
if (test) {
setConditionerror("This rule expression does not return true/false");
+ return RuleResult.FAIL;
} else {
throw new PropertiesException("This rule expression does not return true/false", null, "A condition must return true/false", condition.getExpression());
}
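
The PrimitiveRule change above boils down to one pattern: in test mode a failing condition is recorded via `setConditionerror()` and the rule returns `FAIL` so the whole test run can continue, while in production the exception still propagates to the caller. A minimal standalone sketch of that pattern (`ConditionSketch` and its members are illustrative, not part of the codebase):

```java
// Sketch of the test-vs-production error handling introduced in
// PrimitiveRule.calculateResult(): test mode captures the error on the
// rule and keeps going, production mode fails fast.
public class ConditionSketch {
    enum RuleResult { PASS, FAIL }

    interface Condition {
        Object evaluate() throws Exception;
    }

    String conditionerror; // mirrors setConditionerror(...)

    RuleResult apply(Condition condition, boolean test) throws Exception {
        conditionerror = null; // reset stale errors up front, as apply() now does
        Object o;
        if (test) {
            try {
                o = condition.evaluate();
            } catch (Exception e) {
                conditionerror = e.getMessage(); // surface the problem to the tester
                return RuleResult.FAIL;          // but do not abort the test run
            }
        } else {
            o = condition.evaluate();            // production: let the exception propagate
        }
        if (o instanceof Boolean && ((Boolean) o).booleanValue()) {
            return RuleResult.PASS;
        }
        return RuleResult.FAIL;
    }

    public static void main(String[] args) throws Exception {
        ConditionSketch s = new ConditionSketch();
        System.out.println(s.apply(() -> Boolean.TRUE, true));
    }
}
```

Resetting the error fields at the start of `apply()` (rather than clearing them only on success, as before) also guarantees a previous run's error message can never linger on a rule that now evaluates cleanly.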
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties
index 8401eae..5367e81 100644
--- a/src/main/resources/log4j2.properties
+++ b/src/main/resources/log4j2.properties
@@ -21,13 +21,13 @@ rootLogger.appenderRef.stdout.ref = STDOUT
rootLogger.appenderRef.file.ref = LOGFILE
logger.kafka.name = org.apache.kafka
-logger.kafka.level = INFO
+logger.kafka.level = WARN
logger.kafka-consumer.name = org.apache.kafka.clients.consumer
-logger.kafka-consumer.level = INFO
+logger.kafka-consumer.level = WARN
-logger.kafka-streams.name = org.apache.kafka.streams
-logger.kafka-streams.level = INFO
+logger.kafka-producer.name = org.apache.kafka.clients.producer
+logger.kafka-producer.level = WARN
logger.jexl.name = org.apache.commons.jexl3
logger.jexl.level = DEBUG