Redis (Remote Dictionary Service) — это база данных с открытым исходным кодом, которая, в отличие от множества других СУБД, хранит данные в оперативной памяти в формате «ключ — значение».
Размещение данных в оперативной памяти ускоряет работу БД, отчего Redis используют для хранения информации с ограниченным сроком жизни (потому что к такой информации, как правило, требуется частый доступ): кэша backend-приложений, переменных игрового сервера или очереди сообщений связанных между собой сервисов.
В этой статье мы поговорим о последнем варианте использования Redis — создании и обработке очереди сообщений с помощью брокера сообщений.
Система с подобным функционалом работает примерно так:
- Некий сервис отправляет сообщение (или несколько сообщений) брокеру, который в свою очередь хранит его в оперативной памяти с помощью базы данных Redis, тем самым формируя очередь сообщений.
- Другие сервисы по необходимости извлекают сообщения из очереди и выполняют их обработку.
При этом каждый сервис равноправен по отношению к другим — может как отправлять сообщения, так и принимать.
Таким образом брокер сообщений (который является сервисом сам по себе) становится своего рода посредником (медиатором) в общении между несколькими сервисами.
Для реализации брокера сообщений в Redis есть набор встроенных методов со своими особенностями:
- «Pub/Sub» (Публикация/Подписка). Некий сервис отправляет в очередь свое сообщение, после чего его могут обработать другие сервисы, заранее подписавшиеся на эту очередь сообщений. Если на очередь никто не подписан, то отправленное сервисом сообщение не сохранится и будет утеряно.
- «List» (Список). Очередь, работающая по принципу FIFO (первым пришел — первым ушел). Некий сервис отправляет в очередь свое сообщение, после его может обработать только один сервис, заранее подписавшийся на эту очередь сообщений. Если на очередь никто не подписан, тоотправленное сервисом сообщение аналогично не сохранится и будет утеряно.
- «Stream» (Поток). Метод похож на Pub/Sub, однако дает гарантию на доставку сообщения какому-либо сервису. То есть если на очередь никто не подписан, то отправленное сервисом сообщение останется в очереди до тех пор, пока его не прочитает другой сервис.
Перечисленные методы — низкоуровневые функции обработки ввода и вывода данных в Redis. То есть, они могут применяться для реализации логики самых разных механик, одной из которых является брокер сообщений.
В этой статье мы рассмотрим реализацию очереди сообщений в Redis с использованием каждого из перечисленных методов на основе облачной базы данных Timeweb Cloud.
Управление базой данных будет выполняться с помощью скриптов на языке программирования Python версии 3.10.12, интерпретатор которого будет запущен на операционной системе Ubuntu 22.04.
Подготовка базы данных Redis в Timeweb Cloud
Сначала нужно авторизоваться в панели управления Timeweb Cloud, после чего перейти на страницу «Базы данных» через боковое меню.
На открывшейся странице может быть одна из следующих кнопок:
- Если созданных баз данных еще нет, то отображается кнопка «Создать»
- Если созданные базы данных уже существуют, то отображается кнопка «Добавить»
Необходимо нажать на любую из них, после чего будет выполнен переход на страницу конфигурации базы данных.
Самый главный параметр — тип базы данных. Необходимо выбрать «Redis» версии 6. Все остальные настройки можно оставить как есть — по крайней мере, в рамках этого руководства.
После завершения конфигурации жмем кнопку «Заказать». Откроется страница управления базой данных. Справа внизу будет блок с информацией для удаленного подключения к БД по SSH-соединению.
Подготовка системы
Обновление системы
Перед переходом к созданию приложения на Python важно обновить репозитории и пакеты операционной системы:
sudo apt update
sudo apt upgrade -y
Установка Python
В системе должен быть установлен Python. Проверить его наличие можно запросив версию интерпретатора:
python --version
Если Python уже есть внутри системы, в консоли появится номер версии:
Python 3.10.12
В случае, если в системе Python отсутствует, его следует установить вручную через менеджер пакетов APT:
sudo apt install -y python3
Установка виртуальной среды Python
Для работы приложения нужна виртуальная среда Python. Она тоже устанавливается вручную:
sudo apt install python3-venv -y
Создание рабочего каталога
Для примеров из этого руководства мы создадим отдельный каталог:
mkdir queue
После этого перейдем в него:
cd queue
Активация виртуальной среды
Внутри ранее созданного каталога необходимо активировать виртуальную среду Python:
python -m venv venv
Если по итогу выполнения команды в консольном терминале не появилось ни одного сообщения, то виртуальная среда была успешно создана.
Можно проверить состояние файловой системы:
ls
В рабочем каталоге появится специальная директория виртуальной среды:
venv
После создания виртуальной среды закономерно выполняется ее активация — в противном случае установленные впоследствии зависимости не будут работать при выполнении приложения, вызывая ошибку.
Виртуальная среда активируется через запуск соответствующего скрипта:
source ./venv/bin/activate
Установка пакетного менеджера Pip
Для работы с Redis необходим соответствующий модуль, который отвечает за подключение и управление сервером Redis.
Чтобы установить этот модуль, сперва необходимо установить пакетный менеджер, через который модуль будет загружен. В данном случае этот pip:
sudo apt install python3-pip -y
Убедиться в корректности установки пакетного менеджера можно через запрос его версии:
pip -V
В консоли появится вывод с версией pip:
pip 22.0.2 from /root/queue/venv/lib/python3.10/site-packages/pip (python 3.10)
Как видно, в этом руководстве используется pip версии 22.0.2.
Установка модуля Redis
Теперь можно установить сам модуль для управления сервером Redis:
pip install redis
Написание приложения Python
Очередь с помощью Pub/Sub
В рабочем каталоге создадим скрипт, который будет обрабатывать очередь сообщений:
sudo nano pubsub_consumer.py
Код скрипта будет таким:
import redis # важно подключить ранее установленный с помощью Pip модуль для управления сервером базы данных Redis
import time # модуль времени помогает в создании циклов обработки очереди сообщений
# подключаемся к серверу Redis
connection = redis.Redis(
host="IP", # нужно указать IP-адрес сервера Redis
password="PASSWORD", нужно указать # root-пароль сервера Redis
port=6379, # стандартный порт сервера Redis
db=0,
decode_responses=True # этот параметр необходим, чтобы ответы сервера Redis автоматически преобразовывались в человеко-читаемый вид
)
queue = connection.pubsub() # создаем очередь типа Pub/Sub
queue.subscribe("first_channel", "second_channel") # активируем прослушивание каналов с перечисленными именами
# бесконечный цикл, который последовательно обрабатывает все приходящие сообщения
while True:
time.sleep(0.01) # небольшой интервал обработки
msg = queue.get_message() # достаем сообщение
if msg: # если сообщение имеет содержимое
if not isinstance(msg["data"], int): # проверяем, чтобы полученные данные не имели тип числа перед выводом в консоль
print(msg["data"])
Теперь создадим скрипт, отправляющий последовательность сообщений в очередь:
sudo nano pubsub_producer.py
Код скрипта будет следующим:
import redis
# подключение к серверу Redis
connection = redis.Redis(
host="IP", # нужно указать IP-адрес сервера Redis
password="PASSWORD", # нужно указать root-пароль сервера Redis
port=6379,
db=0,
decode_responses=True
)
connection.publish('first_channel', 'Данные в первый канал') # это сообщение помещается в первый канал
connection.publish('second_channel', 'Данные во второй канал') # это сообщение помещается во второй канал
Сначала нужно запустить обработчик очереди сообщений в уже открытой консоли:
python pubsub_consumer.py
А после необходимо открыть второй консольный терминал и перейти в ранее созданный каталог проекта:
cd queue
Снова выполняем активацию среды:
source ./venv/bin/activate
После чего запускаем отправителя сообщений:
python pubsub_producer.py
В первом терминале появится такой вывод:
Данные в первый канал
Данные во второй канал
Очередь с помощью List
Аналогичную по функционалу очередь можно реализовать с помощью List.
Давайте сперва создадим скрипт обработчика сообщений:
sudo nano list_consumer.py
Его код будет таким:
import redis
import random
import time
# подключение к серверу Redis
connection = redis.Redis(
host="IP", # нужно указать IP-адрес сервера Redis
password="PASSWORD", # нужно указать root-пароль сервера Redis
port=6379,
db=0,
decode_responses=True
)
# бесконечный цикл, который последовательно обрабатывает все приходящие сообщения
while True:
time.sleep(0.01) # небольшой интервал обработки
if connection.llen("listQueue") != 0: # если длина очередь сообщений больше нуля
msg = connection.rpop("listQueue") # извлекаем новое сообщение
if msg: # если сообщение содержит данные
print(msg) # выводим сообщение в консоль
Теперь реализуем файл отправителя:
sudo nano list_producer.py
Содержимое будет такое:
import redis
import random
# подключение к серверу Redis
connection = redis.Redis(
host="IP", # нужно указать IP-адрес сервера Redis
password="PASSWORD", # нужно указать root-пароль сервера Redis
port=6379,
db=0,
decode_responses=True
)
# отправка 3-х сообщений
for i in range(0,3):
connection.lpush("listQueue", "Сообщение №" + str(random.randint(0, 100)))
Сперва запустим скрипт отправителя сообщений:
python list_producer.py
А уже после после этого выполним обработку очереди сообщений другим скриптом:
python list_consumer.py
Консольный вывод будет примерно следующим:
Сообщение №12
Сообщение №45
Сообщение №86
Очередь с помощью Stream
Несколько отличную логику очереди сообщений можно реализовать с помощью так называемых «Потоков».
Потоки управляются следующими командами:
- XADD. Позволяет добавить новую запись в поток.
- XREAD. Позволяет прочитать записи из поток.
- XRANGE. Позволяет извлечь несколько записей.
- XLEN. Позволяет узнать длину потока.
Аналогично предыдущим примерам, создадим скрипт отправителя сообщений:
sudo nano stream_producer.py
Код внутри следующий:
import redis
import random
# подключение к серверу Redis
connection = redis.Redis(
host="IP", # нужно указать IP-адрес сервера Redis
password="PASSWORD", # нужно указать root-пароль сервера Redis
port=6379,
db=0,
decode_responses=True
)
# помещаем в поток 3 сообщения
for i in range(0,3):
connection.xadd("stream_queue", { "data":"Сообщение №" + str(random.randint(0, 100))})
print("Длина очереди: " + str(connection.xlen("stream_queue"))) # выводим в консоль длину потока
Теперь реализуем скрипт обработчика сообщений:
sudo nano stream_consumer.py
Код внутри такой:
import redis
import random
# подключение к серверу Redis
connection = redis.Redis(
host="IP", # нужно указать IP-адрес сервера Redis
password="PASSWORD", # нужно указать root-пароль сервера Redis
port=6379,
db=0,
decode_responses=True
)
len = connection.xlen("stream_queue") # узнаем длину потока
if len > 0:
messages = connection.xread(count=len, streams={"stream_queue":0}) # получаем весь список сообщений в потоке
# перебираем список сообщений
for msg in messages:
print(msg) # выводим сообщение в консоль
Сперва запускаем скрипт отправителя сообщений:
python stream_producer.py
В консоли должен появиться соответствующий вывод, сообщающий длину потока:
Длина очереди: 3
Далее запускаем скрипт обработчика сообщений:
python stream_consumer.py
Консольный вывод будет примерно таким:
[stream_queue, [('1711712995031-0', {'data': 'Сообщение №74'}), ('1711712995033-0', {'data': 'Сообщение №54'})]]
Можно заметить, что сообщения внутри потоков обладают уникальным идентификатором, который автоматически присваивает сам Redis.
Проблема приведенного примера в том, что при каждом чтении потока извлекаются одни и те же сообщения.
Поэтому код чтения можно доработать, читая только новые сообщения внутри потока:
import redis
import random
# подключение к удаленному серверу Redis
connection = redis.Redis(
host="IP", # нужно указать IP-адрес сервера Redis
password="PASSWORD", # нужно указать root-пароль сервера Redis
port=6379,
db=0,
decode_responses=True
)
# потребуется переменная внутри базы данных Redis, которая будет содержать идентификатор последнего сообщения в качестве значения
if connection.get("last") == None: # проверяем существование этой переменной на тот случай, если код обработчика запускается повторно
connection.set("last", 0) # по умолчанию значением будет 0
len = connection.xlen("stream_queue") # получаем длину потока
if len > 0: # если внутри потока есть сообщения
messages = connection.xread(count=len, block=1000, streams={"stream_queue":connection.get("last")}) # в момент чтения передаем ID последнего сообщения в качестве аргумента (либо 0)
print(connection.get("last")) # для наглядности показываем в консоли идентификатор последнего сообщения или 0, если его еще нет
# обрабатываем все новые сообщения
for msg in messages:
print(msg) # показываем данные сообщения в терминале
connection.set("last", msg[-1][-1][0]) # обновляем идентификатор последнего обработанного сообщения
Благодаря обновленному коду повторный запуск этого скрипта будет обрабатывать только новые сообщения.
Заключение
Использование базы данных Redis в качестве брокера сообщений возможно благодаря трем вариантам создания очереди сообщений:
- Pub/Sub (Публикация/Подписка)
- List (Список)
- Stream (Поток)
Все эти способы основаны на встроенных функциях управления данными — с каждой можно ознакомиться подробнее в официальной документации Redis:
В этом руководстве были рассмотрены лишь базовые реализации брокера сообщений с использованием этих функций. В реальных проектах их необходимо доработать под конкретную реализацию серверной инфраструктуры.