- 1.转换算子:
- 案例需求:
- sparkstreaming + kafka 整合 :
- 版本选择:
- 2.spark整合kafka api:
- 查看kafka topic命令:
- sparkstreaming里面: 开发模式:***
- 3.提交offset信息
- kafka消费语义:
- 存储offset:
transform
DStream 和 rdd之间数据进行交互的算子
流处理 数据源:
一个数据来自于 mysql数据/hdfs上文本数据 【量小】 从表/维表
一个数据 来自于 kafka sss 读取形成 DStream数据 【量大】 主业务 =》 主表
案例需求:弹幕 过滤的功能 /黑名单的功能
离线:
弹幕: 主表
不好看
垃圾
男主真帅
女主真好看
666
过滤的弹幕:维表
热巴真丑
鸡儿真美
王鹤棣退出娱乐圈
实时:
sparkstreaming + kafka 整合 :kafka =》 sparkstreaming
版本选择:spark 2.x : kafka版本: 0.8 0.10.0 or higher ok
spark 3.x =>kafka : 1.kafka版本: 0.10.0 or higher ok
spark 去kafka读取数据的方式:
1.kafka 0.8 reciver方式读取kafka数据 【效率低 、代码开发复杂】
2.kafka 0.10.0版本之后 direct stream的方式加载kafka数据 【效率高、代码开发简单】
kafka:
版本也有要求: 0.11.0 版本之后
交付语义: consumer producer
producer 默认就是精准一次
consumer 交付语义取决于 consumer 框架本身
交付语义: consumer
至多一次 数据丢失问题
至少一次 数据不会丢失,数据重复消费
精准一次 数据不会丢失 数据也不会重复消费
spark 整合kafka 版本 0.10.0版本之后:
1.kafka 0.11.0之后 2.2.1 =>direct stream
2.sparkstreaming 默认消费kafka数据 交付语义:
至少一次
- spark消费kafka, DStream 【rdd 分区数】 =》 kafka topic 分区数 是一一对应的
1:1 correspondence between Kafka partitions and Spark partitions,
1.simple API =》 过时不用了
- new Kafka consumer API 整合 kafka 主流
3.引入依赖: org.apache.spark spark-streaming-kafka-0-10_2.12 3.2.1
!!!不需要引入 kafka-clients 依赖
查看kafka topic命令:kafka-topics.sh --list
–zookeeper bigdata32:2181,bigdata33:2181,bigdata34:2181/kafka
kafka-topics.sh --create
–zookeeper bigdata32:2181,bigdata33:2181,bigdata34:2181/kafka
–topic spark-kafka01 --partitions 3 --replication-factor 1
producer:
kafka-console-producer.sh
–broker-list bigdata33:9092,bigdata34:9092
–topic spark-kafka01
consumer:
kafka-console-consumer.sh
–bootstrap-server bigdata33:9092,bigdata34:9092
–topic spark-kafka
–from-beginning
val kafkaParams = Map[String, Object](
“bootstrap.servers” ->“bigdata33:9092,bigdata34:9092”,
“key.deserializer” ->classOf[StringDeserializer],
“value.deserializer” ->classOf[StringDeserializer],
“group.id” ->“dl2262_01”,
“auto.offset.reset” ->“latest”,
“enable.auto.commit” ->(false: java.lang.Boolean)
)
需求:
消费kafka数据 wc 将 结果写到 mysql里面
input
todo
output
kafka =>spark =>mysql 链路打通了
模拟:spark作业挂掉 =》 重启
“消费完kafka的数据 程序重启之后接着从上次消费的位置接着消费 ”
目前: code不能满足
1.目前代码 这两个参数 不能动
“auto.offset.reset” ->“earliest”
“enable.auto.commit” ->(false: java.lang.Boolean)
2.主要原因 : spark作业 消费kafka数据:
1.获取kafka offset =》 处理kafka数据 =》 “提交offset的操作” 没有
解决:
1.获取kafka offset // todo
2. 处理kafka数据
3.提交offset的操作 // todo
1.获取kafka offset // todo
1. kafka offset 信息
2.spark rdd分区数 和 kafka topic 的分区数 是不是 一对一
报错:
org.apache.spark.rdd.ShuffledRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges
ShuffledRDD =》 HasOffsetRanges 说明 代码有问题
sparkstreaming里面: 开发模式:*** 1.获取kafka 流数据
2. 流 Dstream =》 调用foreachRDD算子 进行输出:
0.获取offset 信息
1.做业务逻辑
2.结果数据输出
3.提交offset信息
offset解释:
01 batch:
rdd的分区数:3
topic partition fromOffset untilOffset
spark-kafka01 0 0 1
spark-kafka01 1 0 1
spark-kafka01 2 0 0
02 batch:
rdd的分区数:3
topic partition fromOffset untilOffset
spark-kafka01 0 1 1
spark-kafka01 1 1 1
spark-kafka01 2 0 0
此时 kafka 里面数据已经消费完了 fromOffset=untilOffset
3.提交offset信息2.存储offset信息
spark流式处理 默认消费语义 : 至少一次
精准一次:
1.output + offset 同时完成
1.生产上Checkpoints不能用
2.Kafka itself =》至少一次
推荐使用 =》 简单 高效
90% 都可以解决 10% 精准一次
3.Your own data store: =》 开发大量代码 =》
mysql、redis、hbase、
至少一次
精准一次
mysql:
获取offset
todo
output
提交offset
spark作业挂了 =》 启动spark作业 :
1.从mysql里面获取offset
todo
output
提交offset
1.至多一次 【丢数据】
2.至少一次 【不会丢数据 可能会重复消费数据】
3.精准一次 【不丢、不重复消费】
offset信息提交 :
1.spark todo :
至少一次:
1 2 3 4
offset get
业务逻辑 output db
提交offset
精准一次:output + 提交offset 一起发生 =》 事务来实现
事务: 一次操作要么成功 要么失败
topic partition fromOffset untilOffset
spark-kafka01 0 3 3
spark-kafka01 2 2 2
spark-kafka01 1 2 2
kafka 本身:
offset 信息存储在哪?
kafka 某个topic下:
__consumer_offsets =》 spark作业 消费kafka的offset信息
topic offset 信息存储的地方
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
网页题目:【SparkStreaming-创新互联
分享路径:http://scpingwu.com/article/dpgepc.html