Prometheus安装与使用(指标Metric)、与Flink集成

前言

Prometheus 是一款基于时序数据库的开源监控告警系统,非常适合Kubernetes集群的监控。

Prometheus的基本原理是通过HTTP协议周期性抓取被监控组件的状态,任意组件只要提供对应的HTTP接口就可以接入监控。不需要任何SDK或者其他的集成过程。这样做非常适合做虚拟化环境监控系统,比如VM、Docker、Kubernetes等。输出被监控组件信息的HTTP接口被叫做exporter 。

目前互联网公司常用的组件大部分都有exporter可以直接使用,比如Varnish、Haproxy、Nginx、MySQL、Linux系统信息(包括磁盘、内存、CPU、网络等等)。

Grafana 是一个仪表盘,而仪表盘必然是用来显示数据的。

Grafana 本身并不负责数据层,它只提供了通用的接口,让底层的数据库可以把数据给它。

而我们起的另一个服务,叫 Prometheus (中文名普罗米修斯数据库)则是负责存储和查询数据的。

也就是说,Grafana 每次要展现一个仪表盘的时候,会向 Prometheus 发送一个查询请求。

那么配置里的另一个服务 Prometheus-exporter 又是什么呢?

这个就是你真正监测的数据来源了,Prometheus-exporter 这个服务,会查询你的本地电脑的信息,比如内存还有多少、CPU 负载之类,然后将数据导出至普罗米修斯数据库。

三者的关系

image-20221227104652041

准备工作

在所有节点上安装 ntpdate 工具,并进行时间同步(因为 Prometheus 对时间要求非常严格)

1
2
yum -y install ntpdate
/usr/sbin/ntpdate ntp1.aliyun.com

部署Node-Exporter

在部署Prometheus后,发现图像界面的Target中node状态异常,

因为prometheus无法通过 http://192.168.7.101:9100/metrics 接口取到监控数据。

1
2
3
4
5
6
mkdir /opt/apps
cd /root
# 下载地址页面:https://prometheus.io/download/
wget https://github.com/prometheus/node_exporter/releases/download/v1.0.1/node_exporter-1.0.1.linux-amd64.tar.gz
tar -zxvf node_exporter-1.0.1.linux-amd64.tar.gz -C /data/tools/bigdata/
ln -s /data/tools/bigdata/node_exporter-1.0.1.linux-amd64 /opt/apps/node_exporter

设置用户和组

1
2
3
groupadd -g 9100 monitor
useradd -g 9100 -u 9100 -s /sbin/nologin -M monitor
chown -R monitor:monitor /data/tools/bigdata/node_exporter-1.0.1.linux-amd64

修改配置

1
vi /usr/lib/systemd/system/node_exporter.service

配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[Unit]
Description=node-exporter service
After=network.target

[Service]
User=monitor
Group=monitor
KillMode=control-group
Restart=on-failure
RestartSec=60
# 启动参数可以使用 /opt/apps/node_exporter/node_exporter --help 查看
ExecStart=/opt/apps/node_exporter/node_exporter --collector.disable-defaults --log.level=error --collector.cpu --collector.meminfo --collector.cpu.info --collector.diskstats --collector.ipvs --collector.loadavg --collector.netclass

[Install]
WantedBy=multi-user.target

重启服务

1
2
3
systemctl daemon-reload 
systemctl start node_exporter.service
systemctl enable node_exporter.service

这时候就能够访问

http://192.168.7.101:9100/metrics

部署Prometheus

安装与配置

下载

1
2
3
4
5
cd /root
# 下载不成功,可以使用浏览器下载,或者多线程下载器。https://prometheus.io/download/
wget https://github.com/prometheus/prometheus/releases/download/v2.22.0-rc.0/prometheus-2.22.0-rc.0.linux-amd64.tar.gz
tar -zxvf prometheus-2.22.0-rc.0.linux-amd64.tar.gz -C /data/tools/bigdata/
ln -s /data/tools/bigdata/prometheus-2.22.0-rc.0.linux-amd64 /opt/apps/prometheus

设置用户和组

1
2
3
groupadd prometheus
useradd -g prometheus -s /sbin/nologin -M prometheus
chown -R prometheus:prometheus /data/tools/bigdata/prometheus-2.22.0-rc.0.linux-amd64

修改配置

1
vi /opt/apps/prometheus/prometheus.yml

内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
global:
# 采集周期
scrape_interval: 60s
external_labels:
monitor: 'codelab-monitor'

scrape_configs:
# prometheus 自身监控
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']

# node_exporter 监控数据采集
- job_name: 'node'
static_configs:
- targets:
- "192.168.7.101:9100"
relabel_configs:
- source_labels: [__address__]
regex: (.+):[0-9]+
target_label: host

检查配置文件

1
/opt/apps/prometheus/promtool check config /opt/apps/prometheus/prometheus.yml

成功提示

SUCCESS: 0 rule files found

修改服务

1
vi /usr/lib/systemd/system/prometheus.service

如下

1
2
3
4
5
6
7
8
9
10
11
[Unit]
Description=prometheus
After=network.target

[Service]
User=prometheus
Group=prometheus
WorkingDirectory=/opt/apps/prometheus
ExecStart=/opt/apps/prometheus/prometheus
[Install]
WantedBy=multi-user.target

启动

1
2
3
4
5
systemctl daemon-reload 
systemctl start prometheus.service
systemctl enable prometheus.service

lsof -i:9090

访问

启动 promethues 后,可以通过 9090 端口访问web页面

http://192.168.7.101:9090/

具体的数据:

http://192.168.7.101:9090/metrics

例如输入

1
node_load1

我们就能看到

image-20221227142345425

使用influxdb

默认情况下 Prometheus 会将采集的数据存储到本机的 /opt/apps/prometheus/data 目录,存储数据的大小受限和扩展不便;

所以这里使用 influxdb 作为后端的数据库来存储数据。

安装influxdb

1)安装

1
2
3
4
wget https://dl.influxdata.com/influxdb/releases/influxdb-1.7.8.x86_64.rpm
yum -y localinstall influxdb-1.7.8.x86_64.rpm
cp /etc/influxdb/influxdb.conf /etc/influxdb/influxdb.conf.default
systemctl enable --now influxdb

2)验证

1
influx

创建数据库

1
2
3
4
5
6
7
8
9
# 创建库
create database prometheus;
# 查看库
show databases;
# 使用库
use prometheus;
# 查看表
show measurements;
exit

3)配置 Prometheus 集成 infuxdb

1
vi /opt/apps/prometheus/prometheus.yml

在最后面添加:

1
2
3
4
remote_write:
- url: "http://localhost:8086/api/v1/prom/write?db=prometheus"
remote_read:
- url: "http://localhost:8086/api/v1/prom/read?db=prometheus"

重启

1
systemctl restart prometheus

注意:

如果你们 influxdb 配置密码,请参考 官网文档 来进行配置。

重启后,所有的表就都生成了

1
2
use prometheus;
select * from node_load1;

就可以看到如下

image-20221227142846407

用户操作

打开连接

1
influx

创建用户

1
create user influx with password 'influxdb';

创建一个管理员用户,给他所有权限

1
create user "admin" with password 'influxdb' with all privileges;

查看用户

1
show users

修改密码

1
set password for influx = 'influxdb'

删除用户

1
drop user admin

Java连接

pom.xml引入相关jar文件,如下:

1
2
3
4
5
6
<!-- 引入influxdb依赖 -->
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.8</version>
</dependency>

influxDB工具类封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

/**
* InfluxDB数据库连接操作类
*/

public class InfluxDBConnection {
// 用户名
private String username;
// 密码
private String password;
// 连接地址
private final String openurl;
// 数据库
private final String database;
// 保留策略
private final String retentionPolicy;

private InfluxDB influxDB;

public InfluxDBConnection(
String username,
String password,
String openurl,
String database,
String retentionPolicy
) {
this.username = username;
this.password = password;
this.openurl = openurl;
this.database = database;
this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy;
influxDbBuild();
}

public InfluxDBConnection(
String openurl,
String database,
String retentionPolicy
) {
this.openurl = openurl;
this.database = database;
this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy;
influxDbBuild();
}

/**
* 创建数据库
*
* @param dbName
*/
@SuppressWarnings("deprecation")
public void createDB(String dbName) {
influxDB.createDatabase(dbName);
}

/**
* 删除数据库
*
* @param dbName
*/
@SuppressWarnings("deprecation")
public void deleteDB(String dbName) {
influxDB.deleteDatabase(dbName);
}

/**
* 测试连接是否正常
*
* @return true 正常
*/
public boolean ping() {
boolean isConnected = false;
Pong pong;
try {
pong = influxDB.ping();
if (pong != null) {
isConnected = true;
}
} catch (Exception e) {
e.printStackTrace();
}
return isConnected;
}

/**
* 连接时序数据库 ,若不存在则创建
*
* @return
*/
public InfluxDB influxDbBuild() {
if (influxDB == null) {
if (username == null || password == null) {
influxDB = InfluxDBFactory.connect(openurl);
} else {
influxDB = InfluxDBFactory.connect(openurl, username, password);
}

}
influxDB.setRetentionPolicy(retentionPolicy);
influxDB.setLogLevel(InfluxDB.LogLevel.NONE);
return influxDB;
}

/**
* 创建自定义保留策略
*
* @param policyName 策略名
* @param duration 保存天数
* @param replication 保存副本数量
* @param isDefault 是否设为默认保留策略
*/
public void createRetentionPolicy(String policyName, String duration, int replication, Boolean isDefault) {
String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s ", policyName,
database, duration, replication);
if (isDefault) {
sql = sql + " DEFAULT";
}
this.query(sql);
}

/**
* 创建默认的保留策略
* default,保存天数:30天,保存副本数量:1
* 设为默认保留策略
*/
public void createDefaultRetentionPolicy() {
String command = String.format(
"CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
"default",
database,
"30d",
1
);
this.query(command);
}

/**
* 查询
*
* @param command 查询语句
* @return
*/
public QueryResult query(String command) {
return influxDB.query(new Query(command, database));
}

/**
* 插入
*
* @param measurement 表
* @param tags 标签
* @param fields 字段
*/
public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time,
TimeUnit timeUnit) {
Builder builder = Point.measurement(measurement);
builder.tag(tags);
builder.fields(fields);
if (0 != time) {
builder.time(time, timeUnit);
}
influxDB.write(database, retentionPolicy, builder.build());
}

/**
* 批量写入测点
*
* @param batchPoints
*/
public void batchInsert(BatchPoints batchPoints) {
influxDB.write(batchPoints);
// influxDB.enableGzip();
// influxDB.enableBatch(2000,100,TimeUnit.MILLISECONDS);
// influxDB.disableGzip();
// influxDB.disableBatch();
}

/**
* 批量写入数据
*
* @param database 数据库
* @param retentionPolicy 保存策略
* @param consistency 一致性
* @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)
*/
public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
final List<String> records) {
influxDB.write(database, retentionPolicy, consistency, records);
}

/**
* 删除
*
* @param command 删除语句
* @return 返回错误信息
*/
public String deleteMeasurementData(String command) {
QueryResult result = influxDB.query(new Query(command, database));
return result.getError();
}

/**
* 关闭数据库
*/
public void close() {
influxDB.close();
}

/**
* 构建Point
*
* @param measurement
* @param time
* @param fields
* @return
*/
public Point pointBuilder(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {
Point point = Point.measurement(measurement).time(time, TimeUnit.MILLISECONDS).tag(tags).fields(fields).build();
return point;
}
}

查询数据

InfluxDB支持一次查询多个SQL,SQL之间用逗号隔开即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.influxdb.dto.QueryResult;

import java.util.List;
import java.util.stream.Collectors;

public class Test {
public static void main(String[] args) {
InfluxDBConnection influxDBConnection = new InfluxDBConnection("http://192.168.7.101:8086", "prometheus", null);
QueryResult results = influxDBConnection
.query("SELECT * FROM node_load1 order by time desc limit 1000");
//results.getResults()是同时查询多条SQL语句的返回值,此处我们只有一条SQL,所以只取第一个结果集即可。
QueryResult.Result oneResult = results.getResults().get(0);
if (oneResult.getSeries() != null) {
List<List<Object>> valueList = oneResult
.getSeries()
.stream()
.map(QueryResult.Series::getValues)
.collect(Collectors.toList())
.get(0);
if (valueList != null && valueList.size() > 0) {
System.out.println(valueList.size());
for (List<Object> value : valueList) {
// 数据库中字段1取值
String field1 = value.get(0) == null ? null : value.get(0).toString();
// 数据库中字段2取值
String field2 = value.get(1) == null ? null : value.get(1).toString();
System.out.println("field1:"+field1);
}
}
}
}
}

插入数据

InfluxDB的字段类型,由第一条插入的值得类型决定;tags的类型只能是String型,可以作为索引,提高检索速度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {
InfluxDBConnection influxDBConnection = new InfluxDBConnection(
"influx",
"influxdb",
"http://192.168.7.101:8086",
"prometheus",
null
);
Map<String, String> tags = new HashMap<String, String>();
tags.put("tag1", "标签值");
Map<String, Object> fields = new HashMap<String, Object>();
fields.put("field1", "哈哈");
// 数值型,InfluxDB的字段类型,由第一天插入的值得类型决定
fields.put("field2", 3.141592657);
// 时间使用毫秒为单位
influxDBConnection.insert("表名", tags, fields, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

部署Grafana

下载安装

下载

1
2
3
4
cd /root
wget https://dl.grafana.com/oss/release/grafana-7.2.1.linux-amd64.tar.gz
tar -zxvf grafana-7.2.1.linux-amd64.tar.gz -C /data/tools/bigdata/
ln -s /data/tools/bigdata/grafana-7.2.1 /opt/apps/grafana

修改配置

1
2
cp /opt/apps/grafana/conf/sample.ini /opt/apps/grafana/conf/grafana.ini
vi /opt/apps/grafana/conf/grafana.ini

修改内容如下

1
2
3
4
5
6
7
[paths]
data = /data/grafana
logs = /opt/logs/grafana
plugins = /opt/apps/grafana/plugins
[log]
mode = file
level = warn

添加用户

1
2
3
4
groupadd -g 9100 monitor
useradd -g 9100 -u 9100 -s /sbin/nologin -M monitor
mkdir -p /data/grafana /opt/logs/grafana
chown -R monitor:monitor /data/tools/bigdata/grafana-7.2.1 /data/grafana /opt/logs/grafana

修改服务

1
vi /usr/lib/systemd/system/grafana.service

内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[Unit]
Description=grafana service
After=network.target

[Service]
User=monitor
Group=monitor
KillMode=control-group
Restart=on-failure
RestartSec=60
ExecStart=/opt/apps/grafana/bin/grafana-server -config /opt/apps/grafana/conf/grafana.ini -pidfile /opt/apps/grafana/grafana.pid -homepath /opt/apps/grafana

[Install]
WantedBy=multi-user.target

配置生效与重启

1
2
3
4
systemctl daemon-reload
systemctl restart grafana.service
systemctl enable grafana.service
netstat -lntp | grep grafana

结果如下

1
tcp6       0      0 :::3000                 :::*                    LISTEN      1547/grafana-server

访问

http://192.168.7.101:3000/

默认账号密码都是 admin

Docker模式安装

1
docker run -d -p 3000:3000 --name=grafana -v /data/grafana-storage:/var/lib/grafana grafana/grafana

模板

https://grafana.com/grafana/dashboards/

使用Docker安装

https://github.com/Kalasearch/grafana-tutorial

与Flink集成

Flink Metric数据流转的流程是

Flink Metric => Pushgateway => Prometheus

复制Jar

1
cp $FLINK_HOME/plugins/metrics-prometheus/flink-metrics-prometheus-1.12.7.jar $FLINK_HOME/lib

进入到 Flink 的 conf 目录,修改 flink-conf.yaml

1
vi $FLINK_HOME/conf/flink-conf.yaml

添加如下配置

1
2
3
4
5
6
7
8
9
10
##### 与 Prometheus 集成配置 #####
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
# PushGateway 的主机名与端口号
metrics.reporter.promgateway.host: hadoop01
metrics.reporter.promgateway.port: 9091
# Flink metric 在前端展示的标签(前缀)与随机后缀
metrics.reporter.promgateway.jobName: flink-metrics-ppg
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.interval: 30 SECONDS

配置分发

1
2
3
4
ssh hadoop02 "rm -rf $FLINK_HOME/lib"
ssh hadoop03 "rm -rf $FLINK_HOME/lib"

ha-fenfa.sh $FLINK_HOME

配置Pushgateway

只在Prometheus所在机器上配置

Docker安装

1
2
docker pull prom/pushgateway
docker run -d --name=pushgateway -p 9091:9091 prom/pushgateway

普通安装

下载

1
wget https://github.com/prometheus/pushgateway/releases/download/v1.4.1/pushgateway-1.4.1.linux-amd64.tar.gz

解压并安装

1
2
3
tar -xvf pushgateway-1.4.1.linux-amd64.tar.gz 
cd pushgateway-1.4.1.linux-amd64
sudo cp pushgateway /usr/local/bin/

查看版本号验证是否正常

1
pushgateway --version

启动服务,默认端口为9091,可通过--web.listen-address更改监听端口

1
2
pushgateway --web.listen-address=0.0.0.0:9091 &
pushgateway &

访问

http://hadoop01:9091/metrics

停止

1
2
jobs
kill %num

配置Prometheus

1
vi /opt/apps/prometheus/prometheus.yml

添加配置

1
2
3
4
5
6
scrape_configs:
- job_name: 'pushgateway'
static_configs:
- targets: ['localhost:9091']
labels:
instance: 'pushgateway'

添加后整体配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
global:
# 采集周期
scrape_interval: 60s
external_labels:
monitor: 'codelab-monitor'

scrape_configs:
# prometheus 自身监控
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']

# node_exporter 监控数据采集
- job_name: 'node'
static_configs:
- targets:
- "192.168.7.101:9100"
relabel_configs:
- source_labels: [__address__]
regex: (.+):[0-9]+
target_label: host

- job_name: 'pushgateway'
static_configs:
- targets: ['localhost:9091']
labels:
instance: 'pushgateway'
remote_write:
- url: "http://localhost:8086/api/v1/prom/write?db=prometheus"
remote_read:
- url: "http://localhost:8086/api/v1/prom/read?db=prometheus"

重启Prometheus

启动

1
2
3
systemctl restart prometheus.service

lsof -i:9090

启动 promethues 后,可以通过 9090 端口访问web页面

http://192.168.7.101:9090/

具体的数据:

http://192.168.7.101:9090/metrics

测试

监听端口

1
2
3
yum install nc -y

nc -lk 9999

运行Flink任务

1
flink run -t yarn-per-job $FLINK_HOME/examples/streaming/SocketWindowWordCount.jar --hostname hadoop01 --port 9999

在Prometheus中就能看到

1
flink_jobmanager_taskSlotsTotal