RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
结构化Kafkasql的代码框架是怎样的

本篇文章给大家分享的是有关结构化Kafka sql的代码框架是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

创新互联专注于企业成都营销网站建设、网站重做改版、湖南网站定制设计、自适应品牌网站建设、H5技术商城建设、集团公司官网建设、成都外贸网站建设公司、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为湖南等各大城市提供网站开发制作服务。

结构化流的典型应用是持续的读取kafka流。实现机制从SparkSession的readStream开始,readStream就是DataStreamReader:

def readStream: DataStreamReader = new DataStreamReader(self)

下面从DataStreamReader开始。可以想象得到,最终肯定是生成一个RDD来持续读取kafka流数据。

例子:

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

分两步:找到TableProvider;找到SupportRead然后生成StreamingRelationV2。

最后用StreamingRelationV2来调用Dataset.ofRows返回DataFrame,DataFrame就是Dataset[Row]。

下面首先要看看TableProvider接口和SupportRead接口是啥东东。

TableProvider

TableProvider接口未找到在哪里定义。

KafkaSourceRDD

先看看kafkaSourceRDD这个类,这是基础类,最基础的来读取kafka数据的RDD,入参包含一个offsetRange,表示读取kafka数据的区间范围。如果是Kafka.lastest则可以表示永久读取kafka。

既然是RDD,那么最重要的方法就是compute方法了,代码不解析了很简单,就是用Kafka的API来读取kafka分区的数据,形成RDD。

KafkaSource

KafkaSource顾名思义就是Kafka的读取者。

KafkaSource的父类是Source,最重要的方法是:getOffset和getBatch。

getBatch返回DataFrame,那么getBatch又是怎么返回DataFrame的呢?看代码就知道原来是通过创建KafkaSourceRDD来达到生成DataFrame的目的的。所以可以认为KafkaSource是KafkaSourceRDD的一种封装形式罢了。

KafkaSourceProvider

The provider class for all Kafka readers and writers。这个类是用来生成各种各样的Kafka的读取者和写入者的,比较重要,先看看这个类的定义:

private[kafka010] class KafkaSourceProvider extends DataSourceRegister

    with StreamSourceProvider

    with StreamSinkProvider

    with RelationProvider

    with CreatableRelationProvider

    with TableProvider

    with Logging 

继承了很多的特性或接口。比如:StreamSourceProvider、TableProvider、RelationProvider等等。我们这里就看看和读相关的特性吧,和写相关的不看了(道理差不多)。

(1)createSource

createSource方法返回Source,看代码其实返回的是KafkaSource,KafkaSource前面已经说过了,这里就不涉及了。

(2)createRelation

createRelation返回BaseRelation,实际返回的是KafkaRelation。

KafkaRelation继承BaseRelation,重写父 类的buildScan方法,buildScan方法返回KafkaSourceRDD作为RDD[Row]。

(3)KafkaTable

KafkaTable继承Table并且继承SupportsRead特性,其定义:

class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite 

里面辗转反侧看看如何生成ContinuousStream,主要是方法toContinuousStream,返回的ContinuousStream就是KafkaContinuousStream。

(4)KafkaContinuousStream

KafkaContinuousStream继承自ContinuousStream,具体的看代码,最后反正都是调用了Kafka的API来读取数据,所不同的只是外部表现形式的不同罢了。

以上就是结构化Kafka sql的代码框架是怎样的,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。


当前名称:结构化Kafkasql的代码框架是怎样的
文章链接:http://scpingwu.com/article/jcscjs.html