From 1dd5025b0cff5131ce6d388c20d84221e0c8a9bf Mon Sep 17 00:00:00 2001 From: Surabhi Date: Mon, 9 Aug 2021 16:03:48 +0530 Subject: [PATCH 1/6] stream logs to kakfa --- pom.xml | 18 +++++++++-- src/main/java/com/uci/inbound/Inbound.java | 7 +++-- .../java/com/uci/inbound/KakfaLogTopics.java | 26 ++++++++++++++++ src/main/resources/log4j2.xml | 30 +++++++++++++++++++ 4 files changed, 76 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/uci/inbound/KakfaLogTopics.java create mode 100644 src/main/resources/log4j2.xml diff --git a/pom.xml b/pom.xml index ae45bec..7b6ea2f 100644 --- a/pom.xml +++ b/pom.xml @@ -23,6 +23,12 @@ org.springframework.boot spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-logging + + org.springframework.boot @@ -75,7 +81,7 @@ jaxb-impl 2.2.11 - + org.springframework.boot spring-boot-starter-test @@ -98,7 +104,15 @@ 1.0 compile - + + org.springframework.boot + spring-boot-starter-log4j2 + + + org.apache.kafka + kafka-log4j-appender + 1.0.0 + diff --git a/src/main/java/com/uci/inbound/Inbound.java b/src/main/java/com/uci/inbound/Inbound.java index 28792d9..198f059 100644 --- a/src/main/java/com/uci/inbound/Inbound.java +++ b/src/main/java/com/uci/inbound/Inbound.java @@ -10,8 +10,6 @@ import org.springframework.data.cassandra.repository.config.EnableReactiveCassandraRepositories; import org.springframework.kafka.annotation.EnableKafka; -import com.uci.dao.service.HealthService; - @EnableKafka @EnableReactiveCassandraRepositories("com.uci.dao") @EntityScan("com.uci.dao") @@ -24,7 +22,10 @@ @SpringBootApplication @ComponentScan(basePackages = {"com.uci.inbound", "com.uci.adapter", "com.uci.utils"}) public class Inbound { - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { SpringApplication.run(Inbound.class, args); + + KakfaLogTopics object = new KakfaLogTopics(); + object.createTopic(); } } \ No newline at end of file diff --git a/src/main/java/com/uci/inbound/KakfaLogTopics.java b/src/main/java/com/uci/inbound/KakfaLogTopics.java new file mode 100644 index 0000000..5fe18d8 --- /dev/null +++ b/src/main/java/com/uci/inbound/KakfaLogTopics.java @@ -0,0 +1,26 @@ +package com.uci.inbound; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.stereotype.Component; + +@Component +public class KakfaLogTopics { + + public void createTopic() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "165.232.182.146:9094"); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("group.id", "logs"); + + KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); + List topics = new ArrayList(); + topics.add("inbound-logs"); + kafkaConsumer.subscribe(topics); + } +} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..70b9c0b --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,30 @@ + + + + + + + 165.232.182.146:9094 + + + + + + + + + + + + + + + + + + + + \ No newline at end of file From dbcb00a23ece5038e1068db9c6c7e2ecb3ad2fad Mon Sep 17 00:00:00 2001 From: Surabhi Date: Mon, 9 Aug 2021 17:55:23 +0530 Subject: [PATCH 2/6] push logs to kafka via utils --- pom.xml | 9 ------ src/main/java/com/uci/inbound/Inbound.java | 3 -- .../java/com/uci/inbound/KakfaLogTopics.java | 26 ---------------- src/main/resources/application.properties | 2 ++ src/main/resources/log4j2.xml | 30 ------------------- 5 files changed, 2 insertions(+), 68 deletions(-) delete mode 100644 src/main/java/com/uci/inbound/KakfaLogTopics.java delete mode 100644 src/main/resources/log4j2.xml diff --git a/pom.xml b/pom.xml index 7b6ea2f..c8177df 100644 --- a/pom.xml +++ b/pom.xml @@ -104,15 +104,6 @@ 1.0 compile - - org.springframework.boot - spring-boot-starter-log4j2 - - - org.apache.kafka - kafka-log4j-appender - 1.0.0 - diff --git a/src/main/java/com/uci/inbound/Inbound.java b/src/main/java/com/uci/inbound/Inbound.java index 198f059..7370a3e 100644 --- a/src/main/java/com/uci/inbound/Inbound.java +++ b/src/main/java/com/uci/inbound/Inbound.java @@ -24,8 +24,5 @@ public class Inbound { public static void main(String[] args) throws InterruptedException { SpringApplication.run(Inbound.class, args); - - KakfaLogTopics object = new KakfaLogTopics(); - object.createTopic(); } } \ No newline at end of file diff --git a/src/main/java/com/uci/inbound/KakfaLogTopics.java b/src/main/java/com/uci/inbound/KakfaLogTopics.java deleted file mode 100644 index 5fe18d8..0000000 --- a/src/main/java/com/uci/inbound/KakfaLogTopics.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.uci.inbound; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.springframework.stereotype.Component; - -@Component -public class KakfaLogTopics { - - public void createTopic() { - Properties properties = new Properties(); - properties.put("bootstrap.servers", "165.232.182.146:9094"); - properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put("group.id", "logs"); - - KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); - List topics = new ArrayList(); - topics.add("inbound-logs"); - kafkaConsumer.subscribe(topics); - } -} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 965059e..202fb28 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -28,6 +28,8 @@ provider.gupshup.whatsapp.appname=Ekstep campaign.url = ${CAMPAIGN_URL} campaign.admin.token = ${CAMPAIGN_ADMIN_TOKEN} +# log4j2 log topic config +kafka.logs.topic=${KAFKA_LOGS_TOPIC} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml deleted file mode 100644 index 70b9c0b..0000000 --- a/src/main/resources/log4j2.xml +++ /dev/null @@ -1,30 +0,0 @@ - - - - - - - 165.232.182.146:9094 - - - - - - - - - - - - - - - - - - - - \ No newline at end of file From 1d4b1a79e1e016835fa97f891e66c5286834c948 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Thu, 19 Aug 2021 17:46:45 +0530 Subject: [PATCH 3/6] custom kafka appender for logs with custom layout --- .../health/ServiceStatusController.java | 109 ++++++++++++------ src/main/resources/application.properties | 2 + src/main/resources/log4j2.xml | 35 ++++++ 3 files changed, 108 insertions(+), 38 deletions(-) create mode 100644 src/main/resources/log4j2.xml diff --git a/src/main/java/com/uci/inbound/health/ServiceStatusController.java b/src/main/java/com/uci/inbound/health/ServiceStatusController.java index 9a80d13..a6a771b 100644 --- a/src/main/java/com/uci/inbound/health/ServiceStatusController.java +++ b/src/main/java/com/uci/inbound/health/ServiceStatusController.java @@ -10,6 +10,9 @@ import com.uci.utils.BotService; import com.uci.dao.service.HealthService; import com.uci.utils.kafka.KafkaConfig; +import com.uci.utils.telemetry.LogTelemetryBuilder; +import com.uci.utils.telemetry.LogTelemetryMessage; +import com.uci.utils.telemetry.TelemetryLogger; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -18,6 +21,9 @@ import java.io.IOException; import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.actuate.cassandra.CassandraHealthIndicator; @@ -32,43 +38,70 @@ @RestController @RequestMapping(value = "/service") public class ServiceStatusController { - @Autowired + + @Autowired private HealthService healthService; - - @RequestMapping(value = "/health/cassandra", method = RequestMethod.GET, produces = { "application/json", "text/json" }) - public ResponseEntity cassandraStatusCheck() throws IOException, JsonProcessingException { - JsonNode jsonNode = getResponseJsonNode(); - ((ObjectNode) jsonNode).put("result", healthService.getCassandraHealthNode()); - - return ResponseEntity.ok(jsonNode); - } - - @RequestMapping(value = "/health/kafka", method = RequestMethod.GET, produces = { "application/json", "text/json" }) - public ResponseEntity kafkaStatusCheck() throws IOException, JsonProcessingException { - JsonNode jsonNode = getResponseJsonNode(); - ((ObjectNode) jsonNode).put("result", healthService.getKafkaHealthNode()); - - return ResponseEntity.ok(jsonNode); - } - - @RequestMapping(value = "/health/campaign", method = RequestMethod.GET, produces = { "application/json", "text/json" }) - public ResponseEntity campaignUrlStatusCheck() throws JsonProcessingException, IOException { - JsonNode jsonNode = getResponseJsonNode(); - ((ObjectNode) jsonNode).put("result", healthService.getCampaignUrlHealthNode()); - - return ResponseEntity.ok(jsonNode); - } - - /** - * Returns json node for service response - * - * @return JsonNode - * @throws JsonMappingException - * @throws JsonProcessingException - */ - private JsonNode getResponseJsonNode() throws JsonMappingException, JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - JsonNode jsonNode = mapper.readTree("{\"id\":\"api.content.service.health\",\"ver\":\"3.0\",\"ts\":null,\"params\":{\"resmsgid\":null,\"msgid\":null,\"err\":null,\"status\":\"successful\",\"errmsg\":null},\"responseCode\":\"OK\",\"result\":{\"healthy\":false}}"); - return jsonNode; - } + + @RequestMapping(value = "/health/cassandra", method = RequestMethod.GET, produces = { "application/json", + "text/json" }) + public ResponseEntity cassandraStatusCheck() throws IOException, JsonProcessingException { + JsonNode jsonNode = getResponseJsonNode(); + ((ObjectNode) jsonNode).put("result", healthService.getCassandraHealthNode()); + + return ResponseEntity.ok(jsonNode); + } + + @RequestMapping(value = "/health/kafka", method = RequestMethod.GET, produces = { "application/json", "text/json" }) + public ResponseEntity kafkaStatusCheck() + throws IOException, JsonProcessingException, InterruptedException { + JsonNode jsonNode = getResponseJsonNode(); + ((ObjectNode) jsonNode).put("result", healthService.getKafkaHealthNode()); + + return ResponseEntity.ok(jsonNode); + } + + @RequestMapping(value = "/health/campaign", method = RequestMethod.GET, produces = { "application/json", + "text/json" }) + public ResponseEntity campaignUrlStatusCheck() throws JsonProcessingException, IOException { + JsonNode jsonNode = getResponseJsonNode(); + ((ObjectNode) jsonNode).put("result", healthService.getCampaignUrlHealthNode()); + + return ResponseEntity.ok(jsonNode); + } + + private static final Logger logger = LogManager.getLogger(TelemetryLogger.class); + + /* + * Test with custom kafka appender & custom layout, + * telemetry object build internally via custom layout mentioned in xml by sent message + */ + @RequestMapping(value = "/test/logs", method = RequestMethod.GET, produces = { "application/json", "text/json" }) + public ResponseEntity testKafkaLogAppender() throws JsonProcessingException, IOException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree("{\"responseCode\":\"OK\"}"); + + logger.fatal("Fatal Test Message 1"); + + logger.info("Info Test Message 1"); + + logger.error("Error Test Message 1"); + + logger.warn("Warn Test Message 1"); + + return ResponseEntity.ok(jsonNode); + } + + /** + * Returns json node for service response + * + * @return JsonNode + * @throws JsonMappingException + * @throws JsonProcessingException + */ + private JsonNode getResponseJsonNode() throws JsonMappingException, JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree( + "{\"id\":\"api.content.service.health\",\"ver\":\"3.0\",\"ts\":null,\"params\":{\"resmsgid\":null,\"msgid\":null,\"err\":null,\"status\":\"successful\",\"errmsg\":null},\"responseCode\":\"OK\",\"result\":{\"healthy\":false}}"); + return jsonNode; + } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 202fb28..ae05601 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -31,5 +31,7 @@ campaign.admin.token = ${CAMPAIGN_ADMIN_TOKEN} # log4j2 log topic config kafka.logs.topic=${KAFKA_LOGS_TOPIC} +log4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector + diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..61e2c0a --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,35 @@ + + + + ${env:KAFKA_LOGS_TOPIC} + ${env:BOOTSTRAP_SERVERS} + + + + + ${kafka.bootstrap.servers} + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file From 3e54af65ea9ca470b14dc76a90ff977c518283c3 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Tue, 7 Sep 2021 15:43:18 +0530 Subject: [PATCH 4/6] telemetry logs on starting a converation --- .../uci/inbound/utils/XMsgProcessingUtil.java | 424 ++++++++++-------- src/main/resources/application.properties | 2 + src/main/resources/log4j2.xml | 2 +- 3 files changed, 236 insertions(+), 192 deletions(-) diff --git a/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java b/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java index 78e4037..5562410 100644 --- a/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java +++ b/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java @@ -8,7 +8,12 @@ import com.uci.dao.repository.XMessageRepository; import com.uci.dao.utils.XMessageDAOUtils; import com.uci.utils.BotService; +import com.uci.utils.CampaignService; import com.uci.utils.kafka.SimpleProducer; +import com.uci.utils.telemetry.LogTelemetryMessage; +import com.uci.utils.telemetry.TelemetryLogger; +import com.uci.utils.telemetry.util.TelemetryEventNames; + import lombok.Builder; import lombok.extern.slf4j.Slf4j; import messagerosa.core.model.SenderReceiverInfo; @@ -16,6 +21,12 @@ import reactor.core.publisher.Mono; import javax.xml.bind.JAXBException; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Comparator; @@ -27,195 +38,226 @@ @Builder public class XMsgProcessingUtil { - AbstractProvider adapter; - CommonMessage inboundMessage; - SimpleProducer kafkaProducer; - XMessageRepository xMsgRepo; - String topicSuccess; - String topicFailure; - BotService botService; - - - public void process() throws JsonProcessingException { - - log.info("incoming message {}", new ObjectMapper().writeValueAsString(inboundMessage)); - try { - adapter.convertMessageToXMsg(inboundMessage) - .doOnError(genericError("Error in converting to XMessage by Adapter")) - .subscribe(xmsg -> { - getAppName(xmsg.getPayload().getText(), xmsg.getFrom()) - .subscribe(appName -> { - xmsg.setApp(appName); - XMessageDAO currentMessageToBeInserted = XMessageDAOUtils.convertXMessageToDAO(xmsg); - if (isCurrentMessageNotAReply(xmsg)) { - String whatsappId = xmsg.getMessageId().getChannelMessageId(); - getLatestXMessage(xmsg.getFrom().getUserID(), XMessage.MessageState.REPLIED) - .doOnError(genericError("Error in getting last message")) - .subscribe(new Consumer() { - @Override - public void accept(XMessageDAO previousMessage) { - previousMessage.setMessageId(whatsappId); - xMsgRepo.save(previousMessage) - .doOnError(genericError("Error in saving previous message")) - .subscribe(new Consumer() { - @Override - public void accept(XMessageDAO updatedPreviousMessage) { - xMsgRepo.insert(currentMessageToBeInserted) - .doOnError(genericError("Error in inserting current message")) - .subscribe(insertedMessage -> { - sendEventToKafka(xmsg); - }); - } - }); - } - }); - } else { - xMsgRepo.insert(currentMessageToBeInserted) - .doOnError(genericError("Error in inserting current message")) - .subscribe(xMessageDAO -> { - sendEventToKafka(xmsg); - }); - } - }); - - }); - - } catch (JAXBException e) { - e.printStackTrace(); - } - } - - private Consumer genericError(String s) { - return c -> { - log.error(s + "::" + c.getMessage()); - }; - } - - private boolean isCurrentMessageNotAReply(XMessage xmsg) { - return !xmsg.getMessageState().equals(XMessage.MessageState.REPLIED); - } - - private void sendEventToKafka(XMessage xmsg) { - String xmessage = null; - try { - xmessage = xmsg.toXML(); - } catch (JAXBException e) { - kafkaProducer.send(topicFailure, inboundMessage.toString()); - } - kafkaProducer.send(topicSuccess, xmessage); - } - - private Mono getLatestXMessage(String userID, XMessage.MessageState messageState) { - LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); - return xMsgRepo - .findAllByFromIdAndTimestampAfter(userID, yesterday) - .doOnError(genericError(String.format("Unable to find previous Message for userID %s", userID))) - .collectList() - .map(xMessageDAOS -> { - if (xMessageDAOS.size() > 0) { - List filteredList = new ArrayList<>(); - for (XMessageDAO xMessageDAO : xMessageDAOS) { - if (xMessageDAO.getMessageState().equals(messageState.name())) - filteredList.add(xMessageDAO); - } - if (filteredList.size() > 0) { - filteredList.sort(Comparator.comparing(XMessageDAO::getTimestamp)); - } - - return xMessageDAOS.get(0); - } - return new XMessageDAO(); - }); - } - - private Mono getAppName(String text, SenderReceiverInfo from) { - LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); - if (text.equals("")) { - try { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return xMessageLast.getApp(); - } - }); - } catch (Exception e2) { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return xMessageLast.getApp(); - } - }); - } - } else { - try { - return botService.getCampaignFromStartingMessage(text) - .flatMap(new Function>() { - @Override - public Mono apply(String appName1) { - if (appName1 == null || appName1.equals("")) { - try { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return (xMessageLast.getApp() == null || xMessageLast.getApp().isEmpty()) ? "finalAppName" : xMessageLast.getApp(); - } - }); - } catch (Exception e2) { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return (xMessageLast.getApp() == null || xMessageLast.getApp().isEmpty()) ? "finalAppName" : xMessageLast.getApp(); - } - }); - } - } - return (appName1 == null || appName1.isEmpty()) ? Mono.just("finalAppName") : Mono.just(appName1); - } - }); - } catch (Exception e) { - try { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return xMessageLast.getApp(); - } - }); - } catch (Exception e2) { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return xMessageLast.getApp(); - } - }); - } - } - } - } - - private Mono getLatestXMessage(String userID, LocalDateTime yesterday, String messageState) { - return xMsgRepo.findAllByUserIdAndTimestampAfter(userID, yesterday) - .collectList() - .map(new Function, XMessageDAO>() { - @Override - public XMessageDAO apply(List xMessageDAOS) { - if (xMessageDAOS.size() > 0) { - List filteredList = new ArrayList<>(); - for (XMessageDAO xMessageDAO : xMessageDAOS) { - if (xMessageDAO.getMessageState().equals(XMessage.MessageState.SENT.name())) - filteredList.add(xMessageDAO); - } - if (filteredList.size() > 0) { - filteredList.sort(new Comparator() { - @Override - public int compare(XMessageDAO o1, XMessageDAO o2) { - return o1.getTimestamp().compareTo(o2.getTimestamp()); - } - }); - } - return xMessageDAOS.get(0); - } - return new XMessageDAO(); - } - }); - } + AbstractProvider adapter; + CommonMessage inboundMessage; + SimpleProducer kafkaProducer; + XMessageRepository xMsgRepo; + String topicSuccess; + String topicFailure; + BotService botService; + CampaignService campaignService; + + @Value("${producer.id}") + private String producerID; + + private static final Logger telemetrylogger = LogManager.getLogger(TelemetryLogger.class); + + public void process() throws JsonProcessingException { + + String incomingMessage = new ObjectMapper().writeValueAsString(inboundMessage); + log.info("incoming message {}", incomingMessage); + try { + adapter.convertMessageToXMsg(inboundMessage) + .doOnError(genericError("Error in converting to XMessage by Adapter", null)).subscribe(xmsg -> { + getAppName(xmsg.getPayload().getText(), xmsg.getFrom()).subscribe(appName -> { + xmsg.setApp(appName); + XMessageDAO currentMessageToBeInserted = XMessageDAOUtils.convertXMessageToDAO(xmsg); + if (isCurrentMessageNotAReply(xmsg)) { + String whatsappId = xmsg.getMessageId().getChannelMessageId(); + getLatestXMessage(xmsg.getFrom().getUserID(), XMessage.MessageState.REPLIED) + .doOnError(genericError("Error in getting last message", xmsg)) + .subscribe(new Consumer() { + @Override + public void accept(XMessageDAO previousMessage) { + previousMessage.setMessageId(whatsappId); + xMsgRepo.save(previousMessage) + .doOnError(genericError("Error in saving previous message", xmsg)) + .subscribe(new Consumer() { + @Override + public void accept(XMessageDAO updatedPreviousMessage) { + xMsgRepo.insert(currentMessageToBeInserted) + .doOnError(genericError( + "Error in inserting current message", xmsg)) + .subscribe(insertedMessage -> { + sendEventToKafka(xmsg); + }); + } + }); + } + }); + } else { + /* Log telemtery event - Start Conversation */ + telemetrylogger.info(new LogTelemetryMessage(String.format("Start Conversation with incoming message : %s", incomingMessage), + TelemetryEventNames.STARTCONVERSATION, "", xmsg.getChannel(), + xmsg.getProvider(), producerID, xmsg.getFrom().getUserID())); + xMsgRepo.insert(currentMessageToBeInserted) + .doOnError(genericError("Error in inserting current message", xmsg)) + .subscribe(xMessageDAO -> { + sendEventToKafka(xmsg); + }); + } + }); + + }); + + } catch (JAXBException e) { + e.printStackTrace(); + } + } + + /* Error to be logged */ + private Consumer genericError(String s, XMessage xmsg) { + return c -> { + log.error(s + "::" + c.getMessage()); + /* Log Telemetry event - Exception */ + if(xmsg != null) { + telemetrylogger.info(new LogTelemetryMessage(s, + TelemetryEventNames.AUDITEXCEPTIONS, "", xmsg.getChannel(), + xmsg.getProvider(), producerID, xmsg.getFrom().getUserID())); + } else { + telemetrylogger.info(new LogTelemetryMessage(s, + TelemetryEventNames.AUDITEXCEPTIONS)); + } + }; + } + + private boolean isCurrentMessageNotAReply(XMessage xmsg) { + return !xmsg.getMessageState().equals(XMessage.MessageState.REPLIED); + } + + private void sendEventToKafka(XMessage xmsg) { + String xmessage = null; + try { + xmessage = xmsg.toXML(); + } catch (JAXBException e) { + kafkaProducer.send(topicFailure, inboundMessage.toString()); + } + kafkaProducer.send(topicSuccess, xmessage); + } + + private Mono getLatestXMessage(String userID, XMessage.MessageState messageState) { + LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); + return xMsgRepo.findAllByFromIdAndTimestampAfter(userID, yesterday) + .doOnError(genericError(String.format("Unable to find previous Message for userID %s", userID), null)) + .collectList().map(xMessageDAOS -> { + if (xMessageDAOS.size() > 0) { + List filteredList = new ArrayList<>(); + for (XMessageDAO xMessageDAO : xMessageDAOS) { + if (xMessageDAO.getMessageState().equals(messageState.name())) + filteredList.add(xMessageDAO); + } + if (filteredList.size() > 0) { + filteredList.sort(Comparator.comparing(XMessageDAO::getTimestamp)); + } + + return xMessageDAOS.get(0); + } + return new XMessageDAO(); + }); + } + + private Mono getAppName(String text, SenderReceiverInfo from) { + LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); + if (text.equals("")) { + try { + return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return xMessageLast.getApp(); + } + }); + } catch (Exception e2) { + return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return xMessageLast.getApp(); + } + }); + } + } else { + try { + return botService.getCampaignFromStartingMessage(text) + .flatMap(new Function>() { + @Override + public Mono apply(String appName1) { + if (appName1 == null || appName1.equals("")) { + try { + return getLatestXMessage(from.getUserID(), yesterday, + XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return (xMessageLast.getApp() == null + || xMessageLast.getApp().isEmpty()) + ? "finalAppName" + : xMessageLast.getApp(); + } + }); + } catch (Exception e2) { + return getLatestXMessage(from.getUserID(), yesterday, + XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return (xMessageLast.getApp() == null + || xMessageLast.getApp().isEmpty()) + ? "finalAppName" + : xMessageLast.getApp(); + } + }); + } + } + return (appName1 == null || appName1.isEmpty()) ? Mono.just("finalAppName") + : Mono.just(appName1); + } + }); + } catch (Exception e) { + try { + return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return xMessageLast.getApp(); + } + }); + } catch (Exception e2) { + return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return xMessageLast.getApp(); + } + }); + } + } + } + } + + private Mono getLatestXMessage(String userID, LocalDateTime yesterday, String messageState) { + return xMsgRepo.findAllByUserIdAndTimestampAfter(userID, yesterday).collectList() + .map(new Function, XMessageDAO>() { + @Override + public XMessageDAO apply(List xMessageDAOS) { + if (xMessageDAOS.size() > 0) { + List filteredList = new ArrayList<>(); + for (XMessageDAO xMessageDAO : xMessageDAOS) { + if (xMessageDAO.getMessageState().equals(XMessage.MessageState.SENT.name())) + filteredList.add(xMessageDAO); + } + if (filteredList.size() > 0) { + filteredList.sort(new Comparator() { + @Override + public int compare(XMessageDAO o1, XMessageDAO o2) { + return o1.getTimestamp().compareTo(o2.getTimestamp()); + } + }); + } + return xMessageDAOS.get(0); + } + return new XMessageDAO(); + } + }); + } } \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index ae05601..e1e7484 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -33,5 +33,7 @@ kafka.logs.topic=${KAFKA_LOGS_TOPIC} log4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector +producer.id = "inbound" + diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 61e2c0a..4e04e85 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -7,7 +7,7 @@ - + ${kafka.bootstrap.servers} From ff1861091b261d28686a82f8652f58ae8a2b4dfc Mon Sep 17 00:00:00 2001 From: Surabhi Date: Mon, 13 Sep 2021 15:29:41 +0530 Subject: [PATCH 5/6] conversation id and owner id from campaign data to be sent to telemetry log kafka topic --- src/main/java/com/uci/inbound/Inbound.java | 2 ++ .../netcore/NetcoreWhatsappConverter.java | 6 ++++++ .../uci/inbound/utils/XMsgProcessingUtil.java | 18 +++++++++++++++--- src/main/resources/application.properties | 2 +- 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/uci/inbound/Inbound.java b/src/main/java/com/uci/inbound/Inbound.java index 7370a3e..584fd1a 100644 --- a/src/main/java/com/uci/inbound/Inbound.java +++ b/src/main/java/com/uci/inbound/Inbound.java @@ -10,6 +10,8 @@ import org.springframework.data.cassandra.repository.config.EnableReactiveCassandraRepositories; import org.springframework.kafka.annotation.EnableKafka; +import com.uci.utils.kafka.KafkaLogConfig; + @EnableKafka @EnableReactiveCassandraRepositories("com.uci.dao") @EntityScan("com.uci.dao") diff --git a/src/main/java/com/uci/inbound/netcore/NetcoreWhatsappConverter.java b/src/main/java/com/uci/inbound/netcore/NetcoreWhatsappConverter.java index ba5a200..eb884ec 100644 --- a/src/main/java/com/uci/inbound/netcore/NetcoreWhatsappConverter.java +++ b/src/main/java/com/uci/inbound/netcore/NetcoreWhatsappConverter.java @@ -8,6 +8,8 @@ import com.uci.utils.kafka.SimpleProducer; import lombok.extern.slf4j.Slf4j; import com.uci.utils.BotService; +import com.uci.utils.CampaignService; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.MediaType; @@ -42,6 +44,9 @@ public class NetcoreWhatsappConverter { @Autowired public BotService botService; + + @Autowired + public CampaignService campaignService; @RequestMapping(value = "/whatsApp", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE) public void netcoreWhatsApp(@RequestBody NetcoreMessageFormat message) throws JsonProcessingException, JAXBException { @@ -60,6 +65,7 @@ public void netcoreWhatsApp(@RequestBody NetcoreMessageFormat message) throws Js .topicSuccess(inboundProcessed) .kafkaProducer(kafkaProducer) .botService(botService) + .campaignService(campaignService) .build() .process(); } diff --git a/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java b/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java index 5562410..ea1a625 100644 --- a/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java +++ b/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java @@ -1,6 +1,7 @@ package com.uci.inbound.utils; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.uci.adapter.provider.factory.AbstractProvider; import com.uci.adapter.Request.CommonMessage; @@ -87,9 +88,20 @@ public void accept(XMessageDAO updatedPreviousMessage) { }); } else { /* Log telemtery event - Start Conversation */ - telemetrylogger.info(new LogTelemetryMessage(String.format("Start Conversation with incoming message : %s", incomingMessage), - TelemetryEventNames.STARTCONVERSATION, "", xmsg.getChannel(), - xmsg.getProvider(), producerID, xmsg.getFrom().getUserID())); + campaignService.getCampaignFromNameTransformer(xmsg.getCampaign()) + .doOnError(genericError("Error in getting campaign data for telemetry log", null)) + .subscribe(new Consumer() { + @Override + public void accept(JsonNode t) { + String id = t.get("id") != null ? t.get("id").asText() : null; + String ownerId = t.get("owner") != null ? t.get("owner").asText() : null; + System.out.println("hey"); + telemetrylogger.info(new LogTelemetryMessage(String.format("Start Conversation with incoming message : %s", incomingMessage), + TelemetryEventNames.STARTCONVERSATION, "", xmsg.getChannel(), + xmsg.getProvider(), producerID, xmsg.getFrom().getUserID(), id, ownerId)); + } + }); + xMsgRepo.insert(currentMessageToBeInserted) .doOnError(genericError("Error in inserting current message", xmsg)) .subscribe(xMessageDAO -> { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index e1e7484..4595e9b 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -33,7 +33,7 @@ kafka.logs.topic=${KAFKA_LOGS_TOPIC} log4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -producer.id = "inbound" +producer.id=inbound From 51db2ef246d00dc95304a1d8df8122f88f733684 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Mon, 13 Sep 2021 15:45:21 +0530 Subject: [PATCH 6/6] service status controller error resolved --- .../health/ServiceStatusController.java | 72 ------------------- 1 file changed, 72 deletions(-) diff --git a/src/main/java/com/uci/inbound/health/ServiceStatusController.java b/src/main/java/com/uci/inbound/health/ServiceStatusController.java index cbd6c8f..5f513b7 100644 --- a/src/main/java/com/uci/inbound/health/ServiceStatusController.java +++ b/src/main/java/com/uci/inbound/health/ServiceStatusController.java @@ -41,77 +41,6 @@ public class ServiceStatusController { @Autowired private HealthService healthService; - -<<<<<<< HEAD - @RequestMapping(value = "/health/cassandra", method = RequestMethod.GET, produces = { "application/json", - "text/json" }) - public ResponseEntity cassandraStatusCheck() throws IOException, JsonProcessingException { - JsonNode jsonNode = getResponseJsonNode(); - ((ObjectNode) jsonNode).put("result", healthService.getCassandraHealthNode()); - - return ResponseEntity.ok(jsonNode); - } - - @RequestMapping(value = "/health/kafka", method = RequestMethod.GET, produces = { "application/json", "text/json" }) - public ResponseEntity kafkaStatusCheck() - throws IOException, JsonProcessingException, InterruptedException { - JsonNode jsonNode = getResponseJsonNode(); - ((ObjectNode) jsonNode).put("result", healthService.getKafkaHealthNode()); - - return ResponseEntity.ok(jsonNode); - } - - @RequestMapping(value = "/health/campaign", method = RequestMethod.GET, produces = { "application/json", - "text/json" }) - public ResponseEntity campaignUrlStatusCheck() throws JsonProcessingException, IOException { - JsonNode jsonNode = getResponseJsonNode(); - ((ObjectNode) jsonNode).put("result", healthService.getCampaignUrlHealthNode()); - - return ResponseEntity.ok(jsonNode); - } - - private static final Logger logger = LogManager.getLogger(TelemetryLogger.class); - - /* - * Test with custom kafka appender & custom layout, - * telemetry object build internally via custom layout mentioned in xml by sent message - */ - @RequestMapping(value = "/test/logs", method = RequestMethod.GET, produces = { "application/json", "text/json" }) - public ResponseEntity testKafkaLogAppender() throws JsonProcessingException, IOException { - ObjectMapper mapper = new ObjectMapper(); - JsonNode jsonNode = mapper.readTree("{\"responseCode\":\"OK\"}"); - - logger.fatal("Fatal Test Message 1"); - - logger.info("Info Test Message 1"); - - logger.error("Error Test Message 1"); - - logger.warn("Warn Test Message 1"); - - return ResponseEntity.ok(jsonNode); - } - - /** - * Returns json node for service response - * - * @return JsonNode - * @throws JsonMappingException - * @throws JsonProcessingException - */ - private JsonNode getResponseJsonNode() throws JsonMappingException, JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - JsonNode jsonNode = mapper.readTree( - "{\"id\":\"api.content.service.health\",\"ver\":\"3.0\",\"ts\":null,\"params\":{\"resmsgid\":null,\"msgid\":null,\"err\":null,\"status\":\"successful\",\"errmsg\":null},\"responseCode\":\"OK\",\"result\":{\"healthy\":false}}"); - return jsonNode; - } -======= - @RequestMapping(value = "/health", method = RequestMethod.GET, produces = { "application/json", "text/json" }) - public ResponseEntity statusCheck() throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - JsonNode json = mapper.readTree("{\"id\":\"api.content.service.health\",\"ver\":\"3.0\",\"ts\":null,\"params\":{\"resmsgid\":null,\"msgid\":null,\"err\":null,\"status\":\"successful\",\"errmsg\":null},\"responseCode\":\"OK\",\"result\":{\"healthy\":true}}"); - return ResponseEntity.ok(json); - } @RequestMapping(value = "/health/cassandra", method = RequestMethod.GET, produces = { "application/json", "text/json" }) public ResponseEntity cassandraStatusCheck() throws IOException, JsonProcessingException { @@ -149,5 +78,4 @@ private JsonNode getResponseJsonNode() throws JsonMappingException, JsonProcessi JsonNode jsonNode = mapper.readTree("{\"id\":\"api.content.service.health\",\"ver\":\"3.0\",\"ts\":null,\"params\":{\"resmsgid\":null,\"msgid\":null,\"err\":null,\"status\":\"successful\",\"errmsg\":null},\"responseCode\":\"OK\",\"result\":{\"healthy\":false}}"); return jsonNode; } ->>>>>>> 1011337be25f6d13991e44083933a03c15717ef7 }