blob: 452dcb82d050ecc8e7d4e0397f37d1375c32845f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.util.HdfsUtil;
import org.apache.solr.util.IOUtils;
/** @lucene.experimental */
public class HdfsUpdateLog extends UpdateLog {
// File system handle used for all transaction log I/O. (Re)created in
// init(UpdateHandler, SolrCore) and closed in close(boolean, boolean).
private volatile FileSystem fs;
// Directory that holds the transaction log files. Assigned in
// init(UpdateHandler, SolrCore) as new Path(dataDir, TLOG_NAME).
private volatile Path tlogDir;
// Optional directory handed to HdfsUtil.addHdfsResources when the Hadoop
// Configuration is built; null when the no-arg constructor was used.
private final String confDir;
/**
 * Creates an update log that uses a default Hadoop {@link Configuration}
 * without any extra resource directory.
 */
public HdfsUpdateLog() {
  this(null);
}

/**
 * Creates an update log whose Hadoop {@link Configuration} is augmented with
 * the resources found in the given directory.
 *
 * @param confDir directory passed to {@code HdfsUtil.addHdfsResources};
 *        may be {@code null}, in which case no extra resources are added
 */
public HdfsUpdateLog(String confDir) {
  this.confDir = confDir;
}
/**
 * "Drops" buffered updates by applying them and blocking until the replay
 * has finished.
 *
 * <p>HACK: while waiting for HDFS-3107, instead of quickly dropping, we
 * slowly apply. This is somewhat brittle, but current usage allows for it.
 *
 * @return {@code true} whenever the method returns normally
 * @throws RuntimeException if the calling thread is interrupted while waiting
 *         (the interrupt status is restored first) or if the replay itself
 *         failed
 */
@Override
public boolean dropBufferedUpdates() {
  Future<RecoveryInfo> future = applyBufferedUpdates();
  if (future != null) {
    try {
      future.get();
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers further up the stack can
      // still observe that this thread was asked to stop.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
  }
  return true;
}
/**
 * Reads this update log's settings from the plugin configuration: the
 * {@code "dir"} init arg becomes {@code dataDir} and the {@code "syncLevel"}
 * init arg selects {@code defaultSyncLevel}.
 *
 * @param info plugin configuration whose {@code initArgs} are consulted
 */
@Override
public void init(PluginInfo info) {
  final String configuredDir = (String) info.initArgs.get("dir");
  dataDir = configuredDir;

  final String configuredSyncLevel = (String) info.initArgs.get("syncLevel");
  defaultSyncLevel = SyncLevel.getSyncLevel(configuredSyncLevel);
}
/**
 * Builds a fresh Hadoop {@link Configuration}, adding the resources from
 * {@code confDir} when one was supplied.
 *
 * @return a new configuration instance on every call
 */
private Configuration getConf() {
  final Configuration hadoopConf = new Configuration();
  if (confDir == null) {
    return hadoopConf;
  }
  HdfsUtil.addHdfsResources(hadoopConf, confDir);
  return hadoopConf;
}
/**
 * Initializes (or re-initializes) this update log for the given core.
 *
 * <p>Steps, in order:
 * <ol>
 * <li>Resolve {@code dataDir}: the core descriptor's ulogDir wins, then the
 * value set earlier, then the core's data dir; a non-absolute value is
 * replaced by the directory factory's data home.</li>
 * <li>Open a new {@link FileSystem} for {@code dataDir} and close the
 * previously held one, if any.</li>
 * <li>If {@code dataDir} equals {@code lastDataDir} this is a reopen: reload
 * {@code versionInfo} and return.</li>
 * <li>Otherwise create the tlog directory (retrying every 5 seconds while
 * the NameNode reports SafeMode), open the existing tlog files, and seed
 * the starting versions, recent deletes and recent delete-by-query
 * commands from the recent updates.</li>
 * </ol>
 *
 * @param uhandler the update handler stored on this log
 * @param core the core whose descriptor, data dir and directory factory are
 *        consulted
 * @throws SolrException if the data home cannot be resolved, the file system
 *         cannot be opened, the old file system cannot be closed, or the
 *         {@code VersionInfo} cannot be created
 * @throws RuntimeException if the tlog directory cannot be created or an
 *         unreadable tlog file cannot be deleted
 */
@Override
public void init(UpdateHandler uhandler, SolrCore core) {
  // ulogDir from CoreDescriptor overrides
  String ulogDir = core.getCoreDescriptor().getUlogDir();
  if (ulogDir != null) {
    dataDir = ulogDir;
  }
  if (dataDir == null || dataDir.length() == 0) {
    dataDir = core.getDataDir();
  }
  if (!core.getDirectoryFactory().isAbsolute(dataDir)) {
    try {
      dataDir = core.getDirectoryFactory().getDataHome(core.getCoreDescriptor());
    } catch (IOException e) {
      throw new SolrException(ErrorCode.SERVER_ERROR, e);
    }
  }

  // Open the new file system before closing the old one.
  FileSystem oldFs = fs;
  try {
    fs = FileSystem.newInstance(new Path(dataDir).toUri(), getConf());
  } catch (IOException e) {
    throw new SolrException(ErrorCode.SERVER_ERROR, e);
  }
  try {
    if (oldFs != null) {
      oldFs.close();
    }
  } catch (IOException e) {
    throw new SolrException(ErrorCode.SERVER_ERROR, e);
  }

  this.uhandler = uhandler;

  if (dataDir.equals(lastDataDir)) {
    if (debug) {
      // The trailing text used to be passed as a format argument to a message
      // without a placeholder, so it never reached the log; concatenate it.
      log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", next id=" + id
          + " this is a reopen... nothing else to do.");
    }
    versionInfo.reload();
    // on a normal reopen, we currently shouldn't have to do anything
    return;
  }
  lastDataDir = dataDir;
  tlogDir = new Path(dataDir, TLOG_NAME);

  // Create the tlog directory, waiting out NameNode SafeMode if necessary.
  while (true) {
    try {
      if (!fs.exists(tlogDir)) {
        boolean success = fs.mkdirs(tlogDir);
        if (!success) {
          throw new RuntimeException("Could not create directory:" + tlogDir);
        }
      } else {
        fs.mkdirs(tlogDir); // To check for safe mode
      }
      break;
    } catch (RemoteException e) {
      if (e.getClassName().equals(
          "org.apache.hadoop.hdfs.server.namenode.SafeModeException")) {
        log.warn("The NameNode is in SafeMode - Solr will wait 5 seconds and try again.");
        try {
          Thread.sleep(5000);
        } catch (InterruptedException e1) {
          // NOTE(review): Thread.interrupted() clears the interrupt status, so
          // an interrupt does not end this retry loop - confirm this is the
          // intended behavior.
          Thread.interrupted();
        }
        continue;
      }
      throw new RuntimeException(
          "Problem creating directory: " + tlogDir, e);
    } catch (IOException e) {
      throw new RuntimeException("Problem creating directory: " + tlogDir, e);
    }
  }

  tlogFiles = getLogList(fs, tlogDir);
  id = getLastLogId() + 1; // add 1 since we will create a new log for the
                           // next update
  if (debug) {
    log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs="
        + Arrays.asList(tlogFiles) + ", next id=" + id);
  }

  TransactionLog oldLog = null;
  for (String oldLogName : tlogFiles) {
    Path f = new Path(tlogDir, oldLogName);
    try {
      oldLog = new HdfsTransactionLog(fs, f, null, true);
      addOldLog(oldLog, false); // don't remove old logs on startup since more
                                // than one may be uncapped.
    } catch (Exception e) {
      // A log that cannot be opened is reported and then deleted.
      SolrException.log(log, "Failure to open existing log file (non fatal) "
          + f, e);
      try {
        fs.delete(f, false);
      } catch (IOException e1) {
        throw new RuntimeException(e1);
      }
    }
  }

  // Record first two logs (oldest first) at startup for potential tlog
  // recovery.
  // It's possible that at abnormal shutdown both "tlog" and "prevTlog" were
  // uncapped.
  for (TransactionLog ll : logs) {
    newestLogsOnStartup.addFirst(ll);
    if (newestLogsOnStartup.size() >= 2) break;
  }

  try {
    versionInfo = new VersionInfo(this, 256);
  } catch (SolrException e) {
    log.error("Unable to use updateLog: " + e.getMessage(), e);
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
        "Unable to use updateLog: " + e.getMessage(), e);
  }

  // TODO: these startingVersions assume that we successfully recover from all
  // non-complete tlogs.
  HdfsUpdateLog.RecentUpdates startingUpdates = getRecentUpdates();
  try {
    startingVersions = startingUpdates.getVersions(numRecordsToKeep);
    startingOperation = startingUpdates.getLatestOperation();

    // populate recent deletes list (since we can't get that info from the
    // index)
    for (int i = startingUpdates.deleteList.size() - 1; i >= 0; i--) {
      DeleteUpdate du = startingUpdates.deleteList.get(i);
      oldDeletes.put(new BytesRef(du.id), new LogPtr(-1, du.version));
    }

    // populate recent deleteByQuery commands
    for (int i = startingUpdates.deleteByQueryList.size() - 1; i >= 0; i--) {
      Update update = startingUpdates.deleteByQueryList.get(i);
      // The looked-up entry is cast to a list; index 1 is read as the version
      // and index 2 as the query string.
      @SuppressWarnings("unchecked")
      List<Object> dbq = (List<Object>) update.log.lookup(update.pointer);
      long version = (Long) dbq.get(1);
      String q = (String) dbq.get(2);
      trackDeleteByQuery(q, version);
    }
  } finally {
    startingUpdates.close();
  }
}
/**
 * Returns the transaction log directory as a URI string.
 *
 * @return the string form of {@code tlogDir.toUri()}
 */
@Override
public String getLogDir() {
  final Path dir = tlogDir;
  return dir.toUri().toString();
}
/**
 * Lists the names of the transaction log files in a directory, sorted in
 * natural string order.
 *
 * @param fs file system to query; must not be {@code null} (asserted)
 * @param tlogDir directory to list
 * @return sorted names of all entries whose name starts with
 *         {@code TLOG_NAME + '.'}
 * @throws RuntimeException wrapping any {@link IOException} raised while
 *         listing the directory
 */
public static String[] getLogList(FileSystem fs, Path tlogDir) {
  final String prefix = TLOG_NAME + '.';
  assert fs != null;

  final PathFilter tlogFilter = new PathFilter() {
    @Override
    public boolean accept(Path candidate) {
      return candidate.getName().startsWith(prefix);
    }
  };

  final FileStatus[] statuses;
  try {
    statuses = fs.listStatus(tlogDir, tlogFilter);
  } catch (IOException e) {
    // FileNotFoundException is an IOException and is wrapped the same way.
    throw new RuntimeException(e);
  }

  final String[] names = new String[statuses.length];
  int next = 0;
  for (FileStatus status : statuses) {
    names[next++] = status.getPath().getName();
  }
  Arrays.sort(names);
  return names;
}
/**
 * Closes this update log without deleting it; equivalent to
 * {@code close(committed, false)}.
 *
 * @param committed forwarded unchanged to {@link #close(boolean, boolean)}
 */
@Override
public void close(boolean committed) {
  final boolean deleteOnClose = false;
  close(committed, deleteOnClose);
}
/**
 * Closes the update log via the superclass and then quietly closes the file
 * system handle, even when the superclass close throws.
 *
 * @param committed forwarded to the superclass
 * @param deleteOnClose forwarded to the superclass
 */
@Override
public void close(boolean committed, boolean deleteOnClose) {
  try {
    super.close(committed, deleteOnClose);
  } finally {
    // Read the handle only now, after the superclass has finished.
    final FileSystem current = fs;
    IOUtils.closeQuietly(current);
  }
}
/**
 * Makes sure a current transaction log exists, creating one named from
 * {@code LOG_FILENAME_PATTERN}, {@code TLOG_NAME} and the current {@code id}
 * inside {@code tlogDir} when {@code tlog} is still {@code null}.
 */
@Override
protected void ensureLog() {
  if (tlog != null) {
    return;
  }
  final String fileName =
      String.format(Locale.ROOT, LOG_FILENAME_PATTERN, TLOG_NAME, id);
  final Path logPath = new Path(tlogDir, fileName);
  tlog = new HdfsTransactionLog(fs, logPath, globalStrings);
}
/**
 * Clears the logs on the file system. Only call before init.
 *
 * <p>Nothing is deleted when {@code ulogPluginInfo} is {@code null}, when the
 * file system handle has not been assigned yet, or when the tlog directory
 * does not exist. A file that cannot be deleted is logged as an error and
 * skipped.
 *
 * @param core the SolrCore
 * @param ulogPluginInfo the init info for the UpdateHandler
 * @throws RuntimeException wrapping any {@link IOException} raised while
 *         checking, listing or deleting
 */
@Override
public void clearLog(SolrCore core, PluginInfo ulogPluginInfo) {
  if (ulogPluginInfo == null) {
    return;
  }
  final Path logDirectory = new Path(getTlogDir(core, ulogPluginInfo));
  try {
    if (fs == null || !fs.exists(logDirectory)) {
      return;
    }
    for (String logName : getLogList(logDirectory)) {
      final Path logFile = new Path(logDirectory, logName);
      final boolean removed = fs.delete(logFile, false);
      if (!removed) {
        log.error("Could not remove tlog file:" + logFile);
      }
    }
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
/**
 * Lists, in the order returned by the file system, the names of the entries
 * in the given directory whose name starts with {@code TLOG_NAME + '.'}.
 * Uses this instance's file system handle.
 *
 * @param tlogDir directory to list
 * @return the matching file names (unsorted)
 * @throws FileNotFoundException propagated from the directory listing
 * @throws IOException propagated from the directory listing
 */
private String[] getLogList(Path tlogDir) throws FileNotFoundException, IOException {
  final String prefix = TLOG_NAME + '.';
  final PathFilter tlogFilter = new PathFilter() {
    @Override
    public boolean accept(Path candidate) {
      return candidate.getName().startsWith(prefix);
    }
  };
  final FileStatus[] statuses = fs.listStatus(tlogDir, tlogFilter);

  final String[] names = new String[statuses.length];
  for (int i = 0; i < statuses.length; i++) {
    names[i] = statuses[i].getPath().getName();
  }
  return names;
}
// Disabled implementation of dropBufferedUpdates(), kept for reference.
// It returns true if we were able to drop buffered updates and return to
// the ACTIVE state.
// public boolean dropBufferedUpdates() {
// versionInfo.blockUpdates();
// try {
// if (state != State.BUFFERING) return false;
//
// if (log.isInfoEnabled()) {
// log.info("Dropping buffered updates " + this);
// }
//
// // since we blocked updates, this synchronization shouldn't strictly be
// necessary.
// synchronized (this) {
// if (tlog != null) {
// tlog.rollback(recoveryInfo.positionOfStart);
// }
// }
//
// state = State.ACTIVE;
// operationFlags &= ~FLAG_GAP;
// } catch (IOException e) {
// SolrException.log(log,"Error attempting to roll back log", e);
// return false;
// }
// finally {
// versionInfo.unblockUpdates();
// }
// return true;
// }
/**
 * Returns a short description of this update log containing its current
 * state and current transaction log.
 *
 * @return a string of the form {@code HDFSUpdateLog{state=..., tlog=...}}
 */
@Override
public String toString() {
  return "HDFSUpdateLog{state=" + getState() + ", tlog=" + tlog + "}";
}
}