Source code for privugger.attacker.metrics
import matplotlib.pyplot as plt
from sklearn.feature_selection import mutual_info_regression
import numpy as np
import pymc3 as pm
import scipy.stats as st
import pickle
import datetime
[docs]class SimulationMetrics:
"""
A method used to convert traces to something a data analyst can use to analyse
"""
def __init__(self, traces=[], path=""):
"""
Constructor for SimulationMetrics
Parameters:
----------
traces: List[Pymc3.trace]
- A list of traces returned from simulation
path: String
- The path to a stored SimulationMetrics
"""
if isinstance(traces, str):
path = traces
traces = []
if len(traces):
self.traces = traces
else:
self.traces = self.load_from_file(path)
self.I = []
def __str__(self):
"""
Overrides the __str__ str method to print traces
"""
return self.traces.__str__()
[docs] def load_from_file(self, location):
"""
A method which loads a pickled object as the trace
Returns:
----------------
- A "un"pickled object
Parameters:
----------------
Location: str
- The location of the file
"""
with open(location, "rb") as file:
return pickle.load(file)
[docs] def plot_mutual_bar(self, shift=0):
"""
A method used in case of multiple parameters
Is needed since the traces are than stored in a different sense
Parameters
------------------
Shift: int
- Since there are quite a large number of simulation a shift in the data can be needed
"""
start, stop = 0,10
executions = 5 #len(sm.traces)//2//10
fig, ax = plt.subplots(executions,2, figsize=(20,18))
for j in range(shift, shift+executions):
for p in range(2):
start = (p*10)+(j*20)
stop = ((p*10)+10)+(j*20)
traces = self.traces[start:stop]
Is = {}
labels = []
for i in range(10):
trace = traces[i][0]
name = traces[i][2][0]
alice = trace[f"intDist_{i}"]
output = trace[f"Output_{i}"]
I = mutual_info_regression([[a] for a in alice], output, discrete_features=True)[0]
if name[0] in Is:
Is[name[0]].append(I)
else:
labels.append(name[0])
Is[name[0]] = [I]
values = len(Is.keys())
vals = [max(v[1]) for v in Is.items()]
ax[j-shift][p].bar(labels, vals)
ax[j-shift][p].set_ylim(0,7)
ax[j-shift][p].set_title(f"Parameter {p} opposite was {self.traces[start+stop//2][2][1]}")
ax[j-shift][p].set_ylabel("$I(X;Y)$")
plt.tight_layout()
plt.show()
[docs] def plot_mutual_information(self, figsize=(16,8), as_bar=True):
"""
Plots the mutual information as a graph for each distribution
Parameters
--------------
figsize: Tuple<Int>
- The size of the images
as_bar: bool
- Determines if the distribution should be a dot plot or a bar plot
"""
I = self.mutual_information()
size = len(I) if len(I) > 1 else 2
plt.style.use('seaborn-darkgrid')
_, ax = plt.subplots(2, 2,figsize=figsize)
ylim = round(max(self.highest_leakage(head=1, verbose=0), key=lambda x: x[0])[0][0]+0.5)
for pos, (axs, values) in enumerate(zip(ax.flatten(), I)):
x = 0
items = values.items()
labels = []
best_vals = []
for k,v in items:
if as_bar:
best = max(v, key=lambda x: x[0])
best_vals.append(best[0])
else:
for value, info in v:
axs.plot([x], value, "x", label=str(info[1:]))
best_info = round(max(v, key=lambda x: x[0])[0],2)
axs.annotate(str(best_info), xy=(x, best_info), fontsize=16)
if str(k) == "TruncatedNormal":
labels.append("Truncated \n Normal")
else:
labels.append(str(k))
x+=1
if as_bar:
axs.bar([i for i in range(len(best_vals))], best_vals)
alice = {0: (0,100), 1: (0,300), 2: (0,10), 3: (0,1)}
pi_pos = "A_{\pi_" + str(pos+1) + "}"
title = f"$I(Y_{pos+1};{pi_pos})$ for parameter {pos+1} where ${pi_pos}$ ~ $U$ {alice[pos]}"
ylabel = f"$I(Y_{pos+1};{pi_pos})$"
axs.set_title(title, fontsize=16)
axs.set_ylabel(ylabel, fontsize=14)
axs.set_xlabel(f"Distributions", fontsize=14)
axs.set_xticks(range(len(labels)))
axs.set_xticklabels(labels, fontsize=14)
axs.set_ylim(0,ylim)
pos += 1
plt.tight_layout()
plt.show()
[docs] def highest_leakage(self, head=1, verbose=1):
"""
A method used to calculate the highest leakage grouped by each distribution
Returns
-----------
List[Tuple[Float, Tuple[String, ]]]
- Returns a list containing the mutual information next to the specific distribution
Parameters
-----------
head: Int
- Determines how many distributions are included
verbose: int
- Detmines if the distributions should be printed
"""
if not len(self.I):
self.mutual_information()
best_vals = []
for parameter_pos, l in enumerate(self.I):
best_dist = []
for k,v in l.items():
best = max(v, key=lambda x: x[0])
best_dist.append(best)
best_dist = list(sorted(best_dist, key=lambda x: x[0], reverse=True))
if verbose:
print(f"The distribution that had the most leakage was {best_dist[:head]} for parameter {parameter_pos}")
best_vals.append(best_dist[:head])
return best_vals
[docs] def save_to_file(self, location=""):
"""
Save the particular trace to a file with the format: Metrics-%Y-%m-%d-%H-%M-%S.priv
Parameter:
-----------
location: string
- The location in which the files should be saved
"""
date = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
with open(location+f"metrics-{date}.priv", "wb") as file:
pickle.dump(self.traces, file)
[docs] def mutual_information(self):
"""
Calculates the mutual information for each distribution and appends them to a global variable I
Returns
--------
List[float, List[String,]]
- A list of the mutual information paired with its respective distribution
"""
_, names, _ = self.traces[0]
size = len(names)
mutual_information = [{} for _ in range(size)]
for i in range(size):
for trace, names, info in self.traces:
discrete = ("int" in str(names[i]))
alice = trace[names[i]]
try:
output = trace["Output"]
except:
pos = int(str(names[i]).split("_")[-1])
if pos < 10:
output = trace[f"Output_{pos}"]
else:
continue
I_ao = mutual_info_regression([[j] for j in alice], output, discrete_features=discrete)[0]
while (len(info) == 1):
info = info[0] # Used to unwrap the inner information in case of subtypes such as List[List[Tuple[...]]]
if isinstance(info, tuple) or (isinstance(info, list) and isinstance(info[0], list)):
info = info[i]
if info[0] in mutual_information[i]:
mutual_information[i][info[0]].append((I_ao,info))
else:
mutual_information[i][info[0]] = [(I_ao, info)]
self.I = mutual_information
return mutual_information