diff --git a/src/Platform/Microsoft.Testing.Extensions.TrxReport/Streaming/TrxResultStreamingStore.cs b/src/Platform/Microsoft.Testing.Extensions.TrxReport/Streaming/TrxResultStreamingStore.cs index 9a514d8e13..b488fd4293 100644 --- a/src/Platform/Microsoft.Testing.Extensions.TrxReport/Streaming/TrxResultStreamingStore.cs +++ b/src/Platform/Microsoft.Testing.Extensions.TrxReport/Streaming/TrxResultStreamingStore.cs @@ -66,7 +66,10 @@ internal TrxResultStreamingStore(string filePath, IFileSystem fileSystem, ITask _logger = logger; _batchSize = batchSize; _flushIntervalMs = flushIntervalMs; - _writerTask = task.Run(WriteLoopAsync, CancellationToken.None); + // BlockingCollection.TryTake blocks the calling thread for up to _flushIntervalMs when + // the queue is idle. Running the writer on a dedicated long-running thread instead of the + // shared threadpool keeps it from starving threadpool consumers while it sleeps on the queue. + _writerTask = task.RunLongRunning(WriteLoopAsync, "TRX streaming store writer", CancellationToken.None); } /// @@ -158,7 +161,8 @@ await _logger.LogWarningAsync( } } - private async Task WriteLoopAsync() +#pragma warning disable VSTHRD103 // The writer runs on a dedicated long-running thread; synchronous waits keep queue polling off the threadpool. + private Task WriteLoopAsync() { var batch = new List(_batchSize); try @@ -179,7 +183,7 @@ private async Task WriteLoopAsync() batch.Add(next); } - await WriteBatchAsync(batch).ConfigureAwait(false); + WriteBatch(batch); batch.Clear(); } } @@ -216,32 +220,29 @@ private async Task WriteLoopAsync() Interlocked.Add(ref _droppedCount, discarded); } - await _logger.LogErrorAsync( + _logger.LogErrorAsync( $"TRX streaming store writer faulted; intermediate file may be incomplete. {discarded} record(s) were dropped from the in-memory queue and will not appear in the TRX.", - ex).ConfigureAwait(false); + ex).GetAwaiter().GetResult(); } finally { try { - if (_fileStream is not null) - { - await _fileStream.Stream.FlushAsync().ConfigureAwait(false); - } + _fileStream?.Stream.Flush(); -#pragma warning disable VSTHRD103 // BinaryWriter / IFileStream do not implement IAsyncDisposable. _writer?.Dispose(); _fileStream?.Dispose(); -#pragma warning restore VSTHRD103 } catch (Exception ex) { - await _logger.LogErrorAsync("Failed to close TRX streaming store file.", ex).ConfigureAwait(false); + _logger.LogErrorAsync("Failed to close TRX streaming store file.", ex).GetAwaiter().GetResult(); } } + + return Task.CompletedTask; } - private async Task WriteBatchAsync(List batch) + private void WriteBatch(List batch) { EnsureFileOpen(); ApplicationStateGuard.Ensure(_writer is not null); @@ -259,7 +260,7 @@ private async Task WriteBatchAsync(List batch) long preRecordPosition = rawStream.Position; try { - await WriteRecordWithRetryAsync(_writer, batch[i], rawStream, preRecordPosition).ConfigureAwait(false); + WriteRecordWithRetry(_writer, batch[i], rawStream, preRecordPosition); written++; } catch (Exception ex) @@ -267,9 +268,9 @@ private async Task WriteBatchAsync(List batch) int unwritten = batch.Count - written; Interlocked.Add(ref _droppedCount, unwritten); - await _logger.LogErrorAsync( + _logger.LogErrorAsync( $"Failed to write TRX record {i + 1}/{batch.Count} after {MaxWriteRetries} retries; truncating to last good record. {unwritten} record(s) from this batch will not appear in the TRX.", - ex).ConfigureAwait(false); + ex).GetAwaiter().GetResult(); try { rawStream.Seek(preRecordPosition, SeekOrigin.Begin); @@ -298,9 +299,9 @@ await _logger.LogErrorAsync( Interlocked.Add(ref _droppedCount, additionalDropped); } - await _logger.LogErrorAsync( + _logger.LogErrorAsync( $"Failed to truncate TRX streaming store after write failure; marking store faulted. {additionalDropped} additional record(s) from the queue were dropped.", - truncEx).ConfigureAwait(false); + truncEx).GetAwaiter().GetResult(); return; } @@ -312,11 +313,11 @@ await _logger.LogErrorAsync( try { - await rawStream.FlushAsync().ConfigureAwait(false); + rawStream.Flush(); } catch (Exception ex) { - await _logger.LogErrorAsync("Failed to flush TRX streaming store; records remain in OS buffer.", ex).ConfigureAwait(false); + _logger.LogErrorAsync("Failed to flush TRX streaming store; records remain in OS buffer.", ex).GetAwaiter().GetResult(); } if (written > 0) @@ -325,7 +326,7 @@ await _logger.LogErrorAsync( } } - private async Task WriteRecordWithRetryAsync(BinaryWriter writer, TrxTestResult record, Stream rawStream, long preRecordPosition) + private void WriteRecordWithRetry(BinaryWriter writer, TrxTestResult record, Stream rawStream, long preRecordPosition) { // Mirrors the retry policy of TrxReportEngine.RetryWhenIOExceptionAsync but bounded so a // permanently broken file does not stall the writer indefinitely. Critically: each retry @@ -357,21 +358,22 @@ private async Task WriteRecordWithRetryAsync(BinaryWriter writer, TrxTestResult } catch { - ExceptionDispatchInfo.Capture(lastError).Throw(); + ExceptionDispatchInfo.Capture(lastError!).Throw(); } try { - await _task.Delay(TimeSpan.FromMilliseconds(RetryBaseDelayMs * attempt), _disposeCts.Token).ConfigureAwait(false); + _task.Delay(TimeSpan.FromMilliseconds(RetryBaseDelayMs * attempt), _disposeCts.Token).GetAwaiter().GetResult(); } catch (OperationCanceledException) { - ExceptionDispatchInfo.Capture(lastError).Throw(); + ExceptionDispatchInfo.Capture(lastError!).Throw(); } } ExceptionDispatchInfo.Capture(lastError!).Throw(); } +#pragma warning restore VSTHRD103 // Must only be called from the writer thread. Lazy because most test runs may not produce results // before they hit cancellation/discovery; we don't want to provision a file we never use. diff --git a/src/Platform/Microsoft.Testing.Extensions.TrxReport/Streaming/TrxTestResultExtractor.cs b/src/Platform/Microsoft.Testing.Extensions.TrxReport/Streaming/TrxTestResultExtractor.cs index 5d15d51f14..9865f9b15b 100644 --- a/src/Platform/Microsoft.Testing.Extensions.TrxReport/Streaming/TrxTestResultExtractor.cs +++ b/src/Platform/Microsoft.Testing.Extensions.TrxReport/Streaming/TrxTestResultExtractor.cs @@ -10,7 +10,7 @@ namespace Microsoft.Testing.Extensions.TrxReport.Abstractions.Streaming; internal static class TrxTestResultExtractor { // Cap individual stdout/stderr/stack-trace fields when projecting into the streaming DTO. - // A single TRX result with multi-MB output trips the serializer's 16 MiB record cap (which exists + // A single TRX result with multi-MB output trips the serializer's 64 MiB record cap (which exists // to detect corruption), at which point ReadAll cannot continue past the offending record because // there is no sync marker. Truncating at the source preserves the rest of the run; downstream TRX // consumers (Azure DevOps, VS) also choke on multi-MB output fields. The chosen cap is well below