1
16

Écrire un séquenceur de processus en PHP

Votre service doit envoyer une série de requêtes HTTP à un autre service sur le web via plusieurs canaux avec une contrainte sur le nombre de requêtes par canal et un délai minimum entre chaque requête sur le même canal. Apprenez à écrire en PHP un séquenceur de processus qui exécute plusieurs tâches en parallèle.

Lisez attentivement la documentation sur les fonctions PCNTL.

Téléchargez le fichier scheduler.php.

Lancez son exécution dans le processeur de commandes :

$ php scheduler.php
php scheduler.php channels tasks

Le programme attend 2 arguments : le nombre de canaux disponibles et le nombre de tâches à exécuter. Demandez l'exécution de 9 tâches sur 2 canaux :

$ php scheduler.php 2 9
   8  2 12:03:09 0.6
   9  1 12:03:09 0.8
   7  1 12:03:10 0.7
   6  2 12:03:10 0.9
   5  1 12:03:11 0.6
   4  2 12:03:11 0.6
   2  2 12:03:12 1.0
   3  1 12:03:12 0.3
   1  1 12:03:13 0.2

Le programme affiche le numéro de la tâche, le numéro du canal utilisé, l'heure et la durée de l'exécution. La durée d'une tâche est aléatoire entre 200 ms et 1 s. Remarquez que l'utilisation d'un canal est toujours d'une seconde.

  1. <?php
  2.  
  3. define('USAGE', 'php %s channels tasks');
  4.  
  5. $progname=basename($argv[0]);

Définit la constante USAGE qui permettra d'afficher la ligne de commande correcte pour lancer le programme. Récupère le nom du programme dans la ligne de commande et l'assigne à la variable globale $progname. $progname contient généralement php.

  1. function abort($msg, $code=1) {
  2.     echo $msg, PHP_EOL;
  3.     exit($code);
  4. }

La fonction abort affiche $msg et quitte le programme en appelant exit avec le code de retour $code défini à 1 par défaut.

  1. function usage() {
  2.     global $progname;
  3.  
  4.     abort(sprintf(USAGE, $progname), 1);
  5. }

La fonction usage affiche le format d'appel correct du programme et quitte le programme en appelant abort.

  1. if ($argc != 3) {
  2.     usage();
  3. }

Quitte le programme si le nombre d'arguments n'est pas correct.

  1. $nchannels=$argv[1];
  2. if (!is_numeric($nchannels) or $nchannels < 1) {
  3.     abort($nchannels . '?');
  4. }
  5.  
  6. $ntasks=$argv[2];
  7. if (!is_numeric($ntasks) or $ntasks < 0) {
  8.     abort($ntasks . '?');
  9. }

Récupère le nombre de canaux et le nombre de tâches sur la ligne de commande et les garde dans les variables globales $nchannels et $ntasks. Vérifie si les paramètres sont corrects. Quitte le programme en cas d'erreur.

  1. declare(ticks = 1);

Nécessaire à la gestion des signaux. La valeur 1 garantit un traitement immédiat.

  1. define('MINTIME', 0.2);
  2. define('MAXTIME', 1.0);
  3.  
  4. define('TIMESLICE', 1);

Définit la durée minimum et la durée maximum d'une tâche. TIMESLICE définit le délai minimum entre 2 utilisation du même canal. Essayez le programme avec MAXTIME à 2.0. Si une tâche dure plus longtemps que TIMESLICE, le canal utilisé est immédiatement disponible.

  1. $feeders=array();
  2. $atwork=array();

$feeders contient pour chaque canal le temps de sa prochaine disponibilité, i.e. le temps du démarrage de la dernière exécution + TIMESLICE. $atwork contient pour chaque canal utilisé le numéro du processus fils qui l'a mobilisé.

  1. pcntl_signal(SIGCHLD,   'sig_handler');
  2.  
  3. pcntl_signal(SIGINT,    'sig_handler');
  4. pcntl_signal(SIGTERM,   'sig_handler');
  5.  
  6. pcntl_signal(SIGHUP,    'sig_handler');
  7.  
  8. pcntl_signal(SIGQUIT,   SIG_IGN);

Capture les signaux SIGCHLD, SIGINT, SIGTERM et SIGHUP avec la fonction sig_handler. Ignore le signal SIGQUIT.

  1. for (;;) {

Le programme tourne dans une boucle dont il sort uniquement quand toutes les tâches ont été exécutées, en cas d'erreur ou à la réception d'un signal d'interruption.

  1.     for ($channel=0; $channel < $nchannels; $channel++) {

Parcourt tous les canaux.

  1.         if (!$ntasks) {
  2.             break;
  3.         }

Si toutes les tâches ont été exécutées, arrête de parcourir les canaux.

  1.         if (isset($atwork[$channel])) {
  2.             continue;
  3.         }

Si le canal est occupé, passe au suivant.

  1.         if (isset($feeders[$channel]) and $feeders[$channel] > microtime(true)) {
  2.             continue;
  3.         }

Si le temps pour la prochaine utilisation autorisée du canal n'est pas encore atteint, passe au suivant.

  1.         $pid = pcntl_fork();

Le canal est disponible. Duplique le processus.

  1.         if ($pid === -1) {
  2.             terminate(2);
  3.         }

Si la duplication a échoué, termine le programme avec le statut 2.

  1.         else if ($pid === 0) {
  2.             run_task($channel);
  3.             exit(0);
  4.         }

Si le processus est le processus fils, exécute une tâche et termine le processus.

  1.         else {
  2.             $atwork[$channel]=$pid;
  3.  
  4.             $feeders[$channel]=microtime(true)+TIMESLICE;
  5.  
  6.             --$ntasks;
  7.         }
  8.     }

Le processus père note le numéro du processus fils dans $atwork et le temps de la prochaine disponibilité du canal dans $feeders. Décrémente le nombre de tâches à exécuter.

  1.     if (!count($atwork)) {
  2.         if (!$ntasks) {
  3.             terminate();
  4.         }
  5.  
  6.         $dtime=min($feeders)-microtime(true);
  7.         $msecs=ceil($dtime*1000000);
  8.         usleep($msecs);
  9.         continue;
  10.     }

Une fois tous les canaux parcourus, si aucun canal est occupé, si toutes les tâches ont été exécutées, termine le programme normalement, sinon bloque le processus père jusqu'au prochain temps d'exécution disponible avant de reprendre le parcours de tous les canaux.

  1.     $resting=array_diff_key($feeders, array_values($atwork));
  2.     $dtime=$resting ? min($resting)-microtime(true) : 0;

Au moins un canal est occupé. Extrait la liste des canaux inoccupés en attente. Calcule quand un canal sera disponible.

  1.     if ($dtime > 0) {
  2.         $secs=floor($dtime);
  3.         $nano=(int)(($dtime-$secs)*1000000000);
  4.         @pcntl_sigtimedwait(array(SIGCHLD), $siginfo, $secs, $nano);
  5.     }

Si au moins un canal est en attente, suspend le programme jusqu'à la terminaison d'un processus fils ou au plus tard jusqu'à ce qu'un canal inoccupé soit disponible.

  1.     else {
  2.         @pcntl_sigwaitinfo(array(SIGCHLD));
  3.     }

Si tous les canaux sont occupés, suspend le programme jusqu'à la terminaison d'un processus fils.

  1.     while (($pid = pcntl_wait($status, WNOHANG)) > 0) {
  2.         if (pcntl_wifexited($status)) {
  3.             $retcode = pcntl_wexitstatus($status);
  4.             if ($retcode !== 0) {
  5.                 ;
  6.             }
  7.             $channel=array_search($pid, $atwork);
  8.             unset($atwork[$channel]);
  9.         }
  10.     }
  11. }

Capture en boucle la terminaison des processus fils. Si un processus s'est terminé normalement, récupère le code de retour du processus et libère le canal pour une prochaine exécution éventuelle. Reprend le parcours de tous les canaux.

  1. function terminate($code=0) {
  2.     exit($code);
  3. }

La fonction terminate quitte le programme en retournant $code qui est à 0 par défaut.

  1. function sig_handler($signo) {
  2.     switch ($signo) {
  3.         case SIGCHLD:
  4.             break;
  5.         case SIGHUP:
  6.             break;
  7.         case SIGINT:
  8.         case SIGTERM:
  9.             terminate(1);
  10.         default:
  11.             break;
  12.     }
  13. }

La fonction sig_handler gère les signaux SIGCHLD, SIGHUP, SIGINT et SIGTERM. SIGCHLD est reçu quand un processus fils se termine. Cet événement est capturé dans la boucle principale du programme. SIGHUP est ignoré. SIGINT et SIGTERM termine le programme avec le statut 1.

  1. function run_task($channel) {
  2.     global $ntasks;
  3.  
  4.     $ts=date('h:i:s');
  5.     $secs=rand(MINTIME*10, MAXTIME*10) / 10;
  6.  
  7.     $s=sprintf("%4d %2d %s %0.1f", $ntasks, $channel+1, $ts, $secs);
  8.     echo "$s\n";
  9.  
  10.     $msecs=$secs * 1000000;
  11.     usleep($msecs);
  12. }

La fonction run_task simule une tâche exécutée par un processus fils. Le code calcule une durée d'exécution aléatoire entre MINTIME et MAXTIME, affiche le numéro de la tâche, le numéro du canal utilisé, l'heure et la durée de l'exécution, puis bloque le retour de la fonction pour la durée d'exécution simulée.

Commentaires

Pour ajouter un commentaire, cliquez ici.