/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.core.worker;

import org.apache.fluo.api.client.AbstractTransactionBase;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.core.async.AsyncCommitObserver;
import org.apache.fluo.core.async.AsyncTransaction;
import org.apache.fluo.core.async.CommitManager;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.impl.TransactionImpl;
import org.apache.fluo.core.log.TracingTransaction;
import org.apache.fluo.core.util.Hex;
import org.apache.fluo.core.worker.NotificationFinder;
import org.apache.fluo.core.worker.NotificationProcessor;
import org.apache.fluo.core.worker.Observers;
import org.apache.fluo.core.worker.TxResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkTaskAsync
implements Runnable {
    private static Logger log = LoggerFactory.getLogger(WorkTaskAsync.class);
    private Environment env;
    private Notification notification;
    private Observers observers;
    private NotificationFinder notificationFinder;
    private NotificationProcessor notificationProcessor;

    WorkTaskAsync(NotificationProcessor notificationProcessor, NotificationFinder notificationFinder, Environment env, Notification notification, Observers observers) {
        this.notificationProcessor = notificationProcessor;
        this.notificationFinder = notificationFinder;
        this.env = env;
        this.notification = notification;
        this.observers = observers;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        Observer observer = this.observers.getObserver(this.notification.getColumn());
        try {
            AbstractTransactionBase atx = new TransactionImpl(this.env, this.notification);
            if (TracingTransaction.isTracingEnabled()) {
                atx = new TracingTransaction((AsyncTransaction)atx, this.notification, observer.getClass());
            }
            observer.process((TransactionBase)atx, this.notification.getRow(), this.notification.getColumn());
            CommitManager commitManager = this.env.getSharedResources().getCommitManager();
            commitManager.beginCommit((AsyncTransaction)atx, observer.getClass(), new WorkTaskCommitObserver());
        }
        catch (Exception e) {
            log.error("Failed to process work " + Hex.encNonAscii(this.notification), (Throwable)e);
        }
        finally {
            this.observers.returnObserver(observer);
        }
    }

    class WorkTaskCommitObserver
    implements AsyncCommitObserver {
        WorkTaskCommitObserver() {
        }

        @Override
        public void committed() {
            WorkTaskAsync.this.notificationProcessor.notificationProcessed(WorkTaskAsync.this.notification);
        }

        @Override
        public void failed(Throwable t) {
            WorkTaskAsync.this.notificationFinder.failedToProcess(WorkTaskAsync.this.notification, TxResult.ERROR);
            WorkTaskAsync.this.notificationProcessor.notificationProcessed(WorkTaskAsync.this.notification);
            log.error("Failed to process work " + Hex.encNonAscii(WorkTaskAsync.this.notification), t);
        }

        @Override
        public void alreadyAcknowledged() {
            WorkTaskAsync.this.notificationFinder.failedToProcess(WorkTaskAsync.this.notification, TxResult.AACKED);
            WorkTaskAsync.this.notificationProcessor.notificationProcessed(WorkTaskAsync.this.notification);
        }

        @Override
        public void commitFailed() {
            WorkTaskAsync.this.notificationProcessor.requeueNotification(WorkTaskAsync.this.notificationFinder, WorkTaskAsync.this.notification);
        }
    }
}

