package org.apache.atlas.notification;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
import org.apache.atlas.model.notification.MessageVersion;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/atlas-notification-1.2.0.jar:org/apache/atlas/notification/AtlasNotificationMessageDeserializer.class */
/**
 * Base deserializer that turns a notification JSON string into a message of type {@code T}.
 *
 * <p>Three input shapes are handled by {@link #deserialize(String)}:
 * <ul>
 *   <li>JSON without a version field, which is parsed directly as {@code T};</li>
 *   <li>a single versioned {@link AtlasNotificationMessage} envelope, optionally GZIP-compressed;</li>
 *   <li>one segment of a multi-part (split) message, which is buffered until every segment
 *       of that message ID has been received and is then reassembled.</li>
 * </ul>
 *
 * <p>Split-message segments are kept in a plain {@link HashMap}; this class performs no
 * synchronization around that buffer.
 *
 * @param <T> the payload type carried by the notification
 */
public abstract class AtlasNotificationMessageDeserializer<T> implements MessageDeserializer<T> {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationMessageDeserializer.class);

    /** Format string used by {@link #checkVersion}: expected version, received version, raw message. */
    public static final String VERSION_MISMATCH_MSG = "Notification message version mismatch. Expected %s but recieved %s. Message %s";

    private final TypeReference<T> messageType;
    private final TypeReference<AtlasNotificationMessage<T>> notificationMessageType;
    private final MessageVersion expectedVersion;
    private final Logger notificationLogger;

    /** Partially received split messages, keyed by message ID. */
    private final Map<String, SplitMessageAggregator> splitMsgBuffer = new HashMap<>();
    private final long splitMessageBufferPurgeIntervalMs;
    private final long splitMessageSegmentsWaitTimeMs;
    private long splitMessagesLastPurgeTime = System.currentTimeMillis();
    private final AtomicLong messageCountTotal = new AtomicLong(0L);
    private final AtomicLong messageCountSinceLastInterval = new AtomicLong(0L);

    /**
     * Creates a deserializer whose split-message wait time and buffer purge interval are read
     * from {@link AtlasConfiguration} (configured in seconds, converted here to milliseconds).
     *
     * @param messageType             type reference for the bare payload
     * @param notificationMessageType type reference for the versioned envelope around the payload
     * @param expectedVersion         version that incoming messages are compared against
     * @param notificationLogger      logger that receives version-mismatch reports
     */
    public AtlasNotificationMessageDeserializer(TypeReference<T> messageType, TypeReference<AtlasNotificationMessage<T>> notificationMessageType, MessageVersion expectedVersion, Logger notificationLogger) {
        this(messageType,
             notificationMessageType,
             expectedVersion,
             notificationLogger,
             AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS.getLong() * 1000,
             AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS.getLong() * 1000);
    }

    /**
     * Creates a deserializer with explicit split-message timing.
     *
     * @param messageType                       type reference for the bare payload
     * @param notificationMessageType           type reference for the versioned envelope around the payload
     * @param expectedVersion                   version that incoming messages are compared against
     * @param notificationLogger                logger that receives version-mismatch reports
     * @param splitMessageSegmentsWaitTimeMs    age, in milliseconds, at which an incomplete split message is evicted
     * @param splitMessageBufferPurgeIntervalMs minimum time, in milliseconds, between two purge passes
     */
    public AtlasNotificationMessageDeserializer(TypeReference<T> messageType, TypeReference<AtlasNotificationMessage<T>> notificationMessageType, MessageVersion expectedVersion, Logger notificationLogger, long splitMessageSegmentsWaitTimeMs, long splitMessageBufferPurgeIntervalMs) {
        this.messageType = messageType;
        this.notificationMessageType = notificationMessageType;
        this.expectedVersion = expectedVersion;
        this.notificationLogger = notificationLogger;
        this.splitMessageSegmentsWaitTimeMs = splitMessageSegmentsWaitTimeMs;
        this.splitMessageBufferPurgeIntervalMs = splitMessageBufferPurgeIntervalMs;
    }

    /** @return the type reference used to parse a bare (unversioned) payload */
    public TypeReference<T> getMessageType() {
        return this.messageType;
    }

    /** @return the type reference used to parse a versioned notification envelope */
    public TypeReference<AtlasNotificationMessage<T>> getNotificationMessageType() {
        return this.notificationMessageType;
    }

    /**
     * Deserializes one notification.
     *
     * <p>Every call increments the message counters and, once the purge interval has elapsed,
     * evicts stale split messages and logs processing statistics.
     *
     * @param messageJson the raw notification JSON
     * @return the payload, or {@code null} when the input was a split-message segment that did
     *         not complete a message or that had to be ignored
     * @throws IncompatibleVersionException if {@link #checkVersion} rejects the message version
     */
    @Override // org.apache.atlas.notification.MessageDeserializer
    public T deserialize(String messageJson) {
        final T ret;

        this.messageCountTotal.incrementAndGet();
        this.messageCountSinceLastInterval.incrementAndGet();

        AtlasNotificationBaseMessage baseMsg = AtlasType.fromV1Json(messageJson, AtlasNotificationBaseMessage.class);

        if (baseMsg == null || baseMsg.getVersion() == null) {
            // No version field: the JSON is parsed directly as the payload type.
            ret = AtlasType.fromV1Json(messageJson, this.messageType);
        } else {
            String msgJson = messageJson;

            if (baseMsg.getMsgSplitCount() > 1) {
                // Multi-part message: continue only when this segment completed the message.
                msgJson = reassembleSplitMessage(messageJson);

                if (msgJson != null) {
                    baseMsg = AtlasType.fromV1Json(msgJson, AtlasNotificationBaseMessage.class);
                } else {
                    baseMsg = null;
                }
            }

            if (baseMsg != null) {
                ret = unwrapNotificationMessage(baseMsg, msgJson);
            } else {
                ret = null;
            }
        }

        long now = System.currentTimeMillis();

        if (now - this.splitMessagesLastPurgeTime >= this.splitMessageBufferPurgeIntervalMs) {
            purgeStaleMessages(this.splitMsgBuffer, now, this.splitMessageSegmentsWaitTimeMs);

            LOG.info("Notification processing stats: total={}, sinceLastStatsReport={}", this.messageCountTotal.get(), this.messageCountSinceLastInterval.getAndSet(0L));

            this.splitMessagesLastPurgeTime = now;
        }

        return ret;
    }

    /**
     * Buffers one segment of a split message and, when it completes the message, returns the
     * reassembled and decoded JSON.
     *
     * @param segmentJson JSON of a single split-message segment
     * @return the decoded JSON of the complete message, or {@code null} when the segment was
     *         ignored or further segments are still outstanding
     */
    private String reassembleSplitMessage(String segmentJson) {
        AtlasNotificationStringMessage splitMsg = AtlasType.fromV1Json(segmentJson, AtlasNotificationStringMessage.class);

        checkVersion(splitMsg, segmentJson);

        String msgId = splitMsg.getMsgId();

        if (StringUtils.isEmpty(msgId)) {
            LOG.error("Received multi-part message with no message ID. Ignoring message");
            return null;
        }

        int splitIdx = splitMsg.getMsgSplitIdx();
        int splitCount = splitMsg.getMsgSplitCount();
        SplitMessageAggregator splitMsgs;

        if (splitIdx == 0) {
            // The first segment opens a new aggregator, replacing any buffered one with the same ID.
            splitMsgs = new SplitMessageAggregator(splitMsg);
            this.splitMsgBuffer.put(splitMsgs.getMsgId(), splitMsgs);
        } else {
            splitMsgs = this.splitMsgBuffer.get(msgId);
        }

        if (splitMsgs == null) {
            LOG.error("Received msgID={}: {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount);
            return null;
        }

        // A negative index is as invalid as one past the end; reject both before touching the aggregator.
        if (splitIdx < 0 || splitMsgs.getTotalSplitCount() <= splitIdx) {
            LOG.error("Received msgID={}: {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount);
            return null;
        }

        LOG.info("Received msgID={}: {} of {}", msgId, splitIdx + 1, splitCount);

        if (!splitMsgs.add(splitMsg)) {
            // add() did not report the message as complete; nothing to return yet.
            return null;
        }

        this.splitMsgBuffer.remove(msgId);

        StringBuilder bufferedMessage = new StringBuilder();
        AtlasNotificationStringMessage segment = splitMsg;

        for (int i = 0; i < splitMsgs.getTotalSplitCount(); i++) {
            segment = splitMsgs.get(i);

            if (segment == null) {
                LOG.warn("MsgID={}: message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount);
                return null;
            }

            bufferedMessage.append(segment.getMessage());
        }

        String encodedMessage = bufferedMessage.toString();
        byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(encodedMessage);
        String msgJson;

        // The compression kind is read from the last segment fetched from the aggregator.
        if (AtlasNotificationBaseMessage.CompressionKind.GZIP.equals(segment.getMsgCompressionKind())) {
            byte[] decodedBytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes);

            msgJson = AtlasNotificationBaseMessage.getStringUtf8(decodedBytes);

            LOG.info("Received msgID={}: splitCount={}, compressed={} bytes, uncompressed={} bytes", msgId, splitCount, encodedBytes.length, decodedBytes.length);
        } else {
            byte[] decodedBytes = AtlasNotificationBaseMessage.decodeBase64(encodedBytes);

            msgJson = AtlasNotificationBaseMessage.getStringUtf8(decodedBytes);

            LOG.info("Received msgID={}: splitCount={}, length={} bytes", msgId, splitCount, decodedBytes.length);
        }

        return msgJson;
    }

    /**
     * Extracts the payload from a versioned notification envelope, first decompressing the
     * envelope when it is marked as GZIP-compressed.
     *
     * @param baseMsg the envelope parsed as a base message (used to read the compression kind)
     * @param msgJson the envelope JSON
     * @return the payload carried by the envelope
     * @throws IncompatibleVersionException if {@link #checkVersion} rejects the envelope version
     */
    private T unwrapNotificationMessage(AtlasNotificationBaseMessage baseMsg, String msgJson) {
        String envelopeJson = msgJson;

        if (AtlasNotificationBaseMessage.CompressionKind.GZIP.equals(baseMsg.getMsgCompressionKind())) {
            AtlasNotificationStringMessage compressedMsg = AtlasType.fromV1Json(envelopeJson, AtlasNotificationStringMessage.class);
            byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(compressedMsg.getMessage());
            byte[] decodedBytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes);

            envelopeJson = AtlasNotificationBaseMessage.getStringUtf8(decodedBytes);

            LOG.info("Received msgID={}: compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, decodedBytes.length);
        }

        AtlasNotificationMessage<T> notificationMessage = AtlasType.fromV1Json(envelopeJson, this.notificationMessageType);

        checkVersion(notificationMessage, envelopeJson);

        return notificationMessage.getMessage();
    }

    /**
     * Removes every buffered split message whose first segment is at least
     * {@code maxWaitTimeMs} older than {@code now}, logging each eviction at error level.
     *
     * @param splitMsgBuffer buffer of partially received split messages; modified in place
     * @param now            reference timestamp ({@link #deserialize} passes {@code System.currentTimeMillis()})
     * @param maxWaitTimeMs  age threshold at or beyond which an entry is evicted
     */
    @VisibleForTesting
    static void purgeStaleMessages(Map<String, SplitMessageAggregator> splitMsgBuffer, long now, long maxWaitTimeMs) {
        LOG.debug("==> purgeStaleMessages(bufferedMessageCount={})", splitMsgBuffer.size());

        // Removing through the iterator avoids collecting the stale entries in a temporary list first.
        for (Iterator<SplitMessageAggregator> iter = splitMsgBuffer.values().iterator(); iter.hasNext(); ) {
            SplitMessageAggregator aggregator = iter.next();

            if (now - aggregator.getFirstSplitTimestamp() >= maxWaitTimeMs) {
                LOG.error("evicting notification msgID={}, totalSplitCount={}, receivedSplitCount={}", aggregator.getMsgId(), aggregator.getTotalSplitCount(), aggregator.getReceivedSplitCount());

                iter.remove();
            }
        }

        LOG.debug("<== purgeStaleMessages(bufferedMessageCount={})", splitMsgBuffer.size());
    }

    /**
     * Compares the message's version with the expected version using
     * {@code compareVersion(expectedVersion)}.
     *
     * <p>A positive result is reported at error level on the notification logger and raised as
     * an {@link IncompatibleVersionException}; a negative result is only reported at info
     * level; a result of zero does nothing.
     *
     * @param notificationMessage the message whose version is checked
     * @param messageJson         the raw JSON, included in the mismatch report
     * @throws IncompatibleVersionException if the comparison result is positive
     */
    protected void checkVersion(AtlasNotificationBaseMessage notificationMessage, String messageJson) {
        int comparison = notificationMessage.compareVersion(this.expectedVersion);

        if (comparison == 0) {
            return;
        }

        String mismatchReport = String.format(VERSION_MISMATCH_MSG, this.expectedVersion, notificationMessage.getVersion(), messageJson);

        if (comparison > 0) {
            this.notificationLogger.error(mismatchReport);

            throw new IncompatibleVersionException(mismatchReport);
        }

        this.notificationLogger.info(mismatchReport);
    }
}
