
一、Spark架构与原理
1、Spark的特点
· 速度快:由于采用了内存计算和DAG(Directed Acyclic Graph, 有向无环图)调度引擎,Spark比Hadoop MapReduce快几千倍
· 易用性:提供了丰富的API接口,支持Scala、Java、Python和R等多种编程语言。
· 通用性:不仅支持批处理任务,还支持流处理、交互式查询和机器学习等多种计算任务。
· 高容错性:利用数据冗余和RDD(Resilient Distributed Dataset,弹性分布式数据集)确保数据在计算过程中的容错能力。
2、Spark与Hadoop的区别
· 计算模型:Hadoop使用的是MapReduce编程模型,而Spark使用的是RDD模型,支持更多的操作和更灵活的数据处理方式。
· 内存计算:Spark的内存计算能力使其在处理迭代和交互式查询时比Hadoop快得多,而Hadoop主要依赖磁盘I/O。
· 生态系统:Spark包含多个内置的模块,如Spark SQL、Spark Streaming、MLlib等,而Hadoop需要依赖其他组件(如Hive、Pig、Mahout)来实现类似功能。
· 易用性:Spark提供了更高级的API和丰富的编程接口,简化了大数据处理的开发工作。
3、Spark生态系统组件及基本用途
Spark Core:核心计算引擎,负责基本的任务调度、内存管理、错误恢复、交互等功能。
· Spark SQL:处理结构化数据的模块,提供了对SQL查询的支持,可以将SQL查询与Spark程序无缝集成。
· Spark Streaming:处理实时数据流的模块,支持从多种数据源(如Kafka、Flume、Kinesis等)读取数据,并进行实时处理。
· MLlib:机器学习库,提供了各种机器学习算法的实现,如分类、回归、聚类、协同过滤等。
· GraphX:图计算引擎,支持对图数据进行并行计算,提供了图算法库(如PageRank、Connected Components等)。
· SparkR:支持R语言的模块,可以使用R语言编写Spark应用程序,适合统计分析和数据科学家使用。
4、Spark的应用场景
· 数据批处理:适用于大规模数据的批量处理,如日志分析、ETL(Extract, Transform, Load)任务等。
· 实时数据处理:可以用于实时数据流处理,如实时监控、实时数据分析、欺诈检测等。
· 机器学习:通过MLlib可以进行大规模数据的机器学习模型训练和预测,如推荐系统、分类、回归分析等。
· 图计算:利用GraphX进行社交网络分析、推荐系统中的图计算等。
· 交互式数据分析:支持使用Spark SQL进行交互式查询,适合数据探索和临时查询任务。
5、Spark架构,各节点职责
Driver(驱动程序):负责任务的调度和协调,向集群中的各个Executor分发任务,并跟踪任务的执行情况。Driver还负责将用户编写的程序转换为DAG(有向无环图),并进一步将其分解为具体的任务。
Cluster Manager(集群管理器):在集群上获取资源的外部服务(例如: Standalone, Mesos, Yarn
Executor(执行器):执行者。是Application运行在worker node上的一个进程,该进程负责运行Task,并且负责讲数据存在内存或者磁盘上。每个Application会申请各自的Executors来处理任务
Application:Spark的应用程序,包含一个Driver program和若干个Executor. SparkContext:Spark应用程序的入口,负责调度各个运算资源,协调各个WorkerNode 上的Executor.
Driver Program:驱动。运行Application的main()函数并且创建SparkContext
Worker Node:集群中任何可以运行Application代码的节点,运行一个或多个Executor进程 Task:任务。运行在Executor上的工作单元 Job:作业。SparkContext提交的具体Action操作,常和Action对应
Stage:阶段。每个Job会被拆分很多组任务(task),每组任务被称为stage,也称为TaskSet DAGScheduler:根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler TaskScheduler:将Taskset提交给Worker node集群运行并返回结果 Transformations/Action: Spark API的两种类型: Transformation返回值还是一个RDD ; Action返回值不是一个RDD ,而是一个集合。所有的 Transformation都是采用的懒策略,如果只是讲 Transformation提交是不会执行计算的。计算只有Action被提 交才会被触发
6、Spark作业运行流程
1.启动SparkContext
2.注册申请资源
3.分配资源,然后启动Executor
4.ExecutorSparkContext注册
5.分配并提交任务
6·注销释放资源
提交作业:用户通过SparkContext提交应用程序,Driver启动并解析用户代码。
生成DAG:Driver将用户代码转换为DAG,表示计算逻辑的有向无环图。
任务划分:DAG进一步分解为多个Stage(阶段),每个Stage由一组并行执行的Task(任务)组成。
资源调度:Driver向Cluster Manager请求资源,获取计算资源后,启动Executor进程。
任务分发:Driver将任务分发到各个Executor上,并根据DAG的依赖关系执行。
任务执行:Executor执行任务,将计算结果存储在内存或磁盘中,并将结果返回给Driver。
结果收集:Driver收集各个Executor返回的结果,完成计算并将最终结果返回给用户。
二、Spark RDD
1、RDD的概念、内容、运行流程
概念
RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的核心抽象(基本数据抽象),RDD是只读的、分区记录的集合,RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建.
弹性
- RDD可以在内存和磁盘之间手动或自动切换
- RDD可以通过转换成为其他的RDD
- RDD可以存储任意类型的数据
内容
- 分区:RDD由多个分区(Partition)组成,每个分区代表数据的一个子集。
- 依赖关系:记录了RDD之间的转换关系,用于容错和重计算。
- 计算函数:定义了对数据的具体操作(如map、filter)。
- 分区器:定义了数据如何在集群中分布(对于Key-Value类型的RDD)。
- 首选位置:表示每个分区首选在哪些节点上计算,优化数据局部性。
- 初代RDD:存储的只是真实数据的分区信息,还有就是针对单个分区的读取方法,血统的顶层 子代RDD:存储初代RDD到底干了什么才会产生自己,还有就是初代RDD的引用,血统的下层
运行流程
- RDD创建:从外部数据源(如HDFS、HBase、S3等)或现有RDD转换生成新RDD。
- RDD转换:通过各种转换算子(如map、filter、join等)定义数据操作逻辑。
- RDD行动:通过行动算子(如collect、count、saveAsTextFile等)触发实际计算。
- 任务划分:Spark将RDD的操作逻辑划分为多个任务,每个任务对应于一个RDD分区。
- 任务调度与执行:Driver将任务分发到集群中的Executor上并行执行,Executor将计算结果返回给Driver。数据读取发生在什么时候? 发生在运行的Task中,即数据是在任务分发的executor上运行的时候读取的
运行模式
1,在mesos或者yarn集群管理器上部署运行 2,在standalone和local(没有集群)的模式上部署运行。
RDD的五个主要属性
分区信息(Partition):数据集的基本组成单位
Compute函数:对于给定的数据集,需要做哪些计算
Partitioner函数:对于计算出来的数据结果如何分发
优先位置列表:对于datapartition的位置偏好
依赖关系:RDD的依赖关系,描述了RDD之间的Lineage
2、RDD的创建方式
从集合创建:sc.parallelize([1, 2, 3, 4, 5])
从外部数据源创建:sc.textFile("hdfs://path/to/file")
从现有RDD转换:通过对已有RDD应用转换算子创建新RDD,例如rdd.map(x => x * 2)
3、转换算子与执行算子区分
转换算子(Transformation):对RDD进行转换操作,生成新的RDD,如map
、filter
、flatMap
等。转换算子是惰性的(Lazy),只有在行动算子触发时才实际执行。
执行算子(Action):对RDD进行操作,触发实际计算并返回结果,如collect
、count
、saveAsTextFile
等。
4、常用转换算子及含义、用法(一维类、key-value类)
一维类
- map:对RDD的每个元素应用一个函数,返回新的RDD。rdd2=sc.parallelize([‘a’,’b’])
rdd2 = rdd1.map(lambda x :(x,1)).collect()
[(‘a’,1),(‘b’,1)] - filter:过滤RDD中的元素,返回满足条件的元素组成的新RDD。>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4] - flatMap:将函数应用于该RDD的所有元素,然后将结果平坦化(压扁),从而返回新的RDD>>> rdd = sc.parallelize([2,3,4])
>>> rdd2 = rdd.map(lambda x: range(1,x))
>>> rdd2.collect()
[[1], [1, 2], [1, 2, 3]]
>>> rdd1 = rdd.flatMap(lambda x: range(1,x))
>>> rdd1.collect()
[1, 1, 2, 1, 2, 3] - distinct():对RDD算子中的元素进行去重rdd1.distinct()
- union():合并两个RDD,结果中包含两个RDD中的所有元素rdd1.union(rdd2)
- sortBy:给rdd算子排序.sortBy(lambda x 🙂
- mapPartitions:它的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7] - intersection:返回这个RDD和另一个RDD的交集,输出将不包含任何重复的元素>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.intersection(rdd2).collect()
[1, 2, 3] - subtract(other):返回在RDD1中出现,但是不在RDD2中出现的元素,不去重rdd1= sc.parallelize([(“a”,1), (“b”,4), (“b”,5), (“a”,3)])
rdd2= sc.parallelize([(“a”,3), (“c”,None)])
rdd1.subtract(rdd2).collect()
[(‘a’, 1), (‘b’, 4), (‘b’, 5)] - reduce()通过指定的聚合方法来对RDD中元素进行聚合>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
Key-Value类
- reduceByKey:对相同Key的元素进行聚合操作,返回新的RDD。>>> from operator import add
>>> rdd = sc.parallelize([(“a”, 1), (“b”, 1), (“a”, 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[(‘a’, 2), (‘b’, 1)] - groupByKey:将具有相同Key的元素分组,返回Key和对应的元素集合组成的新RDD。>>> rdd = sc.parallelize([(“a”, 1), (“b”, 1), (“a”, 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect()
[(‘a’, 2), (‘b’, 1)]
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[(‘a’, [1, 1]), (‘b’, [1])] - join:对两个RDD进行内连接操作,返回Key相同的元素对。>>> x = sc.parallelize([(“a”, 1), (“b”, 4)])
>>> y = sc.parallelize([(“a”, 2), (“a”, 3)])
>>> x.join(y).collect()
[(‘a’, (1, 2)), (‘a’, (1, 3))] - sortBy:根据指定的Key进行排序>>> tmp = [(‘a’,1), (‘b’,2), (‘1’,3), (‘d’,4), (‘2’, 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect)
[(‘1’, 3), (‘2’, 5), (‘a’, 1), (‘b’, 2), (‘d’, 4)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[(‘a’, 1), (‘b’, 2), (‘1’, 3), (‘d’, 4), (‘2’, 5)] - mapValues(f)针对(Key, Value)型数据中的Value 进行Map操作,而不对Key进行处理>>> rdd = sc.parallelize([(“a”, [“apple”, “banana”, “lemon”]), (“b”, [“grapes”]),(“a”, [“apple”,
“banana”])])
>>> def f(x): return len(x)
>>> rdd.mapValues(f).collect()
[(‘a’, 3), (‘b’, 1) , (‘a’, 2)] - flatMapValues(f):完成mapValues处理后,再对结果进行扁平化处理>>> rdd = sc.parallelize([(“a”,[“x”,”y”]),(“b”,[“p”, “r”])])
>>> def f(x): return x
>>> rdd.flatMapValues(f).collect)
[(‘a’, ‘x’), (‘a’, ‘y’), (‘b’, ‘p’), (‘b’, ‘r’)] - sortByKey:根据key值进行排序,默认升序>>> tmp = [(‘a’, 1), (‘B’, 2), (‘1’, 3), (‘d’, 4)]
>>> sc.parallelize(tmp).sortByKey0
[(‘1’, 3), (‘B’, 2), (‘a’, 1), (‘d’, 4)]
>>> sc.parallelize(tmp).sortByKey(True, None, keyfunc=lambda k: k.lower()).collect()
[(‘1’, 3), (‘a’, 1), (‘B’, 2), (‘d’, 4)] - leftOuterJoin(rdd):左外连接,与SQL中的左外连接一致>>> x = sc.parallelize([(“a”, 1) , (“b”, 4)])
>>> y = sc.parallelize([(“a”, 2)])
>>> x.leftOuterJoin(y).collect()
[(‘a’, (1, 2)) , (‘b’, (4, None))] - take(num)从RDD中返回前num个元素的列表>>> sc.parallelize([4, 6, 8, 2, 9]).take(2)
[4, 6]
>>> sc.parallelize([4, 6, 8, 2, 9]).take(10)
[4, 6, 8, 2, 9] - takeOrdered(num):从RDD中返回前num个最小的元素的列表,结果默认升序排列>>> sc.parallelize([4, 6, 8, 2, 9]).takeOrdered(2)
[2, 4]
>>> sc.parallelize([4, 6, 8, 2, 9]).takeOrdered(10)
[2, 4, 6, 8, 9] - top从RDD中返回最大的前num个元素列表,结果默认降序排列 如果key参数有值,则先对各元素进行对应处理 注:会把所以数据都加载到内存,所以该方法只有在数据很小时使用>>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
[12]
>>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
[6, 5]
>>> sc.parallelize([10, 4, 2, 12, 3]).top(4, key=str)
[4, 3, 2, 12] - foreach(f):遍历RDD的每个元素,并执行f函数操作,无返回值>>> def f(x): print(x)
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) - foreachPartiton(f):对每个分区执行f函数操作,无返回值>>> def f(iterator):
s=sum(iterator)
print(s)
>>> sc.parallelize([1,2,3,4,5],3).foreachPartition(f)
1
9
5 - saveAsTextFile将RDD中的元素以字符串的格式存储在文件系统中>>> rdd = sc.parallelize([‘foo’, ‘bar’],2)
>>> rdd.saveAsTextFile(‘/home/ua15/sparkRdd_result’)
>>> rdd.saveAsTextFile(‘hdfs://xdata-m0:8020/user/ua15/spark-data’) - collectAsMap()以字典形式,返回PairRDD中的键值对如果key重复,则后面的value覆盖前面的>>> rdd = sc.parallelize([(1, 2), (3, 4)])
>>> rdd.collectAsMap()
{1: 2, 3: 4}
>>> rdd = sc.parallelize([(1, 2), (3, 4),(1,4)])
{1: 4, 3: 4} - countByKey ()以字典形式,返回PairRDD中key值出现的次数>>> rdd = sc.parallelize([(“a”, 1), (“b”, 1), (“a”, 1)])
>>> rdd.countByKey()
[(‘a’, 2), (‘b’, 1)]
5、常用执行算子及含义、用法
collect:将RDD的所有元素拉取到驱动程序中。
val result = rdd.collect()
count:计算RDD中的元素数量。
val num = rdd.count()
saveAsTextFile:将RDD保存为文本文件。
rdd.saveAsTextFile("hdfs://path/to/output")
6、两种共享变量的含义和作用
广播变量(Broadcast Variables):一个全局共享变量,可以广播只读变量。
!!!注意事项!!! 1、能不能将一个RDD使用广播变量广播出去? 不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。 2、广播变量只能在Driver端定义,不能在Executor端定义。 3、在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。 4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有 多少Driver端的变量副本。 5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份 Driver端的变量副本。
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([O, O]).flatMap(lambda x:
b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()
累加器(Accumulators):用于在执行过程中对某个值进行聚合(如计数、求和等)。
!!!注意事项!!! 1、累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值, 在Excutor端更新。 2、累加器不是一个调优的操作,因为如果不这样做,结果是错的
counter = 0
def increment(x):
global counter
counter + = x
rdd= sc.parallelize(range(10))
rdd.foreach(increment)
print("Spark Counter value: ", counter)
7、宽依赖、窄依赖
窄依赖(Narrow Dependency):是指每个父RDD的一个Partition最多被子RDD的一个Partition所用(一对一)。例如map
、filter
、union。
宽依赖(Wide Dependency):·是指一个父RDD的Partition会被多个子RDD的Partition所使用(多对多)。例如reduceByKey
、groupByKey
、sortByKey。
8、stage划分方法
基于宽依赖进行划分,每遇到一个宽依赖操作,就会触发新的Stage划分。窄依赖操作在同一个Stage中处理。
9、RDD持久化方式
内存持久化:将RDD数据存储在内存中(MEMORY_ONLY
(将RDD以JAVA对象的形式保存到JVM内存如果分片太大,内存缓存不下,就不缓存)、MEMORY_AND_DISK
(将RDD数据集以JAVA对象的形保存到JVM内存中如果分片太大不能保存到内存中,则保存到磁盘上,并在下次用时重新从磁盘读取。))。
磁盘持久化:将RDD数据存储在磁盘中(DISK_ONLY
)。
序列化持久化:以序列化形式存储RDD数据,减少内存占用(MEMORY_ONLY_SER
(将RDD以序列化的JAVA对象形式保存到内存)、MEMORY_AND_DISK_SER
(与MEMORY_ONLY_SER类似,但当分片太大,不能保存到内存中,会将其保存到磁盘中))。
三、Spark SQL
1、Spark SQL的作用和特点
作用:用于处理结构化数据,通过提供SQL查询接口,使数据处理更容易和直观。支持与Spark其他组件无缝集成。
特点:
- 与Hive兼容,可以直接查询Hive表。
- 支持JDBC和ODBC,方便与其他数据工具集成。
- 提供了丰富的SQL函数和优化引擎。
2、创建DataFrame的方式
从RDD创建:通过隐式转换将RDD转换为DataFrame。
from pyspark.sql import Sparksession
sqlContext=SparkSession.builder.getOrCreate()
from pyspark.salimport Row
sale Rows=satesRDD.map(Lambda p:
Row(
PSPTLINE p[0].
RPONESSLINE1p[15].
PRICEEANENUMBER p[31.
STATUS-B[9]
POSTALCODE p[19]
CONTACTLASTNAME=p[221]
CONTACTFIRSTNAME=p[23]
)
)
sale_Rows.take(5)
sale_df=colContext createnataFrame(sale_Rows)
sale_df.printSchema()
使用Spark SQL
sale_df.registerTempTable("sale_table")
sqlContext.sql(" select count(*) counts from sale_table").show()
从数据源创建:读取外部数据源创建DataFrame。
val df = spark.read.json("path/to/json")
直接创建:通过编程接口直接创建DataFrame。
val df = spark.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("id", "value")
3、DataFrame的常用操作函数
使用DataFrames增加计算字段
sale_df.select("ORDERNUMBER","PRODUCTCODE",(2018 - sale_df.YEAR_ID)).show(5)
使用Spark SQL增加计算字段
sqlContext.sql(" select ORDERNUMBER, PRODUCTCODE, (2018 - YEAR_ID) from sale_table").show(5)
filter:筛选数据。
sale_df.filter("YEAR_ID='2003'").show(5)
groupBy:分组聚合。
sale_df.select("PRODUCTCODE").groupby("PRODUCTCoDE").count().show()
sqlContext.sql(" select PRODUCTCODE, count(*) counts from sale_table group by PRODUCTCODE").show()#sql语句
join:连接两个DataFrame。
val df3 = df1.join(df2, "column1")
distince:数据去重
sale_df.select("PRODUCTCODE").distinct().show()
sqlContext.sql(" select distinct PRODUCTCODE from sale_table").show()#sql语句
orderBy:排序。
sale_df.select("ORDERNUMBER","PRODUCTCODE",(2018 - sale_df.YEAR_ID)) .orderBy("YEAR_ID"). show(5)
withColumn:对列的数据做更改.cast是用于类型转换
df.withColumn('列名',df['列'].cast('类型'))
df的sql语句:查找。
df.selcet("目标列").filter("条件").show()
df的sql语句:计数和排序
.count() .sorby(ascending=False)"默认视从低到高,加上括号内的就变为高到低"
数据连接
RawZipRDD = sc.textFile("file:///home/anaconda/test/Zipssortedbycitystate.csv")
ZipRDD = RawZipRDD.map(lambda line:line.split(","))
Zip_Rows ZipRDD.map(lambda p:
Row(
CITY ="+p[0]+"
STATE=+p[1]+
POSTALCODE =+p[2]+
ZIP df = sqlContext.createDataFrame(Zip Rows)
ZIP df.registerTempTable("zip table")
sqlContext.sql("select from zip_table").show()
joined_df = sale_df.join(ZIP_df,sale_df.CITY== ZIP_df.CITY,"left outer")
joined_df.filter('ORDERNUMBER="10159"').distinct().show()
四、Spark Streaming
1、流数据概念及应用场景
概念
流数据(Stream Data)是指在不断生成并需要持续处理的数据流。流数据具有高吞吐量、低延迟、实时处理等特点。
应用场景
- 实时监控:如网络流量监控、服务器状态监控。
- 实时分析:如金融交易分析、用户行为分析。
- 实时处理:如日志处理、异常检测、流媒体处理。
- 物联网:如传感器数据处理、智能家居数据处理。
2、Spark Streaming概念及应用
概念
Spark Streaming是Spark的一个扩展,用于实时流数据处理。它可以从多种数据源(如Kafka、Flume、Kinesis等)读取数据,并进行实时处理和分析。
应用
- 实时数据清洗:过滤、转换和聚合实时数据流。
- 实时数据分析:如趋势分析、实时报告。
- 实时机器学习:如在线学习模型的训练和应用。
- 事件检测:如实时检测和响应异常事件。
Spark Streaming加载数据的关键步骤;
创建sparkcontext 创建streamingcontext对象 创建Dsteam的加载函数
3、Spark Streaming工作原理
Spark Streaming将实时数据流划分为多个小批次(micro-batches),每个批次会生成一个RDD,然后使用Spark的批处理引擎处理这些RDD。
工作流程:
- 数据接收:从数据源接收实时数据。
- 数据划分:将数据流划分为小批次。
- 数据处理:对每个小批次的数据进行处理,生成新的RDD。
- 结果输出:将处理结果输出到外部存储或系统。
4、DStream的概念及原理
概念
DStream(Discretized Stream)是Spark Streaming中的基本抽象,表示连续的数据流。每个DStream由一系列RDD组成,每个RDD对应一个时间间隔内的数据。
原理
- RDD序列:DStream内部表示为一系列按时间顺序排列的RDD。
- 转换操作:可以对DStream应用各种转换操作,生成新的DStream。
- 持续计算:Spark Streaming会定期对新生成的RDD进行处理。
5、DStream的几个操作函数
创建SparkContext对象
conf = SparkConf().setMaster('local[4]').setAppName('app')
Sc = SparkContext(conf=conf)
创建StreamingContext对象
ssc = StreamingContext(sc,20)
创建InputStream
input = ssc.textFileStream('file:///home/ub22/testtest/spark')
map:对DStream的每个RDD中的每个元素应用一个函数。
counts = lines.map(lambda x:(x,1))
counts.pprint()
[a,b,c]->[(a,1),(b,1),(c,1)]
filter:对DStream的每个RDD中的每个元素进行过滤。
counts = lines.flatMap(lambda line: line.split( ".")
counts.pprint()
[Hello ,this
is ,a
spark ,demo]->(Hello,this,is,a,spark,demo)
union:合并两个Dstream
words = lines.flatMap(lambda line: line.split(" "))
wordone = words.map(lambda x:x+'one')
wordtwo = words.map(lambda y:y+'two')
unionwords = wordone.union(wordtwo)
reduceByKey:对Key-Value类型的DStream进行按Key聚合。
(hello , 1)
( hello , 1) (hello , 2)
(word, 1)-》 (word, 1)
window:对DStream应用窗口操作。
val windowedStream = dstream.window(Seconds(30), Seconds(10))
foreachRDD:对DStream的每个RDD应用一个函数。
dstream.foreachRDD(rdd => {
// 对RDD进行处理
})
6、DStream持久化方法
持久化:通过持久化操作将DStream的数据缓存到内存或磁盘中。
saveAsTextFiles()
#统计词频
running_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda
word:\ (word,1)).reduceByKey(lambda x,y:x+y)
#统计结果存储到本地
running_counts.saveAsTextFiles("file:///usr/local/test/spark/output.txt")
#打印统计结果
running_counts.pprint()
7、DStream窗口的概念,滑动时间、窗口时间的概念
窗口(Window):对DStream进行分片处理,基于时间窗口进行计算。
窗口时间(windowLength):窗口的时间长度,例如30秒。
滑动时间(slideInterval):前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数
window(windowLength, slideInterval)
五、Spark机器学习
1、机器学习的概念及一般流程
概念
机器学习是指通过数据训练模型,使计算机能够自动从数据中学习并进行预测或决策的技术。
准备数据、训练模型、模型评估
一般流程
- 数据收集:获取用于训练和测试的数据集。
- 数据预处理:清洗、转换和标准化数据。
- 特征工程:提取和选择有效的特征。
- 模型训练:是将预处理过的数据按照模型的需要进行划分,并进行模型训练的过程
- 模型划分:有监督学习:具有特征和标签值的训练集和测试集; 无监督学习: 给定数据和模型假设空间,构建优化问题。
- 模型评估:
- 模型部署:将模型应用于实际环境中。
- 模型监控:监控模型的性能,并进行更新和改进。
2、常用回归、分类、聚类算法及大体原理
回归:
- 线性回归:建立输入与输出之间的线性关系。
- 决策树回归:通过构建树形结构来预测连续值。
分类:
- 逻辑回归:用于二分类问题,通过Sigmoid函数预测概率。
- 支持向量机(SVM):寻找最佳超平面将数据分为不同类别。
- 随机森林:通过多个决策树的投票结果进行分类。
聚类:
- K-Means:将数据分为K个簇,每个簇的中心点为均值。
- 层次聚类:通过构建层次树状结构进行聚类。
3、Spark ML组件的概念(转化器、预测器、管道)
核心功能:机器学习算法、特征化方法、持久化方法、实用功能、管道方法
转化器(Transformer):针对特征:读入一列特征,输出新的DataFrame类型列; 针对模型学习:读入一组特征,输出DataFrame类型的预测结果。
val scaler = new StandardScaler().fit(data)
val scaledData = scaler.transform(data)
预测器(Estimator):用于模型训练,如线性回归、决策树等。算法,调用fit方法训练数据。
val lr = new LinearRegression()
val model = lr.fit(trainingData)
4、Spark ML工作过程中DataFrame的内容变化
初始DataFrame:原始数据集。
特征工程:增加或修改特征列。
模型训练:生成模型,并在DataFrame中增加预测结果列。
六、Spark图计算
1、图的概念
图(Graph)是由顶点(Vertex)和边(Edge)组成的结构,用于表示实体及其关系。
2、图的名词解释(度、路径、距离、连通、三角、pagerank)
度(Degree):一个顶点的连接边数。
路径(Path):从一个顶点到另一个顶点的边的序列。
距离(Distance):路径的长度。
连通(Connectivity):两个顶点之间是否存在路径。
三角(Triangle):三个顶点两两相连形成的子图。
PageRank:衡量顶点重要性的一种算法,常用于搜索引擎排名。
3、图的构建(顶点表、边表的格式要求)
顶点表:顶点DataFrame :必须包含列名为”id”的列,作为顶点的唯一标识。
vertices = spark.createDataFrame([
("a", "Alice", 34),("b", "Bob", 36)]
, ["id", "name", "age"])
边表:边DataFrame :必须包含列名为“src”和”dst”的列,保存头和尾的唯一标识id。
>>> edges = spark.createDataFrame([
("a", "b", "friend")]
, ["src", "dst", "relationship"])
4、图的常用查看函数、操作函数
通过GraphFrame提供的三个属性: degrees, inDegrees, outDegrees 可以获得顶点的度、入度和出度。
graph.degrees.show()
graph.inDegrees.show()
graph.outDegrees.show()
采用形如“(a)-[e]->(b)”的模式描述有向边
motifs = graph.find("(a)-[e]->(b)")
motifs.show()
模式视图是DataFrame类型的,同样可以进一步进行查询、过滤和统计操作。
motifs.filter( "b.age > 30" ).show()
模式中有多条边时,需要用分号(“;”)拼接,
例1:"(a)-[e]->(b);(b)-[e2]->(c)” 表示一条从a到b,然后从b到c的路径不包含某条边,在边的模式前面加上“!”
例2 : "(a)-[e]->(b); ! (b)-[e2]->(a)” ,表示不选取包含重边的边。