'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
import pkgutil
import sys
import time
from copy import copy
from pm4py.algo.conformance.alignments.petri_net.variants import state_equation_a_star
from pm4py.objects.log import obj as log_implementation
from pm4py.objects.log.obj import Trace
from pm4py.objects.log.util.xes import DEFAULT_NAME_KEY
from pm4py.objects.petri_net.utils import align_utils as utils, decomposition as decomp_utils
from pm4py.statistics.variants.log import get as variants_module
from pm4py.util import exec_utils
from pm4py.util import variants_util
from enum import Enum
from pm4py.util import constants
from typing import Optional, Dict, Any, Union, Tuple
from pm4py.objects.log.obj import EventLog
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.util import typing
from pm4py.objects.conversion.log import converter as log_converter
[docs]class Parameters(Enum):
ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
BEST_WORST_COST = 'best_worst_cost'
PARAM_TRACE_COST_FUNCTION = 'trace_cost_function'
ICACHE = "icache"
MCACHE = "mcache"
PARAM_THRESHOLD_BORDER_AGREEMENT = "thresh_border_agreement"
PARAMETER_VARIANT_DELIMITER = "variant_delimiter"
PARAM_MODEL_COST_FUNCTION = 'model_cost_function'
PARAM_SYNC_COST_FUNCTION = 'sync_cost_function'
PARAM_TRACE_NET_COSTS = "trace_net_costs"
PARAM_MAX_ALIGN_TIME = "max_align_time"
PARAM_MAX_ALIGN_TIME_TRACE = "max_align_time_trace"
SHOW_PROGRESS_BAR = "show_progress_bar"
[docs]def get_best_worst_cost(petri_net, initial_marking, final_marking, parameters=None):
trace = log_implementation.Trace()
best_worst, cf = align(trace, petri_net, initial_marking, final_marking, parameters=parameters)
best_worst_cost = sum(cf[x] for x in best_worst['alignment']) // utils.STD_MODEL_LOG_MOVE_COST if best_worst else 0
return best_worst_cost
[docs]def apply_from_variants_list_petri_string(var_list, petri_net_string, parameters=None):
if parameters is None:
parameters = {}
from pm4py.objects.petri_net.importer.variants import pnml as petri_importer
petri_net, initial_marking, final_marking = petri_importer.import_petri_from_string(petri_net_string)
res = apply_from_variants_list(var_list, petri_net, initial_marking, final_marking, parameters=parameters)
return res
[docs]def apply_from_variants_list(var_list, petri_net, initial_marking, final_marking, parameters=None):
"""
Apply the alignments from the specification of a list of variants in the log
Parameters
-------------
var_list
List of variants (for each item, the first entry is the variant itself, the second entry may be the number of cases)
petri_net
Petri net
initial_marking
Initial marking
final_marking
Final marking
parameters
Parameters of the algorithm (same as 'apply' method, plus 'variant_delimiter' that is , by default)
Returns
--------------
dictio_alignments
Dictionary that assigns to each variant its alignment
"""
if parameters is None:
parameters = {}
log = log_implementation.EventLog()
dictio_alignments = {}
for varitem in var_list:
variant = varitem[0]
trace = variants_util.variant_to_trace(variant, parameters=parameters)
log.append(trace)
alignment = apply(log, petri_net, initial_marking, final_marking)
for index, varitem in enumerate(var_list):
variant = varitem[0]
dictio_alignments[variant] = alignment[index]
return dictio_alignments
[docs]def apply(log: EventLog, net: PetriNet, im: Marking, fm: Marking, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> typing.ListAlignments:
"""
Apply the recomposition alignment approach
to a log and a Petri net performing decomposition
Parameters
--------------
log
Event log
net
Petri net
im
Initial marking
fm
Final marking
parameters
Parameters of the algorithm
Returns
--------------
aligned_traces
For each trace, return its alignment
"""
if parameters is None:
parameters = {}
log = log_converter.apply(log, variant=log_converter.Variants.TO_EVENT_LOG, parameters=parameters)
best_worst_cost = get_best_worst_cost(net, im, fm, parameters=parameters)
parameters[Parameters.BEST_WORST_COST] = best_worst_cost
list_nets = decomp_utils.decompose(net, im, fm)
return apply_log(log, list_nets, parameters=parameters)
[docs]def apply_log(log, list_nets, parameters=None):
"""
Apply the recomposition alignment approach
to a log and a decomposed Petri net
Parameters
--------------
log
Log
list_nets
Decomposition
parameters
Parameters of the algorithm
Returns
--------------
aligned_traces
For each trace, return its alignment
"""
if parameters is None:
parameters = {}
show_progress_bar = exec_utils.get_param_value(Parameters.SHOW_PROGRESS_BAR, parameters, True)
icache = exec_utils.get_param_value(Parameters.ICACHE, parameters, dict())
mcache = exec_utils.get_param_value(Parameters.MCACHE, parameters, dict())
parameters[Parameters.ICACHE] = icache
parameters[Parameters.MCACHE] = mcache
variants_idxs = variants_module.get_variants_from_log_trace_idx(log, parameters=parameters)
progress = None
if pkgutil.find_loader("tqdm") and show_progress_bar:
from tqdm.auto import tqdm
progress = tqdm(total=len(variants_idxs),
desc="aligning log with decomposition/recomposition, completed variants :: ")
one_tr_per_var = []
variants_list = []
for index_variant, variant in enumerate(variants_idxs):
variants_list.append(variant)
for variant in variants_list:
one_tr_per_var.append(log[variants_idxs[variant][0]])
all_alignments = []
max_align_time = exec_utils.get_param_value(Parameters.PARAM_MAX_ALIGN_TIME, parameters, sys.maxsize)
start_time = time.time()
for index, trace in enumerate(one_tr_per_var):
this_time = time.time()
if this_time - start_time <= max_align_time:
alignment = apply_trace(trace, list_nets, parameters=parameters)
else:
alignment = None
if progress is not None:
progress.update()
all_alignments.append(alignment)
al_idx = {}
for index_variant, variant in enumerate(variants_idxs):
for trace_idx in variants_idxs[variant]:
al_idx[trace_idx] = all_alignments[index_variant]
alignments = []
for i in range(len(log)):
alignments.append(al_idx[i])
# gracefully close progress bar
if progress is not None:
progress.close()
del progress
return alignments
[docs]def get_acache(cons_nets):
"""
Calculates the A-Cache of the given decomposition
Parameters
--------------
cons_nets
List of considered nets
Returns
--------------
acache
A-Cache
"""
ret = {}
for index, el in enumerate(cons_nets):
for lab in el[0].lvis_labels:
if lab not in ret:
ret[lab] = []
ret[lab].append(index)
return ret
[docs]def get_alres(al):
"""
Gets a description of the alignment for the border agreement
Parameters
--------------
al
Alignment
Returns
--------------
alres
Description of the alignment
"""
if al is not None:
ret = {}
for index, el in enumerate(al["alignment"]):
if el[1][0] is not None and el[1][0] != ">>":
if not el[1][0] in ret:
ret[el[1][0]] = []
if el[1][1] is not None and el[1][1] != ">>":
ret[el[1][0]].append(0)
else:
ret[el[1][0]].append(1)
return ret
return None
[docs]def order_nodes_second_round(to_visit, G0):
"""
Orders the second round of nodes to visit to reconstruct the alignment
Parameters
---------------
to_visit
Node to visit
G0
Recomposition graph
Returns
---------------
to_visit
Sorted list of nodes
"""
cont_loop = True
while cont_loop:
cont_loop = False
i = 0
while i < len(to_visit):
j = i + 1
must_break = False
while j < len(to_visit):
if to_visit[j] != to_visit[i]:
edg = [e for e in G0.edges if e[0] == to_visit[j] and e[1] == to_visit[i]]
edg2 = [e for e in G0.edges if e[1] == to_visit[i] and e[0] == to_visit[j]]
if edg and not edg2:
to_visit[i], to_visit[j] = to_visit[j], to_visit[i]
must_break = True
break
j = j + 1
if must_break:
cont_loop = True
break
i = i + 1
return to_visit
[docs]def recompose_alignment(cons_nets, cons_nets_result):
"""
Alignment recomposition
Parameters
---------------
cons_nets
Decomposed Petri net elements
cons_nets_result
Result of the alignments on such elements
parameters
Parameters of the method
Returns
---------------
alignment
Recomposed alignment
"""
import networkx as nx
G0 = nx.DiGraph()
for i in range(len(cons_nets_result)):
if cons_nets_result[i] is not None:
G0.add_node(i)
for i in range(len(cons_nets_result)):
if cons_nets_result[i] is not None:
for j in range(len(cons_nets_result)):
if cons_nets_result[j] is not None:
if i != j:
if cons_nets_result[i]["alignment"][-1][1] == cons_nets_result[j]["alignment"][0][1]:
G0.add_edge(i, j)
all_available = [i for i in range(len(cons_nets_result)) if cons_nets_result[i] is not None]
to_visit = [i for i in range(len(cons_nets)) if len(list(cons_nets[i][1])) > 0]
visited = set()
overall_ali = []
count = 0
while len(to_visit) > 0:
curr = to_visit.pop(0)
output_edges = [e for e in G0.edges if e[0] == curr]
for edge in output_edges:
to_visit.append(edge[1])
if count > 0:
sind = 1
else:
sind = 0
if cons_nets_result[curr] is not None:
overall_ali = overall_ali + [x for x in cons_nets_result[curr]["alignment"][sind:]]
visited.add(curr)
count = count + 1
to_visit = [x for x in all_available if x not in visited]
to_visit = order_nodes_second_round(to_visit, G0)
added = set()
while len(to_visit) > 0:
curr = to_visit.pop(0)
if not curr in visited:
output_edges = [e for e in G0.edges if e[0] == curr]
for edge in output_edges:
to_visit.append(edge[1])
if count > 0:
sind = 1
else:
sind = 0
if cons_nets_result[curr] is not None:
for y in [x for x in cons_nets_result[curr]["alignment"][sind:]]:
if not y in added:
overall_ali.append(y)
added.add(y)
visited.add(curr)
count = count + 1
return overall_ali
[docs]def apply_trace(trace, list_nets, parameters=None):
"""
Align a trace against a decomposition
Parameters
--------------
trace
Trace
list_nets
List of Petri nets (decomposed)
parameters
Parameters of the algorithm
Returns
--------------
alignment
Alignment of the trace
"""
if parameters is None:
parameters = {}
max_align_time_trace = exec_utils.get_param_value(Parameters.PARAM_MAX_ALIGN_TIME_TRACE, parameters, sys.maxsize)
threshold_border_agreement = exec_utils.get_param_value(Parameters.PARAM_THRESHOLD_BORDER_AGREEMENT, parameters,
100000000)
activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, DEFAULT_NAME_KEY)
icache = exec_utils.get_param_value(Parameters.ICACHE, parameters, dict())
mcache = exec_utils.get_param_value(Parameters.MCACHE, parameters, dict())
cons_nets = copy(list_nets)
acache = get_acache(cons_nets)
cons_nets_result = []
cons_nets_alres = []
cons_nets_costs = []
max_val_alres = 0
start_time = time.time()
i = 0
while i < len(cons_nets):
this_time = time.time()
if this_time - start_time > max_align_time_trace:
# the alignment did not termine in the provided time
return None
net, im, fm = cons_nets[i]
proj = Trace([x for x in trace if x[activity_key] in net.lvis_labels])
if len(proj) > 0:
acti = tuple(x[activity_key] for x in proj)
tup = (cons_nets[i], acti)
if tup not in icache:
al, cf = align(proj, net, im, fm, parameters=parameters)
alres = get_alres(al)
icache[tup] = (al, cf, alres)
al, cf, alres = icache[tup]
cons_nets_result.append(al)
cons_nets_alres.append(alres)
cons_nets_costs.append(cf)
if this_time - start_time > max_align_time_trace:
# the alignment did not termine in the provided time
return None
max_val_alres = max(max_val_alres, max(z for y in alres.values() for z in y) if alres else 0)
border_disagreements = 0
if max_val_alres > 0:
comp_to_merge = set()
for act in [x[activity_key] for x in trace if x[activity_key] in net.lvis_labels]:
for ind in acache[act]:
if ind >= i:
break
if cons_nets_alres[ind] is None or cons_nets_alres[ind] is None:
# the alignment did not termine in the provided time
return None
if cons_nets_alres[ind][act] != cons_nets_alres[i][act]:
for ind2 in acache[act]:
comp_to_merge.add(ind2)
if comp_to_merge:
comp_to_merge = sorted(list(comp_to_merge), reverse=True)
border_disagreements += len(comp_to_merge)
# if the number of border disagreements exceed the specified threshold
# then stop iterating on the trace
if border_disagreements > threshold_border_agreement:
return None
comp_to_merge_ids = tuple(list(cons_nets[j][0].t_tuple for j in comp_to_merge))
if comp_to_merge_ids not in mcache:
mcache[comp_to_merge_ids] = decomp_utils.merge_sublist_nets(
[cons_nets[zz] for zz in comp_to_merge])
new_comp = mcache[comp_to_merge_ids]
cons_nets.append(new_comp)
j = 0
while j < len(comp_to_merge):
z = comp_to_merge[j]
if z < i:
i = i - 1
if z <= i:
del cons_nets_result[z]
del cons_nets_alres[z]
del cons_nets_costs[z]
del cons_nets[z]
j = j + 1
acache = get_acache(cons_nets)
continue
else:
cons_nets_result.append(None)
cons_nets_alres.append(None)
cons_nets_costs.append(None)
i = i + 1
if this_time - start_time > max_align_time_trace:
# the alignment did not termine in the provided time
return None
alignment = recompose_alignment(cons_nets, cons_nets_result, )
overall_cost_dict = {}
for cf in cons_nets_costs:
if cf is not None:
for el in cf:
overall_cost_dict[el] = cf[el]
cost = 0
for el in alignment:
cost = cost + overall_cost_dict[el]
alignment = [x[1] for x in alignment]
if this_time - start_time > max_align_time_trace:
# the alignment did not termine in the provided time
return None
res = {"cost": cost, "alignment": alignment}
best_worst_cost = exec_utils.get_param_value(Parameters.BEST_WORST_COST, parameters, None)
if best_worst_cost is not None and len(trace) > 0:
cost1 = cost // utils.STD_MODEL_LOG_MOVE_COST
fitness = 1.0 - cost1 / (best_worst_cost + len(trace))
res["fitness"] = fitness
res["bwc"] = (best_worst_cost + len(trace)) * utils.STD_MODEL_LOG_MOVE_COST
return res
[docs]def align(trace, petri_net, initial_marking, final_marking, parameters=None):
if parameters is None:
parameters = {}
new_parameters = copy(parameters)
new_parameters[state_equation_a_star.Parameters.RETURN_SYNC_COST_FUNCTION] = True
new_parameters[state_equation_a_star.Parameters.PARAM_ALIGNMENT_RESULT_IS_SYNC_PROD_AWARE] = True
aligned_trace, cost_function = state_equation_a_star.apply(trace, petri_net, initial_marking, final_marking,
parameters=new_parameters)
cf = {}
for x in cost_function:
cf[((x.name[0], x.name[1]), (x.label[0], x.label[1]))] = cost_function[x]
return aligned_trace, cf