SUM()
AVG()
PARQUET_TABLE
[impala-host:21000] > create table parquet_table_name (x INT, y STRING) STORED AS PARQUET;
[impala-host:21000] > create table parquet_table_name LIKE other_table_name STORED AS PARQUET;
CREATE EXTERNAL TABLE ingest_existing_files LIKE PARQUET '/user/etl/destination/datafile1.dat'
STORED AS PARQUET
LOCATION '/user/etl/destination';
INSERT
LOAD DATA
CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
STORED AS PARQUET;
CREATE TABLE
STORED AS PARQUET
CREATE TABLE
CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
PARTITIONED BY (year INT, month TINYINT, day TINYINT)
STORED AS PARQUET;
[impala-host:21000] > insert overwrite table parquet_table_name select * from other_table_name;
*
SELECT
INSERT...SELECT
INSERT
INSERT
INSERT
INSERT
INSERT...VALUES
INSERT...VALUES
INSERT...VALUES
LOAD DATA
CREATE TABLE
LOCATION
CREATE EXTERNAL TABLE
DROP TABLE
REFRESH
hadoop distcp -pb
-put
-cp
Note:
spark.sql.parquet.binaryAsString
--as-parquetfile
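Sqoop can produce Parquet data files directly with the --as-parquetfile option named above. A minimal sketch, assuming a hypothetical MySQL source database and table; the connection string and paths are illustrative:

$ sqoop import --connect jdbc:mysql://db.example.com/sales_db \
    --table transactions \
    --as-parquetfile \
    --target-dir /user/etl/destination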
LOAD DATA
CREATE EXTERNAL TABLE ... LOCATION
INSERT...SELECT
INSERT
Note: CREATE TABLE AS SELECT
INSERT ... SELECT
INSERT ... SELECT
SELECT
WHERE
select avg(income) from census_data where state = 'CA';
STATE
'CA'
select * from census_data;
COMPUTE STATS
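Statistics gathered by COMPUTE STATS help Impala plan scans and joins over Parquet data. A minimal sketch against the census_data table from the queries above:

compute stats census_data;
show table stats census_data;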
fs.s3a.block.size
fs.s3a.block.size
fs.s3a.block.size
PARQUET_OBJECT_STORE_SPLIT_SIZE
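The fragments above pair fs.s3a.block.size with the PARQUET_OBJECT_STORE_SPLIT_SIZE query option, which controls how Impala splits Parquet files on object stores for scanning. A sketch; the 256 MB value and the table name are illustrative:

set PARQUET_OBJECT_STORE_SPLIT_SIZE=268435456;
select count(*) from s3_parquet_table;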
WHERE
X
WHERE x > 200
SORT BY
WHERE
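Parquet row groups carry min/max statistics for each column, so physically clustering values with SORT BY lets Impala skip row groups whose value ranges fall entirely outside a predicate such as WHERE x > 200. A minimal sketch, with hypothetical table names:

create table sorted_t (x bigint, s string) sort by (x) stored as parquet;
insert into sorted_t select x, s from unsorted_t;
-- Row groups whose min/max range for x lies entirely at or below 200 are skipped.
select count(*) from sorted_t where x > 200;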
INSERT
PARQUET_WRITE_PAGE_INDEX
FALSE
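Writing of the Parquet page index is controlled per session by the PARQUET_WRITE_PAGE_INDEX query option; setting it to FALSE disables the index, and the page-level filtering it enables, for files written afterward. A sketch, reusing table names from the earlier examples:

set PARQUET_WRITE_PAGE_INDEX=false;
insert into parquet_table_name select * from other_table_name;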
WHERE
YEAR
MONTH
DAY
INSERT
PARTITION
PARTITION (year=2010)
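An INSERT into a partitioned Parquet table can fix a partition key statically, as in PARTITION (year=2010), or leave keys dynamic so each row's values select the partition. A sketch using the web_stats source that appears in a later example; dynamic partition key columns go last in the SELECT list:

-- Static year, dynamic month and day:
insert into partitioned_parquet_table partition (year=2010, month, day)
  select url, referer, user_agent, http_code, response_time, month, day
  from web_stats where year = 2010;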
dfs.datanode.max.transfer.threads
INSERT
COMPRESSION_CODEC
PARQUET_COMPRESSION_CODEC
snappy
gzip
zstd
lz4
none
COMPRESSION_CODEC
snappy
[localhost:21000] > create database parquet_compression;
[localhost:21000] > use parquet_compression;
[localhost:21000] > create table parquet_snappy like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=snappy;
[localhost:21000] > insert into parquet_snappy select * from raw_text_data;
Inserted 1000000000 rows in 181.98s
COMPRESSION_CODEC
gzip
[localhost:21000] > create table parquet_gzip like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=gzip;
[localhost:21000] > insert into parquet_gzip select * from raw_text_data;
Inserted 1000000000 rows in 1418.24s
COMPRESSION_CODEC
none
[localhost:21000] > create table parquet_none like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=none;
[localhost:21000] > insert into parquet_none select * from raw_text_data;
Inserted 1000000000 rows in 146.90s
$ hdfs dfs -du -h /user/hive/warehouse/parquet_compression.db
23.1 G /user/hive/warehouse/parquet_compression.db/parquet_snappy
13.5 G /user/hive/warehouse/parquet_compression.db/parquet_gzip
32.8 G /user/hive/warehouse/parquet_compression.db/parquet_none
[localhost:21000] > desc parquet_snappy;
Query finished, fetching results ...
+-----------+---------+---------+
| name | type | comment |
+-----------+---------+---------+
| id | int | |
| val | int | |
| zfill | string | |
| name | string | |
| assertion | boolean | |
+-----------+---------+---------+
Returned 5 row(s) in 0.14s
[localhost:21000] > select avg(val) from parquet_snappy;
Query finished, fetching results ...
+-----------------+
| _c0 |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 4.29s
[localhost:21000] > select avg(val) from parquet_gzip;
Query finished, fetching results ...
+-----------------+
| _c0 |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 6.97s
[localhost:21000] > select avg(val) from parquet_none;
Query finished, fetching results ...
+-----------------+
| _c0 |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 3.67s
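The remaining codecs follow the same pattern as the three examples above. A sketch for zstd, staying in the parquet_compression database; the resulting file sizes and timings would need to be measured:

[localhost:21000] > create table parquet_zstd like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=zstd;
[localhost:21000] > insert into parquet_zstd select * from raw_text_data;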
COMPRESSION_CODEC
PARQUET_SNAPPY
PARQUET_GZIP
PARQUET_NONE
PARQUET_EVERYTHING
[localhost:21000] > create table parquet_everything like parquet_snappy;
Query: create table parquet_everything like parquet_snappy
hdfs dfs -cp
hadoop distcp -pb
$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_snappy \
/user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_gzip \
/user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_none \
/user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
REFRESH
[localhost:21000] > refresh parquet_everything;
Query finished, fetching results ...
Returned 0 row(s) in 0.32s
[localhost:21000] > select count(*) from parquet_everything;
Query finished, fetching results ...
+------------+
| _c0 |
+------------+
| 3000000000 |
+------------+
Returned 1 row(s) in 8.18s
[localhost:21000] > select avg(val) from parquet_everything;
Query finished, fetching results ...
+-----------------+
| _c0 |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 13.35s
ARRAY
STRUCT
MAP
ALTER TABLE table_name SET FILEFORMAT PARQUET;
ALTER TABLE table_name SET SERDE 'parquet.hive.serde.ParquetHiveSerDe';
ALTER TABLE table_name SET FILEFORMAT
INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat";
hadoop distcp -pb
hdfs fsck -blocks HDFS_path_of_impala_table_dir
PARQUET_FILE_SIZE
hadoop distcp
PLAIN
PLAIN_DICTIONARY
BIT_PACKED
RLE
RLE_DICTIONARY
RLE_DICTIONARY
parquet.writer.version
PARQUET_2_0
RLE_DICTIONARY
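When Hive or Spark writes Parquet for Impala releases that lack RLE_DICTIONARY support, the writer version should stay at the v1 default rather than PARQUET_2_0. A hedged sketch of pinning it explicitly in a Hive session:

SET parquet.writer.version=PARQUET_1_0;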
$PATH
bin
cat
-j
head
schema
meta
dump
parquet-tools -h
$ # Be careful doing this for a big file! Use parquet-tools head to be safe.
$ parquet-tools cat sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0
year = 1992
month = 1
day = 3
...
$ parquet-tools head -n 2 sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0
year = 1992
month = 1
day = 3
...
$ parquet-tools schema sample.parq
message schema {
optional int32 year;
optional int32 month;
optional int32 day;
optional int32 dayofweek;
optional int32 dep_time;
optional int32 crs_dep_time;
optional int32 arr_time;
optional int32 crs_arr_time;
optional binary carrier;
optional int32 flight_num;
...
$ parquet-tools meta sample.parq
creator: impala version 2.2.0-...
file schema: schema
-------------------------------------------------------------------
year: OPTIONAL INT32 R:0 D:1
month: OPTIONAL INT32 R:0 D:1
day: OPTIONAL INT32 R:0 D:1
dayofweek: OPTIONAL INT32 R:0 D:1
dep_time: OPTIONAL INT32 R:0 D:1
crs_dep_time: OPTIONAL INT32 R:0 D:1
arr_time: OPTIONAL INT32 R:0 D:1
crs_arr_time: OPTIONAL INT32 R:0 D:1
carrier: OPTIONAL BINARY R:0 D:1
flight_num: OPTIONAL INT32 R:0 D:1
...
row group 1: RC:20636601 TS:265103674
-------------------------------------------------------------------
year: INT32 SNAPPY DO:4 FPO:35 SZ:10103/49723/4.92 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
month: INT32 SNAPPY DO:10147 FPO:10210 SZ:11380/35732/3.14 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
day: INT32 SNAPPY DO:21572 FPO:21714 SZ:3071658/9868452/3.21 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dayofweek: INT32 SNAPPY DO:3093276 FPO:3093319 SZ:2274375/5941876/2.61 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dep_time: INT32 SNAPPY DO:5367705 FPO:5373967 SZ:28281281/28573175/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_dep_time: INT32 SNAPPY DO:33649039 FPO:33654262 SZ:10220839/11574964/1.13 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
arr_time: INT32 SNAPPY DO:43869935 FPO:43876489 SZ:28562410/28797767/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_arr_time: INT32 SNAPPY DO:72432398 FPO:72438151 SZ:10908972/12164626/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
carrier: BINARY SNAPPY DO:83341427 FPO:83341558 SZ:114916/128611/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
flight_num: INT32 SNAPPY DO:83456393 FPO:83488603 SZ:10216514/11474301/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
...
Note:
INSERT
dfs.block.size
dfs.blocksize
PROFILE
SELECT
WHERE
INSERT
INSERT
INSERT
BOOLEAN
TIMESTAMP
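The target size of the files an INSERT produces can be lowered with the PARQUET_FILE_SIZE query option (referenced earlier alongside hdfs fsck -blocks). A sketch with an illustrative 128 MB target, using the tables from the example below:

set PARQUET_FILE_SIZE=134217728;
insert overwrite table parquet_table select * from text_table;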
-- In an N-node cluster, each node produces a data file
-- for the INSERT operation. If you have less than
-- N GB of data to copy, some files are likely to be
-- much smaller than the default Parquet block size.
insert into parquet_table select * from text_table;
-- Even if this operation involves an overall large amount of data,
-- when split up by year/month/day, each partition might only
-- receive a small amount of data. Then the data files for
-- the partition might be divided between the N nodes in the cluster.
-- A multi-gigabyte copy operation might produce files of only
-- a few MB each.
insert into partitioned_parquet_table partition (year, month, day)
select url, referer, user_agent, http_code, response_time, year, month, day
from web_stats;
INSERT
INSERT
INSERT
NUM_NODES
INSERT
CREATE TABLE AS SELECT
SET NUM_NODES=1
CREATE TABLE AS SELECT
INSERT ... SELECT
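As the fragments above describe, forcing a single-node plan makes one host write all the data, producing one larger file per partition instead of one per node. A minimal sketch, reusing the table names from the example above:

set NUM_NODES=1;
insert overwrite table parquet_table select * from text_table;
set NUM_NODES=0;  -- restore the default, distributed planning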
create view production_table as select * from table_with_many_small_files;
-- CTAS or INSERT...SELECT all the data into a more efficient layout...
alter view production_table as select * from table_with_few_big_files;
select * from production_table where c1 = 100 and c2 < 50 and ...;
ALTER TABLE ... REPLACE COLUMNS
ALTER TABLE
INSERT
INSERT
ALTER TABLE ... REPLACE COLUMNS
ALTER TABLE ... REPLACE COLUMNS
NULL
ALTER TABLE ... REPLACE COLUMNS
TINYINT
SMALLINT
INT
TINYINT
SMALLINT
INT
SMALLINT
INT
TINYINT
SMALLINT
INT
BIGINT
ALTER TABLE
INT
STRING
FLOAT
DOUBLE
TIMESTAMP
STRING
DECIMAL(9,0)
DECIMAL(5,2)
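A minimal sketch of the widening that ALTER TABLE ... REPLACE COLUMNS permits, with a hypothetical widen_demo table: integer data written with a narrow type stays readable after the column is redefined with a wider one, while redefinitions such as INT to STRING or DECIMAL(9,0) to DECIMAL(5,2) are not readable:

create table widen_demo (id int, val tinyint) stored as parquet;
insert into widen_demo values (1, 10), (2, 20);
-- Redefine val with a wider integer type; the existing Parquet data still reads correctly.
alter table widen_demo replace columns (id int, val int);
select * from widen_demo;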
C1,C2,C3,C4
C4,C2
PARQUET_FALLBACK_SCHEMA_RESOLUTION=name
create database schema_evolution;
use schema_evolution;
create table t1 (c1 int, c2 boolean, c3 string, c4 timestamp)
stored as parquet;
insert into t1 values
(1, true, 'yes', now()),
(2, false, 'no', now() + interval 1 day);
select * from t1;
+----+-------+-----+-------------------------------+
| c1 | c2 | c3 | c4 |
+----+-------+-----+-------------------------------+
| 1 | true | yes | 2016-06-28 14:53:26.554369000 |
| 2 | false | no | 2016-06-29 14:53:26.554369000 |
+----+-------+-----+-------------------------------+
desc formatted t1;
...
| Location: | /user/hive/warehouse/schema_evolution.db/t1 |
...
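-- Assumed definition of t2 (its CREATE TABLE does not appear above),
-- consistent with the SELECT results shown below: a reordered subset
-- of t1's columns.
create table t2 (c4 timestamp, c2 boolean) stored as parquet;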
-- Make T2 use the same data file as T1, including 2 columns
-- that T2 does not have, in a column order different from what T2 expects.
load data inpath '/user/hive/warehouse/schema_evolution.db/t1'
into table t2;
+----------------------------------------------------------+
| summary |
+----------------------------------------------------------+
| Loaded 1 file(s). Total files in destination location: 1 |
+----------------------------------------------------------+
-- 'position' is the default setting.
-- Impala cannot read the Parquet file if the column order does not match.
set PARQUET_FALLBACK_SCHEMA_RESOLUTION=position;
PARQUET_FALLBACK_SCHEMA_RESOLUTION set to position
select * from t2;
WARNINGS:
File 'schema_evolution.db/t2/45331705_data.0.parq'
has an incompatible Parquet schema for column 'schema_evolution.t2.c4'.
Column type: TIMESTAMP, Parquet schema: optional int32 c1 [i:0 d:1 r:0]
File 'schema_evolution.db/t2/45331705_data.0.parq'
has an incompatible Parquet schema for column 'schema_evolution.t2.c4'.
Column type: TIMESTAMP, Parquet schema: optional int32 c1 [i:0 d:1 r:0]
-- With the 'name' setting, Impala can read the Parquet data files
-- despite mismatching column order.
set PARQUET_FALLBACK_SCHEMA_RESOLUTION=name;
PARQUET_FALLBACK_SCHEMA_RESOLUTION set to name
select * from t2;
+-------------------------------+-------+
| c4 | c2 |
+-------------------------------+-------+
| 2016-06-28 14:53:26.554369000 | true |
| 2016-06-29 14:53:26.554369000 | false |
+-------------------------------+-------+
Primitive types
Logical types
Complex types:
ARRAY
MAP
STRUCT