Saturday, June 30, 2018

A quick-start guide to ingesting data into Druid using batch mode on the HDP platform.

source : http://druid.io/docs/latest/tutorials/tutorial-batch.html
ENV : HDP-2.6.4

pageview.json

{"time": "2015-09-01T00:00:00Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}
{"time": "2015-09-01T01:00:00Z", "url": "/", "user": "bob", "latencyMs": 11}
{"time": "2015-09-01T01:30:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}

index task json

cp /usr/hdp/2.6.4.0-91/druid/quickstart/wikiticker-index.json /tmp/sampledata-index.json

modified index task json

cat sampledata-index.json
{
  "type" : "index_hadoop",
  "spec" : {
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "/tmp/pageview.json"
      }
    },
    "dataSchema" : {
      "dataSource" : "pageviews",
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2015-09-01/2015-09-02"]
      },
      "parser" : {
        "type" : "hadoopyString",
        "parseSpec" : {
          "format" : "json",
          "dimensionsSpec" : {
            "dimensions" : ["url","user"]
          },
          "timestampSpec" : {
            "format" : "auto",
            "column" : "time"
          }
        }
      },
      "metricsSpec" : [
        {
          "name" : "views",
          "type" : "count"
        },
        {
          "name" : "latencyMs",
          "type" : "doubleSum",
          "fieldName" : "latencyMs"
        }
      ]
    },
    "tuningConfig" : {
      "type" : "hadoop",
      "partitionsSpec" : {
        "type" : "hashed",
        "targetPartitionSize" : 5000000
      },
      "jobProperties" : {}
    }
  }
}

copy pageview.json to the HDFS /tmp directory
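
For example, assuming the sample file was saved locally as /tmp/pageview.json:

hdfs dfs -put /tmp/pageview.json /tmp/pageview.json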

submit the index task to the overlord.

curl -X 'POST' -H 'Content-Type:application/json' -d @sampledata-index.json  `hostname`:8090/druid/indexer/v1/task
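
The POST returns the task id as JSON ({"task":"<task_id>"}). Assuming the overlord runs on its default port 8090 as above, you can also poll the task status over the API (replace <task_id> with the id returned above):

curl `hostname`:8090/druid/indexer/v1/task/<task_id>/status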

look for the completed/running task in the overlord console UI.

check the coordinator UI for a new datasource named pageviews.
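
Once the segments are loaded, a quick sanity check is to send a native timeseries query to the broker. The broker port 8082 below is an assumption (a common default); adjust it to your broker's actual address:

curl -X 'POST' -H 'Content-Type:application/json' `hostname`:8082/druid/v2/?pretty -d '{
  "queryType" : "timeseries",
  "dataSource" : "pageviews",
  "granularity" : "day",
  "intervals" : ["2015-09-01/2015-09-02"],
  "aggregations" : [
    { "type" : "longSum", "name" : "views", "fieldName" : "views" },
    { "type" : "doubleSum", "name" : "latencyMs", "fieldName" : "latencyMs" }
  ]
}'

This should return one row for 2015-09-01 with the total views and summed latencyMs.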

Realtime ingestion in druid using tranquility on HDP

ENV HDP-2.6.4

download tranquility

curl -O http://static.druid.io/tranquility/releases/tranquility-distribution-0.8.0.tgz
tar -xzf tranquility-distribution-0.8.0.tgz
cd tranquility-distribution-0.8.0

start tranquility

[druid@c215-node2 tranquility-distribution-0.8.0]$ pwd
/home/druid/tranquility-distribution-0.8.0
bin/tranquility server -configFile /usr/hdp/current/druid-broker/conf-quickstart/tranquility/server.json 
...
2018-07-01 18:12:05,620 [main] INFO  org.eclipse.jetty.server.Server - Started @5026ms

send data into Druid

[root@c215-node2 bin]# pwd
/usr/hdp/2.6.4.0-91/druid/bin
[root@c215-node2 bin]# python generate-example-metrics -c 1000 | curl -XPOST -H'Content-Type: application/json' --data-binary @- http://localhost:8200/v1/post/metrics
{"result":{"received":1000,"sent":1000}}[root@c215-node2 bin]# 

create hive table on top of druid datasource

CREATE EXTERNAL TABLE metrics_hive
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "metrics");

now you can query real-time data through Hive.
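
A minimal check, assuming you connect with beeline to HiveServer2 Interactive (LLAP); the JDBC URL and port below are assumptions, adjust them for your cluster:

beeline -u 'jdbc:hive2://localhost:10500/default' -e 'SELECT count(*) FROM metrics_hive;'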


Hive Druid integration : a quick test to create a Druid table from a Hive table

generate data for hive table
echo "generating sample data for hive table"
echo {-1..-181451}hours | xargs -n1 date +"%Y-%m-%d %H:%M:%S" -d >> /tmp/dates.data
echo {-1..-18145}minutes | xargs -n1 date +"%Y-%m-%d %H:%M:%S" -d >> /tmp/dates.data
echo {-1..-1825}days | xargs -n1 date +"%Y-%m-%d %H:%M:%S" -d >> /tmp/dates.data
cat /tmp/dates.data | while read LINE ; do echo $LINE,"user"$((1 + RANDOM % 10000)),$((1 + RANDOM % 1000)) >> /tmp/hive_user_table.data; done

create hive table

create table hive_user_table(`timecolumn` timestamp, `username` string, `credit_rating` int) row format delimited fields terminated by ',';
load data local inpath '/tmp/hive_user_table.data' into table hive_user_table;
select count(*) from hive_user_table;
// 201421
select * from hive_user_table limit 5;

default hive conf to work with druid

hive.druid.bitmap.type=roaring
hive.druid.broker.address.default=brokernode:8888
hive.druid.coordinator.address.default=coordinatorenode:8081
hive.druid.http.numConnection=20
hive.druid.http.read.timeout=PT10M
hive.druid.indexer.memory.rownum.max=75000
hive.druid.indexer.partition.size.max=1000000
hive.druid.indexer.segments.granularity=DAY
hive.druid.maxTries=5
hive.druid.metadata.base=druid
hive.druid.metadata.db.type=mysql
hive.druid.metadata.password=druid
hive.druid.metadata.uri=jdbc:mysql://mysqlhost:3306/druid?createDatabaseIfNotExist=true
hive.druid.metadata.username=druid
hive.druid.passiveWaitTimeMs=30000
hive.druid.select.distribute=true
hive.druid.select.threshold=10000
hive.druid.sleep.time=PT10S
hive.druid.storage.storageDirectory=/apps/druid/warehouse
hive.druid.working.directory=/tmp/druid-indexing

create druid table from hive using CTAS

drop table druid_user_table;
CREATE TABLE druid_user_table STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' TBLPROPERTIES ("druid.segment.granularity" = "DAY", "druid.query.granularity" = "HOUR") 
AS select `timecolumn` as `__time`, `username`, `credit_rating` FROM hive_user_table;

-- since druid.segment.granularity is DAY, this will create ~1800 segments in Druid
select * from druid_user_table limit 5;

check the Druid coordinator UI; you will see a new datasource with the name database.druid_table_name.
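
You can also run an aggregation from Hive and let it be pushed down to Druid. A minimal sketch, again via beeline against HiveServer2 Interactive (the JDBC URL is an assumption):

beeline -u 'jdbc:hive2://localhost:10500/default' \
  -e 'SELECT username, sum(credit_rating) AS total_credit FROM druid_user_table GROUP BY username LIMIT 10;'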


Wednesday, June 27, 2018

Hive Compaction failing with FileAlreadyExistsException

ENV

HDP-2.6.3

Exception

Client
ERROR [Thread-123]: compactor.Worker (Worker.java:run(191)) - Caught exception while trying to compact id:123,dbname:hive_acid,tableName:hive_acid_table,partName:hive_acid_part=part_name,state:^@,type:MAJOR,properties:null,runAs:null,tooManyAborts:false,highestTxnId:0.  Marking failed to avoid repeated failures
java.io.IOException: Minor compactor job failed for Hadoop JobId:job_XXXXXX_XXXX
     at org.apache.hadoop.hive.ql.txn.compactor.CompactorMR.launchCompactionJob(CompactorMR.java:314)
     at org.apache.hadoop.hive.ql.txn.compactor.CompactorMR.run(CompactorMR.java:269)
     at org.apache.hadoop.hive.ql.txn.compactor.Worker$1.run(Worker.java:175)
     at java.security.AccessController.doPrivileged(Native Method)
     at javax.security.auth.Subject.doAs(Subject.java:422)
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
     at org.apache.hadoop.hive.ql.txn.compactor.Worker.run(Worker.java:172)
     

Exception in the MapReduce job

FATAL [IPC Server handler 11 on 12345] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_XXXXX - exited : org.apache.hadoop.fs.FileAlreadyExistsException: XXXXXXXXXXXXXXXXXX/base_00000XX/bucket_00000 for client  already exists
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2811)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2698)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2582)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)

Reason

During compaction, Hive triggers a MapReduce job that writes a base or delta file to a temporary location (TMP_LOCATION), depending on which type of compaction is running.
Writing ORC is a memory-intensive operation, and the situation gets worse when writing a wide ORC table (one with many columns): wide ORC writes need more
memory, and a small YARN container size is not good enough. In the scenario above, the user was running the MapReduce job with a 2 GB container size, which was not enough. As
memory usage grew beyond the 2 GB container limit, the YARN physical memory checker killed the container, which gave the container no chance to
clean up TMP_LOCATION, and subsequent task attempts started failing with FileAlreadyExistsException.

Resolution

try running the compaction with a bigger YARN container size.
ALTER TABLE TABLENAME partition (PART_NAME='PART_VALUE') COMPACT 'MINOR' WITH OVERWRITE TBLPROPERTIES ('compactor.mapreduce.map.memory.mb'='4096') 
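
You can then watch the compaction request move through its states with SHOW COMPACTIONS. A sketch using beeline (the JDBC URL is an assumption; point it at your HiveServer2):

beeline -u 'jdbc:hive2://localhost:10000/default' -e 'SHOW COMPACTIONS;'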

Bonus

// run compaction job in debug mode
ALTER TABLE TABLENAME partition (PART_NAME='PART_VALUE') COMPACT 'MINOR' WITH OVERWRITE TBLPROPERTIES ("compactor.mapreduce.map.log.level"="DEBUG","compactor.yarn.app.mapreduce.am.log.level"="DEBUG");