Skip to content

工作流开发指南

概述

工作流 (Workflow) 是智航云 AI 中台的多步骤自动化引擎,基于 DAG (有向无环图) 模型。你可以将 AI 生成、工具调用、HTTP 请求、条件判断等节点组合为复杂的自动化流程。

工作流结构

一个工作流由三部分组成:

json
{
  "name": "每日巡检流程",
  "trigger": { "type": "cron", "schedule": "0 9 * * *" },
  "nodes": [...],
  "edges": [...]
}

触发方式

触发类型说明示例
manual手动触发(通过 API 调用){ "type": "manual" }
cron定时触发(cron 表达式){ "type": "cron", "schedule": "0 9 * * *" }
event事件触发{ "type": "event", "event": "tool.executed" }

Cron 表达式格式:分 时 日 月 星期

表达式说明
0 9 * * *每天早上 9 点
0 */2 * * *每 2 小时
0 9 * * 1-5工作日早上 9 点
0 0 1 * *每月 1 号零点

节点类型

类型说明用途
ai-generateAI 生成使用 AI 模型生成文本或分析数据
tool-call工具调用调用已注册的工具
http-requestHTTP 请求发送自定义 HTTP 请求
condition条件判断根据表达式决定分支
email-send发送邮件发送通知邮件

边 (Edge)

边定义节点之间的执行顺序,支持条件分支:

json
{
  "edges": [
    { "from": "step1", "to": "step2" },
    { "from": "step2", "to": "step3_yes", "condition": "true" },
    { "from": "step2", "to": "step3_no", "condition": "false" }
  ]
}

创建工作流

bash
curl -X POST https://api.invoratec.cn/v1/workflows \
  -H "X-API-Key: claw_xxxx" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "设备异常自动处理",
    "description": "检测到设备异常时自动分析并通知",
    "trigger": { "type": "manual" },
    "nodes": [
      {
        "id": "fetch_data",
        "type": "tool-call",
        "config": {
          "tool": "iot_get_all_devices",
          "input": { "status": "alarm" }
        }
      },
      {
        "id": "analyze",
        "type": "ai-generate",
        "config": {
          "prompt": "分析以下设备报警数据,判断严重程度并给出处理建议:\n{{fetch_data.result}}"
        }
      },
      {
        "id": "check_severity",
        "type": "condition",
        "config": {
          "expression": "{{analyze.result}}.includes(\"严重\")"
        }
      },
      {
        "id": "send_urgent",
        "type": "email-send",
        "config": {
          "to": "emergency@example.com",
          "subject": "【紧急】设备严重报警",
          "body": "{{analyze.result}}"
        }
      },
      {
        "id": "send_normal",
        "type": "email-send",
        "config": {
          "to": "ops@example.com",
          "subject": "设备报警通知",
          "body": "{{analyze.result}}"
        }
      }
    ],
    "edges": [
      { "from": "fetch_data", "to": "analyze" },
      { "from": "analyze", "to": "check_severity" },
      { "from": "check_severity", "to": "send_urgent", "condition": "true" },
      { "from": "check_severity", "to": "send_normal", "condition": "false" }
    ]
  }'
python
wf = claw.workflow.create(
    name="设备异常自动处理",
    description="检测到设备异常时自动分析并通知",
    trigger={"type": "manual"},
    nodes=[
        {
            "id": "fetch_data",
            "type": "tool-call",
            "config": {"tool": "iot_get_all_devices", "input": {"status": "alarm"}}
        },
        {
            "id": "analyze",
            "type": "ai-generate",
            "config": {"prompt": "分析设备报警数据:{{fetch_data.result}}"}
        },
        {
            "id": "notify",
            "type": "email-send",
            "config": {"to": "ops@example.com", "subject": "报警通知", "body": "{{analyze.result}}"}
        }
    ],
    edges=[
        {"from": "fetch_data", "to": "analyze"},
        {"from": "analyze", "to": "notify"}
    ]
)
javascript
const wf = await claw.workflows.create({
  name: '设备异常自动处理',
  trigger: { type: 'manual' },
  nodes: [
    {
      id: 'fetch_data',
      type: 'tool-call',
      config: { tool: 'iot_get_all_devices', input: { status: 'alarm' } }
    },
    {
      id: 'analyze',
      type: 'ai-generate',
      config: { prompt: '分析设备报警数据:{{fetch_data.result}}' }
    },
    {
      id: 'notify',
      type: 'email-send',
      config: { to: 'ops@example.com', subject: '报警通知', body: '{{analyze.result}}' }
    }
  ],
  edges: [
    { from: 'fetch_data', to: 'analyze' },
    { from: 'analyze', to: 'notify' }
  ]
})

变量引用

节点之间通过 {{nodeId.result}} 语法引用上游节点的输出:

json
{
  "id": "step2",
  "type": "ai-generate",
  "config": {
    "prompt": "分析数据:{{step1.result}}"
  }
}

也可以引用触发数据:

json
{
  "config": {
    "prompt": "处理事件:{{trigger.data}}"
  }
}

各节点类型详解

ai-generate -- AI 生成

json
{
  "id": "analyze",
  "type": "ai-generate",
  "config": {
    "prompt": "请分析以下数据并给出建议:{{data.result}}",
    "model": "claude-sonnet-4-20250514",
    "systemPrompt": "你是数据分析专家"
  }
}

tool-call -- 工具调用

json
{
  "id": "get_data",
  "type": "tool-call",
  "config": {
    "tool": "iot_get_energy",
    "input": {
      "period": "month",
      "building": "A栋"
    }
  }
}

http-request -- HTTP 请求

json
{
  "id": "call_api",
  "type": "http-request",
  "config": {
    "method": "POST",
    "url": "https://api.example.com/notify",
    "headers": { "Authorization": "Bearer token" },
    "body": { "message": "{{analyze.result}}" }
  }
}

condition -- 条件判断

json
{
  "id": "check",
  "type": "condition",
  "config": {
    "expression": "{{data.result.temperature}} > 80"
  }
}

条件节点的出边需要标注 condition 字段:"true""false"

email-send -- 发送邮件

json
{
  "id": "notify",
  "type": "email-send",
  "config": {
    "to": "admin@example.com",
    "cc": "manager@example.com",
    "subject": "自动化通知",
    "body": "处理结果:{{analyze.result}}"
  }
}

执行工作流

bash
curl -X POST https://api.invoratec.cn/v1/workflows/WORKFLOW_ID/run \
  -H "X-API-Key: claw_xxxx" \
  -H "Content-Type: application/json" \
  -d '{
    "data": { "override_building": "B栋" }
  }'
python
run = claw.workflow.run(wf["id"], data={"override_building": "B栋"})
print(run["runId"], run["status"])  # pending
javascript
const run = await claw.workflows.run(wf.id, {
  data: { override_building: 'B栋' }
})
console.log(run.runId, run.status)

执行后返回 runId,状态为 pending。使用执行历史 API 查看执行结果。

查看执行历史

bash
# 列出执行历史
curl "https://api.invoratec.cn/v1/workflows/WORKFLOW_ID/runs?limit=10" \
  -H "X-API-Key: claw_xxxx"

# 查看具体执行
curl https://api.invoratec.cn/v1/workflows/WORKFLOW_ID/runs/RUN_ID \
  -H "X-API-Key: claw_xxxx"
python
# 列出执行历史
runs = claw.workflow.runs(wf["id"], limit=10)

# 查看具体执行
detail = claw.workflow.get_run(wf["id"], run["runId"])
for step in detail["stepResults"]:
    print(f"[{step['nodeId']}] {step['status']}")
javascript
const { runs } = await claw.workflows.listRuns(wf.id, { limit: 10 })
const detail = await claw.workflows.getRun(wf.id, runs[0]._id)

执行记录包含每个节点的执行状态和结果:

json
{
  "status": "completed",
  "stepResults": [
    { "nodeId": "fetch_data", "status": "success", "result": {...} },
    { "nodeId": "analyze", "status": "success", "result": "分析结果..." },
    { "nodeId": "notify", "status": "success", "result": { "sent": true } }
  ]
}

实战示例:HR 月度报告自动化

python
claw.workflow.create(
    name="HR月度报告",
    description="每月1号自动生成并发送HR报告",
    trigger={"type": "cron", "schedule": "0 9 1 * *"},
    nodes=[
        {
            "id": "attendance",
            "type": "tool-call",
            "config": {"tool": "hr_get_attendance_summary", "input": {"period": "last_month"}}
        },
        {
            "id": "recruitment",
            "type": "tool-call",
            "config": {"tool": "hr_get_recruitment_stats", "input": {"period": "last_month"}}
        },
        {
            "id": "turnover",
            "type": "tool-call",
            "config": {"tool": "hr_get_turnover_data", "input": {"period": "last_month"}}
        },
        {
            "id": "report",
            "type": "ai-generate",
            "config": {
                "prompt": "根据以下数据生成月度HR报告:\n\n考勤数据:{{attendance.result}}\n招聘数据:{{recruitment.result}}\n离职数据:{{turnover.result}}\n\n请生成专业的Markdown格式报告,包含数据分析和改进建议。"
            }
        },
        {
            "id": "send",
            "type": "email-send",
            "config": {
                "to": "hr-director@example.com",
                "cc": "ceo@example.com",
                "subject": "{{trigger.month}} 月度HR报告",
                "body": "{{report.result}}"
            }
        }
    ],
    edges=[
        {"from": "attendance", "to": "report"},
        {"from": "recruitment", "to": "report"},
        {"from": "turnover", "to": "report"},
        {"from": "report", "to": "send"}
    ]
)

DAG 引擎会自动并行执行 attendancerecruitmentturnover 三个节点,待全部完成后再执行 report 节点。

下一步

智航云 AI 中台 — 开发者平台