-
Notifications
You must be signed in to change notification settings - Fork 0
/
simulator.py
154 lines (127 loc) · 4.84 KB
/
simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import numpy as np
from scipy.signal import convolve2d
from creature import Creature
from intepreter import Intepreter
from operator import methodcaller
from functools import partial
import pickle
# 1x2 and 2x1 difference kernels; GridWorldSimulator.step() convolves the
# resource field with them (twice per axis) to build its diffusion term.
dx = np.array([[-0.5, 0.5]], dtype=float)
dy = dx.transpose()
class GridWorldSimulator:
    """Square grid world holding creatures and a diffusing resource field.

    Attributes:
        size: Side length of the square grid.
        wrap: Stored as given; not read by any method of this class
            (resource diffusion in ``step`` always uses a wrapping boundary).
        diffusion: Factor applied to the second-difference term in ``step``.
        dissapation: Factor the resource field is multiplied by every step.
        step_fn: Callables invoked as ``fn(world)`` at the end of every step.
        new_creature_fn: Callables invoked as ``fn(world, creature)`` whenever
            a creature is added.
        allow_repr: Flag initialised to ``True``; not read by this class.
        map: ``(size, size)`` int array; 0 means empty, otherwise the id of
            the creature occupying the cell.
        res: ``(size, size)`` float array of resources per cell.
        creatures: Dict mapping creature id to creature object.
        new_id: Last creature id handed out (ids start at 1).
        step_cnt: Number of calls to ``step`` so far.
        new_added_cnt: Creatures added since the start of the latest step.
    """

    def __init__(self, size=128, wrap=False, diffusion=0.05, dissapation=0.98):
        """Create an empty world of ``size`` x ``size`` cells."""
        self.size = size
        self.wrap = wrap
        self.diffusion = diffusion
        self.dissapation = dissapation
        self.step_fn = []
        self.new_creature_fn = []
        self.allow_repr = True
        self.clear_world()
        # statistics
        self.step_cnt = 0
        self.new_added_cnt = 0

    def __len__(self):
        """Return the number of creatures currently in the world."""
        return len(self.creatures)

    def get_creature_at(self, loc):
        """Return the creature occupying ``loc`` (row, col), or ``None``."""
        cid = self.map[loc[0], loc[1]]
        if cid > 0:
            return self.creatures[cid]
        return None

    def get_cid(self, c):
        """Return the id stored in the map at creature ``c``'s location."""
        return self.map[c.loc[0], c.loc[1]]

    def _resolve(self, c):
        """Return ``(cid, creature)`` for ``c`` given as a Creature or an id."""
        # isinstance (rather than an exact type comparison) so that Creature
        # subclasses are recognised as creatures instead of being used as ids.
        if isinstance(c, Creature):
            return self.map[c.loc[0], c.loc[1]], c
        return c, self.creatures[c]

    def move_creature(self, c, loc):
        """Move a creature to ``loc``.

        Args:
            c: A Creature instance or a creature id.
            loc: Destination (row, col). It is not checked for occupancy;
                whatever id was stored there is overwritten.
        """
        cid, c = self._resolve(c)
        self.map[c.loc[0], c.loc[1]] = 0
        self.map[loc[0], loc[1]] = cid
        c.loc = np.array(loc)

    def add_creature(self, c):
        """Register creature ``c`` at its current ``c.loc``.

        Assigns the next id, records it in the map, calls every
        ``new_creature_fn`` hook and bumps ``new_added_cnt``.

        Raises:
            AssertionError: If the cell at ``c.loc`` is already occupied.
        """
        # Raised explicitly (not via `assert`) so the check survives `python -O`
        # and an occupied cell can never be silently overwritten.
        if self.map[c.loc[0], c.loc[1]] != 0:
            raise AssertionError("cannot add, place is occupied")
        self.new_id += 1
        self.creatures[self.new_id] = c
        self.map[c.loc[0], c.loc[1]] = self.new_id
        for fn in self.new_creature_fn:
            fn(self, c)
        self.new_added_cnt += 1

    def remove_creature(self, c):
        """Remove a creature from the world.

        The creature's cell is cleared, its non-negative ``rp`` is added to
        the resource field at that cell, and it is marked ``alive = False``.

        Args:
            c: A Creature instance or a creature id.
        """
        cid, c = self._resolve(c)
        self.map[c.loc[0], c.loc[1]] = 0
        self.res[c.loc[0], c.loc[1]] += max(c.rp, 0)
        c.alive = False
        del self.creatures[cid]

    def clear_world(self):
        """Reset the id counter, the occupancy map, resources and creatures."""
        self.new_id = 0
        self.map = np.zeros((self.size, self.size), dtype=int)
        self.res = np.zeros((self.size, self.size), dtype=float)
        self.creatures = {}

    def init_loc(self, c, max_tries=10):
        """Give ``c`` a random empty location and a random ``r`` in [0, 8).

        The creature is NOT added to the map; call ``add_creature`` for that.

        Args:
            c: Object whose ``loc`` and ``r`` attributes are set.
            max_tries: Occupied cells may be re-drawn until the retry counter
                exceeds this value.

        Returns:
            The same object ``c``.

        Raises:
            RuntimeError: If no empty cell was found within the retry budget.
        """
        x, y = np.random.randint(self.size, size=2)
        n_try = 0
        while self.map[x, y] > 0:
            if n_try > max_tries:
                raise RuntimeError("hard to find an empty space, world is too crowded")
            # The counter must advance, otherwise a full world loops forever.
            n_try += 1
            x, y = np.random.randint(self.size, size=2)
        c.loc = np.array([x, y])
        c.r = np.random.randint(8)
        return c

    def populate_density(self, density=0.1, genome_size=8):
        """Add random-genome creatures on empty cells with probability ``density``.

        Args:
            density: Per-cell probability of spawning a creature.
            genome_size: Number of uint32 words in each random genome.
        """
        candidates = np.logical_and(
            np.random.rand(self.size, self.size) < density, self.map < 1
        )
        positions = np.argwhere(candidates)
        # Shuffle so creature ids are not assigned in raster order.
        np.random.shuffle(positions)
        for x, y in positions:
            genome = np.random.randint(2**32, size=genome_size, dtype=np.uint32)
            c = Creature(genome)
            c.loc = np.array([x, y])
            c.r = np.random.randint(8)
            self.add_creature(c)

    def populate_number(self, n_creatures=100, genome_size=8, genomes=None):
        """Add creatures at random empty locations.

        Args:
            n_creatures: Number of random genomes to draw; ignored when
                ``genomes`` is given.
            genome_size: Words per random genome; ignored when ``genomes``
                is given.
            genomes: Optional sequence of genomes, one creature per entry.
        """
        if genomes is None:
            genomes = np.random.randint(
                2**32, size=(n_creatures, genome_size), dtype=np.uint32
            )
        for genome in genomes:
            c = Creature(genome)
            self.init_loc(c)
            self.add_creature(c)

    def _diffuse_resources(self):
        """Apply one diffusion + dissipation update to ``self.res`` in place."""
        # The boundary is always 'wrap' here, independent of self.wrap.
        dmdx = convolve2d(self.res, dx, boundary='wrap', mode='same')
        dmdy = convolve2d(self.res, dy, boundary='wrap', mode='same')
        # NOTE(review): the roll by -1 presumably re-centres the stencil after
        # two one-sided differences -- confirm against convolve2d's 'same' offset.
        d2m = np.roll(convolve2d(dmdx, dx, boundary='wrap', mode='same'), -1, axis=1)
        d2m += np.roll(convolve2d(dmdy, dy, boundary='wrap', mode='same'), -1, axis=0)
        self.res += self.diffusion * d2m
        self.res *= self.dissapation

    def step(self):
        """Advance the world by one tick.

        Order: update counters, diffuse/dissipate resources, let every living
        creature sense and act, then run the ``step_fn`` hooks.
        """
        self.step_cnt += 1
        self.new_added_cnt = 0
        self._diffuse_resources()
        # Iterate over a snapshot: actions may add or remove creatures.
        for c in list(self.creatures.values()):
            if not c.alive:
                # Removed earlier in this same step.
                continue
            # Inputs and outputs are names of Intepreter callables, invoked as
            # Intepreter.<name>(c, world[, value]).
            inputs = [
                methodcaller(fn, c, self)(Intepreter)
                for fn in c.reflex.enabled_inputs
            ]
            outputs = Intepreter.aggregate(c.step(inputs))
            enabled_actions = list(zip(c.reflex.enabled_outputs, outputs))
            # Random order so no action is systematically executed first.
            np.random.shuffle(enabled_actions)
            for action, value in enabled_actions:
                methodcaller(action, c, self, value)(Intepreter)
        # other step functions
        for fn in self.step_fn:
            fn(self)

    def save(self, filename):
        """Pickle the whole simulator to ``filename``."""
        with open(filename, 'wb') as wf:
            pickle.dump(self, wf)

    @classmethod
    def load(cls, filename):
        """Unpickle and return a simulator previously written by ``save``.

        WARNING: ``pickle.load`` can execute arbitrary code; only load files
        from a trusted source. The object is returned as unpickled; ``cls``
        is not used to validate or convert it.
        """
        with open(filename, 'rb') as rf:
            return pickle.load(rf)