This chapter covers:

1. Functional Overview

SparkEnv is Spark's runtime environment object; it holds the many objects that Executors need in order to run tasks. Since Spark relies on Executors for all task computation, every Executor has its own execution environment, a SparkEnv. With a SparkEnv in place, data can be stored in the storage subsystem, the computation engine can process tasks, and nodes can communicate with one another. In local mode the Driver creates the Executor itself; in the local-cluster and Standalone deploy modes, Executors are created inside the CoarseGrainedExecutorBackend processes launched by Workers. A SparkEnv therefore lives in either the Driver process or a CoarseGrainedExecutorBackend process.

On the Driver, a SparkEnv is created mainly through SparkEnv's createDriverEnv method, which takes four arguments: conf, isLocal, listenerBus, and numCores, the number of cores needed when the Driver runs Executors in local mode.
```scala
/**
 * :: DeveloperApi ::
 * Holds all the runtime environment objects for a running Spark instance (either master or worker),
 * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
 * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
 * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
 *
 * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
 *       in a future release.
 */
@DeveloperApi
class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleManager: ShuffleManager,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val metricsSystem: MetricsSystem,
    val memoryManager: MemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf) extends Logging
```
Figure 1: Creating SparkEnv on the Driver

Figure 2: Creating SparkEnv on an Executor
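As the class doc above notes, the current SparkEnv can be looked up through the global accessor SparkEnv.get. The following is a minimal, illustrative sketch (written against the Spark 2.x API discussed in this article; the object name and printed fields are just for demonstration, and SparkEnv is not intended for external use):

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

object SparkEnvPeek {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark-env-peek").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // After SparkContext construction, the driver-side SparkEnv is registered globally.
    val env: SparkEnv = SparkEnv.get
    println(s"executorId     = ${env.executorId}")          // "driver" on the driver side
    println(s"serializer     = ${env.serializer.getClass}") // JavaSerializer unless overridden
    println(s"shuffleManager = ${env.shuffleManager.getClass}")
    println(s"memoryManager  = ${env.memoryManager.getClass}")

    sc.stop()
  }
}
```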
2. Related Components

| Name | Description |
| --- | --- |
| SecurityManager | Configures and manages accounts, permissions, and authentication. |
| RpcEnv | The execution environment for RPC communication between components. |
| SerializerManager | Manages serialization: many Spark objects must be serialized before being sent over the network or written to the storage subsystem. |
| BroadcastManager | Stores configuration information and serialized RDDs, Jobs, ShuffleDependencies, and similar data locally. |
| MapOutputTracker | Tracks the output status of map-stage tasks so that reduce-stage tasks can locate the addresses of their intermediate results. |
| ShuffleManager | Manages shuffle operations over local and remote block data. |
| MemoryManager | An abstract memory manager that governs how memory is shared between execution and storage. |
| NettyBlockTransferService | Uses Netty's asynchronous, event-driven network framework to provide a block-transfer server and client for fetching sets of blocks from remote nodes. |
| BlockManagerMaster | Manages and coordinates the BlockManagers. |
| BlockManager | Manages blocks: it handles data reads and writes throughout the Spark runtime, including the underlying data storage itself. |
| MetricsSystem | The metrics system that measures various indicators of the running system. |
| OutputCommitCoordinator | Decides whether tasks may commit their output to HDFS, using a "first committer wins" policy. |
3. Code Analysis

The walk-through below follows the call chain from SparkContext down to SparkEnv.create, then examines each component that create builds.

```scala
// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
```

Class: SparkContext. This creates the Spark runtime environment (cache, map output tracker, etc.) and registers it globally with SparkEnv.set.
```scala
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}
```

Class: SparkContext, method: createSparkEnv, parameters: conf, isLocal, listenerBus. This wrapper exists so that the components created by SparkEnv can be mocked in unit tests; it simply delegates to SparkEnv.createDriverEnv().
```scala
/**
 * Create a SparkEnv for the driver.
 */
private[spark] def createDriverEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus,
    numCores: Int,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
  assert(conf.contains(DRIVER_HOST_ADDRESS),
    s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
  assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
  val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
  val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
  val port = conf.get("spark.driver.port").toInt
  val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
    Some(CryptoStreamUtils.createKey(conf))
  } else {
    None
  }
  create(
    conf,
    SparkContext.DRIVER_IDENTIFIER,
    bindAddress,
    advertiseAddress,
    Option(port),
    isLocal,
    numCores,
    ioEncryptionKey,
    listenerBus = listenerBus,
    mockOutputCommitCoordinator = mockOutputCommitCoordinator
  )
}
```

Class: SparkEnv, method: createDriverEnv, parameters: conf, isLocal, listenerBus, numCores, mockOutputCommitCoordinator. It creates a SparkEnv for the Driver: it verifies that the driver host and port are set, resolves the bind/advertise addresses and the optional I/O encryption key, and then calls create().
```scala
/**
 * Helper method to create a SparkEnv for a driver or an executor.
 */
private def create(
    conf: SparkConf,
    executorId: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Option[Int],
    isLocal: Boolean,
    numUsableCores: Int,
    ioEncryptionKey: Option[Array[Byte]],
    listenerBus: LiveListenerBus = null,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

  val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

  // Listener bus is only used on the driver
  if (isDriver) {
    assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
  }

  val securityManager = new SecurityManager(conf, ioEncryptionKey)
  if (isDriver) {
    securityManager.initializeAuth()
  }

  ioEncryptionKey.foreach { _ =>
    if (!securityManager.isEncryptionEnabled()) {
      logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
        "wire.")
    }
  }

  val systemName = if (isDriver) driverSystemName else executorSystemName
  val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
    securityManager, numUsableCores, !isDriver)

  // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
  if (isDriver) {
    conf.set("spark.driver.port", rpcEnv.address.port.toString)
  }

  // Create an instance of the class with the given name, possibly initializing it with our conf
  def instantiateClass[T](className: String): T = {
    val cls = Utils.classForName(className)
    // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
    // SparkConf, then one taking no arguments
    try {
      cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
        .newInstance(conf, new java.lang.Boolean(isDriver))
        .asInstanceOf[T]
    } catch {
      case _: NoSuchMethodException =>
        try {
          cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
        } catch {
          case _: NoSuchMethodException =>
            cls.getConstructor().newInstance().asInstanceOf[T]
        }
    }
  }

  // Create an instance of the class named by the given SparkConf property, or defaultClassName
  // if the property is not set, possibly initializing it with our conf
  def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
    instantiateClass[T](conf.get(propertyName, defaultClassName))
  }

  val serializer = instantiateClassFromConf[Serializer](
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  logDebug(s"Using serializer: ${serializer.getClass}")

  val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

  val closureSerializer = new JavaSerializer(conf)

  def registerOrLookupEndpoint(
      name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
    if (isDriver) {
      logInfo("Registering " + name)
      rpcEnv.setupEndpoint(name, endpointCreator)
    } else {
      RpcUtils.makeDriverRef(name, conf, rpcEnv)
    }
  }

  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

  val mapOutputTracker = if (isDriver) {
    new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
  } else {
    new MapOutputTrackerWorker(conf)
  }

  // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
  // requires the MapOutputTracker itself
  mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
    new MapOutputTrackerMasterEndpoint(
      rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

  // Let the user specify short names for shuffle managers
  val shortShuffleMgrNames = Map(
    "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
    "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
  val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
  val shuffleMgrClass =
    shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
  val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

  val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
  val memoryManager: MemoryManager =
    if (useLegacyMemoryManager) {
      new StaticMemoryManager(conf, numUsableCores)
    } else {
      UnifiedMemoryManager(conf, numUsableCores)
    }

  val blockManagerPort = if (isDriver) {
    conf.get(DRIVER_BLOCK_MANAGER_PORT)
  } else {
    conf.get(BLOCK_MANAGER_PORT)
  }

  val blockTransferService =
    new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
      blockManagerPort, numUsableCores)

  val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
    conf, isDriver)

  // NB: blockManager is not valid until initialize() is called later.
  val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
    serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
    blockTransferService, securityManager, numUsableCores)

  val metricsSystem = if (isDriver) {
    // Don't start metrics system right now for Driver.
    // We need to wait for the task scheduler to give us an app ID.
    // Then we can start the metrics system.
    MetricsSystem.createMetricsSystem("driver", conf, securityManager)
  } else {
    // We need to set the executor ID before the MetricsSystem is created because sources and
    // sinks specified in the metrics configuration file will want to incorporate this executor's
    // ID into the metrics they report.
    conf.set("spark.executor.id", executorId)
    val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
    ms.start()
    ms
  }

  val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
    new OutputCommitCoordinator(conf, isDriver)
  }
  val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
    new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
  outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

  val envInstance = new SparkEnv(
    executorId,
    rpcEnv,
    serializer,
    closureSerializer,
    serializerManager,
    mapOutputTracker,
    shuffleManager,
    broadcastManager,
    blockManager,
    securityManager,
    metricsSystem,
    memoryManager,
    outputCommitCoordinator,
    conf)

  // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
  // called, and we only need to do it for driver. Because driver may run as a service, and if we
  // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
  if (isDriver) {
    val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
    envInstance.driverTmpDir = Some(sparkFilesDir)
  }

  envInstance
}
```

Class: SparkEnv, method: create. This helper builds a SparkEnv for either the Driver or an Executor: it creates the SecurityManager, RpcEnv, the serializers, the BroadcastManager, MapOutputTracker, ShuffleManager, MemoryManager, the block transfer and block management components, the MetricsSystem, and the OutputCommitCoordinator, and finally assembles them into a SparkEnv instance. Each of these steps is examined in the subsections that follow.
3.1 Creating the Security Manager (SecurityManager)

SecurityManager configures and manages accounts, permissions, and authentication. When Spark is deployed on YARN, it generates a secret key and stores it in the Hadoop UGI credentials. In other deploy modes, the secret key must be supplied through the _SPARK_AUTH_SECRET environment variable (which takes precedence) or the spark.authenticate.secret property. Finally, SecurityManager installs a default password Authenticator, implemented as an anonymous inner class, which supplies the user name and password whenever an HTTP client fetches from an HTTP server. Because communication between Spark nodes often requires user names and passwords to be negotiated dynamically, this mechanism supports that need flexibly.
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
val securityManager = new SecurityManager(conf, ioEncryptionKey)
if (isDriver) {
  securityManager.initializeAuth()
}

// How the variable is built
// Step 1: new SecurityManager()
// Package: org.apache.spark
// Class: SecurityManager
// Set up password authentication for HTTP connections
// Set our own authenticator to properly negotiate user/password for HTTP connections.
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
// only set once.
if (authOn) {
  Authenticator.setDefault(
    // Create the password authenticator; override getPasswordAuthentication to obtain
    // the user name and password
    new Authenticator() {
      override def getPasswordAuthentication(): PasswordAuthentication = {
        var passAuth: PasswordAuthentication = null
        val userInfo = getRequestingURL().getUserInfo()
        if (userInfo != null) {
          val parts = userInfo.split(":", 2)
          passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
        }
        return passAuth
      }
    }
  )
}

// Step 2: initializeAuth()
// Package: org.apache.spark
// Class: SecurityManager
/**
 * Initialize the authentication secret.
 *
 * If authentication is disabled, do nothing.
 *
 * In YARN mode, generate a new secret and store it in the current user's credentials.
 *
 * In other modes, assert that the auth secret is set in the configuration.
 */
def initializeAuth(): Unit = {
  if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
    return
  }

  if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") {
    require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
      s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.")
    return
  }

  val rnd = new SecureRandom()
  val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
  val secretBytes = new Array[Byte](length)
  rnd.nextBytes(secretBytes)

  val creds = new Credentials()
  val secretStr = HashCodes.fromBytes(secretBytes).toString()
  creds.addSecretKey(SECRET_LOOKUP_KEY, secretStr.getBytes(UTF_8))
  UserGroupInformation.getCurrentUser().addCredentials(creds)
}
```
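For a non-YARN deployment, the text above implies that authentication must be switched on and a shared secret supplied explicitly. A minimal, hypothetical sketch of such a configuration (the property keys spark.authenticate and spark.authenticate.secret are standard; the secret value is only a placeholder):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AuthEnabledApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("auth-enabled-app")
      .setMaster("local[2]")
      // Turn on RPC authentication; SecurityManager.initializeAuth() will then require a secret
      // (on YARN the secret is generated automatically and stored in the user's credentials).
      .set("spark.authenticate", "true")
      // Placeholder secret: in non-YARN modes the same value must be configured on all daemons.
      .set("spark.authenticate.secret", "change-me")

    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 10).sum())
    sc.stop()
  }
}
```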
3.2 Creating the RPC Communication Layer (RpcEnv)

Spark 1.6 introduced a new RPC architecture built around RpcEnv, RpcEndpoint, and RpcEndpointRef. It wraps the underlying transports (Akka and Netty), leaving room to plug in other communication systems in the future. RpcEnv is the RPC environment: every RpcEndpoint must be registered with an RpcEnv instance, and the RpcEnv manages the life cycle of the registered RpcEndpoints by:
- registering an RpcEndpoint under a name or URI;
- dispatching and handling the various messages sent to it;
- stopping the RpcEndpoint.
The three most important abstractions in Spark RPC (the "three musketeers") are RpcEnv, RpcEndpoint, and RpcEndpointRef. This design has several benefits:
- it hides the concrete underlying implementation from the upper-level API, making it easy to use;
- a given capability can be provided by different implementations, making the layer easy to extend;
- it encourages healthy competition among underlying implementations: Spark 1.6.3 already used Netty as the default implementation but still depended on Akka, while in Spark 2.1.0 the only implementation is Netty, so users are free to use whichever Akka version they like, and a better transport could be adopted in the future.
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
private[spark] val driverSystemName = "sparkDriver"
private[spark] val executorSystemName = "sparkExecutor"

val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
  securityManager, numUsableCores, !isDriver)

// How the variable is built
// Step 1
// Package: org.apache.spark.rpc
// Class: RpcEnv
def create(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    numUsableCores: Int,
    clientMode: Boolean): RpcEnv = {
  val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
    numUsableCores, clientMode)
  new NettyRpcEnvFactory().create(config)
}

// Step 2
// Package: org.apache.spark.rpc.netty
// Class: NettyRpcEnvFactory
def create(config: RpcEnvConfig): RpcEnv = {
  val sparkConf = config.conf
  // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
  // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
  val javaSerializerInstance =
    new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
  val nettyEnv =
    new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
      config.securityManager, config.numUsableCores)
  if (!config.clientMode) {
    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
      nettyEnv.startServer(config.bindAddress, actualPort)
      (nettyEnv, nettyEnv.address.port)
    }
    try {
      Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
    } catch {
      case NonFatal(e) =>
        nettyEnv.shutdown()
        throw e
    }
  }
  nettyEnv
}
```
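To make the register/lookup/ask flow concrete, here is a minimal, hypothetical echo endpoint written against the RPC abstractions described above. It is a sketch for illustration only: these classes are private[spark], so such code would have to live inside the org.apache.spark namespace, and the Echo message, EchoEndpoint, and RpcEchoDemo names are made up.

```scala
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}

// Hypothetical message and endpoint (illustration only; the RPC API is private[spark]).
case class Echo(text: String)

class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  // Handle messages that expect a reply (sent with ask/askSync).
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Echo(text) => context.reply(s"echo: $text")
  }
}

object RpcEchoDemo {
  def demo(rpcEnv: RpcEnv): Unit = {
    // Register the endpoint by name; remote peers could look it up via setupEndpointRef /
    // RpcUtils.makeDriverRef using the same name.
    val ref: RpcEndpointRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
    val reply = ref.askSync[String](Echo("hello")) // synchronous request/response
    println(reply)                                 // "echo: hello"
    rpcEnv.stop(ref)                               // stop the endpoint when done
  }
}
```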
3.3 Creating the Serializer Manager (SerializerManager)

Many Spark objects must be serialized when they are sent over the network or written to the storage subsystem. SparkEnv holds two serialization components: the SerializerManager and the closure serializer (closureSerializer).

The relevant code in SparkEnv:
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
  instantiateClass[T](conf.get(propertyName, defaultClassName))
}

val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

val closureSerializer = new JavaSerializer(conf)
```
As the code shows, the serializer created here defaults to org.apache.spark.serializer.JavaSerializer; users can select another implementation, such as org.apache.spark.serializer.KryoSerializer, through the spark.serializer property. The closureSerializer, by contrast, is hard-coded to org.apache.spark.serializer.JavaSerializer and cannot be changed by the user. JavaSerializer is built on Java's own serialization API.
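For example, a user application might switch the data serializer to Kryo as follows. This is a minimal sketch using the public SparkConf API; the Point class registered below is just an illustrative placeholder.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical user type we want serialized efficiently.
case class Point(x: Double, y: Double)

object KryoExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kryo-example")
      .setMaster("local[2]")
      // Overrides the default JavaSerializer chosen by instantiateClassFromConf above.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Optional but recommended: registering classes avoids writing full class names.
      .registerKryoClasses(Array(classOf[Point]))

    val sc = new SparkContext(conf)
    val points = sc.parallelize(Seq(Point(1, 2), Point(3, 4)))
    println(points.map(p => p.x + p.y).sum()) // data shipped between JVMs now uses Kryo
    sc.stop()
  }
}
```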
3.4 Creating the Broadcast Manager (BroadcastManager)

BroadcastManager stores configuration information and serialized RDDs, Jobs, ShuffleDependencies, and similar data locally; for fault tolerance, the data may also be replicated to other nodes.

Figure 3: Broadcasting a variable to Executors

The code that creates the BroadcastManager:
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

// How the variable is built
// Package: org.apache.spark.broadcast
// Class: BroadcastManager
initialize()

// Called by SparkContext or Executor before using Broadcast
private def initialize() {
  synchronized {
    if (!initialized) {
      broadcastFactory = new TorrentBroadcastFactory
      broadcastFactory.initialize(isDriver, conf, securityManager)
      initialized = true
    }
  }
}
```
BroadcastManager calls its own initialize method during construction; once initialize has finished, the BroadcastManager is ready for use.
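At the application level, broadcast variables built on top of this component are created through SparkContext.broadcast. A minimal sketch using the public API:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-example").setMaster("local[2]"))

    // A small lookup table shipped once per executor by TorrentBroadcast,
    // instead of once per task inside the closure.
    val countryNames = Map("CN" -> "China", "US" -> "United States", "FR" -> "France")
    val bc = sc.broadcast(countryNames)

    val codes = sc.parallelize(Seq("CN", "FR", "US", "CN"))
    val names = codes.map(code => bc.value.getOrElse(code, "unknown")).collect()
    println(names.mkString(", "))

    bc.destroy() // release broadcast blocks on the driver and executors
    sc.stop()
  }
}
```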
3.5 Creating the Map Output Tracker (MapOutputTracker)

MapOutputTracker tracks the output status of map-stage tasks so that reduce-stage tasks can locate the addresses of their intermediate results. Every map task and every reduce task has a unique identifier, mapId and reduceId respectively. A reduce task's input may come from the outputs of several map tasks, and the reduce task pulls the corresponding blocks from the nodes where those map tasks ran; this process is called the shuffle. Every shuffle has a unique identifier, shuffleId.

MapOutputTracker has two subclasses, MapOutputTrackerMaster (for the driver) and MapOutputTrackerWorker (for executors), which store their metadata in different kinds of HashMaps.

The code that creates the MapOutputTracker:
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
val mapOutputTracker = if (isDriver) {
  new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
  new MapOutputTrackerWorker(conf)
}

// How the variable is built
// Step 1
// Package: org.apache.spark
// Class: SparkEnv
// MapOutputTracker.ENDPOINT_NAME is declared as: val ENDPOINT_NAME = "MapOutputTracker"
// Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
  new MapOutputTrackerMasterEndpoint(
    rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

// Step 2
// Package: org.apache.spark
// Class: SparkEnv
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}

// Step 3
// Package: org.apache.spark.rpc.netty
// Class: NettyRpcEnv
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}

// Package: org.apache.spark.util
// Class: RpcUtils
/**
 * Retrieve a `RpcEndpointRef` which is located in the driver via its name.
 */
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
  val driverHost: String = conf.get("spark.driver.host", "localhost")
  val driverPort: Int = conf.getInt("spark.driver.port", 7077)
  Utils.checkHost(driverHost)
  rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}
```
The initialization code shows that the MapOutputTracker is created differently depending on whether the current instance is the Driver or an Executor:

- If the current process is the Driver, a MapOutputTrackerMaster is created, then a MapOutputTrackerMasterEndpoint is created and registered with the Dispatcher under the name MapOutputTracker;
- If the current process is an Executor, a MapOutputTrackerWorker is created, and a reference to the MapOutputTrackerMasterEndpoint is looked up in the Dispatcher of the remote Driver's NettyRpcEnv.

In either case, the MapOutputTracker's trackerEndpoint field ends up holding a reference to the MapOutputTrackerMasterEndpoint.
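The tracked map output is what reduce-side tasks consult when fetching their inputs during a shuffle. For reference, any shuffle-producing transformation in user code exercises this machinery; a minimal sketch with the public API:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ShuffleExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shuffle-example").setMaster("local[2]"))

    val words = sc.parallelize(Seq("spark", "env", "spark", "rpc", "env", "spark"))

    // reduceByKey introduces a shuffle: the map stage writes partitioned output that is
    // registered with MapOutputTrackerMaster, and the reduce stage asks the tracker where
    // each map output lives before fetching the blocks.
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

    counts.collect().foreach { case (w, n) => println(s"$w -> $n") }
    sc.stop()
  }
}
```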
3.6 Creating the ShuffleManager

ShuffleManager manages shuffle operations over local and remote block data. The implementation class is chosen by the spark.shuffle.manager property and instantiated via reflection; the default is the sort-based SortShuffleManager. Spark 2.x accepts two short names, sort and tungsten-sort, but as the code below shows, both resolve to the same implementation class, SortShuffleManager.
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

// How the variable is built
// Step 1
// Package: org.apache.spark
// Class: SparkEnv
// Create an instance of the class with the given name, possibly initializing it with our conf
def instantiateClass[T](className: String): T = {
  val cls = Utils.classForName(className)
  // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
  // SparkConf, then one taking no arguments
  try {
    cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
      .newInstance(conf, new java.lang.Boolean(isDriver))
      .asInstanceOf[T]
  } catch {
    case _: NoSuchMethodException =>
      try {
        cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
          cls.getConstructor().newInstance().asInstanceOf[T]
      }
  }
}

// Step 2
// Package: org.apache.spark.util
// Class: Utils
// scalastyle:off classforname
/** Preferred alternative to Class.forName(className) */
def classForName(className: String): Class[_] = {
  Class.forName(className, true, getContextOrSparkClassLoader)
  // scalastyle:on classforname
}

/**
 * Get the Context ClassLoader on this thread or, if not present, the ClassLoader that
 * loaded Spark.
 *
 * This should be used whenever passing a ClassLoader to Class.ForName or finding the currently
 * active loader when setting up ClassLoader delegation chains.
 */
def getContextOrSparkClassLoader: ClassLoader =
  Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader)

/**
 * Get the ClassLoader which loaded Spark.
 */
def getSparkClassLoader: ClassLoader = getClass.getClassLoader
```
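The constructor-resolution pattern used by instantiateClass can be illustrated in isolation. The sketch below is a simplified, hypothetical standalone version (the names ReflectiveFactory, loadAndInstantiate, and DemoComponent are made up for illustration) that tries a (SparkConf, Boolean) constructor, then a (SparkConf) constructor, then a no-arg constructor:

```scala
import org.apache.spark.SparkConf

// Hypothetical component with the (SparkConf, Boolean) constructor shape Spark looks for first.
class DemoComponent(conf: SparkConf, isDriver: Boolean) {
  override def toString: String = s"DemoComponent(isDriver=$isDriver)"
}

object ReflectiveFactory {
  // Simplified standalone version of SparkEnv's instantiateClass helper.
  def loadAndInstantiate[T](className: String, conf: SparkConf, isDriver: Boolean): T = {
    val cls = Class.forName(className)
    try {
      cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
        .newInstance(conf, java.lang.Boolean.valueOf(isDriver))
        .asInstanceOf[T]
    } catch {
      case _: NoSuchMethodException =>
        try {
          cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
        } catch {
          case _: NoSuchMethodException =>
            cls.getConstructor().newInstance().asInstanceOf[T]
        }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val c = loadAndInstantiate[DemoComponent]("DemoComponent", conf, isDriver = true)
    println(c) // DemoComponent(isDriver=true)
  }
}
```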
3.7 Creating the Memory Manager (MemoryManager)

MemoryManager has two main implementations: StaticMemoryManager and UnifiedMemoryManager. StaticMemoryManager is the legacy memory manager from early Spark versions; it can be selected with the spark.memory.useLegacyMode property, which defaults to false, so the default memory manager is UnifiedMemoryManager. UnifiedMemoryManager is the memory model introduced in Spark 1.6: instead of fencing off the execution and storage regions with fixed ratio parameters, as StaticMemoryManager does, it lets the two regions borrow free memory from each other.

The code that creates the MemoryManager:
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }
```
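For reference, the properties below are the main knobs a user would touch for the two models. A minimal sketch (the values shown are illustrative, not recommendations):

```scala
import org.apache.spark.SparkConf

object MemoryConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("memory-conf-example")
      // Keep the default UnifiedMemoryManager (true would select the legacy StaticMemoryManager).
      .set("spark.memory.useLegacyMode", "false")
      // Fraction of usable heap shared by execution and storage in the unified model.
      .set("spark.memory.fraction", "0.6")
      // Portion of that unified region protected for storage before execution can evict it.
      .set("spark.memory.storageFraction", "0.5")

    conf.getAll.foreach { case (k, v) => println(s"$k = $v") }
  }
}
```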
3.8 Creating the Block Transfer Service (NettyBlockTransferService)

Since Spark 1.6 only NettyBlockTransferService remains; NioBlockTransferService has been removed. NettyBlockTransferService uses Netty's asynchronous, event-driven network framework to provide a block-transfer server and client for fetching sets of blocks from remote nodes. Here the BlockTransferService subclass NettyBlockTransferService is instantiated as the block transfer service, which serves blocks to other nodes. It is the cooperation between MapOutputTracker and NettyBlockTransferService that makes Spark's shuffle work.

The code that creates the NettyBlockTransferService:
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
val blockManagerPort = if (isDriver) {
  conf.get(DRIVER_BLOCK_MANAGER_PORT)
} else {
  conf.get(BLOCK_MANAGER_PORT)
}

val blockTransferService =
  new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
    blockManagerPort, numUsableCores)
```
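The two port constants above correspond to user-facing configuration keys. A small sketch of how they might be pinned down explicitly (the port numbers are arbitrary examples; when left unset, Spark picks a port itself):

```scala
import org.apache.spark.SparkConf

object BlockManagerPortExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("block-manager-port-example")
      // Port used by the driver-side block manager (DRIVER_BLOCK_MANAGER_PORT falls back to
      // spark.blockManager.port when this key is not set).
      .set("spark.driver.blockManager.port", "40015")
      // Port used by executor-side block managers (BLOCK_MANAGER_PORT).
      .set("spark.blockManager.port", "40016")

    conf.getAll.foreach { case (k, v) => println(s"$k = $v") }
  }
}
```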
3.9 Creating the BlockManagerMaster

BlockManagerMaster manages and coordinates the BlockManagers; the concrete operations are delegated to the BlockManagerMasterEndpoint.

The code that creates the BlockManagerMaster:
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)

// Step 2
// Package: org.apache.spark
// Class: SparkEnv
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}

// Step 3
// Package: org.apache.spark.rpc.netty
// Class: NettyRpcEnv
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}

// Package: org.apache.spark.util
// Class: RpcUtils
/**
 * Retrieve a `RpcEndpointRef` which is located in the driver via its name.
 */
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
  val driverHost: String = conf.get("spark.driver.host", "localhost")
  val driverPort: Int = conf.getInt("spark.driver.port", 7077)
  Utils.checkHost(driverHost)
  rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}
```
Figure 4: Flow of creating block management on the Driver

Figure 5: Flow of creating block management on an Executor

Here registerOrLookupEndpoint either registers or looks up the BlockManagerMasterEndpoint, so the BlockManagerMaster is wired up differently on the Driver and on Executors:

- If the current process is the Driver, a BlockManagerMasterEndpoint is created and registered with the Dispatcher under the name BlockManagerMaster;
- If the current process is an Executor, a reference to the BlockManagerMasterEndpoint is looked up in the Dispatcher of the remote Driver's NettyRpcEnv.

In either case, the BlockManagerMaster's driverEndpoint field ends up holding a reference to the BlockManagerMasterEndpoint.

Tip: the creation logic of BlockManagerMaster is essentially the same as that of MapOutputTracker; comparing the two side by side is a good way to understand Spark's RPC services.
3.10 Creating the Block Manager (BlockManager)

BlockManager manages blocks; it only becomes valid after its initialize() method has been called.

The code that creates the BlockManager:
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)

// How the variable is initialized
// Package: org.apache.spark
// Class: SparkContext
_env.blockManager.initialize(_applicationId)
```
The BlockManager object is constructed while SparkContext builds the SparkEnv execution environment, and later during SparkContext's initialization its initialize() method is called to complete the setup.
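From an application's point of view, the BlockManager is what backs RDD caching: persist/cache store partition blocks through the local BlockManager and register them with the BlockManagerMaster. A minimal sketch using the public API:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CacheExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-example").setMaster("local[2]"))

    val squares = sc.parallelize(1 to 1000000).map(x => x.toLong * x)

    // Each cached partition becomes a block (rdd_<rddId>_<partitionIndex>) managed by the
    // executor's BlockManager and reported to the BlockManagerMaster on the driver.
    squares.persist(StorageLevel.MEMORY_AND_DISK)

    println(squares.count()) // first action materializes and stores the blocks
    println(squares.sum())   // second action reads the cached blocks instead of recomputing

    squares.unpersist()
    sc.stop()
  }
}
```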
3.11 Creating the Metrics System (MetricsSystem)

MetricsSystem is Spark's metrics system; it is an indispensable sub-component of SparkEnv for measuring the various indicators of the running system.

The code that creates the MetricsSystem:
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
val metricsSystem = if (isDriver) {
  // Don't start metrics system right now for Driver.
  // We need to wait for the task scheduler to give us an app ID.
  // Then we can start the metrics system.
  MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
  // We need to set the executor ID before the MetricsSystem is created because sources and
  // sinks specified in the metrics configuration file will want to incorporate this executor's
  // ID into the metrics they report.
  conf.set("spark.executor.id", executorId)
  val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
  ms.start()
  ms
}

// How the variable is built
// Step 1
// Package: org.apache.spark.metrics
// Class: MetricsSystem (companion object)
def createMetricsSystem(
    instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
  new MetricsSystem(instance, conf, securityMgr)
}

// Step 2 (on the Driver)
// Package: org.apache.spark
// Class: SparkContext
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
```
As the code shows, the metrics system is created differently depending on whether the current instance is the Driver or an Executor:

- Driver: a metrics system is created with the instance name driver, but it is not started immediately; it waits until the task scheduler (TaskScheduler) in SparkContext has provided the application ID before starting.
- Executor: the spark.executor.id property is set to the current Executor's ID, and the metrics system is then created and started right away.

The metrics system is created through the createMetricsSystem method of the companion object MetricsSystem (comparable to a static method in Java).
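Which sources and sinks those instances ("driver", "executor") report to is driven by the metrics configuration. A minimal sketch that routes all instances to the console sink, using the spark.metrics.conf.* property form (equivalent entries can also live in a metrics.properties file); the period and unit values are only examples:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MetricsSinkExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("metrics-sink-example")
      .setMaster("local[2]")
      // "*" applies to every instance: master, worker, driver, executor, ...
      .set("spark.metrics.conf.*.sink.console.class",
           "org.apache.spark.metrics.sink.ConsoleSink")
      .set("spark.metrics.conf.*.sink.console.period", "10") // report every 10 units
      .set("spark.metrics.conf.*.sink.console.unit", "seconds")

    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 100).count())
    sc.stop()
  }
}
```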
3.12 Creating the OutputCommitCoordinator

When a Spark application uses Spark SQL (including Hive) or needs to save task output to HDFS, the output commit coordinator OutputCommitCoordinator comes into play: it decides whether a task may commit its output to HDFS. Both the Driver and the Executors carry an OutputCommitCoordinator in their SparkEnv. The OutputCommitCoordinatorEndpoint is registered only on the Driver; the OutputCommitCoordinators on the Executors use an RpcEndpointRef to that endpoint to ask the Driver-side OutputCommitCoordinator whether their output may be committed to HDFS.

The code that creates the OutputCommitCoordinator:
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
  new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
  new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

// Step 2
// Package: org.apache.spark
// Class: SparkEnv
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}

// Step 3
// Package: org.apache.spark.rpc.netty
// Class: NettyRpcEnv
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}

// Package: org.apache.spark.util
// Class: RpcUtils
/**
 * Retrieve a `RpcEndpointRef` which is located in the driver via its name.
 */
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
  val driverHost: String = conf.get("spark.driver.host", "localhost")
  val driverPort: Int = conf.getInt("spark.driver.port", 7077)
  Utils.checkHost(driverHost)
  rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}
```
The code shows that the OutputCommitCoordinator is wired up as follows:

- If the current instance is the Driver, an OutputCommitCoordinatorEndpoint is created and registered with the Dispatcher under the name OutputCommitCoordinator;
- If the current instance is an Executor, a reference to the OutputCommitCoordinatorEndpoint is looked up in the Dispatcher of the remote Driver's NettyRpcEnv.

In either case, the OutputCommitCoordinator's coordinatorRef field ends up holding a reference to the OutputCommitCoordinatorEndpoint.

Tip: the creation logic of OutputCommitCoordinator is essentially the same as that of MapOutputTracker and BlockManagerMaster; comparing them side by side is a good way to understand Spark's RPC services.
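For context, any job that writes files through a Hadoop output committer ends up consulting this coordinator: before a task attempt commits its output, the executor-side OutputCommitCoordinator asks the driver-side one for permission ("first committer wins"). A minimal sketch with the public API (the output path is just a placeholder; point it at a writable location such as an HDFS URI, and note the call fails if the path already exists):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SaveOutputExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-output-example").setMaster("local[2]"))

    val lines = sc.parallelize(Seq("spark", "env", "output", "commit"))

    // saveAsTextFile goes through a Hadoop OutputCommitter; commit coordination decides
    // which task attempt is allowed to commit each partition's output.
    lines.saveAsTextFile("/tmp/spark-output-commit-demo")

    sc.stop()
  }
}
```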
3.13 Constructing the SparkEnv

Once all of the components above have been instantiated, the SparkEnv itself is finally constructed.
```scala
// Variable declaration
// Package: org.apache.spark
// Class: SparkEnv
val envInstance = new SparkEnv(
  executorId,
  rpcEnv,
  serializer,
  closureSerializer,
  serializerManager,
  mapOutputTracker,
  shuffleManager,
  broadcastManager,
  blockManager,
  securityManager,
  metricsSystem,
  memoryManager,
  outputCommitCoordinator,
  conf)
```
If the current instance is the Driver, a temporary directory is also created for it; the relevant code is shown below:
```scala
// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
// called, and we only need to do it for driver. Because driver may run as a service, and if we
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
if (isDriver) {
  val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
  envInstance.driverTmpDir = Some(sparkFilesDir)
}
```
As the comment explains, the temporary directory is recorded so that it can be deleted when stop() is called, and this only needs to be done for the Driver: because the Driver may run as a long-lived service, failing to delete the directory each time a SparkContext stops would leave too many temporary directories behind.