Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
}()
logger.InfoContext(ctx, "Serving locally cached snapshot after waiting for in-flight fill", "upstream", upstreamURL)
w.Header().Set("Content-Type", "application/zstd")
n, err := io.Copy(w, reader)
n, err := serveReaderFast(w, r, reader)
s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n)
if err != nil {
logger.WarnContext(ctx, "Failed to stream locally cached snapshot", "upstream", upstreamURL, "error", err)
Expand All @@ -240,7 +240,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
if openErr == nil && reader != nil {
logger.InfoContext(ctx, "Serving cached snapshot while mirror warms up", "upstream", upstreamURL)
w.Header().Set("Content-Type", "application/zstd")
n, err := io.Copy(w, reader)
n, err := serveReaderFast(w, r, reader)
s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n)
if err != nil {
logger.WarnContext(ctx, "Failed to stream cached snapshot", "upstream", upstreamURL, "error", err)
Expand Down Expand Up @@ -279,23 +279,42 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
}
defer reader.Close()

if err := s.serveSnapshotWithBundle(ctx, w, reader, headers, repo, upstreamURL, repoName); err != nil {
if err := s.serveSnapshotWithBundle(ctx, w, r, reader, headers, repo, upstreamURL, repoName); err != nil {
logger.ErrorContext(ctx, "Failed to serve snapshot", "upstream", upstreamURL, "error", err)
}
}

func (s *Strategy) streamSnapshotArtifact(_ context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header) error {
func (s *Strategy) streamSnapshotArtifact(_ context.Context, w http.ResponseWriter, r *http.Request, reader io.ReadCloser, headers http.Header) error {
for key, values := range headers {
for _, value := range values {
w.Header().Add(key, value)
}
}
if _, err := io.Copy(w, reader); err != nil {
if _, err := serveReaderFast(w, r, reader); err != nil {
return errors.Wrap(err, "streaming artifact")
}
return nil
}

// serveReaderFast serves the content from reader using the most efficient method
// available. When reader is an *os.File, it uses http.ServeContent which enables
// sendfile(2) zero-copy I/O and automatic Content-Length/Range support. For other
// reader types it falls back to io.Copy. Returns bytes served for metrics.
func serveReaderFast(w http.ResponseWriter, r *http.Request, reader io.Reader) (int64, error) {
if f, ok := reader.(*os.File); ok {
info, err := f.Stat()
if err != nil {
return 0, errors.Wrap(err, "stat file for serving")
}
// http.ServeContent handles Content-Length, Range requests, and uses
// sendfile(2) for zero-copy transfer from file to socket.
http.ServeContent(w, r, "", time.Time{}, f)
return info.Size(), nil
}
n, err := io.Copy(w, reader)
return n, errors.Wrap(err, "copy to response")
}

func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) {
ctx := r.Context()
logger := logging.FromContext(ctx)
Expand Down Expand Up @@ -351,7 +370,7 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h
}
}

func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL, repoName string) error {
func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, r *http.Request, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL, repoName string) error {
snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit")
mirrorHead := s.getMirrorHead(ctx, repo)

Expand Down Expand Up @@ -384,7 +403,7 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW
}

w.Header().Set("Content-Type", "application/zstd")
n, err := io.Copy(w, reader)
n, err := serveReaderFast(w, r, reader)
s.metrics.recordSnapshotServe(ctx, "cache", repoName, n)
return errors.Wrap(err, "stream snapshot")
}
Expand Down Expand Up @@ -782,7 +801,7 @@ func (s *Strategy) handleLFSSnapshotRequest(w http.ResponseWriter, r *http.Reque
if reader != nil {
defer reader.Close()
logger.DebugContext(ctx, "Serving cached LFS snapshot", "upstream", upstreamURL)
if err := s.streamSnapshotArtifact(ctx, w, reader, headers); err != nil {
if err := s.streamSnapshotArtifact(ctx, w, r, reader, headers); err != nil {
logger.ErrorContext(ctx, "Failed to stream LFS snapshot", "upstream", upstreamURL, "error", err)
}
return
Expand Down