From 78f5ce0d04e846136612446f78a830fea9b394d6 Mon Sep 17 00:00:00 2001 From: gitjiho Date: Tue, 5 Aug 2025 01:16:35 +0900 Subject: [PATCH 1/9] =?UTF-8?q?Remove:=20=EB=A1=9C=EA=B9=85=EC=9A=A9=20?= =?UTF-8?q?=EC=9E=90=EB=A3=8C=ED=98=95=EC=9C=BC=EB=A1=9C=20=EC=95=88?= =?UTF-8?q?=EC=A0=84=ED=95=9CAtomicInteger=20=EC=82=AC=EC=9A=A9=EC=9C=BC?= =?UTF-8?q?=EB=A1=9C=20=EC=9D=B8=ED=95=9C=20=EA=B8=B0=EC=A1=B4=20=EB=A1=9C?= =?UTF-8?q?=EA=B9=85=20dto=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../batch/dto/BatchProcessingResult.java | 36 ------------------- 1 file changed, 36 deletions(-) delete mode 100644 src/main/java/com/newzet/api/article/repository/batch/dto/BatchProcessingResult.java diff --git a/src/main/java/com/newzet/api/article/repository/batch/dto/BatchProcessingResult.java b/src/main/java/com/newzet/api/article/repository/batch/dto/BatchProcessingResult.java deleted file mode 100644 index 332e499a..00000000 --- a/src/main/java/com/newzet/api/article/repository/batch/dto/BatchProcessingResult.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.newzet.api.article.repository.batch.dto; - -import lombok.Data; - -@Data -public class BatchProcessingResult { - private int successCount; - private int failCount; - private int duplicateCount; - private int cacheHitCount; - private int batchDuplicateCount; - - public void incrementFailCount() { - this.failCount++; - } - - public void incrementFailCount(int count) { - this.failCount += count; - } - - public void incrementDuplicateCount() { - this.duplicateCount++; - } - - public void addToDuplicateCount(int count) { - this.duplicateCount += count; - } - - public void incrementCacheHitCount() { - this.cacheHitCount++; - } - - public void incrementBatchDuplicateCount(int count) { - this.batchDuplicateCount += count; - } -} From db4e9517541cb2a194bbf488da586e539615873a Mon Sep 17 00:00:00 2001 From: gitjiho Date: Tue, 5 Aug 2025 01:17:02 +0900 Subject: [PATCH 2/9] =?UTF-8?q?Fix:=20ArticleEntity=20=EA=B0=9D=EC=B2=B4?= =?UTF-8?q?=EB=A5=BC=20=EB=A7=8C=EB=93=A4=20=EB=95=8C=20ImageUrl=EC=9D=84?= =?UTF-8?q?=20=ED=8F=AC=ED=95=A8=ED=95=98=EB=8F=84=EB=A1=9D=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../newzet/api/article/repository/entity/ArticleEntity.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/newzet/api/article/repository/entity/ArticleEntity.java b/src/main/java/com/newzet/api/article/repository/entity/ArticleEntity.java index 86d511fa..63ff9891 100644 --- a/src/main/java/com/newzet/api/article/repository/entity/ArticleEntity.java +++ b/src/main/java/com/newzet/api/article/repository/entity/ArticleEntity.java @@ -74,6 +74,7 @@ public static ArticleEntity fromEntityDto(ArticleEntityDto dto) { .fromDomain(dto.getFromDomain()) .mailingList(dto.getMailingList()) .title(dto.getTitle()) + .imageUrl(dto.getImageUrl()) .contentUrl(dto.getContentUrl()) .isRead(dto.isRead()) .isLike(dto.isLike()) @@ -111,4 +112,4 @@ public void readArticle() { public void updateLike(boolean newLike) { this.isLike = newLike; } -} \ No newline at end of file +} From 6f58a058e10de3e463d2a89fd7369d1602bf5717 Mon Sep 17 00:00:00 2001 From: gitjiho Date: Tue, 5 Aug 2025 01:17:50 +0900 Subject: [PATCH 3/9] =?UTF-8?q?Chore:=20Redis=20=EC=97=B0=EA=B2=B0=20?= =?UTF-8?q?=EC=84=A4=EC=A0=95=20=EC=95=88=EC=A0=95=EC=A0=81=EC=9D=B8=20?= =?UTF-8?q?=EB=8F=99=EC=8B=9C=EC=84=B1=20=EC=B2=98=EB=A6=AC=EB=A5=BC=20?= =?UTF-8?q?=EC=9C=84=ED=95=9C=20=EC=B5=9C=EC=A0=81=ED=99=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../newzet/api/config/cache/RedisConfig.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/newzet/api/config/cache/RedisConfig.java b/src/main/java/com/newzet/api/config/cache/RedisConfig.java index 33010e62..ca1d9e41 100644 --- a/src/main/java/com/newzet/api/config/cache/RedisConfig.java +++ b/src/main/java/com/newzet/api/config/cache/RedisConfig.java @@ -16,6 +16,8 @@ import io.lettuce.core.ClientOptions; import io.lettuce.core.SocketOptions; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.core.resource.DefaultClientResources; @Configuration public class RedisConfig { @@ -35,18 +37,31 @@ public LettuceConnectionFactory redisConnectionFactory() { serverConfig.setPassword(RedisPassword.of(password)); LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder() - .commandTimeout(Duration.ofMillis(500)) + .commandTimeout(Duration.ofMillis(1000)) .clientOptions(ClientOptions.builder() .autoReconnect(true) .socketOptions( - SocketOptions.builder().connectTimeout(Duration.ofMillis(1000)).build()) + SocketOptions.builder() + .connectTimeout(Duration.ofMillis(1000)) + .keepAlive(true) + .tcpNoDelay(true) + .build()) .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) .build()) + .clientResources(clientResources()) .build(); return new LettuceConnectionFactory(serverConfig, clientConfig); } + @Bean(destroyMethod = "shutdown") + public ClientResources clientResources() { + return DefaultClientResources.builder() + .ioThreadPoolSize(8) + .computationThreadPoolSize(8) + .build(); + } + @Bean public RedisTemplate redisTemplate() { RedisTemplate redisTemplate = new RedisTemplate<>(); From c746456fa1071358fd5d2b786f4f9f24629cd3ea Mon Sep 17 00:00:00 2001 From: gitjiho Date: Tue, 5 Aug 2025 01:20:08 +0900 Subject: [PATCH 4/9] =?UTF-8?q?Refactor:=20Article=20=EB=B9=84=EB=8F=99?= =?UTF-8?q?=EA=B8=B0=20=EB=B0=B0=EC=B9=98=20Consumer=EC=97=90=EC=84=9C=20R?= =?UTF-8?q?edis=EB=A5=BC=20=ED=99=9C=EC=9A=A9=ED=95=9C=20=EC=A4=91?= =?UTF-8?q?=EB=B3=B5=EC=B2=98=EB=A6=AC=20=EC=A1=B0=ED=9A=8C=20multiGet?= =?UTF-8?q?=EC=9D=84=20=ED=86=B5=ED=95=9C=20=EB=B0=B0=EC=B9=98=EC=B2=98?= =?UTF-8?q?=EB=A6=AC=20=EB=B0=8F=20=EC=BA=90=EC=8B=9C=20=EC=97=85=EB=8D=B0?= =?UTF-8?q?=EC=9D=B4=ED=8A=B8=20=EB=B9=84=EB=8F=99=EA=B8=B0=EC=B2=98?= =?UTF-8?q?=EB=A6=AC=20=EC=A0=84=ED=99=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../batch/ArticleRedisBatchConsumerImpl.java | 250 ++++++++++-------- 1 file changed, 145 insertions(+), 105 deletions(-) diff --git a/src/main/java/com/newzet/api/article/repository/batch/ArticleRedisBatchConsumerImpl.java b/src/main/java/com/newzet/api/article/repository/batch/ArticleRedisBatchConsumerImpl.java index 7a2976a8..08d859f8 100644 --- a/src/main/java/com/newzet/api/article/repository/batch/ArticleRedisBatchConsumerImpl.java +++ b/src/main/java/com/newzet/api/article/repository/batch/ArticleRedisBatchConsumerImpl.java @@ -5,7 +5,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.core.RedisTemplate; @@ -15,7 +15,6 @@ import com.newzet.api.article.business.dto.ArticleEntityDto; import com.newzet.api.article.business.repository.ArticleRepository; import com.newzet.api.article.domain.Article; -import com.newzet.api.article.repository.batch.dto.BatchProcessingResult; import com.newzet.api.article.repository.batch.dto.BatchSaveData; import com.newzet.api.common.batch.RedisBatchConsumer; import com.newzet.api.common.batch.config.BatchConfig; @@ -34,6 +33,7 @@ public class ArticleRedisBatchConsumerImpl extends RedisBatchConsumer
private static final String CONSUMER_NAME = "article-processor"; private static final String ARTICLE_DUPLICATE_CACHE_PREFIX = "article:dup:"; private static final long DUPLICATE_CACHE_TTL_MINUTES = 10; + private static final int REDIS_BATCH_SIZE = 50; private final ArticleRepository articleRepository; private final FcmSenderOrchestrator fcmSenderOrchestrator; @@ -77,154 +77,194 @@ protected Class
getItemClass() { @Override protected void processBatchItems(List
articles) { long startTime = System.currentTimeMillis(); - BatchProcessingResult result = new BatchProcessingResult(); - Map> keyToArticlesMap = prepareArticlesWithCacheKeys( - articles, result); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger duplicateCount = new AtomicInteger(0); + AtomicInteger failCount = new AtomicInteger(0); + AtomicInteger cacheHitCount = new AtomicInteger(0); - if (keyToArticlesMap.isEmpty()) { - log.info("No articles to process"); - return; - } + try { + Map uniqueArticlesMap = removeBatchDuplicates(articles, + duplicateCount, failCount); + + if (uniqueArticlesMap.isEmpty()) { + log.info("No articles to process after batch deduplication"); + return; + } + + BatchSaveData saveData = identifyUniqueArticlesWithBatch(uniqueArticlesMap, + duplicateCount, cacheHitCount, failCount); + + List savedArticles = saveToDatabaseOptimized(saveData.toSave(), + successCount, failCount); - BatchSaveData saveData = identifyUniqueArticles(keyToArticlesMap, result); + updateRedisCacheAsync(saveData.toCache()); - saveToDatabaseAndUpdateCounters(saveData.toSave(), result); + sendFCMAsync(savedArticles); - updateRedisCache(saveData.toCache()); + long duration = System.currentTimeMillis() - startTime; + logBatchSummary(articles.size(), successCount.get(), duplicateCount.get(), + cacheHitCount.get(), failCount.get(), duration); - long duration = System.currentTimeMillis() - startTime; - logBatchSummary(articles.size(), result, duration); + } catch (Exception e) { + log.error("Article batch processing failed: {}", e.getMessage(), e); + failCount.addAndGet(articles.size()); + } } - private Map> prepareArticlesWithCacheKeys( - List
articles, BatchProcessingResult result) { - Map> keyToArticlesMap = new HashMap<>(); + private Map removeBatchDuplicates(List
articles, + AtomicInteger duplicateCount, AtomicInteger failCount) { + + Map uniqueArticlesMap = new HashMap<>(); for (Article article : articles) { try { ArticleEntityDto entityDto = ArticleEntityDto.fromDomain(article); + String cacheKey = generateOptimizedCacheKey(entityDto); - String cacheKey = generateSimpleCacheKey( - entityDto.getFromName(), - entityDto.getFromDomain(), - entityDto.getTitle(), - entityDto.getToUserId() - ); + if (uniqueArticlesMap.containsKey(cacheKey)) { + duplicateCount.incrementAndGet(); + } else { + uniqueArticlesMap.put(cacheKey, entityDto); + } - keyToArticlesMap.computeIfAbsent(cacheKey, k -> new ArrayList<>()) - .add(entityDto); } catch (Exception e) { - result.incrementFailCount(); - log.error("Failed to process article: {}, error: {}", article.getTitle(), - e.getMessage(), e); + failCount.incrementAndGet(); + log.error("Failed to process article: {}", article.getTitle(), e); } } - return keyToArticlesMap; + return uniqueArticlesMap; } - private BatchSaveData identifyUniqueArticles( - Map> keyToArticlesMap, BatchProcessingResult result) { + private BatchSaveData identifyUniqueArticlesWithBatch( + Map uniqueArticlesMap, + AtomicInteger duplicateCount, AtomicInteger cacheHitCount, AtomicInteger failCount) { + List toSave = new ArrayList<>(); Map toCache = new HashMap<>(); - for (Map.Entry> entry : keyToArticlesMap.entrySet()) { - String cacheKey = entry.getKey(); - List entitiesWithSameKey = entry.getValue(); + List cacheKeys = new ArrayList<>(uniqueArticlesMap.keySet()); - if (entitiesWithSameKey.size() > 1) { - int batchDuplicates = entitiesWithSameKey.size() - 1; - result.incrementBatchDuplicateCount(batchDuplicates); - log.info("BATCH DUPLICATES: {} articles with same key '{}'", - entitiesWithSameKey.size(), cacheKey); - } - - ArticleEntityDto entityDto = entitiesWithSameKey.get(0); - - String cachedValue = redisTemplate.opsForValue().get(cacheKey); + for (int i = 0; i < cacheKeys.size(); i += REDIS_BATCH_SIZE) { + int endIndex = Math.min(i + REDIS_BATCH_SIZE, cacheKeys.size()); + List batchKeys = cacheKeys.subList(i, endIndex); - if (cachedValue != null) { - result.incrementDuplicateCount(); - result.incrementCacheHitCount(); - log.info( - "REDIS DUPLICATE: '{}' with key '{}', counters: duplicate={}, cacheHit={}", - entityDto.getTitle(), cacheKey, result.getDuplicateCount(), - result.getCacheHitCount()); - } else { - toSave.add(entityDto); - markForCaching(toCache, cacheKey); + try { + List cachedValues = redisTemplate.opsForValue().multiGet(batchKeys); + + for (int j = 0; j < batchKeys.size(); j++) { + String cacheKey = batchKeys.get(j); + String cachedValue = cachedValues.get(j); + ArticleEntityDto entityDto = uniqueArticlesMap.get(cacheKey); + + if (cachedValue != null) { + duplicateCount.incrementAndGet(); + cacheHitCount.incrementAndGet(); + } else { + toSave.add(entityDto); + toCache.put(cacheKey, "1"); + } + } + } catch (Exception e) { + log.error("Redis batch query failed: {}", e.getMessage()); + failCount.addAndGet(batchKeys.size()); + // Redis 실패 시 모든 아티클을 저장 대상으로 (안전장치) + for (String key : batchKeys) { + toSave.add(uniqueArticlesMap.get(key)); + toCache.put(key, "1"); + } } } - result.addToDuplicateCount(result.getBatchDuplicateCount()); - return new BatchSaveData(toSave, toCache); } - private void saveToDatabaseAndUpdateCounters(List toSave, - BatchProcessingResult result) { - if (!toSave.isEmpty()) { - try { - List saved = articleRepository.saveAll(toSave); - result.setSuccessCount(saved.size()); - sendFCM(saved); // fcm 메시지 전송 배치처리 - } catch (Exception e) { - result.incrementFailCount(toSave.size()); - log.error("SAVE FAILED for {} articles: {}", toSave.size(), e.getMessage(), e); - } - } else { - log.info("No new articles to save (all are duplicates or failed)"); + private List saveToDatabaseOptimized(List toSave, + AtomicInteger successCount, AtomicInteger failCount) { + + if (toSave.isEmpty()) { + return List.of(); } - } - private void sendFCM(List saved) { - if (!saved.isEmpty()) { - for (ArticleEntityDto articleData : saved) { - fcmSenderOrchestrator.sendFcmWhenMailReceivedBatch(articleData.getToUserId(), - articleData.getId(), articleData.getCreatedAt(), - articleData.getTitle(), articleData.getFromName()); - } + try { + List saved = articleRepository.saveAll(toSave); + successCount.addAndGet(saved.size()); + log.info("Successfully saved {} articles to database", saved.size()); + return saved; + } catch (Exception e) { + failCount.addAndGet(toSave.size()); + log.error("Database save failed for {} articles: {}", toSave.size(), e.getMessage()); + return List.of(); } } - private void updateRedisCache(Map toCache) { - if (!toCache.isEmpty()) { - for (Map.Entry entry : toCache.entrySet()) { - try { - String key = entry.getKey(); - redisTemplate.opsForValue().setIfAbsent( - key, entry.getValue(), Duration.ofMinutes(DUPLICATE_CACHE_TTL_MINUTES)); - } catch (Exception e) { - log.error("Cache update ERROR for key '{}': {}", entry.getKey(), - e.getMessage()); + private void updateRedisCacheAsync(Map toCache) { + if (toCache.isEmpty()) { + return; + } + + reactiveRedisTemplate.opsForValue() + .multiSet(toCache) + .doOnSuccess(result -> { + if (Boolean.TRUE.equals(result)) { + log.debug("Cache updated for {} keys", toCache.size()); } + }) + .doOnError(error -> log.error("Cache update failed: {}", error.getMessage())) + .subscribe(); + + for (String key : toCache.keySet()) { + reactiveRedisTemplate.expire(key, Duration.ofMinutes(DUPLICATE_CACHE_TTL_MINUTES)) + .doOnError( + error -> log.warn("TTL setting failed for key {}: {}", key, error.getMessage())) + .subscribe(); + } + } + + private void sendFCMAsync(List savedArticles) { + if (savedArticles.isEmpty()) { + return; + } + + for (ArticleEntityDto articleData : savedArticles) { + try { + fcmSenderOrchestrator.sendFcmWhenMailReceivedBatch( + articleData.getToUserId(), + articleData.getId(), + articleData.getCreatedAt(), + articleData.getTitle(), + articleData.getFromName() + ); + } catch (Exception e) { + log.error("FCM send failed for article {}: {}", articleData.getId(), + e.getMessage()); } } } - private void logBatchSummary(int totalArticles, BatchProcessingResult result, long duration) { - log.info( - "ARTICLE BATCH SUMMARY: total={}, success={}, duplicate={}, cacheHit={}, failed={}, elapsed={}ms", - totalArticles, result.getSuccessCount(), result.getDuplicateCount(), - result.getCacheHitCount(), result.getFailCount(), duration); + private String generateOptimizedCacheKey(ArticleEntityDto entityDto) { + String fromName = normalizeString(entityDto.getFromName()); + String fromDomain = normalizeString(entityDto.getFromDomain()); + String title = normalizeString(entityDto.getTitle()); + String userId = entityDto.getToUserId().toString().substring(0, 8); + + return ARTICLE_DUPLICATE_CACHE_PREFIX + + fromName + ":" + + fromDomain + ":" + + userId + ":" + + Math.abs(title.hashCode()); } - private void markForCaching(Map cacheMap, String cacheKey) { - cacheMap.put(cacheKey, "1"); + private String normalizeString(String input) { + return input != null ? input.trim().toLowerCase() : ""; } - private String generateSimpleCacheKey(String fromName, String fromDomain, String title, - UUID toUserId) { - fromName = fromName != null ? fromName.trim().toLowerCase() : ""; - fromDomain = fromDomain != null ? fromDomain.trim().toLowerCase() : ""; - String normalizedTitle = title != null ? title.trim().toLowerCase() : ""; - String userIdStr = toUserId != null ? toUserId.toString() : "null"; + private void logBatchSummary(int totalArticles, int successCount, int duplicateCount, + int cacheHitCount, int failCount, long duration) { - return ARTICLE_DUPLICATE_CACHE_PREFIX + - fromName + "_" + - fromDomain + "_" + - userIdStr.substring(0, Math.min(8, userIdStr.length())) + "_" + - Math.abs(normalizedTitle.hashCode()); + log.info( + "ARTICLE BATCH: total={}, success={}, duplicate={}, cacheHit={}, failed={}, elapsed={}ms", + totalArticles, successCount, duplicateCount, cacheHitCount, failCount, duration); } } From 51ed06cf0b4239bcb7fe3cb9f171fd96e942568e Mon Sep 17 00:00:00 2001 From: gitjiho Date: Tue, 5 Aug 2025 01:21:48 +0900 Subject: [PATCH 5/9] =?UTF-8?q?Refactor:=20Fcm=20=EB=B9=84=EB=8F=99?= =?UTF-8?q?=EA=B8=B0=20=EB=B0=B0=EC=B9=98=20Consumer=EC=97=90=EC=84=9C=20f?= =?UTF-8?q?cm=20=EC=93=B0=EB=A0=88=EB=93=9C=20=ED=92=80=EC=9D=84=20?= =?UTF-8?q?=ED=99=9C=EC=9A=A9=ED=95=9C=20=EB=B3=91=EB=A0=AC=20=EC=B2=98?= =?UTF-8?q?=EB=A6=AC=20=EB=B0=8F=20=EC=A0=84=EC=86=A1=20=EB=A1=9C=EC=A7=81?= =?UTF-8?q?=20=EC=B5=9C=EC=A0=81=ED=99=94=EB=A1=9C=20=EC=84=B1=EB=8A=A5=20?= =?UTF-8?q?=ED=96=A5=EC=83=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jpa/batch/FcmRedisBatchConsumerImpl.java | 108 +++++++++++++++--- 1 file changed, 90 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java b/src/main/java/com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java index 48fe22fe..70f40ef2 100644 --- a/src/main/java/com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java +++ b/src/main/java/com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java @@ -1,6 +1,11 @@ package com.newzet.api.fcm.jpa.batch; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.core.RedisTemplate; @@ -11,7 +16,6 @@ import com.newzet.api.common.objectMapper.OptionalObjectMapper; import com.newzet.api.fcm.business.batch.FcmBatchConsumer; import com.newzet.api.fcm.domain.FcmNotification; -import com.newzet.api.fcm.jpa.batch.dto.FcmBatchProcessingResult; import com.newzet.api.fcm.orchestrator.FcmSenderOrchestrator; import lombok.extern.slf4j.Slf4j; @@ -24,8 +28,11 @@ public class FcmRedisBatchConsumerImpl extends RedisBatchConsumer redisTemplate, ReactiveRedisTemplate reactiveRedisTemplate, @@ -34,6 +41,13 @@ public FcmRedisBatchConsumerImpl(RedisTemplate redisTemplate, FcmSenderOrchestrator fcmSenderOrchestrator) { super(redisTemplate, reactiveRedisTemplate, batchConfig, optionalObjectMapper); this.fcmSenderOrchestrator = fcmSenderOrchestrator; + + this.fcmExecutor = Executors.newFixedThreadPool(PARALLEL_THREADS, r -> { + Thread t = new Thread(r, "fcm-turbo-" + System.nanoTime()); + t.setDaemon(true); + t.setPriority(Thread.NORM_PRIORITY + 1); + return t; + }); } @Override @@ -64,38 +78,96 @@ protected Class getItemClass() { @Override protected void processBatchItems(List fcmNotifications) { long startTime = System.currentTimeMillis(); - FcmBatchProcessingResult result = new FcmBatchProcessingResult(); - for (FcmNotification fcmNotification : fcmNotifications) { - processSingleNotification(fcmNotification, result); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failCount = new AtomicInteger(0); + AtomicInteger invalidTokenCount = new AtomicInteger(0); + + try { + log.info("Processing FCM batch: {} notifications", fcmNotifications.size()); + + processInParallelTurbo(fcmNotifications, successCount, failCount, invalidTokenCount); + + long duration = System.currentTimeMillis() - startTime; + logBatchSummary(fcmNotifications.size(), successCount.get(), failCount.get(), + invalidTokenCount.get(), duration); + + } catch (Exception e) { + log.error("FCM batch processing failed: {}", e.getMessage(), e); + failCount.addAndGet(fcmNotifications.size()); } + } + + private void processInParallelTurbo(List fcmNotifications, + AtomicInteger successCount, AtomicInteger failCount, AtomicInteger invalidTokenCount) { + + try { + CompletableFuture[] futures = fcmNotifications.stream() + .map(notification -> CompletableFuture.runAsync(() -> { + processSingleNotificationFast(notification, successCount, failCount, + invalidTokenCount); + }, fcmExecutor) + .orTimeout(5, TimeUnit.SECONDS)) + .toArray(CompletableFuture[]::new); - long duration = System.currentTimeMillis() - startTime; - logBatchSummary(fcmNotifications.size(), result, duration); + CompletableFuture.allOf(futures) + .get(BATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + } catch (Exception e) { + log.error("FCM processing failed: {}", e.getMessage()); + int totalProcessed = successCount.get() + failCount.get() + invalidTokenCount.get(); + int remainingFails = fcmNotifications.size() - totalProcessed; + if (remainingFails > 0) { + failCount.addAndGet(remainingFails); + log.warn("FCM batch incomplete: {} remaining marked as failed", remainingFails); + } + } } - private void processSingleNotification(FcmNotification fcmNotification, - FcmBatchProcessingResult result) { + private void processSingleNotificationFast(FcmNotification fcmNotification, + AtomicInteger successCount, AtomicInteger failCount, AtomicInteger invalidTokenCount) { + if (!fcmNotification.isValid()) { - result.incrementInvalidTokenCount(); - log.warn("Invalid FCM notification: userId={}, token={}", - fcmNotification.getUserId(), fcmNotification.getToken()); + invalidTokenCount.incrementAndGet(); return; } try { fcmSenderOrchestrator.send(fcmNotification); - result.incrementSuccessCount(); + successCount.incrementAndGet(); + } catch (Exception e) { - result.incrementFailCount(); + failCount.incrementAndGet(); + log.trace("FCM send failed for userId {}: {}", + fcmNotification.getUserId(), e.getMessage()); } } - private void logBatchSummary(int totalNotifications, FcmBatchProcessingResult result, - long duration) { + private void logBatchSummary(int totalNotifications, int successCount, int failCount, + int invalidTokenCount, long duration) { + log.info( - "FCM BATCH SUMMARY: total={}, success={}, failed={}, invalidToken={}, elapsed={}ms", - totalNotifications, result.getSuccessCount(), result.getFailCount(), - result.getInvalidTokenCount(), duration); + "FCM BATCH: total={}, success={}, failed={}, invalid={}, elapsed={}ms", + totalNotifications, successCount, failCount, invalidTokenCount, duration); + } + + @Override + public void onShutdown() { + super.onShutdown(); + if (fcmExecutor != null && !fcmExecutor.isShutdown()) { + log.info("Shutting down FCM executor with {} threads", PARALLEL_THREADS); + fcmExecutor.shutdown(); + try { + if (!fcmExecutor.awaitTermination(3, TimeUnit.SECONDS)) { + fcmExecutor.shutdownNow(); + if (!fcmExecutor.awaitTermination(2, TimeUnit.SECONDS)) { + log.warn("FCM executor did not terminate gracefully"); + } + } + } catch (InterruptedException e) { + fcmExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } } } From 84a5dd4995cfe52928e2ac9c30837c7d57c58f0e Mon Sep 17 00:00:00 2001 From: gitjiho Date: Fri, 8 Aug 2025 00:06:40 +0900 Subject: [PATCH 6/9] =?UTF-8?q?Chore:=20invaildToken=EC=97=90=20=EB=8C=80?= =?UTF-8?q?=ED=95=9C=20=EB=A1=9C=EA=B9=85=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java b/src/main/java/com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java index 70f40ef2..a6fd3b06 100644 --- a/src/main/java/com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java +++ b/src/main/java/com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java @@ -128,6 +128,8 @@ private void processSingleNotificationFast(FcmNotification fcmNotification, AtomicInteger successCount, AtomicInteger failCount, AtomicInteger invalidTokenCount) { if (!fcmNotification.isValid()) { + log.warn("Invalid FCM notification: userId={}, token={}", + fcmNotification.getUserId(), fcmNotification.getToken()); invalidTokenCount.incrementAndGet(); return; } From 29961b61a15d4575c3d0925a506af8849fe05664 Mon Sep 17 00:00:00 2001 From: gitjiho Date: Thu, 28 Aug 2025 02:11:13 +0900 Subject: [PATCH 7/9] =?UTF-8?q?Feat:=20article=20=EB=A1=9C=EC=A7=81?= =?UTF-8?q?=EC=9D=98=20ack=20=EC=B2=98=EB=A6=AC=20=EC=A0=84=20=EB=B0=B0?= =?UTF-8?q?=EC=B9=98=EB=8B=A8=EC=9C=84=EC=9D=98=20=EC=9B=90=EC=9E=90?= =?UTF-8?q?=EC=A0=81=20=ED=8A=B8=EB=9E=9C=EC=9E=AD=EC=85=98=20=EB=B0=8F=20?= =?UTF-8?q?=EC=8B=A4=ED=8C=A8=EC=8B=9C=20ack=20=EC=B2=98=EB=A6=AC=20?= =?UTF-8?q?=EB=B0=A9=EC=A7=80=EB=A5=BC=20=EC=9C=84=ED=95=9C=20try-catch?= =?UTF-8?q?=EB=AC=B8=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../repository/ArticleRepositoryImpl.java | 6 +-- .../batch/ArticleRedisBatchConsumerImpl.java | 50 +++++++------------ 2 files changed, 20 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/newzet/api/article/repository/ArticleRepositoryImpl.java b/src/main/java/com/newzet/api/article/repository/ArticleRepositoryImpl.java index 6cdb97eb..28931301 100644 --- a/src/main/java/com/newzet/api/article/repository/ArticleRepositoryImpl.java +++ b/src/main/java/com/newzet/api/article/repository/ArticleRepositoryImpl.java @@ -52,11 +52,7 @@ public List saveAll(List articleEntityDtoLis int processedArticles = 0; for (ArticleEntity entityToSave : entitiesToSave) { - try { - entityManager.persist(entityToSave); - } catch (Exception e) { - log.error("Failed to persist ArticleEntity: {}", entityToSave, e); - } + entityManager.persist(entityToSave); result.add(entityToSave.toEntityDto()); processedArticles++; if (processedArticles % BATCH_SIZE == 0) { diff --git a/src/main/java/com/newzet/api/article/repository/batch/ArticleRedisBatchConsumerImpl.java b/src/main/java/com/newzet/api/article/repository/batch/ArticleRedisBatchConsumerImpl.java index 08d859f8..2f4ecd4e 100644 --- a/src/main/java/com/newzet/api/article/repository/batch/ArticleRedisBatchConsumerImpl.java +++ b/src/main/java/com/newzet/api/article/repository/batch/ArticleRedisBatchConsumerImpl.java @@ -83,33 +83,27 @@ protected void processBatchItems(List
articles) { AtomicInteger failCount = new AtomicInteger(0); AtomicInteger cacheHitCount = new AtomicInteger(0); - try { - Map uniqueArticlesMap = removeBatchDuplicates(articles, - duplicateCount, failCount); + Map uniqueArticlesMap = removeBatchDuplicates(articles, + duplicateCount, failCount); - if (uniqueArticlesMap.isEmpty()) { - log.info("No articles to process after batch deduplication"); - return; - } - - BatchSaveData saveData = identifyUniqueArticlesWithBatch(uniqueArticlesMap, - duplicateCount, cacheHitCount, failCount); + if (uniqueArticlesMap.isEmpty()) { + log.info("No articles to process after batch deduplication"); + return; + } - List savedArticles = saveToDatabaseOptimized(saveData.toSave(), - successCount, failCount); + BatchSaveData saveData = identifyUniqueArticlesWithBatch(uniqueArticlesMap, + duplicateCount, cacheHitCount, failCount); - updateRedisCacheAsync(saveData.toCache()); + List savedArticles = saveToDatabaseOptimized(saveData.toSave(), + successCount, failCount); - sendFCMAsync(savedArticles); + updateRedisCacheAsync(saveData.toCache()); - long duration = System.currentTimeMillis() - startTime; - logBatchSummary(articles.size(), successCount.get(), duplicateCount.get(), - cacheHitCount.get(), failCount.get(), duration); + sendFCMAsync(savedArticles); - } catch (Exception e) { - log.error("Article batch processing failed: {}", e.getMessage(), e); - failCount.addAndGet(articles.size()); - } + long duration = System.currentTimeMillis() - startTime; + logBatchSummary(articles.size(), successCount.get(), duplicateCount.get(), + cacheHitCount.get(), failCount.get(), duration); } private Map removeBatchDuplicates(List
articles, @@ -187,16 +181,10 @@ private List saveToDatabaseOptimized(List to return List.of(); } - try { - List saved = articleRepository.saveAll(toSave); - successCount.addAndGet(saved.size()); - log.info("Successfully saved {} articles to database", saved.size()); - return saved; - } catch (Exception e) { - failCount.addAndGet(toSave.size()); - log.error("Database save failed for {} articles: {}", toSave.size(), e.getMessage()); - return List.of(); - } + List saved = articleRepository.saveAll(toSave); + successCount.addAndGet(saved.size()); + log.info("Successfully saved {} articles to database", saved.size()); + return saved; } private void updateRedisCacheAsync(Map toCache) { From 5946633e06296bc76860ec9f1cf72c3402ecd00d Mon Sep 17 00:00:00 2001 From: gitjiho Date: Thu, 28 Aug 2025 02:12:05 +0900 Subject: [PATCH 8/9] =?UTF-8?q?Feat:=20=EB=B9=84=EB=8F=99=EA=B8=B0=20?= =?UTF-8?q?=EB=B0=B0=EC=B9=98=EC=B2=98=EB=A6=AC=20=EC=9E=AC=EC=8B=9C?= =?UTF-8?q?=EC=9E=91=20=EC=8B=9C=20consumer=20group=20=EB=82=B4=EB=B6=80?= =?UTF-8?q?=20Pending=20Entity=20List=EC=97=90=20=EB=82=A8=EC=95=84?= =?UTF-8?q?=EC=9E=88=EB=8A=94=20redis=20stream=20=EC=86=8C=EB=B9=84?= =?UTF-8?q?=EB=A5=BC=20=EC=9C=84=ED=95=9C=20=EB=A1=9C=EC=A7=81=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/common/batch/RedisBatchConsumer.java | 99 ++++++++++++++----- 1 file changed, 73 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/newzet/api/common/batch/RedisBatchConsumer.java b/src/main/java/com/newzet/api/common/batch/RedisBatchConsumer.java index a99f6bc7..117578c9 100644 --- a/src/main/java/com/newzet/api/common/batch/RedisBatchConsumer.java +++ b/src/main/java/com/newzet/api/common/batch/RedisBatchConsumer.java @@ -15,6 +15,7 @@ import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.connection.stream.StreamReadOptions; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.stream.StreamReceiver; @@ -80,7 +81,14 @@ public void startProcessing() { if (isProcessing.compareAndSet(false, true)) { executorService = Executors.newSingleThreadExecutor(); ackExecutorService = Executors.newSingleThreadExecutor(); - executorService.submit(this::processBatchesAsync); + executorService.submit(() -> { + log.info("{} consumer starting... Draining pending messages first.", + getProcessorTypeName()); + drainPendingMessages(); + log.info("{} pending messages drained. Starting to listen for new messages.", + getProcessorTypeName()); + processBatchesAsync(); + }); log.info("{} batch processor started with configuration: size={}, timeout={}s", getProcessorTypeName(), batchConfig.getBatchSize(), batchConfig.getTimeoutSeconds()); @@ -102,8 +110,44 @@ public void stopProcessing() { } } + private void drainPendingMessages() { + try { + Consumer consumer = Consumer.from(getConsumerGroup(), getConsumerName()); + StreamOffset offset = StreamOffset.create(getStreamKey(), + ReadOffset.from("0-0")); + StreamReadOptions readOptions = StreamReadOptions.empty() + .count(batchConfig.getBatchSize()); + + while (isProcessing.get()) { + List> rawRecords = redisTemplate.opsForStream() + .read(consumer, readOptions, offset); + + if (rawRecords == null || rawRecords.isEmpty()) { + break; + } + + List> records = new ArrayList<>(); + for (MapRecord rawRecord : rawRecords) { + Map stringMap = new HashMap<>(); + for (Map.Entry entry : rawRecord.getValue().entrySet()) { + stringMap.put(String.valueOf(entry.getKey()), + String.valueOf(entry.getValue())); + } + records.add(MapRecord.create(rawRecord.getStream(), stringMap) + .withId(rawRecord.getId())); + } + + log.info("Processing {} pending messages from drain task.", records.size()); + processBatchWithAck(records); + } + } catch (Exception e) { + log.error("Error draining pending messages for {}: {}", getProcessorTypeName(), + e.getMessage(), e); + } + } + protected void processBatchesAsync() { - log.info("{} batch processing thread initialized", getProcessorTypeName()); + log.info("{} batch processing thread initialized for new messages", getProcessorTypeName()); try { StreamReceiver.StreamReceiverOptions> options = @@ -163,33 +207,36 @@ protected void processBatchWithAck(List> recor } } - processBatchItems(items); - - CompletableFuture.runAsync(() -> { - List ackedMessageIds = new ArrayList<>(); - - for (MapRecord record : records) { - String messageId = record.getId().getValue(); - try { - redisTemplate.opsForStream() - .acknowledge(getStreamKey(), getConsumerGroup(), messageId); - ackedMessageIds.add(messageId); - } catch (Exception e) { - log.warn("Ack failed for {} message {}, skipping for now: {}", - getProcessorTypeName(), messageId, e.getMessage()); + try { + processBatchItems(items); + CompletableFuture.runAsync(() -> { + List ackedMessageIds = new ArrayList<>(); + for (MapRecord record : records) { + try { + redisTemplate.opsForStream() + .acknowledge(getStreamKey(), getConsumerGroup(), record.getId()); + ackedMessageIds.add(record.getId().getValue()); + } catch (Exception e) { + log.warn("Ack failed for {} message {}, skipping for now: {}", + getProcessorTypeName(), record.getId(), e.getMessage()); + } } - } - if (!ackedMessageIds.isEmpty()) { - try { - redisTemplate.opsForStream() - .delete(getStreamKey(), ackedMessageIds.toArray(new String[0])); - } catch (Exception e) { - log.error("Failed to delete acked {} messages: {}", getProcessorTypeName(), - e.getMessage(), e); + if (!ackedMessageIds.isEmpty()) { + try { + redisTemplate.opsForStream() + .delete(getStreamKey(), ackedMessageIds.toArray(new String[0])); + } catch (Exception e) { + log.error("Failed to delete acked {} messages: {}", getProcessorTypeName(), + e.getMessage(), e); + } } - } - }, ackExecutorService); + }, ackExecutorService); + } catch (Exception e) { + log.error( + "{} batch processing failed. Messages will not be acknowledged and will be retried.", + getProcessorTypeName(), e); + } } @PreDestroy From 8cec6f97c1bf7e4b5e1431b27c20045a622101ff Mon Sep 17 00:00:00 2001 From: gitjiho Date: Thu, 28 Aug 2025 02:21:40 +0900 Subject: [PATCH 9/9] =?UTF-8?q?Test:=20=EC=97=86=EC=96=B4=EC=A7=84=20?= =?UTF-8?q?=EB=A1=9C=EC=A7=81=EC=97=90=20=EB=A7=9E=EC=B6=B0=20=ED=85=8C?= =?UTF-8?q?=EC=8A=A4=ED=8A=B8=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../repository/ArticleRepositoryTest.java | 35 ------------------- 1 file changed, 35 deletions(-) diff --git a/src/test/java/com/newzet/api/article/repository/ArticleRepositoryTest.java b/src/test/java/com/newzet/api/article/repository/ArticleRepositoryTest.java index 6d199b51..1748cb98 100644 --- a/src/test/java/com/newzet/api/article/repository/ArticleRepositoryTest.java +++ b/src/test/java/com/newzet/api/article/repository/ArticleRepositoryTest.java @@ -146,21 +146,6 @@ void saveAll_WhenEmptyList_ThenReturnEmptyList() { verify(entityManager, never()).clear(); } - @Test - void saveAll_WhenExceptionOccurs_ThenShouldNotPropagateException() { - // Given - List dtos = createMockArticleDtos(3); - - doThrow(new RuntimeException("Test exception")).when(entityManager) - .persist(any(ArticleEntity.class)); - - // When - List result = articleRepository.saveAll(dtos); - - // Then - assertThat(result).hasSize(3); - } - @Test void saveAll_WhenProcessingNullFields_ThenHandleGracefully() { // Given @@ -183,26 +168,6 @@ void saveAll_WhenProcessingNullFields_ThenHandleGracefully() { assertThat(capturedEntity.getFromDomain()).isNull(); } - @Test - void saveAll_WhenPersistThrowsException_ShouldContinueSavingOthers() { - // Given - List dtos = createMockArticleDtos(3); - - doThrow(new RuntimeException("persist fail")) - .doNothing() - .doNothing() - .when(entityManager).persist(any(ArticleEntity.class)); - - // When - List result = articleRepository.saveAll(dtos); - - // Then - assertThat(result).hasSize(3); - verify(entityManager, times(3)).persist(any(ArticleEntity.class)); - verify(entityManager, atLeast(1)).flush(); - verify(entityManager, atLeast(1)).clear(); - } - private List createMockArticleDtos(int count) { return IntStream.range(0, count) .mapToObj(i -> Article.createNewArticle(