« Task » : différence entre les versions
(120 versions intermédiaires par le même utilisateur non affichées) | |||
Ligne 1 : | Ligne 1 : | ||
[[Category:CSharp]] | [[Category:CSharp]] | ||
= | = Links = | ||
* [[LINQ#Select_async|Select async]] | |||
* [[LINQ#Select_async|SelectMany async]] | |||
= | = Task = | ||
A Task represents an asynchronous operation. | |||
<kode lang= | <kode lang=cs> | ||
// | // create and run a task in a new thread | ||
var result = await Task.Run(async () => | |||
{ | |||
await Task.Delay(4000); | |||
return 0; | |||
}); | |||
</kode> | |||
// | = [https://stackoverflow.com/questions/27464287/what-is-the-difference-between-await-taskt-and-taskt-result await vs Result] = | ||
{| class="wikitable wtp" | |||
! await | |||
! Result | |||
|- | |||
| asynchronous wait || blocking wait | |||
|- | |||
| "yield" the current thread || block the current thread | |||
|- | |||
| (re-)raise the exception || wrap the exception in an AggregateException | |||
|} | |||
= WhenAll = | |||
{{info | The order of the output of WhenAll is the same as the order of the input tasks.}} | |||
<kode lang='cs'> | |||
var inputAndTask1 = new { Input = 1, Task = GetDataAsync(1) }; | |||
var inputAndTask2 = new { Input = 2, Task = GetDataAsync(2) }; | |||
var inputAndTask3 = new { Input = 3, Task = GetDataAsync(3) }; | |||
await Task.WhenAll(task1.Task, task2.Task, task3.Task); | |||
Task | |||
// Result allow to acces to the result of the task without blocking because the WhenAll ensure all the tasks are finished | |||
Console.WriteLine($"Input: {inputAndTask1.Input} - Output: {inputAndTask1.Task.Result}")); | |||
Console.WriteLine($"Input: {inputAndTask2.Input} - Output: {inputAndTask2.Task.Result}")); | |||
. | Console.WriteLine($"Input: {inputAndTask3.Input} - Output: {inputAndTask3.Task.Result}")); | ||
Task<int> | async Task<int> GetDataAsync(int i) | ||
{ | { | ||
Console.WriteLine(" | await Task.Delay(2000); | ||
} | Console.WriteLine($"{DateTime.Now} - GetDataAsync - {i}"); | ||
return i * 2; | |||
} | |||
</kode> | |||
<kode lang='cs'> | |||
{ | var inputAndTasks = Enumerable.Range(1, 10) | ||
.Select(i => new { Input = i, Task = GetDataAsync(i) }) | |||
.ToList(); | |||
await Task.WhenAll(inputAndTasks.Select(x => x.Task)); | |||
// Result allow to acces to the result of the task without blocking because the WhenAll ensure all the tasks are finished | |||
inputAndTasks.ForEach(x => Console.WriteLine($"{x.Input} - {x.Task.Result}")); | |||
</kode> | </kode> | ||
<kode lang='cs'> | <kode lang='cs'> | ||
var inputs = Enumerable.Range(1, 10).ToArray(); | |||
var outputs = await Task.WhenAll(inputs.Select(GetDataAsync)); | |||
// | // as the order is the same, you can Zip | ||
var inputAndOutputs = inputs | |||
.Zip(outputs, (input, output) => (input, output)) | |||
.ToArray(); | |||
inputAndOutputs.ForEach(x => Console.WriteLine($"{input} - {output}")); | |||
</kode> | </kode> | ||
= | = Cancel a task = | ||
<kode lang='cs'> | <kode lang='cs'> | ||
var cancellationTokenSource = new CancellationTokenSource(); | var cancellationTokenSource = new CancellationTokenSource(5000); // cancel after 5s | ||
CancellationToken token = cancellationTokenSource.Token; | CancellationToken token = cancellationTokenSource.Token; | ||
try | |||
{ | { | ||
await Task.Run(async () => | |||
{ | { | ||
Console.Write(" | var i = 1; | ||
while (!token.IsCancellationRequested) // break the loop if cancellation is requested | |||
{ | |||
Console.Write($"{i++} "); | |||
} | await Task.Delay(1000); | ||
} | |||
token.ThrowIfCancellationRequested(); // or throw an OperationCanceledException | |||
}, token); | |||
} | } | ||
catch ( | catch (OperationCanceledException e) | ||
{ | { | ||
Console.WriteLine(e | Console.WriteLine(e.Message); | ||
} | } | ||
cancellationTokenSource.Token.Register(() => { | |||
/* what to when the token is cancelled */ | |||
}); | |||
</kode> | </kode> | ||
= | = [https://www.meziantou.net/fire-and-forget-a-task-in-dotnet.htm Fire and forget] = | ||
Call an async method without waiting for the response. Exceptions will be lost. | |||
<kode lang='cs'> | |||
// without await, the method is called and the following lines of code are executed without waiting the end of MyMethodAsync | |||
MyMethodAsync(); | |||
<kode lang=cs> | |||
// | // without await, the code is executed in background | ||
async | // you may need to create a scope if you want to consume scoped services | ||
Task.Run(async () => | |||
{ | { | ||
// | Method1(); | ||
await Method2Async(); // wait the end of Method2Async before calling Method3 | |||
Method3(); | |||
}); | |||
MyMethodAsync().Forget(); | |||
async Task MyMethodAsync() {} | |||
} | |||
public static class TaskExtension | |||
{ | { | ||
public static void Forget(this Task task) | |||
{ | |||
if (!task.IsCompleted || task.IsFaulted) | |||
{ | |||
_ = ForgetAwaited(task); | |||
} | |||
async static Task ForgetAwaited(Task task) | |||
{ | { | ||
await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); | |||
} | } | ||
} | |||
} | |||
// simple version: returns void | |||
public static async void Forget(this Task task) | |||
{ | |||
{ | await task.ConfigureAwait(false); // ConfigureAwait to avoid deadlock | ||
} | |||
} | } | ||
</kode> | </kode> | ||
= [https://stackoverflow.com/questions/48212998/how-to-call-an-async-method-from-a-property-setter/48217792?noredirect=1#comment83625536_48217792 Propriété WPF] = | |||
Nuget: | Nuget: | ||
* {{boxx|Nito.Mvvm.Async}} prerelease | * {{boxx|Nito.Mvvm.Async}} prerelease | ||
* {{boxx|FontAwesome.WPF}} | * {{boxx|FontAwesome.WPF}} | ||
<filebox fn='MainWindow.xaml'> | |||
<filebox fn='MainWindow.xaml' collapsed> | |||
<Window xmlns:Controls="clr-namespace:System.Windows.Controls;assembly=PresentationFramework" | <Window xmlns:Controls="clr-namespace:System.Windows.Controls;assembly=PresentationFramework" | ||
xmlns:fa="http://schemas.fontawesome.io/icons/"> | xmlns:fa="http://schemas.fontawesome.io/icons/"> | ||
Ligne 199 : | Ligne 172 : | ||
Converter={StaticResource BooleanToVisibilityConverter}}"/> | Converter={StaticResource BooleanToVisibilityConverter}}"/> | ||
</filebox> | </filebox> | ||
<filebox fn='MainVM.cs'> | |||
<filebox fn='MainVM.cs' collapsed> | |||
private string _query; | private string _query; | ||
public string Query | public string Query | ||
Ligne 243 : | Ligne 217 : | ||
</filebox> | </filebox> | ||
= | = Parallel = | ||
Utile si le code n'est pas séquentiel. | |||
<kode lang='cs'> | |||
// for loop | |||
Parallel.For(1, 20, i => | |||
{ | |||
Console.WriteLine(i); | |||
Thread.Sleep(1000); | |||
}); | |||
// for each loop | |||
Parallel.ForEach(Enumerable.Range(1, 20), i => | |||
{ | |||
Console.WriteLine(i); | |||
Thread.Sleep(1000); | |||
}); | |||
// invoke actions | |||
Parallel.Invoke( | |||
() => { | |||
Console.WriteLine(1); | |||
Thread.Sleep(1000); | |||
}, | |||
() => { | |||
Console.WriteLine(2); | |||
Thread.Sleep(1000); | |||
} | |||
); | |||
</kode> | |||
== Parallel options == | |||
<kode lang='cs'> | |||
// after 4s throw an OperationCanceledException | |||
// no further operations will start but don't stop currently executing operations | |||
var cancellationTokenSource = new CancellationTokenSource(4000); | |||
var parallelOptions = new ParallelOptions | |||
{ | |||
MaxDegreeOfParallelism = 12, // by default use as much computer power as possible | |||
TaskScheduler = null, | |||
CancellationToken = cancellationTokenSource.Token | |||
} | |||
Parallel.ForEach( | |||
numbers, | |||
parallelOptions, | |||
(int i, ParallelLoopState loopState) => | |||
{ | |||
if (loopState.ShouldExitCurrentIteration) // check if another iteration has requested to break | |||
{ | |||
loopState.Break(); // break loop | |||
} | |||
if (!cancellationTokenSource.Token.IsCancellationRequested) { /* next operation step */ } // useful for long operation to break | |||
}); | |||
</kode> | |||
== Handling exceptions == | |||
All the exceptions are catched and when all the tasks have been executed then an AggregateException is thrown if any. | |||
<kode lang='cs'> | <kode lang='cs'> | ||
catch (AggregateException | try | ||
{ | |||
Parallel.Invoke( | |||
() => | |||
{ | |||
var waitTime = DateTime.UtcNow.AddSeconds(4); | |||
while (DateTime.UtcNow < waitTime) { } | |||
}, | |||
() => | |||
{ | |||
throw new Exception("MyException"); | |||
} | |||
); | |||
} | |||
catch (AggregateException ex) | |||
{ | { | ||
ex.InnerExceptions; // ReadOnlyCollection<Exception> | |||
} | } | ||
</kode> | </kode> | ||
= | == Shared variable == | ||
=== [https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/statements/lock Lock] === | |||
<kode lang='cs'> | |||
private readonly object sumLock = new(); | |||
var sum = 0m; // shared variable, updated by threads | |||
Parallel.For(0, 100, i => | |||
{ | |||
lock(sumLock) // only 1 thread at a time can access | |||
{ | |||
sum += 0.5m; // code inside the lock should take as little time as possible | |||
} | |||
}); | |||
</kode> | |||
{{warn | To avoid deadlocks: | |||
* use 1 lock object for each shared resource | |||
* avoid nested locks | |||
* use a {{boxx|new object}}}} | |||
=== Interlocked === | |||
Create thread-safe atomic operations. | |||
{{warn | Faster than lock, but {{boxx|Interlocked}} only works with integers.}} | |||
<kode lang='cs'> | |||
int sum = 0; // shared variable, updated by threads | |||
Parallel.For(0, 100, i => | |||
{ | |||
Interlocked.Increment(ref sum); // add 1 to sum and return sum + 1 | |||
Interlocked.Add(ref sum, 2); // add 2 to sum and return sum + 2 | |||
}); | |||
</kode> | |||
== AsyncLocal == | |||
Allow to have a different variable for each async task. | |||
<kode lang='cs'> | |||
private static AsyncLocal<decimal?> asyncLocal = new(); | |||
Parallel.For(0, 100, async (i) => | |||
{ | |||
asyncLocal.Value = 10; // the asyncLocal is not shared among async tasks | |||
}); | |||
</kode> | |||
== Concurrent collections == | == Concurrent collections == | ||
{| class="wikitable wtp wtmono1" | {| class="wikitable wtp wtmono1" | ||
Ligne 274 : | Ligne 360 : | ||
</kode> | </kode> | ||
== | == Tasks == | ||
<kode lang='cs'> | <kode lang='cs'> | ||
// | // AsParallel PLINQ | ||
var tasks = Enumerable.Range(1, 30).AsParallel().Select(x => MyTaskAsync(x)); | |||
await Task.WhenAll(tasks); | |||
// ForEachAsync | |||
await Parallel.ForEachAsync(Enumerable.Range(1, 30), async (x, token) => await MyTaskAsync(x)); | |||
// Task.Run run task in a new thread | |||
// even without Task.Run, Task.WhenAll will run the tasks in parallel and then for all of them to be done | |||
var tasks = Enumerable.Range(1, 30).Select(x => Task.Run(() => MyTaskAsync(x))); | |||
await Task.WhenAll(tasks); | |||
private async Task MyTaskAsync(int i) | |||
{ | { | ||
Console.WriteLine(i); | |||
await Task.Delay(4000); | |||
} | |||
</kode> | |||
== Handle exceptions with Task.WhenAll == | |||
=== The problem === | |||
If at least one exception occurred among the tasks, it is not possible to get the result of the working tasks. | |||
<kode lang='cs' collapsed> | |||
// create 10 jobs in parallel, the jobs 5 and 7 will raise an Exception | |||
IEnumerable<Task<Job>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}")); | |||
Task<Job[]> mainTask = Task.WhenAll(createJobTasks); | |||
try | |||
{ | { | ||
var jobs = await mainTask; | |||
} | } | ||
catch (Exception e) | |||
{ | |||
e.Message; // Error Job 5, only the first exception is catched | |||
// AggregateException | |||
mainTask.Exception.Message; // One or more errors occurred. (Error Job 5) (Error Job 7) | |||
// ReadOnlyCollection<Exception> | |||
mainTask.InnerExceptions; // [0] Error Job 5, [1] Error Job 7 | |||
} | |||
async Task<Job> CreateJobAsync(string name) | |||
{ | |||
await Task.Delay(1); | |||
if (name.EndsWith("5") || name.EndsWith("7") ) | |||
throw new Exception($"Error {name}"); | |||
return new Job { Name = name }; | |||
} | |||
class Job | |||
{ | { | ||
public string Name { get; set; } | |||
public | |||
} | } | ||
</kode> | </kode> | ||
== | === Solution: use the task ids === | ||
<kode lang='cs'> | <kode lang='cs'> | ||
List<(string, Task<Job>)> jobNameWithCreateJobTasks = Enumerable.Range(1, 10).Select(x => (jobName: $"Job {x}", task: CreateJobAsync($"Job {x}"))).ToList(); | |||
{ | Dictionary<int, string> jobNameFromTaskId = jobNameWithCreateJobTasks.ToDictionary(x => x.task.Id, x => x.jobName); | ||
List<Task<Job>> createJobTasks = jobNameWithCreateJobTasks.Select(x => x.task).ToList(); | |||
List<(string JobName, Job? Job, string ErrorMessage)> results; | |||
var results = await TaskExtension.WhenAll(createJobTasks); | |||
results.Select(x => new | |||
{ | |||
JobName = x.Result?.JobName ?? jobNameFromTaskId[x.taskId], | |||
Job = x.Result, | |||
GlobalErrorMessage = x.Exception?.Message ?? string.Empty | |||
})); | |||
</kode> | </kode> | ||
= | <filebox fn='TaskExtension.cs' collapsed> | ||
public static async Task<IReadOnlyCollection<(int taskId, T? Result, AggregateException? Exception)>> WhenAll<T>(IReadOnlyCollection<Task<T>> tasks) | |||
{ | { | ||
ArgumentNullException.ThrowIfNull(tasks); | |||
List<(int taskId, T? Result, AggregateException? Exception)> results = new(); | |||
try | |||
{ | |||
{ | await Task.WhenAll(tasks); | ||
} | |||
catch | |||
{ | |||
results.AddRange(tasks.Where(x => x.IsFaulted).Select(x => (x.Id, (T?)default, x.Exception))); | |||
} | |||
finally | |||
{ | |||
results.AddRange( | |||
tasks.Where(x => x.IsCanceled).Select(x => (x.Id, (T?)default, | |||
(AggregateException?)new AggregateException(new[] { new TaskCanceledException(x) })))); | |||
results.AddRange( | |||
tasks.Where(x => x.IsCompletedSuccessfully).Select(x => (x.Id, (T?)x.Result, | |||
(AggregateException?)default))); | |||
} | |||
return results; | |||
} | |||
} | </filebox> | ||
</ | |||
== | === [https://stackoverflow.com/questions/55887028/is-it-possible-to-get-successful-results-from-a-task-whenall-when-one-of-the-tas Solution: use ContinueWith] === | ||
<kode lang='cs'> | <kode lang='cs' collapsed> | ||
var | var (jobs, exceptions) = await WhenAllWithExceptions(createJobTasks); | ||
static Task<(T[] Results, Exception[] Exceptions)> WhenAllWithExceptions<T>(IReadOnlyCollection<Task<T>> tasks) | |||
{ | { | ||
ArgumentNullException.ThrowIfNull(tasks); | |||
return Task.WhenAll(tasks).ContinueWith(t => | |||
{ | |||
T[] results = tasks | |||
.Where(t => t.IsCompletedSuccessfully) | |||
.Select(t => t.Result) | |||
.ToArray(); | |||
= | var aggregateExceptions = tasks | ||
.Where(t => t.IsFaulted) | |||
.Select(t => t.Exception!); | |||
var | var exceptions = new AggregateException(aggregateExceptions) | ||
.Flatten() | |||
.InnerExceptions | |||
.ToArray(); | |||
// No exceptions and at least one task was canceled | |||
if (exceptions.Length == 0 && t.IsCanceled) | |||
{ | |||
exceptions = new[] { new TaskCanceledException(t) }; | |||
} | |||
return (results, exceptions); | |||
}, | |||
default, | |||
TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously, | |||
TaskScheduler.Default); | |||
} | |||
</kode> | </kode> | ||
== | === [https://thesharperdev.com/csharps-whenall-and-exception-handling/ Solution: wrapping the task into a TaskResult] === | ||
<kode lang='cs'> | Instead of having a unique try/catch for all the tasks, have it for each task. | ||
<kode lang='cs' collapsed> | |||
// the created jobs are wrapped into TaskResult to handle Exception | |||
IEnumerable<Task<TaskResult<Job>>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}").ToTaskResultAsync()); | |||
var jobs = await Task.WhenAll(createJobTasks); | |||
var | // if the TaskResult is a succes then access to the Result otherwise access to the Exception ErrorMessage | ||
var writeTasks = jobs.Select(x => Task.Run(() => Console.WriteLine(x.Success ? x.Result.Name : x.ErrorMessage))); | |||
await Task.WhenAll(writeTasks); | |||
</kode> | |||
<filebox fn='TaskExtension.cs' collapsed> | |||
public static class TaskExtension | |||
{ | |||
public static async Task<TaskResult<T>> ToTaskResultAsync<T>(this Task<T> task) | |||
{ | |||
try | |||
{ | { | ||
return new TaskResult<T> { Result = await task }; | |||
} | |||
catch (Exception e) | |||
{ | |||
} | return new TaskResult<T> { Exception = e }; | ||
} | |||
} | } | ||
public class TaskResult<T> | |||
{ | { | ||
public T? Result; | |||
public Exception? Exception { get; set; } | |||
public string ErrorMessage => Exception?.InnerException?.Message ?? Exception?.Message ?? string.Empty; | |||
public bool Success => Exception is null; | |||
} | |||
} | } | ||
</ | </filebox> | ||
== PLINQ == | == PLINQ == | ||
{{boxx|AsParallel}} analyses the query to see if it is suitable for parallelization. This analysis adds overhead.<br> | |||
If it is unsafe or faster to run sequentially then it won't be run in parallel. | |||
<kode lang='cs'> | <kode lang='cs'> | ||
var numbers = Enumerable.Range(0, 100_000_000); | var numbers = Enumerable.Range(0, 100_000_000); | ||
var parallelResult = numbers.AsParallel().AsOrdered() | var parallelResult = numbers.AsParallel() | ||
.WithDegreeOfParallelism(2) | |||
.WithCancellation(token) | |||
.WithExecutionMode(ParallelExecutionMode.ForceParallelism) | |||
.WithMergeOptions(ParallelMergeOptions.Default) | |||
.AsOrdered() // add overhead | |||
.Where(i => i % 2 == 0); | |||
// parcourt d'itération en mode parallèle, l'ordre est perdu. | // parcourt d'itération en mode parallèle, l'ordre est perdu. | ||
// le parcourt commence même si parallelResult n'est pas au complet | // le parcourt commence même si parallelResult n'est pas au complet | ||
parallelResult.ForAll(e => Console.WriteLine(e)); | parallelResult.ForAll(e => Console.WriteLine(e)); | ||
</kode> | </kode> |
Dernière version du 25 novembre 2024 à 11:18
Links
Task
A Task represents an asynchronous operation.
// create and run a task in a new thread
var result = await Task.Run(async () =>
{
await Task.Delay(4000);
return 0;
});
|
await vs Result
await | Result |
---|---|
asynchronous wait | blocking wait |
"yield" the current thread | block the current thread |
(re-)raise the exception | wrap the exception in an AggregateException |
WhenAll
![]() |
The order of the output of WhenAll is the same as the order of the input tasks. |
var inputAndTask1 = new { Input = 1, Task = GetDataAsync(1) };
var inputAndTask2 = new { Input = 2, Task = GetDataAsync(2) };
var inputAndTask3 = new { Input = 3, Task = GetDataAsync(3) };
await Task.WhenAll(task1.Task, task2.Task, task3.Task);
// Result allow to acces to the result of the task without blocking because the WhenAll ensure all the tasks are finished
Console.WriteLine($"Input: {inputAndTask1.Input} - Output: {inputAndTask1.Task.Result}"));
Console.WriteLine($"Input: {inputAndTask2.Input} - Output: {inputAndTask2.Task.Result}"));
Console.WriteLine($"Input: {inputAndTask3.Input} - Output: {inputAndTask3.Task.Result}"));
async Task<int> GetDataAsync(int i)
{
await Task.Delay(2000);
Console.WriteLine($"{DateTime.Now} - GetDataAsync - {i}");
return i * 2;
}
|
var inputAndTasks = Enumerable.Range(1, 10)
.Select(i => new { Input = i, Task = GetDataAsync(i) })
.ToList();
await Task.WhenAll(inputAndTasks.Select(x => x.Task));
// Result allow to acces to the result of the task without blocking because the WhenAll ensure all the tasks are finished
inputAndTasks.ForEach(x => Console.WriteLine($"{x.Input} - {x.Task.Result}"));
|
var inputs = Enumerable.Range(1, 10).ToArray();
var outputs = await Task.WhenAll(inputs.Select(GetDataAsync));
// as the order is the same, you can Zip
var inputAndOutputs = inputs
.Zip(outputs, (input, output) => (input, output))
.ToArray();
inputAndOutputs.ForEach(x => Console.WriteLine($"{input} - {output}"));
|
Cancel a task
var cancellationTokenSource = new CancellationTokenSource(5000); // cancel after 5s
CancellationToken token = cancellationTokenSource.Token;
try
{
await Task.Run(async () =>
{
var i = 1;
while (!token.IsCancellationRequested) // break the loop if cancellation is requested
{
Console.Write($"{i++} ");
await Task.Delay(1000);
}
token.ThrowIfCancellationRequested(); // or throw an OperationCanceledException
}, token);
}
catch (OperationCanceledException e)
{
Console.WriteLine(e.Message);
}
cancellationTokenSource.Token.Register(() => {
/* what to when the token is cancelled */
});
|
Fire and forget
Call an async method without waiting for the response. Exceptions will be lost.
// without await, the method is called and the following lines of code are executed without waiting the end of MyMethodAsync
MyMethodAsync();
// without await, the code is executed in background
// you may need to create a scope if you want to consume scoped services
Task.Run(async () =>
{
Method1();
await Method2Async(); // wait the end of Method2Async before calling Method3
Method3();
});
MyMethodAsync().Forget();
async Task MyMethodAsync() {}
public static class TaskExtension
{
public static void Forget(this Task task)
{
if (!task.IsCompleted || task.IsFaulted)
{
_ = ForgetAwaited(task);
}
async static Task ForgetAwaited(Task task)
{
await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
}
}
// simple version: returns void
public static async void Forget(this Task task)
{
await task.ConfigureAwait(false); // ConfigureAwait to avoid deadlock
}
}
|
Propriété WPF
Nuget:
- Nito.Mvvm.Async prerelease
- FontAwesome.WPF
MainWindow.xaml |
MainVM.cs |
Parallel
Utile si le code n'est pas séquentiel.
// for loop
Parallel.For(1, 20, i =>
{
Console.WriteLine(i);
Thread.Sleep(1000);
});
// for each loop
Parallel.ForEach(Enumerable.Range(1, 20), i =>
{
Console.WriteLine(i);
Thread.Sleep(1000);
});
// invoke actions
Parallel.Invoke(
() => {
Console.WriteLine(1);
Thread.Sleep(1000);
},
() => {
Console.WriteLine(2);
Thread.Sleep(1000);
}
);
|
Parallel options
// after 4s throw an OperationCanceledException
// no further operations will start but don't stop currently executing operations
var cancellationTokenSource = new CancellationTokenSource(4000);
var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = 12, // by default use as much computer power as possible
TaskScheduler = null,
CancellationToken = cancellationTokenSource.Token
}
Parallel.ForEach(
numbers,
parallelOptions,
(int i, ParallelLoopState loopState) =>
{
if (loopState.ShouldExitCurrentIteration) // check if another iteration has requested to break
{
loopState.Break(); // break loop
}
if (!cancellationTokenSource.Token.IsCancellationRequested) { /* next operation step */ } // useful for long operation to break
});
|
Handling exceptions
All the exceptions are catched and when all the tasks have been executed then an AggregateException is thrown if any.
try
{
Parallel.Invoke(
() =>
{
var waitTime = DateTime.UtcNow.AddSeconds(4);
while (DateTime.UtcNow < waitTime) { }
},
() =>
{
throw new Exception("MyException");
}
);
}
catch (AggregateException ex)
{
ex.InnerExceptions; // ReadOnlyCollection<Exception>
}
|
Lock
private readonly object sumLock = new();
var sum = 0m; // shared variable, updated by threads
Parallel.For(0, 100, i =>
{
lock(sumLock) // only 1 thread at a time can access
{
sum += 0.5m; // code inside the lock should take as little time as possible
}
});
|
![]() |
To avoid deadlocks:
|
Interlocked
Create thread-safe atomic operations.
![]() |
Faster than lock, but Interlocked only works with integers. |
int sum = 0; // shared variable, updated by threads
Parallel.For(0, 100, i =>
{
Interlocked.Increment(ref sum); // add 1 to sum and return sum + 1
Interlocked.Add(ref sum, 2); // add 2 to sum and return sum + 2
});
|
AsyncLocal
Allow to have a different variable for each async task.
private static AsyncLocal<decimal?> asyncLocal = new();
Parallel.For(0, 100, async (i) =>
{
asyncLocal.Value = 10; // the asyncLocal is not shared among async tasks
});
|
Concurrent collections
BlockingCollection<T> | ajout et suppression thread-safe. Add, Take. FIFO par défaut. |
ConcurrentBag<T> | sans ordre, doublons autorisés. Add, TryTake, TryPeek. |
ConcurrentDictionary<TKey,T> | TryAdd, TryUpdate, AddOrUpdate, GetOrAdd. |
ConcurrentQueue<T> | FIFO. Enqueue, TryDequeue. |
ConcurrentStack<T> | LIFO. Push, TryPop. |
BlockingCollection<string> col = new BlockingCollection<string>();
col.Add("text");
string s = col.Take();
foreach (string v in col.GetConsumingEnumerable())
Console.WriteLine(v);
|
Tasks
// AsParallel PLINQ
var tasks = Enumerable.Range(1, 30).AsParallel().Select(x => MyTaskAsync(x));
await Task.WhenAll(tasks);
// ForEachAsync
await Parallel.ForEachAsync(Enumerable.Range(1, 30), async (x, token) => await MyTaskAsync(x));
// Task.Run run task in a new thread
// even without Task.Run, Task.WhenAll will run the tasks in parallel and then for all of them to be done
var tasks = Enumerable.Range(1, 30).Select(x => Task.Run(() => MyTaskAsync(x)));
await Task.WhenAll(tasks);
private async Task MyTaskAsync(int i)
{
Console.WriteLine(i);
await Task.Delay(4000);
}
|
Handle exceptions with Task.WhenAll
The problem
If at least one exception occurred among the tasks, it is not possible to get the result of the working tasks.
|
Solution: use the task ids
List<(string, Task<Job>)> jobNameWithCreateJobTasks = Enumerable.Range(1, 10).Select(x => (jobName: $"Job {x}", task: CreateJobAsync($"Job {x}"))).ToList();
Dictionary<int, string> jobNameFromTaskId = jobNameWithCreateJobTasks.ToDictionary(x => x.task.Id, x => x.jobName);
List<Task<Job>> createJobTasks = jobNameWithCreateJobTasks.Select(x => x.task).ToList();
List<(string JobName, Job? Job, string ErrorMessage)> results;
var results = await TaskExtension.WhenAll(createJobTasks);
results.Select(x => new
{
JobName = x.Result?.JobName ?? jobNameFromTaskId[x.taskId],
Job = x.Result,
GlobalErrorMessage = x.Exception?.Message ?? string.Empty
}));
|
TaskExtension.cs |
Solution: use ContinueWith
|
Solution: wrapping the task into a TaskResult
Instead of having a unique try/catch for all the tasks, have it for each task.
|
TaskExtension.cs |
PLINQ
AsParallel analyses the query to see if it is suitable for parallelization. This analysis adds overhead.
If it is unsafe or faster to run sequentially then it won't be run in parallel.
var numbers = Enumerable.Range(0, 100_000_000);
var parallelResult = numbers.AsParallel()
.WithDegreeOfParallelism(2)
.WithCancellation(token)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithMergeOptions(ParallelMergeOptions.Default)
.AsOrdered() // add overhead
.Where(i => i % 2 == 0);
// parcourt d'itération en mode parallèle, l'ordre est perdu.
// le parcourt commence même si parallelResult n'est pas au complet
parallelResult.ForAll(e => Console.WriteLine(e));
|