代码拉取完成,页面将自动刷新
同步操作将从 mirrors_bojone/vae 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
#! -*- coding: utf-8 -*-
import numpy as np
from keras.layers import *
from keras.models import Model
from keras import backend as K
import imageio,os
from keras.datasets import mnist
# from keras.datasets import fashion_mnist as mnist
batch_size = 100
latent_dim = 20
epochs = 50
num_classes = 10
img_dim = 28
filters = 16
intermediate_dim = 256
# 加载MNIST数据集
(x_train, y_train_), (x_test, y_test_) = mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
x_train = x_train.reshape((-1, img_dim, img_dim, 1))
x_test = x_test.reshape((-1, img_dim, img_dim, 1))
# 搭建模型
x = Input(shape=(img_dim, img_dim, 1))
h = x
for i in range(2):
filters *= 2
h = Conv2D(filters=filters,
kernel_size=3,
strides=2,
padding='same')(h)
h = LeakyReLU(0.2)(h)
h = Conv2D(filters=filters,
kernel_size=3,
strides=1,
padding='same')(h)
h = LeakyReLU(0.2)(h)
h_shape = K.int_shape(h)[1:]
h = Flatten()(h)
z_mean = Dense(latent_dim)(h) # p(z|x)的均值
z_log_var = Dense(latent_dim)(h) # p(z|x)的方差
encoder = Model(x, z_mean) # 通常认为z_mean就是所需的隐变量编码
z = Input(shape=(latent_dim,))
h = z
h = Dense(np.prod(h_shape))(h)
h = Reshape(h_shape)(h)
for i in range(2):
h = Conv2DTranspose(filters=filters,
kernel_size=3,
strides=1,
padding='same')(h)
h = LeakyReLU(0.2)(h)
h = Conv2DTranspose(filters=filters,
kernel_size=3,
strides=2,
padding='same')(h)
h = LeakyReLU(0.2)(h)
filters //= 2
x_recon = Conv2DTranspose(filters=1,
kernel_size=3,
activation='sigmoid',
padding='same')(h)
decoder = Model(z, x_recon) # 解码器
generator = decoder
z = Input(shape=(latent_dim,))
y = Dense(intermediate_dim, activation='relu')(z)
y = Dense(num_classes, activation='softmax')(y)
classfier = Model(z, y) # 隐变量分类器
# 重参数技巧
def sampling(args):
z_mean, z_log_var = args
epsilon = K.random_normal(shape=(K.shape(z_mean)[0], latent_dim))
return z_mean + K.exp(z_log_var / 2) * epsilon
# 重参数层,相当于给输入加入噪声
z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
x_recon = decoder(z)
y = classfier(z)
class Gaussian(Layer):
"""这是个简单的层,定义q(z|y)中的均值参数,每个类别配一个均值。
然后输出“z - 均值”,为后面计算loss准备。
"""
def __init__(self, num_classes, **kwargs):
self.num_classes = num_classes
super(Gaussian, self).__init__(**kwargs)
def build(self, input_shape):
latent_dim = input_shape[-1]
self.mean = self.add_weight(name='mean',
shape=(self.num_classes, latent_dim),
initializer='zeros')
def call(self, inputs):
z = inputs # z.shape=(batch_size, latent_dim)
z = K.expand_dims(z, 1)
return z - K.expand_dims(self.mean, 0)
def compute_output_shape(self, input_shape):
return (None, self.num_classes, input_shape[-1])
gaussian = Gaussian(num_classes)
z_prior_mean = gaussian(z)
# 建立模型
vae = Model(x, [x_recon, z_prior_mean, y])
# 下面一大通都是为了定义loss
z_mean = K.expand_dims(z_mean, 1)
z_log_var = K.expand_dims(z_log_var, 1)
lamb = 2.5 # 这是重构误差的权重,它的相反数就是重构方差,越大意味着方差越小。
xent_loss = 0.5 * K.mean((x - x_recon)**2, 0)
kl_loss = - 0.5 * (z_log_var - K.square(z_prior_mean))
kl_loss = K.mean(K.batch_dot(K.expand_dims(y, 1), kl_loss), 0)
cat_loss = K.mean(y * K.log(y + K.epsilon()), 0)
vae_loss = lamb * K.sum(xent_loss) + K.sum(kl_loss) + K.sum(cat_loss)
vae.add_loss(vae_loss)
vae.compile(optimizer='adam')
vae.summary()
vae.fit(x_train,
shuffle=True,
epochs=epochs,
batch_size=batch_size,
validation_data=(x_test, None))
means = K.eval(gaussian.mean)
x_train_encoded = encoder.predict(x_train)
y_train_pred = classfier.predict(x_train_encoded).argmax(axis=1)
x_test_encoded = encoder.predict(x_test)
y_test_pred = classfier.predict(x_test_encoded).argmax(axis=1)
def cluster_sample(path, category=0):
"""观察被模型聚为同一类的样本
"""
n = 8
figure = np.zeros((img_dim * n, img_dim * n))
idxs = np.where(y_train_pred == category)[0]
for i in range(n):
for j in range(n):
digit = x_train[np.random.choice(idxs)]
digit = digit.reshape((img_dim, img_dim))
figure[i * img_dim: (i + 1) * img_dim,
j * img_dim: (j + 1) * img_dim] = digit
imageio.imwrite(path, figure * 255)
def random_sample(path, category=0, std=1):
"""按照聚类结果进行条件随机生成
"""
n = 8
figure = np.zeros((img_dim * n, img_dim * n))
for i in range(n):
for j in range(n):
noise_shape = (1, latent_dim)
z_sample = np.array(np.random.randn(*noise_shape)) * std + means[category]
x_recon = generator.predict(z_sample)
digit = x_recon[0].reshape((img_dim, img_dim))
figure[i * img_dim: (i + 1) * img_dim,
j * img_dim: (j + 1) * img_dim] = digit
imageio.imwrite(path, figure * 255)
if not os.path.exists('samples'):
os.mkdir('samples')
for i in range(10):
cluster_sample(u'samples/聚类类别_%s.png' % i, i)
random_sample(u'samples/类别采样_%s.png' % i, i)
right = 0.
for i in range(10):
_ = np.bincount(y_train_[y_train_pred == i])
right += _.max()
print 'train acc: %s' % (right / len(y_train_))
right = 0.
for i in range(10):
_ = np.bincount(y_test_[y_test_pred == i])
right += _.max()
print 'test acc: %s' % (right / len(y_test_))
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。