Spark集群计算环境配置和使用

Hadoop配置与使用基础上配置Spark集群计算环境,以及简单Scala程序在集群的运行。

在Hadoop通用并行框架上配置Spark

  • Apache Spark下载合适的版本,以Pre-build for Hadoop 2.6为例,将压缩包解压到某目录,该压缩包目录作为环境变量SPARK_HOME
  • 向环境变量添加%SPARK_HOME%/bin并source生效。
  • %HADOOP_HOME%\etc\hadoop作为HADOOP_CONF_DIR添加到环境变量。
  • 按Hadoop集群的运行方式启动Hadoop
1
2
3
4
bin/hdfs namenode -format
bin/hdfs dfs -mkdir /user/<username>
sbin/start-dfs.sh
sbin/start-yarn.sh

在Hadoop集群上执行Spark作业

一个Scala的MapReduce样本

  • 此样例为多维属性向量的均值计算
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Statistic {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("example")
val spark = new SparkContext(conf)
val set = spark.textFile(args(0)).flatMap {
line => line.split(" ").map { opt => (opt.split(":")(0).toInt, opt.split(":")(1).toFloat) }
}.reduceByKey(_+_)
val count = set.count()
set.collect.map { case (key, value) => value/count}.foreach{println}
spark.stop()
}
}
  • 样例代码中,set为(Int, Float)的<key, value>对的RDD,通过reduceByKey将相同属性求和,在最后collect回收所有分布式集群上的结果汇总到本地。

执行作业

  • 将要执行的代码export为jar包,一下文件名为example.jar
  • ssh到localhost(此处使用伪分布式Hadoop集群)。
  • 执行如下命令,其中path_to_example_jar为jar包所在的位置,MainClass为jar包中要执行的object,args为传入MainClass的参数列表,空格隔开,可变长。
1
spark-submit --class MainClass --master yarn-cluster path_to_example_jar args...
  • 执行作业后可在Hadoop监视器的logs中看到输出。

Scala的部分Spark API


原创作品,允许转载,转载时无需告知,但请务必以超链接形式标明文章原始出处(https://forec.github.io/2016/07/11/spark-initial/) 、作者信息(Forec)和本声明。

分享到