Done is Better Than Perfect

[딥러닝] 8. RNN 본문

공부/딥러닝

[딥러닝] 8. RNN

jimingee 2024. 7. 1. 18:55

[ 목차 ]

 

1. 순차데이터란?

2. Recurrent Neural Network

3. Vanilla RNN (가장 간단한 형태의 RNN 모델)


1. 순차 데이터란?

RNN(Recurrent Neural Network) : 시계열 데이터 같은 순차 데이터(Sequential Data) 처리를 위한 모델

 

 

순차 데이터(Sequential Data) - 예시: 시계열 데이터, 자연어 데이터

  • 순서(Order)가지고 나타나는 데이터
  • 데이터 개체간의 순서중요
  • 예) 날짜에 따른 기온 데이터, 단어들로 이루어진 문장, DNA 염기 서열, 샘플링된 소리 신호 등
  • 시계열 데이터 (Time-Series Data)
    • 일정한 시간 간격가지고 얻어낸 데이터
    • 예) 연도별 대한민국의 평균 기온, 시간별 주식 가격 기록
  • 자연어 데이터 (Natural Language)
    • 인류가 말하는 언어의미
    • 주로 문장 내에서 단어가 등장하는 순서주목

 

딥러닝을 활용한 순차 데이터 처리 예시

1. 경향성 파악 : 주가 예측, 기예측 등, 다양한 시계열 특징을 가지는 데이터적용 가능

2. 음악 장르 분석 : 오디오 파일은 본질적으로 시계열 데이터, 음파 형태 등을 분석하여 오디오 파일의 장르분석

3. 강수량예측(Precipitation Forecasting) : 구글에서 이미지 처리 기술과 결합하여 주도적으로 연구 (: MetNet)

4. 음성 인식 (Speech Recognition) : 음성에 포함된 단어나 소리를 추출 (예: Apple Siri, Google Assistant)

5. 번역기 (Translator) : 언어문장 번역, 딥러닝 발전 이후 번역의 자연스러움 향상 (예: 구글 번역, 네이버 파파고 등)

6. 챗봇 (chatbot) : 사용자의 질문에 사람처럼 응답하는 프로그램, 사용자의 질문을 분석 질문에 적절한 응답 생성

 

 

 

2. Recurrent Neural Network

 

[ Fully connected Layer가 순차 데이터를 해결할 수 없는 이유 ]

  • FC Layer는 입력 노드 개수와 출력 노드 개수가 정해짐
  • 순차데이터는 하나의 데이터를 이루는 개체 수가 다를 수 있음 (예, 문장은 모두 서로 다른 개수의 단어로 이루어짐)
  • 또한, FC Layer는 순서 고려 불가능

 

 

 

RNN ( Recurrent Neural Network )

  • 순차 데이터 처리를 위한 딥러닝 모델
  • RNN의 대표적인 구성요소 -> Hidden State: 순환구조를 구현하는 핵심 장치

 

[ 입력 데이터 (순차적 데이터) 구조 ]

  • $x_1,x_2,x_3, ... , x_n $과 같이 데이터의 나열
  • 각 $x_t$의 의미
    • 시계열 데이터 : 일정 시간 간격으로 나눠진 데이터 개체 하나
    • 자연어 데이터 : 문장 내의 단어

 

  • 시계열 데이터의 벡터 변환
    • 입력 데이터의 각 $x_t$벡터 형태
    • 시계열 데이터의 경우, 데이터를 이루는 Feature 값들을 원소로 하는 벡터변환

 

 

  • 자연어 데이터의 벡터 변환
    • 임베딩(Embedding)단어들을 숫자로 이루어진 벡터로 변환
    • 대표적인 임베딩 기법
      • One-hot Encoding : 하나의 요소만 1이고 나머지는 모두 0인 희소 벡터
      • Word2Vec 주어진 단어들을 벡터로 변환하는 기계 학습 모델 (단어간의 연관성 표현)

 

 

 

 

3. Vanilla RNN

Vanilla RNN의 구조

  • 가장 간단한 형태의 RNN모델
  • 내부에 세개의 FC Layer구성
    • $W_{hh}$ : hidden state($h_{t-1}$)변환하는 Layer가중치 행렬
    • $W_{xh}$ : 시점의 입력값($x_t$)변환하는 Layer가중치 행렬
    • $W_{hy}$ : 시점의 출력값($y_t$)변환하는 Layer가중치 행렬

 

[ Vanilla RNN의 연산 과정 - hidden state, output ]

 

  • 현재 입력값($x_t$)에 대한 새로운 hidden state ($h_t$) 계산
  • $ h_t = tanh(h_{t-1} W_{hh} + x_t W_{xh} $

 

 

 

 

  • 현재 입력값($x_t$)에 대한 새로운 출력값 ($y_t$) 계산
  • $ y_t = W_{hy}h_t $
  • 앞서 계산한 hidden state($h_t$) 이용

 

 

 

 

+) tanh는 tangent hyperbolic 함수 : 활성화 함수로 사용 (비선형성 추가)

 

 

 

[ 시간 순으로 보는 Vanilla RNN연산 과정 ]

  • 모델에 들어오는 시점의 데이터 $x_t$마다 앞서 설명한 연산 과정을 수행
  • 입력값에 따라 반복해서 출력값($y_n$)hidden state($h_n$)계산
  • 이전 시점에 생성된 hidden state다음 시점에 사용 -> recurrent
  • 여기서 RNN 모델은 동일한 RNN 모델 - 입력 시점($x_n$)이 다름을 표현하기 위해 옆으로 펼쳐서 표현했음

 

  • Hidden state의 의미
    • 특정 시점 $t$까지 들어온 입력값들의 상관 관계나 경향성 정보를 압축해서 저장
    • 모델이 내부적으로 계속 가지는 값이므로 일종의 메모리(Memory)로 볼 수 있음
  • Parameter Sharing
    • 모든 시점에서 같은 RNN 모델과 hidden state를 사용
    • Hidden state와 출력값 계산을 위한 FC Layer를 모든 시점의 입력값이 재사용
    • FC Layer 세 개가 모델 파라미터의 전부

 

 

[ Vanilla RNN의 종류 ]

사용할 입력값과 출력값의 구성에 따라 여러 종류의 RNN존재

  • many-to-one : 한 시점의값 출력값만 사용
  • many-to-many : 여러 시점의 입력값과 여러 시점의 출력값을 사용
                                            ( 입력값과 출력값에 사용하는 시점의 개수는 같을 수도 있고 다를 수도 있음 ) 

 

 

  • Encoder-Decoder : 입력값들을 받아 특정 hidden state로 인코딩한 후, 이 hidden state로 새로운 출력값 생성하는 구조

 

 

 

[ Vanilla RNN의 문제점 ]

 

  • RNN은 출력값이 시간 순서에 따라 생성
  • 각 시점의 출력값과 실제값을 비교하여 손실(Loss)값 계산 (시점마다 손실값 계산)
  • 역전파 알고리즘이 시간에 따라 작동 → Back-propagation Through Time (BPTT)
  • 입력값의 길이가 매우 길어질 경우(시점의 개수가 많을 경우) -> 초기 입력값과 나중 출력값 사이에 전파되는 기울기 값이 매우 작아질 가능성 높음
  • 기울기 소실(Vanishing Gradient)문제발생하기 쉬움 → 장기 의존성(Long-term Dependency)다루기가 어려움
  • RNN의 문제점을 해결하기 위해 LSTM, GRU 등의 모델이 제안됨

 

 

 

[ Vanilla RNN 분류 모델 구현 - 1개의 SimpleRNN layer로 구현 ]

  • Tensorflow에서는 Vanilla RNN이 SimpleRNN로 구현되어 있음
  • Vanilla RNN으로 IMDb 데이터 학습하기
    • 사용한 데이터 셋 : IMDb : 영화 정보, 사용자들의 리뷰가 포함된 데이터
    • 스탠포드 대학에서 사용자 리뷰를 별점 기반으로 리뷰가 긍정적인지 부정적인지 분석하여 클래스가 두개인 데이터 셋을 구성함
  • 모델에서 마지막 dense layer의 노드 개수가 1개로 설정 -> 활성화 함수가 sigmoid이므로 0~1 값이 나옴, 이는 확률로 해석 가능함
        
import os
import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.datasets import imdb
from tensorflow.keras.preprocessing.sequence import pad_sequences

def load_data(num_words, max_len): # imdb 데이터셋을 불러옵니다.
    (X_train, y_train), (X_test, y_test) = imdb.load_data(num_words=num_words) # num_words: 활용할 단어의 개수

    # 단어가 가장 많은 문장의 단어 개수로 통일 -> maxlen의 수만큼 padding(0으로) 추가
    X_train = pad_sequences(X_train, maxlen=max_len) 
    X_test = pad_sequences(X_test, maxlen=max_len)
    
    return X_train, X_test, y_train, y_test

' Vanilla RNN 모델 구현 '
def build_rnn_model(num_words, embedding_len):
    model = Sequential()
    
    model.add(layers.Embedding(input_dim=num_words, output_dim=embedding_len))
    model.add(layers.SimpleRNN(units=16)) # hidden state의 크기
    model.add(layers.Dense(units=1, activation='sigmoid')) # 분류
    # 왜 dense layer의 노드 개수가 1개 인지? => sigmoid이므로 0~1 값이 나옴, 이는 확률로 해석 가능함
    
    return model

def main(model=None, epochs=5):
    # IMDb 데이터셋에서 가져올 단어의 개수
    num_words = 6000
    
    # 각 문장이 가질 수 있는 최대 단어 개수
    max_len = 130
    
    # 임베딩 된 벡터의 길이
    embedding_len = 100
    
    X_train, X_test, y_train, y_test = load_data(num_words, max_len)
    
    if model is None:
        model = build_rnn_model(num_words, embedding_len)
    
    # 모델 최적화
    optimizer = Adam(learning_rate=0.001)
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    
    # 모델 훈련
    hist = model.fit(X_train, y_train, epochs=epochs,batch_size=100,validation_split=0.2,shuffle=True,verbose=2)
    
    # 모델 테스트
    test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
    print()
    print("테스트 Loss: {:.5f}, 테스트 정확도: {:.3f}%".format(test_loss, test_acc * 100))
    
    return optimizer, hist

if __name__=="__main__":
    main()

 

[ 코드 수행 결과 ] 

테스트 Loss: 0.49907, 테스트 정확도: 81.984%

 

 

 

[ Vanilla RNN 예측 모델 구현 - 1개의 SimpleRNN layer로 구현 ]

  • 항공 승객 수 데이터셋을 사용하여 월별로 항공기를 이용하는 승객 수 예측하는 모델 생성
  • 사용할 데이터 셋 : 1949년 1월부터 1960년 12월까지 항공기 이용 승객 수를 월별로 기록한 데이터셋 (시계열 데이터)
  • 시계열 데이터를 사용하여 RNN 기반 모델을 학습할 때는 window size라는 개념 사용
    • window size : 모델을 한번 학습할 때 사용할 데이터의 개수를 의미
    • 그림처럼 총 10개의 데이터에서 4개의 데이터를 한번 학습에 사용한다면 window size는 4

 

import os
import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

def load_data(window_size):
    raw_data = pd.read_csv("./airline-passengers.csv")
    raw_passengers = raw_data["Passengers"].to_numpy()

    # 데이터의 평균과 표준편차 값으로 정규화(표준화)
    mean_passenger = raw_passengers.mean()
    stdv_passenger = raw_passengers.std(ddof=0)
    raw_passengers = (raw_passengers - mean_passenger) / stdv_passenger
    data_stat = {"month": raw_data["Month"], "mean": mean_passenger, "stdv": stdv_passenger}

    ''' 시계열 데이터 셋 구성 '''
    # window_size개의 데이터를 불러와 입력 데이터(X)로 설정하고
    # window_size보다 한 시점 뒤의 데이터를 예측할 대상(y)으로 설정하여 데이터셋 구성
    X, y = [], []
    for i in range(len(raw_passengers) - window_size):
        cur_passenger = raw_passengers[i:i + window_size]
        target = raw_passengers[i + window_size]

        X.append(list(cur_passenger))
        y.append(target)

    X = np.array(X)
    y = np.array(y)

    # 각 입력 데이터는 sequence 길이가 window_size이고, featuer 개수는 1개가 되도록 마지막에 새로운 차원 추가
    # 즉, (전체 데이터 개수, window_size) -> (전체 데이터 개수, window_size, 1)이 되도록 변환
    X = X[:, :, np.newaxis]

    # 학습 데이터 80%, 테스트 데이터 20%
    total_len = len(X)
    train_len = int(total_len * 0.8)

    X_train, y_train = X[:train_len], y[:train_len]
    X_test, y_test = X[train_len:], y[train_len:]

    return X_train, X_test, y_train, y_test, data_stat

''' Vanilla RNN 모델 구현 '''
def build_rnn_model(window_size):
    model = Sequential()

    model.add(layers.SimpleRNN(units=4,input_shape=(window_size, 1))) # 자연어 데이터가 아니라 시계열 데이터이므로 embedding layer 없음 -> 따라서 input_shape을 알려줘야 할 필요있음
    model.add(layers.Dense(units=1)) # 활성화 함수 사용 x ->모델의 출력값 자체를 확률로 사용

    return model
    
def plot_result(X_true, y_true, y_pred, data_stat):
    # 표준화된 결과를 다시 원래 값으로 변환
    y_true_orig = (y_true * data_stat["stdv"]) + data_stat["mean"]
    y_pred_orig = (y_pred * data_stat["stdv"]) + data_stat["mean"]

    # 테스트 데이터에서 사용한 날짜들만 가져옴
    test_month = data_stat["month"][-len(y_true):]

    # 모델의 예측값, 실제값 그래프
    fig = plt.figure(figsize=(8, 6))
    ax = plt.gca()
    ax.plot(y_true_orig, color="b", label="True")
    ax.plot(y_pred_orig, color="r", label="Prediction")
    ax.set_xticks(list(range(len(test_month))))
    ax.set_xticklabels(test_month, rotation=45)
    ax.set_title("RNN Result")
    ax.legend(loc="upper left")
    plt.savefig("airline_rnn.png")
    elice_utils.send_image("airline_rnn.png")

def main(model=None, epochs=100):
    tf.random.set_seed(2022)

    window_size = 4
    X_train, X_test, y_train, y_test, data_stat = load_data(window_size)

    if model is None:
        model = build_rnn_model(window_size)

    # 모델 최적화
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='MeanSquaredError')

    # 모델 학습
    hist = model.fit(X_train, y_train, batch_size=8,epochs=epochs,shuffle=True,verbose=2)
    
    # 모델 테스트
    test_loss = model.evaluate(X_test, y_test, verbose=0)
    print()
    print("테스트 MSE: {:.5f}".format(test_loss))
    print()
    
    y_pred = model.predict(X_test)
    plot_result(X_test, y_test, y_pred, data_stat)

    return optimizer, hist

if __name__ == "__main__":
    main()

 

 

 

[ Deep Vanilla RNN 모델 구현 -  여러개의 SimpleRNN layer로 구현 ]

  • SimpleRNN 또한 Convolutional Layer 처럼 하나의 Layer 라고 볼 수 있기 때문에 여러 층으로 쌓을 수 있음
  • 여러 SimpleRNN 층으로 이루어진 모델 -> 심층 RNN(Deep RNN) 모델
  • 실험에서 SimpleRNN이 하나로 이루어진 모델과 두개로 이루어진 모델의 성능 비교
    • 데이터셋 : numpy를 이용하여 2개의 sin 함수를 조합한 간단한 시계열 데이터
    • Window Size는 50, 모델의 예측 성능을 Mean Squared Error(MSE) 점수로 확인
  • Deep Vanilla RNN은 many-to-many RNN 구조 사용 -> 생성된 N개의 output이 다음 RNN layer의 입력으로 쓰이기 때문
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam
import numpy as np

def load_data(num_data, window_size):
    freq1, freq2, offsets1, offsets2 = np.random.rand(4, num_data, 1)
    ## freq1, freq2 : 2개의 sin함수값
    ## offset : sin 함수가 시작될때가지의 지연 시간

    time = np.linspace(0, 1, window_size + 1)
    series = 0.5 * np.sin((time - offsets1) * (freq1 * 10 + 10))
    series += 0.1 * np.sin((time - offsets2) * (freq2 * 10 + 10)) # sin 함수를 2개 사용하여 데이터 합쳐줌
    series += 0.1 * (np.random.rand(num_data, window_size + 1) - 0.5)
    
    num_train = int(num_data * 0.8)
    X_train, y_train = series[:num_train, :window_size], series[:num_train, -1]
    X_test, y_test = series[num_train:, :window_size], series[num_train:, -1]
    
    X_train = X_train[:, :, np.newaxis]
    X_test = X_test[:, :, np.newaxis]
    
    return X_train, X_test, y_train, y_test

''' 1개의 SimpleRNN layer를 가지는 RNN 모델 '''
def build_rnn_model(window_size):
    model = Sequential()

    model.add(layers.SimpleRNN(units=20,input_shape=(window_size, 1)))
    model.add(layers.Dense(units=1))

    return model

''' 2개의 SimpleRNN layer를 가지는 Deep RNN 모델 '''
def build_deep_rnn_model(window_size):
    model = Sequential()

    # return sequences : RNN의 종류 중 many-to-many를 사용하겠다는 의미
    # 심층 RNN 이므로 many-to-many RNN 모델 생성 (생성된 20개의 output이 다음 simple RNN의 입력으로 쓰임)
    model.add(layers.SimpleRNN(units=20,return_sequences=True,input_shape=(window_size, 1))) 
    model.add(layers.SimpleRNN(units=20))
    model.add(layers.Dense(units=1))

    return model

def run_model(model, X_train, X_test, y_train, y_test, epochs=20, name=None):
    # 모델 최적화
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='mse')

    # 모델 학습
    hist = model.fit(X_train, y_train,epochs=epochs,batch_size=256,shuffle=True,verbose=2)
    
    # 모델 테스트
    test_loss = model.evaluate(X_test, y_test, verbose=0)
    print("[{}] 테스트 MSE: {:.5f}".format(name, test_loss))
    print()

    return optimizer, hist
    
def main():
    tf.random.set_seed(2022)
    np.random.seed(2022)

    window_size = 50
    X_train, X_test, y_train, y_test = load_data(10000, window_size)

    rnn_model = build_rnn_model(window_size)
    run_model(rnn_model, X_train, X_test, y_train, y_test, name="RNN")

    deep_rnn_model = build_deep_rnn_model(window_size)
    run_model(deep_rnn_model, X_train, X_test, y_train, y_test, name="Deep RNN")


if __name__ == "__main__":
    main()

 

 

 

 

[ SimpleRNN을 사용하여 Encoder-Decoder 구조를 가지는 모델 구현 ]

  • Encoder에서 나오는 출력값은 사용하지 않고, Encoder의 hidden state만 가져와서 Decoder의 초기 hidden state로 활용
  • __call__ 메소드 :  실제 입력값이 주어졌을 때 모델의 연산을 수행하는 함수
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras import layers, Sequential, Input


class EncoderDecoder(Model):
    def __init__(self, hidden_dim, encoder_input_shape, decoder_input_shape, num_classes):
        super(EncoderDecoder, self).__init__()
        
        # SimpleRNN으로 이루어진 Encoder
        self.encoder = layers.SimpleRNN(units=hidden_dim,return_state=True,input_shape=encoder_input_shape)
        # return state: hidden state를 사용하기 위한 파라미터
                                        
        # SimpleRNN으로 이루어진 Decoder
        self.decoder = layers.SimpleRNN(units=hidden_dim,return_sequences=True,input_shape=decoder_input_shape)
        # decoder도 input shape 지정할 필요 있음
        
        self.dense = layers.Dense(num_classes, activation="softmax")
        
    def call(self, encoder_inputs, decoder_inputs):
        # Encoder에 입력값을 넣어 Decoder의 초기 state로 사용할 hidden state 반환
        encoder_outputs, encoder_state = self.encoder(encoder_inputs)
        
        # Decoder에 입력값을 넣고, 초기 state는 Encoder에서 얻어낸 state(hidden state)로 설정
        decoder_outputs = self.decoder(decoder_inputs, initial_state = [encoder_state])
        
        outputs = self.dense(decoder_outputs)
        
        return outputs


def main():
    # hidden state의 크기
    hidden_dim = 20
    
    # Encoder에 들어갈 각 데이터의 모양
    encoder_input_shape = (10, 1) # encoder에 들어갈 각 시점의 데이터 - sequence 길이 :10, 각 시점의 데이터 길이 : 1
    
    # Decoder에 들어갈 각 데이터의 모양
    decoder_input_shape = (30, 1)
    
    # 분류한 클래스 개수
    num_classes = 5

    model = EncoderDecoder(hidden_dim, encoder_input_shape, decoder_input_shape, num_classes)
    
    # 모델에 넣어줄 가상의 데이터 생성
    encoder_x, decoder_x = tf.random.uniform(shape=encoder_input_shape), tf.random.uniform(shape=decoder_input_shape)
    encoder_x, decoder_x = tf.expand_dims(encoder_x, axis=0), tf.expand_dims(decoder_x, axis=0)
    y = model(encoder_x, decoder_x)

    # 모델 정보 출력
    model.summary()

if __name__ == "__main__":
    main()

 

[ 코드 실행 결과 ] 

 

Comments