blob: c67cad16afae1611c87cd12f0913db6cdae238f3 [file] [log] [blame]
package org.apache.lucene.store;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.util.ThreadInterruptedException;
/** Abstract base class to rate limit IO. Typically implementations are
* shared across multiple IndexInputs or IndexOutputs (for example
* those involved all merging). Those IndexInputs and
* IndexOutputs would call {@link #pause} whenever the have read
* or written more than {@link #getMinPauseCheckBytes} bytes. */
public abstract class RateLimiter {
/**
* Sets an updated mb per second rate limit.
*/
public abstract void setMbPerSec(double mbPerSec);
/**
* The current mb per second rate limit.
*/
public abstract double getMbPerSec();
/** Pauses, if necessary, to keep the instantaneous IO
* rate at or below the target.
* <p>
* Note: the implementation is thread-safe
* </p>
* @return the pause time in nano seconds
* */
public abstract long pause(long bytes);
/** How many bytes caller should add up itself before invoking {@link #pause}. */
public abstract long getMinPauseCheckBytes();
/**
* Simple class to rate limit IO.
*/
public static class SimpleRateLimiter extends RateLimiter {
private final static int MIN_PAUSE_CHECK_MSEC = 5;
private volatile double mbPerSec;
private volatile long minPauseCheckBytes;
private long lastNS;
// TODO: we could also allow eg a sub class to dynamically
// determine the allowed rate, eg if an app wants to
// change the allowed rate over time or something
/** mbPerSec is the MB/sec max IO rate */
public SimpleRateLimiter(double mbPerSec) {
setMbPerSec(mbPerSec);
}
/**
* Sets an updated mb per second rate limit.
*/
@Override
public void setMbPerSec(double mbPerSec) {
this.mbPerSec = mbPerSec;
minPauseCheckBytes = (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024);
}
@Override
public long getMinPauseCheckBytes() {
return minPauseCheckBytes;
}
/**
* The current mb per second rate limit.
*/
@Override
public double getMbPerSec() {
return this.mbPerSec;
}
/** Pauses, if necessary, to keep the instantaneous IO
* rate at or below the target. Be sure to only call
* this method when bytes > {@link #getMinPauseCheckBytes},
* otherwise it will pause way too long!
*
* @return the pause time in nano seconds */
@Override
public long pause(long bytes) {
long startNS = System.nanoTime();
double secondsToPause = (bytes/1024./1024.) / mbPerSec;
long targetNS;
// Sync'd to read + write lastNS:
synchronized (this) {
// Time we should sleep until; this is purely instantaneous
// rate (just adds seconds onto the last time we had paused to);
// maybe we should also offer decayed recent history one?
targetNS = lastNS + (long) (1000000000 * secondsToPause);
if (startNS >= targetNS) {
// OK, current time is already beyond the target sleep time,
// no pausing to do.
// Set to startNS, not targetNS, to enforce the instant rate, not
// the "averaaged over all history" rate:
lastNS = startNS;
return 0;
}
lastNS = targetNS;
}
long curNS = startNS;
// While loop because Thread.sleep doesn't always sleep
// enough:
while (true) {
final long pauseNS = targetNS - curNS;
if (pauseNS > 0) {
try {
// NOTE: except maybe on real-time JVMs, minimum realistic sleep time
// is 1 msec; if you pass just 1 nsec the default impl rounds
// this up to 1 msec:
Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
curNS = System.nanoTime();
continue;
}
break;
}
return curNS - startNS;
}
}
}