Найти тему
wcademy

Построение простой очереди сообщений с помощью NSQ

Привет, программист! Добро пожаловать в статью (а может серию статей) посвящённую NSQ. Об этой технологии не так уж много материалов, так что я решил познакомить тебя с ней. Надеюсь, вам понравится!

Почему NSQ?

NSQ — реалтаймовая распределённая платформа обмена сообщениями написанная на Go командой разработчиков сервиса bit.ly.

Платформа простая и понятная, особенно если сравнивать её с похожими системами (как Kafka или RabbitMQ), её легко использовать и у нее простая интуитивная админка. Если тебе никогда еще не приходилось использовать очереди сообщений, то NSQ — отличный вариант для понимания основных подходов.

Идея очередей сообщений

Очереди сообщений реализуют архитектурный паттерн "издатель-подписчик", который используется для связи между разными частями (приложениями, сервисами) системы.

-2

Когда происходит какое-либо событие (к примеру, пользователь зарегистрировался), сообщение публикуется в очередь сообщений. Сервисы, подписанные на этот тип сообщений, получают информацию о событии и могут совершить необходимое действие (к примеру, сервис email-рассылок отошлёт приветственное сообщение).

Скачиваем NSQ

Скачивай с официального сайта (https://nsq.io/deployment/installing.html) версию под твою ось. Если у тебя мак, можешь поставить через brew:

brew install nsq

После распаковки архива у нас будет россыпь файлов:

  • nsqlookupd.exe;
  • nsqd.exe;
  • nsqadmin.exe
  • и много других, но они нам сейчас не так интересны;

Запускаем nsqlookupd

Открой распакованную папке в терминале, который тебе больше нравится, и запустим:

./nsqlookupd

Ты увидишь что-то вроде такого:

└─▪ nsqlookupd
[nsqlookupd] 2020/02/16 20:19:20.443726 INFO: nsqlookupd v1.2.0 (built w/go1.13.5)
[nsqlookupd] 2020/02/16 20:19:20.446779 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2020/02/16 20:19:20.446807 INFO: TCP: listening on [::]:4160

Это говорит нам, что nsqlookupd запущен и имеет два интерфейса: TCP на порту 4160 и HTTP на 4160.

Для проверки, что все работает, откроем браузер с адресом http://localhost:4161/ping

Если у тебя тоже "OK" - все отлично. Это http api позволяет нам проверять статус nsq, известные ноды, создавать/просматривать/удалять темы (topics), каналы (channels), подробнее в доках (ты же знаешь, или хотя бы учишь инглиш?).

По сути, nsqlookupd — "сервис обнаружения" (это странное словосочетание подсказала википедия, все говорят service discovery). Он помогает подписчикам найти издателей, с которых можно читать сообщения конкретной темы.

Запускаем nsqd

Теперь в другом окне терминала запускаем nsqd:

./nsqd

Если всё хорошо, то ты увидишь похожее:

└─▪ nsqd
[nsqd] 2020/02/16 20:49:55.184982 INFO: nsqd v1.2.0 (built w/go1.13.5)
[nsqd] 2020/02/16 20:49:55.186159 INFO: ID: 357
[nsqd] 2020/02/16 20:49:55.186670 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2020/02/16 20:49:55.193869 INFO: HTTP: listening on [::]:4151
[nsqd] 2020/02/16 20:49:55.193920 INFO: TCP: listening on [::]:4150

Публикуем сообщение

Теперь настало время опубликовать первое сообщение в очередь. Для этого нужно сделать POST запрос к http://localhost:4151/pub?topic=test с json-контентом:

{
"text": "some message"
}

Я, к примеру, люблю httpie (напомните, пожалуйста, к следующей неделе написать шпаргалку по нему!):

└─▪ http POST http://localhost:4151/pub?topic=test text="some message"
HTTP/1.1 200 OK
Content-Length: 2
Content-Type: text/plain; charset=utf-8
Date: Sun, 16 Feb 2020 17:06:37 GMT
X-Nsq-Content-Type: nsq; version=1.0
OK

/pub — эндпоинт для создания сообщений. Он требует GET-параметр topic. Тема представляет собой имя для сообщения, все сообщения. которые были опубликованы с одинаковой темой будут получены всеми подписчиками это] темы.

Если код ответа 200, новая тема будет создан автоматически. В терминале с запущенным nsqd появятся сообщения об этом:

[nsqd] 2020/02/16 21:06:37.226017 INFO: TOPIC(test): created
[nsqd] 2020/02/16 21:06:37.226158 INFO: NSQ: persisting topic/channel metadata to nsqd.dat

Вторая строка говорит, что информация о созданно] теме была сохранена в файле nsqd.dat.

Его можно открыть в любом редакторе и ты увидишь имя созданной темы:

└─▪ cat nsqd.dat
{"topics":[{"channels":[],"name":"test","paused":false}],"version":"1.2.0"}

Но у нас есть и более удобный путь просмотреть темы, время познакомиться с NSQ Admin.

Запускаем NSQ Admin

Если просто запустить nsqadmin:

./nsqadmin

То выведется ошибка:

└─▪ nsqadmin
[nsqadmin] 2020/02/16 21:23:52.607636 FATAL: failed to instantiate nsqadmin - --nsqd-http-address or --lookupd-http-address required

Ошибка говорит, что nsqadmin хочет получить параметр с адресом nsqd или nsqlookupd. Ну так дадим ему то, что он хочет.

└─▪ nsqadmin --lookupd-http-address localhost:4161
[nsqadmin] 2020/02/17 21:56:54.413719 INFO: nsqadmin v1.2.0 (built w/go1.13.5)
[nsqadmin] 2020/02/17 21:56:54.414703 INFO: HTTP: listening on [::]:4171

Теперь можно открыть админку http://localhost:4171/. Кажется, что всё нормально… Но где же созданная тема test? А если перейти во вкладку Nodes, то мы снова не увидим ничего - NSQd Nodes (0). Почему?

При запуске nsqadmin, мы сообщили ему, как подключиться к nsqlookupd, а как же рабочая нода nsqd? Она ни к чему не подключена💥! Перезапустим её, передав параметр lookupd-tcp-address. Плюс передадим необязательный параметр broadcast-address. Он отвечает за то, какой домен будет возвращать nsqlookupd клиентам, по умолчанию это hostname, но для локальной работы это должен быть localhost.

nsqd -lookupd-tcp-address localhost:4160 -broadcast-address localhost

Как я уже писал, nsqlookupd — service discovery, и именно он является центральным сервисом объединяющим всё вместе.

Теперь если обновить страницу с открытой админкой мы увидим запущенную ноду и тему test. ✨

-3

Создание приложения потребителя

Нам нужно базовое приложение для получения сообщений. Создадим простое go приложение для этого. Создадим проект и установим клиентскую библиотеку для nsq:

mkdir nsqbasicconsumer
cd nsqbasicconsumer
go mod init nsqbasicconsumer
go get github.com/nsqio/go-nsq

Создадим main.go:

И запустим в папке проекта:

go run main.go

Приложение получит все сообщения из очереди. В консоли у нас должно быть такое:

└─▪ go run main.go
2020/02/17 22:58:52 INF 1 [test/test] querying nsqlookupd http://localhost:4161/lookup?topic=test
2020/02/17 22:58:52 INF 1 [test/test] (localhost:4150) connecting to nsqd
message {"text": "some message"}

Это произошло, потому что ранее отправленное сообщение ждало момента, когда его прочтут.

В админке, если выбрать Nodes -> localhost можно увидеть новый Client Host, подключённый несколько секунд (ну или минут) назад.

Тестируем получение сообщений

Оставляем main.go запущенным, и отправим ещё одно сообщение:

└─▪ http POST http://localhost:4151/pub?topic=test message="it works"

Оно должно́ сразу же появиться в консоли. Поздравляю! Работающая система очереди сообщений!

⚠️Если нажать Counters в админке, то отобразится количество отправленных сообщений.

Если отправить сообщение в другую тему, оно не отобразится, потому что мы подключились только к теме test.

В качестве бонуса похожий простой потребитель, но на TypeScript:

mkdir tsnsq
cd tsnsq
npm init -y
npm i @types/nsqjs nsqjs
npm i -D typescript

Компилируем

npx tsc index.ts

И запускаем:

node index.js

Результат должен быть идентичным.

🚀Если узнал из статьи что-то полезное, ставь лайк и подписывайся на Дзене или группе ВК 😏

Если код не отображается - временно отключи блокировщик рекламы, это все Дзен.

Все новые статьи можно найти на сайте - https://wcademy.ru/