我正在尝试为Flink流作业创建一个JUnit测试,该作业将数据写入kafka主题并分别使用FlinkKafkaProducer09和FlinkKafkaConsumer09从相同的kafka主题读取数据.我正在传递产品中的测试数据:
DataStream<String> stream = env.fromElements("tom","jerry","bill");

并检查相同的数据是否来自消费者:

List<String> expected = Arrays.asList("tom","bill");
List<String> result =  resultSink.getResult();
assertEquals(expected,result);

使用TestListResultSink.

我可以通过打印流来查看来自消费者的数据.但无法获得Junit测试结果,因为消费者即使在消息完成后也会继续运行.所以它没有来测试部分.

在Flink或FlinkKafkaConsumer09中以任何方式停止进程或运行特定时间吗?

解决方法

潜在的问题是流媒体程序通常不是有限的并且无限期地运行.

至少在目前,最好的方法是在流中插入一条特殊的控制消息,让源正确终止(只需通过离开读取循环停止读取更多数据).这样Flink就会告诉所有下游操作符,他们可以在消耗完所有数据后停止运营.

或者,您可以在源中引发特殊异常(例如,在一段时间之后),以便您可以区分“正确”终止与故障情况(通过检查错误原因).在源代码中抛出异常将使程序失败.

junit – 如何从程序中停止flink流式传输作业的更多相关文章

  1. 如何使用多对等连接将摄像头从一个iOS设备流式传输到另一个设备

    我们如何在iOS7中使用蓝牙或wifi有效地将摄像头源从一个iOS设备传输到另一个iOS设备.下面是获取流缓冲区的代码.在这里,我们可以获得iOS相机捕获的图像.我们是否可以使用多个对等方将样本缓冲区信息直接发送到另一个设备,或者是否有任何有效的方法将数据传输到其他iOS设备?

  2. 在下载iOS时流式传输视频

    有人知道可能导致这个问题吗?我知道一些文件系统支持“文件大小”和“磁盘大小”作为两个不同的属性…不知道如果iOS有这样的东西?

  3. ios – 如何执行硬件加速的H.264编码和解码流式传输?

    我可以从相机获取RGBA帧数据,我想以H.264格式进行编码.我使用FFmpeg对H.264视频进行编码和解码,但是在640×480的帧大小中,我的需求太慢了.我想使用硬件加速来加快编码和解码,那么我该怎么做呢?此外,我需要能够通过网络流式传输编码的视频,并在另一端进行解码.如何才能做到这一点?

  4. 10分钟后,iOS 8将在后台停止流式传输音频

    我有一个从SHOUTcast服务器播放流音频的应用程序.当应用程序在前台并且禁用自动锁定时,一切正常.该应用程序还可以在后台播放音频,此功能在iOS6和iOS7上一直运行正常.但是现在我的用户报告说,在升级到iOS8后约10分钟后,背景音频会停止.我可以通过在iOS8上运行应用程序自己来重现问题.由于应用程序本身很复杂,所以我做了一个简单的演示来显示问题.我使用Xcode6,BaseSDK设置为iOS8.我已经在我的Info.plist中添加了uibackgroundmodes的音频.有人知道下面的代码有

  5. Swift 不完全函数第 1 部分:如何避免

    实际上,预处理并避免不完全函数能够让我们的程序无论在如何情况下都能可靠地运行。Swift有一个函数precondition就是用来干这个的:测试条件是否满足,并在不满足的情况下触发一个致命错误。事实上在Swift标准库中,几乎每个Swift程序都在间接地使用这种方式,包括了各种和precodition函数类似的断言。不完全函数前置条件会让函数中的某个参数的取值范围缩小为函数签名中指明的范围的一部分。在不完全函数中,已定义的输入值的子集称作已定义域。

  6. Android studio – Faild to resolve:com.android.support:design:26.0.1错误

    我有一个错误叫:我的androidstudio版本是3.0beta1.我的gradle文件如下:我想把“设计”放到我的项目中,但我不能这样做.我该怎么做?解决方法尝试改变和或者不要更改为bulidToolsversion更改依赖项

  7. 在外部蓝牙设备和Android手机之间流式传输音频

    我正在考虑构建一个Android应用程序,允许通过蓝牙与Android手机之间传输音频.我之前在Stackoverflow上读过类似的问题,并且基于我的理解,A2DP不能用作Android不能成为A2DP接收器.从理论上讲,我认为像HFP或HFPAG这样的配置文件应该允许双向音频流.在AndroidAPI中,有没有办法强制蓝牙配对跟随此配置文件,或者OS是否决定使用哪个配置文件?

  8. Android单元测试的正确方法

    我知道这对所有Android开发者来说都是一个乏味的话题.但究竟什么是Android测试的正确方法?解决方法这个问题和我的回答,与Android没有任何关系,但这是一件好事.我略微修改了你的假设,但原理是一样的.>70%单元测试>20%集成测试>10%UI测试应该是70%吗?最后,检查您的UI是否按预期工作.你在这个级别显示什么并不重要.只要在用户未登录您的罚款时显示登录屏幕.这通常被称为TestPyramid,是你所描述的,只是减去了明显的比例.

  9. android – 使用Robolectric测试ActionBarActivity时出错

    我正在使用Robolectric来测试我的应用程序中的活动.该活动扩展了ActionBaractivity.当我测试活动时,我得到错误:“IllegalStateException:你需要在这个活动中使用Theme.AppCompat主题(或后代).”我的测试环境是:AndroidStudio0.8.6,Robolectric2.4-SNAPSHOT,support-v4库版本20.0,appco

  10. android – Galaxy s3上的媒体播放器1004错误

    我有一个视频视图,显示来自网络的内容.当我使用Android4.2玩nexus7时它工作正常,但是当我在GS3上播放时,会返回错误1004IO错误.有谁知道为什么会这样?

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部