Spark assembly has been built with Hive, including Datanucleus jars on classpath Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/05/19 09:58:38 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.10.19.186 instead (on interface eth0) 15/05/19 09:58:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 15/05/19 09:58:38 INFO SecurityManager: Changing view acls to: root 15/05/19 09:58:38 INFO SecurityManager: Changing modify acls to: root 15/05/19 09:58:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/05/19 09:58:43 INFO DAGScheduler: Stopping DAGScheduler 15/05/19 09:58:44 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped! 15/05/19 09:58:44 INFO MemoryStore: MemoryStore cleared 15/05/19 09:58:44 INFO BlockManager: BlockManager stopped 15/05/19 09:58:44 INFO BlockManagerMaster: BlockManagerMaster stopped 15/05/19 09:58:44 INFO SparkContext: Successfully stopped SparkContext 15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
// Entry point of the JavaSparkPi example: builds a SparkConf/JavaSparkContext
// and prepares the input list used for the pi estimation.
// NOTE(review): this excerpt is truncated by the article — the sampling map,
// the reduce step, and the closing braces are not shown here.
public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi"); JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// slices: parallelism taken from the first CLI arg, default 2; the job then
// builds a list of n = 100000 * slices integers (0..n-1) to parallelize.
int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; int n = 100000 * slices; List<Integer> l = new ArrayList<Integer>(n); for (int i = 0; i < n; i++) { l.add(i); }
# Locate the Spark examples assembly jar. If the glob matches nothing, the
# literal pattern fails the -e test and we abort with a build hint; otherwise
# remember the jar and count every candidate found (checked after the loop).
for candidate in ${JAR_PATH}/spark-examples-*hadoop*.jar; do
  if [[ ! -e "$candidate" ]]; then
    echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
    echo "You need to build Spark before running this program" 1>&2
    exit 1
  fi
  SPARK_EXAMPLES_JAR="$candidate"
  JAR_COUNT=$((JAR_COUNT+1))
done
# Refuse to continue when more than one examples assembly jar was discovered:
# the launcher cannot know which one to place on the classpath, so list the
# candidates for the user and bail out.
if [ "$JAR_COUNT" -gt "1" ]; then
  echo "Found multiple Spark examples assembly jars in ${JAR_PATH}" 1>&2
  ls ${JAR_PATH}/spark-examples-*hadoop*.jar 1>&2
  echo "Please remove all but one jar." 1>&2
  exit 1
fi
# Select JVM options and heap size based on the class being launched ($1).
# NOTE(review): excerpt is truncated — only the Master arm is shown and the
# closing `esac` (plus the Worker/HistoryServer arms the comment mentions)
# lies outside this view.
case "$1" in # Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY. 'org.apache.spark.deploy.master.Master') OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS" OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM} ;;
Show moreShow more icon
对于 JDK8 需要特殊处理:从 JDK8 开始,JVM 不再支持 MaxPermSize 等永久代参数。
清单 20. JDK8
1 2 3 4 5 6 7 8
# Set JAVA_OPTS to be able to load native libraries and to set heap size.
# JVMs at version code >= 18 (i.e. JDK 1.8+) removed the permanent generation,
# so -XX:MaxPermSize is only prepended for older JVMs; heap bounds are always
# pinned to OUR_JAVA_MEM.
if [ "$JAVA_VERSION" -lt 18 ]; then
  JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS"
else
  JAVA_OPTS="$OUR_JAVA_OPTS"
fi
JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
# Decide how to launch the JVM:
#  - If SPARK_SUBMIT_BOOTSTRAP_DRIVER is set, export the launch environment
#    (RUNNER, CLASSPATH, JAVA_OPTS, OUR_JAVA_MEM, SPARK_CLASS=1) and exec
#    SparkSubmitDriverBootstrapper instead; `shift` drops the original main
#    class (org.apache.spark.deploy.SparkSubmit) from "$@" first.
#  - Otherwise exec the JVM directly with the assembled classpath and opts,
#    optionally echoing the exact command to stderr when
#    SPARK_PRINT_LAUNCH_COMMAND is set. Per the inline note, this command
#    format is mirrored in SparkSubmitDriverBootstrapper.scala — keep the two
#    in sync when editing.
if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then # This is used only if the properties file actually contains these special configs # Export the environment variables needed by SparkSubmitDriverBootstrapper export RUNNER export CLASSPATH export JAVA_OPTS export OUR_JAVA_MEM export SPARK_CLASS=1 shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own exec "$RUNNER" org.apache.spark.deploy.SparkSubmitDriverBootstrapper "$@" else # Note: The format of this command is closely echoed in SparkSubmitDriverBootstrapper.scala if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then echo -n "Spark Command: " 1>&2 echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2 echo -e "========================================\n" 1>&2 fi exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" fi
// NOTE(review): excerpt of SparkSubmitDriverBootstrapper — the enclosing
// main() declaration and the definitions of `command`, `submitLibraryPath`
// and `confLibraryPath` lie outside this excerpt, and the original multi-line
// layout was collapsed by the article. Kept byte-identical.
// Build the child-JVM launch command, dropping empty tokens.
private[spark] object SparkSubmitDriverBootstrapper { // Start the driver JVM val filteredCommand = command.filter(_.nonEmpty) val builder = new ProcessBuilder(filteredCommand) val env = builder.environment()
// When only the conf-side library path is set, fold it together with any
// pre-existing value of the OS library-path environment variable, joined by
// the platform path separator, into the child's environment.
if (submitLibraryPath.isEmpty && confLibraryPath.nonEmpty) { val libraryPaths = confLibraryPath ++ sys.env.get( Utils.libraryPathEnvName) env.put(Utils.libraryPathEnvName, libraryPaths.mkString( sys.props("path.separator"))) }
// Launch the driver JVM as a child process.
val process = builder.start()
// Install a JVM shutdown hook that destroys the child and waits for it, so
// killing this bootstrapper also kills the driver it spawned.
// If we kill an app while it's running, its sub-process should be killed too. Runtime.getRuntime().addShutdownHook(new Thread() { override def run() = { if (process != null) { process.destroy() process.waitFor() } } })
// Pump the child's stdout/stderr into our own streams on dedicated threads.
// Redirect stdout and stderr from the child JVM val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout") val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr") stdoutThread.start() stderrThread.start()
// On non-Windows platforms, forward our stdin to the child (propagateEof).
// When IS_SUBPROCESS is set, a closed stdin signals the parent has exited,
// so the child is destroyed; finally the child's exit code is propagated
// via sys.exit.
// Redirect stdin to child JVM only if we're not running Windows. This is because the // subprocess there already reads directly from our stdin, so we should avoid spawning a // thread that contends with the subprocess in reading from System.in. val isWindows = Utils.isWindows val isSubprocess = sys.env.contains("IS_SUBPROCESS") if (!isWindows) { val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin",propagateEof = true) stdinThread.start() // Spark submit (JVM) may run as a subprocess,and so this JVM should terminate on // broken pipe, signaling that the parent process has exited. //This is the case if the application is launched directly from python, //as in the PySpark shell. In Windows,the termination logic is handled in java_gateway.py if (isSubprocess) { stdinThread.join() process.destroy() } } val returnCode = process.waitFor() sys.exit(returnCode) }
Reprint policy:
Unless otherwise stated, all articles in this blog are licensed under the
CC BY 4.0
license. If you reproduce an article, please indicate the source.
John Doe
!