PySpark在DataFrame上的处理方式与Pandas的处理方法大致是类似的,笔者认为初学PySpark可以直接从用户文档中的pyspark.sql模块下的各个模块与方法开始看,一方面这块与Pandas的函数用法有很多相同的地方,另一方面这块有很多例子可以参考,相比于其他模块要形象得多,也可以辅助理解其他模块的用法。 如下图所示,pyspark.sql下的主要内容在Module Context与pyspark.sql.functions模块中,前者主要是如何创建数据表以及数据表相关操作,后者则是一些主要应用于数据列的函数(其实这部分只要搞清楚udf和pandas_udf就不用管其他的了,其他一大堆函数都是可以简单复现的,虽然在效率上可能不如内置的函数)👇 这几天窝在宿舍里把pyspark.sql模块的用户文档过了一遍,感觉是Python的既然已经有Pandas,为什么还要吃力不讨好的学个语法相近但是复杂不少的PySpark,大不了用Pandas做好再转成PySpark的数据格式不也完事了。目前笔者的认知是就pyspark.sql而言基本上与Pandas的功能相近,但是PySpark数据存储与处理上使用了分布式存储与并行式运算,相对于Pandas在数据处理效率与数据处理规模上是更加卓越的;此外PySpark可以处理更多类型的源数据文件(如Parquet格式的文件),更有利于与数据库对接,相对于Pandas的“小打小闹”肯定是现在有较大数据管理需求的企业更为青睐的方法。 笔者因为在看用户文档的过程中记录的点比较杂乱,在Notepad跟Notebook里都记了一些,为了方便记忆,本文将主要基于pyspark.sql模块中对DataFrame的处理,依据与Pandas中数据处理的区别为线索展开,提供一些常见报错的解决方案,大致做一个入门性质的整理工作。 使用PySpark做数据处理前,总是需要设置两个作为入口性质数据处理器对象:spark与sqlContext 👇 注意上面的变量spark与sc的创建顺序不可颠倒,否则会报错,原因是规定不允许通识运行两个SparkContext对象,在变量spark的创建使用的是getOrCreate方法,如果先创建变量spark就即默认Create,过程会创建SparkContext对象,再创建变量sc就会报错。反过来则后创建变量spark就即默认Get,即使用sc作为全局变量的配置,就不会报错了。 事实上如果同时创建两个sc变量,也会发生同样的错误,除非在两行之间插入sc1.stop()的语句来消除👇 以下为利用spark与sqlContext 生成DataFrame的一些例子,可能相对来说Example 6使用Pandas作为中介是相对容易的👇 输出结果如下所示👇 特别地,PySpark与Pandas在DataFrame数据格式上的转换如下所示,如果觉得PySpark麻烦,那就在Pandas中处理好再转过来就是了,虽然有点蠢[汗]👇 得到了用于数据处理的表,以下将DataFrame中的常见方法做一个汇总👇 这里主要可以发现筛选,去重、分类汇总等大部分方法跟Pandas的用法都是完全相同的,笔者就几个与Pandas中有显著区别的点做些注释: 这里举了一个比较蠢的例子,假设DataFrame类型的变量df2中只有字段TRANSLEADTIME,我们进行一个简单的多列运算(若TRANSLEADTIME为0,则生成新的一列为字符串类型的数据:“test 0 0”)如上所示👆,当然你可以写得更复杂,更有意义一些,重要的是因为需要使用JDK编译,与Java函数一样,需要用修饰符@来标注映射函数的返回值的数据类型;特别地,如上面代码中的注释部分所述因为笔者的Python, PySpark, JDK版本分别为3.6.1,2.4.5,9.0.4,似乎是在版本上有所不兼容,如果使用pandas_udf则会报错,但是udf仍然是可行的,根据用户文档所述,udf在简单的算术运算符构造的映射函数下是具有较高的效率,但如果映射函数复杂,pandas_udf更加适用。 特别地,无论是DataFrame还是自定义函数UDF,都可以在数据库中注册成数据表和函数,通过SQL语句进行调用(使用sqlContext.sql(“SELECT * FROM DF”)或spark.sql(“SELECT * FROM DF”),前提是DF已经被注册为表,返回结果仍为DataFrame)。以注册函数为例👇 输出结果如下所示👇 最后在DataFrame遍历上也与PySpark与Pandas也存在很大区别,具体如下所示👇 其中df_spark.collect()返回值类型为list,当中每个元素为pyspark.sql.Row类的对象。不过似乎也可以用df.toLocalIterator()得到一个生成器来遍历。 作为这部分的结束,笔者将列数据pyspark.sql.Column下的一些方法做一个汇总,基本与Pandas较为类似; 这个部分有很多有意义的函数,主要是对pyspark.sql.Column对象进行的函数操作,虽然只要能熟练运用pandas_udf与udf就可以了,但是这些自定义函数的编译是相当耗时间的(因为要转为Java的编译文件),能用一些现成的函数总归是方便的,很多方法依然与Pandas类似,比较常用的是lit函数用于生成一个单一值的Column,可以用于向DataFrame添加新的默认值的列,此外collect_set与collect_list基本上对应了Pandas中的df.col.unique()与df.col.tolist(),总之PySpark的数据由于存储结构的问题在处理上不如Pandas便捷(也可能是因为使用经验少,不够熟练),总之还是熟能生巧吧。 另外PySpark在ml与mllib模块中提供了大量的统计分析与机器学习的接口,不过笔者可能认为会scipy,sklearn差不多也够了,这里举一个SVM的简单使用示例,现在感觉sklearn很长时间不用也不知道怎么用了。 溜了,感觉PySpark还是需要在实践中才能学好的,大致入个门也就差不多,近期准备做个有趣点的爬虫,如果能做好的话感觉可以快乐一阵子。 学习,共同进步!序言
DataFrame创建及常规处理方法
from pyspark import SparkContext from pyspark.sql import SparkSession, SQLContext sc = SparkContext("local","$mytest") spark = SparkSession.builder.master("local").appName("$mytest").getOrCreate() sqlContext = SQLContext(sc)
sc1 = SparkContext("local","$mytest1") sc2 = SparkContext("local","$mytest2") # ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=$mytest1, master=local)
# CreateDataFrame方法示例 # Example 1 从tuple列表中生成DataFrame print("========= Example 1 ==========") l = [('Alice', 1)] print(spark.createDataFrame(l).collect()) print(spark.createDataFrame(l,['name','age']).collect()) # Example 2 从dict列表中生成DataFrame(最熟悉的) print("========= Example 2 ==========") d = [{'name': 'Alice', 'age': 1}] print(spark.createDataFrame(d).collect()) # Example 3 从rdd生成DataFrame print("========= Example 3 ==========") rdd = sc.parallelize(l) # 利用包含多个tuple列表 print(spark.createDataFrame(rdd).collect()) df = spark.createDataFrame(rdd,['name', 'age']) print(df.collect()) # Example 4 利用Row对象确定字段名 print("========= Example 4 ==========") from pyspark.sql import Row Person = Row('name', 'age') person = rdd.map(lambda r: Person(*r)) df2 = sqlContext.createDataFrame(person) print(df2.collect()) # Example 5 结构体/SQL字典式生成dataFrame print("========= Example 5 ==========") from pyspark.sql.types import * schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True)]) df3 = sqlContext.createDataFrame(rdd, schema) print(df3.collect()) # Example 6 利用pandas的结果导入 print("========= Example 6 ==========") print(sqlContext.createDataFrame(df.toPandas()).collect()) print(sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect()) # Example 7 print("========= Example 7 ==========") print(sqlContext.createDataFrame(rdd, "a: string, b: int").collect()) # 确定 rdd = rdd.map(lambda row: row[1]) print(sqlContext.createDataFrame(rdd, "int").collect()) # 取int的结果输出 #print(sqlContext.createDataFrame(rdd, "boolean").collect()) # 会报错: 因为没有布尔型的结果
========= Example 1 ========== [Row(_1='Alice', _2=1)] [Row(name='Alice', age=1)] ========= Example 2 ========== E:Pythonlibsite-packagespyspark-2.4.5-py3.6.eggpysparksqlsession.py:346: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead warnings.warn("inferring schema from dict is deprecated," [Row(age=1, name='Alice')] ========= Example 3 ========== [Row(_1='Alice', _2=1)] [Row(name='Alice', age=1)] ========= Example 4 ========== [Row(name='Alice', age=1)] ========= Example 5 ========== [Row(name='Alice', age=1)] ========= Example 6 ========== [Row(name='Alice', age=1)] [Row(0=1, 1=2)] ========= Example 7 ========== [Row(a='Alice', b=1)] [Row(value=1)]
df_pandas = df_spark.toPandas() df_spark = sqlContext.createDataFrame(df_pandas)
1. dataframe常规操作 - people = spark.read.parquet("...") # 读入一张表 - ageCol = people.age # 取列 - 经典的多表查询 + people = spark.read.parquet("...") + department = spark.read.parquet("...") + people.filter(people.age > 30). join(department, people.deptId == department.id). # 表的连接 groupBy(department.name, "gender"). # group方法 agg({"salary": "avg", "age": "max"}) # agg方法 - 求mean max min 等 + df.agg({"age": "max"}).collect() # [Row(max(age)=5)] + from pyspark.sql import functions as F df.agg(F.min(df.age)).collect() # [Row(min(age)=2)] - alias(alias)方法: 表重命名(表内连接) + from pyspark.sql.functions import * + df_as1 = df.alias("df_as1") + df_as2 = df.alias("df_as2") + joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner') # 表的连接(内连接) + joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect() - df.collect()方法是返回df中每个ROW的列表 - df.columns属性 同df.columns - df.count() 行数 - df.corr(列名1,列名2,method="pearson") 计算相关系数 - df.cov(列名1,列名2) 计算协方差 - df.createGlobalTempView("people") # 创建全局视图people - df.createOrReplaceGlobalTempView("people") # 创建/修正视图 - df.createTempView(name)[source] # 创建临时视图 - df.createOrReplaceTempView("people") # 同上 - df.filter(df.id=="3") 筛选 完全可以写成df[df.id==3] - df.join() - df.select(*cols) # 筛选出多列 - df.crossJoin(df2) # 全连接(一般用不到) - df.describe(*cols) - df.distinct.count() 不重复计数 - df.drop("age")或df.drop(df.age) 删除一列 - df.dropDuplicates()或df.dropDuplicates(["col1","col2"]) - df.drop_duplicates 同上 - df.dropna(how='any', thresh=None, subset=None)[source] - df.dtypes 类型 - df.exceptAll(df1) 保留在df中但不在df2中的字段(保留重复) - df.explain(extended=False) - df.fillna(value,subset=None) + df4.na.fill({'age': 50, 'name': 'unknown'}).show() - df.first()第一行 - df.foreach(f) 同df.rdd.foreach() 将f应用到每一行(如f=lambda x: x.name) - df.foreachPartition(f) # def f(people): for p in people: print(person.name) - df.freqItems(["col1","col2"]) 频繁项集 - df.groupBy(*cols) # 这个结果跟pandas的用法有点区别, 可以看下面GroupedData类的情况 - df.groupby(*cols) - df.head(10) - df.intersect(df1) # 去df与df2的交集 SQL的INTERSECT - df.intersectAll(other) # 同上保留重复 - df.limit(1) 类似head - df.orderBy(*cols,**kwargs) + df.sort(df.age.desc()) + df.sort("age", ascending=False) + df.orderBy(df.age.desc()) + df.sort(asc("age")) + df.orderBy(desc("age"), "name") + df.orderBy(["age", "name"], ascending=[0, 1]) # 常用类似pandas - df.rdd 转为pyspark.RDD of Row - df.registerTempTable(name) 同上view - df.replace(to_replace,value=<no value>,subset=None) - df.rollup(*cols) - df = spark.range(10) df.sample(withReplacement=True, fraction=0.5, seed=3) - df.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0) - df.schema - df.selectExpr(*expr) + df.selectExpr("age * 2", "abs(age)") - df.show(n=20,truncate=True,vertical=False) - df.sort() 参数用法同orderBy - df.subtract(df1) df - df1 集合减法 - df.union(df1) / df.unionAll(df1) / df.unionByName(df1) 集合并 - df.summary() - df.take() 同head limit - df.toDF(*cols) 转为新的df - df.toJSON(use_unicode=True) 转为新的JSON - df.toLocalIterator() 转为新的生成器 - df.toPandas() 转pandas的df - df.where() 等价于filter - df.withColumn(colName,col) + 如df.withColumn('age2', df.age + 2) 生成新的列age2 - df.withColumnRenamed("age","age2") 重命名列 - df.withWatermark(eventTime, delayThreshold)
from pyspark.sql.functions import udf, PandasUDFType df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v")) @udf("id long, v double", PandasUDFType.GROUPED_MAP) def normalize(pdf): v = pdf.v return pdf.assign(v=(v - v.mean()) / v.std()) df.groupby("id").apply(normalize).show() # 输出结果: 对于每个id对应的一些数据行,将它们的v字段进行归一标准化(均0方1)
from pyspark.sql.functions import col, pandas_udf, udf from pyspark.sql.types import LongType, StringType @udf('string') # 这里的udf可以改写成pandas_udf, 同样可以出结果, 但是会发现之后res无法collect,show, 报错为Py4J Error: collectToPython 之类的, 初步查明是jdk python pyspark之间版本不兼容的问题, 但是改写成udf相对可行 def multiply_func(a,b): return "test {} {}".format(a,b) res = df2.select(multiply_func(col("TRANSLEADTIME"),col("TRANSLEADTIME")))
# udf.register方法示例 # Example 1 当f为python函数 print("========= Example 1 ==========") # 1.1 strlen = spark.udf.register("stringLengthString", lambda x: len(x)) print(spark.sql("SELECT stringLengthString('test')").collect()) print(spark.sql("SELECT 'foo' AS text").select(strlen("text")).collect()) # 1.2 _ = spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType()) print(spark.sql("SELECT stringLengthInt('test')").collect()) # 1.3 _ = spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType()) print(spark.sql("SELECT stringLengthInt('test')").collect()) # Example 2 当f为自定义函数 print("========= Example 2 ==========") ## 2.1 slen = udf(lambda s: len(s), IntegerType()) _ = spark.udf.register("slen", slen) print(spark.sql("SELECT slen('test')").collect()) ## 2.2 random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic() new_random_udf = spark.udf.register("random_udf", random_udf) print(spark.sql("SELECT random_udf()").collect()) ## 2.3 pandas_udf示例 pandas_udf("integer", PandasUDFType.SCALAR) def add_one(x): return x + 1 _ = spark.udf.register("add_one", add_one) print(spark.sql("SELECT add_one(id) FROM range(3)").collect()) ## 2.4 pandas_udf("integer", PandasUDFType.GROUPED_AGG) def sum_udf(v): return v.sum() _ = spark.udf.register("sum_udf", sum_udf) q = "SELECT sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2" #print(spark.sql(q).collect()) # 会报错: 应该会输出[Row(sum_udf(v1)=1), Row(sum_udf(v1)=5)]
========= Example 1 ========== [Row(stringLengthString(test)='4')] [Row(stringLengthString(text)='3')] [Row(stringLengthInt(test)=4)] [Row(stringLengthInt(test)=4)] ========= Example 2 ========== [Row(slen(test)=4)] [Row(random_udf()=44)] [Row(add_one(id)='1'), Row(add_one(id)='2'), Row(add_one(id)='3')]
# pandas遍历dataframe for i in range(df_pandas.shape[0]): print(df_pandas.loc[i,"colName"]) # pyspark遍历dataframe for row in df_spark.collect(): print(row.colName)
5. 数据列pyspark.sql.Column用法 - df.colName 或者 df["colName"] - df.colName + 1 或者 1/df.colName - 下面用col代表df.colName: - col.asc() 升序 用于排序 + df.orderBy(df.name.asc()) + 派生: asc_nulls_first() asc_nulls_last() desc() desc_nulls_first() desc_nulls_last() - col.astype(dataType) 等价于cast(可以是StringType(), 也可以写成"string") - col.between(lowerBound, upperBound) 用于筛选 - col.bitwiseAND(col1) 做AND运算 - col.bitwiseOR(col1) - col.bitwiseXOR(col1) - col.contains(other) 字符串型的列包含某个子串的的 + 派生: endswith(other) - isNotNull() isNull() 是否为空值 可用于filter - isin(列表) 用法完全与 - df.name.substr(1,3) 字符串切片 - 条件语句when/otherwise df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
非常重要的pyspark.sql.functions模块
6. pyspark.sql.functions 模块下的诸多方法: 虽然用udf基本上可以解决大部分的问题, 但一些内置函数总归可以有好处的 - 这些函数都有一个通用的方法!!!!!!!!!!!!!!!!! + df.select(FUNCTION(...).alias("result")).collect() - abs(col) - approx_count_distinct(col) <==> 这个貌似跟理解的也不太一样 + df.agg(approx_count_distinct(df.age).alias('distinct_ages')).collect() - array(*cols) 返回列表(跟collect()的效果差不多) - array_contains(col,value) 返回包含某个value的列单元(String) - array_distinct(col) 好像跟我理解的不太一样 - array_except(col1,col2) - array_intersect(col1,col2) - lit(value) + df.withColumn('new_col', lit(1)) 创建新的一列默认值全是1的数据 - asc asc_null_first 同上面的df.col的方法 - ascii(col) - atan(col) - atan2(col1,col2) - concat(*cols) # 非列名 - col(colName) 选定一个列(见apply的用法) - collect_list(colName) - collect_set(colName) + df2 = spark.createDataFrame([(2,), (5,), (5,)], ('age',)) + df2.agg(collect_list('age')).collect() # [Row(collect_list(age)=[2, 5, 5])] + df2.agg(collect_set('age')).collect() # [Row(collect_set(age)=[5, 2])] - avg(col) - base64(col) - bin(col) 转为二进制 - count(col) - date_sub(df.date,1) 前一天 date_add(df.date,1) 后一天 - date_format('date','MM/dd/yyy') 将日期格式的转为字符串 - date_diff(df.d1,df.d2) # 两个日期的差距 - dayofmonth(col) - dayofweek(col) - dayofyear(col) - weekofyear(col) - decode(col) - expr(str) # 很重要的一个函数 + df.select(expr("length(name)")).collect() - factorial(col) - flatten(col) 如果col中每个元素是一个多维列表, 化为一维的列表 - format_string('%d %s', df.a, df.b)) - least/greatest(df.a,df.b,df.c) # 取每行abc字段最小/大的那个 - year month hour minute second等时间的函数 - isnan(col) 与 isnull(col) 缺失与空值 - instr(df.s,substring="ly") - length(col) 字符串型的 - levenshtein(df.a,df.b) - locate(substr="b",df.s,pos=1) # 确定子串的位置 - log(col) - lower(col) - max min mean m - df.groupby("id").agg(mean_udf(df['v'])).show() - df.groupby("id").apply(normalize).show() 分组后分别标准归一化 - round(col,scale=0) 默认保留0位小数4 - substring(col,pos,len) 取子串 - to_date(col,format) 完全与pandas.to_date相同 - trim(col) 去除空格 - udf(f=None,returnType=StringType) + random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
# -*- coding: UTF-8 -*- import pandas from pyspark import SparkContext from pyspark.sql import SparkSession, SQLContext from pyspark.sql import Row from pyspark.ml.classification import LinearSVC, LinearSVCModel from pyspark.ml.linalg import Vectors sc = SparkContext("local","$mytest") spark = SparkSession.builder.master("local").appName("$mytest").getOrCreate() sqlContext = SQLContext(sc) df = sc.parallelize([ Row(label=1.0, features=Vectors.dense(1.0, 1.0, 1.0)), Row(label=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF() svm = LinearSVC(maxIter=5,regParam=0.01) model = svm.fit(df) print(model.coefficients) # DenseVector([0.0, -0.2792, -0.1833]) print(model.intercept) # 1.0206118982229047 print(model.numClasses) # 2 print(model.numFeatures) # 3 test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, -1.0, -1.0))]).toDF() result = model.transform(test0).head() print(result.prediction) # 1.0 print(result.rawPrediction) # DenseVector([-1.4831, 1.4831]) svm_path = "svm" svm.save(svm_path) svm2 = LinearSVC.load(svm_path) print(svm2.getMaxIter()) # 5 model_path = "svm_model" model.save(model_path) model2 = LinearSVCModel.load(model_path) print(model.coefficients[0] == model2.coefficients[0]) # True print(model.intercept == model2.intercept) # True
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算