前言
HBase中的数据直接读取起来不太方便,所以这里使用Phoenix来读写数据。
准备Hive
启动Hive服务
1
| nohup $HIVE_HOME/bin/hiveserver2&
|
连接Hive服务
1
| beeline -n hive -u jdbc:hive2://hadoop01:10000/default
|
插入数据
1
| INSERT INTO t_user01(id,name) VALUES (1,'李四');
|
查询数据
1 2
| select * from t_user01; select * from t_user01 limit 10;
|
准备Phoenix
注意
在Phoenix中,无论是表名还是字段名,只要没有用双引号引起来,都会被自动转换为大写。
这里不建议用双引号,在后期拼接SQL的时候比较麻烦。
启动query server
1 2 3
| queryserver.py start
lsof -i:8765
|
连接
1
| sqlline-thin.py http://hadoop01:8765
|
创建schema
使用这个新建的 schema:
创建表
1 2 3 4
| CREATE TABLE IF NOT EXISTS tuser( id VARCHAR primary key, name VARCHAR );
|
插入数据
1 2 3
| upsert into tuser values('1001','zhangsan'); upsert into tuser values('1002','lisi'); upsert into tuser(id,name) values('1003','liwu');
|
查询记录
1 2 3
| select * from tuser; select * from tuser where id='1001'; select * from tuser where name like 'li%';
|
分页查询
1 2
| select * from tuser order by id desc limit 1 offset 0; select * from tuser order by id desc limit 1 offset 1;
|
其中
Hive=>Phoenix
依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.22</version> </dependency>
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.1.0</version> </dependency>
<dependency> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-queryserver-client</artifactId> <version>6.0.0</version> </dependency>
|
主类
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import com.alibaba.fastjson2.JSONObject; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Hive2Phoenix { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<JSONObject> sourceData = env.addSource(new HiveReader()); sourceData.addSink(new PhoenixWriter()); sourceData.print(); env.execute("Hive2Phoenix"); } }
|
读取Hive
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import com.alibaba.fastjson2.JSONObject; import com.xhkjedu.pojo.DBModel; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement;
public class HiveReader extends RichSourceFunction<JSONObject> { private transient Statement st = null; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName("org.apache.hive.jdbc.HiveDriver"); Connection con = DriverManager.getConnection("jdbc:hive2://192.168.7.101:10000/default", "hive", "hive"); st = con.createStatement(); }
@Override public void run(SourceContext<JSONObject> ctx) throws Exception {
ResultSet rs = st.executeQuery("select * from t_user");
while (rs.next()) { Integer id = rs.getInt("id"); String name = rs.getString("name");
JSONObject json = new JSONObject(); json.put("id", id); json.put("name", name); ctx.collect(json); }
}
@Override public void cancel() {
} }
|
ResultSet.next 每次调用其实都会与数据库通讯、取回一条数据,并不是一次性把全部结果取出放在内存。在遍历 ResultSet 期间必须保持数据库连接:连接一旦断开就取不到后续数据了,这说明读取过程中始终存在通讯。
写入Phoenix
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| import com.alibaba.fastjson2.JSONObject; import com.xhkjedu.pojo.DBModel; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement;
public class PhoenixWriter extends RichSinkFunction<JSONObject> {
private transient Statement st = null; static Connection conn;
@Override public void open(Configuration parameters) throws Exception { super.open(parameters);
System.out.println("open:" + Thread.currentThread().getId()); Class.forName("org.apache.phoenix.queryserver.client.Driver"); if (conn == null) { conn = DriverManager.getConnection("jdbc:phoenix:thin:url=http://192.168.7.101:8765;serialization=PROTOBUF"); } st = conn.createStatement(); }
@Override public void close() throws Exception { System.out.println("close"); conn.commit(); st.close(); conn.close(); super.close(); }
@Override public void invoke(JSONObject json, Context context) throws Exception { String id = json.getString("id"); String name = json.getString("name"); String sql = String.format("upsert into mdb.tuser(id,name) VALUES ('%s','%s')", id, name); System.out.println("sql: " + sql); st.execute(sql); } }
|