/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.sink.writer;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;
import java.util.function.Consumer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.connector.base.sink.writer.AIMDRateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.Preconditions;

/**
 * Base class for sink writers that buffer converted request entries and hand them to a
 * destination in batches via {@link #submitRequestEntries(List, Consumer)}.
 *
 * <p>Incoming elements are converted with the configured {@link ElementConverter}, wrapped
 * together with their size (see {@link #getSizeInBytes(Serializable)}) and appended to an internal
 * buffer. A batch is taken from the head of the buffer and submitted when one of the following
 * happens:
 *
 * <ul>
 *   <li>the number of buffered entries reaches the current batch size limit,
 *   <li>the total size of the buffered entries reaches {@code maxBatchSizeInBytes},
 *   <li>the timer registered for {@code maxTimeInBufferMS} fires,
 *   <li>the buffer holds {@code maxBufferedRequests} entries when a new element is written, or
 *   <li>{@link #flush(boolean)} is called with {@code true}.
 * </ul>
 *
 * <p>Entries reported back as failed by a request are re-inserted at the head of the buffer in
 * their original order. The number of in-flight entries is additionally bounded by the rate limit
 * reported by an {@link AIMDRateLimitingStrategy}, which is scaled up after a request without
 * failed entries and scaled down otherwise.
 *
 * @param <InputT> type of the elements passed to {@link #write(Object, SinkWriter.Context)}
 * @param <RequestEntryT> type of the entries that are buffered and submitted
 */
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
    /** Increase rate handed to the {@link AIMDRateLimitingStrategy}. */
    private static final int INFLIGHT_MESSAGES_LIMIT_INCREASE_RATE = 10;
    /** Decrease factor handed to the {@link AIMDRateLimitingStrategy}. */
    private static final double INFLIGHT_MESSAGES_LIMIT_DECREASE_FACTOR = 0.5;

    private final MailboxExecutor mailboxExecutor;
    private final ProcessingTimeService timeService;

    /** Start time of the most recently completed request; feeds the current-send-time gauge. */
    private long lastSendTimestamp = 0L;
    /** Completion time of the most recently completed request; feeds the current-send-time gauge. */
    private long ackTime = Long.MAX_VALUE;

    private final SinkWriterMetricGroup metrics;
    private final Counter numBytesSendCounter;
    private final Counter numRecordsSendCounter;
    private final AIMDRateLimitingStrategy rateLimitingStrategy;
    private final int maxBatchSize;
    private final int maxInFlightRequests;
    private final int maxBufferedRequests;
    private final long maxBatchSizeInBytes;
    private final long maxTimeInBufferMS;
    private final long maxRecordSizeInBytes;
    private final ElementConverter<InputT, RequestEntryT> elementConverter;

    /** Entries waiting to be submitted; batches are taken from the head. */
    private final Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries = new ArrayDeque<>();

    /** Number of submitted requests whose result callback has not been processed yet. */
    private int inFlightRequestsCount;
    /** Number of entries contained in the requests counted by {@link #inFlightRequestsCount}. */
    private int inFlightMessages;
    /** Sum of the sizes of all entries currently held in {@link #bufferedRequestEntries}. */
    private long bufferedRequestEntriesTotalSizeInBytes;
    /** Whether a buffer timer has been registered and has not fired yet. */
    private boolean existsActiveTimerCallback = false;

    private final Consumer<Exception> fatalExceptionCons;

    /**
     * Submits a batch of request entries to the destination.
     *
     * <p>The implementation must eventually invoke {@code requestResult} with the entries that
     * failed; those entries are put back at the head of the buffer. An empty list marks the
     * request as successful for the purposes of rate limiting.
     *
     * @param requestEntries the batch to submit
     * @param requestResult callback accepting the list of entries that have to be re-queued
     */
    protected abstract void submitRequestEntries(List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);

    /**
     * Returns the size of a request entry. The value is compared against
     * {@code maxRecordSizeInBytes} and accumulated against {@code maxBatchSizeInBytes}.
     *
     * @param requestEntry the entry to measure
     * @return the size of the entry in bytes
     */
    protected abstract long getSizeInBytes(RequestEntryT requestEntry);

    /**
     * Creates a writer that starts with an empty buffer.
     *
     * @see #AsyncSinkWriter(ElementConverter, Sink.InitContext, int, int, int, long, long, long,
     *     Collection)
     */
    public AsyncSinkWriter(ElementConverter<InputT, RequestEntryT> elementConverter, Sink.InitContext context, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) {
        this(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, Collections.emptyList());
    }

    /**
     * Creates a writer whose buffer is pre-filled with the entries of the given states.
     *
     * @param elementConverter converts input elements into request entries; must not be null
     * @param context provides the mailbox executor, processing time service and metric group
     * @param maxBatchSize maximum number of entries per batch; must be positive
     * @param maxInFlightRequests maximum number of concurrently in-flight requests; must be
     *     positive
     * @param maxBufferedRequests buffer size at which {@code write} flushes before accepting a new
     *     element; must be strictly greater than {@code maxBatchSize}
     * @param maxBatchSizeInBytes maximum accumulated entry size per batch; must be positive and at
     *     least {@code maxRecordSizeInBytes}
     * @param maxTimeInBufferMS delay of the buffer timer in milliseconds; must be positive
     * @param maxRecordSizeInBytes maximum size of a single entry; must be positive
     * @param states previously snapshotted buffers to restore
     * @throws NullPointerException if {@code elementConverter} or {@code context} is null
     * @throws IllegalArgumentException if one of the numeric constraints is violated
     * @throws IllegalStateException if a restored entry is larger than
     *     {@code maxRecordSizeInBytes}
     */
    public AsyncSinkWriter(ElementConverter<InputT, RequestEntryT> elementConverter, Sink.InitContext context, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, Collection<BufferedRequestState<RequestEntryT>> states) {
        // Validate references before they are dereferenced or stored.
        Preconditions.checkNotNull(elementConverter);
        Preconditions.checkNotNull(context);
        Preconditions.checkArgument(maxBatchSize > 0);
        Preconditions.checkArgument(maxBufferedRequests > 0);
        Preconditions.checkArgument(maxInFlightRequests > 0);
        Preconditions.checkArgument(maxBatchSizeInBytes > 0L);
        Preconditions.checkArgument(maxTimeInBufferMS > 0L);
        Preconditions.checkArgument(maxRecordSizeInBytes > 0L);
        Preconditions.checkArgument(maxBufferedRequests > maxBatchSize, "The maximum number of requests that may be buffered should be strictly greater than the maximum number of requests per batch.");
        Preconditions.checkArgument(maxBatchSizeInBytes >= maxRecordSizeInBytes, "The maximum allowed size in bytes per flush must be greater than or equal to the maximum allowed size in bytes of a single record.");

        this.elementConverter = elementConverter;
        this.mailboxExecutor = context.getMailboxExecutor();
        this.timeService = context.getProcessingTimeService();
        this.maxBatchSize = maxBatchSize;
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxBufferedRequests = maxBufferedRequests;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxTimeInBufferMS = maxTimeInBufferMS;
        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
        this.inFlightRequestsCount = 0;
        this.bufferedRequestEntriesTotalSizeInBytes = 0L;
        this.inFlightMessages = 0;
        this.rateLimitingStrategy = new AIMDRateLimitingStrategy(INFLIGHT_MESSAGES_LIMIT_INCREASE_RATE, INFLIGHT_MESSAGES_LIMIT_DECREASE_FACTOR, maxBatchSize * maxInFlightRequests, maxBatchSize * maxInFlightRequests);

        this.metrics = context.metricGroup();
        this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
        this.numBytesSendCounter = this.metrics.getNumBytesSendCounter();
        this.numRecordsSendCounter = this.metrics.getNumRecordsSendCounter();

        // Fatal exceptions are rethrown from a task scheduled on the mailbox executor.
        this.fatalExceptionCons = exception -> this.mailboxExecutor.execute(() -> {
            throw exception;
        }, "A fatal exception occurred in the sink that cannot be recovered from or should not be retried.");

        this.initializeState(states);
    }

    /**
     * Registers a timer that fires {@code maxTimeInBufferMS} after the current processing time
     * and, when it fires, flushes until the buffer is empty.
     */
    private void registerCallback() {
        ProcessingTimeService.ProcessingTimeCallback ptc = instant -> {
            this.existsActiveTimerCallback = false;
            while (!this.bufferedRequestEntries.isEmpty()) {
                this.flush();
            }
        };
        this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + this.maxTimeInBufferMS, ptc);
        this.existsActiveTimerCallback = true;
    }

    /**
     * Converts the element into a request entry and appends it to the buffer.
     *
     * <p>While the buffer already holds {@code maxBufferedRequests} entries, batches are flushed
     * first. After the entry has been added, further batches are submitted as long as the
     * conditions checked by {@link #nonBlockingFlush()} hold.
     *
     * @throws IllegalArgumentException if the converted entry is larger than
     *     {@code maxRecordSizeInBytes}
     */
    @Override
    public void write(InputT element, SinkWriter.Context context) throws IOException, InterruptedException {
        while (this.bufferedRequestEntries.size() >= this.maxBufferedRequests) {
            this.flush();
        }
        this.addEntryToBuffer(this.elementConverter.apply(element, context), false);
        this.nonBlockingFlush();
    }

    /**
     * Submits batches while the in-flight limits are not exceeded and the buffer holds either a
     * full batch (by count) or at least {@code maxBatchSizeInBytes} bytes.
     */
    private void nonBlockingFlush() throws InterruptedException {
        while (!this.isInFlightRequestOrMessageLimitExceeded() && (this.bufferedRequestEntries.size() >= this.getNextBatchSizeLimit() || this.bufferedRequestEntriesTotalSizeInBytes >= this.maxBatchSizeInBytes)) {
            this.flush();
        }
    }

    /**
     * Returns whether the number of in-flight requests has reached {@code maxInFlightRequests} or
     * the number of in-flight entries has reached the current rate limit.
     */
    private boolean isInFlightRequestOrMessageLimitExceeded() {
        return this.inFlightRequestsCount >= this.maxInFlightRequests || this.inFlightMessages >= this.rateLimitingStrategy.getRateLimit();
    }

    /**
     * Submits the next batch, yielding to the mailbox executor first for as long as an in-flight
     * limit is exceeded. Does nothing if no batch can be formed.
     */
    private void flush() throws InterruptedException {
        while (this.isInFlightRequestOrMessageLimitExceeded()) {
            this.mailboxExecutor.yield();
        }
        List<RequestEntryT> batch = this.createNextAvailableBatch();
        if (batch.isEmpty()) {
            return;
        }
        int batchSize = batch.size();
        long timestampOfRequest = System.currentTimeMillis();
        // The result is processed through the mailbox executor rather than directly in the callback.
        Consumer<List<RequestEntryT>> requestResult = failedRequestEntries -> this.mailboxExecutor.execute(() -> this.completeRequest(failedRequestEntries, batchSize, timestampOfRequest), "Mark in-flight request as completed and requeue %d request entries", failedRequestEntries.size());
        ++this.inFlightRequestsCount;
        this.inFlightMessages += batchSize;
        this.submitRequestEntries(batch, requestResult);
    }

    /**
     * Removes entries from the head of the buffer until the batch size limit is reached or adding
     * the next entry would push the batch over {@code maxBatchSizeInBytes}, and records the number
     * of entries and bytes taken in the send counters.
     *
     * @return the removed entries in buffer order; possibly empty
     */
    private List<RequestEntryT> createNextAvailableBatch() {
        int batchSize = Math.min(this.getNextBatchSizeLimit(), this.bufferedRequestEntries.size());
        List<RequestEntryT> batch = new ArrayList<>(batchSize);
        // Accumulate as long: entry sizes and the byte limit are long, an int would wrap above 2 GiB.
        long batchSizeBytes = 0L;
        for (int i = 0; i < batchSize; ++i) {
            long requestEntrySize = this.bufferedRequestEntries.peek().getSize();
            if (batchSizeBytes + requestEntrySize > this.maxBatchSizeInBytes) {
                break;
            }
            RequestEntryWrapper<RequestEntryT> elem = this.bufferedRequestEntries.remove();
            batch.add(elem.getRequestEntry());
            this.bufferedRequestEntriesTotalSizeInBytes -= requestEntrySize;
            batchSizeBytes += requestEntrySize;
        }
        this.numRecordsSendCounter.inc(batch.size());
        this.numBytesSendCounter.inc(batchSizeBytes);
        return batch;
    }

    /**
     * Processes the result of a request: updates the send-time bookkeeping and in-flight counters,
     * adjusts the rate limit, re-queues the failed entries and tries to submit further batches.
     *
     * @param failedRequestEntries entries of the request that have to be sent again
     * @param batchSize number of entries the request was submitted with
     * @param requestStartTime wall-clock time at which the request was submitted
     */
    private void completeRequest(List<RequestEntryT> failedRequestEntries, int batchSize, long requestStartTime) throws InterruptedException {
        this.lastSendTimestamp = requestStartTime;
        this.ackTime = System.currentTimeMillis();
        --this.inFlightRequestsCount;
        this.inFlightMessages -= batchSize;
        this.updateInFlightMessagesLimit(failedRequestEntries.isEmpty());
        // Insert back-to-front at the head so the failed entries keep their original order.
        ListIterator<RequestEntryT> iterator = failedRequestEntries.listIterator(failedRequestEntries.size());
        while (iterator.hasPrevious()) {
            this.addEntryToBuffer(iterator.previous(), true);
        }
        this.nonBlockingFlush();
    }

    /** Scales the rate limit up after a successful request and down otherwise. */
    private void updateInFlightMessagesLimit(boolean isSuccessfulRequest) {
        if (isSuccessfulRequest) {
            this.rateLimitingStrategy.scaleUp();
        } else {
            this.rateLimitingStrategy.scaleDown();
        }
    }

    /**
     * Wraps the entry with its size and adds it to the buffer, registering the buffer timer if the
     * buffer is empty and no timer is pending.
     *
     * @param entry the entry to buffer
     * @param insertAtHead {@code true} to insert at the head (used for re-queued entries),
     *     {@code false} to append at the tail
     * @throws IllegalArgumentException if the entry is larger than {@code maxRecordSizeInBytes}
     */
    private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
        if (this.bufferedRequestEntries.isEmpty() && !this.existsActiveTimerCallback) {
            this.registerCallback();
        }
        RequestEntryWrapper<RequestEntryT> wrappedEntry = new RequestEntryWrapper<>(entry, this.getSizeInBytes(entry));
        if (wrappedEntry.getSize() > this.maxRecordSizeInBytes) {
            throw new IllegalArgumentException(String.format("The request entry sent to the buffer was of size [%s], when the maxRecordSizeInBytes was set to [%s].", wrappedEntry.getSize(), this.maxRecordSizeInBytes));
        }
        if (insertAtHead) {
            this.bufferedRequestEntries.addFirst(wrappedEntry);
        } else {
            this.bufferedRequestEntries.add(wrappedEntry);
        }
        this.bufferedRequestEntriesTotalSizeInBytes += wrappedEntry.getSize();
    }

    /**
     * Waits until no request is in flight any more.
     *
     * @param flush if {@code true}, additionally submits buffered entries until the buffer is
     *     empty; if {@code false}, buffered entries are left in the buffer
     */
    @Override
    public void flush(boolean flush) throws InterruptedException {
        while (this.inFlightRequestsCount > 0 || (!this.bufferedRequestEntries.isEmpty() && flush)) {
            this.yieldIfThereExistsInFlightRequests();
            if (flush) {
                this.flush();
            }
        }
    }

    /** Yields to the mailbox executor once if at least one request is in flight. */
    private void yieldIfThereExistsInFlightRequests() throws InterruptedException {
        if (this.inFlightRequestsCount > 0) {
            this.mailboxExecutor.yield();
        }
    }

    /**
     * Returns a single state object built from the entries currently held in the buffer.
     *
     * @param checkpointId not used by this implementation
     */
    @Override
    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
        return Collections.singletonList(new BufferedRequestState<>(this.bufferedRequestEntries));
    }

    /** Restores the buffer from every given state, in iteration order. */
    private void initializeState(Collection<BufferedRequestState<RequestEntryT>> states) {
        for (BufferedRequestState<RequestEntryT> state : states) {
            this.initializeState(state);
        }
    }

    /**
     * Appends the entries of the given state to the buffer and adds the state's size to the
     * buffered byte total.
     *
     * @throws IllegalStateException if an entry is larger than {@code maxRecordSizeInBytes}
     */
    private void initializeState(BufferedRequestState<RequestEntryT> state) {
        // Validate only the incoming entries, and do so before they are added to the buffer.
        for (RequestEntryWrapper<RequestEntryT> wrapper : state.getBufferedRequestEntries()) {
            if (wrapper.getSize() > this.maxRecordSizeInBytes) {
                throw new IllegalStateException(String.format("State contains record of size %d which exceeds sink maximum record size %d.", wrapper.getSize(), this.maxRecordSizeInBytes));
            }
        }
        this.bufferedRequestEntries.addAll(state.getBufferedRequestEntries());
        this.bufferedRequestEntriesTotalSizeInBytes += state.getStateSize();
    }

    /** No-op in this base class. */
    @Override
    public void close() {
    }

    /** Returns the smaller of {@code maxBatchSize} and the current rate limit. */
    private int getNextBatchSizeLimit() {
        return Math.min(this.maxBatchSize, this.rateLimitingStrategy.getRateLimit());
    }

    /**
     * Returns a consumer that, given an exception, schedules a task on the mailbox executor which
     * rethrows that exception.
     */
    protected Consumer<Exception> getFatalExceptionCons() {
        return this.fatalExceptionCons;
    }
}

