博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
52、Spark Streaming之输入DStream之基础数据源以及基于HDFS的实时wordcount程序
阅读量:5219 次
发布时间:2019-06-14

本文共 4170 字,大约阅读时间需要 13 分钟。

一、概述

1、Socket:之前的wordcount例子,已经演示过了,StreamingContext.socketTextStream()2、HDFS文件基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。streamingContext.fileStream
(dataDirectory)streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)Spark Streaming会监视指定的HDFS目录,并且处理出现在目录中的文件。要注意的是,所有放入HDFS目录中的文件,都必须有相同的格式;必须使用移动或者重命名的方式,将文件移入目录;一旦处理之后,文件的内容即使改变,也不会再处理了;基于HDFS文件的数据源是没有Receiver的,因此不会占用一个cpu core。

二、代码实现

1、java实现

package cn.spark.study.streaming;import java.util.Arrays;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;public class HDFSWordCount {    public static void main(String[] args) {        SparkConf conf = new SparkConf()                .setMaster("local[2]")                .setAppName("WordCount");                  JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(3));                // 首先,使用JavaStreamingContext的textFileStream()方法,针对HDFS目录创建输入数据流        JavaDStream
lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir"); // 执行wordcount操作 JavaDStream
words = lines.flatMap(new FlatMapFunction
() { private static final long serialVersionUID = 1L; @Override public Iterable
call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairDStream
pairs = words.mapToPair(new PairFunction
() { private static final long serialVersionUID = 1L; @Override public Tuple2
call(String word) throws Exception { return new Tuple2
(word, 1); } }); JavaPairDStream
wordcounts = pairs.reduceByKey(new Function2
() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordcounts.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); }}###运行脚本[root@spark1 streaming]# cat hdfswordcount.sh /usr/local/spark-1.5.1-bin-hadoop2.4/bin/spark-submit \--class cn.spark.study.streaming.HDFSWordCount \--num-executors 3 \--driver-memory 100m \--executor-memory 100m \--executor-cores 3 \--files /usr/local/hive/conf/hive-site.xml \--driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.17.jar \/usr/local/spark-study/java/streaming/saprk-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar \##此时打包上传,启动运行脚本,他就会一直监视hdfs的指定目录##把准备好的文件上传到hdfs,程序会马上读取到,并统计出来hdfs dfs -mkdir /wordcount_dirhdfs dfs -put t1.txt /wordcount_dir/t1.txt

2、scala实现

package cn.spark.study.streamingimport org.apache.spark.SparkConfimport org.apache.spark.streaming.StreamingContextimport org.apache.spark.streaming.Secondsobject HDFSWordCount {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[2]").setAppName("HDFSWordCount")    val ssc = new StreamingContext(conf, Seconds(3))        val lines = ssc.textFileStream("hdfs://spark1:9000/wordcount_dir")    val words = lines.flatMap(_.split(" "))    val pairs = words.map(word => (word, 1))    val wordCounts = pairs.reduceByKey(_ + _)    wordCounts.print()        ssc.start()    ssc.awaitTermination()      }}##运行脚本[root@spark1 streaming]# cat hdfswordcount.sh /usr/local/spark-1.5.1-bin-hadoop2.4/bin/spark-submit \--class cn.spark.study.streaming.HDFSWordCount \--num-executors 3 \--driver-memory 100m \--executor-memory 100m \--executor-cores 3 \--files /usr/local/hive/conf/hive-site.xml \--driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.17.jar \/usr/local/spark-study/scala/streaming/spark-study-scala.jar \##打包--上传,运行脚本##程序会监控着hdfs目录,此时上传一个新文件到hdfs,程序会马上读取到并统计出来hdfs dfs -put t2.txt /wordcount_dir/t2.txt

转载于:https://www.cnblogs.com/weiyiming007/p/11338880.html

你可能感兴趣的文章
Idea 提交代码到码云(提交到github也大同小异)
查看>>
c#连接excel2007未安装ISAM解决
查看>>
Mono 异步加载数据更新主线程
查看>>
初识lua
查看>>
我是插件狂人,jDuang,jValidator,jModal,jGallery
查看>>
张季跃 201771010139《面向对象程序设计(java)》第四周学习总结
查看>>
如何解除循环引用
查看>>
android中fragment的使用及与activity之间的通信
查看>>
字典【Tire 模板】
查看>>
jquery的contains方法
查看>>
python3--算法基础:二分查找/折半查找
查看>>
Perl IO:随机读写文件
查看>>
Perl IO:IO重定向
查看>>
转:基于用户投票的排名算法系列
查看>>
WSDL 详解
查看>>
[转]ASP数组全集,多维数组和一维数组
查看>>
C# winform DataGridView 常见属性
查看>>
逻辑运算和while循环.
查看>>
Nhiberate (一)
查看>>
c#后台计算2个日期之间的天数差
查看>>