Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Local config files
*.env
.vscode
main.py
frt_prueba.txt

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
148 changes: 128 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,20 +126,22 @@ También tiene opción `--help` que muestra la ayuda particular de este sub-coma

Usage: enerbitdso usages fetch [OPTIONS] [FRTS]...

╭─ Arguments ────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ frts [FRTS]... List of frt codes separated by ' ' [default: None] │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Options ──────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ * --api-base-url TEXT [env var: ENERBIT_API_BASE_URL] [default: None] [required] │
│ * --api-username TEXT [env var: ENERBIT_API_USERNAME] [default: None] [required] │
│ * --api-password TEXT [env var: ENERBIT_API_PASSWORD] [default: None] [required] │
│ --since [%Y-%m-%d|%Y%m%d] [default: (yesterday)] │
│ --until [%Y-%m-%d|%Y%m%d] [default: (today)] │
│ --timezone TEXT [default: America/Bogota] │
│ --out-format [csv|jsonl] Output file format [default: jsonl] │
│ --frt-file PATH Path file with one frt code per line [default: None] │
│ --help Show this message and exit. │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Arguments ───────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ frts [FRTS]... List of frt codes separated by ' ' [default: None] │
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Options ─────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ * --api-base-url TEXT [env var: ENERBIT_API_BASE_URL] [default: None] [required] │
│ * --api-username TEXT [env var: ENERBIT_API_USERNAME] [default: None] [required] │
│ * --api-password TEXT [env var: ENERBIT_API_PASSWORD] [default: None] [required] │
│ --since [%Y-%m-%d|%Y%m%d] [default: (yesterday)] │
│ --until [%Y-%m-%d|%Y%m%d] [default: (today)] │
│ --timezone TEXT [default: America/Bogota] │
│ --out-format [csv|jsonl] Output file format [default: jsonl] │
│ --frt-file PATH Path file with one frt code per line [default: None] │
│ --connection_timeout INTEGER RANGE The timeout used for HTTP connection in seconds[0<=x<=20][default: 10]│
│ --read_timeout INTEGER RANGE The timeout used for HTTP requests in seconds[60<=x<=120][default: 60]│
│ --help Show this message and exit. │
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
```

# Librería DSO
Expand All @@ -150,14 +152,14 @@ Para poder hacer uso de la librería DSO se debe hacer lo siguiente

Para ello se debe importar el constructor de la siguiente forma:

```txt
```python
from enerbitdso.enerbit import DSOClient
```

La inicialización se debe hacer asi:

```txt
ebconnector = enerbit.DSOClient(
```python
ebconnector = DSOClient(
api_base_url="https://dso.enerbit.me/",
api_username="usuario_del_DSO",
api_password="contraseña_del_DSO",
Expand All @@ -166,16 +168,122 @@ ebconnector = enerbit.DSOClient(

Al tener el objeto ya se pueden realizar consultas de la siguiente forma:

```txt
```python
usage_records = ebconnector.fetch_schedule_usage_records_large_interval(
frt_code=frt_code, since=since, until=until
)
```

Tambien se puede hacer una consulta de perfiles de la siguiente forma:

```txt
```python
schedule_records = ebconnector.fetch_schedule_measurements_records_large_interval(
frt_code=frt_code, since=since, until=until
)
```
```

## Configuración del Cliente DSO

### Parámetros Básicos

```python
ebconnector = DSOClient(
api_base_url="https://dso.enerbit.me/",
api_username="tu_usuario@empresa.com",
api_password="tu_contraseña"
)
```

### Configuración Avanzada con Timeouts

Para mejorar la estabilidad en consultas masivas, especialmente cuando se procesan muchas fronteras, se recomienda configurar timeouts personalizados:

```python
ebconnector = DSOClient(
api_base_url="https://dso.enerbit.me/",
api_username="tu_usuario@empresa.com",
api_password="tu_contraseña",
connection_timeout=20, # Timeout de conexión en segundos (1-60)
read_timeout=120 # Timeout de lectura en segundos (60-300)
)
```

### Parámetros de Timeout

- **connection_timeout**: Tiempo máximo para establecer conexión con el servidor (recomendado: 10-30 segundos)
- **read_timeout**: Tiempo máximo para recibir respuesta del servidor (recomendado: 60-180 segundos)

### Configuración con Variables de Entorno

Una práctica recomendada es usar variables de entorno para las credenciales:

```python
import os

ebconnector = DSOClient(
api_base_url=os.getenv("DSO_HOST", "https://dso.enerbit.me/"),
api_username=os.getenv("DSO_USERNAME"),
api_password=os.getenv("DSO_PASSWORD"),
connection_timeout=20,
read_timeout=120
)
```

Configurar las variables de entorno:

**Linux/macOS:**
```bash
export DSO_HOST="https://dso.enerbit.me/"
export DSO_USERNAME="tu_usuario@empresa.com"
export DSO_PASSWORD="tu_contraseña"
```

**Windows:**
```cmd
set DSO_HOST=https://dso.enerbit.me/
set DSO_USERNAME=tu_usuario@empresa.com
set DSO_PASSWORD=tu_contraseña
```

# Ejemplo de Uso Masivo

## Archivo `example.py`

El repositorio incluye un archivo `example.py` que demuestra cómo procesar múltiples fronteras de manera eficiente usando concurrencia. Este ejemplo es útil para:

- **Procesamiento masivo de fronteras**: Consulta múltiples fronteras en paralelo
- **Manejo de errores**: Implementa reintentos automáticos y reportes de fronteras fallidas
- **Generación de reportes**: Crea archivos Excel con matrices horarias de consumo
- **Monitoreo de progreso**: Muestra el avance del procesamiento cada 500 fronteras

### Características del ejemplo:

- 🚀 **Concurrencia**: Usa ThreadPoolExecutor para procesar múltiples fronteras simultáneamente
- 🔄 **Reintentos**: Implementa backoff exponencial para manejar errores de red
- 📊 **Progreso visual**: Muestra estadísticas de avance durante la ejecución
- 📈 **Salida estructurada**: Genera matrices Excel organizadas por hora, día, mes y año
- ⚠️ **Manejo de errores**: Reporta fronteras fallidas para análisis posterior

### Uso del ejemplo:

1. **Configurar variables de entorno** (como se describe arriba)
2. **Crear archivo de fronteras**: `frt_prueba.txt` con una frontera por línea
3. **Ejecutar el script**:
```bash
python example.py
```

### Archivos generados:

- `Matrices_YYYYMMDD_HHMM.xlsx` - Datos principales organizados por matrices horarias
- `fronteras_fallidas_YYYYMMDD_HHMM.txt` - Lista de fronteras que no se pudieron procesar

### Configuración recomendada para producción:

Para ambientes de producción o consultas masivas, ajusta estos parámetros en el ejemplo:

- **max_workers**: Reduce de 30 a 5-10 para evitar saturar el servidor
- **Timeouts**: Usa connection_timeout=20 y read_timeout=120 como mínimo
- **Intervalos de tiempo**: Limita los rangos de fechas para consultas más eficientes

El archivo `example.py` sirve como base para desarrollar tus propios scripts de procesamiento masivo de datos de enerBit DSO.
138 changes: 138 additions & 0 deletions example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from enerbitdso.enerbit import DSOClient
from datetime import datetime as dt
import pandas as pd
import pytz
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

colombia_tz = pytz.timezone('America/Bogota')

ebconnector = DSOClient(
api_base_url=os.getenv("DSO_HOST"),
api_username=os.getenv("DSO_USERNAME"),
api_password=os.getenv("DSO_PASSWORD"),
connection_timeout=20,
read_timeout=120
)
since = dt.strptime("2025-09-04T00:00-05:00", "%Y-%m-%dT%H:%M%z")
until = dt.strptime("2025-09-08T00:00-05:00", "%Y-%m-%dT%H:%M%z")

with open("frt_prueba.txt", "r") as f1:
frontiers = [line.strip() for line in f1 if line.strip()]

usage_records_dict = []
fronteras_fallidas = []

print("Generando archivo...")

def fetch_usage_records(frontier, max_retries=3):
for attempt in range(max_retries):
try:
usage_records = ebconnector.fetch_schedule_usage_records_large_interval(
frt_code=frontier, since=since, until=until
)

if not usage_records:
print(f"[INFO] No se encontraron datos para la frontera {frontier}.")
return []

return [{
"Frontera": usage_record.frt_code if usage_record.frt_code is not None else "SIN_FRONTERA",
"Serial": usage_record.meter_serial,
"time_start": str(usage_record.time_start.astimezone(colombia_tz).strftime('%Y-%m-%d %H:%M:%S%z')),
"time_end": str(usage_record.time_end.astimezone(colombia_tz).strftime('%Y-%m-%d %H:%M:%S%z')),
"kWhD": usage_record.active_energy_imported,
"kWhR": usage_record.active_energy_exported,
"kVarhD": usage_record.reactive_energy_imported,
"kVarhR": usage_record.reactive_energy_exported
} for usage_record in usage_records]

except Exception as e:
if attempt < max_retries - 1:
# Backoff exponencial con jitter
wait_time = (2 ** attempt) + random.uniform(0, 1)
print(f"[RETRY] Frontera {frontier}, intento {attempt + 1}/{max_retries}. Esperando {wait_time:.1f}s...")
time.sleep(wait_time)
continue
else:
print(f"[ERROR] Error procesando la frontera {frontier} después de {max_retries} intentos: {e}")
fronteras_fallidas.append(frontier)
return []

with ThreadPoolExecutor(max_workers=30) as executor:
future_to_frontier = {executor.submit(fetch_usage_records, frontier): frontier for frontier in frontiers}

processed_count = 0
total_frontiers = len(frontiers)

for future in as_completed(future_to_frontier):
usage_records_dict.extend(future.result())
processed_count += 1

# Mostrar progreso cada 500 fronteras o al final
if processed_count % 500 == 0 or processed_count == total_frontiers:
print(f"📊 Progreso: {processed_count}/{total_frontiers} fronteras procesadas ({processed_count/total_frontiers*100:.1f}%)")

# Generar reporte de fronteras fallidas
if fronteras_fallidas:
timestamp_failed = dt.now().strftime("%Y%m%d_%H%M")
failed_filename = f"fronteras_fallidas_{timestamp_failed}.txt"

with open(failed_filename, "w") as out:
out.write("\n".join(fronteras_fallidas))

print(f"\n❌ {len(fronteras_fallidas)} fronteras fallaron y se guardaron en: {failed_filename}")
print(f"Fronteras exitosas: {total_frontiers - len(fronteras_fallidas)}/{total_frontiers}")
else:
print(f"\n✅ Todas las {total_frontiers} fronteras se procesaron exitosamente.")

if not usage_records_dict:
print("⚠️ No se encontraron registros para ninguna frontera. Terminando script.")
exit()

print("\n🔄 Procesando datos y generando Excel...")

df = pd.DataFrame(usage_records_dict)
df['time_start'] = pd.to_datetime(df['time_start'])

df['Año'] = df['time_start'].dt.year
df['Mes'] = df['time_start'].dt.month
df['Día'] = df['time_start'].dt.day
df['hora_en_punto'] = df['time_start'].dt.hour

cuadrante = ["kWhD", "kWhR", "kVarhD", "kVarhR"]
df_long = df.melt(
id_vars=["Frontera", "Serial", "Año", "Mes", "Día", "hora_en_punto"],
value_vars=cuadrante,
var_name="Tipo",
value_name="valor_cuadrante"
)

horas = list(range(24))
resultado = (
df_long.pivot_table(
index=["Serial", "Frontera", "Tipo", "Año", "Mes", "Día"],
columns="hora_en_punto",
values="valor_cuadrante",
aggfunc="first"
)
.reindex(columns=horas, fill_value=0)
.reset_index()
)
resultado.columns.name = None
resultado = resultado.rename(columns={col: f"Hora {col}" for col in resultado.columns if isinstance(col, int)})

timestamp = dt.now().strftime("%Y%m%d_%H%M")
filename = f"Matrices_{timestamp}.xlsx"
resultado.to_excel(filename, index=False)

print(f"\n✅ Archivo generado correctamente: {filename}")

# Resumen final
print(f"\n📋 RESUMEN FINAL:")
print(f" • Total fronteras: {total_frontiers}")
print(f" • Exitosas: {total_frontiers - len(fronteras_fallidas)}")
print(f" • Fallidas: {len(fronteras_fallidas)}")
print(f" • Registros procesados: {len(usage_records_dict)}")
2 changes: 1 addition & 1 deletion src/enerbitdso/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.19
0.1.20
14 changes: 14 additions & 0 deletions src/enerbitdso/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ def fetch(
frt_file: pathlib.Path = typer.Option(
None, help="Path file with one frt code per line"
),
connection_timeout: int = typer.Option(
10,
min=0,
max=20,
help="Config the timeout for HTTP connection (in seconds)",
),
read_timeout: int = typer.Option(
10,
min=0,
max=20,
help="Config the timeout for HTTP requests (in seconds)",
),
meter_serial: str = typer.Option(
None, help="Filter by specific meter serial number"
),
Expand All @@ -89,6 +101,8 @@ def fetch(
api_base_url=api_base_url,
api_username=api_username,
api_password=api_password.get_secret_value(),
connection_timeout=connection_timeout,
read_timeout=read_timeout,
)

today = dt.datetime.now(TZ_INFO).replace(**DATE_PARTS_TO_START_DAY)
Expand Down
Loading