# PySpark 介绍

# 定义

  1. Spark 对 Python 语言的支持,重点体现在,PySpark 库上
  2. PySpark 作为 Python 库进行数据处理
  3. 提交至 Spark 集群进行分布式集群 1 计算

# 使用

# 导包
from pyspark import SparkConf, SparkContext
# 创建 SparkConf 类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于 SparkConf 类对象创建 SparkContext 对象
sc = SparkContext(conf=conf)
# 打印 PySpark 的运行版本
print(sc.version)
# 停止 SparkContext 对象的运行 (停止 PySpark 程序)
sc.stop()

# PySpark 的编程模型

  1. SparkContext 类对象,是 PySpark 编程中一切功能的入口
  2. PySpark 编程,主要分为三大步骤
    1. 数据输入:通过 SparkContext 类对象的成员方法,完成数据读取操作,读取后得到 RDD 类对象
    2. 数据处理计算:通过 RDD 类对象的成员方法,完成各类数据计算的需求
    3. 数据输出:将处理完的 RDD 对象,调用各种成员方法完成写出文件、转换为 list 等操作

image-20230427194111746

# 数据输入

# RDD 对象

# 介绍

  1. RDD 全称:弹性分布式数据集(Resilient Distributed DataSets)
  2. PySpark 针对数据处理,都是以 RDD 对象作为载体
    1. 数据存储在 RDD 内
    2. 各类数据的计算方法,也都是以 RDD 的成员方法
    3. RDD 的数据计算方法,返回值依然是 RDD 对象

image-20230427194603214

# Python 数据容器转换 RDD 对象

PySpark 支持通过 SparkContext 对象的 parallelize 成员方法,将 list,tuple,set,dict,str 转换为 PySpark 的 RDD 对象

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(数据容器对象)
# 输出 RDD 的内容
print(rdd.collect())
  1. 字符串会被拆分出 1 个个字符,存储 RDD 对象
  2. 字典仅有 key 会被存入 RDD 对象

# 读取文件转 RDD 对象

PySpark 也支持通过 SparkContext 入口对象,来读取文件,来构建出 RDD 对象

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.textFile(文件路径)
# 输出 RDD 的内容
print(rdd.collect())

# 数据计算

# map 方法

# map 算子

功能:map 算子,是将 RDD 的数据一条条处理(处理逻辑,基于 map 算子中接收的处理函数),返回新的 RDD

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name")
sc = SparkContext(conf=conf)
list1 = [1,2,3]
rdd = sc.parallelize(list1)
rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
print(rdd2.collect())
sc.stop()

# flatMap 方法

# flatMap 算子

功能:对 rdd 执行 map 操作,然后执行解除嵌套操作

# 嵌套 list
lst = [[1,2,3], [4,5,6]]
# 解除嵌套
lst = [1,2,3,4,5,6]

# 代码

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name")
sc = SparkContext(conf=conf)
list1 = ["baozi 666", "baobao 521"]
rdd = sc.parallelize(list1)
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())
sc.stop()

# reeduceByKey 算法

# reduceByKey 算子

功能:针对 KV 型 RDD,自动按照 key 分组,然后根据你提供的聚合逻辑,完成 == 组内数据 (value)== 的聚合操作

rdd.reduceByKey(func)
# func:(V, V) -> V
# 接受 2 个参数(类型要一致),返回一个返回值,类型和传入要求一致

# 代码

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b', 1)])
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
# 输出 [('b', 2), ('a', 3)]
sc.stop()

# filter 方法

# 介绍

功能:过滤想要的数据进行保留

rdd.filter(func)

# 代码

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.filter(lambda x: x % 2 == 0)
print(rdd2.collect())
sc.stop()

# distinct 方法

# distinct 算子

功能:对 RDD 数据进行去重,返回新的 rdd

rdd.distinct()

# 代码

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 1, 4, 3])
rdd2 = rdd.distinct()
print(rdd2.collect())
sc.stop()

# sortBy 方法

# sortBy 算子

功能:对 RDD 数据进行排序,基于你指定的排序依据

rdd.sortBy(func, ascending=False, numPartion=1)
# func: (T) -> U 告知按照 rdd 中哪个数据进行排序,比如 lambda x: x [1] 表示按照 rdd 中的第二列元素进行排序
# ascending True 升序 False 降序
# numPartitions:用多少分区排序

# 代码

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 1, 4, 3])
rdd2 = rdd.sortBy(lambda x: x,ascending=False, numPartitions=1)
print(rdd2.collect())
sc.stop()

# 数据输出

# 输出为 Python 对象

# collect 算子

功能:将 RDD 各个分区内数据,统一收集到 Driver 中,形成一个 List 对象

rdd.collect()	# 返回值是一个 list

# reduce 算子

功能:对 RDD 数据集按照传入的逻辑进行聚合

rdd.reduce(func)
# func: (T, T) -> T
# 2 参数传入 1 个返回值,返回值和参数需求类型不一致
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1, 10))
num = rdd.reduce(lambda a, b: a + b)
print(num)
sc.stop()

# take 算子

功能:取 RDD 的前 N 个元素,组合成 list 返回给你

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1, 10))
take_list = rdd.take(5)
print(take_list)
sc.stop()

# count 算子

功能:返回 RDD 中有多少个元素

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1, 10))
count = rdd.count()
print(count)
sc.stop()

# 输出到文件中

# saveTextFile 算子

功能:将 RDD 的数据写入文本文件中,默认 16 个分区

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe'
os.environ['HADOOP_HOME'] = 'D:/python/hadoop/hadoop-3.0.0'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name")
conf.set("spark.default.parallelism", "1")	# 设置分区只有 1 个
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1, 10))
rdd.saveAsTextFile("./123")
sc.stop()
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/86178/AppData/Local/Programs/Python/Python310/python.exe'
os.environ['HADOOP_HOME'] = 'D:/python/hadoop/hadoop-3.0.0'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_name")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1, 10), 1)	# 第二个参数设置分区只有 1 个
rdd.saveAsTextFile("./123")
sc.stop()
更新于 阅读次数

请我喝[茶]~( ̄▽ ̄)~*

Baozi 微信支付

微信支付

Baozi 支付宝

支付宝

Baozi 微信

微信