uncategorized

Kafka Consumer offset reset ํ•˜๊ธฐ

date: 2018-11-22 16:25:22

๊ทธ๋ฃน ์ง€์ •ํ•˜์—ฌ ํ† ํ”ฝ ์ˆ˜์‹ 

1
2
3
4
5
6
7
8
9
10
11
12
 โœ˜ jake.ko@jakekoui-MacBook-Pro ๎‚ฐ ~ ๎‚ฐ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --group testGroup --from-beginning
aaa
bbb
ccc
```

## ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน๋ณ„ offset ์ƒํƒœ ํ™•์ธ
```bash
โœ˜ jake.ko@jakekoui-MacBook-Pro ๎‚ฐ ~ ๎‚ฐ kafka-consumer-groups --bootstrap-server localhost:9092 --group testGroup --describe

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 3 3 0 consumer-1-39ea110a-7f65-4ec3-8a9d-22c82d8be469 /172.26.113.148 consumer-1
  • TOPIC: ํ† ํ”ฝ ์ด๋ฆ„
  • PARTITION: consumer group ๋‚ด์˜ ๊ฐ consumer๊ฐ€ ํ• ๋‹น๋œ ํŒŒํ‹ฐ์…˜ ๋ฒˆํ˜ธ
  • CURRENT-OFFSET: ํ˜„์žฌ consumer group์˜ consumer๊ฐ€ ๊ฐ ํŒŒํ‹ฐ์…˜์—์„œ ๋งˆ์ง€๋ง‰์œผ๋กœ offset์„ commitํ•œ ๊ฐ’
  • LOG-END-OFFSET: producer์ชฝ์—์„œ ๋งˆ์ง€๋ง‰์œผ๋กœ ์ƒ์„ฑํ•œ ๋ ˆ์ฝ”๋“œ์˜ offset
  • LAG: LOG-END-OFFSET์—์„œ CURRENT-OFFSET๋ฅผ ๋บ€ ๊ฐ’.

์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์˜ offset reset

1
kafka-consumer-groups --bootstrap-server localhost:9092 --group testGroup --topic test --reset-offsets --to-earliest --execute
  • โ€“topic ๋Œ€์‹  โ€“all-topics๋ฅผ ์ง€์ •ํ•˜๋ฉด ๋ชจ๋“  ํ† ํ”ฝ์— ๋Œ€ํ•ด์„œ ์‹คํ–‰์ด ๊ฐ€๋Šฅํ•˜๋‹ค.
  • โ€“execute ์˜ต์…˜์„ ์ œ๊ฑฐํ•˜๊ณ , โ€“dry-run์˜ต์…˜์œผ๋กœ ์‹คํ–‰ํ•˜๋ฉด ์‹ค์ œ ๋ฐ˜์˜๋˜์ง€ ์•Š๊ณ  ์–ด๋–ป๊ฒŒ ๋ณ€ํ• ์ง€ ๊ฒฐ๊ณผ๋งŒ ์ถœ๋ ฅํ•˜๋Š” dry run์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

    1
    2
    3
    4
     jake.ko@jakekoui-MacBook-Pro ๎‚ฐ ~ ๎‚ฐ kafka-consumer-groups --bootstrap-server localhost:9092 --group testGroup --topic test --reset-offsets --to-earliest --dry-run

    TOPIC PARTITION NEW-OFFSET
    test 0 0
  • ์ฃผ์˜์‚ฌํ•ญ: ํ•ด๋‹น ๊ทธ๋ฃน์˜ ์ปจ์Šˆ๋จธ๋ฅผ ๋ฉˆ์ถ”๊ณ  ๋ฆฌ์…‹ํ•ด์•ผ ํ•œ๋‹ค.(consumer group์ด ์‹คํ–‰์ค‘์ธ ์ƒํƒœ์— offset reset์„ ์ง„ํ–‰ํ•˜๋Š” ๊ฒฝ์šฐ reset์€ ์‹คํŒจํ•œ๋‹ค.)

    1
    2
    3
    4
    5
     jake.ko@jakekoui-MacBook-Pro ๎‚ฐ ~ ๎‚ฐ kafka-consumer-groups --bootstrap-server localhost:9092 --group testGroup --topic test --reset-offsets --to-earliest --dry-run
    Error: Assignments can only be reset if the group 'testGroup' is inactive, but the current state is Stable.

    TOPIC PARTITION NEW-OFFSET
    jake.ko@jakekoui-MacBook-Pro ๎‚ฐ ~ ๎‚ฐ

์ƒ˜ํ”Œ

๊ธฐ์กด์˜ ํ”„๋กœ๋“€์„œ๊ฐ€ ์ง€์†์ ์œผ๋กœ ๋ฉ”์‹œ์ง€ ์ƒ์‚ฐ

1
2
3
4
5
6
7
jake.ko@jakekoui-MacBook-Pro ๎‚ฐ ~ ๎‚ฐ kafka-console-producer --broker-list localhost:9092 --topic test
>aaa
>bbb
>ccc
>ddd
>eee
>

์ค‘๊ฐ„๋ถ€ํ„ฐ ์ปจ์Š˜

1
2
3
โœ˜ jake.ko@jakekoui-MacBook-Pro ๎‚ฐ ~ ๎‚ฐ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --group testGroup
ddd
eee

๋ฆฌ์…‹ ํ›„ ์ฒ˜์Œ๋ถ€ํ„ฐ ์ปจ์Š˜ํ•˜๋„๋ก ๋ณ€๊ฒฝ

1
2
3
4
5
jake.ko@jakekoui-MacBook-Pro ๎‚ฐ ~ ๎‚ฐ kafka-consumer-groups --bootstrap-server localhost:9092 --group testGroup --topic test --reset-offsets --to-earliest --execute

TOPIC PARTITION NEW-OFFSET
test 0 0
jake.ko@jakekoui-MacBook-Pro ๎‚ฐ ~ ๎‚ฐ

์ปจ์Š˜ ์žฌ๊ฐœ ํ›„ ๊ฒฐ๊ณผ (offset ๋ฆฌ์…‹๋˜์–ด, ์ฒ˜์Œ๋ถ€ํ„ฐ ์ปจ์Š˜ํ•œ๋‹ค.)

1
2
3
4
5
6
 โœ˜ jake.ko@jakekoui-MacBook-Pro ๎‚ฐ ~ ๎‚ฐ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --group testGroup
aaa
bbb
ccc
ddd
eee

์˜คํ”„์…‹์˜ ์œ„์น˜๋ฅผ ์žฌ์„ค์ •ํ•˜๊ธฐ ์œ„ํ•œ ์•„๋ž˜์™€๊ฐ™์€ ์ƒ์„ธ ์˜ต์…˜๋“ค์ด ์žˆ๋‹ค.

  • โ€“shift-by <Long: number-of-offsets> ํ˜•์‹ (+/- ๋ชจ๋‘ ๊ฐ€๋Šฅ)
  • โ€“to-offset <Long: offset>
  • โ€“to-current
  • โ€“by-duration <String: duration> : ํ˜•์‹ โ€˜PnDTnHnMnSโ€™
  • โ€“to-datetime <String: datetime> : ํ˜•์‹ โ€˜YYYY-MM-DDTHH:mm:SS.sssโ€™
  • โ€“to-latest
  • โ€“to-earliest

โ€“to-datetime์˜ ๊ฒฝ์šฐ kafka์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์–ด์„œ ๋‹ค๋ฅธ๊ณณ์— ์ €์žฅํ•˜๋Š” ์ค‘์— ๋ฐ์ดํ„ฐ ์œ ์‹ค ๋˜๋Š” ์ค‘๋ณต write ๋“ฑ์ด ๋ฐœ์ƒํ•œ ๊ฒฝ์šฐ์— ๋‚ ์งœ ๋‹จ์œ„๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค์‹œ ๋ถˆ๋Ÿฌ์™€์„œ ์žฌ์ฒ˜๋ฆฌํ•˜๊ณ  ์‹ถ์€ ๊ฒฝ์šฐ ๋งค์šฐ ์œ ์šฉํ•˜๋‹ค.

CLI๊ฐ€ ์•„๋‹Œ Java ์ฝ”๋“œ๋กœ offset resetํ•˜๊ธฐ

Kafka์˜ ๊ฒฝ์šฐ ์‚ฌ์šฉ ํ˜•ํƒœ์— ๋”ฐ๋ผ Consumer API์™€ Stream API ๋‘๊ฐ€์ง€๋ฅผ ์ œ๊ณตํ•œ๋‹ค.

  • Consumer API
    • ๊ฐœ๋ณ„ ์ด๋ฒคํŠธ ๋‹จ์œ„์˜ low level ์ฒ˜๋ฆฌ๊ฐ€ ํ•„์š”ํ•œ ๊ฒฝ์šฐ
    • datetime, offset ์„ ์ง€์ •ํ•ด์„œ ์›ํ•˜๋Š” ๋Œ€๋กœ reset ๊ฐ€๋Šฅ
  • Stream API
    • Stream processing์ด ํ•„์š”ํ•œ ๊ฒฝ์šฐ
    • offset reset ๊ธฐ๋Šฅ ์—†์Œ (์œ„์—์„œ ์–ธ๊ธ‰ํ•œ CLI tool์„ ์‚ฌ์šฉํ•ด์•ผํ•จ)

Consumer API๋ฅผ ์‚ฌ์šฉํ• ๋•Œ Java์ฝ”๋“œ ๋ ˆ๋ฒจ์—์„œ programmaticalํ•˜๊ฒŒ offset์„ ๋ฆฌ์…‹ํ•˜๋Š” ๋ฐฉ๋ฒ•์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.
๋จผ์ € KafkaConsumer๊ฐ€ ์ƒ์„ฑํ•œ ํ›„์—

1
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties, keyDeser, valueDeser);

consumer loop์— ์ง„์ž…ํ•˜์—ฌ consumer.poll()์„ ๋ถ€๋ฅด๊ธฐ ์ „์—, ์ƒ์„ฑ๋œ consumer ๊ฐ์ฒด์— ๋Œ€ํ•ด offset์„ ๋ณ€๊ฒฝํ•˜๋Š” ๋‹ค์Œ ํ•จ์ˆ˜๋“ค์„ ํ˜ธ์ถœํ•˜์—ฌ offset์„ ์›ํ•˜๋Š” ๋Œ€๋กœ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

  • seekToBeginning: earliest๋กœ reset
  • seekToEnd: latest๋กœ reset
  • seek : ์ง€์ • offset์œผ๋กœ reset
  • offsetsForTimes: datetime ๊ธฐ์ค€์œผ๋กœ reset

reference

tags: kafka, offset

Share