Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/analytics.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@ scripting.
```bash
ytstudio analytics overview # channel summary for the last 28 days
ytstudio analytics overview --days 7 # rolling 7-day window
ytstudio analytics overview --no-compare # totals only, no growth deltas
ytstudio analytics video <video-id> # one video's recent performance
ytstudio analytics video <video-id> --days 90 # longer window for one video
```

By default `overview` also compares each metric to the previous equal-length
window and shows the percent change (green up, red down). Pass `--no-compare`
for plain totals. In JSON output the comparison is exposed as `previous`
(the prior window's metrics) and `pct_change` (percent change per metric,
`null` when there is no prior-window baseline).

## Custom queries

`analytics query` is a thin wrapper over the YouTube Analytics
Expand Down Expand Up @@ -55,7 +62,7 @@ Run `ytstudio analytics --help` (or open the
=== "JSON"

```bash
ytstudio analytics overview -o json | jq '.rows[0]'
ytstudio analytics overview -o json | jq '.pct_change'
```

## Quota
Expand Down
91 changes: 84 additions & 7 deletions src/ytstudio/commands/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,44 @@ def fetch_query(
return api(analytics_service.reports().query(**query_params))


# Metrics shown with a period-over-period delta in the overview.
OVERVIEW_COMPARE_METRICS = (
"views",
"estimatedMinutesWatched",
"averageViewDuration",
"likes",
"comments",
)


def _pct_change(current: float, previous: float) -> float | None:
"""Percent change vs the previous window, or None when there is no baseline."""
if previous == 0:
return None
return round((current - previous) / previous * 100, 1)


def _delta_suffix(metrics: dict, previous: dict | None, metric: str) -> str:
"""Coloured ' (+N%)' suffix comparing a metric to the previous window."""
if previous is None:
return ""
current = float(metrics.get(metric, 0))
pct = _pct_change(current, float(previous.get(metric, 0)))
if pct is None:
return dim(" (new)") if current else ""
colour = "green" if pct >= 0 else "red"
sign = "+" if pct >= 0 else ""
return f" [{colour}]({sign}{pct:.0f}%)[/{colour}]"


@app.command()
def overview(
days: int = typer.Option(28, "--days", "-d", help="Number of days to analyze"),
compare: bool = typer.Option(
True,
"--compare/--no-compare",
help="Compare each metric to the previous equal-length window",
),
output: str = typer.Option("table", "--output", "-o", help="Output format: table, json"),
):
"""Get channel overview analytics"""
Expand Down Expand Up @@ -119,8 +154,43 @@ def overview(

metrics = dict(zip(headers, rows[0], strict=False))

# Previous equal-length window ending the day before the current one starts,
# so the two windows do not share a boundary day.
previous = None
pct_change = None
if compare:
prev_end = (datetime.now() - timedelta(days=days + 1)).strftime("%Y-%m-%d")
prev_start = (datetime.now() - timedelta(days=days * 2 + 1)).strftime("%Y-%m-%d")
prev_response = fetch_query(
data_service,
analytics_service,
metric_names=metric_names,
dimension_names=[],
start_date=prev_start,
end_date=prev_end,
days=days,
)
prev_rows = prev_response.get("rows", [])
if prev_rows:
prev_headers = [h["name"] for h in prev_response.get("columnHeaders", [])]
previous = dict(zip(prev_headers, prev_rows[0], strict=False))
pct_change = {
m: _pct_change(float(metrics.get(m, 0)), float(previous.get(m, 0)))
for m in OVERVIEW_COMPARE_METRICS
}

if output == "json":
print(json.dumps({"analytics": metrics, "days": days}, indent=2))
print(
json.dumps(
{
"analytics": metrics,
"days": days,
"previous": previous,
"pct_change": pct_change,
},
indent=2,
)
)
return

views = int(metrics.get("views", 0))
Expand All @@ -131,16 +201,23 @@ def overview(
likes = int(metrics.get("likes", 0))
comments = int(metrics.get("comments", 0))

console.print(f"\n[bold]Channel Analytics[/bold] {dim(f'(last {days} days)')}\n")
subtitle = f"(last {days} days"
subtitle += f" vs previous {days})" if previous is not None else ")"
console.print(f"\n[bold]Channel Analytics[/bold] {dim(subtitle)}\n")
table = create_kv_table()

table.add_row(dim("views"), format_number(views))
table.add_row(dim("watch time"), f"{watch_hours} hours")
table.add_row(dim("avg duration"), f"{avg_secs // 60}:{avg_secs % 60:02d}")
def d(metric: str) -> str:
return _delta_suffix(metrics, previous, metric)

table.add_row(dim("views"), format_number(views) + d("views"))
table.add_row(dim("watch time"), f"{watch_hours} hours" + d("estimatedMinutesWatched"))
table.add_row(
dim("avg duration"), f"{avg_secs // 60}:{avg_secs % 60:02d}" + d("averageViewDuration")
)
table.add_row(dim("subscribers gained"), f"[green]+{subs_gained}[/green]")
table.add_row(dim("subscribers lost"), f"[red]-{subs_lost}[/red]")
table.add_row(dim("likes"), format_number(likes))
table.add_row(dim("comments"), format_number(comments))
table.add_row(dim("likes"), format_number(likes) + d("likes"))
table.add_row(dim("comments"), format_number(comments) + d("comments"))

console.print(table)

Expand Down
106 changes: 106 additions & 0 deletions tests/test_analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,112 @@ def test_overview_no_data(self):
assert result.exit_code == 0
assert "No analytics data" in result.output

@staticmethod
def _overview_response(row):
return {
"columnHeaders": [
{"name": "views"},
{"name": "estimatedMinutesWatched"},
{"name": "averageViewDuration"},
{"name": "subscribersGained"},
{"name": "subscribersLost"},
{"name": "likes"},
{"name": "comments"},
],
"rows": [row],
}

def _two_window_services(self, current_row, previous_row):
data_service = MagicMock()
analytics_service = MagicMock()
data_service.channels.return_value.list.return_value.execute.return_value = {
"items": [{"id": "UC_test"}]
}
analytics_service.reports.return_value.query.return_value.execute.side_effect = [
self._overview_response(current_row),
self._overview_response(previous_row),
]
return data_service, analytics_service

def test_overview_compare_deltas_table(self):
# views 12000 vs 10000 -> +20%, avg duration 180 vs 200 -> -10%
data_svc, analytics_svc = self._two_window_services(
[12000, 6000, 180, 42, 3, 770, 25],
[10000, 6000, 200, 42, 3, 700, 25],
)
with (
patch("ytstudio.commands.analytics.get_data_service", return_value=data_svc),
patch("ytstudio.commands.analytics.get_analytics_service", return_value=analytics_svc),
):
result = runner.invoke(app, ["analytics", "overview"])
assert result.exit_code == 0
assert "+20%" in result.output
assert "-10%" in result.output

def test_overview_compare_json(self):
data_svc, analytics_svc = self._two_window_services(
[12000, 6000, 180, 42, 3, 770, 25],
[10000, 6000, 200, 42, 3, 700, 25],
)
with (
patch("ytstudio.commands.analytics.get_data_service", return_value=data_svc),
patch("ytstudio.commands.analytics.get_analytics_service", return_value=analytics_svc),
):
result = runner.invoke(app, ["analytics", "overview", "-o", "json"])
assert result.exit_code == 0
payload = json.loads(result.output)
assert payload["previous"]["views"] == 10000
assert payload["pct_change"]["views"] == 20.0
assert payload["pct_change"]["averageViewDuration"] == -10.0
assert payload["pct_change"]["likes"] == 10.0

def test_overview_compare_windows_do_not_overlap(self):
# the previous window must end before the current window starts
data_svc, analytics_svc = self._two_window_services(
[12000, 6000, 180, 42, 3, 770, 25],
[10000, 6000, 200, 42, 3, 700, 25],
)
with (
patch("ytstudio.commands.analytics.get_data_service", return_value=data_svc),
patch("ytstudio.commands.analytics.get_analytics_service", return_value=analytics_svc),
):
result = runner.invoke(app, ["analytics", "overview", "--days", "7"])
assert result.exit_code == 0

calls = analytics_svc.reports.return_value.query.call_args_list
current_start = calls[0].kwargs["startDate"]
previous_end = calls[1].kwargs["endDate"]
assert previous_end < current_start

def test_overview_no_compare(self):
data_svc, analytics_svc = self._mock_overview_services()
with (
patch("ytstudio.commands.analytics.get_data_service", return_value=data_svc),
patch("ytstudio.commands.analytics.get_analytics_service", return_value=analytics_svc),
):
result = runner.invoke(app, ["analytics", "overview", "--no-compare", "-o", "json"])
assert result.exit_code == 0
payload = json.loads(result.output)
assert payload["previous"] is None
assert payload["pct_change"] is None
# only the current window is queried
assert analytics_svc.reports.return_value.query.return_value.execute.call_count == 1

def test_overview_compare_zero_previous(self):
# previous window has no data -> guard against divide-by-zero
data_svc, analytics_svc = self._two_window_services(
[500, 6000, 180, 42, 3, 789, 25],
[0, 0, 0, 0, 0, 0, 0],
)
with (
patch("ytstudio.commands.analytics.get_data_service", return_value=data_svc),
patch("ytstudio.commands.analytics.get_analytics_service", return_value=analytics_svc),
):
result = runner.invoke(app, ["analytics", "overview", "-o", "json"])
assert result.exit_code == 0
payload = json.loads(result.output)
assert payload["pct_change"]["views"] is None

def test_video_not_found(self, mock_auth):
mock_auth.videos.return_value.list.return_value.execute.return_value = {"items": []}
with (
Expand Down