/**
 * Copyright (c) 2011, 2015 - Lunifera GmbH (Gross Enzersdorf, Austria), Loetz GmbH&Co.KG (69115 Heidelberg, Germany)
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License 2.0 
 * which accompanies this distribution, and is available at
 * https://www.eclipse.org/legal/epl-2.0/
 *
 * SPDX-License-Identifier: EPL-2.0
 *
 * Contributors:
 *         Florian Pirchner - Initial implementation
 */
package org.eclipse.osbp.runtime.web.vaadin.common.data;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.eclipse.osbp.dsl.semantic.dto.util.NamingConventionsUtil;
import org.eclipse.osbp.runtime.common.annotations.DtoUtils;
import org.eclipse.osbp.runtime.common.event.EventDispatcherEvent;
import org.eclipse.osbp.runtime.common.event.EventDispatcherEvent.EventDispatcherDataTag;
import org.eclipse.osbp.runtime.common.event.IEventDispatcher;
import org.eclipse.osbp.runtime.common.filter.SortBy;
import org.eclipse.osbp.runtime.common.filter.SortOrder;
import org.eclipse.osbp.runtime.common.state.ISharedStateContext;
import org.eclipse.osbp.ui.api.datamart.IDatamartContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.vaadin.data.Container;
import com.vaadin.data.Container.Filterable;
import com.vaadin.data.Container.Indexed;
import com.vaadin.data.Container.SimpleFilterable;
import com.vaadin.data.Container.Sortable;
import com.vaadin.data.Item;
import com.vaadin.data.Property;
import com.vaadin.data.Property.ValueChangeEvent;
import com.vaadin.data.Property.ValueChangeListener;
import com.vaadin.data.util.AbstractContainer;
import com.vaadin.data.util.NestedPropertyDescriptor;
import com.vaadin.data.util.VaadinPropertyDescriptor;
import com.vaadin.data.util.filter.SimpleStringFilter;
import com.vaadin.data.util.filter.UnsupportedFilterException;

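/**
 * A Vaadin {@link Container} that loads beans of type {@code BEANTYPE} on
 * demand from an {@link IBeanSearchService} and caches the bean items it has
 * created, keyed by the id of the bean.
 *
 * @param <BEANTYPE>
 *            the type of the beans held by this container
 */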
@SuppressWarnings("serial")
public class BeanServiceLazyLoadingContainer<BEANTYPE> extends AbstractContainer
		implements Filterable, Indexed, SimpleFilterable, ILazyRefreshFilterable, Sortable, ValueChangeListener,
		Container.ItemSetChangeNotifier, Container.PropertySetChangeNotifier, IClearable, INestedPropertyAble<BEANTYPE>,
		IAlreadyLoadedItemIdProvider<BEANTYPE>, IEventDispatcher.Receiver, IGlobalFilterable {

	/** Logger of this container. */
	private static final Logger LOGGER = LoggerFactory.getLogger(BeanServiceLazyLoadingContainer.class);

	/**
	 * Resolves the id of a bean. The resolved id is the key used in
	 * {@link #cache} and {@link #externalCache}.
	 */
	private BeanIdResolver<BEANTYPE> beanIdResolver = new DtoIdResolver<>();

	/** The search service that item lookups are delegated to. */
	private IBeanSearchService<BEANTYPE> service;

	/**
	 * The current sort order. It is rebuilt by {@link #sort(Object[], boolean[])}
	 * and passed to the service with the ordered lookups.
	 */
	private SortOrder sortOrder = new SortOrder();

	/**
	 * These filters are not automatically removed by
	 * removeAllContainerFilters.<br>
	 * E.g., they are used by historized DTOs to filter only their current
	 * version.
	 */
	private List<Filter> globalFilters = new ArrayList<Filter>();

	/**
	 * The filters currently applied, including the global filters. They are
	 * passed to the service with the lookups.
	 */
	private List<Filter> filters = new ArrayList<Filter>();

	/** The type of the beans held by this container. */
	private Class<BEANTYPE> type;

	/**
	 * The ids of the properties reported as sortable. Filled in the
	 * constructor and extended when a container property is added.
	 */
	private Set<String> sortableProperties = new HashSet<String>();

	/** The property descriptors of the bean type, keyed by property id. */
	private LinkedHashMap<String, VaadinPropertyDescriptor<BEANTYPE>> model;

	/**
	 * The items created so far, keyed by the resolved bean id. Emptied by
	 * {@link #clearCache()}.
	 */
	private Map<Object, DeepResolvingBeanItem<BEANTYPE>> cache = new HashMap<>();

	/**
	 * The items of new records that have been added from outside, keyed by
	 * the resolved bean id. Not emptied by {@link #clearCache()}, only by
	 * {@link #clear()} and {@link #disposeCacheFor(Object)}.
	 */
	private Map<Object, DeepResolvingBeanItem<BEANTYPE>> externalCache = new HashMap<>();

	/**
	 * The event dispatcher passed to the constructor, may be
	 * <code>null</code>.
	 */
	private IEventDispatcher eventDispatcher;

	/**
	 * Creates a container for the given bean type.
	 * <p>
	 * Every property of the bean model for which
	 * {@link DtoUtils#isDirtyField} returns <code>false</code> is registered
	 * as sortable. If an event dispatcher is passed, the container registers
	 * itself as event receiver.
	 *
	 * @param service
	 *            the search service; if <code>null</code>, a
	 *            {@link StatefulInMemoryBeanSearchService} for the type is
	 *            used instead
	 * @param type
	 *            the bean type
	 * @param sharedState
	 *            the shared state; not read by this constructor
	 * @param eventDispatcher
	 *            the event dispatcher, may be <code>null</code>
	 */
	public BeanServiceLazyLoadingContainer(IBeanSearchService<BEANTYPE> service, Class<BEANTYPE> type,
			ISharedStateContext sharedState, IEventDispatcher eventDispatcher) {
		if (service == null) {
			this.service = new StatefulInMemoryBeanSearchService<BEANTYPE>(type);
		} else {
			this.service = service;
		}
		this.type = type;
		this.eventDispatcher = eventDispatcher;
		this.model = DeepResolvingBeanItem.getPropertyDescriptors((Class<BEANTYPE>) type);

		// all properties except the dirty fields may be used for sorting
		for (String propertyId : model.keySet()) {
			if (!DtoUtils.isDirtyField(type, propertyId)) {
				sortableProperties.add(propertyId);
			}
		}

		if (eventDispatcher != null) {
			eventDispatcher.addEventReceiver(this);
		}
	}

	/**
	 * Creates a container without an event dispatcher. Delegates to the
	 * four-argument constructor and passes <code>null</code> as dispatcher.
	 *
	 * @param service
	 *            the search service
	 * @param type
	 *            the bean type
	 * @param sharedState
	 *            the shared state
	 */
	public BeanServiceLazyLoadingContainer(IBeanSearchService<BEANTYPE> service, Class<BEANTYPE> type,
			ISharedStateContext sharedState) {
		this(service, type, sharedState, null);
	}

	/**
	 * Reacts to SAVE and DELETE events. If the topic of the event equals the
	 * entity name derived from the bean type and an item with the id carried
	 * by the event is already loaded, the cached item is disposed. All other
	 * commands are ignored.
	 *
	 * @param event
	 *            the received event
	 */
	@Override
	public void receiveEvent(EventDispatcherEvent event) {
		switch (event.getCommand()) {
		case SAVE:
		case DELETE:
			if (!event.getTopic().equals(NamingConventionsUtil.toFqnEntityName(type.getCanonicalName()))) {
				// the event belongs to a different entity
				break;
			}
			Map<Object, BEANTYPE> loaded = getAlreadyLoadedItemIdsAsMap();
			Object idOfChangedBean = event.getData().get(EventDispatcherDataTag.ID);
			if (loaded.containsKey(idOfChangedBean)) {
				disposeCacheFor(loaded.get(idOfChangedBean));
			}
			break;
		default:
			break;
		}
	}

	/**
	 * Registers a property set change listener by delegating to
	 * {@code addPropertySetChangeListener} of the super class.
	 *
	 * @param listener
	 *            the listener to add
	 *
	 * @see com.vaadin.data.util.AbstractContainer#addListener(com.vaadin.data.Container.PropertySetChangeListener)
	 */
	@Override
	public void addListener(PropertySetChangeListener listener) {
		super.addPropertySetChangeListener(listener);
	}

	/**
	 * Removes a property set change listener by delegating to
	 * {@code removePropertySetChangeListener} of the super class.
	 *
	 * @param listener
	 *            the listener to remove
	 *
	 * @see com.vaadin.data.util.AbstractContainer#removeListener(com.vaadin.data.Container.PropertySetChangeListener)
	 */
	@Override
	public void removeListener(PropertySetChangeListener listener) {
		super.removePropertySetChangeListener(listener);
	}

	/**
	 * Registers an item set change listener by delegating to
	 * {@code addItemSetChangeListener} of the super class.
	 *
	 * @param listener
	 *            the listener to add
	 *
	 * @see com.vaadin.data.util.AbstractContainer#addListener(com.vaadin.data.Container.ItemSetChangeListener)
	 */
	@Override
	public void addListener(ItemSetChangeListener listener) {
		super.addItemSetChangeListener(listener);
	}

	/**
	 * Removes an item set change listener by delegating to
	 * {@code removeItemSetChangeListener} of the super class.
	 *
	 * @param listener
	 *            the listener to remove
	 *
	 * @see com.vaadin.data.util.AbstractContainer#removeListener(com.vaadin.data.Container.ItemSetChangeListener)
	 */
	@Override
	public void removeListener(ItemSetChangeListener listener) {
		super.removeItemSetChangeListener(listener);
	}

	/**
	 * Registers a property set change listener. Simply forwards to the
	 * implementation of the super class.
	 *
	 * @param listener
	 *            the listener to add
	 *
	 * @see com.vaadin.data.util.AbstractContainer#addPropertySetChangeListener(com.vaadin.data.Container.PropertySetChangeListener)
	 */
	@Override
	public void addPropertySetChangeListener(PropertySetChangeListener listener) {
		super.addPropertySetChangeListener(listener);
	}

	/**
	 * Removes a property set change listener. Simply forwards to the
	 * implementation of the super class.
	 *
	 * @param listener
	 *            the listener to remove
	 *
	 * @see com.vaadin.data.util.AbstractContainer#removePropertySetChangeListener(com.vaadin.data.Container.PropertySetChangeListener)
	 */
	@Override
	public void removePropertySetChangeListener(PropertySetChangeListener listener) {
		super.removePropertySetChangeListener(listener);
	}

	/**
	 * Registers an item set change listener. Simply forwards to the
	 * implementation of the super class.
	 *
	 * @param listener
	 *            the listener to add
	 *
	 * @see com.vaadin.data.util.AbstractContainer#addItemSetChangeListener(com.vaadin.data.Container.ItemSetChangeListener)
	 */
	@Override
	public void addItemSetChangeListener(ItemSetChangeListener listener) {
		super.addItemSetChangeListener(listener);
	}

	/**
	 * Removes an item set change listener. Simply forwards to the
	 * implementation of the super class.
	 *
	 * @param listener
	 *            the listener to remove
	 *
	 * @see com.vaadin.data.util.AbstractContainer#removeItemSetChangeListener(com.vaadin.data.Container.ItemSetChangeListener)
	 */
	@Override
	public void removeItemSetChangeListener(ItemSetChangeListener listener) {
		super.removeItemSetChangeListener(listener);
	}

	/**
	 * Adds the bean to the container by delegating to
	 * {@link #addBean(Object)}.
	 *
	 * @param itemId
	 *            the item id; ignored by this implementation, the id is
	 *            derived from the bean
	 * @param bean
	 *            the bean to add
	 * @return the item returned by {@link #addBean(Object)}
	 */
	protected DeepResolvingBeanItem<BEANTYPE> addItem(Object itemId, BEANTYPE bean) {
		return addBean(bean);
	}

	/**
	 * Puts an item into the cache under the id resolved from
	 * <code>newItemId</code>. If the external cache already holds an entry
	 * for that id, the entry of the external cache is cached instead of the
	 * passed item.
	 *
	 * @param newItemId
	 *            the item id the cache key is resolved from
	 * @param item
	 *            the item to cache
	 * @return the passed item, or <code>null</code> if
	 *         <code>newItemId</code> is <code>null</code>
	 */
	protected DeepResolvingBeanItem<BEANTYPE> internalAddItemAtEnd(Object newItemId,
			DeepResolvingBeanItem<BEANTYPE> item) {
		if (newItemId == null) {
			return null;
		}
		final Object cacheKey = resolveBeanId(newItemId);
		// an externally added item takes precedence over the passed one
		final DeepResolvingBeanItem<BEANTYPE> itemToCache = externalCache.containsKey(cacheKey)
				? externalCache.get(cacheKey) : item;
		cache.put(cacheKey, itemToCache);
		return item;
	}

	/**
	 * Wraps a bean into a new {@link DeepResolvingBeanItem} that uses the
	 * property model of this container.
	 *
	 * @param bean
	 *            the bean to wrap, may be <code>null</code>
	 * @return the new item, or <code>null</code> if the bean is
	 *         <code>null</code>
	 */
	protected DeepResolvingBeanItem<BEANTYPE> createBeanItem(BEANTYPE bean) {
		if (bean == null) {
			return null;
		}
		return new DeepResolvingBeanItem<BEANTYPE>(bean, model);
	}

	/**
	 * Asks the service for the bean following the given one with respect to
	 * the current filters and sort order and maps the answer to the cached
	 * instance, if there is one.
	 *
	 * @param itemId
	 *            the current item id
	 * @return the next item id as delivered by the service, or its cached
	 *         instance
	 *
	 * @see com.vaadin.data.Container.Ordered#nextItemId(java.lang.Object)
	 */
	@SuppressWarnings("unchecked")
	@Override
	public Object nextItemId(final Object itemId) {
		final BEANTYPE current = (BEANTYPE) itemId;
		final BEANTYPE successor = service.getNextBean(current, filters, sortOrder);
		return mapToCachedId(successor);
	}

	/**
	 * Asks the service for the bean preceding the given one with respect to
	 * the current filters and sort order and maps the answer to the cached
	 * instance, if there is one.
	 *
	 * @param itemId
	 *            the current item id
	 * @return the previous item id as delivered by the service, or its
	 *         cached instance
	 *
	 * @see com.vaadin.data.Container.Ordered#prevItemId(java.lang.Object)
	 */
	@SuppressWarnings("unchecked")
	@Override
	public Object prevItemId(final Object itemId) {
		final BEANTYPE current = (BEANTYPE) itemId;
		final BEANTYPE predecessor = service.getPreviousBean(current, filters, sortOrder);
		return mapToCachedId(predecessor);
	}

	/**
	 * Asks the service for the first bean with respect to the current
	 * filters and sort order and maps the answer to the cached instance, if
	 * there is one.
	 *
	 * @return the first item id as delivered by the service, or its cached
	 *         instance
	 *
	 * @see com.vaadin.data.Container.Ordered#firstItemId()
	 */
	@Override
	public Object firstItemId() {
		final BEANTYPE first = service.getFirstBean(filters, sortOrder);
		return mapToCachedId(first);
	}

	/**
	 * Asks the service for the last bean with respect to the current filters
	 * and sort order and maps the answer to the cached instance, if there is
	 * one.
	 *
	 * @return the last item id as delivered by the service, or its cached
	 *         instance
	 *
	 * @see com.vaadin.data.Container.Ordered#lastItemId()
	 */
	@Override
	public Object lastItemId() {
		final BEANTYPE last = service.getLastBean(filters, sortOrder);
		return mapToCachedId(last);
	}

	/**
	 * Asks the service whether the given bean is the first one with respect
	 * to the current filters and sort order.
	 *
	 * @param itemId
	 *            the item id to check
	 * @return the answer of the service
	 *
	 * @see com.vaadin.data.Container.Ordered#isFirstId(java.lang.Object)
	 */
	@SuppressWarnings("unchecked")
	@Override
	public boolean isFirstId(final Object itemId) {
		final BEANTYPE candidate = (BEANTYPE) itemId;
		return service.isFirstBean(candidate, filters, sortOrder);
	}

	/**
	 * Asks the service whether the given bean is the last one with respect
	 * to the current filters and sort order.
	 *
	 * @param itemId
	 *            the item id to check
	 * @return the answer of the service
	 *
	 * @see com.vaadin.data.Container.Ordered#isLastId(java.lang.Object)
	 */
	@SuppressWarnings("unchecked")
	@Override
	public boolean isLastId(final Object itemId) {
		final BEANTYPE candidate = (BEANTYPE) itemId;
		return service.isLastBean(candidate, filters, sortOrder);
	}

	/**
	 * Not supported by this container.
	 *
	 * @param previousItemId
	 *            not used
	 * @return never returns normally
	 * @throws UnsupportedOperationException
	 *             always
	 *
	 * @see com.vaadin.data.Container.Ordered#addItemAfter(java.lang.Object)
	 */
	@Override
	public Object addItemAfter(Object previousItemId) throws UnsupportedOperationException {
		throw new UnsupportedOperationException();
	}

	/**
	 * Not supported by this container.
	 *
	 * @param previousItemId
	 *            not used
	 * @param newItemId
	 *            not used
	 * @return never returns normally
	 * @throws UnsupportedOperationException
	 *             always
	 *
	 * @see com.vaadin.data.Container.Ordered#addItemAfter(java.lang.Object,
	 *      java.lang.Object)
	 */
	@Override
	public Item addItemAfter(Object previousItemId, Object newItemId) throws UnsupportedOperationException {
		throw new UnsupportedOperationException();
	}

	/**
	 * Returns the item for the given item id. The cache is consulted first,
	 * then the external cache. If neither holds the item, the bean is
	 * refreshed by the service and, if the service delivers one, added to
	 * the container.
	 *
	 * @param itemId
	 *            the item id (a bean), may be <code>null</code>
	 * @return the item, or <code>null</code> if the id is <code>null</code>
	 *         or the service does not deliver a bean
	 *
	 * @see com.vaadin.data.Container#getItem(java.lang.Object)
	 */
	@SuppressWarnings("unchecked")
	@Override
	public Item getItem(Object itemId) {
		if (itemId == null) {
			return null;
		}

		final Object cacheKey = resolveBeanId(itemId);
		if (cache.containsKey(cacheKey)) {
			return cache.get(cacheKey);
		}
		if (externalCache.containsKey(cacheKey)) {
			return externalCache.get(cacheKey);
		}

		// not loaded so far -> let the service deliver the bean
		final BEANTYPE refreshed = service.refresh((BEANTYPE) itemId);
		if (refreshed == null) {
			return null;
		}
		return addBean(refreshed);
	}

	/**
	 * Returns the ids of all properties of this container.
	 * <p>
	 * The result is a read-only view of the key set of the property model, so
	 * callers cannot remove properties from the model through it.
	 *
	 * @return an unmodifiable view of the property ids
	 *
	 * @see com.vaadin.data.Container#getContainerPropertyIds()
	 */
	@Override
	public Collection<?> getContainerPropertyIds() {
		return Collections.unmodifiableSet(model.keySet());
	}

	/**
	 * Returns the item ids of the first page of this container.
	 * <p>
	 * This method is called directly by com.vaadin.ui.ListSelect without a
	 * previous calculation of the page size. Therefore a fixed page size of
	 * 100 elements is used. If that page is empty, the beans of the cache
	 * and of the external cache are returned instead.
	 *
	 * @return the item ids
	 *
	 * @see com.vaadin.data.Container#getItemIds()
	 */
	@Override
	public Collection<?> getItemIds() {
		@SuppressWarnings("unchecked")
		List<Object> ids = (List<Object>) getItemIds(0, 100);
		if (!ids.isEmpty()) {
			return ids;
		}

		// nothing delivered -> fall back to what has been loaded so far
		for (DeepResolvingBeanItem<BEANTYPE> cached : cache.values()) {
			ids.add(cached.getBean());
		}
		for (DeepResolvingBeanItem<BEANTYPE> external : externalCache.values()) {
			ids.add(external.getBean());
		}
		return ids;
	}

	/**
	 * Returns the beans of all items that are currently held by the cache or
	 * the external cache.
	 * <p>
	 * An item that has been added from outside may be registered in both
	 * caches under the same key. Such an item is returned only once.
	 *
	 * @return a new list with the beans of the loaded items, never
	 *         <code>null</code>
	 */
	@Override
	public List<BEANTYPE> getAlreadyLoadedItemIds() {
		List<BEANTYPE> itemIds = new ArrayList<>(cache.size() + externalCache.size());
		for (DeepResolvingBeanItem<BEANTYPE> cached : cache.values()) {
			itemIds.add(cached.getBean());
		}
		for (Map.Entry<Object, DeepResolvingBeanItem<BEANTYPE>> external : externalCache.entrySet()) {
			// skip entries that were already collected from the cache
			if (!cache.containsKey(external.getKey())) {
				itemIds.add(external.getValue().getBean());
			}
		}
		return itemIds;
	}

	/**
	 * Returns the beans of all items held by the external cache or the
	 * cache, keyed by the resolved bean id. If both caches hold an entry for
	 * the same key, the bean of the cache wins.
	 *
	 * @return a new map from bean id to bean, never <code>null</code>
	 */
	protected Map<Object, BEANTYPE> getAlreadyLoadedItemIdsAsMap() {
		Map<Object, BEANTYPE> beansById = new HashMap<>();
		for (Map.Entry<Object, DeepResolvingBeanItem<BEANTYPE>> external : externalCache.entrySet()) {
			beansById.put(external.getKey(), external.getValue().getBean());
		}
		// entries of the cache are put last and therefore override
		for (Map.Entry<Object, DeepResolvingBeanItem<BEANTYPE>> cached : cache.entrySet()) {
			beansById.put(cached.getKey(), cached.getValue().getBean());
		}
		return beansById;
	}

	/**
	 * Removes the item with the given id from the cache and the external
	 * cache and notifies the item set change listeners. The listeners are
	 * notified whenever an id could be resolved, even if no cache held an
	 * entry for it.
	 *
	 * @param itemId
	 *            the item id (a bean); nothing happens if it is
	 *            <code>null</code>
	 */
	@Override
	public void disposeCacheFor(Object itemId) {
		if (itemId == null) {
			return;
		}
		final Object cacheKey = resolveBeanId(itemId);
		if (cacheKey == null) {
			return;
		}

		cache.remove(cacheKey);
		externalCache.remove(cacheKey);

		// notify the listeners
		fireItemSetChange();
	}

	/**
	 * Returns the property of the item with the given id.
	 *
	 * @param itemId
	 *            the item id
	 * @param propertyId
	 *            the property id
	 * @return the property as delivered by the item, or <code>null</code>
	 *         if {@link #getItem(Object)} finds no item
	 *
	 * @see com.vaadin.data.Container#getContainerProperty(java.lang.Object,
	 *      java.lang.Object)
	 */
	@Override
	public Property<?> getContainerProperty(Object itemId, Object propertyId) {
		final Item owner = getItem(itemId);
		return owner == null ? null : owner.getItemProperty(propertyId);
	}

	/**
	 * Returns the type of the property with the given id.
	 *
	 * @param propertyId
	 *            the property id
	 * @return the property type taken from the property descriptor, or
	 *         <code>null</code> if the model holds no descriptor for the id
	 *
	 * @see com.vaadin.data.Container#getType(java.lang.Object)
	 */
	@Override
	public Class<?> getType(Object propertyId) {
		VaadinPropertyDescriptor<BEANTYPE> descriptor = model.get(propertyId);
		// an unknown property id must not end in a NullPointerException
		return descriptor == null ? null : descriptor.getPropertyType();
	}

	/**
	 * Returns the number of items: the size reported by the service for the
	 * current filters plus the number of entries of the external cache.
	 *
	 * @return the number of items
	 *
	 * @see com.vaadin.data.Container#size()
	 */
	@Override
	public int size() {
		final int sizeOfService = service.size(filters);
		final int sizeOfExternals = externalCache.size();
		return sizeOfService + sizeOfExternals;
	}

	/**
	 * Checks whether the container holds the given item id. An id found in
	 * the cache or the external cache is contained; otherwise the service
	 * decides with respect to the current filters.
	 *
	 * @param itemId
	 *            the item id (a bean)
	 * @return <code>true</code> if the id is cached or the service contains
	 *         the bean
	 *
	 * @see com.vaadin.data.Container#containsId(java.lang.Object)
	 */
	@SuppressWarnings("unchecked")
	@Override
	public boolean containsId(final Object itemId) {
		final Object cacheKey = resolveBeanId(itemId);
		final boolean alreadyLoaded = cache.containsKey(cacheKey) || externalCache.containsKey(cacheKey);
		if (alreadyLoaded) {
			return true;
		}
		return service.contains((BEANTYPE) itemId, filters);
	}

	/**
	 * Adds a bean that comes from outside as a new record. The item is
	 * registered in the external cache.
	 *
	 * @param itemId
	 *            the bean to add
	 * @return the item for the bean
	 *
	 * @see com.vaadin.data.Container#addItem(java.lang.Object)
	 */
	@SuppressWarnings("unchecked")
	@Override
	public Item addItem(Object itemId) throws UnsupportedOperationException {
		// external data -> new record
		final BEANTYPE newBean = (BEANTYPE) itemId;
		return addBean(newBean, true);
	}

	/**
	 * Not supported by this container, since a bean is required to create an
	 * item.
	 *
	 * @return never returns normally
	 * @throws UnsupportedOperationException
	 *             always
	 *
	 * @see com.vaadin.data.Container#addItem()
	 */
	@Override
	public Object addItem() throws UnsupportedOperationException {
		throw new UnsupportedOperationException("Bean required for new item!");
	}

	/**
	 * Not supported by this container.
	 *
	 * @param itemId
	 *            not used
	 * @return never returns normally
	 * @throws UnsupportedOperationException
	 *             always
	 *
	 * @see com.vaadin.data.Container#removeItem(java.lang.Object)
	 */
	@Override
	public boolean removeItem(Object itemId) throws UnsupportedOperationException {
		throw new UnsupportedOperationException("Does not support deleting item!");
	}

	/**
	 * Not supported by this container.
	 *
	 * @param propertyId
	 *            not used
	 * @param type
	 *            not used
	 * @param defaultValue
	 *            not used
	 * @return never returns normally
	 * @throws UnsupportedOperationException
	 *             always
	 *
	 * @see com.vaadin.data.Container#addContainerProperty(java.lang.Object,
	 *      java.lang.Class, java.lang.Object)
	 */
	@Override
	public boolean addContainerProperty(Object propertyId, Class<?> type, Object defaultValue)
			throws UnsupportedOperationException {
		throw new UnsupportedOperationException();
	}

	/**
	 * Not supported by this container.
	 *
	 * @param propertyId
	 *            not used
	 * @return never returns normally
	 * @throws UnsupportedOperationException
	 *             always
	 *
	 * @see com.vaadin.data.Container#removeContainerProperty(java.lang.Object)
	 */
	@Override
	public boolean removeContainerProperty(Object propertyId) throws UnsupportedOperationException {
		throw new UnsupportedOperationException();
	}

	/**
	 * Adds a nested property to the container. A
	 * {@link NestedPropertyDescriptor} is created for the property id and
	 * the bean type and registered as container property.
	 *
	 * @param propertyId
	 *            the id of the nested property
	 * @return <code>true</code> if the property was added
	 *
	 * @see org.eclipse.osbp.runtime.web.vaadin.common.data.INestedPropertyAble#addNestedContainerProperty(java.lang.String)
	 */
	@Override
	@SuppressWarnings({ "rawtypes", "unchecked" })
	public boolean addNestedContainerProperty(String propertyId) {
		final VaadinPropertyDescriptor<BEANTYPE> nestedDescriptor = new NestedPropertyDescriptor(propertyId, type);
		return addContainerProperty(propertyId, nestedDescriptor);
	}

	/**
	 * Registers a property for the container and all its items. The property
	 * also becomes sortable and the property set change listeners are
	 * notified.
	 * 
	 * Primarily for internal use, may change in future versions.
	 *
	 * @param propertyId
	 *            the property id
	 * @param propertyDescriptor
	 *            the property descriptor
	 * @return <code>true</code> if the property was added;
	 *         <code>false</code> if an argument is <code>null</code> or the
	 *         property is already present
	 */
	protected final boolean addContainerProperty(String propertyId,
			VaadinPropertyDescriptor<BEANTYPE> propertyDescriptor) {
		final boolean argumentMissing = propertyId == null || propertyDescriptor == null;
		// a property that is already present is never replaced
		if (argumentMissing || model.containsKey(propertyId)) {
			return false;
		}

		model.put(propertyId, propertyDescriptor);
		sortableProperties.add(propertyId);

		// tell the listeners about the new property
		fireContainerPropertySetChange();
		return true;
	}

	/**
	 * Not supported by this container.
	 *
	 * @return never returns normally
	 * @throws UnsupportedOperationException
	 *             always
	 *
	 * @see com.vaadin.data.Container#removeAllItems()
	 */
	@Override
	public boolean removeAllItems() throws UnsupportedOperationException {
		throw new UnsupportedOperationException("Does not support deleting all items!");
	}

	/**
	 * Does nothing. Value change events are received but not processed by
	 * this container.
	 *
	 * @param event
	 *            the value change event, not used
	 *
	 * @see com.vaadin.data.Property.ValueChangeListener#valueChange(com.vaadin.data.Property.ValueChangeEvent)
	 */
	@Override
	public void valueChange(ValueChangeEvent event) {
		// empty implementation

	}

	/**
	 * Replaces the current sort order by the given one, clears the cache and
	 * notifies the item set change listeners.
	 *
	 * @param propertyIds
	 *            the ids of the properties to sort by; every id is cast to
	 *            {@link String}
	 * @param ascending
	 *            the sort direction for the property id at the same index
	 *
	 * @see com.vaadin.data.Container.Sortable#sort(java.lang.Object[],
	 *      boolean[])
	 */
	@Override
	public void sort(Object[] propertyIds, boolean[] ascending) {
		sortOrder.clear();

		int position = 0;
		for (Object propertyId : propertyIds) {
			sortOrder.add(new SortBy((String) propertyId, ascending[position]));
			position++;
		}

		// cached items belong to the former order
		clearCache();
		fireItemSetChange();
	}

	/**
	 * Returns the ids of the properties that can be used for sorting.
	 * <p>
	 * The internal set is returned, not a copy.
	 *
	 * @return the sortable property ids
	 *
	 * @see com.vaadin.data.Container.Sortable#getSortableContainerPropertyIds()
	 */
	@Override
	public Collection<?> getSortableContainerPropertyIds() {
		return sortableProperties;
	}

	/**
	 * Adds a {@link SimpleStringFilter} built from the given arguments to the
	 * container.
	 *
	 * @param propertyId
	 *            the id of the property to filter
	 * @param filterString
	 *            the string to filter by
	 * @param ignoreCase
	 *            passed on to the string filter
	 * @param onlyMatchPrefix
	 *            passed on to the string filter
	 *
	 * @see com.vaadin.data.Container.SimpleFilterable#addContainerFilter(java.lang.Object,
	 *      java.lang.String, boolean, boolean)
	 */
	@Override
	public void addContainerFilter(Object propertyId, String filterString, boolean ignoreCase,
			boolean onlyMatchPrefix) {
		final SimpleStringFilter stringFilter = new SimpleStringFilter(propertyId, filterString, ignoreCase,
				onlyMatchPrefix);
		addContainerFilter(stringFilter);
	}

	/**
	 * Removes all filters that apply to the given property.
	 * <p>
	 * If at least one filter was removed, the cache is cleared and the item
	 * set change listeners are notified, as the other filter-removing
	 * methods of this container do.
	 *
	 * @param propertyId
	 *            the id of the property whose filters are removed; nothing
	 *            happens if it is <code>null</code>
	 *
	 * @see com.vaadin.data.Container.SimpleFilterable#removeContainerFilters(java.lang.Object)
	 */
	@Override
	public void removeContainerFilters(Object propertyId) {
		if (propertyId == null || getFilters().isEmpty()) {
			return;
		}

		boolean removed = getFilters().removeIf(filter -> filter.appliesToProperty(propertyId));
		if (removed) {
			clearCache();

			// the visible item set changed, so listeners have to be informed
			fireItemSetChange();
		}
	}

	/**
	 * Returns the list of the currently applied filters. The internal list
	 * is returned, not a copy.
	 *
	 * @return the filters
	 */
	protected List<Filter> getFilters() {
		return filters;
	}

	/**
	 * Adds a filter to the container. A filter that is not registered yet is
	 * added and the cache is cleared. The item set change listeners are
	 * notified in any case.
	 *
	 * @param filter
	 *            the filter to add
	 *
	 * @see com.vaadin.data.Container.Filterable#addContainerFilter(com.vaadin.data.Container.Filter)
	 */
	@Override
	public void addContainerFilter(Filter filter) throws UnsupportedFilterException {
		final boolean isNewFilter = !filters.contains(filter);
		if (isNewFilter) {
			filters.add(filter);
			clearCache();
		}

		// listeners are informed even if the filter was already present
		fireItemSetChange();
	}

	/**
	 * Adds a global filter. The filter is registered as global filter and as
	 * applied filter, unless it is already contained in the respective list.
	 * The cache is cleared if the filter was added to the applied filters.
	 * The item set change listeners are notified in any case.
	 *
	 * @param filter
	 *            the global filter to add
	 *
	 * @see org.eclipse.osbp.runtime.web.vaadin.common.data.IGlobalFilterable#addGlobalContainerFilter(com.vaadin.data.Container.Filter)
	 */
	@Override
	public void addGlobalContainerFilter(Filter filter) throws UnsupportedFilterException {
		final boolean knownAsGlobal = globalFilters.contains(filter);
		if (!knownAsGlobal) {
			globalFilters.add(filter);
		}

		final boolean alreadyApplied = filters.contains(filter);
		if (!alreadyApplied) {
			filters.add(filter);
			clearCache();
		}

		fireItemSetChange();
	}
	
	/**
	 * Removes a global filter from the global filters and from the applied
	 * filters. If it was part of the applied filters, the cache is cleared
	 * and the item set change listeners are notified.
	 *
	 * @param filter
	 *            the global filter to remove
	 *
	 * @see org.eclipse.osbp.runtime.web.vaadin.common.data.IGlobalFilterable#removeGlobalContainerFilter(com.vaadin.data.Container.Filter)
	 */
	@Override
	public void removeGlobalContainerFilter(Filter filter) throws UnsupportedFilterException {
		globalFilters.remove(filter);

		final boolean wasApplied = filters.remove(filter);
		if (!wasApplied) {
			return;
		}
		clearCache();
		fireItemSetChange();
	}

	/**
	 * Removes all entries from the cache. The external cache is left
	 * untouched.
	 */
	protected void clearCache() {
		cache.clear();
	}

	/**
	 * Removes a filter from the applied filters. If the filter was
	 * registered, the cache is cleared and the item set change listeners are
	 * notified.
	 *
	 * @param filter
	 *            the filter to remove
	 *
	 * @see com.vaadin.data.Container.Filterable#removeContainerFilter(com.vaadin.data.Container.Filter)
	 */
	@Override
	public void removeContainerFilter(Filter filter) {
		final boolean wasApplied = filters.remove(filter);
		if (!wasApplied) {
			return;
		}
		clearCache();
		fireItemSetChange();
	}

	/**
	 * Resets the applied filters to the global filters and clears the cache.
	 * The item set change listeners are notified only if filters were
	 * applied before the call.
	 *
	 * @see com.vaadin.data.Container.Filterable#removeAllContainerFilters()
	 */
	@Override
	public void removeAllContainerFilters() {
		final boolean notifyListeners = !filters.isEmpty();

		// drop everything, then restore the global filters
		filters.clear();
		filters.addAll(globalFilters);
		clearCache();

		if (notifyListeners) {
			fireItemSetChange();
		}
	}

	/**
	 * Returns the currently applied filters as an unmodifiable view of the
	 * internal filter list.
	 *
	 * @return the applied filters
	 *
	 * @see com.vaadin.data.Container.Filterable#getContainerFilters()
	 */
	@Override
	public Collection<Filter> getContainerFilters() {
		return Collections.unmodifiableList(filters);
	}

	/**
	 * Notifies the item set change listeners. Neither the filters nor the
	 * caches are modified by this method.
	 *
	 * @see org.eclipse.osbp.runtime.web.vaadin.common.data.ILazyRefreshFilterable#refreshFilters()
	 */
	@Override
	public void refreshFilters() {
		fireItemSetChange();
	}

	/**
	 * Asks the service for the index of the given bean with respect to the
	 * current filters and sort order.
	 *
	 * @param itemId
	 *            the item id (a bean)
	 * @return the index as delivered by the service
	 *
	 * @see com.vaadin.data.Container.Indexed#indexOfId(java.lang.Object)
	 */
	@SuppressWarnings("unchecked")
	@Override
	public int indexOfId(final Object itemId) {
		final BEANTYPE bean = (BEANTYPE) itemId;
		return service.indexOf(bean, filters, sortOrder);
	}

	/**
	 * Asks the service for the bean at the given index with respect to the
	 * current filters and sort order. If an item for the bean is cached, the
	 * cached bean instance is returned. Otherwise the delivered bean is
	 * added to the container and returned.
	 *
	 * @param index
	 *            the index of the bean
	 * @return the bean at the index
	 *
	 * @see com.vaadin.data.Container.Indexed#getIdByIndex(int)
	 */
	@Override
	public Object getIdByIndex(final int index) {
		final BEANTYPE delivered = (BEANTYPE) service.getBeanByIndex(index, filters, sortOrder);
		if (!isCached(delivered)) {
			// first contact with this bean -> create and cache its item
			addBean(delivered);
			return delivered;
		}
		return mapToCachedId(delivered);
	}

	/**
	 * Maps an item id to the bean instance that is held by the caches. The
	 * cache is consulted before the external cache.<br>
	 * This method will NOT add the itemId to the cache.
	 *
	 * @param itemId
	 *            the item id (a bean)
	 * @return the cached bean instance; the passed item id if it is not
	 *         cached
	 */
	private BEANTYPE mapToCachedId(BEANTYPE itemId) {
		if (isCached(itemId)) {
			final Object cacheKey = resolveBeanId(itemId);
			if (cache.containsKey(cacheKey)) {
				return cache.get(cacheKey).getBean();
			}
			if (externalCache.containsKey(cacheKey)) {
				return externalCache.get(cacheKey).getBean();
			}
			return null;
		}
		return itemId;
	}

	/**
	 * Checks whether the cache or the external cache holds an entry for the
	 * id of the given bean.
	 *
	 * @param bean
	 *            the bean
	 * @return <code>true</code> if one of the caches contains the resolved
	 *         bean id
	 */
	protected boolean isCached(Object bean) {
		final Object cacheKey = resolveBeanId(bean);
		if (cache.containsKey(cacheKey)) {
			return true;
		}
		return externalCache.containsKey(cacheKey);
	}

	/**
	 * Adds a bean to the container without registering it in the external
	 * cache. Delegates to {@link #addBean(Object, boolean)} with
	 * <code>false</code>.
	 *
	 * @param result
	 *            the bean to add
	 * @return the item for the bean
	 */
	protected DeepResolvingBeanItem<BEANTYPE> addBean(BEANTYPE result) {
		return addBean(result, false);
	}

	/**
	 * Adds a bean to the container and uses the bean itself as item id.
	 * Delegates to {@link #addBean(Object, Object, boolean)}.
	 *
	 * @param result
	 *            the bean to add; also passed as item id
	 * @param toExternalCache
	 *            <code>true</code> if the item should be registered in the
	 *            external cache
	 * @return the item for the bean
	 */
	protected DeepResolvingBeanItem<BEANTYPE> addBean(BEANTYPE result, boolean toExternalCache) {
		return addBean(result, result, toExternalCache);
	}

	/**
	 * Adds a bean to the container.
	 * <p>
	 * If the cache or the external cache already holds an item for the id
	 * resolved from <code>itemId</code>, that item is returned and nothing
	 * is added. Otherwise a new item is created for the bean and stored in
	 * the cache and, on request, in the external cache.
	 * <p>
	 * If no item can be created (the bean is <code>null</code>), nothing is
	 * stored. This keeps <code>null</code> values out of both caches, whose
	 * values are dereferenced elsewhere without a check.
	 *
	 * @param itemId
	 *            the item id the cache key is resolved from
	 * @param result
	 *            the bean to create the item for
	 * @param toExternalCache
	 *            <code>true</code> if the item should also be registered in
	 *            the external cache
	 * @return the already cached or the newly created item;
	 *         <code>null</code> if no item could be created
	 */
	protected DeepResolvingBeanItem<BEANTYPE> addBean(Object itemId, BEANTYPE result, boolean toExternalCache) {

		Object key = resolveBeanId(itemId);
		if (cache.containsKey(key)) {
			return cache.get(key);
		}

		if (externalCache.containsKey(key)) {
			return externalCache.get(key);
		}

		DeepResolvingBeanItem<BEANTYPE> item = createBeanItem(result);
		if (item == null) {
			// never register a null item under any key
			return null;
		}

		if (toExternalCache) {
			externalCache.put(key, item);
		}

		return internalAddItemAtEnd(itemId, item);
	}

	/**
	 * Returns the item ids of a page.
	 * <p>
	 * The result starts with the beans of all items of the external cache,
	 * followed by the beans the service delivers for the requested range
	 * with respect to the current filters and sort order. A delivered bean
	 * that is already cached is replaced by the cached instance; any other
	 * delivered bean is added to the container.
	 *
	 * @param startIndex
	 *            the index of the first bean requested from the service
	 * @param numberOfItems
	 *            the number of beans requested from the service
	 * @return a new list with the item ids
	 *
	 * @see com.vaadin.data.Container.Indexed#getItemIds(int, int)
	 */
	@Override
	public List<BEANTYPE> getItemIds(final int startIndex, final int numberOfItems) {

		// parameterized logging: the message is only built if debug is enabled
		LOGGER.debug("Fetching item ids: StartIndex:{} NumberOfItems:{}", startIndex, numberOfItems);

		List<BEANTYPE> beans = new ArrayList<BEANTYPE>();
		for (DeepResolvingBeanItem<BEANTYPE> external : externalCache.values()) {
			beans.add(external.getBean());
		}

		// map the results to cached instances
		for (BEANTYPE bean : service.getBeansByIndex(startIndex, numberOfItems, filters, sortOrder)) {
			if (isCached(bean)) {
				beans.add(mapToCachedId(bean));
			} else {
				addBean(bean);
				beans.add(bean);
			}
		}
		return beans;
	}

	/**
	 * Not supported by this container.
	 *
	 * @param index
	 *            not used
	 * @return never returns normally
	 * @throws UnsupportedOperationException
	 *             always
	 *
	 * @see com.vaadin.data.Container.Indexed#addItemAt(int)
	 */
	@Override
	public Object addItemAt(int index) throws UnsupportedOperationException {
		throw new UnsupportedOperationException();
	}

	/**
	 * Adds a bean that comes from outside as a new record. The item is
	 * registered in the external cache; the index is not evaluated.
	 *
	 * @param index
	 *            ignored by this implementation
	 * @param newItemId
	 *            the bean to add
	 * @return the item for the bean
	 *
	 * @see com.vaadin.data.Container.Indexed#addItemAt(int, java.lang.Object)
	 */
	@SuppressWarnings("unchecked")
	@Override
	public Item addItemAt(int index, Object newItemId) throws UnsupportedOperationException {
		// external data -> new record
		final BEANTYPE newBean = (BEANTYPE) newItemId;
		return addBean(newBean, true);
	}

	/**
	 * Removes all entries from the cache and from the external cache. No
	 * event is fired by this method.
	 *
	 * @see org.eclipse.osbp.runtime.web.vaadin.common.data.IClearable#clear()
	 */
	@Override
	public void clear() {
		cache.clear();
		externalCache.clear();
	}

	/**
	 * Asks the bean id resolver for the identifier of a bean.
	 *
	 * @param bean
	 *            the bean; cast to the bean type without a check
	 * @return the identifier delivered by the resolver
	 * @throws IllegalStateException
	 *             if no bean id resolver is set
	 */
	@SuppressWarnings("unchecked")
	protected Object resolveBeanId(Object bean) {
		if (beanIdResolver != null) {
			return beanIdResolver.getIdForBean((BEANTYPE) bean);
		}
		throw new IllegalStateException("Bean item identifier resolver is required.");
	}

	/**
	 * Resolver that maps beans to their (item) identifiers, removing the need
	 * to explicitly specify item identifiers when there is no need to
	 * customize this.
	 *
	 * @param <BEANTYPE>
	 *            the type of the beans whose identifiers are resolved
	 *
	 * @since 6.5
	 */
	public interface BeanIdResolver<BEANTYPE> {
		/**
		 * Returns the item identifier for a bean.
		 *
		 * @param bean
		 *            the bean
		 * @return the identifier of the bean
		 */
		public Object getIdForBean(BEANTYPE bean);
	}

	/**
	 * Resolver that reads the identifier from the bean itself: from
	 * {@link IDatamartContainer#getIdValue()} if the bean is a datamart
	 * container, from {@link DtoUtils#getIdValue} otherwise.
	 *
	 * @param <BEANTYPE>
	 *            the type of the beans whose identifiers are resolved
	 */
	public static class DtoIdResolver<BEANTYPE> implements BeanIdResolver<BEANTYPE> {

		/**
		 * Returns the identifier of the bean.
		 *
		 * @param bean
		 *            the bean, may be <code>null</code>
		 * @return the identifier; the empty string if the bean is
		 *         <code>null</code>
		 */
		@Override
		public Object getIdForBean(BEANTYPE bean) {
			if (bean == null) {
				return "";
			}
			if (!(bean instanceof IDatamartContainer)) {
				return DtoUtils.getIdValue(bean);
			}
			return ((IDatamartContainer) bean).getIdValue();
		}
	}
}
