Введение
Apache Kafka — популярный распределенный брокер сообщений, предназначенный для обработки больших объемов данных в режиме реального времени. Кластер Kafka обладает высокой масштабируемостью и отказоустойчивостью, а также имеет гораздо более высокую пропускную способность, чем другие брокеры сообщений, такие как ActiveMQ и RabbitMQ. Хотя он обычно используется в качестве системы обмена сообщениями публикации/подписки, многие организации также используют его для агрегирования журналов, поскольку он предлагает постоянное хранилище для опубликованных сообщений.
Система обмена сообщениями публикации/подписки позволяет одному или нескольким производителям публиковать сообщения независимо от количества потребителей или того, как они будут обрабатывать сообщения. Подписанные клиенты автоматически уведомляются об обновлениях и создании новых сообщений.
В этом руководстве вы установите и настроите Apache Kafka 2.1.1 на сервере Debian 10. При необходимости вы установите KafkaT для мониторинга Kafka и настроите многоузловой кластер Kafka.
Предпосылки
Чтобы следовать этому руководству, вам понадобятся:
- Сервер Debian 10 с не менее 4 ГБ ОЗУ и пользователем без полномочий root с привилегиями sudo.
- OpenJDK 11 установлен на вашем сервере. Чтобы установить эту версию, следуйте инструкциям Как установить Java на Debian 10. Kafka написана на Java, поэтому для нее требуется JVM.
Примечание.Out Of Memory
Если вы хотите установить Apache Kafka на удаленный сервер, продолжайте чтение, в противном случае пропустите первый абзац «Подключение к серверу» и читайте следующий.
Подключение к серверу
Чтобы получить доступ к серверу, вам нужно знать IP-адрес. Вам также потребуется ваше имя пользователя и пароль для аутентификации. Чтобы подключиться к серверу как root, введите следующую команду:
ssh root@IP_DEL_SERVER
Далее вам нужно будет ввести пароль пользователя root.
Если вы не используете пользователя root, вы можете войти под другим именем пользователя с помощью той же команды, а затем изменить root на свое имя пользователя:
ssh nome_utente@IP_DEL_SERVER
Затем вам будет предложено ввести пароль пользователя.
Стандартный порт для подключения по ssh — 22, если ваш сервер использует другой порт, вам нужно будет указать его с помощью параметра -p, затем введите следующую команду:
ssh nome_utente@IP_DEL_SERVER -p PORTA
Шаг 1. Создайте пользователя для Kafka
Поскольку Kafka может обрабатывать запросы по сети, рекомендуется создать отдельного пользователя. Это сводит к минимуму ущерб компьютеру Debian в случае компрометации сервера Kafka. На этом шаге вы создадите выделенного пользователя kafka.
Войдя в систему как пользователь sudo без полномочий root, создайте пользователя с именем kafka
с помощью команды useradd
:
sudo useradd kafka -m
Флаг -m
гарантирует, что для пользователя будет создан домашний каталог. Этот домашний каталог, /home/kafka
, позже будет служить каталогом рабочей области для выполнения команд.
Установите пароль с помощью passwd
:
sudo passwd kafka
Введите пароль, который вы хотите использовать для этого пользователя.
Затем добавьте пользователя kafka в группу sudo
с помощью команды adduser
, чтобы у него были права, необходимые для установки зависимостей Kafka:
sudo adduser kafka sudo
Теперь ваш пользователь kafka готов. Войдите в эту учетную запись, su
:
su -l kafka
Теперь, когда вы создали конкретного пользователя Kafka, вы можете перейти к загрузке и извлечению двоичных файлов Kafka.
Шаг 2. Загрузите и распакуйте двоичные файлы Kafka.
На этом шаге вы загрузите и извлечете двоичные файлы Kafka в специальные папки в домашнем каталоге пользователя kafka.
Для начала создайте каталог Downloads
в /home/kafka
для хранения ваших загрузок:
mkdir ~/Downloads
Затем установите curl
с помощью apt-get
, чтобы вы могли загружать удаленные файлы:
sudo apt-get update && sudo apt-get install curl
При появлении запроса введите Y
, чтобы подтвердить загрузку curl
.
После установки curl
используйте его для загрузки двоичных файлов Kafka:
curl "https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz
Создайте каталог с именем kafka
и перейдите в этот каталог. Это будет базовый каталог установки Kafka:
mkdir ~/kafka && cd ~/kafka
Извлеките загруженный архив с помощью команды tar
:
tar -xvzf ~/Downloads/kafka.tgz --strip 1
Флаг --strip 1
был указан для того, чтобы содержимое архива извлекалось в ~/kafka/
, а не в другой каталог внутри него, например ~/kafka/kafka_2.12-2.1.1/
.
Теперь, когда вы успешно загрузили и распаковали двоичные файлы, вы можете перейти к настройке Kafka, чтобы разрешить удаление темы.
Шаг 3. Настройте сервер Kafka.
Поведение Kafka по умолчанию не позволяет нам удалять тему, категорию, группу или имя канала, в котором могут публиковаться сообщения. Чтобы изменить это, вы отредактируете файл конфигурации.
Параметры конфигурации Kafka указаны в server.properties
. Откройте этот файл с помощью nano
или вашего любимого редактора:
nano ~/kafka/config/server.properties
Давайте добавим настройку, которая позволит нам исключить аргументы Кафки. Добавьте следующую выделенную строку в конец файла:
...
group.initial.rebalance.delay.ms
delete.topic.enable = true
Сохраните файл и выйдите из nano
. Теперь, когда вы настроили Kafka, вы можете создать файл systemd
диска для запуска и включения Kafka при запуске.
Шаг 4. Создайте файл диска Systemd и запустите сервер Kafka.
В этом разделе вы создадите файл диска systemd
для службы Kafka. Это поможет вам выполнять общие действия службы, такие как запуск, остановка и перезапуск Kafka, согласованно с другими службами Linux.
ZooKeeper — это служба, которую Kafka использует для управления состоянием и конфигурациями кластера. Он обычно используется в распределенных системах как неотъемлемый компонент. В этом руководстве вы будете использовать Zookeeper для управления этими аспектами Kafka. Если вы хотите узнать больше, посетите официальные документы ZooKeeper.
Сначала создайте юнит-файл для zookeeper
:
sudo nano /etc/systemd/system/zookeeper.service
Введите в файл следующее определение единицы измерения:
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Раздел [Unit]
указывает, что ZooKeeper требует подключения к сети и что файловая система готова к запуску.
Раздел [Service]
указывает, что systemd
должен использовать файлы оболочки zookeeper-server-start.sh
и zookeeper-server-stop.sh
для запуска и остановки службы. Он также указывает, что ZooKeeper должен автоматически перезапускаться в случае аварийного завершения работы.
Затем создайте служебный файл systemd
для kafka
:
sudo nano /etc/systemd/system/kafka.service
Введите в файл следующее определение единицы измерения:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Раздел [Unit]
указывает, что этот файл модуля зависит от zookeeper.service
. Это обеспечит автоматический zookeeper
при запуске службы kafka
.
В разделе [Service]
указано, что systemd
должен использовать файлы оболочки kafka-server-start.sh
и kafka-server-stop.sh
для запуска и остановки службы. Он также указывает, что Kafka должен перезапускаться автоматически в случае аварийного завершения работы.
Теперь, когда единицы измерения определены, запустите Kafka с помощью следующей команды:
sudo systemctl start kafka
Чтобы убедиться, что сервер успешно запущен, проверьте журналы журнала для диска kafka
:
sudo journalctl -u kafka
Вы увидите вывод, подобный следующему:
-- Logs begin at Tue 2020-03-03 19:48:52 CET, end at Tue 2020-03-03 20:51:40 CET. --
Mar 03 20:51:37 TEST-SERVER-1 systemd[1]: Started kafka.service.
Теперь у вас есть сервер Kafka, прослушивающий порт 9092
, который является портом по умолчанию для Kafka.
Вы запустили службу kafka
, но если перезапустить сервер, она не запустится автоматически. Чтобы включить kafka
при запуске сервера, запустите:
sudo systemctl enable kafka
Теперь, когда вы запустили и включили службы, пришло время проверить установку.
Шаг 5. Проверьте установку
Мы публикуем сообщение Hello World
, чтобы убедиться, что сервер Kafka работает правильно. Для публикации сообщений в Kafka требуется:
- Производитель, позволяющий публиковать записи и данные по темам.
- Потребитель, который читает сообщения и данные из топиков.
Сначала создайте тему TutorialTopic
, набрав:
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
Вы можете создать производителя из командной строки, используя скрипт kafka-console-producer.sh
. Ожидайте имя хоста, порт и имя в качестве аргумента сервера Kafka.
Опубликуйте строку Hello, World
в теме TutorialTopic
, введя:
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
Флаг --broker-list
определяет список брокеров сообщений для отправки сообщения, в данном случае localhost:9092
. --topic
обозначает тему как TutorialTopic
.
Затем вы можете создать потребителя Kafka с помощью сценария kafka-console-consumer.sh
. Ожидайте имя хоста и порт сервера ZooKeeper и имя в качестве аргументов.
Следующая команда использует сообщения из TutorialTopic
. Обратите внимание на использование флага --from-beginning
, который позволяет использовать сообщения, опубликованные до запуска потребителя:
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
--bootstrap-server
предоставляет список входных данных для кластера Kafka. В этом случае вы используете localhost:9092
.
Вы увидите Hello, World
в своем терминале:
Hello, World
Сценарий будет продолжать работать, ожидая публикации новых сообщений по теме. Не стесняйтесь открывать новый терминал и запускать продюсера, чтобы опубликовать еще несколько сообщений. Вы должны иметь возможность видеть их все в потребительском выводе. Если вы хотите узнать больше о том, как использовать Kafka, ознакомьтесь с официальной документацией Kafka.
Когда тест завершится, нажмите CTRL+C
, чтобы остановить потребительский скрипт. Теперь, когда вы протестировали установку, вы можете перейти к установке KafkaT, чтобы лучше управлять кластером Kafka.
Шаг 6. Установите KafkaT (необязательно)
KafkaT — это инструмент Airbnb, который позволяет легко просматривать сведения о кластере Kafka и выполнять определенные административные задачи из командной строки. Поскольку это драгоценный камень Ruby, вам понадобится Ruby для его использования. Вам также понадобится пакет build-essential
, чтобы иметь возможность создавать другие драгоценные камни, от которых он зависит. Установите их с помощью apt
:
sudo apt install ruby ruby-dev build-essential
Теперь вы можете установить KafkaT с помощью команды gem
:
sudo CFLAGS=-Wno-error=format-overflow gem install kafkat
Параметр CFLAGS=-Wno-error=format-overflow
отключает предупреждения о переполнении формата и требуется для драгоценного камня ZooKeeper, который является зависимостью KafkaT.
KafkaT использует .kafkatcfg
в качестве файла конфигурации для определения каталогов установки и реестра сервера Kafka. В нем также должна быть запись, указывающая KafkaT на экземпляр ZooKeeper.
Создайте новый файл с именем .kafkatcfg
:
nano ~/.kafkatcfg
Добавьте следующие строки, чтобы указать необходимую информацию о вашем сервере Kafka и экземпляре Zookeeper:
{
"kafka_path": "~/kafka",
"log_path": "/tmp/kafka-logs",
"zk_path": "localhost:2181"
}
Теперь вы готовы использовать KafkaT. Для начала вот как вы можете использовать его для просмотра сведений обо всех разделах Kafka:
kafkat partitions
Вы увидите следующий вывод:
OutputTopic Partition Leader Replicas ISRs
TutorialTopic 0 0 [0] [0]
__consumer_offsets 0 0 [0] [0]...
Этот вывод показывает TutorialTopic
, а также __consumer_offsets
— внутренний аргумент, используемый Kafka для хранения информации о клиенте. Вы можете спокойно игнорировать строки, начинающиеся с __consumer_offsets
.
Чтобы узнать больше о KafkaT, обратитесь к репозиторию GitHub.
Теперь, когда у вас установлен KafkaT, вы можете дополнительно настроить Kafka на кластере серверов Debian 10 для создания многоузлового кластера.
Шаг 7. Настройте многоузловой кластер (необязательно)
Если вы хотите создать кластер с несколькими брокерами, используя несколько серверов Debian 10, повторите шаги 1, 4 и 5 на каждой из новых машин. Также внесите следующие изменения в файл ~/kafka/config/server.properties
для каждого нового сервера:
- Измените значение свойства
broker.id
, чтобы оно было уникальным в кластере. Это свойство однозначно идентифицирует каждый сервер в кластере и может иметь любую строку в качестве значения. Например,"server1"
,"server2"
и т. д. можно было бы использовать в качестве идентификаторов. - Измените значение свойства
zookeeper.connect
, чтобы все узлы указывали на один и тот же экземпляр ZooKeeper. Это свойство указывает адрес экземпляра ZooKeeper в формате<HOSTNAME/IP_ADDRESS>:<PORT>
. В этом руководстве вы должны использоватьyour_first_server_IP:2181
, заменивyour_first_server_IP
IP-адресом сервера Debian 10, который вы уже установили.
Если вы хотите иметь несколько экземпляров ZooKeeper для кластера, значение свойства zookeeper.connect
на каждом узле должно быть идентичной строкой, разделенной запятыми, в которой перечислены IP-адреса и номера портов всех экземпляров ZooKeeper.
Примечание.2181
Шаг 8. Ограничьте пользователя Kafka
Теперь, когда все установки завершены, вы можете удалить права администратора пользователя kafka
. Прежде чем продолжить, выйдите из системы и снова войдите в систему как любой другой пользователь sudo без полномочий root. Если вы все еще используете тот же сеанс оболочки, с которого начали это руководство, просто введите exit
.
Удалите пользователя kafka
из группы sudo:
sudo deluser kafka sudo
Чтобы еще больше повысить безопасность вашего сервера Kafka, заблокируйте пароль пользователя kafka
с помощью команды passwd
. Это гарантирует, что никто не сможет напрямую получить доступ к серверу, используя эту учетную запись:
sudo passwd kafka -l
На данный момент только пользователь root или sudo может войти в систему как kafka
, введя следующую команду:
sudo su - kafka
В будущем, если вы захотите его разблокировать, используйте команду passwd
с параметром -u
:
sudo passwd kafka -u
Теперь вы успешно ограничили права администратора пользователя kafka
.
Вывод
Apache Kafka теперь безопасно работает на вашем сервере Debian. Вы можете использовать его в своих проектах, создавая производителей и потребителей Kafka с помощью клиентов Kafka, доступных для большинства языков программирования. Чтобы узнать больше о Kafka, вы также можете обратиться к документации Apache Kafka.