Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ internal TrxResultStreamingStore(string filePath, IFileSystem fileSystem, ITask
_logger = logger;
_batchSize = batchSize;
_flushIntervalMs = flushIntervalMs;
_writerTask = task.Run(WriteLoopAsync, CancellationToken.None);
// BlockingCollection<T>.TryTake blocks the calling thread for up to _flushIntervalMs when
// the queue is idle. Running the writer on a dedicated long-running thread instead of the
// shared threadpool keeps it from starving threadpool consumers while it sleeps on the queue.
_writerTask = task.RunLongRunning(WriteLoopAsync, "TRX streaming store writer", CancellationToken.None);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The awaits in WriteLoopAsync might get you back on threadpool so I don't think this is a good fix.

Comment on lines +69 to +72
}

/// <summary>
Expand Down Expand Up @@ -158,7 +161,8 @@ await _logger.LogWarningAsync(
}
}

private async Task WriteLoopAsync()
#pragma warning disable VSTHRD103 // The writer runs on a dedicated long-running thread; synchronous waits keep queue polling off the threadpool.
private Task WriteLoopAsync()
{
var batch = new List<TrxTestResult>(_batchSize);
try
Expand All @@ -179,7 +183,7 @@ private async Task WriteLoopAsync()
batch.Add(next);
}

await WriteBatchAsync(batch).ConfigureAwait(false);
WriteBatch(batch);
batch.Clear();
}
}
Expand Down Expand Up @@ -216,32 +220,29 @@ private async Task WriteLoopAsync()
Interlocked.Add(ref _droppedCount, discarded);
}

await _logger.LogErrorAsync(
_logger.LogErrorAsync(
$"TRX streaming store writer faulted; intermediate file may be incomplete. {discarded} record(s) were dropped from the in-memory queue and will not appear in the TRX.",
ex).ConfigureAwait(false);
ex).GetAwaiter().GetResult();
Comment on lines +223 to +225
}
finally
{
try
{
if (_fileStream is not null)
{
await _fileStream.Stream.FlushAsync().ConfigureAwait(false);
}
_fileStream?.Stream.Flush();

#pragma warning disable VSTHRD103 // BinaryWriter / IFileStream do not implement IAsyncDisposable.
_writer?.Dispose();
_fileStream?.Dispose();
#pragma warning restore VSTHRD103
}
catch (Exception ex)
{
await _logger.LogErrorAsync("Failed to close TRX streaming store file.", ex).ConfigureAwait(false);
_logger.LogErrorAsync("Failed to close TRX streaming store file.", ex).GetAwaiter().GetResult();
}
}

return Task.CompletedTask;
}

private async Task WriteBatchAsync(List<TrxTestResult> batch)
private void WriteBatch(List<TrxTestResult> batch)
{
EnsureFileOpen();
ApplicationStateGuard.Ensure(_writer is not null);
Expand All @@ -259,17 +260,17 @@ private async Task WriteBatchAsync(List<TrxTestResult> batch)
long preRecordPosition = rawStream.Position;
try
{
await WriteRecordWithRetryAsync(_writer, batch[i], rawStream, preRecordPosition).ConfigureAwait(false);
WriteRecordWithRetry(_writer, batch[i], rawStream, preRecordPosition);
written++;
}
catch (Exception ex)
{
int unwritten = batch.Count - written;
Interlocked.Add(ref _droppedCount, unwritten);

await _logger.LogErrorAsync(
_logger.LogErrorAsync(
$"Failed to write TRX record {i + 1}/{batch.Count} after {MaxWriteRetries} retries; truncating to last good record. {unwritten} record(s) from this batch will not appear in the TRX.",
ex).ConfigureAwait(false);
ex).GetAwaiter().GetResult();
try
{
rawStream.Seek(preRecordPosition, SeekOrigin.Begin);
Expand Down Expand Up @@ -298,9 +299,9 @@ await _logger.LogErrorAsync(
Interlocked.Add(ref _droppedCount, additionalDropped);
}

await _logger.LogErrorAsync(
_logger.LogErrorAsync(
$"Failed to truncate TRX streaming store after write failure; marking store faulted. {additionalDropped} additional record(s) from the queue were dropped.",
truncEx).ConfigureAwait(false);
truncEx).GetAwaiter().GetResult();
return;
}

Expand All @@ -312,11 +313,11 @@ await _logger.LogErrorAsync(

try
{
await rawStream.FlushAsync().ConfigureAwait(false);
rawStream.Flush();
}
catch (Exception ex)
{
await _logger.LogErrorAsync("Failed to flush TRX streaming store; records remain in OS buffer.", ex).ConfigureAwait(false);
_logger.LogErrorAsync("Failed to flush TRX streaming store; records remain in OS buffer.", ex).GetAwaiter().GetResult();
}

if (written > 0)
Expand All @@ -325,7 +326,7 @@ await _logger.LogErrorAsync(
}
}

private async Task WriteRecordWithRetryAsync(BinaryWriter writer, TrxTestResult record, Stream rawStream, long preRecordPosition)
private void WriteRecordWithRetry(BinaryWriter writer, TrxTestResult record, Stream rawStream, long preRecordPosition)
{
// Mirrors the retry policy of TrxReportEngine.RetryWhenIOExceptionAsync but bounded so a
// permanently broken file does not stall the writer indefinitely. Critically: each retry
Expand Down Expand Up @@ -357,21 +358,22 @@ private async Task WriteRecordWithRetryAsync(BinaryWriter writer, TrxTestResult
}
catch
{
ExceptionDispatchInfo.Capture(lastError).Throw();
ExceptionDispatchInfo.Capture(lastError!).Throw();
}

try
{
await _task.Delay(TimeSpan.FromMilliseconds(RetryBaseDelayMs * attempt), _disposeCts.Token).ConfigureAwait(false);
_task.Delay(TimeSpan.FromMilliseconds(RetryBaseDelayMs * attempt), _disposeCts.Token).GetAwaiter().GetResult();
}
catch (OperationCanceledException)
{
ExceptionDispatchInfo.Capture(lastError).Throw();
ExceptionDispatchInfo.Capture(lastError!).Throw();
}
}

ExceptionDispatchInfo.Capture(lastError!).Throw();
}
#pragma warning restore VSTHRD103

// Must only be called from the writer thread. Lazy because most test runs may not produce results
// before they hit cancellation/discovery; we don't want to provision a file we never use.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Microsoft.Testing.Extensions.TrxReport.Abstractions.Streaming;
internal static class TrxTestResultExtractor
{
// Cap individual stdout/stderr/stack-trace fields when projecting into the streaming DTO.
// A single TRX result with multi-MB output trips the serializer's 16 MiB record cap (which exists
// A single TRX result with multi-MB output trips the serializer's 64 MiB record cap (which exists
// to detect corruption), at which point ReadAll cannot continue past the offending record because
// there is no sync marker. Truncating at the source preserves the rest of the run; downstream TRX
// consumers (Azure DevOps, VS) also choke on multi-MB output fields. The chosen cap is well below
Expand Down
Loading