Spark是一款非常流行同时功能又十分强大的实时数据分析工具。在本 Spark序列教程的第一部分,我们已经对Spark进行了介绍,讲解了Spark的历史,详细解释了用于在Spark集群中进行数据分片存储的弹性分布式数据集( RDDs)并对Apache Spark的生态系统进行了介绍。 本教程(第二部分)将对Spark生态系统中占有重要地位的Spark SQL和DataFrame进行介绍,给大家演示Spark、Spark SQL结合Cassandara的使用。如果你还没有学习过本序列教程的第一部分,请点击 Apache Spark介绍(第一部分):实时数据分析进行学习。 Apache Spark是 Hadoop的有效继任者并对其进行了有效补充,它引领了大数据技术的发展趋势。Spark为数据分析运行在大规模分布式系统任务上提供了易于使用的API,它能够比其它形式的数据分析运行得更快,这缘于其大多数的任务都能够在内存中完成。Apache Spark为一个普通的开发人员提供了实时大数据分析能力, Spark SQL便是明证,Spark SQL API不仅易于使用而且功能强大。
Spark SQL使得运行SQL和HiveQL查询十分简单(注意: HiveQL源于Apache Hive,Hive是一个构建在Hadoop平台基础上的数据仓库系统,用于提供大数据分析服务)。Spark SQL 能够轻易地定位相应的表和元数据。Spark SQL 为Spark提供了查询结构化数据的能力,查询时既可以使用SQL也可以使用人们熟知的DataFrame API(RDD)。Spark SQL支持多语言编程包括Java、Scala、Python及R,开发人员可以根据自身喜好进行选择。
使用Java 查询数据
String query = "SELECT * FROM table"; ResultSet results = session.execute(query);
DataFrame是Spark SQL的核心,它将数据保存为行构成的集合,行对应列有相应的列名。使用DataFrames可以非常方便地查询数据、给数据绘图及进行数据过滤。
DataFrames也可以用于数据的输入与输出,例如利用Spark SQL中的DataFrames,可以轻易地将下列数据格式加载为表并进行相应的查询操作:
数据一旦被读取,借助于DataFrames便可以很方便地进行数据过滤、列查询、计数、求平均值及将不同数据源的数据进行整合。
如果你正计划通过读取和写数据来进行分析,Spark SQL可以轻易地帮你实现并将整个过程自动化。
在后面的例子中,我们将在Python Spark shell中给大家演示如何使用Spark SQL和DataFrames。从GitHub上获取提交的 QBit, the Java Microservices Lib历史数据,然后将其加载到Spark当中,并对数据进行相应的操作,具体步骤如下:
在终端上启动Python Spark shell:
cd spark-1.5.0-bin-hadoop2.4 ./bin/pyspark 15/08/22 22:30:40 INFO BlockManagerMaster: Registered BlockManager Welcome to ____ __ / __/__ ___ _____/ /__ _/ // _ // _ `/ __/ '_/ /__ / .__//_,_/_/ /_//_/ version 1.5.0 /_/ Using Python version 2.7.5 (default, Mar 9 2014 22:15:05) SparkContext available as sc, HiveContext available as sqlContext.
从github上获取QBit的提交历史,并保存到名称为test.log的文件中:
抽取提交历史并保存为log文件
git log > test.log
由于此次使用的是Python,我们先通过textFile方法将test.log加载为RDD,然后在该RDD上执行一些操作:
textFile = sc.textFile("../qbit/test.log")
执行完上面这条语句,可以得到一个textFile RDD,该RDD由文本行组成的分区数据构成,先来统计一个RDD中的文本行数:
textFile.count() 5776
代码执行完,得到的行数为5776。然后我们先行中带有commit关键字的行筛选出来:
linesWithCommit = textFile.filter(lambda line: "commit" in line)
通过前面的操作足以说明通过Python 使用RDD 的简便性。
为后面演示DataFrame的使用,先让github的历史记录文件抽取保存为JSON类型并将文件命名为sparktest.json:
将github上的提交历史保存为JSON
git log --pretty=format:'{"commit":"%H","author":"%an","author_email":"%ae","date":"%ad","message":"%f"}' > sparktest.json
在正式进行Spark SQL操作之前,先得创建sqlContext,它可以通过SparkContext进行创建:
from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
在shell命令行中,sqlContext 同SparkContext 一样都是自动创建的,无需自己手动去创建,SparkContext以SC变量名的形式存在,sqlContext则以sqlContext 变量名的形式存在。
接下来,将JSON数据加载为Spark的DataFrame,变量命名为dataframe:
将JSON数据加载成DataFrame ,变量命名为dataframe
dataframe = sqlContext.load("../qbit/sparktest.json", "json")
加载数据时,只需调用sqlContext 的load()方法,方法中传入的参数为文件目录和文件类型。Spark会为dataframe解析所有的列及对应名称,为确保所有的工作都已按预期执行,可以打印出dataframe的模式(Schema):
打印dataframe的模式(Schema)
dataframe.printSchema() root |-- author: string (nullable = true) |-- author_email: string (nullable = true) |-- commit: string (nullable = true) |-- date: string (nullable = true) |-- message: string (nullable = true)
上面这个带根(root)的图展示了各行对应的列名及其对应类型。本例中的每行表示的是Gihub上 QBit Microservices Lib项目对应的一次提交。所有的准备工作完成后,便可以在数据上进行相应的操作。
例如,我们可以获取文件的第一条提交记录,该提交记录表示的是github的最近一次提交。
获取最近的提交记录用以分析
cd spark-1.5.0-bin-hadoop2.4 ./bin/pyspark 15/08/22 22:30:40 INFO BlockManagerMaster: Registered BlockManager Welcome to ____ __ / __/__ ___ _____/ /__ _/ // _ // _ `/ __/ '_/ /__ / .__//_,_/_/ /_//_/ version 1.5.0 /_/ Using Python version 2.7.5 (default, Mar 9 2014 22:15:05) SparkContext available as sc, HiveContext available as sqlContext.
0
我们可以查询所有列中的某一列并显示其内容,例如,只查询 QBit Microservices Lib项目的作者(author)列并显示最近的20个源码贡献者,默认情况下Spark会返回最近的20条记录。
采用Spark SQL进行分析—查询author列并返回最近的20条记录
cd spark-1.5.0-bin-hadoop2.4 ./bin/pyspark 15/08/22 22:30:40 INFO BlockManagerMaster: Registered BlockManager Welcome to ____ __ / __/__ ___ _____/ /__ _/ // _ // _ `/ __/ '_/ /__ / .__//_,_/_/ /_//_/ version 1.5.0 /_/ Using Python version 2.7.5 (default, Mar 9 2014 22:15:05) SparkContext available as sc, HiveContext available as sqlContext.
1
当然,也可以设置show()函数的参数以返回需要的记录行数,这里只返回最近5个为 QBit Microservices Lib项目贡献过源码作者:
查询作者列并返回最近的5个贡献过源码的作者
cd spark-1.5.0-bin-hadoop2.4 ./bin/pyspark 15/08/22 22:30:40 INFO BlockManagerMaster: Registered BlockManager Welcome to ____ __ / __/__ ___ _____/ /__ _/ // _ // _ `/ __/ '_/ /__ / .__//_,_/_/ /_//_/ version 1.5.0 /_/ Using Python version 2.7.5 (default, Mar 9 2014 22:15:05) SparkContext available as sc, HiveContext available as sqlContext.
2
我们可以再好好想想,这里使用的是一些相对非结构化的数据,在这个案例中,我们抓取项目的git提交日志后,可以马上执行相应的查询。现在我们想象一下,如果要在成千上万的项目上执行同样的操作,所有这些项目构成的可能是一个大公司git库,另外经常需要对所有的数据进行分析,而不只是对其中一个项目数据进行分析的话,便可以使用Spark集群处理大量的非结构化数据。此时你便可以看到Spark作为一个实时数据分析平台的处理能力,它具有简单易用、可扩展且处理能力强的特点。
查询Date列并显示最近的20条提交日期记录:
cd spark-1.5.0-bin-hadoop2.4 ./bin/pyspark 15/08/22 22:30:40 INFO BlockManagerMaster: Registered BlockManager Welcome to ____ __ / __/__ ___ _____/ /__ _/ // _ // _ `/ __/ '_/ /__ / .__//_,_/_/ /_//_/ version 1.5.0 /_/ Using Python version 2.7.5 (default, Mar 9 2014 22:15:05) SparkContext available as sc, HiveContext available as sqlContext.
3
通过dataframe获取 QBit Microservices Lib已提交次数,计算dataframe的行数:
cd spark-1.5.0-bin-hadoop2.4 ./bin/pyspark 15/08/22 22:30:40 INFO BlockManagerMaster: Registered BlockManager Welcome to ____ __ / __/__ ___ _____/ /__ _/ // _ // _ `/ __/ '_/ /__ / .__//_,_/_/ /_//_/ version 1.5.0 /_/ Using Python version 2.7.5 (default, Mar 9 2014 22:15:05) SparkContext available as sc, HiveContext available as sqlContext.
4
914便是提交次数,该提交次数也可以从Github上看到。
我们也使用DataFrame的 filter函数进行提交次数统计,例如可以统计有多少提交是由Richard Hightower或Geoffrey Chandler完成的。
cd spark-1.5.0-bin-hadoop2.4 ./bin/pyspark 15/08/22 22:30:40 INFO BlockManagerMaster: Registered BlockManager Welcome to ____ __ / __/__ ___ _____/ /__ _/ // _ // _ `/ __/ '_/ /__ / .__//_,_/_/ /_//_/ version 1.5.0 /_/ Using Python version 2.7.5 (default, Mar 9 2014 22:15:05) SparkContext available as sc, HiveContext available as sqlContext.
5
Richard Hightower的提交次数是708。
cd spark-1.5.0-bin-hadoop2.4 ./bin/pyspark 15/08/22 22:30:40 INFO BlockManagerMaster: Registered BlockManager Welcome to ____ __ / __/__ ___ _____/ /__ _/ // _ // _ `/ __/ '_/ /__ / .__//_,_/_/ /_//_/ version 1.5.0 /_/ Using Python version 2.7.5 (default, Mar 9 2014 22:15:05) SparkContext available as sc, HiveContext available as sqlContext.
6
本网页所有文字内容由 imapbox邮箱云存储,邮箱网盘, iurlBox网页地址收藏管理器 下载并得到。
ImapBox 邮箱网盘 工具地址: https://www.imapbox.com/download/ImapBox.5.5.1_Build20141205_CHS_Bit32.exe
PC6下载站地址:PC6下载站分流下载
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox 网页视频 工具地址: https://www.imapbox.com/download/ImovieBox.5.1.6_Build20151120_CHS_Bit32.exe
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算