Links
Task
A Task represents an asynchronous operation.
|
var result = await Task.Run(async () =>
{
await Task.Delay(4000);
return 0;
});
|
await
|
Result
|
asynchronous wait |
blocking wait
|
"yield" the current thread |
block the current thread
|
(re-)raise the exception |
wrap the exception in an AggregateException
|
WhenAll
 |
The order of the output of WhenAll is the same as the order of the input tasks. |
|
var inputAndTask1 = new { Input = 1, Task = GetDataAsync(1) };
var inputAndTask2 = new { Input = 2, Task = GetDataAsync(2) };
var inputAndTask3 = new { Input = 3, Task = GetDataAsync(3) };
await Task.WhenAll(task1.Task, task2.Task, task3.Task);
Console.WriteLine($"Input: {inputAndTask1.Input} - Output: {inputAndTask1.Task.Result}"));
Console.WriteLine($"Input: {inputAndTask2.Input} - Output: {inputAndTask2.Task.Result}"));
Console.WriteLine($"Input: {inputAndTask3.Input} - Output: {inputAndTask3.Task.Result}"));
async Task<int> GetDataAsync(int i)
{
await Task.Delay(2000);
Console.WriteLine($"{DateTime.Now} - GetDataAsync - {i}");
return i * 2;
}
|
|
var inputAndTasks = Enumerable.Range(1, 10)
.Select(i => new { Input = i, Task = GetDataAsync(i) })
.ToList();
await Task.WhenAll(inputAndTasks.Select(x => x.Task));
inputAndTasks.ForEach(x => Console.WriteLine($"{x.Input} - {x.Task.Result}"));
|
|
var inputs = Enumerable.Range(1, 10).ToArray();
var outputs = await Task.WhenAll(inputs.Select(GetDataAsync));
var inputAndOutputs = inputs
.Zip(outputs, (input, output) => (input, output))
.ToArray();
inputAndOutputs.ForEach(x => Console.WriteLine($"{input} - {output}"));
|
Cancel a task
|
var cancellationTokenSource = new CancellationTokenSource(5000);
CancellationToken token = cancellationTokenSource.Token;
try
{
await Task.Run(async () =>
{
var i = 1;
while (!token.IsCancellationRequested)
{
Console.Write($"{i++} ");
await Task.Delay(1000);
}
token.ThrowIfCancellationRequested();
}, token);
}
catch (OperationCanceledException e)
{
Console.WriteLine(e.Message);
}
cancellationTokenSource.Token.Register(() => {
/* what to when the token is cancelled */
});
|
Call an async method without waiting for the response. Exceptions will be lost.
|
MyMethodAsync();
Task.Run(async () =>
{
Method1();
await Method2Async();
Method3();
});
MyMethodAsync().Forget();
async Task MyMethodAsync() {}
public static class TaskExtension
{
public static void Forget(this Task task)
{
if (!task.IsCompleted || task.IsFaulted)
{
_ = ForgetAwaited(task);
}
async static Task ForgetAwaited(Task task)
{
await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
}
}
public static async void Forget(this Task task)
{
await task.ConfigureAwait(false);
}
}
|
Nuget:
- Nito.Mvvm.Async prerelease
- FontAwesome.WPF
MainWindow.xaml
|
<Window xmlns:Controls="clr-namespace:System.Windows.Controls;assembly=PresentationFramework"
xmlns:fa="http://schemas.fontawesome.io/icons/">
<Window.Resources>
<Controls:BooleanToVisibilityConverter x:Key="BooleanToVisibilityConverter" />
</Window.Resources>
<TextBox Text="{Binding Query, UpdateSourceTrigger=PropertyChanged}" />
<TextBlock Text="{Binding ResultTask.Result}" />
<Label Content="Loading..."
Visibility="{Binding ResultTask.IsNotCompleted,
Converter={StaticResource BooleanToVisibilityConverter},
FallbackValue=Collapsed}"/>
<fa:ImageAwesome Icon="Refresh" Spin="True" Height="16" Width="16"
Visibility="{Binding ResultTask.IsNotCompleted,
Converter={StaticResource BooleanToVisibilityConverter},
FallbackValue=Collapsed}" />
<Label Content="{Binding NotifyValuesTask.ErrorMessage}"
Visibility="{Binding ResultTask.IsFaulted,
Converter={StaticResource BooleanToVisibilityConverter}}"/>
|
MainVM.cs
|
private string _query;
public string Query
{
get { return _query; }
set
{
Set(() => Query, ref _query, value, true);
ResultTask = NotifyTask.Create(GetResultAsync(_query, MyCallback));
}
}
private void MyCallback() { }
private NotifyTask<string> _resultTask;
public NotifyTask<string> ResultTask
{
get
{
return _resultTask;
}
set
{
Set(() => ResultTask, ref _resultTask, value, true);
}
}
public async Task<string> GetResultAsync(string query, Action callback)
{
var url = $"http://localhost:57157/api/v1/test/result/{query}";
var responseMessage = await _client.GetAsync(url);
if (responseMessage.IsSuccessStatusCode)
{
return await responseMessage.Content.ReadAsStringAsync();
}
else
{
return await Task.FromResult($"{responseMessage.StatusCode}: {responseMessage.ReasonPhrase}");
}
callback();
}
|
Parallel
Utile si le code n'est pas séquentiel.
|
Parallel.For(1, 20, i =>
{
Console.WriteLine(i);
Thread.Sleep(1000);
});
Parallel.ForEach(Enumerable.Range(1, 20), i =>
{
Console.WriteLine(i);
Thread.Sleep(1000);
});
Parallel.Invoke(
() => {
Console.WriteLine(1);
Thread.Sleep(1000);
},
() => {
Console.WriteLine(2);
Thread.Sleep(1000);
}
);
|
Parallel options
|
var cancellationTokenSource = new CancellationTokenSource(4000);
var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = 12,
TaskScheduler = null,
CancellationToken = cancellationTokenSource.Token
}
Parallel.ForEach(
numbers,
parallelOptions,
(int i, ParallelLoopState loopState) =>
{
if (loopState.ShouldExitCurrentIteration) // check if another iteration has requested to break
{
loopState.Break();
}
if (!cancellationTokenSource.Token.IsCancellationRequested) { }
});
|
Handling exceptions
All the exceptions are catched and when all the tasks have been executed then an AggregateException is thrown if any.
|
try
{
Parallel.Invoke(
() =>
{
var waitTime = DateTime.UtcNow.AddSeconds(4);
while (DateTime.UtcNow < waitTime) { }
},
() =>
{
throw new Exception("MyException");
}
);
}
catch (AggregateException ex)
{
ex.InnerExceptions;
}
|
Shared variable
|
private readonly object sumLock = new();
var sum = 0m;
Parallel.For(0, 100, i =>
{
lock(sumLock) // only 1 thread at a time can access
{
sum += 0.5m;
}
});
|
 |
To avoid deadlocks:
- use 1 lock object for each shared resource
- avoid nested locks
- use a new object
|
Interlocked
Create thread-safe atomic operations.
 |
Faster than lock, but Interlocked only works with integers. |
|
int sum = 0;
Parallel.For(0, 100, i =>
{
Interlocked.Increment(ref sum);
Interlocked.Add(ref sum, 2);
});
|
AsyncLocal
Allow to have a different variable for each async task.
|
private static AsyncLocal<decimal?> asyncLocal = new();
Parallel.For(0, 100, async (i) =>
{
asyncLocal.Value = 10;
});
|
Concurrent collections
BlockingCollection<T> |
ajout et suppression thread-safe. Add, Take. FIFO par défaut.
|
ConcurrentBag<T> |
sans ordre, doublons autorisés. Add, TryTake, TryPeek.
|
ConcurrentDictionary<TKey,T> |
TryAdd, TryUpdate, AddOrUpdate, GetOrAdd.
|
ConcurrentQueue<T> |
FIFO. Enqueue, TryDequeue.
|
ConcurrentStack<T> |
LIFO. Push, TryPop.
|
|
BlockingCollection<string> col = new BlockingCollection<string>();
col.Add("text");
string s = col.Take();
foreach (string v in col.GetConsumingEnumerable())
Console.WriteLine(v);
|
Tasks
|
var tasks = Enumerable.Range(1, 30).AsParallel().Select(x => MyTaskAsync(x));
await Task.WhenAll(tasks);
await Parallel.ForEachAsync(Enumerable.Range(1, 30), async (x, token) => await MyTaskAsync(x));
var tasks = Enumerable.Range(1, 30).Select(x => Task.Run(() => MyTaskAsync(x)));
await Task.WhenAll(tasks);
private async Task MyTaskAsync(int i)
{
Console.WriteLine(i);
await Task.Delay(4000);
}
|
Handle exceptions with Task.WhenAll
The problem
If at least one exception occurred among the tasks, it is not possible to get the result of the working tasks.
|
IEnumerable<Task<Job>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}"));
Task<Job[]> mainTask = Task.WhenAll(createJobTasks);
try
{
var jobs = await mainTask;
}
catch (Exception e)
{
e.Message;
mainTask.Exception.Message;
mainTask.InnerExceptions;
}
async Task<Job> CreateJobAsync(string name)
{
await Task.Delay(1);
if (name.EndsWith("5") || name.EndsWith("7") )
throw new Exception($"Error {name}");
return new Job { Name = name };
}
class Job
{
public string Name { get; set; }
}
|
Solution: use the task ids
|
List<(string, Task<Job>)> jobNameWithCreateJobTasks = Enumerable.Range(1, 10).Select(x => (jobName: $"Job {x}", task: CreateJobAsync($"Job {x}"))).ToList();
Dictionary<int, string> jobNameFromTaskId = jobNameWithCreateJobTasks.ToDictionary(x => x.task.Id, x => x.jobName);
List<Task<Job>> createJobTasks = jobNameWithCreateJobTasks.Select(x => x.task).ToList();
List<(string JobName, Job? Job, string ErrorMessage)> results;
var results = await TaskExtension.WhenAll(createJobTasks);
results.Select(x => new
{
JobName = x.Result?.JobName ?? jobNameFromTaskId[x.taskId],
Job = x.Result,
GlobalErrorMessage = x.Exception?.Message ?? string.Empty
}));
|
TaskExtension.cs
|
public static async Task<IReadOnlyCollection<(int taskId, T? Result, AggregateException? Exception)>> WhenAll<T>(IReadOnlyCollection<Task<T>> tasks)
{
ArgumentNullException.ThrowIfNull(tasks);
List<(int taskId, T? Result, AggregateException? Exception)> results = new();
try
{
await Task.WhenAll(tasks);
}
catch
{
results.AddRange(tasks.Where(x => x.IsFaulted).Select(x => (x.Id, (T?)default, x.Exception)));
}
finally
{
results.AddRange(
tasks.Where(x => x.IsCanceled).Select(x => (x.Id, (T?)default,
(AggregateException?)new AggregateException(new[] { new TaskCanceledException(x) }))));
results.AddRange(
tasks.Where(x => x.IsCompletedSuccessfully).Select(x => (x.Id, (T?)x.Result,
(AggregateException?)default)));
}
return results;
}
|
|
var (jobs, exceptions) = await WhenAllWithExceptions(createJobTasks);
static Task<(T[] Results, Exception[] Exceptions)> WhenAllWithExceptions<T>(IReadOnlyCollection<Task<T>> tasks)
{
ArgumentNullException.ThrowIfNull(tasks);
return Task.WhenAll(tasks).ContinueWith(t =>
{
T[] results = tasks
.Where(t => t.IsCompletedSuccessfully)
.Select(t => t.Result)
.ToArray();
var aggregateExceptions = tasks
.Where(t => t.IsFaulted)
.Select(t => t.Exception!);
var exceptions = new AggregateException(aggregateExceptions)
.Flatten()
.InnerExceptions
.ToArray();
if (exceptions.Length == 0 && t.IsCanceled)
{
exceptions = new[] { new TaskCanceledException(t) };
}
return (results, exceptions);
},
default,
TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
|
Instead of having a unique try/catch for all the tasks, have it for each task.
|
IEnumerable<Task<TaskResult<Job>>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}").ToTaskResultAsync());
var jobs = await Task.WhenAll(createJobTasks);
var writeTasks = jobs.Select(x => Task.Run(() => Console.WriteLine(x.Success ? x.Result.Name : x.ErrorMessage)));
await Task.WhenAll(writeTasks);
|
TaskExtension.cs
|
public static class TaskExtension
{
public static async Task<TaskResult<T>> ToTaskResultAsync<T>(this Task<T> task)
{
try
{
return new TaskResult<T> { Result = await task };
}
catch (Exception e)
{
return new TaskResult<T> { Exception = e };
}
}
public class TaskResult<T>
{
public T? Result;
public Exception? Exception { get; set; }
public string ErrorMessage => Exception?.InnerException?.Message ?? Exception?.Message ?? string.Empty;
public bool Success => Exception is null;
}
}
|
PLINQ
AsParallel analyses the query to see if it is suitable for parallelization. This analysis adds overhead.
If it is unsafe or faster to run sequentially then it won't be run in parallel.
|
var numbers = Enumerable.Range(0, 100_000_000);
var parallelResult = numbers.AsParallel()
.WithDegreeOfParallelism(2)
.WithCancellation(token)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithMergeOptions(ParallelMergeOptions.Default)
.AsOrdered() // add overhead
.Where(i => i % 2 == 0);
parallelResult.ForAll(e => Console.WriteLine(e));
|