Loupe

De C# à C++ : elle est où la TPL ? Vive la PPL ! (partie 2)

 

Si vous lisez cela, c’est que vous avez surement apprécié la première partie de ce sujet et je vous en remercie. Dans cette suite, nous allons discuter de plusieurs choses intéressantes : comment annuler des taches (les cancellationToken de .Net !) et découvrir les “groupes de taches”.

Petit rappel des épisodes de la série :

Le système d’annulation des taches

En C# nous avons un système d’annulation des tâches qui utilise des tokens, passés d’appels en appels pour signaler une demande d’annulation ou une annulation. En C++ c’est aussi ce système de jeton d’annulation qui est utilisé.

Bien sûr, il existe la valeur spéciale “concurrency::cancellation_token::none” mais le plus simple pour créer un jeton est d’instancier et d’utiliser un objet de type concurrency::cancellation_token_source. Celui-ci possède une méthode get_token pour obtenir un token et vous pouvez appeler la méthode cancel pour “annuler” le jeton émis. Rien de bien compliqué, cela ressemble donc fortement à ce dont nous avons l’habitude en C#. Que se passe t’il lorsqu’un cancellation_token_source est détruit pour un jeton émis ? Rien du tout, le jeton reste dans le même état (qui ne pourras donc plus changer).

Petite aparté : mon tout premier réflexe était de gérer mes cancellation_token_source à l’aide des shared pointer en utilisant make_shared pour en créer et les passer à mes tasks mais Simon m’a appris dernièrement que ce n’est pas utile : le concurrency runtime (CR) utilise le pattern Pointer To Implementation (PTI – tu blogges quand dessus Simon ?) (qui sont elles mêmes ref counté) et ce n’est pas du tout couteux de les passer par valeur. De même, un pool de ces implémentations et géré par le CR et les créations de cancellation_token_source deviennent moins couteuse car les implémentations sont réutilisées. Pour résumer, les cancellation_token_source se comportent comme des smart pointers et ce n’est pas un problème de les passer par valeur.

 

//création d'une cancellation_token_source
concurrency::cancellation_token_source tokenSource ;

//Récupération du token
concurrency::cancellation_token token = tokenSource.get_token();

//Annulation du jeton émis
tokenSource.cancel();

Une fois le jeton acquis, vous pouvez le passer lors de la création de vos tasks à la méthode create_task. En passant un jeton, vous indiquez à la PPL que la task est “annulable”. Lorsque vous appellerez la méthode cancel sur votre source, il ne se passera donc …. rien ! En effet, il peut se produire 2 cas de figure :

  • La tâche n’a pas été encore exécuté/lancée,  par exemple dans le cas ou un thread libre est attendu: elle ne sera tout simplement pas lancée.
  • La tâche est déjà en cours d’exécution : dans ce cas, c’est à vous, dans votre code, de gérer ce cas et de faire le nécessaire en annulant le traitement après avoir fait le ménage nécessaire.

Pour savoir si la tache doit être annulée, on peut utiliser la méthode is_canceled présente sur les jetons mais aussi appeler la méthode concurrency::is_task_cancellation_requested depuis la lambda exécutée par la task.

Si le booleen retourné est true, alors vous devez arrêter votre traitement en cours et appeler la méthode concurrency::cancel_current_task. Il est très important de ne pas simplement faire un return mais bien d’appeler cette méthode qui aura pour effet de passer la task dans le status “canceled”. Aussi, si vous n’avez pas de traitement particulier à faire et voulez juste quitter la méthode si cela est nécessaire, vous pouvez appeler directement la fonction concurrency::interruption_point qui appelle cancel_current_task si nécessaire et ne fait rien autrement. Le système d’annulation des tasks fonctionne à l’aide d’exceptions (et notamment task_canceled). Ne soyez donc pas étonné si des exceptions apparaissent dans votre fenêtre d’Output. Il est important de laisser la PPL gérer cette exception pour vous. Attention, cette exception sera aussi levée lorsque vous faite le “get” sur votre task.

//Création d'une task en passant le token
auto firstTask = concurrency::create_task(
    [](){
        int i = 1;
        while (true){
            //patientons une seconde
            concurrency::wait(1000);
            OutputDebugString((L"Passage n°" + std::to_wstring(i) + L"\n").c_s
            i++;

            //Doit on arreter le traitement ?
            if (concurrency::is_task_cancellation_requested()){

                //Vanish et les taches s'évanouissent
                concurrency::cancel_current_task();
            }

            //Il aurait aussi été possible de faire cela
            concurrency::interruption_point();
        }
},
    //Le token est passé ici lorsque l'on créé la task
    token);

//Laissons un peu de temps à notre tâche
concurrency::wait(3000);
tokenSource.cancel();

try{
    firstTask.get();
}
catch (concurrency::task_canceled& canceledException){
    OutputDebugString(L"La task a été annulée\n");
}

 

De même, si vous souhaitez faire un wait, plutôt qu’un get, le status retourné sera canceled :

auto status = chainedTask.wait();
if (status == concurrency::task_status::canceled){

    OutputDebugString(L"La task a été annulée\n");
}

Nous avons maintenant le principe de fonctionnement pour une tache : comment cela fonctionne t’il avec une chaine de continuation? C’est en fait assez intuitif : votre jeton d’annulation est passé automatiquement dans la chaine de continuation “classique” et ne l’est pas si vous utilisez une lambda prenant une task en paramètre. Le jeton est ainsi “hérité” de sa task précédente. Lorsqu’une task est annulée, les lambdas chainées suivantes ne sont pas exécutées sauf celles prenant une task en paramètre. On comprend alors l’importance de bien appeler “cancel_current_task” lorsqu’une annulation est demandée : autrement, les task de continuations attendant une valeur s’exécuteraient comme si le traitement s’était terminé normalement. Aussi, si vous ne faites pas de “get” dans une continuation prenant une task en paramètre alors l’annulation sera ignorée comme c’est le cas pour les erreurs mais sans faire crasher l’application.

Si l’on utilises la même task que tout à l’heure on peut alors avoir ce type de situation :

auto chainedTask = firstTask
    .then([](){
        //Jamais executé car c'est une lambda prenant une valeur en paramètre.        
        OutputDebugString(L"Je ne serais pas affiché :-(\n");
}).then([](concurrency::task<void> resultTask){
    //Executé même si la task est annulée.
    OutputDebugString(L"Je suis une lambda prenant une task en paramètre.\n");

    //On gére l'annulation ici mais on pourrait le faire sur la chainedTask 
    //  aussi sans être dans une lambda.
    try{
        resultTask.get();
    }
    catch (concurrency::task_canceled& canceledException){
        OutputDebugString(L"La task a été annulée\n");
    }
});

On a alors cette sortie dans l’Output Visual Studio :

Passage n°1
Passage n°2
Je suis une lambda prenant une task en paramètre.
La task a été annulée
First-chance exception at 0x77674B32 in PPLConsoleApp.exe: 
   Microsoft C++ exception: Concurrency::task_canceled at memory location 0x0099E15C.

 

Dernière petite astuce : il est aussi possible de s’abonner à l’annulation d’un jeton à l’aide de sa fonction register_callback.  Si le jeton est déjà annulé, le callback sera exécuté tout de suite, de façon synchrone. Attention, si vous vous abonnez au jeton “concurrency::cancellation_token::none”, une exception est levée.

token.register_callback([](){OutputDebugString(L"Les annulations, ça me fout les jetons...\n"); });

 

 

Les groupes de tasks

Un groupe de task est une façon de regrouper plusieurs tasks ensemble et de les manipuler d’un bloc plutôt que de faire de la gestion tache par tache : annulation, attente de terminaison sur le même thread ou sur un autre thread, etc. Ce sont ces groupes qui sont utilisés par d’autres algorithmes de la PPL tels que parallel_invoke. Vous n’allez peut être pas vous en servir souvent mais comprendre ce qu’il y a sous le capot vous aidera mieux à comprendre la suite.

Il existe 2 types de groupes de tasks : les groupes structurés (structured_task_group) et les groupes … non structurés (task_group). Les deux supportent le système d’annulation vu précédemment, il suffit de passer un jeton d’annulation à leur constructeur. Voici les caractéristiques majeures de chaque type.

Groupes structurés(structured_task_group):

  • n’est pas thread safe et toutes les opérations doivent être effectuées sur le même thread (à part l’annulation : cancel).
  • impossible de rajouter une tache après avoir attendu la fin (wait).
  • les groupes imbriqués enfants(des groupes créés dans le contexte d’un groupe parent) doivent être terminés avant le groupe parent.
  • C’est à vous de gérer la durée de vie de vos traitements via des task handle(plus de détails sur cela ci-dessous), le groupe ne s’en charge pas.

Groupes NON structurés(task_group):

  • thread safe : youpi !
  • il est possible de rajouter une tache après avoir attendu la fin (wait).
  • pas de contraintes sur les groupes imbriqués enfants.
  • pas besoin de gérer d’handle si vous passez des lambdas à exécuter

Vous remarquerez que les taches non structurés sont bien plus alléchantes car leur utilisation est moins contraignante. Sachez que cette souplesse vient au prix des performances, et que si vous devez exécuter des task simples toutes gérées depuis le même thread, il est recommandé d’utiliser des groupes structurés.

Passons maintenant à la pratique avec les groupes non structurés : on commence par créer un groupe (comme pour les cancellation_token_source, c’est déjà un smart pointer). Ensuite, pour lancer une tache, il faut appeler la méthode run du groupe en fournissant une lambda. Cette tache sera ordonnée dans le contexte du groupe et exécutée lorsqu’un “slot” sera disponible pour elle. Je peux aussi ajouter une autre exécution au groupe mais après avoir commencé à attendre ça fin. Le groupe possède aussi une méthode “wait” permettant d’attendre la fin de l’exécution du groupe. Cette méthode lèvera les exceptions qui se sont produite dans le groupe et c’est un bon endroit pour mettre un try/catch. Une légère variante run_and_wait permet de lancer une dernière tache avant d’attendre. Dans l’exemple suivant, nous créons quelques tasks dans un groupe et nous attendons 10 secondes avant de l’annuler dans une autre task.

concurrency::cancellation_token_source tokenSource;
concurrency::cancellation_token token = tokenSource.get_token();

//Création d'un groupe annulable
concurrency::task_group group(token);

//Ajout de 3 tasks dans le groupe
for (size_t ite = 0; ite < 3; ite++)
{
    group.run([ite](){
      int i = 1;
      while (true){

          concurrency::wait(1000 * 5);
          concurrency::interruption_point();
              OutputDebugString((std::to_wstring(ite) + L" : passage n°"
            + std::to_wstring(i) + L"\n").c_str());
          i++;

      }
    });
}

//Ajout d'une task dans le groupe depuis un autre thread
// au bout d'une seconde
concurrency::create_task([group](){
    concurrency::wait(1000);
    group.run([](){
      OutputDebugString(
        L" Task ajoutée et exécutée depuis un autre thread.\n");
    });

});

try{
    OutputDebugString(L"J'attends la fin du groupe.\n");

    //on attends 10 secondes avant d'annuler le groupe
    group.run_and_wait([tokenSource](){
        concurrency::wait(10 * 1000);
        tokenSource.cancel();
    });
}
//Le groupe est annulé
catch (concurrency::task_canceled& canceledException){
    OutputDebugString(L"Le boys band a été annulé\n");
}

 

Dans un groupe structuré de task(structured_task_group), vous ne pouvez pas passer directement vos lambdas/functions mais vous devez créer des “handles”. Un handle est un objet contenant une copie de votre lambda (pas possible de la modifier ensuite donc) et tout ce qui est nécessaire pour gérer son état. En créer un est tout simple, il suffit d’appeler la fonction concurrency::make_task en lui passant la lambda ciblé. Une fois un handle en main, vous pouvez appeler la méthode run d’une structured_task_group en lui passant l’handle de la même manière que vous l’auriez fait avec un groupe non structuré. Attention, c’est à vous de gérer la durée de vie de votre handle et de bien veiller à ce qu’il ne soit pas détruit avant que vous n’ayez appelé la méthode wait (ou run_and_wait) sur le groupe pour s’assurer que la PPL n’en ai plus besoin.

//Création d'un groupe annulable
concurrency::structured_task_group structuredGroup ;

/*
CA NE MARCHE PAS !
structuredGroup.run([](){
OutputDebugString(
L"Task ajoutée et exécutée  depuis un autre thread.\n");
});
*/

auto handle1 = concurrency::make_task([](){
    OutputDebugString(L"Task ajoutée et exécutée  depuis un autre thread.\n");
});

auto handle2 = concurrency::make_task([](){
    OutputDebugString(L"Task ajoutée et exécutée  depuis un autre thread.\n");
});

structuredGroup.run(handle1);
structuredGroup.run(handle2);

structuredGroup.wait();

 

 

Derniers détails : dans le cas ou le code d’une lambda/d’un handle lève une exception, le groupe arrête tout traitement actif et n’en démarre plus de nouveau. Vous pouvez alors retrouvez l’exception à l’endroit où vous faite le wait sur le groupe. Vous pouvez donc faire un try/catch à cet endroit pour attraper les exceptions :

group.run([](){ throw  std::exception("Coucou !");    });

try{
    OutputDebugString(L"J'attends la fin du boys band.\n");
    group.wait();
}
//Le groupe est annulé
catch (concurrency::task_canceled &canceledException){
    OutputDebugString(L"Pas de concert ce soir\n");
}
//Bim !
catch (const std::exception &exception){
    OutputDebugString(L"Mais où est la règle ?\n");
}

 

Je pensais mettre quelques autres sujets dans cet article de blog mais je me rends compte que c’est déjà assez costaud pour aujourd’hui. Je vous laisse donc le temps de bien digérer cela avant de retrouver prochainement la partie 3!

Merci pour votre lecture.

Photo de profil

Ces billets pourraient aussi vous intéresser

Vous nous direz ?!

Commentaires

comments powered by Disqus