Ниже приведено пошаговое руководство, которое научит вас устанавливать и запускать Apache Zookeeper и Apache Kafka в ОС Windows.
Введение
В этой статье описывается, как настроить и запустить Apache Kafka в ОС Windows. Это руководство поможет вам установить Java и Apache Zookeeper.
Apache Kafka — это быстрая и масштабируемая очередь сообщений, которая может обрабатывать большие нагрузки чтения и записи, то есть операции, связанные с вводом-выводом. Для получения дополнительной информации см.http://kafka.apache.org. Поскольку Zookeeper может предоставлять надежные службы распределенной координации, Apache Kafka должен запускать экземпляр Zookeeper. Для получения дополнительной информации о Zookeeper, пожалуйста, проверьтеhttps://zookeeper.apache.org/。
Конкретные шаги по установке Kafka в Windows вы можете посмотреть в этом видео:https://youtu.be/OJKesEpO6ok
Скачать необходимые документы
- В зависимости от ОС и архитектуры процессора загрузите сервер JRE здесьhttp://www.oracle.com/technetwork/java/javase/downloads/jre8-downloads-2133155.html
- Загрузите и установите 7-zip здесьhttp://www.7-zip.org/download.html
- Загрузите Zookeeper здесь и разархивируйте его с помощью 7-ziphttp://zookeeper.apache.org/releases.html
- Загрузите Kafka здесь и разархивируйте его с помощью 7-ziphttp://kafka.apache.org/downloads.html
В этом руководстве мы извлечем Zookeeper и Kafka на диск C, но вы также можете выбрать другие места. Здесь мы хотим использовать полную версию zookeeper, а не ту, что поставляется с Kafka, потому что это экземпляр Zookeeper с одним узлом. Вы также можете запустить Kafka в пакете с zookeeper, он находится в библиотеке \ kafka \ bin \ windows.
монтаж
A. Установка JDK
1. Запустите установку JRE, установите флажок «Изменить целевой путь» и нажмите «Установить».
2. Измените каталог установки, в имени папки не должно быть пробелов, например: C: \ Java \ jre1.8.0_xx \ (по умолчанию C: \ Program Files \ Java \ jre1.8.0_xx), а затем нажмите Далее.
3. Теперь нажмите «Панель управления» -> «Система» -> «Дополнительные параметры системы» -> «Переменные среды», чтобы открыть диалоговое окно «Переменные системной среды».
4. Нажмите кнопку новой пользовательской переменной в пользовательских переменных, затем введите JAVA_HOME в имя переменной и заполните значение переменной пользовательским путем jre. Как показано ниже:
Путь и версия Java могут меняться в зависимости от используемой версии Kafka.
5. Теперь нажмите ОК.
6. В только что открытом диалоговом окне «Переменные среды» есть столбец «Системные переменные», найдите в нем переменные пути.
7. Измените путь и введите «;% JAVA_HOME% \ bin», как показано ниже:
8. Подтвердите установку Java, откройте cmd, введите «java -version», вы должны увидеть только что установленную версию java.
Если приглашение командной строки похоже на изображение выше, продолжайте. В противном случае необходимо еще раз проверить, соответствует ли установленная версия архитектуре ОС (x86, x64) и правильный ли путь к переменной среды.
Б. Установка Zookeeper
1. Войдите в каталог настроек Zookeeper, например C: \ zookeeper-3.4.7 \ conf
2. Переименуйте «zoo_sample.cfg» в «zoo.cfg».
3. Откройте zoo.cfg в любом текстовом редакторе (например, в блокноте), я лично предпочитаю блокнот ++.
4. Найдите и отредактируйте dataDir = / tmp / zookeeper в: \ zookeeper-3.4.7 \ data.
5. Как и в Java, мы добавляем в системные переменные среды:
а. Добавьте ZOOKEEPER_HOME = C: \ zookeeper-3.4.7 в системные переменные
б. Отредактируйте системную переменную и назовите ее как путь Системная переменная% ZOOKEEPER_HOME% \ bin;
6. Измените порт Zookeeper по умолчанию (порт по умолчанию 2181) в файле zoo.cfg.
7. Откройте новый cmd, введите zkserver и запустите Zookeeper.
8. Приглашение командной строки выглядит следующим образом:
Поздравляем, Zookeeper готов и работает на порту 2181.
В. Установить Kafka
- Войдите в каталог конфигурации Kafka, например C: \ kafka_2.11-0.9.0.0 \ config.
- Отредактируйте файл server.properties
- Найдите и отредактируйте «log.dirs = / tmp / kafka-logs» на «log.dir = C: \ kafka_2.11-0.9.0.0 \ kafka-logs»
- Если Zookeeper запущен на другом компьютере или кластере, вы можете изменить «zookeeper.connect: 2181» на собственный IP-адрес и порт. В этой демонстрации мы использовали ту же машину, поэтому нет необходимости изменять ее. Также можно настроить порт Kafka и broker.id в файле. Остальные настройки остаются без изменений.
- Kafka по умолчанию будет работать на порту 9092 и подключаться к порту по умолчанию zookeeper: 2181.
D. Запустите сервер Kafka
Важно: перед запуском сервера Kafka убедитесь, что экземпляр Zookeeper готов и работает.
1. Войдите в каталог установки Kafka C: \ kafka_2.11-0.9.0.0 \
2. Нажмите Shift + правая клавиша, выберите параметр «Открыть окно команд», чтобы открыть командную строку.
3. Теперь введите. \ bin \ windows \ kafka-server-start.bat. \ config \ server.properties и нажмите Enter.
.\bin\windows\kafka-server-start.bat .\config\server.properties
4. Если все в порядке, командная строка должна выглядеть так:
5. Теперь, когда Kafka готов и работает, вы можете создавать темы для хранения сообщений. Мы также можем генерировать или использовать данные из кода Java / Scala или непосредственно из командной строки.
E. Создайте тему
- Теперь создайте тему, назовите ее «test» и коэффициент репликации = 1 (потому что работает только 1 сервер Kafka). Если в кластере работает более одного сервера Kafka, вы можете соответственно увеличить коэффициент репликации, чтобы улучшить доступность данных и отказоустойчивость системы.
- Откройте новую командную строку в C: \ kafka_2.11-0.9.0.0 \ bin \ windows.
- Введите следующую команду и нажмите Enter:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
F. Создайте производителя и потребителя для тестирования сервера.
1. Откройте новую командную строку в C: \ kafka_2.11-0.9.0.0 \ bin \ windows.
2. Введите следующую команду, чтобы запустить производитель:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
3. Снова откройте новую командную строку в том же месте C: \ kafka_2.11-0.9.0.0 \ bin \ windows.
4. Теперь введите следующую команду для запуска потребителя:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
5. Теперь есть два окна командной строки, как показано ниже:
6. Введите любой контент в командную строку производителя и нажмите Enter, вы увидите соответствующее сообщение в других командных строках потребителя.
7. Если сообщение можно отправить потребителю и отобразить, установка Kafka завершена.
Некоторые полезные команды
1. Список тем: kafka-themes.bat –list –zookeeper localhost: 2181
2. Опишите тему: kafka-themes.bat –describe –zookeeper localhost: 2181 –topic [Название темы]
3. Прочтите сообщение с самого начала: kafka-console-consumer.bat –zookeeper localhost: 2181 –topic [Название темы] –from-begin
4. Удалите тему: kafka-run-class.bat kafka.admin.TopicCommand –delete –topic [topic_to_delete] –zookeeper localhost: 2181
https://dzone.com/articles/running-apache-kafka-on-windows-os
Исходный адрес:Setting Up and Running Apache Kafka on Windows OS
Автор статьи: Гопал Тивари
Переводчик: Сунь Вэй
Ответственный редактор: Чжун Хао
Время на прочтение
11 мин
Количество просмотров 205K
Данная статья будет полезной тем, кто только начал знакомиться с микросервисной архитектурой и с сервисом Apache Kafka. Материал не претендует на подробный туториал, но поможет быстро начать работу с данной технологией. Я расскажу о том, как установить и настроить Kafka на Windows 10. Также мы создадим проект, используя Intellij IDEA и Spring Boot.
Зачем?
Трудности в понимании тех или иных инструментов часто связаны с тем, что разработчик никогда не сталкивался с ситуациями, в которых эти инструменты могут понадобиться. С Kafka всё обстоит точно также. Опишем ситуацию, в которой данная технология будет полезной. Если у вас монолитная архитектура приложения, то разумеется, никакая Kafka вам не нужна. Всё меняется с переходом на микросервисы. По сути, каждый микросервис – это отдельная программа, выполняющая ту или иную функцию, и которая может быть запущена независимо от других микросервисов. Микросервисы можно сравнить с сотрудниками в офисе, которые сидят за отдельными столами и независимо от коллег решают свою задачу. Работа такого распределённого коллектива немыслима без централизованной координации. Сотрудники должны иметь возможность обмениваться сообщениями и результатами своей работы между собой. Именно эту проблему и призвана решить Apache Kafka для микросервисов.
Apache Kafka является брокером сообщений. С его помощью микросервисы могут взаимодействовать друг с другом, посылая и получая важную информацию. Возникает вопрос, почему не использовать для этих целей обычный POST – reqest, в теле которого можно передать нужные данные и таким же образом получить ответ? У такого подхода есть ряд очевидных минусов. Например, продюсер (сервис, отправляющий сообщение) может отправить данные только в виде response’а в ответ на запрос консьюмера (сервиса, получающего данные). Допустим, консьюмер отправляет POST – запрос, и продюсер отвечает на него. В это время консьюмер по каким-то причинам не может принять полученный ответ. Что будет с данными? Они будут потеряны. Консьюмеру снова придётся отправлять запрос и надеяться, что данные, которые он хотел получить, за это время не изменились, и продюсер всё ещё готов принять request.
Apache Kafka решает эту и многие другие проблемы, возникающие при обмене сообщениями между микросервисами. Не лишним будет напомнить, что бесперебойный и удобный обмен данными – одна из ключевых проблем, которую необходимо решить для обеспечения устойчивой работы микросервисной архитектуры.
Установка и настройка ZooKeeper и Apache Kafka на Windows 10
Первое, что надо знать для начала работы — это то, что Apache Kafka работает поверх сервиса ZooKeeper. ZooKeeper — это распределенный сервис конфигурирования и синхронизации, и это всё, что нам нужно знать о нём в данном контексте. Мы должны скачать, настроить и запустить его перед тем, как начать работу с Kafka. Прежде чем начать работу с ZooKeeper, убедитесь, что у вас установлен и настроен JRE.
Скачать свежею версию ZooKeeper можно с официального сайта.
Извлекаем из скаченного архива ZooKeeper`а файлы в какую-нибудь папку на диске.
В папке zookeeper с номером версии, находим папку conf и в ней файл “zoo_sample.cfg”.
Копируем его и меняем название копии на “zoo.cfg”. Открываем файл-копию и находим в нём строчку dataDir=/tmp/zookeeper. Прописываем в данной строчке полный путь к нашей папке zookeeper-х.х.х. У меня это выглядит так: dataDir=C:\\ZooKeeper\\zookeeper-3.6.0
Теперь добавим системную переменную среды: ZOOKEEPER_HOME = C:\ ZooKeeper \zookeeper-3.4.9 и в конце системной переменной Path добавим запись: ;%ZOOKEEPER_HOME%\bin;
Запускаем командную строку и пишем команду:
zkserver
Если всё сделано правильно, вы увидите примерно следующее.
Это означает, что ZooKeeper стартанул нормально. Переходим непосредственно к установке и настройке сервера Apache Kafka. Скачиваем свежую версию с официального сайта и извлекаем содержимое архива: kafka.apache.org/downloads
В папке с Kafka находим папку config, в ней находим файл server.properties и открываем его.
Находим строку log.dirs= /tmp/kafka-logs и указываем в ней путь, куда Kafka будет сохранять логи: log.dirs=c:/kafka/kafka-logs.
В этой же папке редактируем файл zookeeper.properties. Строчку dataDir=/tmp/zookeeper меняем на dataDir=c:/kafka/zookeeper-data, не забывая при этом, после имени диска указывать путь к своей папке с Kafka. Если вы всё сделали правильно, можно запускать ZooKeeper и Kafka.
Для кого-то может оказаться неприятной неожиданностью, что никакого GUI для управления Kafka нет. Возможно, это потому, что сервис рассчитан на суровых нёрдов, работающих исключительно с консолью. Так или иначе, для запуска кафки нам потребуется командная строка.
Сначала надо запустить ZooKeeper. В папке с кафкой находим папку bin/windows, в ней находим файл для запуска сервиса zookeeper-server-start.bat, кликаем по нему. Ничего не происходит? Так и должно быть. Открываем в этой папке консоль и пишем:
start zookeeper-server-start.bat
Опять не работает? Это норма. Всё потому что zookeeper-server-start.bat для своей работы требует параметры, прописанные в файле zookeeper.properties, который, как мы помним, лежит в папке config. Пишем в консоль:
start zookeeper-server-start.bat c:\kafka\config\zookeeper.properties
Теперь всё должно стартануть нормально.
Ещё раз открываем консоль в этой папке (ZooKeeper не закрывать!) и запускаем kafka:
start kafka-server-start.bat c:\kafka\config\server.properties
Для того, чтобы не писать каждый раз команды в командной строке, можно воспользоваться старым проверенным способом и создать батник со следующим содержимым:
start C:\kafka\bin\windows\zookeeper-server-start.bat C:\kafka\config\zookeeper.properties
timeout 10
start C:\kafka\bin\windows\kafka-server-start.bat C:\kafka\config\server.properties
Строка timeout 10 нужна для того, чтобы задать паузу между запуском zookeeper и kafka. Если вы всё сделали правильно, при клике на батник должны открыться две консоли с запущенным zookeeper и kafka.Теперь мы можем прямо из командной строки создать продюсера сообщений и консьюмера с нужными параметрами. Но, на практике это может понадобиться разве что для тестирования сервиса. Гораздо больше нас будет интересовать, как работать с kafka из IDEA.
Работа с kafka из IDEA
Мы напишем максимально простое приложение, которое одновременно будет и продюсером и консьюмером сообщения, а затем добавим в него полезные фичи. Создадим новый спринг-проект. Удобнее всего делать это с помощью спринг-инициалайзера. Добавляем зависимости org.springframework.kafka и spring-boot-starter-web
В итоге файл pom.xml должен выглядеть так:
Для того, чтобы отправлять сообщения, нам потребуется объект KafkaTemplate<K, V>. Как мы видим объект является типизированным. Первый параметр – это тип ключа, второй – самого сообщения. Пока оба параметра мы укажем как String. Объект будем создавать в классе-рестконтроллере. Объявим KafkaTemplate и попросим Spring инициализировать его, поставив аннотацию Autowired.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
В принципе, наш продюсер готов. Всё что осталось сделать – это вызвать у него метод send(). Имеется несколько перегруженных вариантов данного метода. Мы используем в нашем проекте вариант с 3 параметрами — send(String topic, K key, V data). Так как KafkaTemplate типизирован String-ом, то ключ и данные в методе send будут являться строкой. Первым параметром указывается топик, то есть тема, в которую будут отправляться сообщения, и на которую могут подписываться консьюмеры, чтобы их получать. Если топик, указанный в методе send не существует, он будет создан автоматически. Полный текст класса выглядит так.
@RestController
@RequestMapping("msg")
public class MsgController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping
public void sendOrder(String msgId, String msg){
kafkaTemplate.send("msg", msgId, msg);
}
}
Контроллер мапится на localhost:8080/msg, в теле запроса передаётся ключ и само сообщений.
Отправитель сообщений готов, теперь создадим слушателя. Spring так же позволяет cделать это без особых усилий. Достаточно создать метод и пометить его аннотацией @KafkaListener, в параметрах которой можно указать только топик, который будет слушаться. В нашем случае это выглядит так.
@KafkaListener(topics="msg")
У самого метода, помеченного аннотацией, можно указать один принимаемый параметр, имеющий тип сообщения, передаваемого продюсером.
Класс, в котором будет создаваться консьюмер необходимо пометить аннотацией @EnableKafka.
@EnableKafka
@SpringBootApplication
public class SimpleKafkaExampleApplication {
@KafkaListener(topics="msg")
public void msgListener(String msg){
System.out.println(msg);
}
public static void main(String[] args) {
SpringApplication.run(SimpleKafkaExampleApplication.class, args);
}
}
Так же в файле настроек application.property необходимо указать параметр консьюмера groupe-id. Если этого не сделать, приложение не запустится. Параметр имеет тип String и может быть любым.
spring.kafka.consumer.group-id=app.1
Наш простейший кафка-проект готов. У нас есть отправитель и получатель сообщений. Осталось только запустить. Для начала запускаем ZooKeeper и Kafka с помощью батника, который мы написали ранее, затем запускаем наше приложение. Отправлять запрос удобнее всего с помощью Postman. В теле запроса не забываем указывать параметры msgId и msg.
Если мы видим в IDEA такую картину, значит всё работает: продюсер отправил сообщение, консьюмер получил его и вывел в консоль.
Усложняем проект
Реальные проекты с использованием Kafka конечно же сложнее, чем тот, который мы создали. Теперь, когда мы разобрались с базовыми функциями сервиса, рассмотрим, какие дополнительные возможности он предоставляет. Для начала усовершенствуем продюсера.
Если вы открывали метод send(), то могли заметить, что у всех его вариантов есть возвращаемое значение ListenableFuture<SendResult<K, V>>. Сейчас мы не будем подробно рассматривать возможности данного интерфейса. Здесь будет достаточно сказать, что он нужен для просмотра результата отправки сообщения.
@PostMapping
public void sendMsg(String msgId, String msg){
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("msg", msgId, msg);
future.addCallback(System.out::println, System.err::println);
kafkaTemplate.flush();
}
Метод addCallback() принимает два параметра – SuccessCallback и FailureCallback. Оба они являются функциональными интерфейсами. Из названия можно понять, что метод первого будет вызван в результате успешной отправки сообщения, второго – в результате ошибки.Теперь, если мы запустим проект, то увидим на консоли примерно следующее:
SendResult [producerRecord=ProducerRecord(topic=msg, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=Hello, world!, timestamp=null), recordMetadata=msg-0@6]
Посмотрим ещё раз внимательно на нашего продюсера. Интересно, что будет если в качестве ключа будет не String, а, допустим, Long, а в качестве передаваемого сообщения и того хуже – какая-нибудь сложная DTO? Попробуем для начала изменить ключ на числовое значение…
Если мы укажем в продюсере в качестве ключа Long, то приложение нормально запуститься, но при попытке отправить сообщение будет выброшен ClassCastException и будет сообщено, что класс Long не может быть приведён к классу String.
Если мы попробуем вручную создать объект KafkaTemplate, то увидим, что в конструктор в качестве параметра передаётся объект интерфейса ProducerFactory<K, V>, например DefaultKafkaProducerFactory<>. Для того, чтобы создать DefaultKafkaProducerFactory, нам нужно в его конструктор передать Map, содержащий настройки продюсера. Весь код по конфигурации и созданию продюсера вынесем в отдельный класс. Для этого создадим пакет config и в нём класс KafkaProducerConfig.
@Configuration
public class KafkaProducerConfig {
private String kafkaServer="localhost:9092";
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaServer);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<Long, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<Long, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
В методе producerConfigs() создаём мапу с конфигурациями и в качестве сериализатора для ключа указываем LongSerializer.class. Запускаем, отправляем запрос из Postman и видим, что теперь всё работает, как надо: продюсер отправляет сообщение, а консьюмер принимает его.
Теперь изменим тип передаваемого значения. Что если у нас не стандартный класс из библиотеки Java, а какой-нибудь кастомный DTO. Допустим такой.
@Data
public class UserDto {
private Long age;
private String name;
private Address address;
}
@Data
@AllArgsConstructor
public class Address {
private String country;
private String city;
private String street;
private Long homeNumber;
private Long flatNumber;
}
Для отправки DTO в качестве сообщения, нужно внести некоторые изменения в конфигурацию продюсера. В качестве сериализатора значения сообщения укажем JsonSerializer.class и не забудем везде изменить тип String на UserDto.
@Configuration
public class KafkaProducerConfig {
private String kafkaServer="localhost:9092";
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaServer);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<Long, UserDto> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<Long, UserDto> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Отправим сообщение. В консоль будет выведена следующая строка:
Теперь займёмся усложнением консьюмера. До этого наш метод public void msgListener(String msg), помеченный аннотацией @KafkaListener(topics=«msg») в качестве параметра принимал String и выводил его на консоль. Как быть, если мы хотим получить другие параметры передаваемого сообщения, например, ключ или партицию? В этом случае тип передаваемого значения необходимо изменить.
@KafkaListener(topics="msg")
public void orderListener(ConsumerRecord<Long, UserDto> record){
System.out.println(record.partition());
System.out.println(record.key());
System.out.println(record.value());
}
Из объекта ConsumerRecord мы можем получить все интересующие нас параметры.
Мы видим, что вместо ключа на консоль выводятся какие-то кракозябры. Это потому, что для десериализации ключа по умолчанию используется StringDeserializer, и если мы хотим, чтобы ключ в целочисленном формате корректно отображался, мы должны изменить его на LongDeserializer. Для настройки консьюмера в пакете config создадим класс KafkaConsumerConfig.
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String kafkaServer;
@Value("${spring.kafka.consumer.group-id}")
private String kafkaGroupId;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
return props;
}
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Long, UserDto> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Long, UserDto> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
}
Класс KafkaConsumerConfig очень похож на KafkaProducerConfig, который мы создавали ранее. Здесь так же присутствует Map, содержащий необходимые конфигурации, например, такие как десериализатор для ключа и значения. Созданная мапа используется при создании ConsumerFactory<>, которая в свою очередь, нужна для создания KafkaListenerContainerFactory<?>. Важная деталь: метод возвращающий KafkaListenerContainerFactory<?> должен называться kafkaListenerContainerFactory(), иначе Spring не сможет найти нужного бина и проект не скомпилируется. Запускаем.
Видим, что теперь ключ отображается как надо, а это значит, что всё работает. Конечно, возможности Apache Kafka далеко выходят за пределы тех, что описаны в данной статье, однако, надеюсь, прочитав её, вы составите представление о данном сервисе и, самое главное, сможете начать работу с ним.
Мойте руки чаще, носите маски, не выходите без необходимости на улицу, и будьте здоровы.
Prerequisites:
Install JRE is mandatory because Zookeeper is written Java so we will need JDK as well.
Zookeeper installation:
Step 1: Download Zookeeper from apache official website:
Zookeeper Download
Step 2: Goto your Zookeeper config directory. For example my local driver like C:/zookeeper – 3.x.xx\conf
2.After that simply rename file “zoo_sample.cfg” to “zoo.cfg”
3.Open zoo.cfg in notepad++ for proper view compared to other notepads.
4.Then find & edit dataDir = /tmp/zookeeper to C:/zookeeper – 3.x.xx\data
5. In this step, we need to setup System Environment Variables for Java-like ZOOKEEPER _HOME and specific PATH
A. Here we need to add in new system variable ZOOKEEPER_HOME = C:\zookeeper-3.x.xx\data
B. After that edit system variable named “Path” add; %ZOOKEEPER_HOME%\bin;
6. In case if you want to change the Zookeeper port in zoo.cfg file In Zookeeper default port is 2181.
7. The open new command prompt and run the zkserver
Summary: In the Windows 10 operating system Zookeeper installation is very simple by following the above steps. In this article, we will provide a simple installation of Zookeeper with pictures for Windows 10 users. Here first download the Zookeeper zip file from Apache Zookeeper official website. Then configure the Zookeeper configuration file with the proper directory then setup JAVA environment variables for Zookeeper Home for accessing anywhere from the command prompt, in case it is not set up we need to go Zookeeper path then only run the zk server so Zookeeper_ HOME setup is mandatory.
Today, I will talk about how to install Apache ZooKeeper and run the instance of it.
Prerequisites: JRE 1.6 or higher required (for development purposes, I would install JDK instead 1.6 or higher) At the time of writing this blog, the current stable version of Java is 1.8 and that should work perfectly fine (I have 1.7.0_51)
NOTE: I noticed that some of my peers tend to forget to set the environment variables, so please remember to set them before proceeding.
1. Installing Apache ZooKeeper
1. Download Apache ZooKeeper. You can choose from any given mirror – http://www.apache.org/dyn/closer.cgi/zookeeper/
2. Extract it to where you want to install ZooKeeper. I prefer to save it in the C:\dev\tools
directory. Unless you prefer this way as well, you will have to create that directory yourself.
3. Set up the environment variable.
- To do this, first go to Computer, then click on the
System Properties
button. - Click on the
Advanced System Settings
link to the left. - On a new window pop-up, click on the
Environment Variables...
button. - Under the
System Variables
section, clickNew...
- For the
Variable Name
, type inZOOKEEPER_HOME. Variable Value
will be the directory of where you installed the ZooKeeper. Taking mine for example, it would beC:\dev\tools\zookeeper-3.x.x
. - Now we have to edit the
PATH
variable. SelectPath
from the list and clickEdit...
- It is VERY important that you DO NOT erase the pre-existing value of the
Path
variable. At the very end of the variable value, add the following:%ZOOKEEPER_HOME%\bin;
Also, each value needs to be separated by semicolon. - Once that’s done, click OK and exit out of them all.
That takes care of the ZooKeeper installation part. Now we have to configure it so the instance of ZooKeeper will run properly.
2. Configuring ZooKeeper Server
If you look at the <zookeeper-install-directory>
there should be a conf
folder. Open that up, and then you’ll see a zoo-sample.cfg
file. Copy and paste it in the same directory, it should produce a zoo-sample - Copy.cfg
file. Open that with your favorite text editor (Microsoft Notepad should work as well).
Edit the file as follows:
tickTime=2000 initLimit=5 syncLimit=5 dataDir=/usr/zookeeper/data clientPort=2181 server.1=localhost:2888:3888
NOTE: you really don’t need lines 2 (initLimit=5
), 3 (syncLimit=5
), and 6 (server.1=localhost:2888:3888
). They’re just there for a good practice purposes, and especially for setting up a multi-server cluster, which we are not going to do here.
Save it as zoo.cfg
. Also the original zoo-sample.cfg
file, go ahead and delete it, as it is not needed.
Next step is to create a myid
file. If you noticed earlier in the zoo.cfg
file, we wrote dataDir=/usr/zookeeper/data
. This is actually a directory you’re going to have to create in the C drive. Simply put, this is the directory that ZooKeeper is going to look at to identify that instance of ZooKeeper. We’re going to write 1 in that file.
So go ahead and create that usr/zookeeper/data
directory, and then open up your favorite text editor.
Just type in 1, and save it as myid
, set the file type as All files
. This may not be insignificant, but we are going to not provide it any file extension, this is just for the convention.
Don’t worry about the version-2
directory from the picture. That is automatically generated once you start the instance of ZooKeeper server.
At this point, you should be done configuring ZooKeeper. Now close out of everything, click the Start button, and open up a command prompt.
3. Test an Instance of Running ZooKeeper Server
Type in the following command: zkServer.cmd
and hit enter. You should get some junk like this that don’t mean much to us.
Now open up another command prompt in a new window. Type in the following command: zkCli.cmd
and hit enter. Assuming you did everything correctly, you should get [zk: localhost:2181<CONNECTED> 0]
at the very last line. See picture below:
If you are getting the same result, then you setup ZooKeeper server correctly. Thanks for reading, and happy zookeeping!
Apache Kafka is a distributed streaming platform. It provides a unified, high-throughput, low-latency platform for handling real-time data feeds. Kafka can be used as a Messaging system like ActiveMQ or RabbitMQ. It supports fault tolerance using a replica set within the cluster. Kafka can be used for storing the data and stream processing to read the data in nearly real-time. The producers produce the message to topics. The consumer consumes the data from topics.
Kafka contains broker, topics, and replica set objects. Kafka support many clients include java, C++, Python, and more. https://cwiki.apache.org/confluence/display/KAFKA/Clients
Apache Kafka supports the following use case with many different domains including financial, IOT and more.
Messaging
Website Activity Tracking
Metrics
Log Aggregation
Stream Processing
Event Sourcing
Commit Log
Apache Kafka uses Zookeeper for managing the Kafka components in the cluster. ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All these kinds of services are used in some form or another by distributed applications. ZooKeeper is a consistent file system for configuration information.
Download the latest Kafka (2.5.0) from the following location
apache.org/dyn/closer.cgi?path=/kafka/2.5.0..
download the latest zookeeper (Ex 3.6.1) from the following location
https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
download the latest released version of JDK 1.8
https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u252-b09.1/OpenJDK8U-jdk_x64_windows_hotspot_8u252b09.msi
Install JDK Install the open JDK and set the following variable in windows path
User variables JAVA_HOME= C:\java\openjdk\jdk-8.0.252.09-hotspot\
Append System variables Path variable ( Ex PATH=% JAVA_HOME%\bin;). Run the following command to confirm the java installation.
C:>java -version
openjdk version «1.8.0_252»
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_252-b09)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.252-b09, mixed mode)
Install Zookeeper
Extract zookeeper and open the zookeeper configuration file
Copy C:\software\apache-zookeeper-3.6.1-bin\conf\ zoo_sample.cfg too zoo.cfg.
Update the following property
Run the following command
C:\software\apache-zookeeper-3.6.1-bin\bin\zkServer.cmd
Make sure zookeeper runs successfully and listen the port 2181.
Install Kafka Extract Kafka and open the configuration files. The default configuration provided with the Kafka distribution is sufficient to run the single node Kafka. broker.id should be unique in the environment. the default stand-alone configuration uses a single broker only. Kafka uses the default listener on TCP port 9092. the log directory should be writable. Kafka persists all messages to disk specified in the log.dirs configuration.
Run the following command
C:\software\kafka_2.12-2.5.0\bin\windows>kafka-server-start.bat ….\config\server.properties Make sure Kafka runs successfully.
Create topics and list topics The Kafka topics created automatically when auto.create.topics.enable configured. It creates by default using the following use cases
When a producer starts writing messages When a consumer starts reading messages When any client requests metadata
Create the topics
C:\software\kafka_2.12-2.5.0\bin\windows>kafka-topics.bat —create —bootstrap-server localhost:9092 —replication-factor 1 —partitions 1 —topic test
Created topic test.
List the topics
C:\software\kafka_2.12-2.5.0\bin\windows>kafka-topics.bat —list —bootstrap-server localhost:9092
test
Produce and consume Messages The producer writes the messages to the topic. The consumer consumes the messages from the topic. Apache Kafka deploys the build-in client for accessing the producer and consumer API. The command-line utility helps to produce the consume the messages without writing any code and helpful for testing the Kafka installations/inspect the Kafka components.
produce the message
C:\software\kafka_2.12-2.5.0\bin\windows>kafka-console-producer.bat —bootstrap-server localhost:9092 —topic test
consume the messages
C:\software\kafka_2.12-2.5.0\bin\windows>kafka-console-consumer.bat —bootstrap-server localhost:9092 —topic test —from-beginning
Apache Kafka in Spring Boot Application The command-line tools help to understand the concepts. The enterprise application uses high-level programming language to produce and consume the messages using the API. the following section uses Java and Spring boot application to produce and consume the messages.
Producer Kafka producer gives many flexible options for publishing the messages in different use cases. Example credit card transaction requirement doesn’t allow duplicate messages or lose any messages. The producer creates ProducerRecord and publishes the messages using producer API. The producer and consumer set key.serializer and value.serializer properties. The data serialize based on these properties. The Kafka client API provides a set of serialization options. It supports like Avro, Thrift, or Protobuf or custom serialization. if the system requires multiple versions of schema and the system expects to store the schema. We can use a common architecture pattern and use a Schema Registry like Confluent Schema Registry.
The producer set the acks parameter to control the message write policies. It supports write and does not wait for a reply, write a message to leader and returns, write to all in-sync replicas.
The following code set the properties for producers.
public static Properties getProducerProperties(String kafkaBootstrapServers) {
Properties producerProperties = new Properties();
producerProperties.put(«bootstrap.servers», kafkaBootstrapServers);
producerProperties.put(«acks», «all»);
producerProperties.put(«retries», 0);
producerProperties.put(«batch.size», 16384);
producerProperties.put(«linger.ms», 1);
producerProperties.put(«buffer.memory», 33554432);
producerProperties.put(«key.serializer», «org.apache.kafka.common.serialization.StringSerializer»);
producerProperties.put(«value.serializer», «org.apache.kafka.common.serialization.StringSerializer»);
return producerProperties;
}
The Kafka manages and writes the data to the appropriate partition based on the record details. If the messages were successfully written to Kafka, it will return a RecordMetadata object includes the topic, partition, and the offset. Otherwise, it will return an error. The producer sends messages Fire-and-forget, Synchronous, and ASynchronous send options.
private void sendMessagesToKafka(String kafkaBootstrapServers) {
KafkaProducer producer = new KafkaProducer<>(
SimpleKafkaUtility.getProducerProperties(kafkaBootstrapServers));
for (int index = 0; index < 10; index++) {
JSONObject jsonObject = new JSONObject();
try {
jsonObject.put(«index», index);
jsonObject.put(«message», «The index is now: » + index);
} catch (JSONException e) {
System.out.println(e.getMessage());
}
producer.send(new ProducerRecord<>(theTechCheckTopicName, «indexMessge»,jsonObject.toString()));
System.out.println(«Send Message to Kafka:»+ jsonObject.toString());
}
}
Consumer Kafka consumer provides many options to consume messages. The ConsumerRecords return from the poll and iterate the records. The consumer also sets bootstrap.servers, key.serializer, and value.serializer properties. The consumer uses a similar set of properties plus consumer group property. Kafka provides a consumer group which contains the group of consumers.
The following code set properties for consumers.
public static Properties getConsumerProperties(String kafkaBootstrapServers, String zookeeperGroupId) {
Properties consumerProperties = new Properties();
consumerProperties.put(«bootstrap.servers», kafkaBootstrapServers);
consumerProperties.put(«group.id», zookeeperGroupId);
consumerProperties.put(«zookeeper.session.timeout.ms», «6000»);
consumerProperties.put(«zookeeper.sync.time.ms»,»2000″);
consumerProperties.put(«auto.commit.enable», «false»);
consumerProperties.put(«auto.commit.interval.ms», «1000»);
consumerProperties.put(«consumer.timeout.ms», «-1»);
consumerProperties.put(«max.poll.records», «1»);
consumerProperties.put(«value.deserializer», «org.apache.kafka.common.serialization.StringDeserializer»);
consumerProperties.put(«key.deserializer», «org.apache.kafka.common.serialization.StringDeserializer»);
return consumerProperties;
}
The consumer consumes messages from one or more topics. The consumer supports regular expression for giving a set of topics. For example, If the producer publishes the message to topic T with partitions like P1,P2 and P3. The consumer group may contain a group of consumers. If the consumer group contains three consumers, each partition map with a separate consumer. The more time-consuming consumer can’t possibly keep up with the rate data flows into a topic and adding more consumers helps to increase the scalability. When the consumer group adds new consumers or shutdown due to failure, the consumer group rebalance the consumers. the process of moving partition ownership from one consumer to another is called a rebalance.
// Start consumer thread to read messages
Thread kafkaConsumerThread = new Thread(() -> {
System.out.println(«Starting Kafka consumer thread.»);
SimpleKafkaConsumer simpleKafkaConsumer = new SimpleKafkaConsumer(theTechCheckTopicName,
SimpleKafkaUtility.getConsumerProperties(kafkaBootstrapServers, zookeeperGroupId));
simpleKafkaConsumer.runSingleWorker();
});
kafkaConsumerThread.start();
public class SimpleKafkaConsumer {
private KafkaConsumer kafkaConsumer;
public SimpleKafkaConsumer(String theTechCheckTopicName, Properties consumerProperties) {
kafkaConsumer = new KafkaConsumer<>(consumerProperties);
kafkaConsumer.subscribe(Collections.singletonList(theTechCheckTopicName));
}
public void runSingleWorker() {
while (true) {
ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
String message = record.value();
System.out.println(«Received message: » + message);
// Commit the offset
Map commitMessage = new HashMap<>();
commitMessage.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
kafkaConsumer.commitSync(commitMessage);
System.out.println(«Offset committed to Kafka.»);
}
}
}
}
Reference
kafka.apache.org
docs.confluent.io/current/index.html
kafka.apache.org/documentation.html#produce..
kafka.apache.org/documentation.html#consume..