-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
perception4e.py
467 lines (394 loc) · 16 KB
/
perception4e.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
"""Perception (Chapter 24)"""
from collections import deque

import cv2
import keras
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal
from keras.datasets import mnist
from keras.layers import Dense, Activation, Flatten, InputLayer, Conv2D, MaxPooling2D
from keras.models import Sequential

from utils4e import gaussian_kernel_2D
# ____________________________________________________
# 24.3 Early Image Processing Operators
# 24.3.1 Edge Detection
def array_normalization(array, range_min, range_max):
    """
    Linearly rescale an array so that its values span [range_min, range_max]
    :param array: numpy ndarray or an iterable object convertible to one
    :param range_min: value the smallest element is mapped to
    :param range_max: value the largest element is mapped to
    :return: numpy ndarray of floats with the same shape as the input;
             an input whose elements are all equal is mapped to range_min everywhere
    """
    array = np.asarray(array)
    # integer inputs (e.g. uint8 images) are promoted first, otherwise the
    # multiplication by (range_max - range_min) can wrap around
    if not np.issubdtype(array.dtype, np.floating):
        array = array.astype(np.float64)
    shifted = array - np.min(array)
    span = np.max(shifted)
    if span == 0:
        # all elements are equal: avoid 0 / 0 (NaN) and return a flat array
        return np.full(shifted.shape, range_min, dtype=shifted.dtype)
    return shifted * (range_max - range_min) / span + range_min
def gradient_edge_detector(image):
    """
    Detect edges from first-order finite differences of the image
    :param image: numpy ndarray or an iterable object
    :return: numpy ndarray, representing a gray scale image
    """
    pixels = image if isinstance(image, np.ndarray) else np.asarray(image)
    # difference kernels: a 1x2 row vector and a 2x1 column vector
    row_kernel = np.array([[1, -1]])
    col_kernel = np.array([[1], [-1]])
    # 'same' keeps the output the size of the input image
    row_response = scipy.signal.convolve2d(pixels, row_kernel, 'same')
    col_response = scipy.signal.convolve2d(pixels, col_kernel, 'same')
    # the two responses are summed, then rescaled to the 0..255 range
    return array_normalization(col_response + row_response, 0, 255)
def gaussian_derivative_edge_detector(image):
    """
    Detect edges with derivative-of-gaussian kernels
    :param image: numpy ndarray or an iterable object
    :return: numpy ndarray rescaled to the range 0..255
    """
    pixels = image if isinstance(image, np.ndarray) else np.asarray(image)
    smoothing = gaussian_kernel_2D()
    # differentiate the gaussian kernel along each direction
    row_kernel = scipy.signal.convolve2d(smoothing, np.asarray([[1, -1]]), 'same')
    col_kernel = scipy.signal.convolve2d(smoothing, np.asarray([[1], [-1]]), 'same')
    # filter the image with both kernels; 'same' keeps the image size
    row_response = scipy.signal.convolve2d(pixels, row_kernel, 'same')
    col_response = scipy.signal.convolve2d(pixels, col_kernel, 'same')
    return array_normalization(col_response + row_response, 0, 255)
def laplacian_edge_detector(image):
    """
    Detect edges by filtering the image with a 3x3 laplacian kernel
    :param image: numpy ndarray or an iterable object
    :return: numpy ndarray rescaled to the range 0..255
    """
    pixels = image if isinstance(image, np.ndarray) else np.asarray(image)
    # 4-neighbour laplacian: centre weight 4, direct neighbours -1
    kernel = np.asarray([[0, -1, 0],
                         [-1, 4, -1],
                         [0, -1, 0]])
    response = scipy.signal.convolve2d(pixels, kernel, 'same')
    return array_normalization(response, 0, 255)
def show_edges(edges):
    """Display an edge picture in gray scale (0 = black, 255 = white) without axes"""
    plt.imshow(edges, vmax=255, vmin=0, cmap='gray')
    plt.axis('off')
    plt.show()
# __________________________________________________
# 24.3.3 Optical flow
def sum_squared_difference(pic1, pic2, max_shift=30):
    """
    Find the cyclic shift of pic2 that minimises the sum of squared differences to pic1
    :param pic1: first frame, 2D array-like
    :param pic2: second frame, same shape as pic1
    :param max_shift: shifts from -max_shift to +max_shift (inclusive) are tried on both axes
    :return: ((Dx, Dy), ssd) where Dx is the shift along axis 0, Dy the shift along
             axis 1 and ssd the smallest sum of squared differences found;
             on ties the first shift in scan order (Dx outer, Dy inner) wins
    """
    pic1 = np.asarray(pic1)
    pic2 = np.asarray(pic2)
    assert pic1.shape == pic2.shape
    # widen before subtracting: unsigned frames (e.g. uint8) would otherwise wrap
    # around in both the difference and the square, giving a wrong SSD
    wide = np.result_type(pic1.dtype, pic2.dtype, np.int64)
    pic1 = pic1.astype(wide)
    pic2 = pic2.astype(wide)
    min_ssd = np.inf
    min_dxy = (np.inf, np.inf)
    for Dx in range(-max_shift, max_shift + 1):
        # the axis-0 shift does not depend on Dy, so compute it once per Dx
        row_shifted = np.roll(pic2, Dx, axis=0)
        for Dy in range(-max_shift, max_shift + 1):
            shifted_pic = np.roll(row_shifted, Dy, axis=1)
            diff = np.sum((pic1 - shifted_pic) ** 2)
            if diff < min_ssd:
                min_dxy = (Dx, Dy)
                min_ssd = diff
    return min_dxy, min_ssd
# ____________________________________________________
# segmentation
def gen_gray_scale_picture(size, level=3):
    """
    Build a square test picture made of nested bands of increasing gray value
    :param size: side length of the generated picture
    :param level: number of gray levels; the band of pixel (x, y) is
                  max(x, y) * level // size and bands are 250 // (level - 1) apart
    :return image in numpy ndarray type (all zeros when level is 1)
    """
    assert level > 0
    picture = np.zeros((size, size))
    if level == 1:
        return picture
    axis = np.arange(size)
    # band index of every pixel, computed for the whole grid at once
    band = np.maximum.outer(axis, axis) * level // size
    picture += (250 // (level - 1)) * band
    return picture
# sample 3x3 picture with the default number of gray levels, built at import time
gray_scale_image = gen_gray_scale_picture(size=3)
def probability_contour_detection(image, discs, threshold=0):
    """
    Mark contour pixels by comparing the responses of paired discs over a sliding window
    :param image: an image in type of numpy ndarray
    :param discs: flat sequence of filters taken in consecutive pairs
                  (discs[0] with discs[1], discs[2] with discs[3], ...);
                  the window side is the first dimension of discs[0]
    :param threshold: a window centre is marked when the largest pair difference exceeds it
    :return image in numpy ndarray type, 255 at marked pixels and 0 elsewhere
    """
    window = discs[0].shape[0]
    half = window // 2
    output = np.zeros(image.shape)
    for top in range(image.shape[0] - window + 1):
        for left in range(image.shape[1] - window + 1):
            # the patch under the window is shared by every disc pair
            patch = image[top: top + window, left: left + window]
            responses = [
                np.sum(np.multiply(patch, discs[k])) - np.sum(np.multiply(patch, discs[k + 1]))
                for k in range(0, len(discs), 2)
            ]
            if max(responses) > threshold:
                # mark the pixel at the centre of the window
                output[top + half, left + half] = 255
    return output
def group_contour_detection(image, cluster_num=2):
    """
    Quantise an image with k-means clustering (OpenCV implementation)
    :param image: an image in numpy ndarray type
    :param cluster_num: number of clusters in k-means
    :return ndarray with the shape of the input, every sample replaced by
            its cluster centre cast to uint8
    """
    samples = np.float32(image)
    # stop on either criterion; the trailing values are presumably the
    # iteration cap (10) and epsilon (1.0) -- confirm against the cv2.kmeans docs
    stop_criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
    _, labels, centers = cv2.kmeans(samples, cluster_num, None, stop_criteria, 10,
                                    cv2.KMEANS_RANDOM_CENTERS)
    palette = np.uint8(centers)
    # look up the centre of each label, then restore the input shape
    quantised = palette[labels.flatten()]
    return quantised.reshape(image.shape)
def image_to_graph(image):
    """
    Convert an image to a graph stored as an adjacency dictionary
    :param image: ndarray with at least two dimensions
    :return dict mapping each pixel (x, y) to [next pixel along axis 0, next pixel along axis 1];
            an entry is None when that neighbour falls outside the image
    """
    shape = image.shape
    return {
        (x, y): [(x + 1, y) if x + 1 < shape[0] else None,
                 (x, y + 1) if y + 1 < shape[1] else None]
        for x in range(shape[0])
        for y in range(shape[1])
    }
def generate_edge_weight(image, v1, v2):
    """
    Find edge weight between two vertices in an image
    :param image: image in numpy ndarray type
    :param v1, v2: vertices in the image in form of (x index, y index)
    :return 255 minus the absolute difference of the two pixel values,
            so similar pixels get a large weight
    """
    first = np.asarray(image[v1[0], v1[1]])
    second = np.asarray(image[v2[0], v2[1]])
    # widen integer pixels before subtracting: with unsigned images (e.g. uint8)
    # a negative difference would wrap around and give a wrong weight
    if first.dtype.kind in "iu":
        first = first.astype(np.int64)
    if second.dtype.kind in "iu":
        second = second.astype(np.int64)
    return 255 - abs(first - second)
class Graph:
    """
    Graph over the pixels of an image, stored as adjacency dictionaries

    self.graph maps every pixel to its neighbour list as built by image_to_graph,
    and self.flow[s][t] holds the remaining flow of the edge s -> t, initialised
    with generate_edge_weight.
    """

    def __init__(self, image):
        """image: ndarray"""
        self.graph = image_to_graph(image)
        # number of vertices, and number of neighbour slots per vertex
        self.ROW = len(self.graph)
        self.COL = 2
        self.image = image
        # dictionary to save the remaining flow of each edge
        self.flow = {}
        for s in self.graph:
            self.flow[s] = {}
            for t in self.graph[s]:
                # None marks a neighbour outside the image
                if t:
                    self.flow[s][t] = generate_edge_weight(image, s, t)

    def bfs(self, s, t, parent):
        """
        Breadth first search from s along edges with positive remaining flow
        :param s: start vertex
        :param t: target vertex
        :param parent: list that receives one (u, node) pair for every vertex
                       discovered by the search, in discovery order
        :return True when t was reached, False otherwise (s itself is never
                counted as reached)
        """
        # deque and set keep the search linear; a list with pop(0) and
        # `in` checks is quadratic in the number of vertices
        frontier = deque([s])
        visited = set()
        while frontier:
            u = frontier.popleft()
            for node in self.graph[u]:
                if not node or node in visited:
                    continue
                # only select edge with positive flow
                if self.flow[u][node] > 0:
                    frontier.append(node)
                    visited.add(node)
                    parent.append((u, node))
        return t in visited

    def min_cut(self, source, sink):
        """
        Find the saturated edges that separate source from sink
        :param source: start vertex (x index, y index)
        :param sink: target vertex (x index, y index)
        :return list of edges (i, j) whose remaining flow is exactly 0 while
                their original weight is positive

        While sink is reachable from source, the smallest remaining flow among
        the edges recorded by bfs is subtracted from every edge whose head lies
        at or before sink in both coordinates.
        """
        parent = []
        while self.bfs(source, sink, parent):
            # smallest remaining flow over the edges discovered by the search
            path_flow = min(self.flow[s][t] for s, t in parent)
            # update all edges between source and sink
            for s in self.flow:
                for t in self.flow[s]:
                    if t[0] <= sink[0] and t[1] <= sink[1]:
                        self.flow[s][t] -= path_flow
            parent = []
        res = []
        for i in self.flow:
            for j in self.flow[i]:
                if self.flow[i][j] == 0 and generate_edge_weight(self.image, i, j) > 0:
                    res.append((i, j))
        return res
def gen_discs(init_scale, scales=1):
    """
    Generate half-disc filters by splitting a round disc along four directions
    :param init_scale: side length of the square holding the smallest disc
    :param scales: number of disc sizes; the m-th size (counting from 1) has side init_scale * m
    :return: one list per size, each holding eight arrays in this order:
             lower, upper, two transposed halves, then the four triangular halves
    """
    collection = []
    for multiple in range(1, scales + 1):
        side = init_scale * multiple
        # full disc: 255 inside the circle centred on the square, 0 outside
        full = np.zeros((side, side))
        middle = (side - 1) / 2
        axis = np.arange(side)
        inside = (axis[:, None] - middle) ** 2 + (axis[None, :] - middle) ** 2 <= middle ** 2
        full[inside] = 255
        # lower half: blank the rows above the middle; upper half is its 180 degree rotation
        bottom = np.copy(full)
        bottom[:(side - 1) // 2, :] = 0
        top = bottom[::-1, ::-1]
        halves = [bottom, top, np.transpose(bottom), np.transpose(top)]
        # halves split along the two diagonals
        halves.extend([np.tril(full, 0),
                       np.triu(full, 0),
                       np.flip(np.tril(full, 0), axis=0),
                       np.flip(np.triu(full, 0), axis=0)])
        collection.append(halves)
    return collection
# __________________________________________________
# 24.4 Classifying Images
def load_MINST(train_size, val_size, test_size):
    """
    Load the MNIST dataset from keras and split it into train/validation/test sets
    :param train_size: number of training samples; reduced when train_size + val_size
                       exceeds the number of available training samples
    :param val_size: number of validation samples, taken right after the training samples
    :param test_size: number of test samples
    :return ((train_x, train_y), (val_x, val_y), (test_x, test_y)); images are float32
            arrays of shape (n, 1, 28, 28) divided by 255, labels are one-hot
            encoded over 10 classes
    """
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    total_size = len(x_train)
    if train_size + val_size > total_size:
        # never let the training slice bound go negative
        train_size = max(total_size - val_size, 0)
    x_train = x_train.reshape(x_train.shape[0], 1, 28, 28).astype('float32') / 255
    # the test images must be scaled exactly like the training images; the
    # scaled copy used to be stored under another name and never returned
    x_test = x_test.reshape(x_test.shape[0], 1, 28, 28).astype('float32') / 255
    y_train = keras.utils.to_categorical(y_train, 10)
    y_test = keras.utils.to_categorical(y_test, 10)
    val_end = train_size + val_size
    return ((x_train[:train_size], y_train[:train_size]),
            (x_train[train_size:val_end], y_train[train_size:val_end]),
            (x_test[:test_size], y_test[:test_size]))
def simple_convnet(size=3, num_classes=10):
    """
    Simple convolutional network for digit recognition
    :param size: number of convolution + max pooling layer pairs
    :param num_classes: number of output classes
    :return a compiled convolution network in keras model type
    """
    model = Sequential()
    # add input layer for inputs of shape (1, 28, 28)
    model.add(InputLayer(input_shape=(1, 28, 28)))
    # add convolution layers and max pooling layers
    for _ in range(size):
        model.add(Conv2D(32, (2, 2), padding='same', kernel_initializer='random_uniform'))
        model.add(MaxPooling2D(padding='same'))
    # add flatten layer and output layers
    model.add(Flatten())
    model.add(Dense(num_classes))
    model.add(Activation('softmax'))
    # the optimizer is named explicitly: older Keras releases require the argument,
    # and 'rmsprop' is what recent releases use when it is omitted
    model.compile(loss='categorical_crossentropy',
                  optimizer='rmsprop',
                  metrics=['accuracy'])
    # summary() prints by itself and returns None, so it must not be wrapped in print()
    model.summary()
    return model
def train_model(model):
    """
    Fit the given network on a small MNIST subset and print its test scores
    :param model: compiled keras model
    :return the same model after training
    """
    # 1000 training, 100 validation and 100 test samples
    training, validation, testing = load_MINST(1000, 100, 100)
    train_x, train_y = training
    val_x, val_y = validation
    test_x, test_y = testing
    model.fit(train_x, train_y,
              validation_data=(val_x, val_y),
              epochs=5,
              verbose=2,
              batch_size=32)
    scores = model.evaluate(test_x, test_y, verbose=1)
    print(scores)
    return model
# _____________________________________________________
# 24.5 DETECTING OBJECTS
def selective_search(image):
    """
    Selective search for object detection
    :param image: one of
                  - a falsy value (None, ''), which selects the bundled sample picture
                  - str, the path of an image
                  - ndarray, used as is when it has 3 dimensions; a 2D (gray scale)
                    array is replicated into 3 identical channels
                  - any other sequence, handled as before: repeated 3 times and
                    stacked along a new last axis
    :return the rectangles proposed by OpenCV; each one is unpacked below as (x, y, w, h)

    The first 100 rectangles are printed, drawn on a copy of the image and shown
    in a window; the call blocks until a key is pressed.
    """
    if isinstance(image, np.ndarray):
        # arrays are tested first: `not image` raises ValueError for arrays with
        # more than one element, and `image * 3` would scale the pixel values
        # instead of replicating the channel
        im = image if image.ndim == 3 else np.stack([image] * 3, axis=-1)
    elif not image:
        im = cv2.imread("./images/stapler1-test.png")
    elif isinstance(image, str):
        # NOTE: cv2.imread returns None when the file cannot be read; no check is made here
        im = cv2.imread(image)
    else:
        im = np.stack(image * 3, axis=-1)
    # use opencv python to extract bounding box with selective search
    ss = cv2.ximgproc.segmentation.createSelectiveSearchSegmentation()
    ss.setBaseImage(im)
    ss.switchToSelectiveSearchQuality()
    rects = ss.process()
    # show bounding boxes with the input image
    image_out = im.copy()
    for rect in rects[:100]:
        print(rect)
        x, y, w, h = rect
        cv2.rectangle(image_out, (x, y), (x + w, y + h), (0, 255, 0), 1, cv2.LINE_AA)
    cv2.imshow("Output", image_out)
    cv2.waitKey(0)
    return rects
# faster RCNN
def pool_rois(feature_map, rois, pooled_height, pooled_width):
    """
    Apply ROI pooling to several regions of a single feature map
    :param feature_map: ndarray with three dimensions, passed unchanged to pool_roi
    :param rois: iterable of regions of interest, each in the form pool_roi expects
    :param pooled_height: height of pooled area
    :param pooled_width: width of pooled area
    :return list of pooled features, one entry per roi and in the same order
    """
    return [pool_roi(feature_map, roi, pooled_height, pooled_width) for roi in rois]
def pool_roi(feature_map, roi, pooled_height, pooled_width):
    """
    Apply max pooling to a single region of interest of a feature map
    :param feature_map: ndarray indexed as [row, column, channel]
    :param roi: region of interest as four ratios of the feature map size:
                [row start, column start, row end, column end]
    :param pooled_height: number of cells along the rows
    :param pooled_width: number of cells along the columns
    :return ndarray of shape (pooled_height, pooled_width); every entry is the
            maximum of one cell taken over all channels
    """
    # translate the ratios into index bounds
    map_rows = int(feature_map.shape[0])
    map_cols = int(feature_map.shape[1])
    row_start, row_end = int(map_rows * roi[0]), int(map_rows * roi[2])
    col_start, col_end = int(map_cols * roi[1]), int(map_cols * roi[3])
    region = feature_map[row_start:row_end, col_start:col_end, :]
    # split the region into non overlapping cells
    n_rows = row_end - row_start
    n_cols = col_end - col_start
    row_step = n_rows // pooled_height
    col_step = n_cols // pooled_width
    # the last cell along each axis absorbs the remainder of the integer division
    row_bounds = [(i * row_step, (i + 1) * row_step if i + 1 < pooled_height else n_rows)
                  for i in range(pooled_height)]
    col_bounds = [(j * col_step, (j + 1) * col_step if j + 1 < pooled_width else n_cols)
                  for j in range(pooled_width)]
    # take the maximum of each cell and stack the rows
    return np.stack([[np.max(region[r0:r1, c0:c1, :]) for c0, c1 in col_bounds]
                     for r0, r1 in row_bounds])
# faster rcnn demo can be installed and shown in jupyter notebook
# def faster_rcnn_demo(directory):
# """
# show the demo of rcnn, the model is from
# @inproceedings{renNIPS15fasterrcnn,
# Author = {Shaoqing Ren and Kaiming He and Ross Girshick and Jian Sun},
# Title = {Faster {R-CNN}: Towards Real-Time Object Detection
# with Region Proposal Networks},
# Booktitle = {Advances in Neural Information Processing Systems ({NIPS})},
# Year = {2015}}
# :param directory: the directory where the faster rcnn model is installed
# """
# os.chdir(directory + '/lib')
# # make file
# os.system("make clean")
# os.system("make")
# # run demo
# os.chdir(directory)
# os.system("./tools/demo.py")
# return 0