blob: a33c13653f956d69fd0a43eaf542fa93804c1d9e [file] [log] [blame]
/****************************************************************************
* Copyright (c) 2005, 2010 Jan S. Rellermeyer, Systems Group,
* Department of Computer Science, ETH Zurich and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Jan S. Rellermeyer - initial API and implementation
* Markus Alexander Kuppe - enhancements and bug fixes
*
*****************************************************************************/
package ch.ethz.iks.slp.impl;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.ProtocolException;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import ch.ethz.iks.slp.ServiceLocationException;
import ch.ethz.iks.slp.ServiceType;
/**
* the core class of the jSLP implementation.
* <code>ch.ethz.iks.slp.ServiceLocationManager</code> inherits from this
* class.
*
* @see ch.ethz.iks.slp.impl.ServiceLocationManager
* @author Jan S. Rellermeyer, IKS, ETH Zurich
* @since 0.6
*/
public abstract class SLPCore {
private static volatile boolean isMulticastSocketInitialized = false;
private static volatile boolean isInitialized = false;
protected static PlatformAbstraction platform;
/**
* the default empty locale. Used for messages that don't specify a locale.
*/
static final Locale DEFAULT_LOCALE = Locale.getDefault();
/**
* the port for SLP communication.
*/
static final int SLP_PORT;
/**
* the reserved (standard) port.
*/
static final int SLP_RESERVED_PORT = 427;
/**
* the standard SLP multicast address.
*/
static final String SLP_MCAST_ADDRESS = "239.255.255.253";
/**
*
*/
static final InetAddress MCAST_ADDRESS;
/**
* the SLP configuration.
*/
static final SLPConfiguration CONFIG;
/**
* currently only for debugging.
*/
static final boolean TCP_ONLY = false;
/**
* the standard service type for DAs.
*/
static final String SLP_DA_TYPE = "service:directory-agent";
/**
* my own ip. Used to check if this peer is already in the previous
* responder list.
*/
static String[] myIPs;
/**
* configured to perform no DA discovery ?
*/
static final boolean noDiscovery;
/**
* the constructor for <code>Advertiser</code> instances, if an
* implementation exists.
*/
protected static final Constructor advertiser;
/**
* the constructor for <code>Locator</code> instances, if an
* implementation exists.
*/
protected static final Constructor locator;
/**
* the constructor for <code>SLPDaemon</code> instances, if an
* implementation exists.
*/
private static final Constructor daemonConstr;
/**
* the daemon instance, if the implementation exists and no other daemon is
* already running on the machine.
*/
private static SLPDaemon daemon;
/**
* the multicast server thread.
*/
private static Thread multicastThread;
/**
* the multicast socket.
*/
private static MulticastSocket mtcSocket;
/**
* the next free XID.
*/
private static short nextXid;
/**
* used to asynchronously receive replies. query XID -> reply queue (List)
*/
private static Map replyListeners = new HashMap();
/**
* Map of DAs:
*
* String scope -> list of Strings of DA URLs.
*/
static Map dAs = new HashMap();
/**
* Map of DA SPIs:
*
* String DA URL -> String String.
*/
static Map dASPIs = new HashMap(); // DA URL -> List of SPIs
static InetAddress LOCALHOST;
/**
* initialize the core class.
*/
static {
try {
LOCALHOST = InetAddress.getLocalHost();
} catch (Throwable t) {
t.printStackTrace();
}
final Class[] locale = new Class[] { Locale.class };
// check, if an Advertiser implementation is available
Constructor constr = null;
try {
constr = Class.forName("ch.ethz.iks.slp.impl.AdvertiserImpl")
.getConstructor(locale);
} catch (Exception e) {
}
advertiser = constr;
// check, if a Locator implementation is available
constr = null;
try {
constr = Class.forName("ch.ethz.iks.slp.impl.LocatorImpl")
.getConstructor(locale);
} catch (Exception e) {
}
locator = constr;
// check, if a Daemon is available
constr = null;
try {
constr = Class.forName("ch.ethz.iks.slp.impl.SLPDaemonImpl")
.getConstructor(null);
} catch (Exception e) {
}
daemonConstr = constr;
// read in the property file, if it exists
File propFile = new File("jslp.properties");
SLPConfiguration config;
try {
config = propFile.exists() ? new SLPConfiguration(propFile)
: new SLPConfiguration();
} catch (IOException e1) {
System.out.println("Could not parse the property file" + propFile.toString());
e1.printStackTrace();
config = new SLPConfiguration();
}
CONFIG = config;
noDiscovery = CONFIG.getNoDaDiscovery();
// determine the interfaces on which jSLP runs on
final List ips = new ArrayList();
String[] IPs = CONFIG.getInterfaces();
if (IPs == null) {
InetAddress[] addresses = null;
try {
addresses = InetAddress.getAllByName(InetAddress.getLocalHost()
.getHostName());
for (int i = 0; i < addresses.length; i++) {
// https://bugs.eclipse.org/328074
if(addresses[i] instanceof Inet6Address) {
System.err.println("No support for IPv6 in jSLP yet (see https://bugs.eclipse.org/328074), skipping interface...");
continue;
}
ips.add(addresses[i].getHostAddress());
}
} catch (UnknownHostException e) {
System.err
.println("Reverse lookup of host name failed. Running service discovery on localloop.");
try {
ips.clear();
ips.add(InetAddress.getLocalHost().getHostAddress());
} catch (UnknownHostException e1) {
e1.printStackTrace();
}
}
} else {
ips.addAll(Arrays.asList(IPs));
}
myIPs = (String[]) ips.toArray(new String[ips.size()]);
SLP_PORT = CONFIG.getPort();
// initialize the XID with a random number
nextXid = (short) Math.round(Math.random() * Short.MAX_VALUE);
InetAddress mcast = null;
try {
mcast = InetAddress.getByName(SLPCore.SLP_MCAST_ADDRESS);
} catch (UnknownHostException e1) {
e1.printStackTrace();
}
MCAST_ADDRESS = mcast;
}
protected static void init() {
if(isInitialized) {
return;
}
isInitialized = true;
platform.logDebug("jSLP is running on the following interfaces: "
+ java.util.Arrays.asList(myIPs));
platform.logDebug("jSLP is using port: " + SLP_PORT);
String[] daAddresses = CONFIG.getDaAddresses();
if (daAddresses == null) {
if (noDiscovery) {
throw new IllegalArgumentException(
"Configuration 'net.slp.noDaDiscovery=true' requires a non-empty list of preconfigured DAs");
}
} else {
try {
// process the preconfigured DAs
final ServiceRequest req = new ServiceRequest(new ServiceType(
SLP_DA_TYPE), null, null, null);
req.port = SLP_PORT;
for (int i = 0; i < daAddresses.length; i++) {
try {
req.address = InetAddress.getByName(daAddresses[i]);
DAAdvertisement daa = (DAAdvertisement) sendMessage(
req, true);
String[] scopes = (String[]) daa.scopeList
.toArray(new String[daa.scopeList.size()]);
for (int j = 0; j < scopes.length; j++) {
platform.logDebug("jSLP is adding DA, "
+ daAddresses[i] + " for the Scope, "
+ scopes[j]);
SLPUtils.addValue(dAs, scopes[i].toLowerCase(), daAddresses[i]);
}
} catch (ServiceLocationException e) {
platform.logWarning("Error communitcating with " + daAddresses[i], e);
} catch (UnknownHostException e) {
platform.logWarning("Unknown net.slp.DAAddresses address: " + daAddresses[i], e);
}
}
} catch (IllegalArgumentException ise) {
platform.logDebug("May never happen", ise);
}
}
if (!noDiscovery) {
// perform an initial lookup
try {
List scopes = new ArrayList();
scopes.add("default");
daLookup(scopes);
} catch (Exception e) {
platform.logError("Exception in initial DA lookup", e);
}
}
}
// a pure UA doesn't need a multicast listener which is only required by a SA or DA
protected static void initMulticastSocket() {
if(isMulticastSocketInitialized) {
return;
}
isMulticastSocketInitialized = true;
try {
mtcSocket = new MulticastSocket(SLP_PORT);
mtcSocket.setTimeToLive(CONFIG.getMcastTTL());
if (CONFIG.getInterfaces() != null) {
try {
mtcSocket.setInterface(InetAddress.getByName(myIPs[0]));
} catch (Throwable t) {
platform.logDebug("Setting multicast socket interface to " + myIPs[0] + " failed.",t);
}
}
mtcSocket.joinGroup(MCAST_ADDRESS);
} catch (BindException be) {
platform.logError(be.getMessage(), be);
throw new RuntimeException("You have to be root to open port "
+ SLP_PORT);
} catch (Exception e) {
platform.logError(e.getMessage(), e);
}
// setup and start the multicast thread
multicastThread = new Thread() {
public void run() {
DatagramPacket packet;
byte[] bytes = new byte[SLPCore.CONFIG.getMTU()];
while (true) {
try {
packet = new DatagramPacket(bytes, bytes.length);
mtcSocket.receive(packet);
final SLPMessage reply = handleMessage(SLPMessage
.parse(packet.getAddress(), packet.getPort(),
new DataInputStream(
new ByteArrayInputStream(packet
.getData())), false));
if (reply != null) {
final byte[] repbytes = reply.getBytes();
DatagramPacket datagramPacket = new DatagramPacket(
repbytes, repbytes.length, reply.address,
reply.port);
mtcSocket.send(datagramPacket);
platform.logDebug("SEND (" + reply.address
+ ":" + reply.port + ") "
+ reply.toString());
}
} catch (Exception e) {
platform
.logError(
"Exception in Multicast Receiver Thread",
e);
}
}
}
};
multicastThread.start();
// check, if there is already a SLP daemon runnung on port 427
// that can be either a jSLP daemon, or an OpenSLP daemon or something
// else. If not, try to start a new daemon instance.
if (daemonConstr != null) {
try {
daemon = (SLPDaemon) daemonConstr.newInstance(null);
} catch (Exception e) {
platform.logWarning("jSLP has not created a SLPDaemon", e);
daemon = null;
}
}
}
/**
* get my own IP.
*
* @return the own IP.
*/
static InetAddress getMyIP() {
try {
return InetAddress.getByName(myIPs[0]);
} catch (UnknownHostException e) {
platform.logError("Unknown net.slp.interfaces address: " + myIPs[0], e);
return null;
}
}
/**
* get the list of all available scopes.
*
* @return a List of all available scopes. RFC 2614 proposes
* <code>Vector</code> but jSLP returns <code>List</code>.
* @throws ServiceLocationException
* in case of an exception in the underlying framework.
*/
public static List findScopes() throws ServiceLocationException {
return new ArrayList(dAs.keySet());
}
/**
* handle incoming UDP messages.
*
* @param message
* the incoming message.
* @throws ServiceLocationException
* if something goes wrong.
*/
private static SLPMessage handleMessage(final SLPMessage message)
throws ServiceLocationException {
if (message == null) {
return null;
}
platform.logTraceMessage("RECEIVED (" + message.address + ":"
+ message.port + ") " + message);
switch (message.funcID) {
case SLPMessage.DAADVERT:
// drop message, if noDADiscovery is set
if (noDiscovery) {
platform.logTraceDrop("DROPPED (" + message.address + ":"
+ message.port + ") " + message.toString()
+ "(reason: noDADiscovery is set");
return null;
}
DAAdvertisement advert = (DAAdvertisement) message;
if (advert.errorCode != 0) {
platform.logTraceDrop("DROPPED DAADvertisement (" + advert.address + ":"
+ advert.port + ") " + advert.toString()
+ "(reason: " + advert.errorCode + " != 0");
return null;
}
if (advert.url != advert.address.getHostAddress()) {
advert.url = advert.address.getHostAddress();
}
// statelessBootTimestamp = 0 means DA is going down
if (advert.statelessBootTimestamp == 0) {
for (Iterator iter = advert.scopeList.iterator(); iter
.hasNext();) {
String scope = ((String) iter.next()).toLowerCase();
SLPUtils.removeValue(SLPCore.dAs, scope.toLowerCase(), advert.url);
dASPIs.remove(advert.url);
}
} else {
for (Iterator iter = advert.scopeList.iterator(); iter
.hasNext();) {
String scope = ((String) iter.next()).toLowerCase();
// If OpenSLP would strictly follow RFC 2608,
// it should only send a new statelessBootTimestamp
// if it was really rebooted or has lost
// registrations for other reasons.
// But it looks like OpenSLP sends a new sBT whenever
// it sends a DAADVERT so we will just reregister
// all of our services if we receive a new DAADVERT
// not caring for the sBT at all.
SLPUtils.addValue(SLPCore.dAs, scope, advert.url);
if (CONFIG.getSecurityEnabled()) {
dASPIs.put(advert.url, SLPMessage.stringToList(
advert.spi, ","));
}
}
synchronized (dAs) {
dAs.notifyAll();
}
// if there is a daemon instance, inform it about the discovered
// DA
if (daemon != null) {
daemon.newDaDiscovered(advert);
}
}
platform.logDebug("NEW DA LIST: " + dAs);
return null;
// reply messages
case SLPMessage.ATTRRPLY:
case SLPMessage.SRVRPLY:
case SLPMessage.SRVTYPERPLY:
synchronized (replyListeners) {
List queue = (List) replyListeners
.get(new Integer(message.xid));
if (queue != null) {
synchronized (queue) {
queue.add(message);
queue.notifyAll();
}
return null;
} else {
platform.logTraceReg("SRVTYPEREPLY recieved ("
+ message.address + ":" + message.port + ") "
+ message.toString()
+ " but not replyListeners present anymore");
}
}
return null;
// request messages
case SLPMessage.SRVRQST:
case SLPMessage.ATTRRQST:
case SLPMessage.SRVTYPERQST:
// silently drop messages where this peer is in the previous
// responder list
for (int i = 0; i < SLPCore.myIPs.length; i++) {
if (((RequestMessage) message).prevRespList
.contains(SLPCore.myIPs[i])) {
platform.logTraceDrop("DROPPED (" + message.address + ":"
+ message.port + ") " + message.toString()
+ "(udp multicast)");
return null;
}
}
// if we have a daemon instance, delegate the
// message to the daemon.
if (daemon != null) {
return daemon.handleMessage(message);
} else {
platform.logDebug("SRVTYPERQST recieved ("
+ message.address + ":" + message.port + ") "
+ message.toString()
+ " but no SLPDaemon to handle the message present");
return null;
}
default:
// if we have a daemon instance, delegate all other
// messages to the daemon.
if (daemon != null) {
return daemon.handleMessage(message);
} else {
platform.logDebug("A message recieved ("
+ message.address + ":" + message.port + ") "
+ message.toString()
+ " but no SLPDaemon to handle the message present");
return null;
}
}
}
/**
* get the next XID.
*
* @return the next XID.
*/
static short nextXid() {
if (nextXid == 0) {
nextXid = 1;
}
return nextXid++;
}
/**
* find DAs for the scopes by sending a multicast service request for
* service <i>service:directory-agent</i>.
*
* @param scopes
* a <code>List</code> of scopes.
* @throws ServiceLocationException
* in case of network errors.
*/
static void daLookup(final List scopes) throws ServiceLocationException {
int i = 0;
try {
// change by TomoTherapy Inc
// added loop for each IP for each interface
// used 1.4 SocketAddress
// altered by Jan to be backwards compatible with Java 2
for (; i < myIPs.length; i++) {
// create a socket bound to the next ip address
final InetAddress addr = InetAddress.getByName(myIPs[i]);
DatagramSocket socket = new DatagramSocket(0, addr);
ServiceRequest sreq = new ServiceRequest(new ServiceType(
SLP_DA_TYPE), scopes, null, SLPCore.DEFAULT_LOCALE);
sreq.xid = SLPCore.nextXid();
sreq.scopeList = scopes;
sreq.address = MCAST_ADDRESS;
sreq.multicast = true;
byte[] bytes = sreq.getBytes();
DatagramPacket d = new DatagramPacket(bytes, bytes.length,
MCAST_ADDRESS, SLP_PORT);
platform.logTraceMessage("SENT " + sreq + "(udp multicast)");
setupReceiverThread(socket, CONFIG.getWaitTime(), sreq);
try {
socket.send(d);
} catch (SocketException se) {
// blacklist address
final List remaining = new ArrayList(java.util.Arrays
.asList(myIPs));
final String faulty = myIPs[i];
remaining.remove(faulty);
myIPs = (String[]) remaining.toArray(new String[remaining
.size()]);
platform.logDebug("Blacklisting IP " + faulty);
}
}
} catch (IllegalArgumentException ise) {
platform.logDebug("May never happen, no filter set", ise);
} catch (UnknownHostException uhe) {
platform.logWarning("Unknown net.slp.interfaces address: " + myIPs[i], uhe);
throw new ServiceLocationException(
ServiceLocationException.NETWORK_ERROR, uhe.getMessage());
} catch (IOException e) {
platform.logWarning("Error connecting to: " + myIPs[i], e);
throw new ServiceLocationException(
ServiceLocationException.NETWORK_ERROR, e.getMessage());
}
}
/**
* send a unicast message over TCP.
*
* @param msg
* the message.
* @return the reply.
* @throws ServiceLocationException
* in case of network errors.
*/
static ReplyMessage sendMessageTCP(final SLPMessage msg)
throws ServiceLocationException {
try {
if (msg.xid == 0) {
msg.xid = nextXid();
}
Socket socket = new Socket(msg.address, msg.port);
socket.setSoTimeout(CONFIG.getTCPTimeout());
DataOutputStream out = new DataOutputStream(socket
.getOutputStream());
DataInputStream in = new DataInputStream(socket.getInputStream());
out.write(msg.getBytes());
final ReplyMessage reply = (ReplyMessage) SLPMessage.parse(
msg.address, msg.port, in, true);
socket.close();
return reply;
} catch (Exception e) {
throw new ServiceLocationException(
ServiceLocationException.NETWORK_ERROR, e.getMessage());
}
}
/**
* send a unicast message over UDP.
*
* @param msg
* the message to be sent.
* @param expectReply
* waits for a reply if set to true.
* @return the reply.
* @throws ServiceLocationException
* in case of network errors etc.
*/
static ReplyMessage sendMessage(final SLPMessage msg,
final boolean expectReply) throws ServiceLocationException {
if (msg.xid == 0) {
msg.xid = nextXid();
}
if (msg.getSize() > CONFIG.getMTU() || TCP_ONLY || (daemon == null && isMulticastSocketInitialized)) {
return sendMessageTCP(msg);
}
try {
DatagramSocket dsocket = new DatagramSocket();
dsocket.setSoTimeout(CONFIG.getDatagramMaxWait());
byte[] bytes = msg.getBytes();
DatagramPacket packet = new DatagramPacket(bytes, bytes.length,
msg.address, msg.port);
byte[] receivedBytes = new byte[CONFIG.getMTU()];
DatagramPacket received = new DatagramPacket(receivedBytes,
receivedBytes.length);
dsocket.send(packet);
platform.logTraceMessage("SENT (" + msg.address + ":" + msg.port + ") "
+ msg + " (via udp port " + dsocket.getLocalPort()
+ ")");
// if no reply is expected, return
if (!expectReply) {
return null;
}
dsocket.receive(received);
dsocket.close();
final DataInputStream in = new DataInputStream(
new ByteArrayInputStream(received.getData()));
ReplyMessage reply = (ReplyMessage) SLPMessage.parse(received
.getAddress(), received.getPort(), in, false);
return reply;
} catch (SocketException se) {
throw new ServiceLocationException(
ServiceLocationException.NETWORK_INIT_FAILED, se
.getMessage());
} catch (ProtocolException pe) {
// Overflow, retry with TCP
return sendMessageTCP(msg);
} catch (IOException ioe) {
platform.logError("Exception during sending of " + msg);
platform.logError("to " + msg.address + ":" + msg.port);
platform.logError("Exception:", ioe);
throw new ServiceLocationException(
ServiceLocationException.NETWORK_ERROR, ioe.getMessage());
} catch (Throwable t) {
platform.logDebug(t.getMessage(), t);
throw new ServiceLocationException((short) 1, t.getMessage());
}
}
/**
* send a request via multicast convergence algorithm.
*
* @param msg
* the message.
* @return the collected reply messages.
* @throws ServiceLocationException
* in case of network errors.
*/
static List multicastConvergence(final RequestMessage msg)
throws ServiceLocationException {
try {
long start = System.currentTimeMillis();
List replyQueue = new ArrayList();
List responders = new ArrayList();
List responses = new ArrayList();
if (msg.xid == 0) {
msg.xid = SLPCore.nextXid();
}
// register the reply queue as listener
Integer queryXID = new Integer(msg.xid);
synchronized (replyListeners) {
replyListeners.put(queryXID, replyQueue);
}
msg.port = SLPCore.SLP_PORT;
msg.prevRespList = new ArrayList();
msg.multicast = true;
// send to localhost, in case the OS does not support multicast over
// loopback which can fail if no SA is running locally
msg.address = LOCALHOST;
try {
replyQueue.add(sendMessageTCP(msg));
} catch (ServiceLocationException e) {
if(e.getErrorCode() != ServiceLocationException.NETWORK_ERROR) {
throw e;
}
}
msg.address = MCAST_ADDRESS;
ReplyMessage reply;
for (int i = 0; i < myIPs.length; i++) {
// create a socket bound to the next ip address
final InetAddress addr = InetAddress.getByName(myIPs[i]);
final MulticastSocket socket = new MulticastSocket();
socket.setInterface(addr);
socket.setTimeToLive(CONFIG.getMcastTTL());
setupReceiverThread(socket, CONFIG.getMcastMaxWait(), msg);
// the multicast convergence algorithm
long totalTimeout = System.currentTimeMillis()
+ CONFIG.getMcastMaxWait();
int[] transmissionSchedule = SLPCore.CONFIG.getMcastTimeouts();
int retryCounter = 0;
long nextTimeout;
int failCounter = 0;
boolean seenNew = false;
boolean seenLocalResponse = false;
nextTimeout = System.currentTimeMillis()
+ transmissionSchedule[retryCounter];
while (!Thread.currentThread().isInterrupted()
&& totalTimeout > System.currentTimeMillis()
&& nextTimeout > System.currentTimeMillis()
&& retryCounter < transmissionSchedule.length
&& failCounter < CONFIG.getConvergenceFailerCount()) {
msg.prevRespList = responders;
byte[] message = msg.getBytes();
// finish convergence in case of message size exeeds MTU
if (message.length > CONFIG.getMTU()) {
break;
}
// send the message
DatagramPacket p = new DatagramPacket(message,
message.length, InetAddress
.getByName(SLP_MCAST_ADDRESS), SLP_PORT);
try {
socket.send(p);
} catch (IOException ioe) {
break;
}
platform.logTraceMessage("SENT " + msg);
/**
* @fix: bug #1518729. Changed processing of the replyQueue.
* Thanks to Richard Reid for figuring out the problem
* with multicast replies and proposing the fix
*/
try {
Thread.sleep(transmissionSchedule[retryCounter]);
} catch (InterruptedException dontcare) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
synchronized (replyQueue) {
// did something else wake us up ?
if (replyQueue.isEmpty()) {
failCounter++;
nextTimeout = System.currentTimeMillis()
+ transmissionSchedule[retryCounter++];
continue;
}
while (!replyQueue.isEmpty()) {
reply = (ReplyMessage) replyQueue.remove(0);
// silently drop duplicate responses, process only
// new
// results
if (!responders.contains(reply.address
.getHostAddress())) {
if (isLocalResponder(reply.address)) {
if (seenLocalResponse) {
continue;
} else {
seenLocalResponse = true;
}
}
seenNew = true;
responders.add(reply.address.getHostAddress());
responses.addAll(reply.getResult());
}
}
if (!seenNew) {
failCounter++;
} else {
seenNew = false;
}
}
nextTimeout = System.currentTimeMillis()
+ transmissionSchedule[retryCounter++];
}
}
// we are done, remove the listener queue
synchronized (replyListeners) {
replyListeners.remove(queryXID);
}
platform.logDebug("convergence for xid=" + msg.xid
+ " finished after "
+ (System.currentTimeMillis() - start)
+ " ms, result: " + responses);
return responses;
} catch (IOException ioe) {
platform.logDebug(ioe.getMessage(), ioe);
throw new ServiceLocationException(
ServiceLocationException.NETWORK_ERROR, ioe.getMessage());
}
}
private static boolean isLocalResponder(InetAddress addr) {
for (int i = 0; i < SLPCore.myIPs.length; i++) {
if (addr.getHostAddress().equals(SLPCore.myIPs[i])) {
return true;
}
}
return false;
}
/**
* setup a new receiver thread for a socket.
*
* @param socket
* the <code>DatagramSocket</code> for which the receiver
* thread is set up.
* @param minLifetime
* the minimum lifetime of the receiver thread.
*/
private static void setupReceiverThread(final DatagramSocket socket,
final long minLifetime, final SLPMessage msg) {
new Thread() {
public void run() {
// prepare an empty datagram for receiving
DatagramPacket packet;
byte[] bytes = new byte[SLPCore.CONFIG.getMTU()];
// calculate the end of lifetime
long timeout = System.currentTimeMillis() + minLifetime + 1000;
// while lifetime is not expired
while (System.currentTimeMillis() < timeout) {
// set socket timeout
try {
long l = timeout - System.currentTimeMillis();
int soTimeout = (int) (l < 0 ? 1 : l);
socket.setSoTimeout(soTimeout);
} catch (SocketException e1) {
platform.logError(
"Exception in mcast receiver thread", e1);
return;
}
packet = new DatagramPacket(bytes, bytes.length);
try {
// try to receive a datagram packet
socket.receive(packet);
} catch (InterruptedIOException iioe) {
continue;
} catch (IOException e) {
platform.logDebug(e.getMessage(), e);
return;
}
final DataInputStream in = new DataInputStream(
new ByteArrayInputStream(packet.getData()));
try {
// and delegate it to the SLPCore
handleMessage(SLPMessage.parse(packet.getAddress(),
packet.getPort(), in, false));
} catch (ProtocolException pe) {
// Overflow, try to use TCP
try {
msg.address = packet.getAddress();
msg.port = packet.getPort();
msg.multicast = false;
handleMessage(sendMessageTCP(msg));
} catch (ServiceLocationException e) {
}
} catch (ServiceLocationException e) {
platform.logDebug(e.getMessage(), e);
}
}
// close the socket
socket.close();
}
}.start();
}
}