队列,很简单的一个东西,但往往就是有那么多的麻烦。
比如PHP发送邮件的时候,如果在用户注册,你是注册的时候发送邮件呢,还是注册成功之后发送呢,很显然,大多数时候都是在注册完成之后发送邮件,除非特殊情况,但是怎么让注册之后直接返回结果而不管是否发送了邮件呢。
这里就需要这样一个东西,单独处理一个队列,一般情况有两种方式来实现,定时执行网页,还有就是使用PHP的cli模式。
首先讨论队列的实现。使用数据库,这点很重要。比如这里我建了这样一个表。
CREATE TABLE IF NOT EXISTS `pitus`.`queue` (
`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '队列唯一ID' ,
`time` DATETIME NOT NULL COMMENT '队列创建时间' ,
`up_time` DATETIME NOT NULL COMMENT '队列更新时间' ,
`status` INT NOT NULL DEFAULT 0 COMMENT '该队列的状态' ,
`callback` LONGTEXT NOT NULL COMMENT '队列的回调函数' ,
`param` LONGTEXT NOT NULL COMMENT '回调函数接受的参数' ,
`library` LONGTEXT NOT NULL COMMENT '回调函数需要的类库' ,
`message` VARCHAR(1024) NULL COMMENT '队列执行信息' ,
PRIMARY KEY (`id`) )
ENGINE = InnoDB
`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '队列唯一ID' ,
`time` DATETIME NOT NULL COMMENT '队列创建时间' ,
`up_time` DATETIME NOT NULL COMMENT '队列更新时间' ,
`status` INT NOT NULL DEFAULT 0 COMMENT '该队列的状态' ,
`callback` LONGTEXT NOT NULL COMMENT '队列的回调函数' ,
`param` LONGTEXT NOT NULL COMMENT '回调函数接受的参数' ,
`library` LONGTEXT NOT NULL COMMENT '回调函数需要的类库' ,
`message` VARCHAR(1024) NULL COMMENT '队列执行信息' ,
PRIMARY KEY (`id`) )
ENGINE = InnoDB
队列如何来处理呢,简单的可以分为几个方法,还是直接贴代码比较现实,分为一个队列的处理类,和一个接口。每个回调对象必须实现此接口,否则无法调用。
/**
* 队列处理
* Class Queue
* @package ULib
*/
class Queue{
/**
* 开始执行队列
*/
public function run(){
if(substr(php_sapi_name(), 0, 3) === "cli"){
//命令行模式循环等待执行
while(true){
$this->run_list($this->get_list());
sleep(5);
}
} else{
$this->run_list($this->get_list());
}
}
/**
* 执行指定的队列列表
* @param array $list
*/
public function run_list($list){
for($i = 0, $l = count($list); $i < $l; $i++){
$v = $list[$i];
$status = intval($v['status']);
$message = NULL;
try{
//执行回调函数,如果没有返回异常则为成功执行
$this->exec($v['callback'], $v['param'], $v['library']);
$status = 1;
} catch(\Exception $ex){
--$status;
$message = $ex->getMessage();
echo "ERROR:", $message;
}
//更新队列信息
db()->update("queue", [
'status' => $status,
'up_time' => date("Y-m-d H:i:s"),
'message' => $message
], ['id' => $v['id']]);
$list[$i] = NULL;
}
}
/**
* 获取队列
* @return array
*/
private function get_list(){
return db()->select("queue", [
'id',
'callback',
'param',
'library',
'status'
], [
'status[<]' => 1,
'ORDER' => 'up_time DESC'
]);
}
/**
* @param QueueCallback $call 回调类
* @param mixed $param 参数
* @param string $lib Lib名称
*/
public function add($call, $param, $lib){
$time = date("Y-m-d H:i:s");
//新将对应的数据序列化后存储到数据库中
if(db()->insert("queue", [
'time' => $time,
'up_time' => $time,
'callback' => serialize($call),
'param' => serialize($param),
'library' => serialize($lib)
]) < 0
){
//添加错误记录
$this->record_error("Add queue error on sql." . debug("SQL error:" . implode(",", db()->error()['write'])));
}
}
/**
* 记录错误信息
* @param $err
*/
private function record_error($err){
}
/**
* 执行回调
* @param string $callback
* @param string $param
* @param string $library
* @throws \Exception
*/
public function exec($callback, $param, $library){
$lib = @unserialize($library);
//首先加载反序列化所需的类库
if(isset($lib['lib']) && is_array($lib['lib'])){
call_user_func_array([
lib(),
'load'
], $lib['lib']);
}
if(isset($lib['c_lib']) && is_array($lib['c_lib'])){
call_user_func_array([
c_lib(),
'load'
], $lib['c_lib']);
}
/**
* 对回调函数反序列化
* @var QueueCallback $call
*/
$call = @unserialize($callback);
if(!is_object($call)){
//初步判断是否为对象
throw new \Exception("unserialize error");
}
$ref = new \ReflectionClass($call);
if(!in_array("ULib\\QueueCallback", $ref->getInterfaceNames())){
//检测是否为正确的实现了接口
throw new \Exception("callback class error.");
}
//最后执行,并使用对应的参数
@$call->run(@unserialize($param));
}
}
/**
* 队列的接口
* Interface QueueCallback
* @package ULib
*/
interface QueueCallback{
/**
* 执行回调函数
* @param $param
* @return mixed
*/
public function run($param);
}
* 队列处理
* Class Queue
* @package ULib
*/
class Queue{
/**
* 开始执行队列
*/
public function run(){
if(substr(php_sapi_name(), 0, 3) === "cli"){
//命令行模式循环等待执行
while(true){
$this->run_list($this->get_list());
sleep(5);
}
} else{
$this->run_list($this->get_list());
}
}
/**
* 执行指定的队列列表
* @param array $list
*/
public function run_list($list){
for($i = 0, $l = count($list); $i < $l; $i++){
$v = $list[$i];
$status = intval($v['status']);
$message = NULL;
try{
//执行回调函数,如果没有返回异常则为成功执行
$this->exec($v['callback'], $v['param'], $v['library']);
$status = 1;
} catch(\Exception $ex){
--$status;
$message = $ex->getMessage();
echo "ERROR:", $message;
}
//更新队列信息
db()->update("queue", [
'status' => $status,
'up_time' => date("Y-m-d H:i:s"),
'message' => $message
], ['id' => $v['id']]);
$list[$i] = NULL;
}
}
/**
* 获取队列
* @return array
*/
private function get_list(){
return db()->select("queue", [
'id',
'callback',
'param',
'library',
'status'
], [
'status[<]' => 1,
'ORDER' => 'up_time DESC'
]);
}
/**
* @param QueueCallback $call 回调类
* @param mixed $param 参数
* @param string $lib Lib名称
*/
public function add($call, $param, $lib){
$time = date("Y-m-d H:i:s");
//新将对应的数据序列化后存储到数据库中
if(db()->insert("queue", [
'time' => $time,
'up_time' => $time,
'callback' => serialize($call),
'param' => serialize($param),
'library' => serialize($lib)
]) < 0
){
//添加错误记录
$this->record_error("Add queue error on sql." . debug("SQL error:" . implode(",", db()->error()['write'])));
}
}
/**
* 记录错误信息
* @param $err
*/
private function record_error($err){
}
/**
* 执行回调
* @param string $callback
* @param string $param
* @param string $library
* @throws \Exception
*/
public function exec($callback, $param, $library){
$lib = @unserialize($library);
//首先加载反序列化所需的类库
if(isset($lib['lib']) && is_array($lib['lib'])){
call_user_func_array([
lib(),
'load'
], $lib['lib']);
}
if(isset($lib['c_lib']) && is_array($lib['c_lib'])){
call_user_func_array([
c_lib(),
'load'
], $lib['c_lib']);
}
/**
* 对回调函数反序列化
* @var QueueCallback $call
*/
$call = @unserialize($callback);
if(!is_object($call)){
//初步判断是否为对象
throw new \Exception("unserialize error");
}
$ref = new \ReflectionClass($call);
if(!in_array("ULib\\QueueCallback", $ref->getInterfaceNames())){
//检测是否为正确的实现了接口
throw new \Exception("callback class error.");
}
//最后执行,并使用对应的参数
@$call->run(@unserialize($param));
}
}
/**
* 队列的接口
* Interface QueueCallback
* @package ULib
*/
interface QueueCallback{
/**
* 执行回调函数
* @param $param
* @return mixed
*/
public function run($param);
}
这样使用回调对象的一个好处就是可以使队列处理的内容扩大,而不仅仅限于邮件的处理,还比如一些其他耗时的操作,当然这里也可以更改为多线程处理队列,如果你需要的话。
最后就是如何实现队列的处理,必须有一个前提就是,同一个队列不能同时有多个线程去处理。这里需要用到一个其他的东西,文件锁,这个实现起来相对容易,而已跨平台性好。如果使用信号那么windows下就不行,代码如下。某些时候一个比较特殊的操作可能也遇得到吧。
<?php
$lock_file = __DIR__ . "/config/queue.lock";
$fp = fopen($lock_file, 'w');//写模式打开,文件不存在直接创建
if(!flock($fp, LOCK_EX | LOCK_NB)){
//如果当前文件无法锁定,表示被其他进程锁定,所以结束执行
//LOCK_EX为独享锁,LOCK_NB为非阻塞
fclose($fp);
die("Queue must be a single run.\n");
} else{
echo "LOCK\n";
}
set_time_limit(0);
require_once("sys/config.php");
cfg()->load('config/all.php'); //加载其他配置文件
lib()->load('Queue', 'Hook');
$hook = new \ULib\Hook();
if(db()->status()){
$queue = new \ULib\Queue();
$queue->run();
} else{
echo("Cannot connect to the database.");
}
flock($fp, LOCK_UN);
fclose($fp);
?>
$lock_file = __DIR__ . "/config/queue.lock";
$fp = fopen($lock_file, 'w');//写模式打开,文件不存在直接创建
if(!flock($fp, LOCK_EX | LOCK_NB)){
//如果当前文件无法锁定,表示被其他进程锁定,所以结束执行
//LOCK_EX为独享锁,LOCK_NB为非阻塞
fclose($fp);
die("Queue must be a single run.\n");
} else{
echo "LOCK\n";
}
set_time_limit(0);
require_once("sys/config.php");
cfg()->load('config/all.php'); //加载其他配置文件
lib()->load('Queue', 'Hook');
$hook = new \ULib\Hook();
if(db()->status()){
$queue = new \ULib\Queue();
$queue->run();
} else{
echo("Cannot connect to the database.");
}
flock($fp, LOCK_UN);
fclose($fp);
?>
拜读。
用数据库不太好吧
一年前技术能力有限,当时只能想到这
一直想学,但翻了几页书 又放弃了。今天拜读了
怎么说呢,这种方式的实现也就只是能用
围观了!!
额。围观,好吧。
学习了以后总会用得上。
是的,好好学。
唔,都没看懂,新手一枚。
没关系,慢慢学吧
看不懂,感觉不会再爱了
难道我写的很复杂么?
这是要成为高手的节奏啊。
或许有那么一天吧。高手还是有点距离的
看来都是写代码高手啊,我真心不会,继续慢慢学
慢慢学吧,我这水平也就一般
居然没有使用过。
并不是说有的系统都会用到这个,我之前也没用过。
不会php
,不会看看就好。