diff --git a/AGENTS.md b/AGENTS.md index 28c18fb6..a7b6ef1e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -112,6 +112,7 @@ Co-Authored-By: (agent model name) - `specs/oauth-flows-spec.md` (OAuth authorization code flow + Slack UX contract) - `specs/agent-prompt-spec.md` (core prompt ownership, execution-bias, and bloat-control contract) - `specs/advisor-tool-spec.md` (draft provider-agnostic advisor tool contract) +- `specs/scheduler-spec.md` (draft scheduled Junior task contract) - `specs/harness-agent-spec.md` (agent loop and output contract) - `specs/agent-session-resumability-spec.md` (multi-slice turn resumability and timeout recovery contract) - `specs/agent-execution-spec.md` (agent execution rubric and completion gates) diff --git a/packages/docs/src/content/docs/reference/config-and-env.md b/packages/docs/src/content/docs/reference/config-and-env.md index 94d8e1df..88494787 100644 --- a/packages/docs/src/content/docs/reference/config-and-env.md +++ b/packages/docs/src/content/docs/reference/config-and-env.md @@ -12,18 +12,19 @@ related: ## Core runtime -| Variable | Required | Purpose | -| ------------------------------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------------- | -| `SLACK_SIGNING_SECRET` | Yes | Verifies Slack request signatures. | -| `SLACK_BOT_TOKEN` or `SLACK_BOT_USER_TOKEN` | Yes | Posts thread replies and calls Slack APIs. | -| `REDIS_URL` | Yes | Queue and runtime state storage. | -| `JUNIOR_BOT_NAME` | No | Bot display/config naming. | -| `AI_MODEL` | No | Primary model selection override for main assistant turns. Defaults to `openai/gpt-5.4`; Junior chooses the reasoning effort per turn automatically. | -| `AI_FAST_MODEL` | No | Faster model for lightweight tasks and routing/classification passes before the main turn begins. Defaults to `openai/gpt-5.4-mini`. | -| `AI_VISION_MODEL` | No | Dedicated image-understanding model; unset disables vision features. | -| `AI_WEB_SEARCH_MODEL` | No | Override for the `webSearch` tool model. Defaults to a search-tuned model; does not fall through to `AI_MODEL`. | -| `JUNIOR_BASE_URL` | No | Canonical base URL for callback/auth URL generation. | -| `AI_GATEWAY_API_KEY` | No | AI gateway auth if used in your setup. | +| Variable | Required | Purpose | +| ------------------------------------------- | ----------- | ---------------------------------------------------------------------------------------------------------------------------------------------------- | +| `SLACK_SIGNING_SECRET` | Yes | Verifies Slack request signatures. | +| `SLACK_BOT_TOKEN` or `SLACK_BOT_USER_TOKEN` | Yes | Posts thread replies and calls Slack APIs. | +| `REDIS_URL` | Yes | Queue and runtime state storage. | +| `JUNIOR_BOT_NAME` | No | Bot display/config naming. | +| `AI_MODEL` | No | Primary model selection override for main assistant turns. Defaults to `openai/gpt-5.4`; Junior chooses the reasoning effort per turn automatically. | +| `AI_FAST_MODEL` | No | Faster model for lightweight tasks and routing/classification passes before the main turn begins. Defaults to `openai/gpt-5.4-mini`. | +| `AI_VISION_MODEL` | No | Dedicated image-understanding model; unset disables vision features. | +| `AI_WEB_SEARCH_MODEL` | No | Override for the `webSearch` tool model. Defaults to a search-tuned model; does not fall through to `AI_MODEL`. | +| `JUNIOR_BASE_URL` | No | Canonical base URL for callback/auth URL generation. | +| `CRON_SECRET` or `JUNIOR_SCHEDULER_SECRET` | Conditional | Bearer token for `/api/internal/scheduler/tick`; use `CRON_SECRET` with Vercel Cron, or `JUNIOR_SCHEDULER_SECRET` for an external scheduler. | +| `AI_GATEWAY_API_KEY` | No | AI gateway auth if used in your setup. | ## Build-time snapshot warmup diff --git a/packages/junior-evals/evals/behavior-harness.ts b/packages/junior-evals/evals/behavior-harness.ts index c90f2bb2..6e96027c 100644 --- a/packages/junior-evals/evals/behavior-harness.ts +++ b/packages/junior-evals/evals/behavior-harness.ts @@ -180,6 +180,7 @@ export interface EvalCanvasArtifact { } export interface EvalToolInvocation { + arguments?: Record; tool: string; bash_command?: string; mcp_arguments?: Record; @@ -216,6 +217,24 @@ function toEvalToolInvocation(input: { }): EvalToolInvocation { const invocation: EvalToolInvocation = { tool: input.toolName }; + if (input.toolName.startsWith("slackSchedule")) { + invocation.arguments = Object.fromEntries( + [ + "title", + "objective", + "schedule_description", + "timezone", + "next_run_at_iso", + "recurrence_frequency", + "recurrence_interval", + "recurrence_weekdays", + "status", + ] + .filter((key) => key in input.params) + .map((key) => [key, input.params[key]]), + ); + } + if (input.toolName === "bash" && typeof input.params.command === "string") { invocation.bash_command = input.params.command.trim(); } @@ -598,6 +617,7 @@ function toIncomingMessage(event: MentionEvent | SubscribedMessageEvent) { runId: event.thread.run_id, raw: { channel: event.thread.channel_id, + team_id: "T_EVAL", ts: messageTs, thread_ts: event.thread.thread_ts, }, diff --git a/packages/junior-evals/evals/core/scheduler.eval.ts b/packages/junior-evals/evals/core/scheduler.eval.ts new file mode 100644 index 00000000..64dfccea --- /dev/null +++ b/packages/junior-evals/evals/core/scheduler.eval.ts @@ -0,0 +1,73 @@ +import { describeEval } from "vitest-evals"; +import { mention, rubric, slackEvals } from "../helpers"; + +describeEval("Scheduler", slackEvals, (it) => { + it("when asked to schedule recurring work, draft the task for confirmation before creating it", async ({ + run, + }) => { + await run({ + events: [ + mention( + "@bot schedule this every Monday at 9am Pacific: check open GitHub issues about the scheduler and post a short digest here.", + ), + ], + criteria: rubric({ + contract: + "A future or recurring task request is normalized into a scheduled task draft for the active Slack context before it is persisted.", + pass: [ + "observed_tool_invocations does not contain slackScheduleCreateTask unless the tool input includes confirmed_by_user true.", + "The draft task title/objective/instructions describe checking scheduler-related GitHub issues, not creating a schedule.", + "The reply asks the user to confirm the normalized cadence or next run before creating the schedule.", + ], + fail: [ + "Do not persist a scheduled task before user confirmation.", + "Do not ask the user to provide a channel ID.", + "Do not use Slack chat.scheduleMessage.", + "Do not only give instructions for how the user can set up an external cron.", + ], + }), + }); + }); + + it("when executing a scheduled-task prompt, perform the task instead of creating another schedule", async ({ + run, + }) => { + await run({ + events: [ + mention(`@bot +This is an autonomous scheduled run. Treat the stored task contract as the user request for this turn. + + +- id: sched_eval +- title: Weekly scheduler digest +- objective: Summarize open scheduler issues. + +- Check for open scheduler issues. +- Post a concise digest. + + + + +- Execute the scheduled task described in ; do not create, update, pause, delete, or list schedules. + + + +Execute the scheduled task now and provide the final result for the configured destination. + +`), + ], + criteria: rubric({ + contract: + "A scheduled-task execution prompt is treated as the task to run, not as a request to schedule something.", + pass: [ + "observed_tool_invocations does not contain slackScheduleCreateTask.", + "The assistant attempts to produce or explain a scheduler issue digest.", + ], + fail: [ + "Do not create, edit, delete, or list scheduled tasks.", + "Do not say the task has been scheduled.", + ], + }), + }); + }); +}); diff --git a/packages/junior-evals/evals/helpers.ts b/packages/junior-evals/evals/helpers.ts index dbbc9d7b..892db594 100644 --- a/packages/junior-evals/evals/helpers.ts +++ b/packages/junior-evals/evals/helpers.ts @@ -68,6 +68,9 @@ function toToolCallRecord( invocation: EvalResult["toolInvocations"][number], ): ToolCallRecord { const args: Record = {}; + if (invocation.arguments) { + args.arguments = toJson(invocation.arguments); + } if (invocation.bash_command) { args.command = invocation.bash_command; } diff --git a/packages/junior/src/app.ts b/packages/junior/src/app.ts index cac0c149..1038663c 100644 --- a/packages/junior/src/app.ts +++ b/packages/junior/src/app.ts @@ -7,6 +7,7 @@ import { GET as dashboardGET } from "@/handlers/diagnostics-dashboard"; import { GET as healthGET } from "@/handlers/health"; import { GET as mcpOauthCallbackGET } from "@/handlers/mcp-oauth-callback"; import { GET as oauthCallbackGET } from "@/handlers/oauth-callback"; +import { ALL as schedulerTickALL } from "@/handlers/scheduler-tick"; import { ALL as sandboxEgressProxyALL, isSandboxEgressRequest, @@ -103,6 +104,10 @@ export async function createApp(options?: JuniorAppOptions): Promise { return turnResumePOST(c.req.raw, waitUntil); }); + app.all("/api/internal/scheduler/tick", (c) => { + return schedulerTickALL(c.req.raw, waitUntil); + }); + app.post("/api/webhooks/:platform", (c) => { return webhooksPOST(c.req.raw, c.req.param("platform"), waitUntil); }); diff --git a/packages/junior/src/chat/ingress/workspace-membership.ts b/packages/junior/src/chat/ingress/workspace-membership.ts index 3d507afa..9d59f9e2 100644 --- a/packages/junior/src/chat/ingress/workspace-membership.ts +++ b/packages/junior/src/chat/ingress/workspace-membership.ts @@ -1,15 +1,5 @@ -import { AsyncLocalStorage } from "node:async_hooks"; - -const workspaceTeamIdStorage = new AsyncLocalStorage(); - -/** Run a callback with the workspace team ID available for membership checks. */ -export function runWithWorkspaceTeamId( - teamId: string | undefined, - fn: () => T, -): T { - if (!teamId) return fn(); - return workspaceTeamIdStorage.run(teamId, fn); -} +import { getWorkspaceTeamId } from "@/chat/slack/workspace-context"; +export { runWithWorkspaceTeamId } from "@/chat/slack/workspace-context"; /** * Return true when a Slack event's author is from an external workspace. @@ -23,7 +13,7 @@ export function isExternalSlackUser( ): boolean { if (!raw) return false; - const workspaceTeamId = workspaceTeamIdStorage.getStore(); + const workspaceTeamId = getWorkspaceTeamId(); if (!workspaceTeamId) return false; const userTeam = diff --git a/packages/junior/src/chat/prompt.ts b/packages/junior/src/chat/prompt.ts index 0b832f02..6c29e7fc 100644 --- a/packages/junior/src/chat/prompt.ts +++ b/packages/junior/src/chat/prompt.ts @@ -411,6 +411,7 @@ const SLACK_ACTION_RULES = [ "- Context-bound Slack tools use runtime-owned targets; do not invent channel, canvas, list, or message IDs.", "- Use first-class Slack tools for Slack side effects; do not use bash, curl, or provider APIs to bypass Slack tool targeting.", "- Use channel-post and emoji-reaction tools only when the user explicitly asks for that Slack side effect.", + "- Use Slack schedule tools only when the user explicitly asks to create, list, edit, pause, resume, remove, or run future/recurring Junior work; scheduled task destinations are always the active Slack context, and task creation needs an exact next-run ISO timestamp.", "- For explicit channel-post or emoji-reaction requests, skip a duplicate thread text reply when the tool result already satisfies the request.", "- Do not claim an attachment, canvas, channel post, list update, or reaction succeeded unless the tool returned success this turn; when it did, include any link the tool returned.", "- Do not use reactions as progress indicators.", diff --git a/packages/junior/src/chat/respond.ts b/packages/junior/src/chat/respond.ts index 10ce5031..254d73cd 100644 --- a/packages/junior/src/chat/respond.ts +++ b/packages/junior/src/chat/respond.ts @@ -86,7 +86,11 @@ import { } from "@/chat/services/turn-checkpoint"; import { createMcpAuthOrchestration } from "@/chat/services/mcp-auth-orchestration"; import { createPluginAuthOrchestration } from "@/chat/services/plugin-auth-orchestration"; -import { AuthorizationPauseError } from "@/chat/services/auth-pause"; +import { + AuthorizationFlowDisabledError, + AuthorizationPauseError, + type AuthorizationFlowMode, +} from "@/chat/services/auth-pause"; // Re-export types for backward compatibility with existing consumers. export type { AssistantReply, AgentTurnDiagnostics }; @@ -104,6 +108,7 @@ export interface ReplyRequestContext { turnId?: string; runId?: string; channelId?: string; + teamId?: string; messageTs?: string; threadTs?: string; requesterId?: string; @@ -112,6 +117,7 @@ export interface ReplyRequestContext { conversationContext?: string; artifactState?: ThreadArtifactsState; pendingAuth?: ConversationPendingAuthState; + authorizationFlowMode?: AuthorizationFlowMode; configuration?: Record; /** Durable Pi transcript for this conversation, excluding ephemeral turn context. */ piMessages?: PiMessage[]; @@ -676,6 +682,7 @@ export async function generateAssistantReply( getMergedArtifactState: () => mergeArtifactsState(context.artifactState ?? {}, artifactStatePatch), onPendingAuth: context.onAuthPending, + authorizationFlowMode: context.authorizationFlowMode, }, () => agent?.abort(), ); @@ -690,6 +697,7 @@ export async function generateAssistantReply( channelConfiguration: context.channelConfiguration, currentPendingAuth: context.pendingAuth, onPendingAuth: context.onAuthPending, + authorizationFlowMode: context.authorizationFlowMode, userTokenStore, }, () => agent?.abort(), @@ -777,6 +785,8 @@ export async function generateAssistantReply( { channelId: toolChannelId, channelCapabilities, + requester: context.requester, + teamId: context.correlation?.teamId, messageTs: context.correlation?.messageTs, threadTs: context.correlation?.threadTs, userText: userInput, @@ -1192,6 +1202,9 @@ export async function generateAssistantReply( if (isRetryableTurnError(error)) { throw error; } + if (error instanceof AuthorizationFlowDisabledError) { + throw error; + } logException( error, diff --git a/packages/junior/src/chat/runtime/reply-executor.ts b/packages/junior/src/chat/runtime/reply-executor.ts index d60275d7..b0d4a09f 100644 --- a/packages/junior/src/chat/runtime/reply-executor.ts +++ b/packages/junior/src/chat/runtime/reply-executor.ts @@ -23,6 +23,7 @@ import { getAssistantThreadContext, getChannelId, getMessageTs, + getTeamId, getThreadId, getThreadTs, getRunId, @@ -140,6 +141,7 @@ export function createReplyToThread(deps: ReplyExecutorDeps) { const threadTs = getThreadTs(threadId); const assistantThreadContext = getAssistantThreadContext(message); const messageTs = getMessageTs(message); + const teamId = getTeamId(message); const runId = getRunId(thread, message); const conversationId = threadId ?? runId; @@ -400,6 +402,7 @@ export function createReplyToThread(deps: ReplyExecutorDeps) { turnId, threadTs, messageTs, + teamId, runId, channelId, requesterId: message.author.userId, diff --git a/packages/junior/src/chat/runtime/thread-context.ts b/packages/junior/src/chat/runtime/thread-context.ts index e6cc25ac..98b3b9fb 100644 --- a/packages/junior/src/chat/runtime/thread-context.ts +++ b/packages/junior/src/chat/runtime/thread-context.ts @@ -2,6 +2,7 @@ import type { Message, Thread } from "chat"; import { botConfig } from "@/chat/config"; import { toOptionalString } from "@/chat/coerce"; import { isDmChannel, normalizeSlackConversationId } from "@/chat/slack/client"; +import { getWorkspaceTeamId } from "@/chat/slack/workspace-context"; import { parseSlackThreadId, resolveSlackChannelIdFromThreadId, @@ -127,3 +128,19 @@ export function getMessageTs(message: Message): string | undefined { toOptionalString((rawRecord.message as { ts?: unknown } | undefined)?.ts) ); } + +/** Resolve the Slack workspace/team id from the raw inbound message payload. */ +export function getTeamId(message: Message): string | undefined { + const raw = (message as unknown as { raw?: unknown }).raw; + if (!raw || typeof raw !== "object") { + return undefined; + } + + const rawRecord = raw as Record; + return ( + toOptionalString(rawRecord.team_id) ?? + toOptionalString(rawRecord.team) ?? + getWorkspaceTeamId() ?? + toOptionalString(rawRecord.user_team) + ); +} diff --git a/packages/junior/src/chat/scheduler/cadence.ts b/packages/junior/src/chat/scheduler/cadence.ts new file mode 100644 index 00000000..ff4ac345 --- /dev/null +++ b/packages/junior/src/chat/scheduler/cadence.ts @@ -0,0 +1,465 @@ +import type { + ScheduledCalendarFrequency, + ScheduledLocalTime, + ScheduledTask, + ScheduledTaskRecurrence, +} from "@/chat/scheduler/types"; + +/** Parse an ISO timestamp into a finite Unix timestamp in milliseconds. */ +export function parseScheduleTimestamp(value: string): number | undefined { + const parsed = Date.parse(value); + return Number.isFinite(parsed) ? parsed : undefined; +} + +export interface ZonedDateTimeParts { + day: number; + hour: number; + minute: number; + month: number; + second: number; + weekday: number; + year: number; +} + +interface LocalDate { + day: number; + month: number; + year: number; +} + +const FORMATTERS = new Map(); + +function getFormatter(timezone: string): Intl.DateTimeFormat { + const existing = FORMATTERS.get(timezone); + if (existing) { + return existing; + } + + const formatter = new Intl.DateTimeFormat("en-US", { + timeZone: timezone, + hour12: false, + year: "numeric", + month: "2-digit", + day: "2-digit", + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + }); + FORMATTERS.set(timezone, formatter); + return formatter; +} + +function normalizeHour(hour: number): number { + return hour === 24 ? 0 : hour; +} + +function getLocalDateWeekday(date: LocalDate): number { + return new Date(Date.UTC(date.year, date.month - 1, date.day)).getUTCDay(); +} + +/** Resolve a UTC timestamp into calendar parts for a named time zone. */ +export function getZonedDateTimeParts( + timestampMs: number, + timezone: string, +): ZonedDateTimeParts { + const parts = getFormatter(timezone).formatToParts(new Date(timestampMs)); + const values = new Map(parts.map((part) => [part.type, part.value])); + const year = Number(values.get("year")); + const month = Number(values.get("month")); + const day = Number(values.get("day")); + const hour = normalizeHour(Number(values.get("hour"))); + const minute = Number(values.get("minute")); + const second = Number(values.get("second")); + + return { + year, + month, + day, + hour, + minute, + second, + weekday: getLocalDateWeekday({ year, month, day }), + }; +} + +function getTimeZoneOffsetMs(timestampMs: number, timezone: string): number { + const parts = getZonedDateTimeParts(timestampMs, timezone); + return ( + Date.UTC( + parts.year, + parts.month - 1, + parts.day, + parts.hour, + parts.minute, + parts.second, + ) - timestampMs + ); +} + +function localDateTimeToTimestampMs(args: { + date: LocalDate; + time: ScheduledLocalTime; + timezone: string; +}): number { + const localAsUtcMs = Date.UTC( + args.date.year, + args.date.month - 1, + args.date.day, + args.time.hour, + args.time.minute, + 0, + ); + let timestampMs = + localAsUtcMs - getTimeZoneOffsetMs(localAsUtcMs, args.timezone); + + for (let index = 0; index < 3; index += 1) { + const next = localAsUtcMs - getTimeZoneOffsetMs(timestampMs, args.timezone); + if (next === timestampMs) { + break; + } + timestampMs = next; + } + + return timestampMs; +} + +function compareDate(left: LocalDate, right: LocalDate): number { + return ( + Date.UTC(left.year, left.month - 1, left.day) - + Date.UTC(right.year, right.month - 1, right.day) + ); +} + +function addDays(date: LocalDate, days: number): LocalDate { + const next = new Date(Date.UTC(date.year, date.month - 1, date.day + days)); + return { + year: next.getUTCFullYear(), + month: next.getUTCMonth() + 1, + day: next.getUTCDate(), + }; +} + +function daysInMonth(year: number, month: number): number { + return new Date(Date.UTC(year, month, 0)).getUTCDate(); +} + +function parseLocalDate(value: string): LocalDate | undefined { + const match = /^(\d{4})-(\d{2})-(\d{2})$/.exec(value); + if (!match) { + return undefined; + } + + const year = Number(match[1]); + const month = Number(match[2]); + const day = Number(match[3]); + if ( + !Number.isInteger(year) || + !Number.isInteger(month) || + !Number.isInteger(day) || + month < 1 || + month > 12 || + day < 1 || + day > daysInMonth(year, month) + ) { + return undefined; + } + + return { year, month, day }; +} + +function formatLocalDate(date: LocalDate): string { + return [ + String(date.year).padStart(4, "0"), + String(date.month).padStart(2, "0"), + String(date.day).padStart(2, "0"), + ].join("-"); +} + +function getLocalDate(timestampMs: number, timezone: string): LocalDate { + const parts = getZonedDateTimeParts(timestampMs, timezone); + return { year: parts.year, month: parts.month, day: parts.day }; +} + +function normalizeWeekdays(values: number[] | undefined): number[] { + return [ + ...new Set((values ?? []).filter((value) => value >= 0 && value <= 6)), + ].sort((a, b) => a - b); +} + +function buildCandidate(args: { + date: LocalDate; + recurrence: ScheduledTaskRecurrence; + timezone: string; +}): number { + return localDateTimeToTimestampMs({ + date: args.date, + time: args.recurrence.time, + timezone: args.timezone, + }); +} + +function getDailyNextRunAtMs(args: { + afterMs: number; + recurrence: ScheduledTaskRecurrence; + scheduledForMs: number; + timezone: string; +}): number | undefined { + const start = parseLocalDate(args.recurrence.startDate); + if (!start) { + return undefined; + } + + let candidateDate = addDays( + getLocalDate(args.scheduledForMs, args.timezone), + args.recurrence.interval, + ); + if (compareDate(candidateDate, start) < 0) { + candidateDate = start; + } + + let candidate = buildCandidate({ + date: candidateDate, + recurrence: args.recurrence, + timezone: args.timezone, + }); + while (candidate <= args.afterMs) { + candidateDate = addDays(candidateDate, args.recurrence.interval); + candidate = buildCandidate({ + date: candidateDate, + recurrence: args.recurrence, + timezone: args.timezone, + }); + } + return candidate; +} + +function getWeeklyNextRunAtMs(args: { + afterMs: number; + recurrence: ScheduledTaskRecurrence; + scheduledForMs: number; + timezone: string; +}): number | undefined { + const start = parseLocalDate(args.recurrence.startDate); + if (!start) { + return undefined; + } + + const weekdays = normalizeWeekdays(args.recurrence.weekdays); + if (weekdays.length === 0) { + return undefined; + } + + let candidateDate = addDays( + getLocalDate(args.scheduledForMs, args.timezone), + 1, + ); + for (let attempts = 0; attempts < 3660; attempts += 1) { + const weeksSinceStart = Math.floor( + (Date.UTC( + candidateDate.year, + candidateDate.month - 1, + candidateDate.day, + ) - + Date.UTC(start.year, start.month - 1, start.day)) / + (7 * 24 * 60 * 60 * 1000), + ); + const isInCycle = + weeksSinceStart >= 0 && weeksSinceStart % args.recurrence.interval === 0; + if (isInCycle && weekdays.includes(getLocalDateWeekday(candidateDate))) { + const candidate = buildCandidate({ + date: candidateDate, + recurrence: args.recurrence, + timezone: args.timezone, + }); + if (candidate > args.afterMs) { + return candidate; + } + } + candidateDate = addDays(candidateDate, 1); + } + + return undefined; +} + +function getMonthlyNextRunAtMs(args: { + afterMs: number; + recurrence: ScheduledTaskRecurrence; + scheduledForMs: number; + timezone: string; +}): number | undefined { + const start = parseLocalDate(args.recurrence.startDate); + const dayOfMonth = args.recurrence.dayOfMonth; + if (!start || !dayOfMonth) { + return undefined; + } + + const scheduledDate = getLocalDate(args.scheduledForMs, args.timezone); + let monthIndex = scheduledDate.year * 12 + scheduledDate.month - 1; + const startMonthIndex = start.year * 12 + start.month - 1; + + for (let attempts = 0; attempts < 1200; attempts += 1) { + monthIndex += args.recurrence.interval; + if (monthIndex < startMonthIndex) { + monthIndex = startMonthIndex; + } + const year = Math.floor(monthIndex / 12); + const month = (monthIndex % 12) + 1; + if (dayOfMonth > daysInMonth(year, month)) { + continue; + } + const candidate = buildCandidate({ + date: { year, month, day: dayOfMonth }, + recurrence: args.recurrence, + timezone: args.timezone, + }); + if (candidate > args.afterMs) { + return candidate; + } + } + + return undefined; +} + +function getYearlyNextRunAtMs(args: { + afterMs: number; + recurrence: ScheduledTaskRecurrence; + scheduledForMs: number; + timezone: string; +}): number | undefined { + const start = parseLocalDate(args.recurrence.startDate); + const month = args.recurrence.month; + const dayOfMonth = args.recurrence.dayOfMonth; + if (!start || !month || !dayOfMonth) { + return undefined; + } + + const scheduledDate = getLocalDate(args.scheduledForMs, args.timezone); + let year = scheduledDate.year; + + for (let attempts = 0; attempts < 100; attempts += 1) { + year += args.recurrence.interval; + if (year < start.year) { + year = start.year; + } + if (dayOfMonth > daysInMonth(year, month)) { + continue; + } + const candidate = buildCandidate({ + date: { year, month, day: dayOfMonth }, + recurrence: args.recurrence, + timezone: args.timezone, + }); + if (candidate > args.afterMs) { + return candidate; + } + } + + return undefined; +} + +/** Build a calendar recurrence anchored to an exact first run timestamp. */ +export function buildCalendarRecurrence(args: { + frequency: ScheduledCalendarFrequency; + interval?: number; + nextRunAtMs: number; + timezone: string; + weekdays?: number[]; +}): ScheduledTaskRecurrence { + const interval = args.interval && args.interval > 0 ? args.interval : 1; + const parts = getZonedDateTimeParts(args.nextRunAtMs, args.timezone); + const time = { hour: parts.hour, minute: parts.minute }; + const startDate = formatLocalDate(parts); + + if (args.frequency === "weekly") { + const weekdays = normalizeWeekdays(args.weekdays); + return { + frequency: args.frequency, + interval, + startDate, + time, + weekdays: weekdays.length > 0 ? weekdays : [parts.weekday], + }; + } + + if (args.frequency === "monthly") { + return { + dayOfMonth: parts.day, + frequency: args.frequency, + interval, + startDate, + time, + }; + } + + if (args.frequency === "yearly") { + return { + dayOfMonth: parts.day, + frequency: args.frequency, + interval, + month: parts.month, + startDate, + time, + }; + } + + return { + frequency: args.frequency, + interval, + startDate, + time, + }; +} + +/** Return the next fire time after a completed run, when the task recurs. */ +export function getNextRunAtMs( + task: ScheduledTask, + scheduledForMs: number, + afterMs: number = scheduledForMs, +): number | undefined { + if (task.schedule.kind !== "recurring") { + return undefined; + } + + const recurrence = task.schedule.recurrence; + if ( + !recurrence || + !Number.isFinite(recurrence.interval) || + recurrence.interval <= 0 + ) { + return undefined; + } + + if (recurrence.frequency === "daily") { + return getDailyNextRunAtMs({ + recurrence, + timezone: task.schedule.timezone, + scheduledForMs, + afterMs, + }); + } + + if (recurrence.frequency === "weekly") { + return getWeeklyNextRunAtMs({ + recurrence, + timezone: task.schedule.timezone, + scheduledForMs, + afterMs, + }); + } + + if (recurrence.frequency === "monthly") { + return getMonthlyNextRunAtMs({ + recurrence, + timezone: task.schedule.timezone, + scheduledForMs, + afterMs, + }); + } + + return getYearlyNextRunAtMs({ + recurrence, + timezone: task.schedule.timezone, + scheduledForMs, + afterMs, + }); +} diff --git a/packages/junior/src/chat/scheduler/executor.ts b/packages/junior/src/chat/scheduler/executor.ts new file mode 100644 index 00000000..39619fe7 --- /dev/null +++ b/packages/junior/src/chat/scheduler/executor.ts @@ -0,0 +1,163 @@ +import { buildScheduledTaskRunPrompt } from "@/chat/scheduler/prompt"; +import type { SchedulerStore } from "@/chat/scheduler/store"; +import type { ScheduledRun, ScheduledTask } from "@/chat/scheduler/types"; + +export type ScheduledTaskRunResult = + | { + status: "completed"; + resultMessageTs?: string; + } + | { + status: "blocked" | "failed"; + errorMessage: string; + }; + +export interface ScheduledTaskRunner { + run(args: { + nowMs: number; + prompt: string; + run: ScheduledRun; + task: ScheduledTask; + }): Promise; +} + +/** Execute one claimed scheduled run through the compiled task prompt. */ +export async function executeScheduledRun(args: { + nowMs: number; + run: ScheduledRun; + runner: ScheduledTaskRunner; + store: SchedulerStore; +}): Promise { + const task = await args.store.getTask(args.run.taskId); + if (!task) { + return await args.store.markRunFailed({ + runId: args.run.id, + completedAtMs: args.nowMs, + errorMessage: `Scheduled task ${args.run.taskId} was not found`, + }); + } + + const startedRun = await args.store.markRunStarted({ + runId: args.run.id, + claimedAtMs: args.run.claimedAtMs, + nowMs: args.nowMs, + }); + if (!startedRun) { + return undefined; + } + + const prompt = buildScheduledTaskRunPrompt({ + task, + run: startedRun, + nowMs: args.nowMs, + }); + + try { + const result = await args.runner.run({ + task, + run: startedRun, + prompt, + nowMs: args.nowMs, + }); + + if (result.status === "completed") { + const completed = await args.store.markRunCompleted({ + runId: startedRun.id, + completedAtMs: args.nowMs, + resultMessageTs: result.resultMessageTs, + startedAtMs: startedRun.startedAtMs!, + }); + if (!completed) { + return undefined; + } + await args.store.updateTaskAfterRun({ + run: startedRun, + status: result.status, + nowMs: args.nowMs, + }); + return completed; + } + + if (result.status === "blocked") { + const blocked = await args.store.markRunBlocked({ + runId: startedRun.id, + completedAtMs: args.nowMs, + errorMessage: result.errorMessage, + startedAtMs: startedRun.startedAtMs!, + }); + if (!blocked) { + return undefined; + } + await args.store.updateTaskAfterRun({ + run: startedRun, + status: result.status, + errorMessage: result.errorMessage, + nowMs: args.nowMs, + }); + return blocked; + } + + const failed = await args.store.markRunFailed({ + runId: startedRun.id, + completedAtMs: args.nowMs, + errorMessage: result.errorMessage, + startedAtMs: startedRun.startedAtMs!, + }); + if (!failed) { + return undefined; + } + await args.store.updateTaskAfterRun({ + run: startedRun, + status: result.status, + errorMessage: result.errorMessage, + nowMs: args.nowMs, + }); + return failed; + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + const failed = await args.store.markRunFailed({ + runId: startedRun.id, + completedAtMs: args.nowMs, + errorMessage, + startedAtMs: startedRun.startedAtMs!, + }); + if (!failed) { + return undefined; + } + await args.store.updateTaskAfterRun({ + run: startedRun, + status: "failed", + errorMessage, + nowMs: args.nowMs, + }); + return failed; + } +} + +/** Claim due scheduled runs and execute each through the supplied runner. */ +export async function processDueScheduledRuns(args: { + limit: number; + nowMs: number; + runner: ScheduledTaskRunner; + store: SchedulerStore; +}): Promise { + const claimedRuns = await args.store.claimDueRuns({ + limit: args.limit, + nowMs: args.nowMs, + }); + const completedRuns: ScheduledRun[] = []; + + for (const run of claimedRuns) { + const completed = await executeScheduledRun({ + store: args.store, + runner: args.runner, + run, + nowMs: args.nowMs, + }); + if (completed) { + completedRuns.push(completed); + } + } + + return completedRuns; +} diff --git a/packages/junior/src/chat/scheduler/prompt.ts b/packages/junior/src/chat/scheduler/prompt.ts new file mode 100644 index 00000000..95d691ed --- /dev/null +++ b/packages/junior/src/chat/scheduler/prompt.ts @@ -0,0 +1,89 @@ +import { escapeXml } from "@/chat/xml"; +import type { ScheduledRun, ScheduledTask } from "@/chat/scheduler/types"; + +const EXECUTION_RULES = [ + "- Execute the scheduled task described in ; do not create, update, pause, delete, or list schedules.", + "- Complete the task without asking follow-up questions unless access, approval, or required input is missing.", + "- Use the available tools and skills that are relevant to the task contract.", + "- If blocked, report the specific missing provider, permission, configuration, or input.", + "- Keep the final result shaped for the configured destination audience.", +]; + +function renderList(tag: string, values: string[] | undefined): string[] { + const entries = (values ?? []).map((value) => value.trim()).filter(Boolean); + if (entries.length === 0) { + return [`<${tag}>`, ""]; + } + return [ + `<${tag}>`, + ...entries.map((value) => `- ${escapeXml(value)}`), + ``, + ]; +} + +function renderOptionalLine(name: string, value: string | undefined): string[] { + return value?.trim() ? [`- ${name}: ${escapeXml(value.trim())}`] : []; +} + +/** Build the marker-delimited user prompt for one scheduled task execution. */ +export function buildScheduledTaskRunPrompt(args: { + nowMs: number; + run: ScheduledRun; + task: ScheduledTask; +}): string { + const { run, task } = args; + const destination = task.destination; + const creator = task.createdBy; + + return [ + "", + "This is an autonomous scheduled run. Treat the stored task contract as the user request for this turn.", + "", + "", + `- id: ${escapeXml(task.id)}`, + `- title: ${escapeXml(task.task.title)}`, + `- objective: ${escapeXml(task.task.objective)}`, + ...renderOptionalLine("expected_output", task.task.expectedOutput), + "", + ...task.task.instructions.map( + (instruction) => `- ${escapeXml(instruction)}`, + ), + "", + ...renderList("constraints", task.task.constraints), + ...renderList("source-context", task.task.sourceContext), + "", + "", + "", + `- run_id: ${escapeXml(run.id)}`, + `- task_version: ${run.taskVersion}`, + `- scheduled_for: ${new Date(run.scheduledForMs).toISOString()}`, + `- running_at: ${new Date(args.nowMs).toISOString()}`, + `- schedule: ${escapeXml(task.schedule.description)}`, + `- timezone: ${escapeXml(task.schedule.timezone)}`, + `- schedule_kind: ${task.schedule.kind}`, + ...(task.schedule.recurrence + ? [ + `- recurrence_frequency: ${task.schedule.recurrence.frequency}`, + `- recurrence_interval: ${task.schedule.recurrence.interval}`, + `- recurrence_start_date: ${escapeXml(task.schedule.recurrence.startDate)}`, + ] + : []), + `- creator_slack_user_id: ${escapeXml(creator.slackUserId)}`, + ...renderOptionalLine("creator_user_name", creator.userName), + ...renderOptionalLine("creator_full_name", creator.fullName), + `- destination_platform: ${destination.platform}`, + `- destination_team_id: ${escapeXml(destination.teamId)}`, + `- destination_channel_id: ${escapeXml(destination.channelId)}`, + ...renderOptionalLine("destination_thread_ts", destination.threadTs), + "", + "", + "", + ...EXECUTION_RULES, + "", + "", + '', + "Execute the scheduled task now and provide the final result for the configured destination.", + "", + "", + ].join("\n"); +} diff --git a/packages/junior/src/chat/scheduler/slack-runner.ts b/packages/junior/src/chat/scheduler/slack-runner.ts new file mode 100644 index 00000000..318bde03 --- /dev/null +++ b/packages/junior/src/chat/scheduler/slack-runner.ts @@ -0,0 +1,349 @@ +import { botConfig } from "@/chat/config"; +import { generateAssistantReply as generateAssistantReplyImpl } from "@/chat/respond"; +import { isRetryableTurnError } from "@/chat/runtime/turn"; +import { AuthorizationFlowDisabledError } from "@/chat/services/auth-pause"; +import type { ScheduledTaskRunner } from "@/chat/scheduler/executor"; +import type { ScheduledRun, ScheduledTask } from "@/chat/scheduler/types"; +import { logException } from "@/chat/logging"; +import { deliverPrivateMessage } from "@/chat/oauth-flow"; +import { + buildConversationContext, + markConversationMessage, + normalizeConversationText, + updateConversationStats, + upsertConversationMessage, +} from "@/chat/services/conversation-memory"; +import { finalizeFailedTurnReply } from "@/chat/services/turn-failure-response"; +import { + coerceThreadConversationState, + type ThreadConversationState, +} from "@/chat/state/conversation"; +import { + coerceThreadArtifactsState, + type ThreadArtifactsState, +} from "@/chat/state/artifacts"; +import { + getChannelConfigurationServiceById, + getPersistedThreadState, + persistThreadStateById, +} from "@/chat/runtime/thread-state"; +import { + planSlackReplyPosts, + postSlackApiReplyPosts, +} from "@/chat/slack/reply"; +import { buildSlackReplyFooter } from "@/chat/slack/footer"; +import { mergeArtifactsState } from "@/chat/runtime/thread-state"; + +export interface SlackScheduledTaskRunnerDeps { + generateAssistantReply?: typeof generateAssistantReplyImpl; +} + +function getConversationId(task: ScheduledTask): string { + return `slack:${task.destination.teamId}:${task.destination.channelId}:${task.destination.threadTs}`; +} + +function buildScheduledConversationText(task: ScheduledTask): string { + return `[scheduled task] ${task.task.title}: ${task.task.objective}`; +} + +function getScheduledAssistantMessageId(run: ScheduledRun): string { + return `scheduled-run:${run.id}:assistant`; +} + +function buildScheduledAuthError( + error: AuthorizationFlowDisabledError, +): string { + return `Scheduled task requires ${error.provider} authorization. Connect ${error.provider} in an interactive Slack message, then resume the task.`; +} + +async function notifyCreatorOfBlockedRun(args: { + errorMessage: string; + task: ScheduledTask; +}): Promise { + await deliverPrivateMessage({ + channelId: args.task.destination.channelId, + threadTs: args.task.destination.threadTs, + userId: args.task.createdBy.slackUserId, + text: `Scheduled task "${args.task.task.title}" is blocked: ${args.errorMessage}`, + }); +} + +function upsertScheduledUserMessage(args: { + conversation: ThreadConversationState; + run: ScheduledRun; + task: ScheduledTask; +}): string { + return upsertConversationMessage(args.conversation, { + id: `scheduled-run:${args.run.id}:user`, + role: "user", + text: normalizeConversationText(buildScheduledConversationText(args.task)), + createdAtMs: args.run.scheduledForMs, + author: { + userId: args.task.createdBy.slackUserId, + userName: args.task.createdBy.userName, + fullName: args.task.createdBy.fullName, + isBot: false, + }, + meta: { + explicitMention: true, + }, + }); +} + +async function persistRuntimePatch(args: { + artifacts?: ThreadArtifactsState; + conversation: ThreadConversationState; + sandboxDependencyProfileHash?: string; + sandboxId?: string; + threadId: string; +}): Promise { + await persistThreadStateById(args.threadId, { + artifacts: args.artifacts, + conversation: args.conversation, + sandboxId: args.sandboxId, + sandboxDependencyProfileHash: args.sandboxDependencyProfileHash, + }); +} + +/** Create the Slack runner used by scheduler tick dispatch. */ +export function createSlackScheduledTaskRunner( + deps: SlackScheduledTaskRunnerDeps = {}, +): ScheduledTaskRunner { + const generateAssistantReply = + deps.generateAssistantReply ?? generateAssistantReplyImpl; + + return { + run: async ({ prompt, run, task, nowMs }) => { + const threadTs = task.destination.threadTs; + if (!threadTs) { + return { + status: "blocked", + errorMessage: "Scheduled Slack task has no thread destination.", + }; + } + + const conversationId = getConversationId(task); + const persisted = await getPersistedThreadState(conversationId); + const conversation = coerceThreadConversationState(persisted); + const deliveredMessage = conversation.messages.find( + (message) => + message.id === getScheduledAssistantMessageId(run) && + message.meta?.replied === true && + typeof message.meta.slackTs === "string", + ); + if (deliveredMessage?.meta?.slackTs) { + return { + status: "completed", + resultMessageTs: deliveredMessage.meta.slackTs, + }; + } + + const artifacts = coerceThreadArtifactsState(persisted); + const channelConfiguration = getChannelConfigurationServiceById( + task.destination.channelId, + ); + const configuration = await channelConfiguration.resolveValues(); + const userMessageId = upsertScheduledUserMessage({ + conversation, + run, + task, + }); + updateConversationStats(conversation); + const conversationContext = buildConversationContext(conversation, { + excludeMessageId: userMessageId, + }); + + let currentArtifacts = artifacts; + let sandboxId = + typeof persisted.app_sandbox_id === "string" + ? persisted.app_sandbox_id + : undefined; + let sandboxDependencyProfileHash = + typeof persisted.app_sandbox_dependency_profile_hash === "string" + ? persisted.app_sandbox_dependency_profile_hash + : undefined; + + try { + let reply = await generateAssistantReply(prompt, { + requester: { + userId: task.createdBy.slackUserId, + userName: task.createdBy.userName, + fullName: task.createdBy.fullName, + }, + conversationContext, + artifactState: currentArtifacts, + piMessages: conversation.piMessages, + authorizationFlowMode: "disabled", + configuration, + channelConfiguration, + correlation: { + conversationId, + threadId: conversationId, + turnId: `scheduled:${run.id}`, + runId: run.id, + channelId: task.destination.channelId, + teamId: task.destination.teamId, + requesterId: task.createdBy.slackUserId, + threadTs, + }, + toolChannelId: task.destination.channelId, + sandbox: { + sandboxId, + sandboxDependencyProfileHash, + }, + onSandboxAcquired: async (sandbox) => { + sandboxId = sandbox.sandboxId; + sandboxDependencyProfileHash = sandbox.sandboxDependencyProfileHash; + await persistRuntimePatch({ + threadId: conversationId, + conversation, + artifacts: currentArtifacts, + sandboxId, + sandboxDependencyProfileHash, + }); + }, + onArtifactStateUpdated: async (nextArtifacts) => { + currentArtifacts = nextArtifacts; + await persistRuntimePatch({ + threadId: conversationId, + conversation, + artifacts: currentArtifacts, + sandboxId, + sandboxDependencyProfileHash, + }); + }, + }); + + const turnFailureErrorMessage = + reply.diagnostics.outcome === "success" + ? undefined + : (reply.diagnostics.errorMessage ?? + `Agent turn ended with ${reply.diagnostics.outcome}.`); + if (turnFailureErrorMessage) { + reply = finalizeFailedTurnReply({ + reply, + logException, + context: { + conversationId, + slackThreadId: conversationId, + slackChannelId: task.destination.channelId, + slackUserId: task.createdBy.slackUserId, + runId: run.id, + assistantUserName: botConfig.userName, + modelId: reply.diagnostics.modelId, + }, + }); + } + + const plannedPosts = planSlackReplyPosts({ reply }); + const footer = buildSlackReplyFooter({ + conversationId, + durationMs: reply.diagnostics.durationMs, + thinkingLevel: reply.diagnostics.thinkingLevel, + usage: reply.diagnostics.usage, + }); + const resultMessageTs = await postSlackApiReplyPosts({ + channelId: task.destination.channelId, + threadTs, + posts: plannedPosts, + footer, + fileUploadFailureMode: "strict", + }); + + markConversationMessage(conversation, userMessageId, { + replied: true, + skippedReason: undefined, + }); + upsertConversationMessage(conversation, { + id: getScheduledAssistantMessageId(run), + role: "assistant", + text: normalizeConversationText(reply.text) || "[empty response]", + createdAtMs: nowMs, + author: { + userName: botConfig.userName, + isBot: true, + }, + meta: { + replied: true, + slackTs: resultMessageTs, + }, + }); + if (reply.piMessages) { + conversation.piMessages = reply.piMessages; + } + updateConversationStats(conversation); + + const nextArtifacts = reply.artifactStatePatch + ? mergeArtifactsState(currentArtifacts, reply.artifactStatePatch) + : currentArtifacts; + await persistRuntimePatch({ + threadId: conversationId, + conversation, + artifacts: nextArtifacts, + sandboxId: reply.sandboxId ?? sandboxId, + sandboxDependencyProfileHash: + reply.sandboxDependencyProfileHash ?? sandboxDependencyProfileHash, + }); + + if (turnFailureErrorMessage) { + return { + status: "failed", + errorMessage: turnFailureErrorMessage, + }; + } + + return { + status: "completed", + resultMessageTs, + }; + } catch (error) { + if (error instanceof AuthorizationFlowDisabledError) { + const errorMessage = buildScheduledAuthError(error); + await notifyCreatorOfBlockedRun({ + task, + errorMessage, + }); + return { + status: "blocked", + errorMessage, + }; + } + if ( + isRetryableTurnError(error, "mcp_auth_resume") || + isRetryableTurnError(error, "plugin_auth_resume") + ) { + const errorMessage = + "Scheduled task requires authorization. Connect the required provider in an interactive Slack message, then resume the task."; + await notifyCreatorOfBlockedRun({ + task, + errorMessage, + }); + return { + status: "blocked", + errorMessage, + }; + } + + logException( + error, + "scheduled_task_run_failed", + { + conversationId, + slackThreadId: conversationId, + slackChannelId: task.destination.channelId, + slackUserId: task.createdBy.slackUserId, + runId: run.id, + assistantUserName: botConfig.userName, + modelId: botConfig.modelId, + }, + {}, + "Scheduled task run failed", + ); + return { + status: "failed", + errorMessage: error instanceof Error ? error.message : String(error), + }; + } + }, + }; +} diff --git a/packages/junior/src/chat/scheduler/store.ts b/packages/junior/src/chat/scheduler/store.ts new file mode 100644 index 00000000..fb47099a --- /dev/null +++ b/packages/junior/src/chat/scheduler/store.ts @@ -0,0 +1,535 @@ +import type { Lock, StateAdapter } from "chat"; +import { getNextRunAtMs } from "@/chat/scheduler/cadence"; +import { getStateAdapter } from "@/chat/state/adapter"; +import type { ScheduledRun, ScheduledTask } from "@/chat/scheduler/types"; + +const SCHEDULER_KEY_PREFIX = "junior:scheduler"; +const SCHEDULER_RECORD_TTL_MS = 5 * 365 * 24 * 60 * 60 * 1000; +const SCHEDULED_RUN_TTL_MS = 90 * 24 * 60 * 60 * 1000; +const CLAIM_TTL_MS = 6 * 60 * 60 * 1000; +const PENDING_CLAIM_STALE_MS = 60_000; +const LOCK_TTL_MS = 10_000; + +export interface SchedulerStore { + claimDueRuns(args: { limit: number; nowMs: number }): Promise; + getRun(runId: string): Promise; + getTask(taskId: string): Promise; + listTasksForTeam(teamId: string): Promise; + markRunBlocked(args: { + completedAtMs: number; + errorMessage: string; + runId: string; + startedAtMs: number; + }): Promise; + markRunCompleted(args: { + completedAtMs: number; + resultMessageTs?: string; + runId: string; + startedAtMs: number; + }): Promise; + markRunFailed(args: { + completedAtMs: number; + errorMessage: string; + startedAtMs?: number; + runId: string; + }): Promise; + markRunStarted(args: { + claimedAtMs: number; + nowMs: number; + runId: string; + }): Promise; + saveTask(task: ScheduledTask): Promise; + updateTaskAfterRun(args: { + errorMessage?: string; + nowMs: number; + run: ScheduledRun; + status: "blocked" | "completed" | "failed"; + }): Promise; +} + +function taskKey(taskId: string): string { + return `${SCHEDULER_KEY_PREFIX}:task:${taskId}`; +} + +function taskLockKey(taskId: string): string { + return `${taskKey(taskId)}:lock`; +} + +function runKey(runId: string): string { + return `${SCHEDULER_KEY_PREFIX}:run:${runId}`; +} + +function claimKey(taskId: string, scheduledForMs: number): string { + return `${SCHEDULER_KEY_PREFIX}:claim:${taskId}:${scheduledForMs}`; +} + +function activeRunKey(taskId: string): string { + return `${SCHEDULER_KEY_PREFIX}:active:${taskId}`; +} + +function globalTaskIndexKey(): string { + return `${SCHEDULER_KEY_PREFIX}:tasks`; +} + +function teamTaskIndexKey(teamId: string): string { + return `${SCHEDULER_KEY_PREFIX}:team:${teamId}:tasks`; +} + +function indexLockKey(indexKey: string): string { + return `${indexKey}:lock`; +} + +function buildRunId(taskId: string, scheduledForMs: number): string { + return `${taskId}:${scheduledForMs}`; +} + +function unique(values: string[]): string[] { + return [...new Set(values.filter(Boolean))]; +} + +async function withLock( + state: StateAdapter, + key: string, + callback: () => Promise, +): Promise { + const lock: Lock | null = await state.acquireLock(key, LOCK_TTL_MS); + if (!lock) { + throw new Error(`Could not acquire scheduler lock for ${key}`); + } + + try { + return await callback(); + } finally { + await state.releaseLock(lock); + } +} + +async function addToIndex( + state: StateAdapter, + key: string, + taskId: string, +): Promise { + await withLock(state, indexLockKey(key), async () => { + const current = ((await state.get(key)) ?? []).filter( + (value): value is string => typeof value === "string", + ); + await state.set(key, unique([...current, taskId]), SCHEDULER_RECORD_TTL_MS); + }); +} + +async function removeFromIndex( + state: StateAdapter, + key: string, + taskId: string, +): Promise { + await withLock(state, indexLockKey(key), async () => { + const current = unique( + ((await state.get(key)) ?? []).filter( + (value): value is string => typeof value === "string", + ), + ); + const next = current.filter((value) => value !== taskId); + if (next.length === current.length) { + return; + } + if (next.length === 0) { + await state.delete(key); + return; + } + await state.set(key, next, SCHEDULER_RECORD_TTL_MS); + }); +} + +async function getIndex(state: StateAdapter, key: string): Promise { + const values = (await state.get(key)) ?? []; + return unique( + values.filter((value): value is string => typeof value === "string"), + ); +} + +async function clearActiveRun( + state: StateAdapter, + taskId: string, + runId: string, +): Promise { + await withLock(state, indexLockKey(activeRunKey(taskId)), async () => { + const current = await state.get<{ runId?: unknown }>(activeRunKey(taskId)); + if (current?.runId === runId) { + await state.delete(activeRunKey(taskId)); + } + }); +} + +function isStalePendingRun( + run: ScheduledRun | undefined, + nowMs: number, +): boolean { + return ( + run?.status === "pending" && + run.claimedAtMs + PENDING_CLAIM_STALE_MS <= nowMs + ); +} + +function isDueTask( + task: ScheduledTask, + nowMs: number, +): task is ScheduledTask & { + nextRunAtMs: number; +} { + return ( + task.status === "active" && + typeof task.nextRunAtMs === "number" && + Number.isFinite(task.nextRunAtMs) && + task.nextRunAtMs <= nowMs + ); +} + +function buildScheduledRun(args: { + claimedAtMs: number; + scheduledForMs: number; + task: ScheduledTask; +}): ScheduledRun { + const idempotencyKey = `${args.task.id}:${args.scheduledForMs}`; + return { + id: buildRunId(args.task.id, args.scheduledForMs), + attempt: 1, + claimedAtMs: args.claimedAtMs, + idempotencyKey, + scheduledForMs: args.scheduledForMs, + status: "pending", + taskId: args.task.id, + taskVersion: args.task.version, + }; +} + +function canFinishRun( + run: ScheduledRun, + startedAtMs: number | undefined, +): boolean { + if (run.status === "pending") { + return startedAtMs === undefined; + } + return run.status === "running" && run.startedAtMs === startedAtMs; +} + +class StateAdapterSchedulerStore implements SchedulerStore { + private readonly state: StateAdapter; + + constructor(state: StateAdapter) { + this.state = state; + } + + async saveTask(task: ScheduledTask): Promise { + await this.state.connect(); + await withLock(this.state, taskLockKey(task.id), async () => { + const current = + (await this.state.get(taskKey(task.id))) ?? undefined; + await this.saveTaskRecord(task, current); + }); + } + + private async saveTaskRecord( + task: ScheduledTask, + current: ScheduledTask | undefined, + ): Promise { + if ( + current?.status === "blocked" && + task.status === "active" && + typeof task.nextRunAtMs === "number" && + Number.isFinite(task.nextRunAtMs) + ) { + await this.state.delete(claimKey(task.id, task.nextRunAtMs)); + } + await this.state.set(taskKey(task.id), task, SCHEDULER_RECORD_TTL_MS); + + if (task.status === "deleted") { + await removeFromIndex(this.state, globalTaskIndexKey(), task.id); + await removeFromIndex( + this.state, + teamTaskIndexKey(task.destination.teamId), + task.id, + ); + if (current && current.destination.teamId !== task.destination.teamId) { + await removeFromIndex( + this.state, + teamTaskIndexKey(current.destination.teamId), + task.id, + ); + } + return; + } + + await addToIndex(this.state, globalTaskIndexKey(), task.id); + await addToIndex( + this.state, + teamTaskIndexKey(task.destination.teamId), + task.id, + ); + if (current && current.destination.teamId !== task.destination.teamId) { + await removeFromIndex( + this.state, + teamTaskIndexKey(current.destination.teamId), + task.id, + ); + } + } + + async getTask(taskId: string): Promise { + await this.state.connect(); + return (await this.state.get(taskKey(taskId))) ?? undefined; + } + + async listTasksForTeam(teamId: string): Promise { + await this.state.connect(); + const ids = await getIndex(this.state, teamTaskIndexKey(teamId)); + const tasks = await Promise.all(ids.map((id) => this.getTask(id))); + return tasks + .filter((task): task is ScheduledTask => Boolean(task)) + .filter((task) => task.status !== "deleted") + .sort((a, b) => a.createdAtMs - b.createdAtMs); + } + + async claimDueRuns(args: { + limit: number; + nowMs: number; + }): Promise { + await this.state.connect(); + const ids = await getIndex(this.state, globalTaskIndexKey()); + const runs: ScheduledRun[] = []; + + for (const id of ids) { + if (runs.length >= args.limit) { + break; + } + + const task = await this.getTask(id); + if (!task || !isDueTask(task, args.nowMs)) { + continue; + } + + const scheduledForMs = task.nextRunAtMs; + const runId = buildRunId(task.id, scheduledForMs); + const tryClaimActiveRun = async (): Promise => + await this.state.setIfNotExists( + activeRunKey(task.id), + { claimedAtMs: args.nowMs, runId, scheduledForMs }, + CLAIM_TTL_MS, + ); + + let activeClaimed = await tryClaimActiveRun(); + if (!activeClaimed) { + const activeRun = await this.getRun(runId); + if (isStalePendingRun(activeRun, args.nowMs)) { + await clearActiveRun(this.state, task.id, runId); + await this.state.delete(claimKey(task.id, scheduledForMs)); + activeClaimed = await tryClaimActiveRun(); + } + if (!activeClaimed) { + continue; + } + } + + const tryClaimScheduledSlot = async (): Promise => + await this.state.setIfNotExists( + claimKey(task.id, scheduledForMs), + { claimedAtMs: args.nowMs }, + CLAIM_TTL_MS, + ); + + let claimed = await tryClaimScheduledSlot(); + if (!claimed) { + const existingRun = await this.getRun(runId); + if (isStalePendingRun(existingRun, args.nowMs)) { + await clearActiveRun(this.state, task.id, runId); + await this.state.delete(claimKey(task.id, scheduledForMs)); + activeClaimed = await tryClaimActiveRun(); + claimed = activeClaimed ? await tryClaimScheduledSlot() : false; + } + if (!claimed) { + await clearActiveRun(this.state, task.id, runId); + continue; + } + } + + const run = buildScheduledRun({ + claimedAtMs: args.nowMs, + scheduledForMs, + task, + }); + await this.state.set(runKey(run.id), run, SCHEDULED_RUN_TTL_MS); + runs.push(run); + } + + return runs; + } + + async getRun(runId: string): Promise { + await this.state.connect(); + return (await this.state.get(runKey(runId))) ?? undefined; + } + + async markRunStarted(args: { + claimedAtMs: number; + nowMs: number; + runId: string; + }): Promise { + return await this.updateRun(args.runId, (run) => + run.status === "pending" && run.claimedAtMs === args.claimedAtMs + ? { + ...run, + startedAtMs: args.nowMs, + status: "running", + } + : undefined, + ); + } + + async markRunCompleted(args: { + completedAtMs: number; + resultMessageTs?: string; + runId: string; + startedAtMs: number; + }): Promise { + const next = await this.updateRun(args.runId, (run) => + canFinishRun(run, args.startedAtMs) + ? { + ...run, + completedAtMs: args.completedAtMs, + resultMessageTs: args.resultMessageTs, + status: "completed", + } + : undefined, + ); + if (next) { + await clearActiveRun(this.state, next.taskId, next.id); + } + return next; + } + + async markRunFailed(args: { + completedAtMs: number; + errorMessage: string; + startedAtMs?: number; + runId: string; + }): Promise { + const next = await this.updateRun(args.runId, (run) => + canFinishRun(run, args.startedAtMs) + ? { + ...run, + completedAtMs: args.completedAtMs, + errorMessage: args.errorMessage, + status: "failed", + } + : undefined, + ); + if (next) { + await clearActiveRun(this.state, next.taskId, next.id); + } + return next; + } + + async markRunBlocked(args: { + completedAtMs: number; + errorMessage: string; + runId: string; + startedAtMs: number; + }): Promise { + const next = await this.updateRun(args.runId, (run) => + canFinishRun(run, args.startedAtMs) + ? { + ...run, + completedAtMs: args.completedAtMs, + errorMessage: args.errorMessage, + status: "blocked", + } + : undefined, + ); + if (next) { + await clearActiveRun(this.state, next.taskId, next.id); + } + return next; + } + + async updateTaskAfterRun(args: { + errorMessage?: string; + nowMs: number; + run: ScheduledRun; + status: "blocked" | "completed" | "failed"; + }): Promise { + await this.state.connect(); + await withLock(this.state, taskLockKey(args.run.taskId), async () => { + const current = + (await this.state.get(taskKey(args.run.taskId))) ?? + undefined; + if (!current || current.status === "deleted") { + return; + } + + if ( + current.status !== "active" || + current.nextRunAtMs !== args.run.scheduledForMs + ) { + await this.saveTaskRecord( + { + ...current, + lastRunAtMs: args.run.scheduledForMs, + updatedAtMs: args.nowMs, + version: current.version + 1, + }, + current, + ); + return; + } + + const nextRunAtMs = + args.status === "blocked" + ? undefined + : getNextRunAtMs(current, args.run.scheduledForMs, args.nowMs); + + await this.saveTaskRecord( + { + ...current, + lastRunAtMs: args.run.scheduledForMs, + nextRunAtMs, + status: + args.status === "blocked" + ? "blocked" + : nextRunAtMs + ? "active" + : "paused", + statusReason: + args.status === "blocked" ? args.errorMessage : undefined, + updatedAtMs: args.nowMs, + version: current.version + 1, + }, + current, + ); + }); + } + + private async updateRun( + runId: string, + update: (run: ScheduledRun) => ScheduledRun | undefined, + ): Promise { + await this.state.connect(); + return await withLock(this.state, indexLockKey(runKey(runId)), async () => { + const current = await this.getRun(runId); + if (!current) { + return undefined; + } + const next = update(current); + if (!next) { + return undefined; + } + await this.state.set(runKey(runId), next, SCHEDULED_RUN_TTL_MS); + return next; + }); + } +} + +/** Create the production scheduler store backed by Junior's state adapter. */ +export function createStateSchedulerStore( + stateAdapter: StateAdapter = getStateAdapter(), +): SchedulerStore { + return new StateAdapterSchedulerStore(stateAdapter); +} diff --git a/packages/junior/src/chat/scheduler/types.ts b/packages/junior/src/chat/scheduler/types.ts new file mode 100644 index 00000000..77f50455 --- /dev/null +++ b/packages/junior/src/chat/scheduler/types.ts @@ -0,0 +1,90 @@ +export type ScheduledTaskStatus = "active" | "paused" | "blocked" | "deleted"; + +export type ScheduledRunStatus = + | "pending" + | "running" + | "completed" + | "failed" + | "blocked" + | "skipped"; + +export interface ScheduledTaskPrincipal { + slackUserId: string; + fullName?: string; + userName?: string; +} + +export interface ScheduledTaskDestination { + platform: "slack"; + teamId: string; + channelId: string; + threadTs?: string; +} + +export type ScheduledCalendarFrequency = + | "daily" + | "weekly" + | "monthly" + | "yearly"; + +export interface ScheduledLocalTime { + hour: number; + minute: number; +} + +export interface ScheduledTaskRecurrence { + dayOfMonth?: number; + frequency: ScheduledCalendarFrequency; + interval: number; + month?: number; + startDate: string; + time: ScheduledLocalTime; + weekdays?: number[]; +} + +export interface ScheduledTaskSchedule { + description: string; + timezone: string; + kind: "one_off" | "recurring"; + recurrence?: ScheduledTaskRecurrence; +} + +export interface ScheduledTaskSpec { + title: string; + objective: string; + instructions: string[]; + expectedOutput?: string; + constraints?: string[]; + sourceContext?: string[]; +} + +export interface ScheduledTask { + id: string; + createdAtMs: number; + createdBy: ScheduledTaskPrincipal; + destination: ScheduledTaskDestination; + lastRunAtMs?: number; + nextRunAtMs?: number; + originalRequest?: string; + schedule: ScheduledTaskSchedule; + status: ScheduledTaskStatus; + statusReason?: string; + task: ScheduledTaskSpec; + updatedAtMs: number; + version: number; +} + +export interface ScheduledRun { + id: string; + attempt: number; + claimedAtMs: number; + completedAtMs?: number; + errorMessage?: string; + idempotencyKey: string; + resultMessageTs?: string; + scheduledForMs: number; + startedAtMs?: number; + status: ScheduledRunStatus; + taskId: string; + taskVersion: number; +} diff --git a/packages/junior/src/chat/services/auth-pause.ts b/packages/junior/src/chat/services/auth-pause.ts index 0ffbe036..18d8fadf 100644 --- a/packages/junior/src/chat/services/auth-pause.ts +++ b/packages/junior/src/chat/services/auth-pause.ts @@ -1,5 +1,6 @@ export type AuthorizationPauseKind = "mcp" | "plugin"; export type AuthorizationPauseDisposition = "link_already_sent" | "link_sent"; +export type AuthorizationFlowMode = "interactive" | "disabled"; /** * Runtime-owned signal that the current turn must park until the user @@ -29,3 +30,18 @@ export class AuthorizationPauseError extends Error { this.provider = provider; } } + +/** Error indicating this turn cannot start an external authorization flow. */ +export class AuthorizationFlowDisabledError extends Error { + readonly kind: AuthorizationPauseKind; + readonly provider: string; + + constructor(kind: AuthorizationPauseKind, provider: string) { + super( + `Authorization is required for ${provider}, but this turn cannot start an authorization flow.`, + ); + this.name = "AuthorizationFlowDisabledError"; + this.kind = kind; + this.provider = provider; + } +} diff --git a/packages/junior/src/chat/services/mcp-auth-orchestration.ts b/packages/junior/src/chat/services/mcp-auth-orchestration.ts index ffc14e15..34097659 100644 --- a/packages/junior/src/chat/services/mcp-auth-orchestration.ts +++ b/packages/junior/src/chat/services/mcp-auth-orchestration.ts @@ -7,7 +7,11 @@ import { } from "@/chat/mcp/auth-store"; import { deliverPrivateMessage, formatProviderLabel } from "@/chat/oauth-flow"; import { canReusePendingAuthLink } from "@/chat/services/pending-auth"; -import { AuthorizationPauseError } from "@/chat/services/auth-pause"; +import { + AuthorizationFlowDisabledError, + AuthorizationPauseError, + type AuthorizationFlowMode, +} from "@/chat/services/auth-pause"; import type { ThreadArtifactsState } from "@/chat/state/artifacts"; import type { ConversationPendingAuthState } from "@/chat/state/conversation"; import type { PluginDefinition } from "@/chat/plugins/types"; @@ -36,6 +40,7 @@ export interface McpAuthOrchestrationDeps { onPendingAuth?: ( pendingAuth: ConversationPendingAuthState, ) => void | Promise; + authorizationFlowMode?: AuthorizationFlowMode; } export interface McpAuthOrchestration { @@ -90,6 +95,10 @@ export function createMcpAuthOrchestration( `Missing MCP auth session context for plugin "${provider}"`, ); } + if (deps.authorizationFlowMode === "disabled") { + await deleteMcpAuthSession(authSessionId); + throw new AuthorizationFlowDisabledError("mcp", provider); + } const latestArtifactState = deps.getMergedArtifactState(); await patchMcpAuthSession(authSessionId, { diff --git a/packages/junior/src/chat/services/plugin-auth-orchestration.ts b/packages/junior/src/chat/services/plugin-auth-orchestration.ts index 55f07819..0b0fb261 100644 --- a/packages/junior/src/chat/services/plugin-auth-orchestration.ts +++ b/packages/junior/src/chat/services/plugin-auth-orchestration.ts @@ -3,7 +3,11 @@ import { unlinkProvider } from "@/chat/credentials/unlink-provider"; import type { UserTokenStore } from "@/chat/credentials/user-token-store"; import { formatProviderLabel, startOAuthFlow } from "@/chat/oauth-flow"; import { canReusePendingAuthLink } from "@/chat/services/pending-auth"; -import { AuthorizationPauseError } from "@/chat/services/auth-pause"; +import { + AuthorizationFlowDisabledError, + AuthorizationPauseError, + type AuthorizationFlowMode, +} from "@/chat/services/auth-pause"; import type { ConversationPendingAuthState } from "@/chat/state/conversation"; import { getPluginDefinition, @@ -43,6 +47,7 @@ export interface PluginAuthOrchestrationDeps { onPendingAuth?: ( pendingAuth: ConversationPendingAuthState, ) => void | Promise; + authorizationFlowMode?: AuthorizationFlowMode; userTokenStore?: UserTokenStore; } @@ -219,6 +224,9 @@ export function createPluginAuthOrchestration( if (!deps.requesterId || !getPluginOAuthConfig(provider)) { throw new Error(`Cannot start plugin authorization for ${provider}`); } + if (deps.authorizationFlowMode === "disabled") { + throw new AuthorizationFlowDisabledError("plugin", provider); + } const providerLabel = formatProviderLabel(provider); const reusingPendingLink = canReusePendingAuthLink({ diff --git a/packages/junior/src/chat/slack/workspace-context.ts b/packages/junior/src/chat/slack/workspace-context.ts new file mode 100644 index 00000000..6e26410b --- /dev/null +++ b/packages/junior/src/chat/slack/workspace-context.ts @@ -0,0 +1,17 @@ +import { AsyncLocalStorage } from "node:async_hooks"; + +const workspaceTeamIdStorage = new AsyncLocalStorage(); + +/** Run a callback with the Slack workspace team ID for the inbound webhook. */ +export function runWithWorkspaceTeamId( + teamId: string | undefined, + fn: () => T, +): T { + if (!teamId) return fn(); + return workspaceTeamIdStorage.run(teamId, fn); +} + +/** Return the Slack workspace team ID for the current inbound webhook. */ +export function getWorkspaceTeamId(): string | undefined { + return workspaceTeamIdStorage.getStore(); +} diff --git a/packages/junior/src/chat/tools/agent-tools.ts b/packages/junior/src/chat/tools/agent-tools.ts index 4d58afd2..6fbd79b3 100644 --- a/packages/junior/src/chat/tools/agent-tools.ts +++ b/packages/junior/src/chat/tools/agent-tools.ts @@ -3,7 +3,10 @@ import { serializeGenAiAttribute } from "@/chat/logging"; import { setSpanAttributes, withSpan, type LogContext } from "@/chat/logging"; import { GEN_AI_PROVIDER_NAME } from "@/chat/pi/client"; import { shouldEmitDevAgentTrace } from "@/chat/runtime/dev-agent-trace"; -import { AuthorizationPauseError } from "@/chat/services/auth-pause"; +import { + AuthorizationFlowDisabledError, + AuthorizationPauseError, +} from "@/chat/services/auth-pause"; import type { PluginAuthOrchestration } from "@/chat/services/plugin-auth-orchestration"; import { buildReportedProgressStatus } from "@/chat/runtime/report-progress"; import type { AssistantStatusSpec } from "@/chat/slack/assistant-thread/status"; @@ -109,7 +112,10 @@ export function createAgentTools( } return normalized; } catch (error) { - if (error instanceof AuthorizationPauseError) { + if ( + error instanceof AuthorizationPauseError || + error instanceof AuthorizationFlowDisabledError + ) { throw error; } handleToolExecutionError( diff --git a/packages/junior/src/chat/tools/index.ts b/packages/junior/src/chat/tools/index.ts index cc9e85d0..c6ab1027 100644 --- a/packages/junior/src/chat/tools/index.ts +++ b/packages/junior/src/chat/tools/index.ts @@ -14,6 +14,12 @@ import { createReportProgressTool } from "@/chat/tools/runtime/report-progress"; import { createSlackChannelListMessagesTool } from "@/chat/tools/slack/channel-list-messages"; import { createSlackChannelPostMessageTool } from "@/chat/tools/slack/channel-post-message"; import { createSlackMessageAddReactionTool } from "@/chat/tools/slack/message-add-reaction"; +import { + createSlackScheduleCreateTaskTool, + createSlackScheduleDeleteTaskTool, + createSlackScheduleListTasksTool, + createSlackScheduleUpdateTaskTool, +} from "@/chat/tools/slack/schedule-tools"; import { createSlackCanvasCreateTool, createSlackCanvasEditTool, @@ -152,5 +158,12 @@ export function createTools( ); } + if (context.channelId) { + tools.slackScheduleCreateTask = createSlackScheduleCreateTaskTool(context); + tools.slackScheduleListTasks = createSlackScheduleListTasksTool(context); + tools.slackScheduleUpdateTask = createSlackScheduleUpdateTaskTool(context); + tools.slackScheduleDeleteTask = createSlackScheduleDeleteTaskTool(context); + } + return tools; } diff --git a/packages/junior/src/chat/tools/slack/schedule-tools.ts b/packages/junior/src/chat/tools/slack/schedule-tools.ts new file mode 100644 index 00000000..28d2f8e1 --- /dev/null +++ b/packages/junior/src/chat/tools/slack/schedule-tools.ts @@ -0,0 +1,596 @@ +import { randomUUID } from "node:crypto"; +import { Type } from "@sinclair/typebox"; +import { + buildCalendarRecurrence, + parseScheduleTimestamp, +} from "@/chat/scheduler/cadence"; +import { createStateSchedulerStore } from "@/chat/scheduler/store"; +import type { + ScheduledCalendarFrequency, + ScheduledTask, + ScheduledTaskDestination, + ScheduledTaskPrincipal, + ScheduledTaskRecurrence, + ScheduledTaskStatus, +} from "@/chat/scheduler/types"; +import { normalizeSlackConversationId } from "@/chat/slack/client"; +import { tool } from "@/chat/tools/definition"; +import type { ToolRuntimeContext } from "@/chat/tools/types"; + +const TASK_ID_PREFIX = "sched"; +const MAX_LISTED_TASKS = 50; + +function requireActiveDestination( + context: ToolRuntimeContext, +): + | { ok: true; destination: ScheduledTaskDestination } + | { ok: false; error: string } { + const channelId = normalizeSlackConversationId(context.channelId); + if (!channelId) { + return { + ok: false, + error: "No active Slack channel context is available.", + }; + } + if (!context.teamId) { + return { + ok: false, + error: "No active Slack workspace context is available.", + }; + } + if (!context.threadTs) { + return { + ok: false, + error: "No active Slack thread context is available.", + }; + } + + return { + ok: true, + destination: { + platform: "slack", + teamId: context.teamId, + channelId, + threadTs: context.threadTs, + }, + }; +} + +function requireRequester( + context: ToolRuntimeContext, +): + | { ok: true; requester: ScheduledTaskPrincipal } + | { ok: false; error: string } { + const userId = context.requester?.userId; + if (!userId) { + return { + ok: false, + error: "No active Slack requester context is available.", + }; + } + + return { + ok: true, + requester: { + slackUserId: userId, + ...(context.requester?.userName + ? { userName: context.requester.userName } + : {}), + ...(context.requester?.fullName + ? { fullName: context.requester.fullName } + : {}), + }, + }; +} + +function sameDestination( + task: ScheduledTask, + destination: ScheduledTaskDestination, +): boolean { + return ( + task.destination.platform === destination.platform && + task.destination.teamId === destination.teamId && + task.destination.channelId === destination.channelId && + (task.destination.threadTs ?? "") === (destination.threadTs ?? "") + ); +} + +async function getWritableTask(args: { + context: ToolRuntimeContext; + taskId: string; +}): Promise< + | { ok: true; task: ScheduledTask; destination: ScheduledTaskDestination } + | { ok: false; error: string } +> { + const destination = requireActiveDestination(args.context); + if (!destination.ok) { + return destination; + } + const requester = requireRequester(args.context); + if (!requester.ok) { + return requester; + } + + const task = await createStateSchedulerStore().getTask(args.taskId); + if (!task || task.status === "deleted") { + return { + ok: false, + error: "Scheduled task was not found in the active destination.", + }; + } + + if (!sameDestination(task, destination.destination)) { + return { + ok: false, + error: + "Scheduled task can only be managed from the Slack destination where it was created.", + }; + } + if (task.createdBy.slackUserId !== requester.requester.slackUserId) { + return { + ok: false, + error: + "Scheduled task can only be managed by the Slack user who created it.", + }; + } + + return { + ok: true, + task, + destination: destination.destination, + }; +} + +function compactTask(task: ScheduledTask): Record { + return { + id: task.id, + status: task.status, + title: task.task.title, + objective: task.task.objective, + schedule: task.schedule.description, + timezone: task.schedule.timezone, + recurrence: task.schedule.recurrence + ? { + frequency: task.schedule.recurrence.frequency, + interval: task.schedule.recurrence.interval, + start_date: task.schedule.recurrence.startDate, + time: task.schedule.recurrence.time, + weekdays: task.schedule.recurrence.weekdays, + month: task.schedule.recurrence.month, + day_of_month: task.schedule.recurrence.dayOfMonth, + } + : null, + next_run_at: task.nextRunAtMs + ? new Date(task.nextRunAtMs).toISOString() + : null, + last_run_at: task.lastRunAtMs + ? new Date(task.lastRunAtMs).toISOString() + : null, + version: task.version, + }; +} + +function buildTaskId(): string { + return `${TASK_ID_PREFIX}_${randomUUID()}`; +} + +function normalizeStatus( + value: string | undefined, +): ScheduledTaskStatus | undefined { + if (value === "active" || value === "paused" || value === "blocked") { + return value; + } + return undefined; +} + +function normalizeFrequency( + value: unknown, +): ScheduledCalendarFrequency | undefined { + if ( + value === "daily" || + value === "weekly" || + value === "monthly" || + value === "yearly" + ) { + return value; + } + return undefined; +} + +function buildRecurrence(args: { + existing?: ScheduledTaskRecurrence; + input: { + recurrence_frequency?: unknown; + recurrence_interval?: number; + recurrence_weekdays?: number[]; + }; + nextRunAtMs: number | undefined; + timezone: string; +}): + | { ok: true; recurrence?: ScheduledTaskRecurrence } + | { ok: false; error: string } { + if (args.input.recurrence_frequency === null) { + return { ok: true, recurrence: undefined }; + } + + const frequency = + normalizeFrequency(args.input.recurrence_frequency) ?? + args.existing?.frequency; + if (!frequency) { + return { ok: true, recurrence: undefined }; + } + if (!args.nextRunAtMs) { + return { + ok: false, + error: "Recurring scheduled tasks require next_run_at_iso.", + }; + } + + try { + return { + ok: true, + recurrence: buildCalendarRecurrence({ + frequency, + interval: args.input.recurrence_interval ?? args.existing?.interval, + nextRunAtMs: args.nextRunAtMs, + timezone: args.timezone, + weekdays: + frequency === "weekly" + ? (args.input.recurrence_weekdays ?? args.existing?.weekdays) + : undefined, + }), + }; + } catch (error) { + return { + ok: false, + error: + error instanceof RangeError + ? "timezone must be a valid IANA time zone." + : error instanceof Error + ? error.message + : String(error), + }; + } +} + +function shouldRebuildRecurrence(input: { + next_run_at_iso?: string; + recurrence_frequency?: unknown; + recurrence_interval?: number; + recurrence_weekdays?: number[]; + timezone?: string; +}): boolean { + return ( + input.next_run_at_iso !== undefined || + input.recurrence_frequency !== undefined || + input.recurrence_interval !== undefined || + input.recurrence_weekdays !== undefined || + input.timezone !== undefined + ); +} + +/** Create a tool that stores a scheduled task for the active Slack context. */ +export function createSlackScheduleCreateTaskTool(context: ToolRuntimeContext) { + return tool({ + description: + "Create a Junior scheduled task for the active Slack destination. The destination is always the current Slack channel/thread context; never accept or invent another destination. Use only after the user has confirmed the normalized scheduled task contract. For recurring work, provide an exact next_run_at_iso and a calendar recurrence_frequency.", + inputSchema: Type.Object({ + confirmed_by_user: Type.Boolean({ + description: + "Must be true only after the user explicitly confirms the normalized task, cadence, timezone, destination, and next run.", + }), + title: Type.String({ minLength: 1, maxLength: 120 }), + objective: Type.String({ minLength: 1, maxLength: 1000 }), + instructions: Type.Array(Type.String({ minLength: 1, maxLength: 1000 }), { + minItems: 1, + maxItems: 12, + }), + expected_output: Type.Optional( + Type.String({ minLength: 1, maxLength: 1000 }), + ), + schedule_description: Type.String({ minLength: 1, maxLength: 300 }), + timezone: Type.String({ minLength: 1, maxLength: 80 }), + next_run_at_iso: Type.String({ + minLength: 1, + description: + "Exact next run time as an ISO timestamp, computed from the user's requested schedule.", + }), + recurrence_frequency: Type.Optional( + Type.Union( + [ + Type.Literal("daily"), + Type.Literal("weekly"), + Type.Literal("monthly"), + Type.Literal("yearly"), + ], + { + description: + "Calendar recurrence for recurring tasks. Omit for exact one-off calendar dates.", + }, + ), + ), + recurrence_interval: Type.Optional( + Type.Integer({ + minimum: 1, + maximum: 100, + description: + "Calendar interval. For example, 2 with weekly means every two weeks.", + }), + ), + recurrence_weekdays: Type.Optional( + Type.Array(Type.Integer({ minimum: 0, maximum: 6 }), { + maxItems: 7, + description: + "For weekly schedules only. Sunday is 0, Monday is 1, Saturday is 6.", + }), + ), + constraints: Type.Optional( + Type.Array(Type.String({ minLength: 1, maxLength: 1000 }), { + maxItems: 12, + }), + ), + source_context: Type.Optional( + Type.Array(Type.String({ minLength: 1, maxLength: 1000 }), { + maxItems: 12, + }), + ), + }), + execute: async (input) => { + const destination = requireActiveDestination(context); + if (!destination.ok) return destination; + const requester = requireRequester(context); + if (!requester.ok) return requester; + if (input.confirmed_by_user !== true) { + return { + ok: false, + error: + "Scheduled tasks require explicit user confirmation before they are created. Draft the task contract for the user to confirm.", + }; + } + + const nextRunAtMs = parseScheduleTimestamp(input.next_run_at_iso); + if (!nextRunAtMs) { + return { + ok: false, + error: "next_run_at_iso must be a valid ISO timestamp.", + }; + } + const recurrence = buildRecurrence({ + input, + nextRunAtMs, + timezone: input.timezone, + }); + if (!recurrence.ok) { + return recurrence; + } + + const nowMs = Date.now(); + const task: ScheduledTask = { + id: buildTaskId(), + createdAtMs: nowMs, + updatedAtMs: nowMs, + createdBy: requester.requester, + destination: destination.destination, + nextRunAtMs, + originalRequest: context.userText, + schedule: { + description: input.schedule_description, + timezone: input.timezone, + kind: recurrence.recurrence ? "recurring" : "one_off", + recurrence: recurrence.recurrence, + }, + status: "active", + task: { + title: input.title, + objective: input.objective, + instructions: input.instructions, + expectedOutput: input.expected_output, + constraints: input.constraints, + sourceContext: input.source_context, + }, + version: 1, + }; + + await createStateSchedulerStore().saveTask(task); + return { + ok: true, + task: compactTask(task), + }; + }, + }); +} + +/** Create a tool that lists scheduled tasks for the active Slack destination. */ +export function createSlackScheduleListTasksTool(context: ToolRuntimeContext) { + return tool({ + description: + "List Junior scheduled tasks for the active Slack destination only. Use when the user asks what is scheduled here or wants task IDs before editing/removing schedules.", + annotations: { readOnlyHint: true, destructiveHint: false }, + inputSchema: Type.Object({}), + execute: async () => { + const destination = requireActiveDestination(context); + if (!destination.ok) return destination; + + const tasks = await createStateSchedulerStore().listTasksForTeam( + destination.destination.teamId, + ); + const matching = tasks.filter((task) => + sameDestination(task, destination.destination), + ); + const visible = matching.slice(0, MAX_LISTED_TASKS).map(compactTask); + + return { + ok: true, + tasks: visible, + truncated: matching.length > visible.length, + }; + }, + }); +} + +/** Create a tool that edits a scheduled task in the active Slack destination. */ +export function createSlackScheduleUpdateTaskTool(context: ToolRuntimeContext) { + return tool({ + description: + "Edit a Junior scheduled task in the active Slack destination. Use only for task IDs returned from the active destination. Do not move tasks across channels or threads.", + inputSchema: Type.Object({ + task_id: Type.String({ minLength: 1 }), + title: Type.Optional(Type.String({ minLength: 1, maxLength: 120 })), + objective: Type.Optional(Type.String({ minLength: 1, maxLength: 1000 })), + instructions: Type.Optional( + Type.Array(Type.String({ minLength: 1, maxLength: 1000 }), { + minItems: 1, + maxItems: 12, + }), + ), + expected_output: Type.Optional( + Type.String({ minLength: 1, maxLength: 1000 }), + ), + schedule_description: Type.Optional( + Type.String({ minLength: 1, maxLength: 300 }), + ), + timezone: Type.Optional(Type.String({ minLength: 1, maxLength: 80 })), + next_run_at_iso: Type.Optional(Type.String({ minLength: 1 })), + recurrence_frequency: Type.Optional( + Type.Union([ + Type.Literal("daily"), + Type.Literal("weekly"), + Type.Literal("monthly"), + Type.Literal("yearly"), + Type.Null(), + ]), + ), + recurrence_interval: Type.Optional( + Type.Integer({ minimum: 1, maximum: 100 }), + ), + recurrence_weekdays: Type.Optional( + Type.Array(Type.Integer({ minimum: 0, maximum: 6 }), { maxItems: 7 }), + ), + status: Type.Optional( + Type.Union([ + Type.Literal("active"), + Type.Literal("paused"), + Type.Literal("blocked"), + ]), + ), + constraints: Type.Optional( + Type.Array(Type.String({ minLength: 1, maxLength: 1000 }), { + maxItems: 12, + }), + ), + source_context: Type.Optional( + Type.Array(Type.String({ minLength: 1, maxLength: 1000 }), { + maxItems: 12, + }), + ), + }), + execute: async (input) => { + const lookup = await getWritableTask({ + context, + taskId: input.task_id, + }); + if (!lookup.ok) return lookup; + + const nextRunAtMs = input.next_run_at_iso + ? parseScheduleTimestamp(input.next_run_at_iso) + : lookup.task.nextRunAtMs; + if (input.next_run_at_iso && !nextRunAtMs) { + return { + ok: false, + error: "next_run_at_iso must be a valid ISO timestamp.", + }; + } + + const status = normalizeStatus(input.status); + if (input.status && !status) { + return { + ok: false, + error: "status must be active, paused, or blocked.", + }; + } + if (status === "active" && !nextRunAtMs) { + return { + ok: false, + error: + "Active scheduled tasks require next_run_at_iso when no next run is stored.", + }; + } + const timezone = input.timezone ?? lookup.task.schedule.timezone; + const recurrence = shouldRebuildRecurrence(input) + ? buildRecurrence({ + existing: lookup.task.schedule.recurrence, + input, + nextRunAtMs, + timezone, + }) + : { ok: true as const, recurrence: lookup.task.schedule.recurrence }; + if (!recurrence.ok) { + return recurrence; + } + const nextStatus = status ?? lookup.task.status; + + const next: ScheduledTask = { + ...lookup.task, + updatedAtMs: Date.now(), + nextRunAtMs, + status: nextStatus, + statusReason: + nextStatus === "blocked" ? lookup.task.statusReason : undefined, + schedule: { + ...lookup.task.schedule, + description: + input.schedule_description ?? lookup.task.schedule.description, + timezone, + kind: recurrence.recurrence ? "recurring" : "one_off", + recurrence: recurrence.recurrence, + }, + task: { + ...lookup.task.task, + title: input.title ?? lookup.task.task.title, + objective: input.objective ?? lookup.task.task.objective, + instructions: input.instructions ?? lookup.task.task.instructions, + expectedOutput: + input.expected_output ?? lookup.task.task.expectedOutput, + constraints: input.constraints ?? lookup.task.task.constraints, + sourceContext: input.source_context ?? lookup.task.task.sourceContext, + }, + version: lookup.task.version + 1, + }; + + await createStateSchedulerStore().saveTask(next); + return { + ok: true, + task: compactTask(next), + }; + }, + }); +} + +/** Create a tool that removes a scheduled task from the active Slack destination. */ +export function createSlackScheduleDeleteTaskTool(context: ToolRuntimeContext) { + return tool({ + description: + "Remove a Junior scheduled task from the active Slack destination. Use only for task IDs returned from this destination.", + inputSchema: Type.Object({ + task_id: Type.String({ minLength: 1 }), + }), + execute: async ({ task_id }) => { + const lookup = await getWritableTask({ context, taskId: task_id }); + if (!lookup.ok) return lookup; + + const next: ScheduledTask = { + ...lookup.task, + updatedAtMs: Date.now(), + status: "deleted", + nextRunAtMs: undefined, + version: lookup.task.version + 1, + }; + + await createStateSchedulerStore().saveTask(next); + return { + ok: true, + task: compactTask(next), + }; + }, + }); +} diff --git a/packages/junior/src/chat/tools/types.ts b/packages/junior/src/chat/tools/types.ts index cea7c78a..75323e7f 100644 --- a/packages/junior/src/chat/tools/types.ts +++ b/packages/junior/src/chat/tools/types.ts @@ -30,6 +30,12 @@ export interface ToolRuntimeContext { advisor?: AdvisorToolRuntimeContext; channelId?: string; channelCapabilities: ChannelCapabilities; + requester?: { + userId?: string; + userName?: string; + fullName?: string; + }; + teamId?: string; messageTs?: string; threadTs?: string; userText?: string; diff --git a/packages/junior/src/handlers/diagnostics-dashboard.ts b/packages/junior/src/handlers/diagnostics-dashboard.ts index 5756e0d3..e271b1d1 100644 --- a/packages/junior/src/handlers/diagnostics-dashboard.ts +++ b/packages/junior/src/handlers/diagnostics-dashboard.ts @@ -126,6 +126,7 @@ export async function GET(): Promise { { method: "GET", path: "/api/info" }, { method: "GET", path: "/api/oauth/callback/mcp/:provider" }, { method: "GET", path: "/api/oauth/callback/:provider" }, + { method: "POST", path: "/api/internal/scheduler/tick" }, { method: "POST", path: "/api/webhooks/:platform" }, ]; html += `\n
diff --git a/packages/junior/src/handlers/scheduler-tick.ts b/packages/junior/src/handlers/scheduler-tick.ts new file mode 100644 index 00000000..9b3ba189 --- /dev/null +++ b/packages/junior/src/handlers/scheduler-tick.ts @@ -0,0 +1,56 @@ +import { processDueScheduledRuns } from "@/chat/scheduler/executor"; +import { createSlackScheduledTaskRunner } from "@/chat/scheduler/slack-runner"; +import { createStateSchedulerStore } from "@/chat/scheduler/store"; +import { logException } from "@/chat/logging"; +import type { WaitUntilFn } from "@/handlers/types"; + +const DEFAULT_SCHEDULER_TICK_LIMIT = 10; + +function getSchedulerSecret(): string | undefined { + return ( + process.env.JUNIOR_SCHEDULER_SECRET?.trim() || + process.env.CRON_SECRET?.trim() + ); +} + +function verifySchedulerRequest(request: Request): boolean { + const secret = getSchedulerSecret(); + if (!secret) { + return false; + } + + const authorization = request.headers.get("authorization")?.trim(); + return authorization === `Bearer ${secret}`; +} + +/** Handle the authenticated internal scheduler tick. */ +export async function ALL( + request: Request, + waitUntil: WaitUntilFn, +): Promise { + if (!verifySchedulerRequest(request)) { + return new Response("Unauthorized", { status: 401 }); + } + + const nowMs = Date.now(); + waitUntil(() => + processDueScheduledRuns({ + store: createStateSchedulerStore(), + runner: createSlackScheduledTaskRunner(), + nowMs, + limit: DEFAULT_SCHEDULER_TICK_LIMIT, + }).catch((error) => { + logException( + error, + "scheduler_tick_failed", + {}, + { + "app.scheduler.now_ms": nowMs, + }, + "Scheduler tick failed", + ); + }), + ); + + return new Response("Accepted", { status: 202 }); +} diff --git a/packages/junior/src/vercel.ts b/packages/junior/src/vercel.ts index cefa2743..5e31ccf6 100644 --- a/packages/junior/src/vercel.ts +++ b/packages/junior/src/vercel.ts @@ -9,6 +9,12 @@ export function juniorVercelConfig(options: JuniorVercelConfigOptions = {}) { const config: Record = { framework: "nitro", + crons: [ + { + path: "/api/internal/scheduler/tick", + schedule: "* * * * *", + }, + ], }; if (buildCommand !== null) { diff --git a/packages/junior/tests/integration/scheduler-executor.test.ts b/packages/junior/tests/integration/scheduler-executor.test.ts new file mode 100644 index 00000000..590ee513 --- /dev/null +++ b/packages/junior/tests/integration/scheduler-executor.test.ts @@ -0,0 +1,419 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { disconnectStateAdapter } from "@/chat/state/adapter"; +import { + executeScheduledRun, + processDueScheduledRuns, + type ScheduledTaskRunner, +} from "@/chat/scheduler/executor"; +import { createStateSchedulerStore } from "@/chat/scheduler/store"; +import type { ScheduledTask } from "@/chat/scheduler/types"; + +vi.hoisted(() => { + process.env.JUNIOR_STATE_ADAPTER = "memory"; +}); + +function createTask(overrides: Partial = {}): ScheduledTask { + const firstRunAtMs = Date.parse("2026-03-02T17:00:00.000Z"); + return { + id: `sched_executor_${Date.now()}`, + createdAtMs: firstRunAtMs, + updatedAtMs: firstRunAtMs, + createdBy: { + slackUserId: "U123", + userName: "dcramer", + fullName: "David Cramer", + }, + destination: { + platform: "slack", + teamId: "T_EXECUTOR", + channelId: "C123", + threadTs: "1700000000.000000", + }, + nextRunAtMs: firstRunAtMs, + schedule: { + description: "Every Monday at 9am Pacific", + timezone: "America/Los_Angeles", + kind: "recurring", + recurrence: { + frequency: "weekly", + interval: 1, + startDate: "2026-03-02", + time: { + hour: 9, + minute: 0, + }, + weekdays: [1], + }, + }, + status: "active", + task: { + title: "Issue digest", + objective: "Summarize scheduler issues.", + instructions: ["Find open scheduler issues", "Post a concise digest"], + }, + version: 1, + ...overrides, + }; +} + +describe("scheduler executor", () => { + beforeEach(async () => { + await disconnectStateAdapter(); + }); + + afterEach(async () => { + await disconnectStateAdapter(); + }); + + it("wraps claimed tasks in the scheduled-run prompt and advances recurrence", async () => { + const store = createStateSchedulerStore(); + const task = createTask(); + await store.saveTask(task); + const prompts: string[] = []; + const runner: ScheduledTaskRunner = { + run: async ({ prompt }) => { + prompts.push(prompt); + return { status: "completed", resultMessageTs: "1700000000.000001" }; + }, + }; + + const completed = await processDueScheduledRuns({ + store, + runner, + nowMs: Date.parse("2026-03-02T17:00:04.500Z"), + limit: 10, + }); + + expect(completed).toHaveLength(1); + expect(completed[0]).toMatchObject({ + taskId: task.id, + status: "completed", + scheduledForMs: Date.parse("2026-03-02T17:00:00.000Z"), + }); + expect(prompts[0]).toContain(""); + expect(prompts[0]).toContain( + "Execute the scheduled task now and provide the final result", + ); + + const updated = await store.getTask(task.id); + expect(updated).toMatchObject({ + status: "active", + lastRunAtMs: Date.parse("2026-03-02T17:00:00.000Z"), + nextRunAtMs: Date.parse("2026-03-09T16:00:00.000Z"), + version: 2, + }); + }); + + it("keeps monthly recurrence on the exact calendar date", async () => { + const store = createStateSchedulerStore(); + const firstRunAtMs = Date.parse("2026-01-31T09:00:00.000Z"); + const task = createTask({ + id: `sched_monthly_${Date.now()}`, + nextRunAtMs: firstRunAtMs, + schedule: { + description: "Every month on the 31st at 9am UTC", + timezone: "UTC", + kind: "recurring", + recurrence: { + frequency: "monthly", + interval: 1, + startDate: "2026-01-31", + time: { + hour: 9, + minute: 0, + }, + dayOfMonth: 31, + }, + }, + }); + await store.saveTask(task); + + await processDueScheduledRuns({ + store, + nowMs: Date.parse("2026-02-01T00:00:00.000Z"), + limit: 10, + runner: { + run: async () => ({ status: "completed" }), + }, + }); + + const updated = await store.getTask(task.id); + expect(updated).toMatchObject({ + lastRunAtMs: firstRunAtMs, + nextRunAtMs: Date.parse("2026-03-31T09:00:00.000Z"), + }); + }); + + it("blocks the task when the runner reports missing requirements", async () => { + const store = createStateSchedulerStore(); + const task = createTask({ id: `sched_blocked_${Date.now()}` }); + await store.saveTask(task); + const [run] = await store.claimDueRuns({ + nowMs: Date.parse("2026-03-02T17:00:00.000Z"), + limit: 10, + }); + + const completed = await executeScheduledRun({ + store, + run, + nowMs: Date.parse("2026-03-02T17:00:01.500Z"), + runner: { + run: async () => ({ + status: "blocked", + errorMessage: "Missing GitHub credentials.", + }), + }, + }); + + expect(completed).toMatchObject({ + status: "blocked", + errorMessage: "Missing GitHub credentials.", + }); + const updated = await store.getTask(task.id); + expect(updated).toMatchObject({ + status: "blocked", + statusReason: "Missing GitHub credentials.", + nextRunAtMs: undefined, + }); + }); + + it("allows a resumed blocked task to retry the same due instant", async () => { + const store = createStateSchedulerStore(); + const task = createTask({ id: `sched_blocked_retry_${Date.now()}` }); + await store.saveTask(task); + const [run] = await store.claimDueRuns({ + nowMs: Date.parse("2026-03-02T17:00:00.000Z"), + limit: 10, + }); + + await executeScheduledRun({ + store, + run, + nowMs: Date.parse("2026-03-02T17:00:01.500Z"), + runner: { + run: async () => ({ + status: "blocked", + errorMessage: "Missing GitHub credentials.", + }), + }, + }); + + const blocked = await store.getTask(task.id); + expect(blocked).toMatchObject({ + status: "blocked", + nextRunAtMs: undefined, + }); + await store.saveTask({ + ...blocked!, + nextRunAtMs: run.scheduledForMs, + status: "active", + statusReason: undefined, + updatedAtMs: Date.parse("2026-03-02T17:00:02.000Z"), + version: blocked!.version + 1, + }); + + const [retryRun] = await store.claimDueRuns({ + nowMs: Date.parse("2026-03-02T17:00:03.000Z"), + limit: 10, + }); + + expect(retryRun).toMatchObject({ + id: run.id, + taskId: task.id, + scheduledForMs: run.scheduledForMs, + status: "pending", + }); + }); + + it("does not claim another due run while the same task is running", async () => { + const store = createStateSchedulerStore(); + const task = createTask({ id: `sched_overlap_${Date.now()}` }); + await store.saveTask(task); + const [firstRun] = await store.claimDueRuns({ + nowMs: Date.parse("2026-03-02T17:00:00.000Z"), + limit: 10, + }); + await store.markRunStarted({ + runId: firstRun.id, + claimedAtMs: firstRun.claimedAtMs, + nowMs: Date.parse("2026-03-02T17:00:01.000Z"), + }); + const editedNextRunAtMs = Date.parse("2026-03-09T16:00:00.000Z"); + await store.saveTask({ + ...task, + nextRunAtMs: editedNextRunAtMs, + updatedAtMs: Date.parse("2026-03-02T17:00:02.000Z"), + version: task.version + 1, + }); + + await expect( + store.claimDueRuns({ + nowMs: Date.parse("2026-03-09T16:00:01.000Z"), + limit: 10, + }), + ).resolves.toHaveLength(0); + + await store.markRunCompleted({ + runId: firstRun.id, + completedAtMs: Date.parse("2026-03-02T17:00:03.000Z"), + startedAtMs: Date.parse("2026-03-02T17:00:01.000Z"), + }); + + const [nextRun] = await store.claimDueRuns({ + nowMs: Date.parse("2026-03-09T16:00:01.000Z"), + limit: 10, + }); + expect(nextRun).toMatchObject({ + taskId: task.id, + scheduledForMs: editedNextRunAtMs, + status: "pending", + }); + }); + + it("reclaims due tasks left pending by an aborted tick", async () => { + const store = createStateSchedulerStore(); + const firstTask = createTask({ id: `sched_aborted_first_${Date.now()}` }); + const secondTask = createTask({ + id: `sched_aborted_second_${Date.now()}`, + }); + await store.saveTask(firstTask); + await store.saveTask(secondTask); + + const [firstRun, abandonedRun] = await store.claimDueRuns({ + nowMs: Date.parse("2026-03-02T17:00:00.000Z"), + limit: 10, + }); + expect(firstRun).toMatchObject({ taskId: firstTask.id }); + expect(abandonedRun).toMatchObject({ + taskId: secondTask.id, + status: "pending", + }); + + await executeScheduledRun({ + store, + run: firstRun, + nowMs: Date.parse("2026-03-02T17:00:01.000Z"), + runner: { + run: async () => ({ status: "completed" }), + }, + }); + + const [retryRun] = await store.claimDueRuns({ + nowMs: Date.parse("2026-03-02T17:01:00.000Z"), + limit: 10, + }); + expect(retryRun).toMatchObject({ + id: abandonedRun.id, + taskId: secondTask.id, + scheduledForMs: abandonedRun.scheduledForMs, + status: "pending", + }); + }); + + it("does not let an abandoned claim start after the run is reclaimed", async () => { + const store = createStateSchedulerStore(); + const task = createTask({ id: `sched_stale_claim_${Date.now()}` }); + await store.saveTask(task); + const [abandonedRun] = await store.claimDueRuns({ + nowMs: Date.parse("2026-03-02T17:00:00.000Z"), + limit: 10, + }); + const [reclaimedRun] = await store.claimDueRuns({ + nowMs: Date.parse("2026-03-02T17:01:00.000Z"), + limit: 10, + }); + + await expect( + executeScheduledRun({ + store, + run: abandonedRun, + nowMs: Date.parse("2026-03-02T17:01:01.000Z"), + runner: { + run: async () => { + throw new Error("stale claim should not start"); + }, + }, + }), + ).resolves.toBeUndefined(); + + await expect( + executeScheduledRun({ + store, + run: reclaimedRun, + nowMs: Date.parse("2026-03-02T17:01:02.000Z"), + runner: { + run: async () => ({ status: "completed" }), + }, + }), + ).resolves.toMatchObject({ + status: "completed", + }); + }); + + it("does not restart a run another tick already completed", async () => { + const store = createStateSchedulerStore(); + const task = createTask({ id: `sched_completed_claim_${Date.now()}` }); + await store.saveTask(task); + const [run] = await store.claimDueRuns({ + nowMs: Date.parse("2026-03-02T17:00:00.000Z"), + limit: 10, + }); + + await executeScheduledRun({ + store, + run, + nowMs: Date.parse("2026-03-02T17:00:01.000Z"), + runner: { + run: async () => ({ status: "completed" }), + }, + }); + + await expect( + executeScheduledRun({ + store, + run, + nowMs: Date.parse("2026-03-02T17:00:02.000Z"), + runner: { + run: async () => { + throw new Error("completed run should not restart"); + }, + }, + }), + ).resolves.toBeUndefined(); + }); + + it("does not resurrect a task deleted while a run is executing", async () => { + const store = createStateSchedulerStore(); + const task = createTask({ id: `sched_deleted_${Date.now()}` }); + await store.saveTask(task); + const [run] = await store.claimDueRuns({ + nowMs: Date.parse("2026-03-02T17:00:00.000Z"), + limit: 10, + }); + + await executeScheduledRun({ + store, + run, + nowMs: Date.parse("2026-03-02T17:00:01.500Z"), + runner: { + run: async () => { + await store.saveTask({ + ...task, + status: "deleted", + nextRunAtMs: undefined, + updatedAtMs: Date.parse("2026-03-02T17:00:01.000Z"), + version: task.version + 1, + }); + return { status: "completed" }; + }, + }, + }); + + const updated = await store.getTask(task.id); + expect(updated).toMatchObject({ + status: "deleted", + nextRunAtMs: undefined, + version: 2, + }); + }); +}); diff --git a/packages/junior/tests/integration/scheduler-slack-runner.test.ts b/packages/junior/tests/integration/scheduler-slack-runner.test.ts new file mode 100644 index 00000000..39b19eef --- /dev/null +++ b/packages/junior/tests/integration/scheduler-slack-runner.test.ts @@ -0,0 +1,313 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { disconnectStateAdapter } from "@/chat/state/adapter"; +import { createSlackScheduledTaskRunner } from "@/chat/scheduler/slack-runner"; +import { getPersistedThreadState } from "@/chat/runtime/thread-state"; +import { AuthorizationFlowDisabledError } from "@/chat/services/auth-pause"; +import type { ScheduledRun, ScheduledTask } from "@/chat/scheduler/types"; +import type { AssistantReply } from "@/chat/respond"; +import { + chatPostEphemeralOk, + chatPostMessageOk, +} from "../fixtures/slack/factories/api"; +import { + getCapturedSlackApiCalls, + queueSlackApiResponse, +} from "../msw/handlers/slack-api"; + +vi.hoisted(() => { + process.env.JUNIOR_STATE_ADAPTER = "memory"; +}); + +function createTask(): ScheduledTask { + const scheduledForMs = Date.parse("2026-03-02T17:00:00.000Z"); + return { + id: "sched_slack_runner", + createdAtMs: scheduledForMs, + updatedAtMs: scheduledForMs, + createdBy: { + slackUserId: "U123", + userName: "dcramer", + fullName: "David Cramer", + }, + destination: { + platform: "slack", + teamId: "T123", + channelId: "C123", + threadTs: "1700000000.000000", + }, + nextRunAtMs: scheduledForMs, + schedule: { + description: "Every Monday at 9am Pacific", + timezone: "America/Los_Angeles", + kind: "recurring", + recurrence: { + frequency: "weekly", + interval: 1, + startDate: "2026-03-02", + time: { + hour: 9, + minute: 0, + }, + weekdays: [1], + }, + }, + status: "active", + task: { + title: "Issue digest", + objective: "Summarize scheduler issues.", + instructions: ["Find open scheduler issues", "Post a concise digest"], + }, + version: 1, + }; +} + +function createRun(task: ScheduledTask): ScheduledRun { + const scheduledForMs = task.nextRunAtMs!; + return { + id: `${task.id}:${scheduledForMs}`, + attempt: 1, + claimedAtMs: scheduledForMs, + idempotencyKey: `${task.id}:${scheduledForMs}`, + scheduledForMs, + status: "running", + startedAtMs: scheduledForMs, + taskId: task.id, + taskVersion: task.version, + }; +} + +function createReply(): AssistantReply { + return { + text: "Scheduled digest delivered.", + deliveryMode: "thread", + deliveryPlan: { + mode: "thread", + postThreadText: true, + attachFiles: "none", + }, + diagnostics: { + assistantMessageCount: 1, + durationMs: 1234, + modelId: "test-model", + outcome: "success", + toolCalls: [], + toolErrorCount: 0, + toolResultCount: 0, + usedPrimaryText: true, + }, + }; +} + +describe("scheduled Slack runner", () => { + beforeEach(async () => { + await disconnectStateAdapter(); + }); + + afterEach(async () => { + await disconnectStateAdapter(); + }); + + it("delivers scheduled run output through Slack Web API", async () => { + queueSlackApiResponse("chat.postMessage", { + body: chatPostMessageOk({ + channel: "C123", + ts: "1700000000.000001", + }), + }); + const task = createTask(); + const run = createRun(task); + const runner = createSlackScheduledTaskRunner({ + generateAssistantReply: async (_prompt, context) => { + if (!context) { + throw new Error("expected reply context"); + } + expect(context.requester).toMatchObject({ + userId: "U123", + userName: "dcramer", + fullName: "David Cramer", + }); + expect(context.correlation).toMatchObject({ + channelId: "C123", + teamId: "T123", + threadTs: "1700000000.000000", + runId: run.id, + }); + return createReply(); + }, + }); + + const result = await runner.run({ + task, + run, + prompt: "", + nowMs: Date.parse("2026-03-02T17:00:01.000Z"), + }); + + expect(result).toEqual({ + status: "completed", + resultMessageTs: "1700000000.000001", + }); + expect(getCapturedSlackApiCalls("chat.postMessage")).toEqual([ + expect.objectContaining({ + params: expect.objectContaining({ + channel: "C123", + thread_ts: "1700000000.000000", + text: "Scheduled digest delivered.", + }), + }), + ]); + }); + + it("does not post again when a scheduled run already has a delivered result", async () => { + queueSlackApiResponse("chat.postMessage", { + body: chatPostMessageOk({ + channel: "C123", + ts: "1700000000.000001", + }), + }); + const task = createTask(); + const run = createRun(task); + const generateAssistantReply = vi.fn(async () => createReply()); + const runner = createSlackScheduledTaskRunner({ generateAssistantReply }); + + await expect( + runner.run({ + task, + run, + prompt: "", + nowMs: Date.parse("2026-03-02T17:00:01.000Z"), + }), + ).resolves.toEqual({ + status: "completed", + resultMessageTs: "1700000000.000001", + }); + await expect( + runner.run({ + task, + run, + prompt: "", + nowMs: Date.parse("2026-03-02T17:00:02.000Z"), + }), + ).resolves.toEqual({ + status: "completed", + resultMessageTs: "1700000000.000001", + }); + + expect(generateAssistantReply).toHaveBeenCalledTimes(1); + expect(getCapturedSlackApiCalls("chat.postMessage")).toHaveLength(1); + }); + + it("isolates scheduled conversation state by Slack workspace", async () => { + queueSlackApiResponse("chat.postMessage", { + body: chatPostMessageOk({ + channel: "C123", + ts: "1700000000.000001", + }), + }); + queueSlackApiResponse("chat.postMessage", { + body: chatPostMessageOk({ + channel: "C123", + ts: "1700000000.000002", + }), + }); + const firstTask = createTask(); + const baseSecondTask = createTask(); + const secondTask = { + ...baseSecondTask, + id: "sched_slack_runner_other_team", + destination: { + ...baseSecondTask.destination, + teamId: "T999", + }, + }; + const runner = createSlackScheduledTaskRunner({ + generateAssistantReply: async () => createReply(), + }); + + await runner.run({ + task: firstTask, + run: createRun(firstTask), + prompt: "", + nowMs: Date.parse("2026-03-02T17:00:01.000Z"), + }); + await runner.run({ + task: secondTask, + run: createRun(secondTask), + prompt: "", + nowMs: Date.parse("2026-03-02T17:00:02.000Z"), + }); + + await expect( + getPersistedThreadState("slack:T123:C123:1700000000.000000"), + ).resolves.toMatchObject({ + conversation: { + messages: expect.arrayContaining([ + expect.objectContaining({ + id: `scheduled-run:${createRun(firstTask).id}:assistant`, + }), + ]), + }, + }); + await expect( + getPersistedThreadState("slack:T999:C123:1700000000.000000"), + ).resolves.toMatchObject({ + conversation: { + messages: expect.arrayContaining([ + expect.objectContaining({ + id: `scheduled-run:${createRun(secondTask).id}:assistant`, + }), + ]), + }, + }); + }); + + it("blocks scheduled runs instead of starting authorization", async () => { + queueSlackApiResponse("chat.postEphemeral", { + body: chatPostEphemeralOk(), + }); + const task = createTask(); + const run = createRun(task); + const runner = createSlackScheduledTaskRunner({ + generateAssistantReply: async (_prompt, context) => { + if (!context) { + throw new Error("expected reply context"); + } + expect(context.authorizationFlowMode).toBe("disabled"); + expect(context.pendingAuth).toBeUndefined(); + expect(context.onAuthPending).toBeUndefined(); + throw new AuthorizationFlowDisabledError("mcp", "github"); + }, + }); + + const result = await runner.run({ + task, + run, + prompt: "", + nowMs: Date.parse("2026-03-02T17:00:01.000Z"), + }); + + expect(result).toEqual({ + status: "blocked", + errorMessage: + "Scheduled task requires github authorization. Connect github in an interactive Slack message, then resume the task.", + }); + expect(getCapturedSlackApiCalls("chat.postMessage")).toHaveLength(0); + expect(getCapturedSlackApiCalls("chat.postEphemeral")).toEqual([ + expect.objectContaining({ + params: expect.objectContaining({ + channel: "C123", + thread_ts: "1700000000.000000", + user: "U123", + text: expect.stringContaining( + 'Scheduled task "Issue digest" is blocked', + ), + }), + }), + ]); + await expect( + getPersistedThreadState("slack:T123:C123:1700000000.000000"), + ).resolves.not.toMatchObject({ + conversation: { processing: { pendingAuth: expect.anything() } }, + }); + }); +}); diff --git a/packages/junior/tests/integration/scheduler-tick.test.ts b/packages/junior/tests/integration/scheduler-tick.test.ts new file mode 100644 index 00000000..72ca2a6e --- /dev/null +++ b/packages/junior/tests/integration/scheduler-tick.test.ts @@ -0,0 +1,90 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { disconnectStateAdapter } from "@/chat/state/adapter"; +import { ALL as schedulerTick } from "@/handlers/scheduler-tick"; +import type { WaitUntilFn } from "@/handlers/types"; + +vi.hoisted(() => { + process.env.JUNIOR_STATE_ADAPTER = "memory"; +}); + +function collectWaitUntil(tasks: Promise[]): WaitUntilFn { + return (task) => { + tasks.push(typeof task === "function" ? task() : task); + }; +} + +describe("scheduler tick handler", () => { + beforeEach(async () => { + process.env.JUNIOR_SCHEDULER_SECRET = "test-secret"; + await disconnectStateAdapter(); + }); + + afterEach(async () => { + await disconnectStateAdapter(); + delete process.env.JUNIOR_SCHEDULER_SECRET; + delete process.env.CRON_SECRET; + delete process.env.JUNIOR_INTERNAL_RESUME_SECRET; + }); + + it("rejects unauthenticated scheduler ticks", async () => { + const waitUntilTasks: Promise[] = []; + const response = await schedulerTick( + new Request("https://example.invalid/api/internal/scheduler/tick"), + collectWaitUntil(waitUntilTasks), + ); + + expect(response.status).toBe(401); + expect(waitUntilTasks).toHaveLength(0); + }); + + it("accepts bearer-authenticated scheduler ticks", async () => { + const waitUntilTasks: Promise[] = []; + const response = await schedulerTick( + new Request("https://example.invalid/api/internal/scheduler/tick", { + headers: { + authorization: "Bearer test-secret", + }, + }), + collectWaitUntil(waitUntilTasks), + ); + + expect(response.status).toBe(202); + await Promise.all(waitUntilTasks); + expect(waitUntilTasks).toHaveLength(1); + }); + + it("accepts cron bearer authentication", async () => { + delete process.env.JUNIOR_SCHEDULER_SECRET; + process.env.CRON_SECRET = "cron-secret"; + const waitUntilTasks: Promise[] = []; + const response = await schedulerTick( + new Request("https://example.invalid/api/internal/scheduler/tick", { + headers: { + authorization: "Bearer cron-secret", + }, + }), + collectWaitUntil(waitUntilTasks), + ); + + expect(response.status).toBe(202); + await Promise.all(waitUntilTasks); + expect(waitUntilTasks).toHaveLength(1); + }); + + it("does not accept the timeout resume secret for scheduler ticks", async () => { + delete process.env.JUNIOR_SCHEDULER_SECRET; + process.env.JUNIOR_INTERNAL_RESUME_SECRET = "resume-secret"; + const waitUntilTasks: Promise[] = []; + const response = await schedulerTick( + new Request("https://example.invalid/api/internal/scheduler/tick", { + headers: { + authorization: "Bearer resume-secret", + }, + }), + collectWaitUntil(waitUntilTasks), + ); + + expect(response.status).toBe(401); + expect(waitUntilTasks).toHaveLength(0); + }); +}); diff --git a/packages/junior/tests/integration/slack-schedule-tools.test.ts b/packages/junior/tests/integration/slack-schedule-tools.test.ts new file mode 100644 index 00000000..0633c9a1 --- /dev/null +++ b/packages/junior/tests/integration/slack-schedule-tools.test.ts @@ -0,0 +1,389 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { disconnectStateAdapter, getStateAdapter } from "@/chat/state/adapter"; +import { createStateSchedulerStore } from "@/chat/scheduler/store"; +import { + createSlackScheduleCreateTaskTool, + createSlackScheduleDeleteTaskTool, + createSlackScheduleListTasksTool, + createSlackScheduleUpdateTaskTool, +} from "@/chat/tools/slack/schedule-tools"; +import type { ToolRuntimeContext } from "@/chat/tools/types"; + +vi.hoisted(() => { + process.env.JUNIOR_STATE_ADAPTER = "memory"; +}); + +const TEST_TEAM_ID = `T_SCHEDULE_${Date.now()}`; + +function createContext( + overrides: Partial = {}, +): ToolRuntimeContext { + return { + channelId: "C123", + teamId: TEST_TEAM_ID, + threadTs: "1700000000.000000", + requester: { + userId: "U123", + userName: "dcramer", + fullName: "David Cramer", + }, + channelCapabilities: { + canCreateCanvas: true, + canPostToChannel: true, + canAddReactions: true, + }, + userText: "schedule this weekly", + sandbox: {} as ToolRuntimeContext["sandbox"], + ...overrides, + }; +} + +async function executeTool(tool: any, input: TInput) { + if (typeof tool?.execute !== "function") { + throw new Error("tool execute function missing"); + } + return await tool.execute(input, {} as any); +} + +async function createTask( + context = createContext(), + overrides: Record = {}, +) { + const tool = createSlackScheduleCreateTaskTool(context); + return await executeTool(tool, { + confirmed_by_user: true, + title: "Weekly issue digest", + objective: "Summarize open scheduler issues.", + instructions: ["Find open scheduler issues", "Post a concise summary"], + expected_output: "A short Slack digest", + schedule_description: "Every Monday at 9am", + timezone: "America/Los_Angeles", + next_run_at_iso: "2026-05-25T16:00:00.000Z", + recurrence_frequency: "weekly", + recurrence_weekdays: [1], + ...overrides, + }); +} + +describe("Slack schedule tools", () => { + beforeEach(async () => { + await disconnectStateAdapter(); + }); + + afterEach(async () => { + await disconnectStateAdapter(); + }); + + it("creates and lists tasks only for the active Slack destination", async () => { + const created = await createTask(); + expect(created).toMatchObject({ + ok: true, + task: { + status: "active", + title: "Weekly issue digest", + recurrence: { + frequency: "weekly", + interval: 1, + weekdays: [1], + }, + next_run_at: "2026-05-25T16:00:00.000Z", + }, + }); + + const listed = await executeTool( + createSlackScheduleListTasksTool(createContext()), + {}, + ); + expect(listed).toMatchObject({ + ok: true, + tasks: [ + { + title: "Weekly issue digest", + schedule: "Every Monday at 9am", + }, + ], + }); + + const wrongThread = await executeTool( + createSlackScheduleListTasksTool( + createContext({ threadTs: "1700000999.000000" }), + ), + {}, + ); + expect(wrongThread).toMatchObject({ + ok: true, + tasks: [], + }); + }); + + it("requires explicit confirmation before creating a task", async () => { + const result = await executeTool( + createSlackScheduleCreateTaskTool(createContext()), + { + title: "Weekly issue digest", + objective: "Summarize open scheduler issues.", + instructions: ["Find open scheduler issues", "Post a concise summary"], + schedule_description: "Every Monday at 9am", + timezone: "America/Los_Angeles", + next_run_at_iso: "2026-05-25T16:00:00.000Z", + recurrence_frequency: "weekly", + recurrence_weekdays: [1], + }, + ); + + expect(result).toMatchObject({ + ok: false, + error: + "Scheduled tasks require explicit user confirmation before they are created. Draft the task contract for the user to confirm.", + }); + await expect( + createStateSchedulerStore().listTasksForTeam(TEST_TEAM_ID), + ).resolves.toEqual([]); + }); + + it("edits and deletes a task from the same Slack destination", async () => { + const context = createContext({ threadTs: "1700000001.000000" }); + const created = (await createTask(context)) as { + task: { id: string }; + }; + const taskId = created.task.id; + + const updated = await executeTool( + createSlackScheduleUpdateTaskTool(context), + { + task_id: taskId, + title: "Daily scheduler digest", + schedule_description: "Every day at 9am", + recurrence_frequency: "daily", + }, + ); + expect(updated).toMatchObject({ + ok: true, + task: { + id: taskId, + title: "Daily scheduler digest", + schedule: "Every day at 9am", + version: 2, + }, + }); + + const deleted = await executeTool( + createSlackScheduleDeleteTaskTool(context), + { + task_id: taskId, + }, + ); + expect(deleted).toMatchObject({ + ok: true, + task: { + id: taskId, + status: "deleted", + }, + }); + + const listed = await executeTool( + createSlackScheduleListTasksTool(context), + {}, + ); + expect(listed).toMatchObject({ ok: true, tasks: [] }); + }); + + it("rejects edits from another active Slack destination", async () => { + const context = createContext({ threadTs: "1700000002.000000" }); + const created = (await createTask(context)) as { + task: { id: string }; + }; + + const updated = await executeTool( + createSlackScheduleUpdateTaskTool(createContext({ channelId: "C999" })), + { + task_id: created.task.id, + title: "Wrong channel edit", + }, + ); + + expect(updated).toMatchObject({ + ok: false, + error: + "Scheduled task can only be managed from the Slack destination where it was created.", + }); + }); + + it("rejects edits and deletes from another requester in the same Slack destination", async () => { + const context = createContext({ threadTs: "1700000003.000000" }); + const created = (await createTask(context)) as { + task: { id: string }; + }; + const otherRequester = createContext({ + threadTs: context.threadTs, + requester: { + userId: "U999", + userName: "alice", + fullName: "Alice Reviewer", + }, + }); + + const updated = await executeTool( + createSlackScheduleUpdateTaskTool(otherRequester), + { + task_id: created.task.id, + title: "Hijacked digest", + }, + ); + const deleted = await executeTool( + createSlackScheduleDeleteTaskTool(otherRequester), + { + task_id: created.task.id, + }, + ); + + expect(updated).toMatchObject({ + ok: false, + error: + "Scheduled task can only be managed by the Slack user who created it.", + }); + expect(deleted).toMatchObject({ + ok: false, + error: + "Scheduled task can only be managed by the Slack user who created it.", + }); + await expect( + createStateSchedulerStore().getTask(created.task.id), + ).resolves.toMatchObject({ + status: "active", + task: { + title: "Weekly issue digest", + }, + version: 1, + }); + }); + + it("preserves a recurring task calendar anchor on content-only edits", async () => { + const context = createContext({ threadTs: "1700000004.000000" }); + const created = (await createTask(context, { + recurrence_interval: 2, + })) as { + task: { id: string }; + }; + const store = createStateSchedulerStore(); + const task = await store.getTask(created.task.id); + expect(task?.schedule.recurrence).toMatchObject({ + interval: 2, + startDate: "2026-05-25", + }); + await store.saveTask({ + ...task!, + nextRunAtMs: Date.parse("2026-06-08T16:00:00.000Z"), + updatedAtMs: Date.parse("2026-05-26T16:00:00.000Z"), + version: task!.version + 1, + }); + + const updated = await executeTool( + createSlackScheduleUpdateTaskTool(context), + { + task_id: created.task.id, + title: "Renamed issue digest", + }, + ); + + expect(updated).toMatchObject({ + ok: true, + task: { + title: "Renamed issue digest", + }, + }); + await expect(store.getTask(created.task.id)).resolves.toMatchObject({ + nextRunAtMs: Date.parse("2026-06-08T16:00:00.000Z"), + schedule: { + recurrence: { + interval: 2, + startDate: "2026-05-25", + }, + }, + }); + }); + + it("clears stale block reasons when resuming a task", async () => { + const context = createContext({ threadTs: "1700000005.000000" }); + const created = (await createTask(context)) as { + task: { id: string }; + }; + const store = createStateSchedulerStore(); + const task = await store.getTask(created.task.id); + expect(task).toBeDefined(); + await store.saveTask({ + ...task!, + status: "blocked", + statusReason: "Missing GitHub credentials.", + updatedAtMs: Date.parse("2026-05-25T16:01:00.000Z"), + version: task!.version + 1, + }); + + const updated = await executeTool( + createSlackScheduleUpdateTaskTool(context), + { + task_id: created.task.id, + status: "active", + }, + ); + + expect(updated).toMatchObject({ + ok: true, + task: { + id: created.task.id, + status: "active", + }, + }); + const resumed = await store.getTask(created.task.id); + expect(resumed).toMatchObject({ + status: "active", + }); + expect(resumed?.statusReason).toBeUndefined(); + }); + + it("removes deleted tasks from scheduler indexes", async () => { + const context = createContext({ threadTs: "1700000006.000000" }); + const created = (await createTask(context)) as { + task: { id: string }; + }; + + await executeTool(createSlackScheduleDeleteTaskTool(context), { + task_id: created.task.id, + }); + + const state = getStateAdapter(); + await state.connect(); + await expect(state.get("junior:scheduler:tasks")).resolves.toBe( + null, + ); + await expect( + state.get(`junior:scheduler:team:${TEST_TEAM_ID}:tasks`), + ).resolves.toBe(null); + }); + + it("claims due runs idempotently", async () => { + const context = createContext({ threadTs: "1700000007.000000" }); + const created = (await createTask(context)) as { + task: { id: string }; + }; + const store = createStateSchedulerStore(); + const task = await store.getTask(created.task.id); + expect(task).toBeDefined(); + await store.saveTask({ + ...task!, + nextRunAtMs: 1000, + updatedAtMs: 1000, + }); + + const first = await store.claimDueRuns({ nowMs: 2000, limit: 10 }); + const second = await store.claimDueRuns({ nowMs: 2000, limit: 10 }); + + expect(first).toHaveLength(1); + expect(first[0]).toMatchObject({ + taskId: created.task.id, + scheduledForMs: 1000, + status: "pending", + }); + expect(second).toHaveLength(0); + }); +}); diff --git a/packages/junior/tests/unit/runtime/thread-context.test.ts b/packages/junior/tests/unit/runtime/thread-context.test.ts index 79c2c08f..8420e7d1 100644 --- a/packages/junior/tests/unit/runtime/thread-context.test.ts +++ b/packages/junior/tests/unit/runtime/thread-context.test.ts @@ -1,5 +1,9 @@ import { describe, expect, it } from "vitest"; -import { getAssistantThreadContext } from "@/chat/runtime/thread-context"; +import { + getAssistantThreadContext, + getTeamId, +} from "@/chat/runtime/thread-context"; +import { runWithWorkspaceTeamId } from "@/chat/slack/workspace-context"; describe("getAssistantThreadContext", () => { it("uses the current raw message ts for the first non-DM thread reply", () => { @@ -61,3 +65,41 @@ describe("getAssistantThreadContext", () => { ).toBeUndefined(); }); }); + +describe("getTeamId", () => { + it("uses the raw Slack workspace team when Slack provides it", () => { + expect( + getTeamId({ + raw: { + team_id: "T_RAW", + }, + } as any), + ).toBe("T_RAW"); + }); + + it("falls back to the inbound webhook workspace team", async () => { + await runWithWorkspaceTeamId("T_WORKSPACE", async () => { + await Promise.resolve(); + expect( + getTeamId({ + raw: { + channel: "C12345", + ts: "1700000000.200", + }, + } as any), + ).toBe("T_WORKSPACE"); + }); + }); + + it("prefers the inbound workspace over a Slack Connect author team", () => { + runWithWorkspaceTeamId("T_WORKSPACE", () => { + expect( + getTeamId({ + raw: { + user_team: "T_EXTERNAL", + }, + } as any), + ).toBe("T_WORKSPACE"); + }); + }); +}); diff --git a/packages/junior/tests/unit/services/mcp-auth-orchestration.test.ts b/packages/junior/tests/unit/services/mcp-auth-orchestration.test.ts new file mode 100644 index 00000000..c00ea46d --- /dev/null +++ b/packages/junior/tests/unit/services/mcp-auth-orchestration.test.ts @@ -0,0 +1,83 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { createMcpAuthOrchestration } from "@/chat/services/mcp-auth-orchestration"; +import { AuthorizationFlowDisabledError } from "@/chat/services/auth-pause"; + +const { + createMcpOAuthClientProvider, + deleteMcpAuthSession, + deliverPrivateMessage, + formatProviderLabel, + getMcpAuthSession, + patchMcpAuthSession, +} = vi.hoisted(() => ({ + createMcpOAuthClientProvider: vi.fn(), + deleteMcpAuthSession: vi.fn(), + deliverPrivateMessage: vi.fn(), + formatProviderLabel: vi.fn((provider: string) => provider), + getMcpAuthSession: vi.fn(), + patchMcpAuthSession: vi.fn(), +})); + +vi.mock("@/chat/mcp/oauth", () => ({ + createMcpOAuthClientProvider, +})); + +vi.mock("@/chat/mcp/auth-store", () => ({ + deleteMcpAuthSession, + getMcpAuthSession, + patchMcpAuthSession, +})); + +vi.mock("@/chat/oauth-flow", () => ({ + deliverPrivateMessage, + formatProviderLabel, +})); + +describe("createMcpAuthOrchestration", () => { + beforeEach(() => { + createMcpOAuthClientProvider.mockReset(); + createMcpOAuthClientProvider.mockResolvedValue({ + authSessionId: "auth_1", + }); + deleteMcpAuthSession.mockReset(); + deliverPrivateMessage.mockReset(); + formatProviderLabel.mockClear(); + getMcpAuthSession.mockReset(); + patchMcpAuthSession.mockReset(); + }); + + it("returns a deterministic error instead of delivering auth links when authorization is disabled", async () => { + const abortAgent = vi.fn(); + const orchestration = createMcpAuthOrchestration( + { + conversationId: "slack:C123:1700000000.000000", + sessionId: "scheduled:sched_1:1000", + requesterId: "U123", + channelId: "C123", + threadTs: "1700000000.000000", + userMessage: "", + getConfiguration: () => ({}), + getArtifactState: () => undefined, + getMergedArtifactState: () => ({}), + authorizationFlowMode: "disabled", + }, + abortAgent, + ); + + await orchestration.authProviderFactory({ + manifest: { + name: "github", + }, + } as any); + + await expect( + orchestration.onAuthorizationRequired("github"), + ).rejects.toBeInstanceOf(AuthorizationFlowDisabledError); + + expect(deleteMcpAuthSession).toHaveBeenCalledWith("auth_1"); + expect(patchMcpAuthSession).not.toHaveBeenCalled(); + expect(getMcpAuthSession).not.toHaveBeenCalled(); + expect(deliverPrivateMessage).not.toHaveBeenCalled(); + expect(abortAgent).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/junior/tests/unit/services/plugin-auth-orchestration.test.ts b/packages/junior/tests/unit/services/plugin-auth-orchestration.test.ts index 442e094f..9d20559c 100644 --- a/packages/junior/tests/unit/services/plugin-auth-orchestration.test.ts +++ b/packages/junior/tests/unit/services/plugin-auth-orchestration.test.ts @@ -4,6 +4,7 @@ import { PluginAuthorizationPauseError, PluginCredentialFailureError, } from "@/chat/services/plugin-auth-orchestration"; +import { AuthorizationFlowDisabledError } from "@/chat/services/auth-pause"; import type { Skill } from "@/chat/skills"; const { @@ -143,6 +144,39 @@ describe("createPluginAuthOrchestration", () => { ); }); + it("returns a deterministic error instead of starting oauth when authorization is disabled", async () => { + startOAuthFlow.mockResolvedValue({ + ok: true, + delivery: { channelId: "D123" }, + }); + const abortAgent = vi.fn(); + const userTokenStore = {} as any; + const orchestration = createPluginAuthOrchestration( + { + requesterId: "U123", + userMessage: "check Sentry", + userTokenStore, + authorizationFlowMode: "disabled", + }, + abortAgent, + ); + + await expect( + orchestration.handleCommandFailure({ + activeSkill: sentrySkill, + command: "sentry issue list", + details: { + exit_code: 1, + stderr: "junior-auth-required provider=sentry", + }, + }), + ).rejects.toBeInstanceOf(AuthorizationFlowDisabledError); + + expect(startOAuthFlow).not.toHaveBeenCalled(); + expect(unlinkProvider).not.toHaveBeenCalled(); + expect(abortAgent).not.toHaveBeenCalled(); + }); + it("unlinks the stored token only after oauth restart is launched", async () => { const order: string[] = []; const userTokenStore = {} as any; diff --git a/packages/junior/tests/unit/tools/agent-tools.test.ts b/packages/junior/tests/unit/tools/agent-tools.test.ts index edc684fe..a3bb5ded 100644 --- a/packages/junior/tests/unit/tools/agent-tools.test.ts +++ b/packages/junior/tests/unit/tools/agent-tools.test.ts @@ -1,5 +1,6 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { PluginAuthorizationPauseError } from "@/chat/services/plugin-auth-orchestration"; +import { AuthorizationFlowDisabledError } from "@/chat/services/auth-pause"; import { SkillSandbox } from "@/chat/sandbox/skill-sandbox"; import { createAgentTools } from "@/chat/tools/agent-tools"; import type { Skill } from "@/chat/skills"; @@ -310,4 +311,51 @@ describe("createAgentTools", () => { }); expect(handleToolExecutionError).not.toHaveBeenCalled(); }); + + it("rethrows disabled authorization errors without reporting a tool failure", async () => { + const sandbox = new SkillSandbox([githubSkill], [githubSkill]); + const pluginAuthOrchestration = { + handleCommandFailure: vi.fn(async () => { + throw new AuthorizationFlowDisabledError("plugin", "github"); + }), + } as any; + const sandboxExecutor = { + canExecute: (toolName: string) => toolName === "bash", + execute: vi.fn(async () => ({ + result: { + ok: false, + command: "gh issue view 123", + cwd: "/vercel/sandbox", + exit_code: 1, + signal: null, + timed_out: false, + stdout: "", + stderr: "bad credentials", + stdout_truncated: false, + stderr_truncated: false, + }, + })), + } as any; + + const [bashTool] = createAgentTools( + { + bash: { + description: "bash", + inputSchema: {} as any, + execute: async () => ({ ok: true }), + }, + }, + sandbox, + {}, + undefined, + sandboxExecutor, + pluginAuthOrchestration, + undefined, + ); + + await expect( + bashTool!.execute("tool-2", { command: "gh issue view 123" }), + ).rejects.toBeInstanceOf(AuthorizationFlowDisabledError); + expect(handleToolExecutionError).not.toHaveBeenCalled(); + }); }); diff --git a/specs/index.md b/specs/index.md index 9c3b8c4a..c05871f5 100644 --- a/specs/index.md +++ b/specs/index.md @@ -17,6 +17,7 @@ - 2026-04-28: Added canonical agent prompt spec. - 2026-05-06: Added draft advisor tool spec. - 2026-05-13: Added ownership map for chat, agent session, and Slack delivery specs. +- 2026-05-18: Added draft scheduler spec for scheduled Junior tasks. ## Status @@ -82,6 +83,7 @@ For chat/agent/Slack turn behavior: ## Draft Specs - `specs/advisor-tool-spec.md` +- `specs/scheduler-spec.md` ## Archive Policy diff --git a/specs/scheduler-spec.md b/specs/scheduler-spec.md new file mode 100644 index 00000000..2b0d143e --- /dev/null +++ b/specs/scheduler-spec.md @@ -0,0 +1,189 @@ +# Scheduler Spec + +## Metadata + +- Created: 2026-05-18 +- Last Edited: 2026-05-18 + +## Changelog + +- 2026-05-18: Clarified V1 calendar model: exact next-run instants plus simple daily/weekly/monthly/yearly recurrence rules. +- 2026-05-18: Initial draft contract for scheduled Junior tasks, prompt framing, no-SQL storage, run idempotency, and eval-first verification. + +## Status + +Draft + +## Purpose + +Define the first scheduler contract for Junior: users can create durable tasks that Junior executes later or repeatedly, with explicit task framing and delivery back to the configured surface. + +## Scope + +- Scheduled task and scheduled run data model. +- Prompt envelope used when executing a scheduled task. +- Storage and idempotency rules. +- Slack authoring and management behavior. +- Verification layer ownership. + +## Non-Goals + +- A generic event-rule engine for GitHub, Slack, Sentry, or webhook events. +- SQL-backed storage as a V1 requirement. +- A full durable workflow runtime such as Temporal or Vercel Workflow. +- Reusing timeout-resume callbacks as the product scheduler. +- Slack `chat.scheduleMessage` as the execution mechanism. + +## Contracts + +### Product Boundary + +A scheduled task is not a stored Slack message. It is a normalized task contract that Junior executes on a time trigger. + +The stored task must include: + +- task title +- objective +- instructions +- expected output +- creator/requester identity +- destination surface +- schedule and timezone +- current status +- next-run timestamp when active +- recurrence rule when recurring +- optional constraints and source context + +The original user utterance may be retained for audit/debugging, but it must not be the sole execution input. + +### Calendar Model + +Every active task must have an exact `nextRunAtMs` instant. For one-off tasks, that instant is the complete schedule. + +Recurring tasks must also store a small calendar recurrence rule: + +- frequency: `daily`, `weekly`, `monthly`, or `yearly` +- positive interval +- local start date +- local time +- timezone +- optional weekly weekdays +- optional monthly/yearly exact day-of-month and month + +V1 recurrence is calendar-based, not fixed-duration. For example, "every Monday at 9am America/Los_Angeles" should continue to run at 9am local time across daylight-saving changes. Monthly and yearly recurrences use exact calendar dates; unsupported dates are skipped rather than converted into "last day" or "business day" behavior. + +The scheduler does not need advanced rules such as first business day, nearest weekday, holiday calendars, or arbitrary cron syntax. + +### Prompt Framing + +Every scheduled run must compile the stored task into a marker-delimited prompt before entering the agent runtime. + +The prompt must make these facts explicit: + +1. This is an autonomous scheduled run. +2. This is not a request to create, update, pause, delete, or list schedules. +3. The task contract is the source of truth for what to execute. +4. The run should complete without asking follow-up questions unless access, approval, or required input is missing. +5. If blocked, the result should identify the missing provider, permission, or input. + +The compiled prompt must separate descriptive task facts from directives. Use marker blocks such as: + +- `` +- `` +- `` +- `` +- `` + +This follows the router and turn-context pattern: background and state live in descriptive blocks, while behavior rules live in a rules block and the actual ask appears last. + +### Storage + +V1 must not require SQL. The scheduler store should use the existing durable state dependency already required by Junior deployments. + +The initial implementation may use the Chat SDK state adapter and a global task index: + +- `junior:scheduler:task:{task_id}` stores the task record. +- `junior:scheduler:tasks` stores task ids for due scans. +- `junior:scheduler:team:{team_id}:tasks` stores task ids for workspace management. +- `junior:scheduler:run:{run_id}` stores run history. +- `junior:scheduler:claim:{task_id}:{scheduled_for_ms}` is the idempotency claim. + +A future Redis-native store may replace the scan index with a sorted due index without changing the runtime-facing scheduler store interface. + +### Run Idempotency + +Scheduled execution is at-least-once at the trigger layer and exactly-once-best-effort at Junior's run layer. + +Rules: + +1. A run idempotency key is `task_id:scheduled_for_ms`. +2. The scheduler must claim that key before dispatch. +3. Duplicate ticks, retries, and overlapping invocations must return the existing run or skip dispatch. +4. Run side effects must be keyed by the scheduled run id where possible. +5. A task must not overlap with itself by default. If one run is active, a later due time should be skipped, coalesced, or blocked according to the task policy. + +### Auth Principal + +Scheduled runs execute as the task creator unless the task contract explicitly names a different supported service principal. + +Requester-bound provider credentials, OAuth state, sandbox egress, and audit metadata must use the scheduled task principal. If that principal lacks valid credentials, Junior must block the run and privately notify the creator when possible. Authorization links must not be posted publicly. + +### Slack UX + +Slack authoring should be confirm-first: + +1. User asks Junior to schedule work. +2. Junior drafts the normalized task: title, cadence, timezone, destination, objective, expected output, and next run. +3. User confirms before the task becomes active. +4. Junior supports list, pause, resume, delete, and run-now commands. + +Confirmation should show the executable task contract, not only echo the user's text. + +## Failure Model + +1. Tick delivery fails: the task remains due and a later tick may claim it. +2. Duplicate tick delivery: the run claim suppresses duplicate dispatch. +3. Run fails after claim: run record captures failure and retry policy decides whether to re-dispatch. +4. Task credentials are missing: mark the run blocked and keep or pause the task according to policy. +5. Prompt framing is ambiguous: evals must catch cases where the model creates/edits a schedule instead of executing the task. + +## Observability + +Scheduler execution should emit safe task/run metadata only: + +- task id +- run id +- scheduled timestamp +- task status +- run status +- destination platform and channel id +- requester Slack user id + +Logs and spans must not include OAuth tokens, provider credentials, raw authorization URLs, or private tool payloads. + +## Verification + +Use evals for model-dependent behavior: + +- natural-language schedule extraction +- task framing quality +- confirmation quality +- scheduled-run execution behavior +- not confusing scheduled execution with schedule creation + +Use integration tests for runtime/storage contracts that do not depend on model interpretation: + +- due claim idempotency +- blocked auth path +- dispatch to Slack delivery +- pause/delete/list management surfaces + +Use unit tests only for small deterministic helpers when integration or eval coverage would be wasteful. + +## Related Specs + +- `./chat-architecture-spec.md` +- `./agent-prompt-spec.md` +- `./agent-session-resumability-spec.md` +- `./slack-agent-delivery-spec.md` +- `./testing/index.md`