/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.core.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Condition;
import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.fluo.accumulo.iterators.PrewriteIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.values.DelLockValue;
import org.apache.fluo.accumulo.values.LockValue;
import org.apache.fluo.api.client.AbstractTransactionBase;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.exceptions.AlreadySetException;
import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.core.async.AsyncCommitObserver;
import org.apache.fluo.core.async.AsyncConditionalWriter;
import org.apache.fluo.core.async.AsyncTransaction;
import org.apache.fluo.core.async.SyncCommitObserver;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.exceptions.StaleScanException;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.impl.ParallelSnapshotScanner;
import org.apache.fluo.core.impl.SnapshotScanner;
import org.apache.fluo.core.impl.TransactorNode;
import org.apache.fluo.core.impl.TxInfo;
import org.apache.fluo.core.impl.TxStats;
import org.apache.fluo.core.impl.scanner.ColumnScannerImpl;
import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.core.util.ColumnUtil;
import org.apache.fluo.core.util.ConditionalFlutation;
import org.apache.fluo.core.util.FluoCondition;
import org.apache.fluo.core.util.Flutation;
import org.apache.fluo.core.util.SpanUtil;

public class TransactionImpl
extends AbstractTransactionBase
implements AsyncTransaction,
Snapshot {
    /** Zero-length byte array; written as the value of a marker cell when the primary lock is rolled back. */
    public static final byte[] EMPTY = new byte[0];
    /** {@link #EMPTY} wrapped as {@link Bytes}. */
    public static final Bytes EMPTY_BS = Bytes.of((byte[])EMPTY);
    // Sentinel stored in `updates` by delete(); recognised by reference identity in isDelete().
    private static final Bytes DELETE = Bytes.of((String)"special delete object f804266bf94935edd45ae3e6c287b93c1814295c");
    // Sentinel stored in `updates` for the triggering notification column; recognised by
    // reference identity in isWrite() (an entry holding it is not a data write).
    private static final Bytes NTFY_VAL = Bytes.of((String)"special ntfy value ce0c523e6e4dc093be8a2736b82eca1b95f97ed4");
    // Start timestamp of this transaction; used for snapshot reads and OR-ed into cell timestamps at commit.
    private final long startTs;
    // Pending changes: row -> column -> value (or one of the sentinels above). Filled by set()/delete().
    private final Map<Bytes, Map<Column, Bytes>> updates = new HashMap<Bytes, Map<Column, Bytes>>();
    // Weak notifications requested through setWeakNotification(): row -> columns.
    private final Map<Bytes, Set<Column>> weakNotifications = new HashMap<Bytes, Set<Column>>();
    // Key set of env.getObservers(), captured at construction.
    private final Set<Column> observedColumns;
    private final Environment env;
    // Columns recorded as read per row (see updateColumnsRead()); package-visible.
    final Map<Bytes, Set<Column>> columnsRead = new HashMap<Bytes, Set<Column>>();
    private final TxStats stats;
    // Trigger passed to the constructor when its column is NOT a weak-observer column; otherwise null.
    private Notification notification;
    // Trigger passed to the constructor when its column IS a weak-observer column; otherwise null.
    private Notification weakNotification;
    // Resolved lazily from the shared resources in getTransactorID() unless set via setTransactor().
    private TransactorNode tnode = null;
    private TxStatus status = TxStatus.OPEN;
    // Set once a commit has been started; close(boolean) skips the stale-scan check when true.
    private boolean commitAttempted = false;
    // Set by preCommit(); makes postLockOther() report success right after all locks are taken.
    private boolean stopAfterPreCommit = false;
    // Set by commitPrimaryColumn(); the code that reads it is not in the visible part of this file.
    private boolean stopAfterPrimaryCommit = false;

    /**
     * Tells whether an update value represents a data write.
     *
     * @param val value taken from the pending updates
     * @return {@code false} only when {@code val} is the very same object as the notification
     *         sentinel; the comparison is by reference on purpose
     */
    private static boolean isWrite(Bytes val) {
        return NTFY_VAL != val;
    }

    /**
     * Tells whether an update value is the delete sentinel.
     *
     * @param val value taken from the pending updates
     * @return {@code true} only when {@code val} is the very same object as the delete sentinel;
     *         the comparison is by reference on purpose
     */
    private static boolean isDelete(Bytes val) {
        return DELETE == val;
    }

    public TransactionImpl(Environment env, Notification trigger, long startTs) {
        Objects.requireNonNull(env, "environment cannot be null");
        Preconditions.checkArgument((startTs >= 0L ? 1 : 0) != 0, (Object)"startTs cannot be negative");
        this.env = env;
        this.stats = new TxStats(env);
        this.startTs = startTs;
        this.observedColumns = env.getObservers().keySet();
        if (trigger != null && env.getWeakObservers().containsKey(trigger.getColumn())) {
            this.weakNotification = trigger;
        } else {
            this.notification = trigger;
        }
        if (this.notification != null) {
            HashMap<Column, Bytes> colUpdates = new HashMap<Column, Bytes>();
            colUpdates.put(this.notification.getColumn(), NTFY_VAL);
            this.updates.put(this.notification.getRow(), colUpdates);
        }
    }

    /**
     * Creates a transaction for the given trigger, using a freshly allocated start timestamp.
     *
     * @param env environment
     * @param trigger notification that started this transaction, may be null
     */
    public TransactionImpl(Environment env, Notification trigger) {
        this(env, trigger, allocateTimestamp(env).getTxTimestamp());
    }

    /**
     * Creates a transaction without a trigger, using a freshly allocated start timestamp.
     *
     * @param env environment
     */
    public TransactionImpl(Environment env) {
        this(env, (Notification) null, allocateTimestamp(env).getTxTimestamp());
    }

    /**
     * Creates a transaction without a trigger at an explicit start timestamp.
     *
     * @param env environment
     * @param startTs start timestamp, must not be negative
     */
    public TransactionImpl(Environment env, long startTs) {
        this(env, (Notification) null, startTs);
    }

    /**
     * Allocates a stamp through the environment's shared timestamp tracker.
     *
     * @param env environment providing the shared resources
     * @return the allocated stamp
     */
    private static Stamp allocateTimestamp(Environment env) {
        Stamp allocated = env.getSharedResources().getTimestampTracker().allocateTimestamp();
        return allocated;
    }

    /**
     * Reads a single column of a row.
     *
     * @param row row to read
     * @param column column to read
     * @return the value, or {@code null} when the read returned nothing for that column
     * @throws IllegalStateException if the transaction is not open
     */
    public Bytes get(Bytes row, Column column) {
        checkIfOpen();
        Map<Column, Bytes> values = get(row, Collections.singleton(column));
        return values.get(column);
    }

    /**
     * Reads several columns of one row.
     *
     * @param row row to read
     * @param columns columns to read
     * @return column to value for the columns that were found
     * @throws IllegalStateException if the transaction is not open
     */
    public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
        checkIfOpen();
        return getImpl(row, columns);
    }

    /**
     * Reads the given columns for each of the given rows using a parallel snapshot scan.
     *
     * <p>The columns actually returned for each row are recorded as read.
     *
     * @param rows rows to read
     * @param columns columns to read in every row
     * @return row to (column to value); an empty map when either argument is empty
     * @throws IllegalStateException if the transaction is not open
     */
    public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
        checkIfOpen();
        if (rows.isEmpty() || columns.isEmpty()) {
            return Collections.emptyMap();
        }
        env.getSharedResources().getVisCache().validate(columns);
        Map<Bytes, Map<Column, Bytes>> result =
            new ParallelSnapshotScanner(rows, columns, env, startTs, stats).scan();
        result.forEach((row, colVals) -> updateColumnsRead(row, colVals.keySet()));
        return result;
    }

    /**
     * Reads an arbitrary collection of row/column pairs using a parallel snapshot scan.
     *
     * <p>The columns actually returned for each row are recorded as read.
     *
     * @param rowColumns row/column pairs to read
     * @return row/column to value for the cells found; an empty map when the input is empty
     * @throws IllegalStateException if the transaction is not open
     */
    public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
        checkIfOpen();
        if (rowColumns.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<Bytes, Map<Column, Bytes>> scanned =
            new ParallelSnapshotScanner(rowColumns, env, startTs, stats).scan();
        Map<RowColumn, Bytes> flattened = new HashMap<RowColumn, Bytes>();
        for (Map.Entry<Bytes, Map<Column, Bytes>> rowEntry : scanned.entrySet()) {
            Bytes row = rowEntry.getKey();
            Map<Column, Bytes> colVals = rowEntry.getValue();
            updateColumnsRead(row, colVals.keySet());
            for (Map.Entry<Column, Bytes> colVal : colVals.entrySet()) {
                flattened.put(new RowColumn(row, colVal.getKey()), colVal.getValue());
            }
        }
        return flattened;
    }

    /**
     * Reads the requested columns of one row at this transaction's snapshot and records every
     * requested column as read, whether or not a value was found for it.
     *
     * <p>When at least one requested column carries an explicit visibility, the scan results are
     * filtered so that only cells whose full column (including visibility) was requested are
     * returned. NOTE(review): presumably the scanner can return cells of a requested
     * family/qualifier under a different visibility, which is why the filter exists — confirm
     * against {@code SnapshotScanner}.
     *
     * @param row row to read
     * @param columns columns to read; validated against the visibility cache first
     * @return column to value for the cells found
     */
    private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns) {
        env.getSharedResources().getVisCache().validate(columns);

        boolean filterByRequestedColumn = false;
        for (Column column : columns) {
            if (column.isVisibilitySet()) {
                filterByRequestedColumn = true;
                break;
            }
        }

        // The scan is always issued with the caller's column set. A visibility-stripped copy of
        // the set used to be built here but was never handed to the scanner, so it is not built.
        SnapshotScanner.Opts opts = new SnapshotScanner.Opts(Span.exact(row), columns);

        Map<Column, Bytes> ret = new HashMap<Column, Bytes>();
        Iterable<ColumnValue> scanner =
            Iterables.transform(new SnapshotScanner(env, opts, startTs, stats), ColumnScannerImpl::entry2cv);
        for (ColumnValue cv : scanner) {
            if (filterByRequestedColumn && !columns.contains(cv.getColumn())) {
                continue;
            }
            ret.put(cv.getColumn(), cv.getValue());
        }

        updateColumnsRead(row, columns);
        return ret;
    }

    /**
     * Starts building a scanner over this transaction's snapshot.
     *
     * @return a new scanner builder bound to this transaction
     * @throws IllegalStateException if the transaction is not open
     */
    public ScannerBuilder scanner() {
        checkIfOpen();
        ScannerBuilder builder = new ScannerBuilderImpl(this);
        return builder;
    }

    /**
     * Records that the given columns of a row have been read by this transaction.
     *
     * @param row row that was read
     * @param columns columns to add to the row's read set
     */
    private void updateColumnsRead(Bytes row, Set<Column> columns) {
        columnsRead.computeIfAbsent(row, r -> new HashSet<Column>()).addAll(columns);
    }

    /**
     * Buffers a write of {@code value} to {@code row}/{@code col}; nothing is sent until commit.
     *
     * <p>An existing entry that only marks the triggering notification may be overwritten; any
     * other existing entry for the same cell is an error.
     *
     * @param row row to write, not null
     * @param col column to write, not null; its family must not be the reserved notify family
     * @param value value to write, not null
     * @throws AlreadySetException if a write or delete is already buffered for this cell
     * @throws IllegalArgumentException if the column family is the reserved notify family
     * @throws IllegalStateException if the transaction is not open
     */
    public void set(Bytes row, Column col, Bytes value) throws AlreadySetException {
        checkIfOpen();
        Objects.requireNonNull(row);
        Objects.requireNonNull(col);
        Objects.requireNonNull(value);
        if (col.getFamily().equals(ColumnConstants.NOTIFY_CF)) {
            throw new IllegalArgumentException(ColumnConstants.NOTIFY_CF + " is a reserved family");
        }
        env.getSharedResources().getVisCache().validate(col);

        Map<Column, Bytes> rowUpdates = updates.computeIfAbsent(row, r -> new HashMap<Column, Bytes>());
        Bytes existing = rowUpdates.get(col);
        if (existing != null && isWrite(existing)) {
            throw new AlreadySetException("Value already set " + row + " " + col);
        }
        rowUpdates.put(col, value);
    }

    /**
     * Requests a weak notification for {@code row}/{@code col} to be written at commit.
     *
     * @param row row to notify, not null
     * @param col column to notify, not null; must be registered as a weak observer column
     * @throws IllegalArgumentException if the column is not configured for weak notifications
     * @throws IllegalStateException if the transaction is not open
     */
    public void setWeakNotification(Bytes row, Column col) {
        checkIfOpen();
        Objects.requireNonNull(row);
        Objects.requireNonNull(col);
        boolean weaklyObserved = env.getWeakObservers().containsKey(col);
        if (!weaklyObserved) {
            throw new IllegalArgumentException("Column not configured for weak notifications " + col);
        }
        env.getSharedResources().getVisCache().validate(col);
        weakNotifications.computeIfAbsent(row, r -> new HashSet<Column>()).add(col);
    }

    /**
     * Buffers a delete of {@code row}/{@code col} by storing the delete sentinel as its value.
     *
     * @param row row to delete from, not null
     * @param col column to delete, not null
     * @throws AlreadySetException if a write or delete is already buffered for this cell
     * @throws IllegalStateException if the transaction is not open
     */
    public void delete(Bytes row, Column col) throws AlreadySetException {
        checkIfOpen();
        Objects.requireNonNull(row);
        Objects.requireNonNull(col);
        set(row, col, DELETE);
    }

    /**
     * Adds the prewrite condition, the data cell (for real writes) and the lock cell for one
     * column to a conditional mutation.
     *
     * @param cm mutation to extend, or {@code null} to create a new one for {@code row}
     * @param row row of the mutation; only used when {@code cm} is {@code null}
     * @param col column being locked
     * @param val buffered value for the column (may be one of the sentinels)
     * @param primaryRow row of the transaction's primary cell, stored in the lock
     * @param primaryColumn column of the transaction's primary cell, stored in the lock
     * @param isTriggerRow whether {@code row} is the row of the triggering notification
     * @return {@code cm}, or the newly created mutation when {@code cm} was {@code null}
     */
    private ConditionalFlutation prewrite(ConditionalFlutation cm, Bytes row, Column col, Bytes val, Bytes primaryRow, Column primaryColumn, boolean isTriggerRow) {
        IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
        PrewriteIterator.setSnaptime(iterConf, startTs);
        // The ack check is only enabled for the exact cell that triggered this transaction.
        if (isTriggerRow && col.equals(notification.getColumn())) {
            PrewriteIterator.enableAckCheck(iterConf, notification.getTimestamp());
        }
        Condition cond = new FluoCondition(env, col).setIterators(new IteratorSetting[]{iterConf});

        ConditionalFlutation target = cm;
        if (target == null) {
            target = new ConditionalFlutation(env, row, cond);
        } else {
            target.addCondition(cond);
        }

        boolean write = isWrite(val);
        boolean delete = isDelete(val);
        if (write && !delete) {
            target.put(col, 0xA000000000000000L | startTs, val.toArray());
        }
        target.put(col, 0xE000000000000000L | startTs,
            LockValue.encode(primaryRow, primaryColumn, write, delete, isTriggerRow, getTransactorID()));
        return target;
    }

    /**
     * Creates a new conditional mutation for {@code row} holding the prewrite of one column.
     *
     * @return the newly created mutation
     */
    private ConditionalFlutation prewrite(Bytes row, Column col, Bytes val, Bytes primaryRow, Column primaryColumn, boolean isTriggerRow) {
        ConditionalFlutation created = prewrite(null, row, col, val, primaryRow, primaryColumn, isTriggerRow);
        return created;
    }

    /**
     * Adds the prewrite of one more column to an existing conditional mutation.
     *
     * @param cm existing mutation to extend; no row is passed on because one is only needed
     *        when a mutation has to be created
     */
    private void prewrite(ConditionalFlutation cm, Column col, Bytes val, Bytes primaryRow, Column primaryColumn, boolean isTriggerRow) {
        Bytes unusedRow = null;
        prewrite(cm, unusedRow, col, val, primaryRow, primaryColumn, isTriggerRow);
    }

    /**
     * Tells whether {@code row} is the row of the (strong) notification that triggered this
     * transaction.
     *
     * @param row row to test
     * @return {@code false} when there is no such notification or the rows differ
     */
    private boolean isTriggerRow(Bytes row) {
        if (notification == null) {
            return false;
        }
        return notification.getRow().equals(row);
    }

    /**
     * Runs the locking phase of the commit, letting the transaction choose the primary cell.
     *
     * @param cd commit state to fill in
     * @return {@code true} when all locks were acquired
     */
    public boolean preCommit(CommitData cd) {
        RowColumn noExplicitPrimary = null;
        return preCommit(cd, noExplicitPrimary);
    }

    /**
     * Runs only the locking phase of the commit and waits for it to finish.
     *
     * <p>Marks the commit as started and flags the transaction to stop once all locks are held.
     *
     * @param cd commit state to fill in
     * @param primary cell to use as primary, or {@code null} to let the transaction choose
     * @return {@code true} when all locks were acquired, {@code false} when the commit failed
     * @throws AlreadyAcknowledgedException rethrown unchanged when reported by the commit
     * @throws IllegalStateException if the transaction is not open
     */
    @VisibleForTesting
    public boolean preCommit(CommitData cd, RowColumn primary) {
        synchronized (this) {
            checkIfOpen();
            status = TxStatus.COMMIT_STARTED;
            commitAttempted = true;
            stopAfterPreCommit = true;
        }
        SyncCommitObserver observer = new SyncCommitObserver();
        beginCommitAsync(cd, observer, primary);
        try {
            observer.waitForCommit();
            return true;
        }
        catch (AlreadyAcknowledgedException e) {
            throw e;
        }
        catch (CommitException e) {
            return false;
        }
    }

    /**
     * Reads every rejected column that this transaction has not read yet.
     *
     * <p>The set of columns to read is computed completely before any read is issued, because
     * each read adds to {@code columnsRead}.
     *
     * @param cd commit state whose rejected rows/columns are examined
     * @throws Exception declared for the commit-step callers; propagates whatever the reads throw
     */
    private void readUnread(CommitData cd) throws Exception {
        Map<Bytes, Set<Column>> columnsToRead = new HashMap<Bytes, Set<Column>>();
        for (Map.Entry<Bytes, Set<Column>> rejected : cd.getRejected().entrySet()) {
            Bytes row = rejected.getKey();
            Set<Column> alreadyRead = columnsRead.get(row);
            if (alreadyRead == null) {
                // Nothing read in this row yet: every rejected column must be read.
                columnsToRead.put(row, rejected.getValue());
                continue;
            }
            Set<Column> unread = new HashSet<Column>(rejected.getValue());
            unread.removeAll(alreadyRead);
            if (!unread.isEmpty()) {
                columnsToRead.put(row, unread);
            }
        }
        for (Map.Entry<Bytes, Set<Column>> toRead : columnsToRead.entrySet()) {
            getImpl(toRead.getKey(), toRead.getValue());
        }
    }

    /**
     * Checks whether a rejected mutation on the trigger row was rejected because the triggering
     * notification cell already has an entry in the scanned timestamp range.
     *
     * <p>For each update in {@code cm} that targets the notification column, the notification
     * cell is scanned over a fixed timestamp range. If anything is found, a delete for the
     * notification is queued and {@code true} is returned.
     *
     * @param cm the conditional mutation that was not accepted
     * @return {@code true} when an entry was found and the notification delete was queued
     * @throws RuntimeException wrapping {@link TableNotFoundException} if the table is missing
     */
    private boolean checkForAckCollision(ConditionalMutation cm) {
        Bytes row = Bytes.of(cm.getRow());
        if (!isTriggerRow(row)) {
            return false;
        }
        List<ColumnUpdate> colUpdates = cm.getUpdates();
        for (ColumnUpdate cu : colUpdates) {
            Column col = new Column(Bytes.of(cu.getColumnFamily()), Bytes.of(cu.getColumnQualifier()), Bytes.of(cu.getColumnVisibility()));
            if (!notification.getColumn().equals(col)) {
                continue;
            }
            Key startKey = SpanUtil.toKey(notification.getRowColumn());
            // Same value as the decimal literal -2305843009213693953L.
            startKey.setTimestamp(0xDFFFFFFFFFFFFFFFL);
            Key endKey = SpanUtil.toKey(notification.getRowColumn());
            endKey.setTimestamp(0xC000000000000000L | (notification.getTimestamp() + 1L));
            Range range = new Range(startKey, endKey);

            Scanner scanner;
            try {
                scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
            }
            catch (TableNotFoundException e) {
                throw new RuntimeException(e);
            }
            boolean found;
            try {
                scanner.setRange(range);
                found = scanner.iterator().hasNext();
            }
            finally {
                // Release the scanner on every path; it used to be left open.
                scanner.close();
            }
            if (!found) {
                continue;
            }
            env.getSharedResources().getBatchWriter().writeMutationAsync(notification.newDelete(env));
            return true;
        }
        return false;
    }

    /**
     * Runs the second commit phase and waits for it, flagging the transaction to stop after
     * the primary column has been committed.
     *
     * @param cd commit state produced by the locking phase; its observer is replaced
     * @param commitStamp stamp to commit with
     * @return {@code true} on success, {@code false} when the commit failed
     * @throws FluoException wrapping any other exception raised while committing
     */
    @VisibleForTesting
    public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) {
        stopAfterPrimaryCommit = true;
        SyncCommitObserver observer = new SyncCommitObserver();
        cd.commitObserver = observer;
        try {
            beginSecondCommitPhase(cd, commitStamp);
            observer.waitForCommit();
            return true;
        }
        catch (CommitException e) {
            return false;
        }
        catch (Exception e) {
            throw new FluoException(e);
        }
    }

    /**
     * Creates commit state wired to the shared conditional writers of the environment.
     *
     * @return a new {@code CommitData} with its synchronous, asynchronous and bulk asynchronous
     *         conditional writers set
     */
    public CommitData createCommitData() {
        CommitData commitData = new CommitData();
        commitData.cw = env.getSharedResources().getConditionalWriter();
        commitData.acw = env.getSharedResources().getAsyncConditionalWriter();
        commitData.bacw = env.getSharedResources().getBulkAsyncConditionalWriter();
        return commitData;
    }

    /**
     * Commits synchronously: starts the asynchronous commit and blocks until it completes.
     *
     * @throws CommitException if the commit fails
     * @throws IllegalStateException if the transaction is not open
     */
    public synchronized void commit() throws CommitException {
        SyncCommitObserver observer = new SyncCommitObserver();
        commitAsync(observer);
        observer.waitForCommit();
    }

    /**
     * Writes a delete for the weak notification that triggered this transaction, if there is one.
     */
    void deleteWeakRow() {
        if (weakNotification == null) {
            return;
        }
        env.getSharedResources().getBatchWriter().writeMutation(weakNotification.newDelete(env, startTs));
    }

    /**
     * @return the statistics object created for this transaction
     */
    @Override
    public TxStats getStats() {
        return stats;
    }

    /**
     * @return the start timestamp of this transaction
     */
    public long getStartTs() {
        return startTs;
    }

    /**
     * Overrides the transactor node used to obtain the transactor id written into locks.
     *
     * @param tnode transactor node to use; when left {@code null} the node is taken from the
     *        shared resources on first use
     * @return this transaction, for chaining
     */
    @VisibleForTesting
    public TransactionImpl setTransactor(TransactorNode tnode) {
        this.tnode = tnode;
        return this;
    }

    /**
     * Returns the id of this transaction's transactor, resolving the transactor node from the
     * shared resources the first time it is needed.
     *
     * @return the transactor id as a {@code Long}
     */
    private Long getTransactorID() {
        TransactorNode node = tnode;
        if (node == null) {
            node = env.getSharedResources().getTransactorNode();
            tnode = node;
        }
        return node.getTransactorID().getLongID();
    }

    /**
     * Closes the transaction once; later calls are no-ops.
     *
     * <p>The start timestamp is always handed back to the timestamp tracker, even when the
     * stale-scan check throws. Because the status is switched to {@code CLOSED} before the
     * check, a later call could never release it, so skipping the release would leave the
     * timestamp registered permanently.
     *
     * @param checkForStaleScan when {@code true} and no commit was attempted, compares the start
     *        timestamp against the oracle's current GC timestamp
     * @throws StaleScanException if the check is enabled and the start timestamp is older than
     *         the GC timestamp
     */
    private synchronized void close(boolean checkForStaleScan) {
        if (status == TxStatus.CLOSED) {
            return;
        }
        status = TxStatus.CLOSED;

        RuntimeException failure = null;
        try {
            if (checkForStaleScan && !commitAttempted) {
                Stamp stamp = env.getSharedResources().getOracleClient().getStamp();
                if (startTs < stamp.getGcTimestamp()) {
                    throw new StaleScanException();
                }
            }
        }
        catch (RuntimeException e) {
            failure = e;
        }

        try {
            env.getSharedResources().getTimestampTracker().removeTimestamp(startTs);
        }
        catch (RuntimeException e) {
            if (failure == null) {
                throw e;
            }
            // Keep the original failure as the one the caller sees.
            failure.addSuppressed(e);
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Closes the transaction, checking for a stale scan when no commit was attempted.
     *
     * @throws StaleScanException if the start timestamp is older than the oracle's GC timestamp
     */
    public void close() {
        final boolean checkForStaleScan = true;
        close(checkForStaleScan);
    }

    /**
     * Guards operations that require an open transaction.
     *
     * @throws IllegalStateException if the status is anything other than {@code OPEN}
     */
    private synchronized void checkIfOpen() {
        if (status == TxStatus.OPEN) {
            return;
        }
        throw new IllegalStateException("Transaction is no longer open! status = " + status);
    }

    /**
     * Safety net that closes a transaction the caller forgot to close. The stale-scan check is
     * skipped here, so this path never throws {@code StaleScanException}.
     *
     * @throws Throwable whatever closing or the superclass finalizer throws
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            close(false);
        }
        finally {
            // Always chain to the superclass finalizer, even if closing failed.
            super.finalize();
        }
    }

    /**
     * @return the start timestamp of this transaction (same value as {@code getStartTs()})
     */
    public long getStartTimestamp() {
        return startTs;
    }

    /**
     * Sums the byte lengths of everything buffered in this transaction: for pending updates the
     * row, column family, qualifier, visibility and value; for recorded reads the row, column
     * family, qualifier and visibility.
     *
     * @return the summed length as an {@code int}
     */
    @Override
    public int getSize() {
        int size = 0;
        for (Map.Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
            size += rowUpdates.getKey().length();
            for (Map.Entry<Column, Bytes> colUpdate : rowUpdates.getValue().entrySet()) {
                Column c = colUpdate.getKey();
                size += c.getFamily().length();
                size += c.getQualifier().length();
                size += c.getVisibility().length();
                size += colUpdate.getValue().length();
            }
        }
        for (Map.Entry<Bytes, Set<Column>> rowReads : columnsRead.entrySet()) {
            size += rowReads.getKey().length();
            for (Column c : rowReads.getValue()) {
                size += c.getFamily().length();
                size += c.getQualifier().length();
                size += c.getVisibility().length();
            }
        }
        return size;
    }

    /**
     * Starts an asynchronous commit and reports its outcome to {@code commitCallback}.
     *
     * <p>Any exception raised while starting the commit is printed and forwarded to the
     * callback's {@code failed} method instead of being thrown.
     *
     * @param commitCallback observer notified about the result
     * @throws IllegalStateException if the transaction is not open
     */
    @Override
    public synchronized void commitAsync(AsyncCommitObserver commitCallback) {
        checkIfOpen();
        status = TxStatus.COMMIT_STARTED;
        commitAttempted = true;
        try {
            beginCommitAsync(createCommitData(), commitCallback, null);
        }
        catch (Exception e) {
            e.printStackTrace();
            commitCallback.failed(e);
        }
    }

    /**
     * Starts the commit: picks the primary cell, removes it from the pending updates and
     * submits its prewrite (lock) asynchronously. The remaining steps run from the callback.
     *
     * <p>With no pending updates the weak trigger (if any) is deleted and the observer is told
     * the commit succeeded. The primary is, in order of preference: the explicit
     * {@code primary}, the triggering notification's cell, or the first cell the update map
     * iterates to.
     *
     * @param cd commit state to fill in
     * @param commitCallback observer notified about the outcome
     * @param primary explicit primary cell, or {@code null}
     * @throws IllegalArgumentException if an explicit primary differs from the notification
     *         cell, or if the chosen primary is not among the pending updates
     */
    private void beginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback, RowColumn primary) {
        if (updates.isEmpty()) {
            deleteWeakRow();
            commitCallback.committed();
            return;
        }
        for (Map<Column, Bytes> cols : updates.values()) {
            stats.incrementEntriesSet(cols.size());
        }

        Bytes primRow;
        Column primCol;
        if (primary != null) {
            primRow = primary.getRow();
            primCol = primary.getColumn();
            if (notification != null && !primary.equals(notification.getRowColumn())) {
                throw new IllegalArgumentException("Primary must be notification");
            }
        } else if (notification != null) {
            primRow = notification.getRow();
            primCol = notification.getColumn();
        } else {
            primRow = updates.keySet().iterator().next();
            primCol = updates.get(primRow).keySet().iterator().next();
        }

        // Fail with a clear message instead of a NullPointerException further down when the
        // caller supplied a primary that this transaction never set.
        Map<Column, Bytes> primRowUpdates = updates.get(primRow);
        if (primRowUpdates == null || !primRowUpdates.containsKey(primCol)) {
            throw new IllegalArgumentException("Primary is not among the updates of this transaction " + primRow + " " + primCol);
        }

        cd.prow = primRow;
        cd.pcol = primCol;
        cd.pval = primRowUpdates.remove(primCol);
        if (primRowUpdates.isEmpty()) {
            updates.remove(primRow);
        }
        cd.commitObserver = commitCallback;

        final ConditionalFlutation pcm = prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, isTriggerRow(cd.prow));
        ListenableFuture<Iterator<ConditionalWriter.Result>> future = cd.acw.apply(Collections.singletonList(pcm));
        Futures.addCallback(future, (FutureCallback)new CommitCallback<Iterator<ConditionalWriter.Result>>(cd){

            @Override
            protected void onSuccess(CommitData cd, Iterator<ConditionalWriter.Result> result) throws Exception {
                TransactionImpl.this.postLockPrimary(cd, pcm, (ConditionalWriter.Result)Iterators.getOnlyElement(result));
            }
        }, (Executor)env.getSharedResources().getAsyncCommitExecutor());
    }

    /**
     * Continues the commit after the primary lock attempt: on acceptance the remaining columns
     * are locked, otherwise the result is handled on the synchronous commit executor.
     *
     * @param cd commit state
     * @param pcm the primary's conditional mutation
     * @param result outcome of writing {@code pcm}
     * @throws Exception if reading the result status or a following step fails
     */
    private void postLockPrimary(CommitData cd, final ConditionalMutation pcm, ConditionalWriter.Result result) throws Exception {
        final ConditionalWriter.Status mutationStatus = result.getStatus();
        if (mutationStatus == ConditionalWriter.Status.ACCEPTED) {
            lockOtherColumns(cd);
            return;
        }
        env.getSharedResources().getSyncCommitExecutor().execute(new SynchronousCommitTask(cd){

            @Override
            protected void runCommitStep(CommitData cd) throws Exception {
                TransactionImpl.this.synchronousPostLockPrimary(cd, pcm, mutationStatus);
            }
        });
    }

    /**
     * Resolves a primary lock attempt that was not immediately accepted.
     *
     * <p>While the status is {@code UNKNOWN}, the transaction info of the primary cell decides:
     * a present lock counts as accepted, a rollback as rejected, and an unknown state causes
     * the mutation to be written again synchronously. A final status other than accepted marks
     * the primary as rejected, reads the unread rejected columns and reports either an
     * already-acknowledged notification or a commit failure.
     *
     * @param cd commit state
     * @param pcm the primary's conditional mutation
     * @param mutationStatus status returned by the asynchronous write
     * @throws IllegalStateException if the transaction info reports any other state
     */
    private void synchronousPostLockPrimary(CommitData cd, ConditionalMutation pcm, ConditionalWriter.Status mutationStatus) throws AccumuloException, AccumuloSecurityException, Exception {
        while (mutationStatus == ConditionalWriter.Status.UNKNOWN) {
            TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
            switch (txInfo.status) {
                case LOCKED:
                    mutationStatus = ConditionalWriter.Status.ACCEPTED;
                    break;
                case ROLLED_BACK:
                    mutationStatus = ConditionalWriter.Status.REJECTED;
                    break;
                case UNKNOWN:
                    mutationStatus = cd.cw.write(pcm).getStatus();
                    break;
                default:
                    throw new IllegalStateException("unexpected tx state " + txInfo.status + " " + cd.prow + " " + cd.pcol);
            }
        }

        if (mutationStatus == ConditionalWriter.Status.ACCEPTED) {
            lockOtherColumns(cd);
            return;
        }

        cd.addPrimaryToRejected();
        getStats().setRejected(cd.getRejected());
        readUnread(cd);
        if (checkForAckCollision(pcm)) {
            cd.commitObserver.alreadyAcknowledged();
        } else {
            cd.commitObserver.commitFailed();
        }
    }

    /**
     * Builds one conditional mutation per remaining row, holding the prewrite of each of its
     * columns, and submits them through the bulk asynchronous conditional writer.
     *
     * @param cd commit state; its accepted-row set is reset before submitting
     */
    private void lockOtherColumns(CommitData cd) {
        ArrayList<ConditionalMutation> mutations = new ArrayList<ConditionalMutation>();
        for (Map.Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
            Bytes row = rowUpdates.getKey();
            ConditionalFlutation rowMutation = null;
            for (Map.Entry<Column, Bytes> colUpdate : rowUpdates.getValue().entrySet()) {
                Column col = colUpdate.getKey();
                Bytes val = colUpdate.getValue();
                if (rowMutation != null) {
                    prewrite(rowMutation, col, val, cd.prow, cd.pcol, false);
                } else {
                    rowMutation = prewrite(row, col, val, cd.prow, cd.pcol, false);
                }
            }
            mutations.add(rowMutation);
        }
        cd.acceptedRows = new HashSet();
        ListenableFuture<Iterator<ConditionalWriter.Result>> future = cd.bacw.apply(mutations);
        Futures.addCallback(future, (FutureCallback)new CommitCallback<Iterator<ConditionalWriter.Result>>(cd){

            @Override
            protected void onSuccess(CommitData cd, Iterator<ConditionalWriter.Result> results) throws Exception {
                TransactionImpl.this.postLockOther(cd, results);
            }
        }, (Executor)env.getSharedResources().getAsyncCommitExecutor());
    }

    /**
     * Processes the lock results of the non-primary rows.
     *
     * <p>Accepted rows are remembered; for every other row all of its columns are marked
     * rejected. With rejections, the unread columns are read and the locks rolled back on the
     * synchronous commit executor. Without rejections the commit either stops here (pre-commit
     * only) or fetches a commit stamp and enters the second phase.
     *
     * @param cd commit state
     * @param results one result per submitted row mutation
     * @throws Exception if inspecting a result fails
     */
    private void postLockOther(CommitData cd, Iterator<ConditionalWriter.Result> results) throws Exception {
        while (results.hasNext()) {
            ConditionalWriter.Result result = results.next();
            Bytes row = Bytes.of(result.getMutation().getRow());
            if (result.getStatus() != ConditionalWriter.Status.ACCEPTED) {
                cd.addToRejected(row, updates.get(row).keySet());
            } else {
                cd.acceptedRows.add(row);
            }
        }

        boolean anyRejected = cd.getRejected().size() > 0;
        if (anyRejected) {
            getStats().setRejected(cd.getRejected());
            env.getSharedResources().getSyncCommitExecutor().execute(new SynchronousCommitTask(cd){

                @Override
                protected void runCommitStep(CommitData cd) throws Exception {
                    TransactionImpl.this.readUnread(cd);
                    TransactionImpl.this.rollbackOtherLocks(cd);
                }
            });
            return;
        }

        if (stopAfterPreCommit) {
            cd.commitObserver.committed();
            return;
        }

        ListenableFuture<Stamp> future = env.getSharedResources().getOracleClient().getStampAsync();
        Futures.addCallback(future, (FutureCallback)new CommitCallback<Stamp>(cd){

            @Override
            protected void onSuccess(CommitData cd, Stamp stamp) throws Exception {
                TransactionImpl.this.beginSecondCommitPhase(cd, stamp);
            }
        }, (Executor)env.getSharedResources().getAsyncCommitExecutor());
    }

    /**
     * Writes a rollback marker for every column of every accepted (locked) non-primary row and,
     * once those writes complete, rolls back the primary lock.
     *
     * @param cd commit state holding the accepted rows
     * @throws Exception if building or submitting the mutations fails
     */
    private void rollbackOtherLocks(CommitData cd) throws Exception {
        ArrayList<Mutation> rollbacks = new ArrayList<Mutation>(cd.acceptedRows.size());
        for (Bytes row : cd.acceptedRows) {
            Flutation rowRollback = new Flutation(env, row);
            for (Column col : updates.get(row).keySet()) {
                rowRollback.put(col, 0x2000000000000000L | startTs, DelLockValue.encodeRollback(false, true));
            }
            rollbacks.add(rowRollback);
        }
        ListenableFuture<Void> future = env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(rollbacks);
        Futures.addCallback(future, (FutureCallback)new CommitCallback<Void>(cd){

            @Override
            protected void onSuccess(CommitData cd, Void v) throws Exception {
                TransactionImpl.this.rollbackPrimaryLock(cd);
            }
        }, (Executor)env.getSharedResources().getAsyncCommitExecutor());
    }

    /**
     * Writes the rollback marker for the primary cell plus an empty-valued marker cell, then
     * reports the commit as failed once the write completes.
     *
     * @param cd commit state holding the primary row and column
     * @throws Exception if building or submitting the mutation fails
     */
    private void rollbackPrimaryLock(CommitData cd) throws Exception {
        Flutation primaryRollback = new Flutation(env, cd.prow);
        primaryRollback.put(cd.pcol, 0x2000000000000000L | startTs, DelLockValue.encodeRollback(startTs, true, true));
        primaryRollback.put(cd.pcol, 0x6000000000000000L | startTs, EMPTY);
        ListenableFuture<Void> future = env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(primaryRollback);
        Futures.addCallback(future, (FutureCallback)new CommitCallback<Void>(cd){

            @Override
            protected void onSuccess(CommitData cd, Void v) throws Exception {
                cd.commitObserver.commitFailed();
            }
        }, (Executor)env.getSharedResources().getAsyncCommitExecutor());
    }

    /**
     * Enters the second commit phase with the given commit stamp.
     *
     * <p>If the start timestamp is older than the stamp's GC timestamp the locks are rolled
     * back; otherwise the commit timestamp is recorded and notifications are written.
     *
     * @param cd commit state
     * @param commitStamp stamp obtained for the commit
     * @throws Exception if the rollback path fails
     */
    private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp) throws Exception {
        boolean tooOld = startTs < commitStamp.getGcTimestamp();
        if (tooOld) {
            rollbackOtherLocks(cd);
            return;
        }
        getStats().setCommitTs(commitStamp.getTxTimestamp());
        writeNotificationsAsync(cd, commitStamp.getTxTimestamp());
    }

    /**
     * Writes the notifications produced by this commit and then commits the primary column.
     *
     * <p>A notification is written for every cell (the primary included) whose column has an
     * observer and whose value is a real write, meaning neither a delete nor the notification
     * sentinel. Every requested weak notification is written as well. At most one mutation is
     * built per row.
     *
     * @param cd commit state holding the primary cell
     * @param commitTs commit timestamp passed to each notification
     */
    private void writeNotificationsAsync(CommitData cd, final long commitTs) {
        Map<Bytes, Mutation> mutations = new HashMap<Bytes, Mutation>();

        // The primary was removed from `updates` when the commit began, so handle it separately.
        if (env.getObservers().containsKey(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
            Flutation m = new Flutation(env, cd.prow);
            Notification.put(env, m, cd.pcol, commitTs);
            mutations.put(cd.prow, m);
        }

        for (Map.Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
            for (Map.Entry<Column, Bytes> colUpdate : rowUpdates.getValue().entrySet()) {
                Bytes val = colUpdate.getValue();
                if (!env.getObservers().containsKey(colUpdate.getKey()) || !isWrite(val) || isDelete(val)) {
                    continue;
                }
                Mutation m = mutations.get(rowUpdates.getKey());
                if (m == null) {
                    m = new Flutation(env, rowUpdates.getKey());
                    mutations.put(rowUpdates.getKey(), m);
                }
                Notification.put(env, m, colUpdate.getKey(), commitTs);
            }
        }

        for (Map.Entry<Bytes, Set<Column>> weak : weakNotifications.entrySet()) {
            Mutation m = mutations.get(weak.getKey());
            if (m == null) {
                m = new Flutation(env, weak.getKey());
                mutations.put(weak.getKey(), m);
            }
            for (Column col : weak.getValue()) {
                Notification.put(env, m, col, commitTs);
            }
        }

        ListenableFuture<Void> future = env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values());
        Futures.addCallback(future, (FutureCallback)new CommitCallback<Void>(cd){

            @Override
            protected void onSuccess(CommitData cd, Void v) throws Exception {
                TransactionImpl.this.commmitPrimary(cd, commitTs);
            }
        }, (Executor)env.getSharedResources().getAsyncCommitExecutor());
    }

    /**
     * Submits the conditional mutation that commits the primary column.
     *
     * <p>The mutation targets the primary row and is conditioned on the primary column holding the
     * lock value encoded for this transaction, evaluated through a {@link PrewriteIterator}
     * configured with this transaction's start timestamp. {@code ColumnUtil.commitColumn} fills in
     * the mutation. The single result is handed to {@code handleUnkownStatsAfterPrimary} on the
     * async commit executor.
     *
     * @param cd state of the commit in progress
     * @param commitTs commit timestamp of this transaction
     */
    private void commmitPrimary(CommitData cd, final long commitTs) {
        IteratorSetting prewriteIter = new IteratorSetting(10, PrewriteIterator.class);
        PrewriteIterator.setSnaptime(prewriteIter, this.startTs);

        boolean isTrigger = this.isTriggerRow(cd.prow) && cd.pcol.equals(this.notification.getColumn());

        // The write is only applied if the primary column still carries this transaction's lock.
        Condition lockCheck = new FluoCondition(this.env, cd.pcol)
            .setIterators(prewriteIter)
            .setValue(LockValue.encode(cd.prow, cd.pcol, TransactionImpl.isWrite(cd.pval), TransactionImpl.isDelete(cd.pval), isTrigger, this.getTransactorID()));

        final ConditionalFlutation delLockMutation = new ConditionalFlutation(this.env, cd.prow, lockCheck);
        ColumnUtil.commitColumn(this.env, isTrigger, true, cd.pcol, TransactionImpl.isWrite(cd.pval), TransactionImpl.isDelete(cd.pval), this.startTs, commitTs, this.observedColumns, delLockMutation);

        ListenableFuture<Iterator<ConditionalWriter.Result>> pending = cd.acw.apply(Collections.singletonList(delLockMutation));
        FutureCallback<Iterator<ConditionalWriter.Result>> onPrimaryResult = new CommitCallback<Iterator<ConditionalWriter.Result>>(cd) {
            @Override
            protected void onSuccess(CommitData cd, Iterator<ConditionalWriter.Result> result) throws Exception {
                // Exactly one mutation was submitted, so exactly one result is expected.
                TransactionImpl.this.handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation, Iterators.getOnlyElement(result));
            }
        };
        Futures.addCallback(pending, onPrimaryResult, this.env.getSharedResources().getAsyncCommitExecutor());
    }

    /**
     * Resolves the outcome of the primary-column conditional write and continues the commit.
     *
     * <p>Any status other than {@code UNKNOWN} is passed straight to {@code postCommitPrimary}.
     * For {@code UNKNOWN}, a task is queued on the sync commit executor that repeatedly reads the
     * transaction info for the primary column until the outcome is known:
     * <ul>
     * <li>{@code COMMITTED}: the recorded commit timestamp must equal {@code commitTs}; the write
     * is then treated as {@code ACCEPTED}.</li>
     * <li>{@code LOCKED}: the conditional mutation is written again through {@code cd.cw} and its
     * new status is used (which may again be {@code UNKNOWN}).</li>
     * <li>anything else: the write is treated as {@code REJECTED}.</li>
     * </ul>
     *
     * @param cd state of the commit in progress
     * @param commitTs commit timestamp of this transaction
     * @param delLockMutation the conditional mutation that was submitted for the primary column
     * @param result result of that submission
     * @throws Exception if reading the status or continuing the commit fails
     */
    private void handleUnkownStatsAfterPrimary(CommitData cd, final long commitTs, final ConditionalMutation delLockMutation, ConditionalWriter.Result result) throws Exception {
        final ConditionalWriter.Status mutationStatus = result.getStatus();
        if (mutationStatus != ConditionalWriter.Status.UNKNOWN) {
            this.postCommitPrimary(cd, commitTs, mutationStatus);
            return;
        }

        SynchronousCommitTask resolveTask = new SynchronousCommitTask(cd) {
            @Override
            protected void runCommitStep(CommitData cd) throws Exception {
                ConditionalWriter.Status resolved = mutationStatus;
                while (resolved == ConditionalWriter.Status.UNKNOWN) {
                    TxInfo txInfo = TxInfo.getTransactionInfo(TransactionImpl.this.env, cd.prow, cd.pcol, TransactionImpl.this.startTs);
                    switch (txInfo.status) {
                        case COMMITTED:
                            if (txInfo.commitTs != commitTs) {
                                throw new IllegalStateException(cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs);
                            }
                            resolved = ConditionalWriter.Status.ACCEPTED;
                            break;
                        case LOCKED:
                            // Still locked: retry the write synchronously and re-check its status.
                            resolved = cd.cw.write(delLockMutation).getStatus();
                            break;
                        default:
                            resolved = ConditionalWriter.Status.REJECTED;
                            break;
                    }
                }
                TransactionImpl.this.postCommitPrimary(cd, commitTs, resolved);
            }
        };
        this.env.getSharedResources().getSyncCommitExecutor().execute(resolveTask);
    }

    /**
     * Continues the commit after the primary-column write has a definite status.
     *
     * <p>A status other than {@code ACCEPTED} reports a commit failure to the observer. An accepted
     * write either reports success immediately (when {@code stopAfterPrimaryCommit} is set) or
     * proceeds to {@code deleteLocks}.
     *
     * @param cd state of the commit in progress
     * @param commitTs commit timestamp of this transaction
     * @param mutationStatus resolved status of the primary-column conditional write
     * @throws Exception declared for use inside commit callbacks
     */
    private void postCommitPrimary(CommitData cd, long commitTs, ConditionalWriter.Status mutationStatus) throws Exception {
        if (mutationStatus != ConditionalWriter.Status.ACCEPTED) {
            cd.commitObserver.commitFailed();
            return;
        }
        if (this.stopAfterPrimaryCommit) {
            cd.commitObserver.committed();
            return;
        }
        this.deleteLocks(cd, commitTs);
    }

    /**
     * Builds one mutation per updated row by applying {@code ColumnUtil.commitColumn} to each of
     * the row's columns (with the primary flag set to {@code false}), writes them through the
     * shared batch writer and, on success, calls {@code finishCommit(cd, commitTs)} on the async
     * commit executor.
     *
     * @param cd state of the commit in progress
     * @param commitTs commit timestamp of this transaction
     */
    private void deleteLocks(CommitData cd, final long commitTs) {
        ArrayList<Mutation> rowMutations = new ArrayList<Mutation>(this.updates.size() + 1);

        for (Map.Entry<Bytes, Map<Column, Bytes>> rowUpdates : this.updates.entrySet()) {
            Bytes row = rowUpdates.getKey();
            Flutation rowMutation = new Flutation(this.env, row);
            boolean triggerRow = this.isTriggerRow(row);

            for (Map.Entry<Column, Bytes> colUpdate : rowUpdates.getValue().entrySet()) {
                Column col = colUpdate.getKey();
                Bytes val = colUpdate.getValue();
                // Only consult the notification column when this row is the trigger row.
                boolean isTrigger = triggerRow && col.equals(this.notification.getColumn());
                ColumnUtil.commitColumn(this.env, isTrigger, false, col, TransactionImpl.isWrite(val), TransactionImpl.isDelete(val), this.startTs, commitTs, this.observedColumns, rowMutation);
            }
            rowMutations.add(rowMutation);
        }

        ListenableFuture<Void> written = this.env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(rowMutations);
        FutureCallback<Void> thenFinish = new CommitCallback<Void>(cd) {
            @Override
            protected void onSuccess(CommitData cd, Void v) throws Exception {
                TransactionImpl.this.finishCommit(cd, commitTs);
            }
        };
        Futures.addCallback(written, thenFinish, this.env.getSharedResources().getAsyncCommitExecutor());
    }

    /**
     * Test entry point that starts the lock-deletion step for the given commit using the
     * transaction timestamp of {@code commitStamp}.
     *
     * <p>The work is asynchronous; completion is reported through the commit observer held by
     * {@code cd}, not through the return value.
     *
     * @param cd state of the commit in progress
     * @param commitStamp stamp whose transaction timestamp is used as the commit timestamp
     * @return always {@code true}
     */
    @VisibleForTesting
    public boolean finishCommit(CommitData cd, Stamp commitStamp) throws TableNotFoundException, MutationsRejectedException {
        long commitTs = commitStamp.getTxTimestamp();
        this.deleteLocks(cd, commitTs);
        return true;
    }

    /**
     * Final commit step: queues the trailing mutations on the shared batch writer without waiting
     * for them, then reports the commit as successful to the observer.
     *
     * <p>The queued mutations are, in order: a marker on the primary column, the delete for
     * {@code weakNotification} if present, and the delete for {@code notification} if present.
     *
     * @param cd state of the commit in progress
     * @param commitTs commit timestamp of this transaction
     */
    private void finishCommit(CommitData cd, long commitTs) {
        // NOTE(review): 0x6000000000000000L is presumably the "transaction done" timestamp prefix
        // from ColumnConstants -- confirm before replacing the literal with the named constant.
        Flutation doneMarker = new Flutation(this.env, cd.prow);
        doneMarker.put(cd.pcol, 0x6000000000000000L | commitTs, EMPTY);

        ArrayList<Mutation> trailing = new ArrayList<Mutation>(2);
        trailing.add(doneMarker);
        if (this.weakNotification != null) {
            trailing.add(this.weakNotification.newDelete(this.env, this.startTs));
        }
        if (this.notification != null) {
            trailing.add(this.notification.newDelete(this.env, this.startTs));
        }

        this.env.getSharedResources().getBatchWriter().writeMutationsAsync(trailing);
        cd.commitObserver.committed();
    }

    /**
     * Creates a {@link SnapshotScanner} over the given span and columns, bound to this
     * transaction's environment, start timestamp and stats.
     *
     * @param span span to scan
     * @param columns columns passed to the scanner options
     * @return a new scanner
     */
    public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns) {
        SnapshotScanner.Opts scanOpts = new SnapshotScanner.Opts(span, columns);
        return new SnapshotScanner(this.env, scanOpts, this.startTs, this.stats);
    }

    /**
     * A {@link Runnable} commit step bound to one {@link CommitData}. Any exception thrown by the
     * step is reported to that commit's observer via {@code failed} instead of propagating out of
     * {@link #run()}.
     */
    private abstract static class SynchronousCommitTask implements Runnable {
        private final CommitData cd;

        SynchronousCommitTask(CommitData cd) {
            this.cd = cd;
        }

        /**
         * Performs the commit step.
         *
         * @param cd the commit this task was created for
         * @throws Exception on any failure; reported to the commit observer by {@link #run()}
         */
        protected abstract void runCommitStep(CommitData cd) throws Exception;

        @Override
        public void run() {
            try {
                this.runCommitStep(this.cd);
            } catch (Exception stepFailure) {
                this.cd.commitObserver.failed(stepFailure);
            }
        }
    }

    private static abstract class CommitCallback<V>
    implements FutureCallback<V> {
        private CommitData cd;

        CommitCallback(CommitData cd) {
            this.cd = cd;
        }

        public void onSuccess(V result) {
            try {
                this.onSuccess(this.cd, result);
            }
            catch (Exception e) {
                this.cd.commitObserver.failed(e);
            }
        }

        protected abstract void onSuccess(CommitData var1, V var2) throws Exception;

        public void onFailure(Throwable t) {
            this.cd.commitObserver.failed(t);
        }
    }

    /**
     * Mutable state carried through the steps of a single commit: the primary row/column/value,
     * the writers used to submit mutations, the observer that is told the outcome, and the
     * rows/columns that were rejected.
     */
    public static class CommitData {
        ConditionalWriter cw;
        // Row, column and value of the primary column of the commit.
        private Bytes prow;
        private Column pcol;
        private Bytes pval;
        private HashSet<Bytes> acceptedRows;
        // Rejected columns keyed by row; read through getRejected().
        private Map<Bytes, Set<Column>> rejected = new HashMap<Bytes, Set<Column>>();
        private AsyncConditionalWriter acw;
        private AsyncConditionalWriter bacw;
        private AsyncCommitObserver commitObserver;

        /**
         * Replaces the rejected set with just the primary row and column.
         *
         * <p>The map is mutable so that a later {@link #addToRejected} cannot fail with
         * {@link UnsupportedOperationException}.
         */
        private void addPrimaryToRejected() {
            Map<Bytes, Set<Column>> primaryOnly = new HashMap<Bytes, Set<Column>>();
            primaryOnly.put(this.prow, Collections.singleton(this.pcol));
            this.rejected = primaryOnly;
        }

        /**
         * Records the rejected columns of one row, keeping rows recorded by earlier calls.
         *
         * @param row the row whose columns were rejected
         * @param columns the rejected columns of that row
         * @throws IllegalStateException if columns were already recorded for {@code row}
         */
        private void addToRejected(Bytes row, Set<Column> columns) {
            // Allocate only when missing: re-creating the map on every call would drop previously
            // recorded rows and make the duplicate check below unreachable.
            if (this.rejected == null) {
                this.rejected = new HashMap<Bytes, Set<Column>>();
            }
            Set<Column> previous = this.rejected.put(row, columns);
            if (previous != null) {
                throw new IllegalStateException();
            }
        }

        /**
         * @return the rejected columns keyed by row; an empty map when none are recorded
         */
        private Map<Bytes, Set<Column>> getRejected() {
            if (this.rejected == null) {
                return Collections.emptyMap();
            }
            return this.rejected;
        }

        /**
         * @return primary row, column and value followed by the number of rejected rows
         */
        @Override
        public String toString() {
            // Go through getRejected() so a null map is treated as empty here as well.
            return this.prow + " " + this.pcol + " " + this.pval + " " + this.getRejected().size();
        }
    }

    private static enum TxStatus {
        OPEN,
        COMMIT_STARTED,
        COMMITTED,
        CLOSED;

    }
}

