2. PySpark——RDD编程入门 [TOC]
2.1 程序执行入口SparkContext对象 Spark RDD 编程的程序入口对象是SparkContext
对象(不论何种编程语言)
只有构建出SparkContext
, 基于它才能执行后续的API调用和计算
本质上, SparkContext
对编程来说, 主要功能就是创建第一个RDD出来
代码演示:
1 2 3 4 5 6 7 8 9 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName ("helloSpark" ).setMaster("local[*]" ) sc = SparkContext(conf=conf)
master的种类:
local:local[N]:表示以N核CPU执行,local[*]:给予local进程 所有CPU核心的使用权
standlone:spark://node1:7077
yarn 模式
2.2 RDD的创建 RDD的创建主要有2种方式:
• 通过并行化集合创建 ( 本地对象 转 分布式RDD )
• 读取外部数据源 ( 读取文件 )
2.2.1 并行化创建 概念:并行化创建,是指将本地集合转向分布式RDD,这一步就是分布式的开端:本地转分布式
API :
rdd = spakcontext.parallelize(参数1,参数2)
参数1 集合对象即可,比如list
参数2 分区数
完整代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 from pyspark import SparkConf,SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("create rdd" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) data = [1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ] rdd = sc.parallelize(data,numSlices=3 ) print (rdd.collect())
执行结果:
1 2 3 4 5 Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). [1, 2, 3, 4, 5, 6, 7, 8, 9] Process finished with exit code 0
2.2.2 获取RDD分区数 getNumPartitions
API :获取RDD分区数量,返回值是Int
数字
用法:rdd.getNumPartitions()
例如,基于上述代码设置了3为分区数,调用以下代码
1 print (rdd.getNumPartitions())
则会输出结果:3
完整案例代码:01_create_parallelize.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName ("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 ]) print ("默认分区数:" , rdd.getNumPartitions()) rdd = sc.parallelize([1 , 2 , 3 ], 3 ) print ("分区数:" , rdd.getNumPartitions()) print ("rdd的内容是:" , rdd.collect()) print (type (rdd.collect()))
输出结果:
1 2 3 4 默认分区数: 8 分区数: 3 rdd的内容是: [1, 2, 3] <class 'list'>
2.2.3 读取文件创建 textFile
API
这个API可以读取本地数据,也可以读取hdfs数据
使用方法 :
sparkcontext.textFile(参数1,参数2)
参数1,必填,文件路径 支持本地文件 支持HDFS 也支持一些比如S3协议
参数2 可选,表示最小分区数量
注意:参数2 话语权不足,spark有自己的判断,在它允许的范围内,参数2有效果,超出spark允许的范围,参数2失效
案例代码:02_create_textFile.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("02_create_textFile" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) file_rdd1 = sc.textFile("../data/input/words.txt" ) print ("默认读取分区数:" , file_rdd1.getNumPartitions()) print ("file_rdd1 内容:" , file_rdd1.collect()) file_rdd2 = sc.textFile("../data/input/words.txt" ,3 ) file_rdd3 = sc.textFile("../data/input/words.txt" ,100 ) print ("file_rdd2 分区数:" , file_rdd2.getNumPartitions()) print ("file_rdd3 分区数:" , file_rdd3.getNumPartitions()) hdfs_rdd = sc.textFile("hdfs://Tnode1:8020/input/words.txt" ) print ("hdfs_rdd 分区数:" , hdfs_rdd.getNumPartitions()) print ("hdfs_rdd 内容:" , hdfs_rdd.collect())
输出结果:
1 2 3 4 5 6 默认读取分区数: 2 file_rdd1 内容: ['hello spark', 'hello hadoop', 'hello flink'] file_rdd2 内容: 4 file_rdd3 内容: 38 hdfs_rdd 分区: 2 hdfs_rdd 内容: ['hello spark', 'hello hadoop', 'hello flink']
wholeTextFile
读取文件的API,有个适用场景:适合读取一堆小文件
这个API是小文件读取专用
用法:
1 2 3 4 5 6 7 sparkcontext.textFile(参数1 ,参数2 )
这个API偏向于少量分区读取数据
因为,这个API表明了自己是小文件读取专用,那么文件的数据很小、分区很多,
导致shuffle的几率更高,所以尽量少分区读取数据
案例代码:03_create_wholeTextFile.py
1 2 3 4 5 6 7 8 9 10 11 12 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.wholeTextFiles("../data/input/tiny_files" ) print (rdd.collect()) print (rdd.map (lambda x: x[1 ]).collect())
输出结果:
1 2 3 [('file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/1.txt', 'hello spark\r\nhello hadoop\r\nhello flink'), ('file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/2.txt', 'hello spark\r\nhello hadoop\r\nhello flink'), ('file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/3.txt', 'hello spark\r\nhello hadoop\r\nhello flink'), ('file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/4.txt', 'hello spark\r\nhello hadoop\r\nhello flink'), ('file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/5.txt', 'hello spark\r\nhello hadoop\r\nhello flink')] ['hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink']
2.3 RDD算子 算子是什么?
算子 :分布式集合对象上的API称之为算子
方法、函数:本地对象的API,叫做方法、函数
算子:分布式对象的API,叫做算子
算子分类
RDD的算子 分成2类
Transformation:转换算子
Action:动作(行动)算子
Transformation 算子:
定义:RDD的算子,返回值任然是一个RDD的,称之为转换算子
特性:这类算子lazy 懒加载的,如果没有action算子,Transformation算子是不工作的
Action算子
定义:返回值不是rdd的就是action算子
对于这两类算子来说,Transformation
算子,相当于在构建执行计划,action
是一个指令让这个执行计划开始工作。
如果没有action
,Transformation
算子之间的迭代关系,就是一个没有通电的流水线,
只有action
到来,这个数据处理的流水线才开始工作
2.4.1 map算子 演示代码:04_operators_map.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from pyspark import SparkConf, SparkContextdef addNum (data ): return data * 10 if __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd1 = sc.parallelize([1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 ], 4 ) rdd2 = rdd1.map (lambda x: x * 10 ) rdd3 = rdd1.map (addNum) result = rdd2.collect() print (result) print (rdd3.collect())
输出结果:
1 2 [10, 20, 30, 40, 50, 60, 70, 80, 90] [10, 20, 30, 40, 50, 60, 70, 80, 90]
对于传入参数的lambda表达式
传入方法作为传参的时候,可以选择
定义方法,传入其方法名
使用lambda 匿名方法的方式
一般,如果方法体可以一行写完,用lambda方便。
如果方法体复杂,就直接定义方法更方便
2.4.2 flatMap算子 功能:对rdd执行map操作,然后进行解除嵌套
操作
解除嵌套
:
演示代码:05_operators_flatMap.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from pyspark import SparkConf, SparkContextdef addNum (data ): return data * 10 if __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd1 = sc.parallelize(["hadoop spark hadoop" , "spark hadoop hadoop" , "hadoop flink spark" ]) rdd2 = rdd1.flatMap(lambda line: line.split(" " )) print (rdd2.collect())
输出结果:
1 ['hadoop', 'spark', 'hadoop', 'spark', 'hadoop', 'hadoop', 'hadoop', 'flink', 'spark']
注意:flatMap只适合用于有“嵌套”的rdd,直接用于没有嵌套的rdd会报错
2.4.3 reduceByKey算子 功能:针对KV型的RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。
用法:
reduceByKey
的聚合逻辑是:
比如,有[1,2,3,4,5]
,然后聚合函数是:lambda a,b: a+ b
注意:reduceByKey中接收的函数,只负责聚合,不理会分组
分组是自动 byKey
来分组的。
代码演示:06_operators_reduceByKey.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from pyspark import SparkConf, SparkContextdef addNum (data ): return data * 10 if __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([('a' , 1 ), ('a' , 1 ), ('b' , 1 ), ('b' , 1 ), ('a' , 1 )]) rdd2 = sc.parallelize([('a' , 1 ), ('a' , 11 ), ('b' , 3 ), ('b' , 1 ), ('a' , 5 )]) rdd3 = sc.parallelize([('a' , 1 ), ('a' , 11 ), ('b' , 3 ), ('b' , 1 ), ('a' , 5 )]) rdd = rdd.reduceByKey(lambda a, b: a + b) rdd2 = rdd2.map (lambda x: (x[0 ], x[1 ] * 10 )) rdd3 = rdd3.mapValues(lambda value: value * 10 ) print (rdd.collect()) print (rdd2.collect()) print (rdd3.collect())
输出结果:
1 2 3 [('a', 3), ('b', 2)] [('a', 10), ('a', 110), ('b', 30), ('b', 10), ('a', 50)] [('a', 10), ('a', 110), ('b', 30), ('b', 10), ('a', 50)]
2.4.4 WordCount回顾 代码演示:07_wordcount_example.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from pyspark import SparkContext, SparkConfif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) file_rdd = sc.textFile(r"../data/input/words.txt" ) word_rdd = file_rdd.flatMap(lambda x: x.split(" " )) word_with_one_rdd = word_rdd.map (lambda word:(word,1 )) result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b) print (result_rdd.collect())
输出结果:
1 [('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
2.4.5 groupBy算子 功能:将rdd的数据进行分组
语法:
1 2 3 4 5 6 rdd.groupBy(func) # func 函数 # func:(T)——>k # 函数要求传入一个参数,返回一个返回值,类型无所谓 # 这个函数是 拿到你返回值后,将所有相同返回值的放入一个组中 # 分组完成后,每一个组是一个二元元组,key就是返回值,所有同组的数据放入一个迭代器对象中作为value
代码演示:08_oprators_groupBy.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([('a' , 1 ), ('a' , 1 ), ('b' , 1 ), ('b' , 1 ), ('b' , 1 )]) result = rdd.groupBy(lambda t: t[0 ]) print (result.collect()) print ("hello" ) print (result.map (lambda t: (t[0 ], list (t[1 ]))).collect())
输出结果:
1 2 3 [('a', <pyspark.resultiterable.ResultIterable object at 0x7f85fa80eca0>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7f85fa80ebb0>)] hello [('a', [('a', 1), ('a', 1)]), ('b', [('b', 1), ('b', 1), ('b', 1)])]
2.4.6 Filter算子 功能:过滤,把想要的数据进行保留
语法:
返回值是True的数据被保留,False的数据被丢弃
代码演示:09_operators_filter.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 , 2 , 3 , 4 , 5 , 6 ]) result = rdd.filter (lambda x: x % 2 == 1 ) print (result.collect())
输出结果:
2.4.7 distinct算子 功能:对RDD数据进行去重,返回新的RDD
语法:
演示代码:10_operators_distinct.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 , 1 , 1 , 2 , 2 , 2 , 3 , 3 , 3 ]) print (rdd.distinct().collect()) rdd2 = sc.parallelize([('a' , 1 ), ('a' , 1 ), ('a' , 3 )]) print (rdd2.distinct().collect())
输出结果:
1 2 [1, 2, 3] [('a', 3), ('a', 1)]
2.4.8 union算子 功能:2个rdd合并成1个rdd返回
用法:rdd.union(other_rdd)
注意:只合并,不会去重
代码演示:11_operators_union.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd1 = sc.parallelize([1 , 1 , 3 , 3 ]) rdd2 = sc.parallelize(["a" ,"b" ,"a" ]) rdd3 = rdd1.union(rdd2) print (rdd3.collect()) print (rdd3.distinct().collect()) """ 1. 可以看到union算子是不会去重的 2. RDD的类型不同也是可以合并的 """
输出结果:
1 2 [1, 1, 3, 3, 'a', 'b', 'a'] [1, 3, 'b', 'a']
2.4.9 join算子 功能:对两个RDD执行JOIN操作(可实现SQL的内、外连接)
注意:join算子只能用于二元元组
语法:
1 2 3 rdd.join(other_rdd) rdd.leftOuterJoin(other_rdd) rdd.rightOuterJoin(other_rdd)
代码演示:12_operators_join.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd1 = sc.parallelize([(1001 , "张三" ), (1002 , '李四' ), (1003 , '王五' ), (1004 , '赵六' )]) rdd2 = sc.parallelize([(1001 , "销售部" ), (1002 , '科技部' )]) print (rdd1.join(rdd2).collect()) print (rdd1.leftOuterJoin(rdd2).collect()) print (rdd1.rightOuterJoin(rdd2).collect())
输出结果:
1 2 3 [(1001, ('张三', '销售部')), (1002, ('李四', '科技部'))] [(1001, ('张三', '销售部')), (1002, ('李四', '科技部')), (1003, ('王五', None)), (1004, ('赵六', None))] [(1001, ('张三', '销售部')), (1002, ('李四', '科技部'))]
2.4.10 intersection 算子 功能:求2个rdd的交集,返回一个新rdd
用法:rdd.intersection(other_rdd)
代码演示:13_operators_intersection.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd1 = sc.parallelize([('a' ,1 ),('a' ,3 )]) rdd2 = sc.parallelize([('a' ,1 ),('b' ,3 )]) rdd3 = rdd1.intersection(rdd2) print (rdd3.collect())
输出结果:
2.4.11 glom算子 功能:将RDD的数据,加上嵌套,这个嵌套按照分区来进行
比如RDD数据[1,2,3,4,5]
有两个分区
那么,被glom后,数据变成:[[1,2,3],[4,5]]
使用方法:rdd.glom()
代码演示:14_operators_glom.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10 ],2 ) rdd2 = sc.parallelize([1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10 ]) print (rdd.glom().collect()) print (rdd.glom().flatMap(lambda x:x).collect()) print (rdd2.glom().collect())
输出结果:
1 2 3 [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]] [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] [[1], [2], [3], [4, 5], [6], [7], [8], [9, 10]]
2.4.12 groupByKey算子 功能:针对KV型RDD,自动按照key分组
用法:rdd.groupByKey()
自动按照key分组
代码演示:15_operators_groupByKey.py
1 2 3 4 5 6 7 8 9 10 11 12 13 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([('a' , 1 ), ('a' , 1 ), ('a' , 1 ,), ('b' , 1 ), ('b' , 1 )]) rdd2 = rdd.groupByKey() print (rdd2.map (lambda x:(x[0 ],list (x[1 ]))).collect())
输出结果:
1 [('a', [1, 1, 1]), ('b', [1, 1])]
2.4.13 sortBy算子 功能:对RDD数据进行排序,基于你指定的排序依据
语法:
1 2 3 4 rdd.sortBy(func,ascending=False ,numPartitions=1 )
注意:如果要全局有序,排序分区数请设置为1,因为生产环境下,分区数大于1,很可能只得到局部有序的结果
代码演示:16_operators_sortBy.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([('g' , 3 ), ('c' , 1 ), ('b' , 2 ,), ('a' , 9 ), ('h' , 10 ), ('i' , 4 ), ('l' , 26 ,), ('o' , 1 ), ('d' , 7 )]) """注意:如果要全局有序,排序分区数请设置为1,因为生产环境下,分区数大于1,很可能只得到局部有序的结果""" rdd2 = rdd.sortBy(lambda x:x[1 ],ascending=True ,numPartitions=3 ) rdd3 = rdd.sortBy(lambda x:x[0 ],ascending=True ,numPartitions=8 ) print (rdd2.collect()) print (rdd3.collect())
输出结果:
1 2 [('c', 1), ('o', 1), ('b', 2), ('g', 3), ('i', 4), ('d', 7), ('a', 9), ('h', 10), ('l', 26)] [('a', 9), ('b', 2), ('c', 1), ('d', 7), ('g', 3), ('h', 10), ('i', 4), ('l', 26), ('o', 1)]
2.4.14 sortByKey 功能:针对KV型RDD,按照key进行排序
语法:
sortByKey(ascending=True,numPartitions=None,keyfunc=<function RDD,<lambda>>)
ascending:升序或降序,True升序,False降序,默认是升序
numPartitions:按照几个分区进行排序,如果全局有序,设置为1
keyfunc:在排序前对key进行处理,语法是:(k)——>U,一个参数传入,返回一个值
代码演示:17_operators_sortByKey.py
1 2 3 4 5 6 7 8 9 10 11 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([('g' , 3 ), ('A' , 1 ), ('B' , 2 ,), ('A' , 9 ), ('h' , 10 ), ('i' , 4 ), ('l' , 26 ,), ('o' , 1 ), ('d' , 7 )]) print (rdd.sortByKey(ascending=True , numPartitions=1 , keyfunc=lambda key: str (key).lower()).collect())
输出结果:
1 [('A', 1), ('A', 9), ('B', 2), ('d', 7), ('g', 3), ('h', 10), ('i', 4), ('l', 26), ('o', 1)]
2.4.15 综合案例
代码演示:18_operators_demo.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) file_rdd = sc.textFile("../data/input/order.text" ) jsons_rdd = file_rdd.flatMap(lambda line: line.split("|" )) dict_rdd = jsons_rdd.map (lambda json_str: json.loads(json_str)) beijing_rdd = dict_rdd.filter (lambda d: d['areaName' ] == '北京' ) category_rdd = beijing_rdd.map (lambda x: x['areaName' ] + '_' + x['category' ]) result_rdd = category_rdd.distinct() print (result_rdd.collect())
输出结果:
1 ['北京_平板电脑', '北京_家具', '北京_书籍', '北京_食品', '北京_服饰', '北京_手机', '北京_家电', '北京_电脑']
2.4.16 将案例提交到yarn运行 改动1 :加入环境变量,让pycharm运行yarn的时候,知道hadoop的配置在哪,可以去读取yarn的信息
1 2 3 4 import osfrom defs_19 import city_with_categoryos.environ['HADOOP_CONF_DIR' ]= "/export/server/hadoop/etc/hadoop"
改动2 :在集群上运行,本地文件就不可以用了,需要用hdfs文件
1 2 file_rdd = sc.textFile("hdfs://Tnode1:8020/input/order.text" )
改动3 :
1 2 3 4 5 6 7 """ 如果提交到集群运行,除了主代码以外,还依赖了其它的代码文件 需要设置一个参数,来告知spark,还有依赖文件要同步上传到集群中 参数叫做:spark.submit.pyFiles 参数的值可以是单个.py文件,也可以是.zip压缩包(有多个依赖文件的时候可以用zip压缩后上传) """ conf.set ("spark.submit.pyFiles" ,"defs_19.py" )
完整代码:19_operators_runOnYarn.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import jsonfrom pyspark import SparkConf, SparkContextimport osfrom defs_19 import city_with_categoryos.environ['HADOOP_CONF_DIR' ]= "/export/server/hadoop/etc/hadoop" if __name__ == '__main__' : conf = SparkConf().setAppName("SparkDemo01" ).setMaster("yarn" ) """ 如果提交到集群运行,除了主代码以外,还依赖了其它的代码文件 需要设置一个参数,来告知spark,还有依赖文件要同步上传到集群中 参数叫做:spark.submit.pyFiles 参数的值可以是单个.py文件,也可以是.zip压缩包(有多个依赖文件的时候可以用zip压缩后上传) """ conf.set ("spark.submit.pyFiles" ,"defs_19.py" ) sc = SparkContext(conf=conf) file_rdd = sc.textFile("hdfs://Tnode1:8020/input/order.text" ) jsons_rdd = file_rdd.flatMap(lambda line: line.split("|" )) dict_rdd = jsons_rdd.map (lambda json_str: json.loads(json_str)) beijing_rdd = dict_rdd.filter (lambda d: d['areaName' ] == '北京' ) category_rdd = beijing_rdd.map (city_with_category) result_rdd = category_rdd.distinct() print (result_rdd.collect())
依赖代码:defs_19.py
1 2 3 4 5 def city_with_category (data ): return data['areaName' ] + '_' +data['category' ]
输出结果:
1 ['北京_书籍', '北京_食品', '北京_服饰', '北京_平板电脑', '北京_家具', '北京_手机', '北京_家电', '北京_电脑']
在服务器上通过spark-submit 提交到集群运行
1 2 3 /export/server/spark/bin /spark-submit --master yarn --py-files ./defs.py ./main.py
服务器上程序运行结果:
注意,在服务器上跑时,需要把conf中的setMaster去掉
即conf = SparkConf().setAppName(“SparkDemo01”).setMaster(“yarn”)改为:
conf = SparkConf().setAppName(“SparkDemo01”)
2.5 常用Action算子 2.5.1 countByKey算子 功能:统计key出现的次数(一般适用于KV型的RDD)
代码演示:20_operators_countByKey.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.textFile("../data/input/words.txt" ) rdd2 = rdd.flatMap(lambda x:x.split(" " )).map (lambda x: (x, 1 )) result = rdd2.countByKey() print (result) print (list (result)) print (result["hello" ]) print (type (result))
输出结果:
1 2 3 4 defaultdict(<class 'int'>, {'hello': 3, 'spark': 1, 'hadoop': 1, 'flink': 1}) ['hello', 'spark', 'hadoop', 'flink'] 3 <class 'collections.defaultdict'>
2.5.2 collect算子 功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
用法:rdd.collect()
这个算子,是将RDD各个分区数据都拉取到Driver
注意的是,RDD是分布式对象,其数据量可以很大,
所以用这个算子之前要心知肚明地了解 结果数据集不会太大。
不然,会把Driver内存撑爆
2.5.3 reduce算子 功能:对RDD数据集按照你传入的逻辑进行聚合
语法:
1 2 3 rdd.reduce(func) # func:(T ,T )——>T # 2 参数传入1 个返回值,返回值要和参数要求类型一致
代码演示:21_operators_reduce.py
1 2 3 4 5 6 7 8 9 10 11 12 13 import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 ,2 ,3 ,4 ,5 ]) print (rdd.reduce(lambda a, b: a + b))
输出结果:
2.5.4 fold算子 功能:和reduce一样,接收传入逻辑进行聚合,聚合是带有初始值的,
这个初始值聚合会作用在:
比如:[[1,2,3],[4,5,6],[7,8,9]]
数据量分布在3个分区
分区1: 1、2、3 聚合的时候带上10作为初始值得到16
分区3: 4、5、6 聚合的时候带上10作为初始值得到25
分区4: 7、8、9 聚合的时候带上10作为初始值得到34
3个分区的结果做聚合也带上初始值10,所以结果是10+16+25+34 = 85
代码演示:22_operators_fold.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ],3 ) print (rdd.glom().collect()) print (rdd.fold(10 , lambda a, b: a + b))
输出结果:
1 2 [[1, 2, 3], [4, 5, 6], [7, 8, 9]] 85
2.5.5 first算子 功能:取出RDD的第一个元素
用法
1 2 sc.parallelize([3 ,2 ,1 ]).first() 输出:3
2.5.6 take算子 功能:取RDD的前N个元素。组合成list返回给你
用法:
1 2 >>> sc.parallelize([3,2,1,4,5,6]).take(5) [3, 2, 1, 4, 5]
2.5.7 top算子 功能:对RDD数据集进行降序排序,取前N个
用法:
1 2 >>> sc.parallelize([3,2,1,4,5,6]).top(3) # 表示取降序前3个 [6, 5, 4]
2.5.8 count算子 功能:计算RDD有多少条数据,返回值是一个数字
用法:
1 2 >>> sc.parallelize([3,2,1,4,5,6]).count() 6
2.5.9 takeSample算子 功能:随机抽样RDD的数据
用法:
1 2 3 4 takeSample(参数1:True or False,参数2:采样数,参数3:随机数种子) - 参数1:True表示允许取同一个数据,False表示不允许取同一个数据,和数据内容无关,是否重复表示的是同一个位置的数据(有、无放回抽样) - 参数2:抽样要几个 - 参数3:随机数种子,这个参数传入一个数字即可,随意给
随机数种子 数字可以随便传,如果传同一个数字 那么取出的结果是一致的。
一般参数3 我们不传,Spark会自动给与随机的种子。
代码演示:23_operators_takeSample.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 , 3 , 5 , 3 , 1 , 3 , 2 , 6 , 7 , 8 , 6 ],1 ) result = rdd.takeSample(False ,5 ,1 ) print (result)
输出结果:
注意:
随机抽样可以抽出相同的数据,只是位置不同而已
随机数种子能让随机数不再继续发生变化
2.5.10 takeOrdered 功能:对RDD进行排序取前N个
用法:
1 2 3 4 rdd.takeOrdered(参数1,参数2) - 参数1 要几个数据 - 参数2 对排序的数据进行更改(不会更改数据本身,只是在排序的时候换个样子) 这个方法按照元素自然顺序升序排序,如果你想玩倒叙,需要参数2 来对排序的数据进行处理
代码演示:24_operators_takeOrdered.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 , 3 , 2 , 4 , 7 , 9 , 6 ], 1 ) print (rdd.takeOrdered(3 )) print (rdd.takeOrdered(3 ,lambda x:-x))
输出结果:
2.5.11 foreach算子 功能:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个思想),但是这个方法没有返回值
用法:
1 2 rdd.foreach(func) # func:(T) ——> None
代码演示:25_operators_foreach.py
1 2 3 4 5 6 7 8 9 10 11 12 13 import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 , 3 , 2 , 4 , 7 , 9 , 6 ], 1 ) rdd.foreach(lambda x: print (x * 10 ))
输出结果:
2.5.12 saveAsTextFile 功能:将RDD的数据写入文本文件中
支持本地写出,hdfs等文件系统
代码演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 ,3 ,2 ,4 ,7 ,9 ,6 ],3 ) rdd.saveAsTextFile("hdfs://Tnode1:8020/test/output/out1" )
运行结果:
注意:保存文件API,是分布式执行的
这个API的执行数据是不经过driver的
如图,写出的时候,每个分区所在的Executor直接控制数据写出到目标文件系统中
所有才会一个分区产生一个结果文件
2.5.13 注意点 我们学习的action中:
这两个算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行
反之:其余的Action算子都会将结果发送至Driver
2.6 分区操作算子 2.6.1 mapPartitions算子 transformation算子
图解:
如图,mapPartition一次被传递的是一整个分区的数据
作为一个迭代器(一次性list)对象传入过来。
代码演示:27_operators_mapPartitions.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import jsonfrom pyspark import SparkConf, SparkContextimport timeif __name__ == '__main__' : start_time = time.time() conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 ,3 ,2 ,4 ,7 ,9 ,6 ],3 ) def process (iter ): result = [] for it in iter : result.append(it*10 ) return result print (rdd.mapPartitions(process).collect()) end_time = time.time() gap_time = (end_time - start_time) gap_time = round (gap_time, 4 ) print ("执行本程序共耗时:" + str (gap_time) + "s" )
输出结果:
1 2 [10, 30, 20, 40, 70, 90, 60] 执行本程序共耗时:8.0515s
注意:效果和map一样,但是性能比map好,cpu计算没有省,但是网络IO少很多
2.6.2 foreachPartition算子 Action 算子
功能:和普通foreach一致,一次处理的是一整个分区数据
代码演示:28_operators_foreachPartitions.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 ,3 ,2 ,4 ,7 ,9 ,6 ],3 ) def process (iter ): result = [] for it in iter : result.append(it*10 ) print (result) rdd.foreachPartition(process)
输出结果:
1 2 3 [70, 90, 60] [10, 30] [20, 40]
foreachPartition 就是一个没有返回值的mapPartitions
2.6.3 partitionBy算子 transformation算子
功能:对RDD进行自定义分区操作
用法:
1 2 3 4 5 6 7 8 9 rdd.partitionBy(参数1 ,参数2 ) - 参数1 重新分区后有几个分区 - 参数2 自定义分区规则,函数传入 参数2 :(K)——>int 一个传入参数进来,类型无所谓,但是返回值一定是int 类型, 将key传给这个函数,你自己写逻辑,决定返回一个分区编号 分区编号从0 开始,不要超出分区数-1
代码演示:29_operators_partitionBy.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([('hadoop' , 1 ), ('spark' , 1 ), ("hello" , 1 ), ("flink" , 1 ), ("hadoop" , 1 ), ("spark" , 1 )]) def process (k ): if 'hadoop' == k or 'hello' == k: return 0 if 'spark' == k: return 1 return 2 print (rdd.partitionBy(3 , process).glom().collect())
输出结果:分区依次为0、1、2
1 [[('hadoop', 1), ('hello', 1), ('hadoop', 1)], [('spark', 1), ('spark', 1)], [('flink', 1)]]
分区号不要超标,你设置3个分区,分区号只能是0 1 2
设置5个分区 分区号只能是0 1 2 3 4
2.6.4 repartition算子 transformation算子
功能:对RDD的分区执行重新分区(仅数量)
用法:
1 2 rdd.repartition(N) 传入N 决定新的分区数
代码演示:30_operators_repartition_and_coalesce.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 , 2 , 3 , 4 , 5 ], 3 ) print (rdd.repartition(1 ).getNumPartitions()) print (rdd.repartition(5 ).getNumPartitions()) print (rdd.coalesce(1 ).getNumPartitions()) print (rdd.coalesce(5 ,shuffle=True ).getNumPartitions())
输出结果:
注意:对分区的数量进行操作,一定要慎重
一般情况下,我们写spark代码除了要求全局排序设置为1个分区外,
多数时候,所有API中关于分区相关的代码我们都不太理会
因为,如果你改分区了
会影响并行计算(内存迭代的并行管道数量)后面学
分区如果增加,极大可能导致shuffle
2.6.5 coalesce算子 transformation算子
功能:对分区进行数量增减
用法:
1 2 3 4 5 rdd.coalesce(参数1 ,参数2 ) - 参数1 ,分区数 - 参数2 ,True or False True 表示允许shuffle,也就是可以加分区False 表示不允许shuffle,也就是不能加分区,False 是默认
代码见2.6.4
对比repartition,一般使用coalesce较多,因为加分区要写参数2
这样避免写repartition的时候手抖了加分区了
2.6.6 mapValues算子 Transformation算子
功能:针对二元元组RDD,对其内部的二元元组的Value执行map操作
语法:
代码演示:
1 2 3 4 5 6 7 8 9 10 11 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("create rdd" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([('a' , 1 ), ('a' , 11 ), ('a' , 6 ), ('b' , 3 ), ('b' , 5 )]) print (rdd.mapValues(lambda x: x * 10 ).collect())
输出结果:
1 [('a', 10), ('a', 110), ('a', 60), ('b', 30), ('b', 50)]
2.6.7 join算子 Transformation算子
功能:对两个RDD执行join操作(可以实现SQL的内、外连接)
注意:join算子只能用于二元元组
代码见 2.4.9
2.7 面试题 groupByKey和reduceByKey的区别
在功能上的区别:
groupByKey
仅仅只有分组功能而已
reduceByKey
除了有ByKey
的分组功能外,还有reduce
聚合功能,所以是一个分组+聚合一体化的算子
如果对数据执行分组+聚合,那么使用这2个算子的性能差别是很大的
reduceByKey
的性能是远大于:groupByKey
+聚合逻辑的
因为:
如图,这是groupByKey
+聚合逻辑的执行流程。
因为,groupByKey
只能分组,所以,执行上是先分组(shuffle)后聚合
再来看reduceByKey
:
如图,reduceByKey由于自带聚合逻辑,所以可以完成:
先在分区内做预聚合
然后再走分组流程(shuffle)
分组后再做最终聚合
对于groupByKey,reduceByKey最大的提升在于,分组前进行了预聚合,那么在shuffle分组节点,被shuffle的数据可以极大地减少
这就极大地提升了性能
分组+聚合,首选reduceByKey
,数据越大,对groupByKey的优势就越高
2.8 总结
RDD创建方式有哪几种方法?
通过并行化集合的方式(本地集合转分布式集合)
或者读取数据的方式创建(TextFile、WholeTextFile)
RDD分区数如何查看?
通过getNumPartitions API查看,返回值Int
Transformation和Action的区别? 转换算子的返回值100%是RDD,而Action算子的返回值100%不是RDD
转换算子是懒加载的,只有遇到Action才会执行,Action就是转换算子处理链条的开关。
哪两个Action算子的结果不经过Driver,直接输出?
foreach
和saveAsTextFile
直接由Executor执行后输出,不会将结果发送到Driver上去
reduceByKey和groupByKey的区别?
reduceByKey自带聚合逻辑,groupByKey不带
如果做数据聚合reduceByKey的效率更好,因为可以先聚合后shuffle在最终聚合,传输的IO小
mapPartitions和foreachPartition的区别?
mapPartitions带有返回值 foreachPartition不带
对于分区操作有什么要注意的地方?
尽量不要增加分区,可能破坏内存迭代的计算管道
3. RDD的持久化 3.1 RDD的数据是过程数据 RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新的RDD生成,代表老RDD的消失。
RDD的数据是过程数据,只在处理的过程中存在,一旦处理完成,就不见了。
这个特性可以最大化地利用资源,老旧RDD没用了 就从内存中清理,给后续的计算腾出内存空间。
如上图,rdd3被2次使用,第一次使用之后,其实RDD3就不存在了。
第二次使用的时候,只能基于RDD的血缘关系,从RDD1重新执行,构建出来RDD3,供RDD5使用。
3.2 RDD的缓存 3.2.1 缓存 对于上述的场景,肯定要执行优化,优化就是:
RDD3如果不消失,那么RDD1——>RDD2——>RDD3这个链条就不会执行2次,或者更多次
RDD的缓存技术:Spark提供了缓存API,可以让我们通过调用APi,将指定的RDD数据保留在内存或者硬盘
上
缓存的API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 rdd3.cache() rdd3.persist(StorageLevel.MEMORY_ONLY) rdd3.persist(StorageLevel.MEMORY_ONLY_2) rdd3.persist(StorageLevel.DISK_ONLY) rdd3.persist(StorageLevel.DISK_ONLY_2) rdd3.persist(StorageLevel.DISK_ONLY_3) rdd3.persist(StorageLevel.MEMORY_AND_DISK) rdd3.persist(StorageLevel.MEMORY_AND_DISK_2) rdd3.persist(StorageLevel.OFF_HEAP) rdd.unpersist()
3.2.2 缓存特点
缓存技术可以将过程RDD数据,持久化保存到内存或者硬盘上
但是,这个保存在设定上是认为不安全的。
缓存的数据在设计上是认为有丢失风险的。
所以,缓存有一个特点就是:其保留RDD之间的血缘(依赖)关系
一旦缓存丢失,可以基于血缘关系的记录,重新计算这个RDD的数据
缓存如何丢失:
在内存中的缓存是不安全的,比如断电、计算任务内存不足,把缓存清理给计算让路
硬盘中因为硬盘损坏也是可能丢失的。
代码演示:31_cache.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd1 = sc.textFile("../data/input/words.txt" ) rdd2 = rdd1.flatMap(lambda x: x.split(" " )) rdd3 = rdd2.map (lambda x: (x, 1 )) rdd3.persist(StorageLevel.MEMORY_AND_DISK_2) rdd4 = rdd3.reduceByKey(lambda a, b: a + b) result = rdd4.collect() print (result) rdd5 = rdd3.groupByKey() rdd6 = rdd5.mapValues(lambda x:sum (x)) print (rdd6.collect()) rdd3.unpersist() time.sleep(10000000 )
输出结果:
1 2 [('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)] [('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
3.2.3 缓存是如何保存的
如图,RDD是将自己分区的数据,每个分区自行将其数据保存在其所在的Executor内存和硬盘上。
这是分散存储
3.3 RDD的CheckPoint 3.3.1 RDD CheckPoint CheckPoint技术,也是将RDD的数据,保存起来。
但是它仅支持硬盘存储
并且:
它被设计认为是安全的
不保留血缘关系
3.3.2 CheckPoint是如何保存数据的
如图:CheckPoint存储RDD数据,是集中收集各个分区数据进行存储
。而缓存是分散存储
3.3.3 缓存和CheckPoint的对比
CheckPoint不管分区数量多少,风险是一样的,缓存分区越多,风险越高
CheckPoint支持写入HDFS,缓存不行,HDFS是高可靠存储,CheckPoint被认为是安全的
CheckPoint不支持内存,缓存可以,缓存如果写内存,性能比CheckPoint要好一些
CheckPoint因为设计是安全的,所以不保留血缘关系,而缓存因为设计上认为不安全,所以保留
3.3.4 代码 1 2 3 4 5 sc.setCheckpointDir("hdfs://node1:8020/output/bj52ckp" ) rdd.checkpoint()
完整代码演示:32_checkPoint.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import jsonfrom pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeif __name__ == '__main__' : conf = SparkConf().setAppName("test" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) sc.setCheckpointDir("hdfs://Tnode1:8020/output/ckp" ) rdd1 = sc.textFile("../data/input/words.txt" ) rdd2 = rdd1.flatMap(lambda x: x.split(" " )) rdd3 = rdd2.map (lambda x: (x, 1 )) rdd3.checkpoint() rdd4 = rdd3.reduceByKey(lambda a, b: a + b) result = rdd4.collect() print (result) rdd5 = rdd3.groupByKey() rdd6 = rdd5.mapValues(lambda x:sum (x)) print (rdd6.collect()) rdd3.unpersist() time.sleep(10000000 )
输出结果:
1 2 [('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)] [('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
3.3.5 注意 CheckPoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用CheckPoint比较合适。
或者数据量很大,用CheckPoint比较合适。
如果数据量小,或者RDD重新计算是非常快的,用CheckPoint没啥必要
Cache和CheckPoint两个API都不是Action类型
所以,想要它俩工作,必须在后面接上Action
接上Action的目的,是让RDD有数据,而不是为了CheckPoint和Cache工作。
3.3.6 总结 1.Cache和CheckPoint的区别
Cache是轻量化保存RDD数据,可存储在内存和硬盘,是分散存储,设计上数据是不安全的(保留RDD血缘关系)
CheckPoint是重量级保存RDD数据,是集中存储,只能存储在硬盘(HDFS)上,设计上是安全的(不保留RDD血缘关系)
2.Cache和CheckPoint的性能对比?
Cache性能更好,因为是分散存储,各个Executor并行执行,效率高,可以保存到内存中(占内存),更快
CheckPoint比较慢,因为是集中存储,涉及到网络IO,但是存储到HDFS上更加安全(多副本)
4. Spark案例练习 4.1 搜索引擎日志分析案例 数据格式:
需求:
用户搜索的关键词分析
用户和关键词组合分析
热门搜索时间段分析
案例实现代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 from pyspark import SparkConf, SparkContextfrom pyspark.storagelevel import StorageLevelimport jiebafrom operator import adddef context_jieba (data ): """通过jieba分词工具 进行分词操作""" seg = jieba.cut_for_search(data) l = [] for word in seg: l.append(word) return l def filter_words (data ): """过滤不要的 谷、帮、客 湖""" return data not in ['谷' , '帮' , '客' , '湖' ] def append_words (data ): """修订某些关键词的内容""" if data == '传智播' : data = '传智播客' if data == '院校' : data = '院校帮' if data == '博学' : data = '博学谷' if data == '数据' : data = '数据湖' return (data, 1 ) def extract_user_and_word (data ): """传入数据是 元组(1,我喜欢传智播客)""" user_id = data[0 ] content = data[1 ] words = context_jieba(content) return_list = [] for word in words: if filter_words(word): return_list.append((user_id + '_' + append_words(word)[0 ], 1 )) return return_list if __name__ == '__main__' : conf = SparkConf().setAppName("SparkDemo2" ) sc = SparkContext(conf=conf) file_rdd = sc.textFile("hdfs://Tnode1/input/SogouQ.txt" ) split_rdd = file_rdd.map (lambda x: x.split("\t" )) split_rdd.persist(StorageLevel.DISK_ONLY) context_rdd = split_rdd.map (lambda x: x[2 ]) words_rdd = context_rdd.flatMap(context_jieba) filtered_rdd = words_rdd.filter (filter_words) final_words_rdd = filtered_rdd.map (append_words) result1 = final_words_rdd.reduceByKey(lambda a, b: a + b). \ sortBy(lambda x: x[1 ], ascending=False , numPartitions=1 ). \ take(5 ) print ("需求1结果:" , result1) user_content_rdd = split_rdd.map (lambda x: (x[1 ], x[2 ])) user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word) result2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b). \ sortBy(lambda x: x[1 ], ascending=False , numPartitions=1 ). \ take(5 ) print ("需求2结果:" , result2) time_rdd = split_rdd.map (lambda x: x[0 ]) hour_with_one_rdd = time_rdd.map (lambda x: (x.split(":" )[0 ], 1 )) result3 = hour_with_one_rdd.reduceByKey(add). \ sortBy(lambda x: x[1 ], ascending=False , numPartitions=1 ). \ collect() print ("需求3结果:" , result3)
输出结果:
1 2 3 需求1结果: [('scala', 2310), ('hadoop', 2268), ('博学谷', 2002), ('传智汇', 1918), ('itheima', 1680)] 需求2结果: [('6185822016522959_scala', 2016), ('41641664258866384_博学谷', 1372), ('44801909258572364_hadoop', 1260), ('7044693659960919_仓库', 1120), ('15984948747597305_传智汇', 1120)] 需求3结果: [('20', 3479), ('23', 3087), ('21', 2989), ('22', 2499), ('01', 1365)
4.2 提交到集群运行 1 2 3 4 5 6 # 普通提交 /export/server/spark/bin/spark-submit --master yarn SparkDemo2.py # 压榨集群式提交 # 每个executor吃14g内存,8核cpu,总共3个executor /export/server/spark/bin/spark-submit --master yarn --executor-memory 14g --executor-cores 8 --num-executors 3 ./SparkDemo2.py
输出结果:
要注意代码中:
master部分删除
读取的文件路径改为hdfs才可以
4.3 作业
代码演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 from pyspark import SparkContext, StorageLevelfrom pyspark import SparkConfif __name__ == '__main__' : conf = SparkConf().setAppName("sparkHomeWork01" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) file_rdd = sc.textFile("../../data/input/apache.log" ) file_rdd.persist(StorageLevel.MEMORY_AND_DISK_2) visit_Num = file_rdd.count() print ("当前网站的被访问次数:" , visit_Num) userNum = file_rdd.distinct().count() print ("当前网站的访问用户数:" , userNum) Ip_rdd1 = file_rdd.map (lambda x: x.split(" " )) Ip_rdd1.cache() Ip_rdd2 = Ip_rdd1.map (lambda x: x[0 ]).distinct() print ("有哪些IP访问了本网站:" , Ip_rdd2.collect()) page_rdd1 = Ip_rdd1.map (lambda x:x[-1 ]) page_rdd2 = page_rdd1.map (lambda x:(x,1 )) page_rdd3 = page_rdd2.reduceByKey(lambda a,b:a+b) page = page_rdd3.takeOrdered(1 ,lambda x:-x[1 ]) page = page[0 ] print (page) print ("访问量最高的页面是:" ,page[0 ],"共被访问:" ,page[1 ],"次" )
输出结果:sparkHomeWork01.py
1 2 3 4 5 当前网站的被访问次数: 14 当前网站的访问用户数: 9 有哪些IP访问了本网站: ['83.149.9.216', '10.0.0.1', '86.149.9.216'] ('/presentations/logstash-monitorama-2013/css/print/paper.css', 13) 访问量最高的页面是: /presentations/logstash-monitorama-2013/css/print/paper.css 共被访问: 13 次
5. 共享变量 5.1 广播变量 5.1.1 问题引出 有如下代码:
上述代码,本地list对象和分布式对象RDD有了关联。如下图:
本地list对象,被发送到每个分区的处理线程上使用,也就是一个executor内,其实存放了2份一样的数据。
executor是进程,进程内资源共享,这2份数据没有必要,造成了内存浪费。
5.1.2 解决方案-广播变量 如果本地list对象标记为广播变量对象
,那么
当上述场景出现的时候,Spark只会:
给每个Executor来一份数据,而不像原本那样,每一个分区的处理线程都来一份,节省内存。
如图,使用广播变量后,每个Executor只会收到一份数据集。
内部的各个线程(分区)共享这一份数据集。
使用方式:
1 2 3 4 5 6 7 8 broadcast = sc.broadcast(stu_info_list) value = broadcast.value
代码演示:33_broadcast.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import jsonfrom pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeif __name__ == '__main__' : conf = SparkConf().setAppName("33_broadcast.py" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) stu_info_list = [(1 , '张大仙' , 11 ), (2 , '王晓晓' , 13 ), (3 , '张甜甜' , 11 ), (4 , '王大力' , 11 )] broadcast = sc.broadcast(stu_info_list) score_info_rdd = sc.parallelize([ (1 , '语文' , 99 ), (2 , '数学' , 99 ), (3 , '英语' , 99 ), (4 , '编程' , 99 ), (1 , '语文' , 99 ), (2 , '编程' , 99 ), (3 , '语文' , 99 ), (4 , '英语' , 99 ), (1 , '语文' , 99 ), (3 , '英语' , 99 ), (2 , '编程' , 99 ) ]) def map_func (data ): name = '' id = data[0 ] for stu_info in broadcast.value: if stu_info[0 ] == id : name = stu_info[1 ] break return (name, data[1 ], data[2 ]) print (score_info_rdd.map (map_func).collect()) """ 广播变量使用场景:本地集合对象和 分布式集合对象(RDD) 进行关联的时候 需要将本地集合对象封装为广播变量 可以节省: 1. 网络IO的次数 2. Executor的内存占用 """
输出结果:
1 [('张大仙', '语文', 99), ('王晓晓', '数学', 99), ('张甜甜', '英语', 99), ('王大力', '编程', 99), ('张大仙', '语文', 99), ('王晓晓', '编程', 99), ('张甜甜', '语文', 99), ('王大力', '英语', 99), ('张大仙', '语文', 99), ('张甜甜', '英语', 99), ('王晓晓', '编程', 99)]
5.2 累加器 5.2.1 需求 想要对map
算子计算中的数据,进行数据累加,得到全部数据计算完后的累加结果
5.2.2 没有累加器的代码演示 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from pyspark import SparkConf, SparkContextif __name__ == '__main__' : conf = SparkConf().setAppName("create rdd" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10 ],2 ) count = 0 def map_func (data ): global count count += 1 print (count) rdd.map (map_func).collect() print (count)
输出结果:
代码的问题在于:
count来自driver对象,当在分布式的map算子中需要count对象的时候,driver会将count对象发送给每一个executor一份(复制发送)
每个executor各自收到一个,在最后执行print(count) 的时候,这个被打印的count依旧是driver那个
所以,不管executor中累加到多少,都和driver这个count无关
5.2.3 解决方法-累加器 代码演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import jsonfrom pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeif __name__ == '__main__' : conf = SparkConf().setAppName("33_broadcast.py" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) rdd = sc.parallelize([1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ], 2 ) acmlt = sc.accumulator(0 ) def map_func (data ): global acmlt acmlt += 1 print (acmlt) rdd.map (map_func).collect() print (acmlt) """ 累加器使用的注意点: 某个rdd使用完后,再被新的rdd重新调用,有可能会产生和想象中不一样的结果 避免方法:给需要重用的rdd加缓存 """
如上代码,将全部的count
对象,都替换成acmlt
对象即可
这个对象就是累加器对象,构建方式:sc.accumulator(初始值)
即可构建。
这个对象唯一和前面提到的count不同的是,这个对象可以从各个Executor中收集它们的执行结果,作用回自己身上。
输出结果:
1 2 3 4 5 6 7 8 9 10 11 1 2 3 4 5 1 2 3 4 5 10
5.2.4 累加器的注意事项
如上代码,第一次rdd2被action后,累加器值是10,然后rdd2就没有了(没数据了)
当rdd3构建出来的时候,是依赖rdd2,rdd2没数据,那么rdd2就要重新生成
重新生成就导致累加器累加数据的代码再次被执行,
所以代码的结果是20
也就是说,使用累加器的时候,要注意,因为rdd是过程数据,如果rdd被多次使用
可能重新构建此rdd
如果累加器累加代码,存在重新构建的步骤中
累加器累加代码就可能被多次执行。
如何解决:加缓存或者CheckPoint即可
5.3 综合案例 5.3.1 需求
对上面的数据执行:
正常的单词进行单词计数
特殊字符统计出现有多少个
特殊字符定义如下:
1 abnormal_char = ["," ,"." ,"!" ,"#" ,"$" ,"%" ]
代码演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import jsonfrom pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeimport reif __name__ == '__main__' : conf = SparkConf().setAppName("35_demo.py" ).setMaster("local[*]" ) sc = SparkContext(conf=conf) file_rdd = sc.textFile("../data/input/accumulator_broadcast_data.txt" ) abnormal_char = [',' , '.' , '!' , '#' , '$' , '%' ] broadcast = sc.broadcast(abnormal_char) acmlt = sc.accumulator(0 ) lines_rdd = file_rdd.filter (lambda line: line.strip()) data_rdd = lines_rdd.map (lambda line: line.strip()) words_rdd = data_rdd.flatMap(lambda line: re.split("\s+" , line)) def filter_func (data ): global acmlt abnormal_chars = broadcast.value if data in abnormal_chars: acmlt += 1 return False else : return True normal_words_rdd = words_rdd.filter (filter_func) result_rdd = normal_words_rdd.map (lambda x: (x, 1 )). \ reduceByKey(lambda a, b: a + b) print ("正常单词计数结果:" ,result_rdd.collect()) print ("特殊字符数量:" ,acmlt)
输出结果:
1 2 正常单词计数结果: [('hadoop', 3), ('hive', 6), ('hdfs', 2), ('spark', 11), ('mapreduce', 4), ('sql', 2)] 特殊字符数量: 8
5.4 总结
广播变量解决了什么问题?
分布式集合RDD和本地集合进行关联使用的时候,降低内存占用以及减少网络IO传输,提高性能。
累加器解决了什么问题
分布式代码执行中,进行全局累加
6.Spark内核调度(重点理解) 6.1 DAG 6.1.1 DAG Spark的核心是根据RDD来实现的,Spark Scheduler则为Spark核心实现的重要一环,其作用就是任务调度。Spark
的任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,
将每个Stage中的任务发到指定节点运行。基于Spark的任务调度原理,可以合理规划资源利用,做到尽可能用最少的
资源高效地完成任务计算。
以词频统计WordCount程序为例,DAG图:
DAG:有向无环图(拓扑结构)
有向:有方向
无环:没有闭环
DAG:有方向没有形成闭环的一个执行流程图
比如:
此图,就是一个典型的DAG图。
有方向:RDD1——>RDD2——>…——>collect结束
无闭环:以action(collect) 结束了,没有形成闭环循环
作用:标识代码的逻辑
执行流程
6.1.2 Job和Action Action:返回值不是RDD的算子
它的作用是一个触发开关,会将action算子之前的一串rdd依赖执行起来
如图,我们前面写的搜索引擎日志分析案例中,前两个需求就是2个action,就产生了2个DAG
结论:
1个Action会产生1个DAG,如果在代码中有3个Action就产生3个DAG
一个Action产生的一个DAG,会在程序运行中产生一个JOB
所以:1个ACTION = 1个DAG = 1个JOB
如果一个代码中,写了3个Action,那么这个代码运行起来产生3个JOB,每个JOB有自己的DAG
一个代码运行起来,在Spark中称之为:Application
层级关系:
1个Application中,可以有多个JOB,每一个JOB内含一个DAG,同时每一个JOB都是由一个Action产生的。
6.1.3 DAG和分区 DAG是Spark代码的逻辑执行图,这个DAG的最终作用是;为了构建物理上的Spark详细执行计划而生。
所以,由于Spark是分布式(多分区)的,那么DAG和分区之间也是有关联的。
1 2 3 4 5 rdd1 = sc.textFile() rdd2 = rdd1.flatMap() rdd3 = rdd2.map() rdd4 = rdd3.reduceByKey() rdd4.action()
假设,全部RDD都是3个分区在执行
如图,就得到了带有分区关系的DAG图
6.2 DAG的宽窄依赖和阶段划分 在Spark RDD前后之间的关系为:
窄依赖:父RDD的一个分区,全部将数据发送给子RDD的一个分区
宽依赖:父RDD的一个分区,将数据发送给子RDD的多个分区
宽依赖还有一个别名:shuffle
6.2.1 窄依赖
6.2.2 宽依赖
6.2.3 阶段划分 对于Spark来说,会根据DAG,按照宽依赖,划分不同的DAG阶段
划分依据:从后向前,遇到宽依赖就划分出一个阶段,称之为stage
如图,可以看到,在DAG中,基于宽依赖,将DAG划分成了2个stage
在stage的内部,一定都是:窄依赖
6.3 内存迭代计算
如图,基于带有分区的DAG以及阶段划分。可以从图中得到 逻辑上最优的task分配,一个task是一个线程来具体执行
那么如上图,task1中rdd1 rdd2 rdd3的迭代计算,都是由一个task(线程完成),这一阶段的这一条线,是纯内存计算。
如上图,task1 task2 task3,就形成了三个并行的 内存计算管道。
Spark默认受到全局并行度的限制,除了个别算子有特殊分区情况,大部分的算子,都会遵循全局并行度的要求,来规划自己的分区数。
如果全局并行度是3,其实大部分算子分区都是3
注意:Spark ,我们一般推荐只设置全局并行度,不要在算子上设置并行度。
除了一些排序算子外,计算算子就让他默认开分区就可以了
6.3.1 面试题
面试题1:Spark是怎么做内存计算的?DAG的作用?Stage阶段划分的作用?
Spark会产生DAG图
DAG图会基于分区和宽窄依赖关系划分阶段
一个阶段的内部都是窄依赖,窄依赖内,如果形成前后1:1的分区对应关系,就可以产生许多内存迭代计算的管道
这些内存迭代计算的管道,就是一个个具体的执行Task
一个Task是一个具体的线程,任务跑在一个线程内,就是走内存计算了。
面试题2:Spark为什么比MapReduce快
Spark的算子丰富,MapReduce算子匮乏(Map和Reduce),MapReduce这个编程模型,很难在一套MR中处理复杂的任务。很多复杂任务,是需要写多个MapReduce进行串联。多个MR串联通过磁盘交互数据。
Spark可以执行内存迭代,算子之间形成DAG,基于依赖划分阶段后,在阶段内形成内存迭代管道。但是MapReduce的Map和Reduce之间的交互依旧是通过硬盘来交互的。
总结:
编程模型上Spark占优(算子够多)
算子交互上,和计算行可以尽量多的内存计算而非磁盘迭代
6.4 Spark并行度 Spark的并行度:在同一时间内,有多少个task在同时运行
并行度:并行能力的设置
比如设置并行度6,其实就是要6个task并行在跑
在有了6个task并行的前提下,rdd的分区就被规划成6个分区了。
6.4.1 如何设置并行度 可以在代码中配置文件中以及提交程序的客户端参数中设置
优先级从高到低:
代码中
客户端提交参数中
配置文件中
默认(1,但是不会全部以1来跑,多数时候基于读取文件的分片数量来作为默认并行度)
全局并行度配置的参数:spark.default.parallelism
6.4.2 全局并行度-推荐 配置文件中:
1 2 conf/spark-defaults.conf 中设置 spark.default.parallelism 100
在客户端提交参数中:
1 bin/spark-submit --conf "spark.default.parallelism=100"
在代码中设置:
1 2 conf = SparkConf() conf.set("spark.default.parallelism",100)
全局并行度是推荐设置,不要针对RDD改分区,可能会影响内存迭代管道的构建,或者会产生额外的shuffle
6.4.3 针对RDD的并行度设置-不推荐 只能在代码中写,算子:
repartition算子
coalesce算子
partitionBy算子
6.4.4 集群中如何规划并行度 结论:设置为CPU总核心的2~10倍
比如集群可用CPU核心是100个,我们建议并行度是200~1000
确保是CPU核心的整数倍即可,最小是2倍,最大一般10倍或者更高(适量)均可
为什么要设置最少2倍?
CPU的一个核心同一时间只能干一件事。
所以,在100个核心的情况下,设置100个并行,就能让CPU100%出力。
这种设置下,如果task的压力不均衡,某个task先执行完了,就导致某个CPU核心空闲
所以,我们将Task(并行)分配的数量变多,比如100个并行,同一时间只有100个在运行,700个在等待,
但是可以确保,某个task运行完了,后续有task补上,不让cpu闲下来,最大程度利用集群的资源。
规划并行度,只看集群总CPU核数
6.5 Spark任务调度 Spark的任务,由Driver进行调度,这个工作包含:
逻辑DAG产生
分区DAG产生
Task划分
将Task分配给Executor并监控其工作
如图,Spark程序的调度流程如图:
Driver被构建出来
构建SparkContext(执行环境入口对象)
基于DAG Scheduler(DAG调度器)构建逻辑Task分配
基于TaskSchedule(Task调度器)将逻辑Task分配到各个Executor上干活,并监控它们。
Worker(Executor),被TaskScheduler管理监控,听从它们的指令干活,并定期汇报进度。
1,2,3,4都是Driver的工作
5是Worker的工作
6.5.1 Drivcer内的两个组件 DAG调度器 :
工作内容:将逻辑的DAG图进行处理,最终得到逻辑上的Task划分
Task调度器 :
工作内容:基于DAG Scheduler的产出,来规划这些逻辑的task,应该在哪些物理的executor上运行,以及监控管理它们的运行。
6.6 拓展-Spark概念名词大全 6.6.1 Spark运行中的概念名词大全
层级关系梳理:
一个Spark环境可以运行多个Application
一个代码运行起来,会成为一个Application
Application内部可以有多个Job
每个Job由一个Action产生,并且每个Job有自己的DAG执行图
一个Job的DAG图会基于宽窄依赖划分成不同的阶段
不同阶段内基于分区数量,形成多个并行的内存迭代管道
每一个内存迭代管道形成一个Task(DAG调度器划分将Job内划分出具体的task任务,一个Job被划分出来的task在逻辑上称之为这个job的taskset)
6.7 SparkShuffle 6.7.1MR Shuffle回顾 首先回顾MapReduce框架中Shuffle过程,整体流程图如下:
6.7.2 简介 Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上
还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中 ,涉及
到序列化反序列化、跨节点网络IO以及磁盘读写IO等。
Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是
Child Stage的第一步。
执行Shuffle的主体是Stage中的并发任务,这些任务分ShuffleMapTask和ResultTask两种,ShuffleMapTask要进行
Shuffle,ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask
。如果要按照map端和reduce端来分析的话,ShuffleMapTask可以即是map端任务,又是reduce端任务,因为
Spark中的Shuffle是可以串行的;ResultTask则只能充当reduce端任务的角色。
Spark在1.1以前的版本一直是采用Hash Shuffle的实现的方式,到1.1版本时参考Hadoop MapReduce的实现开始引
入Sort Shuffle,在1.5版本时开始Tungsten钨丝计划,引入UnSafe Shuffle优化内存及CPU的使用,在1.6中将
Tungsten统一到Sort Shuffle中,实现自我感知选择最佳Shuffle方式,到的2.0版本,Hash Shuffle已被删除,所有
Shuffle方式全部统一到Sort Shuffle一个实现中。
在Spark的中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager随着
Spark的发展有两种实现的方式,分别为HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort
Shuffle两种。
在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重
的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。
因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于
HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但
是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉
取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
6.7.3 Sort Shuffle bypass机制 bypass运行机制的触发条件如下:
shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。
bypass运行机制的触发条件如下:
1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
2)不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。
此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,
将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的
。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,
只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的
HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于:
第一,磁盘写机制不同;
第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,
也就节省掉了这部分的性能开销
总结:
SortShuffle也分为普通机制和bypass机制
普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。
而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的
shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序
,极大的提高了其性能。
6.7.4 Shuflle的配置选项 Shuffle阶段划分:
shuffle write:mapper阶段,上一个stage得到最后的结果写出
shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并
spark 的shuffle调优:主要是调整缓冲的大小,拉取次数重试重试次数与等待时间,内存比例分配,是否进行排序操作等等
spark.shuffle.file.buffer
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写
到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性
能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.reducer.maxSizeInFlight :
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。(默认48M)
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现
,合理调节该参数,性能会有1%~5%的提升。
spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait :
spark.shuffle.io.maxRetries :shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试
的最大次数。(默认是3次)
spark.shuffle.io.retryWait:该参数代表了每次重试拉取数据的等待间隔。(默认为5s)
调优建议:一般的调优都是将重试次数调高,不调整时间间隔。
spark.shuffle.memoryFraction :
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作内存比例。
spark.shuffle.manager
参数说明:该参数用于设置shufflemanager的类型(默认为sort).Spark1.5x以后有三个可选项:
Hash:spark1.x版本的默认值,HashShuffleManager
Sort:spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数,自动开启bypass 机制
spark.shuffle.sort.bypassMergeThreshold
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些
6.8 总结
DAG是什么有什么用?
DAG是有向无环图,用以描述任务执行流程,主要作用就是协助DAG调度器构建Task分配用以做任务管理
内存迭代、阶段划分?
基于DAG的宽窄依赖划分阶段,阶段内部都是窄依赖可以构建内存迭代的管道
DAG调度器是?
构建Task分配以做任务管理