Найти в Дзене
Linux Blog

Установка и настройка Kafka кластера

Apache kafka— это распределённая система, предназначенная для обработки потоков данных в режиме реального времени. Её можно сравнить с почтой — одни сервисы передают туда сообщения-письма, а другие — получают. Apache Kafka называют брокером сообщений, потому что она выступает в качестве посредника. Apache kafka так же доступна на Linux, Windows, MacOS. 1.2.1. Минимальные системные требования: 1.2.2 Моя конфигурация системы: Проделываем дальнейшие действия на всех трёх нодах Для работы нам понадобятся некоторые утилиты: yum install curl tar wget Брандмауэр отключен по умолчанию yum install java-11-openjdk-devel Смотрим версию java: java -version wget https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz mkdir /opt/kafka tar zxf kafka_*.tgz -C /opt/kafka --strip 1 Осталось настроить запуск кафки в качестве сервиса. Мы создадим отдельного пользователя и создадим два юнита systemd — один для zookeeper, второй для kafka. useradd -r -c 'Kafka broker user service' kafka chown -R ka
Оглавление

1.Введение

1.1 Что такое Apache kafka?

Apache kafka— это распределённая система, предназначенная для обработки потоков данных в режиме реального времени. Её можно сравнить с почтой — одни сервисы передают туда сообщения-письма, а другие — получают. Apache Kafka называют брокером сообщений, потому что она выступает в качестве посредника.

Apache kafka так же доступна на Linux, Windows, MacOS.

1.2. Спецификация окружающей среды:

1.2.1. Минимальные системные требования:

  • CPU - 4 cores
  • Memory - 8 gb
  • Stograge - 500 gb
  • OS - RedOS 7.3 (RHEL, CentOS)

1.2.2 Моя конфигурация системы:

  • ip - 192.168.0.1  dnsname vm-kafka-01
  • ip - 192.168.0.2  dnsname vm-kafka-02
  • ip - 192.168.0.3  dnsname vm-kafka-03

2. Подготовка системы

Проделываем дальнейшие действия на всех трёх нодах

2.1 Установка служебных пакетов:

Для работы нам понадобятся некоторые утилиты:

yum install curl tar wget

Брандмауэр отключен по умолчанию

2.2 Установка OpenJdk:

yum install java-11-openjdk-devel

OpenJDK установлен.

Смотрим версию java:

java -version

3.Установка и настройка Kafka:

3.1 Переходим на страницу загрузки Kafka и копируем ссылку на скачивание:

-2

3.2 Используя скопированную ссылку, скачиваем бинарник:

wget https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz

3.3 Создадим каталог, куда установим кафку:

mkdir /opt/kafka

3.4 Распакуем скачанный архив в созданный каталог:

tar zxf kafka_*.tgz -C /opt/kafka --strip 1

4 Настройка запуска kafka

Осталось настроить запуск кафки в качестве сервиса. Мы создадим отдельного пользователя и создадим два юнита systemd — один для zookeeper, второй для kafka.

4.1 Для создания пользователя вводим:

useradd -r -c 'Kafka broker user service' kafka

4.2 Назначим владельцем созданного пользователя для каталога кафки:

chown -R kafka:kafka /opt/kafka

4.3 Создаем первый юнит-файл для автозапуска zookeeper:

Открываем файл:

vi /etc/systemd/system/zookeeper.service

В файле прописываем:

[Unit]
Description=ZooKeeper for Kafka
After=network.target
[Service]
Type=forking
User=kafka
Restart=on-failure
LimitNOFILE=16384:163840
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
[Install]
WantedBy=multi-user.target

4.4 Создаем второй файл для автозапуска kafka:

Открываем файл:

vi /etc/systemd/system/kafka.service

В файле прописываем:

[Unit]
Description=Kafka Broker
After=network.target
After=zookeeper.service
[Service]
Type=forking
User=kafka
SyslogIdentifier=kafka (%i)
Restart=on-failure
LimitNOFILE=16384:163840
ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
[Install]
WantedBy=multi-user.target

4.5 Перечитываем конфигурацию systemd, чтобы подхватить изменения:

systemctl daemon-reload

4.6 Разрешаем автозапуск сервисов zookeeper и kafka:

systemctl enable zookeeper kafka

4.7 Стартуем кафку:

systemctl start kafka

Обратим внимание, достаточно запустить службу kafka — благодаря настроенной зависимости, zookeeper стартует автоматически.

4.8 Проверить, что нужный нам сервис запустился и работает на порту 9092:

ss -tunlp | grep :9092

5 Тестовый обмен сообщениями

5.1 После установки и настройки и установки попробуем немного работать с kafka и проверить, что сервис работает. Создадим тему для сообщений и отправим текст Hello, World from Kafka.

Нам понадобиться три скрипта, которые идут в комплекте с кафкой:

  • kafka-topics.sh — создает тему, куда будем отправлять сообщение.
  • kafka-console-producer.sh — создает обращение издателя, который отправляет сообщение.
  • kafka-console-consumer.sh — формирует запрос к брокеру и получает сообщение.

5.2 Первой командой создадим тему:

/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Test

* где:

  • /opt/kafka — путь, куда была установлена нами кафка.
  • bootstrap-server localhost:9092 — адрес хоста kafka. Предполагается, что мы запускаем нашу команду на том же сервере, где ее и развернули.
  • replication-factor — количество реплик журнала сообщений.
  • partitions — количество разделов в теме.
  • topic Test — в нашем примере мы создадим тему с названием Test.

5.3 Теперь отправляем сообщение брокеру:

echo "Hello, World from Kafka" | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Test

* в данном примере мы отправляем в наш сервер сообщение Hello, World from Kafka.

5.4 Попробуем достать сообщение. Выполняем команду:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Test --from-beginning

* опция from-beginning позволяет увидеть все сообщения, которые были отправлены в брокер до создания подписчика (отправки запроса на чтения данных из kafka).

5.5 Мы должны увидеть:

Hello, World from Kafka

6 Установка Kafka на 3 ноды окончена. Продолжим настройку. Объединим ноды в кластер.

6.1 Создаем каталоги для логов Kafka и для zooKeeper

mkdir -p /opt/kafka/zookeeper/data
mkdir -p /opt/kafka/kafka-logs

6.2 Переходим к конфигурации zooKeeper, открываем файл с конфигурацией

vi /opt/kafka/config/zookeeper.properties

и указываем директорию с данными:

dataDir=/opt/kafka/zookeeper/data

Сервера и лимиты синхронизации:

server.1=192.168.0.1:2888:3888
server.2=192.168.0.2:2888:3888
server.3=192.168.0.3:2888:3888
initLimit=5
syncLimit=2

6.3 Далее, на каждом сервере создаем свой id для zooKeeper

echo "1" > /opt/kafka/zookeeper/data/myid (для сервера vm-kafka-01)

echo "2" > /opt/kafka/zookeeper/data/myid (для сервера vm-kafka-02)

echo "3" > /opt/kafka/zookeeper/data/myid (для сервера vm-kafka-03)

6.4 Переходим к настройке kafka

Редактируем конфиг сервера

vi /opt/kafka/config/server.properties

Добавляем:

broker.id=1 (для каждого сервера свой 1,2,3)

директорию с логами

log.dirs=/opt/kafka/kafka-logs

указываем прослушиватели:

listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.0.1:9092

*(Для каждой ноды свой адрес, 192.168.0.1:9092 192.168.0.2:9092 192.168.0.3:9092)

*listeners — используется для внутреннего траффика между нодами кластера

*advertised.listeners используется для клиентcкого траффика

указываем ноды zooKeeper:

zookeeper.connect=192.168.18.150:2181,192.168.18.151:2181,192.168.18.152:2181

6.5 Далее меняем владельца на пользователя kafka

chown -R kafka /opt/kafka

6.6 После  загружаем информацию о новых сервисах и включаем их:

systemctl daemon-reload
systemctl enable zookeeper
systemctl enable kafka

6.7 Перезагружаем сервера и убеждаемся что сервисы запущены

переходим в каталог с kafka

cd /opt/kafka

6.8 Проверим что zooKeeper работает корректно и все ноды видят друг друга

bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

6.8 вывод должен быть таким:

Connecting to localhost:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[1, 2, 3]