条件流程执行

在流程的简单执行章节中,我们让一条普通的顺序流程从开始节点走向结束节点。那如果是条件流程呢?我们又应该如何处理呢?

流程定义

流程定义

如上图渲染的流程图,可由以下两种流程定义文件生成。

tests/json/leave_02.json

由决策节点的输出边属性来定义表达式,该表达式返回值为true/false

注:以下json并非全部。

{
  "id": "517ef2c7-3486-4992-b554-0f538ab91751",
  "type": "snaker:transition",
  "sourceNodeId": "2c75eebf-5baf-4cd0-a7b3-05466be13634",
  "targetNodeId": "end",
  "properties": {
    "expr": "f_day<3"
  },
  "text": {
    "value": "请假天数小于3"
  }
}

tests/json/leave_03.json

由决策节点的expr属性来定义表达式,该表达式返回值为目标节点名称。

注:以下json并非全部。

{
      "id": "2c75eebf-5baf-4cd0-a7b3-05466be13634",
      "type": "snaker:decision",
      "properties": {
        "expr": "'approveBoss' if f_day>=3 else 'end'"
      }
    }

tests/json/leave_04.json

由决策节点定义的handleClasss属性,实例化决策类,决定下一个节点名称。

注:以下json并非全部,缺少位置信息。

{
      "id": "2c75eebf-5baf-4cd0-a7b3-05466be13634",
      "type": "snaker:decision",
      "properties": {
        "handleClass": "tests.flow.LeaveDecisionHandler"
      }
    }

旧的代码逻辑

新增tests/test_wf_execute.py

共两个方法test_execute_leave_01test_execute_leave_02,两者的执行逻辑都一样,就是解析的流程定义文件不一样。

  • 加载配置
  • 解析流程定义文件
  • 执行流程
from packages.engine.parser.model_parser import ModelParser


class TestWfExecute:
    def test_execute_leave_01(self):
        # 读取相对路径下的./tests/json/leave.json文件
        with open("./tests/json/leave.json", "r", encoding='utf-8') as f:
            processModel = ModelParser.parse(f)
            processModel.get_start().execute({})
    def test_execute_leave_02(self):
        # 读取相对路径下的./tests/json/leave_02.json文件
        with open("./tests/json/leave_02.json", "r", encoding='utf-8') as f:
            processModel = ModelParser.parse(f)
            processModel.get_start().execute({})
  • 执行execute_leave_01方法
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_01

结果如下:

model:StartModel,name:start,displayName:开始,time:2024-07-23 21:06:43.441874
model:TaskModel,name:apply,displayName:请假申请,time:2024-07-23 21:06:46.453361
model:TaskModel,name:approveDept,displayName:部门领导审批,time:2024-07-23 21:06:49.456431
model:EndModel,name:end,displayName:结束,time:2024-07-23 21:06:49.456431

执行结果一

  • 执行execute_leave_02方法
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_02

结果如下:

model:StartModel,name:start,displayName:开始,time:2024-07-23 21:08:22.958460
model:TaskModel,name:apply,displayName:请假申请,time:2024-07-23 21:08:25.965849
model:TaskModel,name:approveDept,displayName:部门领导审批,time:2024-07-23 21:08:28.974350

执行结果二 我们会看到,execute_leave_02的执行是不完整的,因为我们并没有对决策节点进行处理。下面我们要对决策节点进行处理,使其完整的从开始节点走向结束节点。

决策节点分析

从图中看,我们可以得到如下两条路径:

  • 开始->请假申请->部门领导审批->结束
  • 开始->请假申请->部门领导审批->公司领导审批->结束

查看流程定义文件leave_02.json,在节点输出边中,我们会看到如下属性:

{
  "expr": "f_day<3"
}
{
  "expr": "f_day>=3"
}

查看流程定义文件leave_03.json,在节点属性中,我们会看到如下属性:

{
  "expr": "'approveBoss' if f_day>=3 else 'end'"
}

查看流程定义文件leave_04.json,在节点属性中,我们会看到如下属性:

{
  "handleClass": "tests.flow.LeaveDecisionHandler"
}

那我们在代码上应该如何实现呢?其实思路很简单,分三种情况判断:

如果决策节点定义有表达式属性:

  • 从节点属性中获取表达式
  • 调用表达式引擎,得到下一个节点的节点名称
  • 遍历所有输出边,如果输出边目标节点名称和上面找到的下一个节点名称一致,则设置enabled=true
  • 调用输出边的execute方法

如果决策节点定义有决策类字段串属性:

  • 从节点属性中获取决策类
  • 实例类决策类
  • 调用决策类方法,得到下一个节点的节点名称
  • 遍历所有输出边,如果输出边目标节点名称和上面找到的下一个节点名称一致,则设置enabled=true
  • 调用输出边的execute方法

如果决策节点未定义有表达式属性:

  • 从节点的输出边中获取表达式
  • 调用表达式引擎,设置输出边的enabled属性
  • 调用输出边的execute方法

表达式引擎

这里也没有什么好的python表达式引擎推荐,直接使用python的eval函数即可。当然,eval函数有安全风险,所以可以考虑使用如asteval这样的库,它提供了一个安全的表达式评估环境。下面先进行一些简单的测试。

使用eval函数

  1. 新建tests/test_wf_eval.py文件
import logging


def test_eval():
    # 假设 f_day 是一个变量
    context = {'f_day': 2}

    # 使用 eval 函数计算表达式
    expression = "f_day < 3"
    result = eval(expression, {}, context)
    logging.info(f"f_day < 3:{result}")

    expression = "f_day > 3"
    result = eval(expression, {}, context)
    logging.info(f"f_day > 3:{result}")
    
    expression = "f_day == 3"
    result = eval(expression, {}, context)
    logging.info(f"f_day == 3:{result}")

  1. 执行测试命令:
pytest -q tests/test_wf_eval.py::test_eval
  1. 测试结果如下: 测试结果

使用asteval

  1. 安装asteval依赖库
pip3 install asteval
  1. 修改tests/test_wf_eval.py文件,增加test_asteval方法
import logging


def test_eval():
    # 假设 f_day 是一个变量
    context = {'f_day': 2}

    # 使用 eval 函数计算表达式
    expression = "f_day < 3"
    result = eval(expression, {}, context)
    logging.info(f"f_day < 3:{result}")

    expression = "f_day > 3"
    result = eval(expression, {}, context)
    logging.info(f"f_day > 3:{result}")

    expression = "f_day == 3"
    result = eval(expression, {}, context)
    logging.info(f"f_day == 3:{result}")

def test_asteval():
    from asteval import Interpreter
    # 创建Interpreter实例
    interp = Interpreter()

    # 设置变量
    interp.symtable['f_day'] = 2
    # 定义表达式
    expression = "f_day < 3"
    # 评估表达式
    result = interp(expression)
    logging.info(f"f_day < 3:{result}")

    expression = "f_day > 3"
    result = interp(expression)
    logging.info(f"f_day > 3:{result}")

    expression = "f_day == 3"
    result = interp(expression)
    logging.info(f"f_day == 3:{result}")
  1. 执行测试命令:
pytest -q tests/test_wf_eval.py::test_asteval
  1. 测试结果如下: 测试结果

封装成工具类

  1. 新建package/engine/util/eval_util.py

from asteval import Interpreter


class ExprUtil(object):
    
    @staticmethod
    def eval(expr: str, context: dict):
        # 创建Interpreter实例
        interp = Interpreter()
        interp.symtable.update(context)
        return interp(expr)
  1. 修改tests/test_wf_eval.py文件,增加test_eval_util方法
import logging


def test_eval():
    # 假设 f_day 是一个变量
    context = {'f_day': 2}

    # 使用 eval 函数计算表达式
    expression = "f_day < 3"
    result = eval(expression, {}, context)
    logging.info(f"f_day < 3:{result}")

    expression = "f_day > 3"
    result = eval(expression, {}, context)
    logging.info(f"f_day > 3:{result}")

    expression = "f_day == 3"
    result = eval(expression, {}, context)
    logging.info(f"f_day == 3:{result}")

def test_asteval():
    from asteval import Interpreter
    # 创建Interpreter实例
    interp = Interpreter()

    # 设置变量
    interp.symtable['f_day'] = 2
    # 定义表达式
    expression = "f_day < 3"
    # 评估表达式
    result = interp(expression)
    logging.info(f"f_day < 3:{result}")

    expression = "f_day > 3"
    result = interp(expression)
    logging.info(f"f_day > 3:{result}")

    expression = "f_day == 3"
    result = interp(expression)
    logging.info(f"f_day == 3:{result}")

def test_eval_util():
    from packages.engine.util.expr_util import ExprUtil
    context = {'f_day': 2}
    result = ExprUtil.eval("f_day < 3", context)
    logging.info(f"f_day < 3:{result}")
    result = ExprUtil.eval("f_day > 3", context)
    logging.info(f"f_day > 3:{result}")
    result = ExprUtil.eval("f_day == 3", context)
    logging.info(f"f_day == 3:{result}")
  1. 执行测试命令:
pytest -q tests/test_wf_eval.py::test_eval_util
  1. 测试结果如下: 测试结果

代码实现

!. 修改model/decision_model.py文件的执行逻辑


import importlib
import logging
from packages.engine.model import NodeModel
from packages.engine.util.expr_util import ExprUtil


class DecisionModel(NodeModel):
    """
    决策节点模型
    """
    expr = None # 决策表达式
    handleClass = None # 决策处理类
    def exec(self, execution):
        """
        执行节点
        @param execution: 执行对象参数
        """
        # 执行决策节点自定义执行逻辑
        isFound = False
        nextNodeName = None
        args = execution.get('args', {})
        if not (self.expr is None or self.expr == ''):
            nextNodeName = ExprUtil.eval(self.expr, args)
        elif not (self.handleClass is None or self.handleClass == ''):
            # 执行决策处理类
            module_path = f'{self.handleClass}'
            logging.info(f'load handleClass:{self.handleClass}')
            try:
                model_module = importlib.import_module(module_path)
                handleClassName = self.handleClass.split('.')[-1]
                HandleClass = getattr(model_module, handleClassName)
                handleInstance = HandleClass()
                nextNodeName = handleInstance.decide(execution) 
                logging.info(f'{self.handleClass} decide result: {nextNodeName}')
            except (ImportError, AttributeError) as e:
                logging.error(f"Failed to load handleClass {self.handleClass}: {e}")
        for transition in self.outputs:
            if not (transition.expr is None and transition.expr == '') and ExprUtil.eval(transition.expr, args):
                # 决策节点输出边存在表达式,则使用输出边的表达式,true则执行
                isFound = True
                transition.enabled = True
                transition.execute(execution)
            elif transition.to == nextNodeName:
                # 找到对应的下一个节点
                isFound = True
                transition.enabled = True
                transition.execute(execution)
        if not isFound:
            # 找不到下一个可执行路线
            logging.error(f"{self.name} 找不到下一个可执行路线")
            # raise Exception(f'{self.name} 找不到下一个可执行路线')
  1. 修复abstract_node_parser.py解析逻辑bug

旧的exprnodeEdge中获取,改为从nodeEdgeproperties中获取

transitionModel.expr = nodeEdge.get(NodeParser.EXPR_KEY)

修改为

trProps = nodeEdge.get(NodeParser.PROPERTIES_KEY, {})
transitionModel.expr = trProps.get(NodeParser.EXPR_KEY)
  1. 单元测试类改造tests/test_wf_execute.py
from packages.engine.parser.model_parser import ModelParser


class TestWfExecute:
    def test_execute_leave_01(self):
        # 读取相对路径下的./tests/json/leave.json文件
        with open("./tests/json/leave.json", "r", encoding='utf-8') as f:
            processModel = ModelParser.parse(f)
            processModel.get_start().execute({})
    def test_execute_leave_02(self):
        # 读取相对路径下的./tests/json/leave_02.json文件
        with open("./tests/json/leave_02.json", "r", encoding='utf-8') as f:
            processModel = ModelParser.parse(f)
            processModel.get_start().execute({})
    
    def test_execute_leave_02_1(self):
        # 读取相对路径下的./tests/json/leave_02.json文件
        with open("./tests/json/leave_02.json", "r", encoding='utf-8') as f:
            processModel = ModelParser.parse(f)
            processModel.get_start().execute({
                "args": {
                    "f_day": 1
                }
            })
    def test_execute_leave_02_2(self):
        # 读取相对路径下的./tests/json/leave_02.json文件
        with open("./tests/json/leave_02.json", "r", encoding='utf-8') as f:
            processModel = ModelParser.parse(f)
            processModel.get_start().execute({
                "args": {
                    "f_day": 3
                }
            })
    def test_execute_leave_03_1(self):
        # 读取相对路径下的./tests/json/leave_03.json文件
        with open("./tests/json/leave_03.json", "r", encoding='utf-8') as f:
            processModel = ModelParser.parse(f)
            processModel.get_start().execute({
                "args":{
                    "f_day": 1
                }
            })
    def test_execute_leave_03_2(self):
        # 读取相对路径下的./tests/json/leave_03.json文件
        with open("./tests/json/leave_03.json", "r", encoding='utf-8') as f:
            processModel = ModelParser.parse(f)
            processModel.get_start().execute({
                "args":{
                    "f_day": 3
                }
            })
    def test_execute_leave_04(self):
        # 读取相对路径下的./tests/json/leave_04.json文件
        with open("./tests/json/leave_04.json", "r", encoding='utf-8') as f:
            processModel = ModelParser.parse(f)
            processModel.get_start().execute({})

测试验证

  1. 当执行execute_leave_02_1方法时
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_02_1
  • 流程定义文件:leave_02.json
  • f_day=1

结果如下:

model:StartModel,name:start,displayName:开始
model:TaskModel,name:apply,displayName:请假申请
model:TaskModel,name:approveDept,displayName:部门领导审批
model:EndModel,name:end,displayName:结束

执行结果

  1. 当执行execute_leave_02_2方法时
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_02_2
  • 流程定义文件:leave_02.json
  • f_day=3

结果如下:

model:StartModel,name:start,displayName:开始
model:TaskModel,name:apply,displayName:请假申请
model:TaskModel,name:approveDept,displayName:部门领导审批
model:TaskModel,name:approveBoss,displayName:公司领导审批
model:EndModel,name:end,displayName:结束

执行结果

  1. 当执行executeLeave_03_1方法时
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_03_1
  • 流程定义文件:leave_03.json
  • f_day=1

结果如下:

model:StartModel,name:start,displayName:开始
model:TaskModel,name:apply,displayName:请假申请
model:TaskModel,name:approveDept,displayName:部门领导审批
model:EndModel,name:end,displayName:结束

执行结果

  1. 当执行executeLeave_03_2方法时
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_03_2
  • 流程定义文件:leave_03.json
  • f_day=3

结果如下

model:StartModel,name:start,displayName:开始
model:TaskModel,name:apply,displayName:请假申请
model:TaskModel,name:approveDept,displayName:部门领导审批
model:TaskModel,name:approveBoss,displayName:公司领导审批
model:EndModel,name:end,displayName:结束

执行结果

  1. 当执行execute_leave_04方法时
pytest -q tests/test_wf_execute.py::TestWfExecute::test_execute_leave_04
  • 流程定义文件:leave_04.json
  • handleClass: tests.flow.LeaveDecisionHandler
import random
from packages.engine.decision_handler import DecisionHandler


class LeaveDecisionHandler(DecisionHandler):
    def decide(self, execution):
        # 生成1~10的随机数
        random_number = random.randint(1, 10)
        # 1~10随机,整除2则返回approveBoss,否则返回end
        if random_number % 2 == 0:
            return "approveBoss"
        else:
            return "end"

存在两种执行结果:

  • random_number为偶数时,执行approveBoss节点 执行结果1
  • random_number为奇数时,执行end节点 执行结果2

相关源码

工作流引擎核心设计·条件流程执行open in new window