Spark实践笔记4:用Spark-shell做交互式分析

首先在MASTER机器上启动hadoop及spark;
把位于${SPARK_HOME}下的README.md文件put到HDFS/data目录下;
启动spark-shell,启动spark-shell时需要指定MASTER机器,否则执行后续命令会报错:
${SPARK_HOME}/bin/spark-shell —master spark://ubuntu1:7077
scala> val textFile = sc.textFile(“/data/README.md”)
textFile.count()
textFile.first()
能看到,程序正常执行,而且执行第二个命令first的速度比第一个命令count要快很多。
通过Spark-shell的web能看到任务信息。

个人理解:val定义了一个RDD(弹性分布式数据集)textFile。RDD拥有自己的一些actions,如count、first、take、countByKey、reduce等,可以直接调用。RDD还有一种操作类型叫 transformation ,个人理解为转换,是从一个RDD生成另一个RDD,如filter、map等,如:

scala> val linesWithSpark = textFile.filter(line => line.contains(“Spark”))
scala> linesWithSpark.count()
map-reduce的spark实现:
如何创建一个RDD,RDD的创建方式除了刚才除了使用sc.textFile从HDFS装载完,还可以直接使用关键词parallelize从数组创建
一维数组:scala> val a = sc.parallelize(1 to 9, 3),3是指定分区数
二维数组:scala> val a = sc.parallelize(list((1,2),(3,4),(5,6)),3),需注意scala里数组用小括号

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应
a.collect() / collect的作用是打印出对应的RDD /
val b = a.map( x => x2 ) / 使用map这个transformation生成了RDD b。是把a的每个元素2 /
b.collect()
reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止
b.reduce((x,y)=>if(x>y) x else y)/ 取得最大值 /以上整个map-reduce的过程可以合成一句:
a.map(x=>x*2).reduce((x,y)=>if(x>y) x else y)很简洁,很精彩

在函数体中还可以直接使用java的类,降低开发量,比如以上的规则可以导入Math类实现
import java.lang.Math
a.map(x=>x*2).reduce((x,y)=>Math.max(x,y))
其他map-reduce的变体:
val wordCounts = textFile.flatMap(line => line.split(“ “)).map(word => (word, 1)).reduceByKey((a, b) => a + b)
flatMap的作用是一转多,在这个场景里他把按行的RDD拆成了按词的RDD,再接map把词做成key,数量为value,再使用reduceByKey方法,按key去做了合计。执行可以得到词频数

缓存:Spark允许通过RDD.caching实现显式缓存
一个完整的Spark Application
预先准备:需要安装打包工具sbt;下载地址。安装完照理配置一下环境变量。需要额外注意的是sbt第一次运行时会下载很多额外的依赖包。所以最好把依赖包下载地址改成国内的,详见。真是无语,sbt编译成功第一个SparkApp竟然下载了1392个依赖。花了快1个小时!!!!

构建目录:SimpleApp/src/main/scala,其中SimpleApp是SparkApp的名字。
编写源码SimpleApp.scala并放到SimpleApp/src/main/scala下
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]): Unit = {
val logFile = “/data/README.md”
val conf = new SparkConf().setAppName(“Simple Application”)
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile,2).cache()
val numAs = logData.filter(line => line.contains(“a”)).count()
val numBs = logData.filter(line => line.contains(“b”)).count()
println(“Lines with a: %s ,Lines with b: %s “.format(numAs,numBs))
}
}
编写sbt配置文件并放到SimpleApp下
name := “Simple Project”
version := “1.0”
scalaVersion :=”2.11.6”
libraryDependencies += “org.apache.spark” %% “spark-core” % “1.3.1”
编译并打包,在SimpleApp目录下执行sbt package。同样的,由于是第一次执行,sbt会下载N个依赖包。最终在target目录下生成了jar包
运行看看。spark-submit —class “SimpleApp” —master spark://ubuntu1:7077 simple-project_2.11-1.0.jar。可以看见输出了期望的结果。
SimpleApp是object name,simple-project_2.11-1.0.jar是jar包得名字,此处应该写对路径。master是执行的主机。

END