Spark 快速入門

2021-04-16 17:28 更新

設(shè)置Spark

在本機(jī)設(shè)置和運(yùn)行Spark非常簡單。你只需要下載一個預(yù)構(gòu)建的包,只要你安裝了Java 6+和Python 2.6+,就可以在Windows、Mac OS X和Linux上運(yùn)行Spark。確保java程序在PATH環(huán)境變量中,或者設(shè)置了JAVA_HOME環(huán)境變量。類似的,python也要在PATH中。

假設(shè)你已經(jīng)安裝了Java和Python:

  1. 訪問Spark下載頁
  2. 選擇Spark最新發(fā)布版(本文寫作時是1.2.0),一個預(yù)構(gòu)建的Hadoop 2.4包,直接下載。

現(xiàn)在,如何繼續(xù)依賴于你的操作系統(tǒng),靠你自己去探索了。Windows用戶可以在評論區(qū)對如何設(shè)置的提示進(jìn)行評論。

一般,我的建議是按照下面的步驟(在POSIX操作系統(tǒng)上):

1.解壓Spark

~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz

2.將解壓目錄移動到有效應(yīng)用程序目錄中(如Windows上的

~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0

3.創(chuàng)建指向該Spark版本的符號鏈接到<spark目錄。這樣你可以簡單地下載新/舊版本的Spark,然后修改鏈接來管理Spark版本,而不用更改路徑或環(huán)境變量。

~$ ln -s /srv/spark-1.2.0 /srv/spark

4.修改BASH配置,將Spark添加到PATH中,設(shè)置SPARK_HOME環(huán)境變量。這些小技巧在命令行上會幫到你。在Ubuntu上,只要編輯~/.bash_profile或~/.profile文件,將以下語句添加到文件中:

export SPARK_HOME=/srv/spark
export PATH=$SPARK_HOME/bin:$PATH

5.source這些配置(或者重啟終端)之后,你就可以在本地運(yùn)行一個pyspark解釋器。執(zhí)行pyspark命令,你會看到以下結(jié)果:

~$ pyspark
Python 2.7.8 (default, Dec  2 2014, 12:45:58)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties
[… snip …]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  `_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/
 
Using Python version 2.7.8 (default, Dec  2 2014 12:45:58)
SparkContext available as sc.
>>>

現(xiàn)在Spark已經(jīng)安裝完畢,可以在本機(jī)以”單機(jī)模式“(standalone mode)使用。你可以在本機(jī)開發(fā)應(yīng)用并提交Spark作業(yè),這些作業(yè)將以多進(jìn)程/多線程模式運(yùn)行的,或者,配置該機(jī)器作為一個集群的客戶端(不推薦這樣做,因為在Spark作業(yè)中,驅(qū)動程序(driver)是個很重要的角色,并且應(yīng)該與集群的其他部分處于相同網(wǎng)絡(luò))??赡艹碎_發(fā),你在本機(jī)使用Spark做得最多的就是利用spark-ec2腳本來配置Amazon云上的一個EC2 Spark集群了。

簡略Spark輸出

Spark(和PySpark)的執(zhí)行可以特別詳細(xì),很多INFO日志消息都會打印到屏幕。開發(fā)過程中,這些非常惱人,因為可能丟失Python棧跟蹤或者print的輸出。為了減少Spark輸出 – 你可以設(shè)置$SPARK_HOME/conf下的log4j。首先,拷貝一份$SPARK_HOME/conf/log4j.properties.template文件,去掉“.template”擴(kuò)展名。

~$ cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties

編輯新文件,用WARN替換代碼中出現(xiàn)的INFO。你的log4j.properties文件類似:

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN

現(xiàn)在運(yùn)行PySpark,輸出消息將會更簡略!感謝@genomegeek在一次District Data Labs的研討會中指出這一點(diǎn)。

在Spark中使用IPython Notebook

當(dāng)搜索有用的Spark小技巧時,我發(fā)現(xiàn)了一些文章提到在PySpark中配置IPython notebook。IPython notebook對數(shù)據(jù)科學(xué)家來說是個交互地呈現(xiàn)科學(xué)和理論工作的必備工具,它集成了文本和Python代碼。對很多數(shù)據(jù)科學(xué)家,IPython notebook是他們的Python入門,并且使用非常廣泛,所以我想值得在本文中提及。

這里的大部分說明都來改編自IPython notebook: 在PySpark中設(shè)置IPython。但是,我們將聚焦在本機(jī)以單機(jī)模式將IPtyon shell連接到PySpark,而不是在EC2集群。如果你想在一個集群上使用PySpark/IPython,查看并評論下文的說明吧!

  1. 1.為Spark創(chuàng)建一個iPython notebook配置
~$ ipython profile create spark
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'

記住配置文件的位置,替換下文各步驟相應(yīng)的路徑:

2.創(chuàng)建文件$HOME/.ipython/profile_spark/startup/00-pyspark-setup.py,并添加如下代碼:

import os
import sys
 
# Configure the environment
if 'SPARK_HOME' not in os.environ:
     os.environ['SPARK_HOME'] = '/srv/spark'
 
# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']
 
# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

3.使用我們剛剛創(chuàng)建的配置來啟動IPython notebook。

~$ ipython notebook --profile spark

4.在notebook中,你應(yīng)該能看到我們剛剛創(chuàng)建的變量。

print SPARK_HOME

5.在IPython notebook最上面,確保你添加了Spark context。

from pyspark import  SparkContext
sc = SparkContext( 'local', 'pyspark')

6.使用IPython做個簡單的計算來測試Spark context。

def isprime(n):
"""
check if integer n is a prime
"""
# make sure n is a positive integer
n = abs(int(n))
# 0 and 1 are not primes
if n < 2:
    return False
# 2 is the only even prime number
if n == 2:
    return True
# all other even numbers are not primes
if not n & 1:
    return False
# range starts with 3 and only needs to go up the square root of n
# for all odd numbers
for x in range(3, int(n**0.5)+1, 2):
    if n % x == 0:
        return False
return True
 
# Create an RDD of numbers from 0 to 1,000,000
nums = sc.parallelize(xrange(1000000))
 
# Compute the number of primes in the RDD
print nums.filter(isprime).count()

編輯提示:上面配置了一個使用PySpark直接調(diào)用IPython notebook的IPython context。但是,你也可以使用PySpark按以下方式直接啟動一個notebook: $ IPYTHON_OPTS=”notebook –pylab inline” pyspark

哪個方法好用取決于你使用PySpark和IPython的具體情景。前一個允許你更容易地使用IPython notebook連接到一個集群,因此是我喜歡的方法。

在EC2上使用Spark

在講授使用Hadoop進(jìn)行分布式計算時,我發(fā)現(xiàn)很多可以通過在本地偽分布式節(jié)點(diǎn)(pseudo-distributed node)或以單節(jié)點(diǎn)模式(single-node mode)講授。但是為了了解真正發(fā)生了什么,就需要一個集群。當(dāng)數(shù)據(jù)變得龐大,這些書面講授的技能和真實(shí)計算需求間經(jīng)常出現(xiàn)隔膜。如果你肯在學(xué)習(xí)詳細(xì)使用Spark上花錢,我建議你設(shè)置一個快速Spark集群做做實(shí)驗。 包含5個slave(和1個master)每周大概使用10小時的集群每月大概需要$45.18。

完整的討論可以在Spark文檔中找到:在EC2上運(yùn)行Spark在你決定購買EC2集群前一定要通讀這篇文檔!我列出了一些關(guān)鍵點(diǎn):

  1. 通過AWS Console獲取AWS EC2 key對(訪問key和密鑰key)。
  2. 將key對導(dǎo)出到你的環(huán)境中。在shell中敲出以下命令,或者將它們添加到配置中。
export AWS_ACCESS_KEY_ID=myaccesskeyid
export AWS_SECRET_ACCESS_KEY=mysecretaccesskey

注意不同的工具使用不同的環(huán)境名稱,確保你用的是Spark腳本所使用的名稱。

3.啟動集群:

~$ cd $SPARK_HOME/ec2
ec2$ ./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>

4.SSH到集群來運(yùn)行Spark作業(yè)。

ec2$ ./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>

5.銷毀集群

ec2$ ./spark-ec2 destroy &lt;cluster-name&gt;.

這些腳本會自動創(chuàng)建一個本地的HDFS集群來添加數(shù)據(jù),copy-dir命令可以同步代碼和數(shù)據(jù)到該集群。但是你最好使用S3來存儲數(shù)據(jù),創(chuàng)建使用s3://URI來加載數(shù)據(jù)的RDDs。

Spark是什么?

既然設(shè)置好了Spark,現(xiàn)在我們討論下Spark是什么。Spark是個通用的集群計算框架,通過將大量數(shù)據(jù)集計算任務(wù)分配到多臺計算機(jī)上,提供高效內(nèi)存計算。如果你熟悉Hadoop,那么你知道分布式計算框架要解決兩個問題:如何分發(fā)數(shù)據(jù)和如何分發(fā)計算。Hadoop使用HDFS來解決分布式數(shù)據(jù)問題,MapReduce計算范式提供有效的分布式計算。類似的,Spark擁有多種語言的函數(shù)式編程API,提供了除map和reduce之外更多的運(yùn)算符,這些操作是通過一個稱作彈性分布式數(shù)據(jù)集(resilient distributed datasets, RDDs)的分布式數(shù)據(jù)框架進(jìn)行的。

本質(zhì)上,RDD是種編程抽象,代表可以跨機(jī)器進(jìn)行分割的只讀對象集合。RDD可以從一個繼承結(jié)構(gòu)(lineage)重建(因此可以容錯),通過并行操作訪問,可以讀寫HDFS或S3這樣的分布式存儲,更重要的是,可以緩存到worker節(jié)點(diǎn)的內(nèi)存中進(jìn)行立即重用。由于RDD可以被緩存在內(nèi)存中,Spark對迭代應(yīng)用特別有效,因為這些應(yīng)用中,數(shù)據(jù)是在整個算法運(yùn)算過程中都可以被重用。大多數(shù)機(jī)器學(xué)習(xí)和最優(yōu)化算法都是迭代的,使得Spark對數(shù)據(jù)科學(xué)來說是個非常有效的工具。另外,由于Spark非常快,可以通過類似Python REPL的命令行提示符交互式訪問。

Spark庫本身包含很多應(yīng)用元素,這些元素可以用到大部分大數(shù)據(jù)應(yīng)用中,其中包括對大數(shù)據(jù)進(jìn)行類似SQL查詢的支持,機(jī)器學(xué)習(xí)和圖算法,甚至對實(shí)時流數(shù)據(jù)的支持。

核心組件如下:

  • Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動作。其他Spark的庫都是構(gòu)建在RDD和Spark Core之上的。
  • Spark SQL:提供通過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進(jìn)行交互的API。每個數(shù)據(jù)庫表被當(dāng)做一個RDD,Spark SQL查詢被轉(zhuǎn)換為Spark操作。對熟悉Hive和HiveQL的人,Spark可以拿來就用。
  • Spark Streaming:允許對實(shí)時數(shù)據(jù)流進(jìn)行處理和控制。很多實(shí)時數(shù)據(jù)庫(如Apache Store)可以處理實(shí)時數(shù)據(jù)。Spark Streaming允許程序能夠像普通RDD一樣處理實(shí)時數(shù)據(jù)。
  • MLlib:一個常用機(jī)器學(xué)習(xí)算法庫,算法被實(shí)現(xiàn)為對RDD的Spark操作。這個庫包含可擴(kuò)展的學(xué)習(xí)算法,比如分類、回歸等需要對大量數(shù)據(jù)集進(jìn)行迭代的操作。之前可選的大數(shù)據(jù)機(jī)器學(xué)習(xí)庫Mahout,將會轉(zhuǎn)到Spark,并在未來實(shí)現(xiàn)。
  • GraphX:控制圖、并行圖操作和計算的一組算法和工具的集合。GraphX擴(kuò)展了RDD API,包含控制圖、創(chuàng)建子圖、訪問路徑上所有頂點(diǎn)的操作。

由于這些組件滿足了很多大數(shù)據(jù)需求,也滿足了很多數(shù)據(jù)科學(xué)任務(wù)的算法和計算上的需要,Spark快速流行起來。不僅如此,Spark也提供了使用Scala、Java和Python編寫的API;滿足了不同團(tuán)體的需求,允許更多數(shù)據(jù)科學(xué)家簡便地采用Spark作為他們的大數(shù)據(jù)解決方案。

對Spark編程

編寫Spark應(yīng)用與之前實(shí)現(xiàn)在Hadoop上的其他數(shù)據(jù)流語言類似。代碼寫入一個惰性求值的驅(qū)動程序(driver program)中,通過一個動作(action),驅(qū)動代碼被分發(fā)到集群上,由各個RDD分區(qū)上的worker來執(zhí)行。然后結(jié)果會被發(fā)送回驅(qū)動程序進(jìn)行聚合或編譯。本質(zhì)上,驅(qū)動程序創(chuàng)建一個或多個RDD,調(diào)用操作來轉(zhuǎn)換RDD,然后調(diào)用動作處理被轉(zhuǎn)換后的RDD。

這些步驟大體如下:

  1. 定義一個或多個RDD,可以通過獲取存儲在磁盤上的數(shù)據(jù)(HDFS,Cassandra,HBase,Local Disk),并行化內(nèi)存中的某些集合,轉(zhuǎn)換(transform)一個已存在的RDD,或者,緩存或保存。
  2. 通過傳遞一個閉包(函數(shù))給RDD上的每個元素來調(diào)用RDD上的操作。Spark提供了除了Map和Reduce的80多種高級操作。
  3. 使用結(jié)果RDD的動作(action)(如count、collect、save等)。動作將會啟動集群上的計算。

當(dāng)Spark在一個worker上運(yùn)行閉包時,閉包中用到的所有變量都會被拷貝到節(jié)點(diǎn)上,但是由閉包的局部作用域來維護(hù)。Spark提供了兩種類型的共享變量,這些變量可以按照限定的方式被所有worker訪問。廣播變量會被分發(fā)給所有worker,但是是只讀的。累加器這種變量,worker可以使用關(guān)聯(lián)操作來“加”,通常用作計數(shù)器。

Spark應(yīng)用本質(zhì)上通過轉(zhuǎn)換和動作來控制RDD。后續(xù)文章將會深入討論,但是理解了這個就足以執(zhí)行下面的例子了。

Spark的執(zhí)行

簡略描述下Spark的執(zhí)行。本質(zhì)上,Spark應(yīng)用作為獨(dú)立的進(jìn)程運(yùn)行,由驅(qū)動程序中的SparkContext協(xié)調(diào)。這個context將會連接到一些集群管理者(如YARN),這些管理者分配系統(tǒng)資源。集群上的每個worker由執(zhí)行者(executor)管理,執(zhí)行者反過來由SparkContext管理。執(zhí)行者管理計算、存儲,還有每臺機(jī)器上的緩存。

重點(diǎn)要記住的是應(yīng)用代碼由驅(qū)動程序發(fā)送給執(zhí)行者,執(zhí)行者指定context和要運(yùn)行的任務(wù)。執(zhí)行者與驅(qū)動程序通信進(jìn)行數(shù)據(jù)分享或者交互。驅(qū)動程序是Spark作業(yè)的主要參與者,因此需要與集群處于相同的網(wǎng)絡(luò)。這與Hadoop代碼不同,Hadoop中你可以在任意位置提交作業(yè)給JobTracker,JobTracker處理集群上的執(zhí)行。

與Spark交互

使用Spark最簡單的方式就是使用交互式命令行提示符。打開PySpark終端,在命令行中打出pyspark。

~$ pyspark
[… snip …]
>>>

PySpark將會自動使用本地Spark配置創(chuàng)建一個SparkContext。你可以通過sc變量來訪問它。我們來創(chuàng)建第一個RDD。

>>> text = sc.textFile("shakespeare.txt")
>>> print text
shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

textFile方法將莎士比亞全部作品加載到一個RDD命名文本。如果查看了RDD,你就可以看出它是個MappedRDD,文件路徑是相對于當(dāng)前工作目錄的一個相對路徑(記得傳遞磁盤上正確的shakespear.txt文件路徑)。我們轉(zhuǎn)換下這個RDD,來進(jìn)行分布式計算的“hello world”:“字?jǐn)?shù)統(tǒng)計”。

>>> from operator import add
>>> def tokenize(text):
...     return text.split()
...
>>> words = text.flatMap(tokenize)
>>> print words
PythonRDD[2] at RDD at PythonRDD.scala:43

我們首先導(dǎo)入了add操作符,它是個命名函數(shù),可以作為加法的閉包來使用。我們稍后再使用這個函數(shù)。首先我們要做的是把文本拆分為單詞。我們創(chuàng)建了一個tokenize函數(shù),參數(shù)是文本片段,返回根據(jù)空格拆分的單詞列表。然后我們通過給flatMap操作符傳遞tokenize閉包對textRDD進(jìn)行變換創(chuàng)建了一個wordsRDD。你會發(fā)現(xiàn),words是個PythonRDD,但是執(zhí)行本應(yīng)該立即進(jìn)行。顯然,我們還沒有把整個莎士比亞數(shù)據(jù)集拆分為單詞列表。

如果你曾使用MapReduce做過Hadoop版的“字?jǐn)?shù)統(tǒng)計”,你應(yīng)該知道下一步是將每個單詞映射到一個鍵值對,其中鍵是單詞,值是1,然后使用reducer計算每個鍵的1總數(shù)。

首先,我們map一下。

>>> wc = words.map(lambda x: (x,1))
>>> print wc.toDebugString()
(2) PythonRDD[3] at RDD at PythonRDD.scala:43
|  shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
|  shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2

我使用了一個匿名函數(shù)(用了Python中的lambda關(guān)鍵字)而不是命名函數(shù)。這行代碼將會把lambda映射到每個單詞。因此,每個x都是一個單詞,每個單詞都會被匿名閉包轉(zhuǎn)換為元組(word, 1)。為了查看轉(zhuǎn)換關(guān)系,我們使用toDebugString方法來查看PipelinedRDD是怎么被轉(zhuǎn)換的??梢允褂胷educeByKey動作進(jìn)行字?jǐn)?shù)統(tǒng)計,然后把統(tǒng)計結(jié)果寫到磁盤。

>>> counts = wc.reduceByKey(add)
>>> counts.saveAsTextFile("wc")

一旦我們最終調(diào)用了saveAsTextFile動作,這個分布式作業(yè)就開始執(zhí)行了,在作業(yè)“跨集群地”(或者你本機(jī)的很多進(jìn)程)運(yùn)行時,你應(yīng)該可以看到很多INFO語句。如果退出解釋器,你可以看到當(dāng)前工作目錄下有個“wc”目錄。

$ ls wc/
_SUCCESS   part-00000 part-00001

每個part文件都代表你本機(jī)上的進(jìn)程計算得到的被保持到磁盤上的最終RDD。如果對一個part文件進(jìn)行head命令,你應(yīng)該能看到字?jǐn)?shù)統(tǒng)計元組。

$ head wc/part-00000
(u'fawn', 14)
(u'Fame.', 1)
(u'Fame,', 2)
(u'kinghenryviii@7731', 1)
(u'othello@36737', 1)
(u'loveslabourslost@51678', 1)
(u'1kinghenryiv@54228', 1)
(u'troilusandcressida@83747', 1)
(u'fleeces', 1)
(u'midsummersnightsdream@71681', 1)

注意這些鍵沒有像Hadoop一樣被排序(因為Hadoop中Map和Reduce任務(wù)中有個必要的打亂和排序階段)。但是,能保證每個單詞在所有文件中只出現(xiàn)一次,因為你使用了reduceByKey操作符。你還可以使用sort操作符確保在寫入到磁盤之前所有的鍵都被排過序。

編寫一個Spark應(yīng)用

編寫Spark應(yīng)用與通過交互式控制臺使用Spark類似。API是相同的。首先,你需要訪問<SparkContext,它已經(jīng)由<pyspark自動加載好了。

使用Spark編寫Spark應(yīng)用的一個基本模板如下:

## Spark Application - execute with spark-submit
 
## Imports
from pyspark import SparkConf, SparkContext
 
## Module Constants
APP_NAME = "My Spark Application"
 
## Closure Functions
 
## Main functionality
 
def main(sc):
    pass
 
if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    sc   = SparkContext(conf=conf)
 
    # Execute Main functionality
    main(sc)

這個模板列出了一個Spark應(yīng)用所需的東西:導(dǎo)入Python庫,模塊常量,用于調(diào)試和Spark UI的可識別的應(yīng)用名稱,還有作為驅(qū)動程序運(yùn)行的一些主要分析方法學(xué)。在ifmain中,我們創(chuàng)建了SparkContext,使用了配置好的context執(zhí)行main。我們可以簡單地導(dǎo)入驅(qū)動代碼到pyspark而不用執(zhí)行。注意這里Spark配置通過setMaster方法被硬編碼到SparkConf,一般你應(yīng)該允許這個值通過命令行來設(shè)置,所以你能看到這行做了占位符注釋。

使用<sc.stop()或<sys.exit(0)來關(guān)閉或退出程序。


## Spark Application - execute with spark-submit
 
## Imports
import csv
import matplotlib.pyplot as plt
 
from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext
 
## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"
 
fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
            'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight   = namedtuple('Flight', fields)
 
## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """
 
    row[0]  = datetime.strptime(row[0], DATE_FMT).date()
    row[5]  = datetime.strptime(row[5], TIME_FMT).time()
    row[6]  = float(row[6])
    row[7]  = datetime.strptime(row[7], TIME_FMT).time()
    row[8]  = float(row[8])
    row[9]  = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])
 
def split(line):
    """
    Operator function for splitting a line with csv module
    """
    reader = csv.reader(StringIO(line))
    return reader.next()
 
def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes  = [d[1] for d in delays]
    index    = list(xrange(len(airlines)))
 
    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)
 
    # Add the total minutes to the right
    for idx, air, min in zip(index, airlines, minutes):
        if min > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')
 
    # Set the ticks
    ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))
 
    # minimize chart junk
    plt.grid(axis = 'x', color ='white', linestyle='-')
 
    plt.title('Total Minutes Delayed per Airline')
    plt.show()
 
## Main functionality
def main(sc):
 
    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
 
    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)
 
    # Read the CSV Data into an RDD
    flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
 
    # Map the total delay to the airline (joined using the broadcast value)
    delays  = flights.map(lambda f: (airline_lookup.value[f.airline],
                                     add(f.dep_delay, f.arv_delay)))
 
    # Reduce the total delay for the month to the airline
    delays  = delays.reduceByKey(add).collect()
    delays  = sorted(delays, key=itemgetter(1))
 
    # Provide output from the driver
    for d in delays:
        print "%0.0f minutes delayed\t%s" % (d[1], d[0])
 
    # Show a bar chart of the delays
    plot(delays)
 
if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc   = SparkContext(conf=conf)
 
    # Execute Main functionality
    main(sc)

使用<spark-submit命令來運(yùn)行這段代碼(假設(shè)你已有ontime目錄,目錄中有兩個CSV文件):

~$ spark-submit app.py

這個Spark作業(yè)使用本機(jī)作為master,并搜索app.py同目錄下的ontime目錄下的2個CSV文件。最終結(jié)果顯示,4月的總延誤時間(單位分鐘),既有早點(diǎn)的(如果你從美國大陸飛往夏威夷或者阿拉斯加),但對大部分大型航空公司都是延誤的。注意,我們在app.py中使用matplotlib直接將結(jié)果可視化出來了:

這段代碼做了什么呢?我們特別注意下與Spark最直接相關(guān)的main函數(shù)。首先,我們加載CSV文件到RDD,然后把split函數(shù)映射給它。split函數(shù)使用csv模塊解析文本的每一行,并返回代表每行的元組。最后,我們將collect動作傳給RDD,這個動作把數(shù)據(jù)以Python列表的形式從RDD傳回驅(qū)動程序。本例中,airlines.csv是個小型的跳轉(zhuǎn)表(jump table),可以將航空公司代碼與全名對應(yīng)起來。我們將轉(zhuǎn)移表存儲為Python字典,然后使用sc.broadcast廣播給集群上的每個節(jié)點(diǎn)。

接著,main函數(shù)加載了數(shù)據(jù)量更大的flights.csv([譯者注]作者筆誤寫成fights.csv,此處更正)。拆分CSV行完成之后,我們將parse函數(shù)映射給CSV行,此函數(shù)會把日期和時間轉(zhuǎn)成Python的日期和時間,并對浮點(diǎn)數(shù)進(jìn)行合適的類型轉(zhuǎn)換。每行作為一個NamedTuple保存,名為Flight,以便高效簡便地使用。

有了Flight對象的RDD,我們映射一個匿名函數(shù),這個函數(shù)將RDD轉(zhuǎn)換為一些列的鍵值對,其中鍵是航空公司的名字,值是到達(dá)和出發(fā)的延誤時間總和。使用reduceByKey動作和add操作符可以得到每個航空公司的延誤時間總和,然后RDD被傳遞給驅(qū)動程序(數(shù)據(jù)中航空公司的數(shù)目相對較少)。最終延誤時間按照升序排列,輸出打印到了控制臺,并且使用matplotlib進(jìn)行了可視化。

這個例子稍長,但是希望能演示出集群和驅(qū)動程序之間的相互作用(發(fā)送數(shù)據(jù)進(jìn)行分析,結(jié)果取回給驅(qū)動程序),以及Python代碼在Spark應(yīng)用中的角色。

結(jié)論

盡管算不上一個完整的Spark入門,我們希望你能更好地了解Spark是什么,如何使用進(jìn)行快速、內(nèi)存分布式計算。至少,你應(yīng)該能將Spark運(yùn)行起來,并開始在本機(jī)或Amazon EC2上探索數(shù)據(jù)。你應(yīng)該可以配置好iPython notebook來運(yùn)行Spark。

Spark不能解決分布式存儲問題(通常Spark從HDFS中獲取數(shù)據(jù)),但是它為分布式計算提供了豐富的函數(shù)式編程API。這個框架建立在伸縮分布式數(shù)據(jù)集(RDD)之上。RDD是種編程抽象,代表被分區(qū)的對象集合,允許進(jìn)行分布式操作。RDD有容錯能力(可伸縮的部分),更重要的時,可以存儲到節(jié)點(diǎn)上的worker內(nèi)存里進(jìn)行立即重用。內(nèi)存存儲提供了快速和簡單表示的迭代算法,以及實(shí)時交互分析。

由于Spark庫提供了Python、Scale、Java編寫的API,以及內(nèi)建的機(jī)器學(xué)習(xí)、流數(shù)據(jù)、圖算法、類SQL查詢等模塊;Spark迅速成為當(dāng)今最重要的分布式計算框架之一。與YARN結(jié)合,Spark提供了增量,而不是替代已存在的Hadoop集群,它將成為未來大數(shù)據(jù)重要的一部分,為數(shù)據(jù)科學(xué)探索鋪設(shè)了一條康莊大道。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號