blob: b1015e63230fcdd021abcda6522498d2d9e631c8 [file] [log] [blame]
///////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000-2020 Ericsson Telecom AB
//
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v2.0
// which accompanies this distribution, and is available at
// https://www.eclipse.org/org/documents/epl-2.0/EPL-2.0.html
///////////////////////////////////////////////////////////////////////////////
// File: EPTF_MQTT_LocalTransport_Functions.ttcn
// Description:
// Rev: <RnXnn>
// Prodnr: CNL 113 860
// Updated: 2020-01-07
// Contact: http://ttcn.ericsson.se
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////
// Module: EPTF_MQTT_LocalTransport_Functions
//
// Purpose:
// This module contains the functions of the MQTT local transport component
//
// See also:
// <EPTF_MQTT_LocalTransport_Definitions>
///////////////////////////////////////////////////////////////
module EPTF_MQTT_LocalTransport_Functions
{
import from IPL4asp_Types all;
import from EPTF_CLL_Base_Functions all;
import from EPTF_CLL_Common_Definitions all;
import from EPTF_CLL_Logging_Definitions all;
import from EPTF_CLL_Logging_Functions all;
import from EPTF_CLL_HashMapStr2Int_Functions all;
import from EPTF_CLL_HashMapInt2Int_Functions all;
import from EPTF_CLL_FBQ_Functions all;
import from EPTF_CLL_TransportCommPortIPL4_Functions all;
import from EPTF_CLL_RBTScheduler_Functions all;
import from EPTF_MQTT_LocalTransport_Definitions all;
import from EPTF_MQTT_Transport_Definitions all;
import from MQTT_v3_1_1_Types all;
import from MQTT_v3_1_1_IPL4SizeFunction all;
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_init
//
// Purpose:
// The main initialization function of the <EPTF_MQTT_LocalTransport_CT> component type
//
// Related Type:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_init()
runs on EPTF_MQTT_LocalTransport_CT
{
if (v_EPTF_MQTT_LocalTransport_initialized) {return;}
f_EPTF_MQTT_LocalTransport_initLogging();
f_EPTF_CommPort_IPL4_init();
v_EPTF_MQTT_Transport_stats := c_EPTF_MQTT_Transport_Statistics_empty;
f_EPTF_MQTT_LocalTransport_socketDB_init();
var f_IPL4_getMsgLen fcb_msglen := refers(f_GetMsgLengthMQTT);
f_EPTF_CommPort_IPL4_setMsgLen4LGenType(fcb_msglen, {}, c_MQTT_Transport_LGenType);
f_EPTF_CommPort_IPL4_setReceive({asp_Event := ?},refers(f_EPTF_MQTT_IPL4asp_handleEvent), c_MQTT_Transport_LGenType);
f_EPTF_CommPort_IPL4_setReceive({asp_RecvFrom := ?},refers(f_EPTF_MQTT_IPL4asp_handleMessage), c_MQTT_Transport_LGenType);
f_EPTF_Base_registerCleanup(refers(f_EPTF_MQTT_LocalTransport_cleanup));
v_EPTF_MQTT_LocalTransport_initialized := true;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_initLogging
//
// Purpose:
// Initializing CLL's logging feature on the <EPTF_MQTT_LocalTransport_CT> component type
//
// Related Type:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_initLogging()
runs on EPTF_MQTT_LocalTransport_CT
{
f_EPTF_Logging_init_CT("MQTT_Transport");
v_EPTF_MQTT_LocalTransport_loggingMaskId :=
f_EPTF_Logging_registerComponentMasks(
"MQTT_LocalTransport_Logging",
{"WARNING", "DEBUG", "DEBUGV", "ERROR"},
EPTF_Logging_CLL
);
if(tsp_EPTF_MQTT_LocalTransport_debug){
f_EPTF_Logging_enableLocalMask(v_EPTF_MQTT_LocalTransport_loggingMaskId, c_MQTT_LocalTransport_Logging_DEBUG);
}
else {
f_EPTF_Logging_disableLocalMask(v_EPTF_MQTT_LocalTransport_loggingMaskId, c_MQTT_LocalTransport_Logging_DEBUG);
}
if(tsp_EPTF_MQTT_LocalTransport_debugVerbose) {
f_EPTF_Logging_enableLocalMask(v_EPTF_MQTT_LocalTransport_loggingMaskId, c_MQTT_LocalTransport_Logging_DEBUGV);
}
else {
f_EPTF_Logging_disableLocalMask(v_EPTF_MQTT_LocalTransport_loggingMaskId, c_MQTT_LocalTransport_Logging_DEBUGV);
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_cleanup
//
// Purpose:
// The main clean up function for the <EPTF_MQTT_LocalTransport_CT> component type
//
// Related Type:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_cleanup()
runs on EPTF_MQTT_LocalTransport_CT
{
// Reset DBs, close connections
f_EPTF_MQTT_LocalTransport_socketDB_cleanUp();
vf_EPTF_MQTT_Transport_receiveMessage := null;
vf_EPTF_MQTT_Transport_receiveEvent := null;
v_EPTF_MQTT_LocalTransport_initialized := false;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_IPL4asp_handleEvent
//
// Purpose:
// Handler function to be registered into the IPL4 transport layer <EPTF_CLL_TransportIPL4_Functions>.
// It is used to receieve transport events from the underlying IPL4 transport layer.
// The function currently handles the connection closed event and forwards every event to the load generator
// layer's handler function.
//
// Parameters:
//
// Related Type:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_IPL4asp_handleEvent() runs on EPTF_MQTT_LocalTransport_CT
{
f_MQTT_Transport_Logging_DEBUG(log2str(": incoming event: ", v_EPTF_CommPort_IPL4_incomingMessage));
//{ asp_Event := { connClosed := { connId := 1, remName := "127.0.0.1", remPort := 1883, locName := "127.0.0.1", locPort := 20000, proto := { tcp := { } }, userData := 0 } } }
if (ischosen(v_EPTF_CommPort_IPL4_incomingMessage.asp_Event))
{
if (ischosen(v_EPTF_CommPort_IPL4_incomingMessage.asp_Event.connClosed))
{
var integer vl_socketIdx := f_EPTF_MQTT_LocalTransport_socketDB_lookUp_connId(v_EPTF_CommPort_IPL4_incomingMessage.asp_Event.connClosed.connId);
var SocketEntry vl_socket;
if (f_EPTF_MQTT_LocalTransport_socketDB_get(vl_socketIdx, vl_socket))
{
var EPTF_MQTT_Transport_Response vl_apiRsp := c_EPTF_MQTT_Transport_Response_init;
vl_apiRsp.sessionIdx := vl_socket.sessionIdx;
vl_apiRsp.params.connectionClosed := vl_socket.localAddr;
vl_apiRsp.succ := true;
f_EPTF_MQTT_LocalTransport_sendApiResponse(vl_apiRsp);
}
}
}
if (vf_EPTF_MQTT_Transport_receiveEvent != null) {
vf_EPTF_MQTT_Transport_receiveEvent.apply(v_EPTF_CommPort_IPL4_incomingMessage.asp_Event);
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_IPL4asp_handleMessage
//
// Purpose:
// Handler function to be regisitered into the IPL4 transport layer <EPTF_CLL_TransportIPL4_Functions>.
// It is used to receieve MQTT messages from the underlying IPL4 transport layer.
// The function looks up the entity that owns the particular connection and forwards the message and the
// entity information to the load generator layer
//
// Parameters:
//
// Related Type:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_IPL4asp_handleMessage() runs on EPTF_MQTT_LocalTransport_CT
{
var MQTT_v3_1_1_Message vl_MQTT_MSG;
var EPTF_MQTT_PDU vl_EPTF_MQTT_PDU;
f_EPTF_SchedulerComp_refreshSnapshotTime();
f_MQTT_v3_1_1_dec(v_EPTF_CommPort_IPL4_incomingMessage.asp_RecvFrom.msg, vl_MQTT_MSG);
if (tsp_EPTF_MQTT_LocalTransport_debug) { action("MQTT transport receiving: \n", vl_MQTT_MSG); }
if (ischosen(vl_MQTT_MSG.msg))
{
vl_EPTF_MQTT_PDU.pdu := vl_MQTT_MSG.msg;
vl_EPTF_MQTT_PDU.transportParams.localAddress.hostName := v_EPTF_CommPort_IPL4_incomingMessage.asp_RecvFrom.locName;
vl_EPTF_MQTT_PDU.transportParams.localAddress.portNumber := v_EPTF_CommPort_IPL4_incomingMessage.asp_RecvFrom.locPort;
vl_EPTF_MQTT_PDU.transportParams.remoteAddress.hostName := v_EPTF_CommPort_IPL4_incomingMessage.asp_RecvFrom.remName;
vl_EPTF_MQTT_PDU.transportParams.remoteAddress.portNumber := v_EPTF_CommPort_IPL4_incomingMessage.asp_RecvFrom.remPort;
vl_EPTF_MQTT_PDU.transportParams.proto := v_EPTF_CommPort_IPL4_incomingMessage.asp_RecvFrom.proto;
var SocketEntry vl_entry := c_SocketEntry_init;
var integer vl_sockIdx := f_EPTF_MQTT_LocalTransport_socketDB_lookUp({
v_EPTF_CommPort_IPL4_incomingMessage.asp_RecvFrom.locName,
v_EPTF_CommPort_IPL4_incomingMessage.asp_RecvFrom.locPort
});
f_EPTF_MQTT_LocalTransport_socketDB_get(vl_sockIdx, vl_entry);
vl_EPTF_MQTT_PDU.sessionIdx := vl_entry.sessionIdx;
if (tsp_EPTF_MQTT_LocalTransport_debug) { action("MQTT transport receiving: sessionIdx: ", vl_EPTF_MQTT_PDU.sessionIdx); }
v_EPTF_MQTT_Transport_stats.nofReceivedMessages :=
v_EPTF_MQTT_Transport_stats.nofReceivedMessages + 1.0;
v_EPTF_MQTT_Transport_stats.nofReceivedBytes :=
v_EPTF_MQTT_Transport_stats.nofReceivedBytes + int2float(lengthof(v_EPTF_CommPort_IPL4_incomingMessage.asp_RecvFrom.msg));
if (vf_EPTF_MQTT_Transport_receiveMessage != null) {
vf_EPTF_MQTT_Transport_receiveMessage.apply(vl_EPTF_MQTT_PDU);
}
}
else
{
f_MQTT_Transport_Logging_WARNING(
log2str(%definitionId, " Couldn't decode msg: ",v_EPTF_CommPort_IPL4_incomingMessage.asp_RecvFrom.msg, " decoded: ", vl_MQTT_MSG)
);
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_send
//
// Purpose:
// Function to send out a <EPTF_MQTT_PDU> message using the local transport. It automatically
// looks up the corresponding <Socket> or creates it on the fly if it doesn't exist yet
//
// Parameters:
// pl_msg - *in* <EPTF_MQTT_PDU> - message to be sent
//
// Related Type:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_send(in EPTF_MQTT_PDU pl_msg)
runs on EPTF_MQTT_LocalTransport_CT
{
f_MQTT_Transport_Logging_VERBOSE(log2str(%definitionId, " ", pl_msg));
var integer vl_sockIdx := f_EPTF_MQTT_LocalTransport_socketDB_lookUp(pl_msg.transportParams.localAddress);
if (vl_sockIdx < 0)
{
if (not f_EPTF_MQTT_LocalTransport_connect(
pl_msg.transportParams.localAddress,
pl_msg.transportParams.remoteAddress,
pl_msg.sessionIdx, vl_sockIdx))
{
f_MQTT_Transport_Logging_WARNING(log2str(": couldn't create socket: ", pl_msg.transportParams.localAddress));
return;
}
}
f_EPTF_MQTT_LocalTransport_socketDB_get(vl_sockIdx, v_MQTT_LocalTransport_currentSocket);
if (tsp_EPTF_MQTT_LocalTransport_debug) { action("MQTT transport sending: \n", pl_msg.pdu); }
var octetstring v_encoded;
f_MQTT_v3_1_1_enc(MQTT_v3_1_1_Message: {msg := pl_msg.pdu}, v_encoded);
f_EPTF_MQTT_LocalTransport_IPL4_send
(
v_MQTT_LocalTransport_currentSocket.connId,
pl_msg.transportParams.remoteAddress.hostName,
pl_msg.transportParams.remoteAddress.portNumber,
pl_msg.transportParams.proto,
v_encoded
);
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_transportApiRequest
//
// Purpose:
// Function to handle incoming transport API requests
//
// Parameters:
// pl_req - *in* <EPTF_MQTT_Transport_Request> - transport API request
//
// Related Type:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_transportApiRequest(EPTF_MQTT_Transport_Request pl_req)
runs on EPTF_MQTT_LocalTransport_CT
{
if (tsp_EPTF_MQTT_LocalTransport_debug) { action("MQTT transport receiving: \n", pl_req); }
if (ischosen(pl_req.params.startListening))
{
var EPTF_MQTT_Transport_Response vl_rsp := c_EPTF_MQTT_Transport_Response_init;
vl_rsp.sessionIdx := pl_req.sessionIdx;
var integer vl_sockIdx := f_EPTF_MQTT_LocalTransport_socketDB_lookUp(pl_req.params.startListening.localAddress);
if (vl_sockIdx < 0)
{
if (not f_EPTF_MQTT_LocalTransport_startListening(pl_req.params.startListening.localAddress, pl_req.sessionIdx, vl_sockIdx))
{
f_MQTT_Transport_Logging_WARNING(log2str(": couldn't create socket: ", pl_req.params.startListening.localAddress));
vl_rsp.succ := false;
}
else { vl_rsp.succ := true; }
}
else { vl_rsp.succ := false; }
if (pl_req.expectResponse) { f_EPTF_MQTT_LocalTransport_sendApiResponse(vl_rsp); }
}
else if (ischosen(pl_req.params.connect_))
{
var EPTF_MQTT_Transport_Response vl_rsp := c_EPTF_MQTT_Transport_Response_init;
vl_rsp.sessionIdx := pl_req.sessionIdx;
var integer vl_sockIdx := f_EPTF_MQTT_LocalTransport_socketDB_lookUp(pl_req.params.connect_.localAddress);
if (vl_sockIdx < 0)
{
if (not f_EPTF_MQTT_LocalTransport_connect(
pl_req.params.connect_.localAddress,
pl_req.params.connect_.remoteAddress,
pl_req.sessionIdx, vl_sockIdx))
{
f_MQTT_Transport_Logging_WARNING(log2str(": couldn't create socket: ", pl_req));
vl_rsp.succ := false;
}
else { vl_rsp.succ := true; }
}
else { vl_rsp.succ := false; }
if (pl_req.expectResponse) { f_EPTF_MQTT_LocalTransport_sendApiResponse(vl_rsp); }
}
else if (ischosen(pl_req.params.close))
{
var EPTF_MQTT_Transport_Response vl_rsp := c_EPTF_MQTT_Transport_Response_init;
vl_rsp.sessionIdx := pl_req.sessionIdx;
var integer vl_sockIdx := f_EPTF_MQTT_LocalTransport_socketDB_lookUp(pl_req.params.close.localAddress);
if (vl_sockIdx >= 0)
{
if (not f_EPTF_MQTT_LocalTransport_close(vl_sockIdx))
{
f_MQTT_Transport_Logging_WARNING(log2str(": couldn't close socket: ", pl_req));
vl_rsp.succ := false;
}
else { vl_rsp.succ := true; }
}
else { vl_rsp.succ := false; }
if (pl_req.expectResponse) { f_EPTF_MQTT_LocalTransport_sendApiResponse(vl_rsp); }
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_sendApiResponse
//
// Purpose:
// Function to handle incoming transport API requests
//
// Parameters:
// pl_rsp - *in* <EPTF_MQTT_Transport_Response> - transport API response
//
// Related Type:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_sendApiResponse(EPTF_MQTT_Transport_Response pl_rsp)
runs on EPTF_MQTT_LocalTransport_CT
{
f_MQTT_Transport_Logging_WARNING(log2str(%definitionId,": ", pl_rsp));
if (vf_EPTF_MQTT_Transport_apiResponse != null)
{
vf_EPTF_MQTT_Transport_apiResponse.apply(pl_rsp);
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_startListening
//
// Purpose:
// Start listening on a local TCP socket and store its data and state in socketDB
//
// Parameters:
// p_local - *in* <Socket> - local socket
// p_sessionIdx - *in* *integer* - session Id stored with the socket data in socketDB
// p_idx - *inout* *integer* - the index of the added element in the database
//
// Return Type:
// *boolean* - was the operation succesful?
//
// Related Type:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_startListening(in Socket p_local, in integer p_sessionIdx, inout integer p_idx)
runs on EPTF_MQTT_LocalTransport_CT
return boolean
{
var Result vl_result;
var SocketEntry vl_socketEntry := c_SocketEntry_init;
vl_result :=
f_EPTF_CommPort_IPL4_send(
{asp_Listen :=
{
locName := p_local.hostName,
locPort := p_local.portNumber,
proto := { tcp := {} }
}
},
null,
false,
false,
c_MQTT_Transport_LGenType);
if (f_EPTF_MQTT_IPL4asp_handleResult(vl_result)) // Everything went OK
{
vl_socketEntry.localAddr := p_local;
vl_socketEntry.remoteAddr := omit;
vl_socketEntry.connId := vl_result.connId;
vl_socketEntry.sessionIdx := p_sessionIdx;
vl_socketEntry.state := OPENED;
}
else
{
f_MQTT_Transport_Logging_WARNING(log2str(%definitionId," returning false "));
return false;
}
p_idx := f_EPTF_MQTT_LocalTransport_socketDB_add(vl_socketEntry);
f_MQTT_Transport_Logging_VERBOSE(log2str(%definitionId," returning true"));
return true;
}
///////////////////////////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_connect
//
// Purpose:
// Open a TCP connection and store its data and state in socketDB
//
// Parameters:
// p_local - *in* <Socket> - local socket
// p_remote - *in* <Socket> - remote socket
// pl_sessionIdx - *in* *integer* - session Id stored with the connection data in socketDB
// p_idx - *inout* *integer* - the index of the added element in the database
//
// Return Value:
// *boolean* - was the operation succesful?
//
// Related Type:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_connect(in Socket p_local, in Socket p_remote, in integer p_sessionIdx, inout integer p_idx)
runs on EPTF_MQTT_LocalTransport_CT
return boolean
{
var Result vl_result;
var SocketEntry vl_socketEntry := c_SocketEntry_init;
vl_result :=
f_EPTF_CommPort_IPL4_send(
{asp_Connect :=
{
remName := p_remote.hostName,
remPort := p_remote.portNumber,
locName := p_local.hostName,
locPort := p_local.portNumber,
proto := { tcp := {} }
}
},
null,
false,
false,
c_MQTT_Transport_LGenType);
if (f_EPTF_MQTT_IPL4asp_handleResult(vl_result)) // Everything went OK
{
vl_socketEntry.localAddr := p_local;
vl_socketEntry.remoteAddr := p_remote;
vl_socketEntry.connId := vl_result.connId;
vl_socketEntry.sessionIdx := p_sessionIdx;
vl_socketEntry.state := OPENED;
}
else
{
f_MQTT_Transport_Logging_WARNING(log2str(%definitionId," returning false "));
return false;
}
p_idx := f_EPTF_MQTT_LocalTransport_socketDB_add(vl_socketEntry);
f_MQTT_Transport_Logging_VERBOSE(log2str(%definitionId," returning true"));
return true;
}
///////////////////////////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_close
//
// Purpose:
// Close a TCP connection and remove its entry from the socketDB
//
// Parameters:
// p_socketIdx - *in* *integer* - the index of the connection in socketDB to be closed
//
// Return Value:
// *boolean* - was the operation succesful?
//
// Related Type:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_close(in integer p_socketIdx)
runs on EPTF_MQTT_LocalTransport_CT
return boolean
{
var Result vl_result;
var SocketEntry vl_sockEntry;
f_EPTF_MQTT_LocalTransport_socketDB_get(p_socketIdx, vl_sockEntry);
vl_result :=
f_EPTF_CommPort_IPL4_send(
{asp_Close :=
{
id := vl_sockEntry.connId,
proto := { tcp := {} }
}
},
null,
false,
false,
c_MQTT_Transport_LGenType
);
if (f_EPTF_MQTT_IPL4asp_handleResult(vl_result)) // Everything went OK
{
f_EPTF_MQTT_LocalTransport_socketDB_remove(p_socketIdx);
}
else
{
f_MQTT_Transport_Logging_WARNING(log2str(%definitionId," returning false "));
return false;
}
f_MQTT_Transport_Logging_VERBOSE(log2str(%definitionId," returning true"));
return true;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_IPL4_send
//
// Purpose:
// Send a message and add 1 message and its length to MQTT transport stats variable
//
// Parameters:
// pl_connId - *in* *integer* - connection Id
// pl_remName - *in* *charstring* - name of the remote host
// pl_remPort - *in* *integer* - remote port number
// pl_proto - *in* <ProtoTuple> - type of protocol of the connection
// pl_msg - *in* *octetstring* - message to be sent
//
// Related Type:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_IPL4_send(
in integer pl_connId,
in charstring pl_remName,
in integer pl_remPort,
in ProtoTuple pl_proto,
in octetstring pl_msg)
runs on EPTF_MQTT_LocalTransport_CT
{
var Result v_res;
v_res := f_EPTF_CommPort_IPL4_send(
{
asp_SendTo :=
{
connId := pl_connId,
remName := pl_remName,
remPort := pl_remPort,
proto := pl_proto,
msg := pl_msg
}
},
null,
false,
false,
c_MQTT_Transport_LGenType
);
if (ispresent(v_res.errorCode))
{
f_MQTT_Transport_Logging_WARNING(log2str(": message could not be sent."));
}
v_EPTF_MQTT_Transport_stats.nofSentMessages := v_EPTF_MQTT_Transport_stats.nofSentMessages + 1.0;
v_EPTF_MQTT_Transport_stats.nofSentBytes := v_EPTF_MQTT_Transport_stats.nofSentBytes + int2float(lengthof(pl_msg));
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_socketDB_init
//
// Purpose:
// Initializes the *v_MQTT_LocalTransport_localSocketDB* <SocketDB> database
//
// Related Type:
// <SocketDB>
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_socketDB_init()
runs on EPTF_MQTT_LocalTransport_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_LocalTransport_localSocketDB.queue);
v_MQTT_LocalTransport_localSocketDB.data := {};
v_MQTT_LocalTransport_localSocketDB.hashRef := f_EPTF_str2int_HashMap_New(c_EPTF_MQTT_LocalTransport_SocketDB);
v_MQTT_LocalTransport_localSocketDB.connIdHashRef := f_EPTF_int2int_HashMap_New(c_EPTF_MQTT_LocalTransport_SocketDB_connId);
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_socketDB_cleanUp
//
// Purpose:
// Cleans up the reserved resources of the *v_MQTT_LocalTransport_localSocketDB* <SocketDB> database
//
// Related Type:
// <SocketDB>
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_socketDB_cleanUp()
runs on EPTF_MQTT_LocalTransport_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_LocalTransport_localSocketDB.queue);
v_MQTT_LocalTransport_localSocketDB.data := {};
f_EPTF_str2int_HashMap_Delete(c_EPTF_MQTT_LocalTransport_SocketDB);
f_EPTF_int2int_HashMap_Delete(c_EPTF_MQTT_LocalTransport_SocketDB_connId);
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_socketDB_add
//
// Purpose:
// Adds a new element to the *v_MQTT_LocalTransport_localSocketDB* <SocketDB> database
//
// Parameters:
// p_sock - *in* <SocketEntry> - the element to be added
//
// Returns:
// *integer* - the index of the added element in the database
//
// Related Type:
// <SocketDB>
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_socketDB_add(in SocketEntry p_sock)
runs on EPTF_MQTT_LocalTransport_CT
return integer
{
var integer v_idx := f_EPTF_MQTT_LocalTransport_socketDB_lookUp(p_sock.localAddr);
if (v_idx == -1)
{
v_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_LocalTransport_localSocketDB.queue);
f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_LocalTransport_localSocketDB.queue);
f_MQTT_Transport_Logging_DEBUG(log2str(%definitionId,": "," adding socket ", v_idx, " ", p_sock));
f_EPTF_str2int_HashMap_Insert(v_MQTT_LocalTransport_localSocketDB.hashRef, f_EPTF_MQTT_Socket2String(p_sock.localAddr), v_idx);
f_EPTF_int2int_HashMap_Insert(v_MQTT_LocalTransport_localSocketDB.connIdHashRef, p_sock.connId, v_idx);
v_MQTT_LocalTransport_localSocketDB.data[v_idx] := p_sock;
}
return v_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_socketDB_lookUp
//
// Purpose:
// Gets the index of an <SocketEntry> element in *v_MQTT_LocalTransport_localSocketDB* <SocketDB> database
// based on its socket data
//
// Parameters:
// p_sock - *in* <Socket> - socket data of the element to be found
//
// Returns:
// *integer* - the index of the element, or -1 if not found
//
// Related Type:
// <SocketDB>
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_socketDB_lookUp(in Socket p_sock)
runs on EPTF_MQTT_LocalTransport_CT
return integer
{
var integer vl_idx := -1;
f_EPTF_str2int_HashMap_Find(v_MQTT_LocalTransport_localSocketDB.hashRef, f_EPTF_MQTT_Socket2String(p_sock), vl_idx);
f_MQTT_Transport_Logging_VERBOSE(log2str(%definitionId," looking up p_sock: ",p_sock," found at idx: ", vl_idx));
return vl_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_socketDB_lookUp_connId
//
// Purpose:
// Gets the index of an <SocketEntry> element in *v_MQTT_LocalTransport_localSocketDB* <SocketDB> database
// based on its connection id
//
// Parameters:
// p_connId - *in* *integer* - the connection id
//
// Returns:
// *integer* - the index of the element, or -1 if not found
//
// Related Type:
// <SocketDB>
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_socketDB_lookUp_connId(in integer p_connId)
runs on EPTF_MQTT_LocalTransport_CT
return integer
{
var integer vl_idx := -1;
f_EPTF_int2int_HashMap_Find(v_MQTT_LocalTransport_localSocketDB.connIdHashRef, p_connId, vl_idx);
f_MQTT_Transport_Logging_VERBOSE(log2str(%definitionId," looking up p_connId: ",p_connId," found at idx: ", vl_idx));
return vl_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_socketDB_get
//
// Purpose:
// Retrieves an element from the *v_MQTT_LocalTransport_localSocketDB* <SocketDB> database
//
// Parameters:
// p_idx - *in* *integer* - the index of the element to be retrieved
// p_sock - *inout* <SocketEntry> - the retrieved element
//
// Returns:
// boolean - was the operation successful?
//
// Related Type:
// <SocketDB>
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_socketDB_get(in integer p_idx, inout SocketEntry p_sock)
runs on EPTF_MQTT_LocalTransport_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_LocalTransport_localSocketDB.data) and p_idx >= 0)
{
p_sock := v_MQTT_LocalTransport_localSocketDB.data[p_idx];
return true;
}
return false;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LocalTransport_socketDB_remove
//
// Purpose:
// Removes an element from the *v_MQTT_LocalTransport_localSocketDB* <SocketDB> database and frees up its reserved resources
//
// Parameters:
// p_idx - *in* *integer* - the index of the element to be removed
//
// Related Type:
// <SocketDB>
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LocalTransport_socketDB_remove(in integer p_idx)
runs on EPTF_MQTT_LocalTransport_CT
{
f_MQTT_Transport_Logging_VERBOSE(log2str(%definitionId, ": ",p_idx));
f_EPTF_str2int_HashMap_Erase(
v_MQTT_LocalTransport_localSocketDB.hashRef,
f_EPTF_MQTT_Socket2String(v_MQTT_LocalTransport_localSocketDB.data[p_idx].localAddr)
);
f_EPTF_int2int_HashMap_Erase(
v_MQTT_LocalTransport_localSocketDB.connIdHashRef,
v_MQTT_LocalTransport_localSocketDB.data[p_idx].connId
);
v_MQTT_LocalTransport_localSocketDB.data[p_idx] := c_SocketEntry_init;
f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_LocalTransport_localSocketDB.queue);
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_Socket2String
//
// Purpose:
// Converts socket data from <Socket> type to the format "<hostname>:<port number>"
//
// Parameters:
// p_sock - *in* <Socket> - Socket data to be converted
//
// Return:
// charstring - the converted socket data
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_Socket2String(Socket p_sock)
return charstring
{
return p_sock.hostName&":"&int2str(p_sock.portNumber);
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_Transport_Logging_VERBOSE
//
// Purpose:
// Logging functions for the VERBOSE log level
//
// Parameters:
// pl_message - *in* *charstring* - string to be logged
//
// Related Types:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_MQTT_Transport_Logging_VERBOSE(in @lazy charstring pl_message)
runs on EPTF_MQTT_LocalTransport_CT
{
if (c_EPTF_Common_debugSwitch) {
f_EPTF_Logging_debugV2(pl_message, v_EPTF_MQTT_LocalTransport_loggingMaskId, {c_MQTT_LocalTransport_Logging_DEBUGV});
}
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_Transport_Logging_DEBUG
//
// Purpose:
// Logging functions for the DEBUG log level
//
// Parameters:
// pl_message - *in* *charstring* - string to be logged
//
// Related Types:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_MQTT_Transport_Logging_DEBUG(in @lazy charstring pl_message)
runs on EPTF_MQTT_LocalTransport_CT
{
if (c_EPTF_Common_debugSwitch) {
f_EPTF_Logging_debugV2(pl_message, v_EPTF_MQTT_LocalTransport_loggingMaskId, {c_MQTT_LocalTransport_Logging_DEBUG});
}
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_Transport_Logging_WARNING
//
// Purpose:
// Logging functions for the WARNING log level
//
// Parameters:
// pl_message - *in* *charstring* - string to be logged
//
// Related Types:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_MQTT_Transport_Logging_WARNING(in @lazy charstring pl_message)
runs on EPTF_MQTT_LocalTransport_CT
{
if (c_EPTF_Common_debugSwitch) {
f_EPTF_Logging_debugV2(pl_message, v_EPTF_MQTT_LocalTransport_loggingMaskId, {c_MQTT_LocalTransport_Logging_WARNING});
}
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_Transport_Logging_ERROR
//
// Purpose:
// Logging functions for the ERROR log level
//
// Parameters:
// pl_message - *in* *charstring* - string to be logged
//
// Related Types:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_MQTT_Transport_Logging_ERROR(in @lazy charstring pl_message)
runs on EPTF_MQTT_LocalTransport_CT
{
if (c_EPTF_Common_debugSwitch) {
f_EPTF_Logging_debugV2(pl_message, v_EPTF_MQTT_LocalTransport_loggingMaskId, {c_MQTT_LocalTransport_Logging_ERROR});
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_IPL4asp_handleResult
//
// Purpose:
// Checks the result of an IPL4 transport operation and records an MQTT transport stats warning if unsuccessful
//
// Parameters:
// p_res - *inout* *Result* - result of an IPL4 transport operation
//
// Return Type:
// boolen - *true*: no error or "Temporary unavailable", *false*: error different from "Temporary unavailable"
//
// Related Types:
// <EPTF_MQTT_LocalTransport_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_IPL4asp_handleResult(inout Result p_res)
runs on EPTF_MQTT_LocalTransport_CT
return boolean
{
f_MQTT_Transport_Logging_VERBOSE(log2str(%definitionId,": ", p_res));
if (ispresent(p_res.errorCode) and (p_res.errorCode != IPL4_ERROR_TEMPORARILY_UNAVAILABLE))
{
f_MQTT_Transport_Logging_WARNING(log2str("Warning: f_EPTF_MQTT_IPL4asp_handleResult: IPL4 error: ", p_res));
v_EPTF_MQTT_Transport_stats.nofTransportWarnings := v_EPTF_MQTT_Transport_stats.nofTransportWarnings + 1;
return false;
}
return true;
}
}