За последние 24 часа нас посетили 18068 программистов и 1678 роботов. Сейчас ищет 1061 программист ...

Выполнение одновременно группы запросов на PHP

Тема в разделе "Решения, алгоритмы", создана пользователем Ensiferum, 3 дек 2010.

  1. Ensiferum

    Ensiferum Активный пользователь

    С нами с:
    11 июл 2010
    Сообщения:
    1.292
    Симпатии:
    0
    Адрес:
    из секты поклонников Нео
    Наткнулся на пост на хабре, название которого меня давно интересовало. Решение, описанное там, не относилось конечно к желаемому, но всё же имеет место быть. Правда мне класс показался не совсем удобным, например нельзя указать функцию для обработки ответа, нет отчета по времени выполнения, не совсем удобно передаются данные да и вообще в коде ошибки есть. Немного переделал решение и выкладываю тут:

    PHP:
    1. <?php
    2. class MultiThreading {
    3.     /**
    4.      * функция для обработки ответа
    5.      *
    6.      * @var string
    7.      * @access private
    8.      */
    9.     private $callback;
    10.     /**
    11.      * Имя сервера
    12.      *
    13.      * @var string
    14.      * @access private
    15.      */
    16.     private $server;
    17.    
    18.     /**
    19.      * Максимальное количество потоков
    20.      *
    21.      * @var int
    22.      * @access private
    23.      */
    24.     private $maxthreads;
    25.    
    26.     /**
    27.      * Имя скрипта, который выполняет нужную нам задачу
    28.      *
    29.      * @var string
    30.      * @access private
    31.      */
    32.     private $scriptname;
    33.    
    34.     /**
    35.      * Параметры, которые мы будем передавать скрипту
    36.      *
    37.      * @var array
    38.      * @access private
    39.      */
    40.     private $params = array();
    41.    
    42.     /**
    43.      * Массив, в котором хранятся потоки
    44.      *
    45.      * @var array
    46.      * @access private
    47.      */
    48.     private $threads = array();
    49.     /**
    50.      * Массив, содержащий время выполнения работы
    51.      *
    52.      * @var array
    53.      * @access private
    54.      */
    55.     private $times = array(0, 0);    
    56.     /**
    57.      * Массив, в котором хранятся результаты
    58.      *
    59.      * @var array
    60.      * @access private
    61.      */
    62.     private $results = array();
    63.    
    64.     /**
    65.      * Конструктор класса. В нем мы указываем максимальное количество потоков и имя сервера. Оба аргумента необязательны.
    66.      *
    67.      * @param int $maxthreads максимальное количество потоков, по умолчанию 10
    68.      * @param string $callback имя функции для обработки ответа
    69.      * @param string $server имя сервера, по умолчанию имя сервера, на котором запущено приложение
    70.      * @access public
    71.      */
    72.     public function __construct($maxthreads = 10, $callback = '', $server = '') {
    73.         $this->server     = ($server)? $server : $_SERVER['SERVER_NAME'];
    74.         $this->maxthreads = $maxthreads;
    75.         if ($callback)
    76.             $this->callback = $callback;
    77.     }
    78.    
    79.     /**
    80.      * Указываем имя скрипта, который выполняет нужную нам задачу
    81.      *
    82.      * @param string $scriptname имя скрипта, включая путь к нему
    83.      * @access public
    84.      */
    85.     public function setScriptName($scriptname) {
    86.         if (!$fp = fopen('http://'.$this->server.'/'.$scriptname, 'r'))
    87.             throw new Exception('Cant open script file');
    88.        
    89.         fclose($fp);
    90.        
    91.         $this->scriptname = $scriptname;
    92.     }
    93.  
    94.     /**
    95.      * Возвращаем время, затраченное на выполнение всей работы
    96.      *
    97.      * @param null
    98.      * @access public
    99.      */
    100.     public function getTime() {
    101.         return round($this->times[1] - $this->times[0]);
    102.     }
    103.    
    104.     /**
    105.      * Задаем параметры, которые мы будем передавать скрипту
    106.      *
    107.      * @param array $params массив параметров
    108.      * @access public
    109.      */
    110.     public function setParams($params = array()) {
    111.         $this->params = $params;
    112.     }
    113.    
    114.     /**
    115.      * Выполняем задачу, комментарии в коде
    116.      *
    117.      * @access public
    118.      */
    119.     public function execute() {
    120.         $this->times[0] = microtime(true);
    121.         // Запускаем механизм, и он работает, пока не выполнятся все потоки
    122.         do {
    123.             // Если не превысили лимит потоков
    124.             if (sizeof($this->threads) < $this->maxthreads) {
    125.                 // Если удается получить следующий набор параметров
    126.                 if ($item = current($this->params)) {
    127.                
    128.                     // Формируем запрос методом GET
    129.                    
    130.                     $query = "GET [url=http://]http://[/url]".$this->server."/".$this->scriptname."?".$item." HTTP/1.0\r\n";
    131.                    
    132.                     // Открыватем соединение
    133.                    
    134.                     if (!$fsock = fsockopen($this->server, 80))
    135.                         throw new Exception('Cant open socket connection');
    136.                
    137.                     fputs($fsock, $query);
    138.                     fputs($fsock, "Host: {$this->server}\r\n");
    139.                     fputs($fsock, "\r\n");
    140.                
    141.                     stream_set_blocking($fsock, 0);
    142.                     stream_set_timeout($fsock, 3600);
    143.                    
    144.                     // Записываем поток
    145.                
    146.                     $this->threads[] = $fsock;
    147.                     $this->results[] = '';
    148.                     // Переходим к следующему элементу
    149.                
    150.                     next($this->params);
    151.                 }
    152.             }
    153.            
    154.             // Перебираем потоки
    155.             foreach ($this->threads as $key=>$value) {
    156.                 // Если поток отработал, закрываем и удаляем, вызываем пользовательскую функцию
    157.                 if (feof($value)) {
    158.                     fclose($value);
    159.                     unset($this->threads[$key]);
    160.                     if ($this->callback)
    161.                         call_user_func($this->callback, $this->results[$key]);
    162.                 } else {
    163.                     // Иначе считываем результаты
    164.                     $this->results[$key] .= fgets($value,4096);
    165.                 }
    166.             }
    167.             usleep(500);
    168.            
    169.         // ... пока не выполнятся все потоки    
    170.         } while (sizeof($this->threads) > 0);
    171.         $this->times[1] = microtime(true);
    172.         return $this->results;
    173.     }
    174. }
    175. ?>
    Всегда с удовольствием выслушаю Ваши замечания.
     
  2. Ensiferum

    Ensiferum Активный пользователь

    С нами с:
    11 июл 2010
    Сообщения:
    1.292
    Симпатии:
    0
    Адрес:
    из секты поклонников Нео
    Ах да, пример использования:

    PHP:
    1. <?
    2. function r_callback($in) {
    3.     echo array_pop(explode(PHP_EOL.PHP_EOL,$in));
    4. }
    5.  
    6. $multithreading = new MultiThreading(20,'r_callback');
    7. try {
    8.     $multithreading->setScriptName('groups/task2secure.php');
    9.     $multithreading->setParams($data);
    10.     $multithreading->execute();
    11. } catch (Exception $e) {
    12. print_r($e);
    13. }
    14.  
    15. echo 'Время выполнения:' . $multithreading->getTime() . ' секунд';
    16. ?>
     
  3. <?=RPG?>

    <?=RPG?> Активный пользователь

    С нами с:
    19 ноя 2010
    Сообщения:
    451
    Симпатии:
    0
    Замечание только одно: РНР не создан для многопоточности. Поэтому она там нафиг сдалась, если нужно писать бота 24/7, то лучше для этого выбрать более подходящие инструменты.

    Автор ссылается на то, что ему нужна многопоточность для рассылки писем. Только вот нужно учитывать, что за такой спам любой хостер заблокирует.
     
  4. Ensiferum

    Ensiferum Активный пользователь

    С нами с:
    11 июл 2010
    Сообщения:
    1.292
    Симпатии:
    0
    Адрес:
    из секты поклонников Нео
    Все прекрасно знают про ограничения в этом плане и каждый вы*бывается по своему, чтобы нужную задачу решить. Замечания нужны именно по коду.
     
  5. <?=RPG?>

    <?=RPG?> Активный пользователь

    С нами с:
    19 ноя 2010
    Сообщения:
    451
    Симпатии:
    0
    Единственное что могу сказать — отсутствие sleep в классе настораживает.
     
  6. [vs]

    [vs] Суперстар
    Команда форума Модератор

    С нами с:
    27 сен 2007
    Сообщения:
    10.559
    Симпатии:
    632
    Есть смысл сделать usleep(1000) - на милисекунду.
     
  7. Ensiferum

    Ensiferum Активный пользователь

    С нами с:
    11 июл 2010
    Сообщения:
    1.292
    Симпатии:
    0
    Адрес:
    из секты поклонников Нео
    Поправил код, добавив задержку
     
  8. [vs]

    [vs] Суперстар
    Команда форума Модератор

    С нами с:
    27 сен 2007
    Сообщения:
    10.559
    Симпатии:
    632
    Ensiferum
    Потестировал на скрипте counter:
    PHP:
    1. <?php
    2. for ($i = $_GET['start']; $i < $_GET['limit']; $i++);
    3. echo $i;
    PHP:
    1. <?php
    2. $th = new MultiThreading(10, 'cleanHeaders', '192.168.1.11');
    3. $th->setScriptName('multi/counter.php');
    4. $param = 'start=1&limit=10000000';
    5. $params = array();
    6. for ($i=0; $i<2; $i++) {
    7.     $params[] = $param;
    8. }
    9. $th->setParams($params);
    10. echo array_sum($th->execute());
    Так вот, мультипочное выполнение всегда было в 2 раза дольше однопоточного - хоть до 20 млн. двумя потоками (5 с. против 2.5), хоть до 100 млн. 10 потоков (25 с. против 12).
    Кстати, задержка практически оптимальная - её увеличение на данном тесте не сказалось, а без нее over 30 секунд, хотя если потоков много, то чем реже они будут впустую перебираться, тем лучше.
    Все так интересует задача, с которой многопоточный режим справится быстрее.
     
  9. <?=RPG?>

    <?=RPG?> Активный пользователь

    С нами с:
    19 ноя 2010
    Сообщения:
    451
    Симпатии:
    0
    Очень просто. Эта задача — скачивание какойнить хрени с чужого сайта (разумеется, с целью копипасты на свой сайт). Правда до сих пор не понимаю на кой тогда люди wget придумали.
     
  10. Ensiferum

    Ensiferum Активный пользователь

    С нами с:
    11 июл 2010
    Сообщения:
    1.292
    Симпатии:
    0
    Адрес:
    из секты поклонников Нео
    Моя задача - выполнение действий, связанных с курлом (авторизация и далее).
     
  11. Koc

    Koc Активный пользователь

    С нами с:
    3 мар 2008
    Сообщения:
    2.253
    Симпатии:
    0
    Адрес:
    \Ukraine\Dnepropetrovsk
  12. Ensiferum

    Ensiferum Активный пользователь

    С нами с:
    11 июл 2010
    Сообщения:
    1.292
    Симпатии:
    0
    Адрес:
    из секты поклонников Нео
    Я знаю про мультикурл и для простых запросов использую rolling curl. Но у этой системы один затык: только когда ВСЯ порция отработает, стартует новая порция, а так, дааа, в несколько потоков. А мне нужно несколько действий курлом совершить (сложная авторизация, кукисы, хеши, пост / гет). Раньше эмитировал многопоточность через AJAX так же ссылаясь на обработчик. Но решение выше при том же количестве потоком работает раз в 10 быстрее.