



例如 apache, nginx, mysql 都是守护进程


很多程序以服务形式存在,他没有终端或UI交互,它可能采用其他方式与其他程序交互,如TCP/UDP Socket, UNIX Socket, fifo。程序一旦启动便进入后台,直到满足条件他便开始处理任务。


以我当前的需求为例,我需要运行一个程序,然后监听某端口,持续接受服务端发起的数据,然后对数据分析处理,再将结果写入到数据库中; 我采用ZeroMQ实现数据收发。






例 1. 守护进程例示

  1. <?php
  2. class ExampleWorker extends Worker {
  3. #public function __construct(Logging $logger) {
  4. # $this->logger = $logger;
  5. #}
  6. #protected $logger;
  7. protected static $dbh;
  8. public function __construct() {
  9. }
  10. public function run(){
  11. $dbhost = ''; // 数据库服务器
  12. $dbport = 3306;
  13. $dbuser = 'www'; // 数据库用户名
  14. $dbpass = 'qwer123'; // 数据库密码
  15. $dbname = 'example'; // 数据库名
  16. self::$dbh = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array(
  19. PDO::ATTR_PERSISTENT => true
  20. )
  21. );
  22. }
  23. protected function getInstance(){
  24. return self::$dbh;
  25. }
  26. }
  27. /* the collectable class implements machinery for Pool::collect */
  28. class Fee extends Stackable {
  29. public function __construct($msg) {
  30. $trades = explode(",", $msg);
  31. $this->data = $trades;
  32. print_r($trades);
  33. }
  34. public function run() {
  35. #$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );
  36. try {
  37. $dbh = $this->worker->getInstance();
  38. $insert = "INSERT INTO fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')";
  39. $sth = $dbh->prepare($insert);
  40. $sth->bindValue(':ticket', $this->data[0]);
  41. $sth->bindValue(':login', $this->data[1]);
  42. $sth->bindValue(':volume', $this->data[2]);
  43. $sth->execute();
  44. $sth = null;
  45. /* ...... */
  46. $update = "UPDATE fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'";
  47. $sth = $dbh->prepare($update);
  48. $sth->bindValue(':ticket', $this->data[0]);
  49. $sth->execute();
  50. //echo $sth->queryString;
  51. //$dbh = null;
  52. }
  53. catch(PDOException $e) {
  54. $error = sprintf("%s,%s\n", $mobile, $id );
  55. file_put_contents("mobile_error.log", $error, FILE_APPEND);
  56. }
  57. }
  58. }
  59. class Example {
  60. /* config */
  61. const LISTEN = "tcp://";
  62. const MAXCONN = 100;
  63. const pidfile = __CLASS__;
  64. const uid = 80;
  65. const gid = 80;
  66. protected $pool = NULL;
  67. protected $zmq = NULL;
  68. public function __construct() {
  69. $this->pidfile = '/var/run/'.self::pidfile.'.pid';
  70. }
  71. private function daemon(){
  72. if (file_exists($this->pidfile)) {
  73. echo "The file $this->pidfile exists.\n";
  74. exit();
  75. }
  76. $pid = pcntl_fork();
  77. if ($pid == -1) {
  78. die('could not fork');
  79. } else if ($pid) {
  80. // we are the parent
  81. //pcntl_wait($status); //Protect against Zombie children
  82. exit($pid);
  83. } else {
  84. // we are the child
  85. file_put_contents($this->pidfile, getmypid());
  86. posix_setuid(self::uid);
  87. posix_setgid(self::gid);
  88. return(getmypid());
  89. }
  90. }
  91. private function start(){
  92. $pid = $this->daemon();
  93. $this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);
  94. $this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
  95. $this->zmq->bind(self::LISTEN);
  96. /* Loop receiving and echoing back */
  97. while ($message = $this->zmq->recv()) {
  98. //print_r($message);
  99. //if($trades){
  100. $this->pool->submit(new Fee($message));
  101. $this->zmq->send('TRUE');
  102. //}else{
  103. // $this->zmq->send('FALSE');
  104. //}
  105. }
  106. $pool->shutdown();
  107. }
  108. private function stop(){
  109. if (file_exists($this->pidfile)) {
  110. $pid = file_get_contents($this->pidfile);
  111. posix_kill($pid, 9);
  112. unlink($this->pidfile);
  113. }
  114. }
  115. private function help($proc){
  116. printf("%s start | stop | help \n", $proc);
  117. }
  118. public function main($argv){
  119. if(count($argv) < 2){
  120. printf("please input help parameter\n");
  121. exit();
  122. }
  123. if($argv[1] === 'stop'){
  124. $this->stop();
  125. }else if($argv[1] === 'start'){
  126. $this->start();
  127. }else{
  128. $this->help($argv[0]);
  129. } //
  130. }
  131. }
  132. $cgse = new Example();
  133. $cgse->main($argv);

5.1. 程序启动



  1. private function daemon(){
  2. if (file_exists($this->pidfile)) {
  3. echo "The file $this->pidfile exists.\n";
  4. exit();
  5. }
  6. $pid = pcntl_fork();
  7. if ($pid == -1) {
  8. die('could not fork');
  9. } else if ($pid) {
  10. // we are the parent
  11. //pcntl_wait($status); //Protect against Zombie children
  12. exit($pid);
  13. } else {
  14. // we are the child
  15. file_put_contents($this->pidfile, getmypid());
  16. posix_setuid(self::uid);
  17. posix_setgid(self::gid);
  18. return(getmypid());
  19. }
  20. }


5.2. 程序停止

程序停止,只需读取pid文件,然后调用posix_kill($pid, 9); 最后将该文件删除。

  1. private function stop(){
  2. if (file_exists($this->pidfile)) {
  3. $pid = file_get_contents($this->pidfile);
  4. posix_kill($pid, 9);
  5. unlink($this->pidfile);
  6. }
  7. }