Impala 使用Parquet文件格式


Impala 允许您创建、管理和查询 Parquet 表。Parquet 是一种面向列的二进制文件格式,旨在高效处理 Impala 最擅长的大规模查询类型。Parquet格式是用于查询扫描特定列的表内,例如,以查询特别好“宽”表具有很多列,或执行聚集操作,如SUM()和 AVG()需要处理从列大多数或所有的值。Impala 编写的每个 Parquet 数据文件都包含一组行(称为“行组”)的值)。在数据文件中,每列中的值都经过组织,因此它们都是相邻的,从而可以对该列中的值进行良好的压缩。对 Parquet 表的查询可以以最少的 I/O 快速检索和分析任何列中的这些值。

在 Impala 中创建Parquet格式表

要创建一个PARQUET_TABLE使用 Parquet 格式命名的表,您可以使用如下命令,替换您自己的表名、列名和数据类型:

[impala-host:21000] > create table parquet_table_name (x INT, y STRING) STORED AS PARQUET;

或者,克隆现有表的列名和数据类型:

[impala-host:21000] > create table parquet_table_name LIKE other_table_name STORED AS PARQUET;

在 Impala 1.4.0 及更高版本中,即使没有现有的 Impala 表,您也可以从原始 Parquet 数据文件派生列定义。例如,您可以创建一个指向 HDFS 目录的外部表,并将列定义基于该目录中的文件之一:

CREATE EXTERNAL TABLE ingest_existing_files LIKE PARQUET '/user/etl/destination/datafile1.dat'
  STORED AS PARQUET
  LOCATION '/user/etl/destination';

或者,您可以参考现有数据文件并使用合适的列定义创建一个新的空表。然后您可以使用INSERT创建新数据文件或 LOAD DATA将现有数据文件传输到新表中。

CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
  STORED AS PARQUET;

新创建的表的默认属性与任何其他CREATE TABLE语句的相同 。例如,默认文件格式是文本;如果您希望新表使用 Parquet 文件格式,请同时包含该STORED AS PARQUET文件。

在此示例中,新表按年、月和日进行分区。这些分区键列不是数据文件的一部分,因此您在CREATE TABLE语句中指定它们:

CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
  PARTITION (year INT, month TINYINT, day TINYINT)
  STORED AS PARQUET;

创建表后,要将数据插入该表,请使用类似于以下的命令,并再次使用您自己的表名:

[impala-host:21000] > insert overwrite table parquet_table_name select * from other_table_name;

如果 Parquet 表的列数或列名与其他表不同,请指定其他表中的列名,而不是 *SELECT语句中。

将数据加载到 Parquet 表中

从以下用于将数据加载到 Parquet 表的技术中进行选择,具体取决于原始数据是已经在 Impala 表中,还是作为原始数据文件存在于 Impala 之外。

如果您已经在 Impala 或 Hive 表中拥有数据,可能是不同的文件格式或分区方案,您可以使用 ImpalaINSERT...SELECT语法将数据传输到 Parquet 表 。作为同一INSERT语句的一部分,您可以对数据进行转换、过滤、重新分区和执行其他操作。

在插入分区表时,尤其是使用 Parquet 文件格式时,您可以在INSERT语句中包含一个提示,以微调操作的整体性能及其资源使用情况。

INSERTParquet 表的 任何语句都需要 HDFS 文件系统中有足够的可用空间来写入一个块。由于 Parquet 数据文件默认使用 1 GB 的块大小,INSERT如果您的 HDFS 空间不足,则可能会失败(即使是非常少量的数据)。

避免INSERT...VALUES了Parquet表语法,因为 INSERT...VALUES会为每一个单独的小数据文件 INSERT...VALUES语句,Parquet的强度是在处理数据(压缩,并行化,等等)的 大大块。

如果您有一个或多个在 Impala 之外生成的 Parquet 数据文件,您可以通过以下方法之一通过 Impala 快速查询数据:

  • LOAD DATA语句将单个数据文件或一个充满数据文件的目录移动到 Impala 表的数据目录中。它不验证或转换数据。原始数据文件必须在 HDFS 中的某个位置,而不是本地文件系统。
  • CREATE TABLE带有LOCATION子句 的语句创建一个表,其中数据继续驻留在 Impala 数据目录之外。原始数据文件必须在 HDFS 中的某个位置,而不是本地文件系统。为了更加安全,如果数据旨在长期保存并被其他应用程序重用,您可以使用CREATE EXTERNAL TABLE语法,以便 ImpalaDROP TABLE语句不会删除数据文件。
  • 如果 Parquet 表已经存在,可以直接将 Parquet 数据文件复制进去,然后使用REFRESH语句让 Impala 识别新添加的数据。请记住通过使用hadoop distcp -pb命令而不是对 Parquet 文件的-put或 -cp操作来保留 Parquet 数据文件的块大小 。

笔记:

目前,Impala 总是根据列的顺序位置来解码 Parquet 文件中的列数据,而不是通过根据名称查找每列的位置。在 Impala 之外生成的 Parquet 文件必须以与 Impala 表中声明的列相同的顺序写入列数据。从数据文件中省略的任何可选列必须是 Impala 表定义中最右边的列。

如果您通过 Impala 以外的其他工具创建压缩 Parquet 文件,请确保 Impala 支持 Parquet 中的任何压缩编解码器。例如,Impala 目前不支持 Parquet 文件中的 LZO 压缩。还要仔细检查您是否在其他工具中使用了任何推荐的兼容性设置,例如 spark.sql.parquet.binaryAsString通过 Spark 编写 Parquet 文件时。

最新版本的 Sqoop 可以使用该--as-parquetfile选项生成 Parquet 输出文件 。

如果数据存在于 Impala 之外并且是某种其他格式,则将上述两种技术结合起来。首先,使用LOAD DATAorCREATE EXTERNAL TABLE ... LOCATION语句将数据放入使用适当文件格式的 Impala 表中。然后,使用INSERT...SELECT语句将数据复制到 Parquet 表,在此过程中转换为 Parquet 格式。

将数据加载到 Parquet 表是一项内存密集型操作,因为传入的数据会被缓冲,直到达到一个数据块的大小,然后在写出之前在内存中组织和压缩该数据块。插入时数据到分区镶木表,因为一个单独的数据文件被用于分区键列值的每个组合编写的,潜在地需要若干内存消耗可以较大 大的块在存储器立刻被操纵。

当插入到分区的 Parquet 表时,Impala 会在节点之间重新分配数据以减少内存消耗。您可能仍需要在插入操作期间临时增加 Impala 专用内存,或将加载操作分解为多个INSERT语句,或两者兼而有之。

注意: 上述所有技术都假定您正在加载的数据与目标表的结构相匹配,包括列顺序、列名和分区布局。要转换或重组数据,首先将数据加载到匹配数据底层结构的 Parquet 表中,然后使用表复制技术之一,例如CREATE TABLE AS SELECTINSERT ... SELECT重新排序或重命名列,在多个分区之间划分数据,等等。例如,要将单个综合 Parquet 数据文件加载到分区表中,您可以使用INSERT ... SELECT带有动态分区的语句让 Impala 创建具有适当分区值的单独数据文件。

Impala Parquet 表的查询性能

Parquet 表的查询性能取决于处理查询的SELECT列表和WHERE子句所需的列数,将数据划分为块大小等于文件大小的大数据文件的方式,通过读取数据来减少 I/O压缩格式的每一列,可以跳过哪些数据文件(对于分区表),以及解压缩每列数​​据的 CPU 开销。

例如,以下是对 Parquet 表的有效查询:

select avg(income) from census_data where state = 'CA';

该查询仅处理大量总列中的 2 列。如果按STATE列对表进行分区,效率更高,因为查询只需要从每个数据文件中读取和解码1列,并且它可以只读取分区目录中'CA'的数据文件作为状态,跳过数据文件对于所有其他状态,这些状态将物理位于其他目录中。

下面是一个相对低效的 Parquet 表查询:

select * from census_data;

Impala 将不得不读取每个大数据文件的全部内容 ,并为每个行组解压缩每一列的内容,从而否定面向列格式的 I/O 优化。对于 Parquet 表,此查询可能仍然比具有其他文件格式的表更快,但它没有利用 Parquet 数据文件的独特优势。

Impala 可以优化 Parquet 表上的查询,尤其是连接查询,当所有表都有统计数据时,效果会更好。COMPUTE STATS 在将大量数据加载或附加到每个表后,为每个表发出语句。

Impala 2.5及更高版本中 提供的运行时过滤功能最适用于 Parquet 表。每行过滤方面仅适用于 Parquet 表。

在Impala 2.6及更高版本中,Impala 查询针对存储在 Amazon S3 中的文件进行了优化。对于使用文件格式的实木复合地板,ORC,RCFile,SequenceFile,Avro的,和未压缩文本因帕拉表中,设置 fs.s3a.block.size了在核心的site.xml 配置文件决定帕拉如何划分的读取数据文件的I / O工作。此配置设置以字节为单位指定。默认情况下,此值为 33554432 (32 MB),这意味着 Impala 将文件上的 S3 读取操作并行化,就好像它们由 32 MB 块组成一样。例如,如果您的 S3 查询主要访问由 MapReduce 或 Hive 编写的 Parquet 文件,则增加fs.s3a.block.size到 134217728 (128 MB) 以匹配这些文件的行组大小。如果大多数 S3 查询涉及 Impala 编写的 Parquet 文件,请增加到fs.s3a.block.size268435456 (256 MB) 以匹配 Impala 生成的行组大小。

从 Impala 3.4.0 开始,使用查询选项 PARQUET_OBJECT_STORE_SPLIT_SIZE来控制非块存储(例如 S3、ADLS 等)的 Parquet 拆分大小。默认值为 256 MB。

在Impala 2.9及更高版本中,Impala 编写的 Parquet 文件包含嵌入式元数据,用于指定每个列、每个行组内和行组内每个数据页的最小值和最大值。Impala 编写的 Parquet 文件通常包含单个行组;一个行组可以包含许多数据页。在查询期间读取每个 Parquet 数据文件时,Impala 使用此信息(当前,仅每个行组的元数据)来快速确定文件中的每个行组是否可能包含与WHERE子句中的条件匹配的任何行 。例如,如果列X在特定 Parquet 文件中,最小值为 1,最大值为 100,那么包含该子句的查询WHERE x > 200可以快速确定跳过该特定文件是安全的,而不是扫描所有关联的列值。这种优化技术对于将SORT BY子句用于最常检入WHERE子句的列的 表特别有效 ,因为INSERT对此类表的任何操作都会生成 Parquet 数据文件,每个文件中的列值范围相对较窄。

要在创建 Parquet 文件时禁止 Impala 写入 Parquet 页面索引,请将PARQUET_WRITE_PAGE_INDEX查询选项设置为FALSE.

Parquet 表的分区

Parquet 文件格式非常适合包含许多列的表,其中大多数查询仅引用列的一小部分。当您将 Parquet 表与分区结合使用时,这种方法的性能优势会被放大。根据WHERE引用分区键列的子句中的比较,Impala 可以完全跳过某些分区的数据文件。例如,查询分区表上经常分析基于诸如列的时间间隔的数据YEAR, MONTH和/或DAY,或地理区域。请记住 Parquet 数据文件使用大块大小,因此在决定数据分区的精细程度时,请尝试找到每个分区包含256 MB或更多数据的粒度,而不是创建大量拆分到多个分区的较小文件.

插入分区 Parquet 表可能是一项资源密集型操作,因为每个 Impala 节点可能会针对分区键列的不同值的每个组合将单独的数据文件写入 HDFS。大量同时打开的文件可能会超过 HDFS “收发器”限制。为避免超过此限制,请考虑以下技术:

  • 使用INSERT带有PARTITION子句特定值的单独语句加载不同的数据子集,例如 PARTITION (year=2010).
  • 增加HDFS的“收发器”值,有时拼写为“xcievers” (原文如此)。hdfs-site.xml配置文件中的属性值为dfs.datanode.max.transfer.threads. 例如,如果您要加载按年、月和日分区的 12 年数据,即使值 4096 也可能不够高。

Parquet 数据文件的压缩

当 Impala 使用该INSERT语句写入 Parquet 数据文件时,底层压缩由COMPRESSION_CODEC查询选项控制。(在 Impala 2.0 之前,查询选项名称为 PARQUET_COMPRESSION_CODEC。)此查询选项的允许值为snappy(默认)gzipzstd、 lz4、 和none。选项值不区分大小写。如果选项设置为无法识别的值,则所有类型的查询都将因选项设置无效而失败,而不仅仅是涉及 Parquet 表的查询。

带有 Snappy 压缩的 Parquet 表示例

默认情况下,Parquet 表的底层数据文件使用 Snappy 进行压缩。快速压缩和解压的结合使其成为许多数据集的不错选择。为了确保使用 Snappy 压缩,例如在尝试其他压缩编解码器之后,在插入数据之前将COMPRESSION_CODEC查询选项 设置为snappy

[localhost:21000] > create database parquet_compression;
[localhost:21000] > use parquet_compression;
[localhost:21000] > create table parquet_snappy like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=snappy;
[localhost:21000] > insert into parquet_snappy select * from raw_text_data;
Inserted 1000000000 rows in 181.98s

使用 GZip 压缩的 Parquet 表示例

如果您需要更密集的压缩(以在查询期间解压缩更多 CPU 周期为代价),请在插入数据之前将COMPRESSION_CODEC查询选项设置为gzip

[localhost:21000] > create table parquet_gzip like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=gzip;
[localhost:21000] > insert into parquet_gzip select * from raw_text_data;
Inserted 1000000000 rows in 1418.24s

未压缩 Parquet 表示例

如果您的数据压缩得很差,或者您想完全避免压缩和解压的 CPU 开销,请在插入数据之前将COMPRESSION_CODEC 查询选项设置为none

[localhost:21000] > create table parquet_none like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=none;
[localhost:21000] > insert into parquet_none select * from raw_text_data;
Inserted 1000000000 rows in 146.90s

压缩Parquet的大小和速度示例

以下是一些示例,显示了 10 亿行合成数据的数据大小和查询速度差异,这些数据使用每种编解码器进行压缩。与往常一样,使用您自己的真实数据集运行类似的测试。实际压缩率以及相对插入和查询速度将根据实际数据的特征而有所不同。

在这种情况下,从 Snappy 切换到 GZip 压缩会使数据额外缩小 40% 左右,而从 Snappy 压缩切换到不压缩也会将数据扩展约 40%:

$ hdfs dfs -du -h /user/hive/warehouse/parquet_compression.db
23.1 G  /user/hive/warehouse/parquet_compression.db/parquet_snappy
13.5 G  /user/hive/warehouse/parquet_compression.db/parquet_gzip
32.8 G  /user/hive/warehouse/parquet_compression.db/parquet_none

由于 Parquet 数据文件通常很大,因此每个目录将具有不同数量的数据文件,并且行组的排列方式也不同。

同时,压缩的力度越小,数据解压缩的速度就越快。在这种情况下,使用具有 10 亿行的表,评估特定列的所有值的查询在不压缩的情况下比使用 Snappy 压缩运行得更快,使用 Snappy 压缩比使用 Gzip 压缩运行得更快。查询性能取决于其他几个因素,因此一如既往,使用您自己的数据运行您自己的基准测试,以确定数据大小、CPU 效率以及插入和查询操作速度之间的理想权衡。

[localhost:21000] > desc parquet_snappy;
Query finished, fetching results ...
+-----------+---------+---------+
| name      | type    | comment |
+-----------+---------+---------+
| id        | int     |         |
| val       | int     |         |
| zfill     | string  |         |
| name      | string  |         |
| assertion | boolean |         |
+-----------+---------+---------+
Returned 5 row(s) in 0.14s
[localhost:21000] > select avg(val) from parquet_snappy;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 4.29s
[localhost:21000] > select avg(val) from parquet_gzip;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 6.97s
[localhost:21000] > select avg(val) from parquet_none;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 3.67s

复制 Parquet 数据文件的示例

这是最后一个示例,用于说明使用各种压缩编解码器的数据文件如何相互兼容以进行读取操作。有关压缩格式的元数据写入每个数据文件,并且可以在查询期间进行解码,而不管当时有效的COMPRESSION_CODEC设置如何。在这个例子中,我们从数据文件拷贝 PARQUET_SNAPPYPARQUET_GZIP和 PARQUET_NONE在前面的例子中使用的表,每个含有1个十亿行,所有新表的数据目录 PARQUET_EVERYTHING。几个示例查询表明,新表现在包含 30 亿行,其中包含数据文件的各种压缩编解码器。

首先,我们在 Impala 中创建表,以便在 HDFS 中有一个目标目录来放置数据文件:

[localhost:21000] > create table parquet_everything like parquet_snappy;
Query: create table parquet_everything like parquet_snappy

然后在shell中,我们将相关数据文件复制到这个新表的数据目录中。我们不是hdfs dfs -cp像使用典型文件那样使用,而是使用它hadoop distcp -pb来确保保留 Parquet 数据文件的特殊 块大小。

$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_snappy \
  /user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_gzip  \
  /user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_none  \
  /user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...

回到impala-shell解释器中,我们使用该 REFRESH语句来提醒 Impala 服务器注意此表的新数据文件,然后我们可以运行查询来证明数据文件表示 30 亿行,以及其中一个数字列的值匹配原始小表中的内容:

[localhost:21000] > refresh parquet_everything;
Query finished, fetching results ...

Returned 0 row(s) in 0.32s
[localhost:21000] > select count(*) from parquet_everything;
Query finished, fetching results ...
+------------+
| _c0        |
+------------+
| 3000000000 |
+------------+
Returned 1 row(s) in 8.18s
[localhost:21000] > select avg(val) from parquet_everything;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 13.35s

Impala 复杂类型的Parquet表

在帕拉2.3和更高,帕拉支持复杂类型 ARRAYSTRUCTMAP。在 Impala 3.2及更高版本中,Impala 还支持 ORC 中的这些复杂类型。目前仅 Parquet 或 ORC 文件格式支持这些复杂类型。由于 Impala 在 Parquet 上的性能比 ORC 更好,如果您打算使用复杂类型,请先熟悉 Parquet 的性能和存储方面。

与其他 Hadoop 组件交换 Parquet 数据文件

您可以从其他 Hadoop 组件读取和写入 Parquet 数据文件。

以前,无法通过 Impala 创建 Parquet 数据并在 Hive 中重用该表。现在 Hive 可以使用 Parquet 支持,在 Hive 中重用现有的 Impala Parquet 数据文件需要更新表元数据。如果您已经在运行 Impala 1.1.1 或更高版本,请使用以下命令:

ALTER TABLE table_name SET FILEFORMAT PARQUET;

如果您运行的 Impala 版本早于 1.1.1,请通过 Hive 执行元数据更新:

ALTER TABLE table_name SET SERDE 'parquet.hive.serde.ParquetHiveSerDe';
ALTER TABLE table_name SET FILEFORMAT
  INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
  OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat";

Impala 1.1.1 及更高版本可以重用 Hive 创建的 Parquet 数据文件,无需任何操作。

Impala 支持您可以在 Parquet 数据文件中编码的标量数据类型,但不支持复合或嵌套类型,例如映射或数组。在 Impala 2.2及更高版本中,Impala 可以查询包含复合或嵌套类型的 Parquet 数据文件,只要查询仅引用具有标量类型的列。

如果您在节点之间,甚至在同一节点的不同目录之间复制 Parquet 数据文件,请确保使用命令保留块大小hadoop distcp -pb。要验证块大小是否已保留,请发出命令 并检查平均块大小是否等于或接近256 MB(或查询选项定义的任何其他大小)。. (该操作通常会留下一些名称与_distcp_logs_*匹配的目录,之后您可以从目标目录中删除这些目录。)发出命令hadoop distcp以获取有关distcp命令语法的详细信息 。 hdfs fsck -blocks HDFS_path_of_impala_table_dirPARQUET_FILE_SIZEhadoop distcp

帕拉可以查询使用Parquet文件PLAIN, PLAIN_DICTIONARYBIT_PACKEDRLE 和RLE_DICTIONARY编码。RLE_DICTIONARY仅在Impala 4.0及更高版本中受支持。在 Impala 之外创建供 Impala 使用的文件时,请确保使用受支持的编码之一。特别是,对于 MapReduce 作业, parquet.writer.version不得定义(尤其是 PARQUET_2_0)来编写 Parquet MR 作业的配置。使用默认版本(或格式)。默认格式 1.0 包括一些与旧版本兼容的增强功能。由于使用了RLE_DICTIONARY编码,Impala 可能无法使用使用 2.0 格式的数据。

要检查 Parquet 文件的内部结构和数据,可以使用 parquet-tools命令。确保此命令在您的 $PATH. (通常,它是从/usr/bin符号链接的;有时,根据您的安装设置,您可能需要将其放置在备用bin目录下。)此命令的参数允许您执行以下操作:

  • cat: 将文件内容打印到标准输出。在 Impala 2.3及更高版本中,您可以使用该-j 选项来输出 JSON。
  • head: 将文件的前几条记录打印到标准输出。
  • schema:打印文件的 Parquet 架构。
  • meta:打印文件页脚元数据,包括键值属性(如 Avro 架构)、压缩率、编码、使用的压缩和行组信息。
  • dump:打印所有数据和元数据。

使用parquet-tools -h查看使用信息的所有参数。以下是一些显示parquet-tools用法的示例:

$ # Be careful doing this for a big file! Use parquet-tools head to be safe.
$ parquet-tools cat sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0

year = 1992
month = 1
day = 3
...


$ parquet-tools head -n 2 sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0

year = 1992
month = 1
day = 3
...


$ parquet-tools schema sample.parq
message schema {
  optional int32 year;
  optional int32 month;
  optional int32 day;
  optional int32 dayofweek;
  optional int32 dep_time;
  optional int32 crs_dep_time;
  optional int32 arr_time;
  optional int32 crs_arr_time;
  optional binary carrier;
  optional int32 flight_num;
...


$ parquet-tools meta sample.parq
creator:             impala version 2.2.0-...

file schema:         schema
-------------------------------------------------------------------
year:                OPTIONAL INT32 R:0 D:1
month:               OPTIONAL INT32 R:0 D:1
day:                 OPTIONAL INT32 R:0 D:1
dayofweek:           OPTIONAL INT32 R:0 D:1
dep_time:            OPTIONAL INT32 R:0 D:1
crs_dep_time:        OPTIONAL INT32 R:0 D:1
arr_time:            OPTIONAL INT32 R:0 D:1
crs_arr_time:        OPTIONAL INT32 R:0 D:1
carrier:             OPTIONAL BINARY R:0 D:1
flight_num:          OPTIONAL INT32 R:0 D:1
...

row group 1:         RC:20636601 TS:265103674
-------------------------------------------------------------------
year:                 INT32 SNAPPY DO:4 FPO:35 SZ:10103/49723/4.92 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
month:                INT32 SNAPPY DO:10147 FPO:10210 SZ:11380/35732/3.14 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
day:                  INT32 SNAPPY DO:21572 FPO:21714 SZ:3071658/9868452/3.21 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dayofweek:            INT32 SNAPPY DO:3093276 FPO:3093319 SZ:2274375/5941876/2.61 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dep_time:             INT32 SNAPPY DO:5367705 FPO:5373967 SZ:28281281/28573175/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_dep_time:         INT32 SNAPPY DO:33649039 FPO:33654262 SZ:10220839/11574964/1.13 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
arr_time:             INT32 SNAPPY DO:43869935 FPO:43876489 SZ:28562410/28797767/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_arr_time:         INT32 SNAPPY DO:72432398 FPO:72438151 SZ:10908972/12164626/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
carrier:              BINARY SNAPPY DO:83341427 FPO:83341558 SZ:114916/128611/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
flight_num:           INT32 SNAPPY DO:83456393 FPO:83488603 SZ:10216514/11474301/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
...

Parquet 数据文件的组织方式

尽管 Parquet 是一种面向列的文件格式,但不要期望为每一列找到一个数据文件。Parquet 将行的所有数据保存在同一个数据文件中,以确保行的列始终在同一节点上可用以进行处理。Parquet 的作用是设置大的 HDFS 块大小和匹配的最大数据文件大小,以确保 I/O 和网络传输请求适用于大批量数据。

在该数据文件中,一组行的数据被重新排列,以便第一列中的所有值都组织在一个连续的块中,然后是第二列中的所有值,依此类推。将同一列中的值彼此相邻让 Impala 对该列中的值使用有效的压缩技术。

笔记:

ImpalaINSERT语句使用与数据文件大小匹配的HDFS 块大小写入 Parquet 数据文件,以确保每个数据文件由单个 HDFS 块表示,并且可以在单个节点上处理整个文件,而无需任何远程读取。

如果您在 Impala 之外创建 Parquet 数据文件,例如通过 MapReduce 或 Pig 作业,请确保 HDFS 块大小大于或等于文件大小,以便保持“每个块一个文件”的关系。将dfs.block.sizedfs.blocksize属性设置得 足够大,使每个文件都适合单个 HDFS 块,即使该大小大于正常的 HDFS 块大小。

如果在文件复制期间将块大小重置为较低的值,您将看到涉及这些文件的查询的性能降低,并且该PROFILE 语句将显示某些 I/O 通过远程读取未达到最佳状态。

当 Impala 检索或测试特定列的数据时,它会打开所有数据文件,但只读取每个文件中包含该列值的部分。列值连续存储,最大限度地减少处理单个列中的值所需的 I/O。如果在SELECT 列表或WHERE子句中命名了其他列,则同一行中所有列的数据在同一数据文件中可用。

如果一条INSERT语句带来的数据少于 一个 Parquet 块的数据,则生成的数据文件小于理想情况。因此,如果您确实拆分 ETL 作业以使用多个 INSERT语句,请尝试将每个INSERT语句的数据量保持在 大约256 MB 或 256 MB 的倍数。

Parquet 数据文件的 RLE 和字典编码

Parquet 基于对实际数据值的分析,使用一些自动压缩技术,例如游程编码 (RLE) 和字典编码。一旦以紧凑形式对数据值进行编码,就可以可选地使用压缩算法进一步压缩编码数据。Impala 创建的 Parquet 数据文件可以使用 Snappy、GZip 或不压缩;Parquet 规范也允许 LZO 压缩,但目前 Impala 不支持 LZO 压缩的 Parquet 文件。

RLE 和字典编码是 Impala 自动应用于 Parquet 数据值组的压缩技术,以及应用于整个数据文件的任何 Snappy 或 GZip 压缩。这些自动优化可以为您节省传统数据仓库通常需要的时间和规划。例如,字典编码减少了将数字 ID 创建为较长字符串值的缩写的需要。

运行长度编码压缩重复数据值的序列。例如,如果许多连续行都包含相同的国家/地区代码值,则这些重复值可以由该值后跟连续出现次数的计数来表示。

字典编码采用列中存在的不同值,并以紧凑的 2 字节形式表示每个值,而不是原始值,原始值可能是几个字节。(额外的压缩应用于压缩的值,以节省额外的空间。)当列的不同值的数量小于 2**16 (16,384) 时,这种类型的编码适用。它不适用于BOOLEAN已经很短的数据类型列 。TIMESTAMP 列有时每行都有一个唯一值,在这种情况下,它们可能会很快超过不同值的 2**16 限制。对于每个数据文件,列内不同值的 2**16 限制被重置,因此如果几个不同的数据文件每个包含 10,000 个不同的城市名称,每个数据文件中的城市名称列仍然可以使用字典编码进行压缩。

压缩 Parquet 表的数据文件

如果您对 Parquet 表重用现有的表结构或 ETL 过程,您可能会遇到“许多小文件”的情况,这对于查询效率来说是次优的。例如,像这样的语句可能会产生低效组织的数据文件:

-- In an N-node cluster, each node produces a data file
-- for the INSERT operation. If you have less than
-- N GB of data to copy, some files are likely to be
-- much smaller than the default Parquet block size.
insert into parquet_table select * from text_table;

-- Even if this operation involves an overall large amount of data,
-- when split up by year/month/day, each partition might only
-- receive a small amount of data. Then the data files for
-- the partition might be divided between the N nodes in the cluster.
-- A multi-gigabyte copy operation might produce files of only
-- a few MB each.
insert into partitioned_parquet_table partition (year, month, day)
  select year, month, day, url, referer, user_agent, http_code, response_time
  from web_stats;

以下技术可帮助您在 ParquetINSERT操作中生成大型数据文件 ,并压缩现有的过小的数据文件:

  • 插入分区 Parquet 表时,请使用静态分区 INSERT语句,其中分区键值指定为常量值。理想情况下,INSERT对每个分区使用单独的语句。
  • 您可以将NUM_NODES选项设置为 1 简要、期间 INSERTCREATE TABLE AS SELECT语句。通常,这些语句会为每个数据节点生成一个或多个数据文件。如果写入操作涉及少量数据、Parquet 表和/或分区表,则默认行为可能会产生许多小文件,而直觉上您可能只期望一个输出文件。SET NUM_NODES=1关闭写操作的“分布式”方面,使其更有可能只生成一个或几个数据文件。
  • 准备好减少您习惯使用传统分析数据库系统的分区键列的数量。
  • 不要期望 Impala 编写的 Parquet 文件填满整个 Parquet 块大小。在确定要写入每个 Parquet 文件的数据量时,Impala 会保守估计。通常,通过 Parquet 文件格式的压缩和编码技术,内存中未压缩数据在磁盘上的数量会大大减少。最终的数据文件大小取决于数据的可压缩性。因此,如果将256 MB的文本数据转换为 2 个 Parquet 数据文件,每个文件都小于 256 MB,这并不表示存在问题。

如果您不小心最终得到一个包含许多小数据文件的表,请考虑使用一种或多种上述技术并将所有数据通过CREATE TABLE AS SELECTorINSERT ... SELECT语句复制到新的 Parquet 表中。

为避免重写查询以更改表名,您可以采用始终针对视图运行重要查询的约定。更改视图定义会立即切换任何后续查询以使用新的基础表:

create view production_table as select * from table_with_many_small_files;
-- CTAS or INSERT...SELECT all the data into a more efficient layout...
alter view production_table as select * from table_with_few_big_files;
select * from production_table where c1 = 100 and c2 < 50 and ...;

Parquet 表的模式演变

模式演化是指使用语句ALTER TABLE ... REPLACE COLUMNS来更改表中的名称、数据类型或列数。您可以按如下方式对 Parquet 表执行模式演变:

  • ImpalaALTER TABLE语句从不更改表中的任何数据文件。在 Impala 方面,模式演变涉及根据新表定义解释相同的数据文件。某些类型的架构更改是有意义的并且可以正确表示。其他类型的更改无法以合理的方式表示,并在查询期间产生特殊的结果值或转换错误。
  • INSERT语句始终使用最新的表定义创建数据。如果您执行一系列INSERTand ALTER TABLE ... REPLACE COLUMNS语句,您最终可能会得到具有不同列数或内部数据表示的数据文件。
  • 如果使用ALTER TABLE ... REPLACE COLUMNS在末尾定义附加列,则在查询中使用原始数据文件时,这些最终列将被视为所有NULL值。
  • 如果您ALTER TABLE ... REPLACE COLUMNS用来定义比以前更少的列,那么在查询中使用原始数据文件时,数据文件中仍然存在的未使用列将被忽略。
  • 镶木表示TINYINTSMALLINT和 INT类型的内部相同的,所有的存储在32位整数。
    • 这意味着很容易将一TINYINT列提升为 SMALLINTINT,或将一SMALLINT 列提升为INT。这些数字在数据文件中完全相同,并且被提升的列不会包含任何超出范围的值。
    • 如果您将这些列类型中的任何一个更改为较小的类型,则任何超出新类型范围的值都会错误地返回,通常为负数。
    • 你不能改变TINYINTSMALLINT或 INTBIGINT,或周围的其他方法。尽管ALTER TABLE成功,但任何查询这些列的尝试都会导致转换错误。
    • 列的任何其他类型转换在查询期间都会产生转换错误。例如,INTSTRING、 FLOATDOUBLETIMESTAMP对 STRINGDECIMAL(9,0)对 DECIMAL(5,2)等。

您可能会发现在 Parquet 文件中,列的排列顺序与 Impala 表中的顺序不同。例如,您可能有一个 Parquet 文件,它是包含 columns 的表的一部分C1,C2,C3,C4,现在您想在具有 columns 的表中重用相同的 Parquet 文件C4,C2。默认情况下,Impala 期望数据文件中的列以与为表定义的列相同的顺序出现,这使得进行某些类型的文件重用或模式演变是不切实际的。在 Impala 2.6及更高版本中,查询选项 PARQUET_FALLBACK_SCHEMA_RESOLUTION=name允许 Impala 按名称解析列,从而处理数据文件中的乱序或额外列。例如:

create database schema_evolution;
use schema_evolution;
create table t1 (c1 int, c2 boolean, c3 string, c4 timestamp)
  stored as parquet;
insert into t1 values
  (1, true, 'yes', now()),
  (2, false, 'no', now() + interval 1 day);

select * from t1;
+----+-------+-----+-------------------------------+
| c1 | c2    | c3  | c4                            |
+----+-------+-----+-------------------------------+
| 1  | true  | yes | 2016-06-28 14:53:26.554369000 |
| 2  | false | no  | 2016-06-29 14:53:26.554369000 |
+----+-------+-----+-------------------------------+

desc formatted t1;
...
| Location:   | /user/hive/warehouse/schema_evolution.db/t1 |
...

-- Make T2 have the same data file as in T1, including 2
-- unused columns and column order different than T2 expects.
load data inpath '/user/hive/warehouse/schema_evolution.db/t1'
  into table t2;
+----------------------------------------------------------+
| summary                                                  |
+----------------------------------------------------------+
| Loaded 1 file(s). Total files in destination location: 1 |
+----------------------------------------------------------+

-- 'position' is the default setting.
-- Impala cannot read the Parquet file if the column order does not match.
set PARQUET_FALLBACK_SCHEMA_RESOLUTION=position;
PARQUET_FALLBACK_SCHEMA_RESOLUTION set to position

select * from t2;
WARNINGS:
File 'schema_evolution.db/t2/45331705_data.0.parq'
has an incompatible Parquet schema for column 'schema_evolution.t2.c4'.
Column type: TIMESTAMP, Parquet schema: optional int32 c1 [i:0 d:1 r:0]

File 'schema_evolution.db/t2/45331705_data.0.parq'
has an incompatible Parquet schema for column 'schema_evolution.t2.c4'.
Column type: TIMESTAMP, Parquet schema: optional int32 c1 [i:0 d:1 r:0]

-- With the 'name' setting, Impala can read the Parquet data files
-- despite mismatching column order.
set PARQUET_FALLBACK_SCHEMA_RESOLUTION=name;
PARQUET_FALLBACK_SCHEMA_RESOLUTION set to name

select * from t2;
+-------------------------------+-------+
| c4                            | c2    |
+-------------------------------+-------+
| 2016-06-28 14:53:26.554369000 | true  |
| 2016-06-29 14:53:26.554369000 | false |
+-------------------------------+-------+

Parquet 表的数据类型注意事项

Parquet 格式定义了一组数据类型,其名称与对应的 Impala 数据类型的名称不同。如果您正在使用其他 Hadoop 组件(例如 Pig 或 MapReduce)准备 Parquet 文件,您可能需要使用 Parquet 定义的类型名称。下表列出了 Parquet 定义的类型和 Impala 中的等效类型。

原始类型

拼花类型

黑斑羚类型

二进制

细绳

布尔值

布尔值

双倍的

双倍的

漂浮

漂浮

INT32

情报局

INT64

大数据

INT96

时间戳

逻辑类型

Parquet 使用类型注释来扩展它可以存储的类型,通过指定应该如何解释基本类型。

Parquet 原始类型和注释

黑斑羚类型

用 UTF8 OriginalType 注释的 BINARY

细绳

用 STRING LogicalType 注释的 BINARY

细绳

用 ENUM OriginalType 注释的 BINARY

细绳

用 DECIMAL OriginalType 注释的 BINARY

十进制

用 TIMESTAMP_MILLIS OriginalType 注释的 INT64

时间戳(在Impala 3.2或更高版本中)

或者

BIGINT(为了向后兼容)

用 TIMESTAMP_MICROS OriginalType 注释的 INT64

时间戳(在Impala 3.2或更高版本中)

或者

BIGINT(为了向后兼容)

用 TIMESTAMP LogicalType 注释的 INT64

时间戳(在Impala 3.2或更高版本中)

或者

BIGINT(为了向后兼容)

复杂类型:

对于Impala 2.3及更高版本中可用的复杂类型(ARRAYMAP和 STRUCT),Impala 仅支持对 Parquet 表中的这些类型进行查询。