{ "cells": [ { "cell_type": "raw", "id": "fb28bffb", "metadata": {}, "source": [ "Run in Google Colab" ] }, { "cell_type": "markdown", "id": "bd403cec", "metadata": {}, "source": [ "# Autoencoders in SciKeras\n", "\n", "Autoencoders are an approach that uses neural networks to distill data into its most important features, thereby compressing the data.\n", "We will be following the [Keras tutorial](https://blog.keras.io/building-autoencoders-in-keras.html) on the topic, which goes into much more depth and breadth than we will here.\n", "You are highly encouraged to check out that tutorial if you want to learn about autoencoders in the general sense.\n", "\n", "## Table of contents\n", "\n", "* [1. Setup](#1.-Setup)\n", "* [2. Data](#2.-Data)\n", "* [3. Define Keras Model](#3.-Define-Keras-Model)\n", "* [4. Training](#4.-Training)\n", "* [5. Explore Results](#5.-Explore-Results)\n", "* [6. Deep AutoEncoder](#6.-Deep-AutoEncoder)\n", "\n", "## 1. Setup" ] }, { "cell_type": "code", "execution_count": 1, "id": "651d6e81", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:50:35.382842Z", "iopub.status.busy": "2022-10-14T16:50:35.382570Z", "iopub.status.idle": "2022-10-14T16:50:37.647882Z", "shell.execute_reply": "2022-10-14T16:50:37.647277Z" } }, "outputs": [], "source": [ "try:\n", "    import scikeras\n", "except ImportError:\n", "    !python -m pip install scikeras" ] }, { "cell_type": "markdown", "id": "317207fd", "metadata": {}, "source": [ "Silence TensorFlow logging to keep output succinct." 
] }, { "cell_type": "code", "execution_count": 2, "id": "a0859ea9", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:50:37.654249Z", "iopub.status.busy": "2022-10-14T16:50:37.651703Z", "iopub.status.idle": "2022-10-14T16:50:37.660099Z", "shell.execute_reply": "2022-10-14T16:50:37.659594Z" } }, "outputs": [], "source": [ "import warnings\n", "from tensorflow import get_logger\n", "get_logger().setLevel('ERROR')\n", "warnings.filterwarnings(\"ignore\", message=\"Setting the random state for TF\")" ] }, { "cell_type": "code", "execution_count": 3, "id": "48ba7f7f", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:50:37.665161Z", "iopub.status.busy": "2022-10-14T16:50:37.663625Z", "iopub.status.idle": "2022-10-14T16:50:37.907124Z", "shell.execute_reply": "2022-10-14T16:50:37.905259Z" } }, "outputs": [], "source": [ "import numpy as np\n", "from scikeras.wrappers import KerasClassifier, KerasRegressor\n", "from tensorflow import keras" ] }, { "cell_type": "markdown", "id": "af357d15", "metadata": {}, "source": [ "## 2. Data\n", "\n", "We load the dataset used in the Keras tutorial: MNIST, which consists of 28x28 grayscale images of handwritten digits." 
] }, { "cell_type": "code", "execution_count": 4, "id": "b74cdbad", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:50:37.915145Z", "iopub.status.busy": "2022-10-14T16:50:37.911204Z", "iopub.status.idle": "2022-10-14T16:50:38.443076Z", "shell.execute_reply": "2022-10-14T16:50:38.442472Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(60000, 784)\n", "(10000, 784)\n" ] } ], "source": [ "from tensorflow.keras.datasets import mnist\n", "import numpy as np\n", "\n", "\n", "(x_train, _), (x_test, _) = mnist.load_data()\n", "x_train = x_train.astype('float32') / 255.\n", "x_test = x_test.astype('float32') / 255.\n", "x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:])))\n", "x_test = x_test.reshape((len(x_test), np.prod(x_test.shape[1:])))\n", "print(x_train.shape)\n", "print(x_test.shape)" ] }, { "cell_type": "markdown", "id": "c9f0e289", "metadata": {}, "source": [ "## 3. Define Keras Model\n", "\n", "We will be defining a very simple autoencoder. We define _three_ model architectures:\n", "\n", "1. An encoder: a series of densely connected layers culminating in an \"output\" layer that determines the encoding dimensions.\n", "2. A decoder: takes the output of the encoder as its input and reconstructs the original data.\n", "3. An autoencoder: a chain of the encoder and decoder that directly connects them for training purposes.\n", "\n", "The only variable we give our model is the encoding dimensions, which will be a hyperparameter of our final transformer.\n", "\n", "The encoder and decoder are views of the first/last layers of the autoencoder model.\n", "They'll be directly used in `transform` and `inverse_transform`, so we'll create some SciKeras models with those layers\n", "and save them as `encoder_model_` and `decoder_model_`. 
All three models are created within `_keras_build_fn`.\n", "\n", "For a background on chaining Functional Models like this, see [All models are callable](https://keras.io/guides/functional_api/#all-models-are-callable-just-like-layers) in the Keras docs." ] }, { "cell_type": "code", "execution_count": 5, "id": "b4d6c6c8", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:50:38.447964Z", "iopub.status.busy": "2022-10-14T16:50:38.446816Z", "iopub.status.idle": "2022-10-14T16:50:38.459164Z", "shell.execute_reply": "2022-10-14T16:50:38.458636Z" } }, "outputs": [], "source": [ "from typing import Dict, Any\n", "\n", "from sklearn.base import TransformerMixin\n", "from sklearn.metrics import mean_squared_error\n", "from scikeras.wrappers import BaseWrapper\n", "\n", "\n", "class AutoEncoder(BaseWrapper, TransformerMixin):\n", "    \"\"\"A class that enables transform and fit_transform.\n", "    \"\"\"\n", "\n", "    encoder_model_: BaseWrapper\n", "    decoder_model_: BaseWrapper\n", "\n", "    def _keras_build_fn(self, encoding_dim: int, meta: Dict[str, Any]):\n", "        n_features_in = meta[\"n_features_in_\"]\n", "\n", "        encoder_input = keras.Input(shape=(n_features_in,))\n", "        encoder_output = keras.layers.Dense(encoding_dim, activation='relu')(encoder_input)\n", "        encoder_model = keras.Model(encoder_input, encoder_output)\n", "\n", "        decoder_input = keras.Input(shape=(encoding_dim,))\n", "        decoder_output = keras.layers.Dense(n_features_in, activation='sigmoid', name=\"decoder\")(decoder_input)\n", "        decoder_model = keras.Model(decoder_input, decoder_output)\n", "\n", "        autoencoder_input = keras.Input(shape=(n_features_in,))\n", "        encoded_img = encoder_model(autoencoder_input)\n", "        reconstructed_img = decoder_model(encoded_img)\n", "\n", "        autoencoder_model = keras.Model(autoencoder_input, reconstructed_img)\n", "\n", "        self.encoder_model_ = BaseWrapper(encoder_model, verbose=self.verbose)\n", "        self.decoder_model_ = BaseWrapper(decoder_model, 
verbose=self.verbose)\n", "\n", "        return autoencoder_model\n", "\n", "    def _initialize(self, X, y=None):\n", "        X, _ = super()._initialize(X=X, y=y)\n", "        # encoder_model_ and decoder_model_ share layers (and their weights) with model_.\n", "        # X_tf here comes from random weights, but we only use it to initialize our models\n", "        X_tf = self.encoder_model_.initialize(X).predict(X)\n", "        self.decoder_model_.initialize(X_tf)\n", "        return X, X\n", "\n", "    def initialize(self, X):\n", "        self._initialize(X=X, y=X)\n", "        return self\n", "\n", "    def fit(self, X, *, sample_weight=None) -> \"AutoEncoder\":\n", "        super().fit(X=X, y=X, sample_weight=sample_weight)\n", "        # at this point, encoder_model_ and decoder_model_\n", "        # are both \"fitted\" because they share layers w/ model_\n", "        # which is fit in the above call\n", "        return self\n", "\n", "    def score(self, X) -> float:\n", "        # Note: we use 1-MSE as the score.\n", "        # With MSE, \"smaller is better\", but Scikit-Learn\n", "        # always maximizes the score (e.g. in GridSearch)\n", "        return 1 - mean_squared_error(self.predict(X), X)\n", "\n", "    def transform(self, X) -> np.ndarray:\n", "        X: np.ndarray = self.feature_encoder_.transform(X)\n", "        return self.encoder_model_.predict(X)\n", "\n", "    def inverse_transform(self, X_tf: np.ndarray):\n", "        X: np.ndarray = self.decoder_model_.predict(X_tf)\n", "        return self.feature_encoder_.inverse_transform(X)" ] }, { "cell_type": "markdown", "id": "d68e18e7", "metadata": {}, "source": [ "Next, we wrap the Keras Model with SciKeras. Note that for our encoder/decoder estimators, we do not need to provide a loss function since no training will be done.\n", "We do, however, need to have the `fit_model` and `encoding_dim` parameters so that they are settable by `BaseWrapper.set_params`." 
] }, { "cell_type": "code", "execution_count": 6, "id": "d118b58e", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:50:38.463730Z", "iopub.status.busy": "2022-10-14T16:50:38.462599Z", "iopub.status.idle": "2022-10-14T16:50:38.467264Z", "shell.execute_reply": "2022-10-14T16:50:38.466747Z" } }, "outputs": [], "source": [ "autoencoder = AutoEncoder(\n", "    loss=\"binary_crossentropy\",\n", "    encoding_dim=32,\n", "    random_state=0,\n", "    epochs=5,\n", "    verbose=False,\n", "    optimizer=\"adam\",\n", ")" ] }, { "cell_type": "markdown", "id": "c7e11dc8", "metadata": {}, "source": [ "## 4. Training\n", "\n", "To train the model, we pass the input images as both the features and the target.\n", "This will train the layers to compress the data as accurately as possible between the encoder and decoder.\n", "Note that we only pass the `X` parameter, since we defined the mapping `y=X` in `AutoEncoder.fit` above." ] }, { "cell_type": "code", "execution_count": 7, "id": "820ea287", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:50:38.471704Z", "iopub.status.busy": "2022-10-14T16:50:38.470591Z", "iopub.status.idle": "2022-10-14T16:51:02.857358Z", "shell.execute_reply": "2022-10-14T16:51:02.856591Z" } }, "outputs": [], "source": [ "_ = autoencoder.fit(X=x_train)" ] }, { "cell_type": "markdown", "id": "8f25c617", "metadata": {}, "source": [ "Next, we round trip the test dataset and explore the performance of the autoencoder." ] }, { "cell_type": "code", "execution_count": 8, "id": "143f25d8", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:51:02.861109Z", "iopub.status.busy": "2022-10-14T16:51:02.860739Z", "iopub.status.idle": "2022-10-14T16:51:04.270287Z", "shell.execute_reply": "2022-10-14T16:51:04.269457Z" } }, "outputs": [], "source": [ "roundtrip_imgs = autoencoder.inverse_transform(autoencoder.transform(x_test))" ] }, { "cell_type": "markdown", "id": "06d46f1c", "metadata": {}, "source": [ "## 5. 
Explore Results\n", "\n", "Let's compare our inputs to lossy decoded outputs:" ] }, { "cell_type": "code", "execution_count": 9, "id": "11cb6f22", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:51:04.274317Z", "iopub.status.busy": "2022-10-14T16:51:04.273976Z", "iopub.status.idle": "2022-10-14T16:51:06.274050Z", "shell.execute_reply": "2022-10-14T16:51:06.273262Z" } }, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "import matplotlib.pyplot as plt\n", "\n", "\n", "n = 10  # How many digits we will display\n", "plt.figure(figsize=(20, 4))\n", "for i in range(n):\n", "    # Display original\n", "    ax = plt.subplot(2, n, i + 1)\n", "    plt.imshow(x_test[i].reshape(28, 28))\n", "    plt.gray()\n", "    ax.get_xaxis().set_visible(False)\n", "    ax.get_yaxis().set_visible(False)\n", "\n", "    # Display reconstruction\n", "    ax = plt.subplot(2, n, i + 1 + n)\n", "    plt.imshow(roundtrip_imgs[i].reshape(28, 28))\n", "    plt.gray()\n", "    ax.get_xaxis().set_visible(False)\n", "    ax.get_yaxis().set_visible(False)\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "99cdc8f1", "metadata": {}, "source": [ "What about the compression? Let's check the sizes of the arrays." ] }, { "cell_type": "code", "execution_count": 10, "id": "700216c9", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:51:06.279548Z", "iopub.status.busy": "2022-10-14T16:51:06.277982Z", "iopub.status.idle": "2022-10-14T16:51:06.975837Z", "shell.execute_reply": "2022-10-14T16:51:06.973152Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "x_test size (in MB): 29.91\n", "encoded_imgs size (in MB): 1.22\n", "Compression ratio: 1/25\n" ] } ], "source": [ "encoded_imgs = autoencoder.transform(x_test)\n", "print(f\"x_test size (in MB): {x_test.nbytes/1024**2:.2f}\")\n", "print(f\"encoded_imgs size (in MB): {encoded_imgs.nbytes/1024**2:.2f}\")\n", "cr = round((encoded_imgs.nbytes/x_test.nbytes), 2)\n", "print(f\"Compression ratio: 1/{1/cr:.0f}\")" ] }, { "cell_type": "markdown", "id": "25a30862", "metadata": {}, "source": [ "## 6. Deep AutoEncoder" ] }, { "cell_type": "markdown", "id": "7d92a2f2", "metadata": {}, "source": [ "We can easily expand our model to be a deep autoencoder by adding some hidden layers. 
All we have to do is add a parameter `hidden_layer_sizes` and use it in `_keras_build_fn` to build hidden layers.\n", "For simplicity, we use a single `hidden_layer_sizes` parameter and mirror it across the encoding layers and decoding layers, but there is nothing forcing us to build symmetrical models." ] }, { "cell_type": "code", "execution_count": 11, "id": "ff82ac94", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:51:06.982562Z", "iopub.status.busy": "2022-10-14T16:51:06.980558Z", "iopub.status.idle": "2022-10-14T16:51:06.993827Z", "shell.execute_reply": "2022-10-14T16:51:06.993131Z" } }, "outputs": [], "source": [ "from typing import List\n", "\n", "\n", "class DeepAutoEncoder(AutoEncoder):\n", "    \"\"\"A class that enables transform and fit_transform.\n", "    \"\"\"\n", "\n", "    def _keras_build_fn(self, encoding_dim: int, hidden_layer_sizes: List[int], meta: Dict[str, Any]):\n", "        n_features_in = meta[\"n_features_in_\"]\n", "\n", "        encoder_input = keras.Input(shape=(n_features_in,))\n", "        x = encoder_input\n", "        for layer_size in hidden_layer_sizes:\n", "            x = keras.layers.Dense(layer_size, activation='relu')(x)\n", "        encoder_output = keras.layers.Dense(encoding_dim, activation='relu')(x)\n", "        encoder_model = keras.Model(encoder_input, encoder_output)\n", "\n", "        decoder_input = keras.Input(shape=(encoding_dim,))\n", "        x = decoder_input\n", "        for layer_size in reversed(hidden_layer_sizes):\n", "            x = keras.layers.Dense(layer_size, activation='relu')(x)\n", "        decoder_output = keras.layers.Dense(n_features_in, activation='sigmoid', name=\"decoder\")(x)\n", "        decoder_model = keras.Model(decoder_input, decoder_output)\n", "\n", "        autoencoder_input = keras.Input(shape=(n_features_in,))\n", "        encoded_img = encoder_model(autoencoder_input)\n", "        reconstructed_img = decoder_model(encoded_img)\n", "\n", "        autoencoder_model = keras.Model(autoencoder_input, reconstructed_img)\n", "\n", "        self.encoder_model_ = BaseWrapper(encoder_model, 
verbose=self.verbose)\n", "        self.decoder_model_ = BaseWrapper(decoder_model, verbose=self.verbose)\n", "\n", "        return autoencoder_model" ] }, { "cell_type": "code", "execution_count": 12, "id": "19edd385", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:51:06.997219Z", "iopub.status.busy": "2022-10-14T16:51:06.996857Z", "iopub.status.idle": "2022-10-14T16:51:43.599219Z", "shell.execute_reply": "2022-10-14T16:51:43.598358Z" } }, "outputs": [], "source": [ "deep = DeepAutoEncoder(\n", "    loss=\"binary_crossentropy\",\n", "    encoding_dim=32,\n", "    hidden_layer_sizes=[128],\n", "    random_state=0,\n", "    epochs=5,\n", "    verbose=False,\n", "    optimizer=\"adam\",\n", ")\n", "_ = deep.fit(X=x_train)" ] }, { "cell_type": "code", "execution_count": 13, "id": "f3e5835e", "metadata": { "execution": { "iopub.execute_input": "2022-10-14T16:51:43.603128Z", "iopub.status.busy": "2022-10-14T16:51:43.602884Z", "iopub.status.idle": "2022-10-14T16:51:45.175459Z", "shell.execute_reply": "2022-10-14T16:51:45.174803Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1-MSE for test set (higher is better)\n", "\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "AutoEncoder: 0.9899\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Deep AutoEncoder: 0.9916\n" ] } ], "source": [ "print(\"1-MSE for test set (higher is better)\\n\")\n", "score = autoencoder.score(X=x_test)\n", "print(f\"AutoEncoder: {score:.4f}\")\n", "\n", "score = deep.score(X=x_test)\n", "print(f\"Deep AutoEncoder: {score:.4f}\")" ] }, { "cell_type": "markdown", "id": "8878487a", "metadata": {}, "source": [ "The deep model scores only marginally higher than the simple one (0.9916 vs. 0.9899 in the run recorded above). 
It's possible that, because of the extra trainable variables, our deep model trains more slowly than our simple model, so 5 epochs are not enough to show its full benefit.\n", "\n", "Check out the [Keras tutorial](https://blog.keras.io/building-autoencoders-in-keras.html) to see the difference after 100 epochs of training, as well as more architectures and applications for AutoEncoders!" ] } ], "metadata": { "jupytext": { "formats": "ipynb,md" }, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.14" } }, "nbformat": 4, "nbformat_minor": 5 }