Flink开发-FlinkSQL连接Hive环境部署及测试、Flink SQL Cli操作

前言

本文使用环境版本

  • Hive:2.1.0
  • Flink:flink-1.12.7-bin-scala_2.12

Flink Table

https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/

依赖

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

flink-stream-java和flink-stream-scala都提供了类似的API来管理Flink流; 您只需要使用其中一种语言即可,具体取决于您的语言。

请注意,无论您选择什么,诸如flink-runtime和flink-clients之类的某些依赖项都依赖于scala的版本(2.11或2.12),因为Flink基于Akka框架(用scala编写的框架)。

正在不断努力从更高级别的API flink-table(FLINK-11063)中删除akka依赖性。

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

读取HDFS文件

1
2
3
4
5
6
7
8
9
10
11
<!--读取hadoop文件-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.12</artifactId>
<version>${flink.version}</version>
</dependency>

环境设置

Hive 需要开启元数据服务

1
nohup hive --service metastore >/dev/null 2>&1 &

Hive下配置

hive-site.xml

1
2
3
4
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.7.101:9083</value>
</property>

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.12.7</flink.version>
<scala.version>2.12.15</scala.version>
<hadoop.version>2.7.7</hadoop.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- Flink Dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
</dependencies>

调用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

object WordCount {
case class Student(id: String, name: String, sex: String, age: Int, department: String)

def main(args: Array[String]): Unit = {
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
val tableEnv: TableEnvironment = TableEnvironment.create(settings)

val name: String = "hive"
val defaultDataBase: String = "default"
val hiveConfDir: String = "/data/tools/bigdata/apache-hive-2.1.0-bin/conf"

val hive = new HiveCatalog(name, defaultDataBase, hiveConfDir)
tableEnv.registerCatalog("myHive", hive) // 注册Catalog
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.useCatalog("myHive") // 使用注册的Catalog ,不使用的话查不到数据
tableEnv.useDatabase("default") // 设置要查询的数据库
tableEnv.executeSql("show tables").print()

val selectTables_sql = "select * from t_user"
val result = tableEnv.executeSql(selectTables_sql)
result.print()
}
}

我们看一下初始化的方法

1
2
3
public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir, @Nullable String hadoopConfDir, @Nullable String hiveVersion) {
this(catalogName, defaultDatabase, createHiveConf(hiveConfDir, hadoopConfDir), hiveVersion);
}

其中

参数 必填 默认值 类型 描述
catalogName Yes (none) String catalog 的类型,创建 HiveCatalog 时必须设置为:hive
defaultDatabase No default String 指定当前 catalog 使用的默认数据库。
hiveConfDir No (none) String 包含 hive-site.xml 文件的 hive 目录的 URI,而且必须是 hadoop 文件系统支持的 URI。如果使用了相对路径的 URI ,比如没有指定 schema ,则会假定指定的本地文件系统。如果没有指定该选项,flink 将会从 class path 中搜索 hive-site.xml 文件。
hadoopConfDir No (none) String hadoop 配置文件路径,只支持本地文件系统路径。
建议的指定 hadoop 配置文件目录的方式是将 HADOOP_CONF_DIR 配置为环境变量,比如 /etc/profile
只有在环境变量不起作用时,使用该参数,比如你想分别配置每个 HiveCatalog 环境。
hiveVersion No (none) String HiveCatalog 有能力自动推断使用的 hive 版本,建议不要指定 hive 版本号,除非自动推断失败。

使用本地配置文件

hive-site.xml放在项目下

image-20221228111820957

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
object WordCount {
def main(args: Array[String]): Unit = {
case class Student(id: String, name: String, sex: String, age: Int, department: String)
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv: TableEnvironment = TableEnvironment.create(settings)

val name: String = "hive"
val defaultDataBase: String = "default"
// val hiveConfDir: String = "/data/tools/bigdata/apache-hive-2.1.0-bin/conf"
val classPath = Thread.currentThread.getContextClassLoader.getResource("").getPath
val hiveConfDir: String = classPath+"/conf"

val hive = new HiveCatalog(name, defaultDataBase, hiveConfDir)
tableEnv.registerCatalog("myHive", hive) // 注册Catalog
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.useCatalog("myHive") // 使用注册的Catalog ,不使用的话查不到数据
tableEnv.useDatabase("default") // 设置要查询的数据库
tableEnv
.executeSql("show tables")
.print()
val sinkDDL = "create table if not exists t_user3 (id varchar(20), name string)"
tableEnv.sqlUpdate(sinkDDL)
}
}

插入数据

1
2
3
4
5
6
7
val sql2 = "INSERT INTO t_user4(id,name) SELECT id, name FROM t_user"
val result2 = tableEnv.executeSql(sql2)
result2.print()

val selectTables_sql3 = "select * from t_user4"
val result3 = tableEnv.executeSql(selectTables_sql3)
result3.print()

判断传入参数

1
2
3
4
5
if(args==null || args.length!=1){
println("缺少参数,使用方法:flink run WordCount-1.0-jar-with-dependencies.jar -c cn.psvmc.WordCount \"hdfs://hadoop01:9000/bigdata_study/stu_list.txt\"")
System.exit(1)
}
val filePath=args(0)

Table和TableResult

  • Table 可以调用计算处理相关方法 Table调用execute返回TableResult
  • TableResult 可以用来打印
1
2
3
4
//返回Table 
tableEnv.sqlQuery(sqlstr)
//返回TableResult
tableEnv.executeSql(sqlstr)

table语法示例

1
2
3
4
5
6
val selectTables_sql3 = "select * from t_user4"
val result3 = tableEnv.sqlQuery(selectTables_sql3)
result3.groupBy($"name")
.select($"name",$"id".count as "num")
.execute()
.print()

FlinkSQL Cli集成Hive

Flink 支持以下 Hive 版本。

1.0.x(1.0.0、1.0.1),1.1.x(1.1.0、1.1.1),1.2.x(1.2.0、1.2.1、1.2.2)

2.0.x(2.0.0、2.0.1),2.1.x(2.1.0、2.1.1),2.2.0,2.3.x(2.3.0、2.3.1、2.3.2、2.3.4、2.3.5、2.3.6)

3.1.x(3.1.0、3.1.1、3.1.2)

环境变量

Apache Hive 是基于 Hadoop 之上构建的, 所以还需要 Hadoop 的依赖,配置好HADOOP_CLASSPATH即可。

这一点非常重要,否则在使用FlinkSQL Cli查询Hive中的表时,会报如下错误:

java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf

添加HADOOP_CLASSPATH

1
vi /etc/profile.d/hadoop.sh

内容

1
2
3
4
5
6
7
#HADOOP_HOME
export HADOOP_HOME=/data/tools/bigdata/hadoop-2.7.7
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`

配置生效

1
source /etc/profile

测试

1
echo $HADOOP_CLASSPATH

添加Jar

注意

添加这些jar会和自己开发打的jar包冲突,所以,如果没有服务器上使用FlinkSQL的需求的化,不用添加下面的Jar包。

Flink1.12.7集成Hive2.1.0只需要在Flink的lib中添加如下三个jar包

以Hive2.1.0为例,分别为:

(1) flink-sql-connector-hive-2.2.0_2.12-1.12.7.jar

https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.12/1.12.7/

https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.12/1.12.7/flink-sql-connector-hive-2.2.0_2.12-1.12.7.jar

查看Metastore Version

1
select * from VERSION;

这个版本和Hive的版本一致

对应关系

Metastore version Maven dependency
1.0.0 - 1.2.2 flink-sql-connector-hive-1.2.2
2.0.0 - 2.2.0 flink-sql-connector-hive-2.2.0
2.3.0 - 2.3.6 flink-sql-connector-hive-2.3.6
3.0.0 - 3.1.2 flink-sql-connector-hive-3.1.2

(2) flink-connector-hive_2.12-1.12.7.jar

https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.12.7/

https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.12.7/flink-connector-hive_2.12-1.12.7.jar

(3) hive-exec-2.1.0.jar

Hive安装路径下的lib文件夹

1
cp $HIVE_HOME/lib/hive-exec-2.1.0.jar $FLINK_HOME/lib/

配置

Hive下配置

hive-site.xml

1
2
3
4
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.7.101:9083</value>
</property>

Flink下配置

sql-client-defaults.yaml

该文件时Flink SQL Cli启动时使用的配置文件,该文件位于Flink安装目录的conf/文件夹下,具体的配置如下,主要是配置catalog:

1
2
3
4
5
6
catalogs: [] # empty list
# A typical catalog definition looks like:
# - name: myhive
# type: hive
# hive-conf-dir: /opt/hive_conf/
# default-database: ...

修改为

1
2
3
4
5
catalogs: 
- name: myhive
type: hive
hive-conf-dir: /data/tools/bigdata/apache-hive-2.1.0-bin/conf
default-database: default

开启元数据服务元数据服务

Hive 需要开启元数据服务

1
nohup hive --service metastore >/dev/null 2>&1 &

Hive中创建表

进入Hive客户端

1
hive

输入

1
2
3
create table t_user(id int,name string,password string);
INSERT INTO t_user VALUES (1,'Zhang San', '123456');
select * from t_user;

退出

1
exit;

FlinkSQL Cli操作Hive

基本操作

首先启动FlinkSQL Cli,命令如下:

1
$FLINK_HOME/bin/sql-client.sh embedded

接下来,我们可以查看注册的catalog

1
show catalogs;

结果

default_catalog
myhive

使用注册的myhive catalog

1
use catalog myhive;

FlinkSQL操作Hive中的表,比如查询,写入数据。

1
2
3
show tables;
INSERT INTO t_user VALUES (2,'李四', '123456');
select * from t_user;

退出

1
exit;

运行报错

java.lang.RuntimeException: The Yarn application application_1667981758965_0021 doesn’t run anymore

修改yarn-site.xml配置文件,原因是可能内存超过虚拟内存的限制,所以需要对yarn进行虚拟内存限制修正,将如下两个配置改为false

1
2
3
4
5
6
7
8
9
10
<property>
<!--pmem指的是默认检查物理内存,容器使用的物理内存不能超过我们限定的内存大小,因为我们上面设置了所有容器能够使用的最大内存数量,超出这个内存限制,任务就会被kill掉-->
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<!--vmem指的是默认检查虚拟内存,容器使用的虚拟内存不能超过我们设置的虚拟内存大小-->
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

show

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 列出catalog
SHOW CATALOGS;

-- 列出数据库
SHOW DATABASES;

-- 列出表
SHOW TABLES;

-- 列出函数
SHOW FUNCTIONS;

-- 列出所有激活的 module
SHOW MODULES;

create

创建库

1
CREATE DATABASE zdb;

创建表

CREATE 语句用于向当前或指定的 Catalog 中注册表、视图或函数。注册后的表、视图和函数可以在 SQL 查询中使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] ]


-- 例如
CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);

drop

DROP 语句可用于删除指定的 catalog,也可用于从当前或指定的 Catalog 中删除一个已经注册的表、视图或函数。

1
2
3
4
5
6
7
8
--删除表
DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
--删除数据库
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
--删除视图
DROP [TEMPORARY] VIEW [IF EXISTS] [catalog_name.][db_name.]view_name
--删除函数
DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;

alter

ALTER 语句用于修改一个已经在 Catalog 中注册的表、视图或函数定义。

1
2
3
4
5
6
7
8
--修改表名
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
--设置或修改表属性
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
--修改视图名
ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name
--在数据库中设置一个或多个属性。若个别属性已经在数据库中设定,将会使用新值覆盖旧值。
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

insert

INSERT 语句用来向表中添加行(INTO是追加,OVERWRITE是覆盖)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- 1. 插入别的表的数据
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement

-- 2. 将值插入表中
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES [values_row , values_row ...]



-- 追加行到该静态分区中 (date='2019-8-30', country='China')
INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
SELECT user, cnt FROM page_view_source;

-- 追加行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定
INSERT INTO country_page_view PARTITION (date='2019-8-30')
SELECT user, cnt, country FROM page_view_source;

-- 覆盖行到静态分区 (date='2019-8-30', country='China')
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
SELECT user, cnt FROM page_view_source;

-- 覆盖行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
SELECT user, cnt, country FROM page_view_source;

三种建表的方式

1.基于DataSteam流来创建

注:此时先创建的一个Table对象,如果使用Table APi操作的话就可以直接操作了,如果要使用Sql的方式则需要先注册成一个view然后再操作

关键语句

1
Table tableApi = tableEnv.fromDataStream(mapDataStream);

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> dataStreamSource = env.readTextFile("D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt");
DataStream<SensorReading> mapDataStream = dataStreamSource.map(el -> {
String[] split = el.split(",");
return new SensorReading(split[0], Double.valueOf(split[2]), Long.valueOf(split[1]));
});
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table tableApi = tableEnv.fromDataStream(mapDataStream);
//tableApi 的方式查询过滤只有sensor_2的温度数据
Table resultTableApi = tableApi.select("id,temperature")
.where("id='sensor_2'");
//SQL的方式 需要先将dataStream注册成一张表
tableEnv.createTemporaryView("sensor_table", mapDataStream);
Table resultTableSql = tableEnv.sqlQuery("select id,temperature from sensor_table where id='sensor_2'");
//打印
tableEnv.toAppendStream(resultTableApi, Row.class).print("api");
tableEnv.toAppendStream(resultTableSql, Row.class).print("sql");
env.execute();
}

2. 基于connect+withFormat+withSchema方式

注:此时是先注册成一个view,如果使用SQL操作的话就可以直接操作了,如果要使用Api的方式则需要使用from语句获得Table对象

关键语句

1
2
3
4
5
6
7
8
9
tableEnv.connect(new FileSystem().path(filePath))
//withFormat 是用来告诉flink我应该怎么处理来源用的每一条数据 比如按csv的格式,号分割
.withFormat(new Csv())
//withSchema 是声明创建表的表结构 要按照解析得到的数据的先后顺序对应
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("time", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE()))
.createTemporaryTable("inputTable");

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
String filePath = "D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt";
tableEnv.connect(new FileSystem().path(filePath))
//withFormat 是用来告诉flink我应该怎么处理来源用的每一条数据 比如按csv的格式,号分割
.withFormat(new Csv())
//withSchema 是声明创建表的表结构 要按照解析得到的数据的先后顺序对应
.withSchema(
new Schema()
.field("id", DataTypes.STRING())
.field("time", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable");
//2.1 使用table api 完成查询过滤和聚合操作
Table inputTable = tableEnv.from("inputTable");
Table tableApiSelect = inputTable.select("id,temp")
.filter("id='sensor_2'");
Table tableApiAgg = inputTable.groupBy("id")
.select("id,id.count as cnt,temp.avg as avgTemp");
//2.2 使用table sql 实现上面的两个查询
Table tableSqlSelect = tableEnv.sqlQuery("select id,temp from inputTable where id='sensor_2'");
Table tableSqlAgg = tableEnv.sqlQuery("select id,count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");

从kafka读取数据转换成表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala.{StreamTableEnvironment, tableConversions}
import org.apache.flink.table.descriptors.{Csv, Kafka, Schema}

object createtable_kafka {

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val tableEnv = StreamTableEnvironment.create(env)

tableEnv
.connect(
new Kafka()
.version("0.11")
.topic("sensor")
.property("zookeeper.connect","192.168.100.3:2128")
.property("bootstrap.servers","192.168.100.3:9092")
)
.withFormat(new Csv())
.withSchema(
new Schema()
.field("id",DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable")

val kafkaInputTable = tableEnv.from("kafkaInputTable")

kafkaInputTable.toAppendStream[(String,Long,Double)].print()

env.execute("table from kafka")
}
}

3.基于DDL建表语句,就是Create Table方式

注:此时和方式2一样,是先注册成一个view,如果使用SQL操作的话就可以直接操作了,如果要使用Api的方式则需要使用from语句获得Table对象

关键

1
tableEnv.sqlUpdate(sinkDDL)

举例

1
2
val sinkDDL = "create table if not exists t_user2 (id varchar(20), name string)"
tableEnv.sqlUpdate(sinkDDL)