diff --git a/.gitignore b/.gitignore index c8409f3..3122c83 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ # Local config files *.env .vscode +main.py +frt_prueba.txt # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/README.md b/README.md index a60cd68..622c5c7 100644 --- a/README.md +++ b/README.md @@ -126,20 +126,22 @@ También tiene opción `--help` que muestra la ayuda particular de este sub-coma Usage: enerbitdso usages fetch [OPTIONS] [FRTS]... -╭─ Arguments ────────────────────────────────────────────────────────────────────────────────────────────────────╮ -│ frts [FRTS]... List of frt codes separated by ' ' [default: None] │ -╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ -╭─ Options ──────────────────────────────────────────────────────────────────────────────────────────────────────╮ -│ * --api-base-url TEXT [env var: ENERBIT_API_BASE_URL] [default: None] [required] │ -│ * --api-username TEXT [env var: ENERBIT_API_USERNAME] [default: None] [required] │ -│ * --api-password TEXT [env var: ENERBIT_API_PASSWORD] [default: None] [required] │ -│ --since [%Y-%m-%d|%Y%m%d] [default: (yesterday)] │ -│ --until [%Y-%m-%d|%Y%m%d] [default: (today)] │ -│ --timezone TEXT [default: America/Bogota] │ -│ --out-format [csv|jsonl] Output file format [default: jsonl] │ -│ --frt-file PATH Path file with one frt code per line [default: None] │ -│ --help Show this message and exit. │ -╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ +╭─ Arguments ───────────────────────────────────────────────────────────────────────────────────────────────────────╮ +│ frts [FRTS]... List of frt codes separated by ' ' [default: None] │ +╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ +╭─ Options ─────────────────────────────────────────────────────────────────────────────────────────────────────────╮ +│ * --api-base-url TEXT [env var: ENERBIT_API_BASE_URL] [default: None] [required] │ +│ * --api-username TEXT [env var: ENERBIT_API_USERNAME] [default: None] [required] │ +│ * --api-password TEXT [env var: ENERBIT_API_PASSWORD] [default: None] [required] │ +│ --since [%Y-%m-%d|%Y%m%d] [default: (yesterday)] │ +│ --until [%Y-%m-%d|%Y%m%d] [default: (today)] │ +│ --timezone TEXT [default: America/Bogota] │ +│ --out-format [csv|jsonl] Output file format [default: jsonl] │ +│ --frt-file PATH Path file with one frt code per line [default: None] │ +│ --connection_timeout INTEGER RANGE The timeout used for HTTP connection in seconds[0<=x<=20][default: 10]│ +│ --read_timeout INTEGER RANGE The timeout used for HTTP requests in seconds[60<=x<=120][default: 60]│ +│ --help Show this message and exit. │ +╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ ``` # Librería DSO @@ -150,14 +152,14 @@ Para poder hacer uso de la librería DSO se debe hacer lo siguiente Para ello se debe importar el constructor de la siguiente forma: -```txt +```python from enerbitdso.enerbit import DSOClient ``` La inicialización se debe hacer asi: -```txt -ebconnector = enerbit.DSOClient( +```python +ebconnector = DSOClient( api_base_url="https://dso.enerbit.me/", api_username="usuario_del_DSO", api_password="contraseña_del_DSO", @@ -166,7 +168,7 @@ ebconnector = enerbit.DSOClient( Al tener el objeto ya se pueden realizar consultas de la siguiente forma: -```txt +```python usage_records = ebconnector.fetch_schedule_usage_records_large_interval( frt_code=frt_code, since=since, until=until ) @@ -174,8 +176,114 @@ usage_records = ebconnector.fetch_schedule_usage_records_large_interval( Tambien se puede hacer una consulta de perfiles de la siguiente forma: -```txt +```python schedule_records = ebconnector.fetch_schedule_measurements_records_large_interval( frt_code=frt_code, since=since, until=until ) -``` \ No newline at end of file +``` + +## Configuración del Cliente DSO + +### Parámetros Básicos + +```python +ebconnector = DSOClient( + api_base_url="https://dso.enerbit.me/", + api_username="tu_usuario@empresa.com", + api_password="tu_contraseña" +) +``` + +### Configuración Avanzada con Timeouts + +Para mejorar la estabilidad en consultas masivas, especialmente cuando se procesan muchas fronteras, se recomienda configurar timeouts personalizados: + +```python +ebconnector = DSOClient( + api_base_url="https://dso.enerbit.me/", + api_username="tu_usuario@empresa.com", + api_password="tu_contraseña", + connection_timeout=20, # Timeout de conexión en segundos (1-60) + read_timeout=120 # Timeout de lectura en segundos (60-300) +) +``` + +### Parámetros de Timeout + +- **connection_timeout**: Tiempo máximo para establecer conexión con el servidor (recomendado: 10-30 segundos) +- **read_timeout**: Tiempo máximo para recibir respuesta del servidor (recomendado: 60-180 segundos) + +### Configuración con Variables de Entorno + +Una práctica recomendada es usar variables de entorno para las credenciales: + +```python +import os + +ebconnector = DSOClient( + api_base_url=os.getenv("DSO_HOST", "https://dso.enerbit.me/"), + api_username=os.getenv("DSO_USERNAME"), + api_password=os.getenv("DSO_PASSWORD"), + connection_timeout=20, + read_timeout=120 +) +``` + +Configurar las variables de entorno: + +**Linux/macOS:** +```bash +export DSO_HOST="https://dso.enerbit.me/" +export DSO_USERNAME="tu_usuario@empresa.com" +export DSO_PASSWORD="tu_contraseña" +``` + +**Windows:** +```cmd +set DSO_HOST=https://dso.enerbit.me/ +set DSO_USERNAME=tu_usuario@empresa.com +set DSO_PASSWORD=tu_contraseña +``` + +# Ejemplo de Uso Masivo + +## Archivo `example.py` + +El repositorio incluye un archivo `example.py` que demuestra cómo procesar múltiples fronteras de manera eficiente usando concurrencia. Este ejemplo es útil para: + +- **Procesamiento masivo de fronteras**: Consulta múltiples fronteras en paralelo +- **Manejo de errores**: Implementa reintentos automáticos y reportes de fronteras fallidas +- **Generación de reportes**: Crea archivos Excel con matrices horarias de consumo +- **Monitoreo de progreso**: Muestra el avance del procesamiento cada 500 fronteras + +### Características del ejemplo: + +- 🚀 **Concurrencia**: Usa ThreadPoolExecutor para procesar múltiples fronteras simultáneamente +- 🔄 **Reintentos**: Implementa backoff exponencial para manejar errores de red +- 📊 **Progreso visual**: Muestra estadísticas de avance durante la ejecución +- 📈 **Salida estructurada**: Genera matrices Excel organizadas por hora, día, mes y año +- ⚠️ **Manejo de errores**: Reporta fronteras fallidas para análisis posterior + +### Uso del ejemplo: + +1. **Configurar variables de entorno** (como se describe arriba) +2. **Crear archivo de fronteras**: `frt_prueba.txt` con una frontera por línea +3. **Ejecutar el script**: + ```bash + python example.py + ``` + +### Archivos generados: + +- `Matrices_YYYYMMDD_HHMM.xlsx` - Datos principales organizados por matrices horarias +- `fronteras_fallidas_YYYYMMDD_HHMM.txt` - Lista de fronteras que no se pudieron procesar + +### Configuración recomendada para producción: + +Para ambientes de producción o consultas masivas, ajusta estos parámetros en el ejemplo: + +- **max_workers**: Reduce de 30 a 5-10 para evitar saturar el servidor +- **Timeouts**: Usa connection_timeout=20 y read_timeout=120 como mínimo +- **Intervalos de tiempo**: Limita los rangos de fechas para consultas más eficientes + +El archivo `example.py` sirve como base para desarrollar tus propios scripts de procesamiento masivo de datos de enerBit DSO. diff --git a/example.py b/example.py new file mode 100644 index 0000000..3b5688e --- /dev/null +++ b/example.py @@ -0,0 +1,138 @@ +from enerbitdso.enerbit import DSOClient +from datetime import datetime as dt +import pandas as pd +import pytz +import os +from concurrent.futures import ThreadPoolExecutor, as_completed +import time +import random + +colombia_tz = pytz.timezone('America/Bogota') + +ebconnector = DSOClient( + api_base_url=os.getenv("DSO_HOST"), + api_username=os.getenv("DSO_USERNAME"), + api_password=os.getenv("DSO_PASSWORD"), + connection_timeout=20, + read_timeout=120 +) +since = dt.strptime("2025-09-04T00:00-05:00", "%Y-%m-%dT%H:%M%z") +until = dt.strptime("2025-09-08T00:00-05:00", "%Y-%m-%dT%H:%M%z") + +with open("frt_prueba.txt", "r") as f1: + frontiers = [line.strip() for line in f1 if line.strip()] + +usage_records_dict = [] +fronteras_fallidas = [] + +print("Generando archivo...") + +def fetch_usage_records(frontier, max_retries=3): + for attempt in range(max_retries): + try: + usage_records = ebconnector.fetch_schedule_usage_records_large_interval( + frt_code=frontier, since=since, until=until + ) + + if not usage_records: + print(f"[INFO] No se encontraron datos para la frontera {frontier}.") + return [] + + return [{ + "Frontera": usage_record.frt_code if usage_record.frt_code is not None else "SIN_FRONTERA", + "Serial": usage_record.meter_serial, + "time_start": str(usage_record.time_start.astimezone(colombia_tz).strftime('%Y-%m-%d %H:%M:%S%z')), + "time_end": str(usage_record.time_end.astimezone(colombia_tz).strftime('%Y-%m-%d %H:%M:%S%z')), + "kWhD": usage_record.active_energy_imported, + "kWhR": usage_record.active_energy_exported, + "kVarhD": usage_record.reactive_energy_imported, + "kVarhR": usage_record.reactive_energy_exported + } for usage_record in usage_records] + + except Exception as e: + if attempt < max_retries - 1: + # Backoff exponencial con jitter + wait_time = (2 ** attempt) + random.uniform(0, 1) + print(f"[RETRY] Frontera {frontier}, intento {attempt + 1}/{max_retries}. Esperando {wait_time:.1f}s...") + time.sleep(wait_time) + continue + else: + print(f"[ERROR] Error procesando la frontera {frontier} después de {max_retries} intentos: {e}") + fronteras_fallidas.append(frontier) + return [] + +with ThreadPoolExecutor(max_workers=30) as executor: + future_to_frontier = {executor.submit(fetch_usage_records, frontier): frontier for frontier in frontiers} + + processed_count = 0 + total_frontiers = len(frontiers) + + for future in as_completed(future_to_frontier): + usage_records_dict.extend(future.result()) + processed_count += 1 + + # Mostrar progreso cada 500 fronteras o al final + if processed_count % 500 == 0 or processed_count == total_frontiers: + print(f"📊 Progreso: {processed_count}/{total_frontiers} fronteras procesadas ({processed_count/total_frontiers*100:.1f}%)") + +# Generar reporte de fronteras fallidas +if fronteras_fallidas: + timestamp_failed = dt.now().strftime("%Y%m%d_%H%M") + failed_filename = f"fronteras_fallidas_{timestamp_failed}.txt" + + with open(failed_filename, "w") as out: + out.write("\n".join(fronteras_fallidas)) + + print(f"\n❌ {len(fronteras_fallidas)} fronteras fallaron y se guardaron en: {failed_filename}") + print(f"Fronteras exitosas: {total_frontiers - len(fronteras_fallidas)}/{total_frontiers}") +else: + print(f"\n✅ Todas las {total_frontiers} fronteras se procesaron exitosamente.") + +if not usage_records_dict: + print("⚠️ No se encontraron registros para ninguna frontera. Terminando script.") + exit() + +print("\n🔄 Procesando datos y generando Excel...") + +df = pd.DataFrame(usage_records_dict) +df['time_start'] = pd.to_datetime(df['time_start']) + +df['Año'] = df['time_start'].dt.year +df['Mes'] = df['time_start'].dt.month +df['Día'] = df['time_start'].dt.day +df['hora_en_punto'] = df['time_start'].dt.hour + +cuadrante = ["kWhD", "kWhR", "kVarhD", "kVarhR"] +df_long = df.melt( + id_vars=["Frontera", "Serial", "Año", "Mes", "Día", "hora_en_punto"], + value_vars=cuadrante, + var_name="Tipo", + value_name="valor_cuadrante" +) + +horas = list(range(24)) +resultado = ( + df_long.pivot_table( + index=["Serial", "Frontera", "Tipo", "Año", "Mes", "Día"], + columns="hora_en_punto", + values="valor_cuadrante", + aggfunc="first" + ) + .reindex(columns=horas, fill_value=0) + .reset_index() +) +resultado.columns.name = None +resultado = resultado.rename(columns={col: f"Hora {col}" for col in resultado.columns if isinstance(col, int)}) + +timestamp = dt.now().strftime("%Y%m%d_%H%M") +filename = f"Matrices_{timestamp}.xlsx" +resultado.to_excel(filename, index=False) + +print(f"\n✅ Archivo generado correctamente: {filename}") + +# Resumen final +print(f"\n📋 RESUMEN FINAL:") +print(f" • Total fronteras: {total_frontiers}") +print(f" • Exitosas: {total_frontiers - len(fronteras_fallidas)}") +print(f" • Fallidas: {len(fronteras_fallidas)}") +print(f" • Registros procesados: {len(usage_records_dict)}") diff --git a/src/enerbitdso/VERSION b/src/enerbitdso/VERSION index d7d9957..964f548 100644 --- a/src/enerbitdso/VERSION +++ b/src/enerbitdso/VERSION @@ -1 +1 @@ -0.1.19 \ No newline at end of file +0.1.20 \ No newline at end of file diff --git a/src/enerbitdso/cli.py b/src/enerbitdso/cli.py index 5a21300..a7ca7c4 100644 --- a/src/enerbitdso/cli.py +++ b/src/enerbitdso/cli.py @@ -80,6 +80,18 @@ def fetch( frt_file: pathlib.Path = typer.Option( None, help="Path file with one frt code per line" ), + connection_timeout: int = typer.Option( + 10, + min=0, + max=20, + help="Config the timeout for HTTP connection (in seconds)", + ), + read_timeout: int = typer.Option( + 10, + min=0, + max=20, + help="Config the timeout for HTTP requests (in seconds)", + ), meter_serial: str = typer.Option( None, help="Filter by specific meter serial number" ), @@ -89,6 +101,8 @@ def fetch( api_base_url=api_base_url, api_username=api_username, api_password=api_password.get_secret_value(), + connection_timeout=connection_timeout, + read_timeout=read_timeout, ) today = dt.datetime.now(TZ_INFO).replace(**DATE_PARTS_TO_START_DAY) diff --git a/src/enerbitdso/enerbit.py b/src/enerbitdso/enerbit.py index 4f79386..11f3036 100644 --- a/src/enerbitdso/enerbit.py +++ b/src/enerbitdso/enerbit.py @@ -16,7 +16,7 @@ SSL_CONTEXT = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) -TIMEOUT = httpx.Timeout(5, read=60) +TIMEOUT = httpx.Timeout(10, read=60) WATT_HOUR_TO_KILOWATT_HOUR = 0.001 MAX_REQUEST_RANGE = dt.timedelta(days=7) @@ -76,11 +76,16 @@ def _get_token_expiration(token: str) -> Optional[dt.datetime]: return None +def set_http_timeout(connection_timeout, read_timeout): + global TIMEOUT + TIMEOUT = httpx.Timeout(connection_timeout, read=read_timeout) + + class ScheduleUsageRecord(pydantic.BaseModel): - frt_code: str meter_serial: str time_start: dt.datetime time_end: dt.datetime + frt_code: Optional[str] = None active_energy_imported: Optional[float] = None active_energy_exported: Optional[float] = None reactive_energy_imported: Optional[float] = None @@ -88,9 +93,9 @@ class ScheduleUsageRecord(pydantic.BaseModel): class ScheduleMeasurementRecord(pydantic.BaseModel): - frt_code: str meter_serial: str time_local_utc: dt.datetime + frt_code: Optional[str] = None voltage_multiplier: Optional[float] = None current_multiplier: Optional[float] = None active_energy_imported: Optional[float] = None @@ -248,7 +253,7 @@ def get_schedule_measurement_records( class DSOClient: - def __init__(self, api_username: str, api_password: str, api_base_url: str) -> None: + def __init__(self, api_username: str, api_password: str, api_base_url: str, connection_timeout: Optional[int] = None, read_timeout: Optional[int] = None) -> None: self.api_base_url = api_base_url self.api_username = api_username self.api_password = pydantic.SecretStr(api_password) @@ -256,6 +261,15 @@ def __init__(self, api_username: str, api_password: str, api_base_url: str) -> N self._access_token: Optional[str] = None self._refresh_token: Optional[str] = None self._token_expires_at: Optional[dt.datetime] = None + if connection_timeout is not None and (connection_timeout >= 0 and connection_timeout <= 20): + connection_timeout = connection_timeout + else: + connection_timeout = 10 + if read_timeout is not None and (read_timeout >= 60 and read_timeout <= 120): + read_timeout = read_timeout + else: + read_timeout = 60 + set_http_timeout(connection_timeout, read_timeout) def _is_token_valid(self) -> bool: """Verifica si el token actual es válido y no ha expirado"""