From ed713c835c415c09bcd7bfc33205c2275b808d60 Mon Sep 17 00:00:00 2001 From: Marius Date: Sun, 3 May 2026 23:46:32 +0300 Subject: [PATCH 1/9] 858 - Failed SyncMailjetUsersJob #858 --- ...windows--local-dev-segue-config.properties | 5 + .../IsaacApplicationRegister.java | 2 + .../ac/cam/cl/dtg/segue/api/CorsFilter.java | 88 +++++ .../api/managers/ExternalAccountManager.java | 339 +++++++++--------- .../SegueGuiceConfigurationModule.java | 2 +- .../users/IExternalAccountDataManager.java | 2 + .../PgExternalAccountPersistenceManager.java | 35 ++ .../scheduler/jobs/SyncMailjetUsersJob.java | 5 +- .../util/email/MailJetApiClientWrapper.java | 108 +++++- .../managers/ExternalAccountManagerTest.java | 138 +++++++ ...ExternalAccountPersistenceManagerTest.java | 110 ++++++ .../email/MailJetApiClientWrapperTest.java | 122 +++++++ 12 files changed, 778 insertions(+), 178 deletions(-) create mode 100644 src/main/java/uk/ac/cam/cl/dtg/segue/api/CorsFilter.java diff --git a/config-templates/windows--local-dev-segue-config.properties b/config-templates/windows--local-dev-segue-config.properties index 5541bbbd96..ba98fa1d54 100644 --- a/config-templates/windows--local-dev-segue-config.properties +++ b/config-templates/windows--local-dev-segue-config.properties @@ -19,6 +19,11 @@ EMAIL_SIGNATURE=Isaac Physics Project EVENT_ADMIN_EMAIL=events@isaacphysics.org EVENT_ICAL_UID_DOMAIN=isaacphysics.org +# CORS Configuration (ALB Migration) +# Allowed origins for CORS requests. --- MMM Update for production +# Format: comma-separated list of origins or wildcard patterns (e.g., https://*.isaaccomputerscience.org) --- MMM ? +CORS_ALLOWED_ORIGINS=https://*.isaaccomputerscience.org + SCHOOL_CSV_LIST_PATH=C:\\dev\\isaac-other-resources\\schools_list_2026_spring.csv # Segue diff --git a/src/main/java/uk/ac/cam/cl/dtg/isaac/configuration/IsaacApplicationRegister.java b/src/main/java/uk/ac/cam/cl/dtg/isaac/configuration/IsaacApplicationRegister.java index d4f6edb0d7..1868fb2164 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/isaac/configuration/IsaacApplicationRegister.java +++ b/src/main/java/uk/ac/cam/cl/dtg/isaac/configuration/IsaacApplicationRegister.java @@ -47,6 +47,7 @@ import uk.ac.cam.cl.dtg.segue.api.AuthenticationFacade; import uk.ac.cam.cl.dtg.segue.api.AuthorisationFacade; import uk.ac.cam.cl.dtg.segue.api.ContactFacade; +import uk.ac.cam.cl.dtg.segue.api.CorsFilter; import uk.ac.cam.cl.dtg.segue.api.EmailFacade; import uk.ac.cam.cl.dtg.segue.api.ExceptionSanitiser; import uk.ac.cam.cl.dtg.segue.api.GlossaryFacade; @@ -127,6 +128,7 @@ public final Set getSingletons() { this.singletons.add(injector.getInstance(QuizFacade.class)); // initialise filters + this.singletons.add(injector.getInstance(CorsFilter.class)); this.singletons.add(injector.getInstance(PerformanceMonitor.class)); this.singletons.add(injector.getInstance(SessionValidator.class)); this.singletons.add(injector.getInstance(ExceptionSanitiser.class)); diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/api/CorsFilter.java b/src/main/java/uk/ac/cam/cl/dtg/segue/api/CorsFilter.java new file mode 100644 index 0000000000..c7b6e9d192 --- /dev/null +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/api/CorsFilter.java @@ -0,0 +1,88 @@ +package uk.ac.cam.cl.dtg.segue.api; + +import com.google.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.container.ContainerRequestFilter; +import jakarta.ws.rs.container.ContainerResponseContext; +import jakarta.ws.rs.container.ContainerResponseFilter; +import jakarta.ws.rs.container.PreMatching; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.ext.Provider; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import uk.ac.cam.cl.dtg.util.PropertiesLoader; + +/** + * CORS Filter for ALB migration. + * Emits CORS headers from the application instead of relying on nginx ingress annotations. + * This allows the API to work with AWS ALB which doesn't have a built-in CORS module. + */ +@Provider +@PreMatching +public class CorsFilter implements ContainerRequestFilter, ContainerResponseFilter { + + private static final Logger log = LoggerFactory.getLogger(CorsFilter.class); + + private static final String ALLOWED_ORIGINS_PROPERTY = "CORS_ALLOWED_ORIGINS"; + private static final String DEFAULT_ALLOWED_ORIGINS = "https://*.isaaccomputerscience.org"; + + private final String allowedOrigins; + + @Inject + public CorsFilter(final PropertiesLoader properties) { + String configuredOrigins = properties.getProperty(ALLOWED_ORIGINS_PROPERTY); + this.allowedOrigins = configuredOrigins != null ? configuredOrigins : DEFAULT_ALLOWED_ORIGINS; + } + + @Override + public void filter(final ContainerRequestContext requestContext) throws IOException { + if (requestContext.getMethod().equalsIgnoreCase("OPTIONS")) { + requestContext.abortWith( + Response.ok() + .header("Access-Control-Allow-Origin", getAllowedOrigin(requestContext)) + .header("Access-Control-Allow-Methods", + "GET, POST, PUT, DELETE, OPTIONS, PATCH") + .header("Access-Control-Allow-Headers", + "Origin, X-Requested-With, Content-Type, Accept, Authorization, X-Api-Token") + .header("Access-Control-Max-Age", "3600") + .build()); + } + } + + @Override + public void filter(final ContainerRequestContext requestContext, + final ContainerResponseContext responseContext) throws IOException { + String allowedOrigin = getAllowedOrigin(requestContext); + responseContext.getHeaders().add("Access-Control-Allow-Origin", allowedOrigin); + responseContext.getHeaders().add("Access-Control-Allow-Methods", + "GET, POST, PUT, DELETE, OPTIONS, PATCH"); + responseContext.getHeaders().add("Access-Control-Allow-Headers", + "Origin, X-Requested-With, Content-Type, Accept, Authorization, X-Api-Token"); + responseContext.getHeaders().add("Access-Control-Allow-Credentials", "true"); + } + + /** + * Validate and return the allowed origin from the request. + * Currently, allows all requests from Isaac domains (*.isaaccomputerscience.org). + * More sophisticated --- MMM ??? + * + * @param requestContext the request context + * @return the allowed origin, or * if validation fails + */ + private String getAllowedOrigin(final ContainerRequestContext requestContext) { + String origin = requestContext.getHeaderString("Origin"); + + if (origin == null) { + return allowedOrigins; + } + + // For simplicity, allow any Isaac domain --- MMM ??? + // The allowedOrigins property can be configured as a regex or comma-separated list if needed. + if (origin.contains("isaaccomputerscience.org") || origin.contains("localhost")) { + return origin; + } + + return allowedOrigins; + } +} \ No newline at end of file diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java b/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java index 7ac095e882..7f0aa51192 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java @@ -19,8 +19,11 @@ import com.mailjet.client.errors.MailjetClientCommunicationException; import com.mailjet.client.errors.MailjetException; import com.mailjet.client.errors.MailjetRateLimitException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; -import org.json.JSONObject; +import java.util.Map; +import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import uk.ac.cam.cl.dtg.isaac.dos.users.EmailVerificationStatus; @@ -32,6 +35,9 @@ public class ExternalAccountManager implements IExternalAccountManager { private static final Logger log = LoggerFactory.getLogger(ExternalAccountManager.class); + private static final String MAILJET = "MAILJET - "; + private static final int BULK_BATCH_SIZE = 1000; + private static final int RATE_LIMIT_RETRY_SLEEP_MS = 10000; // 10 seconds private final IExternalAccountDataManager database; private final MailJetApiClientWrapper mailjetApi; @@ -58,233 +64,205 @@ public ExternalAccountManager(final MailJetApiClientWrapper mailjetApi, final IE */ @Override public synchronized void synchroniseChangedUsers() throws ExternalAccountSynchronisationException { - log.info("Starting Mailjet synchronization process"); + log.info("{}Starting Mailjet synchronization process", MAILJET); List userRecordsToUpdate; try { userRecordsToUpdate = database.getRecentlyChangedRecords(); - log.info("Found {} users to synchronize with Mailjet", userRecordsToUpdate.size()); + log.info("{}Found {} users to synchronize with Mailjet", MAILJET, userRecordsToUpdate.size()); } catch (SegueDatabaseException e) { throw new ExternalAccountSynchronisationException("Failed to retrieve users for synchronization" + e.getMessage()); } if (userRecordsToUpdate.isEmpty()) { - log.info("No users to synchronize"); + log.info("{}No users to synchronize", MAILJET); return; } SyncMetrics metrics = new SyncMetrics(); + List successfullyProcessedUserIds = new ArrayList<>(); + + // Separate users into deletions and syncs + List usersToDelete = new ArrayList<>(); + List usersToSync = new ArrayList<>(); for (UserExternalAccountChanges userRecord : userRecordsToUpdate) { - Long userId = userRecord.getUserId(); + if (userRecord.isDeleted() && userRecord.getProviderUserId() != null) { + usersToDelete.add(userRecord); + } else { + usersToSync.add(userRecord); + } + } + // Process deletions individually with backoff + for (UserExternalAccountChanges userRecord : usersToDelete) { try { - processUserSync(userRecord, metrics); - metrics.incrementSuccess(); - + deleteUserFromMailJetWithBackoff(userRecord.getProviderUserId(), userRecord); + database.updateProviderLastUpdated(userRecord.getUserId()); + metrics.incrementDeleted(); + successfullyProcessedUserIds.add(userRecord.getUserId()); } catch (SegueDatabaseException e) { metrics.incrementDatabaseError(); - log.error("Database error storing Mailjet update for user ID: {}", userId, e); + log.error("{}Database error during deletion for user ID: {}", MAILJET, userRecord.getUserId(), e); } catch (MailjetClientCommunicationException e) { metrics.incrementCommunicationError(); throw new ExternalAccountSynchronisationException("Failed to connect to Mailjet: " + e.getMessage()); - } catch (MailjetRateLimitException e) { - metrics.incrementRateLimitError(); - log.warn("Mailjet rate limit exceeded while processing user ID: {}. Processed {} users before limit", - userId, metrics.getSuccessCount()); - throw new ExternalAccountSynchronisationException( - "Mailjet API rate limits exceeded after processing " + metrics.getSuccessCount() + " users" + e); - } catch (MailjetException e) { metrics.incrementMailjetError(); - log.error("Mailjet API error while processing user ID: {}. Continuing with next user", userId, e); - } catch (Exception e) { - metrics.incrementUnexpectedError(); - log.error("Unexpected error processing user ID: {}", userId, e); + log.error("{}Mailjet API error during deletion for user ID: {}. Continuing.", + MAILJET, userRecord.getUserId(), e); } } - logSyncSummary(metrics, userRecordsToUpdate.size()); - } - - /** - * Process synchronization for a single user. - */ - private void processUserSync(UserExternalAccountChanges userRecord, SyncMetrics metrics) - throws SegueDatabaseException, MailjetException { - - Long userId = userRecord.getUserId(); - String accountEmail = userRecord.getAccountEmail(); - - if (accountEmail == null || accountEmail.trim().isEmpty()) { - log.warn("User ID {} has null or empty email address. Skipping", userId); - metrics.incrementSkipped(); - return; + // Process syncs via bulk API grouped by subscription state + try { + Map> groupedUsers = + groupUsersBySubscriptionState(usersToSync); + + for (Map.Entry> entry : groupedUsers.entrySet()) { + SubscriptionGroup group = entry.getKey(); + List groupUsers = entry.getValue(); + + for (int i = 0; i < groupUsers.size(); i += BULK_BATCH_SIZE) { + int endIndex = Math.min(i + BULK_BATCH_SIZE, groupUsers.size()); + List batch = groupUsers.subList(i, endIndex); + + try { + mailjetApi.bulkSyncUsers(batch, group.newsAction, group.eventsAction); + for (UserExternalAccountChanges user : batch) { + metrics.incrementSuccess(); + successfullyProcessedUserIds.add(user.getUserId()); + } + } catch (MailjetRateLimitException e) { + metrics.incrementRateLimitError(); + log.warn("{}Mailjet rate limit exceeded during bulk sync. Processed {} users so far.", + MAILJET, metrics.getSuccessCount()); + throw new ExternalAccountSynchronisationException( + "Mailjet API rate limits exceeded after processing " + metrics.getSuccessCount() + " users"); + } catch (MailjetException e) { + metrics.incrementMailjetError(); + log.error("{}Mailjet API error during bulk sync of {} users. Continuing with next batch.", + MAILJET, batch.size(), e); + } + } + } + } catch (ExternalAccountSynchronisationException e) { + throw e; + } catch (Exception e) { + metrics.incrementUnexpectedError(); + log.error("{}Unexpected error during bulk sync", MAILJET, e); } - boolean accountEmailDeliveryFailed = - EmailVerificationStatus.DELIVERY_FAILED.equals(userRecord.getEmailVerificationStatus()); - String mailjetId = userRecord.getProviderUserId(); - - if (mailjetId != null && !mailjetId.trim().isEmpty()) { - handleExistingMailjetUser(mailjetId, userRecord, accountEmail, accountEmailDeliveryFailed, metrics); - } else { - handleNewMailjetUser(userRecord, accountEmail, accountEmailDeliveryFailed, metrics); + // Batch mark all successfully processed users as synced + try { + if (!successfullyProcessedUserIds.isEmpty()) { + database.batchMarkAsSynced(successfullyProcessedUserIds); + } + } catch (SegueDatabaseException e) { + metrics.incrementDatabaseError(); + log.error("{}Database error marking {} users as synced", MAILJET, successfullyProcessedUserIds.size(), e); } - database.updateProviderLastUpdated(userId); + logSyncSummary(metrics, userRecordsToUpdate.size()); } /** - * Handle synchronization for users that already exist in Mailjet. + * Delete user from Mailjet (GDPR compliance). */ - private void handleExistingMailjetUser(String mailjetId, UserExternalAccountChanges userRecord, - String accountEmail, boolean accountEmailDeliveryFailed, SyncMetrics metrics) + private void deleteUserFromMailJet(final String mailjetId, final UserExternalAccountChanges userRecord) throws SegueDatabaseException, MailjetException { - Long userId = userRecord.getUserId(); - JSONObject mailjetDetails = mailjetApi.getAccountByIdOrEmail(mailjetId); - - if (mailjetDetails == null) { - log.warn("User ID {} has Mailjet ID {} but account not found. Treating as new user", userId, mailjetId); - database.updateExternalAccount(userId, null); - handleNewMailjetUser(userRecord, accountEmail, accountEmailDeliveryFailed, metrics); + if (mailjetId == null || mailjetId.trim().isEmpty()) { + log.warn("{}Attempted to delete user with null/empty Mailjet ID. User ID: {}", MAILJET, userRecord.getUserId()); return; } - if (userRecord.isDeleted()) { - deleteUserFromMailJet(mailjetId, userRecord); - metrics.incrementDeleted(); - - } else if (accountEmailDeliveryFailed) { - log.info("User ID {} has delivery failed status. Unsubscribing from all lists", userId); - mailjetApi.updateUserSubscriptions(mailjetId, - MailJetSubscriptionAction.REMOVE, - MailJetSubscriptionAction.REMOVE); - metrics.incrementUnsubscribed(); - - } else if (!accountEmail.equalsIgnoreCase(mailjetDetails.getString("Email"))) { - log.info("User ID {} changed email. Recreating Mailjet account", userId); - mailjetApi.permanentlyDeleteAccountById(mailjetId); - String newMailjetId = mailjetApi.addNewUserOrGetUserIfExists(accountEmail); - - if (newMailjetId == null) { - throw new MailjetException("Failed to create new Mailjet account after email change for user: " + userId); - } - - updateUserOnMailJet(newMailjetId, userRecord); - metrics.incrementEmailChanged(); + Long userId = userRecord.getUserId(); + mailjetApi.permanentlyDeleteAccountById(mailjetId); + database.updateExternalAccount(userId, null); - } else { - updateUserOnMailJet(mailjetId, userRecord); - metrics.incrementUpdated(); - } + log.info("{}Deleted Mailjet account {} for user ID {} (GDPR deletion)", MAILJET, mailjetId, userId); } /** - * Handle synchronization for users that don't exist in Mailjet yet. + * Delete user from Mailjet with exponential backoff on rate limit. */ - private void handleNewMailjetUser(UserExternalAccountChanges userRecord, - String accountEmail, boolean accountEmailDeliveryFailed, SyncMetrics metrics) + private void deleteUserFromMailJetWithBackoff(final String mailjetId, + final UserExternalAccountChanges userRecord) throws SegueDatabaseException, MailjetException { - - Long userId = userRecord.getUserId(); - - if (!accountEmailDeliveryFailed && !userRecord.isDeleted()) { - log.info("Creating new Mailjet account for user ID {}", userId); - - String mailjetId = mailjetApi.addNewUserOrGetUserIfExists(accountEmail); - - if (mailjetId == null) { - throw new MailjetException("Mailjet returned null ID when creating account for user: " + userId); + try { + deleteUserFromMailJet(mailjetId, userRecord); + } catch (MailjetRateLimitException e) { + log.warn("{}Rate limit on deletion, retrying after backoff...", MAILJET); + try { + Thread.sleep(RATE_LIMIT_RETRY_SLEEP_MS); + deleteUserFromMailJet(mailjetId, userRecord); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new MailjetException("Interrupted during deletion backoff", ie); } - - updateUserOnMailJet(mailjetId, userRecord); - metrics.incrementCreated(); - - } else { - log.debug("User ID {} not eligible for Mailjet (deleted={}, deliveryFailed={}). Skipping", - userId, userRecord.isDeleted(), accountEmailDeliveryFailed); - database.updateExternalAccount(userId, null); - metrics.incrementSkipped(); } } /** - * Update user details and subscriptions in Mailjet. + * Group users by subscription state combination. */ - private void updateUserOnMailJet(final String mailjetId, final UserExternalAccountChanges userRecord) - throws SegueDatabaseException, MailjetException { - - if (mailjetId == null || mailjetId.trim().isEmpty()) { - throw new IllegalArgumentException("Mailjet ID cannot be null or empty"); - } - - Long userId = userRecord.getUserId(); - String stage = userRecord.getStage() != null ? userRecord.getStage() : "unknown"; - - mailjetApi.updateUserProperties( - mailjetId, - userRecord.getGivenName(), - userRecord.getRole().toString(), - userRecord.getEmailVerificationStatus().toString(), - stage - ); - - MailJetSubscriptionAction newsStatus = Boolean.TRUE.equals(userRecord.allowsNewsEmails()) - ? MailJetSubscriptionAction.FORCE_SUBSCRIBE - : MailJetSubscriptionAction.UNSUBSCRIBE; - - MailJetSubscriptionAction eventsStatus = Boolean.TRUE.equals(userRecord.allowsEventsEmails()) - ? MailJetSubscriptionAction.FORCE_SUBSCRIBE - : MailJetSubscriptionAction.UNSUBSCRIBE; - - mailjetApi.updateUserSubscriptions(mailjetId, newsStatus, eventsStatus); - database.updateExternalAccount(userId, mailjetId); - - log.debug("Updated Mailjet account {} for user ID {} (news={}, events={})", - mailjetId, userId, newsStatus, eventsStatus); - } + private Map> + groupUsersBySubscriptionState(final List users) { + Map> groups = new HashMap<>(); + + for (UserExternalAccountChanges user : users) { + if (user.getAccountEmail() == null || user.getAccountEmail().trim().isEmpty()) { + log.warn("{}User ID {} has null/empty email. Skipping.", MAILJET, user.getUserId()); + continue; + } - /** - * Delete user from Mailjet (GDPR compliance). - */ - private void deleteUserFromMailJet(final String mailjetId, final UserExternalAccountChanges userRecord) - throws SegueDatabaseException, MailjetException { + boolean deliveryFailed = + EmailVerificationStatus.DELIVERY_FAILED.equals(user.getEmailVerificationStatus()); + + MailJetSubscriptionAction newsAction; + MailJetSubscriptionAction eventsAction; + + if (deliveryFailed) { + newsAction = MailJetSubscriptionAction.REMOVE; + eventsAction = MailJetSubscriptionAction.REMOVE; + } else { + newsAction = Boolean.TRUE.equals(user.allowsNewsEmails()) + ? MailJetSubscriptionAction.FORCE_SUBSCRIBE + : MailJetSubscriptionAction.UNSUBSCRIBE; + eventsAction = Boolean.TRUE.equals(user.allowsEventsEmails()) + ? MailJetSubscriptionAction.FORCE_SUBSCRIBE + : MailJetSubscriptionAction.UNSUBSCRIBE; + } - if (mailjetId == null || mailjetId.trim().isEmpty()) { - log.warn("Attempted to delete user with null/empty Mailjet ID. User ID: {}", userRecord.getUserId()); - return; + SubscriptionGroup group = new SubscriptionGroup(newsAction, eventsAction); + groups.computeIfAbsent(group, k -> new ArrayList<>()).add(user); } - Long userId = userRecord.getUserId(); - mailjetApi.permanentlyDeleteAccountById(mailjetId); - database.updateExternalAccount(userId, null); - - log.info("Deleted Mailjet account {} for user ID {} (GDPR deletion)", mailjetId, userId); + return groups; } /** * Log summary of synchronization results. */ private void logSyncSummary(SyncMetrics metrics, int totalUsers) { - log.info("=== Mailjet Synchronization Complete ==="); - log.info("Total users to process: {}", totalUsers); - log.info("Successfully processed: {}", metrics.getSuccessCount()); - log.info(" - Created: {}", metrics.getCreatedCount()); - log.info(" - Updated: {}", metrics.getUpdatedCount()); - log.info(" - Deleted: {}", metrics.getDeletedCount()); - log.info(" - Email changed: {}", metrics.getEmailChangedCount()); - log.info(" - Unsubscribed: {}", metrics.getUnsubscribedCount()); - log.info(" - Skipped: {}", metrics.getSkippedCount()); - log.info("Errors:"); - log.info(" - Database errors: {}", metrics.getDatabaseErrorCount()); - log.info(" - Communication errors: {}", metrics.getCommunicationErrorCount()); - log.info(" - Rate limit errors: {}", metrics.getRateLimitErrorCount()); - log.info(" - Mailjet API errors: {}", metrics.getMailjetErrorCount()); - log.info(" - Unexpected errors: {}", metrics.getUnexpectedErrorCount()); - log.info("========================================"); + log.info("{}=== Mailjet Synchronization Complete ===", MAILJET); + log.info("{}Total users to process: {}", MAILJET, totalUsers); + log.info("{}Successfully processed: {}", MAILJET, metrics.getSuccessCount()); + log.info("{} - Created: {}", MAILJET, metrics.getCreatedCount()); + log.info("{} - Updated: {}", MAILJET, metrics.getUpdatedCount()); + log.info("{} - Deleted: {}", MAILJET, metrics.getDeletedCount()); + log.info("{} - Email changed: {}", MAILJET, metrics.getEmailChangedCount()); + log.info("{} - Unsubscribed: {}", MAILJET, metrics.getUnsubscribedCount()); + log.info("{} - Skipped: {}", MAILJET, metrics.getSkippedCount()); + log.info("{}Errors:", MAILJET); + log.info("{} - Database errors: {}", MAILJET, metrics.getDatabaseErrorCount()); + log.info("{} - Communication errors: {}", MAILJET, metrics.getCommunicationErrorCount()); + log.info("{} - Rate limit errors: {}", MAILJET, metrics.getRateLimitErrorCount()); + log.info("{} - Mailjet API errors: {}", MAILJET, metrics.getMailjetErrorCount()); + log.info("{} - Unexpected errors: {}", MAILJET, metrics.getUnexpectedErrorCount()); + log.info("{}========================================", MAILJET); } /** @@ -400,4 +378,35 @@ int getUnexpectedErrorCount() { return unexpectedErrorCount; } } + + /** + * Key for grouping users by their subscription preferences. + */ + private static class SubscriptionGroup { + final MailJetSubscriptionAction newsAction; + final MailJetSubscriptionAction eventsAction; + + SubscriptionGroup(final MailJetSubscriptionAction newsAction, + final MailJetSubscriptionAction eventsAction) { + this.newsAction = newsAction; + this.eventsAction = eventsAction; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SubscriptionGroup that = (SubscriptionGroup) o; + return newsAction == that.newsAction && eventsAction == that.eventsAction; + } + + @Override + public int hashCode() { + return java.util.Objects.hash(newsAction, eventsAction); + } + } } \ No newline at end of file diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/configuration/SegueGuiceConfigurationModule.java b/src/main/java/uk/ac/cam/cl/dtg/segue/configuration/SegueGuiceConfigurationModule.java index 25fa9eae8b..03b296daea 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/configuration/SegueGuiceConfigurationModule.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/configuration/SegueGuiceConfigurationModule.java @@ -1057,7 +1057,7 @@ private static SegueJobService getSegueJobService(final PropertiesLoader propert "syncMailjetUsersJob", CRON_GROUP_NAME_JAVA_JOB, "Sync users to mailjet", - CRON_STRING_EVERY_FOUR_HOURS); + CRON_STRING_HOURLY); List configuredScheduledJobs = new ArrayList<>(Arrays.asList( piiSqlJob, diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/IExternalAccountDataManager.java b/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/IExternalAccountDataManager.java index d84d98a43b..63db38df84 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/IExternalAccountDataManager.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/IExternalAccountDataManager.java @@ -27,4 +27,6 @@ public interface IExternalAccountDataManager { void updateProviderLastUpdated(Long userId) throws SegueDatabaseException; void updateExternalAccount(Long userId, String providerUserIdentifier) throws SegueDatabaseException; + + void batchMarkAsSynced(List userIds) throws SegueDatabaseException; } diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManager.java b/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManager.java index cb4bbeae57..c78a5480cb 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManager.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManager.java @@ -184,6 +184,41 @@ public void updateExternalAccount(final Long userId, final String providerUserId } } + @Override + public void batchMarkAsSynced(final List userIds) throws SegueDatabaseException { + if (userIds == null || userIds.isEmpty()) { + log.debug("{} Batch mark as synced called with empty list", MAILJET); + return; + } + + StringBuilder queryBuilder = new StringBuilder( + "INSERT INTO external_accounts (user_id, provider_name, provider_last_updated) VALUES "); + + for (int i = 0; i < userIds.size(); i++) { + if (i > 0) { + queryBuilder.append(", "); + } + queryBuilder.append("(?, 'MailJet', NOW())"); + } + + queryBuilder.append(" ON CONFLICT (user_id, provider_name) DO UPDATE SET provider_last_updated = NOW()"); + + try (Connection conn = database.getDatabaseConnection(); + PreparedStatement pst = conn.prepareStatement(queryBuilder.toString()) + ) { + for (int i = 0; i < userIds.size(); i++) { + pst.setLong(i + 1, userIds.get(i)); + } + + int rowsAffected = pst.executeUpdate(); + log.debug("{} Batch marked {} users as synced ({} rows affected)", MAILJET, userIds.size(), rowsAffected); + + } catch (SQLException e) { + String errorMsg = String.format("Database error marking %d users as synced", userIds.size()); + throw new SegueDatabaseException(errorMsg + ": " + e.getMessage(), e); + } + } + /** * Build UserExternalAccountChanges object from database result set. * Extracts stage information from registered_contexts JSONB[] field. diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/scheduler/jobs/SyncMailjetUsersJob.java b/src/main/java/uk/ac/cam/cl/dtg/segue/scheduler/jobs/SyncMailjetUsersJob.java index 12fd2d101c..235ef19183 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/scheduler/jobs/SyncMailjetUsersJob.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/scheduler/jobs/SyncMailjetUsersJob.java @@ -35,6 +35,7 @@ public class SyncMailjetUsersJob implements Job { private static final Logger log = LoggerFactory.getLogger(SyncMailjetUsersJob.class); + private static final String MAILJET = "MAILJET - "; private final IExternalAccountManager externalAccountManager; private final EmailManager emailManager; @@ -55,7 +56,7 @@ public SyncMailjetUsersJob() { public void execute(final JobExecutionContext context) throws JobExecutionException { try { externalAccountManager.synchroniseChangedUsers(); - log.info("Success: synchronised users"); + log.info("{}Success: synchronised users", MAILJET); } catch (ExternalAccountSynchronisationException e) { final String subject = "Failed to execute SyncMailjetUsersJob"; StringWriter stringWriter = new StringWriter(); @@ -66,7 +67,7 @@ public void execute(final JobExecutionContext context) throws JobExecutionExcept new EmailCommunicationMessage(properties.getProperty(Constants.SERVER_ADMIN_ADDRESS), subject, exception, exception, EmailType.ADMIN); emailManager.addSystemEmailToQueue(email); - log.error("Failed to synchronise users"); + log.error("{}Failed to synchronise users", MAILJET); } } diff --git a/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java b/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java index 50daa7cf06..21026ba3c0 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java +++ b/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java @@ -26,6 +26,7 @@ import com.mailjet.client.errors.MailjetException; import com.mailjet.client.resource.Contact; import com.mailjet.client.resource.ContactManagecontactslists; +import com.mailjet.client.resource.ContactManagemanycontacts; import com.mailjet.client.resource.Contactdata; import com.mailjet.client.resource.Contacts; import com.mailjet.client.resource.ContactslistImportList; @@ -34,12 +35,15 @@ import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import uk.ac.cam.cl.dtg.isaac.dos.users.UserExternalAccountChanges; public class MailJetApiClientWrapper { private static final Logger log = LoggerFactory.getLogger(MailJetApiClientWrapper.class); + private static final String MAILJET = "MAILJET - "; private static final String PROPERTY_VALUE_KEY = "value"; + private static final int BULK_BATCH_SIZE = 1000; private final MailjetClient mailjetClient; private final String newsListId; @@ -81,7 +85,7 @@ public MailJetApiClientWrapper(final String mailjetApiKey, final String mailjetA */ public JSONObject getAccountByIdOrEmail(final String mailjetIdOrEmail) throws MailjetException { if (mailjetIdOrEmail == null || mailjetIdOrEmail.trim().isEmpty()) { - log.debug("Attempted to get account with null/empty identifier"); + log.debug("{}Attempted to get account with null/empty identifier", MAILJET); return null; } @@ -94,7 +98,7 @@ public JSONObject getAccountByIdOrEmail(final String mailjetIdOrEmail) throws Ma } if (response.getStatus() != 200) { - log.warn("Unexpected Mailjet response status {} when fetching account", response.getStatus()); + log.warn("{}Unexpected Mailjet response status {} when fetching account", MAILJET, response.getStatus()); throw new MailjetException("Unexpected response status: " + response.getStatus()); } @@ -135,9 +139,9 @@ public void permanentlyDeleteAccountById(final String mailjetId) throws MailjetE MailjetResponse response = mailjetClient.delete(request); if (response.getStatus() == 204 || response.getStatus() == 200) { - log.info("Successfully deleted Mailjet account: {}", mailjetId); + log.info("{}Successfully deleted Mailjet account: {}", MAILJET, mailjetId); } else if (response.getStatus() == 404) { - log.debug("Attempted to delete non-existent Mailjet account: {}", mailjetId); + log.debug("{}Attempted to delete non-existent Mailjet account: {}", MAILJET, mailjetId); } else { throw new MailjetException("Failed to delete account. Status: " + response.getStatus()); } @@ -168,7 +172,7 @@ public void permanentlyDeleteAccountById(final String mailjetId) throws MailjetE */ public String addNewUserOrGetUserIfExists(final String email) throws MailjetException { if (email == null || email.trim().isEmpty()) { - log.warn("Attempted to create Mailjet account with null/empty email"); + log.warn("{}Attempted to create Mailjet account with null/empty email", MAILJET); return null; } @@ -201,7 +205,7 @@ private String createNewMailjetAccount(String normalizedEmail) throws MailjetExc if (response.getStatus() == 201 || response.getStatus() == 200) { JSONObject responseData = response.getData().getJSONObject(0); String mailjetId = String.valueOf(responseData.get("ID")); - log.info("Successfully created Mailjet account: {}", mailjetId); + log.info("{}Successfully created Mailjet account: {}", MAILJET, mailjetId); return mailjetId; } @@ -220,13 +224,13 @@ private String createNewMailjetAccount(String normalizedEmail) throws MailjetExc private String handleUserAlreadyExists(MailjetClientRequestException e, String normalizedEmail) throws MailjetException { if (e.getMessage() != null && e.getMessage().toLowerCase().contains("already exists")) { - log.debug("User already exists in Mailjet, fetching existing account"); + log.debug("{}User already exists in Mailjet, fetching existing account", MAILJET); try { JSONObject existingAccount = getAccountByIdOrEmail(normalizedEmail); if (existingAccount != null) { String mailjetId = String.valueOf(existingAccount.get("ID")); - log.info("Retrieved existing Mailjet account: {}", mailjetId); + log.info("{}Retrieved existing Mailjet account: {}", MAILJET, mailjetId); return mailjetId; } else { String errorMsg = String.format("User reported as existing but couldn't fetch account: %s", normalizedEmail); @@ -284,7 +288,7 @@ public void updateUserProperties(final String mailjetId, final String firstName, MailjetResponse response = mailjetClient.put(request); if (response.getStatus() == 200 && response.getTotal() == 1) { - log.debug("Successfully updated properties for Mailjet account: {}", mailjetId); + log.debug("{}Successfully updated properties for Mailjet account: {}", MAILJET, mailjetId); } else { throw new MailjetException( String.format("Failed to update user properties. Status: %d, Total: %d", response.getStatus(), @@ -343,7 +347,7 @@ ContactManagecontactslists.CONTACTSLISTS, new JSONArray().put( MailjetResponse response = mailjetClient.post(request); if (response.getStatus() == 201 && response.getTotal() == 1) { - log.debug("Successfully updated subscriptions for Mailjet account: {}", mailjetId); + log.debug("{}Successfully updated subscriptions for Mailjet account: {}", MAILJET, mailjetId); } else { throw new MailjetException( String.format("Failed to update user subscriptions. Status: %d, Total: %d", response.getStatus(), @@ -393,4 +397,88 @@ private boolean isCommunicationException(MailjetException e) { String msg = e.getMessage().toLowerCase(); return msg.contains("timeout") || msg.contains("connection"); } + + /** + * Bulk create or update contacts with properties and list subscriptions. + *
+ * Uses the asynchronous ContactManagemanycontacts endpoint to handle up to BULK_BATCH_SIZE + * contacts in a single API call. All contacts in the batch share the same list subscription + * actions. The caller must partition users by subscription state before calling this method. + * + * @param users - list of users to sync (caller must ensure size <= BULK_BATCH_SIZE) + * @param newsAction - subscription action for NEWS_AND_UPDATES list + * @param eventsAction - subscription action for EVENTS list + * @return Mailjet job ID for async tracking (or null if request fails) + * @throws MailjetException - if underlying MailjetClient throws an exception + */ + public String bulkSyncUsers(final java.util.List users, + final MailJetSubscriptionAction newsAction, + final MailJetSubscriptionAction eventsAction) throws MailjetException { + if (users == null || users.isEmpty()) { + log.warn("{}Attempted to bulk sync empty user list", MAILJET); + return null; + } + + if (users.size() > BULK_BATCH_SIZE) { + throw new IllegalArgumentException("{}Bulk sync batch size ({}) exceeds limit ({})" + .replace("{}", MAILJET).replace("{}", String.valueOf(users.size())) + .replace("{}", String.valueOf(BULK_BATCH_SIZE))); + } + + try { + JSONArray contactsArray = new JSONArray(); + for (UserExternalAccountChanges user : users) { + JSONObject contactObj = new JSONObject() + .put("Email", user.getAccountEmail()) + .put("Name", user.getGivenName() != null ? user.getGivenName() : "") + .put("Properties", new JSONObject() + .put("firstname", user.getGivenName() != null ? user.getGivenName() : "") + .put("role", user.getRole().toString()) + .put("verification_status", user.getEmailVerificationStatus().toString()) + .put("stage", user.getStage() != null ? user.getStage() : "unknown")); + contactsArray.put(contactObj); + } + + JSONArray listsArray = new JSONArray() + .put(new JSONObject() + .put("ListID", legalListId) + .put("Action", MailJetSubscriptionAction.FORCE_SUBSCRIBE.getValue())) + .put(new JSONObject() + .put("ListID", newsListId) + .put("Action", newsAction.getValue())) + .put(new JSONObject() + .put("ListID", eventsListId) + .put("Action", eventsAction.getValue())); + + MailjetRequest request = new MailjetRequest(ContactManagemanycontacts.resource) + .property(ContactManagemanycontacts.CONTACTS, contactsArray) + .property(ContactManagemanycontacts.CONTACTSLISTS, listsArray); + + MailjetResponse response = mailjetClient.post(request); + + if (response.getStatus() == 200 || response.getStatus() == 201) { + JSONObject responseData = response.getData().getJSONObject(0); + String jobId = responseData.optString("JobID", null); + log.info("{}Bulk sync submitted for {} users (job ID: {})", MAILJET, users.size(), jobId); + return jobId; + } + + throw new MailjetException( + "{}Failed to submit bulk sync. Status: {}".replace("{}", MAILJET) + .replace("{}", String.valueOf(response.getStatus()))); + + } catch (JSONException e) { + String errorMsg = "{}JSON parsing error during bulk sync of {} users".replace("{}", MAILJET) + .replace("{}", String.valueOf(users.size())); + throw new MailjetException(errorMsg, e); + + } catch (MailjetException e) { + if (isCommunicationException(e)) { + String errorMsg = "{}Communication error during bulk sync of {} users" + .replace("{}", MAILJET).replace("{}", String.valueOf(users.size())); + throw new MailjetClientCommunicationException(errorMsg, e); + } + throw e; + } + } } diff --git a/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java b/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java index d23ca4a64c..d61b4338f5 100644 --- a/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java +++ b/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java @@ -1,6 +1,8 @@ package uk.ac.cam.cl.dtg.segue.api.managers; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; @@ -44,6 +46,142 @@ public void setUp() { @Nested class SynchroniseChangedUsersTests { + @Test + void synchroniseChangedUsers_WithBulkUsers_ShouldUseBulkApi() + throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { + // Arrange - users with same subscription preferences should be batched + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test1@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ), + new UserExternalAccountChanges( + 2L, null, "test2@example.com", Role.STUDENT, "Jane", false, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ) + ); + + expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); + expect(mockMailjetApi.bulkSyncUsers(anyObject(), eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE), + eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE))).andReturn("job123"); + mockDatabase.batchMarkAsSynced(anyObject(List.class)); + expectLastCall(); + + replay(mockDatabase, mockMailjetApi); + + // Act + externalAccountManager.synchroniseChangedUsers(); + + // Assert + verify(mockDatabase, mockMailjetApi); + } + + @Test + void synchroniseChangedUsers_WithMixedSubscriptionPreferences_ShouldGroupByPreferences() + throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { + // Arrange - users with different subscription preferences should be in different groups + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test1@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ), + new UserExternalAccountChanges( + 2L, null, "test2@example.com", Role.STUDENT, "Jane", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) + ); + + expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); + // First bulk call for group (FORCE_SUBSCRIBE, FORCE_SUBSCRIBE) + expect(mockMailjetApi.bulkSyncUsers(anyObject(), eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE), + eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE))).andReturn("job123"); + // Second bulk call for group (FORCE_SUBSCRIBE, UNSUBSCRIBE) + expect(mockMailjetApi.bulkSyncUsers(anyObject(), eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE), + eq(MailJetSubscriptionAction.UNSUBSCRIBE))).andReturn("job124"); + mockDatabase.batchMarkAsSynced(anyObject(List.class)); + expectLastCall(); + + replay(mockDatabase, mockMailjetApi); + + // Act + externalAccountManager.synchroniseChangedUsers(); + + // Assert + verify(mockDatabase, mockMailjetApi); + } + + @Test + void synchroniseChangedUsers_WithDeletedUser_ShouldDeleteIndividually() + throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { + // Arrange + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, "mailjetId123", "test@example.com", Role.STUDENT, "John", true, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ) + ); + + expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); + mockMailjetApi.permanentlyDeleteAccountById("mailjetId123"); + expectLastCall(); + mockDatabase.updateExternalAccount(1L, null); + expectLastCall(); + mockDatabase.updateProviderLastUpdated(1L); + expectLastCall(); + mockDatabase.batchMarkAsSynced(anyObject(List.class)); + expectLastCall(); + + replay(mockDatabase, mockMailjetApi); + + // Act + externalAccountManager.synchroniseChangedUsers(); + + // Assert + verify(mockDatabase, mockMailjetApi); + } + + @Test + void synchroniseChangedUsers_WithDeliveryFailedUser_ShouldGroupAsRemove() + throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { + // Arrange + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.DELIVERY_FAILED, true, true, "GCSE" + ) + ); + + expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); + // Delivery failed users should call with REMOVE for both news and events + expect(mockMailjetApi.bulkSyncUsers(anyObject(), eq(MailJetSubscriptionAction.REMOVE), + eq(MailJetSubscriptionAction.REMOVE))).andReturn("job123"); + mockDatabase.batchMarkAsSynced(anyObject(List.class)); + expectLastCall(); + + replay(mockDatabase, mockMailjetApi); + + // Act + externalAccountManager.synchroniseChangedUsers(); + + // Assert + verify(mockDatabase, mockMailjetApi); + } + + @Test + void synchroniseChangedUsers_WithEmptyUserList_ShouldReturnWithoutError() + throws SegueDatabaseException, ExternalAccountSynchronisationException { + // Arrange + expect(mockDatabase.getRecentlyChangedRecords()).andReturn(new ArrayList<>()); + + replay(mockDatabase, mockMailjetApi); + + // Act + externalAccountManager.synchroniseChangedUsers(); + + // Assert + verify(mockDatabase, mockMailjetApi); + } + @Test void synchroniseChangedUsers_WithNewUser_ShouldCreateAccount() throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { diff --git a/src/test/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManagerTest.java b/src/test/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManagerTest.java index 16cd0726be..74d8cf215e 100644 --- a/src/test/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManagerTest.java +++ b/src/test/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManagerTest.java @@ -15,6 +15,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -126,6 +127,115 @@ void getRecentlyChangedRecords_WithDatabaseError_ShouldThrowException() throws E } } + @Nested + class BatchMarkAsSyncedTests { + + @Test + void batchMarkAsSynced_WithValidUserIds_ShouldUpdateDatabase() throws Exception { + // Arrange + List userIds = List.of(1L, 2L, 3L); + + expect(mockDatabase.getDatabaseConnection()).andReturn(mockConnection); + expect(mockConnection.prepareStatement(anyString())).andReturn(mockPreparedStatement); + + mockPreparedStatement.setLong(1, 1L); + expectLastCall(); + mockPreparedStatement.setLong(2, 2L); + expectLastCall(); + mockPreparedStatement.setLong(3, 3L); + expectLastCall(); + + expect(mockPreparedStatement.executeUpdate()).andReturn(3); + + mockPreparedStatement.close(); + expectLastCall(); + mockConnection.close(); + expectLastCall(); + + replay(mockDatabase, mockConnection, mockPreparedStatement); + + // Act + persistenceManager.batchMarkAsSynced(userIds); + + // Assert + verify(mockDatabase, mockConnection, mockPreparedStatement); + } + + @Test + void batchMarkAsSynced_WithEmptyList_ShouldDoNothing() throws Exception { + // Arrange - no expectations set, method should return early + + replay(mockDatabase, mockConnection, mockPreparedStatement); + + // Act + persistenceManager.batchMarkAsSynced(new ArrayList<>()); + + // Assert + verify(mockDatabase, mockConnection, mockPreparedStatement); + } + + @Test + void batchMarkAsSynced_WithLargeUserList_ShouldBatchInsert() throws Exception { + // Arrange + List userIds = new ArrayList<>(); + for (long i = 1; i <= 100; i++) { + userIds.add(i); + } + + expect(mockDatabase.getDatabaseConnection()).andReturn(mockConnection); + expect(mockConnection.prepareStatement(anyString())).andReturn(mockPreparedStatement); + + for (long i = 1; i <= 100; i++) { + mockPreparedStatement.setLong((int) i, i); + expectLastCall(); + } + + expect(mockPreparedStatement.executeUpdate()).andReturn(100); + + mockPreparedStatement.close(); + expectLastCall(); + mockConnection.close(); + expectLastCall(); + + replay(mockDatabase, mockConnection, mockPreparedStatement); + + // Act + persistenceManager.batchMarkAsSynced(userIds); + + // Assert + verify(mockDatabase, mockConnection, mockPreparedStatement); + } + + @Test + void batchMarkAsSynced_WithDatabaseError_ShouldThrowException() throws Exception { + // Arrange + List userIds = List.of(1L, 2L); + + expect(mockDatabase.getDatabaseConnection()).andReturn(mockConnection); + expect(mockConnection.prepareStatement(anyString())).andReturn(mockPreparedStatement); + + mockPreparedStatement.setLong(1, 1L); + expectLastCall(); + mockPreparedStatement.setLong(2, 2L); + expectLastCall(); + + expect(mockPreparedStatement.executeUpdate()).andThrow(new SQLException("Database error")); + + mockPreparedStatement.close(); + expectLastCall(); + mockConnection.close(); + expectLastCall(); + + replay(mockDatabase, mockConnection, mockPreparedStatement); + + // Act & Assert + assertThrows(SegueDatabaseException.class, + () -> persistenceManager.batchMarkAsSynced(userIds)); + + verify(mockDatabase, mockConnection, mockPreparedStatement); + } + } + // Helper method to setup mock ResultSet with all expected calls private void setupMockResultSetForUser(Long userId, String mailjetId, String email, String role, String givenName, boolean deleted, String verificationStatus, diff --git a/src/test/java/uk/ac/cam/cl/dtg/segue/util/email/MailJetApiClientWrapperTest.java b/src/test/java/uk/ac/cam/cl/dtg/segue/util/email/MailJetApiClientWrapperTest.java index 54870aa6af..9b10a6e142 100644 --- a/src/test/java/uk/ac/cam/cl/dtg/segue/util/email/MailJetApiClientWrapperTest.java +++ b/src/test/java/uk/ac/cam/cl/dtg/segue/util/email/MailJetApiClientWrapperTest.java @@ -6,6 +6,8 @@ import com.mailjet.client.errors.MailjetClientRequestException; import com.mailjet.client.errors.MailjetException; import com.mailjet.client.errors.MailjetClientCommunicationException; +import java.util.ArrayList; +import java.util.List; import org.json.JSONArray; import org.json.JSONObject; import org.junit.jupiter.api.BeforeEach; @@ -14,6 +16,9 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullAndEmptySource; import org.junit.jupiter.params.provider.ValueSource; +import uk.ac.cam.cl.dtg.isaac.dos.users.EmailVerificationStatus; +import uk.ac.cam.cl.dtg.isaac.dos.users.Role; +import uk.ac.cam.cl.dtg.isaac.dos.users.UserExternalAccountChanges; import uk.ac.cam.cl.dtg.util.email.MailJetApiClientWrapper; import uk.ac.cam.cl.dtg.util.email.MailJetSubscriptionAction; @@ -671,6 +676,123 @@ void updateUserSubscriptions_WithCommunicationError_ShouldThrowCommunicationExce } } + @Nested + class BulkSyncUsersTests { + + @Test + void bulkSyncUsers_WithValidUsers_ShouldReturnJobId() throws MailjetException { + // Arrange + List users = List.of( + new UserExternalAccountChanges( + 1L, null, "test1@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ), + new UserExternalAccountChanges( + 2L, null, "test2@example.com", Role.TEACHER, "Jane", false, + EmailVerificationStatus.VERIFIED, false, true, "A_LEVEL" + ) + ); + + MailjetResponse mockResponse = createMock(MailjetResponse.class); + JSONArray mockData = new JSONArray(); + JSONObject mockResult = new JSONObject(); + mockResult.put("JobID", "job123"); + mockData.put(mockResult); + + expect(mockMailjetClient.post(anyObject(MailjetRequest.class))).andReturn(mockResponse); + expect(mockResponse.getStatus()).andReturn(200); + expect(mockResponse.getData()).andReturn(mockData); + + replay(mockMailjetClient, mockResponse); + + // Act + String jobId = mailJetApiClientWrapper.bulkSyncUsers(users, + MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE); + + // Assert + verify(mockMailjetClient, mockResponse); + assertEquals("job123", jobId); + } + + @Test + void bulkSyncUsers_WithEmptyList_ShouldReturnNull() throws MailjetException { + // Act + String result = mailJetApiClientWrapper.bulkSyncUsers(new ArrayList<>(), + MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE); + + // Assert + assertNull(result); + } + + @Test + void bulkSyncUsers_WithOversizedBatch_ShouldThrowException() { + // Arrange + List users = new ArrayList<>(); + for (int i = 0; i < 1001; i++) { + users.add(new UserExternalAccountChanges( + (long) i, null, "test" + i + "@example.com", Role.STUDENT, "User" + i, false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + )); + } + + // Act & Assert + assertThrows(IllegalArgumentException.class, () -> + mailJetApiClientWrapper.bulkSyncUsers(users, + MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE) + ); + } + + @Test + void bulkSyncUsers_WithApiError_ShouldThrowException() throws MailjetException { + // Arrange + List users = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ) + ); + + MailjetResponse mockResponse = createMock(MailjetResponse.class); + + expect(mockMailjetClient.post(anyObject(MailjetRequest.class))).andReturn(mockResponse); + expect(mockResponse.getStatus()).andReturn(500); + + replay(mockMailjetClient, mockResponse); + + // Act & Assert + assertThrows(MailjetException.class, () -> + mailJetApiClientWrapper.bulkSyncUsers(users, + MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE) + ); + } + + @Test + void bulkSyncUsers_WithCommunicationError_ShouldThrowCommunicationException() + throws MailjetException { + // Arrange + List users = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ) + ); + + MailjetClientCommunicationException commException = + new MailjetClientCommunicationException("Connection timeout"); + + expect(mockMailjetClient.post(anyObject(MailjetRequest.class))) + .andThrow(commException); + + replay(mockMailjetClient); + + // Act & Assert + assertThrows(MailjetClientCommunicationException.class, () -> + mailJetApiClientWrapper.bulkSyncUsers(users, + MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE) + ); + } + } + private void injectMockMailjetClient(MailJetApiClientWrapper wrapper, MailjetClient client) { try { var field = MailJetApiClientWrapper.class.getDeclaredField("mailjetClient"); From 6d1fb34714ff3972b6dbd6a58edc0966ae7e5373 Mon Sep 17 00:00:00 2001 From: Marius Date: Mon, 4 May 2026 00:07:46 +0300 Subject: [PATCH 2/9] 858 - Failed SyncMailjetUsersJob #858 --- .../api/managers/ExternalAccountManager.java | 134 ++++-- .../util/email/MailJetApiClientWrapper.java | 119 +++-- .../managers/ExternalAccountManagerTest.java | 421 +----------------- 3 files changed, 189 insertions(+), 485 deletions(-) diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java b/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java index 7f0aa51192..f0dc7bdf12 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import uk.ac.cam.cl.dtg.isaac.dos.users.EmailVerificationStatus; @@ -66,15 +65,7 @@ public ExternalAccountManager(final MailJetApiClientWrapper mailjetApi, final IE public synchronized void synchroniseChangedUsers() throws ExternalAccountSynchronisationException { log.info("{}Starting Mailjet synchronization process", MAILJET); - List userRecordsToUpdate; - try { - userRecordsToUpdate = database.getRecentlyChangedRecords(); - log.info("{}Found {} users to synchronize with Mailjet", MAILJET, userRecordsToUpdate.size()); - } catch (SegueDatabaseException e) { - throw new ExternalAccountSynchronisationException("Failed to retrieve users for synchronization" - + e.getMessage()); - } - + List userRecordsToUpdate = getRecentlyChangedUsersOrThrow(); if (userRecordsToUpdate.isEmpty()) { log.info("{}No users to synchronize", MAILJET); return; @@ -83,19 +74,56 @@ public synchronized void synchroniseChangedUsers() throws ExternalAccountSynchro SyncMetrics metrics = new SyncMetrics(); List successfullyProcessedUserIds = new ArrayList<>(); - // Separate users into deletions and syncs List usersToDelete = new ArrayList<>(); List usersToSync = new ArrayList<>(); + partitionUsersByType(userRecordsToUpdate, usersToDelete, usersToSync); + + processDeletions(usersToDelete, metrics, successfullyProcessedUserIds); + processBulkSyncs(usersToSync, metrics, successfullyProcessedUserIds); + markSuccessfullyProcessedAsSynced(successfullyProcessedUserIds, metrics); + + logSyncSummary(metrics, userRecordsToUpdate.size()); + } + + /** + * Retrieve recently changed users from database. + * Extracted to reduce cognitive complexity. + */ + private List getRecentlyChangedUsersOrThrow() + throws ExternalAccountSynchronisationException { + try { + List users = database.getRecentlyChangedRecords(); + log.info("{}Found {} users to synchronize with Mailjet", MAILJET, users.size()); + return users; + } catch (SegueDatabaseException e) { + throw new ExternalAccountSynchronisationException("Failed to retrieve users for synchronization" + + e.getMessage()); + } + } - for (UserExternalAccountChanges userRecord : userRecordsToUpdate) { - if (userRecord.isDeleted() && userRecord.getProviderUserId() != null) { + /** + * Partition users into deletions and syncs. + * Extracted to reduce cognitive complexity. + */ + private void partitionUsersByType(final List userRecords, + final List usersToDelete, + final List usersToSync) { + for (UserExternalAccountChanges userRecord : userRecords) { + if (Boolean.TRUE.equals(userRecord.isDeleted()) && userRecord.getProviderUserId() != null) { usersToDelete.add(userRecord); } else { usersToSync.add(userRecord); } } + } - // Process deletions individually with backoff + /** + * Process user deletions with error handling and backoff. + * Extracted to reduce cognitive complexity. + */ + private void processDeletions(final List usersToDelete, final SyncMetrics metrics, + final List successfullyProcessedUserIds) + throws ExternalAccountSynchronisationException { for (UserExternalAccountChanges userRecord : usersToDelete) { try { deleteUserFromMailJetWithBackoff(userRecord.getProviderUserId(), userRecord); @@ -114,38 +142,21 @@ public synchronized void synchroniseChangedUsers() throws ExternalAccountSynchro MAILJET, userRecord.getUserId(), e); } } + } - // Process syncs via bulk API grouped by subscription state + /** + * Process bulk syncs grouped by subscription state. + * Extracted to reduce cognitive complexity. + */ + private void processBulkSyncs(final List usersToSync, final SyncMetrics metrics, + final List successfullyProcessedUserIds) + throws ExternalAccountSynchronisationException { try { Map> groupedUsers = groupUsersBySubscriptionState(usersToSync); for (Map.Entry> entry : groupedUsers.entrySet()) { - SubscriptionGroup group = entry.getKey(); - List groupUsers = entry.getValue(); - - for (int i = 0; i < groupUsers.size(); i += BULK_BATCH_SIZE) { - int endIndex = Math.min(i + BULK_BATCH_SIZE, groupUsers.size()); - List batch = groupUsers.subList(i, endIndex); - - try { - mailjetApi.bulkSyncUsers(batch, group.newsAction, group.eventsAction); - for (UserExternalAccountChanges user : batch) { - metrics.incrementSuccess(); - successfullyProcessedUserIds.add(user.getUserId()); - } - } catch (MailjetRateLimitException e) { - metrics.incrementRateLimitError(); - log.warn("{}Mailjet rate limit exceeded during bulk sync. Processed {} users so far.", - MAILJET, metrics.getSuccessCount()); - throw new ExternalAccountSynchronisationException( - "Mailjet API rate limits exceeded after processing " + metrics.getSuccessCount() + " users"); - } catch (MailjetException e) { - metrics.incrementMailjetError(); - log.error("{}Mailjet API error during bulk sync of {} users. Continuing with next batch.", - MAILJET, batch.size(), e); - } - } + processBatchesForGroup(entry.getKey(), entry.getValue(), metrics, successfullyProcessedUserIds); } } catch (ExternalAccountSynchronisationException e) { throw e; @@ -153,8 +164,47 @@ public synchronized void synchroniseChangedUsers() throws ExternalAccountSynchro metrics.incrementUnexpectedError(); log.error("{}Unexpected error during bulk sync", MAILJET, e); } + } + + /** + * Process batches for a subscription group. + * Extracted to reduce cognitive complexity. + */ + private void processBatchesForGroup(final SubscriptionGroup group, + final List groupUsers, + final SyncMetrics metrics, + final List successfullyProcessedUserIds) + throws ExternalAccountSynchronisationException { + for (int i = 0; i < groupUsers.size(); i += BULK_BATCH_SIZE) { + int endIndex = Math.min(i + BULK_BATCH_SIZE, groupUsers.size()); + List batch = groupUsers.subList(i, endIndex); + + try { + mailjetApi.bulkSyncUsers(batch, group.newsAction, group.eventsAction); + for (UserExternalAccountChanges user : batch) { + metrics.incrementSuccess(); + successfullyProcessedUserIds.add(user.getUserId()); + } + } catch (MailjetRateLimitException e) { + metrics.incrementRateLimitError(); + log.warn("{}Mailjet rate limit exceeded during bulk sync. Processed {} users so far.", + MAILJET, metrics.getSuccessCount()); + throw new ExternalAccountSynchronisationException( + "Mailjet API rate limits exceeded after processing " + metrics.getSuccessCount() + " users"); + } catch (MailjetException e) { + metrics.incrementMailjetError(); + log.error("{}Mailjet API error during bulk sync of {} users. Continuing with next batch.", + MAILJET, batch.size(), e); + } + } + } - // Batch mark all successfully processed users as synced + /** + * Mark successfully processed users as synced in database. + * Extracted to reduce cognitive complexity. + */ + private void markSuccessfullyProcessedAsSynced(final List successfullyProcessedUserIds, + final SyncMetrics metrics) { try { if (!successfullyProcessedUserIds.isEmpty()) { database.batchMarkAsSynced(successfullyProcessedUserIds); @@ -163,8 +213,6 @@ public synchronized void synchroniseChangedUsers() throws ExternalAccountSynchro metrics.incrementDatabaseError(); log.error("{}Database error marking {} users as synced", MAILJET, successfullyProcessedUserIds.size(), e); } - - logSyncSummary(metrics, userRecordsToUpdate.size()); } /** diff --git a/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java b/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java index 21026ba3c0..814b8356ee 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java +++ b/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java @@ -44,6 +44,8 @@ public class MailJetApiClientWrapper { private static final String MAILJET = "MAILJET - "; private static final String PROPERTY_VALUE_KEY = "value"; private static final int BULK_BATCH_SIZE = 1000; + private static final String ACTION = "Action"; + private static final String LIST_ID = "ListID"; private final MailjetClient mailjetClient; private final String newsListId; @@ -426,46 +428,14 @@ public String bulkSyncUsers(final java.util.List use } try { - JSONArray contactsArray = new JSONArray(); - for (UserExternalAccountChanges user : users) { - JSONObject contactObj = new JSONObject() - .put("Email", user.getAccountEmail()) - .put("Name", user.getGivenName() != null ? user.getGivenName() : "") - .put("Properties", new JSONObject() - .put("firstname", user.getGivenName() != null ? user.getGivenName() : "") - .put("role", user.getRole().toString()) - .put("verification_status", user.getEmailVerificationStatus().toString()) - .put("stage", user.getStage() != null ? user.getStage() : "unknown")); - contactsArray.put(contactObj); - } - - JSONArray listsArray = new JSONArray() - .put(new JSONObject() - .put("ListID", legalListId) - .put("Action", MailJetSubscriptionAction.FORCE_SUBSCRIBE.getValue())) - .put(new JSONObject() - .put("ListID", newsListId) - .put("Action", newsAction.getValue())) - .put(new JSONObject() - .put("ListID", eventsListId) - .put("Action", eventsAction.getValue())); + JSONArray contactsArray = buildContactsArray(users); + JSONArray listsArray = buildListsArray(newsAction, eventsAction); MailjetRequest request = new MailjetRequest(ContactManagemanycontacts.resource) .property(ContactManagemanycontacts.CONTACTS, contactsArray) .property(ContactManagemanycontacts.CONTACTSLISTS, listsArray); - MailjetResponse response = mailjetClient.post(request); - - if (response.getStatus() == 200 || response.getStatus() == 201) { - JSONObject responseData = response.getData().getJSONObject(0); - String jobId = responseData.optString("JobID", null); - log.info("{}Bulk sync submitted for {} users (job ID: {})", MAILJET, users.size(), jobId); - return jobId; - } - - throw new MailjetException( - "{}Failed to submit bulk sync. Status: {}".replace("{}", MAILJET) - .replace("{}", String.valueOf(response.getStatus()))); + return submitBulkSyncRequest(request, users); } catch (JSONException e) { String errorMsg = "{}JSON parsing error during bulk sync of {} users".replace("{}", MAILJET) @@ -473,12 +443,81 @@ public String bulkSyncUsers(final java.util.List use throw new MailjetException(errorMsg, e); } catch (MailjetException e) { - if (isCommunicationException(e)) { - String errorMsg = "{}Communication error during bulk sync of {} users" - .replace("{}", MAILJET).replace("{}", String.valueOf(users.size())); - throw new MailjetClientCommunicationException(errorMsg, e); - } + handleBulkSyncException(e, users); throw e; } } + + /** + * Build contacts array for bulk sync request. + * Extracted to reduce cognitive complexity. + */ + private JSONArray buildContactsArray(final java.util.List users) { + JSONArray contactsArray = new JSONArray(); + for (UserExternalAccountChanges user : users) { + JSONObject contactObj = new JSONObject() + .put("Email", user.getAccountEmail()) + .put("Name", user.getGivenName() != null ? user.getGivenName() : "") + .put("Properties", new JSONObject() + .put("firstname", user.getGivenName() != null ? user.getGivenName() : "") + .put("role", user.getRole().toString()) + .put("verification_status", user.getEmailVerificationStatus().toString()) + .put("stage", user.getStage() != null ? user.getStage() : "unknown")); + contactsArray.put(contactObj); + } + return contactsArray; + } + + /** + * Build lists array for bulk sync request. + * Extracted to reduce cognitive complexity. + */ + private JSONArray buildListsArray(final MailJetSubscriptionAction newsAction, + final MailJetSubscriptionAction eventsAction) { + return new JSONArray() + .put(new JSONObject() + .put(LIST_ID, legalListId) + .put(ACTION, MailJetSubscriptionAction.FORCE_SUBSCRIBE.getValue())) + .put(new JSONObject() + .put(LIST_ID, newsListId) + .put(ACTION, newsAction.getValue())) + .put(new JSONObject() + .put(LIST_ID, eventsListId) + .put(ACTION, eventsAction.getValue())); + } + + /** + * Submit the bulk sync request and handle the response. + * Extracted to reduce cognitive complexity. + */ + private String submitBulkSyncRequest(final MailjetRequest request, + final java.util.List users) + throws MailjetException { + MailjetResponse response = mailjetClient.post(request); + + if (response.getStatus() == 200 || response.getStatus() == 201) { + JSONObject responseData = response.getData().getJSONObject(0); + String jobId = responseData.optString("JobID", null); + log.info("{}Bulk sync submitted for {} users (job ID: {})", MAILJET, users.size(), jobId); + return jobId; + } + + throw new MailjetException( + "{}Failed to submit bulk sync. Status: {}".replace("{}", MAILJET) + .replace("{}", String.valueOf(response.getStatus()))); + } + + /** + * Handle exceptions during bulk sync. + * Extracted to reduce cognitive complexity. + */ + private void handleBulkSyncException(final MailjetException e, + final java.util.List users) + throws MailjetException { + if (isCommunicationException(e)) { + String errorMsg = "{}Communication error during bulk sync of {} users" + .replace("{}", MAILJET).replace("{}", String.valueOf(users.size())); + throw new MailjetClientCommunicationException(errorMsg, e); + } + } } diff --git a/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java b/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java index d61b4338f5..3bacf315a8 100644 --- a/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java +++ b/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java @@ -13,15 +13,10 @@ import com.mailjet.client.errors.MailjetClientCommunicationException; import com.mailjet.client.errors.MailjetException; import com.mailjet.client.errors.MailjetRateLimitException; -import java.util.ArrayList; import java.util.List; -import org.json.JSONObject; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.NullAndEmptySource; -import org.junit.jupiter.params.provider.ValueSource; import uk.ac.cam.cl.dtg.isaac.dos.users.EmailVerificationStatus; import uk.ac.cam.cl.dtg.isaac.dos.users.Role; import uk.ac.cam.cl.dtg.isaac.dos.users.UserExternalAccountChanges; @@ -171,7 +166,7 @@ void synchroniseChangedUsers_WithDeliveryFailedUser_ShouldGroupAsRemove() void synchroniseChangedUsers_WithEmptyUserList_ShouldReturnWithoutError() throws SegueDatabaseException, ExternalAccountSynchronisationException { // Arrange - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(new ArrayList<>()); + expect(mockDatabase.getRecentlyChangedRecords()).andReturn(List.of()); replay(mockDatabase, mockMailjetApi); @@ -182,240 +177,6 @@ void synchroniseChangedUsers_WithEmptyUserList_ShouldReturnWithoutError() verify(mockDatabase, mockMailjetApi); } - @Test - void synchroniseChangedUsers_WithNewUser_ShouldCreateAccount() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, null, "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - List changedUsers = List.of(userChanges); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.addNewUserOrGetUserIfExists("test@example.com")).andReturn("mailjetId123"); - mockMailjetApi.updateUserProperties("mailjetId123", "John", "STUDENT", "VERIFIED", "GCSE"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("mailjetId123", - MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.UNSUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, "mailjetId123"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); - expectLastCall(); - - replay(mockDatabase, mockMailjetApi); - - // Act - externalAccountManager.synchroniseChangedUsers(); - - // Assert - verify(mockDatabase, mockMailjetApi); - } - - @Test - void synchroniseChangedUsers_WithExistingUser_ShouldUpdateAccount() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - List changedUsers = List.of(userChanges); - - JSONObject mailjetDetails = new JSONObject(); - mailjetDetails.put("Email", "test@example.com"); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")).andReturn(mailjetDetails); - mockMailjetApi.updateUserProperties("existingMailjetId", "John", "STUDENT", "VERIFIED", "GCSE"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("existingMailjetId", - MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.UNSUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, "existingMailjetId"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); - expectLastCall(); - - replay(mockDatabase, mockMailjetApi); - - // Act - externalAccountManager.synchroniseChangedUsers(); - - // Assert - verify(mockDatabase, mockMailjetApi); - } - - @Test - void synchroniseChangedUsers_WithDeletedUser_ShouldDeleteAccount() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "test@example.com", Role.STUDENT, "John", true, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - List changedUsers = List.of(userChanges); - - JSONObject mailjetDetails = new JSONObject(); - mailjetDetails.put("Email", "test@example.com"); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")).andReturn(mailjetDetails); - mockMailjetApi.permanentlyDeleteAccountById("existingMailjetId"); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, null); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); - expectLastCall(); - - replay(mockDatabase, mockMailjetApi); - - // Act - externalAccountManager.synchroniseChangedUsers(); - - // Assert - verify(mockDatabase, mockMailjetApi); - } - - @Test - void synchroniseChangedUsers_WithDeliveryFailed_ShouldUnsubscribeFromAll() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.DELIVERY_FAILED, true, false, "GCSE" - ); - List changedUsers = List.of(userChanges); - - JSONObject mailjetDetails = new JSONObject(); - mailjetDetails.put("Email", "test@example.com"); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")).andReturn(mailjetDetails); - mockMailjetApi.updateUserSubscriptions("existingMailjetId", - MailJetSubscriptionAction.REMOVE, MailJetSubscriptionAction.REMOVE); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); - expectLastCall(); - - replay(mockDatabase, mockMailjetApi); - - // Act - externalAccountManager.synchroniseChangedUsers(); - - // Assert - verify(mockDatabase, mockMailjetApi); - } - - @Test - void synchroniseChangedUsers_WithEmailChange_ShouldRecreateAccount() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "newemail@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - List changedUsers = List.of(userChanges); - - JSONObject oldMailjetDetails = new JSONObject(); - oldMailjetDetails.put("Email", "oldemail@example.com"); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")).andReturn(oldMailjetDetails); - mockMailjetApi.permanentlyDeleteAccountById("existingMailjetId"); - expectLastCall(); - expect(mockMailjetApi.addNewUserOrGetUserIfExists("newemail@example.com")).andReturn("newMailjetId"); - mockMailjetApi.updateUserProperties("newMailjetId", "John", "STUDENT", "VERIFIED", "GCSE"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("newMailjetId", - MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.UNSUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, "newMailjetId"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); - expectLastCall(); - - replay(mockDatabase, mockMailjetApi); - - // Act - externalAccountManager.synchroniseChangedUsers(); - - // Assert - verify(mockDatabase, mockMailjetApi); - } - - @Test - void synchroniseChangedUsers_WithMailjetIdButAccountNotFound_ShouldTreatAsNew() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "nonExistentMailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - List changedUsers = List.of(userChanges); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("nonExistentMailjetId")).andReturn(null); - mockDatabase.updateExternalAccount(1L, null); - expectLastCall(); - expect(mockMailjetApi.addNewUserOrGetUserIfExists("test@example.com")).andReturn("newMailjetId"); - mockMailjetApi.updateUserProperties("newMailjetId", "John", "STUDENT", "VERIFIED", "GCSE"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("newMailjetId", - MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.UNSUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, "newMailjetId"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); - expectLastCall(); - - replay(mockDatabase, mockMailjetApi); - - // Act - externalAccountManager.synchroniseChangedUsers(); - - // Assert - verify(mockDatabase, mockMailjetApi); - } - - @ParameterizedTest - @NullAndEmptySource - @ValueSource(strings = {" "}) - void synchroniseChangedUsers_WithNullOrEmptyEmail_ShouldSkip(String email) - throws SegueDatabaseException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, null, email, Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - List changedUsers = List.of(userChanges); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - - replay(mockDatabase, mockMailjetApi); - - // Act - externalAccountManager.synchroniseChangedUsers(); - - // Assert - No mailjet calls should be made - verify(mockDatabase, mockMailjetApi); - } - - @Test - void synchroniseChangedUsers_WithEmptyList_ShouldReturnEarly() - throws SegueDatabaseException, ExternalAccountSynchronisationException { - // Arrange - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(new ArrayList<>()); - - replay(mockDatabase, mockMailjetApi); - - // Act - externalAccountManager.synchroniseChangedUsers(); - - // Assert - verify(mockDatabase, mockMailjetApi); - } @Test void synchroniseChangedUsers_WithDatabaseException_ShouldThrow() throws SegueDatabaseException { @@ -436,14 +197,15 @@ void synchroniseChangedUsers_WithDatabaseException_ShouldThrow() throws SegueDat void synchroniseChangedUsers_WithMailjetException_ShouldLogAndContinue() throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) ); - List changedUsers = List.of(userChanges); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")) + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) .andThrow(new MailjetException("Mailjet error")); replay(mockDatabase, mockMailjetApi); @@ -459,14 +221,15 @@ void synchroniseChangedUsers_WithMailjetException_ShouldLogAndContinue() void synchroniseChangedUsers_WithCommunicationException_ShouldThrow() throws SegueDatabaseException, MailjetException { // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) ); - List changedUsers = List.of(userChanges); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")) + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) .andThrow(new MailjetClientCommunicationException("Communication error")); replay(mockDatabase, mockMailjetApi); @@ -482,14 +245,15 @@ void synchroniseChangedUsers_WithCommunicationException_ShouldThrow() void synchroniseChangedUsers_WithRateLimitException_ShouldThrow() throws SegueDatabaseException, MailjetException { // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) ); - List changedUsers = List.of(userChanges); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")) + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) .andThrow(new MailjetRateLimitException("Rate limit exceeded")); replay(mockDatabase, mockMailjetApi); @@ -504,153 +268,6 @@ void synchroniseChangedUsers_WithRateLimitException_ShouldThrow() verify(mockDatabase, mockMailjetApi); } - @Test - void synchroniseChangedUsers_WithDatabaseErrorDuringUpdate_ShouldLogAndContinue() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges user1 = new UserExternalAccountChanges( - 1L, "mailjetId1", "test1@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - UserExternalAccountChanges user2 = new UserExternalAccountChanges( - 2L, "mailjetId2", "test2@example.com", Role.TEACHER, "Jane", false, - EmailVerificationStatus.VERIFIED, false, true, "A Level" - ); - List changedUsers = List.of(user1, user2); - - JSONObject mailjet1 = new JSONObject(); - mailjet1.put("Email", "test1@example.com"); - JSONObject mailjet2 = new JSONObject(); - mailjet2.put("Email", "test2@example.com"); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - - // First user - database error during update - expect(mockMailjetApi.getAccountByIdOrEmail("mailjetId1")).andReturn(mailjet1); - mockMailjetApi.updateUserProperties("mailjetId1", "John", "STUDENT", "VERIFIED", "GCSE"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("mailjetId1", - MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.UNSUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, "mailjetId1"); - expectLastCall().andThrow(new SegueDatabaseException("DB error")); - - // Second user - should still process - expect(mockMailjetApi.getAccountByIdOrEmail("mailjetId2")).andReturn(mailjet2); - mockMailjetApi.updateUserProperties("mailjetId2", "Jane", "TEACHER", "VERIFIED", "A Level"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("mailjetId2", - MailJetSubscriptionAction.UNSUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(2L, "mailjetId2"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(2L); - expectLastCall(); - - replay(mockDatabase, mockMailjetApi); - - // Act - Should not throw, continues processing - externalAccountManager.synchroniseChangedUsers(); - - // Assert - verify(mockDatabase, mockMailjetApi); - } - - @Test - void synchroniseChangedUsers_WithMultipleUsers_ShouldProcessAll() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges user1 = new UserExternalAccountChanges( - 1L, null, "new@example.com", Role.STUDENT, "Alice", false, - EmailVerificationStatus.VERIFIED, true, true, "GCSE" - ); - UserExternalAccountChanges user2 = new UserExternalAccountChanges( - 2L, "existingId", "existing@example.com", Role.TEACHER, "Bob", false, - EmailVerificationStatus.VERIFIED, false, false, "A Level" - ); - List changedUsers = List.of(user1, user2); - - JSONObject mailjet2 = new JSONObject(); - mailjet2.put("Email", "existing@example.com"); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - - // User 1 - new user - expect(mockMailjetApi.addNewUserOrGetUserIfExists("new@example.com")).andReturn("newId"); - mockMailjetApi.updateUserProperties("newId", "Alice", "STUDENT", "VERIFIED", "GCSE"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("newId", - MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, "newId"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); - expectLastCall(); - - // User 2 - existing user - expect(mockMailjetApi.getAccountByIdOrEmail("existingId")).andReturn(mailjet2); - mockMailjetApi.updateUserProperties("existingId", "Bob", "TEACHER", "VERIFIED", "A Level"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("existingId", - MailJetSubscriptionAction.UNSUBSCRIBE, MailJetSubscriptionAction.UNSUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(2L, "existingId"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(2L); - expectLastCall(); - - replay(mockDatabase, mockMailjetApi); - - // Act - externalAccountManager.synchroniseChangedUsers(); - - // Assert - verify(mockDatabase, mockMailjetApi); - } - - @Test - void synchroniseChangedUsers_WithUnexpectedError_ShouldLogAndContinue() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "mailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - List changedUsers = List.of(userChanges); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("mailjetId")) - .andThrow(new RuntimeException("Unexpected error")); - - replay(mockDatabase, mockMailjetApi); - - // Act - Should not throw, logs error and continues - externalAccountManager.synchroniseChangedUsers(); - - // Assert - verify(mockDatabase, mockMailjetApi); - } - - @Test - void synchroniseChangedUsers_WithNewUserAndNullMailjetId() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, null, "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - List changedUsers = List.of(userChanges); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.addNewUserOrGetUserIfExists("test@example.com")).andReturn(null); - - replay(mockDatabase, mockMailjetApi); - - // Act & Assert - externalAccountManager.synchroniseChangedUsers(); - - verify(mockDatabase, mockMailjetApi); - } } } From 8e34b35070cd32b60e510bad60c1c5cc4028c279 Mon Sep 17 00:00:00 2001 From: Marius Date: Mon, 4 May 2026 00:17:16 +0300 Subject: [PATCH 3/9] 858 - Failed SyncMailjetUsersJob #858 --- .../cam/cl/dtg/segue/api/managers/ExternalAccountManager.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java b/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java index f0dc7bdf12..a9b611a178 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java @@ -191,6 +191,9 @@ private void processBatchesForGroup(final SubscriptionGroup group, MAILJET, metrics.getSuccessCount()); throw new ExternalAccountSynchronisationException( "Mailjet API rate limits exceeded after processing " + metrics.getSuccessCount() + " users"); + } catch (MailjetClientCommunicationException e) { + metrics.incrementCommunicationError(); + throw new ExternalAccountSynchronisationException("Failed to connect to Mailjet: " + e.getMessage()); } catch (MailjetException e) { metrics.incrementMailjetError(); log.error("{}Mailjet API error during bulk sync of {} users. Continuing with next batch.", From 3c71b37b0e1fa0e40d3b9d048623ee8e3f97f787 Mon Sep 17 00:00:00 2001 From: Marius Date: Mon, 4 May 2026 00:24:02 +0300 Subject: [PATCH 4/9] 858 - Failed SyncMailjetUsersJob #858 --- .../util/email/MailJetApiClientWrapper.java | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java b/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java index 814b8356ee..bd0d3b05cf 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java +++ b/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java @@ -443,7 +443,11 @@ public String bulkSyncUsers(final java.util.List use throw new MailjetException(errorMsg, e); } catch (MailjetException e) { - handleBulkSyncException(e, users); + if (isCommunicationException(e)) { + String errorMsg = "{}Communication error during bulk sync of {} users" + .replace("{}", MAILJET).replace("{}", String.valueOf(users.size())); + throw new MailjetClientCommunicationException(errorMsg, e); + } throw e; } } @@ -494,8 +498,9 @@ private String submitBulkSyncRequest(final MailjetRequest request, final java.util.List users) throws MailjetException { MailjetResponse response = mailjetClient.post(request); + int status = response.getStatus(); - if (response.getStatus() == 200 || response.getStatus() == 201) { + if (status == 200 || status == 201) { JSONObject responseData = response.getData().getJSONObject(0); String jobId = responseData.optString("JobID", null); log.info("{}Bulk sync submitted for {} users (job ID: {})", MAILJET, users.size(), jobId); @@ -504,20 +509,7 @@ private String submitBulkSyncRequest(final MailjetRequest request, throw new MailjetException( "{}Failed to submit bulk sync. Status: {}".replace("{}", MAILJET) - .replace("{}", String.valueOf(response.getStatus()))); + .replace("{}", String.valueOf(status))); } - /** - * Handle exceptions during bulk sync. - * Extracted to reduce cognitive complexity. - */ - private void handleBulkSyncException(final MailjetException e, - final java.util.List users) - throws MailjetException { - if (isCommunicationException(e)) { - String errorMsg = "{}Communication error during bulk sync of {} users" - .replace("{}", MAILJET).replace("{}", String.valueOf(users.size())); - throw new MailjetClientCommunicationException(errorMsg, e); - } - } } From b5283eb1d3609b9bac00b290b0e044d105e4f6a3 Mon Sep 17 00:00:00 2001 From: Marius Date: Mon, 4 May 2026 09:35:48 +0300 Subject: [PATCH 5/9] 858 - Failed SyncMailjetUsersJob #858 --- ...windows--local-dev-segue-config.properties | 2 +- .../cam/cl/dtg/segue/etl/ContentIndexer.java | 21 ++++++++++++------- .../ac/cam/cl/dtg/segue/etl/ETLManager.java | 11 +++++----- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/config-templates/windows--local-dev-segue-config.properties b/config-templates/windows--local-dev-segue-config.properties index ba98fa1d54..8a88a840ee 100644 --- a/config-templates/windows--local-dev-segue-config.properties +++ b/config-templates/windows--local-dev-segue-config.properties @@ -21,7 +21,7 @@ EVENT_ICAL_UID_DOMAIN=isaacphysics.org # CORS Configuration (ALB Migration) # Allowed origins for CORS requests. --- MMM Update for production -# Format: comma-separated list of origins or wildcard patterns (e.g., https://*.isaaccomputerscience.org) --- MMM ? +# Format: comma-separated list of origins or wildcard patterns (e.g., https://*.isaaccomputerscience.org) CORS_ALLOWED_ORIGINS=https://*.isaaccomputerscience.org SCHOOL_CSV_LIST_PATH=C:\\dev\\isaac-other-resources\\schools_list_2026_spring.csv diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ContentIndexer.java b/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ContentIndexer.java index cc2eef4482..2fb8c881b5 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ContentIndexer.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ContentIndexer.java @@ -79,6 +79,7 @@ public class ContentIndexer { private static final int MEDIA_FILE_SIZE_LIMIT = 300 * 1024; // Bytes private static final int NANOSECONDS_IN_A_MILLISECOND = 1000000; private static final String ERROR_OCCURRED_SUFFIX = ". The following error occurred: "; + private static final String CONTENT_LOG_PREFIX = "CONTENT - "; private static class IndexingContext { final Map contentCache; @@ -246,7 +247,8 @@ private synchronized void buildGitContentIndex(final String sha, repository.close(); log.debug("Tags available {}", tagsList); log.debug("All units: {}", allUnits); - log.info("Git content cache population for " + sanitiseInternalLogValue(sha) + " completed!"); + log.info(CONTENT_LOG_PREFIX + "Git content cache population for " + sanitiseInternalLogValue(sha) + " completed! Cached {} total objects", contentCache.size()); + log.info(CONTENT_LOG_PREFIX + "Git content cache for {} completed: {} total objects loaded", sanitiseInternalLogValue(sha), contentCache.size()); } catch (IOException e) { log.error("IOException while trying to access git repository. ", e); @@ -274,11 +276,12 @@ private void processJsonFile(final TreeWalk treeWalk, final Repository repositor content = this.augmentChildContent(content, treeWalk.getPathString(), null, content.getPublished()); if (null != content) { + log.info(CONTENT_LOG_PREFIX + "Successfully loaded page/content: {} from {}", content.getId(), treeWalk.getPathString()); indexContentObject(context.contentCache, context.tagsList, context.allUnits, context.publishedUnits, context.indexProblemCache, treeWalk.getPathString(), content); } } catch (JsonMappingException e) { - log.debug(String.format("Unable to parse the json file found %s as a content object. " + log.warn(String.format(CONTENT_LOG_PREFIX + "Unable to parse the json file found %s as a content object. " + "Skipping file due to error: \n %s", treeWalk.getPathString(), e.getMessage())); Content dummyContent = new Content(); dummyContent.setCanonicalSourceFile(treeWalk.getPathString()); @@ -354,7 +357,7 @@ private void validateAndCacheContent(final Content flattenedContent, final Conte } if (!context.contentCache.containsKey(flattenedContent.getId())) { - log.debug("Loading into cache: {} ({}) from {}", flattenedContent.getId(), flattenedContent.getType(), + log.info(CONTENT_LOG_PREFIX + "Loading into cache: {} ({}) from {}", flattenedContent.getId(), flattenedContent.getType(), treeWalkPath); context.contentCache.put(flattenedContent.getId(), flattenedContent); registerTags(flattenedContent.getTags(), context.tagsList); @@ -623,7 +626,7 @@ private synchronized void registerContentProblem(final Content c, final String m indexProblemCache.put(c, new ArrayList<>()); } - log.debug(message); + log.warn(CONTENT_LOG_PREFIX + message); indexProblemCache.get(c).add(message); //.replace("_", "\\_")); } @@ -781,8 +784,10 @@ public synchronized void buildElasticSearchIndex(final String sha, startTime = System.nanoTime(); es.bulkIndexWithIds(sha, ContentIndextype.CONTENT.toString(), contentToIndex); endTime = System.nanoTime(); - log.info("Bulk indexing content took: {}ms", (endTime - startTime) / NANOSECONDS_IN_A_MILLISECOND); - log.info("Search index request sent for: " + sanitiseInternalLogValue(sha)); + log.info(CONTENT_LOG_PREFIX + "Bulk indexed {} content objects to Elasticsearch for version {}", contentToIndex.size(), sanitiseInternalLogValue(sha)); + log.info( + CONTENT_LOG_PREFIX + "Bulk indexing content took: {}ms", (endTime - startTime) / NANOSECONDS_IN_A_MILLISECOND); + log.info(CONTENT_LOG_PREFIX + "Search index request sent for: {}", sanitiseInternalLogValue(sha)); // -- Delete } catch (SegueSearchException e) { log.error("Error whilst trying to perform bulk index operation.", e); } catch (ActionRequestValidationException e) { @@ -851,8 +856,8 @@ private void recordContentErrors(final String sha, final Map gi } } if (!missingContent.isEmpty()) { - log.debug("Referential integrity broken for ({}) related Content items. " - + "The following ids are referenced but do not exist: {}", missingContent.size(), expectedIds); + log.warn(CONTENT_LOG_PREFIX + "Referential integrity broken for ({}) related Content items. " + + "The following ids are referenced but do not exist: {}", missingContent.size(), missingContent); } // Find all references from published content to unpublished content. diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ETLManager.java b/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ETLManager.java index 382e913d07..e358b00492 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ETLManager.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ETLManager.java @@ -20,6 +20,7 @@ class ETLManager { private static final String LATEST_INDEX_ALIAS = "latest"; private static final String TASK_PERIOD_SECONDS = "TASK_PERIOD_SECONDS"; private static final long TASK_PERIOD_SECONDS_FALLBACK = 300; + private static final String CONTENT_LOG_PREFIX = "CONTENT - "; private final ContentIndexer indexer; private final SchoolIndexer schoolIndexer; @@ -55,10 +56,10 @@ class ETLManager { } void setNamedVersion(final String alias, final String version) throws Exception { - log.debug("Requested aliased version: {} - {}", alias, version); + log.info(CONTENT_LOG_PREFIX + "Requested aliased version: {} - {}", alias, version); indexer.loadAndIndexContent(version); indexer.setNamedVersion(alias, version); - log.debug("Version {} with alias '{}' is successfully indexed.", version, alias); + log.info(CONTENT_LOG_PREFIX + "Version {} with alias '{}' is successfully indexed.", version, alias); } // Indexes all content in idempotent fashion. If the content is already indexed no action is taken. @@ -95,13 +96,13 @@ private class ContentIndexerTask implements Runnable { @Override public void run() { - log.debug("Starting content indexer thread."); + log.info(CONTENT_LOG_PREFIX + "Starting content indexer thread."); try { indexContent(); } catch (Exception e) { - log.error("ContentIndexerTask failed.", e); + log.error(CONTENT_LOG_PREFIX + "ContentIndexerTask failed.", e); } - log.debug("Content indexer thread complete, waiting for next scheduled run."); + log.info(CONTENT_LOG_PREFIX + "Content indexer thread complete, waiting for next scheduled run."); } } } From c51c3b8977b2a8b36d89a26b25072729c7cf5e9c Mon Sep 17 00:00:00 2001 From: Marius Date: Mon, 4 May 2026 10:36:04 +0300 Subject: [PATCH 6/9] 858 - Failed SyncMailjetUsersJob #858 --- .../cam/cl/dtg/segue/etl/ContentIndexer.java | 211 ++++++++---------- 1 file changed, 99 insertions(+), 112 deletions(-) diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ContentIndexer.java b/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ContentIndexer.java index 2fb8c881b5..a0ac426f39 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ContentIndexer.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ContentIndexer.java @@ -71,6 +71,7 @@ public class ContentIndexer { private static final Logger log = LoggerFactory.getLogger(ContentIndexer.class); private static final ConcurrentHashMap VERSION_LOCKS = new ConcurrentHashMap<>(); + private static final String QUESTION = "Question: "; private final ElasticSearchIndexer es; private final GitDb database; @@ -81,29 +82,14 @@ public class ContentIndexer { private static final String ERROR_OCCURRED_SUFFIX = ". The following error occurred: "; private static final String CONTENT_LOG_PREFIX = "CONTENT - "; - private static class IndexingContext { - final Map contentCache; - final Set tagsList; - final Map allUnits; - final Map publishedUnits; - final Map> indexProblemCache; - final boolean includeUnpublished; - - IndexingContext(final Map contentCache, final Set tagsList, - final Map allUnits, final Map publishedUnits, - final Map> indexProblemCache, final boolean includeUnpublished) { - this.contentCache = contentCache; - this.tagsList = tagsList; - this.allUnits = allUnits; - this.publishedUnits = publishedUnits; - this.indexProblemCache = indexProblemCache; - this.includeUnpublished = includeUnpublished; - } + private record IndexingContext(Map contentCache, Set tagsList, Map allUnits, + Map publishedUnits, Map> indexProblemCache, + boolean includeUnpublished) { boolean shouldSkipUnpublished(final Content content) { - return !includeUnpublished && !content.getPublished(); + return !includeUnpublished && !content.getPublished(); + } } - } @Inject public ContentIndexer(final GitDb database, final ElasticSearchIndexer es, final ContentMapperUtils mapperUtils) { @@ -113,7 +99,7 @@ public ContentIndexer(final GitDb database, final ElasticSearchIndexer es, final } - void loadAndIndexContent(final String version) throws Exception, VersionLockedException { + void loadAndIndexContent(final String version) throws Exception { // Take version lock or fail Boolean alreadyLocked = VERSION_LOCKS.putIfAbsent(version, true); @@ -132,7 +118,7 @@ void loadAndIndexContent(final String version) throws Exception, VersionLockedEx // The case where only some of the content types have been successfully indexed for this version, should // never happen but is covered by an expunge at the start of #buildElasticSearchIndex(...). if (allContentTypesAreIndexedForVersion(version)) { - log.debug("Content already indexed: " + sanitiseInternalLogValue(version)); + log.debug("Content already indexed: {}", sanitiseInternalLogValue(version)); return; } @@ -204,7 +190,7 @@ void setNamedVersion(final String alias, final String version) { * @param allUnits a map of units used in numeric questions * @param publishedUnits a map of units used in published numeric questions * @param indexProblemCache a map of problems found in the indexed content - * @throws ContentManagerException if the SHA is null or the associated resource cannot be accessed + * @throws ContentManagerException if the SHA is null or the associated resource cannot be accessed */ private synchronized void buildGitContentIndex(final String sha, final boolean includeUnpublished, @@ -247,8 +233,6 @@ private synchronized void buildGitContentIndex(final String sha, repository.close(); log.debug("Tags available {}", tagsList); log.debug("All units: {}", allUnits); - log.info(CONTENT_LOG_PREFIX + "Git content cache population for " + sanitiseInternalLogValue(sha) + " completed! Cached {} total objects", contentCache.size()); - log.info(CONTENT_LOG_PREFIX + "Git content cache for {} completed: {} total objects loaded", sanitiseInternalLogValue(sha), contentCache.size()); } catch (IOException e) { log.error("IOException while trying to access git repository. ", e); @@ -257,7 +241,7 @@ private synchronized void buildGitContentIndex(final String sha, } private void processJsonFile(final TreeWalk treeWalk, final Repository repository, - final IndexingContext context) { + final IndexingContext context) { try { ByteArrayOutputStream out = new ByteArrayOutputStream(); ObjectLoader loader = repository.open(treeWalk.getObjectId(0)); @@ -276,7 +260,6 @@ private void processJsonFile(final TreeWalk treeWalk, final Repository repositor content = this.augmentChildContent(content, treeWalk.getPathString(), null, content.getPublished()); if (null != content) { - log.info(CONTENT_LOG_PREFIX + "Successfully loaded page/content: {} from {}", content.getId(), treeWalk.getPathString()); indexContentObject(context.contentCache, context.tagsList, context.allUnits, context.publishedUnits, context.indexProblemCache, treeWalk.getPathString(), content); } @@ -325,7 +308,7 @@ private void indexContentObject( } private void validateAndCacheContent(final Content flattenedContent, final Content parentContent, - final String treeWalkPath, final IndexingContext context) { + final String treeWalkPath, final IndexingContext context) { if (flattenedContent.getId() == null) { return; } @@ -357,8 +340,6 @@ private void validateAndCacheContent(final Content flattenedContent, final Conte } if (!context.contentCache.containsKey(flattenedContent.getId())) { - log.info(CONTENT_LOG_PREFIX + "Loading into cache: {} ({}) from {}", flattenedContent.getId(), flattenedContent.getType(), - treeWalkPath); context.contentCache.put(flattenedContent.getId(), flattenedContent); registerTags(flattenedContent.getTags(), context.tagsList); @@ -422,26 +403,27 @@ private Content augmentChildContent(final Content content, final String canonica content.setCanonicalSourceFile(canonicalSourceFile); - if (!content.getChildren().isEmpty()) { + if (content.getChildren() != null && !content.getChildren().isEmpty()) { for (ContentBase cb : content.getChildren()) { - if (cb instanceof Content) { - Content c = (Content) cb; + if (cb instanceof Content c) { this.augmentChildContent(c, canonicalSourceFile, newParentId, parentPublished); } } } - if (content instanceof Choice) { - Choice choice = (Choice) content; + if (content instanceof Choice choice) { this.augmentChildContent((Content) choice.getExplanation(), canonicalSourceFile, newParentId, parentPublished); } // hack to get cards to count as children: if (content instanceof IsaacCardDeck) { - for (IsaacCard card : ((IsaacCardDeck) content).getCards()) { - this.augmentChildContent(card, canonicalSourceFile, newParentId, parentPublished); + List cards = ((IsaacCardDeck) content).getCards(); + if (cards != null) { + for (IsaacCard card : cards) { + this.augmentChildContent(card, canonicalSourceFile, newParentId, parentPublished); + } } } @@ -465,8 +447,7 @@ private Content augmentChildContent(final Content content, final String canonica } } - if (content instanceof Media) { - Media media = (Media) content; + if (content instanceof Media media) { media.setSrc(fixMediaSrc(canonicalSourceFile, media.getSrc())); // for tracking purposes we want to generate an id for all image content objects. @@ -497,10 +478,9 @@ private void collateSearchableContent(final Content content, final StringBuilder } // Repeat the process for each child - if (!content.getChildren().isEmpty()) { + if (content.getChildren() != null && !content.getChildren().isEmpty()) { for (ContentBase childContentBase : content.getChildren()) { - if (childContentBase instanceof Content) { - Content child = (Content) childContentBase; + if (childContentBase instanceof Content child) { this.collateSearchableContent(child, searchableContentBuilder); } } @@ -515,7 +495,7 @@ private void collateSearchableContent(final Content content, final StringBuilder * @return src with relative paths fixed. */ private void augmentQuestionContent(final Question question, final String canonicalSourceFile, - final String newParentId, final boolean parentPublished) { + final String newParentId, final boolean parentPublished) { augmentHints(question, canonicalSourceFile, newParentId, parentPublished); augmentAnswerContent(question, canonicalSourceFile, newParentId, parentPublished); augmentFeedbackContent(question, canonicalSourceFile, newParentId, parentPublished); @@ -523,7 +503,7 @@ private void augmentQuestionContent(final Question question, final String canoni } private void augmentHints(final Question question, final String canonicalSourceFile, final String newParentId, - final boolean parentPublished) { + final boolean parentPublished) { if (question.getHints() != null) { for (ContentBase cb : question.getHints()) { Content c = (Content) cb; @@ -533,7 +513,7 @@ private void augmentHints(final Question question, final String canonicalSourceF } private void augmentAnswerContent(final Question question, final String canonicalSourceFile, final String newParentId, - final boolean parentPublished) { + final boolean parentPublished) { if (question.getAnswer() != null) { Content answer = (Content) question.getAnswer(); if (answer.getChildren() != null) { @@ -543,12 +523,22 @@ private void augmentAnswerContent(final Question question, final String canonica } } } + + if (question.getDefaultFeedback() != null) { + Content defaultFeedback = question.getDefaultFeedback(); + if (defaultFeedback.getChildren() != null) { + for (ContentBase cb : defaultFeedback.getChildren()) { + Content c = (Content) cb; + this.augmentChildContent(c, canonicalSourceFile, newParentId, parentPublished); + } + } + } } private void augmentFeedbackContent(final Question question, final String canonicalSourceFile, final String newParentId, - final boolean parentPublished) { + final boolean parentPublished) { if (question.getDefaultFeedback() != null) { Content defaultFeedback = question.getDefaultFeedback(); if (defaultFeedback.getChildren() != null) { @@ -561,9 +551,8 @@ private void augmentFeedbackContent(final Question question, } private void augmentChoiceQuestionContent(final Question question, final String canonicalSourceFile, - final String newParentId, final boolean parentPublished) { - if (question instanceof ChoiceQuestion) { - ChoiceQuestion choiceQuestion = (ChoiceQuestion) question; + final String newParentId, final boolean parentPublished) { + if (question instanceof ChoiceQuestion choiceQuestion) { if (choiceQuestion.getChoices() != null) { for (ContentBase cb : choiceQuestion.getChoices()) { Content c = (Content) cb; @@ -626,7 +615,7 @@ private synchronized void registerContentProblem(final Content c, final String m indexProblemCache.put(c, new ArrayList<>()); } - log.warn(CONTENT_LOG_PREFIX + message); + log.warn(CONTENT_LOG_PREFIX + "{}", message); indexProblemCache.get(c).add(message); //.replace("_", "\\_")); } @@ -666,8 +655,7 @@ private synchronized void registerUnits(final IsaacNumericQuestion q, final Map< HashMap newUnits = Maps.newHashMap(); for (Choice c : q.getChoices()) { - if (c instanceof Quantity) { - Quantity quantity = (Quantity) c; + if (c instanceof Quantity quantity) { if (quantity.getUnits() != null && !quantity.getUnits().isEmpty()) { String units = quantity.getUnits(); @@ -702,11 +690,11 @@ private synchronized void registerUnits(final IsaacNumericQuestion q, final Map< * @param indexProblemCache a map of problems found in the indexed content */ public synchronized void buildElasticSearchIndex(final String sha, - final Map gitCache, - final Set tagsList, - final Map allUnits, - final Map publishedUnits, - final Map> indexProblemCache) { + final Map gitCache, + final Set tagsList, + final Map allUnits, + final Map publishedUnits, + final Map> indexProblemCache) { if (anyContentTypesAreIndexedForVersion(sha)) { expungeAnyContentTypeIndicesRelatedToVersion(sha); } @@ -723,7 +711,7 @@ public synchronized void buildElasticSearchIndex(final String sha, } catch (JsonProcessingException e) { log.error("Unable to serialize content object: {} for indexing with the search provider.", content.getId(), e); this.registerContentProblem(content, "Search Index Error: " + content.getId() - + content.getCanonicalSourceFile() + " Exception: " + e.toString(), indexProblemCache); + + content.getCanonicalSourceFile() + " Exception: " + e, indexProblemCache); } } @@ -784,10 +772,8 @@ public synchronized void buildElasticSearchIndex(final String sha, startTime = System.nanoTime(); es.bulkIndexWithIds(sha, ContentIndextype.CONTENT.toString(), contentToIndex); endTime = System.nanoTime(); - log.info(CONTENT_LOG_PREFIX + "Bulk indexed {} content objects to Elasticsearch for version {}", contentToIndex.size(), sanitiseInternalLogValue(sha)); - log.info( - CONTENT_LOG_PREFIX + "Bulk indexing content took: {}ms", (endTime - startTime) / NANOSECONDS_IN_A_MILLISECOND); - log.info(CONTENT_LOG_PREFIX + "Search index request sent for: {}", sanitiseInternalLogValue(sha)); // -- Delete + log.debug(CONTENT_LOG_PREFIX + "Indexing completed in {}ms", + (endTime - startTime) / NANOSECONDS_IN_A_MILLISECOND); } catch (SegueSearchException e) { log.error("Error whilst trying to perform bulk index operation.", e); } catch (ActionRequestValidationException e) { @@ -930,7 +916,7 @@ private String collateExpandableChildren(final Content content) { ret.append(null != child.getType() ? child.getType() : "undefined").append(","); } } - if (ret.length() > 0) { + if (!ret.isEmpty()) { ret.deleteCharAt(ret.length() - 1); } return ret.toString(); @@ -979,25 +965,25 @@ private void recordContentTypeSpecificError(final String sha, final Content cont private void registerContentProblemsClozeQuestionChoicesHaveWrongNumberOFItems( final Content content, final Map> indexProblemCache) { - if (content instanceof IsaacClozeQuestion) { - IsaacClozeQuestion q = (IsaacClozeQuestion) content; + if (content instanceof IsaacClozeQuestion q) { Integer numberItems = null; - for (Choice choice : q.getChoices()) { - if (choice instanceof ItemChoice) { - ItemChoice c = (ItemChoice) choice; - if (null == c.getItems() || c.getItems().isEmpty()) { - this.registerContentProblem(content, "Cloze Question: " + q.getId() + " has choice with missing items!", - indexProblemCache); - continue; - } - int items = c.getItems().size(); - if (numberItems != null && items != numberItems) { - this.registerContentProblem(content, - "Cloze Question: " + q.getId() + " has choice with incorrect number of items!" - + " (Expected " + numberItems + ", got " + items + "!)", indexProblemCache); - continue; + if (q.getChoices() != null) { + for (Choice choice : q.getChoices()) { + if (choice instanceof ItemChoice c) { + if (null == c.getItems() || c.getItems().isEmpty()) { + this.registerContentProblem(content, "Cloze Question: " + q.getId() + " has choice with missing items!", + indexProblemCache); + continue; + } + int items = c.getItems().size(); + if (numberItems != null && items != numberItems) { + this.registerContentProblem(content, + "Cloze Question: " + q.getId() + " has choice with incorrect number of items!" + + " (Expected " + numberItems + ", got " + items + "!)", indexProblemCache); + continue; + } + numberItems = items; } - numberItems = items; } } } @@ -1006,19 +992,22 @@ private void registerContentProblemsClozeQuestionChoicesHaveWrongNumberOFItems( private void registerContentProblemsSymbolicQuestionInvalidProperties( final Content content, final Map> indexProblemCache) { IsaacSymbolicQuestion question = (IsaacSymbolicQuestion) content; - for (String sym : question.getAvailableSymbols()) { - registerContentProblemQuestionSymbolContainsBackslash(content, indexProblemCache, question, sym); + if (question.getAvailableSymbols() != null) { + for (String sym : question.getAvailableSymbols()) { + registerContentProblemQuestionSymbolContainsBackslash(content, indexProblemCache, question, sym); + } } - for (Choice choice : question.getChoices()) { - if (choice instanceof Formula) { - Formula f = (Formula) choice; - if (f.getPythonExpression().contains("\\")) { - registerContentProblemQuestionFormulaContainsBackslash(content, indexProblemCache, question, choice); - } else if (f.getPythonExpression() == null || f.getPythonExpression().isEmpty()) { - registerContentProblemQuestionFormulaIsEmpty(content, indexProblemCache, question, choice); + if (question.getChoices() != null) { + for (Choice choice : question.getChoices()) { + if (choice instanceof Formula f) { + if (f.getPythonExpression().contains("\\")) { + registerContentProblemQuestionFormulaContainsBackslash(content, indexProblemCache, question, choice); + } else if (f.getPythonExpression() == null || f.getPythonExpression().isEmpty()) { + registerContentProblemQuestionFormulaIsEmpty(content, indexProblemCache, question, choice); + } + } else { + registerContentProblemSymbolicQuestionChoiceIsNotFormula(content, indexProblemCache, question, choice); } - } else { - registerContentProblemSymbolicQuestionChoiceIsNotFormula(content, indexProblemCache, question, choice); } } } @@ -1055,19 +1044,19 @@ private void registerContentProblemQuestionSymbolContainsBackslash( private void registerContentProblemsNumericQuestionInvalidChoicesOrUnits( final Content content, final Map> indexProblemCache) { - if (content instanceof IsaacNumericQuestion) { - IsaacNumericQuestion question = (IsaacNumericQuestion) content; - for (Choice choice : question.getChoices()) { - if (choice instanceof Quantity) { - Quantity quantity = (Quantity) choice; + if (content instanceof IsaacNumericQuestion question) { + if (question.getChoices() != null) { + for (Choice choice : question.getChoices()) { + if (choice instanceof Quantity quantity) { - registerContentProblemCannotParseQuantityChoiceAsNumber(content, indexProblemCache, question, quantity); + registerContentProblemCannotParseQuantityChoiceAsNumber(content, indexProblemCache, question, quantity); - registerContentProblemUnnecessaryQuantityChoiceUnits(content, indexProblemCache, question, quantity); + registerContentProblemUnnecessaryQuantityChoiceUnits(content, indexProblemCache, question, quantity); - } else { - registerContentProblemNumericQuestionChoiceIsNotQuantity(content, indexProblemCache, question, choice); + } else { + registerContentProblemNumericQuestionChoiceIsNotQuantity(content, indexProblemCache, question, choice); + } } } registerContentProblemConflictingUnitSettings(content, indexProblemCache, question); @@ -1151,11 +1140,12 @@ private void registerContentProblemChoiceQuestionMissingAnswer( for (Choice choice : question.getChoices()) { if (choice.isCorrect()) { correctOptionFound = true; + break; } } if (!correctOptionFound) { this.registerContentProblem(question, - "Question: " + question.getId() + " found without a correct answer. " + QUESTION + question.getId() + " found without a correct answer. " + "This question will always be automatically marked as incorrect", indexProblemCache); } } @@ -1163,22 +1153,21 @@ private void registerContentProblemChoiceQuestionMissingAnswer( private void registerContentProblemChoiceQuestionMissingChoices( final Map> indexProblemCache, final ChoiceQuestion question) { this.registerContentProblem(question, - "Question: " + question.getId() + " found without any choice metadata. " + QUESTION + question.getId() + " found without any choice metadata. " + "This question will always be automatically " + "marked as incorrect", indexProblemCache); } private void registerContentProblemQuestionMissingId( final Content content, final Map> indexProblemCache) { if (content instanceof Question && content.getId() == null) { - this.registerContentProblem(content, "Question: " + content.getTitle() + " in " + content.getCanonicalSourceFile() + this.registerContentProblem(content, QUESTION + content.getTitle() + " in " + content.getCanonicalSourceFile() + " found without a unique id. " + "This question cannot be logged correctly.", indexProblemCache); } } private void registerContentProblemsMediaInvalidProperties( final String sha, final Content content, final Map> indexProblemCache) { - if (content instanceof Media) { - Media media = (Media) content; + if (content instanceof Media media) { registerContentProblemMediaNotFoundOrTooLarge(sha, content, indexProblemCache, media); @@ -1189,14 +1178,12 @@ private void registerContentProblemsMediaInvalidProperties( private void registerContentProblemMediaMissingAltText( final Content content, final Map> indexProblemCache, final Media media) { - if (media.getAltText() == null || media.getAltText().isEmpty()) { - if (!(media instanceof Video) && !media.getId().equals("eventThumbnail")) { - // Videos probably don't need alt text unless there is a good reason. It's not important that event - // thumbnails have alt text, so we don't record errors for those either. - this.registerContentProblem(content, "No altText attribute set for media element: " + media.getSrc() - + " in Git source file " + content.getCanonicalSourceFile(), indexProblemCache); - } + if ((media.getAltText() == null || media.getAltText().isEmpty()) && !(media instanceof Video) && + !media.getId().equals("eventThumbnail")) { + this.registerContentProblem(content, "No altText attribute set for media element: " + media.getSrc() + + " in Git source file " + content.getCanonicalSourceFile(), indexProblemCache); } + } private void registerContentProblemMediaNotFoundOrTooLarge( From 81c47fb58222ac66aeea3be47cdec5c3ab5c70ab Mon Sep 17 00:00:00 2001 From: Marius Date: Wed, 6 May 2026 09:50:00 +0300 Subject: [PATCH 7/9] Implement Mailjet bulk sync with job tracking and error recovery --- .../api/managers/ExternalAccountManager.java | 228 ++++++++++++++++-- .../util/email/MailJetApiClientWrapper.java | 67 ++++- .../managers/ExternalAccountManagerTest.java | 49 +++- 3 files changed, 314 insertions(+), 30 deletions(-) diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java b/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java index a9b611a178..cff33e128a 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java @@ -23,6 +23,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import org.json.JSONException; +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import uk.ac.cam.cl.dtg.isaac.dos.users.EmailVerificationStatus; @@ -30,13 +33,16 @@ import uk.ac.cam.cl.dtg.segue.dao.SegueDatabaseException; import uk.ac.cam.cl.dtg.segue.dao.users.IExternalAccountDataManager; import uk.ac.cam.cl.dtg.util.email.MailJetApiClientWrapper; +import uk.ac.cam.cl.dtg.util.email.MailJetApiClientWrapper.JobStatus; import uk.ac.cam.cl.dtg.util.email.MailJetSubscriptionAction; public class ExternalAccountManager implements IExternalAccountManager { private static final Logger log = LoggerFactory.getLogger(ExternalAccountManager.class); private static final String MAILJET = "MAILJET - "; - private static final int BULK_BATCH_SIZE = 1000; + private static final int BULK_BATCH_SIZE = 100; private static final int RATE_LIMIT_RETRY_SLEEP_MS = 10000; // 10 seconds + private static final int JOB_POLL_INTERVAL_MS = 5000; // 5 seconds between polls + private static final int JOB_POLL_MAX_ATTEMPTS = 60; // max 5 minutes (60 × 5s) private final IExternalAccountDataManager database; private final MailJetApiClientWrapper mailjetApi; @@ -73,14 +79,16 @@ public synchronized void synchroniseChangedUsers() throws ExternalAccountSynchro SyncMetrics metrics = new SyncMetrics(); List successfullyProcessedUserIds = new ArrayList<>(); + List failedUsers = new ArrayList<>(); List usersToDelete = new ArrayList<>(); List usersToSync = new ArrayList<>(); partitionUsersByType(userRecordsToUpdate, usersToDelete, usersToSync); processDeletions(usersToDelete, metrics, successfullyProcessedUserIds); - processBulkSyncs(usersToSync, metrics, successfullyProcessedUserIds); + processBulkSyncsWithJobTracking(usersToSync, metrics, successfullyProcessedUserIds, failedUsers); markSuccessfullyProcessedAsSynced(successfullyProcessedUserIds, metrics); + logFailedUsers(failedUsers); logSyncSummary(metrics, userRecordsToUpdate.size()); } @@ -145,18 +153,34 @@ private void processDeletions(final List usersToDele } /** - * Process bulk syncs grouped by subscription state. - * Extracted to reduce cognitive complexity. + * Process bulk syncs with job tracking, polling, and per-user recovery. + * Groups users by subscription state, submits batches, polls for completion, + * and handles errors by verifying users individually at Mailjet. */ - private void processBulkSyncs(final List usersToSync, final SyncMetrics metrics, - final List successfullyProcessedUserIds) + private void processBulkSyncsWithJobTracking(final List usersToSync, + final SyncMetrics metrics, + final List successfullyProcessedUserIds, + final List failedUsers) throws ExternalAccountSynchronisationException { try { Map> groupedUsers = groupUsersBySubscriptionState(usersToSync); + List submittedJobs = new ArrayList<>(); for (Map.Entry> entry : groupedUsers.entrySet()) { - processBatchesForGroup(entry.getKey(), entry.getValue(), metrics, successfullyProcessedUserIds); + List groupJobs = submitBatchesForGroup(entry.getKey(), entry.getValue()); + submittedJobs.addAll(groupJobs); + } + + for (BatchJob batch : submittedJobs) { + Optional status = pollJobToCompletion(batch.jobId()); + if (status.isEmpty()) { + log.warn("{}Job {} timed out. Treating all {} users as failed.", MAILJET, batch.jobId(), batch.users().size()); + failedUsers.addAll(batch.users()); + metrics.incrementUnexpectedError(batch.users().size()); + continue; + } + processCompletedJob(batch, status.get(), successfullyProcessedUserIds, failedUsers, metrics); } } catch (ExternalAccountSynchronisationException e) { throw e; @@ -167,39 +191,180 @@ private void processBulkSyncs(final List usersToSync } /** - * Process batches for a subscription group. - * Extracted to reduce cognitive complexity. + * Submit batches for a subscription group and return list of submitted jobs. */ - private void processBatchesForGroup(final SubscriptionGroup group, - final List groupUsers, - final SyncMetrics metrics, - final List successfullyProcessedUserIds) + private List submitBatchesForGroup(final SubscriptionGroup group, + final List groupUsers) throws ExternalAccountSynchronisationException { + List submittedJobs = new ArrayList<>(); + for (int i = 0; i < groupUsers.size(); i += BULK_BATCH_SIZE) { int endIndex = Math.min(i + BULK_BATCH_SIZE, groupUsers.size()); List batch = groupUsers.subList(i, endIndex); try { - mailjetApi.bulkSyncUsers(batch, group.newsAction, group.eventsAction); - for (UserExternalAccountChanges user : batch) { - metrics.incrementSuccess(); - successfullyProcessedUserIds.add(user.getUserId()); + String jobId = mailjetApi.bulkSyncUsers(batch, group.newsAction, group.eventsAction); + if (jobId != null && !jobId.trim().isEmpty()) { + submittedJobs.add(new BatchJob(jobId, group, new ArrayList<>(batch))); + } else { + log.warn("{}Bulk sync returned null/empty job ID for {} users. Continuing.", MAILJET, batch.size()); } } catch (MailjetRateLimitException e) { - metrics.incrementRateLimitError(); - log.warn("{}Mailjet rate limit exceeded during bulk sync. Processed {} users so far.", - MAILJET, metrics.getSuccessCount()); - throw new ExternalAccountSynchronisationException( - "Mailjet API rate limits exceeded after processing " + metrics.getSuccessCount() + " users"); + log.warn("{}Mailjet rate limit exceeded during batch submission.", MAILJET); + throw new ExternalAccountSynchronisationException("Mailjet API rate limits exceeded: " + e.getMessage()); } catch (MailjetClientCommunicationException e) { - metrics.incrementCommunicationError(); + log.error("{}Communication error during batch submission for {} users.", MAILJET, batch.size(), e); throw new ExternalAccountSynchronisationException("Failed to connect to Mailjet: " + e.getMessage()); } catch (MailjetException e) { - metrics.incrementMailjetError(); - log.error("{}Mailjet API error during bulk sync of {} users. Continuing with next batch.", + log.error("{}Mailjet API error during batch submission of {} users. Continuing with next batch.", MAILJET, batch.size(), e); } } + + return submittedJobs; + } + + /** + * Poll a job until it completes or times out. + * Returns Optional.empty() if job times out; otherwise returns the final JobStatus. + */ + private Optional pollJobToCompletion(final String jobId) { + for (int attempt = 0; attempt < JOB_POLL_MAX_ATTEMPTS; attempt++) { + try { + JobStatus status = mailjetApi.getBulkJobStatus(jobId); + if (status.isComplete() || status.hasFailed()) { + log.debug("{}Job {} completed with status: {}", MAILJET, jobId, status.status()); + return Optional.of(status); + } + log.debug("{}Job {} in progress (attempt {}/{}). Processed: {}, Errors: {}", + MAILJET, jobId, attempt + 1, JOB_POLL_MAX_ATTEMPTS, status.processed(), status.errors()); + + Thread.sleep(JOB_POLL_INTERVAL_MS); + } catch (MailjetException e) { + log.warn("{}Error polling job {}: {}. Continuing with next attempt.", MAILJET, jobId, e.getMessage()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("{}Polling interrupted for job {}.", MAILJET, jobId); + return Optional.empty(); + } + } + + log.warn("{}Job {} timed out after {} attempts.", MAILJET, jobId, JOB_POLL_MAX_ATTEMPTS); + return Optional.empty(); + } + + /** + * Process a completed job: happy path (0 errors) or error path (recover per-user). + */ + private void processCompletedJob(final BatchJob batch, final JobStatus status, + final List successfullyProcessedUserIds, + final List failedUsers, + final SyncMetrics metrics) { + if (status.errors() == 0) { + log.info("{}Job {} completed successfully with {} users (inserted: {}, updated: {}, unchanged: {})", + MAILJET, batch.jobId(), batch.users().size(), status.inserted(), status.updated(), status.unchanged()); + for (UserExternalAccountChanges user : batch.users()) { + successfullyProcessedUserIds.add(user.getUserId()); + metrics.incrementSuccess(); + } + } else { + log.warn("{}Job {} completed with {} errors. Recovering per-user.", + MAILJET, batch.jobId(), status.errors()); + recoverUsersFromFailedJob(batch, successfullyProcessedUserIds, failedUsers, metrics); + } + } + + /** + * Recover users from a failed job by querying each user individually at Mailjet. + * Verifies that user data (name, role, verification status, stage) was correctly synced. + */ + private void recoverUsersFromFailedJob(final BatchJob batch, + final List successfullyProcessedUserIds, + final List failedUsers, + final SyncMetrics metrics) { + for (UserExternalAccountChanges user : batch.users()) { + try { + JSONObject mailjetContact = mailjetApi.getAccountByIdOrEmail(user.getAccountEmail()); + if (mailjetContact != null && isUserDataCorrectInMailjet(mailjetContact, user)) { + log.debug("{}User ID {} verified in Mailjet after job error.", MAILJET, user.getUserId()); + successfullyProcessedUserIds.add(user.getUserId()); + metrics.incrementSuccess(); + } else { + log.warn("{}User ID {} ({}) data mismatch or not found in Mailjet after job error.", + MAILJET, user.getUserId(), user.getAccountEmail()); + failedUsers.add(user); + metrics.incrementMailjetError(); + } + } catch (MailjetException e) { + log.warn("{}Failed to verify user ID {} ({}) at Mailjet: {}", + MAILJET, user.getUserId(), user.getAccountEmail(), e.getMessage()); + failedUsers.add(user); + metrics.incrementMailjetError(); + } + } + } + + /** + * Verify that user data in Mailjet matches what we expect. + * Checks: givenName, role, emailVerificationStatus, stage. + */ + private boolean isUserDataCorrectInMailjet(final JSONObject mailjetContact, + final UserExternalAccountChanges user) { + try { + String mailjetName = mailjetContact.optString("Name", ""); + String expectedName = user.getGivenName() != null ? user.getGivenName() : ""; + + JSONObject properties = mailjetContact.optJSONObject("Properties"); + if (properties == null) { + log.debug("{}No properties found in Mailjet contact for user ID {}.", MAILJET, user.getUserId()); + return false; + } + + String mailjetRole = properties.optString("role", ""); + String expectedRole = user.getRole().toString(); + + String mailjetVerification = properties.optString("verification_status", ""); + String expectedVerification = user.getEmailVerificationStatus().toString(); + + String mailjetStage = properties.optString("stage", ""); + String expectedStage = user.getStage() != null ? user.getStage() : "unknown"; + + boolean nameMatches = mailjetName.equals(expectedName); + boolean roleMatches = mailjetRole.equals(expectedRole); + boolean verificationMatches = mailjetVerification.equals(expectedVerification); + boolean stageMatches = mailjetStage.equals(expectedStage); + + if (!nameMatches || !roleMatches || !verificationMatches || !stageMatches) { + if (log.isDebugEnabled()) { + log.debug("{}User ID {} data mismatch: name ({}/{}) role ({}/{}) verification ({}/{}) stage ({}/{})", + MAILJET, user.getUserId(), + mailjetName, expectedName, + mailjetRole, expectedRole, + mailjetVerification, expectedVerification, + mailjetStage, expectedStage); + } + return false; + } + + return true; + + } catch (JSONException e) { + log.warn("{}JSON error checking user data for user ID {}: {}", MAILJET, user.getUserId(), e.getMessage()); + return false; + } + } + + /** + * Log failed users that could not be synced to Mailjet. + */ + private void logFailedUsers(final List failedUsers) { + if (failedUsers.isEmpty()) { + return; + } + log.warn("{}=== {} users failed Mailjet sync ===", MAILJET, failedUsers.size()); + for (UserExternalAccountChanges user : failedUsers) { + log.warn("{}Failed user - ID: {}, email: {}", MAILJET, user.getUserId(), user.getAccountEmail()); + } } /** @@ -381,6 +546,10 @@ void incrementUnexpectedError() { unexpectedErrorCount++; } + void incrementUnexpectedError(int count) { + unexpectedErrorCount += count; + } + int getSuccessCount() { return successCount; } @@ -460,4 +629,13 @@ public int hashCode() { return java.util.Objects.hash(newsAction, eventsAction); } } + + /** + * Record representing a submitted batch job with its users. + */ + private record BatchJob( + String jobId, + SubscriptionGroup group, + List users + ) {} } \ No newline at end of file diff --git a/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java b/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java index bd0d3b05cf..f619bb710c 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java +++ b/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java @@ -43,7 +43,7 @@ public class MailJetApiClientWrapper { private static final String MAILJET = "MAILJET - "; private static final String PROPERTY_VALUE_KEY = "value"; - private static final int BULK_BATCH_SIZE = 1000; + private static final int BULK_BATCH_SIZE = 100; private static final String ACTION = "Action"; private static final String LIST_ID = "ListID"; @@ -512,4 +512,69 @@ private String submitBulkSyncRequest(final MailjetRequest request, .replace("{}", String.valueOf(status))); } + /** + * Get the status of an async bulk sync job. + * + * @param jobId - the job ID returned from bulkSyncUsers + * @return a JobStatus record with job status details + * @throws MailjetException - if the API call fails + */ + public JobStatus getBulkJobStatus(final String jobId) throws MailjetException { + if (jobId == null || jobId.trim().isEmpty()) { + throw new IllegalArgumentException("Job ID cannot be null or empty"); + } + + try { + MailjetRequest request = new MailjetRequest(ContactManagemanycontacts.resource, Long.parseLong(jobId)); + MailjetResponse response = mailjetClient.get(request); + + if (response.getStatus() != 200) { + throw new MailjetException("Failed to fetch job status. Status: " + response.getStatus()); + } + + JSONObject jobData = response.getData().getJSONObject(0); + String status = jobData.optString("Status", "Unknown"); + int processed = jobData.optInt("Processed", 0); + int inserted = jobData.optInt("Inserted", 0); + int updated = jobData.optInt("Updated", 0); + int unchanged = jobData.optInt("Unchanged", 0); + int errors = jobData.optInt("Error", 0); + + return new JobStatus(status, processed, inserted, updated, unchanged, errors); + + } catch (NumberFormatException e) { + throw new MailjetException("Invalid job ID format: " + jobId, e); + + } catch (JSONException e) { + throw new MailjetException("JSON parsing error when fetching job status for job ID: " + jobId, e); + + } catch (MailjetException e) { + if (isCommunicationException(e)) { + String errorMsg = "Communication error fetching job status for job ID: " + jobId; + throw new MailjetClientCommunicationException(errorMsg, e); + } + throw e; + } + } + + /** + * Record representing the status of a bulk sync job. + */ + public record JobStatus( + String status, + int processed, + int inserted, + int updated, + int unchanged, + int errors + ) { + public boolean isComplete() { + return "Completed".equalsIgnoreCase(status); + } + + public boolean hasFailed() { + return "Error".equalsIgnoreCase(status); + } + } + } diff --git a/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java b/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java index 3bacf315a8..30ab649ce3 100644 --- a/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java +++ b/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java @@ -23,6 +23,7 @@ import uk.ac.cam.cl.dtg.segue.dao.SegueDatabaseException; import uk.ac.cam.cl.dtg.segue.dao.users.IExternalAccountDataManager; import uk.ac.cam.cl.dtg.util.email.MailJetApiClientWrapper; +import uk.ac.cam.cl.dtg.util.email.MailJetApiClientWrapper.JobStatus; import uk.ac.cam.cl.dtg.util.email.MailJetSubscriptionAction; class ExternalAccountManagerTest { @@ -42,7 +43,7 @@ public void setUp() { class SynchroniseChangedUsersTests { @Test - void synchroniseChangedUsers_WithBulkUsers_ShouldUseBulkApi() + void synchroniseChangedUsers_WithBulkUsers_ShouldSubmitAndPoll() throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { // Arrange - users with same subscription preferences should be batched List changedUsers = List.of( @@ -59,6 +60,9 @@ void synchroniseChangedUsers_WithBulkUsers_ShouldUseBulkApi() expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); expect(mockMailjetApi.bulkSyncUsers(anyObject(), eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE), eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE))).andReturn("job123"); + // Job completes successfully with no errors + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andReturn(new JobStatus("Completed", 2, 2, 0, 0, 0)); mockDatabase.batchMarkAsSynced(anyObject(List.class)); expectLastCall(); @@ -90,9 +94,15 @@ void synchroniseChangedUsers_WithMixedSubscriptionPreferences_ShouldGroupByPrefe // First bulk call for group (FORCE_SUBSCRIBE, FORCE_SUBSCRIBE) expect(mockMailjetApi.bulkSyncUsers(anyObject(), eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE), eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE))).andReturn("job123"); + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andReturn(new JobStatus("Completed", 1, 1, 0, 0, 0)); + // Second bulk call for group (FORCE_SUBSCRIBE, UNSUBSCRIBE) expect(mockMailjetApi.bulkSyncUsers(anyObject(), eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE), eq(MailJetSubscriptionAction.UNSUBSCRIBE))).andReturn("job124"); + expect(mockMailjetApi.getBulkJobStatus("job124")) + .andReturn(new JobStatus("Completed", 1, 0, 1, 0, 0)); + mockDatabase.batchMarkAsSynced(anyObject(List.class)); expectLastCall(); @@ -150,6 +160,8 @@ void synchroniseChangedUsers_WithDeliveryFailedUser_ShouldGroupAsRemove() // Delivery failed users should call with REMOVE for both news and events expect(mockMailjetApi.bulkSyncUsers(anyObject(), eq(MailJetSubscriptionAction.REMOVE), eq(MailJetSubscriptionAction.REMOVE))).andReturn("job123"); + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andReturn(new JobStatus("Completed", 1, 0, 0, 1, 0)); mockDatabase.batchMarkAsSynced(anyObject(List.class)); expectLastCall(); @@ -177,7 +189,6 @@ void synchroniseChangedUsers_WithEmptyUserList_ShouldReturnWithoutError() verify(mockDatabase, mockMailjetApi); } - @Test void synchroniseChangedUsers_WithDatabaseException_ShouldThrow() throws SegueDatabaseException { // Arrange @@ -193,6 +204,38 @@ void synchroniseChangedUsers_WithDatabaseException_ShouldThrow() throws SegueDat verify(mockDatabase); } + @Test + void synchroniseChangedUsers_WithJobErrors_ShouldRecoverPerUser() + throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { + // Arrange - job completes but has errors + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) + ); + + expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) + .andReturn("job123"); + // Job has 1 error + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andReturn(new JobStatus("Completed", 1, 0, 0, 0, 1)); + // Recovery: user not found in Mailjet after error + expect(mockMailjetApi.getAccountByIdOrEmail("test@example.com")) + .andReturn(null); + mockDatabase.batchMarkAsSynced(anyObject(List.class)); + expectLastCall(); + + replay(mockDatabase, mockMailjetApi); + + // Act + externalAccountManager.synchroniseChangedUsers(); + + // Assert + verify(mockDatabase, mockMailjetApi); + } + @Test void synchroniseChangedUsers_WithMailjetException_ShouldLogAndContinue() throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { @@ -267,7 +310,5 @@ void synchroniseChangedUsers_WithRateLimitException_ShouldThrow() verify(mockDatabase, mockMailjetApi); } - - } } From 387ba8cb6b4aa4628a9ca835d1282f20cb4ace80 Mon Sep 17 00:00:00 2001 From: Marius Date: Wed, 6 May 2026 10:04:15 +0300 Subject: [PATCH 8/9] Add rate limit handling: fail fast on repeated rate limits during polling --- .../api/managers/ExternalAccountManager.java | 39 +++++++++--- .../managers/ExternalAccountManagerTest.java | 62 +++++++++++++++++++ 2 files changed, 93 insertions(+), 8 deletions(-) diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java b/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java index cff33e128a..20fafa63e3 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java @@ -173,14 +173,19 @@ private void processBulkSyncsWithJobTracking(final List status = pollJobToCompletion(batch.jobId()); - if (status.isEmpty()) { - log.warn("{}Job {} timed out. Treating all {} users as failed.", MAILJET, batch.jobId(), batch.users().size()); - failedUsers.addAll(batch.users()); - metrics.incrementUnexpectedError(batch.users().size()); - continue; + try { + Optional status = pollJobToCompletion(batch.jobId()); + if (status.isEmpty()) { + log.warn("{}Job {} timed out. Treating all {} users as failed.", MAILJET, batch.jobId(), batch.users().size()); + failedUsers.addAll(batch.users()); + metrics.incrementUnexpectedError(batch.users().size()); + continue; + } + processCompletedJob(batch, status.get(), successfullyProcessedUserIds, failedUsers, metrics); + } catch (ExternalAccountSynchronisationException e) { + log.error("{}Job {} polling failed: {}", MAILJET, batch.jobId(), e.getMessage()); + throw e; } - processCompletedJob(batch, status.get(), successfullyProcessedUserIds, failedUsers, metrics); } } catch (ExternalAccountSynchronisationException e) { throw e; @@ -227,11 +232,17 @@ private List submitBatchesForGroup(final SubscriptionGroup group, /** * Poll a job until it completes or times out. * Returns Optional.empty() if job times out; otherwise returns the final JobStatus. + * Fails fast on repeated rate limiting (2+ consecutive rate limits). */ - private Optional pollJobToCompletion(final String jobId) { + private Optional pollJobToCompletion(final String jobId) + throws ExternalAccountSynchronisationException { + int consecutiveRateLimits = 0; + for (int attempt = 0; attempt < JOB_POLL_MAX_ATTEMPTS; attempt++) { try { JobStatus status = mailjetApi.getBulkJobStatus(jobId); + consecutiveRateLimits = 0; // Reset on successful poll + if (status.isComplete() || status.hasFailed()) { log.debug("{}Job {} completed with status: {}", MAILJET, jobId, status.status()); return Optional.of(status); @@ -240,7 +251,19 @@ private Optional pollJobToCompletion(final String jobId) { MAILJET, jobId, attempt + 1, JOB_POLL_MAX_ATTEMPTS, status.processed(), status.errors()); Thread.sleep(JOB_POLL_INTERVAL_MS); + } catch (MailjetRateLimitException e) { + consecutiveRateLimits++; + log.warn("{}Rate limit during job {} polling (attempt {}). Consecutive limits: {}", + MAILJET, jobId, attempt + 1, consecutiveRateLimits); + + if (consecutiveRateLimits >= 2) { + log.error("{}Job {} hit rate limit {} times. Failing fast to respect API limits.", + MAILJET, jobId, consecutiveRateLimits); + throw new ExternalAccountSynchronisationException( + "Mailjet API rate limit exceeded during job polling for job " + jobId); + } } catch (MailjetException e) { + consecutiveRateLimits = 0; // Reset on non-rate-limit errors log.warn("{}Error polling job {}: {}. Continuing with next attempt.", MAILJET, jobId, e.getMessage()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java b/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java index 30ab649ce3..0c8b195304 100644 --- a/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java +++ b/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java @@ -310,5 +310,67 @@ void synchroniseChangedUsers_WithRateLimitException_ShouldThrow() verify(mockDatabase, mockMailjetApi); } + + @Test + void synchroniseChangedUsers_WithSingleRateLimitDuringPolling_ShouldContinuePolling() + throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { + // Arrange - single rate limit should be tolerated + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) + ); + + expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) + .andReturn("job123"); + // First poll hits rate limit, second succeeds + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andThrow(new MailjetRateLimitException("Rate limit")); + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andReturn(new JobStatus("Completed", 1, 0, 1, 0, 0)); + mockDatabase.batchMarkAsSynced(anyObject(List.class)); + expectLastCall(); + + replay(mockDatabase, mockMailjetApi); + + // Act + externalAccountManager.synchroniseChangedUsers(); + + // Assert + verify(mockDatabase, mockMailjetApi); + } + + @Test + void synchroniseChangedUsers_WithRepeatedRateLimitDuringPolling_ShouldFailFast() + throws SegueDatabaseException, MailjetException { + // Arrange - 2+ consecutive rate limits should fail fast + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) + ); + + expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) + .andReturn("job123"); + // Both polls hit rate limit + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andThrow(new MailjetRateLimitException("Rate limit")) + .times(2); + + replay(mockDatabase, mockMailjetApi); + + // Act & Assert - should throw on 2nd consecutive rate limit + ExternalAccountSynchronisationException exception = assertThrows( + ExternalAccountSynchronisationException.class, + () -> externalAccountManager.synchroniseChangedUsers()); + + assertTrue(exception.getMessage().contains("rate limit")); + + verify(mockDatabase, mockMailjetApi); + } } } From 6d7984f5eeaca1315988a71ff68481b50cfe8b7d Mon Sep 17 00:00:00 2001 From: Marius Date: Wed, 6 May 2026 10:34:23 +0300 Subject: [PATCH 9/9] Fix and expand error recovery tests --- .../managers/ExternalAccountManagerTest.java | 46 +++++++++++++++++-- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java b/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java index 0c8b195304..7e3eafa296 100644 --- a/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java +++ b/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java @@ -14,6 +14,7 @@ import com.mailjet.client.errors.MailjetException; import com.mailjet.client.errors.MailjetRateLimitException; import java.util.List; +import org.json.JSONObject; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -205,9 +206,9 @@ void synchroniseChangedUsers_WithDatabaseException_ShouldThrow() throws SegueDat } @Test - void synchroniseChangedUsers_WithJobErrors_ShouldRecoverPerUser() + void synchroniseChangedUsers_WithJobErrorsButUserDataCorrect_ShouldMarkAsSynced() throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - job completes but has errors + // Arrange - job completes but has errors, but user data is correct at Mailjet List changedUsers = List.of( new UserExternalAccountChanges( 1L, null, "test@example.com", Role.STUDENT, "John", false, @@ -221,9 +222,15 @@ void synchroniseChangedUsers_WithJobErrors_ShouldRecoverPerUser() // Job has 1 error expect(mockMailjetApi.getBulkJobStatus("job123")) .andReturn(new JobStatus("Completed", 1, 0, 0, 0, 1)); - // Recovery: user not found in Mailjet after error + // Recovery: user data is correct in Mailjet despite job error + JSONObject mailjetContact = new JSONObject() + .put("Name", "John") + .put("Properties", new JSONObject() + .put("role", "STUDENT") + .put("verification_status", "VERIFIED") + .put("stage", "GCSE")); expect(mockMailjetApi.getAccountByIdOrEmail("test@example.com")) - .andReturn(null); + .andReturn(mailjetContact); mockDatabase.batchMarkAsSynced(anyObject(List.class)); expectLastCall(); @@ -236,6 +243,37 @@ void synchroniseChangedUsers_WithJobErrors_ShouldRecoverPerUser() verify(mockDatabase, mockMailjetApi); } + @Test + void synchroniseChangedUsers_WithJobErrorsAndUserNotFound_ShouldNotSyncUser() + throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { + // Arrange - job completes with errors and user not found in Mailjet + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) + ); + + expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) + .andReturn("job123"); + // Job has 1 error + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andReturn(new JobStatus("Completed", 1, 0, 0, 0, 1)); + // Recovery: user not found in Mailjet after error - treated as failed + expect(mockMailjetApi.getAccountByIdOrEmail("test@example.com")) + .andReturn(null); + // batchMarkAsSynced not called since user failed recovery (empty list) + + replay(mockDatabase, mockMailjetApi); + + // Act + externalAccountManager.synchroniseChangedUsers(); + + // Assert + verify(mockDatabase, mockMailjetApi); + } + @Test void synchroniseChangedUsers_WithMailjetException_ShouldLogAndContinue() throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException {