宝塔服务器面板,一键全能部署及管理,送你10850元礼包,点我领取

这篇文章主要介绍“Dstream的创建方法”,在日常操作中,相信很多人在Dstream的创建方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Dstream的创建方法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

1. RDD队列(了解)

测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。

案例

object SparkStreaming02_RDDQueue {
  def main(args: Array[String]): Unit = {
    //创建配置文件对象
    val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]")
    //创建SparkStreaming上下文环境对象
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))
    //创建队列,里面放的是RDD
    val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
    //从队列中采集数据,获取DS
    val queueDS: InputDStream[Int] = ssc.queueStream(rddQueue,false)
    //处理采集到的数据
    val resDS: DStream[(Int, Int)] = queueDS.map((_,1)).reduceByKey(_+_)
    //打印结果
    resDS.print()
    //启动采集器
    ssc.start()
    //循环创建RDD,并将创建的RDD放到队列里
    for( i <- 1 to 5){
      rddQueue.enqueue(ssc.sparkContext.makeRDD(6 to 10))
      Thread.sleep(2000)
    }
    ssc.awaitTermination()
  }
}

2. 自定义数据源(某些场景需要自定义)

需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。

用一个案例来说明

/**
  * Author: Felix
  * Date: 2020/5/20
  * Desc: 通过自定义数据源方式创建DStream
  *     模拟从指定的网络端口获取数据
  */
object SparkStreaming03_CustomerReceiver {
  def main(args: Array[String]): Unit = {
    //创建配置文件对象
    val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]")

    //创建SparkStreaming上下文环境对象
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))

    //通过自定义数据源创建Dstream
    val myDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver("hadoop202",9999))

    //扁平化
    val flatMapDS: DStream[String] = myDS.flatMap(_.split(" "))

    //结构转换  进行计数
    val mapDS: DStream[(String, Int)] = flatMapDS.map((_,1))

    //聚合
    val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)

    //打印输出
    reduceDS.print

    ssc.start()

    ssc.awaitTermination()
  }
}

//Receiver[T]  泛型表示的是 读取的数据类型
class MyReceiver(host: String,port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
  private var socket: Socket = _

  // 真正的处理接收数据的逻辑
  def receive() {
    try {
      //创建连接
      socket = new Socket(host,port)
      //根据连接对象获取输入流
      val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8))
      //定义一个变量,用于接收读取到的一行数据
      var input:String = null
      while((input = reader.readLine())!= null){
        store(input)
      }
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return
    } finally {
      onStop()
    }
  }
  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  override def onStop(): Unit = {
    synchronized {
      if (socket != null) {
        socket.close()
        socket = null
      }
    }
  }
}

3. kafka数据源(重要)

1. 版本选型

Dstream的创建方法-冯金伟博客园

2. Kafka 0-8 Receive模式

  1. 需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

  2. 导入依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
  1. 编写代码 0-8Receive模式,offset维护在zk中,程序停止后,继续生产数据,再次启动程序,仍然可以继续消费。可通过get /consumers/bigdata/offsets/主题名/分区号 查看

object Spark04_ReceiverAPI {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]")

    //2.创建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //3.使用ReceiverAPI读取Kafka数据创建DStream
    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,
      "hadoop202:2181,hadoop203:2181,hadoop204:2181",
      "bigdata",
      //v表示的主题的分区数
      Map("mybak" -> 2))

    //4.计算WordCount并打印        new KafkaProducer[String,String]().send(new ProducerRecord[]())
    val lineDStream: DStream[String] = kafkaDStream.map(_._2)
    val word: DStream[String] = lineDStream.flatMap(_.split(" "))
    val wordToOneDStream: DStream[(String, Int)] = word.map((_, 1))
    val wordToCountDStream: DStream[(String, Int)] = wordToOneDStream.reduceByKey(_ + _)
    wordToCountDStream.print()

    //5.开启任务
    ssc.start()
    ssc.awaitTermination()
  }
}

3. Kafka 0-8 Direct模式

  1. 需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

  2. 导入依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
  1. 编写代码(自动维护offset1)

offset维护在checkpoint中,但是获取StreamingContext的方式需要改变,目前这种方式会丢失消息

object Spark05_DirectAPI_Auto01 {

  def main(args: Array[String]): Unit = {

    //1.创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Spark05_DirectAPI_Auto01").setMaster("local[*]")

    //2.创建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")

    //3.准备Kafka参数
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )

    //4.使用DirectAPI自动维护offset的方式读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
      kafkaParams,
      Set("mybak"))

    //5.计算WordCount并打印
    kafkaDStream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    //6.开启任务
    ssc.start()
    ssc.awaitTermination()
  }
}
  1. 编写代码(自动维护offset2)

offset维护在checkpoint中,获取StreamingContext为getActiveOrCreate

这种方式缺点:

  • checkpoint小文件过多

  • checkpoint记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次

object Spark06_DirectAPI_Auto02 {

  def main(args: Array[String]): Unit = {
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("D:\\dev\\workspace\\my-bak\\spark-bak\\cp", () => getStreamingContext)

    ssc.start()
    ssc.awaitTermination()
  }

  def getStreamingContext: StreamingContext = {
    //1.创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("DirectAPI_Auto01").setMaster("local[*]")
    
    //2.创建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
    
    //3.准备Kafka参数
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )
    
    //4.使用DirectAPI自动维护offset的方式读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
      kafkaParams,
      Set("mybak"))
    
    //5.计算WordCount并打印
    kafkaDStream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    
    //6.返回结果
    ssc
  }
}
  1. 编写代码(手动维护offset)

object Spark07_DirectAPI_Handler {

  def main(args: Array[String]): Unit = {

    //1.创建SparkConf
    val conf: SparkConf = new SparkConf().setAppName("DirectAPI_Handler").setMaster("local[*]")

    //2.创建StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))

    //3.创建Kafka参数
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )

    //4.获取上一次消费的位置信息
    val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long](
      TopicAndPartition("mybak", 0) -> 13L,
      TopicAndPartition("mybak", 1) -> 10L
    )

    //5.使用DirectAPI手动维护offset的方式消费数据
    val kafakDStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc,
      kafkaParams,
      fromOffsets,
      (m: MessageAndMetadata[String, String]) => m.message())

    //6.定义空集合用于存放数据的offset
    var offsetRanges = Array.empty[OffsetRange]

    //7.将当前消费到的offset进行保存
    kafakDStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      for (o <- offsetRanges) {
        println(s"${o.fromOffset}-${o.untilOffset}")
      }
    }

    //8.开启任务
    ssc.start()
    ssc.awaitTermination()

  }
}

4. Kafka 0-10 Direct模式

  1. 需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

  2. 导入依赖,为了避免和0-8冲突,我们新建一个module演示

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

3)编写代码

object Spark01_DirectAPI010 {
  def main(args: Array[String]): Unit = {

    //1.创建SparkConf
    val conf: SparkConf = new SparkConf().setAppName("DirectAPI010").setMaster("local[*]")

    //2.创建StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))

    //3.构建Kafka参数
    val kafkaParmas: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata191122",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    //4.消费Kafka数据创建流
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParmas))

    //5.计算WordCount并打印
    kafkaDStream.map(_.value())
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    //6.启动任务
    ssc.start()
    ssc.awaitTermination()

  }

}

5. 消费Kafka数据模式总结

  1. 0-8 ReceiverAPI:

  • 1)专门的Executor读取数据,速度不统一

  • 2)跨机器传输数据,WAL

  • 3)Executor读取数据通过多个线程的方式,想要增加并行度,则需要多个流union

  • 4)offset存储在Zookeeper中

  1. 0-8 DirectAPI:

  • 1)Executor读取数据并计算

  • 2)增加Executor个数来增加消费的并行度

  • 3)offset存储

    • a)CheckPoint(getActiveOrCreate方式创建StreamingContext)

    • b)手动维护(有事务的存储系统)

    • c)获取offset必须在第一个调用的算子中:offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  1. 0-10 DirectAPI:

  • 1)Executor读取数据并计算

  • 2)增加Executor个数来增加消费的并行度

  • 3)offset存储

    • i.a.__consumer_offsets系统主题中

    • ii.b.手动维护(有事务的存储系统)