Source code for pm4py.algo.conformance.alignments.petri_net.algorithm

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from copy import copy

from pm4py.algo.conformance.alignments.petri_net import variants
from pm4py.objects.petri_net.utils import align_utils, check_soundness
from pm4py.statistics.variants.log import get as variants_module
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.util.xes_constants import DEFAULT_NAME_KEY, DEFAULT_TRACEID_KEY
from pm4py.objects.log.obj import Trace
import time
from pm4py.util.lp import solver
from pm4py.util import exec_utils
from enum import Enum
import sys
from pm4py.util.constants import PARAMETER_CONSTANT_ACTIVITY_KEY, PARAMETER_CONSTANT_CASEID_KEY
import pkgutil
from typing import Optional, Dict, Any, Union, Tuple
from pm4py.objects.log.obj import EventLog, EventStream, Trace
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.util import typing
import pandas as pd


class Variants(Enum):
    """
    Alignment algorithms selectable through the ``variant`` argument.

    The value of each member is the module of
    ``pm4py.algo.conformance.alignments.petri_net.variants`` implementing it.
    """

    # A*-based searches
    VERSION_STATE_EQUATION_A_STAR = variants.state_equation_a_star
    VERSION_TWEAKED_STATE_EQUATION_A_STAR = variants.tweaked_state_equation_a_star

    # Dijkstra-based searches
    VERSION_DIJKSTRA_NO_HEURISTICS = variants.dijkstra_no_heuristics
    VERSION_DIJKSTRA_LESS_MEMORY = variants.dijkstra_less_memory

    # discounted A*
    VERSION_DISCOUNTED_A_STAR = variants.discounted_a_star
class Parameters(Enum):
    """
    Keys recognised in the ``parameters`` dictionary of the alignment functions.
    """

    # cost functions (see the docstring of apply_trace)
    PARAM_TRACE_COST_FUNCTION = 'trace_cost_function'
    PARAM_MODEL_COST_FUNCTION = 'model_cost_function'
    PARAM_SYNC_COST_FUNCTION = 'sync_cost_function'

    PARAM_ALIGNMENT_RESULT_IS_SYNC_PROD_AWARE = 'ret_tuple_as_trans_desc'
    PARAM_TRACE_NET_COSTS = "trace_net_costs"
    TRACE_NET_CONSTR_FUNCTION = "trace_net_constr_function"
    TRACE_NET_COST_AWARE_CONSTR_FUNCTION = "trace_net_cost_aware_constr_function"

    # time budgets: per trace, and for the whole log (see apply_log)
    PARAM_MAX_ALIGN_TIME_TRACE = "max_align_time_trace"
    PARAM_MAX_ALIGN_TIME = "max_align_time"

    PARAMETER_VARIANT_DELIMITER = "variant_delimiter"

    # log attribute keys
    CASE_ID_KEY = PARAMETER_CONSTANT_CASEID_KEY
    ACTIVITY_KEY = PARAMETER_CONSTANT_ACTIVITY_KEY

    # optional pre-computed mapping variant -> indexes of the traces of the log
    VARIANTS_IDX = "variants_idx"
    # whether a progress bar is displayed while aligning a log
    SHOW_PROGRESS_BAR = "show_progress_bar"
    # number of worker processes used by apply_multiprocessing
    CORES = 'cores'
    # best worst cost of the model, set internally by the log-level functions
    BEST_WORST_COST_INTERNAL = "best_worst_cost_internal"

    FITNESS_ROUND_DIGITS = "fitness_round_digits"
    SYNCHRONOUS = "synchronous_dijkstra"
    EXPONENT = "theta"
# Default algorithm: the less-memory Dijkstra search, replaced by the
# state-equation A* whenever solver.DEFAULT_LP_SOLVER_VARIANT is set.
DEFAULT_VARIANT = Variants.VERSION_DIJKSTRA_LESS_MEMORY
if solver.DEFAULT_LP_SOLVER_VARIANT is not None:
    DEFAULT_VARIANT = Variants.VERSION_STATE_EQUATION_A_STAR

# Module-level aliases of the enum members
VERSION_STATE_EQUATION_A_STAR = Variants.VERSION_STATE_EQUATION_A_STAR
VERSION_DIJKSTRA_NO_HEURISTICS = Variants.VERSION_DIJKSTRA_NO_HEURISTICS
VERSION_DIJKSTRA_LESS_MEMORY = Variants.VERSION_DIJKSTRA_LESS_MEMORY
VERSION_DISCOUNTED_A_STAR = Variants.VERSION_DISCOUNTED_A_STAR

# The original literal listed VERSION_DIJKSTRA_NO_HEURISTICS twice and left out
# VERSION_STATE_EQUATION_A_STAR, although the latter can be the default variant.
VERSIONS = {
    Variants.VERSION_STATE_EQUATION_A_STAR,
    Variants.VERSION_DIJKSTRA_NO_HEURISTICS,
    Variants.VERSION_DIJKSTRA_LESS_MEMORY,
    Variants.VERSION_DISCOUNTED_A_STAR,
}
def apply(obj: Union[EventLog, EventStream, pd.DataFrame, Trace], petri_net: PetriNet, initial_marking: Marking,
          final_marking: Marking, parameters: Optional[Dict[Any, Any]] = None,
          variant=DEFAULT_VARIANT) -> Union[typing.AlignmentResult, typing.ListAlignments]:
    """
    Applies the alignments to a single trace or to a whole log

    Parameters
    -----------
    obj
        Trace, or log object (event log, event stream, dataframe)
    petri_net
        Petri net
    initial_marking
        Initial marking of the net
    final_marking
        Final marking of the net
    parameters
        Parameters of the algorithm (an empty dictionary is used when None)
    variant
        Variant of the algorithm (a member of Variants)

    Returns
    -----------
    alignments
        Result of apply_trace when obj is a Trace; otherwise result of apply_log
        on obj converted to an event log
    """
    params = {} if parameters is None else parameters

    if not isinstance(obj, Trace):
        # anything that is not a single trace is first converted to an event log
        event_log = log_converter.apply(obj, params, log_converter.TO_EVENT_LOG)
        return apply_log(event_log, petri_net, initial_marking, final_marking, parameters=params, variant=variant)

    return apply_trace(obj, petri_net, initial_marking, final_marking, parameters=params, variant=variant)
def apply_trace(trace, petri_net, initial_marking, final_marking, parameters=None, variant=DEFAULT_VARIANT):
    """
    apply alignments to a trace

    Parameters
    -----------
    trace
        :class:`pm4py.objects.log.obj.Trace` trace of events
    petri_net
        :class:`pm4py.objects.petri_net.obj.PetriNet` the model to use for the alignment
    initial_marking
        :class:`pm4py.objects.petri_net.obj.Marking` initial marking of the net
    final_marking
        :class:`pm4py.objects.petri_net.obj.Marking` final marking of the net
    variant
        selected variant of the algorithm (a member of Variants, e.g.
        Variants.VERSION_STATE_EQUATION_A_STAR, Variants.VERSION_DIJKSTRA_NO_HEURISTICS)
    parameters
        :class:`dict` parameters of the algorithm, among which:
            Parameters.ACTIVITY_KEY -> Attribute in the log that contains the activity
            Parameters.PARAM_MODEL_COST_FUNCTION ->
            mapping of each transition in the model to corresponding model cost
            Parameters.PARAM_SYNC_COST_FUNCTION ->
            mapping of each transition in the model to corresponding synchronous costs
            Parameters.PARAM_TRACE_COST_FUNCTION ->
            mapping of each index of the trace to a positive cost value
            Parameters.BEST_WORST_COST_INTERNAL ->
            best worst cost of the model; computed here when not provided

    Returns
    -----------
    alignment
        :class:`dict` with keys **alignment**, **cost**, **visited_states**, **queued_states** and
        **traversed_arcs**
        The alignment is a sequence of labels of the form (a,t), (a,>>), or (>>,t)
        representing synchronous/log/model-moves.
        When both the alignment and the best worst cost are available, the keys
        **fitness** and **bwc** are added. The value returned by the variant is
        passed through unchanged (possibly None) otherwise.
    """
    if parameters is None:
        parameters = {PARAMETER_CONSTANT_ACTIVITY_KEY: DEFAULT_NAME_KEY}
    # work on a shallow copy, so the dictionary of the caller is never modified
    parameters = copy(parameters)

    # The best worst cost is expensive to obtain, so it is computed only when the
    # caller did not provide it (apply_log computes it once for the whole log).
    # A private sentinel is used because None is a legitimate stored value.
    # NOTE(review): assumes exec_utils.get_param_value returns the default object
    # unchanged when the key is absent - confirm.
    missing = object()
    best_worst_cost = exec_utils.get_param_value(Parameters.BEST_WORST_COST_INTERNAL, parameters, missing)
    if best_worst_cost is missing:
        best_worst_cost = __get_best_worst_cost(petri_net, initial_marking, final_marking, variant, parameters)

    ali = exec_utils.get_variant(variant).apply(trace, petri_net, initial_marking, final_marking,
                                                parameters=parameters)

    # read only after the variant has run (same order as before)
    # NOTE(review): presumably the variant stores the trace cost function it used
    # inside parameters - confirm.
    trace_cost_function = exec_utils.get_param_value(Parameters.PARAM_TRACE_COST_FUNCTION, parameters, [])
    # Instead of using the length of the trace, use the sum of the trace cost function
    trace_cost_function_sum = sum(trace_cost_function)

    if ali is not None and best_worst_cost is not None:
        ltrace_bwc = trace_cost_function_sum + best_worst_cost

        # integer division by the standard move cost on both sides of the ratio.
        # (alternative not adopted: 1 - cost / ltrace_bwc followed by rounding,
        # which could lead to small differences in the reported fitness)
        fitness_num = ali['cost'] // align_utils.STD_MODEL_LOG_MOVE_COST
        fitness_den = ltrace_bwc // align_utils.STD_MODEL_LOG_MOVE_COST
        fitness = 1 - fitness_num / fitness_den if fitness_den > 0 else 0

        ali["fitness"] = fitness
        # returning also the best worst cost, for log fitness computation
        ali["bwc"] = ltrace_bwc

    return ali
def apply_log(log, petri_net, initial_marking, final_marking, parameters=None, variant=DEFAULT_VARIANT):
    """
    apply alignments to a log

    Only one trace per variant is aligned; the result is then replicated for
    all the traces of the log sharing that variant.

    Parameters
    -----------
    log
        object of the form :class:`pm4py.objects.log.obj.EventLog` event log
    petri_net
        :class:`pm4py.objects.petri_net.obj.PetriNet` the model to use for the alignment
    initial_marking
        :class:`pm4py.objects.petri_net.obj.Marking` initial marking of the net
    final_marking
        :class:`pm4py.objects.petri_net.obj.Marking` final marking of the net
    variant
        selected variant of the algorithm (a member of Variants, e.g.
        Variants.VERSION_STATE_EQUATION_A_STAR, Variants.VERSION_DIJKSTRA_NO_HEURISTICS)
    parameters
        :class:`dict` parameters of the algorithm, among which:
            Parameters.PARAM_MAX_ALIGN_TIME -> time budget for the whole log
            Parameters.PARAM_MAX_ALIGN_TIME_TRACE -> upper bound of the time budget of each trace
            Parameters.VARIANTS_IDX -> pre-computed mapping variant -> indexes of the traces
            Parameters.SHOW_PROGRESS_BAR -> enables/disables the progress bar
        The dictionary is not modified.

    Returns
    -----------
    alignment
        :class:`list` of the results of apply_trace, one per trace of the log
        (same order as the log)

    Raises
    -----------
    Exception
        if solver.DEFAULT_LP_SOLVER_VARIANT is set and the net fails the
        easy-soundness check
    """
    # Shallow copy: the internal bookkeeping keys written below (best worst cost,
    # per-trace time budget) must not leak into the dictionary of the caller,
    # where they would silently affect later calls reusing the same dictionary.
    parameters = {} if parameters is None else copy(parameters)

    if solver.DEFAULT_LP_SOLVER_VARIANT is not None:
        if not check_soundness.check_easy_soundness_net_in_fin_marking(petri_net, initial_marking,
                                                                       final_marking):
            raise Exception("trying to apply alignments on a Petri net that is not a easy sound net!!")

    start_time = time.time()
    max_align_time = exec_utils.get_param_value(Parameters.PARAM_MAX_ALIGN_TIME, parameters, sys.maxsize)
    max_align_time_case = exec_utils.get_param_value(Parameters.PARAM_MAX_ALIGN_TIME_TRACE, parameters,
                                                     sys.maxsize)

    best_worst_cost = __get_best_worst_cost(petri_net, initial_marking, final_marking, variant, parameters)
    variants_idxs, one_tr_per_var = __get_variants_structure(log, parameters)
    progress = __get_progress_bar(len(one_tr_per_var), parameters)

    # computed once here and handed over to apply_trace
    parameters[Parameters.BEST_WORST_COST_INTERNAL] = best_worst_cost

    all_alignments = []
    try:
        for trace in one_tr_per_var:
            # each trace gets at most half of the time still left for the log,
            # and never more than the per-trace bound
            this_max_align_time = min(max_align_time_case, (max_align_time - (time.time() - start_time)) * 0.5)
            parameters[Parameters.PARAM_MAX_ALIGN_TIME_TRACE] = this_max_align_time
            all_alignments.append(apply_trace(trace, petri_net, initial_marking, final_marking,
                                              parameters=copy(parameters), variant=variant))
            if progress is not None:
                progress.update()
    finally:
        # the progress bar is closed also when an alignment raises
        __close_progress_bar(progress)

    alignments = __form_alignments(log, variants_idxs, all_alignments)
    return alignments
def _apply_trace_in_worker(trace, petri_net, initial_marking, final_marking, parameters, variant_name):
    """
    Entry point executed in the worker processes: resolves the variant from the
    name of the Variants member (None selects DEFAULT_VARIANT) and aligns the trace.
    """
    variant = DEFAULT_VARIANT if variant_name is None else Variants[variant_name]
    return apply_trace(trace, petri_net, initial_marking, final_marking, parameters=parameters, variant=variant)


def apply_multiprocessing(log, petri_net, initial_marking, final_marking, parameters=None, variant=DEFAULT_VARIANT):
    """
    Applies the alignments using a process pool (multiprocessing)

    Only one trace per variant is aligned; the result is then replicated for
    all the traces of the log sharing that variant.

    Parameters
    ---------------
    log
        Event log
    petri_net
        Petri net
    initial_marking
        Initial marking
    final_marking
        Final marking
    parameters
        Parameters of the algorithm, among which:
            Parameters.CORES -> number of worker processes
            (default: number of CPUs minus 2, at least 1)
            Parameters.VARIANTS_IDX -> pre-computed mapping variant -> indexes of the traces
            Parameters.SHOW_PROGRESS_BAR -> enables/disables the progress bar
        The dictionary is not modified.
    variant
        Variant of the algorithm (a member of Variants)

    Returns
    ----------------
    aligned_traces
        Alignments (one per trace of the log, same order as the log)
    """
    import multiprocessing
    from concurrent.futures import ProcessPoolExecutor, as_completed

    # shallow copy: the internal best-worst-cost key must not leak to the caller
    parameters = {} if parameters is None else copy(parameters)

    # ProcessPoolExecutor rejects max_workers <= 0, which "CPUs - 2" yields on
    # machines with one or two CPUs
    default_cores = max(1, multiprocessing.cpu_count() - 2)
    num_cores = exec_utils.get_param_value(Parameters.CORES, parameters, default_cores)

    best_worst_cost = __get_best_worst_cost(petri_net, initial_marking, final_marking, variant, parameters)
    variants_idxs, one_tr_per_var = __get_variants_structure(log, parameters)
    parameters[Parameters.BEST_WORST_COST_INTERNAL] = best_worst_cost

    # The requested variant has to reach the workers, otherwise the traces are
    # aligned with DEFAULT_VARIANT while the best worst cost above comes from
    # `variant`. It is forwarded by member name.
    # NOTE(review): the values of Variants are modules, which presumably cannot be
    # pickled for the worker processes - confirm. Objects that are not members of
    # Variants keep the previous behaviour (default variant in the workers).
    variant_name = variant.name if isinstance(variant, Variants) else None

    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        futures = [executor.submit(_apply_trace_in_worker, trace, petri_net, initial_marking, final_marking,
                                   parameters, variant_name)
                   for trace in one_tr_per_var]
        progress = __get_progress_bar(len(one_tr_per_var), parameters)
        try:
            if progress is not None:
                # blocks until each future finishes, instead of polling done() in a busy loop
                for _ in as_completed(futures):
                    progress.update()
            # results are collected in submission order, i.e. the order of the variants
            all_alignments = [future.result() for future in futures]
        finally:
            __close_progress_bar(progress)

    alignments = __form_alignments(log, variants_idxs, all_alignments)
    return alignments
def __get_best_worst_cost(petri_net, initial_marking, final_marking, variant, parameters):
    """
    Gets the best worst cost of the model from the selected variant.

    The variant receives a shallow copy of the parameters, so the dictionary
    passed here is not modified.
    """
    parameters_best_worst = copy(parameters)

    return exec_utils.get_variant(variant).get_best_worst_cost(petri_net, initial_marking, final_marking,
                                                               parameters=parameters_best_worst)


def __get_variants_structure(log, parameters):
    """
    Gets the mapping variant -> indexes of the traces of the log (taken from
    Parameters.VARIANTS_IDX when provided, computed from the log otherwise),
    along with one representative trace (the first one) for each variant.
    """
    variants_idxs = exec_utils.get_param_value(Parameters.VARIANTS_IDX, parameters, None)
    if variants_idxs is None:
        variants_idxs = variants_module.get_variants_from_log_trace_idx(log, parameters=parameters)

    # same iteration order as variants_idxs: __form_alignments relies on it
    one_tr_per_var = [log[variants_idxs[var][0]] for var in variants_idxs]

    return variants_idxs, one_tr_per_var


def __get_progress_bar(num_variants, parameters):
    """
    Creates a tqdm progress bar, or returns None when the progress bar is
    disabled (Parameters.SHOW_PROGRESS_BAR), there is at most one variant,
    or tqdm is not installed.
    """
    # pkgutil.find_loader is deprecated since Python 3.12;
    # importlib.util.find_spec is the documented replacement
    import importlib.util

    show_progress_bar = exec_utils.get_param_value(Parameters.SHOW_PROGRESS_BAR, parameters, True)
    if show_progress_bar and num_variants > 1 and importlib.util.find_spec("tqdm") is not None:
        from tqdm.auto import tqdm
        return tqdm(total=num_variants, desc="aligning log, completed variants :: ")
    return None


def __form_alignments(log, variants_idxs, all_alignments):
    """
    Expands the per-variant alignments to one alignment per trace of the log.

    all_alignments[i] is the alignment of the i-th variant met when iterating
    variants_idxs; every trace index listed for that variant receives it.
    """
    al_idx = {}
    for index_variant, variant in enumerate(variants_idxs):
        for trace_idx in variants_idxs[variant]:
            al_idx[trace_idx] = all_alignments[index_variant]

    return [al_idx[i] for i in range(len(log))]


def __close_progress_bar(progress):
    """
    Closes the progress bar, if one was created (None is accepted).
    """
    if progress is not None:
        progress.close()
def get_diagnostics_dataframe(log, align_output, parameters=None):
    """
    Gets the diagnostics results of alignments (of a log) in a dataframe

    Parameters
    --------------
    log
        Event log
    align_output
        Output of the alignments (one entry per trace, same order as the log)
    parameters
        Parameters of the method, including:
            Parameters.CASE_ID_KEY -> trace attribute holding the case identifier

    Returns
    --------------
    dataframe
        Diagnostics dataframe, with one row per trace and the columns
        case_id, cost, fitness, is_fit
    """
    if parameters is None:
        parameters = {}

    case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, DEFAULT_TRACEID_KEY)

    def describe(position):
        # diagnostics of the trace at the given position of the log
        case_id = log[position].attributes[case_id_key]
        cost = align_output[position]["cost"]
        fitness = align_output[position]["fitness"]
        # a trace is fit only when its fitness is exactly 1
        return {"case_id": case_id, "cost": cost, "fitness": fitness, "is_fit": fitness == 1.0}

    diagn_stream = [describe(position) for position in range(len(log))]

    return pd.DataFrame(diagn_stream)