Source code for pm4py.objects.conversion.bpmn.variants.to_petri_net

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
import uuid
from enum import Enum

from pm4py.objects.petri_net.utils import reduction
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.objects.petri_net.utils.petri_utils import add_arc_from_to
from pm4py.util import exec_utils


[docs]class Parameters(Enum): USE_ID = "use_id"
[docs]def build_digraph_from_petri_net(net): """ Builds a directed graph from a Petri net (for the purpose to add invisibles between inclusive gateways) Parameters ------------- net Petri net Returns ------------- digraph Digraph """ import networkx as nx graph = nx.DiGraph() for place in net.places: graph.add_node(place.name) for trans in net.transitions: in_places = [x.source for x in list(trans.in_arcs)] out_places = [x.target for x in list(trans.out_arcs)] for pl1 in in_places: for pl2 in out_places: graph.add_edge(pl1.name, pl2.name) return graph
[docs]def apply(bpmn_graph, parameters=None): """ Converts a BPMN graph to an accepting Petri net Parameters -------------- bpmn_graph BPMN graph parameters Parameters of the algorithm Returns -------------- net Petri net im Initial marking fm Final marking """ if parameters is None: parameters = {} import networkx as nx from pm4py.objects.bpmn.obj import BPMN use_id = exec_utils.get_param_value(Parameters.USE_ID, parameters, False) net = PetriNet("") source_place = PetriNet.Place("source") net.places.add(source_place) sink_place = PetriNet.Place("sink") net.places.add(sink_place) im = Marking() fm = Marking() im[source_place] = 1 fm[sink_place] = 1 # keep this correspondence for adding invisible transitions for OR-gateways inclusive_gateway_exit = set() inclusive_gateway_entry = set() flow_place = {} source_count = {} target_count = {} for flow in bpmn_graph.get_flows(): source = flow.get_source() target = flow.get_target() place = PetriNet.Place(str(flow.get_id())) net.places.add(place) flow_place[flow] = place if source not in source_count: source_count[source] = 0 if target not in target_count: target_count[target] = 0 source_count[source] = source_count[source] + 1 target_count[target] = target_count[target] + 1 for flow in bpmn_graph.get_flows(): source = flow.get_source() target = flow.get_target() place = PetriNet.Place(str(flow.get_id())) if isinstance(source, BPMN.InclusiveGateway) and source_count[source] > 1: inclusive_gateway_exit.add(place.name) elif isinstance(target, BPMN.InclusiveGateway) and target_count[target] > 1: inclusive_gateway_entry.add(place.name) # remove possible places that are both in inclusive_gateway_exit and inclusive_gateway_entry, # because we do not need to add invisibles in this situation incl_gat_set_inters = inclusive_gateway_entry.intersection(inclusive_gateway_exit) inclusive_gateway_exit = inclusive_gateway_exit.difference(incl_gat_set_inters) inclusive_gateway_entry = inclusive_gateway_entry.difference(incl_gat_set_inters) nodes_entering = {} nodes_exiting = {} for node in bpmn_graph.get_nodes(): entry_place = PetriNet.Place("ent_" + str(node.get_id())) net.places.add(entry_place) exiting_place = PetriNet.Place("exi_" + str(node.get_id())) net.places.add(exiting_place) if use_id: label = str(node.get_id()) else: label = str(node.get_name()) if isinstance(node, BPMN.Task) else None if not label: label = None transition = PetriNet.Transition(name=str(node.get_id()), label=label) net.transitions.add(transition) add_arc_from_to(entry_place, transition, net) add_arc_from_to(transition, exiting_place, net) if isinstance(node, BPMN.ParallelGateway) or isinstance(node, BPMN.InclusiveGateway): if source_count[node] > 1: exiting_object = PetriNet.Transition(str(uuid.uuid4()), None) net.transitions.add(exiting_object) add_arc_from_to(exiting_place, exiting_object, net) else: exiting_object = exiting_place if target_count[node] > 1: entering_object = PetriNet.Transition(str(uuid.uuid4()), None) net.transitions.add(entering_object) add_arc_from_to(entering_object, entry_place, net) else: entering_object = entry_place nodes_entering[node] = entering_object nodes_exiting[node] = exiting_object else: nodes_entering[node] = entry_place nodes_exiting[node] = exiting_place if isinstance(node, BPMN.StartEvent): start_transition = PetriNet.Transition(str(uuid.uuid4()), None) net.transitions.add(start_transition) add_arc_from_to(source_place, start_transition, net) add_arc_from_to(start_transition, entry_place, net) elif isinstance(node, BPMN.EndEvent): end_transition = PetriNet.Transition(str(uuid.uuid4()), None) net.transitions.add(end_transition) add_arc_from_to(exiting_place, end_transition, net) add_arc_from_to(end_transition, sink_place, net) for flow in bpmn_graph.get_flows(): source_object = nodes_exiting[flow.get_source()] target_object = nodes_entering[flow.get_target()] if isinstance(source_object, PetriNet.Place): inv1 = PetriNet.Transition(str(uuid.uuid4()), None) net.transitions.add(inv1) add_arc_from_to(source_object, inv1, net) source_object = inv1 if isinstance(target_object, PetriNet.Place): inv2 = PetriNet.Transition(str(uuid.uuid4()), None) net.transitions.add(inv2) add_arc_from_to(inv2, target_object, net) target_object = inv2 add_arc_from_to(source_object, flow_place[flow], net) add_arc_from_to(flow_place[flow], target_object, net) if inclusive_gateway_exit and inclusive_gateway_entry: # do the following steps if there are inclusive gateways: # - calculate the shortest paths # - add an invisible transition between couples of corresponding places # this ensures soundness and the correct translation of the BPMN inv_places = {x.name: x for x in net.places} digraph = build_digraph_from_petri_net(net) all_shortest_paths = dict(nx.all_pairs_dijkstra(digraph)) keys = list(all_shortest_paths.keys()) for pl1 in inclusive_gateway_exit: if pl1 in keys: output_places = sorted( [(x, len(y)) for x, y in all_shortest_paths[pl1][1].items() if x in inclusive_gateway_entry], key=lambda x: x[1]) if output_places: inv_trans = PetriNet.Transition(str(uuid.uuid4()), None) net.transitions.add(inv_trans) add_arc_from_to(inv_places[pl1], inv_trans, net) add_arc_from_to(inv_trans, inv_places[output_places[0][0]], net) reduction.apply_simple_reduction(net) return net, im, fm