複製鏈接
請複製以下鏈接發送給好友

環形緩衝器

鎖定
環形緩衝器(ringr buffer),也稱作圓形隊列(circular queue),循環緩衝區(cyclic buffer),圓形緩衝區(circula buffer),是一種用於表示一個固定尺寸、頭尾相連的緩衝區的數據結構,適合緩存數據流。
中文名
環形緩衝器
外文名
Ring Buffer
別    名
循環緩衝區
圓形緩衝區
類    型
集合、容器

環形緩衝器簡介

在通信程序中,經常使用環形緩衝器作為數據結構來存放通信中發送和接收的數據。環形緩衝區是一個先進先出的循環緩衝區,可以向通信程序提供對緩衝區的互斥訪問。

環形緩衝器用法

圓形緩衝區的一個有用特性是:當一個數據元素被用掉後,其餘數據元素不需要移動其存儲位置。相反,一個非圓形緩衝區(例如一個普通的隊列)在用掉一個數據元素後,其餘數據元素需要向前搬移。換句話説,圓形緩衝區適合實現先進先出緩衝區,而非圓形緩衝區適合後進先出緩衝區。
圓形緩衝區適合於事先明確了緩衝區的最大容量的情形。擴展一個圓形緩衝區的容量,需要搬移其中的數據。因此一個緩衝區如果需要經常調整其容量,用鏈表實現更為合適。
寫操作覆蓋圓形緩衝區中未被處理的數據在某些情況下是允許的。特別是在多媒體處理時。例如,音頻的生產者可以覆蓋掉聲卡尚未來得及處理的音頻數據。

環形緩衝器工作過程

一個圓形緩衝區最初為空並有預定的長度。例如,這是一個具有七個元素空間的圓形緩衝區,其中底部的單線與箭頭表示“頭尾相接”形成一個圓形地址空間:
環形緩衝器 環形緩衝器
假定1被寫入緩衝區中部(對於圓形緩衝區來説,最初的寫入位置在哪裏是無關緊要的):
插入一個數據 插入一個數據
再寫入2個元素,分別是2 & 3 — 被追加在1之後:
添加元素 添加元素
如果兩個元素被處理,那麼是緩衝區中最老的兩個元素被移除。在本例中,1 & 2被移除,緩衝區中只剩下3:
除去兩個元素 除去兩個元素
如果緩衝區中有7個元素,則是滿的:
填滿緩衝區 填滿緩衝區
如果緩衝區是滿的,又要寫入新的數據,一種策略是覆蓋掉最老的數據。此例中,2個新數據— A & B — 寫入,覆蓋了3 & 4:
覆蓋數據 覆蓋數據
也可以採取其他策略,禁止覆蓋緩衝區的數據,採取返回一個錯誤碼或者拋出異常。最終,如果從緩衝區中移除2個數據,不是3 & 4 而是 5 & 6 。因為 A & B 已經覆蓋了3 & 4:
移除數據 移除數據

環形緩衝器圓形緩衝區工作機制

由於計算機內存是線性地址空間 [1]  ,因此圓形緩衝區需要特別的設計才可以從邏輯上實現。

環形緩衝器讀指針與寫指針

一般的,圓形緩衝區需要4個指針 [2] 
  • 在內存中實際開始位置;
  • 在內存中實際結束位置,也可以用緩衝區長度代替;
  • 存儲在緩衝區中的有效數據的開始位置(讀指針);
  • 存儲在緩衝區中的有效數據的結尾位置(寫指針)。
讀指針、寫指針可以用整型值來表示。下例為一個未滿的緩衝區的讀寫指針:
未滿緩衝區 未滿緩衝區
下例為一個滿的緩衝區的讀寫指針:
滿的緩衝區 滿的緩衝區

環形緩衝器區分緩衝區滿或者空

緩衝區是滿、或是空,都有可能出現讀指針與寫指針指向同一位置:
指針在同一位置 指針在同一位置
有多種策略用於檢測緩衝區是滿、或是空.

環形緩衝器總是保持一個存儲單元為空

緩衝區中總是有一個存儲單元保持未使用狀態。緩衝區最多存入
個數據。如果讀寫指針指向同一位置,則緩衝區為空。如果寫指針位於讀指針的相鄰後一個位置,則緩衝區為滿。這種策略的優點是簡單、魯棒;缺點是語義上實際可存數據量與緩衝區容量不一致,測試緩衝區是否滿需要做取餘數計算。

環形緩衝器使用數據計數

這種策略不使用顯式的寫指針,而是保持着緩衝區內存儲的數據的計數。因此測試緩衝區是空是滿非常簡單;對性能影響可以忽略。缺點是讀寫操作都需要修改這個存儲數據計數,對於多線程訪問緩衝區需要併發控制。

環形緩衝器鏡像指示位

緩衝區的長度如果是n,邏輯地址空間則為
;那麼,規定
為鏡像邏輯地址空間。本策略規定讀寫指針的地址空間為
,其中低半部分對應於常規的邏輯地址空間,高半部分對應於鏡像邏輯地址空間。當指針值大於等於
鏡像指示位 鏡像指示位
時,使其折返(wrapped)到。使用一位表示寫指針或讀指針是否進入了虛擬的鏡像存儲區:置位表示進入,不置位表示沒進入還在基本存儲區。
在讀寫指針的值相同情況下,如果二者的指示位相同,説明緩衝區為空;如果二者的指示位不同,説明緩衝區為滿。這種方法優點是測試緩衝區滿/空很簡單;不需要做取餘數操作;讀寫線程可以分別設計專用算法策略,能實現精緻的併發控制。 缺點是讀寫指針各需要額外的一位作為指示位。
如果緩衝區長度是2的冪,則本方法可以省略鏡像指示位。如果讀寫指針的值相等,則緩衝區為空;如果讀寫指針相差n,則緩衝區為滿,這可以用條件表達式(寫指針 == (讀指針 異或 緩衝區長度))來判斷。
// This approach adds one bit to end and start pointers
// Circular buffer object
typedef struct {
    int size; // maximum number of elements
    int start; // index of oldest element
    int end; // index at which to write new element
    ElemType *elems; // vector of elements
} CircularBuffer;

void cbInit(CircularBuffer *cb, int size) {
    cb->size = size;
    cb->start = 0;
    cb->end = 0;
    cb->elems = (ElemType *)calloc(cb->size, sizeof(ElemType));
}

void cbPrint(CircularBuffer *cb) {
    printf("size = 0x%x, start = %d, end = %d\n", cb->size, cb->start, cb->end);
}

int cbIsFull(CircularBuffer *cb) {
// This inverts the most significant bit of start before comparison
    return cb->end == (cb->start ^ cb->size);
int cbIsEmpty(CircularBuffer *cb) {
    return cb->end == cb->start;
}

int cbIncr(CircularBuffer *cb, int p) {
// start and end pointers incrementation is done modulo 2*size

    return (p + 1) & (2 * cb->size - 1); 

void cbWrite(CircularBuffer *cb, ElemType *elem) {
    cb->elems[cb->end & (cb->size - 1)] = *elem;
    if (cbIsFull(cb)) // full, overwrite moves start pointer
        cb->start = cbIncr(cb, cb->start);
    cb->end = cbIncr(cb, cb->end);
}

void cbRead(CircularBuffer *cb, ElemType *elem) {
    *elem = cb->elems[cb->start & (cb->size - 1)];
    cb->start = cbIncr(cb, cb->start);
}

環形緩衝器讀寫計數

用兩個有符號整型變量分別保存寫入、讀出緩衝區的數據數量。其差值就是緩衝區中尚未被處理的有效數據的數量。這種方法的優點是讀線程、寫線程互不干擾;缺點是需要額外兩個變量。

環形緩衝器記錄最後的操作

使用一位記錄最後一次操作是讀還是寫。讀寫指針值相等情況下,如果最後一次操作為寫入,那麼緩衝區是滿的;如果最後一次操作為讀出,那麼緩衝區是空。 這種策略的缺點是讀寫操作共享一個標誌位,多線程時需要併發控制

環形緩衝器POSIX優化實現

#include <sys/mman.h>
#include <stdlib.h>
#include <unistd.h>

#define report_exceptional_condition() abort ()

struct ring_buffer {
    void *address;
    unsigned long count_bytes;
    unsigned long write_offset_bytes;
    unsigned long read_offset_bytes;
};

// Warning order should be at least 12 for Linux
void ring_buffer_create (struct ring_buffer *buffer, unsigned long order) {
    char path[] = "/dev/shm/ring-buffer-XXXXXX";
    int file_descriptor;
    void *address;
    int status;
    file_descriptor = mkstemp(path);
    if (file_descriptor < 0)
        report_exceptional_condition();
    status = unlink(path);
    if (status)
        report_exceptional_condition();
    buffer->count_bytes = 1UL << order;
    buffer->write_offset_bytes = 0;
    buffer->read_offset_bytes = 0;
    status = ftruncate(file_descriptor, buffer->count_bytes);
    if (status)
        report_exceptional_condition();
    buffer->address = mmap (NULL, buffer->count_bytes << 1, PROT_NONE,
                            MAP_ANONYMOUS|MAP_PRIVATE, -1, 0);
    if (buffer->address == MAP_FAILED)
        report_exceptional_condition();
    address =
        mmap(buffer->address, buffer->count_bytes, PROT_READ | PROT_WRITE,
             MAP_FIXED | MAP_SHARED, file_descriptor, 0);
    if (address != buffer->address)
        report_exceptional_condition();
    address = mmap(buffer->address + buffer->count_bytes,
                   buffer->count_bytes, PROT_READ | PROT_WRITE,
                   MAP_FIXED | MAP_SHARED, file_descriptor, 0);
    if (address != buffer->address + buffer->count_bytes)
        report_exceptional_condition();
    status = close(file_descriptor);
    if (status)
        report_exceptional_condition();
}

void ring_buffer_free(struct ring_buffer *buffer) {
    int status;
    status = munmap(buffer->address, buffer->count_bytes << 1);
    if (status)
        report_exceptional_condition ();
}

void *ring_buffer_write_address(struct ring_buffer *buffer) {
    // void pointer arithmetic is a constraint violation.
    return buffer->address + buffer->write_offset_bytes;
}

void ring_buffer_write_advance(struct ring_buffer *buffer, unsigned long count_bytes) {
    buffer->write_offset_bytes += count_bytes;
}

void *ring_buffer_read_address(struct ring_buffer *buffer) {
    return buffer->address + buffer->read_offset_bytes;
}

void ring_buffer_read_advance(struct ring_buffer *buffer, unsigned long count_bytes) {
    buffer->read_offset_bytes += count_bytes;
    if (buffer->read_offset_bytes >= buffer->count_bytes) {
        //如果讀指針大於等於緩衝區長度,那些讀寫指針同時折返回[0, buffer_size]範圍內
        buffer->read_offset_bytes -= buffer->count_bytes;
        buffer->write_offset_bytes -= buffer->count_bytes;
    }
}

unsigned long ring_buffer_count_bytes(struct ring_buffer *buffer) {
    return buffer->write_offset_bytes - buffer->read_offset_bytes;
}

unsigned long ring_buffer_count_free_bytes(struct ring_buffer *buffer) {
    return buffer->count_bytes - ring_buffer_count_bytes (buffer);
}

void ring_buffer_clear(struct ring_buffer *buffer) {
    buffer->write_offset_bytes = 0;
    buffer->read_offset_bytes = 0;
}
/*  Note, that initial anonymous mmap() can be avoided - after initial mmap() 
        for descriptor fd, you can try mmap() with hinted address as (buffer->address 
        + buffer->count_bytes) and if it fails - another one with hinted address as 
        (buffer->address - buffer->count_bytes). Make sure MAP_FIXED is not used in 
        such case, as under certain situations it could end with segfault. The 
        advantage of such approach is, that it avoids requirement to map twice the 
        amount you need initially (especially useful e.g. if you want to use 
        hugetlbfs and the allowed amount is limited) and in context of gcc/glibc 
        - you can avoid certain feature macros (MAP_ANONYMOUS usually requires one 
        of: _BSD_SOURCE, _SVID_SOURCE or _GNU_SOURCE). */



環形緩衝器Linux內核的kfifo

Linux內核文件kfifo.h和kfifo.c中,定義了一個先進先出圓形緩衝區實現。如果只有一個讀線程、一個寫線程,二者沒有共享的被修改的控制變量,那麼可以證明這種情況下不需要併發控制。kfifo就滿足上述條件。kfifo要求緩衝區長度必須為2的冪。讀、寫指針分別是無符號整型變量。把讀寫指針變換為緩衝區內的索引值,僅需要“按位與”操作:(指針值 按位與 (緩衝區長度-1))。這避免了計算代價高昂的“求餘”操作。且下述關係總是成立:讀指針 + 緩衝區存儲的數據長度 == 寫指針
即使在寫指針達到了無符號整型的上界,上溢出後寫指針的值小於讀指針的值,上述關係仍然保持成立(這是因為無符號整型加法的性質)。 kfifo的寫操作,首先計算緩衝區中當前可寫入存儲空間的數據長度:len = min{待寫入數據長度, 緩衝區長度 - (寫指針 - 讀指針)}然後,分兩段寫入數據。第一段是從寫指針開始向緩衝區末尾方向;第二段是從緩衝區起始處寫入餘下的可寫入數據,這部分可能數據長度為0即並無實際數據寫入。
參考資料