Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions libshpool/src/daemon/prompt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,13 @@ fn wait_for_startup(pty_master: &mut shpool_pty::fork::Master) -> anyhow::Result
.write_all(startup_sentinel_cmd.as_bytes())
.context("running startup sentinel script")?;

let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
let mut buf: [u8; 2048] = [0; 2048];
loop {
if std::time::Instant::now() > deadline {
warn!("timed out waiting for startup sentinel after 30s");
return Err(anyhow!("timed out waiting for startup sentinel"));
}
let len = pty_master.read(&mut buf).context("reading chunk to scan for startup")?;
if len == 0 {
continue;
Expand Down
211 changes: 126 additions & 85 deletions libshpool/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,89 +363,122 @@ impl Server {
)> {
let warnings = vec![];

// we unwrap to propagate the poison as an unwind
let _s = span!(Level::INFO, "1_lock(shells)").entered();
let mut shells = self.shells.lock().unwrap();

let mut status = AttachStatus::Attached { warnings: warnings.clone() };
if let Some(session) = shells.get(&header.name) {
info!("found entry for '{}'", header.name);
if let Ok(mut inner) = session.inner.try_lock() {
let _s =
span!(Level::INFO, "aquired_lock(session.inner)", s = header.name).entered();
// We have an existing session in our table, but the subshell
// proc might have exited in the meantime, for example if the
// user typed `exit` right before the connection dropped there
// could be a zombie entry in our session table. We need to
// re-check whether the subshell has exited before taking this over.
//
// N.B. this is still technically a race, but in practice it does
// not ever cause problems, and there is no real way to avoid some
// sort of race without just always creating a new session when
// a shell exits, which would break `exit` typed at the shell prompt.
match session.child_exit_notifier.wait(Some(time::Duration::from_millis(0))) {
None => {
// the channel is still open so the subshell is still running
info!("taking over existing session inner");
inner.client_stream = Some(stream.try_clone()?);
session.lifecycle_timestamps.lock().unwrap().last_connected_at =
Some(time::SystemTime::now());
// Phase 1: Determine what to do while holding the lock.
// We must NOT call spawn_subshell while holding this lock
// because spawn_subshell may call wait_for_startup which
// blocks indefinitely reading from the PTY. If it blocks,
// the global shells mutex is held forever, deadlocking
// the entire daemon (list, attach, detach, kill all hang).
let needs_new_shell = {
let _s = span!(Level::INFO, "1_lock(shells)").entered();
let shells = self.shells.lock().unwrap();

let mut status = AttachStatus::Attached { warnings: warnings.clone() };
if let Some(session) = shells.get(&header.name) {
info!("found entry for '{}'", header.name);
if let Ok(mut inner) = session.inner.try_lock() {
let _s =
span!(Level::INFO, "aquired_lock(session.inner)", s = header.name)
.entered();
// We have an existing session in our table, but the subshell
// proc might have exited in the meantime, for example if the
// user typed `exit` right before the connection dropped there
// could be a zombie entry in our session table. We need to
// re-check whether the subshell has exited before taking this over.
//
// N.B. this is still technically a race, but in practice it does
// not ever cause problems, and there is no real way to avoid some
// sort of race without just always creating a new session when
// a shell exits, which would break `exit` typed at the shell prompt.
match session.child_exit_notifier.wait(Some(time::Duration::from_millis(0))) {
None => {
// the channel is still open so the subshell is still running
info!("taking over existing session inner");
inner.client_stream = Some(stream.try_clone()?);
session.lifecycle_timestamps.lock().unwrap().last_connected_at =
Some(time::SystemTime::now());

if inner
.shell_to_client_join_h
.as_ref()
.map(|h| h.is_finished())
.unwrap_or(false)
{
warn!(
"child_exited chan unclosed, but shell->client thread has exited, clobbering with new subshell"
);
status = AttachStatus::Created { warnings: warnings.clone() };
}

if inner
.shell_to_client_join_h
.as_ref()
.map(|h| h.is_finished())
.unwrap_or(false)
{
warn!(
"child_exited chan unclosed, but shell->client thread has exited, clobbering with new subshell"
// status is already attached
}
Some(exit_status) => {
// the channel is closed so we know the subshell exited
info!(
"stale inner, (child exited with status {}) clobbering with new subshell",
exit_status
);
status = AttachStatus::Created { warnings };
status = AttachStatus::Created { warnings: warnings.clone() };
}

// status is already attached
}
Some(exit_status) => {
// the channel is closed so we know the subshell exited
info!(
"stale inner, (child exited with status {}) clobbering with new subshell",
exit_status
);
status = AttachStatus::Created { warnings };

if inner
.shell_to_client_join_h
.as_ref()
.map(|h| h.is_finished())
.unwrap_or(false)
{
info!("shell->client thread finished, joining");
if let Some(h) = inner.shell_to_client_join_h.take() {
h.join()
.map_err(|e| {
anyhow!("joining shell->client on reattach: {:?}", e)
})?
.context("within shell->client thread on reattach")?;
}
assert!(matches!(status, AttachStatus::Created { .. }));
}
}

if inner.shell_to_client_join_h.as_ref().map(|h| h.is_finished()).unwrap_or(false) {
info!("shell->client thread finished, joining");
if let Some(h) = inner.shell_to_client_join_h.take() {
h.join()
.map_err(|e| anyhow!("joining shell->client on reattach: {:?}", e))?
.context("within shell->client thread on reattach")?;
// fallthrough to bidi streaming
} else {
info!("busy shell session, doing nothing");
// The stream is busy, so we just inform the client and close the stream.
write_reply(
&mut stream,
AttachReplyHeader { status: AttachStatus::Busy },
)?;
stream.shutdown(net::Shutdown::Both).context("closing stream")?;
if let Err(err) = self.hooks.on_busy(&header.name) {
warn!("busy hook: {:?}", err);
}
assert!(matches!(status, AttachStatus::Created { .. }));
return Err(ShellSelectionError::BusyShellSession)?;
}
} else {
info!("no existing '{}' session, creating new one", &header.name);
status = AttachStatus::Created { warnings: warnings.clone() };
}

// fallthrough to bidi streaming
if matches!(status, AttachStatus::Created { .. }) {
// Signal that we need to spawn a new shell, but do NOT
// do it here — we'll do it after releasing the lock.
if let Err(err) = self.hooks.on_new_session(&header.name) {
warn!("new_session hook: {:?}", err);
}
true
} else {
info!("busy shell session, doing nothing");
// The stream is busy, so we just inform the client and close the stream.
write_reply(&mut stream, AttachReplyHeader { status: AttachStatus::Busy })?;
stream.shutdown(net::Shutdown::Both).context("closing stream")?;
if let Err(err) = self.hooks.on_busy(&header.name) {
warn!("busy hook: {:?}", err);
if let Err(err) = self.hooks.on_reattach(&header.name) {
warn!("reattach hook: {:?}", err);
}
return Err(ShellSelectionError::BusyShellSession)?;
false
}
} else {
info!("no existing '{}' session, creating new one", &header.name);
status = AttachStatus::Created { warnings };
}
// shells lock is dropped here
};

if matches!(status, AttachStatus::Created { .. }) {
info!("creating new subshell");
if let Err(err) = self.hooks.on_new_session(&header.name) {
warn!("new_session hook: {:?}", err);
}
// Phase 2: Spawn the subshell WITHOUT the shells lock held.
// This is critical because spawn_subshell -> maybe_inject_prefix
// -> wait_for_startup can block indefinitely.
if needs_new_shell {
info!("creating new subshell (lock released)");
let motd = self.config.get().motd.clone().unwrap_or_default();
let session = self.spawn_subshell(
conn_id,
Expand All @@ -458,24 +491,32 @@ impl Server {

session.lifecycle_timestamps.lock().unwrap().last_connected_at =
Some(time::SystemTime::now());

// Re-acquire the lock to insert the new session
let _s = span!(Level::INFO, "2_lock(shells)").entered();
let mut shells = self.shells.lock().unwrap();
shells.insert(header.name.clone(), Box::new(session));
// fallthrough to bidi streaming
} else if let Err(err) = self.hooks.on_reattach(&header.name) {
warn!("reattach hook: {:?}", err);
}

// return a reference to the inner session so that
// we can work with it without the global session
// table lock held
if let Some(session) = shells.get(&header.name) {
Ok((
Some(Arc::clone(&session.child_exit_notifier)),
Some(Arc::clone(&session.inner)),
Some(Arc::clone(&session.pager_ctl)),
status,
))
} else {
Ok((None, None, None, status))
// Phase 3: Return references to the session.
// Re-acquire the lock (briefly) to get Arc references.
{
let _s = span!(Level::INFO, "3_lock(shells)").entered();
let shells = self.shells.lock().unwrap();
if let Some(session) = shells.get(&header.name) {
Ok((
Some(Arc::clone(&session.child_exit_notifier)),
Some(Arc::clone(&session.inner)),
Some(Arc::clone(&session.pager_ctl)),
if needs_new_shell {
AttachStatus::Created { warnings: vec![] }
} else {
AttachStatus::Attached { warnings: vec![] }
},
))
} else {
Ok((None, None, None, AttachStatus::Created { warnings: vec![] }))
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions shpool/tests/data/hanging_shell.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
# A fake shell that never executes anything sent to it: it does not read
# stdin at all, it simply sleeps for an hour (3600 seconds).
# This simulates a shell that is slow to start up and never reports that
# startup finished, which leaves wait_for_startup waiting on it.
# NOTE(review): slow_shell_startup.toml uses /bin/cat rather than this
# script -- confirm that a test actually references this file.
sleep 3600
13 changes: 13 additions & 0 deletions shpool/tests/data/slow_shell_startup.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Test configuration for a shell whose startup never completes: sentinel
# injection is enabled (via prompt_prefix) but the configured shell never
# produces the startup sentinel.
norc = true
noecho = true
# Use cat as the shell: it never produces the startup sentinel, which
# simulates a slow/hanging shell startup and triggers the deadlock.
shell = "/bin/cat"
session_restore_mode = "simple"
# A non-empty prompt_prefix enables sentinel injection, which in turn
# triggers wait_for_startup in spawn_subshell.
prompt_prefix = "test> "

# Environment variables for the spawned shell.
# NOTE(review): presumably TERM is left empty on purpose -- confirm.
[env]
PS1 = "prompt> "
TERM = ""
Loading