99 "os"
1010 "strings"
1111 "sync"
12+ "sync/atomic"
1213 "time"
1314 "unicode/utf8"
1415
@@ -31,6 +32,9 @@ type ScoringEngine struct {
3132 CurrentRoundStartTime time.Time
3233 RedisClient * redis.Client
3334
35+ // Concurrency control for materialized view refresh
36+ Refreshing atomic.Bool
37+
3438 // Config update handling
3539 configPath string
3640}
@@ -574,10 +578,16 @@ func (se *ScoringEngine) processCollectedResults(results []checks.Result) {
574578
575579 slog .Debug ("Successfully processed results for round" , "round" , se .CurrentRound , "total" , len (dbResults ))
576580
577- // Refresh materialized view asynchronously
578- go func () {
579- if err := db .RefreshScoresMaterializedView (); err != nil {
580- slog .Error ("failed to refresh materialized view" , "round" , se .CurrentRound , "error" , err )
581- }
582- }()
581+ // Refresh materialized view asynchronously, but avoid concurrent refreshes
582+ currentRound := se .CurrentRound
583+ if se .Refreshing .CompareAndSwap (false , true ) {
584+ go func (round uint ) {
585+ defer se .Refreshing .Store (false )
586+ if err := db .RefreshScoresMaterializedView (); err != nil {
587+ slog .Error ("failed to refresh materialized view" , "round" , round , "error" , err )
588+ }
589+ }(currentRound )
590+ } else {
591+ slog .Debug ("refresh already in progress, skipping refresh spawn" , "round" , currentRound )
592+ }
583593}
0 commit comments