package org.apache.flink.connector.rocketmq.sink;

import com.google.common.util.concurrent.MoreExecutors;
import java.lang.management.ManagementFactory;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.rocketmq.sink.committer.SendCommittable;
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.flink.util.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link InnerProducer} implementation backed by a RocketMQ {@link TransactionMQProducer}.
 *
 * <p>Messages are sent either as plain messages ({@link #send(Message)}) or as prepared
 * transactional messages ({@link #sendMessageInTransaction(Message)}). A prepared message is
 * resolved later by {@link #commit(SendCommittable)} or {@link #rollback(SendCommittable)}, which
 * issue an explicit end-transaction request to the broker that owns the message queue.
 *
 * <p>Lifecycle: construct, {@link #start()}, use, {@link #close()}. The client instance needed by
 * the end-transaction path is only available between {@code start()} and {@code close()}.
 */
public class InnerProducerImpl implements InnerProducer {
    private static final Logger LOG = LoggerFactory.getLogger(InnerProducerImpl.class);

    /** Capacity of the bounded work queue of the executor handed to the producer. */
    private static final int EXECUTOR_QUEUE_CAPACITY = 2000;

    /** Keep-alive time, in seconds, passed to the executor handed to the producer. */
    private static final long EXECUTOR_KEEP_ALIVE_SECONDS = 100L;

    // NOTE(review): the four values below are raw protocol numbers kept exactly as found.
    // They presumably mirror RocketMQ's RequestCode.END_TRANSACTION and the
    // MessageSysFlag.TRANSACTION_*_TYPE flags -- confirm against the RocketMQ client in use.
    private static final int END_TRANSACTION_REQUEST_CODE = 37;
    private static final int TRANSACTION_COMMIT_FLAG = 8;
    private static final int TRANSACTION_ROLLBACK_FLAG = 12;
    private static final int TRANSACTION_UNKNOWN_FLAG = 0;

    /** Value of {@code SendCommittable#getMessageOffset()} for which no request is sent. */
    private static final long INVALID_MESSAGE_OFFSET = -1L;

    private final Configuration configuration;
    private final TransactionMQProducer producer;

    /** Set by {@link #start()}, cleared by {@link #close()}; {@code null} outside that window. */
    private MQClientInstance mqClientInstance;

    private final String endPoints;
    private final String groupId;

    /**
     * Builds (but does not start) the underlying producer from the sink configuration.
     *
     * <p>An ACL hook is installed only when both the access key and the secret key are
     * non-blank; otherwise the producer is created without credentials.
     *
     * @param configuration sink configuration providing producer group, endpoints, optional
     *     credentials, executor size and transaction timeout
     */
    public InnerProducerImpl(final Configuration configuration) {
        this.configuration = configuration;
        this.groupId = configuration.getString(RocketMQSinkOptions.PRODUCER_GROUP);
        this.endPoints = configuration.getString(RocketMQSinkOptions.ENDPOINTS);

        String accessKey = configuration.getString(RocketMQSinkOptions.OPTIONAL_ACCESS_KEY);
        String secretKey = configuration.getString(RocketMQSinkOptions.OPTIONAL_SECRET_KEY);
        boolean hasCredentials =
                !StringUtils.isNullOrWhitespaceOnly(accessKey)
                        && !StringUtils.isNullOrWhitespaceOnly(secretKey);
        this.producer =
                hasCredentials
                        ? new TransactionMQProducer(
                                this.groupId,
                                new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)))
                        : new TransactionMQProducer(this.groupId);

        this.producer.setNamesrvAddr(this.endPoints);
        this.producer.setVipChannelEnabled(false);
        // JVM runtime name + group + random UUID, so every producer gets its own instance name.
        this.producer.setInstanceName(
                String.join(
                        UtilAll.SEPARATOR,
                        ManagementFactory.getRuntimeMXBean().getName(),
                        this.groupId,
                        UUID.randomUUID().toString()));

        // Fixed-size pool; it is also the executor used by commit() and rollback().
        int executorThreads = configuration.getInteger(RocketMQSinkOptions.EXECUTOR_NUM);
        this.producer.setExecutorService(
                new ThreadPoolExecutor(
                        executorThreads,
                        executorThreads,
                        EXECUTOR_KEEP_ALIVE_SECONDS,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(EXECUTOR_QUEUE_CAPACITY),
                        task -> {
                            // Every worker thread carries the producer group as its name.
                            Thread worker = new Thread(task);
                            worker.setName(this.groupId);
                            return worker;
                        }));

        this.producer.setTransactionListener(
                new TransactionListener() {
                    @Override
                    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
                        // There is no local transaction to run here: the outcome is reported
                        // later through commit()/rollback(), so the state stays unknown.
                        return LocalTransactionState.UNKNOW;
                    }

                    @Override
                    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                        long elapsedMillis =
                                System.currentTimeMillis() - messageExt.getBornTimestamp();
                        if (elapsedMillis > transactionTimeout()) {
                            LOG.info(
                                    "Exceeded the transaction maximum time, return rollback. topic={}, msgId={}",
                                    messageExt.getTopic(),
                                    messageExt.getTransactionId());
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                        }
                        LOG.info(
                                "Not exceeded the transaction maximum time, return unknown. topic={}, msgId={}",
                                messageExt.getTopic(),
                                messageExt.getTransactionId());
                        return LocalTransactionState.UNKNOW;
                    }
                });
    }

    /**
     * Starts the underlying producer and captures its client instance for later end-transaction
     * requests.
     *
     * @throws RuntimeException wrapping the {@link MQClientException} if the producer fails to
     *     start
     */
    @Override
    public void start() {
        try {
            this.producer.start();
            this.mqClientInstance = this.producer.getDefaultMQProducerImpl().getMqClientFactory();
            LOG.info(
                    "RocketMQ producer in flink sink writer init success, endpoint={}, groupId={}, clientId={}",
                    this.endPoints,
                    this.groupId,
                    this.producer.getInstanceName());
        } catch (MQClientException e) {
            LOG.error("RocketMQ producer in flink sink writer init failed", e);
            throw new RuntimeException(e);
        }
    }

    /** Returns the producer group read from the configuration at construction time. */
    @Override
    public String getProducerGroup() {
        return this.groupId;
    }

    /**
     * Sends a plain (non-transactional) message.
     *
     * <p>The send is executed through a direct executor, i.e. on the calling thread, so the
     * returned future is already completed when this method returns.
     *
     * @param message message to send
     * @return future holding the send result, or completed exceptionally if the send failed
     */
    @Override
    public CompletableFuture<SendResult> send(Message message) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        SendResult result = this.producer.send(message);
                        LOG.info(
                                "Send message successfully, topic={}, messageId={}",
                                message.getTopic(),
                                result.getMsgId());
                        return result;
                    } catch (Exception e) {
                        restoreInterruptIfNeeded(e);
                        LOG.error("Failed to send message, topic={}", message.getTopic(), e);
                        throw new RuntimeException(e);
                    }
                },
                MoreExecutors.directExecutor());
    }

    /**
     * Sends a message as a prepared (half) transactional message.
     *
     * <p>The message is modified in place: its topic is wrapped with the producer namespace, any
     * delay level is cleared, and the transaction-related properties are set. Like {@link
     * #send(Message)}, the work runs on the calling thread through a direct executor. A send
     * status other than {@code SEND_OK} is logged as a warning but still returned normally.
     *
     * @param message message to send; mutated by this call
     * @return future holding the send result, or completed exceptionally if the send threw
     */
    @Override
    public CompletableFuture<SendResult> sendMessageInTransaction(Message message) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        message.setTopic(
                                NamespaceUtil.wrapNamespace(
                                        this.producer.getNamespace(), message.getTopic()));

                        // A delay level is dropped before the message is sent as prepared.
                        if (message.getDelayTimeLevel() != 0) {
                            MessageAccessor.clearProperty(
                                    message, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                        }

                        // NOTE(review): the same configured value is compared against a
                        // millisecond difference in checkLocalTransaction but is written here
                        // into a property named "..._IN_SECONDS" -- confirm the intended unit
                        // of RocketMQSinkOptions.TRANSACTION_TIMEOUT.
                        message.putUserProperty(
                                MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS,
                                String.valueOf(transactionTimeout()));
                        MessageAccessor.putProperty(
                                message,
                                MessageConst.PROPERTY_TRANSACTION_PREPARED,
                                Boolean.TRUE.toString().toLowerCase());
                        MessageAccessor.putProperty(
                                message, MessageConst.PROPERTY_PRODUCER_GROUP, this.groupId);

                        SendResult result = this.producer.send(message);
                        if (SendStatus.SEND_OK.equals(result.getSendStatus())) {
                            LOG.info(
                                    "Send transaction message successfully, topic={}, transId={}",
                                    message.getTopic(),
                                    result.getTransactionId());
                        } else {
                            LOG.warn(
                                    "Failed to send message, topic={}, message={}",
                                    message.getTopic(),
                                    message);
                        }
                        return result;
                    } catch (Exception e) {
                        restoreInterruptIfNeeded(e);
                        LOG.error("Failed to send message, topic={}", message.getTopic(), e);
                        throw new RuntimeException(e);
                    }
                },
                MoreExecutors.directExecutor());
    }

    /**
     * Resolves a previously prepared transactional message on its broker.
     *
     * <p>This is best-effort: every failure is logged and none is thrown. Nothing is sent when
     * the committable's message offset is {@code -1} or when no address is known for the target
     * broker.
     *
     * <p>NOTE(review): because failures are only logged, the futures returned by {@link
     * #commit(SendCommittable)} and {@link #rollback(SendCommittable)} complete normally even
     * when the request could not be delivered -- confirm this is what callers expect.
     *
     * @param sendCommittable identifies the prepared message (queue, offsets, ids)
     * @param transactionResult outcome to report to the broker
     */
    public void endTransaction(SendCommittable sendCommittable, TransactionResult transactionResult) {
        try {
            // Check the offset first so no lookup or request is made for an unusable committable.
            if (sendCommittable.getMessageOffset() == INVALID_MESSAGE_OFFSET) {
                LOG.error("Convert message physical offset error, msgId={}", sendCommittable.getMsgId());
                return;
            }

            String brokerName =
                    this.mqClientInstance.getBrokerNameFromMessageQueue(
                            this.producer.queueWithNamespace(sendCommittable.getMessageQueue()));
            String brokerAddr = this.mqClientInstance.findBrokerAddressInPublish(brokerName);
            if (brokerAddr == null) {
                // No address is known for the owning broker, so report it instead of handing
                // a null address to the remoting client.
                LOG.error(
                        "Cannot find broker address to end transaction, brokerName={}, msgId={}",
                        brokerName,
                        sendCommittable.getMsgId());
                return;
            }

            EndTransactionRequestHeader header = new EndTransactionRequestHeader();
            header.setTransactionId(sendCommittable.getTransactionId());
            header.setCommitLogOffset(sendCommittable.getMessageOffset());
            header.setBname(brokerName);
            header.setProducerGroup(this.groupId);
            header.setTranStateTableOffset(sendCommittable.getQueueOffset());
            header.setFromTransactionCheck(true);
            header.setMsgId(sendCommittable.getMsgId());
            switch (transactionResult) {
                case COMMIT:
                    header.setCommitOrRollback(TRANSACTION_COMMIT_FLAG);
                    break;
                case ROLLBACK:
                    header.setCommitOrRollback(TRANSACTION_ROLLBACK_FLAG);
                    break;
                case UNKNOWN:
                    header.setCommitOrRollback(TRANSACTION_UNKNOWN_FLAG);
                    break;
            }

            endTransaction(brokerAddr, header, "", this.producer.getSendMsgTimeout());
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            LOG.error("Try end transaction error", e);
        }
    }

    /**
     * Sends an end-transaction request synchronously to the given broker address. The response
     * is not inspected.
     *
     * @param brokerAddr address of the broker to send the request to
     * @param endTransactionRequestHeader fully populated request header
     * @param remark remark attached to the request command
     * @param timeoutMillis timeout passed to the synchronous remoting call
     * @throws RemotingException if the remoting call fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void endTransaction(
            String brokerAddr,
            EndTransactionRequestHeader endTransactionRequestHeader,
            String remark,
            long timeoutMillis)
            throws RemotingException, InterruptedException {
        RemotingCommand request =
                RemotingCommand.createRequestCommand(
                        END_TRANSACTION_REQUEST_CODE, endTransactionRequestHeader);
        request.setRemark(remark);
        this.mqClientInstance
                .getMQClientAPIImpl()
                .getRemotingClient()
                .invokeSync(brokerAddr, request, timeoutMillis);
    }

    /**
     * Asynchronously commits a prepared message on the producer's executor service.
     *
     * @param sendCommittable identifies the prepared message
     * @return future completed once the end-transaction attempt has finished
     */
    @Override
    public CompletableFuture<Void> commit(SendCommittable sendCommittable) {
        return CompletableFuture.runAsync(
                () -> endTransaction(sendCommittable, TransactionResult.COMMIT),
                this.producer.getExecutorService());
    }

    /**
     * Asynchronously rolls back a prepared message on the producer's executor service.
     *
     * @param sendCommittable identifies the prepared message
     * @return future completed once the end-transaction attempt has finished
     */
    @Override
    public CompletableFuture<Void> rollback(SendCommittable sendCommittable) {
        return CompletableFuture.runAsync(
                () -> endTransaction(sendCommittable, TransactionResult.ROLLBACK),
                this.producer.getExecutorService());
    }

    /** Shuts the producer down and drops the cached client instance. */
    @Override
    public void close() throws Exception {
        // Read the name before shutdown so it is still available for the log line.
        String instanceName = this.producer.getInstanceName();
        this.producer.shutdown();
        this.mqClientInstance = null;
        LOG.info("RocketMQ producer has shutdown, groupId={}, clientId={}", this.groupId, instanceName);
    }

    /** Reads the configured transaction timeout; see the unit note in sendMessageInTransaction. */
    private long transactionTimeout() {
        return this.configuration.get(RocketMQSinkOptions.TRANSACTION_TIMEOUT);
    }

    /** Re-asserts the interrupt flag when a broad catch has captured an InterruptedException. */
    private static void restoreInterruptIfNeeded(Throwable error) {
        if (error instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}
