本文介紹PacketQueue,相對于FrameQueue來說比較簡單,可以類比Android中的MessageQueue。
PacketQueue總體介紹
- 單向鏈表結構。first_pkt、last_pkt,是鏈表的起點和終點結點;recycle_pkt鏈表用于節點復用;
- 是一個多線程安全隊列,靠等待喚醒機制保證線程安全;
- 當遇到flush_pkt時,serial加1自增,標志著流序列變化,區分是否是連續的流;
typedef struct MyAVPacketList {
AVPacket pkt;
struct MyAVPacketList *next;
int serial;
} MyAVPacketList;
typedef struct PacketQueue {
MyAVPacketList *first_pkt, *last_pkt;
int nb_packets;
int size;
int64_t duration;
int abort_request;
int serial;
SDL_mutex *mutex;
SDL_cond *cond;
MyAVPacketList *recycle_pkt;
int recycle_count;
int alloc_count;
int is_buffer_indicator;
SDL_Profiler videoBufferProfiler;
SDL_Profiler audioBufferProfiler;
void *ffp;
} PacketQueue;
PacketQueue API介紹
packet_queue_init:初始化;
packet_queue_start:啟動隊列,設置abort_request為0,先放一個flush_pkt;
packet_queue_put:存入一個節點,;
packet_queue_put_nullpacket:存入一個空節點;
packet_queue_put_private:存入一個節點,后喚醒packet_queue_get等待鎖;
packet_queue_get:獲取一個節點;
packet_queue_get_or_buffering:去緩沖等待水位后獲取一個節點;
packet_queue_abort:中止,設置abort_request=1后喚醒packet_queue_get等待鎖;
packet_queue_flush:清除隊列內所有的節點;
packet_queue_destroy:銷毀;
初始化
static int packet_queue_init(PacketQueue *q) {
memset(q, 0, sizeof(PacketQueue));
q->mutex = SDL_CreateMutex();
q->cond = SDL_CreateCond();
q->abort_request = 1;
return 0;
}
static void packet_queue_start(PacketQueue *q) {
SDL_LockMutex(q->mutex);
q->abort_request = 0;
packet_queue_put_private(q, &flush_pkt);
SDL_UnlockMutex(q->mutex);
}
put操作
/*
* 存入null結點,eof和error時候存入,表示流結束
*/
static int packet_queue_put_nullpacket(PacketQueue *q, int stream_index) {
AVPacket pkt1, *pkt = &pkt1;
av_init_packet(pkt);
pkt->data = NULL;
pkt->size = 0;
pkt->stream_index = stream_index;
return packet_queue_put(q, pkt);
}
static int packet_queue_put(PacketQueue *q, AVPacket *pkt) {
int ret;
SDL_LockMutex(q->mutex);
ret = packet_queue_put_private(q, pkt);
SDL_UnlockMutex(q->mutex);
if (pkt != &flush_pkt && ret < 0)
av_packet_unref(pkt);
return ret;
}
static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt) {
if (q->abort_request) {
return -1;
}
// 如果有已經回收的就復用該回收的結點,沒有就申請一個;
MyAVPacketList *pkt1 = q->recycle_pkt;
if (pkt1) {
q->recycle_pkt = pkt1->next; // 移動到下一個
q->recycle_count++;
} else {
q->alloc_count++;
pkt1 = av_malloc(sizeof(MyAVPacketList));
}
if (!pkt1) {
return -1;
}
pkt1->pkt = *pkt;
pkt1->next = NULL;
// 遇到flush_pkt就升級serial序列號,標志剛開始或進行了seek
if (pkt == &flush_pkt) {
q->serial++;
}
pkt1->serial = q->serial;
// 賦值first_pkt和last_pkt,定義鏈表的起點和終點;
if (!q->last_pkt) { // 條件判斷同 !q->first_pkt
q->first_pkt = pkt1;
} else {
q->last_pkt->next = pkt1;
}
q->last_pkt = pkt1;
q->nb_packets++;
q->size += pkt1->pkt.size + sizeof(*pkt1);
q->duration += FFMAX(pkt1->pkt.duration, MIN_PKT_DURATION);
/* XXX: should duplicate packet data in DV case */
SDL_CondSignal(q->cond);
return 0;
}
get操作
/*
* block: 是否阻塞
* 返回1表示獲取到了,返回值<=0表示沒獲取到
*/
static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial) {
MyAVPacketList *pkt1;
int ret;
// 加鎖
SDL_LockMutex(q->mutex);
for (;;) {
if (q->abort_request) {
ret = -1;
break;
}
pkt1 = q->first_pkt;
if (pkt1) {
q->first_pkt = pkt1->next;
if (!q->first_pkt) {
// 說明只有一個結點
q->last_pkt = NULL;
}
q->nb_packets--;
q->size -= pkt1->pkt.size + sizeof(*pkt1);
q->duration -= FFMAX(pkt1->pkt.duration, MIN_PKT_DURATION);
*pkt = pkt1->pkt;
if (serial) {
*serial = pkt1->serial;
}
// 把pkt1持有的pkt給出去后進行回收,放到recycle_pkt鏈表頭部
pkt1->next = q->recycle_pkt;
q->recycle_pkt = pkt1;
ret = 1;
break;
} else if (!block) {
ret = 0;
break;
} else {
// wait阻塞,等待put喚醒
SDL_CondWait(q->cond, q->mutex);
}
}
SDL_UnlockMutex(q->mutex);
return ret;
}
/*
* 阻塞等待直到退出或者有AVPacket數據
* >= 0 即取到值;
*/
static int packet_queue_get_or_buffering(FFPlayer *ffp, PacketQueue *q, AVPacket *pkt, int *serial,
int *finished) {
if (!ffp->packet_buffering)
return packet_queue_get(q, pkt, 1, serial); // queue為空時會阻塞等待
while (1) {
int new_packet = packet_queue_get(q, pkt, 0, serial); // 非阻塞,直接返回
if (new_packet < 0) {
// abort_request了
return -1;
} else if (new_packet == 0) {
// 隊列為空,去緩沖
if (q->is_buffer_indicator && !*finished) {
ffp_toggle_buffering(ffp, 1);
}
// 再阻塞獲取,等待水位填充滿
new_packet = packet_queue_get(q, pkt, 1, serial);
if (new_packet < 0) {
// abort_request了
return -1;
}
}
if (*finished == *serial) {
av_packet_unref(pkt);
continue;
} else {
break;
}
}
return 1;
}
重置、銷毀操作
// stream_close時第一個調用它,主要是置abort_request為1,阻斷后續所有流程
static void packet_queue_abort(PacketQueue *q) {
SDL_LockMutex(q->mutex);
q->abort_request = 1;
SDL_CondSignal(q->cond);
SDL_UnlockMutex(q->mutex);
}
// seek或destory時調用
static void packet_queue_flush(PacketQueue *q) {
SDL_LockMutex(q->mutex);
// 釋放所有pkt
MyAVPacketList *pkt, *pkt1;
for (pkt = q->first_pkt; pkt; pkt = pkt1) {
pkt1 = pkt->next;
av_packet_unref(&pkt->pkt);
// 回收,放到鏈表頭部
pkt->next = q->recycle_pkt;
q->recycle_pkt = pkt;
}
q->last_pkt = NULL;
q->first_pkt = NULL;
q->nb_packets = 0;
q->size = 0;
q->duration = 0;
SDL_UnlockMutex(q->mutex);
}
// 清空所有pkt,包括recycle_pkt,stream_close處調用
static void packet_queue_destroy(PacketQueue *q) {
packet_queue_flush(q);
SDL_LockMutex(q->mutex);
while (q->recycle_pkt) {
MyAVPacketList *pkt = q->recycle_pkt;
if (pkt)
q->recycle_pkt = pkt->next;
av_freep(&pkt);
}
SDL_UnlockMutex(q->mutex);
SDL_DestroyMutex(q->mutex);
SDL_DestroyCond(q->cond);
}






