Changes from all commits (27 commits)
c0ae501  test-2 - clean branch  (xkondix, May 27, 2023)
5786f45  test-2 - adding filtering of values equal to 0 in KafkaStreamsProcess…  (xkondix, May 27, 2023)
782a4f8  test-2 - clean imports  (xkondix, May 27, 2023)
650e73b  test-2 - adding to the report the number of records before the stream…  (xkondix, May 27, 2023)
0dbdbf7  test-2 - adding to the report the number of records before the stream…  (xkondix, May 27, 2023)
d206c21  test-2 - delete timestampStream  (xkondix, May 27, 2023)
65506ef  test-2 - update README  (xkondix, Jun 11, 2023)
c6c0df4  test-2 - update README  (xkondix, Jun 11, 2023)
3fef54d  test-2 - update README  (xkondix, Jun 11, 2023)
aa456ea  test-2 - update README  (xkondix, Jun 11, 2023)
c3bb8d8  test-2 - fix kafkaDelay0Half Box Chart  (xkondix, Jun 13, 2023)
34e8244  test-2 - Adding a list of averages script_en  (xkondix, Jun 13, 2023)
924a1b0  test-2 - Adding a list of averages script_pl  (xkondix, Jun 13, 2023)
69b4cc9  test-2 - int(countNonEmptyLine/2)  (xkondix, Jun 13, 2023)
b88a46a  test-2 - add results test_kafka_d3_full  (xkondix, Jun 13, 2023)
9756e7f  test-2 - add results test_kafka_d0_full  (xkondix, Jun 13, 2023)
dbe288f  test-2 - add results test_kafka_d0_half  (xkondix, Jun 13, 2023)
85154d3  test-2 - add results test_spark_d3_full  (xkondix, Jun 13, 2023)
0e2e0b7  test-2 - add results test_spark_d0_half  (xkondix, Jun 13, 2023)
9a42b60  test-2 - add results test_spark_d0_full  (xkondix, Jun 13, 2023)
8c3601d  test-2 - add results  (xkondix, Jun 13, 2023)
12cf7ea  test-3 - fix report_pl (Spark Structured Streaming - test_spark_d0_half)  (xkondix, Jun 16, 2023)
3d907d1  test-2 - update reports  (xkondix, Jun 16, 2023)
8edbd7e  test-2 - update readme  (xkondix, Jun 21, 2023)
4775de9  master - update units in charts  (xkondix, Jul 2, 2023)
4a23d8f  master - comment title in charts  (xkondix, Jul 2, 2023)
144a534  master - fix maven  (xkondix, Jul 25, 2023)
8 changes: 8 additions & 0 deletions Consumer/pom.xml
@@ -14,6 +14,14 @@
<name>Consumer</name>
<description>Consumer</description>


<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
9 changes: 8 additions & 1 deletion KafkaStreams/pom.xml
@@ -14,12 +14,18 @@
<name>KafkaStreams</name>
<description>KafkaStreams</description>

<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
<artifactId>Utils</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
@@ -30,4 +36,5 @@
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
</dependencies>

</project>
@@ -12,7 +12,7 @@ public class KafkaStreamsProcess {

@Bean
public Function<KStream<String, DataModel>, KStream<String, DataModel>> process() {
return kStream -> kStream;
return kStream -> kStream.filter((key, value) -> value.getValue() > 0);
}


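For context, a minimal sketch of the whole bean after this change; it mirrors the diff above rather than adding behavior. The package name is illustrative, and DataModel is assumed to live in the Utils module (the package seen in the Producer imports) and expose a numeric getValue():

```java
package com.kowalczyk.konrad.kafkastreams;       // illustrative package name

import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.kowalczyk.konrad.utils.DataModel;     // assumed location in the Utils module

@Configuration
public class KafkaStreamsProcess {

    @Bean
    public Function<KStream<String, DataModel>, KStream<String, DataModel>> process() {
        // A value of 0 marks a missing measurement inserted by the Producer,
        // so such records are dropped before they reach Topic "Summary".
        return kStream -> kStream.filter((key, value) -> value.getValue() > 0);
    }
}
```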
7 changes: 7 additions & 0 deletions Producer/pom.xml
@@ -14,6 +14,13 @@
<packaging>jar</packaging>
<description>Producer</description>

<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
@@ -4,11 +4,9 @@
import com.kowalczyk.konrad.utils.IoTSimulation;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.publisher.Flux;

import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
import java.util.function.Supplier;

215 changes: 27 additions & 188 deletions README.md
@@ -1,207 +1,46 @@
# MsgBrokerSys
# test-2

The App is designed to compare Kafka Streams with Apache Spark Structured Streaming. The application is built from multiple modules, each with its own pom file that has the main pom as its parent. The exception is the Spark Streaming application, which has Spring Boot 2.7.10 as its parent.
The test focuses on measuring the time taken by Kafka Streams and Spark Structured Streaming.
The test includes filtering out invalid data.

On the test-0 branch there are tests that compare whether data from x samples are similar. On the test-1 to test-15 branches, tests are performed in different configurations. More details on these branches can be found in their README.md files.
Operations used in data processing:
- Reading values from Topic "Order"
- Filtering invalid data that has a value of 0. I inserted 0 for fields with a missing value.
- Sending results on Topic "Summary"

The App includes a script written in Python for data analysis. The names of the save files can be changed in the ConsumerConf class. The save files are used by a script (unfortunately not written according to best practice, but simple to understand) that generates graphs and pdf reports.
The test has 3 configurations, which are described at the end of the description.

On each branch you can check what naming I used for the results in the results folder. The folder dataAfterAnalysis contains the script, charts and the pdf.
For example:
### Input data / Output data
The data used for testing is taken from the file "DsDusznikMOB_PM25.csv". The full set is 8760 records, the half set 4380. After rejecting invalid data, the full set is 8646 records (114 filtered out) and the half set 4313 (67 filtered out).

![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/c010b295-d383-4660-a2e0-dea2bc94819f)
### Results
Examples of results used for the test below (folder "results"); in place of *, insert numbers from 1-10.

- test_kafka_d3_full_*

Before starting the services, you need to run the docker-compose.yaml file once. This file downloads the images in the given configuration. The next step is to run these images; for this you can use the Docker Desktop application.
- test_kafka_d0_full_*

![image](https://user-images.githubusercontent.com/52525583/235375242-7946f5a2-f7b9-4ab2-b901-c0b653a68640.png)
- test_kafka_d0_half_*

- test_spark_d3_full_*

The project also includes data files such as "DsDusznikMOB_PM25.csv". These files provide data to the Producer service, which sends them to the topic Order.
- test_spark_d0_full_*

- test_spark_d0_half_*

The application includes 5 modules:
- SparkStreaming
- KafkaStreams
- Producer
- Consumer
- Utils
The charts and reports created from the results are located in the "dataAfterAnalysis" folder.

The "results" folder stores the results of the tests performed. On the master branch tests are performed for Kafka and Spark.
The tests were executed with a delay ".delayElements(Duration.ofMillis(50));". For each test, you must change the file name in the ConsumerConf class in the writeDataAtOnce method.
### Configurations

![image](https://user-images.githubusercontent.com/52525583/235783861-79d2da5a-ecf4-49a9-b6fb-a18235660928.png)
Configurations are changed based on 2 classes and look like this (a sketch of the delay switch follows the list):

- d3_full -> This is a configuration that sends all data (8760) on Topic "Order" and sends it with a delay of 3ms.
- d0_full -> This is a configuration that sends all data (8760) on Topic "Order" and sends it with a delay of 0ms. The change occurs in the Producer class.

Below is a description and the requirements of each service and the docker-compose file.
![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/afbe51f2-cfee-48ae-aa1b-0d7c7ce64928)

- d0_half -> This is a configuration that sends half of the data (4380) on Topic "Order" and sends it with a delay of 0ms. The change occurs in the Producer class and the IoTSimulation class.

--------------------------------------------------------------------------------------------------------------
![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/afbe51f2-cfee-48ae-aa1b-0d7c7ce64928)
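A minimal sketch of where that delay switch might live, based on the ".delayElements(Duration.ofMillis(...))" call quoted elsewhere in this README. The bean name and the IoTSimulation/DataModel APIs (readData() and its return type) are assumptions, not the actual implementation:

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.kowalczyk.konrad.utils.DataModel;     // assumed location in the Utils module
import com.kowalczyk.konrad.utils.IoTSimulation; // import seen in the Producer diff

import reactor.core.publisher.Flux;

@Configuration
public class Producer {

    // d3_* configurations use 3, d0_* configurations use 0.
    private static final long DELAY_MS = 3;

    @Bean
    public Supplier<Flux<DataModel>> sendData(IoTSimulation simulation) {
        // readData() is an assumed method: for *_full it would return all 8760
        // records from "DsDusznikMOB_PM25.csv", for *_half the first 4380.
        List<DataModel> records = simulation.readData();
        return () -> Flux.fromIterable(records)
                .delayElements(Duration.ofMillis(DELAY_MS));
    }
}
```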

## docker-compose.yaml

The docker-compose file contains a zookeeper image, 3 kafka brokers and a manager that makes it easy to preview the kafka cluster at http://localhost:9000/.


Information about image versions:
- confluentinc/cp-zookeeper:7.0.3
- confluentinc/cp-kafka:7.0.3
- ghcr.io/eshepelyuk/dckr/cmak-3.0.0.6:latest


Links to source:
- https://hub.docker.com/r/confluentinc/cp-kafka
- https://hub.docker.com/r/confluentinc/cp-zookeeper
- https://github.com/eshepelyuk/cmak-docker/pkgs/container/dckr%2Fcmak-3.0.0.6

--------------------------------------------------------------------------------------------------------------

## SparkStreaming:

The service is designed to process data in real time. The entry point is a Kafka Topic called Order, and the exit point is a Kafka Topic called Summary.


Information about the application:
- uses version 3.4.0 of spark libraries
- uses Spark Structured Streaming to process the stream [1]
- uses Java 17
- uses Spring Boot version 2.7.10 [2]
- uses Apache Hadoop version 3.3.1


Requirements to run the application:
- install Apache Hadoop on your computer, or download an image and run it
- if you have a Windows system, you need to move additional files to the bin folder of the Apache Hadoop installation [3]
- in my case, I had to add paths to the Apache Hadoop folders to VM options [4]
- if you want to process data from kafka, then you need to run the zookeeper and kafka images from the docker compose file
- in order to process the data it is necessary to start the Producer service (sends data to the topic Order)
- create a folder to store archive data [5]



Spark Structured Streaming [1] ->
I use Spark Structured Streaming instead of Spark Streaming because it is newer and supported. Links to read:
- https://issues.apache.org/jira/browse/SPARK-42075
- https://spark.apache.org/docs/latest/streaming-programming-guide.html#note

![image](https://user-images.githubusercontent.com/52525583/235370647-dcbcb79d-266c-4d6f-bda0-b1f8b6edc4a5.png)


Spring Boot version [2] ->
The application uses Spring Boot version 2.7.10 because, on Spring Boot 3.0.5, the Jersey servlet (and possibly other components) is missing or its version is incompatible with Spark 3.4.0, so the application cannot run properly.


Additional files [3] ->
Repo with additional files to copy and paste into the bin folder: https://github.com/kontext-tech/winutils/tree/master/hadoop-3.3.1/bin.
For more detailed instructions: https://kontext.tech/article/829/install-hadoop-331-on-windows-10-step-by-step-guide.
On Linux you should not have this problem.


VM options [4] ->
--add-exports
java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
-Dhadoop.home.dir=C:/hadoop-3.3.1
-Djava.library.path=C:/hadoop-3.3.1/bin

![image](https://user-images.githubusercontent.com/52525583/235370169-230fab69-517a-4008-b66f-acc1f7ced9d9.png)


Archive data [5] ->
Create a folder and add a path in the SparkConfiguration class. In my case it is ".option("checkpointLocation", "C:\\checkpoint")".
Read more here https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html or https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.

![image](https://user-images.githubusercontent.com/52525583/235376976-86ff5110-c248-4dbc-9998-6dee3ad98fe0.png)

--------------------------------------------------------------------------------------------------------------

## KafkaStreams:

The service is designed to process data in real time. The entry point is a Kafka Topic called Order, and the exit point is a Kafka Topic called Summary.


Information about the application:
- uses version 3.3.2 of kafka libraries
- uses Java 17
- uses Spring Boot version 3.0.5
- uses spring-cloud-stream-binder-kafka-streams to bind configuration from the yaml file


Requirements to run the application:
- run the zookeeper and broker images that are in the docker-compose file
- in order to process the data it is necessary to start the Producer service (sends data to the topic Order)

--------------------------------------------------------------------------------------------------------------

## Producer:

The service is designed to simulate an IoT device. It is the responsibility of the service to retrieve data from the csv file and send it to the topic Order.


Information about the application:
- uses Java 17
- uses Spring Boot version 3.0.5
- uses spring-cloud-stream-binder-kafka-streams to bind configuration from the yaml file

Requirements to run the application:
- run the zookeeper and broker images that are in the docker-compose file

--------------------------------------------------------------------------------------------------------------

## Consumer:

The service is designed to receive data from the Summary topic and save it to a file.


Information about the application:
- uses Java 17
- uses Spring Boot version 3.0.5
- uses spring-cloud-stream-binder-kafka-streams to bind configuration from the yaml file

Requirements to run the application:
- run the zookeeper and broker images that are in the docker-compose file
- requires a running Kafka Streams or Spark Streaming service to process the data (a minimal sketch of the file-writing consumer follows)
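A sketch under stated assumptions of what the file-writing consumer might look like. Only the ConsumerConf class, the writeDataAtOnce method, the result-file naming and the timestamp fields come from this repository's description; the binding name, file format and DataModel getters are illustrative:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.kowalczyk.konrad.utils.DataModel;   // assumed location in the Utils module

@Configuration
public class ConsumerConf {

    // Change the name for each test run, e.g. test_kafka_d3_full_1 ... _10.
    private static final String FILE_NAME = "test_kafka_d3_full_1";

    // 8646 = full set after filtering; 4313 for the *_half runs.
    private static final int EXPECTED_RECORDS = 8646;

    private final List<String> lines = new ArrayList<>();

    @Bean
    public Consumer<DataModel> summary() {
        return data -> {
            // Keep the send and receive timestamps so the analysis script can
            // compute the processing delay per record.
            lines.add(data.getTimestampSend() + ";" + data.getTimestampConsumer());
            if (lines.size() == EXPECTED_RECORDS) {
                writeDataAtOnce();
            }
        };
    }

    private void writeDataAtOnce() {
        try {
            Files.write(Paths.get("results", FILE_NAME), lines);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```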

--------------------------------------------------------------------------------------------------------------

## Utils:

The service is designed to store classes needed by other services. It exists to avoid duplicating the same classes in each service.


Information about the application:
- uses Java 17
- uses Spring Boot version 3.0.5

--------------------------------------------------------------------------------------------------------------

## Python script for data analysis ("analysis_en.py", "analysis_pl.py")

The script was created for data analysis. The script creates charts, which it places in the charts_en or charts_pl folder, depending on the language version you run (there are 2 versions, Polish and English). The script also creates pdfs, likewise in 2 languages depending on which version you run. The script is written in a simple way; unfortunately it is not a very clean solution, but it is very easy to understand. The script contains 6 identical sections that perform calculations for the given data.


Information about the script:
- uses Python 3
- uses numpy (pip install numpy)
- uses matplotlib (pip install matplotlib)
- uses reportlab (pip install reportlab)

Examples of results used for the script below (folder "results"); in place of *, insert numbers from 1-10 (there are 10 repetitions in tests 1-15, the number chosen based on the test-0 branch):

test_kafka_d3_full_*

test_kafka_d0_full_*

test_kafka_d0_half_*

test_spark_d3_full_*

test_spark_d0_full_*

test_spark_d0_half_*

The charts and reports created from the test results are located in the "dataAfterAnalysis" folder.

--------------------------------------------------------------------------------------------------------------
![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/07fb6580-36a3-4fcd-bb51-9048d88b7d6e)
19 changes: 17 additions & 2 deletions SparkStreaming/pom.xml
@@ -1,11 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<!-- <parent>-->
<!-- <groupId>com.kowalczyk.konrad</groupId>-->
<!-- <artifactId>MsgBrokerSys</artifactId>-->
<!-- <version>0.0.1-SNAPSHOT</version>-->
<!-- </parent>-->

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
@@ -20,11 +22,24 @@
<name>SparkStreaming</name>
<description>SparkStreaming</description>

<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
<artifactId>Utils</artifactId>
<version>0.0.1-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -74,12 +89,12 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.1</version>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.1</version>
<version>3.3.2</version>
</dependency>
</dependencies>

@@ -9,6 +9,8 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.types.DataTypes.*;


@@ -49,7 +51,6 @@ public StructType getSchema() {
.add("indicator", StringType)
.add("stationCode", StringType)
.add("timestampSend", LongType)
.add("timestampStream", LongType)
.add("timestampConsumer", LongType);
}

@@ -63,9 +64,14 @@ public void process() throws Exception {
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "true")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.select(from_json(col("value"), getSchema()).as("data"), col("key").as("key"))
.selectExpr("key", "data.*");

df.writeStream()
Dataset<Row> filteredDf = df.filter(col("value").gt(0));

filteredDf.selectExpr("CAST(key AS STRING)", "to_json(struct(*)) AS value")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "Summary")
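Consolidated for readability, the process() method as it reads after this change. The SparkSession field name ("spark"), the "subscribe" option for Topic "Order", the reader's bootstrap servers and the checkpoint path are assumptions filled in from the collapsed parts of the diff and the old README:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public void process() throws Exception {
    Dataset<Row> df = spark.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "Order")            // assumed: entry point is Topic "Order"
            .option("startingOffsets", "earliest")
            .option("failOnDataLoss", "true")
            .load()
            // Kafka delivers raw bytes: cast them, then parse the JSON payload
            // against the schema from getSchema().
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
            .select(from_json(col("value"), getSchema()).as("data"), col("key").as("key"))
            .selectExpr("key", "data.*");

    // New in this change: drop invalid records, where value == 0 marks a
    // missing measurement.
    Dataset<Row> filteredDf = df.filter(col("value").gt(0));

    // Re-serialize each row to JSON and publish it on the exit point, Topic "Summary".
    filteredDf.selectExpr("CAST(key AS STRING)", "to_json(struct(*)) AS value")
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "Summary")
            .option("checkpointLocation", "C:\\checkpoint") // path from the old README
            .start()
            .awaitTermination();
}
```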