Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
8 changes: 8 additions & 0 deletions Consumer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
<name>Consumer</name>
<description>Consumer</description>


<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
Expand Down
9 changes: 8 additions & 1 deletion KafkaStreams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@
<name>KafkaStreams</name>
<description>KafkaStreams</description>

<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
<artifactId>Utils</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Expand All @@ -30,4 +36,5 @@
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
</dependencies>

</project>
7 changes: 7 additions & 0 deletions Producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
<packaging>jar</packaging>
<description>Producer</description>

<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
Expand Down
180 changes: 36 additions & 144 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,176 +1,68 @@
# MsgBrokerSys
# test-0

The App is designed to compare Kafka Streams with Apache Spark Structed Streaming. The application was created based on multiple modules with their own pom files and set as the parent of the main pom. The exception is the Spark Streming application, which has Spring Boot 2.7.10 as a parent.
The test focuses on finding the right number of sample repetitions for the next tests. The test involves receiving data and sending it to the next subject. The test was done for 1,2,5,10,15 and 20 repeats.
Of course, for Kafka Streams and Spark Structured Streaming.

On the test-0 branch there are tests that compare whether data from x samples are similar. On the test-1 to test-15 branch, tests will be performed in different configurations. More description of these branches in the README.md file.
### Input data / Output data
The data used for testing is taken from the file "DsDusznikMOB_PM25.csv". The full set is 8760, half 4380.

The App includes a script written in python for data analysis. The name of the save files can be changed in the ConsumerConf class. The save files are used by a script (unfortunately not written according to the art, but simply to understand) that generates graphs and pdf reportd.
### Results
Examples of results used for test below (folder "results"), instead of *, insert numbers from 1-10.

On each branch you can check what naming I used for the results in the results folder. The folder dataAfterAnalysis contains the script, charts and the pdf.
For example:
- test_kafka_d3_full_*

![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/c010b295-d383-4660-a2e0-dea2bc94819f)
- test_kafka_d0_full_*

- test_kafka_d0_half_*

Before starting the services, you need to run the docker-compose.yaml file once. This file downloads the images in the given configuration. The next step is to run these images, for this you can use Docker Desktop application.
- test_spark_d3_full_*

![image](https://user-images.githubusercontent.com/52525583/235375242-7946f5a2-f7b9-4ab2-b901-c0b653a68640.png)
- test_spark_d0_full_*

- test_spark_d0_half_*

The project also includes data files such as "DsDusznikMOB_PM25.csv." These files provide data to the Producer service, which sends them to the topic Order.
The charts and reports created from the results are located in the "dataAfterAnalysis" folder.

### Configurations

The application includes 5 modules:
- SparkStreaming
- KafkaStreams
- Producer
- Consumer
- Utils
Configurations are changed based on 2 classes, and look like this:

The "results" folder stores the results of the tests performed. On the master branch tests are performed for Kafka and Spark.
The tests were executed with a delay ".delayElements(Duration.ofMillis(50));". For each test, you must change the file name in the ConsumerConf class in the writeDataAtOnce method.
- d3_full -> This is a configuration that sends all data (8760) on Topic "Order" and sends it with a delay of 3ms.
- d0_full -> This is a configuration that sends all data (8760) on Topic "Order" and sends it with a delay of 0ms. The change occurs in the Prodcuer class.

![image](https://user-images.githubusercontent.com/52525583/235783861-79d2da5a-ecf4-49a9-b6fb-a18235660928.png)
![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/afbe51f2-cfee-48ae-aa1b-0d7c7ce64928)

- d0_half -> This is a configuration that sends half data (4380) on Topic "Order" and sends it with a delay of 0ms. The change occurs in the Prodcuer class and IoTSimulation class.

Below is a description and requirements of each service and docker-compse file.
![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/afbe51f2-cfee-48ae-aa1b-0d7c7ce64928)

![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/07fb6580-36a3-4fcd-bb51-9048d88b7d6e)

--------------------------------------------------------------------------------------------------------------
# Time results for a given number of samples

## docker-compose.yaml
## kafkaDelay3Full

![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/1e61ed17-29e6-465b-ae1c-b11058cc43d5)

The docker-compose file contains a zookeeper image, 3 kafka brokers and a manager that makes it easy to preview the port http://localhost:9000/ kafka cluster.
## kafkaDelay0Full

![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/bb82fafe-c26b-4644-8fc4-13805006e60f)

Information about image versions:
- confluentinc/cp-zookeeper:7.0.3
- confluentinc/cp-kafka:7.0.3
- ghcr.io/eshepelyuk/dckr/cmak-3.0.0.6:latest
## kafkaDelay0Half

![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/fdcef799-d824-4d58-9fd4-33005982829e)

Links to source:
- https://hub.docker.com/r/confluentinc/cp-kafka
- https://hub.docker.com/r/confluentinc/cp-zookeeper
- https://github.com/eshepelyuk/cmak-docker/pkgs/container/dckr%2Fcmak-3.0.0.6
## sparkDelay3Full

--------------------------------------------------------------------------------------------------------------
![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/a7046227-a81e-4aed-b8c4-134746d1ad95)

## SparkStreaming:
## sparkDelay0Full

The service is designed to process data in real time. The entry point is a Kafka Topic called Order, and the exit point is a Kafka Topic called Summary.
![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/a9776378-f84c-4426-93cc-214b50071ac8)

## sparkDelay0Half

Information about the application:
- uses version 3.4.0 of spark libraries
- uses Spark Structured Streaming to process the stream [1]
- uses Java 17
- uses Spring boot 2.7.10 version [2]
- uses Apache Hadoop 3.3.1 version
![image](https://github.com/xkondix/MsgBrokerSys/assets/52525583/ae03d40d-1f9a-440a-bb97-de919a33c481)


Requirements to run the application:
- install versions of Apache Hadoop on your computer, or download some image and run it
- if you have a windows system, you need to move additional files to the bin folder of the Apache Hadoop application [3]
- in my case, I had to add paths to Apache Hadoop folders to VM options [4]
- if you want to process data from kafka, then you need to run the zookeeper and kafka image from docker compose file
- in order to process the data it is necessary to start the Producer service (sends data to the topic Order)
- create a folder to store archive data [5]



Spark Structured Streaming [1] ->
I use Spark Structured Streaming instead of Spark Streaming because it is newer and supported. Links to read:
- https://issues.apache.org/jira/browse/SPARK-42075
- https://spark.apache.org/docs/latest/streaming-programming-guide.html#note

![image](https://user-images.githubusercontent.com/52525583/235370647-dcbcb79d-266c-4d6f-bda0-b1f8b6edc4a5.png)


Spring Boot version [2] ->
The application uses Spring boot version 2.7.10 because it is missing the Jersey servlet and maybe other things,
or it has an incompatible version of the component data with Spark 3.4.0 on Spring Boot version 3.0.5 , so it can't run properly.


Additional files [3] ->
Repo with additional files to copy and past to folder bin https://github.com/kontext-tech/winutils/tree/master/hadoop-3.3.1/bin.
For more detailed instructions https://kontext.tech/article/829/install-hadoop-331-on-windows-10-step-by-step-guide.
On linux you should not have this problem.


VM options [4] ->
--add-exports
java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
-Dhadoop.home.dir=C:/hadoop-3.3.1
-Djava.library.path=C:/hadoop-3.3.1/bin

![image](https://user-images.githubusercontent.com/52525583/235370169-230fab69-517a-4008-b66f-acc1f7ced9d9.png)


Archive data [5] ->
Create a folder and add a path in the SparkConfiguration class. In my case it is ".option("checkpointLocation", "C:\\checkpoint")".
Read more here https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html or https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.

![image](https://user-images.githubusercontent.com/52525583/235376976-86ff5110-c248-4dbc-9998-6dee3ad98fe0.png)

--------------------------------------------------------------------------------------------------------------

## KafkaStreams:

The service is designed to process data in real time. The entry point is a Kafka Topic called Order, and the exit point is a Kafka Topic called Summary.


Information about the application:
- uses version 3.3.2 of kafka libraries
- uses Java 17
- uses Spring boot 3.0.5 version
- uses spring-cloud-stream-binder-kafka-streams to bind configuration from yaml file


Requirements to run the application:
- run the zookerpara and broker images that are in the docker-compose file
- in order to process the data it is necessary to start the Producer service (sends data to the topic Order)

--------------------------------------------------------------------------------------------------------------

## Producer:

The service is designed to simulate an IoT device. It is the responsibility of the service to retrieve data from the csv file and send it to the topic Order.


Information about the application:
- uses Java 17
- uses Spring boot 3.0.5 version
- uses spring-cloud-stream-binder-kafka-streams to bind configuration from yaml file

Requirements to run the application:
- run the zookerpara and broker images that are in the docker-compose file

--------------------------------------------------------------------------------------------------------------

## Consumer:

The service is designed to receive data from the Summary topic and save it to a file.


Information about the application:
- uses Java 17
- uses Spring boot 3.0.5 version
- uses spring-cloud-stream-binder-kafka-streams to bind configuration from yaml file

Requirements to run the application:
- run the zookerpara and broker images that are in the docker-compose file
- requires a running Kafka Stream or Spark Streaming service to process the data

--------------------------------------------------------------------------------------------------------------

## Utils:

The service is designed to store classes needed by other services. The reason for the service is the possibility of duplication of classes in each service.


Information about the application:
- uses Java 17
- uses Spring boot 3.0.5 version
19 changes: 17 additions & 2 deletions SparkStreaming/pom.xml
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<!-- <parent>-->
<!-- <groupId>com.kowalczyk.konrad</groupId>-->
<!-- <artifactId>MsgBrokerSys</artifactId>-->
<!-- <version>0.0.1-SNAPSHOT</version>-->
<!-- </parent>-->

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
Expand All @@ -20,11 +22,24 @@
<name>SparkStreaming</name>
<description>SparkStreaming</description>

<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
<artifactId>Utils</artifactId>
<version>0.0.1-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
Expand Down Expand Up @@ -74,12 +89,12 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.1</version>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.1</version>
<version>3.3.2</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -47,28 +49,28 @@ public StructType getSchema() {
.add("indicator", StringType)
.add("stationCode", StringType)
.add("timestampSend", LongType)
.add("timestampStream", LongType)
.add("timestampConsumer", LongType);
}

@Bean
public void process() throws Exception {
sparkSession()
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "Order")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "true")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "Summary")
.option("checkpointLocation", "C:\\checkpoint")
.option("idempotent", "true")
.start()
.awaitTermination();
Dataset<Row> df = sparkSession()
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "Order")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "true")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

df.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "Summary")
.option("checkpointLocation", "C:\\checkpoint")
.option("idempotent", "true")
.start()
.awaitTermination();
}
}
Loading