저번 포스팅에서는 SGD를 활용하여 모델 기반 협업 필터링에 대하여 이론적인 부분에 대하여 다루었습니다.
저번 포스팅의 링크는 아래와 같습니다.
[추천시스템] (잠재요인) 모델 기반 협업 필터링(1) - 경사하강법(SGD)
저번에는 추천시스템의 이웃 기반 협업필터링에 관하여 다루었습니다.
my-develop-note.tistory.com
참고한 자료의 링크는 아래와 같습니다.
- https://yamalab.tistory.com/89
- https://yamalab.tistory.com/92
- https://lsjsj92.tistory.com/564
- https://www.kaggle.com/code/chocozzz/00-sgd-1
- https://yeong-jin-data-blog.tistory.com/entry/%EC%B6%94%EC%B2%9C-%EC%95%8C%EA%B3%A0%EB%A6%AC%EC%A6%98-Matrix-Factorization
확률적 경사하강법(SGD) 코드
import numpy as np
class SGD_CF():
def __init__(self, R, k, learning_rate, reg_param, epochs, verbose=False):
self._R = R
self._num_users, self._num_items = R.shape
self._k = k
self._learning_rate = learning_rate
self._reg_param = reg_param
self._epochs = epochs
self._verbose = verbose
def fit(self):
# init latent features
self._U = np.random.normal(size=(self._num_users, self._k))
self._V = np.random.normal(size=(self._num_items, self._k))
# init biases
self._O = np.zeros(self._num_users)
self._P = np.zeros(self._num_items)
self._b = np.mean(self._R[np.where(self._R != 0)])
# train while epochs
self._training_process = []
for epoch in range(self._epochs):
#rating에 존재하는 index를 기준으로 학습(training)
xi, yi = self._R.nonzero()
for i , j in zip(xi, yi):
self.gradient_descent(i, j, self._R[i, j])
cost = self._cost()
self._training_process.append((epoch, cost))
# print status
if self._verbose == True and ((epoch + 1) % 10 == 0):
print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost))
def gradient_descent(self, i, j, rating):
# get error
prediction = self.get_prediction(i, j)
error = rating - prediction
# update biases
self._O[i] += self._learning_rate * (error - self._reg_param * self._O[i])
self._P[j] += self._learning_rate * (error - self._reg_param * self._P[j])
# update latent feature
du, dv = self.gradient(error, i, j)
self._U[i, :] += self._learning_rate * du
self._V[j, :] += self._learning_rate * dv
def get_prediction(self, i, j):
return self._b + self._O[i] + self._P[j] + self._U[i, :].dot(self._V[j, :].T)
def gradient(self, error, i, j):
du = (error * self._V[j, :]) - (self._reg_param * self._U[i, :])
dv = (error * self._U[i, :]) - (self._reg_param * self._V[j, :])
return du, dv
def _cost(self):
# xi, yi: R[xi, yi]는 nonzero인 value를 의미한다.
# 참고: http://codepractice.tistory.com/90
xi, yi = self._R.nonzero()
predicted = self.get_complete_matrix()
cost = 0
for x, y in zip(xi, yi):
cost += pow(self._R[x, y] - predicted[x, y], 2)
return np.sqrt(cost / len(xi))
def get_complete_matrix(self):
return self._b + self._O[:, np.newaxis] + self._P[np.newaxis:, ] + self._U.dot(self._V.T)
def print_results(self):
"""
print fit results
"""
print("User Latent U:")
print(self._U)
print("Item Latent V:")
print(self._V.T)
print("U x V:")
print(self._U.dot(self._V.T))
print("bias:")
print(self._b)
print("User Latent bias:")
print(self._O)
print("Item Latent bias:")
print(self._P)
print("Final R matrix:")
print(self.get_complete_matrix())
print("Final RMSE:")
print(self._training_process[self._epochs-1][1])
생성자(__init__)
def __init__(self, R, k, learning_rate, reg_param, epochs, verbose=False):
self._R = R
self._num_users, self._num_items = R.shape
self._k = k
self._learning_rate = learning_rate
self._reg_param = reg_param
self._epochs = epochs
self._verbose = verbose
생성자(__init__)함수에서는 SGD_CF 클래스가 인스턴스화 될 때 실행되는 부분입니다.
- self._R : 평점 행렬
- self._num_users : 사용자의 수
- self._num_items : 아이템의 수
- self._k : User, Item Latent 차원 수
- self._learning_rate : 학습률
- self._reg_param : 정규화 계수
- self._epochs : 전체 학습 횟수
- self._verbose : 학습 과정 출력 여부(True : 출력, False : 출력하지 않음)
위의 변수들이 채워지며 클래스내부에서 재사용됩니다.
fit
def fit(self):
# init latent features
self._U = np.random.normal(size=(self._num_users, self._k))
self._V = np.random.normal(size=(self._num_items, self._k))
# init biases
self._O = np.zeros(self._num_users)
self._P = np.zeros(self._num_items)
self._b = np.mean(self._R[np.where(self._R != 0)])
# train while epochs
self._training_process = []
for epoch in range(self._epochs):
#rating에 존재하는 index를 기준으로 학습(training)
xi, yi = self._R.nonzero()
for i , j in zip(xi, yi):
self._gradient_descent(i, j, self._R[i, j])
cost = self.cost()
self._training_process.append((epoch, cost))
# print status
if self._verbose == True and ((epoch + 1) % 10 == 0):
print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost))
fit()함수는 SGD를 이용하여 Latent Matrix를 초기화하고 경사하강법을 이용하여 학습하는 함수입니다.
Latent Vector 초기화
# init latent features
self._U = np.random.normal(size=(self._num_users, self._k))
self._V = np.random.normal(size=(self._num_items, self._k))
self._U, self._V는 잠재벡터를 의미합니다.
수식에서는
$$\hat{R} \approx UV^{T}$$
에서의 $U$, $V$를 의미합니다.
np.random.normal()함수를 사용하여 self_U는 (self._num_users, self._k)크기만큼, self_V는 (self._num_items, self._k)크기 만큼 각 행렬이 정규분포 형태로 초기화됩니다.
bias 초기화
# init biases
self._O = np.zeros(self._num_users)
self._P = np.zeros(self._num_items)
self._b = np.mean(self._R[np.where(self._R != 0)])
self._O, self._P는 사용자, 아이템 bias를 의미합니다. bias로 인하여 사용자의 평가 경향을 고려하여 추천할 수 있습니다.
np.zeros()함수를 사용하여 self._O는 self._num_users 만큼, self._P는 self._num_items 만큼 0의 값을 가지는 벡터를 생성해줍니다.
self.b는 평점행렬(self._R)에서 0이 아닌 값들을 찾아 전체 평점 평균을 계산한 값입니다.
수식에서는
빨간색 테두리의 각 변수를 초기화하는 파트입니다.
학습(train)
# train while epochs
self._training_process = []
for epoch in range(self._epochs):
#rating에 존재하는 index를 기준으로 학습(training)
xi, yi = self._R.nonzero()
for i , j in zip(xi, yi):
self._gradient_descent(i, j, self._R[i, j])
cost = self.cost()
self._training_process.append((epoch, cost))
# print status
if self._verbose == True and ((epoch + 1) % 10 == 0):
print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost))
학습(train)과정에서는 주어진 self._epochs 만큼 잠재벡터, bias를 학습합니다.
self._training_process 리스트는 (epoch, cost)를 저장합니다. 해당 리스트를 활용하여 최종 RMSE를 출력할 수 있습니다.
가장 먼저 xi, yi를 self._R.nonzero()로 평점행렬(self._R)에서 0이 아닌 부분에 대해서만 값을 추출하게 됩니다.
- 0이 아닌 부분은 사용자가 평점을 부여한 부분이라고 할 수 있습니다.
0이 아닌 부분에 대해서 경사하강법을 수행하게됩니다.
그리고 cost(비용함수)를 계산하고 self._training_process 리스트에 저장합니다.
그리고 verbose가 True라면 학습과정을 출력하고 False라면 출력하지 않습니다.
_gradient_descent
def gradient_descent(self, i, j, rating):
# get error
prediction = self.get_prediction(i, j)
error = rating - prediction
# update biases
self._O[i] += self._learning_rate * (error - self._reg_param * self._O[i])
self._P[j] += self._learning_rate * (error - self._reg_param * self._P[j])
# update latent feature
du, dv = self.gradient(error, i, j)
self._U[i, :] += self._learning_rate * du
self._V[j, :] += self._learning_rate * dv
self_gradient_descent는 경사하강법을 수행하는 함수입니다.
_get_prediction
self._get_prediction()함수로 $\hat{r_{ij}}$를 구하게 됩니다. 수식으로는 아래의 빨간색 테두리부분을 구하게 됩니다.
# get error
prediction = self._get_prediction(i, j)
error = rating - prediction
rating($r_{ij}$)에서 위에서 구한 예측값($\hat{r_{ij}}$)을 뺀값 error($e_{ij}$)를 구합니다. 파란색 테두리 부분에 해당됩니다.
update biases
다음은 bias를 업데이트합니다.
# update biases
self._O[i] += self._learning_rate * (error - self._reg_param * self._O[i])
self._P[j] += self._learning_rate * (error - self._reg_param * self._P[j])
여기서 self._learning_rate는 $\alpha$에 해당하고 self._reg_param은 $\beta$에 해당합니다.
Update Latent Feature
그리고 Latent Vector를 업데이트합니다.
# update latent feature
du, dv = self._gradient(error, i, j)
self._U[i, :] += self._learning_rate * du
self._V[j, :] += self._learning_rate * dv
self._gradient()함수로 du, dv를 구합니다.
여기서 du, dv는 u와 v에 대해 편미분한 값에 해당합니다.
self._gradient() 함수의 코드는 아래와 같습니다.
def _gradient(self, error, i, j):
du = (error * self._V[j, :]) - (self._reg_param * self._U[i, :])
dv = (error * self._U[i, :]) - (self._reg_param * self._V[j, :])
수식에서는 빨간색 테두리(괄호 안쪽)에 해당합니다.
나머지 코드를 확인해보면 self._learning_rate는 $\alpha$에 해당하고 사용자, 아이템 잠재 벡터를 업데이트하고 있습니다.
- 사용자 잠재 벡터 : self._U[i,:]($u_{i}$)
- 아이템 잠재 벡터 : self._V[j,:]($v_{j}$)
_cost(비용함수)
def _cost(self):
xi, yi = self._R.nonzero()
predicted = self._get_complete_matrix()
cost = 0
for x, y in zip(xi, yi):
cost += pow(self._R[x, y] - predicted[x, y], 2)
return np.sqrt(cost / len(xi))
비용함수(cost)는 실제값과 모델이 예측한 값의 차이로 여기서는 rmse로 구현하였습니다.
rsme값이 적을수록 실제값과의 차이가 작다는 의미이고 성능이 좋다고 할 수 있습니다.
식은 아래와 같습니다.
self._R.nonzero()로 실제값에서 0이 아닌 값의 index를 추출합니다.
self._get_complete_matrix()로 최종 Matrix(모델이 예측한 값)를 구합니다.
반복문을 돌면서 실제값과 모델이 예측한 값의 차이를 식에 맞게 계산하고 반환합니다.
_get_complete_matrix
def _get_complete_matrix(self):
return self._b + self._O[:, np.newaxis] + self._P[np.newaxis:, ] + self._U.dot(self._V.T)
위의 식에 해당하고 최종 Matrix를 계산하는 함수입니다.
self._O와 self.P에 np.newaxis가 사용되는 것을 볼 수 있는데 이것은 1차원인 Latent들로 2차원의 R에 행/열 단위 연산을 해주기 위해 차원을 추가하는 역할을 합니다.
np.newaxis의 설명은 링크를 참고해주시면 감사하겠습니다.
'추천시스템' 카테고리의 다른 글
[추천시스템] (잠재요인)모델 기반 협업 필터링(4) - NMF (1) | 2023.01.17 |
---|---|
[추천시스템] (잠재요인)모델 기반 협업 필터링(3) - 특이값 분해(SVD) (0) | 2023.01.13 |
[추천시스템] (잠재요인) 모델 기반 협업 필터링(1) - 경사하강법(SGD) (0) | 2023.01.10 |
[추천시스템] 이웃 기반 협업 필터링(2) - 아이템 기반 (0) | 2023.01.03 |
[추천시스템] 카카오 Mini Reco 기출문제 회고 (2) | 2023.01.01 |