Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ jobs:

- name: Run tests
run: pytest tests/ -v
env:
SNACK_GITHUB_CLIENT_ID: ${{ secrets.SNACK_GITHUB_CLIENT_ID }}
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "python-snacks"
version = "0.1.4"
version = "0.2.0"
description = "A CLI tool for managing a personal stash of reusable Python code snippets."
readme = "README.md"
requires-python = ">=3.10"
Expand Down
115 changes: 115 additions & 0 deletions snacks/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""GitHub authentication for snack CLI.

Resolution order:
1. GITHUB_TOKEN env var (useful for CI)
2. `gh auth token` (GitHub CLI, if installed and logged in)
3. GitHub OAuth device flow (prompts user in browser)
"""
from __future__ import annotations

import json
import os
import subprocess
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import Optional

import typer

# Register at https://github.com/settings/apps/new (Device Flow, no client secret needed)
_CLIENT_ID = os.environ.get("SNACK_GITHUB_CLIENT_ID", "")


def get_github_token() -> Optional[str]:
"""Return a GitHub token, prompting via device flow if needed."""
token = os.environ.get("GITHUB_TOKEN")
if token:
return token

token = _token_from_gh_cli()
if token:
return token

return _device_flow()


def _token_from_gh_cli() -> Optional[str]:
try:
result = subprocess.run(
["gh", "auth", "token"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
return result.stdout.strip()
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
return None


def _device_flow() -> Optional[str]:
if not _CLIENT_ID:
typer.echo(
"[error] No GitHub client ID configured. "
"Set SNACK_GITHUB_CLIENT_ID or GITHUB_TOKEN to authenticate.",
err=True,
)
raise typer.Exit(1)

# Step 1 — request device + user code
data = urllib.parse.urlencode({"client_id": _CLIENT_ID, "scope": "repo"}).encode()
req = urllib.request.Request(
"https://github.com/login/device/code",
data=data,
headers={"Accept": "application/json"},
)
try:
with urllib.request.urlopen(req) as resp:
payload = json.loads(resp.read())
except urllib.error.URLError as e:
typer.echo(f"[error] Could not reach GitHub: {e.reason}", err=True)
raise typer.Exit(1)

device_code = payload["device_code"]
user_code = payload["user_code"]
verification_uri = payload["verification_uri"]
interval = payload.get("interval", 5)
expires_in = payload.get("expires_in", 900)

typer.echo(f"\nOpen: {verification_uri}")
typer.echo(f"Code: {user_code}\n")

# Step 2 — poll until the user approves
deadline = time.time() + expires_in
while time.time() < deadline:
time.sleep(interval)

data = urllib.parse.urlencode({
"client_id": _CLIENT_ID,
"device_code": device_code,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
}).encode()
req = urllib.request.Request(
"https://github.com/login/oauth/access_token",
data=data,
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req) as resp:
result = json.loads(resp.read())

if "access_token" in result:
typer.echo("Authenticated.")
return result["access_token"]

error = result.get("error")
if error == "slow_down":
interval += 5
elif error != "authorization_pending":
typer.echo(f"[error] Authentication failed: {error}", err=True)
raise typer.Exit(1)

typer.echo("[error] Authentication timed out.", err=True)
raise typer.Exit(1)
44 changes: 35 additions & 9 deletions snacks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from snacks.config import SnackConfig, get_stash_path
from snacks.ops import add_remote as do_add_remote
from snacks.ops import pack as do_pack, unpack as do_unpack
from snacks.ops import read_index

app = typer.Typer(
name="snack",
Expand Down Expand Up @@ -65,11 +66,7 @@ def list_snacks(
) -> None:
"""List all snippets in the stash."""
stash = get_stash_path()
snippets = sorted(
p.relative_to(stash).as_posix()
for p in stash.rglob("*.py")
if not any(part.startswith(("_", ".")) for part in p.relative_to(stash).parts)
)
snippets = sorted(read_index(stash))
if category:
snippets = [s for s in snippets if s.startswith(f"{category}/")]
typer.echo("\n".join(snippets) if snippets else "No snippets found.")
Expand All @@ -82,10 +79,8 @@ def search(
"""Search snippet filenames for a keyword."""
stash = get_stash_path()
matches = sorted(
p.relative_to(stash).as_posix()
for p in stash.rglob("*.py")
if keyword.lower() in p.name.lower()
and not any(part.startswith(("_", ".")) for part in p.relative_to(stash).parts)
s for s in read_index(stash)
if keyword.lower() in Path(s).name.lower()
)
typer.echo("\n".join(matches) if matches else f"No snippets matching '{keyword}'.")

Expand Down Expand Up @@ -179,6 +174,37 @@ def stash_move(
cfg.save()


@stash_app.command("delete")
def stash_delete(
name: str = typer.Argument(..., help="Name of the stash to remove from config."),
) -> None:
"""Remove a stash from config (does not delete files on disk)."""
cfg = SnackConfig()
if not cfg.has_stash(name):
typer.echo(
f"[error] No stash named '{name}'. Run 'snack stash list' to see available stashes.",
err=True,
)
raise typer.Exit(1)

was_active = cfg.active_name() == name
cfg.remove_stash(name)

if was_active:
remaining = cfg.stashes()
if remaining:
next_name = next(iter(sorted(remaining)))
cfg.set_active(next_name)
cfg.save()
typer.echo(f"Deleted stash '{name}'. Active stash → '{next_name}'.")
else:
cfg.save()
typer.echo(f"Deleted stash '{name}'. No stashes remaining.")
else:
cfg.save()
typer.echo(f"Deleted stash '{name}'.")


@stash_app.command("add-remote")
def stash_add_remote(
repo: str = typer.Argument(..., help="GitHub repo as 'owner/repo' or a full GitHub URL."),
Expand Down
134 changes: 88 additions & 46 deletions snacks/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,25 @@

import typer

_MANIFEST = ".snack_index"


def read_index(stash: Path) -> list[str]:
"""Return all tracked snack paths (relative to stash root)."""
index = stash / _MANIFEST
if not index.exists():
return []
return [l.strip() for l in index.read_text().splitlines() if l.strip()]


def _track(stash: Path, rel_path: str) -> None:
"""Add a path to the stash manifest if not already present."""
index = stash / _MANIFEST
existing = set(read_index(stash))
if rel_path not in existing:
with open(index, "a") as f:
f.write(rel_path + "\n")


def unpack(stash: Path, snippet_path: str, flat: bool, force: bool) -> None:
"""Copy a file from the stash into the current working directory."""
Expand All @@ -30,69 +49,92 @@ def pack(stash: Path, snippet_path: str, force: bool) -> None:

dest = stash / snippet_path
_copy(src, dest, force)
_track(stash, snippet_path)
typer.echo(f"Packed {snippet_path} → {dest}")


def add_remote(stash: Path, repo: str, subdir: Optional[str], force: bool) -> None:
"""Download .py files from a GitHub repo into the stash."""
from snacks.auth import get_github_token

owner, repo_name = _parse_github_repo(repo)
url = f"https://api.github.com/repos/{owner}/{repo_name}/tarball"
headers = {"User-Agent": "python-snacks", "Accept": "application/vnd.github+json"}

typer.echo(f"Fetching {owner}/{repo_name}...")
req = urllib.request.Request(
url,
headers={"User-Agent": "snack-stash", "Accept": "application/vnd.github+json"},
)

try:
with tempfile.TemporaryDirectory() as tmpdir:
tmp = Path(tmpdir)
tarball = tmp / "repo.tar.gz"

with urllib.request.urlopen(req) as response:
tarball.write_bytes(response.read())

extract_dir = tmp / "repo"
extract_dir.mkdir()
with tarfile.open(tarball) as tf:
try:
tf.extractall(extract_dir, filter="data")
except TypeError:
tf.extractall(extract_dir) # Python < 3.12

roots = list(extract_dir.iterdir())
if not roots:
typer.echo("[error] Downloaded archive was empty.", err=True)
raise typer.Exit(1)
repo_root = roots[0]

py_files = sorted(
p for p in repo_root.rglob("*.py")
if subdir is None or _is_under_subdir(p.relative_to(repo_root), subdir)
)

if not py_files:
msg = "No Python files found"
if subdir:
msg += f" under '{subdir}'"
typer.echo(msg + ".")
return

for src in py_files:
rel = src.relative_to(repo_root)
_copy(src, stash / rel, force)
typer.echo(f" + {rel.as_posix()}")

typer.echo(f"\nAdded {len(py_files)} file(s) from {owner}/{repo_name}.")
def _make_request() -> urllib.request.Request:
return urllib.request.Request(url, headers=headers)

try:
_download_and_install(stash, owner, repo_name, subdir, force, _make_request())
except urllib.error.HTTPError as e:
typer.echo(f"[error] HTTP {e.code}: {e.reason}", err=True)
raise typer.Exit(1)
if e.code != 404:
typer.echo(f"[error] HTTP {e.code}: {e.reason}", err=True)
raise typer.Exit(1)
# 404 may mean private repo — authenticate and retry once
token = get_github_token()
headers["Authorization"] = f"Bearer {token}"
try:
_download_and_install(stash, owner, repo_name, subdir, force, _make_request())
except urllib.error.HTTPError as e2:
typer.echo(f"[error] HTTP {e2.code}: {e2.reason}", err=True)
raise typer.Exit(1)
except urllib.error.URLError as e:
typer.echo(f"[error] Network error: {e.reason}", err=True)
raise typer.Exit(1)


def _download_and_install(
stash: Path,
owner: str,
repo_name: str,
subdir: Optional[str],
force: bool,
req: urllib.request.Request,
) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
tmp = Path(tmpdir)
tarball = tmp / "repo.tar.gz"

with urllib.request.urlopen(req) as response:
tarball.write_bytes(response.read())

extract_dir = tmp / "repo"
extract_dir.mkdir()
with tarfile.open(tarball) as tf:
try:
tf.extractall(extract_dir, filter="data")
except TypeError:
tf.extractall(extract_dir) # Python < 3.12

roots = list(extract_dir.iterdir())
if not roots:
typer.echo("[error] Downloaded archive was empty.", err=True)
raise typer.Exit(1)
repo_root = roots[0]

py_files = sorted(
p for p in repo_root.rglob("*.py")
if subdir is None or _is_under_subdir(p.relative_to(repo_root), subdir)
)

if not py_files:
msg = "No Python files found"
if subdir:
msg += f" under '{subdir}'"
typer.echo(msg + ".")
return

for src in py_files:
rel = src.relative_to(repo_root)
_copy(src, stash / rel, force)
_track(stash, rel.as_posix())
typer.echo(f" + {rel.as_posix()}")

typer.echo(f"\nAdded {len(py_files)} file(s) from {owner}/{repo_name}.")


def _parse_github_repo(repo: str) -> tuple[str, str]:
repo = repo.strip().rstrip("/")
for prefix in ("https://github.com/", "http://github.com/", "github.com/"):
Expand Down
Loading
Loading