RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
SparkStreaming是什么

这篇文章主要讲解了“Spark Streaming是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Spark Streaming是什么”吧!

十多年的大通网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。营销型网站建设的优势是能够根据用户设备显示端的尺寸不同,自动调整大通建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。成都创新互联从事“大通网站设计”,“大通网站推广”以来,每个客户项目都认真落实执行。

一:Spark Streaming 概览。

1.1    简单了解 Spark Streaming。

         Spark Streaming 是核心 Spark API的一个扩展。具有可扩展性,高吞吐量,容错性,实时性等特征。

        数据从许多来如中摄入数据,如 Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets。

也可以使用复杂的算法与高级别的功能像map,reduce,join和window处理。

        最后,也可以将处理过的数据推送到文件系统、数据库。事实上,我们也可以用Spark 的机器学习和图形处理数据流上的算法。用图形表示如下:Spark Streaming是什么
        在内部,其工作原理如下。Spark Streaming接收实时输入的数据流和数据划分批次,然后由Spark引擎批处理生成的最终结果流。如图示:  Spark Streaming是什么

      另外,Spark Streaming提供一个高级抽象,称为离散的流或 DStream,表示连续的流的数据。DStreams 可以被创建从输入的数据流,如Kafka, Flume, and Kinesis,

        或采用其他的DStreams高级别操作的输入的数据流。

        在内部,DStream 是以 RDDs 的序列来表示。

首先,看看Maven的依赖包(spark-streaming_2.10)管理:

        
			org.apache.spark
			spark-streaming_2.10
			1.6.1
		

1.2    eg:从一个数据服务器监听 TCP 套接字接收的文本数据中的单词进行计数

package com.berg.spark.test5.streaming;

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class SparkStreamingDemo1 {

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
		conf.set("spark.testing.memory", "269522560000");

		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
		System.out.println("jssc: " + jssc);

		// 创建一个DStream, 将连接 hostname:port, 比如 master:9999
		JavaReceiverInputDStream lines = jssc.socketTextStream("master", 9999);
		System.out.println("lines : " + lines);

		JavaDStream words = lines.flatMap(new FlatMapFunction() {

			private static final long serialVersionUID = 1L;

			public Iterable call(String x) {
				return Arrays.asList(x.split(" "));
			}
		});

		// Count each word in each batch
		JavaPairDStream pairs = words.mapToPair(new PairFunction() {
			public Tuple2 call(String s) {
				return new Tuple2(s, 1);
			}
		});
		JavaPairDStream wordCounts = pairs.reduceByKey(new Function2() {
			public Integer call(Integer i1, Integer i2) {
				return i1 + i2;
			}
		});

		// Print the first ten elements of each RDD generated in this DStream to
		// the console
		wordCounts.print();

		jssc.start();  // Start the computation
		jssc.awaitTermination();  // Wait for the computation to terminate
	}
}

至于代码如何运行了,首先在Linux下终端输入:$ nc -lk 9999

然后在Eclipse中运行代码 。

随意输入一行文本单词,单词之间用空格隔开,如下:

hadoop@master:~$ nc -lk 9999
berg hello world berg hello

可以在Eclipse控制台看到如下结果:

Time: 1465386060000 ms
-------------------------------------------
(hello,2)
(berg,2)
(world,1)

1.3 将HDFS目录下的某些文件内容当做 输入的数据流。

public class SparkStreamingDemo2 {

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
		conf.set("spark.testing.memory", "269522560000");

		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
		System.out.println("jssc: " + jssc);

		// 创建一个DStream, 读取HDFS上的文件,作为数据源。
		JavaDStream lines = jssc.textFileStream("hdfs://master:9000/txt/sparkstreaming/");

		System.out.println("lines : " + lines);

		// Split each line into words
		JavaDStream words = lines.flatMap(new FlatMapFunction() {
			private static final long serialVersionUID = 1L;

			public Iterable call(String x) {
				System.out.println(Arrays.asList(x.split(" ")).get(0));
				return Arrays.asList(x.split(" "));
			}
		});

		// Count each word in each batch
		JavaPairDStream pairs = words.mapToPair(new PairFunction() {
			private static final long serialVersionUID = 1L;

			public Tuple2 call(String s) {
				return new Tuple2(s, 1);
			}
		});
		System.out.println(pairs);
		
		JavaPairDStream wordCounts = pairs.reduceByKey(new Function2() {
			public Integer call(Integer i1, Integer i2) {
				return i1 + i2;
			}
		});

		// Print the first ten elements of each RDD generated in this DStream to the console
		wordCounts.print();
		
		JavaDStream count = wordCounts.count();
		count.print(); // 统计
		
		DStream>  dstream = wordCounts.dstream();
		dstream.saveAsTextFiles("hdfs://master:9000/bigdata/spark/xxxx", "sparkstreaming");
		//wordCounts.dstream().saveAsTextFiles("hdfs://master:9000/bigdata/spark/xxxx", "sparkstreaming");
		
		jssc.start();  
		jssc.awaitTermination();   // Wait for the computation to terminate
	}
}

上述代码完成的操作是,一直监听HDFS即hdfs://master:9000/txt/sparkstreaming/目录下是否有文件存入,如果有,则统计文件中的单词。。。。

尝试运行程序后,然后往该目录中手动添加一个文件,会在控制台看到对该文件内容中的单词统计后的数据。

注意参数的意义:


 public JavaDStream textFileStream(java.lang.String directory)
  Create an input stream that monitors a Hadoop-compatible filesystem for 
             new files and reads them as text 
                      files (using key as LongWritable, value as Text and input format as TextInputFormat).
                 Files must be written to the monitored directory 
                 by "moving" them from another location within the same file system. 
                 File names starting with . are ignored.
 Parameters:
 directory - HDFS directory to monitor for new file
 Returns:
 (undocumented)

感谢各位的阅读,以上就是“Spark Streaming是什么”的内容了,经过本文的学习后,相信大家对Spark Streaming是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!


新闻标题:SparkStreaming是什么
文章路径:http://scpingwu.com/article/giijso.html