Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Upgraded `tar` to `^7.5.16`. [#1338](https://github.com/sourcebot-dev/sourcebot/pull/1338)
- Upgraded `esbuild` to `^0.28.1`. [#1342](https://github.com/sourcebot-dev/sourcebot/pull/1342)
- Enabled Next.js version skew protection to fix "Failed to load chunk" errors during rolling deploys. [#1346](https://github.com/sourcebot-dev/sourcebot/pull/1346)
- Re-indexed repositories whose database state is indexed but whose zoekt shard files are missing on worker startup. [#1350](https://github.com/sourcebot-dev/sourcebot/pull/1350)

## [5.0.3] - 2026-06-17

Expand Down
124 changes: 124 additions & 0 deletions packages/backend/src/repoIndexManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ vi.mock('@sourcebot/shared', () => ({
path: `/test-data/repos/${repo.id}`,
isReadOnly: false,
})),
getRepoIdFromPath: vi.fn((repoPath: string) => {
const repoId = Number(repoPath.split('/').at(-1));
return Number.isNaN(repoId) ? undefined : repoId;
}),
repoMetadataSchema: {
parse: vi.fn((metadata: unknown) => metadata ?? {}),
},
Expand All @@ -36,6 +40,7 @@ vi.mock('@sourcebot/shared', () => ({

vi.mock('./constants.js', () => ({
WORKER_STOP_GRACEFUL_TIMEOUT_MS: 5000,
REPOS_CACHE_DIR: 'test-data/repos',
INDEX_CACHE_DIR: 'test-data/index',
}));

Expand All @@ -56,6 +61,7 @@ vi.mock('./git.js', () => ({

vi.mock('./zoekt.js', () => ({
indexGitRepository: vi.fn().mockResolvedValue({ stdout: '', stderr: '' }),
cleanupTempShards: vi.fn(),
}));

vi.mock('./posthog.js', () => ({
Expand All @@ -64,6 +70,10 @@ vi.mock('./posthog.js', () => ({

vi.mock('./utils.js', () => ({
getAuthCredentialsForRepo: vi.fn().mockResolvedValue(null),
getRepoIdFromShardFileName: vi.fn((fileName: string) => {
const match = fileName.match(/^(\d+)_(\d+)_/);
return match ? Number(match[2]) : undefined;
}),
getShardPrefix: vi.fn((orgId: number, repoId: number) => `${orgId}_${repoId}`),
measure: vi.fn(async (cb: () => Promise<unknown>) => {
const data = await cb();
Expand Down Expand Up @@ -148,6 +158,7 @@ const createMockPrisma = () => {
repo: {
findMany: vi.fn().mockResolvedValue([]),
update: vi.fn(),
updateMany: vi.fn(),
delete: vi.fn(),
},
repoIndexingJob: {
Expand Down Expand Up @@ -696,6 +707,119 @@ describe('RepoIndexManager', () => {
});
});

describe('Startup Reconciliation', () => {
test('marks indexed repos with missing shard files stale and queues reindex jobs', async () => {
const staleRepo = createMockRepo({
id: 42,
orgId: 7,
name: 'missing-shards',
indexedAt: new Date('2026-01-01T00:00:00Z'),
indexedCommitHash: 'missing123',
});
const healthyRepo = createMockRepo({
id: 43,
orgId: 7,
name: 'has-shards',
indexedAt: new Date('2026-01-01T00:00:00Z'),
indexedCommitHash: 'healthy123',
});

(existsSync as Mock).mockReturnValue(true);
(readdir as Mock).mockImplementation(async (dir: string) => {
if (dir === 'test-data/repos') {
return [];
}
if (dir === 'test-data/index') {
return [
'7_43_main.zoekt',
'7_42_main.zoekt.tmp',
];
}
return [];
});
(mockPrisma.repo.findMany as Mock)
.mockResolvedValueOnce([{ id: 42 }, { id: 43 }])
.mockResolvedValueOnce([staleRepo, healthyRepo]);
(mockPrisma.repo.updateMany as Mock).mockResolvedValue({ count: 1 });
(mockPrisma.repoIndexingJob.createManyAndReturn as Mock).mockResolvedValue([
{
id: 'reindex-job-42',
repo: staleRepo,
},
]);

manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);

await manager.startScheduler();

expect(mockPrisma.repo.findMany).toHaveBeenNthCalledWith(2, expect.objectContaining({
where: expect.objectContaining({
NOT: {
jobs: {
some: {
AND: expect.arrayContaining([
{
type: RepoIndexingJobType.INDEX,
},
{
status: {
in: [
RepoIndexingJobStatus.PENDING,
RepoIndexingJobStatus.IN_PROGRESS,
],
},
},
{
OR: [
{ createdAt: { gt: expect.any(Date) } },
{ updatedAt: { gt: expect.any(Date) } },
],
},
]),
},
},
},
}),
}));

expect(mockPrisma.repo.updateMany).toHaveBeenCalledWith({
where: {
id: {
in: [42],
},
},
data: {
indexedAt: null,
indexedCommitHash: null,
latestIndexingJobStatus: RepoIndexingJobStatus.PENDING,
},
});

expect(mockPrisma.repoIndexingJob.createManyAndReturn).toHaveBeenCalledWith({
data: [
{
type: RepoIndexingJobType.INDEX,
repoId: 42,
},
],
include: {
repo: true,
},
});

expect(mockQueueAdd).toHaveBeenCalledWith(
'repo-index-job',
{
jobId: 'reindex-job-42',
type: RepoIndexingJobType.INDEX,
repoName: staleRepo.name,
repoId: staleRepo.id,
},
{ jobId: 'reindex-job-42' }
);
});
});

describe('Cleanup Jobs', () => {
test('deletes repo directory and index shards', async () => {
const repo = createMockRepoWithConnections({ id: 5, orgId: 2 });
Expand Down
80 changes: 79 additions & 1 deletion packages/backend/src/repoIndexManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const LOG_TAG = 'repo-index-manager';
const logger = createLogger(LOG_TAG);
const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
const QUEUE_NAME = 'repo-index-queue';
const STALE_REPO_UPDATE_BATCH_SIZE = 500;

type JobPayload = {
type: 'INDEX' | 'CLEANUP';
Expand Down Expand Up @@ -98,8 +99,9 @@ export class RepoIndexManager {

public async startScheduler() {
logger.debug('Starting scheduler');
// Cleanup any orphaned disk resources on startup
// Reconcile DB and disk state on startup before scheduling new work.
await this.cleanupOrphanedDiskResources();
await this.scheduleMissingShardReindexJobs();
this.interval = setIntervalAsync(async () => {
await this.scheduleIndexJobs();
await this.scheduleCleanupJobs();
Expand Down Expand Up @@ -747,6 +749,82 @@ export class RepoIndexManager {
}
}

private async scheduleMissingShardReindexJobs() {
const timeoutDate = new Date(Date.now() - this.settings.repoIndexTimeoutMs);
const repoIdsWithShards = new Set<number>();

if (existsSync(INDEX_CACHE_DIR)) {
const entries = await readdir(INDEX_CACHE_DIR);
for (const entry of entries) {
if (!entry.endsWith('.zoekt') || entry.includes('.tmp')) {
continue;
}

const repoId = getRepoIdFromShardFileName(entry);
if (repoId !== undefined) {
repoIdsWithShards.add(repoId);
}
}
}

const indexedRepos = await this.db.repo.findMany({
where: {
indexedAt: { not: null },
indexedCommitHash: { not: null },
NOT: {
jobs: {
some: {
AND: [
{
type: RepoIndexingJobType.INDEX,
},
{
status: {
in: [
RepoIndexingJobStatus.PENDING,
RepoIndexingJobStatus.IN_PROGRESS,
]
},
},
{
OR: [
{ createdAt: { gt: timeoutDate } },
{ updatedAt: { gt: timeoutDate } },
],
},
],
}
}
}
},
Comment thread
coderabbitai[bot] marked this conversation as resolved.
});

const reposMissingShards = indexedRepos.filter(repo => !repoIdsWithShards.has(repo.id));
if (reposMissingShards.length === 0) {
return;
}

logger.warn(`Found ${reposMissingShards.length} indexed repo(s) with missing zoekt shard files. Marking stale and scheduling reindex jobs.`);

for (let i = 0; i < reposMissingShards.length; i += STALE_REPO_UPDATE_BATCH_SIZE) {
const batch = reposMissingShards.slice(i, i + STALE_REPO_UPDATE_BATCH_SIZE);
await this.db.repo.updateMany({
where: {
id: {
in: batch.map(repo => repo.id),
},
},
data: {
indexedAt: null,
indexedCommitHash: null,
latestIndexingJobStatus: RepoIndexingJobStatus.PENDING,
},
});
}

await this.createJobs(reposMissingShards, RepoIndexingJobType.INDEX);
}

public async dispose() {
if (this.interval) {
clearInterval(this.interval);
Expand Down