'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
import warnings
from typing import List, Union, Set, List, Tuple, Collection, Any, Dict
import deprecation
import pandas as pd
from pm4py.meta import VERSION as PM4PY_CURRENT_VERSION
from pm4py.objects.log.obj import EventLog, EventStream
from pm4py.util import constants, xes_constants
from pm4py.util.pandas_utils import check_is_pandas_dataframe, check_pandas_dataframe_columns
from pm4py.utils import get_properties
from pm4py.objects.ocel.obj import OCEL
import datetime
def filter_log_relative_occurrence_event_attribute(log: Union[EventLog, pd.DataFrame], min_relative_stake: float,
                                                   attribute_key: str = xes_constants.DEFAULT_NAME_KEY,
                                                   level="cases") -> Union[EventLog, pd.DataFrame]:
    """
    Keeps only the events whose value for the given attribute is frequent enough, i.e. occurs:
    - in at least the specified (min_relative_stake) percentage of events, when level="events"
    - in at least the specified (min_relative_stake) percentage of cases, when level="cases"

    Parameters
    -------------------
    log
        Event log / Pandas dataframe
    min_relative_stake
        Minimum percentage (expressed as a number between 0 and 1) in which the attribute value should occur.
    attribute_key
        The attribute to filter
    level
        The level of the filter ("cases" counts every value once per case; any other value disables that)

    Returns
    ------------------
    filtered_log
        Filtered event log

    Raises
    ------------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)

    if not check_is_pandas_dataframe(log):
        from pm4py.algo.filtering.log.attributes import attributes_filter
        parameters[attributes_filter.Parameters.ATTRIBUTE_KEY] = attribute_key
        parameters[attributes_filter.Parameters.KEEP_ONCE_PER_CASE] = bool(level == "cases")
        return attributes_filter.filter_log_relative_occurrence_event_attribute(log, min_relative_stake,
                                                                                parameters=parameters)

    check_pandas_dataframe_columns(log)
    from pm4py.algo.filtering.pandas.attributes import attributes_filter
    parameters[attributes_filter.Parameters.ATTRIBUTE_KEY] = attribute_key
    parameters[attributes_filter.Parameters.KEEP_ONCE_PER_CASE] = bool(level == "cases")
    return attributes_filter.filter_df_relative_occurrence_event_attribute(log, min_relative_stake,
                                                                           parameters=parameters)
def filter_start_activities(log: Union[EventLog, pd.DataFrame], activities: Union[Set[str], List[str]],
                            retain: bool = True) -> Union[EventLog, pd.DataFrame]:
    """
    Filter the cases on their start activity

    Parameters
    --------------
    log
        Log object (event log / Pandas dataframe)
    activities
        Collection of start activities
    retain
        if True, the cases starting with one of the given activities are retained; if False, they are dropped

    Returns
    --------------
    filtered_log
        Filtered log object

    Raises
    --------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)

    # both implementations expose the same interface, so only the import differs
    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log)
        from pm4py.algo.filtering.pandas.start_activities import start_activities_filter
    else:
        from pm4py.algo.filtering.log.start_activities import start_activities_filter

    parameters[start_activities_filter.Parameters.POSITIVE] = retain
    return start_activities_filter.apply(log, activities, parameters=parameters)
def filter_end_activities(log: Union[EventLog, pd.DataFrame], activities: Union[Set[str], List[str]],
                          retain: bool = True) -> Union[EventLog, pd.DataFrame]:
    """
    Filter the cases on their end activity

    Parameters
    ---------------
    log
        Log object (event log / Pandas dataframe)
    activities
        Collection of admitted end activities
    retain
        if True, the cases ending with one of the given activities are retained; if False, they are dropped

    Returns
    ---------------
    filtered_log
        Filtered log object

    Raises
    ---------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)

    # both implementations expose the same interface, so only the import differs
    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log)
        from pm4py.algo.filtering.pandas.end_activities import end_activities_filter
    else:
        from pm4py.algo.filtering.log.end_activities import end_activities_filter

    parameters[end_activities_filter.Parameters.POSITIVE] = retain
    return end_activities_filter.apply(log, activities, parameters=parameters)
@deprecation.deprecated(deprecated_in='2.1.4', removed_in='2.4.0', current_version=PM4PY_CURRENT_VERSION,
                        details='Filtering method will be removed due to fuzzy naming. '
                                'Use: filter_event_attribute_values')
def filter_attribute_values(log, attribute_key, values, level="case", retain=True):
    """
    Deprecated alias of filter_event_attribute_values; all the arguments are forwarded unchanged.

    Parameters
    --------------
    log
        Log object
    attribute_key
        Attribute to filter
    values
        Admitted (or forbidden) values
    level
        Level of the filter ('case' or 'event')
    retain
        Specifies if the values should be kept or removed

    Returns
    --------------
    filtered_log
        Filtered log object
    """
    # NOTE: the deprecation message is built with implicit string concatenation (and not with a
    # backslash continuation inside the literal), so that the two sentences are separated by
    # exactly one space regardless of the indentation of the source code.
    return filter_event_attribute_values(log, attribute_key, values, level=level, retain=retain)
def filter_event_attribute_values(log: Union[EventLog, pd.DataFrame], attribute_key: str,
                                  values: Union[Set[str], List[str]],
                                  level: str = "case", retain: bool = True) -> Union[EventLog, pd.DataFrame]:
    """
    Filter a log object on the values of some event attribute

    Parameters
    --------------
    log
        Log object
    attribute_key
        Attribute to filter
    values
        Admitted (or forbidden) values
    level
        Specifies how the filter should be applied ('case' filters the cases where at least one occurrence happens,
        'event' filter the events eventually trimming the cases)
    retain
        Specified if the values should be kept or removed

    Returns
    --------------
    filtered_log
        Filtered log object. If the level is not recognized, a warning is emitted and the
        original log is returned unchanged.

    Raises
    --------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in [pd.DataFrame, EventLog, EventStream]:
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)
    parameters[constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY] = attribute_key

    # the dataframe and the event log implementations expose the same functions
    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log)
        from pm4py.algo.filtering.pandas.attributes import attributes_filter
    else:
        from pm4py.algo.filtering.log.attributes import attributes_filter

    parameters[attributes_filter.Parameters.POSITIVE] = retain

    if level == "event":
        return attributes_filter.apply_events(log, values, parameters=parameters)
    if level == "case":
        return attributes_filter.apply(log, values, parameters=parameters)

    # previously an unknown level silently returned None; behave like filter_time_range
    # does for an unknown mode (warn and hand back the unfiltered log)
    warnings.warn('level provided: ' + str(level) + ' is not recognized; original log returned!')
    return log
@deprecation.deprecated(deprecated_in='2.1.4', removed_in='2.4.0', current_version=PM4PY_CURRENT_VERSION,
                        details='Filtering method will be removed due to fuzzy naming. '
                                'Use: filter_trace_attribute_values')
def filter_trace_attribute(log, attribute_key, values, retain=True):
    """
    Deprecated alias of filter_trace_attribute_values; all the arguments are forwarded unchanged.

    Parameters
    --------------
    log
        Event log
    attribute_key
        Attribute to filter
    values
        Values to filter (list of)
    retain
        Boolean value (keep/discard matching traces)

    Returns
    --------------
    filtered_log
        Filtered event log
    """
    # NOTE: the deprecation message points to filter_trace_attribute_values, which is the
    # function this wrapper actually delegates to.
    return filter_trace_attribute_values(log, attribute_key, values, retain=retain)
def filter_trace_attribute_values(log: Union[EventLog, pd.DataFrame], attribute_key: str,
                                  values: Union[Set[str], List[str]],
                                  retain: bool = True) -> Union[EventLog, pd.DataFrame]:
    """
    Filter a log on the values of a trace attribute

    Parameters
    --------------
    log
        Event log / Pandas dataframe
    attribute_key
        Attribute to filter
    values
        Collection of values to look for
    retain
        Boolean value (True keeps the matching traces, False discards them)

    Returns
    --------------
    filtered_log
        Filtered event log

    Raises
    --------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)
    parameters[constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY] = attribute_key

    if not check_is_pandas_dataframe(log):
        from pm4py.algo.filtering.log.attributes import attributes_filter
        parameters[attributes_filter.Parameters.POSITIVE] = retain
        return attributes_filter.apply_trace_attribute(log, values, parameters=parameters)

    # on dataframes the case-level attribute filter is used
    check_pandas_dataframe_columns(log)
    from pm4py.algo.filtering.pandas.attributes import attributes_filter
    parameters[attributes_filter.Parameters.POSITIVE] = retain
    return attributes_filter.apply(log, values, parameters=parameters)
def filter_variants(log: Union[EventLog, pd.DataFrame], variants: Union[Set[str], List[str]],
                    retain: bool = True) -> Union[EventLog, pd.DataFrame]:
    """
    Filter a log on a specified set of variants

    Parameters
    ---------------
    log
        Event log / Pandas dataframe
    variants
        collection of variants to filter; A variant should be specified as a list of activity names,
        e.g., ['a','b','c']
    retain
        boolean; if True all traces conforming to the specified variants are retained;
        if False, all those traces are removed

    Returns
    --------------
    filtered_log
        Filtered log object

    Raises
    --------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    from pm4py.util import variants_util

    parameters = get_properties(log)

    # when variants are represented as strings, join the activities with the default separator
    if variants_util.VARIANT_SPECIFICATION == variants_util.VariantsSpecifications.STRING:
        variants = [constants.DEFAULT_VARIANT_SEP.join(activities) for activities in variants]

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log)
        from pm4py.algo.filtering.pandas.variants import variants_filter
    else:
        from pm4py.algo.filtering.log.variants import variants_filter

    parameters[variants_filter.Parameters.POSITIVE] = retain
    return variants_filter.apply(log, variants, parameters=parameters)
@deprecation.deprecated(deprecated_in='2.1.3.1', removed_in='2.4.0', current_version=PM4PY_CURRENT_VERSION,
                        details='Filtering method will be removed due to fuzzy interpretation of the threshold. '
                                'Will be replaced with two new functions filter_variants_top_k and '
                                'filter_variants_relative_frequency')
def filter_variants_percentage(log: Union[EventLog, pd.DataFrame], threshold: float = 0.8) -> Union[
    EventLog, pd.DataFrame]:
    """
    Filter a log on the percentage of variants (deprecated)

    Parameters
    ---------------
    log
        Event log (Pandas dataframes are not supported by this filter)
    threshold
        Percentage of admitted variants, forwarded as "percentage" to the variants filter
        (presumably a number between 0 and 1; to be confirmed against the variants filter)

    Returns
    --------------
    filtered_log
        Filtered log object

    Raises
    --------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream,
        or if it is a Pandas dataframe (not supported)
    """
    # NOTE: the deprecation message is built with implicit string concatenation (and not with a
    # backslash continuation inside the literal), so that the two sentences are separated by
    # exactly one space regardless of the indentation of the source code.
    if type(log) not in [pd.DataFrame, EventLog, EventStream]:
        raise Exception("the method can be applied only to a traditional event log!")

    if check_is_pandas_dataframe(log):
        raise Exception(
            "filtering variants percentage on Pandas dataframe is currently not available! please convert the dataframe to event log with the method: log = pm4py.convert_to_event_log(df)")

    from pm4py.algo.filtering.log.variants import variants_filter
    return variants_filter.filter_log_variants_percentage(log, percentage=threshold, parameters=get_properties(log))
@deprecation.deprecated(deprecated_in='2.1.3.1', removed_in='2.4.0', current_version=PM4PY_CURRENT_VERSION,
                        details='Use filter_directly_follows_relation')
def filter_paths(log, allowed_paths, retain=True):
    """
    Deprecated alias of filter_directly_follows_relation.

    Parameters
    ---------------
    log
        Log object
    allowed_paths
        List of activity name pairs, forwarded as the relations to filter on
    retain
        Whether the matching traces should be kept or removed

    Returns
    ---------------
    filtered_log
        Filtered log object

    Raises
    ---------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    supported_types = (pd.DataFrame, EventLog, EventStream)
    if type(log) in supported_types:
        return filter_directly_follows_relation(log, allowed_paths, retain)
    raise Exception("the method can be applied only to a traditional event log!")
def filter_directly_follows_relation(log: Union[EventLog, pd.DataFrame], relations: List[Tuple[str, str]],
                                     retain: bool = True) -> Union[EventLog, pd.DataFrame]:
    """
    Retain traces that contain any of the specified 'directly follows' relations.
    For example, if relations == [('a','b'),('a','c')] and log [<a,b,c>,<a,c,b>,<a,d,b>]
    the resulting log will contain traces describing [<a,b,c>,<a,c,b>].

    Parameters
    ---------------
    log
        Log object (event log / Pandas dataframe)
    relations
        List of activity name pairs, which are allowed/forbidden paths
    retain
        Parameter that says whether the paths
        should be kept/removed

    Returns
    ----------------
    filtered_log
        Filtered log object

    Raises
    ----------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in [pd.DataFrame, EventLog, EventStream]:
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)

    if check_is_pandas_dataframe(log):
        # validate the dataframe columns, as the other dataframe filters of this module do
        check_pandas_dataframe_columns(log)
        from pm4py.algo.filtering.pandas.paths import paths_filter
    else:
        from pm4py.algo.filtering.log.paths import paths_filter

    parameters[paths_filter.Parameters.POSITIVE] = retain
    return paths_filter.apply(log, relations, parameters=parameters)
def filter_eventually_follows_relation(log: Union[EventLog, pd.DataFrame], relations: List[Tuple[str, str]],
                                       retain: bool = True) -> Union[EventLog, pd.DataFrame]:
    """
    Retain traces that contain any of the specified 'eventually follows' relations.
    For example, if relations == [('a','b'),('a','c')] and log [<a,b,c>,<a,c,b>,<a,d,b>]
    the resulting log will contain traces describing [<a,b,c>,<a,c,b>,<a,d,b>].

    Parameters
    ---------------
    log
        Log object (event log / Pandas dataframe)
    relations
        List of activity name pairs, which are allowed/forbidden paths
    retain
        Parameter that says whether the paths
        should be kept/removed: if True, the cases satisfying at least one relation are kept;
        if False, only the cases returned by the negative filter for every relation are kept

    Returns
    ----------------
    filtered_log
        Filtered log object

    Raises
    ----------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in [pd.DataFrame, EventLog, EventStream]:
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)

    if check_is_pandas_dataframe(log):
        # validate the dataframe columns, as the other dataframe filters of this module do
        check_pandas_dataframe_columns(log)
        from pm4py.algo.filtering.pandas.ltl import ltl_checker
        parameters[ltl_checker.Parameters.POSITIVE] = retain

        # honour a custom case identifier column stored in the properties of the dataframe
        # (same lookup as in filter_case_size) instead of hard-coding the default one
        case_id_key = parameters.get(constants.PARAMETER_CONSTANT_CASEID_KEY, constants.CASE_CONCEPT_NAME)

        # retain=True: union of the matches, starting from nothing
        # retain=False: intersection of the (negative) matches, starting from every case
        cases = set() if retain else set(log[case_id_key])
        for path in relations:
            filt_log = ltl_checker.eventually_follows(log, path, parameters=parameters)
            this_cases = set(filt_log[case_id_key])
            if retain:
                cases |= this_cases
            else:
                cases &= this_cases
        return log[log[case_id_key].isin(cases)]

    from pm4py.algo.filtering.log.ltl import ltl_checker
    parameters[ltl_checker.Parameters.POSITIVE] = retain

    # traces are tracked through their object identity
    cases = set() if retain else {id(trace) for trace in log}
    for path in relations:
        filt_log = ltl_checker.eventually_follows(log, path, parameters=parameters)
        this_traces = {id(trace) for trace in filt_log}
        if retain:
            cases |= this_traces
        else:
            cases &= this_traces

    # rebuild a log with the same metadata, containing the selected traces in their original order
    filtered_log = EventLog(attributes=log.attributes, extensions=log.extensions, omni_present=log.omni_present,
                            classifiers=log.classifiers, properties=log.properties)
    for trace in log:
        if id(trace) in cases:
            filtered_log.append(trace)
    return filtered_log
def filter_time_range(log: Union[EventLog, pd.DataFrame], dt1: str, dt2: str, mode="events") -> Union[
    EventLog, pd.DataFrame]:
    """
    Filter a log on a time interval

    Parameters
    ----------------
    log
        Log object (event log / Pandas dataframe)
    dt1
        Left extreme of the interval
    dt2
        Right extreme of the interval
    mode
        Modality of filtering (events, traces_contained, traces_intersecting)
        events: any event that fits the time frame is retained
        traces_contained: any trace completely contained in the timeframe is retained
        traces_intersecting: any trace intersecting with the time-frame is retained.

    Returns
    ----------------
    filtered_log
        Filtered log. If the mode is not recognized, a warning is emitted and the original log is returned.

    Raises
    ----------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    # the two implementations expose the same three entry points
    if check_is_pandas_dataframe(log):
        from pm4py.algo.filtering.pandas.timestamp import timestamp_filter
    else:
        from pm4py.algo.filtering.log.timestamp import timestamp_filter

    if mode == "events":
        selected_filter = timestamp_filter.apply_events
    elif mode == "traces_contained":
        selected_filter = timestamp_filter.filter_traces_contained
    elif mode == "traces_intersecting":
        selected_filter = timestamp_filter.filter_traces_intersecting
    else:
        warnings.warn('mode provided: ' + mode + ' is not recognized; original log returned!')
        return log

    return selected_filter(log, dt1, dt2, parameters=get_properties(log))
def filter_between(log: Union[EventLog, pd.DataFrame], act1: str, act2: str) -> Union[EventLog, pd.DataFrame]:
    """
    Finds all the sub-cases leading from an event with activity "act1" to an event with activity "act2" in the log,
    and returns a log containing only them.

    Example:

    Log
    A B C D E F
    A B E F C
    A B F C B C B E F C

    act1 = B
    act2 = C

    Returned sub-cases:
    B C (from the first case)
    B E F C (from the second case)
    B F C (from the third case)
    B C (from the third case)
    B E F C (from the third case)

    Parameters
    -----------------
    log
        Event log / Pandas dataframe
    act1
        Source activity
    act2
        Target activity

    Returns
    -----------------
    filtered_log
        Log containing all the subcases

    Raises
    -----------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log)
        from pm4py.algo.filtering.pandas.between import between_filter
    else:
        from pm4py.algo.filtering.log.between import between_filter

    return between_filter.apply(log, act1, act2, parameters=parameters)
def filter_case_size(log: Union[EventLog, pd.DataFrame], min_size: int, max_size: int) -> Union[
    EventLog, pd.DataFrame]:
    """
    Filters the event log, keeping the cases having a length (number of events) included between min_size
    and max_size

    Parameters
    -----------------
    log
        Event log / Pandas dataframe
    min_size
        Minimum allowed number of events
    max_size
        Maximum allowed number of events

    Returns
    ----------------
    filtered_log
        Log with cases having the desired number of events.

    Raises
    ----------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)

    if not check_is_pandas_dataframe(log):
        from pm4py.algo.filtering.log.cases import case_filter
        return case_filter.filter_on_case_size(log, min_size, max_size)

    check_pandas_dataframe_columns(log)
    from pm4py.algo.filtering.pandas.cases import case_filter
    # the dataframe filter needs the case identifier column: use the one stored in the properties, if any
    case_id = parameters.get(constants.PARAMETER_CONSTANT_CASEID_KEY, constants.CASE_CONCEPT_NAME)
    return case_filter.filter_on_case_size(log, case_id, min_size, max_size)
def filter_activities_rework(log: Union[EventLog, pd.DataFrame], activity: str, min_occurrences: int = 2) -> Union[
    EventLog, pd.DataFrame]:
    """
    Filters the event log, keeping the cases where the specified activity occurs at least min_occurrences times.

    Parameters
    -----------------
    log
        Event log / Pandas dataframe
    activity
        Activity
    min_occurrences
        Minimum desired number of occurrences

    Returns
    -----------------
    filtered_log
        Log with cases having at least min_occurrences occurrences of the given activity

    Raises
    -----------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)
    parameters["min_occurrences"] = min_occurrences

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log)
        from pm4py.algo.filtering.pandas.rework import rework_filter
    else:
        from pm4py.algo.filtering.log.rework import rework_filter

    return rework_filter.apply(log, activity, parameters=parameters)
def filter_variants_top_k(log: Union[EventLog, pd.DataFrame], k: int) -> Union[EventLog, pd.DataFrame]:
    """
    Keeps the top-k variants of the log

    Parameters
    -------------
    log
        Event log / Pandas dataframe
    k
        Number of variants that should be kept

    Returns
    -------------
    filtered_log
        Filtered log

    Raises
    -------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log)
        from pm4py.algo.filtering.pandas.variants import variants_filter
    else:
        from pm4py.algo.filtering.log.variants import variants_filter

    return variants_filter.filter_variants_top_k(log, k, parameters=parameters)
def filter_variants_by_coverage_percentage(log: Union[EventLog, pd.DataFrame],
                                           min_coverage_percentage: float) -> Union[EventLog, pd.DataFrame]:
    """
    Filters the variants of the log by a coverage percentage
    (e.g., if min_coverage_percentage=0.4, and we have a log with 1000 cases,
    of which 500 of the variant 1, 400 of the variant 2, and 100 of the variant 3,
    the filter keeps only the traces of variant 1 and variant 2).

    Parameters
    ---------------
    log
        Event log / Pandas dataframe
    min_coverage_percentage
        Minimum allowed percentage of coverage

    Returns
    ---------------
    filtered_log
        Filtered log

    Raises
    ---------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log)
        from pm4py.algo.filtering.pandas.variants import variants_filter
    else:
        from pm4py.algo.filtering.log.variants import variants_filter

    return variants_filter.filter_variants_by_coverage_percentage(log, min_coverage_percentage,
                                                                  parameters=parameters)
def filter_prefixes(log: Union[EventLog, pd.DataFrame], activity: str, strict=True, first_or_last="first"):
    """
    Filters the log, keeping the prefixes to a given activity. E.g., for a log with traces:

    A,B,C,D
    A,B,Z,A,B,C,D
    A,B,C,D,C,E,C,F

    The prefixes to "C" are respectively:

    A,B
    A,B,Z,A,B
    A,B

    Parameters
    ------------------
    log
        Event log / Pandas dataframe
    activity
        Target activity of the filter
    strict
        Applies the filter strictly (cuts the occurrences of the selected activity).
    first_or_last
        Decides if the first or last occurrence of an activity should be selected as baseline for the filter.

    Returns
    ------------------
    filtered_log
        Filtered log / dataframe

    Raises
    ------------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)
    parameters["strict"] = strict
    parameters["first_or_last"] = first_or_last

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log)
        from pm4py.algo.filtering.pandas.prefixes import prefix_filter
    else:
        from pm4py.algo.filtering.log.prefixes import prefix_filter

    return prefix_filter.apply(log, activity, parameters=parameters)
def filter_suffixes(log: Union[EventLog, pd.DataFrame], activity: str, strict=True, first_or_last="first"):
    """
    Filters the log, keeping the suffixes from a given activity. E.g., for a log with traces:

    A,B,C,D
    A,B,Z,A,B,C,D
    A,B,C,D,C,E,C,F

    The suffixes from "C" are respectively:

    D
    D
    D,C,E,C,F

    Parameters
    ------------------
    log
        Event log / Pandas dataframe
    activity
        Target activity of the filter
    strict
        Applies the filter strictly (cuts the occurrences of the selected activity).
    first_or_last
        Decides if the first or last occurrence of an activity should be selected as baseline for the filter.

    Returns
    ------------------
    filtered_log
        Filtered log / dataframe

    Raises
    ------------------
    Exception
        If the provided object is not exactly a Pandas dataframe, an EventLog or an EventStream
    """
    if type(log) not in (pd.DataFrame, EventLog, EventStream):
        raise Exception("the method can be applied only to a traditional event log!")

    parameters = get_properties(log)
    parameters["strict"] = strict
    parameters["first_or_last"] = first_or_last

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log)
        from pm4py.algo.filtering.pandas.suffixes import suffix_filter
    else:
        from pm4py.algo.filtering.log.suffixes import suffix_filter

    return suffix_filter.apply(log, activity, parameters=parameters)
def filter_ocel_event_attribute(ocel: OCEL, attribute_key: str, attribute_values: Collection[Any],
                                positive: bool = True) -> OCEL:
    """
    Filters the object-centric event log on the provided event attributes values

    Parameters
    ----------------
    ocel
        Object-centric event log
    attribute_key
        Attribute at the event level
    attribute_values
        Attribute values
    positive
        Decides if the values should be kept (positive=True) or removed (positive=False)

    Returns
    ----------------
    filtered_ocel
        Filtered object-centric event log
    """
    from pm4py.algo.filtering.ocel import event_attributes

    filter_parameters = {
        event_attributes.Parameters.ATTRIBUTE_KEY: attribute_key,
        event_attributes.Parameters.POSITIVE: positive,
    }
    return event_attributes.apply(ocel, attribute_values, parameters=filter_parameters)
def filter_ocel_object_attribute(ocel: OCEL, attribute_key: str, attribute_values: Collection[Any],
                                 positive: bool = True) -> OCEL:
    """
    Filters the object-centric event log on the provided object attributes values

    Parameters
    ----------------
    ocel
        Object-centric event log
    attribute_key
        Attribute to filter on (handed to the object attributes filter)
    attribute_values
        Attribute values
    positive
        Decides if the values should be kept (positive=True) or removed (positive=False)

    Returns
    ----------------
    filtered_ocel
        Filtered object-centric event log
    """
    from pm4py.algo.filtering.ocel import object_attributes

    filter_parameters = {
        object_attributes.Parameters.ATTRIBUTE_KEY: attribute_key,
        object_attributes.Parameters.POSITIVE: positive,
    }
    return object_attributes.apply(ocel, attribute_values, parameters=filter_parameters)
def filter_ocel_object_types_allowed_activities(ocel: OCEL,
                                                correspondence_dict: Dict[str, Collection[str]]) -> OCEL:
    """
    Filters an object-centric event log keeping only the specified object types
    with the specified activity set (filters out the rest).

    Parameters
    ----------------
    ocel
        Object-centric event log
    correspondence_dict
        Dictionary containing, for every object type of interest, a
        collection of allowed activities. Example:

        {"order": ["Create Order"], "element": ["Create Order", "Create Delivery"]}

        Keeps only the object types "order" and "element".
        For the "order" object type, only the activity "Create Order" is kept.
        For the "element" object type, only the activities "Create Order" and "Create Delivery" are kept.

    Returns
    -----------------
    filtered_ocel
        Filtered object-centric event log
    """
    from pm4py.algo.filtering.ocel import activity_type_matching as matching_filter

    filtered_ocel = matching_filter.apply(ocel, correspondence_dict)
    return filtered_ocel
def filter_ocel_object_per_type_count(ocel: OCEL, min_num_obj_type: Dict[str, int]) -> OCEL:
    """
    Filters the events of the object-centric logs which are related to at least
    the specified amount of objects per type.

    E.g. pm4py.filter_ocel_object_per_type_count(ocel, {"order": 1, "element": 2})

    Would keep the following events:

      ocel:eid ocel:timestamp ocel:activity ocel:type:element ocel:type:order
    0       e1     1980-01-01  Create Order  [i4, i1, i3, i2]            [o1]
    1      e11     1981-01-01  Create Order          [i6, i5]            [o2]
    2      e14     1981-01-04  Create Order          [i8, i7]            [o3]

    Parameters
    ------------------
    ocel
        Object-centric event log
    min_num_obj_type
        Minimum number of objects per type

    Returns
    -----------------
    filtered_event_log
        Filtered object-centric event log
    """
    from pm4py.algo.filtering.ocel import objects_ot_count as count_filter

    filtered_ocel = count_filter.apply(ocel, min_num_obj_type)
    return filtered_ocel
def filter_ocel_start_events_per_object_type(ocel: OCEL, object_type: str) -> OCEL:
    """
    Filters the events in which a new object for the given object type is spawn.
    (E.g. an event with activity "Create Order" might spawn new orders).

    Parameters
    ------------------
    ocel
        Object-centric event log
    object_type
        Object type to consider

    Returns
    ------------------
    filtered_ocel
        Filtered object-centric event log
    """
    from pm4py.algo.filtering.ocel import ot_endpoints as endpoints_filter

    filtered_ocel = endpoints_filter.filter_start_events_per_object_type(ocel, object_type)
    return filtered_ocel
def filter_ocel_end_events_per_object_type(ocel: OCEL, object_type: str) -> OCEL:
    """
    Filters the events in which an object for the given object type terminates its lifecycle.
    (E.g. an event with activity "Pay Order" might terminate an order).

    Parameters
    ------------------
    ocel
        Object-centric event log
    object_type
        Object type to consider

    Returns
    ------------------
    filtered_ocel
        Filtered object-centric event log
    """
    from pm4py.algo.filtering.ocel import ot_endpoints as endpoints_filter

    filtered_ocel = endpoints_filter.filter_end_events_per_object_type(ocel, object_type)
    return filtered_ocel
def filter_ocel_events_timestamp(ocel: OCEL, min_timest: Union[datetime.datetime, str],
                                 max_timest: Union[datetime.datetime, str],
                                 timestamp_key: str = "ocel:timestamp") -> OCEL:
    """
    Filters the object-centric event log keeping events in the provided timestamp range

    Parameters
    -----------------
    ocel
        Object-centric event log
    min_timest
        Left extreme of the allowed timestamp interval (provided in the format: YYYY-mm-dd HH:MM:SS)
    max_timest
        Right extreme of the allowed timestamp interval (provided in the format: YYYY-mm-dd HH:MM:SS)
    timestamp_key
        The attribute to use as timestamp (default: ocel:timestamp)

    Returns
    -----------------
    filtered_ocel
        Filtered object-centric event log
    """
    from pm4py.algo.filtering.ocel import event_attributes

    filter_parameters = {"pm4py:param:timestamp_key": timestamp_key}
    return event_attributes.apply_timestamp(ocel, min_timest, max_timest, parameters=filter_parameters)
def filter_ocel_object_types(ocel: OCEL, obj_types: Collection[str], positive: bool = True) -> OCEL:
    """
    Filters the object types of an object-centric event log.

    The filter works on a shallow copy of the log: its objects table is restricted, and the
    restriction is then propagated through ``filtering_utils.propagate_object_filtering``.

    :param ocel: object-centric event log
    :param obj_types: object types to keep/remove
    :param positive: boolean value (True=keep, False=remove)
    :rtype: ``OCEL``

    .. code-block:: python3

        import pm4py

        ocel = pm4py.read_ocel('log.jsonocel')
        filtered_ocel = pm4py.filter_ocel_object_types(ocel, ['order'])
    """
    from copy import copy
    from pm4py.objects.ocel.util import filtering_utils

    filtered_ocel = copy(ocel)

    type_matches = filtered_ocel.objects[filtered_ocel.object_type_column].isin(obj_types)
    selection = type_matches if positive else ~type_matches
    filtered_ocel.objects = filtered_ocel.objects[selection]

    return filtering_utils.propagate_object_filtering(filtered_ocel)
def filter_ocel_objects(ocel: OCEL, object_identifiers: Collection[str], positive: bool = True,
                        level: int = 1) -> OCEL:
    """
    Filters the object identifiers of an object-centric event log.

    When ``level`` is greater than 1, the set of identifiers is first expanded ``level - 1`` times
    with the objects that share at least one event with an already selected object.
    Identifiers that are unknown to the log are tolerated for every level (they simply do not
    contribute any neighbour), instead of raising a ``KeyError`` when ``level > 1``.

    :param ocel: object-centric event log
    :param object_identifiers: object identifiers to keep/remove
    :param positive: boolean value (True=keep, False=remove)
    :param level: recursively expand the set of object identifiers until the specified level
    :rtype: ``OCEL``

    .. code-block:: python3

        import pm4py

        ocel = pm4py.read_ocel('log.jsonocel')
        filtered_ocel = pm4py.filter_ocel_objects(ocel, ['o1'], level=1)
    """
    object_identifiers = set(object_identifiers)

    if level > 1:
        # objects related to each event
        ev_rel_obj = ocel.relations.groupby(ocel.event_id_column)[ocel.object_id_column].apply(list).to_dict()

        # undirected graph connecting the objects that share at least one event
        graph = {o: set() for o in ocel.objects[ocel.object_id_column].unique()}
        for rel_obj in ev_rel_obj.values():
            for o1 in rel_obj:
                # setdefault: an object might appear in the relations without being in the objects table
                neighbours = graph.setdefault(o1, set())
                neighbours.update(o2 for o2 in rel_obj if o2 != o1)

        # breadth-first expansion: only the objects discovered at the previous step can
        # contribute new neighbours, so there is no need to re-visit the whole set
        frontier = set(object_identifiers)
        while level > 1 and frontier:
            discovered = set()
            for el in frontier:
                discovered.update(graph.get(el, ()))
            frontier = discovered - object_identifiers
            object_identifiers |= frontier
            level -= 1

    from copy import copy
    from pm4py.objects.ocel.util import filtering_utils

    # shallow copy, so that the objects table of the provided log is not replaced
    filtered_ocel = copy(ocel)
    id_matches = filtered_ocel.objects[filtered_ocel.object_id_column].isin(object_identifiers)
    filtered_ocel.objects = filtered_ocel.objects[id_matches if positive else ~id_matches]

    return filtering_utils.propagate_object_filtering(filtered_ocel)
def filter_ocel_events(ocel: OCEL, event_identifiers: Collection[str], positive: bool = True) -> OCEL:
    """
    Filters the event identifiers of an object-centric event log.

    The filter works on a shallow copy of the log: its events table is restricted, and the
    restriction is then propagated through ``filtering_utils.propagate_event_filtering``.

    :param ocel: object-centric event log
    :param event_identifiers: event identifiers to keep/remove
    :param positive: boolean value (True=keep, False=remove)
    :rtype: ``OCEL``

    .. code-block:: python3

        import pm4py

        ocel = pm4py.read_ocel('log.jsonocel')
        filtered_ocel = pm4py.filter_ocel_events(ocel, ['e1'])
    """
    from copy import copy
    from pm4py.objects.ocel.util import filtering_utils

    filtered_ocel = copy(ocel)

    id_matches = filtered_ocel.events[filtered_ocel.event_id_column].isin(event_identifiers)
    selection = id_matches if positive else ~id_matches
    filtered_ocel.events = filtered_ocel.events[selection]

    return filtering_utils.propagate_event_filtering(filtered_ocel)
def filter_ocel_cc_object(ocel: OCEL, object_id: str) -> OCEL:
    """
    Returns the connected component of the object-centric event log
    to which the object with the provided identifier belongs.

    The log is split through ``split_ocel`` and the first split containing the object is returned;
    ``None`` is returned when no split contains it.

    :param ocel: object-centric event log
    :param object_id: object identifier
    :rtype: ``OCEL``

    .. code-block:: python3

        import pm4py

        ocel = pm4py.read_ocel('log.jsonocel')
        filtered_ocel = pm4py.filter_ocel_cc_object(ocel, 'order1')
    """
    from pm4py.algo.transformation.ocel.split_ocel import algorithm

    components = algorithm.apply(ocel)
    matching_components = (
        component for component in components
        if object_id in component.objects[ocel.object_id_column].unique()
    )
    return next(matching_components, None)