-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmagnetic_sampling.py
More file actions
224 lines (184 loc) · 9.01 KB
/
magnetic_sampling.py
File metadata and controls
224 lines (184 loc) · 9.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""
AUTHOR: Maximilian Franz
Magnetic Sampling is a simple modification of the LAD algorithm to increase sampling speed in the phase, where we are looking for support points.
It is independent of the other implementations as of now, but will be included in the comparison, once it is sufficiently far developed.
"""
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib import style
from utils import ct, transform_set, inverse_ct, create_ranges, adjust_features
style.use('ggplot')
class MagneticSampler():
    def __init__(self, clf, scaler, sector_width=0.35, confidence=5, threshold=2, target_value=1):
        """
        Constructor of MagneticSampler.

        Args:
            clf: black-box classifier trained on data
            scaler: StandardScaler already fit to data, or None to work in
                the raw feature space
            sector_width: angle in radians in which to sample per sector
            confidence: number of instances sampled in sector
            threshold: number of errors tolerated before aborting expansion
            target_value: desired prediction value for sampled instances
        """
        self.clf = clf
        self.scaler = scaler
        self.sector_width = sector_width
        self.confidence = confidence
        self.threshold = threshold
        self.target_value = target_value

    @staticmethod
    def sample_grid(
            num_samples,
            radius_inner,
            radius_outer,
            alphas_lower,
            alphas_upper,
            original_instance,
            restricted=False):
        """
        Samples on a grid generated between linear intervals for each dimension.

        This replaces sample_in, as it gets the job done more robustly. It has
        deterministic behaviour and discovers edges more frequently.

        Args:
            num_samples: number of samples to draw in the given sector
            radius_inner: inner radius of the spherical sector
            radius_outer: outer radius of the spherical sector
            alphas_lower: lower end of the spherical-coordinate angle range
            alphas_upper: upper end of the spherical-coordinate angle range
            original_instance: instance around which sampling is centred
            restricted: if True, only adjust a fixed subset of features
                (positions [0, 5] — NOTE(review): hard-coded, confirm intent)

        Returns:
            Array of sampled instances in Cartesian coordinates (first row is
            the all-zeros seed row kept by the original implementation).
        """
        result = np.zeros((1, alphas_lower.size + 1))
        radius_ranges = 1
        # BUGFIX: '/' yields a float in Python 3; a sample count must be int.
        samples_per_range = num_samples // radius_ranges
        # Sample on the mid-radius between the inner and outer shell.
        radius = (radius_inner + radius_outer) / 2
        for _ in range(1, radius_ranges + 1):
            # Radius is prepended as coordinate [0] by convention.
            lower = np.append(np.array([radius]), alphas_lower)
            upper = np.append(np.array([radius]), alphas_upper)
            result = np.append(result,
                               create_ranges(lower, upper, samples_per_range).T,
                               axis=0)
        if restricted:
            restr = transform_set(result)
            return adjust_features(original_instance, [0, 5], restr)
        else:
            # Back to Cartesian coordinates, translated to the original.
            return transform_set(result) + original_instance

    def get_num_errors(self, samples):
        """Return the number of 'wrong' predictions in a set of samples.

        A prediction <= 0.5 counts as an error (i.e. not the positive class).
        """
        if self.scaler is None:
            trans_set = samples
        else:
            trans_set = self.scaler.inverse_transform(samples)
        results = self.clf.predict(trans_set)
        return results[results <= 0.5].size

    def clean(self, samples):
        """Keep only samples the classifier assigns positive-class
        probability > 0.5."""
        if self.scaler is None:
            trans_set = samples
        else:
            trans_set = self.scaler.inverse_transform(samples)
        prob = self.clf.predict_proba(trans_set)[:, 1]
        return samples[prob > 0.5]

    def magnetic_sampling(self,
                          original_instance,
                          adversarial_instance,
                          num_support,
                          features,
                          sector_depth=0.6,   # must be set depending on the dataset
                          sector_width=0.35,  # about 20 degrees
                          confidence=10,      # must be set depending on the dataset
                          threshold=5,
                          ):
        """
        magnetic_sampling implemented with restriction to a set of features.

        All non-selected features remain fixed.

        Args:
            original_instance: instance on the original side of the boundary
            adversarial_instance: instance on the adversarial side
            num_support: number of support points to return
            features: list of feature positions in the feature vectors
                that ought to be used
            sector_depth: radial thickness of the sampling sector
            sector_width: angular width (radians) of each expansion step
            confidence: samples drawn per expansion step
            threshold: errors tolerated before stopping expansion on a side

        Returns:
            Full instances created by updating copies of original_instance at
            the desired features with the features created through sampling.
            Falls back to use_normal() when no valid sample is found.
        """
        if self.scaler is not None:
            original_instance = self.scaler.transform(original_instance.reshape(1, -1))[0]
            adversarial_instance = self.scaler.transform(adversarial_instance.reshape(1, -1))[0]

        expand_right = True
        expand_left = True
        restricted_original = original_instance[features]
        restricted_adversarial = adversarial_instance[features]
        found = False
        distance = np.linalg.norm(restricted_adversarial - restricted_original)

        while not found:
            # We work in a translated space: spherical coordinates of the
            # vector from original_instance to adversarial_instance. Before
            # handing samples to clf this translation is undone.
            radius_inner = distance - sector_depth / 2
            radius_outer = distance + sector_depth / 2
            alphas = np.array([inverse_ct(restricted_adversarial - restricted_original)[1:]])
            alphas_lower = alphas - sector_width
            alphas_upper = alphas + sector_width

            # Seed with the adversarial instance itself (plus the zero row
            # kept by the original implementation).
            total_samples = np.zeros((1, restricted_original.size))
            total_samples = np.append(total_samples, [restricted_adversarial], axis=0)

            # Expand the angular sector symmetrically until either side
            # produces more errors than `threshold`.
            while expand_left or expand_right:
                if expand_left:
                    sampled_lower = self.sample_grid(confidence, radius_inner, radius_outer,
                                                     alphas_lower, alphas_lower + sector_width,
                                                     restricted_original)
                    adjusted = adjust_features(original_instance, features, sampled_lower, restricted_original)
                    if self.get_num_errors(adjusted) > threshold:
                        expand_left = False
                    else:
                        alphas_lower -= sector_width
                        total_samples = np.append(total_samples, sampled_lower, axis=0)
                if expand_right:
                    sampled_upper = self.sample_grid(confidence, radius_inner, radius_outer,
                                                     alphas_upper - sector_width, alphas_upper,
                                                     restricted_original)
                    adjusted = adjust_features(original_instance, features, sampled_upper, restricted_original)
                    if self.get_num_errors(adjusted) > threshold:
                        expand_right = False
                    else:
                        alphas_upper += sector_width
                        total_samples = np.append(total_samples, sampled_upper, axis=0)

            total_samples = adjust_features(original_instance, features, total_samples, restricted_original)

            diff = num_support - total_samples.shape[0]
            if diff > 0:
                # Too few samples were drawn: fill up within the final sector.
                additional_samples = self.sample_grid(diff, radius_inner, radius_outer,
                                                      alphas_lower, alphas_upper, restricted_original)
                adjusted = adjust_features(original_instance, features, additional_samples, restricted_original)
                total_samples = np.append(total_samples, adjusted, axis=0)

            # Remove edge cases where a negative sample was drawn.
            cleaned_samples = self.clean(total_samples)
            if num_support - cleaned_samples.shape[0] < 0:
                # Too many samples: subsample down to num_support.
                take = np.random.choice(len(cleaned_samples), num_support)
                cleaned_samples = cleaned_samples[take]

            if cleaned_samples.shape[0] > 0:
                found = True
            else:
                # Don't increase distance, but use a normal distribution.
                return self.use_normal(adversarial_instance)

        # BUGFIX: previously returned inverse_transform(total_samples) here,
        # leaking the uncleaned set (zero row + negative samples) whenever a
        # scaler was present. Return the cleaned samples in both cases.
        if self.scaler is not None:
            return self.scaler.inverse_transform(cleaned_samples)
        return cleaned_samples

    def use_normal(self, adversarial_instance):
        """Fallback sampling: draw from a widening Gaussian around the
        adversarial instance until at least one positive sample is found."""
        cleaned_samples = []
        sigma = 0.01
        while len(cleaned_samples) == 0:
            cov = np.diag(np.full(len(adversarial_instance), sigma))
            samples = np.random.multivariate_normal(adversarial_instance, cov, 10)
            cleaned_samples = self.clean(samples)
            sigma *= 2  # widen the search each failed round
        # BUGFIX: guard for scaler=None, consistent with the other methods.
        if self.scaler is None:
            return cleaned_samples
        return self.scaler.inverse_transform(cleaned_samples)