Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions WORKFLOW-TRACE-PERF.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Workflow trace 性能事故记录(供回顾)

## 现象

在 `packages/opencode/src/server/workflow-trace.ts` 增强 trace(记录 `user_input`、`model_stream_text`、以及历史上按 SDK 事件逐条落盘的 `llm_stream` 等)之后,出现 **agent 长时间卡住、界面像全部失灵** 的情况。当前设计已移除 `llm_stream` / `stream_packages`,UI 对齐的流式片段见 `part_deltas`。

## 根因(大坑)

流式输出时,模型会触发大量 **`text-delta`** 事件。若对**整段字符串**做反复拼接:

```text
s = s + delta // 每次拷贝整段前缀,总长度 n、事件次数 m → 近似 O(n×m),实际可视为 O(n²) 级别
```

两处都曾中招:

1. **`chat.model_stream_text`**:每个 delta 执行 `model_stream_text = model_stream_text + delta`。
2. **逐事件的流式 trace row**(旧版 `llm_stream`):对 `_traceTextBuf` / `_traceReasonBuf` 使用 `buf += delta` 再 `slice`。

token 稍多就会把 **主线程/event loop 打满**,表现为全局卡死,而不是单个接口超时。

## 修复要点

1. **流式正文**:用 **`string[]` 分块 `push`**,写盘前 **`join` 一次**(线性时间)。
2. **逐 delta 的 trace**:避免对大 buffer 反复 `+=`;`part_deltas` 为按条 `push` 的结构化行;`model_stream_text` 仍用分块数组 + 一次 `join`。
3. **写文件**:**不要**在 middleware 的同步路径里做 `materialize + JSON.stringify + fs.write`。使用 **`scheduleTracePersist`**:`setImmediate` 里再 `async` 执行 `materializeModelStreamText`、组 payload、`await writeTraceFile`,请求路径不等待。

## 相关代码

- `packages/opencode/src/server/workflow-trace.ts`:`appendModelStreamText`、`traceRecordMessagePartDelta`、`scheduleWorkflowTracePersist`
- `packages/opencode/src/session/processor.ts`:每条流事件仍会调用 `traceRecordLlmStreamEvent`(需保持 O(1) 摊销/线性累计,避免热路径字符串拼接)

## 自检清单(以后改 trace)

- [ ] 热路径里是否对** growing string** 做 `+=`?
- [ ] 大对象 `JSON.stringify` 是否在请求返回前同步执行?
- [ ] 磁盘 I/O 是否已脱离 HTTP `finally` 的主线?

---

*记录日期:以本仓库当时修改为准;便于日后 code review 与 onboarding。*
61 changes: 61 additions & 0 deletions changelog/20260428/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# 2026-04-28 — Workflow trace 里程碑

## 概要

在 OpenCode 服务端 HTTP 工作流中引入 **workflow trace**:按请求生成结构化 JSON,便于排查会话、模型流与耗时。

## 主要能力

- **落盘位置**:与日志同级,例如 `~/.local/share/opencode/trace/`(随 `Global.Path.trace`)。
- **文件命名**:`trace_<request_id>_<iso-timestamp>.json`,**一请求一文件**(可配置/路径例外见下)。
- **内容**:
- HTTP 元数据、`events` 时间线(`opencode.<business>.<method>=<ms>|k=v`)。
- **`chat`**:`user_input`、`model_stream_text`(流式正文拼接)、`part_deltas`(与 UI 一致的 `message.part.delta` 粒度)、`assistant_output`、`model` 等。
- **Effect 贯通**:`WorkflowTraceSessionRef` 将 trace session 传入 `AppRuntime.runPromise`(ALS 无法穿透 Effect)。
- **`prompt_async`**:204 返回后 prompt 仍在后台执行,trace **推迟到 `runRequest.finally`** 再持久化,避免只有 `user_input`、无模型回复。
- **性能**:
- 流式场景禁止对 growing string 反复 `+=`(曾导致近似 O(n²) 卡死);改为分块数组 + 一次 `join`。
- 持久化经 `setImmediate` 异步调度,避免阻塞 HTTP 收尾。
- **默认范围**:仅对 `POST /session/:id/message|prompt_async|command|shell` 写 trace;`OPENCODE_WORKFLOW_TRACE=all` 或请求头 `x-opencode-workflow-trace: 1` 可扩大范围。

## 相关代码(入口)

相对本文件 `changelog/20260428/readme.md` 的路径,可在 IDE / GitHub 中点击跳转。

- [`packages/opencode/src/server/workflow-trace.ts`](../../packages/opencode/src/server/workflow-trace.ts) — 中间件、`scheduleWorkflowTracePersist`、`traceRecordLlmStreamEvent` 等。
- [`packages/opencode/src/server/routes/instance/trace.ts`](../../packages/opencode/src/server/routes/instance/trace.ts) — `runRequest` + `WorkflowTraceSessionRef`。
- [`packages/opencode/src/session/prompt.ts`](../../packages/opencode/src/session/prompt.ts) — `traceChatInitFromUserMessage`。
- [`packages/opencode/src/session/processor.ts`](../../packages/opencode/src/session/processor.ts) — 流事件与 `traceChatFinalizeAssistant`。
- [`packages/core/src/global.ts`](../../packages/core/src/global.ts) — `Path.trace`。
- [`packages/opencode/src/server/routes/instance/session.ts`](../../packages/opencode/src/server/routes/instance/session.ts) — `prompt_async` 推迟 `scheduleWorkflowTracePersist`。

## 事故与备忘

根目录 [**`WORKFLOW-TRACE-PERF.md`**](../../WORKFLOW-TRACE-PERF.md) 记录了「流式 trace 字符串拼接导致全局卡顿」的根因与自检清单,评审时可对照。

## 里程碑反思(提醒自己还能做得更好)

这一条不只是「功能合入」,更是一轮 **设计纠偏**:先按 SDK `fullStream` 逐事件落盘(`llm_stream` / `stream_packages`),后来发现与 **UI / bus 实际展示的粒度** 不一致,排查时对不上号,还加重了热路径与体积。收敛为 **`part_deltas`**(与 `session.updatePartDelta` / `message.part.delta` 一一对应)后,trace 才成为「和用户看到的一样」的可靠回放。

**可以记下来的原则:**

- **观测与产品同构**:调试数据优先对齐用户可见的通道与分片,而不是对齐任意一层 SDK 抽象。
- **热路径先量后加**:流式场景默认假设「事件极多」;任何 `+=`、大块 `JSON.stringify`、同步写盘都要当作红灯。
- **少即是多**:能用一个字段讲清故事,就不要维护两套语义相近的流(减少解析心智与维护成本)。

**后续仍可改进的方向(备忘,非承诺):** 采样/开关更细粒度、trace 与隐私/体积的平衡、与外部可观测性(trace id、导出格式)的统一——需要时再拆任务。

## 环境变量(节选)

| 变量 | 含义 |
|------|------|
| `OPENCODE_WORKFLOW_TRACE` | `all` / `1`:对所有未跳过路由写 trace |
| `OPENCODE_TRACE_MAX_USER_CHARS` | `user_input` 上限 |
| `OPENCODE_TRACE_MAX_STREAM_CHARS` | `model_stream_text` 上限 |
| `OPENCODE_TRACE_MAX_ASSISTANT_CHARS` | `assistant_output` 上限 |
| `OPENCODE_TRACE_MAX_PART_DELTAS` | `part_deltas` 最大条数 |
| `OPENCODE_TRACE_PART_DELTA_CAP` | 单条 `part_deltas[].delta` 字符上限 |

---

*本目录用于按日期归档重要变更;本条为 workflow trace 里程碑:含首次合入、性能事故教训,以及「与 UI 对齐的 `part_deltas`」这一设计转向,便于日后翻回来看「当时为什么这样选」。*
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
"type": "module",
"packageManager": "bun@1.3.13",
"scripts": {
"build": "bun run --cwd packages/opencode build",
"build:fast": "bun run --cwd packages/opencode build -- --single --skip-embed-web-ui",
"build:web": "bash scripts/build-and-web.sh",
"dev": "bun run --cwd packages/opencode --conditions=browser src/index.ts",
"dev:desktop": "bun --cwd packages/desktop-electron dev",
"dev:web": "bun --cwd packages/app dev",
Expand Down
8 changes: 4 additions & 4 deletions packages/app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
},
"scripts": {
"typecheck": "tsgo -b",
"start": "vite",
"dev": "vite",
"build": "vite build",
"serve": "vite preview",
"start": "bunx --bun vite",
"dev": "bunx --bun vite",
"build": "bunx --bun vite build",
"serve": "bunx --bun vite preview",
"test": "bun run test:unit",
"test:ci": "mkdir -p .artifacts/unit && bun test --preload ./happydom.ts ./src --reporter=junit --reporter-outfile=.artifacts/unit/junit.xml",
"test:unit": "bun test --preload ./happydom.ts ./src",
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const paths = {
data,
bin: path.join(cache, "bin"),
log: path.join(data, "log"),
trace: path.join(data, "trace"),
cache,
config,
state,
Expand All @@ -32,6 +33,7 @@ await Promise.all([
fs.mkdir(Path.config, { recursive: true }),
fs.mkdir(Path.state, { recursive: true }),
fs.mkdir(Path.log, { recursive: true }),
fs.mkdir(Path.trace, { recursive: true }),
fs.mkdir(Path.bin, { recursive: true }),
])

Expand All @@ -45,6 +47,7 @@ export interface Interface {
readonly state: string
readonly bin: string
readonly log: string
readonly trace: string
}

export const layer = Layer.effect(
Expand All @@ -58,6 +61,7 @@ export const layer = Layer.effect(
state: Path.state,
bin: Path.bin,
log: Path.log,
trace: Path.trace,
})
}),
)
Expand Down
1 change: 1 addition & 0 deletions packages/core/test/fixture/effect-flock-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const testGlobal = Layer.succeed(
state: os.tmpdir(),
bin: os.tmpdir(),
log: os.tmpdir(),
trace: os.tmpdir(),
}),
)

Expand Down
1 change: 1 addition & 0 deletions packages/core/test/util/effect-flock.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ const testGlobal = Layer.succeed(
state: os.tmpdir(),
bin: os.tmpdir(),
log: os.tmpdir(),
trace: os.tmpdir(),
}),
)

Expand Down
94 changes: 92 additions & 2 deletions packages/opencode/src/provider/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,93 @@ import { ModelID, ProviderID } from "./schema"

const log = Log.create({ service: "provider" })

/** Set `OPENCODE_LOG_SSE=1` to log each raw SSE body chunk after fetch (`request_id`, bytes, UTF-8 preview). */
function sseLoggingEnabled() {
const v = process.env.OPENCODE_LOG_SSE?.toLowerCase()
return v === "1" || v === "true" || v === "all"
}

function requestUrlString(input: RequestInfo | URL): string {
if (typeof input === "string") return input
if (input instanceof URL) return input.href
return input.url
}

/** Stable id for one streamed response: prefer provider headers, else a new UUID for this tap. */
function sseStreamRequestId(res: Response): string {
const h = res.headers
const fromHeader =
h.get("x-request-id") ??
h.get("request-id") ??
h.get("cf-ray") ??
h.get("x-amzn-requestid") ??
h.get("x-openai-request-id")
const trimmed = fromHeader?.trim()
if (trimmed) return trimmed
return crypto.randomUUID()
}

/** Pass-through stream that logs every chunk from the provider (debugging SSE granularity). */
function tapSSEBody(res: Response, url: string): Response {
if (!sseLoggingEnabled() || !res.body) return res
const ct = res.headers.get("content-type") ?? ""
if (!ct.includes("event-stream")) return res

const requestId = sseStreamRequestId(res)
const reader = res.body.getReader()
const dec = new TextDecoder()
let seq = 0
const maxPreview = envInt("OPENCODE_LOG_SSE_PREVIEW", 2000)

const body = new ReadableStream<Uint8Array>({
async pull(ctrl) {
const part = await reader.read()
if (part.done) {
const tail = dec.decode()
if (tail) {
seq += 1
log.info("sse chunk", {
request_id: requestId,
url,
seq,
bytes: 0,
text: tail.length > maxPreview ? `${tail.slice(0, maxPreview)}...` : tail,
flush: true,
})
}
ctrl.close()
return
}
seq += 1
const text = dec.decode(part.value, { stream: true })
log.info("sse chunk", {
request_id: requestId,
url,
seq,
bytes: part.value.byteLength,
text: text.length > maxPreview ? `${text.slice(0, maxPreview)}...` : text,
})
ctrl.enqueue(part.value)
},
cancel(reason) {
return reader.cancel(reason)
},
})

return new Response(body, {
headers: new Headers(res.headers),
status: res.status,
statusText: res.statusText,
})
}

function envInt(name: string, fallback: number) {
const raw = process.env[name]
if (raw === undefined || raw === "") return fallback
const n = Number.parseInt(raw, 10)
return Number.isFinite(n) && n > 0 ? n : fallback
}

function shouldUseCopilotResponsesApi(modelID: string): boolean {
const match = /^gpt-(\d+)/.exec(modelID)
if (!match) return false
Expand Down Expand Up @@ -1486,8 +1573,11 @@ const layer: Layer.Layer<
timeout: false,
})

if (!chunkAbortCtl) return res
return wrapSSE(res, chunkTimeout, chunkAbortCtl)
const urlStr = requestUrlString(input as RequestInfo | URL)
const tapped = tapSSEBody(res, urlStr)

if (!chunkAbortCtl) return tapped
return wrapSSE(tapped, chunkTimeout, chunkAbortCtl)
}

const bundledLoader = BUNDLED_PROVIDERS[model.api.npm]
Expand Down
18 changes: 17 additions & 1 deletion packages/opencode/src/server/routes/instance/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { AppRuntime } from "@/effect/app-runtime"
import { AppFileSystem } from "@opencode-ai/core/filesystem"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { WorkspaceID } from "@/control-plane/schema"
import { traceStep } from "@/server/workflow-trace"

export function InstanceMiddleware(workspaceID?: WorkspaceID): MiddlewareHandler {
return async (c, next) => {
Expand All @@ -19,14 +20,29 @@ export function InstanceMiddleware(workspaceID?: WorkspaceID): MiddlewareHandler
})(),
)

traceStep({
business: "instance",
method: "resolve_directory",
detail: {
directory,
...(workspaceID !== undefined ? { workspace_id: String(workspaceID) } : {}),
},
})

return WorkspaceContext.provide({
workspaceID,
async fn() {
traceStep({ business: "instance", method: "workspace.enter" })
return Instance.provide({
directory,
init: () => AppRuntime.runPromise(InstanceBootstrap),
async fn() {
return next()
traceStep({ business: "instance", method: "route.enter" })
try {
return await next()
} finally {
traceStep({ business: "instance", method: "route.exit" })
}
},
})
},
Expand Down
31 changes: 25 additions & 6 deletions packages/opencode/src/server/routes/instance/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import { zodObject } from "@/util/effect-zod"
import { Bus } from "@/bus"
import { NamedError } from "@opencode-ai/core/util/error"
import { jsonRequest, runRequest } from "./trace"
import {
getWorkflowTraceSession,
scheduleWorkflowTracePersist,
workflowTracePathDefersPersist,
} from "@/server/workflow-trace"

const log = Log.create({ service: "server" })

Expand Down Expand Up @@ -925,19 +930,33 @@ export const SessionRoutes = lazy(() =>
async (c) => {
const sessionID = c.req.valid("param").sessionID
const body = c.req.valid("json")
const wfTrace = getWorkflowTraceSession()
void runRequest(
"SessionRoutes.prompt_async",
c,
SessionPrompt.Service.use((svc) =>
svc.prompt({ ...body, sessionID } as unknown as SessionPrompt.PromptInput),
),
).catch((err) => {
log.error("prompt_async failed", { sessionID, error: err })
void Bus.publish(Session.Event.Error, {
sessionID,
error: new NamedError.Unknown({ message: err instanceof Error ? err.message : String(err) }).toObject(),
)
.finally(() => {
if (!wfTrace || !workflowTracePathDefersPersist(wfTrace.http.path)) return
const finishedAt = new Date().toISOString()
const durationMs = Math.round(performance.now() - wfTrace.t0Ms)
scheduleWorkflowTracePersist(wfTrace, wfTrace.traceFilePath, {
requestId: wfTrace.requestId,
startedAt: wfTrace.startedAt,
finishedAt,
durationMs,
finalStatus: 204,
})
})
.catch((err) => {
log.error("prompt_async failed", { sessionID, error: err })
void Bus.publish(Session.Event.Error, {
sessionID,
error: new NamedError.Unknown({ message: err instanceof Error ? err.message : String(err) }).toObject(),
})
})
})

return c.body(null, 204)
},
Expand Down
Loading
Loading