'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
import heapq
import sys
import uuid
from enum import Enum
from pm4py.objects.log.obj import EventLog, Trace
from pm4py.util import constants, xes_constants, exec_utils
from pm4py.util import variants_util
from pm4py.objects.petri_net.utils import align_utils
from pm4py.objects.log.obj import EventLog, Trace
from typing import Optional, Dict, Any, Union, Tuple
from pm4py.util import typing
from pm4py.objects.conversion.log import converter as log_converter
[docs]class Parameters(Enum):
ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
SYNC_COST_FUNCTION = "sync_cost_function"
MODEL_MOVE_COST_FUNCTION = "model_move_cost_function"
LOG_MOVE_COST_FUNCTION = "log_move_cost_function"
INTERNAL_LOG_MOVE_COST_FUNCTION = "internal_log_move_cost_function"
PARAMETER_VARIANT_DELIMITER = "variant_delimiter"
[docs]class Outputs(Enum):
ALIGNMENT = "alignment"
COST = "cost"
VISITED = "visited_states"
CLOSED = "closed"
INTERNAL_COST = "internal_cost"
POSITION_G = 0
POSITION_LOG = 1
POSITION_NUM_VISITED = 2
POSITION_H = 3
POSITION_REAL_F = 4
POSITION_F = 5
POSITION_MODEL = 6
POSITION_IS_LM = 7
POSITION_IS_MM = 8
POSITION_PREV = 9
[docs]def apply(obj: Union[EventLog, Trace], dfg: Dict[Tuple[str, str], int], sa: Dict[str, int], ea: Dict[str, int], parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Union[typing.AlignmentResult, typing.ListAlignments]:
"""
Applies the alignment algorithm provided a log/trace object, and a *connected* DFG
Parameters
--------------
obj
Event log / Trace
dfg
*Connected* directly-Follows Graph
sa
Start activities
ea
End activities
parameters
Parameters of the algorithm:
- Parameters.SYNC_COST_FUNCTION: for each activity that is in both the trace and the model, provide the
non-negative cost of a sync move
- Parameters.MODEL_MOVE_COST_FUNCTION: for each activity that is in the model, provide the non-negative
cost of a model move
- Parameters.LOG_MOVE_COST_FUNCTION: for each activity that is in the trace, provide the cost of a log move
that is returned in the alignment to the user (but not used internally for ordering the states)
- Parameters.INTERNAL_LOG_MOVE_COST_FUNCTION: for each activity that is in the trace, provide the cost
of a log move that is used internally for ordering the states in the search algorithm.
- Parameters.ACTIVITY_KEY: the attribute of the log that is the activity
Returns
--------------
ali
Result of the alignment
"""
if isinstance(obj, Trace):
return apply_trace(obj, dfg, sa, ea, parameters=parameters)
else:
return apply_log(obj, dfg, sa, ea, parameters=parameters)
[docs]def apply_log(log, dfg, sa, ea, parameters=None):
"""
Applies the alignment algorithm provided a log object, and a *connected* DFG
Parameters
----------------
log
Event log
dfg
*Connected* DFG
sa
Start activities
ea
End activities
parameters
Parameters of the algorithm:
- Parameters.SYNC_COST_FUNCTION: for each activity that is in both the trace and the model, provide the
non-negative cost of a sync move
- Parameters.MODEL_MOVE_COST_FUNCTION: for each activity that is in the model, provide the non-negative
cost of a model move
- Parameters.LOG_MOVE_COST_FUNCTION: for each activity that is in the trace, provide the cost of a log move
that is returned in the alignment to the user (but not used internally for ordering the states)
- Parameters.INTERNAL_LOG_MOVE_COST_FUNCTION: for each activity that is in the trace, provide the cost
of a log move that is used internally for ordering the states in the search algorithm.
- Parameters.ACTIVITY_KEY: the attribute of the log that is the activity
Returns
----------------
aligned_traces
For each trace, contains a dictionary describing the alignment
"""
if parameters is None:
parameters = {}
log = log_converter.apply(log, variant=log_converter.Variants.TO_EVENT_LOG, parameters=parameters)
activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
aligned_traces = []
align_dict = {}
al_empty_cost = __apply_list_activities([], dfg, sa, ea, parameters=parameters)["cost"]
for trace in log:
trace_act = tuple(x[activity_key] for x in trace)
if trace_act in align_dict:
aligned_traces.append(align_dict[trace_act])
else:
log_move_cost_function = exec_utils.get_param_value(Parameters.LOG_MOVE_COST_FUNCTION, parameters,
{x: align_utils.STD_MODEL_LOG_MOVE_COST for x in
trace_act})
trace_bwc_cost = sum(log_move_cost_function[x] for x in trace_act)
al_tr = __apply_list_activities(trace_act, dfg, sa, ea, parameters=parameters)
al_tr["fitness"] = 1.0 - al_tr["cost"] / (al_empty_cost + trace_bwc_cost)
al_tr["bwc"] = al_empty_cost + trace_bwc_cost
align_dict[trace_act] = al_tr
aligned_traces.append(align_dict[trace_act])
return aligned_traces
[docs]def apply_trace(trace, dfg, sa, ea, parameters=None):
"""
Applies the alignment algorithm provided a trace of a log, and a *connected* DFG
Parameters
---------------
trace
Trace
dfg
*Connected* DFG
sa
Start activities
ea
End activities
parameters
Parameters of the algorithm:
- Parameters.SYNC_COST_FUNCTION: for each activity that is in both the trace and the model, provide the
non-negative cost of a sync move
- Parameters.MODEL_MOVE_COST_FUNCTION: for each activity that is in the model, provide the non-negative
cost of a model move
- Parameters.LOG_MOVE_COST_FUNCTION: for each activity that is in the trace, provide the cost of a log move
that is returned in the alignment to the user (but not used internally for ordering the states)
- Parameters.INTERNAL_LOG_MOVE_COST_FUNCTION: for each activity that is in the trace, provide the cost
of a log move that is used internally for ordering the states in the search algorithm.
- Parameters.ACTIVITY_KEY: the attribute of the log that is the activity
Returns
---------------
ali
Dictionary describing the alignment
"""
if parameters is None:
parameters = {}
activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
trace_act = tuple(x[activity_key] for x in trace)
return __apply_list_activities(trace_act, dfg, sa, ea, parameters=parameters)
[docs]def apply_from_variants_list_dfg_string(var_list, dfg_serialization, parameters=None):
if parameters is None:
parameters = {}
from pm4py.objects.dfg.importer import importer as dfg_importer
dfg, sa, ea = dfg_importer.deserialize(dfg_serialization, parameters=parameters)
return apply_from_variants_list(var_list, dfg, sa, ea, parameters=parameters)
[docs]def apply_from_variants_list(var_list, dfg, sa, ea, parameters=None):
if parameters is None:
parameters = {}
dictio_alignments = {}
for varitem in var_list:
variant = varitem[0]
dictio_alignments[variant] = apply_from_variant(variant, dfg, sa, ea, parameters=parameters)
return dictio_alignments
[docs]def apply_from_variant(variant, dfg, sa, ea, parameters=None):
if parameters is None:
parameters = {}
trace = variants_util.variant_to_trace(variant, parameters=parameters)
return apply_trace(trace, dfg, sa, ea, parameters=parameters)
[docs]def dijkstra_to_end_node(dfg, sa, ea, start_node, end_node, activities_model, sync_cost_function,
model_move_cost_function):
"""
Gets the cost of the minimum path from a node to the end node
Parameters
---------------
dfg
*Connected* DFG
sa
Start activities
ea
End activities
start_node
Start node of the graph (connected to all the start activities)
end_node
End node of the graph (connected to all the end activities)
activities_model
Set of the activities contained in the DFG
sync_cost_function
Given an activity, provides the cost when the activity is executed in a sync way
model_move_cost_function
Given an activity, provides the cost when the activity is executed as a move-on-model
Returns
-----------------
cost_to_end_node
Dictionary associating to each node the cost to the end node
"""
pred = {x: set() for x in activities_model}
pred[start_node] = set()
pred[end_node] = set()
for x in sa:
pred[x].add(start_node)
for x in ea:
pred[end_node].add(x)
for x in dfg:
pred[x[1]].add(x[0])
min_dist = {x: sys.maxsize for x in activities_model}
min_dist[start_node] = sys.maxsize
min_dist[end_node] = 0
nodes_cost = {}
nodes_cost[start_node] = 0
nodes_cost[end_node] = 0
for x in activities_model:
if x in sync_cost_function:
nodes_cost[x] = min(sync_cost_function[x], model_move_cost_function[x])
else:
nodes_cost[x] = model_move_cost_function[x]
to_visit = [end_node]
while len(to_visit) > 0:
curr = to_visit.pop(0)
curr_dist = min_dist[curr]
for prev_node in pred[curr]:
prev_dist = min_dist[prev_node]
if prev_dist > curr_dist + nodes_cost[prev_node]:
min_dist[prev_node] = curr_dist + nodes_cost[prev_node]
to_visit.append(prev_node)
# subtract the cost of the node itself to make the heuristics correct
for el in min_dist:
min_dist[el] -= nodes_cost[el]
return min_dist
def __apply_list_activities(trace, dfg, sa, ea, parameters=None):
"""
Apply the alignments provided the *connected* DFG and a list of activities of the trace
Parameters
--------------
trace
List of activities of the trace
dfg
*Connected* DFG
sa
Start activities
ea
End activities
parameters
Parameters of the algorithm:
- Parameters.SYNC_COST_FUNCTION: for each activity that is in both the trace and the model, provide the
non-negative cost of a sync move
- Parameters.MODEL_MOVE_COST_FUNCTION: for each activity that is in the model, provide the non-negative
cost of a model move
- Parameters.LOG_MOVE_COST_FUNCTION: for each activity that is in the trace, provide the cost of a log move
that is returned in the alignment to the user (but not used internally for ordering the states)
- Parameters.INTERNAL_LOG_MOVE_COST_FUNCTION: for each activity that is in the trace, provide the cost
of a log move that is used internally for ordering the states in the search algorithm.
Returns
--------------
ali
Dictionary describing the alignment
"""
if parameters is None:
parameters = {}
activities_trace = set(x for x in trace)
activities_model = set(x[0] for x in dfg).union(set(x[1] for x in dfg)).union(set(x for x in sa)).union(
set(x for x in ea))
activities_one_instantiation = activities_trace.union(activities_model)
activities_one_instantiation = {x: x for x in activities_one_instantiation}
activities_both_in_trace_and_model = activities_trace.intersection(activities_model)
sync_cost_function = exec_utils.get_param_value(Parameters.SYNC_COST_FUNCTION, parameters,
{x: align_utils.STD_SYNC_COST for x in activities_both_in_trace_and_model})
model_move_cost_function = exec_utils.get_param_value(Parameters.MODEL_MOVE_COST_FUNCTION, parameters,
{x: align_utils.STD_MODEL_LOG_MOVE_COST for x in activities_model})
# for each activity that is in the trace, provides the cost of a log move
# that is returned in the alignment to the user (but not used internally for ordering the states)
log_move_cost_function = exec_utils.get_param_value(Parameters.LOG_MOVE_COST_FUNCTION, parameters,
{x: align_utils.STD_MODEL_LOG_MOVE_COST for x in activities_trace})
# for each activity that is in the trace, provide the cost of a log move that is used internally for
# ordering the states in the search algorithm.
internal_log_move_cost_function = exec_utils.get_param_value(Parameters.INTERNAL_LOG_MOVE_COST_FUNCTION, parameters,
None)
if internal_log_move_cost_function is None:
internal_log_move_cost_function = {}
for x in activities_trace:
if x in activities_model:
internal_log_move_cost_function[x] = log_move_cost_function[x]
else:
internal_log_move_cost_function[x] = 0
start_node = str(uuid.uuid4())
end_node = str(uuid.uuid4())
outgoing_nodes = {activities_one_instantiation[x]: set() for x in activities_model}
outgoing_nodes[start_node] = set()
for x in dfg:
outgoing_nodes[x[0]].add(activities_one_instantiation[x[1]])
for x in sa:
outgoing_nodes[start_node].add(activities_one_instantiation[x])
for x in ea:
outgoing_nodes[activities_one_instantiation[x]].add(end_node)
# closed set: the set associates each activity of the model with the index of the trace that lastly visited that.
# this is a very efficient closed set since it cuts double-visiting the same activity for state visited later
# that have a lower index of a trace.
closed = {activities_one_instantiation[x]: -1 for x in activities_model}
closed[start_node] = -1
closed[end_node] = -1
f = 0
h = 0
g = f + h
num_visited = 0
num_closed = 0
open_set = [(g, 0, num_visited, -h, f, f, start_node, False, False, None)]
heapq.heapify(open_set)
while not len(open_set) == 0:
curr = heapq.heappop(open_set)
num_visited = num_visited + 1
if -curr[POSITION_LOG] <= closed[curr[POSITION_MODEL]]:
# closed
continue
closed[curr[POSITION_MODEL]] = -curr[POSITION_LOG]
not_end_trace = -curr[POSITION_LOG] < len(trace)
if curr[POSITION_MODEL] == end_node and not not_end_trace:
return __return_alignment(curr, trace, num_closed)
num_closed = num_closed + 1
trace_curr = trace[-curr[POSITION_LOG]] if not_end_trace else None
if not curr[POSITION_MODEL] == end_node:
# I can do some sync / move on model
list_act = outgoing_nodes[curr[POSITION_MODEL]]
act_not_in_model = not_end_trace and trace_curr not in activities_model
# avoid scheduling model moves if the activity of the log is not in the model,
# in that case schedule directly the model move
if not act_not_in_model:
can_sync = not_end_trace and trace_curr in list_act
if can_sync:
f = curr[POSITION_F] + sync_cost_function[trace_curr]
real_f = curr[POSITION_REAL_F] + sync_cost_function[trace_curr]
h = 0
g = f + h
new_state = (g, curr[POSITION_LOG] - 1, num_visited, -h, real_f, f, activities_one_instantiation[trace_curr],
False, False, curr)
if -new_state[POSITION_LOG] > closed[new_state[POSITION_MODEL]]:
heapq.heappush(open_set, new_state)
for act in list_act:
if act == end_node:
new_state = (
curr[POSITION_G], curr[POSITION_LOG], num_visited, curr[POSITION_H], curr[POSITION_REAL_F], curr[POSITION_F],
end_node, False, False, curr)
if -new_state[POSITION_LOG] > closed[new_state[POSITION_MODEL]]:
heapq.heappush(open_set, new_state)
elif not not_end_trace or not trace_curr in list_act:
f = curr[POSITION_F] + model_move_cost_function[act]
real_f = curr[POSITION_REAL_F] + model_move_cost_function[act]
h = 0
g = f + h
new_state = (
g, curr[POSITION_LOG], num_visited, -h, real_f, f, activities_one_instantiation[act], False,
True,
curr)
if -new_state[POSITION_LOG] > closed[new_state[POSITION_MODEL]]:
heapq.heappush(open_set, new_state)
if not_end_trace and not curr[POSITION_IS_MM]:
# I can do some move-on-log
f = curr[POSITION_F] + internal_log_move_cost_function[trace_curr]
real_f = curr[POSITION_REAL_F] + log_move_cost_function[trace_curr]
h = 0
g = f + h
new_state = (g, curr[POSITION_LOG] - 1, num_visited, -h, real_f, f, curr[POSITION_MODEL], True, False, curr)
if -new_state[POSITION_LOG] > closed[new_state[POSITION_MODEL]]:
heapq.heappush(open_set, new_state)
def __return_alignment(state, trace, closed):
"""
Returns the computed alignment in an intellegible way
Parameters
--------------
state
Current state
trace
List of activities
closed
Number of closed states
Returns
--------------
ali
Dictionary describing the alignment
"""
cost = state[POSITION_REAL_F]
internal_cost = state[POSITION_F]
visited = state[POSITION_NUM_VISITED]
moves = []
while state[POSITION_PREV] is not None:
state = state[POSITION_PREV]
is_lm = state[POSITION_IS_LM]
is_mm = state[POSITION_IS_MM]
pl = -state[POSITION_LOG] - 1
if pl >= 0:
lm = ">>"
mm = ">>"
if not is_lm:
mm = state[POSITION_MODEL]
if not is_mm:
lm = trace[pl]
moves.append((lm, mm))
moves.reverse()
return {Outputs.ALIGNMENT.value: moves, Outputs.COST.value: cost, Outputs.VISITED.value: visited,
Outputs.CLOSED.value: closed, Outputs.INTERNAL_COST.value: internal_cost}