Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 94 additions & 13 deletions gitindex/catfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"bufio"
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"os/exec"
"strconv"
"strings"
"sync"
"syscall"

Expand Down Expand Up @@ -60,10 +62,16 @@ type catfileReader struct {
cmd *exec.Cmd
reader *bufio.Reader
writeErr <-chan error
stderr <-chan catfileStderrResult

procOnce sync.Once
procMu sync.Mutex
procErr error

// pending tracks unread content bytes + trailing LF for the current
// entry. Next() discards any pending bytes before reading the next header.
pending int
eof bool

closeOnce sync.Once
closeErr error
Expand Down Expand Up @@ -92,14 +100,36 @@ func newCatfileReader(repoDir string, ids []plumbing.Hash, opts catfileReaderOpt
return nil, fmt.Errorf("stdout pipe: %w", err)
}

stderr, err := cmd.StderrPipe()
if err != nil {
stdin.Close()
stdout.Close()
return nil, fmt.Errorf("stderr pipe: %w", err)
}

if err := cmd.Start(); err != nil {
stdin.Close()
stdout.Close()
stderr.Close()
return nil, fmt.Errorf("start git cat-file: %w", err)
}

// Writer goroutine: feed all SHAs then close stdin to trigger flush.
writeErr := make(chan error, 1)
stderrDone := make(chan catfileStderrResult, 1)

cr := &catfileReader{
cmd: cmd,
reader: bufio.NewReaderSize(stdout, 512*1024),
writeErr: writeErr,
stderr: stderrDone,
}
go func() {
stderrBytes, stderrReadErr := io.ReadAll(stderr)
stderrDone <- catfileStderrResult{data: stderrBytes, err: stderrReadErr}
close(stderrDone)
}()

// Writer goroutine: feed all SHAs then close stdin to trigger flush.
go func() {
defer close(writeErr)
defer stdin.Close()
Expand All @@ -116,11 +146,7 @@ func newCatfileReader(repoDir string, ids []plumbing.Hash, opts catfileReaderOpt
writeErr <- bw.Flush()
}()

return &catfileReader{
cmd: cmd,
reader: bufio.NewReaderSize(stdout, 512*1024),
writeErr: writeErr,
}, nil
return cr, nil
}

// Next advances to the next blob entry. It returns the blob's size and whether
Expand All @@ -131,23 +157,30 @@ func newCatfileReader(repoDir string, ids []plumbing.Hash, opts catfileReaderOpt
// After Next returns successfully with missing=false and excluded=false, call
// Read to consume the blob content, or call Next again to skip it.
func (cr *catfileReader) Next() (size int, missing bool, excluded bool, err error) {
if cr.eof {
return 0, false, false, io.EOF
}

// Discard unread content from the previous entry.
if cr.pending > 0 {
if _, err := cr.reader.Discard(cr.pending); err != nil {
return 0, false, false, fmt.Errorf("discard pending bytes: %w", err)
return 0, false, false, cr.wrapReadError("discard pending bytes", err)
}
cr.pending = 0
}

headerBytes, err := cr.reader.ReadBytes('\n')
if err != nil {
if err == io.EOF {
if procErr := cr.waitProcess(); procErr != nil {
return 0, false, false, procErr
}
cr.eof = true
return 0, false, false, io.EOF
}
return 0, false, false, fmt.Errorf("read header: %w", err)
}
header := headerBytes[:len(headerBytes)-1] // trim \n

if bytes.HasSuffix(header, []byte(" missing")) {
return 0, true, false, nil
}
Expand Down Expand Up @@ -184,7 +217,7 @@ func (cr *catfileReader) Read(p []byte) (int, error) {
if contentRemaining <= 0 {
// Only the trailing LF remains; consume it and signal EOF.
if _, err := cr.reader.ReadByte(); err != nil {
return 0, fmt.Errorf("read trailing LF: %w", err)
return 0, cr.wrapReadError("read trailing LF", err)
}
cr.pending = 0
return 0, io.EOF
Expand All @@ -197,13 +230,13 @@ func (cr *catfileReader) Read(p []byte) (int, error) {
n, err := cr.reader.Read(p)
cr.pending -= n
if err != nil {
return n, err
return n, cr.wrapReadError("read blob content", err)
}

// If we've consumed all content bytes, also consume the trailing LF.
if cr.pending == 1 {
if _, err := cr.reader.ReadByte(); err != nil {
return n, fmt.Errorf("read trailing LF: %w", err)
return n, cr.wrapReadError("read trailing LF", err)
}
cr.pending = 0
}
Expand All @@ -223,7 +256,7 @@ func (cr *catfileReader) Close() error {
_, _ = io.Copy(io.Discard, cr.reader)
// Wait for writer goroutine (unblocks via broken pipe from Kill).
<-cr.writeErr
err := cr.cmd.Wait()
err := cr.waitProcess()
// Suppress the expected "signal: killed" error from our own Kill().
if isKilledErr(err) {
err = nil
Expand All @@ -233,9 +266,57 @@ func (cr *catfileReader) Close() error {
return cr.closeErr
}

func (cr *catfileReader) waitProcess() error {
cr.procOnce.Do(func() {
stderr := <-cr.stderr
waitErr := cr.cmd.Wait()

cr.procMu.Lock()
cr.procErr = formatCatfileProcessErr(waitErr, stderr.data, stderr.err)
cr.procMu.Unlock()
})

cr.procMu.Lock()
defer cr.procMu.Unlock()

return cr.procErr
}

func (cr *catfileReader) wrapReadError(context string, err error) error {
if err == io.EOF {
if procErr := cr.waitProcess(); procErr != nil {
return procErr
}
}

return fmt.Errorf("%s: %w", context, err)
}

func formatCatfileProcessErr(waitErr error, stderr []byte, stderrReadErr error) error {
if waitErr == nil {
if stderrReadErr != nil {
return fmt.Errorf("read git cat-file stderr: %w", stderrReadErr)
}
return nil
}

stderrText := strings.TrimSpace(string(stderr))
if stderrText == "" {
return fmt.Errorf("git cat-file exited unsuccessfully: %w", waitErr)
}

return fmt.Errorf("git cat-file exited unsuccessfully: %w: %s", waitErr, stderrText)
}

type catfileStderrResult struct {
data []byte
err error
}

// isKilledErr reports whether err is an exec.ExitError caused by SIGKILL.
func isKilledErr(err error) bool {
exitErr, ok := err.(*exec.ExitError)
var exitErr *exec.ExitError
ok := errors.As(err, &exitErr)
if !ok {
return false
}
Expand Down
63 changes: 47 additions & 16 deletions gitindex/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,24 +655,21 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) {
}

if err := indexCatfileBlobs(cr, mainRepoKeys, repos, opts, builder); err != nil {
return false, err
var readErr *catfileReadError
if errors.As(err, &readErr) && readErr.processed == 0 {
log.Printf("git cat-file failed before yielding any blobs, falling back to go-git for %d blobs: %v", len(mainRepoKeys), err)
if err := indexGoGitBlobs(mainRepoKeys, repos, opts.BuildOptions, builder); err != nil {
return false, err
}
} else {
return false, err
}
}
}

// Index submodule blobs via go-git.
for idx, key := range submoduleKeys {
doc, err := createDocument(key, repos, opts.BuildOptions)
if err != nil {
return false, err
}

if err := builder.Add(doc); err != nil {
return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err)
}

if idx%10_000 == 0 {
builder.CheckMemoryUsage()
}
if err := indexGoGitBlobs(submoduleKeys, repos, opts.BuildOptions, builder); err != nil {
return false, err
}

return true, builder.Finish()
Expand All @@ -684,11 +681,12 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) {
// The reader is always closed when this function returns.
func indexCatfileBlobs(cr *catfileReader, keys []fileKey, repos map[fileKey]BlobLocation, opts Options, builder *index.Builder) error {
defer cr.Close()
processed := 0

for idx, key := range keys {
size, missing, excluded, err := cr.Next()
if err != nil {
return fmt.Errorf("cat-file next for %s: %w", key.FullPath(), err)
return &catfileReadError{processed: processed, err: fmt.Errorf("cat-file next for %s: %w", key.FullPath(), err)}
}

branches := repos[key].Branches
Expand All @@ -712,7 +710,7 @@ func indexCatfileBlobs(cr *catfileReader, keys []fileKey, repos map[fileKey]Blob
// avoids the intermediate allocation and the size is known.
content := make([]byte, size)
if _, err := io.ReadFull(cr, content); err != nil {
return fmt.Errorf("read blob %s: %w", keyFullPath, err)
return &catfileReadError{processed: processed, err: fmt.Errorf("read blob %s: %w", keyFullPath, err)}
}
doc = index.Document{
SubRepositoryPath: key.SubRepoPath,
Expand All @@ -723,6 +721,39 @@ func indexCatfileBlobs(cr *catfileReader, keys []fileKey, repos map[fileKey]Blob
}
}

if err := builder.Add(doc); err != nil {
return fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err)
}
processed++

if idx%10_000 == 0 {
builder.CheckMemoryUsage()
}
}

return nil
}

type catfileReadError struct {
processed int
err error
}

func (e *catfileReadError) Error() string {
return e.err.Error()
}

func (e *catfileReadError) Unwrap() error {
return e.err
}

func indexGoGitBlobs(keys []fileKey, repos map[fileKey]BlobLocation, opts index.Options, builder *index.Builder) error {
for idx, key := range keys {
doc, err := createDocument(key, repos, opts)
if err != nil {
return err
}

if err := builder.Add(doc); err != nil {
return fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err)
}
Expand Down
Loading
Loading