Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions cmd/runed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -75,6 +76,12 @@ func run() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// evicted is set by the self-eviction watchdog when our socket is taken
// over mid-boot. It lets the boot-failure paths below exit 0 (so the
// supervisor stays down) instead of reporting the cancelled bootstrap as
// a crash that would be restarted into the same stolen path.
var evicted atomic.Bool

paths, err := bootstrap.Resolve()
if err != nil {
return fmt.Errorf("resolve paths: %w", err)
Expand Down Expand Up @@ -138,6 +145,13 @@ func run() error {
close(serveErr)
}()

// Self-eviction watchdog: if our socket file is unlinked or rebound by
// another daemon (typically after $RUNED_HOME is recreated out from under
// us), we can no longer accept new connections even though the process is
// otherwise healthy. Detect that and shut down via the normal graceful
// path so we don't linger as an orphan holding a dead socket.
go watchSocketOwnership(ctx, cancel, &evicted, logger, lis, sockPath, srv)

// Forward bootstrap progress into Health via stage→Phase mapping.
reporter := bootstrap.StatusReporter(func(stage string, done, total int64) {
phase, msg := stagePhase(stage)
Expand All @@ -154,6 +168,9 @@ func run() error {
bin, mp, err := selfBootstrap(ctx, logger, paths, llamaBin == "", model == "", reporter)
if err != nil {
bailBoot(logger, srv, gs, nil)
if evicted.Load() {
return nil
}
return err
}
if llamaBin == "" {
Expand All @@ -172,6 +189,9 @@ func run() error {
modelID, err := sha256File(model)
if err != nil {
bailBoot(logger, srv, gs, nil)
if evicted.Load() {
return nil
}
return fmt.Errorf("model hash: %w", err)
}
logger.Info("model identity", "sha256", modelID, "path", model)
Expand All @@ -192,6 +212,9 @@ func run() error {
// b.Start may have spawned a child that failed health-probe;
// bailBoot's b.Stop reaps it.
bailBoot(logger, srv, gs, b)
if evicted.Load() {
return nil
}
return fmt.Errorf("backend start: %w", err)
}
logger.Info("llama-server ready", "port", b.Port())
Expand All @@ -202,6 +225,9 @@ func run() error {
idleTimeout, err := parseIdleTimeout()
if err != nil {
bailBoot(logger, srv, gs, b)
if evicted.Load() {
return nil
}
return fmt.Errorf("RUNED_IDLE_TIMEOUT: %w", err)
}
if idleTimeout > 0 {
Expand Down Expand Up @@ -414,6 +440,55 @@ func anotherDaemonReachable(ctx context.Context, sockPath string) bool {
return true
}

// socketOwnershipPoll is the interval between the self-eviction watchdog's
// checks that runed's socket file still belongs to this process. It is the
// period of the ticker in watchSocketOwnership, so a takeover of the socket
// path is only noticed at the next tick, not the instant it happens.
const socketOwnershipPoll = 5 * time.Second

// watchSocketOwnership shuts the daemon down (via the normal graceful path)
// if its socket file is removed or replaced by another daemon's socket. This
// reaps the orphan that would otherwise result when $RUNED_HOME is wiped or
// rebound out from under a running daemon: the socket path stops pointing at
// us, so we no longer receive connections while staying alive and holding a
// dead socket. Triggering a graceful shutdown (which exits 0) lets the
// supervisor stay down rather than restart us into the same stolen path.
//
// Every socketOwnershipPoll it asks lis whether the socket is still owned.
// On detecting a takeover it, in order:
//
//  1. logs a warning naming sockPath,
//  2. sets *evicted so run()'s boot-failure paths can exit 0,
//  3. calls srv.TriggerShutdown to start the normal graceful shutdown, and
//  4. calls cancel so an in-flight self-bootstrap / backend start aborts
//     promptly instead of running to completion first.
//
// The goroutine returns without doing any of that when ctx is cancelled or
// srv.ShutdownCh is closed (signal, Shutdown RPC, ...). Both conditions are
// checked again after a failed ownership check and before logging, because a
// shutdown that is already under way may have closed and unlinked our own
// socket; that must not be misreported as an eviction.
//
// It is meant to run in its own goroutine and returns after the first
// eviction or shutdown it observes.
func watchSocketOwnership(ctx context.Context, cancel context.CancelFunc, evicted *atomic.Bool, logger *slog.Logger, lis ipc.Listener, sockPath string, srv *server.Server) {
	ticker := time.NewTicker(socketOwnershipPoll)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-srv.ShutdownCh():
			return
		case <-ticker.C:
		}

		if lis.StillOwned() {
			continue
		}

		// The socket is gone or replaced. Before calling it an eviction, make
		// sure we are not simply racing with our own shutdown: either cause
		// that ends this goroutine above may have fired while the ownership
		// check was running and unlinked the socket itself.
		select {
		case <-ctx.Done():
			return
		case <-srv.ShutdownCh():
			return
		default:
		}

		logger.Warn("socket no longer owned by this daemon; self-evicting",
			"socket", sockPath)
		// Record the eviction before cancelling so anything woken by the
		// cancellation already sees the flag.
		evicted.Store(true)
		srv.TriggerShutdown()
		cancel()
		return
	}
}

// sha256File returns a short, prefixed SHA-256 identifier of the file at path.
// The 16-char hex truncation keeps Info.model_identity compact while retaining
// enough entropy to distinguish GGUF revisions in practice (Plan A ships a
Expand Down
17 changes: 17 additions & 0 deletions internal/ipc/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ipc

import "net"

// Listener is the net.Listener returned by Listen, extended with a way to ask
// whether the on-disk socket file still refers to the socket it bound.
//
// runed uses StillOwned to self-evict when its socket path is unlinked or
// taken over by another daemon — which happens when $RUNED_HOME is recreated
// out from under a running daemon, leaving the old process alive but
// unreachable on a path that now points elsewhere.
type Listener interface {
// Accept, Close and Addr come from the embedded net.Listener.
net.Listener
// StillOwned reports whether the socket file at the bind path still
// refers to this listener's socket. It returns false once the file is
// removed or replaced by a different socket (different device/inode).
// The result describes the path at the moment of the call; callers that
// care about later takeovers have to ask again.
StillOwned() bool
}
69 changes: 65 additions & 4 deletions internal/ipc/uds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package ipc

import (
"net"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -57,9 +56,8 @@ func TestListen_CleansUpStaleSocket(t *testing.T) {
}
defer lis.Close()

_, ok := lis.(*net.UnixListener)
if !ok {
t.Fatalf("expected UnixListener")
if !lis.StillOwned() {
t.Fatal("freshly bound socket should be owned")
}
}

Expand All @@ -81,3 +79,66 @@ func TestListen_RejectsLiveSocket(t *testing.T) {
t.Fatalf("want 'already in use' error, got: %v", err)
}
}

// TestStillOwned_FalseAfterRemove checks both halves of the ownership report:
// a listener owns its socket right after binding, and stops owning it as soon
// as the socket file is unlinked from the path.
func TestStillOwned_FalseAfterRemove(t *testing.T) {
	sockPath := filepath.Join(shortTempDir(t), "embedding.sock")

	lis, listenErr := Listen(sockPath)
	if listenErr != nil {
		t.Fatalf("Listen: %v", listenErr)
	}
	defer lis.Close()

	// Freshly bound: the path must resolve to our own socket.
	if owned := lis.StillOwned(); !owned {
		t.Fatal("want owned immediately after bind")
	}

	// Unlink the path while the listener is still open.
	if rmErr := os.Remove(sockPath); rmErr != nil {
		t.Fatalf("remove: %v", rmErr)
	}

	if owned := lis.StillOwned(); owned {
		t.Fatal("want not owned after socket file removed")
	}
}

// TestClose_DoesNotRemoveReboundSocket is the regression test for
// unlink-on-close: an evicted listener closing must not delete the socket
// file a *different* listener rebound at the same path (Go's default
// unlink-on-close would).
func TestClose_DoesNotRemoveReboundSocket(t *testing.T) {
	dir := shortTempDir(t)
	sockPath := filepath.Join(dir, "embedding.sock")

	l1, err := Listen(sockPath)
	if err != nil {
		t.Fatalf("first Listen: %v", err)
	}
	// Release l1 even when an assertion below aborts the test early. The
	// explicit Close further down is the one under test; this deferred call
	// then only repeats the close on an already-closed listener, and its
	// error is deliberately ignored.
	defer l1.Close()

	// Simulate another daemon rebinding the path after l1's home was wiped:
	// remove the path (l1 keeps its fd, so its inode is pinned), then bind a
	// fresh socket with a new inode at the same path.
	if err := os.Remove(sockPath); err != nil {
		t.Fatalf("remove: %v", err)
	}
	l2, err := Listen(sockPath)
	if err != nil {
		t.Fatalf("second Listen: %v", err)
	}
	defer l2.Close()

	if l1.StillOwned() {
		t.Fatal("l1 should no longer own the rebound path")
	}
	if !l2.StillOwned() {
		t.Fatal("l2 should own the path it just bound")
	}

	// Closing the evicted l1 must leave l2's socket file intact.
	_ = l1.Close()

	if _, err := os.Stat(sockPath); err != nil {
		t.Fatalf("socket path should still exist after evicted l1.Close(): %v", err)
	}
	if !l2.StillOwned() {
		t.Fatal("l1.Close() deleted l2's socket file — unlink-on-close regression")
	}
}
91 changes: 88 additions & 3 deletions internal/ipc/uds_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,24 @@ import (
"net"
"os"
"path/filepath"
"sync"
"syscall"
)

// Listen binds a unix domain socket at path with 0700 permissions.
//
// The parent directory is created (0700) if missing. Stale socket files
// (leftover from a crashed daemon) are unlinked and re-bound. If another
// process is actively listening on the same path, Listen returns an error.
func Listen(path string) (net.Listener, error) {
//
// The returned Listener disables Go's default unlink-on-close and instead
// removes the socket file only if it still refers to the socket this call
// bound (see ownedListener.Close). Go's default would unlink whatever file
// occupies the path on Close, even one a *different* daemon rebound there —
// which can delete a healthy daemon's socket after $RUNED_HOME is recreated
// out from under a running daemon. StillOwned exposes the same identity check
// so the daemon can self-evict when its socket is taken over.
func Listen(path string) (Listener, error) {
if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
return nil, fmt.Errorf("mkdir parent: %w", err)
}
Expand Down Expand Up @@ -46,9 +56,84 @@ func Listen(path string) (net.Listener, error) {
if err != nil {
return nil, fmt.Errorf("listen: %w", err)
}
ul, ok := lis.(*net.UnixListener)
if !ok {
// "unix" always yields *net.UnixListener; defensive. unlinkOnClose is
// still at its default (true) here, so Close cleans up the file.
_ = lis.Close()
return nil, fmt.Errorf("listen: unexpected listener type %T", lis)
}
// Take ownership of unlinking so a different daemon's socket at this path
// is never deleted by our Close (see ownedListener.Close).
ul.SetUnlinkOnClose(false)

if err := os.Chmod(path, 0o700); err != nil {
lis.Close()
_ = ul.Close()
_ = os.Remove(path)
return nil, fmt.Errorf("chmod: %w", err)
}
return lis, nil
dev, ino, ok := socketIdentity(path)
if !ok {
_ = ul.Close()
_ = os.Remove(path)
return nil, fmt.Errorf("stat bound socket %s", path)
}
return &ownedListener{UnixListener: ul, path: path, dev: dev, ino: ino}, nil
}

// ownedListener is a *net.UnixListener that remembers the identity (device +
// inode) of the socket file it bound, so it can tell whether the file at its
// path is still its own socket and avoid unlinking one a different daemon
// rebound at the same path.
type ownedListener struct {
	*net.UnixListener

	// path is the filesystem path the socket was bound at.
	path string
	// dev and ino identify the socket file as it was right after binding.
	dev uint64
	ino uint64
	// once makes the ownership-checked unlink in Close run a single time,
	// however often Close is called.
	once sync.Once
}

// Close removes the socket file — but only if the file still refers to this
// listener's socket — and then stops accepting. SetUnlinkOnClose(false) in
// Listen disabled net's unconditional unlink, so this is the sole remover,
// and it will not delete a socket another daemon rebound at the same path.
//
// The ownership check deliberately runs *before* the underlying listener is
// closed. While the listener is open it pins its own inode, so a (dev, ino)
// match can only mean "this is our file". After the close that guarantee is
// gone: if our file had already been unlinked, its inode number may be handed
// to a socket another daemon binds at the same path, and a check made then
// could match — and delete — the other daemon's socket. Unlinking first and
// closing second is also the order net.UnixListener itself uses for its own
// unlink-on-close.
//
// The unlink is attempted at most once; later calls only forward to the
// embedded listener. The returned error is always the one from the embedded
// listener's Close.
func (l *ownedListener) Close() error {
	l.once.Do(func() {
		if sameSocket(l.path, l.dev, l.ino) {
			// Best effort: a failed unlink leaves a stale socket file behind
			// and there is nothing more useful to do with the error here.
			_ = os.Remove(l.path)
		}
	})
	return l.UnixListener.Close()
}

// StillOwned reports whether the socket file at the bind path still refers to
// the socket this listener bound. It returns false if the file was removed or
// replaced by a different socket — e.g. after $RUNED_HOME was wiped and
// another daemon rebound the path.
func (l *ownedListener) StillOwned() bool {
	return sameSocket(l.path, l.dev, l.ino)
}

// socketIdentity looks up the file at path and returns its device and inode
// numbers. ok is false, and both numbers are zero, when the path cannot be
// stat'ed or the platform does not expose a *syscall.Stat_t for it.
func socketIdentity(path string) (dev, ino uint64, ok bool) {
	info, statErr := os.Stat(path)
	if statErr != nil {
		return 0, 0, false
	}
	if raw, isStat := info.Sys().(*syscall.Stat_t); isStat {
		// Dev is int32 on darwin and uint64 on linux; widen uniformly.
		return uint64(raw.Dev), uint64(raw.Ino), true
	}
	return 0, 0, false
}

// sameSocket reports whether the file currently at path has exactly the given
// device and inode numbers; a path that cannot be identified never matches.
//
// A live listener pins its own inode, so that inode cannot be reused for a
// different file while we are alive — making (dev,ino) a stable identity for
// "is the path still my socket".
func sameSocket(path string, dev, ino uint64) bool {
	curDev, curIno, found := socketIdentity(path)
	if !found {
		return false
	}
	return curDev == dev && curIno == ino
}
3 changes: 1 addition & 2 deletions internal/ipc/uds_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ package ipc

import (
"errors"
"net"
)

// Listen is not implemented on Windows in Plan A. Named-pipe support lands
// in Plan B.
func Listen(path string) (net.Listener, error) {
func Listen(path string) (Listener, error) {
_ = path
return nil, errors.New("runed: Windows named pipe not implemented in Plan A")
}
Loading