略微加速

PHP官方手册 - 互联网笔记

PHP - Manual: Pool

2025-01-20

Pool 类

(PECL pthreads >= 2.0.0)

简介

Pool 对象是多个 Worker 对象的容器,同时也是它们的控制器。

线程池是对 Worker 功能的高层抽象,包括按照 pthreads 需要的方式来管理应用的功能。

类摘要

class Pool {
/* 属性 */
protected $size;
protected $class;
protected $workers;
protected $ctor;
protected $last;
/* 方法 */
public collect(Callable $collector = ?): int
public __construct(int $size, string $class = ?, array $ctor = ?): Pool
publicresize(int $size): void
publicshutdown(): void
public submit(Threaded $task): int
public submitTo(int $worker, Threaded $task): int
}

属性

size

Pool 对象可容纳的 Worker 对象的最大数量

class

Worker 的类

workers

指向 Worker 对象的引用

ctor

构造新的 Worker 对象时所需的参数

last

最后使用的 Worker 对象在池中的位置偏移量

目录

add a noteadd a note

User Contributed Notes 4 notes

up
3
meadowsjared at gmail dot com
1 year ago
In this example, it shows how to use a pool to get an array of results, using pThreads v3.2.1 and php 7.3.23

<?php
class TestWork extends Threaded {
//updated version that works with pThreads v3.2.1 and php 7.3.23
   
protected $complete;
   
//$pData is the data sent to your worker thread to do it's job.
   
public function __construct($pData) {
       
//transfer all the variables to local variables
       
$this->complete = false;
       
$this->testData = $pData;
    }
   
//This is where all of your work will be done.
   
public function run() {
       
usleep(2000000); //sleep 2 seconds to simulate a large job
       
$this->complete = true;
    }
    public function
isDone() {
        return
$this->complete;
    }
}
class
ExamplePool extends Pool {
    public
$data = array(); // used to return data after we're done
   
private $numTasks = 0; // counter used to know when we're done
    /**
     * override the submit function from the parent
     * to keep track of our jobs
     */
   
public function submit(Threaded $task) {
       
$this->numTasks++;
       
parent::submit($task);
    }
   
/**
     * used to wait until all workers are done
     */
   
public function process() {
       
// Run this loop as long as we have
        // jobs in the pool
       
while (count($this->data) < $this->numTasks) {
           
$this->collect(function (TestWork $task) {
               
// If a task was marked as done, collect its results
               
if ($task->isDone()) {
                   
$tmpObj = new stdclass();
                   
$tmpObj->complete = $task->complete;
                   
//this is how you get your completed data back out [accessed by $pool->process()]
                   
$this->data[] = $tmpObj;
                }
                return
$task->isDone();
            });
        }
       
// All jobs are done
        // we can shutdown the pool
       
$this->shutdown();
        return
$this->data;
    }
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for(
$i=0;$i<5;$i++) {
   
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //get all of the results
echo '<pre>';
print_r($retArr); //return the array of results (and maybe errors)
echo '</pre>';
?>
up
5
meadowsjared at gmail dot com
6 years ago
Please note, when using the collect function, it's important that you extend the pool class so you can keep checking for finished threads until they're all done.

<?php
class TestWork extends Threaded {
    protected
$complete;
   
//$pData is the data sent to your worker thread to do it's job.
   
public function __construct($pData){
       
//transfer all the variables to local variables
       
$this->complete = false;
       
$this->testData = $pData;
    }
   
//This is where all of your work will be done.
   
public function run(){
       
usleep(2000000); //sleep 2 seconds to simulate a large job
       
$this->complete = true;
    }
    public function
isGarbage() {
        return
$this->complete;
    }
}
class
ExamplePool extends Pool
{
    public
$data = array();
    public function
process()
    {
       
// Run this loop as long as we have
        // jobs in the pool
       
while (count($this->work)) {
           
$this->collect(function (TestWork $task) {
               
// If a task was marked as done
                // collect its results
               
if ($task->isGarbage()) {
                   
$tmpObj = new stdclass();
                   
$tmpObj->complete = $task->complete;
                   
//this is how you get your completed data back out [accessed by $pool->process()]
                   
$this->data[] = $tmpObj;
                }
                return
$task->isGarbage();
            });
        }
       
// All jobs are done
        // we can shutdown the pool
       
$this->shutdown();
        return
$this->data;
    }
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for(
$i=0;$i<5;$i++) {
   
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //get all of the results
echo '<pre>';
print_r($retArr); //return the array of results (and maybe errors)
echo '</pre>';
?>
up
2
olavk
7 years ago
Simple example with Collectable (basically Thread meant for Pool) and Pool

<?php

class job extends Collectable {
  public
$val;

  public function
__construct($val){
   
// init some properties
   
$this->val = $val;
  }
  public function
run(){
   
// do some work
   
$this->val = $this->val . file_get_contents('http://www.example.com/', null, null, 3, 20);
   
$this->setGarbage();
  }
}

// At most 3 threads will work at once
$p = new Pool(3);

$tasks = array(
  new
job('0'),
  new
job('1'),
  new
job('2'),
  new
job('3'),
  new
job('4'),
  new
job('5'),
  new
job('6'),
  new
job('7'),
  new
job('8'),
  new
job('9'),
  new
job('10'),
);
// Add tasks to pool queue
foreach ($tasks as $task) {
 
$p->submit($task);
}

// shutdown will wait for current queue to be completed
$p->shutdown();
// garbage collection check / read results
$p->collect(function($checkingTask){
  echo
$checkingTask->val;
  return
$checkingTask->isGarbage();
});

?>
up
-5
fajan
8 years ago
Example class to demonstrate usage of Pool/Worker mechanism, also to show a few tricks & hints ;)
<?php
class Config extends Threaded{    // shared global object
   
protected $val=0, $val2=0;
    protected function
inc(){++$this->val;}    // protected synchronizes by-object
   
public function inc2(){++$this->val2;}    // no synchronization
}
class
WorkerClass extends Worker{
    protected static
$worker_id_next = -1;
    protected
$worker_id;
    protected
$config;
    public function
__construct($config){
       
$this->worker_id = ++static::$worker_id_next;    // static members are not avalable in thread but are in 'main thread'
       
$this->config = $config;
    }
    public function
run(){
        global
$config;
       
$config = $this->config;    // NOTE: setting by reference WON'T work
       
global $worker_id;
       
$worker_id = $this->worker_id;
        echo
"working context {$worker_id} is created!\n";
       
//$this->say_config();    // globally synchronized function.
   
}
    protected function
say_config(){    // 'protected' is synchronized by-object so WON'T work between multiple instances
       
global $config;        // you can use the shared $config object as synchronization source.
       
$config->synchronized(function() use (&$config){    // NOTE: you can use Closures here, but if you attach a Closure to a Threaded object it will be destroyed as can't be serialized
           
var_dump($config);
        });
    }
}
class
Task extends Stackable{    // Stackable still exists, it's just somehow dissappeared from docs (probably by mistake). See older version's docs for more details.
   
protected $set;
    public function
__construct($set){
       
$this->set = $set;
    }
    public function
run(){
        global
$worker_id;
        echo
"task is running in {$worker_id}!\n";
       
usleep(mt_rand(1,100)*100);
       
$config = $this->getConfig();
       
$val = $config->arr->shift();
       
$config->arr[] = $this->set;
        for (
$i = 0 ; $i < 1000; ++$i){
           
$config->inc();
           
$config->inc2();
        }
    }
    public function
getConfig(){
        global
$config;    // WorkerClass set this on thread's scope, can be reused by Tasks for additional asynch data source. (ie: connection pool or taskqueue to demultiplexer)
       
return $config;
    }
}

$config = new Config;
$config->arr = new \Threaded();
$config->arr->merge(array(1,2,3,4,5,6));
class
PoolClass extends Pool{
    public function
worker_list(){
        if (
$this->workers !== null)
            return
array_keys($this->workers);
        return
null;
    }
}
$pool = new PoolClass(3, 'WorkerClass', [$config] );
$pool->worker_list();
//$pool->submitTo(0,new Task(-10));    // submitTo DOES NOT try to create worker

$spammed_id = -1;
for (
$i = 1; $i <= 100; ++$i){    // add some jobs
   
if ($spammed_id == -1 && ($x = $pool->worker_list())!= null && @$x[2]){
       
$spammed_id = $x[2];
        echo
"spamming worker {$spammed_id} with lots of tasks from now on\n";
    }
    if (
$spammed_id != -1 && ($i % 5) == 0)    // every 5th job is routed to one worker, so it has 20% of the total jobs (with 3 workers it should do ~33%, not it has (33+20)%, so only delegate to worker if you plan to do balancing as well... )
       
$pool->submitTo($spammed_id,new Task(10*$i));   
    else
       
$pool->submit(new Task(10*$i));
}
$pool->shutdown();
var_dump($config); // "val" is exactly 100000, "val2" is probably a bit less
// also: if you disable the spammer, you'll that the order of the "arr" is random.
?>

官方地址:https://www.php.net/manual/en/class.pool.php

北京半月雨文化科技有限公司.版权所有 京ICP备12026184号-3