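Chapter 42: Cross-Platform Deployment
"Write once, run anywhere." From Java to agents, cross-platform support has always been a core problem in software engineering, but agents face challenges of their own: model inference depends on compute, real-time communication demands low latency, and the user experience has to stay consistent across devices.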
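42.1 Overview: Why Cross-Platform
42.1.1 Why Agent Products Need to Be Cross-Platform
Agent products differ fundamentally from traditional web or mobile applications: they carry on sustained, multi-turn, context-aware interactions with the user. That interaction model means an agent has to be present wherever the user finds it most convenient, whether that is working in a browser, checking messages on a phone, coding in an IDE, or chatting in WeChat.
Three forces drive cross-platform deployment of agents:
1. Fragmented user scenarios
Modern users reach information in highly fragmented settings. An enterprise user might code in a desktop IDE during working hours, read an agent-generated report on a phone while commuting, and present the agent's analysis on a large screen in a meeting. An agent that lives on a single platform misses most of these opportunities to interact.
2. Diverse interaction forms
Each platform naturally suits certain forms of interaction:
- Web: complex data displays, long-document editing, multi-panel workbenches
- Mobile: voice interaction, photo recognition, push notifications, quick lookups
- Desktop: deep integration with local tools, file system access, running in the background
- CLI: developer workflows, CI/CD integration, script automation
- IDE: coding assistance, code review, debugging assistance
- WeChat ecosystem: Chinese enterprise users, low-friction reach, social sharing
3. Maximum commercial coverage
From a business perspective, every additional platform is one more channel for reaching users and one more way to monetize. In the ToB market in particular, customers often require private deployment plus multi-device support, so cross-platform capability directly determines how competitive the product is.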
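42.1.2 Challenges Specific to Cross-Platform Agents
Unlike ordinary applications, cross-platform agents face the following specific challenges:
| Challenge | Specific problem | Implication |
|---|---|---|
| Model inference | Edge devices have limited compute; large models generally cannot run on mobile devices | Requires a cloud-device collaborative architecture |
| Real-time communication | Streaming agent responses need low-latency, reliable bidirectional communication | WebSocket/SSE must be implemented on every platform |
| Context sync | Session state must carry over seamlessly between a user's devices | Requires a unified session-management back end |
| Tool calls | System capabilities differ by platform (for example, file system access permissions) | Requires a platform-aware tool adaptation layer |
| UI consistency | The interaction experience must stay coherent across platforms | The design system must adapt across platforms |
| Security and compliance | Platforms have different security models and compliance requirements | For example, iOS sandbox restrictions vs. full permissions on desktop |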
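The "platform-aware tool adaptation layer" in the table can be pictured as a filter that only offers the agent the tools a given platform can actually run. The following is a minimal sketch under stated assumptions: `ToolSpec`, `PLATFORM_CAPABILITIES`, `tools_for_platform`, and the capability names are all hypothetical, and a real product would derive the capability sets from its own platform matrix.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List


@dataclass(frozen=True)
class ToolSpec:
    """A tool plus the device capabilities it needs (hypothetical model)."""
    name: str
    requires: FrozenSet[str]


# Assumed capability sets per platform; illustrative values only.
PLATFORM_CAPABILITIES: Dict[str, FrozenSet[str]] = {
    "web": frozenset({"network"}),
    "ios": frozenset({"network", "camera"}),
    "desktop": frozenset({"network", "filesystem", "shell"}),
    "cli": frozenset({"network", "filesystem", "shell"}),
}


def tools_for_platform(platform: str, tools: List[ToolSpec]) -> List[str]:
    """Return the names of the tools whose requirements the platform satisfies."""
    available = PLATFORM_CAPABILITIES.get(platform, frozenset())
    # A tool is usable only if every capability it requires is available.
    return [tool.name for tool in tools if tool.requires <= available]


if __name__ == "__main__":
    catalog = [
        ToolSpec("web_search", frozenset({"network"})),
        ToolSpec("read_file", frozenset({"filesystem"})),
        ToolSpec("take_photo", frozenset({"camera"})),
    ]
    for platform in ("web", "ios", "desktop"):
        print(platform, tools_for_platform(platform, catalog))
```

Filtering on the server side keeps the agent from proposing a tool call that the client would have to reject, at the cost of keeping the capability table in sync with each client release.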
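42.1.3 Cross-Platform Architecture Overview
A complete cross-platform agent architecture usually consists of the following layers:
┌──────────────────────────────────────────────────────────────────────┐
│                      Client presentation layer                       │
│  ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐                   │
│  │  Web  │ │Mobile │ │Desktop│ │  CLI  │ │  IDE  │                   │
│  └───────┘ └───────┘ └───────┘ └───────┘ └───────┘                   │
├──────────────────────────────────────────────────────────────────────┤
│  Cross-platform communication layer (REST API + WebSocket + SSE)     │
├──────────────────────────────────────────────────────────────────────┤
│  Business logic layer (Agent Core, language-agnostic)                │
├──────────────────────────────────────────────────────────────────────┤
│  Data persistence layer (unified data model + multi-device sync)     │
├──────────────────────────────────────────────────────────────────────┤
│  Infrastructure layer (model inference, vector store, message queue) │
└──────────────────────────────────────────────────────────────────────┘
Core design principles:
- Unified back end: every platform shares the same Agent Core back-end service, which keeps behavior consistent
- Adapted front ends: each platform builds its front end with the stack that suits it best; no cross-platform UI framework is imposed
- Standardized communication: REST API plus WebSocket/SSE are the standard protocols, so every client can connect the same way
- Unified data model: a single cross-platform data model keeps core data such as sessions and messages consistent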
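To make the "standardized communication" and "unified data model" principles concrete, one option is a single event envelope that serializes to both a WebSocket text message and an SSE frame. This is a sketch, not the chapter's actual wire format: `AgentEvent`, its field names, and the `event:`/`data:` layout chosen here are assumptions for illustration.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class AgentEvent:
    """One envelope shared by all transports (hypothetical shape)."""
    type: str
    session_id: str
    data: Dict[str, Any] = field(default_factory=dict)

    def to_ws_text(self) -> str:
        # WebSocket: send the envelope as a single JSON text message.
        return json.dumps(asdict(self), ensure_ascii=False)

    def to_sse_frame(self) -> str:
        # SSE: the same JSON goes in the data field; a blank line ends the event.
        # json.dumps escapes newlines, so the payload stays on one data line.
        return f"event: {self.type}\ndata: {self.to_ws_text()}\n\n"


if __name__ == "__main__":
    event = AgentEvent("agent.chunk", "session-1", {"content": "Hello"})
    print(event.to_ws_text())
    print(event.to_sse_frame(), end="")
```

Because both transports carry the same JSON, each client needs only one decoder regardless of how the event arrived.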
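42.2 Web Deployment
42.2.1 Technology Choices for the Web
The web is the most basic deployment platform for an agent product, and it is the target of the first release for most of them. The mainstream options are:
Front-end framework comparison:
| Feature | React | Vue 3 | Next.js | Nuxt 3 |
|---|---|---|---|---|
| Ecosystem maturity | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| Agent component libraries | Rich | Moderate | Rich | Moderate |
| SSR support | Requires Next.js | Requires Nuxt | Built in | Built in |
| Learning curve | Moderate | Lower | Moderate to high | Moderate |
| Best for | Complex interactive apps | Rapid prototyping | SEO/SSR | Full-stack apps |
For agent products, React + TypeScript is currently the most common choice, for these reasons:
- Agent products usually have complex interaction state to manage, and React has the richest state-management ecosystem
- TypeScript's type system handles the complex types that appear in agent interactions (messages, tool-call results, and so on)
- A large share of open-source AI/agent components and examples are built on React
42.2.2 Agent Web Client Architecture with React + TypeScript
The following is a typical agent web client architecture:
// src/types/agent.ts - shared type definitions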
export interface AgentMessage {
id: string;
role: 'user' | 'assistant' | 'system' | 'tool';
content: string;
timestamp: number;
toolCalls?: ToolCall[];
toolResults?: ToolResult[];
metadata?: Record<string, unknown>;
}
export interface ToolCall {
id: string;
name: string;
arguments: Record<string, unknown>;
status: 'pending' | 'running' | 'completed' | 'failed';
}
export interface ToolResult {
callId: string;
content: string;
isError?: boolean;
}
export interface Conversation {
id: string;
title: string;
messages: AgentMessage[];
model: string;
createdAt: number;
updatedAt: number;
}
// src/hooks/useAgent.ts - core hook for agent interaction
import { useState, useCallback, useRef } from 'react';
import type { AgentMessage, ToolCall } from '../types/agent';
interface UseAgentOptions {
apiUrl: string;
model?: string;
onToolCall?: (call: ToolCall) => Promise<string>;
}
export function useAgent(options: UseAgentOptions) {
const [messages, setMessages] = useState<AgentMessage[]>([]);
const [isLoading, setIsLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
const abortControllerRef = useRef<AbortController | null>(null);
const sendMessage = useCallback(async (content: string) => {
const userMessage: AgentMessage = {
id: crypto.randomUUID(),
role: 'user',
content,
timestamp: Date.now(),
};
setMessages(prev => [...prev, userMessage]);
setIsLoading(true);
setError(null);
const controller = new AbortController();
abortControllerRef.current = controller;
try {
const response = await fetch(`${options.apiUrl}/chat`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
messages: [...messages, userMessage].map(m => ({
role: m.role, content: m.content,
})),
model: options.model,
stream: true,
}),
signal: controller.signal,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
// Read the response as a stream
if (!response.body) {
  throw new Error('Response has no readable body');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let assistantContent = '';
let buffer = ''; // holds a partial line until the rest of it arrives
let finished = false;
const assistantMessage: AgentMessage = {
  id: crypto.randomUUID(),
  role: 'assistant',
  content: '',
  timestamp: Date.now(),
};
setMessages(prev => [...prev, assistantMessage]);
while (!finished) {
  const { done, value } = await reader.read();
  if (done) break;
  // A network chunk can end in the middle of a line, so parse only complete lines
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop() ?? '';
  for (const line of lines) {
    if (!line.startsWith('data: ')) continue;
    const data = line.slice(6).trim();
    if (data === '[DONE]') {
      finished = true; // leave the outer read loop as well
      break;
    }
    const parsed = JSON.parse(data);
    const delta = parsed.choices?.[0]?.delta?.content;
    if (delta) {
      assistantContent += delta;
      const snapshot = assistantContent;
      setMessages(prev =>
        prev.map(m =>
          m.id === assistantMessage.id ? { ...m, content: snapshot } : m
        )
      );
    }
  }
}
} catch (err: any) {
if (err.name !== 'AbortError') setError(err.message);
} finally {
setIsLoading(false);
abortControllerRef.current = null;
}
}, [messages, options]);
const stopGeneration = useCallback(() => {
abortControllerRef.current?.abort();
}, []);
return { messages, isLoading, error, sendMessage, stopGeneration };
}
// src/components/ChatPanel.tsx - chat panel component
import { useRef, useEffect } from 'react';
import { useAgent } from '../hooks/useAgent';
import type { AgentMessage, ToolCall } from '../types/agent';
interface ChatPanelProps {
apiUrl: string;
model?: string;
}
export function ChatPanel({ apiUrl, model }: ChatPanelProps) {
const { messages, isLoading, error, sendMessage, stopGeneration } = useAgent({
apiUrl, model,
});
const messagesEndRef = useRef<HTMLDivElement>(null);
const inputRef = useRef<HTMLTextAreaElement>(null);
useEffect(() => {
  messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
}, [messages]);
// Send the current input; shared by the Enter key and the Send button.
// A ref targets this component's textarea rather than the first one on the page.
const submit = () => {
  const textarea = inputRef.current;
  const text = textarea?.value.trim();
  if (!textarea || !text) return;
  sendMessage(text);
  textarea.value = '';
};
return (
  <div className="flex flex-col h-full">
    <div className="flex-1 overflow-y-auto p-4 space-y-4">
      {messages.map((msg) => (
        <MessageBubble key={msg.id} message={msg} />
      ))}
      {isLoading && (
        <div className="flex items-center gap-2 text-gray-500">
          <div className="animate-spin w-4 h-4 border-2 border-gray-300 border-t-blue-500 rounded-full" />
          <span>Agent is thinking...</span>
        </div>
      )}
      {error && (
        <div className="text-red-500 bg-red-50 p-3 rounded-lg">{error}</div>
      )}
      <div ref={messagesEndRef} />
    </div>
    <div className="border-t p-4">
      <div className="flex gap-2">
        <textarea
          ref={inputRef}
          className="flex-1 resize-none border rounded-lg p-3 focus:outline-none focus:ring-2 focus:ring-blue-500"
          placeholder="Type a message..."
          rows={1}
          onKeyDown={(e) => {
            // Ignore Enter while an IME composition (e.g. Chinese input) is active
            if (e.key === 'Enter' && !e.shiftKey && !e.nativeEvent.isComposing) {
              e.preventDefault();
              submit();
            }
          }}
        />
        {isLoading ? (
          <button onClick={stopGeneration}
            className="px-4 py-2 bg-red-500 text-white rounded-lg hover:bg-red-600">Stop</button>
        ) : (
          <button onClick={submit}
            className="px-4 py-2 bg-blue-500 text-white rounded-lg hover:bg-blue-600">Send</button>
)}
</div>
</div>
</div>
);
}
// Message bubble component: renders tool calls and the message text
function MessageBubble({ message }: { message: AgentMessage }) {
const isUser = message.role === 'user';
return (
<div className={`flex ${isUser ? 'justify-end' : 'justify-start'}`}>
<div className={`max-w-[80%] rounded-lg p-4 ${isUser ? 'bg-blue-500 text-white' : 'bg-gray-100 text-gray-900'}`}>
{message.toolCalls?.map((call) => (
<div key={call.id} className="mb-2 p-2 bg-white/10 rounded border border-white/20">
<div className="flex items-center gap-2 text-sm">
<span className="font-mono">{call.name}</span>
<ToolCallStatus status={call.status} />
</div>
<pre className="mt-1 text-xs overflow-x-auto">
{JSON.stringify(call.arguments, null, 2)}
</pre>
</div>
))}
<div className="whitespace-pre-wrap">{message.content}</div>
</div>
</div>
);
}
function ToolCallStatus({ status }: { status: ToolCall['status'] }) {
  const config = {
    pending: { icon: '⏳', text: 'Pending' },
    running: { icon: '⚡', text: 'Running' },
    completed: { icon: '✅', text: 'Completed' },
    failed: { icon: '❌', text: 'Failed' },
  };
  const { icon, text } = config[status];
  return <span>{icon} {text}</span>;
}
42.2.3 Real-Time Communication with WebSocket
The core interaction features of an agent product (streaming output, tool-call status updates, real-time collaboration) all depend on efficient real-time communication. WebSocket is the base protocol used to implement them here.
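A WebSocket connection can go silent without ever delivering a close frame, for example when a phone loses signal. Servers therefore commonly record the time of each client's last heartbeat and evict clients that stay quiet for too long. The sketch below shows only that staleness check; `find_stale_clients`, the 30-second interval, and the tolerance of two missed beats are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional

HEARTBEAT_INTERVAL = timedelta(seconds=30)  # assumed client heartbeat period
MISSED_BEATS_ALLOWED = 2                    # assumed tolerance before eviction


def find_stale_clients(
    last_heartbeat: Dict[str, datetime],
    now: Optional[datetime] = None,
) -> List[str]:
    """Return the ids of clients whose last heartbeat is older than the deadline."""
    now = now or datetime.now()
    deadline = HEARTBEAT_INTERVAL * MISSED_BEATS_ALLOWED
    return sorted(
        client_id
        for client_id, seen in last_heartbeat.items()
        if now - seen > deadline
    )


if __name__ == "__main__":
    now = datetime.now()
    beats = {
        "fresh": now - timedelta(seconds=10),
        "stale": now - timedelta(seconds=90),
    }
    print(find_stale_clients(beats, now))
```

A background task could call a check like this periodically and disconnect whatever it returns; how often to run it is a trade-off between detection latency and overhead.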
# backend/websocket_manager.py - WebSocket connection management
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Iterable, Optional, Set


@dataclass
class WSClient:
    """A single WebSocket client connection."""
    client_id: str
    user_id: str
    session_id: str
    websocket: Any  # e.g. a FastAPI/Starlette WebSocket
    connected_at: datetime = field(default_factory=datetime.now)
    last_heartbeat: datetime = field(default_factory=datetime.now)


class WebSocketManager:
    """Tracks connections by client, by session, and by user."""

    def __init__(self):
        self._clients: Dict[str, WSClient] = {}
        self._session_clients: Dict[str, Set[str]] = {}
        self._user_clients: Dict[str, Set[str]] = {}
        self._heartbeat_interval = 30  # seconds

    async def connect(self, websocket, user_id: str, session_id: str) -> str:
        client_id = str(uuid.uuid4())
        self._clients[client_id] = WSClient(
            client_id=client_id, user_id=user_id,
            session_id=session_id, websocket=websocket)
        self._session_clients.setdefault(session_id, set()).add(client_id)
        self._user_clients.setdefault(user_id, set()).add(client_id)
        return client_id

    async def disconnect(self, client_id: str):
        client = self._clients.pop(client_id, None)
        if not client:
            return
        if client.session_id in self._session_clients:
            self._session_clients[client.session_id].discard(client_id)
            if not self._session_clients[client.session_id]:
                del self._session_clients[client.session_id]
        if client.user_id in self._user_clients:
            self._user_clients[client.user_id].discard(client_id)
            if not self._user_clients[client.user_id]:
                del self._user_clients[client.user_id]

    async def _send_to_clients(self, client_ids: Iterable[str], message: dict):
        """Send to each client and drop the ones whose send fails."""
        disconnected = []
        # Iterate over a snapshot: other tasks may connect or disconnect
        # while this coroutine is suspended inside send_json.
        for client_id in list(client_ids):
            client = self._clients.get(client_id)
            if client is None:
                continue
            try:
                await client.websocket.send_json(message)
            except Exception:
                disconnected.append(client_id)
        for cid in disconnected:
            await self.disconnect(cid)

    async def send_to_session(self, session_id: str, message: dict):
        await self._send_to_clients(
            self._session_clients.get(session_id, ()), message)

    async def send_to_user(self, user_id: str, message: dict):
        await self._send_to_clients(
            self._user_clients.get(user_id, ()), message)

    async def broadcast(self, message: dict,
                        exclude_client: Optional[str] = None):
        targets = [cid for cid in self._clients if cid != exclude_client]
        await self._send_to_clients(targets, message)
# Message type definitions
class MessageType:
    AGENT_CHUNK = "agent.chunk"
    AGENT_COMPLETE = "agent.complete"
    AGENT_ERROR = "agent.error"
    TOOL_CALL_START = "tool.call.start"
    TOOL_CALL_RESULT = "tool.call.result"
    TOOL_CALL_ERROR = "tool.call.error"
    SESSION_CREATED = "session.created"
    SESSION_UPDATED = "session.updated"
    TYPING_START = "status.typing.start"
    TYPING_STOP = "status.typing.stop"
    HEARTBEAT = "system.heartbeat"
    HEARTBEAT_ACK = "system.heartbeat.ack"
    # Referenced by the chat route; the string value here is an assumption
    STOP_GENERATION = "chat.stop"

# backend/routes/chat_ws.py - WebSocket chat route
import asyncio
import json
from datetime import datetime
from typing import Optional

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

# Assumed to be provided elsewhere in the backend: ws_manager (a WebSocketManager),
# MessageType (including STOP_GENERATION), verify_ws_token, agent_engine,
# and cancel_stream.

router = APIRouter()
_background_tasks = set()  # keeps references so running tasks are not garbage-collected


@router.websocket("/ws/chat/{session_id}")
async def chat_websocket(websocket: WebSocket, session_id: str,
                         token: Optional[str] = None):
    # Accept first: a close sent before accept is reported to the client as a
    # rejected handshake, so the custom close code would not be delivered.
    await websocket.accept()
    user = await verify_ws_token(token)
    if not user:
        await websocket.close(code=4001, reason="Unauthorized")
        return
    client_id = await ws_manager.connect(websocket, user.id, session_id)
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            msg_type = message.get("type")
            if msg_type == MessageType.HEARTBEAT:
                await websocket.send_json({
                    "type": MessageType.HEARTBEAT_ACK,
                    "timestamp": datetime.now().isoformat(),
                })
                ws_manager._clients[client_id].last_heartbeat = datetime.now()
            elif msg_type == "chat.message":
                content = message.get("content", "")
                model = message.get("model", "default")
                # Run the stream in the background so this loop keeps
                # receiving heartbeats and stop requests.
                task = asyncio.create_task(
                    process_agent_stream(session_id, user.id, content,
                                         model, ws_manager)
                )
                _background_tasks.add(task)
                task.add_done_callback(_background_tasks.discard)
            elif msg_type == MessageType.STOP_GENERATION:
                await cancel_stream(session_id)
    except WebSocketDisconnect:
        pass
    finally:
        await ws_manager.disconnect(client_id)


async def process_agent_stream(session_id, user_id, content, model, ws_manager):
    try:
        await ws_manager.send_to_session(session_id, {
            "type": MessageType.TYPING_START,
            "data": {"session_id": session_id},
        })
        async for event in agent_engine.stream_chat(
            session_id=session_id, user_id=user_id,
            content=content, model=model,
        ):
            event_type = event.get("type")
            if event_type == "text_delta":
                await ws_manager.send_to_session(session_id, {
                    "type": MessageType.AGENT_CHUNK,
                    "data": {"content": event["content"],
                             "message_id": event["message_id"]},
                })
            elif event_type == "tool_call":
                await ws_manager.send_to_session(session_id, {
                    "type": MessageType.TOOL_CALL_START,
                    "data": {"call_id": event["call_id"], "name": event["name"],
                             "arguments": event["arguments"]},
                })
            elif event_type == "tool_result":
                await ws_manager.send_to_session(session_id, {
                    "type": MessageType.TOOL_CALL_RESULT,
                    "data": {"call_id": event["call_id"],
                             "content": event["content"]},
                })
        await ws_manager.send_to_session(session_id, {
            "type": MessageType.AGENT_COMPLETE,
            "data": {"session_id": session_id},
        })
    except Exception as e:
        await ws_manager.send_to_session(session_id, {
            "type": MessageType.AGENT_ERROR,
            "data": {"message": str(e)},
        })
    finally:
        # Clear the typing indicator on success and on failure alike
        await ws_manager.send_to_session(session_id, {
            "type": MessageType.TYPING_STOP,
            "data": {"session_id": session_id},
        })
42.2.4 SSE (Server-Sent Events) as an Alternative
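In some scenarios SSE is a simpler, one-way push alternative to WebSocket:
| Feature | SSE | WebSocket |
|---|---|---|
| Direction | One-way (server to client) | Bidirectional |
| Protocol | HTTP | WS/WSS |
| Automatic reconnection | Built into the browser's EventSource | Must be implemented manually |
| Complexity | Simple | Higher |
| Typical use | Streaming output, notification push | Real-time chat, collaborative editing |
| Proxy compatibility | Good (plain HTTP) | May be blocked |
For agent products, a combination is recommended: SSE for streaming responses (one-way push) and WebSocket for real-time collaboration (bidirectional communication).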
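Whichever client consumes an SSE stream, the bytes arrive in arbitrary chunks, so an event can be split across two reads and has to be buffered until its terminating blank line shows up. A minimal Python sketch of such an incremental parser follows. `SSEParser` is a hypothetical name; the sketch assumes LF line endings and ignores the `event`, `id`, and `retry` fields.

```python
from typing import List


class SSEParser:
    """Incrementally splits a text stream into SSE `data:` payloads."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: str) -> List[str]:
        """Add a chunk and return the payloads of all events completed so far."""
        self._buffer += chunk
        # Events end with a blank line; the last piece may be incomplete,
        # so it stays in the buffer until more data arrives.
        *complete, self._buffer = self._buffer.split("\n\n")
        payloads = []
        for event in complete:
            data_lines = []
            for line in event.split("\n"):
                if not line.startswith("data:"):
                    continue
                value = line[5:]
                # A single leading space after the colon is not part of the value.
                if value.startswith(" "):
                    value = value[1:]
                data_lines.append(value)
            if data_lines:
                payloads.append("\n".join(data_lines))
        return payloads


if __name__ == "__main__":
    parser = SSEParser()
    for chunk in ['data: {"content": "Hel', 'lo"}\n\ndata: [DO', 'NE]\n\n']:
        print(parser.feed(chunk))
```

The same buffering idea applies to any client-side reader that decodes the stream chunk by chunk.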
// SSE client implementation (the type import path assumes the layout used above)
import { useState } from 'react';
import type { AgentMessage } from '../types/agent';

export function useSSEAgent(apiUrl: string) {
  const [messages, setMessages] = useState<AgentMessage[]>([]);
  const [isLoading, setIsLoading] = useState(false);
  const sendMessage = async (content: string) => {
    const userMsg: AgentMessage = {
      id: crypto.randomUUID(), role: 'user',
      content, timestamp: Date.now(),
    };
    const assistantMsg: AgentMessage = {
      id: crypto.randomUUID(), role: 'assistant',
      content: '', timestamp: Date.now(),
    };
    setMessages(prev => [...prev, userMsg, assistantMsg]);
    setIsLoading(true);
    try {
      const response = await fetch(`${apiUrl}/chat/stream`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ messages: [...messages, userMsg] }),
      });
      if (!response.ok || !response.body) {
        throw new Error(`HTTP ${response.status}`);
      }
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let accumulated = '';
      let buffer = ''; // holds an incomplete event until its blank line arrives
      let finished = false;
      while (!finished) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const events = buffer.split('\n\n');
        buffer = events.pop() ?? '';
        for (const event of events) {
          const dataLine = event.split('\n').find(l => l.startsWith('data: '));
          if (!dataLine) continue;
          const jsonStr = dataLine.slice(6);
          if (jsonStr === '[DONE]') {
            finished = true; // leave the outer read loop as well
            break;
          }
          const parsed = JSON.parse(jsonStr);
          if (parsed.content) {
            accumulated += parsed.content;
            const snapshot = accumulated;
            setMessages(prev =>
              prev.map(m => m.id === assistantMsg.id ? { ...m, content: snapshot } : m)
            );
          }
        }
      }
    } finally {
      setIsLoading(false);
    }
  };
  return { messages, isLoading, sendMessage };
}
42.3 Mobile Deployment
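42.3.1 Technology Choices for Mobile Agents
The core challenge of deploying an agent on mobile is limited compute: mobile devices generally cannot run large-model inference locally, so the architecture has to pair cloud inference with a lightweight client.
Comparison of approaches:
| Approach | Strengths | Weaknesses | Best for |
|---|---|---|---|
| React Native | Shares business logic with the web client | Slightly less native feel | Teams already on a React stack |
| Flutter | Good performance, highly consistent UI | Smaller Dart ecosystem | New projects where UI polish matters most |
| Native iOS + Android | Best performance and experience | Two codebases, roughly double the development cost | High-end products, deep native integration |
| Capacitor/Ionic | Quick conversion from a web app | Performance ceiling | Simple, display-oriented agent apps |
| Mini programs | No installation, low barrier to entry | Limited capabilities | The Chinese market, lightweight tools |
42.3.2 A React Native Agent Client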
// mobile/src/services/AgentService.ts - agent service layer
import { Platform } from 'react-native';
interface AgentConfig {
baseUrl: string;
model: string;
apiKey: string;
}
class AgentService {
private config: AgentConfig;
private abortController: AbortController | null = null;
constructor(config: AgentConfig) { this.config = config; }
async *sendMessageStream(
messages: Array<{ role: string; content: string }>,
): AsyncGenerator<{
type: 'text_delta' | 'tool_call' | 'tool_result' | 'done' | 'error';
content?: string; data?: any;
}> {
const controller = new AbortController();
this.abortController = controller;
try {
const response = await fetch(`${this.config.baseUrl}/chat/stream`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.config.apiKey}`,
},
body: JSON.stringify({
messages, model: this.config.model,
platform: Platform.OS,
}),
signal: controller.signal,
});
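if (!response.ok) throw new Error(`HTTP ${response.status}`);
// Assumption: the runtime's fetch exposes a streaming body. React Native's
// built-in fetch may not, in which case a streaming-capable fetch
// implementation or polyfill is needed.
if (!response.body) throw new Error('Streaming response body is not available');
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';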
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6).trim();
if (data === '[DONE]') { yield { type: 'done' }; return; }
try {
const parsed = JSON.parse(data);
yield parsed;
} catch { /* skip */ }
}
}
} catch (err: any) {
if (err.name !== 'AbortError') yield { type: 'error', content: err.message };
}
}
stopGeneration() { this.abortController?.abort(); }
// Speech-to-text (a mobile-specific capability)
async transcribeAudio(audioUri: string): Promise<string> {
const formData = new FormData();
formData.append('audio', {
uri: audioUri, type: 'audio/m4a', name: 'recording.m4a',
} as any);
const response = await fetch(`${this.config.baseUrl}/audio/transcribe`, {
method: 'POST',
headers: { 'Authorization': `Bearer ${this.config.apiKey}` },
body: formData,
});
const result = await response.json();
return result.text;
}
// Image analysis (called after the user takes a photo)
async analyzeImage(imageUri: string, prompt: string): Promise<string> {
const formData = new FormData();
formData.append('image', {
uri: imageUri, type: 'image/jpeg', name: 'photo.jpg',
} as any);
formData.append('prompt', prompt);
const response = await fetch(`${this.config.baseUrl}/vision/analyze`, {
method: 'POST',
headers: { 'Authorization': `Bearer ${this.config.apiKey}` },
body: formData,
});
const result = await response.json();
return result.content;
}
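}
42.3.3 Agent Capabilities Specific to Mobile
Compared with the web, mobile offers some distinctive capabilities:
1. Voice interaction
- Voice input: record, convert speech to text, send to the agent
- Voice output: take the agent's text, convert text to speech, play it
- Voice wake-up: a phrase such as "Hey, assistant" wakes the agent
2. Camera
- Photo recognition: OCR, object recognition, document scanning
- Live viewfinder: agent assistance in AR scenarios
3. Location services
- Location-based agent suggestions
- Automation triggered by geofences
4. Push notifications
- Notifications when an agent task completes
- Scheduled reminders and alerts for critical information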
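// mobile/src/services/CapabilityBridge.ts - capability bridge layer
import { Linking, Platform } from 'react-native';
// Assumption: local notifications come from the expo-notifications package;
// react-native itself does not export a Notifications module.
import * as Notifications from 'expo-notifications';
/**
 * Capability bridge: wraps platform-specific device capabilities behind one interface.
 * Agent tool calls go through this layer to reach device capabilities.
 */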
class CapabilityBridge {
private handlers: Map<string, Function> = new Map();
registerCapability(name: string, handler: Function) {
this.handlers.set(name, handler);
}
async callCapability(name: string, params: any): Promise<any> {
const handler = this.handlers.get(name);
if (!handler) throw new Error(`Capability not supported: ${name}`);
const platformCheck = await this.checkPlatformSupport(name);
if (!platformCheck.supported) {
return { success: false, error: `Not supported on ${Platform.OS}` };
}
return handler(params);
}
private async checkPlatformSupport(name: string) {
const capabilities: Record<string, Record<string, boolean>> = {
'camera.capture': { ios: true, android: true, web: true },
'location.current': { ios: true, android: true, web: true },
'contacts.read': { ios: true, android: true, web: false },
'sms.send': { ios: false, android: true, web: false },
'notification.local': { ios: true, android: true, web: true },
'calendar.read': { ios: true, android: true, web: false },
};
const cap = capabilities[name];
if (!cap) return { supported: true };
return { supported: cap[Platform.OS] ?? false };
}
init() {
this.registerCapability('phone.call', async (params: any) => {
const { number } = params;
const supported = await Linking.canOpenURL(`tel:${number}`);
if (supported) { await Linking.openURL(`tel:${number}`); return { success: true }; }
return { success: false, error: 'Cannot make phone calls' };
});
this.registerCapability('notification.local', async (params: any) => {
await Notifications.scheduleNotificationAsync({
content: { title: params.title, body: params.body, data: params.data },
trigger: params.trigger || null,
});
return { success: true };
});
}
}
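export default new CapabilityBridge();
42.3.4 Offline and Weak-Network Strategy
Mobile network conditions are unstable, so a dedicated offline and weak-network strategy is needed: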
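One common building block of a weak-network strategy is spacing out retries so that a flaky connection is not hammered with immediate re-sends. The sketch below uses exponential backoff with full jitter; it is an illustration rather than part of the service that follows, and `backoff_delay`, the 1-second base, and the 60-second cap are assumptions.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based), with full jitter."""
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    # The upper bound doubles with each attempt but never exceeds the cap.
    upper = min(cap, base * (2 ** attempt))
    # Full jitter: pick uniformly in [0, upper] so clients do not retry in lockstep.
    source = rng if rng is not None else random
    return source.uniform(0.0, upper)


if __name__ == "__main__":
    for attempt in range(5):
        print(f"attempt {attempt}: wait {backoff_delay(attempt):.2f}s")
```

A retry loop would sleep for the returned delay before re-sending a cached message and give up after a fixed number of attempts.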
# backend/services/offline_sync.py - offline sync strategy
import uuid
from dataclasses import dataclass
from datetime import datetime


@dataclass
class PendingMessage:
    id: str
    user_id: str
    content: str
    timestamp: datetime
    retry_count: int = 0
    max_retries: int = 3


class OfflineSyncService:
    """Offline message sync service."""

    def __init__(self, storage_backend):
        # storage_backend is expected to provide the async methods used below:
        # save_pending, get_pending, remove_pending, update_pending,
        # mark_failed, and count_pending.
        self.storage = storage_backend

    async def cache_message(self, user_id: str, content: str) -> str:
        """Cache a message while offline."""
        msg = PendingMessage(
            id=str(uuid.uuid4()),
            user_id=user_id,
            content=content,
            timestamp=datetime.now(),
        )
        await self.storage.save_pending(msg)
        return msg.id

    async def sync_pending(self, user_id: str, process_fn) -> dict:
        """Replay cached messages once the network is back."""
        pending = await self.storage.get_pending(user_id)
        sent, failed = 0, 0
        for msg in pending:
            try:
                await process_fn(msg.content)
                await self.storage.remove_pending(msg.id)
                sent += 1
            except Exception:
                msg.retry_count += 1
                if msg.retry_count >= msg.max_retries:
                    # Out of retries: mark as permanently failed
                    await self.storage.mark_failed(msg)
                    failed += 1
                else:
                    # Keep it queued for the next sync attempt
                    await self.storage.update_pending(msg)
        return {"sent": sent, "failed": failed,
                "remaining": len(pending) - sent - failed}

    async def get_pending_count(self, user_id: str) -> int:
        return await self.storage.count_pending(user_id)
42.4 Desktop Deployment
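42.4.1 Electron vs Tauri
Desktop agent applications suit scenarios that need deep system integration, such as file management, IDE integration, and resident background services.
| Feature | Electron | Tauri |
|---|---|---|
| Bundle size | ~150MB | ~5-10MB |
| Memory footprint | Higher (bundled Chromium) | Low (system WebView) |
| Startup speed | Slower | Fast |
| Tech stack | HTML/CSS/JS | Any front end + Rust back end |
| System integration | Via Node.js | Via Rust |
| Security model | Full Node.js privileges | Least-privilege principle |
| Ecosystem maturity | Very mature | Growing quickly |
42.4.2 A Tauri Agent Client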
// src-tauri/src/commands/agent.rs - Tauri agent commands
use tauri::{command, State};
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
#[derive(Serialize, Deserialize, Clone)]
pub struct AgentMessage {
pub id: String,
pub role: String,
pub content: String,
pub timestamp: i64,
}
// Clone is required because get_config returns a copy of the stored config
#[derive(Serialize, Deserialize, Clone)]
pub struct AppConfig {
pub api_url: String,
pub api_key: String,
pub model: String,
pub theme: String,
}
pub struct AppState {
pub config: Mutex<AppConfig>,
}
#[command]
pub async fn send_chat_message(
    state: State<'_, AppState>,
    messages: Vec<AgentMessage>,
    model: String,
) -> Result<String, String> {
    // Copy what is needed and release the lock before awaiting:
    // a std::sync::MutexGuard must not be held across an .await point.
    let (api_url, api_key) = {
        let config = state.config.lock().map_err(|e| e.to_string())?;
        (config.api_url.clone(), config.api_key.clone())
    };
    let client = reqwest::Client::new();
    let response = client
        .post(format!("{}/chat", api_url))
        .header("Authorization", format!("Bearer {}", api_key))
        .json(&serde_json::json!({
            "messages": messages,
            "model": model,
            "stream": false,
        }))
        .send()
        .await
        .map_err(|e| format!("Request failed: {}", e))?;
    if !response.status().is_success() {
        return Err(format!("API error: {}", response.status()));
    }
    response.text().await.map_err(|e| e.to_string())
}
#[command]
pub async fn read_local_file(path: String) -> Result<String, String> {
let expanded = shellexpand::tilde(&path).to_string();
tokio::fs::read_to_string(&expanded)
.await
.map_err(|e| format!("Failed to read file: {}", e))
}
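// Note: the command string is passed to `sh -c` unchanged, so this works on
// Unix-like systems only and should receive trusted or validated input.
#[command]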
pub async fn execute_terminal_command(
command: String,
cwd: Option<String>,
) -> Result<String, String> {
let output = if let Some(dir) = cwd {
tokio::process::Command::new("sh")
.arg("-c").arg(&command)
.current_dir(shellexpand::tilde(&dir).as_ref())
.output().await
} else {
tokio::process::Command::new("sh")
.arg("-c").arg(&command)
.output().await
};
match output {
Ok(output) => Ok(serde_json::json!({
"stdout": String::from_utf8_lossy(&output.stdout),
"stderr": String::from_utf8_lossy(&output.stderr),
"exit_code": output.status.code(),
}).to_string()),
Err(e) => Err(format!("Command failed: {}", e)),
}
}
#[command]
pub async fn get_config(state: State<'_, AppState>) -> Result<AppConfig, String> {
state.config.lock().map_err(|e| e.to_string()).map(|c| c.clone())
}
#[command]
pub async fn update_config(
state: State<'_, AppState>,
new_config: AppConfig,
) -> Result<(), String> {
let mut config = state.config.lock().map_err(|e| e.to_string())?;
*config = new_config;
Ok(())
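}
42.4.3 Agent Capabilities Specific to Desktop
Compared with mobile and the web, the desktop has these distinctive advantages:
- Full file system access: reading and writing arbitrary files, browsing and managing directories, watching files for changes
- Terminal command execution: running shell commands, capturing their output, managing child processes
- System-level integration: a resident system tray icon, global shortcuts, clipboard access, window management
- Local storage: large local databases, local file caches, local vector indexes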
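Full file system access is also the capability that most needs a guard rail when an agent, rather than the user, picks the path. One simple guard is an allow-list of root directories checked after the path has been fully resolved. The sketch below assumes a hypothetical `is_path_allowed` helper and a caller-supplied list of roots; it is one possible check, not the only one a real deployment would need.

```python
from pathlib import Path
from typing import Iterable


def is_path_allowed(requested: str, allowed_roots: Iterable[str]) -> bool:
    """True if `requested` resolves to a location inside one of `allowed_roots`."""
    # Resolve "~", "..", and symlinks first so the check sees the real target.
    target = Path(requested).expanduser().resolve()
    for root in allowed_roots:
        root_path = Path(root).expanduser().resolve()
        if target == root_path or root_path in target.parents:
            return True
    return False


if __name__ == "__main__":
    roots = ["~/projects"]
    print(is_path_allowed("~/projects/demo/README.md", roots))
    print(is_path_allowed("~/projects/../.ssh/id_rsa", roots))
```

Resolving before comparing is the important step: a plain string-prefix check would accept `~/projects/../.ssh/id_rsa`.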
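42.5 CLI Tools
42.5.1 The Value of an Agent CLI
The CLI is the interface developers know best. A well-designed agent CLI can:
- Fit into developer workflows: compose with other tools through pipes
- Integrate with automation scripts: use the agent in CI/CD
- Validate prototypes quickly: test agent behavior without starting a GUI
- Keep resource overhead low: terminal applications use very few resources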
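Composing with other tools through pipes usually means the CLI reads standard input when it is not attached to a terminal and combines it with any question given on the command line. A minimal sketch of that logic follows; `resolve_prompt` is a hypothetical helper, and the `agent chat` command named in the usage note is likewise only an illustration.

```python
import io
import sys
from typing import Optional, Sequence, TextIO


def resolve_prompt(args: Sequence[str], stdin: Optional[TextIO] = None) -> str:
    """Combine command-line words with piped input, if there is any."""
    stream = stdin if stdin is not None else sys.stdin
    question = " ".join(args).strip()
    # isatty() is False when input is piped or redirected; only then read it,
    # otherwise the read would block waiting for the user to type.
    piped = "" if stream.isatty() else stream.read().strip()
    if question and piped:
        return f"{question}\n\n{piped}"
    return question or piped


if __name__ == "__main__":
    # Simulate `some-command | cli "summarize"` with an in-memory stream.
    fake_pipe = io.StringIO("line one\nline two\n")
    print(resolve_prompt(["summarize"], fake_pipe))
```

With this in place, an invocation along the lines of `git diff | agent chat "review this change"` would send the question followed by the diff.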
42.5.2 Building the Agent CLI with Rich + Click
# cli/main.py - Agent CLI entry point
# Note: load_config() is used below and is assumed to be defined elsewhere
# in the CLI package, returning an object with api_url and model attributes.
import click
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown
from rich.syntax import Syntax
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.live import Live
from rich.prompt import Prompt

console = Console()


@click.group()
@click.option('--api-url', default=None, help='Agent API URL')
@click.option('--model', default=None, help='Default model')
@click.pass_context
def cli(ctx, api_url, model):
    """🤖 Agent CLI - Your AI assistant in the terminal."""
    ctx.ensure_object(dict)
    ctx.obj['api_url'] = api_url
    ctx.obj['model'] = model
@cli.command()
@click.argument('message', nargs=-1, required=False)
@click.option('--session', '-s', default=None, help='Session ID')
@click.option('--stream/--no-stream', default=True, help='Stream response')
@click.pass_context
def chat(ctx, message, session, stream):
    """Chat with the Agent."""
    from agent_client import AgentClient
    api_url = ctx.obj['api_url'] or load_config().api_url
    model = ctx.obj['model'] or load_config().model
    client = AgentClient(api_url, model)
    if message:
        # One-shot mode: the message was given on the command line
        msg = ' '.join(message)
        console.print(f"\n[bold cyan]You:[/] {msg}\n")
        with console.status("[bold green]Agent thinking..."):
            response = client.send_message(msg, session_id=session)
        console.print(Panel(Markdown(response), title="[bold magenta]Agent[/]",
                            border_style="magenta"))
    else:
        _interactive_chat(client, session)
def _interactive_chat(client, session_id):
    """Interactive chat loop."""
    console.print(Panel(
        "[bold]Interactive Chat Mode[/]\n"
        "Type your message and press Enter.\n"
        "[dim]/quit - Exit | /clear - Clear | /model - Switch | /export - Save[/]",
        title="🤖 Agent CLI", border_style="blue",
    ))
    history = []
    current_model = client.model
    while True:
        try:
            # "[/]" closes the most recent style; a mismatched closing tag
            # such as "[/bold]" after "[bold cyan]" is a markup error.
            user_input = Prompt.ask("\n[bold cyan]You[/]")
        except (EOFError, KeyboardInterrupt):
            console.print("\n[dim]Goodbye! 👋[/]")
            break
        if user_input.startswith('/'):
            cmd = user_input.strip().lower()
            if cmd in ('/quit', '/exit'):
                console.print("[dim]Goodbye! 👋[/]")
                break
            elif cmd == '/clear':
                history.clear()
                console.print("[yellow]History cleared.[/]")
                continue
            elif cmd == '/model':
                current_model = Prompt.ask("Model name", default=current_model)
                console.print(f"[green]Switched to: {current_model}[/]")
                continue
            elif cmd == '/export':
                _export_chat(history)
                continue
            else:
                console.print(f"[red]Unknown: {cmd}[/]")
                continue
        # Stream the reply. The client appends user_input to the messages it
        # sends, so pass a copy of the earlier turns only; otherwise the user
        # message would be sent twice.
        console.print()
        full_response = ""
        with Live(console=console, refresh_per_second=15) as live:
            for chunk in client.stream_message(user_input, history=list(history),
                                               model=current_model):
                if chunk.get("type") == "text_delta":
                    full_response += chunk["content"]
                    live.update(Panel(Markdown(full_response),
                                      title="[bold magenta]Agent[/]",
                                      border_style="magenta"))
                elif chunk.get("type") == "tool_call":
                    live.update(Panel(
                        f"[yellow]⚡ Calling: {chunk['name']}[/]\n"
                        f"[dim]{chunk.get('arguments', '')}[/]",
                        title="[bold magenta]Agent[/]", border_style="yellow"))
        history.append({"role": "user", "content": user_input})
        history.append({"role": "assistant", "content": full_response})
@cli.command()
@click.argument('file_path', type=click.Path(exists=True))
@click.option('--question', '-q', help='Question about the file')
@click.pass_context
def analyze(ctx, file_path, question):
    """Analyze a file with the Agent."""
    import os
    from agent_client import AgentClient
    api_url = ctx.obj['api_url'] or load_config().api_url
    model = ctx.obj['model'] or load_config().model
    client = AgentClient(api_url, model)
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    console.print(Panel(
        f"[bold]File:[/] {file_path}\n[bold]Size:[/] {os.path.getsize(file_path):,} bytes",
        title="📄 File Analysis", border_style="blue",
    ))
    # Preview the first part of the file with syntax highlighting
    ext = os.path.splitext(file_path)[1].lstrip('.')
    syntax = Syntax(content[:2000], ext, theme="monokai", line_numbers=True)
    console.print(syntax)
    # Always include the file content, whether or not a question was given
    instruction = question or "Analyze this file and point out key problems and suggested improvements:"
    prompt = f"{instruction}\n```\n{content[:5000]}\n```"
    with console.status("[bold green]Analyzing..."):
        response = client.send_message(prompt)
    console.print(Panel(Markdown(response), title="[bold magenta]Analysis[/]",
                        border_style="magenta"))
@cli.command()
@click.argument('command_text')
def exec_agent(command_text):
    """Execute an agent command with tool access."""
    from agent_client import AgentClient
    client = AgentClient(load_config().api_url, load_config().model)
    console.print(f"[bold]Executing:[/] {command_text}\n")
    with Progress(SpinnerColumn(), TextColumn("[progress.description]{task.description}"),
                  console=console) as progress:
        task = progress.add_task("Agent executing...", total=None)
        for event in client.execute_with_tools(command_text):
            if event["type"] == "tool_call":
                progress.update(task, description=f"⚡ Tool: {event['name']}")
            elif event["type"] == "tool_result":
                console.print(Panel(event["content"][:500],
                                    title=f"[green]✅ {event.get('name', 'tool')}[/]",
                                    border_style="green"))
            elif event["type"] == "text":
                console.print(Markdown(event["content"]))
    console.print("[bold green]✓ Done[/]")
def _export_chat(history):
    import json
    from datetime import datetime
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    # ensure_ascii=False writes non-ASCII text as-is, so fix the file encoding
    with open(f"chat_{ts}.json", 'w', encoding='utf-8') as f:
        json.dump(history, f, ensure_ascii=False, indent=2)
    console.print(f"[green]Exported to chat_{ts}.json[/]")


if __name__ == '__main__':
    cli()

# cli/agent_client.py - agent client used by the CLI
import httpx
import json
from typing import Generator, Optional, List, Dict
class AgentClient:
def __init__(self, api_url: str, model: str, api_key: str = ""):
self.api_url = api_url.rstrip('/')
self.model = model
self.api_key = api_key
self.client = httpx.Client(timeout=120.0)
def send_message(self, message: str, session_id: Optional[str] = None,
history: Optional[List[Dict]] = None,
model: Optional[str] = None) -> str:
messages = history or []
messages.append({"role": "user", "content": message})
response = self.client.post(
f"{self.api_url}/v1/chat/completions",
json={"messages": messages, "model": model or self.model, "stream": False},
headers=self._headers(),
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]

    def stream_message(self, message: str, history=None, model=None) -> Generator:
        # Copy the history so the caller's list is not mutated.
        messages = list(history or [])
        messages.append({"role": "user", "content": message})
        with self.client.stream(
            "POST",
            f"{self.api_url}/v1/chat/completions",
            json={"messages": messages, "model": model or self.model, "stream": True},
            headers=self._headers(),
        ) as response:
            response.raise_for_status()
            # Parse the SSE stream line by line; payload lines start with "data: ".
            for line in response.iter_lines():
                if line.startswith("data: "):
                    data = line[6:]
                    if data == "[DONE]":
                        break
                    try:
                        parsed = json.loads(data)
                        delta = parsed["choices"][0].get("delta", {})
                        if delta.get("content"):
                            yield {"type": "text_delta", "content": delta["content"]}
                    except json.JSONDecodeError:
                        continue

    def execute_with_tools(self, command: str) -> Generator:
        messages = [
            {"role": "system",
             "content": "You are a terminal Agent that can run tools to complete tasks."},
            {"role": "user", "content": command},
        ]
        response = self.client.post(
            f"{self.api_url}/v1/chat/completions",
            json={"messages": messages, "model": self.model,
                  "tools": self._default_tools(), "tool_choice": "auto"},
            headers=self._headers(),
        )
        response.raise_for_status()
        # In the OpenAI-compatible response format, tool calls live on the
        # message object, not on the choice itself.
        message = response.json()["choices"][0]["message"]
        for tc in message.get("tool_calls") or []:
            fn = tc["function"]
            args = json.loads(fn["arguments"])  # parse once, reuse below
            yield {"type": "tool_call", "name": fn["name"], "arguments": args}
            result = self._execute_tool(fn["name"], args)
            yield {"type": "tool_result", "name": fn["name"], "content": result}
        # `content` is usually null when the model only returns tool calls.
        if message.get("content"):
            yield {"type": "text", "content": message["content"]}

    def _default_tools(self) -> list:
        return [
            {"type": "function", "function": {
                "name": "read_file", "description": "Read file contents",
                "parameters": {"type": "object",
                               "properties": {"path": {"type": "string"}},
                               "required": ["path"]}}},
            {"type": "function", "function": {
                "name": "write_file", "description": "Write content to file",
                "parameters": {"type": "object",
                               "properties": {"path": {"type": "string"},
                                              "content": {"type": "string"}},
                               "required": ["path", "content"]}}},
            {"type": "function", "function": {
                "name": "run_command", "description": "Execute shell command",
                "parameters": {"type": "object",
                               "properties": {"command": {"type": "string"}},
                               "required": ["command"]}}},
        ]

    def _execute_tool(self, name: str, args: dict) -> str:
        import subprocess
        # NOTE: these tools run model-chosen arguments with the user's full
        # privileges. Confirm with the user or sandbox them before real use.
        if name == "read_file":
            with open(args["path"], "r", encoding="utf-8") as f:
                return f.read()
        elif name == "write_file":
            with open(args["path"], "w", encoding="utf-8") as f:
                f.write(args["content"])
            return f"Written to {args['path']}"
        elif name == "run_command":
            result = subprocess.run(args["command"], shell=True,
                                    capture_output=True, text=True)
            return result.stdout or result.stderr
        return f"Unknown tool: {name}"

    def _headers(self) -> dict:
        h = {"Content-Type": "application/json"}
        if self.api_key:
            h["Authorization"] = f"Bearer {self.api_key}"
        return h
42.5.3 CLI Usage Examples
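# Single message
$ agent chat "Explain the MCP protocol"
# Interactive chat
$ agent chat
# Analyze a file
$ agent analyze main.py -q "Find potential memory leaks"
# Piped input
$ cat error.log | agent chat "Analyze these error logs"
# Agent execution (with tool calls)
$ agent exec-agent "Refactor the project and add type annotations"
42.6 IDE Plugins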
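42.6.1 The Value of an IDE Agent Plugin
The IDE is where developers spend most of their working day. Integrating an Agent into the IDE provides:
- Context awareness: the current file, selection, and project structure are passed to the Agent automatically
- Seamless integration: the Agent is available inside the coding environment, with no window switching
- Workflow enhancement: the Agent can assist with code generation, refactoring, debugging, testing, and other coding activities
42.6.2 Developing with the VS Code Extension API
// src/extension.ts - entry point of the VS Code Agent extension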
import * as vscode from 'vscode';
import { AgentService } from './services/agentService';
import { ChatViewProvider } from './views/chatViewProvider';
import { InlineCompletionProvider } from './providers/inlineCompletionProvider';
import { CodeActionProvider } from './providers/codeActionProvider';

export function activate(context: vscode.ExtensionContext) {
  const agentService = new AgentService();

  // Register the sidebar chat view
  const chatProvider = new ChatViewProvider(context.extensionUri, agentService);
  context.subscriptions.push(
    vscode.window.registerWebviewViewProvider('agentChatView', chatProvider)
  );

  // Register the inline completion provider
  const inlineProvider = new InlineCompletionProvider(agentService);
  context.subscriptions.push(
    vscode.languages.registerInlineCompletionItemProvider({ pattern: '**' }, inlineProvider)
  );

  // Register the code action provider
  const codeActionProvider = new CodeActionProvider(agentService);
  context.subscriptions.push(
    vscode.languages.registerCodeActionsProvider(
      { pattern: '**' }, codeActionProvider,
      { providedCodeActionKinds: [vscode.CodeActionKind.Refactor] }
    )
  );

  // Shortcut commands
  context.subscriptions.push(
    vscode.commands.registerTextEditorCommand('agent.explainSelection', async (editor) => {
      const selection = editor.document.getText(editor.selection);
      if (!selection) return;
      const explanation = await agentService.explainCode(selection, editor.document.languageId);
      const doc = await vscode.workspace.openTextDocument({
        content: explanation, language: 'markdown',
      });
      await vscode.window.showTextDocument(doc, { preview: true });
    }),
    vscode.commands.registerTextEditorCommand('agent.refactorSelection', async (editor) => {
      // Capture the range before awaiting; the selection may change meanwhile.
      const range = editor.selection;
      const selection = editor.document.getText(range);
      if (!selection) return;
      const refactored = await agentService.refactorCode(selection, editor.document.languageId);
      // The `edit` builder passed to the callback is only valid synchronously,
      // so after an await a fresh edit must be requested via editor.edit().
      await editor.edit((eb) => eb.replace(range, refactored));
    }),
    vscode.commands.registerTextEditorCommand('agent.addTests', async (editor) => {
      const selection = editor.document.getText(editor.selection);
      if (!selection) return;
      const tests = await agentService.generateTests(selection, editor.document.languageId);
      // Append at the end of the last line (line indexes are zero-based).
      const end = editor.document.lineAt(editor.document.lineCount - 1).range.end;
      await editor.edit((eb) => eb.insert(end, '\n\n' + tests));
    }),
    vscode.commands.registerCommand('agent.openChat', () => {
      vscode.commands.executeCommand('agentChatView.focus');
    }),
    vscode.commands.registerCommand('agent.sendFileToChat', () => {
      const editor = vscode.window.activeTextEditor;
      if (editor) chatProvider.sendFile(editor.document);
    }),
  );
}

export function deactivate() {}
// src/providers/inlineCompletionProvider.ts - inline completion
import * as vscode from 'vscode';
import { AgentService } from '../services/agentService';

export class InlineCompletionProvider implements vscode.InlineCompletionItemProvider {
  constructor(private agentService: AgentService) {}

  async provideInlineCompletionItems(
    document: vscode.TextDocument,
    position: vscode.Position,
    context: vscode.InlineCompletionContext,
    token: vscode.CancellationToken,
  ): Promise<vscode.InlineCompletionItem[]> {
    // Use up to 20 lines before the cursor as context
    const prefix = document.getText(
      new vscode.Range(
        new vscode.Position(Math.max(0, position.line - 20), 0),
        position
      )
    );
    try {
      const completion = await this.agentService.getInlineCompletion(
        prefix, document.languageId, token,
      );
      if (!completion || token.isCancellationRequested) return [];
      return [new vscode.InlineCompletionItem(completion,
        new vscode.Range(position, position))];
    } catch {
      // Completion is best-effort: fail silently rather than interrupt typing.
      return [];
    }
  }
}
42.6.3 JetBrains Plugin Development
JetBrains IDEs (IntelliJ, PyCharm, WebStorm, and others) can likewise be extended with plugins written in Kotlin on the IntelliJ Platform SDK:
// src/main/kotlin/com/agent/jetbrains/AgentAction.kt
package com.agent.jetbrains

import com.intellij.notification.NotificationGroupManager
import com.intellij.notification.NotificationType
import com.intellij.openapi.actionSystem.AnAction
import com.intellij.openapi.actionSystem.AnActionEvent
import com.intellij.openapi.actionSystem.CommonDataKeys
import com.intellij.openapi.progress.ProgressIndicator
import com.intellij.openapi.progress.ProgressManager
import com.intellij.openapi.progress.Task

// AgentService is assumed to be a project-level service defined elsewhere in the plugin.
class ExplainCodeAction : AnAction() {
    override fun actionPerformed(e: AnActionEvent) {
        val editor = e.getData(CommonDataKeys.EDITOR) ?: return
        val project = e.project ?: return
        val selectedText = editor.selectionModel.selectedText ?: return
        // A Document carries no file type; read it from the PSI file on the event.
        val language = e.getData(CommonDataKeys.PSI_FILE)?.fileType?.name ?: "TEXT"
        ProgressManager.getInstance().run(object : Task.Backgroundable(
            project, "Agent Analyzing", true
        ) {
            override fun run(indicator: ProgressIndicator) {
                indicator.text = "Agent analyzing..."
                indicator.fraction = 0.3
                val service = AgentService.getInstance(project)
                val explanation = service.explainCode(selectedText, language)
                indicator.fraction = 1.0
                // The "Agent" notification group must be registered in plugin.xml.
                NotificationGroupManager.getInstance()
                    .getNotificationGroup("Agent")
                    .createNotification("Code Explanation", explanation, NotificationType.INFORMATION)
                    .notify(project)
            }
        })
    }
}
42.7 The WeChat Ecosystem
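42.7.1 What Makes the WeChat Ecosystem Special
The WeChat ecosystem holds a position in the Chinese market that is hard to replace. Connecting an Agent to it offers:
- Zero-barrier reach: users do not need to install a new app
- Social distribution: viral spread through group chats and Moments
- Enterprise scenarios: WeCom (企业微信) is widely adopted as a standard workplace tool
- Payment loop: WeChat Pay closes the commercial loop
42.7.2 Integrating an Agent into a WeChat Mini Program
// miniprogram/pages/chat/chat.js - mini program chat page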
const app = getApp();

Page({
  data: {
    messages: [],
    inputValue: '',
    isLoading: false,
    scrollToBottom: false,
  },

  onLoad() { this.loadHistory(); },

  // Restore the locally cached conversation
  loadHistory() {
    const history = wx.getStorageSync('chat_history') || [];
    this.setData({ messages: history });
  },

  onInputChange(e) { this.setData({ inputValue: e.detail.value }); },

  async sendMessage() {
    const { inputValue, messages } = this.data;
    if (!inputValue.trim() || this.data.isLoading) return;
    const userMsg = {
      id: Date.now().toString(),
      role: 'user',
      content: inputValue.trim(),
      time: new Date().toLocaleTimeString(),
    };
    const newMessages = [...messages, userMsg];
    this.setData({ messages: newMessages, inputValue: '', isLoading: true, scrollToBottom: true });
    try {
      const response = await this.callAgentAPI(newMessages);
      const assistantMsg = {
        id: (Date.now() + 1).toString(),
        role: 'assistant',
        content: response,
        time: new Date().toLocaleTimeString(),
      };
      const allMessages = [...newMessages, assistantMsg];
      this.setData({ messages: allMessages, isLoading: false, scrollToBottom: true });
      // Cache only the 50 most recent messages to stay within storage limits
      wx.setStorageSync('chat_history', allMessages.slice(-50));
    } catch (err) {
      wx.showToast({ title: 'Request failed', icon: 'none' });
      this.setData({ isLoading: false });
    }
  },

  // Wrap wx.request in a Promise so it can be awaited
  callAgentAPI(messages) {
    return new Promise((resolve, reject) => {
      wx.request({
        url: `${app.globalData.apiBaseUrl}/chat`,
        method: 'POST',
        header: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${wx.getStorageSync('token')}`,
        },
        data: {
          messages: messages.map(m => ({ role: m.role, content: m.content })),
          platform: 'miniprogram',
        },
        success: (res) => {
          if (res.statusCode === 200) resolve(res.data.content);
          else reject(new Error(res.data.message));
        },
        fail: reject,
      });
    });
  },

  clearChat() {
    wx.showModal({
      title: 'Clear chat',
      content: 'Clear the chat history?',
      success: (res) => {
        if (res.confirm) {
          this.setData({ messages: [] });
          wx.removeStorageSync('chat_history');
        }
      },
    });
  },
});
<!-- miniprogram/pages/chat/chat.wxml -->
<view class="chat-container">
  <scroll-view class="message-list" scroll-y
    scroll-into-view="{{scrollToBottom ? 'msg-bottom' : ''}}" scroll-with-animation>
    <view wx:for="{{messages}}" wx:key="id" class="message-wrapper {{item.role}}">
      <view class="avatar">
        <image wx:if="{{item.role === 'user'}}" src="/images/user.png" mode="aspectFill" />
        <view wx:else class="agent-avatar">🤖</view>
      </view>
      <view class="message-content">
        <view class="message-bubble {{item.role}}">{{item.content}}</view>
        <text class="message-time">{{item.time}}</text>
      </view>
    </view>
    <view wx:if="{{isLoading}}" class="loading-indicator">
      <view class="typing-dots"><view class="dot"></view><view class="dot"></view><view class="dot"></view></view>
    </view>
    <view id="msg-bottom"></view>
  </scroll-view>
  <view class="input-area">
    <input class="message-input" value="{{inputValue}}" bindinput="onInputChange"
      placeholder="Type a message..." bindconfirm="sendMessage" confirm-type="send" />
    <button class="send-btn" bindtap="sendMessage"
      disabled="{{!inputValue || isLoading}}">Send</button>
  </view>
</view>
42.7.3 WeCom Bots
A WeCom (企业微信) bot is an effective way to bring an Agent into enterprise work groups:
# backend/integrations/wecom_bot.py - WeCom bot integration
import hmac
import hashlib
import base64
import time
import urllib.parse
import httpx
from typing import Optional
from dataclasses import dataclass


@dataclass
class WecomBotConfig:
    webhook_url: str
    secret: Optional[str] = None


class WecomBotClient:
    """WeCom bot client."""

    def __init__(self, config: WecomBotConfig):
        self.config = config
        self.client = httpx.Client(timeout=30.0)

    def _sign(self) -> Optional[str]:
        # Assumption: the webhook accepts `timestamp` and `sign` query
        # parameters. Check the platform documentation before relying on it.
        if not self.config.secret:
            return None
        timestamp = str(int(time.time()))
        string_to_sign = f"{timestamp}\n{self.config.secret}"
        hmac_code = hmac.new(
            self.config.secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
        return f"&timestamp={timestamp}&sign={sign}"

    def send_text(self, content: str, mentioned_list: Optional[list] = None):
        payload = {
            "msgtype": "text",
            "text": {"content": content, "mentioned_list": mentioned_list or []},
        }
        return self._send(payload)

    def send_markdown(self, content: str):
        payload = {"msgtype": "markdown", "markdown": {"content": content}}
        return self._send(payload)

    def send_agent_response(self, question: str, answer: str):
        content = (f"**🤖 Agent response**\n\n> {question}\n\n{answer}\n\n"
                   "---\n*Generated automatically by the Agent*")
        return self.send_markdown(content)

    def _send(self, payload: dict):
        url = self.config.webhook_url
        sign = self._sign()
        if sign:
            # The webhook URL already carries `?key=...`, so append with `&`.
            url += sign
        response = self.client.post(url, json=payload)
        response.raise_for_status()
        return response.json()
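

# FastAPI callback integration
# Assumptions: `agent_engine` is defined elsewhere in the backend, and an
# upstream layer has already verified and decoded the callback into JSON
# with `MsgType` and `Content` fields.
from fastapi import FastAPI, Request
import asyncio

app = FastAPI()
wecom_bot = WecomBotClient(WecomBotConfig(
    webhook_url="https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx",
))


@app.post("/wecom/callback")
async def wecom_callback(request: Request):
    body = await request.json()
    if body.get("MsgType") != "text":
        return {"errcode": 0, "errmsg": "ok"}
    content = body.get("Content", "").strip()
    if not content.startswith("@bot"):
        return {"errcode": 0, "errmsg": "ok"}
    question = content.replace("@bot", "").strip()
    if not question:
        return {"errcode": 0, "errmsg": "ok"}
    # Acknowledge immediately and answer in the background.
    asyncio.create_task(_handle_wecom_question(question))
    return {"errcode": 0, "errmsg": "ok"}


async def _handle_wecom_question(question: str):
    try:
        response = await agent_engine.chat(question)
        # The bot client is synchronous; run it in a worker thread so it
        # does not block the event loop.
        await asyncio.to_thread(wecom_bot.send_agent_response, question, response)
    except Exception as e:
        await asyncio.to_thread(
            wecom_bot.send_text, f"Agent failed to process the request: {e}")
42.8 Cross-Platform Architecture Design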
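42.8.1 Unified Data Model
The key to cross-platform deployment is a single data model, so that every client interprets the core data structures the same way:
// packages/shared/src/models.ts - models shared across platforms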
export interface Message {
  id: string;
  sessionId: string;
  role: 'user' | 'assistant' | 'system' | 'tool';
  content: string;
  createdAt: string; // ISO 8601
  metadata: MessageMetadata;
}

export interface MessageMetadata {
  model?: string;
  tokens?: TokenUsage;
  duration?: number;
  toolCalls?: ToolCallInfo[];
  platform?: string;
  parentId?: string;
}

export interface TokenUsage {
  prompt: number;
  completion: number;
  total: number;
}

export interface ToolCallInfo {
  id: string;
  name: string;
  arguments: Record<string, unknown>;
  result?: string;
  duration?: number;
  isError?: boolean;
}

export interface Session {
  id: string;
  title: string;
  messages: Message[];
  model: string;
  systemPrompt?: string;
  createdAt: string;
  updatedAt: string;
  settings: SessionSettings;
}

export interface SessionSettings {
  temperature: number;
  maxTokens: number;
  topP: number;
  toolsEnabled: boolean;
  contextWindow: 'auto' | number;
}

export interface ChatRequest {
  messages: Array<{ role: string; content: string }>;
  model?: string;
  stream?: boolean;
  sessionId?: string;
  platform?: string;
  tools?: Tool[];
  temperature?: number;
  maxTokens?: number;
}

export interface StreamEvent {
  type: 'text_delta' | 'tool_call' | 'tool_result' | 'done' | 'error';
  id?: string;
  content?: string;
  data?: any;
  timestamp: string;
}

export interface PlatformCapabilities {
  id: string;
  name: string;
  supportsVoiceInput: boolean;
  supportsVoiceOutput: boolean;
  supportsFileAccess: boolean;
  supportsCamera: boolean;
  supportsLocation: boolean;
  supportsNotifications: boolean;
  supportsClipboard: boolean;
  maxUploadSize: number;
  supportedFileTypes: string[];
}
42.8.2 Unified API Gateway
All platforms talk to the backend through a single API gateway:
# backend/gateway/router.py - unified API gateway
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
import json

# Assumed to be defined elsewhere in the backend: `ChatRequest` (a request
# model mirroring the shared interface), `agent_engine`, and `_sync_response`.

app = FastAPI(title="Agent API Gateway", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:3000",
        "https://app.your-agent.com",
        "capacitor://localhost",
        "tauri://localhost",
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.middleware("http")
async def platform_aware_middleware(request: Request, call_next):
    """Identify the platform a request comes from."""
    platform = request.headers.get("X-Platform", "unknown")
    ua = request.headers.get("User-Agent", "")
    # User-Agent sniffing takes precedence over the X-Platform header.
    if "MicroMessenger" in ua:
        platform = "wechat_miniprogram"
    elif "Electron" in ua:
        platform = "electron_desktop"
    elif "Tauri" in ua:
        platform = "tauri_desktop"
    request.state.platform = platform
    response = await call_next(request)
    response.headers["X-Platform"] = platform
    return response


@app.post("/api/v1/chat")
async def chat(request: ChatRequest, req: Request):
    """Unified chat endpoint supporting streaming and non-streaming responses."""
    platform = getattr(req.state, 'platform', 'unknown')
    # Platform-aware parameter adjustment: this gateway serves mini program
    # clients without streaming and with a smaller output cap.
    if platform == "wechat_miniprogram":
        request.stream = False
        request.maxTokens = min(request.maxTokens or 4096, 2048)
    if request.stream:
        return StreamingResponse(
            _stream_response(request, platform),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
        )
    else:
        return await _sync_response(request, platform)


async def _stream_response(request, platform):
    # Emit each engine event as one SSE `data:` frame, then a terminator.
    async for event in agent_engine.stream_chat(
        messages=request.messages, model=request.model,
        session_id=request.sessionId, platform=platform,
    ):
        yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
    yield "data: [DONE]\n\n"
42.8.3 Cross-Platform State Synchronization
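Session state must stay in sync as users move between devices. The core synchronization strategies are:
1. Incremental sync
- Each new message is pushed as an incremental event to all online devices
- A device that comes back online pulls the events it missed
2. Conflict resolution
- A timestamp-based Last-Write-Wins strategy
- Operational Transformation (OT) for edit-type operations
3. Session consistency
- A single Session ID is shared across all platforms
- Sessions move between devices via a QR code or link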
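The incremental-sync strategy can also be sketched from the device's side. In this minimal sketch a client tracks the last sequence number it applied in order, buffers events that arrive early, and ignores duplicates. The class name `DeviceSyncState` and its fields are hypothetical and not part of the chapter's code; the only thing assumed from the server is that each event carries a monotonically increasing `sequence`.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class DeviceSyncState:
    """Hypothetical per-session sync state kept on one device."""
    last_applied: int = 0  # highest sequence applied in order
    pending: Dict[int, dict] = field(default_factory=dict)  # out-of-order buffer
    applied: List[dict] = field(default_factory=list)       # events applied so far

    def receive(self, event: dict) -> None:
        """Accept one event, then apply it and any buffered successors in order."""
        seq = event["sequence"]
        if seq <= self.last_applied or seq in self.pending:
            return  # duplicate delivery, ignore
        self.pending[seq] = event
        # Drain the buffer while the next expected sequence is available.
        while self.last_applied + 1 in self.pending:
            self.last_applied += 1
            self.applied.append(self.pending.pop(self.last_applied))

    def has_gap(self) -> bool:
        """True when later events are buffered but an earlier one is missing."""
        return bool(self.pending)


state = DeviceSyncState()
state.receive({"sequence": 1, "event_id": "msg-1"})
state.receive({"sequence": 3, "event_id": "msg-3"})  # arrives early, buffered
gap_before = state.has_gap()
state.receive({"sequence": 2, "event_id": "msg-2"})  # fills the gap
```

When `has_gap()` is true, or after reconnecting, the device would request the events after `last_applied` from the server and feed them through `receive`, so that live pushes and catch-up batches share a single code path.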
# backend/services/sync_service.py - cross-platform sync service
from datetime import datetime, timezone
from typing import Dict, Optional
from dataclasses import dataclass


@dataclass
class SyncEvent:
    event_id: str
    user_id: str
    session_id: str
    event_type: str  # 'message_new', 'session_update', 'settings_change'
    data: dict
    timestamp: datetime
    sequence: int  # monotonically increasing sequence number


class SyncService:
    """Cross-platform state synchronization service."""

    def __init__(self, ws_manager, storage):
        self.ws_manager = ws_manager
        self.storage = storage
        # session_id -> sequence. This counter is in-memory, so it only works
        # for a single process; a shared counter is needed beyond that.
        self._sequences: Dict[str, int] = {}

    async def on_message_created(self, user_id: str, session_id: str, message: dict):
        """Sync a newly created message to all online devices."""
        seq = self._next_sequence(session_id)
        event = SyncEvent(
            event_id=message["id"],
            user_id=user_id,
            session_id=session_id,
            event_type="message_new",
            data={"message": message},
            # Timezone-aware UTC keeps timestamps comparable across servers.
            timestamp=datetime.now(timezone.utc),
            sequence=seq,
        )
        # Persist to the sync log first, so a device that misses the push
        # can still recover the event later.
        await self.storage.save_sync_event(event)
        # Then push in real time to online devices.
        await self.ws_manager.send_to_user(user_id, {
            "type": "sync.event",
            "event": {
                "event_id": event.event_id,
                "event_type": event.event_type,
                "data": event.data,
                "sequence": event.sequence,
            },
        })

    async def get_missing_events(self, user_id: str, session_id: str,
                                 since_sequence: int) -> list:
        """Return the sync events a device has missed."""
        events = await self.storage.get_sync_events(
            session_id=session_id,
            since_sequence=since_sequence,
        )
        return events

    async def resolve_conflict(self, event_1: SyncEvent, event_2: SyncEvent) -> SyncEvent:
        """Resolve a sync conflict."""
        # Last-Write-Wins: the later timestamp wins; ties go to event_2.
        if event_1.timestamp > event_2.timestamp:
            return event_1
        return event_2

    def _next_sequence(self, session_id: str) -> int:
        current = self._sequences.get(session_id, 0)
        next_seq = current + 1
        self._sequences[session_id] = next_seq
        return next_seq
42.8.4 Cross-Platform Testing Strategy
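The tests in this section call an `apply_platform_limits` helper that the chapter does not define. The following is a minimal sketch of what such a helper might look like, assuming per-platform caps equal to the values the parametrized test expects. The `PLATFORM_MAX_TOKENS` table, the fallback value, and the dataclass stand-in for `ChatRequest` are all assumptions made for illustration.

```python
from dataclasses import dataclass, field, replace
from typing import List, Optional

# Hypothetical per-platform output caps, chosen to match the test table.
PLATFORM_MAX_TOKENS = {
    "web": 4096, "ios": 4096, "android": 4096,
    "wechat_miniprogram": 2048,
    "tauri_desktop": 8192, "electron_desktop": 8192,
}
DEFAULT_MAX_TOKENS = 4096  # assumed fallback for unknown platforms


@dataclass(frozen=True)
class ChatRequest:
    """Stand-in for the gateway's request model (only the fields used here)."""
    messages: List[dict] = field(default_factory=list)
    maxTokens: Optional[int] = None
    stream: bool = True


def apply_platform_limits(request: ChatRequest, platform: str) -> ChatRequest:
    """Return a copy of the request clamped to the platform's limits."""
    cap = PLATFORM_MAX_TOKENS.get(platform, DEFAULT_MAX_TOKENS)
    requested = request.maxTokens if request.maxTokens is not None else cap
    # Mirror the gateway rule that mini program clients do not stream.
    stream = False if platform == "wechat_miniprogram" else request.stream
    return replace(request, maxTokens=min(requested, cap), stream=stream)
```

Returning a clamped copy rather than mutating the request keeps the helper easy to test in isolation, which is the property the parametrized test depends on.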
# tests/test_cross_platform.py - cross-platform tests
# Assumed to be imported from the project (module paths are not shown in this
# chapter): ChatRequest, Message, apply_platform_limits, WebSocketManager,
# SyncService. The async tests require the pytest-asyncio plugin.
import pytest
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock


@pytest.mark.asyncio
class TestCrossPlatformAPI:
    """Cross-platform API tests."""

    @pytest.mark.parametrize("platform,expected_max_tokens", [
        ("web", 4096),
        ("ios", 4096),
        ("android", 4096),
        ("wechat_miniprogram", 2048),  # the mini program has a lower cap
        ("tauri_desktop", 8192),
        ("electron_desktop", 8192),
    ])
    async def test_platform_aware_limits(self, platform, expected_max_tokens):
        """Platform-aware parameter limits are applied."""
        request = ChatRequest(messages=[], maxTokens=8192)
        adjusted = apply_platform_limits(request, platform)
        assert adjusted.maxTokens == expected_max_tokens

    @pytest.mark.parametrize("platform", ["web", "ios", "android", "desktop"])
    async def test_unified_data_model(self, platform):
        """The unified data model round-trips identically on every platform."""
        message = Message(
            id="test-123",
            sessionId="sess-456",
            role="user",
            content="Hello",
            createdAt=datetime.now().isoformat(),
            metadata={"platform": platform},
        )
        # Serialization followed by deserialization should preserve the data
        serialized = message.model_dump_json()
        deserialized = Message.model_validate_json(serialized)
        assert deserialized.id == message.id
        assert deserialized.content == message.content

    async def test_websocket_multi_device(self):
        """Multiple devices on one session all receive a broadcast."""
        ws_manager = WebSocketManager()
        # Simulate two devices connecting
        ws1 = AsyncMock()
        ws2 = AsyncMock()
        await ws_manager.connect(ws1, "user-1", "session-1")
        await ws_manager.connect(ws2, "user-1", "session-1")
        # Send a message to the session
        await ws_manager.send_to_session("session-1", {"type": "test", "data": "hello"})
        # Both devices should receive it
        ws1.send_json.assert_called_once()
        ws2.send_json.assert_called_once()

    async def test_offline_sync_recovery(self):
        """A device that was offline can recover the events it missed."""
        sync_service = SyncService(
            ws_manager=AsyncMock(),
            storage=AsyncMock(),
        )
        # Messages produced while device A is online
        await sync_service.on_message_created("user-1", "sess-1", {"id": "msg-1"})
        await sync_service.on_message_created("user-1", "sess-1", {"id": "msg-2"})
        # Device B comes online and pulls the missing events
        sync_service.storage.get_sync_events.return_value = [
            {"event_id": "msg-1", "sequence": 1},
            {"event_id": "msg-2", "sequence": 2},
        ]
        missing = await sync_service.get_missing_events("user-1", "sess-1", since_sequence=0)
        assert len(missing) == 2
42.8.5 Summary of Cross-Platform Best Practices
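| Practice | Description |
|---|---|
| Shared type definitions | Use a monorepo to manage the type definitions and utility functions shared across platforms |
| Platform adaptation layer | Build an adaptation layer (Capability Bridge) for each platform's specific capabilities |
| Progressive enhancement | Core features behave the same on every platform; advanced features are enhanced per platform |
| Unified authentication | Use OAuth 2.0 + JWT for authentication shared across platforms |
| API versioning | Use URL version prefixes (/v1/, /v2/) to preserve backward compatibility |
| Unified monitoring | Every request carries a platform identifier, enabling per-platform monitoring and analysis |
| Gradual rollout | Roll features out platform by platform to reduce cross-platform release risk |
| Performance budget | Set a performance budget for each platform (bundle size, startup time, memory usage) |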
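The "gradual rollout" row can be made concrete with deterministic bucketing: hash the user ID together with the feature name, map the hash to a bucket from 0 to 99, and compare it against a per-platform rollout percentage. The sketch below is one possible shape under those assumptions; the feature name, the percentages, and the function names are illustrative only.

```python
import hashlib

# Hypothetical rollout table: feature -> platform -> percentage of users enabled.
ROLLOUT = {
    "voice_input": {"ios": 50, "android": 10, "web": 0},
}


def bucket(user_id: str, feature: str) -> int:
    """Map (feature, user) to a stable bucket in the range 0..99."""
    digest = hashlib.sha256(f"{feature}:{user_id}".encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % 100


def is_enabled(feature: str, platform: str, user_id: str) -> bool:
    """True if this user falls inside the platform's rollout percentage."""
    percent = ROLLOUT.get(feature, {}).get(platform, 0)
    return bucket(user_id, feature) < percent
```

Because the bucket depends only on the feature and the user, a given user gets the same answer on every request, and raising a platform's percentage only ever adds users rather than reshuffling them.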
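42.8.6 Architecture Evolution Roadmap
Cross-platform deployment of an Agent product typically passes through the following stages:
Stage 1: Web only (MVP)
- Focus on the Web to validate product value quickly
- REST API + SSE for streaming responses
- Simple session management and user system
Stage 2: + Mobile
- Use React Native to reuse business logic
- Add mobile capabilities such as push notifications and voice input
- Implement basic cross-device session sync
Stage 3: + Desktop + CLI
- Build the desktop app with Tauri, providing file system and terminal access
- Bring the CLI tool into developer workflows
- Mature the cross-platform state synchronization mechanism
Stage 4: + IDE plugins + WeChat ecosystem
- VS Code/JetBrains plugins bring the Agent into coding scenarios
- WeChat mini program and WeCom integrations cover the Chinese market
- Unified monitoring and operations across all platforms
Stage 5: Deep platform integration
- Each platform gets a deeply optimized, platform-native experience
- Smart platform selection: recommend the most suitable platform based on the user's scenario
- Cross-platform workflows: seamless collaboration between platforms
Timeline:
Web MVP (month 1) → +Mobile (month 3) → +Desktop/CLI (month 6) → +IDE/WeChat (month 9) → Deep integration (month 12+)
Chapter Summary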
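Cross-platform deployment is a key step in taking an Agent product from technical validation to large-scale use. This chapter covered the technical approaches and best practices for deploying Agents across six surfaces: Web, mobile, desktop, CLI, IDE plugins, and the WeChat ecosystem.
Key takeaways:
- A unified backend with adapted frontends is the basic architectural principle of cross-platform deployment
- Combining WebSocket and SSE covers the varied real-time communication needs of Agent products
- Mobile requires particular attention to offline strategy and to platform-specific capabilities (voice, camera, location)
- Desktop (Tauri/Electron) provides deep integration with the file system and terminal commands
- The CLI is an important entry point into developer workflows, and Rich + Click make it quick to build a polished CLI
- IDE plugins enable context-aware Agent assistance inside the editor, which can improve coding efficiency
- The WeChat ecosystem is effectively required for the Chinese market, with mini programs and WeCom suiting different scenarios
- A unified data model plus an API gateway is the technical basis for cross-platform consistency
Questions for further thought:
- How can each platform's unique capabilities be fully used while keeping behavior consistent across platforms?
- How will advances in edge computing change cross-platform Agent architecture?
- How can a platform adaptation layer be designed to be extensible, so that onboarding a new platform costs as little as possible?
"A good Agent product should be like water: poured into any container (platform), it naturally takes that container's shape."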