From 2226914c0c4ba363b40f49c5d6c784cca35b1d4e Mon Sep 17 00:00:00 2001
From: zhanggroot7
Date: Tue, 28 Apr 2026 18:38:17 +0800
Subject: [PATCH] feat: build:web, workflow trace, and part_deltas chunk recording

1. Add build:web script.
2. Add trace for workflow (per-request JSON, middleware, Effect ref, deferred persist for prompt_async).
3. Use part_deltas to record each chunk (aligned with session.updatePartDelta / message.part.delta); model_stream_text and TTFT kept in workflow-trace.

Made-with: Cursor
---
 WORKFLOW-TRACE-PERF.md                       |  41 ++
 changelog/20260428/readme.md                 |  61 +++
 package.json                                 |   3 +
 packages/app/package.json                    |   8 +-
 packages/core/src/global.ts                  |   4 +
 .../core/test/fixture/effect-flock-worker.ts |   1 +
 packages/core/test/util/effect-flock.test.ts |   1 +
 packages/opencode/src/provider/provider.ts   |  94 +++-
 .../src/server/routes/instance/middleware.ts |  18 +-
 .../src/server/routes/instance/session.ts    |  31 +-
 .../src/server/routes/instance/trace.ts      |  36 +-
 packages/opencode/src/server/server.ts       |   2 +
 .../opencode/src/server/workflow-trace.ts    | 477 ++++++++++++++++++
 packages/opencode/src/session/processor.ts   |  28 +
 packages/opencode/src/session/prompt.ts      |   3 +
 scripts/build-and-web.sh                     |  61 +++
 16 files changed, 855 insertions(+), 14 deletions(-)
 create mode 100644 WORKFLOW-TRACE-PERF.md
 create mode 100644 changelog/20260428/readme.md
 create mode 100644 packages/opencode/src/server/workflow-trace.ts
 create mode 100755 scripts/build-and-web.sh

diff --git a/WORKFLOW-TRACE-PERF.md b/WORKFLOW-TRACE-PERF.md
new file mode 100644
index 000000000000..866970b31b1c
--- /dev/null
+++ b/WORKFLOW-TRACE-PERF.md
@@ -0,0 +1,41 @@
+# Workflow trace performance incident notes (for review)
+
+## Symptoms
+
+After the trace in `packages/opencode/src/server/workflow-trace.ts` was extended (recording `user_input`, `model_stream_text`, and, historically, `llm_stream` rows persisted once per SDK event), the **agent would hang for long periods and the whole UI appeared unresponsive**. The current design has removed `llm_stream` / `stream_packages`; the UI-aligned stream fragments live in `part_deltas`.
+
+## Root cause (the big pitfall)
+
+During streaming, the model fires a large number of **`text-delta`** events. If the **entire string** is concatenated over and over:
+
+```text
+s = s + delta // each append copies the whole prefix; with total length n and m events this is roughly O(n×m), effectively O(n²)
+```
+
+Two places fell into this trap:
+
+1. **`chat.model_stream_text`**: every delta ran `model_stream_text = model_stream_text + delta`.
+2. **Per-event streaming trace rows** (the old `llm_stream`): `_traceTextBuf` / `_traceReasonBuf` used `buf += delta` followed by `slice`.
+
+With even a modest number of tokens this **saturates the main thread / event loop**, which shows up as a global freeze rather than a timeout on a single endpoint.
+
+## Fix summary
+
+1. **Streamed body text**: `push` chunks into a **`string[]`** and **`join` once** before writing to disk (linear time).
+2. **Per-delta trace**: avoid repeated `+=` on a large buffer; `part_deltas` is a list of structured rows appended one `push` at a time; `model_stream_text` still uses the chunk array plus a single `join`.
+3. **File writes**: do **not** run `materialize + JSON.stringify + fs.write` on the middleware's synchronous path. Use **`scheduleWorkflowTracePersist`**: inside `setImmediate`, an `async` function runs `materializeModelStreamText`, builds the payload, and does `await writeTraceFile`; the request path does not wait on it.
+
+## Related code
+
+- `packages/opencode/src/server/workflow-trace.ts`: `appendModelStreamText`, `traceRecordMessagePartDelta`, `scheduleWorkflowTracePersist`
+- `packages/opencode/src/session/processor.ts`: every stream event still calls `traceRecordLlmStreamEvent` (it must stay amortized O(1) per event, linear overall, with no string concatenation on the hot path)
+
+## Self-check list (for future trace changes)
+
+- [ ] Does the hot path use `+=` on a **growing string**?
+- [ ] Does any large-object `JSON.stringify` run synchronously before the request returns?
+- [ ] Has disk I/O been moved off the main path of the HTTP `finally`?
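+
+---
+
+*Date recorded: as of the corresponding change in this repository; kept for future code review and onboarding.*
diff --git a/changelog/20260428/readme.md b/changelog/20260428/readme.md
new file mode 100644
index 000000000000..1278c56a3e04
--- /dev/null
+++ b/changelog/20260428/readme.md
@@ -0,0 +1,61 @@
+# 2026-04-28: Workflow trace milestone
+
+## Summary
+
+Introduces **workflow trace** into the OpenCode server's HTTP workflow: each request produces structured JSON, which makes it easier to debug sessions, model streams, and latency.
+
+## Main capabilities
+
+- **Location on disk**: alongside the logs, e.g. `~/.local/share/opencode/trace/` (follows `Global.Path.trace`).
+- **File naming**: `trace_<requestId>_<timestamp>.json`, **one file per request** (configuration and path exceptions are described below).
+- **Contents**:
+  - HTTP metadata and the `events` timeline (`opencode.<business>.<method>=<ms>|k=v`).
+  - **`chat`**: `user_input`, `model_stream_text` (concatenated streamed text), `part_deltas` (same granularity as the UI's `message.part.delta`), `assistant_output`, `model`, and so on.
+- **Effect propagation**: `WorkflowTraceSessionRef` passes the trace session into `AppRuntime.runPromise` (ALS does not propagate into Effect).
+- **`prompt_async`**: the prompt keeps running in the background after the 204 response, so the trace is **persisted later, in `runRequest.finally`**, to avoid files that contain only `user_input` and no model reply.
+- **Performance**:
+  - Streaming code must not repeatedly `+=` onto a growing string (this once caused a roughly O(n²) freeze); use a chunk array plus a single `join`.
+  - Persistence is scheduled asynchronously via `setImmediate` so it does not block the end of the HTTP request.
+- **Default scope**: traces are written only for `POST /session/:id/message|prompt_async|command|shell`; `OPENCODE_WORKFLOW_TRACE=all` or the request header `x-opencode-workflow-trace: 1` widens the scope.
+
+## Related code (entry points)
+
+Paths are relative to this file, `changelog/20260428/readme.md`, and are clickable in an IDE or on GitHub.
+
+- [`packages/opencode/src/server/workflow-trace.ts`](../../packages/opencode/src/server/workflow-trace.ts): middleware, `scheduleWorkflowTracePersist`, `traceRecordLlmStreamEvent`, and more.
+- [`packages/opencode/src/server/routes/instance/trace.ts`](../../packages/opencode/src/server/routes/instance/trace.ts): `runRequest` + `WorkflowTraceSessionRef`.
+- [`packages/opencode/src/session/prompt.ts`](../../packages/opencode/src/session/prompt.ts): `traceChatInitFromUserMessage`.
+- [`packages/opencode/src/session/processor.ts`](../../packages/opencode/src/session/processor.ts): stream events and `traceChatFinalizeAssistant`.
+- [`packages/core/src/global.ts`](../../packages/core/src/global.ts): `Path.trace`.
+- [`packages/opencode/src/server/routes/instance/session.ts`](../../packages/opencode/src/server/routes/instance/session.ts): `prompt_async` defers `scheduleWorkflowTracePersist`.
+
+## Incident and notes
+
+[**`WORKFLOW-TRACE-PERF.md`**](../../WORKFLOW-TRACE-PERF.md) in the repository root records the root cause of the "string concatenation in the streaming trace caused a global freeze" incident, along with a self-check list; reviewers can use it as a reference.
+
+## Milestone retrospective (a reminder that there is room to do better)
+
+This entry is more than a "feature merged" note; it was also a round of **design correction**. The first version persisted every SDK `fullStream` event (`llm_stream` / `stream_packages`). It later turned out that this did not match the **granularity the UI / bus actually displays**, so the records could not be lined up during debugging, and it also added load to the hot path and bloated the files. Only after converging on **`part_deltas`** (one-to-one with `session.updatePartDelta` / `message.part.delta`) did the trace become a reliable replay of "exactly what the user saw".
+
+**Principles worth writing down:**
+
+- **Observability mirrors the product**: debugging data should align first with the channels and chunking the user actually sees, not with an arbitrary layer of SDK abstraction.
+- **Measure before adding to the hot path**: in streaming code, assume "a very large number of events" by default; any `+=`, large `JSON.stringify`, or synchronous disk write is a red flag.
+- **Less is more**: if one field tells the story, do not maintain two streams with near-identical semantics (this cuts parsing overhead and maintenance cost).
+
+**Possible future improvements (notes, not commitments):** finer-grained sampling and switches, balancing trace detail against privacy and size, and unifying with external observability (trace id, export format); split these into tasks when needed.
+
+## Environment variables (excerpt)
+
+| Variable | Meaning |
+|------|------|
+| `OPENCODE_WORKFLOW_TRACE` | `all` / `1`: write a trace for every non-skipped route |
+| `OPENCODE_TRACE_MAX_USER_CHARS` | Cap on `user_input` |
+| `OPENCODE_TRACE_MAX_STREAM_CHARS` | Cap on `model_stream_text` |
+| `OPENCODE_TRACE_MAX_ASSISTANT_CHARS` | Cap on `assistant_output` |
+| `OPENCODE_TRACE_MAX_PART_DELTAS` | Maximum number of `part_deltas` rows |
+| `OPENCODE_TRACE_PART_DELTA_CAP` | Character cap for a single `part_deltas[].delta` |
+
+---
+
+*This directory archives important changes by date. This entry is the workflow trace milestone: it covers the initial merge, the lessons from the performance incident, and the design shift to UI-aligned `part_deltas`, so that a later reader can see why these choices were made at the time.*
diff --git a/package.json b/package.json
index 2e53fab9cc5f..c9e8cb1067ba 100644
--- a/package.json
+++ b/package.json
@@ -6,6 +6,9 @@
   "type": "module",
   "packageManager": "bun@1.3.13",
   "scripts": {
+    "build": "bun run --cwd packages/opencode build",
+    "build:fast": "bun run --cwd packages/opencode build -- --single --skip-embed-web-ui",
+    "build:web": "bash scripts/build-and-web.sh",
     "dev": "bun run --cwd packages/opencode --conditions=browser src/index.ts",
     "dev:desktop": "bun --cwd packages/desktop-electron dev",
     "dev:web": "bun --cwd packages/app dev",
diff --git a/packages/app/package.json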
b/packages/app/package.json index ce6b12ca3eec..55d1011e22e0 100644 --- a/packages/app/package.json +++ b/packages/app/package.json @@ -10,10 +10,10 @@ }, "scripts": { "typecheck": "tsgo -b", - "start": "vite", - "dev": "vite", - "build": "vite build", - "serve": "vite preview", + "start": "bunx --bun vite", + "dev": "bunx --bun vite", + "build": "bunx --bun vite build", + "serve": "bunx --bun vite preview", "test": "bun run test:unit", "test:ci": "mkdir -p .artifacts/unit && bun test --preload ./happydom.ts ./src --reporter=junit --reporter-outfile=.artifacts/unit/junit.xml", "test:unit": "bun test --preload ./happydom.ts ./src", diff --git a/packages/core/src/global.ts b/packages/core/src/global.ts index 0c83e3a1fa93..ba09337937d8 100644 --- a/packages/core/src/global.ts +++ b/packages/core/src/global.ts @@ -18,6 +18,7 @@ const paths = { data, bin: path.join(cache, "bin"), log: path.join(data, "log"), + trace: path.join(data, "trace"), cache, config, state, @@ -32,6 +33,7 @@ await Promise.all([ fs.mkdir(Path.config, { recursive: true }), fs.mkdir(Path.state, { recursive: true }), fs.mkdir(Path.log, { recursive: true }), + fs.mkdir(Path.trace, { recursive: true }), fs.mkdir(Path.bin, { recursive: true }), ]) @@ -45,6 +47,7 @@ export interface Interface { readonly state: string readonly bin: string readonly log: string + readonly trace: string } export const layer = Layer.effect( @@ -58,6 +61,7 @@ export const layer = Layer.effect( state: Path.state, bin: Path.bin, log: Path.log, + trace: Path.trace, }) }), ) diff --git a/packages/core/test/fixture/effect-flock-worker.ts b/packages/core/test/fixture/effect-flock-worker.ts index 3dc3ee2c8b6d..f0c070bc6e1d 100644 --- a/packages/core/test/fixture/effect-flock-worker.ts +++ b/packages/core/test/fixture/effect-flock-worker.ts @@ -30,6 +30,7 @@ const testGlobal = Layer.succeed( state: os.tmpdir(), bin: os.tmpdir(), log: os.tmpdir(), + trace: os.tmpdir(), }), ) diff --git a/packages/core/test/util/effect-flock.test.ts 
b/packages/core/test/util/effect-flock.test.ts index 9e8bc24ace2a..c3a6b9cc6233 100644 --- a/packages/core/test/util/effect-flock.test.ts +++ b/packages/core/test/util/effect-flock.test.ts @@ -103,6 +103,7 @@ const testGlobal = Layer.succeed( state: os.tmpdir(), bin: os.tmpdir(), log: os.tmpdir(), + trace: os.tmpdir(), }), ) diff --git a/packages/opencode/src/provider/provider.ts b/packages/opencode/src/provider/provider.ts index 841fd97f082e..4c50ad52a707 100644 --- a/packages/opencode/src/provider/provider.ts +++ b/packages/opencode/src/provider/provider.ts @@ -31,6 +31,93 @@ import { ModelID, ProviderID } from "./schema" const log = Log.create({ service: "provider" }) +/** Set `OPENCODE_LOG_SSE=1` to log each raw SSE body chunk after fetch (`request_id`, bytes, UTF-8 preview). */ +function sseLoggingEnabled() { + const v = process.env.OPENCODE_LOG_SSE?.toLowerCase() + return v === "1" || v === "true" || v === "all" +} + +function requestUrlString(input: RequestInfo | URL): string { + if (typeof input === "string") return input + if (input instanceof URL) return input.href + return input.url +} + +/** Stable id for one streamed response: prefer provider headers, else a new UUID for this tap. */ +function sseStreamRequestId(res: Response): string { + const h = res.headers + const fromHeader = + h.get("x-request-id") ?? + h.get("request-id") ?? + h.get("cf-ray") ?? + h.get("x-amzn-requestid") ?? + h.get("x-openai-request-id") + const trimmed = fromHeader?.trim() + if (trimmed) return trimmed + return crypto.randomUUID() +} + +/** Pass-through stream that logs every chunk from the provider (debugging SSE granularity). */ +function tapSSEBody(res: Response, url: string): Response { + if (!sseLoggingEnabled() || !res.body) return res + const ct = res.headers.get("content-type") ?? 
"" + if (!ct.includes("event-stream")) return res + + const requestId = sseStreamRequestId(res) + const reader = res.body.getReader() + const dec = new TextDecoder() + let seq = 0 + const maxPreview = envInt("OPENCODE_LOG_SSE_PREVIEW", 2000) + + const body = new ReadableStream({ + async pull(ctrl) { + const part = await reader.read() + if (part.done) { + const tail = dec.decode() + if (tail) { + seq += 1 + log.info("sse chunk", { + request_id: requestId, + url, + seq, + bytes: 0, + text: tail.length > maxPreview ? `${tail.slice(0, maxPreview)}...` : tail, + flush: true, + }) + } + ctrl.close() + return + } + seq += 1 + const text = dec.decode(part.value, { stream: true }) + log.info("sse chunk", { + request_id: requestId, + url, + seq, + bytes: part.value.byteLength, + text: text.length > maxPreview ? `${text.slice(0, maxPreview)}...` : text, + }) + ctrl.enqueue(part.value) + }, + cancel(reason) { + return reader.cancel(reason) + }, + }) + + return new Response(body, { + headers: new Headers(res.headers), + status: res.status, + statusText: res.statusText, + }) +} + +function envInt(name: string, fallback: number) { + const raw = process.env[name] + if (raw === undefined || raw === "") return fallback + const n = Number.parseInt(raw, 10) + return Number.isFinite(n) && n > 0 ? 
n : fallback +} + function shouldUseCopilotResponsesApi(modelID: string): boolean { const match = /^gpt-(\d+)/.exec(modelID) if (!match) return false @@ -1484,8 +1571,11 @@ const layer: Layer.Layer< timeout: false, }) - if (!chunkAbortCtl) return res - return wrapSSE(res, chunkTimeout, chunkAbortCtl) + const urlStr = requestUrlString(input as RequestInfo | URL) + const tapped = tapSSEBody(res, urlStr) + + if (!chunkAbortCtl) return tapped + return wrapSSE(tapped, chunkTimeout, chunkAbortCtl) } const bundledLoader = BUNDLED_PROVIDERS[model.api.npm] diff --git a/packages/opencode/src/server/routes/instance/middleware.ts b/packages/opencode/src/server/routes/instance/middleware.ts index 19918b8b487d..cd2acd9fa304 100644 --- a/packages/opencode/src/server/routes/instance/middleware.ts +++ b/packages/opencode/src/server/routes/instance/middleware.ts @@ -5,6 +5,7 @@ import { AppRuntime } from "@/effect/app-runtime" import { AppFileSystem } from "@opencode-ai/core/filesystem" import { WorkspaceContext } from "@/control-plane/workspace-context" import { WorkspaceID } from "@/control-plane/schema" +import { traceStep } from "@/server/workflow-trace" export function InstanceMiddleware(workspaceID?: WorkspaceID): MiddlewareHandler { return async (c, next) => { @@ -19,14 +20,29 @@ export function InstanceMiddleware(workspaceID?: WorkspaceID): MiddlewareHandler })(), ) + traceStep({ + business: "instance", + method: "resolve_directory", + detail: { + directory, + ...(workspaceID !== undefined ? 
{ workspace_id: String(workspaceID) } : {}), + }, + }) + return WorkspaceContext.provide({ workspaceID, async fn() { + traceStep({ business: "instance", method: "workspace.enter" }) return Instance.provide({ directory, init: () => AppRuntime.runPromise(InstanceBootstrap), async fn() { - return next() + traceStep({ business: "instance", method: "route.enter" }) + try { + return await next() + } finally { + traceStep({ business: "instance", method: "route.exit" }) + } }, }) }, diff --git a/packages/opencode/src/server/routes/instance/session.ts b/packages/opencode/src/server/routes/instance/session.ts index 8a7752e34183..d35050c3f704 100644 --- a/packages/opencode/src/server/routes/instance/session.ts +++ b/packages/opencode/src/server/routes/instance/session.ts @@ -27,6 +27,11 @@ import { zodObject } from "@/util/effect-zod" import { Bus } from "@/bus" import { NamedError } from "@opencode-ai/core/util/error" import { jsonRequest, runRequest } from "./trace" +import { + getWorkflowTraceSession, + scheduleWorkflowTracePersist, + workflowTracePathDefersPersist, +} from "@/server/workflow-trace" const log = Log.create({ service: "server" }) @@ -920,19 +925,33 @@ export const SessionRoutes = lazy(() => async (c) => { const sessionID = c.req.valid("param").sessionID const body = c.req.valid("json") + const wfTrace = getWorkflowTraceSession() void runRequest( "SessionRoutes.prompt_async", c, SessionPrompt.Service.use((svc) => svc.prompt({ ...body, sessionID } as unknown as SessionPrompt.PromptInput), ), - ).catch((err) => { - log.error("prompt_async failed", { sessionID, error: err }) - void Bus.publish(Session.Event.Error, { - sessionID, - error: new NamedError.Unknown({ message: err instanceof Error ? 
err.message : String(err) }).toObject(), + ) + .finally(() => { + if (!wfTrace || !workflowTracePathDefersPersist(wfTrace.http.path)) return + const finishedAt = new Date().toISOString() + const durationMs = Math.round(performance.now() - wfTrace.t0Ms) + scheduleWorkflowTracePersist(wfTrace, wfTrace.traceFilePath, { + requestId: wfTrace.requestId, + startedAt: wfTrace.startedAt, + finishedAt, + durationMs, + finalStatus: 204, + }) + }) + .catch((err) => { + log.error("prompt_async failed", { sessionID, error: err }) + void Bus.publish(Session.Event.Error, { + sessionID, + error: new NamedError.Unknown({ message: err instanceof Error ? err.message : String(err) }).toObject(), + }) }) - }) return c.body(null, 204) }, diff --git a/packages/opencode/src/server/routes/instance/trace.ts b/packages/opencode/src/server/routes/instance/trace.ts index 4c7119ef3ace..f7128d792331 100644 --- a/packages/opencode/src/server/routes/instance/trace.ts +++ b/packages/opencode/src/server/routes/instance/trace.ts @@ -1,6 +1,7 @@ import type { Context } from "hono" import { Effect } from "effect" import { AppRuntime } from "@/effect/app-runtime" +import { getWorkflowTraceSession, traceStepWithSession, WorkflowTraceSessionRef } from "@/server/workflow-trace" type AppEnv = Parameters[0] extends Effect.Effect ? 
R : never @@ -41,7 +42,40 @@ export function requestAttributes(c: RequestLike): Record { } export function runRequest(name: string, c: Context, effect: Effect.Effect) { - return AppRuntime.runPromise(effect.pipe(Effect.withSpan(name, { attributes: requestAttributes(c) }))) + const session = getWorkflowTraceSession() + const attrs = requestAttributes(c) + traceStepWithSession(session, { + business: "handler", + method: "run_request.schedule", + detail: { + span: name, + http_method: attrs["http.method"], + http_path: attrs["http.path"], + }, + }) + const traced = effect.pipe( + Effect.withSpan(name, { attributes: attrs }), + Effect.tap(() => + Effect.sync(() => + traceStepWithSession(session, { + business: "handler", + method: "run_request.ok", + detail: { span: name }, + }), + ), + ), + Effect.tapError((e) => + Effect.sync(() => + traceStepWithSession(session, { + business: "handler", + method: "run_request.fail", + detail: { span: name, error: String(e) }, + }), + ), + ), + Effect.provideService(WorkflowTraceSessionRef, session), + ) + return AppRuntime.runPromise(traced) } export async function jsonRequest( diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index 7d5373dd9663..90222600600a 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -7,6 +7,7 @@ import { Flag } from "@opencode-ai/core/flag/flag" import { WorkspaceID } from "@/control-plane/schema" import { MDNS } from "./mdns" import { AuthMiddleware, CompressionMiddleware, CorsMiddleware, ErrorMiddleware, LoggerMiddleware } from "./middleware" +import { WorkflowTraceMiddleware } from "./workflow-trace" import { FenceMiddleware } from "./fence" import { initProjectors } from "./projectors" import { InstanceRoutes } from "./routes/instance" @@ -40,6 +41,7 @@ function create(opts: { cors?: string[] }) { const app = new Hono() .onError(ErrorMiddleware) .use(AuthMiddleware) + .use(WorkflowTraceMiddleware) 
.use(LoggerMiddleware) .use(CompressionMiddleware) .use(CorsMiddleware(opts)) diff --git a/packages/opencode/src/server/workflow-trace.ts b/packages/opencode/src/server/workflow-trace.ts new file mode 100644 index 000000000000..7c2022f6e20c --- /dev/null +++ b/packages/opencode/src/server/workflow-trace.ts @@ -0,0 +1,477 @@ +import path from "path" +import fs from "fs/promises" +import { AsyncLocalStorage } from "node:async_hooks" +import type { MiddlewareHandler } from "hono" +import { Context, Effect } from "effect" +import { Global } from "@opencode-ai/core/global" +import * as Log from "@opencode-ai/core/util/log" +import type { Event as LlmStreamEvent } from "@/session/llm" + +const log = Log.create({ service: "workflow-trace" }) + +/** Written as `"format"` in every trace file for parsers. */ +export const TRACE_FORMAT = "opencode-trace-v2" + +const traceAls = new AsyncLocalStorage() + +function envInt(name: string, fallback: number) { + const raw = process.env[name] + if (raw === undefined || raw === "") return fallback + const n = Number.parseInt(raw, 10) + return Number.isFinite(n) && n > 0 ? n : fallback +} + +/** Max chars for `chat.user_input` (prompt text). */ +function maxUserInputChars() { + return envInt("OPENCODE_TRACE_MAX_USER_CHARS", 200_000) +} +/** Max chars for `chat.assistant_output` (final assistant text, all rounds). */ +function maxAssistantOutputChars() { + return envInt("OPENCODE_TRACE_MAX_ASSISTANT_CHARS", 500_000) +} +/** Max chars for `chat.model_stream_text` (concatenated assistant text deltas). */ +function maxModelStreamChars() { + return envInt("OPENCODE_TRACE_MAX_STREAM_CHARS", 500_000) +} +/** Cap `chat.part_deltas` rows (same granularity as bus `message.part.delta`). */ +const MAX_PART_DELTA_EVENTS = () => envInt("OPENCODE_TRACE_MAX_PART_DELTAS", 100_000) +/** Per-row cap for `part_deltas[].delta`. 
*/ +const PART_DELTA_CAP = () => envInt("OPENCODE_TRACE_PART_DELTA_CAP", 8192) + +/** One bus-aligned `message.part.delta` payload (UI stream chunks). */ +export type WorkflowChatPartDeltaEntry = { + ms: number + message_id: string + part_id: string + field: string + delta: string +} + +export type WorkflowChatTrace = { + session_id?: string + user_message_id?: string + /** Full user-visible prompt text (capped). */ + user_input?: string + /** @deprecated use `user_input` */ + user_text_preview?: string + model?: string + assistant_message_id?: string + /** Final assistant plain text after stream completes (capped). */ + assistant_output?: string + /** @deprecated use `assistant_output` */ + assistant_text_preview?: string + /** Concatenated assistant `text-delta` chunks in order (stream replay, capped). */ + model_stream_text?: string + /** + * Each entry matches one `session.updatePartDelta` / bus `message.part.delta` (text + reasoning). + * Order and chunking match what the UI receives. + */ + part_deltas?: WorkflowChatPartDeltaEntry[] + /** + * Time to first **user-visible** assistant text (`text-delta`), in ms from HTTP trace start (`t0`). + * Same notion as TTFT for chat UIs. Omitted if no text was streamed. + */ + ttft_ms?: number + /** + * Time to first streamed chunk from the model (`text-delta` or `reasoning-delta`), ms from HTTP trace start. + * Useful when the model emits reasoning before visible text. + */ + ttft_stream_ms?: number + /** Which delta type produced `ttft_stream_ms`. 
*/ + ttft_stream_kind?: "text" | "reasoning" +} + +export type WorkflowTraceSession = { + readonly requestId: string + readonly startedAt: string + readonly t0Ms: number + /** Each entry is one line: `opencode..=[|k=v,...]` */ + events: string[] + readonly traceFilePath: string + http: { + method: string + path: string + url: string + status?: number + } + opencode?: { + directory?: string + workspace?: string + } + error?: { message: string } + /** Populated for chat HTTP handlers (user preview + LLM stream replay). */ + chat?: WorkflowChatTrace + /** @internal model stream chunks (avoid O(n²) string += on every token) */ + _modelStreamChunks?: string[] + _modelStreamTotal?: number +} + +/** + * Carries the HTTP workflow trace session through `AppRuntime.runPromise` (ALS does not propagate into Effect). + */ +export const WorkflowTraceSessionRef = Context.Reference( + "@opencode/WorkflowTraceSession", + { + defaultValue: () => undefined, + }, +) + +function seg(s: string) { + return s.replace(/[^a-zA-Z0-9_]/g, "_").replace(/_+/g, "_").replace(/^_|_$/g, "").toLowerCase() || "x" +} + +function encodeDetail(detail?: Record): string { + if (!detail) return "" + const parts: string[] = [] + for (const [k, v] of Object.entries(detail)) { + if (v === undefined) continue + const t = typeof v + if (t !== "string" && t !== "number" && t !== "boolean") continue + let s = String(v) + if (s.length > 500) s = s.slice(0, 497) + "..." + s = s.replace(/\|/g, "%7C").replace(/=/g, "%3D").replace(/,/g, "%2C").replace(/\n/g, "%0A") + parts.push(`${seg(k)}=${s}`) + } + return parts.length ? `|${parts.join(",")}` : "" +} + +/** + * Fixed trace line: `opencode..=` optional `|k=v,...` for grep / scripts. 
+ * Example: `opencode.http.request.enter=0|request_id=abc` + */ +export function formatTraceLine( + business: string, + method: string, + ms: number, + detail?: Record, +): string { + return `opencode.${seg(business)}.${seg(method)}=${Math.round(ms)}${encodeDetail(detail)}` +} + +function appendEvent(session: WorkflowTraceSession, line: string) { + session.events.push(line) +} + +/** Record TTFT / first-stream metrics once per HTTP request (multi-round safe). */ +function recordTtftOnDelta(s: WorkflowTraceSession, delta: string, kind: "text" | "reasoning") { + if (!delta) return + ensureChat(s) + const ms = Math.round(performance.now() - s.t0Ms) + if (s.chat!.ttft_stream_ms === undefined) { + s.chat!.ttft_stream_ms = ms + s.chat!.ttft_stream_kind = kind + appendEvent( + s, + formatTraceLine("chat", "first_stream_token", ms, { kind, ttft_stream_ms: ms }), + ) + } + if (kind === "text" && s.chat!.ttft_ms === undefined) { + s.chat!.ttft_ms = ms + appendEvent(s, formatTraceLine("chat", "first_text_token", ms, { ttft_ms: ms })) + } +} + +/** Active workflow trace for this HTTP request, if any. */ +export function getWorkflowTraceSession(): WorkflowTraceSession | undefined { + return traceAls.getStore() +} + +/** + * Append one fixed-format line for the current request (inside WorkflowTraceMiddleware + traceAls.run). + */ +export function traceStep(input: { business: string; method: string; detail?: Record }) { + const s = traceAls.getStore() + if (!s) return + const ms = Math.round(performance.now() - s.t0Ms) + appendEvent(s, formatTraceLine(input.business, input.method, ms, input.detail)) +} + +export function traceStepEffect(input: { business: string; method: string; detail?: Record }) { + return Effect.sync(() => traceStep(input)) +} + +/** Use inside Effect when ALS may not propagate; pass session from getWorkflowTraceSession() at sync edge. 
*/ +export function traceStepWithSession( + session: WorkflowTraceSession | undefined, + input: { business: string; method: string; detail?: Record }, +) { + if (!session) return + const ms = Math.round(performance.now() - session.t0Ms) + appendEvent(session, formatTraceLine(input.business, input.method, ms, input.detail)) +} + +function ensureChat(s: WorkflowTraceSession) { + if (!s.chat) s.chat = {} +} + +/** Accumulate assistant text-delta without repeated string reallocation (critical for streaming perf). */ +function appendModelStreamText(s: WorkflowTraceSession, delta: string) { + if (!delta) return + const cap = maxModelStreamChars() + const total = s._modelStreamTotal ?? 0 + if (total >= cap) return + let add = delta + if (total + add.length > cap) add = add.slice(0, cap - total) + if (!add) return + if (!s._modelStreamChunks) s._modelStreamChunks = [] + s._modelStreamChunks.push(add) + s._modelStreamTotal = total + add.length +} + +function materializeModelStreamText(s: WorkflowTraceSession) { + if (!s._modelStreamChunks?.length) return + ensureChat(s) + s.chat!.model_stream_text = s._modelStreamChunks.join("") +} + +/** Summarize user prompt parts for trace (no raw file bytes; filenames only). */ +export function summarizePromptPartsForTrace(parts: ReadonlyArray<{ type: string; text?: string; filename?: string }>) { + const chunks: string[] = [] + for (const p of parts) { + if (p.type === "text" && p.text) chunks.push(p.text) + else if (p.type === "file") + chunks.push(p.filename ? `[file:${p.filename}]` : "[file]") + else chunks.push(`[${p.type}]`) + } + const joined = chunks.join("\n") + const cap = maxUserInputChars() + if (joined.length <= cap) return joined + return `${joined.slice(0, cap - 12)}\n...[truncated]` +} + +/** Called after the user message is persisted (SessionPrompt.prompt). 
*/ +export function traceChatInitFromUserMessage( + s: WorkflowTraceSession | undefined, + input: { sessionID: string; parts: ReadonlyArray<{ type: string; text?: string; filename?: string }> }, + user: { id: string; model: { providerID: string; modelID: string } }, +) { + if (!s) return + ensureChat(s) + s.chat!.session_id = input.sessionID + s.chat!.user_message_id = user.id + s.chat!.model = `${user.model.providerID}/${user.model.modelID}` + const text = summarizePromptPartsForTrace(input.parts) + s.chat!.user_input = text + s.chat!.user_text_preview = text +} + +/** Record one `message.part.delta` (after `Session.updatePartDelta`). */ +export function traceRecordMessagePartDelta( + s: WorkflowTraceSession | undefined, + input: { messageID: string; partID: string; field: string; delta: string }, +) { + if (!s || !input.delta) return + ensureChat(s) + const list = (s.chat!.part_deltas ??= []) + if (list.length >= MAX_PART_DELTA_EVENTS()) return + const cap = PART_DELTA_CAP() + let delta = input.delta + if (delta.length > cap) delta = `${delta.slice(0, cap - 3)}...` + list.push({ + ms: Math.round(performance.now() - s.t0Ms), + message_id: input.messageID, + part_id: input.partID, + field: input.field, + delta, + }) +} + +/** + * Lightweight fullStream hook: TTFT / `model_stream_text` only. + * Per-chunk UI replay lives in `chat.part_deltas` (`traceRecordMessagePartDelta`). + */ +export function traceRecordLlmStreamEvent(s: WorkflowTraceSession | undefined, ev: LlmStreamEvent) { + if (!s) return + switch (ev.type) { + case "start": + s._modelStreamChunks = undefined + s._modelStreamTotal = undefined + return + case "text-delta": + recordTtftOnDelta(s, ev.text, "text") + appendModelStreamText(s, ev.text) + return + case "reasoning-delta": + recordTtftOnDelta(s, ev.text, "reasoning") + return + default: + return + } +} + +/** Assistant plain text after each model stream completes (multi-round tool loops append). 
*/ +export function traceChatFinalizeAssistant( + s: WorkflowTraceSession | undefined, + assistantMessageId: string, + fullText: string, +) { + if (!s) return + ensureChat(s) + s.chat!.assistant_message_id = assistantMessageId + const t = fullText.trim() + if (!t) return + const cap = maxAssistantOutputChars() + const prev = (s.chat!.assistant_output ?? s.chat!.assistant_text_preview ?? "").trim() + const merged = prev ? `${prev}\n\n--- round ---\n\n${t}` : t + const out = merged.length <= cap ? merged : `${merged.slice(0, cap - 12)}\n...[truncated]` + s.chat!.assistant_output = out + s.chat!.assistant_text_preview = out +} + +/** Same directory tier as `Global.Path.log`: `…/opencode/log` vs `…/opencode/trace`. */ +export function traceDirectory() { + return Global.Path.trace +} + +function shouldSkip(pathname: string, method: string) { + if (method === "OPTIONS") return true + if (pathname === "/log") return true + if (pathname === "/event" || pathname.startsWith("/event/")) return true + if (pathname === "/global/event" || pathname.startsWith("/global/event/")) return true + return false +} + +/** + * When `OPENCODE_WORKFLOW_TRACE` is unset, only session "chat" POSTs get a trace file + * (one HTTP request → one JSON file). Set to `all` or `1` to trace every non-skipped route. + * Header `x-opencode-workflow-trace: 1` forces tracing for that request. 
+ */ +function shouldEmitTraceFile( + pathname: string, + method: string, + header: (name: string) => string | undefined, +): boolean { + if (shouldSkip(pathname, method)) return false + const h = header("x-opencode-workflow-trace")?.toLowerCase() + if (h === "1" || h === "all" || h === "true") return true + const mode = process.env.OPENCODE_WORKFLOW_TRACE?.toLowerCase() + if (mode === "all" || mode === "1" || mode === "true") return true + if (method !== "POST") return false + return /^\/session\/[^/]+\/(message|prompt_async|command|shell)$/.test(pathname) +} + +function safeTimestampForFilename(iso: string) { + return iso.replace(/[:.]/g, "-") +} + +async function writeTraceFile(filepath: string, payload: Record) { + await fs.mkdir(path.dirname(filepath), { recursive: true }) + await fs.writeFile(filepath, JSON.stringify(payload, null, 2), "utf8") +} + +/** POST …/prompt_async returns 204 before the prompt Effect finishes; skip middleware persist and call this from `runRequest(…).finally()`. */ +export function workflowTracePathDefersPersist(pathname: string): boolean { + return /\/session\/[^/]+\/prompt_async$/.test(pathname) +} + +/** Never block the request path: materialize + JSON + disk run after the current turn. 
+ */ +export function scheduleWorkflowTracePersist( + session: WorkflowTraceSession, + traceFilePath: string, + meta: { + requestId: string + startedAt: string + finishedAt: string + durationMs: number + finalStatus: number + }, +) { + setImmediate(() => { + void (async () => { + try { + materializeModelStreamText(session) + const payload: Record<string, unknown> = { + format: TRACE_FORMAT, + request_id: meta.requestId, + started_at: meta.startedAt, + finished_at: meta.finishedAt, + duration_ms: meta.durationMs, + trace_file: traceFilePath, + http: { + ...session.http, + status: meta.finalStatus, + }, + opencode: session.opencode, + events: session.events, + } + if (session.chat !== undefined) payload.chat = session.chat + if (session.error !== undefined) payload.error = session.error + await writeTraceFile(traceFilePath, payload) + } catch (err) { + log.error("workflow trace write failed", { path: traceFilePath, error: err }) + } + })() + }) +} + +export const WorkflowTraceMiddleware: MiddlewareHandler = async (c, next) => { + const pathname = c.req.path + const method = c.req.method + if (shouldSkip(pathname, method)) return next() + if (!shouldEmitTraceFile(pathname, method, (n) => c.req.header(n))) return next() + + const requestId = crypto.randomUUID() + const startedAt = new Date().toISOString() + const t0Ms = performance.now() + const filename = `trace_${requestId}_${safeTimestampForFilename(startedAt)}.json` + const traceFilePath = path.join(traceDirectory(), filename) + + const session: WorkflowTraceSession = { + requestId, + startedAt, + t0Ms, + events: [], + traceFilePath, + http: { + method, + path: pathname, + url: c.req.url, + }, + opencode: { + directory: c.req.query("directory") ?? 
c.req.header("x-opencode-directory"), + workspace: c.req.query("workspace"), + }, + } + + appendEvent(session, formatTraceLine("http", "request.enter", 0, { request_id: requestId })) + c.header("X-OpenCode-Trace-Id", requestId) + + let status: number | null = null + let errMessage: string | undefined + + try { + await traceAls.run(session, async () => { + traceStep({ business: "http", method: "chain.before_next" }) + await next() + status = c.res.status + session.http.status = status + traceStep({ business: "http", method: "chain.after_next", detail: { status } }) + }) + } catch (e) { + errMessage = e instanceof Error ? e.message : String(e) + session.error = { message: errMessage } + throw e + } finally { + const durationMs = Math.round(performance.now() - t0Ms) + const finishedAt = new Date().toISOString() + const finalStatus = status ?? (errMessage ? 500 : 0) + + appendEvent( + session, + formatTraceLine("http", "request.exit", Math.round(performance.now() - session.t0Ms), { + status: finalStatus, + duration_ms: durationMs, + }), + ) + + if (!workflowTracePathDefersPersist(session.http.path)) { + scheduleWorkflowTracePersist(session, traceFilePath, { + requestId, + startedAt, + finishedAt, + durationMs, + finalStatus, + }) + } + } +} diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index b475ec1c5997..0402fa751dc4 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -20,6 +20,12 @@ import { Question } from "@/question" import { errorMessage } from "@/util/error" import * as Log from "@opencode-ai/core/util/log" import { isRecord } from "@/util/record" +import { + WorkflowTraceSessionRef, + traceChatFinalizeAssistant, + traceRecordLlmStreamEvent, + traceRecordMessagePartDelta, +} from "@/server/workflow-trace" const DOOM_LOOP_THRESHOLD = 3 const log = Log.create({ service: "session.processor" }) @@ -71,6 +77,8 @@ interface ProcessorContext extends Input { 
needsCompaction: boolean currentText: MessageV2.TextPart | undefined reasoningMap: Record + /** Accumulated assistant plain text for workflow trace preview */ + traceAssistantText: string } type StreamEvent = Event @@ -121,6 +129,7 @@ export const layer: Layer.Layer< needsCompaction: false, currentText: undefined, reasoningMap: {}, + traceAssistantText: "", } let aborted = false const slog = log.clone().tag("session.id", input.sessionID).tag("messageID", input.assistantMessage.id) @@ -214,6 +223,8 @@ export const layer: Layer.Layer< }) const handleEvent = Effect.fnUntraced(function* (value: StreamEvent) { + const wfTrace = yield* WorkflowTraceSessionRef + yield* Effect.sync(() => traceRecordLlmStreamEvent(wfTrace, value)) switch (value.type) { case "start": yield* status.set(ctx.sessionID, { type: "busy" }) @@ -244,6 +255,12 @@ export const layer: Layer.Layer< field: "text", delta: value.text, }) + traceRecordMessagePartDelta(wfTrace, { + messageID: ctx.reasoningMap[value.id].messageID, + partID: ctx.reasoningMap[value.id].id, + field: "text", + delta: value.text, + }) return case "reasoning-end": @@ -419,6 +436,7 @@ export const layer: Layer.Layer< case "text-delta": if (!ctx.currentText) return ctx.currentText.text += value.text + ctx.traceAssistantText += value.text if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata yield* session.updatePartDelta({ sessionID: ctx.currentText.sessionID, @@ -427,6 +445,12 @@ export const layer: Layer.Layer< field: "text", delta: value.text, }) + traceRecordMessagePartDelta(wfTrace, { + messageID: ctx.currentText.messageID, + partID: ctx.currentText.id, + field: "text", + delta: value.text, + }) return case "text-end": @@ -518,6 +542,10 @@ export const layer: Layer.Layer< ctx.toolcalls = {} ctx.assistantMessage.time.completed = Date.now() yield* session.updateMessage(ctx.assistantMessage) + const wfTraceDone = yield* WorkflowTraceSessionRef + yield* Effect.sync(() => + 
+ traceChatFinalizeAssistant(wfTraceDone, ctx.assistantMessage.id, ctx.traceAssistantText), + ) }) const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) { diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 4c259e4aef5a..53a17f310860 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -53,6 +53,7 @@ import { InstanceState } from "@/effect/instance-state" import { TaskTool, type TaskPromptOps } from "@/tool/task" import { SessionRunState } from "./run-state" import { EffectBridge } from "@/effect/bridge" +import { WorkflowTraceSessionRef, traceChatInitFromUserMessage } from "@/server/workflow-trace" // @ts-ignore globalThis.AI_SDK_LOG_WARNINGS = false @@ -1247,6 +1248,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the const session = yield* sessions.get(input.sessionID) yield* revert.cleanup(session) const message = yield* createUserMessage(input) + const wfTrace = yield* WorkflowTraceSessionRef + yield* Effect.sync(() => traceChatInitFromUserMessage(wfTrace, input, message.info)) yield* sessions.touch(input.sessionID) const permissions: Permission.Ruleset = [] diff --git a/scripts/build-and-web.sh b/scripts/build-and-web.sh new file mode 100755 index 000000000000..7011e39d74e0 --- /dev/null +++ b/scripts/build-and-web.sh @@ -0,0 +1,61 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Build the opencode CLI bundle, then start the web server. +# Uses the binary under packages/opencode/dist/ — the Node wrapper in bin/opencode only +# resolves packages installed as node_modules/opencode-<os>-<arch>, not local dist/. +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." 
&& pwd)" +cd "$ROOT" + +bun run build + +resolve_opencode_binary() { + local os arch name exe p + exe=opencode + + case "$(uname -s)" in + Darwin) os=darwin ;; + Linux) os=linux ;; + MSYS_* | MINGW* | CYGWIN*) os=windows ;; + *) + echo "Unsupported OS: $(uname -s)" >&2 + return 1 + ;; + esac + + case "$(uname -m)" in + arm64 | aarch64) arch=arm64 ;; + x86_64 | amd64) arch=x64 ;; + *) + echo "Unsupported CPU: $(uname -m)" >&2 + return 1 + ;; + esac + + if [[ "$os" == "windows" ]]; then + exe=opencode.exe + fi + + name="opencode-${os}-${arch}" + p="${ROOT}/packages/opencode/dist/${name}/bin/${exe}" + if [[ -f "$p" ]]; then + echo "$p" + return 0 + fi + + # x64 optional baseline build (matches script/build.ts when AVX2 is absent) + if [[ "$arch" == "x64" ]]; then + p="${ROOT}/packages/opencode/dist/${name}-baseline/bin/${exe}" + if [[ -f "$p" ]]; then + echo "$p" + return 0 + fi + fi + + echo "No compiled binary at packages/opencode/dist/${name}/bin/${exe} (or -baseline on x64)." >&2 + echo "Run from repo root: bun run build" >&2 + return 1 +} + +BIN="$(resolve_opencode_binary)" +exec "$BIN" web "$@"
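
Review note: the round-merging and truncation done by `traceChatFinalizeAssistant` in `workflow-trace.ts` can be exercised in isolation. The sketch below is a hedged illustration, not the patch's code: `mergeAssistantOutput` and the two constants are hypothetical names, and the variant reserves the full length of the truncation marker so the result never exceeds the cap, which is an assumption about intent (the patch itself subtracts a fixed 12).

```typescript
// Hypothetical standalone sketch; names are illustrative, not part of the patch.
const ROUND_SEPARATOR = "\n\n--- round ---\n\n"
const TRUNCATION_MARKER = "\n...[truncated]"

/**
 * Append one assistant round to the previously stored output and cap the result.
 * Assumption: the cap is meant as a hard upper bound on the stored string length.
 */
function mergeAssistantOutput(previous: string, next: string, cap: number): string {
  const prior = previous.trim()
  const incoming = next.trim()
  // Nothing new to record: keep the prior text.
  if (!incoming) return prior
  const merged = prior ? `${prior}${ROUND_SEPARATOR}${incoming}` : incoming
  if (merged.length <= cap) return merged
  // Reserve room for the marker so the capped string stays within `cap`.
  const keep = Math.max(0, cap - TRUNCATION_MARKER.length)
  return `${merged.slice(0, keep)}${TRUNCATION_MARKER}`.slice(0, cap)
}
```

Keeping the merge as a pure function of `(previous, next, cap)` makes the cap behaviour testable without constructing a `WorkflowTraceSession`.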