# PySpark 介绍
# 定义
- Spark 对 Python 语言的支持,重点体现在,PySpark 库上
- PySpark 作为 Python 库进行数据处理
- 提交至 Spark 集群进行分布式集群 1 计算
# 使用
| |
| from pyspark import SparkConf, SparkContext |
| |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") |
| |
| sc = SparkContext(conf=conf) |
| |
| print(sc.version) |
| |
| sc.stop() |
# PySpark 的编程模型
- SparkContext 类对象,是 PySpark 编程中一切功能的入口
- PySpark 编程,主要分为三大步骤
- 数据输入:通过 SparkContext 类对象的成员方法,完成数据读取操作,读取后得到 RDD 类对象
- 数据处理计算:通过 RDD 类对象的成员方法,完成各类数据计算的需求
- 数据输出:将处理完的 RDD 对象,调用各种成员方法完成写出文件、转换为 list 等操作
![image-20230427194111746]()
# 数据输入
# RDD 对象
# 介绍
- RDD 全称:弹性分布式数据集(Resilient Distributed DataSets)
- PySpark 针对数据处理,都是以 RDD 对象作为载体
- 数据存储在 RDD 内
- 各类数据的计算方法,也都是以 RDD 的成员方法
- RDD 的数据计算方法,返回值依然是 RDD 对象
![image-20230427194603214]()
# Python 数据容器转换 RDD 对象
PySpark 支持通过 SparkContext 对象的 parallelize 成员方法,将 list,tuple,set,dict,str 转换为 PySpark 的 RDD 对象
| from pyspark import SparkConf, SparkContext |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") |
| sc = SparkContext(conf=conf) |
| rdd = sc.parallelize(数据容器对象) |
| |
| print(rdd.collect()) |
- 字符串会被拆分出 1 个个字符,存储 RDD 对象
- 字典仅有 key 会被存入 RDD 对象
# 读取文件转 RDD 对象
PySpark 也支持通过 SparkContext 入口对象,来读取文件,来构建出 RDD 对象
| from pyspark import SparkConf, SparkContext |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") |
| sc = SparkContext(conf=conf) |
| rdd = sc.textFile(文件路径) |
| |
| print(rdd.collect()) |
# 数据计算
# map 方法
# map 算子
功能:map 算子,是将 RDD 的数据一条条处理(处理逻辑,基于 map 算子中接收的处理函数),返回新的 RDD
| from pyspark import SparkConf, SparkContext |
| import os |
| os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe' |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name") |
| sc = SparkContext(conf=conf) |
| list1 = [1,2,3] |
| rdd = sc.parallelize(list1) |
| rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5) |
| print(rdd2.collect()) |
| sc.stop() |
# flatMap 方法
# flatMap 算子
功能:对 rdd 执行 map 操作,然后执行解除嵌套操作
| |
| lst = [[1,2,3], [4,5,6]] |
| |
| lst = [1,2,3,4,5,6] |
# 代码
| from pyspark import SparkConf, SparkContext |
| import os |
| os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe' |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name") |
| sc = SparkContext(conf=conf) |
| list1 = ["baozi 666", "baobao 521"] |
| rdd = sc.parallelize(list1) |
| rdd2 = rdd.flatMap(lambda x: x.split(" ")) |
| print(rdd2.collect()) |
| sc.stop() |
# reeduceByKey 算法
# reduceByKey 算子
功能:针对 KV 型 RDD,自动按照 key 分组,然后根据你提供的聚合逻辑,完成 == 组内数据 (value)== 的聚合操作
# 代码
| from pyspark import SparkConf, SparkContext |
| import os |
| os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe' |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name") |
| sc = SparkContext(conf=conf) |
| rdd = sc.parallelize([('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b', 1)]) |
| rdd2 = rdd.reduceByKey(lambda a, b: a + b) |
| print(rdd2.collect()) |
| |
| sc.stop() |
# filter 方法
# 介绍
功能:过滤想要的数据进行保留
# 代码
| from pyspark import SparkConf, SparkContext |
| import os |
| os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe' |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name") |
| sc = SparkContext(conf=conf) |
| rdd = sc.parallelize([1, 2, 3, 4, 5]) |
| rdd2 = rdd.filter(lambda x: x % 2 == 0) |
| print(rdd2.collect()) |
| sc.stop() |
# distinct 方法
# distinct 算子
功能:对 RDD 数据进行去重,返回新的 rdd
# 代码
| from pyspark import SparkConf, SparkContext |
| import os |
| os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe' |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name") |
| sc = SparkContext(conf=conf) |
| rdd = sc.parallelize([1, 2, 1, 4, 3]) |
| rdd2 = rdd.distinct() |
| print(rdd2.collect()) |
| sc.stop() |
# sortBy 方法
# sortBy 算子
功能:对 RDD 数据进行排序,基于你指定的排序依据
| rdd.sortBy(func, ascending=False, numPartion=1) |
| |
| |
| |
# 代码
| from pyspark import SparkConf, SparkContext |
| import os |
| os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe' |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name") |
| sc = SparkContext(conf=conf) |
| rdd = sc.parallelize([1, 2, 1, 4, 3]) |
| rdd2 = rdd.sortBy(lambda x: x,ascending=False, numPartitions=1) |
| print(rdd2.collect()) |
| sc.stop() |
# 数据输出
# 输出为 Python 对象
# collect 算子
功能:将 RDD 各个分区内数据,统一收集到 Driver 中,形成一个 List 对象
# reduce 算子
功能:对 RDD 数据集按照传入的逻辑进行聚合
| from pyspark import SparkConf, SparkContext |
| import os |
| os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe' |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name") |
| sc = SparkContext(conf=conf) |
| rdd = sc.parallelize(range(1, 10)) |
| num = rdd.reduce(lambda a, b: a + b) |
| print(num) |
| sc.stop() |
# take 算子
功能:取 RDD 的前 N 个元素,组合成 list 返回给你
| from pyspark import SparkConf, SparkContext |
| import os |
| os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe' |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name") |
| sc = SparkContext(conf=conf) |
| rdd = sc.parallelize(range(1, 10)) |
| take_list = rdd.take(5) |
| print(take_list) |
| sc.stop() |
# count 算子
功能:返回 RDD 中有多少个元素
| from pyspark import SparkConf, SparkContext |
| import os |
| os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe' |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name") |
| sc = SparkContext(conf=conf) |
| rdd = sc.parallelize(range(1, 10)) |
| count = rdd.count() |
| print(count) |
| sc.stop() |
# 输出到文件中
# saveTextFile 算子
功能:将 RDD 的数据写入文本文件中,默认 16 个分区
| from pyspark import SparkConf, SparkContext |
| import os |
| os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe' |
| os.environ['HADOOP_HOME'] = 'D:/python/hadoop/hadoop-3.0.0' |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name") |
| conf.set("spark.default.parallelism", "1") |
| sc = SparkContext(conf=conf) |
| rdd = sc.parallelize(range(1, 10)) |
| rdd.saveAsTextFile("./123") |
| sc.stop() |
| from pyspark import SparkConf, SparkContext |
| import os |
| os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe' |
| os.environ['HADOOP_HOME'] = 'D:/python/hadoop/hadoop-3.0.0' |
| conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name") |
| sc = SparkContext(conf=conf) |
| rdd = sc.parallelize(range(1, 10), 1) |
| rdd.saveAsTextFile("./123") |
| sc.stop() |