Spark作业提交方式

Spark作业提交方式

最近碰到个问题,在CDH上采用Spark Streaming方式运行Elasticsearch(ES)相关的处理程序,先后遇到两个JAR冲突:

  • ES的jackson.jar最低需求版本2.4,与CDH的hive-jdbc-1.1.0-cdh5.7.2-standalone.jar里包含的jackson版本冲突
  • Spark集群默认为JAVA1.7版本,而ES-5.4需要用JAVA1.8

解决问题过程中发现,不同的Spark提交作业方式,得有不同的解决办法,不过基本解决思路却是一致的:

  • 设置Spark启动Driver、Work时的JAVA_HOME,使之指向JAVA1.8,而非默认指向的JAVA1.7
  • 将ES依赖的jackson.jar放置在JAVA CLASS PATH最前面,以便优先使用。

本文主要聊聊常见模式下Spark Driver、Work的JAVA启动参数:

  • local模式
  • yarn-client模式
  • yarn-cluster模式

local模式

local模式下,spark driver、worker都在本地线程池里运行,位于同一个 JAVA进程。

1
2
3
4
5
spark-submit \
--master local[2] \
--jars jackson-core-2.8.6.jar,my-dependent.jar \
--class clife.data.spark.ESUpdate
my-main.jar
  1. 调用SPARK_HOME/bin/spark-submit脚本,转向spark-class

    1
    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
  2. SPARK_HOME/bin/spark-class 加载spark env 变量后,启动JVM执行org.apache.spark.launcher.Main 获取后续执行命令,最后执行该命令

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    build_command() {
    "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
    printf "%d\0" $?
    }
    CMD=()
    while IFS= read -d '' -r ARG; do
    CMD+=("$ARG")
    done < <(build_command "$@")
    CMD=("${CMD[@]:0:$LAST}")
    exec "${CMD[@]}"
  3. 运行org.apache.spark.launcher.Main,输出JAVA命令。

    • 会先将SPARK_CLASSPATH变量对应的值加入JAVA CLASSPATH,随后才是SPARK_CONF、SPARK_JAR、lib等等
    1
    2
    3
    4
    5
    6
    7
    8
    List<String> buildClassPath(String appClassPath) throws IOException {
    String sparkHome = getSparkHome();
    List<String> cp = new ArrayList<String>();
    addToClassPath(cp, getenv("SPARK_CLASSPATH"));
    addToClassPath(cp, appClassPath);
    addToClassPath(cp, getConfDir());
    • 输出命令:

      1
      2
      3
      4
      5
      6
      7
      8
      ${JAVA_HOME}/bin/java \
      -cp /CDH-5.7.2-1.cdh5.7/jars/* \
      -XX:MaxPermSize=256m \
      org.apache.spark.deploy.SparkSubmit \
      --master local[2] \
      --class my.main.class.name \
      ...
      my-main.jar
  4. SPARK_HOME/bin/spark-class运行上述命令,正式执行Spark程序

  5. jar包冲突的解决办法:

    • 解决jackson冲突: export SPARK_CLASSPATH=.:jackson-core-2.8.6.jar,会将该jar放在JVM CLASS_PATH最前面,优先加载
    • 解决java版本冲突:本地export JAVA_HOME=jdk1.8

yarn-client模式

会在本地启动spark driver,向yarn申请资源,集群运行task。

1
2
--master yarn \
--deploy-mode client \
解决Driver端的冲突

JAVA版本冲突:

  • 由于client模式的Driver运行在本地,可以本地export JAVA_HOME=jdk1.8,即可解决java版本问题。

Jackson Jar冲突:

  • 脚本运行流程上和local模式一致,在执行org.apache.spark.deploy.SparkSubmit时,与local模式出现不同。简单来说,就是根据输入的不同参数,生成不同的启动Driver的上下文。

  • 设置--verbose 可打印在SparkSubmit里通过反射执行childMainClass 时的参数。

  • 重点关注childClasspath,会将其首先加入classloader里:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    if (verbose) {
    printStream.println(s"Main class:\n$childMainClass")
    printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
    printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
    printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    printStream.println("\n")
    }
    val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
    new ChildFirstURLClassLoader(new Array[URL](0),
    Thread.currentThread.getContextClassLoader)
    } else {
    new MutableURLClassLoader(new Array[URL](0),
    Thread.currentThread.getContextClassLoader)
    }
    Thread.currentThread.setContextClassLoader(loader)
    for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
    }
    1
    2
    3
    Classpath elements:
    file:/home/my/my-main.jar
    file:/home/my/jackson-core-2.8.6.jar
  • 而client模式下,一般通过 --jars方式提交jar包,会加载jar并传达给childClasspath

    1
    2
    3
    4
    5
    6
    7
    8
    if (deployMode == CLIENT) {
    childMainClass = args.mainClass
    if (isUserJar(args.primaryResource)) {
    childClasspath += args.primaryResource
    }
    if (args.jars != null) { childClasspath ++= args.jars.split(",") }
    if (args.childArgs != null) { childArgs ++= args.childArgs }
    }
  • 这样在Driver端的jar冲突就解决了。

解决Executor Task的冲突

ApplicationMaster用于Spark向Yarn申请资源,ExecutorRunnable会生成Executor JVM启动时的参数(SPARK默认配置下会打印)。

1
2
3
4
5
6
7
8
9
10
11
12
val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
appId, localResources)
logInfo(s"""
|===============================================================================
|YARN executor launch context:
| env:
|${env.map { case (k, v) => s" $k -> $v\n" }.mkString}
| command:
| ${commands.mkString(" ")}
|===============================================================================
""".stripMargin)

JAVA版本冲突:

  • 会从环境变量里获取JAVA_HOME

    1
    2
    3
    val commands = prefixEnv ++ Seq(
    YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
    "-server",
  • Executor启动前的环境(设置环境变量、class-path、相关参数)也是在ExecutorRunnable里设置的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    lazy val env = prepareEnvironment(container) //准备Container启动的环境(环境变量、class-path、jvm参数、Executor参数)
    sparkConf.getExecutorEnv.foreach { case (key, value) =>
    // This assumes each executor environment variable set here is a path
    // This is kept for backward compatibility and consistency with hadoop
    YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
    }
    /** Get all executor environment variables set on this SparkConf */
    def getExecutorEnv: Seq[(String, String)] = {
    val prefix = "spark.executorEnv."
    getAll.filter{case (k, v) => k.startsWith(prefix)}
    .map{case (k, v) => (k.substring(prefix.length), v)}
    }
    def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
    val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator + value } else value
    env.put(key, newValue)
    }
  • 解决办法:提交Spark作业时指定--conf spark.executorEnv.JAVA_HOME=/JAVA1.8-dir/,即可复写默认JAVA_HOME

Jackson.jar冲突:

  • ExecutorRunnable设置class-path,加载顺序为:

    • spark.executor.extraClassPath
    • Environment.PWD
    • spark.yarn.jarSPARK_JAR
    • HadoopClasspath
    • SPARK_DIST_CLASSPATH
  • CDH的hive-jdbc-1.1.0-cdh5.7.2-standalone.jar(含冲突jackson)位于HadoopClasspath

  • 解决办法:提交Spark作业时指定--conf spark.executor.extraClassPath=/jackson.jar

yarn-cluster模式

向yarn申请资源,运行driver、task。

1
2
--master yarn \
--deploy-mode cluster \

命令行提交spark作业时,spark-submit会自动JVM,执行main-class: org.apache.spark.deploy.Client,然后向Yarn申请资源,submitApplication,运行Driver。

解决Driver端冲突

yarn.Client.createContainerLaunchContext() 生成Driver提交环境。

解决java版本冲突:

  • 类似yarn-client模式下运行work,JAVA_HOME也是从环境变量中取,spark.yarn.appMasterEnv.JAVA_HOME可以覆写。

解决jackson.jar冲突:

  • 类似yarn-client模式下运行work,设置spark.driver.extraClassPath=/jackson.jar解决
解决Executor Task端冲突
  • 解决方法同yarn-client模式下运行的work

总结

在不能升级CDH集群配置前提下,解决Spark Jar冲突、JAVA版本不一致的方法其实很简单:

  • 定位Driver、Work的JVM启动命令
  • 将程序需要的Jar放置于class-path最前面
  • 改写默认JAVA_HOME