blob: a7a77a70351237f4ec250cbe39d545819374a5a0 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2014 Boeing.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Boeing - initial API and implementation
*******************************************************************************/
package org.eclipse.osee.activity.internal;
import static org.eclipse.osee.activity.ActivityConstants.ACTIVITY_LOGGER__EXECUTOR_ID;
import static org.eclipse.osee.activity.ActivityConstants.ACTIVITY_LOGGER__EXECUTOR_POOL_SIZE;
import static org.eclipse.osee.activity.ActivityConstants.ACTIVITY_LOGGER__STACKTRACE_LINE_COUNT;
import static org.eclipse.osee.activity.ActivityConstants.ACTIVITY_LOGGER__WRITE_RATE_IN_MILLIS;
import static org.eclipse.osee.activity.ActivityConstants.DEFAULT_ACTIVITY_LOGGER__EXECUTOR_POOL_SIZE;
import static org.eclipse.osee.activity.ActivityConstants.DEFAULT_ACTIVITY_LOGGER__STACKTRACE_LINE_COUNT;
import static org.eclipse.osee.activity.ActivityConstants.DEFAULT_ACTIVITY_LOGGER__WRITE_RATE_IN_MILLIS;
import static org.eclipse.osee.activity.internal.ActivityUtil.captureStackTrace;
import static org.eclipse.osee.activity.internal.ActivityUtil.get;
import static org.eclipse.osee.framework.database.IOseeDatabaseService.MAX_VARCHAR_LENGTH;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.osee.activity.ActivityStorage;
import org.eclipse.osee.activity.api.Activity;
import org.eclipse.osee.activity.api.ActivityLog;
import org.eclipse.osee.activity.api.ActivityType;
import org.eclipse.osee.executor.admin.ExecutorAdmin;
import org.eclipse.osee.framework.jdk.core.type.DrainingIterator;
import org.eclipse.osee.framework.jdk.core.type.OseeArgumentException;
import org.eclipse.osee.framework.jdk.core.util.Collections;
import org.eclipse.osee.framework.jdk.core.util.Lib;
import org.eclipse.osee.logger.Log;
/**
 * {@link ActivityLog} implementation that queues log entries in memory and hands them to
 * {@link ActivityStorage} from a task scheduled on the {@link ExecutorAdmin} executor identified by
 * {@code ACTIVITY_LOGGER__EXECUTOR_ID}. The instance itself is the scheduled task (see {@link #call()}).
 *
 * @author Ryan D. Brooks
 */
public class ActivityLogImpl implements ActivityLog, Callable<Void> {

   /**
    * Positions of the individual values inside the {@code Object[]} that represents a newly created log entry (see
    * the array built in the private {@code createEntry} method). The declaration order defines the array layout, so
    * constants must not be reordered.
    */
   public static enum LogEntry {
      ENTRY_ID,
      PARENT_ID,
      TYPE_ID,
      ACCOUNT_ID,
      SERVER_ID,
      CLIENT_ID,
      START_TIME,
      DURATION,
      STATUS,
      MESSAGE_ARGS;

      /**
       * Reads the value stored at this constant's position in the given entry.
       *
       * @param entry a full log entry array laid out according to this enum
       * @return the value at this position
       * @throws OseeArgumentException if the stored value is not a {@link Long}
       */
      Long from(Object[] entry) {
         Object value = entry[ordinal()];
         if (value instanceof Long) {
            return (Long) value;
         }
         throw new OseeArgumentException("LogEntry.from may only be used with values of type Long");
      }
   }

   // Layout of the arrays held in updatedEntities: {status, duration, entryId}.
   // This is NOT the LogEntry layout; see addUpdatedEntryToMap.
   private static final int UPDATE_STATUS_INDEX = 0;
   private static final int UPDATE_DURATION_INDEX = 1;

   // Entries created but not yet handed to storage, keyed by entry id (LogEntry layout).
   private final ConcurrentHashMap<Long, Object[]> newEntities = new ConcurrentHashMap<Long, Object[]>();
   // Pending status/duration updates for entries no longer in newEntities, keyed by entry id.
   private final ConcurrentHashMap<Long, Object[]> updatedEntities = new ConcurrentHashMap<Long, Object[]>();

   private Log logger;
   private ExecutorAdmin executorAdmin;
   private ActivityStorage storage;

   // Guards the one-time call to initialize() made from call().
   private final AtomicBoolean initialized = new AtomicBoolean(false);
   private ActivityMonitorImpl activityMonitor;

   private volatile long freshnessMillis;
   private volatile int exceptionLineCount;
   private volatile int executorPoolSize;
   private volatile long lastFlushTime;

   public void setLogger(Log logger) {
      this.logger = logger;
   }

   public void setActivityStorage(ActivityStorage storage) {
      this.storage = storage;
   }

   public void setExecutorAdmin(ExecutorAdmin executorAdmin) {
      this.executorAdmin = executorAdmin;
   }

   /**
    * Creates the activity monitor and applies the given configuration via {@link #update(Map)}.
    *
    * @param properties configuration values; see {@link #update(Map)} for the keys that are read
    */
   public void start(Map<String, Object> properties) throws Exception {
      activityMonitor = new ActivityMonitorImpl();
      update(properties);
   }

   /**
    * Requests a final forced flush and then shuts down the logger's executor. A failure during shutdown is logged
    * and not propagated.
    */
   public void stop() {
      flush(true);
      try {
         executorAdmin.shutdown(ACTIVITY_LOGGER__EXECUTOR_ID);
      } catch (Throwable th) {
         logger.error(th, "Error shutting down executor [%s]", ACTIVITY_LOGGER__EXECUTOR_ID);
      }
   }

   @Override
   public void queryEntry(Long entryId, ActivityDataHandler handler) {
      storage.selectEntry(entryId, handler);
   }

   /**
    * Re-reads the write rate, stack trace line count and executor pool size from the given properties. When the
    * pool size differs from the current one, the executor is shut down and a new fixed-pool executor is created;
    * the creation is attempted even if the shutdown fails. Errors from either step are logged and not propagated.
    *
    * @param properties configuration values; missing keys fall back to the {@code DEFAULT_ACTIVITY_LOGGER__*}
    * constants passed to {@code ActivityUtil.get}
    */
   public void update(Map<String, Object> properties) {
      //@formatter:off
      freshnessMillis = get(properties, ACTIVITY_LOGGER__WRITE_RATE_IN_MILLIS, DEFAULT_ACTIVITY_LOGGER__WRITE_RATE_IN_MILLIS);
      exceptionLineCount = get(properties, ACTIVITY_LOGGER__STACKTRACE_LINE_COUNT, DEFAULT_ACTIVITY_LOGGER__STACKTRACE_LINE_COUNT);
      int newExecutorPoolSize = get(properties, ACTIVITY_LOGGER__EXECUTOR_POOL_SIZE, DEFAULT_ACTIVITY_LOGGER__EXECUTOR_POOL_SIZE);
      //@formatter:on

      if (newExecutorPoolSize != executorPoolSize) {
         executorPoolSize = newExecutorPoolSize;
         try {
            executorAdmin.shutdown(ACTIVITY_LOGGER__EXECUTOR_ID);
         } catch (Throwable th) {
            logger.error(th, "Error shutting down executor [%s]", ACTIVITY_LOGGER__EXECUTOR_ID);
         } finally {
            try {
               executorAdmin.createFixedPoolExecutor(ACTIVITY_LOGGER__EXECUTOR_ID, executorPoolSize);
            } catch (Throwable th) {
               logger.error(th, "Error creating new executor for [%s]", ACTIVITY_LOGGER__EXECUTOR_ID);
            }
         }
      }
   }

   @Override
   public Long createEntry(ActivityType type, String... messageArgs) {
      return createEntry(type.getTypeId(), COMPLETE_STATUS, messageArgs);
   }

   @Override
   public Long createUpdateableEntry(ActivityType type, String... messageArgs) {
      return createEntry(type.getTypeId(), INITIAL_STATUS, messageArgs);
   }

   @Override
   public Long createEntry(ActivityType type, Long parentId, Integer status, String... messageArgs) {
      return createEntry(type.getTypeId(), parentId, status, messageArgs);
   }

   /**
    * Creates an entry whose parent is the root entry the activity monitor reports for the current thread.
    */
   @Override
   public Long createEntry(Long typeId, Integer status, String... messageArgs) {
      Object[] threadRootEntry = activityMonitor.getThreadRootEntry();
      Long rootEntryId = LogEntry.ENTRY_ID.from(threadRootEntry);
      return createEntry(typeId, rootEntryId, status, messageArgs);
   }

   /**
    * Creates an entry under the given parent. Account, server and client ids are copied from the root entry the
    * activity monitor reports for the current thread, and the duration is measured from that root entry's start
    * time.
    *
    * @return the id of the newly created entry
    */
   @Override
   public Long createEntry(Long typeId, Long parentId, Integer status, String... messageArgs) {
      Object[] rootEntry = activityMonitor.getThreadRootEntry();
      Long accountId = LogEntry.ACCOUNT_ID.from(rootEntry);
      Long serverId = LogEntry.SERVER_ID.from(rootEntry);
      Long clientId = LogEntry.CLIENT_ID.from(rootEntry);
      Object[] entry =
         createEntry(parentId, typeId, accountId, serverId, clientId, computeDuration(), status, messageArgs);
      return LogEntry.ENTRY_ID.from(entry);
   }

   /**
    * Builds a new entry array (laid out per {@link LogEntry}), queues it in {@code newEntities} and requests a
    * non-forced flush. The message arguments are joined with newlines; when the joined text is longer than
    * {@code MAX_VARCHAR_LENGTH}, the first chunk is stored in the primary entry and each further chunk is queued as
    * an {@code Activity.MSG_CONTINUATION} entry whose parent is the previous chunk's entry.
    *
    * @return the primary entry array (never a continuation entry)
    */
   private Object[] createEntry(Long parentId, Long typeId, Long accountId, Long serverId, Long clientId, Long duration, Integer status, String... messageArgs) {
      Long entryId = Lib.generateUuid();
      Long startTime = System.currentTimeMillis();
      String fullMsg = Collections.toString("\n", (Object[]) messageArgs);
      String msg = fullMsg.substring(0, Math.min(fullMsg.length(), MAX_VARCHAR_LENGTH));

      // Queue the primary entry before any continuation entries that refer to it as their parent.
      Object[] entry =
         new Object[] {entryId, parentId, typeId, accountId, serverId, clientId, startTime, duration, status, msg};
      newEntities.put(entryId, entry);

      if (fullMsg.length() > MAX_VARCHAR_LENGTH) {
         // Each continuation entry is chained to the chunk that precedes it.
         Long parentCursor = entryId;
         for (int i = MAX_VARCHAR_LENGTH; i < fullMsg.length(); i += MAX_VARCHAR_LENGTH) {
            Long continueEntryId = Lib.generateUuid();
            Object[] continueEntry =
               new Object[] {
                  continueEntryId,
                  parentCursor,
                  Activity.MSG_CONTINUATION.getTypeId(),
                  accountId,
                  serverId,
                  clientId,
                  startTime,
                  duration,
                  status,
                  fullMsg.substring(i, Math.min(fullMsg.length(), i + MAX_VARCHAR_LENGTH))};
            newEntities.put(continueEntryId, continueEntry);
            parentCursor = continueEntryId;
         }
      }
      flush(false);
      return entry;
   }

   /**
    * Logs the stack trace of the given throwable (limited by the configured line count) as an entry with
    * {@code ABNORMALLY_ENDED_STATUS}. Any failure while doing so is logged and not propagated.
    *
    * @return the id of the created entry, or {@code -1} if the entry could not be created
    */
   @Override
   public Long createThrowableEntry(ActivityType type, Throwable throwable) {
      Long entryId = -1L;
      try {
         String message = captureStackTrace(throwable, exceptionLineCount);
         entryId = createEntry(type.getTypeId(), ABNORMALLY_ENDED_STATUS, message);
      } catch (Throwable th) {
         logger.error(th, "logging failed in ActivityLogImpl.createThrowableEntry");
      }
      return entryId;
   }

   /**
    * Records a new status for the given entry. If the entry is still queued in {@code newEntities} it is changed in
    * place; otherwise an update record is queued (or an already queued update record is refreshed).
    *
    * @return {@code false} when the still-unwritten new entry was updated in memory, {@code true} when an update
    * record was queued or refreshed
    */
   @Override
   public boolean updateEntry(Long entryId, Integer status) {
      if (updateIfNew(entryId, status)) {
         return false;
      }

      Object[] pendingUpdate = updatedEntities.get(entryId);
      if (pendingUpdate == null) {
         addUpdatedEntryToMap(entryId, status);
      } else {
         // Update records are {status, duration, entryId}; they must not be indexed with LogEntry ordinals,
         // which would point past the end of this three element array.
         pendingUpdate[UPDATE_STATUS_INDEX] = status;
         pendingUpdate[UPDATE_DURATION_INDEX] = computeDuration();
         // The record may have been removed from the map since it was read above; queue a fresh one if so.
         if (!updatedEntities.containsKey(entryId)) {
            addUpdatedEntryToMap(entryId, status);
         }
      }
      return true;
   }

   /**
    * Queues an update record of the form {@code {status, duration, entryId}} for the given entry.
    */
   private void addUpdatedEntryToMap(Long entryId, Integer status) {
      updatedEntities.put(entryId, new Object[] {status, computeDuration(), entryId});
   }

   /**
    * @return milliseconds elapsed between the start time of the current thread's root entry and now
    */
   private Long computeDuration() {
      long timeOfUpdate = System.currentTimeMillis();
      Object[] rootEntry = activityMonitor.getThreadRootEntry();
      return timeOfUpdate - LogEntry.START_TIME.from(rootEntry);
   }

   /**
    * If the status has changed for an entry that has not yet been written to the datastore, update in memory and
    * return true if it has not yet been drained and written to the datastore
    */
   private boolean updateIfNew(Long entryId, Integer status) {
      Object[] data = newEntities.get(entryId);
      if (data == null) {
         return false;
      }
      data[LogEntry.STATUS.ordinal()] = status;
      data[LogEntry.DURATION.ordinal()] = computeDuration();
      // Re-check membership: the entry may have been removed from the map while it was being modified.
      return newEntities.containsKey(entryId);
   }

   /**
    * Task body run by the executor: performs the one-time {@link #initialize()} on first invocation, then passes
    * the queued new entries and the queued updates to storage through {@link DrainingIterator}s. Storage failures
    * are logged and not propagated.
    *
    * @return always {@code null}
    */
   @Override
   public Void call() {
      if (!initialized.getAndSet(true)) {
         initialize();
      }
      if (!newEntities.isEmpty()) {
         try {
            storage.addEntries(new DrainingIterator<Object[]>(newEntities.values().iterator()));
         } catch (Throwable ex) {
            logger.error(ex, "Exception while storing updates to the activity log");
         }
      }
      if (!updatedEntities.isEmpty()) {
         try {
            storage.updateEntries(new DrainingIterator<Object[]>(updatedEntities.values().iterator()));
         } catch (Throwable ex) {
            logger.error(ex, "Exception while storing updates to the activity log");
         }
      }
      return null;
   }

   /**
    * Schedules this instance on the logger's executor when {@code force} is set or when more than
    * {@code freshnessMillis} have passed since the last flush request. The last flush time is advanced even if
    * scheduling fails; the failure is logged and not propagated.
    */
   private void flush(boolean force) {
      long currentTime = System.currentTimeMillis();
      if (force || currentTime - lastFlushTime > freshnessMillis) {
         try {
            executorAdmin.schedule(ACTIVITY_LOGGER__EXECUTOR_ID, this);
         } catch (Exception ex) {
            logger.error(ex, "Error scheduling activity log callable");
         } finally {
            lastFlushTime = currentTime;
         }
      }
   }

   /**
    * Makes sure every {@link Activity} constant is known to storage: starts from all constants, drops each type id
    * that storage reports through {@code selectTypes}, and adds whatever remains.
    */
   private void initialize() {
      final Map<Long, ActivityType> types = new HashMap<Long, ActivityType>(4);
      for (Activity type : Activity.values()) {
         types.put(type.getTypeId(), type);
      }
      storage.selectTypes(new ActivityTypeDataHandler() {
         @Override
         public void onData(Long typeId, Long logLevel, String module, String messageFormat) {
            types.remove(typeId);
         }
      });
      if (!types.isEmpty()) {
         storage.addActivityTypes(types.values());
      }
   }

   @Override
   public void completeEntry(Long entryId) {
      updateEntry(entryId, COMPLETE_STATUS);
   }

   @Override
   public void endEntryAbnormally(Long entryId) {
      updateEntry(entryId, ABNORMALLY_ENDED_STATUS);
   }

   /**
    * Ends the entry with the given status when it is greater than {@code COMPLETE_STATUS}; a {@code null} status
    * or any other value falls back to {@link #endEntryAbnormally(Long)}.
    */
   @Override
   public void endEntryAbnormally(Long entryId, Integer status) {
      if (status != null && status > COMPLETE_STATUS) {
         updateEntry(entryId, status);
      } else {
         endEntryAbnormally(entryId);
      }
   }

   @Override
   public Long createActivityThread(ActivityType type, Long accountId, Long serverId, Long clientId, String... messageArgs) {
      return createActivityThread(-1L, type, accountId, serverId, clientId, messageArgs);
   }

   /**
    * Creates an entry with duration {@code 0} and status {@code 0} and registers it with the activity monitor via
    * {@code addActivityThread}.
    *
    * @return the id of the created entry
    */
   @Override
   public Long createActivityThread(Long parentId, ActivityType type, Long accountId, Long serverId, Long clientId, String... messageArgs) {
      Object[] entry = createEntry(parentId, type.getTypeId(), accountId, serverId, clientId, 0L, 0, messageArgs);
      activityMonitor.addActivityThread(entry);
      return LogEntry.ENTRY_ID.from(entry);
   }

   @Override
   public void createActivityTypes(ActivityType... types) {
      storage.addActivityTypes(types);
   }

   @Override
   public void queryActivityType(Long typeId, ActivityTypeDataHandler handler) {
      storage.selectType(typeId, handler);
   }

   @Override
   public boolean activityTypeExists(Long typeId) {
      return storage.typeExists(typeId);
   }

   @Override
   public void queryActivityTypes(ActivityTypeDataHandler handler) {
      storage.selectTypes(handler);
   }
}