🏭 Caso de Uso

RNN Vanilla en PyTorch: Clasificación de Secuencias Sintéticas

Implementación completa de una RNN vanilla many-to-one en PyTorch para clasificar señales temporales sintéticas (senos y ondas cuadradas).

🐍 Python 📓 Jupyter Notebook

Fundamentos de RNN en PyTorch: clasificación de secuencias sintéticas

En este notebook vamos a construir de principio a fin un ejemplo de uso de redes neuronales recurrentes (RNN) con PyTorch, siguiendo la lógica del submódulo de Fundamentos:

  • Qué son los datos secuenciales y por qué un MLP no modela bien el orden temporal.
  • Cómo una RNN reutiliza un estado oculto para mantener contexto.
  • Cómo se formula una tarea many-to-one (entrada secuencial, una salida final).
  • Cómo entrenar, evaluar y diagnosticar un modelo recurrente.

Objetivo del notebook

Resolver una tarea de clasificación de secuencias: dada una señal temporal ruidosa de longitud fija, predecir qué tipo de dinámica la generó.

Usaremos un dataset sintético (generado por fórmula + ruido), con tres clases:

  1. Seno de baja frecuencia
  2. Seno de alta frecuencia
  3. Onda cuadrada suavemente ruidosa

Esto nos permite centrarnos en el comportamiento de la RNN sin depender de descargas externas.


Recordatorio matemático (RNN vanilla)

En una RNN estándar, para cada paso temporal (t):

[ \mathbf{h}t = anh(\mathbf{W}{xh}\mathbf{x}t + \mathbf{W}{hh}\mathbf{h}_{t-1} + \mathbf{b}_h) ]

  • (\mathbf{x}_t): entrada en el instante (t)
  • (\mathbf{h}_{t-1}): memoria (estado oculto anterior)
  • (\mathbf{h}_t): nuevo estado oculto

Para clasificación many-to-one, usamos el último estado (\mathbf{h}_T) para producir logits:

[ \mathbf{z} = \mathbf{W}_{hy}\mathbf{h}_T + \mathbf{b}_y ]

Luego aplicamos softmax implícitamente vía CrossEntropyLoss.


Enfoque computacional

  1. Generar dataset sintético con ruido controlado.
  2. Hacer un pequeño EDA (distribución de clases y visualización de secuencias).
  3. Preparar DataLoader para train/val/test.
  4. Definir una arquitectura nn.RNN + capa lineal final.
  5. Entrenar registrando loss y accuracy en train y val.
  6. Evaluar en test con métricas, matriz de confusión y análisis cualitativo.
  7. Concluir y proponer extensiones.

Nota didáctica: empezamos con una RNN vanilla porque es el modelo base conceptual antes de pasar a LSTM/GRU.

[1]

# Imports principales
import math
import random
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    classification_report,
)

# Configuración visual
sns.set_theme(style="whitegrid")
plt.rcParams["figure.figsize"] = (10, 4)

# Semilla para reproducibilidad
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

set_seed(42)

# Dispositivo de cómputo
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Usando dispositivo:", DEVICE)
Usando dispositivo: cuda

1) Generación del dataset sintético

Generaremos secuencias de longitud fija (T=60). Cada secuencia pertenece a una de 3 clases y añade ruido gaussiano moderado.

  • Clase 0: seno de baja frecuencia.
  • Clase 1: seno de alta frecuencia.
  • Clase 2: onda cuadrada (señal con cambios bruscos) + ruido.

Esto fuerza a la RNN a usar el patrón temporal global, no solo un punto aislado.

[2]

# Parámetros del dataset
N_SAMPLES = 3600
SEQ_LEN = 60
NOISE_STD = 0.20

CLASS_NAMES = ["seno_baja_freq", "seno_alta_freq", "onda_cuadrada"]
N_CLASSES = len(CLASS_NAMES)

# Eje temporal normalizado [0, 1]
t = np.linspace(0, 1, SEQ_LEN)


def generate_sequence(label, seq_len=SEQ_LEN, noise_std=NOISE_STD):
    """Genera una secuencia según la clase indicada y añade ruido."""
    phase = np.random.uniform(0, 2 * np.pi)

    if label == 0:
        # Baja frecuencia
        freq = np.random.uniform(1.0, 2.0)
        signal = np.sin(2 * np.pi * freq * t + phase)
    elif label == 1:
        # Alta frecuencia
        freq = np.random.uniform(3.0, 5.0)
        signal = np.sin(2 * np.pi * freq * t + phase)
    elif label == 2:
        # Onda cuadrada aproximada por el signo del seno
        freq = np.random.uniform(1.5, 3.0)
        signal = np.sign(np.sin(2 * np.pi * freq * t + phase)).astype(float)
    else:
        raise ValueError("Label fuera de rango")

    # Amplitud aleatoria suave para aumentar variabilidad
    amplitude = np.random.uniform(0.8, 1.2)
    signal = amplitude * signal

    # Ruido gaussiano
    noise = np.random.normal(loc=0.0, scale=noise_std, size=seq_len)

    return (signal + noise).astype(np.float32)


# Construcción del dataset completo
X = np.zeros((N_SAMPLES, SEQ_LEN), dtype=np.float32)
y = np.zeros((N_SAMPLES,), dtype=np.int64)

for i in range(N_SAMPLES):
    label = i % N_CLASSES  # dataset balanceado
    X[i] = generate_sequence(label)
    y[i] = label

print("Shape X:", X.shape)
print("Shape y:", y.shape)
print("Primeras etiquetas:", y[:10])
Shape X: (3600, 60)
Shape y: (3600,)
Primeras etiquetas: [0 1 2 0 1 2 0 1 2 0]

2) EDA rápido y visual

[3]

# Distribución de clases
unique, counts = np.unique(y, return_counts=True)

plt.figure(figsize=(6, 4))
plt.bar([CLASS_NAMES[u] for u in unique], counts, color=["#4c78a8", "#f58518", "#54a24b"])
plt.title("Distribución de clases")
plt.ylabel("Número de secuencias")
plt.xticks(rotation=15)
plt.show()

print("Conteos por clase:", dict(zip([CLASS_NAMES[u] for u in unique], counts)))
Output
Conteos por clase: {'seno_baja_freq': 1200, 'seno_alta_freq': 1200, 'onda_cuadrada': 1200}
[4]

# Visualizamos ejemplos de cada clase para entender el problema
fig, axes = plt.subplots(1, 3, figsize=(15, 3.8), sharey=True)

for class_id in range(N_CLASSES):
    idxs = np.where(y == class_id)[0]
    chosen = np.random.choice(idxs, size=5, replace=False)

    for idx in chosen:
        axes[class_id].plot(X[idx], alpha=0.75)

    axes[class_id].set_title(CLASS_NAMES[class_id])
    axes[class_id].set_xlabel("Paso temporal")
    if class_id == 0:
        axes[class_id].set_ylabel("Valor")

plt.suptitle("5 secuencias de ejemplo por clase")
plt.tight_layout()
plt.show()
Output
[5]

# Estadísticos básicos por clase (media y desviación típica global)
for class_id, class_name in enumerate(CLASS_NAMES):
    data = X[y == class_id]
    print(f"{class_name:18s} | media={data.mean(): .3f} | std={data.std(): .3f}")
seno_baja_freq     | media=-0.005 | std= 0.739
seno_alta_freq     | media= 0.000 | std= 0.739
onda_cuadrada      | media= 0.000 | std= 1.024

3) Preparación de train/val/test y DataLoaders

Hacemos partición estratificada para mantener proporciones de clase.

Además, la RNN de PyTorch con batch_first=True espera entradas con shape:

[ (batch_size, seq_len, input_size) ]

Como nuestra señal es univariante, input_size = 1, así que añadimos una dimensión final.

[6]

# Split train+temp / test
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.30, stratify=y, random_state=42
)

# Split temp en val / test
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.50, stratify=y_temp, random_state=42
)

print("Train:", X_train.shape, y_train.shape)
print("Val:  ", X_val.shape, y_val.shape)
print("Test: ", X_test.shape, y_test.shape)

# Convertimos a tensores y añadimos dimensión de feature (=1)
X_train_t = torch.tensor(X_train).unsqueeze(-1)
X_val_t = torch.tensor(X_val).unsqueeze(-1)
X_test_t = torch.tensor(X_test).unsqueeze(-1)

y_train_t = torch.tensor(y_train)
y_val_t = torch.tensor(y_val)
y_test_t = torch.tensor(y_test)

# TensorDataset + DataLoader
batch_size = 64
train_loader = DataLoader(TensorDataset(X_train_t, y_train_t), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val_t, y_val_t), batch_size=batch_size, shuffle=False)
test_loader = DataLoader(TensorDataset(X_test_t, y_test_t), batch_size=batch_size, shuffle=False)
Train: (2520, 60) (2520,)
Val:   (540, 60) (540,)
Test:  (540, 60) (540,)

4) Modelo RNN many-to-one en PyTorch

Usaremos:

  • nn.RNN(input_size=1, hidden_size=32, num_layers=1)
  • Capa lineal final para clasificar en 3 clases.

Estrategia many-to-one: tomamos la representación del último paso temporal y la pasamos al clasificador.

[7]

class SequenceRNNClassifier(nn.Module):
    """RNN vanilla para clasificación many-to-one de secuencias."""

    def __init__(self, input_size=1, hidden_size=32, num_layers=1, num_classes=3, dropout=0.0):
        super().__init__()

        # Capa recurrente
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            nonlinearity="tanh",
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0,
        )

        # Capa de salida para clasificación
        self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # x: (batch, seq_len, input_size)
        output, h_n = self.rnn(x)

        # h_n: (num_layers, batch, hidden_size)
        # Tomamos el estado oculto de la última capa en el último tiempo
        last_hidden = h_n[-1]  # (batch, hidden_size)

        logits = self.classifier(last_hidden)  # (batch, num_classes)
        return logits


model = SequenceRNNClassifier(
    input_size=1,
    hidden_size=32,
    num_layers=1,
    num_classes=N_CLASSES,
).to(DEVICE)

n_params = sum(p.numel() for p in model.parameters())
print(model)
print(f"Parámetros entrenables: {n_params:,}")
SequenceRNNClassifier(
  (rnn): RNN(1, 32, batch_first=True)
  (classifier): Linear(in_features=32, out_features=3, bias=True)
)
Parámetros entrenables: 1,219

5) Entrenamiento y validación

[8]

# Funciones auxiliares de entrenamiento/evaluación

def run_epoch(model, loader, criterion, optimizer=None, device=DEVICE):
    """Ejecuta una época en modo train (si hay optimizer) o eval."""
    training = optimizer is not None
    model.train() if training else model.eval()

    epoch_losses = []
    all_preds, all_targets = [], []

    for xb, yb in loader:
        xb, yb = xb.to(device), yb.to(device)

        # Forward
        logits = model(xb)
        loss = criterion(logits, yb)

        if training:
            # Backward + actualización de pesos
            optimizer.zero_grad()
            loss.backward()

            # Clipping opcional para estabilidad en RNN
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

        # Registro de métricas
        epoch_losses.append(loss.item())
        preds = torch.argmax(logits, dim=1)
        all_preds.append(preds.detach().cpu().numpy())
        all_targets.append(yb.detach().cpu().numpy())

    all_preds = np.concatenate(all_preds)
    all_targets = np.concatenate(all_targets)
    avg_loss = float(np.mean(epoch_losses))
    acc = float(accuracy_score(all_targets, all_preds))

    return avg_loss, acc


# Hiperparámetros de entrenamiento
lr = 1e-3
epochs = 35

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

history = {
    "train_loss": [],
    "val_loss": [],
    "train_acc": [],
    "val_acc": [],
}

for epoch in range(1, epochs + 1):
    train_loss, train_acc = run_epoch(model, train_loader, criterion, optimizer=optimizer)
    val_loss, val_acc = run_epoch(model, val_loader, criterion, optimizer=None)

    history["train_loss"].append(train_loss)
    history["val_loss"].append(val_loss)
    history["train_acc"].append(train_acc)
    history["val_acc"].append(val_acc)

    if epoch % 5 == 0 or epoch == 1:
        print(
            f"Epoch {epoch:02d}/{epochs} | "
            f"train_loss={train_loss:.4f} val_loss={val_loss:.4f} | "
            f"train_acc={train_acc:.3f} val_acc={val_acc:.3f}"
        )
Epoch 01/35 | train_loss=1.1013 val_loss=1.0999 | train_acc=0.310 val_acc=0.280
Epoch 05/35 | train_loss=1.0967 val_loss=1.0983 | train_acc=0.356 val_acc=0.322
Epoch 10/35 | train_loss=0.6805 val_loss=0.5814 | train_acc=0.585 val_acc=0.628
Epoch 15/35 | train_loss=0.5388 val_loss=0.5269 | train_acc=0.662 val_acc=0.633
Epoch 20/35 | train_loss=0.4954 val_loss=0.5105 | train_acc=0.706 val_acc=0.670
Epoch 25/35 | train_loss=0.4998 val_loss=0.5011 | train_acc=0.728 val_acc=0.702
Epoch 30/35 | train_loss=0.1919 val_loss=0.1548 | train_acc=0.946 val_acc=0.954
Epoch 35/35 | train_loss=0.1602 val_loss=0.0854 | train_acc=0.952 val_acc=0.978
[9]

# Curvas de entrenamiento: loss y accuracy para train/val
fig, axes = plt.subplots(1, 2, figsize=(13, 4))

axes[0].plot(history["train_loss"], label="Train loss")
axes[0].plot(history["val_loss"], label="Val loss")
axes[0].set_title("Evolución de la pérdida")
axes[0].set_xlabel("Epoch")
axes[0].set_ylabel("Loss")
axes[0].legend()

axes[1].plot(history["train_acc"], label="Train accuracy")
axes[1].plot(history["val_acc"], label="Val accuracy")
axes[1].set_title("Evolución de la accuracy")
axes[1].set_xlabel("Epoch")
axes[1].set_ylabel("Accuracy")
axes[1].legend()

plt.tight_layout()
plt.show()
Output

6) Evaluación en test: métricas y matriz de confusión

[11]

# Predicción en test
model.eval()
all_test_preds, all_test_targets = [], []

with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(DEVICE)
        logits = model(xb)
        preds = torch.argmax(logits, dim=1).cpu().numpy()

        all_test_preds.append(preds)
        all_test_targets.append(yb.numpy())


y_true = np.concatenate(all_test_targets)
y_pred = np.concatenate(all_test_preds)

test_acc = accuracy_score(y_true, y_pred)
print(f"Test accuracy: {test_acc:.4f}")
print("Reporte de clasificación:")
print(classification_report(y_true, y_pred, target_names=CLASS_NAMES, digits=4))
Test accuracy: 0.9778
Reporte de clasificación:
                precision    recall  f1-score   support

seno_baja_freq     1.0000    0.9500    0.9744       180
seno_alta_freq     0.9944    0.9889    0.9916       180
 onda_cuadrada     0.9421    0.9944    0.9676       180

      accuracy                         0.9778       540
     macro avg     0.9788    0.9778    0.9779       540
  weighted avg     0.9788    0.9778    0.9779       540

[12]

# Matriz de confusión
cm = confusion_matrix(y_true, y_pred)

plt.figure(figsize=(6, 5))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=CLASS_NAMES,
    yticklabels=CLASS_NAMES,
)
plt.title("Matriz de confusión (test)")
plt.xlabel("Predicción")
plt.ylabel("Etiqueta real")
plt.xticks(rotation=20)
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()
Output

7) Inspección cualitativa de aciertos y errores

[14]

# Mostramos ejemplos bien y mal clasificados para interpretación
correct_idx = np.where(y_true == y_pred)[0]
wrong_idx = np.where(y_true != y_pred)[0]

print(f"Ejemplos correctos: {len(correct_idx)}")
print(f"Ejemplos incorrectos: {len(wrong_idx)}")

# Seleccionamos algunos ejemplos para visualizar
n_show = 3
sample_correct = np.random.choice(correct_idx, size=min(n_show, len(correct_idx)), replace=False)
sample_wrong = np.random.choice(wrong_idx, size=min(n_show, len(wrong_idx)), replace=False) if len(wrong_idx) > 0 else []

fig, axes = plt.subplots(2, n_show, figsize=(15, 6), sharey=True)

# Fila superior: aciertos
for i, idx in enumerate(sample_correct):
    axes[0, i].plot(X_test[idx], color="#2ca02c")
    axes[0, i].set_title(f"OK | real={CLASS_NAMES[y_test[idx]]}\npred={CLASS_NAMES[y_pred[idx]]}", fontsize=10)
    axes[0, i].set_xlabel("t")

# Fila inferior: errores (si existen)
for i in range(n_show):
    if len(sample_wrong) > i:
        idx = sample_wrong[i]
        axes[1, i].plot(X_test[idx], color="#d62728")
        axes[1, i].set_title(f"Error | real={CLASS_NAMES[y_test[idx]]}\npred={CLASS_NAMES[y_pred[idx]]}", fontsize=10)
        axes[1, i].set_xlabel("t")
    else:
        axes[1, i].axis("off")

axes[0, 0].set_ylabel("Aciertos")
axes[1, 0].set_ylabel("Errores")
plt.suptitle("Ejemplos cualitativos de predicciones")
plt.tight_layout()
plt.show()
Ejemplos correctos: 528
Ejemplos incorrectos: 12
Output

8) Mini-experimento: ¿qué pasa si reducimos mucho hidden_size?

Para reforzar la intuición, repetimos entrenamiento rápido con una RNN más pequeña (hidden_size=8) y comparamos curvas.

Esto ayuda a entender el impacto de la capacidad del estado oculto.

[15]

# Segundo experimento corto con menor capacidad
small_model = SequenceRNNClassifier(
    input_size=1,
    hidden_size=8,
    num_layers=1,
    num_classes=N_CLASSES,
).to(DEVICE)

small_optimizer = torch.optim.Adam(small_model.parameters(), lr=1e-3)
small_criterion = nn.CrossEntropyLoss()

small_history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}
small_epochs = 20

for _ in range(small_epochs):
    tr_loss, tr_acc = run_epoch(small_model, train_loader, small_criterion, optimizer=small_optimizer)
    va_loss, va_acc = run_epoch(small_model, val_loader, small_criterion, optimizer=None)

    small_history["train_loss"].append(tr_loss)
    small_history["val_loss"].append(va_loss)
    small_history["train_acc"].append(tr_acc)
    small_history["val_acc"].append(va_acc)
[16]

# Comparación de curvas entre modelo base y modelo pequeño
fig, axes = plt.subplots(1, 2, figsize=(13, 4))

axes[0].plot(history["val_loss"], label="Val loss (hidden=32)")
axes[0].plot(small_history["val_loss"], label="Val loss (hidden=8)")
axes[0].set_title("Comparación de pérdida en validación")
axes[0].set_xlabel("Epoch")
axes[0].set_ylabel("Loss")
axes[0].legend()

axes[1].plot(history["val_acc"], label="Val acc (hidden=32)")
axes[1].plot(small_history["val_acc"], label="Val acc (hidden=8)")
axes[1].set_title("Comparación de accuracy en validación")
axes[1].set_xlabel("Epoch")
axes[1].set_ylabel("Accuracy")
axes[1].legend()

plt.tight_layout()
plt.show()
Output

Conclusiones

  • Hemos implementado una RNN vanilla many-to-one en PyTorch de forma completa: datos, EDA, entrenamiento y evaluación.
  • El modelo aprende patrones temporales y logra una accuracy alta en un problema sintético con ruido.
  • Al reducir demasiado hidden_size, la capacidad de representación suele bajar y las métricas empeoran.

Ideas para seguir practicando

  1. Sustituir nn.RNN por nn.LSTM y nn.GRU con el mismo pipeline.
  2. Incrementar longitud de secuencia para observar limitaciones de memoria en RNN vanilla.
  3. Probar regularización (dropout, weight decay) y early stopping.
  4. Cambiar el dataset por texto simple (sentimiento) o series reales.
  5. Experimentar con tareas many-to-many (etiquetado por paso temporal).

Si quieres, en un siguiente notebook podemos extender este ejemplo a LSTM y comparar directamente las tres arquitecturas (RNN/LSTM/GRU) con exactamente los mismos datos y métricas.