package org.apache.atlas.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.apache.atlas.notification.AtlasNotificationMessageDeserializer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/atlas-notification-1.2.0.jar:org/apache/atlas/kafka/AtlasKafkaConsumer.class */
public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AtlasKafkaConsumer.class);
    private final KafkaConsumer kafkaConsumer;
    private final boolean autoCommitEnabled;
    private long pollTimeoutMilliSeconds;

    public AtlasKafkaConsumer(NotificationInterface.NotificationType notificationType, KafkaConsumer kafkaConsumer, boolean z, long j) {
        this(notificationType.getDeserializer(), kafkaConsumer, z, j);
    }

    public AtlasKafkaConsumer(AtlasNotificationMessageDeserializer<T> atlasNotificationMessageDeserializer, KafkaConsumer kafkaConsumer, boolean z, long j) {
        super(atlasNotificationMessageDeserializer);
        this.pollTimeoutMilliSeconds = 1000L;
        this.autoCommitEnabled = z;
        this.kafkaConsumer = kafkaConsumer;
        this.pollTimeoutMilliSeconds = j;
    }

    @Override // org.apache.atlas.notification.NotificationConsumer
    public List<AtlasKafkaMessage<T>> receive() {
        return receive(this.pollTimeoutMilliSeconds);
    }

    @Override // org.apache.atlas.notification.NotificationConsumer
    public List<AtlasKafkaMessage<T>> receive(long j) {
        ArrayList arrayList = new ArrayList();
        ConsumerRecords poll = this.kafkaConsumer.poll(j);
        if (poll != null) {
            Iterator it2 = poll.iterator();
            while (it2.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it2.next();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received Message topic ={}, partition ={}, offset = {}, key = {}, value = {}", consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value());
                }
                T t = null;
                try {
                    t = this.deserializer.deserialize(consumerRecord.value().toString());
                } catch (OutOfMemoryError e) {
                    LOG.error("Ignoring message that failed to deserialize: topic={}, partition={}, offset={}, key={}, value={}", consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value(), e);
                }
                if (t != null) {
                    arrayList.add(new AtlasKafkaMessage(t, consumerRecord.offset(), consumerRecord.partition()));
                }
            }
        }
        return arrayList;
    }

    @Override // org.apache.atlas.notification.AbstractNotificationConsumer, org.apache.atlas.notification.NotificationConsumer
    public void commit(TopicPartition topicPartition, long j) {
        if (this.autoCommitEnabled) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.info(" commiting the offset ==>> " + j);
        }
        this.kafkaConsumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(j)));
    }

    @Override // org.apache.atlas.notification.NotificationConsumer
    public void close() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
        }
    }

    @Override // org.apache.atlas.notification.NotificationConsumer
    public void wakeup() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.wakeup();
        }
    }
}
