PHP使用Beanstalkd实例详解

有关Beanstalkd的基本概念,编译和yum的安装方法已经在上篇文章《Beanstalkd消息/任务队列的详解》中介绍了,今天练习下PHP使用Beanstalkd的过程,我选择的是使用Pheanstalk类来连接Beanstalkd

1.使用Composer安装Pheanstalk

composer require pda/pheanstalk

2.实现代码

php查看beanstalkd状态脚本Status.php

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: jmsite.cn
  5. * Date: 2019/1/21
  6. * Time: 10:32
  7. */
  8. require "../vendor/autoload.php";
  9. use Pheanstalk\Pheanstalk;
  10. $pheanstalk = new Pheanstalk('192.168.75.135',11300);
  11. print_r($pheanstalk->stats());

生产者代码Producter.php

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: jmsite.cn
  5. * Date: 2019/1/20
  6. * Time: 16:30
  7. */
  8. require "../vendor/autoload.php";
  9. use Pheanstalk\Pheanstalk;
  10. $pheanstalk = new Pheanstalk('192.168.75.135',11300);
  11. for ($i=0;$i<50;$i++){
  12. $data = array(
  13. 'key' => 'testkey'.$i,
  14. 'value' => 'testvalue',
  15. 'time' => time(),
  16. //phpfensi.com
  17. );
  18. $ret = $pheanstalk->putInTube('test-tube', json_encode($data), Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR);
  19. var_dump($ret);
  20. }

消费者代码Consumer.php

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: jmsite.cn
  5. * Date: 2019/1/20
  6. * Time: 16:31
  7. */
  8. set_time_limit(0);
  9. ini_set('default_socket_timeout', 900);
  10. require "../vendor/autoload.php";
  11. use Pheanstalk\Pheanstalk;
  12. $pheanstalk = new Pheanstalk('192.168.75.135',11300);
  13. while (true){
  14. $job = $pheanstalk
  15. ->watch('test-tube')
  16. ->ignore('default')
  17. ->reserve();
  18. if ($job){
  19. sleep(2);
  20. echo $job->getData();
  21. echo "\n";
  22. $pheanstalk->delete($job);
  23. }
  24. }

打开命令行/终端窗口,执行生产者,会向tube写入50条任务

  1. PS E:\repository\work\beanstalk> php .\Producter.php
  2. int(101)
  3. int(102)
  4. int(103)
  5. int(104)
  6. int(105)
  7. int(106)
  8. int(107)
  9. int(108)
  10. int(109)
  11. int(110)
  12. int(111)
  13. int(112)
  14. int(113)
  15. int(114)
  16. ......

由此可见,$pheanstalk->putInTube成功后返回的是job的id

查看状态

  1. PS E:\repository\work\beanstalk> php Status.php
  2. Pheanstalk\Response\ArrayResponse Object
  3. (
  4. [_name:Pheanstalk\Response\ArrayResponse:private] => OK
  5. [storage:ArrayObject:private] => Array
  6. (
  7. [current-jobs-urgent] => 0
  8. [current-jobs-ready] => 50
  9. [current-jobs-reserved] => 0
  10. [current-jobs-delayed] => 0
  11. [current-jobs-buried] => 0
  12. ......

结果中显示处于ready待读取状态的job是50个

打开两个或以上命令行/终端窗口,执行消费者,模拟多消费者竞争

消费者1

  1. PS E:\repository\work\beanstalk> php .\Consumer.php
  2. {"key":"testkey0","value":"testvalue","time":1548039103}
  3. {"key":"testkey1","value":"testvalue","time":1548039103}
  4. {"key":"testkey2","value":"testvalue","time":1548039103}
  5. {"key":"testkey4","value":"testvalue","time":1548039103}
  6. {"key":"testkey6","value":"testvalue","time":1548039103}
  7. {"key":"testkey8","value":"testvalue","time":1548039103}
  8. {"key":"testkey10","value":"testvalue","time":1548039103}
  9. {"key":"testkey12","value":"testvalue","time":1548039103}
  10. {"key":"testkey14","value":"testvalue","time":1548039103}
  11. {"key":"testkey16","value":"testvalue","time":1548039103}
  12. {"key":"testkey18","value":"testvalue","time":1548039103}
  13. {"key":"testkey20","value":"testvalue","time":1548039103}
  14. {"key":"testkey22","value":"testvalue","time":1548039103}
  15. {"key":"testkey24","value":"testvalue","time":1548039103}
  16. {"key":"testkey26","value":"testvalue","time":1548039103}
  17. {"key":"testkey28","value":"testvalue","time":1548039103}
  18. {"key":"testkey30","value":"testvalue","time":1548039103}
  19. {"key":"testkey32","value":"testvalue","time":1548039103}
  20. {"key":"testkey34","value":"testvalue","time":1548039103}
  21. {"key":"testkey36","value":"testvalue","time":1548039103}
  22. {"key":"testkey38","value":"testvalue","time":1548039103}
  23. {"key":"testkey40","value":"testvalue","time":1548039103}
  24. {"key":"testkey42","value":"testvalue","time":1548039103}
  25. {"key":"testkey44","value":"testvalue","time":1548039103}
  26. {"key":"testkey46","value":"testvalue","time":1548039103}
  27. {"key":"testkey48","value":"testvalue","time":1548039103}

消费者2

  1. PS E:\repository\work\beanstalk> php .\Consumer.php
  2. {"key":"testkey3","value":"testvalue","time":1548039103}
  3. {"key":"testkey5","value":"testvalue","time":1548039103}
  4. {"key":"testkey7","value":"testvalue","time":1548039103}
  5. {"key":"testkey9","value":"testvalue","time":1548039103}
  6. {"key":"testkey11","value":"testvalue","time":1548039103}
  7. {"key":"testkey13","value":"testvalue","time":1548039103}
  8. {"key":"testkey15","value":"testvalue","time":1548039103}
  9. {"key":"testkey17","value":"testvalue","time":1548039103}
  10. {"key":"testkey19","value":"testvalue","time":1548039103}
  11. {"key":"testkey21","value":"testvalue","time":1548039103}
  12. {"key":"testkey23","value":"testvalue","time":1548039103}
  13. {"key":"testkey25","value":"testvalue","time":1548039103}
  14. {"key":"testkey27","value":"testvalue","time":1548039103}
  15. {"key":"testkey29","value":"testvalue","time":1548039103}
  16. {"key":"testkey31","value":"testvalue","time":1548039103}
  17. {"key":"testkey33","value":"testvalue","time":1548039103}
  18. {"key":"testkey35","value":"testvalue","time":1548039103}
  19. {"key":"testkey37","value":"testvalue","time":1548039103}
  20. {"key":"testkey39","value":"testvalue","time":1548039103}
  21. {"key":"testkey41","value":"testvalue","time":1548039103}
  22. {"key":"testkey43","value":"testvalue","time":1548039103}
  23. {"key":"testkey45","value":"testvalue","time":1548039103}
  24. {"key":"testkey47","value":"testvalue","time":1548039103}
  25. {"key":"testkey49","value":"testvalue","time":1548039103}

两个消费者竞争着完成了全部任务,由于我的beanstalkd启动时开启了binlog持久,所以beanstalkd重启后任务也不会丢失

3.需要注意的事项

1.创建job时,设置的超时时间Pheanstalk::DEFAULT_TTR一定要比消费者处理一个job的时间要长,否则job在超时之后会被tube更改为ready状态,被其他消费者获取,而此时当前消费者还在处理该job,这就出现了一个job被多个消费者重复执行的可怕现象

2.Pheanstalk的维护者发生了变化,在新版的Pheanstalk中是不支持长连接的,当客户端socket连接服务器时间超过php.ini中设置的default_socket_timeout时,如果未能从服务端tube获得job,连接将会被断开,所以消费者进程需要维护,以便在退出后可以重新开启进程,推荐使用supervisord维护消费者进程。

判断socket超时的代码

  1. public function getLine($length = null)
  2. {
  3. $timeout = ini_get('default_socket_timeout');
  4. $timer = microtime(true);
  5. do {
  6. $data = isset($length) ?
  7. $this->_wrapper()->fgets($this->_socket, $length) :
  8. $this->_wrapper()->fgets($this->_socket);
  9. if ($this->_wrapper()->feof($this->_socket)) {
  10. throw new Exception\SocketException('Socket closed by server!');
  11. }
  12. if (($data === false) && microtime(true) - $timer > $timeout) {
  13. $this->disconnect();
  14. throw new Exception\SocketException('Socket timed out!');
  15. }
  16. } while ($data === false);
  17. return rtrim($data);
  18. }