From 149ad67ac060ba768caef26c649f2d4d467c2310 Mon Sep 17 00:00:00 2001 From: Subin Cho Date: Tue, 29 Apr 2025 15:09:24 +0900 Subject: [PATCH] =?UTF-8?q?feat=20|=20sprint1=20|=20FRB-30=20|time=20stamp?= =?UTF-8?q?=20=EA=B8=B0=EB=8A=A5=20=EA=B5=AC=ED=98=84=20|=20=EC=A1=B0?= =?UTF-8?q?=EC=88=98=EB=B9=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/monitory/data/FlinkApplication.java | 13 +++--------- .../transformations/TimeStampAssigner.java | 20 +++++++++++++++++++ 2 files changed, 23 insertions(+), 10 deletions(-) create mode 100644 src/main/java/com/monitory/data/transformations/TimeStampAssigner.java diff --git a/src/main/java/com/monitory/data/FlinkApplication.java b/src/main/java/com/monitory/data/FlinkApplication.java index 52be518..9dec0f5 100644 --- a/src/main/java/com/monitory/data/FlinkApplication.java +++ b/src/main/java/com/monitory/data/FlinkApplication.java @@ -1,8 +1,8 @@ package com.monitory.data; import com.monitory.data.sources.MqttSource; +import com.monitory.data.transformations.TimeStampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -16,16 +16,9 @@ public static void main (String [] args) throws Exception { // 2. 데이터 소스 DataStream sourceStream = env.fromSource(new MqttSource(), WatermarkStrategy.noWatermarks(), "MQTT-Source"); - // 3. 데이터 처리: 단순하게 문자열을 대문자로 변환하는 예시 + // 3. 데이터 처리: Time Stamp 출력과 Anomaly 감지 DataStream transformedStream = sourceStream - .map(new MapFunction() { - @Override - public String map(String value) throws Exception { -// Thread.sleep(2000000); - System.out.println("💡 received: " + value); - return value.toUpperCase(); - } - }); + .map(new TimeStampAssigner()); // 4. 데이터 싱크: 콘솔에 출력 transformedStream.print(); diff --git a/src/main/java/com/monitory/data/transformations/TimeStampAssigner.java b/src/main/java/com/monitory/data/transformations/TimeStampAssigner.java new file mode 100644 index 0000000..0dbc561 --- /dev/null +++ b/src/main/java/com/monitory/data/transformations/TimeStampAssigner.java @@ -0,0 +1,20 @@ +package com.monitory.data.transformations; + +import org.apache.flink.api.common.functions.MapFunction; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import java.time.Instant; +import java.time.LocalDateTime; + +public class TimeStampAssigner implements MapFunction { + + private static final ObjectMapper mapper = new ObjectMapper(); + + @Override + public String map(String value) throws Exception { + ObjectNode jsonNode = (ObjectNode) mapper.readTree(value); + jsonNode.put("time", LocalDateTime.now().toString()); + return mapper.writeValueAsString(jsonNode); + } +}