3.PySpark-SparkSQL

在Spark上运用SQL处理结构化数据

3.1 SparkSQL快速入门

3.1.1 什么是SparkSQL

image-20230819174120604

SparkSQL是Spark的一个模块,用于处理海量结构化数据

限定:结构化数据处理

3.1.2 为什么要学习SparkSQL

image-20230819173142314

SparkSQL是非常成熟的 海量结构化数据处理框架。

学习SparkSQL主要在2个点:

  1. SparkSQL本身十分优秀, 支持SQL语言、性能强、可以自动优化、API简单、兼容HIVE等等
  2. 企业大面积在使用SparkSQL处理业务数据
  • ​     离线开发
    
  • ​ 数仓搭建
  • ​ 科学计算
  • ​ 数据分析

3.1.3 SparkSQL特点

image-20230819174237330

3.1.4 SparkSQL发展史

image-20230819174207223

在许多年前(2012\2013左右)Hive逐步火热起来, 大片抢占分布式SQL计算市场

Spark作为通用计算框架, 也不可能放弃这一细分领域.

于是, Spark官方模仿Hive推出了Shark框架(Spark 0.9版本)

Shark框架是几乎100%模仿Hive, 内部的配置项\优化项等都是直接模仿而来.

不同点在于将执行引擎由MapReduce更换为了Spark.

因为Shark框架太模仿Hive, Hive是针对MR优化, 很多地方和SparkCore(RDD)水土不服, 最终被放弃

Spark官方下决心开发一个自己的分布式SQL引擎 也就是诞生了现在的SparkSQL


image-20230819173814501

● 2014年 1.0正式发布

● 2015年 1.3 发布DataFrame数据结构, 沿用至今

● 2016年 1.6 发布Dataset数据结构(带泛型的DataFrame), 适用于支持泛型的语言(Java\Scala)

● 2016年 2.0 统一了Dataset 和 DataFrame, 以后只有Dataset了, Python用的DataFrame就是 没有泛型的Dataset

● 2019年 3.0 发布, 性能大幅度提升,SparkSQL变化不大

3.1.5 总结

  1. SparkSQL用于处理大规模结构化数据的计算引擎

  2. SparkSQL在企业中广泛使用,并性能极好,学习它不管是工作还是就业都有很大帮助

  3. SparkSQL:使用简单、API统一、兼容HIVE、支持标准化JDBC和ODBC连接

  4. SparkSQL 2014年正式发布,当下使用最多的2.0版,Spark发布于2016年,当下使用的最新3.0办发布于2019年

3.2 SparkSQL概述

3.2.1 SparkSQL和Hive的异同

image-20230819190806954

  • Hive和Spark均是:“分布式SQL计算引擎”
  • 均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。
  • 目前,企业中使用Hive仍旧居多,但SparkSQL将会在很近的未来替代Hive成为分布式SQL计算市场的顶级

3.2.2 SparkSQL的数据抽象

image-20230819191543792


image-20230819191526430

3.2.3 SparkSQL数据抽象的发展

image-20230819191713814

从SparkSQL的发展历史可以看到:

14年最早的数据抽象是:SchemaRDD(内部存储二维表数据结构的RDD),SchemaRDD就是魔改的RDD,将RDD支持的存储数据,限定

为二维表数据结构用以支持SQL查询。由于是魔改RDD,只是一个过渡产品,现已废弃。

15年发布DataFrame对象,基于Pandas的DataFrame(模仿)独立于RDD进行实现,将数据以二维表结构进行存储并支持分布式运行

16年发布DataSet对象,在DataFrame之上添加了泛型的支持,用以更好的支持Java和Scala这两个支持泛型的编程语言

16年,Spark2.0版本,将DataFrame和DataSet进行合并。其底层均是DataSet对象,但在Python和R语言到用时,显示为DataFrame对象。和老的DataFrame对象没有区别

3.2.4 DataFrame概述

DataFrame和RDD都是:弹性的、分布式的、数据集

只是,DataFrame存储的数据结构“限定”为:二维表结构化数据

而RDD可以存储的数据则没有任何限制,想处理什么就处理什么

image-20230819192349796


假定有如下数据集:

image-20230819193133157

DataFrame按二维表存储:

image-20230819193144162

RDD按数组对象存储:
image-20230819193159002

  • DataFrame 是按照二维表格的形式存储数据
  • RDD则是存储对象本身

3.2.5 SparkSession对象

在RDD阶段,程序的执行入口对象是: SparkContext

在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。

SparkSession对象可以:

  • 用于SparkSQL编程作为入口对象

  • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

    所以,我们后续的代码,执行环境入口对象,统一变更为SparkSession对象

    image-20230819193053095


    现在,来体验一下构建执行环境入口对象:SparkSession

    构建SparkSession核心代码

    image-20230819193407862

    应该是.master(“local[*]”),见下文代码

3.2.6 SparkSQL HelloWorld

需求:读取文件,找出学科为“语文”的数据,并限制输出5条

where subject = ‘语文’ limit 5

代码演示:00_spark_session_create.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# coding:utf8

# SparkSession对象的导包,对象是来自于pyspark.sql包中
from pyspark.sql import SparkSession

if __name__ == '__main__':
# 构建SparkSession执行环境入口对象
spark = SparkSession.builder. \
appName("00_spark_session_create.py"). \
master("local[*]"). \
getOrCreate()

# 通过SparkSession对象,获取SparkContext对象
sc = spark.sparkContext

# SparkSQL的HelloWorld
df = spark.read.csv("../data/input/stu_score.txt", sep=',', header=False)
df2 = df.toDF("id", "name", "score")
df2.printSchema() # 打印表结构
print("读取并打印数据")
df2.show() # 显示数据

df2.createTempView("score") # 构建score表
# SQL风格
print("SQL 风格结果:")
spark.sql("""
select * from score where name ='语文' limit 5
""").show()

# DSL 风格
print("DSL 风格结果:")
df2.where("name='语文'").limit(5).show()

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- score: string (nullable = true)

读取并打印数据
+---+----+-----+
| id|name|score|
+---+----+-----+
| 1|语文| 99|
| 2|语文| 99|
| 3|语文| 99|
| 4|语文| 99|
| 5|语文| 99|
| 6|语文| 99|
| 7|语文| 99|
| 8|语文| 99|
| 9|语文| 99|
| 10|语文| 99|
| 11|语文| 99|
| 12|语文| 99|
| 13|语文| 99|
| 14|语文| 99|
| 15|语文| 99|
| 16|语文| 99|
| 17|语文| 99|
| 18|语文| 99|
| 19|语文| 99|
| 20|语文| 99|
+---+----+-----+
only showing top 20 rows

SQL 风格结果:
+---+----+-----+
| id|name|score|
+---+----+-----+
| 1|语文| 99|
| 2|语文| 99|
| 3|语文| 99|
| 4|语文| 99|
| 5|语文| 99|
+---+----+-----+

DSL 风格结果:
+---+----+-----+
| id|name|score|
+---+----+-----+
| 1|语文| 99|
| 2|语文| 99|
| 3|语文| 99|
| 4|语文| 99|
| 5|语文| 99|
+---+----+-----+

3.2.7 总结

  1. SparkSQL 和 Hive同样,都是用于大规模SQL分布式计算的计算框架,均可以运行在YARN之上,在企业中广泛被应用

  2. SparkSQL的数据抽象为:SchemaRDD(废弃)、DataFrame(Python、R、Java、Scala)、DataSet(Java、Scala)。

  3. DataFrame同样是分布式数据集,有分区可以并行计算,和RDD不同的是,DataFrame中存储的数据结构是以表格形式组织的,方便进行SQL计算

  4. DataFrame对比DataSet基本相同,不同的是DataSet支持泛型特性,可以让Java、Scala语言更好的利用到。

  5. SparkSession是2.0后推出的新执行环境入口对象,可以用于RDD、SQL等编程

3.3 DataFrame入门

3.3.1 DataFrame的组成

DataFrame是一个二维表结构, 那么表格结构就有无法绕开的三个点:

  • 表结构表述

比如,在MySQL中的一张表:

  • 由许多行组成

  • 数据也被分成多个列

  • 表也有表结构信息(列、列名、列类型、列约束等)

基于这个前提,DataFrame的组成如下:

在结构层面:

  • StructType对象描述整个DataFrame的表结构
  • StructField对象描述一个列的信息

在数据层面:

  • Row对象记录一行数据
  • Column对象记录一列数据并包含列的信息

image-20230824222446129

如图, 在表结构层面,DataFrame的表结构由StructType描述,如下图:

image-20230824222534304

一个StructField记录:列名、列类型、列是否运行为空

多个StructField组成一个StructType对象。

一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空

同时,一行数据描述为Row对象,如Row(1, 张三, 11)

一列数据描述为Column对象,Column对象包含一列数据和列的信息

Row、Column、StructType、StructField的编程我们在后面编码阶段会接触

3.3.2 DataFrame的代码构建

3.3.2.1 基于RDD方式1

DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构

将RDD转换为DataFrame方式1:

调用spark

1
2
3
4
5
6
# 首先构建一个RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
map(lambda x: x.split(',')).\
map(lambda x: [x[0], int(x[1])]) # 需要做类型转换, 因为类型从RDD中探测
# 构建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])

通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame

这里只传入列名称,类型从RDD中进行推断,是否允许为空,默认为允许(True)

完整代码演示:01_dataframe_create1.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# coding:utf8

from pyspark.sql import SparkSession

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("01_dataframe_create1.py"). \
master("local[*]"). \
getOrCreate()

sc = spark.sparkContext

# 1.基于RDD转换成DataFrame
rdd = sc.textFile("../data/input/sql/people.txt"). \
map(lambda x: x.split(",")). \
map(lambda x: (x[0], int(x[1]))) # 需要做类型转换, 因为类型从RDD中探测

# 2.构建DataFrame对象
# 参数1 被转换的RDD
# 参数2 指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可
df = spark.createDataFrame(rdd, schema=['name', 'age'])

# 打印DataFrame的表结构
df.printSchema()

# 打印df中的数据
# 参数1 表示展示出多少条数据,默认不传的话是20
# 参数2 表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示,以...代替
# 如果给False 表示不截断,全部显示,默认是True
df.show(20, False)

# 将DF对象转换成临时视图表,可供sql语句查询
df.createOrReplaceTempView("people")
spark.sql("select * from people where age < 30").show()



输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)

+-------+---+
|name |age|
+-------+---+
|Michael|29 |
|Andy |30 |
|Justin |19 |
+-------+---+

+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Justin| 19|
+-------+---+

3.3.2.2基于RDD方式2

将RDD转换为DataFrame方式2:

​ 通过StructType对象来定义DataFrame的“表结构”转换RDD

1
2
3
4
5
6
7
8
9
10
11
12
13
# 创建DF , 首先创建RDD 将RDD转DF
rdd = sc.textFile("../data/sql/stu_score.txt").\
map(lambda x:x.split(',')).\
map(lambda x:(int(x[0]), x[1], int(x[2])))
# StructType 类
# 这个类 可以定义整个DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
# 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add, 每一个add代表一个StructField
# add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
df = spark.createDataFrame(rdd, schema)

完整代码演示:02_dataframe_create2.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("02_dataframe_create2.py"). \
master("local[*]"). \
getOrCreate()

sc = spark.sparkContext

# 基于RDD转换成DataFrame
rdd = sc.textFile("../data/input/sql/people.txt"). \
map(lambda x: x.split(",")). \
map(lambda x: (x[0], int(x[1])))

# 构建表结构的描述对象:StructType对象:StructType对象
schema = StructType().add("name",StringType(),nullable=True).\
add("age",IntegerType(),nullable=False)

# 基于StructType对象去构建RDD到DF的转换
df = spark.createDataFrame(rdd,schema=schema)

df.printSchema()
df.show()

输出结果:

1
2
3
4
5
6
7
8
9
10
11
root
|-- name: string (nullable = true)
|-- age: integer (nullable = false)

+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+

3.3.2.3 基于RDD方式3

将RDD转换为DataFrame方式3:使用RDD的toDF方法转换RDD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# StructType 类
# 这个类 可以定义整个DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
# 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
# add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
# 方式1: 只传列名, 类型靠推断, 是否允许为空是true
df = rdd.toDF(['id', 'subject', 'score'])
df.printSchema()
df.show()
# 方式2: 传入完整的Schema描述对象StructType
df = rdd.toDF(schema)
df.printSchema()
df.show()

完整代码演示:03_dataframe_create3.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("03_dataframe_create3.py"). \
master("local[*]"). \
getOrCreate()

sc = spark.sparkContext

# 基于RDD转换成DataFrame
rdd = sc.textFile("../data/input/sql/people.txt"). \
map(lambda x: x.split(",")). \
map(lambda x: (x[0], int(x[1])))

# toDF的方式构建DataFrame
df1 = rdd.toDF(["name", "age"]) # 只有数据列名,没有指定数据类型,只适合和数据不敏感的情况
df1.printSchema()
df1.show()

# toDF的方式2 通过StructType来构建
schema = StructType().add("name", StringType(), nullable=True). \
add("age", IntegerType(), nullable=False)

df2 = rdd.toDF(schema=schema)
df2.printSchema()
df2.show()

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)

+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+

root
|-- name: string (nullable = true)
|-- age: integer (nullable = false)

+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+

3.3.2.4 基于Pandas的DF

将Pandas的DataFrame对象,转变为分布式的SparkSQL DataFrame对象

1
2
3
4
5
6
7
8
# 构建Pandas的DF
pdf = pd.DataFrame({
"id": [1, 2, 3],
"name": ["张大仙", '王晓晓', '王大锤'],
"age": [11, 11, 11]
})
# 将Pandas的DF对象转换成Spark的DF
df = spark.createDataFrame(pdf)

完整代码演示:04_dataframe_create4.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("04_dataframe_create4.py"). \
master("local[*]"). \
getOrCreate()

sc = spark.sparkContext
# 基于Pandas的DataFrame构建SparkSQL的DataFrame对象
pdf = pd.DataFrame(
{
"id": [1, 2, 3],
"name": ["靓仔", "吊毛", "如花"],
"age": [11, 21, 11]
}
)

df = spark.createDataFrame(pdf)
df.printSchema()
df.show()

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)

+---+----+---+
| id|name|age|
+---+----+---+
| 1|靓仔| 11|
| 2|吊毛| 21|
| 3|如花| 11|
+---+----+---+

3.3.2.5 读取外部数据

通过SparkSQL的统一API进行数据读取构建DataFrame

统一API示例代码:

1
2
3
4
sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......")
.option("K", "V") # option可选
.schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT")
.load("被读取文件的路径, 支持本地文件系统和HDFS")

读取text数据源

使用format(“text”)读取文本数据

读取到的DataFrame只会有一个列,列名默认称之为:value

示例代码:

1
2
3
4
schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text")\
.schema(schema)\
.load("../data/sql/people.txt")

完整代码演示:05_dataframe_create5_text.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("05_dataframe_create5_text.py"). \
master("local[*]"). \
getOrCreate()

sc = spark.sparkContext

# 构建StructType,text数据源,读取数据的特点是,将一整行只作为一个列读取,默认列名是value 类型是String
# 我们设置列名为value
schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text"). \
schema(schema=schema). \
load("../data/input/sql/people.txt")

df.printSchema()
df.show()

输出结果:

1
2
3
4
5
6
7
8
9
10
root
|-- data: string (nullable = true)

+-----------+
| data|
+-----------+
|Michael, 29|
| Andy, 30|
| Justin, 19|
+-----------+

读取json数据源

使用format("json")读取json数据

示例代码:

1
2
3
4
5
df = spark.read.format("json").\
load("../data/sql/people.json")
# JSON 类型 一般不用写.schema, json自带, json带有列名 和列类型(字符串和数字)
df.printSchema()
df.show()

完整代码演示:06_dataframe_create6_json.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("06_dataframe_create6_json.py"). \
master("local[*]"). \
getOrCreate()

sc = spark.sparkContext

# JSON类型自带有Schema信息
df = spark.read.format("json").load("../data/input/sql/people.json")
df.printSchema()
df.show()


输出结果:

1
2
3
4
5
6
7
8
9
10
11
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

读取csv数据源

使用format(“csv”)读取csv数据

示例代码:

1
2
3
4
5
6
7
8
df = spark.read.format("csv")\
.option("sep", ";")\ # 列分隔符
.option("header", False)\ # 是否有CSV标头
.option("encoding", "utf-8")\ # 编码
.schema("name STRING, age INT, job STRING")\ # 指定列名和类型
.load("../data/sql/people.csv") # 路径
df.printSchema()
df.show()

完整代码演示:07_dataframe_create7_csv.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("07_dataframe_create7_csv.py"). \
master("local[*]"). \
getOrCreate()

sc = spark.sparkContext

# 读取csv文件
df = spark.read.format("csv"). \
option("sep", ";"). \
option("header", True). \
option("encoding", "utf-8"). \
schema("name STRING,age INT,job STRING"). \
load("../data/input/sql/people.csv")

df.printSchema()
df.show()

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- job: string (nullable = true)

+-----+----+---------+
| name| age| job|
+-----+----+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice|null| Manager|
|Alice| 9| null|
+-----+----+---------+

读取parquet数据源

使用format(“parquet”)读取parquet数据

示例代码:

1
2
3
4
5
# parquet 自带schema, 直接load啥也不需要了
df = spark.read.format("parquet").\
load("../data/sql/users.parquet")
df.printSchema()
df.show()

完整代码演示:08_dataframe_create8_parquet.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("08_dataframe_create8_parquet.py"). \
master("local[*]"). \
getOrCreate()

sc = spark.sparkContext

# 读取parquet文件
df = spark.read.format("parquet").load("../data/input/sql/users.parquet")

df.printSchema()
df.show()


输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
root
|-- name: string (nullable = true)
|-- favorite_color: string (nullable = true)
|-- favorite_numbers: array (nullable = true)
| |-- element: integer (containsNull = true)

+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

parquet: 是Spark中常用的一种列式存储文件格式,和Hive中的ORC差不多, 他俩都是列存储格式

parquet对比普通的文本文件的区别:

  • parquet 内置schema (列名\ 列类型\ 是否为空)

  • 存储是以列作为存储格式

  • 存储是序列化存储在文件中的(有压缩属性体积小)

Parquet文件不能直接打开查看,如果想要查看内容可以在PyCharm中安装如下插件来查看:

image-20230825175402666

3.3.3 DataFrame的入门操作

DataFrame支持两种风格进行编程,分别是:

  • DSL风格

  • SQL风格

DSL语法风格:

DSL称之为:领域特定语言。

其实就是指DataFrame的特有API

DSL风格意思就是以调用API的方式来处理Data

比如:df.where().limit()

SQL语法风格:

SQL风格就是使用SQL语句处理DataFrame的数据

比如:spark.sql(“SELECT * FROM xxx)

3.3.3.1 DSL风格

DSL-show方法

功能:展示DataFrame中的数据, 默认展示20条

语法:

1
2
3
df.show(参数1, 参数2)
- 参数1: 默认是20, 控制展示多少条
- 参数2: 是否截断列, 默认只输出20个字符的长度, 过长不显示, 要显示的话 请填入 truncate = True

如图,某个df.show后的展示结果:image-20230825220122185


DSL-printSchema方法

功能:打印输出df的schema信息

语法:

1
df.printSchema()

image-20230825220212413


DSL-select

功能:选择DataFrame中的指定列(通过传入参数进行指定)

语法:

image-20230825222239059

可传递:

  • 可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串列名来指定列

  • List[Column]对象或者List[str]对象, 用来选择多个列

image-20230825220357050


DLS-filter和where

功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame

语法:

df.filter()

df.where()

where和filter功能上是等价的

image-20230825222215745

image-20230825221901984


DSL-groupBy分组

功能:按照指定的列进行数据的分组, 返回值是GroupedData对象

语法:

df.groupBy()

image-20230825221842036

传入参数和select一样,支持多种形式,不管怎么传意思就是告诉spark按照哪个列分组

image-20230825221832645


GroupedData对象

GroupedData对象是一个特殊的DataFrame数据集,其类全名:<class 'pyspark.sql.group.GroupedData'>,这个对象是经过groupBy后得到的返回值, 内部记录了 以分组形式存储的数据,GroupedData对象其实也有很多API,比如前面的count方法就是这个对象的内置方法。除此之外,像:min、max、avg、sum、等等许多方法都存在,后续会再次使用它。

代码演示:09_dataframe_process_dsl.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("09_dataframe_process_dsl.py"). \
master("local[*]"). \
getOrCreate()

sc = spark.sparkContext

df = spark.read.format("csv").\
schema("id INT,subject STRING,score INT").\
load("../data/input/sql/stu_score.txt")

# Column对象的获取
id_column = df['id']
subject_column = df['subject']

# DSL风格演示
df.select(["id",'subject']).show()
# df.select("id","subject").show()
# df.select(id_column,subject_column).show()

# filter API
df.filter("score < 99").show()
# df.filter(df["score"] < 99).show()
#
# # where API
df.where("score < 99").show()
# df.where(df['score'] < 99).show()

# group By API
df.groupBy("subject").count().show()
# df.groupBy(df["subject"]).count().show()

# df.groupBy API的返回值 GroupedData
# GroupedData对象 不是DataFrame
# 它是一个 有分组关系的数据结构,有一些API供我们对分组做聚合
# SQL: group by 后接上聚合:sum avg count min max
# GroupedData 类似于SQL分组后的数据结构,同样有上述5种聚合方法
# GroupedData 调用聚合方法后,返回值依旧是DataFrame
# GroupedData 只是一个中转的对象,最终还是要获得DataFrame的结果
r = df.groupBy("subject")
print(type(r))
print(r.max().show())
print(r.min().show())

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
+---+-------+
| id|subject|
+---+-------+
| 1| 语文|
| 2| 语文|
| 3| 语文|
| 4| 语文|
| 5| 语文|
| 6| 语文|
| 7| 语文|
| 8| 语文|
| 9| 语文|
| 10| 语文|
| 11| 语文|
| 12| 语文|
| 13| 语文|
| 14| 语文|
| 15| 语文|
| 16| 语文|
| 17| 语文|
| 18| 语文|
| 19| 语文|
| 20| 语文|
+---+-------+
only showing top 20 rows

+---+-------+-----+
| id|subject|score|
+---+-------+-----+
| 1| 数学| 96|
| 2| 数学| 96|
| 3| 数学| 96|
| 4| 数学| 96|
| 5| 数学| 96|
| 6| 数学| 96|
| 7| 数学| 96|
| 8| 数学| 96|
| 9| 数学| 96|
| 10| 数学| 96|
| 11| 数学| 96|
| 12| 数学| 96|
| 13| 数学| 96|
| 14| 数学| 96|
| 15| 数学| 96|
| 16| 数学| 96|
| 17| 数学| 96|
| 18| 数学| 96|
| 19| 数学| 96|
| 20| 数学| 96|
+---+-------+-----+
only showing top 20 rows

+---+-------+-----+
| id|subject|score|
+---+-------+-----+
| 1| 数学| 96|
| 2| 数学| 96|
| 3| 数学| 96|
| 4| 数学| 96|
| 5| 数学| 96|
| 6| 数学| 96|
| 7| 数学| 96|
| 8| 数学| 96|
| 9| 数学| 96|
| 10| 数学| 96|
| 11| 数学| 96|
| 12| 数学| 96|
| 13| 数学| 96|
| 14| 数学| 96|
| 15| 数学| 96|
| 16| 数学| 96|
| 17| 数学| 96|
| 18| 数学| 96|
| 19| 数学| 96|
| 20| 数学| 96|
+---+-------+-----+
only showing top 20 rows

+-------+-----+
|subject|count|
+-------+-----+
| 英语| 30|
| 语文| 30|
| 数学| 30|
+-------+-----+

<class 'pyspark.sql.group.GroupedData'>
+-------+-------+----------+
|subject|max(id)|max(score)|
+-------+-------+----------+
| 英语| 33| 99|
| 语文| 30| 99|
| 数学| 30| 96|
+-------+-------+----------+

None
+-------+-------+----------+
|subject|min(id)|min(score)|
+-------+-------+----------+
| 英语| 1| 99|
| 语文| 1| 99|
| 数学| 1| 96|
+-------+-------+----------+

None

3.3.3.2 SQL风格

SQL风格语法-注册DataFrame成为表

DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中,使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:

image-20230825221811807

image-20230825221047238


SQL风格语法-使用SQL查询

注册好表后,可以通过:

sparksession.sql(sql语句)来执行sql查询

返回值是一个新的df

示例:


pyspark.sql.functions包

PySpark提供了一个包: pyspark.sql.functions

这个包里面提供了 一系列的计算函数供SparkSQL使用,如何用呢?

导包

from pyspark.sql import functions as F

然后就可以用F对象调用函数计算了。这些功能函数, 返回值多数都是Column对象:

示例

image-20230825223308733

代码演示:10_dataframe_process_sql.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("10_dataframe_process_sql.py"). \
master("local[*]"). \
getOrCreate()

sc = spark.sparkContext

df = spark.read.format("csv"). \
schema("id INT,subject STRING,score INT"). \
load("../data/input/sql/stu_score.txt")

# 注册成临时表
df.createTempView("score") # 注册临时视图(表)
df.createOrReplaceTempView("score_2") # 注册 或者 替换 临时视图
df.createGlobalTempView("score_3") # 注册全局临时视图 全局临时视图在使用的时候 需要在前面带上global_temp.前缀

# 可以通过SparkSession对象的sql api来完成sql语句的执行
spark.sql("select subject,count(*) as cnt from score group by subject").show()
spark.sql("select subject,count(*) as cnt from score_2 group by subject").show()
spark.sql("select subject,count(*) as cnt from global_temp.score_3 group by subject").show()

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
+-------+---+
|subject|cnt|
+-------+---+
| 英语| 30|
| 语文| 30|
| 数学| 30|
+-------+---+

+-------+---+
|subject|cnt|
+-------+---+
| 英语| 30|
| 语文| 30|
| 数学| 30|
+-------+---+

+-------+---+
|subject|cnt|
+-------+---+
| 英语| 30|
| 语文| 30|
| 数学| 30|
+-------+---+

3.3.4 词频统计案例

我们来完成一个单词计数需求,使用DSL和SQL两种风格来实现。

代码演示:11_wordcount_demo.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("11_wordcount_demo.py"). \
master("local[*]"). \
getOrCreate()
sc = spark.sparkContext

# TODO 1:SQL 风格进行处理
rdd = sc.textFile("../data/input/words.txt"). \
flatMap(lambda x: x.split(" ")). \
map(lambda x: [x])
# dataFrame需要嵌套形式的数据, 比如:[['hello'], ['spark'], ['hello'], ['hadoop'], ['hello'], ['flink']]

df = rdd.toDF(["word"])

# 注册DF为表格
df.createTempView("words")

spark.sql("select word,count(*) as cnt from words group by word order by cnt desc ").show()

# TODO 2: DSL风格处理
df = spark.read.format("text").load("../data/input/words.txt")

# withColumn方法
# 方法功能:对已存在的列进行操作,返回一个新的列,如果名字和老列相同,那么替换,否则作为新列存在
df2 = df.withColumn("value", F.explode(F.split(df['value'], " ")))
df2.groupBy("value"). \
count(). \
withColumnRenamed("value", "word"). \
withColumnRenamed("count", "cnt"). \
orderBy("cnt", ascending=False). \
show()

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
+------+---+
| word|cnt|
+------+---+
| hello| 3|
| spark| 1|
| flink| 1|
|hadoop| 1|
+------+---+

+------+---+
| word|cnt|
+------+---+
| hello| 3|
| spark| 1|
|hadoop| 1|
| flink| 1|
+------+---+

3.5 电影数据分析

需求:

  1. 查询用户平均分
  2. 查询电影平均分
  3. 查询大于平均分的电影的数量
  4. 查询高分电影中(>3)打分次数最多的用户,并求出此人打的平均分
  5. 查询每个用户文档平均打分,最低打分,最高打分
  6. 查询被评分超过100次的电影,的平均分排名Top100

代码演示:12_movie_demo.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("12_movie_demo.py"). \
master("local[*]"). \
config("spark.sql.shuffle.partitions", 2). \
getOrCreate()
sc = spark.sparkContext

"""
spark.sql.shuffle.partitions 参数指的是,在sql计算中,shuffle算子阶段默认的分区数是200个,
对于集群模式来说,200个默认也算比较合适
如果在local下运行,200个很多,在调度上会带来额外损耗
所以在local下建议修改比较低 比如2/4/10均可
这个参数和Spark RDD中设置并行度的参数 是相互独立的
"""

# 1. 读取数据集
schema = StructType().add("user_id", StringType(), nullable=True). \
add("movie_id", IntegerType(), nullable=True). \
add("rank", IntegerType(), nullable=True). \
add("ts", StringType(), nullable=True)

df = spark.read.format("csv"). \
option("sep", "\t"). \
option("header", False). \
option("encoding", "utf-8"). \
schema(schema=schema). \
load("../data/input/sql/u.data")

# df.show()

# TODO 1:用户平均分的计算
# DSL实现
df.groupBy("user_id"). \
avg("rank"). \
withColumnRenamed("avg(rank)", "avg_rank"). \
withColumn("avg_rank", F.round("avg_rank", 2)). \
orderBy("avg_rank", ascending=False). \
show()
#
# # SQL实现
# df.createTempView("movie")
# spark.sql("""
# select user_id,round(avg(rank),2) as avg_rank from movie group by user_id order by avg_rank desc
# """).show()
print("*" * 200)

# TODO 2: 电影的平均分查询
df.createTempView("movie")
spark.sql("""
select movie_id,round(avg(rank),2) as avg_rank from movie group by movie_id order by avg_rank desc
""").show()
print("*" * 200)

# TODO 3: 查询大于平均分的电影的数量
# DSL实现
above_average_scores = df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count()
print("大于平均分电影数量数:", above_average_scores)

# # SQL实现
# df.createTempView("movie")
# spark.sql("""
# select count(user_id) as above_average_scores from movie
# where rank >(
# select avg(rank) as avg_rank from movie
# )
#
# """).show()
print("*" * 200)

# TODO 4: 查询高分电影中(>3)打分次数最多的用户,此人打分的平均分
# DSL实现
# 先找出这个人
user_id = df.where("rank>3"). \
groupBy("user_id"). \
count(). \
withColumnRenamed("count", "cnt"). \
orderBy("cnt", ascending=False). \
first()["user_id"]

print(user_id)
# 计算这个人的打分平均分
df.filter(df['user_id'] == user_id). \
select(F.round(F.avg("rank"), 2)).show()
#
# # SQL实现
# df.createTempView("movie")
# spark.sql("""
# select round(avg(rank),2) from movie where user_id = (
# select user_id from(
# select user_id,count(user_id) as cnt from movie where rank > 3 group by user_id order by cnt desc limit 1
# )
# )
# """).show()
print("*" * 200)

# TODO 5:查询每个用户的平均打分,最低打分,最高打分
# DSL实现
df.groupBy("user_id"). \
agg(
F.round(F.avg("rank"), 2).alias("avg_rank"),
F.min("rank").alias("min_rank"),
F.max("rank").alias("max_rank")
).show()
#
# # SQL实现
# df.createTempView("movie")
# spark.sql("""
# select user_id,round(avg(rank),2) as avg_rank,min(rank) as min_rank,max(rank) as max_rank from movie
# group by user_id order by user_id
# """).show() # 默认只显示20条
print("*" * 200)

# TODO 6: 查询评分超过100次的电影,的平均分 排名Top10

# DSL实现
df.groupBy("movie_id"). \
agg(
F.count("movie_id").alias("cnt"),
F.round(F.avg("rank"), 2).alias("avg_rank")
).where("cnt > 100"). \
orderBy("avg_rank", ascending=False). \
limit(10). \
show()

# # SQL实现
# df.createTempView("movie")
# spark.sql("""
# select movie_id,m_cnt,avg_rank from
# (select movie_id,count(movie_id) as m_cnt,round(avg(rank),2) as avg_rank from movie group by movie_id)
# where m_cnt > 100 order by avg_rank desc limit 10
# """).show()

time.sleep(100000)

"""
1. agg: 它是GroupData对象的API,作用是 在里面可以写多个聚合
2. alias: 它是Column对象的API,可以针对一个列 进行改名
3. withColumnRenamed:它是DataFrame的API,可以对DF中的列进行改名,一次改一个列,改多个列 可以链式调用
4. orderBy:DataFrame的API,进行排序,参数1是被排序的列,参数2是 升序(True) 或降序 False
5. first:DataFrame的API,取出DF的第一行数据,返回结果是Row对象
# Row对象 就是一个数组,你可以通过row['列名']来取出当前行中,某一列的具体数值,
# 返回值不再是DF或者GroupData或者Column,而是具体的字符串或者数字
"""

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
+-------+--------+
|user_id|avg_rank|
+-------+--------+
| 849| 4.87|
| 688| 4.83|
| 507| 4.72|
| 628| 4.7|
| 928| 4.69|
| 118| 4.66|
| 907| 4.57|
| 686| 4.56|
| 427| 4.55|
| 565| 4.54|
| 469| 4.53|
| 850| 4.53|
| 225| 4.52|
| 330| 4.5|
| 477| 4.46|
| 242| 4.45|
| 636| 4.45|
| 583| 4.44|
| 252| 4.43|
| 767| 4.43|
+-------+--------+
only showing top 20 rows

********************************************************************************************************************************************************************************************************
+--------+--------+
|movie_id|avg_rank|
+--------+--------+
| 1500| 5.0|
| 1201| 5.0|
| 1189| 5.0|
| 1536| 5.0|
| 1293| 5.0|
| 1653| 5.0|
| 1599| 5.0|
| 1467| 5.0|
| 1122| 5.0|
| 814| 5.0|
| 1449| 4.63|
| 119| 4.5|
| 1398| 4.5|
| 1594| 4.5|
| 1642| 4.5|
| 408| 4.49|
| 318| 4.47|
| 169| 4.47|
| 483| 4.46|
| 64| 4.45|
+--------+--------+
only showing top 20 rows

********************************************************************************************************************************************************************************************************
大于平均分电影数量数: 55375
********************************************************************************************************************************************************************************************************
450
+-------------------+
|round(avg(rank), 2)|
+-------------------+
| 3.86|
+-------------------+

********************************************************************************************************************************************************************************************************
+-------+--------+--------+--------+
|user_id|avg_rank|min_rank|max_rank|
+-------+--------+--------+--------+
| 186| 3.41| 1| 5|
| 244| 3.65| 1| 5|
| 200| 4.03| 2| 5|
| 210| 4.06| 2| 5|
| 303| 3.37| 1| 5|
| 122| 3.98| 1| 5|
| 194| 2.96| 1| 5|
| 291| 3.69| 1| 5|
| 119| 3.95| 1| 5|
| 167| 3.38| 1| 5|
| 299| 3.46| 1| 5|
| 102| 2.62| 1| 4|
| 160| 3.92| 1| 5|
| 225| 4.52| 2| 5|
| 290| 3.35| 1| 5|
| 97| 4.16| 1| 5|
| 157| 3.78| 1| 5|
| 201| 3.03| 1| 5|
| 287| 4.11| 1| 5|
| 246| 2.93| 1| 5|
+-------+--------+--------+--------+
only showing top 20 rows

********************************************************************************************************************************************************************************************************
+--------+---+--------+
|movie_id|cnt|avg_rank|
+--------+---+--------+
| 408|112| 4.49|
| 169|118| 4.47|
| 318|298| 4.47|
| 483|243| 4.46|
| 64|283| 4.45|
| 603|209| 4.39|
| 12|267| 4.39|
| 50|583| 4.36|
| 178|125| 4.34|
| 427|219| 4.29|
+--------+---+--------+

3.3.6 SparkSQL Shuffle 分区数目

运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。image-20230909151638832

原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions) 为200,在实际项目中要合理的设置。

可以设置在:

  1. 配置文件: conf/spark-defaults.conf: spark.sql.shuffle.partitions 100

  2. 在客户端提交参数中:bin/spark-submit –conf “spark.sql.shuffle.partitions = 100”

  3. 在代码中可以设置:

    1
    2
    3
    4
    5
    spark = SparkSession.builder. \
    appName("12_movie_demo.py"). \
    master("local[*]"). \
    config("spark.sql.shuffle.partitions", 2). \
    getOrCreate()

3.3.7 SparkSQL 数据清洗API

前面我们处理的数据实际上都是已经被处理好的规整数据,但是在大数据整个生产过程中,需要先对数据进行数据清洗,将杂乱无章的数据整理为符合后面处理要求的规整数据。

去重方法:dropDuplicates

功能:对DF的数据进行去重,如果重复数据有多条,取第一条

1
2
3
4
# 去重API dropDuplicates,无参数是对数据进行整体去重
df.dropDuplicates().show()
# API 同样可以针对字段进行去重,如下传入age字段,表示只要年龄一样 就认为你是重复数据
df.dropDuplicates(['age','job']).show()

删除有缺失值的行方法 :dropna

功能:如果数据中包含null,通过dropna来进行判断,符合条件就删除这一行数据

1
2
3
4
5
6
7
8
9
# 如果有缺失值,进行数据删除
# 无参数 为 how=any执行,只要有一个列是null 数据整行删除,如果填入how='all'表示全部列为空 才会删除,how参数默认是any
df.dropna().show()
# 指定阀值进行删除,tresh=3 表示,有效的列最少有3个,这行数据才保留
# 设定thresh后,how参数无效了
df.dropna(thresh=3).show()
# 可以指定阀值 以及配合指定列进行工作
# thresh=2,subset=['name','age']表示针对 这两个列,有效列最少为2个才保留数据
df.dropna(thresh=2,subset=['name','age']).show()

填充缺失值数据 fillna

功能:根据参数的规则,来进行null替换

1
2
3
4
5
6
# 将所有的空,按照你指定的值进行填充,不理会列 任何空都被填充
df.fillna("loss").show()
# 指定列进行填充
df.fillna("loss",subset=['job']).show()
# 给定字典 设定各个列的填充规则
df.fillna({"name":"未知姓名","age":1,"job":"worker"}).show()

完整代码演示:13_data_clear_api.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("13_data_clear_api.py"). \
master("local[*]"). \
getOrCreate()
sc = spark.sparkContext

"""读取数据"""
df = spark.read.format("csv"). \
option("sep", ";"). \
option("header", True). \
load("../data/input/sql/people.csv")

# 数据清洗1:数据去重
# dropDuplicates 是DataFrame的API,可以完成数据去重
# 无参数使用,对全部的列 联合起来进行比较,去除重复值,只保留一条
df.dropDuplicates().show()
df.dropDuplicates(['age', 'job']).show() # 按指定列去重

# 数据清洗2:缺失值处理
# dropna api 是可以对缺失值数据进行删除
# 无参数使用,只要列中有null,就删除这一行数据
# df.dropna().show()
# # thresh = 3 表示,最少满足3个有效列,不满足则删除当前行数据
# df.dropna(thresh=3).show()
# df.dropna(thresh=2, subset=['name', 'age']).show()

# 缺失值处理也可以完成对缺失值进行填充
# DataFrame的fillna 对缺失值的列进行填充
# 全局填充
df.fillna("loss").show()
# # 指定列进行填充
# df.fillna("N/A", subset=['job']).show()
# # 设定一个字典,对所有的列提供填充规则
# df.fillna({"name": "未知姓名", "age": 1, "job": "worker"}).show()


输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
+-----+----+---------+
| name| age| job|
+-----+----+---------+
|Alice| 9| null|
| Ani| 11|Developer|
|Alice| 9| Manager|
|Alice|null| Manager|
| Lily| 11| Manager|
| Put| 11|Developer|
|Jorge| 30|Developer|
| Bob| 32|Developer|
+-----+----+---------+

+-----+----+---------+
| name| age| job|
+-----+----+---------+
|Alice|null| Manager|
| Ani| 11|Developer|
| Lily| 11| Manager|
|Jorge| 30|Developer|
| Bob| 32|Developer|
|Alice| 9| null|
|Alice| 9| Manager|
+-----+----+---------+

+-----+----+---------+
| name| age| job|
+-----+----+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice|loss| Manager|
|Alice| 9| loss|
+-----+----+---------+

3.3.8 DataFrame数据写出

统一API语法:

1
2
3
4
5
6
df.write.mode().format().option(K,V).save(PATH)
# mode,传入模式字符串可选:append追加,overwrite覆盖,ignore忽略,error重复就报异常(默认的)
# format,传入格式字符串,可选:text,csv,json,parquet,orc,avro,jdbc
# 注意text源只支持单列df写出
# option设置属性,如:.option("sep",",")
# save 写出的路径,支持本地文件和HDFS

完整代码演示:14_dataframe_write.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("14_dataframe_write.py"). \
master("local[*]"). \
getOrCreate()
sc = spark.sparkContext

# 1.读取数据集
schema = StructType().add("user_id", StringType(), nullable=True). \
add("movie_id", IntegerType(), nullable=True). \
add("rank", IntegerType(), nullable=True). \
add("ts", StringType(), nullable=True)

df = spark.read.format("csv"). \
option("sep", "\t"). \
option("header", False). \
option("encoding", "utf-8"). \
schema(schema=schema). \
load("../data/input/sql/u.data")

# Write Text写出,只能写出一个列的数据,需要将df转换为单列df
df.select(F.concat_ws("---", "user_id", "movie_id", "rank", "ts")). \
write. \
mode("overwrite"). \
format("text"). \
save("../data/output/sql/text")

# Write csv
df.write.mode("overwrite"). \
format("csv"). \
option("sep", ";"). \
option("header", True). \
save("../data/output/sql/csv")

# write json
df.write.mode("overwrite").\
format("json").\
save("../data/output/sql/json")

# write parquet
df.write.mode("overwrite").\
format("parquet").\
save("../data/output/sql/parquet")


3.3.9 DataFrame 通过JDBC读写数据库(MySQL示例)

读取JDBC是需要有驱动的,我们读取的是jdbc:mysql://这个协议,也就是读取的是mysql的数据

既然如此,就需要有msyql的驱动jar包给spark程序用

如果不给驱动jar包,会提示No suitable Driver

image-20230909194609652

对于windows系统(使用本地解释器)(以Anaconda环境演示)

将jar包放在:Anaconda3的安装路径下\envs\虚拟环境\Lib\site-packages\pyspark\jars

对于Linux系统(使用远程解释器执行)(以Anaconda环境演示)

将jar包放在:Anaconda3的安装路径下/envs/虚拟环境/lib/python3.8/site-packages/pyspark/jars

代码演示:15_dataframe_jdbc.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("15_dataframe_jdbc.py"). \
master("local[*]"). \
getOrCreate()
sc = spark.sparkContext

# 1.读取数据集
schema = StructType().add("user_id", StringType(), nullable=True). \
add("movie_id", IntegerType(), nullable=True). \
add("rank", IntegerType(), nullable=True). \
add("ts", StringType(), nullable=True)

df = spark.read.format("csv"). \
option("sep", "\t"). \
option("header", False). \
option("encoding", "utf-8"). \
schema(schema=schema). \
load("../data/input/sql/u.data")

# # # 1. 写出df到msyql数据库中
# df.write.mode("overwrite"). \
# format("jdbc"). \
# option("url", "jdbc:mysql://Tnode1:3306/bigdata?useSSL=false&useUnicode=true"). \
# option("dbtable", "movie_data"). \
# option("user", "root"). \
# option("password", "123456"). \
# save()

# 2. 通过jdbc从mysql中读取数据
df2 = spark.read.format("jdbc"). \
option("url", "jdbc:mysql://Tnode1:3306/bigdata?useSSl=false&useUnicode=true"). \
option("dbtable", "movie_data"). \
option("user", "root"). \
option("password", "123456"). \
load()
df2.printSchema()
df2.show()

"""
JDBC写出,会自动创建表的。
因为DadaFrame中有表结构信息,StructType记录的各个字段的名称类型和是否运行为空
"""

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
root
|-- user_id: string (nullable = true)
|-- movie_id: integer (nullable = true)
|-- rank: integer (nullable = true)
|-- ts: string (nullable = true)

+-------+--------+----+---------+
|user_id|movie_id|rank| ts|
+-------+--------+----+---------+
| 196| 242| 3|881250949|
| 186| 302| 3|891717742|
| 22| 377| 1|878887116|
| 244| 51| 2|880606923|
| 166| 346| 1|886397596|
| 298| 474| 4|884182806|
| 115| 265| 2|881171488|
| 253| 465| 5|891628467|
| 305| 451| 3|886324817|
| 6| 86| 3|883603013|
| 62| 257| 2|879372434|
| 286| 1014| 5|879781125|
| 200| 222| 5|876042340|
| 210| 40| 3|891035994|
| 224| 29| 3|888104457|
| 303| 785| 3|879485318|
| 122| 387| 5|879270459|
| 194| 274| 2|879539794|
| 291| 1042| 4|874834944|
| 234| 1184| 2|892079237|
+-------+--------+----+---------+
only showing top 20 rows

注意:

  • jdbc连接字符串中,建议使用 useSSL=false 确保连接可以正常连接 (不使用SSL安全协议进行连接)

  • jdbc连接字符串中,建议使用 useUnicode=true 来确保传输中不出现乱码

  • save()不要填参数,没有路径,是写出数据库

  • dbtable属性:指定写出的表名

  • 读出来的是自带schema,不需要设置schema,因为数据库就有schema

  • load()不需要加参数,没有路径,从数据库汇总读取的

  • dbtable:是指定读取的表名

3.3.10 总结

  1. DataFrame 在结构层面上由StructField组成列描述,由StructType构造表描述。在数据层面上,Column对象记录列数据,Row对象记录行数据

  2. DataFrame可以从RDD转换、Pandas DF转换、读取文件、读取JDBC等方法构建

  3. spark.read.format()df.write.format() 是DataFrame读取和写出的统一化标准API

  4. SparkSQL默认在Shuffle阶段200个分区,可以修改参数获得最好性能

  5. dropDuplicates可以去重、dropna可以删除缺失值、fillna可以填充缺失值

  6. SparkSQL支持JDBC读写,可用标准API对数据库进行读写操作

3.4 SparkSQL函数定义

3.4.1 SparkSQL定义UDF函数

无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.functions中。SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛

回顾Hive中自定义函数有三种类型:

  • 第一种:UDF(User-Defined-Function)函数

​ 一对一的关系,输入一个值,经过函数以后输出一个值;

​ 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;

  • 第二种:UDAF(User-Defined Aggregation Function)聚合函数

​ 多对一的关系,输入个值输出一个值,通常与groupBy联合使用;

  • 第三种:UDTF(User-Defined Table-Generating Functions)函数

​ 一对多的关系,输入一个值输出多个值(一行变为多行)

​ 用户自定义生成函数,有点像flatMap;


目前来说Spark框架各个版本及各种语言对自定义函数的支持:

image-20231013153818332

在SparkSQL中,目前仅仅支持UDF函数和UDAF函数。目前Python仅支持UDF。


UDF定义方式有两种:

  1. sparksession.udf.register()注册的UDF可以用于DSL和SQL,返回值用于DSL风格,传参内给的名字用于SQL风格
  2. pyspark.sql.functions.udf,仅能用于DSL风格

代码案例:16_udf_define.py

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("16_udf_define.py"). \
master("local[*]"). \
getOrCreate()
sc = spark.sparkContext

# 构建一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x: [x])
df = rdd.toDF(["num"])


# TODO 1:方式1 sparksession.udf.register(),DSL和SQL风格均可以使用
# UDF的处理方法
def num_multi_10(num):
return num * 10


# 参数1:注册的UDF名称,这个udf名称,仅可以用于SQL风格
# 参数2:UDF的处理逻辑,是一个单独的方法
# 参数3:声明UDF的返回值类型,注意:UDF注册时候,必须声明返回值类型,并且UDF的真实返回值一定要和声明的返回值一致
# 返回值对象:这是一个UDF对象,仅可以用于DSL语法
# 当前这种方式定义的UDF,可以通过参数1的名称用于SQL风格,通过返回值对象用于DSL风格
udf2 = spark.udf.register("udf1", num_multi_10, IntegerType())

# SQL风格中使用
# selectExpr 以SELECT的表达式执行,SQL风格的表达式(字符串)
# select方法,接收普通的字符串字段名,或者返回值是Column对象的计算
df.selectExpr("udf1(num)").show()

# DSL风格中使用
# 返回值UDF对象 如果作为方法使用,传入的参数一定是Column对象
df.select(udf2(df['num'])).show()

# TODO 2:方式2注册,仅能用于DSL风格
udf3 = F.udf(num_multi_10,IntegerType())
df.select(udf3(df['num'])).show()

代码运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
+---------+
|udf1(num)|
+---------+
| 10|
| 20|
| 30|
| 40|
| 50|
| 60|
| 70|
+---------+

+---------+
|udf1(num)|
+---------+
| 10|
| 20|
| 30|
| 40|
| 50|
| 60|
| 70|
+---------+

+-----------------+
|num_multi_10(num)|
+-----------------+
| 10|
| 20|
| 30|
| 40|
| 50|
| 60|
| 70|
+-----------------+

注册一个返回值为ArrayType(数字\list)类型的UDF

代码案例:17_udf_return_array.py

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("17_udf_return_array.py"). \
master("local[*]"). \
getOrCreate()
sc = spark.sparkContext

# 构建一个RDD
rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
df = rdd.toDF(["line"])


# 注册UDF,UDF的执行函数定义
def split_line(data):
return data.split(" ") # 返回值是一个Array对象


# TODO 1: 方式1 构建UDF
udf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))

# DSL风格
df.select(udf2(df['line'])).show()

# SQL风格
df.createTempView("lines")
spark.sql("SELECT udf1(line) FROM lines").show(truncate=False) # truncate=False不切割字符串,全部显示出来

# TODO 2:方式2的形式构建UDF
udf3 = F.udf(split_line, ArrayType(StringType()))
df.select(udf3(df['line'])).show(truncate=False)

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
+--------------------+
| udf1(line)|
+--------------------+
|[hadoop, spark, f...|
|[hadoop, flink, j...|
+--------------------+

+----------------------+
|udf1(line) |
+----------------------+
|[hadoop, spark, flink]|
|[hadoop, flink, java] |
+----------------------+

+----------------------+
|split_line(line) |
+----------------------+
|[hadoop, spark, flink]|
|[hadoop, flink, java] |
+----------------------+

注意:数组或者list类型,可以使用spark的ArrayType来描述即可

声明ArrayType要类似这样:ArrayType(StringType()),在ArrayType中传入数组内的数据类型


注册一个返回值为字典类型的UDF

代码案例:18_udf_return_dict.py

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# coding:utf8
import string

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("18_udf_return_dict.py"). \
master("local[*]"). \
getOrCreate()
sc = spark.sparkContext

# 假设 有三个数字 1 2 3 ,我们传入数字,返回数字所在序号对应的 字母 然后和数字结合形成dict返回
# 比如传入1 我们返回{"num":1,"letters":"a"}
rdd = sc.parallelize([[1], [2], [3]])
df = rdd.toDF(["num"])


# 注册UDF
def process(data):
return {"num": data, "letters": string.ascii_letters[data]}


"""
UDF的返回值是字典的话,需要用StructType来接收
"""
udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True). \
add("letters", StringType(), nullable=True)
)
df.selectExpr("udf1(num)").show(truncate=False)
df.select(udf1(df['num'])).show(truncate=False)

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+---------+
|udf1(num)|
+---------+
|{1, b} |
|{2, c} |
|{3, d} |
+---------+

+---------+
|udf1(num)|
+---------+
|{1, b} |
|{2, c} |
|{3, d} |
+---------+

注意:字典类型返回值,可以用StructType来进行描述

StructType是一个普通的Spark支持的结构化类型

只是可以用在:

  • DF中用于描述Schema
  • UDF中用于描述返回值是字典的数据

用mapPartitions API 完成UDAF聚合

代码案例:19_udaf_by_rdd.py

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# coding:utf8
import string

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("19_udaf_by_rdd.py"). \
master("local[*]"). \
getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
df = rdd.map(lambda x: [x]).toDF(['num'])

# 折中的方式 就是使用RDD的mapPartitions 算子来完成聚合操作
# 如果用mapPartitions API 完成UDAF聚合,一定要单分区
single_partition_rdd = df.rdd.repartition(1)


# print(single_partition_rdd.collect())

def process(iter):
sum = 0
for row in iter:
sum += row['num']
return [sum] # 一定要嵌套list,因为mapPartitions方法要求的返回值是list对象


print(single_partition_rdd.mapPartitions(process).collect())

运行结果:

1
[15]

注意:

使用UDF两种方式的注册均可以。

唯一需要注意的就是:返回值类型一定要有合适的类型来声明

返回 int 可以用IntergerType

返回小数,可以用FloatType或者DoubleType

返回数组list 可以用ArrayType描述

返回字典 可以用StructType描述

…..

这些Spark内置的数据类型均存储在:

pyspark.sql.types包中。

3.4.2 SparkSQL使用函数窗口

开窗函数

开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。
开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GROUP BY子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

窗口函数的语法:

image-20231110144853033

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# coding:utf8
import string

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType

if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder. \
appName("20_window_function.py"). \
master("local[*]"). \
getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize([
('张1', 'class_1', 99),
('张2', 'class_2', 35),
('张3', 'class_3', 57),
('张4', 'class_4', 12),
('张5', 'class_5', 99),
('张6', 'class_1', 90),
('张7', 'class_2', 91),
('张8', 'class_3', 33),
('张9', 'class_4', 55),
('张10', 'class_5', 66),
('张11', 'class_1', 11),
('张12', 'class_2', 33),
('张13', 'class_3', 36),
('张14', 'class_4', 79),
('张15', 'class_5', 90),
('张16', 'class_1', 90),
('张17', 'class_2', 90),
('张18', 'class_3', 11),
('张19', 'class_1', 11),
('张20', 'class_2', 3),
('张21', 'class_3', 99),
])
schema = StructType().add("name", StringType()). \
add("class", StringType()). \
add("score", IntegerType())

df = rdd.toDF(schema)
df.createTempView("stu")

# TODO 聚合窗口函数的演示
spark.sql("""
select *,avg(score) over() as avg_score from stu
""").show()

# TODO 排序相关的窗口函数计算
# RANK over,DENSE_RANK over ROW_NUMBER over
spark.sql("""
SELECT *,ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
RANK() OVER(ORDER BY score) AS rank
FROM stu
""").show()

# TODO NTILE
spark.sql("""
select * ,NTILE(6) OVER(ORDER BY score desc) AS NTILE_result from stu
""").show()



输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
+----+-------+-----+------------------+
|name| class|score| avg_score|
+----+-------+-----+------------------+
| 张1|class_1| 99|56.666666666666664|
| 张2|class_2| 35|56.666666666666664|
| 张3|class_3| 57|56.666666666666664|
| 张4|class_4| 12|56.666666666666664|
| 张5|class_5| 99|56.666666666666664|
| 张6|class_1| 90|56.666666666666664|
| 张7|class_2| 91|56.666666666666664|
| 张8|class_3| 33|56.666666666666664|
| 张9|class_4| 55|56.666666666666664|
|张10|class_5| 66|56.666666666666664|
|张11|class_1| 11|56.666666666666664|
|张12|class_2| 33|56.666666666666664|
|张13|class_3| 36|56.666666666666664|
|张14|class_4| 79|56.666666666666664|
|张15|class_5| 90|56.666666666666664|
|张16|class_1| 90|56.666666666666664|
|张17|class_2| 90|56.666666666666664|
|张18|class_3| 11|56.666666666666664|
|张19|class_1| 11|56.666666666666664|
|张20|class_2| 3|56.666666666666664|
+----+-------+-----+------------------+
only showing top 20 rows


+----+-------+-----+---------------+----------+----+
|name| class|score|row_number_rank|dense_rank|rank|
+----+-------+-----+---------------+----------+----+
| 张1|class_1| 99| 1| 1| 19|
| 张6|class_1| 90| 5| 2| 14|
|张16|class_1| 90| 7| 2| 14|
|张11|class_1| 11| 18| 3| 2|
|张19|class_1| 11| 20| 3| 2|
| 张7|class_2| 91| 4| 1| 18|
|张17|class_2| 90| 8| 2| 14|
| 张2|class_2| 35| 14| 3| 8|
|张12|class_2| 33| 16| 4| 6|
|张20|class_2| 3| 21| 5| 1|
|张21|class_3| 99| 3| 1| 19|
| 张3|class_3| 57| 11| 2| 11|
|张13|class_3| 36| 13| 3| 9|
| 张8|class_3| 33| 15| 4| 6|
|张18|class_3| 11| 19| 5| 2|
|张14|class_4| 79| 9| 1| 13|
| 张9|class_4| 55| 12| 2| 10|
| 张4|class_4| 12| 17| 3| 5|
| 张5|class_5| 99| 2| 1| 19|
|张15|class_5| 90| 6| 2| 14|
+----+-------+-----+---------------+----------+----+
only showing top 20 rows

+----+-------+-----+------------+
|name| class|score|NTILE_result|
+----+-------+-----+------------+
| 张1|class_1| 99| 1|
| 张5|class_5| 99| 1|
|张21|class_3| 99| 1|
| 张7|class_2| 91| 1|
| 张6|class_1| 90| 2|
|张15|class_5| 90| 2|
|张16|class_1| 90| 2|
|张17|class_2| 90| 2|
|张14|class_4| 79| 3|
|张10|class_5| 66| 3|
| 张3|class_3| 57| 3|
| 张9|class_4| 55| 3|
|张13|class_3| 36| 4|
| 张2|class_2| 35| 4|
| 张8|class_3| 33| 4|
|张12|class_2| 33| 5|
| 张4|class_4| 12| 5|
|张11|class_1| 11| 5|
|张18|class_3| 11| 6|
|张19|class_1| 11| 6|
+----+-------+-----+------------+

3.4.3 总结

  1. SparkSQL支持UDF和UDAF定义,但在Python中,暂时只能定义UDF
  2. UDF定义支持2种方式,1:使用SparkSession对象构建。2:使用functions包中提供的UDF API构建。要注意,方式1可用DSL和SQL风格,方式2可用于DSL风格。
  3. SparkSQL支持窗口函数使用,常用SQL中的窗口函数均支持,如聚合窗口、排序窗口、NTILE分组窗口等。