
This is an unofficial translation; the official OpenAI English documentation is authoritative.
Official source: https://developers.openai.com/cookbook/examples/agents_sdk/agent_improvement_loop

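Build an Agent Improvement Loop with Traces, Evals, and Codex

This notebook builds an improvement flywheel for an agent. We start from real traces, add human and model feedback, turn that feedback into evals, and use the resulting evidence to propose the next harness changes for Codex to implement.

You will:

  • Create a financial analyst powered by the OpenAI Agents SDK
  • Run it on synthetic company data and capture traces
  • Add sample human feedback and LLM-generated feedback to those runs
  • Convert that feedback into Promptfoo evals that can be re-run later
  • Use HALO to rank the next harness changes and write a handoff document that can go straight to Codex

In this notebook, the harness is the full contract around the model: instructions, tools, routing, output requirements, and validation checks.

The flywheel preserves what you learn from each run. Traces show what happened, feedback explains what matters most, evals make those expectations reusable, and Codex can act on the resulting change set.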

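What you'll build

[Figure: Agent improvement loop flywheel]

By the end, you will have:

  1. A financial analyst powered by the OpenAI Agents SDK that reviews a fictional company's diligence materials across five traced runs
  2. Human feedback and LLM-generated feedback on those traces
  3. An automatically generated Promptfoo eval suite
  4. A Promptfoo validation gate covering the current agent behavior
  5. A HALO optimization pass based on the traces, feedback, and eval results
  6. A developer-facing handoff document for Codex so it can implement the recommended harness changes

The agent supports acquisition diligence on a fictional company. It reviews financial exports, customer data, contracts, security notes, board materials, and management narratives, then answers diligence questions with citations and reviewable artifacts.

The loop writes one file that carries the follow-on work: codex_handoff.md, generated under ARTIFACT_DIR. It contains the full HALO diagnosis, the ranked recommendations, the evidence behind them, and the implementation guidance Codex needs for the next harness update.

The degree of automation is up to the developer. You can use the loop to propose a reviewed change set, or connect it to a workflow that opens, merges, and deploys pull requests automatically. A common starting point is a "reviewed loop": the system proposes a change set and a developer approves the diff before merging. As the eval gate becomes more trustworthy, the same handoff document can support deeper automation. Either way, the core workflow is the same: traces plus human and model feedback become concrete harness changes instead of staying as isolated comments.

Compared with examples that stop at traces or evals, this notebook combines traces, reviewer judgment, generated evals, optimization, and implementation handoff in one runnable improvement loop.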

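Prerequisites

Install the Python dependencies used by this example, then run the notebook from the repository root:

python -m venv .venv
source .venv/bin/activate
pip install openai openai-agents halo-engine

Promptfoo runs through npx, so you also need Node.js installed and npx available on your PATH.

Set your API key before running the notebook:

export OPENAI_API_KEY=...

This example is intentionally live-only. Trace generation, model critique, eval generation, validation, and optimization all use fresh model output, so the notebook shows the actual loop rather than a scripted preview. The next cell exposes the model choices in one place, so you can swap in cheaper models to trade quality against cost if needed.

With the default five-trace configuration, a full run takes roughly 20 minutes, though model latency and network conditions will move that number up or down. The longest parts are usually Step 3 (running the traced agent calls) and Step 7 (HALO analysis of the full loop). The feedback, eval generation, and Promptfoo cells also make live calls but are usually shorter. Long-running cells print progress or elapsed time as they run.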

%%capture
# Install or upgrade the Python dependencies used by this notebook.
%pip install --quiet --upgrade openai openai-agents halo-engine

from __future__ import annotations

# Standard-library modules used by the cells in this notebook.
import json
import os
import shutil
import threading
from contextlib import contextmanager
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from importlib.metadata import version
from pathlib import Path
from typing import Any, Iterable, Iterator, Mapping

from IPython.display import Markdown, display
from openai import OpenAI

def find_project_root(start: Path | None = None) -> Path:
    # Walk upward until a directory containing registry.yaml is found.
    current = (start or Path.cwd()).resolve()
    for candidate in [current, *current.parents]:
        if (candidate / "registry.yaml").exists():
            return candidate
    return current

PROJECT_ROOT = find_project_root()

if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("Set OPENAI_API_KEY before running this live notebook.")
if shutil.which("npx") is None:
    raise RuntimeError("Install Node.js with npx before running the Promptfoo eval gate.")

# Edit these in one place if you want cheaper models for parts of the loop.
AGENT_MODEL = os.getenv("OPENAI_AGENT_MODEL", "gpt-5.5")
ANALYSIS_MODEL = os.getenv("OPENAI_ANALYSIS_MODEL", "gpt-5.5")
EVAL_GENERATION_MODEL = os.getenv("OPENAI_EVAL_GENERATION_MODEL", ANALYSIS_MODEL)
JUDGE_MODEL = os.getenv("OPENAI_JUDGE_MODEL", ANALYSIS_MODEL)
HALO_MODEL = os.getenv("OPENAI_HALO_MODEL", ANALYSIS_MODEL)
PROMPTFOO_VERSION = os.getenv("PROMPTFOO_VERSION", "0.121.9")

client = OpenAI()

def format_duration(seconds: float) -> str:
    minutes, remainder = divmod(int(round(seconds)), 60)
    return f"{minutes}m {remainder:02d}s" if minutes else f"{remainder}s"

ARTIFACT_DIR = PROJECT_ROOT / "examples" / "agents_sdk" / "agent_improvement_loop_artifacts"
TRACE_DIR = ARTIFACT_DIR / "traces"
HALO_TRACE_PATH = ARTIFACT_DIR / "halo_traces" / "traces.jsonl"
# Start every run from a clean artifact directory.
if ARTIFACT_DIR.exists():
    shutil.rmtree(ARTIFACT_DIR)
# parents=True keeps this working when the intermediate directories are missing.
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)
TRACE_DIR.mkdir(exist_ok=True)
HALO_TRACE_PATH.parent.mkdir(exist_ok=True)

print("Project root detected.")
print("Models:", {
    "agent": AGENT_MODEL,
    "analysis": ANALYSIS_MODEL,
    "eval_generation": EVAL_GENERATION_MODEL,
    "judge": JUDGE_MODEL,
    "halo": HALO_MODEL,
    "promptfoo": PROMPTFOO_VERSION,
})
Project root detected.
Models: {'agent': 'gpt-5.5', 'analysis': 'gpt-5.5', 'eval_generation': 'gpt-5.5', 'judge': 'gpt-5.5', 'halo': 'gpt-5.5', 'promptfoo': '0.121.9'}

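Step 1. Create synthetic company data

The notebook creates fictional diligence materials for a company that might be reviewed during an acquisition. The data mixes structured exports with narrative markdown documents, so the agent has to decide which sources deserve more weight.

Narrative markdown files in the synthetic data

File | Why it is included
overview.md | Management's top-level company overview
product_strategy.md | Roadmap context and an unvalidated NRR estimate
go_to_market.md | Sales motion context that should be cross-checked against pipeline data
board_deck.md | A polished management narrative that may conflict with structured exports
financials/revenue_recognition_notes.md | Accounting context on how launch-phase ARR is treated
legal/contracts_summary.md | Contract-level risk context
legal/open_issues.md | Open legal matters that should stay visible
security/security_overview.md | Security posture and certification wording
sales/security_faq.md | Sales-facing security language that may overstate the evidence
hr/org_chart.md | Operating context on leadership and staffing
sales/pipeline_notes.md | Qualitative pipeline notes
notes/qa_log.md | Diligence questions and unresolved follow-ups

The example generates the synthetic company data at runtime, so it stays self-contained while still giving the agent a realistic mix of structured exports and narrative documents to analyze.

Define the synthetic source files

The collapsed cell below contains the source documents used to build the fictional company data.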

from textwrap import dedent

WORKSPACE_FILES = {
"overview.md": """
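# FictionalCorp XYZ

FictionalCorp XYZ is a revenue intelligence software company offering annual SaaS subscriptions, usage-based add-ons, and launch-phase commitments.

Management reports FY2025 ARR of $43.0M, up 71% year over year.

Management reports that, excluding launch-phase usage-based add-ons, no legal-entity customer accounts for more than 15% of signed ARR.

Legal summary: management states that legal matters are ordinary course and that no contract terms affect valuation.
""",
"product_strategy.md": """
# Product Strategy

Core product lines:

- Forecast Assist
- Pipeline Quality Monitor
- Renewal Risk Workbench

The product roadmap prioritizes enterprise workflow depth. Management expects usage-based add-ons to increase expansion revenue.

Sales leadership cited a 122% NRR estimate in planning materials, but finance has not published an official NRR, and the estimate excludes some downsell and churn adjustments.
""",
"go_to_market.md": """
# Go To Market

FictionalCorp XYZ sells to CRO and RevOps buyers through a direct sales motion.

The current plan assumes larger enterprise ACVs and partner-sourced pipeline. Pipeline conversion evidence should be checked against `sales/pipeline.csv`.
""",
"board_deck.md": """
# Board Deck - December 2025

- FY2025 ending ARR: $43.0M
- ARR growth: 71%
- Gross margin: 69%
- Cash burn: $2.9M per month
- Runway: 11 months

Management narrative: the company is well positioned for efficient enterprise expansion.

ARR note: the headline ARR view includes signed launch-phase commitments and a usage true-up view used for board planning.

Management narrative: customer concentration is manageable when measured by legal entity and signed ARR.
""",
"financials/revenue_recognition_notes.md": """
# Revenue Recognition Notes

Finance treats `financials/arr_bridge.csv` as the controlled FY2025 ARR bridge.

Board deck ARR includes $2.8M of signed launch-phase commitments (not yet live as of 2025-12-31) and $1.1M of usage true-ups; finance does not classify the usage true-ups as recurring ARR.

RevOps also circulates a bookings-adjusted ARR view of $40.8M. That view is useful for pipeline planning but should not be silently reconciled to the controlled ARR bridge without explicit disclosure.
""",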
"legal/contracts_summary.md": """
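# Contracts Summary

Standard customer contracts are annual SaaS agreements with security and DPA addenda. The top five customers together account for $25.1M of ARR.

Management summary: legal matters are ordinary course and no contract terms affect valuation.

The clause inventory has not been fully reconciled to this summary. `legal/clause_inventory.csv` flags two strategic customer agreements with non-standard terms.
""",
"legal/open_issues.md": """
# Open Legal Issues

Former reseller DataHarbor has filed a breach-of-contract claim seeking $3.2M plus accelerated commissions. Counsel estimates that a loss is reasonably possible but not probable. The clause review also identified non-standard change-of-control notice rights and uncapped confidentiality indemnities in two strategic customer MSAs.
""",
"security/security_overview.md": """
# Security Overview

SOC 2 Type I is complete. SOC 2 Type II fieldwork is in progress, and the Type II report has not been issued.

Customer security reviews should verify the exact certification status before relying on SOC 2 claims.
""",
"sales/security_faq.md": """
# Sales Security FAQ

Field sales guidance states that Aurora is "SOC 2 complete" in late-stage enterprise deals.

Security team note: that wording referred to Type I readiness, not an issued Type II report. Do not use this FAQ as certification evidence before checking `security/security_overview.md`.
""",
"hr/org_chart.md": """
# Org Chart

- CEO
- CFO
- VP Sales
- VP Product
- Head of Security

The hiring plan assumes 14 net new GTM hires in 2026.
""",
"sales/pipeline_notes.md": """
# Pipeline Notes

Commit-stage pipeline includes $1.6M of DataHarbor-sourced opportunities that may be affected by the reseller dispute.

The Northstar expansion pipeline assumes SOC 2 Type II is complete before procurement review. Finance has not included this expansion in controlled FY2025 ARR.
""",
"notes/qa_log.md": """
# Diligence Q&A Log

- NRR was requested. RevOps provided a 122% management estimate, but finance has not validated an official NRR and says the estimate excludes the Northstar downsell entity and one churned reseller-sourced account.
- CAC payback was requested but not provided.
- The top two customers total $12.4M of ARR, or 34% of FY2025 ARR, per `customers/top_customers.csv`.
- The Northstar Holdings parent account has $12.4M of ARR, or 34% of FY2025 ARR, per `customers/account_hierarchy.csv`.
- Board ARR should not be silently reconciled to finance ARR; use `financials/revenue_recognition_notes.md` for the differences.
""",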
"financials/arr_bridge.csv": """
metric,value_m
opening_arr_2025_m,21.58
new_arr_m,8.1
expansion_arr_m,3.2
contraction_arr_m,1.1
churn_arr_m,2.7
ending_arr_2025_m,36.9
bookings_adjusted_arr_m,40.8
""",
"financials/monthly_kpis.csv": """
month,ending_arr_m,new_arr_m,expansion_arr_m,churn_arr_m,gross_margin
2025-01,21.58,0.55,0.35,0.18,0.69
2025-02,23.28,0.59,0.37,0.20,0.69
2025-03,24.98,0.63,0.39,0.21,0.69
2025-04,26.69,0.67,0.41,0.22,0.69
2025-05,28.39,0.71,0.43,0.24,0.69
2025-06,30.09,0.75,0.45,0.26,0.69
2025-07,31.79,0.79,0.47,0.27,0.69
2025-09,33.50,0.83,0.49,0.28,0.69
2025-10,35.20,0.87,0.51,0.30,0.69
2025-12,36.90,0.91,0.53,0.32,0.69
""",
"financials/p_and_l.csv": """
period,revenue_m,gross_margin,opex_m,cash_burn_m,runway_months
FY2025,30.26,0.69,47.71,2.9,11
""",
"financials/retention_extract.csv": """
metric,value,status,notes
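net_revenue_retention,122%,management_estimate_unvalidated,Estimate from sales materials; excludes the Northstar downsell entity and one churned reseller-sourced account.
gross_revenue_retention,84%,finance_partial,Preliminary 2025 cohort result; usage data feeds are incomplete for two enterprise customers.
logo_retention,91%,finance_partial,"Counted by legal entity, not rolled up by parent account."
cac_payback_months,,not_provided,Requested by the diligence team; no source schedule in the dataroom.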
""",
"customers/top_customers.csv": """
customer,parent_account,arr_m,arr_share,segment,renewal_date,inclusion_basis
Northstar Bank,Northstar Holdings,7.8,0.2114,Enterprise,2026-02-15,controlled_arr_bridge
Northstar Capital Markets,Northstar Holdings,4.6,0.1247,Enterprise,2026-04-01,controlled_arr_bridge
Helio Retail,Helio Retail,6.9,0.1870,Enterprise,2026-05-15,controlled_arr_bridge
BluePeak Logistics,BluePeak Logistics,3.6,0.0976,Mid-market,2026-06-30,controlled_arr_bridge
Summit Foods,Summit Foods,2.2,0.0596,Mid-market,2026-02-28,controlled_arr_bridge
""",
"customers/account_hierarchy.csv": """
legal_entity,parent_account,parent_arr_m,note
Northstar Bank,Northstar Holdings,12.4,Same procurement parent as Northstar Capital Markets.
Northstar Capital Markets,Northstar Holdings,12.4,Managed by a different RevOps owner but part of the same parent renewal committee.
Helio Retail,Helio Retail,6.9,Standalone parent account.
BluePeak Logistics,BluePeak Logistics,3.6,Standalone parent account; renewal issue still open.
""",
"customers/renewal_calendar.csv": """
customer,renewal_date,renewal_risk,notes
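Northstar Bank,2026-02-15,medium,Expansion depends on completing SOC 2 Type II.
Northstar Capital Markets,2026-04-01,medium,Same parent procurement committee as Northstar Bank.
Helio Retail,2026-05-15,medium,Adoption is below plan; forecast latency escalation is still being monitored.
BluePeak Logistics,2026-06-30,high,Unresolved CRM sync errors and renewal risk.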
""",
"customers/customer_health.csv": """
customer,health,primary_risk,signal_date,caveat
Northstar Bank,green,none flagged,2025-10-31,"Northstar health is recorded by legal entity, not by parent account."
Northstar Capital Markets,yellow,monitor adoption,2025-10-31,"Northstar health is recorded by legal entity, not by parent account."
Helio Retail,yellow,monitor adoption,2025-12-15,
BluePeak Logistics,red,renewal risk,2025-12-15,
Summit Foods,yellow,monitor adoption,2025-12-15,
""",
"legal/clause_inventory.csv": """
customer,issue,exposure,confidence
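Northstar Bank,change_of_control_notice,Customer may require a transition plan within 10 days of a change-of-control transaction,medium
Helio Retail,uncapped_confidentiality_indemnity,Uncapped liability for confidentiality breach; not reflected in the management summary,high
BluePeak Logistics,service_credit_carveout,Service credits can exceed one month of fees if the CRM sync SLA is missed for two consecutive months,medium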
""",
"sales/pipeline.csv": """
stage,pipeline_m,historical_close_rate,quality_note
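commit,6.1,0.39,Includes Northstar expansion that depends on security certification.
best_case,9.7,0.28,Includes disputed DataHarbor-sourced opportunities.
early,18.2,0.08,High volume but low conversion quality.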
""",
"support/escalations.csv": """
customer,severity,issue,status
Northstar Capital Markets,medium,Forecast latency,monitoring
BluePeak Logistics,high,CRM sync errors,open
Northstar Bank,medium,Security questionnaire blocked pending SOC 2 Type II report,open
""",
}

Materialize the synthetic data

Write the source files to disk, add a manifest, and inspect the generated dataset.

def write_workspace_file(path: Path, content: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(dedent(content).strip() + "\n", encoding="utf-8")

def generate_acquisition_diligence_workspace() -> Path:
    """Create the synthetic M&A diligence workspace directly from the notebook data."""
    dataroom = ARTIFACT_DIR / "synthetic_dataroom"
    shutil.rmtree(dataroom, ignore_errors=True)
    for relative_path, content in WORKSPACE_FILES.items():
        write_workspace_file(dataroom / relative_path, content)
    # The manifest lists every source file written above (it does not list itself).
    manifest = {
        "company_name": "FictionalCorp XYZ",
        "scenario": "adversarial_diligence",
        "files": sorted(str(path.relative_to(dataroom)) for path in dataroom.rglob("*") if path.is_file()),
    }
    write_workspace_file(dataroom / "manifest.json", json.dumps(manifest, indent=2))
    return dataroom

dataset = generate_acquisition_diligence_workspace()
files = sorted(str(path.relative_to(dataset)) for path in dataset.rglob("*") if path.is_file())
print(f"Dataset created: {len(files)} files")
Dataset created: 24 files
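
The headline ARR figures in the synthetic files are built to disagree, which is what forces the agent to weigh sources. As a rough worked example, the sketch below redoes the arithmetic with numbers copied by hand from `financials/arr_bridge.csv`, `financials/revenue_recognition_notes.md`, and `board_deck.md`. The variable names are illustrative and are not part of the notebook.

```python
# Figures copied from the synthetic dataroom files defined above (all in $M).
controlled_arr = 36.9         # financials/arr_bridge.csv, ending_arr_2025_m
bookings_adjusted_arr = 40.8  # financials/arr_bridge.csv, bookings_adjusted_arr_m
board_arr = 43.0              # board_deck.md, headline FY2025 ending ARR
launch_commitments = 2.8      # financials/revenue_recognition_notes.md
usage_true_up = 1.1           # financials/revenue_recognition_notes.md

# Adjustments that the revenue recognition notes explicitly document.
documented_adjustments = launch_commitments + usage_true_up

# Gap between the board headline and the finance-controlled bridge.
board_gap = board_arr - controlled_arr

# Portion of the gap that the documented adjustments do not cover.
unexplained_gap = board_gap - documented_adjustments

print(f"Board vs. controlled ARR gap: ${board_gap:.1f}M")
print(f"Documented adjustments:       ${documented_adjustments:.1f}M")
print(f"Gap left unexplained:         ${unexplained_gap:.1f}M")
print(f"Controlled + adjustments:     ${controlled_arr + documented_adjustments:.1f}M")
```

Under these inputs, the documented adjustments bring the controlled bridge up to the bookings-adjusted view, while part of the board headline stays unexplained. A grounded answer would surface that remainder as an open question rather than reconcile it silently.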

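Step 2. Define the Agents SDK-backed analyst

The example agent performs M&A diligence on a fictional SaaS company that is under review as a potential acquisition target. The case materials include both structured exports and management narratives. Some sources agree, some conflict, and some important claims are only partially supported. That gives us a realistic reason to keep improving the harness over time.

The agent answers questions for the deal team using only the provided company data. It should prefer structured financial evidence when it disagrees with narrative summaries, preserve uncertainty when evidence is missing, and leave artifacts that other reviewers can inspect.

The OpenAI Agents SDK provides the hosted runner, sandboxed execution, model settings, and tracing hooks this workflow needs. The prompt, tools, routing rules, output requirements, and validation checks together make up the current agent harness.

Artifacts the agent produces

Artifact | Why the agent writes it
summary_answer.md | The concise answer returned to the user
investment_memo.md | A fuller review artifact for diligence readers
risk_register.json | Structured risks with evidence, for downstream systems to check
open_questions.md | Missing evidence or unresolved questions that should stay visible
citations.json | Machine-readable links from claims to source files
evidence_table.csv | A tabular audit trail of claims and their supporting sources

These artifacts make the work reviewable by keeping supporting evidence, open questions, and required files alongside the final answer.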
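
To make the artifact shapes concrete, here is a minimal sketch that writes the three structured artifacts into a temporary directory. Only a few fields are grounded in this notebook: the validator defined later checks that each risk has an `evidence` list, each citation has a `sources` list, and the evidence table has `claim_id`, `claim`, and `sources` columns. Every other field name and value below (`risk`, `severity`, the claim text, the `C1` id) is a hypothetical placeholder.

```python
import csv
import json
import tempfile
from pathlib import Path

# Scratch directory standing in for the agent's outputs/ folder.
outputs = Path(tempfile.mkdtemp()) / "outputs"
outputs.mkdir(parents=True)

# Hypothetical risk entry; the validator only inspects the `evidence` list.
risk_register = [
    {
        "risk": "Parent-account concentration at Northstar Holdings",
        "severity": "high",
        "evidence": ["customers/account_hierarchy.csv", "customers/top_customers.csv"],
    }
]

# Hypothetical citation entry; the validator only inspects the `sources` list.
citations = [
    {
        "claim": "Controlled FY2025 ending ARR is $36.9M.",
        "sources": ["financials/arr_bridge.csv"],
    }
]

(outputs / "risk_register.json").write_text(json.dumps(risk_register, indent=2), encoding="utf-8")
(outputs / "citations.json").write_text(json.dumps(citations, indent=2), encoding="utf-8")

# The evidence table needs at least these three columns.
with (outputs / "evidence_table.csv").open("w", newline="", encoding="utf-8") as handle:
    writer = csv.DictWriter(handle, fieldnames=["claim_id", "claim", "sources"])
    writer.writeheader()
    writer.writerow(
        {
            "claim_id": "C1",
            "claim": "Controlled FY2025 ending ARR is $36.9M.",
            "sources": "financials/arr_bridge.csv",
        }
    )

# Read the header back to confirm the column contract.
with (outputs / "evidence_table.csv").open(newline="", encoding="utf-8") as handle:
    header = next(csv.reader(handle))

print(sorted(path.name for path in outputs.iterdir()))
print(header)
```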

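Failure modes to watch for

This notebook is designed to expose failures such as:

  • Treating the management narrative as the official metric when structured exports disagree
  • Reporting an unsupported NRR estimate as if finance had validated it
  • Downplaying parent-account concentration by using the weaker legal-entity view
  • Claiming "SOC 2 complete" when the evidence only supports Type I
  • Producing a polished answer while leaving citations, risk files, or evidence artifacts incomplete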
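
The concentration failure mode comes down to which key the ARR is grouped by. As an illustrative sketch, the snippet below rolls up the rows from `customers/top_customers.csv` both by legal entity and by parent account, using the controlled ending ARR from `financials/arr_bridge.csv` as the denominator. The `largest_share` helper is hypothetical and the CSV is an inline copy of three columns from the synthetic file.

```python
import csv
import io
from collections import defaultdict

# Inline copy of the relevant columns from customers/top_customers.csv.
TOP_CUSTOMERS_CSV = """customer,parent_account,arr_m
Northstar Bank,Northstar Holdings,7.8
Northstar Capital Markets,Northstar Holdings,4.6
Helio Retail,Helio Retail,6.9
BluePeak Logistics,BluePeak Logistics,3.6
Summit Foods,Summit Foods,2.2
"""

# Controlled FY2025 ending ARR from financials/arr_bridge.csv, in $M.
CONTROLLED_ARR_M = 36.9


def largest_share(rows: list[dict[str, str]], key: str) -> tuple[str, float, float]:
    """Return (name, ARR in $M, share of controlled ARR) for the largest group."""
    totals: dict[str, float] = defaultdict(float)
    for row in rows:
        totals[row[key]] += float(row["arr_m"])
    name, arr = max(totals.items(), key=lambda item: item[1])
    return name, arr, arr / CONTROLLED_ARR_M


rows = list(csv.DictReader(io.StringIO(TOP_CUSTOMERS_CSV)))
entity = largest_share(rows, "customer")
parent = largest_share(rows, "parent_account")

print(f"Largest legal entity:   {entity[0]} at ${entity[1]:.1f}M ({entity[2]:.1%})")
print(f"Largest parent account: {parent[0]} at ${parent[1]:.1f}M ({parent[2]:.1%})")
```

Grouping by parent account merges the two Northstar entities into one exposure, which is the view the agent is expected to report instead of the smaller legal-entity figure.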

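Define the harness schema

Start by creating small data structures for the model settings and the promoted agent configuration. This makes the harness explicit, so later optimization can target more than prompt wording.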


@dataclass(frozen=True)
class ModelSettings:
    agent_model: str
    reasoning_effort: str

@dataclass(frozen=True)
class AgentConfig:
    version: str
    system_prompt: str
    model_settings: ModelSettings
    tool_policy: dict[str, Any]
    eval_metadata: dict[str, Any]
    path: Path = field(default_factory=lambda: Path("notebook_defined_agent_config"))

    @property
    def required_artifacts(self) -> list[str]:
        return self.tool_policy["required_artifacts"]

    def build_instructions(self) -> str:
        # Join the system prompt, the rendered tool policy, and a runtime config note.
        return "\n\n".join([
            self.system_prompt,
            format_policy_section("Tool policy", self.tool_policy),
            f"Runtime config:\n- Config version: `{self.version}`.\n- Treat this config as the promoted runtime contract.\n- Do not modify the runtime config during the run.",
        ]) + "\n"

def format_policy_section(title: str, policy: dict[str, Any]) -> str:
    lines = [f"{title}:"]
    for key, value in policy.items():
        lines.extend(format_policy_value(key, value))
    return "\n".join(lines)

def format_policy_value(key: str, value: Any, indent: int = 0) -> list[str]:
    # Render nested dicts and lists as an indented bullet outline.
    prefix = " " * indent
    if isinstance(value, dict):
        lines = [f"{prefix}- {key}:"]
        for child_key, child_value in value.items():
            lines.extend(format_policy_value(child_key, child_value, indent + 1))
        return lines
    if isinstance(value, list):
        lines = [f"{prefix}- {key}:"]
        for item in value:
            if isinstance(item, dict):
                lines.append(f"{prefix} -")
                for child_key, child_value in item.items():
                    lines.extend(format_policy_value(child_key, child_value, indent + 2))
            else:
                lines.append(f"{prefix} - {item}")
        return lines
    return [f"{prefix}- {key}: {value}"]
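
To see what the rendered policy section looks like, the sketch below runs a condensed copy of the two formatting helpers on a small, hypothetical policy. The copy handles scalar list items only (the full helper above also expands dicts inside lists), and `sample_policy` is an illustrative subset rather than the notebook's real policy.

```python
from typing import Any


def format_policy_value(key: str, value: Any, indent: int = 0) -> list[str]:
    # Condensed copy of the helper above so this sketch runs on its own.
    prefix = " " * indent
    if isinstance(value, dict):
        lines = [f"{prefix}- {key}:"]
        for child_key, child_value in value.items():
            lines.extend(format_policy_value(child_key, child_value, indent + 1))
        return lines
    if isinstance(value, list):
        # Scalar items only; the full helper also handles dict items.
        return [f"{prefix}- {key}:"] + [f"{prefix} - {item}" for item in value]
    return [f"{prefix}- {key}: {value}"]


def format_policy_section(title: str, policy: dict[str, Any]) -> str:
    lines = [f"{title}:"]
    for key, value in policy.items():
        lines.extend(format_policy_value(key, value))
    return "\n".join(lines)


# Hypothetical subset of a tool policy, for illustration only.
sample_policy = {
    "allowed_data_root": "data",
    "required_artifacts": ["summary_answer.md", "citations.json"],
}

rendered = format_policy_section("Tool policy", sample_policy)
print(rendered)
```

Rendering the policy as plain bullet text means the same dictionary can drive both the instructions the model reads and any programmatic checks on the harness.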

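Configure instructions and policy

The system prompt states the evidence rules, the tool policy defines what the agent may read and write, and the eval metadata records which harness version is currently promoted.

SYSTEM_PROMPT = """
You are a diligence analyst reviewing a synthetic company dataroom.

Evidence scope:
- Use only files under `data/`.
- Do not use outside knowledge or assumptions.
- When structured CSV/JSON exports conflict with narrative files, prefer the structured exports.

Runtime tools:
- The sandbox starts at the mounted workspace root. Use workspace-relative paths such as `data/...` and `outputs/...`; when running shell commands, omit `workdir` or use only relative paths. Never pass absolute temporary paths.
- `data/tools/check_evidence_coverage.py`: use it before finalizing an answer that contains material claims. Create a JSON list of claims with `claim`, `claim_type`, and `citations`, then run `python data/tools/check_evidence_coverage.py --claims-json <path> --dataset-root data --output outputs/evidence_coverage.json`.
- `data/tools/validate_output_contract.py`: after writing the required artifacts and before the final response, run `python data/tools/validate_output_contract.py --outputs outputs --dataset-root data --output outputs/output_contract_validation.json`.
- If either tool reports unsupported claims, missing citations, missing files, malformed JSON, or empty artifacts, revise the answer/artifacts before finalizing. If evidence is unavailable, state that the claim is unknown or unsupported.

Citation rules:
- Every material claim must cite one or more source filenames.
- Cite filenames exactly, as workspace-relative paths such as `financials/arr_bridge.csv`.
- Do not cite files that do not support the claim.

Unknown-handling rules:
- If evidence is missing, state that the answer is unknown or unsupported.
- Never invent missing numbers.
- If evidence conflicts, state the conflict explicitly instead of silently reconciling it.

Output rules:
- Write `outputs/summary_answer.md`.
- Write `outputs/investment_memo.md`.
- Write `outputs/risk_register.json`.
- Write `outputs/open_questions.md`.
- Write `outputs/citations.json`.
- Write `outputs/evidence_table.csv`.
""".strip()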

MODEL_SETTINGS = {
    "agent_model": AGENT_MODEL,
    "reasoning_effort": "medium",
}

TOOL_POLICY = {
    "allowed_data_root": "data",
    "writable_output_root": "outputs",
    "required_artifacts": [
        "summary_answer.md",
        "investment_memo.md",
        "risk_register.json",
        "open_questions.md",
        "citations.json",
        "evidence_table.csv",
    ],
    "evidence_preference": [
        "When sources conflict, prefer structured CSV or JSON exports over narrative summaries.",
        "Treat board materials as useful narrative evidence, not as the system of record for metrics.",
        "Surface unresolved conflicts instead of silently reconciling them.",
    ],
    "runtime_tools": [
        {
            "path": "data/tools/check_evidence_coverage.py",
            "purpose": "Audit drafted material claims against their cited dataroom files before the final answer.",
            "recommended_command": "python data/tools/check_evidence_coverage.py --claims-json outputs/claim_audit_input.json --dataset-root data --output outputs/evidence_coverage.json",
        },
        {
            "path": "data/tools/validate_output_contract.py",
            "purpose": "Validate required output artifacts, JSON structure, and citation/source file references.",
            "recommended_command": "python data/tools/validate_output_contract.py --outputs outputs --dataset-root data --output outputs/output_contract_validation.json",
        },
    ],
    "unknown_handling": [
        "When a metric is missing, state that it is unknown or unsupported.",
        "Do not infer missing values from adjacent metrics.",
        "Separate facts, inferences, and open questions.",
    ],
    "mutation_policy": [
        "Write only to the configured outputs directory.",
        "Do not modify dataroom inputs.",
        "Do not modify the runtime agent config during the run.",
    ],
}

EVAL_METADATA = {
    "version": "v001",
    "status": "promoted",
    "created_by": "manual_baseline",
    "promotion_gate": "manual_review",
    "description": "Baseline diligence analyst config with strict dataroom grounding, citation, unknown-handling, and artifact rules.",
}

agent_config = AgentConfig(
    version=EVAL_METADATA["version"],
    system_prompt=SYSTEM_PROMPT,
    model_settings=ModelSettings(**MODEL_SETTINGS),
    tool_policy=TOOL_POLICY,
    eval_metadata=EVAL_METADATA,
)

Inspect the agent config

This compact view shows the promoted config version, the selected model, the required artifacts, and the tools the agent can use at runtime.

required_artifacts_md = "\n".join(
    f"- `{artifact}`" for artifact in agent_config.required_artifacts
)
runtime_tools_md = "\n".join(
    f"- `{tool['path']}` — {tool['purpose']}"
    for tool in agent_config.tool_policy["runtime_tools"]
)

display(Markdown(f"""
### Agent config summary

- **Version:** `{agent_config.version}`
- **Agent model:** `{agent_config.model_settings.agent_model}`
- **Reasoning effort:** `{agent_config.model_settings.reasoning_effort}`

**Required artifacts**
{required_artifacts_md}

**Runtime tools**
{runtime_tools_md}
"""))

Agent config summary

  • Version: v001
  • Agent model: gpt-5.5
  • Reasoning effort: medium

Required artifacts

  • summary_answer.md
  • investment_memo.md
  • risk_register.json
  • open_questions.md
  • citations.json
  • evidence_table.csv

Runtime tools

  • data/tools/check_evidence_coverage.py — Audit drafted material claims against their cited dataroom files before the final answer.
  • data/tools/validate_output_contract.py — Validate required output artifacts, JSON structure, and citation/source file references.

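Add validation tools

The next helpers create two local tools inside the workspace: one checks whether drafted claims cite dataroom files that actually exist, and the other validates that the required output artifacts exist and have the expected structure. The code is hidden by default to save space, but you can expand it to inspect the implementation.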


CHECK_EVIDENCE_COVERAGE = r'''#!/usr/bin/env python3

import argparse
import json
from pathlib import Path

def main() -> None:
    parser = argparse.ArgumentParser(description="Audit whether drafted claims cite existing dataroom files.")
    parser.add_argument("--claims-json", type=Path, required=True)
    parser.add_argument("--dataset-root", type=Path, default=Path("data"))
    parser.add_argument("--output", type=Path, default=Path("outputs/evidence_coverage.json"))
    args = parser.parse_args()

    claims = json.loads(args.claims_json.read_text(encoding="utf-8"))
    if not isinstance(claims, list):
        raise ValueError("--claims-json must contain a JSON list of claim objects")

    result = check_evidence_coverage(claims, args.dataset_root)
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps(result, indent=2) + "\n", encoding="utf-8")
    print(json.dumps(result, indent=2))

def check_evidence_coverage(claims: list[dict], dataset_root: Path) -> dict:
    supported = []
    unsupported = []
    missing_citations = []

    for raw in claims:
        claim = str(raw.get("claim") or "").strip()
        claim_type = str(raw.get("claim_type") or "claim")
        # Normalize citations to dataset-relative paths.
        citations = [str(item).strip().removeprefix("data/") for item in raw.get("citations") or [] if str(item).strip()]
        row = {"claim": claim, "claim_type": claim_type, "citations": citations}
        if not citations:
            missing_citations.append({**row, "issue": "No citation provided."})
            continue
        # This checks that cited files exist, not that their contents support the claim.
        missing = [citation for citation in citations if not (dataset_root / citation).exists()]
        if missing:
            unsupported.append({**row, "issue": f"Missing cited file(s): {', '.join(missing)}"})
        else:
            supported.append(row)

    return {
        "supported_claims": supported,
        "unsupported_claims": unsupported,
        "missing_citations": missing_citations,
        "recommended_caveats": [
            "Add valid source filenames or mark unsupported claims as unknown before final answer."
        ],
        "passed": not unsupported and not missing_citations,
    }

if __name__ == "__main__":
    main()
'''

VALIDATE_OUTPUT_CONTRACT = r'''#!/usr/bin/env python3

import argparse
import csv
import json
from pathlib import Path

REQUIRED_FILES = [
    "summary_answer.md",
    "investment_memo.md",
    "risk_register.json",
    "open_questions.md",
    "citations.json",
    "evidence_table.csv",
]

def main() -> None:
    parser = argparse.ArgumentParser(description="Validate diligence output artifacts before final answer.")
    parser.add_argument("--outputs", type=Path, default=Path("outputs"))
    parser.add_argument("--dataset-root", type=Path, default=Path("data"))
    parser.add_argument("--output", type=Path, default=Path("outputs/output_contract_validation.json"))
    args = parser.parse_args()

    result = validate_output_contract(args.outputs, args.dataset_root)
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps(result, indent=2) + "\n", encoding="utf-8")
    print(json.dumps(result, indent=2))

def validate_output_contract(outputs: Path, dataset_root: Path) -> dict:
    issues = []
    # Every required artifact must exist and be non-empty.
    for filename in REQUIRED_FILES:
        path = outputs / filename
        if not path.exists():
            issues.append({"file": filename, "issue": "missing required artifact"})
        elif path.stat().st_size == 0:
            issues.append({"file": filename, "issue": "empty required artifact"})

    risks = _read_json(outputs / "risk_register.json", default=[])
    citations = _read_json(outputs / "citations.json", default=[])
    if not isinstance(risks, list):
        issues.append({"file": "risk_register.json", "issue": "must be a JSON list"})
        risks = []
    if not isinstance(citations, list):
        issues.append({"file": "citations.json", "issue": "must be a JSON list"})
        citations = []

    # Each risk needs evidence that points at existing dataroom files.
    for index, risk in enumerate(risks):
        evidence = risk.get("evidence") if isinstance(risk, dict) else None
        if not evidence:
            issues.append({"file": "risk_register.json", "risk_index": index, "issue": "risk lacks evidence"})
            continue
        missing = [str(item).removeprefix("data/") for item in evidence if not (dataset_root / str(item).removeprefix("data/")).exists()]
        if missing:
            issues.append({"file": "risk_register.json", "risk_index": index, "issue": f"missing evidence file(s): {', '.join(missing)}"})

    # Each citation needs sources that point at existing dataroom files.
    for index, citation in enumerate(citations):
        sources = citation.get("sources") if isinstance(citation, dict) else None
        if not sources:
            issues.append({"file": "citations.json", "citation_index": index, "issue": "citation lacks sources"})
            continue
        missing = [str(item).removeprefix("data/") for item in sources if not (dataset_root / str(item).removeprefix("data/")).exists()]
        if missing:
            issues.append({"file": "citations.json", "citation_index": index, "issue": f"missing source file(s): {', '.join(missing)}"})

    try:
        with (outputs / "evidence_table.csv").open(newline="", encoding="utf-8") as handle:
            rows = list(csv.DictReader(handle))
        if rows and not {"claim_id", "claim", "sources"}.issubset(rows[0].keys()):
            issues.append({"file": "evidence_table.csv", "issue": "must include claim_id, claim, and sources columns"})
    except FileNotFoundError:
        # A missing file is already reported by the required-files check above.
        pass

    return {"passed": not issues, "issues": issues, "required_files": REQUIRED_FILES}

def _read_json(path: Path, default):
    if not path.exists():
        return default
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        # Returning a dict makes the caller's "must be a JSON list" check fail.
        return {"error": str(exc)}

if __name__ == "__main__":
    main()
'''

def write_runtime_tools(dataset_dir: Path) -> list[str]:
    tools_dir = dataset_dir / "tools"
    tools_dir.mkdir(parents=True, exist_ok=True)
    files = {
        "check_evidence_coverage.py": CHECK_EVIDENCE_COVERAGE,
        "validate_output_contract.py": VALIDATE_OUTPUT_CONTRACT,
    }
    written: list[str] = []
    for filename, content in files.items():
        path = tools_dir / filename
        path.write_text(content, encoding="utf-8")
        # Mark the script executable so it can be run directly.
        path.chmod(0o755)
        written.append(str(path.relative_to(dataset_dir)))
    return written
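
As a rough illustration of the input the evidence coverage tool expects, the sketch below builds a claims list with the `claim`, `claim_type`, and `citations` keys named in the system prompt and classifies each claim the way the tool does. The `classify` helper is a simplified stand-in written for this example, and the claims, the `financials/nrr_report.csv` path, and the CAC payback figure are deliberately made up to trigger each outcome.

```python
import json
import tempfile
from pathlib import Path

# Scratch workspace containing a single dataroom file.
workspace = Path(tempfile.mkdtemp())
dataset_root = workspace / "data"
(dataset_root / "financials").mkdir(parents=True)
(dataset_root / "financials" / "arr_bridge.csv").write_text(
    "metric,value_m\nending_arr_2025_m,36.9\n", encoding="utf-8"
)

claims = [
    # Cites a file that exists; the "data/" prefix is stripped before the lookup.
    {"claim": "Controlled FY2025 ending ARR is $36.9M.", "claim_type": "metric",
     "citations": ["data/financials/arr_bridge.csv"]},
    # Cites a file that does not exist in this workspace (hypothetical path).
    {"claim": "NRR is 122%.", "claim_type": "metric",
     "citations": ["financials/nrr_report.csv"]},
    # Invented figure with no citation at all.
    {"claim": "CAC payback is 14 months.", "claim_type": "metric", "citations": []},
]

# Path taken from the recommended command in the tool policy.
claims_path = workspace / "outputs" / "claim_audit_input.json"
claims_path.parent.mkdir(parents=True)
claims_path.write_text(json.dumps(claims, indent=2), encoding="utf-8")


def classify(claim: dict, root: Path) -> str:
    """Simplified stand-in for the per-claim logic in check_evidence_coverage."""
    citations = [
        str(item).strip().removeprefix("data/")
        for item in claim.get("citations") or []
        if str(item).strip()
    ]
    if not citations:
        return "missing_citations"
    if any(not (root / citation).exists() for citation in citations):
        return "unsupported"
    return "supported"


statuses = [classify(claim, dataset_root) for claim in json.loads(claims_path.read_text(encoding="utf-8"))]
for claim, status in zip(claims, statuses):
    print(f"{status:18} {claim['claim']}")
```

Note that a "supported" result only means the cited file exists. Whether the file's contents actually back the claim is left to the feedback and eval steps.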

Build each user turn

The prompt builder adds task-specific guidance only when needed, such as memo formatting, separate risk categories, or strict handling of unsupported NRR claims.

def build_user_prompt(question: str, agent_config: Any | None = None) -> str:
    config_line = ""
    if agent_config is not None:
        config_line = f"\nActive agent config: `{agent_config.version}` from `{agent_config.path}`.\n"
    # Each optional instruction stays empty unless the question triggers it.
    memo_instruction = ""
    if _asks_for_memo(question):
        memo_instruction = (
            "\nThe user asked for a memo-style deliverable. Return the memo content inline in "
            "your final answer and also write the required output artifacts. Do not answer only "
            "with a status update or artifact path list.\n"
        )
    risk_category_instruction = ""
    if _asks_for_top_risk_categories(question):
        risk_category_instruction = (
            "\nStructure the final answer with separate sections for Financial, Legal, and "
            "Customer concentration risks. Do not collapse customer concentration into the "
            "financial category.\n"
        )
    unsupported_metric_instruction = ""
    if _asks_for_net_revenue_retention(question):
        unsupported_metric_instruction = (
            "\nFor net revenue retention, report the metric only if the dataroom directly "
            "provides NRR/net revenue retention. Do not derive or estimate an NRR percentage "
            "from ARR bridge components unless the user explicitly asks for an estimate. If "
            "the metric is absent, say it is unknown or unsupported, cite the searched "
            "source files, and separate missing evidence from any directional inference.\n"
        )
    return f"""
Answer this diligence question using only the mounted dataroom:

{question}
{config_line}
{memo_instruction}
{risk_category_instruction}
{unsupported_metric_instruction}
Also write the required output artifacts. Keep the answer concise, grounded, and citation-heavy.
Use workspace-relative paths for shell commands and omit `workdir`; do not pass absolute temporary paths.
"""

def _asks_for_memo(question: str) -> bool:
    lower = question.lower()
    return "memo" in lower or "ic-style" in lower or "investment committee" in lower

def _asks_for_top_risk_categories(question: str) -> bool:
    lower = question.lower()
    return all(term in lower for term in ("financial", "legal", "customer")) and "risk" in lower

def _asks_for_net_revenue_retention(question: str) -> bool:
    lower = question.lower()
    return "net revenue retention" in lower or "nrr" in lower
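
To see how the keyword checks route a question, here is a small sketch that copies the three predicates (renamed without the leading underscore so the block runs on its own) and reports which guidance blocks each sample question would trigger. The `active_guidance` helper and the sample questions are illustrative assumptions, not part of the notebook.

```python
def asks_for_memo(question: str) -> bool:
    lower = question.lower()
    return "memo" in lower or "ic-style" in lower or "investment committee" in lower


def asks_for_top_risk_categories(question: str) -> bool:
    lower = question.lower()
    return all(term in lower for term in ("financial", "legal", "customer")) and "risk" in lower


def asks_for_net_revenue_retention(question: str) -> bool:
    lower = question.lower()
    return "net revenue retention" in lower or "nrr" in lower


def active_guidance(question: str) -> list[str]:
    """Hypothetical helper: name the optional instruction blocks a question triggers."""
    flags = {
        "memo": asks_for_memo(question),
        "risk_categories": asks_for_top_risk_categories(question),
        "nrr": asks_for_net_revenue_retention(question),
    }
    return [name for name, enabled in flags.items() if enabled]


sample_questions = [
    "Write an IC-style memo on the top diligence risks.",
    "What are the top financial, legal, and customer concentration risks?",
    "What is the company's net revenue retention?",
    "Summarize the runway and cash burn.",
]

routing = {question: active_guidance(question) for question in sample_questions}
for question, guidance in routing.items():
    print(guidance, "<-", question)
```

Because these are plain substring checks, unrelated words can trip them; for instance, a question containing "memory" would also match the memo check. That kind of routing gap is one of the things traces and feedback can surface later in the loop.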

Export traces for later optimization

The local exporter converts Agents SDK events into OpenTelemetry-style JSONL that HALO can read later. Because the implementation is detailed, the code is collapsed by default.

Configure the trace exporter

Set up the exporter object that receives Agents SDK spans and writes one JSONL line per span.

EXPORT_SCHEMA_VERSION = 1

# Maps each Agents SDK span type to the observation kind written to the export.
OBSERVATION_KIND_BY_TYPE = {
    "agent": "AGENT",
    "generation": "LLM",
    "response": "LLM",
    "function": "TOOL",
    "mcp_tools": "TOOL",
    "handoff": "CHAIN",
    "guardrail": "GUARDRAIL",
    "custom": "SPAN",
    "task": "SPAN",
    "turn": "SPAN",
    "transcription": "SPAN",
    "speech": "SPAN",
    "speech_group": "SPAN",
}

@dataclass(frozen=True)
class HaloExportContext:
    project_id: str
    service_name: str
    service_version: str | None = None
    deployment_environment: str | None = None
    extra_resource_attributes: Mapping[str, Any] | None = None

def setup_halo_tracing(
    path: str | Path,
    *,
    project_id: str = "synthetic-dataroom-agent",
    service_name: str = "financial-diligence-analyst",
    service_version: str | None = None,
    deployment_environment: str | None = None,
    extra_resource_attributes: Mapping[str, Any] | None = None,
):
    from agents import set_trace_processors

    trace_path = Path(path)
    trace_path.parent.mkdir(parents=True, exist_ok=True)
    processor = HaloJsonlTraceProcessor(
        trace_path,
        ctx=HaloExportContext(
            project_id=project_id,
            service_name=service_name,
            service_version=service_version,
            deployment_environment=deployment_environment,
            extra_resource_attributes=extra_resource_attributes,
        ),
    )
    # Use only the local exporter in this cookbook workflow.
    # Hosted trace ingestion may be unavailable in some environments (for example, ZDR organizations).
    set_trace_processors([processor])
    return processor

class HaloJsonlTraceProcessor:
    def __init__(self, path: Path, *, ctx: HaloExportContext):
        self._path = path
        self._ctx = ctx
        # The lock serializes writes to the shared file handle.
        self._lock = threading.Lock()
        self._handle = path.open("a", encoding="utf-8")
        self._trace_meta: dict[str, tuple[str | None, str | None, dict[str, Any]]] = {}

    def on_trace_start(self, trace) -> None:  # noqa: ANN001
        # Remember trace-level metadata so each span line can carry it.
        data = trace.export() or {}
        trace_id = _strip_prefix(data.get("id"), "trace_") or ""
        metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {}
        self._trace_meta[trace_id] = (
            data.get("workflow_name"),
            data.get("group_id"),
            metadata,
        )

    def on_trace_end(self, trace) -> None:  # noqa: ANN001
        data = trace.export() or {}
        trace_id = _strip_prefix(data.get("id"), "trace_") or ""
        self._trace_meta.pop(trace_id, None)

    def on_span_start(self, span) -> None:  # noqa: ANN001
        return None

    def on_span_end(self, span) -> None:  # noqa: ANN001
        # Write one JSONL line per finished span.
        exported = span.export() or {}
        trace_id = _strip_prefix(exported.get("trace_id"), "trace_") or ""
        workflow_name, group_id, trace_metadata = self._trace_meta.get(trace_id, (None, None, {}))
        line = span_to_halo_jsonl_line(
            span,
            ctx=self._ctx,
            workflow_name=workflow_name,
            group_id=group_id,
            trace_metadata=trace_metadata,
        )
        encoded = json.dumps(line, separators=(",", ":"), ensure_ascii=False, default=str)
        with self._lock:
            self._handle.write(encoded)
            self._handle.write("\n")

    def shutdown(self) -> None:
        with self._lock:
            try:
                self._handle.flush()
                self._handle.close()
            except Exception:
                pass

    def force_flush(self) -> None:
        with self._lock:
            self._handle.flush()
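
Since the exporter writes one JSON object per line, the resulting file can be inspected with nothing beyond the standard library. The sketch below writes a few hand-made span records to a temporary file and groups them by trace. The field names (`trace_id`, `span_id`, `status.code`, `inference.observation_kind`) follow the export format in this notebook, while the sample ids and values are invented for illustration.

```python
import json
import tempfile
from collections import Counter, defaultdict
from pathlib import Path

# Hand-made span records using the same top-level keys as the exporter.
sample_spans = [
    {"trace_id": "t1", "span_id": "s1", "status": {"code": "STATUS_CODE_OK", "message": ""},
     "attributes": {"inference.observation_kind": "AGENT"}},
    {"trace_id": "t1", "span_id": "s2", "status": {"code": "STATUS_CODE_OK", "message": ""},
     "attributes": {"inference.observation_kind": "LLM"}},
    {"trace_id": "t1", "span_id": "s3", "status": {"code": "STATUS_CODE_ERROR", "message": "tool failed"},
     "attributes": {"inference.observation_kind": "TOOL"}},
    {"trace_id": "t2", "span_id": "s4", "status": {"code": "STATUS_CODE_OK", "message": ""},
     "attributes": {"inference.observation_kind": "LLM"}},
]

# Write the records as JSONL, one compact JSON object per line.
trace_path = Path(tempfile.mkdtemp()) / "traces.jsonl"
with trace_path.open("w", encoding="utf-8") as handle:
    for span in sample_spans:
        handle.write(json.dumps(span, separators=(",", ":")) + "\n")

# Read the file back and summarize span kinds and errors per trace.
kinds_by_trace: dict[str, Counter] = defaultdict(Counter)
error_span_ids: list[str] = []
with trace_path.open(encoding="utf-8") as handle:
    for line in handle:
        if not line.strip():
            continue
        span = json.loads(line)
        kind = span["attributes"].get("inference.observation_kind", "SPAN")
        kinds_by_trace[span["trace_id"]][kind] += 1
        if span["status"]["code"] == "STATUS_CODE_ERROR":
            error_span_ids.append(span["span_id"])

for trace_id, kinds in sorted(kinds_by_trace.items()):
    print(trace_id, dict(kinds))
print("error spans:", error_span_ids)
```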

Map SDK spans to HALO-readable fields

These helpers convert each SDK span type into the attributes HALO inspects later.

def span_to_halo_jsonl_line(
    span,
    *,
    ctx: HaloExportContext,
    workflow_name: str | None = None,
    group_id: str | None = None,
    trace_metadata: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
    raw = span.export() or {}
    span_data = raw.get("span_data") or {}
    span_type = str(span_data.get("type") or "custom")
    error = raw.get("error")
    # Resource attributes describe the service that emitted the span.
    resource_attributes: dict[str, Any] = {"service.name": ctx.service_name}
    if ctx.service_version:
        resource_attributes["service.version"] = ctx.service_version
    if ctx.deployment_environment:
        resource_attributes["deployment.environment"] = ctx.deployment_environment
    if ctx.extra_resource_attributes:
        resource_attributes.update(ctx.extra_resource_attributes)

    attributes, projection = _attributes_for_span_type(span_type, span_data)
    if workflow_name:
        attributes["agent.workflow.name"] = workflow_name
    if group_id:
        attributes["agent.workflow.group_id"] = group_id
    # Copy trace metadata, serializing values that are not JSON-safe.
    for key, value in (trace_metadata or {}).items():
        if _json_safe(value):
            attributes[f"agent.trace_metadata.{key}"] = value
        else:
            attributes[f"agent.trace_metadata.{key}"] = _json(value)

    attributes.update(
        {
            "inference.export.schema_version": EXPORT_SCHEMA_VERSION,
            "inference.project_id": ctx.project_id,
            "inference.observation_kind": OBSERVATION_KIND_BY_TYPE.get(span_type, "SPAN"),
            "inference.llm.provider": projection.get("llm_provider"),
            "inference.llm.model_name": projection.get("llm_model_name"),
            "inference.llm.input_tokens": projection.get("input_tokens"),
            "inference.llm.output_tokens": projection.get("output_tokens"),
            "inference.llm.cost.total": projection.get("cost_total"),
            "inference.user_id": projection.get("user_id"),
            "inference.session_id": group_id,
            "inference.agent_name": projection.get("agent_name") or "",
        }
    )

    return {
        "trace_id": _strip_prefix(raw.get("trace_id"), "trace_") or "",
        "span_id": _strip_prefix(raw.get("id"), "span_") or "",
        "parent_span_id": _strip_prefix(raw.get("parent_id"), "span_") or "",
        "trace_state": "",
        "name": _span_name(span_type, span_data),
        "kind": _span_kind(span_type),
        "start_time": _to_otlp_timestamp(raw.get("started_at")),
        "end_time": _to_otlp_timestamp(raw.get("ended_at")),
        "status": {
            "code": "STATUS_CODE_ERROR" if error else "STATUS_CODE_OK",
            "message": str((error or {}).get("message") or ""),
        },
        "resource": {"attributes": resource_attributes},
        "scope": {"name": "openai-agents-sdk", "version": _sdk_version()},
        # Drop attributes whose value is None to keep each line compact.
        "attributes": {key: value for key, value in attributes.items() if value is not None},
    }

def _attributes_for_span_type(
    span_type: str,
    data: Mapping[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
    # Each handler returns (span attributes, projection used for inference.* fields).
    if span_type == "agent":
        return _agent_attrs(data)
    if span_type == "generation":
        return _generation_attrs(data)
    if span_type == "response":
        return _response_attrs(data)
    if span_type == "function":
        return _function_attrs(data)
    if span_type == "mcp_tools":
        return _mcp_tools_attrs(data)
    if span_type == "handoff":
        return _handoff_attrs(data)
    if span_type == "guardrail":
        return _guardrail_attrs(data)
    return _custom_attrs(span_type, data)

def _agent_attrs(data: Mapping[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    name = data.get("name") or ""
    return _drop_none(
        {
            "openinference.span.kind": "AGENT",
            "agent.name": name,
            "agent.handoffs": _json(data.get("handoffs")),
            "agent.tools": _json(data.get("tools")),
            "agent.output_type": data.get("output_type"),
        }
    ), {"agent_name": name}

def _generation_attrs(data: Mapping[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    usage = data.get("usage") or {}
    input_messages = data.get("input") or []
    output_messages = data.get("output") or []
    attrs: dict[str, Any] = {
        "openinference.span.kind": "LLM",
        "llm.provider": "openai",
        "llm.model_name": data.get("model"),
        "llm.invocation_parameters": _json(data.get("model_config")),
        "llm.input_messages": _json(list(input_messages)),
        "llm.output_messages": _json(list(output_messages)),
        "llm.token_count.prompt": _int(usage.get("input_tokens") or usage.get("prompt_tokens")),
        "llm.token_count.completion": _int(
            usage.get("output_tokens") or usage.get("completion_tokens")
        ),
        "llm.token_count.total": _int(usage.get("total_tokens")),
    }
    attrs.update(_expand_messages("llm.input_messages", input_messages))
    attrs.update(_expand_messages("llm.output_messages", output_messages))
    return _drop_none(attrs), {
        "llm_provider": "openai",
        "llm_model_name": data.get("model"),
        "input_tokens": _int(usage.get("input_tokens") or usage.get("prompt_tokens")),
        "output_tokens": _int(usage.get("output_tokens") or usage.get("completion_tokens")),
    }

def _response_attrs(data: Mapping[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    usage = data.get("usage") or {}
    return _drop_none(
        {
            "openinference.span.kind": "LLM",
            "llm.provider": "openai",
            "llm.response.id": data.get("response_id"),
            "llm.token_count.prompt": _int(usage.get("input_tokens") or usage.get("prompt_tokens")),
            "llm.token_count.completion": _int(
                usage.get("output_tokens") or usage.get("completion_tokens")
            ),
            "llm.token_count.total": _int(usage.get("total_tokens")),
        }
    ), {
        "llm_provider": "openai",
        "input_tokens": _int(usage.get("input_tokens") or usage.get("prompt_tokens")),
        "output_tokens": _int(usage.get("output_tokens") or usage.get("completion_tokens")),
    }

def _function_attrs(data: Mapping[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    return _drop_none(
        {
            "openinference.span.kind": "TOOL",
            "tool.name": data.get("name"),
            "input.value": data.get("input"),
            "output.value": data.get("output"),
            "mcp.data": _json(data.get("mcp_data")),
        }
    ), {}

def _mcp_tools_attrs(data: Mapping[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    return _drop_none(
        {
            "openinference.span.kind": "TOOL",
            "mcp.server": data.get("server"),
            "mcp.tools.listed": _json(data.get("result")),
        }
    ), {}

def _handoff_attrs(data: Mapping[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    return _drop_none(
        {
            "openinference.span.kind": "CHAIN",
            "agent.handoff.from": data.get("from_agent"),
            "agent.handoff.to": data.get("to_agent"),
        }
    ), {"agent_name": data.get("to_agent")}

def _guardrail_attrs(data: Mapping[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    return _drop_none(
        {
            "openinference.span.kind": "GUARDRAIL",
            "guardrail.name": data.get("name"),
            "guardrail.triggered": bool(data.get("triggered")),
        }
    ), {}

def _custom_attrs(span_type: str, data: Mapping[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    attrs: dict[str, Any] = {
        "openinference.span.kind": "CHAIN",
        "sdk.span.type": span_type,
    }
    if data.get("name"):
        attrs["sdk.span.name"] = data.get("name")
    payload = data.get("data") or {}
    if isinstance(payload, Mapping):
        for key, value in payload.items():
            # Scalars pass through; anything else is serialized to a JSON string.
            attrs[f"sdk.data.{key}"] = value if _json_safe(value) else _json(value)
    if "usage" in data:
        attrs["llm.token_count.total"] = _int((data.get("usage") or {}).get("total_tokens"))
    return _drop_none(attrs), {}
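
The if-chain in `_attributes_for_span_type` could also be expressed as a lookup table. The following is only a minimal sketch of that idea, not the notebook's code: `demo_agent_attrs`, `demo_custom_attrs`, `HANDLERS`, and `attributes_for` are hypothetical names, and the handlers are simplified stand-ins that keep the same `(attributes, projection)` return convention.

```python
from typing import Any, Callable, Mapping

AttrResult = tuple[dict[str, Any], dict[str, Any]]


def demo_agent_attrs(data: Mapping[str, Any]) -> AttrResult:
    # Simplified stand-in for the notebook's _agent_attrs (hypothetical).
    name = data.get("name") or ""
    return {"openinference.span.kind": "AGENT", "agent.name": name}, {"agent_name": name}


def demo_custom_attrs(span_type: str, data: Mapping[str, Any]) -> AttrResult:
    # Fallback for span types that have no dedicated handler.
    return {"openinference.span.kind": "CHAIN", "sdk.span.type": span_type}, {}


# Hypothetical registry: one entry per span type with a dedicated handler.
HANDLERS: dict[str, Callable[[Mapping[str, Any]], AttrResult]] = {
    "agent": demo_agent_attrs,
}


def attributes_for(span_type: str, data: Mapping[str, Any]) -> AttrResult:
    handler = HANDLERS.get(span_type)
    if handler is None:
        return demo_custom_attrs(span_type, data)
    return handler(data)


attrs, projection = attributes_for("agent", {"name": "analyst"})
fallback_attrs, fallback_projection = attributes_for("speech", {})
```

Either form works; the table mainly helps when new span types are added often, because the fallback path stays in one place.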

Normalize helper values

These final helpers keep IDs, timestamps, and serialized values consistent across exported spans.

def _strip_prefix(value: Any, prefix: str) -> str | None:
    if not value:
        return None
    text = str(value)
    return text[len(prefix) :] if text.startswith(prefix) else text

def _to_otlp_timestamp(value: str | None) -> str:
    if not value:
        return ""
    parsed = datetime.fromisoformat(value)
    # Treat naive timestamps as UTC, then normalize everything to UTC.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    parsed = parsed.astimezone(timezone.utc)
    # Pad microseconds to nine fractional digits (nanosecond width).
    return parsed.strftime("%Y-%m-%dT%H:%M:%S.") + f"{parsed.microsecond:06d}000Z"

def _span_kind(span_type: str) -> str:
    return "SPAN_KIND_CLIENT" if span_type in {"generation", "response"} else "SPAN_KIND_INTERNAL"

def _span_name(span_type: str, data: Mapping[str, Any]) -> str:
    if data.get("name"):
        return f"{span_type}.{data['name']}"
    if data.get("model"):
        return f"{span_type}.{data['model']}"
    return span_type

def _expand_messages(prefix: str, messages: Iterable[Mapping[str, Any]]) -> dict[str, Any]:
    # Flatten each message into indexed attribute keys such as `<prefix>.0.message.role`.
    attrs: dict[str, Any] = {}
    for index, message in enumerate(messages or []):
        if not isinstance(message, Mapping):
            continue
        role = message.get("role")
        content = message.get("content")
        if role is not None:
            attrs[f"{prefix}.{index}.message.role"] = role
        if isinstance(content, str):
            attrs[f"{prefix}.{index}.message.content"] = content
        elif content is not None:
            attrs[f"{prefix}.{index}.message.content"] = _json(content)
        for tool_index, tool_call in enumerate(message.get("tool_calls") or []):
            function = (tool_call or {}).get("function") or {}
            attrs[f"{prefix}.{index}.message.tool_calls.{tool_index}.tool_call.id"] = (
                tool_call or {}
            ).get("id")
            attrs[
                f"{prefix}.{index}.message.tool_calls.{tool_index}.tool_call.function.name"
            ] = function.get("name")
            attrs[
                f"{prefix}.{index}.message.tool_calls.{tool_index}.tool_call.function.arguments"
            ] = function.get("arguments")
        if message.get("tool_call_id"):
            attrs[f"{prefix}.{index}.message.tool_call_id"] = message["tool_call_id"]
        if message.get("name"):
            attrs[f"{prefix}.{index}.message.name"] = message["name"]
    return {key: value for key, value in attrs.items() if value is not None}

def _json(value: Any) -> str | None:
    if value is None:
        return None
    # Compact separators keep attribute values small; default=str covers non-JSON types.
    return json.dumps(value, default=str, separators=(",", ":"))

def _json_safe(value: Any) -> bool:
    return isinstance(value, (str, int, float, bool)) or value is None

def _int(value: Any) -> int | None:
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

def _drop_none(values: Mapping[str, Any]) -> dict[str, Any]:
    return {key: value for key, value in values.items() if value is not None}

def _sdk_version() -> str:
    try:
        return version("openai-agents")
    except Exception:
        return "unknown"
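
To make the timestamp normalization concrete, here is a small standalone sketch that mirrors the logic of `_to_otlp_timestamp`. The function name `to_otlp_timestamp` and the sample inputs are illustrative only; the logic is copied from the helper above so it can be run on its own.

```python
from datetime import datetime, timezone
from typing import Optional


def to_otlp_timestamp(value: Optional[str]) -> str:
    # Standalone copy of the notebook helper's logic, for illustration.
    if not value:
        return ""
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # Naive timestamps are assumed to already be in UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    parsed = parsed.astimezone(timezone.utc)
    # Microseconds padded with three zeros give nine fractional digits.
    return parsed.strftime("%Y-%m-%dT%H:%M:%S.") + f"{parsed.microsecond:06d}000Z"


with_offset = to_otlp_timestamp("2025-03-01T12:30:45.250000+02:00")
naive = to_otlp_timestamp("2025-03-01T12:30:45")
missing = to_otlp_timestamp(None)

print(with_offset)  # 2025-03-01T10:30:45.250000000Z
print(naive)        # 2025-03-01T12:30:45.000000000Z
```

An offset-aware input is shifted to UTC, a naive input is taken as UTC unchanged, and a missing value becomes an empty string rather than raising.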

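Run the SDK agent

run_sdk_agent() calls the Agents SDK runner directly and handles the setup that has to be repeated around every traced run: mounting the data, attaching tracing, executing the agent, and collecting the output artifacts.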

async def run_sdk_agent(
    dataset_dir: Path,
    output_dir: Path,
    question: str,
    model: str,
    agent_config: AgentConfig,
    trace_id: str | None = None,
    trace_metadata: dict[str, Any] | None = None,
    halo_trace_path: str | Path | None = None,
    halo_project_id: str = "financial_diligence_analyst_optimization_context",
) -> str:
    from agents import ModelSettings as SDKModelSettings
    from agents import Runner, custom_span, flush_traces, trace
    from agents.run import RunConfig
    from agents.sandbox import Manifest, SandboxAgent, SandboxRunConfig
    from agents.sandbox.entries import Dir, LocalDir
    from agents.sandbox.sandboxes.unix_local import UnixLocalSandboxClient
    from openai.types.shared import Reasoning

    output_dir.mkdir(parents=True, exist_ok=True)
    with staged_dataset_mount(dataset_dir) as staged_dataset_dir:
        write_runtime_manifest(staged_dataset_dir)
        reasoning = Reasoning(effort=agent_config.model_settings.reasoning_effort)
        agent = SandboxAgent(
            name="Synthetic dataroom diligence analyst",
            model=model,
            model_settings=SDKModelSettings(reasoning=reasoning),
            instructions=agent_config.build_instructions(),
            default_manifest=Manifest(
                entries={
                    "data": LocalDir(src=staged_dataset_dir),
                    "outputs": Dir(),
                }
            ),
        )
        client = UnixLocalSandboxClient()
        session = None
        halo_processor = None
        if halo_trace_path is not None:
            halo_processor = setup_halo_tracing(
                halo_trace_path,
                project_id=halo_project_id,
                service_version=agent_config.version,
                deployment_environment="notebook" if trace_metadata else None,
                extra_resource_attributes={
                    "agent.config.version": agent_config.version,
                    "agent.config.path": str(agent_config.path),
                },
            )
        trace_context = (
            trace(
                workflow_name="Synthetic dataroom diligence",
                trace_id=trace_id,
                metadata=trace_metadata,
            )
            if trace_id
            else None
        )
        if trace_context is not None:
            trace_context.__enter__()
        try:
            with custom_span(
                "sandbox_workspace",
                {
                    "tool.name": "sandbox_workspace",
                    "tool.input": {
                        "mounted": "data",
                        "writable": "outputs",
                        "dataset_dir": str(dataset_dir),
                        "staged_dataset_dir": str(staged_dataset_dir),
                        "agent_config": str(agent_config.path),
                        "agent_config_version": agent_config.version,
                    },
                },
                disabled=trace_context is None,
            ):
                with custom_span(
                    "agent_config",
                    {
                        "tool.name": "agent_config",
                        "tool.input": {
                            "version": agent_config.version,
                            "required_artifacts": agent_config.required_artifacts,
                        },
                    },
                    disabled=trace_context is None,
                ):
                    pass
                session = await client.create(manifest=agent.default_manifest)
                async with session:
                    result = await Runner.run(
                        agent,
                        build_user_prompt(question, agent_config),
                        run_config=RunConfig(
                            sandbox=SandboxRunConfig(session=session),
                            workflow_name="Synthetic dataroom diligence",
                            trace_id=trace_id,
                            trace_metadata=trace_metadata,
                            tracing_disabled=trace_id is None,
                        ),
                        max_turns=30,
                    )
                    # Copy each required artifact out of the sandbox; skip any the agent did not write.
                    for filename in agent_config.required_artifacts:
                        try:
                            with custom_span(
                                "artifact_write",
                                {
                                    "tool.name": "artifact_write",
                                    "tool.input": {"filename": filename},
                                },
                                disabled=trace_context is None,
                            ):
                                with await session.read(Path("outputs") / filename) as handle:
                                    (output_dir / filename).write_bytes(handle.read())
                        except Exception:
                            continue
                    return str(result.final_output)
        finally:
            # Best-effort cleanup: sandbox session, trace context, then HALO export.
            delete = getattr(client, "delete", None)
            if delete is not None and session is not None:
                try:
                    await delete(session)
                except Exception:
                    pass
            if trace_context is not None:
                trace_context.__exit__(None, None, None)
            if halo_processor is not None:
                try:
                    flush_traces()
                except Exception:
                    pass
                try:
                    halo_processor.shutdown()
                except Exception:
                    pass

@contextmanager
def staged_dataset_mount(dataset_dir: Path) -> Iterator[Path]:
    """Prepare a writable SDK mount copy without modifying the source dataroom."""
    with tempfile.TemporaryDirectory(prefix="synthetic-dataroom-mount-") as tmp:
        staged_dir = Path(tmp) / dataset_dir.name
        shutil.copytree(dataset_dir, staged_dir)
        write_runtime_tools(staged_dir)
        yield staged_dir.resolve()

def write_runtime_manifest(dataset_dir: Path) -> None:
    # List every staged file except the manifest itself.
    manifest = {
        "runtime_scope": "sdk_agent_visible_dataroom",
        "files": sorted(
            str(path.relative_to(dataset_dir))
            for path in dataset_dir.rglob("*")
            if path.is_file() and path.name != "manifest.json"
        ),
    }
    (dataset_dir / "manifest.json").write_text(
        json.dumps(manifest, indent=2) + "\n",
        encoding="utf-8",
    )
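
`run_sdk_agent()` enters the optional trace context by calling `__enter__()` directly and later calls `__exit__(None, None, None)`, so the context never sees an exception raised by the run. One alternative, shown here only as a sketch, is `contextlib.ExitStack`, which enters a context conditionally and forwards exception details on exit. The names `demo_trace` and `run_with_optional_trace` are hypothetical stand-ins and do not come from the Agents SDK.

```python
from contextlib import ExitStack, contextmanager
from typing import Callable, Iterator, Optional


@contextmanager
def demo_trace(trace_id: str, events: list[str]) -> Iterator[None]:
    # Hypothetical stand-in for a tracing context manager.
    events.append(f"enter {trace_id}")
    try:
        yield
    finally:
        events.append(f"exit {trace_id}")


def run_with_optional_trace(
    trace_id: Optional[str],
    work: Callable[[], str],
    events: list[str],
) -> str:
    with ExitStack() as stack:
        if trace_id:
            # Entered only when tracing is requested; always exited by the stack.
            stack.enter_context(demo_trace(trace_id, events))
        return work()


traced_events: list[str] = []
traced_result = run_with_optional_trace("trace_abc", lambda: "ok", traced_events)

untraced_events: list[str] = []
untraced_result = run_with_optional_trace(None, lambda: "ok", untraced_events)
```

With a trace ID the context is entered and exited around the work; without one the work runs with no context at all, and no manual `__exit__` call is needed in a `finally` block.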

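Step 3. Generate traced runs

The questions are deliberately varied so that the eval suite can cover several ways the agent might go wrong. The notebook runs five traces by default, which keeps the live path practical while still covering several clearly different behaviors. A larger question bank remains available if you want broader coverage later.

Each run uses the async Agents SDK path and writes a real trace along with the required artifacts.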

QUESTION_BANK = [
    "What do runway and burn tell us about near-term financing risk?",
    "How strong is revenue quality, and which ARR figure should we rely on?",
    "What is the real customer concentration risk after parent-account rollups?",
    "What legal exposure should an acquirer investigate first?",
    "How ready is the company for enterprise security review?",
    "Which contradictions appear across the board deck, finance exports, and management narratives?",
    "What unsupported metrics should we refuse to infer from the dataroom?",
    "What follow-up questions should management answer before an investment committee review?",
    "What are the top three diligence risks, ranked by severity?",
    "Which claims in the materials look directionally useful but still need stronger evidence?",
]

# Use 5 questions by default; more are available if you want broader coverage later.

DEFAULT_TRACE_INDICES = [0, 1, 2, 4, 6]
TRACE_LIMIT = len(DEFAULT_TRACE_INDICES)
QUESTIONS = [QUESTION_BANK[index] for index in DEFAULT_TRACE_INDICES]

@dataclass
class TraceRecord:
    trace_id: str
    sdk_trace_id: str
    trace_label: str
    question: str
    answer: str
    output_dir: str
    mode: str

def sdk_trace_id(label: str) -> str:
    # Agents SDK trace upload requires ids of the form `trace_<hex>`.
    return f"trace_{hashlib.sha256(label.encode('utf-8')).hexdigest()[:32]}"

def exported_trace_id(label: str) -> str:
    # The local HALO exporter strips the SDK `trace_` prefix before writing JSONL.
    return sdk_trace_id(label).removeprefix("trace_")

async def generate_traces(dataset: Path, questions: list[str]) -> list[TraceRecord]:
    traces: list[TraceRecord] = []
    for index, question in enumerate(questions, start=1):
        label = f"trace-{index:02d}"
        print(f"Running {label}/{len(questions):02d}: {question}")
        output_dir = TRACE_DIR / f"trace_{index:02d}"
        output_dir.mkdir(parents=True, exist_ok=True)
        real_sdk_trace_id = sdk_trace_id(label)
        real_exported_trace_id = exported_trace_id(label)
        answer = await run_sdk_agent(
            dataset_dir=dataset,
            output_dir=output_dir,
            question=question,
            model=AGENT_MODEL,
            agent_config=agent_config,
            trace_id=real_sdk_trace_id,
            trace_metadata={"notebook_trace_id": label},
            halo_trace_path=HALO_TRACE_PATH,
        )
        traces.append(
            TraceRecord(
                trace_id=real_exported_trace_id,
                sdk_trace_id=real_sdk_trace_id,
                trace_label=label,
                question=question,
                answer=answer,
                output_dir=str(output_dir.relative_to(PROJECT_ROOT)),
                mode="sdk",
            )
        )
    return traces

trace_generation_started = time.perf_counter()
traces = await generate_traces(dataset, QUESTIONS)
print(f"Trace generation completed in {format_duration(time.perf_counter() - trace_generation_started)}")
assert len(traces) == TRACE_LIMIT

for trace in traces:
    print(f"{trace.trace_label}: {trace.question}")
    print(textwrap.shorten(trace.answer.replace("\n", " "), width=180, placeholder="..."))
    print()
Running trace-01/05: What do runway and burn tell us about near-term financing risk?
Running trace-02/05: How strong is revenue quality, and which ARR figure should we rely on?
Running trace-03/05: What is the real customer concentration risk after parent-account rollups?
Running trace-04/05: How ready is the company for enterprise security review?
Running trace-05/05: What unsupported metrics should we refuse to infer from the dataroom?
Trace generation completed in 7m 59s
trace-01: What do runway and burn tell us about near-term financing risk?
Near-term financing risk is elevated. Finance reports `$2.9M` monthly cash burn and `11 months` runway, and the board packet corroborates both figures....

trace-02: How strong is revenue quality, and which ARR figure should we rely on?
**Answer** - Revenue quality is **moderate, not clean**: real scale and 69% gross margin, but ARR definition drift, unvalidated retention, concentration, and renewal risk weaken...

trace-03: What is the real customer concentration risk after parent-account rollups?
**Answer** - Real concentration risk is **high**: Northstar Bank + Northstar Capital Markets roll up to **Northstar Holdings at $12.4M**, or **33.6% of controlled FY2025 ARR**....

trace-04: How ready is the company for enterprise security review?
**Answer** - The company is **partially ready, but not ready for frictionless enterprise security review**: SOC 2 Type I is complete, but SOC 2 Type II fieldwork is still in...

trace-05: What unsupported metrics should we refuse to infer from the dataroom?
**Answer** Refuse to infer these unsupported or conflicted metrics from the dataroom: - `CAC payback`: explicitly `not_provided`; requested but not supplied....
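
Because both trace IDs are derived by hashing the label, rerunning the notebook with the same labels produces the same IDs. The sketch below restates the two ID helpers from the cell above so it can run on its own, then checks the properties the rest of the loop relies on; the variable names are illustrative.

```python
import hashlib


def sdk_trace_id(label: str) -> str:
    # Same derivation as the notebook helper: `trace_` plus 32 hex characters.
    return f"trace_{hashlib.sha256(label.encode('utf-8')).hexdigest()[:32]}"


def exported_trace_id(label: str) -> str:
    # The exported form is the SDK form without the `trace_` prefix.
    return sdk_trace_id(label).removeprefix("trace_")


sdk_id = sdk_trace_id("trace-01")
exported_id = exported_trace_id("trace-01")

is_deterministic = sdk_id == sdk_trace_id("trace-01")
labels_differ = sdk_id != sdk_trace_id("trace-02")
round_trips = f"trace_{exported_id}" == sdk_id
```

This is why later prompts insist on copying `trace_id` (the exported form) exactly: the SDK form and the label identify the same run but are not interchangeable keys.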

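Inspect the agent artifacts

Each traced run writes the full set of artifacts the harness requires. The first run below shows the files the agent produced, so you can inspect the answer, evidence, and open questions together.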

def show_trace_artifacts(trace: TraceRecord) -> None:
    output_dir = PROJECT_ROOT / trace.output_dir
    for artifact in agent_config.required_artifacts:
        path = output_dir / artifact
        # Pick a syntax-highlighting language from the file extension.
        language = {
            ".md": "markdown",
            ".json": "json",
            ".csv": "csv",
        }.get(path.suffix, "text")
        display(Markdown(f"### `{artifact}`\n```{language}\n{path.read_text(encoding='utf-8').rstrip()}\n```"))

show_trace_artifacts(traces[0])

summary_answer.md

# Summary Answer

Runway and burn indicate elevated near-term financing risk. Finance reports FY2025 cash burn of $2.9M per month and 11 months of runway, and the December board packet repeats the same burn and runway figures. (`financials/p_and_l.csv`, `board_deck.md`)

An 11-month runway is a sub-12-month financing window: unless burn is reduced, revenue conversion accelerates, or additional capital is secured, the company likely needs a financing plan in the near term. (`financials/p_and_l.csv`)

The financing story is somewhat weakened by ARR quality and source conflicts. The controlled FY2025 ARR bridge shows $36.9M ending ARR, while the board deck reports $43.0M because it includes $2.8M of launch-stage commitments and $1.1M of usage true-ups that finance does not classify as recurring ARR. (`financials/arr_bridge.csv`, `financials/revenue_recognition_notes.md`, `board_deck.md`)

The dataroom does not provide a cash balance, debt schedule, undrawn facility, covenant package, or financing plan, so the exact liquidity cushion and financing path are unknown from the provided evidence. (`financials/p_and_l.csv`, `manifest.json`)

investment_memo.md

# Investment Memo: Runway and Burn

## Bottom Line
- Near-term financing risk is elevated because finance reports $2.9M of monthly cash burn and only 11 months of runway. (`financials/p_and_l.csv`)
- The board packet corroborates the same $2.9M monthly burn and 11-month runway. (`board_deck.md`)
- The exact liquidity cushion is unknown because the dataroom provides runway and burn but not cash balance, debt availability, covenant terms, or a financing plan. (`financials/p_and_l.csv`, `manifest.json`)

## Evidence
- FY2025 P&L reports $30.26M revenue, 69% gross margin, $47.71M opex, $2.9M cash burn per month, and 11 months of runway. (`financials/p_and_l.csv`)
- Finance-controlled ARR is $36.9M at FY2025 year-end. (`financials/arr_bridge.csv`)
- The board deck reports $43.0M FY2025 ending ARR, 71% ARR growth, 69% gross margin, $2.9M monthly burn, and 11 months of runway. (`board_deck.md`)
- Finance states the board ARR includes $2.8M signed launch-stage commitments not live by 2025-12-31 and $1.1M usage true-ups that finance does not classify as recurring ARR. (`financials/revenue_recognition_notes.md`)

## Interpretation
- A company burning $2.9M per month with 11 months of runway has less than one year to reduce burn, convert growth into cash-efficient revenue, or raise capital. (`financials/p_and_l.csv`)
- The growth narrative should be underwritten against finance-controlled ARR rather than board headline ARR because finance identifies specific non-recurring or not-yet-live components in the board figure. (`financials/arr_bridge.csv`, `financials/revenue_recognition_notes.md`, `board_deck.md`)
- Current evidence supports a financing-risk concern, but it does not support quantifying exact cash balance, facility availability, covenant headroom, or planned raise timing. (`financials/p_and_l.csv`, `manifest.json`)

## Diligence View
- Financing risk: High / elevated.
- Key dependency: management must show a credible plan to extend runway beyond the reported 11 months.
- Critical missing evidence: cash balance, monthly cash forecast, debt/facility details, covenant headroom, and financing plan.

risk_register.json

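[
  {
    "id": "R-001",
    "risk": "Runway of less than 12 months",
    "severity": "High",
    "rationale": "Finance reports 11 months of runway and $2.9M of monthly cash burn, which indicates a near-term need to reduce burn, improve cash generation, or secure financing.",
    "evidence": [
      "financials/p_and_l.csv",
      "board_deck.md"
    ],
    "open_questions": [
      "What is the current unrestricted cash balance?",
      "What financing actions are planned before runway falls below 6 months?"
    ]
  },
  {
    "id": "R-002",
    "risk": "ARR quality may weaken the financing narrative",
    "severity": "Medium",
    "rationale": "Finance-controlled FY2025 ending ARR is $36.9M, while the board deck reports $43.0M of ARR because it includes launch-stage commitments and usage true-ups that finance does not classify as recurring ARR.",
    "evidence": [
      "financials/arr_bridge.csv",
      "financials/revenue_recognition_notes.md",
      "board_deck.md"
    ],
    "open_questions": [
      "Which ARR figure is used in lender or investor materials?",
      "How many of the launch-stage commitments have since gone live?"
    ]
  },
  {
    "id": "R-003",
    "risk": "Liquidity structure lacks supporting evidence",
    "severity": "Medium",
    "rationale": "The dataroom provides cash burn and runway but not cash balance, debt availability, covenant headroom, or a financing plan, which makes it hard to build confidence in the company's liquidity path.",
    "evidence": [
      "financials/p_and_l.csv",
      "manifest.json"
    ],
    "open_questions": [
      "Is there an undrawn revolving credit facility or venture debt facility?",
      "Are there financial covenants or minimum cash requirements?"
    ]
  }
]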

open_questions.md

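# Open Questions

- What is the current unrestricted cash balance, and how does it reconcile to the reported 11 months of runway? (`financials/p_and_l.csv`)
- Are there existing debt facilities, an undrawn revolver, a covenant package, or minimum cash requirements? (`manifest.json`)
- What is management's financing plan, including target timing, amount, and the fallback if markets are unavailable? (`manifest.json`)
- What burn-reduction actions are available, and how many additional months of runway would each one add? (`financials/p_and_l.csv`)
- Which ARR figure is used in financing discussions: the $36.9M finance-controlled ARR or the $43.0M board headline ARR? (`financials/arr_bridge.csv`, `financials/revenue_recognition_notes.md`, `board_deck.md`)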

citations.json

[
  {
    "claim_id": "C-001",
    "claim": "Finance reports FY2025 cash burn of $2.9M per month and 11 months of runway.",
    "sources": [
      "financials/p_and_l.csv"
    ]
  },
  {
    "claim_id": "C-002",
    "claim": "The December board packet repeats the $2.9M monthly cash burn and 11 months of runway.",
    "sources": [
      "board_deck.md"
    ]
  },
  {
    "claim_id": "C-003",
    "claim": "Finance-controlled FY2025 ending ARR is $36.9M.",
    "sources": [
      "financials/arr_bridge.csv"
    ]
  },
  {
    "claim_id": "C-004",
    "claim": "The board deck reports FY2025 ending ARR of $43.0M and ARR growth of 71%.",
    "sources": [
      "board_deck.md"
    ]
  },
  {
    "claim_id": "C-005",
    "claim": "Finance states that board ARR includes $2.8M of launch-stage commitments not live by 2025-12-31 and $1.1M of usage true-ups that finance does not classify as recurring ARR.",
    "sources": [
      "financials/revenue_recognition_notes.md"
    ]
  },
  {
    "claim_id": "C-006",
    "claim": "The dataroom does not provide a separate cash balance, debt schedule, facility availability, covenant package, or financing plan.",
    "sources": [
      "financials/p_and_l.csv",
      "manifest.json"
    ]
  }
]

evidence_table.csv

claim_id,claim,sources
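C-001,"Finance reports FY2025 cash burn of $2.9M per month and 11 months of runway.","financials/p_and_l.csv"
C-002,"The December board packet repeats the $2.9M monthly cash burn and 11 months of runway.","board_deck.md"
C-003,"Finance-controlled FY2025 ending ARR is $36.9M.","financials/arr_bridge.csv"
C-004,"The board deck reports FY2025 ending ARR of $43.0M and ARR growth of 71%.","board_deck.md"
C-005,"Finance states that board ARR includes $2.8M of launch-stage commitments not live by 2025-12-31 and $1.1M of usage true-ups that finance does not classify as recurring ARR.","financials/revenue_recognition_notes.md"
C-006,"The dataroom does not provide a separate cash balance, debt schedule, facility availability, covenant package, or financing plan.","financials/p_and_l.csv; manifest.json"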
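
The two evidence artifacts carry the same claims in different formats, which makes them easy to cross-check. The following is a minimal sketch of such a check and is not part of the notebook: `find_citation_mismatches` is a hypothetical helper, the sample data is abbreviated, and it assumes the CSV joins multiple sources with `;` as in the table above.

```python
import csv
import io
import json


def find_citation_mismatches(citations_json: str, evidence_csv: str) -> list[str]:
    """Return claim IDs whose source lists differ between the two artifacts."""
    citations = {
        item["claim_id"]: sorted(item["sources"])
        for item in json.loads(citations_json)
    }
    rows = {
        row["claim_id"]: sorted(s.strip() for s in row["sources"].split(";") if s.strip())
        for row in csv.DictReader(io.StringIO(evidence_csv))
    }
    # A claim missing from either artifact also counts as a mismatch.
    all_ids = sorted(set(citations) | set(rows))
    return [claim_id for claim_id in all_ids if citations.get(claim_id) != rows.get(claim_id)]


# Abbreviated sample data for illustration only.
sample_citations = json.dumps(
    [
        {"claim_id": "C-001", "claim": "Burn and runway.", "sources": ["financials/p_and_l.csv"]},
        {
            "claim_id": "C-006",
            "claim": "Missing liquidity data.",
            "sources": ["financials/p_and_l.csv", "manifest.json"],
        },
    ]
)
sample_csv = (
    "claim_id,claim,sources\n"
    'C-001,"Burn and runway.","financials/p_and_l.csv"\n'
    'C-006,"Missing liquidity data.","financials/p_and_l.csv; manifest.json"\n'
)

consistent = find_citation_mismatches(sample_citations, sample_csv)
tampered = find_citation_mismatches(sample_citations, sample_csv.replace("; manifest.json", ""))
```

A check like this is deterministic, so it could run before any model-graded eval and catch artifacts that drifted apart.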

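Step 4. Generate sample human feedback and model insights

This section simulates a human expert reviewing the traces after the agent runs. In a real diligence workflow, the reviewer might be a finance lead or another deal expert who knows which details change a decision. In this example, the reviewer notes that parent-account rollups matter more than legal-entity concentration, that an unvalidated management NRR estimate should not become an official metric, and that a claim like "SOC 2 complete" is too vague when the evidence supports only Type I.

The model-generated insights are kept separate. In the fully automated path, an LLM reviews the same traces and proposes recurring issues or missing behaviors. That extra pass improves coverage, while subject-matter review adds domain judgment grounded in the actual work.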

def feedback_item(
    trace: TraceRecord,
    summary: str,
    required: list[str],
    prohibited: list[str],
    theme: str,
) -> dict[str, Any]:
    return {
        "feedback_id": f"human-{trace.trace_label}",
        "trace_id": trace.trace_id,
        "trace_label": trace.trace_label,
        "question": trace.question,
        "source_type": "human_feedback",
        "theme": theme,
        "summary": summary,
        "required_observations": required,
        "prohibited_claims": prohibited,
    }

def generate_mock_human_feedback(traces: list[TraceRecord]) -> list[dict[str, Any]]:
    # Each entry maps a question to (summary, required, prohibited, theme).
    specs_by_question = {
        "What do runway and burn tell us about near-term financing risk?": (
            "Call out the 11-month runway and rising cash burn as financing risk, not just a generic red flag.",
            ["Mention the 11-month runway", "Tie cash burn to near-term financing pressure"],
            ["Do not imply the company has more than 12 months of runway"],
            "financial_risk",
        ),
        "How strong is revenue quality, and which ARR figure should we rely on?": (
            "Treat the controlled ARR bridge as the reliable figure and preserve the contradiction between board and finance ARR.",
            ["Prefer finance ARR over board ARR", "Preserve the ARR contradiction"],
            ["Do not silently reconcile the ARR gap"],
            "revenue_quality",
        ),
        "What is the real customer concentration risk after parent-account rollups?": (
            "Roll concentration up to Northstar Holdings. The legal-entity view understates the real dependency.",
            ["Mention parent-account concentration", "Use account_hierarchy.csv"],
            ["Do not stop at legal-entity concentration"],
            "customer_concentration",
        ),
        "How ready is the company for enterprise security review?": (
            "Be precise about certification status: Type I is complete; Type II is still in progress.",
            ["Distinguish Type I from Type II", "Treat the sales FAQ as weaker evidence"],
            ["Do not simply say SOC 2 is complete"],
            "security_readiness",
        ),
        "What unsupported metrics should we refuse to infer from the dataroom?": (
            "Refuse to give official NRR and CAC payback when the dataroom does not support them.",
            ["Flag official NRR as unsupported", "Flag CAC payback as unsupported"],
            ["Do not promote management's NRR estimate to an official metric"],
            "unsupported_metrics",
        ),
    }
    return [feedback_item(trace, *specs_by_question[trace.question]) for trace in traces]

def extract_json(text: str) -> Any:
    # Accept either a fenced JSON block or a bare JSON document.
    text = text.strip()
    fenced = re.search(r"```(?:json)?\s*(.*?)```", text, flags=re.DOTALL)
    candidate = fenced.group(1).strip() if fenced else text
    return json.loads(candidate)

def generate_llm_feedback(traces: list[TraceRecord]) -> list[dict[str, Any]]:
    payload = [asdict(trace) for trace in traces]
    response = client.responses.create(
        model=ANALYSIS_MODEL,
        input=f"""
You are reviewing traces from a financial diligence analyst agent.
Return JSON only: a list of objects with keys `insight_id`, `trace_id`, `question`, `source_type`, and `observations`.
Use `source_type` = `llm_insight`.
For `trace_id`, copy the provided `trace_id` field exactly; do not use `sdk_trace_id` or `trace_label`.
For each trace, identify concise recurring-behavior observations that could help generate evals later.
Do not restate the whole answer. Do not invent unavailable evidence.

Traces:
{json.dumps(payload, indent=2)}
""".strip(),
    )
    parsed = extract_json(response.output_text)
    if not isinstance(parsed, list):
        raise ValueError("Expected a JSON list of LLM insights.")
    # Reject any insight that points at a trace_id we did not send.
    trace_labels = {trace.trace_id: trace.trace_label for trace in traces}
    for item in parsed:
        try:
            item["trace_label"] = trace_labels[item["trace_id"]]
        except KeyError as exc:
            raise ValueError(f"Unknown trace_id in LLM feedback: {item['trace_id']}") from exc
    return parsed

feedback_started = time.perf_counter()
human_feedback = generate_mock_human_feedback(traces)
llm_feedback = generate_llm_feedback(traces)
print(f"Feedback generation completed in {format_duration(time.perf_counter() - feedback_started)}")
assert len(human_feedback) == TRACE_LIMIT
assert len(llm_feedback) == TRACE_LIMIT

print("Human feedback items:", len(human_feedback))
print("LLM insight items:", len(llm_feedback))
print("\nSample human feedback:")
print(json.dumps(human_feedback[0], indent=2))
print("\nSample LLM insight:")
print(json.dumps(llm_feedback[0], indent=2))
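Feedback generation completed in 13s
Human feedback items: 5
LLM insight items: 5

Sample human feedback:
{
  "feedback_id": "human-trace-01",
  "trace_id": "43d9b03619a9d2ed4d2f3e3fd17c8bf4",
  "trace_label": "trace-01",
  "question": "What do runway and burn tell us about near-term financing risk?",
  "source_type": "human_feedback",
  "theme": "financial_risk",
  "summary": "Call out the 11-month runway and rising cash burn as financing risk, not just a generic red flag.",
  "required_observations": [
    "Mention the 11-month runway",
    "Tie cash burn to near-term financing pressure"
  ],
  "prohibited_claims": [
    "Do not imply the company has more than 12 months of runway"
  ]
}

Sample LLM insight:
{
  "insight_id": "llm_insight_01",
  "trace_id": "43d9b03619a9d2ed4d2f3e3fd17c8bf4",
  "question": "What do runway and burn tell us about near-term financing risk?",
  "source_type": "llm_insight",
  "observations": [
    "Flags elevated financing risk when runway is under 12 months and both finance and board sources report monthly burn.",
    "Prefers finance-controlled ARR over board headline ARR when ARR definitions conflict.",
    "Explicitly calls out missing liquidity data such as cash balance, debt availability, covenant terms, and financing plan.",
    "Attaches source citations to key numeric claims and notes validation/artifact completion."
  ],
  "trace_label": "trace-01"
}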
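
The `extract_json()` helper accepts two reply shapes: a fenced JSON block or a bare JSON document. The sketch below restates its logic so it can run on its own and shows both cases. The only change is that the fence marker is built from a `FENCE` constant (an assumption made purely so the example does not contain a literal fence), and the sample replies are invented.

```python
import json
import re
from typing import Any

FENCE = "`" * 3  # three backticks, built indirectly for readability here
FENCED_JSON = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, flags=re.DOTALL)


def extract_json(text: str) -> Any:
    # Same approach as the notebook helper: prefer a fenced block, else parse everything.
    text = text.strip()
    fenced = FENCED_JSON.search(text)
    candidate = fenced.group(1).strip() if fenced else text
    return json.loads(candidate)


fenced_reply = f"Here are the insights:\n{FENCE}json\n[1, 2]\n{FENCE}"
bare_reply = '  {"insight_id": "llm_insight_01"}  '

from_fenced = extract_json(fenced_reply)
from_bare = extract_json(bare_reply)
```

Note the limitation this implies: a reply that mixes prose with unfenced JSON raises `json.JSONDecodeError`, which is one reason the prompt asks for "JSON only".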

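Step 5. Generate Promptfoo evals from traces and feedback

The eval suite is generated dynamically by an LLM from the evidence collected so far: trace behavior, human feedback, and model-generated observations. This turns comments into tests that can be run again after the next harness revision.

Promptfoo is an open-source CLI and library for evaluating and red-teaming LLM applications. In this notebook, generated behaviors become Promptfoo test cases: each case can combine literal assertions with an LLM rubric judge, so the same gate can check both exact requirements and semantic reviewer intent.

Evals are a place where domain experts and developers should invest manual effort. A fully automated flow can propose useful evals quickly, but before they become part of a long-lived test suite, people should still check that the evals are accurate, representative, and actually measure the behaviors that matter.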

def generate_feedback_derived_evals(
    traces: list[TraceRecord],
    human_feedback: list[dict[str, Any]],
    llm_feedback: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    # Scale the requested eval count with the number of traces (between 2 and 7).
    min_eval_count = min(5, max(2, len(traces)))
    max_eval_count = min(7, max(min_eval_count, len(traces) + 2))
    response = client.responses.create(
        model=EVAL_GENERATION_MODEL,
        input=f"""
You are designing an eval suite for an OpenAI Agents SDK-backed financial diligence analyst.
Use the traces, human feedback, and LLM insights below to generate {min_eval_count} to {max_eval_count} durable eval definitions.
Return JSON only: a list of objects with keys `eval_id`, `title`, `scoring_method`, `expected_behavior`, `source_trace_id`, `rubric`, `deterministic_assertions`, `suggested_pass_example`, and `suggested_fail_example`.
`scoring_method` must be one of `deterministic`, `llm_judge`, or `hybrid`.
`source_trace_id` must exactly match the provided `trace_id` field for the trace whose answer should be scored. Do not use `sdk_trace_id` or `trace_label` for this field; those are only for SDK transport and human-readable references.
`rubric` must be a concise pass/fail grading rubric suitable for Promptfoo `llm-rubric`.
`deterministic_assertions` must be a list of Promptfoo-style assertion objects and may use only `contains`, `icontains`, or `not-contains` when a literal check is clearly useful; otherwise return an empty list.
Prefer reusable behaviors over one-off trace restatements.

Traces:
{json.dumps([asdict(trace) for trace in traces], indent=2)}

Human feedback:
{json.dumps(human_feedback, indent=2)}

LLM insights:
{json.dumps(llm_feedback, indent=2)}
""".strip(),
    )
    parsed = extract_json(response.output_text)
    if not isinstance(parsed, list):
        raise ValueError("Expected a JSON list of eval definitions.")
    trace_labels = {trace.trace_id: trace.trace_label for trace in traces}
    for item in parsed:
        try:
            item["source_trace_label"] = trace_labels[item["source_trace_id"]]
        except KeyError as exc:
            raise ValueError(f"Unknown source_trace_id in generated eval: {item['source_trace_id']}") from exc
    return parsed

eval_generation_started = time.perf_counter()
eval_suite = generate_feedback_derived_evals(traces, human_feedback, llm_feedback)
print(f"Eval generation completed in {format_duration(time.perf_counter() - eval_generation_started)}")
assert all({"title", "scoring_method", "suggested_pass_example", "suggested_fail_example", "expected_behavior", "source_trace_id", "rubric", "deterministic_assertions"} <= set(item) for item in eval_suite)

def markdown_table(rows: list[dict[str, Any]], columns: list[str]) -> str:
    header = "| " + " | ".join(columns) + " |"
    divider = "| " + " | ".join(["---"] * len(columns)) + " |"
    body = ["| " + " | ".join(str(row[column]) for column in columns) + " |" for row in rows]
    return "\n".join([header, divider, *body])

display(Markdown(markdown_table(eval_suite, ["title", "scoring_method", "expected_behavior"])))

for item in eval_suite:
    print(f"\n{item['title']}")
    print(" pass:", item["suggested_pass_example"])
    print(" fail:", item["suggested_fail_example"])
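Eval generation completed in 52s

| title | scoring_method | expected_behavior |
| --- | --- | --- |
| Runway and burn must be translated into near-term financing risk | hybrid | The answer should state explicitly that financing risk is elevated because runway is only 11 months and monthly cash burn is high or rising, and it should tie burn to the pressure to cut spending, improve cash conversion, or raise capital before the sub-12-month runway ends. It should not imply the company has more than 12 months of runway. |
| Revenue quality assessment must prefer finance-controlled ARR and preserve ARR contradictions | hybrid | The answer should describe revenue quality as mixed or moderate rather than clean; rely on finance-controlled ARR of about $36.9M at FY2025 year-end for underwriting; and explicitly reject or qualify the $43.0M board/headline ARR and the $40.8M bookings-adjusted ARR as not equivalent to recurring ARR. It should preserve the contradiction rather than silently reconciling the difference. |
| Customer concentration must be assessed after parent-account rollup | hybrid | The answer should roll legal entities up to parent accounts before assessing concentration, in particular identifying Northstar Holdings as the true parent-level exposure. It should use finance-controlled ARR as the denominator, cite or mention the account-hierarchy evidence, and avoid stopping at legal-entity concentration. |
| Enterprise security readiness must distinguish SOC 2 Type I from Type II | hybrid | The answer should say the company is only partially ready for enterprise security review because SOC 2 Type I is complete but Type II is still in progress and no Type II report has been issued. It should treat sales FAQ language such as "SOC 2 complete" as weaker or potentially misleading evidence and tie the missing Type II evidence to enterprise procurement or customer friction. |
| Unsupported metrics must be refused rather than inferred | hybrid | The answer should refuse to infer metrics that are missing, conflicted, incomplete, or only management estimates. In particular, it must flag CAC payback as unsupported/not provided and flag official NRR as unsupported because the 122% NRR is only an unvalidated management estimate. It should not promote incomplete or unofficial metrics to definitive diligence metrics. |

Runway and burn must be translated into near-term financing risk
 pass: Near-term financing risk is elevated: the company has only 11 months of runway and significant monthly burn, which creates pressure to reduce burn, improve cash conversion, or raise capital within a sub-12-month window.
 fail: Financing risk looks manageable because the company has enough runway for the coming year and should be able to keep operating without near-term financing pressure.

Revenue quality assessment must prefer finance-controlled ARR and preserve ARR contradictions
 pass: Revenue quality is moderate, not clean. For underwriting, use the $36.9M from the finance-controlled ARR bridge, and treat the $43.0M board ARR and the $40.8M bookings-adjusted view as non-comparable or planning-only figures because they include items not classified as recurring ARR.
 fail: Revenue quality is strong and the company has $43.0M of ARR; the board figure can be used because it ties to the finance ARR bridge after routine adjustments.

Customer concentration must be assessed after parent-account rollup
 pass: After the parent rollup, concentration risk is high: Northstar Bank and Northstar Capital Markets roll up to Northstar Holdings, about $12.4M in total, or roughly one third of finance-controlled ARR. Looking only at legal entities understates this dependency.
 fail: Customer concentration is acceptable because, based on the top-customer list, no single legal entity exceeds the threshold.

Enterprise security readiness must distinguish SOC 2 Type I from Type II
 pass: The company is only partially ready, not friction-free: SOC 2 Type I is complete, but Type II fieldwork is still in progress and no Type II report is available. The "SOC 2 complete" claim in sales materials should be treated with caution because enterprise buyers are still waiting for Type II evidence.
 fail: The company is ready for enterprise security review because SOC 2 is complete and the sales FAQ confirms there will be no security blockers.

Unsupported metrics must be refused rather than inferred
 pass: Refuse to infer CAC payback because it is not provided. Also refuse to treat the 122% NRR as an official metric; it is only an unvalidated management estimate and should not be used as a definitive retention metric.
 fail: The company has an official NRR of 122%, and CAC payback also looks attractive given its revenue growth, so both can be used for underwriting.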

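Step 6: Validate the current harness with Promptfoo

Promptfoo runs the generated tests against the current trace outputs. This gives the loop a snapshot of where the harness already performs well and which expectations still fail. Promptfoo suits this role because it can combine deterministic checks for literal requirements with llm-rubric grading for semantic quality.

In this notebook, the Promptfoo gate scores existing trace outputs. To validate a future harness revision, replace the trace-output provider with a provider that runs the candidate agent. These Promptfoo results become part of the optimization input that is later passed to HALO. Even when eval generation is automated, humans can still tighten weak evals before letting them drive repeated optimization.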
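To make the "deterministic checks" half of that combination concrete, here is a minimal sketch of how the three assertion types this notebook keeps (`contains`, `icontains`, `not-contains`) behave. It is a rough Python approximation for illustration only, not Promptfoo's implementation; the helper name `check_assertion` and the sample answer are assumptions.

```python
def check_assertion(output: str, assertion: dict) -> bool:
    """Approximate the three literal assertion types used in this notebook."""
    kind = assertion["type"]
    value = str(assertion["value"])
    if kind == "contains":
        return value in output  # case-sensitive substring match
    if kind == "icontains":
        return value.lower() in output.lower()  # case-insensitive substring match
    if kind == "not-contains":
        return value not in output  # the substring must be absent
    raise ValueError(f"Unsupported assertion type: {kind}")


# Hypothetical answer text and assertions, loosely modeled on the runway eval.
answer = "Financing risk is elevated: the company has 11 months of runway."
assertions = [
    {"type": "icontains", "value": "11 Months"},
    {"type": "not-contains", "value": "more than 12 months"},
]
results = [check_assertion(answer, assertion) for assertion in assertions]
print(results)
```

Literal checks like these are cheap and stable, which is why they pair well with an llm-rubric assertion that judges whether the answer carries the right meaning.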

Build the Promptfoo test harness

The provider returns existing trace outputs to Promptfoo, and the test builder converts the generated eval definitions into runnable Promptfoo cases.

PROMPTFOO_PROVIDER = r'''from __future__ import annotations

import json
from pathlib import Path

def call_api(prompt: str, options: dict, context: dict) -> dict:
    # Look up the stored answer for the trace named in the test vars.
    config = options.get("config", {})
    trace_outputs = json.loads(Path(config["trace_outputs_path"]).read_text(encoding="utf-8"))
    trace_id = (context.get("vars") or {}).get("trace_id")
    trace = trace_outputs[trace_id]
    return {
        "output": trace["answer"],
        "metadata": {
            "trace_id": trace_id,
            "question": trace["question"],
        },
    }
'''

def trace_for_eval(item: dict[str, Any], traces: list[TraceRecord]) -> TraceRecord:
    trace_by_id = {trace.trace_id: trace for trace in traces}
    try:
        return trace_by_id[item["source_trace_id"]]
    except KeyError as exc:
        raise ValueError(f"Unknown source_trace_id in generated eval: {item['source_trace_id']}") from exc

def promptfoo_test_from_eval(item: dict[str, Any], trace: TraceRecord) -> dict[str, Any]:
    # Keep only well-formed literal assertions of the supported types.
    assertions = [
        assertion
        for assertion in item.get("deterministic_assertions") or []
        if isinstance(assertion, dict)
        and assertion.get("type") in {"contains", "icontains", "not-contains"}
        and assertion.get("value")
    ]
    # Always add one semantic check graded by the judge model.
    assertions.append({
        "type": "llm-rubric",
        "provider": f"openai:{JUDGE_MODEL}",
        "threshold": 0.8,
        "value": item["rubric"],
    })
    return {
        "description": item["title"],
        "vars": {
            "question": trace.question,
            "trace_id": trace.trace_id,
            "trace_label": trace.trace_label,
        },
        "metadata": {
            "eval_id": item["eval_id"],
            "scoring_method": item["scoring_method"],
        },
        "assert": assertions,
    }

def write_promptfoo_artifacts(eval_suite: list[dict[str, Any]], traces: list[TraceRecord]) -> dict[str, Path]:
    promptfoo_dir = ARTIFACT_DIR / "promptfoo"
    promptfoo_dir.mkdir(parents=True, exist_ok=True)
    provider_path = promptfoo_dir / "trace_output_provider.py"
    trace_outputs_path = promptfoo_dir / "trace_outputs.json"
    config_path = promptfoo_dir / "promptfooconfig.yaml"
    output_path = promptfoo_dir / "promptfoo_results.json"

    provider_path.write_text(PROMPTFOO_PROVIDER, encoding="utf-8")
    trace_outputs_path.write_text(
        json.dumps({trace.trace_id: asdict(trace) for trace in traces}, indent=2) + "\n",
        encoding="utf-8",
    )
    tests = [promptfoo_test_from_eval(item, trace_for_eval(item, traces)) for item in eval_suite]
    config = {
        "description": "Feedback-derived diligence eval gate",
        "prompts": ["{{question}}"],
        "providers": [{
            "id": "file://trace_output_provider.py",
            "label": "current-trace-output",
            "config": {"trace_outputs_path": str(trace_outputs_path)},
        }],
        "tests": tests,
    }
    # JSON is valid YAML, which keeps the generated config easy to inspect without
    # adding another serialization dependency to the notebook.
    config_path.write_text(json.dumps(config, indent=2) + "\n", encoding="utf-8")
    return {
        "dir": promptfoo_dir,
        "provider": provider_path,
        "trace_outputs": trace_outputs_path,
        "config": config_path,
        "output": output_path,
    }

def promptfoo_summary(path: Path) -> dict[str, Any]:
    data = json.loads(path.read_text(encoding="utf-8"))
    # The result rows may live under either key, so try both.
    results = (data.get("results") or {}).get("outputs") or (data.get("results") or {}).get("results") or []
    rows = []
    for result in results:
        grading = result.get("gradingResult") or {}
        components = grading.get("componentResults") or []
        failing_component = next(
            (
                component
                for component in components
                if isinstance(component, dict) and component.get("pass") is False
            ),
            None,
        )
        # Prefer the overall reason, then the first failing component, then the first component.
        reason = str(grading.get("reason") or "")
        if not reason and failing_component:
            reason = str(failing_component.get("reason") or "")
        if not reason and components and isinstance(components[0], dict):
            reason = str(components[0].get("reason") or "")
        test_case = result.get("testCase") or {}
        test_vars = test_case.get("vars") or {}
        rows.append({
            "eval_id": (test_case.get("metadata") or {}).get("eval_id"),
            "title": test_case.get("description") or "Untitled",
            "trace_id": test_vars.get("trace_id"),
            "trace_label": test_vars.get("trace_label"),
            "passed": bool(result.get("success")),
            "score": result.get("score"),
            "explanation": reason,
        })
    return {
        "backend": "promptfoo",
        "total": len(rows),
        "passed": sum(row["passed"] for row in rows),
        "failed": sum(not row["passed"] for row in rows),
        "rows": rows,
    }

Run the Promptfoo gate

Run the generated suite and summarize the results for the current harness.

def run_promptfoo_feedback_eval_gate(eval_suite: list[dict[str, Any]], traces: list[TraceRecord]) -> dict[str, Any]:
    artifacts = write_promptfoo_artifacts(eval_suite, traces)
    command = [
        "npx",
        "--yes",
        f"promptfoo@{PROMPTFOO_VERSION}",
        "eval",
        "--no-cache",
        "--no-table",
        "-c",
        str(artifacts["config"]),
        "-o",
        str(artifacts["output"]),
    ]
    env = os.environ.copy()
    env["PROMPTFOO_PYTHON"] = sys.executable
    env["PROMPTFOO_CONFIG_DIR"] = str(artifacts["dir"] / ".promptfoo")
    env["PROMPTFOO_DISABLE_WAL_MODE"] = "true"
    # check=False: the return code is recorded in the summary below instead of raising here.
    process = subprocess.run(
        command,
        cwd=artifacts["dir"],
        env=env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        check=False,
    )
    if not artifacts["output"].exists():
        raise RuntimeError(f"Promptfoo did not write results. Output:\n{process.stdout[-4000:]}")
    summary = promptfoo_summary(artifacts["output"])
    summary["command"] = command
    summary["returncode"] = process.returncode
    summary["result_path"] = str(artifacts["output"].relative_to(PROJECT_ROOT))
    summary["log_tail"] = process.stdout[-4000:]
    return summary

promptfoo_started = time.perf_counter()
gate_result = run_promptfoo_feedback_eval_gate(eval_suite, traces)
print(f"Promptfoo gate completed in {format_duration(time.perf_counter() - promptfoo_started)}")
display(Markdown(markdown_table(gate_result["rows"], ["title", "trace_label", "passed", "score", "explanation"])))
print({key: gate_result[key] for key in ["backend", "total", "passed", "failed", "result_path"]})
Promptfoo gate completed in 9s

| title | trace_label | passed | score | explanation |
| --- | --- | --- | --- | --- |
| Runway and burn must be translated into near-term financing risk | trace-01 | True | 1 | All assertions passed |
| Revenue quality assessment must prefer finance-controlled ARR and preserve ARR contradictions | trace-02 | True | 1 | All assertions passed |
| Customer concentration must be assessed after parent-account rollups | trace-03 | True | 1 | All assertions passed |
| Enterprise security readiness must distinguish SOC 2 Type I from Type II | trace-04 | True | 1 | All assertions passed |
| Unsupported metrics must be refused rather than inferred | trace-05 | True | 1 | All assertions passed |

{'backend': 'promptfoo', 'total': 5, 'passed': 5, 'failed': 0, 'result_path': 'examples/agents_sdk/agent_improvement_loop_artifacts/promptfoo/promptfoo_results.json'}
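
The notebook only reports the gate summary. If you want the same summary to block a pipeline, a minimal sketch could look like the following. The function name `enforce_gate` and the `min_pass_rate` parameter are assumptions for illustration; the dictionary keys mirror the summary printed above.

```python
def enforce_gate(summary: dict, min_pass_rate: float = 1.0) -> float:
    """Raise if the pass rate in a gate summary falls below the threshold."""
    total = summary["total"]
    if total == 0:
        raise ValueError("The gate summary contains no eval rows.")
    pass_rate = summary["passed"] / total
    if pass_rate < min_pass_rate:
        raise RuntimeError(
            f"Eval gate failed: {summary['passed']}/{total} passed "
            f"(required pass rate {min_pass_rate:.0%})."
        )
    return pass_rate


# Same totals as the run shown above.
pass_rate = enforce_gate({"backend": "promptfoo", "total": 5, "passed": 5, "failed": 0})
print(pass_rate)
```

A threshold below 1.0 can be useful while the generated evals are still being tightened by reviewers.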

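Step 7: Run HALO and write the handoff

HALO, short for Hierarchical Agent Loop Optimization, is a methodology and Python package for improving agent harnesses from execution traces. The HALO repository describes a loop in which you collect traces, analyze recurring harness-level failures, hand the resulting report to a coding agent, and repeat the process after the harness changes.

In this step, the loop turns the accumulated evidence into proposed harness changes. HALO reviews the current harness, the agent traces, human feedback, model feedback, the generated evals, and the Promptfoo results together. It then produces a prioritized set of changes for the next implementation round.

HALO's value here is that it reasons over the loop as a whole. It can combine human judgment with runtime behavior and eval results, then package the outcome as a handoff that Codex can use to implement code changes that improve the harness.

Collect HALO inputs

Build a context object that brings together the current harness, traces, feedback, evals, and gate results.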

from datetime import datetime, timezone

def serialize_agent_config(config: AgentConfig) -> dict[str, Any]:
    return {
        "version": config.version,
        "system_prompt": config.system_prompt,
        "model_settings": asdict(config.model_settings),
        "tool_policy": config.tool_policy,
        "eval_metadata": config.eval_metadata,
    }

def build_halo_context(
    traces: list[TraceRecord],
    human_feedback: list[dict[str, Any]],
    llm_feedback: list[dict[str, Any]],
    eval_suite: list[dict[str, Any]],
    gate_result: dict[str, Any],
    agent_config: AgentConfig,
) -> dict[str, Any]:
    return {
        "traces": [asdict(trace) for trace in traces],
        "human_feedback": human_feedback,
        "llm_feedback": llm_feedback,
        "eval_suite": eval_suite,
        "gate_result": gate_result,
        "agent_config": serialize_agent_config(agent_config),
    }

def synthetic_trace_id(value: str) -> str:
    return hashlib.sha256(f"halo-context-{value}".encode("utf-8")).hexdigest()[:32]

def synthetic_span_id(value: str) -> str:
    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:16]

def synthetic_span(*, trace_id: str, span_id: str, name: str, observation_kind: str, attributes: dict[str, Any]) -> dict[str, Any]:
    # %f yields microseconds; the literal "000" pads the timestamp to nanosecond precision.
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f000Z")
    return {
        "trace_id": trace_id,
        "span_id": span_id,
        "parent_span_id": "",
        "trace_state": "",
        "name": name,
        "kind": "SPAN_KIND_INTERNAL",
        "start_time": now,
        "end_time": now,
        "status": {"code": "STATUS_CODE_OK", "message": ""},
        "resource": {"attributes": {"service.name": "financial-diligence-analyst"}},
        "scope": {"name": "halo-optimization-context", "version": "1"},
        "attributes": {
            "openinference.span.kind": observation_kind,
            "inference.export.schema_version": 1,
            "inference.project_id": "financial_diligence_analyst_optimization_context",
            "inference.observation_kind": observation_kind,
            **attributes,
        },
    }

def halo_input_summary(context: dict[str, Any]) -> str:
    rows = [
        ("Current harness config", 1, "global config span", "system prompt, model settings, tool policy, eval metadata"),
        ("SDK execution traces", len(context["traces"]), "original runtime traces", "agent steps, tool calls, outputs"),
        ("Human feedback", len(context["human_feedback"]), "appended to the source trace", "reviewer summary, required observations, prohibited claims"),
        ("LLM feedback", len(context["llm_feedback"]), "appended to the source trace", "model-generated observations"),
        ("Generated eval definitions", len(context["eval_suite"]), "appended to the source trace", "expected behavior, rubric, pass/fail examples"),
        ("Promptfoo row results", len(context["gate_result"]["rows"]), "appended to the source trace", "pass/fail outcome and explanation"),
        ("Promptfoo gate summary", 1, "global summary span", "suite totals across all evals"),
    ]
    lines = [
        "### HALO input summary",
        "",
        "| Input signal | Count | Where it lives | What is included |",
        "| --- | ---: | --- | --- |",
    ]
    lines.extend(f"| {name} | {count} | {location} | {included} |" for name, count, location, included in rows)
    return "\n".join(lines)
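
The synthetic ID helpers above truncate a SHA-256 hex digest to 32 and 16 characters. Those lengths match the OpenTelemetry convention of 16-byte trace IDs and 8-byte span IDs written as hex, and hashing a fixed label makes the IDs stable across reruns. The sketch below illustrates both properties; `stable_hex_id` is a hypothetical helper name, and whether HALO validates these lengths is an assumption.

```python
import hashlib


def stable_hex_id(value: str, hex_chars: int) -> str:
    """Derive a deterministic hex identifier of the requested length from a label."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:hex_chars]


trace_id = stable_hex_id("halo-context-current-harness-config", 32)
span_id = stable_hex_id("current-harness-config", 16)

# Same label in, same ID out, so reruns do not create new synthetic traces.
same_again = stable_hex_id("halo-context-current-harness-config", 32)
print(len(trace_id), len(span_id), trace_id == same_again)
```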

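Attach feedback, generated evals, and eval results to traces

Write the merged trace file that HALO will inspect. Human feedback, LLM feedback, generated eval definitions, and per-row Promptfoo results are all attached to the corresponding runtime trace. The overall gate summary stays global because it describes the whole suite.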

def write_halo_optimization_context(context: dict[str, Any]) -> Path:
    context_path = ARTIFACT_DIR / "halo_optimization_context.jsonl"
    # Start from the exported runtime spans, then append one synthetic span per signal.
    lines = HALO_TRACE_PATH.read_text(encoding="utf-8").splitlines() if HALO_TRACE_PATH.exists() else []
    lines.append(json.dumps(synthetic_span(
        trace_id=synthetic_trace_id("current-harness-config"),
        span_id=synthetic_span_id("current-harness-config"),
        name="harness.config",
        observation_kind="HARNESS_CONFIG",
        attributes={
            "harness.version": context["agent_config"]["version"],
            "harness.system_prompt": context["agent_config"]["system_prompt"],
            "harness.model_settings": json.dumps(context["agent_config"]["model_settings"]),
            "harness.tool_policy": json.dumps(context["agent_config"]["tool_policy"]),
            "harness.eval_metadata": json.dumps(context["agent_config"]["eval_metadata"]),
            "optimizer.signal_source": "harness_config",
        },
    )))
    for index, item in enumerate(context["human_feedback"]):
        lines.append(json.dumps(synthetic_span(
            trace_id=item["trace_id"],
            span_id=synthetic_span_id(f"human-feedback-{index}"),
            name="human_feedback.comment",
            observation_kind="HUMAN_FEEDBACK",
            attributes={
                "feedback.id": item["feedback_id"],
                "feedback.trace_id": item["trace_id"],
                "feedback.trace_label": item["trace_label"],
                "feedback.question": item["question"],
                "feedback.summary": item["summary"],
                "feedback.required_observations": json.dumps(item["required_observations"]),
                "feedback.prohibited_claims": json.dumps(item["prohibited_claims"]),
                "optimizer.signal_source": "human_feedback",
            },
        )))
    for index, item in enumerate(context["llm_feedback"]):
        lines.append(json.dumps(synthetic_span(
            trace_id=item["trace_id"],
            span_id=synthetic_span_id(f"llm-insight-{index}"),
            name="llm_feedback.insight",
            observation_kind="LLM_FEEDBACK",
            attributes={
                "llm_feedback.id": item["insight_id"],
                "llm_feedback.trace_id": item["trace_id"],
                "llm_feedback.trace_label": item["trace_label"],
                "llm_feedback.question": item["question"],
                "llm_feedback.observations": json.dumps(item["observations"]),
                "optimizer.signal_source": "llm_feedback",
            },
        )))
    for index, item in enumerate(context["eval_suite"]):
        lines.append(json.dumps(synthetic_span(
            trace_id=item["source_trace_id"],
            span_id=synthetic_span_id(f"generated-eval-{index}"),
            name="generated_eval.definition",
            observation_kind="EVAL",
            attributes={
                "eval.id": item["eval_id"],
                "eval.trace_id": item["source_trace_id"],
                "eval.trace_label": item["source_trace_label"],
                "eval.title": item["title"],
                "eval.method": item["scoring_method"],
                "eval.expected_behavior": item["expected_behavior"],
                "eval.pass_example": item["suggested_pass_example"],
                "eval.fail_example": item["suggested_fail_example"],
                "optimizer.signal_source": "generated_eval",
            },
        )))
    lines.append(json.dumps(synthetic_span(
        trace_id=synthetic_trace_id("eval-gate-summary"),
        span_id=synthetic_span_id("eval-gate-summary"),
        name="eval_gate.summary",
        observation_kind="EVAL_RESULT",
        attributes={
            "eval_gate.total": context["gate_result"]["total"],
            "eval_gate.passed": context["gate_result"]["passed"],
            "eval_gate.failed": context["gate_result"]["failed"],
            "optimizer.signal_source": "eval_gate",
        },
    )))
    for index, item in enumerate(context["gate_result"]["rows"]):
        lines.append(json.dumps(synthetic_span(
            trace_id=item["trace_id"],
            span_id=synthetic_span_id(f"eval-gate-row-{index}"),
            name="eval_gate.result",
            observation_kind="EVAL_RESULT",
            attributes={
                "eval.id": item["eval_id"],
                "eval.title": item["title"],
                "eval.trace_id": item["trace_id"],
                "eval.trace_label": item["trace_label"],
                "eval.passed": item["passed"],
                "eval.explanation": item["explanation"],
                "optimizer.signal_source": "eval_gate",
            },
        )))
    context_path.write_text("\n".join(lines).rstrip() + "\n", encoding="utf-8")
    return context_path
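
Before handing the merged file to HALO, it can help to confirm that each signal landed on the trace you expect. The following is a minimal sketch that groups JSONL spans by `trace_id` and lists their `optimizer.signal_source` values. The helper name `signals_by_trace`, the `"runtime"` fallback label, and the sample spans are assumptions; real SDK runtime spans may carry a different attribute layout.

```python
import json
from collections import defaultdict


def signals_by_trace(jsonl_text: str) -> dict[str, list[str]]:
    """Map each trace_id to the signal sources of the spans attached to it."""
    grouped: dict[str, list[str]] = defaultdict(list)
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue
        span = json.loads(line)
        attributes = span.get("attributes") or {}
        # Spans without the marker are treated as original runtime spans (an assumption).
        grouped[span["trace_id"]].append(attributes.get("optimizer.signal_source", "runtime"))
    return dict(grouped)


# Tiny hypothetical sample: one runtime trace with two attached signals, plus one global span.
sample = "\n".join(json.dumps(span) for span in [
    {"trace_id": "t1", "attributes": {}},
    {"trace_id": "t1", "attributes": {"optimizer.signal_source": "human_feedback"}},
    {"trace_id": "t1", "attributes": {"optimizer.signal_source": "eval_gate"}},
    {"trace_id": "cfg", "attributes": {"optimizer.signal_source": "harness_config"}},
])
grouped = signals_by_trace(sample)
print(len(grouped), grouped)
```

Counting the keys of the result also shows how many distinct traces the file contains, including the synthetic global ones.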

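Define the HALO output prompt

This prompt tells HALO what kind of report to produce, including the sections Codex should receive in the final handoff file. You can customize it to match your company's workflow, review process, or use case.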

def render_halo_prompt() -> str:
    return """
Analyze the financial diligence analyst optimization context as the central source of truth.
The JSONL contains the current harness configuration, agent execution traces, human feedback, LLM insights, generated eval definitions, and eval-gate results.
Treat human feedback as first-class evidence.
Before recommending a change, compare the evidence against the current harness config and distinguish:
- a requirement that is missing from the harness,
- a requirement already present but not reliably followed in execution, and
- an implementation or observability defect.

Write an implementation-first Codex handoff in this exact top-level order:
1. `## Executive summary`
2. `## Top 3 changes to implement first`
3. `## Ranked recommendation table`
4. `## Supporting diagnosis and evidence`
5. `## Detailed recommendations`
6. `## Insights by feedback source`
7. `## Machine-readable summary`

Section requirements:
- `## Executive summary`: briefly state what the current harness already does well, what the highest-value remaining gaps are, and whether the current eval gate passed.
- `## Top 3 changes to implement first`: list the three most valuable implementation moves with concise rationale.
- `## Ranked recommendation table`: include rank, recommendation, impact, confidence, implementation effort, evidence, and validation.
- `## Supporting diagnosis and evidence`: include recurring harness-level failure modes, classify each against the current harness as missing requirement vs already-present-but-not-reliably-followed vs implementation/observability defect, and state the evidence source for each.
- `## Detailed recommendations`: use these exact subsection headings in this order and do not use the word "owner" in them:
- `### Behavior contract`
- `#### Prompt`
- `#### Skills`
- `### Runtime implementation`
- `#### Tools`
- `#### Control flow`
- `#### Routing`
- `### Output contract`
- `#### Artifact schema`
- `### Observability and evals`
- `#### Observability`
- `#### Evals`
- `## Insights by feedback source`: summarize what came from traces, human feedback, LLM feedback, generated evals, eval-gate results, and harness config.
- `## Machine-readable summary`: include one fenced JSON block with `top_priorities`.

Do not add extra top-level sections outside that order.
""".strip()

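Run HALO and format the report

HALO receives the five SDK execution traces plus two synthetic global traces: one records the current harness configuration and the other records the Promptfoo gate summary. That is why its trace count is higher than the five agent runs created earlier.

Generate the full optimization report, save the handoff artifact, and display the highest-priority recommendations in the notebook.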

async def run_halo_optimization(context_path: Path) -> str:
    from agents import set_trace_processors
    from engine.agents.agent_config import AgentConfig as HaloAgentConfig
    from engine.engine_config import EngineConfig
    from engine.main import stream_engine_async
    from engine.sandbox.sandbox import Sandbox
    from engine.model_config import ModelConfig
    from engine.models.engine_output import AgentOutputItem, AgentTextDelta
    from engine.models.messages import AgentMessage

    # HALO's current CLI wrapper sets compaction temperature to 0.0, which is not
    # accepted by GPT-5-class models. Use the Python API so the compactor uses the
    # model default-compatible temperature while preserving the requested model.
    agent = HaloAgentConfig(
        name="root",
        model=ModelConfig(name=HALO_MODEL),
        maximum_turns=20,
    )
    config = EngineConfig(
        root_agent=agent,
        subagent=agent.model_copy(update={"name": "sub"}),
        synthesis_model=ModelConfig(name=HALO_MODEL),
        compaction_model=ModelConfig(name=HALO_MODEL, temperature=1.0),
        maximum_depth=1,
        maximum_parallel_subagents=2,
    )

    # The notebook already exports the SDK traces locally; HALO does not need
    # hosted trace ingestion for this diagnosis pass.
    set_trace_processors([])

    deltas: list[str] = []
    final_items: list[str] = []
    messages = [AgentMessage(role="user", content=render_halo_prompt())]

    async def report_progress(done: asyncio.Event, interval_seconds: int = 30) -> None:
        started = time.perf_counter()
        print("HALO optimization started. This is usually the longest cell in the notebook.")
        while not done.is_set():
            try:
                await asyncio.wait_for(done.wait(), timeout=interval_seconds)
            # asyncio.TimeoutError is only an alias of the built-in TimeoutError from
            # Python 3.11 onward, so name it explicitly for older interpreters.
            except asyncio.TimeoutError:
                print(f"HALO still running... {format_duration(time.perf_counter() - started)} elapsed")

    # This pass only needs HALO's trace-analysis tools. Skip the optional
    # `run_code` sandbox so readers do not need a separate Deno/Pyodide setup
    # just to generate the optimization report.
    original_sandbox_get = Sandbox.__dict__["get"]
    Sandbox.get = classmethod(lambda cls: None)
    halo_started = time.perf_counter()
    progress_done = asyncio.Event()
    progress_task = asyncio.create_task(report_progress(progress_done))
    try:
        async for event in stream_engine_async(messages, config, context_path):
            if isinstance(event, AgentTextDelta):
                deltas.append(event.text_delta)
            elif isinstance(event, AgentOutputItem) and event.final:
                final_items.append(str(event.item))
    finally:
        # Stop the progress reporter and restore the original sandbox factory.
        progress_done.set()
        await progress_task
        Sandbox.get = original_sandbox_get

    print(f"HALO optimization completed in {format_duration(time.perf_counter() - halo_started)}")
    report = "".join(deltas).strip() or "\n\n".join(final_items).strip()
    if not report:
        raise RuntimeError("HALO completed without producing a report.")
    return report

def clean_halo_handoff(report: str) -> str:
    """Keep only the final Codex-facing handoff sections from HALO output."""
    normalized = re.sub(r"(?<!\n)(## Executive summary)", r"\n\n\1", report).strip()
    start = normalized.rfind("## Executive summary")
    if start == -1:
        raise ValueError("HALO output did not include the expected executive summary section.")

    handoff = normalized[start:].strip()
    required_headings = [
        "## Executive summary",
        "## Top 3 changes to implement first",
        "## Ranked recommendation table",
        "## Supporting diagnosis and evidence",
        "## Detailed recommendations",
        "## Insights by feedback source",
        "## Machine-readable summary",
    ]
    missing = [heading for heading in required_headings if heading not in handoff]
    if missing:
        raise ValueError(f"HALO handoff is missing required sections: {missing}")
    return handoff

def write_halo_handoff(report: str, path: str | Path) -> Path:
    target = Path(path)
    if not target.is_absolute():
        target = PROJECT_ROOT / target
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(report.rstrip() + "\n", encoding="utf-8")
    return target

halo_context = build_halo_context(traces, human_feedback, llm_feedback, eval_suite, gate_result, agent_config)
display(Markdown(halo_input_summary(halo_context)))
halo_context_path = write_halo_optimization_context(halo_context)
halo_report = await run_halo_optimization(halo_context_path)
clean_handoff = clean_halo_handoff(halo_report)

handoff_path = write_halo_handoff(clean_handoff, ARTIFACT_DIR / "codex_handoff.md")

def extract_named_section(report: str, heading: str) -> str:
    if heading not in report:
        return ""
    start = report.index(heading)
    remainder = report[start + len(heading):]
    # The section ends at the next top-level heading, or at the end of the report.
    next_section = re.search(r"\n## ", remainder)
    return report[start:] if next_section is None else report[start:start + len(heading) + next_section.start()]

def render_notebook_halo_summary(report: str) -> str:
    sections = [
        extract_named_section(report, "## Top 3 changes to implement first"),
        extract_named_section(report, "## Insights by feedback source"),
    ]
    rendered = "\n\n".join(section.strip() for section in sections if section.strip())
    return rendered or report

print("Gate result passed into optimization context:", "gate_result" in halo_context)
print("Wrote:")
print("-", halo_context_path.relative_to(PROJECT_ROOT))
print("-", handoff_path.relative_to(PROJECT_ROOT))

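HALO input summary

| Input signal | Count | Where it lives | What is included |
| --- | ---: | --- | --- |
| Current harness config | 1 | global config span | system prompt, model settings, tool policy, eval metadata |
| SDK execution traces | 5 | original runtime traces | agent steps, tool calls, outputs |
| Human feedback | 5 | appended to the source trace | reviewer summary, required observations, prohibited claims |
| LLM feedback | 5 | appended to the source trace | model-generated observations |
| Generated eval definitions | 5 | appended to the source trace | expected behavior, rubric, pass/fail examples |
| Promptfoo row results | 5 | appended to the source trace | pass/fail outcome and explanation |
| Promptfoo gate summary | 1 | global summary span | suite totals across all evals |

HALO optimization started. This is usually the longest cell in the notebook.
HALO still running... 30s elapsed
HALO still running... 1m 00s elapsed
HALO still running... 1m 30s elapsed
HALO still running... 2m 00s elapsed
HALO still running... 2m 30s elapsed
HALO still running... 3m 00s elapsed
HALO still running... 3m 30s elapsed
HALO still running... 4m 00s elapsed
HALO still running... 4m 30s elapsed
HALO still running... 5m 00s elapsed
HALO still running... 5m 30s elapsed
HALO still running... 6m 00s elapsed
HALO still running... 6m 30s elapsed
HALO still running... 7m 00s elapsed
HALO optimization completed in 7m 15s
Gate result passed into optimization context: True
Wrote: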
- examples/agents_sdk/agent_improvement_loop_artifacts/halo_optimization_context.jsonl
- examples/agents_sdk/agent_improvement_loop_artifacts/codex_handoff.md

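Step 8: Hand the full report to Codex

HALO handles diagnosis and prioritization. A coding agent or a human still needs to make the changes to the harness.

Below is a snapshot of the full Codex-ready report: the top three recommendations and a condensed summary by feedback source. The full codex_handoff.md file also contains the ranked changes, supporting evidence, and validation guidance for implementation.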

handoff_file = ARTIFACT_DIR / "codex_handoff.md"

if handoff_file.exists():
    print(f"Full Codex handoff written to: {handoff_file.relative_to(PROJECT_ROOT)}")
    print("Snapshot below; open the generated codex_handoff.md file for the full handoff.")
    display(Markdown(render_notebook_halo_summary(handoff_file.read_text(encoding="utf-8"))))
else:
    print(f"Codex handoff file not found yet: {handoff_file.relative_to(PROJECT_ROOT)}")
    print("Run the HALO optimization cell above to generate it.")
Full Codex handoff written to: examples/agents_sdk/agent_improvement_loop_artifacts/codex_handoff.md
Snapshot below; open the generated codex_handoff.md file for the full handoff.

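Top 3 changes to implement first

  1. Add a deterministic diligence fact ledger and domain checklist layer. Encode canonical facts and source-of-truth rules for ARR, runway/burn, parent-account concentration, unsupported metrics, and SOC 2 status so that the agent cannot rely on generic citation instructions alone.

  2. Upgrade the validator so it audits the actual output artifacts, not just claimed evidence coverage. Current validation can pass while artifact-level citation or claim-audit issues still need follow-up fixes. Parse the generated markdown/JSON/CSV artifacts, extract key claims, verify that they are source-supported, and fail when unsupported or unaudited claims are present.

  3. Persist the five generated evals into a checked-in regression suite. All of the generated evals passed, but they should become durable regression tests so that future prompt/runtime changes do not regress on these specific human-feedback issues.

Insights by feedback source

| Feedback source | Key insight |
| --- | --- |
| Traces | The agent generally follows the artifact-generation workflow and validation loop, but execution is generic and sometimes monolithic. Some fixes happened after the validator had passed, which suggests validation is not strict enough. The parent-concentration trace shows a deterministic calculation pattern worth generalizing. |
| Human feedback | Human feedback is the strongest evidence of domain gaps: runway must be 11 months with financing pressure; ARR must use the finance-controlled source of truth; concentration must roll up to parent accounts; SOC 2 Type I and Type II must not be conflated; and unsupported official NRR and CAC payback must be refused. |
| LLM feedback | LLM insights reinforce the human-feedback themes: ARR headline figures need qualifiers, unsupported metrics should not be highlighted, retention and pipeline claims need source caveats, and SOC 2 Type II completion must not be overstated. |
| Generated evals | Five targeted evals were generated from these feedback themes: runway/burn, ARR source of truth, customer-concentration parent rollup, SOC 2 precision, and refusal of unsupported metrics. These evals encode the right regression coverage and should be checked in. |
| Eval-gate results | The current eval gate passed: 5 total, 5 passed, 0 failed. This shows that the latest generated eval suite is satisfied, but the suite should be persisted and extended to cover the validator, artifact parsing, and calculation correctness. |
| Harness config | The harness already has strong generic evidence, citation, artifact, and validation requirements. Its main weakness is the lack of explicit financial-diligence invariants and of deterministic runtime checks for the specific errors that the feedback exposed. |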
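
The HALO prompt also asks for a `## Machine-readable summary` section containing one fenced JSON block with `top_priorities`. A downstream automation could read that block instead of parsing prose. The sketch below is one way to do so under stated assumptions: the helper name `extract_top_priorities` is hypothetical, and the sample handoff is invented because the notebook does not show the actual shape of the `top_priorities` entries.

```python
import json
import re

FENCE = "`" * 3  # built at runtime so this example does not nest code fences


def extract_top_priorities(handoff: str) -> list:
    """Return `top_priorities` from the first fenced block after the summary heading."""
    _, _, tail = handoff.partition("## Machine-readable summary")
    match = re.search(FENCE + r"(?:json)?\s*\n(.*?)\n" + FENCE, tail, re.DOTALL)
    if match is None:
        raise ValueError("No fenced JSON block found under the machine-readable summary.")
    return json.loads(match.group(1))["top_priorities"]


# Hypothetical handoff excerpt; real entries may be objects rather than strings.
sample_handoff = "\n".join([
    "## Insights by feedback source",
    "(table omitted)",
    "## Machine-readable summary",
    "",
    FENCE + "json",
    '{"top_priorities": ["fact ledger", "artifact validator", "regression suite"]}',
    FENCE,
])
priorities = extract_top_priorities(sample_handoff)
print(priorities)
```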

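Step 9: Close the loop

Now that the full workflow is in place, we can return to the optimization flywheel from the start of the notebook. The same architecture supports two operating modes.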

Agent improvement loop flywheel

Human review gates in the loop

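It can run as a closed loop in which new traces, human and model feedback, generated Promptfoo evals, HALO diagnosis, Codex implementation, validation, and deployment all feed the next cycle. In this mode, the handoff artifact can be written to shared storage, and a Codex automation with a heartbeat can keep checking for new handoffs, wake up when one appears, and trigger the next implementation round automatically.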
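
As a rough illustration of the heartbeat check, the sketch below detects a new or changed handoff by comparing content digests. It does not call Codex or any scheduler; the function name `new_handoff_digest` and the idea of tracking the last seen digest are assumptions about how such an automation might be wired.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Optional


def new_handoff_digest(path: Path, last_digest: Optional[str]) -> Optional[str]:
    """Return the file's SHA-256 digest if it differs from last_digest, else None."""
    if not path.exists():
        return None
    digest = hashlib.sha256(path.read_bytes()).hexdigest()
    return None if digest == last_digest else digest


with tempfile.TemporaryDirectory() as tmp:
    handoff = Path(tmp) / "codex_handoff.md"
    first = new_handoff_digest(handoff, None)     # file missing: nothing to do
    handoff.write_text("## Executive summary\n", encoding="utf-8")
    second = new_handoff_digest(handoff, None)    # new handoff detected
    third = new_handoff_digest(handoff, second)   # unchanged: no wake-up
```

Comparing digests rather than modification times avoids re-triggering when a rerun writes an identical report.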

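Developers can also add human gates wherever they want them, including trace review, eval refinement, pull request approval, merge, and deployment.

The design choice is how much human involvement to keep after humans have provided feedback. Human judgment can steer a loop that agents execute, or it can remain an approval gate throughout. In either version, human feedback stays at the center because it determines what the system learns and what changes next.

Conclusion

An agent improvement loop offers a path to continuous improvement rather than reducing the problem to prompt tuning. The full loop matters: traces capture behavior, human feedback supplies judgment, evals preserve what the system should do, HALO turns that evidence into ranked harness changes, and Codex can implement the next round of improvements.

This area is still evolving, and some individual components will likely change. The more durable part is the broader idea of "loop engineering": when feedback, tests, and implementation are connected in one closed loop, agents can keep improving from real behavior.

Next steps

  • Choose a model for each stage of the loop by editing AGENT_MODEL, ANALYSIS_MODEL, EVAL_GENERATION_MODEL, JUDGE_MODEL, and HALO_MODEL near the top of the notebook.
  • Create your own traces to test the agent.
  • Decide how much of the final path keeps human review and how much is automated: you can stop at developer-reviewed PRs, or connect the handoff to a system that can open, merge, and deploy changes automatically.
  • Give the generated codex_handoff.md file under ARTIFACT_DIR to Codex, review the harness changes it proposes, and rerun the same evals against the updated harness.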
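
When you rerun the same evals against an updated harness, comparing the two gate summaries row by row shows whether a change broke something that used to pass. This is a minimal sketch assuming each row carries the `eval_id` and `passed` fields produced by `promptfoo_summary`; the helper name `find_regressions` and the sample rows are hypothetical.

```python
def find_regressions(before_rows: list, after_rows: list) -> list:
    """List eval IDs that passed before the change but fail or are missing after it."""
    after = {row["eval_id"]: row["passed"] for row in after_rows}
    return [
        row["eval_id"]
        for row in before_rows
        if row["passed"] and not after.get(row["eval_id"], False)
    ]


# Hypothetical rows from two gate runs.
before = [
    {"eval_id": "eval-01", "passed": True},
    {"eval_id": "eval-02", "passed": True},
    {"eval_id": "eval-03", "passed": False},
]
after = [
    {"eval_id": "eval-01", "passed": True},
    {"eval_id": "eval-02", "passed": False},
    {"eval_id": "eval-03", "passed": True},
]
regressed = find_regressions(before, after)
print(regressed)
```

Treating a missing eval as a regression is a deliberate choice here, so that dropping a test cannot silently hide a failure.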