Source code for pm4py.statistics.sojourn_time.log.get

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from enum import Enum
from statistics import mean, median

from pm4py.util import exec_utils, constants, xes_constants
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.util.business_hours import BusinessHours
from typing import Optional, Dict, Any, Union
from pm4py.objects.log.obj import EventLog


class Parameters(Enum):
    """
    Keys accepted in the ``parameters`` dictionary of this module's ``apply`` function.
    """
    # keys selecting which event attributes are read
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY

    # how the per-activity durations are collapsed into a single number
    AGGREGATION_MEASURE = "aggregationMeasure"

    # business-hours switch and the settings forwarded to BusinessHours
    BUSINESS_HOURS = "business_hours"
    WORKTIMING = "worktiming"
    WEEKENDS = "weekends"
    WORKCALENDAR = "workcalendar"
# NOTE(review): not referenced anywhere in the visible part of this module; presumably the
# name of a computed time-difference attribute used by a sibling implementation -- confirm
# before removing.
DIFF_KEY = "@@diff"
def apply(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Dict[str, float]:
    """
    Gets the sojourn time per activity on an event log object

    Parameters
    --------------
    log
        Event log (the input is first passed through the log converter with the
        TO_EVENT_LOG variant)
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY => activity key
            Default: xes_constants.DEFAULT_NAME_KEY
        - Parameters.START_TIMESTAMP_KEY => start timestamp key
            Default: xes_constants.DEFAULT_TIMESTAMP_KEY (the same default as the timestamp key,
            in which case start and completion are read from the same attribute)
        - Parameters.TIMESTAMP_KEY => timestamp key
            Default: xes_constants.DEFAULT_TIMESTAMP_KEY
        - Parameters.BUSINESS_HOURS => calculates the difference of time based on the business hours,
            not the total time.
            Default: False
        - Parameters.WORKTIMING => work schedule of the company (provided as a list where the first
            number is the start of the work time, and the second number is the end of the work time),
            if business hours are enabled
            Default: [7, 17] (work shift from 07:00 to 17:00)
        - Parameters.WEEKENDS => indexes of the days of the week that are weekend
            Default: [6, 7] (weekends are Saturday and Sunday)
        - Parameters.WORKCALENDAR => work calendar forwarded to BusinessHours, if business hours
            are enabled
            Default: constants.DEFAULT_BUSINESS_HOURS_WORKCALENDAR
        - Parameters.AGGREGATION_MEASURE => performance aggregation measure
            (sum, min, max, mean, median)
            Default: mean (any other unrecognised value is also aggregated with the mean)

    Returns
    --------------
    soj_time_dict
        Sojourn time dictionary (activity => aggregated duration)
    """
    if parameters is None:
        parameters = {}

    business_hours = exec_utils.get_param_value(Parameters.BUSINESS_HOURS, parameters, False)
    worktiming = exec_utils.get_param_value(Parameters.WORKTIMING, parameters, [7, 17])
    weekends = exec_utils.get_param_value(Parameters.WEEKENDS, parameters, [6, 7])
    workcalendar = exec_utils.get_param_value(Parameters.WORKCALENDAR, parameters,
                                              constants.DEFAULT_BUSINESS_HOURS_WORKCALENDAR)

    log = log_converter.apply(log, variant=log_converter.Variants.TO_EVENT_LOG, parameters=parameters)

    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters,
                                                     xes_constants.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
                                               xes_constants.DEFAULT_TIMESTAMP_KEY)
    aggregation_measure = exec_utils.get_param_value(Parameters.AGGREGATION_MEASURE, parameters, "mean")

    # Collect the durations per activity in a single pass. setdefault creates the list the
    # first time an activity is seen, so the keys keep their first-occurrence order without
    # a separate pass that materialises every event's activity.
    durations_dict = {}
    for trace in log:
        for event in trace:
            activity = event[activity_key]
            if business_hours:
                # timezone information is stripped before the timestamps are handed to BusinessHours
                bh = BusinessHours(event[start_timestamp_key].replace(tzinfo=None),
                                   event[timestamp_key].replace(tzinfo=None),
                                   worktiming=worktiming, weekends=weekends, workcalendar=workcalendar)
                duration = bh.getseconds()
            else:
                start_time = event[start_timestamp_key].timestamp()
                complete_time = event[timestamp_key].timestamp()
                duration = complete_time - start_time
            durations_dict.setdefault(activity, []).append(duration)

    # Resolve the aggregation function once instead of re-testing the measure for every
    # activity; anything that is not a known measure is aggregated with the mean.
    aggregators = {"median": median, "min": min, "max": max, "sum": sum}
    aggregate = aggregators.get(aggregation_measure, mean)

    return {act: aggregate(durations) for act, durations in durations_dict.items()}