上传数据文件到HDFS上:hadoop fs -put /root/person.txt / 创建RDD: 3.接下来就可以使用DataFrame的函数操作 SparkSQL提供了一个领域特定语言(DSL)以方便操作结构化数据 DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql() 来执行SQL查询,结果将作为一个DataFrame返回
入口-SparkSession
SQLContext是创建DataFrame和执行SQL的入口
HiveContext通过hive sql语句操作hive表数据,兼容hive操作,hiveContext继承自SQLContext。
SparkSession 封装了SqlContext及HiveContext所有功能。通过SparkSession还可以获取到SparkConetxt。
SparkSession可以执行SparkSQL也可以执行HiveSQL.
创建DataFrame
1. 读取文本文件
vim /root/person.txt 1 zhangsan 20 2 lisi 29 3 wangwu 25 4 zhaoliu 30 5 tianqi 35 6 kobe 40
2. 在spark shell执行下面命令,读取数据,将每一行的数据使用列分隔符分割
打开spark-shell/export/servers/spark/bin/spark-shell
val lineRDD= sc.textFile("hdfs://node01:8020/person.txt").map(_.split(" ")) //RDD[Array[String]]
case class Person(id:Int, name:String, age:Int)
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person]
val personDF = personRDD.toDF //DataFrame
personDF.show +---+--------+---+ | id| name|age| +---+--------+---+ | 1|zhangsan| 20| | 2| lisi| 29| | 3| wangwu| 25| | 4| zhaoliu| 30| | 5| tianqi| 35| | 6| kobe| 40| +---+--------+---+ personDF.printSchema
personDF.createOrReplaceTempView("t_person")
spark.sql("select id,name from t_person where id > 3").show
val dataFrame=spark.read.text("hdfs://node01:8020/person.txt") dataFrame.show //注意:直接读取的文本文件没有完整schema信息 dataFrame.printSchema
2. 读取json文件
//使用spark安装包下的json文件 PATH:more/export/servers/spark/examples/src/main/resources/people.json {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
val jsonDF= spark.read.json("file:///export/servers/spark/examples/src/main/resources/people.json")
//注意:直接读取json文件有schema信息,因为json文件本身含有Schema信息,SparkSQL可以自动解析 jsonDF.show
3. 读取parquet文件
使用spark安装包下的parquet文件more /export/servers/spark/examples/src/main/resources/users.parquet
val parquetDF=spark.read.parquet("file:///export/servers/spark/examples/src/main/resources/users.parquet")
//注意:直接读取parquet文件有schema信息,因为parquet文件中保存了列的信息 parquetDF.show
创建DataSet
1. 通过spark.createDataset创建Dataset
val fileRdd = sc.textFile("hdfs://node01:8020/person.txt") //RDD[String] val ds1 = spark.createDataset(fileRdd) //DataSet[String] ds1.show
2. 通RDD.toDS方法生成DataSet
case class Person(name:String, age:Int) val data = List(Person("zhangsan",20),Person("lisi",30)) //List[Person] val dataRDD = sc.makeRDD(data) val ds2 = dataRDD.toDS //Dataset[Person] ds2.show
3. 通过DataFrame.as[泛型]转化生成DataSet
case class Person(name:String, age:Long) val jsonDF= spark.read.json("file:///export/servers/spark/examples/src/main/resources/people.json") val jsonDS = jsonDF.as[Person] //DataSet[Person] jsonDS.show
4. DataSet也可以注册成表进行查询
jsonDS.createOrReplaceTempView("t_person") spark.sql("select * from t_person").show
两种查询风格
准备工作
//读取文件并转换为DataFrame或DataSet val lineRDD= sc.textFile("hdfs://node01:8020/person.txt").map(_.split(" ")) case class Person(id:Int, name:String, age:Int) val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) val personDF = personRDD.toDF personDF.show //val personDS = personRDD.toDS //personDS.show
DSL风格
personDF.select(personDF.col("name")).show personDF.select(personDF("name")).show personDF.select(col("name")).show personDF.select("name").show
personDF.select("name", "age").show
personDF.select(personDF.col("name"), personDF.col("age") + 1).show personDF.select(personDF("name"), personDF("age") + 1).show personDF.select(col("name"), col("age") + 1).show personDF.select("name","age").show //personDF.select("name", "age"+1).show personDF.select($"name",$"age",$"age"+1).show
personDF.filter(col("age") >= 25).show personDF.filter($"age" >25).show
personDF.filter(col("age")>30).count() personDF.filter($"age" >30).count()
personDF.groupBy("age").count().show
SQL风格
如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:personDF.createOrReplaceTempView("t_person") spark.sql("select * from t_person").show
spark.sql("desc t_person").show
spark.sql("select * from t_person order by age desc limit 2").show
spark.sql("select * from t_person where age > 30 ").show
spark.sql("select name, age + 1 from t_person").show spark.sql("select name, age from t_person where age > 25").show spark.sql("select count(age) from t_person where age > 30").show spark.sql("select age, count(age) from t_person group by age").show
总结
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算