/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.index;

import java.io.Closeable;
import java.io.IOException;
import java.util.Comparator;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractAsyncTask;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.SegmentReplicationStats;
import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.threadpool.ThreadPool;

/**
 * Applies indexing backpressure on primary shards whose replicas have fallen behind, and
 * periodically fails replicas that have been replicating for longer than a configured limit.
 *
 * <p>All thresholds are dynamic node-scoped settings; update consumers registered in the
 * constructor keep the volatile fields of this class in sync with the cluster settings.
 */
public class SegmentReplicationPressureService
implements Closeable {
    private static final Logger logger = LogManager.getLogger(SegmentReplicationPressureService.class);

    /** Enables or disables write rejection in {@link #isSegrepLimitBreached(ShardId)}. Defaults to {@code false}. */
    public static final Setting<Boolean> SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED = Setting.boolSetting("segrep.pressure.enabled", false, Setting.Property.Dynamic, Setting.Property.NodeScope);

    /** Number of checkpoints a replica must exceed to be considered stale. Defaults to 30, minimum 1. */
    public static final Setting<Integer> MAX_INDEXING_CHECKPOINTS = Setting.intSetting("segrep.pressure.checkpoint.limit", 30, 1, Setting.Property.Dynamic, Setting.Property.NodeScope);

    /** Replication time a replica must exceed to be considered stale. Defaults to 5 minutes. */
    public static final Setting<TimeValue> MAX_REPLICATION_TIME_BACKPRESSURE_SETTING = Setting.positiveTimeSetting("segrep.pressure.time.limit", TimeValue.timeValueMinutes(5L), Setting.Property.Dynamic, Setting.Property.NodeScope);

    /**
     * Replication time beyond which a stale replica is failed by the background task.
     * Defaults to zero, which disables the task (see {@link #shouldScheduleAsyncFailTask()}).
     */
    public static final Setting<TimeValue> MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING = Setting.positiveTimeSetting("segrep.replication.time.limit", TimeValue.timeValueMinutes(0L), Setting.Property.Dynamic, Setting.Property.NodeScope);

    /** Fraction (0.0 to 1.0) of stale replicas at or above which writes are rejected. Defaults to 0.5. */
    public static final Setting<Double> MAX_ALLOWED_STALE_SHARDS = Setting.doubleSetting("segrep.pressure.replica.stale.limit", 0.5, 0.0, 1.0, Setting.Property.Dynamic, Setting.Property.NodeScope);

    private volatile boolean isSegmentReplicationBackpressureEnabled;
    private volatile int maxCheckpointsBehind;
    private volatile double maxAllowedStaleReplicas;
    private volatile TimeValue replicationTimeLimitBackpressure;
    private volatile TimeValue replicationTimeLimitFailReplica;

    private final IndicesService indicesService;
    private final ThreadPool threadPool;
    private final SegmentReplicationStatsTracker tracker;
    private final ShardStateAction shardStateAction;
    private volatile AsyncFailStaleReplicaTask failStaleReplicaTask;

    /**
     * Reads the initial values of all settings, registers a dynamic update consumer for each
     * of them, and creates the background task that fails stale replicas.
     *
     * @param settings         node settings used for the initial values
     * @param clusterService   provides the {@link ClusterSettings} on which update consumers are registered
     * @param indicesService   used to resolve index services and shards
     * @param shardStateAction used to report stale replicas as failed
     * @param tracker          source of replication statistics and sink for rejection counts
     * @param threadPool       thread pool handed to the background task
     */
    @Inject
    public SegmentReplicationPressureService(Settings settings, ClusterService clusterService, IndicesService indicesService, ShardStateAction shardStateAction, SegmentReplicationStatsTracker tracker, ThreadPool threadPool) {
        this.indicesService = indicesService;
        this.tracker = tracker;
        this.shardStateAction = shardStateAction;
        this.threadPool = threadPool;

        final ClusterSettings clusterSettings = clusterService.getClusterSettings();

        this.isSegmentReplicationBackpressureEnabled = SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.get(settings);
        clusterSettings.addSettingsUpdateConsumer(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED, this::setSegmentReplicationBackpressureEnabled);

        this.maxCheckpointsBehind = MAX_INDEXING_CHECKPOINTS.get(settings);
        clusterSettings.addSettingsUpdateConsumer(MAX_INDEXING_CHECKPOINTS, this::setMaxCheckpointsBehind);

        this.replicationTimeLimitBackpressure = MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.get(settings);
        clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING, this::setReplicationTimeLimitBackpressure);

        this.replicationTimeLimitFailReplica = MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.get(settings);
        clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING, this::setReplicationTimeLimitFailReplica);

        this.maxAllowedStaleReplicas = MAX_ALLOWED_STALE_SHARDS.get(settings);
        clusterSettings.addSettingsUpdateConsumer(MAX_ALLOWED_STALE_SHARDS, this::setMaxAllowedStaleReplicas);

        // Created last: the task's constructor reads replicationTimeLimitFailReplica via mustReschedule().
        this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(this);
    }

    /** Returns the current background task; package-private accessor. */
    AsyncFailStaleReplicaTask getFailStaleReplicaTask() {
        return this.failStaleReplicaTask;
    }

    /**
     * Validates the replication group of the given shard when backpressure is enabled, the index
     * uses segment replication (or is a remote node), and the local shard copy is the primary.
     * Does nothing when the index is not present on this node.
     *
     * @param shardId the shard about to receive a write
     * @throws OpenSearchRejectedExecutionException if the share of stale replicas reaches the configured limit
     */
    public void isSegrepLimitBreached(ShardId shardId) {
        final IndexService indexService = this.indicesService.indexService(shardId.getIndex());
        if (indexService == null) {
            return;
        }
        final IndexShard shard = indexService.getShard(shardId.id());
        if (this.isSegmentReplicationBackpressureEnabled && shard.indexSettings().isSegRepEnabledOrRemoteNode() && shard.routingEntry().primary()) {
            this.validateReplicationGroup(shard);
        }
    }

    /**
     * Rejects the write when the percentage of stale replicas in the shard's replication group
     * is at or above {@code maxAllowedStaleReplicas}; the rejection is recorded in the tracker.
     */
    private void validateReplicationGroup(IndexShard shard) {
        final Set<SegmentReplicationShardStats> replicaStats = shard.getReplicationStatsForTrackedReplicas();
        final Set<SegmentReplicationShardStats> staleReplicas = this.getStaleReplicas(replicaStats);
        if (staleReplicas.isEmpty()) {
            return;
        }
        // One in-sync allocation id is subtracted from the denominator
        // (NOTE(review): presumably the primary's own id - confirm against ReplicationGroup).
        final float percentStale = staleReplicas.size() * 100.0f / (shard.getReplicationGroup().getInSyncAllocationIds().size() - 1);
        final double maxStaleLimit = this.maxAllowedStaleReplicas * 100.0;
        if (percentStale >= maxStaleLimit) {
            this.tracker.incrementRejectionCount(shard.shardId());
            logger.warn("Rejecting write requests for shard, stale shards [{}%] shards: {}", percentStale, staleReplicas);
            throw new OpenSearchRejectedExecutionException("rejected execution on primary shard: " + shard.shardId() + " Stale Replicas: " + staleReplicas + "]", false);
        }
    }

    /**
     * Returns the replicas that exceed BOTH the checkpoint limit and the backpressure time limit.
     */
    private Set<SegmentReplicationShardStats> getStaleReplicas(Set<SegmentReplicationShardStats> replicas) {
        return replicas.stream()
            .filter(entry -> entry.getCheckpointsBehindCount() > this.maxCheckpointsBehind)
            .filter(entry -> entry.getCurrentReplicationTimeMillis() > this.replicationTimeLimitBackpressure.millis())
            .collect(Collectors.toSet());
    }

    /** Returns the statistics reported by the tracker. */
    public SegmentReplicationStats nodeStats() {
        return this.tracker.getStats();
    }

    /** Returns the tracker's per-group statistics for the given shard. */
    public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) {
        return this.tracker.getStatsForShard(indexShard);
    }

    /** Returns whether backpressure is currently enabled. */
    public boolean isSegmentReplicationBackpressureEnabled() {
        return this.isSegmentReplicationBackpressureEnabled;
    }

    /** Update consumer for {@link #SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED}. */
    public void setSegmentReplicationBackpressureEnabled(boolean segmentReplicationBackpressureEnabled) {
        this.isSegmentReplicationBackpressureEnabled = segmentReplicationBackpressureEnabled;
    }

    /** Update consumer for {@link #MAX_INDEXING_CHECKPOINTS}. */
    public void setMaxCheckpointsBehind(int maxCheckpointsBehind) {
        this.maxCheckpointsBehind = maxCheckpointsBehind;
    }

    /** Update consumer for {@link #MAX_ALLOWED_STALE_SHARDS}. */
    public void setMaxAllowedStaleReplicas(double maxAllowedStaleReplicas) {
        this.maxAllowedStaleReplicas = maxAllowedStaleReplicas;
    }

    /**
     * Update consumer for {@link #MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING}. The background
     * task is replaced so that its scheduling decision is re-evaluated against the new value.
     */
    public void setReplicationTimeLimitFailReplica(TimeValue replicationTimeLimitFailReplica) {
        this.replicationTimeLimitFailReplica = replicationTimeLimitFailReplica;
        this.updateAsyncFailReplicaTask();
    }

    /** Closes the current background task and always installs a fresh one, even if closing throws. */
    private synchronized void updateAsyncFailReplicaTask() {
        try {
            this.failStaleReplicaTask.close();
        }
        finally {
            this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(this);
        }
    }

    /** Update consumer for {@link #MAX_REPLICATION_TIME_BACKPRESSURE_SETTING}. */
    public void setReplicationTimeLimitBackpressure(TimeValue replicationTimeLimitBackpressure) {
        this.replicationTimeLimitBackpressure = replicationTimeLimitBackpressure;
    }

    /** Closes the background task. */
    @Override
    public void close() throws IOException {
        this.failStaleReplicaTask.close();
    }

    /** Returns {@code true} unless the fail-replica time limit is {@link TimeValue#ZERO}. */
    boolean shouldScheduleAsyncFailTask() {
        return !TimeValue.ZERO.equals(this.replicationTimeLimitFailReplica);
    }

    /**
     * Background task that, on each run, picks the shard owning the stale replica with the
     * longest current replication time and reports as failed every stale replica of that shard
     * whose replication time exceeds {@code replicationTimeLimitFailReplica}.
     */
    static final class AsyncFailStaleReplicaTask
    extends AbstractAsyncTask {
        final SegmentReplicationPressureService pressureService;
        static final TimeValue INTERVAL = TimeValue.timeValueSeconds(30L);

        AsyncFailStaleReplicaTask(SegmentReplicationPressureService pressureService) {
            super(logger, pressureService.threadPool, INTERVAL, true);
            this.pressureService = pressureService;
            this.rescheduleIfNecessary();
        }

        @Override
        protected boolean mustReschedule() {
            return this.pressureService.shouldScheduleAsyncFailTask();
        }

        @Override
        protected void runInternal() {
            if (!this.pressureService.shouldScheduleAsyncFailTask()) {
                return;
            }
            final SegmentReplicationStats stats = this.pressureService.tracker.getStats();
            // Pair every stale replica's replication time with its shard id, then keep the
            // shard id belonging to the single largest replication time.
            stats.getShardStats()
                .entrySet()
                .stream()
                .flatMap(
                    entry -> this.pressureService.getStaleReplicas(entry.getValue().getReplicaStats())
                        .stream()
                        .map(r -> Tuple.tuple(entry.getKey(), r.getCurrentReplicationTimeMillis()))
                )
                .max(Comparator.comparingLong(Tuple::v2))
                .map(Tuple::v1)
                .ifPresent(shardId -> this.failStaleReplicas(stats, shardId));
        }

        /**
         * Reports as failed each stale replica of {@code shardId} whose current replication time
         * exceeds the fail-replica limit. Skips the shard when its index is no longer present on
         * this node or is not using segment replication.
         */
        private void failStaleReplicas(SegmentReplicationStats stats, ShardId shardId) {
            final Set<SegmentReplicationShardStats> staleReplicas = this.pressureService.getStaleReplicas(stats.getShardStats().get(shardId).getReplicaStats());
            final IndexService indexService = this.pressureService.indicesService.indexService(shardId.getIndex());
            // The lookup can yield null (isSegrepLimitBreached guards the same call); the index
            // may have gone away between the stats snapshot and this point.
            if (indexService == null) {
                return;
            }
            if (indexService.getIndexSettings() != null && !indexService.getIndexSettings().isSegRepEnabledOrRemoteNode()) {
                return;
            }
            final IndexShard primaryShard = indexService.getShard(shardId.getId());
            for (final SegmentReplicationShardStats staleReplica : staleReplicas) {
                if (staleReplica.getCurrentReplicationTimeMillis() > this.pressureService.replicationTimeLimitFailReplica.millis()) {
                    this.pressureService.shardStateAction.remoteShardFailed(shardId, staleReplica.getAllocationId(), primaryShard.getOperationPrimaryTerm(), true, "replica too far behind primary, marking as stale", null, new ActionListener<Void>() {

                        @Override
                        public void onResponse(Void unused) {
                            logger.trace("Successfully failed remote shardId [{}] allocation id [{}]", shardId, staleReplica.getAllocationId());
                        }

                        @Override
                        public void onFailure(Exception e) {
                            logger.error("Failed to send remote shard failure", e);
                        }
                    });
                }
            }
        }

        @Override
        protected String getThreadPool() {
            return "generic";
        }

        @Override
        public String toString() {
            return "fail_stale_replica";
        }
    }
}

