Trang

Thursday, April 3, 2014

Cài đặt Blocking Queue với C++11

Giới thiệu:
Blocking Queue là một Queue, nhưng có thể sử dụng bởi nhiều luồng (thread) khác nhau. Khi Blocking Queue là rỗng, một thread muốn lấy (pop) dữ liệu từ nó phải bị block cho đến có một thread khác đưa dữ liệu (push) vào. Khi Queue đầy, một thread muốn đưa dữ liệu vào cũng phải bị block cho đến khi một thread khác lấy dữ liệu ra khỏi nó. (Tham khảo : http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html)

Blocking Queue là kiểu dữ liệu rất thích hợp để giải quyết bài toán kinh điển Producer-Consumer. Tuy nhiên ở C 11, queue có sẵn của nó không phải là blocking queue cho nên ta sẽ phải tự cài đặt. Ở đây ta sẽ cài đặt queue có sức chứa vô hạn (miễn là bộ nhớ còn đáp ứng) nên chỉ cần phải chờ đợi khi queue rỗng.

Prototype
class blocking_queue về cơ bản sẽ như sau:


template >
class blocking_queue {
typedef typename _Sequence::value_type value_type;

// Yêu cầi value_type phải cùng kiểu với _Tp 
static_assert(std::is_same<_Sequence_value_type, _Tp>::value, 
"require _Sequence::value_type is the same as _Tp");
public:
typedef typename _Sequence::reference reference;
typedef typename _Sequence::const_reference const_reference;
typedef typename _Sequence::size_type size_type;
typedef _Sequence container_type;

private:
_Sequence c;

public:
blocking_queue() = default; // phương thức khởi tạo mặc định
blocking_queue(const _Sequence& __c) = delete; // Không sử dụng phương thức khởi tạo sao chép
blocking_queue& operator=(blocking_queue const&) = delete; // không sử dụng toán tử gán

bool empty() const; // Kiểm tra queue rỗng hay không
size_type size() const; // Kích thước hiện tại

void push(const_reference __x); // Thêm x vào cuối queue
void push(value_type&& __x); // Thêm x vào cuối queue
template void emplace(_Args&&... __args); // Thêm phần tử vào cuối queue, khởi tạo ngay tại chỗ

value_type take(reference __x); // Trả về giá trị phần tử đầu tiên đồng thời xoá nó ra khỏi queue

void clear(); // xoá hết tất cả các phần tử
};
Về cơ bản thì class blocking_queue giống với std::queue, nhưng khác ở chỗ: thay vì có phương thức pop() (xoá phần tử đầu) và front() (lấy giá trị phần tử đầu) như ở queue thì ở blocking_queue ta dùng phương thức take() để làm cả hai việc trên.

Như vậy ở std::queue, để lấy một phần tử và xoá nó ra khỏi queue thì ta làm như sau:

auto x = Queue.front();
Queue.pop();
Còn ở blocking_queue:

auto x = Queue.take();

Lý do của sự khác nhau này là:
- std::queue được thiết kể để sự dụng bởi 1 thread, nên sau khi 1 thread lấy giá trị các phần tử đầu/cuối, nếu thread đó không thay đổi queue thì các giá trị đầu/cuối queue vẫn chính là các giá trị mà ta đã lấy ra.
- với blocking_queue, nếu như ta lấy và xoá phần tử đầu theo cách của std::queue: gọi 2 phương thức front(), pop() riêng biệt ở thread A thì có thể xảy ra trường hợp: ở thread B ta cũng gọi front() vào pop(), trong đó front() của B xảy ra trước pop() của A. Khi đó, 2 thread A và B cùng lấy được 1 giá trị nhưng Queue thì bị xoá đi 2 phần tử.

http://diendan.congdongcviet.com/attachment.php?attachmentid=9681&stc=1&d=1340431301

ngoài ra còn có nhiều tình huống nữa có thể dẫn đến kết quá không như ý muốn.
Để giải quyết vấn đề trên, ta phải đưa hai thao tác này vào một phương thức và áp dụng các biện pháp điều độ vào phương thức này.

Cài đặt
Để đảm bảo chỉ có duy nhất 1 thread được truy cập vào blocking_queue trong 1 thời điểm, ta sử dụng mutex


...
class blocking_queue {

private:
std::mutex _mutex;
...
};


- Các phương thức size(), empty(), clear():

...
classs blocking_queue {
...
private:
_Sequence c;
std::mutex _mutex;
public:
size_type size() const {
std::unique_lock lock(_mutex);
return c.size();
}

bool empty() const {
std::unique_lock lock(_mutex);
return c.empty();
}

void clear() {
std::unique_lock lock(_mutex);
c.clear();
}
...
};

std::unique_lock là một wrapper của std::mutex, constructor nhận đối số là một đối tượng kiểu std::mutex và tự động lock đối tượng std::mutex đó bằng phương thức lock(). Destructor gọi đến phương thức unlock() của mutex. Nhờ đó, khi thoát khỏi các phương thức size(), empty(), _mutex sẽ được tự động mở khoá.

- phương thức push(), emplace(), take():

class blocking_queue {
...
private:
std::condition_variable _cond;
public:
void push(const value_type& __x) { 
std::unique_lock lock(_mutex);
c.push_back(__x); 
lock.unlock();
_cond.notify_one();
}

void push(value_type&& __x) { 
std::unique_lock lock(_mutex);
c.push_back(std::move(__x));
lock.unlock();
_cond.notify_one();
}

template
void emplace(_Args&&... __args)

std::unique_lock lock(_mutex);
c.emplace_back(std::forward<_Args>(__args)...); 
_cond.notify_one();
}

value_type take() {
std::unique_lock lock(_mutex);
_cond.wait(lock);
value_type x = std::move(c.front());
c.pop_front();
return x;
}
};


Ở đây, ta sử dụng condition_variable (biến điều kiện) để cài đặt việc chờ đợi (ở phương thức take()) và thông báo (ở phương thức push(), emplace()).
- Ở phương thức take(), câu lệnh
_cond.wait(lock);

mở khoá lock (tức là mở khoá _mutex) và chờ đợi cho đến khi một thread khác gọi notify_one() hoặc notify_all() đánh thức. Sau đó, lock lại bị khoá và thread thực hiện các câu lệnh tiếp theo

- Ở phương thức push(), emplace(), câu lệnh:
_cond.notify_one();
được thực hiện sau khi đưa một phần tử vào cuối Queue (Queue lúc này chắc chắn không rỗng), có tác dụng thông báo (đánh thức) cho 1 trong các thread đang chờ (nếu có).

Mã hoàn chỉnh:

// ================================================== ===================================
// 
// Filename: blocking_queue.hh
// 
// Description: 
// 
// Version: 1.0
// Created: 06/19/2012 08:03:03 PM
// Revision: none
// Compiler: g 
// 
// Author: BOSS14420, boss14420[at]gmail[dot]com
// Company: 
// 
// ================================================== ===================================

#include 
#include 
#include 
#include 
#include 

template >
class blocking_queue
{
typedef typename _Sequence::value_type _Sequence_value_type;

static_assert(std::is_same<_Sequence_value_type, _Tp>::value, 
"require _Sequence::value_type is the same as _Tp");

private:
_Sequence c;
std::mutex _mutex;
std::condition_variable _cond;

public:
typedef typename _Sequence::value_type value_type;
typedef typename _Sequence::reference reference;
typedef typename _Sequence::const_reference const_reference;
typedef typename _Sequence::size_type size_type;
typedef _Sequence container_type;

public:
blocking_queue() = default;
blocking_queue(const _Sequence& __c) = delete;
blocking_queue& operator=(blocking_queue const&) = delete;

void push(const value_type& __x) { 
std::unique_lock lock(_mutex);
c.push_back(__x); 
lock.unlock();
_cond.notify_one();
}

void push(value_type&& __x) { 
std::unique_lock lock(_mutex);
c.push_back(std::move(__x));
lock.unlock();
_cond.notify_one();
}

template
void emplace(_Args&&... __args)

std::unique_lock lock(_mutex);
c.emplace_back(std::forward<_Args>(__args)...); 
_cond.notify_one();
}

value_type take() {
std::unique_lock lock(_mutex);
_cond.wait(lock);
value_type x = std::move(c.front());
c.pop_front();
return x;
}

void clear() {
std::unique_lock lock(_mutex);
c.clear();
}
};

Lưu ý:

Ở các phương thức push(), emplace(), thay vì chỉ đánh thức 1 thread, có thể đánh thức tất cả các thread đang chờ đợi bằng phương thức notify_all();
_cond.notify_all(); 
Khi đó, phải sửa đoạn mã chờ đợi ở phương thức take() thành:
_cond.wait(lock, [&] () { return !c.empty(); } );
// hoặc:
while(c.empty()) _cond.wait(lock);
Nếu thời gian block một thread là nhỏ so với thời gian context-switch của hệ điều hành thì sử dụng blocking-queue là không hiệu quả. Trong trường hợp này ta có thể thay thế mutex bằng spin-lock.


Giải bài toàn Producer - Consumer bằng Blocking Queue:
Ở đây, ta có 2 Producer, một "sản xuất" ra số lẻ, một sản xuất ra số chẵn. Và 3 Consumer nhận các số được sản xuất từ Producer và in ra. 5 thread này chạy cùng lúc. Chương trình kết thúc khi cả 3 Consumer nhận được số -1.

// ================================================== ===================================
// 
// Filename: blocking_queue-example.cc
// 
// Description: 
// 
// Version: 1.0
// Created: 06/22/2012 02:17:13 PM
// Revision: none
// Compiler: g 
// 
// Author: BOSS14420, boss14420[at]gmail[dot]com
// Company: 
// 
// ================================================== ===================================

#include "blocking_queue.hh"
#include 
#include 

blocking_queue bq;
std::random_device rd;

void csm() {
int i = 0;
while(i != -1) {
std::cout << "Thread " << std::this_thread::get_id() 
<< ", consume integer " << (i = bq.take()) << std::endl;
}
std::cout << "Consumer thread " << std::this_thread::get_id()
<< " exit! ";
}

void odd_producer() {
int i = 0, sleep_time;
while(i != 1) {
sleep_time = rd() % 3 1;
i = (rd() % 20) * 2 1;
std::cout << "Producer thread " << std::this_thread::get_id()
<< ", product integer " << i << std::endl;
bq.push(i); // push lvalue
std::this_thread::sleep_for(std::chrono::seconds(s leep_time));
}
std::cout << "Producer thread " << std::this_thread::get_id()
<< " exit! ";
}

void even_producer() {
int i = 0, sleep_time;
while(i != 1) {
sleep_time = rd() % 3 1;
i = (rd() % 20) * 2 1;
std::cout << "Producer thread " << std::this_thread::get_id()
<< ", product integer " << i-1 << std::endl;
bq.push(i-1); // push rvalue reference
std::this_thread::sleep_for(std::chrono::seconds(s leep_time));
}
std::cout << "Producer thread " << std::this_thread::get_id()
<< " exit! ";
}

int main() {
std::thread p1(odd_producer), p2(even_producer);
std::thread c1(csm), c2(csm), c3(csm);

p1.join(), p2.join();
bq.push(-1),bq.push(-1), bq.push(-1);
c1.join(), c2.join(), c3.join();

return 0;
}


Lần đầu gửi bài, nếu có sai sót gì xin được góp ý.

No comments:

Post a Comment