亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.430618.com 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

本篇文章給大家?guī)?lái)了關(guān)于thinkphp的相關(guān)知識(shí),其中主要介紹了關(guān)于使用think-queue來(lái)實(shí)現(xiàn)普通隊(duì)列延遲隊(duì)列的相關(guān)內(nèi)容,think-queue是thinkphp官方提供的一個(gè)消息隊(duì)列服務(wù),下面一起來(lái)看一下,希望對(duì)大家有幫助。

一起聊聊thinkphp6使用think-queue實(shí)現(xiàn)普通隊(duì)列和延遲隊(duì)列

###TP6 隊(duì)列

TP6 中使用 think-queue 可以實(shí)現(xiàn)普通隊(duì)列和延遲隊(duì)列。

think-queue 是thinkphp 官方提供的一個(gè)消息隊(duì)列服務(wù),它支持消息隊(duì)列的一些基本特性:

消息的發(fā)布,獲取,執(zhí)行,刪除,重發(fā),失敗處理,延遲執(zhí)行,超時(shí)控制等

隊(duì)列的多隊(duì)列, 內(nèi)存限制 ,啟動(dòng),停止,守護(hù)等

消息隊(duì)列可降級(jí)為同步執(zhí)行

消息隊(duì)列實(shí)現(xiàn)過(guò)程

1、通過(guò)生產(chǎn)者推送消息到消息隊(duì)列服務(wù)中

2、消息隊(duì)列服務(wù)將收到的消息存入redis隊(duì)列中(zset)

3、消費(fèi)者進(jìn)行監(jiān)聽(tīng)隊(duì)列,當(dāng)監(jiān)聽(tīng)到隊(duì)列有新的消息時(shí),獲取隊(duì)列第一條

4、處理獲取下來(lái)的消息調(diào)用業(yè)務(wù)類進(jìn)行處理相關(guān)業(yè)務(wù)

5、業(yè)務(wù)處理后,需要從隊(duì)列中刪除消息


composer 安裝 think-queue

composer require topthink/think-queue

配置文件

安裝完 think-queue 后會(huì)在 config 目錄中生成 queue.php,這個(gè)文件是隊(duì)列的配置文件。

tp6中提供了多種消息隊(duì)列的實(shí)現(xiàn)方式,默認(rèn)使用sync,我這里選擇使用Redis。

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

創(chuàng)建目錄及隊(duì)列消費(fèi)類文件

在 app 目錄下創(chuàng)建 queue 目錄,然后在該目錄下新建一個(gè)抽象類 Queue.php 文件,作為基礎(chǔ)類

<?php
namespace app\queue;
use think\facade\Cache;
use think\queue\Job;
use think\facade\Log;
/**
 * Class Queue 隊(duì)列消費(fèi)基礎(chǔ)類
 * @package app\queue
 */
abstract class Queue
{
    /**
     * @describe:fire是消息隊(duì)列默認(rèn)調(diào)用的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf('[%s][%s] 隊(duì)列無(wú)消息', __CLASS__, __FUNCTION__));
            return ;
        }
 
        $jobId = $job->getJobId(); // 隊(duì)列的數(shù)據(jù)庫(kù)id或者redis key
        // $jobClassName = $job->getName(); // 隊(duì)列對(duì)象類
        // $queueName = $job->getQueue(); // 隊(duì)列名稱
 
        // 如果已經(jīng)執(zhí)行中或者執(zhí)行完成就不再執(zhí)行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }
 
        // 執(zhí)行業(yè)務(wù)處理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 隊(duì)列執(zhí)行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任務(wù)執(zhí)行成功后刪除
            Cache::store('redis')->delete($jobId); // 刪除redis中的緩存
        } else {
            // 檢查任務(wù)重試次數(shù)
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 隊(duì)列執(zhí)行重試次數(shù)超過(guò)3次,執(zhí)行失敗', __CLASS__, __FUNCTION__));
                 // 第1種處理方式:重新發(fā)布任務(wù),該任務(wù)延遲10秒后再執(zhí)行;也可以不指定秒數(shù)立即執(zhí)行
                //$job->release(10); 
                // 第2種處理方式:原任務(wù)的基礎(chǔ)上1分鐘執(zhí)行一次并增加嘗試次數(shù)
                //$job->failed();   
                // 第3種處理方式:刪除任務(wù)
                $job->delete(); // 任務(wù)執(zhí)行后刪除
                Cache::store('redis')->delete($jobId); // 刪除redis中的緩存
            }
        }
    }
 
    /**
     * 消息在到達(dá)消費(fèi)者時(shí)可能已經(jīng)不需要執(zhí)行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任務(wù)執(zhí)行的結(jié)果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查詢r(jià)edis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }
 
    /**
     * @describe: 根據(jù)消息中的數(shù)據(jù)進(jìn)行實(shí)際的業(yè)務(wù)處理
     * @param $data 數(shù)據(jù)
     * @return bool 返回結(jié)果
     */
    abstract protected function execute($data): bool;
}

所有真正的消費(fèi)類繼承基礎(chǔ)抽象類

<?php
namespace app\queue\test;
use app\queue\Queue;
class Test extends Queue
{
    protected function execute($data): bool
    {
       // 具體消費(fèi)業(yè)務(wù)邏輯
    }
}

生產(chǎn)者邏輯

use think\facade\Queue;
 
// 普通隊(duì)列生成調(diào)用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);
 
// 延時(shí)隊(duì)列生成調(diào)用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延時(shí)隊(duì)列 10 秒后執(zhí)行:
Queue::later(10 , Test::class, $data, $queueName);

開(kāi)啟進(jìn)程監(jiān)聽(tīng)任務(wù)并執(zhí)行

php think queue:listen
php think queue:work


命令模式介紹

命令模式

queue:work 命令

work 命令: 該命令將啟動(dòng)一個(gè) work 進(jìn)程來(lái)處理消息隊(duì)列。

php think queue:work --queue TestQueue

queue:listen 命令

listen 命令: 該命令將會(huì)創(chuàng)建一個(gè) listen 父進(jìn)程 ,然后由父進(jìn)程通過(guò) proc_open(‘php think queue:work’) 的方式來(lái)創(chuàng)建一個(gè)work 子 進(jìn)程來(lái)處理消息隊(duì)列,且限制該work進(jìn)程的執(zhí)行時(shí)間。

php think queue:listen --queue TestQueue


命令行參數(shù)

Work 模式

php think queue:work \
--daemon            //是否循環(huán)執(zhí)行,如果不加該參數(shù),則該命令處理完下一個(gè)消息就退出
--queue  helloJobQueue  //要處理的隊(duì)列的名稱
--delay  0 \        //如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時(shí),設(shè)置其下次執(zhí)行前延遲多少秒,默認(rèn)為0
--force  \          //系統(tǒng)處于維護(hù)狀態(tài)時(shí)是否仍然處理任務(wù),并未找到相關(guān)說(shuō)明
--memory 128 \      //該進(jìn)程允許使用的內(nèi)存上限,以 M 為單位
--sleep  3 \        //如果隊(duì)列中無(wú)任務(wù),則sleep多少秒后重新檢查(work+daemon模式)或者退出(listen或非daemon模式)
--tries  2          //如果任務(wù)已經(jīng)超過(guò)嘗試次數(shù)上限,則觸發(fā)‘任務(wù)嘗試次數(shù)超限’事件,默認(rèn)為0

Listen 模式

php think queue:listen \
--queue  helloJobQueue \   //監(jiān)聽(tīng)的隊(duì)列的名稱
--delay  0 \         //如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時(shí),設(shè)置其下次執(zhí)行前延遲多少秒,默認(rèn)為0
--memory 128 \       //該進(jìn)程允許使用的內(nèi)存上限,以 M 為單位
--sleep  3 \         //如果隊(duì)列中無(wú)任務(wù),則多長(zhǎng)時(shí)間后重新檢查,daemon模式下有效
--tries  0 \         //如果任務(wù)已經(jīng)超過(guò)重發(fā)次數(shù)上限,則進(jìn)入失敗處理邏輯,默認(rèn)為0
--timeout 60         //創(chuàng)建的work子進(jìn)程的允許執(zhí)行的最長(zhǎng)時(shí)間,以秒為單位

可以看到 listen 模式下,不包含 --deamon 參數(shù),原因下面會(huì)說(shuō)明

消息隊(duì)列的開(kāi)始,停止與重啟

開(kāi)始一個(gè)消息隊(duì)列:

php think queue:work

停止所有的消息隊(duì)列:

php think queue:restart

重啟所有的消息隊(duì)列:

php think queue:restart 
php think queue:work


分享到:
標(biāo)簽:thinkphp6 think-queue thinkphp普通隊(duì)列 thinkphp延遲隊(duì)列
用戶無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定