本篇文章給大家?guī)砹岁P于thinkphp的相關知識,其中主要整理了使用think-queue實現(xiàn)redis消息隊列的相關問題,下面一起來看一下,希望對大家有幫助。

簡單介紹:
消息隊列中間件是大型系統(tǒng)中的重要組件,已經(jīng)逐漸成為企業(yè)系統(tǒng)內部通信的核心手段。它具有松耦合、異步消息、流量削峰、可靠投遞、廣播、流量控制、最終一致性等一系列功能,已經(jīng)成為異步RPC的主要手段之一。
大白話:
消息隊列有兩個角色和一個容器,角色分別為生產(chǎn)者(負責發(fā)布任務)和消費者(負責執(zhí)行任務),容器這是用來存放/堆積生產(chǎn)者發(fā)布的任務,將發(fā)布和執(zhí)行兩個步驟分開且互不影響。
消息隊列的大致流程為:
生產(chǎn)者發(fā)布任務存放/堆積在消息隊列中,由消費者主動去消息隊列中取出任務并執(zhí)行,先發(fā)布的先執(zhí)行(隊列:先進先出),在沒有消費者的情況下任務會堆積在隊列中等待被取出執(zhí)行。
優(yōu)點:
消息隊列適用于大并發(fā)或者處理時間長并需要批量操作的第三方接口,可用于但不僅限于短信發(fā)送、郵件發(fā)送、APP推送等,支持跨系統(tǒng),即本系統(tǒng)發(fā)布的消息隊列可以由自己或者給其他系統(tǒng)執(zhí)行任務,同理本系統(tǒng)也可以作為消費者執(zhí)行自己或者其他系統(tǒng)發(fā)布的消息隊列任務。
接下來主要介紹一下 think-queue 的使用
ThinkPHP的Queue內置了 Redis、Database、Topthink、Sync四種驅動,這里使用的是 Redis,也推薦使用 Redis
think-queue 隊列消息可以進行任務的發(fā)布、獲取、執(zhí)行、刪除、重新發(fā)布、延遲發(fā)布、超時控制等操作
消息隊列基本配置
在 extra 目錄下創(chuàng)建 queue.php 配置文件
<?php return [ 'connector' => 'Redis', 'expire' => null, // 任務過期時間,默認為60秒,若要禁用,則設置為 null 'default' => 'REDIS_QUEUE', // 默認的隊列名 'host' => '127.0.0.1', // redis 主機ip 'port' => 6379, // redis 端口 'password' => '', // redis 密碼 'select' => 0, // 使用哪里一個 db,默認為 db0 'timeout' => 0, // redis 連接的超時時間 'persistent' => false, // 是否是長連接 ];
至于為什么放在這里,是因為 Queue 源代碼默認從 extra 讀取 queue 文件獲取配置信息,如果想要將配置文件放置其他地方,則需要對應去修改源代碼中的默認獲取配置,如下圖所示

生產(chǎn)者
創(chuàng)建一個測試類,寫入生產(chǎn)者方法
<?php
namespace app\api\controller;
use think\Controller;
use think\Queue;
class Test extends Controller{
// 生產(chǎn)者,添加消息隊列
public function addQueue()
{
// 參數(shù)
$data = [
'id' => rand(0, 99),
'userName' => '一起摸魚'
];
// 消息隊列名
$queueName = 'testQueue';
// 推入消息隊列,注意這里的 ::class 是PHP5.5才有的寫法
$isPushed = Queue::push(TestQueue::class, $data, $queueName);
// PHP5.5以下的可以直接寫命名空間
// $isPushed = Queue::push('app\common\queue\TestQueue', $data, $queueName);
if ($isPushed !== false) {
// 成功之后的業(yè)務
echo '隊列加入成功';
} else {
// 失敗之后的業(yè)務
echo '隊列加入失敗';
}
}
}消費者
創(chuàng)建一個 TestQueue 類,用做消費者,執(zhí)行消息隊列中的任務
<?php
namespace app\common\queue;
use think\Log;
use think\queue\Job;
class TestQueue{
// 消費者執(zhí)行入口
public function fire(Job $job, $data)
{
// 具體執(zhí)行業(yè)務
$isJobDone = $this->doJob($data);
if ($isJobDone) {
// 消息隊列執(zhí)行成功,刪除隊列,否則會一直執(zhí)行
$job->delete();
} else {
// 消息隊列執(zhí)行失敗
// 獲取消息隊列已經(jīng)重試了幾遍
$attempts = $job->attempts();
if ($attempts == 0 || $attempts == 1) {
// 重新發(fā)布,參數(shù) delay 是延時發(fā)布的時間
$job->release(2);
}
}
}
// 消息隊列執(zhí)行失敗后會自動執(zhí)行該方法
public function failed($data)
{
Log::error('消息隊列達到最大重復執(zhí)行次數(shù)后失敗:' . json_encode($data));
}
// 消息隊列執(zhí)行方法
public function doJob($data)
{
// 具體執(zhí)行業(yè)務
$data = json_encode($data);
echo '消息隊列:' . $data;
// 這里的判斷條件以具體業(yè)務是否執(zhí)行成功進行判斷
if ($data) {
echo "執(zhí)行成功";
return true;
} else {
echo "執(zhí)行失敗";
return false;
}
}
}運行結果
請求接口,生產(chǎn)者發(fā)布任務

redis 隊列存放任務

接下來就是啟用隊列的監(jiān)聽模式了,因為不可能每次一有任務加進來就去手動執(zhí)行一次隊列。隊列的監(jiān)聽模式有兩種,配置參數(shù)如下:

項目根目錄執(zhí)行
php think queue:work --queue 隊列名
開啟消費者,執(zhí)行任務

redis 隊列中的任務執(zhí)行后也被刪除

但是由于需要,我們還要將消費者掛起守護進程執(zhí)行,以確保關掉終端還能夠啟動隊列。
nohup php think queue:listen --queue 隊列名 &

PS:shell中輸入exit來退出終端
PS:shell中輸入exit來退出終端
PS:shell中輸入exit來退出終端
因為在nohup執(zhí)行成功后直接點關閉程序按鈕關閉終端時會斷掉該命令所對應的session,導致 nohup 對應的進程被通知需要一起關掉。
至此,整個消息隊列流程就結束了。






