diff --git a/pom.xml b/pom.xml
index ae45bec..c8177df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,6 +23,12 @@
org.springframework.boot
spring-boot-starter-web
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
org.springframework.boot
@@ -75,7 +81,7 @@
jaxb-impl
2.2.11
-
+
org.springframework.boot
spring-boot-starter-test
@@ -98,7 +104,6 @@
1.0
compile
-
diff --git a/src/main/java/com/uci/inbound/Inbound.java b/src/main/java/com/uci/inbound/Inbound.java
index 28792d9..584fd1a 100644
--- a/src/main/java/com/uci/inbound/Inbound.java
+++ b/src/main/java/com/uci/inbound/Inbound.java
@@ -10,7 +10,7 @@
import org.springframework.data.cassandra.repository.config.EnableReactiveCassandraRepositories;
import org.springframework.kafka.annotation.EnableKafka;
-import com.uci.dao.service.HealthService;
+import com.uci.utils.kafka.KafkaLogConfig;
@EnableKafka
@EnableReactiveCassandraRepositories("com.uci.dao")
@@ -24,7 +24,7 @@
@SpringBootApplication
@ComponentScan(basePackages = {"com.uci.inbound", "com.uci.adapter", "com.uci.utils"})
public class Inbound {
- public static void main(String[] args) {
+ public static void main(String[] args) throws InterruptedException {
SpringApplication.run(Inbound.class, args);
}
}
\ No newline at end of file
diff --git a/src/main/java/com/uci/inbound/health/ServiceStatusController.java b/src/main/java/com/uci/inbound/health/ServiceStatusController.java
index 9ec4b0b..5f513b7 100644
--- a/src/main/java/com/uci/inbound/health/ServiceStatusController.java
+++ b/src/main/java/com/uci/inbound/health/ServiceStatusController.java
@@ -10,6 +10,9 @@
import com.uci.utils.BotService;
import com.uci.dao.service.HealthService;
import com.uci.utils.kafka.KafkaConfig;
+import com.uci.utils.telemetry.LogTelemetryBuilder;
+import com.uci.utils.telemetry.LogTelemetryMessage;
+import com.uci.utils.telemetry.TelemetryLogger;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@@ -18,6 +21,9 @@
import java.io.IOException;
import java.util.Map;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.cassandra.CassandraHealthIndicator;
@@ -32,15 +38,9 @@
@RestController
@RequestMapping(value = "/service")
public class ServiceStatusController {
- @Autowired
- private HealthService healthService;
- @RequestMapping(value = "/health", method = RequestMethod.GET, produces = { "application/json", "text/json" })
- public ResponseEntity statusCheck() throws JsonProcessingException {
- ObjectMapper mapper = new ObjectMapper();
- JsonNode json = mapper.readTree("{\"id\":\"api.content.service.health\",\"ver\":\"3.0\",\"ts\":null,\"params\":{\"resmsgid\":null,\"msgid\":null,\"err\":null,\"status\":\"successful\",\"errmsg\":null},\"responseCode\":\"OK\",\"result\":{\"healthy\":true}}");
- return ResponseEntity.ok(json);
- }
+ @Autowired
+ private HealthService healthService;
@RequestMapping(value = "/health/cassandra", method = RequestMethod.GET, produces = { "application/json", "text/json" })
public ResponseEntity cassandraStatusCheck() throws IOException, JsonProcessingException {
diff --git a/src/main/java/com/uci/inbound/netcore/NetcoreWhatsappConverter.java b/src/main/java/com/uci/inbound/netcore/NetcoreWhatsappConverter.java
index ba5a200..eb884ec 100644
--- a/src/main/java/com/uci/inbound/netcore/NetcoreWhatsappConverter.java
+++ b/src/main/java/com/uci/inbound/netcore/NetcoreWhatsappConverter.java
@@ -8,6 +8,8 @@
import com.uci.utils.kafka.SimpleProducer;
import lombok.extern.slf4j.Slf4j;
import com.uci.utils.BotService;
+import com.uci.utils.CampaignService;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
@@ -42,6 +44,9 @@ public class NetcoreWhatsappConverter {
@Autowired
public BotService botService;
+
+ @Autowired
+ public CampaignService campaignService;
@RequestMapping(value = "/whatsApp", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE)
public void netcoreWhatsApp(@RequestBody NetcoreMessageFormat message) throws JsonProcessingException, JAXBException {
@@ -60,6 +65,7 @@ public void netcoreWhatsApp(@RequestBody NetcoreMessageFormat message) throws Js
.topicSuccess(inboundProcessed)
.kafkaProducer(kafkaProducer)
.botService(botService)
+ .campaignService(campaignService)
.build()
.process();
}
diff --git a/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java b/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java
index 78e4037..ea1a625 100644
--- a/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java
+++ b/src/main/java/com/uci/inbound/utils/XMsgProcessingUtil.java
@@ -1,6 +1,7 @@
package com.uci.inbound.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.uci.adapter.provider.factory.AbstractProvider;
import com.uci.adapter.Request.CommonMessage;
@@ -8,7 +9,12 @@
import com.uci.dao.repository.XMessageRepository;
import com.uci.dao.utils.XMessageDAOUtils;
import com.uci.utils.BotService;
+import com.uci.utils.CampaignService;
import com.uci.utils.kafka.SimpleProducer;
+import com.uci.utils.telemetry.LogTelemetryMessage;
+import com.uci.utils.telemetry.TelemetryLogger;
+import com.uci.utils.telemetry.util.TelemetryEventNames;
+
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import messagerosa.core.model.SenderReceiverInfo;
@@ -16,6 +22,12 @@
import reactor.core.publisher.Mono;
import javax.xml.bind.JAXBException;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
@@ -27,195 +39,237 @@
@Builder
public class XMsgProcessingUtil {
- AbstractProvider adapter;
- CommonMessage inboundMessage;
- SimpleProducer kafkaProducer;
- XMessageRepository xMsgRepo;
- String topicSuccess;
- String topicFailure;
- BotService botService;
-
-
- public void process() throws JsonProcessingException {
-
- log.info("incoming message {}", new ObjectMapper().writeValueAsString(inboundMessage));
- try {
- adapter.convertMessageToXMsg(inboundMessage)
- .doOnError(genericError("Error in converting to XMessage by Adapter"))
- .subscribe(xmsg -> {
- getAppName(xmsg.getPayload().getText(), xmsg.getFrom())
- .subscribe(appName -> {
- xmsg.setApp(appName);
- XMessageDAO currentMessageToBeInserted = XMessageDAOUtils.convertXMessageToDAO(xmsg);
- if (isCurrentMessageNotAReply(xmsg)) {
- String whatsappId = xmsg.getMessageId().getChannelMessageId();
- getLatestXMessage(xmsg.getFrom().getUserID(), XMessage.MessageState.REPLIED)
- .doOnError(genericError("Error in getting last message"))
- .subscribe(new Consumer() {
- @Override
- public void accept(XMessageDAO previousMessage) {
- previousMessage.setMessageId(whatsappId);
- xMsgRepo.save(previousMessage)
- .doOnError(genericError("Error in saving previous message"))
- .subscribe(new Consumer() {
- @Override
- public void accept(XMessageDAO updatedPreviousMessage) {
- xMsgRepo.insert(currentMessageToBeInserted)
- .doOnError(genericError("Error in inserting current message"))
- .subscribe(insertedMessage -> {
- sendEventToKafka(xmsg);
- });
- }
- });
- }
- });
- } else {
- xMsgRepo.insert(currentMessageToBeInserted)
- .doOnError(genericError("Error in inserting current message"))
- .subscribe(xMessageDAO -> {
- sendEventToKafka(xmsg);
- });
- }
- });
-
- });
-
- } catch (JAXBException e) {
- e.printStackTrace();
- }
- }
-
- private Consumer genericError(String s) {
- return c -> {
- log.error(s + "::" + c.getMessage());
- };
- }
-
- private boolean isCurrentMessageNotAReply(XMessage xmsg) {
- return !xmsg.getMessageState().equals(XMessage.MessageState.REPLIED);
- }
-
- private void sendEventToKafka(XMessage xmsg) {
- String xmessage = null;
- try {
- xmessage = xmsg.toXML();
- } catch (JAXBException e) {
- kafkaProducer.send(topicFailure, inboundMessage.toString());
- }
- kafkaProducer.send(topicSuccess, xmessage);
- }
-
- private Mono getLatestXMessage(String userID, XMessage.MessageState messageState) {
- LocalDateTime yesterday = LocalDateTime.now().minusDays(1L);
- return xMsgRepo
- .findAllByFromIdAndTimestampAfter(userID, yesterday)
- .doOnError(genericError(String.format("Unable to find previous Message for userID %s", userID)))
- .collectList()
- .map(xMessageDAOS -> {
- if (xMessageDAOS.size() > 0) {
- List filteredList = new ArrayList<>();
- for (XMessageDAO xMessageDAO : xMessageDAOS) {
- if (xMessageDAO.getMessageState().equals(messageState.name()))
- filteredList.add(xMessageDAO);
- }
- if (filteredList.size() > 0) {
- filteredList.sort(Comparator.comparing(XMessageDAO::getTimestamp));
- }
-
- return xMessageDAOS.get(0);
- }
- return new XMessageDAO();
- });
- }
-
- private Mono getAppName(String text, SenderReceiverInfo from) {
- LocalDateTime yesterday = LocalDateTime.now().minusDays(1L);
- if (text.equals("")) {
- try {
- return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() {
- @Override
- public String apply(XMessageDAO xMessageLast) {
- return xMessageLast.getApp();
- }
- });
- } catch (Exception e2) {
- return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() {
- @Override
- public String apply(XMessageDAO xMessageLast) {
- return xMessageLast.getApp();
- }
- });
- }
- } else {
- try {
- return botService.getCampaignFromStartingMessage(text)
- .flatMap(new Function>() {
- @Override
- public Mono apply(String appName1) {
- if (appName1 == null || appName1.equals("")) {
- try {
- return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() {
- @Override
- public String apply(XMessageDAO xMessageLast) {
- return (xMessageLast.getApp() == null || xMessageLast.getApp().isEmpty()) ? "finalAppName" : xMessageLast.getApp();
- }
- });
- } catch (Exception e2) {
- return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() {
- @Override
- public String apply(XMessageDAO xMessageLast) {
- return (xMessageLast.getApp() == null || xMessageLast.getApp().isEmpty()) ? "finalAppName" : xMessageLast.getApp();
- }
- });
- }
- }
- return (appName1 == null || appName1.isEmpty()) ? Mono.just("finalAppName") : Mono.just(appName1);
- }
- });
- } catch (Exception e) {
- try {
- return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() {
- @Override
- public String apply(XMessageDAO xMessageLast) {
- return xMessageLast.getApp();
- }
- });
- } catch (Exception e2) {
- return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() {
- @Override
- public String apply(XMessageDAO xMessageLast) {
- return xMessageLast.getApp();
- }
- });
- }
- }
- }
- }
-
- private Mono getLatestXMessage(String userID, LocalDateTime yesterday, String messageState) {
- return xMsgRepo.findAllByUserIdAndTimestampAfter(userID, yesterday)
- .collectList()
- .map(new Function, XMessageDAO>() {
- @Override
- public XMessageDAO apply(List xMessageDAOS) {
- if (xMessageDAOS.size() > 0) {
- List filteredList = new ArrayList<>();
- for (XMessageDAO xMessageDAO : xMessageDAOS) {
- if (xMessageDAO.getMessageState().equals(XMessage.MessageState.SENT.name()))
- filteredList.add(xMessageDAO);
- }
- if (filteredList.size() > 0) {
- filteredList.sort(new Comparator() {
- @Override
- public int compare(XMessageDAO o1, XMessageDAO o2) {
- return o1.getTimestamp().compareTo(o2.getTimestamp());
- }
- });
- }
- return xMessageDAOS.get(0);
- }
- return new XMessageDAO();
- }
- });
- }
+ AbstractProvider adapter;
+ CommonMessage inboundMessage;
+ SimpleProducer kafkaProducer;
+ XMessageRepository xMsgRepo;
+ String topicSuccess;
+ String topicFailure;
+ BotService botService;
+ CampaignService campaignService;
+
+ @Value("${producer.id}")
+ private String producerID;
+
+ private static final Logger telemetrylogger = LogManager.getLogger(TelemetryLogger.class);
+
+ public void process() throws JsonProcessingException {
+
+ String incomingMessage = new ObjectMapper().writeValueAsString(inboundMessage);
+ log.info("incoming message {}", incomingMessage);
+ try {
+ adapter.convertMessageToXMsg(inboundMessage)
+ .doOnError(genericError("Error in converting to XMessage by Adapter", null)).subscribe(xmsg -> {
+ getAppName(xmsg.getPayload().getText(), xmsg.getFrom()).subscribe(appName -> {
+ xmsg.setApp(appName);
+ XMessageDAO currentMessageToBeInserted = XMessageDAOUtils.convertXMessageToDAO(xmsg);
+ if (isCurrentMessageNotAReply(xmsg)) {
+ String whatsappId = xmsg.getMessageId().getChannelMessageId();
+ getLatestXMessage(xmsg.getFrom().getUserID(), XMessage.MessageState.REPLIED)
+ .doOnError(genericError("Error in getting last message", xmsg))
+ .subscribe(new Consumer() {
+ @Override
+ public void accept(XMessageDAO previousMessage) {
+ previousMessage.setMessageId(whatsappId);
+ xMsgRepo.save(previousMessage)
+ .doOnError(genericError("Error in saving previous message", xmsg))
+ .subscribe(new Consumer() {
+ @Override
+ public void accept(XMessageDAO updatedPreviousMessage) {
+ xMsgRepo.insert(currentMessageToBeInserted)
+ .doOnError(genericError(
+ "Error in inserting current message", xmsg))
+ .subscribe(insertedMessage -> {
+ sendEventToKafka(xmsg);
+ });
+ }
+ });
+ }
+ });
+ } else {
+ /* Log telemtery event - Start Conversation */
+ campaignService.getCampaignFromNameTransformer(xmsg.getCampaign())
+ .doOnError(genericError("Error in getting campaign data for telemetry log", null))
+ .subscribe(new Consumer() {
+ @Override
+ public void accept(JsonNode t) {
+ String id = t.get("id") != null ? t.get("id").asText() : null;
+ String ownerId = t.get("owner") != null ? t.get("owner").asText() : null;
+ System.out.println("hey");
+ telemetrylogger.info(new LogTelemetryMessage(String.format("Start Conversation with incoming message : %s", incomingMessage),
+ TelemetryEventNames.STARTCONVERSATION, "", xmsg.getChannel(),
+ xmsg.getProvider(), producerID, xmsg.getFrom().getUserID(), id, ownerId));
+ }
+ });
+
+ xMsgRepo.insert(currentMessageToBeInserted)
+ .doOnError(genericError("Error in inserting current message", xmsg))
+ .subscribe(xMessageDAO -> {
+ sendEventToKafka(xmsg);
+ });
+ }
+ });
+
+ });
+
+ } catch (JAXBException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /* Error to be logged */
+ private Consumer genericError(String s, XMessage xmsg) {
+ return c -> {
+ log.error(s + "::" + c.getMessage());
+ /* Log Telemetry event - Exception */
+ if(xmsg != null) {
+ telemetrylogger.info(new LogTelemetryMessage(s,
+ TelemetryEventNames.AUDITEXCEPTIONS, "", xmsg.getChannel(),
+ xmsg.getProvider(), producerID, xmsg.getFrom().getUserID()));
+ } else {
+ telemetrylogger.info(new LogTelemetryMessage(s,
+ TelemetryEventNames.AUDITEXCEPTIONS));
+ }
+ };
+ }
+
+ private boolean isCurrentMessageNotAReply(XMessage xmsg) {
+ return !xmsg.getMessageState().equals(XMessage.MessageState.REPLIED);
+ }
+
+ private void sendEventToKafka(XMessage xmsg) {
+ String xmessage = null;
+ try {
+ xmessage = xmsg.toXML();
+ } catch (JAXBException e) {
+ kafkaProducer.send(topicFailure, inboundMessage.toString());
+ }
+ kafkaProducer.send(topicSuccess, xmessage);
+ }
+
+ private Mono getLatestXMessage(String userID, XMessage.MessageState messageState) {
+ LocalDateTime yesterday = LocalDateTime.now().minusDays(1L);
+ return xMsgRepo.findAllByFromIdAndTimestampAfter(userID, yesterday)
+ .doOnError(genericError(String.format("Unable to find previous Message for userID %s", userID), null))
+ .collectList().map(xMessageDAOS -> {
+ if (xMessageDAOS.size() > 0) {
+ List filteredList = new ArrayList<>();
+ for (XMessageDAO xMessageDAO : xMessageDAOS) {
+ if (xMessageDAO.getMessageState().equals(messageState.name()))
+ filteredList.add(xMessageDAO);
+ }
+ if (filteredList.size() > 0) {
+ filteredList.sort(Comparator.comparing(XMessageDAO::getTimestamp));
+ }
+
+ return xMessageDAOS.get(0);
+ }
+ return new XMessageDAO();
+ });
+ }
+
+ private Mono getAppName(String text, SenderReceiverInfo from) {
+ LocalDateTime yesterday = LocalDateTime.now().minusDays(1L);
+ if (text.equals("")) {
+ try {
+ return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name())
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO xMessageLast) {
+ return xMessageLast.getApp();
+ }
+ });
+ } catch (Exception e2) {
+ return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name())
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO xMessageLast) {
+ return xMessageLast.getApp();
+ }
+ });
+ }
+ } else {
+ try {
+ return botService.getCampaignFromStartingMessage(text)
+ .flatMap(new Function>() {
+ @Override
+ public Mono apply(String appName1) {
+ if (appName1 == null || appName1.equals("")) {
+ try {
+ return getLatestXMessage(from.getUserID(), yesterday,
+ XMessage.MessageState.SENT.name())
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO xMessageLast) {
+ return (xMessageLast.getApp() == null
+ || xMessageLast.getApp().isEmpty())
+ ? "finalAppName"
+ : xMessageLast.getApp();
+ }
+ });
+ } catch (Exception e2) {
+ return getLatestXMessage(from.getUserID(), yesterday,
+ XMessage.MessageState.SENT.name())
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO xMessageLast) {
+ return (xMessageLast.getApp() == null
+ || xMessageLast.getApp().isEmpty())
+ ? "finalAppName"
+ : xMessageLast.getApp();
+ }
+ });
+ }
+ }
+ return (appName1 == null || appName1.isEmpty()) ? Mono.just("finalAppName")
+ : Mono.just(appName1);
+ }
+ });
+ } catch (Exception e) {
+ try {
+ return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name())
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO xMessageLast) {
+ return xMessageLast.getApp();
+ }
+ });
+ } catch (Exception e2) {
+ return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name())
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO xMessageLast) {
+ return xMessageLast.getApp();
+ }
+ });
+ }
+ }
+ }
+ }
+
+ private Mono getLatestXMessage(String userID, LocalDateTime yesterday, String messageState) {
+ return xMsgRepo.findAllByUserIdAndTimestampAfter(userID, yesterday).collectList()
+ .map(new Function, XMessageDAO>() {
+ @Override
+ public XMessageDAO apply(List xMessageDAOS) {
+ if (xMessageDAOS.size() > 0) {
+ List filteredList = new ArrayList<>();
+ for (XMessageDAO xMessageDAO : xMessageDAOS) {
+ if (xMessageDAO.getMessageState().equals(XMessage.MessageState.SENT.name()))
+ filteredList.add(xMessageDAO);
+ }
+ if (filteredList.size() > 0) {
+ filteredList.sort(new Comparator() {
+ @Override
+ public int compare(XMessageDAO o1, XMessageDAO o2) {
+ return o1.getTimestamp().compareTo(o2.getTimestamp());
+ }
+ });
+ }
+ return xMessageDAOS.get(0);
+ }
+ return new XMessageDAO();
+ }
+ });
+ }
}
\ No newline at end of file
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index d7c0d01..4ad6a09 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -34,6 +34,12 @@ fusionauth.url = ${FUSIONAUTH_URL}
fusionauth.key = ${FUSIONAUTH_KEY}
+# log4j2 log topic config
+kafka.logs.topic=${KAFKA_LOGS_TOPIC}
+
+log4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
+
+producer.id=inbound
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..4e04e85
--- /dev/null
+++ b/src/main/resources/log4j2.xml
@@ -0,0 +1,35 @@
+
+
+
+ ${env:KAFKA_LOGS_TOPIC}
+ ${env:BOOTSTRAP_SERVERS}
+
+
+
+
+ ${kafka.bootstrap.servers}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file