Preface

Versions used in this article:
Hive:2.1.0
Flink:flink-1.12.7-bin-scala_2.12
Flink Table
https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/
Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
Both flink-streaming-java and flink-streaming-scala provide similar APIs for working with Flink streams; you only need one of them, depending on the language you use.
Note that regardless of which one you choose, some dependencies such as flink-runtime and flink-clients depend on the Scala version (2.11 or 2.12), because Flink is built on Akka, a framework written in Scala.
There is an ongoing effort to remove the Akka dependency from the higher-level table API, flink-table (FLINK-11063).
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
Reading HDFS files
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
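With these dependencies in place, an HDFS file can be read like any other text source. A minimal sketch, assuming the NameNode address hdfs://hadoop01:9000 and the file path used later in this article; adjust them to your cluster:

import org.apache.flink.streaming.api.scala._

object ReadHdfsDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Path and NameNode address are examples only
    val lines = env.readTextFile("hdfs://hadoop01:9000/bigdata_study/stu_list.txt")
    lines.print()
    env.execute("read-hdfs-demo")
  }
}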
Environment setup

Hive needs the metastore service running:
nohup hive --service metastore >/dev/null 2>&1 &
Hive-side configuration
hive-site.xml
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://192.168.7.101:9083</value>
</property>
Dependencies

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.12.7</flink.version>
    <scala.version>2.12.15</scala.version>
    <hadoop.version>2.7.7</hadoop.version>
    <!-- Hive version used in this article -->
    <hive.version>2.1.0</hive.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
    </dependency>
</dependencies>
Calling code

import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

object WordCount {
  case class Student(id: String, name: String, sex: String, age: Int, department: String)

  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
    val tableEnv: TableEnvironment = TableEnvironment.create(settings)

    val name: String = "hive"
    val defaultDataBase: String = "default"
    val hiveConfDir: String = "/data/tools/bigdata/apache-hive-2.1.0-bin/conf"
    val hive = new HiveCatalog(name, defaultDataBase, hiveConfDir)
    tableEnv.registerCatalog("myHive", hive)
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useCatalog("myHive")
    tableEnv.useDatabase("default")

    tableEnv.executeSql("show tables").print()

    val selectTables_sql = "select * from t_user"
    val result = tableEnv.executeSql(selectTables_sql)
    result.print()
  }
}
Let's take a look at the constructor:
public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir,
                   @Nullable String hadoopConfDir, @Nullable String hiveVersion) {
    this(catalogName, defaultDatabase, createHiveConf(hiveConfDir, hadoopConfDir), hiveVersion);
}
The parameters are:

- catalogName (required, String, no default): Name of the catalog.
- defaultDatabase (optional, String, default: default): The default database to use for the current catalog.
- hiveConfDir (optional, String, no default): URI of the directory containing the hive-site.xml file; it must be a URI supported by the Hadoop file system. If a relative URI without a scheme is given, the local file system is assumed. If this option is not set, Flink searches the classpath for hive-site.xml.
- hadoopConfDir (optional, String, no default): Path to the Hadoop configuration directory; only local file system paths are supported. The recommended way to provide the Hadoop configuration is to set the HADOOP_CONF_DIR environment variable, e.g. in /etc/profile. Use this parameter only when the environment variable does not work, for example when each HiveCatalog needs its own Hadoop configuration.
- hiveVersion (optional, String, no default): HiveCatalog can infer the Hive version in use automatically; it is recommended not to specify the version unless automatic inference fails.
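For illustration, a sketch of calling the five-argument constructor shown above directly. The hadoopConfDir value reuses the Hadoop path from this article's environment and is only an example, and hiveVersion is left null so it is inferred automatically:

import org.apache.flink.table.catalog.hive.HiveCatalog

// Example values; adjust the paths to your own environment.
val hive = new HiveCatalog(
  "myHive",                                          // catalogName
  "default",                                         // defaultDatabase
  "/data/tools/bigdata/apache-hive-2.1.0-bin/conf",  // hiveConfDir: directory containing hive-site.xml
  "/data/tools/bigdata/hadoop-2.7.7/etc/hadoop",     // hadoopConfDir: local path only
  null                                               // hiveVersion: null lets Flink infer it
)
tableEnv.registerCatalog("myHive", hive)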
Using a local configuration file

Put hive-site.xml in the project (the code below loads it from a conf directory on the classpath):
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

object WordCount {
  def main(args: Array[String]): Unit = {
    case class Student(id: String, name: String, sex: String, age: Int, department: String)

    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val tableEnv: TableEnvironment = TableEnvironment.create(settings)

    val name: String = "hive"
    val defaultDataBase: String = "default"
    // Load the Hive configuration from a conf directory on the classpath
    val classPath = Thread.currentThread.getContextClassLoader.getResource("").getPath
    val hiveConfDir: String = classPath + "/conf"
    val hive = new HiveCatalog(name, defaultDataBase, hiveConfDir)
    tableEnv.registerCatalog("myHive", hive)
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useCatalog("myHive")
    tableEnv.useDatabase("default")

    tableEnv.executeSql("show tables").print()

    val sinkDDL = "create table if not exists t_user3 (id varchar(20), name string)"
    tableEnv.sqlUpdate(sinkDDL)
  }
}
Inserting data

val sql2 = "INSERT INTO t_user4(id,name) SELECT id, name FROM t_user"
val result2 = tableEnv.executeSql(sql2)
result2.print()

val selectTables_sql3 = "select * from t_user4"
val result3 = tableEnv.executeSql(selectTables_sql3)
result3.print()
Checking command-line arguments

if (args == null || args.length != 1) {
  println("Missing argument. Usage: flink run WordCount-1.0-jar-with-dependencies.jar -c cn.psvmc.WordCount \"hdfs://hadoop01:9000/bigdata_study/stu_list.txt\"")
  System.exit(1)
}
val filePath = args(0)
Table and TableResult
A Table exposes the transformation methods; calling execute() on a Table returns a TableResult.
A TableResult can be used to print results.
tableEnv.sqlQuery(sqlstr)   // returns a Table
tableEnv.executeSql(sqlstr) // returns a TableResult
Table API example
// requires: import org.apache.flink.table.api._ (for the $"..." expression syntax)
val selectTables_sql3 = "select * from t_user4"
val result3 = tableEnv.sqlQuery(selectTables_sql3)
result3.groupBy($"name")
  .select($"name", $"id".count as "num")
  .execute()
  .print()
FlinkSQL CLI integration with Hive

Flink supports the following Hive versions:
1.0.x (1.0.0, 1.0.1), 1.1.x (1.1.0, 1.1.1), 1.2.x (1.2.0, 1.2.1, 1.2.2)
2.0.x (2.0.0, 2.0.1), 2.1.x (2.1.0, 2.1.1), 2.2.0, 2.3.x (2.3.0, 2.3.1, 2.3.2, 2.3.4, 2.3.5, 2.3.6)
3.1.x (3.1.0, 3.1.1, 3.1.2)
Environment variables

Apache Hive is built on top of Hadoop, so the Hadoop dependencies are also needed; configuring HADOOP_CLASSPATH is enough.
This is very important; otherwise, querying Hive tables from the FlinkSQL CLI fails with:
java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf
Add HADOOP_CLASSPATH:
vi /etc/profile.d/hadoop.sh
Contents:
export HADOOP_HOME=/data/tools/bigdata/hadoop-2.7.7
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`
Apply the configuration:
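For example, to apply it in the current shell (using the file created above):

source /etc/profile.d/hadoop.sh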
Test:
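A quick way to verify that the variable is set:

echo $HADOOP_CLASSPATH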
Adding JARs

Note: adding these JARs can conflict with the JAR you build for your own application, so if you do not need to use FlinkSQL on the server, do not add the JARs below.
To integrate Flink 1.12.7 with Hive 2.1.0, only the following three JARs need to be added to Flink's lib directory.
Taking Hive 2.1.0 as an example, they are:
(1) flink-sql-connector-hive-2.2.0_2.12-1.12.7.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.12/1.12.7/
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.12/1.12.7/flink-sql-connector-hive-2.2.0_2.12-1.12.7.jar
Check the metastore version

This version is the same as the Hive version.
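One way to check it is to look at the Hive version itself (a sketch; the exact output format varies by installation):

hive --version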
Mapping:

Metastore version    Maven dependency
1.0.0 - 1.2.2        flink-sql-connector-hive-1.2.2
2.0.0 - 2.2.0        flink-sql-connector-hive-2.2.0
2.3.0 - 2.3.6        flink-sql-connector-hive-2.3.6
3.0.0 - 3.1.2        flink-sql-connector-hive-3.1.2
(2) flink-connector-hive_2.12-1.12.7.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.12.7/
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.12.7/flink-connector-hive_2.12-1.12.7.jar
(3) hive-exec-2.1.0.jar
This JAR is in the lib folder of the Hive installation directory:
cp $HIVE_HOME/lib/hive-exec-2.1.0.jar $FLINK_HOME/lib/
Configuration

Hive-side configuration: hive-site.xml
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://192.168.7.101:9083</value>
</property>
Flink-side configuration: sql-client-defaults.yaml
This is the configuration file used when the Flink SQL CLI starts; it is located in the conf/ folder of the Flink installation directory. The main thing to configure is the catalog.
Change it to:
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /data/tools/bigdata/apache-hive-2.1.0-bin/conf
    default-database: default
Start the metastore service

Hive needs the metastore service running:
nohup hive --service metastore >/dev/null 2>&1 &
Create a table in Hive

Enter the Hive client:
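For example, by running the CLI directly (assuming hive is on the PATH):

hive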
Enter:
create table t_user(id int, name string, password string);
INSERT INTO t_user VALUES (1, 'Zhang San', '123456');
select * from t_user;
Exit:
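For example (quit; exits the Hive CLI):

quit;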
Using the FlinkSQL CLI with Hive

Basic operations

First start the FlinkSQL CLI with:
$FLINK_HOME/bin/sql-client.sh embedded
Next, we can list the registered catalogs:
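The corresponding statement in the SQL CLI is:

show catalogs;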
Result:

default_catalog
myhive
Switch to the registered myhive catalog:
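That is:

use catalog myhive;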
FlinkSQL can then operate on the Hive tables, for example querying and writing data:
show tables;
INSERT INTO t_user VALUES (2, '李四', '123456');
select * from t_user;
Exit:
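For example (quit; is one of the SQL client's exit commands):

quit;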
Runtime error
java.lang.RuntimeException: The Yarn application application_1667981758965_0021 doesn’t run anymore
Modify the yarn-site.xml configuration file. The likely cause is that memory usage exceeded the virtual-memory limit, so relax YARN's memory checks by setting the following two options to false:
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
show

SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;
SHOW FUNCTIONS;
SHOW MODULES;
create

Create a database
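A minimal example (the database name mydb is just a placeholder):

CREATE DATABASE IF NOT EXISTS mydb;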
Create a table

The CREATE statement registers a table, view, or function in the current or a specified catalog. Registered tables, views, and functions can then be used in SQL queries.
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]

CREATE TABLE Orders_with_watermark (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);
drop

The DROP statement removes a specified catalog, or removes a registered table, view, or function from the current or a specified catalog.
DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]

DROP [TEMPORARY] VIEW [IF EXISTS] [catalog_name.][db_name.]view_name

DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;
alter

The ALTER statement modifies the definition of a table, view, or function already registered in a catalog.
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)
insert

The INSERT statement adds rows to a table (INTO appends, OVERWRITE replaces existing data).
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]

INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
    SELECT user, cnt FROM page_view_source;

INSERT INTO country_page_view PARTITION (date='2019-8-30')
    SELECT user, cnt, country FROM page_view_source;

INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
    SELECT user, cnt FROM page_view_source;

INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
    SELECT user, cnt, country FROM page_view_source;
Three ways to create a table

1. Create from a DataStream
Note: this first produces a Table object. With the Table API you can operate on it directly; to use SQL, you first need to register it as a view.
Key statement:
Table tableApi = tableEnv.fromDataStream(mapDataStream);
Example:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStreamSource<String> dataStreamSource =
            env.readTextFile("D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt");
    DataStream<SensorReading> mapDataStream = dataStreamSource.map(el -> {
        String[] split = el.split(",");
        return new SensorReading(split[0], Double.valueOf(split[2]), Long.valueOf(split[1]));
    });

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Table API
    Table tableApi = tableEnv.fromDataStream(mapDataStream);
    Table resultTableApi = tableApi.select("id,temperature")
            .where("id='sensor_2'");

    // SQL: register the stream as a view first
    tableEnv.createTemporaryView("sensor_table", mapDataStream);
    Table resultTableSql = tableEnv.sqlQuery("select id,temperature from sensor_table where id='sensor_2'");

    tableEnv.toAppendStream(resultTableApi, Row.class).print("api");
    tableEnv.toAppendStream(resultTableSql, Row.class).print("sql");

    env.execute();
}
2. The connect + withFormat + withSchema approach
Note: this registers a view first. SQL can use it directly; to use the Table API, obtain a Table object with from().
Key statements:
tableEnv.connect(new FileSystem().path(filePath))
        .withFormat(new Csv())
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("time", DataTypes.BIGINT())
                .field("temp", DataTypes.DOUBLE()))
        .createTemporaryTable("inputTable");
Example:
String filePath = "D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt";
tableEnv.connect(new FileSystem().path(filePath))
        .withFormat(new Csv())
        .withSchema(
                new Schema()
                        .field("id", DataTypes.STRING())
                        .field("time", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
        )
        .createTemporaryTable("inputTable");

// Table API
Table inputTable = tableEnv.from("inputTable");
Table tableApiSelect = inputTable.select("id,temp")
        .filter("id='sensor_2'");
Table tableApiAgg = inputTable.groupBy("id")
        .select("id,id.count as cnt,temp.avg as avgTemp");

// SQL
Table tableSqlSelect = tableEnv.sqlQuery("select id,temp from inputTable where id='sensor_2'");
Table tableSqlAgg = tableEnv.sqlQuery("select id,count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");
Reading data from Kafka and converting it into a table
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala.{StreamTableEnvironment, tableConversions}
import org.apache.flink.table.descriptors.{Csv, Kafka, Schema}

object createtable_kafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv = StreamTableEnvironment.create(env)

    tableEnv
      .connect(
        new Kafka()
          .version("0.11")
          .topic("sensor")
          .property("zookeeper.connect", "192.168.100.3:2128")
          .property("bootstrap.servers", "192.168.100.3:9092")
      )
      .withFormat(new Csv())
      .withSchema(
        new Schema()
          .field("id", DataTypes.STRING())
          .field("timestamp", DataTypes.BIGINT())
          .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaInputTable")

    val kafkaInputTable = tableEnv.from("kafkaInputTable")
    kafkaInputTable.toAppendStream[(String, Long, Double)].print()

    env.execute("table from kafka")
  }
}
3. Create with a DDL statement, i.e. CREATE TABLE
Note: as with approach 2, this registers a view first. SQL can use it directly; to use the Table API, obtain a Table object with from().
Key statement:
tableEnv.sqlUpdate(sinkDDL)
Example:
val sinkDDL = "create table if not exists t_user2 (id varchar(20), name string)"
tableEnv.sqlUpdate(sinkDDL)