代码拉取完成,页面将自动刷新
import com.jfbank.ai.consumer.util.ShutdownCallback;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* @param <K> message key type
* @param <V> message value type
*/
@Slf4j
public class KafkaCusumerThread<K, V> extends Thread implements Closeable {
private final KafkaConsumer<K, V> consumer;
private List<String> topics;
private ConsumerCallback<ConsumerRecord<K, V>> consumerCallback;
public KafkaCusumerThread(KafkaConsumer<K, V> consumer, List<String> topics, ConsumerCallback<ConsumerRecord<K, V>> consumerCallback) {
ShutdownCallback.register(this);
this.consumer = consumer;
this.topics = topics;
this.consumerCallback = consumerCallback;
}
private boolean flag = true;
private ConsumerRecords<K, V> records;
@Override
public void run() {
consumer.subscribe(topics);
while (flag) {
try {
records = consumer.poll(200);
for (ConsumerRecord<K, V> record : records) {
consumerCallback.consumer(record);
}
} catch (Exception e) {
records = null;
}
records = null;
}
}
@Override
public void close() throws IOException {
log.info("start close kafka consumer.");
consumer.commitSync();
consumer.close();
flag = false;
for (int i = 0; i < 10; i++) {
try {
if (records != null && !records.isEmpty()) {
log.info("kafka consumer is closing -> {}", i);
Thread.sleep(200);
} else {
break;
}
} catch (InterruptedException e) {
log.error(e.toString());
}
}
log.info("finished closed kafka consumer.");
}
public interface ConsumerCallback<C> {
void consumer(C t);
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。