| /********************************************************************************************************************* |
| * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the |
| * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this |
| * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html |
| **********************************************************************************************************************/ |
| package org.eclipse.smila.importing.crawler.web.test; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.io.IOUtils; |
| import org.eclipse.smila.bulkbuilder.BulkbuilderService; |
| import org.eclipse.smila.datamodel.AnyMap; |
| import org.eclipse.smila.datamodel.Attachment; |
| import org.eclipse.smila.datamodel.DataFactory; |
| import org.eclipse.smila.datamodel.Record; |
| import org.eclipse.smila.datamodel.ipc.BinaryObjectStreamIterator; |
| import org.eclipse.smila.importing.ImportingConstants; |
| import org.eclipse.smila.importing.crawler.web.WebCrawlerConstants; |
| import org.eclipse.smila.importing.util.PropertyNameMapper; |
| import org.eclipse.smila.jobmanager.JobRunDataProvider; |
| import org.eclipse.smila.jobmanager.JobRunEngine; |
| import org.eclipse.smila.jobmanager.JobState; |
| import org.eclipse.smila.jobmanager.definitions.JobManagerConstants; |
| import org.eclipse.smila.objectstore.ObjectStoreException; |
| import org.eclipse.smila.objectstore.ObjectStoreService; |
| import org.eclipse.smila.objectstore.StoreObject; |
| import org.eclipse.smila.utils.config.ConfigUtils; |
| |
| public class TestWebExtractorWorker extends WebExtractorTestBase { |
| |
| private static final String DATA_SOURCE = "compounds"; |
| |
| private static final String STORE = "records"; |
| |
| private static final String BUCKET = "extractedRecords"; |
| |
| private static final String JOBNAME = "extractCompounds"; |
| |
| private static final String BASEURL = "http://localhost:8765/files/"; |
| |
| private JobRunEngine _jobRunEngine; |
| |
| private JobRunDataProvider _jobRunDataProvider; |
| |
| private BulkbuilderService _bulkBuilder; |
| |
| private ObjectStoreService _objectStore; |
| |
| private final PropertyNameMapper _mapper = _webCrawlingContext.getMapper(); |
| |
| @Override |
| public void setUp() throws Exception { |
| _jobRunEngine = getService(JobRunEngine.class); |
| _jobRunDataProvider = getService(JobRunDataProvider.class); |
| _bulkBuilder = getService(BulkbuilderService.class); |
| _objectStore = getService(ObjectStoreService.class); |
| _objectStore.ensureStore(STORE); |
| _objectStore.clearStore(STORE); |
| } |
| |
| public void testZip() throws Exception { |
| final String jobRunId = _jobRunEngine.startJob(JOBNAME); |
| final Record compound = DataFactory.DEFAULT.createRecord("files.zip", "compounds"); |
| compound.getMetadata().put(_mapper.get(WebCrawlerConstants.ATTRIBUTE_URL), BASEURL + "files.zip"); |
| compound.getMetadata().put(_mapper.get(WebCrawlerConstants.ATTRIBUTE_MIMETYPE), "application/zip"); |
| _bulkBuilder.addRecord(JOBNAME, compound); |
| _bulkBuilder.commitJob(JOBNAME); |
| _jobRunEngine.finishJob(JOBNAME, jobRunId); |
| waitForJobRunCompleted(JOBNAME, jobRunId, 10000); |
| checkExtractedRecordsBulk(compound); |
| } |
| |
| private void checkExtractedRecordsBulk(final Record compound) throws ObjectStoreException, IOException { |
| final Collection<StoreObject> objects = _objectStore.getStoreObjectInfos(STORE, BUCKET); |
| assertEquals(1, objects.size()); |
| int recordCount = 0; |
| final InputStream bulkStream = _objectStore.readObject(STORE, objects.iterator().next().getId()); |
| try { |
| final BinaryObjectStreamIterator records = new BinaryObjectStreamIterator(bulkStream); |
| while (records.hasNext()) { |
| final Record record = records.next(); |
| assertNotNull(record); |
| recordCount++; |
| assertNotNull(record.getId()); |
| assertEquals(DATA_SOURCE, record.getSource()); |
| final AnyMap metadata = record.getMetadata(); |
| if (recordCount == 1) { // first should be the enriched original |
| assertEquals(compound.getId(), record.getId()); |
| assertEquals(compound.getSource(), record.getSource()); |
| assertEquals(compound.getMetadata().getStringValue(_mapper.get(WebCrawlerConstants.ATTRIBUTE_URL)), |
| metadata.getStringValue(_mapper.get(WebCrawlerConstants.ATTRIBUTE_URL))); |
| assertTrue(metadata.getBooleanValue(ImportingConstants.ATTRIBUTE_COMPOUNDFLAG)); |
| assertFalse(metadata.containsKey(ImportingConstants.ATTRIBUTE_COMPOUNDID)); |
| assertFalse(metadata.containsKey(ImportingConstants.ATTRIBUTE_COMPOUNDPATH)); |
| } else { |
| assertTrue(metadata.containsKey(ImportingConstants.ATTRIBUTE_DELTA_HASH)); |
| final String url = metadata.getStringValue(_mapper.get(WebCrawlerConstants.ATTRIBUTE_URL)); |
| final String fileName = url.substring(url.lastIndexOf('/') + 1); |
| assertTrue("URL must start with Base URL " + BASEURL + "was: " + url, url.startsWith(BASEURL)); |
| assertTrue(metadata.containsKey(_mapper.get(WebCrawlerConstants.ATTRIBUTE_SIZE))); |
| assertTrue(metadata.get(_mapper.get(WebCrawlerConstants.ATTRIBUTE_LASTMODIFIED)).isDateTime()); |
| assertTrue(metadata.get(_mapper.get(WebCrawlerConstants.ATTRIBUTE_SIZE)).isLong()); |
| assertTrue(metadata.get(ImportingConstants.ATTRIBUTE_COMPOUNDPATH).isSeq()); |
| assertEquals(compound.getMetadata().getStringValue(_mapper.get(WebCrawlerConstants.ATTRIBUTE_URL)), |
| metadata.getSeq(ImportingConstants.ATTRIBUTE_COMPOUNDPATH).getStringValue(0)); |
| assertEquals(compound.getId(), metadata.getStringValue(ImportingConstants.ATTRIBUTE_COMPOUNDID)); |
| if (metadata.containsKey(ImportingConstants.ATTRIBUTE_COMPOUNDFLAG)) { |
| assertTrue("zip file expected, was: " + fileName, fileName.endsWith(".zip")); |
| } else { |
| assertTrue(record.hasAttachment(_mapper.get(WebCrawlerConstants.ATTACHMENT_CONTENT))); |
| final Attachment attachment = record.getAttachment(_mapper.get(WebCrawlerConstants.ATTACHMENT_CONTENT)); |
| assertEquals(metadata.getLongValue(_mapper.get(WebCrawlerConstants.ATTRIBUTE_SIZE)).longValue(), |
| attachment.size()); |
| final File originalFile = ConfigUtils.getConfigFile(AllTests.BUNDLE_ID, "files/" + fileName); |
| assertNotNull(originalFile); |
| assertFalse(".log file should be filtered out, was: " + fileName, fileName.endsWith(".log")); |
| if (originalFile.getName().endsWith(".html") || originalFile.getName().endsWith(".txt")) { |
| final List<String> originalFileContent = FileUtils.readLines(originalFile); |
| final List<String> attachmentContent = IOUtils.readLines(attachment.getAsStream()); |
| assertEquals(originalFileContent, attachmentContent); |
| } else { |
| assertTrue(Arrays.equals(FileUtils.readFileToByteArray(originalFile), attachment.getAsBytes())); |
| } |
| } |
| } |
| } |
| } finally { |
| IOUtils.closeQuietly(bulkStream); |
| } |
| // files.zip contains 9 "normal" files |
| // + one zip containing two files, one is filtered out (-> 3 - 1 files) |
| // - minus one other filtered out file (test.log) |
| // + plus original record |
| assertEquals(10, recordCount); // 9 + 2 - 1 + 1 |
| } |
| |
| /** Waits for a job to be completed. */ |
| protected void waitForJobRunCompleted(final String jobName, final String jobId, final long maxWaitTime) |
| throws Exception { |
| final long sleepTime = 500L; |
| final long millisStarted = System.currentTimeMillis(); |
| while (true) { |
| final AnyMap runData = _jobRunDataProvider.getJobRunData(jobName, jobId); |
| final String jobRunState = runData.getStringValue(JobManagerConstants.DATA_JOB_STATE); |
| if (jobRunState != null) { |
| final JobState state = JobState.valueOf(jobRunState); |
| assertNotSame(JobState.FAILED, state); |
| if (state == JobState.SUCCEEDED) { |
| return; // finally found what we're waiting for. |
| } |
| } |
| assertTrue("Waited too long for job to complete", System.currentTimeMillis() - millisStarted <= maxWaitTime); |
| Thread.sleep(sleepTime); |
| } |
| } |
| } |