В этой статье я расскажу про внешнюю компоненту для взаимодействия 1С с брокером сообщений Apache Kafka. Компонента написана с применением технологии NativeAPI , поэтому она будет работать на Windows и Linux.
Компонента разработана на двух языках. Основная логика написано на языке Rust и язык С++ используется для взаимодействия кода на Rust с платформой. За счет того что на Rust написано большая часть кода, значительно снижается риск утечки памяти и прочие проблемы присущие приложениям на С++. Так же разработка на Rust снижает время разработки/доработки/исправление ошибок.
Для объективности, замечу, что для интеграции 1С и kafka не обязательно использовать внешние компоненты. Для kafka есть готовые rest proxy, которые выполняют роль посредника между 1С и сервером kafka. Это вполне приемлемое решение, за исключением одного недостатка - сложность архитектуры. Все это сервисы нужно настроить и поддерживать в рабочем состоянии и каждый такой сервис это дополнительная точка отказа.
Настройка тестовой среды (сервер kafka)
Если у вас уже есть запущенная среда с сервером, этот раздел можно пропустить и сразу перейти к следующему разделу "Демонстрация с примерами".
Перед началом демонстрации работы компоненты покажу пример того как можно настроить тестовую среду.
Запуск сервера kafka
Самый простой способ запуска, это запуск в контейнере. Есть много разных сборок (все варианты можно найти на https://hub.docker.com/)
Я буду использовать bitnami/kafka
Для запуска необходимо выполнить одну команду в терминале (это одна строка)
sudo docker run --rm -it --hostname kafka-server \
-p 9092:9092 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.159:9092 \
bitnami/kafka:latest
Пояснение: -rm удалит контейнер после его закрытия, удобно для тестового разового запуска, -it после запуска останется подключение к контейнеру, если нужно чтобы контейнер продолжил работу в фоне, надо удалить этот параметр, --hostname kafka-server требуется для обращения к хосту внутри контейнера, -p 9092:9092 проброс портов снаружи внутрь контейнера, -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.159:9092 это важный параметр, его значение будет передаваться клиентам, именно этот ip они будут использовать для обращения к брокерам. Надо заменить ip на свой адрес. В моем случае это адрес хостовой машины где запущен докер с брокером.
Запуск сервиса для администрирования через web интерфейс
Для этого я так же предлагаю использовать докер образ, запуск
sudo docker run --rm -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui
Аналогично, --rm удалит контейнер после закрытия, -it оставит подключение к контейнеру, это удобно, чтобы видеть в консоли ошибки при добавлении брокеров (если они будут), -p 8080:8080 проброс портов в контейнер.
После запуска, можно открыть web интерфейс для администрирования по адресу http://127.0.0.1:8080/
Создание кластера, добавление брокеров
В web интерфейсе надо указать название кластера, добавить ip брокера (тот который был указан в KAFKA_CFG_ADVERTISED_LISTENERS) и нажать Submit.
Если все сделано верно, через несколько секунд слева появится название кластера и можно будет просмотреть список брокеров.
Если списка брокерв нет, надо перепроверить конфигурацию запуска контейнера с kafka, проверить что он запущен и слушает порт 9092 (проверить можно командой ss -tl она должна вернуть список портов которые слушает брокер, должен быть порт 9092). Так же посмотреть в консоль где запущен контейнер с web администрированием, если настроено что-то не верно, в консоли будут ошибки о невозможности подключиться к указанному брокеру.
Добавление топика
И последий шаг, это добавление топика, в который будут отправлять сообщения через компоненту на 1С.
Для добавления нужно перейти в раздел топики и добавить новый топик
На этом создание тестовой среды завершено и можно перейти к демонстрации работы внешней компоненты.
Демонстрация с примерами
Перейду к демонстрации возможностей и примерам.
Для начала надо скачать саму компоненту по адресу.
Так же имеется видео обзор.
Компонента поставляется в двух вариантах, платном и бесплатном. У бесплатного есть одно ограничение - размер сообщения, оно ограничено длиной 2000 символов.
Компонента поставляется в виде конфигурации с примерами использования. Использование компоненты реализовано через обработку КлиентKafka которая является оберткой для более удобной программной работы с методами компоненты. Необходимо скопировать эту обработку в конфигурации.
Отправка сообщения
Первый пример это отправка сообщения в топик. Отправка реализуются таким кодом
В первой строке создается экземпляр обработки, через которую реализовано взаимодействие с компонентой.
Во второй, создается конфигурация отправителя с указанием брокеров. Если брокеров будет несколько, их можно указать через запятую.
В третьей строке создается экземпляр отправителя. Эта переменная далее будет использоваться для отправки сообщений, так же она будет передаваться в метод УдалитьОтправителя() для удаления экземпляра отправителя.
В четвертой строке создается массив заголовков сообщения.
В пятой строке подготавливается сообщение. Здесь указывается отправитель, имя топика, само сообщение, ключ, раздел (если требуется явно указать в какой раздел должно быть отправлено сообщение), заголовки и если необходимо таймаут.
И в строке номер шесть непосредственно отправляется подготовленное сообщение.
Если отправка произошла корректно, в переменной Ответ будет содержаться структура на подобии скриншота ниже.
Структура содержит свойство Раздел, это номер раздела, куда было опубликовано сообщение, Смещение - это позиция сообщения в топике и тип ответ. Тип нужен для контроля результата, если произошла ошибка, то в свойстве тип будет содержаться значение Error и так же будет свойство Текст c описанием ошибки.
И в последней строке, номер семь, удаляется экземпляр отправителя. Это нужно делать обязательно.
Получение сообщения
После отправки сообщения в топик, его можно прочитать таким образом
В первой строке создается экземпляр обработки через которую реализована работа с компонентой.
Далее идет блок строк номер 2. В нем создается конфигурация для отправителя. Здесь указывается Топик из которого будут получены сообщения, Теги (поясню позже на примере), группа потребителей. Группа нужно, если нам нужно объединить потребителей в группу для параллельного получения сообщений из нескольких разделов или чтобы сохранять смещение. Сохранять смещение можно только для группы потребителей. Поэтому если нам нужно сохранять смещение, то надо создать группу потребителей, даже если в группе будет только один потребитель. Указываются брокеры. Можно указать несколько брокеров через запятую, если один брокер выйдет из строя, клиент продолжит получение из других указанных брокеров. Параметр enable.auto.coomit указывает что мы будем сохранять смещение для прочитанных сообщению вручную, через метод СохранитьСмещение(). Параметр auto.offset.earliest указывает что при запуске получения сообщений, получение сообщений начнется с последнего не подтвержденного.
В третьей строке запускается поток получения сообщений. Технически получение сообщений реализовано таким образом: при вызове метода НачатьПолучениеСообщений() в коде на Rust запускается поток для получения сообщений, который будет ожидать новые сообщений из топика, а 1С в свою очередь будет получать эти сообщения от потока на Rust через канал. Переменная канал, которую возвращает метод НачатьПолучениеСообщений() это и есть экземпляр канала, через который 1С будет получает сообщения от кода на Rust (напомню что основная логика внешней компоненты написана на языке Rust).
В строке номер четыре непосредственно происходит получение сообщений из канала, который был создан в строке три. Метод ПолучитьСообщение() является блокирующим, то есть он блокирует поток 1С в месте вызова, до момента пока в топике не появятся сообщения. После того, как сообщение появится в топике, его вернет метод ПолучитьСообщение().
Содержание переменной Сообщение будет содержать следующую структуру
Структура содержит всю информацию о сообщении, а именно:
Заголовки (это массив структур который был добавлен при отправке), Значение - это само сообщение, основная полезная нагрузка сообщения. Ключ - это дополнительная свойство сообщения, его можно использовать например чтобы все сообщения с одним и тем же ключом отправить в одни раздел топика. Если разделов несколько, kafka будет автоматически распределять сообщения по этим разделам, но сообщения с одинаковым ключом, будут отправляться всегда в один раздел. Это удобно если надо обеспечить при получении туже последовательность что и при отправке. Раздел - это номер раздела, куда kafka поместила сообщение. Смещение - это позиция сообщения в топике. Тип - нужен для контроля ошибок. Если произошла какая-то ошибка при получении сообщения, в типе будет указано значение Error.
И в последней строке номер пять, сохранятся смещение для топика, т.е. мы сохраняем позицию на которой мы завершили чтение сообщений в топике. При следующей сессии чтения, kafka будет отправлять сообщения следующие после сохраненного смещения.
Управление потоком получения сообщений
Как я заметил выше, метод ПолучитьСообщение() является блокирующим, то есть он блокирует поток выполнения кода 1С, до момента пока в топике не появится сообщение. Но иногда необходимо прервать ожидание сообщений. Для этого, можно открыть отработку в режиме предприятия, выбрать строку потока получения и нажать завершить.
Так же, получить список каналов получения, которые в данный момент ожидают получение сообщения и закрыть их, можно программно.
Пример можно посмотреть в этой же обработке.
Так же можно проверить состояние канала получения, отправил ему ping
Если канал "живой" и ожидает сообщения из топика, он вернет pong.
Запуск в фоновом задании
В реальных проектах удобно запускать получении сообщений в фоновом задании, где будет реализовано постоянное ожидание новых сообщений из топика. Так же, таким образом можно реализовать параллельное получение сообщений несколькими фоновыми заданиями (с применением группы потребителей).
Если необходимо завершить одна из фоновых заданий, то его необходимо завершать через методы описанные выше ПолучитьСписокКаналов() и ЗакрытьКаналПотребителя(). Для того, чтобы различать фоновые задания (если их будет несколько), в методе НачатьПолучениеСообщений(), надо указать в параметре Теги произвольный текст.
Тогда, этот текст, будет отображаться в результате который вернет метод ПолучитьСписокКаналов() или в табличной части в режиме предприятия.
Обработка ошибок
Каждый метод обработки КлиентKafka возвращает в качестве результата соответствующую структуру. Например получение сообщений вернет структуру типа Message.
Если же происходит ошибка, при вызове любого метода, то он вернет структуру с типом Error и описанием ошибки. Обработка ошибок реализуется не через оборачивание кода в попытку, а через контроль типа возвращаемого результата. Подход похож на такие языки как GO и Rust, где не используются такие конструкции языка как попытки и исключения.
Больше примеров
Больше примеров можно посмотреть в конфигурации-примере, по ссылке.
Приобретение полной версии
По вопросам приобретения полное версии, если не хватает возможностей бесплатной, можно написать в телеграм автору.
Контанты
Телеграм канал с прочими видео автора.
Телеграм канал с информацией о новых версиях внешних компонент автора.