前言
Mysql中ResultSet默认会将一次查询的结果存入内存中。如果数据量比较大,就会占用大量的内存。如果内存不够,就会报错。
Mysql版本
mysql-connector-java分为5.1版本和8.0版本,但有的朋友可能在项目中用到过6.0版本的jar包,而我们在官网的截图中确实没看到6.0版本。对此我也有很大的疑问,通过查看官方文档,我了解到8.0以前就是6.0,后来版本号更改,6.0变成了8.0,官方推荐升级使用8.0版本的mysql-connector-java。
对比
5.1版本支持java5及其以上的版本,支持5.6、5.7、8.0版本的mysql数据库,支持3.0、4.0、4.1、4.2版本的jdbc。在5.1中,Driver的实现类的全路径名是com.mysql.jdbc.Driver。
8.0版本支持java8及其以上的版本,支持5.6、5.7、8.0版本的mysql数据库,支持4.2版本的jdbc。在8.0中,Driver的实现类的全路径名是com.mysql.cj.jdbc.Driver。
依赖
1 2 3 4 5 6
| <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.30</version> </dependency>
|
方式1
流式处理结果,让驱动每次返回1行数据
1 2 3 4
| conn = DriverManager.getConnection("jdbc:mysql://192.168.7.102/", "user", "123456"); stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY,java.sql.ResultSet.CONCUR_READ_ONLY); stmt.setFetchSize(Integer.MIN_VALUE); rs = stmt.executeQuery("SELECT * FROM tablename");
|
statement进行java.sql.ResultSet.TYPE_FORWARD_ONLY,java.sql.ResultSet.CONCUR_READ_ONLY和stmt.setFetchSize(Integer.MIN_VALUE)的组合设置,会告诉mysql服务器流式处理返回结果,一行一行的返回数据。
这是mysql规定的设置,一开始还搞不懂为啥setFetchSize会是Integer.MIN_VALUE,设置上就完了,这是规定的设置!
注意:当你使用此方式处理数据时,你必须处理完resultset中的所有数据,或者将resultset关闭后才能使用此连接进行下一次的查询等操作,否则会抛出异常。
方式2
使用基于游标的处理,setFetchSize
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import java.sql.*;
public class MysqlTest { public static void main(String[] args) throws Exception { Statement stmt; ResultSet rs;
Class.forName("com.mysql.cj.jdbc.Driver"); String url = "jdbc:mysql://110.110.110.110:13306/xhkjedu_cb?useCursorFetch=true"; String uname = "root"; String pwd = "123456"; Connection conn = DriverManager.getConnection(url, uname, pwd); System.out.println(conn.getClass().getName()); stmt = conn.createStatement(); String sql = "select * from t_dict"; rs = stmt.executeQuery(sql); while (rs.next()) { int id = rs.getInt("DICT_ID"); String name = rs.getString("DICT_NAME"); System.out.println(String.format("id:%s name:%s", id + "", name)); } } }
|
准备数据
1 2 3 4
| create database ztest; use ztest; create table t_user(id int,name VARCHAR(100)); insert into t_user(id,name) values(1,'xiaoming');
|
使用HiveJDBC实现
依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.22</version> </dependency>
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency>
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.1.0</version> </dependency>
|
Mysql2Hive
1 2 3 4 5 6 7 8 9 10 11 12 13
| import com.alibaba.fastjson2.JSONObject; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Mysql2Hive { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<JSONObject> mysqlData = env.addSource(new MysqlReader()); mysqlData.addSink(new HiveWriter()); mysqlData.print(); env.execute("Mysql2Hive"); } }
|
MysqlReader
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.xhkjedu.mysql2hive;
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind;
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement;
class MysqlReader extends RichSourceFunction<Row> { private transient Statement st = null;
@Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName("com.mysql.jdbc.Driver"); Connection con = DriverManager.getConnection("jdbc:mysql://192.168.7.102/ztest?useCursorFetch=true", "root", "123456"); st = con.createStatement(); st.setFetchSize(3); }
@Override public void run(SourceContext<Row> ctx) throws Exception {
ResultSet rs = st.executeQuery("select * from t_user");
while (rs.next()) { Integer id = rs.getInt("id"); String name = rs.getString("name");
ctx.collect(Row.ofKind(RowKind.INSERT, id, name)); }
}
@Override public void cancel() {
} }
|
ResultSet.next其实是取一条就跟数据库通讯拿一条数据,并不是全部取出放在内存,因为ResultSet.next之前,是获取了数据库连接的,数据库连接断开,你就获取不到数据了,说明是有通讯的。
HiveWriter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package com.xhkjedu.mysql2hive;
import com.alibaba.fastjson2.JSONObject; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement;
class HiveWriter extends RichSinkFunction<JSONObject> {
private transient Statement st = null;
@Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName("org.apache.hive.jdbc.HiveDriver"); Connection con = DriverManager.getConnection("jdbc:hive2://192.168.7.101:10000/default", "hive", ""); st = con.createStatement(); }
@Override public void close() throws Exception { super.close(); }
@Override public void invoke(JSONObject json, Context context) throws Exception { Integer id = json.getInteger("id"); String name = json.getString("name"); String sql = "insert into t_user02(id,name) VALUES (" + id + ",'" + name + "')"; System.out.println("Running: " + sql); st.execute(sql); } }
|
方式2(推荐)
Mysql2Hive
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.xhkjedu.mysql2hive;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.types.Row;
public class Mysql2Hive { public static void main(String[] args) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Row> mysqlData = env.addSource(new MysqlReader(),getRowTypeInfo()); String name = "hive"; String defaultDatabase = "default"; String hiveConfDir = "/data/tools/bigdata/apache-hive-2.1.0-bin/conf";
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir); tEnv.registerCatalog("myhive", hive); tEnv.useCatalog("myhive"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); tEnv.useDatabase("default"); Table table = tEnv.fromDataStream(mysqlData);
tEnv.createTemporaryView("flink_user", table); String insertSql = "insert into t_user02(id,name) SELECT id, name FROM flink_user"; tEnv.executeSql(insertSql); }
private static RowTypeInfo getRowTypeInfo() { TypeInformation[] types = new TypeInformation[2]; types[0] = BasicTypeInfo.INT_TYPE_INFO; types[1] = BasicTypeInfo.STRING_TYPE_INFO; String[] fieldNames = new String[2]; fieldNames[0] = "id"; fieldNames[1] = "name"; return new RowTypeInfo(types, fieldNames); } }
|
MysqlReader
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.xhkjedu.mysql2hive;
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind;
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement;
class MysqlReader extends RichSourceFunction<Row> { private transient Statement st = null;
@Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName("com.mysql.jdbc.Driver"); Connection con = DriverManager.getConnection("jdbc:mysql://192.168.7.102/ztest?useCursorFetch=true", "root", "123456"); st = con.createStatement(); st.setFetchSize(3); }
@Override public void run(SourceContext<Row> ctx) throws Exception {
ResultSet rs = st.executeQuery("select * from t_user");
while (rs.next()) { Integer id = rs.getInt("id"); String name = rs.getString("name");
ctx.collect(Row.ofKind(RowKind.INSERT, id, name)); }
}
@Override public void cancel() {
} }
|
报错
An input of GenericTypeInfo cannot be converted to Table.
设置类型
1 2 3 4 5 6 7 8 9 10
| DataStream<Row> mysqlData = env.addSource(new MysqlReader(),getRowTypeInfo()); private static RowTypeInfo getRowTypeInfo() { TypeInformation[] types = new TypeInformation[2]; types[0] = BasicTypeInfo.INT_TYPE_INFO; types[1] = BasicTypeInfo.STRING_TYPE_INFO; String[] fieldNames = new String[2]; fieldNames[0] = "id"; fieldNames[1] = "name"; return new RowTypeInfo(types, fieldNames); }
|
支持的数据类型
前面示例中的 DataStream,流中的数据类型都是定义好的 POJO 类。如果 DataStream 中的类型是简单的基本类型,还可以直接转换成表吗?这就涉及了Table 中支持的数据类型。
整体来看,DataStream 中支持的数据类型,Table 中也是都支持的,只不过在进行转换时需要注意一些细节。
1. 原子类型
在 Flink 中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称作”原子类型”。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在 fromDataStream()方法里增加参数,用来重新命名列字段。
1 2
| Table table = tableEnv.fromDataStream(stream, $(“myLong”));
|
2. Tuple 类型
当原子类型不做重命名时,默认的字段名就是”f0”,容易想到,这其实就是将原子类型看作了一元组Tuple1 的处理结果。
Table 支持 Flink 中定义的元组类型Tuple,对应在表中字段名默认就是元组中元素的属性名 f0、f1、f2…。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的 as()方法来进行重命名。
1 2 3 4 5 6
| Table table = tableEnv.fromDataStream(stream, $("f1"));
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
|
3. POJO 类型
Flink 也支持多种数据类型组合成的”复合类型”,最典型的就是简单 Java 对象(POJO 类型)。由于 POJO 中已经定义好了可读性强的字段名,这种类型的数据流转换成 Table 就显得无比顺畅了。
将 POJO 类型的DataStream 转换成 Table,如果不指定字段名称,就会直接使用原始 POJO类型中的字段名称。POJO 中的字段同样可以被重新排序、提却和重命名,这在之前的例子中已经有过体现。
1 2 3
| Table table = tableEnv.fromDataStream(stream); Table table = tableEnv.fromDataStream(stream, $("user")); Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"), $("url").as("myUrl"));
|
4. Row 类型
Flink 中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是 Table 中数据的基本组织形式。Row 类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息;我们在创建 Table 时调用的 CREATE 语句就会将所有的字段名称和类型指定,这在 Flink 中被称为表的”模式结构”(Schema)。除此之外,Row 类型还附加了一个属性 RowKind,用来表示当前行在更新操作中的类型。这样, Row 就可以用来表示更新日志流(changelog stream)中的数据,从而架起了 Flink 中流和表的转换桥梁。
所以在更新日志流中,元素的类型必须是 Row,而且需要调用 ofKind()方法来指定更新类型。下面是一个具体的例子:
1 2 3 4 5 6 7
| DataStream<Row> dataStream = env.fromElements( Row.ofKind(RowKind.INSERT, "Alice", 12), Row.ofKind(RowKind.INSERT, "Bob", 5), Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12), Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100)); // 将更新日志流转换为表 Table table = tableEnv.fromChangelogStream(dataStream);
|