Kafka - Consuming Kafka from Spark

In the newer versions of the API (the spark-streaming-kafka-0-10 integration), the consumer is created as a direct stream:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// `properties` (java.util.Properties) and `ssc` (StreamingContext) are assumed
// to be initialized elsewhere in the application.
val brokers = properties.getProperty("kafka.host.list")
val topics  = Set(properties.getProperty("kafka.application.topic"))
val kafkaParams = Map[String, String](
  "bootstrap.servers"       -> brokers,
  "group.id"                -> "ntaflowgroup",
  "auto.commit.interval.ms" -> "1000",
  "key.deserializer"        -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer"      -> "org.apache.kafka.common.serialization.StringDeserializer",
  "auto.offset.reset"       -> "latest",
  "enable.auto.commit"      -> "true"
)
val ntaflowCache: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
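
To actually run the job, the resulting DStream needs an output operation, and the StreamingContext must be started. A minimal driver sketch, not from the original post; the per-batch handling below (counting records) is a placeholder:

// Extract message payloads and process each micro-batch.
ntaflowCache
  .map(record => record.value)   // ConsumerRecord[String, String] -> message value
  .foreachRDD { rdd =>
    // Placeholder action; replace with real processing.
    println(s"records in this batch: ${rdd.count()}")
  }

ssc.start()              // begin consuming
ssc.awaitTermination()   // block until the streaming job terminates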
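
A note on the settings above: with enable.auto.commit set to "true", the Kafka client commits offsets on the auto.commit.interval.ms timer, which can happen before a batch has actually been processed, so a crash may lose or replay records. The 0-10 integration also supports committing offsets manually after each batch completes; a sketch of that pattern, assuming enable.auto.commit is changed to "false":

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

ntaflowCache.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  // Commit only after the batch has succeeded.
  ntaflowCache.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}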