/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.client.opensearch._helpers.bulk;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._helpers.bulk.BulkListener;
import org.opensearch.client.opensearch._helpers.bulk.FnCondition;
import org.opensearch.client.opensearch._helpers.bulk.IngesterOperation;
import org.opensearch.client.opensearch._helpers.bulk.RetryableBulkOperation;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
import org.opensearch.client.transport.BackoffPolicy;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.TransportOptions;
import org.opensearch.client.util.ApiTypeHelper;
import org.opensearch.client.util.ObjectBuilder;

public class BulkIngester<Context>
implements AutoCloseable {
    // Class-wide logger.
    private static final Log logger = LogFactory.getLog(BulkIngester.class);
    // Source of the per-instance id that the constructor embeds in worker thread names.
    private static final AtomicInteger idCounter = new AtomicInteger();
    // Client used to send the bulk requests; required by the constructor.
    private final OpenSearchAsyncClient client;
    // Optional request-level settings copied onto every request by newRequest().
    @Nullable
    private final BulkRequest globalSettings;
    // Upper bound for requestsInFlightCount (see canSendRequest()).
    private final int maxRequests;
    // Upper bound for currentSize (see canAddOperation()); Long.MAX_VALUE when the builder value is negative.
    private final long maxSize;
    // Upper bound for the number of buffered operations; Integer.MAX_VALUE when the builder value is negative.
    private final int maxOperations;
    // Optional callback notified before and after each bulk request.
    @Nullable
    private final BulkListener<Context> listener;
    // Periodic flush interval in milliseconds, or null when no periodic flush was configured.
    private final Long flushIntervalMillis;
    // Handle of the periodic flush task; only set when flushIntervalMillis is non-null.
    @Nullable
    private ScheduledFuture<?> flushTask;
    // Runs the periodic flush and the listener callbacks; only set when a flush interval or a listener exists.
    @Nullable
    private ScheduledExecutorService scheduler;
    // Runs retry re-adds and delayed retry flushes; only created when the builder supplied a backoff policy.
    @Nullable
    private ScheduledExecutorService retryScheduler;
    // True when 'scheduler' came from the builder; close() then leaves it running.
    private boolean isExternalScheduler = false;
    // Retry delay policy; the constructor substitutes BackoffPolicy.noBackoff() when the builder has none.
    private BackoffPolicy backoffPolicy;
    // Operations buffered until the next flush.
    private List<RetryableBulkOperation<Context>> operations = new ArrayList<RetryableBulkOperation<Context>>();
    // Size accounting for the buffer: increased by ingestOp.size() in innerAdd(), reset in flush().
    private long currentSize;
    // Number of bulk requests sent whose response handling has not finished yet.
    private int requestsInFlightCount;
    // Set once by close(); add() rejects new operations afterwards.
    private volatile boolean isClosed = false;
    // Lock shared by the three conditions below.
    private final ReentrantLock lock = new ReentrantLock();
    // Ready while canAddOperation() holds.
    private final FnCondition addCondition = new FnCondition(this.lock, this::canAddOperation);
    // Ready while canSendRequest() holds.
    private final FnCondition sendRequestCondition = new FnCondition(this.lock, this::canSendRequest);
    // Ready once closedAndFlushed() holds; close() waits on it.
    private final FnCondition closeCondition = new FnCondition(this.lock, this::closedAndFlushed);
    // Listener callbacks submitted to 'scheduler' that have not completed yet.
    private final AtomicInteger listenerInProgressCount = new AtomicInteger();
    // Retry re-adds submitted to 'retryScheduler' that have not completed yet.
    private final AtomicInteger retriesInProgressCount = new AtomicInteger();

    private BulkIngester(Builder<Context> builder) {
        int ingesterId = idCounter.incrementAndGet();
        this.client = ApiTypeHelper.requireNonNull(((Builder)builder).client, this, "client");
        this.globalSettings = ((Builder)builder).globalSettings;
        this.maxRequests = ((Builder)builder).maxConcurrentRequests;
        this.maxSize = ((Builder)builder).bulkSize < 0L ? Long.MAX_VALUE : ((Builder)builder).bulkSize;
        this.maxOperations = ((Builder)builder).bulkOperations < 0 ? Integer.MAX_VALUE : ((Builder)builder).bulkOperations;
        this.listener = ((Builder)builder).listener;
        this.backoffPolicy = ((Builder)builder).backoffPolicy;
        this.flushIntervalMillis = ((Builder)builder).flushIntervalMillis;
        if (this.flushIntervalMillis != null || this.listener != null) {
            if (((Builder)builder).scheduler == null) {
                this.scheduler = Executors.newScheduledThreadPool(this.maxRequests + 1, r -> {
                    Thread t = Executors.defaultThreadFactory().newThread(r);
                    t.setName("bulk-ingester-executor#" + ingesterId + "#" + t.getId());
                    t.setDaemon(true);
                    return t;
                });
            } else {
                this.scheduler = ((Builder)builder).scheduler;
                this.isExternalScheduler = true;
            }
        }
        if (this.flushIntervalMillis != null) {
            long flushInterval = this.flushIntervalMillis;
            this.flushTask = this.scheduler.scheduleWithFixedDelay(this::failsafeFlush, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
        }
        if (this.backoffPolicy == null) {
            this.backoffPolicy = BackoffPolicy.noBackoff();
        } else {
            this.retryScheduler = Executors.newScheduledThreadPool(this.maxRequests + 1, r -> {
                Thread t = Executors.defaultThreadFactory().newThread(r);
                t.setName("bulk-ingester-retry#" + ingesterId + "#" + t.getId());
                t.setDaemon(true);
                return t;
            });
        }
    }

    /**
     * @return the maximum number of buffered operations; {@code Integer.MAX_VALUE} when unbounded
     */
    public int maxOperations() {
        return maxOperations;
    }

    /**
     * @return the configured size limit for the buffer; {@code Long.MAX_VALUE} when unbounded
     */
    public long maxSize() {
        return maxSize;
    }

    /**
     * @return the maximum number of bulk requests allowed in flight at the same time
     */
    public int maxConcurrentRequests() {
        return maxRequests;
    }

    /**
     * @return the periodic flush interval, or {@code null} when no interval was configured
     */
    public Duration flushInterval() {
        return flushIntervalMillis == null ? null : Duration.ofMillis(flushIntervalMillis);
    }

    /**
     * @return the number of operations currently buffered (read without taking the lock)
     */
    public int pendingOperations() {
        final List<RetryableBulkOperation<Context>> buffered = operations;
        if (buffered == null) {
            return 0;
        }
        return buffered.size();
    }

    /**
     * @return the current value of the buffer size counter
     */
    public long pendingOperationsSize() {
        return currentSize;
    }

    /**
     * @return the number of bulk requests currently in flight
     */
    public int pendingRequests() {
        return requestsInFlightCount;
    }

    /**
     * @return the invocation count reported by the add condition
     */
    public long operationsCount() {
        return addCondition.invocations();
    }

    /**
     * @return the contention count reported by the add condition
     */
    public long operationContentionsCount() {
        return addCondition.contentions();
    }

    /**
     * @return the invocation count reported by the send-request condition
     */
    public long requestCount() {
        return sendRequestCondition.invocations();
    }

    /**
     * @return the contention count reported by the send-request condition
     */
    public long requestContentionsCount() {
        return sendRequestCondition.contentions();
    }

    /** Tells whether another request may be sent without exceeding the concurrency limit. */
    private boolean canSendRequest() {
        return maxRequests > requestsInFlightCount;
    }

    /** Tells whether the buffer is below both the size limit and the operation-count limit. */
    private boolean canAddOperation() {
        if (currentSize >= maxSize) {
            return false;
        }
        return operations.size() < maxOperations;
    }

    /**
     * Tells whether the ingester is closed and has nothing left to do: no buffered operations,
     * no requests in flight, and no listener callbacks or retry re-adds still running.
     */
    private boolean closedAndFlushed() {
        if (!isClosed) {
            return false;
        }
        if (!operations.isEmpty()) {
            return false;
        }
        if (requestsInFlightCount != 0) {
            return false;
        }
        if (listenerInProgressCount.get() != 0) {
            return false;
        }
        return retriesInProgressCount.get() == 0;
    }

    /**
     * Creates a request builder pre-populated with the global settings, if any were configured.
     *
     * @return a fresh builder without operations
     */
    private BulkRequest.Builder newRequest() {
        BulkRequest.Builder builder = new BulkRequest.Builder();
        BulkRequest defaults = globalSettings;
        if (defaults == null) {
            return builder;
        }
        builder.index(defaults.index())
            .pipeline(defaults.pipeline())
            .refresh(defaults.refresh())
            .requireAlias(defaults.requireAlias())
            .routing(defaults.routing())
            .sourceExcludes(defaults.sourceExcludes())
            .sourceIncludes(defaults.sourceIncludes())
            .source(defaults.source())
            .timeout(defaults.timeout())
            .waitForActiveShards(defaults.waitForActiveShards());
        return builder;
    }

    /**
     * Flush variant used by the periodic task: any failure is logged instead of propagated,
     * so an exception cannot escape into the scheduler.
     */
    private void failsafeFlush() {
        try {
            flush();
        } catch (Throwable failure) {
            logger.error("Error in background flush", failure);
        }
    }

    /**
     * Sends the buffered operations that are currently sendable as one bulk request.
     * <p>
     * Nothing is sent when the buffer is empty or contains no sendable operation. Otherwise the
     * sendable operations are removed from the buffer, the listener (if any) is told via
     * {@code beforeBulk}, and the request is handed to the async client. When the response arrives:
     * <ul>
     *   <li>a failed request is reported through {@link #listenerAfterBulkException};</li>
     *   <li>a response without HTTP 429 items, or any response when no backoff is configured, is
     *       reported through {@link #listenerAfterBulkSuccess};</li>
     *   <li>otherwise the 429 items that can still be retried are re-queued and the remaining
     *       operations are reported to the listener as a partial request/response pair.</li>
     * </ul>
     * The in-flight counter is always decremented once response handling ends, even if it throws.
     */
    public void flush() {
        // Operations included in this request, in request order. Filled by the send step and read
        // by the response handler to map response items back to their operations by position.
        List<RetryableBulkOperation<Context>> sentRequests = new ArrayList<>();

        RequestExecution<Context> exec = this.sendRequestCondition.whenReadyIf(
            () -> !this.operations.isEmpty() && this.operations.stream().anyMatch(RetryableBulkOperation::isSendable),
            () -> {
                List<BulkOperation> immediateOps = new ArrayList<>();
                List<Context> contexts = new ArrayList<>();
                // Move every sendable operation out of the buffer; the others stay queued.
                for (Iterator<RetryableBulkOperation<Context>> it = this.operations.iterator(); it.hasNext();) {
                    RetryableBulkOperation<Context> op = it.next();
                    if (!op.isSendable()) {
                        continue;
                    }
                    immediateOps.add(op.operation());
                    contexts.add(op.context());
                    sentRequests.add(op);
                    it.remove();
                }
                BulkRequest request = this.newRequest().operations(immediateOps).build();

                // NOTE(review): currentSize is accumulated from ingestOp.size() in innerAdd(), but is
                // reset here to the number of operations left in the buffer - confirm the intended unit.
                this.currentSize = this.operations.size();
                this.addCondition.signalIfReady();

                long id = this.sendRequestCondition.invocations();
                if (this.listener != null) {
                    this.listener.beforeBulk(id, request, contexts);
                }

                CompletableFuture<BulkResponse> result;
                try {
                    result = this.client.bulk(request);
                } catch (IOException e) {
                    // Surface the failure through the same asynchronous path as a failed response.
                    result = CompletableFuture.failedFuture(e);
                }
                ++this.requestsInFlightCount;

                // The request is only needed later for the listener callbacks; drop it otherwise.
                BulkRequest retainedRequest = this.listener == null ? null : request;
                return new RequestExecution<Context>(id, retainedRequest, contexts, result);
            }
        );

        if (exec == null) {
            return;
        }

        exec.futureResponse.handle((resp, thr) -> {
            try {
                if (resp != null) {
                    this.handleBulkResponse(resp, exec, sentRequests);
                } else {
                    this.listenerAfterBulkException(thr, exec);
                }
            } catch (RuntimeException e) {
                // The stage returned by handle() is discarded, so log here or the failure is lost.
                logger.error("Error while handling bulk response", e);
                throw e;
            } finally {
                // Always release the in-flight slot; otherwise close() could wait forever.
                this.sendRequestCondition.signalIfReadyAfter(() -> {
                    --this.requestsInFlightCount;
                    this.closeCondition.signalAllIfReady();
                });
            }
            return null;
        });
    }

    /** Tells whether a response item failed with HTTP status 429. */
    private static boolean isThrottled(BulkResponseItem item) {
        return item.error() != null && item.status() == 429;
    }

    /**
     * Processes a bulk response: re-queues retryable 429 items and notifies the listener about
     * everything that is not being retried. Items and operations are matched by position.
     */
    private void handleBulkResponse(BulkResponse resp, RequestExecution<Context> exec, List<RetryableBulkOperation<Context>> sentRequests) {
        List<BulkResponseItem> items = resp.items();
        boolean anyThrottled = items.stream().anyMatch(BulkIngester::isThrottled);
        if (!anyThrottled || this.backoffPolicy.equals(BackoffPolicy.noBackoff())) {
            this.listenerAfterBulkSuccess(resp, exec);
            return;
        }

        List<RetryableBulkOperation<Context>> retryableReq = new ArrayList<>();
        List<RetryableBulkOperation<Context>> refires = new ArrayList<>();
        List<BulkResponseItem> retryableResp = new ArrayList<>();
        // retried[i] is true when the operation at position i has been re-queued.
        boolean[] retried = new boolean[items.size()];
        for (int i = 0; i < items.size(); i++) {
            BulkResponseItem item = items.get(i);
            if (!isThrottled(item)) {
                continue;
            }
            int retriedBefore = retryableReq.size();
            this.selectingRetries(i, item, sentRequests, retryableResp, retryableReq, refires);
            retried[i] = retryableReq.size() > retriedBefore;
        }
        if (!refires.isEmpty()) {
            this.scheduleRetries(refires);
        }

        if (this.listener == null) {
            return;
        }

        // Collect what is left once the retried positions are taken out.
        List<BulkOperation> partialOps = new ArrayList<>();
        List<Context> partialCtx = new ArrayList<>();
        for (int i = 0; i < sentRequests.size(); i++) {
            if (i < retried.length && retried[i]) {
                continue;
            }
            RetryableBulkOperation<Context> op = sentRequests.get(i);
            partialOps.add(op.operation());
            partialCtx.add(op.context());
        }
        if (partialOps.isEmpty()) {
            // Every operation is being retried; there is nothing to report yet.
            return;
        }
        List<BulkResponseItem> partialItems = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            if (!retried[i]) {
                partialItems.add(items.get(i));
            }
        }

        BulkRequest partialRequest = this.newRequest().operations(partialOps).build();
        BulkResponse partialResp = BulkResponse.of((BulkResponse.Builder br) -> br.items(partialItems).errors(resp.errors()).took(resp.took()).ingestTook(resp.ingestTook()));

        this.listenerInProgressCount.incrementAndGet();
        this.scheduler.submit(() -> {
            try {
                this.listener.afterBulk(exec.id, partialRequest, partialCtx, partialResp);
            } finally {
                if (this.listenerInProgressCount.decrementAndGet() == 0) {
                    this.closeCondition.signalIfReady();
                }
            }
        });
    }

    /**
     * Decides whether the operation at {@code index} is retried after a failed response item.
     * <p>
     * When the operation can still be retried, a new retry operation is created and re-queued via
     * {@link #addRetry}, and the bookkeeping lists are updated; otherwise only a warning is logged.
     *
     * @param index            position of the operation in {@code sentRequests}
     * @param bulkItemResponse the failed response item for that operation
     * @param sentRequests     operations of the request, in request order
     * @param retryableResp    receives the response item when the operation is retried
     * @param retryableReq     receives the original operation when it is retried
     * @param refires          receives the newly created retry operation
     */
    private void selectingRetries(int index, BulkResponseItem bulkItemResponse, List<RetryableBulkOperation<Context>> sentRequests, List<BulkResponseItem> retryableResp, List<RetryableBulkOperation<Context>> retryableReq, List<RetryableBulkOperation<Context>> refires) {
        RetryableBulkOperation<Context> failedOp = sentRequests.get(index);

        if (!failedOp.canRetry()) {
            logger.warn("Retries finished for request: " + failedOp.operation()._kind().toString());
            return;
        }

        retryableResp.add(bulkItemResponse);
        // Keep the delay iterator the operation already carries; start a new one on the first retry.
        Optional<Iterator<Long>> carriedDelays = Optional.ofNullable(failedOp.retries());
        Iterator<Long> delays = carriedDelays.orElse(backoffPolicy.iterator());
        RetryableBulkOperation<Context> retryOp = new RetryableBulkOperation<Context>(failedOp.operation(), failedOp.context(), delays);
        retryableReq.add(failedOp);
        refires.add(retryOp);
        addRetry(retryOp);
        logger.warn("Added failed request back in queue, retrying in : " + retryOp.currentRetryTimeDelay() + " ms");
    }

    /**
     * Reports a failed bulk request to the listener on the scheduler; no-op without a listener.
     *
     * @param thr  the failure of the request
     * @param exec the execution the failure belongs to
     */
    private void listenerAfterBulkException(Throwable thr, RequestExecution<Context> exec) {
        if (listener == null) {
            return;
        }
        listenerInProgressCount.incrementAndGet();
        Runnable notification = () -> {
            try {
                listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
            } finally {
                // The last callback to finish may be what close() is waiting for.
                int stillRunning = listenerInProgressCount.decrementAndGet();
                if (stillRunning == 0) {
                    closeCondition.signalIfReady();
                }
            }
        };
        scheduler.submit(notification);
    }

    /**
     * Reports a bulk response to the listener on the scheduler; no-op without a listener.
     *
     * @param resp the response received for the request
     * @param exec the execution the response belongs to
     */
    private void listenerAfterBulkSuccess(BulkResponse resp, RequestExecution<Context> exec) {
        if (listener == null) {
            return;
        }
        listenerInProgressCount.incrementAndGet();
        Runnable notification = () -> {
            try {
                listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
            } finally {
                // The last callback to finish may be what close() is waiting for.
                int stillRunning = listenerInProgressCount.decrementAndGet();
                if (stillRunning == 0) {
                    closeCondition.signalIfReady();
                }
            }
        };
        scheduler.submit(notification);
    }

    /**
     * Schedules two flushes on the retry scheduler: one after the shortest and one after the
     * longest current retry delay among the given operations.
     *
     * @param retryableReq the operations waiting to be retried
     */
    private void scheduleRetries(List<RetryableBulkOperation<Context>> retryableReq) {
        long shortestDelay = Long.MAX_VALUE;
        long longestDelay = Long.MIN_VALUE;
        for (RetryableBulkOperation<Context> pending : retryableReq) {
            long delay = pending.currentRetryTimeDelay();
            shortestDelay = Math.min(shortestDelay, delay);
            longestDelay = Math.max(longestDelay, delay);
        }
        retryScheduler.schedule(this::flush, shortestDelay, TimeUnit.MILLISECONDS);
        retryScheduler.schedule(this::flush, longestDelay, TimeUnit.MILLISECONDS);
    }

    /**
     * Buffers an operation together with a caller-supplied context value.
     *
     * @param operation the bulk operation to buffer
     * @param context   value associated with the operation; may be {@code null}
     * @throws IllegalStateException if the ingester has been closed
     */
    public void add(BulkOperation operation, Context context) {
        if (isClosed) {
            throw new IllegalStateException("Ingester has been closed");
        }
        // A null retry iterator marks a first-time (non-retry) operation.
        innerAdd(new RetryableBulkOperation<Context>(operation, context, null));
    }

    /**
     * Re-queues a retry operation asynchronously on the retry scheduler, tracking it in
     * {@code retriesInProgressCount} until the re-add has completed.
     *
     * @param repeatableOp the retry operation to put back into the buffer
     */
    private void addRetry(RetryableBulkOperation<Context> repeatableOp) {
        retriesInProgressCount.incrementAndGet();
        Runnable reAdd = () -> {
            try {
                innerAdd(repeatableOp);
            } finally {
                int stillRunning = retriesInProgressCount.decrementAndGet();
                if (stillRunning == 0) {
                    closeCondition.signalIfReady();
                }
            }
        };
        retryScheduler.submit(reAdd);
    }

    /**
     * Adds an operation to the buffer once the add condition allows it, then either flushes
     * (when the buffer has reached a limit) or signals the add condition again.
     *
     * @param repeatableOp the operation to buffer
     */
    private void innerAdd(RetryableBulkOperation<Context> repeatableOp) {
        // Built before the add condition is entered.
        IngesterOperation ingestOp = IngesterOperation.of(repeatableOp, ((OpenSearchTransport) client._transport()).jsonpMapper());

        addCondition.whenReady(() -> {
            operations.add(ingestOp.repeatableOperation());
            currentSize += ingestOp.size();

            if (canAddOperation()) {
                addCondition.signalIfReady();
            } else {
                flush();
            }
        });
    }

    /**
     * Buffers an operation without an associated context.
     *
     * @param operation the bulk operation to buffer
     * @throws IllegalStateException if the ingester has been closed
     */
    public void add(BulkOperation operation) {
        add(operation, null);
    }

    /**
     * Builds an operation with the given function and buffers it without a context.
     *
     * @param f function configuring a fresh {@code BulkOperation.Builder}
     * @throws IllegalStateException if the ingester has been closed
     */
    public void add(Function<BulkOperation.Builder, ObjectBuilder<BulkOperation>> f) {
        BulkOperation operation = f.apply(new BulkOperation.Builder()).build();
        add(operation, null);
    }

    /**
     * Builds an operation with the given function and buffers it with a context value.
     *
     * @param f       function configuring a fresh {@code BulkOperation.Builder}
     * @param context value associated with the operation; may be {@code null}
     * @throws IllegalStateException if the ingester has been closed
     */
    public void add(Function<BulkOperation.Builder, ObjectBuilder<BulkOperation>> f, Context context) {
        BulkOperation operation = f.apply(new BulkOperation.Builder()).build();
        add(operation, context);
    }

    /**
     * Closes the ingester: flushes what is buffered, waits on the close condition, then cancels
     * the periodic flush and shuts down the thread pools this instance created. A scheduler
     * supplied through the builder is left running. Calling it again is a no-op.
     */
    @Override
    public void close() {
        if (isClosed) {
            return;
        }
        isClosed = true;

        flush();
        // Wait until the close condition reports ready.
        closeCondition.whenReady(() -> {});

        ScheduledFuture<?> periodicFlush = flushTask;
        if (periodicFlush != null) {
            periodicFlush.cancel(false);
        }

        ScheduledExecutorService callbackPool = scheduler;
        if (callbackPool != null && !isExternalScheduler) {
            callbackPool.shutdownNow();
        }

        ScheduledExecutorService retryPool = retryScheduler;
        if (retryPool != null) {
            retryPool.shutdownNow();
        }
    }

    /**
     * Creates an ingester by applying the given configuration function to a fresh builder.
     *
     * @param f function that configures and returns the builder
     * @return the built ingester
     */
    public static <Context> BulkIngester<Context> of(Function<Builder<Context>, Builder<Context>> f) {
        Builder<Context> configured = f.apply(new Builder<Context>());
        return configured.build();
    }

    /**
     * Fluent builder for {@link BulkIngester}.
     * <p>
     * Defaults: 1000 operations per request, 5 MiB per request, one concurrent request, no
     * periodic flush, no listener, no backoff policy.
     *
     * @param <Context> type of the value that can be attached to each operation
     */
    public static class Builder<Context>
    implements ObjectBuilder<BulkIngester<Context>> {
        private OpenSearchAsyncClient client;
        private BulkRequest globalSettings;
        private int bulkOperations = 1000;
        private long bulkSize = 5L * 1024 * 1024;
        private int maxConcurrentRequests = 1;
        private Long flushIntervalMillis;
        private BulkListener<Context> listener;
        private ScheduledExecutorService scheduler;
        private BackoffPolicy backoffPolicy;

        /**
         * Sets the async client used to send bulk requests.
         *
         * @param client the client; required by the ingester's constructor
         * @return this builder
         */
        public Builder<Context> client(OpenSearchAsyncClient client) {
            this.client = client;
            return this;
        }

        /**
         * Sets the client from a blocking client by wrapping its transport in an async client.
         * The client's transport options are carried over only when they differ from the
         * transport's own options.
         *
         * @param client the blocking client to derive the async client from
         * @return this builder
         */
        public Builder<Context> client(OpenSearchClient client) {
            TransportOptions options = client._transportOptions();
            if (options == ((OpenSearchTransport) client._transport()).options()) {
                options = null;
            }
            return this.client(new OpenSearchAsyncClient((OpenSearchTransport) client._transport(), options));
        }

        /**
         * Sets the maximum number of operations per bulk request; {@code -1} disables the limit.
         *
         * @param count the limit, at least -1
         * @return this builder
         * @throws IllegalArgumentException if {@code count} is below -1
         */
        public Builder<Context> maxOperations(int count) {
            // NOTE(review): 0 is accepted, but BulkIngester.canAddOperation() can then never be
            // true - confirm whether 0 should be rejected.
            if (count < -1) {
                throw new IllegalArgumentException("Max operations should be at least -1");
            }
            this.bulkOperations = count;
            return this;
        }

        /**
         * Sets the size limit per bulk request; {@code -1} disables the limit.
         *
         * @param bytes the limit, at least -1
         * @return this builder
         * @throws IllegalArgumentException if {@code bytes} is below -1
         */
        public Builder<Context> maxSize(long bytes) {
            if (bytes < -1L) {
                throw new IllegalArgumentException("Max size should be at least -1");
            }
            this.bulkSize = bytes;
            return this;
        }

        /**
         * Sets how many bulk requests may be in flight at the same time.
         *
         * @param max the limit, at least 1
         * @return this builder
         * @throws IllegalArgumentException if {@code max} is below 1
         */
        public Builder<Context> maxConcurrentRequests(int max) {
            if (max < 1) {
                throw new IllegalArgumentException("Max concurrent request should be at least 1");
            }
            this.maxConcurrentRequests = max;
            return this;
        }

        /**
         * Sets the interval of the periodic flush.
         * <p>
         * The interval is stored in milliseconds and later used as a fixed delay for a scheduled
         * task, which requires a strictly positive delay. Zero and durations shorter than one
         * millisecond are therefore rejected here instead of failing while the ingester is built.
         *
         * @param value the interval length, strictly positive
         * @param unit  the unit of {@code value}
         * @return this builder
         * @throws IllegalArgumentException if the interval is not positive or rounds down to 0 ms
         */
        public Builder<Context> flushInterval(long value, TimeUnit unit) {
            if (value <= 0L) {
                throw new IllegalArgumentException("Duration should be positive");
            }
            long millis = unit.toMillis(value);
            if (millis <= 0L) {
                throw new IllegalArgumentException("Duration should be at least 1 millisecond");
            }
            this.flushIntervalMillis = millis;
            return this;
        }

        /**
         * Sets an externally managed scheduler; the ingester does not shut it down on close.
         *
         * @param scheduler the scheduler to use, or {@code null} to let the ingester create one
         * @return this builder
         */
        public Builder<Context> scheduler(ScheduledExecutorService scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        /**
         * Sets the listener notified before and after each bulk request.
         *
         * @param listener the listener, or {@code null} for none
         * @return this builder
         */
        public Builder<Context> listener(BulkListener<Context> listener) {
            this.listener = listener;
            return this;
        }

        /**
         * Sets the backoff policy used to retry operations rejected with HTTP 429.
         *
         * @param backoffPolicy the policy, or {@code null} for no retries
         * @return this builder
         */
        public Builder<Context> backoffPolicy(BackoffPolicy backoffPolicy) {
            this.backoffPolicy = backoffPolicy;
            return this;
        }

        /**
         * Sets request-level settings applied to every bulk request. The builder's operations
         * are replaced with an empty list before it is built.
         *
         * @param settings the settings, or {@code null} to clear them
         * @return this builder
         */
        public Builder<Context> globalSettings(BulkRequest.Builder settings) {
            this.globalSettings = settings != null ? settings.operations(Collections.emptyList()).build() : null;
            return this;
        }

        /**
         * Sets request-level settings using a function that configures a fresh request builder.
         *
         * @param fn function configuring the request builder
         * @return this builder
         */
        public Builder<Context> globalSettings(Function<BulkRequest.Builder, BulkRequest.Builder> fn) {
            return this.globalSettings(fn.apply(new BulkRequest.Builder()));
        }

        /**
         * Builds the ingester.
         *
         * @return a new ingester
         * @throws IllegalStateException if neither an operation limit, a size limit, nor a flush
         *                               interval is set
         */
        @Override
        public BulkIngester<Context> build() {
            boolean hasCriteria = this.bulkOperations >= 0 || this.bulkSize >= 0L || this.flushIntervalMillis != null;
            if (!hasCriteria) {
                throw new IllegalStateException("No bulk operation chunking criteria have been set.");
            }
            return new BulkIngester<Context>(this);
        }
    }

    /**
     * Immutable record of one bulk request that has been handed to the client.
     *
     * @param <Context> type of the per-operation context values
     */
    private static class RequestExecution<Context> {
        /** Identifier passed to the listener callbacks for this request. */
        public final long id;
        /** The request that was sent; {@code null} when no listener needs it. */
        public final BulkRequest request;
        /** Context values of the operations in the request, in request order. */
        public final List<Context> contexts;
        /** Completes with the response, or exceptionally when the request fails. */
        public final CompletionStage<BulkResponse> futureResponse;

        RequestExecution(long id, BulkRequest request, List<Context> contexts, CompletionStage<BulkResponse> futureResponse) {
            this.futureResponse = futureResponse;
            this.contexts = contexts;
            this.request = request;
            this.id = id;
        }
    }
}

