/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.shuffle;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.util.Preconditions;

/**
 * {@link ShuffleMaster} implementation that hands out {@link NettyShuffleDescriptor}s.
 *
 * <p>All sizing parameters are read once from the supplied {@link Configuration} at construction
 * time and are afterwards only used to compute the shuffle memory a task needs to announce.
 */
public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> {

    /** Value of {@link NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_PER_CHANNEL}. */
    private final int buffersPerInputChannel;

    /** Value of {@link NettyShuffleEnvironmentOptions#NETWORK_EXTRA_BUFFERS_PER_GATE}. */
    private final int buffersPerInputGate;

    /** Value of {@link NettyShuffleEnvironmentOptions#NETWORK_SORT_SHUFFLE_MIN_PARALLELISM}. */
    private final int sortShuffleMinParallelism;

    /** Value of {@link NettyShuffleEnvironmentOptions#NETWORK_SORT_SHUFFLE_MIN_BUFFERS}. */
    private final int sortShuffleMinBuffers;

    /** Page size obtained from {@link ConfigurationParserUtils#getPageSize(Configuration)}. */
    private final int networkBufferSize;

    /**
     * Creates a shuffle master configured from {@code conf}.
     *
     * @param conf configuration to read the network buffer settings from; must not be null
     */
    public NettyShuffleMaster(Configuration conf) {
        Preconditions.checkNotNull(conf);

        buffersPerInputChannel =
                conf.getInteger(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
        buffersPerInputGate =
                conf.getInteger(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
        sortShuffleMinParallelism =
                conf.getInteger(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM);
        sortShuffleMinBuffers =
                conf.getInteger(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
        networkBufferSize = ConfigurationParserUtils.getPageSize(conf);
    }

    /**
     * Builds the shuffle descriptor for a partition and returns it in an already-completed future.
     *
     * @param jobID job the partition belongs to (not used by this implementation)
     * @param partitionDescriptor supplies the partition id and the connection index
     * @param producerDescriptor supplies the producer execution id, location and data port
     * @return a completed future holding the new {@link NettyShuffleDescriptor}
     */
    @Override
    public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(
            JobID jobID,
            PartitionDescriptor partitionDescriptor,
            ProducerDescriptor producerDescriptor) {
        ResultPartitionID partitionId =
                new ResultPartitionID(
                        partitionDescriptor.getPartitionId(),
                        producerDescriptor.getProducerExecutionId());

        NettyShuffleDescriptor descriptor =
                new NettyShuffleDescriptor(
                        producerDescriptor.getProducerLocation(),
                        createConnectionInfo(
                                producerDescriptor, partitionDescriptor.getConnectionIndex()),
                        partitionId);

        return CompletableFuture.completedFuture(descriptor);
    }

    /**
     * Intentionally does nothing in this implementation.
     *
     * @param shuffleDescriptor descriptor of the partition to release (ignored)
     */
    @Override
    public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
        // No action is taken here.
    }

    /**
     * Chooses the connection info for a producer based on its data port.
     *
     * @param producerDescriptor producer whose data port decides the kind of connection info
     * @param connectionIndex connection index forwarded to the network connection info
     * @return network connection info when the data port is non-negative, otherwise the
     *     local-execution singleton
     */
    private static NettyShuffleDescriptor.PartitionConnectionInfo createConnectionInfo(
            ProducerDescriptor producerDescriptor, int connectionIndex) {
        if (producerDescriptor.getDataPort() < 0) {
            return NettyShuffleDescriptor.LocalExecutionPartitionConnectionInfo.INSTANCE;
        }
        return NettyShuffleDescriptor.NetworkPartitionConnectionInfo.fromProducerDescriptor(
                producerDescriptor, connectionIndex);
    }

    /**
     * Computes the shuffle memory to announce for a task as
     * {@code networkBufferSize * requiredNetworkBuffers}.
     *
     * <p>The number of required buffers is delegated to
     * {@code NettyShuffleUtils.computeNetworkBuffersForAnnouncing}.
     *
     * @param desc description of the task's inputs and outputs; must not be null
     * @return the computed memory size in bytes
     */
    @Override
    public MemorySize computeShuffleMemorySizeForTask(TaskInputsOutputsDescriptor desc) {
        Preconditions.checkNotNull(desc);

        // Sum the per-entry channel counts into a single total.
        int totalInputChannels = 0;
        for (Integer channelCount : desc.getInputChannelNums().values()) {
            totalInputChannels += channelCount;
        }
        int totalInputGates = desc.getInputGateNums();

        int requiredNetworkBuffers =
                NettyShuffleUtils.computeNetworkBuffersForAnnouncing(
                        buffersPerInputChannel,
                        buffersPerInputGate,
                        sortShuffleMinParallelism,
                        sortShuffleMinBuffers,
                        totalInputChannels,
                        totalInputGates,
                        desc.getSubpartitionNums(),
                        desc.getPartitionTypes());

        // Widen to long before multiplying so the byte count cannot overflow int.
        long totalBytes = (long) networkBufferSize * (long) requiredNetworkBuffers;
        return new MemorySize(totalBytes);
    }
}

