当前位置: 首页>編程日記>正文

Spark入门基本操作

Spark入门基本操作

Spark基本操作

一,Spark的安裝

以后补上

二,Spark介紹

2.1 RDD

2.1.1 RDD及其特點

RDD是Spark的核心数据模型,但是个抽象类,全称为Resillient Distributed Dataset,即弹性分布式数据集

2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)

3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建。

4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDDpartition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。

5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性)

2.1.2 創建RDD

进行Spark核心编程的第一步就是创建一个初始的RDD。该RDD,通常就代表和包含了Spark应用程序的输入源数据。然后通过Spark Core提供的transformation算子,对该RDD进行转换,来获取其他的RDD。

Spark Core提供了三种创建RDD的方式:

  1. 使用程序中的集合创建RDD(主要用于测试)

    List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    JavaRDD<Integer> numbersRDD = javaSparkContext.parallelize(numbers);
    
  2. 使用本地文件创建RDD(主要用于临时性处理有大量数据的文件)

    SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
    sparkConf.setMaster("local[*]");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    String tmpPath = "E:\\data\\spark\\input.txt";
    JavaRDD<String> lines = javaSparkContext.textFile(tmpPath);
    
  3. 使用HDFS文件创建RDD(生产环境的常用方式)

SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);String hdfsBasePath = "hdfs://10.197.29.203:9000";
//文本文件的hdfs路径
String inputPath = hdfsBasePath + "/wordCount/input/input.txt";
//导入文件(創建RDD)
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);

2.1.3 操作RDD

Spark支持两种RDD操作:transformation和action。

transformation

transformation 操作会针对已有的RDD创建一个新的RDD。transformation具有lazy特性,即transformation不会触发spark程序的执 行,它们只是记录了对RDD所做的操作,不会自发的执行。只有执行了一个action,之前的所有transformation才会执行。

常用的transformation介绍:

  • map :将RDD中的每个元素传人自定义函数,获取一个新的元素,然后用新的元素组成新的RDD。
  • flatMap:与map类似,但是对每个元素都可以返回一个或多个元素。
  • mapToPair:於map類似,輸出類型為 <k, v>格式。
  • filter:对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除。
  • groupByKey:根据key进行分组,每个key对应一个Iterable<value>
  • reduceByKey:对每个key对应的value进行reduce操作。
  • sortByKey:对每个key对应的value进行排序操作。
  • join:对两个包含<key,value>对的RDD进行join操作,每个keyjoin上的pair,都会传入自定义函数进行处理。
  • cogroup:同join,但是每个key对应的Iterable都会传入自定义函数进行处理。

action

action 操作主要对RDD进行最后的操作,比如遍历,reduce,保存到文件等,并可以返回结果给Driver程序。action操作执行,会触发一个 spark job的运行,从而触发这个action之前所有的transformation的执行,这是action的特性。

常用的action介绍:

  • reduce:将RDD中的所有元素进行聚合操作。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。
  • collect:将RDD中所有元素获取到本地客户端(一般不建议使用)。
  • count:获取RDD元素总数。
  • take(n):获取RDD中前n个元素。
  • saveAsTextFile:将RDD元素保存到文件中,对每个元素调用toString方法。
  • countByKey:对每个key对应的值进行count计数。
  • foreach:遍历RDD中的每个元素。

2.2 排序和關聯

2.2.1 SortByKey

这个方法只是对Key进行排序,value不排序。

默認排序

		JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);List<Tuple2<String, Integer>> source = Arrays.asList(new Tuple2<>("lmc", 89), new Tuple2<>("hh", 89), new Tuple2<>("tom", 79),new Tuple2<>("marry", 89), new Tuple2<>("sb", 89), new Tuple2<>("lemoon", 79));//導入數據JavaPairRDD<String, Integer> textFile = javaSparkContext.parallelizePairs(source);System.out.println(textFile.sortByKey().collect());//降序System.out.println(textFile.sortByKey(false).collect());//升序打印結果:
[(hh,89), (lemoon,79), (lmc,89), (marry,89), (sb,89), (tom,79)]
[(tom,79), (sb,89), (marry,89), (lmc,89), (lemoon,79), (hh,89)]

自定義排序

		List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");//導入數據JavaRDD<String> textFile = javaSparkContext.parallelize(source);JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));JavaPairRDD<String, Integer> sorts = lines.sortByKey(new MySort());JavaPairRDD<String, String> ss = sorts.mapToPair(s -> new Tuple2<>(s._1().split(" ")[0], s._1().split(" ")[1]));List<Tuple2<String, String>> list = ss.take(10);for (Tuple2<String, String> t: list) {System.err.println(t._1() + "  :   " + t._2());}打印結果:
1  :   1
1  :   5
1  :   7
1  :   12
2  :   4
2  :   7
3  :   6
5  :   3

自定義比較類MySort:

class MySort implements Serializable, Comparator<String> {@Overridepublic int compare(String o1, String o2) {String[] o1s = o1.split(" ");String[] o2s = o2.split(" ");int o1s_1 = Integer.valueOf(o1s[0]);int o1s_2 = Integer.valueOf(o1s[1]);int o2s_1 = Integer.valueOf(o2s[0]);int o2s_2 = Integer.valueOf(o2s[1]);if (o1s_1 > o2s_1) {return 1;}else if (o1s_1 == o2s_1){return o1s_2 - o2s_2;}return -1;}
}

2.2.2 join和cogroup

join就是把两个集合根据key,进行内容聚合,而cogroup在聚合时会先对RDD中相同的key进行合并。

package com.lmc.spark.test;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;
import java.util.List;/*** @ClassName: JoinTest* @author: Leemon* @Description: TODO* @date: 2021/3/17 13:54* @version: 1.0*/
public class JoinTest {public static void main(String[] args) {SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");sparkConf.setMaster("local[*]");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);List<Tuple2<String, Integer>> source1 = Arrays.asList(new Tuple2<>("lmc", 89), new Tuple2<>("hh", 89), new Tuple2<>("tom", 79), new Tuple2<>("lmc", 41));List<Tuple2<String, Integer>> source2 = Arrays.asList(new Tuple2<>("lmc", 46), new Tuple2<>("hh", 74), new Tuple2<>("tom", 92),new Tuple2<>("lmc", 68), new Tuple2<>("hh", 83), new Tuple2<>("tom", 58));//導入數據JavaPairRDD<String, Integer> textFile1 = javaSparkContext.parallelizePairs(source1);JavaPairRDD<String, Integer> textFile2 = javaSparkContext.parallelizePairs(source2);//關聯數據JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroup = textFile1.cogroup(textFile2);JavaPairRDD<String, Tuple2<Integer, Integer>> join = textFile1.join(textFile2);List<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>> target1 = cogroup.collect();List<Tuple2<String, Tuple2<Integer, Integer>>> target2 = join.collect();//輸出target1.forEach(t -> System.out.println(t._1() + " -> (" + t._2()._1() + ": " + t._2()._2()));System.out.println("===================================================");target2.forEach(t -> System.out.println(t._1() + " -> (" + t._2()._1() + ", " + t._2()._2()));}}//輸出結果
hh -> ([89]: [74, 83]
lmc -> ([89, 41]: [46, 68]
tom -> ([79]: [92, 58]
===================================================
hh -> (89, 74
hh -> (89, 83
lmc -> (89, 46
lmc -> (89, 68
lmc -> (41, 46
lmc -> (41, 68
tom -> (79, 92
tom -> (79, 58

三,Spark基本案例(WordCount)

3.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>lmc-user</artifactId><groupId>com.lmc</groupId><version>0.0.1-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>spark-test</artifactId><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>2.4.3</version></dependency></dependencies></project>

3.2 WordCount.java

需求:計算文件中每個單詞出現的次數

源文件:

One morning a fox sees a cock.He think,"This is my breakfast.''He comes up to the cock and says,"I know you can sing very well.Can you sing for me?''The cock is glad.He closes his eyes and begins to sing.he fox sees that and caches him in his mouth and carries him away. The people in the field see the fox.They cry,"Look,look!The fox is carrying the cock away.''The cock says to the fox,"Mr Fox,do you understand?The people say you are carrying their cock away.Tell them it is yours.Not theirs.''

計算:

package com.lmc.spark;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;
import java.util.List;/*** @ClassName: WorkCount* @author: Leemon* @Description: TODO* @date: 2021/1/28 15:10* @version: 1.0*/
public class WordCount {public static void main(String[] args) {SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");sparkConf.setMaster("local[*]");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);String hdfsBasePath = "hdfs://10.197.29.203:9000";//文本文件的hdfs路径String inputPath = hdfsBasePath + "/wordCount/input/input.txt";//输出结果文件的hdfs路径String outputPath = hdfsBasePath + "/wordCount/output2/";System.out.println("input path : " + inputPath);System.out.println("output path : " + outputPath);//导入文件JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);JavaPairRDD<String, Integer> counts = textFile//每一行都分割成单词,返回后组成一个大集合.flatMap(s -> Arrays.asList(s.split(" ")).iterator())//key是单词,value是1.mapToPair(word -> new Tuple2<>(word, 1))//基于key进行reduce,逻辑是将value累加.reduceByKey((a, b) -> a + b);//先将key和value倒过来,再按照key排序JavaPairRDD<String, Integer> sorts = counts//key和value颠倒,生成新的map.mapToPair(tuple2 -> new Tuple2<>(tuple2._1(), tuple2._2()));//按照key倒排序//.sortByKey(false);//取前10个List<Tuple2<String, Integer>> collect = sorts.take(10);//打印出来for(Tuple2<String, Integer> tuple2 : collect){System.out.println(tuple2._1() + "\t" + tuple2._2());}//分区合并成一个,再导出为一个txt保存在hdfsjavaSparkContext.parallelize(collect).coalesce(1).saveAsTextFile(outputPath);//关闭contextjavaSparkContext.close();}}

結果:

(him,2)
(cry,"Look,look!The,1)
(fox,"Mr,1)
(are,1)
(Fox,do,1)
(his,3)
(is,5)
(well.Can,1)
(away,1)
(can,1)

3.3 FileSort.java

需求:

  1. 按照文件中的第一列排序。
  2. 如果第一列相同,则按照第二列排序。
/*** @ClassName: FileSort* @author: Leemon* @Description: TODO * @date: 2021/3/8 16:33* @version: 1.0*/
public class FileSort {public static void main(String[] args) {SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");sparkConf.setMaster("local[*]");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");//導入數據JavaRDD<String> textFile = javaSparkContext.parallelize(source);JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));JavaPairRDD<String, Integer> sorts = lines.sortByKey(new MySort());JavaPairRDD<String, String> ss = sorts.mapToPair(s -> new Tuple2<>(s._1().split(" ")[0], s._1().split(" ")[1]));List<Tuple2<String, String>> list = ss.take(10);for (Tuple2<String, String> t: list) {System.err.println(t._1() + "  :   " + t._2());}}static class MySort implements Serializable, Comparator<String> {@Overridepublic int compare(String o1, String o2) {String[] o1s = o1.split(" ");String[] o2s = o2.split(" ");int o1s_1 = Integer.valueOf(o1s[0]);int o1s_2 = Integer.valueOf(o1s[1]);int o2s_1 = Integer.valueOf(o2s[0]);int o2s_2 = Integer.valueOf(o2s[1]);if (o1s_1 > o2s_1) {return 1;}else if (o1s_1 == o2s_1){return o1s_2 - o2s_2;}return -1;}}}
//1  :   1
//1  :   5
//1  :   7
//1  :   12
//2  :   4
//2  :   7
//3  :   6
//5  :   3

3.4 ClassSort.java

需求:对每个班级内的学生成绩,取出前3名。

數據源:

class1 56
class2 64
class1 79
class3 88
class1 92
class3 67
class2 62
class3 77
class1 88
class2 78
class2 88
class3 91

計算:

/*** @ClassName: ClassSort* @author: Leemon* @Description: TODO* @date: 2021/3/9 17:15* @version: 1.0*/
public class ClassSort {public static void main(String[] args) {SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");sparkConf.setMaster("local[*]");sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);String hdfsBasePath = "hdfs://10.197.29.203:9000";//文本文件的hdfs路径String inputPath = hdfsBasePath + "/spark/classSort/input.txt";//输出结果文件的hdfs路径String outputPath = hdfsBasePath + "/spark/classSort/";System.out.println("input path : " + inputPath);System.out.println("output path : " + outputPath);//导入文件JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);//分行JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2(l, null));//排序JavaPairRDD<String, Integer> sort = lines.sortByKey(new MySort1());//切割key - valueJavaPairRDD<String, Integer> splits =sort.mapToPair(g -> new Tuple2<>(g._1().split(" ")[0], Integer.valueOf(g._1().split(" ")[1])));//分組JavaPairRDD<String, Iterable<Integer>> groups = splits.groupByKey();//只獲取value前三個JavaPairRDD<String, List<Integer>> g = groups.mapToPair(new MyList());//獲取10個key的內容List<Tuple2<String, List<Integer>>> tuple2s = g.take(10);for (Tuple2<String, List<Integer>> t: tuple2s) {for (Integer i : t._2() ) {System.err.println(t._1() + "  " + i);}System.err.println("--------------------");}}static class MyList implements Serializable, PairFunction<Tuple2<String, Iterable<Integer>>, String, List<Integer>> {@Overridepublic Tuple2<String, List<Integer>> call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {Iterator<Integer> its = stringIterableTuple2._2().iterator();List<Integer> ints = new ArrayList<>();int count = 0;while (its.hasNext()) {++count;ints.add(its.next());if (count >= 3) break;}return new Tuple2<>(stringIterableTuple2._1(), ints);}}static class MySort1 implements Serializable, Comparator<String> {@Overridepublic int compare(String o1, String o2) {String[] o1s = o1.split(" ");String[] o2s = o2.split(" ");int o1s_2 = Integer.valueOf(o1s[1]);int o2s_2 = Integer.valueOf(o2s[1]);if (o1s_2 >= o2s_2) return -1;return 1;}}
}

結果:

class2  88
class2  78
class2  64
--------------------
class3  91
class3  88
class3  77
--------------------
class1  92
class1  88
class1  79
--------------------


https://www.fengoutiyan.com/post/13241.html

相关文章:

  • 菜鸟教程docker命令
  • Spark sql
  • hadoop零基础入门
  • hadoop入门
  • Apache Spark
  • docker菜鸟教程
  • hadoop菜鸟教程
  • excel基础入门教程
  • 鏡像模式如何設置在哪,圖片鏡像操作
  • 什么軟件可以把圖片鏡像翻轉,C#圖片處理 解決左右鏡像相反(旋轉圖片)
  • 手機照片鏡像翻轉,C#圖像鏡像
  • 視頻鏡像翻轉軟件,python圖片鏡像翻轉_python中鏡像實現方法
  • 什么軟件可以把圖片鏡像翻轉,利用PS實現圖片的鏡像處理
  • 照片鏡像翻轉app,java實現圖片鏡像翻轉
  • 什么軟件可以把圖片鏡像翻轉,python圖片鏡像翻轉_python圖像處理之鏡像實現方法
  • matlab下載,matlab如何鏡像處理圖片,matlab實現圖像鏡像
  • 圖片鏡像翻轉,MATLAB:鏡像圖片
  • 鏡像翻轉圖片的軟件,圖像處理:實現圖片鏡像(基于python)
  • canvas可畫,JavaScript - canvas - 鏡像圖片
  • 圖片鏡像翻轉,UGUI優化:使用鏡像圖片
  • Codeforces,CodeForces 1253C
  • MySQL下載安裝,Mysql ERROR: 1253 解決方法
  • 勝利大逃亡英雄逃亡方案,HDU - 1253 勝利大逃亡 BFS
  • 大一c語言期末考試試題及答案匯總,電大計算機C語言1253,1253《C語言程序設計》電大期末精彩試題及其問題詳解
  • lu求解線性方程組,P1253 [yLOI2018] 扶蘇的問題 (線段樹)
  • c語言程序設計基礎題庫,1253號C語言程序設計試題,2016年1月試卷號1253C語言程序設計A.pdf
  • 信奧賽一本通官網,【信奧賽一本通】1253:抓住那頭牛(詳細代碼)
  • c語言程序設計1253,1253c語言程序設計a(2010年1月)
  • 勝利大逃亡英雄逃亡方案,BFS——1253 勝利大逃亡
  • 直流電壓測量模塊,IM1253B交直流電能計量模塊(艾銳達光電)
  • c語言程序設計第三版課后答案,【渝粵題庫】國家開放大學2021春1253C語言程序設計答案
  • 18轉換為二進制,1253. 將數字轉換為16進制
  • light-emitting diode,LightOJ-1253 Misere Nim
  • masterroyale魔改版,1253 Dungeon Master
  • codeformer官網中文版,codeforces.1253 B
  • c語言程序設計考研真題及答案,2020C語言程序設計1253,1253計算機科學與技術專業C語言程序設計A科目2020年09月國家開 放大學(中央廣播電視大學)
  • c語言程序設計基礎題庫,1253本科2016c語言程序設計試題,1253電大《C語言程序設計A》試題和答案200901
  • 肇事逃逸車輛無法聯系到車主怎么辦,1253尋找肇事司機