博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark大规模数据全排序_使用Spark DataFrames进行大规模数据科学
阅读量:2525 次
发布时间:2019-05-11

本文共 4481 字,大约阅读时间需要 14 分钟。

spark大规模数据全排序

当我们首次开源 ,我们旨在提供一种简单的API,以通用编程语言(Java,Python,Scala)进行分布式数据处理。 通过对分布式数据集合(RDD)进行功能转换,Spark启用了分布式数据处理。 这是一个功能强大的API,以前需要花费数千行代码来表达的任务可以减少到数十个。

随着Spark的不断发展,我们希望使大数据工程师以外的更多受众能够利用分布式处理的功能。 新的DataFrame API就是出于这个目标而创建的。 该API受R和Python(Pandas)中的数据框架的启发,但从头开始设计以支持现代大数据和数据科学应用程序。 作为对现有RDD API的扩展,DataFrames功能:

  • 能够将单个笔记本电脑上的千字节数据扩展到大型集群上的PB数据
  • 支持多种数据格式和存储系统
  • 通过最先进的优化和代码生成
  • 通过Spark与所有大数据工具和基础架构无缝集成
  • 适用于Python,Java,Scala和R的API(正在通过开发)

对于熟悉其他编程语言数据框架的新用户,此API应该使他们感到宾至如归。 对于现有的Spark用户,此扩展的API将使Spark易于编程,同时通过智能优化和代码生成来提高性能。

什么是数据框?

在Spark中,DataFrame是组织为命名列的分布式数据集合。 从概念上讲,它等效于关系数据库中的表或R / Python中的数据框,但是在后台进行了更丰富的优化。 可以从多种来源构造DataFrame,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。

以下示例显示了如何在Python中构造DataFrame。 Scala和Java中提供了类似的API。

# Constructs a DataFrame from the users table in Hive. users = context.table( users )

# from JSON files in S3 logs = context.load( s3n://path/to/data.json , json )

一个人如何使用DataFrames?

构建完成后,DataFrames将提供一种特定于域的语言,用于分布式数据操作。 这是一个使用DataFrames处理大量用户的人口统计数据的示例:

# Create a new DataFrame that contains "young users" only

young = users.filter(users.age < 21)

# Alternatively, using Pandas-like syntax

young = users[users.age < 21]

# Increment everybody's age by 1

young.select(young.name, young.age + 1)

# Count the number of young users by gender

young.groupBy( gender ).count()

# Join young users with another DataFrame called logs

young.join(logs, logs.userId == users.userId, left_outer )

您还可以使用Spark SQL在使用DataFrames时合并SQL。 本示例计算年轻DataFrame中的用户数。

young.registerTempTable( young )

context.sql( SELECT count(*) FROM young )

在Python中,您还可以在Pandas DataFrame和Spark DataFrame之间自由转换:

# Convert Spark DataFrame to Pandas

pandas_df = young.toPandas()

# Create a Spark DataFrame from Pandas spark_df = context.createDataFrame(pandas_df)

与RDD相似,DataFrames的计算是延迟的。 也就是说,仅当需要执行某个动作(例如显示结果,保存输出)时才进行计算。 这样可以通过应用谓词下推和字节码生成之类的技术来优化它们的执行,如稍后在“深入了解:智能优化和代码生成”部分中所述。 所有DataFrame操作也将自动并行化并分布在群集上。

支持的数据格式和来源

现代应用程序通常需要从各种来源收集和分析数据。 开箱即用,DataFrame支持从最流行的格式读取数据,包括JSON文件,Parquet文件和Hive表。 它可以通过JDBC从本地文件系统,分布式文件系统(HDFS),云存储(S3)和外部关系数据库系统中读取。 此外,通过Spark SQL的 ,可以扩展DataFrame以支持任何第三方数据格式或源。 现有的第三方扩展已经包括Avro,CSV,ElasticSearch和Cassandra。

DataFrames对数据源的支持使应用程序可以轻松地组合来自不同源的数据(在数据库系统中称为联合查询处理)。 例如,以下代码段将存储在S3中的站点的文本流量日志与PostgreSQL数据库连接起来,以计算每个用户访问该站点的次数。

users = context.jdbc( jdbc:postgresql:production , users )

logs = context.load( /path/to/traffic.log )

logs.join(users, logs.userId == users.userId, left_outer ) \

.groupBy( userId ).agg({ * : count })

应用程序:高级分析和机器学习

数据科学家正在采用越来越复杂的技术,而不仅仅是连接和聚合。 为此,可以在MLlib的直接使用DataFrames。 另外,程序可以在DataFrames上运行任意复杂的用户功能。

可以使用MLlib中的新管道API来指定最常见的高级分析任务。 例如,以下代码创建一个简单的文本分类管道,该管道由标记器,哈希项频率特征提取器和逻辑回归组成。

tokenizer = Tokenizer(inputCol= text , outputCol= words )

hashingTF = HashingTF(inputCol= words , outputCol= features )

lr = LogisticRegression(maxIter=10, regParam=0.01)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

设置好管道之后,我们可以使用它直接在DataFrame上进行训练:

df = context.load( /path/to/data )

model = pipeline.fit(df)

对于机器学习管道API所不能提供的更复杂的任务,应用程序还可以在DataFrame上应用任意复杂的功能,也可以使用Spark的现有RDD API对其进行操作。 以下代码段在DataFrame的bio列上执行单词计数,即大数据的“ hello world”。

df = context.load( /path/to/people.json )

# RDD-style methods such as map, flatMap are available on DataFrames
# Split the bio text into multiple words.
words = df.select( bio ).flatMap(lambda row: row.bio.split( ))
# Create a new DataFrame to count the number of words words_df = words.map(lambda w: Row(word=w, cnt=1)).toDF()
word_counts = words_df.groupBy( word ).sum()

内幕:智能优化和代码生成

与R和Python中急切评估的数据帧不同,Spark中的DataFrames的执行由查询优化器自动优化。 在开始对DataFrame进行任何计算之前, 会将用于将DataFrame构建到物理计划中的操作编译为要执行的操作。 因为优化器了解操作的语义和数据的结构,所以它可以做出明智的决策来加快计算速度。

在较高级别上,有两种优化。 首先,Catalyst应用了逻辑优化,例如谓词下推。 优化器可以将过滤谓词下推到数据源中,从而使物理执行能够跳过无关的数据。 对于Parquet文件,可以跳过整个块,并可以通过字典编码将对字符串的比较转换为更便宜的整数比较。 对于关系数据库,将谓词下推到外部数据库中以减少数据流量。

其次,Catalyst将操作编译为物理计划以执行,并为这些计划生成 ,而通常比手写代码更优化。 例如,它可以在广播连接和随机连接之间进行明智选择,以减少网络流量。 它还可以执行较低级别的优化,例如消除昂贵的对象分配并减少虚拟函数调用。 因此,当现有Spark程序迁移到DataFrames时,我们期望它们的性能有所提高。

由于优化器会生成JVM字节码以供执行,因此Python用户将获得与Scala和Java用户相同的高性能。

上图比较了在一台计算机上对1000万个整数对进行逐组聚合的运行时性能( )。 由于Scala和Python DataFrame操作均被编译为JVM字节码以执行,因此两种语言之间的差异很小,并且两者的性能均优于原始Python RDD变量的五倍和Scala RDD变量的二倍。

DataFrames受到以前分布式数据框架工作的启发,包括Adatao的DDF和Ayasdi的BigDF。 但是,与这些项目的主要区别在于DataFrames通过Catalyst优化器,从而实现了类似于Spark SQL查询的优化执行。 随着我们改进Catalyst优化器,该引擎也变得更加智能,每个新版本的Spark都使应用程序更快。

我们Databricks的数据科学团队一直在内部数据管道上使用这种新的DataFrame API。 它为我们的Spark程序带来了性能改进,同时使其更加简洁和易于理解。 我们对此感到非常兴奋,并相信它将使大数据处理更广泛的用户访问。

该API是一部分。 您可以从我上个月的演讲( , )中了解更多信息。 请尝试一下。 我们期待您的反馈。

翻译自:

spark大规模数据全排序

转载地址:http://rxizd.baihongyu.com/

你可能感兴趣的文章
小D课堂 - 新版本微服务springcloud+Docker教程_5-05熔断降级服务异常报警通知
查看>>
小D课堂 - 新版本微服务springcloud+Docker教程_6-03 高级篇幅之zuul常用问题分析
查看>>
小D课堂 - 新版本微服务springcloud+Docker教程_5-08 断路器监控仪表参数
查看>>
小D课堂 - 新版本微服务springcloud+Docker教程_6-02 springcloud网关组件zuul
查看>>
小D课堂-SpringBoot 2.x微信支付在线教育网站项目实战_2-1.快速搭建SpringBoot项目,采用Eclipse...
查看>>
小D课堂-SpringBoot 2.x微信支付在线教育网站项目实战_1-4.在线教育后台数据库设计...
查看>>
小D课堂-SpringBoot 2.x微信支付在线教育网站项目实战_2-3.热部署在Eclipse和IDE里面的使用...
查看>>
小D课堂-SpringBoot 2.x微信支付在线教育网站项目实战_1-3.在线教育站点需求分析和架构设计...
查看>>
小D课堂-SpringBoot 2.x微信支付在线教育网站项目实战_2-4.后端项目分层分包及资源文件处理...
查看>>
小D课堂-SpringBoot 2.x微信支付在线教育网站项目实战_2-2.快速搭建SpringBoot项目,采用IDEA...
查看>>
小D课堂-SpringBoot 2.x微信支付在线教育网站项目实战_3-5.PageHelper分页插件使用
查看>>
小D课堂-SpringBoot 2.x微信支付在线教育网站项目实战_5-6.微信扫码登录回调本地域名映射工具Ngrock...
查看>>
小D课堂-SpringBoot 2.x微信支付在线教育网站项目实战_5-8.用户模块开发之保存微信用户信息...
查看>>
Linux下Nginx安装
查看>>
LVM扩容之xfs文件系统
查看>>
Hbase记录-client访问zookeeper大量断开以及参数调优分析(转载)
查看>>
代码片段收集
查看>>
vue-cli3创建项目时报错
查看>>
输入1-53周,输出1-53周的开始时间和结束时间
查看>>
实验二
查看>>