當前位置:首頁 » 操作系統 » apachespark源碼

apachespark源碼

發布時間: 2022-04-27 07:50:41

1. 怎麼用Eclipse搭建Spark源碼閱讀環境

第一部分、軟體安裝

1、 安裝JDK (版本為1.7.0_11)

2、 安裝Scala (版本為2.11.2)

3、 安裝ScalaIDE(版本為3.0.4)

第二部分:加壓縮官網下載的源代碼包或者找到通過Git抽取的Spark源文件:

我用的是spark-1.1.1版本(最新版本),由於idea 13已經原生支持sbt,所以無須為idea安裝sbt插件。

源碼下載(用git工具):

# Masterdevelopment branch

gitclone git://github.com/apache/spark.git

# 1.1 maintenancebranch with stability fixes on top of Spark 1.1.1

gitclone git://github.com/apache/spark.git -b branch-1.1

源碼更新(用git工具同步跟新源碼):

gitclone https://github.com/apache/spark.git

第三部分:通過sbt工具,構建Scala的Eclipse工程,詳細步驟如下所示

1、通過cmd命令進入DOS界面,之後通過cd命令進入源代碼項目中,我下載的Spark.1.1.1版本的源代碼放在(E:\Spark計算框架的研究\spark_1_1_1_eclipse)文件夾中,之後運行sbt命令,如下所示:

2、運行sbt命令之後,解析編譯相關的jar包,並出現sbt命令界面窗口,出現的效果圖如下所示,之後運行eclipse命令,sbt對這個工程進行編譯,構建Eclipse項目,效果圖如下所示:

4、 打開ScalaIDE工具,File à Import à Existing Projects into Workspace à
Next à
選擇剛好用sbt工具編譯好的Eclipse工程(E:\Spark計算框架的研究\spark_1_1_1_eclipse),如下圖所示。

5、 通過上面的操作,就可以將通過sbt工具編譯生成的Eclipse項目導入到EclipseIDE開發環境中,效果圖如下所示:

錯誤提示如下所示:我導入的包為,如下文件夾中所示。

(E:\Spark計算框架的研究\spark_1_1_1_eclipse\lib_managed\bundles)

Description Resource Path Location Type

akka-remote_2.10-2.2.3-shaded-protobuf.jar is cross-compiled

with an incompatible version of Scala (2.10).

In case of errorneous report, this check can be disabled

in the compiler preference page.

spark-core Unknown Scala Classpath Problem

Description Resource Path Location Type

akka-slf4j_2.10-2.2.3-shaded-protobuf.jar is cross-compiled with

an incompatible version of Scala (2.10). In case of errorneous report,

this check can be disabled in the compiler preference page.

spark-core Unknown Scala Classpath Problem

Description Resource Path Location Type

akka-testkit_2.10-2.2.3-shaded-protobuf.jar is cross-compiled

with an incompatible version of Scala (2.10).

In case of errorneous report, this check can be disabled in the compiler preference page.

spark-core Unknown Scala Classpath Problem

Description Resource Path Location Type

akka-zeromq_2.10-2.2.3-shaded-protobuf.jar is cross-compiled

with an incompatible version of Scala (2.10).

In case of errorneous report, this check can be disabled in the compiler preference page.

spark-core Unknown Scala Classpath Problem

上面這些包兼容性問題還沒有解決,修改相應的jar包就可以解決。

2. spark mllib演算法介面源碼在什麼地方查看

1.1LDA實例實例步驟:1)載入數據返回的數據格式為:documents:RDD[(Long,Vector)],其中:Long為文章ID,Vector為文章分詞後的詞向量;用戶可以讀取指定目錄下的數據,通過分詞以及數據格式的轉換,轉換成RDD[(Long,Vector)]即可。2)建立模型模型參數設置說明:k:主題數,或者聚類中心數DocConcentration:文章分布的超參數(Dirichlet分布的參數),必需>1.0TopicConcentration:主題分布的超參數(Dirichlet分布的參數),必需>1.0MaxIterations:迭代次數setSeed:隨機種子CheckpointInterval:迭代計算時檢查點的間隔Optimizer:優化計算方法,目前支持"em","online"3)結果輸出topicsMatrix以及topics(word,topic))輸出。實例代碼如下:[java]viewplainimportorg.apache.log4j.{Level,Logger}importorg.apache.spark.{SparkConf,SparkContext}importorg.apache.spark.mllib.clustering.LDAimportorg.apache.spark.mllib.linalg.Vectorsobjectlda{defmain(args:Array[String]){//0構建Spark對象valconf=newSparkConf().setAppName("lda")valsc=newSparkContext(conf)Logger.getRootLogger.setLevel(Level.WARN)//1載入數據,返回的數據格式為:documents:RDD[(Long,Vector)]//其中:Long為文章ID,Vector為文章分詞後的詞向量//可以讀取指定目錄下的數據,通過分詞以及數據格式的轉換,轉換成RDD[(Long,Vector)]即可valdata=sc.textFile("data/mllib/sample_lda_data.txt")valparsedData=data.map(s=>Vectors.dense(s.trim.split('').map(_.toDouble)))//=parsedData.zipWithIndex.map(_.swap).cache()//2建立模型,設置訓練參數,訓練模型/***k:主題數,或者聚類中心數*DocConcentration:文章分布的超參數(Dirichlet分布的參數),必需>1.0*TopicConcentration:主題分布的超參數(Dirichlet分布的參數),必需>1.0*MaxIterations:迭代次數*setSeed:隨機種子*CheckpointInterval:迭代計算時檢查點的間隔*Optimizer:優化計算方法,目前支持"em","online"*/valldaModel=newLDA().setK(3).setDocConcentration(5).setTopicConcentration(5).setMaxIterations(20).setSeed(0L).setCheckpointInterval(10).setOptimizer("em").run(corpus)//3模型輸出,模型參數輸出,結果輸出//Outputtopics.Eachisadistributionoverwords(matchingwordcountvectors)println("Learnedtopics(asdistributionsovervocabof"+ldaModel.vocabSize+"words):")valtopics=ldaModel.topicsMatrixfor(topic<-Range(0,3)){print("Topic"+topic+":")for(word<-Range(0,ldaModel.vocabSize)){print(""+topics(word,topic));}println()}}}

3. 如何對Spark 源碼修改後在Eclipse中使用

Eclipse 下開發調試環境的配置
該小節中使用的各項工具分別為:Windows 7+Eclipse Java EE 4.4.2+Scala 2.10.4+Sbt 0.13.8+Maven3.3.3,測試的 Spark 版本為 1.4.0。
1.配置 IDE:
選擇菜單項 Help->Install new software,添加站點 ,選擇安裝 Scala IDE for Eclipse 以及 Scala IDE Plugins。
對於標准版 Eclipse,還需要安裝單獨的 Maven 插件。
出於配置簡便考慮,也可以使用 Scala 官方提供的已將所有依賴打包好的 Scala IDE。
特別的,由於項目本身存在一些錯誤,請先暫時關閉 Project->Build Automatically 的功能以節省時間。
2.下載 Spark 源代碼:
創建空目錄,執行如下語句:git clone
除了使用 git 指令之外,也可以從 Spark 的 Github 頁面下載打包好的源代碼。
3.將源碼轉化為 Eclipse 項目:
進入源代碼根目錄,執行如下語句:sbt eclipse。Sbt 執行期間會下載 Spark 所需要的所有 jar 包,因此該步驟會花費很長的時間。其中有一些 jar 包需要使用網路代理等方法才能下載。
4.導入項目至 Eclipse:
選擇菜單項 File->Import,並選擇 General->Existing Projects into Workspace,項目的根路徑選擇源代碼的根路徑,導入所有項目(共有 25 個)。
5.修改 Scala 版本:
進入 Preference->Scala->Installations,添加機器上安裝的 Scala 2.10.4(選擇 lib 目錄)。由於該版本 Spark(1.4.0)是在 Scala 2.10.4 的環境下編寫的,需要在 Eclipse 中修改項目使用的 Scala 版本。方法為:全選項目,右鍵選擇 Scala->Set the Scala Installation 並選擇相應的 Scala 版本。
6.為 old-deps 項目添加 Scala Library:
右鍵選擇 old-deps 項目,選擇 Scala->Add Scala Library to Build Path。
7.Maven install 以生成 spark-streaming-flume-sink 所需要的類:
首先將源代碼根目錄中的 scalastyle-config.xml 文件復制到 spark-streaming-flume-sink 項目根目錄中,而後在 Eclipse 中打開該項目,右鍵選擇 pom.xml 文件,選擇 Run as->Maven install。
8.修改 spark-sql 與 spark-hive 的包錯誤:
由於源代碼的包設置有錯誤,為此需要將類文件移至正確的包中
對於 spark-sql 項目,分別選擇 src/test/java 中的 test.org.apache.spark.sql 以及 test.org.apache.spark.sql.sources 包中的所有類,右鍵選擇 Refactor->Move,移動至 org.apache.spark.sql 以及 org.apache.spark.sql.sources 包。
對於 spark-hive 項目,分別選擇 src/test/java 中的 test.org.apache.spark.sql.hive 以及 test.org.apache.spark.sql.hive.execution 包中的所有類,移動至 org.apache.spark.sql.hive 以及 org.apache.spark.sql.hive.execution 包。
9.編譯所有項目:
打開 Project->Build Automatically 功能,等待所有項目編譯成功。
10.檢查是否安裝成功:
將 core 項目中的 src->main->resources->org 文件夾拷貝到 examples 項目中的 target->scala-2.10->classes 中。而後執行 examples 項目中的 org.apache.spark.examples.SparkPi 程序,並設置其 jvm 參數為-Dspark.master=local

4. org.apache.spark.api.java.optional在哪個包下

如果你想看源碼的話:https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/api/java/Optional.java
如果你只想導入包的話(了解不多,1.6版本可以導入google的optional,網路「com.google.common」,2.1版本已有自己的optional):在spark-core_對應版本號.jar。

5. spark2.7.0源碼在哪裡下載

https://github.com/apache/spark
這里就可以下載。

6. 有本spark的書 第三章為sparkcontext 請問書名叫書名

我手上正好有本書,《Apache Spark源碼剖析》,第3章是「sparkcontext初始化」,不知道是不是你找的

7. 如何使用intellij搭建spark開發環境

注意,客戶端和虛擬集群中hadoop、spark、scala的安裝目錄是一致的,這樣開發的spark應用程序的時候不需要打包spark開發包和scala的庫文件,減少不必要的網路IO和磁碟IO。當然也可以不一樣,不過在使用部署工具spark-submit的時候需要參數指明classpath。
1:IDEA的安裝
官網jetbrains.com下載IntelliJ IDEA,有Community Editions 和& Ultimate Editions,前者免費,用戶可以選擇合適的版本使用。
根據安裝指導安裝IDEA後,需要安裝scala插件,有兩種途徑可以安裝scala插件:
啟動IDEA -> Welcome to IntelliJ IDEA -> Configure -> Plugins -> Install JetBrains plugin... -> 找到scala後安裝。
啟動IDEA -> Welcome to IntelliJ IDEA -> Open Project -> File -> Settings -> plugins -> Install JetBrains plugin... -> 找到scala後安裝。

如果你想使用那種酷酷的黑底界面,在File -> Settings -> Appearance -> Theme選擇Darcula,同時需要修改默認字體,不然菜單中的中文字體不能正常顯示。2:建立Spark應用程序
下面講述如何建立一個Spark項目week2(,正在錄制視頻),該項目包含3個object:
取自spark examples源碼中的SparkPi
計詞程序WordCount1
計詞排序程序WordCount2

A:建立新項目
創建名為dataguru的project:啟動IDEA -> Welcome to IntelliJ IDEA -> Create New Project -> Scala -> Non-SBT -> 創建一個名為week2的project(注意這里選擇自己安裝的JDK和scala編譯器) -> Finish。
設置week2的project structure
增加源碼目錄:File -> Project Structure -> Meles -> week2,給week2創建源代碼目錄和資源目錄,注意用上面的按鈕標注新增加的目錄的用途。


增加開發包:File -> Project Structure -> Libraries -> + -> java -> 選擇
/app/hadoop/spark100/lib/spark-assembly-1.0.0-hadoop2.2.0.jar
/app/scala2104/lib/scala-library.jar可能會提示錯誤,可以根據fix提示進行處理

B:編寫代碼
在源代碼scala目錄下創建1個名為week2的package,並增加3個object(SparkPi、WordCoun1、WordCount2):


SparkPi代碼
package week2

import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
val spark = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 2
val n = 100000 * slices
val count = spark.parallelize(1 to n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.rece(_ + _)
println("Pi is roughly " + 4.0 * count / n)
spark.stop()
}
}
復制代碼

WordCount1代碼

package week2

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object WordCount1 {
def main(args: Array[String]) {
if (args.length == 0) {
System.err.println("Usage: WordCount1 <file1>")
System.exit(1)
}

val conf = new SparkConf().setAppName("WordCount1")
val sc = new SparkContext(conf)
sc.textFile(args(0)).flatMap(_.split(" ")).map(x => (x, 1)).receByKey(_ + _).take(10).foreach(println)
sc.stop()
}
}
復制代碼

WordCount2代碼

package week2

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object WordCount2 {
def main(args: Array[String]) {
if (args.length == 0) {
System.err.println("Usage: WordCount2 <file1>")
System.exit(1)
}

val conf = new SparkConf().setAppName("WordCount2")
val sc = new SparkContext(conf)
sc.textFile(args(0)).flatMap(_.split(" ")).map(x => (x, 1)).receByKey(_ + _).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).take(10).foreach(println)
sc.stop()
}
}
復制代碼

C:生成程序包
生成程序包之前要先建立一個artifacts,File -> Project Structure -> Artifacts -> + -> Jars -> From moudles with dependencies,然後隨便選一個class作為主class。


按OK後,對artifacts進行配置,修改Name為week2,刪除Output Layout中week2.jar中的幾個依賴包,只剩week2項目本身。


按OK後, Build -> Build Artifacts -> week2 -> rebuild進行打包,經過編譯後,程序包放置在out/artifacts/week2目錄下,文件名為week2.jar。

3:Spark應用程序部署
將生成的程序包week2.jar復制到spark安裝目錄下,切換到用戶hadoop,然後切換到/app/hadoop/spark100目錄,進行程序包的部署。具體的部署參見應用程序部署工具spark-submit 。

8. spark獨立模式還需要編譯嗎

spark有三種集群部署方式:

1、獨立部署模式standalone,spark自身有一套完整的資源管理方式

2、架構於hadoop之上的spark集群

3、架構於mesos之上的spark集群

嘗試了下搭建第一種獨立部署模式集群,將安裝方式記錄如下:

環境ubuntu 12.04 (兩台),部署方式是和hadoop類似,先在一台機器上部署成功後直接將文件打包拷貝到其他機器上,這里假設現在A機器上部署,並且A為master,最後B為slave

A和B均上創建用戶spark

sudo useradd spark
以後spark的目錄在集群所有機器的/home/spark/spark下(第一個spark是用戶名,第二個spark是spark文件目錄名)

保證A能無密碼登陸到B上的spark用戶,在ssh裡面設置

這部分是現在master機器(A)上配置

0 首先保證A能無密碼方式ssh至localhost和B ,具體方式參見: 點擊打開鏈接

0.1 在A機器上執行

ssh-keygen -t rsa
cp ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys
ssh localhost
那麼A可以實現無密碼登陸localhost

0.2 在B機器上執行

ps -e|grep ssh
如果出現:

695 ? 00:00:00 sshd

1754 ? 00:00:00 ssh-agent

若沒有sshd那麼在B上執行

sudo apt-get install openssh-server
在B上安裝ssh服務端(ubuntu有可能默認只有agent端)

0.3 在B上執行:

ssh-keygen -t rsa
scp spark@A:~/.ssh/authorized_keys ~/.ssh
第一句是為了保證在B上有.ssh目錄

第二句是將A的公鑰拷貝到B上,從而實現A無密碼訪問B

0.4 在A上執行gedit ~/.ssh/config添加

user spark
這里是為了A以默認用戶spark無密碼登陸B,其實這一步沒有必要,因為A和B機器上都是在spark用戶下操作的,那麼機器A的saprk執行ssh B也是以spark用戶登陸的

1 每台機器確保有java ,一個簡單的方式:

sudo apt-get install eclipse
2 需要maven編譯spark源碼 ,下載maven 點擊打開鏈接 ,隨便下載一個版本

簡單的方式:

sudo apt-get install maven
復雜的方式:

wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.2.2/binaries/apache-maven-3.2.2-bin.tar.gz
tar -zxvf apache-maven-3.2.2-bin.tar.gz
mv apache-maven-3.2.2-bin.tar.gz maven
sudo mv maven /usr/local
然後gedit /etc/profile末尾添加如下:

#set maven environment
M2_HOME=/usr/local/maven
export MAVEN_OPTS="-Xms256m -Xmx512m"
export PATH=$M2_HOME/bin:$PATH
驗證maven安裝成功:

source /etc/profile
mvn -v
出現類似語句:Apache Maven 3.2.2 (; 2014-06-17T21:51:42+08:00)

3 下載spark, 點擊打開鏈接 ,注意不要下載帶有hadoop之類字樣的版本,而是source package比如spark-1.0.0.tgz

tar -zxvf spark-1.0.0.tgz
mv spark-1.0.0 spark
cd spark
sh make-distribution.sh
最後一步會

編譯spark源碼

,過程可能有點長,取決於網路和機器配置,我的用了19min,編譯成功類似如下圖(圖來自網上):

4 配置spark

4.1 gedit ./conf/spark-env.sh在spark-env.sh末尾添加如下:

export SPARK_MASTER_IP=A
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_MEMORY=1g
export MASTER=spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}
注意這里的SPARK_MASTER_IP我覺得還是設置為master機器的IP地址比較好,這里我假設master的hostname是A

SPARK_WORKER_INSTANCES表示slave機器的數目,這里只有B一台故設為1

4.2 gedit ./conf/slaves添加B的hostname,這里B機器的hostname假設就為B故在文件中追加一個B即可。文件里原來有一個localhost如果你想要master同時也為worker機器那麼可保留該行,否則可以刪除

5 驗證master機器A能否單機啟動spark

9. spark 怎麼啟動worker

基於spark1.3.1的源碼進行分析
Spark master啟動源碼分析
1、在start-master.sh調用master的main方法,main方法調用
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)//啟動系統和actor
actorSystem.awaitTermination()
}
2、調用startSystemAndActor啟動系統和創建actor
def startSystemAndActor(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
val timeout = AkkaUtils.askTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout)
val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
3、調用AkkaUtils.createActorSystem來創建ActorSystem
def createActorSystem(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager): (ActorSystem, Int) = {
val startService: Int => (ActorSystem, Int) = { actualPort =>
doCreateActorSystem(name, host, actualPort, conf, securityManager)
}
Utils.startServiceOnPort(port, startService, conf, name)
}
4、調用Utils.startServiceOnPort啟動一個埠上的服務,創建成功後調用doCreateActorSystem創建ActorSystem
5、ActorSystem創建成功後創建Actor
6、調用Master的主構造函數,執行preStart()

1、start-slaves.sh調用Worker類的main方法
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir)
actorSystem.awaitTermination()
}
2、調用startSystemAndActor啟動系統和創建actor
def startSystemAndActor(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val actorName = "Worker"
val securityMgr = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
conf = conf, securityManager = securityMgr)
val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
(actorSystem, boundPort)
}
3、調用AkkaUtils的createActorSystem創建ActorSystem
def createActorSystem(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager): (ActorSystem, Int) = {
val startService: Int => (ActorSystem, Int) = { actualPort =>
doCreateActorSystem(name, host, actualPort, conf, securityManager)
}
Utils.startServiceOnPort(port, startService, conf, name)
}
4、創建完ActorSystem後調用Worker的主構造函數,執行preStart方法
override def preStart() {
assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
logInfo("Spark home: " + sparkHome)
createWorkDir()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
registerWithMaster()

metricsSystem.registerSource(workerSource)
metricsSystem.start()
// Attach the worker metrics servlet handler to the web ui after the metrics system is started.
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
5、調用registerWithMaster方法向Master注冊啟動的worker
def registerWithMaster() {
// DisassociatedEvent may be triggered multiple times, so don't attempt registration
// if there are outstanding registration attempts scheled.
registrationRetryTimer match {
case None =>
registered = false
tryRegisterAllMasters()
connectionAttemptCount = 0
registrationRetryTimer = Some {
context.system.scheler.schele(INITIAL_REGISTRATION_RETRY_INTERVAL,
INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
}
case Some(_) =>
logInfo("Not spawning another attempt to register with the master, since there is an" +
" attempt scheled already.")
}
}
6、調用tryRegisterAllMasters向Master發送注冊的Worker消息
private def tryRegisterAllMasters() {
for (masterAkkaUrl <- masterAkkaUrls) {
logInfo("Connecting to master " + masterAkkaUrl + "...")
val actor = context.actorSelection(masterAkkaUrl)
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
}
}
7、Master的receiveWithLogging接收到消息執行
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
{
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
sender, workerUiPort, publicAddress)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schele()
} else {
val workerAddress = worker.actor.path.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress)
}
}
}
8、失敗向worker返回失敗消息,成功則返回Master的相關信息
9、返回消息後調用schele,但是因為沒有application,所以這時候不會進行資源的分配

至此整個Spark集群就已經啟動完成

熱點內容
我的手機如何恢復安卓系統 發布:2025-01-20 00:55:48 瀏覽:366
eclipsejsp編譯 發布:2025-01-20 00:51:02 瀏覽:860
虛擬機連宿主機ftp 發布:2025-01-20 00:43:04 瀏覽:356
最小生成樹的prim演算法 發布:2025-01-20 00:39:40 瀏覽:325
淘寶助理無法上傳 發布:2025-01-20 00:34:33 瀏覽:883
如何做一個代理伺服器 發布:2025-01-20 00:18:39 瀏覽:803
android背單詞源碼 發布:2025-01-19 23:57:21 瀏覽:727
領動配置怎麼樣 發布:2025-01-19 23:56:35 瀏覽:83
python造數據 發布:2025-01-19 23:51:31 瀏覽:903
linux下卸載mysql 發布:2025-01-19 23:40:34 瀏覽:339