Bin windows zookeeper server start bat config zookeeper properties

I’m learning about Microservice Architecture these days and I need to run Kafka to follow some tutorials. However, zookeeper-server-start on cmd which is the first step for running up Kafka doesn’t work for me.
It says ‘the syntax of the command is incorrect’

I followed process below

  1. download kafka and extarct
    (https://kafka.apache.org/downloads — kafka_2.11-2.1.0.tgz (binary download))

  2. open cmd and write command

These are what I’ve tried so far (at kafka directory)

  • bin\windows\zookeeper-server-start.bat zookeeper.properties
  • .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  • \bin\windows\zookeeper-server-start.bat \config\zookeeper.properties
  • zookeeper-server-start.bat ../../config/zookeeper.properties

and so on… plus, kafka directory is not too long to cause the error.(C:\kafka)

This is an capture image. Second line means 'the syntax of the command is incorrect'

OneCricketeer's user avatar

OneCricketeer

181k19 gold badges134 silver badges249 bronze badges

asked Jan 11, 2019 at 17:27

devyoon's user avatar

5

Change the directory of Kafka to keep simple

Like:

From kafka_2.13-3.0.0 to kafka

It’s worked for me.

answered Jan 17, 2022 at 6:05

Imranmadbar's user avatar

ImranmadbarImranmadbar

4,7413 gold badges18 silver badges30 bronze badges

I have moved Kafka folder directly to a different directory and renamed long Kafka-version as well. After that, it works fine.

answered Jul 17, 2020 at 22:26

Batkhuyag Ochirkhuyag's user avatar

1

Let’s consider you have unzip kafka_xxx.tgz in folder (C:\kafka) then you can use as below command:
C:\kafka\bin\windows>zookeeper-server-start.bat C:\kafka\config\zookeeper.properties
You may get message as «The syntax of the command is incorrect», But you can ignore it.
It will not cause any problem.

If you would like to verify zookeeper is running or not then you can open other command prompt and use below command:
C:\User> jps
Output:
12896 Jps
13264 QuorumPeerMain

answered Jan 11, 2019 at 23:03

Shambhu Nath's user avatar

1

Kafka in window
zookeeper-3.4.10\conf\zoo-sample.cfg
rename: zoo-sample.cfg to zoo.cfg and change into zoo.cfg

dataDir=C:\\Users\\Sumit\\zookeeper-3.4.10\\zookeeper

Now start{zookeeper bin folder}: zkserver


open kafka directory and type following code in cmd-promot

.\bin\windows\kafka-server-start.bat .\config\server.properties

answered Jan 22, 2019 at 12:35

Aarya's user avatar

AaryaAarya

1211 silver badge4 bronze badges

check for correct Path , use from English characters to Path and
Use a shorter address. example :

d:\kafka\

answered Jan 1, 2021 at 15:16

mehnet ali's user avatar

mehnet alimehnet ali

733 silver badges12 bronze badges

I put the kafka folder under c: drive like C:\kafka_2 and also moved the properties files to the windows folder (because it was saying invalid input).
Executed the command like : C:\kafka_2\bin\windows>zookeeper-server-start.bat zookeeper.properties,
and it worked like magic

answered Oct 16, 2022 at 2:42

ADR's user avatar

I faced the same issue as you and the way I solved it, I keep the directory to shortest as possible:
On my side: E:/kafka and the rest of the directories like (bin, config, etc..) will be inside kafka
I hope it works for you.

answered Jan 15 at 16:01

tonydn's user avatar

I have moved path to: .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

worked for me.

answered Jun 1 at 16:05

krishna's user avatar

1

I faced a similar issue [on windows].
tried changing the dataDir variable to

C://path-to-tmp/folder

.
It did not work.
Then changed the drive letter to small i.e.
c:/path-to-tmp/folder
and lo and behold it works like a charm.

True Story.

answered Jan 28, 2021 at 9:24

Vishal R's user avatar

Vishal RVishal R

1,2791 gold badge21 silver badges27 bronze badges

Время на прочтение
11 мин

Количество просмотров 205K

Данная статья будет полезной тем, кто только начал знакомиться с микросервисной архитектурой и с сервисом Apache Kafka. Материал не претендует на подробный туториал, но поможет быстро начать работу с данной технологией. Я расскажу о том, как установить и настроить Kafka на Windows 10. Также мы создадим проект, используя Intellij IDEA и Spring Boot.

Зачем?

Трудности в понимании тех или иных инструментов часто связаны с тем, что разработчик никогда не сталкивался с ситуациями, в которых эти инструменты могут понадобиться. С Kafka всё обстоит точно также. Опишем ситуацию, в которой данная технология будет полезной. Если у вас монолитная архитектура приложения, то разумеется, никакая Kafka вам не нужна. Всё меняется с переходом на микросервисы. По сути, каждый микросервис – это отдельная программа, выполняющая ту или иную функцию, и которая может быть запущена независимо от других микросервисов. Микросервисы можно сравнить с сотрудниками в офисе, которые сидят за отдельными столами и независимо от коллег решают свою задачу. Работа такого распределённого коллектива немыслима без централизованной координации. Сотрудники должны иметь возможность обмениваться сообщениями и результатами своей работы между собой. Именно эту проблему и призвана решить Apache Kafka для микросервисов.

Apache Kafka является брокером сообщений. С его помощью микросервисы могут взаимодействовать друг с другом, посылая и получая важную информацию. Возникает вопрос, почему не использовать для этих целей обычный POST – reqest, в теле которого можно передать нужные данные и таким же образом получить ответ? У такого подхода есть ряд очевидных минусов. Например, продюсер (сервис, отправляющий сообщение) может отправить данные только в виде response’а в ответ на запрос консьюмера (сервиса, получающего данные). Допустим, консьюмер отправляет POST – запрос, и продюсер отвечает на него. В это время консьюмер по каким-то причинам не может принять полученный ответ. Что будет с данными? Они будут потеряны. Консьюмеру снова придётся отправлять запрос и надеяться, что данные, которые он хотел получить, за это время не изменились, и продюсер всё ещё готов принять request.

Apache Kafka решает эту и многие другие проблемы, возникающие при обмене сообщениями между микросервисами. Не лишним будет напомнить, что бесперебойный и удобный обмен данными – одна из ключевых проблем, которую необходимо решить для обеспечения устойчивой работы микросервисной архитектуры.

Установка и настройка ZooKeeper и Apache Kafka на Windows 10

Первое, что надо знать для начала работы — это то, что Apache Kafka работает поверх сервиса ZooKeeper. ZooKeeper — это распределенный сервис конфигурирования и синхронизации, и это всё, что нам нужно знать о нём в данном контексте. Мы должны скачать, настроить и запустить его перед тем, как начать работу с Kafka. Прежде чем начать работу с ZooKeeper, убедитесь, что у вас установлен и настроен JRE.

Скачать свежею версию ZooKeeper можно с официального сайта.

Извлекаем из скаченного архива ZooKeeper`а файлы в какую-нибудь папку на диске.
В папке zookeeper с номером версии, находим папку conf и в ней файл “zoo_sample.cfg”.

Копируем его и меняем название копии на “zoo.cfg”. Открываем файл-копию и находим в нём строчку dataDir=/tmp/zookeeper. Прописываем в данной строчке полный путь к нашей папке zookeeper-х.х.х. У меня это выглядит так: dataDir=C:\\ZooKeeper\\zookeeper-3.6.0

Теперь добавим системную переменную среды: ZOOKEEPER_HOME = C:\ ZooKeeper \zookeeper-3.4.9 и в конце системной переменной Path добавим запись: ;%ZOOKEEPER_HOME%\bin;

Запускаем командную строку и пишем команду:

zkserver

Если всё сделано правильно, вы увидите примерно следующее.

Это означает, что ZooKeeper стартанул нормально. Переходим непосредственно к установке и настройке сервера Apache Kafka. Скачиваем свежую версию с официального сайта и извлекаем содержимое архива: kafka.apache.org/downloads

В папке с Kafka находим папку config, в ней находим файл server.properties и открываем его.

Находим строку log.dirs= /tmp/kafka-logs и указываем в ней путь, куда Kafka будет сохранять логи: log.dirs=c:/kafka/kafka-logs.

В этой же папке редактируем файл zookeeper.properties. Строчку dataDir=/tmp/zookeeper меняем на dataDir=c:/kafka/zookeeper-data, не забывая при этом, после имени диска указывать путь к своей папке с Kafka. Если вы всё сделали правильно, можно запускать ZooKeeper и Kafka.

Для кого-то может оказаться неприятной неожиданностью, что никакого GUI для управления Kafka нет. Возможно, это потому, что сервис рассчитан на суровых нёрдов, работающих исключительно с консолью. Так или иначе, для запуска кафки нам потребуется командная строка.

Сначала надо запустить ZooKeeper. В папке с кафкой находим папку bin/windows, в ней находим файл для запуска сервиса zookeeper-server-start.bat, кликаем по нему. Ничего не происходит? Так и должно быть. Открываем в этой папке консоль и пишем:

 start zookeeper-server-start.bat

Опять не работает? Это норма. Всё потому что zookeeper-server-start.bat для своей работы требует параметры, прописанные в файле zookeeper.properties, который, как мы помним, лежит в папке config. Пишем в консоль:

start zookeeper-server-start.bat c:\kafka\config\zookeeper.properties 

Теперь всё должно стартануть нормально.

Ещё раз открываем консоль в этой папке (ZooKeeper не закрывать!) и запускаем kafka:

start kafka-server-start.bat c:\kafka\config\server.properties

Для того, чтобы не писать каждый раз команды в командной строке, можно воспользоваться старым проверенным способом и создать батник со следующим содержимым:

start C:\kafka\bin\windows\zookeeper-server-start.bat C:\kafka\config\zookeeper.properties
timeout 10
start C:\kafka\bin\windows\kafka-server-start.bat C:\kafka\config\server.properties

Строка timeout 10 нужна для того, чтобы задать паузу между запуском zookeeper и kafka. Если вы всё сделали правильно, при клике на батник должны открыться две консоли с запущенным zookeeper и kafka.Теперь мы можем прямо из командной строки создать продюсера сообщений и консьюмера с нужными параметрами. Но, на практике это может понадобиться разве что для тестирования сервиса. Гораздо больше нас будет интересовать, как работать с kafka из IDEA.

Работа с kafka из IDEA

Мы напишем максимально простое приложение, которое одновременно будет и продюсером и консьюмером сообщения, а затем добавим в него полезные фичи. Создадим новый спринг-проект. Удобнее всего делать это с помощью спринг-инициалайзера. Добавляем зависимости org.springframework.kafka и spring-boot-starter-web

В итоге файл pom.xml должен выглядеть так:

Для того, чтобы отправлять сообщения, нам потребуется объект KafkaTemplate<K, V>. Как мы видим объект является типизированным. Первый параметр – это тип ключа, второй – самого сообщения. Пока оба параметра мы укажем как String. Объект будем создавать в классе-рестконтроллере. Объявим KafkaTemplate и попросим Spring инициализировать его, поставив аннотацию Autowired.

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

В принципе, наш продюсер готов. Всё что осталось сделать – это вызвать у него метод send(). Имеется несколько перегруженных вариантов данного метода. Мы используем в нашем проекте вариант с 3 параметрами — send(String topic, K key, V data). Так как KafkaTemplate типизирован String-ом, то ключ и данные в методе send будут являться строкой. Первым параметром указывается топик, то есть тема, в которую будут отправляться сообщения, и на которую могут подписываться консьюмеры, чтобы их получать. Если топик, указанный в методе send не существует, он будет создан автоматически. Полный текст класса выглядит так.

@RestController
@RequestMapping("msg")
public class MsgController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping
    public void sendOrder(String msgId, String msg){
        kafkaTemplate.send("msg", msgId, msg);
    }
}

Контроллер мапится на localhost:8080/msg, в теле запроса передаётся ключ и само сообщений.

Отправитель сообщений готов, теперь создадим слушателя. Spring так же позволяет cделать это без особых усилий. Достаточно создать метод и пометить его аннотацией @KafkaListener, в параметрах которой можно указать только топик, который будет слушаться. В нашем случае это выглядит так.

@KafkaListener(topics="msg")

У самого метода, помеченного аннотацией, можно указать один принимаемый параметр, имеющий тип сообщения, передаваемого продюсером.

Класс, в котором будет создаваться консьюмер необходимо пометить аннотацией @EnableKafka.

@EnableKafka
@SpringBootApplication
public class SimpleKafkaExampleApplication {

    @KafkaListener(topics="msg")
    public void msgListener(String msg){
        System.out.println(msg);
    }

    public static void main(String[] args) {
        SpringApplication.run(SimpleKafkaExampleApplication.class, args);
    }
}

Так же в файле настроек application.property необходимо указать параметр консьюмера groupe-id. Если этого не сделать, приложение не запустится. Параметр имеет тип String и может быть любым.

spring.kafka.consumer.group-id=app.1

Наш простейший кафка-проект готов. У нас есть отправитель и получатель сообщений. Осталось только запустить. Для начала запускаем ZooKeeper и Kafka с помощью батника, который мы написали ранее, затем запускаем наше приложение. Отправлять запрос удобнее всего с помощью Postman. В теле запроса не забываем указывать параметры msgId и msg.

Если мы видим в IDEA такую картину, значит всё работает: продюсер отправил сообщение, консьюмер получил его и вывел в консоль.

Усложняем проект

Реальные проекты с использованием Kafka конечно же сложнее, чем тот, который мы создали. Теперь, когда мы разобрались с базовыми функциями сервиса, рассмотрим, какие дополнительные возможности он предоставляет. Для начала усовершенствуем продюсера.

Если вы открывали метод send(), то могли заметить, что у всех его вариантов есть возвращаемое значение ListenableFuture<SendResult<K, V>>. Сейчас мы не будем подробно рассматривать возможности данного интерфейса. Здесь будет достаточно сказать, что он нужен для просмотра результата отправки сообщения.

@PostMapping
public void sendMsg(String msgId, String msg){
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("msg", msgId, msg);
    future.addCallback(System.out::println, System.err::println);
    kafkaTemplate.flush();
}

Метод addCallback() принимает два параметра – SuccessCallback и FailureCallback. Оба они являются функциональными интерфейсами. Из названия можно понять, что метод первого будет вызван в результате успешной отправки сообщения, второго – в результате ошибки.Теперь, если мы запустим проект, то увидим на консоли примерно следующее:

SendResult [producerRecord=ProducerRecord(topic=msg, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=Hello, world!, timestamp=null), recordMetadata=msg-0@6]

Посмотрим ещё раз внимательно на нашего продюсера. Интересно, что будет если в качестве ключа будет не String, а, допустим, Long, а в качестве передаваемого сообщения и того хуже – какая-нибудь сложная DTO? Попробуем для начала изменить ключ на числовое значение…

Если мы укажем в продюсере в качестве ключа Long, то приложение нормально запуститься, но при попытке отправить сообщение будет выброшен ClassCastException и будет сообщено, что класс Long не может быть приведён к классу String.

Если мы попробуем вручную создать объект KafkaTemplate, то увидим, что в конструктор в качестве параметра передаётся объект интерфейса ProducerFactory<K, V>, например DefaultKafkaProducerFactory<>. Для того, чтобы создать DefaultKafkaProducerFactory, нам нужно в его конструктор передать Map, содержащий настройки продюсера. Весь код по конфигурации и созданию продюсера вынесем в отдельный класс. Для этого создадим пакет config и в нём класс KafkaProducerConfig.

@Configuration
public class KafkaProducerConfig {

    private String kafkaServer="localhost:9092";

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<Long, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Long, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

В методе producerConfigs() создаём мапу с конфигурациями и в качестве сериализатора для ключа указываем LongSerializer.class. Запускаем, отправляем запрос из Postman и видим, что теперь всё работает, как надо: продюсер отправляет сообщение, а консьюмер принимает его.

Теперь изменим тип передаваемого значения. Что если у нас не стандартный класс из библиотеки Java, а какой-нибудь кастомный DTO. Допустим такой.

@Data
public class UserDto {
    private Long age;
    private String name;
    private Address address;
}

@Data
@AllArgsConstructor
public class Address {
    private String country;
    private String city;
    private String street;
    private Long homeNumber;
    private Long flatNumber;
}

Для отправки DTO в качестве сообщения, нужно внести некоторые изменения в конфигурацию продюсера. В качестве сериализатора значения сообщения укажем JsonSerializer.class и не забудем везде изменить тип String на UserDto.

@Configuration
public class KafkaProducerConfig {

    private String kafkaServer="localhost:9092";

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<Long, UserDto> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Long, UserDto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Отправим сообщение. В консоль будет выведена следующая строка:

Теперь займёмся усложнением консьюмера. До этого наш метод public void msgListener(String msg), помеченный аннотацией @KafkaListener(topics=«msg») в качестве параметра принимал String и выводил его на консоль. Как быть, если мы хотим получить другие параметры передаваемого сообщения, например, ключ или партицию? В этом случае тип передаваемого значения необходимо изменить.

@KafkaListener(topics="msg")
public void orderListener(ConsumerRecord<Long, UserDto> record){
    System.out.println(record.partition());
    System.out.println(record.key());
    System.out.println(record.value());
}

Из объекта ConsumerRecord мы можем получить все интересующие нас параметры.

Мы видим, что вместо ключа на консоль выводятся какие-то кракозябры. Это потому, что для десериализации ключа по умолчанию используется StringDeserializer, и если мы хотим, чтобы ключ в целочисленном формате корректно отображался, мы должны изменить его на LongDeserializer. Для настройки консьюмера в пакете config создадим класс KafkaConsumerConfig.

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaServer;

    @Value("${spring.kafka.consumer.group-id}")
    private String kafkaGroupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        return props;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Long, UserDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Long, UserDto> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
}

Класс KafkaConsumerConfig очень похож на KafkaProducerConfig, который мы создавали ранее. Здесь так же присутствует Map, содержащий необходимые конфигурации, например, такие как десериализатор для ключа и значения. Созданная мапа используется при создании ConsumerFactory<>, которая в свою очередь, нужна для создания KafkaListenerContainerFactory<?>. Важная деталь: метод возвращающий KafkaListenerContainerFactory<?> должен называться kafkaListenerContainerFactory(), иначе Spring не сможет найти нужного бина и проект не скомпилируется. Запускаем.

Видим, что теперь ключ отображается как надо, а это значит, что всё работает. Конечно, возможности Apache Kafka далеко выходят за пределы тех, что описаны в данной статье, однако, надеюсь, прочитав её, вы составите представление о данном сервисе и, самое главное, сможете начать работу с ним.

Мойте руки чаще, носите маски, не выходите без необходимости на улицу, и будьте здоровы.

Apache Kafka is an open-source application used for real-time streams for data in huge amount. Apache Kafka is a publish-subscribe messaging system. A messaging system lets you send messages between processes, applications, and servers. Broadly Speaking, Apache Kafka is software where topics can be defined and further processed.

Downloading and Installation

Apache Kafka can be downloaded from its official site kafka.apache.org
Apache-kafka-Download

For the installation process, follow the steps given below:

Step 1: Go to the Downloads folder and select the downloaded Binary file.

Step 2: Extract the file and move the extracted folder to the directory where you wish to keep the files.

Step 3: Copy the path of the Kafka folder. Now go to config inside kafka folder and open zookeeper.properties file. Copy the path against the field dataDir and add /zookeeper-data to the path.

For example if the path is c:/kafka

Step 4: Now in the same folder config open server.properties and scroll down to log.dirs and paste the path. To the path add /kafka-logs

Step 5: This completes the configuration of zookeeper and kafka server. Now open command prompt and change the directory to the kafka folder. First start zookeeper using the command given below:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Step 6: Now open another command prompt and change the directory to the kafka folder. Run kafka server using the command:

.\bin\windows\kafka-server-start.bat .\config\server.properties


Now kafka is running and ready to stream data.

Last Updated :
28 Jun, 2022

Like Article

Save Article

Это вводная статья по использованию Apache Kafka со Spring Boot. О том, как установить и запустить Apache Kafka на Windows, создать топик и отправить туда первое событие из одного Spring Boot приложения. А получить его из  другого Spring Boot приложения.

Kafka запускается в ZooKeeper, но отдельно скачивать ZooKeeper не нужно, он идет вместе с Kafka.

Как запустить Kafka под Windows 10

  • Сначала нужно загрузить  и распаковать tgz-архив.
  • Нужно переименовать папку вида kafka_2.13-2.7.0 — дать более короткое имя, например kafka, иначе возникнут проблемы при запуске из консоли
  • Открыть Windows PowerShell (либо командную строку cmd.exe, неважно)
  • Перейти в переименованную папку kafka (внутри будут папки bin, config и другие папки и файлы)
  • Запустить ZooKeeper, набрав в консоли:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  • Открыть вторую консоль, перейти в ту же переименованную папку kafka и запустить Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties

Все, Apache Kafka запущен.

Теперь в нем можно создавать топики — как из консоли, так и из Spring Boot. Мы сделаем это из Spring Boot.

У нас будет два Spring Boot приложения. Они совершенно независимы друг от друга. Одно приложение будет отправлять событие в топик (это приложение Producer), а второе — получать их из топика (приложение Consumer).

Mаven-зависимость

Чтобы работать с Kafka, оба приложения (как Producer, так и Consumer) должны иметь зависимость:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Spring Boot Producer

Выглядит приложение просто:

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic1", "test");
        };
    }
}

Бин topic типа NewTopic создает топик topic1, если его нет.

KafkaTemplate отправляет события в топик topic1. У нас это всего одна  строка  — test.

Перейдем к созданию второго приложения, которое получает события из Kafka.

Spring Boot Consumer

Это приложение слушает топик topic1:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }

}

Если в топик приходит событие, то оно выводится в консоль.

То есть в консоль будет выведена одна строка:

test

Приложение Spring Boot Producer  завершилось быстро, а Spring Boot Consumer висит и слушает топик. Можно запустить Spring Boot Producer еще раз, тогда в консоли Spring Boot Consumer снова будет выведена строка:

test

Итоги

Код примеров доступен на GitHub.

Источники:

  • Spring for Apache Kafka
  • https://kafka.apache.org/intro

Learn to install Apache Kafka on Windows 10 and execute ‘start server‘ and ‘stop server‘ scripts related to Kafka and Zookeeper. We will also verify the Kafka installation by creating a topic, producing a few messages to it and then using a Kafka consumer to read the messages.

1. Prerequisites

  • Minimum Java 8 is required to run the latest downloads from the Kafka site. Install Java if you have not already.
  • Zookeeper (to store metadata about the Kafka cluster) is also mandatory. Kafka comes with an inbuilt Zookeeper to start with. But it is recommended to install Zookeeper separately in the production environment. Download it from its official site.
  • Kafka can be run on any operating system. Linux is the recommended OS. With Windows, Kafka has some known bugs.

This tutorial is for beginners and does not use separate zookeeper instance – to keep things simple and focused towards Kafka only.

2. Download and Install Kafka

  • Download Kafka from the official site. I download the latest version which is 2.5.0, and the file name is “kafka_2.12-2.5.0.tgz“.
  • Copy the downloaded file to some folder and extract it using tar command.
  • Copy the extracted folder to the desired location. I have put it on location “E:\devsetup\bigdata\kafka2.5“.
tar -xzf kafka_2.12-2.5.0.tgz

Installation is pretty much complete, now.

3. Startup and Shutdown

To start Kafka, we first start Zookeeper and then Kafka. I am writing small batch files which navigate to the Kafka installation directory first and then execute the command in the new command prompt window.

3.1. Start Zookeeper

To start zookeeper, we need to run zookeeper-server-start.bat script and pass the zookeeper configuration file path.

cd E:\devsetup\bigdata\kafka2.5
start cmd /k bin\windows\zookeeper-server-start.bat config\zookeeper.properties

3.2. Start Kafka

To start Kafka, we need to run kafka-server-start.bat script and pass broker configuration file path.

cd E:\devsetup\bigdata\kafka2.5
start cmd /k bin\windows\kafka-server-start.bat config\server.properties

3.3. Shutdown Kafka

To stop Kafka, we need to run kafka-server-stop.bat script.

cd E:\devsetup\bigdata\kafka2.5
start cmd /k bin\windows\kafka-server-stop.bat

3.4. Shutdown Zookeeper

To stop Zookeeper, we need to run zookeeper-server-stop.bat script.

cd E:\devsetup\bigdata\kafka2.5
start cmd /k bin\windows\zookeeper-server-stop.bat

Do not stop zookeeper and kafka using CTRL+C commands. Always use above .bat files or commands. Otherwise the data corruption may occur.

4. Verify Kafka Installation

First, start Zookeeper and Kafka using the above scripts.

Open a new command prompt, and create new Kafka topic.

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

Created topic test.

Now list all the topics to verify the created topic is present in this list. At this step, we have only one topic.

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

test

Now list all the topics to verify the created topic is present in this list. At this step, we have only one topic.

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

test

Produce some messages and submit to test topic. I added two messages i.e. “Hello” and “Kafka”.

bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

Consume the messages and submit to test topic.

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

Hello
Kafka
Verify Kafka Installation

5. Conclusion

In this tutorial, we learned to install Kafka along with Zookeeper. We learned to start and stop both servers. Additionally, we verified the installation by creating a topic, posting some messages and then consuming using the console consumer script.

The important thing to note is that we should never stop the servers by killing processes or CTRL+C commands. Always use scripts to stop the servers.

Happy Learning !!

Sourcecode Download

  • Bigo live для компьютера скачать windows 10
  • Big time rush windows down скачать
  • Big sur тема для windows 10 скачать
  • Big sur windows 10 suite windows 10
  • Big sur theme for windows 10