RNN:原理与实现

RNN基础

什么是RNN

循环神经网络(RNN)是一类用于处理序列数据的神经网络。与传统的前馈神经网络不同,RNN具有“记忆”能力,可以保留和利用之前输入的信息,从而在处理当前输入时,参考上下文环境。

RNN的要点在于通过循环结构传递隐藏状态,使数据在序列中逐步流动并捕捉时间依赖关系。 即:

新隐态 = f(输入, 旧隐态) 输出 = g(新隐态)

RNN 原理:数据流视角

从数据流的视角来看,循环神经网络(RNN)的计算过程可以分为以下几个步骤:

  1. 输入序列的逐步处理

    • RNN 接收一个输入序列 $x = (x_1, x_2, ..., x_T)$,并在每一个时间步 $t$ 处理对应的输入 $x_t$
  2. 隐藏状态的更新

    • 在每个时间步 $t$,RNN 根据当前输入 $x_t$ 和前一个时间步的隐藏状态 $h_{t-1}$,通过以下公式计算当前的隐藏状态 $h_t$

      
      $$
      h_t = \tanh(W_{xh}x_t + W_{hh}h_{t-1} + b_h)
      $$
      
      • 这里:

      • $W_{xh}$ 是输入到隐藏层的权重矩阵。

      • $W_{hh}$ 是隐藏层到隐藏层的权重矩阵。

      • $b_h$ 是隐藏层的偏置向量。

    • 作用:隐藏状态 $h_t$ 既包含了当前时间步的输入信息 $x_t$,也保留了前序时间步的上下文信息 $h_{t-1}$

  3. 输出的生成

    • 基于当前的隐藏状态 $h_t$,计算当前时间步的输出 $y_t$

      
      $$
      y_t = W_{hy}h_t + b_y
      $$
      
      • 这里:

      • $W_{hy}$ 是隐藏层到输出层的权重矩阵。

      • $b_y$ 是输出层的偏置向量。

    • 作用:生成当前时间步的输出结果 $y_t$

  4. 递归传递与上下文融合

    • 将当前的隐藏状态 $h_t$ 作为下一个时间步 $t+1$ 的前一隐藏状态 $h_{t}$,继续处理后续的输入 $x_{t+1}$

    • 通过这种方式,RNN 能够在处理当前输入的同时,结合之前的上下文信息,实现对序列数据的建模。

RNN的优势和局限

优势:

  • 处理序列数据的能力:RNN能够自然地处理变长的序列输入,不需要固定的输入长度。

  • 参数共享:时间步之间参数共享,有助于降低模型复杂度。

局限:

  • 梯度消失与梯度爆炸:在长序列中,反向传播时梯度可能会迅速衰减或增大,导致训练困难。

  • 长距离依赖问题:传统RNN难以捕捉序列中相隔较远的依赖关系。

RNN 的变种

最具代表性的是长短期记忆网络(LSTM)和门控循环单元(GRU)。但是本文不会细讲。

长短期记忆网络(LSTM)

LSTM 通过引入“细胞状态”和“门控机制”来有效缓解梯度消失问题。其核心组件包括输入门、遗忘门和输出门,控制信息的流动和更新。公式如下:


$$
f_t = \sigma(W_f \cdot [h_{t-1}, x_t] + b_f)
$$

$$
i_t = \sigma(W_i \cdot [h_{t-1}, x_t] + b_i)
$$

$$
\tilde{C}_t = \tanh(W_C \cdot [h_{t-1}, x_t] + b_C)
$$

$$
C_t = f_t \ast C_{t-1} + i_t \ast \tilde{C}_t
$$

$$
o_t = \sigma(W_o \cdot [h_{t-1}, x_t] + b_o)
$$

$$
h_t = o_t \ast \tanh(C_t)
$$

其中,$\sigma$ 表示sigmoid激活函数,$\ast$ 表示按元素相乘。

门控循环单元(GRU)

GRU是LSTM的简化版本,通过合并遗忘门和输入门,减少了参数数量,同时保持了类似的性能。GRU的公式如下:


$$
z_t = \sigma(W_z \cdot [h_{t-1}, x_t] + b_z)
$$

$$
r_t = \sigma(W_r \cdot [h_{t-1}, x_t] + b_r)
$$

$$
\tilde{h}_t = \tanh(W_h \cdot [r_t \ast h_{t-1}, x_t] + b_h)
$$

$$
h_t = (1 - z_t) \ast h_{t-1} + z_t \ast \tilde{h}_t
$$

使用PyTorch实现RNN

项目结构

为使项目结构清晰,我们将代码分离到以下不同的文件中:

rnn_custom_pytorch/
├── data.py          # 数据生成与数据集定义
├── model.py         # 自定义RNN模型定义
├── train.py         # 训练脚本
├── test.py          # 测试脚本
├── visualize.py     # 结果可视化脚本
├── utils.py         # 工具函数(可选)
└── requirements.txt # 依赖包

数据集生成与预处理

data.py

负责生成正弦波数据,并定义PyTorch的数据集和数据加载器。

 1import numpy as np
 2import torch
 3from torch.utils.data import Dataset, DataLoader
 4
 5# 生成正弦波数据
 6def generate_sine_wave(seq_length, num_samples):
 7    x = np.linspace(0, num_samples * 0.1, num_samples)
 8    y = np.sin(x)
 9    data = []
10    for i in range(len(y) - seq_length):
11        data.append(y[i:i+seq_length+1])
12    return np.array(data)
13
14# 自定义数据集
15class SineWaveDataset(Dataset):
16    def __init__(self, data):
17        super(SineWaveDataset, self).__init__()
18        self.x = torch.tensor(data[:, :-1], dtype=torch.float32).unsqueeze(-1)  # shape: (batch, seq_len, 1)
19        self.y = torch.tensor(data[:, -1], dtype=torch.float32).unsqueeze(-1)   # shape: (batch, 1)
20
21    def __len__(self):
22        return len(self.x)
23
24    def __getitem__(self, idx):
25        return self.x[idx], self.y[idx]
26
27def get_data_loaders(seq_length=50, num_samples=1000, batch_size=32, train_ratio=0.8):
28    data = generate_sine_wave(seq_length, num_samples)
29    train_size = int(train_ratio * len(data))
30    train_data = data[:train_size]
31    test_data = data[train_size:]
32
33    train_dataset = SineWaveDataset(train_data)
34    test_dataset = SineWaveDataset(test_data)
35
36    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
37    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
38
39    return train_loader, test_loader

考虑到这个波形太简单,下面是一个替代实现,是多个波的叠加。

 1import numpy as np
 2import torch
 3from torch.utils.data import Dataset, DataLoader
 4
 5# 生成叠加波形数据
 6def generate_combined_wave(seq_length, num_samples):
 7    x = np.linspace(0, num_samples * 0.1, num_samples)
 8    sine_wave = np.sin(x)
 9    cosine_wave = np.cos(x)
10    square_wave = np.sign(np.sin(x))
11
12    # 叠加三种波形
13    combined_wave = sine_wave + cosine_wave + square_wave
14
15    data = []
16    for i in range(len(combined_wave) - seq_length):
17        data.append(combined_wave[i:i+seq_length+1])
18    return np.array(data)
19
20# 自定义数据集
21class CombinedWaveDataset(Dataset):
22    def __init__(self, data):
23        super(CombinedWaveDataset, self).__init__()
24        self.x = torch.tensor(data[:, :-1], dtype=torch.float32).unsqueeze(-1)  # shape: (batch, seq_len, 1)
25        self.y = torch.tensor(data[:, -1], dtype=torch.float32).unsqueeze(-1)   # shape: (batch, 1)
26
27    def __len__(self):
28        return len(self.x)
29
30    def __getitem__(self, idx):
31        return self.x[idx], self.y[idx]
32
33def get_data_loaders(seq_length=50, num_samples=1000, batch_size=32, train_ratio=0.8):
34    data = generate_combined_wave(seq_length, num_samples)
35    train_size = int(train_ratio * len(data))
36    train_data = data[:train_size]
37    test_data = data[train_size:]
38
39    train_dataset = CombinedWaveDataset(train_data)
40    test_dataset = CombinedWaveDataset(test_data)
41
42    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
43    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
44
45    return train_loader, test_loader
46
47# 示例用法
48if __name__ == "__main__":
49    train_loader, test_loader = get_data_loaders()
50    for x, y in train_loader:
51        print(x.shape, y.shape)
52        break

自定义RNN实现

这部分是本文的核心。

model.py

 1# model.py
 2import torch
 3import torch.nn as nn
 4import numpy as np
 5
 6class CustomRNN(nn.Module):
 7    def __init__(self, input_size, hidden_size, output_size):
 8        super(CustomRNN, self).__init__()
 9        self.hidden_size = hidden_size
10        # 输入到隐藏层的权重
11        self.W_ih = nn.Parameter(torch.Tensor(input_size, hidden_size))
12        # 隐藏层到隐藏层的权重
13        self.W_hh = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
14        # 隐藏层到输出的权重
15        self.W_ho = nn.Parameter(torch.Tensor(hidden_size, output_size))
16        # 隐藏层偏置
17        self.bias_h = nn.Parameter(torch.Tensor(hidden_size))
18        # 输出层偏置
19        self.bias_o = nn.Parameter(torch.Tensor(output_size))
20        self.init_parameters()
21
22    def init_parameters(self):
23        # 使用均匀分布初始化权重
24        nn.init.kaiming_uniform_(self.W_ih, a=np.sqrt(5))
25        nn.init.kaiming_uniform_(self.W_hh, a=np.sqrt(5))
26        nn.init.kaiming_uniform_(self.W_ho, a=np.sqrt(5))
27        nn.init.zeros_(self.bias_h)
28        nn.init.zeros_(self.bias_o)
29
30    def forward(self, x):
31        """
32        x: 输入序列,形状为 (batch, seq_len, input_size)
33        """
34        batch_size, seq_len, _ = x.size()
35        h_t = torch.zeros(batch_size, self.hidden_size).to(x.device)  # 初始化隐藏状态
36
37        for t in range(seq_len):
38            x_t = x[:, t, :]  # 当前时间步输入
39            h_t = torch.tanh(x_t @ self.W_ih + h_t @ self.W_hh + self.bias_h)  # 更新隐藏状态
40
41        output = h_t @ self.W_ho + self.bias_o  # 最终输出
42        return output

结构如下

<!-- Input Layer -->
<rect x="50" y="150" width="100" height="50" fill="lightblue" stroke="black" />
<text x="100" y="180" text-anchor="middle">Input (x_t)</text>

<!-- Hidden Layer -->
<rect x="250" y="150" width="100" height="50" fill="lightgreen" stroke="black" />
<text x="300" y="180" text-anchor="middle">Hidden (h_t)</text>

<!-- Output Layer -->
<rect x="450" y="150" width="100" height="50" fill="lightyellow" stroke="black" />
<text x="500" y="180" text-anchor="middle">Output</text>

<!-- Connections -->
<line x1="150" y1="175" x2="250" y2="175" stroke="black" stroke-width="2" marker-end="url(#arrowhead)" />
<text x="200" y="165" text-anchor="middle">W_ih</text>

<path d="M 300 150 C 300 100, 350 100, 350 150" fill="none" stroke="black" stroke-width="2" marker-end="url(#arrowhead)" />
<text x="325" y="110" text-anchor="middle">W_hh</text>

<line x1="350" y1="175" x2="450" y2="175" stroke="black" stroke-width="2" marker-end="url(#arrowhead)" />
<text x="400" y="165" text-anchor="middle">W_ho</text>

<!-- Biases -->
<text x="300" y="220" text-anchor="middle">bias_h</text>
<line x1="300" y1="210" x2="300" y2="200" stroke="black" stroke-width="2" marker-end="url(#arrowhead)" />

<text x="500" y="220" text-anchor="middle">bias_o</text>
<line x1="500" y1="210" x2="500" y2="200" stroke="black" stroke-width="2" marker-end="url(#arrowhead)" />

<!-- Additional Information -->
<text x="300" y="50" text-anchor="middle" font-size="20" font-weight="bold">CustomRNN Model Structure</text>
<text x="300" y="350" text-anchor="middle">Activation function: tanh (for hidden layer)</text>
### 训练与验证

train.py

负责训练模型,并在训练过程中输出损失。

 1import torch
 2import torch.nn as nn
 3import torch.optim as optim
 4import time
 5from data import get_data_loaders
 6from model import CustomRNN
 7
 8def train_model(
 9    input_size=1,
10    hidden_size=32,
11    output_size=1,
12    seq_length=50,
13    num_samples=1000,
14    batch_size=32,
15    train_ratio=0.8,
16    learning_rate=0.01,
17    epochs=100,
18    device=None
19):
20    if device is None:
21        device = 'cuda' if torch.cuda.is_available() else 'cpu'
22
23    # 获取数据加载器
24    train_loader, test_loader = get_data_loaders(
25        seq_length=seq_length,
26        num_samples=num_samples,
27        batch_size=batch_size,
28        train_ratio=train_ratio
29    )
30
31    # 实例化模型
32    model = CustomRNN(input_size, hidden_size, output_size).to(device)
33
34    # 定义损失函数和优化器
35    criterion = nn.MSELoss()
36    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
37
38    # 训练过程
39    for epoch in range(1, epochs + 1):
40        model.train()
41        epoch_loss = 0
42        start_time = time.time()
43        for batch_x, batch_y in train_loader:
44            batch_x = batch_x.to(device)
45            batch_y = batch_y.to(device)
46
47            optimizer.zero_grad()
48            outputs = model(batch_x)
49            loss = criterion(outputs, batch_y)
50            loss.backward()
51            optimizer.step()
52
53            epoch_loss += loss.item() * batch_x.size(0)
54
55        epoch_loss /= len(train_loader.dataset)
56        if epoch % 10 == 0 or epoch == 1:
57            elapsed = time.time() - start_time
58            print(f'Epoch {epoch}/{epochs}, Loss: {epoch_loss:.6f}, Time: {elapsed:.2f}s')
59
60    # 保存模型
61    torch.save(model.state_dict(), 'rnn_custom_model.pth')
62    print('Saved model to rnn_custom_model.pth')
63
64    return model, test_loader
65
66if __name__ == "__main__":
67    train_model()

test.py

加载训练好的模型,并在测试集上评估其性能。

 1import torch
 2import torch.nn as nn
 3from data import get_data_loaders
 4from model import CustomRNN
 5
 6def test_model(model_path='rnn_custom_model.pth',
 7              input_size=1,
 8              hidden_size=32,
 9              output_size=1,
10              seq_length=50,
11              num_samples=1000,
12              batch_size=32,
13              train_ratio=0.8,
14              device=None):
15    if device is None:
16        device = 'cuda' if torch.cuda.is_available() else 'cpu'
17
18    # 获取数据加载器
19    _, test_loader = get_data_loaders(
20        seq_length=seq_length,
21        num_samples=num_samples,
22        batch_size=batch_size,
23        train_ratio=train_ratio
24    )
25
26    # 实例化模型
27    model = CustomRNN(input_size, hidden_size, output_size).to(device)
28    model.load_state_dict(torch.load(model_path, map_location=device))
29    model.eval()
30
31    # 定义损失函数
32    criterion = nn.MSELoss()
33    test_loss = 0
34    with torch.no_grad():
35        for batch_x, batch_y in test_loader:
36            batch_x = batch_x.to(device)
37            batch_y = batch_y.to(device)
38            outputs = model(batch_x)
39            loss = criterion(outputs, batch_y)
40            test_loss += loss.item() * batch_x.size(0)
41
42    test_loss /= len(test_loader.dataset)
43    print(f'Test MSE Loss: {test_loss:.6f}')
44
45    return model, test_loader
46
47if __name__ == "__main__":
48    test_model()

结果与讨论

$ python train.py    
Epoch 1/100, Loss: 0.311845, Time: 0.04s
Epoch 10/100, Loss: 0.095100, Time: 0.04s
Epoch 20/100, Loss: 0.071599, Time: 0.04s
Epoch 30/100, Loss: 0.036135, Time: 0.04s
Epoch 40/100, Loss: 0.038392, Time: 0.04s
Epoch 50/100, Loss: 0.028199, Time: 0.04s
Epoch 60/100, Loss: 0.032723, Time: 0.04s
Epoch 70/100, Loss: 0.019848, Time: 0.04s
Epoch 80/100, Loss: 0.026268, Time: 0.04s
Epoch 90/100, Loss: 0.022966, Time: 0.04s
Epoch 100/100, Loss: 0.017622, Time: 0.04s
Saved model to rnn_custom_model.pth

$ python test.py     
Test MSE Loss: 0.023389

通过训练和测试脚本,我们可以观察到模型的训练损失逐步下降,测试集上的损失也保持在较低水平,表明模型在学习序列的时间依赖性。为了更直观地理解模型的表现,我们将使用一个单独的脚本进行结果可视化。

visualize.py

选择部分测试样本进行预测,并将预测结果与真实值进行对比。

 1import torch
 2import numpy as np
 3import matplotlib.pyplot as plt
 4from data import get_data_loaders
 5from model import CustomRNN
 6
 7def visualize_predictions(
 8    model_path='rnn_custom_model.pth',
 9    input_size=1,
10    hidden_size=32,
11    output_size=1,
12    seq_length=50,
13    num_samples=256*10,
14    batch_size=256,
15    train_ratio=0.8,
16    device=None,
17    num_plots=100
18):
19    if device is None:
20        device = 'cuda' if torch.cuda.is_available() else 'cpu'
21
22    # 获取数据加载器
23    _, test_loader = get_data_loaders(
24        seq_length=seq_length,
25        num_samples=num_samples,
26        batch_size=batch_size,
27        train_ratio=train_ratio
28    )
29
30    # 实例化模型
31    model = CustomRNN(input_size, hidden_size, output_size).to(device)
32    model.load_state_dict(torch.load(model_path, map_location=device))
33    model.eval()
34
35    # 获取一个批次的数据
36    with torch.no_grad():
37        for batch_x, batch_y in test_loader:
38            sample_x = batch_x.to(device)
39            sample_y = batch_y.to(device)
40            predictions = model(sample_x).cpu().numpy()
41            true = sample_y.cpu().numpy()
42            break  # 只取一个批次
43
44    # 绘制部分预测结果
45    plt.figure(figsize=(12,6))
46    plt.plot(true[:num_plots], label='True')
47    plt.plot(predictions[:num_plots], label='Predictions')
48    plt.legend()
49    plt.xlabel('Sample')
50    plt.ylabel('Value')
51    plt.title('Predictions vs True')
52    plt.show()
53    plt.savefig('predictions_vs_true.png')
54
55if __name__ == "__main__":
56    visualize_predictions()

结果如下:

gh