详解Ecstore计划任务的代码逻辑
Ecos的框架是建立在linux的基础上的,在windows上是会缺胳膊少腿的,其中计划任务用的是linux的crontab,之前网上见过不少人碰到ecstore的计划任务执行报错的案例,也给出了一定的解决方法,这其中有些是误打误撞。tiandi认为,一段代码没有执行或者报错,必须得从它的源头查起。
让我们来看看ecos的计划任务到底是怎么运行的:
1. 执行/app/system/controller/admin/crontab.php中的exec方法,执行base_contab_schedule::trigger_once($cron[‘id’])。
function exec($cron_id) {
$this->begin('index.php?app=system&ctl=admin_crontab&act=index');
$model = app::get('base')->model('crontab');
$cron = $model->getRow('id', array('id'=>$cron_id));
if(!$cron || (base_crontab_schedule::trigger_one($cron['id'])===false)) {
$this->end(false, '执行失败');
}
$this->end(true, '执行成功');
}
2. /base/lib/crontab/schedule.php的trigger_once方法,$cron_id为任务类名。执行system_queue::instance()->publish(‘crontab:’.$worker, $worker)后,会打印日志在/data/crontab/下,并且将crontab表该任务的last字段做时间更新。
static public function trigger_one($cron_id){
if ($cron = app::get('base')->model('crontab')->getRow('id, last', array('id' => $cron_id, 'enabled => true'))){
$now = time();
if (($now - $cron['last'])<60) {
trigger_error(app::get('base')->_('1分钟之内不能重复执行'), E_USER_ERROR);
}
//add_task
$worker = $cron['id'];
system_queue::instance()->publish('crontab:'.$worker, $worker);
self::__log($cron_id, $now, 'add queue ok');
app::get('base')->model('crontab')->update(array('last'=>$now), array('id' => $cron_id));
}
}
3. /app/system/lib/queue.php中的publish方法,$queues的值为array(‘slow’),array(‘quick’),array(‘nor mal’)中的一个,然后请求控制器的publish。
public function publish($exchange_name, $worker, $params=array(), $routing_key=null){
$queues = $this->__get_publish_queues($exchange_name);
foreach($queues as $queue_name){
$queue_data = array(
'queue_name' => $queue_name,
'worker' => $worker,
'params' => $params);
$this->get_controller()->publish($queue_name, $queue_data);
}
return true;
}
4. /app/system/lib/queue/adapter/mysql.php的publish方法,向system_queue_mysql表插入一条记录,默认该记录own_thread_id为-1,在dbschema表中定义。
public function publish($queue_name,$queue_data){
$time = time();
$data = array('queue_name' => $queue_data['queue_name'],
'worker' => $queue_data['worker'],
'params' => serialize((array)$queue_data['params']),
'create_time' => $time);
return $this->__model->insert($data);
}
5. 到此点击时间处理完毕,并不会真正的执行计划任务。执行还是靠的linux的cron发起的php请求。linux下设置了两条cron,分别是
* * * * * /srv/www/www.suanjuzi.com/script/queue/queue.sh /opt/php/bin/php >/dev
* * * * * /opt/php/bin/php /srv/www/www.suanjuzi.com/script/crontab/crontab.php
6. 先看上面的第2条/script/crontab/crontab.php,实际上这个文件就主要做了一件事,调用base_crontab_schedule::trigger_all(),和后台点执行调用的trigger_one的代码逻辑差不多,只是这里是触发了所有的该执行的ecstore后台计划任务。
7. 回过来看上面第1条命令,script/queue/queue.sh是执行命令,后面是参数。
打开shell文件,可以看到queuelist是从/config/queuelist.php这个默认定义的文件返回,返回的值为slow,quick,normal,然后通过checkprocess查看当前linux进程是否有存在/script/queue.php slow[quick][normal],在的话显示active,不在的话,请求该php文件。之前计划任务不跑的原因就在这里,作为我们后面加的计划任务,都是属于normal,结果这里进程一直存在,所以就不会再继续再执行normal的计划任务,只会执行原先默认的quick和slow的计划任务。
8. /script/queue.php执行了下面代码
$queue_name = $argv[1];
$queues = system_queue::instance()->get_config('queues');
if ($num = (int)$queues[$queue_name]['thread']) {
system_queue_consumer::instance('proc')->exec($queue_name, $num);
}
9. /app/system/lib/queue/consumer/proc.php中的exec方法,当前线程小于可允许最大值(/config/queuelist.php里定义)时,并且queue_mysql表的own_thread_id为-1时,通过new system_queue_consumer_proc_thread来创建新的线程,
while ($this->threadRunning < $max && !system_queue::instance()->is_end($queue_name)) {
$this->running[] = new system_queue_consumer_proc_thread($queue_name,$phpExec);
usleep(200000);
$this->threadRunning++;
}
10. /app/system/lib/queue/consumer/proc/thread.php,通过proc_open建立新线程,超过2次报错,但是在出问题normal卡住的时候这边也没报错,所以应该跟这个throw无关。
while (($this->resource = proc_open($executable." ".$script." ".$queue_name, $descriptorspec, $this->pipes, NULL, $_ENV))===null) {
$i++;
if ($i>2) {
throw new Exception(' cannot create new proccess for consume queue.', 30001);
}
}
11.proc_open的第一个参数是命令行,调用PHP执行 /script/queue/queuescript.php $queue_name,
if($queue_message = system_queue::instance()->get($queue_name)){
system_queue::instance()->run_task($queue_message);
system_queue::instance()->ack($queue_message);
}
12./app/system/lib/queue.php里的run_task和ack方法。Run_task最终调用的自定义计划任务类的exec方法,而ack则最后删除了数据表里的相应记录。
public function run_task($queue_message){
//todo: 异常处理
$worker = $queue_message->get_worker();
$params = $queue_message->get_params();
$obj_task = new $worker();
if ($obj_task instanceof base_interface_task) {
call_user_func_array(array($obj_task, 'exec'), array($params));
logger::info('task:'. get_class($obj_task). ' exec ok');
}
return true;
}
public function ack($queue_message){
$queue_id = $queue_message->get_id();
return $this->__model->delete(array('id'=>$queue_id));
}