-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathconcurrent_example.py
More file actions
158 lines (127 loc) · 5.75 KB
/
concurrent_example.py
File metadata and controls
158 lines (127 loc) · 5.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import asyncio
import os
import random
import time
import traceback
from datetime import datetime as dt
from zoneinfo import ZoneInfo
import pandas as pd
from enerbitdso.enerbit import DSOClient
# All timestamps in the output are rendered in Colombia local time.
colombia_tz = ZoneInfo("America/Bogota")
# Query window: one week, expressed with an explicit -05:00 UTC offset
# so the parsed datetimes are timezone-aware.
since = dt.strptime("2026-02-01T00:00-05:00", "%Y-%m-%dT%H:%M%z")
until = dt.strptime("2026-02-08T00:00-05:00", "%Y-%m-%dT%H:%M%z")
# Input file: one frontier (frt) code per line; blank lines are skipped.
with open("frt_prueba.txt", "r") as f1:
    frontiers = [line.strip() for line in f1 if line.strip()]
# Shared accumulators filled by the async tasks launched from main().
usage_records_dict: list[dict] = []
fronteras_fallidas: list[str] = []
print("Generando archivo...")
async def fetch_usage_records(ebconnector, frontier, semaphore, fronteras_fallidas, max_retries=3):
    """Fetch usage records for one frontier, retrying with exponential backoff.

    Args:
        ebconnector: DSO client exposing ``fetch_schedule_usage_records_large_interval``.
        frontier: Frontier (frt) code to query.
        semaphore: ``asyncio.Semaphore`` bounding how many requests run concurrently.
        fronteras_fallidas: Shared list; the frontier code is appended when every
            attempt fails so the caller can report it.
        max_retries: Total number of attempts before giving up.

    Returns:
        A list of flat dicts (one per usage record), or ``[]`` when the API
        returned no data or all retries were exhausted.

    NOTE(review): relies on module-level ``since``, ``until`` and ``colombia_tz``
    being defined before the first call.
    """
    for attempt in range(max_retries):
        try:
            async with semaphore:
                usage_records = await ebconnector.fetch_schedule_usage_records_large_interval(
                    frt_code=frontier, since=since, until=until
                )
                if not usage_records:
                    print(f"[INFO] No se encontraron datos para la frontera {frontier}.")
                    return []
                # Flatten each record into the row shape the Excel step expects;
                # timestamps are converted to Colombia local time.
                # (strftime already returns str, so no extra str() wrapper is needed.)
                return [
                    {
                        "Frontera": usage_record.frt_code if usage_record.frt_code is not None else "SIN_FRONTERA",
                        "Serial": usage_record.meter_serial,
                        "time_start": usage_record.time_start.astimezone(colombia_tz).strftime("%Y-%m-%d %H:%M:%S%z"),
                        "time_end": usage_record.time_end.astimezone(colombia_tz).strftime("%Y-%m-%d %H:%M:%S%z"),
                        "kWhD": usage_record.active_energy_imported,
                        "kWhR": usage_record.active_energy_exported,
                        "kVarhD": usage_record.reactive_energy_imported,
                        "kVarhR": usage_record.reactive_energy_exported,
                    }
                    for usage_record in usage_records
                ]
        except Exception as e:
            if attempt < max_retries - 1:
                # Exponential backoff with jitter to avoid hammering the API
                # in lockstep with other failing tasks.
                wait_time = (2**attempt) + random.uniform(0, 1)
                print(
                    f"[RETRY] Frontera {frontier}, intento {attempt + 1}/{max_retries}."
                    f" Esperando {wait_time:.1f}s...\n{traceback.format_exc()}"
                )
                await asyncio.sleep(wait_time)
                continue
            else:
                print(f"[ERROR] Error procesando la frontera {frontier} después de {max_retries} intentos: {e}")
                fronteras_fallidas.append(frontier)
                return []
async def main():
    """Concurrently fetch usage records for all frontiers and build the Excel matrix.

    Side effects: writes ``Matrices_<timestamp>.xlsx``; when some frontiers
    fail, also writes ``fronteras_fallidas_<timestamp>.txt``; prints progress
    and a final summary to stdout. Reads module-level ``frontiers`` and fills
    ``usage_records_dict`` / ``fronteras_fallidas``.
    """
    start = time.perf_counter()
    # At most 30 requests in flight against the DSO API at any time.
    semaphore = asyncio.Semaphore(30)
    total_frontiers = len(frontiers)
    processed_count = 0
    async with DSOClient(
        api_base_url=os.getenv("DSO_HOST"),
        api_username=os.getenv("DSO_USERNAME"),
        api_password=os.getenv("DSO_PASSWORD"),
        connection_timeout=20,
        read_timeout=120,
    ) as ebconnector:
        tasks = [
            asyncio.create_task(fetch_usage_records(ebconnector, f, semaphore, fronteras_fallidas)) for f in frontiers
        ]
        # Consume results as they finish so progress can be reported early.
        for coro in asyncio.as_completed(tasks):
            records = await coro
            usage_records_dict.extend(records)
            processed_count += 1
            if processed_count % 500 == 0 or processed_count == total_frontiers:
                pct = processed_count / total_frontiers * 100
                print(f"📊 Progreso: {processed_count}/{total_frontiers} fronteras procesadas ({pct:.1f}%)")
    # Generar reporte de fronteras fallidas
    if fronteras_fallidas:
        timestamp_failed = dt.now().strftime("%Y%m%d_%H%M")
        failed_filename = f"fronteras_fallidas_{timestamp_failed}.txt"
        with open(failed_filename, "w") as out:
            out.write("\n".join(fronteras_fallidas))
        print(f"\n❌ {len(fronteras_fallidas)} fronteras fallaron y se guardaron en: {failed_filename}")
        print(f"Fronteras exitosas: {total_frontiers - len(fronteras_fallidas)}/{total_frontiers}")
    else:
        print(f"\n✅ Todas las {total_frontiers} fronteras se procesaron exitosamente.")
    if not usage_records_dict:
        print("⚠️ No se encontraron registros para ninguna frontera. Terminando script.")
        return
    print("\n🔄 Procesando datos y generando Excel...")
    df = pd.DataFrame(usage_records_dict)
    df["time_start"] = pd.to_datetime(df["time_start"])
    df["Año"] = df["time_start"].dt.year
    df["Mes"] = df["time_start"].dt.month
    df["Día"] = df["time_start"].dt.day
    df["hora_en_punto"] = df["time_start"].dt.hour
    cuadrante = ["kWhD", "kWhR", "kVarhD", "kVarhR"]
    # Long format: one row per (frontier, serial, date, hour, energy quadrant).
    df_long = df.melt(
        id_vars=["Frontera", "Serial", "Año", "Mes", "Día", "hora_en_punto"],
        value_vars=cuadrante,
        var_name="Tipo",
        value_name="valor_cuadrante",
    )
    horas = list(range(24))
    # Pivot back to wide: one column per hour of the day; hours with no
    # reading are filled with 0 so every row has the full 24 columns.
    resultado = (
        df_long.pivot_table(
            index=["Serial", "Frontera", "Tipo", "Año", "Mes", "Día"],
            columns="hora_en_punto",
            values="valor_cuadrante",
            aggfunc="first",
        )
        .reindex(columns=horas, fill_value=0)
        .reset_index()
    )
    resultado.columns.name = None
    resultado = resultado.rename(columns={col: f"Hora {col}" for col in resultado.columns if isinstance(col, int)})
    timestamp = dt.now().strftime("%Y%m%d_%H%M")
    filename = f"Matrices_{timestamp}.xlsx"
    resultado.to_excel(filename, index=False)
    # BUG FIX: this message previously printed a literal placeholder instead of
    # the generated file name.
    print(f"\n✅ Archivo generado correctamente: {filename}")
    # Resumen final
    print("\n📋 RESUMEN FINAL:")
    print(f" • Total fronteras: {total_frontiers}")
    print(f" • Exitosas: {total_frontiers - len(fronteras_fallidas)}")
    print(f" • Fallidas: {len(fronteras_fallidas)}")
    print(f" • Registros procesados: {len(usage_records_dict)}")
    print(time.perf_counter() - start)
if __name__ == "__main__":
    # Guard the entry point so importing this module does not kick off the
    # whole download pipeline; running it as a script behaves exactly as before.
    asyncio.run(main())