Código fuente para datadis_python.client.datadis_client

"""
Cliente actualizado para la API de Datadis (versión corregida).

Este módulo proporciona un cliente para interactuar con la API de Datadis.
"""

import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import requests

from ..exceptions import APIError, AuthenticationError, DatadisError
from ..models import (
    ConsumptionData,
    ConsumptionResponse,
    ContractData,
    ContractResponse,
    DistributorsResponse,
    MaxPowerData,
    MaxPowerResponse,
    SuppliesResponse,
    SupplyData,
)
from ..utils.constants import (
    API_ENDPOINTS,
    DATADIS_API_BASE,
    DATADIS_BASE_URL,
    DEFAULT_TIMEOUT,
    MAX_RETRIES,
)
from ..utils.validators import (
    validate_date_range,
    validate_distributor_code,
    validate_measurement_type,
    validate_point_type,
)


[documentos] class DatadisClient: """ Cliente actualizado para interactuar con la API de Datadis. :param username: NIF del usuario registrado en Datadis. :type username: str :param password: Contraseña de acceso a Datadis. :type password: str :param timeout: Timeout para requests en segundos. :type timeout: int :param retries: Número de reintentos automáticos. :type retries: int """
[documentos] def __init__( self, username: str, password: str, timeout: int = DEFAULT_TIMEOUT, retries: int = MAX_RETRIES, ): """ Inicializa el cliente. :param username: NIF del usuario registrado en Datadis. :param password: Contraseña de acceso a Datadis. :param timeout: Timeout para requests en segundos. :param retries: Número de reintentos automáticos. """ self.username = username self.password = password self.timeout = timeout self.retries = retries self.base_url = DATADIS_BASE_URL self.api_base = DATADIS_API_BASE self.session = requests.Session() self.token: Optional[str] = None self.token_expiry: Optional[float] = None # Headers por defecto self.session.headers.update( { "User-Agent": "datadis-python-sdk/0.1.0", "Content-Type": "application/json", "Accept": "application/json", } )
def _make_request( self, method: str, endpoint: str, data: Optional[Dict[str, Any]] = None, params: Optional[Dict[str, Any]] = None, authenticated: bool = True, use_form_data: bool = False, ) -> Union[Dict[str, Any], str]: """ Realiza una petición HTTP a la API. :param method: Método HTTP (GET, POST) :param endpoint: Endpoint de la API :param data: Datos para el body de la petición :param params: Parámetros de query string :param authenticated: Si requiere autenticación :param use_form_data: Si usar form data en lugar de JSON :return: Respuesta JSON de la API o texto plano """ if authenticated: self._ensure_authenticated() # Usar URL base apropiada según el endpoint if endpoint.startswith("/nikola-auth"): url = f"{self.base_url}{endpoint}" else: url = f"{self.api_base}{endpoint}" # Agregar delay entre peticiones para evitar rate limiting # (excepto para autenticación) if not endpoint.startswith("/nikola-auth"): time.sleep(0.5) # 500ms entre peticiones normales # Reintentos automáticos for attempt in range(self.retries + 1): try: # Configurar la petición según el tipo de datos if use_form_data and data: # Para autenticación usar form data con headers específicos headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", "User-Agent": "datadis-python-sdk/0.1.0", } response = requests.request( method=method, url=url, data=data, params=params, headers=headers, timeout=self.timeout, ) else: # Para peticiones normales usar la sesión con JSON response = self.session.request( method=method, url=url, json=data, params=params, timeout=self.timeout, ) # Manejar respuestas de la API if response.status_code == 200: # Para autenticación, la respuesta es texto plano (JWT) if endpoint.startswith("/nikola-auth"): return response.text.strip() # Para otras peticiones, esperamos JSON try: return response.json() except ValueError: # Si no es JSON válido, devolver como texto return response.text elif response.status_code == 401: # Token expirado, intentar renovar self.token = None if authenticated: self._authenticate() continue else: raise AuthenticationError("Credenciales inválidas") elif response.status_code == 429: # Rate limiting - esperar más tiempo progresivamente if attempt < self.retries: wait_time = min(30, (2**attempt) * 2) # Máximo 30 segundos print( f"Rate limit alcanzado. Esperando {wait_time} segundos..." ) time.sleep(wait_time) continue raise APIError( "Límite de peticiones excedido después de varios reintentos", 429, ) else: # Otros errores HTTP error_msg = f"Error HTTP {response.status_code}" try: error_data = response.json() if "message" in error_data: error_msg = error_data["message"] elif "error" in error_data: error_msg = error_data["error"] except ValueError: # Si no es JSON, usar el texto de la respuesta if response.text: error_msg = response.text raise APIError(error_msg, response.status_code) except requests.RequestException as e: if attempt == self.retries: raise DatadisError(f"Error de conexión: {str(e)}") time.sleep(1) # Este punto nunca debería alcanzarse, pero MyPy requiere retorno explícito raise DatadisError("Error inesperado: se agotaron todos los reintentos") def _authenticate(self) -> None: """Autentica con la API y obtiene token de acceso.""" login_data = {"username": self.username, "password": self.password} try: # La API de Datadis requiere form data, no JSON token = self._make_request( "POST", API_ENDPOINTS["login"], data=login_data, authenticated=False, use_form_data=True, ) # La respuesta es directamente el token JWT como texto if isinstance(token, str) and token: self.token = token self.session.headers["Authorization"] = f"Bearer {self.token}" # Asumir que el token expira en 24 horas (valor típico para JWT) self.token_expiry = time.time() + (24 * 3600) else: raise AuthenticationError("No se recibió token válido en la respuesta") except APIError as e: if e.status_code == 401 or e.status_code == 500: raise AuthenticationError("Credenciales inválidas") raise def _ensure_authenticated(self) -> None: """Asegura que el cliente está autenticado con un token válido.""" if not self.token or ( self.token_expiry and time.time() >= self.token_expiry - 300 ): # Renovar 5 min antes self._authenticate()
[documentos] def get_distributors(self) -> List[Dict[str, Any]]: """Obtiene la lista de distribuidores disponibles usando API v1. :return: Lista de distribuidores (raw response de la API) """ response = self._make_request("GET", API_ENDPOINTS["distributors"]) # Devolver la respuesta directa de la API v1 if isinstance(response, list): return response elif isinstance(response, dict): return [response] if response else [] return []
[documentos] def get_supplies(self) -> List[Dict[str, Any]]: """Obtiene la lista de puntos de suministro disponibles usando API v1. :return: Lista de datos de suministros (raw response de la API) """ response = self._make_request("GET", API_ENDPOINTS["supplies"]) # Devolver la respuesta directa de la API v1 if isinstance(response, list): return response elif isinstance(response, dict) and "supplies" in response: return response["supplies"] return []
[documentos] def get_contract_detail(self, cups: str, distributor_code: str) -> Dict[str, Any]: """Obtiene el detalle del contrato para un CUPS específico usando API v1. :param cups: Código CUPS del punto de suministro :param distributor_code: Código del distribuidor :return: Datos del contrato (raw response de la API) """ params = {"cups": cups, "distributorCode": distributor_code} response = self._make_request("GET", API_ENDPOINTS["contracts"], params=params) # Devolver la respuesta directa de la API v1 return response if isinstance(response, dict) else {}
[documentos] def get_consumption( self, cups: str, distributor_code: str, date_from: str, date_to: str, measurement_type: int = 0, point_type: Optional[int] = None, ) -> List[Dict[str, Any]]: """Obtiene datos de consumo para un CUPS y rango de fechas usando API v1. :param cups: Código CUPS del punto de suministro :param distributor_code: Código del distribuidor :param date_from: Fecha inicial (YYYY/MM) :param date_to: Fecha final (YYYY/MM) :param measurement_type: Tipo de medida (0=hora, 1=cuarto hora) :param point_type: Tipo de punto (obtenido de supplies) :return: Lista de datos de consumo (raw response de la API) """ params = { "cups": cups, "distributorCode": distributor_code, "startDate": date_from, "endDate": date_to, "measurementType": str(measurement_type), } if point_type is not None: params["pointType"] = str(point_type) response = self._make_request( "GET", API_ENDPOINTS["consumption"], params=params ) # Devolver la respuesta directa de la API v1 if isinstance(response, list): return response elif isinstance(response, dict) and "timeCurve" in response: return response["timeCurve"] return []
[documentos] def get_max_power( self, cups: str, distributor_code: str, date_from: str, date_to: str ) -> List[Dict[str, Any]]: """Obtiene datos de potencia máxima para un CUPS y rango de fechas usando API v1. :param cups: Código CUPS del punto de suministro :param distributor_code: Código del distribuidor :param date_from: Fecha inicial (YYYY/MM) :param date_to: Fecha final (YYYY/MM) :return: Lista de datos de potencia máxima (raw response de la API) """ params = { "cups": cups, "distributorCode": distributor_code, "startDate": date_from, "endDate": date_to, } response = self._make_request("GET", API_ENDPOINTS["max_power"], params=params) # Devolver la respuesta directa de la API v1 if isinstance(response, list): return response elif isinstance(response, dict) and "maxPower" in response: return response["maxPower"] return []
[documentos] def close(self) -> None: """Cierra la sesión y libera recursos.""" if self.session: self.session.close() self.token = None self.token_expiry = None
[documentos] def __enter__(self): """Context manager entry.""" return self
[documentos] def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.close()