blob: a9255f8b4e43308f89ec4fd9ea056b7a02e53551 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Daniel Stucky (empolis GmbH) - initial API and implementation
**********************************************************************************************************************/
package org.eclipse.smila.connectivity.framework.compound.zip;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.connectivity.ConnectivityId;
import org.eclipse.smila.connectivity.framework.CrawlerCallback;
import org.eclipse.smila.connectivity.framework.CrawlerCriticalException;
import org.eclipse.smila.connectivity.framework.CrawlerException;
import org.eclipse.smila.connectivity.framework.DataReference;
import org.eclipse.smila.connectivity.framework.compound.AbstractCompoundCrawler;
import org.eclipse.smila.connectivity.framework.performancecounters.ConnectivityPerformanceAgent;
import org.eclipse.smila.connectivity.framework.performancecounters.CrawlerPerformanceCounterHelper;
import org.eclipse.smila.connectivity.framework.schema.config.CompoundHandling;
import org.eclipse.smila.connectivity.framework.schema.config.DataSourceConnectionConfig;
import org.eclipse.smila.connectivity.framework.schema.config.ElementAttributeType;
import org.eclipse.smila.connectivity.framework.util.ConnectivityHashFactory;
import org.eclipse.smila.connectivity.framework.util.DataReferenceFactory;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.AnySeq;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.utils.file.EncodingHelper;
import org.eclipse.smila.utils.workspace.WorkspaceHelper;
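/**
* Compound crawler for zip files: the content attachment of the compound record is written to a temporary file,
* opened as a zip file, and one DataReference is produced for each non-directory zip entry.
*/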
public class ZipCompoundCrawler extends AbstractCompoundCrawler {
/** Name of the metadata element that receives the sequence of parent compound paths. */
public static final String COMPOUND_METADATA_ELEMENT = "_compounds";

/** Bundle id, passed to the WorkspaceHelper to obtain the working directory for the temporary zip file. */
private static final String BUNDLE_ID = "org.eclipse.smila.connectivity.framework.compound.zip";

/** Timeout in milliseconds for timed operations on the internal queue. */
private static final int QUEUE_POLL_WAITING = 300;

/** Sleep interval in milliseconds while hasNext() waits for the producer thread to deliver or finish. */
private static final int HAS_NEXT_WAITING = 50;

/** Capacity of the internal DataReference queue. */
private static final int CAPACITY = 100;

/** Maximum number of DataReferences returned by a single getNext() call. */
private static final int STEP = 10;

/** The temporary zip file the compound content was written to; null while not opened or after close(). */
private ZipFile _zipFile;

/** Queue through which the producer thread hands DataReferences over to getNext(). */
private ArrayBlockingQueue<DataReference> _queue;

/**
 * The thread that fills the queue. The thread itself resets this field to null when it finishes, and hasNext()
 * polls it from the consuming thread; it is therefore volatile so that the consumer reliably observes the reset.
 */
private volatile ZipEntryProducerThread _producerThread;

/** Mapping from the id of a produced DataReference to the ZipEntry it was created from. */
private Map<ConnectivityId, ZipEntry> _entryMap;

/** All configured compound attributes (plain attributes as well as attachments). */
private CompoundHandling.CompoundAttribute[] _compoundAttributes;

/** The names of those configured compound attributes that are flagged as attachments. */
private String[] _attachmentNames;

/** Flag if initialize() was called successfully (reset by close()). */
private boolean _initialized;

/** The logger of this instance. */
private final Log _log = LogFactory.getLog(this.getClass());
/**
* Default constructor. Takes no arguments and only invokes the superclass constructor; all state is set up later in
* initialize().
*/
public ZipCompoundCrawler() {
super();
}
/**
 * {@inheritDoc}
 *
 * Reads the configured compound attributes, writes the content attachment of the compound record to a temporary
 * file in the bundle's working directory, opens that file as zip file and starts the producer thread that fills
 * the internal queue.
 *
 * @param config
 *          the data source configuration, must not be null
 * @throws CrawlerCriticalException
 *           if config is null, no compound record was set, or any error occurs during set up
 */
@Override
public void initialize(final DataSourceConnectionConfig config) throws CrawlerException, CrawlerCriticalException {
  if (config == null) {
    throw new CrawlerCriticalException("parameter config is null");
  }
  final Record record = getCompoundRecord();
  if (record == null) {
    throw new CrawlerCriticalException("the compound record was not set");
  }
  // declared outside the try block so that the error path can remove the file again
  File file = null;
  try {
    // get configured compound attributes and attachments
    final CompoundHandling.CompoundAttributes attributes = config.getCompoundHandling().getCompoundAttributes();
    final List<CompoundHandling.CompoundAttribute> attrs = attributes.getCompoundAttributes();
    _compoundAttributes = attrs.toArray(new CompoundHandling.CompoundAttribute[attrs.size()]);
    final List<String> attachmentsNames = new ArrayList<String>();
    for (final CompoundHandling.CompoundAttribute a : _compoundAttributes) {
      if (a.isAttachment()) {
        attachmentsNames.add(a.getName());
      }
    }
    _attachmentNames = attachmentsNames.toArray(new String[attachmentsNames.size()]);

    // get the content of the compound object
    final String contentAttachmentName = config.getCompoundHandling().getContentAttachment();
    byte[] content = record.getAttachmentAsBytes(contentAttachmentName);
    if (content == null) {
      content = new byte[0];
    }

    // write the content to a temporary file; the stream must be closed before the file is opened as ZipFile
    final File workingDir = WorkspaceHelper.createWorkingDir(BUNDLE_ID);
    file = new File(workingDir, createTempFileNape(new ConnectivityId(record.getSource(), record.getId())));
    final FileOutputStream out = new FileOutputStream(file);
    try {
      out.write(content);
      out.close(); // report close errors on the regular path
    } finally {
      IOUtils.closeQuietly(out); // no-op if already closed, avoids masking a write error
    }

    _zipFile = new ZipFile(file);
    _queue = new ArrayBlockingQueue<DataReference>(CAPACITY);
    // the map is written by the producer thread and read/modified by the callback methods
    _entryMap = Collections.synchronizedMap(new HashMap<ConnectivityId, ZipEntry>());
    _producerThread = new ZipEntryProducerThread(this);
    _producerThread.start();
    _initialized = true;
  } catch (final Throwable e) {
    final String msg = "Error during initialization";
    if (_log.isErrorEnabled()) {
      _log.error(msg, e);
    }
    try {
      close();
    } catch (final Exception ex) {
      if (_log.isErrorEnabled()) {
        _log.error("Error during close in initialization", ex);
      }
    }
    // close() removes the temporary file only if the ZipFile could be opened, so clean up here otherwise
    if (file != null && file.exists() && !file.delete()) {
      if (_log.isWarnEnabled()) {
        _log.warn("Could not delete temporary zip file " + file.getPath());
      }
    }
    throw new CrawlerCriticalException(msg, e);
  }
}
/**
* Creates the name of the temporary file for a given id. The name is the value returned by the id's getIdHash().
* NOTE(review): the method name looks like a typo for "createTempFileName"; it is kept because initialize() calls
* it under this name.
*
* @param id
* the id
* @return a filename for the id
*/
private String createTempFileNape(final ConnectivityId id) {
return id.getIdHash();
}
/**
 * {@inheritDoc}
 *
 * Returns the next batch of at most STEP DataReferences taken from the internal queue, or null as soon as
 * hasNext() reports that nothing more will arrive.
 *
 * @throws CrawlerCriticalException
 *           if the crawler was not initialized
 */
@Override
public DataReference[] getNext() throws CrawlerException, CrawlerCriticalException {
  if (!_initialized) {
    throw new CrawlerCriticalException("ZipCompoundCrawler was not initialized");
  }
  while (hasNext()) {
    DataReference head = null;
    try {
      head = _queue.poll(QUEUE_POLL_WAITING, TimeUnit.MILLISECONDS);
    } catch (final InterruptedException e) {
      if (_log.isTraceEnabled()) {
        _log.trace("InterruptedException in getNext(): ", e);
      }
    }
    if (head == null) {
      // nothing received within the timeout (or interrupted): ask hasNext() again
      continue;
    }
    // one element was polled, take up to STEP - 1 more that are immediately available
    final List<DataReference> batch = new ArrayList<DataReference>();
    batch.add(head);
    _queue.drainTo(batch, STEP - 1);
    return batch.toArray(new DataReference[batch.size()]);
  }
  return null;
}
/**
 * {@inheritDoc}
 *
 * Marks the crawler as not initialized, closes the temporary zip file and deletes it from disk. Errors while
 * closing or deleting are logged and not propagated.
 */
@Override
public void close() throws CrawlerException {
  _initialized = false;
  if (_zipFile != null) {
    final String fileName = _zipFile.getName();
    try {
      _zipFile.close();
    } catch (final IOException e) {
      final String msg = "Could not close temporary zip file " + fileName;
      if (_log.isErrorEnabled()) {
        _log.error(msg, e);
      }
    }
    // do not silently ignore a failed delete, otherwise temporary files pile up unnoticed
    final File file = new File(fileName);
    if (!file.delete() && file.exists()) {
      if (_log.isWarnEnabled()) {
        _log.warn("Could not delete temporary zip file " + fileName);
      }
    }
    _zipFile = null;
  }
}
/**
 * {@inheritDoc}
 *
 * Creates the metadata of the zip entry that belongs to the given id: every configured compound attribute that is
 * not an attachment is read from the entry and stored under the attribute's name. For the PATH attribute the
 * sequence of parent compounds is additionally extended by the compound record's own value of that attribute and
 * stored as COMPOUND_METADATA_ELEMENT.
 *
 * @param id
 *          the id of a DataReference created by this crawler
 * @return the metadata of the zip entry
 * @throws CrawlerException
 *           if no zip entry is known for the id or a value cannot be converted
 * @throws CrawlerCriticalException
 *           if the crawler was not initialized or the compound record is not set
 */
@Override
public AnyMap getMetadata(final ConnectivityId id) throws CrawlerException, CrawlerCriticalException {
  if (!_initialized) {
    throw new CrawlerCriticalException("ZipCompoundCrawler was not initialized");
  }
  final ZipEntry zipEntry = _entryMap.get(id);
  if (zipEntry == null) {
    throw new CrawlerException("Could not find ZipEntry for id " + id);
  }
  // fetch the record once and check it before the first dereference
  final Record compoundRecord = getCompoundRecord();
  if (compoundRecord == null) {
    throw new CrawlerCriticalException("the compound record was not set");
  }
  final AnyMap metadataObject = compoundRecord.getFactory().createAnyMap();
  for (final CompoundHandling.CompoundAttribute attribute : _compoundAttributes) {
    if (attribute.isAttachment()) {
      continue;
    }
    final Object value = readAttribute(zipEntry, attribute, true);
    // special handling for compound path:
    if (attribute.getElementAttribute() == ElementAttributeType.PATH) {
      final AnyMap parentMetadata = compoundRecord.getMetadata();
      final AnySeq parentCompounds = parentMetadata.getSeq(COMPOUND_METADATA_ELEMENT);
      final String parentPath = parentMetadata.getStringValue(attribute.getName());
      if (parentPath != null) {
        final AnySeq newCompounds;
        if (parentCompounds == null) {
          newCompounds = compoundRecord.getFactory().createAnySeq();
        } else {
          // work on a copy so that the parent record's sequence stays untouched
          newCompounds = parentCompounds.getFactory().cloneAnySeq(parentCompounds);
        }
        newCompounds.add(compoundRecord.getFactory().createStringValue(parentPath));
        metadataObject.put(COMPOUND_METADATA_ELEMENT, newCompounds);
      }
    }
    if (value != null) {
      try {
        metadataObject.put(attribute.getName(), compoundRecord.getFactory().parseFromObject(value));
      } catch (final Throwable e) {
        throw new CrawlerException(e);
      }
    }
  }
  return metadataObject;
}
/**
 * {@inheritDoc}
 *
 * The names are the same for every id: they are the names of all configured compound attributes that are flagged
 * as attachment. A copy is returned so that callers cannot modify the crawler's internal array.
 *
 * @param id
 *          the id (not evaluated)
 * @return the attachment names
 * @throws CrawlerCriticalException
 *           if the crawler was not initialized
 */
@Override
public String[] getAttachmentNames(final ConnectivityId id) throws CrawlerException, CrawlerCriticalException {
  if (!_initialized) {
    throw new CrawlerCriticalException("ZipCompoundCrawler was not initialized");
  }
  return _attachmentNames.clone();
}
/**
 * {@inheritDoc}
 *
 * Looks up the zip entry of the given id and the first configured compound attribute with the given name, and
 * returns the value of that attribute as bytes.
 *
 * @param id
 *          the id of a DataReference created by this crawler
 * @param name
 *          the name of the attachment
 * @return the attachment bytes, may be null
 * @throws CrawlerException
 *           if no zip entry is known for the id or no attribute with the given name is configured
 * @throws CrawlerCriticalException
 *           if the crawler was not initialized
 */
@Override
public byte[] getAttachment(final ConnectivityId id, final String name) throws CrawlerException,
  CrawlerCriticalException {
  if (!_initialized) {
    throw new CrawlerCriticalException("ZipCompoundCrawler was not initialized");
  }
  final ZipEntry entry = _entryMap.get(id);
  if (entry == null) {
    throw new CrawlerException("Could not find ZipEntry for id " + id);
  }
  // search the attribute definition with the requested name, the first match wins
  CompoundHandling.CompoundAttribute definition = null;
  for (int i = 0; i < _compoundAttributes.length && definition == null; i++) {
    final CompoundHandling.CompoundAttribute candidate = _compoundAttributes[i];
    if (candidate.getName().equals(name)) {
      definition = candidate;
    }
  }
  if (definition == null) {
    throw new CrawlerException(String.format("Unable to find attachment definition for [%s]", name));
  }
  return readAttachment(entry, definition);
}
/**
 * {@inheritDoc}
 *
 * Removes the id-to-ZipEntry mapping of the given id. Does nothing if the crawler has not been initialized yet, as
 * there is no mapping in that case.
 *
 * @param id
 *          the id whose mapping is no longer needed
 */
@Override
public void dispose(final ConnectivityId id) {
  final Map<ConnectivityId, ZipEntry> entryMap = _entryMap;
  if (entryMap != null) {
    entryMap.remove(id);
  }
}
/**
 * Waits until either the queue contains at least one element or the producer thread reference has been reset to
 * null, sleeping HAS_NEXT_WAITING milliseconds between checks.
 *
 * @return true if the queue is not empty, false otherwise
 */
private boolean hasNext() {
  while (true) {
    if (_producerThread == null) {
      break;
    }
    if (!_queue.isEmpty()) {
      break;
    }
    try {
      Thread.sleep(HAS_NEXT_WAITING);
    } catch (final InterruptedException e) {
      if (_log.isTraceEnabled()) {
        _log.trace("InterruptedException in hasNext(): ", e);
      }
    }
  }
  final boolean queueHasElements = !_queue.isEmpty();
  return queueHasElements;
}
/**
 * Copies the complete content of the given ZipEntry into a byte array.
 *
 * @param zipEntry
 *          the ZipEntry
 * @return the content of the entry
 * @throws CrawlerException
 *           if anything goes wrong while reading; the cause is logged and attached
 */
private byte[] readZipEntryContent(final ZipEntry zipEntry) throws CrawlerException {
  final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  InputStream entryStream = null;
  try {
    entryStream = _zipFile.getInputStream(zipEntry);
    IOUtils.copy(entryStream, buffer);
  } catch (final Throwable e) {
    // Besides plain IOExceptions this also covers errors that may occur if a ZipEntry containing Umlauts in the
    // filename is read. Depending on the zip tool, the characters may not be encoded in utf-8, but ZipEntry
    // expects names to be encoded in utf-8. All failures are reported the same way.
    final String msg =
      "Error reading content of ZipEntry '" + zipEntry.getName() + "' of record id "
        + getCompoundRecord().getId();
    if (_log.isErrorEnabled()) {
      _log.error(msg, e);
    }
    throw new CrawlerException(msg, e);
  } finally {
    IOUtils.closeQuietly(entryStream);
    IOUtils.closeQuietly(buffer);
  }
  return buffer.toByteArray();
}
/**
 * Reads the value of one compound attribute from a ZipEntry. Which property of the entry is read is selected by
 * the attribute's element attribute type.
 *
 * @param zipEntry
 *          the ZipEntry
 * @param attribute
 *          the attribute
 * @param forceByteToString
 *          if true, CONTENT is returned as String instead of byte[]
 *
 * @return the value: a String, Date, Long or byte[] depending on the type
 *
 * @throws CrawlerException
 *           if the content of the entry cannot be read or converted
 */
private Serializable readAttribute(final ZipEntry zipEntry, final CompoundHandling.CompoundAttribute attribute,
  final boolean forceByteToString) throws CrawlerException {
  final Serializable result;
  switch (attribute.getElementAttribute()) {
    case NAME:
      result = FilenameUtils.getName(zipEntry.getName());
      break;
    case FILE_EXTENSION:
      result = FilenameUtils.getExtension(zipEntry.getName());
      break;
    case PATH:
      result = zipEntry.getName();
      break;
    case LAST_MODIFIED_DATE:
      result = new Date(zipEntry.getTime());
      break;
    case SIZE:
      result = Long.valueOf(zipEntry.getSize());
      break;
    case CONTENT:
      result = readContentValue(zipEntry, forceByteToString);
      break;
    default:
      throw new RuntimeException("Unknown compound element attributes type " + attribute.getElementAttribute());
  }
  return result;
}

/**
 * Reads the content of a ZipEntry, either as raw bytes or converted to a String. For the conversion the encoding
 * returned by EncodingHelper.getEncoding() is used; if that is null, IOUtils.toString() is called without an
 * explicit encoding.
 *
 * @param zipEntry
 *          the ZipEntry
 * @param forceByteToString
 *          if true the content is converted to a String
 * @return a byte[] or a String
 * @throws CrawlerException
 *           if the content cannot be read or converted
 */
private Serializable readContentValue(final ZipEntry zipEntry, final boolean forceByteToString)
  throws CrawlerException {
  try {
    final byte[] raw = readZipEntryContent(zipEntry);
    if (!forceByteToString) {
      return raw;
    }
    final String charsetName = EncodingHelper.getEncoding(raw);
    final ByteArrayInputStream rawStream = new ByteArrayInputStream(raw);
    return charsetName != null ? IOUtils.toString(rawStream, charsetName) : IOUtils.toString(rawStream);
  } catch (final IOException e) {
    throw new CrawlerException(e);
  }
}
/**
 * Reads the value of an attribute from a ZipEntry and returns it as bytes. byte[] values are returned as they
 * are, String values are encoded as utf-8.
 *
 * @param zipEntry
 *          the ZipEntry
 * @param attribute
 *          the attribute
 *
 * @return the bytes, or null if the value is null or of any other type
 *
 * @throws CrawlerException
 *           if the attribute cannot be read or the String cannot be encoded
 */
private byte[] readAttachment(final ZipEntry zipEntry, final CompoundHandling.CompoundAttribute attribute)
  throws CrawlerException {
  final Serializable attributeValue = readAttribute(zipEntry, attribute, false);
  if (attributeValue instanceof byte[]) {
    return (byte[]) attributeValue;
  }
  if (attributeValue instanceof String) {
    final String text = (String) attributeValue;
    try {
      return text.getBytes("utf-8");
    } catch (final UnsupportedEncodingException e) {
      throw new CrawlerException(e);
    }
  }
  // TODO serialization to byte[] for other types of attachments.
  return null;
}
/**
 * Worker thread that fills the internal Queue with DataReference objects. It iterates over the _zipFile and creates a
 * DataReference for each ZipEntry, ignoring directory entries. When the queue is full the thread waits until the
 * consumer has taken elements. On termination it resets the _producerThread field of the crawler to null.
 */
private class ZipEntryProducerThread extends Thread {
  /**
   * The callback handed to every created DataReference.
   */
  private final CrawlerCallback _crawlerCallback;

  /**
   * Instantiates a new crawling producer thread.
   *
   * @param crawlerCallback
   *          the CrawlerCallback
   */
  public ZipEntryProducerThread(final CrawlerCallback crawlerCallback) {
    super();
    _crawlerCallback = crawlerCallback;
  }

  /**
   * {@inheritDoc}
   *
   * @see java.lang.Thread#run()
   */
  @Override
  public void run() {
    try {
      final ZipFile zipFile = _zipFile;
      if (zipFile != null) {
        final Enumeration<? extends ZipEntry> entries = zipFile.entries();
        if (entries != null) {
          while (entries.hasMoreElements()) {
            final ZipEntry entry = entries.nextElement();
            if (entry != null && !entry.isDirectory()) {
              // creation of Id and Hash are NOT configurable
              final ConnectivityId parentId =
                new ConnectivityId(getCompoundRecord().getSource(), getCompoundRecord().getId());
              final ConnectivityId id = new ConnectivityId(parentId, entry.getName());
              final String hash = ConnectivityHashFactory.getInstance().createHash(Long.toString(entry.getTime()));
              final DataReference dataRef =
                DataReferenceFactory.getInstance().createDataReference(_crawlerCallback, id, hash);
              // register the entry BEFORE the reference becomes visible to the consumer, otherwise a fast
              // consumer could ask for an id that is not in the map yet
              _entryMap.put(id, entry);
              // ArrayBlockingQueue.add() throws IllegalStateException on a full queue instead of returning
              // false, so wait with a timed offer() until the consumer has made room
              boolean added = false;
              while (!added) {
                if (_zipFile == null) {
                  // best-effort check: the crawler was closed, nobody will consume anymore
                  _entryMap.remove(id);
                  return;
                }
                added = _queue.offer(dataRef, QUEUE_POLL_WAITING, TimeUnit.MILLISECONDS);
              }
            } // if
          } // while
        } // if
      } // if
    } catch (final InterruptedException ex) {
      // keep the interrupt status and stop producing
      Thread.currentThread().interrupt();
      if (_log.isTraceEnabled()) {
        _log.trace("ZipEntry producer thread was interrupted", ex);
      }
    } catch (final Throwable ex) {
      if (_log.isErrorEnabled()) {
        _log.error("Producer error", ex);
      }
    } finally {
      // signals hasNext() that no further elements will be produced
      _producerThread = null;
      if (_log.isInfoEnabled()) {
        _log.info("ZipEntry producer thread finished!");
      }
    }
  }
}
/**
* {@inheritDoc}
*
* This implementation always returns null.
*/
@Override
public CrawlerPerformanceCounterHelper<? extends ConnectivityPerformanceAgent> getCounterHelper() {
return null;
}
}