'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
import random
import time
from typing import Optional, Set
from copy import copy, deepcopy
from pm4py.objects.log.obj import Trace, Event
from pm4py.objects.petri_net import properties
from pm4py.objects.petri_net import semantics, properties
from pm4py.objects.petri_net.utils.networkx_graph import create_networkx_directed_graph
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.util import xes_constants as xes_util
import deprecation
[docs]def is_sub_marking(sub_marking: Marking, marking: Marking) -> bool:
for p in sub_marking:
if p not in marking:
return False
elif marking[p] > sub_marking[p]:
return False
return True
[docs]def place_set_as_marking(places) -> Marking:
m = Marking()
for p in places:
m[p] = 1
return m
[docs]def get_arc_type(elem):
if properties.ARCTYPE in elem.properties:
return elem.properties[properties.ARCTYPE]
return None
[docs]def pre_set(elem, arc_type=None) -> Set:
pre = set()
for a in elem.in_arcs:
if get_arc_type(a) == arc_type:
pre.add(a.source)
return pre
[docs]def post_set(elem, arc_type=None) -> Set:
post = set()
for a in elem.out_arcs:
if get_arc_type(a) == arc_type:
post.add(a.target)
return post
[docs]def remove_transition(net: PetriNet, trans: PetriNet.Transition) -> PetriNet:
"""
Remove a transition from a Petri net
Parameters
----------
net
Petri net
trans
Transition to remove
Returns
----------
net
Petri net
"""
if trans in net.transitions:
in_arcs = trans.in_arcs
for arc in in_arcs:
place = arc.source
place.out_arcs.remove(arc)
net.arcs.remove(arc)
out_arcs = trans.out_arcs
for arc in out_arcs:
place = arc.target
place.in_arcs.remove(arc)
net.arcs.remove(arc)
net.transitions.remove(trans)
return net
[docs]def add_place(net: PetriNet, name=None) -> PetriNet.Place:
name = name if name is not None else 'p_' + str(len(net.places)) + '_' + str(time.time()) + str(
random.randint(0, 10000))
p = PetriNet.Place(name=name)
net.places.add(p)
return p
[docs]def add_transition(net: PetriNet, name=None, label=None) -> PetriNet.Transition:
name = name if name is not None else 't_' + str(len(net.transitions)) + '_' + str(time.time()) + str(
random.randint(0, 10000))
t = PetriNet.Transition(name=name, label=label)
net.transitions.add(t)
return t
[docs]def merge(trgt: Optional[PetriNet]=None, nets=None) -> PetriNet:
trgt = trgt if trgt is not None else PetriNet()
nets = nets if nets is not None else list()
for net in nets:
trgt.transitions.update(net.transitions)
trgt.places.update(net.places)
trgt.arcs.update(net.arcs)
return trgt
[docs]def remove_place(net: PetriNet, place: PetriNet.Place) -> PetriNet:
"""
Remove a place from a Petri net
Parameters
-------------
net
Petri net
place
Place to remove
Returns
-------------
net
Petri net
"""
if place in net.places:
in_arcs = place.in_arcs
for arc in in_arcs:
trans = arc.source
trans.out_arcs.remove(arc)
net.arcs.remove(arc)
out_arcs = place.out_arcs
for arc in out_arcs:
trans = arc.target
trans.in_arcs.remove(arc)
net.arcs.remove(arc)
net.places.remove(place)
return net
[docs]def add_arc_from_to(fr, to, net: PetriNet, weight=1, type=None) -> PetriNet.Arc:
"""
Adds an arc from a specific element to another element in some net. Assumes from and to are in the net!
Parameters
----------
fr: transition/place from
to: transition/place to
net: net to use
weight: weight associated to the arc
Returns
-------
None
"""
a = PetriNet.Arc(fr, to, weight)
if type is not None:
a.properties[properties.ARCTYPE] = type
net.arcs.add(a)
fr.out_arcs.add(a)
to.in_arcs.add(a)
return a
[docs]def construct_trace_net(trace, trace_name_key=xes_util.DEFAULT_NAME_KEY, activity_key=xes_util.DEFAULT_NAME_KEY):
"""
Creates a trace net, i.e. a trace in Petri net form.
Parameters
----------
trace: :class:`list` input trace, assumed to be a list of events
trace_name_key: :class:`str` key of the attribute that defines the name of the trace
activity_key: :class:`str` key of the attribute of the events that defines the activity name
Returns
-------
tuple: :class:`tuple` of the net, initial marking and the final marking
"""
net = PetriNet(
'trace net of %s' % trace.attributes[trace_name_key] if trace_name_key in trace.attributes else ' ')
place_map = {0: PetriNet.Place('p_0')}
net.places.add(place_map[0])
for i in range(0, len(trace)):
t = PetriNet.Transition('t_' + trace[i][activity_key] + '_' + str(i), trace[i][activity_key])
# 16/02/2021: set the trace index as property of the transition of the trace net
t.properties[properties.TRACE_NET_TRANS_INDEX] = i
net.transitions.add(t)
place_map[i + 1] = PetriNet.Place('p_' + str(i + 1))
# 16/02/2021: set the place index as property of the place of the trace net
place_map[i + 1].properties[properties.TRACE_NET_PLACE_INDEX] = i + 1
net.places.add(place_map[i + 1])
add_arc_from_to(place_map[i], t, net)
add_arc_from_to(t, place_map[i + 1], net)
return net, Marking({place_map[0]: 1}), Marking({place_map[len(trace)]: 1})
[docs]def construct_trace_net_cost_aware(trace, costs, trace_name_key=xes_util.DEFAULT_NAME_KEY,
activity_key=xes_util.DEFAULT_NAME_KEY):
"""
Creates a trace net, i.e. a trace in Petri net form mapping specific costs to transitions.
Parameters
----------
trace: :class:`list` input trace, assumed to be a list of events
costs: :class:`list` list of costs, length should be equal to the length of the input trace
trace_name_key: :class:`str` key of the attribute that defines the name of the trace
activity_key: :class:`str` key of the attribute of the events that defines the activity name
Returns
-------
tuple: :class:`tuple` of the net, initial marking, final marking and map of costs
"""
net = PetriNet(
'trace net of %s' % trace.attributes[trace_name_key] if trace_name_key in trace.attributes else ' ')
place_map = {0: PetriNet.Place('p_0')}
net.places.add(place_map[0])
cost_map = dict()
for i in range(0, len(trace)):
t = PetriNet.Transition('t_' + trace[i][activity_key] + '_' + str(i), trace[i][activity_key])
# 16/02/2021: set the trace index as property of the transition of the trace net
t.properties[properties.TRACE_NET_TRANS_INDEX] = i
cost_map[t] = costs[i]
net.transitions.add(t)
place_map[i + 1] = PetriNet.Place('p_' + str(i + 1))
# 16/02/2021: set the place index as property of the place of the trace net
place_map[i + 1].properties[properties.TRACE_NET_PLACE_INDEX] = i + 1
net.places.add(place_map[i + 1])
add_arc_from_to(place_map[i], t, net)
add_arc_from_to(t, place_map[i + 1], net)
return net, Marking({place_map[0]: 1}), Marking({place_map[len(trace)]: 1}), cost_map
[docs]def acyclic_net_variants(net, initial_marking, final_marking, activity_key=xes_util.DEFAULT_NAME_KEY):
"""
Given an acyclic accepting Petri net, initial and final marking extracts a set of variants (in form of traces)
replayable on the net.
Warning: this function is based on a marking exploration. If the accepting Petri net contains loops, the method
will not work properly as it stops the search if a specific marking has already been encountered.
Parameters
----------
:param net: An acyclic workflow net
:param initial_marking: The initial marking of the net.
:param final_marking: The final marking of the net.
:param activity_key: activity key to use
Returns
-------
:return: variants: :class:`list` Set of variants - in the form of Trace objects - obtainable executing the net
"""
active = {(initial_marking, ())}
visited = set()
variants = set()
while active:
curr_marking, curr_partial_trace = active.pop()
curr_pair = (curr_marking, curr_partial_trace)
enabled_transitions = semantics.enabled_transitions(net, curr_marking)
for transition in enabled_transitions:
if transition.label is not None:
next_partial_trace = curr_partial_trace + (transition.label,)
else:
next_partial_trace = curr_partial_trace
next_marking = semantics.execute(transition, net, curr_marking)
next_pair = (next_marking, next_partial_trace)
if next_marking == final_marking:
variants.add(next_partial_trace)
else:
# If the next marking is not in visited, if the next marking+partial trace is different from the current one+partial trace
if next_pair not in visited and curr_pair != next_pair:
active.add(next_pair)
visited.add(curr_pair)
trace_variants = []
for variant in variants:
trace = Trace()
for activity_label in variant:
trace.append(Event({activity_key: activity_label}))
trace_variants.append(trace)
return trace_variants
[docs]def get_transition_by_name(net: PetriNet, transition_name) -> Optional[PetriNet.Transition]:
"""
Get a transition by its name
Parameters
------------
net
Petri net
transition_name
Transition name
Returns
------------
transition
Transition object
"""
for t in net.transitions:
if t.name == transition_name:
return t
return None
[docs]@deprecation.deprecated('2.2.7', '3.0.0')
def get_cycles_petri_net_places(net):
"""
Get the cycles of a Petri net (returning only list of places belonging to the cycle)
Parameters
-------------
net
Petri net
Returns
-------------
cycles
Cycles (places) of the Petri net
"""
import networkx as nx
graph, inv_dictionary = create_networkx_directed_graph(net)
cycles = nx.simple_cycles(graph)
cycles_places = []
for cycle in cycles:
cycles_places.append([])
for el in cycle:
if el in inv_dictionary and type(inv_dictionary[el]) is PetriNet.Place:
cycles_places[-1].append(inv_dictionary[el])
return cycles_places
[docs]@deprecation.deprecated('2.2.7', '3.0.0')
def get_cycles_petri_net_transitions(net):
"""
Get the cycles of a Petri net (returning only list of transitions belonging to the cycle)
Parameters
-------------
net
Petri net
Returns
-------------
cycles
Cycles (transitions) of the Petri net
"""
import networkx as nx
graph, inv_dictionary = create_networkx_directed_graph(net)
cycles = nx.simple_cycles(graph)
cycles_trans = []
for cycle in cycles:
cycles_trans.append([])
for el in cycle:
if el in inv_dictionary and type(inv_dictionary[el]) is PetriNet.Transition:
cycles_trans[-1].append(inv_dictionary[el])
return cycles_trans
[docs]def decorate_places_preset_trans(net: PetriNet):
"""
Decorate places with information useful for the replay
Parameters
-------------
net
Petri net
"""
for place in net.places:
place.ass_trans = set()
for trans in net.transitions:
for place in trans.sub_marking:
place.ass_trans.add(trans)
[docs]def decorate_transitions_prepostset(net: PetriNet):
"""
Decorate transitions with sub and addition markings
Parameters
-------------
net
Petri net
"""
from pm4py.objects.petri_net.obj import Marking
for trans in net.transitions:
sub_marking = Marking()
add_marking = Marking()
for arc in trans.in_arcs:
sub_marking[arc.source] = arc.weight
add_marking[arc.source] = -arc.weight
for arc in trans.out_arcs:
if arc.target in add_marking:
add_marking[arc.target] = arc.weight + add_marking[arc.target]
else:
add_marking[arc.target] = arc.weight
trans.sub_marking = sub_marking
trans.add_marking = add_marking
[docs]@deprecation.deprecated('2.2.7', '3.0.0')
def get_strongly_connected_subnets(net):
"""
Get the strongly connected components subnets in the Petri net
Parameters
-------------
net
Petri net
Returns
-------------
strongly_connected_transitions
List of strongly connected transitions of the Petri net
"""
import networkx as nx
graph, inv_dictionary = create_networkx_directed_graph(net)
sccg = list(nx.strongly_connected_components(graph))
strongly_connected_subnets = []
for sg in list(sccg):
if len(sg) > 1:
subnet = PetriNet()
imarking = Marking()
fmarking = Marking()
corr = {}
for node in sg:
if node in inv_dictionary:
if type(inv_dictionary[node]) is PetriNet.Transition:
prev_trans = inv_dictionary[node]
new_trans = PetriNet.Transition(prev_trans.name, prev_trans.label)
corr[node] = new_trans
subnet.transitions.add(new_trans)
if type(inv_dictionary[node]) is PetriNet.Place:
prev_place = inv_dictionary[node]
new_place = PetriNet.Place(prev_place.name)
corr[node] = new_place
subnet.places.add(new_place)
for edge in graph.edges:
if edge[0] in sg and edge[1] in sg:
add_arc_from_to(corr[edge[0]], corr[edge[1]], subnet)
strongly_connected_subnets.append([subnet, imarking, fmarking])
return strongly_connected_subnets
[docs]def get_places_shortest_path(net, place_to_populate, current_place, places_shortest_path, actual_list, rec_depth,
max_rec_depth):
"""
Get shortest path between places lead by hidden transitions
Parameters
----------
net
Petri net
place_to_populate
Place that we are populating the shortest map of
current_place
Current visited place (must explore its transitions)
places_shortest_path
Current dictionary
actual_list
Actual list of transitions to enable
rec_depth
Recursion depth
max_rec_depth
Maximum recursion depth
"""
if rec_depth > max_rec_depth:
return places_shortest_path
if place_to_populate not in places_shortest_path:
places_shortest_path[place_to_populate] = {}
for t in current_place.out_arcs:
if t.target.label is None:
for p2 in t.target.out_arcs:
if p2.target not in places_shortest_path[place_to_populate] or len(actual_list) + 1 < len(
places_shortest_path[place_to_populate][p2.target]):
new_actual_list = copy(actual_list)
new_actual_list.append(t.target)
places_shortest_path[place_to_populate][p2.target] = copy(new_actual_list)
places_shortest_path = get_places_shortest_path(net, place_to_populate, p2.target,
places_shortest_path, new_actual_list,
rec_depth + 1, max_rec_depth)
return places_shortest_path
[docs]def get_places_shortest_path_by_hidden(net: PetriNet, max_rec_depth):
"""
Get shortest path between places lead by hidden transitions
Parameters
----------
net
Petri net
max_rec_depth
Maximum recursion depth
"""
places_shortest_path = {}
for p in net.places:
places_shortest_path = get_places_shortest_path(net, p, p, places_shortest_path, [], 0, max_rec_depth)
return places_shortest_path
[docs]def invert_spaths_dictionary(spaths):
"""
Invert the shortest paths (between places) dictionary,
from target-source to source-target
Parameters
-------------
spaths
Shortest paths dictionary
Returns
-------------
inv_spaths
Inverted shortest paths dictionary
"""
inv_spaths = {}
for target_place in spaths:
for source_place in spaths[target_place]:
if not source_place in inv_spaths:
inv_spaths[source_place] = {}
if not target_place in inv_spaths[source_place]:
inv_spaths[source_place][target_place] = set()
inv_spaths[source_place][target_place] = inv_spaths[source_place][target_place].union(
spaths[target_place][source_place])
return inv_spaths
[docs]def remove_unconnected_components(net: PetriNet) -> PetriNet:
"""
Remove unconnected components from a Petri net
Parameters
-----------
net
Petri net
Returns
-----------
net
Cleaned Petri net
"""
changed_something = True
while changed_something:
changed_something = False
places = list(net.places)
for place in places:
if len(place.in_arcs) == 0 and len(place.out_arcs) == 0:
remove_place(net, place)
changed_something = True
transitions = list(net.transitions)
for trans in transitions:
if len(trans.in_arcs) == 0 or len(trans.out_arcs) == 0:
remove_transition(net, trans)
changed_something = True
return net
[docs]def get_s_components_from_petri(net, im, fm, rec_depth=0, curr_s_comp=None, visited_places=None,
list_s_components=None, max_rec_depth=6):
"""
Gets the S-components from a Petri net
Parameters
-------------
net
Petri net
im
Initial marking
fm
Final marking
curr_s_comp
Current S component
visited_places
Visited places
list_s_components
List of S-components
max_rec_depth
Maximum recursion depth
Returns
--------------
s_components
List of S-components
"""
if list_s_components is None:
list_s_components = []
if len(im) > 1 or len(fm) > 1:
return list_s_components
source = list(im.keys())[0]
if curr_s_comp is None:
curr_s_comp = [source]
if visited_places is None:
visited_places = []
something_changed = True
while something_changed and rec_depth < max_rec_depth:
something_changed = False
places_to_visit = sorted(list(set(curr_s_comp[len(visited_places):])), key=lambda x: len(x.out_arcs),
reverse=True)
for place_to_visit in places_to_visit:
visited_places.append(place_to_visit)
target_trans = sorted(list(set([arc.target for arc in place_to_visit.out_arcs])),
key=lambda x: len(x.out_arcs))
for trans in target_trans:
visited_places_names = [x.name for x in visited_places]
target_trans_target = list(
set([arc.target for arc in trans.out_arcs if arc.target.name not in visited_places_names]))
if target_trans_target:
something_changed = True
if len(target_trans_target) == 1:
new_place = target_trans_target[0]
curr_s_comp.append(new_place)
else:
for new_place in target_trans_target:
[new_curr_s_comp, new_visited_places] = deepcopy([curr_s_comp, visited_places])
new_curr_s_comp.append(new_place)
list_s_components = get_s_components_from_petri(net, im, fm, rec_depth=rec_depth + 1,
curr_s_comp=new_curr_s_comp,
visited_places=new_visited_places,
list_s_components=list_s_components,
max_rec_depth=max_rec_depth)
if not set([place.name for place in curr_s_comp]) in list_s_components:
list_s_components.append(set([place.name for place in curr_s_comp]))
return list_s_components
[docs]def remove_arc(net: PetriNet, arc: PetriNet.Arc) -> PetriNet:
"""
Removes an arc from a Petri net
Parameters
---------------
net
Petri net
arc
Arc of the Petri net
Returns
-------------
net
Petri net
"""
net.arcs.remove(arc)
arc.source.out_arcs.remove(arc)
arc.target.in_arcs.remove(arc)
return net