Como podemos criar um tópico em Kafka a partir do IDE usando API

Como podemos criar um tópico em Kafka a partir do IDE usando API porque quando eu faço isso:

bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181

eu entendo o erro:

bash: bin/kafka-create-topic.sh: No such file or directory
E eu segui a configuração do desenvolvimento como está.

Author: f_puras, 2013-06-05

11 answers

No Kafka 0. 8. 1+ -- a última versão do Kafka a partir de hoje -- você pode programaticamente criar um novo tópico via AdminCommand. A funcionalidade de CreateTopicCommand (parte do Kafka 0.8.0 antigo) que foi mencionada numa das respostas anteriores a esta pergunta foi transferida para AdminCommand.

Scala exemplo para Kafka 0.8.1:

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
// Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
// createTopic() will only seem to work (it will return without error).  The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,
    ZKStringSerializer)

// Create a topic named "myTopic" with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig)

Criar dependências, usando o sbt como exemplo:

libraryDependencies ++= Seq(
  "com.101tec" % "zkclient" % "0.4",
  "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri"),
  ...
)

Editar: adicionou um exemplo Java para o Kafka 0. 9. 0. 0 (última versão a partir de Jan 2016).

Dependências Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.7</version>
</dependency>

Código:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

  public static void main(String[] args) {
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;
    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka 0.9.0.0
    boolean isSecureKafkaCluster = false;
    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configurations settings here
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
    zkClient.close();
  }

}

EDIT 2: adicionou um exemplo Java para o Kafka 0.10.2.0 (última versão em abril de 2017).

Dependências Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.9</version>
</dependency>

Código:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

  public static void main(String[] args) {
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configurations settings here

    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka 0.9.0.0
    boolean isSecureKafkaCluster = false;

    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
    zkClient.close();
  }

}
 67
Author: Michael G. Noll, 2017-04-28 07:48:46

A partir de 0. 11. 0, 0 só precisa de:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
Este artefacto agora contém o AdminClient (org.apache.kafka.clients.admin).

AdminClient pode lidar com muitas tarefas de administração do Kafka, incluindo a criação de tópicos:

Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

AdminClient admin = AdminClient.create(config);

Map<String, String> configs = new HashMap<>();
int partitions = 1;
int replication = 1;

admin.createTopics(asList(new NewTopic("topic", partitions, replication).configs(configs)));

O resultado deste comando é um CreateTopicsResult, que pode usar para obter um Future para toda a operação ou para cada criação de tópico individual:

  • para ter um futuro para toda a operação, use CreateTopicsResult#all().
  • para obter Future s para todos os tópicos individualmente, use CreateTopicsResult#values().

Por exemplo:

CreateTopicsResult result = ...
KafkaFuture<Void> all = result.all();

Ou:

CreateTopicsResult result = ...
for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
    try {
        entry.getValue().get();
        log.info("topic {} created", entry.getKey());
    } catch (InterruptedException | ExecutionException e) {
        if (Throwables.getRootCause(e) instanceof TopicExistsException) {
            log.info("topic {} existed", entry.getKey());
        }
    }
}

KafkaFuture is "a flexible future which supports call chaining and other asynchronous programming patterns," and " will eventually become a thin shim on top of Java 8's CompletebleFuture."

 26
Author: Dmitry Minkovsky, 2017-07-17 14:33:31

Para criar um tópico através da API java e do Kafka 0. 8+ tente o seguinte,

Primeira importação abaixo da declaração

import kafka.utils.ZKStringSerializer$;

Crie um objecto para o ZkClient da seguinte forma,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
 13
Author: Jaya Ananthram, 2015-03-06 05:20:28
Podes tentar com o kafka.administrador.CreateTopicCommand scala classe para criar tópico a partir de código Java...fornecendo os argumentos necessários.
String [] arguments = new String[8];
arguments[0] = "--zookeeper";
arguments[1] = "10.***.***.***:2181";
arguments[2] = "--replica";
arguments[3] = "1";
arguments[4] = "--partition";
arguments[5] = "1";
arguments[6] = "--topic";
arguments[7] = "test-topic-Biks";

CreateTopicCommand.main(arguments);

NB: deve adicionar as dependências maven para jopt-simple-4.5 & zkclient-0.1

 10
Author: Biks, 2016-08-08 09:50:01

Se estiver a usar o Kafka 0. 10. 0+, a criação de tópico a partir de Java requer o parâmetro de aprovação do tipo RackAwareMode. É um objeto de caso Scala, e obter a instância de Java é complicado (prova: Como "obter" um objeto de caso Scala de Java? por exemplo. Mas não é aplicável para o nosso caso).

Felizmente, o rackAwareMode é um parâmetro opcional. Ainda assim, o Java não suporta parâmetros opcionais. Como resolvemos isso? Aqui está uma solução:
AdminUtils.createTopic(zkUtils, topic, 1, 1, 
    AdminUtils.createTopic$default$5(),
    AdminUtils.createTopic$default$6());

Usa-o com o miguno. responde e podes ir.

 2
Author: Dmitriusan, 2017-05-23 12:10:43
Há algumas maneiras de a sua chamada não resultar.
  1. Se o seu aglomerado de Kafka não tivesse nós suficientes para suportar um valor de replicação de 3.

  2. Se existir um prefixo de localização do chroot, terá de o Adicionar depois do porto do tratador.

  3. Você não está na pasta de instalação do Kafka ao executar (esta é a pasta mais provável)

 1
Author: Gregory Patmore, 2013-12-19 23:16:51

De Kafka 0. 8 exemplo do produtor a amostra em baixo irá criar um tópico chamado page_visits e também irá começar a produzir se o atributo auto.create.topics.enable for definido como true (por omissão) no ficheiro Kafka Broker config

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) { 
            long runtime = new Date().getTime();  
            String ip = “192.168.2.” + rnd.nextInt(255); 
            String msg = runtime + “,www.example.com,” + ip; 
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
   }
}
 1
Author: Hild, 2016-08-08 09:50:16

Criação de tópico em scala. Se estiver a correr no modo seudo no ficheiro de configuração do corretor

Auto.criar.topico.activar = true

Irá permitir a criação automática do tópico no servidor. Se isto for definido como true, então as tentativas de produzir, consumir ou obter metadados para um tópico inexistente irão automaticamente criá-lo com o Fator de replicação padrão e o número de partições.

Abaixo do Código do excerto.

import scala.util.Random
import java.util.Properties
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.producer.KeyedMessage
import java.util.Date

class SimpleProducer {


  def sendmessages(){
  val rnd = new Random();

  val props = new Properties();  
  props.put("metadata.broker.list", "192.1.1.1:6667"); 
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  //props.put("partitioner.class", "rtbi.dis.producers.SimplePartitioner")

  val config = new ProducerConfig(props);
  val producer = new Producer[String, String](config);
    for (event<-1 to 5000) { 
               val runtime = new Date().getTime;  
               val ip = "192.1.1.1" + rnd.nextInt(255); 
               val msg = runtime + ",www.example.com," + ip; 
               val data = new KeyedMessage[String, String]("mytopic", ip, msg); //here mytopic is a topic 
               producer.send(data);
        }
        producer.close();
  }
 }
  object SimpleProducer extends App{   
    val s= new SimpleProducer().sendmessages();
  }
 0
Author: madhu, 2016-02-22 17:19:12
De qual ides estás a tentar ?

Por favor indique a localização completa, abaixo está o comando do terminal que irá criar um tópico

  1. cd kafka/bin
  2. ./kafka-create-topic.sh --topic test --zookeeper localhost:2181
 0
Author: Sanket, 2016-08-08 09:51:04

Existe uma nova API Administrazkclient que podemos usar para gerir tópicos no servidor Kafka.

String zookeeperHost = "127.0.0.1:2181";
Boolean isSucre = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
                connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);

AdminZkClient adminZkClient = new AdminZkClient(zkClient);

String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();

adminZkClient.createTopic(topicName1,partitions,replication,
            topicConfig,RackAwareMode.Disabled$.MODULE$);

Pode consultar este link para mais detalhes https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java/

 0
Author: Mahesh Mogal, 2018-07-02 18:18:20

A partir de Kafka 0.10.1 o Zkstringserializador mencionado por Michael é privado (para Scala). Você pode usar os métodos de fábrica createZkClient ou createZkClientAndConnection em ZkUtils.

Scala exemplo para Kafka 0.10.1:

import kafka.utils.ZkUtils

val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(
  "localhost:2181", sessionTimeoutMs, connectionTimeoutMs) 
Então, cria o tópico como o Michael sugeriu:
import kafka.admin.AdminUtils

val zkUtils = new ZkUtils(zkClient, zkConnection, false)
val numPartitions = 4
val replicationFactor = 1
val topicConfig = new Properties
val topic = "my-topic"
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)
 -1
Author: its_a_paddo, 2017-06-20 09:05:02