Lightning-fast unified analytics engine

Apache Spark Examples

These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects. You create a dataset from external data, then apply parallel operations to it. The building block of the Spark API is its RDD API. In the RDD API there are two types of operations: transformations, which define a new dataset based on previous ones, and actions, which kick off a job to execute on a cluster. On top of Spark's RDD API, high-level APIs are provided, such as the DataFrame API and the Machine Learning API. These high-level APIs provide a concise way to perform certain data operations. On this page, we show examples using the RDD API as well as examples using the high-level APIs.
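 
To make the distinction concrete, here is a minimal PySpark sketch (the RDD names and the length-80 threshold are purely illustrative; sc and the hdfs:// placeholder path follow the examples below): a transformation such as filter only describes a new dataset and runs nothing, while an action such as count launches a job on the cluster.

lines = sc.textFile("hdfs://...")                  # base RDD built from external data
long_lines = lines.filter(lambda l: len(l) > 80)   # transformation: nothing runs yet
num_long_lines = long_lines.count()                # action: triggers the computation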

RDD API Examples

Word Count

In this example, we use a few transformations to build a dataset of (String, Int) pairs called counts and then save it to a file.

Python:

text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")

Scala:

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

Java:

JavaRDD<String> textFile = sc.textFile("hdfs://...");
JavaPairRDD<String, Integer> counts = textFile
    .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByKey((a, b) -> a + b);
counts.saveAsTextFile("hdfs://...");

Pi Estimation

Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1, 1)) and see how many fall in the unit circle. Since the quarter circle inside the square has area π/4 while the square has area 1, the fraction of points that land inside should be π/4, so we use it to get our estimate.

Python:

import random

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, NUM_SAMPLES)) \
             .filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

Scala:

val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
  val x = math.random
  val y = math.random
  x*x + y*y < 1
}.count()
println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")

Java:

List<Integer> l = new ArrayList<>(NUM_SAMPLES);
for (int i = 0; i < NUM_SAMPLES; i++) {
  l.add(i);
}

long count = sc.parallelize(l).filter(i -> {
  double x = Math.random();
  double y = Math.random();
  return x*x + y*y < 1;
}).count();
System.out.println("Pi is roughly " + 4.0 * count / NUM_SAMPLES);

DataFrame API Examples

In Spark, a DataFrame is a distributed collection of data organized into named columns. Users can use the DataFrame API to perform various relational operations on both external data sources and Spark's built-in distributed collections, without having to provide specific procedures for processing the data. Programs based on the DataFrame API are also automatically optimized by Spark's built-in optimizer, Catalyst.
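
As a small illustration of what the optimizer does, the query plan Catalyst produces for a DataFrame operation can be inspected with explain(), which prints the plan without running the query. The sketch below is hypothetical Python and assumes a DataFrame df with "name" and "age" columns, similar to the "people" table used later on this page:

from pyspark.sql.functions import col

# Catalyst rewrites this into an optimized physical plan before execution;
# explain() prints that plan instead of executing the query.
adults = df.filter(col("age") > 21).select("name")
adults.explain()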

Text Search

In this example, we search through the error messages in a log file.

Python:

from pyspark.sql import Row
from pyspark.sql.functions import col

textFile = sc.textFile("hdfs://...")

# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()

Scala:

val textFile = sc.textFile("hdfs://...")

// Creates a DataFrame having a single column named "line"
val df = textFile.toDF("line")
val errors = df.filter(col("line").like("%ERROR%"))
// Counts all the errors
errors.count()
// Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
// Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()

Java:

// Creates a DataFrame having a single column named "line"
JavaRDD<String> textFile = sc.textFile("hdfs://...");
JavaRDD<Row> rowRDD = textFile.map(RowFactory::create);
List<StructField> fields = Arrays.asList(
  DataTypes.createStructField("line", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);

DataFrame errors = df.filter(col("line").like("%ERROR%"));
// Counts all the errors
errors.count();
// Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count();
// Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect();

Simple Data Operations

In this example, we read a table stored in a database and count the number of people in each age group. Finally, we save the result to S3 in JSON format. The example uses a simple MySQL table "people" that has two columns, "name" and "age".

Python:

# Creates a DataFrame based on a table named "people"
# stored in a MySQL database.
url = \
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
df = sqlContext \
  .read \
  .format("jdbc") \
  .option("url", url) \
  .option("dbtable", "people") \
  .load()

# Looks at the schema of this DataFrame.
df.printSchema()

# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()

# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")

Scala:

// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
val url =
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
val df = sqlContext
  .read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "people")
  .load()

// Looks at the schema of this DataFrame.
df.printSchema()

// Counts people by age
val countsByAge = df.groupBy("age").count()
countsByAge.show()

// Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")

Java:

// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
String url =
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword";
DataFrame df = sqlContext
  .read()
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "people")
  .load();

// Looks at the schema of this DataFrame.
df.printSchema();

// Counts people by age
DataFrame countsByAge = df.groupBy("age").count();
countsByAge.show();

// Saves countsByAge to S3 in the JSON format.
countsByAge.write().format("json").save("s3a://...");

Machine Learning Example

Spark's machine learning (ML) library, MLlib, provides many distributed ML algorithms. These algorithms cover tasks such as feature extraction, classification, regression, clustering, recommendation, and more. MLlib also provides tools such as ML Pipelines for building workflows, CrossValidator for tuning parameters, and model persistence for saving and loading models.
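
As a rough sketch of those tools (not part of the example below), a one-stage ML Pipeline can be tuned with CrossValidator and the best model saved to disk. This is hypothetical Python that assumes a training DataFrame named training with "label" and "features" columns and a placeholder output path:

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# A one-stage pipeline; real pipelines usually add feature-extraction stages.
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[lr])

# Try a small grid of regularization values with 3-fold cross-validation.
grid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1]).build()
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=3)
cvModel = cv.fit(training)

# Model persistence: save the best pipeline model for later reuse.
cvModel.bestModel.write().overwrite().save("path/to/model")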

Prediction with Logistic Regression

In this example, we take a dataset of labels and feature vectors. We learn to predict the labels from the feature vectors using the Logistic Regression algorithm.

Python:

# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)

# Fit the model to the data.
model = lr.fit(df)

# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()

Scala:

// Every record of this DataFrame contains the label and
// features represented by a vector.
val df = sqlContext.createDataFrame(data).toDF("label", "features")

// Set parameters for the algorithm.
// Here, we limit the number of iterations to 10.
val lr = new LogisticRegression().setMaxIter(10)

// Fit the model to the data.
val model = lr.fit(df)

// Inspect the model: get the feature weights.
val weights = model.weights

// Given a dataset, predict each point's label, and show the results.
model.transform(df).show()

Java:

// Every record of this DataFrame contains the label and
// features represented by a vector.
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);

// Set parameters for the algorithm.
// Here, we limit the number of iterations to 10.
LogisticRegression lr = new LogisticRegression().setMaxIter(10);

// Fit the model to the data.
LogisticRegressionModel model = lr.fit(df);

// Inspect the model: get the feature weights.
Vector weights = model.weights();

// Given a dataset, predict each point's label, and show the results.
model.transform(df).show();

Additional Examples

Many additional examples are distributed with Spark, in the examples directory of the Spark distribution.
