这是整理的Spark官网的《QuickStart》教程。在这篇教程里我们会先使用shell初步接触一下spark,然后再编写一个spark应用。我这里会优先使用scala来完成这些工作。如果想使用python或者java请直接移步原文。
shell操作
基础
sprak shell可以让我们快速的熟悉相关的API,同时它也是一个强大的交互式数据分析工具。目前spark shell只支持scala和python两种语言,这里我们只使用scala。在spark根目录下执行如下语句:
1 |
./bin/spark-shell |
spark最重要的抽象是一个分布式数据集合,叫做RDD( Resilient Distributed Dataset,弹性分布式数据集)。可以从hadoop的InputFormats(比如HDFS文件)中创建RDD,也可以转译其他的RDD为新的RDD。现在我们使用spark根目录下的README文件中的文本创建一个新的RDD:
1 2 |
scala> val textFile = sc.textFile("README.md") textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24 |
RDD有两大类数据操作(也有人称之为算子,算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作):分别是action(行为)和transformation(转换)。其中action返回的是值,transformation返回的是指向新的RDD的指针。
我们先从几个action开始:
1 2 3 4 5 |
scala> textFile.count() res1: Long = 99 scala> textFile.first() res2: String = # Apache Spark |
然后我们体验一下transformation,这里我们将会使用filter来返回README.md文件中所有包含“Spark”字样的行作为一个新的RDD。
1 2 |
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:26 |
我们也可以将transformation和action连起来用:
1 2 |
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 19 |
关于RDD的更多操作
RDD的action和transformation还可以用来做一些比较复杂的计算。比如说我们想找出单词最多的一行:
1 2 |
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res0: Int = 22 |
前面的map运算将计算出每行单词的个数,创建了一个新的RDD。然后在这个RDD上调用reduce方法来找出单词数最多的一个行。map和reduce的参数都是scala的函数值(闭包),可以使用任何语言特性或者是java或scala的库。举个例子说:我们可以简单调用其他地方声明的函数。下面我们使用Math.max()来让代码更容易理解些:
1 2 |
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 22 |
一个常见的数据流模型是MapReduce,也是因Hadoop而流行起来的。Spark可以很容易地实现MapReduce流:
1 2 |
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:26 |
这里我们结合了flatMap、map和reduceByKey三种transformation来计算文件中每个单词出现的次数,并生成了一个类型为(String, int)对的RDD。要获取RDD中每个单词出现的次数可以使用collect这个action:
1 2 |
scala> wordCounts.collect() res6: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (YARN,,1), (locally,2), (changed,1), (locally.,1), (sc.parallelize(1,1), (only,1), (Configuration,1), (This,2), (basic,1), (first,1), (learning,,1), ([Eclipse](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-Eclipse),1), (documentation,3), (graph,1), (Hive,2), (several,1), (["Specifying,1), ("yarn",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation... |
Caching
Spark也支持从集群范围的内存缓存中获取数据。这在需要重复访问数据时是很有用的,比如需要频繁重复查询一个的小型数据集时,或者进行类似PageRank这样的迭代运算时。下面我们演示下如何缓存并使用之前获取的linesWithSpark数据集:
1 2 3 4 5 6 7 8 9 10 11 |
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at filter at <console>:26 scala> linesWithSpark.cache() res8: linesWithSpark.type = MapPartitionsRDD[9] at filter at <console>:26 scala> linesWithSpark.count() res9: Long = 19 scala> linesWithSpark.count() res10: Long = 19 |
使用Spark来分析和缓存一个只有100行的文本文件看起来有点儿傻。真正重要的是:同样的函数也可以用在非常巨大的数据集上,即使是在由数十个或者在数百个节点的集群上也可以使用。可以按照这里的教程体验一下如何在集群上使用Spark的spark-shell。
自定义应用
这里我们会使用Spark API来编写一个自定义应用。示例程序使用Scala(使用了sbt)编写。这个应用会非常简单,实际上,应用名就叫做SimpleApp.scala:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
import org.apache.spark.SparkContext import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) } } |
注意:这里应该定义一个main()方法,不要使用继承scala.App这样的方式。scala.App的子类可能不会正常工作。
这个程序只是分别统计了Spark README文件中包含单词‘a’和单词‘b’的行的总数。还得记得需要将程序中的YOUR_SPARK_HOME替换为你计算机上Spark程序安装的位置。
不像前面说的那些使用Spark shell的例子,它们使用的是Spark shell的SparkContext实例,我们初始化了一个SparkContext实例作为程序的一部分。
这里我们先定义了一个SparkConf对象,在这个对象里包含我们的应用的一些信息。然后我们将这个SparkConf对象传递给SparkContext构造器。
我们的应用需要依赖Spark API,所以我们还需要添加一个sbt配置文件:simple.sbt。在这文件中添加了Spark依赖:
1 2 3 4 5 6 7 |
name := "Simple Project" version := "1.0" scalaVersion := "2.11.7" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0" |
为了让sbt正确工作,我们需要按照正确的目录结构放置SimpleApp.scala和simple.sbt这两个文件。当一切都安排妥当后,我么可以创建一个包含应用代码的JAR包,然后使用spark-submit脚本来运行我们的程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# Your directory layout should look like this $ find . . ./simple.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala # Package a jar containing your application $ sbt package ... [info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/scala-2.11/simple-project_2.11-1.0.jar ... Lines with a: 46, Lines with b: 23 |
参考文档
spark算子的作用:http://www.jianshu.com/p/4ff6afbbafe4
##########
发表评论