/*********************************************************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
**********************************************************************************************************************/

package org.eclipse.smila.importing.crawler.web.test;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.eclipse.smila.bulkbuilder.BulkbuilderService;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.Attachment;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.ipc.BinaryObjectStreamIterator;
import org.eclipse.smila.importing.ImportingConstants;
import org.eclipse.smila.importing.crawler.web.WebCrawlerConstants;
import org.eclipse.smila.importing.util.PropertyNameMapper;
import org.eclipse.smila.jobmanager.JobRunDataProvider;
import org.eclipse.smila.jobmanager.JobRunEngine;
import org.eclipse.smila.jobmanager.JobState;
import org.eclipse.smila.jobmanager.definitions.JobManagerConstants;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.objectstore.ObjectStoreService;
import org.eclipse.smila.objectstore.StoreObject;
import org.eclipse.smila.utils.config.ConfigUtils;
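
/**
 * Tests the web extractor worker: feeds a zip compound record into an extraction job and verifies the records
 * written to the extracted-records bulk.
 */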
public class TestWebExtractorWorker extends WebExtractorTestBase {
private static final String DATA_SOURCE = "compounds";
private static final String STORE = "records";
private static final String BUCKET = "extractedRecords";
private static final String JOBNAME = "extractCompounds";
private static final String BASEURL = "http://localhost:8765/files/";
private JobRunEngine _jobRunEngine;
private JobRunDataProvider _jobRunDataProvider;
private BulkbuilderService _bulkBuilder;
private ObjectStoreService _objectStore;
private final PropertyNameMapper _mapper = _webCrawlingContext.getMapper();
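
/** Looks up the required services and prepares an empty record store. */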
@Override
public void setUp() throws Exception {
super.setUp(); // make sure the base class has initialized the test environment first
_jobRunEngine = getService(JobRunEngine.class);
_jobRunDataProvider = getService(JobRunDataProvider.class);
_bulkBuilder = getService(BulkbuilderService.class);
_objectStore = getService(ObjectStoreService.class);
_objectStore.ensureStore(STORE);
_objectStore.clearStore(STORE);
}
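
/** Starts the extraction job, submits the files.zip compound record, and checks the extracted records. */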
public void testZip() throws Exception {
final String jobRunId = _jobRunEngine.startJob(JOBNAME);
final Record compound = DataFactory.DEFAULT.createRecord("files.zip", DATA_SOURCE);
compound.getMetadata().put(_mapper.get(WebCrawlerConstants.ATTRIBUTE_URL), BASEURL + "files.zip");
compound.getMetadata().put(_mapper.get(WebCrawlerConstants.ATTRIBUTE_MIMETYPE), "application/zip");
_bulkBuilder.addRecord(JOBNAME, compound);
_bulkBuilder.commitJob(JOBNAME);
_jobRunEngine.finishJob(JOBNAME, jobRunId);
waitForJobRunCompleted(JOBNAME, jobRunId, 10000);
checkExtractedRecordsBulk(compound);
}
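
/**
 * Verifies the single bulk written by the extractor: the first record must be the enriched original compound,
 * all following records must describe elements extracted from the zip file.
 */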
private void checkExtractedRecordsBulk(final Record compound) throws ObjectStoreException, IOException {
final Collection<StoreObject> objects = _objectStore.getStoreObjectInfos(STORE, BUCKET);
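// the extractor is expected to have written exactly one bulk of extracted records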
assertEquals(1, objects.size());
int recordCount = 0;
final InputStream bulkStream = _objectStore.readObject(STORE, objects.iterator().next().getId());
try {
final BinaryObjectStreamIterator records = new BinaryObjectStreamIterator(bulkStream);
while (records.hasNext()) {
final Record record = records.next();
assertNotNull(record);
recordCount++;
assertNotNull(record.getId());
assertEquals(DATA_SOURCE, record.getSource());
final AnyMap metadata = record.getMetadata();
if (recordCount == 1) { // first should be the enriched original
assertEquals(compound.getId(), record.getId());
assertEquals(compound.getSource(), record.getSource());
assertEquals(compound.getMetadata().getStringValue(_mapper.get(WebCrawlerConstants.ATTRIBUTE_URL)),
metadata.getStringValue(_mapper.get(WebCrawlerConstants.ATTRIBUTE_URL)));
assertTrue(metadata.getBooleanValue(ImportingConstants.ATTRIBUTE_COMPOUNDFLAG));
assertFalse(metadata.containsKey(ImportingConstants.ATTRIBUTE_COMPOUNDID));
assertFalse(metadata.containsKey(ImportingConstants.ATTRIBUTE_COMPOUNDPATH));
} else {
assertTrue(metadata.containsKey(ImportingConstants.ATTRIBUTE_DELTA_HASH));
final String url = metadata.getStringValue(_mapper.get(WebCrawlerConstants.ATTRIBUTE_URL));
final String fileName = url.substring(url.lastIndexOf('/') + 1);
assertTrue("URL must start with base URL " + BASEURL + ", was: " + url, url.startsWith(BASEURL));
assertTrue(metadata.containsKey(_mapper.get(WebCrawlerConstants.ATTRIBUTE_SIZE)));
assertTrue(metadata.get(_mapper.get(WebCrawlerConstants.ATTRIBUTE_LASTMODIFIED)).isDateTime());
assertTrue(metadata.get(_mapper.get(WebCrawlerConstants.ATTRIBUTE_SIZE)).isLong());
assertTrue(metadata.get(ImportingConstants.ATTRIBUTE_COMPOUNDPATH).isSeq());
assertEquals(compound.getMetadata().getStringValue(_mapper.get(WebCrawlerConstants.ATTRIBUTE_URL)),
metadata.getSeq(ImportingConstants.ATTRIBUTE_COMPOUNDPATH).getStringValue(0));
assertEquals(compound.getId(), metadata.getStringValue(ImportingConstants.ATTRIBUTE_COMPOUNDID));
if (metadata.containsKey(ImportingConstants.ATTRIBUTE_COMPOUNDFLAG)) {
assertTrue("zip file expected, was: " + fileName, fileName.endsWith(".zip"));
} else {
assertTrue(record.hasAttachment(_mapper.get(WebCrawlerConstants.ATTACHMENT_CONTENT)));
final Attachment attachment = record.getAttachment(_mapper.get(WebCrawlerConstants.ATTACHMENT_CONTENT));
assertEquals(metadata.getLongValue(_mapper.get(WebCrawlerConstants.ATTRIBUTE_SIZE)).longValue(),
attachment.size());
final File originalFile = ConfigUtils.getConfigFile(AllTests.BUNDLE_ID, "files/" + fileName);
assertNotNull(originalFile);
assertFalse(".log file should be filtered out, was: " + fileName, fileName.endsWith(".log"));
if (originalFile.getName().endsWith(".html") || originalFile.getName().endsWith(".txt")) {
final List<String> originalFileContent = FileUtils.readLines(originalFile);
final List<String> attachmentContent = IOUtils.readLines(attachment.getAsStream());
assertEquals(originalFileContent, attachmentContent);
} else {
assertTrue(Arrays.equals(FileUtils.readFileToByteArray(originalFile), attachment.getAsBytes()));
}
}
}
}
} finally {
IOUtils.closeQuietly(bulkStream);
}
// Expected records: files.zip contains 9 entries, one of which is a nested zip.
// - the 8 regular entries yield 7 records (test.log is filtered out),
// - the nested zip yields its own compound record plus one of its two files (the other one is filtered out),
// - the enriched original compound record is written first.
assertEquals(10, recordCount); // (8 - 1) + (1 + 2 - 1) + 1
}

/**
 * Polls the job run data until the given job run has SUCCEEDED. Fails if the run reaches the FAILED state or is
 * not completed within maxWaitTime milliseconds.
 */
protected void waitForJobRunCompleted(final String jobName, final String jobId, final long maxWaitTime)
throws Exception {
final long sleepTime = 500L;
final long millisStarted = System.currentTimeMillis();
while (true) {
final AnyMap runData = _jobRunDataProvider.getJobRunData(jobName, jobId);
final String jobRunState = runData.getStringValue(JobManagerConstants.DATA_JOB_STATE);
if (jobRunState != null) {
final JobState state = JobState.valueOf(jobRunState);
assertNotSame(JobState.FAILED, state);
if (state == JobState.SUCCEEDED) {
return; // finally found what we're waiting for.
}
}
assertTrue("Waited too long for job to complete", System.currentTimeMillis() - millisStarted <= maxWaitTime);
Thread.sleep(sleepTime);
}
}
}