blob: 699e602fa6def9db38f3c2ae5a984fffd4137205 [file] [log] [blame]
///////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000-2021 Ericsson Telecom AB
//
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v2.0
// which accompanies this distribution, and is available at
// https://www.eclipse.org/org/documents/epl-2.0/EPL-2.0.html
///////////////////////////////////////////////////////////////////////////////
// File: EPTF_MQTT_LocalTransport_Definitions.ttcn
// Description:
// Rev: R1B
// Prodnr: CNL 113 860
// Updated: 2021-02-03
// Contact: http://ttcn.ericsson.se
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////
// Module: EPTF_MQTT_LGen_Functions
//
// Purpose:
// This module contains the functions of the MQTT load generator component
//
// See also:
// <EPTF_MQTT_LGen_Definitions>
///////////////////////////////////////////////////////////////
module EPTF_MQTT_LGen_Functions {
import from EPTF_MQTT_LGen_Definitions all;
import from EPTF_MQTT_Transport_Definitions all;
import from EPTF_CLL_Base_Functions all;
import from EPTF_CLL_Common_Definitions all;
import from EPTF_CLL_Variable_Definitions all;
import from EPTF_CLL_Variable_Functions all;
import from EPTF_CLL_LGenBase_Definitions all;
import from EPTF_CLL_LGenBase_ConfigFunctions all;
import from EPTF_CLL_LGenBase_Functions all;
import from EPTF_CLL_LGenBase_EventHandlingFunctions all;
import from EPTF_CLL_Logging_Definitions all;
import from EPTF_CLL_Logging_Functions all;
import from EPTF_CLL_FBQ_Functions all;
import from EPTF_CLL_HashMapStr2Int_Functions all;
import from EPTF_CLL_Scheduler_Definitions all;
import from EPTF_CLL_RBTScheduler_Functions all;
import from TCCConversion_Functions all;
import from MQTT_v3_1_1_Types all;
import from IPL4asp_Types all;
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LGen_init
//
// Purpose:
// The main initialization function for the <EPTF_MQTT_LGen_CT> component type
//
// Parameters:
// pl_name - *in* *charstring* - the name for the component instance
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LGen_init(in charstring pl_name)
runs on EPTF_MQTT_LGen_CT
{
if (v_MQTT_initialized){return;}
f_EPTF_LGenBase_init(pl_name, 0, pl_name);
f_EPTF_Logging_init_CT(pl_name);
f_EPTF_str2int_HashMap_Init();
v_MQTT_bIdx := f_EPTF_LGenBase_declareBehaviorType(
c_MQTT_behaviorType,
tsp_EPTF_MQTT_LGen_maxBindableCtx,
refers(f_MQTT_eCtxReset),
refers(f_MQTT_eCtxBind),
refers(f_MQTT_eCtxUnbind)
);
v_MQTT_loggingMaskId :=
f_EPTF_Logging_registerComponentMasks(
"MQTT_Logging",
{"WARNING", "DEBUG", "DEBUGV", "ERROR" },
EPTF_Logging_CLL);
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId,": my behavior idx is ", v_MQTT_bIdx));
f_EPTF_MQTT_addressDB_init();
f_EPTF_MQTT_templateDB_init();
f_EPTF_MQTT_sessionDB_init();
f_EPTF_MQTT_subscriptionDB_init();
f_EPTF_MQTT_publishDB_init();
f_EPTF_MQTT_declareSteps();
f_EPTF_MQTT_declareEvents();
f_EPTF_Base_registerCleanup(refers(f_MQTT_cleanUp));
v_MQTT_initialized := true;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LGen_initLogging
//
// Purpose:
// Initializing CLL's logging feature on the <EPTF_MQTT_LGen_CT> component type
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LGen_initLogging()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_Logging_init_CT("MQTT_LGen");
v_MQTT_loggingMaskId :=
f_EPTF_Logging_registerComponentMasks(
"MQTT_LGen_Logging",
{"WARNING", "DEBUG", "DEBUGV", "ERROR"},
EPTF_Logging_CLL
);
if(tsp_EPTF_MQTT_LGen_debug){
f_EPTF_Logging_enableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUG);
}
else {
f_EPTF_Logging_disableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUG);
}
if(tsp_EPTF_MQTT_LGen_debugVerbose) {
f_EPTF_Logging_enableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUGV);
}
else {
f_EPTF_Logging_disableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUGV);
}
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_cleanUp
//
// Purpose:
// The main clean up function for the <EPTF_MQTT_LGen_CT> component type
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_cleanUp()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_addressDB_cleanUp();
f_EPTF_MQTT_templateDB_cleanUp();
f_EPTF_MQTT_sessionDB_cleanUp();
f_EPTF_MQTT_subscriptionDB_cleanUp();
f_EPTF_MQTT_publishDB_cleanUp();
vf_MQTT_msgReceived := null;
v_MQTT_initialized := false;
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_eCtxBind
//
// Purpose:
// This function is called by the CLL for each entity instance created on a particular instace of <EPTF_MQTT_LGen_CT>
//
// Parameters:
// pl_eIdx - *in* *integer* - the index of the entity instance on this load generator component instance
//
// Returns:
// <EPTF_IntegerList> - The list will contain the index of the entity the context belongs to
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_eCtxBind(in integer pl_eIdx)
runs on EPTF_MQTT_LGen_CT
return EPTF_IntegerList
{
return {pl_eIdx};
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_eCtxUnbind
//
// Purpose:
// The reverse operation of <f_MQTT_eCtxBind>. Cleans up resources reserved during <f_MQTT_eCtxBind>. Called by the CLL.
//
// Parameters:
// pl_eIdx - *in* *integer* - the index of the entity instance on this load generator component instance
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_eCtxUnbind(in integer pl_eIdx)
runs on EPTF_MQTT_LGen_CT
{
if (not v_MQTT_initialized) {return;}
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_eCtxReset
//
// Purpose:
// The resources reserved during <f_MQTT_eCtxBind> are reinitalized (reset). Called by the CLL.
//
// Parameters:
// pl_eIdx - *in* *integer* - the index of the entity instance on this load generator component instance
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_eCtxReset(in integer pl_eIdx)
runs on EPTF_MQTT_LGen_CT
{
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_declareEvents
//
// Purpose:
// Declares the FSM events to the CLL framework implemented by <EPTF_MQTT_LGen_CT>
//
// Related Types:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_declareEvents()
runs on EPTF_MQTT_LGen_CT
{
var integer vl_dummy;
if (
c_MQTT_eventIdx_transportSucc != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportSucc) or
c_MQTT_eventIdx_transportFail != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportFail) or
c_MQTT_eventIdx_transportEstablished != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportEstablished) or
c_MQTT_eventIdx_transportClosed != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportClosed) or
c_MQTT_eventIdx_CONNACK_Accepted != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_CONNACK_Accepted) or
c_MQTT_eventIdx_CONNACK_Refused != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_CONNACK_Refused) or
c_MQTT_eventIdx_SUBACK_Accepted != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_SUBACK_Accepted) or
c_MQTT_eventIdx_SUBACK_Refused != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_SUBACK_Refused) or
c_MQTT_eventIdx_UNSUBACK != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_UNSUBACK) or
c_MQTT_eventIdx_PUBLISH != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBLISH) or
c_MQTT_eventIdx_PING_Request != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PING_Request) or
c_MQTT_eventIdx_PING_Response != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PING_Response) or
c_MQTT_eventIdx_PUBACK != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBACK) or
c_MQTT_eventIdx_PUBREC != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBREC) or
c_MQTT_eventIdx_PUBREL != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBREL) or
c_MQTT_eventIdx_PUBCOMP != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBCOMP) or
c_MQTT_eventIdx_PUBLISH_Timeout != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBLISH_Timeout) or
c_MQTT_eventIdx_SUBSCRIBE_Timeout != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_SUBSCRIBE_Timeout)
)
{
f_EPTF_LGenBase_log();
log("error"); mtc.stop
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_declareSteps
//
// Purpose:
// Declares the FSM steps to the CLL framework implemented by <EPTF_MQTT_LGen_CT>
//
// Related Types:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_declareSteps()
runs on EPTF_MQTT_LGen_CT
{
if (
c_MQTT_stepIdx_init != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_init, refers(f_MQTT_step_init)}) or
c_MQTT_stepIdx_cleanUp != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_cleanUp, refers(f_MQTT_step_cleanUp) }) or
c_MQTT_stepIdx_setLocalAddress != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setLocalAddress, refers(f_MQTT_step_setLocalAddress)}) or
c_MQTT_stepIdx_setLocalAddress_byVars != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setLocalAddress_byVars, refers(f_MQTT_step_setLocalAddress_byVars)}) or
c_MQTT_stepIdx_setRemoteAddress != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setRemoteAddress, refers(f_MQTT_step_setRemoteAddress)}) or
c_MQTT_stepIdx_setRemoteAddress_byVars != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setRemoteAddress_byVars, refers(f_MQTT_step_setRemoteAddress_byVars)}) or
c_MQTT_stepIdx_transportConnect != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_transportConnect, refers(f_MQTT_step_transportConnect)}) or
c_MQTT_stepIdx_transportClose != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_transportClose, refers(f_MQTT_step_transportClose)}) or
c_MQTT_stepIdx_startListening != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_startListening, refers(f_MQTT_step_startListening)}) or
c_MQTT_stepIdx_loadTemplate_byIntIdx != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_loadTemplate_byIntIdx, refers(f_MQTT_step_loadTemplate_byIntIdx)}) or
c_MQTT_stepIdx_loadTemplate_byStringId != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_loadTemplate_byStringId, refers(f_MQTT_step_loadTemplate_byStringId)}) or
c_MQTT_stepIdx_send != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_send, refers(f_MQTT_step_send)}) or
c_MQTT_stepIdx_setTopic_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_stringParam, refers(f_MQTT_step_setTopic_stringParam)}) or
c_MQTT_stepIdx_setTopic_add_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_add_stringParam, refers(f_MQTT_step_setTopic_add_stringParam)}) or
c_MQTT_stepIdx_setTopic_add_varParams != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_add_varParams, refers(f_MQTT_step_setTopic_add_varParams)}) or
c_MQTT_stepIdx_setTopic_add_clientId != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_add_clientId, refers(f_MQTT_step_setTopic_add_clientId)}) or
c_MQTT_stepIdx_setQos_intParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setQos_intParam, refers(f_MQTT_step_setQos_intParam)}) or
c_MQTT_stepIdx_setPublishMessage_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_stringParam, refers(f_MQTT_step_setPublishMessage_stringParam)}) or
c_MQTT_stepIdx_setPublishMessage_add_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_add_stringParam, refers(f_MQTT_step_setPublishMessage_add_stringParam)}) or
c_MQTT_stepIdx_setPublishMessage_add_varParams != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_add_varParams, refers(f_MQTT_step_setPublishMessage_add_varParams)}) or
c_MQTT_stepIdx_setPublishMessage_add_clientId != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_add_clientId, refers(f_MQTT_step_setPublishMessage_add_clientId)}) or
c_MQTT_stepIdx_reportPingResponse != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_reportPingResponse, refers(f_MQTT_step_reportPingResponse)}) or
c_MQTT_stepIdx_reportPublishResponse != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_reportPublishResponse, refers(f_MQTT_step_reportPublishResponse)})
)
{
f_EPTF_LGenBase_log();
log("EPTF_MQTT_LGen declaration error"); mtc.stop
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LGen_receiveMessage
//
// Purpose:
// The transport layer implementation <EPTF_MQTT_Transport_Provider_CT> can report received <EPTF_MQTT_PDU> message
// to the load generator layer <EPTF_MQTT_Transport_User_CT> extended by <EPTF_MQTT_LGen_CT> using this function.
//
// Parameters:
// pl_message - *in* <EPTF_MQTT_PDU> - received message
//
// Related Types:
// - <EPTF_MQTT_LGen_CT>
// - <fcb_EPTF_MQTT_Transport_receiveMessage>
// - <EPTF_MQTT_Transport_Provider_CT>
// - <EPTF_MQTT_Transport_User_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LGen_receiveMessage(in EPTF_MQTT_PDU pl_message)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " ", pl_message));
v_MQTT_msgToProcess := pl_message;
f_EPTF_MQTT_updateMessageStatistics(v_MQTT_stats.incoming, pl_message);
f_EPTF_MQTT_stack_fromEnv(v_MQTT_msgToProcess);
if (vf_MQTT_msgReceived != null)
{
vf_MQTT_msgReceived.apply(v_MQTT_msgToProcess);
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LGen_receiveEvent
//
// Purpose:
// The transport layer implementation <EPTF_MQTT_Transport_Provider_CT> can report received <ASP_Event> events
// to the load generator layer <EPTF_MQTT_Transport_User_CT> extended by <EPTF_MQTT_LGen_CT> using this function.
//
// Parameters:
// pl_message - *in* <ASP_Event> - received event
//
// Related Types:
// - <EPTF_MQTT_LGen_CT>
// - <fcb_EPTF_MQTT_Transport_receiveEvent>
// - <EPTF_MQTT_Transport_Provider_CT>
// - <EPTF_MQTT_Transport_User_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LGen_receiveEvent(in ASP_Event p_event)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " ", p_event));
// TODO: connection closed handling!
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LGen_transportApiResponse
//
// Purpose:
// The transport layer implementation <EPTF_MQTT_Transport_Provider_CT> can report received <EPTF_MQTT_Transport_Response> responses
// to the load generator layer <EPTF_MQTT_Transport_User_CT> extended by <EPTF_MQTT_LGen_CT> using this function.
//
// Parameters:
// pl_rsp - *in* <EPTF_MQTT_Transport_Response> - received transport api response
//
// Related Types:
// - <EPTF_MQTT_LGen_CT>
// - <fcb_EPTF_MQTT_Transport_apiResponse>
// - <EPTF_MQTT_Transport_Provider_CT>
// - <EPTF_MQTT_Transport_User_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LGen_transportApiResponse(in EPTF_MQTT_Transport_Response pl_rsp)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str("api response: ", pl_rsp));
if (f_EPTF_MQTT_sessionDB_get(pl_rsp.sessionIdx, v_MQTT_session))
{
if (ispresent(pl_rsp.params))
{
if (ischosen(pl_rsp.params.connectionClosed))
{
f_EPTF_MQTT_dispatchEvent(c_MQTT_eventIdx_transportClosed, v_MQTT_session.eIdx, v_MQTT_session.fsmIdx, {});
}
else if (ischosen(pl_rsp.params.tcpEstablished))
{
f_EPTF_MQTT_dispatchEvent(c_MQTT_eventIdx_transportEstablished, v_MQTT_session.eIdx, v_MQTT_session.fsmIdx, {});
}
}
else if (pl_rsp.succ)
{
f_EPTF_MQTT_dispatchEvent(c_MQTT_eventIdx_transportSucc, v_MQTT_session.eIdx, v_MQTT_session.fsmIdx, {});
}
else
{
f_EPTF_MQTT_dispatchEvent(c_MQTT_eventIdx_transportFail, v_MQTT_session.eIdx, v_MQTT_session.fsmIdx, {});
}
}
else
{
f_EPTF_MQTT_Logging_VERBOSE(log2str("session not found for api response: ", pl_rsp));
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_LGen_send
//
// Purpose:
// This function is used to send out a message of a <EPTF_MQTT_PDU> using the registered
// function <fcb_EPTF_MQTT_Transport_sendMessage> of the underlying transport layer instance.
//
// Parameters:
// p_msg - *intout* <EPTF_MQTT_PDU> - the message to be sent
//
// Related Types:
// <EPTF_MQTT_PDU>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_LGen_send(inout EPTF_MQTT_PDU p_msg)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_updateMessageStatistics(v_MQTT_stats.outgoing, p_msg);
vf_EPTF_MQTT_Transport_send.apply(p_msg);
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_step_init
//
// Purpose:
// Test Step to dynamically allocate and initialize the MQTT FSM context for the caller FSM. Prerequisite to call any other MQTT test step.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
//
// Related Constants:
// - <c_MQTT_stepIdx_init>
// - <c_MQTT_stepName_init>
///////////////////////////////////////////////////////////
function f_MQTT_step_init(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
var integer vl_eIdx := pl_ptr.eIdx;
var integer vl_fsmIdx := pl_ptr.refContext.fCtxIdx;
var integer vl_newSessionIdx := -1;
if (f_EPTF_MQTT_isFsmInitialized(vl_eIdx, vl_fsmIdx, vl_newSessionIdx)) { return }
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId, ": initializing fsm ", vl_fsmIdx, " for entity ",vl_eIdx));
var MQTT_Session vl_session := c_MQTT_Session_init;
vl_session.eIdx := vl_eIdx;
vl_session.fsmIdx := pl_ptr.refContext.fCtxIdx;
vl_newSessionIdx := f_EPTF_MQTT_sessionDB_add(vl_session);
f_EPTF_LGenBase_setAppDataItemOfFsmCtx(vl_eIdx, vl_fsmIdx, v_MQTT_bIdx, c_MQTT_AppData_sessionIdx, vl_newSessionIdx);
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_step_cleanUp
//
// Purpose:
// Test Step to free up the MQTT FSM context for the caller FSM. Frees up all allocated instances that were used by this FSM instance.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
//
// Related Constants:
// - <c_MQTT_stepIdx_cleanUp>
// - <c_MQTT_stepName_cleanUp>
///////////////////////////////////////////////////////////
function f_MQTT_step_cleanUp(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
var integer vl_eIdx:=pl_ptr.eIdx;
var integer vl_fsmIdx := pl_ptr.refContext.fCtxIdx;
var integer vl_sessionIdx := -1;
if (not f_EPTF_MQTT_isFsmInitialized(vl_eIdx, vl_fsmIdx, vl_sessionIdx))
{
f_EPTF_MQTT_Logging_DEBUG(%definitionId&": FSM has not been initialized");
return;
}
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId,": FSM database: ",v_MQTT_sessionDB.data[vl_sessionIdx]));
f_EPTF_MQTT_session_remove(vl_sessionIdx);
f_EPTF_LGenBase_setAppDataItemOfFsmCtx(vl_eIdx, vl_fsmIdx, v_MQTT_bIdx, c_MQTT_AppData_sessionIdx, -1);
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_step_setLocalAddress
//
// Purpose:
// Test step to set the local address in the entity context.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
// pl_ptr.refContext.fRefArgs[0] - *integer* - Index of the socket to use as local address
//
// Related Constants:
// - <c_MQTT_stepIdx_setLocalAddress>
// - <c_MQTT_stepName_setLocalAddress>
///////////////////////////////////////////////////////////
function f_MQTT_step_setLocalAddress(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_step_setLocalAddress_byVars
//
// Purpose:
// Test step to set the local address in the entity context.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args (1st param: remoteHost: charstring, 2nd param: remotePort: integer)
//
// Related Constants:
// - <c_MQTT_stepIdx_setLocalAddress_byVars>
// - <c_MQTT_stepName_setLocalAddress_byVars>
///////////////////////////////////////////////////////////
function f_MQTT_step_setLocalAddress_byVars(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var EPTF_IntegerList vl_varIds := {};
f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_varIds);
if (sizeof(vl_varIds)==2)
{
var EPTF_Var_DirectContent vl_host, vl_port;
f_EPTF_Var_getContent(vl_varIds[0], vl_host);
f_EPTF_Var_getContent(vl_varIds[1], vl_port);
if (not ischosen(vl_host.charstringVal)) {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " first param is not charstring variable!"));
return;
}
if (not ischosen(vl_port.intVal)) {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " second param is not integer variable!"));
return;
}
f_EPTF_MQTT_addressDB_add(
{
hostName := vl_host.charstringVal,
portNumber := vl_port.intVal
},
v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx
);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " two variables are needed as params!"));
}
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_step_setRemoteAddress
//
// Purpose:
// Test step to set the remote address in the FSM context.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
// pl_ptr.refContext.fRefArgs[0] - *integer* - Index of the socket to use as local address
//
// Related Constants:
// - <c_MQTT_stepIdx_setRemoteAddress>
// - <c_MQTT_stepName_setRemoteAddress>
///////////////////////////////////////////////////////////
function f_MQTT_step_setRemoteAddress(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx);
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_step_setRemoteAddress_byVars
//
// Purpose:
// Test step to set the remote address in the FSM context.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args (1st param: remoteHost: charstring, 2nd param: remotePort: integer)
//
// Related Constants:
// - <c_MQTT_stepIdx_setRemoteAddress_byVars>
// - <c_MQTT_stepName_setRemoteAddress_byVars>
///////////////////////////////////////////////////////////
function f_MQTT_step_setRemoteAddress_byVars(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var EPTF_IntegerList vl_varIds := {};
f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_varIds);
if (sizeof(vl_varIds)==2)
{
var EPTF_Var_DirectContent vl_host, vl_port;
f_EPTF_Var_getContent(vl_varIds[0], vl_host);
f_EPTF_Var_getContent(vl_varIds[1], vl_port);
if (not ischosen(vl_host.charstringVal)) {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " first param is not charstring variable!"));
return;
}
if (not ischosen(vl_port.intVal)) {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " second param is not integer variable!"));
return;
}
f_EPTF_MQTT_addressDB_add(
{
hostName := vl_host.charstringVal,
portNumber := vl_port.intVal
},
v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx
);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " two variables are needed as params!"));
}
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_step_startListening
//
// Purpose:
// The test step expects that a transport endpoint is set in the addressDB as a local address.
// The step will initiate allocating the local address associated with the current session in the MQTT context
// and call the callback function to start listening.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
//
// Related Constants:
// - <c_MQTT_stepIdx_startListening>
// - <c_MQTT_stepName_startListening>
//
// Related Steps:
// - <f_MQTT_step_setLocalAddress_byVars>
//
// Related Callback Function Type:
// <fcb_EPTF_MQTT_Transport_apiRequest>
///////////////////////////////////////////////////////////
function f_MQTT_step_startListening(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var EPTF_MQTT_Transport_Request vl_req := c_EPTF_MQTT_Transport_Request_init;
vl_req.sessionIdx := v_MQTT_ctx.sessionIdx;
f_EPTF_MQTT_addressDB_get(vl_req.params.startListening.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
vf_EPTF_MQTT_Transport_apiRequest.apply(vl_req);
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_step_transportConnect
//
// Purpose:
// The test step expects that local and remote socket adresses are set in the addressDB.
// The step will initiate allocating the local and remote addresses associated with the current session in the MQTT context
// and call the callback function to establish a connection.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
//
// Related Constants:
// - <c_MQTT_stepIdx_transportConnect>
// - <c_MQTT_stepName_transportConnect>
//
// Related Events:
// - <c_MQTT_eventIdx_transportSucc>
// - <c_MQTT_eventIdx_transportFail>
//
// Related Steps:
// - <f_MQTT_step_setLocalAddress_byVars>
//
// Related Callback Function Type:
// <fcb_EPTF_MQTT_Transport_apiRequest>
///////////////////////////////////////////////////////////
function f_MQTT_step_transportConnect(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var EPTF_MQTT_Transport_Request vl_req := c_EPTF_MQTT_Transport_Request_init;
vl_req.sessionIdx := v_MQTT_ctx.sessionIdx;
f_EPTF_MQTT_addressDB_get(vl_req.params.connect_.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(vl_req.params.connect_.remoteAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx);
vf_EPTF_MQTT_Transport_apiRequest.apply(vl_req);
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_step_transportClose
//
// Purpose:
// The test step expects that a transport endpoint is set in the addressDB as a local address.
// The step will call the callback function to close the connection by the local address associated with the current session in the MQTT context.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args (1st param int: expectResponse (optional))
//
// Related Constants:
// - <c_MQTT_stepIdx_transportClose>
// - <c_MQTT_stepName_transportClose>
//
// Related Steps:
// - <f_MQTT_step_startListening>
// - <f_MQTT_step_transportConnect>
//
// Related Callback Function Type:
// <fcb_EPTF_MQTT_Transport_apiRequest>
///////////////////////////////////////////////////////////
function f_MQTT_step_transportClose(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var integer vl_expectResponseInt := 1;
var boolean vl_expectResponse := true;
if (f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_expectResponseInt))
{
if (vl_expectResponseInt <= 0) { vl_expectResponse := false; }
}
if (v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx >= 0)
{
var EPTF_MQTT_Transport_Request vl_req := c_EPTF_MQTT_Transport_Request_init;
vl_req.sessionIdx := v_MQTT_ctx.sessionIdx;
vl_req.expectResponse := vl_expectResponse;
f_EPTF_MQTT_addressDB_get(vl_req.params.close.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
vf_EPTF_MQTT_Transport_apiRequest.apply(vl_req);
}
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_step_loadTemplate_byIntIdx
//
// Purpose:
// Test step to load a <MQTT_Template> from <tsp_EPTF_MQTT_LGen_templates> into *v_MQTT_msgToSend*
// (which can be sent using the send test step) by its integer index in test step args.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
//
// Related Constants:
// - <c_MQTT_stepIdx_loadTemplate_byIntIdx>
// - <c_MQTT_stepName_loadTemplate_byIntIdx>
//
// Related Function:
// <f_MQTT_step_send>
///////////////////////////////////////////////////////////
function f_MQTT_step_loadTemplate_byIntIdx(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var integer vl_templateIdx := -1;
f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_templateIdx);
f_EPTF_MQTT_templateDB_get(vl_templateIdx, v_MQTT_msgToSend.pdu);
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_step_loadTemplate_byStringId
//
// Purpose:
// Test step to load a <MQTT_Template> from <tsp_EPTF_MQTT_LGen_templates> into *v_MQTT_msgToSend*
// (which can be sent using the send test step) by its string Id.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
//
// Related Constants:
// - <c_MQTT_stepIdx_loadTemplate_byStringId>
// - <c_MQTT_stepName_loadTemplate_byStringId>
//
// Related Function:
// <f_MQTT_step_send>
///////////////////////////////////////////////////////////
function f_MQTT_step_loadTemplate_byStringId(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var charstring vl_templateId := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);
var integer vl_templateIdx := f_EPTF_MQTT_templateDB_lookUp(vl_templateId);
if (vl_templateIdx >= 0)
{
f_EPTF_MQTT_templateDB_get(vl_templateIdx, v_MQTT_msgToSend.pdu);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find template with id: ", vl_templateId));
}
}
/////////////////////////////////////////////////////////////
// Function: f_MQTT_step_setTopic_stringParam
//
// Purpose:
// Test step to set the string value referred by the test step argument as the topic of the first subscription entry in SUBSCRIBE and PUBLISH messages.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(1st param: topic name charstring)
//
// Related Constants:
// - <c_MQTT_stepIdx_setTopic_stringParam>
// - <c_MQTT_stepName_setTopic_stringParam>
//
///////////////////////////////////////////////////////////
function f_MQTT_step_setTopic_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var charstring vl_topic := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);
if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter := f_charstr2unichar(vl_topic);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
}
}
else if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.topic_name := f_charstr2unichar(vl_topic);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
/////////////////////////////////////////////////////////////
// Function: f_MQTT_step_setTopic_add_stringParam
//
// Purpose:
// Test step to add the string value referred by the test step argument to the topic of the first subscription entry in SUBSCRIBE and PUBLISH messages.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(1st param: topic name charstring)
//
// Related Constants:
// - <c_MQTT_stepIdx_setTopic_add_stringParam>
// - <c_MQTT_stepName_setTopic_add_stringParam>
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_step_setTopic_add_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var charstring vl_topic := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);
if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter :=
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter &
f_charstr2unichar(vl_topic);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
}
}
if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.topic_name :=
v_MQTT_msgToSend.pdu.publish.topic_name &
f_charstr2unichar(vl_topic);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
/////////////////////////////////////////////////////////////
// Function: f_MQTT_step_setTopic_add_varParams
//
// Purpose:
// Test step to add the string value of variables referred by the test step argument to the topic of the first subscription entry in SUBSCRIBE and PUBLISH messages.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(params: variables)
//
// Related Constants:
// - <c_MQTT_stepIdx_setTopic_add_varParams>
// - <c_MQTT_stepName_setTopic_add_varParams>
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_step_setTopic_add_varParams(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var charstring vl_strToAdd := "";
var EPTF_IntegerList vl_varIds := {};
f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_varIds);
for (var integer i:=0; i<sizeof(vl_varIds); i:=i+1)
{
var EPTF_Var_DirectContent vl_var;
f_EPTF_Var_getContent(vl_varIds[i], vl_var);
if (ischosen(vl_var.charstringVal)) {
vl_strToAdd := vl_strToAdd & vl_var.charstringVal;
}
else if (ischosen(vl_var.intVal)) {
vl_strToAdd := vl_strToAdd & int2str(vl_var.intVal);
}
}
if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter :=
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter &
f_charstr2unichar(vl_strToAdd);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
}
}
else if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.topic_name :=
v_MQTT_msgToSend.pdu.publish.topic_name &
f_charstr2unichar(vl_strToAdd);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
/////////////////////////////////////////////////////////////
// Function: f_MQTT_step_setTopic_add_clientId
//
// Purpose:
// Test step to add client ID in the current session to the topic in the first subscription in SUBSCRIBE and to PUBLISH message.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(param: clientId)
//
// Related Constants:
// - <c_MQTT_stepIdx_setTopic_add_clientId>
// - <c_MQTT_stepName_setTopic_add_clientId>
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_step_setTopic_add_clientId(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter :=
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter &
f_charstr2unichar(v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].clientId);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
}
}
if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.topic_name :=
v_MQTT_msgToSend.pdu.publish.topic_name &
f_charstr2unichar(v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].clientId);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
/////////////////////////////////////////////////////////////
// Function: f_MQTT_step_setQos_intParam
//
// Purpose:
// Test step to set the QoS level in SUBSCRIBE and PUBLISH messages using test step arguments
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(1st param: qos level (0,1,2) integer)
//
// Related Constants:
// - <c_MQTT_stepIdx_setQos_intParam>
// - <c_MQTT_stepName_setQos_intParam>
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_step_setQos_intParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var integer vl_qos := -1;
f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_qos);
if (vl_qos >=0 and vl_qos < 3)
{
if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
v_MQTT_msgToSend.pdu.subscribe.payload[0].requested_qos := f_EPTF_MQTT_qos_int2enum(vl_qos);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
}
}
else if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.header.qos_level := f_EPTF_MQTT_qos_int2enum(vl_qos);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid qos level: ", vl_qos));
}
}
/////////////////////////////////////////////////////////////
// Function: f_MQTT_step_setPublishMessage_stringParam
//
// Purpose:
// Test step to set the content of the payload in PUBLISH message.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(1st param: publish message charstring)
//
// Related Constants:
// - <c_MQTT_stepIdx_setPublishMessage_stringParam>
// - <c_MQTT_stepName_setPublishMessage_stringParam>
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_step_setPublishMessage_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var charstring vl_msg := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);
if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.payload := char2oct(vl_msg);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
/////////////////////////////////////////////////////////////
// Function: f_MQTT_step_setPublishMessage_add_stringParam
//
// Purpose:
// Test step to concatenate a string to the content of the PUBLISH message
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(1st param: publish message charstring)
//
// Related Constants:
// - <c_MQTT_stepIdx_setPublishMessage_add_stringParam>
// - <c_MQTT_stepName_setPublishMessage_add_stringParam>
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
////////////////////////////////////////////////////////////
function f_MQTT_step_setPublishMessage_add_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var charstring vl_msg := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);
if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.payload :=
v_MQTT_msgToSend.pdu.publish.payload &
char2oct(vl_msg);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
/////////////////////////////////////////////////////////////
// Function: f_MQTT_step_setPublishMessage_add_varParams
//
// Purpose:
// Test step to add the content of a set of variables to the payload of a PUBLISH message.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(params: variables)
//
// Related Constants:
// - <c_MQTT_stepIdx_setPublishMessage_add_varParams>
// - <c_MQTT_stepName_setPublishMessage_add_varParams>
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_step_setPublishMessage_add_varParams(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
if (not ischosen(v_MQTT_msgToSend.pdu.publish)) {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
return;
}
var charstring vl_strToAdd := "";
var EPTF_IntegerList vl_varIds := {};
f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_varIds);
for (var integer i:=0; i<sizeof(vl_varIds); i:=i+1)
{
var EPTF_Var_DirectContent vl_var;
f_EPTF_Var_getContent(vl_varIds[i], vl_var);
if (ischosen(vl_var.charstringVal)) {
vl_strToAdd := vl_strToAdd & vl_var.charstringVal;
}
else if (ischosen(vl_var.intVal)) {
vl_strToAdd := vl_strToAdd & int2str(vl_var.intVal);
}
}
v_MQTT_msgToSend.pdu.publish.payload :=
v_MQTT_msgToSend.pdu.publish.payload &
char2oct(vl_strToAdd);
}
/////////////////////////////////////////////////////////////
// Function: f_MQTT_step_setPublishMessage_add_clientId
//
// Purpose:
// Test step to add client ID of the current session to the payload of a PUBLISH message.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(param: clientId)
//
// Related Constants:
// - <c_MQTT_stepIdx_setPublishMessage_add_clientId>
// - <c_MQTT_stepName_setPublishMessage_add_clientId>
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_step_setPublishMessage_add_clientId(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.payload :=
v_MQTT_msgToSend.pdu.publish.payload &
char2oct(v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].clientId);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
/////////////////////////////////////////////////////////////
// Function: f_MQTT_step_reportPingResponse
//
// Purpose:
// Test step to set the report ping response to enable/disable using using step arguments.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args - 1:enable, 0:disable
//
// Related Constants:
// - <c_MQTT_stepIdx_reportPingResponse>
// - <c_MQTT_stepName_reportPingResponse>
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_step_reportPingResponse(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var integer vl_enable := -1;
f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_enable);
if (vl_enable >=0 and vl_enable < 2)
{
v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].reportPingResponse := (vl_enable == 1);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid parameter: ", vl_enable));
}
}
/////////////////////////////////////////////////////////////
// Function: f_MQTT_step_reportPublishResponse
//
// Purpose:
// Test step to set the report publish response to enable/disable using step arguments
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(1st param: enable (1)/disable (0): integer)
//
// Related Constants:
// - <c_MQTT_stepIdx_reportPublishResponse>
// - <c_MQTT_stepName_reportPublishResponse>
//
// Related Type:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_MQTT_step_reportPublishResponse(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var integer vl_enable := -1;
f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_enable);
if (vl_enable >=0 and vl_enable < 2)
{
v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].reportPublishResponse := (vl_enable == 1);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid parameter: ", vl_enable));
}
}
///////////////////////////////////////////////////////////
// Function: f_MQTT_step_send
//
// Purpose:
// Test step to send out an MQTT message from *v_MQTT_msgToSend*.
// The message will be processed by the Applib's MQTT stack
// The step expects the localAddress and the remoteAddress to be configured in addressDB.
//
// Parameters:
// pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
//
// Related Constants:
// - <c_MQTT_stepIdx_send>
// - <c_MQTT_stepName_send>
//
// Related Functions:
// - <f_MQTT_step_loadTemplate_byIntIdx>
// - <f_MQTT_step_loadTemplate_byStringId>
//
// Related functions:
// <f_EPTF_MQTT_stack_fromApp>
///////////////////////////////////////////////////////////
function f_MQTT_step_send(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId," ",pl_ptr));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
f_EPTF_MQTT_addressDB_get(v_MQTT_msgToSend.transportParams.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(v_MQTT_msgToSend.transportParams.remoteAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx);
v_MQTT_msgToSend.transportParams.proto := {tcp := {}};
v_MQTT_msgToSend.sessionIdx := v_MQTT_ctx.sessionIdx;
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId," msg to send: ",v_MQTT_msgToSend));
f_EPTF_MQTT_stack_fromApp(v_MQTT_msgToSend, v_MQTT_ctx);
f_EPTF_SchedulerComp_refreshSnapshotTime();
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_addressDB_init
//
// Purpose:
// Function to initialize the addressDB
//
// Parameters:
//
// Related Type:
// <MQTT_Address_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_addressDB_init()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_addressDB.queue);
v_MQTT_addressDB.data := {};
v_MQTT_addressDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_addressDB_Hash");
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_addressDB_cleanUp
//
// Purpose:
// Function to clean up the address database and release its resources
//
// Parameters:
//
// Related Type:
// <MQTT_Address_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_addressDB_cleanUp()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_addressDB.queue);
v_MQTT_addressDB.data := {};
f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_addressDB_Hash");
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_addressDB_add
//
// Purpose:
// Add a socket address to the addressDB and return its index if no such entry yet,
// or return its index if already exists
//
// Parameters:
// p_addr - *in* <Socket> - socket address
// p_idx - *inout* *integer* - index of the address entry
//
// Related Type:
// <MQTT_Address_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_addressDB_add(in Socket p_addr, inout integer p_idx)
runs on EPTF_MQTT_LGen_CT
{
p_idx := f_EPTF_MQTT_addressDB_lookUp(p_addr);
if (p_idx == -1)
{
p_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_addressDB.queue);
f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_addressDB.queue);
f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding target address ", p_idx, " ", p_addr));
f_EPTF_str2int_HashMap_Insert(v_MQTT_addressDB.hashRef, f_EPTF_MQTT_addressDB_Socket2String(p_addr), p_idx);
v_MQTT_addressDB.data[p_idx] := p_addr;
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_addressDB_get
//
// Purpose:
// Get a socket address from the addressDB by its index
//
// Parameters:
// p_addr - *inout* <Socket> - returned socket address
// p_idx - *in* *integer* - index of the address to get
//
// Related Type:
// <MQTT_Address_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_addressDB_get(inout Socket p_addr, in integer p_idx)
runs on EPTF_MQTT_LGen_CT
{
if (p_idx < sizeof(v_MQTT_addressDB.data) and p_idx >=0)
{
p_addr := v_MQTT_addressDB.data[p_idx];
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(": "," couldn't get address at ", p_idx));
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_addressDB_lookUp
//
// Purpose:
// Get the index of a socket entry in addressDB
//
// Parameters:
// p_sock - *in* <Socket> - socket address
//
// Return Type:
// *integer* - The index of the socket entry
//
// Related Type:
// <MQTT_Address_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_addressDB_lookUp(in Socket p_sock)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer vl_idx := -1;
f_EPTF_str2int_HashMap_Find(v_MQTT_addressDB.hashRef, f_EPTF_MQTT_addressDB_Socket2String(p_sock), vl_idx);
return vl_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_addressDB_Socket2String
//
// Purpose:
// Converts a socket address in <Socket> type format to the string format "<IP address>:<port number>" to be used as hash key
//
// Parameters:
// p_sock - *inout* <Socket> - socket address
//
// Return Type:
// *charstring* - Socket address in string format
//
// Related Type:
// <MQTT_Address_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_addressDB_Socket2String(Socket p_sock)
return charstring
{
return p_sock.hostName&":"&int2str(p_sock.portNumber);
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_templateDB_init
//
// Purpose:
// Initializes the *v_MQTT_templateDB* <MQTT_Template_DB> database by adding the templates given in <tsp_EPTF_MQTT_LGen_templates>
//
// Related Type:
// <MQTT_Template_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_templateDB_init()
runs on EPTF_MQTT_LGen_CT
{
v_MQTT_templateDB.data := {};
v_MQTT_templateDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_templateDB_Hash");
for (var integer i:=0; i<sizeof(tsp_EPTF_MQTT_LGen_templates); i:=i+1) {
f_EPTF_MQTT_templateDB_add(tsp_EPTF_MQTT_LGen_templates[i]);
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_templateDB_add
//
// Purpose:
// Adds a new element to the *v_MQTT_templateDB* <MQTT_Template_DB> database
//
// Parameters:
// p_template - *in* <MQTT_Template> - the element to be added
//
// Returns:
// *integer* - the index of the added element in the database
//
// Related Type:
// <MQTT_Template_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_templateDB_add(in MQTT_Template p_template)
runs on EPTF_MQTT_LGen_CT
return integer
{
if (f_EPTF_MQTT_templateDB_lookUp(p_template.id)!=-1)
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " template is already added with id: ", p_template.id));
return -1;
}
var integer v_idx := sizeof(v_MQTT_templateDB.data);
v_MQTT_templateDB.data[v_idx] := p_template;
f_EPTF_str2int_HashMap_Insert(v_MQTT_templateDB.hashRef, p_template.id, v_idx);
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " template was added with id: ", p_template.id, " at idx: ",v_idx));
return v_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_templateDB_lookUp
//
// Purpose:
// Gets the index of an <MQTT_Template> element in *v_MQTT_templateDB* <MQTT_Template_DB> database
//
// Parameters:
// p_id - *in* *charstring* - the id of the <MQTT_Template>
//
// Returns:
// *integer* - the index of the searched template in the database, or -1 if not found
//
// Related Type:
// <MQTT_Template_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_templateDB_lookUp(in charstring p_id)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer vl_idx := -1;
if (not f_EPTF_str2int_HashMap_Find(v_MQTT_templateDB.hashRef, p_id, vl_idx))
{
vl_idx := -1;
}
return vl_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_templateDB_get
//
// Purpose:
// Retrieves an element from the *v_MQTT_templateDB* <MQTT_Template_DB> database
//
// Parameters:
// p_idx - *in* *integer* - the index of the element to be retrieved
// p_pdu - *inout* <MQTT_v3_1_1_ReqResp> - the retrieved element
//
// Related Type:
// <MQTT_Template_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_templateDB_get(in integer p_idx, inout MQTT_v3_1_1_ReqResp p_pdu)
runs on EPTF_MQTT_LGen_CT
{
if (p_idx < sizeof(v_MQTT_templateDB.data) and p_idx >= 0)
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " template is fetched with idx: ", p_idx));
p_pdu := v_MQTT_templateDB.data[p_idx].msg;
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_templateDB_cleanUp
//
// Purpose:
// Cleans up the reserved resources of the *v_MQTT_templateDB* <MQTT_Template_DB> database
//
// Related Type:
// <MQTT_Template_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_templateDB_cleanUp()
runs on EPTF_MQTT_LGen_CT
{
v_MQTT_templateDB.data := {};
f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_templateDB_Hash");
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_sessionDB_init
//
// Purpose:
// Initializes the *v_MQTT_sessionDB* <MQTT_Session_DB> database and adds its hash to *v_MQTT_sessionDB.hashRef*
//
// Related Type:
// <MQTT_Session_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_sessionDB_init()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_sessionDB.queue);
v_MQTT_sessionDB.data := {};
v_MQTT_sessionDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_sessionDB_Hash");
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_sessionDB_cleanUp
//
// Purpose:
// Cleans up the reserved resources of the *v_MQTT_sessionDB* <MQTT_Session_DB> database
//
// Related Type:
// <MQTT_Session_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_sessionDB_cleanUp()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_sessionDB.queue);
v_MQTT_sessionDB.data := {};
f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_sessionDB_Hash");
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_sessionDB_add
//
// Purpose:
// Adds a new element to the *v_MQTT_sessionDB* <MQTT_Session_DB> database
//
// Parameters:
// p_session - *in* <MQTT_Session> - the element to be added
//
// Returns:
// *integer* - the index of the added element in the database
//
// Related Type:
// <MQTT_Session_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_sessionDB_add(in MQTT_Session p_session)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer v_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_sessionDB.queue);
f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_sessionDB.queue);
f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding session ", v_idx, " ", p_session));
v_MQTT_sessionDB.data[v_idx] := p_session;
return v_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_sessionDB_setKey
//
// Purpose:
// Sets the hash of the local socket address of a session by the session index
//
// Parameters:
// p_idx - *in* *integer* - the session index
//
// Related Type:
// <MQTT_Session_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_sessionDB_setKey(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
{
if (f_EPTF_MQTT_sessionDB_get(p_idx, v_MQTT_session))
{
var Socket v_addr;
f_EPTF_MQTT_addressDB_get(v_addr, v_MQTT_session.localAddrIdx);
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "setting key for sock: ",f_EPTF_MQTT_sessionDB_addrHash(v_addr)," idx: ",p_idx));
f_EPTF_str2int_HashMap_Insert(v_MQTT_sessionDB.hashRef, f_EPTF_MQTT_sessionDB_addrHash(v_addr), p_idx);
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_sessionDB_lookUp
//
// Purpose:
// Gets the index of a session in *v_MQTT_sessionDB* <MQTT_Session_DB> database by its socket address
//
// Parameters:
// p_sock - *in* <Socket> - the socket address to look up
//
// Returns:
// *integer* - the index of the added element in the database, or -1 if not found
//
// Related Type:
// <MQTT_Session_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_sessionDB_lookUp(in Socket p_sock)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer vl_idx := -1;
f_EPTF_str2int_HashMap_Find(v_MQTT_sessionDB.hashRef, f_EPTF_MQTT_sessionDB_addrHash(p_sock), vl_idx);
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up sock: ",f_EPTF_MQTT_sessionDB_addrHash(p_sock)," idx: ",vl_idx));
return vl_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_sessionDB_get
//
// Purpose:
// Retrieves a session's data from the *v_MQTT_sessionDB* <MQTT_Session_DB> database
//
// Parameters:
// p_idx - *in* *integer* - the index of the element to be retrieved
// p_session - *inout* <MQTT_Session> - the retrieved session context
//
// Returns:
// *boolean* - true if OK, false if no session element with this index
//
// Related Type:
// <MQTT_Session_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_sessionDB_get(in integer p_idx, inout MQTT_Session p_session)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_sessionDB.data) and p_idx >= 0)
{
p_session := v_MQTT_sessionDB.data[p_idx];
return true;
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "couldn't get session with idx: ", p_idx));
return false;
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_sessionDB_check
//
// Purpose:
// Checks if a session element exists in the *v_MQTT_sessionDB* <MQTT_Session_DB> database
//
// Parameters:
// p_idx - *in* *integer* - the index of the element to be checked
//
// Returns:
// boolean - true if the session exists
//
// Related Type:
// <MQTT_Session_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_sessionDB_check(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_sessionDB.data) and p_idx >= 0 and f_EPTF_FBQ_itemIsBusy(p_idx, v_MQTT_sessionDB.queue))
{
return true;
}
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "invalid subscription with idx: ", p_idx));
return false;
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_sessionDB_remove
//
// Purpose:
// Removes an element from the *v_MQTT_sessionDB* <MQTT_Session_DB> database and releases its resources
//
// Parameters:
// p_idx - *in* *integer* - the index of the element to be removed
//
// Related Type:
// <MQTT_Session_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_sessionDB_remove(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "removing session with idx: ", p_idx));
var Socket v_addr;
f_EPTF_MQTT_addressDB_get(v_addr, v_MQTT_sessionDB.data[p_idx].localAddrIdx);
f_EPTF_str2int_HashMap_Erase(v_MQTT_sessionDB.hashRef, f_EPTF_MQTT_sessionDB_addrHash(v_addr));
v_MQTT_sessionDB.data[p_idx] := c_MQTT_Session_init;
f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_sessionDB.queue);
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_sessionDB_addrHash
//
// Purpose:
// Converts a socket address in <Socket> type format to the string format "<IP address>:<port number>" to be used as a hash key
//
// Parameters:
// p_sock - *in* <Socket> - socket address
//
// Returns:
// charstring - socket address in string format
//
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_sessionDB_addrHash(in Socket p_sock)
return charstring
{
return p_sock.hostName&":"&int2str(p_sock.portNumber);
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_publishDB_init
//
// Purpose:
// Initializes the *v_MQTT_publishDB* <MQTT_Publish_DB> database and creates its hashmap
//
// Related Type:
// <MQTT_Publish_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_publishDB_init()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_publishDB.queue);
v_MQTT_publishDB.data := {};
v_MQTT_publishDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_publishDB_Hash");
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_publishDB_cleanUp
//
// Purpose:
// Cleans up the reserved resources of the *v_MQTT_publishDB* <MQTT_Publish_DB> database
//
// Related Type:
// <MQTT_Publish_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_publishDB_cleanUp()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_publishDB.queue);
v_MQTT_publishDB.data := {};
f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_publishDB_Hash");
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_publishDB_add
//
// Purpose:
// Adds a new element to the *v_MQTT_publishDB* <MQTT_Publish_DB> database
//
// Parameters:
// p_pub - *in* <MQTT_Publish> - the element to be added
//
// Returns:
// *integer* - the index of the added element in the database
//
// Related Type:
// <MQTT_Publish_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_publishDB_add(in MQTT_Publish p_pub)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer v_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_publishDB.queue);
f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_publishDB.queue);
f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding publish ", v_idx, " ", p_pub));
f_EPTF_str2int_HashMap_Insert(
v_MQTT_publishDB.hashRef,
f_EPTF_MQTT_publishDB_packetIdHash(p_pub.sessionIdx, p_pub.packetId),
v_idx
);
v_MQTT_publishDB.data[v_idx] := p_pub;
return v_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_publishDB_lookUp
//
// Purpose:
// Gets the index of an <MQTT_Publish> element in *v_MQTT_publishDB* <MQTT_Publish_DB> database by its session and packet id-s
//
// Parameters:
// p_sessionIdx - *in* *integer* - input session id
// p_packetId - *in* *integer* - input packet id
//
// Returns:
// *integer* - the index of the searched element in the database, or -1 if not found
//
// Related Type:
// <MQTT_Publish_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_publishDB_lookUp(in integer p_sessionIdx, in integer p_packetId)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer vl_idx := -1;
f_EPTF_str2int_HashMap_Find(
v_MQTT_publishDB.hashRef,
f_EPTF_MQTT_publishDB_packetIdHash(p_sessionIdx, p_packetId),
vl_idx
);
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up publish idx: ",vl_idx));
return vl_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_publishDB_get
//
// Purpose:
// Retrieves an element from the *v_MQTT_publishDB* <MQTT_Publish_DB> database
//
// Parameters:
// p_idx - *in* *integer* - the index of the element to be retrieved
// p_pub - *inout* <MQTT_Publish> - the retrieved element
//
// Returns:
// boolean - true: success, false: no element with the index p_idx
//
// Related Type:
// <MQTT_Publish_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_publishDB_get(in integer p_idx, inout MQTT_Publish p_pub)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_publishDB.data) and p_idx >= 0)
{
p_pub := v_MQTT_publishDB.data[p_idx];
return true;
}
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "couldn't get publish with idx: ", p_idx));
return false;
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_publishDB_check
//
// Purpose:
// Checks if an element exists in the *v_MQTT_publishDB* <MQTT_Publish_DB> database
//
// Parameters:
// p_idx - *in* *integer* - the index of the element to be checked
//
// Returns:
// boolean - true: element present, false: element doesn't exists
//
// Related Type:
// <MQTT_Publish_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_publishDB_check(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_publishDB.data) and p_idx >= 0 and f_EPTF_FBQ_itemIsBusy(p_idx, v_MQTT_publishDB.queue))
{
return true;
}
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "invalid publish with idx: ", p_idx));
return false;
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_publishDB_remove
//
// Purpose:
// Removes an element from the *v_MQTT_publishDB* <MQTT_Publish_DB> database
//
// Parameters:
// p_idx - *in* *integer* - the index of the element to be Removed
//
// Related Type:
// <MQTT_Publish_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_publishDB_remove(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " removing idx: ", p_idx));
if (f_EPTF_MQTT_publishDB_get(p_idx, v_MQTT_publish))
{
f_EPTF_str2int_HashMap_Erase(
v_MQTT_publishDB.hashRef,
f_EPTF_MQTT_publishDB_packetIdHash(
v_MQTT_publish.sessionIdx,
v_MQTT_publish.packetId)
);
v_MQTT_publishDB.data[p_idx] := c_MQTT_Publish_init;
f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_publishDB.queue);
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_publishDB_packetIdHash
//
// Purpose:
// Converts a pair of session ID & packet ID to the string format "session_<sessionId>:id_<packetId>" to be used as a hash key
//
// Parameters:
// p_sessionIdx - *in* *integer* - input session ID
// p_packetId - *in* *integer* - input packet ID
//
// Returns:
// charstring - converted IDs
//
// Related Type:
// <MQTT_Publish_DB>
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_publishDB_packetIdHash(in integer p_sessionIdx, in integer p_packetId)
return charstring
{
return "session_"&int2str(p_sessionIdx)&":"&"id_"&int2str(p_packetId);
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscriptionDB_init
//
// Purpose:
// Initializes the *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
//
// Related Type:
// <MQTT_Subscription_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscriptionDB_init()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_subscriptionDB.queue);
v_MQTT_subscriptionDB.data := {};
v_MQTT_subscriptionDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_subscriptionDB_Hash");
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscriptionDB_cleanUp
//
// Purpose:
// Cleans up the reserved resources of the *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
//
// Related Type:
// <MQTT_Subscription_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscriptionDB_cleanUp()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_subscriptionDB.queue);
v_MQTT_subscriptionDB.data := {};
f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_subscriptionDB_Hash");
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscriptionDB_add
//
// Purpose:
// Adds a new element to the *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
//
// Parameters:
// p_sub - *in* <MQTT_Subscription> - the element to be added
//
// Returns:
// *integer* - the index of the added element in the database
//
// Related Type:
// <MQTT_Subscription_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscriptionDB_add(in MQTT_Subscription p_sub)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer v_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_subscriptionDB.queue);
f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_subscriptionDB.queue);
f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding subscription ", v_idx, " ", p_sub));
if (ispresent(p_sub.request))
{
f_EPTF_str2int_HashMap_Insert(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sub.sessionIdx, p_sub.request.packet_identifier),
v_idx
);
// TODO: Only the first topic name will be used!
// TODO: It is not checked if there is already a subscription for this!
if (sizeof(p_sub.request.payload)>0)
{
var universal charstring vl_topicName := p_sub.request.payload[0].topic_filter;
f_EPTF_str2int_HashMap_Insert(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_topicHash(p_sub.sessionIdx, f_unichar2charstr(vl_topicName)),
v_idx
);
}
}
v_MQTT_subscriptionDB.data[v_idx] := p_sub;
return v_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscriptionDB_setKey_packetId
//
// Purpose:
// Insert an integer element to the subscription hashmap, key is composed from session ID and packet ID
//
// Parameters:
// p_idx - *in* *integer* - data to be inserted
// p_sessionIdx - *in* *integer* - input session ID, used in hashmap key
// p_packetId - *in* *integer* - input packet ID, used in hashmap key
//
// Related Type:
// <MQTT_Subscription_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscriptionDB_setKey_packetId(in integer p_idx, in integer p_sessionIdx, in integer p_packetId)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_str2int_HashMap_Insert(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sessionIdx, p_packetId),
p_idx
);
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscriptionDB_removeKey_packetId
//
// Removes the element from the subscription hashmap identified by its session ID and packet ID
//
// Parameters:
// p_idx - *in* *integer* - NOT USED
// p_sessionIdx - *in* *integer* - input session ID, used in hashmap key
// p_packetId - *in* *integer* - input packet ID, used in hashmap key
//
// Related Type:
// <MQTT_Subscription_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscriptionDB_removeKey_packetId(in integer p_idx, in integer p_sessionIdx, in integer p_packetId)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_str2int_HashMap_Erase(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sessionIdx, p_packetId)
);
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscriptionDB_lookUp_packetId
//
// Purpose:
// Gets the index of an <MQTT_Subscription> element in *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
// by a session ID and packet ID
//
// Parameters:
// p_sessionIdx - *in* *integer* - input session ID, used in hashmap key
// p_packetId - *in* *integer* - input packet ID, used in hashmap key
//
// Returns:
// *integer* - the index of the searched element in the database, or -1 if not found
//
// Related Type:
// <MQTT_Subscription_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscriptionDB_lookUp_packetId(in integer p_sessionIdx, in integer p_packetId)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer vl_idx := -1;
f_EPTF_str2int_HashMap_Find(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sessionIdx, p_packetId),
vl_idx
);
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up subscription idx: ",vl_idx));
return vl_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscriptionDB_lookUp_topicName
//
// Purpose:
// Gets the index of an <MQTT_Subscription> element in *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
// by a session ID and packet ID
//
// Parameters:
// p_sessionIdx - *in* *integer* - input session ID, used in hashmap key
// p_topicName - *in* *charstring* - input topic, used in hashmap key
//
// Returns:
// *integer* - the index of the searched element in the database, or -1 if not found
//
// Related Type:
// <MQTT_Subscription_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscriptionDB_lookUp_topicName(in integer p_sessionIdx, in charstring p_topicName)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer vl_idx := -1;
f_EPTF_str2int_HashMap_Find(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_topicHash(p_sessionIdx, p_topicName),
vl_idx
);
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up subscription idx: ",vl_idx));
return vl_idx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscriptionDB_get
//
// Purpose:
// Retrieves an element from the *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
//
// Parameters:
// p_idx - *in* *integer* - the index of the element to be retrieved
// p_sub - *inout* <MQTT_Subscription> - the retrieved element
//
// Returns:
// *boolean* - true: success, false: element with this index doesn't exist
//
// Related Type:
// <MQTT_Subscription_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscriptionDB_get(in integer p_idx, inout MQTT_Subscription p_sub)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_subscriptionDB.data) and p_idx >= 0)
{
p_sub := v_MQTT_subscriptionDB.data[p_idx];
return true;
}
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "couldn't get subscription with idx: ", p_idx));
return false;
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscriptionDB_check
//
// Purpose:
// Checks if an element at an index exists the *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
//
// Parameters:
// p_idx - *in* *integer* - the index of the element to be checked
//
// Returns:
// *boolean* - true: success, false: element at this index is not present
//
// Related Type:
// <MQTT_Subscription_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscriptionDB_check(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_subscriptionDB.data) and p_idx >= 0 and f_EPTF_FBQ_itemIsBusy(p_idx, v_MQTT_subscriptionDB.queue))
{
return true;
}
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "invalid subscription with idx: ", p_idx));
return false;
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscriptionDB_remove
//
// Purpose:
// Removes an element from the *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
//
// Parameters:
// p_idx - *in* *integer* - the index of the element to be removed
//
// Related Type:
// <MQTT_Subscription_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscriptionDB_remove(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " removing idx: ", p_idx));
if (f_EPTF_MQTT_subscriptionDB_get(p_idx, v_MQTT_subscription))
{
if (ispresent(v_MQTT_subscription.request))
{
f_EPTF_str2int_HashMap_Erase(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_packetIdHash(
v_MQTT_subscription.sessionIdx,
v_MQTT_subscription.request.packet_identifier)
);
}
if (sizeof(v_MQTT_subscription.request.payload)>0)
{
var universal charstring vl_topicName := v_MQTT_subscription.request.payload[0].topic_filter;
f_EPTF_str2int_HashMap_Erase(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_topicHash(v_MQTT_subscription.sessionIdx, f_unichar2charstr(vl_topicName))
);
}
v_MQTT_subscriptionDB.data[p_idx] := c_MQTT_Subscription_init;
f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_subscriptionDB.queue);
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscriptionDB_packetIdHash
//
// Purpose:
// Converts a pair of session ID & packet ID to the string format "session_<sessionId>:id_<packetId>" to be used as hash key
//
// Parameters:
// p_sessionIdx - *in* *integer* - input session ID
// p_packetId - *in* *integer* - input packet ID
//
// Returns:
// charstring - converted IDs
//
// Related Type:
// <MQTT_Subscription_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscriptionDB_packetIdHash(in integer p_sessionIdx, in integer p_packetId)
return charstring
{
return "session_"&int2str(p_sessionIdx)&":"&"id_"&int2str(p_packetId);
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscriptionDB_topicHash
//
// Purpose:
// Converts a pair of session ID & topic to the string format "session_<sessionId>:topic_<topic>" to be used as hash key
//
// Parameters:
// p_sessionIdx - *in* *integer* - input session ID
// p_topic - *in* *charstring* - input topic string
//
// Returns:
// charstring - converted IDs
//
// Related Type:
// <MQTT_Subscription_DB>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscriptionDB_topicHash(in integer p_sessionIdx, in charstring p_topic)
return charstring
{
return "session_"&int2str(p_sessionIdx)&":"&"topic_"&p_topic;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_stack_fromApp
//
// Purpose:
// This is the main entry point for the MQTT stack realization of the <EPTF_MQTT_LGen_CT>
// component that handles messages received from the application layer (e.g. FSMs)
//
// Parameters:
// p_msg - *inout* <EPTF_MQTT_PDU> - message that enters into the stack (will be modified by the stack)
// p_ctx - *in* <MQTT_StepCtx> - pointers for the instances related to a particular simulated entity
//
// Related Types:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_stack_fromApp(inout EPTF_MQTT_PDU p_msg, in MQTT_StepCtx p_ctx)
runs on EPTF_MQTT_LGen_CT
{
if (p_ctx.sessionIdx >=0 and p_ctx.sessionIdx < sizeof(v_MQTT_sessionDB.data) )
{
f_EPTF_MQTT_session_fromApp(p_msg, p_ctx.sessionIdx);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " sessionIdx is not valid [",p_ctx.sessionIdx,"]. Dropping message"));
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_stack_fromEnv
//
// Purpose:
// This is the main entry point for the MQTT stack realization of the <EPTF_MQTT_LGen_CT>
// component that handles messages received from the environment layer (e.g. transport layer)
//
// Parameters:
// p_msg - *inout* <EPTF_MQTT_PDU> - message that enters into the stack (will be modified by the stack)
//
// Related Types:
// <EPTF_MQTT_LGen_CT>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_stack_fromEnv(inout EPTF_MQTT_PDU p_msg)
runs on EPTF_MQTT_LGen_CT
{
var integer vl_sIdx := f_EPTF_MQTT_sessionDB_lookUp(p_msg.transportParams.localAddress);
if (vl_sIdx >= 0)
{
// In case it's a publish response, there should be an ongoing publish transaction
if (f_EPTF_MQTT_publishResponseType(p_msg.pdu))
{
var integer vl_packetId := f_EPTF_MQTT_publishResponsePacketId(p_msg.pdu);
var integer vl_pubIdx := f_EPTF_MQTT_publishDB_lookUp(vl_sIdx, vl_packetId);
if (vl_pubIdx >= 0)
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " publish transaction found for incoming message: ", vl_pubIdx));
f_EPTF_MQTT_publish_fromEnv(p_msg, vl_pubIdx);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " no ongoing publish transaction found for incoming message (ignoring): ", p_msg));
}
}
else
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " session found for incoming message: ",vl_sIdx));
f_EPTF_MQTT_session_fromEnv(p_msg, vl_sIdx);
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " no session found for incoming message (ignoring): ", p_msg));
}
}
/*****************************************************************
@startuml EPTF_MQTT_LGen_Functions.MQTT_Session.jpg
[*] --> DISCONNECTED
DISCONNECTED --> CONNECTING: appIn: connect_msg
CONNECTING --> CONNECTED: envIn: connack_accepted
CONNECTING --> DISCONNECTED: envIn: connack_refused
CONNECTED --> DISCONNECTED: appIn: disconnect
CONNECTED --> CONNECTED: appIn: publish
CONNECTED --> CONNECTED: envIn: publish
CONNECTED --> CONNECTED: appIn: subscribe
CONNECTED --> CONNECTED: appIn: unsubscribe
CONNECTED --> CONNECTED: envIn: suback
CONNECTED --> CONNECTED: envIn: unsuback
CONNECTED --> CONNECTED: envIn: pingresp
CONNECTED --> CONNECTED: T_keepalive
@enduml
******************************************************************/
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_fromApp
//
// Purpose:
// This is the entry point for an MQTT session fsm handling events coming from the application layer (e.g. client/broker FSMs)
//
// Parameters:
// p_msg - *inout* <EPTF_MQTT_PDU> - next transport message to be sent
// p_sIdx - *in* *integer* - session index
//
// Related Types:
// <MQTT_Session>
//
// FSM Diagram of a MQTT session:
// (see EPTF_MQTT_LGen_Functions.MQTT_Session.jpg)
//
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_fromApp(inout EPTF_MQTT_PDU p_msg, in integer p_sIdx)
runs on EPTF_MQTT_LGen_CT
{
if (p_sIdx < 0 or p_sIdx >= sizeof(v_MQTT_sessionDB.data[p_sIdx]))
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " invalid session index: ", p_sIdx));
return;
}
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " session [",p_sIdx,":",v_MQTT_sessionDB.data[p_sIdx].clientId,"] current state: ",v_MQTT_sessionDB.data[p_sIdx].state));
f_EPTF_MQTT_setCtx(v_MQTT_sessionDB.data[p_sIdx].eIdx, v_MQTT_sessionDB.data[p_sIdx].fsmIdx, v_MQTT_ctx);
// State: DISCONNECTED
if (v_MQTT_sessionDB.data[p_sIdx].state == DISCONNECTED)
{
// appIn: connect
if (ischosen(p_msg.pdu.connect_msg))
{
// At this point, the local address should be filled in correctly
f_EPTF_MQTT_sessionDB_setKey(p_sIdx);
//p_msg.pdu.connect_msg.payload.client_identifier.stringItem := v_MQTT_sessionDB.data[p_sIdx].clientId;
//p_msg.pdu.connect_msg.payload.client_identifier.stringLength := lengthof(v_MQTT_sessionDB.data[p_sIdx].clientId);
p_msg.pdu.connect_msg.payload.client_identifier := v_MQTT_sessionDB.data[p_sIdx].clientId;
v_MQTT_sessionDB.data[p_sIdx].keepAliveTime := int2float(p_msg.pdu.connect_msg.keep_alive-1);
f_EPTF_MQTT_session_send(p_sIdx, p_msg);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTING);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
// State: CONNECTED
else if (v_MQTT_sessionDB.data[p_sIdx].state == CONNECTED)
{
// appIn: disconnect
if (ischosen(p_msg.pdu.disconnect_msg))
{
// cancel T keepalive
f_EPTF_MQTT_session_cancelT_keepalive(p_sIdx);
f_EPTF_MQTT_session_send(p_sIdx, p_msg);
f_EPTF_MQTT_session_setState(p_sIdx, DISCONNECTED);
}
if (ischosen(p_msg.pdu.publish))
{
if (p_msg.pdu.publish.header.qos_level == AT_MOST_ONCE_DELIVERY)
{
p_msg.pdu.publish.packet_identifier := omit; //f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);
f_EPTF_MQTT_session_send(p_sIdx, p_msg);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
}
else if (p_msg.pdu.publish.header.qos_level == AT_LEAST_ONCE_DELIVERY)
{
p_msg.sessionIdx := p_sIdx;
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);
p_msg.pdu.publish.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);
// create new Publish QoS1 Orig
var MQTT_Publish vl_publish := c_MQTT_Publish_init;
vl_publish.sessionIdx := p_sIdx;
vl_publish.side := ORIG;
vl_publish.state.qos1 := CREATED;
vl_publish.packetId := p_msg.pdu.publish.packet_identifier;
var integer vl_pubIdx := f_EPTF_MQTT_publishDB_add(vl_publish);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
// pubOut.send
f_EPTF_MQTT_publish_fromSession(p_msg, vl_pubIdx);
}
else if (p_msg.pdu.publish.header.qos_level == EXACTLY_ONE_DELIVERY)
{
p_msg.sessionIdx := p_sIdx;
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);
p_msg.pdu.publish.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);
// create new Publish QoS2 Orig
var MQTT_Publish vl_publish := c_MQTT_Publish_init;
vl_publish.sessionIdx := p_sIdx;
vl_publish.side := ORIG;
vl_publish.state.qos2 := CREATED;
vl_publish.packetId := p_msg.pdu.publish.packet_identifier;
var integer vl_pubIdx := f_EPTF_MQTT_publishDB_add(vl_publish);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
// pubOut.send
f_EPTF_MQTT_publish_fromSession(p_msg, vl_pubIdx);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Publish qos level not handled. Ignoring:", p_msg));
}
}
if (ischosen(p_msg.pdu.subscribe))
{
// Fill in message
p_msg.sessionIdx := p_sIdx;
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);
p_msg.pdu.subscribe.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);
// Create new Subscription
var MQTT_Subscription vl_sub := c_MQTT_Subscription_init;
vl_sub.sessionIdx := p_sIdx;
vl_sub.request := p_msg.pdu.subscribe;
vl_sub.state := UNSUBSCRIBED;
var integer vl_subIdx := f_EPTF_MQTT_subscriptionDB_add(vl_sub);
f_EPTF_MQTT_session_registerSubscription(p_sIdx, vl_subIdx);
// subscription.send
f_EPTF_MQTT_subscription_fromSession(p_msg, vl_subIdx);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
}
if (ischosen(p_msg.pdu.unsubscribe))
{
// Fill in message
p_msg.sessionIdx := p_sIdx;
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);
p_msg.pdu.unsubscribe.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);
// Look up corresponding subscription
//if (sizeof(p_msg.pdu.unsubscribe.payload[0])>0)
if (sizeof(p_msg.pdu.unsubscribe.payload.topic_filter)>0)
{
//var charstring vl_topicName := f_unichar2charstr(p_msg.pdu.unsubscribe.payload[0].topic_filter);
var charstring vl_topicName := f_unichar2charstr(p_msg.pdu.unsubscribe.payload.topic_filter[0]);
var integer vl_subIdx := f_EPTF_MQTT_subscriptionDB_lookUp_topicName(p_sIdx, vl_topicName);
// subOut.send
if (f_EPTF_MQTT_subscriptionDB_check(vl_subIdx)) {
f_EPTF_MQTT_subscription_fromSession(p_msg, vl_subIdx);
}
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_sessionDB.data[p_sIdx].state));
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_fromEnv
//
// Purpose:
// This is the entry point for an MQTT session fsm from the environment layer (e.g. transport layer)
//
// Parameters:
// p_msg - *inout* <EPTF_MQTT_PDU> - transport message received
// p_sIdx - *in* *integer* - session index
//
// Related Types:
// <MQTT_Session>
//
// FSM Diagram of a MQTT session:
// (see EPTF_MQTT_LGen_Functions.MQTT_Session.jpg)
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_fromEnv(inout EPTF_MQTT_PDU p_msg, in integer p_sIdx)
runs on EPTF_MQTT_LGen_CT
{
if (p_sIdx < 0 or p_sIdx >= sizeof(v_MQTT_sessionDB.data[p_sIdx]))
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " invalid session index: ", p_sIdx));
return;
}
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " session [",v_MQTT_sessionDB.data[p_sIdx].clientId,"] current state: ",v_MQTT_sessionDB.data[p_sIdx].state));
f_EPTF_MQTT_setCtx(v_MQTT_sessionDB.data[p_sIdx].eIdx, v_MQTT_sessionDB.data[p_sIdx].fsmIdx, v_MQTT_ctx);
// State: CONNECTING
if (v_MQTT_sessionDB.data[p_sIdx].state == CONNECTING)
{
// envIn: connack
if (ischosen(p_msg.pdu.connack))
{
if (p_msg.pdu.connack.connect_return_code == 0)
{
// startT keepalive
f_EPTF_MQTT_session_startT_keepalive(p_sIdx, v_MQTT_sessionDB.data[p_sIdx].keepAliveTime);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_CONNACK_Accepted,
v_MQTT_sessionDB.data[p_sIdx].eIdx,
v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
{}
);
}
else
{
f_EPTF_MQTT_session_setState(p_sIdx, DISCONNECTED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_CONNACK_Refused,
v_MQTT_sessionDB.data[p_sIdx].eIdx,
v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
{}
);
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
// State: CONNECTED
else if (v_MQTT_sessionDB.data[p_sIdx].state == CONNECTED)
{
// envIn: suback
if (ischosen(p_msg.pdu.suback))
{
var integer vl_subIdx :=
f_EPTF_MQTT_subscriptionDB_lookUp_packetId(p_sIdx, p_msg.pdu.suback.packet_identifier);
f_EPTF_MQTT_subscription_fromEnv(p_msg, vl_subIdx);
}
// envIn: unsuback
else if (ischosen(p_msg.pdu.unsuback))
{
var integer vl_subIdx :=
f_EPTF_MQTT_subscriptionDB_lookUp_packetId(p_sIdx, p_msg.pdu.unsuback.packet_identifier);
f_EPTF_MQTT_subscription_fromEnv(p_msg, vl_subIdx);
}
// envIn: publish
else if (ischosen(p_msg.pdu.publish))
{
if (p_msg.pdu.publish.header.qos_level == AT_MOST_ONCE_DELIVERY)
{
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PUBLISH,
v_MQTT_sessionDB.data[p_sIdx].eIdx,
v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
{}
);
}
else if (p_msg.pdu.publish.header.qos_level == AT_LEAST_ONCE_DELIVERY)
{
// Send PUBACK
v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBACK(p_msg.pdu.publish.packet_identifier));
f_EPTF_MQTT_session_send(p_sIdx, v_MQTT_msgToSend);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PUBLISH,
v_MQTT_sessionDB.data[p_sIdx].eIdx,
v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
{}
);
}
else if (p_msg.pdu.publish.header.qos_level == EXACTLY_ONE_DELIVERY)
{
// create new Publish QoS2 TERM
var MQTT_Publish vl_publish := c_MQTT_Publish_init;
vl_publish.sessionIdx := p_sIdx;
vl_publish.side := TERM;
vl_publish.state.qos2 := CREATED;
vl_publish.packetId := p_msg.pdu.publish.packet_identifier;
var integer vl_pubIdx := f_EPTF_MQTT_publishDB_add(vl_publish);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
// pubOut.send
f_EPTF_MQTT_publish_fromEnv(p_msg, vl_pubIdx);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Publish qos level not handled. Ignoring:", p_msg));
}
}
// envIn: ping response
else if (ischosen(p_msg.pdu.pingresp))
{
if (v_MQTT_sessionDB.data[p_sIdx].reportPingResponse)
{
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PING_Response,
v_MQTT_sessionDB.data[p_sIdx].eIdx,
v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
{}
);
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_sessionDB.data[p_sIdx].state));
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_keepalive
//
// Purpose:
// Handles the T_keepalive timer event in the <MQTT_Session> FSM
//
// Parameters:
// pl_action - *inout* <EPTF_ScheduledAction> - the scheduled action <>
// pl_eventIndex - *in* *integer* - eveny index in the scheduler
//
// Returns:
// *boolean* - true <always>
//
// Related Types:
// <MQTT_Session>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_keepalive(in EPTF_ScheduledAction pl_action, in integer pl_eventIndex)
runs on EPTF_MQTT_LGen_CT
return boolean
{
var integer vl_sIdx := pl_action.actionId[0];
v_MQTT_sessionDB.data[vl_sIdx].keepaliveTimer := -1;
if (v_MQTT_sessionDB.data[vl_sIdx].state == CONNECTED)
{
// Send ping request
v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_pingReq);
f_EPTF_MQTT_session_send(vl_sIdx, v_MQTT_msgToSend);
// startT keepAlive
f_EPTF_MQTT_session_startT_keepalive(vl_sIdx, v_MQTT_sessionDB.data[vl_sIdx].keepAliveTime);
f_EPTF_MQTT_session_setState(vl_sIdx, CONNECTED);
}
return true;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_startT_keepalive
//
// Purpose:
// Starts the T_keepalive timer for an <MQTT_Session> FSM
//
// Parameters:
// pl_sIdx - *in* *integer* - session index (?)
// pl_time - *in* *float* - value of the keepalive timer
//
// Returns:
// *boolean* - true: succesful , false: 0 or negative timer value
//
// Related Types:
// <MQTT_Session>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_startT_keepalive(in integer pl_sIdx, in float pl_time)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (pl_time <= 0.0) { return false; }
var boolean retval;
var EPTF_ActionId vl_actionId;
vl_actionId[0] := pl_sIdx;
retval := f_EPTF_SchedulerComp_scheduleAction(
f_EPTF_SchedulerComp_snapshotTime() + pl_time,
refers(f_EPTF_MQTT_session_keepalive),
vl_actionId,
v_MQTT_sessionDB.data[pl_sIdx].keepaliveTimer
);
return retval;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_cancelT_keepalive
//
// Purpose:
// Cancels the T_keepalive timer of an <MQTT_Session> FSM
//
// Parameters:
// pl_sessionIdx - *in* *integer* - session index
//
// Related Types:
// <MQTT_Session>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_cancelT_keepalive(in integer pl_sessionIdx)
runs on EPTF_MQTT_LGen_CT
{
if (v_MQTT_sessionDB.data[pl_sessionIdx].keepaliveTimer >= 0)
{
if(not f_EPTF_SchedulerComp_CancelEvent(v_MQTT_sessionDB.data[pl_sessionIdx].keepaliveTimer))
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId,": could not cancel timer"));
}
v_MQTT_sessionDB.data[pl_sessionIdx].keepaliveTimer := -1;
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_getNextPacketId
//
// Purpose:
// Sets the value of the packet id field in the next message to be sent in a session
//
// Parameters:
// p_ctx - *in* <MQTT_StepCtx> - Pointer of the context embedding the session id.
//
// Returns:
// *integer* - packet id for the next message
//
// Related Types:
// <MQTT_Session>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_getNextPacketId(in MQTT_StepCtx p_ctx)
runs on EPTF_MQTT_LGen_CT
return integer
{
if (v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId < 65535)
{
v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId := v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId + 1;
}
else
{
v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId := 0;
}
return v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_setState
//
// Purpose:
// Sets a new state for an <MQTT_Session> FSM
//
// Parameters:
// p_sessionIdx - *in* *integer* - index of session
// p_nextState - *in* <MQTT_Session_State> - new state of the state machine
//
// Related Types:
// <MQTT_Session>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_setState(in integer p_sessionIdx, in MQTT_Session_State p_nextState)
runs on EPTF_MQTT_LGen_CT
{
v_MQTT_sessionDB.data[p_sessionIdx].state := p_nextState;
f_EPTF_MQTT_Logging_VERBOSE(log2str("session [", p_sessionIdx,":",v_MQTT_sessionDB.data[p_sessionIdx].clientId,"] next state: ", v_MQTT_sessionDB.data[p_sessionIdx].state));
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_send
//
// Purpose:
// Sends a message to the transort layer with type <EPTF_MQTT_PDU> and the provided session index
//
// Parameters:
// p_sessionIdx - *in* *integer* - session index
// p_msg - *intout* <EPTF_MQTT_PDU> - the message to be sent
//
// Related Types:
// <MQTT_Session>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_send(in integer p_sessionIdx, inout EPTF_MQTT_PDU p_msg)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sessionIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sessionIdx].remoteAddrIdx);
p_msg.transportParams.proto := {tcp := {}};
p_msg.sessionIdx := p_sessionIdx;
// envOut.send
f_EPTF_MQTT_LGen_send(p_msg);
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_registerSubscription
//
// Purpose:
// Adds a subscription index to an <MQTT_Session>
//
// Parameters:
// p_sessionIdx - *in* *integer* - session index
// p_subIdx - *in* *integer* - subscription index to add
//
// Related Types:
// <MQTT_Session>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_registerSubscription(in integer p_sessionIdx, in integer p_subIdx)
runs on EPTF_MQTT_LGen_CT
{
// Note, we don't check if it is already there
v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[sizeof(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs)]
:= p_subIdx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_deregisterSubscription
//
// Purpose:
// Removes a subscription index from <MQTT_Session>
//
// Parameters:
// p_sessionIdx - *in* *integer* - session index
// p_subIdx - *in* *integer* - subscription index to remove
//
// Related Types:
// <MQTT_Session>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_deregisterSubscription(in integer p_sessionIdx, in integer p_subIdx)
runs on EPTF_MQTT_LGen_CT
{
// This is slow, it's only acceptable if the assumption that a session has only a small number
// of subscriptions
var EPTF_IntegerList vl_new := {};
for (var integer i:=0; i<sizeof(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs); i:=i+1)
{
if (v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[i] != p_subIdx) {
vl_new[sizeof(vl_new)] := v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[i];
}
}
v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs := vl_new;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_registerPublish
//
// Purpose:
// Adds a publish index to an <MQTT_Session>
//
// Parameters:
// p_sessionIdx - *in* *integer* - session index
// p_subIdx - *in* *integer* - publish index to add
//
// Related Types:
// <MQTT_Session>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_registerPublish(in integer p_sessionIdx, in integer p_pubIdx)
runs on EPTF_MQTT_LGen_CT
{
// Note, we don't check if it is already there
v_MQTT_sessionDB.data[p_sessionIdx].publishRefs[sizeof(v_MQTT_sessionDB.data[p_sessionIdx].publishRefs)]
:= p_pubIdx;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_deregisterPublish
//
// Purpose:
// Removes a publish index from an <MQTT_Session>
//
// Parameters:
// p_sessionIdx - *in* *integer* - session index
// p_subIdx - *in* *integer* - publish index to add
//
// Related Types:
// <MQTT_Session>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_deregisterPublish(in integer p_sessionIdx, in integer p_pubIdx)
runs on EPTF_MQTT_LGen_CT
{
// This is slow, it's only acceptable if the assumption that a session has only a small number
// of concurrent publish(s)
var EPTF_IntegerList vl_new := {};
for (var integer i:=0; i<sizeof(v_MQTT_sessionDB.data[p_sessionIdx].publishRefs); i:=i+1)
{
if (v_MQTT_sessionDB.data[p_sessionIdx].publishRefs[i] != p_pubIdx) {
vl_new[sizeof(vl_new)] := v_MQTT_sessionDB.data[p_sessionIdx].publishRefs[i];
}
}
v_MQTT_sessionDB.data[p_sessionIdx].publishRefs := vl_new;
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_session_remove
//
// Purpose:
// Releases all resources related to an <MQTT_Session> and removes it from the <MQTT_Session_DB>
//
// Parameters:
// p_sessionIdx - *in* *integer* - session index
//
// Related Types:
// <MQTT_Session>
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_session_remove(in integer p_sessionIdx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_session_setState(p_sessionIdx, REMOVING);
f_EPTF_MQTT_session_cancelT_keepalive(p_sessionIdx);
// Remove subscriptions, remove publications
for (var integer i:=0; i<sizeof(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs); i:=i+1)
{
f_EPTF_MQTT_subscription_remove(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[i]);
}
// Derefer FSM pointers
f_EPTF_LGenBase_setAppDataItemOfFsmCtx(
v_MQTT_sessionDB.data[p_sessionIdx].eIdx,
v_MQTT_sessionDB.data[p_sessionIdx].fsmIdx,
v_MQTT_bIdx, c_MQTT_AppData_sessionIdx, -1
);
// Clean up DB
f_EPTF_MQTT_sessionDB_remove(p_sessionIdx);
}
/*****************************************************************
@startuml EPTF_MQTT_LGen_Functions.MQTT_Subscription.jpg
[*] --> UNSUBSCRIBED
UNSUBSCRIBED --> SUBSCRIBING: sessionIn: subscribe
SUBSCRIBED --> UNSUBSCRIBING: sessionIn: unsubscribe
SUBSCRIBING --> SUBSCRIBED: envIn: suback_accepted
SUBSCRIBING --> UNSUBSCRIBED: envIn: suback_refused
UNSUBSCRIBING --> UNSUBSCRIBED: envIn: unsuback
@enduml
******************************************************************/
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscription_fromSession
//
// Purpose:
// Implements part of the <MQTT_Subscription> FSM that handles the events coming from the <MQTT_Session>
//
// Parameters:
// p_msg - *inout* <EPTF_MQTT_PDU> - message that
// p_subIdx - *in* *integer* - subscription index
//
// Related Types:
// <MQTT_Subscription>
//
// FSM Diagram of a MQTT subscription:
// (see EPTF_MQTT_LGen_Functions.MQTT_Subscription.jpg)
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscription_fromSession(inout EPTF_MQTT_PDU p_msg, in integer p_subIdx)
runs on EPTF_MQTT_LGen_CT
{
if (p_subIdx < 0 or p_subIdx >= sizeof(v_MQTT_subscriptionDB.data[p_subIdx]))
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " invalid subscription index: ", p_subIdx));
return;
}
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " subscription current state: ",v_MQTT_subscriptionDB.data[p_subIdx].state));
// State: UNSUBSCRIBED
if (v_MQTT_subscriptionDB.data[p_subIdx].state == UNSUBSCRIBED)
{
// sessionIn: subscribe
if (ischosen(p_msg.pdu.subscribe))
{
f_EPTF_MQTT_subscription_startT_watchdog(p_subIdx, tsp_EPTF_MQTT_SUBSCRIBE_responseWatchdog);
// envOut.send
f_EPTF_MQTT_LGen_send(p_msg);
f_EPTF_MQTT_subscription_setState(p_subIdx, SUBSCRIBING);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
// State: SUBSCRIBED
else if (v_MQTT_subscriptionDB.data[p_subIdx].state == SUBSCRIBED)
{
// sessionIn: unsubscribe
if (ischosen(p_msg.pdu.unsubscribe))
{
f_EPTF_MQTT_subscription_startT_watchdog(p_subIdx, tsp_EPTF_MQTT_SUBSCRIBE_responseWatchdog);
f_EPTF_MQTT_subscriptionDB_setKey_packetId(
p_subIdx, v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx, p_msg.pdu.unsubscribe.packet_identifier
);
f_EPTF_MQTT_subscription_setState(p_subIdx, UNSUBSCRIBING);
// envOut.send
f_EPTF_MQTT_LGen_send(p_msg);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_subscriptionDB.data[p_subIdx].state));
}
}
///////////////////////////////////////////////////////////
// Function: f_EPTF_MQTT_subscription_fromEnv
//
// Purpose:
// Implements part of the <MQTT_Subscription> FSM that handles the events coming from the environment
//
// Parameters:
// p_msg - *inout* <EPTF_MQTT_PDU> - received transport message
// p_subIdx - *in* *integer* - subscription index
//
// Related Types:
// <MQTT_Subscription>
//
// FSM Diagram of a MQTT subscription:
// (see EPTF_MQTT_LGen_Functions.MQTT_Subscription.jpg)
///////////////////////////////////////////////////////////
function f_EPTF_MQTT_subscription_fromEnv(inout EPTF_MQTT_PDU p_msg, in integer p_subIdx)
runs on EPTF_MQTT_LGen_CT
{
if (p_subIdx < 0 or p_subIdx >= sizeof(v_MQTT_subscriptionDB.data[p_subIdx]))
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " invalid subscription index: ", p_subIdx));
return;
}
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " subscription current state: ",v_MQTT_subscriptionDB.data[p_subIdx].state));
// State: SUBSCRIBING
if (v_MQTT_subscriptionDB.data[p_subIdx].state == SUBSCRIBING)
{
// envIn: suback
if (ischosen(p_msg.pdu.suback) and sizeof(p_msg.pdu.suback.payload.return_code)>0)
{
f_EPTF_MQTT_subscription_cancelT_watchdog(p_subIdx);
if (
p_msg.pdu.suback.payload.return_code[0] == 0 or
p_msg.pdu.suback.payload.return_code[0] == 1 or
p_msg.pdu.suback.payload.return_code[0] == 2
)
{
f_EPTF_MQTT_subscription_setState(p_subIdx, SUBSCRIBED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_SUBACK_Accepted,
v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].eIdx,
v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].fsmIdx,
{}
);
}
else
{
f_EPTF_MQTT_subscription_setState(p_subIdx, UNSUBSCRIBED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_SUBACK_Refused,
v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].eIdx,
v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].fsmIdx,
{}
);
// Removing subscription
f_EPTF_MQTT_subscription_remove(p_subIdx);
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
// State: UNSUBSCRIBING
else if (v_MQTT_subscriptionDB.data[p_subIdx].state == UNSUBSCRIBING)
{
// envIn: unsuback
if (ischosen(p_msg.pdu.unsuback))
{
f_EPTF_MQTT_subscription_cancelT_watchdog(p_subIdx);