Добавить в корзинуПозвонить
Найти в Дзене
Timeweb Cloud

Очередь сообщений в Redis: как реализовать

Redis (Remote Dictionary Service) — это база данных с открытым исходным кодом, которая, в отличие от множества других СУБД, хранит данные в оперативной памяти в формате «ключ — значение». Размещение данных в оперативной памяти ускоряет работу БД, отчего Redis используют для хранения информации с ограниченным сроком жизни (потому что к такой информации, как правило, требуется частый доступ): кэша backend-приложений, переменных игрового сервера или очереди сообщений связанных между собой сервисов. В этой статье мы поговорим о последнем варианте использования Redis — создании и обработке очереди сообщений с помощью брокера сообщений. Система с подобным функционалом работает примерно так: При этом каждый сервис равноправен по отношению к другим — может как отправлять сообщения, так и принимать. Таким образом брокер сообщений (который является сервисом сам по себе) становится своего рода посредником (медиатором) в общении между несколькими сервисами. Для реализации брокера сообщений в Redis
Оглавление

Redis (Remote Dictionary Service) — это база данных с открытым исходным кодом, которая, в отличие от множества других СУБД, хранит данные в оперативной памяти в формате «ключ — значение».

Размещение данных в оперативной памяти ускоряет работу БД, отчего Redis используют для хранения информации с ограниченным сроком жизни (потому что к такой информации, как правило, требуется частый доступ): кэша backend-приложений, переменных игрового сервера или очереди сообщений связанных между собой сервисов.

В этой статье мы поговорим о последнем варианте использования Redis — создании и обработке очереди сообщений с помощью брокера сообщений.

Система с подобным функционалом работает примерно так:

  • Некий сервис отправляет сообщение (или несколько сообщений) брокеру, который в свою очередь хранит его в оперативной памяти с помощью базы данных Redis, тем самым формируя очередь сообщений.
  • Другие сервисы по необходимости извлекают сообщения из очереди и выполняют их обработку.

При этом каждый сервис равноправен по отношению к другим — может как отправлять сообщения, так и принимать.

Таким образом брокер сообщений (который является сервисом сам по себе) становится своего рода посредником (медиатором) в общении между несколькими сервисами.

Для реализации брокера сообщений в Redis есть набор встроенных методов со своими особенностями:

  • «Pub/Sub» (Публикация/Подписка). Некий сервис отправляет в очередь свое сообщение, после чего его могут обработать другие сервисы, заранее подписавшиеся на эту очередь сообщений. Если на очередь никто не подписан, то отправленное сервисом сообщение не сохранится и будет утеряно.
  • «List» (Список). Очередь, работающая по принципу FIFO (первым пришел — первым ушел). Некий сервис отправляет в очередь свое сообщение, после его может обработать только один сервис, заранее подписавшийся на эту очередь сообщений. Если на очередь никто не подписан, тоотправленное сервисом сообщение аналогично не сохранится и будет утеряно.
  • «Stream» (Поток). Метод похож на Pub/Sub, однако дает гарантию на доставку сообщения какому-либо сервису. То есть если на очередь никто не подписан, то отправленное сервисом сообщение останется в очереди до тех пор, пока его не прочитает другой сервис.

Перечисленные методы — низкоуровневые функции обработки ввода и вывода данных в Redis. То есть, они могут применяться для реализации логики самых разных механик, одной из которых является брокер сообщений.

В этой статье мы рассмотрим реализацию очереди сообщений в Redis с использованием каждого из перечисленных методов на основе облачной базы данных Timeweb Cloud.

Управление базой данных будет выполняться с помощью скриптов на языке программирования Python версии 3.10.12, интерпретатор которого будет запущен на операционной системе Ubuntu 22.04.

Подготовка базы данных Redis в Timeweb Cloud

Сначала нужно авторизоваться в панели управления Timeweb Cloud, после чего перейти на страницу «Базы данных» через боковое меню.

На открывшейся странице может быть одна из следующих кнопок:

  • Если созданных баз данных еще нет, то отображается кнопка «Создать»
  • Если созданные базы данных уже существуют, то отображается кнопка «Добавить»

Необходимо нажать на любую из них, после чего будет выполнен переход на страницу конфигурации базы данных.

Раздел «Базы данных»
Раздел «Базы данных»

Самый главный параметр — тип базы данных. Необходимо выбрать «Redis» версии 6. Все остальные настройки можно оставить как есть — по крайней мере, в рамках этого руководства.

Создание базы данных Redis
Создание базы данных Redis

После завершения конфигурации жмем кнопку «Заказать». Откроется страница управления базой данных. Справа внизу будет блок с информацией для удаленного подключения к БД по SSH-соединению.

Подготовка системы

Обновление системы

Перед переходом к созданию приложения на Python важно обновить репозитории и пакеты операционной системы:

sudo apt update

sudo apt upgrade -y

Установка Python

В системе должен быть установлен Python. Проверить его наличие можно запросив версию интерпретатора:

python --version

Если Python уже есть внутри системы, в консоли появится номер версии:

Python 3.10.12

В случае, если в системе Python отсутствует, его следует установить вручную через менеджер пакетов APT:

sudo apt install -y python3

Установка виртуальной среды Python

Для работы приложения нужна виртуальная среда Python. Она тоже устанавливается вручную:

sudo apt install python3-venv -y

Создание рабочего каталога

Для примеров из этого руководства мы создадим отдельный каталог:

mkdir queue

После этого перейдем в него:

cd queue

Активация виртуальной среды

Внутри ранее созданного каталога необходимо активировать виртуальную среду Python:

python -m venv venv

Если по итогу выполнения команды в консольном терминале не появилось ни одного сообщения, то виртуальная среда была успешно создана.

Можно проверить состояние файловой системы:

ls

В рабочем каталоге появится специальная директория виртуальной среды:

venv

После создания виртуальной среды закономерно выполняется ее активация — в противном случае установленные впоследствии зависимости не будут работать при выполнении приложения, вызывая ошибку.

Виртуальная среда активируется через запуск соответствующего скрипта:

source ./venv/bin/activate

Установка пакетного менеджера Pip

Для работы с Redis необходим соответствующий модуль, который отвечает за подключение и управление сервером Redis.

Чтобы установить этот модуль, сперва необходимо установить пакетный менеджер, через который модуль будет загружен. В данном случае этот pip:

sudo apt install python3-pip -y

Убедиться в корректности установки пакетного менеджера можно через запрос его версии:

pip -V

В консоли появится вывод с версией pip:

pip 22.0.2 from /root/queue/venv/lib/python3.10/site-packages/pip (python 3.10)

Как видно, в этом руководстве используется pip версии 22.0.2.

Установка модуля Redis

Теперь можно установить сам модуль для управления сервером Redis:

pip install redis

Написание приложения Python

Очередь с помощью Pub/Sub

В рабочем каталоге создадим скрипт, который будет обрабатывать очередь сообщений:

sudo nano pubsub_consumer.py

Код скрипта будет таким:

import redis # важно подключить ранее установленный с помощью Pip модуль для управления сервером базы данных Redis

import time # модуль времени помогает в создании циклов обработки очереди сообщений

# подключаемся к серверу Redis

connection = redis.Redis(

host="IP", # нужно указать IP-адрес сервера Redis

password="PASSWORD", нужно указать # root-пароль сервера Redis

port=6379, # стандартный порт сервера Redis

db=0,

decode_responses=True # этот параметр необходим, чтобы ответы сервера Redis автоматически преобразовывались в человеко-читаемый вид

)

queue = connection.pubsub() # создаем очередь типа Pub/Sub

queue.subscribe("first_channel", "second_channel") # активируем прослушивание каналов с перечисленными именами

# бесконечный цикл, который последовательно обрабатывает все приходящие сообщения

while True:

time.sleep(0.01) # небольшой интервал обработки

msg = queue.get_message() # достаем сообщение

if msg: # если сообщение имеет содержимое

if not isinstance(msg["data"], int): # проверяем, чтобы полученные данные не имели тип числа перед выводом в консоль

print(msg["data"])

Теперь создадим скрипт, отправляющий последовательность сообщений в очередь:

sudo nano pubsub_producer.py

Код скрипта будет следующим:

import redis

# подключение к серверу Redis

connection = redis.Redis(

host="IP", # нужно указать IP-адрес сервера Redis

password="PASSWORD", # нужно указать root-пароль сервера Redis

port=6379,

db=0,

decode_responses=True

)

connection.publish('first_channel', 'Данные в первый канал') # это сообщение помещается в первый канал

connection.publish('second_channel', 'Данные во второй канал') # это сообщение помещается во второй канал

Сначала нужно запустить обработчик очереди сообщений в уже открытой консоли:

python pubsub_consumer.py

А после необходимо открыть второй консольный терминал и перейти в ранее созданный каталог проекта:

cd queue

Снова выполняем активацию среды:

source ./venv/bin/activate

После чего запускаем отправителя сообщений:

python pubsub_producer.py

В первом терминале появится такой вывод:

Данные в первый канал

Данные во второй канал

Очередь с помощью List

Аналогичную по функционалу очередь можно реализовать с помощью List.

Давайте сперва создадим скрипт обработчика сообщений:

sudo nano list_consumer.py

Его код будет таким:

import redis

import random

import time

# подключение к серверу Redis

connection = redis.Redis(

host="IP", # нужно указать IP-адрес сервера Redis

password="PASSWORD", # нужно указать root-пароль сервера Redis

port=6379,

db=0,

decode_responses=True

)

# бесконечный цикл, который последовательно обрабатывает все приходящие сообщения

while True:

time.sleep(0.01) # небольшой интервал обработки

if connection.llen("listQueue") != 0: # если длина очередь сообщений больше нуля

msg = connection.rpop("listQueue") # извлекаем новое сообщение

if msg: # если сообщение содержит данные

print(msg) # выводим сообщение в консоль

Теперь реализуем файл отправителя:

sudo nano list_producer.py

Содержимое будет такое:

import redis

import random

# подключение к серверу Redis

connection = redis.Redis(

host="IP", # нужно указать IP-адрес сервера Redis

password="PASSWORD", # нужно указать root-пароль сервера Redis

port=6379,

db=0,

decode_responses=True

)

# отправка 3-х сообщений

for i in range(0,3):

connection.lpush("listQueue", "Сообщение №" + str(random.randint(0, 100)))

Сперва запустим скрипт отправителя сообщений:

python list_producer.py

А уже после после этого выполним обработку очереди сообщений другим скриптом:

python list_consumer.py

Консольный вывод будет примерно следующим:

Сообщение №12

Сообщение №45

Сообщение №86

Очередь с помощью Stream

Несколько отличную логику очереди сообщений можно реализовать с помощью так называемых «Потоков».

Потоки управляются следующими командами:

  • XADD. Позволяет добавить новую запись в поток.
  • XREAD. Позволяет прочитать записи из поток.
  • XRANGE. Позволяет извлечь несколько записей.
  • XLEN. Позволяет узнать длину потока.

Аналогично предыдущим примерам, создадим скрипт отправителя сообщений:

sudo nano stream_producer.py

Код внутри следующий:

import redis

import random

# подключение к серверу Redis

connection = redis.Redis(

host="IP", # нужно указать IP-адрес сервера Redis

password="PASSWORD", # нужно указать root-пароль сервера Redis

port=6379,

db=0,

decode_responses=True

)

# помещаем в поток 3 сообщения

for i in range(0,3):

connection.xadd("stream_queue", { "data":"Сообщение №" + str(random.randint(0, 100))})

print("Длина очереди: " + str(connection.xlen("stream_queue"))) # выводим в консоль длину потока

Теперь реализуем скрипт обработчика сообщений:

sudo nano stream_consumer.py

Код внутри такой:

import redis

import random

# подключение к серверу Redis

connection = redis.Redis(

host="IP", # нужно указать IP-адрес сервера Redis

password="PASSWORD", # нужно указать root-пароль сервера Redis

port=6379,

db=0,

decode_responses=True

)

len = connection.xlen("stream_queue") # узнаем длину потока

if len > 0:

messages = connection.xread(count=len, streams={"stream_queue":0}) # получаем весь список сообщений в потоке

# перебираем список сообщений

for msg in messages:

print(msg) # выводим сообщение в консоль

Сперва запускаем скрипт отправителя сообщений:

python stream_producer.py

В консоли должен появиться соответствующий вывод, сообщающий длину потока:

Длина очереди: 3

Далее запускаем скрипт обработчика сообщений:

python stream_consumer.py

Консольный вывод будет примерно таким:

[stream_queue, [('1711712995031-0', {'data': 'Сообщение №74'}), ('1711712995033-0', {'data': 'Сообщение №54'})]]

Можно заметить, что сообщения внутри потоков обладают уникальным идентификатором, который автоматически присваивает сам Redis.

Проблема приведенного примера в том, что при каждом чтении потока извлекаются одни и те же сообщения.

Поэтому код чтения можно доработать, читая только новые сообщения внутри потока:

import redis

import random

# подключение к удаленному серверу Redis

connection = redis.Redis(

host="IP", # нужно указать IP-адрес сервера Redis

password="PASSWORD", # нужно указать root-пароль сервера Redis

port=6379,

db=0,

decode_responses=True

)

# потребуется переменная внутри базы данных Redis, которая будет содержать идентификатор последнего сообщения в качестве значения

if connection.get("last") == None: # проверяем существование этой переменной на тот случай, если код обработчика запускается повторно

connection.set("last", 0) # по умолчанию значением будет 0

len = connection.xlen("stream_queue") # получаем длину потока

if len > 0: # если внутри потока есть сообщения

messages = connection.xread(count=len, block=1000, streams={"stream_queue":connection.get("last")}) # в момент чтения передаем ID последнего сообщения в качестве аргумента (либо 0)

print(connection.get("last")) # для наглядности показываем в консоли идентификатор последнего сообщения или 0, если его еще нет

# обрабатываем все новые сообщения

for msg in messages:

print(msg) # показываем данные сообщения в терминале

connection.set("last", msg[-1][-1][0]) # обновляем идентификатор последнего обработанного сообщения

Благодаря обновленному коду повторный запуск этого скрипта будет обрабатывать только новые сообщения.

Заключение

Использование базы данных Redis в качестве брокера сообщений возможно благодаря трем вариантам создания очереди сообщений:

  • Pub/Sub (Публикация/Подписка)
  • List (Список)
  • Stream (Поток)

Все эти способы основаны на встроенных функциях управления данными — с каждой можно ознакомиться подробнее в официальной документации Redis:

В этом руководстве были рассмотрены лишь базовые реализации брокера сообщений с использованием этих функций. В реальных проектах их необходимо доработать под конкретную реализацию серверной инфраструктуры.