pyspark创建DataFrame怎么实现,如何配置DataFrame和临时表
Admin 2022-07-21 群英技术资讯 1041 次浏览
本篇内容介绍了“pyspark创建DataFrame怎么实现,如何配置DataFrame和临时表”的有关知识,在实际项目的操作过程或是学习过程中,不少人都会遇到这样的问题,接下来就让小编带大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!为了便于操作,使用pyspark时我们通常将数据转为DataFrame的形式来完成清洗和分析动作。
在上一篇pyspark基本操作有提到RDD也是spark中的操作的分布式数据对象。
这里简单看一下RDD和DataFrame的类型。
print(type(rdd)) # <class 'pyspark.rdd.RDD'> print(type(df)) # <class 'pyspark.sql.dataframe.DataFrame'>
翻阅了一下源码的定义,可以看到他们之间并没有继承关系。
class RDD(object):
"""
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
Represents an immutable, partitioned collection of elements that can be
operated on in parallel.
"""
class DataFrame(object):
"""A distributed collection of data grouped into named columns.
A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
and can be created using various functions in :class:`SparkSession`::
...
"""
RDD是一种弹性分布式数据集,Spark中的基本抽象。表示一种不可变的、分区储存的集合,可以进行并行操作。
DataFrame是一种以列对数据进行分组表达的分布式集合, DataFrame等同于Spark SQL中的关系表。相同点是,他们都是为了支持分布式计算而设计。
但是RDD只是元素的集合,但是DataFrame以列进行分组,类似于MySQL的表或pandas中的DataFrame。

实际工作中,我们用的更多的还是DataFrame。
尝试第一种情形发现,仅仅传入二元组,结果是没有列名称的。
于是我们尝试第二种,同时传入二元组和列名称。
a = [('Alice', 1)]
output = spark.createDataFrame(a).collect()
print(output)
# [Row(_1='Alice', _2=1)]
output = spark.createDataFrame(a, ['name', 'age']).collect()
print(output)
# [Row(name='Alice', age=1)]
这里collect()是按行展示数据表,也可以使用show()对数据表进行展示。
spark.createDataFrame(a).show() # +-----+---+ # | _1| _2| # +-----+---+ # |Alice| 1| # +-----+---+ spark.createDataFrame(a, ['name', 'age']).show() # +-----+---+ # | name|age| # +-----+---+ # |Alice| 1| # +-----+---+
d = [{'name': 'Alice', 'age': 1}]
output = spark.createDataFrame(d).collect()
print(output)
# [Row(age=1, name='Alice')]
a = [('Alice', 1)]
rdd = sc.parallelize(a)
output = spark.createDataFrame(rdd).collect()
print(output)
output = spark.createDataFrame(rdd, ["name", "age"]).collect()
print(output)
# [Row(_1='Alice', _2=1)]
# [Row(name='Alice', age=1)]
from pyspark.sql import Row
a = [('Alice', 1)]
rdd = sc.parallelize(a)
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
output = spark.createDataFrame(person).collect()
print(output)
# [Row(name='Alice', age=1)]
from pyspark.sql.types import *
a = [('Alice', 1)]
rdd = sc.parallelize(a)
schema = StructType(
[
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
]
)
output = spark.createDataFrame(rdd, schema).collect()
print(output)
# [Row(name='Alice', age=1)]
df.toPandas()可以把pyspark DataFrame转换为pandas DataFrame。
df = spark.createDataFrame(rdd, ['name', 'age']) print(df) # DataFrame[name: string, age: bigint] print(type(df.toPandas())) # <class 'pandas.core.frame.DataFrame'> # 传入pandas DataFrame output = spark.createDataFrame(df.toPandas()).collect() print(output) # [Row(name='Alice', age=1)]
output = spark.range(1, 7, 2).collect() print(output) # [Row(id=1), Row(id=3), Row(id=5)] output = spark.range(3).collect() print(output) # [Row(id=0), Row(id=1), Row(id=2)]
通过临时表得到DataFrame
spark.registerDataFrameAsTable(df, "table1")
df2 = spark.table("table1")
b = df.collect() == df2.collect()
print(b)
# True
在createDataFrame中可以指定列类型,只保留满足数据类型的列,如果没有满足的列,会抛出错误。
a = [('Alice', 1)]
rdd = sc.parallelize(a)
# 指定类型于预期数据对应时,正常创建
output = spark.createDataFrame(rdd, "a: string, b: int").collect()
print(output) # [Row(a='Alice', b=1)]
rdd = rdd.map(lambda row: row[1])
print(rdd) # PythonRDD[7] at RDD at PythonRDD.scala:53
# 只有int类型对应上,过滤掉其他列。
output = spark.createDataFrame(rdd, "int").collect()
print(output) # [Row(value=1)]
# 没有列能对应上,会抛出错误。
output = spark.createDataFrame(rdd, "boolean").collect()
# TypeError: field value: BooleanType can not accept object 1 in type <class 'int'>
spark.registerDataFrameAsTable(df, "table1")
spark.dropTempTable("table1")
print(spark.getConf("spark.sql.shuffle.partitions")) # 200
print(spark.getConf("spark.sql.shuffle.partitions", u"10")) # 10
print(spark.setConf("spark.sql.shuffle.partitions", u"50")) # None
print(spark.getConf("spark.sql.shuffle.partitions", u"10")) # 50
spark.registerFunction("stringLengthString", lambda x: len(x))
output = spark.sql("SELECT stringLengthString('test')").collect()
print(output)
# [Row(stringLengthString(test)='4')]
spark.registerFunction("stringLengthString", lambda x: len(x), IntegerType())
output = spark.sql("SELECT stringLengthString('test')").collect()
print(output)
# [Row(stringLengthString(test)=4)]
spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
output = spark.sql("SELECT stringLengthInt('test')").collect()
print(output)
# [Row(stringLengthInt(test)=4)]
可以查看所有临时表名称和对象。
spark.registerDataFrameAsTable(df, "table1")
print(spark.tableNames()) # ['table1']
print(spark.tables()) # DataFrame[database: string, tableName: string, isTemporary: boolean]
print("table1" in spark.tableNames()) # True
print("table1" in spark.tableNames("default")) # True
spark.registerDataFrameAsTable(df, "table1")
df2 = spark.tables()
df2.filter("tableName = 'table1'").first()
print(df2) # DataFrame[database: string, tableName: string, isTemporary: boolean]
前提是需要下载jar包。
Mysql-connector-java.jar
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
sc = SparkContext("local", appName="mysqltest")
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(
url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"
"useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"
"useLegacyDatetimeCode=false&serverTimezone=UTC ", dbtable="detail_data").load()
df.show(n=5)
sc.stop()
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
猜你喜欢
Python内置函数-reversed函数。reversed 函数返回一个反转的迭代器。
Radiobutton组件用于实现多选一的问题,本文主要介绍了Tkinter组件实现Radiobutton的示例,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
Pandas是Python语言的一个扩展程序库,提供高性能、易于使用的数据结构和数据分析工具,下面这篇文章主要给大家介绍了关于如何使用pandas对超大csv文件进行快速拆分的相关资料,需要的朋友可以参考下
要求给定指定的行、列以及对应的工作表作为参数,能够正确解析合并单元格,获取指定单元格的值。如果直接根据行列获取对应单元格的值,则合并单元格非左上角的其他单元格都会获取到None值。
这篇文章主要为大家介绍了Python内建属性getattribute拦截器使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
成为群英会员,开启智能安全云计算之旅
立即注册Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2020 群英 版权所有
增值电信经营许可证 : B1.B2-20140078 粤ICP备09006778号 域名注册商资质 粤 D3.1-20240008