Quick Start

本教程提供了使用Spark的快速介绍. 我们将首先通过Spark的交互式外壳(在Python或Scala中)介绍API,然后展示如何用Java,Scala和Python编写应用程序.

要遵循本指南,请首先从Spark网站下载Spark的打包版本. 由于我们不会使用HDFS,因此您可以下载适用于任何版本Hadoop的软件包.

请注意,在Spark 2.0之前,Spark的主要编程接口是弹性分布式数据集(RDD). 在Spark 2.0之后,RDD被Dataset取代,Dataset的类型像RDD一样强,但具有更丰富的优化功能. 仍支持RDD界面,您可以在RDD编程指南中获得更详细的参考. 但是,我们强烈建议您切换到使用数据集,该数据集的性能比RDD更好. 请参阅《 SQL编程指南》以获取有关数据集的更多信息.

Security

默认情况下,Spark中的安全性处于关闭状态. 这可能意味着您默认情况下容易受到攻击. 运行Spark之前,请参阅Spark Security .

Interactive Analysis with the Spark Shell

Basics

Spark’s shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively. It is available in either Scala (which runs on the Java VM and is thus a good way to use existing Java libraries) or Python. Start it by running the following in the Spark directory:

./bin/spark-shell

Spark的主要抽象是称为数据集的项目的分布式集合. 可以从Hadoop InputFormats(例如HDFS文件)或通过转换其他数据集来创建数据集. 让我们从Spark源目录中的README文件的文本中创建一个新的数据集:

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

您可以通过调用某些操作直接从数据集中获取值,或转换数据集以获取新值. 有关更多详细信息,请阅读API文档 .

scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

现在,让我们将此数据集转换为新的数据集. 我们调用filter返回一个新的数据集,其中包含文件中项的子集.

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

我们可以将转换和动作链接在一起:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
./bin/pyspark

或者,如果在当前环境中PySpark与pip一起安装:

pyspark

Spark的主要抽象是称为数据集的项目的分布式集合. 可以从Hadoop InputFormats(例如HDFS文件)或通过转换其他数据集来创建数据集. 由于Python的动态性质,我们不需要在Python中对数据集进行强类型化. 结果,Python中的所有数据集都是Dataset [Row],我们将其DataFrame与Pandas和R中的数据框概念一致.让我们从Spark源目录中的README文件的文本中创建一个新的DataFrame:

>>> textFile = spark.read.text("README.md")

您可以通过调用一些操作直接从DataFrame中获取值,或转换DataFrame以获取新的值. 有关更多详细信息,请阅读API文档 .

>>> textFile.count()  # Number of rows in this DataFrame
126

>>> textFile.first()  # First row in this DataFrame
Row(value=u'# Apache Spark')

现在,让我们将此DataFrame转换为一个新的. 我们调用filter返回一个新的DataFrame,其中包含文件中各行的子集.

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

我们可以将转换和动作链接在一起:

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15

More on Dataset Operations

数据集操作和转换可用于更复杂的计算. 假设我们要查找包含最多单词的行:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

首先,将一条线映射到一个整数值,以创建一个新的数据集. 在该数据集上调用reduce来查找最大的字数. mapreduce的参数是Scala函数文字(闭包),可以使用任何语言功能或Scala / Java库. 例如,我们可以轻松地调用在其他地方声明的函数. 我们将使用Math.max()函数使此代码更易于理解:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一种常见的数据流模式是Hadoop流行的MapReduce. Spark可以轻松实现MapReduce流:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

在这里,我们调用flatMap将行的数据集转换为单词的数据集,然后结合groupByKeycount来计算文件中每个单词的计数,作为(String,Long)对的Dataset. 要收集外壳中的字数,我们可以调用collect

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]

首先,将一条线映射到一个整数值,并将其别名为" numWords",从而创建一个新的DataFrame. 在该DataFrame上调用agg以查找最大的字数. selectagg的参数都是Column ,我们可以使用df.colName从DataFrame获取列. 我们还可以导入pyspark.sql.functions,它提供了许多方便的功能来从旧的列构建新的列.

一种常见的数据流模式是Hadoop流行的MapReduce. Spark可以轻松实现MapReduce流:

>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

Here, we use the explode function in select , to transform a Dataset of lines to a Dataset of words, and then combine groupBy and count to compute the per-word counts in the file as a DataFrame of 2 columns: "word" and "count". 在这里,我们在select使用explode函数,将行的数据集转换为单词的数据集,然后结合groupBycount来计算文件中每个单词的计数,作为2列的DataFrame:" word"和"计数". To collect the word counts in our shell, we can call collect : 要收集外壳中的字数,我们可以调用collect

>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]

Caching

Spark还支持将数据集提取到群集范围的内存中缓存中. 当重复访问数据时,例如查询小的"热"数据集或运行迭代算法(如PageRank)时,这非常有用. 作为一个简单的示例,让我们标记我们的linesWithSpark数据集为要缓存的:

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

使用Spark浏览和缓存100行文本文件似乎很愚蠢. 有趣的是,即使在数十或数百个节点上进行条带化时,这些相同的函数也可以用于非常大的数据集. 您也可以通过将bin/spark-shell连接到集群来交互式地执行此操作,如RDD编程指南中所述 .

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15

使用Spark浏览和缓存100行文本文件似乎很愚蠢. 有趣的是,即使在数十或数百个节点上进行条带化时,这些相同的函数也可以用于非常大的数据集. 您也可以通过将bin/pyspark连接到集群来交互式地执行此操作,如RDD编程指南中所述 .

Self-Contained Applications

假设我们希望使用Spark API编写一个独立的应用程序. 我们将逐步介绍一个Scala(带有sbt),Java(带有Maven)和Python(pip)的简单应用程序.

我们将在Scala中创建一个非常简单的Spark应用程序-实际上如此简单,它名为SimpleApp.scala

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

请注意,应用程序应定义main()方法,而不是扩展scala.App . scala.App子类可能无法正常工作.

该程序只计算Spark自述文件中包含" a"的行数和包含" b"的行数. 请注意,您需要用安装Spark的位置替换YOUR_SPARK_HOME. 与早期带有Spark外壳的示例(其初始化其自己的SparkSession)不同,我们将SparkSession初始化为程序的一部分.

我们调用SparkSession.builder构造一个[[SparkSession]],然后设置应用程序名称,最后调用getOrCreate获得[[SparkSession]]实例.

我们的应用程序依赖于Spark API,因此我们还将包括一个sbt配置文件build.sbt ,该文件解释了Spark是一个依赖项. 该文件还添加了Spark依赖的存储库:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"

对于SBT正常工作,我们需要布置SimpleApp.scalabuild.sbt根据目录结构. 安装好之后,我们可以创建一个包含应用程序代码的JAR包,然后使用spark-submit脚本运行我们的程序.

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23

本示例将使用Maven编译应用程序JAR,但是任何类似的构建系统都可以使用.

我们将创建一个非常简单的Spark应用程序SimpleApp.java

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

该程序只计算Spark自述文件中包含" a"的行数和包含" b"的行数. 请注意,您需要用安装Spark的位置替换YOUR_SPARK_HOME. 与早期带有Spark外壳的示例(其初始化其自己的SparkSession)不同,我们将SparkSession初始化为程序的一部分.

为了构建程序,我们还编写了一个Maven pom.xml文件,其中将Spark列为依赖项. 请注意,Spark工件带有Scala版本标记.

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>2.4.4</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>

We lay out these files according to the canonical Maven directory structure:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在,我们可以使用Maven打包应用程序,并使用./bin/spark-submit执行它.

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

现在,我们将展示如何使用Python API(PySpark)编写应用程序.

如果要构建打包的PySpark应用程序或库,则可以按以下方式将其添加到setup.py文件中:

    install_requires=[
        'pyspark=={site.SPARK_VERSION}'
    ]

作为示例,我们将创建一个简单的Spark应用程序SimpleApp.py

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

该程序只计算文本文件中包含" a"的行数和包含" b"的行数. 请注意,您需要用安装Spark的位置替换YOUR_SPARK_HOME. 与Scala和Java示例一样,我们使用SparkSession创建数据集. 对于使用自定义类或第三方库的应用程序,我们还可以通过将代码依赖性打包到.zip文件中来通过其--py-files参数将代码依赖性添加到spark-submit (有关详细信息,请参见spark-submit --help ). SimpleApp非常简单,我们不需要指定任何代码依赖项.

我们可以使用bin/spark-submit脚本运行此应用程序:

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

如果您的环境中安装了PySpark pip(例如pip install pyspark ),则可以使用常规Python解释器运行应用程序,也可以根据需要使用提供的" spark-submit".

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23

Where to Go from Here

祝贺您运行第一个Spark应用程序!

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R

by  ICOPY.SITE