blob: 8c71c9ac992626c0a0734630c52d52bf719ec7e1 [file] [log] [blame]
///////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000-2018 Ericsson Telecom AB
//
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v2.0
// which accompanies this distribution, and is available at
// https://www.eclipse.org/org/documents/epl-2.0/EPL-2.0.html
///////////////////////////////////////////////////////////////////////////////
// File: EPTF_MQTT_LocalTransport_Definitions.ttcn
// Description:
// Rev: R1A
// Prodnr: CNL 113 860
// Updated: 2017-09-01
// Contact: http://ttcn.ericsson.se
///////////////////////////////////////////////////////////////////////////////
module EPTF_MQTT_LGen_Functions {
import from EPTF_MQTT_LGen_Definitions all;
import from EPTF_MQTT_Transport_Definitions all;
import from EPTF_CLL_Base_Functions all;
import from EPTF_CLL_Common_Definitions all;
import from EPTF_CLL_Variable_Definitions all;
import from EPTF_CLL_Variable_Functions all;
import from EPTF_CLL_LGenBase_Definitions all;
import from EPTF_CLL_LGenBase_ConfigFunctions all;
import from EPTF_CLL_LGenBase_Functions all;
import from EPTF_CLL_LGenBase_EventHandlingFunctions all;
import from EPTF_CLL_Logging_Definitions all;
import from EPTF_CLL_Logging_Functions all;
import from EPTF_CLL_FBQ_Functions all;
import from EPTF_CLL_HashMapStr2Int_Functions all;
import from EPTF_CLL_Scheduler_Definitions all;
import from EPTF_CLL_RBTScheduler_Functions all;
import from TCCConversion_Functions all;
import from MQTT_v3_1_1_Types all;
import from IPL4asp_Types all;
function f_EPTF_MQTT_LGen_init(in charstring pl_name)
runs on EPTF_MQTT_LGen_CT
{
if (v_MQTT_initialized){return;}
f_EPTF_LGenBase_init(pl_name, 0, pl_name);
f_EPTF_Logging_init_CT(pl_name);
f_EPTF_str2int_HashMap_Init();
v_MQTT_bIdx := f_EPTF_LGenBase_declareBehaviorType(
c_MQTT_behaviorType,
tsp_EPTF_MQTT_LGen_maxBindableCtx,
refers(f_MQTT_eCtxReset),
refers(f_MQTT_eCtxBind),
refers(f_MQTT_eCtxUnbind)
);
v_MQTT_loggingMaskId :=
f_EPTF_Logging_registerComponentMasks(
"MQTT_Logging",
{"WARNING", "DEBUG", "DEBUGV", "ERROR" },
EPTF_Logging_CLL);
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId,": my behavior idx is ", v_MQTT_bIdx));
f_EPTF_MQTT_addressDB_init();
f_EPTF_MQTT_templateDB_init();
f_EPTF_MQTT_sessionDB_init();
f_EPTF_MQTT_subscriptionDB_init();
f_EPTF_MQTT_publishDB_init();
f_EPTF_MQTT_declareSteps();
f_EPTF_MQTT_declareEvents();
f_EPTF_Base_registerCleanup(refers(f_MQTT_cleanUp));
v_MQTT_initialized := true;
}
function f_EPTF_MQTT_LGen_initLogging()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_Logging_init_CT("MQTT_LGen");
v_MQTT_loggingMaskId :=
f_EPTF_Logging_registerComponentMasks(
"MQTT_LGen_Logging",
{"WARNING", "DEBUG", "DEBUGV", "ERROR"},
EPTF_Logging_CLL
);
if(tsp_EPTF_MQTT_LGen_debug){
f_EPTF_Logging_enableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUG);
}
else {
f_EPTF_Logging_disableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUG);
}
if(tsp_EPTF_MQTT_LGen_debugVerbose) {
f_EPTF_Logging_enableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUGV);
}
else {
f_EPTF_Logging_disableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUGV);
}
}
function f_MQTT_cleanUp()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_addressDB_cleanUp();
f_EPTF_MQTT_templateDB_cleanUp();
f_EPTF_MQTT_sessionDB_cleanUp();
f_EPTF_MQTT_subscriptionDB_cleanUp();
f_EPTF_MQTT_publishDB_cleanUp();
vf_MQTT_msgReceived := null;
v_MQTT_initialized := false;
}
function f_MQTT_eCtxBind(in integer pl_eIdx)
runs on EPTF_MQTT_LGen_CT
return EPTF_IntegerList
{
return {pl_eIdx};
}
function f_MQTT_eCtxUnbind(in integer pl_eIdx)
runs on EPTF_MQTT_LGen_CT
{
if (not v_MQTT_initialized) {return;}
}
function f_MQTT_eCtxReset(in integer pl_eIdx)
runs on EPTF_MQTT_LGen_CT
{
}
function f_EPTF_MQTT_declareEvents()
runs on EPTF_MQTT_LGen_CT
{
var integer vl_dummy;
if (
c_MQTT_eventIdx_transportSucc != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportSucc) or
c_MQTT_eventIdx_transportFail != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportFail) or
c_MQTT_eventIdx_transportClosed != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportClosed) or
c_MQTT_eventIdx_CONNACK_Accepted != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_CONNACK_Accepted) or
c_MQTT_eventIdx_CONNACK_Refused != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_CONNACK_Refused) or
c_MQTT_eventIdx_SUBACK_Accepted != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_SUBACK_Accepted) or
c_MQTT_eventIdx_SUBACK_Refused != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_SUBACK_Refused) or
c_MQTT_eventIdx_UNSUBACK != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_UNSUBACK) or
c_MQTT_eventIdx_PUBLISH != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBLISH) or
c_MQTT_eventIdx_PING_Request != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PING_Request) or
c_MQTT_eventIdx_PING_Response != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PING_Response) or
c_MQTT_eventIdx_PUBACK != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBACK) or
c_MQTT_eventIdx_PUBREC != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBREC) or
c_MQTT_eventIdx_PUBREL != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBREL) or
c_MQTT_eventIdx_PUBCOMP != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBCOMP) or
c_MQTT_eventIdx_PUBLISH_Timeout != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBLISH_Timeout)
)
{
f_EPTF_LGenBase_log();
log("error"); mtc.stop
}
}
function f_EPTF_MQTT_declareSteps()
runs on EPTF_MQTT_LGen_CT
{
if (
c_MQTT_stepIdx_init != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_init, refers(f_MQTT_step_init)}) or
c_MQTT_stepIdx_cleanUp != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_cleanUp, refers(f_MQTT_step_cleanUp) }) or
c_MQTT_stepIdx_setLocalAddress_byVars != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setLocalAddress_byVars, refers(f_MQTT_step_setLocalAddress_byVars)}) or
c_MQTT_stepIdx_setRemoteAddress_byVars != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setRemoteAddress_byVars, refers(f_MQTT_step_setRemoteAddress_byVars)}) or
c_MQTT_stepIdx_transportConnect != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_transportConnect, refers(f_MQTT_step_transportConnect)}) or
c_MQTT_stepIdx_transportClose != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_transportClose, refers(f_MQTT_step_transportClose)}) or
c_MQTT_stepIdx_startListening != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_startListening, refers(f_MQTT_step_startListening)}) or
c_MQTT_stepIdx_loadTemplate_byIntIdx != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_loadTemplate_byIntIdx, refers(f_MQTT_step_loadTemplate_byIntIdx)}) or
c_MQTT_stepIdx_loadTemplate_byStringId != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_loadTemplate_byStringId, refers(f_MQTT_step_loadTemplate_byStringId)}) or
c_MQTT_stepIdx_send != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_send, refers(f_MQTT_step_send)}) or
c_MQTT_stepIdx_setTopic_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_stringParam, refers(f_MQTT_step_setTopic_stringParam)}) or
c_MQTT_stepIdx_setTopic_add_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_add_stringParam, refers(f_MQTT_step_setTopic_add_stringParam)}) or
c_MQTT_stepIdx_setTopic_add_varParams != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_add_varParams, refers(f_MQTT_step_setTopic_add_varParams)}) or
c_MQTT_stepIdx_setTopic_add_clientId != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_add_clientId, refers(f_MQTT_step_setTopic_add_clientId)}) or
c_MQTT_stepIdx_setQos_intParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setQos_intParam, refers(f_MQTT_step_setQos_intParam)}) or
c_MQTT_stepIdx_setPublishMessage_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_stringParam, refers(f_MQTT_step_setPublishMessage_stringParam)}) or
c_MQTT_stepIdx_setPublishMessage_add_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_add_stringParam, refers(f_MQTT_step_setPublishMessage_add_stringParam)}) or
c_MQTT_stepIdx_setPublishMessage_add_varParams != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_add_varParams, refers(f_MQTT_step_setPublishMessage_add_varParams)}) or
c_MQTT_stepIdx_setPublishMessage_add_clientId != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_add_clientId, refers(f_MQTT_step_setPublishMessage_add_clientId)}) or
c_MQTT_stepIdx_reportPingResponse != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_reportPingResponse, refers(f_MQTT_step_reportPingResponse)}) or
c_MQTT_stepIdx_reportPublishResponse != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_reportPublishResponse, refers(f_MQTT_step_reportPublishResponse)})
)
{
f_EPTF_LGenBase_log();
log("EPTF_MQTT_LGen declaration error"); mtc.stop
}
}
function f_EPTF_MQTT_LGen_receiveMessage(in EPTF_MQTT_PDU pl_message)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " ", pl_message));
v_MQTT_msgToProcess := pl_message;
f_EPTF_MQTT_stack_fromEnv(v_MQTT_msgToProcess);
if (vf_MQTT_msgReceived != null)
{
vf_MQTT_msgReceived.apply(v_MQTT_msgToProcess);
}
}
function f_EPTF_MQTT_LGen_receiveEvent(in ASP_Event p_event)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " ", p_event));
// TODO: connection closed handling!
}
function f_EPTF_MQTT_LGen_transportApiResponse(in EPTF_MQTT_Transport_Response pl_rsp)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str("api response: ", pl_rsp));
if (f_EPTF_MQTT_sessionDB_get(pl_rsp.sessionIdx, v_MQTT_session))
{
if (pl_rsp.succ)
{
f_EPTF_MQTT_dispatchEvent(c_MQTT_eventIdx_transportSucc, v_MQTT_session.eIdx, v_MQTT_session.fsmIdx, {});
}
else
{
f_EPTF_MQTT_dispatchEvent(c_MQTT_eventIdx_transportFail, v_MQTT_session.eIdx, v_MQTT_session.fsmIdx, {});
}
}
else
{
f_EPTF_MQTT_Logging_VERBOSE(log2str("session not found for api response: ", pl_rsp));
}
}
function f_EPTF_MQTT_LGen_send(inout EPTF_MQTT_PDU p_msg)
runs on EPTF_MQTT_LGen_CT
{
vf_EPTF_MQTT_Transport_send.apply(p_msg);
}
function f_MQTT_step_init(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
var integer vl_eIdx := pl_ptr.eIdx;
var integer vl_fsmIdx := pl_ptr.refContext.fCtxIdx;
var integer vl_newSessionIdx := -1;
if (f_EPTF_MQTT_isFsmInitialized(vl_eIdx, vl_fsmIdx, vl_newSessionIdx)) { return }
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId, ": initializing fsm ", vl_fsmIdx, " for entity ",vl_eIdx));
var MQTT_Session vl_session := c_MQTT_Session_init;
vl_session.eIdx := vl_eIdx;
vl_session.fsmIdx := pl_ptr.refContext.fCtxIdx;
vl_newSessionIdx := f_EPTF_MQTT_sessionDB_add(vl_session);
f_EPTF_LGenBase_setAppDataItemOfFsmCtx(vl_eIdx, vl_fsmIdx, v_MQTT_bIdx, c_MQTT_AppData_sessionIdx, vl_newSessionIdx);
}
function f_MQTT_step_cleanUp(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
var integer vl_eIdx:=pl_ptr.eIdx;
var integer vl_fsmIdx := pl_ptr.refContext.fCtxIdx;
var integer vl_sessionIdx := -1;
if (not f_EPTF_MQTT_isFsmInitialized(vl_eIdx, vl_fsmIdx, vl_sessionIdx))
{
f_EPTF_MQTT_Logging_DEBUG(%definitionId&": FSM has not been initialized");
return;
}
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId,": FSM database: ",v_MQTT_sessionDB.data[vl_sessionIdx]));
f_EPTF_MQTT_session_remove(vl_sessionIdx);
f_EPTF_LGenBase_setAppDataItemOfFsmCtx(vl_eIdx, vl_fsmIdx, v_MQTT_bIdx, c_MQTT_AppData_sessionIdx, -1);
}
// 1st param: remoteHost: charstring
// 2nd param: remotePort: integer
function f_MQTT_step_setLocalAddress_byVars(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var EPTF_IntegerList vl_varIds := {};
f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_varIds);
if (sizeof(vl_varIds)==2)
{
var EPTF_Var_DirectContent vl_host, vl_port;
f_EPTF_Var_getContent(vl_varIds[0], vl_host);
f_EPTF_Var_getContent(vl_varIds[1], vl_port);
if (not ischosen(vl_host.charstringVal)) {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " first param is not charstring variable!"));
return;
}
if (not ischosen(vl_port.intVal)) {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " second param is not integer variable!"));
return;
}
f_EPTF_MQTT_addressDB_add(
{
hostName := vl_host.charstringVal,
portNumber := vl_port.intVal
},
v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx
);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " two variables are needed as params!"));
}
}
// 1st param: remoteHost: charstring
// 2nd param: remotePort: integer
function f_MQTT_step_setRemoteAddress_byVars(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var EPTF_IntegerList vl_varIds := {};
f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_varIds);
if (sizeof(vl_varIds)==2)
{
var EPTF_Var_DirectContent vl_host, vl_port;
f_EPTF_Var_getContent(vl_varIds[0], vl_host);
f_EPTF_Var_getContent(vl_varIds[1], vl_port);
if (not ischosen(vl_host.charstringVal)) {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " first param is not charstring variable!"));
return;
}
if (not ischosen(vl_port.intVal)) {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " second param is not integer variable!"));
return;
}
f_EPTF_MQTT_addressDB_add(
{
hostName := vl_host.charstringVal,
portNumber := vl_port.intVal
},
v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx
);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " two variables are needed as params!"));
}
}
function f_MQTT_step_startListening(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var EPTF_MQTT_Transport_Request vl_req := c_EPTF_MQTT_Transport_Request_init;
vl_req.sessionIdx := v_MQTT_ctx.sessionIdx;
f_EPTF_MQTT_addressDB_get(vl_req.params.startListening.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
vf_EPTF_MQTT_Transport_apiRequest.apply(vl_req);
}
function f_MQTT_step_transportConnect(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var EPTF_MQTT_Transport_Request vl_req := c_EPTF_MQTT_Transport_Request_init;
vl_req.sessionIdx := v_MQTT_ctx.sessionIdx;
f_EPTF_MQTT_addressDB_get(vl_req.params.connect_.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(vl_req.params.connect_.remoteAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx);
vf_EPTF_MQTT_Transport_apiRequest.apply(vl_req);
}
// 1st param int: expectResponse (optional)
function f_MQTT_step_transportClose(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var integer vl_expectResponseInt := 1;
var boolean vl_expectResponse := true;
if (f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_expectResponseInt))
{
if (vl_expectResponseInt <= 0) { vl_expectResponse := false; }
}
if (v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx >= 0)
{
var EPTF_MQTT_Transport_Request vl_req := c_EPTF_MQTT_Transport_Request_init;
vl_req.sessionIdx := v_MQTT_ctx.sessionIdx;
vl_req.expectResponse := vl_expectResponse;
f_EPTF_MQTT_addressDB_get(vl_req.params.close.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
vf_EPTF_MQTT_Transport_apiRequest.apply(vl_req);
}
}
function f_MQTT_step_loadTemplate_byIntIdx(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var integer vl_templateIdx := -1;
f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_templateIdx);
f_EPTF_MQTT_templateDB_get(vl_templateIdx, v_MQTT_msgToSend.pdu);
}
function f_MQTT_step_loadTemplate_byStringId(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var charstring vl_templateId := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);
var integer vl_templateIdx := f_EPTF_MQTT_templateDB_lookUp(vl_templateId);
if (vl_templateIdx >= 0)
{
f_EPTF_MQTT_templateDB_get(vl_templateIdx, v_MQTT_msgToSend.pdu);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find template with id: ", vl_templateId));
}
}
// 1st param: topic name charstring
function f_MQTT_step_setTopic_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var charstring vl_topic := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);
if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter := f_charstr2unichar(vl_topic);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
}
}
else if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.topic_name := f_charstr2unichar(vl_topic);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
// 1st param: topic name charstring
function f_MQTT_step_setTopic_add_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var charstring vl_topic := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);
if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter :=
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter &
f_charstr2unichar(vl_topic);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
}
}
if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.topic_name :=
v_MQTT_msgToSend.pdu.publish.topic_name &
f_charstr2unichar(vl_topic);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
// params: variables
function f_MQTT_step_setTopic_add_varParams(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var charstring vl_strToAdd := "";
var EPTF_IntegerList vl_varIds := {};
f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_varIds);
for (var integer i:=0; i<sizeof(vl_varIds); i:=i+1)
{
var EPTF_Var_DirectContent vl_var;
f_EPTF_Var_getContent(vl_varIds[i], vl_var);
if (ischosen(vl_var.charstringVal)) {
vl_strToAdd := vl_strToAdd & vl_var.charstringVal;
}
else if (ischosen(vl_var.intVal)) {
vl_strToAdd := vl_strToAdd & int2str(vl_var.intVal);
}
}
if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter :=
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter &
f_charstr2unichar(vl_strToAdd);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
}
}
else if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.topic_name :=
v_MQTT_msgToSend.pdu.publish.topic_name &
f_charstr2unichar(vl_strToAdd);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
function f_MQTT_step_setTopic_add_clientId(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter :=
v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter &
f_charstr2unichar(v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].clientId);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
}
}
if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.topic_name :=
v_MQTT_msgToSend.pdu.publish.topic_name &
f_charstr2unichar(v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].clientId);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
// 1st param: qos level (0,1,2) integer
function f_MQTT_step_setQos_intParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var integer vl_qos := -1;
f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_qos);
if (vl_qos >=0 and vl_qos < 3)
{
if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
v_MQTT_msgToSend.pdu.subscribe.payload[0].requested_qos := f_EPTF_MQTT_qos_int2enum(vl_qos);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
}
}
else if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.header.qos_level := f_EPTF_MQTT_qos_int2enum(vl_qos);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid qos level: ", vl_qos));
}
}
// 1st param: publish message charstring
function f_MQTT_step_setPublishMessage_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var charstring vl_msg := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);
if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.payload := char2oct(vl_msg);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
// 1st param: publish message charstring
function f_MQTT_step_setPublishMessage_add_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var charstring vl_msg := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);
if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.payload :=
v_MQTT_msgToSend.pdu.publish.payload &
char2oct(vl_msg);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
// params: variables
function f_MQTT_step_setPublishMessage_add_varParams(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
if (not ischosen(v_MQTT_msgToSend.pdu.publish)) {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
return;
}
var charstring vl_strToAdd := "";
var EPTF_IntegerList vl_varIds := {};
f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_varIds);
for (var integer i:=0; i<sizeof(vl_varIds); i:=i+1)
{
var EPTF_Var_DirectContent vl_var;
f_EPTF_Var_getContent(vl_varIds[i], vl_var);
if (ischosen(vl_var.charstringVal)) {
vl_strToAdd := vl_strToAdd & vl_var.charstringVal;
}
else if (ischosen(vl_var.intVal)) {
vl_strToAdd := vl_strToAdd & int2str(vl_var.intVal);
}
}
v_MQTT_msgToSend.pdu.publish.payload :=
v_MQTT_msgToSend.pdu.publish.payload &
char2oct(vl_strToAdd);
}
function f_MQTT_step_setPublishMessage_add_clientId(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
v_MQTT_msgToSend.pdu.publish.payload :=
v_MQTT_msgToSend.pdu.publish.payload &
char2oct(v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].clientId);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
}
}
// 1st param: enable (1)/disable (0): integer
function f_MQTT_step_reportPingResponse(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var integer vl_enable := -1;
f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_enable);
if (vl_enable >=0 and vl_enable < 2)
{
v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].reportPingResponse := (vl_enable == 1);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid parameter: ", vl_enable));
}
}
// 1st param: enable (1)/disable (0): integer
function f_MQTT_step_reportPublishResponse(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
var integer vl_enable := -1;
f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_enable);
if (vl_enable >=0 and vl_enable < 2)
{
v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].reportPublishResponse := (vl_enable == 1);
}
else {
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid parameter: ", vl_enable));
}
}
function f_MQTT_step_send(in EPTF_LGenBase_TestStepArgs pl_ptr)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId," ",pl_ptr));
if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }
f_EPTF_MQTT_addressDB_get(v_MQTT_msgToSend.transportParams.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(v_MQTT_msgToSend.transportParams.remoteAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx);
v_MQTT_msgToSend.transportParams.proto := {tcp := {}};
v_MQTT_msgToSend.sessionIdx := v_MQTT_ctx.sessionIdx;
f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId," msg to send: ",v_MQTT_msgToSend));
f_EPTF_MQTT_stack_fromApp(v_MQTT_msgToSend, v_MQTT_ctx);
f_EPTF_SchedulerComp_refreshSnapshotTime();
}
function f_EPTF_MQTT_addressDB_init()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_addressDB.queue);
v_MQTT_addressDB.data := {};
v_MQTT_addressDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_addressDB_Hash");
}
function f_EPTF_MQTT_addressDB_cleanUp()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_addressDB.queue);
v_MQTT_addressDB.data := {};
f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_addressDB_Hash");
}
function f_EPTF_MQTT_addressDB_add(in Socket p_addr, inout integer p_idx)
runs on EPTF_MQTT_LGen_CT
{
p_idx := f_EPTF_MQTT_addressDB_lookUp(p_addr);
if (p_idx == -1)
{
p_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_addressDB.queue);
f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_addressDB.queue);
f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding target address ", p_idx, " ", p_addr));
f_EPTF_str2int_HashMap_Insert(v_MQTT_addressDB.hashRef, f_EPTF_MQTT_addressDB_Socket2String(p_addr), p_idx);
v_MQTT_addressDB.data[p_idx] := p_addr;
}
}
function f_EPTF_MQTT_addressDB_get(inout Socket p_addr, in integer p_idx)
runs on EPTF_MQTT_LGen_CT
{
if (p_idx < sizeof(v_MQTT_addressDB.data) and p_idx >=0)
{
p_addr := v_MQTT_addressDB.data[p_idx];
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(": "," couldn't get address at ", p_idx));
}
}
function f_EPTF_MQTT_addressDB_lookUp(in Socket p_sock)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer vl_idx := -1;
f_EPTF_str2int_HashMap_Find(v_MQTT_addressDB.hashRef, f_EPTF_MQTT_addressDB_Socket2String(p_sock), vl_idx);
return vl_idx;
}
function f_EPTF_MQTT_addressDB_Socket2String(Socket p_sock)
return charstring
{
return p_sock.hostName&":"&int2str(p_sock.portNumber);
}
function f_EPTF_MQTT_templateDB_init()
runs on EPTF_MQTT_LGen_CT
{
v_MQTT_templateDB.data := {};
v_MQTT_templateDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_templateDB_Hash");
for (var integer i:=0; i<sizeof(tsp_EPTF_MQTT_LGen_templates); i:=i+1) {
f_EPTF_MQTT_templateDB_add(tsp_EPTF_MQTT_LGen_templates[i]);
}
}
function f_EPTF_MQTT_templateDB_add(in MQTT_Template p_template)
runs on EPTF_MQTT_LGen_CT
return integer
{
if (f_EPTF_MQTT_templateDB_lookUp(p_template.id)!=-1)
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " template is already added with id: ", p_template.id));
return -1;
}
var integer v_idx := sizeof(v_MQTT_templateDB.data);
v_MQTT_templateDB.data[v_idx] := p_template;
f_EPTF_str2int_HashMap_Insert(v_MQTT_templateDB.hashRef, p_template.id, v_idx);
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " template was added with id: ", p_template.id, " at idx: ",v_idx));
return v_idx;
}
function f_EPTF_MQTT_templateDB_lookUp(in charstring p_id)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer vl_idx := -1;
if (not f_EPTF_str2int_HashMap_Find(v_MQTT_templateDB.hashRef, p_id, vl_idx))
{
vl_idx := -1;
}
return vl_idx;
}
function f_EPTF_MQTT_templateDB_get(in integer p_idx, inout MQTT_v3_1_1_ReqResp p_pdu)
runs on EPTF_MQTT_LGen_CT
{
if (p_idx < sizeof(v_MQTT_templateDB.data) and p_idx >= 0)
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " template is fetched with idx: ", p_idx));
p_pdu := v_MQTT_templateDB.data[p_idx].msg;
}
}
function f_EPTF_MQTT_templateDB_cleanUp()
runs on EPTF_MQTT_LGen_CT
{
v_MQTT_templateDB.data := {};
f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_templateDB_Hash");
}
function f_EPTF_MQTT_sessionDB_init()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_sessionDB.queue);
v_MQTT_sessionDB.data := {};
v_MQTT_sessionDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_sessionDB_Hash");
}
function f_EPTF_MQTT_sessionDB_cleanUp()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_sessionDB.queue);
v_MQTT_sessionDB.data := {};
f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_sessionDB_Hash");
}
function f_EPTF_MQTT_sessionDB_add(in MQTT_Session p_session)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer v_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_sessionDB.queue);
f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_sessionDB.queue);
f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding session ", v_idx, " ", p_session));
v_MQTT_sessionDB.data[v_idx] := p_session;
return v_idx;
}
function f_EPTF_MQTT_sessionDB_setKey(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
{
if (f_EPTF_MQTT_sessionDB_get(p_idx, v_MQTT_session))
{
var Socket v_addr;
f_EPTF_MQTT_addressDB_get(v_addr, v_MQTT_session.localAddrIdx);
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "setting key for sock: ",f_EPTF_MQTT_sessionDB_addrHash(v_addr)," idx: ",p_idx));
f_EPTF_str2int_HashMap_Insert(v_MQTT_sessionDB.hashRef, f_EPTF_MQTT_sessionDB_addrHash(v_addr), p_idx);
}
}
function f_EPTF_MQTT_sessionDB_lookUp(in Socket p_sock)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer vl_idx := -1;
f_EPTF_str2int_HashMap_Find(v_MQTT_sessionDB.hashRef, f_EPTF_MQTT_sessionDB_addrHash(p_sock), vl_idx);
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up sock: ",f_EPTF_MQTT_sessionDB_addrHash(p_sock)," idx: ",vl_idx));
return vl_idx;
}
function f_EPTF_MQTT_sessionDB_get(in integer p_idx, inout MQTT_Session p_session)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_sessionDB.data) and p_idx >= 0)
{
p_session := v_MQTT_sessionDB.data[p_idx];
return true;
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "couldn't get session with idx: ", p_idx));
return false;
}
}
function f_EPTF_MQTT_sessionDB_check(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_sessionDB.data) and p_idx >= 0 and f_EPTF_FBQ_itemIsBusy(p_idx, v_MQTT_sessionDB.queue))
{
return true;
}
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "invalid subscription with idx: ", p_idx));
return false;
}
}
function f_EPTF_MQTT_sessionDB_remove(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "removing session with idx: ", p_idx));
var Socket v_addr;
f_EPTF_MQTT_addressDB_get(v_addr, v_MQTT_sessionDB.data[p_idx].localAddrIdx);
f_EPTF_str2int_HashMap_Erase(v_MQTT_sessionDB.hashRef, f_EPTF_MQTT_sessionDB_addrHash(v_addr));
v_MQTT_sessionDB.data[p_idx] := c_MQTT_Session_init;
f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_sessionDB.queue);
}
function f_EPTF_MQTT_sessionDB_addrHash(in Socket p_sock)
return charstring
{
return p_sock.hostName&":"&int2str(p_sock.portNumber);
}
function f_EPTF_MQTT_publishDB_init()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_publishDB.queue);
v_MQTT_publishDB.data := {};
v_MQTT_publishDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_publishDB_Hash");
}
function f_EPTF_MQTT_publishDB_cleanUp()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_publishDB.queue);
v_MQTT_publishDB.data := {};
f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_publishDB_Hash");
}
function f_EPTF_MQTT_publishDB_add(in MQTT_Publish p_pub)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer v_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_publishDB.queue);
f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_publishDB.queue);
f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding publish ", v_idx, " ", p_pub));
f_EPTF_str2int_HashMap_Insert(
v_MQTT_publishDB.hashRef,
f_EPTF_MQTT_publishDB_packetIdHash(p_pub.sessionIdx, p_pub.packetId),
v_idx
);
v_MQTT_publishDB.data[v_idx] := p_pub;
return v_idx;
}
function f_EPTF_MQTT_publishDB_lookUp(in integer p_sessionIdx, in integer p_packetId)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer vl_idx := -1;
f_EPTF_str2int_HashMap_Find(
v_MQTT_publishDB.hashRef,
f_EPTF_MQTT_publishDB_packetIdHash(p_sessionIdx, p_packetId),
vl_idx
);
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up publish idx: ",vl_idx));
return vl_idx;
}
function f_EPTF_MQTT_publishDB_get(in integer p_idx, inout MQTT_Publish p_pub)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_publishDB.data) and p_idx >= 0)
{
p_pub := v_MQTT_publishDB.data[p_idx];
return true;
}
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "couldn't get publish with idx: ", p_idx));
return false;
}
}
function f_EPTF_MQTT_publishDB_check(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_publishDB.data) and p_idx >= 0 and f_EPTF_FBQ_itemIsBusy(p_idx, v_MQTT_publishDB.queue))
{
return true;
}
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "invalid publish with idx: ", p_idx));
return false;
}
}
function f_EPTF_MQTT_publishDB_remove(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " removing idx: ", p_idx));
if (f_EPTF_MQTT_publishDB_get(p_idx, v_MQTT_publish))
{
f_EPTF_str2int_HashMap_Erase(
v_MQTT_publishDB.hashRef,
f_EPTF_MQTT_publishDB_packetIdHash(
v_MQTT_publish.sessionIdx,
v_MQTT_publish.packetId)
);
v_MQTT_publishDB.data[p_idx] := c_MQTT_Publish_init;
f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_publishDB.queue);
}
}
function f_EPTF_MQTT_publishDB_packetIdHash(in integer p_sessionIdx, in integer p_packetId)
return charstring
{
return "session_"&int2str(p_sessionIdx)&":"&"id_"&int2str(p_packetId);
}
function f_EPTF_MQTT_subscriptionDB_init()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_subscriptionDB.queue);
v_MQTT_subscriptionDB.data := {};
v_MQTT_subscriptionDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_subscriptionDB_Hash");
}
function f_EPTF_MQTT_subscriptionDB_cleanUp()
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_subscriptionDB.queue);
v_MQTT_subscriptionDB.data := {};
f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_subscriptionDB_Hash");
}
function f_EPTF_MQTT_subscriptionDB_add(in MQTT_Subscription p_sub)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer v_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_subscriptionDB.queue);
f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_subscriptionDB.queue);
f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding subscription ", v_idx, " ", p_sub));
if (ispresent(p_sub.request))
{
f_EPTF_str2int_HashMap_Insert(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sub.sessionIdx, p_sub.request.packet_identifier),
v_idx
);
// TODO: Only the first topic name will be used!
// TODO: It is not checked if there is already a subscription for this!
if (sizeof(p_sub.request.payload)>0)
{
var universal charstring vl_topicName := p_sub.request.payload[0].topic_filter;
f_EPTF_str2int_HashMap_Insert(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_topicHash(p_sub.sessionIdx, f_unichar2charstr(vl_topicName)),
v_idx
);
}
}
v_MQTT_subscriptionDB.data[v_idx] := p_sub;
return v_idx;
}
function f_EPTF_MQTT_subscriptionDB_setKey_packetId(in integer p_idx, in integer p_sessionIdx, in integer p_packetId)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_str2int_HashMap_Insert(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sessionIdx, p_packetId),
p_idx
);
}
function f_EPTF_MQTT_subscriptionDB_removeKey_packetId(in integer p_idx, in integer p_sessionIdx, in integer p_packetId)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_str2int_HashMap_Erase(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sessionIdx, p_packetId)
);
}
function f_EPTF_MQTT_subscriptionDB_lookUp_packetId(in integer p_sessionIdx, in integer p_packetId)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer vl_idx := -1;
f_EPTF_str2int_HashMap_Find(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sessionIdx, p_packetId),
vl_idx
);
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up subscription idx: ",vl_idx));
return vl_idx;
}
function f_EPTF_MQTT_subscriptionDB_lookUp_topicName(in integer p_sessionIdx, in charstring p_topicName)
runs on EPTF_MQTT_LGen_CT
return integer
{
var integer vl_idx := -1;
f_EPTF_str2int_HashMap_Find(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_topicHash(p_sessionIdx, p_topicName),
vl_idx
);
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up subscription idx: ",vl_idx));
return vl_idx;
}
function f_EPTF_MQTT_subscriptionDB_get(in integer p_idx, inout MQTT_Subscription p_sub)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_subscriptionDB.data) and p_idx >= 0)
{
p_sub := v_MQTT_subscriptionDB.data[p_idx];
return true;
}
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "couldn't get subscription with idx: ", p_idx));
return false;
}
}
function f_EPTF_MQTT_subscriptionDB_check(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (p_idx < sizeof(v_MQTT_subscriptionDB.data) and p_idx >= 0 and f_EPTF_FBQ_itemIsBusy(p_idx, v_MQTT_subscriptionDB.queue))
{
return true;
}
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "invalid subscription with idx: ", p_idx));
return false;
}
}
function f_EPTF_MQTT_subscriptionDB_remove(in integer p_idx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " removing idx: ", p_idx));
if (f_EPTF_MQTT_subscriptionDB_get(p_idx, v_MQTT_subscription))
{
if (ispresent(v_MQTT_subscription.request))
{
f_EPTF_str2int_HashMap_Erase(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_packetIdHash(
v_MQTT_subscription.sessionIdx,
v_MQTT_subscription.request.packet_identifier)
);
}
if (sizeof(v_MQTT_subscription.request.payload)>0)
{
var universal charstring vl_topicName := v_MQTT_subscription.request.payload[0].topic_filter;
f_EPTF_str2int_HashMap_Erase(
v_MQTT_subscriptionDB.hashRef,
f_EPTF_MQTT_subscriptionDB_topicHash(v_MQTT_subscription.sessionIdx, f_unichar2charstr(vl_topicName))
);
}
v_MQTT_subscriptionDB.data[p_idx] := c_MQTT_Subscription_init;
f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_subscriptionDB.queue);
}
}
function f_EPTF_MQTT_subscriptionDB_packetIdHash(in integer p_sessionIdx, in integer p_packetId)
return charstring
{
return "session_"&int2str(p_sessionIdx)&":"&"id_"&int2str(p_packetId);
}
function f_EPTF_MQTT_subscriptionDB_topicHash(in integer p_sessionIdx, in charstring p_topic)
return charstring
{
return "session_"&int2str(p_sessionIdx)&":"&"topic_"&p_topic;
}
function f_EPTF_MQTT_stack_fromApp(inout EPTF_MQTT_PDU p_msg, in MQTT_StepCtx p_ctx)
runs on EPTF_MQTT_LGen_CT
{
if (p_ctx.sessionIdx >=0 and p_ctx.sessionIdx < sizeof(v_MQTT_sessionDB.data) )
{
f_EPTF_MQTT_session_fromApp(p_msg, p_ctx.sessionIdx);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " sessionIdx is not valid [",p_ctx.sessionIdx,"]. Dropping message"));
}
}
function f_EPTF_MQTT_stack_fromEnv(inout EPTF_MQTT_PDU p_msg)
runs on EPTF_MQTT_LGen_CT
{
var integer vl_sIdx := f_EPTF_MQTT_sessionDB_lookUp(p_msg.transportParams.localAddress);
if (vl_sIdx >= 0)
{
// In case it's a publish response, there should be an ongoing publish transaction
if (f_EPTF_MQTT_publishResponseType(p_msg.pdu))
{
var integer vl_packetId := f_EPTF_MQTT_publishResponsePacketId(p_msg.pdu);
var integer vl_pubIdx := f_EPTF_MQTT_publishDB_lookUp(vl_sIdx, vl_packetId);
if (vl_pubIdx >= 0)
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " publish transaction found for incoming message: ", vl_pubIdx));
f_EPTF_MQTT_publish_fromEnv(p_msg, vl_pubIdx);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " no ongoing publish transaction found for incoming message (ignoring): ", p_msg));
}
}
else
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " session found for incoming message: ",vl_sIdx));
f_EPTF_MQTT_session_fromEnv(p_msg, vl_sIdx);
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " no session found for incoming message (ignoring): ", p_msg));
}
}
function f_EPTF_MQTT_session_fromApp(inout EPTF_MQTT_PDU p_msg, in integer p_sIdx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " session [",p_sIdx,":",v_MQTT_sessionDB.data[p_sIdx].clientId,"] current state: ",v_MQTT_sessionDB.data[p_sIdx].state));
f_EPTF_MQTT_setCtx(v_MQTT_sessionDB.data[p_sIdx].eIdx, v_MQTT_sessionDB.data[p_sIdx].fsmIdx, v_MQTT_ctx);
// State: DISCONNECTED
if (v_MQTT_sessionDB.data[p_sIdx].state == DISCONNECTED)
{
// appIn: connect
if (ischosen(p_msg.pdu.connect_msg))
{
// At this point, the local address should be filled in correctly
f_EPTF_MQTT_sessionDB_setKey(p_sIdx);
p_msg.pdu.connect_msg.payload.client_identifier.stringItem := v_MQTT_sessionDB.data[p_sIdx].clientId;
p_msg.pdu.connect_msg.payload.client_identifier.stringLength := lengthof(v_MQTT_sessionDB.data[p_sIdx].clientId);
v_MQTT_sessionDB.data[p_sIdx].keepAliveTime := int2float(p_msg.pdu.connect_msg.keep_alive-1);
f_EPTF_MQTT_session_send(p_sIdx, p_msg);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTING);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
// State: CONNECTED
else if (v_MQTT_sessionDB.data[p_sIdx].state == CONNECTED)
{
// appIn: disconnect
if (ischosen(p_msg.pdu.disconnect_msg))
{
// cancel T keepalive
f_EPTF_MQTT_session_cancelT_keepalive(p_sIdx);
f_EPTF_MQTT_session_send(p_sIdx, p_msg);
f_EPTF_MQTT_session_setState(p_sIdx, DISCONNECTED);
}
if (ischosen(p_msg.pdu.publish))
{
if (p_msg.pdu.publish.header.qos_level == AT_MOST_ONCE_DELIVERY)
{
p_msg.pdu.publish.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);
f_EPTF_MQTT_session_send(p_sIdx, p_msg);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
}
else if (p_msg.pdu.publish.header.qos_level == AT_LEAST_ONCE_DELIVERY)
{
p_msg.sessionIdx := p_sIdx;
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);
p_msg.pdu.publish.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);
// create new Publish QoS1 Orig
var MQTT_Publish vl_publish := c_MQTT_Publish_init;
vl_publish.sessionIdx := p_sIdx;
vl_publish.side := ORIG;
vl_publish.state.qos1 := CREATED;
vl_publish.packetId := p_msg.pdu.publish.packet_identifier;
var integer vl_pubIdx := f_EPTF_MQTT_publishDB_add(vl_publish);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
// pubOut.send
f_EPTF_MQTT_publish_fromSession(p_msg, vl_pubIdx);
}
else if (p_msg.pdu.publish.header.qos_level == EXACTLY_ONE_DELIVERY)
{
p_msg.sessionIdx := p_sIdx;
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);
p_msg.pdu.publish.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);
// create new Publish QoS2 Orig
var MQTT_Publish vl_publish := c_MQTT_Publish_init;
vl_publish.sessionIdx := p_sIdx;
vl_publish.side := ORIG;
vl_publish.state.qos2 := CREATED;
vl_publish.packetId := p_msg.pdu.publish.packet_identifier;
var integer vl_pubIdx := f_EPTF_MQTT_publishDB_add(vl_publish);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
// pubOut.send
f_EPTF_MQTT_publish_fromSession(p_msg, vl_pubIdx);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Publish qos level not handled. Ignoring:", p_msg));
}
}
if (ischosen(p_msg.pdu.subscribe))
{
// Fill in message
p_msg.sessionIdx := p_sIdx;
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);
p_msg.pdu.subscribe.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);
// Create new Subscription
var MQTT_Subscription vl_sub := c_MQTT_Subscription_init;
vl_sub.sessionIdx := p_sIdx;
vl_sub.request := p_msg.pdu.subscribe;
vl_sub.state := UNSUBSCRIBED;
var integer vl_subIdx := f_EPTF_MQTT_subscriptionDB_add(vl_sub);
f_EPTF_MQTT_session_registerSubscription(p_sIdx, vl_subIdx);
// subscription.send
f_EPTF_MQTT_subscription_fromSession(p_msg, vl_subIdx);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
}
if (ischosen(p_msg.pdu.unsubscribe))
{
// Fill in message
p_msg.sessionIdx := p_sIdx;
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);
p_msg.pdu.subscribe.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);
// Look up corresponding subscription
if (sizeof(p_msg.pdu.unsubscribe.payload[0])>0)
{
var charstring vl_topicName := f_unichar2charstr(p_msg.pdu.unsubscribe.payload[0].topic_filter);
var integer vl_subIdx := f_EPTF_MQTT_subscriptionDB_lookUp_topicName(p_sIdx, vl_topicName);
// subOut.send
if (f_EPTF_MQTT_subscriptionDB_check(vl_subIdx)) {
f_EPTF_MQTT_subscription_fromSession(p_msg, vl_subIdx);
}
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_sessionDB.data[p_sIdx].state));
}
}
function f_EPTF_MQTT_session_fromEnv(inout EPTF_MQTT_PDU p_msg, in integer p_sIdx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " session [",v_MQTT_sessionDB.data[p_sIdx].clientId,"] current state: ",v_MQTT_sessionDB.data[p_sIdx].state));
f_EPTF_MQTT_setCtx(v_MQTT_sessionDB.data[p_sIdx].eIdx, v_MQTT_sessionDB.data[p_sIdx].fsmIdx, v_MQTT_ctx);
// State: CONNECTING
if (v_MQTT_sessionDB.data[p_sIdx].state == CONNECTING)
{
// envIn: connack
if (ischosen(p_msg.pdu.connack))
{
if (p_msg.pdu.connack.connect_return_code == 0)
{
// startT keepalive
f_EPTF_MQTT_session_startT_keepalive(p_sIdx, v_MQTT_sessionDB.data[p_sIdx].keepAliveTime);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_CONNACK_Accepted,
v_MQTT_sessionDB.data[p_sIdx].eIdx,
v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
{}
);
}
else
{
f_EPTF_MQTT_session_setState(p_sIdx, DISCONNECTED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_CONNACK_Refused,
v_MQTT_sessionDB.data[p_sIdx].eIdx,
v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
{}
);
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
// State: CONNECTED
else if (v_MQTT_sessionDB.data[p_sIdx].state == CONNECTED)
{
// envIn: suback
if (ischosen(p_msg.pdu.suback))
{
var integer vl_subIdx :=
f_EPTF_MQTT_subscriptionDB_lookUp_packetId(p_sIdx, p_msg.pdu.suback.packet_identifier);
f_EPTF_MQTT_subscription_fromEnv(p_msg, vl_subIdx);
}
// envIn: unsuback
else if (ischosen(p_msg.pdu.unsuback))
{
var integer vl_subIdx :=
f_EPTF_MQTT_subscriptionDB_lookUp_packetId(p_sIdx, p_msg.pdu.unsuback.packet_identifier);
f_EPTF_MQTT_subscription_fromEnv(p_msg, vl_subIdx);
}
// envIn: publish
else if (ischosen(p_msg.pdu.publish))
{
if (p_msg.pdu.publish.header.qos_level == AT_MOST_ONCE_DELIVERY)
{
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PUBLISH,
v_MQTT_sessionDB.data[p_sIdx].eIdx,
v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
{}
);
}
else if (p_msg.pdu.publish.header.qos_level == AT_LEAST_ONCE_DELIVERY)
{
// Send PUBACK
v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBACK(p_msg.pdu.publish.packet_identifier));
f_EPTF_MQTT_session_send(p_sIdx, v_MQTT_msgToSend);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PUBLISH,
v_MQTT_sessionDB.data[p_sIdx].eIdx,
v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
{}
);
}
else if (p_msg.pdu.publish.header.qos_level == EXACTLY_ONE_DELIVERY)
{
// create new Publish QoS2 TERM
var MQTT_Publish vl_publish := c_MQTT_Publish_init;
vl_publish.sessionIdx := p_sIdx;
vl_publish.side := TERM;
vl_publish.state.qos2 := CREATED;
vl_publish.packetId := p_msg.pdu.publish.packet_identifier;
var integer vl_pubIdx := f_EPTF_MQTT_publishDB_add(vl_publish);
f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
// pubOut.send
f_EPTF_MQTT_publish_fromEnv(p_msg, vl_pubIdx);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PUBLISH,
v_MQTT_sessionDB.data[p_sIdx].eIdx,
v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
{}
);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Publish qos level not handled. Ignoring:", p_msg));
}
}
// envIn: ping response
else if (ischosen(p_msg.pdu.pingresp))
{
if (v_MQTT_sessionDB.data[p_sIdx].reportPingResponse)
{
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PING_Response,
v_MQTT_sessionDB.data[p_sIdx].eIdx,
v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
{}
);
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_sessionDB.data[p_sIdx].state));
}
}
function f_EPTF_MQTT_session_keepalive(in EPTF_ScheduledAction pl_action, in integer pl_eventIndex)
runs on EPTF_MQTT_LGen_CT
return boolean
{
var integer vl_sIdx := pl_action.actionId[0];
v_MQTT_sessionDB.data[vl_sIdx].keepaliveTimer := -1;
if (v_MQTT_sessionDB.data[vl_sIdx].state == CONNECTED)
{
// Send ping request
v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_pingReq);
f_EPTF_MQTT_session_send(vl_sIdx, v_MQTT_msgToSend);
// startT keepAlive
f_EPTF_MQTT_session_startT_keepalive(vl_sIdx, v_MQTT_sessionDB.data[vl_sIdx].keepAliveTime);
f_EPTF_MQTT_session_setState(vl_sIdx, CONNECTED);
}
return true;
}
function f_EPTF_MQTT_session_startT_keepalive(in integer pl_sIdx, in float pl_time)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (pl_time <= 0.0) { return false; }
var boolean retval;
var EPTF_ActionId vl_actionId;
vl_actionId[0] := pl_sIdx;
retval := f_EPTF_SchedulerComp_scheduleAction(
f_EPTF_SchedulerComp_snapshotTime() + pl_time,
refers(f_EPTF_MQTT_session_keepalive),
vl_actionId,
v_MQTT_sessionDB.data[pl_sIdx].keepaliveTimer
);
return retval;
}
function f_EPTF_MQTT_session_cancelT_keepalive(in integer pl_sessionIdx)
runs on EPTF_MQTT_LGen_CT
{
if (v_MQTT_sessionDB.data[pl_sessionIdx].keepaliveTimer >= 0)
{
if(not f_EPTF_SchedulerComp_CancelEvent(v_MQTT_sessionDB.data[pl_sessionIdx].keepaliveTimer))
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId,": could not cancel timer"));
}
v_MQTT_sessionDB.data[pl_sessionIdx].keepaliveTimer := -1;
}
}
function f_EPTF_MQTT_session_getNextPacketId(in MQTT_StepCtx p_ctx)
runs on EPTF_MQTT_LGen_CT
return integer
{
if (v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId < 65535)
{
v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId := v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId + 1;
}
else
{
v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId := 0;
}
return v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId;
}
function f_EPTF_MQTT_session_setState(in integer p_sessionIdx, in MQTT_Session_State p_nextState)
runs on EPTF_MQTT_LGen_CT
{
v_MQTT_sessionDB.data[p_sessionIdx].state := p_nextState;
f_EPTF_MQTT_Logging_VERBOSE(log2str("session [", p_sessionIdx,":",v_MQTT_sessionDB.data[p_sessionIdx].clientId,"] next state: ", v_MQTT_sessionDB.data[p_sessionIdx].state));
}
function f_EPTF_MQTT_session_send(in integer p_sessionIdx, inout EPTF_MQTT_PDU p_msg)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sessionIdx].localAddrIdx);
f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sessionIdx].remoteAddrIdx);
p_msg.transportParams.proto := {tcp := {}};
p_msg.sessionIdx := p_sessionIdx;
// envOut.send
f_EPTF_MQTT_LGen_send(p_msg);
}
function f_EPTF_MQTT_session_registerSubscription(in integer p_sessionIdx, in integer p_subIdx)
runs on EPTF_MQTT_LGen_CT
{
// Note, we don't check if it is already there
v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[sizeof(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs)]
:= p_subIdx;
}
function f_EPTF_MQTT_session_deregisterSubscription(in integer p_sessionIdx, in integer p_subIdx)
runs on EPTF_MQTT_LGen_CT
{
// This is slow, it's only acceptable if the assumption that a session has only a small number
// of subscriptions
var EPTF_IntegerList vl_new := {};
for (var integer i:=0; i<sizeof(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs); i:=i+1)
{
if (v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[i] != p_subIdx) {
vl_new[sizeof(vl_new)] := v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[i];
}
}
v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs := vl_new;
}
function f_EPTF_MQTT_session_registerPublish(in integer p_sessionIdx, in integer p_pubIdx)
runs on EPTF_MQTT_LGen_CT
{
// Note, we don't check if it is already there
v_MQTT_sessionDB.data[p_sessionIdx].publishRefs[sizeof(v_MQTT_sessionDB.data[p_sessionIdx].publishRefs)]
:= p_pubIdx;
}
function f_EPTF_MQTT_session_deregisterPublish(in integer p_sessionIdx, in integer p_pubIdx)
runs on EPTF_MQTT_LGen_CT
{
// This is slow, it's only acceptable if the assumption that a session has only a small number
// of concurrent publish(s)
var EPTF_IntegerList vl_new := {};
for (var integer i:=0; i<sizeof(v_MQTT_sessionDB.data[p_sessionIdx].publishRefs); i:=i+1)
{
if (v_MQTT_sessionDB.data[p_sessionIdx].publishRefs[i] != p_pubIdx) {
vl_new[sizeof(vl_new)] := v_MQTT_sessionDB.data[p_sessionIdx].publishRefs[i];
}
}
v_MQTT_sessionDB.data[p_sessionIdx].publishRefs := vl_new;
}
function f_EPTF_MQTT_session_remove(in integer p_sessionIdx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_session_setState(p_sessionIdx, REMOVING);
f_EPTF_MQTT_session_cancelT_keepalive(p_sessionIdx);
// Remove subscriptions, remove publications
for (var integer i:=0; i<sizeof(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs); i:=i+1)
{
f_EPTF_MQTT_subscription_remove(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[i]);
}
// Derefer FSM pointers
f_EPTF_LGenBase_setAppDataItemOfFsmCtx(
v_MQTT_sessionDB.data[p_sessionIdx].eIdx,
v_MQTT_sessionDB.data[p_sessionIdx].fsmIdx,
v_MQTT_bIdx, c_MQTT_AppData_sessionIdx, -1
);
// Clean up DB
f_EPTF_MQTT_sessionDB_remove(p_sessionIdx);
}
function f_EPTF_MQTT_subscription_fromSession(inout EPTF_MQTT_PDU p_msg, in integer p_subIdx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " subscription current state: ",v_MQTT_subscriptionDB.data[p_subIdx].state));
// State: UNSUBSCRIBED
if (v_MQTT_subscriptionDB.data[p_subIdx].state == UNSUBSCRIBED)
{
// sessionIn: subscribe
if (ischosen(p_msg.pdu.subscribe))
{
// envOut.send
f_EPTF_MQTT_LGen_send(p_msg);
f_EPTF_MQTT_subscription_setState(p_subIdx, SUBSCRIBING);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
// State: SUBSCRIBED
else if (v_MQTT_subscriptionDB.data[p_subIdx].state == SUBSCRIBED)
{
// sessionIn: unsubscribe
if (ischosen(p_msg.pdu.unsubscribe))
{
f_EPTF_MQTT_subscriptionDB_setKey_packetId(
p_subIdx, v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx, p_msg.pdu.unsubscribe.packet_identifier
);
f_EPTF_MQTT_subscription_setState(p_subIdx, UNSUBSCRIBING);
// envOut.send
f_EPTF_MQTT_LGen_send(p_msg);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_subscriptionDB.data[p_subIdx].state));
}
}
function f_EPTF_MQTT_subscription_fromEnv(inout EPTF_MQTT_PDU p_msg, in integer p_subIdx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " subscription current state: ",v_MQTT_subscriptionDB.data[p_subIdx].state));
// State: SUBSCRIBING
if (v_MQTT_subscriptionDB.data[p_subIdx].state == SUBSCRIBING)
{
// sessionIn: suback
if (ischosen(p_msg.pdu.suback) and sizeof(p_msg.pdu.suback.payload.return_code)>0)
{
if (p_msg.pdu.suback.payload.return_code[0] == 0)
{
f_EPTF_MQTT_subscription_setState(p_subIdx, SUBSCRIBED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_SUBACK_Accepted,
v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].eIdx,
v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].fsmIdx,
{}
);
}
else
{
f_EPTF_MQTT_subscription_setState(p_subIdx, UNSUBSCRIBED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_SUBACK_Refused,
v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].eIdx,
v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].fsmIdx,
{}
);
// Removing subscription
f_EPTF_MQTT_subscription_remove(p_subIdx);
}
}
// sessionIn: unsuback
if (ischosen(p_msg.pdu.unsuback))
{
f_EPTF_MQTT_subscription_setState(p_subIdx, SUBSCRIBED);
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_UNSUBACK,
v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].eIdx,
v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].fsmIdx,
{}
);
// Removing unsuback key
f_EPTF_MQTT_subscriptionDB_removeKey_packetId(
p_subIdx,
v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx,
p_msg.pdu.unsuback.packet_identifier
);
// Removing subscription
f_EPTF_MQTT_subscription_remove(p_subIdx);
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
}
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_subscriptionDB.data[p_subIdx].state));
}
}
function f_EPTF_MQTT_subscription_setState(in integer p_subIdx, in MQTT_Subscription_State p_nextState)
runs on EPTF_MQTT_LGen_CT
{
v_MQTT_subscriptionDB.data[p_subIdx].state := p_nextState;
f_EPTF_MQTT_Logging_VERBOSE(log2str("subscription next state: ", v_MQTT_subscriptionDB.data[p_subIdx].state));
}
function f_EPTF_MQTT_subscription_remove(in integer p_subIdx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_subscription_setState(p_subIdx, REMOVING);
// Remove pointers from session
f_EPTF_MQTT_session_deregisterSubscription(v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx, p_subIdx);
// Clean up DB
f_EPTF_MQTT_subscriptionDB_remove(p_subIdx);
}
function f_EPTF_MQTT_publish_fromSession(inout EPTF_MQTT_PDU p_msg, in integer p_pubIdx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " publish current state: ",v_MQTT_publishDB.data[p_pubIdx].state));
// QoS 1 Originating
if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos1) and v_MQTT_publishDB.data[p_pubIdx].side == ORIG)
{
// State: CREATED
if (v_MQTT_publishDB.data[p_pubIdx].state.qos1 == CREATED)
{
// sessionIn: publish
if (ischosen(p_msg.pdu.publish))
{
// start T response watchdog
f_EPTF_MQTT_publish_startT_watchdog(p_pubIdx, tsp_EPTF_MQTT_PUBLISH_responseWatchdog);
// envOut.send
f_EPTF_MQTT_LGen_send(p_msg);
f_EPTF_MQTT_publish_setState(p_pubIdx, {qos1 := PUBLISHED});
}
else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
}
else {f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos1)); }
}
// QoS 1 Terminating
else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos1) and v_MQTT_publishDB.data[p_pubIdx].side == TERM)
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " NOP"));
}
// QoS 2 Originating
else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos2) and v_MQTT_publishDB.data[p_pubIdx].side == ORIG)
{
// State: CREATED
if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == CREATED)
{
// sessionIn: publish
if (ischosen(p_msg.pdu.publish))
{
// start T response watchdog
f_EPTF_MQTT_publish_startT_watchdog(p_pubIdx, tsp_EPTF_MQTT_PUBLISH_responseWatchdog);
// envOut.send
f_EPTF_MQTT_LGen_send(p_msg);
f_EPTF_MQTT_publish_setState(p_pubIdx, {qos2 := PUBLISHED});
}
else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
}
else {f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos1)); }
}
// QoS 2 Terminating
else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos2) and v_MQTT_publishDB.data[p_pubIdx].side == TERM)
{
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown publish type ", v_MQTT_publishDB.data[p_pubIdx]));
}
}
function f_EPTF_MQTT_publish_fromEnv(inout EPTF_MQTT_PDU p_msg, in integer p_pubIdx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " publish current state: ",v_MQTT_publishDB.data[p_pubIdx].state));
// QoS 1 Originating
if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos1) and v_MQTT_publishDB.data[p_pubIdx].side == ORIG)
{
// State: PUBLISHED
if (v_MQTT_publishDB.data[p_pubIdx].state.qos1 == PUBLISHED)
{
// sessionIn: PUBACK
if (ischosen(p_msg.pdu.puback))
{
// cancel T response watchdog
f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);
// report event if enabled
if (v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].reportPublishResponse) {
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PUBACK,
v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
{}
);
}
// next state: ACKNOWLEDGED
f_EPTF_MQTT_publish_setState(p_pubIdx, { qos1:=ACKNOWLEDGED });
// remove publish transaction
f_EPTF_MQTT_publish_remove(p_pubIdx);
}
else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
}
else {f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos1)); }
}
// QoS 1 Terminating
else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos1) and v_MQTT_publishDB.data[p_pubIdx].side == TERM)
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled, QoS1 TERM should be handled on session level: Ignoring:", p_msg));
}
// QoS 2 Originating
else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos2) and v_MQTT_publishDB.data[p_pubIdx].side == ORIG)
{
// State: PUBLISHED
if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == PUBLISHED)
{
// sessionIn: PUBREC
if (ischosen(p_msg.pdu.pubrec))
{
// cancel T response watchdog
f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);
// envOut.send PUBREL
v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBREL(p_msg.pdu.pubrec.packet_identifier));
f_EPTF_MQTT_session_send(v_MQTT_publishDB.data[p_pubIdx].sessionIdx, v_MQTT_msgToSend);
// start publish watchdog
f_EPTF_MQTT_publish_startT_watchdog(p_pubIdx, tsp_EPTF_MQTT_PUBLISH_responseWatchdog);
// next state: RELEASED
f_EPTF_MQTT_publish_setState(p_pubIdx, { qos2:=RELEASED });
// report event if enabled
if (v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].reportPublishResponse) {
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PUBREC,
v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
{}
);
}
}
}
// State: RELEASED
else if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == RELEASED)
{
// sessionIn: PUBCOMP
if (ischosen(p_msg.pdu.pubcomp))
{
// cancel T response watchdog
f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);
// next state: COMPLETE
f_EPTF_MQTT_publish_setState(p_pubIdx, { qos2:=COMPLETE });
// report event if enabled
if (v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].reportPublishResponse) {
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PUBREC,
v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
{}
);
}
// remove publish transaction
f_EPTF_MQTT_publish_remove(p_pubIdx);
}
else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
}
else {f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos2)); }
}
// QoS 2 Terminating
else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos2) and v_MQTT_publishDB.data[p_pubIdx].side == TERM)
{
// State: PUBLISHED
if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == CREATED)
{
// envIn: PUBREC
if (ischosen(p_msg.pdu.publish))
{
// envOut.send PUBREC
v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBREC(p_msg.pdu.publish.packet_identifier));
f_EPTF_MQTT_session_send(v_MQTT_publishDB.data[p_pubIdx].sessionIdx, v_MQTT_msgToSend);
// start publish watchdog
f_EPTF_MQTT_publish_startT_watchdog(p_pubIdx, tsp_EPTF_MQTT_PUBLISH_responseWatchdog);
// next state: RECEIVED
f_EPTF_MQTT_publish_setState(p_pubIdx, { qos2:=RECEIVED });
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PUBLISH,
v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
{}
);
}
}
// State: RECEIVED
else if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == RECEIVED)
{
// envIn: PUBREL
if (ischosen(p_msg.pdu.pubrel))
{
// cancel T response watchdog
f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);
// envOut.send PUBCOMP
v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBCOMP(p_msg.pdu.pubrel.packet_identifier));
f_EPTF_MQTT_session_send(v_MQTT_publishDB.data[p_pubIdx].sessionIdx, v_MQTT_msgToSend);
// next state: COMPLETE
f_EPTF_MQTT_publish_setState(p_pubIdx, { qos2:=COMPLETE });
// report event if enabled
if (v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].reportPublishResponse) {
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PUBREL,
v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
{}
);
}
// remove publish transaction
f_EPTF_MQTT_publish_remove(p_pubIdx);
}
else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
}
else {f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos2)); }
}
else
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown publish type ", v_MQTT_publishDB.data[p_pubIdx]));
}
}
function f_EPTF_MQTT_publish_watchdog(in EPTF_ScheduledAction pl_action, in integer pl_eventIndex)
runs on EPTF_MQTT_LGen_CT
return boolean
{
var integer vl_pIdx := pl_action.actionId[0];
v_MQTT_publishDB.data[vl_pIdx].watchdogTimer := -1;
f_EPTF_MQTT_dispatchEvent(
c_MQTT_eventIdx_PUBACK,
v_MQTT_sessionDB.data[v_MQTT_publishDB.data[vl_pIdx].sessionIdx].eIdx,
v_MQTT_sessionDB.data[v_MQTT_publishDB.data[vl_pIdx].sessionIdx].fsmIdx,
{}
);
f_EPTF_MQTT_publish_remove(vl_pIdx);
return true;
}
function f_EPTF_MQTT_publish_startT_watchdog(in integer pl_pIdx, in float pl_time)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (pl_time <= 0.0) { return false; }
var boolean retval;
var EPTF_ActionId vl_actionId;
vl_actionId[0] := pl_pIdx;
retval := f_EPTF_SchedulerComp_scheduleAction(
f_EPTF_SchedulerComp_snapshotTime() + pl_time,
refers(f_EPTF_MQTT_publish_watchdog),
vl_actionId,
v_MQTT_publishDB.data[pl_pIdx].watchdogTimer
);
return retval;
}
function f_EPTF_MQTT_publish_setState(in integer p_pubIdx, in MQTT_Publish_State p_nextState)
runs on EPTF_MQTT_LGen_CT
{
v_MQTT_publishDB.data[p_pubIdx].state := p_nextState;
f_EPTF_MQTT_Logging_VERBOSE(log2str("publish next state: ", v_MQTT_publishDB.data[p_pubIdx].state));
}
function f_EPTF_MQTT_publish_cancelT_watchdog(in integer pl_publishIdx)
runs on EPTF_MQTT_LGen_CT
{
if (v_MQTT_publishDB.data[pl_publishIdx].watchdogTimer >= 0)
{
if(not f_EPTF_SchedulerComp_CancelEvent(v_MQTT_publishDB.data[pl_publishIdx].watchdogTimer))
{
f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId,": could not cancel timer"));
}
v_MQTT_publishDB.data[pl_publishIdx].watchdogTimer := -1;
}
}
function f_EPTF_MQTT_publish_remove(in integer p_pubIdx)
runs on EPTF_MQTT_LGen_CT
{
f_EPTF_MQTT_publish_setState(p_pubIdx, { removing := true });
// cancel timers if running
f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);
// Remove pointers from session
f_EPTF_MQTT_session_deregisterPublish(v_MQTT_publishDB.data[p_pubIdx].sessionIdx, p_pubIdx);
// Clean up DB
f_EPTF_MQTT_publishDB_remove(p_pubIdx);
}
function f_EPTF_MQTT_setStepCtx(in EPTF_LGenBase_TestStepArgs pl_ptr, inout MQTT_StepCtx p_ctx)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (not f_EPTF_MQTT_isFsmInitialized(pl_ptr.eIdx, pl_ptr.refContext.fCtxIdx, p_ctx.sessionIdx))
{
f_EPTF_MQTT_Logging_WARNING(%definitionId &
": FSM has not been initialized. The f_MQTT_step_init function must be called as first step in the FSMs using MQTT.");
return false;
}
p_ctx.eIdx := pl_ptr.eIdx;
p_ctx.fsmIdx := pl_ptr.refContext.fCtxIdx;
return true;
}
function f_EPTF_MQTT_setCtx(in integer p_eIdx, in integer p_fsmIdx, inout MQTT_StepCtx p_ctx)
runs on EPTF_MQTT_LGen_CT
return boolean
{
if (not f_EPTF_MQTT_isFsmInitialized(p_eIdx, p_fsmIdx, p_ctx.sessionIdx))
{
f_EPTF_MQTT_Logging_WARNING(%definitionId &
": FSM has not been initialized. The f_MQTT_step_init function must be called as first step in the FSMs using MQTT.");
return false;
}
p_ctx.eIdx := p_eIdx;
p_ctx.fsmIdx := p_fsmIdx;
return true;
}
function f_EPTF_MQTT_isFsmInitialized(in integer pl_eIdx, in integer pl_fsmIdx, inout integer pl_sessionIdx)
runs on EPTF_MQTT_LGen_CT
return boolean
{
pl_sessionIdx := -1;
var EPTF_IntegerList vl_appData := f_EPTF_LGenBase_getAppDataListOfFsmCtx(pl_eIdx, pl_fsmIdx, v_MQTT_bIdx);
if (c_MQTT_AppData_sessionIdx < sizeof(vl_appData))
{
pl_sessionIdx := vl_appData[c_MQTT_AppData_sessionIdx];
}
return -1 < pl_sessionIdx and sizeof(v_MQTT_sessionDB.data) > pl_sessionIdx;
}
function f_EPTF_MQTT_getIntValue(
in EPTF_IntegerList pl_intList,
in integer pl_number,
inout integer pl_value)
return boolean
{
if (sizeof(pl_intList) > pl_number)
{
pl_value := pl_intList[pl_number];
return true;
}
return false;
}
function f_EPTF_MQTT_Logging_VERBOSE(in @lazy charstring pl_message)
runs on EPTF_MQTT_LGen_CT
{
if (c_EPTF_Common_debugSwitch) {
f_EPTF_Logging_debugV2(pl_message, v_MQTT_loggingMaskId, {c_MQTT_LGen_Logging_DEBUGV});
}
}
function f_EPTF_MQTT_Logging_DEBUG(in @lazy charstring pl_message)
runs on EPTF_MQTT_LGen_CT
{
if (c_EPTF_Common_debugSwitch) {
f_EPTF_Logging_debugV2(pl_message, v_MQTT_loggingMaskId, {c_MQTT_LGen_Logging_DEBUG});
}
}
function f_EPTF_MQTT_Logging_WARNING(in @lazy charstring pl_message)
runs on EPTF_MQTT_LGen_CT
{
if (c_EPTF_Common_debugSwitch) {
f_EPTF_Logging_debugV2(pl_message, v_MQTT_loggingMaskId, {c_MQTT_LGen_Logging_WARNING});
}
}
function f_EPTF_MQTT_Logging_ERROR(in @lazy charstring pl_message)
runs on EPTF_MQTT_LGen_CT
{
if (c_EPTF_Common_debugSwitch) {
f_EPTF_Logging_debugV2(pl_message, v_MQTT_loggingMaskId, {c_MQTT_LGen_Logging_ERROR});
}
}
function f_EPTF_MQTT_dispatchEvent(in integer pl_eventIdx, in integer pl_eIdx, in integer pl_fsmCtx, in EPTF_IntegerList pl_reportedArgs)
runs on EPTF_MQTT_LGen_CT
{
if (pl_eIdx < 0)
{
f_EPTF_LGenBase_postEvent(
{
{
v_MQTT_bIdx,
pl_eventIdx,
omit,
omit
},
pl_reportedArgs
});
}
else
{
if (pl_fsmCtx < 0)
{
f_EPTF_LGenBase_postEvent(
{
{
v_MQTT_bIdx,
pl_eventIdx,
{
pl_eIdx,
omit
}, omit
},
pl_reportedArgs
});
}
else
{
f_EPTF_LGenBase_postEvent(
{
{
v_MQTT_bIdx,
pl_eventIdx,
{
pl_eIdx,
pl_fsmCtx
}, omit
},
pl_reportedArgs
});
}
}
}
function f_EPTF_MQTT_qos_int2enum(in integer p_qos)
return QoS
{
if (p_qos == 0) {
return AT_MOST_ONCE_DELIVERY;
}
else if (p_qos == 1) {
return AT_LEAST_ONCE_DELIVERY;
}
else if (p_qos == 2) {
return EXACTLY_ONE_DELIVERY;
}
return RESERVED;
}
function f_EPTF_MQTT_publishResponseType(in MQTT_v3_1_1_ReqResp p_msg)
return boolean
{
return
ischosen(p_msg.puback) or
ischosen(p_msg.pubrec) or
ischosen(p_msg.pubrel) or
ischosen(p_msg.pubcomp)
}
function f_EPTF_MQTT_publishResponsePacketId(in MQTT_v3_1_1_ReqResp p_msg)
return integer
{
if (ischosen(p_msg.puback)) { return p_msg.puback.packet_identifier }
else if (ischosen(p_msg.pubrec)) { return p_msg.pubrec.packet_identifier }
else if (ischosen(p_msg.pubrel)) { return p_msg.pubrel.packet_identifier }
else if (ischosen(p_msg.pubcomp)) { return p_msg.pubcomp.packet_identifier }
else {
return -1;
}
}
template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_pingReq :=
{
pingreq := {
header := {
flags := '0000'B
}
}
}
template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_PUBACK(in integer p_packetId) :=
{
puback := {
header := {
flags := '0000'B
},
packet_identifier := p_packetId
}
}
template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_PUBREL(in integer p_packetId) :=
{
pubrel := {
header := {
flags := '0010'B
},
packet_identifier := p_packetId
}
}
template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_PUBREC(in integer p_packetId) :=
{
pubrec := {
header := {
flags := '0000'B
},
packet_identifier := p_packetId
}
}
template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_PUBCOMP(in integer p_packetId) :=
{
pubcomp := {
header := {
flags := '0000'B
},
packet_identifier := p_packetId
}
}
}