如何设计组件化流程引擎?

本文简单介绍下流程引擎以及实现一个最基础功能的组件化的流程引擎。

核心问题

什么是流程编排的核心要解决的问题?

流程可编排是流程引擎的核心,是解决服务流程的核心;

流程定义清晰是流程引擎的关键,只有清晰的流程才能保证编排的可维护性;

流程易于扩展是流程引擎的核心竞争力,只有更好的扩展能力才能让流程的编排效率提升;

绕不开的BPM规范

BPM(Business Process Manager、业务流程管理)主要用于管理复杂业务关系以及业务流程。

BPMN(Business Process Modeling Notation)是BPM的一种建模语言,BMPN2.0是最新版本。

工作流引擎以Activiti为代表,使用BPMN2.0标准定义进行编排管理;

功能完善、相对重量级的产品如下:

功能基本可用,相对轻量级的产品如下:

BMPN的可视化

流程编排的可视化是降低编排复杂度的关键,除了传统的XML的编排方式,可视化的编排方式更加易于维护。

这里推荐使用开源的bmpmn-js来实现页面可视化。

https://github.com/bpmn-io/bpmn-js

如果需要对其功能点和扩展方式进行进一步的了解,可以在其使用实例中了解扩展的写法。

https://github.com/bpmn-io/bpmn-js-examples

如何设计组件化流程引擎?

目标: 我们到底要解决什么问题?

传统的工作流引擎常用于企业内部审批流、工作流等方面,注重业务流程的编排和扩展,

而此次要设计的流程引擎是关注代码逻辑流程的编排,

因此,不考虑使用繁琐的Activiti流程,而采用类似liteflow的方式来实现基础的流程编排功能;

类图

流程节点

流程节点是编排的最小单元。

1
2
3
4
/** 节点定义 **/
public interface IFlowNode<C extends IFlowContext> {
void exec(C context);
}

节点能力

节点能力适用于扩展流程节点的功能,例如节点重试,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/** 重试能力 **/
public interface IFlowRetryCapacity {
int retryTimes();
}

/** 重试节点定义 **/
public interface IFlowRetryNode<C extends IFlowContext> extends IFlowRetryCapacity, IFlowNode<C> {
}

/** 重试定义 **/
public interface IFlowRetry<C extends IFlowContext> {
void retry(IFlowRetryNode retryNode, C context);
}

/** 重试策略 **/
public abstract class AbstractFlowRetry implements IFlowRetry {

abstract public void doRetry(IFlowRetryNode retryNode, IFlowContext flowContext);

@Override
public void retry(IFlowRetryNode retryNode, IFlowContext flowContext) {
if (retryNode.retryTimes() <= 0) {
return;
}
log.info("FlowRetry start uniqueKey={} retry={}", flowContext.getUniqueKey(), retryNode.getClass().getSimpleName());
doRetry(retryNode, flowContext);
}
}

/** 基于spring的重试策略 **/
public class SpringFlowRetry extends AbstractFlowRetry implements IFlowRetry {

private ConcurrentHashMap<Integer, RetryTemplate> retryMap = new ConcurrentHashMap<>();

@Override
public void doRetry(IFlowRetryNode retryNode, IFlowContext flowContext) {
if (retryNode.retryTimes() <= 0) {
return;
}

if (!retryMap.contains(retryNode.retryTimes())) {
synchronized (this) {
RetryTemplate retryTemplate = RetryTemplate.builder()
.maxAttempts(retryNode.retryTimes())
.retryOn(Throwable.class)
.build();
retryMap.put(retryNode.retryTimes(), retryTemplate);
}
}

retryMap.get(retryNode.retryTimes()).execute((context) -> {
log.info("SpringFlowRetry retry uniqueKey={} retryNode={} retryCount={}", flowContext.getUniqueKey(),
retryNode.getClass().getSimpleName(), context.getRetryCount());
retryNode.exec(flowContext);
return true;
});
}

}

流程链

流程链用于存储流程节点,也就是编排的作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class FlowChain<C extends IFlowContext> {

// 流程节点的定义
private List<Class<? extends IFlowNode>> flowNodeDefinition = new ArrayList<>();
// 用于节点重试
private IFlowRetry<C> flowRetry = new SpringFlowRetry();

public FlowChain add(Class<? extends IFlowNode> definition) {
this.flowNodeDefinition.add(definition);
return this;
}

public boolean isNull() {
return CollectionUtils.isEmpty(flowNodeDefinition);
}

public void forEach(Function<Class<? extends IFlowNode>, List<IFlowNode>> function) {
if (isNull()) {
return;
}
flowNodeDefinition.forEach(function::apply);
}

public List<Class<? extends IFlowNode>> getFlowNodes() {
return flowNodeDefinition;
}

public IFlowRetry<C> getFlowRetry() {
return flowRetry;
}
}

流程定义

流程是由流程链以及执行方法组成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/** 流程定义 **/
public interface IFlow<C extends IFlowContext> {
FlowChain getFlowChain();
/**
* 执行流程
* @param context
*/
void execute(C context);
}

/** 流程模板 **/
public abstract class AbstractFlowTemplate<C extends IFlowContext> extends AbstractFlowScanner implements IFlow<C> {

private FlowNodeSelector selector = new FlowNodeSelector(
getDefaultImplements(),
getMutexExtensionImplements(),
getShareExtensionImplements());

@Override
public void execute(C context) {
FlowChain<C> flowChain = getFlowChain();
if (flowChain == null || flowChain.isNull()) {
return;
}
// 流程链的顺序执行
for (Object flowNode : flowChain.getFlowNodes()) {
// 流程的过滤,通过selector来选择当前上下文需要命中的流程节点
List<IFlowNode> flowNodes = selector.select((Class<? extends IFlowNode>) flowNode, context);
if (CollectionUtils.isNotEmpty(flowNodes)) {
flowNodes.forEach(bean -> {
// 支持流程节点的重试
if (bean instanceof IFlowRetryNode && ((IFlowRetryNode) bean).retryTimes() > 0) {
flowChain.getFlowRetry().retry((IFlowRetryNode) bean, context);
} else {
bean.exec(context);
}
});
}
}
}

@Override
protected List<Class<? extends IFlowNode>> getFlowNodes() {
return getFlowChain() != null ? getFlowChain().getFlowNodes() : null;
}
}

/** 流程扫描 **/
public abstract class AbstractFlowScanner implements ApplicationContextAware, InitializingBean {

private ApplicationContext applicationContext;
private List<Class<? extends IFlowNode>> flowNodes;
// 默认流程节点
private Map<Class<? extends IFlowNode> , List<IFlowNode>> defaultImplements = new HashMap<>();
// 互斥流程节点
private Map<Class<? extends IFlowNode> , List<IFlowNode>> mutexExtensionImplements = new HashMap<>();
// 共享流程节点
private Map<Class<? extends IFlowNode> , List<IFlowNode>> shareExtensionImplements = new HashMap<>();
// 支持优先级
private Map<Class<? extends IFlowNode> , PriorityQueue<PriorityFlowNode>> sharePriorityExtensionImplements = new HashMap<>();
private Comparator<PriorityFlowNode> comparator = (o1, o2) -> o2.priority - o1.priority;

protected abstract List<Class<? extends IFlowNode>> getFlowNodes();

public Map<Class<? extends IFlowNode>, List<IFlowNode>> getDefaultImplements() {
return defaultImplements;
}

public Map<Class<? extends IFlowNode>, List<IFlowNode>> getMutexExtensionImplements() {
return mutexExtensionImplements;
}

public Map<Class<? extends IFlowNode>, List<IFlowNode>> getShareExtensionImplements() {
return shareExtensionImplements;
}

private synchronized void initFlowNodeImplements() {
flowNodes = getFlowNodes();
if (CollectionUtils.isEmpty(flowNodes)) {
return;
}
Map<String, IFlowNode> beans = applicationContext.getBeansOfType(IFlowNode.class);
if (MapUtils.isEmpty(beans)) {
return;
}
beans.values().stream().forEach(bean -> {
for (Class<? extends IFlowNode> flowNode : flowNodes) {
if (flowNode.isInstance(bean)) {
if (bean.getClass().getAnnotation(DefaultFlowNode.class) != null) {
registerFlowNode(defaultImplements, bean, flowNode);
log.info("AbstractFlowBuilder register default={}", bean.getClass().getSimpleName());
} else if (bean.getClass().getAnnotation(ExtensionFlowNode.class) != null) {
ExtensionFlowNode extensionFlowNode = bean.getClass().getAnnotation(ExtensionFlowNode.class);
// registerFlowNode(FlowExtensionType.MUTEX.equals(extensionFlowNode.type())
// ? mutexExtensionImplements : shareExtensionImplements, bean, flowNode);
if (FlowExtensionType.MUTEX.equals(extensionFlowNode.type())) {
registerFlowNode(mutexExtensionImplements, bean, flowNode);
} else {
registerPriorityFlowNode(bean, flowNode, extensionFlowNode.priority());
}
log.info("AbstractFlowBuilder register extension={}", bean.getClass().getSimpleName());
} else {
log.info("AbstractFlowBuilder register ignore {}", bean.getClass().getSimpleName());
}
}
}
});

refreshShareExtensionImplements();
}

private void refreshShareExtensionImplements() {
// priority 转 list
for (Class<? extends IFlowNode> flowNode : sharePriorityExtensionImplements.keySet()) {
PriorityQueue<PriorityFlowNode> priorityQueue = sharePriorityExtensionImplements.get(flowNode);
if (priorityQueue != null) {
List<IFlowNode> list = new ArrayList<>();
while (!priorityQueue.isEmpty()) {
list.add(priorityQueue.poll().getFlowNode());
}
shareExtensionImplements.put(flowNode, list);
}
}
}

private void registerFlowNode(Map<Class<? extends IFlowNode> , List<IFlowNode>> store, IFlowNode bean, Class<? extends IFlowNode> definition) {
if (!store.containsKey(definition)) {
store.put(definition, new ArrayList<>());
}
store.get(definition).add(bean);
}

private void registerPriorityFlowNode(IFlowNode bean, Class<? extends IFlowNode> definition, int priority) {
if (!sharePriorityExtensionImplements.containsKey(definition)) {
sharePriorityExtensionImplements.put(definition, new PriorityQueue<>(comparator));
}
sharePriorityExtensionImplements.get(definition).add(new PriorityFlowNode(bean, priority));
}

@Override
public void afterPropertiesSet() throws Exception {
initFlowNodeImplements();
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

@Data
private static class PriorityFlowNode {
private IFlowNode flowNode;
private int priority = 1;

public PriorityFlowNode(IFlowNode flowNode, int priority) {
this.flowNode = flowNode;
this.priority = priority;
}
}

}

流程扩展

在流程定义中,应该可以发现流程节点会分为默认节点和扩展节点,其中扩展节点又可以分为互斥节点和共享节点;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/** 扩展类型 **/
ffpublic enum FlowExtensionType {
MUTEX, // 互斥、不同的扩展会相互隔离
SHARE, // 共享、不同的扩展会相互叠加,通过指定不同的优先级来控制先后的覆盖顺序
}

/** 默认节点 **/
@Component
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface DefaultFlowNode {
}

/** 扩展节点 **/
@Component
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ExtensionFlowNode {
FlowExtensionType type() default FlowExtensionType.SHARE;
int priority() default 1; // 值越大优先级越高
}

/** 扩展匹配 **/
public interface IMatcher<C extends IFlowContext> {
boolean match(C context);
}
待完善的功能点
  • 流程分叉(流程编排过程中存在if-else的问题)
  • 流程持久化
  • 流程重试
  • xml编排
  • bpmn可视化编排