1.Введение
1.1 Что такое Apache kafka?
Apache kafka— это распределённая система, предназначенная для обработки потоков данных в режиме реального времени. Её можно сравнить с почтой — одни сервисы передают туда сообщения-письма, а другие — получают. Apache Kafka называют брокером сообщений, потому что она выступает в качестве посредника.
Apache kafka так же доступна на Linux, Windows, MacOS.
1.2. Спецификация окружающей среды:
1.2.1. Минимальные системные требования:
- CPU - 4 cores
- Memory - 8 gb
- Stograge - 500 gb
- OS - RedOS 7.3 (RHEL, CentOS)
1.2.2 Моя конфигурация системы:
- ip - 192.168.0.1 dnsname vm-kafka-01
- ip - 192.168.0.2 dnsname vm-kafka-02
- ip - 192.168.0.3 dnsname vm-kafka-03
2. Подготовка системы
Проделываем дальнейшие действия на всех трёх нодах
2.1 Установка служебных пакетов:
Для работы нам понадобятся некоторые утилиты:
yum install curl tar wget
Брандмауэр отключен по умолчанию
2.2 Установка OpenJdk:
yum install java-11-openjdk-devel
OpenJDK установлен.
Смотрим версию java:
java -version
3.Установка и настройка Kafka:
3.1 Переходим на страницу загрузки Kafka и копируем ссылку на скачивание:
3.2 Используя скопированную ссылку, скачиваем бинарник:
wget https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz
3.3 Создадим каталог, куда установим кафку:
mkdir /opt/kafka
3.4 Распакуем скачанный архив в созданный каталог:
tar zxf kafka_*.tgz -C /opt/kafka --strip 1
4 Настройка запуска kafka
Осталось настроить запуск кафки в качестве сервиса. Мы создадим отдельного пользователя и создадим два юнита systemd — один для zookeeper, второй для kafka.
4.1 Для создания пользователя вводим:
useradd -r -c 'Kafka broker user service' kafka
4.2 Назначим владельцем созданного пользователя для каталога кафки:
chown -R kafka:kafka /opt/kafka
4.3 Создаем первый юнит-файл для автозапуска zookeeper:
Открываем файл:
vi /etc/systemd/system/zookeeper.service
В файле прописываем:
[Unit]
Description=ZooKeeper for Kafka
After=network.target
[Service]
Type=forking
User=kafka
Restart=on-failure
LimitNOFILE=16384:163840
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
[Install]
WantedBy=multi-user.target
4.4 Создаем второй файл для автозапуска kafka:
Открываем файл:
vi /etc/systemd/system/kafka.service
В файле прописываем:
[Unit]
Description=Kafka Broker
After=network.target
After=zookeeper.service
[Service]
Type=forking
User=kafka
SyslogIdentifier=kafka (%i)
Restart=on-failure
LimitNOFILE=16384:163840
ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
[Install]
WantedBy=multi-user.target
4.5 Перечитываем конфигурацию systemd, чтобы подхватить изменения:
systemctl daemon-reload
4.6 Разрешаем автозапуск сервисов zookeeper и kafka:
systemctl enable zookeeper kafka
4.7 Стартуем кафку:
systemctl start kafka
Обратим внимание, достаточно запустить службу kafka — благодаря настроенной зависимости, zookeeper стартует автоматически.
4.8 Проверить, что нужный нам сервис запустился и работает на порту 9092:
ss -tunlp | grep :9092
5 Тестовый обмен сообщениями
5.1 После установки и настройки и установки попробуем немного работать с kafka и проверить, что сервис работает. Создадим тему для сообщений и отправим текст Hello, World from Kafka.
Нам понадобиться три скрипта, которые идут в комплекте с кафкой:
- kafka-topics.sh — создает тему, куда будем отправлять сообщение.
- kafka-console-producer.sh — создает обращение издателя, который отправляет сообщение.
- kafka-console-consumer.sh — формирует запрос к брокеру и получает сообщение.
5.2 Первой командой создадим тему:
/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Test
* где:
- /opt/kafka — путь, куда была установлена нами кафка.
- bootstrap-server localhost:9092 — адрес хоста kafka. Предполагается, что мы запускаем нашу команду на том же сервере, где ее и развернули.
- replication-factor — количество реплик журнала сообщений.
- partitions — количество разделов в теме.
- topic Test — в нашем примере мы создадим тему с названием Test.
5.3 Теперь отправляем сообщение брокеру:
echo "Hello, World from Kafka" | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Test
* в данном примере мы отправляем в наш сервер сообщение Hello, World from Kafka.
5.4 Попробуем достать сообщение. Выполняем команду:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Test --from-beginning
* опция from-beginning позволяет увидеть все сообщения, которые были отправлены в брокер до создания подписчика (отправки запроса на чтения данных из kafka).
5.5 Мы должны увидеть:
Hello, World from Kafka
6 Установка Kafka на 3 ноды окончена. Продолжим настройку. Объединим ноды в кластер.
6.1 Создаем каталоги для логов Kafka и для zooKeeper
mkdir -p /opt/kafka/zookeeper/data
mkdir -p /opt/kafka/kafka-logs
6.2 Переходим к конфигурации zooKeeper, открываем файл с конфигурацией
vi /opt/kafka/config/zookeeper.properties
и указываем директорию с данными:
dataDir=/opt/kafka/zookeeper/data
Сервера и лимиты синхронизации:
server.1=192.168.0.1:2888:3888
server.2=192.168.0.2:2888:3888
server.3=192.168.0.3:2888:3888
initLimit=5
syncLimit=2
6.3 Далее, на каждом сервере создаем свой id для zooKeeper
echo "1" > /opt/kafka/zookeeper/data/myid (для сервера vm-kafka-01)
echo "2" > /opt/kafka/zookeeper/data/myid (для сервера vm-kafka-02)
echo "3" > /opt/kafka/zookeeper/data/myid (для сервера vm-kafka-03)
6.4 Переходим к настройке kafka
Редактируем конфиг сервера
vi /opt/kafka/config/server.properties
Добавляем:
broker.id=1 (для каждого сервера свой 1,2,3)
директорию с логами
log.dirs=/opt/kafka/kafka-logs
указываем прослушиватели:
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.0.1:9092
*(Для каждой ноды свой адрес, 192.168.0.1:9092 192.168.0.2:9092 192.168.0.3:9092)
*listeners — используется для внутреннего траффика между нодами кластера
*advertised.listeners используется для клиентcкого траффика
указываем ноды zooKeeper:
zookeeper.connect=192.168.18.150:2181,192.168.18.151:2181,192.168.18.152:2181
6.5 Далее меняем владельца на пользователя kafka
chown -R kafka /opt/kafka
6.6 После загружаем информацию о новых сервисах и включаем их:
systemctl daemon-reload
systemctl enable zookeeper
systemctl enable kafka
6.7 Перезагружаем сервера и убеждаемся что сервисы запущены
переходим в каталог с kafka
cd /opt/kafka
6.8 Проверим что zooKeeper работает корректно и все ноды видят друг друга
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
6.8 вывод должен быть таким:
Connecting to localhost:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[1, 2, 3]