条件流程执行
在流程的简单执行章节中,我们让一条普通的顺序流程从开始节点走向结束节点。那如果是条件流程呢?我们又应该如何处理呢?
流程定义
如上图渲染的流程图,可由以下两种流程定义文件生成。
tests/json/leave_02.json
由决策节点的输出边属性来定义表达式,该表达式返回值为true/false
注:以下json并非全部。
{
"id": "517ef2c7-3486-4992-b554-0f538ab91751",
"type": "snaker:transition",
"sourceNodeId": "2c75eebf-5baf-4cd0-a7b3-05466be13634",
"targetNodeId": "end",
"properties": {
"expr": "f_day<3"
},
"text": {
"value": "请假天数小于3"
}
}
tests/json/leave_03.json
由决策节点的expr属性来定义表达式,该表达式返回值为目标节点名称。
注:以下json并非全部。
{
"id": "2c75eebf-5baf-4cd0-a7b3-05466be13634",
"type": "snaker:decision",
"properties": {
"expr": "'approveBoss' if f_day>=3 else 'end'"
}
}
tests/json/leave_04.json
由决策节点定义的handleClasss属性,实例化决策类,决定下一个节点名称。
注:以下json并非全部,缺少位置信息。
{
"id": "2c75eebf-5baf-4cd0-a7b3-05466be13634",
"type": "snaker:decision",
"properties": {
"handleClass": "tests.flow.LeaveDecisionHandler"
}
}
旧的代码逻辑
新增tests/test_wf_execute.py
共两个方法test_execute_leave_01
和test_execute_leave_02
,两者的执行逻辑都一样,就是解析的流程定义文件不一样。
- 加载配置
- 解析流程定义文件
- 执行流程
from packages.engine.parser.model_parser import ModelParser
class TestWfExecute:
def test_execute_leave_01(self):
# 读取相对路径下的./tests/json/leave.json文件
with open("./tests/json/leave.json", "r", encoding='utf-8') as f:
processModel = ModelParser.parse(f)
processModel.get_start().execute({})
def test_execute_leave_02(self):
# 读取相对路径下的./tests/json/leave_02.json文件
with open("./tests/json/leave_02.json", "r", encoding='utf-8') as f:
processModel = ModelParser.parse(f)
processModel.get_start().execute({})
- 执行
execute_leave_01
方法
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_01
结果如下:
model:StartModel,name:start,displayName:开始,time:2024-07-23 21:06:43.441874
model:TaskModel,name:apply,displayName:请假申请,time:2024-07-23 21:06:46.453361
model:TaskModel,name:approveDept,displayName:部门领导审批,time:2024-07-23 21:06:49.456431
model:EndModel,name:end,displayName:结束,time:2024-07-23 21:06:49.456431
- 执行
execute_leave_02
方法
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_02
结果如下:
model:StartModel,name:start,displayName:开始,time:2024-07-23 21:08:22.958460
model:TaskModel,name:apply,displayName:请假申请,time:2024-07-23 21:08:25.965849
model:TaskModel,name:approveDept,displayName:部门领导审批,time:2024-07-23 21:08:28.974350
我们会看到,
execute_leave_02
的执行是不完整的,因为我们并没有对决策节点进行处理。下面我们要对决策节点进行处理,使其完整的从开始节点走向结束节点。
决策节点分析
从图中看,我们可以得到如下两条路径:
- 开始->请假申请->部门领导审批->结束
- 开始->请假申请->部门领导审批->公司领导审批->结束
查看流程定义文件leave_02.json
,在节点输出边中,我们会看到如下属性:
{
"expr": "f_day<3"
}
{
"expr": "f_day>=3"
}
查看流程定义文件leave_03.json
,在节点属性中,我们会看到如下属性:
{
"expr": "'approveBoss' if f_day>=3 else 'end'"
}
查看流程定义文件leave_04.json
,在节点属性中,我们会看到如下属性:
{
"handleClass": "tests.flow.LeaveDecisionHandler"
}
那我们在代码上应该如何实现呢?其实思路很简单,分三种情况判断:
如果决策节点定义有表达式属性:
- 从节点属性中获取表达式
- 调用表达式引擎,得到下一个节点的节点名称
- 遍历所有输出边,如果输出边目标节点名称和上面找到的下一个节点名称一致,则设置enabled=true
- 调用输出边的execute方法
如果决策节点定义有决策类字段串属性:
- 从节点属性中获取决策类
- 实例类决策类
- 调用决策类方法,得到下一个节点的节点名称
- 遍历所有输出边,如果输出边目标节点名称和上面找到的下一个节点名称一致,则设置enabled=true
- 调用输出边的execute方法
如果决策节点未定义有表达式属性:
- 从节点的输出边中获取表达式
- 调用表达式引擎,设置输出边的enabled属性
- 调用输出边的execute方法
表达式引擎
这里也没有什么好的python表达式引擎推荐,直接使用python的eval函数即可。当然,eval函数有安全风险,所以可以考虑使用如asteval这样的库,它提供了一个安全的表达式评估环境。下面先进行一些简单的测试。
使用eval函数
- 新建
tests/test_wf_eval.py
文件
import logging
def test_eval():
# 假设 f_day 是一个变量
context = {'f_day': 2}
# 使用 eval 函数计算表达式
expression = "f_day < 3"
result = eval(expression, {}, context)
logging.info(f"f_day < 3:{result}")
expression = "f_day > 3"
result = eval(expression, {}, context)
logging.info(f"f_day > 3:{result}")
expression = "f_day == 3"
result = eval(expression, {}, context)
logging.info(f"f_day == 3:{result}")
- 执行测试命令:
pytest -q tests/test_wf_eval.py::test_eval
- 测试结果如下:
使用asteval
- 安装asteval依赖库
pip3 install asteval
- 修改
tests/test_wf_eval.py
文件,增加test_asteval
方法
import logging
def test_eval():
# 假设 f_day 是一个变量
context = {'f_day': 2}
# 使用 eval 函数计算表达式
expression = "f_day < 3"
result = eval(expression, {}, context)
logging.info(f"f_day < 3:{result}")
expression = "f_day > 3"
result = eval(expression, {}, context)
logging.info(f"f_day > 3:{result}")
expression = "f_day == 3"
result = eval(expression, {}, context)
logging.info(f"f_day == 3:{result}")
def test_asteval():
from asteval import Interpreter
# 创建Interpreter实例
interp = Interpreter()
# 设置变量
interp.symtable['f_day'] = 2
# 定义表达式
expression = "f_day < 3"
# 评估表达式
result = interp(expression)
logging.info(f"f_day < 3:{result}")
expression = "f_day > 3"
result = interp(expression)
logging.info(f"f_day > 3:{result}")
expression = "f_day == 3"
result = interp(expression)
logging.info(f"f_day == 3:{result}")
- 执行测试命令:
pytest -q tests/test_wf_eval.py::test_asteval
- 测试结果如下:
封装成工具类
- 新建
package/engine/util/eval_util.py
from asteval import Interpreter
class ExprUtil(object):
@staticmethod
def eval(expr: str, context: dict):
# 创建Interpreter实例
interp = Interpreter()
interp.symtable.update(context)
return interp(expr)
- 修改
tests/test_wf_eval.py
文件,增加test_eval_util
方法
import logging
def test_eval():
# 假设 f_day 是一个变量
context = {'f_day': 2}
# 使用 eval 函数计算表达式
expression = "f_day < 3"
result = eval(expression, {}, context)
logging.info(f"f_day < 3:{result}")
expression = "f_day > 3"
result = eval(expression, {}, context)
logging.info(f"f_day > 3:{result}")
expression = "f_day == 3"
result = eval(expression, {}, context)
logging.info(f"f_day == 3:{result}")
def test_asteval():
from asteval import Interpreter
# 创建Interpreter实例
interp = Interpreter()
# 设置变量
interp.symtable['f_day'] = 2
# 定义表达式
expression = "f_day < 3"
# 评估表达式
result = interp(expression)
logging.info(f"f_day < 3:{result}")
expression = "f_day > 3"
result = interp(expression)
logging.info(f"f_day > 3:{result}")
expression = "f_day == 3"
result = interp(expression)
logging.info(f"f_day == 3:{result}")
def test_eval_util():
from packages.engine.util.expr_util import ExprUtil
context = {'f_day': 2}
result = ExprUtil.eval("f_day < 3", context)
logging.info(f"f_day < 3:{result}")
result = ExprUtil.eval("f_day > 3", context)
logging.info(f"f_day > 3:{result}")
result = ExprUtil.eval("f_day == 3", context)
logging.info(f"f_day == 3:{result}")
- 执行测试命令:
pytest -q tests/test_wf_eval.py::test_eval_util
- 测试结果如下:
代码实现
!. 修改model/decision_model.py
文件的执行逻辑
import importlib
import logging
from packages.engine.model import NodeModel
from packages.engine.util.expr_util import ExprUtil
class DecisionModel(NodeModel):
"""
决策节点模型
"""
expr = None # 决策表达式
handleClass = None # 决策处理类
def exec(self, execution):
"""
执行节点
@param execution: 执行对象参数
"""
# 执行决策节点自定义执行逻辑
isFound = False
nextNodeName = None
args = execution.get('args', {})
if not (self.expr is None or self.expr == ''):
nextNodeName = ExprUtil.eval(self.expr, args)
elif not (self.handleClass is None or self.handleClass == ''):
# 执行决策处理类
module_path = f'{self.handleClass}'
logging.info(f'load handleClass:{self.handleClass}')
try:
model_module = importlib.import_module(module_path)
handleClassName = self.handleClass.split('.')[-1]
HandleClass = getattr(model_module, handleClassName)
handleInstance = HandleClass()
nextNodeName = handleInstance.decide(execution)
logging.info(f'{self.handleClass} decide result: {nextNodeName}')
except (ImportError, AttributeError) as e:
logging.error(f"Failed to load handleClass {self.handleClass}: {e}")
for transition in self.outputs:
if not (transition.expr is None and transition.expr == '') and ExprUtil.eval(transition.expr, args):
# 决策节点输出边存在表达式,则使用输出边的表达式,true则执行
isFound = True
transition.enabled = True
transition.execute(execution)
elif transition.to == nextNodeName:
# 找到对应的下一个节点
isFound = True
transition.enabled = True
transition.execute(execution)
if not isFound:
# 找不到下一个可执行路线
logging.error(f"{self.name} 找不到下一个可执行路线")
# raise Exception(f'{self.name} 找不到下一个可执行路线')
- 修复
abstract_node_parser.py
解析逻辑bug
旧的expr
从nodeEdge
中获取,改为从nodeEdge
的properties
中获取
transitionModel.expr = nodeEdge.get(NodeParser.EXPR_KEY)
修改为
trProps = nodeEdge.get(NodeParser.PROPERTIES_KEY, {})
transitionModel.expr = trProps.get(NodeParser.EXPR_KEY)
- 单元测试类改造
tests/test_wf_execute.py
from packages.engine.parser.model_parser import ModelParser
class TestWfExecute:
def test_execute_leave_01(self):
# 读取相对路径下的./tests/json/leave.json文件
with open("./tests/json/leave.json", "r", encoding='utf-8') as f:
processModel = ModelParser.parse(f)
processModel.get_start().execute({})
def test_execute_leave_02(self):
# 读取相对路径下的./tests/json/leave_02.json文件
with open("./tests/json/leave_02.json", "r", encoding='utf-8') as f:
processModel = ModelParser.parse(f)
processModel.get_start().execute({})
def test_execute_leave_02_1(self):
# 读取相对路径下的./tests/json/leave_02.json文件
with open("./tests/json/leave_02.json", "r", encoding='utf-8') as f:
processModel = ModelParser.parse(f)
processModel.get_start().execute({
"args": {
"f_day": 1
}
})
def test_execute_leave_02_2(self):
# 读取相对路径下的./tests/json/leave_02.json文件
with open("./tests/json/leave_02.json", "r", encoding='utf-8') as f:
processModel = ModelParser.parse(f)
processModel.get_start().execute({
"args": {
"f_day": 3
}
})
def test_execute_leave_03_1(self):
# 读取相对路径下的./tests/json/leave_03.json文件
with open("./tests/json/leave_03.json", "r", encoding='utf-8') as f:
processModel = ModelParser.parse(f)
processModel.get_start().execute({
"args":{
"f_day": 1
}
})
def test_execute_leave_03_2(self):
# 读取相对路径下的./tests/json/leave_03.json文件
with open("./tests/json/leave_03.json", "r", encoding='utf-8') as f:
processModel = ModelParser.parse(f)
processModel.get_start().execute({
"args":{
"f_day": 3
}
})
def test_execute_leave_04(self):
# 读取相对路径下的./tests/json/leave_04.json文件
with open("./tests/json/leave_04.json", "r", encoding='utf-8') as f:
processModel = ModelParser.parse(f)
processModel.get_start().execute({})
测试验证
- 当执行
execute_leave_02_1
方法时
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_02_1
- 流程定义文件:
leave_02.json
f_day=1
结果如下:
model:StartModel,name:start,displayName:开始
model:TaskModel,name:apply,displayName:请假申请
model:TaskModel,name:approveDept,displayName:部门领导审批
model:EndModel,name:end,displayName:结束
- 当执行
execute_leave_02_2
方法时
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_02_2
- 流程定义文件:
leave_02.json
f_day=3
结果如下:
model:StartModel,name:start,displayName:开始
model:TaskModel,name:apply,displayName:请假申请
model:TaskModel,name:approveDept,displayName:部门领导审批
model:TaskModel,name:approveBoss,displayName:公司领导审批
model:EndModel,name:end,displayName:结束
- 当执行
executeLeave_03_1
方法时
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_03_1
- 流程定义文件:
leave_03.json
f_day=1
结果如下:
model:StartModel,name:start,displayName:开始
model:TaskModel,name:apply,displayName:请假申请
model:TaskModel,name:approveDept,displayName:部门领导审批
model:EndModel,name:end,displayName:结束
- 当执行
executeLeave_03_2
方法时
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_03_2
- 流程定义文件:
leave_03.json
f_day=3
结果如下
model:StartModel,name:start,displayName:开始
model:TaskModel,name:apply,displayName:请假申请
model:TaskModel,name:approveDept,displayName:部门领导审批
model:TaskModel,name:approveBoss,displayName:公司领导审批
model:EndModel,name:end,displayName:结束
- 当执行
execute_leave_04
方法时
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_04
- 流程定义文件:
leave_04.json
handleClass
:tests.flow.LeaveDecisionHandler
import random
from packages.engine.decision_handler import DecisionHandler
class LeaveDecisionHandler(DecisionHandler):
def decide(self, execution):
# 生成1~10的随机数
random_number = random.randint(1, 10)
# 1~10随机,整除2则返回approveBoss,否则返回end
if random_number % 2 == 0:
return "approveBoss"
else:
return "end"
存在两种执行结果:
- 当
random_number
为偶数时,执行approveBoss
节点 - 当
random_number
为奇数时,执行end
节点