下载 最新的 Kafka 版本并解压:
$ tar -xzf kafka_2.13-3.0.0.tgz $ cd kafka_2.13-3.0.0
注意:您的本地环境必须安装 Java 8+。
运行以下命令以按正确顺序启动所有服务:
# Start the ZooKeeper service # Note: Soon, ZooKeeper will no longer be required by Apache Kafka. $ bin/zookeeper-server-start.sh config/zookeeper.properties
打开另一个终端会话并运行:
# Start the Kafka broker service $ bin/kafka-server-start.sh config/server.properties
成功启动所有服务后,您将拥有一个基本的 Kafka 环境正在运行并可以使用。
Kafka 是一个分布式事件流平台,可让您在多台机器上读取、写入、存储和处理事件(在文档中也称为记录或 消息)。
示例事件包括支付交易、来自手机的地理定位更新、运输订单、来自物联网设备或医疗设备的传感器测量等等。这些事件被组织并存储在主题中。非常简单,主题类似于文件系统中的文件夹,事件就是该文件夹中的文件。
因此,在编写第一个事件之前,您必须创建一个主题。打开另一个终端会话并运行:
$ bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092
Kafka 的所有命令行工具都有附加选项:运行kafka-topics.sh
不带任何参数的命令以显示使用信息。例如,它还可以向您显示 新主题的分区计数等详细信息:
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092 Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs: Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Kafka 客户端通过网络与 Kafka 代理通信以写入(或读取)事件。一旦收到,代理就会以持久和容错的方式存储事件,只要您需要——甚至永远。
运行控制台生产者客户端将一些事件写入您的主题。默认情况下,您输入的每一行都会导致将单独的事件写入主题。
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 This is my first event This is my second event
您可以随时停止生产者客户端Ctrl-C
。
打开另一个终端会话并运行控制台使用者客户端以读取您刚刚创建的事件:
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 This is my first event This is my second event
您可以随时停止消费者客户端Ctrl-C
。
随意尝试:例如,切换回您的生产者终端(上一步)以编写其他事件,并查看事件如何立即显示在您的消费者终端中。
因为事件持久地存储在 Kafka 中,所以它们可以被任意数量的消费者读取任意多次。您可以通过打开另一个终端会话并再次重新运行上一个命令来轻松验证这一点。
您可能在现有系统(如关系数据库或传统消息传递系统)中拥有大量数据,以及许多已经使用这些系统的应用程序。 Kafka Connect允许您不断地将数据从外部系统摄取到 Kafka,反之亦然。因此,将现有系统与 Kafka 集成非常容易。为了使这个过程更容易,有数百个这样的连接器随时可用。
查看Kafka Connect 部分 ,了解有关如何持续将数据导入和导出 Kafka 的更多信息。
一旦您的数据作为事件存储在 Kafka 中,您就可以使用Java/Scala的Kafka Streams客户端库处理数据 。它允许您实施任务关键型实时应用程序和微服务,其中输入和/或输出数据存储在 Kafka 主题中。Kafka Streams 将在客户端编写和部署标准 Java 和 Scala 应用程序的简单性与 Kafka 服务器端集群技术的优势相结合,使这些应用程序具有高度的可扩展性、弹性、容错性和分布式。该库支持一次性处理、有状态操作和聚合、窗口、连接、基于事件时间的处理等等。
为了让您第一次尝试,以下是实现流行WordCount
算法的方法:
KStream<String, String> textLines = builder.stream("quickstart-events"); KTable<String, Long> wordCounts = textLines .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" "))) .groupBy((keyIgnored, word) -> word) .count(); wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
在卡夫卡流演示 和应用开发教程 演示如何代码并运行从开始到结束这样的流应用。
现在您已经到了快速入门的结尾,可以随意拆除 Kafka 环境——或者继续玩转。
Ctrl-C
,如果您还没有这样做的话。Ctrl-C
。Ctrl-C
.如果您还想删除本地 Kafka 环境的任何数据,包括您在此过程中创建的任何事件,请运行以下命令:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
您已成功完成 Apache Kafka 快速入门。
评论区(0)