apachespark源码
1. 怎么用Eclipse搭建Spark源码阅读环境
第一部分、软件安装
1、 安装JDK (版本为1.7.0_11)
2、 安装Scala (版本为2.11.2)
3、 安装ScalaIDE(版本为3.0.4)
第二部分:加压缩官网下载的源代码包或者找到通过Git抽取的Spark源文件:
我用的是spark-1.1.1版本(最新版本),由于idea 13已经原生支持sbt,所以无须为idea安装sbt插件。
源码下载(用git工具):
# Masterdevelopment branch
gitclone git://github.com/apache/spark.git
# 1.1 maintenancebranch with stability fixes on top of Spark 1.1.1
gitclone git://github.com/apache/spark.git -b branch-1.1
源码更新(用git工具同步跟新源码):
gitclone https://github.com/apache/spark.git
第三部分:通过sbt工具,构建Scala的Eclipse工程,详细步骤如下所示
1、通过cmd命令进入DOS界面,之后通过cd命令进入源代码项目中,我下载的Spark.1.1.1版本的源代码放在(E:\Spark计算框架的研究\spark_1_1_1_eclipse)文件夹中,之后运行sbt命令,如下所示:
2、运行sbt命令之后,解析编译相关的jar包,并出现sbt命令界面窗口,出现的效果图如下所示,之后运行eclipse命令,sbt对这个工程进行编译,构建Eclipse项目,效果图如下所示:
4、 打开ScalaIDE工具,File à Import à Existing Projects into Workspace à
Next à
选择刚好用sbt工具编译好的Eclipse工程(E:\Spark计算框架的研究\spark_1_1_1_eclipse),如下图所示。
5、 通过上面的操作,就可以将通过sbt工具编译生成的Eclipse项目导入到EclipseIDE开发环境中,效果图如下所示:
错误提示如下所示:我导入的包为,如下文件夹中所示。
(E:\Spark计算框架的研究\spark_1_1_1_eclipse\lib_managed\bundles)
Description Resource Path Location Type
akka-remote_2.10-2.2.3-shaded-protobuf.jar is cross-compiled
with an incompatible version of Scala (2.10).
In case of errorneous report, this check can be disabled
in the compiler preference page.
spark-core Unknown Scala Classpath Problem
Description Resource Path Location Type
akka-slf4j_2.10-2.2.3-shaded-protobuf.jar is cross-compiled with
an incompatible version of Scala (2.10). In case of errorneous report,
this check can be disabled in the compiler preference page.
spark-core Unknown Scala Classpath Problem
Description Resource Path Location Type
akka-testkit_2.10-2.2.3-shaded-protobuf.jar is cross-compiled
with an incompatible version of Scala (2.10).
In case of errorneous report, this check can be disabled in the compiler preference page.
spark-core Unknown Scala Classpath Problem
Description Resource Path Location Type
akka-zeromq_2.10-2.2.3-shaded-protobuf.jar is cross-compiled
with an incompatible version of Scala (2.10).
In case of errorneous report, this check can be disabled in the compiler preference page.
spark-core Unknown Scala Classpath Problem
上面这些包兼容性问题还没有解决,修改相应的jar包就可以解决。
2. spark mllib算法接口源码在什么地方查看
1.1LDA实例实例步骤:1)加载数据返回的数据格式为:documents:RDD[(Long,Vector)],其中:Long为文章ID,Vector为文章分词后的词向量;用户可以读取指定目录下的数据,通过分词以及数据格式的转换,转换成RDD[(Long,Vector)]即可。2)建立模型模型参数设置说明:k:主题数,或者聚类中心数DocConcentration:文章分布的超参数(Dirichlet分布的参数),必需>1.0TopicConcentration:主题分布的超参数(Dirichlet分布的参数),必需>1.0MaxIterations:迭代次数setSeed:随机种子CheckpointInterval:迭代计算时检查点的间隔Optimizer:优化计算方法,目前支持"em","online"3)结果输出topicsMatrix以及topics(word,topic))输出。实例代码如下:[java]viewplainimportorg.apache.log4j.{Level,Logger}importorg.apache.spark.{SparkConf,SparkContext}importorg.apache.spark.mllib.clustering.LDAimportorg.apache.spark.mllib.linalg.Vectorsobjectlda{defmain(args:Array[String]){//0构建Spark对象valconf=newSparkConf().setAppName("lda")valsc=newSparkContext(conf)Logger.getRootLogger.setLevel(Level.WARN)//1加载数据,返回的数据格式为:documents:RDD[(Long,Vector)]//其中:Long为文章ID,Vector为文章分词后的词向量//可以读取指定目录下的数据,通过分词以及数据格式的转换,转换成RDD[(Long,Vector)]即可valdata=sc.textFile("data/mllib/sample_lda_data.txt")valparsedData=data.map(s=>Vectors.dense(s.trim.split('').map(_.toDouble)))//=parsedData.zipWithIndex.map(_.swap).cache()//2建立模型,设置训练参数,训练模型/***k:主题数,或者聚类中心数*DocConcentration:文章分布的超参数(Dirichlet分布的参数),必需>1.0*TopicConcentration:主题分布的超参数(Dirichlet分布的参数),必需>1.0*MaxIterations:迭代次数*setSeed:随机种子*CheckpointInterval:迭代计算时检查点的间隔*Optimizer:优化计算方法,目前支持"em","online"*/valldaModel=newLDA().setK(3).setDocConcentration(5).setTopicConcentration(5).setMaxIterations(20).setSeed(0L).setCheckpointInterval(10).setOptimizer("em").run(corpus)//3模型输出,模型参数输出,结果输出//Outputtopics.Eachisadistributionoverwords(matchingwordcountvectors)println("Learnedtopics(asdistributionsovervocabof"+ldaModel.vocabSize+"words):")valtopics=ldaModel.topicsMatrixfor(topic<-Range(0,3)){print("Topic"+topic+":")for(word<-Range(0,ldaModel.vocabSize)){print(""+topics(word,topic));}println()}}}
3. 如何对Spark 源码修改后在Eclipse中使用
Eclipse 下开发调试环境的配置
该小节中使用的各项工具分别为:Windows 7+Eclipse Java EE 4.4.2+Scala 2.10.4+Sbt 0.13.8+Maven3.3.3,测试的 Spark 版本为 1.4.0。
1.配置 IDE:
选择菜单项 Help->Install new software,添加站点 ,选择安装 Scala IDE for Eclipse 以及 Scala IDE Plugins。
对于标准版 Eclipse,还需要安装单独的 Maven 插件。
出于配置简便考虑,也可以使用 Scala 官方提供的已将所有依赖打包好的 Scala IDE。
特别的,由于项目本身存在一些错误,请先暂时关闭 Project->Build Automatically 的功能以节省时间。
2.下载 Spark 源代码:
创建空目录,执行如下语句:git clone
除了使用 git 指令之外,也可以从 Spark 的 Github 页面下载打包好的源代码。
3.将源码转化为 Eclipse 项目:
进入源代码根目录,执行如下语句:sbt eclipse。Sbt 执行期间会下载 Spark 所需要的所有 jar 包,因此该步骤会花费很长的时间。其中有一些 jar 包需要使用网络代理等方法才能下载。
4.导入项目至 Eclipse:
选择菜单项 File->Import,并选择 General->Existing Projects into Workspace,项目的根路径选择源代码的根路径,导入所有项目(共有 25 个)。
5.修改 Scala 版本:
进入 Preference->Scala->Installations,添加机器上安装的 Scala 2.10.4(选择 lib 目录)。由于该版本 Spark(1.4.0)是在 Scala 2.10.4 的环境下编写的,需要在 Eclipse 中修改项目使用的 Scala 版本。方法为:全选项目,右键选择 Scala->Set the Scala Installation 并选择相应的 Scala 版本。
6.为 old-deps 项目添加 Scala Library:
右键选择 old-deps 项目,选择 Scala->Add Scala Library to Build Path。
7.Maven install 以生成 spark-streaming-flume-sink 所需要的类:
首先将源代码根目录中的 scalastyle-config.xml 文件复制到 spark-streaming-flume-sink 项目根目录中,而后在 Eclipse 中打开该项目,右键选择 pom.xml 文件,选择 Run as->Maven install。
8.修改 spark-sql 与 spark-hive 的包错误:
由于源代码的包设置有错误,为此需要将类文件移至正确的包中
对于 spark-sql 项目,分别选择 src/test/java 中的 test.org.apache.spark.sql 以及 test.org.apache.spark.sql.sources 包中的所有类,右键选择 Refactor->Move,移动至 org.apache.spark.sql 以及 org.apache.spark.sql.sources 包。
对于 spark-hive 项目,分别选择 src/test/java 中的 test.org.apache.spark.sql.hive 以及 test.org.apache.spark.sql.hive.execution 包中的所有类,移动至 org.apache.spark.sql.hive 以及 org.apache.spark.sql.hive.execution 包。
9.编译所有项目:
打开 Project->Build Automatically 功能,等待所有项目编译成功。
10.检查是否安装成功:
将 core 项目中的 src->main->resources->org 文件夹拷贝到 examples 项目中的 target->scala-2.10->classes 中。而后执行 examples 项目中的 org.apache.spark.examples.SparkPi 程序,并设置其 jvm 参数为-Dspark.master=local
4. org.apache.spark.api.java.optional在哪个包下
如果你想看源码的话:https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/api/java/Optional.java
如果你只想导入包的话(了解不多,1.6版本可以导入google的optional,网络“com.google.common”,2.1版本已有自己的optional):在spark-core_对应版本号.jar。
5. spark2.7.0源码在哪里下载
https://github.com/apache/spark
这里就可以下载。
6. 有本spark的书 第三章为sparkcontext 请问书名叫书名
我手上正好有本书,《Apache Spark源码剖析》,第3章是“sparkcontext初始化”,不知道是不是你找的
7. 如何使用intellij搭建spark开发环境
注意,客户端和虚拟集群中hadoop、spark、scala的安装目录是一致的,这样开发的spark应用程序的时候不需要打包spark开发包和scala的库文件,减少不必要的网络IO和磁盘IO。当然也可以不一样,不过在使用部署工具spark-submit的时候需要参数指明classpath。
1:IDEA的安装
官网jetbrains.com下载IntelliJ IDEA,有Community Editions 和& Ultimate Editions,前者免费,用户可以选择合适的版本使用。
根据安装指导安装IDEA后,需要安装scala插件,有两种途径可以安装scala插件:
启动IDEA -> Welcome to IntelliJ IDEA -> Configure -> Plugins -> Install JetBrains plugin... -> 找到scala后安装。
启动IDEA -> Welcome to IntelliJ IDEA -> Open Project -> File -> Settings -> plugins -> Install JetBrains plugin... -> 找到scala后安装。
如果你想使用那种酷酷的黑底界面,在File -> Settings -> Appearance -> Theme选择Darcula,同时需要修改默认字体,不然菜单中的中文字体不能正常显示。2:建立Spark应用程序
下面讲述如何建立一个Spark项目week2(,正在录制视频),该项目包含3个object:
取自spark examples源码中的SparkPi
计词程序WordCount1
计词排序程序WordCount2
A:建立新项目
创建名为dataguru的project:启动IDEA -> Welcome to IntelliJ IDEA -> Create New Project -> Scala -> Non-SBT -> 创建一个名为week2的project(注意这里选择自己安装的JDK和scala编译器) -> Finish。
设置week2的project structure
增加源码目录:File -> Project Structure -> Meles -> week2,给week2创建源代码目录和资源目录,注意用上面的按钮标注新增加的目录的用途。
增加开发包:File -> Project Structure -> Libraries -> + -> java -> 选择
/app/hadoop/spark100/lib/spark-assembly-1.0.0-hadoop2.2.0.jar
/app/scala2104/lib/scala-library.jar可能会提示错误,可以根据fix提示进行处理
B:编写代码
在源代码scala目录下创建1个名为week2的package,并增加3个object(SparkPi、WordCoun1、WordCount2):
SparkPi代码
package week2
import scala.math.random
import org.apache.spark._
/** Computes an approximation to pi */
object SparkPi {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
val spark = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 2
val n = 100000 * slices
val count = spark.parallelize(1 to n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.rece(_ + _)
println("Pi is roughly " + 4.0 * count / n)
spark.stop()
}
}
复制代码
WordCount1代码
package week2
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
object WordCount1 {
def main(args: Array[String]) {
if (args.length == 0) {
System.err.println("Usage: WordCount1 <file1>")
System.exit(1)
}
val conf = new SparkConf().setAppName("WordCount1")
val sc = new SparkContext(conf)
sc.textFile(args(0)).flatMap(_.split(" ")).map(x => (x, 1)).receByKey(_ + _).take(10).foreach(println)
sc.stop()
}
}
复制代码
WordCount2代码
package week2
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
object WordCount2 {
def main(args: Array[String]) {
if (args.length == 0) {
System.err.println("Usage: WordCount2 <file1>")
System.exit(1)
}
val conf = new SparkConf().setAppName("WordCount2")
val sc = new SparkContext(conf)
sc.textFile(args(0)).flatMap(_.split(" ")).map(x => (x, 1)).receByKey(_ + _).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).take(10).foreach(println)
sc.stop()
}
}
复制代码
C:生成程序包
生成程序包之前要先建立一个artifacts,File -> Project Structure -> Artifacts -> + -> Jars -> From moudles with dependencies,然后随便选一个class作为主class。
按OK后,对artifacts进行配置,修改Name为week2,删除Output Layout中week2.jar中的几个依赖包,只剩week2项目本身。
按OK后, Build -> Build Artifacts -> week2 -> rebuild进行打包,经过编译后,程序包放置在out/artifacts/week2目录下,文件名为week2.jar。
3:Spark应用程序部署
将生成的程序包week2.jar复制到spark安装目录下,切换到用户hadoop,然后切换到/app/hadoop/spark100目录,进行程序包的部署。具体的部署参见应用程序部署工具spark-submit 。
8. spark独立模式还需要编译吗
spark有三种集群部署方式:
1、独立部署模式standalone,spark自身有一套完整的资源管理方式
2、架构于hadoop之上的spark集群
3、架构于mesos之上的spark集群
尝试了下搭建第一种独立部署模式集群,将安装方式记录如下:
环境ubuntu 12.04 (两台),部署方式是和hadoop类似,先在一台机器上部署成功后直接将文件打包拷贝到其他机器上,这里假设现在A机器上部署,并且A为master,最后B为slave
A和B均上创建用户spark
sudo useradd spark
以后spark的目录在集群所有机器的/home/spark/spark下(第一个spark是用户名,第二个spark是spark文件目录名)
保证A能无密码登陆到B上的spark用户,在ssh里面设置
这部分是现在master机器(A)上配置
0 首先保证A能无密码方式ssh至localhost和B ,具体方式参见: 点击打开链接
0.1 在A机器上执行
ssh-keygen -t rsa
cp ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys
ssh localhost
那么A可以实现无密码登陆localhost
0.2 在B机器上执行
ps -e|grep ssh
如果出现:
695 ? 00:00:00 sshd
1754 ? 00:00:00 ssh-agent
若没有sshd那么在B上执行
sudo apt-get install openssh-server
在B上安装ssh服务端(ubuntu有可能默认只有agent端)
0.3 在B上执行:
ssh-keygen -t rsa
scp spark@A:~/.ssh/authorized_keys ~/.ssh
第一句是为了保证在B上有.ssh目录
第二句是将A的公钥拷贝到B上,从而实现A无密码访问B
0.4 在A上执行gedit ~/.ssh/config添加
user spark
这里是为了A以默认用户spark无密码登陆B,其实这一步没有必要,因为A和B机器上都是在spark用户下操作的,那么机器A的saprk执行ssh B也是以spark用户登陆的
1 每台机器确保有java ,一个简单的方式:
sudo apt-get install eclipse
2 需要maven编译spark源码 ,下载maven 点击打开链接 ,随便下载一个版本
简单的方式:
sudo apt-get install maven
复杂的方式:
wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.2.2/binaries/apache-maven-3.2.2-bin.tar.gz
tar -zxvf apache-maven-3.2.2-bin.tar.gz
mv apache-maven-3.2.2-bin.tar.gz maven
sudo mv maven /usr/local
然后gedit /etc/profile末尾添加如下:
#set maven environment
M2_HOME=/usr/local/maven
export MAVEN_OPTS="-Xms256m -Xmx512m"
export PATH=$M2_HOME/bin:$PATH
验证maven安装成功:
source /etc/profile
mvn -v
出现类似语句:Apache Maven 3.2.2 (; 2014-06-17T21:51:42+08:00)
3 下载spark, 点击打开链接 ,注意不要下载带有hadoop之类字样的版本,而是source package比如spark-1.0.0.tgz
tar -zxvf spark-1.0.0.tgz
mv spark-1.0.0 spark
cd spark
sh make-distribution.sh
最后一步会
编译spark源码
,过程可能有点长,取决于网络和机器配置,我的用了19min,编译成功类似如下图(图来自网上):
4 配置spark
4.1 gedit ./conf/spark-env.sh在spark-env.sh末尾添加如下:
export SPARK_MASTER_IP=A
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_MEMORY=1g
export MASTER=spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}
注意这里的SPARK_MASTER_IP我觉得还是设置为master机器的IP地址比较好,这里我假设master的hostname是A
SPARK_WORKER_INSTANCES表示slave机器的数目,这里只有B一台故设为1
4.2 gedit ./conf/slaves添加B的hostname,这里B机器的hostname假设就为B故在文件中追加一个B即可。文件里原来有一个localhost如果你想要master同时也为worker机器那么可保留该行,否则可以删除
5 验证master机器A能否单机启动spark
9. spark 怎么启动worker
基于spark1.3.1的源码进行分析
Spark master启动源码分析
1、在start-master.sh调用master的main方法,main方法调用
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)//启动系统和actor
actorSystem.awaitTermination()
}
2、调用startSystemAndActor启动系统和创建actor
def startSystemAndActor(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
val timeout = AkkaUtils.askTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout)
val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
3、调用AkkaUtils.createActorSystem来创建ActorSystem
def createActorSystem(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager): (ActorSystem, Int) = {
val startService: Int => (ActorSystem, Int) = { actualPort =>
doCreateActorSystem(name, host, actualPort, conf, securityManager)
}
Utils.startServiceOnPort(port, startService, conf, name)
}
4、调用Utils.startServiceOnPort启动一个端口上的服务,创建成功后调用doCreateActorSystem创建ActorSystem
5、ActorSystem创建成功后创建Actor
6、调用Master的主构造函数,执行preStart()
1、start-slaves.sh调用Worker类的main方法
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir)
actorSystem.awaitTermination()
}
2、调用startSystemAndActor启动系统和创建actor
def startSystemAndActor(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): (ActorSystem, Int) = {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val actorName = "Worker"
val securityMgr = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
conf = conf, securityManager = securityMgr)
val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
(actorSystem, boundPort)
}
3、调用AkkaUtils的createActorSystem创建ActorSystem
def createActorSystem(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager): (ActorSystem, Int) = {
val startService: Int => (ActorSystem, Int) = { actualPort =>
doCreateActorSystem(name, host, actualPort, conf, securityManager)
}
Utils.startServiceOnPort(port, startService, conf, name)
}
4、创建完ActorSystem后调用Worker的主构造函数,执行preStart方法
override def preStart() {
assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
logInfo("Spark home: " + sparkHome)
createWorkDir()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
registerWithMaster()
metricsSystem.registerSource(workerSource)
metricsSystem.start()
// Attach the worker metrics servlet handler to the web ui after the metrics system is started.
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
5、调用registerWithMaster方法向Master注册启动的worker
def registerWithMaster() {
// DisassociatedEvent may be triggered multiple times, so don't attempt registration
// if there are outstanding registration attempts scheled.
registrationRetryTimer match {
case None =>
registered = false
tryRegisterAllMasters()
connectionAttemptCount = 0
registrationRetryTimer = Some {
context.system.scheler.schele(INITIAL_REGISTRATION_RETRY_INTERVAL,
INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
}
case Some(_) =>
logInfo("Not spawning another attempt to register with the master, since there is an" +
" attempt scheled already.")
}
}
6、调用tryRegisterAllMasters向Master发送注册的Worker消息
private def tryRegisterAllMasters() {
for (masterAkkaUrl <- masterAkkaUrls) {
logInfo("Connecting to master " + masterAkkaUrl + "...")
val actor = context.actorSelection(masterAkkaUrl)
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
}
}
7、Master的receiveWithLogging接收到消息执行
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
{
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
sender, workerUiPort, publicAddress)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schele()
} else {
val workerAddress = worker.actor.path.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress)
}
}
}
8、失败向worker返回失败消息,成功则返回Master的相关信息
9、返回消息后调用schele,但是因为没有application,所以这时候不会进行资源的分配
至此整个Spark集群就已经启动完成