这期内容当中小编将会给大家带来有关ApacheFlink中Flink数据流编程是怎样的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
目前累计服务客户上1000+,积累了丰富的产品开发及服务经验。以网站设计水平和技术实力,树立企业形象,为客户提供成都网站设计、网站制作、网站策划、网页设计、网络营销、VI设计、网站改版、漏洞修补等服务。成都创新互联公司始终以务实、诚信为根本,不断创新和提高建站品质,通过对领先技术的掌握、对创意设计的研究、对客户形象的视觉传递、对应用系统的结合,为客户提供更好的一站式互联网解决方案,携手广大客户,共同发展进步。
数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFile(path) readFile(),当然,也可以写一个自定义的数据源(可以通过实现SourceFunction方法,但是无法并行执行。或者实现可以并行实现的接口ParallelSourceFunction或者继承RichParallelSourceFunction)
入门
首先做一个简单入门,建立一个DataStreamSourceApp
Scala
object DataStreamSourceApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment socketFunction(env) env.execute("DataStreamSourceApp") } def socketFunction(env: StreamExecutionEnvironment): Unit = { val data=env.socketTextStream("192.168.152.45", 9999) data.print() } }
这个方法将会从socket中读取数据,因此我们需要在192.168.152.45中开启服务:
nc -lk 9999
然后运行DataStreamSourceApp,在服务器上输入:
iie4bu@swarm-manager:~$ nc -lk 9999 apache flink spark
在控制台中也会输出:
3> apache 4> flink 1> spark
前面的 341表示的是并行度。可以通过设置setParallelism来操作:
data.print().setParallelism(1)
Java
public class JavaDataStreamSourceApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); socketFunction(environment); environment.execute("JavaDataStreamSourceApp"); } public static void socketFunction(StreamExecutionEnvironment executionEnvironment){ DataStreamSourcedata = executionEnvironment.socketTextStream("192.168.152.45", 9999); data.print().setParallelism(1); } }
自定义添加数据源方式
Scala
实现SourceFunction接口
这种方式不能并行处理。
新建一个自定义数据源
class CustomNonParallelSourceFunction extends SourceFunction[Long]{ var count=1L var isRunning = true override def run(ctx: SourceFunction.SourceContext[Long]): Unit = { while (isRunning){ ctx.collect(count) count+=1 Thread.sleep(1000) } } override def cancel(): Unit = { isRunning = false } }
这个方法首先定义一个初始值count=1L,然后执行的run方法,方法主要是输出count,并且执行加一操作,当执行cancel方法时结束。调用方法如下:
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // socketFunction(env) nonParallelSourceFunction(env) env.execute("DataStreamSourceApp") } def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = { val data=env.addSource(new CustomNonParallelSourceFunction()) data.print() }
输出结果就是控制台一直输出count值。
无法设置并行度,除非设置并行度是1.
val data=env.addSource(new CustomNonParallelSourceFunction()).setParallelism(3)
那么控制台报错:
Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55) at com.vincent.course05.DataStreamSourceApp$.nonParallelSourceFunction(DataStreamSourceApp.scala:16) at com.vincent.course05.DataStreamSourceApp$.main(DataStreamSourceApp.scala:11) at com.vincent.course05.DataStreamSourceApp.main(DataStreamSourceApp.scala)
继承ParallelSourceFunction方法
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction} class CustomParallelSourceFunction extends ParallelSourceFunction[Long]{ var isRunning = true var count = 1L override def run(ctx: SourceFunction.SourceContext[Long]): Unit = { while(isRunning){ ctx.collect(count) count+=1 Thread.sleep(1000) } } override def cancel(): Unit = { isRunning=false } }
方法的功能跟上面是一样的。main方法如下:
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // socketFunction(env) // nonParallelSourceFunction(env) parallelSourceFunction(env) env.execute("DataStreamSourceApp") } def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = { val data=env.addSource(new CustomParallelSourceFunction()).setParallelism(3) data.print() }
可以设置并行度3,输出结果如下:
2> 1 1> 1 2> 1 2> 2 3> 2 3> 2 3> 3 4> 3 4> 3
继承RichParallelSourceFunction方法
class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] { var isRunning = true var count = 1L override def run(ctx: SourceFunction.SourceContext[Long]): Unit = { while (isRunning) { ctx.collect(count) count += 1 Thread.sleep(1000) } } override def cancel(): Unit = { isRunning = false } }
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // socketFunction(env) // nonParallelSourceFunction(env) // parallelSourceFunction(env) richParallelSourceFunction(env) env.execute("DataStreamSourceApp") } def richParallelSourceFunction(env: StreamExecutionEnvironment): Unit = { val data = env.addSource(new CustomRichParallelSourceFunction()).setParallelism(3) data.print() }
Java
实现SourceFunction接口
import org.apache.flink.streaming.api.functions.source.SourceFunction; public class JavaCustomNonParallelSourceFunction implements SourceFunction{ boolean isRunning = true; long count = 1; @Override public void run(SourceFunction.SourceContext ctx) throws Exception { while (isRunning) { ctx.collect(count); count+=1; Thread.sleep(1000); } } @Override public void cancel() { isRunning=false; } }
public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // socketFunction(environment); nonParallelSourceFunction(environment); environment.execute("JavaDataStreamSourceApp"); } public static void nonParallelSourceFunction(StreamExecutionEnvironment executionEnvironment){ DataStreamSource data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction()); data.print().setParallelism(1); }
当设置并行度时:
DataStreamSource data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction()).setParallelism(2);
那么报错异常:
Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55) at com.vincent.course05.JavaDataStreamSourceApp.nonParallelSourceFunction(JavaDataStreamSourceApp.java:16) at com.vincent.course05.JavaDataStreamSourceApp.main(JavaDataStreamSourceApp.java:10)
实现ParallelSourceFunction接口
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; public class JavaCustomParallelSourceFunction implements ParallelSourceFunction{ boolean isRunning = true; long count = 1; @Override public void run(SourceContext ctx) throws Exception { while (isRunning) { ctx.collect(count); count+=1; Thread.sleep(1000); } } @Override public void cancel() { isRunning=false; } }
public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // socketFunction(environment); // nonParallelSourceFunction(environment); parallelSourceFunction(environment); environment.execute("JavaDataStreamSourceApp"); } public static void parallelSourceFunction(StreamExecutionEnvironment executionEnvironment){ DataStreamSource data = executionEnvironment.addSource(new JavaCustomParallelSourceFunction()).setParallelism(2); data.print().setParallelism(1); }
可以设置并行度,输出结果:
1 1 2 2 3 3 4 4 5 5
继承抽象类RichParallelSourceFunction
public class JavaCustomRichParallelSourceFunction extends RichParallelSourceFunction{ boolean isRunning = true; long count = 1; @Override public void run(SourceContext ctx) throws Exception { while (isRunning) { ctx.collect(count); count+=1; Thread.sleep(1000); } } @Override public void cancel() { isRunning=false; } }
public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // socketFunction(environment); // nonParallelSourceFunction(environment); // parallelSourceFunction(environment); richpParallelSourceFunction(environment); environment.execute("JavaDataStreamSourceApp"); } public static void richpParallelSourceFunction(StreamExecutionEnvironment executionEnvironment){ DataStreamSource data = executionEnvironment.addSource(new JavaCustomRichParallelSourceFunction()).setParallelism(2); data.print().setParallelism(1); }
输出结果:
1 1 2 2 3 3 4 4 5 5 6 6
SourceFunction ParallelSourceFunction RichParallelSourceFunction类之间的关系
上述就是小编为大家分享的ApacheFlink中Flink数据流编程是怎样的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注创新互联行业资讯频道。
分享标题:ApacheFlink中Flink数据流编程是怎样的
文章地址:http://scpingwu.com/article/gigcjs.html