Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions .github/workflows/dotnetcore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ jobs:
steps:

- name: Check out code onto host
uses: actions/checkout@v4
uses: actions/checkout@v6

- name: Setup .Net 6.0
uses: actions/setup-dotnet@v4
uses: actions/setup-dotnet@v5
with:
dotnet-version: '6.0.x' # SDK Version to use.

- name: Setup .Net 8.0
uses: actions/setup-dotnet@v4
uses: actions/setup-dotnet@v5
with:
dotnet-version: '8.0.x' # SDK Version to use.

- name: Setup .Net 9.0
uses: actions/setup-dotnet@v4
uses: actions/setup-dotnet@v5
with:
dotnet-version: '9.0.x' # SDK Version to use.

Expand Down Expand Up @@ -58,15 +58,15 @@ jobs:
if: matrix.framework == 'net80'

- name: Upload dotnet test results
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v5
with:
name: dotnet-results-${{ matrix.os }}-${{ matrix.framework }}
path: TestResults-${{ matrix.os }}-${{ matrix.framework }}
# Use always() to always run this step to publish test results when there are test failures
if: ${{ always() }}

- name: Upload BenchmarkDotNet results
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v5
with:
name: BenchmarkDotNet-${{ matrix.os }}-${{ matrix.framework }}
path: BenchmarkDotNet.Artifacts
Expand All @@ -84,13 +84,13 @@ jobs:
steps:

- name: Check out code onto host
uses: actions/checkout@v4
uses: actions/checkout@v6

- name: Add msbuild to PATH
uses: microsoft/setup-msbuild@v1.3.1
uses: microsoft/setup-msbuild@v2

- name: Setup .Net 8.0
uses: actions/setup-dotnet@v4
uses: actions/setup-dotnet@v5
with:
dotnet-version: '8.0.x' # SDK Version to use.

Expand All @@ -114,7 +114,7 @@ jobs:
dotnet test --no-restore --configuration Release --verbosity normal --framework=${{ matrix.framework }} --logger trx --results-directory "TestResults-${{ matrix.os }}-${{ matrix.framework }}"

- name: Upload dotnet test results
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v5
with:
name: dotnet-results-windows-latest-${{ matrix.framework }}
path: TestResults-windows-latest-${{ matrix.framework }}
Expand Down
8 changes: 2 additions & 6 deletions src/CSRakowski.Parallel/CSRakowski.Parallel.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net472;net80;net60;netstandard2.1;netstandard2.0;netstandard1.1</TargetFrameworks>
<TargetFrameworks>net80;net472;netstandard2.0</TargetFrameworks>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Company />
<PackageId>CSRakowski.ParallelAsync</PackageId>
Expand All @@ -26,11 +26,7 @@
</PropertyGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0' or '$(TargetFramework)' == 'net472'">
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="6.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netstandard1.1'">
<PackageReference Include="CSRakowski.AsyncStreamsPreparations" Version="1.6.0" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="[6.0.0,)" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

namespace CSRakowski.Parallel.Extensions
{
#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER

/// <summary>
/// Extension methods to allow using the functionalities of <see cref="ParallelAsync"/> with a fluent syntax
/// </summary>
Expand All @@ -28,7 +26,7 @@ public static partial class ParallelAsyncEx
/// <returns>The results of the operations</returns>
/// <exception cref="ArgumentNullException">Thrown when either <paramref name="parallelAsync"/> or <paramref name="func"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">Thrown when the configured maximum batch size is a negative number.</exception>
public static IAsyncEnumerable<TResult> ForEachAsyncStream<TInput, TResult>(this IParallelAsyncEnumerable<TInput> parallelAsync, Func<TInput, Task<TResult>> func, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public static IAsyncEnumerable<TResult> ForEachAsyncStream<TInput, TResult>(this IParallelAsyncEnumerable<TInput> parallelAsync, Func<TInput, Task<TResult>> func, CancellationToken cancellationToken = default)
{
var obj = EnsureValidEnumerable(parallelAsync);

Expand All @@ -53,7 +51,7 @@ public static IAsyncEnumerable<TResult> ForEachAsyncStream<TInput, TResult>(this
/// <returns>The results of the operations</returns>
/// <exception cref="ArgumentNullException">Thrown when either <paramref name="parallelAsync"/> or <paramref name="func"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">Thrown when the configured maximum batch size is a negative number.</exception>
public static IAsyncEnumerable<TResult> ForEachAsyncStream<TInput, TResult>(this IParallelAsyncEnumerable<TInput> parallelAsync, Func<TInput, CancellationToken, Task<TResult>> func, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public static IAsyncEnumerable<TResult> ForEachAsyncStream<TInput, TResult>(this IParallelAsyncEnumerable<TInput> parallelAsync, Func<TInput, CancellationToken, Task<TResult>> func, CancellationToken cancellationToken = default)
{
var obj = EnsureValidEnumerable(parallelAsync);

Expand All @@ -69,7 +67,4 @@ public static IAsyncEnumerable<TResult> ForEachAsyncStream<TInput, TResult>(this

#endregion ForEachAsyncStream overloads
}

#endif //NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER

}
31 changes: 23 additions & 8 deletions src/CSRakowski.Parallel/ParallelAsync.AsyncStreams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

namespace CSRakowski.Parallel
{
#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER

public static partial class ParallelAsync
{
#region IEnumerable<T>
Expand Down Expand Up @@ -41,12 +39,16 @@ public static partial class ParallelAsync
/// Setting this value to low, will mean a too small list will be allocated and you will have to pay a small performance hit for the resizing of the list during execution.
/// </para>
/// </remarks>
public static IAsyncEnumerable<TResult> ForEachAsyncStream<TResult, TIn>(IEnumerable<TIn> collection, Func<TIn, Task<TResult>> func, int maxBatchSize = 0, bool allowOutOfOrderProcessing = false, int estimatedResultSize = 0, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public static IAsyncEnumerable<TResult> ForEachAsyncStream<TResult, TIn>(IEnumerable<TIn> collection, Func<TIn, Task<TResult>> func, int maxBatchSize = 0, bool allowOutOfOrderProcessing = false, int estimatedResultSize = 0, CancellationToken cancellationToken = default)
{
#if NET8_0_OR_GREATER
ArgumentNullException.ThrowIfNull(func);
#else
if (func == null)
{
throw new ArgumentNullException(nameof(func));
}
#endif //NET8_0_OR_GREATER

var funcWithCancellationToken = WrapFunc(func);
return ForEachAsyncStream<TResult, TIn>(collection, funcWithCancellationToken, maxBatchSize, allowOutOfOrderProcessing, estimatedResultSize, cancellationToken);
Expand Down Expand Up @@ -80,8 +82,12 @@ public static IAsyncEnumerable<TResult> ForEachAsyncStream<TResult, TIn>(IEnumer
/// Setting this value to low, will mean a too small list will be allocated and you will have to pay a small performance hit for the resizing of the list during execution.
/// </para>
/// </remarks>
public static IAsyncEnumerable<TResult> ForEachAsyncStream<TResult, TIn>(IEnumerable<TIn> collection, Func<TIn, CancellationToken, Task<TResult>> func, int maxBatchSize = 0, bool allowOutOfOrderProcessing = false, int estimatedResultSize = 0, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public static IAsyncEnumerable<TResult> ForEachAsyncStream<TResult, TIn>(IEnumerable<TIn> collection, Func<TIn, CancellationToken, Task<TResult>> func, int maxBatchSize = 0, bool allowOutOfOrderProcessing = false, int estimatedResultSize = 0, CancellationToken cancellationToken = default)
{
#if NET8_0_OR_GREATER
ArgumentNullException.ThrowIfNull(collection);
ArgumentNullException.ThrowIfNull(func);
#else
if (collection == null)
{
throw new ArgumentNullException(nameof(collection));
Expand All @@ -91,6 +97,7 @@ public static IAsyncEnumerable<TResult> ForEachAsyncStream<TResult, TIn>(IEnumer
{
throw new ArgumentNullException(nameof(func));
}
#endif //NET8_0_OR_GREATER

var maxBatchSizeToUse = DetermineBatchSizeToUse(maxBatchSize);

Expand Down Expand Up @@ -319,12 +326,16 @@ private static async IAsyncEnumerable<TResult> ForEachAsyncStreamImplUnordered<T
/// Setting this value to low, will mean a too small list will be allocated and you will have to pay a small performance hit for the resizing of the list during execution.
/// </para>
/// </remarks>
public static IAsyncEnumerable<TResult> ForEachAsyncStream<TResult, TIn>(IAsyncEnumerable<TIn> collection, Func<TIn, Task<TResult>> func, int maxBatchSize = 0, bool allowOutOfOrderProcessing = false, int estimatedResultSize = 0, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public static IAsyncEnumerable<TResult> ForEachAsyncStream<TResult, TIn>(IAsyncEnumerable<TIn> collection, Func<TIn, Task<TResult>> func, int maxBatchSize = 0, bool allowOutOfOrderProcessing = false, int estimatedResultSize = 0, CancellationToken cancellationToken = default)
{
#if NET8_0_OR_GREATER
ArgumentNullException.ThrowIfNull(func);
#else
if (func == null)
{
throw new ArgumentNullException(nameof(func));
}
#endif //NET8_0_OR_GREATER

var funcWithCancellationToken = WrapFunc(func);
return ForEachAsyncStream<TResult, TIn>(collection, funcWithCancellationToken, maxBatchSize, allowOutOfOrderProcessing, estimatedResultSize, cancellationToken);
Expand Down Expand Up @@ -358,8 +369,12 @@ public static IAsyncEnumerable<TResult> ForEachAsyncStream<TResult, TIn>(IAsyncE
/// Setting this value to low, will mean a too small list will be allocated and you will have to pay a small performance hit for the resizing of the list during execution.
/// </para>
/// </remarks>
public static IAsyncEnumerable<TResult> ForEachAsyncStream<TResult, TIn>(IAsyncEnumerable<TIn> collection, Func<TIn, CancellationToken, Task<TResult>> func, int maxBatchSize = 0, bool allowOutOfOrderProcessing = false, int estimatedResultSize = 0, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public static IAsyncEnumerable<TResult> ForEachAsyncStream<TResult, TIn>(IAsyncEnumerable<TIn> collection, Func<TIn, CancellationToken, Task<TResult>> func, int maxBatchSize = 0, bool allowOutOfOrderProcessing = false, int estimatedResultSize = 0, CancellationToken cancellationToken = default)
{
#if NET8_0_OR_GREATER
ArgumentNullException.ThrowIfNull(collection);
ArgumentNullException.ThrowIfNull(func);
#else
if (collection == null)
{
throw new ArgumentNullException(nameof(collection));
Expand All @@ -369,6 +384,7 @@ public static IAsyncEnumerable<TResult> ForEachAsyncStream<TResult, TIn>(IAsyncE
{
throw new ArgumentNullException(nameof(func));
}
#endif //NET8_0_OR_GREATER

int maxBatchSizeToUse = DetermineBatchSizeToUse(maxBatchSize);

Expand Down Expand Up @@ -555,8 +571,7 @@ private static async IAsyncEnumerable<TResult> ForEachAsyncStreamImplUnorderedAs
ParallelAsyncEventSource.Log.RunStop(runId);
}

#endregion IAsyncEnumerable<T>
#endregion IAsyncEnumerable<T>
}

#endif //NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,11 @@ public async Task<int> IEnumerable_ForEachAsyncStream()
{
int count = 0;

#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER

await foreach (var r in ParallelAsync.ForEachAsyncStream(InputNumbers, TestFunctions.JustAddOne_WithCancellationToken, MaxBatchSize, AllowOutOfOrder, NumberOfItemsInCollection, CancellationToken.None))
{
count++;
}

#else

await Task.CompletedTask;

#endif //NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER

return count;
}

Expand All @@ -86,18 +78,10 @@ public async Task<int> IAsyncEnumerable_ForEachAsyncStream()
{
int count = 0;

#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER

await foreach (var r in ParallelAsync.ForEachAsyncStream(InputNumbersAsync, TestFunctions.JustAddOne_WithCancellationToken, MaxBatchSize, AllowOutOfOrder, NumberOfItemsInCollection, CancellationToken.None))
{
count++;
}

#else

await Task.CompletedTask;

#endif //NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER

return count;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.15.0" />
<PackageReference Include="BenchmarkDotNet" Version="0.15.6" />
<PackageReference Include="CSRakowski.AsyncStreamsPreparations" Version="1.6.0" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFrameworkIdentifier)' == '.NETFramework' ">
<PackageReference Include="BenchmarkDotNet.Diagnostics.Windows" Version="0.15.0" />
<PackageReference Include="BenchmarkDotNet.Diagnostics.Windows" Version="0.15.6" />
</ItemGroup>

<ItemGroup>
Expand Down
12 changes: 9 additions & 3 deletions tests/CSRakowski.Parallel.Tests/CSRakowski.Parallel.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net48;net472;net90;net80;net60</TargetFrameworks>
Expand All @@ -11,9 +11,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.0" />
<PackageReference Include="xunit" Version="2.9.3" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.8">
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand All @@ -24,6 +23,13 @@
<PackageReference Include="CSRakowski.AsyncStreamsPreparations" Version="1.6.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net60'">
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' != 'net60'">
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="18.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\CSRakowski.Parallel\CSRakowski.Parallel.csproj" />
</ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions tests/CSRakowski.Parallel.Tests/ExtensionMethodsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ await parallelAsync.ForEachAsync((el) => {
Interlocked.Add(ref sum, el);
Interlocked.Increment(ref count);

return TaskHelper.CompletedTask;
return Task.CompletedTask;
});

Assert.Equal(55, sum);
Expand All @@ -106,7 +106,7 @@ await parallelAsync.ForEachAsync((el, ct) => {
Interlocked.Add(ref sum, el);
Interlocked.Increment(ref count);

return TaskHelper.CompletedTask;
return Task.CompletedTask;
});

Assert.Equal(55, sum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

namespace CSRakowski.Parallel.Tests
{
#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER

[Collection("ParallelAsync AsyncStreams Extension Methods Tests")]
public class ExtensionMethodsTests_AsyncStreams
{
Expand Down Expand Up @@ -128,7 +126,4 @@ public async Task ParallelAsync_Can_Chain_Together_AsyncStreams()
}
}
}

#endif //NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ await parallelAsync.ForEachAsync((el) => {
Interlocked.Add(ref sum, el);
Interlocked.Increment(ref count);

return TaskHelper.CompletedTask;
return Task.CompletedTask;
});

Assert.Equal(55, sum);
Expand All @@ -108,7 +108,7 @@ await parallelAsync.ForEachAsync((el, ct) => {
Interlocked.Add(ref sum, el);
Interlocked.Increment(ref count);

return TaskHelper.CompletedTask;
return Task.CompletedTask;
});

Assert.Equal(55, sum);
Expand Down
Loading
Loading