Spark集群.pdf
《Spark集群.pdf》由会员分享,可在线阅读,更多相关《Spark集群.pdf(21页珍藏版)》请在得力文库 - 分享文档赚钱的网站上搜索。
1、 1.Spark 集群+流水线+RDD 1.1.独立集群方式 1.1.1.独立集群方式 集群搭建分了三种方式:1)独立集群方式 StandAlone 2)和 Yarn 进行整合,由 Yarn 来完成资源的管理和调度,和 hadoop 无缝集成 3)和 Apache Mesos 进行整合,由 Mesos 进行管理和调度,轻松 3 万个节点 1.1.2.修改每台的配置文件 vi conf/spark-env.sh 设置:SPARK_LOCAL_IP=192.168.1.106#注意每台配置的是本机 IP 地址 还可以配置 CPU 和内存,根据实际情况进行配置,不配置时,默认读取当前服务器的实际情况
2、。SPARK_WORKER_CORES、SPARK_WORKER_MEMORY。1.1.3.修改每台的 hosts 文件 vi/etc/hosts 192.168.1.106 spark1 192.168.1.108 spark2 192.168.1.105 spark3 192.168.1.107 spark4 192.168.1.103 hadoophdfs 配置集群中其它主机的 IP 地址和机器名,每台的服务器的 hosts 都要进行修改 1.1.4.启动 Master 任意选择一台服务器作为 Master,其它就都作为 worker ssh 192.168.1.106 cd/usr/l
3、ocal/src/spark/spark-1.5.2-bin-hadoop2.6 sbin/start-master.sh h 192.168.1.106#启动 master sbin/stop-master.sh#停止 master 注意:可以是 IP 地址也可以是主机名 启动后显示日志文件路径。要学会看错误日志。这样当出现错误,可以知道问题出在哪里了。注意:1)akka 访问地址 work 连接的地址 2)WebUI 访问地址,可以通过浏览器查看集群状态(单机是没有这个管理界面的)1.1.5.每台服务器防火墙打开端口 akka 访问端口:7077 和 WebUI 控制台 Web 服务端口
4、8080 service iptables stop#简单测试时,直接关掉防火墙 web 管理界面 http:/192.168.1.106:8080/1.1.6.启动 Worker 在各个 work 上启动各自的 worker sbin/start-slave.sh spark:/192.168.1.106:7077#指向 master 的访问地址#sbin/start-slave.sh spark:/192.168.1.106:7077 h spark1 指定 IP 地址或者主机名访问,连接 HDFS 按主机名会刚好些,IP 可能出错。sbin/stop-slave.sh#停止服务 有时第一
5、次连接时不成功,重新连接次就好了。同样也会有日志输出,刚开始你不熟悉时,最好看一下日志,看下启动时有没有错误,如果你配置有错,或者连接地址有错,或者防火墙没开,它都有可能连接不到 master。所以必须先看下日志,养成一个好习惯。tail f 日志文件路径 可以看到它也会启动一个 Web 管理界面。看到“successful regiestered”搞定。刷新 Master,可以看到有 worker 就管理起来了。State 状态 ALIVE 就表示活着,将来集群很多 worker,master 只会把任务分配给 ALIVE 活着的。如果 DEAN,就不会被分配。Cores 使用 CPU 的个
6、数 Memory 是使用内存的大小 1.1.7.修改配置自动刷新 sbin/stop-slave.sh#关闭服务 增加 CPU 的核数 4,增加内存到 2G sbin/start-slave.sh#启动服务 刷新控制台,可以看到 CPU 和内存都加大了。可以看到旧的已死,新的已启动。启动较慢,稍等几秒。1.1.8.客户端如何连接到集群中进行计算 bin/spark-shell-master=spark:/192.168.1.106:7077#master 的 IP 地址 可以看到管理界面上出现一个 RunningApplication,正在执行的应用。SparkShell就是一个特殊的应用程序
7、,它可在集群环境中运行。1.1.9.运行任务 val rdd=sc.parallelize(List(1,2,3,4,5,6),2)rdd.collect 可以看到进行了两个分区,上面刚好有两个 worker 是存活的。1.1.10.查看执行结果,必须点“Spark shell”进入 执行的 collect 操作 Job 一次完整的计算称为一个 job Submitted 提交任务的时间 Duration 耗时:这个 2s 是很慢的,因为我们在虚拟机中。Stages 执行的阶段,现在的任务比较简单,执行很快,所以就一个阶段,复杂任务会有多个。Tasks 任务:成功执行的任务/总共的任务数。1.
8、1.11.常见错误 failed to bind 如果配置了 SPARK_LOCAL_IP,但是并没有在 slaves 上修改为自己的 IP,则会报错:ERROR netty.NettyTransport:failed to bind to/192.168.6.52:0 .BindException:Failed to bind to:/192.168.1.105:0:Service sparkWorker failed 1.2.*流水线 pipeline 1.2.1.流水线 pipeline Spark 为何比 Hadoop 快?1.2.2.懒惰式命令 一开始学习时容易产生困扰的地方 执行
9、rdd.map,或者 println 都没有发现有动作,刷新 job 也看不到,实际上就没有 立即执行或者说它的行为是懒惰式的。因为它并没有构成一个完整的任务。直到 spark 执行另一类方法时,才会触发计算。例如:rdd.count 或者 rdd.collect。这些方法才会触发计算,才把它看做是一次完整的计算。可以看出 map 不是一次完整的计算。1.2.3.Spark 中把方法分为了两类 一类 transformations 不会触发 job 的提交(没有立即计算),例如:map、filter、flatMap、reduceByKey 等。一类 actions 会触发 job 的提交,例如
10、:collect、count 等。Spark 在执行真正任务之前,都会做一些优化:会计算 rdd 之间的依赖关系,然后根据依赖关系划分阶段,完成优化。1.2.4.*集群实例 1)创建一个 letter.txt 1,a 2,b 3,c 4,d 5,e 3,f 2,g 1,h 注意它的环境,现在是在集群环境中。假如 letter.txt 在 spartk2 的服务器上,那spark3 的服务器呢,肯定没有这个文件就会报错。如何解决呢?这是分布式计算时,需要注意的问题。要解决这个问题 sc.textFile()所要读取的文件必须在每个服务器上都存在。这也是初学者容易犯的错误。2)准备:创建工具类,工
11、具类会触发 job import org.apache.spark.rdd.RDD import scala.reflect.ClassTag object su def debugT:ClassTag(rdd:RDDT)=rdd.mapPartitionsWithIndex(i:Int,iter:IteratorT)=val m=scala.collection.mutable.MapInt,ListT()var list=ListT()while(iter.hasNext)list=list:+iter.next m(i)=list m.iterator ).collect().forea
12、ch(x:Tuple2Int,ListT)=val i=x._1 println(spartition:$i)x._2.foreach println )3)sc.textFile(/root/letter.txt)它也是不会立即执行,等 job 触发时才执行 scala sc.textFile(/root/letter.txt)res1:org.apache.spark.rdd.RDDString=MapPartitionsRDD1 at textFile at:16 scala su.debug(res1)partition:0 1,a 2,b 3,c 4,d 5,e partition:
13、1 3,f 2,g 1,h 说明它把这个文件,前 5 行分到一个分区,后 3 行分到一个分区。4)res1.map(x=val a=x.split(,);(a(0),a(1)将原始的字符串转换为 tuple 类型 partition:0 (1,a)(2,b)(3,c)(4,d)(5,e)partition:1(3,f)(2,g)(1,h)5)res7.map(x=(x._1,x._2+_s)partition:0 (1,a_s)(2,b_s)(3,c_s)(4,d_s)(5,e_s)partition:1(3,f_s)(2,g_s)(1,h_s)1.2.5.*依赖关系 前面的步骤都没有触发jo
14、b,它们这几步都在创建依赖关系,每一步都在生成新的RDD。那如何看这个关系呢?找到最后一个 RDD,它的 toDebugString scala res12.toDebugString res16:String=(2)MapPartitionsRDD6 at map at:22|MapPartitionsRDD4 at map at:20|MapPartitionsRDD1 at textFile at:16|/root/letter.txt HadoopRDD0 at textFile at:16 最后生成的 RDD 叫做 HadoopRDD 对象,它既可以读 Hadoop 文件也可以读普通
15、文件。这是 spark 为我们优化的第一步,它就是 transformations 类型,每一步都会生成新的 RDD 对象,每一个 RDD 不是独立的,它必须依赖上一个 RDD。或者称为父 RDD 对象。它们之间都有父子关系。spark 就会应用一种叫流水线的技术,它特别的节省内存,并且也让性能得到提升。scala res1.partitions.length res19:Int=2 scala res7.partitions.length res20:Int=2 scala res12.partitions.length res21:Int=2 每个 RDD 有两个分区。分区个数由父 RDD
16、 决定。上面的过程只是绘制了一个美好的蓝图,并未执行,那什么时候会真正执行运算呢?那就是执行一个 actions 类型命令时才会运算。res12.collect 触发一次完整的计算,根据分区个数生成任务,根据 2 个分区生成了两个任务(Task)。spark 就把它封装成任务对象,底层通过 akka 框架发给每个 worker。Task 对象中就包括了所有的 RDD 信息。Task 最初在 Driver 上(SparkShell)。真正执行不是在 SparkShell,而是在 worker 上执行。怎么发?就是 akka 框架,先把它序列化为二进制的,再把它发给 worker,worker 收
17、到二进制的,反序列化,就是 Task。RDD 都在 Task 里面,它们之间的关系就知道了。具体计算是怎么计算的呢?它采用的是倒推的方式进行最后的计算。它先看最后一个RDD,res12 去执行时,遇到变量 x,它就去读 res7,res7 遇到变量 x,就去读取 res1,res1 遇到 textFile 就按事先分区好的去读取前 5 行。(文件总共 8 行,前 5 行分给第一个分区,后 3 行分给第二个分区。)每次读一行,因为 res7 的 x,每次只要一个。1.2.6.总结优缺点 不会一次读很大量的数据,不会造成内存溢出 中间结果流动方式无需暂存,内存消耗小 命令批量提交,性能高 Spar
18、k 首先采用了迭代思想,会不会用很大的内存?不会,数据是一条一条处理的。它边读边进行运算。它就不需要中间暂存结果。传统的 Hadoop 为什么慢,它每算一步,中间的结果都要暂存在 HDFS 上。为什么说 spark 效率好呢?就上面的例子,中间的结果用不用暂存?中间的处理是以流动的方式,一个 RDD 传到下一个 RDD,就不需要暂存了。因此它能够充分的利用内存。这是它效果高的因素之一。这种方式就称为流水线(pipeline)。从这也看出来,spark 为何要把一些命令设置为 transformation 方式,先不执行,最后批量一次执行。你要立即执行,就得保存中间结果,供下一次执行使用。缺点:
19、每个分区的数据在一个 worker 上运行,万一有一个 worker 出现了故障,会导致出错的 worker 还需要重新计算,因为没有中间结果,一旦 worker 运算失败,spark 需要找一个正常的 worker 来重新计算,而且从头算,这样就拉长了总的计算时间。当然这个缺点可以配合一些缓存,例如一些中间结果非常重要,可以再加上缓存。以后出现故障不用重新算,从这些“保存点”上重新计算。有些类似有些的中间存盘。后面会详细讲到,加缓存也是 spark 很重要的一块,也是比 hadoop 性能高的原因之一。它的缓存是在内存,比缓存到 HDFS 上快的多。1.3.和 HDFS 结合 1.3.1.和
20、 HDFS 结合 上面的例子有个问题,就是需要处理的问题,必须保证每个 worker 中都有一个,那这种方式及其不方便,而且处理之前难以保证每个服务器都有。如何解决?可以使用 HDFS 分布式文件系统。1.3.2.准备:伪分布 Hadoop 单机 服务访问地址:http:/192.168.1.103:50070/检查 HDFS 环境是否正常 jps#查看 jvm 进程状态 18406 ResourceManager 18253 SecondaryNameNode 18856 Jps 18085 DataNode 18508 NodeManager 16364 NameNode 必须有上面 6
21、个进程才正常 cd/usr/local/src/hadoop/hadoop-2.7.1 bin/hdfs dfs mkdir/test#创建 test 目录,注意必须前面加斜杠/bin/hdfs dfs ls/#浏览目录 上传 letter.txt 到/usr/local/src/hadoop/hadoop-2.7.1 目录 步骤:1、将测试文件放入到 HDFS 系统中 bin/hdfs dfs put letter.txt/test#letter.txt 放入 HDFS 的 test 目录下。2、在 master 的 spark 配置文件中配置(可以不配置)Spark 和 Hadoop 在一
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- Spark 集群
限制150内