工作流在执行过程中,很难保证每一次都能按照预期顺利跑完。现实情况可能是,模型调用超时,代码本身存在 bug。所以我们需要提前考虑异常情况下的处理,未雨绸缪(聪明如我)。
PaiFlow 的异常处理方案,首先考虑的是容错性。比如网络波动、短暂的服务不可用等,通过重试机制,让流程有机会自我恢复,而不是一次失败就终止。
其次是可控性。异常处理必须要引入超时控制,防止某个节点卡死,拖垮整个工作流。然后还要灵活,不同业务场景对异常的容忍度不一样,有的要立即失败,有的要给兜底结果继续向下执行,还有的需要异常分支。
最后要有记录,无论是日志、回调事件,还是最终的执行结果,都应该让用户和开发者清楚地知道哪里出了问题,系统做了什么决策,流程最终是如何结束的。
典型的既要又要😄。
理想状态下,用户构建出的工作流应该是一个完整的 DAG 图,并且每个节点都能按部就班地执行。
但如果用户拖拽出来的工作流没有 End 节点、存在自环、节点参数未填写等,这些都可能导致 DSL 构建失败;又或者执行过程中,因为变量过多/内存泄露导致 OOM;还有一些节点在执行的时候,依赖的服务挂了、模型超时等原因失败;再或者工作流执行成功了,但在给客户端回传消息的过程中,网络断了、连接挂了。。。。。。
对于业务场景下的异常,比如说工作流不完整、缺少参数、节点逻辑配置错误等,应该明确告诉用户哪里错了。对于程序异常,基本上都是因为系统本身存在 bug、第三方服务不可用、代码未覆盖边界等,这些异常一方面需要报警,另一方面要尽可能保障主流程可恢复。
对于 PaiFlow,第一期我们聚焦这几个环节:
-
节点执行超时,避免某些插件卡死;
-
节点执行失败时自动进行重试;
-
节点异常后支持三种策略:忽略异常继续、终止流程、跳转到指定分支。
因此接下来,我们主要讲这几个组件:
-
RetryConfig:重试配置类,定义超时时间和重试策略
-
AsyncUtil:异步工具类,提供超时执行支持
-
ErrorStrategyEnum:错误处理策略枚举,定义不同的异常处理方式
-
AbstractNodeExecutor:节点执行器的基类,实现重试和超时控制逻辑
1.超时控制
超时控制,本质上是一种止损机制。它要解决的问题并不复杂:如果一个 Node 在预期时间内没有返回结果,系统就必须主动介入,而不是无限等待。
如果缺乏超时控制,就会出现这几个典型的问题:
-
某些插件节点依赖外部服务,网络抖动导致线程长时间阻塞;
-
模型调用异常,没有明确的失败信号,也始终没有返回;
-
异步任务堆积,线程池被慢任务消耗殆尽;
-
用户端只能看到“还在执行”,却不知道已经进入了异常状态。
1.1 RetryConfig
在 PaiFlow 中,所有与超时和重试相关的行为,都被封装到了一个统一的配置类中:RetryConfig。
-
shouldRetry: 是否允许重试
-
maxRetries: 最大重试次数,避免无限重试
-
timeout: 节点的超时时间
-
errorStrategy: 节点发生异常时采用的错误处理策略
-
customOutput:在特定错误策略下,用于构建后续节点输入的兜底数据。
注:对于重试策略,还有一个关键属性,重试间隔时间,我们采用的是立即重试;但是在真实的商业类项目中,这个参数非常有必要;因为一次执行失败,立即重试的失败可能性是很高的,不妨过一段时间再试试;立即重试也会给被调用方带来瞬时的流量峰值,使情况更加恶劣;有兴趣的球友可以扩展一下。
1.2 AsyncUtil
AsyncUtil 是一个异步执行工具类,基于Google Guava的SimpleTimeLimiter实现:
@Slf4j
public class AsyncUtil {
private static final SimpleTimeLimiter simpleTimeLimiter;
static {
ExecutorService executorService = ExecutorBuilder.create()
.setCorePoolSize(10)
.setMaxPoolSize(50)
.setKeepAliveTime(0, TimeUnit.SECONDS)
.setWorkQueue(new SynchronousQueue())
.setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
.setThreadFactory(THREAD_FACTORY)
.buildFinalizable();
// 包装一下线程池,避免出现上下文复用场景
executorService = TtlExecutors.getTtlExecutorService(executorService);
simpleTimeLimiter = SimpleTimeLimiter.create(executorService);
}
/**
* 带超时时间的方法调用执行,当执行时间超过给定的时间,则返回一个超时异常,内部的任务还是正常执行
* 若超时时间内执行完毕,则直接返回
*
* @param time
* @param unit
* @param call
* @param
* @return
*/
public static T callWithTimeLimit(long time,...真诚点赞 诚不我欺
回复