Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,18 @@ AGENTS_DEFAULT_MODEL=haiku
AGENTS_MAX_STEPS=15
AGENTS_TIMEOUT=300000

# ==============================================================================
# Task Decomposition (opt-in; requires AGENTS_ENABLED=true)
# ==============================================================================
# Breaks complex, divisible tasks into focused subtasks run with isolated
# context (parallel where independent), then synthesizes the result. A cost-aware
# gate decides WHEN to decompose — decomposition can cost MORE than it saves on
# small/indivisible tasks, so it only triggers on complex, large, divisible work.
# Exposed as the DecomposeTask tool. All other settings (models, gate thresholds,
# shadow mode) are hardcoded in src/config/index.js.

TASK_DECOMPOSITION_ENABLED=false

# ==============================================================================
# MCP Sandbox Configuration
# ==============================================================================
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ FROM node:24-alpine AS runtime

ARG VCS_REF
ARG BUILD_DATE
ARG VERSION=9.5.0
ARG VERSION=9.6.0

LABEL org.opencontainers.image.title="Lynkr" \
org.opencontainers.image.description="Universal LLM proxy for Claude Code, Cursor, and AI coding tools" \
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ services:
lynkr:
build: .
container_name: lynkr
image: lynkr:9.5.0
image: lynkr:9.6.0
ports:
- "8081:8081"
extra_hosts:
Expand Down Expand Up @@ -329,7 +329,7 @@ services:
retries: 3
start_period: 40s
labels:
- "com.lynkr.version=9.5.0"
- "com.lynkr.version=9.6.0"
- "com.lynkr.description=Claude Code proxy with multi-provider support"
# Uncomment to set resource limits
# deploy:
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "lynkr",
"version": "9.5.0",
"version": "9.6.0",
"description": "Self-hosted LLM gateway and tier-routing proxy for Claude Code, Cursor, and Codex. Routes across Ollama, AWS Bedrock, OpenRouter, Databricks, Azure OpenAI, llama.cpp, and LM Studio with prompt caching, MCP tools, and 60-80% cost savings.",
"main": "index.js",
"bin": {
Expand All @@ -16,7 +16,7 @@
"dev": "nodemon index.js",
"lint": "eslint src index.js",
"test": "npm run test:unit && npm run test:performance",
"test:unit": "DATABRICKS_API_KEY=test-key DATABRICKS_API_BASE=http://test.com node --test test/routing.test.js test/hybrid-routing-integration.test.js test/web-tools.test.js test/passthrough-mode.test.js test/openrouter-error-resilience.test.js test/format-conversion.test.js test/azure-openai-config.test.js test/azure-openai-format-conversion.test.js test/azure-openai-routing.test.js test/azure-openai-streaming.test.js test/azure-openai-error-resilience.test.js test/azure-openai-integration.test.js test/openai-integration.test.js test/toon-compression.test.js test/llamacpp-integration.test.js test/resilience.test.js test/telemetry-routing.test.js test/memory/store.test.js test/memory/surprise.test.js test/memory/extractor.test.js test/memory/search.test.js test/memory/retriever.test.js test/distill.test.js test/large-payload.test.js test/code-mode.test.js test/prompt-cache-injection.test.js test/risk-analyzer.test.js test/interaction-block.test.js test/preflight.test.js test/token-reduction.test.js test/session-affinity.test.js test/model-registry-cost.test.js",
"test:unit": "DATABRICKS_API_KEY=test-key DATABRICKS_API_BASE=http://test.com node --test test/routing.test.js test/hybrid-routing-integration.test.js test/web-tools.test.js test/passthrough-mode.test.js test/openrouter-error-resilience.test.js test/format-conversion.test.js test/azure-openai-config.test.js test/azure-openai-format-conversion.test.js test/azure-openai-routing.test.js test/azure-openai-streaming.test.js test/azure-openai-error-resilience.test.js test/azure-openai-integration.test.js test/openai-integration.test.js test/toon-compression.test.js test/llamacpp-integration.test.js test/resilience.test.js test/telemetry-routing.test.js test/memory/store.test.js test/memory/surprise.test.js test/memory/extractor.test.js test/memory/search.test.js test/memory/retriever.test.js test/distill.test.js test/large-payload.test.js test/code-mode.test.js test/prompt-cache-injection.test.js test/risk-analyzer.test.js test/interaction-block.test.js test/preflight.test.js test/token-reduction.test.js test/session-affinity.test.js test/model-registry-cost.test.js test/task-decomposition.test.js test/output-format-guard.test.js test/tier-fallback.test.js",
"test:memory": "DATABRICKS_API_KEY=test-key DATABRICKS_API_BASE=http://test.com node --test test/memory/store.test.js test/memory/surprise.test.js test/memory/extractor.test.js test/memory/search.test.js test/memory/retriever.test.js",
"test:new-features": "DATABRICKS_API_KEY=test-key DATABRICKS_API_BASE=http://test.com node --test test/passthrough-mode.test.js test/openrouter-error-resilience.test.js test/format-conversion.test.js",
"test:performance": "DATABRICKS_API_KEY=test-key DATABRICKS_API_BASE=http://test.com node test/hybrid-routing-performance.test.js && DATABRICKS_API_KEY=test-key DATABRICKS_API_BASE=http://test.com node test/performance-tests.js",
Expand Down
185 changes: 185 additions & 0 deletions src/agents/decomposition/dispatcher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/**
* Subtask dispatcher (Phase 3).
*
* Executes a validated plan respecting its dependency DAG:
* - subtasks are grouped into topological "levels" (Kahn's algorithm)
* - subtasks in the same level have no dependency on each other → run in
* parallel via the existing ParallelCoordinator (spawnParallel)
* - a subtask receives ONLY its own prompt plus the compressed results of the
* subtasks it depends on (context isolation — the token win)
*
* The spawn functions are injectable for testing.
*/

const logger = require("../../logger");

// Cap how much of a dependency's result we forward, to preserve the
// context-isolation savings (subagents already return summaries; this bounds
// pathological cases).
const MAX_CONTEXT_CHARS = 2000;

/**
* Group subtasks into dependency levels. Returns array of arrays of ids.
* Throws if the graph is unresolvable (should not happen — planner validated).
*/
function topologicalLevels(subtasks) {
const byId = new Map(subtasks.map((s) => [s.id, s]));
const indegree = new Map(subtasks.map((s) => [s.id, 0]));
const dependents = new Map(subtasks.map((s) => [s.id, []]));

for (const s of subtasks) {
for (const dep of s.dependsOn) {
if (!byId.has(dep)) continue;
indegree.set(s.id, indegree.get(s.id) + 1);
dependents.get(dep).push(s.id);
}
}

const levels = [];
let frontier = subtasks.filter((s) => indegree.get(s.id) === 0).map((s) => s.id);
const resolved = new Set();

while (frontier.length > 0) {
levels.push(frontier);
const next = [];
for (const id of frontier) {
resolved.add(id);
for (const child of dependents.get(id)) {
indegree.set(child, indegree.get(child) - 1);
if (indegree.get(child) === 0) next.push(child);
}
}
frontier = next;
}

if (resolved.size !== subtasks.length) {
throw new Error("Unresolvable subtask graph (cycle or dangling dependency)");
}
return levels;
}

function compressResult(text) {
if (typeof text !== "string") return String(text ?? "");
if (text.length <= MAX_CONTEXT_CHARS) return text;
return text.slice(0, MAX_CONTEXT_CHARS) + "\n…[truncated]";
}

function buildContextForSubtask(subtask, resultsById) {
if (!subtask.dependsOn || subtask.dependsOn.length === 0) return null;
const parts = [];
for (const dep of subtask.dependsOn) {
const r = resultsById.get(dep);
if (r && r.success && r.result) {
parts.push(`Result of subtask ${dep}:\n${compressResult(r.result)}`);
} else if (r) {
parts.push(`Subtask ${dep} did not complete successfully.`);
}
}
return parts.length > 0 ? parts.join("\n\n") : null;
}

/**
* Dispatch a validated plan.
* @param {Object} plan - { subtasks: [...] }
* @param {Object} [options]
* @param {string} [options.sessionId]
* @param {string} [options.cwd]
* @param {Function} [options.spawnParallel] - (agentTypes[], prompts[], opts) => results[]
* @returns {Promise<{results: Array, levels: Array, stats: Object}>}
*/
async function dispatchPlan(plan, options = {}) {
const spawnParallel = options.spawnParallel || require("../index").spawnParallel;
const subtasks = plan.subtasks;
const byId = new Map(subtasks.map((s) => [s.id, s]));
const levels = topologicalLevels(subtasks);
const resultsById = new Map();

let totalInputTokens = 0;
let totalOutputTokens = 0;
let totalSubagents = 0;

for (let li = 0; li < levels.length; li++) {
const levelIds = levels[li];
const levelSubtasks = levelIds.map((id) => byId.get(id));

const agentTypes = levelSubtasks.map((s) => s.agentType);
const prompts = levelSubtasks.map((s) => s.prompt);
const perTaskContext = levelSubtasks.map((s) => buildContextForSubtask(s, resultsById));

logger.info(
{ level: li, count: levelIds.length, ids: levelIds },
"[Decomposition] Dispatching subtask level"
);

// spawnParallel shares one options object; pass per-task context by spawning
// the level as individual parallel calls when contexts differ.
const levelResults = await runLevel(
spawnParallel,
agentTypes,
prompts,
perTaskContext,
options
);

levelResults.forEach((res, idx) => {
const st = levelSubtasks[idx];
totalSubagents += 1;
totalInputTokens += res?.stats?.inputTokens || 0;
totalOutputTokens += res?.stats?.outputTokens || 0;
resultsById.set(st.id, {
id: st.id,
agentType: st.agentType,
success: !!res?.success,
result: res?.success ? res.result : null,
error: res?.success ? null : res?.error || "unknown error",
stats: res?.stats || {},
});
});
}

const results = subtasks.map((s) => resultsById.get(s.id));
return {
results,
levels,
stats: {
subagents: totalSubagents,
inputTokens: totalInputTokens,
outputTokens: totalOutputTokens,
},
};
}

/**
* Run one level. When subtasks in the level have differing injected contexts we
* spawn them as separate parallel calls (each with its own mainContext), then
* await all. When none need context, a single spawnParallel batch is used.
*/
async function runLevel(spawnParallel, agentTypes, prompts, perTaskContext, options) {
const anyContext = perTaskContext.some((c) => c);

if (!anyContext) {
return spawnParallel(agentTypes, prompts, {
sessionId: options.sessionId,
cwd: options.cwd,
});
}

// Mixed/with-context: one spawnParallel call per subtask so each gets its own
// mainContext, executed concurrently.
const calls = agentTypes.map((type, i) =>
spawnParallel([type], [prompts[i]], {
sessionId: options.sessionId,
cwd: options.cwd,
mainContext: perTaskContext[i] ? { relevant_context: perTaskContext[i] } : undefined,
}).then((arr) => arr[0])
);
return Promise.all(calls);
}

module.exports = {
dispatchPlan,
topologicalLevels,
buildContextForSubtask,
compressResult,
MAX_CONTEXT_CHARS,
};
136 changes: 136 additions & 0 deletions src/agents/decomposition/gate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/**
* Decomposition gate (Phase 1).
*
* Decides whether breaking a task into isolated-context subtasks is actually
* worth it. This is the make-or-break of the feature: naive "decompose
* everything" loses money, because every subagent carries fixed overhead
* (planning + per-agent handoff/summarisation). Decomposition only pays off
* when the task is (a) genuinely complex, (b) large enough to amortise that
* overhead, and (c) divisible into reasonably independent units.
*
* Pure and synchronous so it can be unit-tested without a model. The caller
* supplies a pre-computed complexity `analysis` (from routing/complexity-analyzer)
* and the raw payload.
*/

const DEFAULTS = {
minComplexity: 60, // 0-100; only decompose genuinely complex work
minTokens: 3000, // estimated monolithic tokens; below this the overhead wins
minIndependentUnits: 2, // need at least 2 separable pieces to bother
maxSubtasks: 6,
};

const ENUMERATION_RE = /^\s*(?:[-*+]|\d+[.)]|step\s+\d+\b)/gim;
const CONJUNCTION_RE = /\b(?:and then|then|also|additionally|as well as|after that|finally|next,)\b/gi;
const IMPERATIVE_RE = /\b(?:add|create|build|implement|write|refactor|update|fix|remove|delete|migrate|test|document|configure|set up|wire|integrate|generate)\b/gi;
const FILE_PATH_RE = /\b[\w./-]+\.(?:js|ts|tsx|jsx|py|go|rs|java|rb|c|cpp|h|json|yaml|yml|md|sql|sh|css|html)\b/gi;

function _uniqueMatches(text, re) {
const set = new Set();
const matches = text.match(re) || [];
for (const m of matches) set.add(m.toLowerCase().trim());
return set;
}

/**
* Heuristically estimate how many independent units a task contains.
* Conservative: takes the strongest of several weak signals rather than summing
* them, so a single rambling sentence doesn't look like five subtasks.
* @param {string} text
* @returns {number}
*/
function estimateIndependentUnits(text) {
if (!text || typeof text !== "string") return 1;

const enumerated = (text.match(ENUMERATION_RE) || []).length;
const conjunctions = (text.match(CONJUNCTION_RE) || []).length;
const imperatives = _uniqueMatches(text, IMPERATIVE_RE).size;
const files = _uniqueMatches(text, FILE_PATH_RE).size;

// Each signal is an independent lower-bound estimate of separable units.
const signals = [
enumerated, // explicit list items
conjunctions + 1, // "do A and then B" → 2 units
imperatives, // distinct action verbs
files, // distinct files usually map to distinct work
];

const estimate = Math.max(...signals, 1);
return estimate;
}

/**
* Decide whether to decompose.
* @param {Object} analysis - result of analyzeComplexity(payload)
* @param {Object} payload - the request payload
* @param {Object} [options]
* @param {Object} [options.config] - threshold overrides (see DEFAULTS)
* @param {string} [options.riskLevel] - 'low'|'medium'|'high'; 'high' disables decomposition
* @param {string} [options.taskText] - explicit task text (else derived from analysis/payload)
* @returns {{ decompose: boolean, reason: string, signals: Object }}
*/
function shouldDecompose(analysis, payload = {}, options = {}) {
const cfg = { ...DEFAULTS, ...(options.config || {}) };

const score = Number(analysis?.score ?? 0);
const estimatedTokens = Number(
analysis?.breakdown?.tokens?.estimated ?? options.estimatedTokens ?? 0
);

const taskText =
options.taskText ||
analysis?.content ||
_firstUserText(payload) ||
"";

const independentUnits = estimateIndependentUnits(taskText);

const signals = {
score,
estimatedTokens,
independentUnits,
riskLevel: options.riskLevel || "low",
thresholds: cfg,
};

// Never decompose high-risk work — keep it in one capable context where the
// exempt-from-laziness concerns (validation/security) stay coherent.
if (options.riskLevel === "high") {
return { decompose: false, reason: "high_risk_skip", signals };
}

if (score < cfg.minComplexity) {
return { decompose: false, reason: "below_complexity_threshold", signals };
}

if (estimatedTokens < cfg.minTokens) {
return { decompose: false, reason: "too_small_to_amortise_overhead", signals };
}

if (independentUnits < cfg.minIndependentUnits) {
return { decompose: false, reason: "not_divisible", signals };
}

return { decompose: true, reason: "decompose_worthwhile", signals };
}

function _firstUserText(payload) {
const messages = payload?.messages;
if (!Array.isArray(messages)) return "";
const user = [...messages].reverse().find((m) => m.role === "user");
if (!user) return "";
if (typeof user.content === "string") return user.content;
if (Array.isArray(user.content)) {
return user.content
.filter((b) => b?.type === "text" || typeof b?.text === "string")
.map((b) => b.text || "")
.join("\n");
}
return "";
}

module.exports = {
shouldDecompose,
estimateIndependentUnits,
DEFAULTS,
};
Loading
Loading