大家好,我是二哥呀。
一开始,我在做 PaiAgent 这个 Vibe Coding 项目时,确实是在工作流跑完的时候一次性返回结果的,Vibe Coding 尽量小版本持续迭代。但这样的话,用户就无法感知工作流的实时执行状态。如果工作流非常复杂,需要等待的时间很久,用户很容易在等的过程中失去耐心。
所以在做 PaiFlow 的时候,我就决定,每一个节点的状态都应该实时推送给前端。有了这个前提,我们再来聊,消息是怎么回传的,实时通信又是怎么做的。
第一个是事件驱动。工作流的执行本质上是一连串状态的变化过程,节点开始、节点跑完、节点失败。所以我们要在每一个关键时刻,主动把事件抛出来,通过发布-订阅的方式,让前端感知到。
第二个是消息必须标准化,不能今天是这个状态,明天又多一个状态,所有回传消息都必须是统一的结构,不管是工作流级别的事件,还是节点级别的事件,前端只需要按照同一种方式去解析和渲染就好了。
第三个,在技术选型上,我们决定使用 Server-Sent Events,也就是 SSE。这种方式非常适合单向的,服务端向前端推送状态的场景;另外就是,派聪明 RAG 我们用了 WebSocket,之前也有球友说为什么不用 SSE,那这次我们就狠狠听劝,用。
前端发起工作流执行请求,后端一边跑 Workflow,一边把节点状态和 LLM 的流式输出持续推送给前端。这个业务并不复杂,但会涉及到非常多的组件。
我们从入口开始看,WorkflowController.executeWorkflow 会先创建一个 SseEmitter 对象,然后将其作为参数创建 SseStreamCallback 对象,再把这个 callback 作为参数交给 WorkflowEngine.execute 方法。
WorkflowController.executeWorkflow()
├─ SseEmitter emitter = new SseEmitter()
└─ StreamCallback callback = new SseStreamCallback(emitter)
进入 WorkflowEngine 之后,引擎会先创建两个队列,一个是 streamQueue,用来缓存实时产生的事件消息,另一个是 orderStreamResultQ,用来缓存那些需要在收尾阶段按顺序处理的结果。然后引擎会把外部传入的 callback + 队列作为参数创建 WorkflowMsgCallback 对象。WorkflowMsgCallback 负责把引擎内部发生的 onWorkflowStart、onNodeStart、onNodeProcess、onNodeEnd 这些事件统一转换成可传输的消息对象,并丢进队列。与此同时,引擎会启动一个异步线程去消费 streamQueue,这样生产者和消费者就彻底解耦了。
WorkflowEngine.execute(dsl, variablePool, inputs, callback)
├─ Queue streamQueue = new ConcurrentLinkedQueue()
├─ WorkflowMsgCallback workflowCallback = new WorkflowMsgCallback(
│ sid,
│ callback, // SseStreamCallback
│ streamQueue
│ )
│ └─ 构造方法中启动异步线程:
│ while(tag) {
│ poll streamQueue → clientCallback.callback()
│ }
├─ workflowCallback.onWorkflowStart()
└─ executeNode(startNode, variablePool, workflowCallback)
准备工作完成后,引擎会先触发 workflowCallback.onWorkflowStart,然后从起始节点开始 executeNode。executeNode 先检查前置节点依赖是否满足,再根据节点类型拿到对应的 NodeExecutor,最后把 NodeState 传进去执行。NodeState 有三个字段,Node、VariablePool、Callback。
WorkflowEngine.executeNode(node, variablePool, workflowCallback)
├─ 递归执行前置节点
├─ NodeExecutor executor = nodeExecutors.get(nodeType)
└─ executor.execute(new NodeState(node, variablePool, workflowCallback))
到了 AbstractNodeExecutor 这一层,它先执行重试逻辑,然后调用 doExecute 方法。doExecute 方法会先执行 callback.onNodeStart 通知节点开始,再从 VariablePool 里把输入参数解析出来,调用子类实现的 executeNode 方法。比如 LLMNodeExecutor 会去调模型,PluginNodeExecutor 会去调 MCP 工具。每个节点跑完后会执行 callback.onNodeEnd 宣告节点结束。
AbstractNodeExecutor.execute(nodeState)
├─ 重试控制
└─ doExecute(nodeState)
├─ callback.onNodeStart(0, nodeId, aliasName)
│ ↓
│ WorkflowMsgCallback.onNodeStart()
│ ↓
│ ChatCallBacks.onNodeStart()
│ ├─ 创建 LLMGenerate.nodeStart()
│ └─ streamQueue.offer(resp)
│ ↓
│ 【异步线程】poll() → clientCallback.callback()
│ ↓
│ SseStreamCallback.callback("stream", resp)
│ ├─ JSON.toJSONString(resp)
│ └─ emitter.send()
│ ↓
│ 前端:🟡 正在执行 LLM 节点
│
├─ resolvedInputs = resolveInputs(node, variablePool)
├─ result = executeNode(nodeState, resolvedInputs)
│ ↓
│ LLMNodeExecutor.executeNode()
│ ├─ 调用 DeepSeek API
│ └─ 每生成一段:callback.onNodeProcess(..., "文本", ...)
│ ↓
│ WorkflowMsgCallback.onNodeProcess()
│ ↓
│ ChatCallBacks.onNodeProcess()
│ └─ streamQueue.offer(nodeProcessEvent)
│ ↓
│ 【异步线程】→ emitter.send()
│ ↓
│ 前端:📝 播客脚本第一段...
│ 前端:📝 播客脚本第二段...
│
├─ storeOutputs(node, result.getOutputs(), variablePool)
└─ callback.onNodeEnd(nodeId, aliasName, result)
↓
WorkflowMsgCallback.onNodeEnd()
↓
ChatCallBacks.onNodeEnd()
└─ streamQueue.offer(nodeEndEvent)
↓
前端:✅ LLM 节点完成
WorkflowMsgCallback 接到 onNodeStart 或 onNodeProcess 这种事件后,不是直接 emitter.send 发消息,而是交给 ChatCallBacks 去创建统一的事件对象,然后执行 streamQueue.offer 入队。真正的消息发送是在异步消费线程里,它会不断从 streamQueue.poll 拿消息,然后调用 clientCallback,也就是最初的 SseStreamCallback。SseStreamCallback.callback 会把事件序列化成 JSON,再通过 emitter.send 把数据推到前端。前端收到消息后,会根据消息的类型和状态(RUNNING、SUCCESS、ERROR)动态变化,展示工作流的实时状态。
WorkflowEngine.execute() finally块
└─ workflowCallback.finished()
├─ tag = false (停止异步线程)
├─ 等待异步线程结束
├─ 消费剩余队列数据
└─ clientCallback.finished()
↓
SseStreamCallback.finished()
emitter.complete()
↓
前端:SSE 连接关闭
当工作流执行结束的时候,会对资源进行释放。整个过程涉及到的类比较多,我用另外一幅图来辅助大家消化理解,当然后面我还会一一解释(从多个角度,类和消息的生命周期)。
1.SSE涉及到的关键类
1.1 回调接口 StreamCallback
StreamCallback 是工作流实时流式响应的回调接口,用于将工作流执行过程中的事件和数据实时推送到前端。StreamCallback 是一个策略接口,具体的传输方式由其实现类决定。
换句话说,如果未来我们想换成 WebSocket 来给前端推送消息,只需要:
- 创建一个新的 WebSocketStreamCallback 来实现 StreamCallback 接口。
2. 在 WorkflowController 中,将 new SseStreamCallback(emitter) 换成 new WebSocketStreamCallback(session) 即可。
而 WorkflowEngine 、 WorkflowMsgCallback 以及所有的 NodeExecutor 都不需要做任何改动 ,因为它们依赖的是抽象的 StreamCallback 接口。
StreamCallback 定义了两个方法,callback 接收两个参数,一个是字符串类型的 eventType,用于标识事件的类型,前端可以根据不同的 eventType 来执行不同的 UI 更新逻辑。另外一个是 Object 类型的 data,回调时可以传递任意类型的数据结构,发送前通常会被序列化为 JSON 字符串。
finished 是一个默认方法。它提供了一个可选的、在所有回调操作完成后的清理机制。
在 WorkflowController.java 中,当一个 /chat 请求进来时,代码会创建一个 SseEmitter 对象(与当前的 HTTP 长连接绑定),然后用它来作为参数 new 一个 SseStreamCallback 实例。
这里的 callback 变量,虽然实现是 SseStreamCallback ,但它被声明为 StreamCallback 接口类型。然后,这个 callback 对象会被传递给 workflowEngine.execute 方法。
接着,在 WorkflowEngine.java 内部,这个 callback 会被用来构造 WorkflowMsgCallback 对象。
在这里, SseStreamCallback 实例会被赋值给 WorkflowMsgCallback 内部的 clientCallback 字段。
也就是说,当引擎执行节点时,会调用 workflowCallback.onNodeStart(...) ,workflowCallback 内部会调用 chatCallBacks 来生产标准格式的消息,并将消息体放入队列中。
workflowCallback 的异步任务又会从队列中取出消息,调用 clientCallback.callback(...) 。因为 clientCallback 的实际对象是 SseStreamCallback ,所以最终会调用 SseStreamCallback 的 callback 方法,通过 SseEmitter 将数据发送出去。
1.2 SSE适配器SseStreamCallback
SseStreamCallback 是 SSE 的传输层适配器,负责将工作流...
热门评论
10 条评论
回复