package org.apache.flink.connector.rocketmq.sink.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.rocketmq.legacy.common.selector.MessageQueueSelector;
import org.apache.flink.connector.rocketmq.sink.InnerProducer;
import org.apache.flink.connector.rocketmq.sink.InnerProducerImpl;
import org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions;
import org.apache.flink.connector.rocketmq.sink.committer.SendCommittable;
import org.apache.flink.connector.rocketmq.sink.writer.context.RocketMQSinkContext;
import org.apache.flink.connector.rocketmq.sink.writer.context.RocketMQSinkContextImpl;
import org.apache.flink.connector.rocketmq.sink.writer.serializer.RocketMQSerializationSchema;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/rocketmq/sink/writer/RocketMQWriter.class */
public class RocketMQWriter<IN> implements TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, SendCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) RocketMQWriter.class);
    private static final String RMQ_PRODUCER_METRIC_NAME = "RocketMQProducer";
    private static final long METRIC_UPDATE_INTERVAL_MILLIS = 500;
    private static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    private static final String KEY_REGISTER_METRICS = "register.producer.metrics";
    private static final String ROCKETMQ_PRODUCER_METRICS = "producer-metrics";
    private final transient InnerProducer producer;
    private final DeliveryGuarantee deliveryGuarantee;
    private final MessageQueueSelector messageQueueSelector;
    private final RocketMQSinkContext rocketmqSinkContext;
    private final RocketMQSerializationSchema<IN> serializationSchema;
    private final Map<String, SendResult> sendResultMap = new ConcurrentHashMap();

    public RocketMQWriter(Configuration configuration, MessageQueueSelector messageQueueSelector, RocketMQSerializationSchema<IN> rocketMQSerializationSchema, Sink.InitContext initContext) {
        this.deliveryGuarantee = DeliveryGuarantee.valueOf(configuration.getString(RocketMQSinkOptions.DELIVERY_GUARANTEE));
        this.messageQueueSelector = messageQueueSelector;
        this.serializationSchema = rocketMQSerializationSchema;
        this.rocketmqSinkContext = new RocketMQSinkContextImpl(initContext, configuration);
        this.producer = new InnerProducerImpl(configuration);
        this.producer.start();
    }

    public void write(IN in, SinkWriter.Context context) throws IOException {
        try {
            Message serialize = this.serializationSchema.serialize(in, this.rocketmqSinkContext, Long.valueOf(System.currentTimeMillis()));
            if (this.deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
                this.producer.sendMessageInTransaction(serialize).whenComplete((sendResult, th) -> {
                    this.sendResultMap.put(sendResult.getTransactionId(), sendResult);
                });
            } else {
                this.producer.send(serialize).whenComplete((sendResult2, th2) -> {
                    this.sendResultMap.put(sendResult2.getTransactionId(), sendResult2);
                });
            }
        } catch (Exception e) {
            LOG.error("Send message error", (Throwable) e);
            throw new IOException(e);
        }
    }

    public void flush(boolean z) throws IOException, InterruptedException {
    }

    public Collection<SendCommittable> prepareCommit() throws IOException, InterruptedException {
        LOG.info("Prepare commit");
        if (this.deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        Iterator<SendResult> it = this.sendResultMap.values().iterator();
        while (it.hasNext()) {
            arrayList.add(new SendCommittable(it.next()));
        }
        LOG.info("Committable size={}.", Integer.valueOf(arrayList.size()));
        this.sendResultMap.clear();
        return arrayList;
    }

    public void writeWatermark(Watermark watermark) throws IOException, InterruptedException {
        super.writeWatermark(watermark);
    }

    public void close() throws Exception {
    }
}
