1 概述
此连接器允许将Apache Kafka中的主题用作Presto中的表。
每条消息在Presto中显示为一行。
主题可以是实时的:行将在数据到达时出现并随着消息被丢弃而消失。如果在单个查询中多次访问同一个表(例如,执行自联接),这可能会导致异常的行为。
注意:支持Apache Kafka 2.3.1+。
2 配置Kafka连接器(Kafka Connector)
要配置Kafka连接器,请创建一个etc/catalog/kafka.properties包含以下内容的目录属性文件,并根据需要替换这些属性:
connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port
3 多个Kafka集群
您可以根据需要拥有任意数量的目录,因此如果您有其他Kafka集群,只需添加另一个etc/catalog具有不同名称的属性文件(确保以结尾.properties)。例如,如果您将属性文件命名为sales.properties,Presto将创建一个sales使用配置的连接器命名的目录。
4 配置属性
以下配置属性可用:
Preoperty名称
|
描述
|
kafka.table-names
|
目录提供的所有表的列表
|
kafka.default-schema
|
表的默认架构名称
|
kafka.nodes
|
Kafka集群中的节点列表
|
kafka.connect-timeout
|
连接到Kafka集群超时
|
kafka.max-poll-records
|
每次投票的最大记录数
|
kafka.max-partition-fetch-bytes
|
每次轮询一个分区的最大字节数
|
kafka.table-description-dir
|
包含主题描述文件的目录
|
kafka.hide-internal-columns
|
控制内部列是否是表架构的一部分
|
此目录提供的所有表的逗号分隔列表。表名可以是未限定的(简单名称),并将被放入默认模式(见下文)或使用模式名称(<schema-name>.<table-name>)进行限定。
对于此处定义的每个表,可能存在一个表描述文件(见下文)。如果不存在表描述文件,则在Kafka上以表名作为主题名,没有数据列映射到表中。该表仍将包含所有内部列(见下文)。
此属性是必需的;没有默认值,必须至少定义一张表。
定义架构,该架构将包含所有在没有限定架构名称的情况下定义的表。
此属性是可选的;默认为default。
hostname:portKafka数据节点的逗号分隔列表。
此属性是必需的;没有默认值,必须至少定义一个节点。
注意:即使这里只指定了一个子集,Presto仍然必须能够连接到集群的所有节点,因为消息可能仅位于特定节点上。
连接到数据节点的超时。一个繁忙的Kafka集群在接受连接之前可能需要相当长的时间;当看到由于超时而失败的查询时,增加这个值是一个很好的策略。
此属性是可选的;默认值为10秒(10s)。
来自Kafka的每个poll的最大记录数。
此属性是可选的;默认为500。
- kafka.max-partition-fetch-bytes
每次轮询一个分区的最大字节数
此属性是可选的;默认为1MB。
- kafka.table-description-dir
引用Presto部署中的一个文件夹,其中包含一个或多个.json包含表描述文件的JSON文件(必须以结尾)。
此属性是可选的;默认为etc/kafka。
- kafka.hide-internal-columns
除了表描述文件中定义的数据列之外,连接器还为每个表维护了许多附加列。如果这些列被隐藏,它们仍然可以在查询中使用,但不会显示在或中。DESCRIBE<table-name>SELECT*
此属性是可选的;默认为true。
5 内部列
对于每个定义的表,连接器维护以下列:
列名
|
类型
|
描述
|
_partition_id
|
大数据
|
包含此行的Kafka分区的ID。
|
_partition_offset
|
大数据
|
此行在Kafka分区内的偏移量。
|
_message_corrupt
|
布尔值
|
如果解码器无法解码此行的消息,则为真。如果为true,则从消息映射的数据列应视为无效。
|
_message
|
VARCHAR
|
消息字节为UTF-8编码字符串。这仅对文本主题有用。
|
_message_length
|
大数据
|
消息中的字节数。
|
_key_corrupt
|
布尔值
|
如果密钥解码器无法解码此行的密钥,则为真。如果为true,则从键映射的数据列应视为无效。
|
_key
|
VARCHAR
|
关键字节作为UTF-8编码字符串。这仅对文本键有用。
|
_key_length
|
大数据
|
密钥中的字节数。
|
对于没有表定义文件的表,_key_corrupt和_message_corrupt列将始终为false。、
6 表定义文件
Kafka仅将主题作为字节消息来维护,并将其留给生产者和消费者来定义应如何解释消息。对于Presto,必须将此数据映射到列中以允许对数据进行查询。
注意:
对于包含JSON数据的文本主题,完全可以不使用任何表定义文件,而是使用PrestoJSON函数和运算符来解析_message包含映射到UTF-8字符串的字节的列。但是,这非常麻烦,并且很难编写SQL查询。
表定义文件由表的JSON定义组成。
{
"tableName":...,
"schemaName":...,
"topicName":...,
"key":{
"dataFormat":...,
"fields":[
...
]
},
"message":{
"dataFormat":...,
"fields":[
...
]
}
}
区域
|
必需的
|
类型
|
描述
|
tableName
|
必需的
|
细绳
|
此文件定义的Presto表名。
|
schemaName
|
可选的
|
细绳
|
将包含表的架构。如果省略,则使用默认架构名称。
|
topicName
|
必需的
|
细绳
|
映射的Kafka主题。
|
key
|
可选的
|
JSON对象
|
映射到消息键的数据列的字段定义。
|
message
|
可选的
|
JSON对象
|
映射到消息本身的数据列的字段定义。
|
7 Kafka中的Key和Message
从Kafka0.8开始,主题中的每条消息都可以有一个可选的键。表定义文件包含用于将数据映射到表列的键和消息部分。
表定义中的每个key和message字段都是一个JSON对象,必须包含两个字段:
区域
|
必需的
|
类型
|
描述
|
dataFormat
|
必需的
|
细绳
|
为这组字段选择解码器。
|
fields
|
必需的
|
JSON数组
|
字段定义列表。每个字段定义都会在Presto表中创建一个新列。
|
每个字段定义都是一个JSON对象:
{
"name":...,
"type":...,
"dataFormat":...,
"mapping":...,
"formatHint":...,
"hidden":...,
"comment":...
}
区域
|
必需的
|
类型
|
描述
|
name
|
必需的
|
细绳
|
Presto表中列的名称。
|
type
|
必需的
|
细绳
|
列的Presto类型。
|
dataFormat
|
可选的
|
细绳
|
选择此字段的列解码器。默认为此行数据格式和列类型的默认解码器。
|
dataSchema
|
可选的
|
细绳
|
Avro架构所在的路径或URL。仅用于Avro解码器。
|
mapping
|
可选的
|
细绳
|
列的映射信息。这是特定于解码器的,见下文。
|
formatHint
|
可选的
|
细绳
|
为列解码器设置特定于列的格式提示。
|
hidden
|
可选的
|
布尔值
|
从和隐藏列。默认为.DESCRIBE<tablename>SELECT*false
|
comment
|
可选的
|
细绳
|
添加显示为的列注释。DESCRIBE<tablename>
|
键或消息的字段描述没有限制。
8 行解码
对于键和消息,解码器用于将消息和键数据映射到表列。
- Kafka连接器包含以下解码器:
- raw-不解释Kafka消息,原始消息字节范围映射到表列
- csv-Kafka消息被解释为逗号分隔的消息,字段映射到表列
- json-Kafka消息被解析为JSON并且JSON字段被映射到表列
- avro-基于Avro模式解析Kafka消息,并将Avro字段映射到表列
注意:如果表不存在表定义文件,dummy则使用不公开任何列的解码器。
9 raw解码器
原始解码器支持从Kafka消息或密钥读取原始(基于字节的)值并将其转换为Presto列。
- 对于字段,支持以下属性:
- dataFormat-选择转换的数据类型的宽度
- type-Presto数据类型(请参阅下表以获取支持的数据类型列表)
- mapping-<start>[:<end>];要转换的字节的开始和结束位置(可选)
该dataFormat属性选择转换的字节数。如果不存在,BYTE则假定。所有值都已签名。
支持的值是:
- BYTE-一个字节
- SHORT-两个字节(大端)
- INT-四个字节(大端)
- LONG-八字节(大端)
- FLOAT-四字节(IEEE754格式)
- DOUBLE-八字节(IEEE754格式)
该type属性定义了值映射到的Presto数据类型。
根据分配给列的Presto类型,可以使用不同的dataFormat值:
Presto数据类型
|
允许dataFormat值
|
BIGINT
|
BYTE,SHORT,INT,LONG
|
INTEGER
|
BYTE,SHORT,INT
|
SMALLINT
|
BYTE,SHORT
|
TINYINT
|
BYTE
|
DOUBLE
|
DOUBLE,FLOAT
|
BOOLEAN
|
BYTE,SHORT,INT,LONG
|
VARCHAR/VARCHAR(x)
|
BYTE
|
该mapping属性指定用于解码的密钥或消息中的字节范围。它可以是由冒号(<start>[:<end>])分隔的一个或两个数字。
如果只给出起始位置:
对于固定宽度类型,列将使用指定的适当字节数dateFormat(见上文)。
当VARCHARvalue被解码时,将从开始位置到消息结束的所有字节都将被使用。
如果给出开始和结束位置,则:
对于固定宽度类型,大小必须等于指定的字节数dataFormat。
使用VARCHAR开始(包含)和结束(不包含)之间的所有字节。
如果未mapping指定任何属性,则等效于将开始位置设置为0而未定义结束位置。
数字数据类型(BIGINT,INTEGER,SMALLINT,TINYINT,DOUBLE)的解码方案很简单。从输入消息中读取字节序列并根据以下任一条件进行解码:
大端编码(对于整数类型)
IEEE754格式(forDOUBLE)。
解码字节序列的长度由dataFormat.
对于VARCHAR数据类型,根据UTF-8编码解释字节序列。
10 csv解码器
CSV解码器使用UTF-8编码将表示消息或密钥的字节转换为字符串,然后将结果解释为CSV(逗号分隔值)行。
对于字段,必须定义type和mapping属性:
- type-Presto数据类型(请参阅下表以获取支持的数据类型列表)
- mapping-CSV记录中字段的索引
dataFormat和formatHint不受支持,必须省略。
下表列出了可用于type解码方案的受支持的Presto类型:
Presto数据类型
|
解码规则
|
BIGINT
INTEGER
SMALLINT
TINYINT
|
使用Java解码Long.parseLong()
|
DOUBLE
|
使用Java解码Double.parseDouble()
|
BOOLEAN
|
“真”字符序列映射到true;其他字符序列映射到false
|
VARCHAR/VARCHAR(x)
|
按原样使用
|
11 json解码器
JSON解码器根据以下内容将表示消息或密钥的字节转换为JSONRFC4627。请注意,消息或键必须转换为JSON对象,而不是数组或简单类型。
对于字段,支持以下属性:
- type-Presto类型的列。
- dataFormat-用于列的字段解码器。
- mapping-以斜线分隔的字段名称列表,用于从JSON对象中选择一个字段
- formatHint-仅用于custom-date-time,见下文
JSON解码器支持多个字段解码器,_default用于标准表列和许多基于日期和时间类型的解码器。
下表列出了Presto数据类型,可用作intype和匹配字段解码器,可通过dataFormat属性指定。
Presto数据类型
|
允许dataFormat值
|
BIGINT
INTEGER
SMALLINT
TINYINT
DOUBLE
BOOLEAN
VARCHAR
VARCHAR(x)
|
默认字段解码器(省略dataFormat属性)
|
TIMESTAMP
TIMESTAMPWITHTIMEZONE
TIME
TIMEWITHTIMEZONE
|
custom-date-time,iso8601,rfc2822,milliseconds-since-epoch,seconds-since-epoch
|
DATE
|
custom-date-time,iso8601,rfc2822,
|
12 默认字段解码器
这是支持所有Presto物理数据类型的标准字段解码器。字段值将被JSON转换规则强制转换为boolean、long、double或string值。对于非基于日期/时间的列,应使用此解码器。
13 日期和时间解码器
从JSON对象值变换为的PrestoDATE,TIME,或列,解码器特别必须使用所选择的一个字段定义的属性。TIMEWITHTIMEZONE`,“TIMESTAMPTIMESTAMPWITHTIMEZONEdataFormat
- iso8601-基于文本,将文本字段解析为ISO8601时间戳。
- rfc2822-基于文本,将文本字段解析为RFC2822时间戳。
custom-date-time-基于文本,根据Joda格式模式解析文本字段
- 通过formatHint属性指定。
- milliseconds-since-epoch-基于数字,将文本或数字解释为自纪元以来的毫秒数。
- seconds-since-epoch-基于数字,将文本或数字解释为自纪元以来的毫秒数。
对于和数据类型,如果解码值中存在时区信息,则它将在Presto值中使用。否则结果时区将设置为:TIMESTAMPWITHTIMEZONETIMEWITHTIMEZONEUTC。
14 avro解码器
Avro解码器根据模式转换表示Avro格式的消息或密钥的字节。该消息必须嵌入了Avro架构。Presto不支持无模式Avro解码。
对于密钥/消息,使用avro解码器,dataSchema必须定义。这应该指向需要解码的消息的有效Avro模式文件的位置。此位置可以是远程Web服务器或本地文件系统。如果无法从Presto协调器节点访问此位置,则解码器将失败。dataSchema:’http://example.org/schema/avro_data.avsc’dataSchema:’/usr/local/schema/avro_data.avsc’
对于字段,支持以下属性:
- name-Presto表中列的名称。
- type-Presto类型的列。
- mapping-以斜线分隔的字段名称列表,用于从Avro架构中选择一个字段。如果mapping在原始Avro架构中不存在指定的字段,则读取操作将返回NULL。
下表列出了可用于type等效Avro字段类型的受支持Presto类型。
Presto数据类型
|
允许的Avro数据类型
|
BIGINT
|
INT,LONG
|
DOUBLE
|
DOUBLE,FLOAT
|
BOOLEAN
|
BOOLEAN
|
VARCHAR/VARCHAR(x)
|
STRING
|
VARBINARY
|
FIXED,BYTES
|
ARRAY
|
ARRAY
|
MAP
|
MAP
|
15 Avro模式演变
Avro解码器支持具有向后兼容性的模式演化功能。通过向后兼容,可以使用较新的架构来读取使用较旧架构创建的Avro数据。Avro架构中的任何更改也必须反映在Presto的主题定义文件中。新添加/重命名的字段必须在Avro架构文件中具有默认值。
模式演化行为如下:
- 在新模式中添加的列:当表使用新模式时,使用旧模式创建的数据将产生默认值。
- 在新模式中删除的列:使用旧模式创建的数据将不再从已删除的列中输出数据。
- 列在新模式中重命名:这相当于删除列并添加新列,当表使用新模式时,使用旧模式创建的数据将产生默认值。
- 更改新模式中的列类型:如果Avro支持类型强制,则转换发生。不兼容的类型会引发错误。