From 2088289daf31dbc78936f5801772b2842fe2877a Mon Sep 17 00:00:00 2001 From: Ritwij Aryan Parmar Date: Thu, 18 Jun 2026 15:32:41 -0400 Subject: [PATCH 1/3] fix(worker): reindex repos with missing zoekt shards --- packages/backend/src/repoIndexManager.test.ts | 94 +++++++++++++++++++ packages/backend/src/repoIndexManager.ts | 67 ++++++++++++- 2 files changed, 160 insertions(+), 1 deletion(-) diff --git a/packages/backend/src/repoIndexManager.test.ts b/packages/backend/src/repoIndexManager.test.ts index 684f1a826..c74349d37 100644 --- a/packages/backend/src/repoIndexManager.test.ts +++ b/packages/backend/src/repoIndexManager.test.ts @@ -26,6 +26,10 @@ vi.mock('@sourcebot/shared', () => ({ path: `/test-data/repos/${repo.id}`, isReadOnly: false, })), + getRepoIdFromPath: vi.fn((repoPath: string) => { + const repoId = Number(repoPath.split('/').at(-1)); + return Number.isNaN(repoId) ? undefined : repoId; + }), repoMetadataSchema: { parse: vi.fn((metadata: unknown) => metadata ?? {}), }, @@ -36,6 +40,7 @@ vi.mock('@sourcebot/shared', () => ({ vi.mock('./constants.js', () => ({ WORKER_STOP_GRACEFUL_TIMEOUT_MS: 5000, + REPOS_CACHE_DIR: 'test-data/repos', INDEX_CACHE_DIR: 'test-data/index', })); @@ -56,6 +61,7 @@ vi.mock('./git.js', () => ({ vi.mock('./zoekt.js', () => ({ indexGitRepository: vi.fn().mockResolvedValue({ stdout: '', stderr: '' }), + cleanupTempShards: vi.fn(), })); vi.mock('./posthog.js', () => ({ @@ -64,6 +70,10 @@ vi.mock('./posthog.js', () => ({ vi.mock('./utils.js', () => ({ getAuthCredentialsForRepo: vi.fn().mockResolvedValue(null), + getRepoIdFromShardFileName: vi.fn((fileName: string) => { + const match = fileName.match(/^(\d+)_(\d+)_/); + return match ? Number(match[2]) : undefined; + }), getShardPrefix: vi.fn((orgId: number, repoId: number) => `${orgId}_${repoId}`), measure: vi.fn(async (cb: () => Promise) => { const data = await cb(); @@ -148,6 +158,7 @@ const createMockPrisma = () => { repo: { findMany: vi.fn().mockResolvedValue([]), update: vi.fn(), + updateMany: vi.fn(), delete: vi.fn(), }, repoIndexingJob: { @@ -696,6 +707,89 @@ describe('RepoIndexManager', () => { }); }); + describe('Startup Reconciliation', () => { + test('marks indexed repos with missing shard files stale and queues reindex jobs', async () => { + const staleRepo = createMockRepo({ + id: 42, + orgId: 7, + name: 'missing-shards', + indexedAt: new Date('2026-01-01T00:00:00Z'), + indexedCommitHash: 'missing123', + }); + const healthyRepo = createMockRepo({ + id: 43, + orgId: 7, + name: 'has-shards', + indexedAt: new Date('2026-01-01T00:00:00Z'), + indexedCommitHash: 'healthy123', + }); + + (existsSync as Mock).mockReturnValue(true); + (readdir as Mock).mockImplementation(async (dir: string) => { + if (dir === 'test-data/repos') { + return []; + } + if (dir === 'test-data/index') { + return [ + '7_43_main.zoekt', + '7_42_main.zoekt.tmp', + ]; + } + return []; + }); + (mockPrisma.repo.findMany as Mock) + .mockResolvedValueOnce([{ id: 42 }, { id: 43 }]) + .mockResolvedValueOnce([staleRepo, healthyRepo]); + (mockPrisma.repo.updateMany as Mock).mockResolvedValue({ count: 1 }); + (mockPrisma.repoIndexingJob.createManyAndReturn as Mock).mockResolvedValue([ + { + id: 'reindex-job-42', + repo: staleRepo, + }, + ]); + + manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any); + + await manager.startScheduler(); + + expect(mockPrisma.repo.updateMany).toHaveBeenCalledWith({ + where: { + id: { + in: [42], + }, + }, + data: { + indexedAt: null, + indexedCommitHash: null, + latestIndexingJobStatus: RepoIndexingJobStatus.PENDING, + }, + }); + + expect(mockPrisma.repoIndexingJob.createManyAndReturn).toHaveBeenCalledWith({ + data: [ + { + type: RepoIndexingJobType.INDEX, + repoId: 42, + }, + ], + include: { + repo: true, + }, + }); + + expect(mockQueueAdd).toHaveBeenCalledWith( + 'repo-index-job', + { + jobId: 'reindex-job-42', + type: RepoIndexingJobType.INDEX, + repoName: staleRepo.name, + repoId: staleRepo.id, + }, + { jobId: 'reindex-job-42' } + ); + }); + }); + describe('Cleanup Jobs', () => { test('deletes repo directory and index shards', async () => { const repo = createMockRepoWithConnections({ id: 5, orgId: 2 }); diff --git a/packages/backend/src/repoIndexManager.ts b/packages/backend/src/repoIndexManager.ts index aea1291dc..ea78bfd44 100644 --- a/packages/backend/src/repoIndexManager.ts +++ b/packages/backend/src/repoIndexManager.ts @@ -19,6 +19,7 @@ const LOG_TAG = 'repo-index-manager'; const logger = createLogger(LOG_TAG); const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`); const QUEUE_NAME = 'repo-index-queue'; +const STALE_REPO_UPDATE_BATCH_SIZE = 500; type JobPayload = { type: 'INDEX' | 'CLEANUP'; @@ -98,8 +99,9 @@ export class RepoIndexManager { public async startScheduler() { logger.debug('Starting scheduler'); - // Cleanup any orphaned disk resources on startup + // Reconcile DB and disk state on startup before scheduling new work. await this.cleanupOrphanedDiskResources(); + await this.scheduleMissingShardReindexJobs(); this.interval = setIntervalAsync(async () => { await this.scheduleIndexJobs(); await this.scheduleCleanupJobs(); @@ -747,6 +749,69 @@ export class RepoIndexManager { } } + private async scheduleMissingShardReindexJobs() { + const repoIdsWithShards = new Set(); + + if (existsSync(INDEX_CACHE_DIR)) { + const entries = await readdir(INDEX_CACHE_DIR); + for (const entry of entries) { + if (!entry.endsWith('.zoekt') || entry.includes('.tmp')) { + continue; + } + + const repoId = getRepoIdFromShardFileName(entry); + if (repoId !== undefined) { + repoIdsWithShards.add(repoId); + } + } + } + + const indexedRepos = await this.db.repo.findMany({ + where: { + indexedAt: { not: null }, + indexedCommitHash: { not: null }, + NOT: { + jobs: { + some: { + type: RepoIndexingJobType.INDEX, + status: { + in: [ + RepoIndexingJobStatus.PENDING, + RepoIndexingJobStatus.IN_PROGRESS, + ] + } + } + } + } + }, + }); + + const reposMissingShards = indexedRepos.filter(repo => !repoIdsWithShards.has(repo.id)); + if (reposMissingShards.length === 0) { + return; + } + + logger.warn(`Found ${reposMissingShards.length} indexed repo(s) with missing zoekt shard files. Marking stale and scheduling reindex jobs.`); + + for (let i = 0; i < reposMissingShards.length; i += STALE_REPO_UPDATE_BATCH_SIZE) { + const batch = reposMissingShards.slice(i, i + STALE_REPO_UPDATE_BATCH_SIZE); + await this.db.repo.updateMany({ + where: { + id: { + in: batch.map(repo => repo.id), + }, + }, + data: { + indexedAt: null, + indexedCommitHash: null, + latestIndexingJobStatus: RepoIndexingJobStatus.PENDING, + }, + }); + } + + await this.createJobs(reposMissingShards, RepoIndexingJobType.INDEX); + } + public async dispose() { if (this.interval) { clearInterval(this.interval); From 5f268f1682cbf3027c0b8ecd13668c9e1a38d217 Mon Sep 17 00:00:00 2001 From: Ritwij Aryan Parmar Date: Thu, 18 Jun 2026 15:33:19 -0400 Subject: [PATCH 2/3] chore: add changelog entry for shard reindex fix --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c2dd6795c..b2dd55019 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Upgraded `tar` to `^7.5.16`. [#1338](https://github.com/sourcebot-dev/sourcebot/pull/1338) - Upgraded `esbuild` to `^0.28.1`. [#1342](https://github.com/sourcebot-dev/sourcebot/pull/1342) - Enabled Next.js version skew protection to fix "Failed to load chunk" errors during rolling deploys. [#1346](https://github.com/sourcebot-dev/sourcebot/pull/1346) +- Re-indexed repositories whose database state is indexed but whose zoekt shard files are missing on worker startup. [#1350](https://github.com/sourcebot-dev/sourcebot/pull/1350) ## [5.0.3] - 2026-06-17 From b24f26f21f4849aec7e44beb6fc64b9a7c0b03a5 Mon Sep 17 00:00:00 2001 From: Ritwij Aryan Parmar Date: Thu, 18 Jun 2026 17:16:49 -0400 Subject: [PATCH 3/3] fix(worker): ignore stale active jobs during shard recovery --- packages/backend/src/repoIndexManager.test.ts | 30 +++++++++++++++++++ packages/backend/src/repoIndexManager.ts | 27 ++++++++++++----- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/packages/backend/src/repoIndexManager.test.ts b/packages/backend/src/repoIndexManager.test.ts index c74349d37..766aa8f4d 100644 --- a/packages/backend/src/repoIndexManager.test.ts +++ b/packages/backend/src/repoIndexManager.test.ts @@ -752,6 +752,36 @@ describe('RepoIndexManager', () => { await manager.startScheduler(); + expect(mockPrisma.repo.findMany).toHaveBeenNthCalledWith(2, expect.objectContaining({ + where: expect.objectContaining({ + NOT: { + jobs: { + some: { + AND: expect.arrayContaining([ + { + type: RepoIndexingJobType.INDEX, + }, + { + status: { + in: [ + RepoIndexingJobStatus.PENDING, + RepoIndexingJobStatus.IN_PROGRESS, + ], + }, + }, + { + OR: [ + { createdAt: { gt: expect.any(Date) } }, + { updatedAt: { gt: expect.any(Date) } }, + ], + }, + ]), + }, + }, + }, + }), + })); + expect(mockPrisma.repo.updateMany).toHaveBeenCalledWith({ where: { id: { diff --git a/packages/backend/src/repoIndexManager.ts b/packages/backend/src/repoIndexManager.ts index ea78bfd44..a1f82323c 100644 --- a/packages/backend/src/repoIndexManager.ts +++ b/packages/backend/src/repoIndexManager.ts @@ -750,6 +750,7 @@ export class RepoIndexManager { } private async scheduleMissingShardReindexJobs() { + const timeoutDate = new Date(Date.now() - this.settings.repoIndexTimeoutMs); const repoIdsWithShards = new Set(); if (existsSync(INDEX_CACHE_DIR)) { @@ -773,13 +774,25 @@ export class RepoIndexManager { NOT: { jobs: { some: { - type: RepoIndexingJobType.INDEX, - status: { - in: [ - RepoIndexingJobStatus.PENDING, - RepoIndexingJobStatus.IN_PROGRESS, - ] - } + AND: [ + { + type: RepoIndexingJobType.INDEX, + }, + { + status: { + in: [ + RepoIndexingJobStatus.PENDING, + RepoIndexingJobStatus.IN_PROGRESS, + ] + }, + }, + { + OR: [ + { createdAt: { gt: timeoutDate } }, + { updatedAt: { gt: timeoutDate } }, + ], + }, + ], } } }