Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
20dec6d
test-2 - clean branch
xkondix May 27, 2023
4dfc2c4
test-2 - adding filtering of values equal to 0 in KafkaStreamsProcess…
xkondix May 27, 2023
ec969b3
test-2 - clean imports
xkondix May 27, 2023
70f0e35
test-2 - adding to the report the number of records before the stream…
xkondix May 27, 2023
19146f5
test-2 - adding to the report the number of records before the stream…
xkondix May 27, 2023
58a3193
test-3 - add field averagingValue to model
xkondix May 28, 2023
2e2b826
test-3 - add averagingValue to script en
xkondix May 28, 2023
90bacd5
test-3 - add averagingValue to script pl
xkondix Jun 9, 2023
26d2860
test-3 - add pl descriptions in sparkDelay0HalfValueAverage
xkondix Jun 10, 2023
c034e43
test-5 - rename median from average
xkondix Jun 11, 2023
58083df
test-5 - adding classes to support kafka streams
xkondix Jun 11, 2023
f8247a7
test-5 - adding a median counting solution for kafka streams
xkondix Jun 11, 2023
e899785
test-5 - add lombok
xkondix Jun 11, 2023
b06d9b6
test-5 - adding a median counting solution for spark strucured stream…
xkondix Jun 11, 2023
0ef15d0
test-5 - changing the script_en
xkondix Jun 11, 2023
a2b0cb3
test-5 - changing the script_pl
xkondix Jun 11, 2023
a6ff8ff
test-6 - adjusting models under the average
xkondix Jun 11, 2023
c0bc2c7
test-6 - added average calculation in kafka
xkondix Jun 11, 2023
aa029ac
test-6 - added average calculation in spark (but does not work properly)
xkondix Jun 11, 2023
1c4629c
test-6 - update README
xkondix Jun 11, 2023
c787176
test-6 - update DataModel
xkondix Jun 18, 2023
b5e08d0
test-5 - update IotSimulation
xkondix Jun 18, 2023
e621367
test-6 - update Spark
xkondix Jun 18, 2023
59f7498
test-6 - update Kafka
xkondix Jun 18, 2023
4ca0064
test-5 - update scripts
xkondix Jun 18, 2023
d7a7f1f
test-6 - update Spark
xkondix Jun 18, 2023
0019414
test-6 - add results
xkondix Jun 19, 2023
5c0db77
test-6 - update script_en
xkondix Jun 19, 2023
eb9262b
test-6 - update script_pl
xkondix Jun 19, 2023
d314eb3
test-6 - update scripts median to mean
xkondix Jun 19, 2023
23611f3
test-6 - add reports
xkondix Jun 19, 2023
a5c6d9a
test-6 - update README.md
xkondix Jun 19, 2023
4df88a2
test-6 add conf to readme
xkondix Jun 19, 2023
32e7fa4
test-6 Update README.md
xkondix Jun 19, 2023
7897fdd
test-6 - update readme
xkondix Jun 21, 2023
d1fdda6
master - fix maven
xkondix Jul 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Consumer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
<name>Consumer</name>
<description>Consumer</description>


<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
Expand Down
14 changes: 13 additions & 1 deletion KafkaStreams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@
<name>KafkaStreams</name>
<description>KafkaStreams</description>

<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
<artifactId>Utils</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Expand All @@ -29,5 +35,11 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.kowalczyk.konrad.kafkastreams;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.kowalczyk.konrad.utils.DataModel;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;

import java.util.Collections;


@JsonIgnoreProperties(ignoreUnknown = true)
public class DataCalc {

@Setter
@Getter
private String date;
@Setter
@Getter
private double value;
@Getter
@Setter
private String positionCode;
@Getter
@Setter
private String unit;
@Getter
@Setter
private String averagingTime;
@Getter
@Setter
private String indicator;
@Getter
@Setter
private String stationCode;
@Setter
@Getter
private long timestampSend;
@Setter
@Getter
private long timestampConsumer;
@Setter
@Getter
private double averageValue;
@Setter
@Getter
private double sum;
@Setter
@Getter
private long count;


public DataCalc(String date, double value, String positionCode, String unit, String averagingTime, String indicator
, String stationCode, long timestampSend, long timestampConsumer, double averageValue) {
this.date = date;
this.value = value;
this.positionCode = positionCode;
this.unit = unit;
this.averagingTime = averagingTime;
this.indicator = indicator;
this.stationCode = stationCode;
this.timestampSend = timestampSend;
this.timestampConsumer = timestampConsumer;
this.averageValue = averageValue;
}

public DataCalc() {
this.date = StringUtils.EMPTY;
this.value = 0.0;
this.positionCode = StringUtils.EMPTY;
this.unit = StringUtils.EMPTY;
this.averagingTime = StringUtils.EMPTY;
this.indicator = StringUtils.EMPTY;
this.stationCode = StringUtils.EMPTY;
this.timestampSend = 0;
this.timestampConsumer = 0;
this.averageValue = 0.0;
this.sum = 0;
this.count = 0;
}

public DataCalc(DataModel dataModel) {
this.date = dataModel.getDate();
this.value = dataModel.getValue();
this.positionCode = dataModel.getPositionCode();
this.unit = dataModel.getUnit();
this.averagingTime = dataModel.getAveragingTime();
this.indicator = dataModel.getIndicator();
this.stationCode = dataModel.getStationCode();
this.timestampSend = dataModel.getTimestampSend();
this.timestampConsumer = 0;
this.averageValue = 0.0;
this.sum = 0;
this.count = 0;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.kowalczyk.konrad.kafkastreams;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;

public class DataCalcDeserializer implements Deserializer<DataCalc> {

private final static ObjectMapper objectMapper = new ObjectMapper();

@Override
public DataCalc deserialize(String topic, byte[] data) {
DataCalc object = null;
try {
object = objectMapper.readValue(data, DataCalc.class);
} catch (Exception e) {
e.printStackTrace();
}
return object;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.kowalczyk.konrad.kafkastreams;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class DataCalcSerde implements Serde<DataCalc> {

@Override
public Serializer<DataCalc> serializer() {
return new DataCalcSerializer();
}

@Override
public Deserializer<DataCalc> deserializer() {
return new DataCalcDeserializer();
}

@Override
public void close() {
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.kowalczyk.konrad.kafkastreams;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;

public class DataCalcSerializer implements Serializer<DataCalc> {

private final static ObjectMapper objectMapper = new ObjectMapper();

@Override
public byte[] serialize(String s, DataCalc dataCalc) {
byte[] result = null;
try {
result = objectMapper.writeValueAsBytes(dataCalc);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package com.kowalczyk.konrad.kafkastreams;

import com.kowalczyk.konrad.utils.DataModel;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -10,10 +14,38 @@
@Configuration
public class KafkaStreamsProcess {

public static final String START_DATE = "1/1/2019 01:00";

@Bean
public Function<KStream<String, DataModel>, KStream<String, DataModel>> process() {
return kStream -> kStream;
}
return kStream -> kStream.filter((key, value) -> value.getValue() > 0)
.map((key, value) -> new KeyValue<>(value.getId(), new DataCalc(value)))
.groupByKey(Grouped.with(Serdes.String(), new DataCalcSerde()))
.aggregate(DataCalc::new,
(key, value, aggregate) -> {
if (START_DATE.equals(value.getDate())) {
aggregate = value;
}
aggregate.setSum(aggregate.getSum() + value.getValue());
aggregate.setCount(aggregate.getCount() + 1);
aggregate.setAverageValue(aggregate.getSum() / aggregate.getCount());
return aggregate;
}
, Materialized.with(Serdes.String(), new DataCalcSerde()))
.toStream()
.map((key, value) -> new KeyValue<>(String.valueOf(value.getPositionCode())
, new DataModel(value.getDate()
, value.getValue()
, value.getPositionCode()
, value.getUnit()
, value.getAveragingTime()
, value.getIndicator()
, value.getStationCode()
, value.getTimestampSend()
, 0
, value.getAverageValue()
, value.getCount())));

}

}
7 changes: 7 additions & 0 deletions Producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
<packaging>jar</packaging>
<description>Producer</description>

<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.0</spring-cloud.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.kowalczyk.konrad</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
import com.kowalczyk.konrad.utils.IoTSimulation;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.publisher.Flux;

import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down
Loading