From 31ef9b0095b16d3003d168311b0ba2d6259272b9 Mon Sep 17 00:00:00 2001 From: Josh Jacobs Date: Tue, 14 Apr 2026 10:37:48 +0100 Subject: [PATCH] fix: replace nested rate-limit retry with loop and add missing docstrings Replace the deeply nested else-if chain for rate-limit retries in the live stream tick loop with a clean retry loop with backoff. The old structure also silently swallowed non-rate-limit errors on the first attempt. Add doc comments to all functions and types missing them across streamdl.go, download_stream.go, grpc_client.go, config.go, config_reader.go, and move_file.go. Closes #555 Closes #554 --- config.go | 4 ++-- config_reader.go | 2 ++ download_stream.go | 5 +++++ grpc_client.go | 2 ++ move_file.go | 2 ++ streamdl.go | 50 ++++++++++++++++++++++------------------------ 6 files changed, 37 insertions(+), 28 deletions(-) diff --git a/config.go b/config.go index c2693c35..915394d8 100644 --- a/config.go +++ b/config.go @@ -1,12 +1,12 @@ package main -// Config defines the root type +// Config represents a streaming site and its list of channels to monitor. type Config struct { Site string `yaml:"site"` Streamers []Streamer `yaml:"channels"` } -// Streamer definition +// Streamer represents a single channel to monitor, with quality and VOD settings. type Streamer struct { User string `yaml:"name"` Quality string `yaml:"quality"` diff --git a/config_reader.go b/config_reader.go index c4108632..43f8b90e 100644 --- a/config_reader.go +++ b/config_reader.go @@ -10,12 +10,14 @@ import ( // fatalf allows tests to override fatal behavior. In production, it maps to log.Fatalf. var fatalf = log.Fatalf +// check logs a fatal error and exits if err is non-nil. func check(e error) { if e != nil { fatalf("%v", e) } } +// readConfig reads the YAML configuration file at loc and returns its raw bytes. func readConfig(loc string) []byte { dat, err := os.ReadFile(loc) check(err) diff --git a/download_stream.go b/download_stream.go index ddf64c1b..cfb6a0b8 100644 --- a/download_stream.go +++ b/download_stream.go @@ -15,6 +15,7 @@ import ( log "github.com/sirupsen/logrus" ) +// getUmask reads the UMASK environment variable (octal), defaulting to 022. func getUmask() int { // Get UMASK from environment, default to 022 if not set umaskStr := os.Getenv("UMASK") @@ -32,6 +33,7 @@ func getUmask() int { return int(umask) } +// createDirWithUmask creates a directory (recursively) with permissions derived from the configured UMASK. func createDirWithUmask(path string) error { // Check if directory already exists if info, err := os.Stat(path); err == nil && info.IsDir() { @@ -58,6 +60,8 @@ func createDirWithUmask(path string) error { return os.Chmod(path, dirPerms) } +// downloadStream records a live stream via FFmpeg, retrying on transient failures. +// It removes the user from the live list on exit and moves the finished file to moveLoc. func downloadStream(user string, url string, outLoc string, moveLoc string, subfolder bool, control <-chan bool, response chan<- bool) { naturalFinish := make(chan error, 1) sigint := make(chan bool) @@ -405,6 +409,7 @@ func sanitizeLog(s string) string { return s } +// redactBetween replaces text between start and end markers with "". func redactBetween(s, start, end string) string { idx := strings.Index(s, start) if idx == -1 { diff --git a/grpc_client.go b/grpc_client.go index f0d48ce8..ca439818 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -23,6 +23,7 @@ type VodResult struct { DurationSeconds int64 } +// getVods calls the gRPC server to list recent VODs for a user on the given site. func getVods(site string, user string, limit int) ([]VodResult, error) { addr := os.Getenv("STREAMDL_GRPC_ADDR") if addr == "" { @@ -83,6 +84,7 @@ func getVods(site string, user string, limit int) ([]VodResult, error) { return results, nil } +// getStream calls the gRPC server to resolve a stream URL for the given site, user, and quality. func getStream(site string, user string, quality string) (string, error) { addr := os.Getenv("STREAMDL_GRPC_ADDR") if addr == "" { diff --git a/move_file.go b/move_file.go index 1e23313b..1652acf2 100644 --- a/move_file.go +++ b/move_file.go @@ -14,6 +14,7 @@ import ( // renameFunc is used to allow testing of cross-device fallbacks by stubbing. var renameFunc = os.Rename +// moveFile moves a file from oldPath to newPath, falling back to copy+delete for cross-device moves. func moveFile(oldPath string, newPath string) error { log.Infof("Moving file from %v to %v", oldPath, newPath) @@ -108,6 +109,7 @@ func moveFile(oldPath string, newPath string) error { return nil } +// isCrossDeviceLink returns true if err indicates an EXDEV (cross-device link) failure. func isCrossDeviceLink(err error) bool { if err == nil { return false diff --git a/streamdl.go b/streamdl.go index f91c47bf..7474f785 100644 --- a/streamdl.go +++ b/streamdl.go @@ -1,3 +1,5 @@ +// Package main implements StreamDL, a daemon that monitors configured streaming +// sites and automatically records live streams and VODs via FFmpeg. package main import ( @@ -174,39 +176,35 @@ func main() { urlsMu.RUnlock() if !exists { log.Tracef("No active URL cached for %s; requesting new stream URL", streamer.User) - url, err := getStream(site.Site, streamer.User, streamer.Quality) - time.Sleep(time.Second * time.Duration(*batchTime)) - if err == nil { - urlsMu.Lock() - urls[streamer.User] = url - urlsMu.Unlock() - log.Debugf("Discovered live stream for user=%s", streamer.User) - go downloadStream(streamer.User, url, *outLoc, *moveLoc, *subfolder, control, response) - } else if err.Error() == "rate limited" { - log.Errorf("Rate Limited, Sleeping for 30 seconds") - time.Sleep(time.Second * 30) + backoffs := []time.Duration{0, 30 * time.Second, 60 * time.Second} + + for attempt := range backoffs { + if backoffs[attempt] > 0 { + log.Errorf("Rate Limited, Sleeping for %v", backoffs[attempt]) + time.Sleep(backoffs[attempt]) + } + url, err := getStream(site.Site, streamer.User, streamer.Quality) + if attempt == 0 { + time.Sleep(time.Second * time.Duration(*batchTime)) + } + if err == nil { urlsMu.Lock() urls[streamer.User] = url urlsMu.Unlock() + log.Debugf("Discovered live stream for user=%s", streamer.User) go downloadStream(streamer.User, url, *outLoc, *moveLoc, *subfolder, control, response) - } else if err.Error() == "rate limited" { - log.Errorf("Rate Limited, Sleeping for 60 seconds") - time.Sleep(time.Second * 60) - url, err = getStream(site.Site, streamer.User, streamer.Quality) - if err == nil { - urlsMu.Lock() - urls[streamer.User] = url - urlsMu.Unlock() - go downloadStream(streamer.User, url, *outLoc, *moveLoc, *subfolder, control, response) - } else if err.Error() == "rate limited" { - log.Errorf("Rate Limited Thrice, Skipping %v", streamer.User) - } else { - log.Warnf("GetStream failed for user=%s: %v", streamer.User, err) - } - } else { + break + } + + if err.Error() != "rate limited" { log.Warnf("GetStream failed for user=%s: %v", streamer.User, err) + break + } + + if attempt == len(backoffs)-1 { + log.Errorf("Rate Limited Thrice, Skipping %v", streamer.User) } } }