×

一、Spark架构与原理

1、Spark的特点

· 速度快:由于采用了内存计算和DAG(Directed Acyclic Graph, 有向无环图)调度引擎,Spark比Hadoop MapReduce快几千倍

· 易用性:提供了丰富的API接口,支持Scala、Java、Python和R等多种编程语言。

· 通用性:不仅支持批处理任务,还支持流处理、交互式查询和机器学习等多种计算任务。

· 高容错性:利用数据冗余和RDD(Resilient Distributed Dataset,弹性分布式数据集)确保数据在计算过程中的容错能力。

2、Spark与Hadoop的区别

· 计算模型:Hadoop使用的是MapReduce编程模型,而Spark使用的是RDD模型,支持更多的操作和更灵活的数据处理方式。

· 内存计算:Spark的内存计算能力使其在处理迭代和交互式查询时比Hadoop快得多,而Hadoop主要依赖磁盘I/O。

· 生态系统:Spark包含多个内置的模块,如Spark SQL、Spark Streaming、MLlib等,而Hadoop需要依赖其他组件(如Hive、Pig、Mahout)来实现类似功能。

· 易用性:Spark提供了更高级的API和丰富的编程接口,简化了大数据处理的开发工作。

3、Spark生态系统组件及基本用途

Spark Core:核心计算引擎,负责基本的任务调度、内存管理、错误恢复、交互等功能。

· Spark SQL:处理结构化数据的模块,提供了对SQL查询的支持,可以将SQL查询与Spark程序无缝集成。

· Spark Streaming:处理实时数据流的模块,支持从多种数据源(如Kafka、Flume、Kinesis等)读取数据,并进行实时处理。

· MLlib机器学习库,提供了各种机器学习算法的实现,如分类、回归、聚类、协同过滤等。

· GraphX:图计算引擎,支持对图数据进行并行计算,提供了图算法库(如PageRank、Connected Components等)。

· SparkR:支持R语言的模块,可以使用R语言编写Spark应用程序,适合统计分析和数据科学家使用。

4、Spark的应用场景

· 数据批处理:适用于大规模数据的批量处理,如日志分析、ETL(Extract, Transform, Load)任务等。

· 实时数据处理:可以用于实时数据流处理,如实时监控、实时数据分析、欺诈检测等。

· 机器学习:通过MLlib可以进行大规模数据的机器学习模型训练和预测,如推荐系统、分类、回归分析等。

· 图计算:利用GraphX进行社交网络分析、推荐系统中的图计算等。

· 交互式数据分析:支持使用Spark SQL进行交互式查询,适合数据探索和临时查询任务。

5、Spark架构,各节点职责

Driver(驱动程序):负责任务的调度和协调,向集群中的各个Executor分发任务,并跟踪任务的执行情况。Driver还负责将用户编写的程序转换为DAG(有向无环图),并进一步将其分解为具体的任务。

Cluster Manager(集群管理器):在集群上获取资源的外部服务(例如: Standalone, Mesos, Yarn

Executor(执行器):执行者。是Application运行在worker node上的一个进程,该进程负责运行Task,并且负责讲数据存在内存或者磁盘上。每个Application会申请各自的Executors来处理任务

Application:Spark的应用程序,包含一个Driver program和若干个Executor. SparkContext:Spark应用程序的入口,负责调度各个运算资源,协调各个WorkerNode 上的Executor.

Driver Program:驱动。运行Application的main()函数并且创建SparkContext

Worker Node:集群中任何可以运行Application代码的节点,运行一个或多个Executor进程 Task:任务。运行在Executor上的工作单元 Job:作业。SparkContext提交的具体Action操作,常和Action对应

Stage:阶段。每个Job会被拆分很多组任务(task),每组任务被称为stage,也称为TaskSet DAGScheduler:根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler TaskScheduler:将Taskset提交给Worker node集群运行并返回结果 Transformations/Action: Spark API的两种类型: Transformation返回值还是一个RDD ; Action返回值不是一个RDD ,而是一个集合。所有的 Transformation都是采用的懒策略,如果只是讲 Transformation提交是不会执行计算的。计算只有Action被提 交才会被触发

6、Spark作业运行流程

1.启动SparkContext

2.注册申请资源

3.分配资源,然后启动Executor

4.ExecutorSparkContext注册

5.分配并提交任务

6·注销释放资源

提交作业:用户通过SparkContext提交应用程序,Driver启动并解析用户代码。

生成DAG:Driver将用户代码转换为DAG,表示计算逻辑的有向无环图。

任务划分:DAG进一步分解为多个Stage(阶段),每个Stage由一组并行执行的Task(任务)组成。

资源调度:Driver向Cluster Manager请求资源,获取计算资源后,启动Executor进程。

任务分发:Driver将任务分发到各个Executor上,并根据DAG的依赖关系执行。

任务执行:Executor执行任务,将计算结果存储在内存或磁盘中,并将结果返回给Driver。

结果收集:Driver收集各个Executor返回的结果,完成计算并将最终结果返回给用户。

二、Spark RDD

1、RDD的概念、内容、运行流程

概念

RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的核心抽象(基本数据抽象),RDD是只读的、分区记录的集合,RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建.

弹性

  1. RDD可以在内存和磁盘之间手动或自动切换
  2. RDD可以通过转换成为其他的RDD
  3. RDD可以存储任意类型的数据

内容

  • 分区:RDD由多个分区(Partition)组成,每个分区代表数据的一个子集。
  • 依赖关系:记录了RDD之间的转换关系,用于容错和重计算。
  • 计算函数:定义了对数据的具体操作(如map、filter)。
  • 分区器:定义了数据如何在集群中分布(对于Key-Value类型的RDD)。
  • 首选位置:表示每个分区首选在哪些节点上计算,优化数据局部性。
  • 初代RDD:存储的只是真实数据的分区信息,还有就是针对单个分区的读取方法,血统的顶层 子代RDD:存储初代RDD到底干了什么才会产生自己,还有就是初代RDD的引用,血统的下层

运行流程

  1. RDD创建:从外部数据源(如HDFS、HBase、S3等)或现有RDD转换生成新RDD。
  2. RDD转换:通过各种转换算子(如map、filter、join等)定义数据操作逻辑。
  3. RDD行动:通过行动算子(如collect、count、saveAsTextFile等)触发实际计算。
  4. 任务划分:Spark将RDD的操作逻辑划分为多个任务,每个任务对应于一个RDD分区。
  5. 任务调度与执行:Driver将任务分发到集群中的Executor上并行执行,Executor将计算结果返回给Driver。数据读取发生在什么时候? 发生在运行的Task中,即数据是在任务分发的executor上运行的时候读取的

运行模式

1,在mesos或者yarn集群管理器上部署运行 2,在standalone和local(没有集群)的模式上部署运行。

RDD的五个主要属性

分区信息(Partition):数据集的基本组成单位

Compute函数:对于给定的数据集,需要做哪些计算

Partitioner函数:对于计算出来的数据结果如何分发

优先位置列表:对于datapartition的位置偏好

依赖关系:RDD的依赖关系,描述了RDD之间的Lineage

2、RDD的创建方式

从集合创建sc.parallelize([1, 2, 3, 4, 5])

从外部数据源创建sc.textFile("hdfs://path/to/file")

从现有RDD转换:通过对已有RDD应用转换算子创建新RDD,例如rdd.map(x => x * 2)

3、转换算子与执行算子区分

转换算子(Transformation):对RDD进行转换操作,生成新的RDD,如mapfilterflatMap等。转换算子是惰性的(Lazy),只有在行动算子触发时才实际执行。

执行算子(Action):对RDD进行操作,触发实际计算并返回结果,如collectcountsaveAsTextFile等。

4、常用转换算子及含义、用法(一维类、key-value类)

一维类

  • map:对RDD的每个元素应用一个函数,返回新的RDD。rdd2=sc.parallelize([‘a’,’b’])
    rdd2 = rdd1.map(lambda x :(x,1)).collect()
    [(‘a’,1),(‘b’,1)]
  • filter:过滤RDD中的元素,返回满足条件的元素组成的新RDD。>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
    >>> rdd.filter(lambda x: x % 2 == 0).collect()
    [2, 4]
  • flatMap:将函数应用于该RDD的所有元素,然后将结果平坦化(压扁),从而返回新的RDD>>> rdd = sc.parallelize([2,3,4])
    >>> rdd2 = rdd.map(lambda x: range(1,x))
    >>> rdd2.collect()
    [[1], [1, 2], [1, 2, 3]]
    >>> rdd1 = rdd.flatMap(lambda x: range(1,x))
    >>> rdd1.collect()
    [1, 1, 2, 1, 2, 3]
  • distinct():对RDD算子中的元素进行去重rdd1.distinct()
  • union():合并两个RDD,结果中包含两个RDD中的所有元素rdd1.union(rdd2)
  • sortBy:给rdd算子排序.sortBy(lambda x 🙂
  • mapPartitions:它的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> def f(iterator): yield sum(iterator)
    >>> rdd.mapPartitions(f).collect()
    [3, 7]
  • intersection:返回这个RDD和另一个RDD的交集,输出将不包含任何重复的元素>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
    >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
    >>> rdd1.intersection(rdd2).collect()
    [1, 2, 3]
  • subtract(other):返回在RDD1中出现,但是不在RDD2中出现的元素,不去重rdd1= sc.parallelize([(“a”,1), (“b”,4), (“b”,5), (“a”,3)])
    rdd2= sc.parallelize([(“a”,3), (“c”,None)])
    rdd1.subtract(rdd2).collect()
    [(‘a’, 1), (‘b’, 4), (‘b’, 5)]
  • reduce()通过指定的聚合方法来对RDD中元素进行聚合>>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
    15

Key-Value类

  • reduceByKey:对相同Key的元素进行聚合操作,返回新的RDD。>>> from operator import add
    >>> rdd = sc.parallelize([(“a”, 1), (“b”, 1), (“a”, 1)])
    >>> sorted(rdd.reduceByKey(add).collect())
    [(‘a’, 2), (‘b’, 1)]
  • groupByKey:将具有相同Key的元素分组,返回Key和对应的元素集合组成的新RDD。>>> rdd = sc.parallelize([(“a”, 1), (“b”, 1), (“a”, 1)])
    >>> sorted(rdd.groupByKey().mapValues(len).collect()
    [(‘a’, 2), (‘b’, 1)]
    >>> sorted(rdd.groupByKey().mapValues(list).collect())
    [(‘a’, [1, 1]), (‘b’, [1])]
  • join:对两个RDD进行内连接操作,返回Key相同的元素对。>>> x = sc.parallelize([(“a”, 1), (“b”, 4)])
    >>> y = sc.parallelize([(“a”, 2), (“a”, 3)])
    >>> x.join(y).collect()
    [(‘a’, (1, 2)), (‘a’, (1, 3))]
  • sortBy:根据指定的Key进行排序>>> tmp = [(‘a’,1), (‘b’,2), (‘1’,3), (‘d’,4), (‘2’, 5)]
    >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect)
    [(‘1’, 3), (‘2’, 5), (‘a’, 1), (‘b’, 2), (‘d’, 4)]
    >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
    [(‘a’, 1), (‘b’, 2), (‘1’, 3), (‘d’, 4), (‘2’, 5)]
  • mapValues(f)针对(Key, Value)型数据中的Value 进行Map操作,而不对Key进行处理>>> rdd = sc.parallelize([(“a”, [“apple”, “banana”, “lemon”]), (“b”, [“grapes”]),(“a”, [“apple”,
    “banana”])])
    >>> def f(x): return len(x)
    >>> rdd.mapValues(f).collect()
    [(‘a’, 3), (‘b’, 1) , (‘a’, 2)]
  • flatMapValues(f):完成mapValues处理后,再对结果进行扁平化处理>>> rdd = sc.parallelize([(“a”,[“x”,”y”]),(“b”,[“p”, “r”])])
    >>> def f(x): return x
    >>> rdd.flatMapValues(f).collect)
    [(‘a’, ‘x’), (‘a’, ‘y’), (‘b’, ‘p’), (‘b’, ‘r’)]
  • sortByKey:根据key值进行排序,默认升序>>> tmp = [(‘a’, 1), (‘B’, 2), (‘1’, 3), (‘d’, 4)]
    >>> sc.parallelize(tmp).sortByKey0
    [(‘1’, 3), (‘B’, 2), (‘a’, 1), (‘d’, 4)]
    >>> sc.parallelize(tmp).sortByKey(True, None, keyfunc=lambda k: k.lower()).collect()
    [(‘1’, 3), (‘a’, 1), (‘B’, 2), (‘d’, 4)]
  • leftOuterJoin(rdd):左外连接,与SQL中的左外连接一致>>> x = sc.parallelize([(“a”, 1) , (“b”, 4)])
    >>> y = sc.parallelize([(“a”, 2)])
    >>> x.leftOuterJoin(y).collect()
    [(‘a’, (1, 2)) , (‘b’, (4, None))]
  • take(num)从RDD中返回前num个元素的列表>>> sc.parallelize([4, 6, 8, 2, 9]).take(2)
    [4, 6]
    >>> sc.parallelize([4, 6, 8, 2, 9]).take(10)
    [4, 6, 8, 2, 9]
  • takeOrdered(num):从RDD中返回前num个最小的元素的列表,结果默认升序排列>>> sc.parallelize([4, 6, 8, 2, 9]).takeOrdered(2)
    [2, 4]
    >>> sc.parallelize([4, 6, 8, 2, 9]).takeOrdered(10)
    [2, 4, 6, 8, 9]
  • top从RDD中返回最大的前num个元素列表,结果默认降序排列 如果key参数有值,则先对各元素进行对应处理 注:会把所以数据都加载到内存,所以该方法只有在数据很小时使用>>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
    [12]
    >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
    [6, 5]
    >>> sc.parallelize([10, 4, 2, 12, 3]).top(4, key=str)
    [4, 3, 2, 12]
  • foreach(f):遍历RDD的每个元素,并执行f函数操作,无返回值>>> def f(x): print(x)
    >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
  • foreachPartiton(f):对每个分区执行f函数操作,无返回值>>> def f(iterator):
    s=sum(iterator)
    print(s)
    >>> sc.parallelize([1,2,3,4,5],3).foreachPartition(f)
    1
    9
    5
  • saveAsTextFile将RDD中的元素以字符串的格式存储在文件系统中>>> rdd = sc.parallelize([‘foo’, ‘bar’],2)
    >>> rdd.saveAsTextFile(‘/home/ua15/sparkRdd_result’)
    >>> rdd.saveAsTextFile(‘hdfs://xdata-m0:8020/user/ua15/spark-data’)
  • collectAsMap()以字典形式,返回PairRDD中的键值对如果key重复,则后面的value覆盖前面的>>> rdd = sc.parallelize([(1, 2), (3, 4)])
    >>> rdd.collectAsMap()
    {1: 2, 3: 4}
    >>> rdd = sc.parallelize([(1, 2), (3, 4),(1,4)])
    {1: 4, 3: 4}
  • countByKey ()以字典形式,返回PairRDD中key值出现的次数>>> rdd = sc.parallelize([(“a”, 1), (“b”, 1), (“a”, 1)])
    >>> rdd.countByKey()
    [(‘a’, 2), (‘b’, 1)]

5、常用执行算子及含义、用法

collect:将RDD的所有元素拉取到驱动程序中。

val result = rdd.collect()

count:计算RDD中的元素数量。

val num = rdd.count()

saveAsTextFile:将RDD保存为文本文件。

rdd.saveAsTextFile("hdfs://path/to/output")

6、两种共享变量的含义和作用

广播变量(Broadcast Variables):一个全局共享变量,可以广播只读变量。

!!!注意事项!!! 1、能不能将一个RDD使用广播变量广播出去? 不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。 2、广播变量只能在Driver端定义,不能在Executor端定义。 3、在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。 4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有 多少Driver端的变量副本。 5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份 Driver端的变量副本。

>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([O, O]).flatMap(lambda x:
b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()

累加器(Accumulators):用于在执行过程中对某个值进行聚合(如计数、求和等)。

!!!注意事项!!! 1、累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值, 在Excutor端更新。 2、累加器不是一个调优的操作,因为如果不这样做,结果是错的

counter = 0
def increment(x):
global counter
counter + = x
rdd= sc.parallelize(range(10))
rdd.foreach(increment)
print("Spark Counter value: ", counter)

7、宽依赖、窄依赖

窄依赖(Narrow Dependency):是指每个父RDD的一个Partition最多被子RDD的一个Partition所用(一对一)。例如mapfilter、union。

宽依赖(Wide Dependency):·是指一个父RDD的Partition会被多个子RDD的Partition所使用(多对多)。例如reduceByKeygroupByKey、sortByKey。

8、stage划分方法

基于宽依赖进行划分,每遇到一个宽依赖操作,就会触发新的Stage划分。窄依赖操作在同一个Stage中处理。

9、RDD持久化方式

内存持久化:将RDD数据存储在内存中(MEMORY_ONLY(将RDD以JAVA对象的形式保存到JVM内存如果分片太大,内存缓存不下,就不缓存)、MEMORY_AND_DISK(将RDD数据集以JAVA对象的形保存到JVM内存中如果分片太大不能保存到内存中,则保存到磁盘上,并在下次用时重新从磁盘读取。))。

磁盘持久化:将RDD数据存储在磁盘中(DISK_ONLY)。

序列化持久化:以序列化形式存储RDD数据,减少内存占用(MEMORY_ONLY_SER(将RDD以序列化的JAVA对象形式保存到内存)、MEMORY_AND_DISK_SER(与MEMORY_ONLY_SER类似,但当分片太大,不能保存到内存中,会将其保存到磁盘中))。

三、Spark SQL

1、Spark SQL的作用和特点

作用:用于处理结构化数据,通过提供SQL查询接口,使数据处理更容易和直观。支持与Spark其他组件无缝集成。

特点

  • 与Hive兼容,可以直接查询Hive表。
  • 支持JDBC和ODBC,方便与其他数据工具集成。
  • 提供了丰富的SQL函数和优化引擎。

2、创建DataFrame的方式

从RDD创建:通过隐式转换将RDD转换为DataFrame。

from pyspark.sql import Sparksession
sqlContext=SparkSession.builder.getOrCreate()
from pyspark.salimport Row
sale Rows=satesRDD.map(Lambda p:
                   Row(
                   PSPTLINE p[0].
                   RPONESSLINE1p[15].
                   PRICEEANENUMBER p[31.
                   STATUS-B[9]
                   POSTALCODE p[19]
                   CONTACTLASTNAME=p[221]
                   CONTACTFIRSTNAME=p[23]
                            )
                   )
sale_Rows.take(5)
                                     
sale_df=colContext createnataFrame(sale_Rows)
sale_df.printSchema()

使用Spark SQL

sale_df.registerTempTable("sale_table")
sqlContext.sql(" select count(*) counts from sale_table").show()

从数据源创建:读取外部数据源创建DataFrame。

val df = spark.read.json("path/to/json")

直接创建:通过编程接口直接创建DataFrame。

val df = spark.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("id", "value")

3、DataFrame的常用操作函数

使用DataFrames增加计算字段

sale_df.select("ORDERNUMBER","PRODUCTCODE",(2018 - sale_df.YEAR_ID)).show(5)

使用Spark SQL增加计算字段

sqlContext.sql(" select ORDERNUMBER, PRODUCTCODE, (2018 - YEAR_ID) from sale_table").show(5)

filter:筛选数据。

sale_df.filter("YEAR_ID='2003'").show(5)

groupBy:分组聚合。

sale_df.select("PRODUCTCODE").groupby("PRODUCTCoDE").count().show()
sqlContext.sql(" select PRODUCTCODE, count(*) counts from sale_table group by PRODUCTCODE").show()#sql语句

join:连接两个DataFrame。

val df3 = df1.join(df2, "column1")

distince:数据去重

sale_df.select("PRODUCTCODE").distinct().show()
sqlContext.sql(" select distinct PRODUCTCODE from sale_table").show()#sql语句

orderBy:排序。

sale_df.select("ORDERNUMBER","PRODUCTCODE",(2018 - sale_df.YEAR_ID)) .orderBy("YEAR_ID"). show(5)

withColumn:对列的数据做更改.cast是用于类型转换

df.withColumn('列名',df['列'].cast('类型'))

df的sql语句:查找。

df.selcet("目标列").filter("条件").show()

df的sql语句:计数和排序

.count()    .sorby(ascending=False)"默认视从低到高,加上括号内的就变为高到低"

数据连接

RawZipRDD = sc.textFile("file:///home/anaconda/test/Zipssortedbycitystate.csv")
ZipRDD = RawZipRDD.map(lambda line:line.split(","))
Zip_Rows ZipRDD.map(lambda p:
Row(
CITY ="+p[0]+"
STATE=+p[1]+
POSTALCODE =+p[2]+
ZIP df = sqlContext.createDataFrame(Zip Rows)
ZIP df.registerTempTable("zip table")
sqlContext.sql("select from zip_table").show()

joined_df = sale_df.join(ZIP_df,sale_df.CITY== ZIP_df.CITY,"left outer")
joined_df.filter('ORDERNUMBER="10159"').distinct().show()

四、Spark Streaming

1、流数据概念及应用场景

概念

流数据(Stream Data)是指在不断生成并需要持续处理的数据流。流数据具有高吞吐量、低延迟、实时处理等特点。

应用场景

  • 实时监控:如网络流量监控、服务器状态监控。
  • 实时分析:如金融交易分析、用户行为分析。
  • 实时处理:如日志处理、异常检测、流媒体处理。
  • 物联网:如传感器数据处理、智能家居数据处理。

2、Spark Streaming概念及应用

概念

Spark Streaming是Spark的一个扩展,用于实时流数据处理。它可以从多种数据源(如Kafka、Flume、Kinesis等)读取数据,并进行实时处理和分析。

应用

  • 实时数据清洗:过滤、转换和聚合实时数据流。
  • 实时数据分析:如趋势分析、实时报告。
  • 实时机器学习:如在线学习模型的训练和应用。
  • 事件检测:如实时检测和响应异常事件。

Spark Streaming加载数据的关键步骤;

创建sparkcontext 创建streamingcontext对象 创建Dsteam的加载函数

3、Spark Streaming工作原理

Spark Streaming将实时数据流划分为多个小批次(micro-batches),每个批次会生成一个RDD,然后使用Spark的批处理引擎处理这些RDD。

工作流程:

  1. 数据接收:从数据源接收实时数据。
  2. 数据划分:将数据流划分为小批次。
  3. 数据处理:对每个小批次的数据进行处理,生成新的RDD。
  4. 结果输出:将处理结果输出到外部存储或系统。

4、DStream的概念及原理

概念

DStream(Discretized Stream)是Spark Streaming中的基本抽象,表示连续的数据流。每个DStream由一系列RDD组成,每个RDD对应一个时间间隔内的数据。

原理

  • RDD序列:DStream内部表示为一系列按时间顺序排列的RDD。
  • 转换操作:可以对DStream应用各种转换操作,生成新的DStream。
  • 持续计算:Spark Streaming会定期对新生成的RDD进行处理。

5、DStream的几个操作函数

创建SparkContext对象

conf = SparkConf().setMaster('local[4]').setAppName('app')
Sc = SparkContext(conf=conf)

创建StreamingContext对象

ssc = StreamingContext(sc,20)

创建InputStream

input = ssc.textFileStream('file:///home/ub22/testtest/spark')

map:对DStream的每个RDD中的每个元素应用一个函数。

counts = lines.map(lambda x:(x,1))
counts.pprint()
[a,b,c]->[(a,1),(b,1),(c,1)]

filter:对DStream的每个RDD中的每个元素进行过滤。

counts = lines.flatMap(lambda line: line.split( ".")
counts.pprint()
[Hello ,this
is ,a
spark ,demo]->(Hello,this,is,a,spark,demo)

union:合并两个Dstream

words = lines.flatMap(lambda line: line.split(" "))
wordone = words.map(lambda x:x+'one')
wordtwo = words.map(lambda y:y+'two')
unionwords = wordone.union(wordtwo)

reduceByKey:对Key-Value类型的DStream进行按Key聚合。

(hello , 1)
( hello , 1) (hello , 2)
(word, 1)-》 (word, 1)

window:对DStream应用窗口操作。

val windowedStream = dstream.window(Seconds(30), Seconds(10))

foreachRDD:对DStream的每个RDD应用一个函数。

dstream.foreachRDD(rdd => {
 // 对RDD进行处理
})

6、DStream持久化方法

持久化:通过持久化操作将DStream的数据缓存到内存或磁盘中。

saveAsTextFiles()
#统计词频
running_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda
word:\ (word,1)).reduceByKey(lambda x,y:x+y)
#统计结果存储到本地
running_counts.saveAsTextFiles("file:///usr/local/test/spark/output.txt")
#打印统计结果
running_counts.pprint()

7、DStream窗口的概念,滑动时间、窗口时间的概念

窗口(Window):对DStream进行分片处理,基于时间窗口进行计算。

窗口时间(windowLength):窗口的时间长度,例如30秒。

滑动时间(slideInterval):前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数

window(windowLength, slideInterval)

五、Spark机器学习

1、机器学习的概念及一般流程

概念

机器学习是指通过数据训练模型,使计算机能够自动从数据中学习并进行预测或决策的技术。

准备数据、训练模型、模型评估

一般流程

  1. 数据收集:获取用于训练和测试的数据集。
  2. 数据预处理:清洗、转换和标准化数据。
  3. 特征工程:提取和选择有效的特征。
  4. 模型训练:是将预处理过的数据按照模型的需要进行划分,并进行模型训练的过程
  5. 模型划分:有监督学习:具有特征和标签值的训练集和测试集; 无监督学习: 给定数据和模型假设空间,构建优化问题。
  6. 模型评估
  7. 模型部署:将模型应用于实际环境中。
  8. 模型监控:监控模型的性能,并进行更新和改进。

2、常用回归、分类、聚类算法及大体原理

回归

  • 线性回归:建立输入与输出之间的线性关系。
  • 决策树回归:通过构建树形结构来预测连续值。

分类

  • 逻辑回归:用于二分类问题,通过Sigmoid函数预测概率。
  • 支持向量机(SVM):寻找最佳超平面将数据分为不同类别。
  • 随机森林:通过多个决策树的投票结果进行分类。

聚类

  • K-Means:将数据分为K个簇,每个簇的中心点为均值。
  • 层次聚类:通过构建层次树状结构进行聚类。

3、Spark ML组件的概念(转化器、预测器、管道)

核心功能:机器学习算法、特征化方法、持久化方法、实用功能、管道方法

转化器(Transformer):针对特征:读入一列特征,输出新的DataFrame类型列; 针对模型学习:读入一组特征,输出DataFrame类型的预测结果。

val scaler = new StandardScaler().fit(data)
val scaledData = scaler.transform(data)

预测器(Estimator):用于模型训练,如线性回归、决策树等。算法,调用fit方法训练数据。

val lr = new LinearRegression()
val model = lr.fit(trainingData)

4、Spark ML工作过程中DataFrame的内容变化

初始DataFrame:原始数据集。

特征工程:增加或修改特征列。

模型训练:生成模型,并在DataFrame中增加预测结果列。

六、Spark图计算

1、图的概念

图(Graph)是由顶点(Vertex)和边(Edge)组成的结构,用于表示实体及其关系。

2、图的名词解释(度、路径、距离、连通、三角、pagerank)

度(Degree):一个顶点的连接边数。

路径(Path):从一个顶点到另一个顶点的边的序列。

距离(Distance):路径的长度。

连通(Connectivity):两个顶点之间是否存在路径。

三角(Triangle):三个顶点两两相连形成的子图。

PageRank:衡量顶点重要性的一种算法,常用于搜索引擎排名。

3、图的构建(顶点表、边表的格式要求)

顶点表:顶点DataFrame :必须包含列名为”id”的列,作为顶点的唯一标识。

 vertices = spark.createDataFrame([
("a", "Alice", 34),("b", "Bob", 36)]
, ["id", "name", "age"])

边表:边DataFrame :必须包含列名为“src”和”dst”的列,保存头和尾的唯一标识id。

>>> edges = spark.createDataFrame([
("a", "b", "friend")]
, ["src", "dst", "relationship"])

4、图的常用查看函数、操作函数

通过GraphFrame提供的三个属性: degrees, inDegrees, outDegrees 可以获得顶点的度、入度和出度。

graph.degrees.show()
graph.inDegrees.show()
graph.outDegrees.show()

采用形如“(a)-[e]->(b)”的模式描述有向边

motifs = graph.find("(a)-[e]->(b)")
motifs.show()

模式视图是DataFrame类型的,同样可以进一步进行查询、过滤和统计操作。

motifs.filter( "b.age > 30" ).show()

模式中有多条边时,需要用分号(“;”)拼接,

例1:"(a)-[e]->(b);(b)-[e2]->(c)” 表示一条从a到b,然后从b到c的路径不包含某条边,在边的模式前面加上“!”
例2 : "(a)-[e]->(b); ! (b)-[e2]->(a)” ,表示不选取包含重边的边。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

作者

1538509536@qq.com

这是软考的知识点

考试须知 考试项目 考试时间 基础知识 选择...

读出全部