林嶔 (Lin, Chin)
Lesson 9: Variational Autoencoders and Anomaly Detection
– We described one of the autoencoder's important capabilities: its decompression (decoder) half can be repurposed for "image generation", but the results were rather tragic.
– The tragedy stems from the compression step: the latent space learned during compression is likely only partially usable, so when your "random code" lands in the unused region, there is naturally no way to "reconstruct" a good image.
– Let's use MNIST to build another fun example!
library(data.table)
library(OpenImageR)
DAT = fread("data/MNIST.csv", data.table = FALSE)
DAT = data.matrix(DAT)
imageShow(t(matrix(as.numeric(DAT[123,-1]), nrow = 28, byrow = TRUE)))
library(mxnet)
library(magrittr)
my_iterator_func <- setRefClass("Custom_Iter1",
fields = c("iter", "data.csv", "data.shape", "batch.size"),
contains = "Rcpp_MXArrayDataIter",
methods = list(
initialize = function(iter, data.csv, data.shape, batch.size){
csv_iter <- mx.io.CSVIter(data.csv = data.csv, data.shape = data.shape, batch.size = batch.size)
.self$iter <- csv_iter
.self
},
value = function(){
val <- as.array(.self$iter$value()$data)
val.x <- val[-1,] # drop the label row; keep the 784 pixel values
dim(val.x) <- c(28, 28, 1, ncol(val.x))
val.x <- val.x/255
val.x <- mx.nd.array(val.x)
val.y <- val.x # autoencoder target: the label is the input itself
list(data=val.x, label=val.y)
},
iter.next = function(){
.self$iter$iter.next()
},
reset = function(){
.self$iter$reset()
},
finalize=function(){
}
)
)
my_iter = my_iterator_func(iter = NULL, data.csv = 'data/MNIST.csv', data.shape = 785, batch.size = 50)
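– As a quick sanity check (a minimal sketch; it assumes data/MNIST.csv exists and mxnet is already loaded), we can pull one batch from the iterator and inspect its shape:
my_iter$reset()           # rewind the underlying CSVIter
my_iter$iter.next()       # advance to the first batch
batch <- my_iter$value()
dim(as.array(batch$data)) # expected: 28 28 1 50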
# Encoder
data <- mx.symbol.Variable('data')
fc1 <- mx.symbol.FullyConnected(data = data, num.hidden = 64, name = 'fc1')
relu1 <- mx.symbol.Activation(data = fc1, act_type = "relu", name = 'relu1')
fc2 <- mx.symbol.FullyConnected(data = relu1, num.hidden = 8, name = 'fc2')
relu2 <- mx.symbol.Activation(data = fc2, act_type = "relu", name = 'relu2')
encoder <- mx.symbol.FullyConnected(data = relu2, num.hidden = 2, name = 'encoder')
# Decoder
fc3 <- mx.symbol.FullyConnected(data = encoder, num.hidden = 8, name = 'fc3')
relu3 <- mx.symbol.Activation(data = fc3, act_type = "relu", name = 'relu3')
fc4 <- mx.symbol.FullyConnected(data = relu3, num.hidden = 64, name = 'fc4')
relu4 <- mx.symbol.Activation(data = fc4, act_type = "relu", name = 'relu4')
fc5 <- mx.symbol.FullyConnected(data = relu4, num.hidden = 784, name = 'fc5')
decoder <- mx.symbol.reshape(data = fc5, shape = c(28, 28, 1, -1), name = 'decoder')
# MSE loss
label <- mx.symbol.Variable(name = 'label')
residual <- mx.symbol.broadcast_minus(lhs = label, rhs = decoder)
square_residual <- mx.symbol.square(data = residual)
mean_square_residual <- mx.symbol.mean(data = square_residual, axis = 0:3, keepdims = FALSE)
mse_loss <- mx.symbol.MakeLoss(data = mean_square_residual, name = 'mse')
my_optimizer <- mx.opt.create(name = "adam", learning.rate = 0.001, beta1 = 0.9, beta2 = 0.999, wd = 1e-4)
my.eval.metric.loss <- mx.metric.custom(
name = "my-loss",
function(real, pred) {
# the symbol's output is already the scalar loss, so just report it
return(as.array(pred))
}
)
mx.set.seed(0)
model <- mx.model.FeedForward.create(symbol = mse_loss, X = my_iter, optimizer = my_optimizer,
eval.metric = my.eval.metric.loss,
array.batch.size = 20, ctx = mx.gpu(), num.round = 20)
# Extract the trained encoder: truncate the symbol graph at 'encoder_output'
# and keep only the parameters this sub-graph actually uses
all_layers <- model$symbol$get.internals()
encoder_output <- which(all_layers$outputs == 'encoder_output') %>% all_layers$get.output()
encoder_model <- model
encoder_model$symbol <- encoder_output
encoder_model$arg.params <- encoder_model$arg.params[names(encoder_model$arg.params) %in% names(mx.symbol.infer.shape(encoder_output, data = c(28, 28, 1, 7))$arg.shapes)]
encoder_model$aux.params <- encoder_model$aux.params[names(encoder_model$aux.params) %in% names(mx.symbol.infer.shape(encoder_output, data = c(28, 28, 1, 7))$aux.shapes)]
data <- mx.symbol.Variable('data')
fc3 <- mx.symbol.FullyConnected(data = data, num.hidden = 8, name = 'fc3')
relu3 <- mx.symbol.Activation(data = fc3, act_type = "relu", name = 'relu3')
fc4 <- mx.symbol.FullyConnected(data = relu3, num.hidden = 64, name = 'fc4')
relu4 <- mx.symbol.Activation(data = fc4, act_type = "relu", name = 'relu4')
fc5 <- mx.symbol.FullyConnected(data = relu4, num.hidden = 784, name = 'fc5')
decoder_output <- mx.symbol.reshape(data = fc5, shape = c(28, 28, 1, -1), name = 'decoder')
decoder_model <- model
decoder_model$symbol <- decoder_output
decoder_model$arg.params <- decoder_model$arg.params[names(decoder_model$arg.params) %in% names(mx.symbol.infer.shape(decoder_output, data = c(2, 7))$arg.shapes)]
decoder_model$aux.params <- decoder_model$aux.params[names(decoder_model$aux.params) %in% names(mx.symbol.infer.shape(decoder_output, data = c(2, 7))$aux.shapes)]
X <- t(DAT[,-1])
dim(X) <- c(28, 28, 1, ncol(X))
X <- X/255
Y <- DAT[,1]
zip_code <- predict(encoder_model, X)
plot(zip_code[1,], zip_code[2,], xlab = 'dim 1', ylab = 'dim 2',
pch = 1, cex = 0.5, col = rainbow(10, alpha = 0.5)[Y + 1])
legend('bottomleft', legend = 0:9, pch = 1, col = rainbow(10))
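– Decode the mean latent code of the digit 0: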
my_zip_code <- apply(zip_code[,which(Y == 0)], 1, mean)
dim(my_zip_code) <- c(2, 1)
unzip_pred <- predict(decoder_model, my_zip_code, array.layout = 'colmajor')
unzip_pred[unzip_pred > 1] <- 1
unzip_pred[unzip_pred < 0] <- 0
par(mar = rep(0,4))
plot(NA, xlim = c(0.04, 0.96), ylim = c(0.04, 0.96), xaxt = "n", yaxt = "n", bty = "n")
rasterImage(unzip_pred[,,,1], 0, 0, 1, 1, interpolate=FALSE)
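– Now pick a code far outside the occupied region, say \((-25, -15)\); the "reconstruction" falls apart: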
my_zip_code <- c(-25, -15)
dim(my_zip_code) <- c(2, 1)
unzip_pred <- predict(decoder_model, my_zip_code, array.layout = 'colmajor')
unzip_pred[unzip_pred > 1] <- 1
unzip_pred[unzip_pred < 0] <- 0
par(mar = rep(0,4))
plot(NA, xlim = c(0.04, 0.96), ylim = c(0.04, 0.96), xaxt = "n", yaxt = "n", bty = "n")
rasterImage(unzip_pred[,,,1], 0, 0, 1, 1, interpolate=FALSE)
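– We can also interpolate between the mean codes of 6 and 0 and decode every step along the path: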
zip_code.6 <- apply(zip_code[,which(Y == 6)], 1, mean)
zip_code.0 <- apply(zip_code[,which(Y == 0)], 1, mean)
my_zip_code <- rbind(seq(zip_code.6[1], zip_code.0[1], length.out = 10), seq(zip_code.6[2], zip_code.0[2], length.out = 10))
unzip_pred <- predict(decoder_model, my_zip_code, array.layout = 'colmajor')
unzip_pred[unzip_pred > 1] <- 1
unzip_pred[unzip_pred < 0] <- 0
par(mar = rep(0,4),mfrow = c(2, 5))
for (i in 1:10) {
plot(NA, xlim = c(0.04, 0.96), ylim = c(0.04, 0.96), xaxt = "n", yaxt = "n", bty = "n")
rasterImage(unzip_pred[,,,i], 0, 0, 1, 1, interpolate=FALSE)
}
par(mar = rep(0,4))
zip_code <- predict(encoder_model, X)
plot(zip_code[1,], zip_code[2,],
pch = 1, cex = 0.5, col = rainbow(10, alpha = 0.5)[Y + 1])
legend('bottomleft', legend = 0:9, pch = 1, col = rainbow(10))
points(my_zip_code[1,], my_zip_code[2,], pch = 19, cex = 1.5)
– We already carry a fixed impression of what an "image" should look like; that impression is drawn from similar past experience, so when a new sample doesn't match it, we judge that it "doesn't look right".
– The key problem is that during autoencoder training we have no way of guaranteeing what this "latent code distribution" looks like. How can we enforce one?
– For example, in the example above we could constrain the latent codes to stay as close as possible to the origin \((0, 0)\), in the manner of regularization; the problem may well improve.
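– Written out, the objective below is \(L = \overline{(x - \hat{x})^2} + \lambda\,\overline{z^2}\): the reconstruction MSE plus the mean squared latent code, with weight \(\lambda = 10^{-4}\).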
– The implementation is not hard: we only need to add one loss term. Pay attention to the weight here:
# Encoder
data <- mx.symbol.Variable('data')
fc1 <- mx.symbol.FullyConnected(data = data, num.hidden = 64, name = 'fc1')
relu1 <- mx.symbol.Activation(data = fc1, act_type = "relu", name = 'relu1')
fc2 <- mx.symbol.FullyConnected(data = relu1, num.hidden = 8, name = 'fc2')
relu2 <- mx.symbol.Activation(data = fc2, act_type = "relu", name = 'relu2')
encoder <- mx.symbol.FullyConnected(data = relu2, num.hidden = 2, name = 'encoder')
# Decoder
fc3 <- mx.symbol.FullyConnected(data = encoder, num.hidden = 8, name = 'fc3')
relu3 <- mx.symbol.Activation(data = fc3, act_type = "relu", name = 'relu3')
fc4 <- mx.symbol.FullyConnected(data = relu3, num.hidden = 64, name = 'fc4')
relu4 <- mx.symbol.Activation(data = fc4, act_type = "relu", name = 'relu4')
fc5 <- mx.symbol.FullyConnected(data = relu4, num.hidden = 784, name = 'fc5')
decoder <- mx.symbol.reshape(data = fc5, shape = c(28, 28, 1, -1), name = 'decoder')
# MSE loss
label <- mx.symbol.Variable(name = 'label')
residual <- mx.symbol.broadcast_minus(lhs = label, rhs = decoder)
square_residual <- mx.symbol.square(data = residual)
mean_square_residual <- mx.symbol.mean(data = square_residual, axis = 0:3, keepdims = FALSE)
# Encoder loss
square_encoder <- mx.symbol.square(data = encoder)
mean_square_encoder <- mx.symbol.mean(data = square_encoder, axis = 0:1, keepdims = FALSE)
my_loss <- mx.symbol.MakeLoss(data = mean_square_residual + 1e-4 * mean_square_encoder, name = 'loss')
my.eval.metric.loss <- mx.metric.custom(
name = "my-loss",
function(real, pred) {
return(as.array(pred))
}
)
mx.set.seed(0)
model <- mx.model.FeedForward.create(symbol = my_loss, X = my_iter, optimizer = my_optimizer,
eval.metric = my.eval.metric.loss,
array.batch.size = 20, ctx = mx.gpu(), num.round = 20)
# Encoder
all_layers <- model$symbol$get.internals()
encoder_output <- which(all_layers$outputs == 'encoder_output') %>% all_layers$get.output()
encoder_model <- model
encoder_model$symbol <- encoder_output
encoder_model$arg.params <- encoder_model$arg.params[names(encoder_model$arg.params) %in% names(mx.symbol.infer.shape(encoder_output, data = c(28, 28, 1, 7))$arg.shapes)]
encoder_model$aux.params <- encoder_model$aux.params[names(encoder_model$aux.params) %in% names(mx.symbol.infer.shape(encoder_output, data = c(28, 28, 1, 7))$aux.shapes)]
# Decoder
data <- mx.symbol.Variable('data')
fc3 <- mx.symbol.FullyConnected(data = data, num.hidden = 8, name = 'fc3')
relu3 <- mx.symbol.Activation(data = fc3, act_type = "relu", name = 'relu3')
fc4 <- mx.symbol.FullyConnected(data = relu3, num.hidden = 64, name = 'fc4')
relu4 <- mx.symbol.Activation(data = fc4, act_type = "relu", name = 'relu4')
fc5 <- mx.symbol.FullyConnected(data = relu4, num.hidden = 784, name = 'fc5')
decoder_output <- mx.symbol.reshape(data = fc5, shape = c(28, 28, 1, -1), name = 'decoder')
decoder_model <- model
decoder_model$symbol <- decoder_output
decoder_model$arg.params <- decoder_model$arg.params[names(decoder_model$arg.params) %in% names(mx.symbol.infer.shape(decoder_output, data = c(2, 7))$arg.shapes)]
decoder_model$aux.params <- decoder_model$aux.params[names(decoder_model$aux.params) %in% names(mx.symbol.infer.shape(decoder_output, data = c(2, 7))$aux.shapes)]
X <- t(DAT[,-1])
dim(X) <- c(28, 28, 1, ncol(X))
X <- X/255
Y <- DAT[,1]
zip_code <- predict(encoder_model, X)
plot(zip_code[1,], zip_code[2,], xlab = 'dim 1', ylab = 'dim 2',
pch = 1, cex = 0.5, col = rainbow(10, alpha = 0.5)[Y + 1])
legend('bottomleft', legend = 0:9, pch = 1, col = rainbow(10))
zip_code.6 <- apply(zip_code[,which(Y == 6)], 1, mean)
zip_code.0 <- apply(zip_code[,which(Y == 0)], 1, mean)
my_zip_code <- rbind(seq(zip_code.6[1], zip_code.0[1], length.out = 10), seq(zip_code.6[2], zip_code.0[2], length.out = 10))
unzip_pred <- predict(decoder_model, my_zip_code, array.layout = 'colmajor')
unzip_pred[unzip_pred > 1] <- 1
unzip_pred[unzip_pred < 0] <- 0
par(mar = rep(0,4),mfrow = c(2, 5))
for (i in 1:10) {
plot(NA, xlim = c(0.04, 0.96), ylim = c(0.04, 0.96), xaxt = "n", yaxt = "n", bty = "n")
rasterImage(unzip_pred[,,,i], 0, 0, 1, 1, interpolate=FALSE)
}
– Simply put, a VAE injects noise into the AutoEncoder: the latent code is sampled from a Normal distribution, and this makes the results better.
– For the standard deviation, the network outputs \(\ln(\mathrm{sd})\), which avoids producing negative values.
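– The sampling step below uses the reparameterization trick, \(z = \mu + \sigma \odot \epsilon\) with \(\epsilon \sim N(0, I)\), so gradients can flow back through \(\mu\) and \(\sigma\) despite the randomness.
– For reference, the encoder regularizer in the code maximizes the mean of \(1 + \ln\sigma - \mu^2 - \sigma\), a close relative of the negated standard VAE KL term \(-\mathrm{KL}\big(N(\mu, \sigma^2)\,\|\,N(0, 1)\big) = \tfrac{1}{2}\big(1 + \ln\sigma^2 - \mu^2 - \sigma^2\big)\); either way, it pulls the latent distribution toward \(N(0, 1)\).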
# Encoder
data <- mx.symbol.Variable('data')
fc1 <- mx.symbol.FullyConnected(data = data, num.hidden = 64, name = 'fc1')
relu1 <- mx.symbol.Activation(data = fc1, act_type = "relu", name = 'relu1')
fc2 <- mx.symbol.FullyConnected(data = relu1, num.hidden = 8, name = 'fc2')
relu2 <- mx.symbol.Activation(data = fc2, act_type = "relu", name = 'relu2')
encoder_mean <- mx.symbol.FullyConnected(data = relu2, num.hidden = 2, name = 'encoder_mean')
encoder_lnsd <- mx.symbol.FullyConnected(data = relu2, num.hidden = 2, name = 'encoder_lnsd')
encoder_sd <- mx.symbol.exp(encoder_lnsd, name = 'encoder_sd')
encoder_noise <- mx.symbol.random_normal(loc = 0, scale = 1, shape = c(2, 50))
encoder <- encoder_noise * encoder_sd + encoder_mean
# Decoder
fc3 <- mx.symbol.FullyConnected(data = encoder, num.hidden = 8, name = 'fc3')
relu3 <- mx.symbol.Activation(data = fc3, act_type = "relu", name = 'relu3')
fc4 <- mx.symbol.FullyConnected(data = relu3, num.hidden = 64, name = 'fc4')
relu4 <- mx.symbol.Activation(data = fc4, act_type = "relu", name = 'relu4')
fc5 <- mx.symbol.FullyConnected(data = relu4, num.hidden = 784, name = 'fc5')
decoder <- mx.symbol.reshape(data = fc5, shape = c(28, 28, 1, -1), name = 'decoder')
# MSE loss
label <- mx.symbol.Variable(name = 'label')
residual <- mx.symbol.broadcast_minus(lhs = label, rhs = decoder)
square_residual <- mx.symbol.square(data = residual)
mean_square_residual <- mx.symbol.mean(data = square_residual, axis = 0:3, keepdims = FALSE)
# Encoder loss
square_encoder_mean <- mx.symbol.square(data = encoder_mean)
loss_encoder <- mx.symbol.mean(1 + encoder_lnsd - square_encoder_mean - encoder_sd, axis = 0:1, keepdims = FALSE)
my_loss <- mx.symbol.MakeLoss(data = mean_square_residual - 5e-4 * loss_encoder, name = 'loss')
my.eval.metric.loss <- mx.metric.custom(
name = "my-loss",
function(real, pred) {
return(as.array(pred))
}
)
mx.set.seed(0)
model <- mx.model.FeedForward.create(symbol = my_loss, X = my_iter, optimizer = my_optimizer,
eval.metric = my.eval.metric.loss,
array.batch.size = 20, ctx = mx.gpu(), num.round = 20)
# Encoder
all_layers <- model$symbol$get.internals()
encoder_output <- which(all_layers$outputs == 'encoder_mean_output') %>% all_layers$get.output()
encoder_model <- model
encoder_model$symbol <- encoder_output
encoder_model$arg.params <- encoder_model$arg.params[names(encoder_model$arg.params) %in% names(mx.symbol.infer.shape(encoder_output, data = c(28, 28, 1, 7))$arg.shapes)]
encoder_model$aux.params <- encoder_model$aux.params[names(encoder_model$aux.params) %in% names(mx.symbol.infer.shape(encoder_output, data = c(28, 28, 1, 7))$aux.shapes)]
# Decoder
data <- mx.symbol.Variable('data')
fc3 <- mx.symbol.FullyConnected(data = data, num.hidden = 8, name = 'fc3')
relu3 <- mx.symbol.Activation(data = fc3, act_type = "relu", name = 'relu3')
fc4 <- mx.symbol.FullyConnected(data = relu3, num.hidden = 64, name = 'fc4')
relu4 <- mx.symbol.Activation(data = fc4, act_type = "relu", name = 'relu4')
fc5 <- mx.symbol.FullyConnected(data = relu4, num.hidden = 784, name = 'fc5')
decoder_output <- mx.symbol.reshape(data = fc5, shape = c(28, 28, 1, -1), name = 'decoder')
decoder_model <- model
decoder_model$symbol <- decoder_output
decoder_model$arg.params <- decoder_model$arg.params[names(decoder_model$arg.params) %in% names(mx.symbol.infer.shape(decoder_output, data = c(2, 7))$arg.shapes)]
decoder_model$aux.params <- decoder_model$aux.params[names(decoder_model$aux.params) %in% names(mx.symbol.infer.shape(decoder_output, data = c(2, 7))$aux.shapes)]
X <- t(DAT[,-1])
dim(X) <- c(28, 28, 1, ncol(X))
X <- X/255
Y <- DAT[,1]
zip_code <- predict(encoder_model, X)
plot(zip_code[1,], zip_code[2,], xlab = 'dim 1', ylab = 'dim 2',
pch = 1, cex = 0.5, col = rainbow(10, alpha = 0.5)[Y + 1])
legend('bottomleft', legend = 0:9, pch = 1, col = rainbow(10))
zip_code.6 <- apply(zip_code[,which(Y == 6)], 1, mean)
zip_code.0 <- apply(zip_code[,which(Y == 0)], 1, mean)
my_zip_code <- rbind(seq(zip_code.6[1], zip_code.0[1], length.out = 10), seq(zip_code.6[2], zip_code.0[2], length.out = 10))
unzip_pred <- predict(decoder_model, my_zip_code, array.layout = 'colmajor')
unzip_pred[unzip_pred > 1] <- 1
unzip_pred[unzip_pred < 0] <- 0
par(mar = rep(0,4),mfrow = c(2, 5))
for (i in 1:10) {
plot(NA, xlim = c(0.04, 0.96), ylim = c(0.04, 0.96), xaxt = "n", yaxt = "n", bty = "n")
rasterImage(unzip_pred[,,,i], 0, 0, 1, 1, interpolate=FALSE)
}
– This time let's include the sampling step as well and look at the resulting random latent code distribution:
all_layers <- model$symbol$get.internals()
encoder_mean_output <- which(all_layers$outputs == 'encoder_mean_output') %>% all_layers$get.output()
encoder_sd_output <- which(all_layers$outputs == 'encoder_sd_output') %>% all_layers$get.output()
encoder_mean_model <- model
encoder_mean_model$symbol <- encoder_mean_output
encoder_mean_model$arg.params <- encoder_mean_model$arg.params[names(encoder_mean_model$arg.params) %in% names(mx.symbol.infer.shape(encoder_mean_output, data = c(28, 28, 1, 7))$arg.shapes)]
encoder_mean_model$aux.params <- encoder_mean_model$aux.params[names(encoder_mean_model$aux.params) %in% names(mx.symbol.infer.shape(encoder_mean_output, data = c(28, 28, 1, 7))$aux.shapes)]
encoder_sd_model <- model
encoder_sd_model$symbol <- encoder_sd_output
encoder_sd_model$arg.params <- encoder_sd_model$arg.params[names(encoder_sd_model$arg.params) %in% names(mx.symbol.infer.shape(encoder_sd_output, data = c(28, 28, 1, 7))$arg.shapes)]
encoder_sd_model$aux.params <- encoder_sd_model$aux.params[names(encoder_sd_model$aux.params) %in% names(mx.symbol.infer.shape(encoder_sd_output, data = c(28, 28, 1, 7))$aux.shapes)]
– This process is stochastic; try running it a few times.
zip_mean <- predict(encoder_mean_model, X)
zip_sd <- predict(encoder_sd_model, X)
zip_code <- zip_sd * rnorm(n = length(zip_sd), mean = 0, sd = 1) + zip_mean
plot(zip_code[1,], zip_code[2,], xlab = 'dim 1', ylab = 'dim 2',
pch = 1, cex = 0.5, col = rainbow(10, alpha = 0.5)[Y + 1])
legend('bottomleft', legend = 0:9, pch = 1, col = rainbow(10))
– One caveat: if you want the generative model to produce garbled output less often, increase the weight of the Encoder loss.
– But if you want the images to be sharper, lower the Encoder loss weight, at the cost of garbled output appearing more often during random generation. (The model below raises the weight from 5e-4 to 5e-2.)
# Encoder
data <- mx.symbol.Variable('data')
fc1 <- mx.symbol.FullyConnected(data = data, num.hidden = 128, name = 'fc1')
relu1 <- mx.symbol.Activation(data = fc1, act_type = "relu", name = 'relu1')
encoder_mean <- mx.symbol.FullyConnected(data = relu1, num.hidden = 32, name = 'encoder_mean')
encoder_lnsd <- mx.symbol.FullyConnected(data = relu1, num.hidden = 32, name = 'encoder_lnsd')
encoder_sd <- mx.symbol.exp(encoder_lnsd, name = 'encoder_sd')
encoder_noise <- mx.symbol.random_normal(loc = 0, scale = 1, shape = c(32, 50))
encoder <- encoder_noise * encoder_sd + encoder_mean
# Decoder
fc2 <- mx.symbol.FullyConnected(data = encoder, num.hidden = 128, name = 'fc2')
relu2 <- mx.symbol.Activation(data = fc2, act_type = "relu", name = 'relu2')
fc3 <- mx.symbol.FullyConnected(data = relu2, num.hidden = 784, name = 'fc3')
sigmoid_out <- mx.symbol.Activation(data = fc3, act_type = "sigmoid", name = 'sigmoid_out')
decoder <- mx.symbol.reshape(data = sigmoid_out, shape = c(28, 28, 1, -1), name = 'decoder')
# MSE loss
label <- mx.symbol.Variable(name = 'label')
residual <- mx.symbol.broadcast_minus(lhs = label, rhs = decoder)
square_residual <- mx.symbol.square(data = residual)
mean_square_residual <- mx.symbol.mean(data = square_residual, axis = 0:3, keepdims = FALSE)
# Encoder loss
square_encoder_mean <- mx.symbol.square(data = encoder_mean)
loss_encoder <- mx.symbol.mean(1 + encoder_lnsd - square_encoder_mean - encoder_sd, axis = 0:1, keepdims = FALSE)
my_loss <- mx.symbol.MakeLoss(data = mean_square_residual - 5e-2 * loss_encoder, name = 'loss')
my.eval.metric.loss <- mx.metric.custom(
name = "my-loss",
function(real, pred) {
return(as.array(pred))
}
)
mx.set.seed(0)
model <- mx.model.FeedForward.create(symbol = my_loss, X = my_iter, optimizer = my_optimizer,
eval.metric = my.eval.metric.loss,
array.batch.size = 20, ctx = mx.gpu(), num.round = 20)
data <- mx.symbol.Variable('data')
fc2 <- mx.symbol.FullyConnected(data = data, num.hidden = 128, name = 'fc2')
relu2 <- mx.symbol.Activation(data = fc2, act_type = "relu", name = 'relu2')
fc3 <- mx.symbol.FullyConnected(data = relu2, num.hidden = 784, name = 'fc3')
sigmoid_out <- mx.symbol.Activation(data = fc3, act_type = "sigmoid", name = 'sigmoid_out')
decoder_output <- mx.symbol.reshape(data = sigmoid_out, shape = c(28, 28, 1, -1), name = 'decoder')
decoder_model <- model
decoder_model$symbol <- decoder_output
decoder_model$arg.params <- decoder_model$arg.params[names(decoder_model$arg.params) %in% names(mx.symbol.infer.shape(decoder_output, data = c(32, 7))$arg.shapes)]
decoder_model$aux.params <- decoder_model$aux.params[names(decoder_model$aux.params) %in% names(mx.symbol.infer.shape(decoder_output, data = c(32, 7))$aux.shapes)]
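– Now generate brand-new digits by feeding codes drawn directly from \(N(0, 1)\) into the decoder: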
my_zip_code <- array(rnorm(320, mean = 0, sd = 1), dim = c(32, 10))
unzip_pred <- predict(decoder_model, my_zip_code, array.layout = 'colmajor')
unzip_pred[unzip_pred > 1] <- 1
unzip_pred[unzip_pred < 0] <- 0
par(mar = rep(0,4),mfrow = c(2, 5))
for (i in 1:10) {
plot(NA, xlim = c(0.04, 0.96), ylim = c(0.04, 0.96), xaxt = "n", yaxt = "n", bty = "n")
rasterImage(unzip_pred[,,,i], 0, 0, 1, 1, interpolate=FALSE)
}
– Perhaps you find this disappointing, but turn the thought around: maybe we can exploit this "defect".
– If a sample does not belong to the original distribution, won't its image come out looking strange after the Decoder reconstructs it?
– Maybe we can use this property for anomaly detection!
– In this experiment, we assume the "abnormal" data on hand is extremely scarce: only a small number of 2s, and no 3s at all.
– We also assume that in the future we will face both 2s and 3s, and we must be able to separate them out.
library(data.table)
library(OpenImageR)
DAT <- fread("data/MNIST.csv", data.table = FALSE)
DAT <- data.matrix(DAT)
Normal_idx <- which(!DAT[,1] %in% 2:3)
Abnormal_idx.2 <- which(DAT[,1] %in% 2)
Abnormal_idx.3 <- which(DAT[,1] %in% 3)
set.seed(0)
Train_Normal_idx <- sample(Normal_idx, length(Normal_idx) * 0.6, replace = FALSE)
Train_Abnormal_idx.2 <- sample(Abnormal_idx.2, 10, replace = FALSE)
Train_idx <- sort(c(Train_Normal_idx, Train_Abnormal_idx.2))
Train_X <- t(DAT[Train_idx,-1])
dim(Train_X) <- c(28, 28, 1, ncol(Train_X))
Train_X <- Train_X / 255
Train_Y <- DAT[Train_idx,1]
Test_X <- t(DAT[-Train_idx,-1])
dim(Test_X) <- c(28, 28, 1, ncol(Test_X))
Test_X <- Test_X / 255
Test_Y <- DAT[-Train_idx,1]
my_iterator_core <- function (batch_size) {
batch = 0
batch_per_epoch = dim(Train_X)[4] / batch_size
reset = function() {batch <<- 0}
iter.next = function() {
batch <<- batch+1
if (batch > batch_per_epoch) {return(FALSE)} else {return(TRUE)}
}
value = function() {
pos_idx <- sample(which(Train_Y %in% 2:3), batch_size / 2)
neg_idx <- sample(which(!Train_Y %in% 2:3), batch_size / 2)
idx <- c(pos_idx, neg_idx)
data <- mx.nd.array(Train_X[,,,idx, drop=FALSE])
label <- mx.nd.array(array((Train_Y[idx] %in% 2:3) + 0, dim = c(1, batch_size)))
return(list(data = data, label = label))
}
return(list(reset = reset, iter.next = iter.next, value = value, batch_size = batch_size, batch = batch))
}
my_iterator_func <- setRefClass("Custom_Iter",
fields = c("iter", "batch_size"),
contains = "Rcpp_MXArrayDataIter",
methods = list(
initialize = function(iter, batch_size = 100){
.self$iter <- my_iterator_core(batch_size = batch_size)
.self
},
value = function(){
.self$iter$value()
},
iter.next = function(){
.self$iter$iter.next()
},
reset = function(){
.self$iter$reset()
},
finalize=function(){
}
)
)
my_iter <- my_iterator_func(iter = NULL, batch_size = 20)
# input
data <- mx.symbol.Variable('data')
label = mx.symbol.Variable(name = 'label')
# first conv
conv1 <- mx.symbol.Convolution(data=data, kernel=c(5,5), num_filter=10, name = 'conv1')
relu1 <- mx.symbol.Activation(data=conv1, act_type="relu")
pool1 <- mx.symbol.Pooling(data=relu1, pool_type="max",
kernel=c(2,2), stride=c(2,2))
# second conv
conv2 <- mx.symbol.Convolution(data=pool1, kernel=c(5,5), num_filter=20, name = 'conv2')
relu2 <- mx.symbol.Activation(data=conv2, act_type="relu")
pool2 <- mx.symbol.Pooling(data=relu2, pool_type="max",
kernel=c(2,2), stride=c(2,2))
# first fullc
flatten <- mx.symbol.Flatten(data=pool2)
fc1 <- mx.symbol.FullyConnected(data=flatten, num_hidden=150, name = 'fc1')
relu3 <- mx.symbol.Activation(data=fc1, act_type="relu")
# second fullc
fc2 <- mx.symbol.FullyConnected(data=relu3, num_hidden=1, name = 'fc2')
# logistic
lenet = mx.symbol.sigmoid(data = fc2, name = 'lenet')
eps = 1e-8
ce_loss_pos = mx.symbol.broadcast_mul(mx.symbol.log(lenet + eps), label)
ce_loss_neg = mx.symbol.broadcast_mul(mx.symbol.log(1 - lenet + eps), 1 - label)
ce_loss_mean = 0 - mx.symbol.mean(ce_loss_pos + ce_loss_neg)
ce_loss = mx.symbol.MakeLoss(ce_loss_mean, name = 'ce_loss')
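– This hand-built loss is the usual binary cross-entropy, \(-\frac{1}{n}\sum_i \big[y_i \ln(p_i + \epsilon) + (1 - y_i)\ln(1 - p_i + \epsilon)\big]\), where the small \(\epsilon = 10^{-8}\) guards against \(\ln 0\).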
my.eval.metric.loss <- mx.metric.custom(
name = "my-loss",
function(real, pred) {
return(as.array(pred))
}
)
mx.set.seed(0)
model <- mx.model.FeedForward.create(symbol = ce_loss, X = my_iter, optimizer = my_optimizer,
eval.metric = my.eval.metric.loss,
array.batch.size = 20, ctx = mx.gpu(), num.round = 20)
model$symbol <- lenet
pred_test <- predict(model, Test_X)
library(pROC)
roc_test <- roc((Test_Y %in% 2:3) ~ as.numeric(pred_test))
plot(roc_test)
text(0.5, 0.5, paste0('AUC = ', formatC(roc_test[['auc']], 4, format = 'f')), col = 'red')
– Moreover, it only does reasonably well on 2s and does very poorly on 3s, which shows that if future samples resemble 2 even less, the model will have a hard time recognizing them as anomalous.
par(mfrow = c(1, 2))
roc_test <- roc((Test_Y[Test_Y != 3] %in% 2) ~ pred_test[,Test_Y != 3])
plot(roc_test)
text(0.5, 0.5, paste0('AUC = ', formatC(roc_test[['auc']], 4, format = 'f')), col = 'red')
roc_test <- roc((Test_Y[Test_Y != 2] %in% 3) ~ pred_test[,Test_Y != 2])
plot(roc_test)
text(0.5, 0.5, paste0('AUC = ', formatC(roc_test[['auc']], 4, format = 'f')), col = 'red')
Valid_Normal_idx <- sample(which(!Train_Y %in% 2:3), 1000, replace = FALSE)
Valid_Abnormal_idx <- which(Train_Y %in% 2:3)
Valid_idx <- sort(c(Valid_Normal_idx, Valid_Abnormal_idx))
Valid_X <- Train_X[,,,Valid_idx,drop=FALSE]
Valid_Y <- Train_Y[Valid_idx]
Train_X <- Train_X[,,,-Valid_idx,drop=FALSE]
Train_Y <- Train_Y[-Valid_idx]
my_iterator_core <- function (batch_size) {
batch = 0
batch_per_epoch = dim(Train_X)[4] / batch_size
reset = function() {batch <<- 0}
iter.next = function() {
batch <<- batch+1
if (batch > batch_per_epoch) {return(FALSE)} else {return(TRUE)}
}
value = function() {
idx <- sample(1:dim(Train_X)[4], batch_size)
data <- mx.nd.array(Train_X[,,,idx, drop=FALSE])
label <- mx.nd.array(Train_X[,,,idx, drop=FALSE])
return(list(data = data, label = label))
}
return(list(reset = reset, iter.next = iter.next, value = value, batch_size = batch_size, batch = batch))
}
my_iterator_func <- setRefClass("Custom_Iter",
fields = c("iter", "batch_size"),
contains = "Rcpp_MXArrayDataIter",
methods = list(
initialize = function(iter, batch_size = 100){
.self$iter <- my_iterator_core(batch_size = batch_size)
.self
},
value = function(){
.self$iter$value()
},
iter.next = function(){
.self$iter$iter.next()
},
reset = function(){
.self$iter$reset()
},
finalize=function(){
}
)
)
my_iter <- my_iterator_func(iter = NULL, batch_size = 20)
# Encoder
data <- mx.symbol.Variable('data')
fc1 <- mx.symbol.FullyConnected(data = data, num.hidden = 64, name = 'fc1')
relu1 <- mx.symbol.Activation(data = fc1, act_type = "relu", name = 'relu1')
encoder_mean <- mx.symbol.FullyConnected(data = relu1, num.hidden = 8, name = 'encoder_mean')
encoder_lnsd <- mx.symbol.FullyConnected(data = relu1, num.hidden = 8, name = 'encoder_lnsd')
encoder_sd <- mx.symbol.exp(encoder_lnsd, name = 'encoder_sd')
encoder_noise <- mx.symbol.random_normal(loc = 0, scale = 1, shape = c(8, 20))
encoder <- encoder_noise * encoder_sd + encoder_mean
# Decoder
fc2 <- mx.symbol.FullyConnected(data = encoder, num.hidden = 64, name = 'fc2')
relu2 <- mx.symbol.Activation(data = fc2, act_type = "relu", name = 'relu2')
fc3 <- mx.symbol.FullyConnected(data = relu2, num.hidden = 784, name = 'fc3')
sigmoid_out <- mx.symbol.Activation(data = fc3, act_type = "sigmoid", name = 'sigmoid_out')
decoder <- mx.symbol.reshape(data = sigmoid_out, shape = c(28, 28, 1, -1), name = 'decoder')
# MSE loss
label <- mx.symbol.Variable(name = 'label')
residual <- mx.symbol.broadcast_minus(lhs = label, rhs = decoder)
square_residual <- mx.symbol.square(data = residual)
mean_square_residual <- mx.symbol.mean(data = square_residual, axis = 0:3, keepdims = FALSE)
# Encoder loss
square_encoder_mean <- mx.symbol.square(data = encoder_mean)
loss_encoder <- mx.symbol.mean(1 + encoder_lnsd - square_encoder_mean - encoder_sd, axis = 0:1, keepdims = FALSE)
my_loss <- mx.symbol.MakeLoss(data = mean_square_residual - 5e-4 * loss_encoder, name = 'loss')
my.eval.metric.loss <- mx.metric.custom(
name = "my-loss",
function(real, pred) {
return(as.array(pred))
}
)
mx.set.seed(0)
model <- mx.model.FeedForward.create(symbol = my_loss, X = my_iter, optimizer = my_optimizer,
eval.metric = my.eval.metric.loss,
array.batch.size = 20, ctx = mx.gpu(), num.round = 20)
# Encoder
all_layers <- model$symbol$get.internals()
encoder_output <- which(all_layers$outputs == 'encoder_mean_output') %>% all_layers$get.output()
encoder_model <- model
encoder_model$symbol <- encoder_output
encoder_model$arg.params <- encoder_model$arg.params[names(encoder_model$arg.params) %in% names(mx.symbol.infer.shape(encoder_output, data = c(28, 28, 1, 7))$arg.shapes)]
encoder_model$aux.params <- encoder_model$aux.params[names(encoder_model$aux.params) %in% names(mx.symbol.infer.shape(encoder_output, data = c(28, 28, 1, 7))$aux.shapes)]
# Decoder
data <- mx.symbol.Variable('data')
fc2 <- mx.symbol.FullyConnected(data = data, num.hidden = 64, name = 'fc2')
relu2 <- mx.symbol.Activation(data = fc2, act_type = "relu", name = 'relu2')
fc3 <- mx.symbol.FullyConnected(data = relu2, num.hidden = 784, name = 'fc3')
sigmoid_out <- mx.symbol.Activation(data = fc3, act_type = "sigmoid", name = 'sigmoid_out')
decoder_output <- mx.symbol.reshape(data = sigmoid_out, shape = c(28, 28, 1, -1), name = 'decoder')
decoder_model <- model
decoder_model$symbol <- decoder_output
decoder_model$arg.params <- decoder_model$arg.params[names(decoder_model$arg.params) %in% names(mx.symbol.infer.shape(decoder_output, data = c(8, 7))$arg.shapes)]
decoder_model$aux.params <- decoder_model$aux.params[names(decoder_model$aux.params) %in% names(mx.symbol.infer.shape(decoder_output, data = c(8, 7))$aux.shapes)]
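– Compare originals (top row) with reconstructions (bottom row): normal digits should come back cleanly, while the 2s and 3s should reconstruct poorly.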
Valid_code <- predict(encoder_model, Valid_X)
Valid_img <- predict(decoder_model, Valid_code)
Valid_img[Valid_img > 1] <- 1
Valid_img[Valid_img < 0] <- 0
par(mar = rep(0,4), mfcol = c(2, 5))
for (i in 1:5) {
plot(NA, xlim = c(0.04, 0.96), ylim = c(0.04, 0.96), xaxt = "n", yaxt = "n", bty = "n")
rasterImage(Valid_X[,,,which(!Valid_Y %in% 2:3)[i]], 0, 0, 1, 1, interpolate=FALSE)
plot(NA, xlim = c(0.04, 0.96), ylim = c(0.04, 0.96), xaxt = "n", yaxt = "n", bty = "n")
rasterImage(Valid_img[,,,which(!Valid_Y %in% 2:3)[i]], 0, 0, 1, 1, interpolate=FALSE)
}
par(mar = rep(0,4), mfcol = c(2, 5))
for (i in 1:5) {
plot(NA, xlim = c(0.04, 0.96), ylim = c(0.04, 0.96), xaxt = "n", yaxt = "n", bty = "n")
rasterImage(Valid_X[,,,which(Valid_Y %in% 2:3)[i]], 0, 0, 1, 1, interpolate=FALSE)
plot(NA, xlim = c(0.04, 0.96), ylim = c(0.04, 0.96), xaxt = "n", yaxt = "n", bty = "n")
rasterImage(Valid_img[,,,which(Valid_Y %in% 2:3)[i]], 0, 0, 1, 1, interpolate=FALSE)
}
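– Use the per-image sum of squared reconstruction error as the anomaly score, then pick a cutoff on the validation ROC curve: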
Valid_code <- predict(encoder_model, Valid_X)
Valid_img <- predict(decoder_model, Valid_code)
Valid_diff <- apply((Valid_X - Valid_img)^2, 4, sum)
library(pROC)
roc_valid <- roc((Valid_Y %in% 2:3) ~ Valid_diff)
plot(roc_valid)
text(0.5, 0.5, paste0('AUC = ', formatC(roc_valid[['auc']], 4, format = 'f')), col = 'red')
best_pos <- which.max(roc_valid$sensitivities + roc_valid$specificities)
best_cut <- roc_valid$thresholds[best_pos]
print(best_cut)
## [1] 38.72186
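– The cutoff maximizes sensitivity + specificity (Youden's J statistic) on the validation ROC; we now apply it to the test set: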
Test_code <- predict(encoder_model, Test_X)
Test_img <- predict(decoder_model, Test_code)
Test_diff <- apply((Test_img - Test_X)^2, 4, sum)
library(pROC)
tab_test <- table(factor(Test_diff >= best_cut, levels = c(FALSE, TRUE)), Test_Y %in% 2:3)
sens <- tab_test[2,2] / sum(tab_test[,2])
spec <- tab_test[1,1] / sum(tab_test[,1])
roc_test <- roc((Test_Y %in% 2:3) ~ Test_diff)
plot(roc_test)
points(spec, sens, col = 'red', pch = 19)
text(0.5, 0.5, paste0('Sens = ', formatC(sens, digits = 3, format = 'f'),
'\nSpec = ', formatC(spec, digits = 3, format = 'f'),
'\nAUC = ', formatC(roc_test$auc, digits = 3, format = 'f')), col = 'red')
par(mfrow = c(1, 2))
roc_test.1 <- roc((Test_Y[Test_Y != 2] %in% 2:3) ~ Test_diff[Test_Y != 2])
plot(roc_test.1)
text(0.5, 0.5, paste0('AUC = ', formatC(roc_test.1[['auc']], 4, format = 'f')), col = 'red')
roc_test.2 <- roc((Test_Y[Test_Y != 3] %in% 2:3) ~ Test_diff[Test_Y != 3])
plot(roc_test.2)
text(0.5, 0.5, paste0('AUC = ', formatC(roc_test.2[['auc']], 4, format = 'f')), col = 'red')
– The most interesting part of this lesson is the final anomaly detection: it turns out we can build an anomaly detector this way, and without needing a large number of abnormal samples.
– After seeing a few deep learning applications, you cannot help but admire computer scientists. When you tackle your own research in the future, I hope you too can come up with interesting algorithms to solve your problems!