import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
Exemplo MLP com PyTorch
Segue a implementação comentada em PyTorch da MLP para solução do problemas das meias luas, equivalente ao código implementado manualmente no Exercício 5.
Iniciando com a importação das bibliotecas:
# Fixando seeds para poder reproduzir os resultados
111)
np.random.seed(111)
torch.manual_seed(111) torch.cuda.manual_seed(
Definindo a função para gerar dados para treinamento:
# Função para garar dados de treinamento
def meias_luas(NA, NB, r1, r2, r3):
"""
dados = meias_luas(NA,NB,r1,r2,r3)
NA: número de pontos da região A
NB: número de pontos da região B
r1, r2 e r3: dados das meias-luas
"""
# total de dados de treinamento
= NA + NB
Nt
# dados das meia luas
= r1 - r3 / 2
rmin = r1 + r3 / 2
rmax
# Pontos da Região A
= np.pi * np.random.rand(NA, 1)
a = np.random.uniform(rmin, rmax, (NA, 1))
rxy = rxy * np.cos(a)
x1A = rxy * np.sin(a)
x2A = np.ones((NA, 1))
dA = np.hstack((x1A, x2A, dA))
pontosA
# Pontos da Região B
= np.pi * np.random.rand(NB, 1)
a = np.random.uniform(rmin, rmax, (NB, 1))
rxy = rxy * np.cos(a) + r1
x1B = -rxy * np.sin(a) - r2
x2B = -np.ones((NB, 1))
dB = np.hstack((x1B, x2B, dB))
pontosB
# Concatenando e embaralhando os dados
= np.vstack((pontosA, pontosB))
dados
np.random.shuffle(dados)
# Figura para mostrar os dados de treino
= plt.subplots()
fig, ax1 ".b")
ax1.plot(x1A, x2A, ".r")
ax1.plot(x1B, x2B, "x_1")
plt.xlabel("x_2")
plt.ylabel(="x", color="0.5")
plt.grid(axis="y", color="0.5")
plt.grid(axis
return dados
Em seguida, criando os dados para treinamento do modelo:
# Gerando dados de treinamento
# Note o uso do sufixo `_np` para facilitar a identificação
# de arrays do NumPy e não confundi-los com tensores do PyTorch
# número de pontos de treinamento da Região A
= 500
NA
# número de pontos de treinamento da Região B
= 500
NB
# número total de dados de treinamento
= NA + NB
Nt
= 10
r1 = 6
r3 = -4
r2
= meias_luas(NA, NB, r1, r2, r3) dados_treino_np
Ajustando os valores dos hiperparâmetros:
# Ajuste de hiperparâmetros
# passo de adaptação da rede MLP
= 0.5
eta
# Tamanho do mini-batch
= 100
Nb
# Número de épocas
= 10000 Ne
O PyTorch utiliza um elemento chamado de DataLoader
para facilitar o carregamento dos dados, embaralhamento e geração dos mini batches. Ele é criado a partir de um iterador, que contém pares de dados no formato (entrada, saída)
:
= torch.tensor(dados_treino_np, dtype=torch.float32)
dados_treino = [
train_set 0, 1]], dados_treino[i, [2]])
(dados_treino[i, [for i in range(dados_treino.shape[0])
]
= torch.utils.data.DataLoader(train_set, batch_size=Nb, shuffle=True) train_loader
Vale notar alguns detalhes do código anterior:
- Os dados de treinamento são armazenados em um tensor chamado
dados_treino
, criado a partir do array NumPy chamadodados_treino_np
. Por padrão, o PyTorch trabalha com precisão de 32 bits e o NumPy, com precisão de 64 bits. Dessa forma, pensando em trabalhar na precisão numérica padrão do PyTorch, é necessário especificardtype=torch.float32
para criar um tensor com precisão de 32 bits a partir de um array com precisão de 64 bits; - Nesse exemplo, o iterador usado para criar o
DataLoader
é chamado detrain_set
; - Ao criar o DataLoader, é fornecido o tamanho do mini batch por meio do argumento
batch_size
e, nesse caso, oDataLoader
será responsável por embaralhar os dados a cada época, de acordo com o argumentoshuffle
, configurado comoTrue
;
O modelo é definido por meio de uma classe que herda de nn.Module
:
class Model(nn.Module):
# Geralmente, os blocos da rede são definidos no método __init__()
def __init__(self):
# Necessário chamar __init__() da classe mãe
super().__init__()
# Uma das formas de se definir um modelo é a sequencial
self.model = nn.Sequential(
# Entrada com 2 elementos, conectada a 3 neurônios
2, 3),
nn.Linear(# Função de ativação Tanh
nn.Tanh(),
# Saídas de 3 neurônios conectadas a 5 neurônios
3, 5),
nn.Linear(
nn.Tanh(),
5, 5),
nn.Linear(
nn.Tanh(),
5, 2),
nn.Linear(
nn.Tanh(),
2, 1),
nn.Linear(
nn.Tanh(),
)
# O método forward() define como é feito o cálculo progressivo
# para obter a saída da rede, a partir da entrada x.
# Nesse caso, como foi definido um modelo sequencial em
# self.model, basta chamar self.model(x)
def forward(self, x):
= self.model(x)
output return output
Uma das vantagens de se usar o PyTorch para desenvolver aplicações de aprendizado de máquina é a possibilidade do uso de GPUs para acelerar o processo computação de forma simples
Geralmente é criado um objeto chamado device
que aponta para a GPU, caso ela exista ou para a CPU, caso contrário, como mostrado a seguir:
= torch.device("cuda:0" if torch.cuda.is_available() else "cpu") device
Com esse objeto criado, basta chamar o método .to(device=device)
de tensores ou modelos, para enviá-los à GPU, caso ela exista.
Tendo a classe do modelo definida, é necessário instanciar um objeto para representá-lo. Na linha a seguir, o modelo é instanciado e enviado à GPU com o método .to()
, caso ela exista:
= Model().to(device=device) model
Definindo a função custo e o otimizador:
= nn.MSELoss()
loss_function
= torch.optim.SGD(model.parameters(), lr=eta) optimizer
Nesse caso, utiliza-se a função custo do erro quadrático médio MSELoss
e o otimizador baseado no gradiente descendente estocástico SGD
. O PyTorch conta com uma série de outras fuções custo e otimizadores que podem ser utilizados. Para referência, consulte:
- Funções custo: https://pytorch.org/docs/stable/nn.html#loss-functions
- Otimizadores:https://pytorch.org/docs/stable/optim.html#algorithms
Um diferencial do PyTorch é que o treinamento deve ser feito explicitamente, com um loop para as épocas e outro para os mini batches:
# Lista usada para guardar o valor da função custo ao longo das iterações
= []
losses
# Loop das épocas
for epoch in range(Ne):
# Loop dos mini batches - note que é usado o DataLoader para obter
# os sinais de entrada e desejado, X e d
for n, (X, d) in enumerate(train_loader):
# Envia os dados para a GPU, caso ela exista
= X.to(device=device)
X = d.to(device=device)
d
# Coloca o modelo em modo treinamento. Isso não é necessário nesse
# caso, pois não estamos fazendo validação. Mas é interessante manter
# a linha para lembrar desse detalhe
model.train()
# Zera informações de gradientes: por padrão o PyTorch acumula os
# gradientes a cada chamada de loss.backward(). Na maioria dos casos,
# estamos interessados apenas no último valor dos gradientes
model.zero_grad()
# Calcula a saída
= model(X)
y
# Calcula o valor da função custo
= loss_function(y, d)
loss
# Calcula os gradientes
loss.backward()
# Atualiza os pesos do modelo, de acordo com as regras
# do otimizador escolhido
optimizer.step()
# Armazena o valor da função custo
losses.append(loss.item())
# Mostra o valor da função custo a cada 500 épocas
if epoch % 500 == 0 and n == dados_treino.shape[0]//Nb - 1:
print(f"Época: {epoch} Loss: {loss}")
plt.figure()
plt.plot(losses)"Batch")
plt.xlabel("Loss") plt.ylabel(
Época: 0 Loss: 0.7539945840835571
Época: 500 Loss: 0.0003208783164154738
Época: 1000 Loss: 1.0246196325169876e-05
Época: 1500 Loss: 5.569074801314855e-06
Época: 2000 Loss: 3.858567197312368e-06
Época: 2500 Loss: 2.9008722322032554e-06
Época: 3000 Loss: 2.3132092792366166e-06
Época: 3500 Loss: 1.960377858267748e-06
Época: 4000 Loss: 1.72843181189819e-06
Época: 4500 Loss: 1.5078306887517101e-06
Época: 5000 Loss: 1.387525685458968e-06
Época: 5500 Loss: 1.1053163007090916e-06
Época: 6000 Loss: 1.1061830491598812e-06
Época: 6500 Loss: 1.003904003482603e-06
Época: 7000 Loss: 9.989685167965945e-07
Época: 7500 Loss: 8.669725275467499e-07
Época: 8000 Loss: 8.007504561646783e-07
Época: 8500 Loss: 8.064585585998429e-07
Época: 9000 Loss: 7.04885906088748e-07
Época: 9500 Loss: 6.891851853652042e-07
Text(0, 0.5, 'Loss')
Para testar o modelo, geramos dados de teste:
# Dados de teste
= 1000
NAt = 1000
NBt = NAt + NBt
Nteste
= meias_luas(NAt, NBt, r1, r2, r3) dados_teste
Convertemos os arrays do NumPy para tensores do PyTorch e enviamos os dados para a GPU, caso ela exista:
= torch.tensor(dados_teste[:,[0,1]], dtype=torch.float32).to(device=device)
xteste = torch.tensor(dados_teste[:,[2]], dtype=torch.float32).to(device=device) dteste
Calculamos a saída do modelo considerando os dados de teste como entrada e convertemos a saída para um array do NumPy:
= model(xteste)
yteste = yteste.cpu().detach().numpy() yteste_np
Note que, para obter o array do NumPy, é necessário:
- Chamar o método
.cpu()
para trazer de volta os dados da GPU, caso ela exista; - Chamar o método
.detach()
para tirar o tensor do grafo computacional. Isso é necessário para que não sejam calculados os gradientes referentes às operações que eventualmente sejam feitas comyteste
. Na prática, quase sempre que seja necessário converter um tensor PyTorch para um array NumPy, será necessário chamar o método .detach() antes; - Chamar o método
.numpy()
para converter os dados para um array do NumPy.
Também é possível plotar a fronteira de separação, de forma semelhante à utilizada anteriormente. A diferença é a necessidade da conversão dos dados para tensores do PyTorch para utilizar o modelo e a conversão de volta para arrays do NumPy para plotar o gráfico com o Matplotlib:
# Gera a curva de separação das duas regiões
# Dados da curva de separação
= 100
Nsep = np.linspace(-15, 25, Nsep).reshape(-1, 1)
x1S = np.linspace(-10, 15, Nsep).reshape(-1, 1)
x2S
# Gera pontos da grade
= np.meshgrid(x1S, x2S)
xx1S, xx2S = xx1S.reshape(-1, 1)
xx1S = xx2S.reshape(-1, 1)
xx2S
# Gera array x
= len(xx1S)
Ngrid = np.hstack((xx1S, xx2S))
xgrid_np
# Calcula saída para cada ponto da grade
= torch.tensor(xgrid_np, dtype=torch.float32).to(device=device)
xgrid = model(xgrid)
ygrid = torch.sign(ygrid)
ygrid_dec
= ygrid.cpu().detach().numpy()
ygrid_np = ygrid_dec.cpu().detach().numpy()
ygrid_dec_np
= xteste.cpu().detach().numpy()
xteste_np = dteste.cpu().detach().numpy()
dteste_np
# Plota os pontos principais
= plt.subplots()
fig, ax2 for i in range(Nteste):
if dteste_np[i] == 1:
0], xteste_np[i, 1], ".b")
ax2.plot(xteste_np[i, else:
0], xteste_np[i, 1], ".r")
ax2.plot(xteste_np[i,
# Plota pontos da grade com saída 0 (usa transparência alpha)
= np.where(ygrid_dec_np == -1)[0]
l0 0], xgrid_np[l0, 1], "r.", alpha=0.1)
ax2.plot(xgrid_np[l0,
# Plota pontos da grade com saída 1 (usa transparência alpha)
= np.where(ygrid_dec_np == 1)[0]
l1 0], xgrid_np[l1, 1], "b.", alpha=0.1) ax2.plot(xgrid_np[l1,
Calculando a taxa de erros:
= np.sign(yteste_np)
yteste_np_dec
= np.sum(np.absolute(dteste_np - yteste_np_dec)) * 100 / (2 * Nteste)
Taxa_de_erro
print(f"Taxa de erro: {Taxa_de_erro}")
Taxa de erro: 0.05
Validação cruzada hold-out
É interessante usar um conjunto de dados de validação durante o treinamento para observar se não está ocorrendo overfitting do modelo:
= 50
NAv = 150
NBv = meias_luas(NAv, NBv, r1, r2, r3)
dados_val
= torch.tensor(dados_val[:,[0,1]], dtype=torch.float32).to(device=device)
X_val = torch.tensor(dados_val[:,[2]], dtype=torch.float32).to(device=device) d_val
Segue um exemplo da rotina de treinamento considerando a etapa de validação. As diferenças em relação à rotina mostrado anteriormente estão destacadas. Vale notar alguns detalhes:
- A necessidade de colocar o modelo em modo treinamento (train) para atualizar os pesos e inferência (eval) para calcular o valor da função custo de validação;
- Para o cálculo da saída e do valor da função custo, não é necessário calcular gradientes.
# Reiniciando o modelo e otimizador
= Model().to(device=device)
model = torch.optim.SGD(model.parameters(), lr=eta)
optimizer
# Listas para guardar o valor da função custo
# no treinamento e validação ao longo das iterações
= []
losses = []
val_losses
for epoch in range(Ne):
for n, (X, d) in enumerate(train_loader):
= X.to(device=device)
X = d.to(device=device)
d
# Necessário colocar o modelo em modo treinamento
# na etapa de treinamento
model.train()
model.zero_grad()= model(X)
y = loss_function(y, d)
loss
loss.backward()
optimizer.step()
# Validação
# Necessário colocar o modelo em modo de inferência (eval)
# pois algumas camadas têm comportamento diferente para inferência,
# por exemplo, o Dropout.
eval()
model.
# Cálculo da saída e valor da função custo com os dados de validação
# Nesse caso, não é necessário calcular gradientes, por isso é utilizado
# o bloco with torch.no_grad():
with torch.no_grad():
= model(X_val)
y_val = loss_function(y_val, d_val)
val_loss
# Armazena o valor da função custo de treinamento e validação
losses.append(loss.item())
val_losses.append(val_loss.item())
# Mostra os valores da função custo de treinamento e validação
# a cada 500 épocas
if epoch % 500 == 0 and n == dados_treino.shape[0]//Nb - 1:
print(f"Epoch: {epoch} Loss: {loss} Val. Loss: {val_loss}")
plt.figure()
plt.plot(losses)=0.8)
plt.plot(val_losses, alpha"Loss", "Val. Loss"])
plt.legend(["Batch")
plt.xlabel("Loss") plt.ylabel(
Epoch: 0 Loss: 0.4288952946662903 Val. Loss: 0.4373363256454468
Epoch: 500 Loss: 0.1020800769329071 Val. Loss: 0.1234327182173729
Epoch: 1000 Loss: 0.18264535069465637 Val. Loss: 0.36254554986953735
Epoch: 1500 Loss: 0.18082109093666077 Val. Loss: 0.2632644772529602
Epoch: 2000 Loss: 0.2237672060728073 Val. Loss: 0.3758035898208618
Epoch: 2500 Loss: 0.1505267173051834 Val. Loss: 0.07806681096553802
Epoch: 3000 Loss: 0.10975215584039688 Val. Loss: 0.06523850560188293
Epoch: 3500 Loss: 0.12872742116451263 Val. Loss: 0.07306171208620071
Epoch: 4000 Loss: 0.19222404062747955 Val. Loss: 0.08486053347587585
Epoch: 4500 Loss: 0.07514382153749466 Val. Loss: 0.07600489258766174
Epoch: 5000 Loss: 0.10347875207662582 Val. Loss: 0.06272117793560028
Epoch: 5500 Loss: 0.07646853476762772 Val. Loss: 0.06404448300600052
Epoch: 6000 Loss: 0.07575170695781708 Val. Loss: 0.06895849108695984
Epoch: 6500 Loss: 0.09453247487545013 Val. Loss: 0.06083502247929573
Epoch: 7000 Loss: 0.08741973340511322 Val. Loss: 0.07605186849832535
Epoch: 7500 Loss: 0.22517231106758118 Val. Loss: 0.05372380092740059
Epoch: 8000 Loss: 0.12835191190242767 Val. Loss: 0.05745967850089073
Epoch: 8500 Loss: 0.11189435422420502 Val. Loss: 0.0843060091137886
Epoch: 9000 Loss: 0.08424719423055649 Val. Loss: 0.12814004719257355
Epoch: 9500 Loss: 0.07224565744400024 Val. Loss: 0.12872619926929474
Text(0, 0.5, 'Loss')
Nesse caso, note que o valor da função custo utilizando o conjunto de validação aumenta por volta da iteração 80000, o que indica overfitting.
Utilizando outras funções de ativação
O PyTorch disponibiliza uma lista grande de funções de ativação que podem ser utilizadas. Para referência, acesse https://pytorch.org/docs/stable/nn.html#non-linear-activations-weighted-sum-nonlinearity.
Para usar ReLU, por exemplo, utiliza-se a função nn.ReLU
:
class Model(nn.Module):
def __init__(self):
super().__init__()
self.model = nn.Sequential(
2, 3),
nn.Linear(
nn.ReLU(),3, 5),
nn.Linear(
nn.ReLU(),5, 2),
nn.Linear(
nn.ReLU(),2, 1),
nn.Linear(
nn.Tanh(),
)
def forward(self, x):
= self.model(x)
output return output
Outra função muito usada é a sigmoide, nos casos em que é necessário limitar uma saída entre 0 e 1. A implementação é feita com nn.Sigmoid()
:
class Model(nn.Module):
def __init__(self):
super().__init__()
self.model = nn.Sequential(
2, 3),
nn.Linear(
nn.Sigmoid(),3, 5),
nn.Linear(
nn.Sigmoid(),5, 2),
nn.Linear(
nn.Sigmoid(),2, 1),
nn.Linear(
nn.Sigmoid(),
)
def forward(self, x):
= self.model(x)
output return output
Utilizando dropout
Para utilizar Dropout, basta adicionar camadas do tipo nn.Dropout(n)
, em que n
representa a proporção de neurônios que deve ser desativada. No exemplo a seguir é utilizado Dropout de 10% para os neurônios da primeira camada:
class Model(nn.Module):
def __init__(self):
super().__init__()
self.model = nn.Sequential(
2, 3),
nn.Linear(
nn.Tanh(),0.1),
nn.Dropout(3, 5),
nn.Linear(
nn.Tanh(),5, 2),
nn.Linear(
nn.Tanh(),2, 1),
nn.Linear(
nn.Tanh(),
)
def forward(self, x):
= self.model(x)
output return output
Utilizando o otimizador Adam
Para utilizar o Adam, basta criar o otimizador usando torch.optim.Adam
:
= torch.optim.Adam(model.parameters(), lr=eta, betas=(0.9, 0.999)) optimizer
Utilizando a função custo da entropia cruzada
A função custo da entropia cruzada binária é implementada pela classe torch.nn.BCELoss
. Para utilizá-la, basta definir:
= nn.BCELoss() loss_function
No caso de classificação multiclasse, pode-se utilizar a classe torch.nn.CrossEntropyLoss
. Vale notar que essa função custo espera comparar um vetor de \(C\) posições com um número de \(0\) a \(C-1\).
Inicialização de pesos
Uma forma de inicializar os pesos da rede é definir uma função para inicializar os parâmetros de um elemento do modelo e usar o método .apply()
para aplicar essa função à todos os elementos do modelo:
class Model(nn.Module):
def __init__(self):
super().__init__()
self.model = nn.Sequential(
2, 3),
nn.Linear(
nn.Tanh(),3, 5),
nn.Linear(
nn.Tanh(),5, 2),
nn.Linear(
nn.Tanh(),2, 1),
nn.Linear(
nn.Tanh(),
)
def forward(self, x):
= self.model(x)
output return output
= torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device
= Model().to(device=device)
model
def weights_init(m):
= m.__class__.__name__
classname if classname.find('Linear') != -1:
torch.nn.init.xavier_normal_(m.weight)
torch.nn.init.zeros_(m.bias)
apply(weights_init) model.
Model(
(model): Sequential(
(0): Linear(in_features=2, out_features=3, bias=True)
(1): Tanh()
(2): Linear(in_features=3, out_features=5, bias=True)
(3): Tanh()
(4): Linear(in_features=5, out_features=2, bias=True)
(5): Tanh()
(6): Linear(in_features=2, out_features=1, bias=True)
(7): Tanh()
)
)
Nesse exemplo, é usada a inicialização de Xavier, com distribuição Gaussiana (torch.nn.init.xavier_normal_
). Além dela, há diversas outras alternativas, listadas em https://pytorch.org/docs/stable/nn.init.html.