From 2ff074427b686fe60e2283e0b4822675e668f2b9 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Mon, 8 Jun 2026 22:03:17 -0700 Subject: [PATCH] perf(indexer): cap aggregate score update cadence --- indexer/aggregates_calculator.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/indexer/aggregates_calculator.go b/indexer/aggregates_calculator.go index 5a54551e..c760b57b 100644 --- a/indexer/aggregates_calculator.go +++ b/indexer/aggregates_calculator.go @@ -20,6 +20,8 @@ type AggregatesCalculator struct { updateAggregatesJob *jobs.UpdateAggregatesJob } +const aggregateScoreUpdateInterval = 10 * time.Minute + func NewAggregatesCalculator(config config.Config) *AggregatesCalculator { logger := logging.NewZapLogger(config).Named("AggregatesCalculator") readPool, err := dbv1.NewDBPools([]string{config.ReadDbUrl}, logger, config.Env, config.ZapLevel) @@ -46,7 +48,8 @@ func NewAggregatesCalculator(config config.Config) *AggregatesCalculator { func (a *AggregatesCalculator) Start(ctx context.Context) error { a.logger.Info("Starting aggregates calculator") go logging.SyncOnTicks(ctx, a.logger, time.Second*10) - // This job runs in a continous loop until the context is cancelled. + // This job scans all scoreable users. Keep the cadence bounded so a + // completed full pass does not immediately start another full pass. for { select { case <-ctx.Done(): @@ -55,6 +58,20 @@ func (a *AggregatesCalculator) Start(ctx context.Context) error { default: a.updateAggregatesJob.Run(ctx) } + + timer := time.NewTimer(aggregateScoreUpdateInterval) + select { + case <-ctx.Done(): + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + a.logger.Info("Shutting down aggregates calculator") + return ctx.Err() + case <-timer.C: + } } }