blob: 449bb4d7c914bd1900b0c1bc5c79396a9c6e1fa5 [file] [log] [blame]
/*
* Copyright (c) 2020 Kentyou.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Kentyou - initial API and implementation
*/
package io.moquette.spi.impl;
import io.moquette.BrokerConstants;
import io.moquette.interception.InterceptHandler;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.IResourceLoader;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.ISessionsStore;
import io.moquette.spi.impl.security.ACLFileParser;
import io.moquette.spi.impl.security.AcceptAllAuthenticator;
import io.moquette.spi.impl.security.DenyAllAuthorizator;
import io.moquette.spi.impl.security.PermitAllAuthorizator;
import io.moquette.spi.impl.security.ResourceAuthenticator;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.impl.subscriptions.SubscriptionsStore;
import io.moquette.spi.persistence.MapDBPersistentStore;
import io.moquette.spi.security.IAuthenticator;
import io.moquette.spi.security.IAuthorizator;
import org.osgi.framework.BundleContext;
import org.sensinact.mqtt.server.internal.SensiNactServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
/**
* It's main responsibility is bootstrap the ProtocolProcessor.
*
* @author andrea
*/
public class SensiNactProtocolProcessorBootstrapper {
private static final Logger LOG = LoggerFactory.getLogger(SensiNactProtocolProcessorBootstrapper.class);
private SubscriptionsStore subscriptions;
private MapDBPersistentStore m_mapStorage;
private ISessionsStore m_sessionsStore;
private BrokerInterceptor m_interceptor;
private final ProtocolProcessor m_processor;
public SensiNactProtocolProcessorBootstrapper(BundleContext bundleContext) {
this.m_processor = new SensiNactProtocolProcessor(bundleContext);
}
/**
* Initialize the processing part of the broker.
*
* @param props the properties carrier where some props like port end host could be loaded.
* For the full list check of configurable properties check moquette.conf file.
* @param embeddedObservers a list of callbacks to be notified of certain events inside the broker.
* Could be empty list of null.
* @param authenticator an implementation of the authenticator to be used, if null load that specified in config
* and fallback on the default one (permit all).
* @param authorizator an implementation of the authorizator to be used, if null load that specified in config
* and fallback on the default one (permit all).
* @param server the serber to init.
* @return the processor created for the broker.
*/
public ProtocolProcessor init(IConfig props, List<? extends InterceptHandler> embeddedObservers, IAuthenticator authenticator, IAuthorizator authorizator, SensiNactServer server) {
subscriptions = new SubscriptionsStore();
m_mapStorage = new MapDBPersistentStore(props);
m_mapStorage.initStore();
IMessagesStore messagesStore = m_mapStorage.messagesStore();
m_sessionsStore = m_mapStorage.sessionsStore();
List<InterceptHandler> observers = new ArrayList<InterceptHandler>(embeddedObservers);
String interceptorClassName = props.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
if (interceptorClassName != null && !interceptorClassName.isEmpty()) {
try {
InterceptHandler handler;
try {
final Constructor<? extends InterceptHandler> constructor = Class.forName(interceptorClassName).asSubclass(InterceptHandler.class).getConstructor(SensiNactServer.class);
handler = constructor.newInstance(server);
} catch (NoSuchMethodException nsme) {
handler = Class.forName(interceptorClassName).asSubclass(InterceptHandler.class).newInstance();
}
observers.add(handler);
} catch (Throwable ex) {
LOG.error("Can't load the intercept handler {}", ex);
}
}
m_interceptor = new BrokerInterceptor(observers);
subscriptions.init(m_sessionsStore);
String configPath = System.getProperty("moquette.path", null);
String authenticatorClassName = props.getProperty(BrokerConstants.AUTHENTICATOR_CLASS_NAME, "");
if (!authenticatorClassName.isEmpty()) {
authenticator = (IAuthenticator) loadClass(authenticatorClassName, IAuthenticator.class, props);
LOG.info("Loaded custom authenticator {}", authenticatorClassName);
}
IResourceLoader resourceLoader = props.getResourceLoader();
if (authenticator == null) {
String passwdPath = props.getProperty(BrokerConstants.PASSWORD_FILE_PROPERTY_NAME, "");
if (passwdPath.isEmpty()) {
authenticator = new AcceptAllAuthenticator();
} else {
authenticator = new ResourceAuthenticator(resourceLoader, passwdPath);
}
}
String authorizatorClassName = props.getProperty(BrokerConstants.AUTHORIZATOR_CLASS_NAME, "");
if (!authorizatorClassName.isEmpty()) {
authorizator = (IAuthorizator) loadClass(authorizatorClassName, IAuthorizator.class, props);
LOG.info("Loaded custom authorizator {}", authorizatorClassName);
}
if (authorizator == null) {
String aclFilePath = props.getProperty(BrokerConstants.ACL_FILE_PROPERTY_NAME, "");
if (aclFilePath != null && !aclFilePath.isEmpty()) {
authorizator = new DenyAllAuthorizator();
try {
authorizator = ACLFileParser.parse(resourceLoader.loadResource(aclFilePath));
} catch (ParseException pex) {
LOG.error(String.format("Format error in parsing acl %s %s", resourceLoader.getName(), aclFilePath), pex);
}
LOG.info("Using acl file defined at path {}", aclFilePath);
} else {
authorizator = new PermitAllAuthorizator();
LOG.info("Starting without ACL definition");
}
}
boolean allowAnonymous = Boolean.parseBoolean(props.getProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, "true"));
boolean allowZeroByteClientId = Boolean.parseBoolean(props.getProperty(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, "false"));
m_processor.init(subscriptions, messagesStore, m_sessionsStore, authenticator, allowAnonymous, allowZeroByteClientId, authorizator, m_interceptor, props.getProperty(BrokerConstants.PORT_PROPERTY_NAME));
return m_processor;
}
private Object loadClass(String className, Class<?> cls, IConfig props) {
Object instance = null;
try {
Class<?> clazz = Class.forName(className);
// check if method getInstance exists
Method method = clazz.getMethod("getInstance", new Class[]{});
try {
instance = method.invoke(null, new Object[]{});
} catch (IllegalArgumentException ex) {
LOG.error(null, ex);
throw new RuntimeException("Cannot call method " + className + ".getInstance", ex);
} catch (InvocationTargetException ex) {
LOG.error(null, ex);
throw new RuntimeException("Cannot call method " + className + ".getInstance", ex);
} catch (IllegalAccessException ex) {
LOG.error(null, ex);
throw new RuntimeException("Cannot call method " + className + ".getInstance", ex);
}
} catch (NoSuchMethodException nsmex) {
try {
// check if constructor with IConfig parameter exists
instance = this.getClass().getClassLoader().loadClass(className).asSubclass(cls).getConstructor(IConfig.class).newInstance(props);
} catch (Exception e) {
try {
// fallback to default constructor
instance = this.getClass().getClassLoader().loadClass(className).asSubclass(cls).newInstance();
} catch (Exception ex) {
LOG.error(null, ex);
throw new RuntimeException("Cannot load custom authenticator class " + className, ex);
}
}
} catch (ClassNotFoundException ex) {
LOG.error(null, ex);
throw new RuntimeException("Class " + className + " not found", ex);
} catch (SecurityException ex) {
LOG.error(null, ex);
throw new RuntimeException("Cannot call method " + className + ".getInstance", ex);
}
return instance;
}
public List<Subscription> getSubscriptions() {
return m_sessionsStore.getSubscriptions();
}
public void shutdown() {
this.m_mapStorage.close();
}
}