blob: 9c54ed7a0c9049264d15cb7e1669af898d99a714 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2017 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
package org.eclipse.equinox.internal.log.stream;
import java.io.Closeable;
import java.util.Enumeration;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.osgi.service.log.LogEntry;
import org.osgi.service.log.LogReaderService;
import org.osgi.util.pushstream.PushEvent;
import org.osgi.util.pushstream.PushEventConsumer;
import org.osgi.util.pushstream.PushEventSource;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.tracker.ServiceTracker;
public class LogEntrySource implements PushEventSource<LogEntry> {
private final Set<PushEventConsumer<? super LogEntry>> consumers = new CopyOnWriteArraySet<>();
private final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory;
private volatile PushStream<LogEntry> logStream;
private final ReentrantLock historyLock = new ReentrantLock();
public LogEntrySource(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory) {
this.withHistory = withHistory;
}
public PushStream<LogEntry> getLogStream() {
return logStream;
}
public void setLogStream(PushStream<LogEntry> logStream) {
this.logStream = logStream;
}
/* Open method isused to connect to the source and begin receiving a stream of events.
* It returns an AutoCloseable which can be used to close the event stream.
* If the close method is called on this object then the stream is terminated by sending a close event.
* (non-Javadoc)
* @see org.osgi.util.pushstream.PushEventSource#open(org.osgi.util.pushstream.PushEventConsumer)
*/
@Override
public Closeable open(PushEventConsumer<? super LogEntry> aec) throws Exception {
LinkedBlockingDeque<LogEntry> historyList = new LinkedBlockingDeque<>();
if (!consumers.add(aec)) {
throw new IllegalStateException("Cannot add the same consumer multiple times"); //$NON-NLS-1$
}
/*when history is not equal to null then we acquire a lock to provide the full history
* to the consumer first before any other new entries
*/
if (withHistory != null) {
historyLock.lock();
try {
AtomicReference<LogReaderService> readerRef = withHistory.getService();
LogReaderService reader = readerRef.get();
if (reader != null) {
// Enumeration has the most recent entry first
Enumeration<LogEntry> e = reader.getLog();
if (e != null) {
while (e.hasMoreElements()) {
historyList.add(e.nextElement());
}
}
//Logging the history in the order of their appearance
if (historyList != null) {
while (!historyList.isEmpty()) {
LogEntry logEntry = historyList.removeLast();
logged(logEntry);
}
}
}
} finally {
historyLock.unlock();
}
}
Closeable result = () -> {
if (consumers.remove(aec)) {
try {
aec.accept(PushEvent.close());
} catch (Exception e) {
// ignore here for log stream
}
}
};
return result;
}
public void logged(LogEntry entry) {
if (withHistory != null) {
historyLock.lock();
}
/*consumer accepts the incoming log entries and returns a back pressure.
* A return of zero indicates that event delivery may continue immediately.
* A positive return value indicates that the source should delay sending any further events for the requested number of milliseconds.
* A return value of -1 indicates that no further events should be sent and that the stream can be closed.
* @see org.osgi.util.pushstream.PushEventConsumer<T>
*/
try {
for (PushEventConsumer<? super LogEntry> consumer : consumers) {
try {
long status = consumer.accept(PushEvent.data(entry));
if (status < 0) {
consumer.accept(PushEvent.close());
}
} catch (Exception e) {
// we ignore exceptions here for log stream
}
}
} finally {
if (withHistory != null) {
historyLock.unlock();
}
}
}
}