LionKing数据科学专栏

购买普通会员高级会员可以解锁网站精华内容且享受VIP服务的优惠

想要查看更多数据科学相关的内容请关注我们的微信公众号知乎专栏

Spark基本用法

词频统计(word count)实例

本文使用Spark进行文本词频统计(word count),作为Spark基本用法的引入。

假设文本文件存储在input目录(directory)下,分别为上一讲的a.txt, b.txt, c.txt。

input
├── a.txt: To be, or not to be: that is the question.
├── b.txt: What is done cannot be undone.
└── c.txt: Some are born great, some achieve greatness, and some have greatness thrust upon them.
	

终端内运行spark-shell打开交互式(interactive)界面。

首先将数据读入Spark的Dataset结构。注意Spark 2.0以前的版本使用RDD数据结构,使用上差别不大,但是Dataset性能更优。

val input = spark.read.textFile("input")
	

如果是在hadoop文件系统(hdfs)下,格式为"hdfs://localhost:????/???/???/input"

可以查看输入的行数

可以查看输入的第一个或者前几个,在代码开发阶段可以帮助debug。

Spark中可以利用缓存(cache)将结果放入跨集群的缓存中,从而减少重复的数据抓取,提高性能。

input.cache()
	

如前所述,Mapper将每一行分割成一个个单词,因此首先将每行拆为一个个单词:

将每个单词w转换为(w, 1)的键值对。

由于Dataset没有reduceByKey函数,我们首先将数据转换为RDD,然后使用reduceByKey函数关于每个键(key)进行Reduce步骤。

最后将结果以数组的形式返回,一般的预处理是使用filter筛选出一小部分数据从而防止数据过大。

val counts = result.collect()
	
输出:
Array[(String, Int)] = Array((thrust,1), (are,1), (cannot,1), (have,1), (upon,1), (or,1), (great,1), (undone,1), (the,1), (not,1), (is,2), (born,1), (that,1), (be,3), (some,3), (what,1), (question,1), (them,1), (and,1), (to,2), (done,1), (greatness,2), (achieve,1))
	

为了便于复习,最后我们将所有的代码放在一起。

练习题

Spark基本用法练习题

进阶:PageRank实例

我们首先简要复习PageRank算法,更详细的介绍见信息检索和网页搜索(information retrieval and web search)

将互联网的每一个网站看作一个结点(node),如果一个网站有超链接到另一个网站,看作有一条有向的边(edge)从第一个结点到第二个结点。

记结点为$0, 1, 2, \ldots, N - 1$。

PageRank算法通过如下的迭代方式得到每一个结点的分值。

其中$d$为阻尼系数,一般取0.85。

注意,如果某一个结点没有向外的边,看作该结点连向所有的结点,包括自身。为了方便起见,我们这里的例子假设不包含这样的结点。

约定数据的存储形式为csv文件,每一行包含src,dst代表从src指向dst的边。

0,1
1,4
3,1
2,0
4,2
0,3
	

首先设定阻尼系数$d = 0.85$。

val d = 0.85
	

将csv文件转换为(结点, 结点的邻居)。

得到结点的个数并初始化pagerank的字典。

val numNodes = outEdges.count().toDouble
var pagerank = outEdges.map{case (src, dst) => (src, 1.0 / numNodes)}
	

现在对于一个结点,我们在pagerank变量中知道该结点的当前pagerank分值,我们在outEdges变量中知道该结点的邻居信息。通过这两个信息,可以计算从该结点可以发送给各邻居的分值,如果$i \rightarrow j$,且$i$当前的分值为$r$,$i$的出度为$d_{out}$,则$i$发送给$j$的分值为$r / d_{out}$。

我们用一个函数将一个结点的分值发送(emit)给邻居。

object MyFunctions {
  def assignScoreToNeighbors(neighbors: Iterable[String], rank: Double): Iterable[(String, Double)] = {
    val degreeOut = neighbors.size
    neighbors.map(neighbor => (neighbor, rank / degreeOut))
  }
}
	

在每一次迭代中,

  1. 我们首先将pagerank和outEdges的信息结合在一起:

    val joined = outEdges.join(pagerank)
    	  
  2. 每个结点发送分值给所有邻居:

    val scoreFromNeighbor = joined.flatMap{
      case (src, (neighbors, rank)) => MyFunctions.assignScoreToNeighbors(neighbors, rank)
    }
    val totalScoreFromNeighbors = scoreFromNeighbor.reduceByKey((a, b) => a + b)
              
  3. 乘以阻尼系数并加上$\frac{1 - d}{N}$:

    pagerank = totalScoreFromNeighbors.map{
      case (node, score) => (node, score * d + (1 - d) / numNodes)
    }
              

我们进行20次迭代,得到最终的PageRank分值如下:

输出:
Array[(String, Double)] = Array((4,0.22149204396020608), (0,0.21456022471948238), (2,0.2178564173484022), (3,0.12129105051022326), (1,0.22480026346168602))
	

全部的代码如下:

进阶:k-means实例

k-means算法的步骤如下:

更详细的介绍见聚类分析(cluster analysis)页面。

为了方便起见,我们考虑两维的情形,数据使用k-means算法中使用的数据。

每一行的形式为x,y,分别为横纵坐标。

下载csv文件

首先设定$K = 5$。

val K = 5
	

读取文件,并转为$(x, y)$的形式。

val points = spark.read.textFile("k-means-data.csv").map(line => line.split(",")).map(x => (x(0).toDouble, x(1).toDouble)).rdd.cache()
	

随机选取其中的$K$个作为centroid的初始值。zipWithIndex会将结果转为一个数组,每个元素的形式为((x, y), 下标)。

var centroids = points.takeSample(false, K, seed=100).zipWithIndex
	

接下来我们实现一些辅助函数。

接下来需要迭代该算法,每次对于每个数据$(x, y)$,计算距离其最近的中心的下标$i$;对于相同下标$i$的数据,将它们的几何重心更新为新的中心。

var assignment = points.map(point => (MyFunctions.findClosestCentroidIndex(point, centroids), point))
	

将每个$(x, y)$转换为$(i, (x, y))$。

将其关于$i$分组,则$i$的值为所有分到改组的点$(x^1, y^1), (x^2, y^2), \ldots$,计算它们的几何重心即可得到新的centroids。

centroids = assignment.groupByKey().mapValues(MyFunctions.getCentroid).map{case (a, b) => (b, a)}.collect()
        

最终得到的分组如图:

Scala不方便作图,这里首先将结果存为csv文件,再用Python绘图,代码如下。

回到目录

上一节:Scala基本用法

下一节:Spark操作数据库

更多Spark相关问题见本网站论坛Spark练习题Spark版面

更多面试问题见面试真题汇总

想要查看更多数据科学相关的内容请关注我们的微信公众号知乎专栏