/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupPartitioner;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueStateSnapshot;

/**
 * Bundles a {@link HeapPriorityQueueSet} with the meta information and key-group settings that
 * are needed to snapshot the queue and to read elements back into it.
 *
 * @param <T> type of the elements held by the wrapped priority queue.
 */
public class HeapPriorityQueueSnapshotRestoreWrapper<T extends HeapPriorityQueueElement>
        implements StateSnapshotRestore {

    /** The wrapped priority queue; snapshots dump it and the key-group reader adds to it. */
    @Nonnull
    private final HeapPriorityQueueSet<T> priorityQueue;

    /** Key extractor that is handed to every snapshot created by this wrapper. */
    @Nonnull
    private final KeyExtractorFunction<T> keyExtractorFunction;

    /** Meta information of the queue state; provides the name and the element serializer. */
    @Nonnull
    private final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo;

    /** Key-group range that is handed to every snapshot created by this wrapper. */
    @Nonnull
    private final KeyGroupRange localKeyGroupRange;

    /** Total number of key-groups that is handed to every snapshot created by this wrapper. */
    @Nonnegative
    private final int totalKeyGroups;

    /**
     * Creates a wrapper around the given queue. All arguments are stored as-is; nothing is
     * copied or validated here.
     *
     * @param priorityQueue the priority queue to wrap.
     * @param metaInfo meta information (name and element serializer) for the queue state.
     * @param keyExtractorFunction key extractor passed on to snapshots.
     * @param localKeyGroupRange key-group range passed on to snapshots.
     * @param totalKeyGroups total number of key-groups passed on to snapshots.
     */
    public HeapPriorityQueueSnapshotRestoreWrapper(
            @Nonnull HeapPriorityQueueSet<T> priorityQueue,
            @Nonnull RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo,
            @Nonnull KeyExtractorFunction<T> keyExtractorFunction,
            @Nonnull KeyGroupRange localKeyGroupRange,
            int totalKeyGroups) {
        this.priorityQueue = priorityQueue;
        this.keyExtractorFunction = keyExtractorFunction;
        this.metaInfo = metaInfo;
        this.localKeyGroupRange = localKeyGroupRange;
        this.totalKeyGroups = totalKeyGroups;
    }

    /**
     * Creates a snapshot from an array dump of the queue's current elements and a deep copy of
     * the meta information.
     *
     * @return a new snapshot object for the wrapped queue.
     */
    @SuppressWarnings("unchecked")
    @Override
    @Nonnull
    public HeapPriorityQueueStateSnapshot<T> stateSnapshot() {
        // A generic array cannot be instantiated directly, so the dump array is created with the
        // bound type. The unchecked cast is safe: HeapPriorityQueueElement is the erasure of T,
        // and the array is only ever filled from a queue of T.
        final T[] queueDump =
                (T[]) this.priorityQueue.toArray(
                        new HeapPriorityQueueElement[this.priorityQueue.size()]);
        return new HeapPriorityQueueStateSnapshot<>(
                queueDump,
                this.keyExtractorFunction,
                this.metaInfo.deepCopy(),
                this.localKeyGroupRange,
                this.totalKeyGroups);
    }

    /**
     * Creates a key-group reader that deserializes elements with the serializer from the meta
     * information and adds every element it reads to the wrapped queue.
     *
     * @param readVersionHint not used by this implementation.
     * @return a reader that feeds the elements it reads into the wrapped queue.
     */
    @Override
    @Nonnull
    public StateSnapshotKeyGroupReader keyGroupReader(int readVersionHint) {
        final TypeSerializer<T> elementSerializer = this.metaInfo.getElementSerializer();
        // The key-group id supplied to the consumer is ignored; elements are simply added.
        return KeyGroupPartitioner.createKeyGroupPartitionReader(
                elementSerializer::deserialize,
                (element, keyGroupId) -> this.priorityQueue.add(element));
    }

    /**
     * Returns the wrapped priority queue itself (not a copy).
     *
     * @return the wrapped priority queue.
     */
    @Nonnull
    public HeapPriorityQueueSet<T> getPriorityQueue() {
        return this.priorityQueue;
    }

    /**
     * Returns the meta information held by this wrapper.
     *
     * @return the meta information of the queue state.
     */
    @Nonnull
    public RegisteredPriorityQueueStateBackendMetaInfo<T> getMetaInfo() {
        return this.metaInfo;
    }

    /**
     * Creates a new wrapper whose meta information is rebuilt from the current state name and the
     * given serializer. The new wrapper references the same priority queue instance, key
     * extractor, key-group range and total key-group count as this one; the queue is not copied.
     *
     * @param updatedSerializer the serializer to register in the new meta information.
     * @return a new wrapper over the same queue with the updated meta information.
     */
    public HeapPriorityQueueSnapshotRestoreWrapper<T> forUpdatedSerializer(
            @Nonnull TypeSerializer<T> updatedSerializer) {
        final RegisteredPriorityQueueStateBackendMetaInfo<T> updatedMetaInfo =
                new RegisteredPriorityQueueStateBackendMetaInfo<>(
                        this.metaInfo.getName(), updatedSerializer);
        return new HeapPriorityQueueSnapshotRestoreWrapper<>(
                this.priorityQueue,
                updatedMetaInfo,
                this.keyExtractorFunction,
                this.localKeyGroupRange,
                this.totalKeyGroups);
    }
}

