Druid入门指南 加载Kafka数据


本教程介绍了如何把Kafka数据加载在Druid中。

本教程默认使用micro-quickstart单机配置。

如您不清楚如何调取Druid安装包数据,请参考Apache Druid 启动

 

1 下载并启动Kafka

本教程将使用到的Kafka版本为2.1.0。

在终端输入以下命令,下载Kafka:

curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
cd kafka_2.12-2.1.0

输入以下命令,启动Kafka Broker:

./bin/kafka-server-start.sh config/server.properties

输入以下命令,新建一个用于发送Kafka数据的主题,并命名为“wikipedia”:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia

2 发送数据到Kafka

接下来为创建好的主题发送数据。

在Druid目录下,输入以下命令并运行:

cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json

在Kafka目录下,输入以下命令并运行。{PATH_TO_DRUID}替换为Druid目录的路径:

export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json

上述命令将sample发送到先前创建名为wikipedia的Kafka主题。

现在,我们将在Druid中使用Kafka索引功能从新建主题中提取数据。

3 使用数据加载器(Data Loader)

浏览器访问 localhost:8888 ,然后点击控制台中的Load data

Druid Kafka,加载kafka数据,Druid kafka数据,Druid加载kafka数据

选择 Apache Kafka 然后点击 Connect data

Druid Kafka,加载kafka数据,Druid kafka数据,Druid加载kafka数据

在 Bootstrap servers 输入 localhost:9092, 在 Topic 输入 wikipedia

点击 Preview 后确保您看到的数据是正确的

数据定位后,您可以点击“Next: Parse data”来进入下一步。

Druid Kafka,加载kafka数据,Druid kafka数据,Druid加载kafka数据

Druid的体系结构需要一个主时间列(内部存储为名为__time的列)。如果您的数据中没有时间戳,请选择 固定值(Constant Value) 。在我们的示例中,数据加载器将确定原始数据中的时间列是唯一可用作主时间列的候选者。

点击”Next:…”两次完成 Transform 和 Filter 步骤。您无需在这些步骤中输入任何内容,因为使用摄取时间变换和过滤器不在本教程范围内。

Druid Kafka,加载kafka数据,Druid kafka数据,Druid加载kafka数据

在 Configure schema 步骤中,您可以配置将哪些维度和指标摄入到Druid中,这些正是数据在被Druid中摄取后出现的样子。 由于我们的数据集非常小,关掉rollup、确认更改。

一旦对schema满意后,点击 Next 后进入 Partition 步骤,该步骤中可以调整数据如何划分为段文件的方式。

Druid Kafka,加载kafka数据,Druid kafka数据,Druid加载kafka数据

在这里,您可以调整如何在Druid中将数据拆分为多个段。 由于这是一个很小的数据集,因此在此步骤中无需进行任何调整。

点击完成 Tune 步骤。

Druid Kafka,加载kafka数据,Druid kafka数据,Druid加载kafka数据

在 Tune 步骤中,将 Use earliest offset 设置为 True,因为我们需要从流的开始位置消费数据。 其他地方不需要更改,然后进入到 Publish 步骤

Druid Kafka,加载kafka数据,Druid kafka数据,Druid加载kafka数据

我们将该数据源命名为 wikipedia-kafka

最后点击 Next 预览摄入规范:

Druid Kafka,加载kafka数据,Druid kafka数据,Druid加载kafka数据

这就是您构建的规范,为了查看更改将如何更新规范是可以随意返回之前的步骤中进行更改,同样,您也可以直接编辑规范,并在前面的步骤中看到它。

对摄取规范感到满意后,请单击 Submit,然后将创建一个数据摄取任务。

Druid Kafka,加载kafka数据,Druid kafka数据,Druid加载kafka数据

您可以进入任务视图,重点关注新创建的supervisor。任务视图设置为自动刷新,请等待直到Supervisor启动了一个任务。

当一项任务开始运行时,它将开始处理其摄入的数据。

从标题导航到 Datasources 视图。

Druid Kafka,加载kafka数据,Druid kafka数据,Druid加载kafka数据

当 wikipedia-kafka 数据源出现在这儿的时候就可以进行查询操作了。

注意:如果过了几分钟之后数据源还是没有出现在这里,可能是在 Tune 步骤中没有设置为从流的开始进行消费数据。

此时,您可以在 Query 视图中运行SQL查询了。

因为“wikipedia”是一个小数据集,因此你可以直接运行 SELECT * FROM "wikipedia-kafka" 来获取结果,不必使用“where”等语句进行过滤。

Druid Kafka,加载kafka数据,Druid kafka数据,Druid加载kafka数据

4 通过控制台提交supervisor

在控制台中点击Submit supervisor打开提交supervisor对话框:

Druid Kafka,加载kafka数据,Druid kafka数据,Druid加载kafka数据

粘贴以下规范后点击 Submit

{
  "type": "kafka",
  "spec" : {
    "dataSchema": {
      "dataSource": "wikipedia",
      "timestampSpec": {
        "column": "time",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "metricsSpec" : [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "rollup": false
      }
    },
    "tuningConfig": {
      "type": "kafka",
      "reportParseExceptions": false
    },
    "ioConfig": {
      "topic": "wikipedia",
      "inputFormat": {
        "type": "json"
      },
      "replicas": 2,
      "taskDuration": "PT10M",
      "completionTimeout": "PT20M",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      }
    }
  }
}

这将启动supervisor,该supervisor继而产生一些任务,这些任务将开始监听传入的数据。

5 直接提交supervisor

为了直接启动服务,我们可以在Druid的根目录下运行以下命令来提交一个supervisor规范到Druid Overlord中。

curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor

如果supervisor被成功创建后,将会返回一个supervisor的ID,在本例中看到的是 {"id":"wikipedia"}

6 数据查询

数据被发送到Kafka流之后,立刻就可以被查询了。

7 清理数据

如果您希望学习其他的Druid教程,则需要关闭集群并通过删除Druid软件包下的var目录的内容来重置集群状态,因为其他教程将写入相同的”wikipedia”数据源。