365bet官方贴吧-365bet官网备用网站-365限制投注额度怎么办

【Spark篇】

【Spark篇】

一、前述

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序(就是我们编写的一个应用程序)中有几个Action类算子执行,就有几个job运行。

二、具体

原始数据集:

1、count

返回数据集中的元素数。会在结果计算完成后回收到Driver端。返回行数

代码语言:javascript复制package com.spark.spark.actions;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

/**

* count

* 返回结果集中的元素数,会将结果回收到Driver端。

*

*/

public class Operator_count {

public static void main(String[] args) {

SparkConf conf = new SparkConf();

conf.setMaster("local");

conf.setAppName("collect");

JavaSparkContext jsc = new JavaSparkContext(conf);

JavaRDD lines = jsc.textFile("./words.txt");

long count = lines.count();

System.out.println(count);

jsc.stop();

}

} 结果:返回行数即元素数

2、take(n)

first=take(1) 返回数据集中的第一个元素。

返回一个包含数据集前n个元素的集合。是一个(array)有几个partiotion 会有几个job触发

代码语言:javascript复制package com.spark.spark.actions;

import java.util.Arrays;

import java.util.List;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

/**

* take

*

* @author root

*

*/

public class Operator_takeAndFirst {

public static void main(String[] args) {

SparkConf conf = new SparkConf();

conf.setMaster("local").setAppName("take");

JavaSparkContext jsc = new JavaSparkContext(conf);

JavaRDD parallelize = jsc.parallelize(Arrays.asList("a","b","c","d"));

List take = parallelize.take(2);

String first = parallelize.first();

for(String s:take){

System.out.println(s);

}

jsc.stop();

}

}结果:

3、foreach

循环遍历数据集中的每个元素,运行相应的逻辑。

4、collect

将计算结果回收到Driver端。当数据量很大时就不要回收了,会造成oom.

一般在使用过滤算子或者一些能返回少量数据集的算子后

代码语言:javascript复制package com.spark.spark.actions;

import java.util.List;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

/**

* collect

* 将计算的结果作为集合拉回到driver端,一般在使用过滤算子或者一些能返回少量数据集的算子后,将结果回收到Driver端打印显示。

*

*/

public class Operator_collect {

public static void main(String[] args) {

/**

* SparkConf对象中主要设置Spark运行的环境参数。

* 1.运行模式

* 2.设置Application name

* 3.运行的资源需求

*/

SparkConf conf = new SparkConf();

conf.setMaster("local");

conf.setAppName("collect");

/**

* JavaSparkContext对象是spark运行的上下文,是通往集群的唯一通道。

*/

JavaSparkContext jsc = new JavaSparkContext(conf);

JavaRDD lines = jsc.textFile("./words.txt");

JavaRDD resultRDD = lines.filter(new Function() {

/**

*

*/

private static final long serialVersionUID = 1L;

@Override

public Boolean call(String line) throws Exception {

return !line.contains("hadoop");

}

});

List collect = resultRDD.collect();

for(String s :collect){

System.out.println(s);

}

jsc.stop();

}

}结果:

countByKey 作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。(也就是个数)

java代码:

代码语言:javascript复制package com.spark.spark.actions;

import java.util.Arrays;

import java.util.Map;

import java.util.Map.Entry;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

/**

* countByKey

*

* 作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。返回一个Map

* @author root

*

*/

public class Operator_countByKey {

public static void main(String[] args) {

SparkConf conf = new SparkConf();

conf.setMaster("local").setAppName("countByKey");

JavaSparkContext sc = new JavaSparkContext(conf);

JavaPairRDD parallelizePairs = sc.parallelizePairs(Arrays.asList(

new Tuple2(1,"a"),

new Tuple2(2,"b"),

new Tuple2(3,"c"),

new Tuple2(4,"d"),

new Tuple2(4,"e")

));

Map countByKey = parallelizePairs.countByKey();

for(Entry entry : countByKey.entrySet()){

System.out.println("key:"+entry.getKey()+"value:"+entry.getValue());

}

}

}结果:

countByValue 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。

java代码:

代码语言:javascript复制package com.spark.spark.actions;

import java.util.Arrays;

import java.util.Map;

import java.util.Map.Entry;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

/**

* countByValue

* 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。

*

* @author root

*

*/

public class Operator_countByValue {

public static void main(String[] args) {

SparkConf conf = new SparkConf();

conf.setMaster("local").setAppName("countByKey");

JavaSparkContext sc = new JavaSparkContext(conf);

JavaPairRDD parallelizePairs = sc.parallelizePairs(Arrays.asList(

new Tuple2(1,"a"),

new Tuple2(2,"b"),

new Tuple2(2,"c"),

new Tuple2(3,"c"),

new Tuple2(4,"d"),

new Tuple2(4,"d")

));

Map, Long> countByValue = parallelizePairs.countByValue();

for(Entry, Long> entry : countByValue.entrySet()){

System.out.println("key:"+entry.getKey()+",value:"+entry.getValue());

}

}

} scala代码:

代码语言:javascript复制package com.bjsxt.spark.actions

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

/**

* countByValue

* 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。

*/

object Operator_countByValue {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local").setAppName("countByValue")

val sc = new SparkContext(conf)

val rdd1 = sc.makeRDD(List("a","a","b"))

val rdd2 = rdd1.countByValue()

rdd2.foreach(println)

sc.stop()

}

} 代码结果:

java:

scala:

reduce 根据聚合逻辑聚合数据集中的每个元素。(reduce里面需要具体的逻辑,根据里面的逻辑对相同分区的数据进行计算)

java代码:

代码语言:javascript复制package com.spark.spark.actions;

import java.util.Arrays;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function2;

/**

* reduce

*

* 根据聚合逻辑聚合数据集中的每个元素。

* @author root

*

*/

public class Operator_reduce {

public static void main(String[] args) {

SparkConf conf = new SparkConf();

conf.setMaster("local").setAppName("reduce");

JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD parallelize = sc.parallelize(Arrays.asList(1,2,3,4,5));

Integer reduceResult = parallelize.reduce(new Function2() {

/**

*

*/

private static final long serialVersionUID = 1L;

@Override

public Integer call(Integer v1, Integer v2) throws Exception {

return v1+v2;

}

});

System.out.println(reduceResult);

sc.stop();

}

}scala代码:

代码语言:javascript复制package com.bjsxt.spark.actions

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

/**

* reduce

*

* 根据聚合逻辑聚合数据集中的每个元素。

*/

object Operator_reduce {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local").setAppName("reduce")

val sc = new SparkContext(conf)

val rdd1 = sc.makeRDD(Array(1,2))

val result = rdd1.reduce(_+_)

println(result)

sc.stop()

}

} 结果:

java:

scala:

← 上一篇: 课程实施的18种方式
下一篇: 《魔兽怀旧服》WLK鱼类分布大全 →

相关推荐

芩扬这个名字怎么样

芩扬这个名字怎么样

2025-12-31 12:45:59 阅读: 6516
法海搜珍│治鞭伤

法海搜珍│治鞭伤

2025-10-22 15:48:21 阅读: 2783
怎么让微信运动不计步

怎么让微信运动不计步

2026-02-22 02:10:16 阅读: 4085
定点数(fixed-point number)的表示方法

定点数(fixed-point number)的表示方法

2025-09-30 19:17:04 阅读: 7958
汽车之家

汽车之家

2025-09-27 12:37:31 阅读: 6811
琛字的起名意思

琛字的起名意思

2025-10-03 02:29:24 阅读: 3693