
Write a process scheduler in PHP

Your service has to send a series of HTTP requests to another service on the web through several channels, with one request at a time per channel and a minimum delay between two requests on the same channel. Learn how to write a process scheduler in PHP that runs several tasks in parallel and waits a minimum delay between two requests on the same channel.

Carefully read the documentation on the PCNTL functions.

Download the file scheduler.php.

Run it from the command line:

$ php scheduler.php
php scheduler.php channels tasks

The program expects 2 arguments: the number of available channels and the number of tasks to run. Run 9 tasks on 2 channels:

$ php scheduler.php 2 9
   8  2 12:03:09 0.6
   9  1 12:03:09 0.8
   7  1 12:03:10 0.7
   6  2 12:03:10 0.9
   5  1 12:03:11 0.6
   4  2 12:03:11 0.6
   2  2 12:03:12 1.0
   3  1 12:03:12 0.3
   1  1 12:03:13 0.2

The program displays the task number, the channel used, the start time and the duration of the task in seconds. The duration of a task is random, between 200 ms and 1 s. Notice that each channel is used at most once per second, even when a task takes less than a second.
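Since the start times are printed to the second, this property can be checked roughly on a trace like the one above. Here is a sketch with a hypothetical function check_trace(), which parses the four columns and verifies that two starts on the same channel are at least TIMESLICE (1 s) apart. It assumes that the lines of each channel appear in chronological order and that the 12-hour clock used by date('h:i:s') does not wrap during the run:

```php
<?php
// Hypothetical helper: returns false if a trace printed by scheduler.php shows
// the same channel started less than $timeslice seconds apart. Start times are
// printed to the second, so only visibly short gaps can be detected.
function check_trace($trace, $timeslice = 1) {
    $last = array();   // channel => start time of its previous task, in seconds

    foreach (explode("\n", trim($trace)) as $line) {
        // Fields: task number, channel, start time (hh:mm:ss), duration.
        $fields = preg_split('/\s+/', trim($line));
        if (count($fields) !== 4) {
            continue;   // ignore lines that are not part of the trace
        }
        list(, $channel, $time) = $fields;
        list($h, $m, $s) = array_map('intval', explode(':', $time));
        $t = $h * 3600 + $m * 60 + $s;

        if (isset($last[$channel]) and $t - $last[$channel] < $timeslice) {
            return false;
        }
        $last[$channel] = $t;
    }
    return true;
}

// The trace shown above.
$trace = "   8  2 12:03:09 0.6
   9  1 12:03:09 0.8
   7  1 12:03:10 0.7
   6  2 12:03:10 0.9
   5  1 12:03:11 0.6
   4  2 12:03:11 0.6
   2  2 12:03:12 1.0
   3  1 12:03:12 0.3
   1  1 12:03:13 0.2";

var_dump(check_trace($trace));   // bool(true)
```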

    <?php

    define('USAGE', 'php %s channels tasks');

    $progname=basename($argv[0]);

Defines the constant USAGE, used to display the correct command-line usage of the program. Retrieves the name of the program from the command line and assigns it to the global variable $progname. Since the program is run with php scheduler.php, $argv[0] holds the name of the script, so $progname normally contains scheduler.php.
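When the script is run with php scheduler.php, $argv[0] contains the path of the script as typed, and basename() strips any directory from it. A quick check, where the literal path /home/user/scheduler.php is only an example standing in for $argv[0]:

```php
<?php
define('USAGE', 'php %s channels tasks');

// Example path standing in for $argv[0].
$progname = basename('/home/user/scheduler.php');

echo sprintf(USAGE, $progname), PHP_EOL;   // php scheduler.php channels tasks
```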

    function abort($msg, $code=1) {
        echo $msg, PHP_EOL;
        exit($code);
    }

The function abort displays $msg and quits the program by calling exit with the return code $code, which defaults to 1.

    function usage() {
        global $progname;

        abort(sprintf(USAGE, $progname), 1);
    }

The function usage displays the correct calling format of the program and quits the program by calling abort.

    if ($argc != 3) {
        usage();
    }

Quits the program if the number of arguments isn't correct.

    $nchannels=$argv[1];
    if (!is_numeric($nchannels) or $nchannels < 1) {
        abort($nchannels . '?');
    }

    $ntasks=$argv[2];
    if (!is_numeric($ntasks) or $ntasks < 0) {
        abort($ntasks . '?');
    }

Extracts the number of channels and the number of tasks from the command line and keeps them in the global variables $nchannels and $ntasks. Checks that the parameters are valid and quits the program in case of error.
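Note that is_numeric() also accepts values such as 2.5 or 1e3. A stricter check could rely on filter_var() with FILTER_VALIDATE_INT, which returns the integer or false. A sketch with a hypothetical helper parse_count():

```php
<?php
// Hypothetical helper: returns $arg as an int if it is a whole number
// greater than or equal to $min, false otherwise.
function parse_count($arg, $min) {
    return filter_var($arg, FILTER_VALIDATE_INT, array('options' => array('min_range' => $min)));
}

var_dump(parse_count('2', 1));     // int(2)
var_dump(parse_count('2.5', 1));   // bool(false)
var_dump(parse_count('0', 1));     // bool(false)
var_dump(parse_count('0', 0));     // int(0)
```

In the scheduler, the result would be compared with false using === since 0 is a valid number of tasks.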

    declare(ticks = 1);

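Necessary for handling signals: PHP checks for pending signals on each tick, and the value 1 raises a tick after every tickable statement, so that signals are handled almost immediately.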
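The tick mechanism can be observed directly with register_tick_function(). This sketch, for illustration only, counts the ticks raised while a short loop runs. When asynchronous signals are not enabled, the pcntl extension dispatches pending signals to the handlers installed with pcntl_signal() from a tick function of this kind. Since PHP 7.1, calling pcntl_async_signals(true) is an alternative to declare(ticks = 1).

```php
<?php
declare(ticks=1);

// Illustration only: count the ticks PHP raises while running a small loop.
$ticks = 0;
register_tick_function(function () use (&$ticks) {
    $ticks++;
});

$sum = 0;
for ($i = 0; $i < 3; $i++) {
    $sum += $i;   // each statement executed here raises a tick
}

echo "sum: $sum, ticks so far: $ticks", PHP_EOL;
```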

    define('MINTIME', 0.2);
    define('MAXTIME', 1.0);

    define('TIMESLICE', 1);

Defines the minimum and the maximum duration of a task, in seconds. TIMESLICE defines the minimum delay in seconds between 2 uses of the same channel, measured from the start of one task to the start of the next. Try the program with MAXTIME set to 2.0: when a task lasts longer than TIMESLICE, its channel becomes available again as soon as the task ends.
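Put differently, once a task has started on a channel, the channel becomes usable again at whichever comes later: the end of the task, or its start plus TIMESLICE. This ignores the small delay before the parent notices that the task has ended. A sketch with a hypothetical function channel_ready_at(), times in seconds:

```php
<?php
define('TIMESLICE', 1);

// Hypothetical helper: earliest time a channel can be used again after a task
// started at $start and lasting $duration seconds.
function channel_ready_at($start, $duration) {
    return max($start + $duration, $start + TIMESLICE);
}

echo channel_ready_at(10.0, 0.6), PHP_EOL;   // 11: the delay dominates
echo channel_ready_at(10.0, 1.7), PHP_EOL;   // 11.7: the task itself dominates
```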

    $feeders=array();
    $atwork=array();

$feeders contains, for each channel, the time when it can be used again, i.e. the start time of its last task + TIMESLICE. $atwork contains, for each busy channel, the PID of the child process which occupies it.

    pcntl_signal(SIGCHLD,   'sig_handler');

    pcntl_signal(SIGINT,    'sig_handler');
    pcntl_signal(SIGTERM,   'sig_handler');

    pcntl_signal(SIGHUP,    'sig_handler');

    pcntl_signal(SIGQUIT,   SIG_IGN);

Captures the signals SIGCHLD, SIGINT, SIGTERM and SIGHUP with the function sig_handler. Ignores the signal SIGQUIT.

    for (;;) {

The program runs in a loop from which it exits only when all the tasks have been run, in case of error or when an interrupt signal is received.

        for ($channel=0; $channel < $nchannels; $channel++) {

Goes through all the channels.

            if (!$ntasks) {
                break;
            }

If all the tasks have been run, stops going through the channels.

            if (isset($atwork[$channel])) {
                continue;
            }

If the channel is busy, tries the next one.

            if (isset($feeders[$channel]) and $feeders[$channel] > microtime(true)) {
                continue;
            }

If the time of the next allowed use of the channel hasn't been reached yet, tries the next one.
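The two channel tests above (busy, delay not expired) decide whether a channel can take the next task. A sketch grouping them in a hypothetical function channel_usable(), with the current time passed as a parameter so the logic can be checked without forking; the PID and times are made up:

```php
<?php
// Hypothetical helper: returns true if $channel can start a task at time $now.
// $atwork maps busy channels to child PIDs and $feeders maps channels to the
// time they may be used again, as in scheduler.php.
function channel_usable($channel, $atwork, $feeders, $now) {
    if (isset($atwork[$channel])) {
        return false;   // a child process still occupies the channel
    }
    if (isset($feeders[$channel]) and $feeders[$channel] > $now) {
        return false;   // TIMESLICE has not elapsed since the last start
    }
    return true;        // idle and rested, or never used yet
}

$atwork  = array(0 => 4242);               // channel 0 busy (made-up PID)
$feeders = array(0 => 101.0, 1 => 100.5);

var_dump(channel_usable(0, $atwork, $feeders, 100.8));   // bool(false)
var_dump(channel_usable(1, $atwork, $feeders, 100.8));   // bool(true)
var_dump(channel_usable(2, $atwork, $feeders, 100.8));   // bool(true)
```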

            $pid = pcntl_fork();

The channel is available. Duplicates the process.

            if ($pid === -1) {
                terminate(2);
            }

If the fork failed, terminates the program with status 2.

            else if ($pid === 0) {
                run_task($channel);
                exit(0);
            }

If the process is the child process, runs a task and terminates the process.

            else {
                $atwork[$channel]=$pid;

                $feeders[$channel]=microtime(true)+TIMESLICE;

                --$ntasks;
            }
        }

The parent process records the PID of the child process in $atwork and the time when the channel can be used again in $feeders, then decrements the number of tasks to run.
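The fork-and-record pattern described above can be tried in isolation with the following sketch, which assumes the pcntl extension is loaded (PHP CLI on a Unix-like system). It starts one child on each of 2 hypothetical channels, records each PID in $atwork, then waits for every child with a blocking pcntl_wait() and maps each PID back to its channel. The children sleep 0.1 s and exit with their channel number as status only so that the parent has something to check; scheduler.php does not do this.

```php
<?php
// Sketch only: requires the pcntl extension (PHP CLI on a Unix-like system).
$nchannels = 2;      // hypothetical number of channels for the demo
$atwork = array();   // channel => PID of the child process using it

for ($channel = 0; $channel < $nchannels; $channel++) {
    $pid = pcntl_fork();
    if ($pid === -1) {
        exit(2);                    // fork failed
    }
    if ($pid === 0) {
        usleep(100000);             // child: pretend to work for 0.1 s
        exit($channel);             // demo only: report the channel as exit status
    }
    $atwork[$channel] = $pid;       // parent: the channel is now busy
}

// Parent: wait (blocking) for each child and free its channel.
$codes = array();                   // channel => exit status of its child
while ($atwork) {
    $pid = pcntl_wait($status);
    if ($pid <= 0) {
        break;                      // no child left to wait for, or error
    }
    $channel = array_search($pid, $atwork, true);
    if ($channel !== false) {
        unset($atwork[$channel]);
        if (pcntl_wifexited($status)) {
            $codes[$channel] = pcntl_wexitstatus($status);
        }
    }
}

ksort($codes);
print_r($codes);
```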

        if (!count($atwork)) {
            if (!$ntasks) {
                terminate();
            }

            // Clamp to 0: time has passed since the scan, and usleep() rejects negative values.
            $dtime=min($feeders)-microtime(true);
            $usecs=max(0, (int)ceil($dtime*1000000));
            usleep($usecs);
            continue;
        }

When all the channels have been scanned and no channel is busy: if all the tasks have been run, terminates the program normally; otherwise, puts the parent process to sleep until the next channel can be used, converting the delay to microseconds for usleep(), then goes through all the channels again.

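        $resting=array_diff_key($feeders, $atwork);
        $dtime=$resting ? min($resting)-microtime(true) : 0;

At least one channel is busy. Extracts from $feeders the channels which are not busy: $feeders and $atwork are both indexed by channel number, so array_diff_key() can compare them directly. Computes how long to wait until the next idle channel can be used again, or 0 if there is no such channel.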
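array_diff_key() compares keys only. Since $feeders and $atwork are both indexed by channel number, array_diff_key($feeders, $atwork) keeps the idle channels, whereas array_values($atwork) renumbers the busy channels from 0 and makes the comparison meaningless. A quick check with made-up times and PIDs:

```php
<?php
// Time at which each of channels 0..2 may be used again (made-up values).
$feeders = array(0 => 101.0, 1 => 101.4, 2 => 100.7);
// Channel 1 is busy (made-up PID).
$atwork = array(1 => 4243);

// Keys are compared, so the idle channels 0 and 2 are kept.
$resting = array_diff_key($feeders, $atwork);
print_r(array_keys($resting));   // channels 0 and 2

// array_values() renumbers busy channel 1 as key 0: channel 0 is dropped
// and busy channel 1 is wrongly kept.
$wrong = array_diff_key($feeders, array_values($atwork));
print_r(array_keys($wrong));     // channels 1 and 2

echo min($resting), PHP_EOL;     // 100.7
```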

        if ($dtime > 0) {
            $secs=(int)floor($dtime);
            $nano=(int)(($dtime-$secs)*1000000000);
            @pcntl_sigtimedwait(array(SIGCHLD), $siginfo, $secs, $nano);
        }

If an idle channel is waiting for its delay to expire, puts the program on hold until a child process terminates or, at the latest, until that channel can be used again.
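pcntl_sigtimedwait() takes its timeout as a number of whole seconds plus a number of nanoseconds. The same split, isolated in a hypothetical helper split_timeout() so that it can be checked on its own:

```php
<?php
// Hypothetical helper: splits a positive delay in seconds into the
// ($seconds, $nanoseconds) pair expected by pcntl_sigtimedwait().
function split_timeout($dtime) {
    $secs = (int)floor($dtime);
    $nano = (int)(($dtime - $secs) * 1000000000);
    return array($secs, $nano);
}

list($secs, $nano) = split_timeout(1.25);
echo "$secs s + $nano ns", PHP_EOL;   // 1 s + 250000000 ns
```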

        else {
            @pcntl_sigwaitinfo(array(SIGCHLD));
        }

If all the channels are busy, puts the program on hold until a child process terminates.

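        while (($pid = pcntl_wait($status, WNOHANG)) > 0) {
            // Free the channel however the child ended, even if it was killed by a signal.
            $channel=array_search($pid, $atwork, true);
            if ($channel !== false) {
                unset($atwork[$channel]);
            }
            if (pcntl_wifexited($status)) {
                $retcode = pcntl_wexitstatus($status);
                if ($retcode !== 0) {
                    // The task failed: its return code could be logged here.
                }
            }
        }
    }

Collects the terminated child processes in a loop without blocking (WNOHANG). Frees the channel of each terminated child, whether it exited normally or was killed by a signal, so that the channel can be used again. If the child exited normally, saves its return code in $retcode. Jumps back to going through all the channels.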
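pcntl_wait() only returns a PID, so the channel is found with array_search(). That function returns false when the value is missing, and false used as an array key is cast to 0, so an unchecked unset($atwork[$channel]) would free channel 0 by mistake. A guarded lookup, as a hypothetical helper release_channel() with made-up PIDs:

```php
<?php
// Hypothetical helper: frees the channel occupied by $pid, if any, and
// returns that channel, or false if $pid is not a known child.
function release_channel(&$atwork, $pid) {
    $channel = array_search($pid, $atwork, true);
    if ($channel !== false) {
        unset($atwork[$channel]);
    }
    return $channel;
}

$atwork = array(0 => 4242, 1 => 4243);

var_dump(release_channel($atwork, 4243));   // int(1)
var_dump(release_channel($atwork, 9999));   // bool(false), channel 0 untouched
var_dump($atwork === array(0 => 4242));     // bool(true)
```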

    function terminate($code=0) {
        exit($code);
    }

The function terminate quits the program with the exit status $code, which defaults to 0.

    function sig_handler($signo) {
        switch ($signo) {
            case SIGCHLD:
                break;
            case SIGHUP:
                break;
            case SIGINT:
            case SIGTERM:
                terminate(1);
            default:
                break;
        }
    }

The function sig_handler handles the signals SIGCHLD, SIGHUP, SIGINT and SIGTERM. SIGCHLD is sent when a child process terminates; the terminated child is handled in the main loop of the program, so the handler does nothing. SIGHUP is ignored. SIGINT and SIGTERM terminate the program with status 1.

    function run_task($channel) {
        global $ntasks;

        $ts=date('h:i:s');
        $secs=rand(MINTIME*10, MAXTIME*10) / 10;

        $s=sprintf("%4d %2d %s %0.1f", $ntasks, $channel+1, $ts, $secs);
        echo "$s\n";

        // usleep() takes an integer number of microseconds.
        $usecs=(int)round($secs * 1000000);
        usleep($usecs);
    }

The function run_task simulates running a task in a child process. It draws the duration of the task at random between MINTIME and MAXTIME in steps of 0.1 s, displays the task number, the channel number, the start time and the duration, then sleeps for the simulated duration of the task.
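Since rand() returns an integer, the duration is drawn in tenths of a second and then divided by 10, which gives values from MINTIME to MAXTIME in steps of 0.1 s. A sketch of the same draw as a hypothetical function random_duration(). It uses mt_rand(), of which rand() is an alias since PHP 7.1, and rounds the bounds explicitly because MINTIME * 10 is a float:

```php
<?php
define('MINTIME', 0.2);
define('MAXTIME', 1.0);

// Hypothetical helper: random duration between MINTIME and MAXTIME seconds,
// in steps of 0.1 s, drawn as an integer number of tenths.
function random_duration() {
    $tenths = mt_rand((int)round(MINTIME * 10), (int)round(MAXTIME * 10));
    return $tenths / 10;
}

$secs = random_duration();
printf("%0.1f s = %d us\n", $secs, (int)round($secs * 1000000));
```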
