Welcome加拿大28为梦而年轻!

好程序员-千锋教育旗下高端IT职业教育品牌

400-811-9990
  • 客服QQ
  • 官方微信

    好程序员

    专注高端IT职业培训

[BigData] 好程序员大数据培训教程分享Master的jps

[复制链接]
29 0
好程序员大数据培训教程分享Master的jps


SparkSubmit
  类启动后的服务进程,用于提交任务,
  哪一段启动提交任务,哪一段启动submit(Driver端)

提交任务流程
1.Driver端提交任务到Master(启动sparkSubmit进程)
2.Master生成任务信息,放入对列中
3.Master通知Worker启动Executor,(Master过滤出存活的Worker,将任务分配给空闲资源多的worker)
4.worker的Executor向Driver端注册(只有executor真正参与计算) -> worker从Dirver端拿信息
5.Driver端启动Executor将任务划分阶段,分成小的task,再广播给相应的Worker让他去执行
6.worker会将执行完的任务回传给Driver


range 相当于集合子类
scala> 1.to(10)
res0: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8,
9, 10)
[size=10.5000pt]
scala> 1 to 10
res1: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8,
9, 10)

提交任务到集群的任务类 :
Spark context available as sc
SQL context available as sqlContext
直接调用:
spark WordCount
构建模板代码:
SparkConf:构建配置信息类,该配置优先于集群配置文件
setAppName:指定应用程序名称,如果不指定,会自动生成一个类似于uuid产生的名称
setMaster:指定运行模式:local-用1个线程模拟集群运行,
local[2]: 用2个线程模拟集群运行,loca-当前有多少空闲到的线程就用多少线程来运行该任务
/**
  * 用spark实现单词计数
  */
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    /**
      * 构建模板代码
      */
    val conf: SparkConf = new SparkConf()
      .setAppName("SparkWordCount")
//      .setMaster("local[2]")
[size=10.5000pt]
    // 创建提交任务到集群的入口类(上下文对象)
    val sc: SparkContext = new SparkContext(conf)
[size=10.5000pt]
    // 获取HDFS的数据
    val lines: RDD[String] = sc.textFile(args(0))
[size=10.5000pt]
    // 切分数据,生成一个个单词
    val words: RDD[String] = lines.flatMap(_.split(" "))
[size=10.5000pt]
    // 把单词生成一个个元组
    val tuples: RDD[(String, Int)] = words.map((_, 1))
[size=10.5000pt]
    // 进行聚合操作
//    tuples.reduceByKey((x, y) => x + y)
    val sumed: RDD[(String, Int)] = tuples.reduceByKey(_+_)
[size=10.5000pt]
    // 以单词出现的次数进行降序排序
    val sorted: RDD[(String, Int)] = sumed.sortBy(_._2, false)
[size=10.5000pt]
    // 打印到控制台
//    println(sorted.collect.toBuffer)
//    sorted.foreach(x => println(x))
//    sorted.foreach(println)
[size=10.5000pt]
    // 把结果存储到HDFS
    sorted.saveAsTextFile(args(1))
[size=10.5000pt]
    // 释放资源
    sc.stop()
  }
[size=10.5000pt]}
打包后上传Linux
1.首先启动zookeeper,hdfs和Spark集群
启动hdfs
/usr/local/hadoop-2.6.1/sbin/start-dfs.sh
启动spark
/usr/local/spark-1.6.1-bin-hadoop2.6/sbin/start-all.sh

2.使用spark-submit命令提交Spark应用(注意参数的顺序)
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
--class com.qf.spark.WordCount \
--master spark://node01:7077 \
--executor-memory 2G \
--total-executor-cores 4 \
/root/spark-mvn-1.0-SNAPSHOT.jar \
hdfs://node01:9000/words.txt \
hdfs://node01:9000/out

3.查看程序执行结果
hdfs dfs -cat hdfs://node01:9000/out/part-00000

javaSparkWC
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
[size=10.5000pt]
import java.util.Arrays;
import java.util.List;
[size=10.5000pt]
public class JavaSparkWC {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaSparkWC").setMaster("local[1]");

        //提交任务入口类
        JavaSparkContext jsc = new JavaSparkContext(conf);
[size=10.5000pt]
        //获取数据
        JavaRDD<String> lines = jsc.textFile("hdfs://hadoop01:9000/wordcount/input/a.txt");
        //切分数据
        JavaRDD<String> words =
                lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                List<String> splited = Arrays.asList(s.split(" ")); //生成list
                return splited;
            }
        });
[size=10.5000pt]
        //生成元祖                               //一对一组 ,(输入单词,输出单词,输出1)
        JavaPairRDD<String, Integer> tuples =
                words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
[size=10.5000pt]
        //聚合                                                  //2个相同key的value,聚合
        JavaPairRDD<String, Integer> sumed =
                tuples.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
[size=10.5000pt]
        //此前key为String类型,没有办法排序
        //Java api并没有提供sortBy算子,此时需要把两个值位置调换,排序完成后,在换回来
        final JavaPairRDD<Integer, String> swaped =
                sumed.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception {
//                return new Tuple2<Integer, String>(tup._2, tup._1);
                return tup.swap(); //swap(),交换方法
            }
        });
[size=10.5000pt]
        //降序排序
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
        //再次交换
        JavaPairRDD<String, Integer> res = sorted.mapToPair(
            new PairFunction<Tuple2<Integer, String>, String, Integer>() {
               @Override
               public Tuple2<String, Integer> call(Tuple2<Integer, String> tup)throws Exception {
                    return tup.swap();
               }
        });
[size=10.5000pt]
        System.out.println(res.collect());
[size=10.5000pt]
        jsc.stop();//释放资源
    }
[size=10.5000pt]}
好程序员大数据培训官网:http://www.uvidit.com/

精彩内容,一键分享给更多人!
收藏
收藏0
转播
转播
分享
淘帖0
支持
支持0
反对
反对0
您需要登录后才可以回帖

本版积分规则

关注加拿大28
好程序员
千锋好程序员

北京校区(总部):北京市海淀区宝盛北里西区28号中关村智诚科创大厦

深圳西部硅谷校区:深圳市宝安区宝安大道5010号深圳西部硅谷B座A区605-619

杭州龙驰智慧谷校区:浙江省杭州市下沙经济技术开发区元成路199号龙驰智慧谷B座7层

郑州校区:郑州市二七区航海中路60号海为科技园C区10层、12层

Copyright 2007-2019 北京千锋互联科技加拿大28 .All Right

京ICP备12003911号-5 京公安网11010802011455号

请您保持通讯畅通1对1咨询马上开启