快速启动

本教程提供了简要介绍如何使用火花。我们将首先介绍API通过火花的交互式shell(在Python或Scala),然后展示如何用Java编写应用程序,Scala和Python。

要跟随本指南,首先下载一个打包发布的火花<一个href="//www.leiyimei360.com/downloads.html">火花的网站。因为我们不会使用HDFS,您可以下载一个包Hadoop的任何版本。

注意,之前2.0火花,火花的主要编程接口是弹性分布式数据集(抽样)。2.0火花后,抽样数据集所取代,这是强类型的抽样,但丰富的优化。抽样界面仍然支持,你可以得到一个更详细的参考<一个href="//www.leiyimei360.com/docs/latest/rdd-programming-guide.html">抽样编程指南。然而,我们强烈建议您切换到使用数据集,比抽样具有更好的性能。看到<一个href="//www.leiyimei360.com/docs/latest/sql-programming-guide.html">SQL编程指南数据集的更多信息。

交互式分析引发壳

基础知识

火花的shell学习API提供了一个简单的方法,以及一个强大的工具来分析数据交互。也可在Scala(运行在Java VM,因此使用现有的Java库的一个好方法)或Python。启动它通过运行以下火花目录:

/ bin / spark-shell

火花的主要抽象是一个分布式称为数据集的项的集合。可以创建数据集来自Hadoop InputFormats(如HDFS文件)或通过改造其他数据集。让我们做一个新的文本数据集的火花源目录的README文件:

scala>瓦尔文本文件=火花文本文件(“README.md”)文本文件:org.apache.spark.sql.Dataset(字符串]=(值:字符串]

您可以直接从数据集获取值,通过调用一些动作,或者改变一个新的数据集。有关详细信息,请阅读API文档

scala>文本文件()/ /在这个数据集的项的数量res0:=126年/ /可能不同于你的自述。医学博士将改变随着时间的推移,类似于其他输出scala>文本文件第一个()/ /第一个项目在这个数据集res1:字符串=#Apache火花

现在,让我们将这个数据集转换成一个新的。我们称之为过滤器返回一个新的数据集文件中的条目的一个子集。

scala>瓦尔linesWithSpark=文本文件过滤器(= >包含(“火花”))linesWithSpark:org.apache.spark.sql.Dataset(字符串]=(值:字符串]

我们可以一起链转换和行动:

scala>文本文件过滤器(= >包含(“火花”))。()/ /多少行包含“火花”?res3:=15
/ bin / pyspark

或者如果PySpark皮普在当前安装环境:

pyspark

火花的主要抽象是一个分布式称为数据集的项的集合。可以创建数据集来自Hadoop InputFormats(如HDFS文件)或通过改造其他数据集。由于Python的动态特性,我们不需要在Python中强类型数据集。结果,所有数据集在Python数据集(行),我们叫它DataFrame与数据帧的概念是一致的熊猫和r .让我们新建一个文本的DataFrame火花源目录的README文件:

> > >文本文件=火花文本(“README.md”)

您可以直接从DataFrame得到值,通过调用一些动作,或者变换DataFrame来获得一个新的。有关详细信息,请阅读API文档

> > >文本文件()在这个DataFrame #的行数126年> > >文本文件第一个()在这个DataFrame #第一行(价值=u“# Apache火花”)

现在让我们改变这个DataFrame一个新的。我们称之为过滤器返回一个新的DataFrame行文件的一个子集。

> > >linesWithSpark=文本文件过滤器(文本文件价值包含(“火花”))

我们可以一起链转换和行动:

> > >文本文件过滤器(文本文件价值包含(“火花”))。()#多少行包含“火花”?15

更多的数据操作

数据集可以用于更复杂的操作和转换计算。假设我们想要找到最多的一行字:

scala>文本文件地图(= >分裂(”“)。大小)。减少((一个,b)= >如果(一个>b)一个其他的b)res4:Int=15

第一行映射到一个整数价值,创建一个新的数据集。减少被称为数据集上找到最大的单词计数。的参数地图减少Scala函数字面量(关闭),可以使用任何语言特性或Scala / Java库。例如,我们可以很容易地调用函数声明。我们将使用Math.max ()函数来让代码更容易理解:

scala>进口java.lang.Math进口java.lang.Mathscala>文本文件地图(= >分裂(”“)。大小)。减少((一个,b)= >数学马克斯(一个,b))res5:Int=15

一个常见的数据流模式是MapReduce, Hadoop的推广。火花很容易实现MapReduce流:

scala>瓦尔wordcount=文本文件flatMap(= >分裂(”“))。groupByKey(身份)。()wordcount:org.apache.spark.sql.Dataset((字符串,<年代pan class="kt">长)]=(值:字符串,<年代pan class="kt">数(1):长整型数字]

在这里,我们调用flatMap将一个数据集的行数据集的话,然后结合groupByKey计算每一项的文件作为数据集(字符串,长)对。收集单词统计在我们的外壳,我们可以打电话收集:

scala>wordcount收集()res6:数组((字符串,<年代pan class="kt">Int)]=数组((意味着,1),(,2),(,3),(因为,1),(Python,2),(同意,1),(集群,1),…)
> > >pyspark.sql.functions进口*> > >文本文件选择(大小(分裂(文本文件价值,“\ s +”))。的名字(“numWords”))。gg(马克斯(上校(“numWords”)))。收集()((马克斯(numWords)=15)]

第一行映射到一个整数值,别名“numWords”,创建一个新的DataFrame。gg是呼吁DataFrame找到最大的单词计数。的参数选择gg都是,我们可以用df.colName从DataFrame列。我们也可以导入pyspark.sql。功能,它提供了很多方便的函数来建立一个新的列从一个旧的。

一个常见的数据流模式是MapReduce, Hadoop的推广。火花很容易实现MapReduce流:

> > >wordcount=文本文件选择(爆炸(分裂(文本文件价值,“\ s +”))。别名(“单词”))。groupBy(“单词”)。()

这里,我们使用爆炸函数选择,将一个数据集的行数据集的话,然后结合groupBy计算每一项的文件作为DataFrame 2列:“词”和“数”。收集单词统计在我们的外壳,我们可以打电话收集:

> > >wordcount收集()((=u“在线”,=1),(=u“图”,=1),…]

缓存

火花还支持将数据集到一个集群范围的内存缓存。反复访问数据时,这是非常有用的,例如当查询一个小“热”数据集或运行时像PageRank迭代算法。作为一个简单的例子,让我们纪念linesWithSpark数据缓存:

scala>linesWithSpark缓存()res7:linesWithSpark。类型=(值:字符串]scala>linesWithSpark()res8:=15scala>linesWithSpark()res9:=15

看起来愚蠢的使用引发探索和缓存100行文本文件。有趣的部分是,这些相同的函数可以用于非常大的数据集,即使他们跨越几十或几百个节点。你也可以交互地通过连接bin / spark-shell到一个集群,如所描述的<一个href="//www.leiyimei360.com/docs/latest/rdd-programming-guide.html">抽样编程指南

> > >linesWithSpark缓存()> > >linesWithSpark()15> > >linesWithSpark()15

看起来愚蠢的使用引发探索和缓存100行文本文件。有趣的部分是,这些相同的函数可以用于非常大的数据集,即使他们跨越几十或几百个节点。你也可以交互地通过连接bin / pyspark到一个集群,如所描述的<一个href="//www.leiyimei360.com/docs/latest/rdd-programming-guide.html">抽样编程指南

独立的应用程序

假设我们希望编写一个自包含的应用程序使用火花API。我们将走在Scala中的一个简单的应用程序(sbt), Java (Maven)和Python (pip)。

我们将创建一个非常简单的火花在Scala-so应用简单,事实上,它的命名SimpleApp.scala:

/ * SimpleApp。scala * /进口org.apache.spark.sql.SparkSession对象SimpleApp{def主要(arg游戏:数组(字符串]){瓦尔日志文件=“YOUR_SPARK_HOME / README.md”/ /应该一些文件在你的系统瓦尔火花=SparkSession构建器浏览器名称(“简单的应用程序”)。getOrCreate()瓦尔logData=火花文本文件(日志文件)。缓存()瓦尔numa=logData过滤器(= >包含(“一个”))。()瓦尔麻木=logData过滤器(= >包含(“b”))。()println(年代“行:$ numa线与b: $麻木”)火花停止()}}

注意,应用程序应该定义一个main ()方法,而不是扩展scala.App。的子类scala.App可能不会正常工作。

这个程序只是数量的行数包含“a”,包含“b”火花README。请注意,你需要替换YOUR_SPARK_HOME火花的位置安装。与前面的例子引发壳,初始化自己的SparkSession,我们初始化一个SparkSession作为该计划的一部分。

我们称之为SparkSession.builder构建一个SparkSession,然后设置应用程序名称,最后调用getOrCreate得到SparkSession实例。

我们的应用程序依赖于火花API,所以我们还将包括一个sbt配置文件,build.sbt,这解释了火花是依赖。这个文件还添加了一个存储库,火花取决于:

的名字:=“简单的项目”版本:=“1.0”scalaVersion:=“2.12.17”libraryDependencies+ =“org.apache.spark”% %“spark-sql”%“3.4.1”

sbt正常工作,我们需要布局SimpleApp.scalabuild.sbt根据典型的目录结构。一旦到位,我们可以创建一个JAR包包含应用程序的代码,然后使用spark-submit脚本运行我们的程序。

#你的目录布局应该是这样的美元找到<年代pan class="nb">。/构建。年代bt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala<年代pan class="c">#包包含您的应用程序的jar美元sbt包……<年代pan class="o">(包装信息)<年代pan class="o">{. .<年代pan class="o">}/<年代pan class="o">{. .<年代pan class="o">}/ / scala - 2.12 / simple-project_2.12-1.0.jar目标<年代pan class="c">#使用spark-submit运行您的应用程序美元YOUR_SPARK_HOME / bin / spark-submit<年代pan class="se">\——类“SimpleApp”\——主当地的(4)<年代pan class="se">\目标/ scala - 2.12 /简单——project_2.12 - 1.0。罐子…与一线:46,行b: 23

这个例子将使用Maven来编译应用程序JAR,但任何类似的构建系统。

我们将创建一个非常简单的火花的应用程序,SimpleApp.java:

/ * SimpleApp。java * /进口org.apache.spark.sql.SparkSession;进口org.apache.spark.sql.Dataset;公共SimpleApp{公共静态无效主要(字符串[]arg游戏){字符串日志文件=“YOUR_SPARK_HOME / README.md”;/ /应该一些文件在你的系统SparkSession火花=SparkSession构建器()。浏览器名称(“简单的应用程序”)。getOrCreate();数据集<字符串>logData=火花()。文本文件(日志文件)。缓存();numa=logData过滤器(年代- >年代包含(“一个”))。();麻木=logData过滤器(年代- >年代包含(“b”))。();系统println(“行:“+numa+”,与b线:“+麻木);火花停止();}}

这个程序只是数量的行数包含“a”,包含“b”火花README。请注意,你需要替换YOUR_SPARK_HOME火花的位置安装。与前面的例子引发壳,初始化自己的SparkSession,我们初始化一个SparkSession作为该计划的一部分。

建立程序,我们也写一个Mavenpom.xml文件列表作为附件的火花。注意,火花工件与Scala版本标记。

<项目>< groupId >edu.berkeley<年代pan class="nt">< / groupId >< artifactId >简单的项目<年代pan class="nt">< / artifactId >< modelVersion >4.0.0<年代pan class="nt">< / modelVersion ><名称>简单的项目<年代pan class="nt">< /名称><包装>jar<年代pan class="nt">< /包装><版本>1.0<年代pan class="nt">> < /版本< >的依赖关系<依赖>< !——引发依赖- - >< groupId >org.apache.spark<年代pan class="nt">< / groupId >< artifactId >spark-sql_2.12<年代pan class="nt">< / artifactId ><版本>3.4.1<年代pan class="nt">> < /版本<范围>提供<年代pan class="nt">< / >范围< / >的依赖< / >的依赖关系< /项目>

我们把这些文件根据规范化Maven目录结构:

美元找到<年代pan class="nb">。/ pom。xml / src / main / src。/ src / main / java。/ src / main / java / SimpleApp.java

现在,我们可以使用Maven和打包应用程序执行它/ bin / spark-submit

#包包含您的应用程序的JAR美元mvn包……<年代pan class="o">(信息构建jar:<年代pan class="o">{. .<年代pan class="o">}/<年代pan class="o">{. .<年代pan class="o">}/目标/ simple-project-1.0.jar<年代pan class="c">#使用spark-submit运行您的应用程序美元YOUR_SPARK_HOME / bin / spark-submit<年代pan class="se">\——类“SimpleApp”\——主当地的(4)<年代pan class="se">\目标/简单-项目- 1.0。罐子…与一线:46,行b: 23

现在,我们将展示如何编写一个应用程序使用Python API (PySpark)。

如果您正在构建一个打包PySpark应用程序或库可以将它添加到您的设置。py文件:

install_requires=(“pyspark = = 3.4.1”]

作为一个例子,我们将创建一个简单的火花应用程序,SimpleApp.py:

SimpleApp.py“”“”pyspark.sql进口SparkSession日志文件=“YOUR_SPARK_HOME / README.md”#应该是一些文件在您的系统上火花=SparkSession构建器浏览器名称(“SimpleApp”)。getOrCreate()logData=火花文本(日志文件)。缓存()numa=logData过滤器(logData价值包含(“一个”))。()麻木=logData过滤器(logData价值包含(“b”))。()打印(“行:%我,行与b: %我”%(numa,麻木))火花停止()

这个程序只是数量的行数包含“a”和包含“b”的数量在一个文本文件中。请注意,你需要替换YOUR_SPARK_HOME火花的位置安装。与Scala和Java示例中,我们使用一个SparkSession创建数据集。使用定制类的应用程序或第三方库,我们也可以添加代码依赖项spark-submit通过其——py-files参数被打包成一个zip文件(请参阅spark-submit——帮助详情)。SimpleApp很简单,我们不需要指定任何代码依赖项。

我们可以运行这个应用程序使用bin / spark-submit脚本:

#使用spark-submit运行您的应用程序美元YOUR_SPARK_HOME / bin / spark-submit<年代pan class="se">\——主当地的(4)<年代pan class="se">\SimpleApp.py. .。与一线:46,行b: 23

如果你有PySpark pip安装到您的环境(例如,pip安装pyspark),你可以用常规的Python解释器运行您的应用程序或使用所提供的“spark-submit”作为你喜欢。

#使用Python解释器来运行您的应用程序美元python SimpleApp。py……与一线:46,行b: 23

其他依赖项管理工具如Conda和皮普也可以用于自定义类或第三方库。另请参阅<一个href="//www.leiyimei360.com/docs/latest/api/python/user_guide/python_packaging.html">Python包管理

从这里去哪里

祝贺你运行应用程序第一个火花!

  • 为深入API的概述,从<一个href="//www.leiyimei360.com/docs/latest/rdd-programming-guide.html">抽样编程指南和<一个href="//www.leiyimei360.com/docs/latest/sql-programming-guide.html">SQL编程指南或看到“编程指南”菜单其他组件。
  • 集群上运行应用程序,头<一个href="//www.leiyimei360.com/docs/latest/cluster-overview.html">部署概述
  • 最后,引发包括几个样品例子目录(<一个href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples">Scala,<一个href="https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples">Java,<一个href="https://github.com/apache/spark/tree/master/examples/src/main/python">Python,<一个href="https://github.com/apache/spark/tree/master/examples/src/main/r">R)。您可以运行如下:
Scala和Java,使用# run-example:. / bin / run-example SparkPi<年代pan class="c">#为Python示例中,直接使用spark-submit:。/ bin / spark-submit例子/ src / main / python / pi.py<年代pan class="c"># R的例子,直接使用spark-submit:. / bin / spark-submit / src / main / r / dataframe.R例子
Baidu
map