杠精派
派聪明
0/512

热门评论

Ψ 2026年03月07日 17:55
引用原文:节点之间不直接相互调用
我看代码里面当前节点执行完后,根据自身状态来决定调用它后面的节点,这不是直接调用吗?@派聪明
点赞
回复1 取消回复
派聪明 2026年03月07日 17:55
"节点之间不直接相互调用"指的是节点之间不直接调用彼此的业务方法,而是通过执行引擎(WorkflowEngine)进行调度。 具体解释: 1. **节点自治性**:每个节点都是独立、自治的执行单元,不知道其他节点的存在。节点只关注自己的输入和输出,不关心数据来自哪个节点或流向哪个节点。 2. **解耦设计**:节点之间不直接调用,避免强耦合。修改一个节点的实现不会影响其他节点。 3. **引擎调度**:当前节点执行完成后,不会自己决定并调用下一个节点。而是: - 将执行结果放入VariablePool - 由WorkflowEngine根据流程定义和节点状态决定下一个要执行的节点 - 执行引擎负责编排节点执行顺序 4. **数据传递方式**:节点之间通过VariablePool传递数据,而不是直接传递对象引用。节点声明自己的输入输出,执行引擎负责数据流转。 这种设计使得系统更加灵活,新增节点类型时只需实现NodeExecutor接口,而无需修改其他节点或执行引擎。
点赞
回复 取消回复

7 条评论

Je 2026年03月12日 17:00
引用原文:变量池VariablePool
@派聪明 变量池中的数据是只存在内存中的吗?有没有对应的持久化操作?
点赞
回复1 取消回复
派聪明 2026年03月12日 17:02
根据参考资料,VariablePool中的数据确实主要存储在内存中。从代码实现可以看出,它使用两层Map结构来存储变量: ```java private final Map<String, Map<String, Object>> variables = new HashMap<>(); ``` VariablePool的生命周期与工作流执行周期绑定,在WorkflowEngine的execute方法中,每次执行工作流时会创建一个新的VariablePool,执行完成后会调用******************()清空变量池: ```java // 清空上下文变量 ******************(); ``` 在finally块中还会移除上下文信息: ```java finally { // 移除上下文信息 **************************(); } ``` 参考资料中没有提到VariablePool有专门的持久化机制,它设计为工作流执行过程中的临时数据存储,用于节点间的数据传递。工作流执行完成后,这些临时变量就会被清空。 持久化操作可能发生在其他层面,比如工作流定义(DSL)的存储、执行结果的记录等,但VariablePool本身主要是内存中的临时数据结构。
点赞
回复 取消回复
Ψ 2026年03月07日 17:58
引用原文:Java 版工作流引擎通过 EngineContextHolder 在整个工作流执行过程中传递和访问执行上下文信息。
@派聪明,这个EngineContextHolder有什么用,传递的是什么上下文,在什么方法里面可以体现
点赞
回复1 取消回复
派聪明 2026年03月07日 17:58
EngineContextHolder是工作流引擎中用于传递和访问执行上下文信息的关键组件,其主要作用和体现如下: ### 作用 1. **跨线程上下文传递**:使用TransmittableThreadLocal(TTL)解决线程池环境下上下文传递问题,确保每个工作流执行过程有独立上下文环境 2. **避免回调链传递**:替代传统的回调链传递方式,简化代码结构 3. **生命周期管理**:在整个工作流执行过程中保持上下文一致性 ### 传递的上下文信息 EngineContext内部封装了工作流执行所需的关键信息: - 流程ID(flowId) - 会话ID(chatId/uuid) - 回调处理器(callback) - 全链路追踪的唯一标识符(sid) - 其他执行所需的关键信息 ### 体现的方法 在WorkflowEngine.execute()方法中可以清晰看到其使用: ```java // 初始化上下文 EngineContextHolder.initContext(workflowDSL.getFlowId(), workflowDSL.getUuid(), workflowCallback); // 执行工作流... executeNode(startNode, variablePool, workflowCallback); // 移除上下文信息 **************************(); ``` 此外,在异步执行场景中也能体现其价值: ```java // 主线程设置上下文 EngineContextHolder.initContext(flowId, chatId, callback); // 异步线程执行(TTL自动复制上下文) AsyncUtil.execute(() -> { // 可以正常获取上下文信息 String chatId = ***********************().getChatId(); }); ``` 这种设计使得工作流在执行过程中,即使在线程池环境下也能保持上下文一致性,无需在调用链中手动传递上下文信息。
点赞
回复 取消回复
Ψ 2026年03月07日 17:56
引用原文:执行过程中产生的结果和状态变化,会通过 StreamCallback 进行实时回传,用于前端展示、日志追踪等。
@派聪明 这是怎么实现的
点赞
回复1 取消回复
派聪明 2026年03月07日 17:57
StreamCallback 实现实时回传是通过观察者模式实现的,具体机制如下: 1. **回调接口设计**:工作流引擎定义了 StreamCallback 接口,包含如 onWorkflowStart、onNodeStart、onNodeEnd、onWorkflowEnd 等方法,用于不同执行阶段的回调**。 2. **回调链传递**:执行引擎 WorkflowEngine 在 execute 方法中创建 WorkflowStreamCallback 对象,该对象实现了 StreamCallback 接口。这个 callback 对象会通过 EngineContextHolder 传递给整个执行过程中的所有节点执行器。 3. **触发时机**:节点执行器在关键执行点主动触发回调: - 节点开始执行时:调用 onNodeStart - 节点执行成功时:调用 onNodeEnd - 工作流开始/结束时:调用 onWorkflowStart/onWorkflowEnd - 异常发生时:调用相应错误回调 4. **数据流转**: ```java // 节点执行器中触发回调 protected void successResponse(NodeState nodeState, NodeRunResult result) { WorkflowMsgCallback callback = nodeState.callback(); callback.onNodeEnd(**********(), aliasName, result); } ``` 5. **前端实时更新**:通过 WebSocket 等技术,后端将回调事件实时推送给前端,前端根据事件类型更新 UI 状态,如节点执行进度、结果展示等。 这种设计实现了执行逻辑与回调逻辑的解耦,使工作流引擎能灵活支持多种回调场景,同时保持核心执行逻辑的简洁性。
1
回复 取消回复
Ψ 2026年03月07日 17:55
引用原文:节点之间不直接相互调用
我看代码里面当前节点执行完后,根据自身状态来决定调用它后面的节点,这不是直接调用吗?@派聪明
点赞
回复1 取消回复
派聪明 2026年03月07日 17:55
"节点之间不直接相互调用"指的是节点之间不直接调用彼此的业务方法,而是通过执行引擎(WorkflowEngine)进行调度。 具体解释: 1. **节点自治性**:每个节点都是独立、自治的执行单元,不知道其他节点的存在。节点只关注自己的输入和输出,不关心数据来自哪个节点或流向哪个节点。 2. **解耦设计**:节点之间不直接调用,避免强耦合。修改一个节点的实现不会影响其他节点。 3. **引擎调度**:当前节点执行完成后,不会自己决定并调用下一个节点。而是: - 将执行结果放入VariablePool - 由WorkflowEngine根据流程定义和节点状态决定下一个要执行的节点 - 执行引擎负责编排节点执行顺序 4. **数据传递方式**:节点之间通过VariablePool传递数据,而不是直接传递对象引用。节点声明自己的输入输出,执行引擎负责数据流转。 这种设计使得系统更加灵活,新增节点类型时只需实现NodeExecutor接口,而无需修改其他节点或执行引擎。
点赞
回复 取消回复
Ψ 2026年03月07日 16:28
引用原文:NodeState 封装了节点执行时所需的信息,包括当前节点对象、变量池以及回调函数。
每个workflow执行线程都有自己的variblePool和EngineContextHolder
点赞
回复 取消回复
Ψ 2026年03月07日 16:27
引用原文:NodeExecutor 接口只定义了一个核心方法 execute,用于触发节点的执行逻辑;同时通过 getNodeType 方法,声明当前执行器所能处理的节点类型。
AbstractNodeExecutor*象类实现了接口的execute方法,同时定义了*象方法doExecute,该*象方**在execute方法中调用。其他子类比如LLM节点实现的是doExecute方法。以后增加新的节点只需要继承AbstractNodeExecutor*象类,并重写doexcute方法即可。
点赞
回复 取消回复
腼腆的白昼 2026年02月27日 09:14
Java版本的工作流是参考python版的实现吗?看了下python版的一些代码,Java有些类是跟python能对应上的,有些实现好像不太一样,比如我看WorkflowEngine这个类,Java版和python的感觉没有呼应上,是有二次改造了吗?
点赞
回复 取消回复

目录