blob: 92b132ccf6f69dba119143526d0011e458a4e576 [file] [log] [blame]
/****************************************************************************
* Copyright (c) 2009 Composent, Inc. and others.
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*****************************************************************************/
package org.eclipse.ecf.internal.examples.loadbalancing.consumer;
import org.eclipse.ecf.core.IContainer;
import org.eclipse.ecf.core.IContainerManager;
import org.eclipse.ecf.core.identity.IDFactory;
import org.eclipse.ecf.core.util.ECFException;
import org.eclipse.ecf.examples.loadbalancing.IDataProcessor;
import org.eclipse.ecf.remoteservice.IRemoteService;
import org.eclipse.ecf.remoteservice.IRemoteServiceContainerAdapter;
import org.eclipse.ecf.remoteservice.IRemoteServiceReference;
import org.eclipse.ecf.remoteservice.util.tracker.IRemoteServiceTrackerCustomizer;
import org.eclipse.ecf.remoteservice.util.tracker.RemoteServiceTracker;
import org.eclipse.equinox.app.IApplication;
import org.eclipse.equinox.app.IApplicationContext;
import org.osgi.framework.BundleContext;
import org.osgi.util.tracker.ServiceTracker;
public class DataProcessorConsumerApplication implements IApplication {
private static final String LB_SVCCONSUMER_CONTAINER_TYPE = "ecf.jms.activemq.tcp.client";
private static final String DEFAULT_TOPIC_ID = "tcp://localhost:61616/exampleTopic";
private static final String DEFAULT_INPUT_DATA = "hello there";
private BundleContext bundleContext;
private ServiceTracker containerManagerServiceTracker;
// JMS topic URI that we will connect to in order to lookup/get/use the
// data processor remote service. Note that this topicId can be
// changed by using the -topicId launch parameter...e.g.
// -topicId tcp://myjmdnsbrokerdnsname:61616/myTopicName
private String topicId = DEFAULT_TOPIC_ID;
// Container type is the load balancing service consumer container type,
// which is normal client
private String containerType = LB_SVCCONSUMER_CONTAINER_TYPE;
// Container instance that connects us with the ActiveMQ queue as a message
// producer and publishes the service on the topicId
private IContainer container;
private IRemoteServiceContainerAdapter remoteServiceAdapter;
// Input data that is passed to the data processor
private String inputData = DEFAULT_INPUT_DATA;
// Lock and flag for synchronization
private final Object remoteServiceReceivedLock = new Object();
private boolean remoteServiceReceived = false;
// Remote service. The RemoteServiceTrackerCustomizer sets this
IRemoteService remoteService;
class RemoteServiceTrackerCustomizer implements
IRemoteServiceTrackerCustomizer {
public IRemoteService addingService(IRemoteServiceReference reference) {
remoteService = remoteServiceAdapter.getRemoteService(reference);
try {
IDataProcessor dataProcessorProxy = (IDataProcessor) remoteService
.getProxy();
System.out.println("Calling remote service with input data="
+ inputData);
// And then call it
String result = dataProcessorProxy.processData(inputData);
// And print out results
System.out.println("\tremote service result=" + result);
} catch (ECFException e) {
e.printStackTrace();
}
synchronized (remoteServiceReceivedLock) {
remoteServiceReceived = true;
remoteServiceReceivedLock.notify();
}
return remoteService;
}
public void modifiedService(IRemoteServiceReference reference,
IRemoteService remoteService) {
}
public void removedService(IRemoteServiceReference reference,
IRemoteService remoteService) {
}
}
public Object start(IApplicationContext appContext) throws Exception {
bundleContext = Activator.getContext();
// Process Arguments...i.e. set queueId and topicId if specified
processArgs(appContext);
// Create container of appropriate type
container = getContainerManagerService().getContainerFactory()
.createContainer(containerType);
// Get appropriate adapter
remoteServiceAdapter = (IRemoteServiceContainerAdapter) container
.getAdapter(IRemoteServiceContainerAdapter.class);
// Create remote service tracker, and then open it
RemoteServiceTracker tracker = new RemoteServiceTracker(
remoteServiceAdapter, null, IDataProcessor.class.getName(),
new RemoteServiceTrackerCustomizer());
// Open it
tracker.open();
// Connect to topic. This should trigger remote service registration to be asynchronously
// added, and in turn call RemoteServiceTrackerCustomizer.addingService (see impl of that
// method above
container.connect(
IDFactory.getDefault().createID(
container.getConnectNamespace(), topicId), null);
// Wait for remote service tracker to receive proxy and execute. See
// RemoteServiceTrackerCustomizer.addingService above
waitForRemoteService();
return IApplication.EXIT_OK;
}
public void stop() {
if (container != null) {
container.dispose();
container = null;
getContainerManagerService().removeAllContainers();
}
if (containerManagerServiceTracker != null) {
containerManagerServiceTracker.close();
containerManagerServiceTracker = null;
}
synchronized (remoteServiceReceivedLock) {
remoteServiceReceived = true;
remoteServiceReceivedLock.notifyAll();
}
bundleContext = null;
}
private void processArgs(IApplicationContext appContext) {
String[] originalArgs = (String[]) appContext.getArguments().get(
"application.args");
if (originalArgs == null)
return;
for (int i = 0; i < originalArgs.length; i++) {
if (originalArgs[i].equals("-topicId")) {
topicId = originalArgs[i + 1];
i++;
} else if (originalArgs[i].equals("-inputData")) {
StringBuffer buf = new StringBuffer();
for (int j = i + 1; j < originalArgs.length; j++) {
buf.append(originalArgs[j]).append(" ");
}
inputData = buf.toString();
return;
} else if (originalArgs[i].equals("-containerType")) {
containerType = originalArgs[i + 1];
i++;
}
}
}
private IContainerManager getContainerManagerService() {
if (containerManagerServiceTracker == null) {
containerManagerServiceTracker = new ServiceTracker(bundleContext,
IContainerManager.class.getName(), null);
containerManagerServiceTracker.open();
}
return (IContainerManager) containerManagerServiceTracker.getService();
}
private void waitForRemoteService() {
// then just wait here
synchronized (remoteServiceReceivedLock) {
while (!remoteServiceReceived) {
try {
remoteServiceReceivedLock.wait();
} catch (InterruptedException e) {
// do nothing
}
}
}
}
}