from __future__ import annotations
import numpy as np
from typing import Literal
import warnings
from ..Backend import hardware as HW
[docs]
class Optimizer:
"""
Base class for all optimizers.
Parameters
----------
learning_rate : float, optional
The learning rate for the optimizer.
computational_device : Literal["GPU", "CPU"], optional
The device where computations will be performed.
device_id : int, optional
The ID of the GPU to use if computational_device is "GPU".
"""
def __init__(self, learning_rate: float = None, computational_device:Literal["GPU", "CPU"]=None, device_id: int = None):
self.learning_rate = learning_rate
self.DEVICE_ID = device_id
self.CURRENT_DEVICE = "CPU"
self.COMPUTACIONAL_DEVICE = HW.DEFAULT_COMPUTE_METHOD.split("_")[0]
self.be = HW.be
self._ASNUMPY = HW.asnumpy
if (computational_device != None):
self.COMPUTACIONAL_DEVICE = computational_device
if ((computational_device == "GPU") and not (HW.GPU_ENABLED)):
if (HW.WARNINGS_STRICT_MODE):
raise ValueError("Intentando definir como dispositivo computacional la GPU cuando esta no esta disponible.")
else:
warnings.warn("Intentando definir como dispositivo computacional la GPU cuando esta no esta disponible."+"Cambiando el dispositivo computacional a la CPU ")
self.COMPUTACIONAL_DEVICE = "CPU"
if (self.COMPUTACIONAL_DEVICE == "GPU"):
self.be = HW.cp
self._ASNUMPY = HW.cp.asnumpy
else:
self.be = np
self._ASNUMPY = np.array
self._setup_kernels()
def _refresh_parameters(self, vector_format):
"""
Internal method to refresh internal parameters when changing devices or vector formats.
This method must be implemented by subclasses to ensure that all internal state tensors
(e.g., momentum, velocity) are converted to the correct backend format (NumPy or CuPy)
provided by ``vector_format``.
"""
raise NotImplementedError
def _setup_kernels(self):
"""
Internal method to setup CUDA kernels if needed.
This method should be implemented by subclasses to compile or define any custom CUDA kernels
required for the optimizer when running on a GPU.
"""
pass
def _change_COMPUTACIONAL_DEVICE(self, device:Literal["GPU","CPU"], device_id: int = None):
"""
Internal method to change the computational device (CPU/GPU).
Parameters
----------
device : Literal["GPU", "CPU"]
The new device to use.
device_id : int, optional
The GPU ID to use if device is "GPU".
"""
if not(device in ["GPU","CPU"]):
raise ValueError("Se paso como device algo que no es GPU o CPU.")
if ((device == "GPU") and (not(HW.GPU_ENABLED))):
if (HW.WARNINGS_STRICT_MODE):
raise ValueError("Intentando definir como dispositivo computacional la GPU cuando esta no esta disponible.")
else:
warnings.warn("Intentando definir como dispositivo computacional la GPU cuando esta no esta disponible."+"Cambiando el dispositivo computacional a la CPU ")
device = "CPU"
if ((device == "GPU")and(self.COMPUTACIONAL_DEVICE == "CPU")):
if(HW.WARNINGS_STRICT_MODE):
raise RuntimeError("Se intento cambar a la gpu cuando se tiene como dispositivo computacional la cpu.")
else:
warnings.warn("Se intento cambar a la gpu cuando se tiene como dispositivo computacional la cpu. Se ignoro la peticion por seguridad")
device = "CPU"
if (self.COMPUTACIONAL_DEVICE != device):
self.COMPUTACIONAL_DEVICE = device
if ((device == "GPU")and(HW.GPU_ENABLED)):
self.be = HW.cp
self._ASNUMPY = HW.cp.asnumpy
else:
self.be = np
self._ASNUMPY = np.array
if (device_id != None):
self.DEVICE_ID = device_id
[docs]
def set_gpu_id(self,new_id:int):
"""
Sets the GPU ID for the optimizer.
Parameters
----------
new_id : int
The new GPU ID.
"""
if (new_id >= HW.NUM_GPUS):
raise ValueError("")
if (new_id != self.DEVICE_ID):
self.DEVICE_ID = new_id
if (self.CURRENT_DEVICE == "GPU"):
with HW.be.cuda.Device(self.DEVICE_ID):
self._refresh_parameters(HW.cp.array)
def _to_device(self, device: Literal["GPU", "CPU"]):
"""
Internal method to move optimizer state to a specific device.
Parameters
----------
device : Literal["GPU", "CPU"]
The target device.
"""
if not(device in ["GPU","CPU"]):
raise ValueError("Se paso como device algo que no es GPU o CPU.")
if ((device == "GPU")and(self.COMPUTACIONAL_DEVICE == "CPU")):
if (HW.WARNINGS_STRICT_MODE):
raise ValueError("Se intento cambiar a la GPU cuando se habia definido el dispositivo computacional como CPU")
else:
warnings.warn("Se intento cambiar a la GPU cuando se habia definido el dispositivo computacional como CPU."+"Ingorando peticion por seguridad.")
device = "CPU"
if (device != self.CURRENT_DEVICE):
self.CURRENT_DEVICE = device
if device == "GPU":
with HW.be.cuda.Device(self.DEVICE_ID):
self._refresh_parameters(self.be.array)
else:
self._refresh_parameters(self._ASNUMPY)
[docs]
def step(self, layers: list, inputs):
"""
Performs a single optimization step.
This method must be implemented by subclasses to define the specific optimization logic
(e.g., SGD update, Adam update) applied to the layers.
Parameters
----------
layers : list
List of layers to update.
inputs : Any
Input data (used by some optimizers for gradient calculation context if needed).
"""
raise NotImplementedError
[docs]
def get_state(self):
"""
Returns the internal state of the optimizer.
This method should be implemented by subclasses to return a dictionary containing
the current internal state (e.g., iteration count, moving averages) for serialization.
Returns
-------
dict
Dictionary containing the optimizer state.
"""
self._to_device("CPU")
return {}
[docs]
def set_state(self, state, be):
"""
Sets the internal state of the optimizer.
This method should be implemented by subclasses to restore the internal state
from a provided dictionary.
Parameters
----------
state : dict
The state dictionary to load.
be : module
The backend module (numpy or cupy) to use for creating arrays.
"""
self._to_device("CPU")
pass
[docs]
def get_config(self):
"""
Returns the configuration of the optimizer.
This method should be implemented by subclasses to return a dictionary containing
the configuration parameters necessary to reconstruct the optimizer instance.
Returns
-------
dict
Dictionary containing the configuration parameters.
"""
self._to_device("CPU")
return {'class_name': self.__class__.__name__, 'learning_rate': self.learning_rate}
[docs]
class SgdOptimizer(Optimizer):
"""
Stochastic Gradient Descent (SGD) optimizer.
Parameters
----------
learning_rate : float, optional
The learning rate. Defaults to 0.01.
computational_device : Literal["GPU", "CPU"], optional
The device where computations will be performed.
device_id : int, optional
The ID of the GPU to use if computational_device is "GPU".
"""
_kernel_weights = None
_kernel_bias = None
def __init__(self, learning_rate: float = None,computational_device:Literal["GPU", "CPU"]=None, device_id: int = None):
if (learning_rate is None):
learning_rate = 0.01
super().__init__(learning_rate,computational_device,device_id)
def _refresh_parameters(self, vector_format):
pass
def _setup_kernels(self):
if (HW.GPU_ENABLED):
if (SgdOptimizer._kernel_weights is None):
SgdOptimizer._kernel_weights = HW.cp.ElementwiseKernel(
'T grad, T lr, T mask',
'T param',
'param -= lr * grad * mask',
'sgd_weights_kernel'
)
if (SgdOptimizer._kernel_bias is None):
SgdOptimizer._kernel_bias = HW.cp.ElementwiseKernel(
'T grad, T lr',
'T param',
'param -= lr * grad',
'sgd_bias_kernel'
)
[docs]
def step(self, layers: list, inputs):
"""
Performs a single optimization step using SGD.
Parameters
----------
layers : list
List of layers to update.
inputs : Any
Input data.
"""
self._to_device(self.COMPUTACIONAL_DEVICE)
prev_a = inputs
for layer in layers:
batch_size = layer.delta.shape[1]
grad_b = self.be.mean(layer.delta, axis=1, keepdims=True)
grad_w = self.be.dot(layer.delta, prev_a.T) / batch_size
grad_w_masked = grad_w * layer.connection_mask
if (self.CURRENT_DEVICE == "GPU"):
SgdOptimizer._kernel_weights(grad_w, float(self.learning_rate), layer.connection_mask, layer.weights)
SgdOptimizer._kernel_bias(grad_b, float(self.learning_rate), layer.biases)
else:
grad_w_masked = grad_w * layer.connection_mask
layer.weights -= self.learning_rate * grad_w_masked
layer.biases -= self.learning_rate * grad_b
prev_a = layer.a
[docs]
class AdamOptimizer(Optimizer):
"""
Adam optimizer.
Parameters
----------
learning_rate : float, optional
The learning rate. Defaults to 0.001.
computational_device : Literal["GPU", "CPU"], optional
The device where computations will be performed.
device_id : int, optional
The ID of the GPU to use if computational_device is "GPU".
beta1 : float, optional
The exponential decay rate for the 1st moment estimates. Defaults to 0.9.
beta2 : float, optional
The exponential decay rate for the 2nd moment estimates. Defaults to 0.999.
epsilon : float, optional
A small constant for numerical stability. Defaults to 1e-8.
"""
_fused_kernel = None
def __init__(self, learning_rate: float = None,computational_device:Literal["GPU", "CPU"]=None, device_id: int = None, beta1: float = 0.9, beta2: float = 0.999, epsilon: float = 1e-8):
super().__init__(learning_rate,computational_device,device_id)
self.beta1 = beta1
self.beta2 = beta2
self.epsilon = epsilon
self.t = 0
self.m = None
self.v = None
def _setup_kernels(self):
if ((HW.GPU_ENABLED) and (AdamOptimizer._fused_kernel is None)):
AdamOptimizer._fused_kernel = HW.cp.ElementwiseKernel(
'T grad, T lr, T beta1, T beta2, T eps, T beta1_t, T beta2_t, T mask',
'T param, T m, T v',
'''
T g = grad * mask;
m = beta1 * m + (1.0 - beta1) * g;
v = beta2 * v + (1.0 - beta2) * g * g;
T m_hat = m / (1.0 - beta1_t);
T v_hat = v / (1.0 - beta2_t);
param -= lr * m_hat / (sqrt(v_hat) + eps);
''',
'adam_fused_kernel'
)
def _initialize_state(self, layers: list):
if self.learning_rate is None:
self.learning_rate = 0.001
self.m = []
self.v = []
for layer in layers:
m_w = self.be.zeros_like(layer.weights)
v_w = self.be.zeros_like(layer.weights)
m_b = self.be.zeros_like(layer.biases)
v_b = self.be.zeros_like(layer.biases)
self.m.append({'w': m_w, 'b': m_b})
self.v.append({'w': v_w, 'b': v_b})
def _refresh_parameters(self, vector_format):
if self.m is None or self.v is None:
return
new_m = []
new_v = []
for i in range(len(self.m)):
new_m_layer = {
'w': vector_format(self.m[i]['w']),
'b': vector_format(self.m[i]['b'])
}
new_v_layer = {
'w': vector_format(self.v[i]['w']),
'b': vector_format(self.v[i]['b'])
}
new_m.append(new_m_layer)
new_v.append(new_v_layer)
self.m = new_m
self.v = new_v
[docs]
def step(self, layers: list, inputs):
"""
Performs a single optimization step using Adam.
Parameters
----------
layers : list
List of layers to update.
inputs : Any
Input data.
"""
self._to_device(self.COMPUTACIONAL_DEVICE)
if self.learning_rate is None:
self.learning_rate = 0.001
if self.m is None:
self._initialize_state(layers)
self.t += 1
prev_a = inputs
t_pow_beta1 = self.beta1 ** self.t
t_pow_beta2 = self.beta2 ** self.t
for i, layer in enumerate(layers):
batch_size = layer.delta.shape[1]
grad_b = self.be.mean(layer.delta, axis=1, keepdims=True)
grad_w = self.be.dot(layer.delta, prev_a.T) / batch_size
m_t = self.m[i]
v_t = self.v[i]
if (self.CURRENT_DEVICE == "GPU"):
# Weights
AdamOptimizer._fused_kernel(
grad_w, float(self.learning_rate), float(self.beta1), float(self.beta2), float(self.epsilon),
float(t_pow_beta1), float(t_pow_beta2), layer.connection_mask,
layer.weights, m_t['w'], v_t['w'])
# Biases
AdamOptimizer._fused_kernel(
grad_b, float(self.learning_rate), float(self.beta1), float(self.beta2), float(self.epsilon),
float(t_pow_beta1), float(t_pow_beta2), 1.0,
layer.biases, m_t['b'], v_t['b']
)
else:
grad_w_masked = grad_w * layer.connection_mask
m_t['w'] = self.beta1 * m_t['w'] + (1 - self.beta1) * grad_w_masked
v_t['w'] = self.beta2 * v_t['w'] + (1 - self.beta2) * (grad_w_masked ** 2)
m_w_hat = m_t['w'] / (1 - t_pow_beta1)
v_w_hat = v_t['w'] / (1 - t_pow_beta2)
layer.weights -= self.learning_rate * m_w_hat / (self.be.sqrt(v_w_hat) + self.epsilon)
m_t['b'] = self.beta1 * m_t['b'] + (1 - self.beta1) * grad_b
v_t['b'] = self.beta2 * v_t['b'] + (1 - self.beta2) * (grad_b ** 2)
m_b_hat = m_t['b'] / (1 - t_pow_beta1)
v_b_hat = v_t['b'] / (1 - t_pow_beta2)
layer.biases -= self.learning_rate * m_b_hat / (self.be.sqrt(v_b_hat) + self.epsilon)
prev_a = layer.a
[docs]
def get_state(self):
super().get_state()
if self.m is None:
return {'t': self.t, 'm': None, 'v': None}
m_np = [{'w': self._ASNUMPY(lay['w']), 'b': self._ASNUMPY(lay['b'])} for lay in self.m]
v_np = [{'w': self._ASNUMPY(lay['w']), 'b': self._ASNUMPY(lay['b'])} for lay in self.v]
return {'t': self.t, 'm': m_np, 'v': v_np}
[docs]
def set_state(self, state, be):
super().set_state(state,be)
self.t = state.get('t', 0)
m_data = state.get('m')
v_data = state.get('v')
if m_data is None or v_data is None:
self.m = None
self.v = None
return
self.m = [{'w': be.array(lay['w']), 'b': be.array(lay['b'])} for lay in m_data]
self.v = [{'w': be.array(lay['w']), 'b': be.array(lay['b'])} for lay in v_data]
[docs]
def get_config(self):
config = super().get_config()
config.update({
'beta1': self.beta1,
'beta2': self.beta2,
'epsilon': self.epsilon
})
return config