GBrain Minions — 源码深度分析

> 一句话版本:一个 Postgres 原生的后台任务队列系统,灵感来自 BullMQ,但完全不需要 Redis。确定性工作(shell 脚本、数据同步)走 Minions 零 token 消耗,判断性工作(LLM 循环)也可以用 subagent handler 在同一队列里跑。4,448 行 TypeScript。

项目信息
来源[github.com/garrytan/gbrain/src/core/minions/](https://github.com/garrytan/gbrain/tree/master/src/core/minions)
语言TypeScript
依赖PGLite(本地)/ Supabase(生产),Bun 运行时
灵感BullMQ(Redis 队列),Sidekiq(backoff 策略)

架构总览


┌─────────────────────────────────────────────┐
│              MinionQueue                     │
│  (Postgres-native job queue)                 │
│                                              │
│  add() → claim() → completeJob() / failJob() │
│  cancelJob() → recursive CTE                │
│  handleStalled() / handleTimeouts()         │
└──────────────┬──────────────────────────────┘
               │
     ┌─────────┼──────────┐
     ▼         ▼          ▼
  Worker    Worker     Worker
  (concurrency=1) (concurrency=4)
     │
     ├── shell handler      → /bin/sh -c "cmd"
     ├── subagent handler   → Anthropic Messages loop
     └── aggregator handler → fan-in child results

核心组件

1. MinionQueue(~1,500 行)— Postgres 原生任务队列

不需要 Redis,所有状态都在一张 minion_jobs 表里。

Job 生命周期(9 种状态):


waiting → active → completed
                 → failed → delayed → waiting (retry)
                          → dead (永久失败)
waiting → delayed (带延迟的任务)
active → paused → waiting (恢复)
waiting → waiting-children (父任务等子任务)
        → cancelled (递归取消所有后代)
active → dead (超时 / max_stall 超限)

关键设计

设计点实现方式
**乐观锁**`lock_token` + `lock_until`,worker 每半轮续约
**FOR UPDATE SKIP LOCKED**claim 时用 PG 行级锁,多 worker 安全
**幂等提交**`idempotency_key`,相同 key 返回已有 job
**事务原子性**`completeJob()` 在同一个事务里:更新状态 + token 汇总 + 写 child_done + 解析父任务
**递归取消**`WITH RECURSIVE` CTE 一次性取消整棵任务树(最深 100 层)
**级联失败策略**`on_child_fail`: `fail_parent` / `remove_dep` / `ignore` / `continue`

token 汇总:子任务完成后自动向上汇总 token 消耗到父任务,最终可以看到整棵树的总成本。

2. MinionWorker(~250 行)— 并发 worker


const worker = new MinionWorker(engine, {
  concurrency: 4,      // 同时跑 4 个 job
  lockDuration: 30000, // 30s 锁
  stalledInterval: 30000, // 30s 检测一次 stalled
});

worker.register('sync', syncHandler);
worker.register('shell', shellHandler);
worker.start(); // 阻塞直到 SIGTERM

每个 job 独立的 AbortController

Worker 循环

1. 推进 delayed 任务

2. Claim 新 job(不超过 concurrency)

3. 检测 stalled(lock 过期但 status 仍 active → 重新排队)

4. 检测 timeout(timeout_at 过期 → dead-letter)

3. Shell Handler(~311 行)— 执行 shell 命令

双重安全门

1. MinionQueue.add() 拒绝 name='shell',除非 allowProtectedSubmit=true(CLI 和本地 MCP 才有)

2. 环境变量 GBRAIN_ALLOW_SHELL_JOBS=1 才注册 handler

环境隔离


// 只传递白名单变量,防止 $OPENAI_API_KEY 泄漏
const SHELL_ENV_ALLOWLIST = ['PATH', 'HOME', 'USER', 'LANG', 'TZ', 'NODE_ENV'];

优雅终止


SIGTERM → 5 秒等待 → SIGKILL

stdout/stderr 用 UTF-8 安全的 TailBuffer 截断(64KB / 16KB)。

4. Subagent Handler(~710 行)— LLM 循环

不只是 shell,Minions 也能跑 LLM。完整的 Anthropic Messages API 循环:

5. 辅助系统

Backoff(Sidekiq 风格):

Stagger(确定性交错):

Quiet Hours(静默时段):

Rate Leases(限流):

父子任务 DAG


parent (waiting-children)
  ├── child_1 (completed → child_done → inbox)
  ├── child_2 (failed → child_done → on_child_fail policy)
  └── child_3 (active → timeout → dead → child_done)

Aggregator 模式:
parent waits for ALL children → reads inbox → aggregates results

关键机制

Token 汇总:每个子任务的 token 消耗自动累加到父任务。

安全模型

保护
Job name`shell` 是受保护名,MCP 调用者不能提交
环境变量白名单制,防止 API key 泄漏
Shell 路径硬编码 `/bin/sh`,防止 PATH 投毒
深度限制`maxSpawnDepth=5`,防止无限递归
子任务上限`max_children`,防止 fan-out 爆炸
AbortController超时/取消/锁丢失/进程终止,四种信号
Stall 检测`max_stalled`(默认 5),超过则 dead-letter

与 BullMQ 对比

维度BullMQ (Redis)Minions (Postgres)
依赖RedisPGLite / Supabase
持久性Redis AOF/RDBPostgres WAL
分布式天然支持单 worker(当前)
父子 DAG有限完整(inbox + child_done)
附件内置(5MB 限制,SHA256)
LLM 循环内置 subagent handler
限流外部内置 rate leases
静默时段内置 quiet hours

分析

优势

局限

与 Jay 的关联

实际使用场景举例

场景 1:每天凌晨同步社交媒体帖子(Garry 的真实用例)

没有 Minions 之前:用 OpenClaw cron + sessions_spawn

用 Minions 之后


gbrain jobs submit sync-social \
  --params '{"platform":"twitter","months":1}' \
  --schedule "0 2 * * *"

# 等价于:每天凌晨 2:00 运行一个 shell 脚本
# 脚本内容:curl API → 解析 JSON → 写入 PGLite
# 延迟:753ms,token:$0.00,成功率:100%

Worker 内部发生了什么:


1. Worker poll → 发现 waiting 的 job
2. claim → status 变 active,拿到 lock_token
3. shell handler → spawn /bin/sh -c "curl ... | gbrain import"
4. 子进程输出 stdout/stderr → TailBuffer 截断到 64KB
5. exit code 0 → completeJob() → 事务提交
6. token 汇总:input=0, output=0, cache=0

场景 2:批量处理 36 个月的帖子(Fan-out DAG)


parent job: "sync-all-social"
  ├── child 1: sync 2023-01 ~ 2023-06 (753ms)
  ├── child 2: sync 2023-07 ~ 2023-12 (753ms)
  ├── ...
  └── child 6: sync 2025-07 ~ 2025-12 (753ms)

parent: max_children=3(同时最多 3 个子任务)
parent: on_child_fail="continue"(一个失败不影响其他)

代码层面


const parent = await queue.add('sync-all', { months: 36 });
for (let i = 0; i < 6; i++) {
  await queue.add('shell', {
    cmd: `gbrain import --range ${i*6}-${(i+1)*6}`,
    cwd: '/home/garry/brain',
  }, {
    parent_job_id: parent.id,
    max_children: 3,  // 父任务设置的上限
  });
}
// parent status: waiting → waiting-children
// 每个子任务完成后 → child_done inbox 消息 → 父任务
// 全部完成 → parent: waiting-children → waiting

场景 3:LLM 子任务 + 聚合(Subagent + Aggregator)


parent: "research-competitors" (aggregator)
  ├── child 1: subagent "分析公司 A 的最新融资" (LLM, ~$0.05)
  ├── child 2: subagent "分析公司 B 的最新融资" (LLM, ~$0.05)
  └── child 3: subagent "分析公司 C 的最新融资" (LLM, ~$0.05)

全部完成后 → aggregator 读取 inbox → 合并 3 份报告 → 写入 brain 页面

代码层面


const children = [];
for (const company of ['A', 'B', 'C']) {
  const job = await queue.add('subagent', {
    prompt: `分析 ${company} 的最新融资情况`,
    max_turns: 5,
  }, { parent_job_id: parent.id });
  children.push(job.id);
}

await queue.add('subagent_aggregator', {
  children_ids: children,
  aggregate_prompt_template: '综合以下 N 份分析报告...',
  output_slug: 'competitor-analysis',
}, { parent_job_id: parent.id });

场景 4:Cron 错开(Stagger)


10 个 cron 任务都在凌晨 2:00 触发:
  twitter-ingest  → stagger_minute_offset("twitter-ingest") = 23 → 2:23 执行
  github-ingest   → stagger_minute_offset("github-ingest")  = 41 → 2:41 执行
  linkedin-ingest → stagger_minute_offset("linkedin-ingest") = 07 → 2:07 执行

相同 key 永远相同偏移,不需要手动设置间隔。

场景 5:静默时段


job.quiet_hours = {
  start: 22,   // 晚上 10 点
  end: 7,      // 早上 7 点
  tz: "America/Los_Angeles",
  policy: "defer"  // 延迟到 7 点后执行
}

晚上 11 点 claim → 发现当前在静默窗口 → defer
→ status: active → delayed, delay_until: now() + 15min
→ 15 分钟后再检查 → 还在静默 → 再 defer
→ 早上 7 点后 → 正常执行

场景对比总结

场景sessions_spawnMinions
每天 curl API 同步$0.03/次,可能超时$0.00,753ms
36 个月批量处理~$1.08,40% 失败$0.00,100% 成功
LLM 研究 3 家公司串行,~3 分钟并行,~1 分钟
10 个 cron 同时触发网关过载自动交错

📚 交叉引用 — 🧬 GBrain 三部曲

本报告属于以下系列的一部分:

评分

维度评分 (1-10)说明
设计质量9事务一致性、crash-safe、统一队列
代码质量94,448 行,注释详尽,edge case 处理完善
创新性8BullMQ 思路但 Postgres-native + 内置 LLM 循环
实用性8Garry 生产验证(19 cron、45K 页 brain)
通用性6Bun 专属、单 worker、Anthropic-only
与 Jay 的关联9直接解决 OpenClaw cron 痛点
**总分****8.5**Postgres 原生任务队列的标杆实现