blob: 4b4e13fba9a63f0f8f4f220664fa51ecde41d8dd [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2012 Draeger Medical GmbH (http://www.draeger.com).
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* CONTRIBUTORS:
* Peter Karlitschek (initial contribution)
*
*******************************************************************************/
#include "common/messaging/MessageSeQueue.h"
#include "common/messaging/MessageService.h"
#include "common/messaging/MessageServiceController.h"
#include "common/messaging/RTServices.h"
#include "osal/etTime.h"
#include <new>
namespace etRuntime {
/**
 * Creates a message service without an explicit timer interval.
 *
 * A zero interval (0 s, 0 ns) is handed to MessageService_init(); apart from
 * that the members are initialized exactly as in the overload taking an
 * interval.
 *
 * @param parent   parent object, forwarded to RTObject
 * @param mode     execution mode, stored in m_execMode
 * @param node     first component of the service address (m_address)
 * @param thread   second component of the service address (m_address)
 * @param name     object name, forwarded to RTObject
 * @param memory   message memory serving the buffer requests of this service;
 *                 it is deleted in the destructor
 * @param priority forwarded to MessageService_init(), where it becomes the
 *                 priority argument of the thread construction
 */
MessageService::MessageService(IRTObject* parent, IMessageService::ExecMode mode, int node, int thread, const String& name,
        IMessageMemory* memory, int priority) :
        RTObject(parent, name),
        m_running(false),
        m_execMode(mode),
        m_lastMessageTimestamp(0),
        m_address(node, thread, 0),
        m_messageQueue(this, "Queue"),
        m_messageDispatcher(this, m_address.createInc(), "Dispatcher"),
        m_messageMemory(memory) {
    /* no interval given by the caller -> use an all-zero interval */
    etTime zeroInterval;
    zeroInterval.nSec = 0;
    zeroInterval.sec = 0;

    MessageService_init(zeroInterval, priority);
}
/**
 * Creates a message service with an explicit timer interval.
 *
 * @param parent   parent object, forwarded to RTObject
 * @param mode     execution mode, stored in m_execMode
 * @param interval timer interval, forwarded to MessageService_init(); it is only
 *                 used there when the mode is POLLED or MIXED
 * @param node     first component of the service address (m_address)
 * @param thread   second component of the service address (m_address)
 * @param name     object name, forwarded to RTObject
 * @param memory   message memory serving the buffer requests of this service;
 *                 it is deleted in the destructor
 * @param priority forwarded to MessageService_init(), where it becomes the
 *                 priority argument of the thread construction
 */
MessageService::MessageService(IRTObject* parent, IMessageService::ExecMode mode, etTime interval, int node, int thread,
const String& name, IMessageMemory* memory, int priority) :
RTObject(parent, name),
m_running(false),
m_execMode(mode),
m_lastMessageTimestamp(0),
m_address(node, thread, 0),
m_messageQueue(this, "Queue"),
// NOTE(review): uses m_address here, so m_address presumably has to be declared
// before m_messageDispatcher in the class - confirm against the header
m_messageDispatcher(this, m_address.createInc(), "Dispatcher"),
m_messageMemory(memory) {
// mutex, semaphore, thread and (depending on the mode) the timer are set up in the shared init routine
MessageService_init(interval, priority);
}
/**
 * Construction step shared by both constructors: sets up the OSAL objects.
 *
 * Constructs the mutex, the execution semaphore and the thread object (stack
 * size argument 1024, name "MessageService", entry MessageService::run with
 * this object as argument). A timer with callback MessageService::pollingTask
 * is constructed only for the execution modes POLLED and MIXED.
 *
 * @param interval timer interval handed to etTimer_construct (POLLED/MIXED only)
 * @param priority priority argument of the thread construction; passed on unchecked
 */
void MessageService::MessageService_init(etTime interval, int priority) {
    /* synchronization objects */
    etMutex_construct(&m_mutex);
    etSema_construct(&m_executionSemaphore);

    /* thread object - only constructed here, start() calls etThread_start */
    etThread_construct(&m_thread, static_cast<etStacksize>(1024), static_cast<etPriority>(priority),
            (etThreadname)"MessageService", MessageService::run, static_cast<void *>(this));

    // TODO: the range assertion on priority is not ported yet
    //       (sketch: Thread.MIN_PRIORITY (1) <= priority <= Thread.MAX_PRIORITY (10))

    /* all other execution modes do not get a polling timer */
    if (m_execMode != IMessageService::POLLED && m_execMode != IMessageService::MIXED) {
        return;
    }
    etTimer_construct(&m_timer, &interval, MessageService::pollingTask, static_cast<void *>(this));
}
/**
 * Tears the service down and releases everything MessageService_init() created.
 *
 * The order is the reverse of the construction order: the polling timer and
 * the thread are destructed first, because their code paths (pollingTask(),
 * run(), receive()) enter m_mutex and use m_executionSemaphore. Destroying the
 * mutex and the semaphore before them would leave those paths operating on
 * already destructed synchronization objects.
 *
 * Afterwards every message still in the queue is destructed and its buffer is
 * returned to the message memory, and finally the message memory itself is
 * deleted.
 *
 * NOTE(review): whether etTimer_destruct/etThread_destruct wait for a callback
 * or thread that is currently executing depends on the OSAL port - callers
 * should presumably still call terminate() before destroying the service.
 */
MessageService::~MessageService() {
    /* 1. stop the users of the mutex/semaphore */
    if (m_execMode == IMessageService::POLLED || m_execMode == IMessageService::MIXED) {
        etTimer_destruct(&m_timer);
    }
    etThread_destruct(&m_thread);

    /* 2. give pending messages back; returnMessageBuffer() still needs m_mutex */
    while (m_messageQueue.getSize() > 0) {
        const Message* msg = m_messageQueue.pop();
        msg->~Message();
        returnMessageBuffer(msg);
    }

    /* 3. nobody uses the synchronization objects any more */
    etSema_destruct(&m_executionSemaphore);
    etMutex_destruct(&m_mutex);

    /* 4. the buffers were handed back above, so the memory can go last */
    delete m_messageMemory;
}
/**
 * Starts the service: marks it as running, starts the thread and, for the
 * execution modes POLLED and MIXED, the polling timer.
 */
void MessageService::start() {
    const bool usesPollingTimer =
            (m_execMode == IMessageService::POLLED) || (m_execMode == IMessageService::MIXED);

    /* set before the thread is started - the loop in run() is controlled by this flag */
    m_running = true;
    etThread_start(&m_thread);

    if (usesPollingTimer) {
        etTimer_start(&m_timer);
    }
}
void MessageService::run() {
while (m_running) {
etMutex_enter(&m_mutex);
const Message* msg = m_messageQueue.pop(); // get next Message from Queue
etMutex_leave(&m_mutex);
if (msg == 0) {
// no message in queue -> wait till Thread is notified
etSema_waitForWakeup(&m_executionSemaphore);
} else {
//TODO: set timestamp
// m_lastMessageTimestamp = System.currentTimeMillis();
m_messageDispatcher.receive(msg);
}
}
RTServices::getInstance().getMsgSvcCtrl().setMsgSvcTerminated(*this);
}
void MessageService::receive(const Message* msg) {
etMutex_enter(&m_mutex);
if (msg != 0) {
m_messageQueue.push(const_cast<Message*>(msg));
}
etSema_wakeup(&m_executionSemaphore);
etMutex_leave(&m_mutex);
}
/**
 * Asks the message dispatcher for a free address while holding m_mutex.
 *
 * @return the address delivered by the dispatcher
 */
Address MessageService::getFreeAddress() {
    etMutex_enter(&m_mutex);
    Address freeAddr = m_messageDispatcher.getFreeAddress();
    etMutex_leave(&m_mutex);

    return freeAddr;
}
/**
 * Hands an address back to the message dispatcher while holding m_mutex.
 *
 * @param addr address passed on to the dispatcher's freeAddress()
 */
void MessageService::freeAddress(const Address& addr) {
    etMutex_enter(&m_mutex);
    { /* critical section */
        m_messageDispatcher.freeAddress(addr);
    }
    etMutex_leave(&m_mutex);
}
/**
 * Adds a message receiver to the message dispatcher while holding m_mutex.
 *
 * @param receiver receiver passed on to the dispatcher's addMessageReceiver()
 */
void MessageService::addMessageReceiver(IMessageReceiver& receiver) {
    etMutex_enter(&m_mutex);
    { /* critical section */
        m_messageDispatcher.addMessageReceiver(receiver);
    }
    etMutex_leave(&m_mutex);
}
/**
 * Removes a message receiver from the message dispatcher while holding m_mutex.
 *
 * @param receiver receiver passed on to the dispatcher's removeMessageReceiver()
 */
void MessageService::removeMessageReceiver(IMessageReceiver& receiver) {
    etMutex_enter(&m_mutex);
    { /* critical section */
        m_messageDispatcher.removeMessageReceiver(receiver);
    }
    etMutex_leave(&m_mutex);
}
/**
 * Adds a polling message receiver to the message dispatcher while holding m_mutex.
 *
 * @param receiver receiver passed on to the dispatcher's addPollingMessageReceiver()
 */
void MessageService::addPollingMessageReceiver(IMessageReceiver& receiver) {
    etMutex_enter(&m_mutex);
    { /* critical section */
        m_messageDispatcher.addPollingMessageReceiver(receiver);
    }
    etMutex_leave(&m_mutex);
}
/**
 * Removes a polling message receiver from the message dispatcher while holding m_mutex.
 *
 * @param receiver receiver passed on to the dispatcher's removePollingMessageReceiver()
 */
void MessageService::removePollingMessageReceiver(IMessageReceiver& receiver) {
    etMutex_enter(&m_mutex);
    { /* critical section */
        m_messageDispatcher.removePollingMessageReceiver(receiver);
    }
    etMutex_leave(&m_mutex);
}
/**
 * Requests a message buffer from the message memory while holding m_mutex.
 *
 * @param size requested size, passed on to the message memory
 *             (pollingTask() passes sizeof(Message))
 * @return the pointer delivered by the message memory; pollingTask() treats
 *         a null result as "no buffer available"
 */
Message* MessageService::getMessageBuffer(int size) {
    etMutex_enter(&m_mutex);
    Message* const allocated = m_messageMemory->getMessageBuffer(size);
    etMutex_leave(&m_mutex);

    return allocated;
}
/**
 * Gives a message buffer back to the message memory while holding m_mutex.
 *
 * @param buffer buffer passed on to the message memory's returnMessageBuffer()
 */
void MessageService::returnMessageBuffer(const Message* buffer) {
    etMutex_enter(&m_mutex);
    { /* critical section */
        m_messageMemory->returnMessageBuffer(buffer);
    }
    etMutex_leave(&m_mutex);
}
/**
 * Builds a textual description of the service.
 *
 * @return the object name, a single blank and the ID string of the service address
 */
String MessageService::toString() const {
    const String nameAndBlank = getName() + " ";
    return nameAndBlank + getAddress().toID().c_str();
}
/**
 * Requests the end of the message loop.
 *
 * For the execution modes POLLED and MIXED the polling timer is stopped first.
 * If the service is running, m_running is cleared and the execution semaphore
 * is signalled so that a run() loop blocked on the semaphore can see the
 * cleared flag and leave. Calling this on a service that is not running only
 * stops the timer.
 */
void MessageService::terminate() {
    const bool usesPollingTimer =
            (m_execMode == IMessageService::POLLED) || (m_execMode == IMessageService::MIXED);
    if (usesPollingTimer) {
        etTimer_stop(&m_timer);
    }

    if (!m_running) {
        return;
    }

    m_running = false;
    etSema_wakeup(&m_executionSemaphore);
}
// called by osal timer, thread ?
void MessageService::pollingTask() {
if (m_running) {
Message* pollingMessage = getMessageBuffer(sizeof(Message));
if (pollingMessage) {
new (pollingMessage) Message(m_messageDispatcher.getAddress(), 0);
receive(pollingMessage);
}
else {
// TODO JB: error handling for pollingMessage == NULL
}
}
}
} /* namespace etRuntime */