blob: cdb89fc96aecccbf1486b21227b8ea4f6ae0d78a [file] [log] [blame]
/***********************************************************************************************************************
* Copyright (c) 2008 empolis GmbH and brox IT Solutions GmbH. All rights reserved. This program and the accompanying
* materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Daniel Stucky (empolis GmbH) - initial API and implementation
**********************************************************************************************************************/
package org.eclipse.smila.connectivity.framework;
import java.util.Date;
import java.util.Dictionary;
import java.util.Iterator;
import org.eclipse.smila.connectivity.framework.performancecounters.AgentPerformanceCounterHelper;
import org.eclipse.smila.connectivity.framework.performancecounters.ConnectivityPerformanceAgent;
import org.eclipse.smila.connectivity.framework.schema.config.DataSourceConnectionConfig;
import org.eclipse.smila.connectivity.framework.util.AgentControllerCallback;
import org.eclipse.smila.connectivity.framework.util.AgentThreadState;
import org.eclipse.smila.datamodel.Record;
import org.osgi.service.component.ComponentContext;
/**
* Abstract base class for Agent implementations.
*/
public abstract class AbstractAgent implements Agent {
/** The logger. */
protected org.apache.commons.logging.Log _log = org.apache.commons.logging.LogFactory.getLog(this.getClass());
/**
* The _counter helper.
*/
protected AgentPerformanceCounterHelper<ConnectivityPerformanceAgent> _performanceCounters;
/**
* The agent ID.
*/
private String _agentId;
/**
* Reference to the AgentControllerCallback.
*/
private AgentControllerCallback _controllerCallback;
/**
* Reference to the DataSourceConnectionConfig.
*/
private DataSourceConnectionConfig _config;
/**
* The delta indexing session.
*/
private String _sessionId;
/**
* Flag if the agent thread should be / was stopped.
*/
private boolean _stopThread;
/**
* Reference to the Agents thread.
*/
private Thread _agentThread;
/**
* The state of the agent.
*/
private AgentState _agentState;
/**
* Default Constructor.
*/
public AbstractAgent() {
}
/**
* Returns the Agent Id, which is the OSGi DecarativeService Component Name.
*
* @return the AgentId
* @throws AgentException
* if any error occurs
*/
@Override
public String getAgentId() throws AgentException {
return _agentId;
}
/**
* Gets the AgentState.
*
* @return the AgentState
*/
public AgentState getAgentState() {
return _agentState;
}
/**
* {@inheritDoc}
*
* @see org.eclipse.smila.connectivity.framework.Agent#start(AgentControllerCallback, AgentState,
* DataSourceConnectionConfig, String)
*/
@Override
public void start(final AgentControllerCallback controllerCallback, final AgentState agentState,
final DataSourceConnectionConfig config, final String sessionId) throws AgentException {
if (controllerCallback == null) {
throw new AgentException("parameter controllerCallback is null");
}
if (config == null) {
throw new AgentException("parameter config is null");
}
_stopThread = false;
_controllerCallback = controllerCallback;
_config = config;
_sessionId = sessionId;
// initialize the AgentState
_agentState = agentState;
_agentState.setDataSourceId(config.getDataSourceID());
_agentState.setState(AgentThreadState.Running);
_agentState.setStartTime(System.currentTimeMillis());
// initialize the agent (depends on implementation)
initialize();
initializePerformanceCounterHelper();
getCounterHelper().setImportRunId(_agentState.getImportRunId());
getCounterHelper().setStartDate(new Date(_agentState.getStartTime()));
// initialize the Agent thread
_agentThread = new Thread(this, "Agent:" + getConfig().getDataSourceID());
_agentThread.start();
}
/**
* @return the AgentPerformanceCounterHelper
*/
@Override
public AgentPerformanceCounterHelper<ConnectivityPerformanceAgent> getCounterHelper() {
return _performanceCounters;
}
/**
* creates a new AgentPerformanceCounterHelper instance.
*/
protected void initializePerformanceCounterHelper() {
_performanceCounters =
new AgentPerformanceCounterHelper<ConnectivityPerformanceAgent>(_config, hashCode(),
ConnectivityPerformanceAgent.class);
}
/**
* {@inheritDoc}
*
* @see org.eclipse.smila.connectivity.framework.Agent#stop()
*/
@Override
public void stop() throws AgentException {
stopThread();
_agentState.setState(AgentThreadState.Stopped);
}
/**
* Returns the value of the flag _stopThread.
*
* @return true if the thread is stopped, false otherwise
*/
protected boolean isStopThread() {
return _stopThread;
}
/**
* Stops the agent thread.
*/
protected void stopThread() {
_agentState.setEndTime(System.currentTimeMillis());
getCounterHelper().setEndDate(new Date(_agentState.getEndTime()));
_stopThread = true;
_agentThread = null;
_controllerCallback.unregister(_sessionId, _config.getDeltaIndexing(), _config.getDataSourceID());
}
/**
* Gets the AgentControllerCallback.
*
* @return the AgentControllerCallback
*/
protected AgentControllerCallback getControllerCallback() {
return _controllerCallback;
}
/**
* Gets the DataSourceConnectionConfig.
*
* @return the DataSourceConnectionConfig
*/
protected DataSourceConnectionConfig getConfig() {
return _config;
}
/**
* Returns the delta indexing session id.
*
* @return the delta indexing session id
*/
protected String getSessionId() {
return _sessionId;
}
/**
* Activate the component.
*
* @param context
* the ComponentContext
*/
@SuppressWarnings("unchecked")
protected void activate(final ComponentContext context) {
final Dictionary<String, String> dictionary = context.getProperties();
_agentId = dictionary.get("component.name");
}
/**
* Deactivate the component.
*
* @param context
* the ComponentContext
*/
protected void deactivate(final ComponentContext context) {
_agentId = null;
}
/**
* Method to contain initialization of the agent. Must be implemented by subclasses. This method is called just before
* the AgentThread is started.
*
* @throws AgentException
* if any error occurs
*/
protected abstract void initialize() throws AgentException;
/**
* Gets the attachments bytes length.
*
* @param record
* the record
*
* @return the attachments size
*/
protected long getAttachmentsByteLength(final Record record) {
if (!record.hasAttachments()) {
return 0;
}
long size = 0;
final Iterator<String> attachmentNames = record.getAttachmentNames();
while (attachmentNames.hasNext()) {
final String attachmentName = attachmentNames.next();
final byte[] attachment = record.getAttachmentAsBytes(attachmentName);
if (attachment != null) {
size += attachment.length;
}
}
return size;
}
}