// blob: 04efea9a0ba8b22baeb53535972576bd5562340f [file] [log] [blame]
/*=============================================================================#
# Copyright (c) 2019, 2022 Stephan Wahlbrink and others.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation
#=============================================================================*/
package org.eclipse.statet.rj.servi.pool;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullAssert;
import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullLateInit;
import static org.eclipse.statet.internal.rj.servi.APool2.CLIENT_ALLOCATION_RENEW_PERIOD_MILLIS_PROPERTY_KEY;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.security.auth.login.LoginException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.eclipse.statet.jcommons.lang.NonNullByDefault;
import org.eclipse.statet.jcommons.lang.Nullable;
import org.eclipse.statet.jcommons.status.NullProgressMonitor;
import org.eclipse.statet.jcommons.status.ProgressMonitor;
import org.eclipse.statet.jcommons.status.Status;
import org.eclipse.statet.jcommons.status.StatusException;
import org.eclipse.statet.internal.rj.servi.NodeHandler;
import org.eclipse.statet.internal.rj.servi.RServiImpl;
import org.eclipse.statet.rj.data.RDataUtils;
import org.eclipse.statet.rj.server.client.AbstractRJComClient;
import org.eclipse.statet.rj.server.util.RJContext;
import org.eclipse.statet.rj.servi.RServi;
import org.eclipse.statet.rj.servi.RServiUtils;
import org.eclipse.statet.rj.servi.jmx.PoolStatusMX;
import org.eclipse.statet.rj.servi.node.RServiNodeConfig;
import org.eclipse.statet.rj.servi.test.Accessor;
@EnabledIfEnvironmentVariable(named= "STATET_TEST_FILES", matches= ".+")
@NonNullByDefault
public class JMPoolTest extends AbstractServiTest {
/** Extra time granted for a node start when checking the duration of an allocation. */
private static final Duration NODE_STARTUP_DURATION= Duration.ofSeconds(7);
/** Maximum time the polling helpers of this class wait for an expected pool or node state. */
private static final Duration WAIT_IDLE_DURATION= Duration.ofSeconds(30);
/** Tolerance in milliseconds used when comparing allocation durations. */
private static final double TIME_TOLERANCE_MILLIS= 10;

/** All servers created by this test class; they are shut down in {@link #disposeServers()}. */
private static final List<JMPoolServer> allServers= new ArrayList<>();
/** Server reused by the test methods; created on first use in {@link #initServer()}. */
private static @Nullable JMPoolServer sharedServer;
/**
 * Shuts down every server created by this test class and waits for its disposal.
 * 
 * Exceptions of the individual servers are collected so that all servers are processed;
 * the collected exceptions are passed to {@code reportErrors} at the end.
 * 
 * @throws Exception if the collected errors are reported as failure or the thread is interrupted
 */
@AfterAll
public static void disposeServers() throws Exception {
	final List<Throwable> exceptions= new ArrayList<>();
	try {
		for (final JMPoolServer server : allServers) {
			try {
				server.shutdown();
				server.waitForDisposal(0);
			}
			catch (final Exception e) {
				exceptions.add(e);
			}
		}
	}
	finally {
		allServers.clear();
		// also drop the shared reference, otherwise a later initServer() in the same JVM
		// would reuse a server that is already shut down
		sharedServer= null;
		// NOTE(review): presumably gives the shut down servers time to release their
		// resources before the next test class starts -- confirm
		Thread.sleep(1000);
	}
	reportErrors(exceptions);
}
/** Server used by the current test method; assigned in {@link #initServer()}. */
private JMPoolServer server= nonNullLateInit();
/** Creates the test instance; the server is assigned later by {@link #initServer()}. */
public JMPoolTest() throws Exception {
}
/**
 * Assigns the shared pool server to this test instance.
 * 
 * If no shared server exists yet, it is created with the name "JMPoolTest", configured
 * with an embedded registry on localhost and the "default" node configuration, and
 * registered for disposal in {@link #disposeServers()}.
 */
@BeforeEach
public void initServer() throws Exception {
	JMPoolServer poolServer= sharedServer;
	if (poolServer == null) {
		final RJContext rjContext= ServiTests.getRJContext();

		final RServiNodeConfig rNodeConfig= new RServiNodeConfig();
		rNodeConfig.load(getEnvConfiguration("default"));

		poolServer= new JMPoolServer("JMPoolTest", rjContext);

		final NetConfig net= new NetConfig();
		net.setRegistryEmbed(true);
		net.setHostAddress("localhost");
		net.setRegistryPort(ServiTests.getRegistryPort());

		poolServer.setNetConfig(net);
		poolServer.setNodeConfig(rNodeConfig);

		// remember the server for cleanup and for reuse by the following tests
		allServers.add(poolServer);
		sharedServer= poolServer;
	}
	this.server= poolServer;
}
/**
 * Disposes the RServi instances of the test and stops the server.
 * 
 * Errors of both steps are collected and passed to {@code reportErrors} together.
 */
@AfterEach
public void stopServer() throws Exception {
	final List<Throwable> errors= new ArrayList<>();

	disposeServis(errors);

	final JMPoolServer current= this.server;
	// the field can still be unset if initServer() failed
	if (current != null) {
		try {
			current.stop();
		}
		catch (final Throwable e) {
			errors.add(e);
		}
	}

	reportErrors(errors);
}
/**
 * Requests an RServi from the pool of the test server and passes it to {@code onServiGet}.
 * 
 * @param id the id passed to {@code RServiUtils.getRServi} together with the pool address
 * @return the RServi instance
 */
private RServi getServi(final String id) throws NoSuchElementException, LoginException, StatusException {
	final var poolAddress= nonNullAssert(this.server.getPoolAddress());
	final RServi result= RServiUtils.getRServi(poolAddress, id);
	onServiGet(result);
	return result;
}
/**
 * Starts the server with a new {@code PoolConfig}, obtains an RServi, checks that its
 * node is operative and closes it.
 */
@Test
public void test1() throws Exception {
	this.server.setPoolConfig(new PoolConfig());
	this.server.start();

	final RServi servi= getServi("test1");
	assertNodeOperative(servi);

	closeServi(servi);
}
/**
 * Checks {@code RServi.isClosed()} before and after closing, and that using the closed
 * RServi fails with a {@code StatusException} of severity ERROR whose message contains
 * "closed".
 */
@Test
public void RServi_closed() throws Exception {
	final PoolConfig poolConfig= new PoolConfig();
	this.server.setPoolConfig(poolConfig);
	this.server.start();

	final RServi servi1= getServi("test1");
	assertNodeOperative(servi1);
	assertEquals(false, servi1.isClosed());

	closeServi(servi1);
	assertEquals(true, servi1.isClosed());

	// same idiom as the other tests of this class instead of try / throw AssertionError / catch
	final StatusException e= assertThrows(StatusException.class,
			() -> assertNodeOperative(servi1),
			"not closed" );
	final Status eStatus= e.getStatus();
	assertEquals(Status.ERROR, eStatus.getSeverity());
	assertTrue(eStatus.getMessage().contains("closed"));
}
/**
 * Checks the number of idling nodes with a new, unmodified {@code PoolConfig}:
 * 1 after start, 1 while an RServi is in use, 2 after that RServi is closed.
 */
@Test
public void IdleNodes_default() throws Exception {
	this.server.setPoolConfig(new PoolConfig());
	this.server.start();

	assertIdleCount(1);

	final RServi servi= getServi("test1");
	assertIdleCount(1);
	assertNodeOperative(servi);

	closeServi(servi);
	assertIdleCount(2);
}
/**
 * Checks that the number of idling nodes follows a change of the min idle count
 * (from the value of a new {@code PoolConfig} to 2) while the server is running.
 */
@Test
public void IdleNodes_ensureMin() throws Exception,
		InterruptedException {
	final PoolConfig config= new PoolConfig();
	this.server.setPoolConfig(config);
	this.server.start();
	assertIdleCount(1);

	// apply the changed configuration to the running server
	config.setMinIdleCount(2);
	this.server.setPoolConfig(config);
	assertIdleCount(2);
}
/**
 * Allocates two RServi with max total count 2 and checks, after waiting
 * {@code WAIT_IDLE_DURATION}, that no node is idling.
 */
@Test
public void TotalNodes_allocateMax() throws Exception,
		InterruptedException {
	final PoolConfig config= new PoolConfig();
	config.setMaxTotalCount(2);
	this.server.setPoolConfig(config);
	this.server.start();

	final RServi first= getServi("test1");
	final RServi second= getServi("test2");
	assertNodeOperative(first);
	assertNodeOperative(second);

	final long waitMillis= WAIT_IDLE_DURATION.toMillis();
	Thread.sleep(waitMillis);
	assertIdleCount(0);

	closeServi(first);
	closeServi(second);
}
/**
 * Checks the allocation timeout with max total count 2: the second RServi must be
 * available in less than allocation timeout plus {@code NODE_STARTUP_DURATION}; the
 * request for a third RServi must fail with {@code NoSuchElementException} after about
 * the allocation timeout (±500 ms).
 */
@Test
public void TotalNodes_allocateTimeout() throws Exception {
	final PoolConfig poolConfig= new PoolConfig();
	poolConfig.setMaxTotalCount(2);
	this.server.setPoolConfig(poolConfig);
	this.server.start();
	final var timeout= nonNullAssert(poolConfig.getAllocationTimeout());

	final RServi servi1= getServi("test1");
	assertNotNull(servi1);

	final long t2= System.nanoTime();
	final RServi servi2= getServi("test2");
	assertNotNull(servi2);
	{	final Duration d2= Duration.ofNanos(System.nanoTime() - t2);
		final Duration dMax= timeout.plus(NODE_STARTUP_DURATION);
		// format the durations as milliseconds; Duration.toString() would give "PT..S"
		assertTrue(d2.compareTo(dMax) < 0, () ->
				String.format("duration expected: < %1$dms, actual: %2$dms",
						dMax.toMillis(), d2.toMillis() ));
	}

	final long t3= System.nanoTime();
	try {
		final RServi servi3= getServi("test3");
		assertNotNull(servi3);
		throw new AssertionError("totalCount");
	}
	catch (final NoSuchElementException e) {
		final Duration d3= Duration.ofNanos(System.nanoTime() - t3);
		final Duration dExpected= timeout;
		final double deltaMillis= 500;
		assertDurationEquals(dExpected, d3, deltaMillis, () ->
				String.format("duration expected: %1$dms (±%3$sms), actual: %2$dms",
						dExpected.toMillis(), d3.toMillis(), deltaMillis ));
	}
}
/**
 * With max total count 1 and max usage count 1, allocates two RServi one after the
 * other and checks that {@code Sys.getpid()} returns different process ids for them.
 */
@Test
public void UsageCount_allocateMax() throws Exception {
	final ProgressMonitor m1= new NullProgressMonitor();
	final ProgressMonitor m2= new NullProgressMonitor();

	final PoolConfig poolConfig= new PoolConfig();
	poolConfig.setMaxTotalCount(1);
	poolConfig.setMaxUsageCount(1);
	this.server.setPoolConfig(poolConfig);
	this.server.start();

	final RServi servi1= getServi("test1");
	assertNodeOperative(servi1, m1);
	final int pid1= RDataUtils.checkSingleIntValue(servi1.evalData("Sys.getpid()", m1));
	closeServi(servi1);

	final RServi servi2= getServi("test2");
	assertNodeOperative(servi2, m2);
	final int pid2= RDataUtils.checkSingleIntValue(servi2.evalData("Sys.getpid()", m2));
	// assertNotEquals reports the offending pid on failure, unlike assertTrue(pid1 != pid2)
	assertNotEquals(pid1, pid2);
	closeServi(servi2);
}
/**
 * Sets the client allocation renew period property to 15000 ms and allocates the single
 * node. Waits until the node leaves the state ALLOCATED or the wait times out, lets a
 * CHECKING state pass, and then expects the node to be ALLOCATED and still operative.
 * The system property is cleared at the end.
 */
@Test
public void ClientCheck_alive() throws Exception {
	final int renewPeriodMillis= 15_000;
	System.setProperty(CLIENT_ALLOCATION_RENEW_PERIOD_MILLIS_PROPERTY_KEY,
			Integer.toString(renewPeriodMillis) );
	try {
		final PoolConfig config= new PoolConfig();
		config.setMaxTotalCount(1);
		this.server.setPoolConfig(config);
		this.server.start();

		final RServi servi= getServi("test1");
		assertNodeOperative(servi);
		final PoolNodeObject node= assertSinglePoolNode();

		final Duration maxWait= Duration.ofMillis((long)(1.2 * renewPeriodMillis))
				.plus(WAIT_IDLE_DURATION);
		awaitStateCondition(node, (state) -> (state != PoolNodeState.ALLOCATED), maxWait);
		skipChecking(node);

		assertEquals(PoolNodeState.ALLOCATED, node.getState());
		assertNodeOperative(servi);
	}
	finally {
		System.clearProperty(CLIENT_ALLOCATION_RENEW_PERIOD_MILLIS_PROPERTY_KEY);
	}
}
/**
 * Sets the client allocation renew period property to 15000 ms, allocates the single
 * node and cancels the job read from the field "periodicCheckJob" of the RServi's
 * client. Expects the node to leave the state ALLOCATED, to become disposed, and the
 * RServi to fail with a {@code StatusException}. The system property is cleared at the end.
 */
@Test
public void ClientCheck_dead() throws Exception {
	final int renewPeriodMillis= 15_000;
	System.setProperty(CLIENT_ALLOCATION_RENEW_PERIOD_MILLIS_PROPERTY_KEY,
			Integer.toString(renewPeriodMillis) );
	try {
		final PoolConfig config= new PoolConfig();
		config.setMaxTotalCount(1);
		this.server.setPoolConfig(config);
		this.server.start();

		final RServi servi= getServi("test1");
		assertNodeOperative(servi);
		final PoolNodeObject node= assertSinglePoolNode();

		// reach into the client by reflection and cancel its periodic check job
		final Accessor serviAccessor= new Accessor(servi, RServiImpl.class);
		final Accessor clientAccessor= new Accessor(serviAccessor.getFieldObj("rjs"), AbstractRJComClient.class);
		final ScheduledFuture<?> checkJob= (ScheduledFuture<?>)clientAccessor.getFieldObj("periodicCheckJob");
		checkJob.cancel(false);

		final Duration maxWait= Duration.ofMillis((long)(1.2 * renewPeriodMillis))
				.plus(WAIT_IDLE_DURATION);
		awaitStateCondition(node, (state) -> (state != PoolNodeState.ALLOCATED), maxWait);
		skipChecking(node);

		assertDisposingToDisposed(node);
		assertThrows(StatusException.class, () -> assertNodeOperative(servi));
	}
	finally {
		System.clearProperty(CLIENT_ALLOCATION_RENEW_PERIOD_MILLIS_PROPERTY_KEY);
	}
}
/**
 * Sets the client allocation renew period property to "-1", allocates the single node,
 * checks that it is operative and closes the RServi. The system property is cleared at
 * the end.
 */
@Test
public void ClientCheck_disabled() throws Exception {
	System.setProperty(CLIENT_ALLOCATION_RENEW_PERIOD_MILLIS_PROPERTY_KEY, "-1");
	try {
		final PoolConfig config= new PoolConfig();
		config.setMaxTotalCount(1);
		this.server.setPoolConfig(config);
		this.server.start();

		final RServi servi= getServi("test1");
		assertNodeOperative(servi);

		// disabled part of the test, kept as found:
//		final PoolNodeObject node1= assertSinglePoolNode();
//
//		awaitStateCondition(node1, (state) -> (state != PoolNodeState.ALLOCATED),
//				(int)(1.2 * APool2.CLIENT_ALLOCATION_RENEW_PERIOD_MILLIS_DEFAULT_VALUE) + WAIT_IDLE_MILLIS );
//		skipChecking(node1);
//		assertEquals(PoolNodeState.ALLOCATED, node1.getState());
//		assertNodeOperative(servi1);

		closeServi(servi);
	}
	finally {
		System.clearProperty(CLIENT_ALLOCATION_RENEW_PERIOD_MILLIS_PROPERTY_KEY);
	}
}
/**
 * Destroys the process of the single node while a client evaluates an endless loop.
 * 
 * Expects the client call to end with a {@code StatusException} within one second, the
 * node to leave the state ALLOCATED, and finally to become disposed.
 */
@Test
public void handleCrashedEngine() throws Exception {
	final PoolConfig config= new PoolConfig();
	config.setMaxTotalCount(1);
	this.server.setPoolConfig(config);
	this.server.start();

	final RServi servi= getServi("test1");
	final ProgressMonitor monitor= new NullProgressMonitor();
	assertNodeOperative(servi, monitor);

	final PoolNodeObject node= assertSinglePoolNode();
	assertEquals(PoolNodeState.ALLOCATED, node.getState());

	// 0= worker not yet started, 1= worker started, 2= worker finished
	final var workerPhase= new AtomicInteger();
	final var worker= CompletableFuture.supplyAsync(() -> {
		workerPhase.set(1);
		try {
			servi.evalVoid("while (true) { Sys.sleep(1) }", monitor);
			return null;
		}
		catch (final StatusException e) {
			return e;
		}
		finally {
			workerPhase.set(2);
		}
	});
	while (workerPhase.get() == 0) {
		Thread.sleep(100);
	}

	// read the process from the node handler by reflection and destroy it
	final Accessor handlerAccessor= new Accessor(node.getNodeHandler(), NodeHandler.class);
	final Process engineProcess= nonNullAssert((Process)handlerAccessor.getFieldObj("process"));
	engineProcess.destroy();

	final var workerResult= worker.get(1, TimeUnit.SECONDS);
	assertTrue((workerResult instanceof StatusException));
	if (!servi.isClosed()) {
		closeServi(servi);
	}

	assertNotEquals(PoolNodeState.ALLOCATED, node.getState());
	skipChecking(node);
	assertDisposingToDisposed(node);
}
/**
 * Checks {@code PoolNodeObject.getState()} of the single node: IDLING after start,
 * ALLOCATED while an RServi is in use, not ALLOCATED after close, IDLING again after a
 * possible CHECKING state, and disposed after {@code evict(null)}.
 */
@Test
public void PoolNode_getState() throws Exception {
	final PoolConfig config= new PoolConfig();
	config.setMaxTotalCount(1);
	config.setMinIdleCount(1);
	this.server.setPoolConfig(config);
	this.server.start();

	assertIdleCount(1);
	final PoolNodeObject node= assertSinglePoolNode();
	assertEquals(PoolNodeState.IDLING, node.getState());

	final RServi servi= getServi("test1");
	assertEquals(PoolNodeState.ALLOCATED, node.getState());

	closeServi(servi);
	assertNotEquals(PoolNodeState.ALLOCATED, node.getState());
	skipChecking(node);
	assertEquals(PoolNodeState.IDLING, node.getState());

	node.evict(null);
	assertDisposingToDisposed(node);
}
/**
 * Checks {@code PoolNodeObject.getClientLabel()} of the single node: null while idling,
 * starting with "test1@" while allocated by the client "test1", and null again after close.
 */
@Test
public void PoolNode_getClientLabel() throws Exception {
	final PoolConfig config= new PoolConfig();
	config.setMaxTotalCount(1);
	config.setMinIdleCount(1);
	this.server.setPoolConfig(config);
	this.server.start();

	assertIdleCount(1);
	final PoolNodeObject node= assertSinglePoolNode();
	assertNull(node.getClientLabel());

	final RServi servi= getServi("test1");
	final String label= node.getClientLabel();
	assertTrue(label != null && label.startsWith("test1@"), "actual label: " + label);

	closeServi(servi);
	assertNull(node.getClientLabel());
}
/**
 * Checks {@code PoolNodeObject.getAllocationCount()} of the single node: 0 before the
 * first allocation, 1 while allocated, and still 1 after the RServi is closed.
 */
@Test
public void PoolNode_getAllocationCount() throws Exception {
	final PoolConfig config= new PoolConfig();
	config.setMaxTotalCount(1);
	config.setMinIdleCount(1);
	this.server.setPoolConfig(config);
	this.server.start();

	assertIdleCount(1);
	final PoolNodeObject node= assertSinglePoolNode();
	assertEquals(0, node.getAllocationCount(), "count");

	final RServi servi= getServi("test1");
	assertEquals(1, node.getAllocationCount(), "count");

	closeServi(servi);
	assertEquals(1, node.getAllocationCount(), "count");
}
/**
 * Checks {@code PoolNodeObject.getLastestAllocationDuration()} of the single node:
 * null before the first allocation; while allocated, equal to the time elapsed since
 * just before the allocation (checked directly after allocation and again 3 s later);
 * after close, equal to the time between allocation and close. All comparisons use
 * {@code TIME_TOLERANCE_MILLIS}.
 */
@Test
public void PoolNode_getLastestAllocationDuration() throws Exception {
	final PoolConfig config= new PoolConfig();
	config.setMaxTotalCount(1);
	config.setMinIdleCount(1);
	this.server.setPoolConfig(config);
	this.server.start();

	assertIdleCount(1);
	final PoolNodeObject node= assertSinglePoolNode();
	final Duration beforeAllocation= node.getLastestAllocationDuration();
	assertNull(beforeAllocation, "duration");

	final long startNanos= System.nanoTime();
	final RServi servi= getServi("test1");
	final Duration afterAllocation= node.getLastestAllocationDuration();
	assertNotNull(afterAllocation);
	assertDurationEquals(Duration.ofNanos(System.nanoTime() - startNanos), afterAllocation,
			TIME_TOLERANCE_MILLIS, "duration" );

	Thread.sleep(3000);
	final Duration whileAllocated= node.getLastestAllocationDuration();
	assertNotNull(whileAllocated);
	assertDurationEquals(Duration.ofNanos(System.nanoTime() - startNanos), whileAllocated,
			TIME_TOLERANCE_MILLIS, "duration" );

	// take the end time before closing; it is the reference for the final duration
	final long endNanos= System.nanoTime();
	closeServi(servi);
	final Duration afterClose= node.getLastestAllocationDuration();
	assertNotNull(afterClose);
	assertDurationEquals(Duration.ofNanos(endNanos - startNanos), afterClose,
			TIME_TOLERANCE_MILLIS, "duration" );
}
/**
 * Evicts the single node after its RServi was closed and expects the node to become
 * disposed.
 */
@Test
public void PoolNode_evict() throws Exception {
	final PoolConfig config= new PoolConfig();
	config.setMinIdleCount(0);
	this.server.setPoolConfig(config);
	this.server.start();

	final RServi servi= getServi("test1");
	assertNodeOperative(servi);
	final PoolNodeObject node= assertSinglePoolNode();
	closeServi(servi);

	node.evict(null);
	skipChecking(node);
	assertDisposingToDisposed(node);
}
/**
 * Evicts the single node while it is in the state ALLOCATED and expects the node to
 * become disposed and closing the RServi to fail with a {@code StatusException}.
 */
@Test
public void PoolNode_evictAllocated() throws Exception {
	final PoolConfig config= new PoolConfig();
	config.setMinIdleCount(0);
	this.server.setPoolConfig(config);
	this.server.start();

	final RServi servi= getServi("test1");
	assertNodeOperative(servi);
	final PoolNodeObject node= assertSinglePoolNode();
	assertEquals(PoolNodeState.ALLOCATED, node.getState());

	node.evict(null);
	skipChecking(node);
	assertDisposingToDisposed(node);

	assertThrows(StatusException.class, () -> closeServi(servi));
}
/**
 * Evicts the single node while a client evaluates an endless loop on it.
 * 
 * Expects the node to become disposed, the client call to end with a
 * {@code StatusException} within one second, and closing the RServi to fail with a
 * {@code StatusException}.
 */
@Test
public void PoolNode_evictBusy() throws Exception {
	final PoolConfig config= new PoolConfig();
	config.setMinIdleCount(0);
	this.server.setPoolConfig(config);
	this.server.start();

	final RServi servi= getServi("test1");
	final ProgressMonitor monitor= new NullProgressMonitor();
	assertNodeOperative(servi, monitor);

	// 0= worker not yet started, 1= worker started, 2= worker finished
	final var workerPhase= new AtomicInteger();
	final var worker= CompletableFuture.supplyAsync(() -> {
		workerPhase.set(1);
		try {
			servi.evalVoid("while (true) { Sys.sleep(1) }", monitor);
			return null;
		}
		catch (final StatusException e) {
			return e;
		}
		finally {
			workerPhase.set(2);
		}
	});
	while (workerPhase.get() == 0) {
		Thread.sleep(100);
	}

	final PoolNodeObject node= assertSinglePoolNode();
	assertEquals(PoolNodeState.ALLOCATED, node.getState());

	node.evict(null);
	skipChecking(node);
	assertDisposingToDisposed(node);

	final var workerResult= worker.get(1, TimeUnit.SECONDS);
	assertTrue((workerResult instanceof StatusException));
	assertThrows(StatusException.class, () -> closeServi(servi));
}
/**
 * Destroys the process of the single node, then evicts the node. Expects the node to
 * become disposed and closing the RServi to fail with a {@code StatusException}.
 */
@Test
public void PoolNode_evictCrashedEngine() throws Exception {
	final PoolConfig config= new PoolConfig();
	config.setMinIdleCount(0);
	this.server.setPoolConfig(config);
	this.server.start();

	final RServi servi= getServi("test1");
	assertNodeOperative(servi);
	final PoolNodeObject node= assertSinglePoolNode();

	// read the process from the node handler by reflection and destroy it
	final Accessor handlerAccessor= new Accessor(node.getNodeHandler(), NodeHandler.class);
	final Process engineProcess= nonNullAssert((Process)handlerAccessor.getFieldObj("process"));
	engineProcess.destroy();

	node.evict(null);
	skipChecking(node);
	assertDisposingToDisposed(node);

	assertThrows(StatusException.class, () -> closeServi(servi));
}
/**
 * Starts the server with max total count 2 and min idle count 2, obtains an RServi,
 * checks that its node is operative and closes it.
 */
@Test
public void Bug570660() throws Exception {
	final PoolConfig config= new PoolConfig();
	config.setMaxTotalCount(2);
	config.setMinIdleCount(2);
	this.server.setPoolConfig(config);
	this.server.start();

	final RServi servi= getServi("test1");
	assertNodeOperative(servi);

	closeServi(servi);
}
/**
 * Asserts that the manager of the server has exactly one pool node object.
 * 
 * @return that single pool node object
 */
private PoolNodeObject assertSinglePoolNode() {
	final var manager= this.server.getManager();
	final var nodes= manager.getPoolNodeObjects();
	assertEquals(1, nodes.size());
	return nodes.iterator().next();
}
/**
 * Polls the pool status every 100 ms until the number of idling nodes equals the
 * expected value; once {@code WAIT_IDLE_DURATION} has elapsed, the value is asserted
 * instead of only compared.
 * 
 * @param expected the expected number of idling nodes
 */
private void assertIdleCount(final int expected) throws InterruptedException {
	final long startNanos= System.nanoTime();
	for (;;) {
		final PoolStatusMX status= this.server.getPoolStatus();
		if (status.getNumIdling() == expected) {
			return;
		}
		final boolean timedOut= (System.nanoTime() - startNanos > WAIT_IDLE_DURATION.toNanos());
		if (timedOut) {
			assertEquals(expected, status.getNumIdling(), "num idle");
		}
		Thread.sleep(100);
	}
}
/**
 * Waits up to {@code WAIT_IDLE_DURATION} until the node is no longer in the state CHECKING.
 * 
 * @param node the pool node to observe
 */
private void skipChecking(final PoolNodeObject node) throws InterruptedException {
	final Predicate<PoolNodeState> notChecking= (state) -> (state != PoolNodeState.CHECKING);
	awaitStateCondition(node, notChecking, WAIT_IDLE_DURATION);
}
/**
 * Polls the state of the node every 100 ms until it fulfills the condition or the
 * timeout has elapsed. The method returns normally in both cases.
 * 
 * @param node the pool node to observe
 * @param condition the condition the node state is tested against
 * @param timeout the maximum time to wait
 */
private void awaitStateCondition(final PoolNodeObject node,
		final Predicate<PoolNodeState> condition, final Duration timeout) throws InterruptedException {
	final long startNanos= System.nanoTime();
	while (!condition.test(node.getState())) {
		if (System.nanoTime() - startNanos > timeout.toNanos()) {
			// timed out: no failure here, just stop waiting
			return;
		}
		Thread.sleep(100);
	}
}
/**
 * Asserts that the node is in the state DISPOSED, or is in the state DISPOSING and
 * reaches DISPOSED after waiting up to {@code WAIT_IDLE_DURATION}.
 * 
 * @param node the pool node to check
 */
private void assertDisposingToDisposed(final PoolNodeObject node) throws InterruptedException {
	final PoolNodeState initialState= node.getState();
	if (initialState != PoolNodeState.DISPOSED) {
		assertEquals(PoolNodeState.DISPOSING, initialState, "state");

		awaitStateCondition(node, (state) -> (state != PoolNodeState.DISPOSING), WAIT_IDLE_DURATION);
		assertEquals(PoolNodeState.DISPOSED, node.getState(), "state");
	}
}
/**
 * Asserts that two durations, converted to whole milliseconds, are equal within the
 * given delta.
 * 
 * @param expected the expected duration
 * @param actual the actual duration
 * @param deltaMillis the allowed difference in milliseconds
 * @param message the failure message
 */
private void assertDurationEquals(final Duration expected, final Duration actual,
		final double deltaMillis, final String message) {
	final long expectedMillis= expected.toMillis();
	final long actualMillis= actual.toMillis();
	assertEquals(expectedMillis, actualMillis, deltaMillis, message);
}
/**
 * Asserts that two durations, converted to whole milliseconds, are equal within the
 * given delta.
 * 
 * @param expected the expected duration
 * @param actual the actual duration
 * @param deltaMillis the allowed difference in milliseconds
 * @param messageSupplier the supplier of the failure message
 */
private void assertDurationEquals(final Duration expected, final Duration actual,
		final double deltaMillis, final Supplier<String> messageSupplier) {
	final long expectedMillis= expected.toMillis();
	final long actualMillis= actual.toMillis();
	assertEquals(expectedMillis, actualMillis, deltaMillis, messageSupplier);
}
}