1 前言
Druid中,所有的数据都会被分割成段文件,每个文件中最多能够有几百万行数据。在环境下加载数据的操作一般被称为“摄取”或“索引”,即从源数据系统读取数据并为该数据创建段文件的过程。
在Druid的数据摄取过程中,大多数的任务会由MiddleManager或Indexer进程来完成。
当然也有例外:Druid中页存在基于Hadoop的摄取操作,即使用Hadoop MapReduce在YARN上完成整个过程,即便如此,整个Hadoop的作业过程仍会由MiddleManager或Indexer进程来进行监控。
2 摄取方式
1)流式摄取
最推荐、也是最流行的流式摄取方法是直接从Kafka读取数据的Kafka索引服务。当然,您也可以根据需要选择Kinersis和Tranquility,下表简要比较了三种主要的流式摄取方法:
Method
|
Kafka
|
Kinesis
|
Tranquility
|
Supervisor类型
|
kafka
|
kinesis
|
N/A
|
如何工作
|
Druid直接从 Apache Kafka读取数据
|
Druid直接从Amazon Kinesis中读取数据
|
Tranquility, 一个独立于Druid的库,用来将数据推送到Druid
|
可以摄入迟到的数据
|
Yes
|
Yes
|
No(迟到的数据将会被基于 windowPeriod 的配置丢弃掉)
|
保证不重不丢(Exactly-once)
|
Yes
|
Yes
|
No
|
2)批量摄取
在使用“批量摄取”方法加载文件时,应使用一次性任务,其中有三个选项需要进行设置:index_parallel
(本地并行批任务)、index_hadoop
(基于hadoop)或index
(本地简单批任务)。
一般来说,如果本地批处理能满足您的需要时我们建议使用它,因为设置更为简单(它不依赖于外部Hadoop集群)。但仍有一些情况是基于Hadoop的批摄取会更好,例如,当您已经有一个正在运行的Hadoop集群并希望使用现有集群的集群资源进行批摄取时,可以选择这种方式。
下表对三种任务类型进行了简单比较:
方式
|
本地批任务(并行)
|
基于Hadoop
|
本地批任务(简单)
|
任务类型
|
index_parallel
|
index_hadoop
|
index
|
并行?
|
如果 inputFormat 是可分割的且 tuningConfig 中的 maxNumConcurrentSubTasks > 1, 则 Yes
|
Yes
|
No,每个任务都是单线程的
|
支持追加或者覆盖
|
都支持
|
只支持覆盖
|
都支持
|
外部依赖
|
无
|
Hadoop集群,用来提交Map-Reduce任务
|
无
|
输入位置
|
任何输入数据源
|
任何Hadoop文件系统或者Druid数据源
|
任何输入数据源
|
文件格式
|
任何输入格式
|
任何Hadoop输入格式
|
任何输入格式
|
Rollup modes
|
如果 tuningConfig 中的 forceGuaranteedRollup = true, 则为Perfect(最佳rollup)
|
总是Perfect(最佳rollup)
|
如果 tuningConfig 中的 forceGuaranteedRollup = true, 则为 Perfect(最佳rollup)
|
分区选项
|
可选的有Dynamic, hash-based 和 range-based 三种分区方式
|
通过partitionsSpec中指定 hash-based 和 range-based分区
|
可选的有Dynamic和hash-based二种分区方式
|
3 Druid数据模型
1)数据源
与传统RDBMS中的表类似,Druid的数据存储在数据源中。
Druid提供了一个与关系模型、时间序列模型类似的数据建模系统。
2)主时间戳列(Timestamp)
Druid Schema必须始终包含一个主时间戳。
主时间戳用于对数据进行分区和排序。
通过主时间戳,Druid能够快速识别、检索到与其相对应的数据,此外Druid还可以根据主时间戳对数据进行基于时间的简单操作,如删除、覆盖等。
主时间戳是基于timestampSpec
进行解析的。此外,granularitySpec
控制基于主时间戳的其他重要操作。无论从哪个输入字段读取主时间戳,它都将作为列名为__time
的数据列存储在Druid数据源中。
如果有多个时间戳列,则其他列存储可视作为辅助时间戳。
3)维度(Dimensions)
维度是按数据原有格式进行存储的列,用于在查询中对指定维度进行分组、筛选或聚合。
通过dimensionSpec
配置维度。
4)指标(Metrics)
指标是以聚合形式存储的列。
用户可以指定任意指标来对数据进行聚合操作。建议在启动rollup的环境下使用,因为:
- 即使需要保留摘要信息,也可以将多个数据行折叠为一行。在Rollup教程中,这用于将netflow数据折叠为每(
minute
,srcIP
,dstIP
)元组一行,同时保留有关总数据包和字节计数的聚合信息。
- 对于一些聚合器(特别是是近似聚合器)而言,即使在接收非汇总数据过程中进行局部计算,Druid也可以在查询过程中进行更加快速的计算。
通过metricsSpec
配置。
4 Rollup
1)什么是rollup
Druid可以在接收数据过程中对数据进行汇总操作,以对原始数据压缩至最小化的操作。
而Rollup则是一种汇总或预聚合的形式,它可以极大程度地减少需要存储的数据大小,从而减少数据行的数量级。
但在使用rollup进行汇总数据时,我们无法对某个单独事件进行查询。
- 当禁用rollup,Druid将按数据原样加载所有的行,且不进行任何形式的聚合。
- 当启用rollup,那么任何具有相同维度和时间戳的行(在基于
queryGranularity
的截断之后)都可以在Druid中折叠或汇总为一行。
- 系统默认的rollup模式为启用状态。
2)启用或者禁用rollup
Rollup由granularitySpec
中的rollup
选项进行配置。
如果你想让Druid按原样存储每条记录,而不需要任何汇总,将该值设置为false
。
3)最大化rollup比率
通过比较Druid中的行数与接收的事件数量,可以测量数据源的汇总率,汇总率越高说明rollup越奏效。
汇总率可以通过Druid SQL语句来查询得到:
SELECT SUM("cnt") / COUNT(*) * 1.0 FROM datasource
在这个查询中,cnt
引用在摄取数据时所指定的“count”类型指标。
最大化rollup过程中需要注意的地方:
- 一般来说,维度越少,其基数也越低。
- 为了不对rollup比率造成负面影响,可以使用Sketches来避免存储高基数维度。
- 在摄入时调整
queryGranularity
,例如使用PT5M
而不是PT1M
会增加Druid中两行具有匹配时间戳的可能性,并可以提高汇总率。
- 将相同的数据加载到多个Druid数据源中是有益的。有些用户选择创建禁用rollup(或启用rollup,但汇总率设为最小)的“完整”数据源和具有较少维度和较高汇总率的“压缩”数据源。当查询请求只涉及“压缩”数据集的维度时,查询过程能够在更快的时间内完成——这种方案只需稍微增加存储空间即可完成。
- 如果您使用的best-effort rollup,摄取配置不能保证完全汇总(perfect rollup),则可以通过切换到有保障的完全汇总选项,或在初始摄取后在后台重新编制(reindex)数据索引,从而潜在提高汇总率。
5)perfect rollup VS best-effort rollup
使用该rollup方式意味着输入数据在摄取时会被“完美”地聚合。
使用该rollup方式意味着输入的数据可能无法完全聚合,因此可能有多个段保存了具有相同时间戳和维度值的行。一般而言,选择best-effort rollup的用户是出于一定目的的,他们想要获取没有经过清晰步骤的数据。
5 分区
1)为什么分区
通常情况下,对数据进行分区能够优化压缩性能,往往会提高查询性能。
Druid中支持对指定时间块儿内的段文件做进一步的分区。一般而言,使用指定维度进行辅助分区能够改善一定的操作性能,这意味着具有该相同维度值的行将存储在一起并支持快速访问。
2)如何设置分区
并不是所有的摄入方式都支持显式的分区配置,也不是所有的方法都具有同样的灵活性。
在当前的Druid版本中,如果您是通过一个不太灵活的方法(如Kafka)进行初始摄取,那么您可以使用重新索引的技术(reindex),在初始摄取数据后再对其进行重新分区。这是一种强大的技术:即使您不断地从流中添加新数据, 也可以使用它来确保任何早于某个阈值的数据都得到最佳分区。
下表显示了每个摄取方法如何处理分区:
方法
|
如何工作
|
本地批
|
通过tuningConfig中的partitionsSpec
|
Hadoop批
|
通过tuningConfig中的partitionsSpec
|
Kafka索引服务
|
Druid中的分区是由Kafka主题的分区方式决定的。您可以在初次摄入后 重新索引的技术(reindex)以重新分区
|
Kinesis索引服务
|
Druid中的分区是由Kinesis流的分区方式决定的。您可以在初次摄入后 重新索引的技术(reindex)以重新分区
|
6 摄入规范
无论使用哪一种摄入方式,数据要么是通过一次性tasks或者通过持续性的supervisor(运行并监控一段时间内的一系列任务)来被加载到Druid中。
在任意一种情况下,task或者supervisor都需要在摄入规范中进行定义。
摄入规范包括以下三个主要的部分:
dataSchema
, 包含了 数据源名称
, 主时间戳列
, 维度
, 指标
和 转换与过滤
ioConfig
, 该部分告诉Druid如何去连接数据源系统以及如何去解析数据。
tuningConfig
, 该部分控制着每一种摄入方法的不同的特定调整参数
一个 index_parallel
类型任务的示例摄入规范如下:
{
"type": "index_parallel",
"spec": {
"dataSchema": {
"dataSource": "wikipedia",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
{ "type": "string", "page" },
{ "type": "string", "language" },
{ "type": "long", "name": "userId" }
]
},
"metricsSpec": [
{ "type": "count", "name": "count" },
{ "type": "doubleSum", "name": "bytes_added_sum", "fieldName": "bytes_added" },
{ "type": "doubleSum", "name": "bytes_deleted_sum", "fieldName": "bytes_deleted" }
],
"granularitySpec": {
"segmentGranularity": "day",
"queryGranularity": "none",
"intervals": [
"2013-08-31/2013-09-01"
]
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "local",
"baseDir": "examples/indexing/",
"filter": "wikipedia_data.json"
},
"inputFormat": {
"type": "json",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{ "type": "path", "name": "userId", "expr": "$.user.id" }
]
}
}
},
"tuningConfig": {
"type": "index_parallel"
}
}
}
dataSchema 包含了以下部分:数据源名称、主时间戳列、维度、指标、转换与过滤
一个dataSchema的示例如下:
"dataSchema": {
"dataSource": "wikipedia",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
{ "type": "string", "page" },
{ "type": "string", "language" },
{ "type": "long", "name": "userId" }
]
},
"metricsSpec": [
{ "type": "count", "name": "count" },
{ "type": "doubleSum", "name": "bytes_added_sum", "fieldName": "bytes_added" },
{ "type": "doubleSum", "name": "bytes_deleted_sum", "fieldName": "bytes_deleted" }
],
"granularitySpec": {
"segmentGranularity": "day",
"queryGranularity": "none",
"intervals": [
"2013-08-31/2013-09-01"
]
}
}
dataSource位于dataSchema -> dataSource中,简单的标识了数据将被写入的数据源的名称,示例如下:
"dataSource": "my-first-datasource"
timestampSpec位于dataSchema -> timestampSpec中,用来配置主时间戳,示例如下:
"timestampSpec": {
"column": "timestamp",
"format": "auto"
}
timestampSpec可以包含以下的部分:
字段
|
描述
|
默认值
|
column
|
要从中读取主时间戳的输入行字段。
不管这个输入字段的名称是什么,主时间戳总是作为一个名为”__time”的列存储在您的Druid数据源中
|
timestamp
|
format
|
时间戳格式,可选项有:
- iso: 使用”T”分割的ISO8601,像”2000-01-01T01:02:03.456″
- posix: 自纪元以来的秒数
- millis: 自纪元以来的毫秒数
- micro: 自纪元以来的微秒数
- nano: 自纪元以来的纳秒数
- auto: 自动检测ISO或者毫秒格式
- 任何Joda DateTimeFormat字符串
|
auto
|
missingValue
|
用于具有空或缺少时间戳列的输入记录的时间戳。应该是ISO8601格式,如“2000-01-01T01:02:03.456”。由于Druid需要一个主时间戳,因此此设置对于接收根本没有任何时间戳的数据集非常有用。
|
none
|
dimensionsSpec位于dataSchema -> dimensionsSpec,用来配置维度。示例如下:
"dimensionsSpec" : {
"dimensions": [
"page",
"language",
{ "type": "long", "name": "userId" }
],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
dimensionsSpec可以包括以下部分:
字段
|
描述
|
默认值
|
dimensions
|
维度名称或者对象的列表,在 dimensions 和 dimensionExclusions 中不能包含相同的列。
如果该配置为一个空数组,Druid将会把所有未出现在 dimensionExclusions 中的非时间、非指标列当做字符串类型的维度列。
|
[]
|
dimensionExclusions
|
在摄取中需要排除的列名称,在该配置中只支持名称,不支持对象。在 dimensions 和 dimensionExclusions 中不能包含相同的列。
|
[]
|
spatialDimensions
|
一个空间维度的数组
|
[]
|
在dimensions列的每一个维度可以是一个名称,也可以是一个对象。 提供一个名称等价于提供了一个给定名称的string类型的维度对象。例如: page 等价于 {“name”: “page”, “type”: “string”}。
维度对象可以有以下的部分:
字段
|
描述
|
默认值
|
type
|
string, long, float 或者 double
|
string
|
name
|
维度名称,将用作从输入记录中读取的字段名,以及存储在生成的段中的列名。
注意: 如果想在摄取的时候重新命名列,可以使用 transformSpec
|
none(必填)
|
createBitmapIndex
|
对于字符串类型的维度,是否应为生成的段中的列创建位图索引。创建位图索引需要更多存储空间,但会加快某些类型的筛选(特别是相等和前缀筛选)。仅支持字符串类型的维度。
|
true
|
- Inclusions and exclusions
Druid以两种可能的方式来解释 dimensionsSpec : normal 和 schemaless
当 dimensions 或者 spatialDimensions 为非空时, 将会采用正常的解释方式。 在该情况下, 前边说的两个列表结合起来的集合当做摄入的维度集合。
当 dimensions 和 spatialDimensions 同时为空或者null时候,将会采用无模式的解释方式。 在该情况下,维度集合由以下方式决定:
- 首先,从 inputFormat (或者 flattenSpec, 如果正在使用 )中所有输入字段集合开始
- 排除掉任何在 dimensionExclusions 中的列
- 排除掉在 timestampSpec 中的时间列
- 排除掉 metricsSpec 中用于聚合器输入的列
- 排除掉 metricsSpec 中任何与聚合器同名的列
- 所有的其他字段都被按照默认配置摄入为 string 类型的维度
metricsSpec位于dataSchema -> metricsSpec中,是一个在摄入阶段要应用的聚合器列表。 在启用了rollup时是很有用的,因为它将配置如何在摄入阶段进行聚合。
一个metricsSpec实例如下:
"metricsSpec": [
{ "type": "count", "name": "count" },
{ "type": "doubleSum", "name": "bytes_added_sum", "fieldName": "bytes_added" },
{ "type": "doubleSum", "name": "bytes_deleted_sum", "fieldName": "bytes_deleted" }
]
granularitySpec位于dataSchema -> granularitySpec, 用来配置以下操作:
- 通过 segmentGranularity 来将数据源分区到时间块
- 如果需要的话,通过 queryGranularity 来截断时间戳
- 通过 interval 来指定批摄取中应创建段的时间块
- 通过 rollup 来指定是否在摄取时进行汇总
- 除了 rollup, 这些操作都是基于主时间戳列
一个granularitySpec实例如下:
"granularitySpec": {
"segmentGranularity": "day",
"queryGranularity": "none",
"intervals": [
"2013-08-31/2013-09-01"
],
"rollup": true
}
transformSpec位于dataSchema -> transformSpec,用来摄取时转换和过滤输入数据。 一个transformSpec实例如下:
"transformSpec": {
"transforms": [
{ "type": "expression", "name": "countryUpper", "expression": "upper(country)" }
],
"filter": {
"type": "selector",
"dimension": "country",
"value": "San Serriffe"
}
}
除了这些属性之外,每个摄取方法都有自己的特定调整属性。