/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.KeyGroupPartitioner;
import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
import org.apache.flink.runtime.state.heap.StateTable;

@Internal
public class StateTableByKeyGroupReaders {
    public static <K, N, S> StateSnapshotKeyGroupReader readerForVersion(StateTable<K, N, S> stateTable, int version) {
        switch (version) {
            case 1: {
                return new StateTableByKeyGroupReaderV1<K, N, S>(stateTable);
            }
            case 2: 
            case 3: 
            case 4: 
            case 5: 
            case 6: {
                return StateTableByKeyGroupReaders.createV2PlusReader(stateTable);
            }
        }
        throw new IllegalArgumentException("Unknown version: " + version);
    }

    private static <K, N, S> StateSnapshotKeyGroupReader createV2PlusReader(StateTable<K, N, S> stateTable) {
        TypeSerializer namespaceSerializer = stateTable.getNamespaceSerializer();
        TypeSerializer stateSerializer = stateTable.getStateSerializer();
        TypeSerializer keySerializer = stateTable.keySerializer;
        Tuple3 buffer = new Tuple3();
        return KeyGroupPartitioner.createKeyGroupPartitionReader(in -> {
            buffer.f0 = namespaceSerializer.deserialize(in);
            buffer.f1 = keySerializer.deserialize(in);
            buffer.f2 = stateSerializer.deserialize(in);
            return buffer;
        }, (element, keyGroupId1) -> stateTable.put(element.f1, keyGroupId1, element.f0, element.f2));
    }

    static final class StateTableByKeyGroupReaderV1<K, N, S>
    implements StateSnapshotKeyGroupReader {
        protected final StateTable<K, N, S> stateTable;
        protected final TypeSerializer<K> keySerializer;

        StateTableByKeyGroupReaderV1(StateTable<K, N, S> stateTable) {
            this.stateTable = stateTable;
            this.keySerializer = stateTable.keySerializer;
        }

        @Override
        public void readMappingsInKeyGroup(@Nonnull DataInputView inView, @Nonnegative int keyGroupId) throws IOException {
            if (inView.readByte() == 0) {
                return;
            }
            TypeSerializer<N> namespaceSerializer = this.stateTable.getNamespaceSerializer();
            TypeSerializer<S> stateSerializer = this.stateTable.getStateSerializer();
            int numNamespaces = inView.readInt();
            for (int k = 0; k < numNamespaces; ++k) {
                N namespace = namespaceSerializer.deserialize(inView);
                int numEntries = inView.readInt();
                for (int l = 0; l < numEntries; ++l) {
                    K key = this.keySerializer.deserialize(inView);
                    S state = stateSerializer.deserialize(inView);
                    this.stateTable.put(key, keyGroupId, namespace, state);
                }
            }
        }
    }
}

