« Task » : différence entre les versions
De Banane Atomic
Aller à la navigationAller à la recherche
Ligne 301 : | Ligne 301 : | ||
// create 10 jobs in parallel, the job 5 will raise an Exception | // create 10 jobs in parallel, the job 5 will raise an Exception | ||
// the created jobs are wrapped into TaskResult to handle Exception | // the created jobs are wrapped into TaskResult to handle Exception | ||
var createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}"). | var createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}").ToTaskResultAsync()); | ||
var jobs = await Task.WhenAll(createJobTasks); | var jobs = await Task.WhenAll(createJobTasks); | ||
Ligne 327 : | Ligne 327 : | ||
public static class TaskExtension | public static class TaskExtension | ||
{ | { | ||
public static async Task<TaskResult<T>> | public static async Task<TaskResult<T>> ToTaskResultAsync<T>(this Task<T> task) | ||
{ | { | ||
try | try |
Version du 9 novembre 2023 à 18:50
Links
Task
A Task represents an asynchronous operation.
// create and run a task in a new thread var result = await Task.Run(async () => { await Task.Delay(4000); return 0; }); |
Cancel a task
var cancellationTokenSource = new CancellationTokenSource(5000); // cancel after 5s CancellationToken token = cancellationTokenSource.Token; try { await Task.Run(async () => { var i = 1; while (!token.IsCancellationRequested) // break the loop if cancellation is requested { Console.Write($"{i++} "); await Task.Delay(1000); } token.ThrowIfCancellationRequested(); // or throw an OperationCanceledException }, token); } catch (OperationCanceledException e) { Console.WriteLine(e.Message); } cancellationTokenSource.Token.Register(() => { /* what to when the token is cancelled */ }); |
Fire and forget
Call an async method without waiting for the response. Exceptions will be lost.
_ = MyMethodAsync().ConfigureAwait(false); // ConfigureAwait to avoid deadlock MyMethodAsync().Forget(); async Task MyMethodAsync() {} public static class TaskExtension { public static async void AndForget(this Task task) { await task; } } |
Propriété WPF
Nuget:
- Nito.Mvvm.Async prerelease
- FontAwesome.WPF
MainWindow.xaml |
<Window xmlns:Controls="clr-namespace:System.Windows.Controls;assembly=PresentationFramework" xmlns:fa="http://schemas.fontawesome.io/icons/"> <Window.Resources> <Controls:BooleanToVisibilityConverter x:Key="BooleanToVisibilityConverter" /> </Window.Resources> <TextBox Text="{Binding Query, UpdateSourceTrigger=PropertyChanged}" /> <TextBlock Text="{Binding ResultTask.Result}" /> <Label Content="Loading..." Visibility="{Binding ResultTask.IsNotCompleted, Converter={StaticResource BooleanToVisibilityConverter}, FallbackValue=Collapsed}"/> <fa:ImageAwesome Icon="Refresh" Spin="True" Height="16" Width="16" Visibility="{Binding ResultTask.IsNotCompleted, Converter={StaticResource BooleanToVisibilityConverter}, FallbackValue=Collapsed}" /> <Label Content="{Binding NotifyValuesTask.ErrorMessage}" Visibility="{Binding ResultTask.IsFaulted, Converter={StaticResource BooleanToVisibilityConverter}}"/> |
MainVM.cs |
private string _query; public string Query { get { return _query; } set { Set(() => Query, ref _query, value, true); ResultTask = NotifyTask.Create(GetResultAsync(_query, MyCallback)); } } private void MyCallback() { /* ... */ } private NotifyTask<string> _resultTask; public NotifyTask<string> ResultTask { get { return _resultTask; } set { Set(() => ResultTask, ref _resultTask, value, true); } } public async Task<string> GetResultAsync(string query, Action callback) { var url = $"http://localhost:57157/api/v1/test/result/{query}"; var responseMessage = await _client.GetAsync(url); if (responseMessage.IsSuccessStatusCode) { return await responseMessage.Content.ReadAsStringAsync(); } else { return await Task.FromResult($"{responseMessage.StatusCode}: {responseMessage.ReasonPhrase}"); } callback(); } |
Parallel
Utile si le code n'est pas séquentiel.
// for loop Parallel.For(1, 20, i => { Console.WriteLine(i); Thread.Sleep(1000); }); // for each loop Parallel.ForEach(Enumerable.Range(1, 20), i => { Console.WriteLine(i); Thread.Sleep(1000); }); // invoke actions Parallel.Invoke( () => { Console.WriteLine(1); Thread.Sleep(1000); }, () => { Console.WriteLine(2); Thread.Sleep(1000); } ); |
Parallel options
// after 4s throw an OperationCanceledException // no further operations will start but don't stop currently executing operations var cancellationTokenSource = new CancellationTokenSource(4000); var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 12, // by default use as much computer power as possible TaskScheduler = null, CancellationToken = cancellationTokenSource.Token } Parallel.ForEach( numbers, parallelOptions, (int i, ParallelLoopState loopState) => { if (loopState.ShouldExitCurrentIteration) // check if another iteration has requested to break { loopState.Break(); // break loop } if (!cancellationTokenSource.Token.IsCancellationRequested) { /* next operation step */ } // useful for long operation to break }); |
Handling exceptions
All the exceptions are catched and when all the tasks have been executed then an AggregateException is thrown if any.
try { Parallel.Invoke( () => { var waitTime = DateTime.UtcNow.AddSeconds(4); while (DateTime.UtcNow < waitTime) { } }, () => { throw new Exception("MyException"); } ); } catch (AggregateException ex) { ex.InnerExceptions; // ReadOnlyCollection<Exception> } |
Lock
private static readonly object sumLock = new(); var sum = 0m; // shared variable, updated by threads Parallel.For(0, 100, i => { lock(sumLock) // only 1 thread at a time can access { sum += 0.5m; // code inside the lock should take as little time as possible } }); |
To avoid deadlocks:
|
Interlocked
Create thread-safe atomic operations.
Faster than lock, but Interlocked only works with integers. |
int sum = 0; // shared variable, updated by threads Parallel.For(0, 100, i => { Interlocked.Add(ref sum, 1); }); |
AsyncLocal
Allow to have a different variable for each async task.
private static AsyncLocal<decimal?> asyncLocal = new(); Parallel.For(0, 100, async (i) => { asyncLocal.Value = 10; // the asyncLocal is not shared among async tasks }); |
Concurrent collections
BlockingCollection<T> | ajout et suppression thread-safe. Add, Take. FIFO par défaut. |
ConcurrentBag<T> | sans ordre, doublons autorisés. Add, TryTake, TryPeek. |
ConcurrentDictionary<TKey,T> | TryAdd, TryUpdate, AddOrUpdate, GetOrAdd. |
ConcurrentQueue<T> | FIFO. Enqueue, TryDequeue. |
ConcurrentStack<T> | LIFO. Push, TryPop. |
BlockingCollection<string> col = new BlockingCollection<string>(); col.Add("text"); string s = col.Take(); foreach (string v in col.GetConsumingEnumerable()) Console.WriteLine(v); |
Tasks
// AsParallel PLINQ var tasks = Enumerable.Range(1, 30).AsParallel().Select(x => MyTaskAsync(x)); await Task.WhenAll(tasks); // ForEachAsync await Parallel.ForEachAsync(Enumerable.Range(1, 30), async (x, token) => await MyTaskAsync(x)); // Task.Run run task in a new thread // even without Task.Run, Task.WhenAll will run the tasks in parallel and then for all of them to be done var tasks = Enumerable.Range(1, 30).Select(x => Task.Run(() => MyTaskAsync(x))); await Task.WhenAll(tasks); private async Task MyTaskAsync(int i) { Console.WriteLine(i); await Task.Delay(4000); } |
Handle exceptions with Task.WhenAll
// create 10 jobs in parallel, the job 5 will raise an Exception // the created jobs are wrapped into TaskResult to handle Exception var createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}").ToTaskResultAsync()); var jobs = await Task.WhenAll(createJobTasks); // if the TaskResult is a succes then access to the Result otherwise access to the Exception ErrorMessage var writeTasks = jobs.Select(x => Task.Run(() => Console.WriteLine(x.Success ? x.Result.Name : x.ErrorMessage))); await Task.WhenAll(writeTasks); async Task<Job> CreateJobAsync(string name) { await Task.Delay(1); if (name.EndsWith("5")) throw new Exception("Error 5"); return new Job { Name = name }; } class Job { public string Name { get; set; } } |
TaskExtension.cs |
public static class TaskExtension { public static async Task<TaskResult<T>> ToTaskResultAsync<T>(this Task<T> task) { try { return new TaskResult<T> { Result = await task }; } catch (Exception e) { return new TaskResult<T> { Exception = e }; } } public class TaskResult<T> { public T Result; public Exception Exception { get; set; } public string ErrorMessage => Exception?.InnerException?.Message ?? Exception?.Message ?? string.Empty; public bool Success => Exception is null; } } |
PLINQ
AsParallel analyses the query to see if it is suitable for parallelization. This analysis adds overhead.
If it is unsafe or faster to run sequentially then it won't be run in parallel.
var numbers = Enumerable.Range(0, 100_000_000); var parallelResult = numbers.AsParallel() .WithDegreeOfParallelism(2) .WithCancellation(token) .WithExecutionMode(ParallelExecutionMode.ForceParallelism) .WithMergeOptions(ParallelMergeOptions.Default) .AsOrdered() // add overhead .Where(i => i % 2 == 0); // parcourt d'itération en mode parallèle, l'ordre est perdu. // le parcourt commence même si parallelResult n'est pas au complet parallelResult.ForAll(e => Console.WriteLine(e)); |