在Hadoop配置与使用基础上配置Spark集群计算环境,以及简单Scala程序在集群的运行。
在Hadoop通用并行框架上配置Spark
- 从Apache Spark下载合适的版本,以
Pre-build for Hadoop 2.6
为例,将压缩包解压到某目录,该压缩包目录作为环境变量SPARK_HOME
。
- 向环境变量添加
%SPARK_HOME%/bin
并source生效。
- 将
%HADOOP_HOME%\etc\hadoop
作为HADOOP_CONF_DIR
添加到环境变量。
- 按Hadoop集群的运行方式启动Hadoop
1 2 3 4
| bin/hdfs namenode -format bin/hdfs dfs -mkdir /user/<username> sbin/start-dfs.sh sbin/start-yarn.sh
|
在Hadoop集群上执行Spark作业
一个Scala的MapReduce样本
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object Statistic { def main(args: Array[String]) { val conf = new SparkConf().setAppName("example") val spark = new SparkContext(conf) val set = spark.textFile(args(0)).flatMap { line => line.split(" ").map { opt => (opt.split(":")(0).toInt, opt.split(":")(1).toFloat) } }.reduceByKey(_+_) val count = set.count() set.collect.map { case (key, value) => value/count}.foreach{println} spark.stop() } }
|
- 样例代码中,set为(Int, Float)的
<key, value>
对的RDD,通过reduceByKey
将相同属性求和,在最后collect回收所有分布式集群上的结果汇总到本地。
执行作业
- 将要执行的代码export为jar包,一下文件名为
example.jar
。
- ssh到localhost(此处使用伪分布式Hadoop集群)。
- 执行如下命令,其中
path_to_example_jar
为jar包所在的位置,MainClass为jar包中要执行的object,args为传入MainClass的参数列表,空格隔开,可变长。
1
| spark-submit --class MainClass --master yarn-cluster path_to_example_jar args...
|
- 执行作业后可在Hadoop监视器的logs中看到输出。
Scala的部分Spark API
原创作品,允许转载,转载时无需告知,但请务必以超链接形式标明文章原始出处(https://forec.github.io/2016/07/11/spark-initial/) 、作者信息(Forec)和本声明。