大数据环境搭建-Flume

前言

全局命令

在环境变量中增加如下命令,可以使用 bd 快速切换到 /data/tools/bigdata

1
2
cd /etc/profile.d/
vi bd.sh

内容如下

1
alias bd='cd /data/tools/bigdata'

配置生效

1
source /etc/profile

Flume的安装

下载地址

https://dlcdn.apache.org/flume/1.9.0/

上传至虚拟机,并解压

1
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /data/tools/bigdata

环境变量

创建配置文件

1
vi /etc/profile.d/flume.sh

内容设置为

1
2
3
# FLUME
export FLUME_HOME=/data/tools/bigdata/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin

配置生效

1
source /etc/profile

查看是否生效

1
echo $FLUME_HOME

查看flume版本

1
flume-ng version

测试flume

监控一个目录,将数据打印出来

配置文件

1
vi $FLUME_HOME/conf/spoolingtest.conf

内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# a1表示agent的名字 可以自定义
# 给sources(在一个agent里可以定义多个source)取个名字
a1.sources = r1
# 给channel个名字
a1.channels = c1
# 给channel个名字
a1.sinks = k1

# 对source进行配置
# agent的名字.sources.source的名字.参数 = 参数值

# source的类型 spoolDir(监控一个目录下的文件的变化)
a1.sources.r1.type = spooldir
# 监听哪一个目录
a1.sources.r1.spoolDir = /root/data
# 是否在event的headers中保存文件的绝对路径
a1.sources.r1.fileHeader = true
# 给拦截器取个名字 i1
a1.sources.r1.interceptors = i1
# 使用timestamp拦截器,将处理数据的时间保存到event的headers中
a1.sources.r1.interceptors.i1.type = timestamp

# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置sink为logger
# 直接打印到控制台
a1.sinks.k1.type = logger

# 将source、channel、sink组装成agent
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

新建/root/data目录

1
mkdir /root/data

启动agent

1
flume-ng agent -n a1 -f $FLUME_HOME/conf/spoolingtest.conf -Dflume.root.logger=DEBUG,console

在/root/data/目录下新建文件,输入内容,观察flume进程打印的日志

随意在test.txt中加入一些内容

1
vi /root/data/test.txt

我们会发现它做了如下操作

  1. 会把文件中的每一行都打印出来
  2. 打印完成后文件重命名为test.txt.COMPLETED

flume的使用

系统文件到HDFS

创建配置文件

1
vi $FLUME_HOME/conf/spoolingToHDFS.conf

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = spooldir
a.sources.r1.spoolDir = /root/data
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp
#指定sink的类型
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir1
# 指定文件名前缀
a.sinks.k1.hdfs.filePrefix = student
# 指定达到多少数据量写一次文件 单位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少条写一次文件
a.sinks.k1.hdfs.rollCount = 1000
# 指定文件类型为 流 来什么输出什么
a.sinks.k1.hdfs.fileType = DataStream
# 指定文件输出格式 为text
a.sinks.k1.hdfs.writeFormat = text
# 指定文件名后缀
a.sinks.k1.hdfs.fileSuffix = .txt

#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

在 /root/data/目录下准备数据

1
vi /root/data/study.txt

内容如下

1
2
good good study
day day up

启动agent

如果HDFS没有启动要先启动HDFS

1
flume-ng agent -n a -f $FLUME_HOME/conf/spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console

启动HDFS

1
$HADOOP_HOME/sbin/start-dfs.sh

停止HDFS

1
$HADOOP_HOME/sbin/stop-dfs.sh

可以从网址中查看文件保存情况(192.168.160.130是我的服务器IP)

http://192.168.160.130:50070/explorer.html#/

HBase日志到HDFS

1
vi $FLUME_HOME/conf/hbaseLogToHDFS.conf

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = exec
a.sources.r1.command = tail -f /data/tools/bigdata/hbase-2.4.11/logs/hbase-root-master-master.log
#指定sink的类型
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir2
# 指定文件名前缀
a.sinks.k1.hdfs.filePrefix = hbaselog
# 指定达到多少数据量写一次文件 单位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少条写一次文件
a.sinks.k1.hdfs.rollCount = 1000
# 指定文件类型为 流 来什么输出什么
a.sinks.k1.hdfs.fileType = DataStream
# 指定文件输出格式 为text
a.sinks.k1.hdfs.writeFormat = text
# 指定文件名后缀
a.sinks.k1.hdfs.fileSuffix = .txt

#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

启动

1
flume-ng agent -n a -f $FLUME_HOME/conf/hbaseLogToHDFS.conf -Dflume.root.logger=DEBUG,console

HBase日志到HBase

创建配置文件

1
vi $FLUME_HOME/conf/hbaselogToHBase.conf

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = exec
a.sources.r1.command = cat /data/tools/bigdata/hbase-2.4.11/logs/hbase-root-master-master.log
#指定sink的类型
a.sinks.k1.type = hbase
a.sinks.k1.table = t_log
a.sinks.k1.columnFamily = cf1

#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 100000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

在hbase中创建log表

1
create 't_log','cf1'

启动

1
flume-ng agent -n a -f $FLUME_HOME/conf/hbaselogToHBase.conf -Dflume.root.logger=DEBUG,console

网络日志获取

创建配置文件

1
vi $FLUME_HOME/conf/netcatLogger.conf

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = netcat
a.sources.r1.bind = 0.0.0.0
a.sources.r1.port = 8888

#指定sink的类型
a.sinks.k1.type = logger
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

启动

先启动agent

1
flume-ng agent -n a -f $FLUME_HOME/conf/netcatLogger.conf -Dflume.root.logger=DEBUG,console

新打开一个控制台

安装telnet

1
yum install telnet

再启动telnet

1
telnet localhost 8888

网络请求日志

创建配置文件

1
vi $FLUME_HOME/conf/httpToLogger.conf

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = http
a.sources.r1.port = 6666

#指定sink的类型
a.sinks.k1.type = logger
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

先启动agent

1
flume-ng agent -n a -f $FLUME_HOME/conf/httpToLogger.conf -Dflume.root.logger=DEBUG,console

再使用curl发起一个http请求

新打开一个控制台

1
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://localhost:6666