Parquet is a column-oriented binary file format intended to be highly efficient for the types of large-scale queries that Impala is best at. Parquet is especially good for queries scanning particular columns within a table, for example to query wide tables with many columns, or to perform aggregation operations such as SUM() and AVG() that need to process most or all of the values from a column.
To create a table named PARQUET_TABLE that uses the Parquet format, you would use a statement like the following, substituting your own table name, column names, and data types:
[impala-host:21000] > create table parquet_table_name (x INT, y STRING) STORED AS PARQUET;
Or, to clone the column names and data types of an existing table:
[impala-host:21000] > create table parquet_table_name LIKE other_table_name STORED AS PARQUET;
In Impala 1.4.0 and higher, you can derive column definitions from a raw Parquet data file, even without an existing Impala table. For example, you can create an external table pointing to an HDFS directory, and base the column definitions on one of the files in that directory:
CREATE EXTERNAL TABLE ingest_existing_files LIKE PARQUET '/user/etl/destination/datafile1.dat'
STORED AS PARQUET
LOCATION '/user/etl/destination';
Or, you can refer to an existing data file and create a new empty table with suitable column definitions. Then you can use INSERT to create new data files, or LOAD DATA to transfer existing data files into the new table:
CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
STORED AS PARQUET;
The default properties of the newly created table are the same as for any other CREATE TABLE statement. For example, the default file format is text; if you want the new table to use the Parquet file format, include the STORED AS PARQUET clause, as in the preceding examples.
Partition key columns are not stored in the data file itself, so you specify them separately in the CREATE TABLE statement. In this example, the new table is partitioned by year, month, and day:
CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
PARTITIONED BY (year INT, month TINYINT, day TINYINT)
STORED AS PARQUET;
Once you have created a table, to insert data into that table, use a statement similar to the following, again with your own table names:
[impala-host:21000] > insert overwrite table parquet_table_name select * from other_table_name;
If the Parquet table has a different number of columns or different column names than the other table, specify the names of columns from the other table rather than * in the SELECT statement.
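For example, a minimal sketch with hypothetical table and column names: if the Parquet table has two columns and the source table has three, name the two source columns explicitly.

insert overwrite table parquet_table_name
  select a, b from three_column_source_table;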
Choose from the following techniques for loading data into Parquet tables, depending on whether the original data is already in an Impala table or exists as raw data files outside Impala. If you already have data in an Impala or Hive table, perhaps in a different file format or partitioning scheme, you can transfer the data to a Parquet table using the Impala INSERT...SELECT syntax. You can convert, filter, repartition, and do other things to the data as part of this same INSERT statement.
When inserting into partitioned tables, especially using the Parquet file format, you can include a hint in the INSERT statement to fine-tune the overall performance of the operation and its resource usage.
Any INSERT statement for a Parquet table requires enough free space in the HDFS filesystem to write one block. Because Parquet data files use a large block size, an INSERT might fail (even for a very small amount of data) if the filesystem is running low on space.
Avoid the INSERT...VALUES syntax for Parquet tables, because INSERT...VALUES produces a separate tiny data file for each INSERT...VALUES statement, and the strength of Parquet is in its handling of data (compressing, parallelizing, and so on) in large chunks.
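As an illustrative sketch (the staging table name is hypothetical), land small batches of rows in a row-oriented staging table and convert to Parquet in bulk:

-- Anti-pattern: each statement writes its own tiny Parquet data file.
insert into parquet_table_name values (1, 'a');
insert into parquet_table_name values (2, 'b');
-- Preferred: collect small batches in a text-format staging table,
-- then convert to Parquet with one large INSERT...SELECT.
insert into staging_text_table values (1, 'a'), (2, 'b');
insert into parquet_table_name select * from staging_text_table;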
If you have one or more Parquet data files produced outside of Impala, you can quickly make the data queryable through Impala by one of the following methods:
- The LOAD DATA statement moves a single data file or a directory full of data files into the data directory for an Impala table. It does no validation or conversion of the data.
- The CREATE TABLE statement with the LOCATION clause creates a table where the data continues to reside outside the Impala data directory. Use CREATE EXTERNAL TABLE so that a later DROP TABLE leaves the data files alone.
- If you have Parquet data files written with a large block size, copy them with hadoop distcp -pb rather than -put or -cp, so that the original block size is preserved, and then issue a REFRESH statement so that Impala recognizes the new files.
Note:
Parquet files written by Impala (and by Hive and some other components) store STRING values as BINARY without a UTF8 annotation. If you read such files through Spark, set spark.sql.parquet.binaryAsString to true so that the binary columns are interpreted as strings.
Recent versions of Sqoop can produce Parquet output files, using the --as-parquetfile option.
If the data exists outside Impala and is in some other format, combine both of the previous techniques: first use LOAD DATA or CREATE EXTERNAL TABLE ... LOCATION to bring the data into an Impala table that uses the appropriate file format, and then use INSERT...SELECT to copy it into the Parquet table, converting to Parquet format as part of the process.
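A minimal sketch of this two-step conversion, assuming comma-separated text files in a hypothetical HDFS directory:

-- Step 1: expose the raw files through a text-format staging table.
CREATE EXTERNAL TABLE raw_csv_staging (x INT, y STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
  LOCATION '/user/etl/raw_csv';
-- Step 2: convert to Parquet as part of the copy.
INSERT INTO parquet_table_name SELECT x, y FROM raw_csv_staging;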
Loading data into Parquet tables through INSERT is a memory-intensive operation, because the incoming data is buffered until it reaches one data block in size, and then that chunk of data is organized and compressed in memory before being written out. The memory consumption can be larger when inserting into partitioned tables, because a separate data file is written for each combination of partition key column values.
Note: The same considerations apply to CREATE TABLE AS SELECT, which writes its data files through the same mechanism as INSERT ... SELECT; guidance in this section about INSERT ... SELECT applies to both statements.
Query performance for Parquet tables depends on the number of columns needed to process the SELECT list and WHERE clauses of the query, the way data is divided into large data files with block size equal to file size, the reduction in I/O by reading the data for each column in compressed format, which data files can be skipped (for partitioned tables), and the CPU overhead of decompressing the data for each column. For example, the following is an efficient query for a Parquet table:
select avg(income) from census_data where state = 'CA';
The query processes only two columns out of a large number of total columns. If the table is partitioned by the STATE column, it is even more efficient, because the query only has to read and decode one column from each data file, and it can read only the data files in the partition directory for the state 'CA', skipping the data files for all the other states. On the other hand, the following is a relatively inefficient query for a Parquet table:
select * from census_data;
Impala would have to read the entire contents of each data file and decompress every column, negating the I/O savings of the column-oriented format. Impala can also optimize queries on Parquet tables, especially join queries, better when statistics are available for all the joined tables. Issue the COMPUTE STATS statement for each table after substantial amounts of data are loaded into or appended to it.
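For example, using the CENSUS_DATA table from the earlier queries:

-- Gather table and column statistics so the planner can make informed choices.
compute stats census_data;
-- Confirm that the statistics are in place.
show table stats census_data;
show column stats census_data;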
For tables stored on Amazon S3, the fs.s3a.block.size configuration setting, rather than an HDFS block size, determines how Impala divides the work of reading Parquet data files. If you write Parquet files with a large block size, increase fs.s3a.block.size accordingly so that each file is still processed efficiently. In Impala 3.4 and higher, you can instead use the PARQUET_OBJECT_STORE_SPLIT_SIZE query option to control the split size for Parquet files stored on object stores such as S3 (the default is 256 MB).
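A minimal sketch (the option value here is an illustrative assumption; the option takes a size in bytes):

-- Use smaller splits for object-store Parquet files to increase scan parallelism.
set PARQUET_OBJECT_STORE_SPLIT_SIZE=134217728; -- 128 MB
select count(*) from parquet_table_name;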
Parquet data files written by Impala record minimum and maximum values for each column, per file and per row group. When a query includes a WHERE clause on column X, such as WHERE x > 200, Impala can skip any file or row group whose range of values for X cannot match the predicate. This skipping is most effective when the data is sorted, so that the values for the filter column are clustered together; declare a SORT BY clause on the table so that rows written by each INSERT are sorted on the columns you typically filter on in WHERE clauses.
Impala also writes a page index into each Parquet file by default, which allows the same kind of skipping at the granularity of individual pages. To disable writing the page index, set the PARQUET_WRITE_PAGE_INDEX query option to FALSE.
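A minimal sketch of the SORT BY technique, with hypothetical table and column names:

-- Cluster the values of X within each data file, so that min/max statistics
-- and the page index can prune most files and pages for selective predicates.
create table sorted_events (x bigint, payload string)
  sort by (x)
  stored as parquet;
insert into sorted_events select x, payload from unsorted_events;
-- A query like this can now skip files and pages whose range of x cannot match.
select count(*) from sorted_events where x > 200;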
Partitioning is an important performance technique for Impala generally, and it works well with the Parquet format. The Parquet file format is ideal for tables containing many columns, where most queries refer to only a small subset of the columns. Queries on partitioned tables frequently analyze data for time intervals based on columns such as YEAR, MONTH, and DAY, or for geographic regions; a WHERE clause that refers to the partition key columns lets Impala skip the data files for all irrelevant partitions entirely.
Inserting into a partitioned Parquet table can be a resource-intensive operation, because each Impala node could potentially be writing a separate data file for each combination of different values in the partition key columns. To reduce the memory and file-handle requirements, insert one partition at a time by specifying constant values for the partition key columns in the PARTITION clause of the INSERT statement, for example PARTITION (year=2010). You might also need to increase the dfs.datanode.max.transfer.threads setting if the INSERT operations open many HDFS files simultaneously. When Impala writes Parquet data files using an INSERT statement, the underlying compression is controlled by the COMPRESSION_CODEC query option. (Prior to Impala 2.0, it was named PARQUET_COMPRESSION_CODEC.) The allowed values for this query option are snappy (the default), gzip, zstd, lz4, and none.
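A minimal sketch combining the two preceding recommendations, with hypothetical table and column names (zstd support depends on the Impala release):

-- Write one partition per statement, with a non-default codec.
set COMPRESSION_CODEC=zstd;
insert into events_by_year partition (year=2010)
  select c1, c2, c3 from events_source where year = 2010;
set COMPRESSION_CODEC=snappy; -- restore the default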
Using Snappy compression: by default (COMPRESSION_CODEC=snappy), the underlying data files for a Parquet table are compressed with Snappy. The combination of fast compression and decompression makes it a good choice for many data sets:
[localhost:21000] > create database parquet_compression;
[localhost:21000] > use parquet_compression;
[localhost:21000] > create table parquet_snappy like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=snappy;
[localhost:21000] > insert into parquet_snappy select * from raw_text_data;
Inserted 1000000000 rows in 181.98s
Using GZip compression: if you need more intensive compression (at the expense of more CPU cycles for uncompressing during queries), set the COMPRESSION_CODEC query option to gzip before inserting the data:
[localhost:21000] > create table parquet_gzip like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=gzip;
[localhost:21000] > insert into parquet_gzip select * from raw_text_data;
Inserted 1000000000 rows in 1418.24s
Using no compression: if your data compresses very poorly, or you want to avoid the CPU overhead of compression and decompression entirely, set the COMPRESSION_CODEC query option to none before inserting the data:
[localhost:21000] > create table parquet_none like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=none;
[localhost:21000] > insert into parquet_none select * from raw_text_data;
Inserted 1000000000 rows in 146.90s
Examining the sizes of the data files shows the tradeoff between the codecs for this one billion row data set:
$ hdfs dfs -du -h /user/hive/warehouse/parquet_compression.db
23.1 G /user/hive/warehouse/parquet_compression.db/parquet_snappy
13.5 G /user/hive/warehouse/parquet_compression.db/parquet_gzip
32.8 G /user/hive/warehouse/parquet_compression.db/parquet_none
All three tables inherit the same column definitions from RAW_TEXT_DATA, and queries against each table return identical results, with moderate differences in speed depending on the codec:
[localhost:21000] > desc parquet_snappy;
Query finished, fetching results ...
+-----------+---------+---------+
| name      | type    | comment |
+-----------+---------+---------+
| id        | int     |         |
| val       | int     |         |
| zfill     | string  |         |
| name      | string  |         |
| assertion | boolean |         |
+-----------+---------+---------+
Returned 5 row(s) in 0.14s
[localhost:21000] > select avg(val) from parquet_snappy;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 4.29s
[localhost:21000] > select avg(val) from parquet_gzip;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 6.97s
[localhost:21000] > select avg(val) from parquet_none;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 3.67s
Here is a final example, to illustrate how the data files using the various compression codecs are all compatible with each other for read operations. The metadata about the compression format is written into each data file, and can be decoded during queries regardless of the COMPRESSION_CODEC setting in effect at the time. In this example, we copy data files from the PARQUET_SNAPPY, PARQUET_GZIP, and PARQUET_NONE tables used in the previous examples, each containing 1 billion rows, all into the data directory of a new table PARQUET_EVERYTHING. First, we create the table in Impala:
[localhost:21000] > create table parquet_everything like parquet_snappy;
Query: create table parquet_everything like parquet_snappy
Then in the shell, we copy the relevant data files into the data directory for this new table. Rather than using hdfs dfs -cp as with typical files, we use hadoop distcp -pb to ensure that the special block size of the Parquet data files is preserved:
$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_snappy \
/user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_gzip \
/user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_none \
/user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
Back in the impala-shell interpreter, we use the REFRESH statement to alert the Impala server to the new data files for this table, and then we can run queries demonstrating that the data files represent 3 billion rows, with the values of a numeric column matching the original smaller tables:
[localhost:21000] > refresh parquet_everything;
Query finished, fetching results ...
Returned 0 row(s) in 0.32s
[localhost:21000] > select count(*) from parquet_everything;
Query finished, fetching results ...
+------------+
| _c0        |
+------------+
| 3000000000 |
+------------+
Returned 1 row(s) in 8.18s
[localhost:21000] > select avg(val) from parquet_everything;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 13.35s
In Impala 2.3 and higher, Impala supports the complex types ARRAY, STRUCT, and MAP, and can query those types in Parquet tables.
You can exchange Parquet data files with other Hadoop components, such as Hive. To change an existing Hive table to use the Parquet file format in Hive 0.13 and later, issue:
ALTER TABLE table_name SET FILEFORMAT PARQUET;
In earlier Hive releases, set the SerDe, input format, and output format separately:
ALTER TABLE table_name SET SERDE 'parquet.hive.serde.ParquetHiveSerDe';
ALTER TABLE table_name SET FILEFORMAT
INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat";
If you copy Parquet data files between nodes, or even between different directories on the same node, make sure to preserve the block size by using the command hadoop distcp -pb. To verify that the block size was preserved, issue the command hdfs fsck -blocks HDFS_path_of_impala_table_dir and check that the average block size is at or near the file size defined by the PARQUET_FILE_SIZE query option. (The hadoop distcp operation typically leaves some directories behind, with names matching _distcp_logs_*, that you can delete from the destination directory afterward.)
Impala can query Parquet files that use the PLAIN, PLAIN_DICTIONARY, BIT_PACKED, and RLE encodings. Currently, Impala does not support the RLE_DICTIONARY encoding. When creating files outside of Impala for use by Impala, make sure to use one of the supported encodings. In particular, for MapReduce jobs, parquet.writer.version must not be defined as PARQUET_2_0, because data written with version 2.0 of the Parquet writer might not be consumable by Impala, due to its use of the RLE_DICTIONARY encoding.
To examine the internal structure and data of Parquet files, you can use the parquet-tools command. Make sure this command is in your $PATH; it is typically packaged in a bin directory of the Parquet tools distribution.
This utility has several subcommands: cat prints the contents of a file (add the -j option for JSON output), head prints only the first few rows, schema prints the Parquet schema, meta prints file, row group, and column metadata, and dump prints low-level contents. Run parquet-tools -h to see usage for all the subcommands. For example:
$ # Be careful doing this for a big file! Use parquet-tools head to be safe.
$ parquet-tools cat sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0
year = 1992
month = 1
day = 3
...
$ parquet-tools head -n 2 sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0
year = 1992
month = 1
day = 3
...
$ parquet-tools schema sample.parq
message schema {
optional int32 year;
optional int32 month;
optional int32 day;
optional int32 dayofweek;
optional int32 dep_time;
optional int32 crs_dep_time;
optional int32 arr_time;
optional int32 crs_arr_time;
optional binary carrier;
optional int32 flight_num;
...
$ parquet-tools meta sample.parq
creator: impala version 2.2.0-...
file schema: schema
-------------------------------------------------------------------
year: OPTIONAL INT32 R:0 D:1
month: OPTIONAL INT32 R:0 D:1
day: OPTIONAL INT32 R:0 D:1
dayofweek: OPTIONAL INT32 R:0 D:1
dep_time: OPTIONAL INT32 R:0 D:1
crs_dep_time: OPTIONAL INT32 R:0 D:1
arr_time: OPTIONAL INT32 R:0 D:1
crs_arr_time: OPTIONAL INT32 R:0 D:1
carrier: OPTIONAL BINARY R:0 D:1
flight_num: OPTIONAL INT32 R:0 D:1
...
row group 1: RC:20636601 TS:265103674
-------------------------------------------------------------------
year: INT32 SNAPPY DO:4 FPO:35 SZ:10103/49723/4.92 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
month: INT32 SNAPPY DO:10147 FPO:10210 SZ:11380/35732/3.14 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
day: INT32 SNAPPY DO:21572 FPO:21714 SZ:3071658/9868452/3.21 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dayofweek: INT32 SNAPPY DO:3093276 FPO:3093319 SZ:2274375/5941876/2.61 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dep_time: INT32 SNAPPY DO:5367705 FPO:5373967 SZ:28281281/28573175/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_dep_time: INT32 SNAPPY DO:33649039 FPO:33654262 SZ:10220839/11574964/1.13 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
arr_time: INT32 SNAPPY DO:43869935 FPO:43876489 SZ:28562410/28797767/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_arr_time: INT32 SNAPPY DO:72432398 FPO:72438151 SZ:10908972/12164626/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
carrier: BINARY SNAPPY DO:83341427 FPO:83341558 SZ:114916/128611/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
flight_num: INT32 SNAPPY DO:83456393 FPO:83488603 SZ:10216514/11474301/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
...
Note: Impala INSERT statements write Parquet data files using an HDFS block size that matches the data file size, to ensure that each data file is represented by a single HDFS block, and the entire file can be processed on a single node without requiring any remote reads. If you create Parquet data files outside of Impala, such as through a MapReduce or Pig job, ensure that the HDFS block size is greater than or equal to the file size, so that the one-file-per-block relationship is maintained. Set the dfs.block.size or the dfs.blocksize property large enough that each file fits within a single HDFS block, even if that size is larger than the normal HDFS block size. If the block size is reset to a lower value during a file copy, you will see lower performance for queries involving those files, and the PROFILE statement will reveal that some I/O is being done suboptimally, through remote reads.
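A quick way to check, sketched in impala-shell with a hypothetical table name: run a query against the suspect table, then inspect the profile of that query for remote-read activity.

select count(*) from parquet_table_name;
-- In impala-shell, PROFILE prints the detailed execution profile
-- of the query that just ran.
profile;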
When Impala retrieves or tests the data for a particular column, it opens all the data files, but only reads the portion of each file containing the values for that column. The column values are stored consecutively, minimizing the I/O required to process the values within a single column. If other columns are named in the SELECT list or WHERE clauses, the data for all the columns in the same row is available within that same data file.
If an INSERT statement brings in less than one block's worth of data, the resulting data file is smaller than ideal. Thus, if you split up an ETL job to use multiple INSERT statements, try to keep the volume of data for each INSERT statement to approximately the Parquet block size, or a multiple of it.
Parquet applies automatic encoding and compression techniques, such as run-length encoding and dictionary encoding, based on an analysis of the actual data values. These encodings work especially well for columns with few distinct values, such as a BOOLEAN column, and less well for columns where almost every value is distinct, such as a TIMESTAMP column. Returning to file sizes, statements like the following might produce inefficiently organized data files:
-- In an N-node cluster, each node produces a data file
-- for the INSERT operation. If you have less than
-- N GB of data to copy, some files are likely to be
-- much smaller than the default Parquet block size.
insert into parquet_table select * from text_table;
-- Even if this operation involves an overall large amount of data,
-- when split up by year/month/day, each partition might only
-- receive a small amount of data. Then the data files for
-- the partition might be divided between the N nodes in the cluster.
-- A multi-gigabyte copy operation might produce files of only
-- a few MB each.
insert into partitioned_parquet_table partition (year, month, day)
select year, month, day, url, referer, user_agent, http_code, response_time
from web_stats;
Here are techniques to help you produce large data files in Parquet INSERT operations, and to compact existing too-small data files:
- When inserting into a partitioned Parquet table, use statically partitioned INSERT statements, where the partition key values are specified as constant values. Ideally, use a separate INSERT statement for each partition.
- You can set the NUM_NODES query option to 1 briefly, during INSERT or CREATE TABLE AS SELECT statements. Normally those statements produce one or more data files per data node; if the write operation involves small amounts of data, a Parquet table, and/or a partitioned table, the default behavior could produce many small files when intuitively you might expect only a single output file. SET NUM_NODES=1 turns off the distributed aspect of the write operation, making it more likely to produce only one or a few data files. A sketch follows this list.
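A minimal sketch of temporarily constraining the write parallelism, reusing the table names from the example above:

-- Write through a single node so the result is one data file (or a few),
-- instead of one file per node. This sacrifices parallelism during the write.
set NUM_NODES=1;
insert into parquet_table select * from text_table;
-- NUM_NODES=0 restores the default distributed behavior.
set NUM_NODES=0;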
If you accidentally end up with a table with many small data files, you can use a view as a level of indirection while rewriting the data into a better layout, so that queries against the well-known name are not disrupted:
create view production_table as select * from table_with_many_small_files;
-- CTAS or INSERT...SELECT all the data into a more efficient layout...
create table table_with_few_big_files stored as parquet as
  select * from table_with_many_small_files;
-- ...then repoint the view at the new table.
alter view production_table as select * from table_with_few_big_files;
select * from production_table where c1 = 100 and c2 < 50 and ...;
Schema evolution refers to using the statement ALTER TABLE ... REPLACE COLUMNS to change the names, data types, or number of columns in a table. You can perform schema evolution for Parquet tables as follows:
- The Impala ALTER TABLE statement never changes any data files in the table. From the Impala side, schema evolution involves interpreting the same data files in terms of a new table definition.
- The INSERT statement always creates data using the latest table definition. You might end up with data files with different numbers of columns or internal data representations if you do a sequence of INSERT and ALTER TABLE ... REPLACE COLUMNS statements.
- If you use ALTER TABLE ... REPLACE COLUMNS to define additional columns at the end, when the original data files are used in a query, these final columns are considered to be all NULL values.
- If you use ALTER TABLE ... REPLACE COLUMNS to define fewer columns than before, when the original data files are used in a query, the unused columns still present in the data file are ignored.
- Parquet represents the TINYINT, SMALLINT, and INT types the same internally, all stored in 32-bit integers.
That means it is easy to promote a TINYINT column to SMALLINT or INT, or a SMALLINT column to INT. The numbers are represented exactly the same in the data files, and the columns being promoted would not contain any out-of-range values. You cannot change a TINYINT, SMALLINT, or INT column to BIGINT, or the other way around: although the ALTER TABLE succeeds, any attempt to query those columns results in conversion errors, because they are stored with different sizes in the data files. Any other type conversion for columns produces a conversion error during queries; for example, INT to STRING, FLOAT to DOUBLE, TIMESTAMP to STRING, or DECIMAL(9,0) to DECIMAL(5,2).
You might find that you have Parquet files where the columns do not line up in the same order as in your Impala table. For example, you might have a Parquet file that was part of a table with columns C1,C2,C3,C4, and now you want to reuse the same file in a table with columns C4,C2. By default, Impala expects the columns in a data file to appear in the same order as the columns defined for the table, making it impractical to do some kinds of file reuse or schema evolution. In Impala 2.6 and higher, the query option PARQUET_FALLBACK_SCHEMA_RESOLUTION=name lets Impala resolve columns by name, and therefore handle out-of-order or extra columns in the data file.
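As a minimal sketch of the integer promotion rule (the PROMO table here is hypothetical):

-- TINYINT, SMALLINT, and INT are all stored as 32-bit integers in Parquet,
-- so this promotion reinterprets the existing data files without rewriting them.
create table promo (id tinyint, s string) stored as parquet;
insert into promo values (1, 'a');
alter table promo replace columns (id int, s string);
select * from promo;

The longer example that follows demonstrates the PARQUET_FALLBACK_SCHEMA_RESOLUTION option end to end.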
create database schema_evolution;
use schema_evolution;
create table t1 (c1 int, c2 boolean, c3 string, c4 timestamp)
stored as parquet;
insert into t1 values
(1, true, 'yes', now()),
(2, false, 'no', now() + interval 1 day);
select * from t1;
+----+-------+-----+-------------------------------+
| c1 | c2    | c3  | c4                            |
+----+-------+-----+-------------------------------+
| 1  | true  | yes | 2016-06-28 14:53:26.554369000 |
| 2  | false | no  | 2016-06-29 14:53:26.554369000 |
+----+-------+-----+-------------------------------+
desc formatted t1;
...
| Location: | /user/hive/warehouse/schema_evolution.db/t1 |
...
-- Create a second table, T2, with a subset of T1's columns in a different order.
create table t2 (c4 timestamp, c2 boolean) stored as parquet;
-- Make T2 have the same data file as in T1, including 2
-- unused columns and column order different than T2 expects.
load data inpath '/user/hive/warehouse/schema_evolution.db/t1'
  into table t2;
+----------------------------------------------------------+
| summary                                                  |
+----------------------------------------------------------+
| Loaded 1 file(s). Total files in destination location: 1 |
+----------------------------------------------------------+
-- 'position' is the default setting.
-- Impala cannot read the Parquet file if the column order does not match.
set PARQUET_FALLBACK_SCHEMA_RESOLUTION=position;
PARQUET_FALLBACK_SCHEMA_RESOLUTION set to position
select * from t2;
WARNINGS:
File 'schema_evolution.db/t2/45331705_data.0.parq'
has an incompatible Parquet schema for column 'schema_evolution.t2.c4'.
Column type: TIMESTAMP, Parquet schema: optional int32 c1 [i:0 d:1 r:0]
File 'schema_evolution.db/t2/45331705_data.0.parq'
has an incompatible Parquet schema for column 'schema_evolution.t2.c4'.
Column type: TIMESTAMP, Parquet schema: optional int32 c1 [i:0 d:1 r:0]
-- With the 'name' setting, Impala can read the Parquet data files
-- despite mismatching column order.
set PARQUET_FALLBACK_SCHEMA_RESOLUTION=name;
PARQUET_FALLBACK_SCHEMA_RESOLUTION set to name
select * from t2;
+-------------------------------+-------+
| c4                            | c2    |
+-------------------------------+-------+
| 2016-06-28 14:53:26.554369000 | true  |
| 2016-06-29 14:53:26.554369000 | false |
+-------------------------------+-------+
The Parquet format defines a set of data types whose names differ from the names of the corresponding Impala data types. If you are preparing Parquet files using other Hadoop components such as Pig or MapReduce, you might need to work with the type names defined by Parquet. The following tables list the Parquet-defined types and the equivalent types in Impala.
Primitive types:
+--------------+-------------+
| Parquet type | Impala type |
+--------------+-------------+
| BINARY       | STRING      |
| BOOLEAN      | BOOLEAN     |
| DOUBLE       | DOUBLE      |
| FLOAT        | FLOAT       |
| INT32        | INT         |
| INT64        | BIGINT      |
| INT96        | TIMESTAMP   |
+--------------+-------------+
Logical types: Parquet uses type annotations to extend the types that it can store, by specifying how the primitive types should be interpreted.
+--------------------------------------------------------+-----------------------------------+
| Parquet primitive type with annotation                 | Impala type                       |
+--------------------------------------------------------+-----------------------------------+
| BINARY annotated with the UTF8 OriginalType            | STRING                            |
| BINARY annotated with the STRING LogicalType           | STRING                            |
| BINARY annotated with the ENUM OriginalType            | STRING                            |
| BINARY annotated with the DECIMAL OriginalType         | DECIMAL                           |
| INT64 annotated with the TIMESTAMP_MILLIS OriginalType | TIMESTAMP (Impala 3.2+) or BIGINT |
| INT64 annotated with the TIMESTAMP_MICROS OriginalType | TIMESTAMP (Impala 3.2+) or BIGINT |
| INT64 annotated with the TIMESTAMP LogicalType         | TIMESTAMP (Impala 3.2+)           |
+--------------------------------------------------------+-----------------------------------+
Complex types:
For the complex types (ARRAY, MAP, and STRUCT) available in Impala 2.3 and higher, Impala only supports queries against those types in Parquet tables.
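A minimal sketch of a Parquet table with a complex type, using hypothetical names:

-- Impala can create the table, but data files containing complex types
-- must be written by another component, such as Hive or Spark.
create table tagged_items (id bigint, tags array<string>) stored as parquet;
-- Complex columns are queried with join notation; ITEM is the pseudocolumn
-- holding each array element.
select id, t.item from tagged_items, tagged_items.tags t;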