보안_기타
시계열 데이터로 훈련시킨 LSTM모델에 대한 FGSM 공격
정지홍
2024. 9. 14. 06:25
필요한 것들을 임포트 합니다.
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
우선 필요한 시계열 데이터를 생성한다.
함수들....date_range는 해당되는 날짜 범위를 생성한다.
np.linspace( start , end , length ) start에서 end까지의 값의 length개의 값을 생성한다.
np.random.normal에서 scale은 표준 편차을 의미.size는 갯수. 즉, 평균은 0 , std=0.1 , size=100*24개의 데이터를 만들 예정
# 1. 연습용 시계열 데이터 생성한다.
data = {
'date': pd.date_range(start='2023-01-01', periods=100*24, freq='H'), # 2023년 1월 1일부터 100일간의 날짜 생성
'value': np.linspace(0, 10, 100*24) + np.random.normal(scale=0.1, size=100*24) # sin 함수에 노이즈를 더한 값 생성
}
# 2. 데이터프레임으로 변환
df = pd.DataFrame(data)
# 3. date컬럼을 pandas의 datetime형식으로 변환
df['date'] = pd.to_datetime(df['date'])
# 4. date컬럼을 인덱스로 설정 (LSTM 모델에서는 인덱스를 사용하지 않음, 하지만 시계열 데이터를 명확히 하기 위해)
df.set_index('date', inplace=True)


정규화를 시킬것이다. lstm은 정규화된 데이터에서 좀 더 정확한 예측을 한다.
# 5. 시계열 데이터를 정규화 (0과 1 사이 값으로 변환)
# df.['value'].values.reshape은 pandas datetime에서 value series만 추출해서 numpy배열로 바꾼다. 그리고 reshape으로 -1,1을 주었는데 -1은행을 자동으로 결정하라는 의미이다. 열은 1로 설정했다.
# 위의 과정으로 (100*24, )을 (100*24 , 1)의 2차원으로 변형함.
# 하는 이유는 minmaxscaler는 2차원을 줘야하기 때문
scaler = MinMaxScaler(feature_range=(0, 1))
df['value'] = scaler.fit_transform(df['value'].values.reshape(-1, 1))
# 위의 결과로 데이터 프레임의 value는 1차원에서 2차원으로 변형후에 minMaxScaler를 적용시킨후에 datetime frame에 적용시킴.

위의 프레임을 lstm모델 input을 위해서 변환시킬것이다.
그러기 위해서 우선 x_sequences를 생성하고, 이 시점의 t+1의 첫번쨰 값을 y_sequences들에 넣어줄거다.
# 6. 시계열 데이터를 LSTM 모델의 입력을 위해서 변환 시키는 함수이다.
def create_sequences(data, seq_length):
sequences = [] # 입력 데이터 시퀀스
labels = [] # 시퀀스에 해당하는 실제 값 (정답)
# 주어진 길이(seq_length)만큼의 시퀀스를 생성하고, 그 다음 값을 label로 설정
for i in range(len(data) - seq_length):
sequences.append(data[i:i + seq_length])
labels.append(data[i + seq_length])
return np.array(sequences), np.array(labels)
# 7. 시퀀스 길이 설정 (과거 10일의 데이터를 보고 다음 값을 예측)
seq_length = 15
X, y = create_sequences(df['value'].values, seq_length)

입력은 3차원을 줘야하니 3차원으로 변환시킬 것이다.
# (samples, time_steps, features) 형식으로 맞추자
X = X.reshape(X.shape[0], X.shape[1], 1)

# 9. 학습 데이터와 테스트 데이터 분리 (80%는 학습, 20%는 테스트)
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:] # 학습용 데이터
y_train, y_test = y[:train_size], y[train_size:] # 테스트용 데이터

lstm 모델 쌓고 학습까지...
model = Sequential()
# 첫 번째 LSTM 레이어 (return_sequences=True로 LSTM 출력을 다음 LSTM 레이어에 전달)
model.add(LSTM(128, return_sequences=True, input_shape=(seq_length, 1)))
model.add(Dropout(0.2)) # 과적합 방지를 위한 Dropout
# 두 번째 LSTM 레이어
model.add(LSTM(64, return_sequences=False))
model.add(Dropout(0.2))
# 출력 레이어 (예측값을 출력)
model.add(Dense(32, activation='relu')) # 중간에 Dense 레이어 추가
model.add(Dense(1)) # 최종 출력은 1개의 예측값
# 모델 컴파일 (Adam 옵티마이저와 MSE 손실 함수 사용)
model.compile(optimizer='adam', loss='mean_squared_error')
# 모델 학습
history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_test, y_test))
# 예측 수행
predictions = model.predict(X_test)
# 예측값과 실제값의 스케일을 원래 값으로 되돌림
predictions_rescaled = scaler.inverse_transform(predictions)
y_test_rescaled = scaler.inverse_transform(y_test.reshape(-1, 1))
# RMSE 계산
rmse = np.sqrt(mean_squared_error(y_test_rescaled, predictions_rescaled))
print(f"RMSE: {rmse}")

FGSM 함수를 정의한다...
- 기울기(gradient)는 입력 데이터가 모델의 손실(예측과 실제 값 간의 차이)에 얼마나 영향을 미치는지를 나타낸다. 기울기는 주로 모델이 입력 변화에 얼마나 민감한지를 분석할때 사용.
- 기울기가 크다: 입력 데이터의 작은 변화가 손실에 큰 영향을 미친다는 의미
- 기울기가 작다: 입력 데이터의 변화가 손실에 거의 영향을 미치지 않는다는 의미
- 기울기가 양수다: 입력 데이터의 값이 증가되면, 손실 값이 더 커진다는 의미이다. 이는 모델의 예측값이 실제값에 더 멀어짐을 의미.
- 만약 기울기가 양수라면, 입력 데이터의 값을 증가시키면 손실이 커지는 방향으로 기울어진다. 이는 입력 데이터의 변화가 모델 예측에 부정적인 영향을 미친다는 것을 의미한다.
- 기울기가 음수다: 입력 데이터의 값이 증가되면, 손실값이 줄어든다는 의미이며, 이는 모델위 예측값이 실제값보다 더 가까워짐을 의미.
- 만약 기울기가 음수라면, 입력 데이터의 값을 증가시키면 손실이 줄어드는 방향으로 기울어진다. 이는 입력 데이터의 변화가 모델 예측에 긍정적인 영향을 미친다는 것을 의미한다
- with as는 컨텍스트 관리를 해준다, 즉, context management는 원하는 타이밍에 리소스를 할당하고 해제할때 사용한다.
- convert_to_tensor는 입력 데이터를 텐서로 변환해준다.
- tf.GradientTape은 자동으로 기울기를 계산하는 함수이다.
- tf.GradientTape.watch()는 GradientTape이 특정 텐서에 대해서 기울기를 추적할 수 있도록 해준다.
- 즉, with tf.GradientTape() as tape:에서는 tape이 자동으로 기울기를 계산하는데 이러한 것을 watch가 추적해준다.
- 여기서 watch가 필요한 이유....
- 학습가능한 변수는 tf.Variable이 자동으로 추적하지만,(예를 들면 모델의 가중치...)
- 이외의 입력 데이터나 일반 텐서를 추적하지않음. 그래서 입력 데이터같은것에 기울기가 필요한 경우에는 watch를 사용. ( 참고로 텐서는 다차원 배열을 의미. 딥러닝 라이브러리의 데이터 처리의 기본 단위)
- 적대적 예시 뿐만 아니라 신경망 해석 및 설명할때도 사용한다.(입력에 대한 민감도 분석 or 특정 입력값이 예측에 미치는 영향 확인)
- 여기서 watch가 필요한 이유....
- tape.gradient( loss , input_data )는 손실에 대한 입력 데이터의 기울기를 계산한다. 이는 역전파 과정을 통해서 input_data에 대해 손실 함수가 얼마나 민감히 반응하는지 알려줌.
- 즉, input_data가 loss에 얼마나 영향을 받는지...

# 입력 데이터에 대한 기울기 계산. 즉, 모델의 입력값에 대해서 기울기를 계산하고, 이 기울기로 공격을 수행할거임
def create_adversarial_pattern(model, input_data, true_label):
# MSE 손실 함수 객체 생성. 손실 함수는 예측값과 실제값 사이의 차이를 제곱해 평균을 낸 값을 계산함
loss_object = tf.keras.losses.MeanSquaredError()
with tf.GradientTape() as tape:
# 입력 데이터에 대한 기울기를 추적하기 위해 해당 입력 데이터를 감시(watch)함.
tape.watch(input_data)
prediction = model(input_data) # 모델에 입력 데이터를 넣고 예측값(prediction)을 얻음.
loss = loss_object(true_label, prediction) # 모델의 예측값과 실제 레이블(true_label) 간의 손실(loss)을 계산. 여기서는 MSE 손실 함수를 사용해 예측값과 실제값의 차이를 측정함.
# 손실에 대한 입력 데이터의 기울기 계산
gradient = tape.gradient(loss, input_data)
return gradient
# FGSM 공격 함수 정의
def fgsm_attack(data, epsilon, gradient):
sign_gradient = tf.sign(gradient) # 기울기의 부호를 계산
perturbed_data = data + epsilon * sign_gradient # 원래 데이터에 노이즈를 더해 공격 수행
return perturbed_data
epsilon = 0.1 # 공격 강도
X_test_tensor = tf.convert_to_tensor(X_test)
# create_adversarial_pattern을 이용해서 입력데이터에 대해서 기울기를 계산합니다.
gradients = create_adversarial_pattern(model, X_test_tensor, y_test)

X_test_adversarial = fgsm_attack(X_test_tensor, epsilon, gradients)

# 적대적 데이터로 모델 예측 수행
predictions_adversarial = model.predict(X_test_adversarial)
# 적대적 데이터에 대한 RMSE 계산
predictions_adversarial_rescaled = scaler.inverse_transform(predictions_adversarial)
rmse_adversarial = np.sqrt(mean_squared_error(y_test_rescaled, predictions_adversarial_rescaled))
print(f"Adversarial RMSE: {rmse_adversarial}")

import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
# 실제값
plt.plot(y_test_rescaled, label='True Values', color='black')
# 원래 예측값
plt.plot(predictions_rescaled, label='Original Predictions', color='blue')
# 적대적 공격 후 예측값
plt.plot(predictions_adversarial_rescaled, label='Adversarial Predictions', color='red')
plt.legend()
plt.title(f'LSTM Predictions vs Adversarial Predictions (FGSM)\nRMSE: {rmse:.3f}, Adversarial RMSE: {rmse_adversarial:.3f}')
plt.show()
