hypergan.gans.aligned_gan module
import importlib
import json
import numpy as np
import os
import sys
import time
import uuid
import copy

from hypergan.discriminators import *
from hypergan.encoders import *
from hypergan.generators import *
from hypergan.inputs import *
from hypergan.samplers import *
from hypergan.trainers import *

import hyperchamber as hc
from hyperchamber import Config
from hypergan.ops import TensorflowOps
import tensorflow as tf
import hypergan as hg

from hypergan.gan_component import ValidationException, GANComponent
from hypergan.trainers.multi_step_trainer import MultiStepTrainer
from .base_gan import BaseGAN
from .standard_gan import StandardGAN


class AlignedGAN(BaseGAN):
    def required(self):
        return ["generator", "discriminator"]

    def create(self):
        config = self.config
        ops = self.ops

        self.session = self.ops.new_session(self.ops_config)

        encoder_config = dict(config.input_encoder)
        encode_a = self.create_component(encoder_config)
        encode_a.ops.describe("encode_a")
        encode_b = self.create_component(encoder_config)
        encode_b.ops.describe("encode_b")

        g_ab = self.create_component(config.generator)
        g_ab.ops.describe("g_ab")
        g_ba = self.create_component(config.generator)
        g_ba.ops.describe("g_ba")

        #encode_a.ops = g_ab.ops
        #encode_b.ops = g_ba.ops

        encode_a.create(self.inputs.xa)
        encode_b.create(self.inputs.xb)
        g_ab.create(encode_a.sample)
        g_ba.create(encode_b.sample)

        self.xba = g_ba.sample
        self.xab = g_ab.sample

        discriminator_a = self.create_component(config.discriminator)
        discriminator_b = self.create_component(config.discriminator)
        discriminator_a.ops.describe("discriminator_a")
        discriminator_b.ops.describe("discriminator_b")
        discriminator_a.create(x=self.inputs.xa, g=g_ba.sample)
        discriminator_b.create(x=self.inputs.xb, g=g_ab.sample)

        encode_g_ab = encode_b.reuse(g_ab.sample)
        encode_g_ba = encode_a.reuse(g_ba.sample)
        cyca = g_ba.reuse(encode_g_ab)
        cycb = g_ab.reuse(encode_g_ba)

        lossa = self.create_component(config.loss, discriminator=discriminator_a, generator=g_ba)
        lossb = self.create_component(config.loss, discriminator=discriminator_b, generator=g_ab)
        lossa.create()
        lossb.create()

        cycloss = tf.reduce_mean(tf.abs(self.inputs.xa - cyca)) + \
                  tf.reduce_mean(tf.abs(self.inputs.xb - cycb))

        # loss terms
        cycloss_lambda = config.cycloss_lambda
        if cycloss_lambda is None:
            cycloss_lambda = 10
        cycloss *= cycloss_lambda

        loss1 = ('generator', cycloss + lossb.g_loss)
        loss2 = ('discriminator', lossb.d_loss)
        loss3 = ('generator', cycloss + lossa.g_loss)
        loss4 = ('discriminator', lossa.d_loss)

        var_lists = []
        var_lists.append(encode_a.variables() + g_ab.variables())
        var_lists.append(discriminator_b.variables())
        var_lists.append(encode_b.variables() + g_ba.variables())
        var_lists.append(discriminator_a.variables())

        metrics = []
        metrics.append(lossa.metrics)
        metrics.append(None)
        metrics.append(lossb.metrics)
        metrics.append(None)

        self.trainer = MultiStepTrainer(self, self.config.trainer, [loss1, loss2, loss3, loss4], var_lists=var_lists, metrics=metrics)
        self.trainer.create()

        self.cyca = cyca
        self.cycb = cycb
        self.cycloss = cycloss
        self.encoder = encode_a
        self.generator = g_ab

        self.session.run(tf.global_variables_initializer())

    def step(self, feed_dict={}):
        return self.trainer.step(feed_dict)
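The create() method above wires domains A and B into a translation cycle: each input is encoded, decoded into the other domain, re-encoded, and decoded back. Below is a minimal, framework-free sketch of that data flow and of the L1 cycle-consistency term; the identity lambdas and random NumPy arrays are placeholders, not hypergan components, and cycloss_lambda defaults to 10 as in the code above.

    import numpy as np

    # Placeholder encoders/decoders; identity functions stand in for the
    # hypergan components built in create().
    encode_a = lambda xa: xa          # encoder for domain A
    encode_b = lambda xb: xb          # encoder for domain B
    g_ab     = lambda za: za          # decodes an A-encoding into domain B
    g_ba     = lambda zb: zb          # decodes a B-encoding into domain A

    xa = np.random.rand(4, 32, 32, 3)   # batch from domain A (illustrative shape)
    xb = np.random.rand(4, 32, 32, 3)   # batch from domain B

    xab  = g_ab(encode_a(xa))         # A -> B translation (self.xab)
    xba  = g_ba(encode_b(xb))         # B -> A translation (self.xba)
    cyca = g_ba(encode_b(xab))        # A -> B -> A reconstruction (cyca)
    cycb = g_ab(encode_a(xba))        # B -> A -> B reconstruction (cycb)

    # L1 cycle-consistency loss, scaled by cycloss_lambda (defaults to 10 above).
    cycloss_lambda = 10
    cycloss = np.mean(np.abs(xa - cyca)) + np.mean(np.abs(xb - cycb))
    cycloss *= cycloss_lambda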
Classes
class AlignedGAN
GANComponents are pluggable pieces within a GAN.
GAN objects are also GANComponents.
Ancestors (in MRO)
- AlignedGAN
- hypergan.gans.base_gan.BaseGAN
- hypergan.gan_component.GANComponent
- builtins.object
Methods
def __init__(self, config=None, inputs=None, device='/gpu:0', ops_config=None, ops_backend=TensorflowOps, batch_size=None, width=None, height=None, channels=None)

Initializes a new GAN.

def __init__(self, config=None, inputs=None, device='/gpu:0', ops_config=None, ops_backend=TensorflowOps,
             batch_size=None, width=None, height=None, channels=None):
    """ Initializes a new GAN."""
    self.inputs = inputs
    self.device = device
    self.ops_backend = ops_backend
    self.ops_config = ops_config
    self.created = False
    self.components = []
    self._batch_size = batch_size
    self._width = width
    self._height = height
    self._channels = channels

    if config == None:
        config = hg.Configuration.default()

    # A GAN as a component has a parent of itself
    # gan.gan.gan.gan.gan.gan
    GANComponent.__init__(self, self, config)
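A hedged usage sketch of the call sequence, assuming hypergan is installed and that an input component exposing the paired tensors xa, xb (and x) already exists; my_paired_inputs and the step count are hypothetical, and the default configuration may need aligned-GAN specific keys (input_encoder, generator, discriminator, loss, trainer):

    import hypergan as hg
    from hypergan.gans.aligned_gan import AlignedGAN

    config = hg.Configuration.default()   # same fallback __init__ uses when config is None
    inputs = my_paired_inputs             # hypothetical: must provide .xa, .xb and .x tensors
    gan = AlignedGAN(config=config, inputs=inputs)
    gan.create()                          # builds encoders, generators, discriminators, trainer

    for i in range(1000):                 # arbitrary number of training steps
        gan.step()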
def batch_size(self)

def batch_size(self):
    if self._batch_size:
        return self._batch_size
    if self.inputs == None:
        raise ValidationException("gan.batch_size() requested but no inputs provided")
    return self.ops.shape(self.inputs.x)[0]
def biases(self)

Biases of the GAN component.

def biases(self):
    """
    Biases of the GAN component.
    """
    return self.ops.biases
def channels(self)

def channels(self):
    if self._channels:
        return self._channels
    if self.inputs == None:
        raise ValidationException("gan.channels() requested but no inputs provided")
    return self.ops.shape(self.inputs.x)[-1]
def create(self)

def create(self):
    config = self.config
    ops = self.ops

    self.session = self.ops.new_session(self.ops_config)

    encoder_config = dict(config.input_encoder)
    encode_a = self.create_component(encoder_config)
    encode_a.ops.describe("encode_a")
    encode_b = self.create_component(encoder_config)
    encode_b.ops.describe("encode_b")

    g_ab = self.create_component(config.generator)
    g_ab.ops.describe("g_ab")
    g_ba = self.create_component(config.generator)
    g_ba.ops.describe("g_ba")

    #encode_a.ops = g_ab.ops
    #encode_b.ops = g_ba.ops

    encode_a.create(self.inputs.xa)
    encode_b.create(self.inputs.xb)
    g_ab.create(encode_a.sample)
    g_ba.create(encode_b.sample)

    self.xba = g_ba.sample
    self.xab = g_ab.sample

    discriminator_a = self.create_component(config.discriminator)
    discriminator_b = self.create_component(config.discriminator)
    discriminator_a.ops.describe("discriminator_a")
    discriminator_b.ops.describe("discriminator_b")
    discriminator_a.create(x=self.inputs.xa, g=g_ba.sample)
    discriminator_b.create(x=self.inputs.xb, g=g_ab.sample)

    encode_g_ab = encode_b.reuse(g_ab.sample)
    encode_g_ba = encode_a.reuse(g_ba.sample)
    cyca = g_ba.reuse(encode_g_ab)
    cycb = g_ab.reuse(encode_g_ba)

    lossa = self.create_component(config.loss, discriminator=discriminator_a, generator=g_ba)
    lossb = self.create_component(config.loss, discriminator=discriminator_b, generator=g_ab)
    lossa.create()
    lossb.create()

    cycloss = tf.reduce_mean(tf.abs(self.inputs.xa - cyca)) + \
              tf.reduce_mean(tf.abs(self.inputs.xb - cycb))

    # loss terms
    cycloss_lambda = config.cycloss_lambda
    if cycloss_lambda is None:
        cycloss_lambda = 10
    cycloss *= cycloss_lambda

    loss1 = ('generator', cycloss + lossb.g_loss)
    loss2 = ('discriminator', lossb.d_loss)
    loss3 = ('generator', cycloss + lossa.g_loss)
    loss4 = ('discriminator', lossa.d_loss)

    var_lists = []
    var_lists.append(encode_a.variables() + g_ab.variables())
    var_lists.append(discriminator_b.variables())
    var_lists.append(encode_b.variables() + g_ba.variables())
    var_lists.append(discriminator_a.variables())

    metrics = []
    metrics.append(lossa.metrics)
    metrics.append(None)
    metrics.append(lossb.metrics)
    metrics.append(None)

    self.trainer = MultiStepTrainer(self, self.config.trainer, [loss1, loss2, loss3, loss4], var_lists=var_lists, metrics=metrics)
    self.trainer.create()

    self.cyca = cyca
    self.cycb = cycb
    self.cycloss = cycloss
    self.encoder = encode_a
    self.generator = g_ab

    self.session.run(tf.global_variables_initializer())
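create() hands MultiStepTrainer four (loss, variable-list) pairs, so a training step alternates between updating encoder+generator weights against a generator loss and updating each discriminator against its own loss. The snippet below is a plain-Python restatement of that pairing, not the trainer's implementation:

    # Which loss updates which variables, in the order passed to MultiStepTrainer.
    # (names mirror create(); this is a description, not executable graph code)
    schedule = [
        ("generator",     "cycloss + lossb.g_loss", "encode_a + g_ab variables"),
        ("discriminator", "lossb.d_loss",           "discriminator_b variables"),
        ("generator",     "cycloss + lossa.g_loss", "encode_b + g_ba variables"),
        ("discriminator", "lossa.d_loss",           "discriminator_a variables"),
    ]
    for role, loss, variables in schedule:
        print(f"{role:>13}: minimize {loss:<24} over {variables}")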
def create_component(self, defn, *args, **kw_args)

def create_component(self, defn, *args, **kw_args):
    if defn == None:
        return None
    if defn['class'] == None:
        raise ValidationException("Component definition is missing 'class'")
    gan_component = defn['class'](self, defn, *args, **kw_args)
    self.components.append(gan_component)
    return gan_component
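create_component() expects a dict-like definition whose 'class' entry is the component class, and instantiates it as defn['class'](gan, defn, ...). A minimal stand-in showing that calling convention (StubComponent is hypothetical, not a hypergan class):

    class StubComponent:
        # Mimics the constructor signature create_component() relies on:
        # the parent GAN, the definition dict, plus any extra arguments.
        def __init__(self, gan, config, **kw_args):
            self.gan = gan
            self.config = config
            self.kw_args = kw_args

    defn = {"class": StubComponent, "activation": "lrelu"}   # illustrative keys
    component = defn["class"](None, defn)                    # what create_component() does internally
    print(type(component).__name__, component.config["activation"])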
def create_ops(self, config)

Create the ops object as `self.ops`. Also looks up config.

def create_ops(self, config):
    """
    Create the ops object as `self.ops`. Also looks up config
    """
    if self.gan is None:
        return
    if self.gan.ops_backend is None:
        return
    self.ops = self.gan.ops_backend(config=self.config, device=self.gan.device)
    self.config = self.gan.ops.lookup(config)
def fully_connected_from_list(self, nets)

def fully_connected_from_list(self, nets):
    results = []
    ops = self.ops
    for net, net2 in nets:
        net = ops.concat([net, net2], axis=3)
        shape = ops.shape(net)
        bs = shape[0]
        net = ops.reshape(net, [bs, -1])
        features = ops.shape(net)[1]
        net = ops.linear(net, features)
        #net = self.layer_regularizer(net)
        net = ops.lookup('lrelu')(net)
        #net = ops.linear(net, features)
        net = ops.reshape(net, shape)
        results.append(net)
    return results
def get_config_value(self, symbol)

def get_config_value(self, symbol):
    if symbol in self.config:
        config = hc.Config(hc.lookup_functions(self.config[symbol]))
        return config
    return None
def height(self)

def height(self):
    if self._height:
        return self._height
    if self.inputs == None:
        raise ValidationException("gan.height() requested but no inputs provided")
    return self.ops.shape(self.inputs.x)[1]
def layer_regularizer(self, net)

def layer_regularizer(self, net):
    symbol = self.config.layer_regularizer
    op = self.gan.ops.lookup(symbol)
    if op:
        net = op(self, net)
    return net
def load(self, save_file)

def load(self, save_file):
    save_file = os.path.expanduser(save_file)
    if os.path.isfile(save_file) or os.path.isfile(save_file + ".index"):
        print("[hypergan] |= Loading network from " + save_file)
        dir = os.path.dirname(save_file)
        print("[hypergan] |= Loading checkpoint from " + dir)
        ckpt = tf.train.get_checkpoint_state(os.path.expanduser(dir))
        if ckpt and ckpt.model_checkpoint_path:
            saver = tf.train.Saver()
            saver.restore(self.session, save_file)
            loadedFromSave = True
            return True
        else:
            return False
    else:
        return False
def permute(self, nets, k)

def permute(self, nets, k):
    return list(itertools.permutations(nets, k))
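permute() is a thin wrapper over itertools.permutations; for k=2 it yields every ordered pair of distinct elements, which relation_layer() below uses to relate spatial slices to each other:

    import itertools

    nets = ["n0", "n1", "n2"]
    print(list(itertools.permutations(nets, 2)))
    # [('n0', 'n1'), ('n0', 'n2'), ('n1', 'n0'), ('n1', 'n2'), ('n2', 'n0'), ('n2', 'n1')]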
def relation_layer(self, net)

def relation_layer(self, net):
    ops = self.ops

    #hack
    shape = ops.shape(net)
    input_size = shape[1]*shape[2]*shape[3]

    netlist = self.split_by_width_height(net)
    permutations = self.permute(netlist, 2)
    permutations = self.fully_connected_from_list(permutations)
    net = ops.concat(permutations, axis=3)

    #hack
    bs = ops.shape(net)[0]
    net = ops.reshape(net, [bs, -1])
    net = ops.linear(net, input_size)
    net = ops.reshape(net, shape)

    return net
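A rough shape walk-through of relation_layer() for a hypothetical [batch, 4, 4, 64] activation, following split_by_width_height(), permute() and fully_connected_from_list() above (pure arithmetic, no TensorFlow):

    bs, h, w, c = 8, 4, 4, 64

    elements = h * w                    # split_by_width_height -> 16 slices of [8, 1, 1, 64]
    pairs = elements * (elements - 1)   # permute(netlist, 2)    -> 240 ordered pairs

    # fully_connected_from_list: each pair is concatenated to [8, 1, 1, 128],
    # flattened, passed through a linear layer of the same width plus lrelu,
    # and reshaped back; relation_layer then concatenates all pairs on channels.
    concat_channels = pairs * 2 * c     # [8, 1, 1, 30720]

    # A final linear layer maps the flattened result back to h*w*c and reshapes.
    print(elements, pairs, concat_channels, (bs, h, w, c))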
def required(self)

Returns a list of required config strings. A ValidationException
will be thrown if any are missing.

Example:

    class MyComponent(GANComponent):
        def required(self):
            "learn_rate is required"
            return ["learn_rate"]

def required(self):
    return ["generator", "discriminator"]
def reuse(self, net)

def reuse(self, net):
    self.ops.reuse()
    net = self.build(net)
    self.ops.stop_reuse()
    return net
def save(self, save_file)

def save(self, save_file):
    print("[hypergan] Saving network to ", save_file)
    os.makedirs(os.path.expanduser(os.path.dirname(save_file)), exist_ok=True)
    saver = tf.train.Saver()
    saver.save(self.session, save_file)
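Continuing the construction sketch under __init__ above, checkpointing only needs a file path; load() (documented earlier) returns False when no checkpoint exists. The gan variable and path here are carried over from that sketch and are illustrative:

    save_path = "~/.hypergan/saves/aligned_example/model.ckpt"   # hypothetical location
    gan.save(save_path)                 # writes TensorFlow checkpoint files for the session
    if not gan.load(save_path):         # True on success, False if nothing to restore
        print("no checkpoint found")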
def split_batch(self, net, count=2)

Discriminators return stacked results (on axis 0).
This splits the results. Returns [d_real, d_fake].

def split_batch(self, net, count=2):
    """
    Discriminators return stacked results (on axis 0).
    This splits the results. Returns [d_real, d_fake]
    """
    ops = self.ops or self.gan.ops
    s = ops.shape(net)
    bs = s[0]
    nets = []
    net = ops.reshape(net, [bs, -1])
    start = [0 for x in ops.shape(net)]
    for i in range(count):
        size = [bs//count] + [x for x in ops.shape(net)[1:]]
        nets.append(ops.slice(net, start, size))
        start[0] += bs//count
    return nets
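A NumPy stand-in for split_batch(): a discriminator fed real and generated samples stacked on axis 0 returns one tensor, and splitting it in half along the batch axis recovers [d_real, d_fake] (shapes are illustrative):

    import numpy as np

    stacked = np.arange(12, dtype=np.float32).reshape(6, 2)   # pretend discriminator output
    d_real, d_fake = np.split(stacked, 2, axis=0)             # same effect as split_batch(net, 2)
    print(d_real.shape, d_fake.shape)                         # (3, 2) (3, 2)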
def split_by_width_height(self, net)

def split_by_width_height(self, net):
    elems = []
    ops = self.gan.ops
    shape = ops.shape(net)
    bs = shape[0]
    height = shape[1]
    width = shape[2]
    for i in range(width):
        for j in range(height):
            elems.append(ops.slice(net, [0, i, j, 0], [bs, 1, 1, -1]))
    return elems
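A NumPy illustration of split_by_width_height(): a [batch, height, width, channels] activation becomes height*width slices of shape [batch, 1, 1, channels], one per spatial position (the shape is chosen arbitrarily):

    import numpy as np

    net = np.zeros((8, 4, 4, 64), dtype=np.float32)
    elems = [net[:, i:i + 1, j:j + 1, :] for i in range(4) for j in range(4)]
    print(len(elems), elems[0].shape)   # 16 (8, 1, 1, 64)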
def step(self, feed_dict={})

def step(self, feed_dict={}):
    return self.trainer.step(feed_dict)
def validate(self)

Validates a GANComponent. Return an array of error messages. Empty array []
means success.

def validate(self):
    """
    Validates a GANComponent. Return an array of error messages. Empty array `[]` means success.
    """
    errors = []
    required = self.required()
    for argument in required:
        if(self.config.__getattr__(argument) == None):
            errors.append("`"+argument+"` required")

    if(self.gan is None):
        errors.append("GANComponent constructed without GAN")
    return errors
def variables(self)

All variables associated with this component.

def variables(self):
    """
    All variables associated with this component.
    """
    return self.ops.variables()
def weights(self)

The weights of the GAN component.

def weights(self):
    """
    The weights of the GAN component.
    """
    return self.ops.weights
def width(self)

def width(self):
    if self._width:
        return self._width
    if self.inputs == None:
        raise ValidationException("gan.width() requested but no inputs provided")
    return self.ops.shape(self.inputs.x)[2]