'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
from copy import deepcopy
from enum import Enum
import numpy as np
from pm4py.objects.log.obj import EventLog, Trace, Event
from pm4py.objects.log.util import basic_filter
from pm4py.util import exec_utils
[docs]class Parameters(Enum):
STRING_ATTRIBUTES = "string_attributes"
NUMERIC_ATTRIBUTES = "numeric_attributes"
ENABLE_MULTIPLIER = "enable_multiplier"
[docs]def diagnose_from_trans_fitness(log, trans_fitness, parameters=None):
"""
Perform root cause analysis starting from transition fitness knowledge
Parameters
-------------
log
Trace log object
trans_fitness
Transition fitness object
parameters
Possible parameters of the algorithm, including:
string_attributes -> List of string event attributes to consider
in building the decision tree
numeric_attributes -> List of numeric event attributes to consider
in building the decision tree
Returns
-----------
diagnostics
For each problematic transition:
- a decision tree comparing fit and unfit executions
- feature names
- classes
"""
from sklearn import tree
if parameters is None:
parameters = {}
diagnostics = {}
string_attributes = exec_utils.get_param_value(Parameters.STRING_ATTRIBUTES, parameters, [])
numeric_attributes = exec_utils.get_param_value(Parameters.NUMERIC_ATTRIBUTES, parameters, [])
enable_multiplier = exec_utils.get_param_value(Parameters.ENABLE_MULTIPLIER, parameters, False)
for trans in trans_fitness:
if len(trans_fitness[trans]["underfed_traces"]) > 0:
fit_cases_repr = []
underfed_cases_repr = []
for trace in log:
if trace in trans_fitness[trans]["underfed_traces"]:
underfed_cases_repr.append(trans_fitness[trans]["underfed_traces"][trace][0])
elif trace in trans_fitness[trans]["fit_traces"]:
fit_cases_repr.append(trans_fitness[trans]["fit_traces"][trace][0])
if fit_cases_repr and underfed_cases_repr:
data, feature_names = form_representation_from_dictio_couple(fit_cases_repr, underfed_cases_repr,
string_attributes, numeric_attributes,
enable_multiplier=enable_multiplier)
target = []
classes = []
if enable_multiplier:
multiplier_first = int(max(float(len(underfed_cases_repr)) / float(len(fit_cases_repr)), 1))
multiplier_second = int(max(float(len(fit_cases_repr)) / float(len(underfed_cases_repr)), 1))
else:
multiplier_first = 1
multiplier_second = 1
for j in range(multiplier_first):
for i in range(len(fit_cases_repr)):
target.append(0)
classes.append("fit")
for j in range(multiplier_second):
for i in range(len(underfed_cases_repr)):
target.append(1)
classes.append("underfed")
target = np.asarray(target)
clf = tree.DecisionTreeClassifier(max_depth=7)
clf.fit(data, target)
diagn_dict = {"clf": clf, "data": data, "feature_names": feature_names, "target": target,
"classes": classes}
diagnostics[trans] = diagn_dict
return diagnostics
[docs]def diagnose_from_notexisting_activities(log, notexisting_activities_in_model, parameters=None):
"""
Perform root cause analysis related to activities that are not present in the model
Parameters
-------------
log
Trace log object
notexisting_activities_in_model
Not existing activities in the model
parameters
Possible parameters of the algorithm, including:
string_attributes -> List of string event attributes to consider
in building the decision tree
numeric_attributes -> List of numeric event attributes to consider
in building the decision tree
Returns
-----------
diagnostics
For each problematic transition:
- a decision tree comparing fit and unfit executions
- feature names
- classes
"""
from sklearn import tree
if parameters is None:
parameters = {}
diagnostics = {}
string_attributes = exec_utils.get_param_value(Parameters.STRING_ATTRIBUTES, parameters, [])
numeric_attributes = exec_utils.get_param_value(Parameters.NUMERIC_ATTRIBUTES, parameters, [])
enable_multiplier = exec_utils.get_param_value(Parameters.ENABLE_MULTIPLIER, parameters, False)
parameters_filtering = deepcopy(parameters)
parameters_filtering["positive"] = False
values = list(notexisting_activities_in_model.keys())
filtered_log = basic_filter.filter_log_traces_attr(log, values, parameters=parameters_filtering)
for act in notexisting_activities_in_model:
fit_cases_repr = []
containing_cases_repr = []
for trace in log:
if trace in notexisting_activities_in_model[act]:
containing_cases_repr.append(notexisting_activities_in_model[act][trace])
elif trace in filtered_log:
fit_cases_repr.append(dict(trace[-1]))
if fit_cases_repr and containing_cases_repr:
data, feature_names = form_representation_from_dictio_couple(fit_cases_repr, containing_cases_repr,
string_attributes, numeric_attributes,
enable_multiplier=enable_multiplier)
target = []
classes = []
if enable_multiplier:
multiplier_first = int(max(float(len(containing_cases_repr)) / float(len(fit_cases_repr)), 1))
multiplier_second = int(max(float(len(fit_cases_repr)) / float(len(containing_cases_repr)), 1))
else:
multiplier_first = 1
multiplier_second = 1
for j in range(multiplier_first):
for i in range(len(fit_cases_repr)):
target.append(0)
classes.append("fit")
for j in range(multiplier_second):
for i in range(len(containing_cases_repr)):
target.append(1)
classes.append("containing")
target = np.asarray(target)
clf = tree.DecisionTreeClassifier(max_depth=7)
clf.fit(data, target)
diagn_dict = {"clf": clf, "data": data, "feature_names": feature_names, "target": target,
"classes": classes}
diagnostics[act] = diagn_dict
return diagnostics