diff --git a/.coderabbit.yaml b/.coderabbit.yaml new file mode 100644 index 00000000..d293b19d --- /dev/null +++ b/.coderabbit.yaml @@ -0,0 +1,8 @@ +reviews: + auto_review: + enabled: true + base_branches: + - staging + path_filters: + - "!stream_pb2.py" + - "!stream_pb2_grpc.py" diff --git a/README.md b/README.md index 948e8526..559c3f72 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,9 @@ Why not get some use out of it? Archivists everywhere, rejoice! | `-batch` | Time betwen URL checks (seconds): increase for rate limiting | `5` | | `-subfolder` | Add streams to a subfolder with the channel name | `false` | | `-log-level` | Set logging level (debug, info, warn, error, etc) | `info` | +| `-data` | Directory for persistent data (VOD tracking database) | `/app/data` | +| `-vod-out` | Output location for VOD downloads (defaults to `-out`) | Same as `-out` | +| `-vod-move` | Move location for completed VOD downloads (defaults to `-move`) | Same as `-move` | ## Install @@ -113,6 +116,40 @@ Basic YAML format. See `config/config.yaml.example` for a couple of test sites. quality: best ``` +## VOD Downloads (Twitch) + +StreamDL can download past broadcasts (VODs) from Twitch. Enable per-channel with the `vod` option: + +```yaml +- site: twitch.tv + channels: + - name: day9tv + quality: best + vod: true + vod_limit: 5 # Check the 5 most recent VODs (default: 10) +``` + +**How it works:** +- On each tick, StreamDL checks for new VODs using yt-dlp +- Downloaded VODs are tracked in a SQLite database (default `/app/data/streamdl.db`, configurable via `-data`) to avoid re-downloading +- In-progress downloads are tracked so interrupted downloads are retried after a stale threshold +- VOD files are named: `{user}_vod_{id}_{title}.mp4` +- Stream copy is used by default (no re-encoding) for fast downloads + +**Docker volume:** Mount the data directory to persist the VOD tracking database across container restarts. If using the default `-data` path: + +```yaml +volumes: + - ./data:/app/data +``` + +**Separate output directories:** Use `-vod-out` and `-vod-move` to send VODs to a different location than live streams. If not set, VODs use the same `-out` and `-move` directories. + +**Notes:** +- `vod: true` and live streaming are mutually exclusive per channel entry +- To download both live streams and VODs, add the same channel twice with different modes +- Currently supported for Twitch only + ## Environment Variables StreamDL supports configuration through environment variables for certain system-level settings. diff --git a/config.go b/config.go index 691cac8a..c2693c35 100644 --- a/config.go +++ b/config.go @@ -8,6 +8,8 @@ type Config struct { // Streamer definition type Streamer struct { - User string `yaml:"name"` - Quality string `yaml:"quality"` + User string `yaml:"name"` + Quality string `yaml:"quality"` + VOD bool `yaml:"vod"` + VODLimit int `yaml:"vod_limit"` } diff --git a/config/config.yml.example b/config/config.yml.example index f5073cc0..3ba4eeb3 100644 --- a/config/config.yml.example +++ b/config/config.yml.example @@ -4,6 +4,8 @@ quality: best - name: day9tv quality: worst + vod: true # Download VODs instead of live streams + vod_limit: 5 # Check the 5 most recent VODs (default: 10) - site: mixer.com channels: - name: ninja diff --git a/config_reader_test.go b/config_reader_test.go index a79ae424..8a325d52 100644 --- a/config_reader_test.go +++ b/config_reader_test.go @@ -36,6 +36,45 @@ func TestReadConfig_MissingFile_Fatal(t *testing.T) { } } +func TestParseConfig_VODFields(t *testing.T) { + yamlData := []byte(` +- site: twitch.tv + channels: + - name: testuser + quality: best + vod: true + vod_limit: 5 +- site: twitch.tv + channels: + - name: liveuser + quality: best +`) + config, err := parseConfig(yamlData) + if err != nil { + t.Fatalf("Failed to parse config: %v", err) + } + + if len(config) != 2 { + t.Fatalf("Expected 2 site configs, got %d", len(config)) + } + + vodStreamer := config[0].Streamers[0] + if !vodStreamer.VOD { + t.Error("Expected VOD to be true") + } + if vodStreamer.VODLimit != 5 { + t.Errorf("Expected VODLimit 5, got %d", vodStreamer.VODLimit) + } + + liveStreamer := config[1].Streamers[0] + if liveStreamer.VOD { + t.Error("Expected VOD to default to false") + } + if liveStreamer.VODLimit != 0 { + t.Errorf("Expected VODLimit 0 (default), got %d", liveStreamer.VODLimit) + } +} + func TestParseConfig_MalformedYAML(t *testing.T) { dir := t.TempDir() cfg := filepath.Join(dir, "bad.yml") diff --git a/download_stream.go b/download_stream.go index f80439fd..554fc2c1 100644 --- a/download_stream.go +++ b/download_stream.go @@ -417,3 +417,161 @@ func redactBetween(s, start, end string) string { j += idx return s[:idx+len(start)] + "" + s[j:] } + +// sanitizeFilename removes or replaces characters that are unsafe in filenames. +func sanitizeFilename(name string) string { + replacer := strings.NewReplacer( + "/", "_", "\\", "_", ":", "_", "*", "_", + "?", "_", "\"", "_", "<", "_", ">", "_", + "|", "_", "\n", "_", "\r", "_", + ) + sanitized := replacer.Replace(name) + // Collapse multiple underscores + for strings.Contains(sanitized, "__") { + sanitized = strings.ReplaceAll(sanitized, "__", "_") + } + // Trim to reasonable length + if len(sanitized) > 100 { + sanitized = sanitized[:100] + } + return strings.Trim(sanitized, "_. ") +} + +// downloadVOD downloads a single VOD and updates its status in the database. +// The url parameter is a resolved stream URL (from GetStream via Streamlink/yt-dlp). +func downloadVOD(user string, vod VodResult, url string, outLoc string, moveLoc string, subfolder bool, vodDB *VodDB, control <-chan bool, response chan<- bool) { + sanitizedTitle := sanitizeFilename(vod.Title) + fileBase := user + "_vod_" + vod.ID + if sanitizedTitle != "" { + fileBase += "_" + sanitizedTitle + } + + naturalFinish := make(chan error, 1) + sigint := make(chan bool) + + // Always ensure base directories have correct permissions first + if err := createDirWithUmask(outLoc); err != nil { + log.Errorf("Failed to create output directory %s: %v", outLoc, err) + response <- true + return + } + if err := createDirWithUmask(moveLoc); err != nil { + log.Errorf("Failed to create move directory %s: %v", moveLoc, err) + response <- true + return + } + + if subfolder { + outLoc = filepath.Join(outLoc, user) + if err := createDirWithUmask(outLoc); err != nil { + log.Errorf("Failed to create output subfolder %s: %v", outLoc, err) + response <- true + return + } + moveLoc = filepath.Join(moveLoc, user) + if err := createDirWithUmask(moveLoc); err != nil { + log.Errorf("Failed to create move subfolder %s: %v", moveLoc, err) + response <- true + return + } + } + + outPath := filepath.Join(outLoc, fileBase+".mp4") + newPath := filepath.Join(moveLoc, fileBase+".mp4") + log.Infof("Starting VOD download for %s: %s", user, vod.Title) + + // Single control listener + go func() { + for { + _, more := <-control + if !more { + sigint <- true + return + } + } + }() + + buf := &bytes.Buffer{} + cmd := fluentffmpeg. + NewCommand(""). + InputPath(url). + OutputFormat("mp4"). + OutputPath(outPath). + OutputLogs(buf). + Build() + + // Prefer stream copy for VODs to avoid re-encoding + outIdx := indexOf(cmd.Args, outPath) + copyArgs := []string{"-c:v", "copy", "-c:a", "copy", "-movflags", "+faststart"} + if outIdx == -1 { + cmd.Args = append(cmd.Args, copyArgs...) + } else { + newArgs := make([]string, 0, len(cmd.Args)+len(copyArgs)) + newArgs = append(newArgs, cmd.Args[:outIdx]...) + newArgs = append(newArgs, copyArgs...) + newArgs = append(newArgs, cmd.Args[outIdx:]...) + cmd.Args = newArgs + } + + if indexOf(cmd.Args, "-y") == -1 { + cmd.Args = insertAfterBinary(cmd.Args, []string{"-y"}) + } + log.Debugf("FFmpeg VOD args (sanitized): %s", sanitizeArgs(cmd.Args)) + + if err := cmd.Start(); err != nil { + log.Errorf("Failed to start FFmpeg for VOD %s: %v", vod.ID, err) + if vodDB != nil { + vodDB.MarkVODFailed(vod.ID) + } + response <- true + return + } + + go func() { + naturalFinish <- cmd.Wait() + }() + + select { + case <-sigint: + log.Tracef("Sending SIGINT to VOD %s process", vod.ID) + if err := cmd.Process.Signal(syscall.SIGINT); err != nil { + log.Errorf("Failed to send SIGINT to VOD %s: %v", vod.ID, err) + } + cmd.Process.Wait() + cmd.Wait() + // Interrupted — leave as 'downloading'; stale threshold will handle retry + response <- true + return + case err := <-naturalFinish: + if err != nil { + log.Warnf("FFmpeg failed for VOD %s: %v", vod.ID, err) + ffLog := tailString(buf.String(), 50) + if ffLog != "" { + log.Warnf("FFmpeg log tail for VOD %s:\n%s", vod.ID, sanitizeLog(ffLog)) + } + if vodDB != nil { + vodDB.MarkVODFailed(vod.ID) + } + response <- true + return + } + + log.Debugf("VOD %s download complete", vod.ID) + if err := moveFile(outPath, newPath); err != nil { + log.Errorf("Failed to move VOD file: %v", err) + if vodDB != nil { + vodDB.MarkVODFailed(vod.ID) + } + } else { + log.Debugf("Moved VOD to %v", newPath) + if vodDB != nil { + if err := vodDB.MarkVODCompleted(vod.ID); err != nil { + log.Errorf("Failed to mark VOD %s as completed: %v", vod.ID, err) + } + } + } + + response <- true + return + } +} diff --git a/entrypoint_client.sh b/entrypoint_client.sh index e06eac44..81a224a8 100755 --- a/entrypoint_client.sh +++ b/entrypoint_client.sh @@ -12,8 +12,8 @@ if ! getent passwd "${PUID}" >/dev/null 2>&1; then adduser -D -u "${PUID}" -G streamdl streamdl fi -# Ensure download directories exist and are writable by the runtime user -mkdir -p /app/dl /app/out -chown "${PUID}:${PGID}" /app/dl /app/out 2>/dev/null || \ - echo "Could not change ownership on /app/dl or /app/out" +# Ensure download and data directories exist and are writable by the runtime user +mkdir -p /app/dl /app/out /app/data +chown "${PUID}:${PGID}" /app/dl /app/out /app/data 2>/dev/null || \ + echo "Could not change ownership on /app/dl, /app/out, or /app/data" exec su-exec "${PUID}":"${PGID}" /app/streamdl_client_entrypoint.sh "$@" diff --git a/go.mod b/go.mod index c2ec3ebf..930a42c5 100644 --- a/go.mod +++ b/go.mod @@ -14,24 +14,32 @@ require ( ) require ( + github.com/dustin/go-humanize v1.0.1 // indirect github.com/fatih/structs v1.1.0 // indirect github.com/go-jose/go-jose/v4 v4.1.4 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/gomega v1.23.0 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/stretchr/testify v1.11.1 // indirect go.opentelemetry.io/otel v1.41.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.41.0 // indirect golang.org/x/crypto v0.47.0 // indirect golang.org/x/net v0.49.0 // indirect - golang.org/x/sys v0.41.0 // indirect + golang.org/x/sys v0.42.0 // indirect golang.org/x/term v0.39.0 // indirect golang.org/x/text v0.33.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + modernc.org/libc v1.70.0 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect + modernc.org/sqlite v1.48.2 // indirect ) diff --git a/go.sum b/go.sum index 83e1d74c..e359a8fe 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -48,6 +50,8 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/modfy/fluent-ffmpeg v0.1.0 h1:9T191rhSK6KfoDo9Y/+0Tph3khrudvLQEEi05O+ijHA= github.com/modfy/fluent-ffmpeg v0.1.0/go.mod h1:GauXGqGYAmYFupCWG8n1eyuLZMKmLxGTGvszYkJ0Oyo= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -64,6 +68,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= @@ -118,6 +124,8 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY= golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -157,3 +165,11 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/libc v1.70.0 h1:U58NawXqXbgpZ/dcdS9kMshu08aiA6b7gusEusqzNkw= +modernc.org/libc v1.70.0/go.mod h1:OVmxFGP1CI/Z4L3E0Q3Mf1PDE0BucwMkcXjjLntvHJo= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/sqlite v1.48.2 h1:5CnW4uP8joZtA0LedVqLbZV5GD7F/0x91AXeSyjoh5c= +modernc.org/sqlite v1.48.2/go.mod h1:hWjRO6Tj/5Ik8ieqxQybiEOUXy0NJFNp2tpvVpKlvig= diff --git a/grpc_client.go b/grpc_client.go index e325b60b..f0d48ce8 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -15,6 +15,74 @@ import ( "google.golang.org/grpc/status" ) +// VodResult holds metadata for a single VOD returned by the server. +type VodResult struct { + ID string + Title string + PublishedAt string + DurationSeconds int64 +} + +func getVods(site string, user string, limit int) ([]VodResult, error) { + addr := os.Getenv("STREAMDL_GRPC_ADDR") + if addr == "" { + addr = "server" + } + port := os.Getenv("STREAMDL_GRPC_PORT") + if port == "" { + port = "50051" + } + log.Debugf("Dialing gRPC server %s:%s for VODs", addr, port) + conn, err := grpc.NewClient(addr+":"+port, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, fmt.Errorf("gRPC failed to connect to %s:%s: %w", addr, port, err) + } + defer func() { + if err := conn.Close(); err != nil { + log.Errorf("Error closing gRPC connection: %v", err) + } + }() + c := pb.NewStreamClient(conn) + + timeout := time.Second * 30 + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + log.Debugf("Calling GetVods site=%s user=%s limit=%d", site, user, limit) + msg, err := c.GetVods(ctx, &pb.VodRequest{Site: site, User: user, Limit: int32(limit)}, grpc.WaitForReady(true)) + if err != nil { + if e, ok := status.FromError(err); ok { + log.Errorf("GetVods failed for %s: %s", user, e.Message()) + switch e.Code() { + case codes.NotFound: + return nil, errors.New("user not found or no VODs available") + case codes.ResourceExhausted: + return nil, errors.New("rate limited") + default: + return nil, fmt.Errorf("GetVods failed: %s", e.Code().String()) + } + } + return nil, err + } + + if msg.GetError() != 0 { + return nil, fmt.Errorf("server returned error code: %d", msg.GetError()) + } + + var results []VodResult + for _, v := range msg.GetVods() { + results = append(results, VodResult{ + ID: v.GetId(), + Title: v.GetTitle(), + PublishedAt: v.GetPublishedAt(), + DurationSeconds: v.GetDurationSeconds(), + }) + } + + log.Debugf("GetVods returned %d VODs for %s", len(results), user) + return results, nil +} + func getStream(site string, user string, quality string) (string, error) { addr := os.Getenv("STREAMDL_GRPC_ADDR") if addr == "" { diff --git a/protos/stream.pb.go b/protos/stream.pb.go index 59ab695b..67917d06 100644 --- a/protos/stream.pb.go +++ b/protos/stream.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.7 +// protoc-gen-go v1.36.11 +// protoc v7.34.1 // source: protos/stream.proto package streamdl @@ -11,6 +11,7 @@ import ( protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + unsafe "unsafe" ) const ( @@ -21,23 +22,20 @@ const ( ) type StreamInfo struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Site string `protobuf:"bytes,1,opt,name=site,proto3" json:"site,omitempty"` - User string `protobuf:"bytes,2,opt,name=user,proto3" json:"user,omitempty"` - Quality string `protobuf:"bytes,3,opt,name=quality,proto3" json:"quality,omitempty"` - OutputTemplate string `protobuf:"bytes,4,opt,name=output_template,json=outputTemplate,proto3" json:"output_template,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + Site string `protobuf:"bytes,1,opt,name=site,proto3" json:"site,omitempty"` + User string `protobuf:"bytes,2,opt,name=user,proto3" json:"user,omitempty"` + Quality string `protobuf:"bytes,3,opt,name=quality,proto3" json:"quality,omitempty"` + OutputTemplate string `protobuf:"bytes,4,opt,name=output_template,json=outputTemplate,proto3" json:"output_template,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *StreamInfo) Reset() { *x = StreamInfo{} - if protoimpl.UnsafeEnabled { - mi := &file_protos_stream_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_protos_stream_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *StreamInfo) String() string { @@ -48,7 +46,7 @@ func (*StreamInfo) ProtoMessage() {} func (x *StreamInfo) ProtoReflect() protoreflect.Message { mi := &file_protos_stream_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -92,21 +90,18 @@ func (x *StreamInfo) GetOutputTemplate() string { } type StreamResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Url string `protobuf:"bytes,1,opt,name=url,proto3" json:"url,omitempty"` + Error int32 `protobuf:"varint,2,opt,name=error,proto3" json:"error,omitempty"` unknownFields protoimpl.UnknownFields - - Url string `protobuf:"bytes,1,opt,name=url,proto3" json:"url,omitempty"` - Error int32 `protobuf:"varint,2,opt,name=error,proto3" json:"error,omitempty"` + sizeCache protoimpl.SizeCache } func (x *StreamResponse) Reset() { *x = StreamResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_protos_stream_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_protos_stream_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *StreamResponse) String() string { @@ -117,7 +112,7 @@ func (*StreamResponse) ProtoMessage() {} func (x *StreamResponse) ProtoReflect() protoreflect.Message { mi := &file_protos_stream_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -146,56 +141,248 @@ func (x *StreamResponse) GetError() int32 { return 0 } -var File_protos_stream_proto protoreflect.FileDescriptor +type VodRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Site string `protobuf:"bytes,1,opt,name=site,proto3" json:"site,omitempty"` + User string `protobuf:"bytes,2,opt,name=user,proto3" json:"user,omitempty"` + Limit int32 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *VodRequest) Reset() { + *x = VodRequest{} + mi := &file_protos_stream_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *VodRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} -var file_protos_stream_proto_rawDesc = []byte{ - 0x0a, 0x13, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x22, 0x77, 0x0a, - 0x0a, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x73, - 0x69, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x69, 0x74, 0x65, 0x12, - 0x12, 0x0a, 0x04, 0x75, 0x73, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, - 0x73, 0x65, 0x72, 0x12, 0x18, 0x0a, 0x07, 0x71, 0x75, 0x61, 0x6c, 0x69, 0x74, 0x79, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x71, 0x75, 0x61, 0x6c, 0x69, 0x74, 0x79, 0x12, 0x27, 0x0a, - 0x0f, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x5f, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x54, 0x65, - 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x22, 0x38, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, - 0x32, 0x41, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x37, 0x0a, 0x09, 0x47, 0x65, - 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, - 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x6e, 0x66, 0x6f, 0x1a, 0x16, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x42, 0x22, 0x5a, 0x20, 0x64, 0x61, 0x6e, 0x67, 0x65, 0x72, 0x6f, 0x75, 0x73, - 0x2e, 0x74, 0x65, 0x63, 0x68, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x64, 0x6c, 0x3b, 0x73, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x64, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +func (*VodRequest) ProtoMessage() {} + +func (x *VodRequest) ProtoReflect() protoreflect.Message { + mi := &file_protos_stream_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) } +// Deprecated: Use VodRequest.ProtoReflect.Descriptor instead. +func (*VodRequest) Descriptor() ([]byte, []int) { + return file_protos_stream_proto_rawDescGZIP(), []int{2} +} + +func (x *VodRequest) GetSite() string { + if x != nil { + return x.Site + } + return "" +} + +func (x *VodRequest) GetUser() string { + if x != nil { + return x.User + } + return "" +} + +func (x *VodRequest) GetLimit() int32 { + if x != nil { + return x.Limit + } + return 0 +} + +type VodInfo struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Title string `protobuf:"bytes,2,opt,name=title,proto3" json:"title,omitempty"` + PublishedAt string `protobuf:"bytes,3,opt,name=published_at,json=publishedAt,proto3" json:"published_at,omitempty"` + DurationSeconds int64 `protobuf:"varint,4,opt,name=duration_seconds,json=durationSeconds,proto3" json:"duration_seconds,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *VodInfo) Reset() { + *x = VodInfo{} + mi := &file_protos_stream_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *VodInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*VodInfo) ProtoMessage() {} + +func (x *VodInfo) ProtoReflect() protoreflect.Message { + mi := &file_protos_stream_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use VodInfo.ProtoReflect.Descriptor instead. +func (*VodInfo) Descriptor() ([]byte, []int) { + return file_protos_stream_proto_rawDescGZIP(), []int{3} +} + +func (x *VodInfo) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *VodInfo) GetTitle() string { + if x != nil { + return x.Title + } + return "" +} + +func (x *VodInfo) GetPublishedAt() string { + if x != nil { + return x.PublishedAt + } + return "" +} + +func (x *VodInfo) GetDurationSeconds() int64 { + if x != nil { + return x.DurationSeconds + } + return 0 +} + +type VodResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Vods []*VodInfo `protobuf:"bytes,1,rep,name=vods,proto3" json:"vods,omitempty"` + Error int32 `protobuf:"varint,2,opt,name=error,proto3" json:"error,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *VodResponse) Reset() { + *x = VodResponse{} + mi := &file_protos_stream_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *VodResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*VodResponse) ProtoMessage() {} + +func (x *VodResponse) ProtoReflect() protoreflect.Message { + mi := &file_protos_stream_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use VodResponse.ProtoReflect.Descriptor instead. +func (*VodResponse) Descriptor() ([]byte, []int) { + return file_protos_stream_proto_rawDescGZIP(), []int{4} +} + +func (x *VodResponse) GetVods() []*VodInfo { + if x != nil { + return x.Vods + } + return nil +} + +func (x *VodResponse) GetError() int32 { + if x != nil { + return x.Error + } + return 0 +} + +var File_protos_stream_proto protoreflect.FileDescriptor + +const file_protos_stream_proto_rawDesc = "" + + "\n" + + "\x13protos/stream.proto\x12\x06protos\"w\n" + + "\n" + + "StreamInfo\x12\x12\n" + + "\x04site\x18\x01 \x01(\tR\x04site\x12\x12\n" + + "\x04user\x18\x02 \x01(\tR\x04user\x12\x18\n" + + "\aquality\x18\x03 \x01(\tR\aquality\x12'\n" + + "\x0foutput_template\x18\x04 \x01(\tR\x0eoutputTemplate\"8\n" + + "\x0eStreamResponse\x12\x10\n" + + "\x03url\x18\x01 \x01(\tR\x03url\x12\x14\n" + + "\x05error\x18\x02 \x01(\x05R\x05error\"J\n" + + "\n" + + "VodRequest\x12\x12\n" + + "\x04site\x18\x01 \x01(\tR\x04site\x12\x12\n" + + "\x04user\x18\x02 \x01(\tR\x04user\x12\x14\n" + + "\x05limit\x18\x03 \x01(\x05R\x05limit\"}\n" + + "\aVodInfo\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12\x14\n" + + "\x05title\x18\x02 \x01(\tR\x05title\x12!\n" + + "\fpublished_at\x18\x03 \x01(\tR\vpublishedAt\x12)\n" + + "\x10duration_seconds\x18\x04 \x01(\x03R\x0fdurationSeconds\"H\n" + + "\vVodResponse\x12#\n" + + "\x04vods\x18\x01 \x03(\v2\x0f.protos.VodInfoR\x04vods\x12\x14\n" + + "\x05error\x18\x02 \x01(\x05R\x05error2u\n" + + "\x06Stream\x127\n" + + "\tGetStream\x12\x12.protos.StreamInfo\x1a\x16.protos.StreamResponse\x122\n" + + "\aGetVods\x12\x12.protos.VodRequest\x1a\x13.protos.VodResponseB\"Z dangerous.tech/streamdl;streamdlb\x06proto3" + var ( file_protos_stream_proto_rawDescOnce sync.Once - file_protos_stream_proto_rawDescData = file_protos_stream_proto_rawDesc + file_protos_stream_proto_rawDescData []byte ) func file_protos_stream_proto_rawDescGZIP() []byte { file_protos_stream_proto_rawDescOnce.Do(func() { - file_protos_stream_proto_rawDescData = protoimpl.X.CompressGZIP(file_protos_stream_proto_rawDescData) + file_protos_stream_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_protos_stream_proto_rawDesc), len(file_protos_stream_proto_rawDesc))) }) return file_protos_stream_proto_rawDescData } -var file_protos_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_protos_stream_proto_goTypes = []interface{}{ +var file_protos_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_protos_stream_proto_goTypes = []any{ (*StreamInfo)(nil), // 0: protos.StreamInfo (*StreamResponse)(nil), // 1: protos.StreamResponse + (*VodRequest)(nil), // 2: protos.VodRequest + (*VodInfo)(nil), // 3: protos.VodInfo + (*VodResponse)(nil), // 4: protos.VodResponse } var file_protos_stream_proto_depIdxs = []int32{ - 0, // 0: protos.Stream.GetStream:input_type -> protos.StreamInfo - 1, // 1: protos.Stream.GetStream:output_type -> protos.StreamResponse - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 3, // 0: protos.VodResponse.vods:type_name -> protos.VodInfo + 0, // 1: protos.Stream.GetStream:input_type -> protos.StreamInfo + 2, // 2: protos.Stream.GetVods:input_type -> protos.VodRequest + 1, // 3: protos.Stream.GetStream:output_type -> protos.StreamResponse + 4, // 4: protos.Stream.GetVods:output_type -> protos.VodResponse + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name } func init() { file_protos_stream_proto_init() } @@ -203,39 +390,13 @@ func file_protos_stream_proto_init() { if File_protos_stream_proto != nil { return } - if !protoimpl.UnsafeEnabled { - file_protos_stream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StreamInfo); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_protos_stream_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StreamResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_protos_stream_proto_rawDesc, + RawDescriptor: unsafe.Slice(unsafe.StringData(file_protos_stream_proto_rawDesc), len(file_protos_stream_proto_rawDesc)), NumEnums: 0, - NumMessages: 2, + NumMessages: 5, NumExtensions: 0, NumServices: 1, }, @@ -244,7 +405,6 @@ func file_protos_stream_proto_init() { MessageInfos: file_protos_stream_proto_msgTypes, }.Build() File_protos_stream_proto = out.File - file_protos_stream_proto_rawDesc = nil file_protos_stream_proto_goTypes = nil file_protos_stream_proto_depIdxs = nil } diff --git a/protos/stream.proto b/protos/stream.proto index 13cb5a38..95030c2a 100644 --- a/protos/stream.proto +++ b/protos/stream.proto @@ -6,6 +6,7 @@ option go_package = "dangerous.tech/streamdl;streamdl"; service Stream { rpc GetStream(StreamInfo) returns (StreamResponse); + rpc GetVods(VodRequest) returns (VodResponse); } message StreamInfo { @@ -18,4 +19,22 @@ message StreamInfo { message StreamResponse { string url = 1; int32 error = 2; +} + +message VodRequest { + string site = 1; + string user = 2; + int32 limit = 3; +} + +message VodInfo { + string id = 1; + string title = 2; + string published_at = 3; + int64 duration_seconds = 4; +} + +message VodResponse { + repeated VodInfo vods = 1; + int32 error = 2; } \ No newline at end of file diff --git a/protos/stream_grpc.pb.go b/protos/stream_grpc.pb.go index 52273c96..c75349e6 100644 --- a/protos/stream_grpc.pb.go +++ b/protos/stream_grpc.pb.go @@ -1,4 +1,8 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.6.1 +// - protoc v7.34.1 +// source: protos/stream.proto package streamdl @@ -11,14 +15,20 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + Stream_GetStream_FullMethodName = "/protos.Stream/GetStream" + Stream_GetVods_FullMethodName = "/protos.Stream/GetVods" +) // StreamClient is the client API for Stream service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type StreamClient interface { GetStream(ctx context.Context, in *StreamInfo, opts ...grpc.CallOption) (*StreamResponse, error) + GetVods(ctx context.Context, in *VodRequest, opts ...grpc.CallOption) (*VodResponse, error) } type streamClient struct { @@ -30,8 +40,19 @@ func NewStreamClient(cc grpc.ClientConnInterface) StreamClient { } func (c *streamClient) GetStream(ctx context.Context, in *StreamInfo, opts ...grpc.CallOption) (*StreamResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(StreamResponse) - err := c.cc.Invoke(ctx, "/protos.Stream/GetStream", in, out, opts...) + err := c.cc.Invoke(ctx, Stream_GetStream_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *streamClient) GetVods(ctx context.Context, in *VodRequest, opts ...grpc.CallOption) (*VodResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(VodResponse) + err := c.cc.Invoke(ctx, Stream_GetVods_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -40,20 +61,28 @@ func (c *streamClient) GetStream(ctx context.Context, in *StreamInfo, opts ...gr // StreamServer is the server API for Stream service. // All implementations must embed UnimplementedStreamServer -// for forward compatibility +// for forward compatibility. type StreamServer interface { GetStream(context.Context, *StreamInfo) (*StreamResponse, error) + GetVods(context.Context, *VodRequest) (*VodResponse, error) mustEmbedUnimplementedStreamServer() } -// UnimplementedStreamServer must be embedded to have forward compatible implementations. -type UnimplementedStreamServer struct { -} +// UnimplementedStreamServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedStreamServer struct{} func (UnimplementedStreamServer) GetStream(context.Context, *StreamInfo) (*StreamResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetStream not implemented") + return nil, status.Error(codes.Unimplemented, "method GetStream not implemented") +} +func (UnimplementedStreamServer) GetVods(context.Context, *VodRequest) (*VodResponse, error) { + return nil, status.Error(codes.Unimplemented, "method GetVods not implemented") } func (UnimplementedStreamServer) mustEmbedUnimplementedStreamServer() {} +func (UnimplementedStreamServer) testEmbeddedByValue() {} // UnsafeStreamServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to StreamServer will @@ -63,6 +92,13 @@ type UnsafeStreamServer interface { } func RegisterStreamServer(s grpc.ServiceRegistrar, srv StreamServer) { + // If the following call panics, it indicates UnimplementedStreamServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&Stream_ServiceDesc, srv) } @@ -76,7 +112,7 @@ func _Stream_GetStream_Handler(srv interface{}, ctx context.Context, dec func(in } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/protos.Stream/GetStream", + FullMethod: Stream_GetStream_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(StreamServer).GetStream(ctx, req.(*StreamInfo)) @@ -84,6 +120,24 @@ func _Stream_GetStream_Handler(srv interface{}, ctx context.Context, dec func(in return interceptor(ctx, in, info, handler) } +func _Stream_GetVods_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(VodRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StreamServer).GetVods(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Stream_GetVods_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StreamServer).GetVods(ctx, req.(*VodRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Stream_ServiceDesc is the grpc.ServiceDesc for Stream service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -95,6 +149,10 @@ var Stream_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetStream", Handler: _Stream_GetStream_Handler, }, + { + MethodName: "GetVods", + Handler: _Stream_GetVods_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "protos/stream.proto", diff --git a/pyproject.toml b/pyproject.toml index 7c47dd87..546902fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,10 @@ [tool.uv] -dev-dependencies = ["pytest", "bandit", "black<25.0,>=24.3"] +dev-dependencies = [ + "pytest", + "bandit", + "black<25.0,>=24.3", + "grpcio-tools>=1.71.2", +] [project] authors = [{ name = "biodrone", email = "biodrone@dangerous.tech" }] @@ -7,7 +12,7 @@ requires-python = "<4.0,>=3.10" dependencies = [ "curl-cffi>=0.15.0", "grpcio>=1.80.0,<2.0.0", - "protobuf<6.0.0,>=5.26.0", + "protobuf<6.0.0,>=5.29.0", "streamlink>=8.3.0", "wheel<1.0.0,>=0.45.1", "yt-dlp>=2026.3.17", diff --git a/stream_pb2.py b/stream_pb2.py index 1cb24c76..b183bf24 100644 --- a/stream_pb2.py +++ b/stream_pb2.py @@ -1,11 +1,22 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: protos/stream.proto +# NO CHECKED-IN PROTOBUF GENCODE +# source: stream.proto +# Protobuf Python Version: 5.29.0 """Generated protocol buffer code.""" -from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 29, + 0, + '', + 'stream.proto' +) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -13,18 +24,24 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13protos/stream.proto\x12\x06protos\"R\n\nStreamInfo\x12\x0c\n\x04site\x18\x01 \x01(\t\x12\x0c\n\x04user\x18\x02 \x01(\t\x12\x0f\n\x07quality\x18\x03 \x01(\t\x12\x17\n\x0foutput_template\x18\x04 \x01(\t\",\n\x0eStreamResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05\x65rror\x18\x02 \x01(\x05\x32\x41\n\x06Stream\x12\x37\n\tGetStream\x12\x12.protos.StreamInfo\x1a\x16.protos.StreamResponseB\"Z dangerous.tech/streamdl;streamdlb\x06proto3') - -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'protos.stream_pb2', globals()) -if _descriptor._USE_C_DESCRIPTORS == False: +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cstream.proto\x12\x06protos\"R\n\nStreamInfo\x12\x0c\n\x04site\x18\x01 \x01(\t\x12\x0c\n\x04user\x18\x02 \x01(\t\x12\x0f\n\x07quality\x18\x03 \x01(\t\x12\x17\n\x0foutput_template\x18\x04 \x01(\t\",\n\x0eStreamResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05\x65rror\x18\x02 \x01(\x05\"7\n\nVodRequest\x12\x0c\n\x04site\x18\x01 \x01(\t\x12\x0c\n\x04user\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\x05\"T\n\x07VodInfo\x12\n\n\x02id\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x14\n\x0cpublished_at\x18\x03 \x01(\t\x12\x18\n\x10\x64uration_seconds\x18\x04 \x01(\x03\";\n\x0bVodResponse\x12\x1d\n\x04vods\x18\x01 \x03(\x0b\x32\x0f.protos.VodInfo\x12\r\n\x05\x65rror\x18\x02 \x01(\x05\x32u\n\x06Stream\x12\x37\n\tGetStream\x12\x12.protos.StreamInfo\x1a\x16.protos.StreamResponse\x12\x32\n\x07GetVods\x12\x12.protos.VodRequest\x1a\x13.protos.VodResponseB\"Z dangerous.tech/streamdl;streamdlb\x06proto3') - DESCRIPTOR._options = None - DESCRIPTOR._serialized_options = b'Z dangerous.tech/streamdl;streamdl' - _STREAMINFO._serialized_start=31 - _STREAMINFO._serialized_end=113 - _STREAMRESPONSE._serialized_start=115 - _STREAMRESPONSE._serialized_end=159 - _STREAM._serialized_start=161 - _STREAM._serialized_end=226 +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'stream_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + _globals['DESCRIPTOR']._loaded_options = None + _globals['DESCRIPTOR']._serialized_options = b'Z dangerous.tech/streamdl;streamdl' + _globals['_STREAMINFO']._serialized_start=24 + _globals['_STREAMINFO']._serialized_end=106 + _globals['_STREAMRESPONSE']._serialized_start=108 + _globals['_STREAMRESPONSE']._serialized_end=152 + _globals['_VODREQUEST']._serialized_start=154 + _globals['_VODREQUEST']._serialized_end=209 + _globals['_VODINFO']._serialized_start=211 + _globals['_VODINFO']._serialized_end=295 + _globals['_VODRESPONSE']._serialized_start=297 + _globals['_VODRESPONSE']._serialized_end=356 + _globals['_STREAM']._serialized_start=358 + _globals['_STREAM']._serialized_end=475 # @@protoc_insertion_point(module_scope) diff --git a/stream_pb2_grpc.py b/stream_pb2_grpc.py index cab87f5c..31e62831 100644 --- a/stream_pb2_grpc.py +++ b/stream_pb2_grpc.py @@ -1,9 +1,29 @@ # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc +import warnings import stream_pb2 as stream__pb2 +GRPC_GENERATED_VERSION = '1.71.2' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in stream_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + class StreamStub(object): """Missing associated documentation comment in .proto file.""" @@ -18,7 +38,12 @@ def __init__(self, channel): '/protos.Stream/GetStream', request_serializer=stream__pb2.StreamInfo.SerializeToString, response_deserializer=stream__pb2.StreamResponse.FromString, - ) + _registered_method=True) + self.GetVods = channel.unary_unary( + '/protos.Stream/GetVods', + request_serializer=stream__pb2.VodRequest.SerializeToString, + response_deserializer=stream__pb2.VodResponse.FromString, + _registered_method=True) class StreamServicer(object): @@ -30,6 +55,12 @@ def GetStream(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def GetVods(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def add_StreamServicer_to_server(servicer, server): rpc_method_handlers = { @@ -38,10 +69,16 @@ def add_StreamServicer_to_server(servicer, server): request_deserializer=stream__pb2.StreamInfo.FromString, response_serializer=stream__pb2.StreamResponse.SerializeToString, ), + 'GetVods': grpc.unary_unary_rpc_method_handler( + servicer.GetVods, + request_deserializer=stream__pb2.VodRequest.FromString, + response_serializer=stream__pb2.VodResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( 'protos.Stream', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('protos.Stream', rpc_method_handlers) # This class is part of an EXPERIMENTAL API. @@ -59,8 +96,45 @@ def GetStream(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/protos.Stream/GetStream', + return grpc.experimental.unary_unary( + request, + target, + '/protos.Stream/GetStream', stream__pb2.StreamInfo.SerializeToString, stream__pb2.StreamResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def GetVods(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/protos.Stream/GetVods', + stream__pb2.VodRequest.SerializeToString, + stream__pb2.VodResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/streamdl.go b/streamdl.go index 1edd80e7..7b4e1913 100644 --- a/streamdl.go +++ b/streamdl.go @@ -5,6 +5,7 @@ import ( "net/http" "os" "os/signal" + "path/filepath" "sort" "strings" "sync" @@ -18,6 +19,7 @@ import ( var ( urls = make(map[string]string) urlsMu sync.RWMutex + vodWg sync.WaitGroup ) var c = make(chan os.Signal, 2) @@ -29,8 +31,19 @@ func main() { batchTime := flag.Int("batch", 5, "Time betwen URL checks (seconds): increase for rate limiting") subfolder := flag.Bool("subfolder", false, "Add streams to a subfolder with the channel name") logLevel := flag.String("log-level", "info", "Log level (trace, debug, info, warn, error, fatal, panic)") + dataDir := flag.String("data", "/app/data", "Directory for persistent data (VOD tracking database)") + vodOutLoc := flag.String("vod-out", "", "Output location for VOD downloads (defaults to -out)") + vodMoveLoc := flag.String("vod-move", "", "Move location for completed VOD downloads (defaults to -move)") flag.Parse() + // Default VOD paths to the same as live stream paths if not specified + if *vodOutLoc == "" { + vodOutLoc = outLoc + } + if *vodMoveLoc == "" { + vodMoveLoc = moveLoc + } + var ticker = time.NewTicker(time.Second * time.Duration(*tickTime)) var config []Config parsed, confErr := parseConfig(readConfig(*confLoc)) @@ -51,6 +64,15 @@ func main() { log.SetLevel(ll) } log.Infof("Starting StreamDL...") + + // VOD database is lazily initialized on first VOD tick + var vodDB *VodDB + defer func() { + if vodDB != nil { + vodDB.Close() + } + }() + log.Tracef("Config: %v", config) if confErr != nil { @@ -84,43 +106,103 @@ func main() { // TODO: Probably make a nicer 429 handling to allow for counts, retry queueing, etc. for _, site := range config { for _, streamer := range site.Streamers { - log.Debugf("Checking user=%s on site=%s quality=%s", streamer.User, site.Site, streamer.Quality) - urlsMu.RLock() - _, exists := urls[streamer.User] - urlsMu.RUnlock() - if !exists { - log.Tracef("No active URL cached for %s; requesting new stream URL", streamer.User) - url, err := getStream(site.Site, streamer.User, streamer.Quality) + log.Debugf("Checking user=%s on site=%s quality=%s vod=%v", streamer.User, site.Site, streamer.Quality, streamer.VOD) + + if streamer.VOD { + // Lazy init VOD database on first use + if vodDB == nil { + var initErr error + vodDB, initErr = InitVodDB(filepath.Join(*dataDir, "streamdl.db")) + if initErr != nil { + log.Errorf("Failed to initialize VOD database: %v", initErr) + continue + } + log.Infof("VOD tracking database initialized at %s", filepath.Join(*dataDir, "streamdl.db")) + } + + // VOD mode: check for new VODs to download + limit := streamer.VODLimit + if limit <= 0 { + limit = 10 + } + vods, err := getVods(site.Site, streamer.User, limit) time.Sleep(time.Second * time.Duration(*batchTime)) - if err == nil { - urlsMu.Lock() - urls[streamer.User] = url - urlsMu.Unlock() - log.Debugf("Discovered live stream for user=%s", streamer.User) - go downloadStream(streamer.User, url, *outLoc, *moveLoc, *subfolder, control, response) - } else if err.Error() == "rate limited" { - log.Errorf("Rate Limited, Sleeping for 30 seconds") - time.Sleep(time.Second * 30) + if err != nil { + if err.Error() == "rate limited" { + log.Errorf("Rate limited checking VODs for %s, skipping", streamer.User) + } else { + log.Warnf("GetVods failed for user=%s: %v", streamer.User, err) + } + continue + } + // Stale threshold: 2× tick interval, minimum 10 minutes + staleThreshold := time.Duration(*tickTime) * time.Second * 2 + if staleThreshold < 10*time.Minute { + staleThreshold = 10 * time.Minute + } + for _, vod := range vods { + claimed, err := vodDB.ClaimVOD(vod.ID, streamer.User, site.Site, vod.Title, staleThreshold) + if err != nil { + log.Errorf("Error claiming VOD %s: %v", vod.ID, err) + continue + } + if !claimed { + log.Tracef("VOD %s already completed or in progress, skipping", vod.ID) + continue + } + log.Infof("VOD to download for %s: %s (%s)", streamer.User, vod.Title, vod.ID) + // Resolve the VOD URL through GetStream (Streamlink → yt-dlp fallback) + resolvedURL, err := getStream(site.Site, "videos/"+vod.ID, streamer.Quality) + time.Sleep(time.Second * time.Duration(*batchTime)) + if err != nil { + log.Warnf("Failed to resolve VOD %s: %v", vod.ID, err) + continue + } + vodWg.Add(1) + go func() { + defer vodWg.Done() + downloadVOD(streamer.User, vod, resolvedURL, *vodOutLoc, *vodMoveLoc, *subfolder, vodDB, control, response) + }() + } + } else { + // Live stream mode (existing behavior) + urlsMu.RLock() + _, exists := urls[streamer.User] + urlsMu.RUnlock() + if !exists { + log.Tracef("No active URL cached for %s; requesting new stream URL", streamer.User) url, err := getStream(site.Site, streamer.User, streamer.Quality) + time.Sleep(time.Second * time.Duration(*batchTime)) if err == nil { urlsMu.Lock() urls[streamer.User] = url urlsMu.Unlock() + log.Debugf("Discovered live stream for user=%s", streamer.User) go downloadStream(streamer.User, url, *outLoc, *moveLoc, *subfolder, control, response) } else if err.Error() == "rate limited" { - log.Errorf("Rate Limited, Sleeping for 60 seconds") - time.Sleep(time.Second * 60) + log.Errorf("Rate Limited, Sleeping for 30 seconds") + time.Sleep(time.Second * 30) url, err := getStream(site.Site, streamer.User, streamer.Quality) if err == nil { urlsMu.Lock() urls[streamer.User] = url urlsMu.Unlock() go downloadStream(streamer.User, url, *outLoc, *moveLoc, *subfolder, control, response) + } else if err.Error() == "rate limited" { + log.Errorf("Rate Limited, Sleeping for 60 seconds") + time.Sleep(time.Second * 60) + url, err := getStream(site.Site, streamer.User, streamer.Quality) + if err == nil { + urlsMu.Lock() + urls[streamer.User] = url + urlsMu.Unlock() + go downloadStream(streamer.User, url, *outLoc, *moveLoc, *subfolder, control, response) + } + } else if err.Error() == "rate limited" { + log.Errorf("Rate Limited Thrice, Skipping %v", streamer.User) + } else { + log.Warnf("GetStream failed for user=%s: %v", streamer.User, err) } - } else if err.Error() == "rate limited" { - log.Errorf("Rate Limited Thrice, Skipping %v", streamer.User) - } else { - log.Warnf("GetStream failed for user=%s: %v", streamer.User, err) } } } @@ -152,6 +234,7 @@ func main() { for i := 0; i < urlsLen; i++ { <-response } + vodWg.Wait() time.Sleep(time.Second * 3) os.Exit(0) case t := <-ticker.C: diff --git a/streamdl_proto_srv.py b/streamdl_proto_srv.py index 9328199a..2826c3f6 100644 --- a/streamdl_proto_srv.py +++ b/streamdl_proto_srv.py @@ -93,6 +93,51 @@ def log_message(self, fmt, *args): class StreamServicer(pb_grpc.Stream): """gRPC servicer that resolves live stream URLs.""" + def GetVods(self, request, context): + """Enumerate recent VODs for a user on a given site.""" + logger.debug( + "GetVods request received site=%s user=%s limit=%d", + request.site, + request.user, + request.limit, + ) + limit = request.limit if request.limit > 0 else 10 + res = get_vods(request.site, request.user, limit) + + if "error" in res: + error_code = res["error"] + logger.debug( + "GetVods failure user=%s error=%s", + request.user, + error_code, + ) + match error_code: + case 404: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details("User not found or no VODs available") + case 429: + context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED) + context.set_details("Rate limited") + case _: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details("Internal server error") + return pb.VodResponse(error=error_code) + + vod_infos = [] + for v in res.get("vods", []): + vod_infos.append( + pb.VodInfo( + id=v["id"], + title=v["title"], + published_at=v["published_at"], + duration_seconds=v["duration_seconds"], + ) + ) + + logger.debug("GetVods success user=%s count=%d", request.user, len(vod_infos)) + context.set_code(grpc.StatusCode.OK) + return pb.VodResponse(vods=vod_infos) + def GetStream(self, request, context): """Resolve a stream URL for the given site/user/quality and return it via gRPC.""" logger.debug( @@ -183,6 +228,60 @@ def serve(): server.stop(0) +def get_vods(site, user, limit=10): + """Enumerate a user's recent VODs using yt-dlp.""" + vod_url = f"https://{site}/{user}/videos" + logger.debug("Fetching VODs from %s (limit=%d)", vod_url, limit) + + try: + with yt_dlp.YoutubeDL( + { + "quiet": yt_dlp_quiet, + "no_warnings": yt_dlp_no_warnings, + "verbose": False, + "logger": None, + "extract_flat": "in_playlist", + "playlistend": limit, + } + ) as ydl: + info = ydl.extract_info(vod_url, download=False) + + if not info or "entries" not in info: + logger.warning("No VODs found for user %s", user) + return {"error": 404} + + vods = [] + for entry in info["entries"]: + if entry is None: + continue + vod = { + "id": str(entry.get("id", "")), + "title": entry.get("title", ""), + "published_at": entry.get("upload_date", ""), + "duration_seconds": int(entry.get("duration", 0) or 0), + } + if vod["id"]: + vods.append(vod) + + logger.debug("Found %d VODs for user %s", len(vods), user) + return {"vods": vods} + + except yt_dlp.utils.DownloadError as e: + error_str = str(e) + if "HTTP Error 429" in error_str: + logger.error("Rate limited fetching VODs for %s", user) + return {"error": 429} + elif "HTTP Error 404" in error_str or "does not exist" in error_str: + logger.warning("User %s not found on %s", user, site) + return {"error": 404} + else: + logger.error("DownloadError fetching VODs for %s: %s", user, e) + return {"error": 500} + except Exception as e: + logger.error("Error fetching VODs for %s: %s", user, e) + return {"error": 500} + + def get_stream(r): """Resolve a stream URL using Streamlink, falling back to yt-dlp on failure.""" logger.debug( diff --git a/tests/integration/docker-compose.integration.yml b/tests/integration/docker-compose.integration.yml index ffef924e..e6a63e20 100644 --- a/tests/integration/docker-compose.integration.yml +++ b/tests/integration/docker-compose.integration.yml @@ -31,6 +31,7 @@ services: - ./output/incomplete:/app/dl - ./output/complete:/app/out - ./config:/app/config + - ./data:/app/data depends_on: server: condition: service_healthy diff --git a/tests/integration/run.sh b/tests/integration/run.sh index c21597a1..b4ae4209 100755 --- a/tests/integration/run.sh +++ b/tests/integration/run.sh @@ -230,11 +230,10 @@ fi echo "" if [ "$PASS" = true ]; then - echo "=== PASS: Integration test succeeded ===" + echo "=== PASS: Live stream integration test succeeded ===" echo " Downloaded ${DURATION}s of $LIVE_CHANNEL's stream, valid mp4 with video." - exit 0 else - echo "=== FAIL: Integration test failed ===" + echo "=== FAIL: Live stream integration test failed ===" echo "" echo "--- ffprobe output ---" echo "$PROBE_OUTPUT" @@ -243,3 +242,141 @@ else $DC -f "$COMPOSE_FILE" logs client 2>&1 | tail -50 exit 1 fi + +# --- Phase 5: VOD download test --- +echo "" +echo "=== VOD Download Test ===" + +# Clean output and DB state for VOD test +rm -rf "$OUTPUT_DIR/incomplete"/* "$OUTPUT_DIR/complete"/* +rm -rf "$SCRIPT_DIR/data" +mkdir -p "$SCRIPT_DIR/data" + +# Channels known to have VODs, in priority order. +# Override with VOD_CHANNEL env var to skip probing. +CANDIDATE_VOD_CHANNELS=( + teampgp + kaicenat + xqc + hasanabi + shroud + summit1g +) + +if [ -n "${VOD_CHANNEL:-}" ]; then + echo "--- Using VOD_CHANNEL override: $VOD_CHANNEL ---" +else + echo "--- Probing for a channel with VODs ---" + VOD_CHANNEL="" + ANY_PROBE_OK=false + for vod_candidate in "${CANDIDATE_VOD_CHANNELS[@]}"; do + echo -n " Trying $vod_candidate... " + RESULT=$($DC -f "$COMPOSE_FILE" exec -T server \ + /app/.venv/bin/python -c " +import socket +socket.setdefaulttimeout(15) +import yt_dlp +try: + with yt_dlp.YoutubeDL({'quiet': True, 'no_warnings': True, 'extract_flat': 'in_playlist', 'playlistend': 1}) as ydl: + info = ydl.extract_info('https://twitch.tv/$vod_candidate/videos', download=False) + if info and 'entries' in info and list(info['entries']): + print('HAS_VODS') + else: + print('NO_VODS') +except Exception as e: + print(f'ERROR:{e}') +" 2>/dev/null) || RESULT="ERROR" + + if [ "$RESULT" = "HAS_VODS" ]; then + echo "has VODs!" + VOD_CHANNEL="$vod_candidate" + break + else + echo "$RESULT" + # Track whether any probe completed without error + if [ "$RESULT" = "NO_VODS" ]; then + ANY_PROBE_OK=true + fi + fi + done +fi + +if [ -z "$VOD_CHANNEL" ]; then + if [ "$ANY_PROBE_OK" = false ] && [ -z "${VOD_CHANNEL:-}" ]; then + echo "" + echo "FAIL: All VOD probes failed with errors. Check server connectivity." + echo "--- Server logs ---" + $DC -f "$COMPOSE_FILE" logs server 2>&1 | tail -30 + exit 1 + fi + echo "" + echo "SKIP: No channels with VODs found among candidates." + echo " The test infrastructure works; re-run or set VOD_CHANNEL manually." + echo "=== PASS: Live stream integration test succeeded (VOD phase skipped) ===" + exit 0 +fi + +cat > "$CONFIG_DIR/config.yml" </dev/null | head -1) || true + if [ -n "$VOD_FILE" ]; then + break + fi + # An in-progress download >1KB in /incomplete proves the full pipeline works: + # VOD discovered → URL resolved → FFmpeg started → bytes flowing. + # We accept this rather than waiting for completion because full VODs can take + # much longer than a reasonable CI timeout, and the live stream phase already + # validates the download-to-completion + file-move path. + VOD_PROGRESS=$(find "$OUTPUT_DIR/incomplete" -name "*_vod_*.mp4" -size +1000c 2>/dev/null | head -1) || true + if [ -n "$VOD_PROGRESS" ]; then + echo "--- VOD download in progress: $VOD_PROGRESS ---" + break + fi + sleep 5 + VOD_ELAPSED=$((VOD_ELAPSED + 5)) +done + +if [ -z "$VOD_FILE" ] && [ -z "$VOD_PROGRESS" ]; then + echo "FAIL: No VOD download activity found after ${VOD_TIMEOUT}s" + echo "" + echo "--- Client logs ---" + $DC -f "$COMPOSE_FILE" logs client 2>&1 | tail -30 + echo "" + echo "--- Server logs ---" + $DC -f "$COMPOSE_FILE" logs server 2>&1 | tail -30 + exit 1 +fi + +if [ -n "$VOD_FILE" ]; then + echo "--- VOD download complete: $VOD_FILE ---" + VOD_CHECK="$VOD_FILE" +else + echo "--- VOD download started (in progress): $VOD_PROGRESS ---" + VOD_CHECK="$VOD_PROGRESS" +fi + +VOD_SIZE=$(stat -f%z "$VOD_CHECK" 2>/dev/null || stat --printf="%s" "$VOD_CHECK" 2>/dev/null || echo "0") +echo " File size: $VOD_SIZE bytes" + +if [ "$VOD_SIZE" -lt 1000 ]; then + echo "FAIL: VOD file too small" + exit 1 +fi + +echo "=== PASS: All integration tests succeeded ===" diff --git a/uv.lock b/uv.lock index 1ea9fa03..6bb6d732 100644 --- a/uv.lock +++ b/uv.lock @@ -348,6 +348,59 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e5/8c/bbe6baf2557262834f2070cf668515fa308b2d38a4bbf771f8f7872a7036/grpcio-1.80.0-cp314-cp314-win_amd64.whl", hash = "sha256:3b01e1f5464c583d2f567b2e46ff0d516ef979978f72091fd81f5ab7fa6e2e7f", size = 5019457, upload-time = "2026-03-30T08:48:37.308Z" }, ] +[[package]] +name = "grpcio-tools" +version = "1.71.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "grpcio" }, + { name = "protobuf" }, + { name = "setuptools" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ad/9a/edfefb47f11ef6b0f39eea4d8f022c5bb05ac1d14fcc7058e84a51305b73/grpcio_tools-1.71.2.tar.gz", hash = "sha256:b5304d65c7569b21270b568e404a5a843cf027c66552a6a0978b23f137679c09", size = 5330655, upload-time = "2025-06-28T04:22:00.308Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/dd/ad/e74a4d1cffff628c2ef1ec5b9944fb098207cc4af6eb8db4bc52e6d99236/grpcio_tools-1.71.2-cp310-cp310-linux_armv7l.whl", hash = "sha256:ab8a28c2e795520d6dc6ffd7efaef4565026dbf9b4f5270de2f3dd1ce61d2318", size = 2385557, upload-time = "2025-06-28T04:20:38.833Z" }, + { url = "https://files.pythonhosted.org/packages/63/bf/30b63418279d6fdc4fd4a3781a7976c40c7e8ee052333b9ce6bd4ce63f30/grpcio_tools-1.71.2-cp310-cp310-macosx_10_14_universal2.whl", hash = "sha256:654ecb284a592d39a85556098b8c5125163435472a20ead79b805cf91814b99e", size = 5446915, upload-time = "2025-06-28T04:20:40.947Z" }, + { url = "https://files.pythonhosted.org/packages/83/cd/2994e0a0a67714fdb00c207c4bec60b9b356fbd6b0b7a162ecaabe925155/grpcio_tools-1.71.2-cp310-cp310-manylinux_2_17_aarch64.whl", hash = "sha256:b49aded2b6c890ff690d960e4399a336c652315c6342232c27bd601b3705739e", size = 2348301, upload-time = "2025-06-28T04:20:42.766Z" }, + { url = "https://files.pythonhosted.org/packages/5b/8b/4f2315927af306af1b35793b332b9ca9dc5b5a2cde2d55811c9577b5f03f/grpcio_tools-1.71.2-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a7811a6fc1c4b4e5438e5eb98dbd52c2dc4a69d1009001c13356e6636322d41a", size = 2742159, upload-time = "2025-06-28T04:20:44.206Z" }, + { url = "https://files.pythonhosted.org/packages/8d/98/d513f6c09df405c82583e7083c20718ea615ed0da69ec42c80ceae7ebdc5/grpcio_tools-1.71.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:393a9c80596aa2b3f05af854e23336ea8c295593bbb35d9adae3d8d7943672bd", size = 2473444, upload-time = "2025-06-28T04:20:45.5Z" }, + { url = "https://files.pythonhosted.org/packages/fa/fe/00af17cc841916d5e4227f11036bf443ce006629212c876937c7904b0ba3/grpcio_tools-1.71.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:823e1f23c12da00f318404c4a834bb77cd150d14387dee9789ec21b335249e46", size = 2850339, upload-time = "2025-06-28T04:20:46.758Z" }, + { url = "https://files.pythonhosted.org/packages/7d/59/745fc50dfdbed875fcfd6433883270d39d23fb1aa4ecc9587786f772dce3/grpcio_tools-1.71.2-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:9bfbea79d6aec60f2587133ba766ede3dc3e229641d1a1e61d790d742a3d19eb", size = 3300795, upload-time = "2025-06-28T04:20:48.327Z" }, + { url = "https://files.pythonhosted.org/packages/62/3e/d9d0fb2df78e601c28d02ef0cd5d007f113c1b04fc21e72bf56e8c3df66b/grpcio_tools-1.71.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:32f3a67b10728835b5ffb63fbdbe696d00e19a27561b9cf5153e72dbb93021ba", size = 2913729, upload-time = "2025-06-28T04:20:49.641Z" }, + { url = "https://files.pythonhosted.org/packages/09/ae/ddb264b4a10c6c10336a7c177f8738b230c2c473d0c91dd5d8ce8ea1b857/grpcio_tools-1.71.2-cp310-cp310-win32.whl", hash = "sha256:7fcf9d92c710bfc93a1c0115f25e7d49a65032ff662b38b2f704668ce0a938df", size = 945997, upload-time = "2025-06-28T04:20:50.9Z" }, + { url = "https://files.pythonhosted.org/packages/ad/8d/5efd93698fe359f63719d934ebb2d9337e82d396e13d6bf00f4b06793e37/grpcio_tools-1.71.2-cp310-cp310-win_amd64.whl", hash = "sha256:914b4275be810290266e62349f2d020bb7cc6ecf9edb81da3c5cddb61a95721b", size = 1117474, upload-time = "2025-06-28T04:20:52.54Z" }, + { url = "https://files.pythonhosted.org/packages/17/e4/0568d38b8da6237ea8ea15abb960fb7ab83eb7bb51e0ea5926dab3d865b1/grpcio_tools-1.71.2-cp311-cp311-linux_armv7l.whl", hash = "sha256:0acb8151ea866be5b35233877fbee6445c36644c0aa77e230c9d1b46bf34b18b", size = 2385557, upload-time = "2025-06-28T04:20:54.323Z" }, + { url = "https://files.pythonhosted.org/packages/76/fb/700d46f72b0f636cf0e625f3c18a4f74543ff127471377e49a071f64f1e7/grpcio_tools-1.71.2-cp311-cp311-macosx_10_14_universal2.whl", hash = "sha256:b28f8606f4123edb4e6da281547465d6e449e89f0c943c376d1732dc65e6d8b3", size = 5447590, upload-time = "2025-06-28T04:20:55.836Z" }, + { url = "https://files.pythonhosted.org/packages/12/69/d9bb2aec3de305162b23c5c884b9f79b1a195d42b1e6dabcc084cc9d0804/grpcio_tools-1.71.2-cp311-cp311-manylinux_2_17_aarch64.whl", hash = "sha256:cbae6f849ad2d1f5e26cd55448b9828e678cb947fa32c8729d01998238266a6a", size = 2348495, upload-time = "2025-06-28T04:20:57.33Z" }, + { url = "https://files.pythonhosted.org/packages/d5/83/f840aba1690461b65330efbca96170893ee02fae66651bcc75f28b33a46c/grpcio_tools-1.71.2-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e4d1027615cfb1e9b1f31f2f384251c847d68c2f3e025697e5f5c72e26ed1316", size = 2742333, upload-time = "2025-06-28T04:20:59.051Z" }, + { url = "https://files.pythonhosted.org/packages/30/34/c02cd9b37de26045190ba665ee6ab8597d47f033d098968f812d253bbf8c/grpcio_tools-1.71.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9bac95662dc69338edb9eb727cc3dd92342131b84b12b3e8ec6abe973d4cbf1b", size = 2473490, upload-time = "2025-06-28T04:21:00.614Z" }, + { url = "https://files.pythonhosted.org/packages/4d/c7/375718ae091c8f5776828ce97bdcb014ca26244296f8b7f70af1a803ed2f/grpcio_tools-1.71.2-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:c50250c7248055040f89eb29ecad39d3a260a4b6d3696af1575945f7a8d5dcdc", size = 2850333, upload-time = "2025-06-28T04:21:01.95Z" }, + { url = "https://files.pythonhosted.org/packages/19/37/efc69345bd92a73b2bc80f4f9e53d42dfdc234b2491ae58c87da20ca0ea5/grpcio_tools-1.71.2-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:6ab1ad955e69027ef12ace4d700c5fc36341bdc2f420e87881e9d6d02af3d7b8", size = 3300748, upload-time = "2025-06-28T04:21:03.451Z" }, + { url = "https://files.pythonhosted.org/packages/d2/1f/15f787eb25ae42086f55ed3e4260e85f385921c788debf0f7583b34446e3/grpcio_tools-1.71.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:dd75dde575781262b6b96cc6d0b2ac6002b2f50882bf5e06713f1bf364ee6e09", size = 2913178, upload-time = "2025-06-28T04:21:04.879Z" }, + { url = "https://files.pythonhosted.org/packages/12/aa/69cb3a9dff7d143a05e4021c3c9b5cde07aacb8eb1c892b7c5b9fb4973e3/grpcio_tools-1.71.2-cp311-cp311-win32.whl", hash = "sha256:9a3cb244d2bfe0d187f858c5408d17cb0e76ca60ec9a274c8fd94cc81457c7fc", size = 946256, upload-time = "2025-06-28T04:21:06.518Z" }, + { url = "https://files.pythonhosted.org/packages/1e/df/fb951c5c87eadb507a832243942e56e67d50d7667b0e5324616ffd51b845/grpcio_tools-1.71.2-cp311-cp311-win_amd64.whl", hash = "sha256:00eb909997fd359a39b789342b476cbe291f4dd9c01ae9887a474f35972a257e", size = 1117661, upload-time = "2025-06-28T04:21:08.18Z" }, + { url = "https://files.pythonhosted.org/packages/9c/d3/3ed30a9c5b2424627b4b8411e2cd6a1a3f997d3812dbc6a8630a78bcfe26/grpcio_tools-1.71.2-cp312-cp312-linux_armv7l.whl", hash = "sha256:bfc0b5d289e383bc7d317f0e64c9dfb59dc4bef078ecd23afa1a816358fb1473", size = 2385479, upload-time = "2025-06-28T04:21:10.413Z" }, + { url = "https://files.pythonhosted.org/packages/54/61/e0b7295456c7e21ef777eae60403c06835160c8d0e1e58ebfc7d024c51d3/grpcio_tools-1.71.2-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:b4669827716355fa913b1376b1b985855d5cfdb63443f8d18faf210180199006", size = 5431521, upload-time = "2025-06-28T04:21:12.261Z" }, + { url = "https://files.pythonhosted.org/packages/75/d7/7bcad6bcc5f5b7fab53e6bce5db87041f38ef3e740b1ec2d8c49534fa286/grpcio_tools-1.71.2-cp312-cp312-manylinux_2_17_aarch64.whl", hash = "sha256:d4071f9b44564e3f75cdf0f05b10b3e8c7ea0ca5220acbf4dc50b148552eef2f", size = 2350289, upload-time = "2025-06-28T04:21:13.625Z" }, + { url = "https://files.pythonhosted.org/packages/b2/8a/e4c1c4cb8c9ff7f50b7b2bba94abe8d1e98ea05f52a5db476e7f1c1a3c70/grpcio_tools-1.71.2-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a28eda8137d587eb30081384c256f5e5de7feda34776f89848b846da64e4be35", size = 2743321, upload-time = "2025-06-28T04:21:15.007Z" }, + { url = "https://files.pythonhosted.org/packages/fd/aa/95bc77fda5c2d56fb4a318c1b22bdba8914d5d84602525c99047114de531/grpcio_tools-1.71.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b19c083198f5eb15cc69c0a2f2c415540cbc636bfe76cea268e5894f34023b40", size = 2474005, upload-time = "2025-06-28T04:21:16.443Z" }, + { url = "https://files.pythonhosted.org/packages/c9/ff/ca11f930fe1daa799ee0ce1ac9630d58a3a3deed3dd2f465edb9a32f299d/grpcio_tools-1.71.2-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:784c284acda0d925052be19053d35afbf78300f4d025836d424cf632404f676a", size = 2851559, upload-time = "2025-06-28T04:21:18.139Z" }, + { url = "https://files.pythonhosted.org/packages/64/10/c6fc97914c7e19c9bb061722e55052fa3f575165da9f6510e2038d6e8643/grpcio_tools-1.71.2-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:381e684d29a5d052194e095546eef067201f5af30fd99b07b5d94766f44bf1ae", size = 3300622, upload-time = "2025-06-28T04:21:20.291Z" }, + { url = "https://files.pythonhosted.org/packages/e5/d6/965f36cfc367c276799b730d5dd1311b90a54a33726e561393b808339b04/grpcio_tools-1.71.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:3e4b4801fabd0427fc61d50d09588a01b1cfab0ec5e8a5f5d515fbdd0891fd11", size = 2913863, upload-time = "2025-06-28T04:21:22.196Z" }, + { url = "https://files.pythonhosted.org/packages/8d/f0/c05d5c3d0c1d79ac87df964e9d36f1e3a77b60d948af65bec35d3e5c75a3/grpcio_tools-1.71.2-cp312-cp312-win32.whl", hash = "sha256:84ad86332c44572305138eafa4cc30040c9a5e81826993eae8227863b700b490", size = 945744, upload-time = "2025-06-28T04:21:23.463Z" }, + { url = "https://files.pythonhosted.org/packages/e2/e9/c84c1078f0b7af7d8a40f5214a9bdd8d2a567ad6c09975e6e2613a08d29d/grpcio_tools-1.71.2-cp312-cp312-win_amd64.whl", hash = "sha256:8e1108d37eecc73b1c4a27350a6ed921b5dda25091700c1da17cfe30761cd462", size = 1117695, upload-time = "2025-06-28T04:21:25.22Z" }, + { url = "https://files.pythonhosted.org/packages/60/9c/bdf9c5055a1ad0a09123402d73ecad3629f75b9cf97828d547173b328891/grpcio_tools-1.71.2-cp313-cp313-linux_armv7l.whl", hash = "sha256:b0f0a8611614949c906e25c225e3360551b488d10a366c96d89856bcef09f729", size = 2384758, upload-time = "2025-06-28T04:21:26.712Z" }, + { url = "https://files.pythonhosted.org/packages/49/d0/6aaee4940a8fb8269c13719f56d69c8d39569bee272924086aef81616d4a/grpcio_tools-1.71.2-cp313-cp313-macosx_10_14_universal2.whl", hash = "sha256:7931783ea7ac42ac57f94c5047d00a504f72fbd96118bf7df911bb0e0435fc0f", size = 5443127, upload-time = "2025-06-28T04:21:28.383Z" }, + { url = "https://files.pythonhosted.org/packages/d9/11/50a471dcf301b89c0ed5ab92c533baced5bd8f796abfd133bbfadf6b60e5/grpcio_tools-1.71.2-cp313-cp313-manylinux_2_17_aarch64.whl", hash = "sha256:d188dc28e069aa96bb48cb11b1338e47ebdf2e2306afa58a8162cc210172d7a8", size = 2349627, upload-time = "2025-06-28T04:21:30.254Z" }, + { url = "https://files.pythonhosted.org/packages/bb/66/e3dc58362a9c4c2fbe98a7ceb7e252385777ebb2bbc7f42d5ab138d07ace/grpcio_tools-1.71.2-cp313-cp313-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f36c4b3cc42ad6ef67430639174aaf4a862d236c03c4552c4521501422bfaa26", size = 2742932, upload-time = "2025-06-28T04:21:32.325Z" }, + { url = "https://files.pythonhosted.org/packages/b7/1e/1e07a07ed8651a2aa9f56095411198385a04a628beba796f36d98a5a03ec/grpcio_tools-1.71.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4bd9ed12ce93b310f0cef304176049d0bc3b9f825e9c8c6a23e35867fed6affd", size = 2473627, upload-time = "2025-06-28T04:21:33.752Z" }, + { url = "https://files.pythonhosted.org/packages/d3/f9/3b7b32e4acb419f3a0b4d381bc114fe6cd48e3b778e81273fc9e4748caad/grpcio_tools-1.71.2-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:7ce27e76dd61011182d39abca38bae55d8a277e9b7fe30f6d5466255baccb579", size = 2850879, upload-time = "2025-06-28T04:21:35.241Z" }, + { url = "https://files.pythonhosted.org/packages/1e/99/cd9e1acd84315ce05ad1fcdfabf73b7df43807cf00c3b781db372d92b899/grpcio_tools-1.71.2-cp313-cp313-musllinux_1_1_i686.whl", hash = "sha256:dcc17bf59b85c3676818f2219deacac0156492f32ca165e048427d2d3e6e1157", size = 3300216, upload-time = "2025-06-28T04:21:36.826Z" }, + { url = "https://files.pythonhosted.org/packages/9f/c0/66eab57b14550c5b22404dbf60635c9e33efa003bd747211981a9859b94b/grpcio_tools-1.71.2-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:706360c71bdd722682927a1fb517c276ccb816f1e30cb71f33553e5817dc4031", size = 2913521, upload-time = "2025-06-28T04:21:38.347Z" }, + { url = "https://files.pythonhosted.org/packages/05/9b/7c90af8f937d77005625d705ab1160bc42a7e7b021ee5c788192763bccd6/grpcio_tools-1.71.2-cp313-cp313-win32.whl", hash = "sha256:bcf751d5a81c918c26adb2d6abcef71035c77d6eb9dd16afaf176ee096e22c1d", size = 945322, upload-time = "2025-06-28T04:21:39.864Z" }, + { url = "https://files.pythonhosted.org/packages/5f/80/6db6247f767c94fe551761772f89ceea355ff295fd4574cb8efc8b2d1199/grpcio_tools-1.71.2-cp313-cp313-win_amd64.whl", hash = "sha256:b1581a1133552aba96a730178bc44f6f1a071f0eb81c5b6bc4c0f89f5314e2b8", size = 1117234, upload-time = "2025-06-28T04:21:41.893Z" }, +] + [[package]] name = "h11" version = "0.14.0" @@ -716,6 +769,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9a/e2/10e9819cf4a20bd8ea2f5dabafc2e6bf4a78d6a0965daeb60a4b34d1c11f/rich-13.9.3-py3-none-any.whl", hash = "sha256:9836f5096eb2172c9e77df411c1b009bace4193d6a481d534fea75ebba758283", size = 242157, upload-time = "2024-10-22T15:36:06.098Z" }, ] +[[package]] +name = "setuptools" +version = "82.0.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/4f/db/cfac1baf10650ab4d1c111714410d2fbb77ac5a616db26775db562c8fab2/setuptools-82.0.1.tar.gz", hash = "sha256:7d872682c5d01cfde07da7bccc7b65469d3dca203318515ada1de5eda35efbf9", size = 1152316, upload-time = "2026-03-09T12:47:17.221Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/9d/76/f789f7a86709c6b087c5a2f52f911838cad707cc613162401badc665acfe/setuptools-82.0.1-py3-none-any.whl", hash = "sha256:a59e362652f08dcd477c78bb6e7bd9d80a7995bc73ce773050228a348ce2e5bb", size = 1006223, upload-time = "2026-03-09T12:47:15.026Z" }, +] + [[package]] name = "sniffio" version = "1.3.1" @@ -763,6 +825,7 @@ dependencies = [ dev = [ { name = "bandit" }, { name = "black" }, + { name = "grpcio-tools" }, { name = "pytest" }, ] @@ -780,6 +843,7 @@ requires-dist = [ dev = [ { name = "bandit" }, { name = "black", specifier = ">=24.3,<25.0" }, + { name = "grpcio-tools", specifier = ">=1.71.2" }, { name = "pytest" }, ] diff --git a/vod_db.go b/vod_db.go new file mode 100644 index 00000000..349ba58c --- /dev/null +++ b/vod_db.go @@ -0,0 +1,119 @@ +package main + +import ( + "database/sql" + "fmt" + "os" + "path/filepath" + "time" + + log "github.com/sirupsen/logrus" + _ "modernc.org/sqlite" +) + +// VodDB tracks VOD download state in a SQLite database. +type VodDB struct { + db *sql.DB +} + +// InitVodDB opens or creates a SQLite database at the given path. +func InitVodDB(dbPath string) (*VodDB, error) { + if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil { + return nil, err + } + + db, err := sql.Open("sqlite", dbPath) + if err != nil { + return nil, err + } + + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS downloaded_vods ( + vod_id TEXT PRIMARY KEY, + user TEXT NOT NULL, + site TEXT NOT NULL, + title TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'downloading', + started_at TEXT NOT NULL, + completed_at TEXT + )`) + if err != nil { + db.Close() + return nil, err + } + + return &VodDB{db: db}, nil +} + +// Close closes the database connection. +func (v *VodDB) Close() error { + return v.db.Close() +} + +// ClaimVOD atomically checks whether a VOD should be downloaded and marks it as +// in-progress in a single operation. Returns true if the claim was successful +// (VOD is new, failed, or stale). Returns false if the VOD is completed or +// already being downloaded by another goroutine. +func (v *VodDB) ClaimVOD(vodID, user, site, title string, staleThreshold time.Duration) (bool, error) { + now := time.Now().UTC().Format(time.RFC3339) + staleCutoff := time.Now().Add(-staleThreshold).UTC().Format(time.RFC3339) + + res, err := v.db.Exec( + `INSERT INTO downloaded_vods (vod_id, user, site, title, status, started_at) + VALUES (?, ?, ?, ?, 'downloading', ?) + ON CONFLICT(vod_id) DO UPDATE SET status='downloading', started_at=?, completed_at=NULL + WHERE downloaded_vods.status = 'failed' + OR (downloaded_vods.status = 'downloading' AND downloaded_vods.started_at <= ?)`, + vodID, user, site, title, now, now, staleCutoff, + ) + if err != nil { + log.Errorf("Failed to claim VOD %s: %v", vodID, err) + return false, err + } + + rows, err := res.RowsAffected() + if err != nil { + return false, err + } + return rows > 0, nil +} + +// MarkVODCompleted marks a VOD as successfully downloaded. +func (v *VodDB) MarkVODCompleted(vodID string) error { + now := time.Now().UTC().Format(time.RFC3339) + res, err := v.db.Exec( + "UPDATE downloaded_vods SET status='completed', completed_at=? WHERE vod_id=?", + now, vodID, + ) + if err != nil { + log.Errorf("Failed to mark VOD %s as completed: %v", vodID, err) + return err + } + rows, err := res.RowsAffected() + if err != nil { + return err + } + if rows == 0 { + return fmt.Errorf("VOD %s not found in database", vodID) + } + return nil +} + +// MarkVODFailed marks a VOD download as failed so it will be retried. +func (v *VodDB) MarkVODFailed(vodID string) error { + res, err := v.db.Exec( + "UPDATE downloaded_vods SET status='failed' WHERE vod_id=?", + vodID, + ) + if err != nil { + log.Errorf("Failed to mark VOD %s as failed: %v", vodID, err) + return err + } + rows, err := res.RowsAffected() + if err != nil { + return err + } + if rows == 0 { + return fmt.Errorf("VOD %s not found in database", vodID) + } + return nil +} diff --git a/vod_db_test.go b/vod_db_test.go new file mode 100644 index 00000000..6f1ca875 --- /dev/null +++ b/vod_db_test.go @@ -0,0 +1,173 @@ +package main + +import ( + "os" + "path/filepath" + "testing" + "time" +) + +func TestVodDB_InitAndClose(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "test.db") + db, err := InitVodDB(dbPath) + if err != nil { + t.Fatalf("Failed to init DB: %v", err) + } + defer db.Close() + + // Verify the file was created + if _, err := os.Stat(dbPath); os.IsNotExist(err) { + t.Error("Database file was not created") + } +} + +func TestVodDB_FullLifecycle(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "test.db") + db, err := InitVodDB(dbPath) + if err != nil { + t.Fatalf("Failed to init DB: %v", err) + } + defer db.Close() + + staleThreshold := 10 * time.Minute + + // Should claim successfully (new VOD) + claimed, err := db.ClaimVOD("12345", "testuser", "twitch.tv", "Test Stream Title", staleThreshold) + if err != nil { + t.Fatalf("ClaimVOD failed: %v", err) + } + if !claimed { + t.Error("VOD not in DB should be claimable") + } + + // Second claim should fail (already in progress, not stale) + claimed, err = db.ClaimVOD("12345", "testuser", "twitch.tv", "Test Stream Title", staleThreshold) + if err != nil { + t.Fatalf("ClaimVOD failed: %v", err) + } + if claimed { + t.Error("Recently started VOD should not be re-claimable") + } + + // Mark as completed + err = db.MarkVODCompleted("12345") + if err != nil { + t.Fatalf("MarkVODCompleted failed: %v", err) + } + + // Completed VOD should NOT be claimable + claimed, err = db.ClaimVOD("12345", "testuser", "twitch.tv", "Test Stream Title", staleThreshold) + if err != nil { + t.Fatalf("ClaimVOD failed: %v", err) + } + if claimed { + t.Error("Completed VOD should not be claimable") + } +} + +func TestVodDB_FailedVODIsRetried(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "test.db") + db, err := InitVodDB(dbPath) + if err != nil { + t.Fatalf("Failed to init DB: %v", err) + } + defer db.Close() + + staleThreshold := 10 * time.Minute + + claimed, err := db.ClaimVOD("12345", "testuser", "twitch.tv", "Title", staleThreshold) + if err != nil { + t.Fatalf("ClaimVOD failed: %v", err) + } + if !claimed { + t.Fatal("Initial claim should succeed") + } + + err = db.MarkVODFailed("12345") + if err != nil { + t.Fatalf("MarkVODFailed failed: %v", err) + } + + claimed, err = db.ClaimVOD("12345", "testuser", "twitch.tv", "Title", staleThreshold) + if err != nil { + t.Fatalf("ClaimVOD failed: %v", err) + } + if !claimed { + t.Error("Failed VOD should be re-claimable") + } +} + +func TestVodDB_StaleDownloadIsRetried(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "test.db") + db, err := InitVodDB(dbPath) + if err != nil { + t.Fatalf("Failed to init DB: %v", err) + } + defer db.Close() + + staleThreshold := 10 * time.Minute + + claimed, err := db.ClaimVOD("12345", "testuser", "twitch.tv", "Title", staleThreshold) + if err != nil { + t.Fatalf("ClaimVOD failed: %v", err) + } + if !claimed { + t.Fatal("Initial claim should succeed") + } + + // With a zero threshold, the download is immediately considered stale + claimed, err = db.ClaimVOD("12345", "testuser", "twitch.tv", "Title", 0) + if err != nil { + t.Fatalf("ClaimVOD failed: %v", err) + } + if !claimed { + t.Error("Stale downloading VOD should be re-claimable") + } +} + +func TestVodDB_DifferentVODsAreIndependent(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "test.db") + db, err := InitVodDB(dbPath) + if err != nil { + t.Fatalf("Failed to init DB: %v", err) + } + defer db.Close() + + staleThreshold := 10 * time.Minute + + claimed, err := db.ClaimVOD("111", "user1", "twitch.tv", "Title A", staleThreshold) + if err != nil { + t.Fatalf("ClaimVOD 111 failed: %v", err) + } + if !claimed { + t.Fatal("Claim 111 should succeed") + } + if err := db.MarkVODCompleted("111"); err != nil { + t.Fatalf("MarkVODCompleted 111 failed: %v", err) + } + + claimed, err = db.ClaimVOD("222", "user2", "twitch.tv", "Title B", staleThreshold) + if err != nil { + t.Fatalf("ClaimVOD 222 failed: %v", err) + } + if !claimed { + t.Fatal("Claim 222 should succeed") + } + if err := db.MarkVODCompleted("222"); err != nil { + t.Fatalf("MarkVODCompleted 222 failed: %v", err) + } + + d1, _ := db.ClaimVOD("111", "user1", "twitch.tv", "Title A", staleThreshold) + d2, _ := db.ClaimVOD("222", "user2", "twitch.tv", "Title B", staleThreshold) + d3, _ := db.ClaimVOD("333", "user3", "twitch.tv", "Title C", staleThreshold) + + if d1 { + t.Error("VOD 111 is completed, should not be claimable") + } + if d2 { + t.Error("VOD 222 is completed, should not be claimable") + } + if !d3 { + t.Error("VOD 333 is not in DB, should be claimable") + } +}