对于同样面临使用Flink(v1.16/Scala 2.12)连接多个(>;2)流(可能不同的数据类型)问题的开发人员来说,这个线程应该是一个小的“入门”或起点。对于这个任务,我选择了级联方法(总是一个接一个地连接两个流)或专门开发的“联合类型”。使用MultipleInputStreamOperator Flink提供了一个涵盖此用例的工具。不幸的是,我没有找到任何示例或大量文档。因此,我决定写这篇文章,并提供一小段我的实现。这个例子并不完美,但它应该能让人理解这个想法。
首先,您需要一个扩展MultipleInputStreamOperator和AbstractStreamOperatorV2的类以及一个工厂。
class MultiInputJoinOperator[OUT](parameters: StreamOperatorParameters[OUT], nInputs: Int) (implicit outInfo: TypeInformation[OUT]) extends AbstractStreamOperatorV2[OUT](parameters, nInputs) with MultipleInputStreamOperator[OUT { override def getInputs = util.Arrays.asList(new ExampleInput1(this, 1)) class ExampleInput1 (owner: AbstractStreamOperatorV2[OUT], inputId: Int) extends AbstractInput[InputEvent, OUT](owner: AbstractStreamOperatorV2[OUT], inputId: Int) { override def processElement(element: StreamRecord[InputEvent]): Unit = println("InputEvent1: " + element.getValue.toString) } }
class MultiInputJoinOperatorFactory[OUT] (inputCount: Int) (implicit outInfo: TypeInformation[OUT]) extends AbstractStreamOperatorFactory[OUT] { def createStreamOperator[T <: StreamOperator[OUT]] (parameters: StreamOperatorParameters[OUT]): T = new MultiInputJoinOperator(parameters, inputCount).asInstanceOf[T] def getStreamOperatorClass(classLoader: ClassLoader): Class[_ <: StreamOperator[OUT]] = classOf[MultiInputJoinOperator[OUT]] }
然后您可以按以下方式组合使用它:
val transformOp = new KeyedMultipleInputTransformation[OutputEvent]( "MultipleInputOperator" , new MultiInputJoinOperatorFactory[OutputEvent](1) , Types.of[OutputEvent] //TypeOfOutputEvent , env.getParallelism , Types.of[String] //TypeOfKey ) transformOp.addInput(inputStream.javaStream.getTransformation(),new KeySelector[InputEvent,String] { override def getKey(value: InputEvent): String = value.keyAttribute }) transformOp.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES) env.getJavaEnv.addOperator(transformOp) val connectedStreams = new MultipleConnectedStreams(env.getJavaEnv).transform(transformOp) connectedStreams .name("MultiInputStreamOperation") .map(x => doSomethingWithIt(x)) .print()
我很高兴从您与操作员的经验中获得任何反馈或输入。
诚挚的问候Dominik