Kafka: Consuming Kafka from Spark

 

With the newer consumer API (the spark-streaming-kafka-0-10 integration), a direct stream is created like this:

// Imports required by the spark-streaming-kafka-0-10 integration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Broker list and topic are read from an application properties file
val brokers = properties.getProperty("kafka.host.list")
val topics  = Set(properties.getProperty("kafka.application.topic"))
val kafkaParams = Map[String, String](
  "bootstrap.servers"       -> brokers,
  "group.id"                -> "ntaflowgroup",
  "auto.commit.interval.ms" -> "1000",
  "key.deserializer"        -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer"      -> "org.apache.kafka.common.serialization.StringDeserializer",
  "auto.offset.reset"       -> "latest",
  "enable.auto.commit"      -> "true"
)
// Direct stream: executors consume the assigned partitions themselves,
// with PreferConsistent spreading partitions evenly across executors
val ntaflowCache: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
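The consumer configuration above is just an ordinary key/value map assembled from a properties file; no Spark dependency is needed to build or inspect it. The sketch below mirrors that step in plain Scala (the property name `kafka.host.list` is an assumption carried over from the snippet, and the helper object is hypothetical):

```scala
import java.util.Properties

object KafkaParamsSketch {
  // Build the same consumer-parameter map the snippet passes to
  // KafkaUtils.createDirectStream, from a java.util.Properties object.
  // "kafka.host.list" is an assumed application-specific property name.
  def kafkaParams(properties: Properties): Map[String, String] = {
    val brokers = properties.getProperty("kafka.host.list")
    Map(
      "bootstrap.servers"       -> brokers,
      "group.id"                -> "ntaflowgroup",
      "auto.commit.interval.ms" -> "1000",
      "key.deserializer"        -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer"      -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset"       -> "latest",
      "enable.auto.commit"      -> "true"
    )
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("kafka.host.list", "broker1:9092,broker2:9092")
    val params = kafkaParams(props)
    // The broker list from the properties file ends up as bootstrap.servers
    assert(params("bootstrap.servers") == "broker1:9092,broker2:9092")
    assert(params("enable.auto.commit") == "true")
  }
}
```

Because `enable.auto.commit` is `"true"` with a 1-second interval, offsets are committed by the Kafka client in the background; exactly-once processing would instead require disabling auto-commit and committing offsets after output.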

 

 

 
