The Math Behind LSTM

Author:Murphy  |  View: 22650  |  Time: 2025-03-22 21:51:36

Index

1: Introduction

2: The Architecture of LSTMs2.1: LSTM Gates2.2: Managing Information Flow

3: Mathematics Behind LSTMs3.1. Math in LSTM Cells:3.2: Gating Mechanisms3.3: Contrast with Basic RNN Cell Operations

4: Building an LSTM from Scratch in Python4.1: Imports and Initial Setup4:2: LSTM Class4.3: Training and Validation4.4: Data Preprocessing4.5: Model Training

5: Advanced LSTM Models5.1: Bidirectional LSTMs5.2: Stacked LSTMs5.3: Peephole Connections

6: Conclusion

References


1: Introduction

In our last discussion on Recurrent Neural Networks (RNNs), we looked at how their design lets them process sequences effectively. This makes them perfect for tasks where the sequence and context of data matter, like analyzing time-series data or processing language.

Now, we're moving on to a type of RNN that tackles one of the big challenges traditional RNNs face: managing long-term data dependencies. These are the Long Short-Term Memory Networks (LSTMs), which are a step up in complexity. They use a system of gates that control how information flows through the network – deciding what to keep and what to forget over extended sequences.

LSTMs are incredibly useful across a wide range of AI applications. Whether it's forecasting stock prices or powering sophisticated translation tools, they provide a strong framework that enhances the accuracy and reliability of models that handle sequential data.

By the end of this article, you'll have a solid grasp of how LSTMs are built and the math that powers them. You'll also learn how to code these powerful networks from scratch in Python. Let's dive in!

2: The Architecture of LSTMs

The heart of an LSTM is its cell, which includes several gates that regulate how information moves within it. The input, output, and forget gates help LSTMs excel in tasks where the data points are spread out over time.

2.1: LSTM Gates

LSTM Architecture – Image by Author

Input Gate This gate decides how much new information to keep in the cell state. It uses a sigmoid function to filter the incoming data, determining which values need to be updated.

Forget Gate This gate determines how much old information to remove from the cell state. It also employs a sigmoid function to review the current inputs and past outputs, deciding whether to retain or discard previous states. This helps keep the network free from unnecessary or irrelevant data.

Output Gate This gate manages what the next hidden layer or the output will receive from the cell state. It applies a tanh function to the filtered cell state to scale the values, deciding based on the sigmoid's state what should be passed on to the output.

2.2: Managing Information Flow

The strength of an LSTM in holding onto important information for long periods comes from its clever design. The cell state acts like a conveyor belt, carrying essential information throughout the sequence processing. Each gate in the LSTM can either add or remove information from the cell state, finely tuning the system to handle long-term dependencies effectively.

The use of sigmoid functions in these gates ensures that updates are precise and controlled. This is crucial for addressing the vanishing gradient problem, which can cause gradients to shrink too much during training, making it hard for the network to learn from earlier data points. Thanks to this architecture, LSTMs can learn and remember information across much longer sequences than traditional RNNs, which often lose track of earlier inputs as they move through the sequence.

3: Mathematics Behind LSTMs

LSTM networks are enhanced with a complex system that includes multiple gates, each responsible for regulating the flow of information in a manner that traditional RNNs are not capable of. Here, we'll explore the mathematical operations that make this possible, delve into the formulas governing the gates, and contrast these with the operations in a basic RNN cell to underscore the improvements LSTMs offer.

3.1. Math in LSTM Cells:

An LSTM cell contains four main components that work together to update and maintain the cell state: the forget gate _ft​, input gate _it​, cell state _Ct​, and output gate _ot​. Here's how each component is calculated at time step t:

Forget Gate _f_t_This gate decides which parts of the previous cell state C__(t−_1)​ are to be forgotten.​

Forget Gate Formula – Image by Author

Where:

  • σ is the sigmoid function, which outputs a value between 0 and 1. This value multiplies the previous cell state C(t_−1​), effectively deciding the extent to which each component of the cell state is remembered or forgotten. A value close to 0 means "forget it almost completely," while a value close to 1 means "retain it entirely."
  • _Wf is the weight matrix for the forget gate.
  • h(t_−1)​ is the output from the previous time step.
  • _xt is the current input.
  • _bf is the bias term for the forget gate.

Input Gate _it​ and Candidate Cell State _C_t_The input gate determines which values will be updated in the cell state, while the candidate cell state _Ct​ represents a filtered version of the input data, prepared to be potentially added to the actual cell state.

Input Gate and Candidate Cell State Formulas – Image by Author

Where:

  • σ again represents the sigmoid function, controlling the input gate.
  • tanh⁡ is the hyperbolic tangent function, which outputs values between -1 and 1. It helps regulate the network's non-linear characteristics.
  • _W_i​ and W_C_​ are the weight matrices for the input gate and the candidate cell state, respectively.
  • _bi and _b_C_​ are the biases for the input gate and the candidate cell state.

Cell State Update _C_t_The cell state is an element-wise addition of the old state multiplied by the forget gate and the candidate state multiplied by the input gate.

Cell State Update Formula – Image by Author

The previous cell state C(t_−1​) multiplied by the forget gate output _ft​ determines how much the old state can retain. The candidate cell state _Ct​ multiplied by the input gate output _it​ determines how much of the new state to add.

Output Gate _ot​ and Output _h_t_Finally, the output gate controls the parts of the cell state that are output to the next layer or used in the final prediction. The output is calculated as follows:

Output Formula – Image by Author

Where:

  • σ is the sigmoid function used for the output gate.
  • tanh applied to _Ct​ scales the cell state values to be between -1 and 1.
  • _W_o​ is the weight matrix for the output gate, and b_o_​ is the bias.

3.2: Gating Mechanisms

The gating mechanisms of LSTMs are designed to combat the vanishing gradient problem by controlling the flow of gradients during backpropagation. The gates use two primary functions to manage this flow: the sigmoid function and the hyperbolic tangent function (tanh). These functions are not just chosen arbitrarily; their mathematical properties are ideally suited to the tasks of gating and updating neural network cell states.

Sigmoid Function σ(x)This function outputs a value between 0 and 1, which is ideal for creating a gate-like behavior.

Sigmoid Function – Image by Author

The sigmoid function outputs values between 0 and 1. This characteristic is ideal for gating because a value close to 0 can block the component (acting like a gate is closed), and a value close to 1 can allow the component to pass through (acting like a gate is open).

Sigmoid Plot – Image by Author

In the context of LSTMs, the sigmoid function is used in the forget gate to decide which parts of the previous cell state to keep or discard, and in the input and output gates to regulate the contribution of new input data and the outputting of cell state information respectively.

Tanh Function ⁡tanh(x)This function outputs a value between -1 and 1, which helps keep the network's activations normalized.

tanh formula – Image by Author

The tanh function outputs values between -1 and 1. This range is beneficial for neural network activations because it centers the output, helping to maintain the mean of the activations throughout the network close to zero, which in turn aids in faster convergence during training.

tanh Plot – Image by Author

The tanh function is primarily used in two places within the LSTM cell. First, it helps to create a candidate cell state, which is a filtered version of the input data, potentially added to the cell state if the input gate allows it. Second, it transforms the final cell state output to be within the range of -1 to 1 before being modulated by the output gate.

If you are interested in learning more about these and other activation functions, take a look at this previous article:

The Math Behind Neural Networks

3.3: Contrast with Basic RNN Cell Operations

In a basic RNN cell, the hidden state _ht​ is updated using a simple formula without any gating mechanisms:

LSTM's hidden state formula – Image by Author

This update rule can lead to the vanishing gradient problem because the derivative of the tanh function can be very small, leading to gradients that diminish exponentially as they are propagated backward through time.

In contrast, LSTM's use of gates regulates the updates and mitigates the risk of vanishing gradients by selectively updating the cell state. This controlled updating process ensures that important information is retained over longer sequences, significantly enhancing the memory capabilities of the network.

4: Building an LSTM from Scratch in Python

In this section, we'll break down the implementation of an LSTM in Python, step by step, referring back to the mathematical foundations and concepts covered earlier in the article. We will train our made-from-scratch model on the Google stock data. The dataset was retrieved from Kaggle, which is free to use for commercial use. While the code will give you the option to insert your API key in case you would like to test it on a different dataset, getting your API key is not necessary, and you can directly download the dataset from my GitHub repo. Make sure to download the LSTM folder containing demo.ipynb, src, and data:

models-from-scratch-python/LSTM at main · cristianleoo/models-from-scratch-python

4.1: Imports and Initial Setup

import numpy as np
import pandas as pd

from src.model import WeightInitializer
from src.trainer import PlotManager, EarlyStopping

numpy (np) and pandas (pd) Used for all array and data frame operations, which are fundamental in any kind of numerical computation and particularly in the implementation of neural networks.

The classes WeightInitializer, PlotManager, and EarlyStopping are custom classes that I built from scratch, but I won't dive deep into their explanation, as I explained them in another article. If you are not familiar with those, I will link you to the articles where you can gain a better understanding.

WeightInitializer

class WeightInitializer:
    def __init__(self, method='random'):
        self.method = method

    def initialize(self, shape):
        if self.method == 'random':
            return np.random.randn(*shape)
        elif self.method == 'xavier':
            return np.random.randn(*shape) / np.sqrt(shape[0])
        elif self.method == 'he':
            return np.random.randn(*shape) * np.sqrt(2 / shape[0])
        elif self.method == 'uniform':
            return np.random.uniform(-1, 1, shape)
        else:
            raise ValueError(f'Unknown initialization method: {self.method}')

WeightInitializer is a custom class that handles the initialization of weights. This is crucial as different initialization methods can significantly affect the convergence behavior of an LSTM. To learn more about weight initialization and how it can help achieve a better and faster convergence, take a look at this article:

The Math Behind Fine-Tuning Deep Neural Networks

PlotManager

class PlotManager:
    def __init__(self):
        self.fig, self.ax = plt.subplots(3, 1, figsize=(6, 4))

    def plot_losses(self, train_losses, val_losses):
        self.ax.plot(train_losses, label='Training Loss')
        self.ax.plot(val_losses, label='Validation Loss')
        self.ax.set_title('Training and Validation Losses')
        self.ax.set_xlabel('Epoch')
        self.ax.set_ylabel('Loss')
        self.ax.legend()

    def show_plots(self):
        plt.tight_layout()

Utility class from src.trainer for managing plots, which will enable us to plot train and validation loss.

EarlyStopping

class EarlyStopping:
    """
    Early stopping to stop the training when the loss does not improve after

    Args:
    -----
        patience (int): Number of epochs to wait before stopping the training.
        verbose (bool): If True, prints a message for each epoch where the loss
                        does not improve.
        delta (float): Minimum change in the monitored quantity to qualify as an improvement.
    """
    def __init__(self, patience=7, verbose=False, delta=0):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.delta = delta

    def __call__(self, val_loss):
        """
        Determines if the model should stop training.

        Args:
            val_loss (float): The loss of the model on the validation set.
        """
        score = -val_loss

        if self.best_score is None:
            self.best_score = score

        elif score < self.best_score + self.delta:
            self.counter += 1

            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.counter = 0

Utility class from src.trainer for handling early stopping during training to prevent overfitting. You can learn more about EarlyStopping, and how it's functionality is extremely useful for deep neural networks in this article:

The Math Behind Deep CNN – AlexNet

4:2: LSTM Class

Let's first take a look at what the whole class looks like, and then break it down into more manageable steps:

class LSTM:
    """
    Long Short-Term Memory (LSTM) network.

    Parameters:
    - input_size: int, dimensionality of input space
    - hidden_size: int, number of LSTM units
    - output_size: int, dimensionality of output space
    - init_method: str, weight initialization method (default: 'xavier')
    """
    def __init__(self, input_size, hidden_size, output_size, init_method='xavier'):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.weight_initializer = WeightInitializer(method=init_method)

        # Initialize weights
        self.wf = self.weight_initializer.initialize((hidden_size, hidden_size + input_size))
        self.wi = self.weight_initializer.initialize((hidden_size, hidden_size + input_size))
        self.wo = self.weight_initializer.initialize((hidden_size, hidden_size + input_size))
        self.wc = self.weight_initializer.initialize((hidden_size, hidden_size + input_size))

        # Initialize biases
        self.bf = np.zeros((hidden_size, 1))
        self.bi = np.zeros((hidden_size, 1))
        self.bo = np.zeros((hidden_size, 1))
        self.bc = np.zeros((hidden_size, 1))

        # Initialize output layer weights and biases
        self.why = self.weight_initializer.initialize((output_size, hidden_size))
        self.by = np.zeros((output_size, 1))

    @staticmethod
    def sigmoid(z):
        """
        Sigmoid activation function.

        Parameters:
        - z: np.ndarray, input to the activation function

        Returns:
        - np.ndarray, output of the activation function
        """
        return 1 / (1 + np.exp(-z))

    @staticmethod
    def dsigmoid(y):
        """
        Derivative of the sigmoid activation function.

        Parameters:
        - y: np.ndarray, output of the sigmoid activation function

        Returns:
        - np.ndarray, derivative of the sigmoid function
        """
        return y * (1 - y)

    @staticmethod
    def dtanh(y):
        """
        Derivative of the hyperbolic tangent activation function.

        Parameters:
        - y: np.ndarray, output of the hyperbolic tangent activation function

        Returns:
        - np.ndarray, derivative of the hyperbolic tangent function
        """
        return 1 - y * y

    def forward(self, x):
        """
        Forward pass through the LSTM network.

        Parameters:
        - x: np.ndarray, input to the network

        Returns:
        - np.ndarray, output of the network
        - list, caches containing intermediate values for backpropagation
        """
        caches = []
        h_prev = np.zeros((self.hidden_size, 1))
        c_prev = np.zeros((self.hidden_size, 1))
        h = h_prev
        c = c_prev

        for t in range(x.shape[0]):
            x_t = x[t].reshape(-1, 1)
            combined = np.vstack((h_prev, x_t))

            f = self.sigmoid(np.dot(self.wf, combined) + self.bf)
            i = self.sigmoid(np.dot(self.wi, combined) + self.bi)
            o = self.sigmoid(np.dot(self.wo, combined) + self.bo)
            c_ = np.tanh(np.dot(self.wc, combined) + self.bc)

            c = f * c_prev + i * c_
            h = o * np.tanh(c)

            cache = (h_prev, c_prev, f, i, o, c_, x_t, combined, c, h)
            caches.append(cache)

            h_prev, c_prev = h, c

        y = np.dot(self.why, h) + self.by
        return y, caches

    def backward(self, dy, caches, clip_value=1.0):
        """
        Backward pass through the LSTM network.

        Parameters:
        - dy: np.ndarray, gradient of the loss with respect to the output
        - caches: list, caches from the forward pass
        - clip_value: float, value to clip gradients to (default: 1.0)

        Returns:
        - tuple, gradients of the loss with respect to the parameters
        """
        dWf, dWi, dWo, dWc = [np.zeros_like(w) for w in (self.wf, self.wi, self.wo, self.wc)]
        dbf, dbi, dbo, dbc = [np.zeros_like(b) for b in (self.bf, self.bi, self.bo, self.bc)]
        dWhy = np.zeros_like(self.why)
        dby = np.zeros_like(self.by)

        # Ensure dy is reshaped to match output size
        dy = dy.reshape(self.output_size, -1)
        dh_next = np.zeros((self.hidden_size, 1))  # shape must match hidden_size
        dc_next = np.zeros_like(dh_next)

        for cache in reversed(caches):
            h_prev, c_prev, f, i, o, c_, x_t, combined, c, h = cache

            # Add gradient from next step to current output gradient
            dh = np.dot(self.why.T, dy) + dh_next
            dc = dc_next + (dh * o * self.dtanh(np.tanh(c)))

            df = dc * c_prev * self.dsigmoid(f)
            di = dc * c_ * self.dsigmoid(i)
            do = dh * self.dtanh(np.tanh(c))
            dc_ = dc * i * self.dtanh(c_)

            dcombined_f = np.dot(self.wf.T, df)
            dcombined_i = np.dot(self.wi.T, di)
            dcombined_o = np.dot(self.wo.T, do)
            dcombined_c = np.dot(self.wc.T, dc_)

            dcombined = dcombined_f + dcombined_i + dcombined_o + dcombined_c
            dh_next = dcombined[:self.hidden_size]
            dc_next = f * dc

            dWf += np.dot(df, combined.T)
            dWi += np.dot(di, combined.T)
            dWo += np.dot(do, combined.T)
            dWc += np.dot(dc_, combined.T)

            dbf += df.sum(axis=1, keepdims=True)
            dbi += di.sum(axis=1, keepdims=True)
            dbo += do.sum(axis=1, keepdims=True)
            dbc += dc_.sum(axis=1, keepdims=True)

        dWhy += np.dot(dy, h.T)
        dby += dy

        gradients = (dWf, dWi, dWo, dWc, dbf, dbi, dbo, dbc, dWhy, dby)

        # Gradient clipping
        for i in range(len(gradients)):
            np.clip(gradients[i], -clip_value, clip_value, out=gradients[i])

        return gradients

    def update_params(self, grads, learning_rate):
        """
        Update the parameters of the network using the gradients.

        Parameters:
        - grads: tuple, gradients of the loss with respect to the parameters
        - learning_rate: float, learning rate
        """
        dWf, dWi, dWo, dWc, dbf, dbi, dbo, dbc, dWhy, dby = grads

        self.wf -= learning_rate * dWf
        self.wi -= learning_rate * dWi
        self.wo -= learning_rate * dWo
        self.wc -= learning_rate * dWc

        self.bf -= learning_rate * dbf
        self.bi -= learning_rate * dbi
        self.bo -= learning_rate * dbo
        self.bc -= learning_rate * dbc

        self.why -= learning_rate * dWhy
        self.by -= learning_rate * dby

InitializationThe __init__ method initializes an LSTM instance with specified sizes for input, hidden, and output layers, and selects a method for weight initialization.

def __init__(self, input_size, hidden_size, output_size, init_method='xavier'):
    self.input_size = input_size
    self.hidden_size = hidden_size
    self.output_size = output_size
    self.weight_initializer = WeightInitializer(method=init_method)

    # Initialize weights
    self.wf = self.weight_initializer.initialize((hidden_size, hidden_size + input_size))
    self.wi = self.weight_initializer.initialize((hidden_size, hidden_size + input_size))
    self.wo = self.weight_initializer.initialize((hidden_size, hidden_size + input_size))
    self.wc = self.weight_initializer.initialize((hidden_size, hidden_size + input_size))

    # Initialize biases
    self.bf = np.zeros((hidden_size, 1))
    self.bi = np.zeros((hidden_size, 1))
    self.bo = np.zeros((hidden_size, 1))
    self.bc = np.zeros((hidden_size, 1))

    # Initialize output layer weights and biases
    self.why = self.weight_initializer.initialize((output_size, hidden_size))
    self.by = np.zeros((output_size, 1))

The weights are initialized for the gates (forget wf, input wi, output wo, and cell wc) and for connecting the last hidden state to the output (why). Xavier initialization is often chosen as it's a good default for maintaining the variance of activations across layers.

Biases for all gates and the output layer are initialized to zero. This is a common practice, although sometimes small constants are added to avoid dead neurons at the start.

Forward Pass Method

def forward(self, x):
    caches = []
    h_prev = np.zeros((self.hidden_size, 1))
    c_prev = np.zeros((self.hidden_size, 1))
    h = h_prev
    c = c_prev

    for t in range(x.shape[0]):
        x_t = x[t].reshape(-1, 1)
        combined = np.vstack((h_prev, x_t))

        f = self.sigmoid(np.dot(self.wf, combined) + self.bf)
        i = self.sigmoid(np.dot(self.wi, combined) + self.bi)
        o = self.sigmoid(np.dot(self.wo, combined) + self.bo)
        c_ = np.tanh(np.dot(self.wc, combined) + self.bc)

        c = f * c_prev + i * c_
        h = o * np.tanh(c)

        cache = (h_prev, c_prev, f, i, o, c_, x_t, combined, c, h)
        caches.append(cache)

        h_prev, c_prev = h, c

    y = np.dot(self.why, h) + self.by
    return y, caches

We start by setting the previous hidden state h_prev and cell state c_prev to zero, which is typical for the first timestep.

def forward(self, x):
    caches = []
    h_prev = np.zeros((self.hidden_size, 1))
    c_prev = np.zeros((self.hidden_size, 1))
    h = h_prev
    c = c_prev

The input x is processed timestep by timestep, where each timestep updates the gates' activations, the cell state, and the hidden state.

for t in range(x.shape[0]):
    x_t = x[t].reshape(-1, 1)
    combined = np.vstack((h_prev, x_t))

At each time step, the input and the previous hidden state are stacked vertically to form a single combined input for matrix operations. This is crucial for performing the linear transformations efficiently in one go.

f = self.sigmoid(np.dot(self.wf, combined) + self.bf)
    i = self.sigmoid(np.dot(self.wi, combined) + self.bi)
    o = self.sigmoid(np.dot(self.wo, combined) + self.bo)
    c_ = np.tanh(np.dot(self.wc, combined) + self.bc)

    c = f * c_prev + i * c_
    h = o * np.tanh(c)

Each gate (forget, input, output) computes its activation using a sigmoid function, influencing how the cell state and the hidden state are updated.

Here, the forget gate (f) determines the amount of the previous cell state to retain. The input gate (i) decides how much of the new candidate cell state (c_) to add. Finally, the output gate (o) calculates what portion of the cell state to output as the hidden state.

The cell state is updated as a weighted sum of the previous state and the new candidate state. The hidden state is derived by passing the updated cell state through a tanh function and then gating it with the output gate.

cache = (h_prev, c_prev, f, i, o, c_, x_t, combined, c, h)
caches.append(cache)

We store relevant values needed for backpropagation in cache. This includes states, gate activations, and inputs.

y = np.dot(self.why, h) + self.by

Finally, the output y is computed as a linear transformation of the last hidden state. The method returns both the output and the cached values for use during backpropagation.

Backward Pass Method

def backward(self, dy, caches, clip_value=1.0):
    dWf, dWi, dWo, dWc = [np.zeros_like(w) for w in (self.wf, self.wi, self.wo, self.wc)]
    dbf, dbi, dbo, dbc = [np.zeros_like(b) for b in (self.bf, self.bi, self.bo, self.bc)]
    dWhy = np.zeros_like(self.why)
    dby = np.zeros_like(self.by)

    # Ensure dy is reshaped to match output size
    dy = dy.reshape(self.output_size, -1)
    dh_next = np.zeros((self.hidden_size, 1))  # shape must match hidden_size
    dc_next = np.zeros_like(dh_next)

    for cache in reversed(caches):
        h_prev, c_prev, f, i, o, c_, x_t, combined, c, h = cache

        # Add gradient from next step to current output gradient
        dh = np.dot(self.why.T, dy) + dh_next
        dc = dc_next + (dh * o * self.dtanh(np.tanh(c)))

        df = dc * c_prev * self.dsigmoid(f)
        di = dc * c_ * self.dsigmoid(i)
        do = dh * self.dtanh(np.tanh(c))
        dc_ = dc * i * self.dtanh(c_)

        dcombined_f = np.dot(self.wf.T, df)
        dcombined_i = np.dot(self.wi.T, di)
        dcombined_o = np.dot(self.wo.T, do)
        dcombined_c = np.dot(self.wc.T, dc_)

        dcombined = dcombined_f + dcombined_i + dcombined_o + dcombined_c
        dh_next = dcombined[:self.hidden_size]
        dc_next = f * dc

        dWf += np.dot(df, combined.T)
        dWi += np.dot(di, combined.T)
        dWo += np.dot(do, combined.T)
        dWc += np.dot(dc_, combined.T)

        dbf += df.sum(axis=1, keepdims=True)
        dbi += di.sum(axis=1, keepdims=True)
        dbo += do.sum(axis=1, keepdims=True)
        dbc += dc_.sum(axis=1, keepdims=True)

    dWhy += np.dot(dy, h.T)
    dby += dy

    gradients = (dWf, dWi, dWo, dWc, dbf, dbi, dbo, dbc, dWhy, dby)

    # Gradient clipping
    for i in range(len(gradients)):
        np.clip(gradients[i], -clip_value, clip_value, out=gradients[i])

    return gradients

This method is used to calculate gradients of the loss function with respect to the weights and biases of the LSTM. These gradients are necessary for updating the model's parameters during training.

def backward(self, dy, caches, clip_value=1.0):
    dWf, dWi, dWo, dWc = [np.zeros_like(w) for w in (self.wf, self.wi, self.wo, self.wc)]
    dbf, dbi, dbo, dbc = [np.zeros_like(b) for b in (self.bf, self.bi, self.bo, self.bc)]
    dWhy = np.zeros_like(self.why)
    dby = np.zeros_like(self.by)

All gradients for the weights (dWf, dWi, dWo, dWc, dWhy) and biases (dbf, dbi, dbo, dbc, dby) are initialized to zero. This is necessary because the gradients are accumulated over each timestep in the sequence.

dy = dy.reshape(self.output_size, -1)
dh_next = np.zeros((self.hidden_size, 1))
dc_next = np.zeros_like(dh_next)

Here, we ensure that dy is in the correct shape for matrix operations. dh_next and dc_next store gradients are flowing back from later timesteps.

for cache in reversed(caches):
        h_prev, c_prev, f, i, o, c_, x_t, combined, c, h = cache

The LSTM state and gate activations for each timestep are retrieved from cache. Processing starts from the last timestep and moves backward (reversed(caches)), which is essential for correctly applying the chain rule in recurrent neural networks (Backpropagation Through Time – BPTT).

        dh = np.dot(self.why.T, dy) + dh_next
        dc = dc_next + (dh * o * self.dtanh(np.tanh(c)))
        df = dc * c_prev * self.dsigmoid(f)
        di = dc * c_ * self.dsigmoid(i)
        do = dh * self.dtanh(np.tanh(c))
        dc_ = dc * i * self.dtanh(c_)

dh and dc are gradients of the loss with respect to the hidden state and cell state. Gradients for each gate (df, di, do) and the candidate cell state (dc_) are calculated using the chain rule, involving derivatives of the sigmoid (dsigmoid) and tanh (dtanh) functions, which were discussed in the gating mechanisms.

        dWf += np.dot(df, combined.T)
        dWi += np.dot(di, combined.T)
        dWo += np.dot(do, combined.T)
        dWc += np.dot(dc_, combined.T)
        dbf += df.sum(axis=1, keepdims=True)
        dbi += di.sum(axis=1, keepdims=True)
        dbo += do.sum(axis=1, keepdims=True)
        dbc += dc_.sum(axis=1, keepdims=True)

These lines accumulate the gradients over all timesteps for each weight and bias.

for i in range(len(gradients)):
    np.clip(gradients[i], -clip_value, clip_value, out=gradients[i])

To prevent exploding gradients, we clip the gradients to a specified range (clip_value), which is a common practice in training RNNs.

Parameter Update Method

def update_params(self, grads, learning_rate):
    dWf, dWi, dWo, dWc, dbf, dbi, dbo, dbc, dWhy, dby = grads
    ...
    self.wf -= learning_rate * dWf
    ...

Each weight and bias is updated by subtracting a fraction (learning_rate) of the corresponding gradient. This step adjusts the model parameters to minimize the loss function.

4.3: Training and Validation

class LSTMTrainer:
    """
    Trainer for the LSTM network.

    Parameters:
    - model: LSTM, the LSTM network to train
    - learning_rate: float, learning rate for the optimizer
    - patience: int, number of epochs to wait before early stopping
    - verbose: bool, whether to print training information
    - delta: float, minimum change in validation loss to qualify as an improvement
    """
    def __init__(self, model, learning_rate=0.01, patience=7, verbose=True, delta=0):
        self.model = model
        self.learning_rate = learning_rate
        self.train_losses = []
        self.val_losses = []
        self.early_stopping = EarlyStopping(patience, verbose, delta)

    def train(self, X_train, y_train, X_val=None, y_val=None, epochs=10, batch_size=1, clip_value=1.0):
        """
        Train the LSTM network.

        Parameters:
        - X_train: np.ndarray, training data
        - y_train: np.ndarray, training labels
        - X_val: np.ndarray, validation data
        - y_val: np.ndarray, validation labels
        - epochs: int, number of training epochs
        - batch_size: int, size of mini-batches
        - clip_value: float, value to clip gradients to
        """
        for epoch in range(epochs):
            epoch_losses = []
            for i in range(0, len(X_train), batch_size):
                batch_X = X_train[i:i + batch_size]
                batch_y = y_train[i:i + batch_size]
                losses = []

                for x, y_true in zip(batch_X, batch_y):
                    y_pred, caches = self.model.forward(x)
                    loss = self.compute_loss(y_pred, y_true.reshape(-1, 1))
                    losses.append(loss)

                    # Backpropagation to get gradients
                    dy = y_pred - y_true.reshape(-1, 1)
                    grads = self.model.backward(dy, caches, clip_value=clip_value)
                    self.model.update_params(grads, self.learning_rate)

                batch_loss = np.mean(losses)
                epoch_losses.append(batch_loss)

            avg_epoch_loss = np.mean(epoch_losses)
            self.train_losses.append(avg_epoch_loss)

            if X_val is not None and y_val is not None:
                val_loss = self.validate(X_val, y_val)
                self.val_losses.append(val_loss)
                print(f'Epoch {epoch + 1}/{epochs} - Loss: {avg_epoch_loss:.5f}, Val Loss: {val_loss:.5f}')

                # Check early stopping condition
                self.early_stopping(val_loss)
                if self.early_stopping.early_stop:
                    print("Early stopping")
                    break
            else:
                print(f'Epoch {epoch + 1}/{epochs} - Loss: {avg_epoch_loss:.5f}')

    def compute_loss(self, y_pred, y_true):
        """
        Compute mean squared error loss.
        """
        return np.mean((y_pred - y_true) ** 2)

    def validate(self, X_val, y_val):
        """
        Validate the model on a separate set of data.
        """
        val_losses = []
        for x, y_true in zip(X_val, y_val):
            y_pred, _ = self.model.forward(x)
            loss = self.compute_loss(y_pred, y_true.reshape(-1, 1))
            val_losses.append(loss)
        return np.mean(val_losses)

The trainer orchestrates the training process over multiple epochs, handling batches of data, and optionally validating the model.

for epoch in range(epochs):
      ...
      for i in range(0, len(X_train), batch_size):
          ...
          for x, y_true in zip(batch_X, batch_y):
              y_pred, caches = self.model.forward(x)
              ...

Each batch of data is fed through the model. The forward pass generates predictions and caches intermediate values for backpropagation.

dy = y_pred - y_true.reshape(-1, 1)
grads = self.model.backward(dy, caches, clip_value=clip_value)
self.model.update_params(grads, self.learning_rate)

After calculating the loss, the gradient with respect to the prediction error (dy) is used to perform backpropagation. The resulting gradients are used to update the model parameters.

print(f'Epoch {epoch + 1}/{epochs} - Loss: {avg_epoch_loss:.5f}')

Training progress is logged to help monitor the model's performance over time.

4.4: Data Preprocessing

class TimeSeriesDataset:
    """
    Dataset class for time series data.

    Parameters:
    - ticker: str, stock ticker symbol
    - start_date: str, start date for data retrieval
    - end_date: str, end date for data retrieval
    - look_back: int, number of previous time steps to include in each sample
    - train_size: float, proportion of data to use for training
    """
    def __init__(self, start_date, end_date, look_back=1, train_size=0.67):
        self.start_date = start_date
        self.end_date = end_date
        self.look_back = look_back
        self.train_size = train_size

    def load_data(self):
        """
        Load stock data.

        Returns:
        - np.ndarray, training data
        - np.ndarray, testing data
        """
        df = pd.read_csv('data/google.csv')
        df = df[(df['Date'] >= self.start_date) & (df['Date'] <= self.end_date)]
        df = df.sort_index()
        df = df.loc[self.start_date:self.end_date]
        df = df[['Close']].astype(float)  # Use closing price
        df = self.MinMaxScaler(df.values)  # Convert DataFrame to numpy array
        train_size = int(len(df) * self.train_size)
        train, test = df[0:train_size,:], df[train_size:len(df),:]
        return train, test

    def MinMaxScaler(self, data):
        """
        Min-max scaling of the data.

        Parameters:
        - data: np.ndarray, input data
        """
        numerator = data - np.min(data, 0)
        denominator = np.max(data, 0) - np.min(data, 0)
        return numerator / (denominator + 1e-7)

    def create_dataset(self, dataset):
        """
        Create the dataset for time series prediction.

        Parameters:
        - dataset: np.ndarray, input data

        Returns:
        - np.ndarray, input data
        - np.ndarray, output data
        """
        dataX, dataY = [], []
        for i in range(len(dataset)-self.look_back):
            a = dataset[i:(i + self.look_back), 0]
            dataX.append(a)
            dataY.append(dataset[i + self.look_back, 0])
        return np.array(dataX), np.array(dataY)

    def get_train_test(self):
        """
        Get the training and testing data.

        Returns:
        - np.ndarray, training input
        - np.ndarray, training output
        - np.ndarray, testing input
        - np.ndarray, testing output
        """
        train, test = self.load_data()
        trainX, trainY = self.create_dataset(train)
        testX, testY = self.create_dataset(test)
        return trainX, trainY, testX, testY

This class handles fetching and preprocessing data into a format suitable for training the LSTM, including scaling and splitting into training and test sets.

4.5: Model Training

Now let's leverage all the code defined above to load the dataset, preprocess it, and train our LSTM model.

First, let's load the dataset:

# Instantiate the dataset
dataset = TimeSeriesDataset( '2010-1-1', '2020-12-31', look_back=1)
trainX, trainY, testX, testY = dataset.get_train_test()

In this instance, it's configured to fetch historical data for Google (GOOGL) from Kaggle, spanning from January 1, 2010, to December 31, 2020.

look_back=1: This parameter sets the number of past time steps to include in each input sample. Here, each input sample will contain data from 1 previous time step, which means that the model will use data from one day to predict the next.

get_train_test(): This method processes the fetched data, normalizes it, and splits it into training and testing datasets. This is essential for training the model on one segment of the data and validating its performance on another to check for overfitting.

# Reshape input to be [samples, time steps, features]
trainX = np.reshape(trainX, (trainX.shape[0], trainX.shape[1], 1))
testX = np.reshape(testX, (testX.shape[0], testX.shape[1], 1))

This reshaping step adjusts the data format to what the LSTM expects. LSTMs require input to be in the shape of [samples, time steps, features]. Here:

  • samples: the number of data points.
  • time steps: the number of time steps per sample (look_back).
  • features: the number of features per time step (in this case, 1, because we are probably looking at one dimension of data like closing price).
look_back = 1  # Number of previous time steps to include in each sample
hidden_size = 256  # Number of LSTM units
output_size = 1  # Dimensionality of the output space

lstm = LSTM(input_size=1, hidden_size=hidden_size, output_size=output_size)

In this code:

  • hidden_size : The number of LSTM units in the hidden layer, which is set to 256. This defines the capacity of the model, with more units potentially capturing more complex patterns but also requiring more computational power and data to train effectively.
  • output_size : The output dimensionality, which is 1 in this case, suggests the model predicts a single value per input sample, such as the next day's stock price.
trainer = LSTMTrainer(lstm, learning_rate=1e-3, patience=50, verbose=True, delta=0.001)
trainer.train(trainX, trainY, testX, testY, epochs=1000, batch_size=32)

Here we set the rate to 1e-3 (0.001). A learning rate that's too high can cause the model to converge too quickly to a suboptimal solution, and too low a rate can make the training process slow and possibly stuck. We also specified patience to 50, which will stop the model training if the validation loss doesn't improve for 50 epochs.

The train() method executes the training process over a specified number of epochs and batch sizes. During training, the model will print the model performance every 10 epochs, resulting in an output similar to this one:

Epoch 1/1000 - Loss: 0.25707, Val Loss: 0.43853
Epoch 11/1000 - Loss: 0.06463, Val Loss: 0.06056
Epoch 21/1000 - Loss: 0.05313, Val Loss: 0.02100
Epoch 31/1000 - Loss: 0.04862, Val Loss: 0.01134
Epoch 41/1000 - Loss: 0.04512, Val Loss: 0.00678
Epoch 51/1000 - Loss: 0.04234, Val Loss: 0.00395
Epoch 61/1000 - Loss: 0.04014, Val Loss: 0.00210
Epoch 71/1000 - Loss: 0.03840, Val Loss: 0.00095
Epoch 81/1000 - Loss: 0.03703, Val Loss: 0.00031
Epoch 91/1000 - Loss: 0.03595, Val Loss: 0.00004
Epoch 101/1000 - Loss: 0.03509, Val Loss: 0.00003
Epoch 111/1000 - Loss: 0.03442, Val Loss: 0.00021
Epoch 121/1000 - Loss: 0.03388, Val Loss: 0.00051
Epoch 131/1000 - Loss: 0.03346, Val Loss: 0.00090
Epoch 141/1000 - Loss: 0.03312, Val Loss: 0.00133
Early stopping

Lastly, let's plot train and validation loss to get a better sense of a possible convergence/divergence. We can achieve that using the following lines of code:

plot_manager = PlotManager()

# Inside your training loop
plot_manager.plot_losses(trainer.train_losses, trainer.val_losses)

# After your training loop
plot_manager.show_plots()

Which will plot a similar chart to the following:

Train and Validation Plot – Image by Author

From the graph, we can see that both train and validation drop quickly in the earlier epochs, which suggests our initialization technique (Xavier) may not be ideal for this purpose. Although early stopping is triggered after ~90 epochs achieving some impressive performances, we could try to decrease the learning rate and run it for more epochs. Moreover, we could try to use other techniques like learning rate schedulers or Adam optimization. You can find a comprehensive article about Adam optimization here:

The Math behind Adam Optimizer

5: Advanced LSTM Models

Let's briefly explore a few LSTM-inspired models that, depending on the task, might perform even better. I'll dive deeper into each model in separate articles, so don't forget to click the following button to stay updated.

5.1: Bidirectional LSTMs

This setup uses two separate LSTMs: one processes the input from start to finish (forward direction), and the other from finish to start (backward direction). This approach ensures that the network has comprehensive knowledge of the sequence from both directions at every point, which enhances the model's understanding of the context. It's particularly effective in applications like speech recognition where the surrounding context significantly impacts interpretation.

5.2: Stacked LSTMs

Adding depth to the model, stacked LSTMs consist of multiple layers of LSTM units stacked one after the other. This structure allows the model to learn at different levels of abstraction, with each layer processing and passing on its interpretation to the next. By capturing more nuanced features at various layers, stacked LSTMs can handle more complex tasks more effectively, offering a deeper and richer representation of the data.

5.3: Peephole Connections

Unlike standard LSTMs where the gates only interact with the previous hidden state and the current input, peephole connections allow the gates to also "peek" at the cell state. This added glimpse provides extra context, enhancing the ability of the gates to make more accurate decisions about how much information to retain, update, or forget. This can lead to more precise updates and improvements in tasks where long-term insights are crucial.

Conclusion

LSTMs play a crucial role in AI and machine learning, especially in fields like time-series analysis, natural language processing, and other areas where understanding the flow of time is essential. Their ability to retain and use past information for current tasks makes them extremely useful for making predictions, classifying data, and generating sequences where earlier inputs significantly impact later outputs.

The flexibility of LSTMs opens up a wealth of possibilities for innovation across different fields. By experimenting with various LSTM configurations and their applications, you can drive significant progress in both academic research and real-world applications. Whether it's enhancing speech recognition systems, building advanced predictive models for the financial sector, or developing responsive systems for medical diagnostics, LSTMs provide a robust set of tools that can significantly boost the intelligence and effectiveness of your solutions.

References

  1. Hochreiter, S., & Schmidhuber, J. (1997). Long Short-Term Memory. Neural Computation, 9(8), 1735–1780. Link to paper
  2. Gers, F. A., Schmidhuber, J., & Cummins, F. (2000). Learning to Forget: Continual Prediction with LSTM. Neural Computation, 12(10), 2451–2471. Link to paper
  3. Goodfellow, I., Bengio, Y., & Courville, A. (2016). Deep Learning. MIT Press.

Tags: Data Science Deep Learning Hands On Tutorials Lstm Machine Learning

Comment