Source code for pm4py.algo.discovery.log_skeleton.variants.classic

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from collections import Counter
from enum import Enum

from pm4py.algo.discovery.log_skeleton import trace_skel
from pm4py.objects.log.obj import EventLog
from pm4py.objects.log.util import xes
from pm4py.util import exec_utils
from pm4py.util import variants_util
from pm4py.util.constants import PARAMETER_CONSTANT_ACTIVITY_KEY, PARAMETER_CONSTANT_CASEID_KEY
from typing import Optional, Dict, Any, Union, Tuple, List
from pm4py.objects.log.obj import EventLog, EventStream
import pandas as pd


[docs]class Parameters(Enum): # parameter for the noise threshold NOISE_THRESHOLD = "noise_threshold" # considered constraints in conformance checking among: equivalence, always_after, always_before, never_together, directly_follows, activ_freq CONSIDERED_CONSTRAINTS = "considered_constraints" # default choice for conformance checking DEFAULT_CONSIDERED_CONSTRAINTS = ["equivalence", "always_after", "always_before", "never_together", "directly_follows", "activ_freq"] CASE_ID_KEY = PARAMETER_CONSTANT_CASEID_KEY ACTIVITY_KEY = PARAMETER_CONSTANT_ACTIVITY_KEY PARAMETER_VARIANT_DELIMITER = "variant_delimiter"
NOISE_THRESHOLD = Parameters.NOISE_THRESHOLD CONSIDERED_CONSTRAINTS = Parameters.CONSIDERED_CONSTRAINTS DEFAULT_CONSIDERED_CONSTRAINTS = Parameters.DEFAULT_CONSIDERED_CONSTRAINTS ACTIVITY_KEY = Parameters.ACTIVITY_KEY PARAMETER_VARIANT_DELIMITER = Parameters.PARAMETER_VARIANT_DELIMITER
[docs]class Outputs(Enum): EQUIVALENCE = "equivalence" ALWAYS_AFTER = "always_after" ALWAYS_BEFORE = "always_before" NEVER_TOGETHER = "never_together" DIRECTLY_FOLLOWS = "directly_follows" ACTIV_FREQ = "activ_freq"
[docs]def equivalence(logs_traces, all_activs, noise_threshold=0): """ Gets the equivalence relations given the traces of the log Parameters ------------- logs_traces Traces of the log all_activs All the activities noise_threshold Noise threshold Returns -------------- rel List of relations in the log """ ret0 = Counter() for trace in logs_traces: rs = Counter(trace_skel.equivalence(list(trace))) for k in rs: rs[k] = rs[k] * logs_traces[trace] ret0 += rs ret = set(x for x, y in ret0.items() if y >= all_activs[x[0]] * (1.0 - noise_threshold)) return ret
[docs]def always_after(logs_traces, all_activs, noise_threshold=0): """ Gets the always-after relations given the traces of the log Parameters ------------- logs_traces Traces of the log all_activs All the activities noise_threshold Noise threshold Returns -------------- rel List of relations in the log """ ret0 = Counter() for trace in logs_traces: rs = Counter(trace_skel.after(list(trace))) for k in rs: rs[k] = rs[k] * logs_traces[trace] ret0 += rs ret = set(x for x, y in ret0.items() if y >= all_activs[x[0]] * (1.0 - noise_threshold)) return ret
[docs]def always_before(logs_traces, all_activs, noise_threshold=0): """ Gets the always-before relations given the traces of the log Parameters ------------- logs_traces Traces of the log all_activs All the activities noise_threshold Noise threshold Returns -------------- rel List of relations in the log """ ret0 = Counter() for trace in logs_traces: rs = Counter(trace_skel.before(list(trace))) for k in rs: rs[k] = rs[k] * logs_traces[trace] ret0 += rs ret = set(x for x, y in ret0.items() if y >= all_activs[x[0]] * (1.0 - noise_threshold)) return ret
[docs]def never_together(logs_traces, all_activs, len_log, noise_threshold=0): """ Gets the never-together relations given the traces of the log Parameters ------------- logs_traces Traces of the log all_activs All the activities len_log Length of the log noise_threshold Noise threshold Returns -------------- rel List of relations in the log """ all_combos = set((x, y) for x in all_activs for y in all_activs if x != y) ret0 = Counter() for k in all_combos: ret0[k] = all_activs[k[0]] for trace in logs_traces: rs = Counter(trace_skel.combos(list(trace))) for k in rs: rs[k] = rs[k] * logs_traces[trace] ret0 -= rs ret = set(x for x, y in ret0.items() if y >= all_activs[x[0]] * (1.0 - noise_threshold)) return ret
[docs]def directly_follows(logs_traces, all_activs, noise_threshold=0): """ Gets the allowed directly-follows relations given the traces of the log Parameters ------------- logs_traces Traces of the log all_activs All the activities noise_threshold Noise threshold Returns -------------- rel List of relations in the log """ ret0 = Counter() for trace in logs_traces: rs = Counter(trace_skel.directly_follows(list(trace))) for k in rs: rs[k] = rs[k] * logs_traces[trace] ret0 += rs ret = set(x for x, y in ret0.items() if y >= all_activs[x[0]] * (1.0 - noise_threshold)) return ret
[docs]def activ_freq(logs_traces, all_activs, len_log, noise_threshold=0): """ Gets the allowed activities frequencies given the traces of the log Parameters ------------- logs_traces Traces of the log all_activs All the activities len_log Length of the log noise_threshold Noise threshold Returns -------------- rel List of relations in the log """ ret0 = {} ret = {} for trace in logs_traces: rs = trace_skel.activ_freq(trace) for act in all_activs: if act not in rs: rs[act] = 0 for act in rs: if act not in ret0: ret0[act] = Counter() ret0[act][rs[act]] += logs_traces[trace] for act in ret0: ret0[act] = sorted(list((x, y) for x, y in ret0[act].items()), key=lambda x: x[1], reverse=True) added = 0 i = 0 while i < len(ret0[act]): added += ret0[act][i][1] if added >= (1.0 - noise_threshold) * len_log: ret0[act] = ret0[act][:min(i + 1, len(ret0[act]))] i = i + 1 ret[act] = set(x[0] for x in ret0[act]) return ret
[docs]def apply(log: Union[EventLog, EventStream, pd.DataFrame], parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Dict[str, Any]: """ Discover a log skeleton from an event log Parameters ------------- log Event log parameters Parameters of the algorithm, including: - the activity key (Parameters.ACTIVITY_KEY) - the noise threshold (Parameters.NOISE_THRESHOLD) Returns ------------- model Log skeleton model """ if parameters is None: parameters = {} activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes.DEFAULT_NAME_KEY) noise_threshold = exec_utils.get_param_value(Parameters.NOISE_THRESHOLD, parameters, 0.0) logs_traces = Counter([tuple(y[activity_key] for y in x) for x in log]) all_activs = Counter(list(y[activity_key] for x in log for y in x)) ret = {} ret[Outputs.EQUIVALENCE.value] = equivalence(logs_traces, all_activs, noise_threshold=noise_threshold) ret[Outputs.ALWAYS_AFTER.value] = always_after(logs_traces, all_activs, noise_threshold=noise_threshold) ret[Outputs.ALWAYS_BEFORE.value] = always_before(logs_traces, all_activs, noise_threshold=noise_threshold) ret[Outputs.NEVER_TOGETHER.value] = never_together(logs_traces, all_activs, len(log), noise_threshold=noise_threshold) ret[Outputs.DIRECTLY_FOLLOWS.value] = directly_follows(logs_traces, all_activs, noise_threshold=noise_threshold) ret[Outputs.ACTIV_FREQ.value] = activ_freq(logs_traces, all_activs, len(log), noise_threshold=noise_threshold) return ret
[docs]def apply_from_variants_list(var_list, parameters=None): """ Discovers the log skeleton from the variants list Parameters --------------- var_list Variants list parameters Parameters Returns --------------- model Log skeleton model """ if parameters is None: parameters = {} log = EventLog() for cv in var_list: v = cv[0] trace = variants_util.variant_to_trace(v, parameters=parameters) log.append(trace) return apply(log, parameters=parameters)
[docs]def prepare_encode(log_skeleton): """ Prepares the log skeleton for encoding Parameters -------------- log_skeleton Log skeleton Returns -------------- log_skeleton Log skeleton (with lists instead of sets) """ log_skeleton[Outputs.EQUIVALENCE.value] = list(log_skeleton[Outputs.EQUIVALENCE.value]) log_skeleton[Outputs.ALWAYS_AFTER.value] = list(log_skeleton[Outputs.ALWAYS_AFTER.value]) log_skeleton[Outputs.ALWAYS_BEFORE.value] = list(log_skeleton[Outputs.ALWAYS_BEFORE.value]) log_skeleton[Outputs.NEVER_TOGETHER.value] = list(log_skeleton[Outputs.NEVER_TOGETHER.value]) log_skeleton[Outputs.DIRECTLY_FOLLOWS.value] = list(log_skeleton[Outputs.DIRECTLY_FOLLOWS.value]) for act in log_skeleton[Outputs.ACTIV_FREQ.value]: log_skeleton[Outputs.ACTIV_FREQ.value][act] = list(log_skeleton[Outputs.ACTIV_FREQ.value][act]) return log_skeleton