Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions coverage.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@
ℹ urlFilter.js | 100.00 | 93.75 | 100.00 |
ℹ scheduler | | | |
ℹ cronInstaller.js | 57.60 | 29.17 | 85.71 | 22-24 54-58 77-122 139-141 143-145 156-158 160-162 182-184 186-188 190-207 211-212
ℹ index.js | 100.00 | 100.00 | 100.00 |
ℹ logger.js | 86.96 | 70.00 | 100.00 | 36-40 63-66
ℹ matcher.js | 100.00 | 100.00 | 100.00 |
ℹ parser.js | 100.00 | 94.12 | 100.00 |
ℹ queue.js | 100.00 | 100.00 | 100.00 |
ℹ runner.js | 100.00 | 100.00 | 100.00 |
ℹ scheduler.js | 100.00 | 93.55 | 91.67 |
ℹ scheduler.js | 98.86 | 91.18 | 91.67 | 170-171
ℹ session | | | |
ℹ checkpointer.js | 82.22 | 87.50 | 50.00 | 22 24 39-43 45
ℹ factory.js | 100.00 | 100.00 | 100.00 |
Expand All @@ -50,7 +49,7 @@
ℹ clarify.js | 100.00 | 94.44 | 80.00 |
ℹ code.js | 100.00 | 89.13 | 92.31 |
ℹ common.js | 100.00 | 93.33 | 83.33 |
ℹ cron.js | 100.00 | 97.30 | 90.00 |
ℹ cron.js | 98.69 | 93.41 | 75.00 | 70-71 83-84 205-206
ℹ date.js | 100.00 | 100.00 | 100.00 |
ℹ filesystem.js | 94.50 | 86.79 | 79.17 | 44-45 107-110 170-177 187-188 196-202 397-398 415-419 422-423
ℹ image.js | 97.90 | 95.83 | 50.00 | 92-94
Expand All @@ -73,6 +72,6 @@
ℹ messages.js | 100.00 | 94.44 | 100.00 |
ℹ panels.js | 100.00 | 100.00 | 100.00 |
ℹ ------------------------------------------------------------------------------------------------------------------------------------------
ℹ all files | 94.55 | 87.96 | 84.52 |
ℹ all files | 94.52 | 87.78 | 83.91 |
ℹ ------------------------------------------------------------------------------------------------------------------------------------------
ℹ end of coverage report
4 changes: 2 additions & 2 deletions docs/FLOWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,8 @@ scheduleManager.stop()
├── if !#running → return
├── for each entry in #scheduleEntry:
│ ├── if entry.paused → skip
│ └── if shouldRun(entry.cron, now):
│ │ shouldRun(cron, now):
│ └── if matchesCron(entry.cron, now):
│ │ matchesCron(cron, now):
│ │ └── minutes/hours from cron fields → matchesField(value, field):
│ │ ├── "*" → true
│ │ ├── */N → start + (value - start) % step === 0
Expand Down
48 changes: 26 additions & 22 deletions src/scheduler/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ export class ScheduleManager {
*/
start(scheduler, intervalMs = 60000) {
this.#running = true;
this.#tickId = setInterval(() => this.#clockTick(scheduler), intervalMs);
this.#clockTick(scheduler);
this.#tickId = setInterval(() => this.#clockTick(), intervalMs);
this.#clockTick();
}

/**
Expand All @@ -134,39 +134,43 @@ export class ScheduleManager {
this.#queue.clear();
}

/**
* Test helper: set a schedule entry directly on the internal map.
* @param {string} name
* @param {Object} entry
* @returns {void}
*/
_testSetEntry(name, entry) {
this.#scheduleEntry.set(name, entry);
}

/**
* Clock tick: check schedule expressions and enqueue triggered tasks.
* @param {Object} scheduler - The scheduler instance
*/
#clockTick(_scheduler) {
#clockTick() {
if (!this.#running) return;

const now = new Date();
for (const [, entry] of this.#scheduleEntry) {
if (entry.paused) continue;

if (matchesCron(entry.cron, now)) {
const dedup = {
entryName: entry.name,
...entry,
triggeredAt: now.toISOString(),
};
const { queued } = this.#queue.enqueue(dedup);
if (queued) {
entry.lastRun = now.toISOString();
try {
if (matchesCron(entry.cron, now)) {
const dedup = {
entryName: entry.name,
...entry,
triggeredAt: now.toISOString(),
};
const { queued } = this.#queue.enqueue(dedup);
if (queued) {
entry.lastRun = now.toISOString();
}
}
} catch {
// Single entry failure doesn't block remaining checks
}
}
}
}

/**
* Check if a cron expression matches a given date.
* @param {string} cron - Cron expression
* @param {Date} now - Current date/time
* @returns {boolean}
*/
export function shouldRun(cron, now) {
return matchesCron(cron, now);
}
export { matchesCron } from "./matcher.js";
110 changes: 107 additions & 3 deletions src/tools/cron.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,90 @@
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { mkdir, writeFile, readFile, readdir, unlink } from "node:fs/promises";
import { existsSync } from "node:fs";
import { join } from "node:path";
import { spawn } from "node:child_process";

/// -- Helper to find skill script --

/**
* Locate the main script file for a skill.
* @param {string} skillName - Skill name
* @param {string} [baseDir="skills"] - Base directory to search
* @returns {Promise<string|null>} Path to the main script, or null
*/
export async function findSkillScript(skillName, baseDir = "skills") {
const skillDir = join(baseDir, skillName);
const scriptCandidates = [
"scripts/run.sh",
"scripts/run.py",
"scripts/run.js",
"scripts/run.bash",
];

for (const candidate of scriptCandidates) {
const fullPath = join(skillDir, candidate);
if (existsSync(fullPath)) return fullPath;
}

const rootScripts = ["run.sh", "run.py", "run.js", "run.bash"];
for (const candidate of rootScripts) {
const fullPath = join(skillDir, candidate);
if (existsSync(fullPath)) return fullPath;
}

return null;
}

/**
* Run a script via spawn with stdout/stderr collection and timeout.
* @param {string} scriptPath - Path to the script
* @param {string[]} [args=[]] - Command-line arguments
* @param {object} [options] - Execution options
* @param {number} [options.timeout=30000] - Timeout in milliseconds
* @param {string} [options.cwd] - Working directory
* @returns {Promise<{ stdout: string, stderr: string, exitCode: number }>}
*/
export async function runScript(scriptPath, args = [], options = {}) {
const { timeout = 30000, cwd = process.cwd() } = options;
return new Promise((resolve) => {
const child = spawn(scriptPath, args, {
cwd,
stdio: ["pipe", "pipe", "pipe"],
});

const chunks = { stdout: [], stderr: [] };
let settled = false;

const settle = (exitCode) => {
if (settled) return;
settled = true;
resolve({
stdout: Buffer.concat(chunks.stdout).toString(),
stderr: Buffer.concat(chunks.stderr).toString(),
exitCode,
});
};

const timer = setTimeout(() => {
child.kill("SIGTERM");
setTimeout(() => settle(-1), 2000);
}, timeout);

child.stdout.on("data", (chunk) => chunks.stdout.push(chunk));
child.stderr.on("data", (chunk) => chunks.stderr.push(chunk));

child.on("exit", (code) => {
clearTimeout(timer);
settle(code ?? 0);
});

child.on("error", () => {
clearTimeout(timer);
if (!settled) settle(-1);
});
});
}

/// -- Cron validation --

Expand Down Expand Up @@ -92,7 +175,7 @@ async function getScheduleFiles(schedulesDir) {
}

/**
* Trigger a job immediately via the scheduler.
* Trigger a job immediately via the sandbox.
* @param {object} job - Job to run
* @param {object} [schedulerModule] - Scheduler module for testing
* @param {string} [schedulesDir] - Directory to write output to
Expand All @@ -104,8 +187,25 @@ async function runJob(job, schedulerModule, schedulesDir) {
}

if (!schedulerModule) {
schedulerModule = await import("../scheduler/index.js");
const scriptPath = await findSkillScript(job.skill);
if (!scriptPath) {
return {
ok: false,
error: `Skill "${job.skill}" has no discoverable script. Job "${job.name}" was not executed.`,
};
}

try {
const result = await runScript(scriptPath, [], { timeout: 30000 });
job.lastRun = new Date().toISOString();
job.updatedAt = new Date().toISOString();
await saveJob(job, schedulesDir);
return { ok: result.exitCode === 0, result };
} catch (err) {
return { ok: false, error: `Execution failed: ${err.message}` };
}
}

const scheduleEntry = {
name: job.name,
cron: job.cron,
Expand All @@ -115,7 +215,11 @@ async function runJob(job, schedulerModule, schedulesDir) {
};

try {
await schedulerModule.runScheduledSkill(scheduleEntry, {}, {});
await schedulerModule.runScheduledSkill(
scheduleEntry,
schedulerModule.sandbox || (() => ({})),
{},
);
job.lastRun = new Date().toISOString();
job.updatedAt = new Date().toISOString();
await saveJob(job, schedulesDir);
Expand Down
65 changes: 59 additions & 6 deletions tests/unit/scheduler.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { join } from "node:path";
// --- Imports ---
import { validateCron, parseScheduleEntry } from "../../src/scheduler/parser.js";
import { ScheduleQueue } from "../../src/scheduler/queue.js";
import { ScheduleManager, shouldRun, matchesCron } from "../../src/scheduler/scheduler.js";
import { ScheduleManager, matchesCron } from "../../src/scheduler/scheduler.js";
import { logScheduleResult } from "../../src/scheduler/logger.js";
import { CronInstaller } from "../../src/scheduler/cronInstaller.js";

Expand Down Expand Up @@ -184,11 +184,6 @@ describe("scheduler - cron matching", () => {
it("rejects completely invalid cron", () => {
assert.strictEqual(matchesCron("invalid cron", new Date()), false);
});

it("shouldRun is an alias for matchesCron", () => {
assert.strictEqual(shouldRun("* * * * *", new Date(Date.UTC(2024, 0, 1, 9, 30))), true);
assert.strictEqual(shouldRun("0 9 * * *", new Date(Date.UTC(2024, 0, 1, 10, 30))), false);
});
});

// --- Queue ---
Expand Down Expand Up @@ -525,6 +520,64 @@ describe("scheduler - ScheduleManager", () => {
const entries = mgr.list();
assert.strictEqual(entries[0].paused, true);
});

it("#clockTick continues checking other entries if one throws", () => {
const mgr = new ScheduleManager(10);
mgr.register([{ name: "good", cron: "* * * * *" }]);
// Stash bad cron directly into internal map to trigger try catch
const badEntry = { name: "bad", cron: "invalid", paused: false, lastRun: null };
mgr._testSetEntry("bad", badEntry);
mgr.start({ sandbox: async () => ({}), state: {} }, 60000);
mgr.stop();
const entries = mgr.list();
const good = entries.find((e) => e.name === "good");
assert.ok(good.lastRun, "good entry should have lastRun set despite bad entry throwing");
});

it("#clockTick does not throw even with corrupted entries", () => {
const mgr = new ScheduleManager(10);
mgr.register([{ name: "good", cron: "* * * * *" }]);
const badEntry = { name: "bad", cron: null, paused: false, lastRun: null };
mgr._testSetEntry("bad", badEntry);
assert.doesNotThrow(() => {
mgr.start({ sandbox: async () => ({}), state: {} }, 60000);
mgr.stop();
});
});

it("#clockTick with 2 valid + 1 paused — all skip correctly", () => {
const mgr = new ScheduleManager(10);
const now = new Date();
const minuteField = String(now.getUTCMinutes());
mgr.register([
{ name: "a", cron: `${minuteField} * * * *` },
{ name: "b", cron: `${minuteField} * * * *` },
]);
mgr.pause("a");
mgr.start({ sandbox: async () => ({}), state: {} }, 60000);
mgr.stop();
const entries = mgr.list();
assert.strictEqual(entries.filter((e) => e.paused).length, 1);
const running = entries.filter((e) => !e.paused && e.lastRun);
assert.strictEqual(running.length, 1, "only 'b' should have lastRun set");
});

it("#clockTick with 3 matching entries — only 1 deduped", () => {
const mgr = new ScheduleManager(10);
const now = new Date();
const minuteField = String(now.getUTCMinutes());
mgr.register([
{ name: "a", cron: `${minuteField} * * * *` },
{ name: "b", cron: `${minuteField} * * * *` },
{ name: "c", cron: `${minuteField} * * * *` },
]);
mgr.start({ sandbox: async () => ({}), state: {} }, 60000);
mgr.stop();
const entries = mgr.list();
assert.strictEqual(entries.length, 3);
assert.strictEqual(entries.filter((e) => e.lastRun).length, 3);
assert.strictEqual(entries[0].queued, 0, "first tick should not leave tasks queued");
});
});

// --- CronInstaller ---
Expand Down
Loading