Найти тему
Типичный программист

Как настроить многопоточную обработку сообщений в С++

Оглавление

Одним из методов высокопроизводительной обработки данных является параллельная обработка в нескольких потоках команд (thread-ах).

Несколько потоков способны обработать данные быстрее, чем одним потоком кратно числу потоков. В С++ потоки реализованы нативно, потому поговорим о реализации частной задаче многопоточной обработки на этом языке.

Алексей Чащегоров, DeutscheBank, старший программист

Паттерн «Очередь сообщений»

Очередь сообщений — это довольно часто используемый в системном дизайне патерн. Суть его в наличии контейнера, реализующего принцип «first in — first out» («FIFO»). Сообщения в очередь помещаются в определенном порядке, а затем могут быть в том же порядке оттуда извлечены.

При этом сообщения из очереди должны извлекаться и обрабатываться. Вопрос, поднятый в данной статье — как это лучше реализовать в быстром языке C++ и почему. Далее —  индивидуальное мнение. Комментарии могут дополнить материал.

Периодическая обработка

Самым простым вариантом обработки будет работа одного потока, извлекающего сообщения и осуществляющего обработку в своем теле. Выглядеть код такого обработчика может примерно так:

#include ‹custom_queue.h> // выдуманный хедер
#include ‹thread.h>
#include ‹chrono.h>
void process(Message&& /*msg*/) {
// ваша обработка
}

int main() {
CustomQueue queue(“connector_id”); // подключение к очереди
while(true) {
while (!queue.empty()) {
process(std::move(queue.top()));
queue.pop();
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return 0;
};

Данный код будет просто выгребать сообщения из очереди с периодичностью в 1 секунду. Минус тут очевиден — если требуется меньшая, чем время обработки сообщения, задержка загрузки сообщений из очереди, то такая реализация не подойдет. В системах с низкой задержкой, вроде высокочастотной торговли или в играх, подойдет другая реализация.

Система обработки, основанная на событиях

Суть приводимого далее кода в обработке сообщений по мере их поступления, а не в результате периодического действия. Далее приведен код.

#include ‹custom_queue.h>
#include ‹atomic>
#include ‹csignal>
#include ‹functional>
#include ‹thread.h>

void processQueue(CustomQueue& queue, std::atomic‹bool>& finishing) {
auto msg_max_timeout = std::chrono::seconds(1);
while(!finishing) {
while (auto msg = queue.top( msg_max_timeout ) ) { if (msg) {
// ваша обработка
queue.pop();
}
}
}
}

static std::atomic‹bool> finishing(false);

void signal_handler(int signal) { // обработчик нажатия “CTRL+C”
finishing = true;
}

int main() {
std::signal(SIGINT, signal_handler);

CustomQueue queue(“connector_id”);
std::vector‹std::thread>> workers(
std::bind(processQueue(queue, finishing)),
10 /* число потоков обработки */
);
for(auto& worker : workers) {
worker.join(); // легальное завершение всех потоков
}
return 0;
}

В данном примере очередь блокируется на «queue.top» до прихода хоть какого-то сообщения, но не более задержки msg_max_timeout. В случае прихода сообщения, один из потоков обработки получит сообщение. Прочие останутся в заблокированном состоянии на строке с «queue.top».

Если в течение времени обработки сообщения одним потоком придет другое сообщение, то следующий поток начинает его обработку.

В данном примере дополнительные требования предъявляются к конструкции очереди:

  • она должна быть потокобезопасна
  • должна реализовывать блокировку группы потоков в своем коде до наступления события или на ограниченное время (нужно для легального завершения программы)

Рассмотрим, как может выглядеть реализация такой очереди:

template ‹class Message>
class CustomQueue {
private:
std::list‹Message> messages;
std::mutex mtx;
std::condition_variable cond;
void push(Message&& msg) {
{
std::unique_lock‹std::mutex> lock(mtx);
messages.emplace_front(msg);
}
cond.notify_one();
}
public:
Message& top(std::chrono::duration timeout=std::chrono::seconds(0)) {
std::unique_lock‹std::mutex> lock(mtx);
auto now = std::chrono::system_clock::now();
while(cond.wait_for(lock, timeout,
[&now, &timeout,](){
return (now + timeout) >
std::chrono::system_clock::now() &&
messages.empty();
}
) // ждем события или таймаута - проверяем spurious wakekup predicate
{
}
return messages.empty() ? Message::default : messages.front();
}
void pop() {
std::unique_lock‹std::mutex> lock(mtx);
if (!messages.empty()) {
messages.pop_back();
}
}
};

Такая очередь будет потокобезопасной, так как внутренний контейнер с сообщениями защищен мьютексом. При вызове метода push (добавление в очередь) произойдет пробуждение лишь одного потока из ожидающих на conditional variable.

Кроме этого, код учитывает ситуацию spurious wakeup (неожиданное пробуждение): сonditional variable::wait может разблокировать поток команд внезапно.

Для контроля в последний аргумент вызова добавлен предикат. Он проверяет наличие сообщений в очереди или превышение таймаута периодически.

Заключение

Если вы работаете с многоядерным сервером, вам наверняка пригодится этот второй метод многопоточной обработки очереди.

Данное улучшение не единственное и не конечное для высокопроизводительной обработки сообщений. Оно показывает возможности многопоточной обработки, и каждый разработчик вправе добавить что-то по своему вкусу.

Наука
7 млн интересуются