Done is Better Than Perfect

[딥러닝] 9. LSTM, GRU 본문

공부/딥러닝

[딥러닝] 9. LSTM, GRU

jimingee 2024. 7. 10. 15:56

 

Vanilla RNN의 단점을 해결하기 위해 제안된 모델인 LSTM과  GRU에 대해 알아보겠다.

LSTM과 GRU는 내부 연산 방식만 Vanilla RNN과 다르다. 즉, 입력값과 출력값을 Vanilla RNN와 동일하게 사용하면 된다.

 

[ 목차 ]

1. LSTM 소개

2. GRU 소개

3. RNN 모델의 활용


1. LSTM

  • Vanilla RNN의 기울기 소실 문제를 해결하고자 등장
  • Long Short Term Memory(장단기 메모리)의 약자 → 장기 의존성과 단기 의존성을 모두 기억할 수 있음
  • 새로 계산된 hidden state $h_t$ 를 출력값 $y_t$ 으로도 사용
  • LSTM의 구성요소 : cell state, forget gate, input gate, output gate

 

 

LSTM의 구성요소

 

Cell state

  • 기울기 소실 문제를 해결하기 위한 핵심 장치
  • 장기적으로 기억할 정보를 조절

 

 

 

Gate

3종류의 게이트를 4개의 FC Layer로 구성

  • $ W_f $: 망각게이트(Forget Gate)
  • $ W_i, W_C $: 입력게이트(Input Gate)
  • $ W_o $: 출력게이트(Output Gate)

 

Forget Gate

  • 기존 cell state에서 잊을 정보를 결정
  • $f_t =  \sigma (W_f[h_{t-1}, x_t]$
    • $ \sigma $ : sigmoid 함수
    • $ [h_{t-1}, x_t] $: $ h_{t-1}$ 벡터와 $ x_t $ 벡터를 concatenate하는연산

 

Input Gate

  • 현재 입력 받은 정보에서 cell state에
    저장할 정보 결정
  • $ i_t = \sigma(W_i [h_{t-1}, x_t]) $
  • $ \tilde{ C } = tanh(W_c[h_{t-1}, x_t ]) $

 

새로운 Cell state

  • Forget Gate와 Input Gate의 정보를
    통해 cell state 갱신
  • $ C_t = f_t * C_{t-1} + i_t * \tilde{C}_t $
    • * 연산자는 벡터의 각 원소별로 곱하는 연산 (Hadamard Product)
    • 예) [1 2 3] * [4 5 6] = [4 10 18]

 

Output Gate

  • 다음 hidden state $ h_t $와 출력값 $ y_t $을 계산 
    (새로 계산된 cell state 사용)
  • $ \o_t = \sigma(W_o[h_{t-1}, x_t]) $
  • $ h_t = o_t * tanh(C_t) = y_t $

 


2. GRU

  • Gated Recurrent Unit의 약자
  • LSTM을 개량한 모델
  • LSTM과 마찬가지로 새로 계산된 hidden state $h_t$ 를 출력값 $ y_t $ 으로도 사용 
  • LSTM이 가지는 3개의 게이트를 2개로 간소화하고 Cell State를 없앰
    • 파라미터 수가 감소하여 LSTM보다 빠른 학습 속도 가짐
    • 그럼에도 성능은 일반적으로 LSTM과 비슷한 수준

 

 

 

GRU의 구성요소

  • 2종류의 게이트를 2개의 FC Layer로 구성
  • $ W_r $ : 리셋 게이트(Reset Gate)
  • $ W_z $ : 업데이트 게이트(Update Gate)

 

 

 

Reset Gate

  • 기존 hidden state의 정보를 얼마나 초기화할지 결정하는 게이트
  • $ r_t = \sigma(W_r [h_{t-1}, x_t] ) $

  

 

 

Update Gate

  • 기존 hidden state의 정보를 얼마나 사용할지 결정하는 게이트
  • $ z_t = \sigma(W_x[h_{t-1}, x_t] ) $

 

 

 

 

 

새로운 hidden state 계산 ( 2 step으로 설명)

 

Step 1)

  • Reset Gate의 결과를 통해 새로운 hidden state의 후보 $\tilde{h}_t $ 계산
  • $ \tilde{h}_t = tanh (W_i [r_t * h_{t-1}, x_t] ) $
    • *는 hadamard product
    • $ r_t * h_{t-1} $ 부분이 점선을 통해 $ W_i $로 전달됨

 

Step 2)

  • Update Gate의 결과를 통해 새로운 hidden state 계산
  • $ h_t = (1-z_t) * h_{t-1} + z_t * \tilde{h}_t $
  • Update Gate의 정보 $ z_t $가 새로운 hidden state 정보 $ \tilde{h}_t $ , 이전 hidden state 정보 $ h_{t-1} $를 얼마나 사용할 지 결정함
  • Update Gate정보만이 새로운 hidden state 계산에 사용됨
    • GRU의 Update GateLSTMForget GateInput Gate하나로 합친 유사한 역할

 

 

[ 장기 의존성 문제 확인 - Vanilla RNN vs LSTM vs GRU 비교 ]

  • RNN은 시계열 데이터와 같은 경향성을 학습하는 데 훌륭한 성능 제공. 하지만 모델 학습에 사용하는 데이터의 길이(sequence)가 길어질수록 학습 성능이 저하되는 문제 있음.
  • 거리가 먼 입력값과 출력값 사이에 역전파 되는 기울기 값이 점점 0에 수렴하는 기울기 소실 문제 때문, 결과적으로 길이가 긴 시계열 데이터에서 장기 의존성을 학습하는 데 약점이 있음 -> 이를 보완하기 위해 LSTM, GRU 모델 제안됨
  • 시계역 데이터 회귀 문제
  • 데이터 셋 : 1980.01.01 ~ 1990.12.31의 멜버른 지역 최저 기온 데이터 셋
  • SimpleRNN, LSTM, GRU는 입력 형태가 동일함
 
import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam

import pandas as pd
import numpy as np

def load_data(window_size):
    raw_data = pd.read_csv("./daily-min-temperatures.csv")
    raw_temps = raw_data["Temp"]

    mean_temp = raw_temps.mean()
    stdv_temp = raw_temps.std(ddof=0)
    raw_temps = (raw_temps - mean_temp) / stdv_temp # 데이터 정규화

    # window size만큼 x,y 값 저장
    X, y = [], []
    for i in range(len(raw_temps) - window_size):
        cur_temps = raw_temps[i:i + window_size]
        target = raw_temps[i + window_size]

        X.append(list(cur_temps))
        y.append(target)

    X = np.array(X)
    y = np.array(y)
    X = X[:, :, np.newaxis]

    total_len = len(X)
    train_len = int(total_len * 0.8)

    X_train, y_train = X[:train_len], y[:train_len]
    X_test, y_test = X[train_len:], y[train_len:]

    return X_train, X_test, y_train, y_test


''' 1. SimpleRNN + Fully-connected Layer 모델 '''
def build_rnn_model(window_size):
    model = Sequential()

    model.add(layers.SimpleRNN(units=128, input_shape=(window_size,1)))
    model.add(layers.Dense(units=32, activation='relu'))
    model.add(layers.Dense(units=1))

    return model


''' 2. LSTM + Fully-connected Layer 모델 '''
def build_lstm_model(window_size):
    model = Sequential()

    model.add(layers.LSTM(units=128, input_shape=(window_size,1)))
    model.add(layers.Dense(units=32, activation='relu'))
    model.add(layers.Dense(units=1))

    return model

''' 3. GRU + Fully-connected Layer 모델 '''
def build_gru_model(window_size):
    model = Sequential()

    model.add(layers.GRU(units=128, input_shape=(window_size,1)))
    model.add(layers.Dense(units=32, activation='relu'))
    model.add(layers.Dense(units=1))

    return model

def run_model(model, X_train, X_test, y_train, y_test, epochs=10, model_name=None):
    # 모델 최적화
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='mse')

    # 모델 학습(hyperparameter 설정)
    hist = model.fit(X_train, y_train, batch_size=64, epochs=epochs, shuffle=True, verbose=2)
    
    # 모델 테스트
    test_loss = model.evaluate(X_test, y_test, verbose=0)
    
    return test_loss, optimizer, hist

def main(window_size):
    tf.random.set_seed(2022)
    X_train, X_test, y_train, y_test = load_data(window_size)

    rnn_model = build_rnn_model(window_size)
    lstm_model = build_lstm_model(window_size)
    gru_model = build_gru_model(window_size)

    rnn_test_loss, _, _ = run_model(rnn_model, X_train, X_test, y_train, y_test, model_name="RNN")
    lstm_test_loss, _, _ = run_model(lstm_model, X_train, X_test, y_train, y_test, model_name="LSTM")
    gru_test_loss, _, _ = run_model(gru_model, X_train, X_test, y_train, y_test, model_name="GRU")
    
    return rnn_test_loss, lstm_test_loss, gru_test_loss

if __name__ == "__main__":
    # 10일치 데이터를 보고 다음날의 기온 예측 -> window size = 10
    rnn_10_test_loss, lstm_10_test_loss, gru_10_test_loss = main(10)
    
    # 300일치 데이터를 보고 다음날의 기온 예측 -> window size = 300
    rnn_300_test_loss, lstm_300_test_loss, gru_300_test_loss = main(300)
    
    print("=" * 20, "시계열 길이가 10 인 경우", "=" * 20)
    print("[RNN ] 테스트 MSE = {:.5f}".format(rnn_10_test_loss))
    print("[LSTM] 테스트 MSE = {:.5f}".format(lstm_10_test_loss))
    print("[GRU ] 테스트 MSE = {:.5f}".format(gru_10_test_loss))
    print()
    
    print("=" * 20, "시계열 길이가 300 인 경우", "=" * 20)
    print("[RNN ] 테스트 MSE = {:.5f}".format(rnn_300_test_loss))
    print("[LSTM] 테스트 MSE = {:.5f}".format(lstm_300_test_loss))
    print("[GRU ] 테스트 MSE = {:.5f}".format(gru_300_test_loss))
    print()

 

 

[ 코드 실행 결과 ]

  • 시계열 길이(window size)가 10일 때, 모델 3개 성능 차이 별로 없음
  • 시계열 길이(window size)가 300일 때, LSTM과 GRU는 시계열 길이 (window size)가 길어도 성능에 영향을 덜 받음
  • RNN의 경우 장기 의존성 문제를 해결하지 못해서 성능이 낮음
==================== 시계열 길이가 10 인 경우 ====================
[RNN ] 테스트 MSE = 0.30041
[LSTM] 테스트 MSE = 0.30050
[GRU ] 테스트 MSE = 0.29302

==================== 시계열 길이가 300 인 경우 ====================
[RNN ] 테스트 MSE = 0.33759
[LSTM] 테스트 MSE = 0.29616
[GRU ] 테스트 MSE = 0.29959

 

 


 

 

[ RNN 기반 모델을 통한 분류 작업 ]

  • 문장을 통해 별점 예측 (5개의 클래스 분류 모델)
  • 데이터셋 : 아마존의 식품 리뷰 데이터 (전체 데이터셋에서 실제 리뷰 문장과 해당 리뷰어의 평가 점수 만을 추출한 데이터 셋)
  • 문장 내 각 단어를 숫자로 변환하는 Tokenizer 적용
  • 각 RNN 모델 구성 (Embedding, RNN, Dense)은 동일하게 구성
    • 1. Embedding layer : max_features에서 embedding_size로 각 문장을 구성하는 벡터의 크기를 줄이는 embedding layer
    • 2. RNN layer : hidden state의 크기를 20으로 설정한 RNN 기반 layer
    • 3. Dense layer : 각 결과값을 5개의 클래스로 분류하는 dense layer
import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split

import pandas as pd


def load_data(max_len):
    data = pd.read_csv("./review_score.csv")
    # 입력 데이터: 리뷰 문장 / 라벨 데이터: 해당 리뷰의 평점
    X = data['Review']
    y = data['Score']
    y = y - 1 # 값을 1~5에서 0~4로 변경

    # 문장 내 각 단어를 숫자로 변환하는 Tokenizer 적용 # 자연어 처리
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(X) # 문장 -> 숫자로 변환
    X = tokenizer.texts_to_sequences(X) # 숫자로 이루어진 문장 생성

    # 전체 단어 중에서 가장 큰 숫자로 mapping된 단어의 숫자 가져옴
    # 즉, max_features는 전체 데이터셋에 등장하는 겹치지 않는 단어의 개수 + 1과 동일
    max_features = max([max(_in) for _in in X]) + 1

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # 모든 문장들을 가장 긴 문장의 단어 개수가 되게 padding 추가
    X_train = pad_sequences(X_train, maxlen=max_len)
    X_test = pad_sequences(X_test, maxlen=max_len)

    return X_train, X_test, y_train, y_test, max_features


''' 1. Simple RNN 기반의 모델 '''
def build_rnn_model(max_features, embedding_size):
    model = Sequential()

    model.add(layers.Embedding(max_features, embedding_size)) 
    model.add(layers.SimpleRNN(units=20)) 
    model.add(layers.Dense(units=5, activation='softmax'))

    return model


''' 2. LSTM 기반 모델 '''
def build_lstm_model(max_features, embedding_size):
    model = Sequential()

    model.add(layers.Embedding(max_features, embedding_size))
    model.add(layers.LSTM(units=20)) #  hidden state 크기: 20
    model.add(layers.Dense(units=5, activation='softmax'))

    return model


''' 3. GRU 기반 모델 '''
def build_gru_model(max_features, embedding_size):
    model = Sequential()

    model.add(layers.Embedding(max_features, embedding_size))
    model.add(layers.GRU(units=20)) #  hidden state 크기: 20
    model.add(layers.Dense(units=5, activation='softmax'))

    return model


def run_model(model, X_train, X_test, y_train, y_test, epochs=10):
    # 모델 최적화 
    # label이 one-hot vector로 이루어진 경우에는 loss로 categorical_crossentropy 사용
    # 현재 데이터 셋이 0-4의 정수 라벨로 이루어졌기 때문에 loss로 sparse_categorical_crossentropy 사용함
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    # 모델 학습
    hist = model.fit(X_train, y_train, batch_size=256,epochs=epochs,shuffle=True, verbose=2)
    
    # 모델 테스트
    test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)

    return test_loss, test_acc, optimizer, hist

def main():
    tf.random.set_seed(2022)
    max_len = 150
    embedding_size = 128

    X_train, X_test, y_train, y_test, max_features = load_data(max_len)
    rnn_model = build_rnn_model(max_features, embedding_size)
    lstm_model = build_lstm_model(max_features, embedding_size)
    gru_model = build_gru_model(max_features, embedding_size)

    rnn_test_loss, rnn_test_acc, _, _ = run_model(rnn_model, X_train, X_test, y_train, y_test)
    lstm_test_loss, lstm_test_acc, _, _ = run_model(lstm_model, X_train, X_test, y_train, y_test)
    gru_test_loss, gru_test_acc, _, _ = run_model(gru_model, X_train, X_test, y_train, y_test)

    print()
    print("=" * 20, "모델 별 Test Loss와 정확도", "=" * 20)
    print("[RNN ] 테스트 Loss: {:.5f}, 테스트 Accuracy: {:.3f}%".format(rnn_test_loss, rnn_test_acc * 100))
    print("[LSTM] 테스트 Loss: {:.5f}, 테스트 Accuracy: {:.3f}%".format(lstm_test_loss, lstm_test_acc * 100))
    print("[GRU ] 테스트 Loss: {:.5f}, 테스트 Accuracy: {:.3f}%".format(gru_test_loss, gru_test_acc * 100))

if __name__ == "__main__":
    main()

 

 

 

[ 코드 수행 결과 ]

  • 성능 : LSTM > GRU > Vanilla RNN
==================== 모델 별 Test Loss와 정확도 ====================
[RNN ] 테스트 Loss: 1.39075, 테스트 Accuracy: 63.370%
[LSTM] 테스트 Loss: 1.11475, 테스트 Accuracy: 67.670%
[GRU ] 테스트 Loss: 1.27642, 테스트 Accuracy: 66.050%

 


 

[ RNN 기반 모델을 통한 회귀 작업 ]

  • 한달 이상의 입력 데이터를 사용하여 마지막 날짜 다음날의 종가를 예측하는 모델
  • 데이터셋 : Apple 주가 데이터셋 - 날짜별로 시작가, 일 최고가, 일 최저가, 종가
    • 1980.12.12 ~ 2020.04.01 의 주가 기록
    • 각 시점의 feature 개수를 4개로 구성 - 시작가, 일 최고가, 일 최저가, 종가
  • RNN 모델 구성
    • RNN Layer : hidden_state 크기 256, input_shape=(window_size, num_features)
    • 3개의 Dense Layer : 각각 node의 개수 64, 16, 1개 / 활성화 함수 : relu
    • +) 데이터가 수치 시계열 데이터이므로 Embedding layer 필요 없음
    • 모델에 한번에 넣어줄 시점의 개수, window size는 30개로 설정
import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam

from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker


def load_data(window_size):
    raw_data_df = pd.read_csv("./AAPL.csv", index_col="Date")
    
    # 데이터 표준화
    scaler = StandardScaler()
    raw_data = scaler.fit_transform(raw_data_df)
    plot_data = {"mean": scaler.mean_[3], "var": scaler.var_[3], "date": raw_data_df.index}

    # 입력 데이터(X): 시작가, 일 최고가, 일 최저가, 종가 데이터
    # 라벨 데이터(y): 종가 데이터(4번째 컬럼)
    raw_X = raw_data[:, :4]
    raw_y = raw_data[:, 3]

    # 데이터셋 구성
    # 입력 데이터(X): window_size개의 데이터
    # 예측할 대상(y): window_size보다 한 시점 뒤의 데이터
    X, y = [], []
    for i in range(len(raw_X) - window_size):
        cur_prices = raw_X[i:i + window_size, :]
        target = raw_y[i + window_size]

        X.append(list(cur_prices))
        y.append(target)

    X = np.array(X)
    y = np.array(y)

    # 학습 데이터 80%, 테스트 데이터 20%
    total_len = len(X)
    train_len = int(total_len * 0.8)

    X_train, y_train = X[:train_len], y[:train_len]
    X_test, y_test = X[train_len:], y[train_len:]

    return X_train, X_test, y_train, y_test, plot_data


''' SimpleRNN 기반 모델 '''
def build_rnn_model(window_size, num_features):
    model = Sequential()
    model.add(layers.SimpleRNN(units=256, input_shape=(window_size, num_features)))
    model.add(layers.Dense(units=64, activation='relu'))
    model.add(layers.Dense(units=16, activation='relu'))
    model.add(layers.Dense(units=1)) 

    return model


''' LSTM 기반 모델 '''
def build_lstm_model(window_size, num_features):
    model = Sequential()
    model.add(layers.LSTM(units=256, input_shape=(window_size, num_features)))
    model.add(layers.Dense(units=64, activation='relu'))
    model.add(layers.Dense(units=16, activation='relu'))
    model.add(layers.Dense(units=1))

    return model


''' GRU 기반 모델 '''
def build_gru_model(window_size, num_features):
    model = Sequential()
    model.add(layers.GRU(units=256, input_shape=(window_size, num_features)))
    model.add(layers.Dense(units=64, activation='relu'))
    model.add(layers.Dense(units=16, activation='relu'))
    model.add(layers.Dense(units=1))

    return model

def run_model(model, X_train, X_test, y_train, y_test, epochs=10, name=None):
    # 모델 최적화
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='mse')

    # 모델 학습(학습을 위한 hyperparameter 설정)
    hist = model.fit(X_train, y_train, batch_size=128, epochs=epochs, shuffle=True, verbose=2)
    
    # 모델 테스트
    test_loss = model.evaluate(X_test, y_test, verbose=0)
    print("[{}] 테스트 loss: {:.5f}".format(name, test_loss))
    print()

    return optimizer, hist


def plot_result(model, X_true, y_true, plot_data, name):
    y_pred = model.predict(X_true)

    # 표준화된 결과를 원래 값으로 변환
    y_true_orig = (y_true * np.sqrt(plot_data["var"])) + plot_data["mean"]
    y_pred_orig = (y_pred * np.sqrt(plot_data["var"])) + plot_data["mean"]

    # 테스트 데이터에서 사용한 날짜들
    test_date = plot_data["date"][-len(y_true):]

    # 모델의 예측값, 실제값 그래프 생성
    fig = plt.figure(figsize=(12, 8))
    ax = plt.gca()
    ax.plot(y_true_orig, color="b", label="True")
    ax.plot(y_pred_orig, color="r", label="Prediction")
    ax.set_xticks(list(range(len(test_date))))
    ax.set_xticklabels(test_date, rotation=45)
    ax.xaxis.set_major_locator(ticker.MultipleLocator(100))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(100))
    ax.set_title("{} Result".format(name))
    ax.legend(loc="upper left")
    plt.tight_layout()
    plt.savefig("apple_stock_{}".format(name.lower()))
    
    elice_utils.send_image("apple_stock_{}.png".format(name.lower()))

def main():
    tf.random.set_seed(2022)

    window_size = 30
    X_train, X_test, y_train, y_test, plot_data = load_data(window_size)
    num_features = X_train[0].shape[1]

    rnn_model = build_rnn_model(window_size, num_features)
    lstm_model = build_lstm_model(window_size, num_features)
    gru_model = build_gru_model(window_size, num_features)

    run_model(rnn_model, X_train, X_test, y_train, y_test, name="RNN")
    run_model(lstm_model, X_train, X_test, y_train, y_test, name="LSTM")
    run_model(gru_model, X_train, X_test, y_train, y_test, name="GRU")

    plot_result(rnn_model, X_test, y_test, plot_data, name="RNN")
    plot_result(lstm_model, X_test, y_test, plot_data, name="LSTM")
    plot_result(gru_model, X_test, y_test, plot_data, name="GRU")

if __name__ == "__main__":
    main()

 

[ 코드 수행 결과 ]

  • Loss, 예측결과 그래프에서 성능 : GRU > LSTM > RNN 순으로 좋음
  • GRU, LSTM에 비해 RNN의 성능 현저히 떨어짐 -> GRU, LSTM은 기울기 소실 문제 해결로 성능이 더 좋음
[RNN] 테스트 loss: 0.90601
[LSTM] 테스트 loss: 0.08191
[GRU] 테스트 loss: 0.03845

 

RNN 기반 모델 예측 결과 그래프

 


3. RNN 모델의 활용

 

RNN/LSTM/GRU모델은 회귀분석분류모두 활용 가능

  • 회귀 분석 : 각 시점의 출력값이 어느 정도일지 예측 (예: 주가 예측, 기온 예측 )
  • 분류 작업 : 각 시점의 데이터가 어느 클래스일지 예측 (예: 문장에서 다음 단어 예측, 각 단어의 품사 예측)

 

모델 학습을 위한 손실 함수 계산

  • 각 시점별 예측값 $\hat{y}_t$와 실제값 $y_t$ 을 통해 시점별 손실 함수값 계산 -> $L_t$
  • $L_t$를 모두 더하여 최종 손실값 계산 -> $L  = \sum_{t=1}^{T} L_t$
  • 주로 사용하는 손실함수(f) 
    • 회귀분석 : Mean Squared Error(MSE)
    • 분류 : Cross Entropy

 

Comments