Spark Source Code Analysis, Part 2 -- The SparkContext Initialization Process
Published: 2019-06-12


Creating or Reusing an Existing Session

Starting with Spark 2.0, the concept of SparkSession was introduced. The code to create or reuse an existing session looks like this:

val spark = SparkSession
  .builder
  .appName("SparkTC")
  .getOrCreate()

First, the builder pattern is used to create or reuse an existing SparkSession. The code of org.apache.spark.sql.SparkSession.Builder#getOrCreate is as follows:

  

def getOrCreate(): SparkSession = synchronized {
  assertOnDriver() // Note: a SparkSession can only be created and accessed on the driver
  // Get the session from current thread's active session.
  // activeThreadSession is an InheritableThreadLocal (which extends ThreadLocal). Since the data
  // lives in a ThreadLocal, no extra locking is needed here.
  var session = activeThreadSession.get()
  // If the session is not null and its SparkContext has not been stopped, the existing session can be reused
  if ((session ne null) && !session.sparkContext.isStopped) {
    options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
    if (options.nonEmpty) {
      logWarning("Using an existing SparkSession; some configuration may not take effect.")
    }
    return session
  }

  // Lock on the SparkSession object to prevent the session from being initialized twice
  SparkSession.synchronized {
    // If the current thread does not have an active session, get it from the global session.
    // If a default session exists and its SparkContext has not been stopped, it can be reused as well
    session = defaultSession.get()
    if ((session ne null) && !session.sparkContext.isStopped) {
      options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
      if (options.nonEmpty) {
        logWarning("Using an existing SparkSession; some configuration may not take effect.")
      }
      return session
    }

    // Create the session
    val sparkContext = userSuppliedContext.getOrElse { // by default userSuppliedContext holds no SparkContext
      val sparkConf = new SparkConf()
      options.foreach { case (k, v) => sparkConf.set(k, v) }

      // set a random app name if not given.
      if (!sparkConf.contains("spark.app.name")) {
        sparkConf.setAppName(java.util.UUID.randomUUID().toString)
      }

      SparkContext.getOrCreate(sparkConf)
      // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
    }

    // Initialize extensions if the user has defined a configurator class.
    val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
    if (extensionConfOption.isDefined) {
      val extensionConfClassName = extensionConfOption.get
      try {
        val extensionConfClass = Utils.classForName(extensionConfClassName)
        val extensionConf = extensionConfClass.newInstance()
          .asInstanceOf[SparkSessionExtensions => Unit]
        extensionConf(extensions)
      } catch {
        // Ignore the error if we cannot find the class or when the class has the wrong type.
        case e @ (_: ClassCastException |
                  _: ClassNotFoundException |
                  _: NoClassDefFoundError) =>
          logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
      }
    }
    // Initialize the SparkSession, passing it the SparkContext obtained above
    session = new SparkSession(sparkContext, None, None, extensions)
    options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
    // Set the default session
    setDefaultSession(session)
    // Set the active session
    setActiveSession(session)

    // Register a successfully instantiated context to the singleton. This should be at the
    // end of the class definition so that the singleton is updated only if there is no
    // exception in the construction of the instance.
    // Register a SparkListener so that the default session is reset when the application ends
    sparkContext.addSparkListener(new SparkListener {
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        defaultSession.set(null)
      }
    })
  }

  return session
}
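To see the reuse path in action, here is a minimal usage sketch (the local[*] master and the shuffle-partitions option are illustrative, not taken from the original example): the second builder call returns the same session instance, because the active/default-session checks above find a live SparkContext, and the extra option is simply copied into the existing session's conf together with the "Using an existing SparkSession" warning.

import org.apache.spark.sql.SparkSession

val s1 = SparkSession.builder
  .master("local[*]")                 // illustrative master
  .appName("SparkTC")
  .getOrCreate()

val s2 = SparkSession.builder
  .config("spark.sql.shuffle.partitions", "4")  // applied to the existing session's conf
  .getOrCreate()

assert(s1 eq s2)  // same instance: getOrCreate reused the active (or default) session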

The org.apache.spark.SparkContext#getOrCreate method looks like this:

def getOrCreate(config: SparkConf): SparkContext = {
  // Synchronize to ensure that multiple create requests don't trigger an exception
  // from assertNoOtherContextIsRunning within setActiveContext
  // Use an object lock
  SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
    // activeContext is an AtomicReference instance, so its set/update operations are atomic
    if (activeContext.get() == null) {
      // Only one SparkContext may be active at a time
      setActiveContext(new SparkContext(config), allowMultipleContexts = false)
    } else {
      if (config.getAll.nonEmpty) {
        logWarning("Using an existing SparkContext; some configuration may not take effect.")
      }
    }
    activeContext.get()
  }
}
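A similar sketch for SparkContext.getOrCreate (the conf values here are illustrative): the first call constructs and registers the context, while the second call hits the else branch, logs the "Using an existing SparkContext" warning, and returns the same instance.

import org.apache.spark.{SparkConf, SparkContext}

val sc1 = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("getOrCreate-demo"))
val sc2 = SparkContext.getOrCreate(
  new SparkConf().set("spark.ui.enabled", "false"))  // non-empty conf, so only the warning is logged

assert(sc1 eq sc2)  // activeContext was already set, so the existing context is returned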

SparkContext Initialization

SparkContext represents a connection to a Spark cluster. It can be used to create RDDs, accumulators, and broadcast variables on that cluster. Only one SparkContext can be active per JVM; before creating a new one, you must call stop() on the currently active SparkContext.
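As a quick illustration of that description (the names and values below are illustrative, not from the original post), a SparkContext hands out the three primitives mentioned above, and stop() must be called before another context can be created in the same JVM:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("sc-demo"))

val rdd   = sc.parallelize(1 to 100)        // RDD
val acc   = sc.longAccumulator("count")     // accumulator
val bcast = sc.broadcast(Map("k" -> "v"))   // broadcast variable

rdd.foreach { x => if (bcast.value.contains("k")) acc.add(1) }
println(acc.value)  // 100

sc.stop()  // required before constructing another SparkContext in this JVM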

After the constructor is invoked, the class member variables are initialized and then the initialization process begins. It is wrapped in a try-catch block that runs as part of constructor execution; see an earlier article of mine for details:

This standalone block of code is shown below:

try {
  // 1. Initialize the configuration
  _conf = config.clone()
  _conf.validateSettings()

  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!_conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

  // log out spark.app.name in the Spark driver logs
  logInfo(s"Submitted application: $appName")

  // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
  if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
    throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
      "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
  }

  if (_conf.getBoolean("spark.logConf", false)) {
    logInfo("Spark configuration:\n" + _conf.toDebugString)
  }

  // Set Spark driver host and port system properties. This explicitly sets the configuration
  // instead of relying on the default value of the config constant.
  _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
  _conf.setIfMissing("spark.driver.port", "0")

  _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

  _jars = Utils.getUserJars(_conf)
  _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
    .toSeq.flatten
  // 2. Initialize the event log directory and set the compression codec
  _eventLogDir =
    if (isEventLogEnabled) {
      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
        .stripSuffix("/")
      Some(Utils.resolveURI(unresolvedDir))
    } else {
      None
    }

  _eventLogCodec = {
    val compress = _conf.getBoolean("spark.eventLog.compress", false)
    if (compress && isEventLogEnabled) {
      Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
    } else {
      None
    }
  }
  // 3. LiveListenerBus asynchronously delivers SparkListenerEvents to the registered SparkListeners
  _listenerBus = new LiveListenerBus(_conf)

  // Initialize the app status store and listener before SparkEnv is created so that it gets
  // all events.
  // 4. Provide an in-memory KV store for the application status
  _statusStore = AppStatusStore.createLiveStore(conf)
  // 5. Register the AppStatusListener with the LiveListenerBus
  listenerBus.addToStatusQueue(_statusStore.listener.get)

  // Create the Spark execution environment (cache, map output tracker, etc)
  // 6. Create the driver-side SparkEnv
  //    It holds all the runtime objects of a Spark instance (master or worker), including the
  //    serializer, RpcEnv, block manager, map output tracker, and so on.
  //    Spark currently locates the SparkEnv through a global variable, so all threads access the
  //    same SparkEnv; after the SparkContext is created it can be accessed via SparkEnv.get.
  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)

  // If running the REPL, register the repl's output dir with the file server.
  _conf.getOption("spark.repl.class.outputDir").foreach { path =>
    val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
    _conf.set("spark.repl.class.uri", replUri)
  }
  // 7. A low-level API that monitors and reports the status of Spark jobs and stages
  _statusTracker = new SparkStatusTracker(this, _statusStore)

  // 8. Console progress bar
  _progressBar =
    if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
      Some(new ConsoleProgressBar(this))
    } else {
      None
    }

  // 9. Spark UI, implemented with Jetty
  _ui =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
        startTime))
    } else {
      // For tests, do not enable the UI
      None
    }
  // Bind the UI before starting the task scheduler to communicate
  // the bound port to the cluster manager properly
  _ui.foreach(_.bind())

  // 10. Create the Hadoop configuration
  _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

  // 11. Add each JAR given through the constructor
  if (jars != null) {
    jars.foreach(addJar)
  }

  if (files != null) {
    files.foreach(addFile)
  }
  // 12. Compute the executor memory
  _executorMemory = _conf.getOption("spark.executor.memory")
    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
    .orElse(Option(System.getenv("SPARK_MEM"))
    .map(warnSparkMem))
    .map(Utils.memoryStringToMb)
    .getOrElse(1024)

  // Convert java options to env vars as a work around
  // since we can't set env vars directly in sbt.
  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
    value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
    executorEnvs(envKey) = value
  }
  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
    executorEnvs("SPARK_PREPEND_CLASSES") = v
  }
  // The Mesos scheduler backend relies on this environment variable to set executor memory.
  // TODO: Set this only in the Mesos scheduler.
  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
  executorEnvs ++= _conf.getExecutorEnv
  executorEnvs("SPARK_USER") = sparkUser

  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
  // 13. Create the HeartbeatReceiver RPC endpoint
  _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

  // Create and start the scheduler
  // 14. Create the task scheduler and the scheduler backend
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  // 15. Create the DAGScheduler instance
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  // 16. Start the task scheduler
  _taskScheduler.start()

  // 17. Get the application ID from the task scheduler
  _applicationId = _taskScheduler.applicationId()
  // 18. Get the application attempt ID from the task scheduler
  _applicationAttemptId = taskScheduler.applicationAttemptId()
  _conf.set("spark.app.id", _applicationId)
  if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
    System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
  }
  // 19. Set the application ID on the UI
  _ui.foreach(_.setAppId(_applicationId))
  // 20. Initialize the block manager
  _env.blockManager.initialize(_applicationId)

  // The metrics system for Driver need to be set spark.app.id to app ID.
  // So it should start after we get app ID from the task scheduler and set spark.app.id.
  // 21. Start the metrics system
  _env.metricsSystem.start()
  // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
  // 22. Attach the metrics system's servlet handlers to the UI
  _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

  // 23. Initialize the event logger listener
  _eventLogger =
    if (isEventLogEnabled) {
      val logger =
        new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
          _conf, _hadoopConfiguration)
      logger.start()
      listenerBus.addToEventLogQueue(logger)
      Some(logger)
    } else {
      None
    }

  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
  // 24. If dynamic executor allocation is enabled, instantiate and start the ExecutorAllocationManager
  val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
  _executorAllocationManager =
    if (dynamicAllocationEnabled) {
      schedulerBackend match {
        case b: ExecutorAllocationClient =>
          Some(new ExecutorAllocationManager(
            schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
            _env.blockManager.master))
        case _ =>
          None
      }
    } else {
      None
    }
  _executorAllocationManager.foreach(_.start())

  // 25. Create and start the ContextCleaner
  _cleaner =
    if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  _cleaner.foreach(_.start())
  // 26. Set up and start the listener bus
  setupAndStartListenerBus()
  // 27. The task scheduler is ready; post the environment-updated event
  postEnvironmentUpdate()
  // 28. Post the application-start event
  postApplicationStart()

  // Post init
  // 29. Wait until the task scheduler backend is ready
  _taskScheduler.postStartHook()
  // 30. Register the DAGScheduler metrics source
  _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
  // 31. Register the BlockManager metrics source
  _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
  // 32. Register the ExecutorAllocationManager metrics source
  _executorAllocationManager.foreach { e =>
    _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
  }

  // Make sure the context is stopped if the user forgets about it. This avoids leaving
  // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
  // is killed, though.
  logDebug("Adding shutdown hook") // force eager creation of logger
  // 33. Register a shutdown hook with the callback to run when the SparkContext shuts down
  _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    logInfo("Invoking stop() from shutdown hook")
    try {
      stop()
    } catch {
      case e: Throwable =>
        logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
    }
  }
} catch {
  case NonFatal(e) =>
    logError("Error initializing SparkContext.", e)
    try {
      stop()
    } catch {
      case NonFatal(inner) =>
        logError("Error stopping SparkContext after init error.", inner)
    } finally {
      throw e
    }
}
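Several of the objects wired up in this constructor remain reachable from user code afterwards. The following is a hedged sketch (the local master and app name are made up for illustration) showing the driver-side SparkEnv from step 6, the SparkStatusTracker from step 7, and a user listener delivered through the LiveListenerBus created in step 3:

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("init-demo"))

val env     = SparkEnv.get       // the global SparkEnv set in step 6
val tracker = sc.statusTracker   // the SparkStatusTracker created in step 7

// User listeners ride on the LiveListenerBus created in step 3
sc.addSparkListener(new SparkListener {
  override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit =
    println(s"application ended at ${end.time}")
})

println(tracker.getActiveJobIds().mkString(","))
sc.stop()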

 

As the code above shows, SparkContext initialization is quite involved and touches many Spark components, including the asynchronous event bus LiveListenerBus, SparkEnv, SparkUI, DAGScheduler, the metrics system, EventLoggingListener, TaskScheduler, ExecutorAllocationManager, ContextCleaner, and so on. Treat this post as an overview for now; later posts will analyze some of these components in depth.

Reposted from: https://www.cnblogs.com/johnny666888/p/11116052.html
