//String query = "select CURRENT_DATE SEGMENTSTARTTIME, CURRENT_DATE SEGMENTENDTIME, cast (imsi as varchar) imsi, cast(imei as varchar) imei from ts ";
//Table table = ste.sqlQuery(query);
// TODO: decide where `table` comes from -- e.g. read it from Kafka instead of the SQL query above?
// Configure a JDBC append sink that uses the Phoenix JDBC driver.
// NOTE(review): autocommit=true is passed in the connection URL, presumably the
// workaround discussed in FLINK-8356 -- confirm before changing it.
JDBCAppendTableSinkBuilder jdbc = JDBCAppendTableSink.builder()
    .setDrivername("org.apache.phoenix.jdbc.PhoenixDriver")
    .setDBUrl("jdbc:phoenix:hosts:2181:/hbase-unsecure;autocommit=true")
    // Upsert statement with one '?' placeholder per listed column.
    .setQuery("upsert INTO GEO_ANALYTICS_STREAMING_DATA (SEGMENTSTARTTIME,SEGMENTENDTIME, imsi, imei) values (?,?,?, ?)")
    // Four parameter types, matching the four placeholders positionally.
    .setParameterTypes(Types.SQL_DATE, Types.SQL_DATE, Types.STRING, Types.STRING);

// Build the sink from the configured builder and write the table to it.
JDBCAppendTableSink sink = jdbc.build();
table.writeToSink(sink);
Source: https://issues.apache.org/jira/browse/FLINK-8356
https://hortonworks.com/blog/hbase-hive-better-together/
Refinery Hive connector? IAE Spark job?