发布时间:2022-08-09 文章分类:编程知识 投稿人:王小丽 字号: 默认 | | 超大 打印

初识spark-基本概念和例子

spark是一个开源的分布式计算系统,提供快速的数据分析功能。 官网地址http://www.spark-project.org/ 据说性能高出hadoop很多(个人理解主要是因为两点:内存和cache),而且相对更加简单,灵活。非常适合需要反复迭代的计算,比如机器学习。

spark基于scala编写,对我而言也是门陌生的语言,至今还是有很多不理解的地方。

基本概念

RDD

spark最大的亮点是提出RDD(Resilient Distributed Dataset)的概念,也就是可伸缩的分布式数据集合,本身只读,可恢复。spark本身不做物理储存,通过保存足够的信息去实际的储存中计算出RDD

RDD只要通过四种途径获取:

1、从共享的文件系统,比如HDFS
2、在驱动程序里的并行scala集合(例如数组),会发到多个节点上
3、从已存在的RDD转换
4、通过改变现有的RDD持久性。rdd是一个懒散,短暂的。
改变一个RDD的持久化通过两个动作:
cache:在第一次计算的时候保存在内存中,可以重用
save:保存到一个分布式文件系统,比如hdfs,这个保存版本会用于未来的操作
缓存形式只是一个提示。
如果集群中没有足够的内存去缓存所有的并行数据集合,spark将在使用它们的时候重新计算,选择这个方式工作(性能有所下降),如果节点发生故障或者数据集合太大,这个想法是一种松散的虚拟内存。

并行操作

RDD可以执行做个并行操作
reduce:通过相关函数合并数据集合,产生结果
collect: 发送所有元素的数据集合到驱动程序。例如,一个简单的方法去并行更新一个并行中的数组
foreach: 通过用户提供的函数去遍历所有元素,可能仅仅是一个不重要的功能
spark目前不支持在mapreduce中的grouped reduce,

共享变量

程序员通过函数去调用map,filter,reduce
当一个函数被传递到一个spark操作,执行在一个远程集群节点上,节点的工作使用的是独立副本。这些变量被复制到所有机器上。
一般情况下,共享变量的读写支持跨任务将是低效的。然而,spark提供两个共享变量的有限类型:广播变量和蓄电池。
广播变量
广播变量允许程序员保持一个只读变量到每台机器上,而不是运送它的一个副本和任务。spark使用高效的广播算法去分配广播变量,以降低通信成本。
广播变量被创建后,它应该在集群中的任何函数中替代值V, v不能再节点中传输超过一次。广播后值V不能被修改,以确保所有节点具有相同过的广播值。
当一个创建广播变量b的值v,v是一个共享文件系统保存到一个文件。b是这个文件路径的序列化形式。当B在工人节点上查询,spark首先检查V是否在本地缓存,并从文件系统读取。 最初使用hdfs做广播变量,但是正在开发更有效的流媒体广播系统。
蓄电池
蓄电池是唯一的价值是:”通过关联操作,因此可以有效地支持并行的变量。它们可以被用来实现计数器(在MapReduce的)或者sum。spark支持原生的int,double
调用SparkContext.accumulator(v),初始化值v。在集群中做 += 操作,但是我们不能读值,只能通过驱动程序去读值用于
在工人节点上,创建一个单独的副本加器作为每个运行任务的线程的线程本地变量,从0开始。
每个任务运行后,工人发送信息到驱动程序,包含每个蓄电池的更新。驱动程序适用于每个操作的每个分区仅更新一次,以防止doublecounting任务时重新执行因失败
lineage
数据集的出处信息

Interpreter Integration
1、编译输出class文件到共享文件系统,集群中的工人通过java class loader加载它们。
2、为了每一行能够直接引用单例对象,我们改变了生成代码
而不是通过静态方法getInstance

例子

给出一些实时统计日志数据例子,例子都是本地模式计算(集群模式需要Mesos),仅供参考,实现上而言非常简单

统计日志中出现多少次hbase读取:

日志的格式每行第5位是标识字段,第6位是响应时间,第7位是类型字段

val spark = new SparkContext(“local”,”test”)
val file = spark.textFile(“D:\\data\\keykeys-log\\log.log”)
val lines = file.filter(line => line.contains(“hbase_time”))
println(lines.count());
统计读取hbase的平均响应时间:
val spark = new SparkContext(“local[2]“, “test”)
val file = spark.textFile(“D:\\data\\keykeys-log\\log.log”)
val lines = file.filter(_.contains(“hbase_time”))
val times = lines.map(dd => dd.split(” “)(6).toInt).reduce(_ + _)
println(“times:” + times/lines.count())
统计hbase的请求类型:
val spark = new SparkContext(“local[2]“, “test”)
val file = spark.textFile(“D:\\data\\keykeys-log\\log.log”)
val lines = file.filter(_.contains(“hbase_time”))
val ones = lines.map(word => (word.split(” “)(7), 1)).reduceByKey(_ + _)
ones.foreach(println)
参考:
http://www.spark-project.org/
Spark: Cluster Computing with Working Sets. Matei Zaharia, Mosharaf Chowdhury, Michael J. Franklin, Scott Shenker, Ion Stoica. USENIX HotCloud 2010. June 2010.
4
Scala, spark, 分布式, 实时

发表评论

点击这里取消回复。

Submitting your comment, please wait...
#
验证图片

*

  • 标签

    cache Cassandra Concurrent Django Dynamo event google Hadoop HBase http io ipad java JBoss jvm linux lucene MapReduce memcached mongodb mysql nio nosql python redis RFS Scala Siege spark SSD TokyoCabine TokyoTyrant Voldemort 分布式 多线程 实时 实时计算 招聘 架构 测试 海量数据 消息队列 源码分析 爬虫 高性能
  • 分类目录

    • Apple (3)
    • Java (11)
    • Linux (4)
    • Nosql (39)
    • other (1)
    • 互联网 (7)
    • 分布式 (15)
    • 实时计算 (8)
    • 抄袭工厂 (19)
    • 搜索引擎 (3)
    • 数据库 (7)
    • 架构 (8)
    • 海量数据 (20)
    • 算法 (11)
  • 近期文章

    • Tenzing A SQL Implemention On The MapReduce Framework(译)
    • Spark范例:K-means算法
    • Spark范例:SortByKey
    • Spark范例:统计CSDN不同邮箱的密码白痴指数
    • 初识spark-基本概念和例子
  • 近期评论

    • wangbo 发表在《对redis数据持久化的一些想法》
    • mcbill 发表在《redis的hash/key lookup的实现》
    • yiihsia 发表在《Spark范例:K-means算法》
    • » redis资料 小熊 发表在《对redis数据持久化的一些想法》
    • 舒の随想日记 » Redis容灾策略 发表在《对redis数据持久化的一些想法》
  • Views

  • 热门文章

val spark = new SparkContext(“local”,”test”)
val file = spark.textFile(“D:\\data\\keykeys-log\\log.log”)
val lines = file.filter(line => line.contains(“hbase_time”))
println(lines.count());
val spark = new SparkContext(“local[2]“, “test”)
val file = spark.textFile(“D:\\data\\keykeys-log\\log.log”)
val lines = file.filter(_.contains(“hbase_time”))
val times = lines.map(dd => dd.split(” “)(6).toInt).reduce(_ + _)
println(“times:” + times/lines.count())
val spark = new SparkContext(“local[2]“, “test”)
val file = spark.textFile(“D:\\data\\keykeys-log\\log.log”)
val lines = file.filter(_.contains(“hbase_time”))
val ones = lines.map(word => (word.split(” “)(7), 1)).reduceByKey(_ + _)
ones.foreach(println)