Loupe

[WPF] Asynchronisme et parallélisme avec TPL Dataflow

Introduction

Dataflow est un composant de la TPL (Task Parallel Library) qui permet la création de pipeline complexe de tâches, via un assemblage de Block. Il supporte la programmation asynchrone et le parallélisme avec une facilité de mise en place déconcertante. Pour plus d’informations : http://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx.

Pour installer TPL Dataflow, exécutez la commande NuGet suivante dans la Package Manager Console.

PM> Install-Package Microsoft.Tpl.Dataflow

C’est comme les Lego™

Dataflow travaille avec différent type de blocs qui, une fois assemblés, forme un flux pour vos données.

Vous n’avez plus qu’à choisir les bons blocs !

ActionBlock<TInput>

  • Crée un bloc qui lance un délégué Action<T> pour chaque élément reçu.
  • Vous ne pouvez pas lier la sortie avec un autre bloc.
   1: private async void Initialize()
   2: {
   3:   ActionBlock<int> incrementActionBlock = new ActionBlock<int>(number =>
   4:   {
   5:     Console.WriteLine(++number);
   6:   });
   7:  
   8:   for (int i = 0; i < 10; i++)
   9:   {
  10:     await incrementActionBlock.SendAsync(i);
  11:   }
  12:  
  13:   // CONSOLE OUTPUT
  14:   //
  15:   // 1
  16:   // 4
  17:   // 5
  18:   // 6
  19:   // 7
  20:   // 8
  21:   // 9
  22:   // 10
  23:   // 2
  24:   // 3
  25: }

TransformBlock<TInput, TOutput>

  • Crée un bloc qui lance un délégué Func<T, TResult> pour chaque élément reçu.
  • Vous pouvez lier la sortie à un autre bloc.
   1: private async void Initialize()
   2: {
   3:   TransformBlock<string, int> convertTransformBlock = new TransformBlock<string, int>(text =>
   4:   {
   5:     return int.Parse(text);
   6:   }, new ExecutionDataflowBlockOptions
   7:   {
   8:     MaxDegreeOfParallelism = 2
   9:   });
  10:  
  11:   ActionBlock<int> incrementActionBlock = new ActionBlock<int>(number =>
  12:   {
  13:     Console.WriteLine(++number);
  14:   }, new ExecutionDataflowBlockOptions
  15:   {
  16:     MaxDegreeOfParallelism = 2
  17:   });
  18:  
  19:   // Link the blocks to create a pipeline !
  20:   convertTransformBlock.LinkTo(incrementActionBlock);
  21:  
  22:   List<string> numbers = new List<string> { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9" };
  23:  
  24:   // Send data to the TransformBlock
  25:   foreach (var number in numbers)
  26:   {
  27:     await convertTransformBlock.SendAsync(number);
  28:   }
  29:  
  30:   // CONSOLE OUTPUT
  31:   //
  32:   // 2
  33:   // 1
  34:   // 3
  35:   // 4
  36:   // 5
  37:   // 6
  38:   // 7
  39:   // 9
  40:   // 10
  41:   // 8
  42: }

On remarque que les résultats obtenus dans la console sont ordonnés aléatoirement à cause de l’appel à la méthode asynchrone SendAsync et parce qu’on a activé le parallélisme avec le paramètre MaxDegreeOfParallelism.

BufferBlock<T>

  • Crée un bloc de stockage de données.
  • Vous pouvez lier la sortie à un autre bloc.
   1: TransformBlock<string, int> convertTransformBlock = new TransformBlock<string, int>(text =>
   2: {
   3:  return int.Parse(text);
   4: });
   5:  
   6: BufferBlock<int> temporaryBufferBlock = new BufferBlock<int>();
   7:  
   8: ActionBlock<int> incrementActionBlock = new ActionBlock<int>(number =>
   9: {
  10:  Console.WriteLine(++number);
  11: });
  12:  
  13: // Link the blocks to create a pipeline !
  14: convertTransformBlock.LinkTo(temporaryBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
  15:  
  16: List<string> numbers = new List<string> { "0", "1", "2" };
  17:  
  18: // Send data to the TransformBlock and buffer them
  19: foreach (var number in numbers)
  20: {
  21:  convertTransformBlock.SendAsync(number);
  22: }
  23:  
  24: // Signal to the block that all data was sent
  25: convertTransformBlock.Complete();
  26:  
  27: // Do something crazy here... :)
  28:  
  29: // Link the buffer to the ActionBlock to end the process
  30: temporaryBufferBlock.LinkTo(incrementActionBlock);
  31:  
  32: // CONSOLE OUTPUT
  33: //
  34: // 1
  35: // 2
  36: // 3

On utilise la classe DataflowLinkOptions pour activer la propriété PropagateCompletion afin de signaler automatiquement au bloc suivant quand le bloc précèdent se termine. Les blocs savent alors que leurs travaux sont terminés et qu’aucune donnée ne va être ajoutée dans le flux.

JoinBlock<T1, T2>

  • Crée un bloc qui relie plusieurs sources de données et qui attend un item de chacune des sources afin d’en créer un Tuple.
  • Vous pouvez lier la sortie à un autre bloc.
   1: JoinBlock<double, float> joinBlock = new JoinBlock<double, float>();
   2:  
   3: ActionBlock<Tuple<double, float>> incrementActionBlock = new ActionBlock<Tuple<double, float>>(number =>
   4: {
   5:   // Performs action only when the block receives each kind of number.
   6:   Console.WriteLine(number.Item1 + number.Item2);
   7: });
   8:  
   9: // Link the blocks to create a pipeline !
  10: joinBlock.LinkTo(incrementActionBlock, new DataflowLinkOptions { PropagateCompletion = true });
  11:  
  12: List<double> doubles = new List<double> { 0, 1, 2 };
  13: List<float> floats = new List<float> { 0, 1, 2 };
  14:  
  15: // Send data to the transform block and buffer them
  16: foreach (var doubleNumber in doubles)
  17: {
  18:   joinBlock.Target1.SendAsync(doubleNumber);
  19: }
  20:  
  21: foreach (var floatNumber in floats)
  22: {
  23:   joinBlock.Target2.SendAsync(floatNumber);
  24: }
  25:  
  26: // Signal to the block that all data was sent
  27: joinBlock.Complete();
  28:  
  29: // CONSOLE
  30: //
  31: // 0
  32: // 2
  33: // 4

Les données passées à ce bloc ne sont pas forcement du même type.

BatchBlock<T>

  • Crée un bloc qui stocke les données dans des tableaux d’une taille spécifique.
  • Vous pouvez lier la sortie à un autre bloc.
   1: BatchBlock<int> groupBatchBlock = new BatchBlock<int>(3);
   2:  
   3: ActionBlock<int[]> incrementActionBlock = new ActionBlock<int[]>(numbers =>
   4: {
   5:   Console.WriteLine(numbers.Aggregate((begin, end) => begin + end));
   6: });
   7:  
   8: groupBatchBlock.LinkTo(incrementActionBlock, new DataflowLinkOptions { PropagateCompletion = true });
   9:  
  10: for (int i = 0; i < 10; i++)
  11: {
  12:   groupBatchBlock.Post(i);
  13: }
  14:  
  15: // Signal to the block that all data was sent
  16: groupBatchBlock.Complete();
  17:  
  18: // CONSOLE OUTPUT
  19: //
  20: // 3
  21: // 12
  22: // 21
  23: // 9

BroadcastBlock<T>

  • Crée un bloc qui stocke un élément à la fois, écrasant chaque message par le nouvel arrivant.
  • Vous pouvez lier la sortie à d’autres blocs.
   1: private async void Initialize()
   2: {
   3:   BroadcastBlock<string> broadcastBlock = new BroadcastBlock<string>(text =>
   4:   {
   5:     return text.ToUpper();
   6:   }, new DataflowBlockOptions { BoundedCapacity = 1 });
   7:  
   8:   ActionBlock<string> helloActionBlock = new ActionBlock<string>(text =>
   9:   {
  10:     Console.WriteLine("Hello {0}", text);
  11:   });
  12:  
  13:   ActionBlock<string> byeActionBlock = new ActionBlock<string>(text =>
  14:   {
  15:     Console.WriteLine("Bye {0}", text);
  16:   });
  17:  
  18:   // Broadcast the message to others blocks
  19:   broadcastBlock.LinkTo(helloActionBlock, new DataflowLinkOptions { PropagateCompletion = true });
  20:   broadcastBlock.LinkTo(byeActionBlock, new DataflowLinkOptions { PropagateCompletion = true });
  21:  
  22:   List<string> names = new List<string> { "Max", "Jon", "Michel" };
  23:  
  24:   foreach (var name in names)
  25:   {
  26:     await broadcastBlock.SendAsync(name);
  27:   }
  28:  
  29:   // Signal to the block that all data was sent
  30:   broadcastBlock.Complete();
  31:  
  32:   // CONSOLE OUTPUT
  33:   //
  34:   // Bye MAX
  35:   // Bye JON
  36:   // Hello MAX
  37:   // Hello JON
  38:   // Hello MICHEL
  39:   // Bye MICHEL
  40: }

On ajuste la propriété BoundedCapacity de notre bloc pour être en mesure de ne mettre en mémoire qu’un seul message à la fois.

Une fois encore, les résultats dans la console sont ordonnés aléatoirement à cause de l’appel à la méthode asynchrone SendAsync.

Ces billets pourraient aussi vous intéresser

Vous nous direz ?!

Commentaires

comments powered by Disqus