package org.apache.atlas.repository.audit;

import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.common.params.SpatialParams;
import org.apache.solr.schema.JsonPreAnalyzedParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Singleton
@Component
@ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl", isDefault = true)
/* loaded from: input_file:WEB-INF/lib/atlas-repository-1.1.0.jar:org/apache/atlas/repository/audit/HBaseBasedAuditRepository.class */
public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditRepository {
    public static final String CONFIG_TABLE_NAME = "atlas.audit.hbase.tablename";
    public static final String DEFAULT_TABLE_NAME = "ATLAS_ENTITY_AUDIT_EVENTS";
    private TableName tableName;
    private Connection connection;
    private final AtlasInstanceConverter instanceConverter;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HBaseBasedAuditRepository.class);
    public static final byte[] COLUMN_FAMILY = Bytes.toBytes("dt");
    public static final byte[] COLUMN_ACTION = Bytes.toBytes("a");
    public static final byte[] COLUMN_DETAIL = Bytes.toBytes(SpatialParams.DISTANCE);
    public static final byte[] COLUMN_USER = Bytes.toBytes("u");
    public static final byte[] COLUMN_DEFINITION = Bytes.toBytes("f");
    public static final byte[] COLUMN_TYPE = Bytes.toBytes(JsonPreAnalyzedParser.TOKEN_KEY);

    @Inject
    public HBaseBasedAuditRepository(AtlasInstanceConverter atlasInstanceConverter) {
        this.instanceConverter = atlasInstanceConverter;
    }

    @Override // org.apache.atlas.repository.audit.EntityAuditRepository
    public void putEventsV1(List<EntityAuditEvent> list) throws AtlasException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Putting {} events", Integer.valueOf(list.size()));
        }
        Table table = null;
        try {
            try {
                table = this.connection.getTable(this.tableName);
                ArrayList arrayList = new ArrayList(list.size());
                for (int i = 0; i < list.size(); i++) {
                    EntityAuditEvent entityAuditEvent = list.get(i);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding entity audit event {}", entityAuditEvent);
                    }
                    Put put = new Put(getKey(entityAuditEvent.getEntityId(), Long.valueOf(entityAuditEvent.getTimestamp()), i));
                    addColumn(put, COLUMN_ACTION, entityAuditEvent.getAction());
                    addColumn(put, COLUMN_USER, entityAuditEvent.getUser());
                    addColumn(put, COLUMN_DETAIL, entityAuditEvent.getDetails());
                    addColumn(put, COLUMN_TYPE, EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V1);
                    if (persistEntityDefinition) {
                        addColumn(put, COLUMN_DEFINITION, entityAuditEvent.getEntityDefinitionString());
                    }
                    arrayList.add(put);
                }
                table.put(arrayList);
                close(table);
            } catch (IOException e) {
                throw new AtlasException(e);
            }
        } catch (Throwable th) {
            close(table);
            throw th;
        }
    }

    @Override // org.apache.atlas.repository.audit.EntityAuditRepository
    public void putEventsV2(List<EntityAuditEventV2> list) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Putting {} events", Integer.valueOf(list.size()));
        }
        Table table = null;
        try {
            try {
                table = this.connection.getTable(this.tableName);
                ArrayList arrayList = new ArrayList(list.size());
                for (int i = 0; i < list.size(); i++) {
                    EntityAuditEventV2 entityAuditEventV2 = list.get(i);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding entity audit event {}", entityAuditEventV2);
                    }
                    Put put = new Put(getKey(entityAuditEventV2.getEntityId(), Long.valueOf(entityAuditEventV2.getTimestamp()), i));
                    addColumn(put, COLUMN_ACTION, entityAuditEventV2.getAction());
                    addColumn(put, COLUMN_USER, entityAuditEventV2.getUser());
                    addColumn(put, COLUMN_DETAIL, entityAuditEventV2.getDetails());
                    addColumn(put, COLUMN_TYPE, EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V2);
                    if (persistEntityDefinition) {
                        addColumn(put, COLUMN_DEFINITION, entityAuditEventV2.getEntity());
                    }
                    arrayList.add(put);
                }
                table.put(arrayList);
                try {
                    close(table);
                } catch (AtlasException e) {
                    throw new AtlasBaseException(e);
                }
            } catch (Throwable th) {
                try {
                    close(table);
                    throw th;
                } catch (AtlasException e2) {
                    throw new AtlasBaseException(e2);
                }
            }
        } catch (IOException e3) {
            throw new AtlasBaseException(e3);
        }
    }

    @Override // org.apache.atlas.repository.audit.EntityAuditRepository
    public List<EntityAuditEventV2> listEventsV2(String str, String str2, short s) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", str, str2, Short.valueOf(s));
        }
        Table table = null;
        ResultScanner resultScanner = null;
        try {
            try {
                table = this.connection.getTable(this.tableName);
                Scan small = new Scan().setReversed(true).setFilter((Filter) new PageFilter(s)).setStopRow(Bytes.toBytes(str)).setCaching(s).setSmall(true);
                resultScanner = table.getScanner(StringUtils.isEmpty(str2) ? small.setStartRow(getKey(str, Long.MAX_VALUE, Integer.MAX_VALUE)) : small.setStartRow(Bytes.toBytes(str2)));
                ArrayList arrayList = new ArrayList();
                while (true) {
                    Result next = resultScanner.next();
                    if (next == null || arrayList.size() >= s) {
                        break;
                    }
                    EntityAuditEventV2 fromKeyV2 = fromKeyV2(next.getRow());
                    if (fromKeyV2.getEntityId().equals(str)) {
                        fromKeyV2.setUser(getResultString(next, COLUMN_USER));
                        fromKeyV2.setAction(EntityAuditEventV2.EntityAuditActionV2.fromString(getResultString(next, COLUMN_ACTION)));
                        fromKeyV2.setDetails(getEntityDetails(next));
                        fromKeyV2.setType(getAuditType(next));
                        if (persistEntityDefinition) {
                            fromKeyV2.setEntityDefinition(getEntityDefinition(next));
                        }
                        arrayList.add(fromKeyV2);
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got events for entity id {}, starting timestamp {}, #records {}", str, str2, Integer.valueOf(arrayList.size()));
                }
                try {
                    close(resultScanner);
                    close(table);
                    return arrayList;
                } catch (AtlasException e) {
                    throw new AtlasBaseException(e);
                }
            } catch (Throwable th) {
                try {
                    close(resultScanner);
                    close(table);
                    throw th;
                } catch (AtlasException e2) {
                    throw new AtlasBaseException(e2);
                }
            }
        } catch (IOException e3) {
            throw new AtlasBaseException(e3);
        }
    }

    private String getEntityDefinition(Result result) throws AtlasBaseException {
        String resultString = getResultString(result, COLUMN_DEFINITION);
        if (getAuditType(result) != EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V2) {
            resultString = AtlasType.toJson(toAtlasEntity((Referenceable) AtlasType.fromV1Json(resultString, Referenceable.class)));
        }
        return resultString;
    }

    private String getEntityDetails(Result result) throws AtlasBaseException {
        return getAuditType(result) == EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V2 ? getResultString(result, COLUMN_DETAIL) : getV2Details(result);
    }

    private EntityAuditEventV2.EntityAuditType getAuditType(Result result) {
        String resultString = getResultString(result, COLUMN_TYPE);
        return resultString != null ? EntityAuditEventV2.EntityAuditType.valueOf(resultString) : EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V1;
    }

    private String getV2Details(Result result) throws AtlasBaseException {
        String str = null;
        String resultString = getResultString(result, COLUMN_DETAIL);
        if (StringUtils.isNotEmpty(resultString)) {
            EntityAuditEvent.EntityAuditAction fromString = EntityAuditEvent.EntityAuditAction.fromString(getResultString(result, COLUMN_ACTION));
            if (fromString == EntityAuditEvent.EntityAuditAction.TERM_ADD || fromString == EntityAuditEvent.EntityAuditAction.TERM_DELETE) {
                str = resultString;
            } else {
                String[] split = resultString.split(EntityAuditListener.getV1AuditPrefix(fromString));
                if (ArrayUtils.isNotEmpty(split) && split.length == 2) {
                    String str2 = split[1];
                    Referenceable referenceable = (Referenceable) AtlasType.fromV1Json(str2, Referenceable.class);
                    String v2Json = referenceable != null ? toV2Json(referenceable, fromString) : str2;
                    if (v2Json != null) {
                        str = EntityAuditListener.getV2AuditPrefix(fromString) + v2Json;
                    }
                } else {
                    str = resultString;
                }
            }
        }
        return str;
    }

    private String toV2Json(Referenceable referenceable, EntityAuditEvent.EntityAuditAction entityAuditAction) throws AtlasBaseException {
        return (entityAuditAction == EntityAuditEvent.EntityAuditAction.TAG_ADD || entityAuditAction == EntityAuditEvent.EntityAuditAction.TAG_UPDATE || entityAuditAction == EntityAuditEvent.EntityAuditAction.TAG_DELETE) ? AtlasType.toJson(this.instanceConverter.toAtlasClassification(referenceable)) : AtlasType.toJson(toAtlasEntity(referenceable));
    }

    private AtlasEntity toAtlasEntity(Referenceable referenceable) throws AtlasBaseException {
        AtlasEntity atlasEntity = null;
        AtlasEntity.AtlasEntitiesWithExtInfo atlasEntity2 = this.instanceConverter.toAtlasEntity(referenceable);
        if (atlasEntity2 != null && CollectionUtils.isNotEmpty(atlasEntity2.getEntities())) {
            atlasEntity = atlasEntity2.getEntities().get(0);
        }
        return atlasEntity;
    }

    private <T> void addColumn(Put put, byte[] bArr, T t) {
        if (t == null || t.toString().isEmpty()) {
            return;
        }
        put.addColumn(COLUMN_FAMILY, bArr, Bytes.toBytes(t.toString()));
    }

    @Override // org.apache.atlas.repository.audit.EntityAuditRepository
    public List<EntityAuditEvent> listEventsV1(String str, String str2, short s) throws AtlasException {
        String resultString;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", str, str2, Short.valueOf(s));
        }
        Table table = null;
        ResultScanner resultScanner = null;
        try {
            try {
                table = this.connection.getTable(this.tableName);
                Scan small = new Scan().setReversed(true).setFilter((Filter) new PageFilter(s)).setStopRow(Bytes.toBytes(str)).setCaching(s).setSmall(true);
                resultScanner = table.getScanner(StringUtils.isEmpty(str2) ? small.setStartRow(getKey(str, Long.MAX_VALUE, Integer.MAX_VALUE)) : small.setStartRow(Bytes.toBytes(str2)));
                ArrayList arrayList = new ArrayList();
                while (true) {
                    Result next = resultScanner.next();
                    if (next == null || arrayList.size() >= s) {
                        break;
                    }
                    EntityAuditEvent fromKey = fromKey(next.getRow());
                    if (fromKey.getEntityId().equals(str)) {
                        fromKey.setUser(getResultString(next, COLUMN_USER));
                        fromKey.setAction(EntityAuditEvent.EntityAuditAction.fromString(getResultString(next, COLUMN_ACTION)));
                        fromKey.setDetails(getResultString(next, COLUMN_DETAIL));
                        if (persistEntityDefinition && (resultString = getResultString(next, COLUMN_DEFINITION)) != null) {
                            fromKey.setEntityDefinition(resultString);
                        }
                        arrayList.add(fromKey);
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got events for entity id {}, starting timestamp {}, #records {}", str, str2, Integer.valueOf(arrayList.size()));
                }
                close(resultScanner);
                close(table);
                return arrayList;
            } catch (IOException e) {
                throw new AtlasException(e);
            }
        } catch (Throwable th) {
            close(resultScanner);
            close(table);
            throw th;
        }
    }

    private String getResultString(Result result, byte[] bArr) {
        byte[] value = result.getValue(COLUMN_FAMILY, bArr);
        if (value != null) {
            return Bytes.toString(value);
        }
        return null;
    }

    private EntityAuditEvent fromKey(byte[] bArr) {
        String bytes = Bytes.toString(bArr);
        EntityAuditEvent entityAuditEvent = new EntityAuditEvent();
        if (StringUtils.isNotEmpty(bytes)) {
            String[] split = bytes.split(":");
            entityAuditEvent.setEntityId(split[0]);
            entityAuditEvent.setTimestamp(Long.valueOf(split[1]).longValue());
            entityAuditEvent.setEventKey(bytes);
        }
        return entityAuditEvent;
    }

    private EntityAuditEventV2 fromKeyV2(byte[] bArr) {
        String bytes = Bytes.toString(bArr);
        EntityAuditEventV2 entityAuditEventV2 = new EntityAuditEventV2();
        if (StringUtils.isNotEmpty(bytes)) {
            String[] split = bytes.split(":");
            entityAuditEventV2.setEntityId(split[0]);
            entityAuditEventV2.setTimestamp(Long.valueOf(split[1]).longValue());
            entityAuditEventV2.setEventKey(bytes);
        }
        return entityAuditEventV2;
    }

    private void close(Closeable closeable) throws AtlasException {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (IOException e) {
                throw new AtlasException(e);
            }
        }
    }

    public static Configuration getHBaseConfiguration(org.apache.commons.configuration.Configuration configuration) throws AtlasException {
        org.apache.commons.configuration.Configuration subsetConfiguration = ApplicationProperties.getSubsetConfiguration(configuration, AbstractStorageBasedAuditRepository.CONFIG_PREFIX);
        Configuration create = HBaseConfiguration.create();
        Iterator<String> keys = subsetConfiguration.getKeys();
        while (keys.hasNext()) {
            String next = keys.next();
            create.set(next, subsetConfiguration.getString(next));
        }
        return create;
    }

    private void createTableIfNotExists() throws AtlasException {
        try {
            try {
                Admin admin = this.connection.getAdmin();
                LOG.info("Checking if table {} exists", this.tableName.getNameAsString());
                if (admin.tableExists(this.tableName)) {
                    LOG.info("Table {} exists", this.tableName.getNameAsString());
                } else {
                    LOG.info("Creating table {}", this.tableName.getNameAsString());
                    HTableDescriptor hTableDescriptor = new HTableDescriptor(this.tableName);
                    HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(COLUMN_FAMILY);
                    hColumnDescriptor.setMaxVersions(1);
                    hColumnDescriptor.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
                    hColumnDescriptor.setCompressionType(Compression.Algorithm.GZ);
                    hColumnDescriptor.setBloomFilterType(BloomType.ROW);
                    hTableDescriptor.addFamily(hColumnDescriptor);
                    admin.createTable(hTableDescriptor);
                }
                close(admin);
            } catch (IOException e) {
                throw new AtlasException(e);
            }
        } catch (Throwable th) {
            close(null);
            throw th;
        }
    }

    @Override // org.apache.atlas.service.Service
    public void start() throws AtlasException {
        org.apache.commons.configuration.Configuration configuration = ApplicationProperties.get();
        startInternal(configuration, getHBaseConfiguration(configuration));
    }

    @VisibleForTesting
    void startInternal(org.apache.commons.configuration.Configuration configuration, Configuration configuration2) throws AtlasException {
        this.tableName = TableName.valueOf(configuration.getString(CONFIG_TABLE_NAME, DEFAULT_TABLE_NAME));
        try {
            this.connection = createConnection(configuration2);
            if (HAConfiguration.isHAEnabled(configuration)) {
                return;
            }
            LOG.info("HA is disabled. Hence creating table on startup.");
            createTableIfNotExists();
        } catch (IOException e) {
            throw new AtlasException(e);
        }
    }

    @VisibleForTesting
    protected Connection createConnection(Configuration configuration) throws IOException {
        return ConnectionFactory.createConnection(configuration);
    }

    @Override // org.apache.atlas.service.Service
    public void stop() throws AtlasException {
        close(this.connection);
    }

    @Override // org.apache.atlas.repository.audit.AbstractStorageBasedAuditRepository, org.apache.atlas.listener.ActiveStateChangeHandler
    public void instanceIsActive() throws AtlasException {
        LOG.info("Reacting to active: Creating HBase table for Audit if required.");
        createTableIfNotExists();
    }
}
