PHP pthreads v3下同步处理synchronized用法示例

本文实例讲述了PHP pthreads v3下同步处理synchronized用法,分享给大家供大家参考,具体如下:

之所以会用到同步,是因为如果多个线程中对同一个资源进行操作时,会发生混乱。

比如2个线程对变量进行加1操作,第1个线程还没来的及改写数据时,第2个线程就对变量进行操作了,那变量最终的结果就是未知的,这个时候就需要同步来进行控制了。

例子如下:

  1. <?php
  2. class Count extends Thread
  3. {
  4. public $cnt = 0;
  5. public function run()
  6. {
  7. $this->add();
  8. }
  9. public function add()
  10. {
  11. //对成员进行加1操作
  12. for ($i = 0; $i < 100000; $i++) {
  13. ++$this->cnt;
  14. }
  15. }
  16. }
  17. $c = new Count();
  18. //调用start()时,线程run()中就调用了add()方法
  19. $c->start();
  20. //我们人为再调用一次add()方法,这时候就会有两个for循环对$cnt进行操作
  21. $c->add();
  22. //把创建的线程加入主线程中,让主线程等待子线程运行结束
  23. $c->join();
  24. //这里输出就是不确定性的
  25. var_dump($c->cnt);

多次运行后,$cnt的值是不确定的,如下图所示:

PHP pthreads v3下同步处理synchronized用法示例

在pthreads v2中我们可以用Mutex,不过在v3版本中被删除了,所以我们可以简单的把加1操作放到synchronized中进行同步,代码如下:

  1. <?php
  2. class Count extends Thread
  3. {
  4. public $cnt = 0;
  5. public function run()
  6. {
  7. $this->add();
  8. }
  9. public function add()
  10. {
  11. $this->synchronized(function () {
  12. //对成员进行加1操作
  13. for ($i = 0; $i < 100000; $i++) {
  14. ++$this->cnt;
  15. }
  16. });
  17. }
  18. }
  19. $c = new Count();
  20. //调用start()时,线程run()中就调用了add()方法
  21. $c->start();
  22. //我们人为再调用一次add()方法,这时候就会有两个for循环对$cnt进行操作
  23. $c->add();
  24. //把创建的线程加入主线程中,让主线程等待子线程运行结束
  25. $c->join();
  26. //这里就会一直输出200000
  27. var_dump($c->cnt);

结果如下所示:

PHP pthreads v3下同步处理synchronized用法示例

当然我们也可以通过notify()和wait()进行同步控制,代码如下:

  1. <?php
  2. class Task extends Thread
  3. {
  4. public $flag = 1;
  5. public function run()
  6. {
  7. $this->synchronized(function () {
  8. //标识不为1就一直等待
  9. if ($this->flag !== 1) {
  10. $this->wait();
  11. }
  12. for ($i = 1; $i <= 10; $i++) {
  13. echo "flag : {$this->flag} i : {$i} \n";
  14. if ($this->flag === 1) {
  15. //设置标识
  16. $this->flag = 2;
  17. //发送唤醒通知,然后让当前线程等待
  18. //注意,notify()与wait()顺序不要搞错了,不然会一直阻塞
  19. $this->notify();
  20. $this->wait();
  21. }
  22. }
  23. //我们在这里再次调用notify()
  24. //因为在最后一次输出flag : 2 i : 20时,当前线程的i已经变成11了,跳出了for循环,
  25. //但另一个线程则一直阻塞在wait()那里,程序无法结束,所以需要notify()再次唤醒一次
  26. $this->notify();
  27. });
  28. }
  29. }
  30. $t = new Task();
  31. $t->start();
  32. $t->synchronized(function ($obj) {
  33. //标识不为2就一直等待
  34. if ($obj->flag !== 2) {
  35. $obj->wait();
  36. }
  37. for ($i = 11; $i <= 20; $i++) {
  38. echo "flag : {$obj->flag} i : {$i} \n";
  39. if ($obj->flag === 2) {
  40. $obj->flag = 1;
  41. $obj->notify();
  42. $obj->wait();
  43. }
  44. }
  45. }, $t);
  46. //把创建的线程加入主线程中,让主线程等待子线程运行结束
  47. $t->join();

结果如下图所示:

PHP pthreads v3下同步处理synchronized用法示例

我们通过notify()和wait()控制了两个for循环,来回的输出变量i的值,保证了顺序性。

我们再来看一个复杂点的例子,共享的资源,如果不进行同步操作,会出现不可预知的情况,代码如下:

  1. <?php
  2. class Task extends Thread
  3. {
  4. private $name;
  5. private $file;
  6. public function __construct($name, $file)
  7. {
  8. $this->name = $name;
  9. $this->file = $file;
  10. }
  11. public function run()
  12. {
  13. $data = file_get_contents($this->file);
  14. $data = floatval($data);
  15. for ($i = 0; $i < 100000; $i++) {
  16. ++$data;
  17. }
  18. file_put_contents($this->file, $data);
  19. echo "task : {$this->name} data : {$data} \n";
  20. }
  21. }
  22. $tasks = [];
  23. $file = './test.log';
  24. for ($i = 0; $i < 100; $i++) {
  25. $tasks[$i] = new Task($i, $file);
  26. $tasks[$i]->start();
  27. }
  28. for ($i = 0; $i < 100; $i++) {
  29. $tasks[$i]->join();
  30. }

我们开100个线程对文件test.log进行读写,理想状态下,test.log中的数据应该是每次增加10000000的。现在的电脑配置都比较好,大家可以多运行几次就可以看出效果。

PHP pthreads v3下同步处理synchronized用法示例

很明显最后的数据好像少了200000,多线程下对test.log文件进行读写,而我们又没有加锁,显然是会出现数据混乱的。

现在我们修改一下代码,如下:

  1. <?php
  2. class File extends Thread
  3. {
  4. private $file;
  5. public function __construct($file)
  6. {
  7. $this->file = $file;
  8. }
  9. public function inc()
  10. {
  11. //进行同步控制,当100个task线程调用inc方法时,synchronized可以保证块内的代码是同步的
  12. //注意,注意,不要把inc方法写到Task里,那样是没效果的,因为每个task线程都是独立空间,他们各自调各自的inc方法,是没法做到同步的
  13. //常用的做法是我们要同步哪些资源,就为这些资源写个Thread类,并提供操作这些资源的方法,并在方法里加上synchronized
  14. return $this->synchronized(function () {
  15. $data = file_get_contents($this->file);
  16. $data = floatval($data);
  17. for ($i = 0; $i < 100000; $i++) {
  18. ++$data;
  19. }
  20. file_put_contents($this->file, $data);
  21. return $data;
  22. });
  23. }
  24. }
  25. class Task extends Thread
  26. {
  27. private $name;
  28. private $file;
  29. public function __construct($name, $file)
  30. {
  31. $this->name = $name;
  32. $this->file = $file;
  33. }
  34. public function run()
  35. {
  36. $data = $this->file->inc();
  37. echo "task : {$this->name} data : {$data} \n";
  38. }
  39. }
  40. $tasks = [];
  41. $file = new File('./test.log');
  42. for ($i = 0; $i < 100; $i++) {
  43. $tasks[$i] = new Task($i, $file);
  44. $tasks[$i]->start();
  45. }
  46. for ($i = 0; $i < 100; $i++) {
  47. $tasks[$i]->join();
  48. }

结果如下图所示,当然为了保险起见,我们可以试着多运行几次,下面是我运行了25次的结果:

PHP pthreads v3下同步处理synchronized用法示例