
LSTM Model Training

Build, train, and evaluate an LSTM neural network using PyTorch. Prepare sequence data, implement the training loop with early stopping, and generate price predictions.

LSTM Model Architecture

# app/model.py
import os

import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset, DataLoader


class StockDataset(Dataset):
    def __init__(self, sequences, targets):
        self.sequences = torch.FloatTensor(sequences)
        self.targets = torch.FloatTensor(targets)

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return self.sequences[idx], self.targets[idx]


class LSTMPredictor(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2, dropout=0.2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                           batch_first=True, dropout=dropout)
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 32),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(32, 1),
        )

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        last_hidden = lstm_out[:, -1, :]
        return self.fc(last_hidden).squeeze(-1)


class StockPredictor:
    def __init__(self, sequence_length=60, hidden_size=64, num_layers=2):
        self.seq_len = sequence_length
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.scaler = MinMaxScaler()
        self.model = None
        self.feature_columns = []

    def prepare_data(self, df: pd.DataFrame, target_col="Close",
                     feature_cols=None, fit=True):
        if feature_cols is None:
            feature_cols = [c for c in df.select_dtypes(include=[np.number]).columns
                            if c != target_col]
        self.feature_columns = feature_cols + [target_col]

        data = df[self.feature_columns].values
        # Fit the scaler during training only; at inference time reuse the
        # fitted scaler so inputs are scaled consistently. Note that fitting
        # on the full frame leaks statistics from the validation rows; for a
        # stricter evaluation, fit on the training split alone.
        scaled = self.scaler.fit_transform(data) if fit else self.scaler.transform(data)

        sequences, targets = [], []
        for i in range(self.seq_len, len(scaled)):
            sequences.append(scaled[i - self.seq_len:i])
            targets.append(scaled[i, -1])  # target is last column (Close)

        return np.array(sequences), np.array(targets)

    def train(self, df: pd.DataFrame, epochs=50, batch_size=32, lr=0.001):
        sequences, targets = self.prepare_data(df)
        split = int(len(sequences) * 0.8)

        train_ds = StockDataset(sequences[:split], targets[:split])
        val_ds = StockDataset(sequences[split:], targets[split:])
        train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
        val_dl = DataLoader(val_ds, batch_size=batch_size)

        input_size = sequences.shape[2]
        self.model = LSTMPredictor(input_size, self.hidden_size, self.num_layers)
        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        criterion = nn.MSELoss()

        best_val_loss = float("inf")
        patience, patience_counter = 10, 0
        os.makedirs("models", exist_ok=True)  # ensure checkpoint directory exists

        for epoch in range(epochs):
            self.model.train()
            train_loss = 0
            for X_batch, y_batch in train_dl:
                optimizer.zero_grad()
                pred = self.model(X_batch)
                loss = criterion(pred, y_batch)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                optimizer.step()
                train_loss += loss.item()

            self.model.eval()
            val_loss = 0
            with torch.no_grad():
                for X_batch, y_batch in val_dl:
                    pred = self.model(X_batch)
                    val_loss += criterion(pred, y_batch).item()

            train_loss /= len(train_dl)
            val_loss /= len(val_dl)

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                torch.save(self.model.state_dict(), "models/best_model.pt")
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f"Early stopping at epoch {epoch}")
                    break

            if epoch % 10 == 0:
                print(f"Epoch {epoch}: train={train_loss:.6f} val={val_loss:.6f}")

        self.model.load_state_dict(torch.load("models/best_model.pt"))
        return {"train_loss": train_loss, "val_loss": best_val_loss}

    def predict(self, df: pd.DataFrame) -> np.ndarray:
        # Reuse the scaler fitted during training (fit=False)
        sequences, _ = self.prepare_data(df, fit=False)
        self.model.eval()
        with torch.no_grad():
            preds = self.model(torch.FloatTensor(sequences))
        # The scaler expects all feature columns, so pad a dummy matrix with
        # zeros and read back only the target (last) column in price units.
        dummy = np.zeros((len(preds), len(self.feature_columns)))
        dummy[:, -1] = preds.numpy()
        return self.scaler.inverse_transform(dummy)[:, -1]
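
A quick way to sanity-check the windowing in prepare_data is to run it on a small synthetic frame and verify the output shapes. This is a minimal sketch with made-up column names and sizes, not part of the module:

import numpy as np
import pandas as pd
from app.model import StockPredictor

# 100 rows of fake numeric data; Close is the prediction target
rng = np.random.default_rng(0)
df = pd.DataFrame({
    "Open": rng.random(100),
    "High": rng.random(100),
    "Low": rng.random(100),
    "Close": rng.random(100),
})

predictor = StockPredictor(sequence_length=10)
sequences, targets = predictor.prepare_data(df)

# 100 rows minus a 10-step lookback leaves 90 windows, each
# 10 timesteps long with 4 columns (3 features + the target).
assert sequences.shape == (90, 10, 4)
assert targets.shape == (90,)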

Training the Model

from app.model import StockPredictor
from app.data_collector import StockDataCollector
from app.indicators import TechnicalIndicators

# Prepare data
collector = StockDataCollector()
df = collector.fetch_history("AAPL", days=730)
ti = TechnicalIndicators()
df = ti.add_all(df)

# Train
predictor = StockPredictor(sequence_length=60)
metrics = predictor.train(df, epochs=50, batch_size=32)
print(f"Best validation loss: {metrics['val_loss']:.6f}")

# Predict
predictions = predictor.predict(df)
print(f"Predictions shape: {predictions.shape}")
📝 Important: LSTM models for stock prediction have significant limitations. Markets are influenced by unpredictable events, regulatory changes, and sentiment shifts that no model can foresee. Use predictions as one input among many, never as sole trading signals.

Key Takeaways

  • LSTM processes sequential data with memory cells that capture temporal patterns in price history.
  • MinMaxScaler normalizes features to the [0, 1] range for stable LSTM training, and its inverse transform maps predictions back to price units (see the sketch after this list).
  • Early stopping prevents overfitting by monitoring validation loss with patience.
  • Gradient clipping prevents exploding gradients common in RNN/LSTM training.
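
The dummy-matrix trick in predict relies on MinMaxScaler being an exact round trip per column. A tiny standalone demonstration:

import numpy as np
from sklearn.preprocessing import MinMaxScaler

prices = np.array([[100.0], [150.0], [200.0]])
scaler = MinMaxScaler()
scaled = scaler.fit_transform(prices)        # maps to [0, 1]: 0.0, 0.5, 1.0
restored = scaler.inverse_transform(scaled)  # back to price units
assert np.allclose(restored, prices)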