Source code for pm4py.algo.enhancement.comparison.petrinet.element_usage_comparison

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from pm4py.algo.conformance.tokenreplay import algorithm as tr_algorithm
from pm4py.util.colors import get_string_from_int_below_255
from collections import Counter
from copy import copy
import matplotlib as mpl
import matplotlib.cm as cm
import math
import deprecation


[docs]def give_color_to_direction_dynamic(dir): """ Assigns a color to the direction (dynamic-defined colors) Parameters -------------- dir Direction Returns -------------- col Color """ dir = 0.5 + 0.5 * dir norm = mpl.colors.Normalize(vmin=0, vmax=1) nodes = [0.0, 0.01, 0.25, 0.4, 0.45, 0.55, 0.75, 0.99, 1.0] colors = ["deepskyblue", "skyblue", "lightcyan", "lightgray", "gray", "lightgray", "mistyrose", "salmon", "tomato"] cmap = mpl.colors.LinearSegmentedColormap.from_list("mycmap2", list(zip(nodes, colors))) #cmap = cm.plasma m = cm.ScalarMappable(norm=norm, cmap=cmap) rgba = m.to_rgba(dir) r = get_string_from_int_below_255(math.ceil(rgba[0] * 255.0)) g = get_string_from_int_below_255(math.ceil(rgba[1] * 255.0)) b = get_string_from_int_below_255(math.ceil(rgba[2] * 255.0)) return "#" + r + g + b
[docs]def give_color_to_direction_static(dir): """ Assigns a color to the direction (static-defined colors) Parameters -------------- dir Direction Returns -------------- col Color """ direction_colors = [[-0.5, "#4444FF"], [-0.1, "#AAAAFF"], [0.0, "#CCCCCC"], [0.5, "#FFAAAA"], [1.0, "#FF4444"]] for col in direction_colors: if col[0] >= dir: return col[1]
[docs]@deprecation.deprecated('2.2.5', '3.0.0', details='use pm4py.algo.comparison.petrinet.element_usage_comparison instead') def compare_element_usage_two_logs(net, im, fm, log1, log2, parameters=None): """ Returns some statistics (also visual) about the comparison of the usage of the elements in two logs given an accepting Petri net Parameters ------------- net Petri net im Initial marking fm Final marking log1 First log log2 Second log parameters Parameters of the algorithm (to be passed to the token-based replay) Returns ---------------- aggregated_statistics Statistics about the usage of places, transitions and arcs in the net """ if parameters is None: parameters = {} tr_parameters = copy(parameters) tr_parameters[tr_algorithm.Variants.TOKEN_REPLAY.value.Parameters.ENABLE_PLTR_FITNESS] = True rep_traces1, pl_fit_trace1, tr_fit_trace1, ne_act_model1 = tr_algorithm.apply(log1, net, im, fm, parameters=tr_parameters) rep_traces2, pl_fit_trace2, tr_fit_trace2, ne_act_model2 = tr_algorithm.apply(log2, net, im, fm, parameters=tr_parameters) tr_occ1 = Counter([y for x in rep_traces1 for y in x["activated_transitions"]]) tr_occ2 = Counter([y for x in rep_traces2 for y in x["activated_transitions"]]) pl_occ1 = Counter({p: pl_fit_trace1[p]["c"] + pl_fit_trace1[p]["r"] for p in pl_fit_trace1}) pl_occ2 = Counter({p: pl_fit_trace2[p]["c"] + pl_fit_trace2[p]["r"] for p in pl_fit_trace2}) all_replayed_transitions = set(tr_occ1.keys()).union(set(tr_occ2.keys())) all_replayed_places = set(pl_occ1.keys()).union(set(pl_occ2.keys())) all_transitions = all_replayed_transitions.union(set(net.transitions)) all_places = all_replayed_places.union(set(net.places)) aggregated_statistics = {} for place in all_places: aggregated_statistics[place] = {"log1_occ": pl_occ1[place], "log2_occ": pl_occ2[place], "total_occ": pl_occ1[place] + pl_occ2[place]} aggregated_statistics[place]["label"] = "(%d/%d/%d)" % ( pl_occ1[place], pl_occ2[place], pl_occ1[place] + pl_occ2[place]) dir = (pl_occ2[place] - pl_occ1[place]) / (pl_occ1[place] + pl_occ2[place]) if (pl_occ1[place] + pl_occ2[ place]) > 0 else 0 aggregated_statistics[place]["direction"] = dir aggregated_statistics[place]["color"] = give_color_to_direction_dynamic(dir) for trans in all_transitions: aggregated_statistics[trans] = {"log1_occ": tr_occ1[trans], "log2_occ": tr_occ2[trans], "total_occ": tr_occ1[trans] + tr_occ2[trans]} if trans.label is not None: aggregated_statistics[trans]["label"] = trans.label+" " else: aggregated_statistics[trans]["label"] = "" aggregated_statistics[trans]["label"] = aggregated_statistics[trans]["label"] + "(%d/%d/%d)" % ( tr_occ1[trans], tr_occ2[trans], tr_occ1[trans] + tr_occ2[trans]) dir = (tr_occ2[trans] - tr_occ1[trans]) / (tr_occ1[trans] + tr_occ2[trans]) if (tr_occ1[trans] + tr_occ2[ trans]) > 0 else 0 aggregated_statistics[trans]["direction"] = dir aggregated_statistics[trans]["color"] = give_color_to_direction_dynamic(dir) for arc in trans.in_arcs: aggregated_statistics[arc] = aggregated_statistics[trans] for arc in trans.out_arcs: aggregated_statistics[arc] = aggregated_statistics[trans] return aggregated_statistics