Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apps/sim/lib/core/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ export const env = createEnv({
TABLE_SNAPSHOT_CACHE: z.boolean().optional(), // Mount tables into sandboxes by reference via a version-keyed CSV snapshot in object storage instead of draining the whole table into web-process heap
PII_REDACTION: z.boolean().optional(), // Redact PII from workflow logs via configurable Data Retention rules (Presidio at the logger persist choke point) and expose the Data Retention config UI
TRIGGER_EU_REGION: z.boolean().optional(), // Route Trigger.dev runs to eu-central-1 instead of the default us-east-1 (fallback for the trigger-eu-region flag when AppConfig is not the source of truth)
REDIS_PROGRESS_MARKERS: z.boolean().optional(), // Write per-block live progress markers to Redis instead of jsonb_set UPDATEs on workflow_execution_logs (fallback for the redis-progress-markers flag when AppConfig is not the source of truth)

// Table feature limits (per plan). Apply when billing is disabled (free tier defaults) or for billed plans.
FREE_TABLES_LIMIT: z.number().optional(), // Max user tables per workspace on free tier (default: 5)
Expand Down
8 changes: 0 additions & 8 deletions apps/sim/lib/core/config/feature-flags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,6 @@ const FEATURE_FLAGS = {
'resolveTriggerRegion, so the whole deployment switches regions together.',
fallback: 'TRIGGER_EU_REGION',
},
'redis-progress-markers': {
description:
'Write per-block live progress markers (lastStartedBlock/lastCompletedBlock) to Redis ' +
'instead of jsonb_set UPDATEs on workflow_execution_logs, folding them into the single ' +
'terminal UPDATE at completion. Eliminates the heaviest write query. Resolved once per ' +
'logging session (no user/org context) so an execution never mixes write paths.',
fallback: 'REDIS_PROGRESS_MARKERS',
},
'workspace-forking': {
description:
'Runtime rollout gate for workspace forking (fork/promote/rollback), layered on top of ' +
Expand Down
4 changes: 1 addition & 3 deletions apps/sim/lib/logs/execution/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,6 @@ export class ExecutionLogger implements IExecutionLoggerService {
isResume?: boolean
level?: 'info' | 'error'
status?: 'completed' | 'failed' | 'cancelled' | 'pending'
readProgressMarkers?: boolean
}): Promise<WorkflowExecutionLog> {
const {
executionId,
Expand All @@ -695,7 +694,6 @@ export class ExecutionLogger implements IExecutionLoggerService {
isResume,
level: levelOverride,
status: statusOverride,
readProgressMarkers = true,
} = params

let execLog = logger.withMetadata({ executionId })
Expand Down Expand Up @@ -753,7 +751,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
models: costSummary.models,
}

const progressMarkers = readProgressMarkers ? await getProgressMarkers(executionId) : null
const progressMarkers = await getProgressMarkers(executionId)

const builtExecutionData = this.buildCompletedExecutionData({
existingExecutionData,
Expand Down
54 changes: 2 additions & 52 deletions apps/sim/lib/logs/execution/logging-session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,17 @@ vi.mock('@/lib/logs/execution/logger', () => ({
}))

const {
isFeatureEnabledMock,
setLastStartedBlockMock,
setLastCompletedBlockMock,
getProgressMarkersMock,
clearProgressMarkersMock,
} = vi.hoisted(() => ({
isFeatureEnabledMock: vi.fn().mockResolvedValue(false),
setLastStartedBlockMock: vi.fn().mockResolvedValue(false),
setLastCompletedBlockMock: vi.fn().mockResolvedValue(false),
getProgressMarkersMock: vi.fn().mockResolvedValue({}),
clearProgressMarkersMock: vi.fn().mockResolvedValue(undefined),
}))

vi.mock('@/lib/core/config/feature-flags', () => ({
isFeatureEnabled: isFeatureEnabledMock,
}))

vi.mock('@/lib/logs/execution/progress-markers', () => ({
setLastStartedBlock: setLastStartedBlockMock,
setLastCompletedBlock: setLastCompletedBlockMock,
Expand Down Expand Up @@ -720,8 +714,7 @@ describe('LoggingSession progress-marker write path', () => {
dbMocks.execute.mockResolvedValue(undefined)
})

it('writes markers to Redis (not the row) when the flag is on and Redis accepts the write', async () => {
isFeatureEnabledMock.mockResolvedValue(true)
it('writes markers to Redis (not the row) when Redis accepts the write', async () => {
setLastStartedBlockMock.mockResolvedValue(true)
setLastCompletedBlockMock.mockResolvedValue(true)
const session = new LoggingSession('wf-1', 'exec-redis', 'manual', 'req-1')
Expand All @@ -741,8 +734,7 @@ describe('LoggingSession progress-marker write path', () => {
expect(dbMocks.execute).not.toHaveBeenCalled()
})

it('falls back to the SQL UPDATE when the flag is on but the Redis write fails', async () => {
isFeatureEnabledMock.mockResolvedValue(true)
it('falls back to the SQL UPDATE when the Redis write fails', async () => {
setLastStartedBlockMock.mockResolvedValue(false)
const session = new LoggingSession('wf-1', 'exec-redis-down', 'manual', 'req-1')
await session.start({ workspaceId: 'ws-1' })
Expand All @@ -752,46 +744,4 @@ describe('LoggingSession progress-marker write path', () => {
expect(setLastStartedBlockMock).toHaveBeenCalled()
expect(dbMocks.execute).toHaveBeenCalledTimes(1)
})

it('writes markers via jsonb_set UPDATE when the flag is off', async () => {
isFeatureEnabledMock.mockResolvedValue(false)
const session = new LoggingSession('wf-1', 'exec-sql', 'manual', 'req-1')
await session.start({ workspaceId: 'ws-1' })

await session.onBlockStart('b1', 'Fetch', 'api', '2026-06-27T10:00:00.000Z')

expect(dbMocks.execute).toHaveBeenCalledTimes(1)
expect(setLastStartedBlockMock).not.toHaveBeenCalled()
})

it('falls back to the SQL path when flag resolution throws', async () => {
isFeatureEnabledMock.mockRejectedValue(new Error('appconfig unavailable'))
const session = new LoggingSession('wf-1', 'exec-fallback', 'manual', 'req-1')
await session.start({ workspaceId: 'ws-1' })

await session.onBlockStart('b1', 'Fetch', 'api', '2026-06-27T10:00:00.000Z')

expect(dbMocks.execute).toHaveBeenCalledTimes(1)
expect(setLastStartedBlockMock).not.toHaveBeenCalled()
})

it('tells completion to read Redis markers only when the flag is on (no wasted ops when off)', async () => {
completeWorkflowExecutionMock.mockResolvedValue({})

isFeatureEnabledMock.mockResolvedValue(true)
const onSession = new LoggingSession('wf-1', 'exec-on', 'manual', 'req-1')
await onSession.start({ workspaceId: 'ws-1' })
await onSession.safeComplete({ finalOutput: { ok: true } })
expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith(
expect.objectContaining({ executionId: 'exec-on', readProgressMarkers: true })
)

isFeatureEnabledMock.mockResolvedValue(false)
const offSession = new LoggingSession('wf-1', 'exec-off', 'manual', 'req-1')
await offSession.start({ workspaceId: 'ws-1' })
await offSession.safeComplete({ finalOutput: { ok: true } })
expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith(
expect.objectContaining({ executionId: 'exec-off', readProgressMarkers: false })
)
})
})
39 changes: 8 additions & 31 deletions apps/sim/lib/logs/execution/logging-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { createLogger } from '@sim/logger'
import { describeError, toError } from '@sim/utils/errors'
import { and, eq, sql } from 'drizzle-orm'
import { releaseExecutionSlot } from '@/lib/billing/calculations/usage-reservation'
import { isFeatureEnabled } from '@/lib/core/config/feature-flags'
import { isRetryableInfrastructureError } from '@/lib/core/errors/retryable-infrastructure'
import { executionLogger } from '@/lib/logs/execution/logger'
import {
Expand Down Expand Up @@ -136,13 +135,6 @@ export class LoggingSession {
private workflowState?: WorkflowState
private correlation?: NonNullable<ExecutionTrigger['data']>['correlation']
private isResume = false
/**
* Whether per-block progress markers go to Redis (vs jsonb_set UPDATEs on the
* log row). Resolved once in {@link start} and cached so an execution never
* mixes write paths across its block callbacks. Defaults to the legacy SQL
* path until resolved.
*/
private useRedisMarkers = false
private completed = false
/** Synchronous flag to prevent concurrent completion attempts (race condition guard) */
private completing = false
Expand Down Expand Up @@ -182,24 +174,12 @@ export class LoggingSession {
}

/**
* Resolve the per-block marker write path (Redis vs jsonb_set UPDATE) for this
* session. Defaults to the legacy SQL path if flag resolution fails.
*/
private async resolveRedisMarkerMode(): Promise<boolean> {
try {
return await isFeatureEnabled('redis-progress-markers')
} catch {
return false
}
}

/**
* Persist the last-started-block marker. Redis is the primary path when the
* flag is on; falls back to the durable jsonb_set UPDATE when Redis is
* unavailable or the write fails, so a marker is never dropped.
* Persist the last-started-block marker. Redis is the primary path; falls back
* to the durable jsonb_set UPDATE when Redis is unavailable or the write fails,
* so a marker is never dropped.
*/
private async persistLastStartedBlock(marker: ExecutionLastStartedBlock): Promise<void> {
Comment thread
waleedlatif1 marked this conversation as resolved.
if (this.useRedisMarkers && (await setLastStartedBlock(this.executionId, marker))) {
if (await setLastStartedBlock(this.executionId, marker)) {
return
}
try {
Expand All @@ -220,12 +200,12 @@ export class LoggingSession {
}

/**
* Persist the last-completed-block marker. Redis is the primary path when the
* flag is on; falls back to the durable jsonb_set UPDATE when Redis is
* unavailable or the write fails, so a marker is never dropped.
* Persist the last-completed-block marker. Redis is the primary path; falls
* back to the durable jsonb_set UPDATE when Redis is unavailable or the write
* fails, so a marker is never dropped.
*/
private async persistLastCompletedBlock(marker: ExecutionLastCompletedBlock): Promise<void> {
if (this.useRedisMarkers && (await setLastCompletedBlock(this.executionId, marker))) {
if (await setLastCompletedBlock(this.executionId, marker)) {
return
}
try {
Expand Down Expand Up @@ -308,7 +288,6 @@ export class LoggingSession {
isResume: this.isResume,
Comment thread
waleedlatif1 marked this conversation as resolved.
Comment thread
waleedlatif1 marked this conversation as resolved.
level: params.level,
status: params.status,
readProgressMarkers: this.useRedisMarkers,
})

// Release the admission reservation from preprocessing. Skipped on pause: a
Expand Down Expand Up @@ -356,8 +335,6 @@ export class LoggingSession {
} = params

try {
this.useRedisMarkers = await this.resolveRedisMarkerMode()

this.trigger = createTriggerObject(this.triggerType, triggerData)
this.correlation = triggerData?.correlation
this.environment = createEnvironmentObject(
Expand Down
7 changes: 0 additions & 7 deletions apps/sim/lib/logs/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,5 @@ export interface ExecutionLoggerService {
isResume?: boolean
level?: 'info' | 'error'
status?: 'completed' | 'failed' | 'cancelled' | 'pending'
/**
* Whether this session wrote live progress markers to Redis. When false, the
* completion fold skips the Redis read/clear entirely (markers are already on
* the row via the SQL path). Defaults to true so non-session callers keep the
* safe read-and-fold behavior.
*/
readProgressMarkers?: boolean
}): Promise<WorkflowExecutionLog>
}
Loading