Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f532393
test-2 - clean branch
xkondix May 27, 2023
82516e5
test-2 - adding filtering of values equal to 0 in KafkaStreamsProcess…
xkondix May 27, 2023
7cffc6e
test-2 - clean imports
xkondix May 27, 2023
a61a833
test-2 - adding to the report the number of records before the stream…
xkondix May 27, 2023
be9ae92
test-2 - adding to the report the number of records before the stream…
xkondix May 27, 2023
bcf63f2
test-3 - add field averagingValue to model
xkondix May 28, 2023
638c4eb
test-3 - add averagingValue to script en
xkondix May 28, 2023
931c79e
test-3 - add solution to count average to kafka streams
xkondix May 28, 2023
b39a73c
test-3 - add solution to count average to spark structured streaming.…
xkondix Jun 8, 2023
14e9026
test-3 - add averagingValue to script pl
xkondix Jun 9, 2023
e264f6a
test-3 - add pl descriptions in sparkDelay0HalfValueAverage
xkondix Jun 10, 2023
34d88d7
test-3 - update README
xkondix Jun 11, 2023
edb0e82
test-3 - update README
xkondix Jun 11, 2023
029cc3f
test-3 - update README
xkondix Jun 11, 2023
05e9cea
test-3 - update README
xkondix Jun 11, 2023
9613e59
test-3 - update README
xkondix Jun 11, 2023
43dc263
test-2 - fix kafkaDelay0Half Box Chart
xkondix Jun 13, 2023
368d7b9
test-2 - Adding a list of averages script_en
xkondix Jun 13, 2023
a8e6aa3
test-2 - Adding a list of averages script_pl
xkondix Jun 13, 2023
210ca4e
test-3 - update scripts
xkondix Jun 13, 2023
7236887
test-3 - add results test_kafka_d0_full
xkondix Jun 15, 2023
7666b45
test-3 - add results test_kafka_d0_half
xkondix Jun 15, 2023
80c13d5
test-3 - add results test_kafka_d3_full
xkondix Jun 15, 2023
4db0cc6
test-3 - add results test_spark_d3_full
xkondix Jun 15, 2023
60600ae
test-3 - add results test_spark_d0_full
xkondix Jun 16, 2023
4e9bf44
test-3 - add results test_spark_d0_half
xkondix Jun 16, 2023
4a7d378
test-3 - fix report_pl (Spark Structured Streaming - test_spark_d0_half)
xkondix Jun 16, 2023
f576531
test-3 - add reports and charts
xkondix Jun 16, 2023
355fd11
test-3 - fix report_en (kafkaDelay0HalfValueLine)
xkondix Jun 16, 2023
d2be58b
test-3 - update readme
xkondix Jun 21, 2023
d75e427
test-3 - update units in charts
xkondix Jul 2, 2023
e05644d
master - comment title in charts
xkondix Jul 2, 2023
fe7c093
test-3 - comment title in charts 2
xkondix Jul 2, 2023
6778e17
master - fix maven
xkondix Jul 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Consumer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
<name>Consumer</name>
<description>Consumer</description>


<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
Expand Down
9 changes: 8 additions & 1 deletion KafkaStreams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@
<name>KafkaStreams</name>
<description>KafkaStreams</description>

<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
<artifactId>Utils</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Expand All @@ -30,4 +36,5 @@
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.kowalczyk.konrad.utils.DataModel;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -12,7 +15,34 @@ public class KafkaStreamsProcess {

@Bean
public Function<KStream<String, DataModel>, KStream<String, DataModel>> process() {
    // Stream topology: drop records whose value is 0 (0 marks a missing
    // measurement upstream), then attach a running average to each record.
    // ValueTransformerSupplier is a functional interface, so a lambda is used
    // in place of an anonymous supplier class.
    // NOTE(review): Kafka Streams creates one transformer instance per stream
    // task, so sum/count are per-task — confirm the input topic is effectively
    // single-partition if a single global average is intended.
    return stream -> stream
            .filter((key, record) -> record.getValue() > 0)
            .transformValues(() -> new ValueTransformer<DataModel, DataModel>() {
                // Running totals for the cumulative average; kept in plain
                // fields rather than a state store, so they reset on restart.
                private double runningSum = 0;
                private long recordsSeen = 0;

                @Override
                public void init(ProcessorContext context) {
                    // No state store or punctuation needed.
                }

                @Override
                public DataModel transform(DataModel record) {
                    // Fold the new value into the totals and stamp the
                    // current average onto the outgoing record.
                    runningSum += record.getValue();
                    recordsSeen++;
                    record.setAveragingValue(runningSum / recordsSeen);
                    return record;
                }

                @Override
                public void close() {
                    // Nothing to release.
                }
            });
}


Expand Down
7 changes: 7 additions & 0 deletions Producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
<packaging>jar</packaging>
<description>Producer</description>

<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
import com.kowalczyk.konrad.utils.IoTSimulation;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.publisher.Flux;

import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down
225 changes: 41 additions & 184 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,207 +1,64 @@
# MsgBrokerSys
# test-3

The App is designed to compare Kafka Streams with Apache Spark Structured Streaming. The application was created based on multiple modules with their own pom files and set as the parent of the main pom. The exception is the Spark Streaming application, which has Spring Boot 2.7.10 as a parent.
The test focuses on measuring the time taken by Kafka Streams and Spark Structured Streaming to calculate the average.
Both solutions store the current state and calculate on the fly each value.

On the test-0 branch there are tests that compare whether data from x samples are similar. On the test-1 to test-15 branch, tests will be performed in different configurations. More description of these branches in the README.md file.
Operations used in data processing:
- Reading values from Topic "Order"
- Filtering invalid data that has a value of 0. I inserted 0 for missing fields with a value.
- Counting the current average
- Sending results on Topic "Summary"

The App includes a script written in python for data analysis. The name of the save files can be changed in the ConsumerConf class. The save files are used by a script (unfortunately not written according to best practices, but simply to be easy to understand) that generates graphs and pdf reports.
The test has 3 configurations, which are described at the end of the description.

On each branch you can check what naming I used for the results in the results folder. The folder dataAfterAnalysis contains the script, charts and the pdf.
For example:
### Input data / Output data
The data used for testing is taken from the file "DsDusznikMOB_PM25.csv". The full set is 8760, half 4380. After rejecting invalid data, the full set is 8646, half 4313.

![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/c010b295-d383-4660-a2e0-dea2bc94819f)
### Kafka Streams
In the case of Kafka, I used transformValues to store the state. This method is deprecated, but it works well. I did not use the newer processValues method because I would have had to upgrade the Kafka versions, and I did not want to do that. I think I missed this FixedKeyContextualProcessor class, more here https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API?fbclid=IwAR0xCuQ17UlNYHyOW-hrQlob_6WTXDsLv6RHoujPH9fPgyJxvRgmueYDxR8

### Spark Structured Streaming
In Spark, unfortunately, I was not able to use the method to hold the state. I tried to implement the MapGroupsWithStateFunction, but I failed, and I did not want to waste time, so I used java variables.
This is not the best solution, but it allowed me to run a test.

Before starting the services, you need to run the docker-compose.yaml file once. This file downloads the images in the given configuration. The next step is to run these images, for this you can use Docker Desktop application.
### Problems
The problem was sending as many records as receiving, because I could not return different values of records in different tests, I had to hold the state and return values for the actual values that were processed.
So the problem was actually holding the state and returning the current average for each row.

![image](https://user-images.githubusercontent.com/52525583/235375242-7946f5a2-f7b9-4ab2-b901-c0b653a68640.png)
### Errors
If there is an error in Spark, it will probably be about the schema. The problem can occur when changing the branch, as the processing is not completed and the application is restarted.
For solving this problem there are 2 solutions:
- Remove all things from the "checkpointLocation" folder.
- Add such a piece of code "sparkSession().conf().set("spark.sql.streaming.stateStore.stateSchemaCheck", "false");" of course, you can also add when creating a session.

### Results
Examples of results used for test below (folder "results"), instead of *, insert numbers from 1-10.

The project also includes data files such as "DsDusznikMOB_PM25.csv." These files provide data to the Producer service, which sends them to the topic Order.
- test_kafka_d3_full_*

- test_kafka_d0_full_*

The application includes 5 modules:
- SparkStreaming
- KafkaStreams
- Producer
- Consumer
- Utils
- test_kafka_d0_half_*

The "results" folder stores the results of the tests performed. On the master branch tests are performed for Kafka and Spark.
The tests were executed with a delay ".delayElements(Duration.ofMillis(50));". For each test, you must change the file name in the ConsumerConf class in the writeDataAtOnce method.
- test_spark_d3_full_*

![image](https://user-images.githubusercontent.com/52525583/235783861-79d2da5a-ecf4-49a9-b6fb-a18235660928.png)
- test_spark_d0_full_*

- test_spark_d0_half_*

Below is a description and the requirements of each service and the docker-compose file.
The charts and reports created from the results are located in the "dataAfterAnalysis" folder.

### Configurations

--------------------------------------------------------------------------------------------------------------
Configurations are changed based on 2 classes, and look like this:

## docker-compose.yaml
- d3_full -> This is a configuration that sends all data (8760) on Topic "Order" and sends it with a delay of 3ms.
- d0_full -> This is a configuration that sends all data (8760) on Topic "Order" and sends it with a delay of 0ms. The change occurs in the Producer class.

The docker-compose file contains a zookeeper image, 3 kafka brokers and a manager that makes it easy to preview the port http://localhost:9000/ kafka cluster.
![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/afbe51f2-cfee-48ae-aa1b-0d7c7ce64928)

- d0_half -> This is a configuration that sends half the data (4380) on Topic "Order" and sends it with a delay of 0ms. The change occurs in the Producer class and the IoTSimulation class.

Information about image versions:
- confluentinc/cp-zookeeper:7.0.3
- confluentinc/cp-kafka:7.0.3
- ghcr.io/eshepelyuk/dckr/cmak-3.0.0.6:latest
![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/afbe51f2-cfee-48ae-aa1b-0d7c7ce64928)


Links to source:
- https://hub.docker.com/r/confluentinc/cp-kafka
- https://hub.docker.com/r/confluentinc/cp-zookeeper
- https://github.com/eshepelyuk/cmak-docker/pkgs/container/dckr%2Fcmak-3.0.0.6

--------------------------------------------------------------------------------------------------------------

## SparkStreaming:

The service is designed to process data in real time. The entry point is a Kafka Topic called Order, and the exit point is a Kafka Topic called Summary.


Information about the application:
- uses version 3.4.0 of spark libraries
- uses Spark Structured Streaming to process the stream [1]
- uses Java 17
- uses Spring boot 2.7.10 version [2]
- uses Apache Hadoop 3.3.1 version


Requirements to run the application:
- install versions of Apache Hadoop on your computer, or download some image and run it
- if you have a windows system, you need to move additional files to the bin folder of the Apache Hadoop application [3]
- in my case, I had to add paths to Apache Hadoop folders to VM options [4]
- if you want to process data from kafka, then you need to run the zookeeper and kafka image from docker compose file
- in order to process the data it is necessary to start the Producer service (sends data to the topic Order)
- create a folder to store archive data [5]



Spark Structured Streaming [1] ->
I use Spark Structured Streaming instead of Spark Streaming because it is newer and supported. Links to read:
- https://issues.apache.org/jira/browse/SPARK-42075
- https://spark.apache.org/docs/latest/streaming-programming-guide.html#note

![image](https://user-images.githubusercontent.com/52525583/235370647-dcbcb79d-266c-4d6f-bda0-b1f8b6edc4a5.png)


Spring Boot version [2] ->
The application uses Spring boot version 2.7.10 because it is missing the Jersey servlet and maybe other things,
or it has an incompatible version of the component data with Spark 3.4.0 on Spring Boot version 3.0.5 , so it can't run properly.


Additional files [3] ->
Repo with additional files to copy and paste into the bin folder https://github.com/kontext-tech/winutils/tree/master/hadoop-3.3.1/bin.
For more detailed instructions https://kontext.tech/article/829/install-hadoop-331-on-windows-10-step-by-step-guide.
On linux you should not have this problem.


VM options [4] ->
--add-exports
java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
-Dhadoop.home.dir=C:/hadoop-3.3.1
-Djava.library.path=C:/hadoop-3.3.1/bin

![image](https://user-images.githubusercontent.com/52525583/235370169-230fab69-517a-4008-b66f-acc1f7ced9d9.png)


Archive data [5] ->
Create a folder and add a path in the SparkConfiguration class. In my case it is ".option("checkpointLocation", "C:\\checkpoint")".
Read more here https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html or https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.

![image](https://user-images.githubusercontent.com/52525583/235376976-86ff5110-c248-4dbc-9998-6dee3ad98fe0.png)

--------------------------------------------------------------------------------------------------------------

## KafkaStreams:

The service is designed to process data in real time. The entry point is a Kafka Topic called Order, and the exit point is a Kafka Topic called Summary.


Information about the application:
- uses version 3.3.2 of kafka libraries
- uses Java 17
- uses Spring boot 3.0.5 version
- uses spring-cloud-stream-binder-kafka-streams to bind configuration from yaml file


Requirements to run the application:
- run the zookeeper and broker images that are in the docker-compose file
- in order to process the data it is necessary to start the Producer service (sends data to the topic Order)

--------------------------------------------------------------------------------------------------------------

## Producer:

The service is designed to simulate an IoT device. It is the responsibility of the service to retrieve data from the csv file and send it to the topic Order.


Information about the application:
- uses Java 17
- uses Spring boot 3.0.5 version
- uses spring-cloud-stream-binder-kafka-streams to bind configuration from yaml file

Requirements to run the application:
- run the zookeeper and broker images that are in the docker-compose file

--------------------------------------------------------------------------------------------------------------

## Consumer:

The service is designed to receive data from the Summary topic and save it to a file.


Information about the application:
- uses Java 17
- uses Spring boot 3.0.5 version
- uses spring-cloud-stream-binder-kafka-streams to bind configuration from yaml file

Requirements to run the application:
- run the zookeeper and broker images that are in the docker-compose file
- requires a running Kafka Stream or Spark Streaming service to process the data

--------------------------------------------------------------------------------------------------------------

## Utils:

The service is designed to store classes needed by other services. The reason for the service is the possibility of duplication of classes in each service.


Information about the application:
- uses Java 17
- uses Spring boot 3.0.5 version

--------------------------------------------------------------------------------------------------------------

## Python script for data analysis ("analysis_en.py", "analysis_pl.py")

The script was created for data analysis. The script creates charts, which it places in the charts_en or charts_pl folder, depending on which language you run it in (There are 2 versions, Polish and English). The script also creates pdfs, also in 2 languages depending on which one you run it. The script is written in a simple way; unfortunately it is not a very clean solution, but it is very easy to understand. The script contains 6 identical sections that perform calculations for the given data.


Information about the script:
- uses Python 3
- uses numpy (pip install numpy)
- uses matplotlib (pip install matplotlib)
- uses reportlab (pip install reportlab)

Examples of results used for script below (folder "results"), instead of *, insert numbers from 1-10 (There are 10 repetitions in tests 1-15, the number chosen based on the test-0 branch):

test_kafka_d3_full_*

test_kafka_d0_full_*

test_kafka_d0_half_*

test_spark_d3_full_*

test_spark_d0_full_*

test_spark_d0_half_*

The charts and reports created from the test results are located in the "dataAfterAnalysis" folder.

--------------------------------------------------------------------------------------------------------------
![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/07fb6580-36a3-4fcd-bb51-9048d88b7d6e)
Loading