From b1b03c1a65e61fd861ff352f45274002c25e89b0 Mon Sep 17 00:00:00 2001 From: werner daehn Date: Wed, 7 May 2025 13:31:10 +0200 Subject: [PATCH] parallel processing --- LICENSE_COMMERCIAL | 19 +- README.md | 59 ++- WebContent/Controller.controller.js | 17 + WebContent/View.view.xml | 8 +- WebContent/ui5/Config.controller.js | 17 +- WebContent/ui5/Config.view.xml | 3 +- WebContent/ui5/Rule.controller.js | 27 +- WebContent/ui5/Rule.view.xml | 11 +- WebContent/ui5/Rules.controller.js | 4 + WebContent/ui5/Rules.view.xml | 2 + WebContent/ui5/Status.controller.js | 33 ++ WebContent/ui5/Status.view.xml | 18 +- WebContent/ui5/Topics.controller.js | 19 +- WebContent/ui5/Topics.view.xml | 1 + WebContent/ui5/lib/FocusInput.js | 6 +- docs/audit-schema.md | 2 +- pom.xml | 17 +- .../bigdata/rulesservice/LoggingUtil.java | 44 +- .../bigdata/rulesservice/RuleFileKStream.java | 245 ---------- .../rulesservice/RuleFileTransformer.java | 454 ++++++++++++++++++ .../bigdata/rulesservice/RulesService.java | 96 ++-- .../rulesservice/config/ServiceSettings.java | 2 +- .../rulesservice/config/ServiceStatus.java | 20 +- .../config/ServiceStatusTopic.java | 33 +- .../rulesservice/jexl/AvroRuleUtils.java | 2 - .../rest/RestServiceCalculate.java | 17 +- .../rulesservice/rest/RestServiceConfig.java | 35 +- .../rulesservice/rest/RestServiceRules.java | 41 +- .../rulesservice/rest/RestServiceSample.java | 24 +- .../rest/RestServiceSubjects.java | 27 +- .../rulesservice/rest/RestServiceTopics.java | 188 +++++++- .../bigdata/rulesservice/rules/Mapping.java | 2 +- .../rulesservice/rules/PrimitiveRule.java | 18 +- src/main/resources/log4j2.properties | 8 +- 34 files changed, 1026 insertions(+), 493 deletions(-) delete mode 100644 src/main/java/io/rtdi/bigdata/rulesservice/RuleFileKStream.java create mode 100644 src/main/java/io/rtdi/bigdata/rulesservice/RuleFileTransformer.java diff --git a/LICENSE_COMMERCIAL b/LICENSE_COMMERCIAL index 9347e9d..bba922f 100644 --- a/LICENSE_COMMERCIAL +++ b/LICENSE_COMMERCIAL @@ -1,5 +1,5 @@ COMMERCIAL LICENSE - 21 June 2020 + 16 April 2025 Preamble @@ -20,10 +20,10 @@ program (sum of all instances) per month. Programs are billed according to the following table - <100’000messages/month 0EUR/month - <1’000’000messages/month 50EUR/month -<10’000’000messages/month 100EUR/month ->10’000’000messages/month 200EUR/month + <100’000messages/month 0EUR/month + <1’000’000messages/month 500EUR/month +<10’000’000messages/month 1000EUR/month +>10’000’000messages/month 2000EUR/month (plus taxes if applicable) @@ -38,10 +38,11 @@ of rows copied is 50'000rows per month. fee is 0EUR according to above table. No contract needs to be signed, no invoices will be created. -Example 3: The program is started 20 times to process different Kafka -topics and partitions at speed, processing 5m records per month in -total. --> The program is used under this Commercial License and -100EUR/month are to be invoiced. +Example 3: The program is started in production 20 times to process +different Kafka topics and partitions at speed, processing 5m records +per month in total. +--> The program is used under this Commercial License and +1000EUR/month are to be invoiced. Once an order is placed for a defined volume, rtdi.io GmbH (Austria, Europe), as the copyright owner, will send invoices on a monthly basis diff --git a/README.md b/README.md index 3164f21..650c4ad 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Docker image here: [dockerhub](https://hub.docker.com/r/rtdi/rulesservice) ## Design Thinking goal * As a business user I would like to validate the incoming data and cleanse it in realtime -* Consumers have the choice to read the raw or the cleansed data +* Hence consumers have the choice to read the raw or the cleansed topic * Operational dashboards using the rule results provide information about the data quality * Different types of rules should be supported, validation rules, cleansing rules, data augmentation, standardization rules,... @@ -30,7 +30,7 @@ Docker image here: [dockerhub](https://hub.docker.com/r/rtdi/rulesservice) ## Requirements * Payload (value) in Avro Format -* Apache Kafka connection with the permissions to run as a KStream +* Apache Kafka connection with the permissions to read input topic and produce the output topic * Schema Registry connection to read (and write) schema definitions @@ -45,7 +45,7 @@ Then start the image via docker run. For a quick test this command is sufficient docker run -d -p 80:8080 --rm --name rulesservice rtdi/rulesservice -to expose a webserver at port 80 on the host running the container. Make sure to open the web page via the http prefix, as https needs more configuration. +to expose a webserver at port 80 on the host running the container. Make sure to open the web page via the http prefix, as https needs additional configuration. For example [http://localhost:80/](http://localhost:80/) might do the trick if the container is hosted on the same computer. The default login for this startup method is: **rtdi / rtdi!io** @@ -61,6 +61,8 @@ The better start command is to mount two host directories into the container, th For more details, especially https and security related, see the [ConnectorRootApp](https://github.com/rtdi/ConnectorRootApp) project, this application is based on. +Note: + if the `settings` or `definitions` directories should point to somewhere else, the `context.xml` of the tomcat webserver can get additional environments in the `` root node: ``` @@ -71,7 +73,7 @@ if the `settings` or `definitions` directories should point to somewhere else, t ### Step 1: Connect to Kafka -The first step is to connect the application to a Kafka server and the schema registry. In the settings screen the normal Kafka properties file data can be pasted and saved. By default the file location is `/apps/rulesservice/settings/kafka.properties` from the container's point of view. +The first step is to connect the application to a Kafka server and the schema registry. In the settings screen a regular Kafka properties file can be pasted and saved. By default the file location is `/apps/rulesservice/settings/kafka.properties` from the container's point of view. @@ -91,11 +93,11 @@ sasl.mechanism=PLAIN ### Step 2: Define Rules -A rule file applies logic to messages of a given schema/subject. It consumes the input message, applies the rules and creates an output message with a derived schema - a schema that contains the rule results as additional field. What the input and output topic is, will be configured in the next step. +A rule file applies logic to messages of a given schema/subject. It consumes the input message, applies the rules and creates an output message. What the input and output topic is, will be configured in the next step. Rule files are saved by default in the directory `/apps/rulesservice/definitions//inactive>/....` and the file name itself can contain sub-directories. -To simplify entering rules, sample values can be entered and the result be recalculated. A quick way to provide sample values is by reading topics and creating sample files with the content - see below. These files, with messages of the same subject name, can be selected and hence used as input. +To simplify entering rules, sample values can be entered and the result be recalculated. A quick way to provide sample values is by reading a topic and creating sample files with the content - see below. These files, with messages of the same subject name, can be selected and hence used as sample input. Once a rule file is complete, it must be copied from the `inactive` to the `active` directory. The button `Activate` does that. The reason for this two staged approach is to allow users saving intermediate definitions without impacting the currently running service. @@ -105,7 +107,6 @@ Once a rule file is complete, it must be copied from the `inactive` to the `acti ### Step 3: Topics An input topic can contain messages of different subjects, hence the dialog asks what input topic should cater which rule files (remember, a rule file work on a specific subject) and what the output topic should be. -Scaling is achieved by increasing the number of KStream instances used for this topic or by spinning up more containers with the same settings. The screen also allows to copy the rule files being used into the active folder to simplify activating each from the rule file dialog. @@ -113,14 +114,8 @@ The screen also allows to copy the rule files being used into the active folder ### Result -If the input schema has an `_audit` field, it is assumed the schema contains the structure for the rule results already. This would be a preferred case, because input schema = output schema. -In all other cases the input schema's latest version is read from the schema registry and the additional `_audit` structure is being created. This will be the output schema. - -The reason for using the latest is because of schema evolution scenarios. It might happen that schema id 2 has an additional field `NAME` compared to schema id 1. So the subject evolved from schema 1 to 2. The KStream does receive a message with schema 2 first, adds the `_audit` field and it is saved in the schema registry. The next message has an input schema 1 and if that would get registered as output schema next, it would fail due to the missing `NAME` field. Hence both must be outputted with the latest schema version always. This also explains why adding the `_audit` on the original input schema already is preferred. -The overall rule result is stored (did it pass all tests?), a list of all rules executed and their individual results. -Querying this data allows detailed reporting which records were processed by what rule and the results. - -The exact Avro schema field definition can be found [here](docs/audit-schema.md) +If the input schema has an `_audit` field, it is assumed the schema contains the structure for the rule results already. This is the preferred case, because input schema = output schema. +In all other cases the input schema's latest version is read from the schema registry and the additional `_audit` structure is being added. This will be the output schema. But this should be considered a fallback option only. If possible the `_audit` field should be added to the input schema already. The exact Avro schema field definition can be found [here](docs/audit-schema.md) @@ -143,30 +138,37 @@ If no file name is specified, the name will be `partition__offset_ 10`. -Formulas can also access other fields as well, e.g. for the field `lastname` the condition is `lastname != null || firstname != null` meaning, either lastname or firstname must be set. +Formulas can also access other fields as well, e.g. for the field `lastname` the condition is `lastname != null || firstname != null` meaning, either `lastname` or `firstname` must be set. -To access fields of a deeper level, e.g. when lastname is set, the first record in the addresses array must have addresstype HOME, many cases must be considered. What if the addresses array is empty? What if the addresses array is an empty array? What if the addresstype field is null for some records? This would lead to a very long condition but there are shortcuts: `lastname != null && 'HOME' == addresses?[0]?addresstype`. The `?` tells that it is okay if the field is null and it should return null then. +To access fields of a deeper level, e.g. when `lastname` is set, the first record in the `addresses` array must have `addresstype = HOME`, dot and array operators are used. +The formula might look like: `lastname != null && 'HOME' == addresses[0].addresstype` -Formulas at a deeper level, e.g. a formula for `addresstype`, can access fields from the higher level via `parent`. Example: `('HOME' == addresstype && parent.parent.lastname != null) || 'HOME' != addresstype` tells that for addresstype HOME the lastname must be set as well, for all other addresstype values there is no such requirement. +But this formula might get null pointer exceptions. If `addresses` is null, accessing its element at index 0 will cause an error. The normal approach would be to add `== null?` everywhere. The easier method is to make the accessor optional via the `?` modifier. + +Example: `lastname != null && 'HOME' == addresses?[0]?.addresstype`. The `?` tells it is okay if the field is null and the accessor should return `null` then. + +Formulas at a deeper level, e.g. a formula for `addresstype`, can access fields from the higher level via `parent`. Example: `('HOME' == addresstype && parent.parent.lastname != null) || 'HOME' != addresstype` tells that for `addresstype` HOME the `lastname` must be set as well, for all other addresstype values there is no such requirement. Note: In this example the customer record has an addresses array of address record. When within an address record, the parent is the array and its parent is the customer record with the lastname field. These are just very basic examples, more below. -If a condition returns false, maybe the correct value can be derived and then the rule as such has not been violated. The optional formula entered in `..change value to...` is executed only if the condition returns false and it overwrites the field value. Example: The rule for the field `addresstype` is `addresstype.upper() != addresstype` and the change-value formula is `addresstype.upper()`. This does change all values to upper case. In such a case the rule is considered to have been passed instead of failed, and that is accomplished via the `if test failed...` setting `=Pass`. +If a condition returns false, maybe the correct value can be derived and then the rule as such has not been violated. The optional formula entered in `..change value to...` is executed only if the condition returns false and it overwrites the field value. + +Example: The rule for the field `addresstype` is `addresstype.upper() != addresstype` and the change-value formula is `addresstype.upper()`. This does change all values to upper case. In such a case the rule is considered to have been passed instead of failed, and that is accomplished via the `if test failed...` setting `=Pass`. Each test and its rule result is available in the audit structure and hence we can see that this test was executed and passed. ### Rule Sets -In above examples there was the test `('HOME' == addresstype && parent.parent.lastname != null) || 'HOME' != addresstype`. A more realistic formula would say: if HOME then the lastname must be set, if COMPANY the companyname field must be set and other allowed values are SHIPPING and BILLING. This would get quite a long formula. +In above examples there was the test `('HOME' == addresstype && parent.parent.lastname != null) || 'HOME' != addresstype`. A more realistic formula would say: if HOME then the lastname must be set, if COMPANY the companyname field must be set and other allowed values are SHIPPING and BILLING. This would bwcome quite a long formula. To simplify that, the Test-Set rule allows to specify multiple individual rules @@ -198,7 +200,7 @@ Each record also has a `(more)` node to enter rules that do not belong to a sing Another typical scenario is to standardize the values first, e.g. gender should be `M`, `F`, `X`, `?` only and then create rules based on the standardized values. In other words, rules build on each other. To enable that, the rule file consists of multiple tabs - the rule steps - and each tab is executed one after the other. -### Rule syntax +### Complete rule syntax For more examples [see](docs/rule-syntax.md) @@ -206,6 +208,17 @@ For more examples [see](docs/rule-syntax.md) ### FAQs * Can a new output column be created via a formula? No, the output schema is always derived from the input schema, for two reasons. First, if adding fields would be possible, it might collide when the input subject is evolved to a new version. The other reason is performance. It would require to create a new output message from scratch, copying the majority of the data even if nothing has changed. That would be too expensive. So the only option is to add the column to the input schema first. + + * How does the solution scale? The straight forward implementation would have been as KStream. A message is received, parsed, rules applied, serialized back to Avro and sent to the output topic. The typical processing time is 100ms. That is not bad but on the other hand, as all is done sequentially in a KStream, it would mean only 10 messages/sec can be processed per partition. Therefore the received messages are put into a pool for being processed in parallel and put into the output topic in the same order as received. This ensures all CPUs can be utilized when processing one partition. If multiple partitions should be processed in parallel, the instance count can be set to a value greater than one. And as usual with Kafka, deploying multiple containers allows the parallel processing of partitions across multiple servers. + + * Is the order preserved? Yes, see above. Messages are processed in parallel, but produced in the order they were received. And if the partition count in the source is the same as in the target topic, the data will be put in the same partition id as it was read from. + + * If the input schema has no `__audit` structure, which is required for the rule results, the field is added. But not to the input schema version but the latest subject version. Why is that? The reason for using the latest is because of schema evolution scenarios. It might happen that schema id 2 has an additional field `NAME` compared to schema id 1. So the subject evolved from version 1 to 2. The consumer does receive a message with schema 2 first, adds the `_audit` field and it is saved in the schema registry. The next message has an input schema 1 and if that would get registered as output schema next, it would fail due to the missing `NAME` field. Hence both must be outputted with the latest schema version always. This also explains why adding the `_audit` on the original input schema already is preferred. + + * How to debug? The log level can be set by adding `ruleservice.loglevel=DEBUG` to the Kafka properties file. + + * The average processing time is 1 second, the rows per second is 10. How can that be? Parallel processing. The processing time is per message, even if 10 messages are transformed in parallel. + diff --git a/WebContent/Controller.controller.js b/WebContent/Controller.controller.js index a18ea70..7819dd7 100644 --- a/WebContent/Controller.controller.js +++ b/WebContent/Controller.controller.js @@ -4,6 +4,23 @@ sap.ui.define([ "sap/ui/core/mvc/Controller"], function(Controller) { return Controller.extend("io.rtdi.bigdata.rulesservice.Controller", { onInit : function() { + var oModel = new sap.ui.model.json.JSONModel(); + var statusbox = this.getView().byId("systemstatus"); + oModel.attachRequestCompleted(function(event) { + if (event.getParameter("success")) { + statusbox.setVisible(true); + } + }); + oModel.attachRequestFailed(function(event) { + var text = event.getParameter("responseText"); + sap.m.MessageToast.show("Reading status info failed: " + text); + }); + this.getView().setModel(oModel); + oModel.loadData("./rest/config/service"); + }, + onRefresh : function(event) { + var model = this.getView().getModel(); + model.loadData("./rest/config/service"); } }); diff --git a/WebContent/View.view.xml b/WebContent/View.view.xml index 3156da4..7d299a5 100644 --- a/WebContent/View.view.xml +++ b/WebContent/View.view.xml @@ -13,9 +13,11 @@ + +