'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
from pm4py.util import exec_utils
from enum import Enum
from pm4py.util import constants, xes_constants
from pm4py.objects.conversion.log import converter
from pm4py.objects.log.obj import EventStream, Event
from pm4py.algo.discovery.correlation_mining import util as cm_util
import numpy as np
import pandas as pd
from typing import Optional, Dict, Any, Union, Tuple
from pm4py.objects.log.obj import EventLog, EventStream
import pandas as pd
class Parameters(Enum):
    """
    Keys recognised in the ``parameters`` dictionary of the correlation miner.
    """
    # attribute of the events that holds the activity name
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    # attribute of the events that holds the (completion) timestamp
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    # attribute of the events that holds the start timestamp
    START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
    # flag forwarded as ``exact`` to the duration matrix computation (defaults to False there)
    EXACT_TIME_MATCHING = "exact_time_matching"
    # name of the attribute under which preprocess_log stores the position of each event
    INDEX_KEY = "index_key"
# Attribute name used for the event position when Parameters.INDEX_KEY is not provided.
DEFAULT_INDEX_KEY = "@@@index"
def apply(log: Union[EventLog, EventStream, pd.DataFrame], parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Tuple[Dict[Tuple[str, str], int], Dict[Tuple[str, str], float]]:
    """
    Runs the correlation miner on a log object
    (logs that are not event streams are converted to one during preprocessing)

    The approach is described in:
    Pourmirza, Shaya, Remco Dijkman, and Paul Grefen. "Correlation miner: mining business process models and event
    correlations without case identifiers." International Journal of Cooperative Information Systems 26.02 (2017):
    1742002.

    Parameters
    ---------------
    log
        Log object
    parameters
        Parameters of the algorithm

    Returns
    ---------------
    dfg
        DFG
    performance_dfg
        Performance DFG (containing the estimated performance for the arcs)
    """
    params = {} if parameters is None else parameters

    # the transformed stream itself is not needed here, only the grouping
    _, grouped_events, activity_names = preprocess_log(log, parameters=params)
    ps_matrix, dur_matrix = get_PS_dur_matrix(grouped_events, activity_names, parameters=params)

    # number of events observed for each activity
    occurrences = {}
    for name, events in grouped_events.items():
        occurrences[name] = len(events)

    return resolve_lp_get_dfg(ps_matrix, dur_matrix, activity_names, occurrences)
def resolve_lp_get_dfg(PS_matrix, duration_matrix, activities, activities_counter):
    """
    Solves the LP problem that yields the DFG

    The C matrix is first obtained from cm_util.get_c_matrix and is then
    handed, together with the duration matrix, to cm_util.resolve_LP.

    Parameters
    --------------
    PS_matrix
        Precede-succeed matrix
    duration_matrix
        Duration matrix
    activities
        List of activities of the log
    activities_counter
        Counter of the activities

    Returns
    --------------
    dfg
        DFG
    performance_dfg
        Performance DFG (containing the estimated performance for the arcs)
    """
    c_matrix = cm_util.get_c_matrix(PS_matrix, duration_matrix, activities, activities_counter)
    frequency_dfg, perf_dfg = cm_util.resolve_LP(c_matrix, duration_matrix, activities, activities_counter)
    return frequency_dfg, perf_dfg
def get_PS_dur_matrix(activities_grouped, activities, parameters=None):
    """
    Computes the precede-succeed matrix and the duration matrix in one call

    Parameters
    ----------------
    activities_grouped
        Grouped activities
    activities
        List of activities of the log
    parameters
        Parameters of the algorithm (TIMESTAMP_KEY, START_TIMESTAMP_KEY and
        EXACT_TIME_MATCHING are read)

    Returns
    ---------------
    PS_matrix
        Precede-succeed matrix
    duration_matrix
        Duration matrix
    """
    params = parameters if parameters is not None else {}

    ts_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, params, xes_constants.DEFAULT_TIMESTAMP_KEY)
    # NOTE: when no start timestamp key is given, the default XES timestamp key is used
    # (not the value configured for TIMESTAMP_KEY)
    start_ts_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, params,
                                              xes_constants.DEFAULT_TIMESTAMP_KEY)
    exact_matching = exec_utils.get_param_value(Parameters.EXACT_TIME_MATCHING, params, False)

    ps = get_precede_succeed_matrix(activities, activities_grouped, ts_key, start_ts_key)
    durations = get_duration_matrix(activities, activities_grouped, ts_key, start_ts_key, exact=exact_matching)
    return ps, durations
def preprocess_log(log, activities=None, parameters=None):
    """
    Preprocess a log to enable correlation mining

    The log is converted to an event stream; every event is reduced to its
    activity, timestamp, start timestamp (both converted through their
    ``.timestamp()`` method) and its position in the converted stream. The
    reduced events are ordered by (start timestamp, timestamp, position) and
    grouped by activity.

    The dictionary passed as ``parameters`` is never modified.

    Parameters
    --------------
    log
        Log object
    activities
        (if provided) list of activities of the log; events whose activity is
        not in this list are left out of the grouping, and listed activities
        without events are mapped to an empty list
    parameters
        Parameters of the algorithm

    Returns
    --------------
    transf_stream
        Transformed stream (sorted list of events)
    activities_grouped
        Grouped activities (activity -> sorted list of its events)
    activities
        List of activities of the log
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
                                               xes_constants.DEFAULT_TIMESTAMP_KEY)
    # NOTE: falls back to the default XES timestamp key, not to the configured timestamp_key
    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters,
                                                     xes_constants.DEFAULT_TIMESTAMP_KEY)
    index_key = exec_utils.get_param_value(Parameters.INDEX_KEY, parameters, DEFAULT_INDEX_KEY)

    conversion_parameters = parameters
    if isinstance(log, pd.DataFrame):
        # keep only the needed columns before conversion (the keys may coincide,
        # so duplicates are removed while keeping a deterministic order)
        needed_columns = list(dict.fromkeys([activity_key, timestamp_key, start_timestamp_key]))
        log = log[needed_columns]
        # work on a copy so that the conversion options do not leak into the caller's dictionary
        conversion_parameters = dict(parameters)
        conversion_parameters["deepcopy"] = False
        conversion_parameters["include_case_attributes"] = False
    log = converter.apply(log, variant=converter.TO_EVENT_STREAM, parameters=conversion_parameters)

    transf_stream = [
        Event({activity_key: ev[activity_key], timestamp_key: ev[timestamp_key].timestamp(),
               start_timestamp_key: ev[start_timestamp_key].timestamp(), index_key: idx})
        for idx, ev in enumerate(log)
    ]
    # the original position is the last sort criterion, so ties keep the input order
    transf_stream.sort(key=lambda x: (x[start_timestamp_key], x[timestamp_key], x[index_key]))

    if activities is None:
        activities = sorted({x[activity_key] for x in transf_stream})

    # single pass over the stream instead of one scan per activity;
    # the order of the events inside each group follows the sorted stream
    activities_grouped = {act: [] for act in activities}
    for ev in transf_stream:
        bucket = activities_grouped.get(ev[activity_key])
        if bucket is not None:
            bucket.append(ev)

    return transf_stream, activities_grouped, activities
def get_precede_succeed_matrix(activities, activities_grouped, timestamp_key, start_timestamp_key):
    """
    Calculates the precede succeed matrix

    The entry (i, j), with i != j, is the fraction of the pairs
    (event of activities[i], event of activities[j]) in which the timestamp of
    the first event is strictly lower than the start timestamp of the second
    one. The diagonal, and the entries involving an activity without events,
    are left to 0.

    The timestamps of each activity are sorted internally, so the result does
    not depend on the order of the events inside ``activities_grouped``.

    Parameters
    ---------------
    activities
        Ordered list of activities of the log
    activities_grouped
        Grouped list of activities
    timestamp_key
        Timestamp key
    start_timestamp_key
        Start timestamp key (events start)

    Returns
    ---------------
    precede_succeed_matrix
        Precede succeed matrix
    """
    n = len(activities)
    ret = np.zeros((n, n))

    # extract (and sort) the timestamps once per activity instead of once per pair
    end_times = [np.sort(np.asarray([ev[timestamp_key] for ev in activities_grouped[act]]))
                 for act in activities]
    start_times = [np.sort(np.asarray([ev[start_timestamp_key] for ev in activities_grouped[act]]))
                   for act in activities]

    for i, ai in enumerate(end_times):
        if ai.size == 0:
            continue
        for j, aj in enumerate(start_times):
            if i == j or aj.size == 0:
                continue
            # for every completion time, number of start times that are <= it
            not_after = np.searchsorted(aj, ai, side="right")
            # the remaining pairs are the ones where the start time is strictly greater
            count = ai.size * aj.size - int(not_after.sum())
            ret[i, j] = count / float(ai.size * aj.size)
    return ret
def get_duration_matrix(activities, activities_grouped, timestamp_key, start_timestamp_key, exact=False):
    """
    Calculates the duration matrix

    The entry (i, j), with i != j, is the value returned by
    cm_util.match_return_avg_time for the timestamps of the events of
    activities[i] and the start timestamps of the events of activities[j].
    The diagonal, and the entries involving an activity without events,
    are left to 0.

    Parameters
    ---------------
    activities
        Ordered list of activities of the log
    activities_grouped
        Grouped list of activities
    timestamp_key
        Timestamp key
    start_timestamp_key
        Start timestamp key (events start)
    exact
        Performs an exact matching of the times (True/False)

    Returns
    ---------------
    duration_matrix
        Duration matrix
    """
    n = len(activities)
    ret = np.zeros((n, n))

    # extract the timestamps once per activity instead of once per pair
    end_times = [[ev[timestamp_key] for ev in activities_grouped[act]] for act in activities]
    start_times = [[ev[start_timestamp_key] for ev in activities_grouped[act]] for act in activities]

    for i, ai in enumerate(end_times):
        if not ai:
            continue
        for j, aj in enumerate(start_times):
            if i == j or not aj:
                continue
            # the helper is not visible here: give it private copies on every call,
            # so that a possible in-place modification cannot affect the other pairs
            ret[i, j] = cm_util.match_return_avg_time(list(ai), list(aj), exact=exact)
    return ret