Apache Kafka

Материал из ALT Linux Wiki

Предварительные требования:

  • 1. Не меньше 4GB RAM на сервере. Инсталляция с меньшим количеством ОЗУ может закончиться с ошибкой "out of memory" от JVM при старте.
  • 2. На сервере должен быть установлен OpenJDK не менее 8 версии.

Шаг 1. Создание пользователя

Так как сервер Kafka обрабатывает запросы, получаемые из сети Интернет, необходимо создать специального пользователя. Это сведёт к минимуму ущерб, нанесённый серверу Kafka, в случае компроментации. Мы создадим отдельного пользователя с именем Kafka. Конечно кроме него на сервере неплохо бы иметь и другого пользователя с правами sudo, для выполнения других задач администрирования, не относящихся к обслуживанию сервера Kafka. Установим sudo:

 # apt-get install sudo 

Разрешим использовать sudo только пользователяем из группы wheel

 # control sudowheel enable 

Создаём пользователя kafka с домашней директорией:

 # useradd kafka -m 

Задаём пароль:

 # passwd kafka 

Добавим пользователя kafka в группу wheel:

 # usermod -aG wheel kafka 

Ну и собственно логинимся под свежесозданным пользователем:

 # su -l kafka 

Шаг 2. Начало установки Kafka

В первую очередь установим java:

 $ sudo apt-get install java-1.8.0-openjdk 

Создадим директорию для скачивания исходников

 $ mkdir ~/Downloads && cd ~/Downloads 

Скачиваем актуальную версию кафки:

 $ wget -O kafka.tgz https://www.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz 

Создаём каталог с именем kafka, и сразу переходим в него:

 $ mkdir ~/kafka && cd ~/kafka 

После чего распаковываем в него скачанный архив с Apache Kafka

 $ tar -xvzf ~/Downloads/kafka.tgz --strip 1 

Шаг 3. Конфигурирование сервера Kafka

По умолчанию сервер Kafka не позволяет нам удалить тему, категории, группы, или имя канала в котором могут публиковаться сообщения. Для исправления такого поведения предлагается отредактировать конфигурационный файл server.properties. Необходимо добавить в конец этого файла настройки, которые позволят удалить темы Kafka:

 $ echo "delete.topic.enable = true" >> ~/kafka/config/server.properties 

Шаг 4. Создаём systemd unit файл, и запускаем сервер Kafka

На этом шаге необходимо создать файлы модулей sytemd для служб Kafka.

Создание сервиса Zookeeper Поскольку для управления состоянием кластера и конфигурациями Kafka использует службу Zookeeper, то сначала необходимо сконфигурировать сервис systemd для этой службы. Для детального изучения zookeeper лучше посетить сайт с официальной документацией - https://zookeeper.apache.org/doc/current/index.html

 $ vim /etc/systemd/system/zookeeper.service 

Наполняем следующим содержимым:

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

В секции [Service] определено использование скриптов start и stop для сервиса zookeeper, а так же возможность автоматического рестарта при возникновении аномального поведения Сохраняем файл и выходим из vim

Создание сервиса Kafka

 $ sudo vim /etc/systemd/system/kafka.service 

С следующим содержимым:

[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

В секции [Unit] определяем зависимость от сервиса zookeeper. В секции [Service] определяем использование скриптов start и stop, и добавляем возможность автоматического рестарта при возникновении аномального поведения

Сохраняемся и выходим.

Запуск сервиса Kafka

 $ sudo systemctl start kafka 

Проверяем логи старта

 $ journalct -u kafka 

Если всё успешно, вывод должен быть без ошибок, и последним сообщением должно значится что-то в духе:

 Jul 26 14:08:59 kafka-centos systemd[1]: Started kafka.service. 

Теперь kafka готова к работе и прослушивает порт 9092, проверить это можно с помощью ss:

 $ netstat -tulpn | grep LISTEN 

Включим запуск сервиса kafka по умолчанию:

 $ sudo systemctl enable kafka 

Шаг 5. Тестирование инсталляции

Попробуйте отправить на сервер Kafka сообщение "Hello World", чтобы убедиться в корректном поведении сервера Kafka: Публикация сообщений в kafka требует двух компонентов: 1. Producer - позволяет публиковать записи и данные по темам 2. Consumer - читает сообщения и данные по темам. Создадим первый тестовый topic с названием TutorialTopic

 $ ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic 

В случае успешного выполнения команды увидим надпись:

 Created topic "TutorialTopic". 

Далее вы можете создать producer из командной строки с помощью скрипта kafka-console-producer.sh он будет ожидать в качестве аргументов имя хоста, порт и название темы. Опубликуем строку Hello, World в тему TutorialTopic, набрав:

 echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null 

Далее создаем consumer, используя для этого скрипт kafka-console-consumer.sh . Он так же в качестве аргументов будет ожидать имя хоста, порт и название темы. Следующая команда прочтёт сообщение из TutorialTopic:

  • обратите внимание на флаг --from-beginning, который позволяет прочесть сообщения опубликованные раньше, чем

consumer был запущен.

 $ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning 

Если всё работает нормально, команда выведет в консоли сообщение:

 Hello, World 

и будет дальше продолжать работать, пока не будет остановлена принудительно через ctrl^c

На этом основная часть установки сервера Kafka заканчивается. Для повышения удобства администрирования вы можете установить инструмент KafkaT, который позволит упростить выполнение ряда административных задач из командной строки.