'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
from collections import Counter
from enum import Enum
from pm4py.util import constants, exec_utils
from pm4py.util import xes_constants as xes_util
from enum import Enum
from typing import Optional, Dict, Any, Union, Tuple
from pm4py.objects.log.obj import EventLog, EventStream
class Parameters(Enum):
    """
    Parameter keys accepted by the native directly-follows graph discovery
    functions of this module (see ``native``).
    """
    # Name of the event attribute holding the activity label
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    # Positional distance between the two events that form a counted pair
    WINDOW = "window"
    # When true, each pair is counted at most once per trace
    KEEP_ONCE_PER_CASE = "keep_once_per_case"
def apply(log: Union[EventLog, EventStream], parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Dict[Tuple[str, str], int]:
    """
    Counts the directly follows occurrences in an event log.

    This is the entry point of the variant; it forwards the call unchanged to ``native``.

    Parameters
    ----------
    log
        Trace log
    parameters
        Possible parameters passed to the algorithm (see ``native``)

    Returns
    -------
    dfg
        DFG graph
    """
    return native(log, parameters=parameters)
def native(log: Union[EventLog, EventStream], parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Dict[Tuple[str, str], int]:
    """
    Counts the number of directly follows occurrences, i.e. of the form <...a,b...>, in an event log.

    Parameters
    ----------
    log
        Trace log
    parameters
        Possible parameters passed to the algorithms:
            Parameters.ACTIVITY_KEY -> Attribute to use as activity
            Parameters.WINDOW -> Positional distance between the two events of a counted pair (default: 1)
            Parameters.KEEP_ONCE_PER_CASE -> If True, every pair is counted at most once per trace (default: False)

    Returns
    -------
    dfg
        DFG graph, as a Counter mapping each (source activity, target activity) pair to its number of occurrences
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_util.DEFAULT_NAME_KEY)
    window = exec_utils.get_param_value(Parameters.WINDOW, parameters, 1)
    keep_once_per_case = exec_utils.get_param_value(Parameters.KEEP_ONCE_PER_CASE, parameters, False)

    dfg = Counter()
    for trace in log:
        # pair every event with the one located 'window' positions before it in the same trace
        pairs = ((trace[i - window][activity_key], trace[i][activity_key]) for i in range(window, len(trace)))
        # de-duplicating inside the trace makes each pair contribute at most 1 per case;
        # counting trace by trace avoids materializing a list with all the pairs of the log
        dfg.update(set(pairs) if keep_once_per_case else pairs)
    return dfg