DevOps

Установка Apache Kafka в Debian 10

Apache Kafka — популярный распределенный брокер сообщений, предназначенный для обработки больших объемов данных в реальном времени. Кластер Kafka отличается масштабируемостью и отказоустойчивостью и имеет более высокую пропускную способность по сравнению с другими брокерами сообщений, такими как ActiveMQ и RabbitMQ. Хотя он обычно используется в качестве системы рассылки сообщений издатель/подписчик, многие организации также используют этого брокера для агрегации логов, поскольку он предоставляет надежное хранение для публикуемых сообщений.

Система рассылки сообщений издатель/подписчик позволяет одному или нескольким производителям публиковать сообщения без учета количества потребителей или того, как эти потребители будут обрабатывать сообщения. Подписанные клиенты уведомляются автоматически об обновлениях и создании новых сообщений. Эта система является более эффективной и масштабируемой по сравнению с системами, где клиенты периодически выполняют опрос, чтобы определить, доступны ли новые сообщения.

В этом руководстве вы выполните защищенную установку и настройку Apache Kafka 2.1.1 на сервере Debian 10, а затем протестируете настройки посредством создания и получения сообщения Hello World. Затем вы сможете при желании провести установку KafkaT для мониторинга Kafka и настройки кластера Kafka с несколькими узлами.

Предварительные требования

Чтобы выполнить описанные ниже шаги, вам потребуется следующее:

  • Один сервер Debian 10, имеющий не менее 4 ГБ ОЗУ и пользователя sudo без привилегий root.
  • OpenJDK 11, установленный на вашем сервере. Kafka написан на языке Java и поэтому требует JVM.

Примечание. Установка без 4 ГБ оперативной памяти может вызвать сбои в работе Kafka, когда виртуальная машина Java (JVM) будет выдавать ошибку Out Of Memory при запуске.

Шаг 1 — Создание пользователя для Kafka

Поскольку Kafka может обрабатывать запросы через сеть, лучше всего создать для этого отдельного пользователя. Это позволит минимизировать ущерб для системы в случае взлома сервера Kafka. На этом шаге вы создадите отдельного пользователя kafka.

Выполните вход с помощью пользователя без прав root с привилегиями sudo и создайте пользователя kafka с помощью команды useradd:

$ sudo useradd kafka -m

Флаг -m гарантирует, что для пользователя будет создана домашняя директория. Домашняя директория /home/kafka будет выступать в качестве директории рабочего пространства для последующего выполнения команд.

Установите пароль с помощью команды passwd:

$ sudo passwd kafka

Введите пароль, который вы хотите использовать для этого пользователя.

Затем добавьте пользователя kafka в группу sudo с помощью команды adduser для предоставления ему необходимых привилегий для установки зависимостей Kafka:

$ sudo adduser kafka sudo

Ваш пользователь kafka готов к работе. Выполните вход в учетную запись с помощью команды su:

$ su -l kafka

После создания пользователя для Kafka можно перейти к загрузке и извлечению двоичных файлов Kafka.

Шаг 2 — Загрузка и извлечение двоичных файлов Kafka

На этом шаге мы выполним загрузку и извлечение двоичных файлов Kafka в специально отведенные для этого папки домашней директории пользователя kafka.

Сначала создайте директорию в /home/kafka с названием Downloads, чтобы сохранить там загруженные данные:

$ mkdir ~/Downloads

Затем установите curl с помощью команды apt-get для получения возможности загрузки удаленных файлов:

$ sudo apt-get update && sudo apt-get install curl

После получения соответствующего предупреждения введите Y для подтверждения загрузки curl.

После установки curl мы используем его для загрузки двоичных файлов Kafka:

$ curl "https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz

Создайте директорию с названием kafka и замените ее на эту директорию. Она будет служить базовой директорией для установки Kafka:

$ mkdir ~/kafka && cd ~/kafka

Извлеките архив, который вы загрузили, с помощью команды tar:

$ tar -xvzf ~/Downloads/kafka.tgz --strip 1

Мы указали флаг --strip 1, чтобы обеспечить извлечение архива в директорию ~/kafka/, а не в другую директорию, например ~/kafka/kafka_2.12-2.1.1/.

После успешной загрузки и извлечения двоичных файлов мы можем перейти к настройке Kafka для удаления темы.

Шаг 3 — Настройка сервера Kafka

Поведение Kafka по умолчанию не позволяет нам удалять название темы, категории, группы или ветки, где были опубликованы сообщения. Для изменения нужно отредактировать файл конфигурации.

Опции конфигурации Kafka указаны в файле server.properties. Откройте файл в nano или вашем любимом редакторе:

$ nano ~/kafka/config/server.properties

Давайте добавим настройку, которая позволит нам удалять темы Kafka. Добавим в конец файла следующую выделенную строку:

~/kafka/config/server.properties

...
group.initial.rebalance.delay.ms

delete.topic.enable = true

Сохраните файл и закройте nano. После настройки Kafka можно создать файлы элементов systemd для запуска и активации Kafka при загрузке системы.

Шаг 4 — Создание файлов элементов systemd и запуск сервера Kafka

В этом разделе мы научимся создавать файлы элементов systemd для службы Kafka. Это поможет нам выполнять стандартные действия со службой, в том числе запускать, останавливать и перезапускать службу Kafka аналогично другим службам Linux.

Kafka использует службу ZooKeeper для управления состоянием кластера и конфигурациями. Она обычно используется в качестве неотъемлемого компонента распределенных систем. В этом обучающем руководстве мы научимся использовать Zookeeper для управления этими аспектами работы Kafka. Если вам требуется дополнительная информация, воспользуйтесь официальной документацией ZooKeeper.

Вначале мы создадим файл элемента для zookeeper:

$ sudo nano /etc/systemd/system/zookeeper.service

Введите следующее определение элемента в файле:

/etc/systemd/system/zookeeper.service

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

В разделе [Unit] указывается, что для запуска ZooKeeper требуется готовность сети и файловой системы.

В разделе [Service] указывается, что systemd использует файлы оболочки zookeeper-server-start.sh​​​​​​ и ​​​​​​zookeeper-server-stop.sh для запуска и остановки службы. Также указывается, что в случае аномального выхода ZooKeeper следует перезапускать.

Затем создайте сервисный файл systemd для kafka:

$ sudo nano /etc/systemd/system/kafka.service

Введите следующее определение элемента в файле:

/etc/systemd/system/kafka.service

[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simpleUser=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

В разделе [Unit] указывается, что этот элемент является зависимым от zookeeper.service. Это позволит автоматически запускать zookeeper после запуска службы kafka.

В разделе [Service] указывается, что systemd использует файлы оболочки kafka-server-start.sh и kafka-server-stop.sh для запуска и остановки службы. В нем также указывается, что Kafka автоматически перезапускается в случае аварийного завершения работы.

После определения всех элементов выполните запуск Kafka с помощью следующей команды:

$ sudo systemctl start kafka

Чтобы убедиться, что сервер успешно запущен, проверьте логи журнала для элемента kafka:

$ sudo journalctl -u kafka

Вы увидите примерно следующий результат:

Output

Mar 23 13:31:48 kafka systemd[1]: Started kafka.service.

Теперь у нас имеется сервер Kafka, прослушивающий порт 9092, который используется Kafka по умолчанию.

Мы запустили службу kafka, но если мы теперь перезагрузим сервер, она не запустится автоматически. Чтобы активировать запуск kafka при загрузке сервера, выполните следующую команду:

$ sudo systemctl enable kafka

После запуска и активации служб следует проверить установку.

Шаг 5 — Тестирование установки

Опубликуем и примем сообщение Hello World, чтобы проверить правильность работы сервера Kafka. Для публикации сообщений в Kafka требуется следующее:

  • Издатель, который позволяет публиковать записи и данные в темах.
  • Подписчик, который считывает сообщения и данные из темы.

Сначала создайте тему с названием TutorialTopic, введя следующую команду:

$ ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

Вы можете создать издателя из командной строки с помощью скрипта kafka-console-producer.sh. Ему требуется имя хоста сервера Kafka, порт и название темы в качестве аргументов.

Опубликуем строку Hello, World в теме TutorialTopic:

$ echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

Флаг --broker-list указывает список брокеров сообщений, которым будет отправлено сообщение, в данном случае это localhost:9092. Флаг --topic обозначает тему как TutorialTopic.

Затем вы можете создать подписчика Kafka с помощью скрипта kafka-console-consumer.sh. Команда ожидает получить в качестве аргументов имя хоста и номер порта сервера ZooKeeper.

Следующая команда получает сообщения из TutorialTopic. Обратите внимание, что использование флага --from-beginning позволяет получать сообщения, которые были опубликованы до запуска подписчика:

$ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server `localhost:9092` --topic TutorialTopic --from-beginning

Флаг --bootstrap-server указывает список точек входа в кластер Kafka. В данном случае мы используем localhost:9092.

Теперь мы должны увидеть на терминале сообщение Hello, World:

Output

Hello, World

Скрипт продолжит работать и будет ждать, когда будут опубликованы новые сообщения для темы. Вы можете открыть новый терминал и запустить издателя для нескольких новых сообщений. Вы должны увидеть все сообщения в выводе подписчика. Дополнительную информацию по использованию Kafka можно найти в официальной документации по Kafka.

После окончания тестирование нажмите CTRL+C, чтобы остановить скрипт подписчика. Мы протестировали установку и теперь можем перейти к установке KafkaT, чтобы нам было удобнее администрировать кластер Kafka.

Шаг 6 — Установка KafkaT (необязательно)

KafkaT — это инструмент от Airbnb, который упрощает просмотр данных о кластере Kafka и выполняет некоторые административные задачи из командной строки. Поскольку он написан на Ruby, для его использования вам потребуется Ruby. Также вам потребуется пакет build-essential для создания других «бриллиантов», используемых в качестве зависимостей. Выполните установку с помощью apt:

$ sudo apt install ruby ruby-dev build-essential

Теперь вы можете выполнить установку KafkaT с помощью команды gem:

$ sudo CFLAGS=-Wno-error=format-overflow gem install kafkat

Опция CFLAGS=-Wno-error=format-overflow отключает предупреждения о переполнении формата и требуется для зависимости ZooKeeper (зависимость KafkaT).

KafkaT использует .kafkatcfg в качестве файла конфигурации для определения директорий и логов на вашем сервере Kafka. Также нужно добавить запись, указывающую KafkaT путь к экземпляру ZooKeeper.

Создайте новый файл .kafkatcfg:

$ nano ~/.kafkatcfg

Добавьте следующие строки для указания требуемой информации о вашем сервере Kafka и экземпляре Zookeeper:

~/.kafkatcfg

{
"kafka_path": "~/kafka",
"log_path": "/tmp/kafka-logs",
"zk_path": "localhost:2181"
}

Теперь вы можете использовать KafkaT. Для начала продемонстрируем, как вы можете использовать его, чтобы просмотреть подробную информацию обо всех разделах Kafka:

$ kafkat partitions

Результат будет выглядеть следующим образом:

На экране результатов будет показано сообщение TutorialTopic, а также внутренняя тема __consumer_offsets, используемая Kafka для хранения данных клиентов. Вы можете спокойно игнорировать строки, начинающиеся с __consumer_offsets.

Чтобы узнать больше о KafkaT, перейдите в соответствующий репозиторий на GitHub.

Мы установили KafkaT и теперь можем настроить Kafka на кластере Debian 10 для создания кластера из нескольких узлов.

Шаг 7 — Настройка многоузлового кластера (необязательно)

Если вы хотите создать кластер с несколькими брокерами, используя дополнительные серверы Debian 10, повторите шаги 1, 4 и 5 на каждом новом сервере. Также необходимо внести следующие изменения в файл ~/kafka/config/server.properties для каждой системы:

  • Изменим значение свойства broker.id так, что оно будет уникальным внутри всего кластера. Это свойство служит уникальным идентификатором каждого сервера в кластере и может содержать любую строку в качестве значения. Например, в качестве идентификаторов могут использоваться "server1""server2" и т. д.
  • Значение свойства zookeeper.connect следует изменить таким образом, чтобы все узлы указывали на один экземпляр ZooKeeper. Это свойство указывает адрес экземпляра ZooKeeper и имеет формат <HOSTNAME/IP_ADDRESS>:. Для этого обучающего руководства мы используем your_first_server_IP:2181, заменив your_first_server_IP IP-адресом уже настроенного сервера Debian 10.

Если вы хотите использовать несколько экземпляров ZooKeeper для вашего кластера, то значение свойства zookeeper.connect должно представлять собой одинаковую, разделенную запятыми строку со списком IP-адресов и номеров портов для всех экземпляров ZooKeeper.

Примечание. Если на сервере Debian 10 с установленным Zookeeper работает брандмауэр, необходимо открыть на нем порт 2181 для получения входящих запросов от других узлов кластера.

Шаг 8 — Ограничение для пользователя Kafka

Теперь, когда все установки выполнены, вы можете удалить права администратора пользователя kafka. Прежде чем сделать это, выйдите и войдите снова с помощью любого пользователя без прав root с привилегиями sudo. Если вы все еще используете один сеанс командной строки, который вы запустили в начале прохождения руководства, просто введите exit.

Удалите пользователя kafka из группы sudo:

$ sudo deluser kafka sudo

Чтобы дополнительно повысить безопасность вашего сервера Kafka, заблокируйте пароль пользователя kafka с помощью команды passwd. Это гарантирует, что никто не сможет напрямую выполнить вход на сервер с помощью этой учетной записи:

$ sudo passwd kafka -l

На данный момент только пользователь root или sudo может выполнить вход как пользователь kafka, воспользовавшись следующей командой:

$ sudo su - kafka

В будущем, если вы захотите разблокировать его, используйте passwd с флагом -u:

$ sudo passwd kafka -u

Вы успешно ограничили права администратора пользователя kafka.