Skip to content

最佳实践

API Key 安全

环境变量管理

不要在代码中硬编码 API Key:

python
# 推荐
import os
claw = ClawPlatform(api_key=os.environ["CLAW_API_KEY"])

# 不推荐
claw = ClawPlatform(api_key="claw_a1b2c3d4...")

权限最小化

为不同用途创建不同权限的 Key:

用途权限说明
前端展示read仅查询数据
后端服务read, write读写操作
管理脚本admin仅管理员使用

定期轮换

python
# 创建新 Key
new_key = requests.post("/v1/auth/api-keys", json={"name": "Q2-2026"})

# 更新应用配置使用新 Key
# ...

# 吊销旧 Key
requests.delete(f"/v1/auth/api-keys/{old_key_id}")

工具设计

命名规范

使用 系统_动作_对象 格式:

erp_get_inventory        # ERP 查询库存
erp_update_stock         # ERP 更新库存
hr_get_employee_list     # HR 获取员工列表
iot_get_device_status    # IoT 获取设备状态

描述清晰

好的工具描述直接影响 AI 调用的准确性:

json
{
  "description": "查询指定仓库的库存数据。返回商品列表,包含名称、数量、单价。可按产品编码过滤。仓库ID格式为WH-XXX。"
}

要点:

  • 说明功能和返回内容
  • 说明参数格式要求
  • 说明适用场景

幂等设计

工具应设计为可安全重复调用:

json
{
  "name": "erp_update_stock",
  "description": "设置指定商品的库存数量(覆盖写入,非累加)",
  "execution": {
    "type": "http",
    "method": "PUT",
    "url": "https://erp.example.com/api/stock/{product_id}",
    "body": { "quantity": "{quantity}" }
  }
}

知识库管理

文档粒度

场景建议长度说明
FAQ100-500 字一个问题一个文档
操作手册500-2000 字按章节拆分
技术文档1000-5000 字按主题拆分

过长的文档会降低语义搜索的精度。

定期更新

python
# 定时任务:每周同步最新文档
import schedule

def sync_knowledge():
    docs = fetch_latest_docs_from_source()
    for doc in docs:
        claw.knowledge.upload(
            doc["content"],
            knowledge_id=kb_id,
            title=doc["title"],
            metadata={"synced_at": datetime.now().isoformat()}
        )

schedule.every().monday.at("03:00").do(sync_knowledge)

多知识库隔离

不同业务领域使用独立的知识库:

python
# 分领域创建知识库
ops_kb = claw.knowledge.create("运维知识库")
hr_kb = claw.knowledge.create("人事政策库")
safety_kb = claw.knowledge.create("安全规范库")

工作流设计

错误处理

在关键步骤后添加条件判断:

json
{
  "nodes": [
    { "id": "fetch", "type": "tool-call", "config": { "tool": "api_call" } },
    { "id": "check", "type": "condition", "config": { "expression": "{{fetch.status}} === 'success'" } },
    { "id": "process", "type": "ai-generate", "config": { "prompt": "处理:{{fetch.result}}" } },
    { "id": "error_notify", "type": "email-send", "config": { "to": "admin@example.com", "subject": "工作流异常" } }
  ],
  "edges": [
    { "from": "fetch", "to": "check" },
    { "from": "check", "to": "process", "condition": "true" },
    { "from": "check", "to": "error_notify", "condition": "false" }
  ]
}

并行优化

DAG 引擎自动并行执行无依赖关系的节点:

fetch_sales ─────┐
fetch_inventory ──┤──→ analyze ──→ send_report
fetch_orders ────┘

三个数据获取节点会同时执行,大幅减少总耗时。

超时设计

对外部 API 调用设置合理的超时时间,避免工作流挂起。HTTP 工具的默认超时为 10 秒。

AI 集成

System Prompt 模板

针对不同场景编写专用的 System Prompt:

python
PROMPTS = {
    "ops": "你是运维助手。回答设备相关问题,温度单位为摄氏度,能耗单位为kWh。",
    "hr": "你是HR助手。回答人事政策问题,注意保护员工隐私。",
    "finance": "你是财务助手。金额精确到分,使用人民币。"
}

response = claw.ai.chat(
    message=user_input,
    system_prompt=PROMPTS[current_app]
)

流式优先

面向用户的交互场景优先使用流式 API,改善用户体验:

javascript
// 在前端使用流式 API
const stream = claw.ai.chatStream(message)
for await (const chunk of stream) {
  if (chunk.type === 'stream') {
    appendToUI(chunk.text)  // 逐字显示
  }
}

Token 成本控制

  • 使用 usage 字段监控 Token 消耗
  • 对于简单查询使用较小的模型
  • 在 System Prompt 中指示 AI 简洁回答
  • 定期检查 /v1/auth/usage 用量

限流处理

指数退避重试

javascript
async function withRetry(fn, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn()
    } catch (err) {
      if (err.statusCode === 429 && i < maxRetries - 1) {
        const delay = Math.pow(2, i) * 1000 + Math.random() * 1000
        await new Promise(r => setTimeout(r, delay))
        continue
      }
      throw err
    }
  }
}

请求队列

高并发场景下使用请求队列控制并发:

python
import asyncio
from asyncio import Semaphore

semaphore = Semaphore(10)  # 最多10个并发请求

async def rate_limited_call(fn):
    async with semaphore:
        return await fn()

Webhook 可靠性

快速响应

Webhook 处理器应在 5 秒内返回 200 状态码。耗时操作放到异步队列:

python
@app.route('/webhook', methods=['POST'])
def handle_webhook():
    data = request.json
    # 放入异步队列处理
    task_queue.enqueue(process_webhook, data)
    return 'OK', 200  # 立即响应

签名验证

始终验证 Webhook 签名,防止伪造请求。参见 事件指南 中的签名验证代码。

幂等处理

同一事件可能被投递多次,处理逻辑应具备幂等性:

python
processed_events = set()

def process_event(event_id, data):
    if event_id in processed_events:
        return  # 已处理过,跳过
    processed_events.add(event_id)
    # 实际处理逻辑

生产环境检查清单

  • [ ] API Key 使用环境变量,不在代码中硬编码
  • [ ] 启用 HTTPS
  • [ ] 配置了限流重试逻辑
  • [ ] 关键工作流有错误处理分支
  • [ ] 知识库内容定期更新
  • [ ] 监控 API 用量和错误率
  • [ ] 数据库定期备份
  • [ ] Webhook 处理器有签名验证
  • [ ] 日志记录完整,便于排查问题

智航云 AI 中台 — 开发者平台