解析流程定义文件
在上一篇我们手动构建了一个流程对象并简单打印执行,其构建流程对象的方式并不是很友好。为了更方便地构建流程对象,我们采用全新的方式,即解析基础篇提到的流程定义文件,并将其转成流程模型。以下是要解析的样例文件:
tests/json/leave.json
{
"name": "leave",
"displayName": "请假",
"instanceUrl": "leaveForm",
"nodes": [
{
"id": "start",
"type": "snaker:start",
"x": 340,
"y": 160,
"properties": {
"width": "120",
"height": "80"
},
"text": {
"x": 340,
"y": 200,
"value": "开始"
}
},
{
"id": "apply",
"type": "snaker:task",
"x": 520,
"y": 160,
"properties": {
"assignee": "approve.operator",
"taskType": "Major",
"performType": "ANY",
"autoExecute": "N",
"width": "120",
"height": "80",
"field": {
"userKey": "1"
}
},
"text": {
"x": 520,
"y": 160,
"value": "请假申请"
}
},
{
"id": "approveDept",
"type": "snaker:task",
"x": 740,
"y": 160,
"properties": {
"assignmentHandler": "com.mldong.config.FlowAssignmentHandler",
"taskType": "Major",
"performType": "ANY",
"autoExecute": "N",
"width": "120",
"height": "80"
},
"text": {
"x": 740,
"y": 160,
"value": "部门领导审批"
}
},
{
"id": "end",
"type": "snaker:end",
"x": 980,
"y": 160,
"properties": {
"width": "120",
"height": "80"
},
"text": {
"x": 980,
"y": 200,
"value": "结束"
}
}
],
"edges": [
{
"id": "t1",
"type": "snaker:transition",
"sourceNodeId": "start",
"targetNodeId": "apply",
"startPoint": {
"x": 358,
"y": 160
},
"endPoint": {
"x": 460,
"y": 160
},
"properties": {
"height": 80,
"width": 120
},
"pointsList": [
{
"x": 358,
"y": 160
},
{
"x": 460,
"y": 160
}
]
},
{
"id": "t2",
"type": "snaker:transition",
"sourceNodeId": "apply",
"targetNodeId": "approveDept",
"startPoint": {
"x": 580,
"y": 160
},
"endPoint": {
"x": 680,
"y": 160
},
"properties": {
"height": 80,
"width": 120
},
"pointsList": [
{
"x": 580,
"y": 160
},
{
"x": 680,
"y": 160
}
]
},
{
"id": "t3",
"type": "snaker:transition",
"sourceNodeId": "approveDept",
"targetNodeId": "end",
"startPoint": {
"x": 800,
"y": 160
},
"endPoint": {
"x": 962,
"y": 160
},
"properties": {
"height": 80,
"width": 120
},
"pointsList": [
{
"x": 800,
"y": 160
},
{
"x": 830,
"y": 160
},
{
"x": 830,
"y": 160
},
{
"x": 932,
"y": 160
},
{
"x": 932,
"y": 160
},
{
"x": 962,
"y": 160
}
]
}
]
}
类图
为了实现流程定义文件的灵活解析,我们采用了模板方法设计模式。这一模式的核心在于定义一个算法的骨架,并将某些步骤留给子类去具体实现。在我们的解析框架中,我们首先设计了一个抽象基类AbstractNodeParser,它规定了解析流程节点的基本步骤,如解析流程节点的id、类型、坐标、文本、属性等,但将具体实施细节抽象化。
子类继承自这个基类,负责实现那些抽象方法,即特定节点属性的解析逻辑。例如,任务节点解析器TaskParser负责填充任务节点的参与人、任务类型、参与类型、提醒时间、提醒间隔等;决策节点解析器DecisionParser负责填充决策节点的决策表达式、决策处理类等。通过这种方式,我们能够确保所有解析流程遵循一致的结构,同时保持高度的灵活性,以适应多种多样的流程节点的解析。
流程图
代码实现
解析类
parser/node_parser.py
from abc import ABC, abstractmethod
from packages.engine.model import NodeModel
class NodeParser(ABC):
    """
    Node parser interface.

    Holds the key names used when reading a flow definition (nodes, edges and
    their properties) and declares the two operations every parser provides:
    ``parse`` and ``get_model``.
    """
    __abstract__ = True  # marker flag; not read anywhere in the visible code
    ID_KEY = "id"  # node id
    NODE_NAME_PREFIX = "snaker:"  # prefix of the node type name
    NODES_KEY = "nodes"  # nodes
    EDGES_KEY = "edges"  # edges
    TARGET_NODE_ID = "targetNodeId"  # target node id
    SOURCE_NODE_ID = "sourceNodeId"  # source node id
    PROPERTIES_KEY = "properties"  # properties key
    TEXT_KEY = "text"  # text node
    TEXT_VALUE_KEY = "value"  # text value
    WIDTH_KEY = "width"  # node width
    HEIGHT_KEY = "height"  # node height
    PRE_INTERCEPTORS_KEY = "preInterceptors"  # pre-interceptors
    POST_INTERCEPTORS_KEY = "postInterceptors"  # post-interceptors
    EXPR_KEY = "expr"  # expression key
    HANDLE_CLASS_KEY = "handleClass"  # expression handler class
    FORM_KEY = "form"  # form identifier
    ASSIGNEE_KEY = "assignee"  # assignee
    ASSIGNMENT_HANDLE_KEY = "assignmentHandler"  # assignee handler class
    TASK_TYPE_KEY = "taskType"  # task type (major / assisting)
    PERFORM_TYPE_KEY = "performType"  # participation type (normal / countersign)
    REMINDER_TIME_KEY = "reminderTime"  # reminder time
    REMINDER_REPEAT_KEY = "reminderRepeat"  # repeat-reminder interval
    EXPIRE_TIME_KEY = "expireTime"  # variable key of the expected task completion time
    AUTH_EXECUTE_KEY = "autoExecute"  # whether to auto-execute on expiry, Y/N
    CALLBACK_KEY = "callback"  # auto-execute callback class
    EXT_FIELD_KEY = "field"  # custom extension attributes

    @abstractmethod
    def parse(self, lfNode, lfEdges):
        """
        Parse the attributes of a node; implemented by the concrete parser.
        @param lfNode LogicFlow node object
        @param lfEdges all edge objects
        """
        pass

    @abstractmethod
    def get_model(self) -> "NodeModel":
        """
        Return the NodeModel object once parsing has finished.
        """
        # The annotation is quoted so it is not evaluated when the class is
        # created. ``self`` was missing from the original signature, which made
        # any bound or super() call fail with TypeError.
        pass
parser/abstract_node_parser.py
from abc import abstractmethod
from packages.engine.model import NodeModel
from packages.engine.model.transition_model import TransitionModel
from packages.engine.parser.node_parser import NodeParser
class AbstractNodeParser(NodeParser):
    """
    Abstract node parser.

    ``parse`` reads the attributes handled here for every node (name, display
    name, interceptors, outgoing edges) and then calls ``parse_node`` so the
    subclass can read its own attributes.
    """
    nodeModel = None

    def __init__(self):
        self.nodeModel = None

    def parse(self, lfNode, lfEdges: list):
        """
        Build a fresh node model from ``lfNode`` and keep it on ``self.nodeModel``.
        @param lfNode LogicFlow node object
        @param lfEdges all edge objects
        """
        self.nodeModel = self.new_nodel()
        # Basic information. ``or {}`` also covers a key that is present but null.
        self.nodeModel.name = lfNode.get(NodeParser.ID_KEY)
        text = lfNode.get(NodeParser.TEXT_KEY) or {}
        self.nodeModel.displayName = text.get(NodeParser.TEXT_VALUE_KEY)
        properties = lfNode.get(NodeParser.PROPERTIES_KEY) or {}
        # Interceptors
        self.nodeModel.preInterceptors = properties.get(NodeParser.PRE_INTERCEPTORS_KEY)
        self.nodeModel.postInterceptors = properties.get(NodeParser.POST_INTERCEPTORS_KEY)
        # Outgoing edges: one TransitionModel per edge whose source is this node
        for nodeEdge in self.get_edge_by_source_node_id(self.nodeModel.name, lfEdges):
            transitionModel = TransitionModel()
            transitionModel.name = nodeEdge.get(NodeParser.ID_KEY)
            transitionModel.to = nodeEdge.get(NodeParser.TARGET_NODE_ID)
            transitionModel.source = self.nodeModel
            transitionModel.expr = nodeEdge.get(NodeParser.EXPR_KEY)
            self.nodeModel.outputs.append(transitionModel)
        # Subclass-specific parsing
        self.parse_node(lfNode)

    @abstractmethod
    def parse_node(self, lfNode):
        """
        Implemented by subclasses to parse their node-specific attributes.
        @param lfNode LogicFlow node object
        """

    def new_nodel(self) -> NodeModel:
        """
        Create the node model object; each subclass returns its own model type.
        """
        # Returning None here would only surface later as an AttributeError in
        # parse(), so fail with an explicit message instead.
        raise NotImplementedError(
            f"{type(self).__name__} must implement new_nodel()"
        )

    def get_model(self) -> NodeModel:
        """
        Return the node model built by the last ``parse`` call (None before that).
        """
        return self.nodeModel

    def get_edge_by_target_node_id(self, targetNodeId: str, edges: list):
        """
        Return the edges that end at the given node (its inputs).
        @param targetNodeId target node id
        @param edges all edge objects
        """
        return [
            edge for edge in (edges or ())
            if edge.get(NodeParser.TARGET_NODE_ID) == targetNodeId
        ]

    def get_edge_by_source_node_id(self, sourceNodeId: str, edges: list):
        """
        Return the edges that start at the given node (its outputs).
        @param sourceNodeId source node id
        @param edges all edge objects
        """
        return [
            edge for edge in (edges or ())
            if edge.get(NodeParser.SOURCE_NODE_ID) == sourceNodeId
        ]
parser/impl/start_parser.py
from packages.engine.model import NodeModel
from packages.engine.model.start_model import StartModel
from packages.engine.parser.abstract_node_parser import AbstractNodeParser
class StartParser(AbstractNodeParser):
    """
    Parser for the start node.
    """

    def new_nodel(self) -> NodeModel:
        """
        Create the model object for a start node.
        """
        start_model = StartModel()
        return start_model

    def parse_node(self, lfNode):
        """
        No start-specific attributes are read.
        """
        return None
parser/impl/end_parser.py
from packages.engine.model import NodeModel
from packages.engine.model.end_model import EndModel
from packages.engine.parser.abstract_node_parser import AbstractNodeParser
class EndParser(AbstractNodeParser):
    """
    Parser for the end node.
    """

    def new_nodel(self) -> NodeModel:
        """
        Create the model object for an end node.
        """
        end_model = EndModel()
        return end_model

    def parse_node(self, lfNode):
        """
        No end-specific attributes are read.
        """
        return None
parser/impl/task_parser.py
import logging
from packages.engine.model import NodeModel
from packages.engine.model.task_model import TaskModel
from packages.engine.parser.abstract_node_parser import AbstractNodeParser
class TaskParser(AbstractNodeParser):
    """
    Parser for task nodes.
    """

    def new_nodel(self) -> NodeModel:
        """
        Create the model object for a task node.
        """
        return TaskModel()

    def parse_node(self, lfNode):
        """
        Copy the task-specific entries of the node's properties onto the task model.
        """
        model: TaskModel = self.nodeModel
        props = lfNode.get(self.PROPERTIES_KEY, {})
        # (attribute on the model, key in the properties dict), in assignment order;
        # a missing key yields None
        plain_fields = (
            ("form", self.FORM_KEY),
            ("assignee", self.ASSIGNEE_KEY),
            ("assignmentHandler", self.ASSIGNMENT_HANDLE_KEY),
            ("taskType", self.TASK_TYPE_KEY),
            ("performType", self.PERFORM_TYPE_KEY),
            ("reminderTime", self.REMINDER_TIME_KEY),
            ("reminderRepeat", self.REMINDER_REPEAT_KEY),
            ("expireTime", self.EXPIRE_TIME_KEY),
            ("autoExecute", self.AUTH_EXECUTE_KEY),
            ("callback", self.CALLBACK_KEY),
        )
        for attr_name, prop_key in plain_fields:
            setattr(model, attr_name, props.get(prop_key))
        # Custom extension attributes default to an empty dict rather than None
        model.ext = props.get(self.EXT_FIELD_KEY, {})
parser/impl/fork_parser.py
from packages.engine.model import NodeModel
from packages.engine.model.fork_model import ForkModel
from packages.engine.parser.abstract_node_parser import AbstractNodeParser
class ForkParser(AbstractNodeParser):
    """
    Parser for fork (branch) nodes.
    """

    def new_nodel(self) -> NodeModel:
        """
        Create the model object for a fork node.
        """
        fork_model = ForkModel()
        return fork_model

    def parse_node(self, lfNode):
        """
        No fork-specific attributes are read.
        """
        return None
parser/impl/join_parser.py
from packages.engine.model import NodeModel
from packages.engine.model.join_model import JoinModel
from packages.engine.parser.abstract_node_parser import AbstractNodeParser
class JoinParser(AbstractNodeParser):
    """
    Parser for join (merge) nodes.
    """

    def new_nodel(self) -> NodeModel:
        """
        Create the model object for a join node.
        """
        join_model = JoinModel()
        return join_model

    def parse_node(self, lfNode):
        """
        No join-specific attributes are read.
        """
        return None
parser/impl/decision_parser.py
from packages.engine.model import NodeModel
from packages.engine.model.decision_model import DecisionModel
from packages.engine.parser.abstract_node_parser import AbstractNodeParser
class DecisionParser(AbstractNodeParser):
    """
    Parser for decision nodes.
    """

    def new_nodel(self) -> NodeModel:
        """
        Create the model object for a decision node.
        """
        return DecisionModel()

    def parse_node(self, lfNode):
        """
        Copy the decision expression and handler class from the node's properties.
        """
        model: DecisionModel = self.nodeModel
        props = lfNode.get(self.PROPERTIES_KEY, {})
        expr_value = props.get(self.EXPR_KEY)
        handle_class_value = props.get(self.HANDLE_CLASS_KEY)
        model.expr = expr_value
        model.handleClass = handle_class_value
解析入口类
parser/model_parser.py
import json
from packages.engine.model.process_model import ProcessModel
from packages.engine.model.task_model import TaskModel
from packages.engine.parser.impl.decision_parser import DecisionParser
from packages.engine.parser.impl.end_parser import EndParser
from packages.engine.parser.impl.fork_parser import ForkParser
from packages.engine.parser.impl.join_parser import JoinParser
from packages.engine.parser.impl.start_parser import StartParser
from packages.engine.parser.impl.task_parser import TaskParser
from packages.engine.parser.node_parser import NodeParser
class ModelParser(object):
    """
    Flow model parser: turns a JSON flow definition into a ProcessModel.
    """
    # Node type (without the NodeParser.NODE_NAME_PREFIX prefix) -> parser.
    # The parser instances are created once and reused by every parse() call.
    parserMap = {
        'start': StartParser(),
        'end': EndParser(),
        'task': TaskParser(),
        'decision': DecisionParser(),
        'fork': ForkParser(),
        'join': JoinParser()
    }

    @staticmethod
    def parse(jsonData):
        """
        Parse a JSON flow definition into a process model object.
        @param jsonData readable file-like object containing the JSON definition,
                        or the JSON text itself (str / bytes / bytearray)
        @return the populated ProcessModel; an empty ProcessModel when the
                definition has no nodes or no edges
        """
        processModel: ProcessModel = ProcessModel()
        if isinstance(jsonData, (str, bytes, bytearray)):
            lfModel = json.loads(jsonData)
        else:
            lfModel = json.load(jsonData)
        lfNodes = lfModel.get(NodeParser.NODES_KEY)
        lfEdges = lfModel.get(NodeParser.EDGES_KEY)
        if not lfNodes or not lfEdges:
            return processModel
        # Basic information of the flow definition
        processModel.name = lfModel.get("name")
        processModel.displayName = lfModel.get("displayName")
        processModel.type = lfModel.get("type")
        processModel.instanceUrl = lfModel.get("instanceUrl")
        processModel.instanceNoClass = lfModel.get("instanceNoClass")
        # Node information
        prefix = NodeParser.NODE_NAME_PREFIX
        for node in lfNodes:
            nodeType = node.get('type') or ""
            # Strip the prefix only where it actually is a prefix
            if nodeType.startswith(prefix):
                nodeType = nodeType[len(prefix):]
            nodeParser: NodeParser = ModelParser.parserMap.get(nodeType)
            if nodeParser is None:
                # Unknown node types are skipped
                continue
            nodeParser.parse(node, lfEdges)
            nodeModel = nodeParser.get_model()
            processModel.nodes.append(nodeModel)
            if isinstance(nodeModel, TaskModel):
                processModel.tasks.append(nodeModel)
        # Index nodes by name once so linking does not rescan every node for
        # every transition. Lists keep file order and nodes sharing a name.
        nodesByName = {}
        for node in processModel.nodes:
            nodesByName.setdefault(node.name, []).append(node)
        # Wire each output transition to its target node and register it as an
        # input of that node
        for node in processModel.nodes:
            for transition in node.outputs:
                for targetNode in nodesByName.get(transition.to, ()):
                    targetNode.inputs.append(transition)
                    transition.target = targetNode
        return processModel
单元测试类
tests/test_model_parser.py
import logging
import os
from packages.engine.parser.model_parser import ModelParser
class TestModelParser:
    """
    Tests for ModelParser.
    """

    def test_parse(self):
        """
        Parse the sample leave flow, check the parsed model, then execute it
        from the start node.
        """
        current_directory = os.getcwd()
        logging.info(current_directory)
        # Locate tests/json/leave.json relative to this test file so the test
        # does not depend on the directory pytest is started from
        json_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "json", "leave.json"
        )
        with open(json_path, "r", encoding='utf-8') as f:
            processModel = ModelParser.parse(f)
        # Expected values come from the sample definition file
        assert processModel.name == "leave"
        assert processModel.displayName == "请假"
        assert [node.name for node in processModel.nodes] == [
            "start", "apply", "approveDept", "end"
        ]
        assert [task.name for task in processModel.tasks] == ["apply", "approveDept"]
        processModel.get_start().execute({})
运行结果
model:StartModel,name:start,displayName:开始,time:2024-07-23 20:25:56.384774
model:TaskModel,name:apply,displayName:请假申请,time:2024-07-23 20:25:59.391439
model:TaskModel,name:approveDept,displayName:部门领导审批,time:2024-07-23 20:26:02.395030
model:EndModel,name:end,displayName:结束,time:2024-07-23 20:26:02.395030