Hive=>HBase
依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.22</version> </dependency>
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.1.10</version> </dependency>
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.1.0</version> </dependency>
(注:下文代码还依赖 Flink 的 flink-streaming-java / flink-clients 等构件,此处未列出,需按集群的 Flink 版本一并引入。)
|
主类
1 2 3 4 5 6 7 8 9 10 11 12 13
| import com.alibaba.fastjson2.JSONObject; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Flink streaming job: reads rows from Hive via a custom source and
 * writes them into HBase via a custom sink, printing each record as well.
 */
public class Hive2Hbase {
    public static void main(String[] args) throws Exception {
        // Obtain the streaming execution environment for this job.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Wire the pipeline: Hive source -> HBase sink (+ stdout for inspection).
        final DataStreamSource<JSONObject> rows = env.addSource(new HiveReader());
        rows.addSink(new HbaseWriter());
        rows.print();

        // Submit the job; the name appears in the Flink UI.
        env.execute("Flink write data to hbase");
    }
}
|
读取Hive
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| import com.alibaba.fastjson2.JSONObject; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement;
/**
 * Flink source that reads every row of Hive table {@code t_user} over JDBC
 * and emits one JSONObject per row with keys "rowKey" (id) and "name".
 *
 * Fixes over the original: the JDBC Connection was a local variable in
 * open() and was never closed (connection leak), the Statement/ResultSet
 * were never closed, and cancel() was a no-op so the source could not be
 * stopped cooperatively.
 */
class HiveReader extends RichSourceFunction<JSONObject> {
    // JDBC handles are created in open() and released in close().
    // transient: the function instance is serialized for deployment.
    private transient Connection con = null;
    private transient Statement st = null;
    // Cooperative cancellation flag checked by the emit loop in run().
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Explicit driver registration; required by older Hive JDBC drivers.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        con = DriverManager.getConnection("jdbc:hive2://192.168.7.101:10000/default", "hive", "hive");
        st = con.createStatement();
    }

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
        // try-with-resources guarantees the ResultSet is closed even on failure.
        try (ResultSet rs = st.executeQuery("select * from t_user")) {
            while (running && rs.next()) {
                Integer id = rs.getInt("id");
                String name = rs.getString("name");
                JSONObject json = new JSONObject();
                json.put("rowKey", id);
                json.put("name", name);
                ctx.collect(json);
            }
        }
    }

    @Override
    public void cancel() {
        // Stop the emit loop; JDBC resources are released in close().
        running = false;
    }

    @Override
    public void close() throws Exception {
        // Release JDBC resources opened in open(); fixes the leak where the
        // Connection was a local in open() and could never be closed.
        if (st != null) {
            st.close();
        }
        if (con != null) {
            con.close();
        }
        super.close();
    }
}
|
ResultSet.next() 并不会把全部结果一次性加载到内存:游标向前移动时,驱动会按 fetchSize 分批从服务端拉取数据。正因为如此,在遍历 ResultSet 期间必须保持数据库连接打开——连接一旦断开,就取不到后续的数据了,这也说明遍历过程中确实在与数据库通讯。
写入HBase
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| import com.alibaba.fastjson2.JSONObject; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes;
import java.util.Random;
/**
 * Flink sink that writes each incoming JSONObject as one HBase Put into
 * table zdb:tuser — row key = json["rowKey"], column family "name" with an
 * empty qualifier holding json["name"] (matches: create 'zdb:tuser','name').
 *
 * Fixes over the original: the HBase Connection was a local variable in
 * open() and was never closed (leak), close() released nothing, an unused
 * Random field was removed, and per-record System.out debug prints in the
 * hot path were dropped.
 */
class HbaseWriter extends RichSinkFunction<JSONObject> {
    // HBase handles; created in open(), released in close().
    private org.apache.hadoop.hbase.client.Connection connection = null;
    private Table queryListTable = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        connection = ConnectionFactory.createConnection(conf);
        // Table "tuser" in namespace "zdb" must exist before the job runs.
        queryListTable = connection.getTable(TableName.valueOf("zdb", "tuser"));
    }

    @Override
    public void close() throws Exception {
        // Release the Table and Connection opened in open(); the original
        // leaked the Connection because it was only a local in open().
        if (queryListTable != null) {
            queryListTable.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }

    @Override
    public void invoke(JSONObject json, Context context) throws Exception {
        try {
            String rowKey = json.getString("rowKey");
            String name = json.getString("name");
            Put put = new Put(Bytes.toBytes(rowKey));
            // Family "name", empty qualifier — mirrors the shell schema above.
            put.addColumn(Bytes.toBytes("name"), Bytes.toBytes(""), Bytes.toBytes(name));
            queryListTable.put(put);
        } catch (Exception ex) {
            // Best-effort sink: keep the job alive on a single bad record
            // (preserves the original's intent), but surface the error.
            ex.printStackTrace();
        }
    }
}
|
Hbase
删除命名空间下的表
1 2
| disable 'zdb:tuser' drop 'zdb:tuser'
|
创建表
1
| create 'zdb:tuser','name'
|
查看表
插入数据
1
| put 'zdb:tuser','100','name','LiYing'
|
查询数据
1 2 3 4 5 6
| get 'zdb:tuser','100' get 'zdb:tuser','100','name'
scan 'zdb:tuser'
scan 'zdb:tuser', {FORMATTER => 'toString'}
|