blob: 0c329d55fe31fb43e09fd2006abae2b06da007d3 [file] [log] [blame]
/*
* Copyright (c) 2009, 2011, 2012, 2015, 2016, 2019 Eike Stepper (Loehne, Germany) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eike Stepper - initial API and implementation
*/
package org.eclipse.emf.internal.cdo.session.remote;
import org.eclipse.emf.cdo.session.remote.CDORemoteSession;
import org.eclipse.emf.cdo.session.remote.CDORemoteSessionEvent;
import org.eclipse.emf.cdo.session.remote.CDORemoteSessionManager;
import org.eclipse.emf.cdo.session.remote.CDORemoteSessionMessage;
import org.eclipse.net4j.util.collection.ArrayIterator;
import org.eclipse.net4j.util.container.Container;
import org.eclipse.net4j.util.container.ContainerEvent;
import org.eclipse.net4j.util.container.IContainerDelta;
import org.eclipse.net4j.util.event.Event;
import org.eclipse.net4j.util.event.IEvent;
import org.eclipse.emf.spi.cdo.InternalCDORemoteSession;
import org.eclipse.emf.spi.cdo.InternalCDORemoteSessionManager;
import org.eclipse.emf.spi.cdo.InternalCDOSession;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author Eike Stepper
*/
public class CDORemoteSessionManagerImpl extends Container<CDORemoteSession> implements InternalCDORemoteSessionManager
{
private static final CDORemoteSession[] NO_REMOTE_SESSIONS = {};
private InternalCDOSession localSession;
private boolean forceSubscription;
private boolean subscribed;
private Map<Integer, CDORemoteSession> remoteSessions = new HashMap<>();
public CDORemoteSessionManagerImpl()
{
}
@Override
public InternalCDOSession getLocalSession()
{
return localSession;
}
@Override
public void setLocalSession(InternalCDOSession localSession)
{
this.localSession = localSession;
}
@Override
public CDORemoteSession[] getRemoteSessions()
{
synchronized (this)
{
if (localSession.isActive())
{
if (subscribed)
{
Collection<CDORemoteSession> values = remoteSessions.values();
return values.toArray(new CDORemoteSession[values.size()]);
}
List<CDORemoteSession> loadedRemoteSessions = localSession.getSessionProtocol().getRemoteSessions(this, false);
return loadedRemoteSessions.toArray(new CDORemoteSession[loadedRemoteSessions.size()]);
}
return NO_REMOTE_SESSIONS;
}
}
@Override
public CDORemoteSession[] getElements()
{
return getRemoteSessions();
}
@Override
public boolean isSubscribed()
{
synchronized (this)
{
return subscribed;
}
}
@Override
public boolean isForceSubscription()
{
synchronized (this)
{
return forceSubscription;
}
}
@Override
public void setForceSubscription(boolean forceSubscription)
{
IEvent[] events = null;
synchronized (this)
{
this.forceSubscription = forceSubscription;
if (forceSubscription)
{
if (!subscribed)
{
events = subscribe();
}
}
else
{
if (!hasListeners())
{
events = unsubscribe();
}
}
}
fireEvents(events);
}
@Override
public Set<CDORemoteSession> sendMessage(CDORemoteSessionMessage message, CDORemoteSession... recipients)
{
return sendMessage(message, new ArrayIterator<>(recipients));
}
@Override
public Set<CDORemoteSession> sendMessage(CDORemoteSessionMessage message, Collection<CDORemoteSession> recipients)
{
return sendMessage(message, recipients.iterator());
}
private Set<CDORemoteSession> sendMessage(CDORemoteSessionMessage message, Iterator<CDORemoteSession> recipients)
{
List<CDORemoteSession> subscribed = new ArrayList<>();
while (recipients.hasNext())
{
CDORemoteSession recipient = recipients.next();
if (recipient.isSubscribed())
{
subscribed.add(recipient);
}
}
if (subscribed.isEmpty())
{
return Collections.emptySet();
}
Set<Integer> sessionIDs = localSession.getSessionProtocol().sendRemoteMessage(message, subscribed);
Set<CDORemoteSession> result = new HashSet<>();
for (CDORemoteSession recipient : subscribed)
{
if (sessionIDs.contains(recipient.getSessionID()))
{
result.add(recipient);
}
}
return result;
}
@Override
public InternalCDORemoteSession createRemoteSession(int sessionID, String userID, boolean subscribed)
{
InternalCDORemoteSession remoteSession = new CDORemoteSessionImpl(this, sessionID, userID);
remoteSession.setSubscribed(subscribed);
return remoteSession;
}
@Override
public void handleRemoteSessionOpened(int sessionID, String userID)
{
CDORemoteSession remoteSession = createRemoteSession(sessionID, userID, false);
synchronized (this)
{
remoteSessions.put(sessionID, remoteSession);
}
fireElementAddedEvent(remoteSession);
}
@Override
public void handleRemoteSessionClosed(int sessionID)
{
CDORemoteSession remoteSession = null;
synchronized (this)
{
remoteSession = remoteSessions.remove(sessionID);
}
if (remoteSession != null)
{
fireElementRemovedEvent(remoteSession);
}
}
@Override
public void handleRemoteSessionSubscribed(int sessionID, boolean subscribed)
{
IEvent event = null;
synchronized (this)
{
InternalCDORemoteSession remoteSession = (InternalCDORemoteSession)remoteSessions.get(sessionID);
if (remoteSession != null)
{
remoteSession.setSubscribed(subscribed);
event = new SubscriptionChangedEventImpl(remoteSession, subscribed);
}
}
if (event != null)
{
fireEvent(event);
}
}
@Override
public void handleRemoteSessionMessage(int sessionID, final CDORemoteSessionMessage message)
{
IEvent event = null;
synchronized (this)
{
final CDORemoteSessionManager source = this;
final InternalCDORemoteSession remoteSession = (InternalCDORemoteSession)remoteSessions.get(sessionID);
if (remoteSession != null)
{
event = new CDORemoteSessionEvent.MessageReceived()
{
@Override
public CDORemoteSessionManager getSource()
{
return source;
}
@Override
public CDORemoteSession getRemoteSession()
{
return remoteSession;
}
@Override
public CDORemoteSessionMessage getMessage()
{
return message;
}
};
}
}
if (event != null)
{
fireEvent(event);
}
}
@Override
protected void firstListenerAdded()
{
IEvent[] events = null;
synchronized (this)
{
if (!subscribed)
{
events = subscribe();
}
}
fireEvents(events);
}
@Override
protected void lastListenerRemoved()
{
IEvent[] events = null;
synchronized (this)
{
if (!forceSubscription)
{
events = unsubscribe();
}
}
fireEvents(events);
}
/**
* Needs to be synchronized externally.
*/
private IEvent[] subscribe()
{
List<CDORemoteSession> result = localSession.getSessionProtocol().getRemoteSessions(this, true);
ContainerEvent<CDORemoteSession> event = new ContainerEvent<>(this);
for (CDORemoteSession remoteSession : result)
{
remoteSessions.put(remoteSession.getSessionID(), remoteSession);
event.addDelta(remoteSession, IContainerDelta.Kind.ADDED);
}
subscribed = true;
IEvent[] events = { new LocalSubscriptionChangedEventImpl(true), event.isEmpty() ? null : event };
return events;
}
/**
* Needs to be synchronized externally.
*/
private IEvent[] unsubscribe()
{
if (localSession.isActive())
{
localSession.getSessionProtocol().unsubscribeRemoteSessions();
}
ContainerEvent<CDORemoteSession> event = new ContainerEvent<>(this);
for (CDORemoteSession remoteSession : remoteSessions.values())
{
event.addDelta(remoteSession, IContainerDelta.Kind.REMOVED);
}
remoteSessions.clear();
subscribed = false;
IEvent[] events = { new LocalSubscriptionChangedEventImpl(false), event.isEmpty() ? null : event };
return events;
}
private void fireEvents(IEvent[] events)
{
if (events != null)
{
for (int i = 0; i < events.length; i++)
{
IEvent event = events[i];
if (event != null)
{
fireEvent(event);
}
}
}
}
/**
* @author Eike Stepper
*/
private final class LocalSubscriptionChangedEventImpl extends Event implements LocalSubscriptionChangedEvent
{
private static final long serialVersionUID = 1L;
private boolean subscribed;
public LocalSubscriptionChangedEventImpl(boolean subscribed)
{
super(CDORemoteSessionManagerImpl.this);
this.subscribed = subscribed;
}
@Override
public CDORemoteSessionManager getSource()
{
return (CDORemoteSessionManager)super.getSource();
}
@Override
public boolean isSubscribed()
{
return subscribed;
}
}
/**
* @author Eike Stepper
*/
private final class SubscriptionChangedEventImpl extends Event implements CDORemoteSessionEvent.SubscriptionChanged
{
private static final long serialVersionUID = 1L;
private InternalCDORemoteSession remoteSession;
private boolean subscribed;
public SubscriptionChangedEventImpl(InternalCDORemoteSession remoteSession, boolean subscribed)
{
super(CDORemoteSessionManagerImpl.this);
this.remoteSession = remoteSession;
this.subscribed = subscribed;
}
@Override
public CDORemoteSessionManager getSource()
{
return (CDORemoteSessionManager)super.getSource();
}
@Override
public CDORemoteSession getRemoteSession()
{
return remoteSession;
}
@Override
public boolean isSubscribed()
{
return subscribed;
}
}
}