The order of the output of WhenAll is the same as the order of the input tasks.
// Pair every input with the task started for it, so each result can be reported next to its input.
var inputAndTask1 = new { Input = 1, Task = GetDataAsync(1) };
var inputAndTask2 = new { Input = 2, Task = GetDataAsync(2) };
var inputAndTask3 = new { Input = 3, Task = GetDataAsync(3) };
// Wait until the three tasks have all finished.
await Task.WhenAll(inputAndTask1.Task, inputAndTask2.Task, inputAndTask3.Task);
// Result gives access to the result of the task without blocking because WhenAll ensures all the tasks are finished
Console.WriteLine($"Input: {inputAndTask1.Input} - Output: {inputAndTask1.Task.Result}");
Console.WriteLine($"Input: {inputAndTask2.Input} - Output: {inputAndTask2.Task.Result}");
Console.WriteLine($"Input: {inputAndTask3.Input} - Output: {inputAndTask3.Task.Result}");
// Simulates a slow asynchronous lookup: waits two seconds, logs the call with a
// timestamp, then returns twice the input.
async Task<int> GetDataAsync(int i)
{
    await Task.Delay(2000);
    Console.WriteLine($"{DateTime.Now} - GetDataAsync - {i}");
    return i * 2;
}
// Start one task per input and keep each input next to its task.
// ToList materializes the query so the tasks are created exactly once.
var inputAndTasks = Enumerable.Range(1, 10)
    .Select(i => new { Input = i, Task = GetDataAsync(i) })
    .ToList();
await Task.WhenAll(inputAndTasks.Select(x => x.Task));
// Result gives access to the result of the task without blocking because WhenAll ensures all the tasks are finished
inputAndTasks.ForEach(x => Console.WriteLine($"{x.Input} - {x.Task.Result}"));
var inputs = Enumerable.Range(1, 10).ToArray();
// WhenAll returns the results in the same order as the tasks it was given
var outputs = await Task.WhenAll(inputs.Select(GetDataAsync));
// as the order is the same, you can Zip
var inputAndOutputs = inputs
    .Zip(outputs, (input, output) => (input, output))
    .ToArray();
// arrays have no instance ForEach method, so deconstruct each pair in a regular foreach
foreach (var (input, output) in inputAndOutputs)
{
    Console.WriteLine($"{input} - {output}");
}
Cancel a task
// cancel after 5s; the source owns a timer, so dispose it when it goes out of scope
using var cancellationTokenSource = new CancellationTokenSource(5000);
CancellationToken token = cancellationTokenSource.Token;
try
{
    await Task.Run(async () =>
    {
        var i = 1;
        while (!token.IsCancellationRequested) // break the loop if cancellation is requested
        {
            Console.Write($"{i++} ");
            await Task.Delay(1000);
        }
        token.ThrowIfCancellationRequested(); // or throw an OperationCanceledException
    }, token);
}
catch (OperationCanceledException e)
{
    Console.WriteLine(e.Message);
}
// register a callback invoked when the token is cancelled
cancellationTokenSource.Token.Register(() =>
{
    /* what to do when the token is cancelled */
});
Call an async method without waiting for the response. Exceptions will be lost.
// without await, the method is called and the following lines of code are executed without waiting for the end of MyMethodAsync
// (the discard makes the missing await explicit)
_ = MyMethodAsync();
// without await, the code is executed in background
// you may need to create a scope if you want to consume scoped services
_ = Task.Run(async () =>
{
    Method1();
    await Method2Async(); // wait for the end of Method2Async before calling Method3
    Method3();
});
// fire and forget through the Forget extension method
MyMethodAsync().Forget();
// Placeholder for the fire-and-forget examples: does no work and returns an already
// completed task. A real implementation would be `async` and await its work.
Task MyMethodAsync() => Task.CompletedTask;
/// <summary>
/// Extension methods for running a task in a fire-and-forget fashion.
/// </summary>
public static class TaskExtension
{
    /// <summary>
    /// Lets <paramref name="task"/> finish in the background: the caller does not wait for it,
    /// and the task is awaited internally with exceptions suppressed.
    /// </summary>
    /// <param name="task">The task to fire and forget.</param>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is <see langword="null"/>.</exception>
    public static void Forget(this Task task)
    {
        ArgumentNullException.ThrowIfNull(task);

        // Only await when the task is still running or has faulted; a task that already
        // ran to completion or was cancelled needs no further handling.
        if (!task.IsCompleted || task.IsFaulted)
        {
            _ = ForgetAwaited(task);
        }

        // Awaits the task without rethrowing its exception.
        static async Task ForgetAwaited(Task task)
        {
            await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
        }
    }

    // Simple version: returns void. It is kept as a comment because it has the same signature
    // as the method above (the two cannot coexist in one class) and because an exception
    // escaping an `async void` method cannot be caught by the caller.
    //
    // public static async void Forget(this Task task)
    // {
    //     await task.ConfigureAwait(false); // ConfigureAwait to avoid deadlock
    // }
}
// for loop: the body runs once for every index from 1 up to, but not including, 20
Parallel.For(fromInclusive: 1, toExclusive: 20, body: index =>
{
    Console.WriteLine(index);
    Thread.Sleep(millisecondsTimeout: 1000);
});
// for each loop: the body runs once for every item of the source sequence
var oneToTwenty = Enumerable.Range(1, 20);
Parallel.ForEach(oneToTwenty, number =>
{
    Console.WriteLine(number);
    Thread.Sleep(1000);
});
// invoke actions: each delegate prints its number and then pauses for one second
Action printOne = () =>
{
    Console.WriteLine(1);
    Thread.Sleep(1000);
};
Action printTwo = () =>
{
    Console.WriteLine(2);
    Thread.Sleep(1000);
};
Parallel.Invoke(printOne, printTwo);
Parallel options
// after 4s throw an OperationCanceledException
// no further operations will start but currently executing operations are not stopped
using var cancellationTokenSource = new CancellationTokenSource(4000);
var parallelOptions = new ParallelOptions
{
    MaxDegreeOfParallelism = 12, // by default use as much computer power as possible
    TaskScheduler = null,
    CancellationToken = cancellationTokenSource.Token
};
Parallel.ForEach(
    numbers,
    parallelOptions,
    (int i, ParallelLoopState loopState) =>
    {
        if (loopState.ShouldExitCurrentIteration) // check if another iteration has requested to break
        {
            loopState.Break(); // break loop
        }
        if (!cancellationTokenSource.Token.IsCancellationRequested) { /* next operation step */ } // useful for long operation to break
    });
Handling exceptions
All the exceptions are caught, and once all the tasks have been executed an AggregateException is thrown if any exception occurred.
// class-level field: a dedicated object that is used only as the lock
private readonly object sumLock = new();

var sum = 0m; // shared variable, updated by threads
Parallel.For(0, 100, i =>
{
    lock (sumLock) // only 1 thread at a time can access
    {
        sum += 0.5m; // code inside the lock should take as little time as possible
    }
});
To avoid deadlocks:
use 1 lock object for each shared resource
avoid nested locks
use a new object
Interlocked
Create thread-safe atomic operations.
Faster than lock, but Interlocked arithmetic (Increment, Add) only works with integers.
int sum = 0; // shared variable, updated by threads
Parallel.For(0, 100, i =>
{
    Interlocked.Increment(ref sum); // atomically add 1 to sum and return the new value
    Interlocked.Add(ref sum, 2); // atomically add 2 to sum and return the new value
});
AsyncLocal
Allows each async task to have its own value for a variable.
// class-level field holding a value that is local to the current async flow
private static readonly AsyncLocal<decimal?> asyncLocal = new();

// Parallel.For only accepts a synchronous body, so an async lambda passed to it becomes
// `async void`. Parallel.ForEachAsync takes an async body and can be awaited.
await Parallel.ForEachAsync(Enumerable.Range(0, 100), async (i, cancellationToken) =>
{
    asyncLocal.Value = 10; // the asyncLocal is not shared among async tasks
    await Task.Yield(); // stand-in for real asynchronous work
});
Concurrent collections
BlockingCollection<T>
Thread-safe add and remove. Add, Take. FIFO by default.
ConcurrentBag<T>
Unordered, duplicates allowed. Add, TryTake, TryPeek.
ConcurrentDictionary<TKey,T>
TryAdd, TryUpdate, AddOrUpdate, GetOrAdd.
ConcurrentQueue<T>
FIFO. Enqueue, TryDequeue.
ConcurrentStack<T>
LIFO. Push, TryPop.
// BlockingCollection is disposable
using var col = new BlockingCollection<string>();
col.Add("text");
string s = col.Take(); // removes and returns an item, waiting if none is available
// mark the collection as complete; otherwise the consuming enumeration below
// never ends because it keeps waiting for new items
col.CompleteAdding();
foreach (string v in col.GetConsumingEnumerable())
{
    Console.WriteLine(v);
}
Tasks
// AsParallel PLINQ
var plinqTasks = Enumerable.Range(1, 30).AsParallel().Select(x => MyTaskAsync(x));
await Task.WhenAll(plinqTasks);
// ForEachAsync
await Parallel.ForEachAsync(Enumerable.Range(1, 30), async (x, cancellationToken) => await MyTaskAsync(x));
// Task.Run runs the task on a thread-pool thread
// even without Task.Run, the tasks run concurrently and Task.WhenAll then waits for all of them to be done
var threadPoolTasks = Enumerable.Range(1, 30).Select(x => Task.Run(() => MyTaskAsync(x)));
await Task.WhenAll(threadPoolTasks);
/// <summary>
/// Simulates asynchronous work: prints <paramref name="i"/>, then completes after four seconds.
/// </summary>
/// <param name="i">Value written to the console.</param>
private async Task MyTaskAsync(int i)
{
    Console.WriteLine(i);
    await Task.Delay(4000);
}
Handle exceptions with Task.WhenAll
The problem
If at least one exception occurred among the tasks, it is not possible to get the result of the working tasks.
// create 10 jobs in parallel, the jobs 5 and 7 will raise an Exception
IEnumerable<Task<Job>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}"));
Task<Job[]> mainTask = Task.WhenAll(createJobTasks);
try
{
    var jobs = await mainTask;
}
catch (Exception e)
{
    // await rethrows only the first exception
    var firstMessage = e.Message; // Error Job 5

    // the task itself exposes an AggregateException
    var aggregateMessage = mainTask.Exception?.Message; // One or more errors occurred. (Error Job 5) (Error Job 7)

    // ReadOnlyCollection<Exception>, reached through the AggregateException
    var innerExceptions = mainTask.Exception?.InnerExceptions; // [0] Error Job 5, [1] Error Job 7
}
// Simulates creating a job: after a short delay, returns a Job carrying the given name.
// Names ending in '5' or '7' fail, so callers can exercise error handling.
async Task<Job> CreateJobAsync(string name)
{
    await Task.Delay(1);
    // the char overload of EndsWith compares ordinally, independent of the current culture
    if (name.EndsWith('5') || name.EndsWith('7'))
    {
        // a specific exception type; still caught by `catch (Exception)`
        throw new InvalidOperationException($"Error {name}");
    }

    return new Job { Name = name };
}
// Minimal job model: carries only the job's name.
class Job
{
    public string Name { get; set; }
}
Instead of having a unique try/catch for all the tasks, have it for each task.
// the created jobs are wrapped into TaskResult to handle Exception
IEnumerable<Task<TaskResult<Job>>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}").ToTaskResultAsync());
var jobs = await Task.WhenAll(createJobTasks);
// if the TaskResult is a success then access the Result, otherwise access the Exception ErrorMessage
var writeTasks = jobs.Select(x => Task.Run(() => Console.WriteLine(x.Success ? x.Result.Name : x.ErrorMessage)));
await Task.WhenAll(writeTasks);
AsParallel analyses the query to see if it is suitable for parallelization. This analysis adds overhead.
If it is unsafe or faster to run sequentially then it won't be run in parallel.
var numbers = Enumerable.Range(0, 100_000_000);
var parallelResult = numbers.AsParallel()
    .WithDegreeOfParallelism(2)
    .WithCancellation(token)
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .WithMergeOptions(ParallelMergeOptions.Default)
    .AsOrdered() // add overhead
    .Where(i => i % 2 == 0);
// iterates over the results in parallel, the order is lost.
// the iteration starts even if parallelResult is not complete yet
parallelResult.ForAll(e => Console.WriteLine(e));