/********************************************************************************
 * Copyright (c) 2015-2018 Contributors to the Eclipse Foundation
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information regarding copyright ownership.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * SPDX-License-Identifier: EPL-2.0
 *
 ********************************************************************************/

package org.eclipse.mdm.businessobjects.utils;

import java.io.ByteArrayOutputStream;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.eclipse.mdm.api.base.ServiceNotProvidedException;
import org.eclipse.mdm.api.base.massdata.ReadRequest;
import org.eclipse.mdm.api.base.massdata.ReadRequest.ValuesMode;
import org.eclipse.mdm.api.base.massdata.ReadRequestBuilder;
import org.eclipse.mdm.api.base.model.AxisType;
import org.eclipse.mdm.api.base.model.Channel;
import org.eclipse.mdm.api.base.model.ChannelGroup;
import org.eclipse.mdm.api.base.model.DoubleComplex;
import org.eclipse.mdm.api.base.model.FloatComplex;
import org.eclipse.mdm.api.base.model.MeasuredValues;
import org.eclipse.mdm.api.base.model.MeasuredValues.ValueIterator;
import org.eclipse.mdm.api.base.model.ScalarType;
import org.eclipse.mdm.api.base.model.Unit;
import org.eclipse.mdm.api.dflt.ApplicationContext;
import org.eclipse.mdm.api.dflt.EntityManager;
import org.eclipse.mdm.protobuf.Mdm;
import org.eclipse.mdm.protobuf.Mdm.BooleanArray;
import org.eclipse.mdm.protobuf.Mdm.ByteArray;
import org.eclipse.mdm.protobuf.Mdm.ByteStreamArray;
import org.eclipse.mdm.protobuf.Mdm.DateArray;
import org.eclipse.mdm.protobuf.Mdm.DoubleArray;
import org.eclipse.mdm.protobuf.Mdm.DoubleComplexArray;
import org.eclipse.mdm.protobuf.Mdm.FloatArray;
import org.eclipse.mdm.protobuf.Mdm.FloatComplexArray;
import org.eclipse.mdm.protobuf.Mdm.IntegerArray;
import org.eclipse.mdm.protobuf.Mdm.LongArray;
import org.eclipse.mdm.protobuf.Mdm.MeasuredValuesList;
import org.eclipse.mdm.protobuf.Mdm.ShortArray;
import org.eclipse.mdm.protobuf.Mdm.StringArray;

import com.google.common.base.Strings;
import com.google.common.primitives.Doubles;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;

/**
 * Helper class for converting between protobuf and mdm types.
 *
 */
public class ProtobufConverter {

	/**
	 * Converted a DateArray to an array of {@link LocalDateTime}
	 * 
	 * @param dateArray
	 * @param zoneId
	 * @return array of {@link LocalDateTime}
	 */
	public static LocalDateTime[] convertDates(DateArray dateArray, ZoneId zoneId) {
		LocalDateTime[] strings = new LocalDateTime[dateArray.getValuesCount()];
		for (int i = 0; i < dateArray.getValuesCount(); i++) {
			Timestamp ts = dateArray.getValues(i);
			strings[i] = Instant.ofEpochSecond(ts.getSeconds(), ts.getNanos()).atZone(zoneId).toLocalDateTime();
		}
		return strings;
	}

	/**
	 * Converts a {@link StringArray} to an array of strings
	 * 
	 * @param stringArray
	 * @return array of strings
	 */
	public static String[] convertStrings(StringArray stringArray) {
		String[] strings = new String[stringArray.getValuesCount()];
		for (int i = 0; i < stringArray.getValuesCount(); i++) {
			strings[i] = stringArray.getValues(i);
		}
		return strings;
	}

	/**
	 * Converts a list of {@link org.eclipse.mdm.protobuf.Mdm.FloatComplex} to an
	 * array of {@link FloatComplex}
	 * 
	 * @param valuesList
	 * @return array of {@link FloatComplex}
	 */
	public static FloatComplex[] convertFloatComplex(List<org.eclipse.mdm.protobuf.Mdm.FloatComplex> valuesList) {
		FloatComplex[] floatComplexes = new FloatComplex[valuesList.size()];
		for (int i = 0; i < valuesList.size(); i++) {
			floatComplexes[i] = new FloatComplex(valuesList.get(i).getRe(), valuesList.get(i).getIm());
		}
		return floatComplexes;
	}

	/**
	 * Converts a list of {@link org.eclipse.mdm.protobuf.Mdm.DoubleComplex} to an
	 * array of {@link DoubleComplex}
	 * 
	 * @param valuesList
	 * @return array of {@link DoubleComplex}
	 */
	public static DoubleComplex[] convertDoubleComplex(List<org.eclipse.mdm.protobuf.Mdm.DoubleComplex> valuesList) {
		DoubleComplex[] doubleComplexes = new DoubleComplex[valuesList.size()];
		for (int i = 0; i < valuesList.size(); i++) {
			doubleComplexes[i] = new DoubleComplex(valuesList.get(i).getRe(), valuesList.get(i).getIm());
		}
		return doubleComplexes;
	}

	/**
	 * Converts a list of {@link ByteString} to an array of array of byte.
	 * 
	 * @param valuesList
	 * @return array of array of byte.
	 */
	public static byte[][] convertByteStreams(List<ByteString> valuesList) {
		byte[][] byteStreams = new byte[valuesList.size()][];
		for (int i = 0; i < valuesList.size(); i++) {
			byteStreams[i] = valuesList.get(i).toByteArray();
		}
		return byteStreams;
	}

	/**
	 * Converts an {@link org.eclipse.mdm.protobuf.Mdm.AxisType} to {@link AxisType}
	 * 
	 * @param axisType
	 * @return converted {@link AxisType}
	 */
	public static AxisType convert(Mdm.AxisType axisType) {
		switch (axisType) {
		case X_AXIS:
			return AxisType.X_AXIS;
		case Y_AXIS:
			return AxisType.Y_AXIS;
		case XY_AXIS:
			return AxisType.XY_AXIS;
		default:
			throw new RuntimeException("Invalid value for AxisType: " + axisType.name());
		}
	}

	/**
	 * Converts an {@link org.eclipse.mdm.protobuf.Mdm.ScalarType} to a
	 * {@link ScalarType}.
	 * 
	 * @param scalarType
	 * @return converted {@link ScalarType}
	 */
	public static ScalarType convert(Mdm.ScalarType scalarType) {
		switch (scalarType) {
		case STRING:
			return ScalarType.STRING;
		case DATE:
			return ScalarType.DATE;
		case BOOLEAN:
			return ScalarType.BOOLEAN;
		case BYTE:
			return ScalarType.BYTE;
		case SHORT:
			return ScalarType.SHORT;
		case INTEGER:
			return ScalarType.INTEGER;
		case LONG:
			return ScalarType.LONG;
		case FLOAT:
			return ScalarType.FLOAT;
		case DOUBLE:
			return ScalarType.DOUBLE;
		case BYTE_STREAM:
			return ScalarType.BYTE_STREAM;
		case FLOAT_COMPLEX:
			return ScalarType.FLOAT_COMPLEX;
		case DOUBLE_COMPLEX:
			return ScalarType.DOUBLE_COMPLEX;
		case ENUMERATION:
			return ScalarType.ENUMERATION;
		case FILE_LINK:
			return ScalarType.FILE_LINK;
		case BLOB:
			return ScalarType.BLOB;
		case UNKNOWN:
			return ScalarType.UNKNOWN;
		default:
			throw new RuntimeException("Invalid value for ScalarType: " + scalarType.name());
		}
	}

	/**
	 * Converts a list of {@link MeasuredValues} to a {@link MeasuredValuesList}
	 * 
	 * @param measuredValues
	 * @return converted {@link MeasuredValuesList}
	 */
	public static MeasuredValuesList convert(List<MeasuredValues> measuredValues) {
		MeasuredValuesList.Builder builder = MeasuredValuesList.newBuilder();
		for (MeasuredValues m : measuredValues) {
			builder.addValues(convert(m));
		}
		return builder.build();
	}

	/**
	 * Converts {@link MeasuredValues} to
	 * {@link org.eclipse.mdm.protobuf.Mdm.MeasuredValues}
	 * 
	 * @param m
	 * @return converted {@link org.eclipse.mdm.protobuf.Mdm.MeasuredValues}
	 */
	public static Mdm.MeasuredValues convert(MeasuredValues m) {

		Mdm.MeasuredValues.Builder builder = Mdm.MeasuredValues.newBuilder().setName(m.getName()).setUnit(m.getUnit())
				.setLength(m.getLength()).setAxisType(convert(m.getAxisType())).setIndependent(m.isIndependent())
				.setScalarType(Mdm.ScalarType.valueOf(m.getScalarType().name()))
				.addAllGenerationParameters(Doubles.asList(m.getGenerationParameters()));

		BooleanArray.Builder flags = BooleanArray.newBuilder();
		ValueIterator<Object> it = m.iterator();

		if (m.getScalarType().isString()) {
			StringArray.Builder strings = StringArray.newBuilder();
			while (it.hasNext()) {
				flags.addValues(it.isValid());
				strings.addValues((String) it.next());
			}
			builder.setStringArray(strings);
		} else if (m.getScalarType().isDate()) {
			DateArray.Builder dates = DateArray.newBuilder();
			while (it.hasNext()) {
				flags.addValues(it.isValid());
				LocalDateTime t = (LocalDateTime) it.next();
				Instant time = t.toInstant(ZoneOffset.UTC);
				dates.addValues(
						Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build());
			}
			builder.setDateArray(dates);
		} else if (m.getScalarType().isBoolean()) {
			BooleanArray.Builder booleans = BooleanArray.newBuilder();
			while (it.hasNext()) {
				flags.addValues(it.isValid());
				booleans.addValues((boolean) it.next());
			}
			builder.setBooleanArray(booleans);
		} else if (m.getScalarType().isByte()) {
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			while (it.hasNext()) {
				flags.addValues(it.isValid());
				bytes.write((byte) it.next());
			}
			builder.setByteArray(ByteArray.newBuilder().setValues(ByteString.copyFrom(bytes.toByteArray())));
		} else if (m.getScalarType().isShort()) {
			ShortArray.Builder shorts = ShortArray.newBuilder();
			while (it.hasNext()) {
				flags.addValues(it.isValid());
				shorts.addValues((short) it.next());
			}
			builder.setShortArray(shorts);
		} else if (m.getScalarType().isInteger()) {
			IntegerArray.Builder ints = IntegerArray.newBuilder();
			while (it.hasNext()) {
				flags.addValues(it.isValid());
				int i = (int) it.next();
				ints.addValues(i);
			}
			builder.setIntegerArray(ints);
		} else if (m.getScalarType().isLong()) {
			LongArray.Builder ints = LongArray.newBuilder();
			while (it.hasNext()) {
				flags.addValues(it.isValid());
				ints.addValues((long) it.next());
			}
			builder.setLongArray(ints);
		} else if (m.getScalarType().isFloat()) {
			FloatArray.Builder floats = FloatArray.newBuilder();
			while (it.hasNext()) {
				flags.addValues(it.isValid());
				floats.addValues((float) it.next());
			}
			builder.setFloatArray(floats);
		} else if (m.getScalarType().isDouble()) {
			DoubleArray.Builder doubles = DoubleArray.newBuilder();
			while (it.hasNext()) {
				flags.addValues(it.isValid());
				doubles.addValues((double) it.next());
			}
			builder.setDoubleArray(doubles);
		} else if (m.getScalarType().isByteStream()) {
			ByteStreamArray.Builder bytestrs = ByteStreamArray.newBuilder();
			while (it.hasNext()) {
				flags.addValues(it.isValid());
				bytestrs.addValues(ByteString.copyFrom((byte[]) it.next()));
			}
			builder.setByteStreamArray(bytestrs);
		} else if (m.getScalarType().isFloatComplex()) {
			FloatComplexArray.Builder floats = FloatComplexArray.newBuilder();
			while (it.hasNext()) {
				flags.addValues(it.isValid());
				floats.addValues(convert((FloatComplex) it.next()));
			}
			builder.setFloatComplexArray(floats);
		} else if (m.getScalarType().isDoubleComplex()) {
			DoubleComplexArray.Builder doubles = DoubleComplexArray.newBuilder();
			while (it.hasNext()) {
				flags.addValues(it.isValid());
				doubles.addValues(convert((DoubleComplex) it.next()));
			}
			builder.setDoubleComplexArray(doubles);
		} else {
			throw new IllegalArgumentException(
					"MeasuredValues with scalarType '" + m.getScalarType() + "' not supported!");
		}
		builder.addAllFlags(flags.getValuesList());

		return builder.build();
	}

	/**
	 * Converts between FloatComplex.
	 * 
	 * @param complex
	 * @return
	 */
	public static Mdm.FloatComplex convert(FloatComplex complex) {
		return Mdm.FloatComplex.newBuilder().setRe(complex.real()).setIm(complex.imaginary()).build();
	}

	/**
	 * Converts between DoubleComplex.
	 * 
	 * @param complex
	 * @return
	 */
	public static Mdm.DoubleComplex convert(DoubleComplex complex) {
		return Mdm.DoubleComplex.newBuilder().setRe(complex.real()).setIm(complex.imaginary()).build();
	}

	/**
	 * Converts between AxisType.
	 * 
	 * @param axisType
	 * @return
	 */
	public static Mdm.AxisType convert(AxisType axisType) {
		return Mdm.AxisType.valueOf(axisType.name());
	}

	/**
	 * Converts between ValuesMode.
	 * 
	 * @param valuesMode
	 * @return
	 */
	public static ValuesMode convert(Mdm.ValuesMode valuesMode) {
		return ValuesMode.valueOf(valuesMode.name());
	}

	/**
	 * Converts between ScalarType.
	 * 
	 * @param value
	 * @return
	 */
	public static Mdm.ScalarType convert(ScalarType value) {
		return Mdm.ScalarType.valueOf(value.name());
	}

	/**
	 * Converts a {@link org.eclipse.mdm.protobuf.Mdm.ReadRequest} to a
	 * {@link ReadRequest}
	 * 
	 * @param context
	 * @param protoReadRequest
	 * @return converted {@link ReadRequest}
	 */
	public static ReadRequest convert(ApplicationContext context, Mdm.ReadRequest protoReadRequest) {

		EntityManager em = context.getEntityManager()
				.orElseThrow(() -> new ServiceNotProvidedException(EntityManager.class));

		ChannelGroup channelGroup = em.load(ChannelGroup.class, protoReadRequest.getChannelGroupId());
		ReadRequestBuilder rb = ReadRequest.create(channelGroup);
		if (protoReadRequest.getChannelIdsCount() == 0) {
			rb = rb.allChannels();
		} else {
			// Load Channels and group by ID. If multiple Channels with the same ID are
			// loaded (which would be incorrect data), only the first one is used.
			Map<String, Optional<Channel>> channels = em.load(Channel.class, protoReadRequest.getChannelIdsList())
					.stream().collect(Collectors.groupingBy(Channel::getID, Collectors.reducing((c1, c2) -> c1)));

			// Load Units and group by ID. If multiple Units with the same ID are
			// loaded (which would be incorrect data), only the first one is used.
			Map<String, Optional<Unit>> units = em.load(Unit.class, protoReadRequest.getUnitIdsList()).stream()
					.collect(Collectors.groupingBy(Unit::getID, Collectors.reducing((u1, u2) -> u1)));

			for (int i = 0; i < protoReadRequest.getChannelIdsCount(); i++) {
				String channelId = protoReadRequest.getChannelIds(i);

				Channel channel = channels.get(channelId).orElseThrow(
						() -> new IllegalArgumentException("Channel with ID '" + channelId + "' does not exist!"));

				final String unitId;
				if (i < protoReadRequest.getUnitIdsCount()) {
					unitId = protoReadRequest.getUnitIds(i);
				} else {
					unitId = null;
				}

				Unit unit;

				if (Strings.isNullOrEmpty(unitId)) {
					// no unit provided -> use unit from channel
					unit = channel.getUnit();
				} else {
					unit = units.get(unitId).orElseThrow(
							() -> new IllegalArgumentException("Unit with ID '" + unitId + "' does not exist!"));
				}
				rb = rb.channel(channel, unit);
			}
		}

		return rb.valuesMode(convert(protoReadRequest.getValuesMode())).values(protoReadRequest.getStartIndex(),
				protoReadRequest.getRequestSize());
	}
}
