本篇文章給大家?guī)?lái)了關(guān)于thinkphp的相關(guān)知識(shí),其中主要介紹了關(guān)于使用think-queue來(lái)實(shí)現(xiàn)普通隊(duì)列和延遲隊(duì)列的相關(guān)內(nèi)容,think-queue是thinkphp官方提供的一個(gè)消息隊(duì)列服務(wù),下面一起來(lái)看一下,希望對(duì)大家有幫助。

###TP6 隊(duì)列
TP6 中使用 think-queue 可以實(shí)現(xiàn)普通隊(duì)列和延遲隊(duì)列。
think-queue 是thinkphp 官方提供的一個(gè)消息隊(duì)列服務(wù),它支持消息隊(duì)列的一些基本特性:
消息的發(fā)布,獲取,執(zhí)行,刪除,重發(fā),失敗處理,延遲執(zhí)行,超時(shí)控制等
隊(duì)列的多隊(duì)列, 內(nèi)存限制 ,啟動(dòng),停止,守護(hù)等
消息隊(duì)列可降級(jí)為同步執(zhí)行
消息隊(duì)列實(shí)現(xiàn)過(guò)程
1、通過(guò)生產(chǎn)者推送消息到消息隊(duì)列服務(wù)中
2、消息隊(duì)列服務(wù)將收到的消息存入redis隊(duì)列中(zset)
3、消費(fèi)者進(jìn)行監(jiān)聽(tīng)隊(duì)列,當(dāng)監(jiān)聽(tīng)到隊(duì)列有新的消息時(shí),獲取隊(duì)列第一條
4、處理獲取下來(lái)的消息調(diào)用業(yè)務(wù)類進(jìn)行處理相關(guān)業(yè)務(wù)
5、業(yè)務(wù)處理后,需要從隊(duì)列中刪除消息
composer 安裝 think-queue
composer require topthink/think-queue
配置文件
安裝完 think-queue 后會(huì)在 config 目錄中生成 queue.php,這個(gè)文件是隊(duì)列的配置文件。
tp6中提供了多種消息隊(duì)列的實(shí)現(xiàn)方式,默認(rèn)使用sync,我這里選擇使用Redis。
return [
'default' => 'redis',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => env('redis.host', '127.0.0.1'),
'port' => env('redis.port', '6379'),
'password' => env('redis.password','123456'),
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];創(chuàng)建目錄及隊(duì)列消費(fèi)類文件
在 app 目錄下創(chuàng)建 queue 目錄,然后在該目錄下新建一個(gè)抽象類 Queue.php 文件,作為基礎(chǔ)類
<?php
namespace app\queue;
use think\facade\Cache;
use think\queue\Job;
use think\facade\Log;
/**
* Class Queue 隊(duì)列消費(fèi)基礎(chǔ)類
* @package app\queue
*/
abstract class Queue
{
/**
* @describe:fire是消息隊(duì)列默認(rèn)調(diào)用的方法
* @param \think\queue\Job $job
* @param $message
*/
public function fire(Job $job, $data)
{
if (empty($data)) {
Log::error(sprintf('[%s][%s] 隊(duì)列無(wú)消息', __CLASS__, __FUNCTION__));
return ;
}
$jobId = $job->getJobId(); // 隊(duì)列的數(shù)據(jù)庫(kù)id或者redis key
// $jobClassName = $job->getName(); // 隊(duì)列對(duì)象類
// $queueName = $job->getQueue(); // 隊(duì)列名稱
// 如果已經(jīng)執(zhí)行中或者執(zhí)行完成就不再執(zhí)行了
if (!$this->checkJob($jobId, $data)) {
$job->delete();
Cache::store('redis')->delete($jobId);
return ;
}
// 執(zhí)行業(yè)務(wù)處理
if ($this->execute($data)) {
Log::record(sprintf('[%s][%s] 隊(duì)列執(zhí)行成功', __CLASS__, __FUNCTION__));
$job->delete(); // 任務(wù)執(zhí)行成功后刪除
Cache::store('redis')->delete($jobId); // 刪除redis中的緩存
} else {
// 檢查任務(wù)重試次數(shù)
if ($job->attempts() > 3) {
Log::error(sprintf('[%s][%s] 隊(duì)列執(zhí)行重試次數(shù)超過(guò)3次,執(zhí)行失敗', __CLASS__, __FUNCTION__));
// 第1種處理方式:重新發(fā)布任務(wù),該任務(wù)延遲10秒后再執(zhí)行;也可以不指定秒數(shù)立即執(zhí)行
//$job->release(10);
// 第2種處理方式:原任務(wù)的基礎(chǔ)上1分鐘執(zhí)行一次并增加嘗試次數(shù)
//$job->failed();
// 第3種處理方式:刪除任務(wù)
$job->delete(); // 任務(wù)執(zhí)行后刪除
Cache::store('redis')->delete($jobId); // 刪除redis中的緩存
}
}
}
/**
* 消息在到達(dá)消費(fèi)者時(shí)可能已經(jīng)不需要執(zhí)行了
* @param string $jobId
* @param $message
* @return bool 任務(wù)執(zhí)行的結(jié)果
* @throws \Psr\SimpleCache\InvalidArgumentException
*/
protected function checkJob(string $jobId, $message): bool
{
// 查詢r(jià)edis
$data = Cache::store('redis')->get($jobId);
if (!empty($data)) {
return false;
}
Cache::store('redis')->set($jobId, $message);
return true;
}
/**
* @describe: 根據(jù)消息中的數(shù)據(jù)進(jìn)行實(shí)際的業(yè)務(wù)處理
* @param $data 數(shù)據(jù)
* @return bool 返回結(jié)果
*/
abstract protected function execute($data): bool;
}所有真正的消費(fèi)類繼承基礎(chǔ)抽象類
<?php
namespace app\queue\test;
use app\queue\Queue;
class Test extends Queue
{
protected function execute($data): bool
{
// 具體消費(fèi)業(yè)務(wù)邏輯
}
}生產(chǎn)者邏輯
use think\facade\Queue; // 普通隊(duì)列生成調(diào)用方式 Queue::push($job, $data, $queueName); // 例: Queue::push(Test::class, $data, $queueName); // 延時(shí)隊(duì)列生成調(diào)用方式 Queue::later($delay, $job, $data, $queueName); // 例如使用延時(shí)隊(duì)列 10 秒后執(zhí)行: Queue::later(10 , Test::class, $data, $queueName);
開(kāi)啟進(jìn)程監(jiān)聽(tīng)任務(wù)并執(zhí)行
php think queue:listen php think queue:work
命令模式介紹
命令模式
queue:work 命令
work 命令: 該命令將啟動(dòng)一個(gè) work 進(jìn)程來(lái)處理消息隊(duì)列。
php think queue:work --queue TestQueue
queue:listen 命令
listen 命令: 該命令將會(huì)創(chuàng)建一個(gè) listen 父進(jìn)程 ,然后由父進(jìn)程通過(guò) proc_open(‘php think queue:work’) 的方式來(lái)創(chuàng)建一個(gè)work 子 進(jìn)程來(lái)處理消息隊(duì)列,且限制該work進(jìn)程的執(zhí)行時(shí)間。
php think queue:listen --queue TestQueue
命令行參數(shù)
Work 模式
php think queue:work \ --daemon //是否循環(huán)執(zhí)行,如果不加該參數(shù),則該命令處理完下一個(gè)消息就退出 --queue helloJobQueue //要處理的隊(duì)列的名稱 --delay 0 \ //如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時(shí),設(shè)置其下次執(zhí)行前延遲多少秒,默認(rèn)為0 --force \ //系統(tǒng)處于維護(hù)狀態(tài)時(shí)是否仍然處理任務(wù),并未找到相關(guān)說(shuō)明 --memory 128 \ //該進(jìn)程允許使用的內(nèi)存上限,以 M 為單位 --sleep 3 \ //如果隊(duì)列中無(wú)任務(wù),則sleep多少秒后重新檢查(work+daemon模式)或者退出(listen或非daemon模式) --tries 2 //如果任務(wù)已經(jīng)超過(guò)嘗試次數(shù)上限,則觸發(fā)‘任務(wù)嘗試次數(shù)超限’事件,默認(rèn)為0
Listen 模式
php think queue:listen \ --queue helloJobQueue \ //監(jiān)聽(tīng)的隊(duì)列的名稱 --delay 0 \ //如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時(shí),設(shè)置其下次執(zhí)行前延遲多少秒,默認(rèn)為0 --memory 128 \ //該進(jìn)程允許使用的內(nèi)存上限,以 M 為單位 --sleep 3 \ //如果隊(duì)列中無(wú)任務(wù),則多長(zhǎng)時(shí)間后重新檢查,daemon模式下有效 --tries 0 \ //如果任務(wù)已經(jīng)超過(guò)重發(fā)次數(shù)上限,則進(jìn)入失敗處理邏輯,默認(rèn)為0 --timeout 60 //創(chuàng)建的work子進(jìn)程的允許執(zhí)行的最長(zhǎng)時(shí)間,以秒為單位
可以看到 listen 模式下,不包含 --deamon 參數(shù),原因下面會(huì)說(shuō)明
消息隊(duì)列的開(kāi)始,停止與重啟
開(kāi)始一個(gè)消息隊(duì)列:
php think queue:work
停止所有的消息隊(duì)列:
php think queue:restart
重啟所有的消息隊(duì)列:
php think queue:restart php think queue:work






