/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.timeseries.ratelimit;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Random;
import java.util.concurrent.Semaphore;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.breaker.CircuitBreakerService;
import org.opensearch.timeseries.ratelimit.QueuedRequest;
import org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker;

/**
 * A rate-limited request worker that bounds the number of concurrently running
 * {@link #execute(Runnable, Runnable)} invocations with a {@link Semaphore}.
 *
 * <p>The number of permits is read from {@code concurrencySetting} at construction time and is
 * replaced (by installing a brand-new semaphore) whenever that setting is updated through the
 * cluster settings.
 *
 * <p>{@code permits} and {@code lastExecuteTime} are written from the settings-update consumer and
 * from tasks submitted to the thread pool, and are read from {@link #maintenance()}, so they are
 * declared {@code volatile}. Every task releases the very semaphore instance it acquired from, so
 * a settings update that swaps the semaphore while a task is in flight cannot inflate the permit
 * count of the new semaphore.
 *
 * @param <RequestType> type of the queued requests handled by this worker
 */
public abstract class ConcurrentWorker<RequestType extends QueuedRequest>
extends RateLimitedRequestWorker<RequestType> {
    private static final Logger LOG = LogManager.getLogger(ConcurrentWorker.class);

    /** Current concurrency limiter; replaced wholesale when the concurrency setting changes. */
    private volatile Semaphore permits;

    /** Time at which the most recent execution acquired a permit (or construction time). */
    private volatile Instant lastExecuteTime;

    /** How long an execution may hold the last permit before {@link #maintenance()} frees one. */
    private final Duration executionTtl;

    /**
     * Creates the worker. All arguments except {@code concurrencySetting} and {@code executionTtl}
     * are forwarded unchanged to the superclass constructor.
     *
     * @param concurrencySetting setting holding the maximum number of concurrent executions; its
     *        current value sizes the initial semaphore and later updates replace the semaphore
     * @param executionTtl duration after which a still-running execution is considered stuck by
     *        {@link #maintenance()}
     * @param clock clock used to initialise the last-execution timestamp
     * @param settings node settings from which the initial concurrency is read
     * @param clusterService used to register the concurrency settings-update consumer
     */
    public ConcurrentWorker(String queueName, long heapSizeInBytes, int singleRequestSizeInBytes, Setting<Float> maxHeapPercentForQueueSetting, ClusterService clusterService, Random random, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, String threadPoolName, Settings settings, float maxQueuedTaskRatio, Clock clock, float mediumSegmentPruneRatio, float lowSegmentPruneRatio, int maintenanceFreqConstant, Setting<Integer> concurrencySetting, Duration executionTtl, Duration stateTtl, NodeStateManager nodeStateManager, AnalysisType context) {
        super(queueName, heapSizeInBytes, singleRequestSizeInBytes, maxHeapPercentForQueueSetting, clusterService, random, circuitBreakerService, threadPool, threadPoolName, settings, maxQueuedTaskRatio, clock, mediumSegmentPruneRatio, lowSegmentPruneRatio, maintenanceFreqConstant, stateTtl, nodeStateManager, context);
        this.permits = new Semaphore(concurrencySetting.get(settings));
        // In-flight tasks keep a reference to the semaphore they acquired from, so swapping the
        // field here does not cause their permits to be released into the new semaphore.
        clusterService.getClusterSettings().addSettingsUpdateConsumer(concurrencySetting, it -> {
            this.permits = new Semaphore(it);
        });
        this.lastExecuteTime = clock.instant();
        this.executionTtl = executionTtl;
    }

    /**
     * Runs the superclass maintenance and then frees one permit if the last execution started more
     * than {@code executionTtl} ago, no permits are available, and the queue is not empty.
     */
    @Override
    public void maintenance() {
        super.maintenance();
        // Read the field once so the availability check and the release hit the same semaphore.
        final Semaphore current = this.permits;
        final boolean ttlExpired = this.lastExecuteTime.plus(this.executionTtl).isBefore(this.clock.instant());
        if (ttlExpired && current.availablePermits() == 0 && !this.isQueueEmpty()) {
            LOG.warn("previous execution has been running for too long.  Maybe there are bugs.");
            current.release();
        }
    }

    /**
     * Submits a task to the configured thread pool that, if a permit can be acquired without
     * blocking, records the start time and calls {@link #execute(Runnable, Runnable)}. If no permit
     * is available the task does nothing.
     */
    @Override
    protected void triggerProcess() {
        this.threadPool.executor(this.threadPoolName).execute(() -> {
            // Capture the semaphore so acquire and release always target the same instance, even
            // if a settings update replaces this.permits while the execution is in flight.
            final Semaphore held = this.permits;
            if (!held.tryAcquire()) {
                return;
            }
            try {
                this.lastExecuteTime = this.clock.instant();
                this.execute(() -> {
                    held.release();
                    this.process();
                }, held::release);
            }
            catch (Exception e) {
                // NOTE(review): if execute() already ran one of the callbacks before throwing,
                // the permit is released twice here - confirm implementations never do that.
                held.release();
                throw e;
            }
        });
    }

    /**
     * Performs the actual work for one acquired permit. Implementations are expected to invoke one
     * of the two callbacks when done so that the permit is returned.
     *
     * @param var1 callback that releases the permit and then calls {@code process()} again
     * @param var2 callback that only releases the permit
     */
    protected abstract void execute(Runnable var1, Runnable var2);
}

