Flink消费Kafka报错:NoOffsetForPartitionException: Undefined offset with no reset policy for partitions

Flink-1.14.2

Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test-0, test-1]
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetMissingPositions(SubscriptionState.java:621)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2343)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1725)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:375)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:260)
    at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    ... 6 more


解决方法:

方法一:

使用命令手动创建一个消费组 ./kafka-console-consumer.sh –bootstrap-server 127.0.0.1:9092 –group <group> –topic <topic>

方法二:

在kafka的connector配置中,显示指定scan.startup.mode=latest-offset

参考:

https://github.com/apache/flink/pull/17588