bbc 通过 Predis 连接 Redis 集群(redis-cluster)时报 EVALSHA 错误的解决方案

配置redis集群:

    // Redis connection definitions used by the application.
    'connections' => [
        'default' => [
            // Cluster node endpoints: six instances on one host, ports 9001-9006.
            // NOTE(review): the first three nodes all share alias=master — confirm
            // whether Predis requires aliases to be unique in cluster mode.
            'servers' => [
                 'tcp://172.16.11.151:9001?alias=master&database=0',
                 'tcp://172.16.11.151:9002?alias=master&database=0',
                 'tcp://172.16.11.151:9003?alias=master&database=0',
                 'tcp://172.16.11.151:9004?alias=slave-01&database=0',
                 'tcp://172.16.11.151:9005?alias=slave-02&database=0',
                 'tcp://172.16.11.151:9006?alias=slave-03&database=0',
            ],
            'options' => [
                // Key prefix option; presumably prepended to every key by Predis — verify.
                'prefix' => 'test:',
                // Selects redis-cluster mode; this is the mode in which Predis
                // rejects EVALSHA with the NotSupportedException shown below.
                'cluster' => 'redis',
            ],
        ],
    ],

执行 sh bbc/script/queue/queue.sh 脚本后报错:

[Predis\NotSupportedException]
Cannot use 'EVALSHA' with redis-cluster. 

通过以下修改,用多条普通 Redis 命令代替 Lua 脚本(绕开 EVALSHA)后,队列可以正常执行。注意:拆分后的多条命令不再具备 Lua 脚本的原子性,多个队列进程并发运行时需要留意任务重复或丢失的风险:

diff --git a/app/system/lib/queue/adapter/redis.php b/app/system/lib/queue/adapter/redis.php
index 6a5d3b8..6082d10 100755
--- a/app/system/lib/queue/adapter/redis.php
+++ b/app/system/lib/queue/adapter/redis.php
@@ -53,8 +53,28 @@ class system_queue_adapter_redis implements system_interface_queue_adapter {
 
         $objectRedis = redis::scene('queue');
         $objectRedis->loadScripts('queueGet');
-
-        $queueData = $objectRedis->queueGet($queueName, 'queue:'.$queueName.':reserved', time() + $this->expire);
+    
+        // 使用 Redis 命令逐步执行 Lua 脚本功能
+        $v = $objectRedis->lpop($queueName);
+    
+        if ($v === false || $v === null) {
+            return $v;
+        }
+    
+        $attempts = preg_match('/"attempts\\":(\d+)/', $v, $matches);
+        $newQueueData = $v;
+    
+        if ($attempts !== false) {
+            $attemptsStr = '"attempts\\":' . ($matches[1] + 1);
+            $newQueueData = preg_replace('/("attempts\\":\d+)/', $attemptsStr, $v, 1);
+        }
+    
+        $objectRedis->zadd('queue:'.$queueName.':reserved', time() + $this->expire, $newQueueData);
+    
+        // return $v;
+        $queueData = $v;
+        // echo '获取一个队列任务ID-get返回值:'. $queueData;
+        // $queueData = $objectRedis->queueGet($queueName, 'queue:'.$queueName.':reserved', time() + $this->expire);
         if( ! empty($queueData) )
         {
             return new system_queue_message_redis($this, $queueData, $queueName);
@@ -119,11 +139,21 @@ class system_queue_adapter_redis implements system_interface_queue_adapter {
 
         $objectReids = redis::scene('queue');
         $objectReids->loadScripts('queueMigrate');
-
-        $v = $objectReids->queueMigrate($from, $to, time());
-
-
-        return $v;
+    
+        // 获取队列数据
+        $queueData = $objectReids->zrangebyscore($from, '-inf', time());
+        $rowNum = 0;
+        foreach ($queueData as $value) {
+            // 将数据推入新队列
+            $objectReids->rpush($to, $value);
+            // 从旧队列中移除数据
+            $rowNum += $objectReids->zrem($from, $value);
+        }
+        // echo '将延时队列或者处理超时的队列重新加入到执行队列中-migrateExpiredJobs返回值:'. $rowNum;
+        return $rowNum;
+        
+        // $v = $objectReids->queueMigrate($from, $to, time());
+        // return $v;
     }
 
     /**
@@ -263,3 +293,4 @@ class system_queue_adapter_redis implements system_interface_queue_adapter {
     }
 }
 
+

完整代码参考

<?php


use base_support_arr as Arr;

/**
 * Redis-backed queue adapter.
 *
 * Jobs are JSON strings. Pending jobs live in a Redis list; reserved (in-flight)
 * and delayed jobs live in sorted sets scored by the unix timestamp at which
 * they become due again.
 *
 * This version replaces the `queueGet` / `queueMigrate` Lua scripts with plain
 * Redis commands, because Predis refuses EVALSHA when running against
 * redis-cluster. The replacement command sequences are NOT atomic.
 *
 * NOTE(review): key naming is inconsistent across methods — get() reserves into
 * 'queue:'.$queueName.':reserved' while ack()/migrateAllExpiredJobs() use
 * $queueName.':reserved', and migrateExpiredJobs() pushes to 'queue:'.$to while
 * get() pops from $queueName. The names are kept exactly as they were passed to
 * the original Lua scripts; confirm how redis::scene('queue') prefixes keys for
 * regular commands versus scripts.
 */
class system_queue_adapter_redis implements system_interface_queue_adapter {

    /**
     * Seconds added to the current time to compute a reserved job's score,
     * i.e. how long a job may stay reserved before it is treated as timed out.
     * When null, get() skips migrating delayed/timed-out jobs.
     *
     * @var int|null
     */
    protected $expire = 3600;

    /**
     * Create a job and append it to the queue.
     *
     * @param  string  $queueName queue identifier
     * @param  array   $queueData job parameters
     * @return mixed   the generated job id
     */
    public function publish($queueName, $queueData )
    {
        return $this->pushRaw($this->createPayload($queueData), $queueName);
    }

    /**
     * Append an already-encoded payload to the queue list.
     *
     * @param  string  $payload   JSON payload
     * @param  string  $queueName queue identifier
     * @return mixed   the 'id' field read back from the payload
     */
    public function pushRaw($payload, $queueName )
    {
        redis::scene('queue')->rpush($queueName, $payload);

        return Arr::get(json_decode($payload, true), 'id');
    }

    /**
     * Pop the next job from the queue and record it in the reserved set.
     *
     * Replaces the `queueGet` Lua script with LPOP + ZADD. The two commands are
     * not atomic: if the process dies between them the popped job is lost.
     *
     * @param  string $queueName
     * @return system_queue_message_redis|false the job message, or false when no job is available
     */
    public function get($queueName)
    {
        if (! is_null($this->expire) )
        {
            $this->migrateAllExpiredJobs($queueName);
        }

        $objectRedis = redis::scene('queue');
        // NOTE(review): the script is no longer invoked below; this call is kept
        // only because its side effects are not visible here — confirm it can go.
        $objectRedis->loadScripts('queueGet');

        $payload = $objectRedis->lpop($queueName);

        // Empty queue: always report "no job" as false, never null.
        if ($payload === false || $payload === null)
        {
            return false;
        }

        // The reserved copy carries the incremented attempt counter.
        $reservedPayload = $this->incrementAttempts($payload);

        $objectRedis->zadd('queue:'.$queueName.':reserved', time() + $this->expire, $reservedPayload);

        // NOTE(review): the message is built from the pre-increment payload while
        // the reserved set stores the incremented one; if ack() is later called
        // with the message's payload, ZREM will not match — verify against
        // system_queue_message_redis and the original Lua script's return value.
        if( ! empty($payload) )
        {
            return new system_queue_message_redis($this, $payload, $queueName);
        }

        return false;
    }

    /**
     * Return the payload with its first `"attempts":N` counter increased by one.
     *
     * Works on the raw string (rather than decode/re-encode) so every other byte
     * of the payload stays identical. A payload without the counter is returned
     * unchanged.
     *
     * @param  string $payload JSON payload
     * @return string
     */
    protected function incrementAttempts($payload)
    {
        // preg_match() returns 0 for "no match" and false on error; only 1 fills $matches[1].
        if (preg_match('/"attempts":(\d+)/', $payload, $matches) !== 1)
        {
            return $payload;
        }

        // The replacement must not contain a backslash: preg_replace() would copy
        // it through literally and produce invalid JSON ("attempts\":N).
        $replacement = '"attempts":'.((int) $matches[1] + 1);
        $updated = preg_replace('/"attempts":\d+/', $replacement, $payload, 1);

        return $updated === null ? $payload : $updated;
    }

    /**
     * Acknowledge a consumed job by removing it from the reserved set.
     *
     * @param  string  $queueName
     * @param  string  $queueData payload exactly as stored in the reserved set
     * @return mixed   result of ZREM
     */
    public function ack($queueName, $queueData)
    {
        return redis::scene('queue')->zrem($queueName.':reserved', $queueData);
    }

    /**
     * Remove every pending job from a queue.
     *
     * @param string $queueName
     */
    public function purge($queueName)
    {
        // LTRIM with start > stop always yields an empty list. The previous
        // range (-1, 0) kept the element when the list held exactly one item.
        redis::scene('queue')->ltrim($queueName, 1, 0);
    }

    /**
     * Tell whether the queue has nothing left to run right now.
     *
     * Counts pending jobs plus reserved/delayed entries whose score is already
     * due (<= now); entries scored in the future are not counted.
     *
     * @param  string $queueName
     * @return bool
     */
    public function is_end($queueName)
    {
        $objectRedis = redis::scene('queue');
        $now = time();

        $len = $objectRedis->llen($queueName);
        $reservedLen = $objectRedis->zcount($queueName.':reserved', '-inf', $now);
        $delayedLen = $objectRedis->zcount($queueName.':delayed', '-inf', $now);

        return ! ($len > 0 || $reservedLen > 0 || $delayedLen > 0);
    }

    /**
     * Move all due delayed jobs and all timed-out reserved jobs back onto the queue.
     *
     * @param  string  $queueName
     * @return void
     */
    protected function migrateAllExpiredJobs($queueName)
    {
        $this->migrateExpiredJobs($queueName.':delayed', $queueName);

        $this->migrateExpiredJobs($queueName.':reserved', $queueName);
    }

    /**
     * Move every member of sorted set $from whose score is <= now onto list 'queue:'.$to.
     *
     * Replaces the `queueMigrate` Lua script with ZRANGEBYSCORE + ZREM + RPUSH.
     * Each member is removed first and pushed only if this call actually removed
     * it, so concurrent workers that read the same member do not enqueue it
     * twice. The sequence is still not atomic: a crash between ZREM and RPUSH
     * loses that job.
     *
     * @param  string  $from sorted-set key
     * @param  string  $to   queue name (prefixed with 'queue:' here)
     * @return int     number of jobs moved
     */
    public function migrateExpiredJobs($from, $to)
    {
        $to = 'queue:'.$to;

        $objectRedis = redis::scene('queue');
        // NOTE(review): the script is no longer invoked below; kept because its
        // side effects are not visible here — confirm it can be removed.
        $objectRedis->loadScripts('queueMigrate');

        $expiredJobs = $objectRedis->zrangebyscore($from, '-inf', time());
        if ( empty($expiredJobs) )
        {
            return 0;
        }

        $migrated = 0;
        foreach ($expiredJobs as $job)
        {
            // Whoever wins the removal owns the job and re-queues it.
            if ($objectRedis->zrem($from, $job) > 0)
            {
                $objectRedis->rpush($to, $job);
                $migrated++;
            }
        }

        return $migrated;
    }

    /**
     * Queue a read of the due members of a sorted set on a transaction.
     *
     * @param  \Predis\Transaction\MultiExec  $transaction
     * @param  string  $from
     * @param  int  $time upper score bound (unix timestamp)
     * @return array
     */
    protected function getExpiredJobs($transaction, $from, $time)
    {
        return $transaction->zrangebyscore($from, '-inf', $time);
    }

    /**
     * Start MULTI on the transaction and remove the due members of a sorted set.
     *
     * @param  \Predis\Transaction\MultiExec  $transaction
     * @param  string  $from
     * @param  int  $time upper score bound (unix timestamp)
     * @return void
     */
    protected function removeExpiredJobs($transaction, $from, $time)
    {
        $transaction->multi();

        $transaction->zremrangebyscore($from, '-inf', $time);
    }

    /**
     * Store a new delayed job, due $delay seconds from now.
     *
     * @param  int  $delay seconds
     * @param  mixed   $data job parameters
     * @param  string  $queue queue name
     * @return mixed   the generated job id
     */
    public function later($delay, $data = '', $queue = null)
    {
        $payload = $this->createPayload($data);

        redis::scene('queue')->zadd($queue.':delayed', time() + (int)$delay, $payload);

        return Arr::get(json_decode($payload, true), 'id');
    }

    /**
     * Release a reserved job back onto the delayed set with an updated attempt count.
     *
     * @param  string  $queue
     * @param  string  $payload
     * @param  int  $delay seconds until the job becomes due again
     * @param  int  $attempts value written to the payload's 'attempts' field
     * @return void
     */
    public function release($queue, $payload, $delay, $attempts)
    {
        $payload = $this->setMeta($payload, 'attempts', $attempts);

        redis::scene('queue')->zadd($queue.':delayed', time() + $delay, $payload);
    }

    /**
     * RPUSH all given jobs onto a queue in a single command.
     *
     * @param  \Predis\Transaction\MultiExec  $transaction
     * @param  string  $to
     * @param  array  $jobs
     * @return void
     */
    protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs)
    {
        call_user_func_array([$transaction, 'rpush'], array_merge([$to], $jobs));
    }

    /**
     * Build the JSON payload for a job: the data plus 'id', 'create_time' and 'attempts' = 0.
     *
     * @param  mixed   $data
     * @return string
     */
    protected function createPayload($data = '')
    {
        $payload = $this->setMeta($data, 'id', $this->getRandomId());
        $payload = $this->setMeta($payload, 'create_time', time());

        return $this->setMeta($payload, 'attempts', 0);
    }

    /**
     * Get a random ID string.
     *
     * @return string
     */
    protected function getRandomId()
    {
        return str_random(32);
    }

    /**
     * Set one key in a payload and return the payload re-encoded as JSON.
     *
     * @param  string|array $payload JSON string or already-decoded array
     * @param  string $key
     * @param  mixed  $value
     * @return string
     */
    protected function setMeta($payload, $key, $value)
    {
        if( ! is_array($payload) )
        {
            $payload = json_decode($payload, true);
        }

        return json_encode(Arr::set($payload, $key, $value));
    }

    /**
     * Get the expiration time in seconds.
     *
     * @return int|null
     */
    public function getExpire()
    {
        return $this->expire;
    }

    /**
     * Set the expiration time in seconds.
     *
     * @param  int|null  $seconds
     * @return void
     */
    public function setExpire($seconds)
    {
        $this->expire = $seconds;
    }
}