杠精派
派聪明
0/512

热门评论

Ψ 2026年03月09日 13:26
引用原文:WorkflowController.executeWorkflow 会先创建一个 SseEmitter 对象,然后将其作为参数创建 SseStreamCallback 对象
@派聪明 这是在干什么
点赞
回复1 取消回复
派聪明 2026年03月09日 13:27
这段代码描述的是建立**服务器到前端实时通信通道**的核心步骤,目的是让前端能实时接收工作流执行状态。具体解释如下: ### 1. **创建 `SseEmitter` 对象** - **作用**:建立服务器到前端的 **HTTP 长连接**(基于 **SSE 协议**)。 - **本质**:`SseEmitter` 是 Spring 提供的 **服务端发送事件(SSE)工具**,负责维持一个持久的 HTTP 连接,允许服务器主****端推送数据。 - **为什么需要**:传统 HTTP 请求是“一问一答”模式,工作流执行时间可能很长,用户无法实时看到进度。SSE 允许服务器在任务执行过程中持续推送状态更新。 ### 2. **创建 `SseStreamCallback` 对象** - **作用**:将工作流事件**封装成 SSE 协议格式**,并通过 `SseEmitter` 发送给前端。 - **实现逻辑**: - `SseStreamCallback` 实现了 `StreamCallback` 接口,是**传输层适配器**。 - 当工作流引擎(如 `WorkflowEngine`)产生事件(如节点开始、处理中、结束)时,会调用 `SseStreamCallback.callback()` 方法。 - 该方法将事件对象序列化为 JSON,通过 `***************()` 推送给前端。 - **代码示例**: ```java // WorkflowController.executeWorkflow() SseEmitter emitter = new SseEmitter(); // 创建长连接 StreamCallback callback = new SseStreamCallback(emitter); // 封装为回调 ``` ### 3. **整体流程** ```mermaid graph LR A[前端请求工作流执行] --> B[WorkflowController] B --> C[创建 SseEmitter] B --> D[创建 SseStreamCallback] D --> E[传递给 WorkflowEngine] E --> F[工作流执行中产生事件] F --> G[调用 SseStreamCallback] G --> H[序列化事件为JSON] H --> I[***************() 推送至前端] I --> J[前端实时更新UI] ``` ### 4. **设计优势** - **解耦**:工作流引擎(`WorkflowEngine`)不关心具体传输方式,只需调用 `StreamCallback` 接口。 - **可扩展性**:未来若改用 WebSocket,只需新增 `WebSocketStreamCallback` 实现 `StreamCallback`,其他代码无需修改。 - **标准化**:所有事件(节点状态、流式输出)通过统一格式(`LLMGenerate`)推送,前端按固定逻辑解析。 ### 总结 这段代码的核心是**搭建实时通信管道**: - `SseEmitter` 负责维持长连接。 - `SseStreamCallback` 负责将工作流事件转化为 SSE 数据流。 - 最终实现:前端能实时看到工作流进度(如“节点开始执行→LLM生成中→节点完成”),提升用户体验。
点赞
回复 取消回复

10 条评论

Ψ 2026年03月09日 16:09
引用原文:ChatCallBacks 根据不同的事件类型 ( onNodeStart 等),创建格式统一的 LLMGenerate 对象(加上时间戳、进度等)。再调用 putFrameIntoQueue() ,将 LLMGenerate 对象放入内部的 streamQueue 队列中。事件在此刻进入了缓冲状态。
这里的判断条件false写死了,所以是没有实现是吗?实际上只是用到了streamQueue队列
点赞
回复 取消回复
Ψ 2026年03月09日 14:25
引用原文:交给 ChatCallBacks 去创建统一的事件对象
@派聪明,这个ChatCallBacks是什么
点赞
回复1 取消回复
派聪明 2026年03月09日 14:26
根据参考资料,**ChatCallBacks** 是工作流引擎中负责**事件生产**的核心组件,主要作用是将工作流执行过程中的各种节点事件(如节点开始、执行中、结束)转换为标准化的消息对象,并推送到消息队列供前端实时消费。以下是关键解析: --- ### **1. 核心职责** - **事件标准化**: **工作流生命周期事件(如 `onNodeStart`、`onNodeProcess`、`onNodeEnd`),将原始事件转换为统一格式的 **`LLMGenerate` 对象**(兼容 OpenAI 流式响应格式)。 - **消息队列管理**: 将生成的消息放入 `streamQueue`(普通流式事件队列)或 `orderStreamResultQ`(需顺序处理的特殊事件队列),解耦生产与消费逻辑。 - **业务数据封装**: 封装节点执行时间、Token消耗、输入/输出等元信息,供前端展示工作流状态。 --- ### **2. 关键方法与事件处理** | 方法 | 功能 | 示例消息片段 | |--------------------|----------------------------------------------------------------------|-----------------------------------------------------------------------------| | `onNodeStart()` | 节点开始时触发,记录启动时间,生成节点开始事件。 | `{"workflow_step": {"node": {"id": "node-llm::001", "alias_name": "LLM节点"}, "progress": 0.3}}` | | `onNodeProcess()` | 节点执行中触发(如LLM流式输出),实时推增内容。 | `{"choices": [{"delta": {"content": "播客脚本第一段..."}}]}` | | `onNodeEnd()` | 节点结束时触发,统计耗时/Token,封装最终结果。 | `{"workflow_step": {"node": {"outputs": {"llm_output": "完整脚本..."}}}}` | --- ### **3. 在事件流中的位置** ```mermaid graph LR A[节点执行] --> B[AbstractNodeExecutor] B --> C{ChatCallBacks} C --> D[生成LLMGenerate对象] D --> E[放入streamQueue] E --> F[WorkflowMsgCallback异步消费] F --> G[SseStreamCallback推送到前端] ``` - **生产者角色**: ChatCallBacks 位于事件流的起点,接收来自 `NodeExecutor` 的原始事件,加工后放入队列。 - **解耦设计**: 通过队列缓冲消息,避免阻塞工作流主线程,同时支持高并发场景。 --- ### **4. 为什么需要统一事件对象?** - **前端兼容性**: 所有事件(工作流开始/结束、节点状态、LLM流式输出)均采用 `LLMGenerate` 格式,前端可统一解析。 - **扩展性**: 若需改用 WebSocket,只需替换 `SseStreamCallback` 为 `WebSocketStreamCallback`,无需修改 ChatCallBacks 的生产逻辑。 --- ### **总结** **ChatCallBacks 是工作流引擎的“事件工厂”**,负责将分散的节点状态事件标准化为可被前端实时消费的流式消息,是实时通信架构中的关键生产者组件。
点赞
回复 取消回复
Ψ 2026年03月09日 14:23
引用原文:ChatCallBacks 去创建统一的事件对象
这是SseStreamCallback吧,只是在WorkflowMsgCallback中被叫成ChatCallBack
点赞
回复1 取消回复
Ψ 2026年03月09日 14:23
不对啊,这个是哪来的
点赞
回复 取消回复
Ψ 2026年03月09日 14:19
引用原文:然后引擎会把外部传入的 callback + 队列作为参数创建 WorkflowMsgCallback 对象
@派聪明,为什么有了SseStreamCallback还需要封装成WorkflowMsgCallback
点赞
回复1 取消回复
派聪明 2026年03月09日 14:20
WorkflowMsgCallback是对SseStreamCallback的进一步封装,主要原因有三点: 1. **解耦业务逻辑与传输实现**:SseStreamCallback专注于SSE传输的具体实现,而WorkflowMsgCallback负责处理工作流事件的业务逻辑。这种分层设计使得系统更模块化,未来若需更换传输方式(如WebSocket),只需修改SseStreamCallback实现,无需改动业务逻辑。 2. **统一事件处理与标准化**:工作流执行会产生多种事件(工作流开始、节点开始、节点处理中、节点结束等)。WorkflowMsgCallback将这些事件统一转换为标准格式的LLMGenerate对象,确保前端能一致处理。 3. **异步解耦与性能优化**:WorkflowMsgCallback内部维护streamQueue队列,启动独立后台线程异步消费队列。这样: - 生产者(节点执行)和消费者(SSE推送)完全解耦 - 避免阻塞主线程,提高系统响应性能 - 通过队列缓冲消息,应对高并发场景 这种设计实现了业务逻辑、消息处理和传输的三层解耦,使系统更易维护和扩展。
点赞
回复 取消回复
Ψ 2026年03月09日 14:19
引用原文:一个是 streamQueue,用来缓存实时产生的事件消息,另一个是 orderStreamResultQ,用来缓存那些需要在收尾阶段按顺序处理的结果。
@派聪明,为什么要分两个队列
点赞
回复1 取消回复
派聪明 2026年03月09日 14:20
使用两个队列(streamQueue和orderStreamResultQ)是为了优化系统性能和保证消息处理的正确顺序: 1. **streamQueue**用于缓存普通实时事件消息,如LLM节点的流式输出。这类消息需要立即推送给前端,实现实时交互体验。 2. **orderStreamResultQ**专门用于存储需要特殊顺序处理的结果消息,如工作流结束节点或汇总类消息。这类消息需要在所有流式输出完成后,按特定顺序显示。 这种双队列设计的主要优势: - **保证顺序**:确保关键结果(如工作流结束)在所有实时流式输出之后才显示,无论中间产生多少流式数据 - **解耦生产者与消费者**:事件生产者(NodeExecutor)无需关心处理策略,消费者可按需从不同队列获取消息 - **性能优化**:避免实时流式输出被关键结果打断,提升用户体验 - **灵活处理**:为不同类型事件提供差异化处理机制,便于后续扩展和二次处理 这种设计使系统能在高并发场景下保持高效,同时保证前端展示的正确性和流畅性。
点赞
回复 取消回复
Ψ 2026年03月09日 13:54
我觉得最好先从如何设计这个系统开始讲起,为什么要用sse,以及有什么要用两个队列,为什么要封装callback,就像之前派聪明的大厂篇一样。
点赞
回复 取消回复
Ψ 2026年03月09日 13:41
引用原文:它的构造方法接收一个 clientCallback 参数,通常是我们上面提到的 SseStreamCallback 实例。这形成了一个 装饰器模式 , WorkflowMsgCallback 包装了 SseStreamCallback。
@派聪明,为什么他们要重重包装,不能直接用SseEmitter和SseStreamCallback吗
点赞
回复1 取消回复
派聪明 2026年03月09日 13:42
这种重重包装是为了实现更好的系统架构和扩展性,主要原因如下: 1. **解耦传输层与业务层**:通过WorkflowMsgCallback包装SseStreamCallback,工作流引擎无需关心具体的传输方式(SSE/WebSocket)。未来如需更换传输方式,只需新增StreamCallback实现类,无需修改工作流引擎代码。 2. **异步处理与缓冲**:WorkflowMsgCallback内部维护streamQueue队列并启动后台线程消费,实现生产者(工作流引擎)与消费者(网络传输)的解耦,避免网络阻塞影响工作流执行。 3. **事件标准化**:将工作流的各种事件(开始、节点执行中、结束等)统一转换为标准格式(LLMGenerate对象),简化前端处理逻辑。 4. **性能优化**:队列机制可缓冲瞬时高并发事件,提升系统吞吐量,特别适合流式响应场景。 5. **扩展性增强**:可在WorkflowMsgCallback中添加事件过滤、转换、聚合等逻辑,而无需修改传输层代码。 这种设计遵循了单一职责原则和开闭原则,使系统更易维护和扩展,是典型的装饰器模式应用。
点赞
回复 取消回复
Ψ 2026年03月09日 13:34
引用原文:然后调用 doExecute 方法。doExecute 方法会先执行 callback.onNodeStart 通知节点开始,再从 VariablePool 里把输入参数解析出来
解读一下这张图@派聪明
点赞
回复1 取消回复
派聪明 2026年03月09日 13:34
根据这段文本和图示,我来解读一下工作流节点执行的流程: 这段描述的是 AbstractNodeExecutor 类中的 doExecute 方法执行过程,这是工作流引擎执行单个节点的核心逻辑: 1. **节点开始****: - 方法首先执行 `callback.onNodeStart`,这会触发一个事件**前端"节点开始执行" - 前端接收到这个事件后,可以显示节点状态为"正在执行" 2. **参数解析**: - 从 VariablePool 中解析节点的输入参数 - VariablePool 是工作流执行过程中的变量存储池,包含了前面节点产生的输出结果 3. **节点执行**: - 调用子类实现的 executeNode 方法执行具体逻辑 - 例如 LLMNodeExecutor 会调用大模型 API,PluginNodeExecutor 会调用外部工具 4. **流式输出处理**: - 在执行过程中,节点会通过 `callback.onNodeProcess` 持续发送中间结果 - 这使得前端能够实时显示生成的内容,实现打字机效果 5. **节点结束****: - 节点执行完成后,执行 `callback.onNodeEnd` **前端"节点执行完成" - 前端接收到这个事件后,可以更新节点状态为"已完成" 整个流程通过事件驱动机制,让前端能够实时感知工作流的执行状态,提升用户体验。这种设计使得即使工作流非常复杂,用户也能清楚地看到执行进度,而不会因为长时间等待而失去耐心。
点赞
回复 取消回复
Ψ 2026年03月09日 13:28
引用原文:一个是 streamQueue,用来缓存实时产生的事件消息,另一个是 orderStreamResultQ,用来缓存那些需要在收尾阶段按顺序处理的结果
@派聪明,为什么要两个队列,他们是怎么工作的
点赞
回复1 取消回复
派聪明 2026年03月09日 13:30
系统使用两个队列是为了解决不同类型消息的优先级和顺序处理问题,具体如下: **为什么需要两个队列:** 1. **消息分类处理**:不同类型的消息需要不同的处理策略 2. **保证顺序**:某些关键结果需要在所有流式输出完成后才展示 3. **性能优化**:避免高并发场景下的阻塞和消息丢失 **两个队列的工作方式:** 1. **streamQueue(普通流式队列)** - 缓存实时产生的事件消息(如LLM节点的流式输出) - 采用FIFO(先进先出)原则 - 作为缓冲区,处理网络抖动或高并**况 - 生产者(NodeExecutor)只管将消息放入,不关心消费顺序 2. **orderStreamResultQ(顺序结果队列)** - 缓存需要在收尾阶段按顺序处理的结果(如结束节点消息) - 确保关键结果在所有流式输出完成后才展示 - 在代码中通过节点类型判断是否放入此队列 **工作流程:** ``` 生产者 → 消息分类 → streamQueue/orderStreamResultQ → 异步消费 → 前端展示 ``` 当LLM节点快速生成大量内容时,这些内容进入streamQueue;而当工作流结束节点执行完毕时,其结果进入orderStreamResultQ。消费者会先处理完streamQueue中的所有消息,再处理orderStreamResultQ中的结果,确保前端展示的正确顺序和完整性。
点赞
回复 取消回复
Ψ 2026年03月09日 13:26
引用原文:WorkflowController.executeWorkflow 会先创建一个 SseEmitter 对象,然后将其作为参数创建 SseStreamCallback 对象
@派聪明 这是在干什么
点赞
回复1 取消回复
派聪明 2026年03月09日 13:27
这段代码描述的是建立**服务器到前端实时通信通道**的核心步骤,目的是让前端能实时接收工作流执行状态。具体解释如下: ### 1. **创建 `SseEmitter` 对象** - **作用**:建立服务器到前端的 **HTTP 长连接**(基于 **SSE 协议**)。 - **本质**:`SseEmitter` 是 Spring 提供的 **服务端发送事件(SSE)工具**,负责维持一个持久的 HTTP 连接,允许服务器主****端推送数据。 - **为什么需要**:传统 HTTP 请求是“一问一答”模式,工作流执行时间可能很长,用户无法实时看到进度。SSE 允许服务器在任务执行过程中持续推送状态更新。 ### 2. **创建 `SseStreamCallback` 对象** - **作用**:将工作流事件**封装成 SSE 协议格式**,并通过 `SseEmitter` 发送给前端。 - **实现逻辑**: - `SseStreamCallback` 实现了 `StreamCallback` 接口,是**传输层适配器**。 - 当工作流引擎(如 `WorkflowEngine`)产生事件(如节点开始、处理中、结束)时,会调用 `SseStreamCallback.callback()` 方法。 - 该方法将事件对象序列化为 JSON,通过 `***************()` 推送给前端。 - **代码示例**: ```java // WorkflowController.executeWorkflow() SseEmitter emitter = new SseEmitter(); // 创建长连接 StreamCallback callback = new SseStreamCallback(emitter); // 封装为回调 ``` ### 3. **整体流程** ```mermaid graph LR A[前端请求工作流执行] --> B[WorkflowController] B --> C[创建 SseEmitter] B --> D[创建 SseStreamCallback] D --> E[传递给 WorkflowEngine] E --> F[工作流执行中产生事件] F --> G[调用 SseStreamCallback] G --> H[序列化事件为JSON] H --> I[***************() 推送至前端] I --> J[前端实时更新UI] ``` ### 4. **设计优势** - **解耦**:工作流引擎(`WorkflowEngine`)不关心具体传输方式,只需调用 `StreamCallback` 接口。 - **可扩展性**:未来若改用 WebSocket,只需新增 `WebSocketStreamCallback` 实现 `StreamCallback`,其他代码无需修改。 - **标准化**:所有事件(节点状态、流式输出)通过统一格式(`LLMGenerate`)推送,前端按固定逻辑解析。 ### 总结 这段代码的核心是**搭建实时通信管道**: - `SseEmitter` 负责维持长连接。 - `SseStreamCallback` 负责将工作流事件转化为 SSE 数据流。 - 最终实现:前端能实时看到工作流进度(如“节点开始执行→LLM生成中→节点完成”),提升用户体验。
点赞
回复 取消回复

目录