/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.savepoint;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.flink.runtime.checkpoint.KeyGroupState;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV0;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.SerializedValue;

/**
 * Binary serializer for {@link SavepointV0} instances.
 *
 * <p>Wire layout, as produced by {@code serialize} and consumed by {@code deserialize}:
 * <pre>
 * long   checkpoint id
 * int    number of task states
 * per task state:
 *   long   JobVertexID lower part
 *   long   JobVertexID upper part
 *   int    parallelism
 *   int    number of subtask states that follow
 *   per subtask state:
 *     int    subtask index
 *     int    payload length in bytes, or -1 when there is no payload
 *     bytes  payload (omitted when the length is -1)
 *     long   state size
 *     long   duration
 *   int    number of key-group states that follow
 *   per key-group state:
 *     int    key-group index
 *     int    payload length in bytes, or -1 when there is no payload
 *     bytes  payload (omitted when the length is -1)
 *     long   state size
 *     long   duration
 * </pre>
 */
class SavepointV1Serializer implements SavepointSerializer<SavepointV0> {
    /** Shared instance; the constructor is private, so this is the only one. */
    public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();

    /** Length marker written in place of a payload when no serialized bytes are available. */
    private static final int NO_PAYLOAD = -1;

    private SavepointV1Serializer() {
    }

    /**
     * Writes the given savepoint to the stream using the layout described on this class.
     *
     * @param savepoint the savepoint to write
     * @param dos the stream to write to; it is neither flushed nor closed here
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void serialize(SavepointV0 savepoint, DataOutputStream dos) throws IOException {
        dos.writeLong(savepoint.getCheckpointId());

        // Use one snapshot of the collection for both the count and the iteration so the
        // written count always matches the number of entries that follow.
        Collection<TaskState> taskStates = savepoint.getTaskStates();
        dos.writeInt(taskStates.size());

        for (TaskState taskState : taskStates) {
            dos.writeLong(taskState.getJobVertexID().getLowerPart());
            dos.writeLong(taskState.getJobVertexID().getUpperPart());

            int parallelism = taskState.getParallelism();
            dos.writeInt(parallelism);

            // Subtask states: only the non-null slots are written, each tagged with its index.
            dos.writeInt(taskState.getNumberCollectedStates());
            for (int i = 0; i < parallelism; ++i) {
                SubtaskState subtaskState = taskState.getState(i);
                if (subtaskState == null) {
                    continue;
                }
                dos.writeInt(i);
                writeSerializedState(subtaskState.getState(), dos);
                dos.writeLong(subtaskState.getStateSize());
                dos.writeLong(subtaskState.getDuration());
            }

            // Key-group states, same scheme as above.
            // NOTE(review): the loop bound is the task parallelism; if key-group indices can
            // be >= parallelism, fewer entries than the announced count would be written.
            // Confirm against TaskState.
            dos.writeInt(taskState.getNumberCollectedKvStates());
            for (int i = 0; i < parallelism; ++i) {
                KeyGroupState keyGroupState = taskState.getKvState(i);
                if (keyGroupState == null) {
                    continue;
                }
                // Must be writeInt: deserialize() reads this index with readInt(). The
                // single-byte write(int) used previously misaligned the rest of the stream.
                dos.writeInt(i);
                writeSerializedState(keyGroupState.getKeyGroupState(), dos);
                dos.writeLong(keyGroupState.getStateSize());
                dos.writeLong(keyGroupState.getDuration());
            }
        }
    }

    /**
     * Reads a savepoint from the stream, expecting the layout described on this class.
     *
     * @param dis the stream to read from; it is not closed here
     * @return the savepoint rebuilt from the stream contents
     * @throws IOException if reading fails, the stream ends early, or a payload length is invalid
     */
    @Override
    public SavepointV0 deserialize(DataInputStream dis) throws IOException {
        long checkpointId = dis.readLong();

        int numTaskStates = dis.readInt();
        ArrayList<TaskState> taskStates = new ArrayList<TaskState>(numTaskStates);

        for (int i = 0; i < numTaskStates; ++i) {
            // Read order matters: lower part first, then upper part.
            long lowerPart = dis.readLong();
            long upperPart = dis.readLong();
            JobVertexID jobVertexId = new JobVertexID(lowerPart, upperPart);

            int parallelism = dis.readInt();
            TaskState taskState = new TaskState(jobVertexId, parallelism);
            taskStates.add(taskState);

            int numSubTaskStates = dis.readInt();
            for (int j = 0; j < numSubTaskStates; ++j) {
                int subtaskIndex = dis.readInt();
                SerializedValue<StateHandle<?>> serializedValue = readSerializedState(dis);
                long stateSize = dis.readLong();
                long duration = dis.readLong();
                taskState.putState(subtaskIndex, new SubtaskState(serializedValue, stateSize, duration));
            }

            int numKvStates = dis.readInt();
            for (int j = 0; j < numKvStates; ++j) {
                int keyGroupIndex = dis.readInt();
                SerializedValue<StateHandle<?>> serializedValue = readSerializedState(dis);
                long stateSize = dis.readLong();
                long duration = dis.readLong();
                taskState.putKvState(keyGroupIndex, new KeyGroupState(serializedValue, stateSize, duration));
            }
        }

        return new SavepointV0(checkpointId, taskStates);
    }

    /** Writes a length-prefixed payload, or the {@code -1} marker when there are no bytes to write. */
    private static void writeSerializedState(SerializedValue<StateHandle<?>> value, DataOutputStream dos)
            throws IOException {
        // A value created by deserialize() for the -1 marker wraps null; presumably its byte
        // array is null as well, so guard against that instead of failing on .length.
        byte[] serialized = value == null ? null : value.getByteArray();
        if (serialized == null) {
            dos.writeInt(NO_PAYLOAD);
            return;
        }
        dos.writeInt(serialized.length);
        dos.write(serialized, 0, serialized.length);
    }

    /** Reads a length-prefixed payload; the {@code -1} marker yields a value wrapping {@code null}. */
    private static SerializedValue<StateHandle<?>> readSerializedState(DataInputStream dis) throws IOException {
        int length = dis.readInt();
        if (length == NO_PAYLOAD) {
            return new SerializedValue<StateHandle<?>>(null);
        }
        if (length < 0) {
            // Surface corrupt input as an I/O problem rather than a NegativeArraySizeException.
            throw new IOException("Corrupt savepoint: invalid serialized state length " + length);
        }
        byte[] serializedData = new byte[length];
        dis.readFully(serializedData, 0, length);
        SerializedValue<StateHandle<?>> value = SerializedValue.fromBytes(serializedData);
        return value;
    }
}

