This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#### Environment: HDP-3.1
#### Data setup:
```
cat sample-data.json
{"name": "Raj","address": {"a": "b","c": "d","e": "f"}}
{"name": "Raj1","address": {"a": "bb","c": "dd","e": "ff"}}
```
#### Create a Kafka topic and ingest the sample data into it:
```
# Create topic dummytopic1 (1 partition, replication factor 1) via ZooKeeper on this host, port 2181.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper "$(hostname):2181" --replication-factor 1 --partitions 1 --topic dummytopic1
# Pipe sample-data.json into the console producer; the broker is addressed by this host's FQDN on port 6667.
cat sample-data.json | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list "$(hostname -f):6667" --topic dummytopic1
```
#### Create the Hive table:
```
-- Expose Kafka topic dummytopic1 as a Hive external table.
-- Only the payload columns (name, address) are declared here; the
-- SHOW CREATE TABLE output for this table additionally lists the
-- __key, __partition, __offset and __timestamp columns and
-- 'kafka.serde.class'='org.apache.hadoop.hive.serde2.JsonSerDe',
-- none of which are specified in this statement.
-- NOTE: hdp31a:6667 is this environment's broker address; replace it with your own.
CREATE EXTERNAL TABLE kafka_test_table (
    name    string,
    address struct<a:string,c:string,e:string>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
    "kafka.topic" = "dummytopic1",
    "kafka.bootstrap.servers" = "hdp31a:6667",
    "kafka.consumer.security.protocol" = "PLAINTEXT"
);
-- show create table kafka_test_table;
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE EXTERNAL TABLE `kafka_test_table`( |
| `name` string COMMENT 'from deserializer', |
| `address` struct<a:string,c:string,e:string> COMMENT 'from deserializer', |
| `__key` binary COMMENT 'from deserializer', |
| `__partition` int COMMENT 'from deserializer', |
| `__offset` bigint COMMENT 'from deserializer', |
| `__timestamp` bigint COMMENT 'from deserializer') |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.kafka.KafkaSerDe' |
| STORED BY |
| 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' |
| WITH SERDEPROPERTIES ( |
| 'serialization.format'='1') |
| LOCATION |
| 'hdfs://hdp31a.hdp.local:8020/warehouse/tablespace/external/hive/kafka_test_table' |
| TBLPROPERTIES ( |
| 'bucketing_version'='2', |
| 'hive.kafka.max.retries'='6', |
| 'hive.kafka.metadata.poll.timeout.ms'='30000', |
| 'hive.kafka.optimistic.commit'='false', |
| 'hive.kafka.poll.timeout.ms'='5000', |
| 'kafka.bootstrap.servers'='hdp31a:6667', |
| 'kafka.consumer.security.protocol'='PLAINTEXT', |
| 'kafka.serde.class'='org.apache.hadoop.hive.serde2.JsonSerDe', |
| 'kafka.topic'='dummytopic1', |
| 'kafka.write.semantic'='AT_LEAST_ONCE', |
| 'transient_lastDdlTime'='1553536265') |
+----------------------------------------------------+
```
#### Query the table:
```
-- select * from kafka_test_table;
+------------------------+-------------------------------+-------------------------+-------------------------------+----------------------------+-------------------------------+
| kafka_test_table.name | kafka_test_table.address | kafka_test_table.__key | kafka_test_table.__partition | kafka_test_table.__offset | kafka_test_table.__timestamp |
+------------------------+-------------------------------+-------------------------+-------------------------------+----------------------------+-------------------------------+
| Raj | {"a":"b","c":"d","e":"f"} | NULL | 0 | 0 | 1553536022213 |
| Raj1 | {"a":"bb","c":"dd","e":"ff"} | NULL | 0 | 1 | 1553536464796 |
+------------------------+-------------------------------+-------------------------+-------------------------------+----------------------------+-------------------------------+
2 rows selected (0.549 seconds)
-- select name,address.a,address.c,address.e from kafka_test_table;
+-------+-----+-----+-----+
| name | a | c | e |
+-------+-----+-----+-----+
| Raj | b | d | f |
| Raj1 | bb | dd | ff |
+-------+-----+-----+-----+
```