Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/Platform/Microsoft.Testing.Platform/Hosts/CommonTestHost.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Microsoft.Testing.Platform.Capabilities.TestFramework;
using Microsoft.Testing.Platform.Extensions;
using Microsoft.Testing.Platform.Extensions.TestFramework;
using Microsoft.Testing.Platform.Extensions.TestHost;
Expand Down Expand Up @@ -60,6 +61,13 @@ public async Task<int> RunAsync()

bool isValidProtocol = await PushOnlyProtocol.IsCompatibleProtocolAsync(hostType).ConfigureAwait(false);

if (isValidProtocol && PushOnlyProtocol.IsServerControlChannelSupported)
{
// Start listening for server-initiated signals (e.g. session cancellation) before running tests
// so a signal that arrives mid-run is observed. React by stopping gracefully where possible.
await PushOnlyProtocol.StartServerControlChannelAsync(RequestGracefulSessionStopAsync).ConfigureAwait(false);
}

exitCode = isValidProtocol
? await RunTestAppAsync(platformOTelService, testApplicationCancellationToken, alreadyDisposed).ConfigureAwait(false)
: (int)ExitCode.IncompatibleProtocolVersion;
Expand Down Expand Up @@ -127,6 +135,25 @@ private string GetHostType()
return hostType;
}

// Reaction to a server-initiated session cancellation coming over the reverse control pipe. Prefer a graceful
// stop so the framework stops scheduling new tests but still emits trx/logs/artifacts for whatever completed
// (mirroring the local '--maximum-failed-tests' behavior). Fall back to hard cancellation when the running
// framework has no graceful-stop capability (e.g. the test host controller), which is the only lever left.
private async Task RequestGracefulSessionStopAsync(CancellationToken cancellationToken)
{
    // Look up the optional graceful-stop capability exposed by the registered test framework (if any).
    ITestFrameworkCapabilities? frameworkCapabilities = ServiceProvider.GetService<ITestFrameworkCapabilities>();
    IGracefulStopTestExecutionCapability? gracefulStop =
        frameworkCapabilities?.GetCapability<IGracefulStopTestExecutionCapability>();

    if (gracefulStop is null)
    {
        // No graceful-stop capability available: cancel the test application token instead.
        ServiceProvider.GetTestApplicationCancellationTokenSource().Cancel();
        return;
    }

    // Ask the framework to stop executing tests, forwarding the caller's token.
    await gracefulStop.StopTestExecutionAsync(cancellationToken).ConfigureAwait(false);
}

private async Task<int> RunTestAppAsync(IPlatformOpenTelemetryService? platformOTelService, CancellationToken testApplicationCancellationToken, List<object> alreadyDisposed)
{
if (RunTestApplicationLifeCycleCallbacks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ public async Task<int> RunAsync()
{
return (int)ExitCode.IncompatibleProtocolVersion;
}

if (pushOnlyProtocol.IsServerControlChannelSupported)
{
// The orchestrator has no graceful-stop capability of its own; a server-initiated cancel maps to
// cancelling the application token, which propagates to the orchestrated test host processes.
await pushOnlyProtocol.StartServerControlChannelAsync(_ =>
{
applicationCancellationToken.Cancel();
return Task.CompletedTask;
}).ConfigureAwait(false);
}
}

int exitCode;
Expand Down
34 changes: 34 additions & 0 deletions src/Platform/Microsoft.Testing.Platform/IPC/NamedPipeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ internal sealed class NamedPipeClient : NamedPipeBase, IClient
private readonly NamedPipeClientStream _namedPipeClientStream;
private readonly SemaphoreSlim _lock = new(1, 1);
private readonly IEnvironment _environment;
private readonly bool _exitProcessOnConnectionLoss;

private bool _disposed;

Expand All @@ -32,6 +33,25 @@ public NamedPipeClient(string name)
}

// Convenience overload: forwards to the three-argument constructor with
// exitProcessOnConnectionLoss set to true.
public NamedPipeClient(string name, IEnvironment environment)
: this(name, environment, exitProcessOnConnectionLoss: true)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="NamedPipeClient"/> class connecting to the named pipe
/// <paramref name="name"/>.
/// </summary>
/// <param name="name">The OS-level named pipe name to connect to.</param>
/// <param name="environment">The environment abstraction used for process-exit on connection loss.</param>
/// <param name="exitProcessOnConnectionLoss">
/// When <see langword="true"/> (the default) a lost connection - the server disconnecting while we write a
/// request or before it sends a response - terminates the process via <see cref="IEnvironment.Exit(int)"/>.
/// This is the right behavior for the primary data pipe: if we cannot talk to the host there is no way to
/// recover. Set it to <see langword="false"/> for auxiliary channels (e.g. the reverse server-control pipe)
/// where a dropped connection should surface as an exception the caller can handle cooperatively rather than
/// killing the whole test host.
/// </param>
public NamedPipeClient(string name, IEnvironment environment, bool exitProcessOnConnectionLoss)
{
if (name is null)
{
Expand All @@ -41,6 +61,7 @@ public NamedPipeClient(string name, IEnvironment environment)
_namedPipeClientStream = new(".", name, PipeDirection.InOut, AsyncCurrentUserPipeOptions);
PipeName = name;
_environment = environment;
_exitProcessOnConnectionLoss = exitProcessOnConnectionLoss;
}

public string PipeName { get; }
Expand Down Expand Up @@ -69,6 +90,12 @@ public async Task<TResponse> RequestReplyAsync<TRequest, TResponse>(TRequest req
// The server disconnected while we were writing the request. Mirror the read-EOF handling
// below: if we cannot deliver the request there's no way to recover, so exit abnormally
// instead of surfacing a raw IPC error to the caller.
if (!_exitProcessOnConnectionLoss)
{
// Auxiliary channel: let the caller observe the disconnect and react cooperatively.
throw;
}

_environment.Exit((int)ExitCode.GenericFailure);
throw;
}
Expand All @@ -83,6 +110,13 @@ public async Task<TResponse> RequestReplyAsync<TRequest, TResponse>(TRequest req
// This is especially important for 'dotnet test', where the user can simply kill the dotnet.exe process themselves.
// In that case, we want the MTP process to also die.
// Exit code 1 indicates abnormal termination due to IPC connection loss.
if (!_exitProcessOnConnectionLoss)
{
// Auxiliary channel (e.g. the reverse server-control pipe): a dropped connection means the
// peer went away. Surface it as an exception so the caller can react (e.g. treat "host gone"
// as a cooperative cancel) instead of killing the whole test host.
throw new IOException($"Pipe '{PipeName}' was closed by the server before a response was received.");
}

// Surface a diagnostic on stderr so the user has a chance to understand why this process is exiting.
// We deliberately use Console.Error (and not stdout) to avoid corrupting any machine-readable output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace Microsoft.Testing.Platform.IPC.Serializers;
* TestInProgressMessagesSerializer: 10
* AzureDevOpsLogMessageSerializer: 11
* DisplayMessageSerializer: 12
* WaitForServerControlRequestSerializer: 13
* ServerControlMessageSerializer: 14
*/

[Embedded]
Expand All @@ -41,5 +43,7 @@ public static void RegisterAllSerializers(this NamedPipeBase namedPipeBase)
namedPipeBase.RegisterSerializer(new TestInProgressMessagesSerializer(), typeof(TestInProgressMessages));
namedPipeBase.RegisterSerializer(new AzureDevOpsLogMessageSerializer(), typeof(AzureDevOpsLogMessage));
namedPipeBase.RegisterSerializer(new DisplayMessageSerializer(), typeof(DisplayMessage));
namedPipeBase.RegisterSerializer(new WaitForServerControlRequestSerializer(), typeof(WaitForServerControlRequest));
namedPipeBase.RegisterSerializer(new ServerControlMessageSerializer(), typeof(ServerControlMessage));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ internal sealed class DotnetTestConnection : IPushOnlyProtocol, IDisposable

private NamedPipeClient? _dotnetTestPipeClient;

private NamedPipeClient? _serverControlPipeClient;
private Task? _serverControlListenerTask;
private CancellationTokenSource? _serverControlListenerCts;
private string? _serverControlPipeName;
private int _cancelRequested;

public static string InstanceId { get; } = Guid.NewGuid().ToString("N");

public DotnetTestConnection(CommandLineHandler commandLineHandler, IEnvironment environment, ITestApplicationModuleInfo testApplicationModuleInfo, ITestApplicationCancellationTokenSource cancellationTokenSource)
Expand Down Expand Up @@ -97,6 +103,12 @@ public async Task HelpInvokedAsync()
// so an older SDK (<= 1.2.0) never receives an unknown message id.
public bool IsDisplayMessageForwardingSupported { get; private set; }

// True once the SDK advertised a reverse "server control" pipe name in its handshake reply. When set, the
// test host opens a NamedPipeClient to that pipe and parks a long-poll so the SDK can push a
// ServerControlMessage (e.g. CancelSession) at any time. The feature is gated on the presence of the handshake
// property (a capability), not on the negotiated version string, so an older SDK simply never enables it.
public bool IsServerControlChannelSupported { get; private set; }

public async Task<bool> IsCompatibleProtocolAsync(string hostType, IReadOnlyDictionary<byte, string>? additionalHandshakeProperties = null)
{
RoslynDebug.Assert(_dotnetTestPipeClient is not null);
Expand Down Expand Up @@ -132,6 +144,13 @@ public async Task<bool> IsCompatibleProtocolAsync(string hostType, IReadOnlyDict
bool.TryParse(isIDEValue, out bool isIDE) &&
isIDE;

if (response.Properties?.TryGetValue(HandshakeMessagePropertyNames.ServerControlPipeName, out string? serverControlPipeName) is true &&
!RoslynString.IsNullOrEmpty(serverControlPipeName))
{
_serverControlPipeName = serverControlPipeName;
IsServerControlChannelSupported = true;
}

if (response.Properties?.TryGetValue(HandshakeMessagePropertyNames.SupportedProtocolVersions, out string? protocolVersion) is true)
{
bool isCompatible = IsVersionCompatible(protocolVersion, supportedProtocolVersions);
Expand Down Expand Up @@ -186,7 +205,161 @@ public async Task SendMessageAsync(IRequest message)
}
}

public Task OnExitAsync() => Task.CompletedTask;
/// <summary>
/// Opens the reverse "server control" pipe the SDK advertised during the handshake and parks a long-poll
/// <see cref="WaitForServerControlRequest"/> on it. The SDK completes that request with a
/// <see cref="ServerControlMessage"/> whenever it wants to signal the test host (today only
/// <see cref="ServerControlKinds.CancelSession"/>). On cancel - or when the control pipe drops, which means
/// the host went away - <paramref name="onCancelSessionRequestedAsync"/> is invoked exactly once so the caller
/// can stop the run cooperatively (preferring a graceful stop so trx/artifacts are still produced).
/// </summary>
/// <param name="onCancelSessionRequestedAsync">
/// The reaction to a server-initiated cancel. It receives the test application cancellation token.
/// </param>
/// <remarks>
/// Both the connect and the long-poll run entirely on a background task, so test start is never blocked on
/// this auxiliary channel. This is best-effort: if the control pipe cannot be established the test run
/// continues unaffected (the feature simply stays off). Callers should only invoke this after a successful
/// handshake and when <see cref="IsServerControlChannelSupported"/> is <see langword="true"/>. It is invoked
/// once per connection on the awaited run path (before the test run starts), so the control-channel fields
/// below are written here and only read later during exit/dispose - the single-threaded lifecycle makes the
/// plain fields safe without extra synchronization.
/// </remarks>
public Task StartServerControlChannelAsync(Func<CancellationToken, Task> onCancelSessionRequestedAsync)
{
    // Nothing to do when the SDK did not advertise a control pipe, or when the channel was already started.
    if (!IsServerControlChannelSupported || RoslynString.IsNullOrEmpty(_serverControlPipeName) || _serverControlPipeClient is not null)
    {
        return Task.CompletedTask;
    }

    CancellationTokenSource listenerCts = CancellationTokenSource.CreateLinkedTokenSource(_cancellationTokenSource.CancellationToken);
    _serverControlListenerCts = listenerCts;

    // Capture the token on the calling thread. The background lambda below may only get scheduled after
    // teardown has already disposed the CancellationTokenSource (e.g. under thread-pool starvation), and
    // reading CancellationTokenSource.Token on a disposed source throws ObjectDisposedException. A token
    // obtained up-front stays usable and still observes the cancellation.
    CancellationToken listenerToken = listenerCts.Token;

    // exitProcessOnConnectionLoss: false - a dropped control pipe must not kill the test host; the listener
    // turns it into a cooperative cancel instead.
    var controlClient = new NamedPipeClient(_serverControlPipeName, _environment, exitProcessOnConnectionLoss: false);
    controlClient.RegisterAllSerializers();
    _serverControlPipeClient = controlClient;

    // Connect AND listen on a background task: we deliberately do not await the connect on the run path so a
    // slow/absent control server can never delay (or fail) test execution start.
    _serverControlListenerTask = Task.Run(
        () => ConnectAndListenForServerControlAsync(controlClient, onCancelSessionRequestedAsync, listenerToken));
    return Task.CompletedTask;
}

// Background entry point for the server-control channel: connects the control client with a bounded
// timeout and, only if the connect succeeded, hands over to the long-poll listen loop.
private async Task ConnectAndListenForServerControlAsync(NamedPipeClient controlClient, Func<CancellationToken, Task> onCancelSessionRequestedAsync, CancellationToken cancellationToken)
{
    bool isConnected;
    try
    {
        // Cap the connect at 30 seconds (still linked to the caller's token) so an advertised pipe that is
        // never served cannot keep this task parked indefinitely.
        using var boundedConnectCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        boundedConnectCts.CancelAfter(TimeSpan.FromSeconds(30));
        await controlClient.ConnectAsync(boundedConnectCts.Token).ConfigureAwait(false);
        isConnected = true;
    }
    catch (Exception)
    {
        // Best-effort: any connect failure simply leaves server-initiated cancel disabled; the test run
        // itself is not affected.
        isConnected = false;
    }

    if (!isConnected)
    {
        return;
    }

    await ListenForServerControlAsync(controlClient, onCancelSessionRequestedAsync, cancellationToken).ConfigureAwait(false);
}

// Long-poll loop over the control pipe. Each iteration sends a WaitForServerControlRequest and awaits the
// ServerControlMessage reply. A CancelSession reply triggers the cancel callback (at most once, via
// RequestCancelOnceAsync) and ends the loop; any other kind is ignored and the loop parks again.
// The order of the catch clauses matters: OperationCanceledException is handled first, then other failures
// are split on whether the token was already cancelled when the exception surfaced.
private async Task ListenForServerControlAsync(NamedPipeClient controlClient, Func<CancellationToken, Task> onCancelSessionRequestedAsync, CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
// Park a request on the pipe; this await completes when the peer replies, the token is cancelled,
// or the request fails.
ServerControlMessage message = await controlClient.RequestReplyAsync<WaitForServerControlRequest, ServerControlMessage>(
WaitForServerControlRequest.CachedInstance, cancellationToken).ConfigureAwait(false);

if (message.Kind == ServerControlKinds.CancelSession)
{
await RequestCancelOnceAsync(onCancelSessionRequestedAsync).ConfigureAwait(false);
return;
}

// NOTE(review): the next line is a member declaration sitting inside the loop body; it looks like a
// removed line left over from the diff rendering rather than part of this method - confirm against the
// real file.
public void Dispose() => _dotnetTestPipeClient?.Dispose();
// Unknown control kind (forward-compat): ignore it and keep parking for the next signal.
}
}
catch (OperationCanceledException)
{
// We are shutting down (dispose or app cancellation) - nothing to do.
}
catch (Exception) when (!cancellationToken.IsCancellationRequested)
{
// The control pipe dropped while the session was still live => the host went away. Treat it as a
// cooperative cancel so we still try to wind down and report whatever completed. NOTE: this makes it a
// protocol requirement that the SDK keep the control pipe open until the data session ends - an early
// close for any reason is interpreted here as a cancel.
await RequestCancelOnceAsync(onCancelSessionRequestedAsync).ConfigureAwait(false);
}
catch (Exception)
{
// Cancellation raced with a pipe error (e.g. the stream was disposed during teardown); ignore.
}
}

// Invokes the cancel callback at most once per connection, no matter how many code paths request it.
private async Task RequestCancelOnceAsync(Func<CancellationToken, Task> onCancelSessionRequestedAsync)
{
    // Atomically flip the flag to 1; only the caller that observed the previous value 0 proceeds.
    int previousFlag = Interlocked.Exchange(ref _cancelRequested, 1);
    if (previousFlag == 0)
    {
        // The callback receives the application-level cancellation token held by this connection.
        await onCancelSessionRequestedAsync(_cancellationTokenSource.CancellationToken).ConfigureAwait(false);
    }
}

// Exit-time shutdown of the server-control listener: cancel its token, dispose the control pipe client,
// then wait (bounded) for the background listener task to finish.
public async Task OnExitAsync()
{
    CancellationTokenSource? listenerCts = _serverControlListenerCts;
#if NET
    if (listenerCts is not null)
    {
        await listenerCts.CancelAsync().ConfigureAwait(false);
    }
#else
#pragma warning disable VSTHRD103 // CancellationTokenSource.CancelAsync is not available on this target framework.
    listenerCts?.Cancel();
#pragma warning restore VSTHRD103
#endif

    // Token cancellation is sufficient to abort an in-flight named-pipe read on modern .NET, but on .NET
    // Framework an already-parked overlapped read is not reliably cancellable - disposing the stream is what
    // unblocks it. Cancelling first (above) makes the listener classify the resulting failure as shutdown
    // instead of "host gone => cancel".
    _serverControlPipeClient?.Dispose();

    Task? pendingListener = _serverControlListenerTask;
    if (pendingListener is null)
    {
        return;
    }

    // Wait at most 5 seconds so a stuck listener can never hang the exit path on any target framework.
    Task exitDeadline = Task.Delay(TimeSpan.FromSeconds(5));
    await Task.WhenAny(pendingListener, exitDeadline).ConfigureAwait(false);
}

// Synchronous teardown: stops the server-control listener, then releases both pipe clients.
public void Dispose()
{
    try
    {
        _serverControlListenerCts?.Cancel();
    }
    catch (ObjectDisposedException)
    {
        // An earlier Dispose call already disposed the CancellationTokenSource (see the end of this method).
        // Cancel() throws on a disposed source; swallow that so a repeated Dispose does not throw here.
    }

    // Disposing the pipe client forces a parked read to abort even where token cancellation alone would not.
    _serverControlPipeClient?.Dispose();

    if (_serverControlListenerTask is { } listenerTask)
    {
        try
        {
            // Bounded wait: the cancel + dispose above unblock the parked read quickly. Never hang exit.
            listenerTask.Wait(TimeSpan.FromSeconds(5));
        }
        catch (Exception)
        {
            // Best-effort shutdown: a faulted or cancelled listener must not fail disposal.
        }
    }

    _serverControlListenerCts?.Dispose();
    _dotnetTestPipeClient?.Dispose();
}
}
Loading
Loading