[WPF] Asynchronisme et parallélisme avec TPL Dataflow
Introduction
Dataflow est un composant de la TPL (Task Parallel Library) qui permet la création de pipeline complexe de tâches, via un assemblage de Block. Il supporte la programmation asynchrone et le parallélisme avec une facilité de mise en place déconcertante. Pour plus d’informations : http://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx.
Pour installer TPL Dataflow, exécutez la commande NuGet suivante dans la Package Manager Console.
PM> Install-Package Microsoft.Tpl.Dataflow
C’est comme les Lego™
Dataflow travaille avec différent type de blocs qui, une fois assemblés, forme un flux pour vos données.
Vous n’avez plus qu’à choisir les bons blocs !
ActionBlock<TInput>
- Crée un bloc qui lance un délégué Action<T> pour chaque élément reçu.
- Vous ne pouvez pas lier la sortie avec un autre bloc.
1: private async void Initialize()
2: {
3: ActionBlock<int> incrementActionBlock = new ActionBlock<int>(number =>
4: {
5: Console.WriteLine(++number);
6: });
7:
8: for (int i = 0; i < 10; i++)
9: {
10: await incrementActionBlock.SendAsync(i);
11: }
12:
13: // CONSOLE OUTPUT
14: //
15: // 1
16: // 4
17: // 5
18: // 6
19: // 7
20: // 8
21: // 9
22: // 10
23: // 2
24: // 3
25: }
TransformBlock<TInput, TOutput>
- Crée un bloc qui lance un délégué Func<T, TResult> pour chaque élément reçu.
- Vous pouvez lier la sortie à un autre bloc.
1: private async void Initialize()
2: {
3: TransformBlock<string, int> convertTransformBlock = new TransformBlock<string, int>(text =>
4: {
5: return int.Parse(text);
6: }, new ExecutionDataflowBlockOptions
7: {
8: MaxDegreeOfParallelism = 2
9: });
10:
11: ActionBlock<int> incrementActionBlock = new ActionBlock<int>(number =>
12: {
13: Console.WriteLine(++number);
14: }, new ExecutionDataflowBlockOptions
15: {
16: MaxDegreeOfParallelism = 2
17: });
18:
19: // Link the blocks to create a pipeline !
20: convertTransformBlock.LinkTo(incrementActionBlock);
21:
22: List<string> numbers = new List<string> { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9" };
23:
24: // Send data to the TransformBlock
25: foreach (var number in numbers)
26: {
27: await convertTransformBlock.SendAsync(number);
28: }
29:
30: // CONSOLE OUTPUT
31: //
32: // 2
33: // 1
34: // 3
35: // 4
36: // 5
37: // 6
38: // 7
39: // 9
40: // 10
41: // 8
42: }
On remarque que les résultats obtenus dans la console sont ordonnés aléatoirement à cause de l’appel à la méthode asynchrone SendAsync et parce qu’on a activé le parallélisme avec le paramètre MaxDegreeOfParallelism.
BufferBlock<T>
- Crée un bloc de stockage de données.
- Vous pouvez lier la sortie à un autre bloc.
1: TransformBlock<string, int> convertTransformBlock = new TransformBlock<string, int>(text =>
2: {
3: return int.Parse(text);
4: });
5:
6: BufferBlock<int> temporaryBufferBlock = new BufferBlock<int>();
7:
8: ActionBlock<int> incrementActionBlock = new ActionBlock<int>(number =>
9: {
10: Console.WriteLine(++number);
11: });
12:
13: // Link the blocks to create a pipeline !
14: convertTransformBlock.LinkTo(temporaryBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
15:
16: List<string> numbers = new List<string> { "0", "1", "2" };
17:
18: // Send data to the TransformBlock and buffer them
19: foreach (var number in numbers)
20: {
21: convertTransformBlock.SendAsync(number);
22: }
23:
24: // Signal to the block that all data was sent
25: convertTransformBlock.Complete();
26:
27: // Do something crazy here... :)
28:
29: // Link the buffer to the ActionBlock to end the process
30: temporaryBufferBlock.LinkTo(incrementActionBlock);
31:
32: // CONSOLE OUTPUT
33: //
34: // 1
35: // 2
36: // 3
On utilise la classe DataflowLinkOptions pour activer la propriété PropagateCompletion afin de signaler automatiquement au bloc suivant quand le bloc précèdent se termine. Les blocs savent alors que leurs travaux sont terminés et qu’aucune donnée ne va être ajoutée dans le flux.
JoinBlock<T1, T2>
- Crée un bloc qui relie plusieurs sources de données et qui attend un item de chacune des sources afin d’en créer un Tuple.
- Vous pouvez lier la sortie à un autre bloc.
1: JoinBlock<double, float> joinBlock = new JoinBlock<double, float>();
2:
3: ActionBlock<Tuple<double, float>> incrementActionBlock = new ActionBlock<Tuple<double, float>>(number =>
4: {
5: // Performs action only when the block receives each kind of number.
6: Console.WriteLine(number.Item1 + number.Item2);
7: });
8:
9: // Link the blocks to create a pipeline !
10: joinBlock.LinkTo(incrementActionBlock, new DataflowLinkOptions { PropagateCompletion = true });
11:
12: List<double> doubles = new List<double> { 0, 1, 2 };
13: List<float> floats = new List<float> { 0, 1, 2 };
14:
15: // Send data to the transform block and buffer them
16: foreach (var doubleNumber in doubles)
17: {
18: joinBlock.Target1.SendAsync(doubleNumber);
19: }
20:
21: foreach (var floatNumber in floats)
22: {
23: joinBlock.Target2.SendAsync(floatNumber);
24: }
25:
26: // Signal to the block that all data was sent
27: joinBlock.Complete();
28:
29: // CONSOLE
30: //
31: // 0
32: // 2
33: // 4
Les données passées à ce bloc ne sont pas forcement du même type.
BatchBlock<T>
- Crée un bloc qui stocke les données dans des tableaux d’une taille spécifique.
- Vous pouvez lier la sortie à un autre bloc.
1: BatchBlock<int> groupBatchBlock = new BatchBlock<int>(3);
2:
3: ActionBlock<int[]> incrementActionBlock = new ActionBlock<int[]>(numbers =>
4: {
5: Console.WriteLine(numbers.Aggregate((begin, end) => begin + end));
6: });
7:
8: groupBatchBlock.LinkTo(incrementActionBlock, new DataflowLinkOptions { PropagateCompletion = true });
9:
10: for (int i = 0; i < 10; i++)
11: {
12: groupBatchBlock.Post(i);
13: }
14:
15: // Signal to the block that all data was sent
16: groupBatchBlock.Complete();
17:
18: // CONSOLE OUTPUT
19: //
20: // 3
21: // 12
22: // 21
23: // 9
BroadcastBlock<T>
- Crée un bloc qui stocke un élément à la fois, écrasant chaque message par le nouvel arrivant.
- Vous pouvez lier la sortie à d’autres blocs.
1: private async void Initialize()
2: {
3: BroadcastBlock<string> broadcastBlock = new BroadcastBlock<string>(text =>
4: {
5: return text.ToUpper();
6: }, new DataflowBlockOptions { BoundedCapacity = 1 });
7:
8: ActionBlock<string> helloActionBlock = new ActionBlock<string>(text =>
9: {
10: Console.WriteLine("Hello {0}", text);
11: });
12:
13: ActionBlock<string> byeActionBlock = new ActionBlock<string>(text =>
14: {
15: Console.WriteLine("Bye {0}", text);
16: });
17:
18: // Broadcast the message to others blocks
19: broadcastBlock.LinkTo(helloActionBlock, new DataflowLinkOptions { PropagateCompletion = true });
20: broadcastBlock.LinkTo(byeActionBlock, new DataflowLinkOptions { PropagateCompletion = true });
21:
22: List<string> names = new List<string> { "Max", "Jon", "Michel" };
23:
24: foreach (var name in names)
25: {
26: await broadcastBlock.SendAsync(name);
27: }
28:
29: // Signal to the block that all data was sent
30: broadcastBlock.Complete();
31:
32: // CONSOLE OUTPUT
33: //
34: // Bye MAX
35: // Bye JON
36: // Hello MAX
37: // Hello JON
38: // Hello MICHEL
39: // Bye MICHEL
40: }
On ajuste la propriété BoundedCapacity de notre bloc pour être en mesure de ne mettre en mémoire qu’un seul message à la fois.
Une fois encore, les résultats dans la console sont ordonnés aléatoirement à cause de l’appel à la méthode asynchrone SendAsync.
Commentaires