diff --git a/config-templates/windows--local-dev-segue-config.properties b/config-templates/windows--local-dev-segue-config.properties index 5541bbbd96..8a88a840ee 100644 --- a/config-templates/windows--local-dev-segue-config.properties +++ b/config-templates/windows--local-dev-segue-config.properties @@ -19,6 +19,11 @@ EMAIL_SIGNATURE=Isaac Physics Project EVENT_ADMIN_EMAIL=events@isaacphysics.org EVENT_ICAL_UID_DOMAIN=isaacphysics.org +# CORS Configuration (ALB Migration) +# Allowed origins for CORS requests. --- MMM Update for production +# Format: comma-separated list of origins or wildcard patterns (e.g., https://*.isaaccomputerscience.org) +CORS_ALLOWED_ORIGINS=https://*.isaaccomputerscience.org + SCHOOL_CSV_LIST_PATH=C:\\dev\\isaac-other-resources\\schools_list_2026_spring.csv # Segue diff --git a/src/main/java/uk/ac/cam/cl/dtg/isaac/configuration/IsaacApplicationRegister.java b/src/main/java/uk/ac/cam/cl/dtg/isaac/configuration/IsaacApplicationRegister.java index d4f6edb0d7..1868fb2164 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/isaac/configuration/IsaacApplicationRegister.java +++ b/src/main/java/uk/ac/cam/cl/dtg/isaac/configuration/IsaacApplicationRegister.java @@ -47,6 +47,7 @@ import uk.ac.cam.cl.dtg.segue.api.AuthenticationFacade; import uk.ac.cam.cl.dtg.segue.api.AuthorisationFacade; import uk.ac.cam.cl.dtg.segue.api.ContactFacade; +import uk.ac.cam.cl.dtg.segue.api.CorsFilter; import uk.ac.cam.cl.dtg.segue.api.EmailFacade; import uk.ac.cam.cl.dtg.segue.api.ExceptionSanitiser; import uk.ac.cam.cl.dtg.segue.api.GlossaryFacade; @@ -127,6 +128,7 @@ public final Set getSingletons() { this.singletons.add(injector.getInstance(QuizFacade.class)); // initialise filters + this.singletons.add(injector.getInstance(CorsFilter.class)); this.singletons.add(injector.getInstance(PerformanceMonitor.class)); this.singletons.add(injector.getInstance(SessionValidator.class)); this.singletons.add(injector.getInstance(ExceptionSanitiser.class)); diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/api/CorsFilter.java b/src/main/java/uk/ac/cam/cl/dtg/segue/api/CorsFilter.java new file mode 100644 index 0000000000..c7b6e9d192 --- /dev/null +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/api/CorsFilter.java @@ -0,0 +1,88 @@ +package uk.ac.cam.cl.dtg.segue.api; + +import com.google.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.container.ContainerRequestFilter; +import jakarta.ws.rs.container.ContainerResponseContext; +import jakarta.ws.rs.container.ContainerResponseFilter; +import jakarta.ws.rs.container.PreMatching; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.ext.Provider; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import uk.ac.cam.cl.dtg.util.PropertiesLoader; + +/** + * CORS Filter for ALB migration. + * Emits CORS headers from the application instead of relying on nginx ingress annotations. + * This allows the API to work with AWS ALB which doesn't have a built-in CORS module. + */ +@Provider +@PreMatching +public class CorsFilter implements ContainerRequestFilter, ContainerResponseFilter { + + private static final Logger log = LoggerFactory.getLogger(CorsFilter.class); + + private static final String ALLOWED_ORIGINS_PROPERTY = "CORS_ALLOWED_ORIGINS"; + private static final String DEFAULT_ALLOWED_ORIGINS = "https://*.isaaccomputerscience.org"; + + private final String allowedOrigins; + + @Inject + public CorsFilter(final PropertiesLoader properties) { + String configuredOrigins = properties.getProperty(ALLOWED_ORIGINS_PROPERTY); + this.allowedOrigins = configuredOrigins != null ? configuredOrigins : DEFAULT_ALLOWED_ORIGINS; + } + + @Override + public void filter(final ContainerRequestContext requestContext) throws IOException { + if (requestContext.getMethod().equalsIgnoreCase("OPTIONS")) { + requestContext.abortWith( + Response.ok() + .header("Access-Control-Allow-Origin", getAllowedOrigin(requestContext)) + .header("Access-Control-Allow-Methods", + "GET, POST, PUT, DELETE, OPTIONS, PATCH") + .header("Access-Control-Allow-Headers", + "Origin, X-Requested-With, Content-Type, Accept, Authorization, X-Api-Token") + .header("Access-Control-Max-Age", "3600") + .build()); + } + } + + @Override + public void filter(final ContainerRequestContext requestContext, + final ContainerResponseContext responseContext) throws IOException { + String allowedOrigin = getAllowedOrigin(requestContext); + responseContext.getHeaders().add("Access-Control-Allow-Origin", allowedOrigin); + responseContext.getHeaders().add("Access-Control-Allow-Methods", + "GET, POST, PUT, DELETE, OPTIONS, PATCH"); + responseContext.getHeaders().add("Access-Control-Allow-Headers", + "Origin, X-Requested-With, Content-Type, Accept, Authorization, X-Api-Token"); + responseContext.getHeaders().add("Access-Control-Allow-Credentials", "true"); + } + + /** + * Validate and return the allowed origin from the request. + * Currently, allows all requests from Isaac domains (*.isaaccomputerscience.org). + * More sophisticated --- MMM ??? + * + * @param requestContext the request context + * @return the allowed origin, or * if validation fails + */ + private String getAllowedOrigin(final ContainerRequestContext requestContext) { + String origin = requestContext.getHeaderString("Origin"); + + if (origin == null) { + return allowedOrigins; + } + + // For simplicity, allow any Isaac domain --- MMM ??? + // The allowedOrigins property can be configured as a regex or comma-separated list if needed. + if (origin.contains("isaaccomputerscience.org") || origin.contains("localhost")) { + return origin; + } + + return allowedOrigins; + } +} \ No newline at end of file diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java b/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java index 7ac095e882..20fafa63e3 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManager.java @@ -19,7 +19,12 @@ import com.mailjet.client.errors.MailjetClientCommunicationException; import com.mailjet.client.errors.MailjetException; import com.mailjet.client.errors.MailjetRateLimitException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.json.JSONException; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,10 +33,16 @@ import uk.ac.cam.cl.dtg.segue.dao.SegueDatabaseException; import uk.ac.cam.cl.dtg.segue.dao.users.IExternalAccountDataManager; import uk.ac.cam.cl.dtg.util.email.MailJetApiClientWrapper; +import uk.ac.cam.cl.dtg.util.email.MailJetApiClientWrapper.JobStatus; import uk.ac.cam.cl.dtg.util.email.MailJetSubscriptionAction; public class ExternalAccountManager implements IExternalAccountManager { private static final Logger log = LoggerFactory.getLogger(ExternalAccountManager.class); + private static final String MAILJET = "MAILJET - "; + private static final int BULK_BATCH_SIZE = 100; + private static final int RATE_LIMIT_RETRY_SLEEP_MS = 10000; // 10 seconds + private static final int JOB_POLL_INTERVAL_MS = 5000; // 5 seconds between polls + private static final int JOB_POLL_MAX_ATTEMPTS = 60; // max 5 minutes (60 × 5s) private final IExternalAccountDataManager database; private final MailJetApiClientWrapper mailjetApi; @@ -58,193 +69,341 @@ public ExternalAccountManager(final MailJetApiClientWrapper mailjetApi, final IE */ @Override public synchronized void synchroniseChangedUsers() throws ExternalAccountSynchronisationException { - log.info("Starting Mailjet synchronization process"); + log.info("{}Starting Mailjet synchronization process", MAILJET); - List userRecordsToUpdate; + List userRecordsToUpdate = getRecentlyChangedUsersOrThrow(); + if (userRecordsToUpdate.isEmpty()) { + log.info("{}No users to synchronize", MAILJET); + return; + } + + SyncMetrics metrics = new SyncMetrics(); + List successfullyProcessedUserIds = new ArrayList<>(); + List failedUsers = new ArrayList<>(); + + List usersToDelete = new ArrayList<>(); + List usersToSync = new ArrayList<>(); + partitionUsersByType(userRecordsToUpdate, usersToDelete, usersToSync); + + processDeletions(usersToDelete, metrics, successfullyProcessedUserIds); + processBulkSyncsWithJobTracking(usersToSync, metrics, successfullyProcessedUserIds, failedUsers); + markSuccessfullyProcessedAsSynced(successfullyProcessedUserIds, metrics); + logFailedUsers(failedUsers); + + logSyncSummary(metrics, userRecordsToUpdate.size()); + } + + /** + * Retrieve recently changed users from database. + * Extracted to reduce cognitive complexity. + */ + private List getRecentlyChangedUsersOrThrow() + throws ExternalAccountSynchronisationException { try { - userRecordsToUpdate = database.getRecentlyChangedRecords(); - log.info("Found {} users to synchronize with Mailjet", userRecordsToUpdate.size()); + List users = database.getRecentlyChangedRecords(); + log.info("{}Found {} users to synchronize with Mailjet", MAILJET, users.size()); + return users; } catch (SegueDatabaseException e) { throw new ExternalAccountSynchronisationException("Failed to retrieve users for synchronization" + e.getMessage()); } + } - if (userRecordsToUpdate.isEmpty()) { - log.info("No users to synchronize"); - return; + /** + * Partition users into deletions and syncs. + * Extracted to reduce cognitive complexity. + */ + private void partitionUsersByType(final List userRecords, + final List usersToDelete, + final List usersToSync) { + for (UserExternalAccountChanges userRecord : userRecords) { + if (Boolean.TRUE.equals(userRecord.isDeleted()) && userRecord.getProviderUserId() != null) { + usersToDelete.add(userRecord); + } else { + usersToSync.add(userRecord); + } } + } - SyncMetrics metrics = new SyncMetrics(); - - for (UserExternalAccountChanges userRecord : userRecordsToUpdate) { - Long userId = userRecord.getUserId(); - + /** + * Process user deletions with error handling and backoff. + * Extracted to reduce cognitive complexity. + */ + private void processDeletions(final List usersToDelete, final SyncMetrics metrics, + final List successfullyProcessedUserIds) + throws ExternalAccountSynchronisationException { + for (UserExternalAccountChanges userRecord : usersToDelete) { try { - processUserSync(userRecord, metrics); - metrics.incrementSuccess(); - + deleteUserFromMailJetWithBackoff(userRecord.getProviderUserId(), userRecord); + database.updateProviderLastUpdated(userRecord.getUserId()); + metrics.incrementDeleted(); + successfullyProcessedUserIds.add(userRecord.getUserId()); } catch (SegueDatabaseException e) { metrics.incrementDatabaseError(); - log.error("Database error storing Mailjet update for user ID: {}", userId, e); + log.error("{}Database error during deletion for user ID: {}", MAILJET, userRecord.getUserId(), e); } catch (MailjetClientCommunicationException e) { metrics.incrementCommunicationError(); throw new ExternalAccountSynchronisationException("Failed to connect to Mailjet: " + e.getMessage()); - } catch (MailjetRateLimitException e) { - metrics.incrementRateLimitError(); - log.warn("Mailjet rate limit exceeded while processing user ID: {}. Processed {} users before limit", - userId, metrics.getSuccessCount()); - throw new ExternalAccountSynchronisationException( - "Mailjet API rate limits exceeded after processing " + metrics.getSuccessCount() + " users" + e); - } catch (MailjetException e) { metrics.incrementMailjetError(); - log.error("Mailjet API error while processing user ID: {}. Continuing with next user", userId, e); - } catch (Exception e) { - metrics.incrementUnexpectedError(); - log.error("Unexpected error processing user ID: {}", userId, e); + log.error("{}Mailjet API error during deletion for user ID: {}. Continuing.", + MAILJET, userRecord.getUserId(), e); } } - - logSyncSummary(metrics, userRecordsToUpdate.size()); } /** - * Process synchronization for a single user. + * Process bulk syncs with job tracking, polling, and per-user recovery. + * Groups users by subscription state, submits batches, polls for completion, + * and handles errors by verifying users individually at Mailjet. */ - private void processUserSync(UserExternalAccountChanges userRecord, SyncMetrics metrics) - throws SegueDatabaseException, MailjetException { + private void processBulkSyncsWithJobTracking(final List usersToSync, + final SyncMetrics metrics, + final List successfullyProcessedUserIds, + final List failedUsers) + throws ExternalAccountSynchronisationException { + try { + Map> groupedUsers = + groupUsersBySubscriptionState(usersToSync); - Long userId = userRecord.getUserId(); - String accountEmail = userRecord.getAccountEmail(); + List submittedJobs = new ArrayList<>(); + for (Map.Entry> entry : groupedUsers.entrySet()) { + List groupJobs = submitBatchesForGroup(entry.getKey(), entry.getValue()); + submittedJobs.addAll(groupJobs); + } - if (accountEmail == null || accountEmail.trim().isEmpty()) { - log.warn("User ID {} has null or empty email address. Skipping", userId); - metrics.incrementSkipped(); - return; + for (BatchJob batch : submittedJobs) { + try { + Optional status = pollJobToCompletion(batch.jobId()); + if (status.isEmpty()) { + log.warn("{}Job {} timed out. Treating all {} users as failed.", MAILJET, batch.jobId(), batch.users().size()); + failedUsers.addAll(batch.users()); + metrics.incrementUnexpectedError(batch.users().size()); + continue; + } + processCompletedJob(batch, status.get(), successfullyProcessedUserIds, failedUsers, metrics); + } catch (ExternalAccountSynchronisationException e) { + log.error("{}Job {} polling failed: {}", MAILJET, batch.jobId(), e.getMessage()); + throw e; + } + } + } catch (ExternalAccountSynchronisationException e) { + throw e; + } catch (Exception e) { + metrics.incrementUnexpectedError(); + log.error("{}Unexpected error during bulk sync", MAILJET, e); } + } - boolean accountEmailDeliveryFailed = - EmailVerificationStatus.DELIVERY_FAILED.equals(userRecord.getEmailVerificationStatus()); - String mailjetId = userRecord.getProviderUserId(); + /** + * Submit batches for a subscription group and return list of submitted jobs. + */ + private List submitBatchesForGroup(final SubscriptionGroup group, + final List groupUsers) + throws ExternalAccountSynchronisationException { + List submittedJobs = new ArrayList<>(); - if (mailjetId != null && !mailjetId.trim().isEmpty()) { - handleExistingMailjetUser(mailjetId, userRecord, accountEmail, accountEmailDeliveryFailed, metrics); - } else { - handleNewMailjetUser(userRecord, accountEmail, accountEmailDeliveryFailed, metrics); + for (int i = 0; i < groupUsers.size(); i += BULK_BATCH_SIZE) { + int endIndex = Math.min(i + BULK_BATCH_SIZE, groupUsers.size()); + List batch = groupUsers.subList(i, endIndex); + + try { + String jobId = mailjetApi.bulkSyncUsers(batch, group.newsAction, group.eventsAction); + if (jobId != null && !jobId.trim().isEmpty()) { + submittedJobs.add(new BatchJob(jobId, group, new ArrayList<>(batch))); + } else { + log.warn("{}Bulk sync returned null/empty job ID for {} users. Continuing.", MAILJET, batch.size()); + } + } catch (MailjetRateLimitException e) { + log.warn("{}Mailjet rate limit exceeded during batch submission.", MAILJET); + throw new ExternalAccountSynchronisationException("Mailjet API rate limits exceeded: " + e.getMessage()); + } catch (MailjetClientCommunicationException e) { + log.error("{}Communication error during batch submission for {} users.", MAILJET, batch.size(), e); + throw new ExternalAccountSynchronisationException("Failed to connect to Mailjet: " + e.getMessage()); + } catch (MailjetException e) { + log.error("{}Mailjet API error during batch submission of {} users. Continuing with next batch.", + MAILJET, batch.size(), e); + } } - database.updateProviderLastUpdated(userId); + return submittedJobs; } /** - * Handle synchronization for users that already exist in Mailjet. + * Poll a job until it completes or times out. + * Returns Optional.empty() if job times out; otherwise returns the final JobStatus. + * Fails fast on repeated rate limiting (2+ consecutive rate limits). */ - private void handleExistingMailjetUser(String mailjetId, UserExternalAccountChanges userRecord, - String accountEmail, boolean accountEmailDeliveryFailed, SyncMetrics metrics) - throws SegueDatabaseException, MailjetException { + private Optional pollJobToCompletion(final String jobId) + throws ExternalAccountSynchronisationException { + int consecutiveRateLimits = 0; - Long userId = userRecord.getUserId(); - JSONObject mailjetDetails = mailjetApi.getAccountByIdOrEmail(mailjetId); + for (int attempt = 0; attempt < JOB_POLL_MAX_ATTEMPTS; attempt++) { + try { + JobStatus status = mailjetApi.getBulkJobStatus(jobId); + consecutiveRateLimits = 0; // Reset on successful poll - if (mailjetDetails == null) { - log.warn("User ID {} has Mailjet ID {} but account not found. Treating as new user", userId, mailjetId); - database.updateExternalAccount(userId, null); - handleNewMailjetUser(userRecord, accountEmail, accountEmailDeliveryFailed, metrics); - return; - } + if (status.isComplete() || status.hasFailed()) { + log.debug("{}Job {} completed with status: {}", MAILJET, jobId, status.status()); + return Optional.of(status); + } + log.debug("{}Job {} in progress (attempt {}/{}). Processed: {}, Errors: {}", + MAILJET, jobId, attempt + 1, JOB_POLL_MAX_ATTEMPTS, status.processed(), status.errors()); - if (userRecord.isDeleted()) { - deleteUserFromMailJet(mailjetId, userRecord); - metrics.incrementDeleted(); - - } else if (accountEmailDeliveryFailed) { - log.info("User ID {} has delivery failed status. Unsubscribing from all lists", userId); - mailjetApi.updateUserSubscriptions(mailjetId, - MailJetSubscriptionAction.REMOVE, - MailJetSubscriptionAction.REMOVE); - metrics.incrementUnsubscribed(); - - } else if (!accountEmail.equalsIgnoreCase(mailjetDetails.getString("Email"))) { - log.info("User ID {} changed email. Recreating Mailjet account", userId); - mailjetApi.permanentlyDeleteAccountById(mailjetId); - String newMailjetId = mailjetApi.addNewUserOrGetUserIfExists(accountEmail); - - if (newMailjetId == null) { - throw new MailjetException("Failed to create new Mailjet account after email change for user: " + userId); + Thread.sleep(JOB_POLL_INTERVAL_MS); + } catch (MailjetRateLimitException e) { + consecutiveRateLimits++; + log.warn("{}Rate limit during job {} polling (attempt {}). Consecutive limits: {}", + MAILJET, jobId, attempt + 1, consecutiveRateLimits); + + if (consecutiveRateLimits >= 2) { + log.error("{}Job {} hit rate limit {} times. Failing fast to respect API limits.", + MAILJET, jobId, consecutiveRateLimits); + throw new ExternalAccountSynchronisationException( + "Mailjet API rate limit exceeded during job polling for job " + jobId); + } + } catch (MailjetException e) { + consecutiveRateLimits = 0; // Reset on non-rate-limit errors + log.warn("{}Error polling job {}: {}. Continuing with next attempt.", MAILJET, jobId, e.getMessage()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("{}Polling interrupted for job {}.", MAILJET, jobId); + return Optional.empty(); } + } - updateUserOnMailJet(newMailjetId, userRecord); - metrics.incrementEmailChanged(); + log.warn("{}Job {} timed out after {} attempts.", MAILJET, jobId, JOB_POLL_MAX_ATTEMPTS); + return Optional.empty(); + } + /** + * Process a completed job: happy path (0 errors) or error path (recover per-user). + */ + private void processCompletedJob(final BatchJob batch, final JobStatus status, + final List successfullyProcessedUserIds, + final List failedUsers, + final SyncMetrics metrics) { + if (status.errors() == 0) { + log.info("{}Job {} completed successfully with {} users (inserted: {}, updated: {}, unchanged: {})", + MAILJET, batch.jobId(), batch.users().size(), status.inserted(), status.updated(), status.unchanged()); + for (UserExternalAccountChanges user : batch.users()) { + successfullyProcessedUserIds.add(user.getUserId()); + metrics.incrementSuccess(); + } } else { - updateUserOnMailJet(mailjetId, userRecord); - metrics.incrementUpdated(); + log.warn("{}Job {} completed with {} errors. Recovering per-user.", + MAILJET, batch.jobId(), status.errors()); + recoverUsersFromFailedJob(batch, successfullyProcessedUserIds, failedUsers, metrics); } } /** - * Handle synchronization for users that don't exist in Mailjet yet. + * Recover users from a failed job by querying each user individually at Mailjet. + * Verifies that user data (name, role, verification status, stage) was correctly synced. */ - private void handleNewMailjetUser(UserExternalAccountChanges userRecord, - String accountEmail, boolean accountEmailDeliveryFailed, SyncMetrics metrics) - throws SegueDatabaseException, MailjetException { - - Long userId = userRecord.getUserId(); + private void recoverUsersFromFailedJob(final BatchJob batch, + final List successfullyProcessedUserIds, + final List failedUsers, + final SyncMetrics metrics) { + for (UserExternalAccountChanges user : batch.users()) { + try { + JSONObject mailjetContact = mailjetApi.getAccountByIdOrEmail(user.getAccountEmail()); + if (mailjetContact != null && isUserDataCorrectInMailjet(mailjetContact, user)) { + log.debug("{}User ID {} verified in Mailjet after job error.", MAILJET, user.getUserId()); + successfullyProcessedUserIds.add(user.getUserId()); + metrics.incrementSuccess(); + } else { + log.warn("{}User ID {} ({}) data mismatch or not found in Mailjet after job error.", + MAILJET, user.getUserId(), user.getAccountEmail()); + failedUsers.add(user); + metrics.incrementMailjetError(); + } + } catch (MailjetException e) { + log.warn("{}Failed to verify user ID {} ({}) at Mailjet: {}", + MAILJET, user.getUserId(), user.getAccountEmail(), e.getMessage()); + failedUsers.add(user); + metrics.incrementMailjetError(); + } + } + } - if (!accountEmailDeliveryFailed && !userRecord.isDeleted()) { - log.info("Creating new Mailjet account for user ID {}", userId); + /** + * Verify that user data in Mailjet matches what we expect. + * Checks: givenName, role, emailVerificationStatus, stage. + */ + private boolean isUserDataCorrectInMailjet(final JSONObject mailjetContact, + final UserExternalAccountChanges user) { + try { + String mailjetName = mailjetContact.optString("Name", ""); + String expectedName = user.getGivenName() != null ? user.getGivenName() : ""; - String mailjetId = mailjetApi.addNewUserOrGetUserIfExists(accountEmail); + JSONObject properties = mailjetContact.optJSONObject("Properties"); + if (properties == null) { + log.debug("{}No properties found in Mailjet contact for user ID {}.", MAILJET, user.getUserId()); + return false; + } - if (mailjetId == null) { - throw new MailjetException("Mailjet returned null ID when creating account for user: " + userId); + String mailjetRole = properties.optString("role", ""); + String expectedRole = user.getRole().toString(); + + String mailjetVerification = properties.optString("verification_status", ""); + String expectedVerification = user.getEmailVerificationStatus().toString(); + + String mailjetStage = properties.optString("stage", ""); + String expectedStage = user.getStage() != null ? user.getStage() : "unknown"; + + boolean nameMatches = mailjetName.equals(expectedName); + boolean roleMatches = mailjetRole.equals(expectedRole); + boolean verificationMatches = mailjetVerification.equals(expectedVerification); + boolean stageMatches = mailjetStage.equals(expectedStage); + + if (!nameMatches || !roleMatches || !verificationMatches || !stageMatches) { + if (log.isDebugEnabled()) { + log.debug("{}User ID {} data mismatch: name ({}/{}) role ({}/{}) verification ({}/{}) stage ({}/{})", + MAILJET, user.getUserId(), + mailjetName, expectedName, + mailjetRole, expectedRole, + mailjetVerification, expectedVerification, + mailjetStage, expectedStage); + } + return false; } - updateUserOnMailJet(mailjetId, userRecord); - metrics.incrementCreated(); + return true; - } else { - log.debug("User ID {} not eligible for Mailjet (deleted={}, deliveryFailed={}). Skipping", - userId, userRecord.isDeleted(), accountEmailDeliveryFailed); - database.updateExternalAccount(userId, null); - metrics.incrementSkipped(); + } catch (JSONException e) { + log.warn("{}JSON error checking user data for user ID {}: {}", MAILJET, user.getUserId(), e.getMessage()); + return false; } } /** - * Update user details and subscriptions in Mailjet. + * Log failed users that could not be synced to Mailjet. */ - private void updateUserOnMailJet(final String mailjetId, final UserExternalAccountChanges userRecord) - throws SegueDatabaseException, MailjetException { - - if (mailjetId == null || mailjetId.trim().isEmpty()) { - throw new IllegalArgumentException("Mailjet ID cannot be null or empty"); + private void logFailedUsers(final List failedUsers) { + if (failedUsers.isEmpty()) { + return; } + log.warn("{}=== {} users failed Mailjet sync ===", MAILJET, failedUsers.size()); + for (UserExternalAccountChanges user : failedUsers) { + log.warn("{}Failed user - ID: {}, email: {}", MAILJET, user.getUserId(), user.getAccountEmail()); + } + } - Long userId = userRecord.getUserId(); - String stage = userRecord.getStage() != null ? userRecord.getStage() : "unknown"; - - mailjetApi.updateUserProperties( - mailjetId, - userRecord.getGivenName(), - userRecord.getRole().toString(), - userRecord.getEmailVerificationStatus().toString(), - stage - ); - - MailJetSubscriptionAction newsStatus = Boolean.TRUE.equals(userRecord.allowsNewsEmails()) - ? MailJetSubscriptionAction.FORCE_SUBSCRIBE - : MailJetSubscriptionAction.UNSUBSCRIBE; - - MailJetSubscriptionAction eventsStatus = Boolean.TRUE.equals(userRecord.allowsEventsEmails()) - ? MailJetSubscriptionAction.FORCE_SUBSCRIBE - : MailJetSubscriptionAction.UNSUBSCRIBE; - - mailjetApi.updateUserSubscriptions(mailjetId, newsStatus, eventsStatus); - database.updateExternalAccount(userId, mailjetId); - - log.debug("Updated Mailjet account {} for user ID {} (news={}, events={})", - mailjetId, userId, newsStatus, eventsStatus); + /** + * Mark successfully processed users as synced in database. + * Extracted to reduce cognitive complexity. + */ + private void markSuccessfullyProcessedAsSynced(final List successfullyProcessedUserIds, + final SyncMetrics metrics) { + try { + if (!successfullyProcessedUserIds.isEmpty()) { + database.batchMarkAsSynced(successfullyProcessedUserIds); + } + } catch (SegueDatabaseException e) { + metrics.incrementDatabaseError(); + log.error("{}Database error marking {} users as synced", MAILJET, successfullyProcessedUserIds.size(), e); + } } /** @@ -254,7 +413,7 @@ private void deleteUserFromMailJet(final String mailjetId, final UserExternalAcc throws SegueDatabaseException, MailjetException { if (mailjetId == null || mailjetId.trim().isEmpty()) { - log.warn("Attempted to delete user with null/empty Mailjet ID. User ID: {}", userRecord.getUserId()); + log.warn("{}Attempted to delete user with null/empty Mailjet ID. User ID: {}", MAILJET, userRecord.getUserId()); return; } @@ -262,29 +421,87 @@ private void deleteUserFromMailJet(final String mailjetId, final UserExternalAcc mailjetApi.permanentlyDeleteAccountById(mailjetId); database.updateExternalAccount(userId, null); - log.info("Deleted Mailjet account {} for user ID {} (GDPR deletion)", mailjetId, userId); + log.info("{}Deleted Mailjet account {} for user ID {} (GDPR deletion)", MAILJET, mailjetId, userId); + } + + /** + * Delete user from Mailjet with exponential backoff on rate limit. + */ + private void deleteUserFromMailJetWithBackoff(final String mailjetId, + final UserExternalAccountChanges userRecord) + throws SegueDatabaseException, MailjetException { + try { + deleteUserFromMailJet(mailjetId, userRecord); + } catch (MailjetRateLimitException e) { + log.warn("{}Rate limit on deletion, retrying after backoff...", MAILJET); + try { + Thread.sleep(RATE_LIMIT_RETRY_SLEEP_MS); + deleteUserFromMailJet(mailjetId, userRecord); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new MailjetException("Interrupted during deletion backoff", ie); + } + } + } + + /** + * Group users by subscription state combination. + */ + private Map> + groupUsersBySubscriptionState(final List users) { + Map> groups = new HashMap<>(); + + for (UserExternalAccountChanges user : users) { + if (user.getAccountEmail() == null || user.getAccountEmail().trim().isEmpty()) { + log.warn("{}User ID {} has null/empty email. Skipping.", MAILJET, user.getUserId()); + continue; + } + + boolean deliveryFailed = + EmailVerificationStatus.DELIVERY_FAILED.equals(user.getEmailVerificationStatus()); + + MailJetSubscriptionAction newsAction; + MailJetSubscriptionAction eventsAction; + + if (deliveryFailed) { + newsAction = MailJetSubscriptionAction.REMOVE; + eventsAction = MailJetSubscriptionAction.REMOVE; + } else { + newsAction = Boolean.TRUE.equals(user.allowsNewsEmails()) + ? MailJetSubscriptionAction.FORCE_SUBSCRIBE + : MailJetSubscriptionAction.UNSUBSCRIBE; + eventsAction = Boolean.TRUE.equals(user.allowsEventsEmails()) + ? MailJetSubscriptionAction.FORCE_SUBSCRIBE + : MailJetSubscriptionAction.UNSUBSCRIBE; + } + + SubscriptionGroup group = new SubscriptionGroup(newsAction, eventsAction); + groups.computeIfAbsent(group, k -> new ArrayList<>()).add(user); + } + + return groups; } /** * Log summary of synchronization results. */ private void logSyncSummary(SyncMetrics metrics, int totalUsers) { - log.info("=== Mailjet Synchronization Complete ==="); - log.info("Total users to process: {}", totalUsers); - log.info("Successfully processed: {}", metrics.getSuccessCount()); - log.info(" - Created: {}", metrics.getCreatedCount()); - log.info(" - Updated: {}", metrics.getUpdatedCount()); - log.info(" - Deleted: {}", metrics.getDeletedCount()); - log.info(" - Email changed: {}", metrics.getEmailChangedCount()); - log.info(" - Unsubscribed: {}", metrics.getUnsubscribedCount()); - log.info(" - Skipped: {}", metrics.getSkippedCount()); - log.info("Errors:"); - log.info(" - Database errors: {}", metrics.getDatabaseErrorCount()); - log.info(" - Communication errors: {}", metrics.getCommunicationErrorCount()); - log.info(" - Rate limit errors: {}", metrics.getRateLimitErrorCount()); - log.info(" - Mailjet API errors: {}", metrics.getMailjetErrorCount()); - log.info(" - Unexpected errors: {}", metrics.getUnexpectedErrorCount()); - log.info("========================================"); + log.info("{}=== Mailjet Synchronization Complete ===", MAILJET); + log.info("{}Total users to process: {}", MAILJET, totalUsers); + log.info("{}Successfully processed: {}", MAILJET, metrics.getSuccessCount()); + log.info("{} - Created: {}", MAILJET, metrics.getCreatedCount()); + log.info("{} - Updated: {}", MAILJET, metrics.getUpdatedCount()); + log.info("{} - Deleted: {}", MAILJET, metrics.getDeletedCount()); + log.info("{} - Email changed: {}", MAILJET, metrics.getEmailChangedCount()); + log.info("{} - Unsubscribed: {}", MAILJET, metrics.getUnsubscribedCount()); + log.info("{} - Skipped: {}", MAILJET, metrics.getSkippedCount()); + log.info("{}Errors:", MAILJET); + log.info("{} - Database errors: {}", MAILJET, metrics.getDatabaseErrorCount()); + log.info("{} - Communication errors: {}", MAILJET, metrics.getCommunicationErrorCount()); + log.info("{} - Rate limit errors: {}", MAILJET, metrics.getRateLimitErrorCount()); + log.info("{} - Mailjet API errors: {}", MAILJET, metrics.getMailjetErrorCount()); + log.info("{} - Unexpected errors: {}", MAILJET, metrics.getUnexpectedErrorCount()); + log.info("{}========================================", MAILJET); } /** @@ -352,6 +569,10 @@ void incrementUnexpectedError() { unexpectedErrorCount++; } + void incrementUnexpectedError(int count) { + unexpectedErrorCount += count; + } + int getSuccessCount() { return successCount; } @@ -400,4 +621,44 @@ int getUnexpectedErrorCount() { return unexpectedErrorCount; } } + + /** + * Key for grouping users by their subscription preferences. + */ + private static class SubscriptionGroup { + final MailJetSubscriptionAction newsAction; + final MailJetSubscriptionAction eventsAction; + + SubscriptionGroup(final MailJetSubscriptionAction newsAction, + final MailJetSubscriptionAction eventsAction) { + this.newsAction = newsAction; + this.eventsAction = eventsAction; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SubscriptionGroup that = (SubscriptionGroup) o; + return newsAction == that.newsAction && eventsAction == that.eventsAction; + } + + @Override + public int hashCode() { + return java.util.Objects.hash(newsAction, eventsAction); + } + } + + /** + * Record representing a submitted batch job with its users. + */ + private record BatchJob( + String jobId, + SubscriptionGroup group, + List users + ) {} } \ No newline at end of file diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/configuration/SegueGuiceConfigurationModule.java b/src/main/java/uk/ac/cam/cl/dtg/segue/configuration/SegueGuiceConfigurationModule.java index 25fa9eae8b..03b296daea 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/configuration/SegueGuiceConfigurationModule.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/configuration/SegueGuiceConfigurationModule.java @@ -1057,7 +1057,7 @@ private static SegueJobService getSegueJobService(final PropertiesLoader propert "syncMailjetUsersJob", CRON_GROUP_NAME_JAVA_JOB, "Sync users to mailjet", - CRON_STRING_EVERY_FOUR_HOURS); + CRON_STRING_HOURLY); List configuredScheduledJobs = new ArrayList<>(Arrays.asList( piiSqlJob, diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/IExternalAccountDataManager.java b/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/IExternalAccountDataManager.java index d84d98a43b..63db38df84 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/IExternalAccountDataManager.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/IExternalAccountDataManager.java @@ -27,4 +27,6 @@ public interface IExternalAccountDataManager { void updateProviderLastUpdated(Long userId) throws SegueDatabaseException; void updateExternalAccount(Long userId, String providerUserIdentifier) throws SegueDatabaseException; + + void batchMarkAsSynced(List userIds) throws SegueDatabaseException; } diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManager.java b/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManager.java index cb4bbeae57..c78a5480cb 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManager.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManager.java @@ -184,6 +184,41 @@ public void updateExternalAccount(final Long userId, final String providerUserId } } + @Override + public void batchMarkAsSynced(final List userIds) throws SegueDatabaseException { + if (userIds == null || userIds.isEmpty()) { + log.debug("{} Batch mark as synced called with empty list", MAILJET); + return; + } + + StringBuilder queryBuilder = new StringBuilder( + "INSERT INTO external_accounts (user_id, provider_name, provider_last_updated) VALUES "); + + for (int i = 0; i < userIds.size(); i++) { + if (i > 0) { + queryBuilder.append(", "); + } + queryBuilder.append("(?, 'MailJet', NOW())"); + } + + queryBuilder.append(" ON CONFLICT (user_id, provider_name) DO UPDATE SET provider_last_updated = NOW()"); + + try (Connection conn = database.getDatabaseConnection(); + PreparedStatement pst = conn.prepareStatement(queryBuilder.toString()) + ) { + for (int i = 0; i < userIds.size(); i++) { + pst.setLong(i + 1, userIds.get(i)); + } + + int rowsAffected = pst.executeUpdate(); + log.debug("{} Batch marked {} users as synced ({} rows affected)", MAILJET, userIds.size(), rowsAffected); + + } catch (SQLException e) { + String errorMsg = String.format("Database error marking %d users as synced", userIds.size()); + throw new SegueDatabaseException(errorMsg + ": " + e.getMessage(), e); + } + } + /** * Build UserExternalAccountChanges object from database result set. * Extracts stage information from registered_contexts JSONB[] field. diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ContentIndexer.java b/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ContentIndexer.java index cc2eef4482..a0ac426f39 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ContentIndexer.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ContentIndexer.java @@ -71,6 +71,7 @@ public class ContentIndexer { private static final Logger log = LoggerFactory.getLogger(ContentIndexer.class); private static final ConcurrentHashMap VERSION_LOCKS = new ConcurrentHashMap<>(); + private static final String QUESTION = "Question: "; private final ElasticSearchIndexer es; private final GitDb database; @@ -79,30 +80,16 @@ public class ContentIndexer { private static final int MEDIA_FILE_SIZE_LIMIT = 300 * 1024; // Bytes private static final int NANOSECONDS_IN_A_MILLISECOND = 1000000; private static final String ERROR_OCCURRED_SUFFIX = ". The following error occurred: "; + private static final String CONTENT_LOG_PREFIX = "CONTENT - "; - private static class IndexingContext { - final Map contentCache; - final Set tagsList; - final Map allUnits; - final Map publishedUnits; - final Map> indexProblemCache; - final boolean includeUnpublished; - - IndexingContext(final Map contentCache, final Set tagsList, - final Map allUnits, final Map publishedUnits, - final Map> indexProblemCache, final boolean includeUnpublished) { - this.contentCache = contentCache; - this.tagsList = tagsList; - this.allUnits = allUnits; - this.publishedUnits = publishedUnits; - this.indexProblemCache = indexProblemCache; - this.includeUnpublished = includeUnpublished; - } + private record IndexingContext(Map contentCache, Set tagsList, Map allUnits, + Map publishedUnits, Map> indexProblemCache, + boolean includeUnpublished) { boolean shouldSkipUnpublished(final Content content) { - return !includeUnpublished && !content.getPublished(); + return !includeUnpublished && !content.getPublished(); + } } - } @Inject public ContentIndexer(final GitDb database, final ElasticSearchIndexer es, final ContentMapperUtils mapperUtils) { @@ -112,7 +99,7 @@ public ContentIndexer(final GitDb database, final ElasticSearchIndexer es, final } - void loadAndIndexContent(final String version) throws Exception, VersionLockedException { + void loadAndIndexContent(final String version) throws Exception { // Take version lock or fail Boolean alreadyLocked = VERSION_LOCKS.putIfAbsent(version, true); @@ -131,7 +118,7 @@ void loadAndIndexContent(final String version) throws Exception, VersionLockedEx // The case where only some of the content types have been successfully indexed for this version, should // never happen but is covered by an expunge at the start of #buildElasticSearchIndex(...). if (allContentTypesAreIndexedForVersion(version)) { - log.debug("Content already indexed: " + sanitiseInternalLogValue(version)); + log.debug("Content already indexed: {}", sanitiseInternalLogValue(version)); return; } @@ -203,7 +190,7 @@ void setNamedVersion(final String alias, final String version) { * @param allUnits a map of units used in numeric questions * @param publishedUnits a map of units used in published numeric questions * @param indexProblemCache a map of problems found in the indexed content - * @throws ContentManagerException if the SHA is null or the associated resource cannot be accessed + * @throws ContentManagerException if the SHA is null or the associated resource cannot be accessed */ private synchronized void buildGitContentIndex(final String sha, final boolean includeUnpublished, @@ -246,7 +233,6 @@ private synchronized void buildGitContentIndex(final String sha, repository.close(); log.debug("Tags available {}", tagsList); log.debug("All units: {}", allUnits); - log.info("Git content cache population for " + sanitiseInternalLogValue(sha) + " completed!"); } catch (IOException e) { log.error("IOException while trying to access git repository. ", e); @@ -255,7 +241,7 @@ private synchronized void buildGitContentIndex(final String sha, } private void processJsonFile(final TreeWalk treeWalk, final Repository repository, - final IndexingContext context) { + final IndexingContext context) { try { ByteArrayOutputStream out = new ByteArrayOutputStream(); ObjectLoader loader = repository.open(treeWalk.getObjectId(0)); @@ -278,7 +264,7 @@ private void processJsonFile(final TreeWalk treeWalk, final Repository repositor context.indexProblemCache, treeWalk.getPathString(), content); } } catch (JsonMappingException e) { - log.debug(String.format("Unable to parse the json file found %s as a content object. " + log.warn(String.format(CONTENT_LOG_PREFIX + "Unable to parse the json file found %s as a content object. " + "Skipping file due to error: \n %s", treeWalk.getPathString(), e.getMessage())); Content dummyContent = new Content(); dummyContent.setCanonicalSourceFile(treeWalk.getPathString()); @@ -322,7 +308,7 @@ private void indexContentObject( } private void validateAndCacheContent(final Content flattenedContent, final Content parentContent, - final String treeWalkPath, final IndexingContext context) { + final String treeWalkPath, final IndexingContext context) { if (flattenedContent.getId() == null) { return; } @@ -354,8 +340,6 @@ private void validateAndCacheContent(final Content flattenedContent, final Conte } if (!context.contentCache.containsKey(flattenedContent.getId())) { - log.debug("Loading into cache: {} ({}) from {}", flattenedContent.getId(), flattenedContent.getType(), - treeWalkPath); context.contentCache.put(flattenedContent.getId(), flattenedContent); registerTags(flattenedContent.getTags(), context.tagsList); @@ -419,26 +403,27 @@ private Content augmentChildContent(final Content content, final String canonica content.setCanonicalSourceFile(canonicalSourceFile); - if (!content.getChildren().isEmpty()) { + if (content.getChildren() != null && !content.getChildren().isEmpty()) { for (ContentBase cb : content.getChildren()) { - if (cb instanceof Content) { - Content c = (Content) cb; + if (cb instanceof Content c) { this.augmentChildContent(c, canonicalSourceFile, newParentId, parentPublished); } } } - if (content instanceof Choice) { - Choice choice = (Choice) content; + if (content instanceof Choice choice) { this.augmentChildContent((Content) choice.getExplanation(), canonicalSourceFile, newParentId, parentPublished); } // hack to get cards to count as children: if (content instanceof IsaacCardDeck) { - for (IsaacCard card : ((IsaacCardDeck) content).getCards()) { - this.augmentChildContent(card, canonicalSourceFile, newParentId, parentPublished); + List cards = ((IsaacCardDeck) content).getCards(); + if (cards != null) { + for (IsaacCard card : cards) { + this.augmentChildContent(card, canonicalSourceFile, newParentId, parentPublished); + } } } @@ -462,8 +447,7 @@ private Content augmentChildContent(final Content content, final String canonica } } - if (content instanceof Media) { - Media media = (Media) content; + if (content instanceof Media media) { media.setSrc(fixMediaSrc(canonicalSourceFile, media.getSrc())); // for tracking purposes we want to generate an id for all image content objects. @@ -494,10 +478,9 @@ private void collateSearchableContent(final Content content, final StringBuilder } // Repeat the process for each child - if (!content.getChildren().isEmpty()) { + if (content.getChildren() != null && !content.getChildren().isEmpty()) { for (ContentBase childContentBase : content.getChildren()) { - if (childContentBase instanceof Content) { - Content child = (Content) childContentBase; + if (childContentBase instanceof Content child) { this.collateSearchableContent(child, searchableContentBuilder); } } @@ -512,7 +495,7 @@ private void collateSearchableContent(final Content content, final StringBuilder * @return src with relative paths fixed. */ private void augmentQuestionContent(final Question question, final String canonicalSourceFile, - final String newParentId, final boolean parentPublished) { + final String newParentId, final boolean parentPublished) { augmentHints(question, canonicalSourceFile, newParentId, parentPublished); augmentAnswerContent(question, canonicalSourceFile, newParentId, parentPublished); augmentFeedbackContent(question, canonicalSourceFile, newParentId, parentPublished); @@ -520,7 +503,7 @@ private void augmentQuestionContent(final Question question, final String canoni } private void augmentHints(final Question question, final String canonicalSourceFile, final String newParentId, - final boolean parentPublished) { + final boolean parentPublished) { if (question.getHints() != null) { for (ContentBase cb : question.getHints()) { Content c = (Content) cb; @@ -530,7 +513,7 @@ private void augmentHints(final Question question, final String canonicalSourceF } private void augmentAnswerContent(final Question question, final String canonicalSourceFile, final String newParentId, - final boolean parentPublished) { + final boolean parentPublished) { if (question.getAnswer() != null) { Content answer = (Content) question.getAnswer(); if (answer.getChildren() != null) { @@ -540,12 +523,22 @@ private void augmentAnswerContent(final Question question, final String canonica } } } + + if (question.getDefaultFeedback() != null) { + Content defaultFeedback = question.getDefaultFeedback(); + if (defaultFeedback.getChildren() != null) { + for (ContentBase cb : defaultFeedback.getChildren()) { + Content c = (Content) cb; + this.augmentChildContent(c, canonicalSourceFile, newParentId, parentPublished); + } + } + } } private void augmentFeedbackContent(final Question question, final String canonicalSourceFile, final String newParentId, - final boolean parentPublished) { + final boolean parentPublished) { if (question.getDefaultFeedback() != null) { Content defaultFeedback = question.getDefaultFeedback(); if (defaultFeedback.getChildren() != null) { @@ -558,9 +551,8 @@ private void augmentFeedbackContent(final Question question, } private void augmentChoiceQuestionContent(final Question question, final String canonicalSourceFile, - final String newParentId, final boolean parentPublished) { - if (question instanceof ChoiceQuestion) { - ChoiceQuestion choiceQuestion = (ChoiceQuestion) question; + final String newParentId, final boolean parentPublished) { + if (question instanceof ChoiceQuestion choiceQuestion) { if (choiceQuestion.getChoices() != null) { for (ContentBase cb : choiceQuestion.getChoices()) { Content c = (Content) cb; @@ -623,7 +615,7 @@ private synchronized void registerContentProblem(final Content c, final String m indexProblemCache.put(c, new ArrayList<>()); } - log.debug(message); + log.warn(CONTENT_LOG_PREFIX + "{}", message); indexProblemCache.get(c).add(message); //.replace("_", "\\_")); } @@ -663,8 +655,7 @@ private synchronized void registerUnits(final IsaacNumericQuestion q, final Map< HashMap newUnits = Maps.newHashMap(); for (Choice c : q.getChoices()) { - if (c instanceof Quantity) { - Quantity quantity = (Quantity) c; + if (c instanceof Quantity quantity) { if (quantity.getUnits() != null && !quantity.getUnits().isEmpty()) { String units = quantity.getUnits(); @@ -699,11 +690,11 @@ private synchronized void registerUnits(final IsaacNumericQuestion q, final Map< * @param indexProblemCache a map of problems found in the indexed content */ public synchronized void buildElasticSearchIndex(final String sha, - final Map gitCache, - final Set tagsList, - final Map allUnits, - final Map publishedUnits, - final Map> indexProblemCache) { + final Map gitCache, + final Set tagsList, + final Map allUnits, + final Map publishedUnits, + final Map> indexProblemCache) { if (anyContentTypesAreIndexedForVersion(sha)) { expungeAnyContentTypeIndicesRelatedToVersion(sha); } @@ -720,7 +711,7 @@ public synchronized void buildElasticSearchIndex(final String sha, } catch (JsonProcessingException e) { log.error("Unable to serialize content object: {} for indexing with the search provider.", content.getId(), e); this.registerContentProblem(content, "Search Index Error: " + content.getId() - + content.getCanonicalSourceFile() + " Exception: " + e.toString(), indexProblemCache); + + content.getCanonicalSourceFile() + " Exception: " + e, indexProblemCache); } } @@ -781,8 +772,8 @@ public synchronized void buildElasticSearchIndex(final String sha, startTime = System.nanoTime(); es.bulkIndexWithIds(sha, ContentIndextype.CONTENT.toString(), contentToIndex); endTime = System.nanoTime(); - log.info("Bulk indexing content took: {}ms", (endTime - startTime) / NANOSECONDS_IN_A_MILLISECOND); - log.info("Search index request sent for: " + sanitiseInternalLogValue(sha)); + log.debug(CONTENT_LOG_PREFIX + "Indexing completed in {}ms", + (endTime - startTime) / NANOSECONDS_IN_A_MILLISECOND); } catch (SegueSearchException e) { log.error("Error whilst trying to perform bulk index operation.", e); } catch (ActionRequestValidationException e) { @@ -851,8 +842,8 @@ private void recordContentErrors(final String sha, final Map gi } } if (!missingContent.isEmpty()) { - log.debug("Referential integrity broken for ({}) related Content items. " - + "The following ids are referenced but do not exist: {}", missingContent.size(), expectedIds); + log.warn(CONTENT_LOG_PREFIX + "Referential integrity broken for ({}) related Content items. " + + "The following ids are referenced but do not exist: {}", missingContent.size(), missingContent); } // Find all references from published content to unpublished content. @@ -925,7 +916,7 @@ private String collateExpandableChildren(final Content content) { ret.append(null != child.getType() ? child.getType() : "undefined").append(","); } } - if (ret.length() > 0) { + if (!ret.isEmpty()) { ret.deleteCharAt(ret.length() - 1); } return ret.toString(); @@ -974,25 +965,25 @@ private void recordContentTypeSpecificError(final String sha, final Content cont private void registerContentProblemsClozeQuestionChoicesHaveWrongNumberOFItems( final Content content, final Map> indexProblemCache) { - if (content instanceof IsaacClozeQuestion) { - IsaacClozeQuestion q = (IsaacClozeQuestion) content; + if (content instanceof IsaacClozeQuestion q) { Integer numberItems = null; - for (Choice choice : q.getChoices()) { - if (choice instanceof ItemChoice) { - ItemChoice c = (ItemChoice) choice; - if (null == c.getItems() || c.getItems().isEmpty()) { - this.registerContentProblem(content, "Cloze Question: " + q.getId() + " has choice with missing items!", - indexProblemCache); - continue; + if (q.getChoices() != null) { + for (Choice choice : q.getChoices()) { + if (choice instanceof ItemChoice c) { + if (null == c.getItems() || c.getItems().isEmpty()) { + this.registerContentProblem(content, "Cloze Question: " + q.getId() + " has choice with missing items!", + indexProblemCache); + continue; + } + int items = c.getItems().size(); + if (numberItems != null && items != numberItems) { + this.registerContentProblem(content, + "Cloze Question: " + q.getId() + " has choice with incorrect number of items!" + + " (Expected " + numberItems + ", got " + items + "!)", indexProblemCache); + continue; + } + numberItems = items; } - int items = c.getItems().size(); - if (numberItems != null && items != numberItems) { - this.registerContentProblem(content, - "Cloze Question: " + q.getId() + " has choice with incorrect number of items!" - + " (Expected " + numberItems + ", got " + items + "!)", indexProblemCache); - continue; - } - numberItems = items; } } } @@ -1001,19 +992,22 @@ private void registerContentProblemsClozeQuestionChoicesHaveWrongNumberOFItems( private void registerContentProblemsSymbolicQuestionInvalidProperties( final Content content, final Map> indexProblemCache) { IsaacSymbolicQuestion question = (IsaacSymbolicQuestion) content; - for (String sym : question.getAvailableSymbols()) { - registerContentProblemQuestionSymbolContainsBackslash(content, indexProblemCache, question, sym); + if (question.getAvailableSymbols() != null) { + for (String sym : question.getAvailableSymbols()) { + registerContentProblemQuestionSymbolContainsBackslash(content, indexProblemCache, question, sym); + } } - for (Choice choice : question.getChoices()) { - if (choice instanceof Formula) { - Formula f = (Formula) choice; - if (f.getPythonExpression().contains("\\")) { - registerContentProblemQuestionFormulaContainsBackslash(content, indexProblemCache, question, choice); - } else if (f.getPythonExpression() == null || f.getPythonExpression().isEmpty()) { - registerContentProblemQuestionFormulaIsEmpty(content, indexProblemCache, question, choice); + if (question.getChoices() != null) { + for (Choice choice : question.getChoices()) { + if (choice instanceof Formula f) { + if (f.getPythonExpression().contains("\\")) { + registerContentProblemQuestionFormulaContainsBackslash(content, indexProblemCache, question, choice); + } else if (f.getPythonExpression() == null || f.getPythonExpression().isEmpty()) { + registerContentProblemQuestionFormulaIsEmpty(content, indexProblemCache, question, choice); + } + } else { + registerContentProblemSymbolicQuestionChoiceIsNotFormula(content, indexProblemCache, question, choice); } - } else { - registerContentProblemSymbolicQuestionChoiceIsNotFormula(content, indexProblemCache, question, choice); } } } @@ -1050,19 +1044,19 @@ private void registerContentProblemQuestionSymbolContainsBackslash( private void registerContentProblemsNumericQuestionInvalidChoicesOrUnits( final Content content, final Map> indexProblemCache) { - if (content instanceof IsaacNumericQuestion) { - IsaacNumericQuestion question = (IsaacNumericQuestion) content; - for (Choice choice : question.getChoices()) { - if (choice instanceof Quantity) { - Quantity quantity = (Quantity) choice; + if (content instanceof IsaacNumericQuestion question) { + if (question.getChoices() != null) { + for (Choice choice : question.getChoices()) { + if (choice instanceof Quantity quantity) { - registerContentProblemCannotParseQuantityChoiceAsNumber(content, indexProblemCache, question, quantity); + registerContentProblemCannotParseQuantityChoiceAsNumber(content, indexProblemCache, question, quantity); - registerContentProblemUnnecessaryQuantityChoiceUnits(content, indexProblemCache, question, quantity); + registerContentProblemUnnecessaryQuantityChoiceUnits(content, indexProblemCache, question, quantity); - } else { - registerContentProblemNumericQuestionChoiceIsNotQuantity(content, indexProblemCache, question, choice); + } else { + registerContentProblemNumericQuestionChoiceIsNotQuantity(content, indexProblemCache, question, choice); + } } } registerContentProblemConflictingUnitSettings(content, indexProblemCache, question); @@ -1146,11 +1140,12 @@ private void registerContentProblemChoiceQuestionMissingAnswer( for (Choice choice : question.getChoices()) { if (choice.isCorrect()) { correctOptionFound = true; + break; } } if (!correctOptionFound) { this.registerContentProblem(question, - "Question: " + question.getId() + " found without a correct answer. " + QUESTION + question.getId() + " found without a correct answer. " + "This question will always be automatically marked as incorrect", indexProblemCache); } } @@ -1158,22 +1153,21 @@ private void registerContentProblemChoiceQuestionMissingAnswer( private void registerContentProblemChoiceQuestionMissingChoices( final Map> indexProblemCache, final ChoiceQuestion question) { this.registerContentProblem(question, - "Question: " + question.getId() + " found without any choice metadata. " + QUESTION + question.getId() + " found without any choice metadata. " + "This question will always be automatically " + "marked as incorrect", indexProblemCache); } private void registerContentProblemQuestionMissingId( final Content content, final Map> indexProblemCache) { if (content instanceof Question && content.getId() == null) { - this.registerContentProblem(content, "Question: " + content.getTitle() + " in " + content.getCanonicalSourceFile() + this.registerContentProblem(content, QUESTION + content.getTitle() + " in " + content.getCanonicalSourceFile() + " found without a unique id. " + "This question cannot be logged correctly.", indexProblemCache); } } private void registerContentProblemsMediaInvalidProperties( final String sha, final Content content, final Map> indexProblemCache) { - if (content instanceof Media) { - Media media = (Media) content; + if (content instanceof Media media) { registerContentProblemMediaNotFoundOrTooLarge(sha, content, indexProblemCache, media); @@ -1184,14 +1178,12 @@ private void registerContentProblemsMediaInvalidProperties( private void registerContentProblemMediaMissingAltText( final Content content, final Map> indexProblemCache, final Media media) { - if (media.getAltText() == null || media.getAltText().isEmpty()) { - if (!(media instanceof Video) && !media.getId().equals("eventThumbnail")) { - // Videos probably don't need alt text unless there is a good reason. It's not important that event - // thumbnails have alt text, so we don't record errors for those either. - this.registerContentProblem(content, "No altText attribute set for media element: " + media.getSrc() - + " in Git source file " + content.getCanonicalSourceFile(), indexProblemCache); - } + if ((media.getAltText() == null || media.getAltText().isEmpty()) && !(media instanceof Video) && + !media.getId().equals("eventThumbnail")) { + this.registerContentProblem(content, "No altText attribute set for media element: " + media.getSrc() + + " in Git source file " + content.getCanonicalSourceFile(), indexProblemCache); } + } private void registerContentProblemMediaNotFoundOrTooLarge( diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ETLManager.java b/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ETLManager.java index 382e913d07..e358b00492 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ETLManager.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/etl/ETLManager.java @@ -20,6 +20,7 @@ class ETLManager { private static final String LATEST_INDEX_ALIAS = "latest"; private static final String TASK_PERIOD_SECONDS = "TASK_PERIOD_SECONDS"; private static final long TASK_PERIOD_SECONDS_FALLBACK = 300; + private static final String CONTENT_LOG_PREFIX = "CONTENT - "; private final ContentIndexer indexer; private final SchoolIndexer schoolIndexer; @@ -55,10 +56,10 @@ class ETLManager { } void setNamedVersion(final String alias, final String version) throws Exception { - log.debug("Requested aliased version: {} - {}", alias, version); + log.info(CONTENT_LOG_PREFIX + "Requested aliased version: {} - {}", alias, version); indexer.loadAndIndexContent(version); indexer.setNamedVersion(alias, version); - log.debug("Version {} with alias '{}' is successfully indexed.", version, alias); + log.info(CONTENT_LOG_PREFIX + "Version {} with alias '{}' is successfully indexed.", version, alias); } // Indexes all content in idempotent fashion. If the content is already indexed no action is taken. @@ -95,13 +96,13 @@ private class ContentIndexerTask implements Runnable { @Override public void run() { - log.debug("Starting content indexer thread."); + log.info(CONTENT_LOG_PREFIX + "Starting content indexer thread."); try { indexContent(); } catch (Exception e) { - log.error("ContentIndexerTask failed.", e); + log.error(CONTENT_LOG_PREFIX + "ContentIndexerTask failed.", e); } - log.debug("Content indexer thread complete, waiting for next scheduled run."); + log.info(CONTENT_LOG_PREFIX + "Content indexer thread complete, waiting for next scheduled run."); } } } diff --git a/src/main/java/uk/ac/cam/cl/dtg/segue/scheduler/jobs/SyncMailjetUsersJob.java b/src/main/java/uk/ac/cam/cl/dtg/segue/scheduler/jobs/SyncMailjetUsersJob.java index 12fd2d101c..235ef19183 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/segue/scheduler/jobs/SyncMailjetUsersJob.java +++ b/src/main/java/uk/ac/cam/cl/dtg/segue/scheduler/jobs/SyncMailjetUsersJob.java @@ -35,6 +35,7 @@ public class SyncMailjetUsersJob implements Job { private static final Logger log = LoggerFactory.getLogger(SyncMailjetUsersJob.class); + private static final String MAILJET = "MAILJET - "; private final IExternalAccountManager externalAccountManager; private final EmailManager emailManager; @@ -55,7 +56,7 @@ public SyncMailjetUsersJob() { public void execute(final JobExecutionContext context) throws JobExecutionException { try { externalAccountManager.synchroniseChangedUsers(); - log.info("Success: synchronised users"); + log.info("{}Success: synchronised users", MAILJET); } catch (ExternalAccountSynchronisationException e) { final String subject = "Failed to execute SyncMailjetUsersJob"; StringWriter stringWriter = new StringWriter(); @@ -66,7 +67,7 @@ public void execute(final JobExecutionContext context) throws JobExecutionExcept new EmailCommunicationMessage(properties.getProperty(Constants.SERVER_ADMIN_ADDRESS), subject, exception, exception, EmailType.ADMIN); emailManager.addSystemEmailToQueue(email); - log.error("Failed to synchronise users"); + log.error("{}Failed to synchronise users", MAILJET); } } diff --git a/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java b/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java index 50daa7cf06..f619bb710c 100644 --- a/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java +++ b/src/main/java/uk/ac/cam/cl/dtg/util/email/MailJetApiClientWrapper.java @@ -26,6 +26,7 @@ import com.mailjet.client.errors.MailjetException; import com.mailjet.client.resource.Contact; import com.mailjet.client.resource.ContactManagecontactslists; +import com.mailjet.client.resource.ContactManagemanycontacts; import com.mailjet.client.resource.Contactdata; import com.mailjet.client.resource.Contacts; import com.mailjet.client.resource.ContactslistImportList; @@ -34,12 +35,17 @@ import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import uk.ac.cam.cl.dtg.isaac.dos.users.UserExternalAccountChanges; public class MailJetApiClientWrapper { private static final Logger log = LoggerFactory.getLogger(MailJetApiClientWrapper.class); + private static final String MAILJET = "MAILJET - "; private static final String PROPERTY_VALUE_KEY = "value"; + private static final int BULK_BATCH_SIZE = 100; + private static final String ACTION = "Action"; + private static final String LIST_ID = "ListID"; private final MailjetClient mailjetClient; private final String newsListId; @@ -81,7 +87,7 @@ public MailJetApiClientWrapper(final String mailjetApiKey, final String mailjetA */ public JSONObject getAccountByIdOrEmail(final String mailjetIdOrEmail) throws MailjetException { if (mailjetIdOrEmail == null || mailjetIdOrEmail.trim().isEmpty()) { - log.debug("Attempted to get account with null/empty identifier"); + log.debug("{}Attempted to get account with null/empty identifier", MAILJET); return null; } @@ -94,7 +100,7 @@ public JSONObject getAccountByIdOrEmail(final String mailjetIdOrEmail) throws Ma } if (response.getStatus() != 200) { - log.warn("Unexpected Mailjet response status {} when fetching account", response.getStatus()); + log.warn("{}Unexpected Mailjet response status {} when fetching account", MAILJET, response.getStatus()); throw new MailjetException("Unexpected response status: " + response.getStatus()); } @@ -135,9 +141,9 @@ public void permanentlyDeleteAccountById(final String mailjetId) throws MailjetE MailjetResponse response = mailjetClient.delete(request); if (response.getStatus() == 204 || response.getStatus() == 200) { - log.info("Successfully deleted Mailjet account: {}", mailjetId); + log.info("{}Successfully deleted Mailjet account: {}", MAILJET, mailjetId); } else if (response.getStatus() == 404) { - log.debug("Attempted to delete non-existent Mailjet account: {}", mailjetId); + log.debug("{}Attempted to delete non-existent Mailjet account: {}", MAILJET, mailjetId); } else { throw new MailjetException("Failed to delete account. Status: " + response.getStatus()); } @@ -168,7 +174,7 @@ public void permanentlyDeleteAccountById(final String mailjetId) throws MailjetE */ public String addNewUserOrGetUserIfExists(final String email) throws MailjetException { if (email == null || email.trim().isEmpty()) { - log.warn("Attempted to create Mailjet account with null/empty email"); + log.warn("{}Attempted to create Mailjet account with null/empty email", MAILJET); return null; } @@ -201,7 +207,7 @@ private String createNewMailjetAccount(String normalizedEmail) throws MailjetExc if (response.getStatus() == 201 || response.getStatus() == 200) { JSONObject responseData = response.getData().getJSONObject(0); String mailjetId = String.valueOf(responseData.get("ID")); - log.info("Successfully created Mailjet account: {}", mailjetId); + log.info("{}Successfully created Mailjet account: {}", MAILJET, mailjetId); return mailjetId; } @@ -220,13 +226,13 @@ private String createNewMailjetAccount(String normalizedEmail) throws MailjetExc private String handleUserAlreadyExists(MailjetClientRequestException e, String normalizedEmail) throws MailjetException { if (e.getMessage() != null && e.getMessage().toLowerCase().contains("already exists")) { - log.debug("User already exists in Mailjet, fetching existing account"); + log.debug("{}User already exists in Mailjet, fetching existing account", MAILJET); try { JSONObject existingAccount = getAccountByIdOrEmail(normalizedEmail); if (existingAccount != null) { String mailjetId = String.valueOf(existingAccount.get("ID")); - log.info("Retrieved existing Mailjet account: {}", mailjetId); + log.info("{}Retrieved existing Mailjet account: {}", MAILJET, mailjetId); return mailjetId; } else { String errorMsg = String.format("User reported as existing but couldn't fetch account: %s", normalizedEmail); @@ -284,7 +290,7 @@ public void updateUserProperties(final String mailjetId, final String firstName, MailjetResponse response = mailjetClient.put(request); if (response.getStatus() == 200 && response.getTotal() == 1) { - log.debug("Successfully updated properties for Mailjet account: {}", mailjetId); + log.debug("{}Successfully updated properties for Mailjet account: {}", MAILJET, mailjetId); } else { throw new MailjetException( String.format("Failed to update user properties. Status: %d, Total: %d", response.getStatus(), @@ -343,7 +349,7 @@ ContactManagecontactslists.CONTACTSLISTS, new JSONArray().put( MailjetResponse response = mailjetClient.post(request); if (response.getStatus() == 201 && response.getTotal() == 1) { - log.debug("Successfully updated subscriptions for Mailjet account: {}", mailjetId); + log.debug("{}Successfully updated subscriptions for Mailjet account: {}", MAILJET, mailjetId); } else { throw new MailjetException( String.format("Failed to update user subscriptions. Status: %d, Total: %d", response.getStatus(), @@ -393,4 +399,182 @@ private boolean isCommunicationException(MailjetException e) { String msg = e.getMessage().toLowerCase(); return msg.contains("timeout") || msg.contains("connection"); } + + /** + * Bulk create or update contacts with properties and list subscriptions. + *
+ * Uses the asynchronous ContactManagemanycontacts endpoint to handle up to BULK_BATCH_SIZE + * contacts in a single API call. All contacts in the batch share the same list subscription + * actions. The caller must partition users by subscription state before calling this method. + * + * @param users - list of users to sync (caller must ensure size <= BULK_BATCH_SIZE) + * @param newsAction - subscription action for NEWS_AND_UPDATES list + * @param eventsAction - subscription action for EVENTS list + * @return Mailjet job ID for async tracking (or null if request fails) + * @throws MailjetException - if underlying MailjetClient throws an exception + */ + public String bulkSyncUsers(final java.util.List users, + final MailJetSubscriptionAction newsAction, + final MailJetSubscriptionAction eventsAction) throws MailjetException { + if (users == null || users.isEmpty()) { + log.warn("{}Attempted to bulk sync empty user list", MAILJET); + return null; + } + + if (users.size() > BULK_BATCH_SIZE) { + throw new IllegalArgumentException("{}Bulk sync batch size ({}) exceeds limit ({})" + .replace("{}", MAILJET).replace("{}", String.valueOf(users.size())) + .replace("{}", String.valueOf(BULK_BATCH_SIZE))); + } + + try { + JSONArray contactsArray = buildContactsArray(users); + JSONArray listsArray = buildListsArray(newsAction, eventsAction); + + MailjetRequest request = new MailjetRequest(ContactManagemanycontacts.resource) + .property(ContactManagemanycontacts.CONTACTS, contactsArray) + .property(ContactManagemanycontacts.CONTACTSLISTS, listsArray); + + return submitBulkSyncRequest(request, users); + + } catch (JSONException e) { + String errorMsg = "{}JSON parsing error during bulk sync of {} users".replace("{}", MAILJET) + .replace("{}", String.valueOf(users.size())); + throw new MailjetException(errorMsg, e); + + } catch (MailjetException e) { + if (isCommunicationException(e)) { + String errorMsg = "{}Communication error during bulk sync of {} users" + .replace("{}", MAILJET).replace("{}", String.valueOf(users.size())); + throw new MailjetClientCommunicationException(errorMsg, e); + } + throw e; + } + } + + /** + * Build contacts array for bulk sync request. + * Extracted to reduce cognitive complexity. + */ + private JSONArray buildContactsArray(final java.util.List users) { + JSONArray contactsArray = new JSONArray(); + for (UserExternalAccountChanges user : users) { + JSONObject contactObj = new JSONObject() + .put("Email", user.getAccountEmail()) + .put("Name", user.getGivenName() != null ? user.getGivenName() : "") + .put("Properties", new JSONObject() + .put("firstname", user.getGivenName() != null ? user.getGivenName() : "") + .put("role", user.getRole().toString()) + .put("verification_status", user.getEmailVerificationStatus().toString()) + .put("stage", user.getStage() != null ? user.getStage() : "unknown")); + contactsArray.put(contactObj); + } + return contactsArray; + } + + /** + * Build lists array for bulk sync request. + * Extracted to reduce cognitive complexity. + */ + private JSONArray buildListsArray(final MailJetSubscriptionAction newsAction, + final MailJetSubscriptionAction eventsAction) { + return new JSONArray() + .put(new JSONObject() + .put(LIST_ID, legalListId) + .put(ACTION, MailJetSubscriptionAction.FORCE_SUBSCRIBE.getValue())) + .put(new JSONObject() + .put(LIST_ID, newsListId) + .put(ACTION, newsAction.getValue())) + .put(new JSONObject() + .put(LIST_ID, eventsListId) + .put(ACTION, eventsAction.getValue())); + } + + /** + * Submit the bulk sync request and handle the response. + * Extracted to reduce cognitive complexity. + */ + private String submitBulkSyncRequest(final MailjetRequest request, + final java.util.List users) + throws MailjetException { + MailjetResponse response = mailjetClient.post(request); + int status = response.getStatus(); + + if (status == 200 || status == 201) { + JSONObject responseData = response.getData().getJSONObject(0); + String jobId = responseData.optString("JobID", null); + log.info("{}Bulk sync submitted for {} users (job ID: {})", MAILJET, users.size(), jobId); + return jobId; + } + + throw new MailjetException( + "{}Failed to submit bulk sync. Status: {}".replace("{}", MAILJET) + .replace("{}", String.valueOf(status))); + } + + /** + * Get the status of an async bulk sync job. + * + * @param jobId - the job ID returned from bulkSyncUsers + * @return a JobStatus record with job status details + * @throws MailjetException - if the API call fails + */ + public JobStatus getBulkJobStatus(final String jobId) throws MailjetException { + if (jobId == null || jobId.trim().isEmpty()) { + throw new IllegalArgumentException("Job ID cannot be null or empty"); + } + + try { + MailjetRequest request = new MailjetRequest(ContactManagemanycontacts.resource, Long.parseLong(jobId)); + MailjetResponse response = mailjetClient.get(request); + + if (response.getStatus() != 200) { + throw new MailjetException("Failed to fetch job status. Status: " + response.getStatus()); + } + + JSONObject jobData = response.getData().getJSONObject(0); + String status = jobData.optString("Status", "Unknown"); + int processed = jobData.optInt("Processed", 0); + int inserted = jobData.optInt("Inserted", 0); + int updated = jobData.optInt("Updated", 0); + int unchanged = jobData.optInt("Unchanged", 0); + int errors = jobData.optInt("Error", 0); + + return new JobStatus(status, processed, inserted, updated, unchanged, errors); + + } catch (NumberFormatException e) { + throw new MailjetException("Invalid job ID format: " + jobId, e); + + } catch (JSONException e) { + throw new MailjetException("JSON parsing error when fetching job status for job ID: " + jobId, e); + + } catch (MailjetException e) { + if (isCommunicationException(e)) { + String errorMsg = "Communication error fetching job status for job ID: " + jobId; + throw new MailjetClientCommunicationException(errorMsg, e); + } + throw e; + } + } + + /** + * Record representing the status of a bulk sync job. + */ + public record JobStatus( + String status, + int processed, + int inserted, + int updated, + int unchanged, + int errors + ) { + public boolean isComplete() { + return "Completed".equalsIgnoreCase(status); + } + + public boolean hasFailed() { + return "Error".equalsIgnoreCase(status); + } + } + } diff --git a/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java b/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java index d23ca4a64c..7e3eafa296 100644 --- a/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java +++ b/src/test/java/uk/ac/cam/cl/dtg/segue/api/managers/ExternalAccountManagerTest.java @@ -1,6 +1,8 @@ package uk.ac.cam.cl.dtg.segue.api.managers; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; @@ -11,21 +13,18 @@ import com.mailjet.client.errors.MailjetClientCommunicationException; import com.mailjet.client.errors.MailjetException; import com.mailjet.client.errors.MailjetRateLimitException; -import java.util.ArrayList; import java.util.List; import org.json.JSONObject; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.NullAndEmptySource; -import org.junit.jupiter.params.provider.ValueSource; import uk.ac.cam.cl.dtg.isaac.dos.users.EmailVerificationStatus; import uk.ac.cam.cl.dtg.isaac.dos.users.Role; import uk.ac.cam.cl.dtg.isaac.dos.users.UserExternalAccountChanges; import uk.ac.cam.cl.dtg.segue.dao.SegueDatabaseException; import uk.ac.cam.cl.dtg.segue.dao.users.IExternalAccountDataManager; import uk.ac.cam.cl.dtg.util.email.MailJetApiClientWrapper; +import uk.ac.cam.cl.dtg.util.email.MailJetApiClientWrapper.JobStatus; import uk.ac.cam.cl.dtg.util.email.MailJetSubscriptionAction; class ExternalAccountManagerTest { @@ -45,25 +44,27 @@ public void setUp() { class SynchroniseChangedUsersTests { @Test - void synchroniseChangedUsers_WithNewUser_ShouldCreateAccount() + void synchroniseChangedUsers_WithBulkUsers_ShouldSubmitAndPoll() throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, null, "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" + // Arrange - users with same subscription preferences should be batched + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test1@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ), + new UserExternalAccountChanges( + 2L, null, "test2@example.com", Role.STUDENT, "Jane", false, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ) ); - List changedUsers = List.of(userChanges); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.addNewUserOrGetUserIfExists("test@example.com")).andReturn("mailjetId123"); - mockMailjetApi.updateUserProperties("mailjetId123", "John", "STUDENT", "VERIFIED", "GCSE"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("mailjetId123", - MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.UNSUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, "mailjetId123"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); + expect(mockMailjetApi.bulkSyncUsers(anyObject(), eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE), + eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE))).andReturn("job123"); + // Job completes successfully with no errors + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andReturn(new JobStatus("Completed", 2, 2, 0, 0, 0)); + mockDatabase.batchMarkAsSynced(anyObject(List.class)); expectLastCall(); replay(mockDatabase, mockMailjetApi); @@ -76,28 +77,34 @@ void synchroniseChangedUsers_WithNewUser_ShouldCreateAccount() } @Test - void synchroniseChangedUsers_WithExistingUser_ShouldUpdateAccount() + void synchroniseChangedUsers_WithMixedSubscriptionPreferences_ShouldGroupByPreferences() throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" + // Arrange - users with different subscription preferences should be in different groups + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test1@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ), + new UserExternalAccountChanges( + 2L, null, "test2@example.com", Role.STUDENT, "Jane", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) ); - List changedUsers = List.of(userChanges); - - JSONObject mailjetDetails = new JSONObject(); - mailjetDetails.put("Email", "test@example.com"); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")).andReturn(mailjetDetails); - mockMailjetApi.updateUserProperties("existingMailjetId", "John", "STUDENT", "VERIFIED", "GCSE"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("existingMailjetId", - MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.UNSUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, "existingMailjetId"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); + // First bulk call for group (FORCE_SUBSCRIBE, FORCE_SUBSCRIBE) + expect(mockMailjetApi.bulkSyncUsers(anyObject(), eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE), + eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE))).andReturn("job123"); + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andReturn(new JobStatus("Completed", 1, 1, 0, 0, 0)); + + // Second bulk call for group (FORCE_SUBSCRIBE, UNSUBSCRIBE) + expect(mockMailjetApi.bulkSyncUsers(anyObject(), eq(MailJetSubscriptionAction.FORCE_SUBSCRIBE), + eq(MailJetSubscriptionAction.UNSUBSCRIBE))).andReturn("job124"); + expect(mockMailjetApi.getBulkJobStatus("job124")) + .andReturn(new JobStatus("Completed", 1, 0, 1, 0, 0)); + + mockDatabase.batchMarkAsSynced(anyObject(List.class)); expectLastCall(); replay(mockDatabase, mockMailjetApi); @@ -110,26 +117,25 @@ void synchroniseChangedUsers_WithExistingUser_ShouldUpdateAccount() } @Test - void synchroniseChangedUsers_WithDeletedUser_ShouldDeleteAccount() + void synchroniseChangedUsers_WithDeletedUser_ShouldDeleteIndividually() throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "test@example.com", Role.STUDENT, "John", true, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, "mailjetId123", "test@example.com", Role.STUDENT, "John", true, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ) ); - List changedUsers = List.of(userChanges); - - JSONObject mailjetDetails = new JSONObject(); - mailjetDetails.put("Email", "test@example.com"); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")).andReturn(mailjetDetails); - mockMailjetApi.permanentlyDeleteAccountById("existingMailjetId"); + mockMailjetApi.permanentlyDeleteAccountById("mailjetId123"); expectLastCall(); mockDatabase.updateExternalAccount(1L, null); expectLastCall(); mockDatabase.updateProviderLastUpdated(1L); expectLastCall(); + mockDatabase.batchMarkAsSynced(anyObject(List.class)); + expectLastCall(); replay(mockDatabase, mockMailjetApi); @@ -141,24 +147,23 @@ void synchroniseChangedUsers_WithDeletedUser_ShouldDeleteAccount() } @Test - void synchroniseChangedUsers_WithDeliveryFailed_ShouldUnsubscribeFromAll() + void synchroniseChangedUsers_WithDeliveryFailedUser_ShouldGroupAsRemove() throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.DELIVERY_FAILED, true, false, "GCSE" + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.DELIVERY_FAILED, true, true, "GCSE" + ) ); - List changedUsers = List.of(userChanges); - - JSONObject mailjetDetails = new JSONObject(); - mailjetDetails.put("Email", "test@example.com"); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")).andReturn(mailjetDetails); - mockMailjetApi.updateUserSubscriptions("existingMailjetId", - MailJetSubscriptionAction.REMOVE, MailJetSubscriptionAction.REMOVE); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); + // Delivery failed users should call with REMOVE for both news and events + expect(mockMailjetApi.bulkSyncUsers(anyObject(), eq(MailJetSubscriptionAction.REMOVE), + eq(MailJetSubscriptionAction.REMOVE))).andReturn("job123"); + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andReturn(new JobStatus("Completed", 1, 0, 0, 1, 0)); + mockDatabase.batchMarkAsSynced(anyObject(List.class)); expectLastCall(); replay(mockDatabase, mockMailjetApi); @@ -171,32 +176,10 @@ void synchroniseChangedUsers_WithDeliveryFailed_ShouldUnsubscribeFromAll() } @Test - void synchroniseChangedUsers_WithEmailChange_ShouldRecreateAccount() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { + void synchroniseChangedUsers_WithEmptyUserList_ShouldReturnWithoutError() + throws SegueDatabaseException, ExternalAccountSynchronisationException { // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "newemail@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - List changedUsers = List.of(userChanges); - - JSONObject oldMailjetDetails = new JSONObject(); - oldMailjetDetails.put("Email", "oldemail@example.com"); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")).andReturn(oldMailjetDetails); - mockMailjetApi.permanentlyDeleteAccountById("existingMailjetId"); - expectLastCall(); - expect(mockMailjetApi.addNewUserOrGetUserIfExists("newemail@example.com")).andReturn("newMailjetId"); - mockMailjetApi.updateUserProperties("newMailjetId", "John", "STUDENT", "VERIFIED", "GCSE"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("newMailjetId", - MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.UNSUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, "newMailjetId"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); - expectLastCall(); + expect(mockDatabase.getRecentlyChangedRecords()).andReturn(List.of()); replay(mockDatabase, mockMailjetApi); @@ -208,67 +191,79 @@ void synchroniseChangedUsers_WithEmailChange_ShouldRecreateAccount() } @Test - void synchroniseChangedUsers_WithMailjetIdButAccountNotFound_ShouldTreatAsNew() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { + void synchroniseChangedUsers_WithDatabaseException_ShouldThrow() throws SegueDatabaseException { // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "nonExistentMailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - List changedUsers = List.of(userChanges); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("nonExistentMailjetId")).andReturn(null); - mockDatabase.updateExternalAccount(1L, null); - expectLastCall(); - expect(mockMailjetApi.addNewUserOrGetUserIfExists("test@example.com")).andReturn("newMailjetId"); - mockMailjetApi.updateUserProperties("newMailjetId", "John", "STUDENT", "VERIFIED", "GCSE"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("newMailjetId", - MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.UNSUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, "newMailjetId"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); - expectLastCall(); + expect(mockDatabase.getRecentlyChangedRecords()) + .andThrow(new SegueDatabaseException("Database error")); - replay(mockDatabase, mockMailjetApi); + replay(mockDatabase); - // Act - externalAccountManager.synchroniseChangedUsers(); + // Act & Assert + assertThrows(ExternalAccountSynchronisationException.class, + () -> externalAccountManager.synchroniseChangedUsers()); - // Assert - verify(mockDatabase, mockMailjetApi); + verify(mockDatabase); } - @ParameterizedTest - @NullAndEmptySource - @ValueSource(strings = {" "}) - void synchroniseChangedUsers_WithNullOrEmptyEmail_ShouldSkip(String email) - throws SegueDatabaseException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, null, email, Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" + @Test + void synchroniseChangedUsers_WithJobErrorsButUserDataCorrect_ShouldMarkAsSynced() + throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { + // Arrange - job completes but has errors, but user data is correct at Mailjet + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) ); - List changedUsers = List.of(userChanges); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) + .andReturn("job123"); + // Job has 1 error + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andReturn(new JobStatus("Completed", 1, 0, 0, 0, 1)); + // Recovery: user data is correct in Mailjet despite job error + JSONObject mailjetContact = new JSONObject() + .put("Name", "John") + .put("Properties", new JSONObject() + .put("role", "STUDENT") + .put("verification_status", "VERIFIED") + .put("stage", "GCSE")); + expect(mockMailjetApi.getAccountByIdOrEmail("test@example.com")) + .andReturn(mailjetContact); + mockDatabase.batchMarkAsSynced(anyObject(List.class)); + expectLastCall(); replay(mockDatabase, mockMailjetApi); // Act externalAccountManager.synchroniseChangedUsers(); - // Assert - No mailjet calls should be made + // Assert verify(mockDatabase, mockMailjetApi); } @Test - void synchroniseChangedUsers_WithEmptyList_ShouldReturnEarly() - throws SegueDatabaseException, ExternalAccountSynchronisationException { - // Arrange - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(new ArrayList<>()); + void synchroniseChangedUsers_WithJobErrorsAndUserNotFound_ShouldNotSyncUser() + throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { + // Arrange - job completes with errors and user not found in Mailjet + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) + ); + + expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) + .andReturn("job123"); + // Job has 1 error + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andReturn(new JobStatus("Completed", 1, 0, 0, 0, 1)); + // Recovery: user not found in Mailjet after error - treated as failed + expect(mockMailjetApi.getAccountByIdOrEmail("test@example.com")) + .andReturn(null); + // batchMarkAsSynced not called since user failed recovery (empty list) replay(mockDatabase, mockMailjetApi); @@ -279,33 +274,19 @@ void synchroniseChangedUsers_WithEmptyList_ShouldReturnEarly() verify(mockDatabase, mockMailjetApi); } - @Test - void synchroniseChangedUsers_WithDatabaseException_ShouldThrow() throws SegueDatabaseException { - // Arrange - expect(mockDatabase.getRecentlyChangedRecords()) - .andThrow(new SegueDatabaseException("Database error")); - - replay(mockDatabase); - - // Act & Assert - assertThrows(ExternalAccountSynchronisationException.class, - () -> externalAccountManager.synchroniseChangedUsers()); - - verify(mockDatabase); - } - @Test void synchroniseChangedUsers_WithMailjetException_ShouldLogAndContinue() throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) ); - List changedUsers = List.of(userChanges); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")) + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) .andThrow(new MailjetException("Mailjet error")); replay(mockDatabase, mockMailjetApi); @@ -321,14 +302,15 @@ void synchroniseChangedUsers_WithMailjetException_ShouldLogAndContinue() void synchroniseChangedUsers_WithCommunicationException_ShouldThrow() throws SegueDatabaseException, MailjetException { // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) ); - List changedUsers = List.of(userChanges); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")) + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) .andThrow(new MailjetClientCommunicationException("Communication error")); replay(mockDatabase, mockMailjetApi); @@ -344,14 +326,15 @@ void synchroniseChangedUsers_WithCommunicationException_ShouldThrow() void synchroniseChangedUsers_WithRateLimitException_ShouldThrow() throws SegueDatabaseException, MailjetException { // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "existingMailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) ); - List changedUsers = List.of(userChanges); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("existingMailjetId")) + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) .andThrow(new MailjetRateLimitException("Rate limit exceeded")); replay(mockDatabase, mockMailjetApi); @@ -367,98 +350,25 @@ void synchroniseChangedUsers_WithRateLimitException_ShouldThrow() } @Test - void synchroniseChangedUsers_WithDatabaseErrorDuringUpdate_ShouldLogAndContinue() + void synchroniseChangedUsers_WithSingleRateLimitDuringPolling_ShouldContinuePolling() throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges user1 = new UserExternalAccountChanges( - 1L, "mailjetId1", "test1@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - UserExternalAccountChanges user2 = new UserExternalAccountChanges( - 2L, "mailjetId2", "test2@example.com", Role.TEACHER, "Jane", false, - EmailVerificationStatus.VERIFIED, false, true, "A Level" + // Arrange - single rate limit should be tolerated + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) ); - List changedUsers = List.of(user1, user2); - - JSONObject mailjet1 = new JSONObject(); - mailjet1.put("Email", "test1@example.com"); - JSONObject mailjet2 = new JSONObject(); - mailjet2.put("Email", "test2@example.com"); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - - // First user - database error during update - expect(mockMailjetApi.getAccountByIdOrEmail("mailjetId1")).andReturn(mailjet1); - mockMailjetApi.updateUserProperties("mailjetId1", "John", "STUDENT", "VERIFIED", "GCSE"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("mailjetId1", - MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.UNSUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, "mailjetId1"); - expectLastCall().andThrow(new SegueDatabaseException("DB error")); - - // Second user - should still process - expect(mockMailjetApi.getAccountByIdOrEmail("mailjetId2")).andReturn(mailjet2); - mockMailjetApi.updateUserProperties("mailjetId2", "Jane", "TEACHER", "VERIFIED", "A Level"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("mailjetId2", - MailJetSubscriptionAction.UNSUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(2L, "mailjetId2"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(2L); - expectLastCall(); - - replay(mockDatabase, mockMailjetApi); - - // Act - Should not throw, continues processing - externalAccountManager.synchroniseChangedUsers(); - - // Assert - verify(mockDatabase, mockMailjetApi); - } - - @Test - void synchroniseChangedUsers_WithMultipleUsers_ShouldProcessAll() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges user1 = new UserExternalAccountChanges( - 1L, null, "new@example.com", Role.STUDENT, "Alice", false, - EmailVerificationStatus.VERIFIED, true, true, "GCSE" - ); - UserExternalAccountChanges user2 = new UserExternalAccountChanges( - 2L, "existingId", "existing@example.com", Role.TEACHER, "Bob", false, - EmailVerificationStatus.VERIFIED, false, false, "A Level" - ); - List changedUsers = List.of(user1, user2); - - JSONObject mailjet2 = new JSONObject(); - mailjet2.put("Email", "existing@example.com"); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - - // User 1 - new user - expect(mockMailjetApi.addNewUserOrGetUserIfExists("new@example.com")).andReturn("newId"); - mockMailjetApi.updateUserProperties("newId", "Alice", "STUDENT", "VERIFIED", "GCSE"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("newId", - MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(1L, "newId"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(1L); - expectLastCall(); - - // User 2 - existing user - expect(mockMailjetApi.getAccountByIdOrEmail("existingId")).andReturn(mailjet2); - mockMailjetApi.updateUserProperties("existingId", "Bob", "TEACHER", "VERIFIED", "A Level"); - expectLastCall(); - mockMailjetApi.updateUserSubscriptions("existingId", - MailJetSubscriptionAction.UNSUBSCRIBE, MailJetSubscriptionAction.UNSUBSCRIBE); - expectLastCall(); - mockDatabase.updateExternalAccount(2L, "existingId"); - expectLastCall(); - mockDatabase.updateProviderLastUpdated(2L); + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) + .andReturn("job123"); + // First poll hits rate limit, second succeeds + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andThrow(new MailjetRateLimitException("Rate limit")); + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andReturn(new JobStatus("Completed", 1, 0, 1, 0, 0)); + mockDatabase.batchMarkAsSynced(anyObject(List.class)); expectLastCall(); replay(mockDatabase, mockMailjetApi); @@ -471,48 +381,34 @@ void synchroniseChangedUsers_WithMultipleUsers_ShouldProcessAll() } @Test - void synchroniseChangedUsers_WithUnexpectedError_ShouldLogAndContinue() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, "mailjetId", "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" + void synchroniseChangedUsers_WithRepeatedRateLimitDuringPolling_ShouldFailFast() + throws SegueDatabaseException, MailjetException { + // Arrange - 2+ consecutive rate limits should fail fast + List changedUsers = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + ) ); - List changedUsers = List.of(userChanges); expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.getAccountByIdOrEmail("mailjetId")) - .andThrow(new RuntimeException("Unexpected error")); + expect(mockMailjetApi.bulkSyncUsers(anyObject(), anyObject(), anyObject())) + .andReturn("job123"); + // Both polls hit rate limit + expect(mockMailjetApi.getBulkJobStatus("job123")) + .andThrow(new MailjetRateLimitException("Rate limit")) + .times(2); replay(mockDatabase, mockMailjetApi); - // Act - Should not throw, logs error and continues - externalAccountManager.synchroniseChangedUsers(); - - // Assert - verify(mockDatabase, mockMailjetApi); - } - - @Test - void synchroniseChangedUsers_WithNewUserAndNullMailjetId() - throws SegueDatabaseException, MailjetException, ExternalAccountSynchronisationException { - // Arrange - UserExternalAccountChanges userChanges = new UserExternalAccountChanges( - 1L, null, "test@example.com", Role.STUDENT, "John", false, - EmailVerificationStatus.VERIFIED, true, false, "GCSE" - ); - List changedUsers = List.of(userChanges); - - expect(mockDatabase.getRecentlyChangedRecords()).andReturn(changedUsers); - expect(mockMailjetApi.addNewUserOrGetUserIfExists("test@example.com")).andReturn(null); - - replay(mockDatabase, mockMailjetApi); + // Act & Assert - should throw on 2nd consecutive rate limit + ExternalAccountSynchronisationException exception = assertThrows( + ExternalAccountSynchronisationException.class, + () -> externalAccountManager.synchroniseChangedUsers()); - // Act & Assert - externalAccountManager.synchroniseChangedUsers(); + assertTrue(exception.getMessage().contains("rate limit")); verify(mockDatabase, mockMailjetApi); } - } } diff --git a/src/test/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManagerTest.java b/src/test/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManagerTest.java index 16cd0726be..74d8cf215e 100644 --- a/src/test/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManagerTest.java +++ b/src/test/java/uk/ac/cam/cl/dtg/segue/dao/users/PgExternalAccountPersistenceManagerTest.java @@ -15,6 +15,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -126,6 +127,115 @@ void getRecentlyChangedRecords_WithDatabaseError_ShouldThrowException() throws E } } + @Nested + class BatchMarkAsSyncedTests { + + @Test + void batchMarkAsSynced_WithValidUserIds_ShouldUpdateDatabase() throws Exception { + // Arrange + List userIds = List.of(1L, 2L, 3L); + + expect(mockDatabase.getDatabaseConnection()).andReturn(mockConnection); + expect(mockConnection.prepareStatement(anyString())).andReturn(mockPreparedStatement); + + mockPreparedStatement.setLong(1, 1L); + expectLastCall(); + mockPreparedStatement.setLong(2, 2L); + expectLastCall(); + mockPreparedStatement.setLong(3, 3L); + expectLastCall(); + + expect(mockPreparedStatement.executeUpdate()).andReturn(3); + + mockPreparedStatement.close(); + expectLastCall(); + mockConnection.close(); + expectLastCall(); + + replay(mockDatabase, mockConnection, mockPreparedStatement); + + // Act + persistenceManager.batchMarkAsSynced(userIds); + + // Assert + verify(mockDatabase, mockConnection, mockPreparedStatement); + } + + @Test + void batchMarkAsSynced_WithEmptyList_ShouldDoNothing() throws Exception { + // Arrange - no expectations set, method should return early + + replay(mockDatabase, mockConnection, mockPreparedStatement); + + // Act + persistenceManager.batchMarkAsSynced(new ArrayList<>()); + + // Assert + verify(mockDatabase, mockConnection, mockPreparedStatement); + } + + @Test + void batchMarkAsSynced_WithLargeUserList_ShouldBatchInsert() throws Exception { + // Arrange + List userIds = new ArrayList<>(); + for (long i = 1; i <= 100; i++) { + userIds.add(i); + } + + expect(mockDatabase.getDatabaseConnection()).andReturn(mockConnection); + expect(mockConnection.prepareStatement(anyString())).andReturn(mockPreparedStatement); + + for (long i = 1; i <= 100; i++) { + mockPreparedStatement.setLong((int) i, i); + expectLastCall(); + } + + expect(mockPreparedStatement.executeUpdate()).andReturn(100); + + mockPreparedStatement.close(); + expectLastCall(); + mockConnection.close(); + expectLastCall(); + + replay(mockDatabase, mockConnection, mockPreparedStatement); + + // Act + persistenceManager.batchMarkAsSynced(userIds); + + // Assert + verify(mockDatabase, mockConnection, mockPreparedStatement); + } + + @Test + void batchMarkAsSynced_WithDatabaseError_ShouldThrowException() throws Exception { + // Arrange + List userIds = List.of(1L, 2L); + + expect(mockDatabase.getDatabaseConnection()).andReturn(mockConnection); + expect(mockConnection.prepareStatement(anyString())).andReturn(mockPreparedStatement); + + mockPreparedStatement.setLong(1, 1L); + expectLastCall(); + mockPreparedStatement.setLong(2, 2L); + expectLastCall(); + + expect(mockPreparedStatement.executeUpdate()).andThrow(new SQLException("Database error")); + + mockPreparedStatement.close(); + expectLastCall(); + mockConnection.close(); + expectLastCall(); + + replay(mockDatabase, mockConnection, mockPreparedStatement); + + // Act & Assert + assertThrows(SegueDatabaseException.class, + () -> persistenceManager.batchMarkAsSynced(userIds)); + + verify(mockDatabase, mockConnection, mockPreparedStatement); + } + } + // Helper method to setup mock ResultSet with all expected calls private void setupMockResultSetForUser(Long userId, String mailjetId, String email, String role, String givenName, boolean deleted, String verificationStatus, diff --git a/src/test/java/uk/ac/cam/cl/dtg/segue/util/email/MailJetApiClientWrapperTest.java b/src/test/java/uk/ac/cam/cl/dtg/segue/util/email/MailJetApiClientWrapperTest.java index 54870aa6af..9b10a6e142 100644 --- a/src/test/java/uk/ac/cam/cl/dtg/segue/util/email/MailJetApiClientWrapperTest.java +++ b/src/test/java/uk/ac/cam/cl/dtg/segue/util/email/MailJetApiClientWrapperTest.java @@ -6,6 +6,8 @@ import com.mailjet.client.errors.MailjetClientRequestException; import com.mailjet.client.errors.MailjetException; import com.mailjet.client.errors.MailjetClientCommunicationException; +import java.util.ArrayList; +import java.util.List; import org.json.JSONArray; import org.json.JSONObject; import org.junit.jupiter.api.BeforeEach; @@ -14,6 +16,9 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullAndEmptySource; import org.junit.jupiter.params.provider.ValueSource; +import uk.ac.cam.cl.dtg.isaac.dos.users.EmailVerificationStatus; +import uk.ac.cam.cl.dtg.isaac.dos.users.Role; +import uk.ac.cam.cl.dtg.isaac.dos.users.UserExternalAccountChanges; import uk.ac.cam.cl.dtg.util.email.MailJetApiClientWrapper; import uk.ac.cam.cl.dtg.util.email.MailJetSubscriptionAction; @@ -671,6 +676,123 @@ void updateUserSubscriptions_WithCommunicationError_ShouldThrowCommunicationExce } } + @Nested + class BulkSyncUsersTests { + + @Test + void bulkSyncUsers_WithValidUsers_ShouldReturnJobId() throws MailjetException { + // Arrange + List users = List.of( + new UserExternalAccountChanges( + 1L, null, "test1@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ), + new UserExternalAccountChanges( + 2L, null, "test2@example.com", Role.TEACHER, "Jane", false, + EmailVerificationStatus.VERIFIED, false, true, "A_LEVEL" + ) + ); + + MailjetResponse mockResponse = createMock(MailjetResponse.class); + JSONArray mockData = new JSONArray(); + JSONObject mockResult = new JSONObject(); + mockResult.put("JobID", "job123"); + mockData.put(mockResult); + + expect(mockMailjetClient.post(anyObject(MailjetRequest.class))).andReturn(mockResponse); + expect(mockResponse.getStatus()).andReturn(200); + expect(mockResponse.getData()).andReturn(mockData); + + replay(mockMailjetClient, mockResponse); + + // Act + String jobId = mailJetApiClientWrapper.bulkSyncUsers(users, + MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE); + + // Assert + verify(mockMailjetClient, mockResponse); + assertEquals("job123", jobId); + } + + @Test + void bulkSyncUsers_WithEmptyList_ShouldReturnNull() throws MailjetException { + // Act + String result = mailJetApiClientWrapper.bulkSyncUsers(new ArrayList<>(), + MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE); + + // Assert + assertNull(result); + } + + @Test + void bulkSyncUsers_WithOversizedBatch_ShouldThrowException() { + // Arrange + List users = new ArrayList<>(); + for (int i = 0; i < 1001; i++) { + users.add(new UserExternalAccountChanges( + (long) i, null, "test" + i + "@example.com", Role.STUDENT, "User" + i, false, + EmailVerificationStatus.VERIFIED, true, false, "GCSE" + )); + } + + // Act & Assert + assertThrows(IllegalArgumentException.class, () -> + mailJetApiClientWrapper.bulkSyncUsers(users, + MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE) + ); + } + + @Test + void bulkSyncUsers_WithApiError_ShouldThrowException() throws MailjetException { + // Arrange + List users = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ) + ); + + MailjetResponse mockResponse = createMock(MailjetResponse.class); + + expect(mockMailjetClient.post(anyObject(MailjetRequest.class))).andReturn(mockResponse); + expect(mockResponse.getStatus()).andReturn(500); + + replay(mockMailjetClient, mockResponse); + + // Act & Assert + assertThrows(MailjetException.class, () -> + mailJetApiClientWrapper.bulkSyncUsers(users, + MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE) + ); + } + + @Test + void bulkSyncUsers_WithCommunicationError_ShouldThrowCommunicationException() + throws MailjetException { + // Arrange + List users = List.of( + new UserExternalAccountChanges( + 1L, null, "test@example.com", Role.STUDENT, "John", false, + EmailVerificationStatus.VERIFIED, true, true, "GCSE" + ) + ); + + MailjetClientCommunicationException commException = + new MailjetClientCommunicationException("Connection timeout"); + + expect(mockMailjetClient.post(anyObject(MailjetRequest.class))) + .andThrow(commException); + + replay(mockMailjetClient); + + // Act & Assert + assertThrows(MailjetClientCommunicationException.class, () -> + mailJetApiClientWrapper.bulkSyncUsers(users, + MailJetSubscriptionAction.FORCE_SUBSCRIBE, MailJetSubscriptionAction.FORCE_SUBSCRIBE) + ); + } + } + private void injectMockMailjetClient(MailJetApiClientWrapper wrapper, MailjetClient client) { try { var field = MailJetApiClientWrapper.class.getDeclaredField("mailjetClient");