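Chapter 42: Cross-Platform Deployment

"Write once, run anywhere": from Java to agents, cross-platform support has always been a core problem in software engineering. For agents it brings its own challenges: model inference depends on compute, real-time communication demands low latency, and the user experience has to stay consistent across devices.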

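42.1 Overview: Why Go Cross-Platform

42.1.1 Why Agent Products Need to Be Cross-Platform

Agent products differ fundamentally from traditional web or mobile apps: they carry on sustained, multi-turn, context-aware interactions with the user. That interaction pattern means an agent has to show up wherever is most convenient for the user, whether they are working in a browser, checking messages on a phone, coding in an IDE, or chatting in WeChat.

Three forces drive cross-platform agent deployment:

1. Fragmented usage scenarios

Modern users reach information in highly fragmented settings. An enterprise user might code in a desktop IDE during working hours, read an agent-generated report on a phone while commuting, and present the agent's analysis on a large screen in a meeting. An agent confined to a single platform misses many of these opportunities to interact.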

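2. Diverse interaction styles

Each platform lends itself to particular forms of interaction:

  • Web: complex data displays, long-document editing, multi-panel workspaces
  • Mobile: voice interaction, photo recognition, push notifications, quick lookups
  • Desktop: deep integration with local tools, file system access, running in the background
  • CLI: developer workflows, CI/CD integration, script automation
  • IDE: coding assistance, code review, debugging help
  • WeChat ecosystem: Chinese enterprise users, low-friction reach, social sharing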

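3. Maximizing commercial reach

Commercially, each additional platform is another channel for reaching users and another way to monetize. In the B2B market in particular, customers often require on-premises deployment plus support for multiple client types, so cross-platform capability directly affects how competitive the product is.

42.1.2 Challenges Specific to Cross-Platform Agents

Unlike ordinary applications, cross-platform agents face the following challenges: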

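| Challenge | Specific problem | Impact |
| --- | --- | --- |
| Model inference | Edge devices have limited compute; large models cannot run on mobile devices | Requires a cloud-device collaborative architecture |
| Real-time communication | Agent streaming responses need low-latency, reliable two-way communication | Cross-platform WebSocket/SSE implementations |
| Context sync | Session state must carry over seamlessly between a user's devices | Requires a unified session-management backend |
| Tool calls | System capabilities differ by platform (for example, file system access permissions) | Requires a platform-aware tool adapter layer |
| UI consistency | The agent interaction experience must stay coherent across platforms | Cross-platform adaptation of the design system |
| Security and compliance | Platforms have different security models and compliance requirements | For example, iOS sandbox restrictions vs. full permissions on desktop |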
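
One way to picture the platform-aware tool adapter layer named in the table is a registry that records which platforms each tool may run on and refuses calls from anywhere else. The sketch below is only an illustration under stated assumptions: `Tool`, `ToolRegistry`, the tool names, and the platform labels are all hypothetical and not taken from any existing library.

```python
from dataclasses import dataclass
from typing import Callable, Dict, FrozenSet, List


@dataclass(frozen=True)
class Tool:
    name: str
    handler: Callable[[dict], dict]
    # Platforms on which this tool may be offered to the agent (hypothetical labels).
    platforms: FrozenSet[str]


class ToolRegistry:
    """Keeps tools together with the platforms that can actually run them."""

    def __init__(self) -> None:
        self._tools: Dict[str, Tool] = {}

    def register(self, tool: Tool) -> None:
        self._tools[tool.name] = tool

    def available_for(self, platform: str) -> List[str]:
        # Only advertise tools the current platform supports.
        return sorted(n for n, t in self._tools.items() if platform in t.platforms)

    def call(self, platform: str, name: str, args: dict) -> dict:
        tool = self._tools.get(name)
        if tool is None or platform not in tool.platforms:
            return {"ok": False, "error": f"{name} is not available on {platform}"}
        return {"ok": True, "result": tool.handler(args)}


registry = ToolRegistry()
registry.register(Tool("fs.read", lambda a: {"path": a["path"]},
                       frozenset({"desktop", "cli"})))
registry.register(Tool("camera.capture", lambda a: {"captured": True},
                       frozenset({"ios", "android"})))
```

With a registry like this, the backend can send each client only the tool list from `available_for(platform)`, so the model is never offered a tool the device cannot execute.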

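42.1.3 Cross-Platform Architecture Overview

A complete cross-platform agent architecture usually consists of the following layers:

┌────────────────────────────────────────────────────────────────────────────────┐
│                           Client presentation layer                            │
│  ┌──────┐ ┌────────┐ ┌─────────┐ ┌─────┐ ┌─────┐                               │
│  │ Web  │ │ Mobile │ │ Desktop │ │ CLI │ │ IDE │                               │
│  └──┬───┘ └───┬────┘ └────┬────┘ └──┬──┘ └──┬──┘                               │
├─────┼─────────┼───────────┼─────────┼───────┼──────────────────────────────────┤
│     │  Communication layer (REST API + WebSocket + SSE)                        │
├─────┼─────────┼───────────┼─────────┼───────┼──────────────────────────────────┤
│     │  Business logic layer (Agent Core, language-agnostic)                    │
├─────┼─────────┼───────────┼─────────┼───────┼──────────────────────────────────┤
│     │  Data persistence layer (unified data model + multi-device sync)         │
├─────┼─────────┼───────────┼─────────┼───────┼──────────────────────────────────┤
│     │  Infrastructure layer (model inference · vector store · message queue)   │
└─────┴─────────┴───────────┴─────────┴───────┴──────────────────────────────────┘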

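Core design principles:

  • Unified backend: all platforms share one Agent Core backend service, which keeps behavior consistent
  • Adapted frontends: each platform builds its frontend with the technology stack that suits it best; no cross-platform UI framework is imposed
  • Standardized communication: standard protocols (REST API + WebSocket/SSE) make it easy for every client to connect
  • Unified data model: a cross-platform data model keeps core data such as sessions and messages consistent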
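
As a rough illustration of the unified data model principle, the sketch below merges message lists reported by several clients into one ordered history. It assumes, hypothetically, that every message carries a globally unique `id` and a millisecond `timestamp` and is immutable once created; `Message` and `merge_messages` are illustrative names rather than part of any existing API.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass(frozen=True)
class Message:
    id: str
    role: str
    content: str
    timestamp: int  # milliseconds since the Unix epoch


def merge_messages(*sources: Iterable[Message]) -> List[Message]:
    """Merge message lists from several devices, dropping duplicate ids."""
    by_id: Dict[str, Message] = {}
    for source in sources:
        for msg in source:
            # First copy wins; messages are assumed immutable once created.
            by_id.setdefault(msg.id, msg)
    # Order by time, using the id as a tie-breaker so the result is deterministic.
    return sorted(by_id.values(), key=lambda m: (m.timestamp, m.id))


phone = [Message("1", "user", "hi", 100), Message("2", "assistant", "hello", 200)]
laptop = [Message("2", "assistant", "hello", 200), Message("3", "user", "next", 300)]
merged = merge_messages(phone, laptop)
```

Because every client uses the same message shape, the merge needs no per-platform special cases.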

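42.2 Web Deployment

42.2.1 Technology Choices for the Web

The web is the most basic deployment platform for agent products and the target of most products' first release. The mainstream options are compared below.

Frontend framework comparison: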

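| Feature | React | Vue 3 | Next.js | Nuxt 3 |
| --- | --- | --- | --- | --- |
| Ecosystem maturity | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| Agent component libraries | Rich | Moderate | Rich | Moderate |
| SSR support | Requires Next.js | Requires Nuxt | Built in | Built in |
| Learning curve | Medium | Lower | Medium to high | Medium |
| Best for | Complex interactive apps | Rapid prototyping | SEO/SSR | Full-stack apps |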

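For agent products, React + TypeScript is currently the most common choice, for these reasons:

  1. Agent products usually have complex interaction state to manage, and React has the richest state-management ecosystem
  2. TypeScript's type system helps manage the complex types involved in agent interactions (messages, tool call results, and so on)
  3. Many open-source AI/agent components and examples are built on React

42.2.2 Architecture of a React + TypeScript Agent Web Client

The following is an example of a typical agent web client architecture: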

typescript
// src/types/agent.ts - shared type definitions
export interface AgentMessage {
  id: string;
  role: 'user' | 'assistant' | 'system' | 'tool';
  content: string;
  timestamp: number;
  toolCalls?: ToolCall[];
  toolResults?: ToolResult[];
  metadata?: Record<string, unknown>;
}

export interface ToolCall {
  id: string;
  name: string;
  arguments: Record<string, unknown>;
  status: 'pending' | 'running' | 'completed' | 'failed';
}

export interface ToolResult {
  callId: string;
  content: string;
  isError?: boolean;
}

export interface Conversation {
  id: string;
  title: string;
  messages: AgentMessage[];
  model: string;
  createdAt: number;
  updatedAt: number;
}

// src/hooks/useAgent.ts - core hook for agent interaction
import { useState, useCallback, useRef } from 'react';
import type { AgentMessage, ToolCall } from '../types/agent';

interface UseAgentOptions {
  apiUrl: string;
  model?: string;
  onToolCall?: (call: ToolCall) => Promise<string>;
}

export function useAgent(options: UseAgentOptions) {
  const [messages, setMessages] = useState<AgentMessage[]>([]);
  const [isLoading, setIsLoading] = useState(false);
  const [error, setError] = useState<string | null>(null);
  const abortControllerRef = useRef<AbortController | null>(null);

  const sendMessage = useCallback(async (content: string) => {
    const userMessage: AgentMessage = {
      id: crypto.randomUUID(),
      role: 'user',
      content,
      timestamp: Date.now(),
    };
    setMessages(prev => [...prev, userMessage]);
    setIsLoading(true);
    setError(null);

    const controller = new AbortController();
    abortControllerRef.current = controller;

    try {
      const response = await fetch(`${options.apiUrl}/chat`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          messages: [...messages, userMessage].map(m => ({
            role: m.role, content: m.content,
          })),
          model: options.model,
          stream: true,
        }),
        signal: controller.signal,
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      // Read the response as a stream
      if (!response.body) throw new Error('Response has no body to stream');
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let assistantContent = '';
      let buffer = '';      // holds a partial line until the rest of it arrives
      let finished = false; // set once the [DONE] sentinel is seen

      const assistantMessage: AgentMessage = {
        id: crypto.randomUUID(),
        role: 'assistant',
        content: '',
        timestamp: Date.now(),
      };
      setMessages(prev => [...prev, assistantMessage]);

      while (!finished) {
        const { done, value } = await reader.read();
        if (done) break;
        // A network chunk can end mid-line, so only parse complete lines
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? '';
        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          const data = line.slice(6).trim();
          if (data === '[DONE]') { finished = true; break; }
          const parsed = JSON.parse(data);
          const delta = parsed.choices?.[0]?.delta?.content;
          if (delta) {
            assistantContent += delta;
            setMessages(prev =>
              prev.map(m =>
                m.id === assistantMessage.id
                  ? { ...m, content: assistantContent }
                  : m
              )
            );
          }
        }
      }
    } catch (err: any) {
      if (err.name !== 'AbortError') setError(err.message);
    } finally {
      setIsLoading(false);
      abortControllerRef.current = null;
    }
  }, [messages, options]);

  const stopGeneration = useCallback(() => {
    abortControllerRef.current?.abort();
  }, []);

  return { messages, isLoading, error, sendMessage, stopGeneration };
}

// src/components/ChatPanel.tsx - chat panel component
import { useRef, useEffect } from 'react';
import { useAgent } from '../hooks/useAgent';
import type { AgentMessage, ToolCall } from '../types/agent';

interface ChatPanelProps {
  apiUrl: string;
  model?: string;
}

export function ChatPanel({ apiUrl, model }: ChatPanelProps) {
  const { messages, isLoading, error, sendMessage, stopGeneration } = useAgent({
    apiUrl, model,
  });
  const messagesEndRef = useRef<HTMLDivElement>(null);
  // Reference this panel's own textarea instead of querying the whole document
  const inputRef = useRef<HTMLTextAreaElement>(null);

  useEffect(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [messages]);

  // Send the trimmed input and clear the textarea
  const submit = () => {
    const input = inputRef.current;
    const text = input?.value.trim();
    if (!input || !text) return;
    sendMessage(text);
    input.value = '';
  };

  return (
    <div className="flex flex-col h-full">
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map((msg) => (
          <MessageBubble key={msg.id} message={msg} />
        ))}
        {isLoading && (
          <div className="flex items-center gap-2 text-gray-500">
            <div className="animate-spin w-4 h-4 border-2 border-gray-300 border-t-blue-500 rounded-full" />
            <span>Agent is thinking...</span>
          </div>
        )}
        {error && (
          <div className="text-red-500 bg-red-50 p-3 rounded-lg">{error}</div>
        )}
        <div ref={messagesEndRef} />
      </div>
      <div className="border-t p-4">
        <div className="flex gap-2">
          <textarea
            ref={inputRef}
            className="flex-1 resize-none border rounded-lg p-3 focus:outline-none focus:ring-2 focus:ring-blue-500"
            placeholder="Type a message..."
            rows={1}
            onKeyDown={(e) => {
              // Enter sends; Shift+Enter inserts a newline
              if (e.key === 'Enter' && !e.shiftKey) {
                e.preventDefault();
                submit();
              }
            }}
          />
          {isLoading ? (
            <button onClick={stopGeneration}
              className="px-4 py-2 bg-red-500 text-white rounded-lg hover:bg-red-600">Stop</button>
          ) : (
            <button onClick={submit}
              className="px-4 py-2 bg-blue-500 text-white rounded-lg hover:bg-blue-600">Send</button>
          )}
        </div>
      </div>
    </div>
  );
}

// Message bubble component - also renders tool calls
function MessageBubble({ message }: { message: AgentMessage }) {
  const isUser = message.role === 'user';
  return (
    <div className={`flex ${isUser ? 'justify-end' : 'justify-start'}`}>
      <div className={`max-w-[80%] rounded-lg p-4 ${isUser ? 'bg-blue-500 text-white' : 'bg-gray-100 text-gray-900'}`}>
        {message.toolCalls?.map((call) => (
          <div key={call.id} className="mb-2 p-2 bg-white/10 rounded border border-white/20">
            <div className="flex items-center gap-2 text-sm">
              <span className="font-mono">{call.name}</span>
              <ToolCallStatus status={call.status} />
            </div>
            <pre className="mt-1 text-xs overflow-x-auto">
              {JSON.stringify(call.arguments, null, 2)}
            </pre>
          </div>
        ))}
        <div className="whitespace-pre-wrap">{message.content}</div>
      </div>
    </div>
  );
}

function ToolCallStatus({ status }: { status: ToolCall['status'] }) {
  const config = {
    pending: { icon: '⏳', text: 'Pending' },
    running: { icon: '⚡', text: 'Running' },
    completed: { icon: '✅', text: 'Completed' },
    failed: { icon: '❌', text: 'Failed' },
  };
  const { icon, text } = config[status];
  return <span>{icon} {text}</span>;
}

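42.2.3 Real-Time Communication with WebSocket

The core interaction features of an agent product (streaming output, tool call status updates, real-time collaboration) all depend on efficient real-time communication. WebSocket is the underlying protocol for these features.

python
# backend/websocket_manager.py - WebSocket connection management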
import asyncio
import json
import uuid
from typing import Dict, Set
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class WSClient:
    """A single WebSocket client connection"""
    client_id: str
    user_id: str
    session_id: str
    websocket: object
    connected_at: datetime = field(default_factory=datetime.now)
    last_heartbeat: datetime = field(default_factory=datetime.now)

class WebSocketManager:
    """WebSocket connection manager"""

    def __init__(self):
        self._clients: Dict[str, WSClient] = {}
        self._session_clients: Dict[str, Set[str]] = {}
        self._user_clients: Dict[str, Set[str]] = {}
        self._heartbeat_interval = 30

    async def connect(self, websocket, user_id: str, session_id: str) -> str:
        client_id = str(uuid.uuid4())
        client = WSClient(client_id=client_id, user_id=user_id,
                          session_id=session_id, websocket=websocket)
        self._clients[client_id] = client
        if session_id not in self._session_clients:
            self._session_clients[session_id] = set()
        self._session_clients[session_id].add(client_id)
        if user_id not in self._user_clients:
            self._user_clients[user_id] = set()
        self._user_clients[user_id].add(client_id)
        return client_id

    async def disconnect(self, client_id: str):
        client = self._clients.pop(client_id, None)
        if not client:
            return
        if client.session_id in self._session_clients:
            self._session_clients[client.session_id].discard(client_id)
            if not self._session_clients[client.session_id]:
                del self._session_clients[client.session_id]
        if client.user_id in self._user_clients:
            self._user_clients[client.user_id].discard(client_id)
            if not self._user_clients[client.user_id]:
                del self._user_clients[client.user_id]

    async def send_to_session(self, session_id: str, message: dict):
        if session_id not in self._session_clients:
            return
        disconnected = []
        # Iterate over a copy: connect/disconnect may change the set while we await
        for client_id in list(self._session_clients[session_id]):
            client = self._clients.get(client_id)
            if client:
                try:
                    await client.websocket.send_json(message)
                except Exception:
                    disconnected.append(client_id)
        for cid in disconnected:
            await self.disconnect(cid)

    async def send_to_user(self, user_id: str, message: dict):
        if user_id not in self._user_clients:
            return
        disconnected = []
        # Iterate over a copy: connect/disconnect may change the set while we await
        for client_id in list(self._user_clients[user_id]):
            client = self._clients.get(client_id)
            if client:
                try:
                    await client.websocket.send_json(message)
                except Exception:
                    disconnected.append(client_id)
        for cid in disconnected:
            await self.disconnect(cid)

    async def broadcast(self, message: dict, exclude_client: str = None):
        disconnected = []
        # Iterate over a copy: the dict may change size while we await a send
        for client_id, client in list(self._clients.items()):
            if client_id == exclude_client:
                continue
            try:
                await client.websocket.send_json(message)
            except Exception:
                disconnected.append(client_id)
        for cid in disconnected:
            await self.disconnect(cid)


# Message type definitions
class MessageType:
    AGENT_CHUNK = "agent.chunk"
    AGENT_COMPLETE = "agent.complete"
    AGENT_ERROR = "agent.error"
    # Referenced by the chat route below; the string value is an assumed name
    STOP_GENERATION = "agent.stop"
    TOOL_CALL_START = "tool.call.start"
    TOOL_CALL_RESULT = "tool.call.result"
    TOOL_CALL_ERROR = "tool.call.error"
    SESSION_CREATED = "session.created"
    SESSION_UPDATED = "session.updated"
    TYPING_START = "status.typing.start"
    TYPING_STOP = "status.typing.stop"
    HEARTBEAT = "system.heartbeat"
    HEARTBEAT_ACK = "system.heartbeat.ack"
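
python
# backend/routes/chat_ws.py - WebSocket chat route
import asyncio
import json
from datetime import datetime
from typing import Optional

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

# ws_manager, MessageType, verify_ws_token, agent_engine and cancel_stream
# are assumed to be defined elsewhere in the backend.

router = APIRouter()

@router.websocket("/ws/chat/{session_id}")
async def chat_websocket(websocket: WebSocket, session_id: str,
                         token: Optional[str] = None):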
    user = await verify_ws_token(token)
    if not user:
        await websocket.close(code=4001, reason="Unauthorized")
        return
    await websocket.accept()
    client_id = await ws_manager.connect(websocket, user.id, session_id)

    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            msg_type = message.get("type")

            if msg_type == MessageType.HEARTBEAT:
                await websocket.send_json({
                    "type": MessageType.HEARTBEAT_ACK,
                    "timestamp": datetime.now().isoformat(),
                })
                ws_manager._clients[client_id].last_heartbeat = datetime.now()

            elif msg_type == "chat.message":
                content = message.get("content", "")
                model = message.get("model", "default")
                asyncio.create_task(
                    process_agent_stream(session_id, user.id, content,
                                         model, ws_manager)
                )

            elif msg_type == MessageType.STOP_GENERATION:
                await cancel_stream(session_id)

    except WebSocketDisconnect:
        pass  # normal client disconnect
    finally:
        # Always release the connection, whatever ended the receive loop
        await ws_manager.disconnect(client_id)


async def process_agent_stream(session_id, user_id, content, model, ws_manager):
    try:
        await ws_manager.send_to_session(session_id, {
            "type": MessageType.TYPING_START,
            "data": {"session_id": session_id},
        })
        async for event in agent_engine.stream_chat(
            session_id=session_id, user_id=user_id,
            content=content, model=model,
        ):
            event_type = event.get("type")
            if event_type == "text_delta":
                await ws_manager.send_to_session(session_id, {
                    "type": MessageType.AGENT_CHUNK,
                    "data": {"content": event["content"], "message_id": event["message_id"]},
                })
            elif event_type == "tool_call":
                await ws_manager.send_to_session(session_id, {
                    "type": MessageType.TOOL_CALL_START,
                    "data": {"call_id": event["call_id"], "name": event["name"],
                             "arguments": event["arguments"]},
                })
            elif event_type == "tool_result":
                await ws_manager.send_to_session(session_id, {
                    "type": MessageType.TOOL_CALL_RESULT,
                    "data": {"call_id": event["call_id"], "content": event["content"]},
                })

        await ws_manager.send_to_session(session_id, {
            "type": MessageType.AGENT_COMPLETE,
            "data": {"session_id": session_id},
        })
    except Exception as e:
        await ws_manager.send_to_session(session_id, {
            "type": MessageType.AGENT_ERROR,
            "data": {"message": str(e)},
        })
    finally:
        # Clear the typing indicator whether the stream succeeded or failed
        await ws_manager.send_to_session(session_id, {
            "type": MessageType.TYPING_STOP,
            "data": {"session_id": session_id},
        })
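
The connection manager stores a `_heartbeat_interval` and each client's `last_heartbeat`, but the listing does not show how stale connections are detected. A minimal sketch of that check follows. It assumes a client counts as stale once it has missed two consecutive heartbeats; `find_stale_clients` and its `missed_allowed` parameter are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def find_stale_clients(
    last_heartbeats: Dict[str, datetime],
    now: datetime,
    interval_seconds: int = 30,
    missed_allowed: int = 2,
) -> List[str]:
    """Return ids of clients whose last heartbeat is older than the allowed window."""
    cutoff = now - timedelta(seconds=interval_seconds * missed_allowed)
    # A heartbeat exactly at the cutoff is still accepted.
    return sorted(cid for cid, seen in last_heartbeats.items() if seen < cutoff)


now = datetime(2024, 1, 1, 12, 0, 0)
beats = {
    "a": now - timedelta(seconds=10),   # recent
    "b": now - timedelta(seconds=61),   # just past two missed intervals
    "c": now - timedelta(seconds=60),   # exactly at the cutoff
}
stale = find_stale_clients(beats, now)
```

A background task could run this check periodically and pass each returned id to `disconnect`.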

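42.2.4 SSE (Server-Sent Events) as an Alternative

In some scenarios SSE is a simpler one-way push mechanism than WebSocket:

| Feature | SSE | WebSocket |
| --- | --- | --- |
| Direction | One-way (server → client) | Two-way |
| Protocol | HTTP | WS/WSS |
| Automatic reconnection | Built into the browser (via EventSource) | Must be implemented manually |
| Complexity | Simple | Higher |
| Typical use | Streaming output, push notifications | Real-time chat, collaborative editing |
| Proxy compatibility | Good (standard HTTP) | May be blocked |

For agent products, a combination is recommended: SSE for streaming responses (one-way push) and WebSocket for real-time collaboration (two-way communication).

typescript
// SSE client implementation
// Uses fetch rather than EventSource because the request is a POST;
// EventSource's automatic reconnection therefore does not apply here.
import { useState } from 'react';
// Import path assumes this hook sits next to useAgent.ts
import type { AgentMessage } from '../types/agent';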
export function useSSEAgent(apiUrl: string) {
  const [messages, setMessages] = useState<AgentMessage[]>([]);
  const [isLoading, setIsLoading] = useState(false);

  const sendMessage = async (content: string) => {
    const userMsg: AgentMessage = {
      id: crypto.randomUUID(), role: 'user',
      content, timestamp: Date.now(),
    };
    setMessages(prev => [...prev, userMsg]);
    setIsLoading(true);
    const assistantMsg: AgentMessage = {
      id: crypto.randomUUID(), role: 'assistant',
      content: '', timestamp: Date.now(),
    };
    setMessages(prev => [...prev, assistantMsg]);

    try {
      const response = await fetch(`${apiUrl}/chat/stream`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ messages: [...messages, userMsg] }),
      });
      if (!response.ok || !response.body) {
        throw new Error(`HTTP ${response.status}`);
      }
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let accumulated = '';
      let buffer = '';      // incomplete event carried over between chunks
      let finished = false; // set once the [DONE] sentinel is seen
      while (!finished) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        // Events end with a blank line; the last piece may still be incomplete
        const events = buffer.split('\n\n');
        buffer = events.pop() ?? '';
        for (const event of events) {
          const dataLine = event.split('\n').find(l => l.startsWith('data: '));
          if (!dataLine) continue;
          const jsonStr = dataLine.slice(6);
          if (jsonStr === '[DONE]') { finished = true; break; }
          const parsed = JSON.parse(jsonStr);
          if (parsed.content) {
            accumulated += parsed.content;
            setMessages(prev =>
              prev.map(m => m.id === assistantMsg.id ? { ...m, content: accumulated } : m)
            );
          }
        }
      }
    } finally {
      setIsLoading(false);
    }
  };
  return { messages, isLoading, sendMessage };
}
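
Network chunks do not necessarily line up with SSE event boundaries, so a client has to buffer partial events until the blank-line terminator arrives. The Python sketch below shows that buffering in isolation. It assumes LF line endings, a `data: ` prefix, JSON payloads, and the `[DONE]` sentinel used in the listing above; `iter_sse_data` is a hypothetical helper.

```python
import json
from typing import Iterable, Iterator


def iter_sse_data(chunks: Iterable[str]) -> Iterator[dict]:
    """Yield decoded JSON payloads from an SSE stream delivered in arbitrary chunks."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # A blank line ends an event; whatever follows the last one is incomplete.
        *events, buffer = buffer.split("\n\n")
        for event in events:
            for line in event.split("\n"):
                if not line.startswith("data: "):
                    continue
                payload = line[len("data: "):]
                if payload == "[DONE]":
                    return  # stop at the sentinel and ignore anything after it
                yield json.loads(payload)


# The first two events are deliberately split in the middle of their JSON.
chunks = [
    'data: {"content": "Hel',
    'lo"}\n\ndata: {"content"',
    ': " world"}\n\n',
    'data: [DONE]\n\n',
    'data: {"content": "ignored"}\n\n',
]
pieces = [event["content"] for event in iter_sse_data(chunks)]
```

Parsing each chunk on its own, without the buffer, would fail on the first chunk because it ends in the middle of a JSON object.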

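42.3 Mobile Deployment

42.3.1 Technology Choices for Mobile Agents

The core challenge of deploying agents on mobile is compute: mobile devices generally cannot run large-model inference locally, so the architecture has to pair cloud inference with a lightweight client.

Option comparison:

| Option | Strengths | Weaknesses | Best for |
| --- | --- | --- | --- |
| React Native | Shares business logic with the web client | Slightly weaker native feel | Teams already on a React stack |
| Flutter | Good performance, highly consistent UI | Smaller Dart ecosystem | New projects that prioritize UI polish |
| Native iOS + Android | Best performance and experience | Development cost doubles | Premium products, deep native integration |
| Capacitor/Ionic | Quick conversion from a web app | Performance ceiling | Simple agent display apps |
| Mini programs | No installation, low barrier to entry | Limited capabilities | Chinese market, lightweight tools |

42.3.2 A React Native Agent Client

typescript
// mobile/src/services/AgentService.ts - agent service layer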
import { Platform } from 'react-native';

interface AgentConfig {
  baseUrl: string;
  model: string;
  apiKey: string;
}

class AgentService {
  private config: AgentConfig;
  private abortController: AbortController | null = null;

  constructor(config: AgentConfig) { this.config = config; }

  async *sendMessageStream(
    messages: Array<{ role: string; content: string }>,
  ): AsyncGenerator<{
    type: 'text_delta' | 'tool_call' | 'tool_result' | 'done' | 'error';
    content?: string; data?: any;
  }> {
    const controller = new AbortController();
    this.abortController = controller;

    try {
      const response = await fetch(`${this.config.baseUrl}/chat/stream`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${this.config.apiKey}`,
        },
        body: JSON.stringify({
          messages, model: this.config.model,
          platform: Platform.OS,
        }),
        signal: controller.signal,
      });

      if (!response.ok) throw new Error(`HTTP ${response.status}`);

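      // React Native's built-in fetch may not expose a streaming body;
      // this assumes a runtime or polyfill that provides one.
      if (!response.body) throw new Error('Streaming response body is not available');
      const reader = response.body.getReader();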
      const decoder = new TextDecoder();
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() || '';
        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          const data = line.slice(6).trim();
          if (data === '[DONE]') { yield { type: 'done' }; return; }
          try {
            const parsed = JSON.parse(data);
            yield parsed;
          } catch { /* skip */ }
        }
      }
    } catch (err: any) {
      if (err.name !== 'AbortError') yield { type: 'error', content: err.message };
    }
  }

  stopGeneration() { this.abortController?.abort(); }

  // Speech-to-text (a mobile-specific capability)
  async transcribeAudio(audioUri: string): Promise<string> {
    const formData = new FormData();
    formData.append('audio', {
      uri: audioUri, type: 'audio/m4a', name: 'recording.m4a',
    } as any);
    const response = await fetch(`${this.config.baseUrl}/audio/transcribe`, {
      method: 'POST',
      headers: { 'Authorization': `Bearer ${this.config.apiKey}` },
      body: formData,
    });
    const result = await response.json();
    return result.text;
  }

  // Image analysis (called after taking a photo on the device)
  async analyzeImage(imageUri: string, prompt: string): Promise<string> {
    const formData = new FormData();
    formData.append('image', {
      uri: imageUri, type: 'image/jpeg', name: 'photo.jpg',
    } as any);
    formData.append('prompt', prompt);
    const response = await fetch(`${this.config.baseUrl}/vision/analyze`, {
      method: 'POST',
      headers: { 'Authorization': `Bearer ${this.config.apiKey}` },
      body: formData,
    });
    const result = await response.json();
    return result.content;
  }
}

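42.3.3 Mobile-Specific Agent Capabilities

Compared with the web, mobile offers some capabilities of its own:

1. Voice interaction

  • Voice input: record → speech-to-text → send to the agent
  • Voice output: agent text → text-to-speech → playback
  • Voice wake-up: wake the agent with a phrase such as "Hey, assistant"

2. Camera

  • Photo recognition: OCR, object recognition, document scanning
  • Live viewfinder: agent assistance in AR scenarios

3. Location services

  • Location-based agent suggestions
  • Automation triggered by geofences

4. Push notifications

  • Notifications when an agent task completes
  • Scheduled reminders and alerts for critical information

typescript
// mobile/src/services/CapabilityBridge.ts - capability bridge layer
import { Linking, Platform } from 'react-native';
// scheduleNotificationAsync comes from expo-notifications, not from react-native
import * as Notifications from 'expo-notifications';

/**
 * Capability bridge: a single wrapper around platform-specific capabilities.
 * Agent tool calls go through this layer to reach device capabilities.
 */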
class CapabilityBridge {
  private handlers: Map<string, Function> = new Map();

  registerCapability(name: string, handler: Function) {
    this.handlers.set(name, handler);
  }

  async callCapability(name: string, params: any): Promise<any> {
    const handler = this.handlers.get(name);
    if (!handler) throw new Error(`Capability not supported: ${name}`);

    const platformCheck = await this.checkPlatformSupport(name);
    if (!platformCheck.supported) {
      return { success: false, error: `Not supported on ${Platform.OS}` };
    }
    return handler(params);
  }

  private async checkPlatformSupport(name: string) {
    const capabilities: Record<string, Record<string, boolean>> = {
      'camera.capture': { ios: true, android: true, web: true },
      'location.current': { ios: true, android: true, web: true },
      'contacts.read': { ios: true, android: true, web: false },
      'sms.send': { ios: false, android: true, web: false },
      'notification.local': { ios: true, android: true, web: true },
      'calendar.read': { ios: true, android: true, web: false },
    };
    const cap = capabilities[name];
    if (!cap) return { supported: true };
    return { supported: cap[Platform.OS] ?? false };
  }

  init() {
    this.registerCapability('phone.call', async (params: any) => {
      const { number } = params;
      const supported = await Linking.canOpenURL(`tel:${number}`);
      if (supported) { await Linking.openURL(`tel:${number}`); return { success: true }; }
      return { success: false, error: 'Cannot make phone calls' };
    });

    this.registerCapability('notification.local', async (params: any) => {
      await Notifications.scheduleNotificationAsync({
        content: { title: params.title, body: params.body, data: params.data },
        trigger: params.trigger || null,
      });
      return { success: true };
    });
  }
}

export default new CapabilityBridge();

42.3.4 Offline and Weak-Network Strategies

Mobile network conditions are unreliable, so offline and weak-network cases need a dedicated strategy:

python
# backend/services/offline_sync.py - offline sync strategy
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class PendingMessage:
    id: str
    user_id: str
    content: str
    timestamp: datetime
    retry_count: int = 0
    max_retries: int = 3


class OfflineSyncService:
    """Offline message sync service"""

    def __init__(self, storage_backend):
        self.storage = storage_backend

    async def cache_message(self, user_id: str, content: str) -> str:
        """Cache a message while offline"""
        msg = PendingMessage(
            id=str(uuid.uuid4()),
            user_id=user_id,
            content=content,
            timestamp=datetime.now(),
        )
        await self.storage.save_pending(msg)
        return msg.id

    async def sync_pending(self, user_id: str,
                           process_fn) -> dict:
        """Sync cached messages once the network is back"""
        pending = await self.storage.get_pending(user_id)
        sent, failed = 0, 0

        for msg in pending:
            try:
                await process_fn(msg.content)
                await self.storage.remove_pending(msg.id)
                sent += 1
            except Exception:
                msg.retry_count += 1
                if msg.retry_count >= msg.max_retries:
                    await self.storage.mark_failed(msg)
                    failed += 1
                else:
                    await self.storage.update_pending(msg)

        return {"sent": sent, "failed": failed, "remaining": len(pending) - sent - failed}

    async def get_pending_count(self, user_id: str) -> int:
        return await self.storage.count_pending(user_id)
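
The sync service retries a failed message up to `max_retries` times but does not say how long to wait between attempts. One common option is exponential backoff with full jitter, sketched below. `backoff_delay`, its default values, and the optional `rng` parameter are assumptions made for illustration.

```python
import random
from typing import Optional


def backoff_delay(retry_count: int, base: float = 1.0, cap: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter exponential backoff.

    Returns a random delay in seconds between 0 and
    min(cap, base * 2**retry_count).
    """
    upper = min(cap, base * (2 ** retry_count))
    source = rng if rng is not None else random
    return source.uniform(0.0, upper)


# A seeded generator keeps the example reproducible.
seeded = random.Random(0)
delays = [backoff_delay(n, rng=seeded) for n in range(8)]
```

The jitter spreads out retries from many devices that come back online at the same moment, and the cap keeps the wait bounded.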

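42.4 Desktop Deployment

42.4.1 Electron vs Tauri

Desktop agent apps suit scenarios that need deep system integration, such as file management, IDE integration, and always-on background services.

| Feature | Electron | Tauri |
| --- | --- | --- |
| Bundle size | ~150MB | ~5-10MB |
| Memory usage | Higher (bundled Chromium) | Low (system WebView) |
| Startup speed | Slower | Faster |
| Tech stack | HTML/CSS/JS | Any frontend + Rust backend |
| System integration | Via Node.js | Via Rust |
| Security model | Full Node.js privileges | Principle of least privilege |
| Ecosystem maturity | Very mature | Growing quickly |

42.4.2 A Tauri Agent Client

rust
// src-tauri/src/commands/agent.rs - Tauri agent commands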
use tauri::{command, State};
use serde::{Deserialize, Serialize};
use std::sync::Mutex;

#[derive(Serialize, Deserialize, Clone)]
pub struct AgentMessage {
    pub id: String,
    pub role: String,
    pub content: String,
    pub timestamp: i64,
}

// Clone is required because get_config returns a copy of the config
#[derive(Serialize, Deserialize, Clone)]
pub struct AppConfig {
    pub api_url: String,
    pub api_key: String,
    pub model: String,
    pub theme: String,
}

pub struct AppState {
    pub config: Mutex<AppConfig>,
}

#[command]
pub async fn send_chat_message(
    state: State<'_, AppState>,
    messages: Vec<AgentMessage>,
    model: String,
) -> Result<String, String> {
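    // Copy the needed fields and release the lock before any `.await`:
    // a std::sync::MutexGuard is not Send and must not be held across one.
    let (api_url, api_key) = {
        let config = state.config.lock().map_err(|e| e.to_string())?;
        (config.api_url.clone(), config.api_key.clone())
    };

    let client = reqwest::Client::new();
    let response = client
        .post(format!("{}/chat", api_url))
        .header("Authorization", format!("Bearer {}", api_key))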
        .json(&serde_json::json!({
            "messages": messages,
            "model": model,
            "stream": false,
        }))
        .send()
        .await
        .map_err(|e| format!("Request failed: {}", e))?;

    if !response.status().is_success() {
        return Err(format!("API error: {}", response.status()));
    }

    response.text().await.map_err(|e| e.to_string())
}

#[command]
pub async fn read_local_file(path: String) -> Result<String, String> {
    let expanded = shellexpand::tilde(&path).to_string();
    tokio::fs::read_to_string(&expanded)
        .await
        .map_err(|e| format!("Failed to read file: {}", e))
}

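// Note: `sh -c` exists only on Unix-like systems; Windows needs a different
// shell (for example `cmd /C`). Exposing arbitrary command execution to the
// frontend is dangerous and should be restricted in real deployments.
#[command]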
pub async fn execute_terminal_command(
    command: String,
    cwd: Option<String>,
) -> Result<String, String> {
    let output = if let Some(dir) = cwd {
        tokio::process::Command::new("sh")
            .arg("-c").arg(&command)
            .current_dir(shellexpand::tilde(&dir).as_ref())
            .output().await
    } else {
        tokio::process::Command::new("sh")
            .arg("-c").arg(&command)
            .output().await
    };

    match output {
        Ok(output) => Ok(serde_json::json!({
            "stdout": String::from_utf8_lossy(&output.stdout),
            "stderr": String::from_utf8_lossy(&output.stderr),
            "exit_code": output.status.code(),
        }).to_string()),
        Err(e) => Err(format!("Command failed: {}", e)),
    }
}

#[command]
pub async fn get_config(state: State<'_, AppState>) -> Result<AppConfig, String> {
    state.config.lock().map_err(|e| e.to_string()).map(|c| c.clone())
}

#[command]
pub async fn update_config(
    state: State<'_, AppState>,
    new_config: AppConfig,
) -> Result<(), String> {
    let mut config = state.config.lock().map_err(|e| e.to_string())?;
    *config = new_config;
    Ok(())
}

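42.4.3 Desktop-Specific Agent Capabilities

Compared with mobile and web, the desktop has these distinct advantages:

  1. Full file system access: read and write arbitrary files, browse and manage directories, watch files for changes
  2. Terminal command execution: run shell commands, capture their output, manage child processes
  3. System-level integration: system tray residency, global hotkeys, clipboard access, window management
  4. Local storage: large local databases, local file caches, local vector indexes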
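
Full file system access is also the riskiest of these capabilities when an agent chooses the paths. A common precaution is to confine file tools to directories the user has approved. The sketch below shows such a check in Python for illustration; `is_path_allowed` and the temporary `workspace` directory are hypothetical stand-ins.

```python
import os
import tempfile
from pathlib import Path
from typing import Iterable


def is_path_allowed(requested: str, allowed_roots: Iterable[str]) -> bool:
    """Return True only if the resolved path lies inside one of the allowed roots."""
    # resolve() collapses ".." segments and follows symlinks, so a path
    # cannot escape a root by indirection.
    target = Path(requested).expanduser().resolve()
    for root in allowed_roots:
        root_path = Path(root).expanduser().resolve()
        if target == root_path or root_path in target.parents:
            return True
    return False


workspace = tempfile.mkdtemp()  # stands in for a user-approved directory
inside = os.path.join(workspace, "notes", "a.txt")
escape = os.path.join(workspace, "..", "elsewhere.txt")
```

Here `is_path_allowed(inside, [workspace])` is true, while the `..` path resolves outside the workspace and is rejected.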

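42.5 CLI Tools

42.5.1 The Value of an Agent CLI

The CLI is the interface developers know best. A well-designed agent CLI can:

  1. Fit into developer workflows: compose with other tools through pipes
  2. Integrate with automation scripts: use the agent in CI/CD
  3. Validate prototypes quickly: test agent behavior without launching a GUI
  4. Keep resource usage low: terminal apps have a very small footprint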
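
Composing with pipes, the first point in the list, mostly comes down to reading standard input when it is not a terminal. The sketch below is a minimal illustration, assuming a hypothetical `build_prompt` helper that joins command-line words with any piped text.

```python
import io
import sys
from typing import Sequence, TextIO


def build_prompt(args: Sequence[str], stdin: TextIO = sys.stdin) -> str:
    """Combine command-line words with piped stdin, if there is any."""
    instruction = " ".join(args).strip()
    # isatty() is True for an interactive terminal, where nothing is piped in.
    piped = "" if stdin.isatty() else stdin.read().strip()
    if instruction and piped:
        return f"{instruction}\n\n{piped}"
    return instruction or piped


# io.StringIO stands in for a pipe here; its isatty() returns False.
prompt = build_prompt(["summarize"], io.StringIO("line one\nline two\n"))
```

With a helper like this, a shell pipeline can send both an instruction and the output of another tool, such as a diff, to the agent in one call.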

42.5.2 Building an Agent CLI with Rich + Click

python
# cli/main.py - agent CLI entry point
import click
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown
from rich.syntax import Syntax
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.live import Live
from rich.prompt import Prompt

console = Console()


@click.group()
@click.option('--api-url', default=None, help='Agent API URL')
@click.option('--model', default=None, help='Default model')
@click.pass_context
def cli(ctx, api_url, model):
    """🤖 Agent CLI - Your AI assistant in the terminal."""
    ctx.ensure_object(dict)
    ctx.obj['api_url'] = api_url
    ctx.obj['model'] = model


@cli.command()
@click.argument('message', nargs=-1, required=False)
@click.option('--session', '-s', default=None, help='Session ID')
@click.option('--stream/--no-stream', default=True, help='Stream response')
@click.pass_context
def chat(ctx, message, session, stream):
    """Chat with the Agent."""
    # agent_client and load_config are assumed to be provided elsewhere in the project
    from agent_client import AgentClient

    api_url = ctx.obj['api_url'] or load_config().api_url
    model = ctx.obj['model'] or load_config().model
    client = AgentClient(api_url, model)

    if message:
        msg = ' '.join(message)
        console.print(f"\n[bold cyan]You:[/] {msg}\n")
        with console.status("[bold green]Agent thinking..."):
            response = client.send_message(msg, session_id=session)
        console.print(Panel(Markdown(response), title="[bold magenta]Agent[/]",
                           border_style="magenta"))
    else:
        _interactive_chat(client, session)


def _interactive_chat(client, session_id):
    """Interactive chat loop."""
    console.print(Panel(
        "[bold]Interactive Chat Mode[/]\n"
        "Type your message and press Enter.\n"
        "[dim]/quit - Exit  |  /clear - Clear  |  /model - Switch  |  /export - Save[/]",
        title="🤖 Agent CLI", border_style="blue",
    ))

    history = []
    current_model = client.model

    while True:
        try:
            # Rich requires a closing tag to match its opening tag; "[/]" closes the last one.
            user_input = Prompt.ask("\n[bold cyan]You[/]")
        except (EOFError, KeyboardInterrupt):
            console.print("\n[dim]Goodbye! 👋[/]")
            break

        if user_input.startswith('/'):
            cmd = user_input.strip().lower()
            if cmd in ('/quit', '/exit'):
                console.print("[dim]Goodbye! 👋[/]")
                break
            elif cmd == '/clear':
                history.clear()
                console.print("[yellow]History cleared.[/]")
                continue
            elif cmd == '/model':
                current_model = Prompt.ask("Model name", default=current_model)
                console.print(f"[green]Switched to: {current_model}[/]")
                continue
            elif cmd == '/export':
                _export_chat(history)
                continue
            else:
                console.print(f"[red]Unknown: {cmd}[/]")
                continue

        # Stream the reply. Pass a copy of the earlier turns: the client appends
        # the new user message itself, so adding it to `history` beforehand
        # would send it twice.
        console.print()
        full_response = ""
        with Live(console=console, refresh_per_second=15) as live:
            for chunk in client.stream_message(user_input, history=list(history),
                                                model=current_model):
                if chunk.get("type") == "text_delta":
                    full_response += chunk["content"]
                    live.update(Panel(Markdown(full_response),
                                     title="[bold magenta]Agent[/]",
                                     border_style="magenta"))
                elif chunk.get("type") == "tool_call":
                    live.update(Panel(
                        f"[yellow]⚡ Calling: {chunk['name']}[/]\n"
                        f"[dim]{chunk.get('arguments', '')}[/]",
                        title="[bold magenta]Agent[/]", border_style="yellow"))

        # Record both sides of the turn once the reply is complete.
        history.append({"role": "user", "content": user_input})
        history.append({"role": "assistant", "content": full_response})


@cli.command()
@click.argument('file_path', type=click.Path(exists=True))
@click.option('--question', '-q', help='Question about the file')
@click.pass_context
def analyze(ctx, file_path, question):
    """Analyze a file with the Agent."""
    import os
    from agent_client import AgentClient

    api_url = ctx.obj['api_url'] or load_config().api_url
    model = ctx.obj['model'] or load_config().model
    client = AgentClient(api_url, model)

    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    console.print(Panel(
        f"[bold]File:[/] {file_path}\n[bold]Size:[/] {os.path.getsize(file_path):,} bytes",
        title="📄 File Analysis", border_style="blue",
    ))

    ext = os.path.splitext(file_path)[1].lstrip('.')
    syntax = Syntax(content[:2000], ext, theme="monokai", line_numbers=True)
    console.print(syntax)

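    # Always attach the file content; a custom question only replaces the default instruction.
    instruction = question or "Analyze this file and point out key issues and suggested improvements:"
    prompt = f"{instruction}\n```\n{content[:5000]}\n```"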
    with console.status("[bold green]Analyzing..."):
        response = client.send_message(prompt)
    console.print(Panel(Markdown(response), title="[bold magenta]Analysis[/]",
                       border_style="magenta"))


@cli.command()
@click.argument('command_text')
def exec_agent(command_text):
    """Execute an agent command with tool access."""
    from agent_client import AgentClient
    client = AgentClient(load_config().api_url, load_config().model)

    console.print(f"[bold]Executing:[/] {command_text}\n")
    with Progress(SpinnerColumn(), TextColumn("[progress.description]{task.description}"),
                  console=console) as progress:
        task = progress.add_task("Agent executing...", total=None)
        for event in client.execute_with_tools(command_text):
            if event["type"] == "tool_call":
                progress.update(task, description=f"⚡ Tool: {event['name']}")
            elif event["type"] == "tool_result":
                console.print(Panel(event["content"][:500],
                    title=f"[green]✅ {event.get('name', 'tool')}[/]",
                    border_style="green"))
            elif event["type"] == "text":
                console.print(Markdown(event["content"]))
    console.print("[bold green]✓ Done[/]")


def _export_chat(history):
    import json
    from datetime import datetime
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    with open(f"chat_{ts}.json", 'w') as f:
        json.dump(history, f, ensure_ascii=False, indent=2)
    console.print(f"[green]Exported to chat_{ts}.json[/]")


if __name__ == '__main__':
    cli()
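
The listing above calls `load_config()` without defining it. One possible shape is sketched below, assuming configuration comes from environment variables with a JSON file as fallback. The variable names `AGENT_API_URL` and `AGENT_MODEL`, the path `~/.agent/config.json`, and the default values are all hypothetical choices for illustration.

```python
import json
import os
from dataclasses import dataclass
from pathlib import Path


@dataclass
class CLIConfig:
    api_url: str = "http://localhost:8000"  # hypothetical default
    model: str = "default"                  # hypothetical default


def load_config(path=None, env=None) -> CLIConfig:
    """Load settings with precedence: environment > config file > defaults."""
    env = os.environ if env is None else env
    config_path = Path(path) if path else Path.home() / ".agent" / "config.json"
    values = {}
    if config_path.is_file():
        values = json.loads(config_path.read_text(encoding="utf-8"))
    return CLIConfig(
        api_url=env.get("AGENT_API_URL") or values.get("api_url") or CLIConfig.api_url,
        model=env.get("AGENT_MODEL") or values.get("model") or CLIConfig.model,
    )
```

Because the `--api-url` and `--model` options are checked first in each command, the effective precedence becomes command-line flag, then environment, then file, then default.
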
python
# cli/agent_client.py - Agent client used by the CLI
import httpx
import json
from typing import Generator, Optional, List, Dict


class AgentClient:
    def __init__(self, api_url: str, model: str, api_key: str = ""):
        self.api_url = api_url.rstrip('/')
        self.model = model
        self.api_key = api_key
        self.client = httpx.Client(timeout=120.0)

    def send_message(self, message: str, session_id: Optional[str] = None,
                     history: Optional[List[Dict]] = None,
                     model: Optional[str] = None) -> str:
        messages = list(history or [])  # copy, so the caller's list is not mutated
        messages.append({"role": "user", "content": message})
        response = self.client.post(
            f"{self.api_url}/v1/chat/completions",
            json={"messages": messages, "model": model or self.model, "stream": False},
            headers=self._headers(),
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

    def stream_message(self, message: str, history=None, model=None) -> Generator:
        messages = list(history or [])  # copy, so the caller's list is not mutated
        messages.append({"role": "user", "content": message})
        with self.client.stream("POST",
            f"{self.api_url}/v1/chat/completions",
            json={"messages": messages, "model": model or self.model, "stream": True},
            headers=self._headers(),
        ) as response:
            response.raise_for_status()
            for line in response.iter_lines():
                if line.startswith("data: "):
                    data = line[6:]
                    if data == "[DONE]": break
                    try:
                        parsed = json.loads(data)
                        delta = parsed["choices"][0].get("delta", {})
                        if delta.get("content"):
                            yield {"type": "text_delta", "content": delta["content"]}
                    except json.JSONDecodeError:
                        continue

    def execute_with_tools(self, command: str) -> Generator:
        messages = [
            {"role": "system", "content": "You are a terminal agent that can call tools to complete tasks."},
            {"role": "user", "content": command},
        ]
        response = self.client.post(
            f"{self.api_url}/v1/chat/completions",
            json={"messages": messages, "model": self.model,
                  "tools": self._default_tools(), "tool_choice": "auto"},
            headers=self._headers(),
        )
        response.raise_for_status()
        data = response.json()
        # In the OpenAI-compatible response format, tool calls live on the message object.
        message = data["choices"][0]["message"]
        for tc in message.get("tool_calls") or []:
            fn = tc["function"]
            args = json.loads(fn["arguments"])
            yield {"type": "tool_call", "name": fn["name"], "arguments": args}
            result = self._execute_tool(fn["name"], args)
            yield {"type": "tool_result", "name": fn["name"], "content": result}
        # `content` is typically null when the model only returns tool calls.
        yield {"type": "text", "content": message.get("content") or ""}

    def _default_tools(self) -> list:
        return [
            {"type": "function", "function": {
                "name": "read_file", "description": "Read file contents",
                "parameters": {"type": "object", "properties": {"path": {"type": "string"}},
                               "required": ["path"]}}},
            {"type": "function", "function": {
                "name": "write_file", "description": "Write content to file",
                "parameters": {"type": "object",
                    "properties": {"path": {"type": "string"}, "content": {"type": "string"}},
                    "required": ["path", "content"]}}},
            {"type": "function", "function": {
                "name": "run_command", "description": "Execute shell command",
                "parameters": {"type": "object",
                    "properties": {"command": {"type": "string"}},
                    "required": ["command"]}}},
        ]

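    def _execute_tool(self, name: str, args: dict) -> str:
        # WARNING: these tools run with the user's full permissions and their
        # arguments come from the model. A real CLI should ask for confirmation
        # or restrict paths and commands before executing them.
        import subprocess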
        if name == "read_file":
            with open(args["path"], "r") as f: return f.read()
        elif name == "write_file":
            with open(args["path"], "w") as f: f.write(args["content"])
            return f"Written to {args['path']}"
        elif name == "run_command":
            result = subprocess.run(args["command"], shell=True, capture_output=True, text=True)
            return result.stdout or result.stderr
        return f"Unknown tool: {name}"

    def _headers(self) -> dict:
        h = {"Content-Type": "application/json"}
        if self.api_key: h["Authorization"] = f"Bearer {self.api_key}"
        return h
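
Since `run_command` hands model-generated text to a shell, some gate in front of it is worth considering. The sketch below shows one conservative option, an allowlist check; the set of allowed programs and the function name `is_command_allowed` are assumptions for illustration, and a real tool would likely add user confirmation on top.

```python
import shlex

ALLOWED_COMMANDS = {"ls", "cat", "git", "grep"}  # hypothetical allowlist

# Characters and sequences that let one command line run more than one program.
SHELL_OPERATORS = (";", "|", "&", ">", "<", "`", "$(")


def is_command_allowed(command: str, allowed=ALLOWED_COMMANDS) -> bool:
    """Return True only for a single allowlisted program with no shell operators."""
    try:
        tokens = shlex.split(command)
    except ValueError:  # e.g. unbalanced quotes
        return False
    if not tokens:
        return False
    # Deliberately strict: operators are rejected even inside quotes.
    if any(op in command for op in SHELL_OPERATORS):
        return False
    return tokens[0] in allowed
```

With this check, `_execute_tool` could return a refusal message for anything that fails it, and pass `shlex.split(command)` to `subprocess.run` without `shell=True`.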

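42.5.3 CLI Usage Examples

bash
# Single message
$ agent chat "Explain the MCP protocol"

# Interactive chat
$ agent chat

# Analyze a file
$ agent analyze main.py -q "Find potential memory leaks"

# Pipe input (the chat command must read stdin for this; the listing in 42.5.2 does not do so yet)
$ cat error.log | agent chat "Analyze these error logs"

# Agent execution (with tool calls)
$ agent exec-agent "Refactor the project and add type annotations"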
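
For the pipe example to work, the `chat` command has to pick up whatever arrives on standard input. A minimal sketch of that step follows, assuming piped text is appended to the message given as arguments; `build_message` and the `max_chars` cap are hypothetical.

```python
import sys


def build_message(args, stdin=None, max_chars=20000):
    """Combine command-line words with piped stdin, if any."""
    stdin = sys.stdin if stdin is None else stdin
    text = " ".join(args).strip()
    # A TTY means the user is typing interactively, so nothing was piped in.
    piped = "" if stdin.isatty() else stdin.read()
    piped = piped[:max_chars].strip()
    if piped and text:
        return f"{text}\n\n{piped}"
    return piped or text
```

Inside `chat`, `msg = build_message(message)` could replace the plain `' '.join(message)`. Note that in a non-interactive environment with an open but empty stdin, `read()` waits for end of input.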

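42.6 IDE Plugins

42.6.1 The Value of an IDE Agent Plugin

The IDE is where developers spend most of their working day. Integrating an Agent into the IDE provides:

  1. Context awareness: the current file, selection, and project structure are collected automatically as Agent input
  2. Seamless integration: the Agent is available inside the coding environment, with no window switching
  3. Workflow enhancement: code generation, refactoring, debugging, testing, and other coding activities can all be assisted by the Agent

42.6.2 Developing with the VS Code Extension API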
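
On the backend side, context awareness comes down to turning editor state into a prompt. The sketch below assumes the plugin sends the file path, language, selection, and open files with each request; the `EditorContext` fields and `build_context_prompt` are hypothetical, not an existing API.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class EditorContext:
    file_path: str
    language: str
    selection: Optional[str] = None
    open_files: List[str] = field(default_factory=list)


def build_context_prompt(ctx: EditorContext, question: str,
                         max_selection_chars: int = 4000) -> str:
    """Assemble editor context and the user's question into one prompt."""
    parts = [f"File: {ctx.file_path} ({ctx.language})"]
    if ctx.open_files:
        parts.append("Open files: " + ", ".join(ctx.open_files))
    if ctx.selection:
        # Cap the selection so a huge selection cannot crowd out the question.
        parts.append("Selected code:\n" + ctx.selection[:max_selection_chars])
    parts.append("Question: " + question)
    return "\n\n".join(parts)
```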

typescript
// src/extension.ts - entry point of the VS Code Agent extension
import * as vscode from 'vscode';
import { AgentService } from './services/agentService';
import { ChatViewProvider } from './views/chatViewProvider';
import { InlineCompletionProvider } from './providers/inlineCompletionProvider';
import { CodeActionProvider } from './providers/codeActionProvider';

export function activate(context: vscode.ExtensionContext) {
    const agentService = new AgentService();

    // Register the sidebar Chat view
    const chatProvider = new ChatViewProvider(context.extensionUri, agentService);
    context.subscriptions.push(
        vscode.window.registerWebviewViewProvider('agentChatView', chatProvider)
    );

    // Register the inline completion provider
    const inlineProvider = new InlineCompletionProvider(agentService);
    context.subscriptions.push(
        vscode.languages.registerInlineCompletionItemProvider({ pattern: '**' }, inlineProvider)
    );

    // Register the Code Action provider
    const codeActionProvider = new CodeActionProvider(agentService);
    context.subscriptions.push(
        vscode.languages.registerCodeActionsProvider(
            { pattern: '**' }, codeActionProvider,
            { providedCodeActionKinds: [vscode.CodeActionKind.Refactor] }
        )
    );

    // Shortcut commands
    context.subscriptions.push(
        vscode.commands.registerTextEditorCommand('agent.explainSelection', async (editor) => {
            const selection = editor.document.getText(editor.selection);
            if (!selection) return;
            const explanation = await agentService.explainCode(selection, editor.document.languageId);
            const doc = await vscode.workspace.openTextDocument({
                content: explanation, language: 'markdown',
            });
            await vscode.window.showTextDocument(doc, { preview: true });
        }),

        // The `edit` builder passed to a text-editor command is only valid while the
        // callback runs synchronously, so edits made after an `await` go through editor.edit().
        vscode.commands.registerTextEditorCommand('agent.refactorSelection', async (editor) => {
            const range = editor.selection; // capture before awaiting; the selection may move
            const selection = editor.document.getText(range);
            if (!selection) return;
            const refactored = await agentService.refactorCode(selection, editor.document.languageId);
            await editor.edit((eb) => eb.replace(range, refactored));
        }),

        vscode.commands.registerTextEditorCommand('agent.addTests', async (editor) => {
            const selection = editor.document.getText(editor.selection);
            if (!selection) return;
            const tests = await agentService.generateTests(selection, editor.document.languageId);
            // Append at the end of the document's last line.
            const end = editor.document.lineAt(editor.document.lineCount - 1).range.end;
            await editor.edit((eb) => eb.insert(end, '\n\n' + tests));
        }),

        vscode.commands.registerCommand('agent.openChat', () => {
            vscode.commands.executeCommand('agentChatView.focus');
        }),

        vscode.commands.registerCommand('agent.sendFileToChat', () => {
            const editor = vscode.window.activeTextEditor;
            if (editor) chatProvider.sendFile(editor.document);
        }),
    );
}

export function deactivate() {}
typescript
// src/providers/inlineCompletionProvider.ts - inline completion
import * as vscode from 'vscode';
import { AgentService } from '../services/agentService';

export class InlineCompletionProvider implements vscode.InlineCompletionItemProvider {
    constructor(private agentService: AgentService) {}

    async provideInlineCompletionItems(
        document: vscode.TextDocument,
        position: vscode.Position,
        context: vscode.InlineCompletionContext,
        token: vscode.CancellationToken,
    ): Promise<vscode.InlineCompletionItem[]> {
        // Use up to 20 lines before the cursor as context
        const prefix = document.getText(
            new vscode.Range(
                new vscode.Position(Math.max(0, position.line - 20), 0),
                position
            )
        );

        try {
            const completion = await this.agentService.getInlineCompletion(
                prefix, document.languageId, token,
            );
            if (!completion || token.isCancellationRequested) return [];
            return [new vscode.InlineCompletionItem(completion,
                new vscode.Range(position, position))];
        } catch {
            return [];
        }
    }
}

42.6.3 JetBrains Plugin Development

Plugins for JetBrains IDEs (IntelliJ, PyCharm, WebStorm, and others) can be developed with Kotlin and the IntelliJ Platform SDK:

kotlin
// src/main/kotlin/com/agent/jetbrains/AgentAction.kt
package com.agent.jetbrains

import com.intellij.notification.NotificationGroupManager
import com.intellij.notification.NotificationType
import com.intellij.openapi.actionSystem.AnAction
import com.intellij.openapi.actionSystem.AnActionEvent
import com.intellij.openapi.actionSystem.CommonDataKeys
import com.intellij.openapi.progress.ProgressManager
import com.intellij.openapi.progress.Task

class ExplainCodeAction : AnAction() {
    override fun actionPerformed(e: AnActionEvent) {
        val editor = e.getData(CommonDataKeys.EDITOR) ?: return
        val project = e.project ?: return
        val selectedText = editor.selectionModel.selectedText ?: return
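        // Document itself has no file type; read it from the file behind the editor.
        val language = e.getData(CommonDataKeys.VIRTUAL_FILE)?.fileType?.name ?: "text"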

        ProgressManager.getInstance().run(object : Task.Backgroundable(
            project, "Agent Analyzing", true
        ) {
            override fun run(indicator: com.intellij.openapi.progress.ProgressIndicator) {
                indicator.text = "Agent analyzing..."
                indicator.fraction = 0.3
                val service = AgentService.getInstance(project)
                val explanation = service.explainCode(selectedText, language)
                indicator.fraction = 1.0
                NotificationGroupManager.getInstance()
                    .getNotificationGroup("Agent")
                    .createNotification("Code Explanation", explanation, NotificationType.INFORMATION)
                    .notify(project)
            }
        })
    }
}

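42.7 The WeChat Ecosystem

42.7.1 What Makes the WeChat Ecosystem Special

The WeChat ecosystem holds an irreplaceable position in the Chinese market. Connecting an Agent to it offers:

  1. Zero-friction reach: users do not need to install a new app
  2. Social distribution: viral spread through group chats and Moments
  3. Enterprise scenarios: WeCom (企业微信) has become standard in enterprises
  4. Payment loop: WeChat Pay closes the commercial loop

42.7.2 Integrating an Agent into a WeChat Mini Program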

javascript
// miniprogram/pages/chat/chat.js - Mini Program chat page
const app = getApp();

Page({
  data: {
    messages: [],
    inputValue: '',
    isLoading: false,
    scrollToBottom: false,
  },

  onLoad() { this.loadHistory(); },

  loadHistory() {
    const history = wx.getStorageSync('chat_history') || [];
    this.setData({ messages: history });
  },

  onInputChange(e) { this.setData({ inputValue: e.detail.value }); },

  async sendMessage() {
    const { inputValue, messages } = this.data;
    if (!inputValue.trim() || this.data.isLoading) return;

    const userMsg = {
      id: Date.now().toString(),
      role: 'user',
      content: inputValue.trim(),
      time: new Date().toLocaleTimeString(),
    };

    const newMessages = [...messages, userMsg];
    this.setData({ messages: newMessages, inputValue: '', isLoading: true, scrollToBottom: true });

    try {
      const response = await this.callAgentAPI(newMessages);
      const assistantMsg = {
        id: (Date.now() + 1).toString(),
        role: 'assistant',
        content: response,
        time: new Date().toLocaleTimeString(),
      };
      const allMessages = [...newMessages, assistantMsg];
      this.setData({ messages: allMessages, isLoading: false, scrollToBottom: true });
      wx.setStorageSync('chat_history', allMessages.slice(-50));
    } catch (err) {
      wx.showToast({ title: 'Request failed', icon: 'none' });
      this.setData({ isLoading: false });
    }
  },

  callAgentAPI(messages) {
    return new Promise((resolve, reject) => {
      wx.request({
        url: `${app.globalData.apiBaseUrl}/chat`,
        method: 'POST',
        header: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${wx.getStorageSync('token')}`,
        },
        data: {
          messages: messages.map(m => ({ role: m.role, content: m.content })),
          platform: 'miniprogram',
        },
        success: (res) => {
          if (res.statusCode === 200) resolve(res.data.content);
          else reject(new Error(res.data.message));
        },
        fail: reject,
      });
    });
  },

  clearChat() {
    wx.showModal({
      title: 'Confirm',
      content: 'Clear the chat history?',
      success: (res) => {
        if (res.confirm) {
          this.setData({ messages: [] });
          wx.removeStorageSync('chat_history');
        }
      },
    });
  },
});
xml
<!-- miniprogram/pages/chat/chat.wxml -->
<view class="chat-container">
  <scroll-view class="message-list" scroll-y
    scroll-into-view="{{scrollToBottom ? 'msg-bottom' : ''}}" scroll-with-animation>
    <view wx:for="{{messages}}" wx:key="id" class="message-wrapper {{item.role}}">
      <view class="avatar">
        <image wx:if="{{item.role === 'user'}}" src="/images/user.png" mode="aspectFill" />
        <view wx:else class="agent-avatar">🤖</view>
      </view>
      <view class="message-content">
        <view class="message-bubble {{item.role}}">{{item.content}}</view>
        <text class="message-time">{{item.time}}</text>
      </view>
    </view>
    <view wx:if="{{isLoading}}" class="loading-indicator">
      <view class="typing-dots"><view class="dot"></view><view class="dot"></view><view class="dot"></view></view>
    </view>
    <view id="msg-bottom"></view>
  </scroll-view>
  <view class="input-area">
    <input class="message-input" value="{{inputValue}}" bindinput="onInputChange"
      placeholder="Type a message..." bindconfirm="sendMessage" confirm-type="send" />
    <button class="send-btn" bindtap="sendMessage"
      disabled="{{!inputValue || isLoading}}">Send</button>
  </view>
</view>

42.7.3 WeCom Bots

A WeCom bot is an effective way to bring an Agent into enterprise work groups:

python
# backend/integrations/wecom_bot.py - WeCom bot integration
import hmac
import hashlib
import base64
import time
import urllib.parse
import httpx
from typing import Optional
from dataclasses import dataclass


@dataclass
class WecomBotConfig:
    webhook_url: str
    secret: Optional[str] = None


class WecomBotClient:
    """WeCom bot client."""

    def __init__(self, config: WecomBotConfig):
        self.config = config
        self.client = httpx.Client(timeout=30.0)

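    def _sign(self) -> Optional[str]:
        # NOTE: WeCom group-robot webhooks are normally authenticated by the `key`
        # query parameter alone. This timestamp + HMAC-SHA256 scheme resembles the
        # signing used by DingTalk custom robots; leave `secret` unset unless your
        # own gateway or proxy actually verifies such a signature.
        if not self.config.secret:
            return None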
        timestamp = str(int(time.time()))
        string_to_sign = f"{timestamp}\n{self.config.secret}"
        hmac_code = hmac.new(
            self.config.secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
        return f"&timestamp={timestamp}&sign={sign}"

    def send_text(self, content: str, mentioned_list: Optional[list] = None):
        payload = {
            "msgtype": "text",
            "text": {"content": content, "mentioned_list": mentioned_list or []},
        }
        return self._send(payload)

    def send_markdown(self, content: str):
        payload = {"msgtype": "markdown", "markdown": {"content": content}}
        return self._send(payload)

    def send_agent_response(self, question: str, answer: str):
        content = f"**🤖 Agent Response**\n\n> {question}\n\n{answer}\n\n---\n*Generated automatically by the Agent*"
        return self.send_markdown(content)

    def _send(self, payload: dict):
        url = self.config.webhook_url
        sign = self._sign()
        if sign:
            url += sign
        response = self.client.post(url, json=payload)
        response.raise_for_status()
        return response.json()


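# FastAPI callback integration
# NOTE: real WeCom application callbacks arrive as encrypted XML and must be
# signature-checked and decrypted first. This handler assumes an upstream
# adapter has already converted the message into JSON with `MsgType` and
# `Content` fields. `agent_engine` is the Agent Core entry point and is assumed
# to be provided elsewhere in the backend.
from fastapi import FastAPI, Request
import asyncio

app = FastAPI()
wecom_bot = WecomBotClient(WecomBotConfig(
    webhook_url="https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx",
))
_background_tasks = set()  # keep references so running tasks are not garbage-collected


@app.post("/wecom/callback")
async def wecom_callback(request: Request):
    body = await request.json()
    if body.get("MsgType") != "text":
        return {"errcode": 0, "errmsg": "ok"}

    content = body.get("Content", "").strip()
    if not content.startswith("@bot"):
        return {"errcode": 0, "errmsg": "ok"}

    # Strip only the leading mention, not every "@bot" in the text.
    question = content[len("@bot"):].strip()
    if not question:
        return {"errcode": 0, "errmsg": "ok"}

    # Acknowledge immediately and process in the background.
    task = asyncio.create_task(_handle_wecom_question(question))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return {"errcode": 0, "errmsg": "ok"}


async def _handle_wecom_question(question: str):
    try:
        response = await agent_engine.chat(question)
        # WecomBotClient uses a blocking httpx.Client, so run it off the event loop.
        await asyncio.to_thread(wecom_bot.send_agent_response, question, response)
    except Exception as e:
        await asyncio.to_thread(wecom_bot.send_text, f"Agent failed to process the request: {e}")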
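
Agent answers can be long, while bot messages are size-limited. The sketch below splits a reply into chunks by UTF-8 byte length; the 4096-byte default is an assumption about the markdown message limit and should be checked against the current WeCom documentation, and `split_by_bytes` is a hypothetical helper.

```python
def split_by_bytes(text: str, max_bytes: int = 4096) -> list:
    """Split text into chunks whose UTF-8 encoding fits within max_bytes each."""
    chunks, current, size = [], [], 0
    for ch in text:
        n = len(ch.encode("utf-8"))  # Chinese characters take 3 bytes each
        if size + n > max_bytes and current:
            chunks.append("".join(current))
            current, size = [], 0
        current.append(ch)
        size += n
    if current:
        chunks.append("".join(current))
    return chunks
```

Splitting on characters rather than on encoded bytes keeps multi-byte characters intact; a refinement would be to prefer paragraph boundaries so that markdown structure is not cut mid-element.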

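42.8 Cross-Platform Architecture Design

42.8.1 Unified Data Model

The key to cross-platform deployment is a single, unified data model, so that every client interprets the core data structures the same way:

typescript
// packages/shared/src/models.ts - models shared across platforms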

export interface Message {
  id: string;
  sessionId: string;
  role: 'user' | 'assistant' | 'system' | 'tool';
  content: string;
  createdAt: string; // ISO 8601
  metadata: MessageMetadata;
}

export interface MessageMetadata {
  model?: string;
  tokens?: TokenUsage;
  duration?: number;
  toolCalls?: ToolCallInfo[];
  platform?: string;
  parentId?: string;
}

export interface TokenUsage {
  prompt: number;
  completion: number;
  total: number;
}

export interface ToolCallInfo {
  id: string;
  name: string;
  arguments: Record<string, unknown>;
  result?: string;
  duration?: number;
  isError?: boolean;
}

export interface Session {
  id: string;
  title: string;
  messages: Message[];
  model: string;
  systemPrompt?: string;
  createdAt: string;
  updatedAt: string;
  settings: SessionSettings;
}

export interface SessionSettings {
  temperature: number;
  maxTokens: number;
  topP: number;
  toolsEnabled: boolean;
  contextWindow: 'auto' | number;
}

export interface ChatRequest {
  messages: Array<{ role: string; content: string }>;
  model?: string;
  stream?: boolean;
  sessionId?: string;
  platform?: string;
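  tools?: Tool[]; // NOTE: `Tool` is not declared in this listing; it is assumed to be exported elsewhere in the shared package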
  temperature?: number;
  maxTokens?: number;
}

export interface StreamEvent {
  type: 'text_delta' | 'tool_call' | 'tool_result' | 'done' | 'error';
  id?: string;
  content?: string;
  data?: any;
  timestamp: string;
}

export interface PlatformCapabilities {
  id: string;
  name: string;
  supportsVoiceInput: boolean;
  supportsVoiceOutput: boolean;
  supportsFileAccess: boolean;
  supportsCamera: boolean;
  supportsLocation: boolean;
  supportsNotifications: boolean;
  supportsClipboard: boolean;
  maxUploadSize: number;
  supportedFileTypes: string[];
}
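
The Python backend needs a matching representation of these types. Below is a minimal sketch of a `Message` mirror, assuming stdlib dataclasses and JSON; a project using Pydantic would define the model differently, and the role validation shown here is an illustrative choice.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict

VALID_ROLES = {"user", "assistant", "system", "tool"}


@dataclass
class Message:
    # Field names intentionally match the TypeScript interface (camelCase),
    # so the JSON on the wire is identical for every client.
    id: str
    sessionId: str
    role: str
    content: str
    createdAt: str  # ISO 8601
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        if self.role not in VALID_ROLES:
            raise ValueError(f"invalid role: {self.role!r}")

    def to_json(self) -> str:
        return json.dumps(asdict(self), ensure_ascii=False)

    @classmethod
    def from_json(cls, raw: str) -> "Message":
        return cls(**json.loads(raw))
```

Keeping the wire field names identical on both sides avoids a translation layer between snake_case and camelCase, at the cost of non-idiomatic Python attribute names.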

42.8.2 Unified API Gateway

All platforms communicate with the backend through a single API gateway:

python
# backend/gateway/router.py - unified API gateway
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
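import json

# NOTE: ChatRequest (a request model mirroring the shared ChatRequest type),
# agent_engine and _sync_response are assumed to be defined elsewhere in the
# backend; they are not shown in this listing.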

app = FastAPI(title="Agent API Gateway", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:3000",
        "https://app.your-agent.com",
        "capacitor://localhost",
        "tauri://localhost",
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.middleware("http")
async def platform_aware_middleware(request: Request, call_next):
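    """Identify the platform a request comes from."""
    # Prefer the explicit header; fall back to User-Agent sniffing only when it is missing.
    platform = request.headers.get("X-Platform")
    if not platform:
        ua = request.headers.get("User-Agent", "")
        if "MicroMessenger" in ua:
            platform = "wechat_miniprogram"
        elif "Electron" in ua:
            platform = "electron_desktop"
        elif "Tauri" in ua:
            platform = "tauri_desktop"
        else:
            platform = "unknown"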

    request.state.platform = platform
    response = await call_next(request)
    response.headers["X-Platform"] = platform
    return response


@app.post("/api/v1/chat")
async def chat(request: ChatRequest, req: Request):
    """Unified chat endpoint: supports streaming and non-streaming responses."""
    platform = getattr(req.state, 'platform', 'unknown')

    # Platform-aware parameter adjustments
    if platform == "wechat_miniprogram":
        request.stream = False
        request.maxTokens = min(request.maxTokens or 4096, 2048)

    if request.stream:
        return StreamingResponse(
            _stream_response(request, platform),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
        )
    else:
        return await _sync_response(request, platform)


async def _stream_response(request, platform):
    async for event in agent_engine.stream_chat(
        messages=request.messages, model=request.model,
        session_id=request.sessionId, platform=platform,
    ):
        yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
    yield "data: [DONE]\n\n"
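
The tests in 42.8.4 call an `apply_platform_limits` function that the gateway listing does not define. One way to write it is sketched below, with per-platform caps taken from the expectations in that test table. `ChatLimits` is a hypothetical stand-in for the two `ChatRequest` fields involved, and disabling streaming for the Mini Program follows the inline adjustment in the endpoint above.

```python
from dataclasses import dataclass, replace
from typing import Optional

# Caps mirror the parametrized expectations in the chapter's test listing.
PLATFORM_MAX_TOKENS = {
    "wechat_miniprogram": 2048,
    "tauri_desktop": 8192,
    "electron_desktop": 8192,
}
DEFAULT_MAX_TOKENS = 4096  # web, ios, android, and unknown platforms


@dataclass(frozen=True)
class ChatLimits:
    maxTokens: Optional[int] = None
    stream: bool = True


def apply_platform_limits(request: ChatLimits, platform: str) -> ChatLimits:
    """Return a copy of the request with platform-specific limits applied."""
    cap = PLATFORM_MAX_TOKENS.get(platform, DEFAULT_MAX_TOKENS)
    requested = request.maxTokens or cap
    stream = request.stream and platform != "wechat_miniprogram"
    return replace(request, maxTokens=min(requested, cap), stream=stream)
```

Returning a new object instead of mutating the incoming request keeps the function easy to test and lets the limits table grow without touching the endpoint.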

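42.8.3 Cross-Platform State Synchronization

A user's session state must stay in sync as they move between devices. The core synchronization strategies are:

1. Incremental sync

  • Each new message is pushed as an incremental event to all online devices
  • When an offline device comes back online, it pulls the events it missed

2. Conflict resolution

  • A timestamp-based Last-Write-Wins strategy
  • Operational Transformation (OT) for edit operations

3. Session consistency

  • A single Session ID is shared across all platforms
  • Sessions can be moved between devices via a QR code or a link

python
# backend/services/sync_service.py - cross-platform sync service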
from datetime import datetime
from typing import Dict, Optional
from dataclasses import dataclass


@dataclass
class SyncEvent:
    event_id: str
    user_id: str
    session_id: str
    event_type: str  # 'message_new', 'session_update', 'settings_change'
    data: dict
    timestamp: datetime
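    sequence: int  # monotonically increasing sequence number


class SyncService:
    """Cross-platform state synchronization service."""

    def __init__(self, ws_manager, storage):
        self.ws_manager = ws_manager
        self.storage = storage
        # session_id -> last sequence. In-memory only: a restart or a second
        # service instance would need a shared counter (e.g. in the database).
        self._sequences: Dict[str, int] = {}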

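    async def on_message_created(self, user_id: str, session_id: str, message: dict):
        """Sync a newly created message to all of the user's online devices."""
        seq = self._next_sequence(session_id)

        event = SyncEvent(
            event_id=message["id"],
            user_id=user_id,
            session_id=session_id,
            event_type="message_new",
            data={"message": message},
            # Timezone-aware timestamp, so Last-Write-Wins comparisons stay well-defined.
            timestamp=datetime.now().astimezone(),
            sequence=seq,
        )

        # Persist to the sync log first, so that any event a device has seen
        # can also be recovered later by devices that were offline.
        await self.storage.save_sync_event(event)

        # Then push to online devices in real time.
        await self.ws_manager.send_to_user(user_id, {
            "type": "sync.event",
            "event": {
                "event_id": event.event_id,
                "event_type": event.event_type,
                "data": event.data,
                "sequence": event.sequence,
            },
        })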

    async def get_missing_events(self, user_id: str, session_id: str,
                                  since_sequence: int) -> list:
        """Return the sync events a device has missed since `since_sequence`."""
        events = await self.storage.get_sync_events(
            session_id=session_id,
            since_sequence=since_sequence,
        )
        return events

    async def resolve_conflict(self, event_1: SyncEvent, event_2: SyncEvent) -> SyncEvent:
        """Resolve a sync conflict."""
        # Last-Write-Wins strategy
        if event_1.timestamp > event_2.timestamp:
            return event_1
        return event_2

    def _next_sequence(self, session_id: str) -> int:
        current = self._sequences.get(session_id, 0)
        next_seq = current + 1
        self._sequences[session_id] = next_seq
        return next_seq
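
On the receiving side, the sequence number lets a device detect gaps and apply events in order. The following is a minimal client-side sketch, assuming events have the shape pushed by `on_message_created` above; `SessionReplica` is a hypothetical name.

```python
class SessionReplica:
    """Device-local copy of one session, kept in order by sequence number."""

    def __init__(self):
        self.last_sequence = 0
        self.messages = []
        self._pending = {}  # sequence -> event received ahead of its turn

    def apply(self, event: dict) -> bool:
        """Apply an event. Returns False when a gap remains and a pull is needed."""
        seq = event["sequence"]
        if seq <= self.last_sequence:
            return True  # duplicate delivery; already applied
        self._pending[seq] = event
        # Drain every buffered event that is now contiguous.
        while self.last_sequence + 1 in self._pending:
            nxt = self._pending.pop(self.last_sequence + 1)
            self.messages.append(nxt["data"]["message"])
            self.last_sequence += 1
        return not self._pending
```

When `apply` returns `False`, the device would call the backend's `get_missing_events` with its `last_sequence` and feed the results through `apply` again.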

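42.8.4 Cross-Platform Testing Strategy

python
# tests/test_cross_platform.py - cross-platform tests
# NOTE: the async tests need the pytest-asyncio plugin. ChatRequest, Message,
# apply_platform_limits, WebSocketManager and SyncService are assumed to be
# imported from the backend packages; those imports are omitted here.
from datetime import datetime

import pytest
from unittest.mock import AsyncMock, MagicMock

pytestmark = pytest.mark.asyncio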


class TestCrossPlatformAPI:
    """Cross-platform API tests."""

    @pytest.mark.parametrize("platform,expected_max_tokens", [
        ("web", 4096),
        ("ios", 4096),
        ("android", 4096),
        ("wechat_miniprogram", 2048),  # Mini Programs get a tighter limit
        ("tauri_desktop", 8192),
        ("electron_desktop", 8192),
    ])
    async def test_platform_aware_limits(self, platform, expected_max_tokens):
        """Platform-aware parameter limits are applied."""
        request = ChatRequest(messages=[], maxTokens=8192)
        adjusted = apply_platform_limits(request, platform)
        assert adjusted.maxTokens == expected_max_tokens

    @pytest.mark.parametrize("platform", ["web", "ios", "android", "desktop"])
    async def test_unified_data_model(self, platform):
        """The unified data model behaves the same on every platform."""
        message = Message(
            id="test-123",
            sessionId="sess-456",
            role="user",
            content="Hello",
            createdAt=datetime.now().isoformat(),
            metadata={"platform": platform},
        )
        # Serialization followed by deserialization must round-trip
        serialized = message.model_dump_json()
        deserialized = Message.model_validate_json(serialized)
        assert deserialized.id == message.id
        assert deserialized.content == message.content

    async def test_websocket_multi_device(self):
        """Multiple devices can hold WebSocket connections to one session."""
        ws_manager = WebSocketManager()

        # Simulate two devices connecting
        ws1 = AsyncMock()
        ws2 = AsyncMock()

        await ws_manager.connect(ws1, "user-1", "session-1")
        await ws_manager.connect(ws2, "user-1", "session-1")

        # Send a message to the session
        await ws_manager.send_to_session("session-1", {"type": "test", "data": "hello"})

        # Both devices should receive it
        ws1.send_json.assert_called_once()
        ws2.send_json.assert_called_once()

    async def test_offline_sync_recovery(self):
        """A device recovers missed events after being offline."""
        sync_service = SyncService(
            ws_manager=AsyncMock(),
            storage=AsyncMock(),
        )

        # Messages produced while device A is online
        await sync_service.on_message_created("user-1", "sess-1", {"id": "msg-1"})
        await sync_service.on_message_created("user-1", "sess-1", {"id": "msg-2"})

        # Device B comes online and pulls the events it missed
        sync_service.storage.get_sync_events.return_value = [
            {"event_id": "msg-1", "sequence": 1},
            {"event_id": "msg-2", "sequence": 2},
        ]

        missing = await sync_service.get_missing_events("user-1", "sess-1", since_sequence=0)
        assert len(missing) == 2

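42.8.5 Summary of Cross-Platform Best Practices

| Practice | Description |
| --- | --- |
| Shared type definitions | Use a monorepo to manage type definitions and utility functions shared across platforms |
| Platform adaptation layer | Build an adapter layer (Capability Bridge) for each platform's specific capabilities |
| Progressive enhancement | Core features are identical on every platform; advanced features are enhanced per platform |
| Unified authentication | Use OAuth 2.0 + JWT for unified cross-platform authentication |
| API versioning | Use URL version prefixes (/v1/, /v2/) to maintain backward compatibility |
| Unified monitoring | Every request carries a platform identifier, enabling per-platform monitoring and analysis |
| Gradual rollout | Roll out features gradually per platform to reduce cross-platform release risk |
| Performance budgets | Set a performance budget for each platform (bundle size, startup time, memory usage) |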
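
Progressive enhancement and the Capability Bridge can be combined in a small feature-selection step. The sketch below assumes a reduced version of the `PlatformCapabilities` type from 42.8.1; the feature names and the mapping table are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PlatformCapabilities:
    id: str
    supportsVoiceInput: bool = False
    supportsFileAccess: bool = False
    supportsNotifications: bool = False


# Core features are available everywhere; optional ones depend on a capability flag.
CORE_FEATURES = ("chat", "history")
OPTIONAL_FEATURES = {
    "voice_input": "supportsVoiceInput",
    "local_file_tools": "supportsFileAccess",
    "push_notifications": "supportsNotifications",
}


def enabled_features(caps: PlatformCapabilities) -> list:
    """Return core features plus every optional feature the platform supports."""
    extras = [name for name, flag in OPTIONAL_FEATURES.items() if getattr(caps, flag)]
    return list(CORE_FEATURES) + extras
```

A client would report its capabilities once at session start, and the backend could use the resulting feature list to decide which tools to expose to the Agent.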

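42.8.6 Architecture Evolution Roadmap

Cross-platform deployment of an Agent product typically evolves through the following stages:

Stage 1: Web only (MVP)

  • Focus on the Web client to validate product value quickly
  • REST API + SSE for streaming responses
  • Simple session management and user system

Stage 2: + Mobile

  • Reuse business logic with React Native
  • Add mobile capabilities such as push notifications and voice input
  • Implement basic cross-device session sync

Stage 3: + Desktop + CLI

  • Build the desktop app with Tauri, providing file system and terminal access
  • A CLI tool that fits into developer workflows
  • Refine the cross-platform state synchronization mechanism

Stage 4: + IDE plugins + WeChat ecosystem

  • VS Code/JetBrains plugins bring the Agent into coding scenarios
  • WeChat Mini Program and WeCom integration to cover the Chinese market
  • Unified monitoring and operations across all platforms

Stage 5: Deep platform integration

  • Every platform gets a deeply optimized, platform-native experience
  • Smart platform selection: automatically recommend the most suitable platform for the user's scenario
  • Cross-platform workflows: seamless collaboration between platforms

Timeline:
Web MVP → +Mobile → +Desktop/CLI → +IDE/WeChat → Deep integration
Month 1    Month 3     Month 6        Month 9       Month 12+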

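Chapter Summary

Cross-platform deployment is a key step in taking an Agent product from "technical validation" to "adoption at scale". This chapter examined the technical approaches and best practices for cross-platform Agent deployment along six dimensions: Web, mobile, desktop, CLI, IDE plugins, and the WeChat ecosystem.

Key takeaways:

  1. A unified backend with adapted frontends is the basic architectural principle of cross-platform deployment
  2. Combining WebSocket and SSE covers the varied real-time communication needs of Agent products
  3. Mobile clients need particular attention to offline strategy and to platform-specific capabilities (voice, camera, location)
  4. Desktop clients (Tauri/Electron) offer deep integration with the file system and terminal commands
  5. The CLI is an important entry point into developer workflows; Rich + Click make it quick to build a professional-grade CLI
  6. IDE plugins enable context-aware Agent assistance and can significantly improve coding efficiency
  7. The WeChat ecosystem is a must for the Chinese market; Mini Programs and WeCom each suit different scenarios
  8. A unified data model plus an API gateway is the technical foundation for cross-platform consistency

Questions to consider next:

  • How can each platform's unique capabilities be fully used while keeping cross-platform consistency?
  • How will advances in edge computing change cross-platform Agent architecture?
  • How can a platform adaptation layer be designed to be extensible, so that onboarding a new platform costs as little as possible?

"A good Agent product should be like water: poured into any container (platform), it naturally takes that container's shape."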

Released under the MIT License