diff --git a/pom.xml b/pom.xml index ae45bec..c8177df 100644 --- a/pom.xml +++ b/pom.xml @@ -23,6 +23,12 @@ org.springframework.boot spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-logging + + org.springframework.boot @@ -75,7 +81,7 @@ jaxb-impl 2.2.11 - + org.springframework.boot spring-boot-starter-test @@ -98,7 +104,6 @@ 1.0 compile - diff --git a/src/main/java/com/uci/inbound/Inbound.java b/src/main/java/com/uci/inbound/Inbound.java index 28792d9..584fd1a 100644 --- a/src/main/java/com/uci/inbound/Inbound.java +++ b/src/main/java/com/uci/inbound/Inbound.java @@ -10,7 +10,7 @@ import org.springframework.data.cassandra.repository.config.EnableReactiveCassandraRepositories; import org.springframework.kafka.annotation.EnableKafka; -import com.uci.dao.service.HealthService; +import com.uci.utils.kafka.KafkaLogConfig; @EnableKafka @EnableReactiveCassandraRepositories("com.uci.dao") @@ -24,7 +24,7 @@ @SpringBootApplication @ComponentScan(basePackages = {"com.uci.inbound", "com.uci.adapter", "com.uci.utils"}) public class Inbound { - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { SpringApplication.run(Inbound.class, args); } } \ No newline at end of file diff --git a/src/main/java/com/uci/inbound/health/ServiceStatusController.java b/src/main/java/com/uci/inbound/health/ServiceStatusController.java index 9ec4b0b..5f513b7 100644 --- a/src/main/java/com/uci/inbound/health/ServiceStatusController.java +++ b/src/main/java/com/uci/inbound/health/ServiceStatusController.java @@ -10,6 +10,9 @@ import com.uci.utils.BotService; import com.uci.dao.service.HealthService; import com.uci.utils.kafka.KafkaConfig; +import com.uci.utils.telemetry.LogTelemetryBuilder; +import com.uci.utils.telemetry.LogTelemetryMessage; +import com.uci.utils.telemetry.TelemetryLogger; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -18,6 +21,9 @@ import java.io.IOException; import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.actuate.cassandra.CassandraHealthIndicator; @@ -32,15 +38,9 @@ @RestController @RequestMapping(value = "/service") public class ServiceStatusController { - @Autowired - private HealthService healthService; - @RequestMapping(value = "/health", method = RequestMethod.GET, produces = { "application/json", "text/json" }) - public ResponseEntity statusCheck() throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - JsonNode json = mapper.readTree("{\"id\":\"api.content.service.health\",\"ver\":\"3.0\",\"ts\":null,\"params\":{\"resmsgid\":null,\"msgid\":null,\"err\":null,\"status\":\"successful\",\"errmsg\":null},\"responseCode\":\"OK\",\"result\":{\"healthy\":true}}"); - return ResponseEntity.ok(json); - } + @Autowired + private HealthService healthService; @RequestMapping(value = "/health/cassandra", method = RequestMethod.GET, produces = { "application/json", "text/json" }) public ResponseEntity cassandraStatusCheck() throws IOException, JsonProcessingException { diff --git a/src/main/java/com/uci/inbound/netcore/NetcoreWhatsappConverter.java b/src/main/java/com/uci/inbound/netcore/NetcoreWhatsappConverter.java index ba5a200..eb884ec 100644 --- a/src/main/java/com/uci/inbound/netcore/NetcoreWhatsappConverter.java +++ b/src/main/java/com/uci/inbound/netcore/NetcoreWhatsappConverter.java @@ -8,6 +8,8 @@ import com.uci.utils.kafka.SimpleProducer; import lombok.extern.slf4j.Slf4j; import com.uci.utils.BotService; +import com.uci.utils.CampaignService; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.MediaType; @@ -42,6 +44,9 @@ public class NetcoreWhatsappConverter { @Autowired public BotService botService; + + @Autowired + public CampaignService campaignService; @RequestMapping(value = "/whatsApp", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE) public void netcoreWhatsApp(@RequestBody NetcoreMessageFormat message) throws JsonProcessingException, JAXBException { @@ -60,6 +65,7 @@ public void netcoreWhatsApp(@RequestBody NetcoreMessageFormat message) throws Js .topicSuccess(inboundProcessed) .kafkaProducer(kafkaProducer) .botService(botService) + .campaignService(campaignService) .build() .process(); } diff --git a/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java b/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java index 78e4037..ea1a625 100644 --- a/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java +++ b/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java @@ -1,6 +1,7 @@ package com.uci.inbound.utils; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.uci.adapter.provider.factory.AbstractProvider; import com.uci.adapter.Request.CommonMessage; @@ -8,7 +9,12 @@ import com.uci.dao.repository.XMessageRepository; import com.uci.dao.utils.XMessageDAOUtils; import com.uci.utils.BotService; +import com.uci.utils.CampaignService; import com.uci.utils.kafka.SimpleProducer; +import com.uci.utils.telemetry.LogTelemetryMessage; +import com.uci.utils.telemetry.TelemetryLogger; +import com.uci.utils.telemetry.util.TelemetryEventNames; + import lombok.Builder; import lombok.extern.slf4j.Slf4j; import messagerosa.core.model.SenderReceiverInfo; @@ -16,6 +22,12 @@ import reactor.core.publisher.Mono; import javax.xml.bind.JAXBException; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Comparator; @@ -27,195 +39,237 @@ @Builder public class XMsgProcessingUtil { - AbstractProvider adapter; - CommonMessage inboundMessage; - SimpleProducer kafkaProducer; - XMessageRepository xMsgRepo; - String topicSuccess; - String topicFailure; - BotService botService; - - - public void process() throws JsonProcessingException { - - log.info("incoming message {}", new ObjectMapper().writeValueAsString(inboundMessage)); - try { - adapter.convertMessageToXMsg(inboundMessage) - .doOnError(genericError("Error in converting to XMessage by Adapter")) - .subscribe(xmsg -> { - getAppName(xmsg.getPayload().getText(), xmsg.getFrom()) - .subscribe(appName -> { - xmsg.setApp(appName); - XMessageDAO currentMessageToBeInserted = XMessageDAOUtils.convertXMessageToDAO(xmsg); - if (isCurrentMessageNotAReply(xmsg)) { - String whatsappId = xmsg.getMessageId().getChannelMessageId(); - getLatestXMessage(xmsg.getFrom().getUserID(), XMessage.MessageState.REPLIED) - .doOnError(genericError("Error in getting last message")) - .subscribe(new Consumer() { - @Override - public void accept(XMessageDAO previousMessage) { - previousMessage.setMessageId(whatsappId); - xMsgRepo.save(previousMessage) - .doOnError(genericError("Error in saving previous message")) - .subscribe(new Consumer() { - @Override - public void accept(XMessageDAO updatedPreviousMessage) { - xMsgRepo.insert(currentMessageToBeInserted) - .doOnError(genericError("Error in inserting current message")) - .subscribe(insertedMessage -> { - sendEventToKafka(xmsg); - }); - } - }); - } - }); - } else { - xMsgRepo.insert(currentMessageToBeInserted) - .doOnError(genericError("Error in inserting current message")) - .subscribe(xMessageDAO -> { - sendEventToKafka(xmsg); - }); - } - }); - - }); - - } catch (JAXBException e) { - e.printStackTrace(); - } - } - - private Consumer genericError(String s) { - return c -> { - log.error(s + "::" + c.getMessage()); - }; - } - - private boolean isCurrentMessageNotAReply(XMessage xmsg) { - return !xmsg.getMessageState().equals(XMessage.MessageState.REPLIED); - } - - private void sendEventToKafka(XMessage xmsg) { - String xmessage = null; - try { - xmessage = xmsg.toXML(); - } catch (JAXBException e) { - kafkaProducer.send(topicFailure, inboundMessage.toString()); - } - kafkaProducer.send(topicSuccess, xmessage); - } - - private Mono getLatestXMessage(String userID, XMessage.MessageState messageState) { - LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); - return xMsgRepo - .findAllByFromIdAndTimestampAfter(userID, yesterday) - .doOnError(genericError(String.format("Unable to find previous Message for userID %s", userID))) - .collectList() - .map(xMessageDAOS -> { - if (xMessageDAOS.size() > 0) { - List filteredList = new ArrayList<>(); - for (XMessageDAO xMessageDAO : xMessageDAOS) { - if (xMessageDAO.getMessageState().equals(messageState.name())) - filteredList.add(xMessageDAO); - } - if (filteredList.size() > 0) { - filteredList.sort(Comparator.comparing(XMessageDAO::getTimestamp)); - } - - return xMessageDAOS.get(0); - } - return new XMessageDAO(); - }); - } - - private Mono getAppName(String text, SenderReceiverInfo from) { - LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); - if (text.equals("")) { - try { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return xMessageLast.getApp(); - } - }); - } catch (Exception e2) { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return xMessageLast.getApp(); - } - }); - } - } else { - try { - return botService.getCampaignFromStartingMessage(text) - .flatMap(new Function>() { - @Override - public Mono apply(String appName1) { - if (appName1 == null || appName1.equals("")) { - try { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return (xMessageLast.getApp() == null || xMessageLast.getApp().isEmpty()) ? "finalAppName" : xMessageLast.getApp(); - } - }); - } catch (Exception e2) { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return (xMessageLast.getApp() == null || xMessageLast.getApp().isEmpty()) ? "finalAppName" : xMessageLast.getApp(); - } - }); - } - } - return (appName1 == null || appName1.isEmpty()) ? Mono.just("finalAppName") : Mono.just(appName1); - } - }); - } catch (Exception e) { - try { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return xMessageLast.getApp(); - } - }); - } catch (Exception e2) { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return xMessageLast.getApp(); - } - }); - } - } - } - } - - private Mono getLatestXMessage(String userID, LocalDateTime yesterday, String messageState) { - return xMsgRepo.findAllByUserIdAndTimestampAfter(userID, yesterday) - .collectList() - .map(new Function, XMessageDAO>() { - @Override - public XMessageDAO apply(List xMessageDAOS) { - if (xMessageDAOS.size() > 0) { - List filteredList = new ArrayList<>(); - for (XMessageDAO xMessageDAO : xMessageDAOS) { - if (xMessageDAO.getMessageState().equals(XMessage.MessageState.SENT.name())) - filteredList.add(xMessageDAO); - } - if (filteredList.size() > 0) { - filteredList.sort(new Comparator() { - @Override - public int compare(XMessageDAO o1, XMessageDAO o2) { - return o1.getTimestamp().compareTo(o2.getTimestamp()); - } - }); - } - return xMessageDAOS.get(0); - } - return new XMessageDAO(); - } - }); - } + AbstractProvider adapter; + CommonMessage inboundMessage; + SimpleProducer kafkaProducer; + XMessageRepository xMsgRepo; + String topicSuccess; + String topicFailure; + BotService botService; + CampaignService campaignService; + + @Value("${producer.id}") + private String producerID; + + private static final Logger telemetrylogger = LogManager.getLogger(TelemetryLogger.class); + + public void process() throws JsonProcessingException { + + String incomingMessage = new ObjectMapper().writeValueAsString(inboundMessage); + log.info("incoming message {}", incomingMessage); + try { + adapter.convertMessageToXMsg(inboundMessage) + .doOnError(genericError("Error in converting to XMessage by Adapter", null)).subscribe(xmsg -> { + getAppName(xmsg.getPayload().getText(), xmsg.getFrom()).subscribe(appName -> { + xmsg.setApp(appName); + XMessageDAO currentMessageToBeInserted = XMessageDAOUtils.convertXMessageToDAO(xmsg); + if (isCurrentMessageNotAReply(xmsg)) { + String whatsappId = xmsg.getMessageId().getChannelMessageId(); + getLatestXMessage(xmsg.getFrom().getUserID(), XMessage.MessageState.REPLIED) + .doOnError(genericError("Error in getting last message", xmsg)) + .subscribe(new Consumer() { + @Override + public void accept(XMessageDAO previousMessage) { + previousMessage.setMessageId(whatsappId); + xMsgRepo.save(previousMessage) + .doOnError(genericError("Error in saving previous message", xmsg)) + .subscribe(new Consumer() { + @Override + public void accept(XMessageDAO updatedPreviousMessage) { + xMsgRepo.insert(currentMessageToBeInserted) + .doOnError(genericError( + "Error in inserting current message", xmsg)) + .subscribe(insertedMessage -> { + sendEventToKafka(xmsg); + }); + } + }); + } + }); + } else { + /* Log telemtery event - Start Conversation */ + campaignService.getCampaignFromNameTransformer(xmsg.getCampaign()) + .doOnError(genericError("Error in getting campaign data for telemetry log", null)) + .subscribe(new Consumer() { + @Override + public void accept(JsonNode t) { + String id = t.get("id") != null ? t.get("id").asText() : null; + String ownerId = t.get("owner") != null ? t.get("owner").asText() : null; + System.out.println("hey"); + telemetrylogger.info(new LogTelemetryMessage(String.format("Start Conversation with incoming message : %s", incomingMessage), + TelemetryEventNames.STARTCONVERSATION, "", xmsg.getChannel(), + xmsg.getProvider(), producerID, xmsg.getFrom().getUserID(), id, ownerId)); + } + }); + + xMsgRepo.insert(currentMessageToBeInserted) + .doOnError(genericError("Error in inserting current message", xmsg)) + .subscribe(xMessageDAO -> { + sendEventToKafka(xmsg); + }); + } + }); + + }); + + } catch (JAXBException e) { + e.printStackTrace(); + } + } + + /* Error to be logged */ + private Consumer genericError(String s, XMessage xmsg) { + return c -> { + log.error(s + "::" + c.getMessage()); + /* Log Telemetry event - Exception */ + if(xmsg != null) { + telemetrylogger.info(new LogTelemetryMessage(s, + TelemetryEventNames.AUDITEXCEPTIONS, "", xmsg.getChannel(), + xmsg.getProvider(), producerID, xmsg.getFrom().getUserID())); + } else { + telemetrylogger.info(new LogTelemetryMessage(s, + TelemetryEventNames.AUDITEXCEPTIONS)); + } + }; + } + + private boolean isCurrentMessageNotAReply(XMessage xmsg) { + return !xmsg.getMessageState().equals(XMessage.MessageState.REPLIED); + } + + private void sendEventToKafka(XMessage xmsg) { + String xmessage = null; + try { + xmessage = xmsg.toXML(); + } catch (JAXBException e) { + kafkaProducer.send(topicFailure, inboundMessage.toString()); + } + kafkaProducer.send(topicSuccess, xmessage); + } + + private Mono getLatestXMessage(String userID, XMessage.MessageState messageState) { + LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); + return xMsgRepo.findAllByFromIdAndTimestampAfter(userID, yesterday) + .doOnError(genericError(String.format("Unable to find previous Message for userID %s", userID), null)) + .collectList().map(xMessageDAOS -> { + if (xMessageDAOS.size() > 0) { + List filteredList = new ArrayList<>(); + for (XMessageDAO xMessageDAO : xMessageDAOS) { + if (xMessageDAO.getMessageState().equals(messageState.name())) + filteredList.add(xMessageDAO); + } + if (filteredList.size() > 0) { + filteredList.sort(Comparator.comparing(XMessageDAO::getTimestamp)); + } + + return xMessageDAOS.get(0); + } + return new XMessageDAO(); + }); + } + + private Mono getAppName(String text, SenderReceiverInfo from) { + LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); + if (text.equals("")) { + try { + return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return xMessageLast.getApp(); + } + }); + } catch (Exception e2) { + return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return xMessageLast.getApp(); + } + }); + } + } else { + try { + return botService.getCampaignFromStartingMessage(text) + .flatMap(new Function>() { + @Override + public Mono apply(String appName1) { + if (appName1 == null || appName1.equals("")) { + try { + return getLatestXMessage(from.getUserID(), yesterday, + XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return (xMessageLast.getApp() == null + || xMessageLast.getApp().isEmpty()) + ? "finalAppName" + : xMessageLast.getApp(); + } + }); + } catch (Exception e2) { + return getLatestXMessage(from.getUserID(), yesterday, + XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return (xMessageLast.getApp() == null + || xMessageLast.getApp().isEmpty()) + ? "finalAppName" + : xMessageLast.getApp(); + } + }); + } + } + return (appName1 == null || appName1.isEmpty()) ? Mono.just("finalAppName") + : Mono.just(appName1); + } + }); + } catch (Exception e) { + try { + return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return xMessageLast.getApp(); + } + }); + } catch (Exception e2) { + return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return xMessageLast.getApp(); + } + }); + } + } + } + } + + private Mono getLatestXMessage(String userID, LocalDateTime yesterday, String messageState) { + return xMsgRepo.findAllByUserIdAndTimestampAfter(userID, yesterday).collectList() + .map(new Function, XMessageDAO>() { + @Override + public XMessageDAO apply(List xMessageDAOS) { + if (xMessageDAOS.size() > 0) { + List filteredList = new ArrayList<>(); + for (XMessageDAO xMessageDAO : xMessageDAOS) { + if (xMessageDAO.getMessageState().equals(XMessage.MessageState.SENT.name())) + filteredList.add(xMessageDAO); + } + if (filteredList.size() > 0) { + filteredList.sort(new Comparator() { + @Override + public int compare(XMessageDAO o1, XMessageDAO o2) { + return o1.getTimestamp().compareTo(o2.getTimestamp()); + } + }); + } + return xMessageDAOS.get(0); + } + return new XMessageDAO(); + } + }); + } } \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index d7c0d01..4ad6a09 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -34,6 +34,12 @@ fusionauth.url = ${FUSIONAUTH_URL} fusionauth.key = ${FUSIONAUTH_KEY} +# log4j2 log topic config +kafka.logs.topic=${KAFKA_LOGS_TOPIC} + +log4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector + +producer.id=inbound diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..4e04e85 --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,35 @@ + + + + ${env:KAFKA_LOGS_TOPIC} + ${env:BOOTSTRAP_SERVERS} + + + + + ${kafka.bootstrap.servers} + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file