Apache火花<年代p一个n class="tm">™例子

这些例子给火花API的一个简短的概述。引发的概念是建立在分布式数据集,其中包含任意Java、Python对象。从外部数据创建一个数据集,然后运用并行操作。火花API是它的构建块<一个href="//www.leiyimei360.com/docs/latest/rdd-programming-guide.html">抽样API。在抽样的API中,有两种类型的操作:转换,这在之前的基础上,定义一个新的数据集行动开始工作,执行一个集群。火花的抽样API之上,高水平提供了API,例如。<一个href="//www.leiyimei360.com/docs/latest/sql-programming-guide.html">DataFrame API和<一个href="//www.leiyimei360.com/docs/latest/mllib-guide.html">机器学习API。这些高级api提供一种简洁的方式进行一定的数据操作。在这个页面中,我们将展示使用抽样API示例以及使用高级API示例。

抽样API的例子

字数

在这个例子中,我们使用一些转换构建一个数据集(String、Int)对的计数然后将其保存到一个文件。

text_file=sc文本文件(“hdfs: / /……”)计数=text_fileflatMap(λ:分裂(”“))\<年代p一个n class="p">。地图(λ:(,1))\<年代p一个n class="p">。reduceByKey(λ一个,b:一个+b)计数saveAsTextFile(“hdfs: / /……”)
瓦尔文本文件=sc文本文件(“hdfs: / /……”)瓦尔计数=文本文件flatMap(= >分裂(”“))地图(= >(,1))reduceByKey(_+_)计数saveAsTextFile(“hdfs: / /……”)
JavaRDD<字符串>文本文件=sc文本文件(“hdfs: / /……”);JavaPairRDD<字符串,整数>计数=文本文件flatMap(年代- >数组asList(年代分裂(”“))。迭代器())mapToPair(- >Tuple2< > (,1))reduceByKey((一个,b)- >一个+b);计数saveAsTextFile(“hdfs: / /……”);

π估计

火花也可以用来计算密集型任务。这段代码估计<年代p一个n style="font-family: serif; font-size: 120%;">π通过“掷飞镖”循环。我们选择随机点在单位平方((0,0)(1,1)),看看有多少落在单位圆。分数应该<年代p一个n style="font-family: serif; font-size: 120%;">π/ 4,所以我们用它来估计。

def内部(p):x,y=随机随机(),随机随机()返回x*x+y*y<1=sc并行化(范围(0,NUM_SAMPLES))\<年代p一个n class="p">。过滤器(内部)。()打印(“π是大约% f”%(4.0*/NUM_SAMPLES))
瓦尔=sc并行化(1NUM_SAMPLES)。过滤器{_= >瓦尔x=数学随机瓦尔y=数学随机x*x+y*y<1}。()println(年代“π是大约${4.0 *数/ NUM_SAMPLES} ")
列表<整数>l=ArrayList< > (NUM_SAMPLES);(int=0;<NUM_SAMPLES;+ +){l添加();}=sc并行化(l)。过滤器(- >{x=数学随机();y=数学随机();返回x*x+y*y<1;})。();系统println(“π”大概是+4.0*/NUM_SAMPLES);

DataFrame API的例子

在火花,<一个href="//www.leiyimei360.com/docs/latest/sql-programming-guide.html">DataFrame是一个分布式数据组织成命名列的集合。用户可以使用DataFrame API来执行各种关系操作外部数据源和火花的内置分布式集合不提供处理数据的具体过程。同时,项目基于DataFrame API将由火花的内置优化器,自动优化催化剂。

文本搜索

在这个例子中,我们搜索一个日志文件中的错误消息。

文本文件=sc文本文件(“hdfs: / /……”)#创建一个DataFrame拥有单个列命名为“线”df=文本文件地图(λr:(r))。toDF([“行”])错误=df过滤器(上校(“行”)。就像(“% %”错误))#所有的错误错误()#提到MySQL计数错误错误过滤器(上校(“行”)。就像(MySQL“% %”))。()#获取MySQL错误作为一个字符串数组错误过滤器(上校(“行”)。就像(MySQL“% %”))。收集()
瓦尔文本文件=sc文本文件(“hdfs: / /……”)/ /创建一个DataFrame有一个专栏叫“线”瓦尔df=文本文件toDF(“行”)瓦尔错误=df过滤器(上校(“行”)。就像(“% %”错误))/ /计数所有的错误错误()/ /计数错误提到MySQL错误过滤器(上校(“行”)。就像(MySQL“% %”))。()/ /获取MySQL错误作为一个字符串数组错误过滤器(上校(“行”)。就像(MySQL“% %”))。收集()
/ /创建一个DataFrame有一个专栏叫“线”JavaRDD<字符串>文本文件=sc文本文件(“hdfs: / /……”);JavaRDD<>rowRDD=文本文件地图(RowFactory::创建);列表<StructField>字段=数组asList(数据类型createStructField(“行”,数据类型StringType,真正的));StructType模式=数据类型createStructType(字段);DataFramedf=sqlContextcreateDataFrame(rowRDD,模式);DataFrame错误=df过滤器(上校(“行”)。就像(“% %”错误));/ /计数所有的错误错误();/ /计数错误提到MySQL错误过滤器(上校(“行”)。就像(MySQL“% %”))。();/ /获取MySQL错误作为一个字符串数组错误过滤器(上校(“行”)。就像(MySQL“% %”))。收集();

简单的数据操作

在这个例子中,我们读到一个表存储在一个数据库和计算每个时代的人数。最后,我们将计算结果保存到S3的JSON格式。一个简单的示例中使用的MySQL表“人”这个表有两列,“名字”和“年龄”。

#创建一个DataFrame基于一个名为“人”的表#存储在一个MySQL数据库。url=\<年代p一个n class="s">" jdbc: mysql: / / yourIP: yourPort /测试?用户= yourUsername;密码= yourPassword”df=sqlContext\<年代p一个n class="p">。\<年代p一个n class="p">。格式(“jdbc”)\<年代p一个n class="p">。选项(“url”,url)\<年代p一个n class="p">。选项(“数据表”,“人”)\<年代p一个n class="p">。负载()#看起来这个DataFrame的模式。dfprintSchema()#计算人的年龄countsByAge=dfgroupBy(“年龄”)。()countsByAge显示()#保存countsByAge S3的JSON格式。countsByAge格式(“json”)。保存(“s3a: / /……”)
/ /创建一个DataFrame基于一个名为“人”的表/ /存储在一个MySQL数据库。瓦尔url=" jdbc: mysql: / / yourIP: yourPort /测试?用户= yourUsername;密码= yourPassword”瓦尔df=sqlContext格式(“jdbc”)选项(“url”,url)选项(“数据表”,“人”)负载()/ /看起来这个DataFrame的模式。dfprintSchema()/ /计数人按年龄瓦尔countsByAge=dfgroupBy(“年龄”)。()countsByAge显示()/ /保存countsByAge S3的JSON格式。countsByAge格式(“json”)。保存(“s3a: / /……”)
/ /创建一个DataFrame基于一个名为“人”的表/ /存储在一个MySQL数据库。字符串url=" jdbc: mysql: / / yourIP: yourPort /测试?用户= yourUsername;密码= yourPassword”;DataFramedf=sqlContext()格式(“jdbc”)选项(“url”,url)选项(“数据表”,“人”)负载();/ /看起来这个DataFrame的模式。dfprintSchema();/ /计数人按年龄DataFramecountsByAge=dfgroupBy(“年龄”)。();countsByAge显示();/ /保存countsByAge S3的JSON格式。countsByAge()。格式(“json”)。保存(“s3a: / /……”);

机器学习的例子

MLlib,火花的机器学习(ML)库,提供了许多分布式ML算法。这些算法任务,如特征提取、分类、回归、聚类、建议等等。MLlib还提供了工具,如毫升管道构建工作流,CrossValidator调优参数,模型的持久性保存和加载模型。

与逻辑回归预测

在这个例子中,我们把标签和特征向量的数据集。我们学会预测标签使用逻辑回归算法从特征向量。

#每个记录这个DataFrame包含标签和#功能由一个向量表示。df=sqlContextcreateDataFrame(数据,(“标签”,“特征”])#设置参数的算法。#这里,我们限制的迭代数到10。lr=LogisticRegression(麦克斯特=10)#适合模型的数据。模型=lr适合(df)#给定一个数据集,预测每个点的标签,并显示结果。模型变换(df)。显示()
/ /每一个这个DataFrame包含标签和记录/ /功能由一个向量表示。瓦尔df=sqlContextcreateDataFrame(数据)。toDF(“标签”,“特征”)/ /设置参数的算法。/ /在这里,我们限制的迭代数到10。瓦尔lr=LogisticRegression()。setMaxIter(10)/ /符合模型的数据。瓦尔模型=lr适合(df)/ /检查模型:功能权重。瓦尔权重=模型权重/ /给定一个数据集,预测每个点的标签,并显示结果。模型变换(df)。显示()
/ /每一个这个DataFrame包含标签和记录/ /功能由一个向量表示。StructType模式=StructType(StructField[]{StructField(“标签”,数据类型倍增式,,元数据()),StructField(“特征”,VectorUDT(),,元数据()),});DataFramedf=jsqlcreateDataFrame(数据,模式);/ /设置参数的算法。/ /在这里,我们限制的迭代数到10。LogisticRegressionlr=LogisticRegression()。setMaxIter(10);/ /符合模型的数据。LogisticRegressionModel模型=lr适合(df);/ /检查模型:功能权重。向量权重=模型权重();/ /给定一个数据集,预测每个点的标签,并显示结果。模型变换(df)。显示();

附加的例子

许多额外的例子与火花分布:

  • 基本的火花:<一个href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples">Scala的例子,<一个href="https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples">Java示例,<一个href="https://github.com/apache/spark/tree/master/examples/src/main/python">Python示例火花流:<一个href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming">Scala的例子,<一个href="https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming">Java示例
Baidu
map