In [None]:
!mkdir dichasus
!wget --content-disposition https://darus.uni-stuttgart.de/api/access/datafile/:persistentId?persistentId=doi:10.18419/darus-2202/2 -P dichasus # dichasus-0152
!wget --content-disposition https://darus.uni-stuttgart.de/api/access/datafile/:persistentId?persistentId=doi:10.18419/darus-2202/3 -P dichasus # dichasus-0153
!wget --content-disposition https://darus.uni-stuttgart.de/api/access/datafile/:persistentId?persistentId=doi:10.18419/darus-2202/4 -P dichasus # dichasus-0154

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

# TFRecord files the training dataset is read from
training_files = ["dichasus/dichasus-0152.tfrecords", "dichasus/dichasus-0153.tfrecords"]
# TFRecord file the test dataset is read from (also used as validation data during training)
test_files = ["dichasus/dichasus-0154.tfrecords"]

# Features extracted from every serialized example. Both are scalar byte strings
# (defaulting to an empty string) that record_parse_function decodes with tf.io.parse_tensor.
feature_description = {
	"pos-tachy": tf.io.FixedLenFeature([], tf.string, default_value = ''),
	"csi": tf.io.FixedLenFeature([], tf.string, default_value = ''),
}

def record_parse_function(proto):
	"""Decode one serialized example into (csi_UL, csi_DL, position).

	Args:
		proto: Scalar string tensor holding one serialized example, as yielded
			by tf.data.TFRecordDataset.

	Returns:
		Tuple (csi_UL, csi_DL, pos):
		- csi_UL: float32, shape (32, 8, 2). The first 8 averaged subcarrier
		  chunks, scaled so the whole slice has unit norm.
		- csi_DL: float32, shape (32, 2). The averaged subcarrier chunk at
		  index -5, scaled so the slice has unit norm.
		- pos: float64, first two entries of the "pos-tachy" tensor.
	"""
	record = tf.io.parse_single_example(proto, feature_description)

	pos = tf.io.parse_tensor(record["pos-tachy"], out_type = tf.float64)
	csi_tensor = tf.ensure_shape(tf.io.parse_tensor(record["csi"], out_type = tf.float32), (32, 1024, 2))
	csi_tensor = tf.signal.fftshift(csi_tensor, axes = 1)

	# Average neighboring subcarriers: split axis 1 into (chunk, position within chunk)
	# and take the mean over the positions inside each chunk.
	chunksize = 32
	featurecount = csi_tensor.shape[1] // chunksize
	csi_chunked = tf.reshape(csi_tensor, (csi_tensor.shape[0], featurecount, chunksize, csi_tensor.shape[2]))
	csi_averaged = tf.math.reduce_mean(csi_chunked, axis = 2)

	# Normalize each slice by ITS OWN norm. Dividing by the norm of the un-averaged
	# csi_tensor (sliced at raw subcarrier indices) does not yield unit-norm outputs
	# and makes the scale depend on unrelated subcarriers.
	uplink_chunks = csi_averaged[:, :8]
	downlink_chunk = csi_averaged[:, -5]
	csi_UL = uplink_chunks / tf.norm(uplink_chunks)
	csi_DL = downlink_chunk / tf.norm(downlink_chunk)

	return csi_UL, csi_DL, pos[:2]

def _cached_shuffled_dataset(tfrecord_paths, shuffle_buffer = 100000):
	"""Read TFRecord files, parse every record, cache the parsed samples and shuffle them."""
	parsed = tf.data.TFRecordDataset(tfrecord_paths).map(record_parse_function)
	# Cache before shuffling so that only the parsing result is cached, not one fixed order
	return parsed.cache().shuffle(buffer_size = shuffle_buffer)

training_set = _cached_shuffled_dataset(training_files)
test_set = _cached_shuffled_dataset(test_files)

In [None]:
# Gather the position of every test set sample into one (N, 2) array
positions = np.vstack([pos for _, _, pos in test_set])

# Scatter plot of all positions with equally scaled axes
fig, ax = plt.subplots(figsize = (8, 8))
ax.set_title("Ground Truth Positions", fontsize = 16, pad = 16)
ax.scatter(positions[:, 0], positions[:, 1], marker = ".")
ax.axis("equal")
ax.tick_params(axis = "both", which = "major", labelsize = 16)
ax.set_xlabel("$x$ coordinate [m]", fontsize = 16)
ax.set_ylabel("$y$ coordinate [m]", fontsize = 16)
plt.show()

In [None]:
def normalize_csi(channelvectors):
	"""Scale complex channel vectors to unit Euclidean norm along the last axis.

	Args:
		channelvectors: Complex tensor; each vector along the last axis is normalized.

	Returns:
		Tensor of the same shape, divided by the per-vector norm (cast to complex64).
	"""
	squared_magnitudes = tf.abs(channelvectors)**2
	# keepdims retains the reduced axis so the norm broadcasts against the input
	vector_norm = tf.sqrt(tf.reduce_sum(squared_magnitudes, axis = -1, keepdims = True))
	return tf.math.divide(channelvectors, tf.cast(vector_norm, tf.complex64))

def loss_power(y_true, y_pred):
	"""Loss: one minus the batch mean of |<h_true, h_pred>|^2 for unit-norm channel vectors.

	Args:
		y_true: True channel; the last axis holds (real, imaginary) parts.
		y_pred: Predicted channel in the same layout as y_true.

	Returns:
		Scalar tensor, 1 - mean over the batch of the normalized power.
	"""
	# Combine real / imaginary parts into complex vectors and scale them to unit norm
	true_channel = normalize_csi(tf.complex(y_true[..., 0], y_true[..., 1]))
	predicted_channel = normalize_csi(tf.complex(y_pred[..., 0], y_pred[..., 1]))

	# Squared magnitude of the inner product between true and predicted vector
	correlation = tf.reduce_sum(true_channel * tf.math.conj(predicted_channel), axis = 1)
	power = tf.square(tf.abs(correlation))

	# The loss is computed over the whole batch, hence the mean
	return 1 - tf.reduce_mean(power)

In [None]:
# Number of neurons in the encoder's output layer ("Encoder_Output"),
# i.e. the dimensionality of the latent space
latent_space_dimensionality = 2

In [None]:
# Encoder: flattens the (32, 8, 2) input and compresses it through dense layers
# down to latent_space_dimensionality linear output neurons.
encoder_input = tf.keras.layers.Input((32, 8, 2), name="Encoder_Input")

nn_output = tf.keras.layers.Flatten()(encoder_input)
nn_output = tf.keras.layers.Dense(units = 256, activation = "relu")(nn_output)
nn_output = tf.keras.layers.Dense(units = 128, activation = "relu")(nn_output)
nn_output = tf.keras.layers.Dense(units = 64, activation = "relu")(nn_output)
nn_output = tf.keras.layers.Dense(units = 32, activation = "relu")(nn_output)
encoder_output = tf.keras.layers.Dense(units = latent_space_dimensionality, activation = "linear", name = "Encoder_Output")(nn_output)

encoder = tf.keras.Model(inputs = [encoder_input], outputs = encoder_output, name = "encoder")

# Decoder: expands the latent representation to a (32, 2) output.
# It must start from encoder_output (not from the last hidden encoder layer):
# otherwise the "Encoder_Output" layer is not part of `model`, is never trained,
# and `encoder` would only return the output of a randomly initialized layer.
nn_output = tf.keras.layers.Dense(units = 16, activation = "relu")(encoder_output)
nn_output = tf.keras.layers.Dense(units = 32, activation = "relu")(nn_output)
nn_output = tf.keras.layers.Dense(units = 64, activation = "relu")(nn_output)
nn_output = tf.keras.layers.Dense(units = 32 * 2, activation = "linear", name = "Decoder_Output")(nn_output)
decoder_output = tf.keras.layers.Reshape((32, 2))(nn_output)

# Full encoder / decoder model; shares all encoder layers with `encoder`
model = tf.keras.Model(inputs = [encoder_input], outputs = decoder_output, name = "EncoderDecoder")

In [None]:
# Configure the encoder / decoder model for training:
# Adam optimizer with default settings, custom power-based loss
model.compile(
	loss = loss_power,
	optimizer = tf.keras.optimizers.Adam(),
)

def remove_pos(csi_ul, csi_dl, pos):
	"""Drop the position from a dataset element, leaving the (input, target) pair.

	Args:
		csi_ul: First element of the dataset tuple (model input).
		csi_dl: Second element of the dataset tuple (training target).
		pos: Third element of the dataset tuple; ignored.

	Returns:
		Tuple (csi_ul, csi_dl).
	"""
	return csi_ul, csi_dl

# Train in several consecutive stages, each using a larger batch size than the one before
batch_sizes = [32, 64, 128, 256, 512, 1024]
for b in batch_sizes:
	print("\nBatch Size:", b)

	# The batched datasets are kept in named variables; they remain defined after the loop
	training_set_batched = training_set.batch(b)
	test_set_batched = test_set.batch(b) 

	# The model is fitted on (input, target) pairs only, so the positions are stripped
	training_pairs = training_set_batched.map(remove_pos)
	validation_pairs = test_set_batched.map(remove_pos)
	model.fit(training_pairs, epochs = 10, validation_data = validation_pairs)

In [None]:
def power(y_true, y_pred):
	"""Per-sample normalized power |<h_true, h_pred>|^2 between unit-norm channel vectors.

	Args:
		y_true: True channel; the last axis holds (real, imaginary) parts.
		y_pred: Predicted channel in the same layout as y_true.

	Returns:
		Real tensor with one power value per sample (sum taken over axis 1).
	"""
	true_channel = normalize_csi(tf.complex(y_true[..., 0], y_true[..., 1]))
	predicted_channel = normalize_csi(tf.complex(y_pred[..., 0], y_pred[..., 1]))

	correlation = tf.reduce_sum(true_channel * tf.math.conj(predicted_channel), axis = 1)
	return tf.square(tf.abs(correlation))

# Evaluate the model on the test set in batches of 1000 samples:
# for every batch, keep the per-sample normalized power and the matching positions
evaluation_batches = [
	(power(csi_DL, model.predict(csi_UL)).numpy(), pos.numpy())
	for csi_UL, csi_DL, pos in test_set.batch(1000)
]

# Concatenate the per-batch results into one array each
powers = np.hstack([batch_powers for batch_powers, _ in evaluation_batches])
positions = np.vstack([batch_positions for _, batch_positions in evaluation_batches])

In [None]:
# Spatial map of the normalized power in dB, color scale clipped to [-15, 0] dB
fig, ax = plt.subplots(figsize = (8, 8))
ax.set_title("Normalized receive power $P$",  fontsize = 16, pad = 16)

powers_db = 10 * np.log10(powers)
hexbins = ax.hexbin(x = positions[:, 0], y = positions[:, 1], C = powers_db, vmin = -15, vmax = 0, gridsize = 28)
ax.axis("equal")
ax.tick_params(axis = "both", which = "major", labelsize = 16)
ax.set_xlabel("$x$ coordinate [m]", fontsize = 16)
ax.set_ylabel("$y$ coordinate [m]", fontsize = 16)

cb = fig.colorbar(hexbins, ax = ax)
cb.set_label("Normalized Power $P$ [dB]", fontsize = 16)
cb.ax.tick_params(labelsize = 16)

plt.show()

In [None]:
# Histogram of the normalized power in dB over all test samples (100 bins in [-15, 0] dB)
fig, ax = plt.subplots(figsize = (15, 4))
ax.set_title("$P$: Histogram", fontsize = 16, pad = 16)
ax.set_xlabel("Normalized Power [dB]", fontsize = 16)
ax.set_ylabel("Occurences", fontsize = 16)
ax.tick_params(axis = "both", labelsize = 16)

powers_db = 10 * np.log10(powers)
ax.hist(powers_db, range = (-15, 0), bins = 100)
plt.show()

In [None]:
latentspace = []
positions = []

# Batch the test set explicitly instead of relying on a loop variable left over
# from the training cell, so this cell does not depend on hidden kernel state.
for csi_UL, csi_DL, pos in test_set.batch(1000):
	# Encoder output (latent variables) for this batch, together with the matching positions
	latentspace.append(encoder.predict(csi_UL))
	positions.append(pos.numpy())

# Concatenate the per-batch results into one array each
latentspace = np.vstack(latentspace)
positions = np.vstack(positions)

In [None]:
# One figure per latent variable: its value as a function of the position
for neuron in range(latent_space_dimensionality):
	plt.figure(figsize = (8, 8))
	# Braces around the index keep multi-digit indices entirely inside the subscript
	plt.title(r"Latent variable $\tilde x_{" + str(neuron) + "}$", fontsize = 16, pad=16)

	plt.hexbin(x = positions[:,0], y = positions[:,1], C = latentspace[:, neuron], gridsize = 35, bins=None)
	plt.tick_params(axis = "both", labelsize = 16)

	cb = plt.colorbar()
	cb.set_label("Value in Latent Space", fontsize=16)
	cb.ax.tick_params(labelsize = 16)
	plt.xlabel("$x$ coordinate [m]", fontsize = 16)
	plt.ylabel("$y$ coordinate [m]", fontsize = 16)

	plt.show()