import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object MultiDataStreamExample {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()

    // Configure the Kafka sources: one consumer per topic, sharing the same
    // deserializer and connection properties
    val valueDeserializer: SimpleStringSchema = new SimpleStringSchema()
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    kafkaProps.setProperty("group.id", "test")
    val helloStream: FlinkKafkaConsumer011[String] =
      new FlinkKafkaConsumer011[String]("hello", valueDeserializer, kafkaProps)
    val hello1Stream: FlinkKafkaConsumer011[String] =
      new FlinkKafkaConsumer011[String]("hello1", valueDeserializer, kafkaProps)
    val hello2Stream: FlinkKafkaConsumer011[String] =
      new FlinkKafkaConsumer011[String]("hello2", valueDeserializer, kafkaProps)

    val s1: DataStream[String] = env.addSource(helloStream)
    val s2: DataStream[String] = env.addSource(hello1Stream)
    val s3: DataStream[String] = env.addSource(hello2Stream)

    // union merges streams of the same element type into a single stream
    val allStream: DataStream[String] = s1.union(s2, s3)

    // Word-count logic: split each line into words, pair every word with 1,
    // key by the word, and sum the counts per key
    val flatMapDataStream: DataStream[String] = allStream.flatMap(_.split(" "))
    val mapDataStream: DataStream[(String, Int)] = flatMapDataStream.map((_, 1))
    val keyedStream: KeyedStream[(String, Int), String] = mapDataStream.keyBy(_._1)
    val reduceDataStream: DataStream[(String, Int)] =
      keyedStream.reduce((x, y) => (x._1, x._2 + y._2))

    // Print the running word counts
    reduceDataStream.addSink(x => println(x))

    // Submit the job
    env.execute("word count")
  }
}
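
// A minimal sketch for smoke-testing the same union + word-count pipeline
// without a Kafka cluster: the three consumers are replaced by in-memory
// fromElements sources. The object name and sample strings are illustrative
// additions, not part of the original example.
object MultiDataStreamLocalCheck {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()

    // Three bounded in-memory streams stand in for the three Kafka topics
    val s1 = env.fromElements("hello flink")
    val s2 = env.fromElements("hello kafka")
    val s3 = env.fromElements("hello union")

    // Same pipeline shape as above: union, split, count per word
    s1.union(s2, s3)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .reduce((x, y) => (x._1, x._2 + y._2))
      .print()

    env.execute("word count local check")
  }
}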