env.py
import gym
import numpy as np
from gym import spaces


class ADEnv(gym.Env):
"""
Customized environment for anomaly detection
"""
def __init__(self,dataset: np.ndarray,sampling_Du=1000,prob_au=0.5,label_normal=0,label_anomaly=1, name="default"):
"""
Initialize anomaly environment for DPLAN algorithm.
:param dataset: Input dataset in the form of 2-D array. The Last column is the label.
:param sampling_Du: Number of sampling on D_u for the generator g_u
:param prob_au: Probability of performing g_a.
:param label_normal: label of normal instances
:param label_anomaly: label of anomaly instances
"""
super().__init__()
self.name=name
# hyperparameters:
self.num_S=sampling_Du
self.normal=label_normal
self.anomaly=label_anomaly
self.prob=prob_au
# Dataset infos: D_a and D_u
self.m,self.n=dataset.shape
self.n_feature=self.n-1
self.n_samples=self.m
self.x=dataset[:,:self.n_feature]
self.y=dataset[:,self.n_feature]
self.dataset=dataset
self.index_u=np.where(self.y==self.normal)[0]
self.index_a=np.where(self.y==self.anomaly)[0]
self.index_n=np.where(self.y==2)[0]
# observation space:
self.observation_space=spaces.Discrete(self.m)
# action space: 0 or 1
self.action_space=spaces.Discrete(2)
# initial state
self.counts=None
self.state=None
self.DQN=None

    def generater_a(self, *args, **kwargs):
        # sampling function for D_a: pick a random labeled anomaly
        index = np.random.choice(self.index_a)
        return index

    def generater_n(self, *args, **kwargs):
        # sampling function for D_n: pick a random instance with label 2
        index = np.random.choice(self.index_n)
        return index

    def generate_u(self, action, s_t):
        # sampling function for D_u
        S = np.random.choice(self.index_u, self.num_S)
        # calculate distances in the space of the DQN's last hidden layer
        all_x = self.x[np.append(S, s_t)]
        all_dqn_s = self.DQN.get_latent(all_x)
        all_dqn_s = all_dqn_s.cpu().detach().numpy()
        dqn_s = all_dqn_s[:-1]
        dqn_st = all_dqn_s[-1]
        dist = np.linalg.norm(dqn_s - dqn_st, axis=1)
        if action == 1:
            # action "anomaly": move to the sampled instance closest to s_t
            loc = np.argmin(dist)
        elif action == 0:
            # action "normal": move to the sampled instance farthest from s_t
            loc = np.argmax(dist)
        index = S[loc]
        return index

    def reward_h(self, action, s_t):
        # anomaly-biased external handcrafted reward function h
        if (action == 1) and (s_t in self.index_a):
            # correctly flagged a labeled anomaly
            return 1
        elif (action == 0) and (s_t in self.index_n):
            return 1
        elif (action == 0) and (s_t in self.index_u):
            return 0
        elif (action == 1) and (s_t in self.index_u):
            # small penalty for flagging an unlabeled instance as an anomaly
            return -0.5
        return -1

    def step(self, action):
        self.state = int(self.state)
        # store the former state
        s_t = self.state

        # choose a generator; note the sampling probabilities for
        # (g_a, g_u, g_n) are fixed to (0.4, 0.2, 0.4) here
        g = np.random.choice([self.generater_a, self.generate_u, self.generater_n],
                             p=[0.4, 0.2, 0.4])
        s_tp1 = g(action, s_t)

        # move to the next state
        self.state = int(s_tp1)
        self.counts += 1

        # calculate the reward
        reward = self.reward_h(action, s_t)

        # done: this environment never signals a terminal state by itself
        done = False

        info = {"State t": s_t, "Action t": action, "State t+1": s_tp1}

        return self.state, reward, done, info

    def reset(self):
        # reset the environment status
        self.counts = 0
        # the first observation is sampled uniformly from D_u
        self.state = int(np.random.choice(self.index_u))
        return self.state
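

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original module).
# It builds a small synthetic dataset, attaches a stand-in network that
# exposes the `get_latent` method `generate_u` expects (in DPLAN the agent
# supplies its own DQN), and rolls the environment for a few steps. The
# `ToyDQN` class, the dataset shapes, and all hyperparameter values below
# are assumptions made for this example.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import torch
    import torch.nn as nn

    class ToyDQN(nn.Module):
        # stand-in for the agent's Q-network; only get_latent is needed here
        def __init__(self, n_feature, n_hidden=8):
            super().__init__()
            self.hidden = nn.Linear(n_feature, n_hidden)

        def get_latent(self, x):
            # map raw features to a last-hidden-layer representation
            x = torch.as_tensor(x, dtype=torch.float32)
            return torch.relu(self.hidden(x))

    # synthetic dataset: 200 unlabeled/normal points (label 0), 10 anomalies
    # (label 1), and 30 instances with label 2 so that D_n is non-empty,
    # since step() may sample from it
    rng = np.random.default_rng(0)
    unlabeled = np.hstack([rng.normal(0, 1, size=(200, 5)), np.zeros((200, 1))])
    anomalies = np.hstack([rng.normal(4, 1, size=(10, 5)), np.ones((10, 1))])
    labeled_n = np.hstack([rng.normal(0, 1, size=(30, 5)), np.full((30, 1), 2.0)])
    data = np.vstack([unlabeled, anomalies, labeled_n])

    env = ADEnv(dataset=data, sampling_Du=50, name="toy")
    env.DQN = ToyDQN(env.n_feature)

    state = env.reset()
    for _ in range(5):
        action = env.action_space.sample()
        state, reward, done, info = env.step(action)
        print(info, "reward:", reward)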