Windows环境
https://www.psvmc.cn/article/2022-04-21-bigdata-spark-idea.html
两种操作
转换操作
- filter
- map
- flatMap
- groupByKey
- reduceByKey
行动操作
- count()
- collect()
- first()
- take(n)
- reduce(func)
- foreach(func)
- saveAsTextFile()
转换操作并不会开始计算,只是记录下要做啥计算,之后在调用行动操作的时候才会开始计算,并且每次都是从开始开始计算。
Spark编程概要
传递给spark的master url可以有如下几种:
local
本地单线程local[K]
本地多线程(指定K个内核)local[*]
本地多线程(指定所有可用内核)spark://HOST:PORT
连接到指定的 Spark standalone cluster master,需要指定端口。mesos://HOST:PORT
连接到指定的 Mesos 集群,需要指定端口。yarn-client客户端模式
连接到 YARN 集群。需要配置 HADOOP_CONF_DIR。yarn-cluster集群模式
连接到 YARN 集群。需要配置 HADOOP_CONF_DIR。
获取sc
1 | object Test { |
RDD创建
RDD创建的两种方式
方式1 加载文件
1 | val inputFile = "file:///D:\\spark_study\\wordcount.txt" |
或者
1 | val inputFile = "hdfs://localhost:9000/user/hadoop/wordcount.txt" |
方式2
1 | val list = Array(1, 2, 3, 4, 5, 6, 7, 8) |
简单示例
1 | import org.apache.spark.{SparkConf, SparkContext} |
持久化
1 | val list = Array(1, 2, 3, 4, 5, 6, 7, 8) |
注意
rdd持久化可以用两个方法
- rdd.cache()
- rdd.persist(StorageLevel.MEMORY_ONLY)
这两个方法作用是一样的,只不过后者可以设置持久化的位置,cache()
则是直接持久化到内存中。
分区
1 | val arr = Array(1, 2, 3, 4, 5) |
Pair
创建
1 | val list = List("Hadoop", "Spark", "Hive", "Spark") |
或者
1 | val rdd = sc.textFile("file:///D:\\spark_study\\wordcount.txt") |
wordcount.txt
1 | good good study |
reduceByKey
1 | val list = List("good good study", "day day up") |
结果
(up,1)
(day,2)
(good,2)
(study,1)
groupByKey()
1 | val list = List("good good study", "day day up") |
结果
(up,CompactBuffer(1))
(day,CompactBuffer(1, 1))
(good,CompactBuffer(1, 1))
(study,CompactBuffer(1))
keys/values
1 | val list = List("good good study", "day day up") |
结果
up
day
good
studyCompactBuffer(1)
CompactBuffer(1, 1)
CompactBuffer(1, 1)
CompactBuffer(1)
sortByKey()
1 | val list = List("good good study", "day day up") |
结果
(day,1)
(day,1)
(good,1)
(good,1)
(study,1)
(up,1)
mapValues(func)
1 | val list = List("good good study", "day day up") |
结果
(good,101)
(good,101)
(study,101)
(day,101)
(day,101)
(up,101)
join
1 | val pairRDD1 = sc.parallelize(Array(("spark", 1), ("spark", 2), ("hadoop", 3), ("hadoop", 5))) |
结果
join:
(spark,(1,100))
(spark,(2,100))fullOuterJoin:
(spark,(Some(1),Some(100)))
(spark,(Some(2),Some(100)))
(hadoop,(Some(3),None))
(hadoop,(Some(5),None))leftOuterJoin:
(spark,(1,Some(100)))
(spark,(2,Some(100)))
(hadoop,(3,None))
(hadoop,(5,None))rightOuterJoin:
(spark,(Some(1),100))
(spark,(Some(2),100))
共享变量
广播变量
广播变量(broadcast variables)允许程序开发人员在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。通过这种方式,就可以非常高效地给每个节点(机器)提供一个大的输入数据集的副本。Spark的“动作”操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播。通过广播方式进行传播的变量,会经过序列化,然后在被任务使用时再进行反序列化。这就意味着,显式地创建广播变量只有在下面的情形中是有用的:当跨越多个阶段的那些任务需要相同的数据,或者当以反序列化方式对数据进行缓存是非常重要的。
可以通过调用SparkContext.broadcast(v)来从一个普通变量v中创建一个广播变量。这个广播变量就是对普通变量v的一个包装器,通过调用value方法就可以获得这个广播变量的值,具体代码如下:
1 | val broadcastVar = sc.broadcast(Array(1, 2, 3)) |
这个广播变量被创建以后,那么在集群中的任何函数中,都应该使用广播变量broadcastVar的值,而不是使用v的值,这样就不会把v重复分发到这些节点上。此外,一旦广播变量创建后,普通变量v的值就不能再发生修改,从而确保所有节点都获得这个广播变量的相同的值。
累加器
累加器是仅仅被相关操作累加的变量,通常可以被用来实现计数器(counter)和求和(sum)。
Spark原生地支持数值型(numeric)的累加器,程序开发人员可以编写对新类型的支持。如果创建累加器时指定了名字,则可以在Spark UI界面看到,这有利于理解每个执行阶段的进程。
一个数值型的累加器,可以通过调用SparkContext.longAccumulator()
或者SparkContext.doubleAccumulator()
来创建。
运行在集群中的任务,就可以使用add方法来把数值累加到累加器上,但是,这些任务只能做累加操作,不能读取累加器的值,只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值。
下面是一个代码实例,演示了使用累加器来对一个数组中的元素进行求和:
1 | val accum = sc.longAccumulator("My Accumulator") |
结果
10
JSON处理
转换的库的网址
https://github.com/json4s/json4s/
1 | import org.json4s._ |
JSON文件json.txt
1 | {"name": "xiaoming","age": 10, "luckNumbers":[1,2,3,4,5]} |
代码
1 | import org.apache.spark.{SparkConf, SparkContext} |
结果
xiaoming
xiaohong
xiaogang
读取HBase
创建项目
修改build.sbt
1 | name := "SparkDemo01" |
示例
1 | package org.example |
如果运行报错
Error running HBaseSpark. Command line is too long.
找到.idea
下的workspace.xml
搜索 PropertiesComponent
内部添加
1 | <property name="dynamic.classpath" value="true" /> |
添加后结构如下
1 | <component name="PropertiesComponent"> |
Kafka作为数据源
数据产生者
1 | import java.util.HashMap |
消费及运算
1 | package org.example |
build.sbt
1 | name := "SparkDemo01" |
运行时我们就能看到
生产者每1秒都会产生一组数据
1 | 9 3 1 5 9 9 8 9 2 7 |
每10秒进行一次统计
1 | (4,22) |