工作流开发指南
概述
工作流 (Workflow) 是智航云 AI 中台的多步骤自动化引擎,基于 DAG (有向无环图) 模型。你可以将 AI 生成、工具调用、HTTP 请求、条件判断等节点组合为复杂的自动化流程。
工作流结构
一个工作流由三部分组成:
json
{
"name": "每日巡检流程",
"trigger": { "type": "cron", "schedule": "0 9 * * *" },
"nodes": [...],
"edges": [...]
}触发方式
| 触发类型 | 说明 | 示例 |
|---|---|---|
manual | 手动触发(通过 API 调用) | { "type": "manual" } |
cron | 定时触发(cron 表达式) | { "type": "cron", "schedule": "0 9 * * *" } |
event | 事件触发 | { "type": "event", "event": "tool.executed" } |
Cron 表达式格式:分 时 日 月 星期
| 表达式 | 说明 |
|---|---|
0 9 * * * | 每天早上 9 点 |
0 */2 * * * | 每 2 小时 |
0 9 * * 1-5 | 工作日早上 9 点 |
0 0 1 * * | 每月 1 号零点 |
节点类型
| 类型 | 说明 | 用途 |
|---|---|---|
ai-generate | AI 生成 | 使用 AI 模型生成文本或分析数据 |
tool-call | 工具调用 | 调用已注册的工具 |
http-request | HTTP 请求 | 发送自定义 HTTP 请求 |
condition | 条件判断 | 根据表达式决定分支 |
email-send | 发送邮件 | 发送通知邮件 |
边 (Edge)
边定义节点之间的执行顺序,支持条件分支:
json
{
"edges": [
{ "from": "step1", "to": "step2" },
{ "from": "step2", "to": "step3_yes", "condition": "true" },
{ "from": "step2", "to": "step3_no", "condition": "false" }
]
}创建工作流
bash
curl -X POST https://api.invoratec.cn/v1/workflows \
-H "X-API-Key: claw_xxxx" \
-H "Content-Type: application/json" \
-d '{
"name": "设备异常自动处理",
"description": "检测到设备异常时自动分析并通知",
"trigger": { "type": "manual" },
"nodes": [
{
"id": "fetch_data",
"type": "tool-call",
"config": {
"tool": "iot_get_all_devices",
"input": { "status": "alarm" }
}
},
{
"id": "analyze",
"type": "ai-generate",
"config": {
"prompt": "分析以下设备报警数据,判断严重程度并给出处理建议:\n{{fetch_data.result}}"
}
},
{
"id": "check_severity",
"type": "condition",
"config": {
"expression": "{{analyze.result}}.includes(\"严重\")"
}
},
{
"id": "send_urgent",
"type": "email-send",
"config": {
"to": "emergency@example.com",
"subject": "【紧急】设备严重报警",
"body": "{{analyze.result}}"
}
},
{
"id": "send_normal",
"type": "email-send",
"config": {
"to": "ops@example.com",
"subject": "设备报警通知",
"body": "{{analyze.result}}"
}
}
],
"edges": [
{ "from": "fetch_data", "to": "analyze" },
{ "from": "analyze", "to": "check_severity" },
{ "from": "check_severity", "to": "send_urgent", "condition": "true" },
{ "from": "check_severity", "to": "send_normal", "condition": "false" }
]
}'python
wf = claw.workflow.create(
name="设备异常自动处理",
description="检测到设备异常时自动分析并通知",
trigger={"type": "manual"},
nodes=[
{
"id": "fetch_data",
"type": "tool-call",
"config": {"tool": "iot_get_all_devices", "input": {"status": "alarm"}}
},
{
"id": "analyze",
"type": "ai-generate",
"config": {"prompt": "分析设备报警数据:{{fetch_data.result}}"}
},
{
"id": "notify",
"type": "email-send",
"config": {"to": "ops@example.com", "subject": "报警通知", "body": "{{analyze.result}}"}
}
],
edges=[
{"from": "fetch_data", "to": "analyze"},
{"from": "analyze", "to": "notify"}
]
)javascript
const wf = await claw.workflows.create({
name: '设备异常自动处理',
trigger: { type: 'manual' },
nodes: [
{
id: 'fetch_data',
type: 'tool-call',
config: { tool: 'iot_get_all_devices', input: { status: 'alarm' } }
},
{
id: 'analyze',
type: 'ai-generate',
config: { prompt: '分析设备报警数据:{{fetch_data.result}}' }
},
{
id: 'notify',
type: 'email-send',
config: { to: 'ops@example.com', subject: '报警通知', body: '{{analyze.result}}' }
}
],
edges: [
{ from: 'fetch_data', to: 'analyze' },
{ from: 'analyze', to: 'notify' }
]
})变量引用
节点之间通过 {{nodeId.result}} 语法引用上游节点的输出:
json
{
"id": "step2",
"type": "ai-generate",
"config": {
"prompt": "分析数据:{{step1.result}}"
}
}也可以引用触发数据:
json
{
"config": {
"prompt": "处理事件:{{trigger.data}}"
}
}各节点类型详解
ai-generate -- AI 生成
json
{
"id": "analyze",
"type": "ai-generate",
"config": {
"prompt": "请分析以下数据并给出建议:{{data.result}}",
"model": "claude-sonnet-4-20250514",
"systemPrompt": "你是数据分析专家"
}
}tool-call -- 工具调用
json
{
"id": "get_data",
"type": "tool-call",
"config": {
"tool": "iot_get_energy",
"input": {
"period": "month",
"building": "A栋"
}
}
}http-request -- HTTP 请求
json
{
"id": "call_api",
"type": "http-request",
"config": {
"method": "POST",
"url": "https://api.example.com/notify",
"headers": { "Authorization": "Bearer token" },
"body": { "message": "{{analyze.result}}" }
}
}condition -- 条件判断
json
{
"id": "check",
"type": "condition",
"config": {
"expression": "{{data.result.temperature}} > 80"
}
}条件节点的出边需要标注 condition 字段:"true" 或 "false"。
email-send -- 发送邮件
json
{
"id": "notify",
"type": "email-send",
"config": {
"to": "admin@example.com",
"cc": "manager@example.com",
"subject": "自动化通知",
"body": "处理结果:{{analyze.result}}"
}
}执行工作流
bash
curl -X POST https://api.invoratec.cn/v1/workflows/WORKFLOW_ID/run \
-H "X-API-Key: claw_xxxx" \
-H "Content-Type: application/json" \
-d '{
"data": { "override_building": "B栋" }
}'python
run = claw.workflow.run(wf["id"], data={"override_building": "B栋"})
print(run["runId"], run["status"]) # pendingjavascript
const run = await claw.workflows.run(wf.id, {
data: { override_building: 'B栋' }
})
console.log(run.runId, run.status)执行后返回 runId,状态为 pending。使用执行历史 API 查看执行结果。
查看执行历史
bash
# 列出执行历史
curl "https://api.invoratec.cn/v1/workflows/WORKFLOW_ID/runs?limit=10" \
-H "X-API-Key: claw_xxxx"
# 查看具体执行
curl https://api.invoratec.cn/v1/workflows/WORKFLOW_ID/runs/RUN_ID \
-H "X-API-Key: claw_xxxx"python
# 列出执行历史
runs = claw.workflow.runs(wf["id"], limit=10)
# 查看具体执行
detail = claw.workflow.get_run(wf["id"], run["runId"])
for step in detail["stepResults"]:
print(f"[{step['nodeId']}] {step['status']}")javascript
const { runs } = await claw.workflows.listRuns(wf.id, { limit: 10 })
const detail = await claw.workflows.getRun(wf.id, runs[0]._id)执行记录包含每个节点的执行状态和结果:
json
{
"status": "completed",
"stepResults": [
{ "nodeId": "fetch_data", "status": "success", "result": {...} },
{ "nodeId": "analyze", "status": "success", "result": "分析结果..." },
{ "nodeId": "notify", "status": "success", "result": { "sent": true } }
]
}实战示例:HR 月度报告自动化
python
claw.workflow.create(
name="HR月度报告",
description="每月1号自动生成并发送HR报告",
trigger={"type": "cron", "schedule": "0 9 1 * *"},
nodes=[
{
"id": "attendance",
"type": "tool-call",
"config": {"tool": "hr_get_attendance_summary", "input": {"period": "last_month"}}
},
{
"id": "recruitment",
"type": "tool-call",
"config": {"tool": "hr_get_recruitment_stats", "input": {"period": "last_month"}}
},
{
"id": "turnover",
"type": "tool-call",
"config": {"tool": "hr_get_turnover_data", "input": {"period": "last_month"}}
},
{
"id": "report",
"type": "ai-generate",
"config": {
"prompt": "根据以下数据生成月度HR报告:\n\n考勤数据:{{attendance.result}}\n招聘数据:{{recruitment.result}}\n离职数据:{{turnover.result}}\n\n请生成专业的Markdown格式报告,包含数据分析和改进建议。"
}
},
{
"id": "send",
"type": "email-send",
"config": {
"to": "hr-director@example.com",
"cc": "ceo@example.com",
"subject": "{{trigger.month}} 月度HR报告",
"body": "{{report.result}}"
}
}
],
edges=[
{"from": "attendance", "to": "report"},
{"from": "recruitment", "to": "report"},
{"from": "turnover", "to": "report"},
{"from": "report", "to": "send"}
]
)DAG 引擎会自动并行执行 attendance、recruitment、turnover 三个节点,待全部完成后再执行 report 节点。
下一步
- 事件与 Webhook -- 使用事件触发工作流
- 最佳实践 -- 工作流设计建议
- API 参考: 工作流 -- 完整 API 端点文档
