| queue_id | queue_title | status | start_time | worker | cursor_id | params | runkey | 
|---|
queue_id 队列ID 在运行队列的时候是唯一标识 queue_title 队列标题,直观区分队列 status 队列状态(running/运行中,hibernate/休眠中,paused/已暂停,failure/执行失败) start_time 队列产生时间 worker 在执行队列的时候会根据worker这个字段表示该队列在哪个方法执行 cursor_id 游标--对于一个队列中有多个任务需要用到此字段 params 参数(参数,通常就是filter) runkey 队列运行的Key,通常不需要自定义

https://localhost/ecstore-bugfix/index.php/shopadmin/index.php?ctl=default&act=status
在desktop_ctl_default的status方法中会调
function status(){
    ...
    app::get('base')->model('queue')->flush();
    ...
}
<?php
class base_mdl_queue extends base_db_model{
    var $limit = 100; //最大任务并发
    var $task_timeout = 300; //单次任务超时
    function flush(){
        $base_url = kernel::base_url();
        foreach($this->db->select('select queue_id from sdb_base_queue limit '.$this->limit) as $r){
            $this->runtask($r['queue_id']);
        }
    }
    function runtask($task_id){
        $http = new base_httpclient;
        $_POST['task_id'] = $task_id;
        $url =  kernel::openapi_url('openapi.queue','worker',array('task_id'=>$task_id));
        kernel::log('Spawn [Task-'.$task_id.'] '.$url);
        //99%概率不会有问题
        $http->hostaddr = $_SERVER["SERVER_ADDR"]?$_SERVER["SERVER_ADDR"]:'127.0.0.1';
        $http->hostport = $_SERVER["SERVER_PORT"]?$_SERVER["SERVER_PORT"]:'80';
        $http->timeout = 2;
        kernel::log($http->post($url,$_POST));
    }
}
从代码中可以看到在这获取到存放在队列表中的队列任务,进行运行
根据  kernel::openapi_url('openapi.queue','worker',array('task_id'=>$task_id));
知道调用了base_service_queue类的worker方法
function worker(){
    ...
    list($worker,$method) = explode('.',$task_info['worker']);
    $errmsg = null;
    $obj_work = kernel::single($worker);
    $remaining = call_user_func_array( array(  $obj_work ,$method),array(&$task_info['cursor_id'],$task_info['params'], &$errmsg));
    ...
}
从以上代码可以看出:执行队列的方法是队列worker字段的值。
eg
'worker'=>'b2c_queue.send_msg',表示在b2c_queue类中的send_msg方法中执行此队列
从worker方法中可以看出,在向执行队列的方法中会传入cursor_id,params两个参数
如果在一个队列中只有一个任务,如上则在执行完一个任务后需要返回一个false,则会删除此队列.
如果有一个队列中有多个任务,则会根据游标(cursor_id)进行完成任务,只要队列还没执行完成,
返回true则会跟新游标(cursor_id)直到返回false,再删除队列
如下是ECOS后台中群发消息的一个队列
在ECOS中每个队列都是放在sdb_base_queue这个队列表中,所以在插入队列的时候需要把队列信息插入到
队列表中即可
//可循环插入队列
$data = array(
    'queue_title'=>app::get('b2c')->_('到货通知站内信'),
    'start_time'=>time(),
    'params'=>array(
                'member_id'=>$data['member_id'],
                'data' =>$aTmp,
                'name' => $login_name,
                'gnotify_id' => $gnid,
        ),
    'worker'=>'b2c_queue.send_msg',
);
if(!$queue->insert($data)){
    $this->end(false,app::get('b2c')->_('操作失败!'));
}
新建b2c_queue类,写send_msg方法来执行此队列
function send_msg(&$cursor_id,$params){
    $obj_memmsg = kernel::single('b2c_message_msg');
    $aData = $params['data'];
    $aData['member_id'] = 1;
    $aData['uname'] = app::get('b2c')->_('管理员');
    $aData['to_id'] = $params['member_id'];
    $aData['msg_to'] = $params['name'];
    $aData['subject'] = $aData['title'];
    $aData['comment'] = $aData['content'];
    $aData['has_sent'] = 'true';
    $obj_memmsg->send($aData);
}
注意执行完一个队列,需要返回一个false