Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package main

// Config defines the root type
// Config represents a streaming site and its list of channels to monitor.
type Config struct {
Site string `yaml:"site"`
Streamers []Streamer `yaml:"channels"`
}

// Streamer definition
// Streamer represents a single channel to monitor, with quality and VOD settings.
type Streamer struct {
User string `yaml:"name"`
Quality string `yaml:"quality"`
Expand Down
2 changes: 2 additions & 0 deletions config_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
// fatalf allows tests to override fatal behavior. In production, it maps to log.Fatalf.
var fatalf = log.Fatalf

// check logs a fatal error and exits if err is non-nil.
func check(e error) {
if e != nil {
fatalf("%v", e)
}
}

// readConfig reads the YAML configuration file at loc and returns its raw bytes.
func readConfig(loc string) []byte {
dat, err := os.ReadFile(loc)
check(err)
Expand Down
5 changes: 5 additions & 0 deletions download_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
log "github.com/sirupsen/logrus"
)

// getUmask reads the UMASK environment variable (octal), defaulting to 022.
func getUmask() int {
// Get UMASK from environment, default to 022 if not set
umaskStr := os.Getenv("UMASK")
Expand All @@ -32,6 +33,7 @@ func getUmask() int {
return int(umask)
}

// createDirWithUmask creates a directory (recursively) with permissions derived from the configured UMASK.
func createDirWithUmask(path string) error {
// Check if directory already exists
if info, err := os.Stat(path); err == nil && info.IsDir() {
Expand All @@ -58,6 +60,8 @@ func createDirWithUmask(path string) error {
return os.Chmod(path, dirPerms)
}

// downloadStream records a live stream via FFmpeg, retrying on transient failures.
// It removes the user from the live list on exit and moves the finished file to moveLoc.
func downloadStream(user string, url string, outLoc string, moveLoc string, subfolder bool, control <-chan bool, response chan<- bool) {
naturalFinish := make(chan error, 1)
sigint := make(chan bool)
Expand Down Expand Up @@ -405,6 +409,7 @@ func sanitizeLog(s string) string {
return s
}

// redactBetween replaces text between start and end markers with "<redacted>".
func redactBetween(s, start, end string) string {
idx := strings.Index(s, start)
if idx == -1 {
Expand Down
2 changes: 2 additions & 0 deletions grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type VodResult struct {
DurationSeconds int64
}

// getVods calls the gRPC server to list recent VODs for a user on the given site.
func getVods(site string, user string, limit int) ([]VodResult, error) {
addr := os.Getenv("STREAMDL_GRPC_ADDR")
if addr == "" {
Expand Down Expand Up @@ -83,6 +84,7 @@ func getVods(site string, user string, limit int) ([]VodResult, error) {
return results, nil
}

// getStream calls the gRPC server to resolve a stream URL for the given site, user, and quality.
func getStream(site string, user string, quality string) (string, error) {
addr := os.Getenv("STREAMDL_GRPC_ADDR")
if addr == "" {
Expand Down
2 changes: 2 additions & 0 deletions move_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// renameFunc is used to allow testing of cross-device fallbacks by stubbing.
var renameFunc = os.Rename

// moveFile moves a file from oldPath to newPath, falling back to copy+delete for cross-device moves.
func moveFile(oldPath string, newPath string) error {
log.Infof("Moving file from %v to %v", oldPath, newPath)

Expand Down Expand Up @@ -108,6 +109,7 @@ func moveFile(oldPath string, newPath string) error {
return nil
}

// isCrossDeviceLink returns true if err indicates an EXDEV (cross-device link) failure.
func isCrossDeviceLink(err error) bool {
if err == nil {
return false
Expand Down
50 changes: 24 additions & 26 deletions streamdl.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Package main implements StreamDL, a daemon that monitors configured streaming
// sites and automatically records live streams and VODs via FFmpeg.
package main

import (
Expand Down Expand Up @@ -174,39 +176,35 @@ func main() {
urlsMu.RUnlock()
if !exists {
log.Tracef("No active URL cached for %s; requesting new stream URL", streamer.User)
url, err := getStream(site.Site, streamer.User, streamer.Quality)
time.Sleep(time.Second * time.Duration(*batchTime))
if err == nil {
urlsMu.Lock()
urls[streamer.User] = url
urlsMu.Unlock()
log.Debugf("Discovered live stream for user=%s", streamer.User)
go downloadStream(streamer.User, url, *outLoc, *moveLoc, *subfolder, control, response)
} else if err.Error() == "rate limited" {
log.Errorf("Rate Limited, Sleeping for 30 seconds")
time.Sleep(time.Second * 30)
backoffs := []time.Duration{0, 30 * time.Second, 60 * time.Second}

for attempt := range backoffs {
if backoffs[attempt] > 0 {
log.Errorf("Rate Limited, Sleeping for %v", backoffs[attempt])
time.Sleep(backoffs[attempt])
}

url, err := getStream(site.Site, streamer.User, streamer.Quality)
if attempt == 0 {
time.Sleep(time.Second * time.Duration(*batchTime))
}

if err == nil {
urlsMu.Lock()
urls[streamer.User] = url
urlsMu.Unlock()
log.Debugf("Discovered live stream for user=%s", streamer.User)
go downloadStream(streamer.User, url, *outLoc, *moveLoc, *subfolder, control, response)
} else if err.Error() == "rate limited" {
log.Errorf("Rate Limited, Sleeping for 60 seconds")
time.Sleep(time.Second * 60)
url, err = getStream(site.Site, streamer.User, streamer.Quality)
if err == nil {
urlsMu.Lock()
urls[streamer.User] = url
urlsMu.Unlock()
go downloadStream(streamer.User, url, *outLoc, *moveLoc, *subfolder, control, response)
} else if err.Error() == "rate limited" {
log.Errorf("Rate Limited Thrice, Skipping %v", streamer.User)
} else {
log.Warnf("GetStream failed for user=%s: %v", streamer.User, err)
}
} else {
break
}

if err.Error() != "rate limited" {
log.Warnf("GetStream failed for user=%s: %v", streamer.User, err)
break
}

if attempt == len(backoffs)-1 {
log.Errorf("Rate Limited Thrice, Skipping %v", streamer.User)
}
}
}
Expand Down
Loading