Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 123 additions & 2 deletions terraform/lambda-src/cost_report/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import boto3
from botocore.config import Config
from shared.athena import run_query
from shared.constants import USD_TO_NOK, risk_emoji
from shared.slack import get_webhook_url, post_to_slack

Expand All @@ -16,6 +17,9 @@
ssm = boto3.client("ssm")

COST_WEBHOOK_PARAM = os.environ["COST_WEBHOOK_PARAM"]
CUR_DATABASE = os.environ.get("CUR_DATABASE", "")
CUR_TABLE = os.environ.get("CUR_TABLE", "")
ATHENA_WORKGROUP = os.environ.get("ATHENA_WORKGROUP", "")

# ---------------------------------------------------------------------------
# Service categorisation — pattern-based, not hardcoded lists
Expand Down Expand Up @@ -315,7 +319,7 @@ def _llm_structured(prompt, tool_config):


def generate_narrative(this_week, prev_week, mtd, tw_total, pw_total,
mtd_total, projected, month_name):
mtd_total, projected, month_name, resource_summary=""):
"""Generate executive summary. Returns dict with summary/notable or None."""
top_services = sorted(this_week.items(), key=lambda x: x[1], reverse=True)[:8]
svc_summary = ", ".join(f"{s}: ${c:.2f}" for s, c in top_services)
Expand All @@ -341,8 +345,9 @@ def generate_narrative(this_week, prev_week, mtd, tw_total, pw_total,

Top services: {svc_summary}
Biggest movers WoW: {mover_lines or 'no significant changes'}
{resource_summary}

Be specific about which services drove changes. Set notable=true only if there is a meaningful change worth highlighting, false if costs are stable week-over-week."""
Be specific about which services and resources drove changes. Set notable=true only if there is a meaningful change worth highlighting, false if costs are stable week-over-week."""
return _llm_structured(prompt, _NARRATIVE_TOOL)


Expand Down Expand Up @@ -375,6 +380,104 @@ def analyze_spike(this_week, prev_week, tw_total, pw_total):
return _llm_structured(prompt, _SPIKE_TOOL)


# ---------------------------------------------------------------------------
# CUR resource-level drilldown via Athena
# ---------------------------------------------------------------------------
def _friendly_resource_id(resource_id):
"""Shorten an ARN to a readable name."""
if not resource_id:
return "(no resource ID)"
# Strip common ARN prefix, keep the useful tail
if "::" in resource_id:
# S3 bucket: arn:aws:s3:::bucket-name → bucket-name
return resource_id.rsplit(":::", 1)[-1]
if "/" in resource_id:
# ECS/Lambda/etc: ...service/cluster/name → name
parts = resource_id.split("/")
return parts[-1] if len(parts) <= 3 else "/".join(parts[-2:])
if ":" in resource_id:
return resource_id.rsplit(":", 1)[-1]
return resource_id


def get_resource_drilldown(week_start, week_end):
"""Query CUR via Athena for top resources this week. Returns dict or None."""
if not (CUR_DATABASE and CUR_TABLE and ATHENA_WORKGROUP):
return None

year = str(week_start.year)
month = f"{week_start.month:02d}"

# Top 10 resources overall
top_query = f"""
SELECT line_item_resource_id,
line_item_product_code,
COALESCE(resource_tags_user_team, '') as team,
SUM(CAST(line_item_unblended_cost AS double)) as total_cost
FROM "{CUR_DATABASE}"."{CUR_TABLE}"
WHERE year = '{year}' AND month = '{month}'
AND line_item_usage_start_date >= TIMESTAMP '{week_start}'
AND line_item_usage_start_date < TIMESTAMP '{week_end + timedelta(days=1)}'
AND line_item_resource_id != ''
AND line_item_line_item_type = 'Usage'
GROUP BY line_item_resource_id, line_item_product_code,
COALESCE(resource_tags_user_team, '')
HAVING SUM(CAST(line_item_unblended_cost AS double)) >= 0.01
ORDER BY total_cost DESC
LIMIT 10
"""

top_resources = run_query(CUR_DATABASE, top_query, ATHENA_WORKGROUP)
if not top_resources:
return None

return {"top_resources": top_resources}


def build_resource_drilldown_blocks(drilldown):
"""Build Block Kit blocks for resource-level cost drilldown."""
blocks = []
top = drilldown.get("top_resources", [])
if not top:
return blocks

blocks.append({
"type": "section",
"text": {"type": "mrkdwn", "text": ":mag: *Resource-Level Drilldown*"}
})

header = [
{"type": "raw_text", "text": "Resource"},
{"type": "raw_text", "text": "Service"},
{"type": "raw_text", "text": "Team"},
{"type": "raw_text", "text": "Cost"},
]
rows = [header]

for item in top:
cost = float(item.get("total_cost", 0))
nok = cost * USD_TO_NOK
rows.append([
{"type": "raw_text", "text": _friendly_resource_id(item.get("line_item_resource_id", ""))},
{"type": "raw_text", "text": item.get("line_item_product_code", "")},
{"type": "raw_text", "text": item.get("team", "(untagged)")},
{"type": "raw_text", "text": f"${cost:.2f} (~{nok:.0f} NOK)"},
])

blocks.append({
"type": "table",
"column_settings": [
{"is_wrapped": True},
{},
{},
{"align": "right"},
],
"rows": rows,
})

return blocks


# ---------------------------------------------------------------------------
# Block Kit builder
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -493,6 +596,14 @@ def build_blocks(this_week, prev_week, mtd, prev_mtd, project_costs, team_costs,

blocks.append({"type": "divider"})

# Resource-level drilldown from CUR (graceful — skipped if unavailable)
drilldown = get_resource_drilldown(tw_start, tw_end)
if drilldown:
blocks.extend(build_resource_drilldown_blocks(drilldown))
blocks.append({"type": "divider"})
else:
logger.info("CUR resource drilldown unavailable — skipping")

# LLM spike root cause (shown as a section — important)
spike_result = analyze_spike(this_week, prev_week, tw_total, pw_total)
if spike_result:
Expand All @@ -506,9 +617,19 @@ def build_blocks(this_week, prev_week, mtd, prev_mtd, project_costs, team_costs,
blocks.append({"type": "divider"})

# LLM narrative + footer as context blocks
resource_summary = ""
if drilldown:
top_res = drilldown.get("top_resources", [])[:5]
resource_summary = "\nTop resources by cost: " + ", ".join(
f"{_friendly_resource_id(r.get('line_item_resource_id',''))} "
f"({r.get('line_item_product_code','')}, {r.get('team','?')}): "
f"${float(r.get('total_cost',0)):.2f}"
for r in top_res
)
narrative_result = generate_narrative(
this_week, prev_week, mtd,
tw_total, pw_total, mtd_total, projected, curr_month_name,
resource_summary=resource_summary,
)

ce_url = cost_explorer_url(tw_start, tw_end)
Expand Down
66 changes: 66 additions & 0 deletions terraform/lambda-src/daily_cost_check/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import datetime, timedelta, timezone

import boto3
from shared.athena import run_query
from shared.constants import USD_TO_NOK
from shared.slack import get_webhook_url, post_to_slack

Expand All @@ -17,6 +18,9 @@
SPIKE_THRESHOLD = float(os.environ.get("SPIKE_THRESHOLD", "1.2")) # 20% above average
# Minimum daily spend (USD) to qualify as a spike — filters noise on tiny amounts
MIN_SPIKE_AMOUNT = float(os.environ.get("MIN_SPIKE_AMOUNT", "1.00"))
CUR_DATABASE = os.environ.get("CUR_DATABASE", "")
CUR_TABLE = os.environ.get("CUR_TABLE", "")
ATHENA_WORKGROUP = os.environ.get("ATHENA_WORKGROUP", "")


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -129,6 +133,54 @@ def cost_explorer_url(start, end, service=None):
return f"{base}?{urllib.parse.urlencode(params)}"


# ---------------------------------------------------------------------------
# CUR resource drilldown for spiking services
# ---------------------------------------------------------------------------
def get_spike_resources(day, service):
"""Query CUR for top resources in a spiking service. Returns list or []."""
if not (CUR_DATABASE and CUR_TABLE and ATHENA_WORKGROUP):
return []

year = str(day.year)
month = f"{day.month:02d}"
next_day = day + timedelta(days=1)

query = f"""
SELECT line_item_resource_id,
line_item_usage_type,
COALESCE(resource_tags_user_team, '') as team,
SUM(CAST(line_item_unblended_cost AS double)) as total_cost
FROM "{CUR_DATABASE}"."{CUR_TABLE}"
WHERE year = '{year}' AND month = '{month}'
AND line_item_usage_start_date >= TIMESTAMP '{day}'
AND line_item_usage_start_date < TIMESTAMP '{next_day}'
AND line_item_product_code = '{service}'
AND line_item_resource_id != ''
AND line_item_line_item_type = 'Usage'
GROUP BY line_item_resource_id, line_item_usage_type,
COALESCE(resource_tags_user_team, '')
HAVING SUM(CAST(line_item_unblended_cost AS double)) >= 0.01
ORDER BY total_cost DESC
LIMIT 5
"""

return run_query(CUR_DATABASE, query, ATHENA_WORKGROUP)


def _friendly_resource_id(resource_id):
"""Shorten an ARN to a readable name."""
if not resource_id:
return "(no resource ID)"
if ":::" in resource_id:
return resource_id.rsplit(":::", 1)[-1]
if "/" in resource_id:
parts = resource_id.split("/")
return parts[-1] if len(parts) <= 3 else "/".join(parts[-2:])
if ":" in resource_id:
return resource_id.rsplit(":", 1)[-1]
return resource_id


# ---------------------------------------------------------------------------
# Spike detection
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -237,6 +289,16 @@ def build_alert_blocks(spikes, spike_details, yesterday_date):
)
detail_parts.append(f"*By team:* {team_lines}")

resources = detail.get("resources")
if resources:
res_lines = "\n".join(
f" \u2022 {_friendly_resource_id(r.get('line_item_resource_id', ''))}: "
f"${float(r.get('total_cost', 0)):.2f}"
f"{' (' + r['team'] + ')' if r.get('team') else ''}"
for r in resources
)
detail_parts.append(f"*Top resources:*\n{res_lines}")

ce_url = detail.get("url")
if ce_url:
detail_parts.append(f"<{ce_url}|View in Cost Explorer>")
Expand Down Expand Up @@ -295,6 +357,10 @@ def handler(event, context):
detail["team_tags"] = get_tag_breakdown(ce, yesterday, svc, tag_key="team")
except Exception as e:
logger.warning("Team tag query failed for %s: %s", svc, e)
try:
detail["resources"] = get_spike_resources(yesterday, svc)
except Exception as e:
logger.warning("CUR resource query failed for %s: %s", svc, e)
spike_details[svc] = detail

blocks = build_alert_blocks(spikes, spike_details, yesterday)
Expand Down
93 changes: 93 additions & 0 deletions terraform/lambda-src/shared/athena.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Shared Athena query utility for CUR cost analytics."""

import logging
import time

import boto3

logger = logging.getLogger(__name__)


def run_query(database, query, workgroup, timeout_seconds=30):
"""Execute an Athena query synchronously and return rows as list of dicts.

On failure (timeout, query error, missing table), logs a warning and returns
an empty list so callers can gracefully degrade.
"""
athena = boto3.client("athena")

try:
start = athena.start_query_execution(
QueryString=query,
QueryExecutionContext={"Database": database},
WorkGroup=workgroup,
)
execution_id = start["QueryExecutionId"]
except Exception as e:
logger.warning("Athena start_query_execution failed: %s", e)
return []

# Poll until done
deadline = time.time() + timeout_seconds
while time.time() < deadline:
try:
status = athena.get_query_execution(QueryExecutionId=execution_id)
state = status["QueryExecution"]["Status"]["State"]
except Exception as e:
logger.warning("Athena get_query_execution failed: %s", e)
return []

if state == "SUCCEEDED":
break
if state in ("FAILED", "CANCELLED"):
reason = status["QueryExecution"]["Status"].get(
"StateChangeReason", "unknown"
)
logger.warning("Athena query %s: %s", state, reason)
return []

time.sleep(1)
else:
logger.warning("Athena query timed out after %ds", timeout_seconds)
try:
athena.stop_query_execution(QueryExecutionId=execution_id)
except Exception:
pass
return []

# Fetch results with pagination
rows = []
columns = None
next_token = None

while True:
kwargs = {"QueryExecutionId": execution_id}
if next_token:
kwargs["NextToken"] = next_token

try:
result = athena.get_query_results(**kwargs)
except Exception as e:
logger.warning("Athena get_query_results failed: %s", e)
return rows

result_set = result["ResultSet"]

if columns is None:
columns = [
col["Name"] for col in result_set["ResultSetMetadata"]["ColumnInfo"]
]
# First page includes the header row — skip it
data_rows = result_set["Rows"][1:]
else:
data_rows = result_set["Rows"]

for row in data_rows:
values = [d.get("VarCharValue", "") for d in row["Data"]]
rows.append(dict(zip(columns, values)))

next_token = result.get("NextToken")
if not next_token:
break

return rows
Loading