///////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000-2021 Ericsson Telecom AB
//
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v2.0
// which accompanies this distribution, and is available at
// https://www.eclipse.org/org/documents/epl-2.0/EPL-2.0.html
///////////////////////////////////////////////////////////////////////////////
//  File:               EPTF_MQTT_LocalTransport_Definitions.ttcn
//  Description:
//  Rev:                R1B
//  Prodnr:             CNL 113 860
//  Updated:            2021-02-03
//  Contact:            http://ttcn.ericsson.se
///////////////////////////////////////////////////////////////////////////////

///////////////////////////////////////////////////////////
//  Module: EPTF_MQTT_LGen_Functions
// 
//  Purpose:
//    This module contains the functions of the MQTT load generator component
//
//  See also:
//    <EPTF_MQTT_LGen_Definitions>
///////////////////////////////////////////////////////////////

module EPTF_MQTT_LGen_Functions {

  import from EPTF_MQTT_LGen_Definitions all;
  import from EPTF_MQTT_Transport_Definitions all;

  import from EPTF_CLL_Base_Functions all;
  import from EPTF_CLL_Common_Definitions all;
  import from EPTF_CLL_Variable_Definitions all;
  import from EPTF_CLL_Variable_Functions all;
  import from EPTF_CLL_LGenBase_Definitions all;
  import from EPTF_CLL_LGenBase_ConfigFunctions all;
  import from EPTF_CLL_LGenBase_Functions all;
  import from EPTF_CLL_LGenBase_EventHandlingFunctions all;
  import from EPTF_CLL_Logging_Definitions all;
  import from EPTF_CLL_Logging_Functions all;
  import from EPTF_CLL_FBQ_Functions all;
  import from EPTF_CLL_HashMapStr2Int_Functions all;
  import from EPTF_CLL_Scheduler_Definitions all;
  import from EPTF_CLL_RBTScheduler_Functions all;
  import from TCCConversion_Functions all;

  import from MQTT_v3_1_1_Types all;
  import from IPL4asp_Types all;

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_LGen_init
  //
  //  Purpose:
  //    The main initialization function for the <EPTF_MQTT_LGen_CT> component type
  //
  //  Parameters:
  //    pl_name - *in* *charstring* - the name for the component instance
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_LGen_init(in charstring pl_name)
  runs on EPTF_MQTT_LGen_CT
  {
    if (v_MQTT_initialized){return;}

    f_EPTF_LGenBase_init(pl_name, 0, pl_name);
    f_EPTF_Logging_init_CT(pl_name);
    f_EPTF_str2int_HashMap_Init();

    v_MQTT_bIdx := f_EPTF_LGenBase_declareBehaviorType(
      c_MQTT_behaviorType,
      tsp_EPTF_MQTT_LGen_maxBindableCtx,
      refers(f_MQTT_eCtxReset),
      refers(f_MQTT_eCtxBind),
      refers(f_MQTT_eCtxUnbind)
      );

    v_MQTT_loggingMaskId :=
      f_EPTF_Logging_registerComponentMasks(
        "MQTT_Logging",
        {"WARNING", "DEBUG", "DEBUGV", "ERROR" },
        EPTF_Logging_CLL);

    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId,": my behavior idx is ", v_MQTT_bIdx));

    f_EPTF_MQTT_addressDB_init();
    f_EPTF_MQTT_templateDB_init();
    f_EPTF_MQTT_sessionDB_init();
    f_EPTF_MQTT_subscriptionDB_init();
    f_EPTF_MQTT_publishDB_init();

    f_EPTF_MQTT_declareSteps();
    f_EPTF_MQTT_declareEvents();

    f_EPTF_Base_registerCleanup(refers(f_MQTT_cleanUp));

    v_MQTT_initialized := true;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_LGen_initLogging
  //
  //  Purpose:
  //    Initializing CLL's logging feature on the <EPTF_MQTT_LGen_CT> component type
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_LGen_initLogging()
  runs on EPTF_MQTT_LGen_CT
  {
	f_EPTF_Logging_init_CT("MQTT_LGen");
    v_MQTT_loggingMaskId :=
    	f_EPTF_Logging_registerComponentMasks(
    		"MQTT_LGen_Logging",
    		{"WARNING", "DEBUG", "DEBUGV", "ERROR"},
    		EPTF_Logging_CLL
    	);

	if(tsp_EPTF_MQTT_LGen_debug){
	  f_EPTF_Logging_enableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUG);
	}
	else {
	  f_EPTF_Logging_disableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUG);
	}

	if(tsp_EPTF_MQTT_LGen_debugVerbose) {
	  f_EPTF_Logging_enableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUGV);
	}
	else {
	  f_EPTF_Logging_disableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUGV);
	}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_cleanUp
  //
  //  Purpose:
  //    The main clean up function for the <EPTF_MQTT_LGen_CT> component type
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_cleanUp()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_addressDB_cleanUp();
    f_EPTF_MQTT_templateDB_cleanUp();
    f_EPTF_MQTT_sessionDB_cleanUp();
    f_EPTF_MQTT_subscriptionDB_cleanUp();
    f_EPTF_MQTT_publishDB_cleanUp();

    vf_MQTT_msgReceived := null;

    v_MQTT_initialized := false;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_eCtxBind
  //
  //  Purpose:
  //    This function is called by the CLL for each entity instance created on a particular instace of <EPTF_MQTT_LGen_CT>
  //
  //  Parameters:
  //    pl_eIdx - *in* *integer* - the index of the entity instance on this load generator component instance
  //
  //  Returns:
  //    <EPTF_IntegerList> - The list will contain the index of the entity the context belongs to
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_eCtxBind(in integer pl_eIdx)
  runs on EPTF_MQTT_LGen_CT
  return EPTF_IntegerList
  {
    return {pl_eIdx};
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_eCtxUnbind
  //
  //  Purpose:
  //    The reverse operation of <f_MQTT_eCtxBind>. Cleans up resources reserved during <f_MQTT_eCtxBind>. Called by the CLL.
  //
  //  Parameters:
  //    pl_eIdx - *in* *integer* - the index of the entity instance on this load generator component instance
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_eCtxUnbind(in integer pl_eIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (not v_MQTT_initialized) {return;}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_eCtxReset
  //
  //  Purpose:
  //    The resources reserved during <f_MQTT_eCtxBind> are reinitalized (reset). Called by the CLL. 
  //
  //  Parameters:
  //    pl_eIdx - *in* *integer* - the index of the entity instance on this load generator component instance
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_eCtxReset(in integer pl_eIdx)
  runs on EPTF_MQTT_LGen_CT
  {
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_declareEvents
  //
  //  Purpose:
  //    Declares the FSM events to the CLL framework implemented by <EPTF_MQTT_LGen_CT>
  //
  //  Related Types:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_declareEvents()
  runs on EPTF_MQTT_LGen_CT
  {
    var integer vl_dummy;

    if (
      c_MQTT_eventIdx_transportSucc != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportSucc) or
      c_MQTT_eventIdx_transportFail != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportFail) or
      c_MQTT_eventIdx_transportEstablished != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportEstablished) or
      c_MQTT_eventIdx_transportClosed != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportClosed) or
      c_MQTT_eventIdx_CONNACK_Accepted != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_CONNACK_Accepted) or
      c_MQTT_eventIdx_CONNACK_Refused != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_CONNACK_Refused) or
      c_MQTT_eventIdx_SUBACK_Accepted != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_SUBACK_Accepted) or
      c_MQTT_eventIdx_SUBACK_Refused != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_SUBACK_Refused) or
      c_MQTT_eventIdx_UNSUBACK != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_UNSUBACK) or
      c_MQTT_eventIdx_PUBLISH != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBLISH) or
      c_MQTT_eventIdx_PING_Request != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PING_Request) or
      c_MQTT_eventIdx_PING_Response != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PING_Response) or
      c_MQTT_eventIdx_PUBACK != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBACK) or
      c_MQTT_eventIdx_PUBREC != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBREC) or
      c_MQTT_eventIdx_PUBREL != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBREL) or
      c_MQTT_eventIdx_PUBCOMP != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBCOMP) or
      c_MQTT_eventIdx_PUBLISH_Timeout != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBLISH_Timeout) or
      c_MQTT_eventIdx_SUBSCRIBE_Timeout != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_SUBSCRIBE_Timeout)
    )
    {
      f_EPTF_LGenBase_log();
      log("error"); mtc.stop
    }

  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_declareSteps
  //
  //  Purpose:
  //    Declares the FSM steps to the CLL framework implemented by <EPTF_MQTT_LGen_CT>
  //
  //  Related Types:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_declareSteps()
  runs on EPTF_MQTT_LGen_CT
  {
    if (
      c_MQTT_stepIdx_init != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_init, refers(f_MQTT_step_init)}) or
      c_MQTT_stepIdx_cleanUp != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_cleanUp, refers(f_MQTT_step_cleanUp) }) or
      c_MQTT_stepIdx_setLocalAddress != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setLocalAddress, refers(f_MQTT_step_setLocalAddress)}) or
      c_MQTT_stepIdx_setLocalAddress_byVars != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setLocalAddress_byVars, refers(f_MQTT_step_setLocalAddress_byVars)}) or
      c_MQTT_stepIdx_setRemoteAddress != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setRemoteAddress, refers(f_MQTT_step_setRemoteAddress)}) or
      c_MQTT_stepIdx_setRemoteAddress_byVars != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setRemoteAddress_byVars, refers(f_MQTT_step_setRemoteAddress_byVars)}) or
      c_MQTT_stepIdx_transportConnect != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_transportConnect, refers(f_MQTT_step_transportConnect)}) or
      c_MQTT_stepIdx_transportClose != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_transportClose, refers(f_MQTT_step_transportClose)}) or
      c_MQTT_stepIdx_startListening != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_startListening, refers(f_MQTT_step_startListening)}) or
      c_MQTT_stepIdx_loadTemplate_byIntIdx != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_loadTemplate_byIntIdx, refers(f_MQTT_step_loadTemplate_byIntIdx)}) or
      c_MQTT_stepIdx_loadTemplate_byStringId != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_loadTemplate_byStringId, refers(f_MQTT_step_loadTemplate_byStringId)}) or
      c_MQTT_stepIdx_send != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_send, refers(f_MQTT_step_send)}) or
      c_MQTT_stepIdx_setTopic_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_stringParam, refers(f_MQTT_step_setTopic_stringParam)}) or
      c_MQTT_stepIdx_setTopic_add_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_add_stringParam, refers(f_MQTT_step_setTopic_add_stringParam)}) or
      c_MQTT_stepIdx_setTopic_add_varParams != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_add_varParams, refers(f_MQTT_step_setTopic_add_varParams)}) or
      c_MQTT_stepIdx_setTopic_add_clientId != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_add_clientId, refers(f_MQTT_step_setTopic_add_clientId)}) or
      c_MQTT_stepIdx_setQos_intParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setQos_intParam, refers(f_MQTT_step_setQos_intParam)}) or
      c_MQTT_stepIdx_setPublishMessage_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_stringParam, refers(f_MQTT_step_setPublishMessage_stringParam)}) or
      c_MQTT_stepIdx_setPublishMessage_add_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_add_stringParam, refers(f_MQTT_step_setPublishMessage_add_stringParam)}) or
      c_MQTT_stepIdx_setPublishMessage_add_varParams != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_add_varParams, refers(f_MQTT_step_setPublishMessage_add_varParams)}) or
      c_MQTT_stepIdx_setPublishMessage_add_clientId != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_add_clientId, refers(f_MQTT_step_setPublishMessage_add_clientId)}) or
      c_MQTT_stepIdx_reportPingResponse != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_reportPingResponse, refers(f_MQTT_step_reportPingResponse)}) or
      c_MQTT_stepIdx_reportPublishResponse != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_reportPublishResponse, refers(f_MQTT_step_reportPublishResponse)})
    )
    {
      f_EPTF_LGenBase_log();
      log("EPTF_MQTT_LGen declaration error"); mtc.stop
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_LGen_receiveMessage
  //
  //  Purpose:
  //    The transport layer implementation <EPTF_MQTT_Transport_Provider_CT> can report received <EPTF_MQTT_PDU> message
  //    to the load generator layer <EPTF_MQTT_Transport_User_CT> extended by <EPTF_MQTT_LGen_CT> using this function. 
  //
  //  Parameters:
  //    pl_message - *in* <EPTF_MQTT_PDU> - received message
  //
  //  Related Types:
  //    - <EPTF_MQTT_LGen_CT>
  //    - <fcb_EPTF_MQTT_Transport_receiveMessage>
  //    - <EPTF_MQTT_Transport_Provider_CT>
  //    - <EPTF_MQTT_Transport_User_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_LGen_receiveMessage(in EPTF_MQTT_PDU pl_message)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " ", pl_message));

    v_MQTT_msgToProcess := pl_message;

    f_EPTF_MQTT_stack_fromEnv(v_MQTT_msgToProcess);

    if (vf_MQTT_msgReceived != null)
    {
      vf_MQTT_msgReceived.apply(v_MQTT_msgToProcess);
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_LGen_receiveEvent
  //
  //  Purpose:
  //    The transport layer implementation <EPTF_MQTT_Transport_Provider_CT> can report received <ASP_Event> events
  //    to the load generator layer <EPTF_MQTT_Transport_User_CT> extended by <EPTF_MQTT_LGen_CT> using this function. 
  //
  //  Parameters:
  //    pl_message - *in* <ASP_Event> - received event
  //
  //  Related Types:
  //    - <EPTF_MQTT_LGen_CT>
  //    - <fcb_EPTF_MQTT_Transport_receiveEvent>
  //    - <EPTF_MQTT_Transport_Provider_CT>
  //    - <EPTF_MQTT_Transport_User_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_LGen_receiveEvent(in ASP_Event p_event)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " ", p_event));

    // TODO: connection closed handling!
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_LGen_transportApiResponse
  //
  //  Purpose:
  //    The transport layer implementation <EPTF_MQTT_Transport_Provider_CT> can report received <EPTF_MQTT_Transport_Response> responses
  //    to the load generator layer <EPTF_MQTT_Transport_User_CT> extended by <EPTF_MQTT_LGen_CT> using this function. 
  //
  //  Parameters:
  //    pl_rsp - *in* <EPTF_MQTT_Transport_Response> - received transport api response
  //
  //  Related Types:
  //    - <EPTF_MQTT_LGen_CT>
  //    - <fcb_EPTF_MQTT_Transport_apiResponse>
  //    - <EPTF_MQTT_Transport_Provider_CT>
  //    - <EPTF_MQTT_Transport_User_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_LGen_transportApiResponse(in EPTF_MQTT_Transport_Response pl_rsp)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str("api response: ", pl_rsp));

    if (f_EPTF_MQTT_sessionDB_get(pl_rsp.sessionIdx, v_MQTT_session))
    {
      if (ispresent(pl_rsp.params))
      {
        if (ischosen(pl_rsp.params.connectionClosed))
        {
          f_EPTF_MQTT_dispatchEvent(c_MQTT_eventIdx_transportClosed, v_MQTT_session.eIdx, v_MQTT_session.fsmIdx, {});
        }
        else if (ischosen(pl_rsp.params.tcpEstablished))
        {
          f_EPTF_MQTT_dispatchEvent(c_MQTT_eventIdx_transportEstablished, v_MQTT_session.eIdx, v_MQTT_session.fsmIdx, {});
        }
      }
      else if (pl_rsp.succ)
      {
        f_EPTF_MQTT_dispatchEvent(c_MQTT_eventIdx_transportSucc, v_MQTT_session.eIdx, v_MQTT_session.fsmIdx, {});
      }
      else
      {
        f_EPTF_MQTT_dispatchEvent(c_MQTT_eventIdx_transportFail, v_MQTT_session.eIdx, v_MQTT_session.fsmIdx, {});
      }
    }
    else
    {
      f_EPTF_MQTT_Logging_VERBOSE(log2str("session not found for api response: ", pl_rsp));
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_LGen_send
  //
  //  Purpose:
  //    This function is used to send out a message of a <EPTF_MQTT_PDU> using the registered
  //    function <fcb_EPTF_MQTT_Transport_sendMessage> of the underlying transport layer instance.
  //
  //  Parameters:
  //    p_msg - *intout* <EPTF_MQTT_PDU> - the message to be sent
  //
  //  Related Types:
  //    <EPTF_MQTT_PDU>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_LGen_send(inout EPTF_MQTT_PDU p_msg)
  runs on EPTF_MQTT_LGen_CT
  {
    vf_EPTF_MQTT_Transport_send.apply(p_msg);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_init
  //
  //  Purpose:
  //    Test Step to dynamically allocate and initialize the MQTT FSM context for the caller FSM. Prerequisite to call any other MQTT test step.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_init>
  //    - <c_MQTT_stepName_init>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_init(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    var integer vl_eIdx := pl_ptr.eIdx;
    var integer vl_fsmIdx := pl_ptr.refContext.fCtxIdx;
    var integer vl_newSessionIdx := -1;

    if (f_EPTF_MQTT_isFsmInitialized(vl_eIdx, vl_fsmIdx, vl_newSessionIdx)) { return }

    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId, ":  initializing fsm ", vl_fsmIdx, " for entity ",vl_eIdx));

    var MQTT_Session vl_session := c_MQTT_Session_init;

	vl_session.eIdx := vl_eIdx;
    vl_session.fsmIdx := pl_ptr.refContext.fCtxIdx;

    vl_newSessionIdx := f_EPTF_MQTT_sessionDB_add(vl_session);

    f_EPTF_LGenBase_setAppDataItemOfFsmCtx(vl_eIdx, vl_fsmIdx, v_MQTT_bIdx, c_MQTT_AppData_sessionIdx, vl_newSessionIdx);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_cleanUp
  //
  //  Purpose:
  //    Test Step to free up the MQTT FSM context for the caller FSM. Frees up all allocated instances that were used by this FSM instance.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_cleanUp>
  //    - <c_MQTT_stepName_cleanUp>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_cleanUp(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
     f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    var integer vl_eIdx:=pl_ptr.eIdx;
    var integer vl_fsmIdx := pl_ptr.refContext.fCtxIdx;
    var integer vl_sessionIdx := -1;

    if (not f_EPTF_MQTT_isFsmInitialized(vl_eIdx, vl_fsmIdx, vl_sessionIdx))
    {
      f_EPTF_MQTT_Logging_DEBUG(%definitionId&": FSM has not been initialized");
      return;
    }

    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId,": FSM database: ",v_MQTT_sessionDB.data[vl_sessionIdx]));

	f_EPTF_MQTT_session_remove(vl_sessionIdx);

    f_EPTF_LGenBase_setAppDataItemOfFsmCtx(vl_eIdx, vl_fsmIdx, v_MQTT_bIdx, c_MQTT_AppData_sessionIdx, -1);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setLocalAddress
  //
  //  Purpose:
  //    Test step to set the local address in the entity context.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
  //    pl_ptr.refContext.fRefArgs[0] - *integer* - Index of the socket to use as local address
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setLocalAddress>
  //    - <c_MQTT_stepName_setLocalAddress>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_setLocalAddress(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
  }
  
  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setLocalAddress_byVars
  //
  //  Purpose:
  //    Test step to set the local address in the entity context.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args (1st param: remoteHost: charstring, 2nd param: remotePort: integer)
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setLocalAddress_byVars>
  //    - <c_MQTT_stepName_setLocalAddress_byVars>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_setLocalAddress_byVars(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var EPTF_IntegerList vl_varIds := {};
    f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_varIds);

    if (sizeof(vl_varIds)==2)
    {
      var EPTF_Var_DirectContent vl_host, vl_port;
      f_EPTF_Var_getContent(vl_varIds[0], vl_host);
      f_EPTF_Var_getContent(vl_varIds[1], vl_port);

      if (not ischosen(vl_host.charstringVal)) {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " first param is not charstring variable!"));
        return;
      }
      if (not ischosen(vl_port.intVal)) {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " second param is not integer variable!"));
        return;
      }

      f_EPTF_MQTT_addressDB_add(
        {
          hostName := vl_host.charstringVal,
          portNumber := vl_port.intVal
        },
        v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx
      );
    }
    else {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " two variables are needed as params!"));
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setRemoteAddress
  //
  //  Purpose:
  //    Test step to set the remote address in the FSM context.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
  //    pl_ptr.refContext.fRefArgs[0] - *integer* - Index of the socket to use as local address
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setRemoteAddress>
  //    - <c_MQTT_stepName_setRemoteAddress>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_setRemoteAddress(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setRemoteAddress_byVars
  //
  //  Purpose:
  //    Test step to set the remote address in the FSM context.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args (1st param: remoteHost: charstring, 2nd param: remotePort: integer)
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setRemoteAddress_byVars>
  //    - <c_MQTT_stepName_setRemoteAddress_byVars>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_setRemoteAddress_byVars(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var EPTF_IntegerList vl_varIds := {};
    f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_varIds);

    if (sizeof(vl_varIds)==2)
    {
      var EPTF_Var_DirectContent vl_host, vl_port;
      f_EPTF_Var_getContent(vl_varIds[0], vl_host);
      f_EPTF_Var_getContent(vl_varIds[1], vl_port);

      if (not ischosen(vl_host.charstringVal)) {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " first param is not charstring variable!"));
        return;
      }
      if (not ischosen(vl_port.intVal)) {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " second param is not integer variable!"));
        return;
      }

      f_EPTF_MQTT_addressDB_add(
        {
          hostName := vl_host.charstringVal,
          portNumber := vl_port.intVal
        },
        v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx
      );
    }
    else {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " two variables are needed as params!"));
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_startListening
  //
  //  Purpose:
  //    The test step expects that a transport endpoint is set in the addressDB as a local address.
  //    The step will initiate allocating the local address associated with the current session in the MQTT context
  //    and call the callback function to start listening.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_startListening>
  //    - <c_MQTT_stepName_startListening>
  //
  //  Related Steps:
  //    - <f_MQTT_step_setLocalAddress_byVars>
  //
  //  Related Callback Function Type:
  //    <fcb_EPTF_MQTT_Transport_apiRequest>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_startListening(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var EPTF_MQTT_Transport_Request vl_req := c_EPTF_MQTT_Transport_Request_init;
    vl_req.sessionIdx := v_MQTT_ctx.sessionIdx;

    f_EPTF_MQTT_addressDB_get(vl_req.params.startListening.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);

    vf_EPTF_MQTT_Transport_apiRequest.apply(vl_req);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_transportConnect
  //
  //  Purpose:
  //    The test step expects that local and remote socket adresses are set in the addressDB.
  //    The step will initiate allocating the local and remote addresses associated with the current session in the MQTT context
  //    and call the callback function to establish a connection.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_transportConnect>
  //    - <c_MQTT_stepName_transportConnect>
  //
  //  Related Events:
  //    - <c_MQTT_eventIdx_transportSucc>
  //    - <c_MQTT_eventIdx_transportFail>
  //
  //  Related Steps:
  //    - <f_MQTT_step_setLocalAddress_byVars>
  //
  //  Related Callback Function Type:
  //    <fcb_EPTF_MQTT_Transport_apiRequest>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_transportConnect(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var EPTF_MQTT_Transport_Request vl_req := c_EPTF_MQTT_Transport_Request_init;
    vl_req.sessionIdx := v_MQTT_ctx.sessionIdx;

    f_EPTF_MQTT_addressDB_get(vl_req.params.connect_.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
    f_EPTF_MQTT_addressDB_get(vl_req.params.connect_.remoteAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx);

    vf_EPTF_MQTT_Transport_apiRequest.apply(vl_req);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_transportClose
  //
  //  Purpose:
  //    The test step expects that a transport endpoint is set in the addressDB as a local address.
  //    The step will call the callback function to close the connection by the local address associated with the current session in the MQTT context.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args (1st param int: expectResponse (optional))
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_transportClose>
  //    - <c_MQTT_stepName_transportClose>
  //
  //  Related Steps:
  //    - <f_MQTT_step_startListening>
  //    - <f_MQTT_step_transportConnect>
  //
  //  Related Callback Function Type:
  //    <fcb_EPTF_MQTT_Transport_apiRequest>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_transportClose(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var integer vl_expectResponseInt := 1;
    var boolean vl_expectResponse := true;
    if (f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_expectResponseInt))
    {
      if (vl_expectResponseInt <= 0) { vl_expectResponse := false; }
    }

	if (v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx >= 0)
	{
      var EPTF_MQTT_Transport_Request vl_req := c_EPTF_MQTT_Transport_Request_init;
      vl_req.sessionIdx := v_MQTT_ctx.sessionIdx;
      vl_req.expectResponse := vl_expectResponse;

      f_EPTF_MQTT_addressDB_get(vl_req.params.close.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);

      vf_EPTF_MQTT_Transport_apiRequest.apply(vl_req);
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_loadTemplate_byIntIdx
  //
  //  Purpose:
  //    Test step to load a <MQTT_Template> from <tsp_EPTF_MQTT_LGen_templates> into *v_MQTT_msgToSend*
  //    (which can be sent using the send test step) by its integer index in test step args.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_loadTemplate_byIntIdx>
  //    - <c_MQTT_stepName_loadTemplate_byIntIdx>
  //
  //  Related Function:
  //    <f_MQTT_step_send>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_loadTemplate_byIntIdx(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var integer vl_templateIdx := -1;
    f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_templateIdx);

    f_EPTF_MQTT_templateDB_get(vl_templateIdx, v_MQTT_msgToSend.pdu);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_loadTemplate_byStringId
  //
  //  Purpose:
  //    Test step to load a <MQTT_Template> from <tsp_EPTF_MQTT_LGen_templates> into *v_MQTT_msgToSend*
  //    (which can be sent using the send test step) by its string Id.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_loadTemplate_byStringId>
  //    - <c_MQTT_stepName_loadTemplate_byStringId>
  //
  //  Related Function:
  //    <f_MQTT_step_send>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_loadTemplate_byStringId(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

	var charstring vl_templateId := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);
	var integer vl_templateIdx := f_EPTF_MQTT_templateDB_lookUp(vl_templateId);

	if (vl_templateIdx >= 0)
	{
      f_EPTF_MQTT_templateDB_get(vl_templateIdx, v_MQTT_msgToSend.pdu);
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find template with id: ", vl_templateId));
    }
  }

  /////////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setTopic_stringParam
  //
  //  Purpose:
  //    Test step to set the string value referred by the test step argument as the topic of the first subscription entry in SUBSCRIBE and PUBLISH messages.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(1st param: topic name charstring)
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setTopic_stringParam>
  //    - <c_MQTT_stepName_setTopic_stringParam>
  //
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_setTopic_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

	var charstring vl_topic := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);

	if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
	  if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
	    v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter := f_charstr2unichar(vl_topic);
	  }
	  else {
	    f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
	  }
	}
	else if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
	  v_MQTT_msgToSend.pdu.publish.topic_name := f_charstr2unichar(vl_topic);
	}
	else {
	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
	}
  }

  /////////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setTopic_add_stringParam
  //
  //  Purpose:
  //    Test step to add the string value referred by the test step argument to the topic of the first subscription entry in SUBSCRIBE and PUBLISH messages.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(1st param: topic name charstring)
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setTopic_add_stringParam>
  //    - <c_MQTT_stepName_setTopic_add_stringParam>
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_setTopic_add_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

	var charstring vl_topic := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);

	if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
	  if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
	    v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter :=
         v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter &
	     f_charstr2unichar(vl_topic);
	  }
	  else {
	    f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
	  }
	}
    if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
	  v_MQTT_msgToSend.pdu.publish.topic_name :=
	    v_MQTT_msgToSend.pdu.publish.topic_name &
	    f_charstr2unichar(vl_topic);
	}
	else {
	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
	}
  }

  /////////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setTopic_add_varParams
  //
  //  Purpose:
  //    Test step to add the string value of variables referred by the test step argument to the topic of the first subscription entry in SUBSCRIBE and PUBLISH messages.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(params: variables)
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setTopic_add_varParams>
  //    - <c_MQTT_stepName_setTopic_add_varParams>
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_setTopic_add_varParams(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var charstring vl_strToAdd := "";
    var EPTF_IntegerList vl_varIds := {};
    f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_varIds);

	for (var integer i:=0; i<sizeof(vl_varIds); i:=i+1)
    {
      var EPTF_Var_DirectContent vl_var;
      f_EPTF_Var_getContent(vl_varIds[i], vl_var);

      if (ischosen(vl_var.charstringVal)) {
        vl_strToAdd := vl_strToAdd & vl_var.charstringVal;
      }
      else if (ischosen(vl_var.intVal)) {
        vl_strToAdd := vl_strToAdd & int2str(vl_var.intVal);
      }
    }

    if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
	  if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
	    v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter :=
         v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter &
	     f_charstr2unichar(vl_strToAdd);
	  }
	  else {
	    f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
	  }
	}
    else if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
	  v_MQTT_msgToSend.pdu.publish.topic_name :=
	    v_MQTT_msgToSend.pdu.publish.topic_name &
	    f_charstr2unichar(vl_strToAdd);
	}
	else {
	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
	}
  }

  /////////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setTopic_add_clientId
  //
  //  Purpose:
  //    Test step to add client ID in the current session to the topic in the first subscription in SUBSCRIBE and to PUBLISH message.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(param: clientId)
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setTopic_add_clientId>
  //    - <c_MQTT_stepName_setTopic_add_clientId>
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_setTopic_add_clientId(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
	  if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
	    v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter :=
         v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter &
	     f_charstr2unichar(v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].clientId);
	  }
	  else {
	    f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
	  }
	}
    if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
	  v_MQTT_msgToSend.pdu.publish.topic_name :=
	    v_MQTT_msgToSend.pdu.publish.topic_name &
	    f_charstr2unichar(v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].clientId);
	}
	else {
	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
	}
  }

  /////////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setQos_intParam
  //
  //  Purpose:
  //    Test step to set the QoS level in SUBSCRIBE and PUBLISH messages using test step arguments
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(1st param: qos level (0,1,2) integer)
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setQos_intParam>
  //    - <c_MQTT_stepName_setQos_intParam>
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_setQos_intParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

	var integer vl_qos := -1;
    f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_qos);

	if (vl_qos >=0 and vl_qos < 3)
	{
      if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
		if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
		  v_MQTT_msgToSend.pdu.subscribe.payload[0].requested_qos := f_EPTF_MQTT_qos_int2enum(vl_qos);
		}
		else {
		  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
		}
	  }
	  else if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
		v_MQTT_msgToSend.pdu.publish.header.qos_level := f_EPTF_MQTT_qos_int2enum(vl_qos);
	  }
	  else {
		f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
	  }
	}
	else {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid qos level: ", vl_qos));
	}
  }

  /////////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setPublishMessage_stringParam
  //
  //  Purpose:
  //    Test step to set the content of the payload in PUBLISH message.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(1st param: publish message charstring)
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setPublishMessage_stringParam>
  //    - <c_MQTT_stepName_setPublishMessage_stringParam>
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_setPublishMessage_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

	var charstring vl_msg := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);

    if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
	  v_MQTT_msgToSend.pdu.publish.payload := char2oct(vl_msg);
	}
	else {
	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
	}
  }

  /////////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setPublishMessage_add_stringParam
  //
  //  Purpose:
  //    Test step to concatenate a string to the content of the PUBLISH message
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(1st param: publish message charstring)
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setPublishMessage_add_stringParam>
  //    - <c_MQTT_stepName_setPublishMessage_add_stringParam>
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ////////////////////////////////////////////////////////////
  function f_MQTT_step_setPublishMessage_add_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

	var charstring vl_msg := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);

    if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
	  v_MQTT_msgToSend.pdu.publish.payload :=
	    v_MQTT_msgToSend.pdu.publish.payload &
	    char2oct(vl_msg);
	}
	else {
	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
	}
  }

  /////////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setPublishMessage_add_varParams
  //
  //  Purpose:
  //    Test step to add the content of a set of variables to the payload of a PUBLISH message.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(params: variables)
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setPublishMessage_add_varParams>
  //    - <c_MQTT_stepName_setPublishMessage_add_varParams>
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_setPublishMessage_add_varParams(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    if (not ischosen(v_MQTT_msgToSend.pdu.publish)) {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
      return;
    }

    var charstring vl_strToAdd := "";
    var EPTF_IntegerList vl_varIds := {};
    f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_varIds);

	for (var integer i:=0; i<sizeof(vl_varIds); i:=i+1)
    {
      var EPTF_Var_DirectContent vl_var;
      f_EPTF_Var_getContent(vl_varIds[i], vl_var);

      if (ischosen(vl_var.charstringVal)) {
        vl_strToAdd := vl_strToAdd & vl_var.charstringVal;
      }
      else if (ischosen(vl_var.intVal)) {
        vl_strToAdd := vl_strToAdd & int2str(vl_var.intVal);
      }
    }

    v_MQTT_msgToSend.pdu.publish.payload :=
	  v_MQTT_msgToSend.pdu.publish.payload &
	  char2oct(vl_strToAdd);
  }

  /////////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_setPublishMessage_add_clientId
  //
  //  Purpose:
  //    Test step to add client ID of the current session to the payload of a PUBLISH message.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(param: clientId)
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_setPublishMessage_add_clientId>
  //    - <c_MQTT_stepName_setPublishMessage_add_clientId>
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_setPublishMessage_add_clientId(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
	  v_MQTT_msgToSend.pdu.publish.payload :=
	    v_MQTT_msgToSend.pdu.publish.payload &
	    char2oct(v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].clientId);
	}
	else {
	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
	}
  }

  /////////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_reportPingResponse
  //
  //  Purpose:
  //    Test step to set the report ping response to enable/disable using using step arguments.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args - 1:enable, 0:disable
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_reportPingResponse>
  //    - <c_MQTT_stepName_reportPingResponse>
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_reportPingResponse(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

	var integer vl_enable := -1;
    f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_enable);

	if (vl_enable >=0 and vl_enable < 2)
	{
      v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].reportPingResponse := (vl_enable == 1);
	}
	else {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid parameter: ", vl_enable));
	}
  }

  /////////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_reportPublishResponse
  //
  //  Purpose:
  //    Test step to set the report publish response to enable/disable using step arguments
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args(1st param: enable (1)/disable (0): integer)
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_reportPublishResponse>
  //    - <c_MQTT_stepName_reportPublishResponse>
  //
  //  Related Type:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_reportPublishResponse(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

	var integer vl_enable := -1;
    f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_enable);

	if (vl_enable >=0 and vl_enable < 2)
	{
      v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].reportPublishResponse := (vl_enable == 1);
	}
	else {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid parameter: ", vl_enable));
	}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_MQTT_step_send
  //
  //  Purpose:
  //    Test step to send out an MQTT message from *v_MQTT_msgToSend*.
  //    The message will be processed by the Applib's MQTT stack
  //    The step expects the localAddress and the remoteAddress to be configured in addressDB.
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
  //
  //  Related Constants:
  //    - <c_MQTT_stepIdx_send>
  //    - <c_MQTT_stepName_send>
  //
  //  Related Functions:
  //    - <f_MQTT_step_loadTemplate_byIntIdx>
  //    - <f_MQTT_step_loadTemplate_byStringId>
  //
  //  Related functions:
  //    <f_EPTF_MQTT_stack_fromApp>
  ///////////////////////////////////////////////////////////
  function f_MQTT_step_send(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId," ",pl_ptr));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    f_EPTF_MQTT_addressDB_get(v_MQTT_msgToSend.transportParams.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
    f_EPTF_MQTT_addressDB_get(v_MQTT_msgToSend.transportParams.remoteAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx);

    v_MQTT_msgToSend.transportParams.proto := {tcp := {}};
    v_MQTT_msgToSend.sessionIdx := v_MQTT_ctx.sessionIdx;

    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId," msg to send: ",v_MQTT_msgToSend));

    f_EPTF_MQTT_stack_fromApp(v_MQTT_msgToSend, v_MQTT_ctx);

    f_EPTF_SchedulerComp_refreshSnapshotTime();
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_addressDB_init
  //
  //  Purpose:
  //    Function to initialize the addressDB
  //
  //  Parameters:
  //
  //  Related Type:
  //    <MQTT_Address_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_addressDB_init()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_addressDB.queue);
    v_MQTT_addressDB.data := {};
    v_MQTT_addressDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_addressDB_Hash");
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_addressDB_cleanUp
  //
  //  Purpose:
  //    Function to clean up the address database and release its resources
  //
  //  Parameters:
  //
  //  Related Type:
  //    <MQTT_Address_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_addressDB_cleanUp()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_addressDB.queue);
    v_MQTT_addressDB.data := {};
    f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_addressDB_Hash");
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_addressDB_add
  //
  //  Purpose:
  //    Add a socket address to the addressDB and return its index if no such entry yet,
  //    or return its index if already exists
  //
  //  Parameters:
  //    p_addr  - *in* <Socket> - socket address
  //    p_idx   - *inout* *integer* - index of the address entry
  //
  //  Related Type:
  //    <MQTT_Address_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_addressDB_add(in Socket p_addr, inout integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  {
    p_idx := f_EPTF_MQTT_addressDB_lookUp(p_addr);

    if (p_idx == -1)
    {
      p_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_addressDB.queue);
      f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_addressDB.queue);
      f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding target address ", p_idx, " ", p_addr));
      f_EPTF_str2int_HashMap_Insert(v_MQTT_addressDB.hashRef, f_EPTF_MQTT_addressDB_Socket2String(p_addr), p_idx);
      v_MQTT_addressDB.data[p_idx] := p_addr;
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_addressDB_get
  //
  //  Purpose:
  //    Get a socket address from the addressDB by its index
  //
  //  Parameters:
  //    p_addr  - *inout* <Socket> - returned socket address
  //    p_idx   - *in* *integer* - index of the address to get
  //
  //  Related Type:
  //    <MQTT_Address_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_addressDB_get(inout Socket p_addr, in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (p_idx < sizeof(v_MQTT_addressDB.data) and p_idx >=0)
    {
      p_addr := v_MQTT_addressDB.data[p_idx];
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(": "," couldn't get address at ", p_idx));
	}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_addressDB_lookUp
  //
  //  Purpose:
  //    Get the index of a socket entry in addressDB
  //
  //  Parameters:
  //    p_sock  - *in* <Socket> - socket address
  //
  //  Return Type:
  //    *integer* - The index of the socket entry
  //
  //  Related Type:
  //    <MQTT_Address_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_addressDB_lookUp(in Socket p_sock)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer vl_idx := -1;
    f_EPTF_str2int_HashMap_Find(v_MQTT_addressDB.hashRef, f_EPTF_MQTT_addressDB_Socket2String(p_sock), vl_idx);
    return vl_idx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_addressDB_Socket2String
  //
  //  Purpose:
  //    Converts a socket address in <Socket> type format to the string format "<IP address>:<port number>" to be used as hash key
  //
  //  Parameters:
  //    p_sock  - *inout* <Socket> - socket address
  //
  //  Return Type:
  //    *charstring* - Socket address in string format
  //
  //  Related Type:
  //    <MQTT_Address_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_addressDB_Socket2String(Socket p_sock)
  return charstring
  {
    return p_sock.hostName&":"&int2str(p_sock.portNumber);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_templateDB_init
  //
  //  Purpose:
  //    Initializes the *v_MQTT_templateDB* <MQTT_Template_DB> database by adding the templates given in <tsp_EPTF_MQTT_LGen_templates>
  //
  //  Related Type:
  //    <MQTT_Template_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_templateDB_init()
  runs on EPTF_MQTT_LGen_CT
  {
    v_MQTT_templateDB.data := {};
    v_MQTT_templateDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_templateDB_Hash");

    for (var integer i:=0; i<sizeof(tsp_EPTF_MQTT_LGen_templates); i:=i+1) {
      f_EPTF_MQTT_templateDB_add(tsp_EPTF_MQTT_LGen_templates[i]);
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_templateDB_add
  //
  //  Purpose:
  //    Adds a new element to the *v_MQTT_templateDB* <MQTT_Template_DB> database
  //
  //  Parameters:
  //    p_template - *in* <MQTT_Template> - the element to be added
  //
  //  Returns:
  //    *integer* - the index of the added element in the database
  //
  //  Related Type:
  //    <MQTT_Template_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_templateDB_add(in MQTT_Template p_template)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    if (f_EPTF_MQTT_templateDB_lookUp(p_template.id)!=-1)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " template is already added with id: ", p_template.id));
      return -1;
    }

    var integer v_idx := sizeof(v_MQTT_templateDB.data);
    v_MQTT_templateDB.data[v_idx] := p_template;
    f_EPTF_str2int_HashMap_Insert(v_MQTT_templateDB.hashRef, p_template.id, v_idx);

    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " template was added with id: ", p_template.id, " at idx: ",v_idx));

    return v_idx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_templateDB_lookUp
  //
  //  Purpose:
  //    Gets the index of an <MQTT_Template> element in *v_MQTT_templateDB* <MQTT_Template_DB> database
  //
  //  Parameters:
  //    p_id - *in* *charstring* - the id of the <MQTT_Template>
  //
  //  Returns:
  //    *integer* - the index of the searched template in the database, or -1 if not found
  //
  //  Related Type:
  //    <MQTT_Template_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_templateDB_lookUp(in charstring p_id)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer    vl_idx := -1;
    if (not f_EPTF_str2int_HashMap_Find(v_MQTT_templateDB.hashRef, p_id, vl_idx))
    {
      vl_idx := -1;
    }
    return vl_idx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_templateDB_get
  //
  //  Purpose:
  //    Retrieves an element from the *v_MQTT_templateDB* <MQTT_Template_DB> database
  //
  //  Parameters:
  //    p_idx - *in* *integer* - the index of the element to be retrieved
  //    p_pdu - *inout* <MQTT_v3_1_1_ReqResp> - the retrieved element
  //
  //  Related Type:
  //    <MQTT_Template_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_templateDB_get(in integer p_idx, inout MQTT_v3_1_1_ReqResp p_pdu)
  runs on EPTF_MQTT_LGen_CT
  {
    if (p_idx < sizeof(v_MQTT_templateDB.data) and p_idx >= 0)
    {
      f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " template is fetched with idx: ", p_idx));
      p_pdu := v_MQTT_templateDB.data[p_idx].msg;
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_templateDB_cleanUp
  //
  //  Purpose:
  //    Cleans up the reserved resources of the *v_MQTT_templateDB* <MQTT_Template_DB> database
  //
  //  Related Type:
  //    <MQTT_Template_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_templateDB_cleanUp()
  runs on EPTF_MQTT_LGen_CT
  {
    v_MQTT_templateDB.data := {};
    f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_templateDB_Hash");
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_sessionDB_init
  //
  //  Purpose:
  //    Initializes the *v_MQTT_sessionDB* <MQTT_Session_DB> database and adds its hash to *v_MQTT_sessionDB.hashRef*
  //
  //  Related Type:
  //    <MQTT_Session_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_sessionDB_init()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_sessionDB.queue);
    v_MQTT_sessionDB.data := {};
    v_MQTT_sessionDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_sessionDB_Hash");
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_sessionDB_cleanUp
  //
  //  Purpose:
  //    Cleans up the reserved resources of the *v_MQTT_sessionDB* <MQTT_Session_DB> database
  //
  //  Related Type:
  //    <MQTT_Session_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_sessionDB_cleanUp()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_sessionDB.queue);
    v_MQTT_sessionDB.data := {};
    f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_sessionDB_Hash");
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_sessionDB_add
  //
  //  Purpose:
  //    Adds a new element to the *v_MQTT_sessionDB* <MQTT_Session_DB> database
  //
  //  Parameters:
  //    p_session - *in* <MQTT_Session> - the element to be added
  //
  //  Returns:
  //    *integer* - the index of the added element in the database
  //
  //  Related Type:
  //    <MQTT_Session_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_sessionDB_add(in MQTT_Session p_session)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer v_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_sessionDB.queue);
    f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_sessionDB.queue);
    f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding session ", v_idx, " ", p_session));

    v_MQTT_sessionDB.data[v_idx] := p_session;

    return v_idx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_sessionDB_setKey
  //
  //  Purpose:
  //    Sets the hash of the local socket address of a session by the session index
  //
  //  Parameters:
  //    p_idx - *in* *integer* - the session index
  //
  //  Related Type:
  //    <MQTT_Session_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_sessionDB_setKey(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (f_EPTF_MQTT_sessionDB_get(p_idx, v_MQTT_session))
    {
      var Socket v_addr;
      f_EPTF_MQTT_addressDB_get(v_addr, v_MQTT_session.localAddrIdx);
      f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "setting key for sock: ",f_EPTF_MQTT_sessionDB_addrHash(v_addr)," idx: ",p_idx));
      f_EPTF_str2int_HashMap_Insert(v_MQTT_sessionDB.hashRef, f_EPTF_MQTT_sessionDB_addrHash(v_addr), p_idx);
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_sessionDB_lookUp
  //
  //  Purpose:
  //    Gets the index of a session in *v_MQTT_sessionDB* <MQTT_Session_DB> database by its socket address
  //
  //  Parameters:
  //    p_sock - *in* <Socket> - the socket address to look up
  //
  //  Returns:
  //    *integer* - the index of the added element in the database, or -1 if not found
  //
  //  Related Type:
  //    <MQTT_Session_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_sessionDB_lookUp(in Socket p_sock)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer    vl_idx := -1;
    f_EPTF_str2int_HashMap_Find(v_MQTT_sessionDB.hashRef, f_EPTF_MQTT_sessionDB_addrHash(p_sock), vl_idx);
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up sock: ",f_EPTF_MQTT_sessionDB_addrHash(p_sock)," idx: ",vl_idx));
    return vl_idx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_sessionDB_get
  //
  //  Purpose:
  //    Retrieves a session's data from the *v_MQTT_sessionDB* <MQTT_Session_DB> database
  //
  //  Parameters:
  //    p_idx - *in* *integer* - the index of the element to be retrieved
  //    p_session - *inout* <MQTT_Session> - the retrieved session context
  //
  //  Returns:
  //    *boolean* - true if OK, false if no session element with this index
  //
  //  Related Type:
  //    <MQTT_Session_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_sessionDB_get(in integer p_idx, inout MQTT_Session p_session)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (p_idx < sizeof(v_MQTT_sessionDB.data) and p_idx >= 0)
    {
      p_session := v_MQTT_sessionDB.data[p_idx];
      return true;
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "couldn't get session with idx: ", p_idx));
      return false;
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_sessionDB_check
  //
  //  Purpose:
  //    Checks if a session element exists in the *v_MQTT_sessionDB* <MQTT_Session_DB> database
  //
  //  Parameters:
  //    p_idx - *in* *integer* - the index of the element to be checked
  //
  //  Returns:
  //    boolean - true if the session exists
  //
  //  Related Type:
  //    <MQTT_Session_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_sessionDB_check(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (p_idx < sizeof(v_MQTT_sessionDB.data) and p_idx >= 0 and f_EPTF_FBQ_itemIsBusy(p_idx, v_MQTT_sessionDB.queue))
    {
      return true;
    }
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "invalid subscription with idx: ", p_idx));
      return false;
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_sessionDB_remove
  //
  //  Purpose:
  //    Removes an element from the *v_MQTT_sessionDB* <MQTT_Session_DB> database and releases its resources
  //
  //  Parameters:
  //    p_idx - *in* *integer* - the index of the element to be removed
  //
  //  Related Type:
  //    <MQTT_Session_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_sessionDB_remove(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "removing session with idx: ", p_idx));

    var Socket v_addr;
    f_EPTF_MQTT_addressDB_get(v_addr, v_MQTT_sessionDB.data[p_idx].localAddrIdx);
    f_EPTF_str2int_HashMap_Erase(v_MQTT_sessionDB.hashRef, f_EPTF_MQTT_sessionDB_addrHash(v_addr));
    v_MQTT_sessionDB.data[p_idx] := c_MQTT_Session_init;
    f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_sessionDB.queue);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_sessionDB_addrHash
  //
  //  Purpose:
  //    Converts a socket address in <Socket> type format to the string format "<IP address>:<port number>" to be used as a hash key
  //
  //  Parameters:
  //    p_sock - *in* <Socket> - socket address
  //
  //  Returns:
  //    charstring - socket address in string format
  //
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_sessionDB_addrHash(in Socket p_sock)
  return charstring
  {
    return p_sock.hostName&":"&int2str(p_sock.portNumber);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publishDB_init
  //
  //  Purpose:
  //    Initializes the *v_MQTT_publishDB* <MQTT_Publish_DB> database and creates its hashmap 
  //
  //  Related Type:
  //    <MQTT_Publish_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publishDB_init()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_publishDB.queue);
    v_MQTT_publishDB.data := {};
    v_MQTT_publishDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_publishDB_Hash");
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publishDB_cleanUp
  //
  //  Purpose:
  //    Cleans up the reserved resources of the *v_MQTT_publishDB* <MQTT_Publish_DB> database
  //
  //  Related Type:
  //    <MQTT_Publish_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publishDB_cleanUp()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_publishDB.queue);
    v_MQTT_publishDB.data := {};
    f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_publishDB_Hash");
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publishDB_add
  //
  //  Purpose:
  //    Adds a new element to the *v_MQTT_publishDB* <MQTT_Publish_DB> database
  //
  //  Parameters:
  //    p_pub - *in* <MQTT_Publish> - the element to be added
  //
  //  Returns:
  //    *integer* - the index of the added element in the database
  //
  //  Related Type:
  //    <MQTT_Publish_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publishDB_add(in MQTT_Publish p_pub)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer v_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_publishDB.queue);
    f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_publishDB.queue);
    f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding publish ", v_idx, " ", p_pub));

    f_EPTF_str2int_HashMap_Insert(
      v_MQTT_publishDB.hashRef,
      f_EPTF_MQTT_publishDB_packetIdHash(p_pub.sessionIdx, p_pub.packetId),
      v_idx
    );

    v_MQTT_publishDB.data[v_idx] := p_pub;

    return v_idx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publishDB_lookUp
  //
  //  Purpose:
  //    Gets the index of an <MQTT_Publish> element in *v_MQTT_publishDB* <MQTT_Publish_DB> database by its session and packet id-s
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - input session id
  //    p_packetId - *in* *integer* - input packet id
  //
  //  Returns:
  //    *integer* - the index of the searched element in the database, or -1 if not found
  //
  //  Related Type:
  //    <MQTT_Publish_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publishDB_lookUp(in integer p_sessionIdx, in integer p_packetId)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer    vl_idx := -1;
    f_EPTF_str2int_HashMap_Find(
      v_MQTT_publishDB.hashRef,
      f_EPTF_MQTT_publishDB_packetIdHash(p_sessionIdx, p_packetId),
      vl_idx
    );
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up publish idx: ",vl_idx));
    return vl_idx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publishDB_get
  //
  //  Purpose:
  //    Retrieves an element from the *v_MQTT_publishDB* <MQTT_Publish_DB> database
  //
  //  Parameters:
  //    p_idx - *in* *integer* - the index of the element to be retrieved
  //    p_pub - *inout* <MQTT_Publish> - the retrieved element
  //
  //  Returns:
  //     boolean - true: success, false: no element with the index p_idx
  //
  //  Related Type:
  //    <MQTT_Publish_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publishDB_get(in integer p_idx, inout MQTT_Publish p_pub)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (p_idx < sizeof(v_MQTT_publishDB.data) and p_idx >= 0)
    {
      p_pub := v_MQTT_publishDB.data[p_idx];
      return true;
    }
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "couldn't get publish with idx: ", p_idx));
      return false;
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publishDB_check
  //
  //  Purpose:
  //    Checks if an element exists in the *v_MQTT_publishDB* <MQTT_Publish_DB> database
  //
  //  Parameters:
  //    p_idx - *in* *integer* - the index of the element to be checked
  //
  //  Returns:
  //     boolean - true: element present, false: element doesn't exists
  //
  //  Related Type:
  //    <MQTT_Publish_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publishDB_check(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (p_idx < sizeof(v_MQTT_publishDB.data) and p_idx >= 0 and f_EPTF_FBQ_itemIsBusy(p_idx, v_MQTT_publishDB.queue))
    {
      return true;
    }
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "invalid publish with idx: ", p_idx));
      return false;
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publishDB_remove
  //
  //  Purpose:
  //    Removes an element from the *v_MQTT_publishDB* <MQTT_Publish_DB> database
  //
  //  Parameters:
  //    p_idx - *in* *integer* - the index of the element to be Removed
  //
  //  Related Type:
  //    <MQTT_Publish_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publishDB_remove(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " removing idx: ", p_idx));

    if (f_EPTF_MQTT_publishDB_get(p_idx, v_MQTT_publish))
    {
      f_EPTF_str2int_HashMap_Erase(
        v_MQTT_publishDB.hashRef,
        f_EPTF_MQTT_publishDB_packetIdHash(
          v_MQTT_publish.sessionIdx,
          v_MQTT_publish.packetId)
      );

      v_MQTT_publishDB.data[p_idx] := c_MQTT_Publish_init;
      f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_publishDB.queue);
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publishDB_packetIdHash
  //
  //  Purpose:
  //    Converts a pair of session ID & packet ID to the string format "session_<sessionId>:id_<packetId>" to be used as a hash key
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - input session ID
  //    p_packetId - *in* *integer* - input packet ID
  //
  //  Returns:
  //    charstring - converted IDs
  //
  //  Related Type:
  //    <MQTT_Publish_DB>
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publishDB_packetIdHash(in integer p_sessionIdx, in integer p_packetId)
  return charstring
  {
    return "session_"&int2str(p_sessionIdx)&":"&"id_"&int2str(p_packetId);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscriptionDB_init
  //
  //  Purpose:
  //    Initializes the *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
  //
  //  Related Type:
  //    <MQTT_Subscription_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscriptionDB_init()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_subscriptionDB.queue);
    v_MQTT_subscriptionDB.data := {};
    v_MQTT_subscriptionDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_subscriptionDB_Hash");
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscriptionDB_cleanUp
  //
  //  Purpose:
  //    Cleans up the reserved resources of the *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
  //
  //  Related Type:
  //    <MQTT_Subscription_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscriptionDB_cleanUp()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_subscriptionDB.queue);
    v_MQTT_subscriptionDB.data := {};
    f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_subscriptionDB_Hash");
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscriptionDB_add
  //
  //  Purpose:
  //    Adds a new element to the *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
  //
  //  Parameters:
  //    p_sub - *in* <MQTT_Subscription> - the element to be added
  //
  //  Returns:
  //    *integer* - the index of the added element in the database
  //
  //  Related Type:
  //    <MQTT_Subscription_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscriptionDB_add(in MQTT_Subscription p_sub)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer v_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_subscriptionDB.queue);
    f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_subscriptionDB.queue);
    f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding subscription ", v_idx, " ", p_sub));

    if (ispresent(p_sub.request))
    {
      f_EPTF_str2int_HashMap_Insert(
        v_MQTT_subscriptionDB.hashRef,
        f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sub.sessionIdx, p_sub.request.packet_identifier),
        v_idx
      );

      // TODO: Only the first topic name will be used!
      // TODO: It is not checked if there is already a subscription for this!
      if (sizeof(p_sub.request.payload)>0)
      {
        var universal charstring vl_topicName := p_sub.request.payload[0].topic_filter;
        f_EPTF_str2int_HashMap_Insert(
          v_MQTT_subscriptionDB.hashRef,
          f_EPTF_MQTT_subscriptionDB_topicHash(p_sub.sessionIdx, f_unichar2charstr(vl_topicName)),
          v_idx
        );
      }
    }

    v_MQTT_subscriptionDB.data[v_idx] := p_sub;

    return v_idx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscriptionDB_setKey_packetId
  //
  //  Purpose:
  //    Insert an integer element to the subscription hashmap, key is composed from session ID and packet ID
  //
  //  Parameters:
  //    p_idx - *in* *integer* - data to be inserted
  //    p_sessionIdx - *in* *integer* - input session ID, used in hashmap key
  //    p_packetId - *in* *integer* - input packet ID, used in hashmap key
  //
  //  Related Type:
  //    <MQTT_Subscription_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscriptionDB_setKey_packetId(in integer p_idx, in integer p_sessionIdx, in integer p_packetId)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_str2int_HashMap_Insert(
      v_MQTT_subscriptionDB.hashRef,
      f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sessionIdx, p_packetId),
      p_idx
    );
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscriptionDB_removeKey_packetId
  //
  //    Removes the element from the subscription hashmap identified by its session ID and packet ID
  //
  //  Parameters:
  //    p_idx - *in* *integer* - NOT USED
  //    p_sessionIdx - *in* *integer* - input session ID, used in hashmap key
  //    p_packetId - *in* *integer* - input packet ID, used in hashmap key
  //
  //  Related Type:
  //    <MQTT_Subscription_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscriptionDB_removeKey_packetId(in integer p_idx, in integer p_sessionIdx, in integer p_packetId)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_str2int_HashMap_Erase(
      v_MQTT_subscriptionDB.hashRef,
      f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sessionIdx, p_packetId)
    );
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscriptionDB_lookUp_packetId
  //
  //  Purpose:
  //    Gets the index of an <MQTT_Subscription> element in *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
  //    by a session ID and packet ID
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - input session ID, used in hashmap key
  //    p_packetId - *in* *integer* - input packet ID, used in hashmap key
  //
  //  Returns:
  //    *integer* - the index of the searched element in the database, or -1 if not found
  //
  //  Related Type:
  //    <MQTT_Subscription_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscriptionDB_lookUp_packetId(in integer p_sessionIdx, in integer p_packetId)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer    vl_idx := -1;
    f_EPTF_str2int_HashMap_Find(
      v_MQTT_subscriptionDB.hashRef,
      f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sessionIdx, p_packetId),
      vl_idx
    );
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up subscription idx: ",vl_idx));
    return vl_idx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscriptionDB_lookUp_topicName
  //
  //  Purpose:
  //    Gets the index of an <MQTT_Subscription> element in *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
  //    by a session ID and packet ID
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - input session ID, used in hashmap key
  //    p_topicName - *in* *charstring* - input topic, used in hashmap key
  //
  //  Returns:
  //    *integer* - the index of the searched element in the database, or -1 if not found
  //
  //  Related Type:
  //    <MQTT_Subscription_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscriptionDB_lookUp_topicName(in integer p_sessionIdx, in charstring p_topicName)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer    vl_idx := -1;
    f_EPTF_str2int_HashMap_Find(
      v_MQTT_subscriptionDB.hashRef,
      f_EPTF_MQTT_subscriptionDB_topicHash(p_sessionIdx, p_topicName),
      vl_idx
    );
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up subscription idx: ",vl_idx));
    return vl_idx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscriptionDB_get
  //
  //  Purpose:
  //    Retrieves an element from the *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
  //
  //  Parameters:
  //    p_idx - *in* *integer* - the index of the element to be retrieved
  //    p_sub - *inout* <MQTT_Subscription> - the retrieved element
  //
  //  Returns:
  //    *boolean* - true: success, false: element with this index doesn't exist
  //
  //  Related Type:
  //    <MQTT_Subscription_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscriptionDB_get(in integer p_idx, inout MQTT_Subscription p_sub)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (p_idx < sizeof(v_MQTT_subscriptionDB.data) and p_idx >= 0)
    {
      p_sub := v_MQTT_subscriptionDB.data[p_idx];
      return true;
    }
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "couldn't get subscription with idx: ", p_idx));
      return false;
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscriptionDB_check
  //
  //  Purpose:
  //    Checks if an element at an index exists the *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
  //
  //  Parameters:
  //    p_idx - *in* *integer* - the index of the element to be checked
  //
  //  Returns:
  //    *boolean* - true: success, false: element at this index is not present
  //
  //  Related Type:
  //    <MQTT_Subscription_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscriptionDB_check(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (p_idx < sizeof(v_MQTT_subscriptionDB.data) and p_idx >= 0 and f_EPTF_FBQ_itemIsBusy(p_idx, v_MQTT_subscriptionDB.queue))
    {
      return true;
    }
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "invalid subscription with idx: ", p_idx));
      return false;
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscriptionDB_remove
  //
  //  Purpose:
  //    Removes an element from the *v_MQTT_subscriptionDB* <MQTT_Subscription_DB> database
  //
  //  Parameters:
  //    p_idx - *in* *integer* - the index of the element to be removed
  //
  //  Related Type:
  //    <MQTT_Subscription_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscriptionDB_remove(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " removing idx: ", p_idx));

    if (f_EPTF_MQTT_subscriptionDB_get(p_idx, v_MQTT_subscription))
    {
      if (ispresent(v_MQTT_subscription.request))
      {
        f_EPTF_str2int_HashMap_Erase(
          v_MQTT_subscriptionDB.hashRef,
          f_EPTF_MQTT_subscriptionDB_packetIdHash(
            v_MQTT_subscription.sessionIdx,
            v_MQTT_subscription.request.packet_identifier)
        );
      }
      if (sizeof(v_MQTT_subscription.request.payload)>0)
      {
        var universal charstring vl_topicName := v_MQTT_subscription.request.payload[0].topic_filter;
        f_EPTF_str2int_HashMap_Erase(
          v_MQTT_subscriptionDB.hashRef,
          f_EPTF_MQTT_subscriptionDB_topicHash(v_MQTT_subscription.sessionIdx, f_unichar2charstr(vl_topicName))
        );
      }
      v_MQTT_subscriptionDB.data[p_idx] := c_MQTT_Subscription_init;
      f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_subscriptionDB.queue);
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscriptionDB_packetIdHash
  //
  //  Purpose:
  //    Converts a pair of session ID & packet ID to the string format "session_<sessionId>:id_<packetId>" to be used as hash key
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - input session ID
  //    p_packetId - *in* *integer* - input packet ID
  //
  //  Returns:
  //    charstring - converted IDs
  //
  //  Related Type:
  //    <MQTT_Subscription_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscriptionDB_packetIdHash(in integer p_sessionIdx, in integer p_packetId)
  return charstring
  {
    return "session_"&int2str(p_sessionIdx)&":"&"id_"&int2str(p_packetId);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscriptionDB_topicHash
  //
  //  Purpose:
  //    Converts a pair of session ID & topic to the string format "session_<sessionId>:topic_<topic>" to be used as hash key
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - input session ID
  //    p_topic - *in* *charstring* - input topic string
  //
  //  Returns:
  //    charstring - converted IDs
  //
  //  Related Type:
  //    <MQTT_Subscription_DB>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscriptionDB_topicHash(in integer p_sessionIdx, in charstring p_topic)
  return charstring
  {
    return "session_"&int2str(p_sessionIdx)&":"&"topic_"&p_topic;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_stack_fromApp
  //
  //  Purpose:
  //    This is the main entry point for the MQTT stack realization of the <EPTF_MQTT_LGen_CT>
  //    component that handles messages received from the application layer (e.g. FSMs)
  //
  //  Parameters:
  //    p_msg - *inout* <EPTF_MQTT_PDU> - message that enters into the stack (will be modified by the stack)
  //    p_ctx - *in* <MQTT_StepCtx> - pointers for the instances related to a particular simulated entity
  //
  //  Related Types:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_stack_fromApp(inout EPTF_MQTT_PDU p_msg, in MQTT_StepCtx p_ctx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (p_ctx.sessionIdx >=0 and p_ctx.sessionIdx < sizeof(v_MQTT_sessionDB.data) )
    {
      f_EPTF_MQTT_session_fromApp(p_msg, p_ctx.sessionIdx);
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " sessionIdx is not valid [",p_ctx.sessionIdx,"]. Dropping message"));
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_stack_fromEnv
  //
  //  Purpose:
  //    This is the main entry point for the MQTT stack realization of the <EPTF_MQTT_LGen_CT>
  //    component that handles messages received from the environment layer (e.g. transport layer)
  //
  //  Parameters:
  //    p_msg - *inout* <EPTF_MQTT_PDU> - message that enters into the stack (will be modified by the stack)
  //
  //  Related Types:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_stack_fromEnv(inout EPTF_MQTT_PDU p_msg)
  runs on EPTF_MQTT_LGen_CT
  {
    var integer vl_sIdx := f_EPTF_MQTT_sessionDB_lookUp(p_msg.transportParams.localAddress);

    if (vl_sIdx >= 0)
    {
      // In case it's a publish response, there should be an ongoing publish transaction
      if (f_EPTF_MQTT_publishResponseType(p_msg.pdu))
      {
        var integer vl_packetId := f_EPTF_MQTT_publishResponsePacketId(p_msg.pdu);
		var integer vl_pubIdx := f_EPTF_MQTT_publishDB_lookUp(vl_sIdx, vl_packetId);

		if (vl_pubIdx >= 0)
		{
		  f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " publish transaction found for incoming message: ", vl_pubIdx));
		  f_EPTF_MQTT_publish_fromEnv(p_msg, vl_pubIdx);
		}
		else
	    {
    	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " no ongoing publish transaction found for incoming message (ignoring): ", p_msg));
	    }
      }
      else
      {
        f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " session found for incoming message: ",vl_sIdx));
        f_EPTF_MQTT_session_fromEnv(p_msg, vl_sIdx);
      }
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " no session found for incoming message (ignoring): ", p_msg));
    }
  }

  /*****************************************************************
    @startuml EPTF_MQTT_LGen_Functions.MQTT_Session.jpg     
      [*] --> DISCONNECTED
      DISCONNECTED --> CONNECTING: appIn: connect_msg
      CONNECTING --> CONNECTED: envIn: connack_accepted
      CONNECTING --> DISCONNECTED: envIn: connack_refused
      CONNECTED --> DISCONNECTED: appIn: disconnect
      CONNECTED --> CONNECTED: appIn: publish
      CONNECTED --> CONNECTED: envIn: publish
      CONNECTED --> CONNECTED: appIn: subscribe
      CONNECTED --> CONNECTED: appIn: unsubscribe
      CONNECTED --> CONNECTED: envIn: suback
      CONNECTED --> CONNECTED: envIn: unsuback
      CONNECTED --> CONNECTED: envIn: pingresp
      CONNECTED --> CONNECTED: T_keepalive
    @enduml
  ******************************************************************/
  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_fromApp
  //
  //  Purpose:
  //    This is the entry point for an MQTT session fsm handling events coming from the application layer (e.g. client/broker FSMs)
  //
  //  Parameters:
  //    p_msg - *inout* <EPTF_MQTT_PDU> - next transport message to be sent
  //    p_sIdx - *in* *integer* - session index
  //
  //  Related Types:
  //    <MQTT_Session>
  //
  // FSM Diagram of a MQTT session:
  //   (see EPTF_MQTT_LGen_Functions.MQTT_Session.jpg)
  //
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_fromApp(inout EPTF_MQTT_PDU p_msg, in integer p_sIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (p_sIdx < 0 or p_sIdx >= sizeof(v_MQTT_sessionDB.data[p_sIdx]))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " invalid session index: ", p_sIdx));
      return;
    }
    
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " session [",p_sIdx,":",v_MQTT_sessionDB.data[p_sIdx].clientId,"] current state: ",v_MQTT_sessionDB.data[p_sIdx].state));

    f_EPTF_MQTT_setCtx(v_MQTT_sessionDB.data[p_sIdx].eIdx, v_MQTT_sessionDB.data[p_sIdx].fsmIdx, v_MQTT_ctx);

    // State: DISCONNECTED
    if (v_MQTT_sessionDB.data[p_sIdx].state == DISCONNECTED)
    {
      // appIn: connect
      if (ischosen(p_msg.pdu.connect_msg))
      {
		// At this point, the local address should be filled in correctly
        f_EPTF_MQTT_sessionDB_setKey(p_sIdx);

	//p_msg.pdu.connect_msg.payload.client_identifier.stringItem := v_MQTT_sessionDB.data[p_sIdx].clientId;
        //p_msg.pdu.connect_msg.payload.client_identifier.stringLength := lengthof(v_MQTT_sessionDB.data[p_sIdx].clientId);
        p_msg.pdu.connect_msg.payload.client_identifier := v_MQTT_sessionDB.data[p_sIdx].clientId;
        v_MQTT_sessionDB.data[p_sIdx].keepAliveTime := int2float(p_msg.pdu.connect_msg.keep_alive-1);

        f_EPTF_MQTT_session_send(p_sIdx, p_msg);

        f_EPTF_MQTT_session_setState(p_sIdx, CONNECTING);
      }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
	}
	// State: CONNECTED
    else if (v_MQTT_sessionDB.data[p_sIdx].state == CONNECTED)
    {
      // appIn: disconnect
      if (ischosen(p_msg.pdu.disconnect_msg))
      {
        // cancel T keepalive
        f_EPTF_MQTT_session_cancelT_keepalive(p_sIdx);

		f_EPTF_MQTT_session_send(p_sIdx, p_msg);

        f_EPTF_MQTT_session_setState(p_sIdx, DISCONNECTED);
	  }
	  if (ischosen(p_msg.pdu.publish))
      {
        if (p_msg.pdu.publish.header.qos_level == AT_MOST_ONCE_DELIVERY)
        {
          p_msg.pdu.publish.packet_identifier := omit; //f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);

  	 	  f_EPTF_MQTT_session_send(p_sIdx, p_msg);

          f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
        }
        else if (p_msg.pdu.publish.header.qos_level == AT_LEAST_ONCE_DELIVERY)
        {
          p_msg.sessionIdx := p_sIdx;
          f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
          f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);

          p_msg.pdu.publish.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);

		  // create new Publish QoS1 Orig
		  var MQTT_Publish vl_publish := c_MQTT_Publish_init;
		  vl_publish.sessionIdx := p_sIdx;
		  vl_publish.side := ORIG;
		  vl_publish.state.qos1 := CREATED;
		  vl_publish.packetId := p_msg.pdu.publish.packet_identifier;
		  var integer vl_pubIdx := f_EPTF_MQTT_publishDB_add(vl_publish);

		  f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);

		  // pubOut.send
		  f_EPTF_MQTT_publish_fromSession(p_msg, vl_pubIdx);
        }
        else if (p_msg.pdu.publish.header.qos_level == EXACTLY_ONE_DELIVERY)
        {
          p_msg.sessionIdx := p_sIdx;
          f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
          f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);

          p_msg.pdu.publish.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);

		  // create new Publish QoS2 Orig
		  var MQTT_Publish vl_publish := c_MQTT_Publish_init;
		  vl_publish.sessionIdx := p_sIdx;
		  vl_publish.side := ORIG;
		  vl_publish.state.qos2 := CREATED;
		  vl_publish.packetId := p_msg.pdu.publish.packet_identifier;
		  var integer vl_pubIdx := f_EPTF_MQTT_publishDB_add(vl_publish);

		  f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);

		  // pubOut.send
		  f_EPTF_MQTT_publish_fromSession(p_msg, vl_pubIdx);
        }
        else
        {
          f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Publish qos level not handled. Ignoring:", p_msg));
        }
	  }
	  if (ischosen(p_msg.pdu.subscribe))
      {
        // Fill in message
        p_msg.sessionIdx := p_sIdx;
        f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
        f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);
        p_msg.pdu.subscribe.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);

        // Create new Subscription
        var MQTT_Subscription vl_sub := c_MQTT_Subscription_init;
        vl_sub.sessionIdx := p_sIdx;
        vl_sub.request := p_msg.pdu.subscribe;
        vl_sub.state := UNSUBSCRIBED;

        var integer vl_subIdx := f_EPTF_MQTT_subscriptionDB_add(vl_sub);
        f_EPTF_MQTT_session_registerSubscription(p_sIdx, vl_subIdx);

        // subscription.send
        f_EPTF_MQTT_subscription_fromSession(p_msg, vl_subIdx);

        f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
	  }
	  if (ischosen(p_msg.pdu.unsubscribe))
      {
        // Fill in message
        p_msg.sessionIdx := p_sIdx;
        f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
        f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);
        p_msg.pdu.unsubscribe.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);

        // Look up corresponding subscription
        //if (sizeof(p_msg.pdu.unsubscribe.payload[0])>0)
        if (sizeof(p_msg.pdu.unsubscribe.payload.topic_filter)>0)
        {
          //var charstring vl_topicName := f_unichar2charstr(p_msg.pdu.unsubscribe.payload[0].topic_filter);
          var charstring vl_topicName := f_unichar2charstr(p_msg.pdu.unsubscribe.payload.topic_filter[0]);
          var integer vl_subIdx := f_EPTF_MQTT_subscriptionDB_lookUp_topicName(p_sIdx, vl_topicName);

          // subOut.send
          if (f_EPTF_MQTT_subscriptionDB_check(vl_subIdx)) {
            f_EPTF_MQTT_subscription_fromSession(p_msg, vl_subIdx);
          }
        }
      }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
    }
	else
	{
	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_sessionDB.data[p_sIdx].state));
	}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_fromEnv
  //
  //  Purpose:
  //    This is the entry point for an MQTT session fsm from the environment layer (e.g. transport layer)
  //
  //  Parameters:
  //    p_msg - *inout* <EPTF_MQTT_PDU> - transport message received
  //    p_sIdx - *in* *integer* - session index
  //
  //  Related Types:
  //    <MQTT_Session>
  //
  // FSM Diagram of a MQTT session:
  //   (see EPTF_MQTT_LGen_Functions.MQTT_Session.jpg)
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_fromEnv(inout EPTF_MQTT_PDU p_msg, in integer p_sIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (p_sIdx < 0 or p_sIdx >= sizeof(v_MQTT_sessionDB.data[p_sIdx]))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " invalid session index: ", p_sIdx));
      return;
    }
    
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " session [",v_MQTT_sessionDB.data[p_sIdx].clientId,"] current state: ",v_MQTT_sessionDB.data[p_sIdx].state));

    f_EPTF_MQTT_setCtx(v_MQTT_sessionDB.data[p_sIdx].eIdx, v_MQTT_sessionDB.data[p_sIdx].fsmIdx, v_MQTT_ctx);

    // State: CONNECTING
    if (v_MQTT_sessionDB.data[p_sIdx].state == CONNECTING)
    {
      // envIn: connack
      if (ischosen(p_msg.pdu.connack))
      {
        if (p_msg.pdu.connack.connect_return_code == 0)
        {
          // startT keepalive
          f_EPTF_MQTT_session_startT_keepalive(p_sIdx, v_MQTT_sessionDB.data[p_sIdx].keepAliveTime);

          f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_CONNACK_Accepted,
            v_MQTT_sessionDB.data[p_sIdx].eIdx,
            v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
            {}
          );
        }
        else
        {
          f_EPTF_MQTT_session_setState(p_sIdx, DISCONNECTED);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_CONNACK_Refused,
            v_MQTT_sessionDB.data[p_sIdx].eIdx,
            v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
            {}
          );
        }
      }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
	}
	// State: CONNECTED
    else if (v_MQTT_sessionDB.data[p_sIdx].state == CONNECTED)
    {
      // envIn: suback
      if (ischosen(p_msg.pdu.suback))
      {
        var integer vl_subIdx :=
          f_EPTF_MQTT_subscriptionDB_lookUp_packetId(p_sIdx, p_msg.pdu.suback.packet_identifier);

        f_EPTF_MQTT_subscription_fromEnv(p_msg, vl_subIdx);
	  }
      // envIn: unsuback
      else if (ischosen(p_msg.pdu.unsuback))
      {
        var integer vl_subIdx :=
          f_EPTF_MQTT_subscriptionDB_lookUp_packetId(p_sIdx, p_msg.pdu.unsuback.packet_identifier);

        f_EPTF_MQTT_subscription_fromEnv(p_msg, vl_subIdx);
	  }
      // envIn: publish
      else if (ischosen(p_msg.pdu.publish))
      {
        if (p_msg.pdu.publish.header.qos_level == AT_MOST_ONCE_DELIVERY)
        {
          f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_PUBLISH,
            v_MQTT_sessionDB.data[p_sIdx].eIdx,
            v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
            {}
          );
        }
        else if (p_msg.pdu.publish.header.qos_level == AT_LEAST_ONCE_DELIVERY)
        {
		  // Send PUBACK

		  v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBACK(p_msg.pdu.publish.packet_identifier));
		  f_EPTF_MQTT_session_send(p_sIdx, v_MQTT_msgToSend);

          f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_PUBLISH,
            v_MQTT_sessionDB.data[p_sIdx].eIdx,
            v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
            {}
          );
        }
        else if (p_msg.pdu.publish.header.qos_level == EXACTLY_ONE_DELIVERY)
        {
          // create new Publish QoS2 TERM
		  var MQTT_Publish vl_publish := c_MQTT_Publish_init;
		  vl_publish.sessionIdx := p_sIdx;
		  vl_publish.side := TERM;
		  vl_publish.state.qos2 := CREATED;
		  vl_publish.packetId := p_msg.pdu.publish.packet_identifier;
		  var integer vl_pubIdx := f_EPTF_MQTT_publishDB_add(vl_publish);

		  f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);

		  // pubOut.send
		  f_EPTF_MQTT_publish_fromEnv(p_msg, vl_pubIdx);
        }
        else
        {
          f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Publish qos level not handled. Ignoring:", p_msg));
        }
	  }
      // envIn: ping response
      else if (ischosen(p_msg.pdu.pingresp))
      {
        if (v_MQTT_sessionDB.data[p_sIdx].reportPingResponse)
        {
          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_PING_Response,
            v_MQTT_sessionDB.data[p_sIdx].eIdx,
            v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
            {}
          );
        }
      }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
    }
	else
	{
	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_sessionDB.data[p_sIdx].state));
	}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_keepalive
  //
  //  Purpose:
  //    Handles the T_keepalive timer event in the <MQTT_Session> FSM
  //
  //  Parameters:
  //    pl_action - *inout* <EPTF_ScheduledAction> - the scheduled action <>
  //    pl_eventIndex - *in* *integer* - eveny index in the scheduler
  //
  //  Returns:
  //    *boolean* - true <always>
  //
  //  Related Types:
  //    <MQTT_Session>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_keepalive(in EPTF_ScheduledAction pl_action, in integer pl_eventIndex)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    var integer vl_sIdx := pl_action.actionId[0];
    v_MQTT_sessionDB.data[vl_sIdx].keepaliveTimer := -1;

    if (v_MQTT_sessionDB.data[vl_sIdx].state == CONNECTED)
    {
      // Send ping request
      v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_pingReq);
      f_EPTF_MQTT_session_send(vl_sIdx, v_MQTT_msgToSend);

      // startT keepAlive
      f_EPTF_MQTT_session_startT_keepalive(vl_sIdx, v_MQTT_sessionDB.data[vl_sIdx].keepAliveTime);

      f_EPTF_MQTT_session_setState(vl_sIdx, CONNECTED);
    }

    return true;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_startT_keepalive
  //
  //  Purpose:
  //    Starts the T_keepalive timer for an <MQTT_Session> FSM
  //
  //  Parameters:
  //    pl_sIdx - *in* *integer* - session index (?)
  //    pl_time - *in* *float* - value of the keepalive timer
  //
  //  Returns:
  //    *boolean* - true: succesful , false: 0 or negative timer value
  //
  //  Related Types:
  //    <MQTT_Session>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_startT_keepalive(in integer pl_sIdx, in float pl_time)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (pl_time <= 0.0) { return false; }

    var boolean retval;
    var EPTF_ActionId vl_actionId;

    vl_actionId[0] := pl_sIdx;

    retval := f_EPTF_SchedulerComp_scheduleAction(
      f_EPTF_SchedulerComp_snapshotTime() + pl_time,
      refers(f_EPTF_MQTT_session_keepalive),
      vl_actionId,
      v_MQTT_sessionDB.data[pl_sIdx].keepaliveTimer
    );

    return retval;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_cancelT_keepalive
  //
  //  Purpose:
  //    Cancels the T_keepalive timer of an <MQTT_Session> FSM
  //
  //  Parameters:
  //    pl_sessionIdx - *in* *integer* - session index
  //
  //  Related Types:
  //    <MQTT_Session>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_cancelT_keepalive(in integer pl_sessionIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (v_MQTT_sessionDB.data[pl_sessionIdx].keepaliveTimer >= 0)
    {
      if(not f_EPTF_SchedulerComp_CancelEvent(v_MQTT_sessionDB.data[pl_sessionIdx].keepaliveTimer))
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId,": could not cancel timer"));
      }
      v_MQTT_sessionDB.data[pl_sessionIdx].keepaliveTimer := -1;
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_getNextPacketId
  //
  //  Purpose:
  //    Sets the value of the packet id field in the next message to be sent in a session
  //
  //  Parameters:
  //    p_ctx - *in* <MQTT_StepCtx> - Pointer of the context embedding the session id. 
  //
  //  Returns:
  //    *integer* - packet id for the next message
  //
  //  Related Types:
  //    <MQTT_Session>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_getNextPacketId(in MQTT_StepCtx p_ctx)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    if (v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId < 65535)
    {
      v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId := v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId + 1;
    }
    else
    {
      v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId := 0;
    }
    return v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_setState
  //
  //  Purpose:
  //    Sets a new state for an <MQTT_Session> FSM
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - index of session
  //    p_nextState - *in* <MQTT_Session_State> - new state of the state machine
  //
  //  Related Types:
  //    <MQTT_Session>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_setState(in integer p_sessionIdx, in MQTT_Session_State p_nextState)
  runs on EPTF_MQTT_LGen_CT
  {
    v_MQTT_sessionDB.data[p_sessionIdx].state := p_nextState;
	f_EPTF_MQTT_Logging_VERBOSE(log2str("session [", p_sessionIdx,":",v_MQTT_sessionDB.data[p_sessionIdx].clientId,"] next state: ", v_MQTT_sessionDB.data[p_sessionIdx].state));
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_send
  //
  //  Purpose:
  //    Sends a message to the transort layer with type <EPTF_MQTT_PDU> and the provided session index
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - session index
  //    p_msg - *intout* <EPTF_MQTT_PDU> - the message to be sent
  //
  //  Related Types:
  //    <MQTT_Session>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_send(in integer p_sessionIdx, inout EPTF_MQTT_PDU p_msg)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sessionIdx].localAddrIdx);
    f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sessionIdx].remoteAddrIdx);

	p_msg.transportParams.proto := {tcp := {}};
    p_msg.sessionIdx := p_sessionIdx;

	// envOut.send
    f_EPTF_MQTT_LGen_send(p_msg);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_registerSubscription
  //
  //  Purpose:
  //    Adds a subscription index to an <MQTT_Session>
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - session index
  //    p_subIdx - *in* *integer* - subscription index to add
  //
  //  Related Types:
  //    <MQTT_Session>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_registerSubscription(in integer p_sessionIdx, in integer p_subIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    // Note, we don't check if it is already there
    v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[sizeof(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs)]
      := p_subIdx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_deregisterSubscription
  //
  //  Purpose:
  //    Removes a subscription index from <MQTT_Session>
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - session index
  //    p_subIdx - *in* *integer* - subscription index to remove
  //
  //  Related Types:
  //    <MQTT_Session>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_deregisterSubscription(in integer p_sessionIdx, in integer p_subIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    // This is slow, it's only acceptable if the assumption that a session has only a small number
    // of subscriptions
    var EPTF_IntegerList vl_new := {};
    for (var integer i:=0; i<sizeof(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs); i:=i+1)
    {
      if (v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[i] != p_subIdx) {
        vl_new[sizeof(vl_new)] := v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[i];
      }
    }
    v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs := vl_new;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_registerPublish
  //
  //  Purpose:
  //    Adds a publish index to an <MQTT_Session>
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - session index
  //    p_subIdx - *in* *integer* - publish index to add
  //
  //  Related Types:
  //    <MQTT_Session>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_registerPublish(in integer p_sessionIdx, in integer p_pubIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    // Note, we don't check if it is already there
    v_MQTT_sessionDB.data[p_sessionIdx].publishRefs[sizeof(v_MQTT_sessionDB.data[p_sessionIdx].publishRefs)]
      := p_pubIdx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_deregisterPublish
  //
  //  Purpose:
  //    Removes a publish index from an <MQTT_Session>
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - session index
  //    p_subIdx - *in* *integer* - publish index to add
  //
  //  Related Types:
  //    <MQTT_Session>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_deregisterPublish(in integer p_sessionIdx, in integer p_pubIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    // This is slow, it's only acceptable if the assumption that a session has only a small number
    // of concurrent publish(s)
    var EPTF_IntegerList vl_new := {};
    for (var integer i:=0; i<sizeof(v_MQTT_sessionDB.data[p_sessionIdx].publishRefs); i:=i+1)
    {
      if (v_MQTT_sessionDB.data[p_sessionIdx].publishRefs[i] != p_pubIdx) {
        vl_new[sizeof(vl_new)] := v_MQTT_sessionDB.data[p_sessionIdx].publishRefs[i];
      }
    }
    v_MQTT_sessionDB.data[p_sessionIdx].publishRefs := vl_new;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_session_remove
  //
  //  Purpose:
  //    Releases all resources related to an <MQTT_Session> and removes it from the <MQTT_Session_DB>
  //
  //  Parameters:
  //    p_sessionIdx - *in* *integer* - session index
  //
  //  Related Types:
  //    <MQTT_Session>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_session_remove(in integer p_sessionIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_session_setState(p_sessionIdx, REMOVING);

    f_EPTF_MQTT_session_cancelT_keepalive(p_sessionIdx);

    // Remove subscriptions, remove publications
	for (var integer i:=0; i<sizeof(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs); i:=i+1)
	{
	  f_EPTF_MQTT_subscription_remove(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[i]);
	}

	// Derefer FSM pointers
	f_EPTF_LGenBase_setAppDataItemOfFsmCtx(
      v_MQTT_sessionDB.data[p_sessionIdx].eIdx,
	  v_MQTT_sessionDB.data[p_sessionIdx].fsmIdx,
	  v_MQTT_bIdx, c_MQTT_AppData_sessionIdx, -1
	);

	// Clean up DB
    f_EPTF_MQTT_sessionDB_remove(p_sessionIdx);
  }

  /*****************************************************************
    @startuml EPTF_MQTT_LGen_Functions.MQTT_Subscription.jpg     
      [*] --> UNSUBSCRIBED
      UNSUBSCRIBED --> SUBSCRIBING: sessionIn: subscribe
      SUBSCRIBED --> UNSUBSCRIBING: sessionIn: unsubscribe
      SUBSCRIBING --> SUBSCRIBED: envIn: suback_accepted
      SUBSCRIBING --> UNSUBSCRIBED: envIn: suback_refused
      UNSUBSCRIBING --> UNSUBSCRIBED: envIn: unsuback
    @enduml
  ******************************************************************/
  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscription_fromSession
  //
  //  Purpose:
  //    Implements part of the <MQTT_Subscription> FSM that handles the events coming from the <MQTT_Session>
  //
  //  Parameters:
  //    p_msg - *inout* <EPTF_MQTT_PDU> - message that
  //    p_subIdx - *in* *integer* - subscription index
  //
  //  Related Types:
  //    <MQTT_Subscription>
  //
  // FSM Diagram of a MQTT subscription:
  //   (see EPTF_MQTT_LGen_Functions.MQTT_Subscription.jpg)
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscription_fromSession(inout EPTF_MQTT_PDU p_msg, in integer p_subIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (p_subIdx < 0 or p_subIdx >= sizeof(v_MQTT_subscriptionDB.data[p_subIdx]))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " invalid subscription index: ", p_subIdx));
      return;
    }
    
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " subscription current state: ",v_MQTT_subscriptionDB.data[p_subIdx].state));

    // State: UNSUBSCRIBED
    if (v_MQTT_subscriptionDB.data[p_subIdx].state == UNSUBSCRIBED)
    {
      // sessionIn: subscribe
      if (ischosen(p_msg.pdu.subscribe))
      {
        f_EPTF_MQTT_subscription_startT_watchdog(p_subIdx, tsp_EPTF_MQTT_SUBSCRIBE_responseWatchdog);
	
        // envOut.send
        f_EPTF_MQTT_LGen_send(p_msg);

        f_EPTF_MQTT_subscription_setState(p_subIdx, SUBSCRIBING);
	  }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
    }
    // State: SUBSCRIBED
    else if (v_MQTT_subscriptionDB.data[p_subIdx].state == SUBSCRIBED)
    {
      // sessionIn: unsubscribe
      if (ischosen(p_msg.pdu.unsubscribe))
      {
        f_EPTF_MQTT_subscription_startT_watchdog(p_subIdx, tsp_EPTF_MQTT_SUBSCRIBE_responseWatchdog);

        f_EPTF_MQTT_subscriptionDB_setKey_packetId(
          p_subIdx, v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx, p_msg.pdu.unsubscribe.packet_identifier
        );

        f_EPTF_MQTT_subscription_setState(p_subIdx, UNSUBSCRIBING);

        // envOut.send
        f_EPTF_MQTT_LGen_send(p_msg);
	  }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
	}
	else
	{
	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_subscriptionDB.data[p_subIdx].state));
	}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscription_fromEnv
  //
  //  Purpose:
  //    Implements part of the <MQTT_Subscription> FSM that handles the events coming from the environment
  //
  //  Parameters:
  //    p_msg - *inout* <EPTF_MQTT_PDU> - received transport message
  //    p_subIdx - *in* *integer* - subscription index
  //
  //  Related Types:
  //    <MQTT_Subscription>
  //
  // FSM Diagram of a MQTT subscription:
  //   (see EPTF_MQTT_LGen_Functions.MQTT_Subscription.jpg)
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscription_fromEnv(inout EPTF_MQTT_PDU p_msg, in integer p_subIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (p_subIdx < 0 or p_subIdx >= sizeof(v_MQTT_subscriptionDB.data[p_subIdx]))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " invalid subscription index: ", p_subIdx));
      return;
    }
    
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " subscription current state: ",v_MQTT_subscriptionDB.data[p_subIdx].state));

    // State: SUBSCRIBING
    if (v_MQTT_subscriptionDB.data[p_subIdx].state == SUBSCRIBING)
    {
      // envIn: suback
      if (ischosen(p_msg.pdu.suback) and sizeof(p_msg.pdu.suback.payload.return_code)>0)
      {
        f_EPTF_MQTT_subscription_cancelT_watchdog(p_subIdx);
      
        if (
	  p_msg.pdu.suback.payload.return_code[0] == 0 or
	  p_msg.pdu.suback.payload.return_code[0] == 1 or
	  p_msg.pdu.suback.payload.return_code[0] == 2
	)
        {
          f_EPTF_MQTT_subscription_setState(p_subIdx, SUBSCRIBED);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_SUBACK_Accepted,
            v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].eIdx,
            v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].fsmIdx,
            {}
          );
        }
        else
        {
          f_EPTF_MQTT_subscription_setState(p_subIdx, UNSUBSCRIBED);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_SUBACK_Refused,
            v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].eIdx,
            v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].fsmIdx,
            {}
          );

          // Removing subscription
          f_EPTF_MQTT_subscription_remove(p_subIdx);
        }
      }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
    }
    // State: UNSUBSCRIBING
    else if (v_MQTT_subscriptionDB.data[p_subIdx].state == UNSUBSCRIBING)
    {
      // envIn: unsuback
      if (ischosen(p_msg.pdu.unsuback))
      {
        f_EPTF_MQTT_subscription_cancelT_watchdog(p_subIdx);
	
        f_EPTF_MQTT_subscription_setState(p_subIdx, UNSUBSCRIBED);

        f_EPTF_MQTT_dispatchEvent(
          c_MQTT_eventIdx_UNSUBACK,
          v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].eIdx,
          v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].fsmIdx,
          {}
        );

        // Removing unsuback key
        f_EPTF_MQTT_subscriptionDB_removeKey_packetId(
          p_subIdx,
          v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx,
          p_msg.pdu.unsuback.packet_identifier
        );

        // Removing subscription
        f_EPTF_MQTT_subscription_remove(p_subIdx);
      }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_subscriptionDB.data[p_subIdx].state));
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscription_watchdog
  //
  //  Purpose:
  //    Handles the T_watchdog event in the <MQTT_Subscribe> FSM
  //
  //  Parameters:
  //    pl_action - *in* <EPTF_ScheduledAction> - scheduled action 
  //    pl_eventIndex - *in* *integer* - event index
  //
  //  Returns:
  //    *boolean* - true <always>
  //
  //  Related Types:
  //    <MQTT_Subscribe>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscription_watchdog(in EPTF_ScheduledAction pl_action, in integer pl_eventIndex)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    var integer vl_pIdx := pl_action.actionId[0];
    v_MQTT_subscriptionDB.data[vl_pIdx].watchdogTimer := -1;

    f_EPTF_MQTT_dispatchEvent(
      c_MQTT_eventIdx_SUBSCRIBE_Timeout,
      v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[vl_pIdx].sessionIdx].eIdx,
      v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[vl_pIdx].sessionIdx].fsmIdx,
      {}
    );

    f_EPTF_MQTT_subscription_remove(vl_pIdx);

    return true;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscription_startT_watchdog
  //
  //  Purpose:
  //    Start a T_watchdog timer in the <MQTT_Subscription> FSM
  //
  //  Parameters:
  //    pl_sIdx - *in* *integer* - subscription transaction index
  //    pl_time - *in* *float* - time from now when the action takes place
  //
  //  Returns:
  //    *boolean* - true: action scheduled , false: 0 or negative timer value is passed
  //
  //  Related Types:
  //    <MQTT_Subscription>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscription_startT_watchdog(in integer pl_sIdx, in float pl_time)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (pl_time <= 0.0) { return false; }

    var boolean retval;
    var EPTF_ActionId vl_actionId;

    vl_actionId[0] := pl_sIdx;

    retval := f_EPTF_SchedulerComp_scheduleAction(
      f_EPTF_SchedulerComp_snapshotTime() + pl_time,
      refers(f_EPTF_MQTT_subscription_watchdog),
      vl_actionId,
      v_MQTT_subscriptionDB.data[pl_sIdx].watchdogTimer //sets the event index of the next publish watchdog action
    );

    return retval;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscription_cancelT_watchdog
  //
  //  Purpose:
  //    Cancels the T_watchdog timer of an <MQTT_Subscription> FSM
  //
  //  Parameters:
  //    pl_sIdx - *in* *integer* - index of the subscription FSM in subscriptionDB
  //
  //  Related Types:
  //    <MQTT_Subscription>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscription_cancelT_watchdog(in integer pl_sIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (v_MQTT_subscriptionDB.data[pl_sIdx].watchdogTimer >= 0)
    {
      if(not f_EPTF_SchedulerComp_CancelEvent(v_MQTT_subscriptionDB.data[pl_sIdx].watchdogTimer))
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId,": could not cancel timer"));
      }
      v_MQTT_subscriptionDB.data[pl_sIdx].watchdogTimer := -1;
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscription_setState
  //
  //  Purpose:
  //    Sets a new state of a <MQTT_Subscription> FSM
  //
  //  Parameters:
  //    p_subIdx - *in* *integer* - subscription index
  //    p_nextState - *in* <MQTT_Subscription_State> - new state in the state machine
  //
  //  Related Types:
  //    <MQTT_Subscription>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscription_setState(in integer p_subIdx, in MQTT_Subscription_State p_nextState)
  runs on EPTF_MQTT_LGen_CT
  {
    v_MQTT_subscriptionDB.data[p_subIdx].state := p_nextState;
	f_EPTF_MQTT_Logging_VERBOSE(log2str("subscription next state: ", v_MQTT_subscriptionDB.data[p_subIdx].state));
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_subscription_remove
  //
  //  Purpose:
  //    Removing resources related to <MQTT_Subscription> FSM
  //
  //  Parameters:
  //    p_subIdx - *in* *integer* - subscription index
  //
  //  Related Types:
  //    <MQTT_Subscription>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_subscription_remove(in integer p_subIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_subscription_setState(p_subIdx, REMOVING);

    // Remove pointers from session
    f_EPTF_MQTT_session_deregisterSubscription(v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx, p_subIdx);

	// Clean up DB
    f_EPTF_MQTT_subscriptionDB_remove(p_subIdx);
  }

  /*****************************************************************
    @startuml EPTF_MQTT_LGen_Functions.MQTT_Publish_qos1.jpg
      [*] --> CREATED
      CREATED --> PUBLISHED: sessionIn: publish
	  PUBLISHED --> ACKNOWLEDGED: envIn: puback
    @enduml

    @startuml EPTF_MQTT_LGen_Functions.MQTT_Publish_qos2.jpg
      [*] --> CREATED
      CREATED --> PUBLISHED: sessionIn: publish
	  PUBLISHED --> RELEASED: envIn: pubrec (orig)
	  PUBLISHED --> RECEIVED: envIn: pubrec (term)
	  RELEASED --> COMPLETE: envIn: pubcomp (orig)
	  RECEIVED --> COMPLETE: envIn: pubcomp (term)
    @enduml
  ******************************************************************/
  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publish_fromSession
  //
  //  Purpose:
  //    Handles a publish transaction in case of QoS 1 and QoS 2 fsm requested by the application layer (e.g. client/broker FSMs)
  //
  //  Parameters:
  //    p_msg - *inout* <EPTF_MQTT_PDU> - transport message 
  //    p_pubIdx - *in* *integer* - publish index
  //
  //  Related Types:
  //    <MQTT_Publish>
  //
  // FSM Diagram of a MQTT publish:
  //   (see EPTF_MQTT_LGen_Functions.MQTT_Publish_qos1.jpg)
  //   (see EPTF_MQTT_LGen_Functions.MQTT_Publish_qos2.jpg)
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publish_fromSession(inout EPTF_MQTT_PDU p_msg, in integer p_pubIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (p_pubIdx < 0 or p_pubIdx >= sizeof(v_MQTT_publishDB.data[p_pubIdx]))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " invalid publish index: ", p_pubIdx));
      return;
    }

    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " publish current state: ",v_MQTT_publishDB.data[p_pubIdx].state));

    // QoS 1 Originating
    if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos1) and v_MQTT_publishDB.data[p_pubIdx].side == ORIG)
    {
      // State: CREATED
      if (v_MQTT_publishDB.data[p_pubIdx].state.qos1 == CREATED)
      {
        // sessionIn: publish
        if (ischosen(p_msg.pdu.publish))
        {
          // start T response watchdog
          f_EPTF_MQTT_publish_startT_watchdog(p_pubIdx, tsp_EPTF_MQTT_PUBLISH_responseWatchdog);

          // envOut.send
          f_EPTF_MQTT_LGen_send(p_msg);

          f_EPTF_MQTT_publish_setState(p_pubIdx, {qos1 := PUBLISHED});
        }
        else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
      }
  	  else {f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos1));	}
    }
    // QoS 1 Terminating
    else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos1) and v_MQTT_publishDB.data[p_pubIdx].side == TERM)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " NOP"));
	}
    // QoS 2 Originating
    else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos2) and v_MQTT_publishDB.data[p_pubIdx].side == ORIG)
    {
      // State: CREATED
      if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == CREATED)
      {
        // sessionIn: publish
        if (ischosen(p_msg.pdu.publish))
        {
          // start T response watchdog
          f_EPTF_MQTT_publish_startT_watchdog(p_pubIdx, tsp_EPTF_MQTT_PUBLISH_responseWatchdog);

          // envOut.send
          f_EPTF_MQTT_LGen_send(p_msg);

          f_EPTF_MQTT_publish_setState(p_pubIdx, {qos2 := PUBLISHED});
        }
        else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
      }
  	  else {f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos1));	}
	}
    // QoS 2 Terminating
    else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos2) and v_MQTT_publishDB.data[p_pubIdx].side == TERM)
    {
	}
	else
	{
	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown publish type ", v_MQTT_publishDB.data[p_pubIdx]));
	}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publish_fromEnv
  //
  //  Purpose:
  //    Handles PUBLISH and publish response (PUB REC/PUB REL/PUB COMP) messages received from the peer (i.e. from transport layer) based on its QoS and originator
  //
  //  Parameters:
  //    p_msg - *inout* <EPTF_MQTT_PDU> - received transport message
  //    p_pubIdx - *in* *integer* - publish index
  //
  //  Related Types:
  //    <MQTT_Publish>
  //
  // FSM Diagram of a MQTT publish:
  //   (see EPTF_MQTT_LGen_Functions.MQTT_Publish_qos1.jpg)
  //   (see EPTF_MQTT_LGen_Functions.MQTT_Publish_qos2.jpg)
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publish_fromEnv(inout EPTF_MQTT_PDU p_msg, in integer p_pubIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (p_pubIdx < 0 or p_pubIdx >= sizeof(v_MQTT_publishDB.data[p_pubIdx]))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " invalid publish index: ", p_pubIdx));
      return;
    }
    
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " publish current state: ",v_MQTT_publishDB.data[p_pubIdx].state));

    // QoS 1 Originating
    if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos1) and v_MQTT_publishDB.data[p_pubIdx].side == ORIG)
    {
      // State: PUBLISHED
      if (v_MQTT_publishDB.data[p_pubIdx].state.qos1 == PUBLISHED)
      {
        // sessionIn: PUBACK
        if (ischosen(p_msg.pdu.puback))
        {
          // cancel T response watchdog
          f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);

          // report event if enabled
          if (v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].reportPublishResponse) {
            f_EPTF_MQTT_dispatchEvent(
              c_MQTT_eventIdx_PUBACK,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
              {}
            );
          }

          // next state: ACKNOWLEDGED
          f_EPTF_MQTT_publish_setState(p_pubIdx, { qos1:=ACKNOWLEDGED });

          // remove publish transaction
          f_EPTF_MQTT_publish_remove(p_pubIdx);
        }
        else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
      }
  	  else {f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos1));	}
    }
    // QoS 1 Terminating
    else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos1) and v_MQTT_publishDB.data[p_pubIdx].side == TERM)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled, QoS1 TERM should be handled on session level: Ignoring:", p_msg));
	}
    // QoS 2 Originating
    else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos2) and v_MQTT_publishDB.data[p_pubIdx].side == ORIG)
    {
      // State: PUBLISHED
      if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == PUBLISHED)
      {
        // sessionIn: PUBREC
        if (ischosen(p_msg.pdu.pubrec))
        {
          // cancel T response watchdog
          f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);

          // envOut.send PUBREL
          v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBREL(p_msg.pdu.pubrec.packet_identifier));
          f_EPTF_MQTT_session_send(v_MQTT_publishDB.data[p_pubIdx].sessionIdx, v_MQTT_msgToSend);

		  // start publish watchdog
          f_EPTF_MQTT_publish_startT_watchdog(p_pubIdx, tsp_EPTF_MQTT_PUBLISH_responseWatchdog);

          // next state: RELEASED
          f_EPTF_MQTT_publish_setState(p_pubIdx, { qos2:=RELEASED });

          // report event if enabled
          if (v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].reportPublishResponse) {
            f_EPTF_MQTT_dispatchEvent(
              c_MQTT_eventIdx_PUBREC,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
              {}
            );
          }
        }
      }
      // State: RELEASED
      else if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == RELEASED)
      {
        // sessionIn: PUBCOMP
        if (ischosen(p_msg.pdu.pubcomp))
        {
          // cancel T response watchdog
          f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);

          // next state: COMPLETE
          f_EPTF_MQTT_publish_setState(p_pubIdx, { qos2:=COMPLETE });

          // report event if enabled
          if (v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].reportPublishResponse) {
            f_EPTF_MQTT_dispatchEvent(
              c_MQTT_eventIdx_PUBCOMP,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
              {}
            );
          }

          // remove publish transaction
          f_EPTF_MQTT_publish_remove(p_pubIdx);
        }
        else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
      }
  	  else {f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos2));	}
	}
    // QoS 2 Terminating
    else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos2) and v_MQTT_publishDB.data[p_pubIdx].side == TERM)
    {
      // State: CREATED
      if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == CREATED)
      {
        // envIn: PUBLISH
        if (ischosen(p_msg.pdu.publish))
        {
          // envOut.send PUBREC
          v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBREC(p_msg.pdu.publish.packet_identifier));
          f_EPTF_MQTT_session_send(v_MQTT_publishDB.data[p_pubIdx].sessionIdx, v_MQTT_msgToSend);

		  // start publish watchdog
          f_EPTF_MQTT_publish_startT_watchdog(p_pubIdx, tsp_EPTF_MQTT_PUBLISH_responseWatchdog);

          // next state: RECEIVED
          f_EPTF_MQTT_publish_setState(p_pubIdx, { qos2:=RECEIVED });

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_PUBLISH,
            v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
            v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
            {}
          );
        }
      }
      // State: RECEIVED
      else if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == RECEIVED)
      {
        // envIn: PUBREL
        if (ischosen(p_msg.pdu.pubrel))
        {
          // cancel T response watchdog
          f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);

          // report event if enabled	  
          if (v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].reportPublishResponse)
	  {
            f_EPTF_MQTT_dispatchEvent(
              c_MQTT_eventIdx_PUBREL,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
              {}
            );
          }

          // envOut.send PUBCOMP
          v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBCOMP(p_msg.pdu.pubrel.packet_identifier));
          f_EPTF_MQTT_session_send(v_MQTT_publishDB.data[p_pubIdx].sessionIdx, v_MQTT_msgToSend);

          // next state: COMPLETE
          f_EPTF_MQTT_publish_setState(p_pubIdx, { qos2:=COMPLETE });

          // remove publish transaction
          f_EPTF_MQTT_publish_remove(p_pubIdx);
        }
        else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
      }
  	  else {f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos2));	}
	}
	else
	{
	  f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown publish type ", v_MQTT_publishDB.data[p_pubIdx]));
	}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publish_watchdog
  //
  //  Purpose:
  //    Handles the T_watchdog event in the <MQTT_Publish> FSM
  //
  //  Parameters:
  //    pl_action - *in* <EPTF_ScheduledAction> - scheduled action 
  //    pl_eventIndex - *in* *integer* - event index
  //
  //  Returns:
  //    *boolean* - true <always>
  //
  //  Related Types:
  //    <MQTT_Publish>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publish_watchdog(in EPTF_ScheduledAction pl_action, in integer pl_eventIndex)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    var integer vl_pIdx := pl_action.actionId[0];
    v_MQTT_publishDB.data[vl_pIdx].watchdogTimer := -1;

    f_EPTF_MQTT_dispatchEvent(
      c_MQTT_eventIdx_PUBLISH_Timeout,
      v_MQTT_sessionDB.data[v_MQTT_publishDB.data[vl_pIdx].sessionIdx].eIdx,
      v_MQTT_sessionDB.data[v_MQTT_publishDB.data[vl_pIdx].sessionIdx].fsmIdx,
      {}
    );

    f_EPTF_MQTT_publish_remove(vl_pIdx);

    return true;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publish_startT_watchdog
  //
  //  Purpose:
  //    Start a T_watchdog timer in the <MQTT_Publish> FSM
  //
  //  Parameters:
  //    pl_pIdx - *in* *integer* - publish transaction index
  //    pl_time - *in* *float* - time from now when the action takes place
  //
  //  Returns:
  //    *boolean* - true: action scheduled , false: 0 or negative timer value is passed
  //
  //  Related Types:
  //    <MQTT_Publish>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publish_startT_watchdog(in integer pl_pIdx, in float pl_time)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (pl_time <= 0.0) { return false; }

    var boolean retval;
    var EPTF_ActionId vl_actionId;

    vl_actionId[0] := pl_pIdx;

    retval := f_EPTF_SchedulerComp_scheduleAction(
      f_EPTF_SchedulerComp_snapshotTime() + pl_time,
      refers(f_EPTF_MQTT_publish_watchdog),
      vl_actionId,
      v_MQTT_publishDB.data[pl_pIdx].watchdogTimer //sets the event index of the next publish watchdog action
    );

    return retval;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publish_cancelT_watchdog
  //
  //  Purpose:
  //    Cancels the T_watchdog timer of an <MQTT_Publish> FSM
  //
  //  Parameters:
  //    pl_publishIdx - *in* *integer* - index of the publish FSM in publishDB
  //
  //  Related Types:
  //    <MQTT_Publish>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publish_cancelT_watchdog(in integer pl_publishIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (v_MQTT_publishDB.data[pl_publishIdx].watchdogTimer >= 0)
    {
      if(not f_EPTF_SchedulerComp_CancelEvent(v_MQTT_publishDB.data[pl_publishIdx].watchdogTimer))
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId,": could not cancel timer"));
      }
      v_MQTT_publishDB.data[pl_publishIdx].watchdogTimer := -1;
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publish_setState
  //
  //  Purpose:
  //    Sets the new state of a <MQTT_Publish> FSM
  //
  //  Parameters:
  //    p_pubIdx - *in* *integer* - index of the publish FSM in publishDB
  //    p_nextState - *in* <MQTT_Publish_State> - new state of the state machine
  //
  //  Related Types:
  //    <MQTT_Publish>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publish_setState(in integer p_pubIdx, in MQTT_Publish_State p_nextState)
  runs on EPTF_MQTT_LGen_CT
  {
    v_MQTT_publishDB.data[p_pubIdx].state := p_nextState;
	f_EPTF_MQTT_Logging_VERBOSE(log2str("publish next state: ", v_MQTT_publishDB.data[p_pubIdx].state));
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publish_remove
  //
  //  Purpose:
  //    Releaseing resources of an <MQTT_Publish> FSM
  //
  //  Parameters:
  //    p_pubIdx - *in* *integer* - publish index
  //
  //  Related Types:
  //    <MQTT_Publish>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publish_remove(in integer p_pubIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_publish_setState(p_pubIdx, { removing := true });

    // cancel timers if running
    f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);

    // Remove pointers from session
    f_EPTF_MQTT_session_deregisterPublish(v_MQTT_publishDB.data[p_pubIdx].sessionIdx, p_pubIdx);

	// Clean up DB
    f_EPTF_MQTT_publishDB_remove(p_pubIdx);
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_setStepCtx
  //
  //  Purpose:
  //    Sets the entity and FSM indexes in the MQTT step context
  //
  //  Parameters:
  //    pl_ptr - *in* <EPTF_LGenBase_TestStepArgs> - test step args
  //    p_ctx - *inout* <MQTT_StepCtx> - returns MQTT step context
  //
  //  Return Type:
  //    *boolean* - was the operation successful?
  //
  //  Related Function:
  //    <f_MQTT_step_init>
  //    
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_setStepCtx(in EPTF_LGenBase_TestStepArgs pl_ptr, inout MQTT_StepCtx p_ctx)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (not f_EPTF_MQTT_isFsmInitialized(pl_ptr.eIdx, pl_ptr.refContext.fCtxIdx, p_ctx.sessionIdx))
    {
      f_EPTF_MQTT_Logging_WARNING(%definitionId &
        ": FSM has not been initialized. The f_MQTT_step_init function must be called as first step in the FSMs using MQTT.");
      return false;
    }

    p_ctx.eIdx := pl_ptr.eIdx;
    p_ctx.fsmIdx := pl_ptr.refContext.fCtxIdx;

    return true;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_setCtx
  //
  //  Purpose:
  //    Sets the instance pointers of <MQTT_StepCtx> to the related instances of a simulated device (entity) and FSM
  //
  //  Parameters:
  //    p_eIdx - *in* *integer* - entity index
  //    p_fsmIdx - *in* *integer* - FSM index
  //    p_ctx - *inout* MQTT_StepCtx - returned context value
  //
  //  Related Functions:
  //    <f_MQTT_step_init>
  //
  //  Related Types:
  //    <MQTT_StepCtx>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_setCtx(in integer p_eIdx, in integer p_fsmIdx, inout MQTT_StepCtx p_ctx)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (not f_EPTF_MQTT_isFsmInitialized(p_eIdx, p_fsmIdx, p_ctx.sessionIdx))
    {
      f_EPTF_MQTT_Logging_WARNING(%definitionId &
        ": FSM has not been initialized. The f_MQTT_step_init function must be called as first step in the FSMs using MQTT.");
      return false;
    }

    p_ctx.eIdx := p_eIdx;
    p_ctx.fsmIdx := p_fsmIdx;

    return true;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_isFsmInitialized
  //
  //  Purpose:
  //    Checks if an FSM instance has already been initialized
  //
  //  Parameters:
  //    p_eIdx - *in* *integer* - entity index
  //    p_fsmIdx - *in* *integer* - FSM index
  //    pl_sessionIdx - *inout* *integer* - returns session index if initialized,  -1 if not
  //  
  //  Returns:
  //    boolean - true: initialized, false: not initialized
  //
  //  Related Types:
  //    <MQTT_Session>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_isFsmInitialized(in integer pl_eIdx, in integer pl_fsmIdx, inout integer pl_sessionIdx)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    pl_sessionIdx := -1;

    var EPTF_IntegerList vl_appData := f_EPTF_LGenBase_getAppDataListOfFsmCtx(pl_eIdx, pl_fsmIdx, v_MQTT_bIdx);

    if (c_MQTT_AppData_sessionIdx < sizeof(vl_appData))
    {
      pl_sessionIdx := vl_appData[c_MQTT_AppData_sessionIdx];
    }

    return -1 < pl_sessionIdx and sizeof(v_MQTT_sessionDB.data) > pl_sessionIdx;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_getIntValue
  //
  //  Purpose:
  //    Retreives an element of an <EPTF_IntegerList> if it exists
  //
  //  Parameters:
  //    pl_intList - *in* <EPTF_IntegerList> - list of integers
  //    pl_number - *in* *integer* - index of the integer to be retrieved
  //    pl_value - *inout* *integer* - returns the value of the retrieved integer
  //  
  //  Returns:
  //    *boolean* - true if the element exists in the integer list
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_getIntValue(
    in EPTF_IntegerList pl_intList,
    in integer pl_number,
    inout integer pl_value)
  return boolean
  {
    if (sizeof(pl_intList) > pl_number)
    {
      pl_value := pl_intList[pl_number];
      return true;
    }
    return false;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_Logging_VERBOSE
  //
  //  Purpose:
  //    Logging functions for the VERBOSE log level
  //
  //  Parameters:
  //    pl_message - *in* *charstring* - string to be logged
  //
  //  Related Types:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_Logging_VERBOSE(in @lazy charstring pl_message)
  runs on EPTF_MQTT_LGen_CT
  {
    if (c_EPTF_Common_debugSwitch) {
    	f_EPTF_Logging_debugV2(pl_message, v_MQTT_loggingMaskId, {c_MQTT_LGen_Logging_DEBUGV});
	}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_Logging_DEBUG
  //
  //  Purpose:
  //    Logging functions for the DEBUG log level
  //
  //  Parameters:
  //    pl_message - *in* *charstring* - string to be logged
  //
  //  Related Types:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_Logging_DEBUG(in @lazy charstring pl_message)
  runs on EPTF_MQTT_LGen_CT
  {
    if (c_EPTF_Common_debugSwitch) {
	    f_EPTF_Logging_debugV2(pl_message, v_MQTT_loggingMaskId, {c_MQTT_LGen_Logging_DEBUG});
	}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_Logging_WARNING
  //
  //  Purpose:
  //    Logging functions for the WARNING log level
  //
  //  Parameters:
  //    pl_message - *in* *charstring* - string to be logged
  //
  //  Related Types:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_Logging_WARNING(in @lazy charstring pl_message)
  runs on EPTF_MQTT_LGen_CT
  {
    if (c_EPTF_Common_debugSwitch) {
	    f_EPTF_Logging_debugV2(pl_message, v_MQTT_loggingMaskId, {c_MQTT_LGen_Logging_WARNING});
	}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_Logging_ERROR
  //
  //  Purpose:
  //    Logging functions for the ERROR log level
  //
  //  Parameters:
  //    pl_message - *in* *charstring* - string to be logged
  //
  //  Related Types:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_Logging_ERROR(in @lazy charstring pl_message)
  runs on EPTF_MQTT_LGen_CT
  {
    if (c_EPTF_Common_debugSwitch) {
    	f_EPTF_Logging_debugV2(pl_message, v_MQTT_loggingMaskId, {c_MQTT_LGen_Logging_ERROR});
	}
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_dispatchEvent
  //
  //  Purpose:
  //    Dispatches events to an entity/fsm
  //
  //  Parameters:
  //    pl_eventIdx - *in* *integer* - index of the event
  //    pl_eIdx - *in* *integer* - the index of the entity
  //    pl_fsmCtx - *in* *integer* - the index of FSM
  //    pl_reportedArgs - *in* <EPTF_IntegerList> - additional arguments to be reported to the entity/FSM
  //
  //  Related Types:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_dispatchEvent(in integer pl_eventIdx, in integer pl_eIdx, in integer pl_fsmCtx, in EPTF_IntegerList pl_reportedArgs)
  runs on EPTF_MQTT_LGen_CT
  {
    if (pl_eIdx < 0)
    {
      f_EPTF_LGenBase_postEvent(
        {
          {
            v_MQTT_bIdx,
            pl_eventIdx,
            omit,
            omit
          },
          pl_reportedArgs
      });
    }
    else
    {
      if (pl_fsmCtx < 0)
      {
        f_EPTF_LGenBase_postEvent(
          {
            {
              v_MQTT_bIdx,
              pl_eventIdx,
              {
                pl_eIdx,
                omit
              }, omit
            },
            pl_reportedArgs
        });
      }
      else
      {
        f_EPTF_LGenBase_postEvent(
          {
            {
              v_MQTT_bIdx,
              pl_eventIdx,
              {
                pl_eIdx,
                pl_fsmCtx
              }, omit
            },
            pl_reportedArgs
        });
      }
    }
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_qos_int2enum
  //
  //  Purpose:
  //    Converts the integer value of QoS to its enumerated value
  //
  //  Parameters:
  //    p_qos - *in* *integer* - integer QoS value
  //
  //  Return Type:
  //    <QoS> - enumerated value of the input QoS
  //
  //  Related Types:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_qos_int2enum(in integer p_qos)
  return QoS
  {
    if (p_qos == 0) {
      return AT_MOST_ONCE_DELIVERY;
    }
    else if (p_qos == 1) {
      return AT_LEAST_ONCE_DELIVERY;
    }
    else if (p_qos == 2) {
      return EXACTLY_ONE_DELIVERY;
    }
    return RESERVED;
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publishResponseType
  //
  //  Purpose:
  //    Checks if an MQTT message is a response to a PUBLISH
  //
  //  Parameters:
  //    p_msg - *in* <MQTT_v3_1_1_ReqResp> - the MQTT message to be checked
  //
  //  Return Type:
  //    *boolean* - true: message is a publish response type message
  //
  //  Related Types:
  //    <EPTF_MQTT_LGen_CT>
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publishResponseType(in MQTT_v3_1_1_ReqResp p_msg)
  return boolean
  {
    return
      ischosen(p_msg.puback) or
      ischosen(p_msg.pubrec) or
      ischosen(p_msg.pubrel) or
      ischosen(p_msg.pubcomp)
  }

  ///////////////////////////////////////////////////////////
  //  Function: f_EPTF_MQTT_publishResponsePacketId
  //
  //  Purpose:
  //    Get the packet identifier of an MQTT message, if it is a response to a PUBLISH 
  //
  //  Parameters:
  //    p_msg - *in* <MQTT_v3_1_1_ReqResp> - the input MQTT message
  //
  //  Return Type:
  //    *integer* - Packet identifier value if the message was of a PUBLISH response type, -1 in other cases
  ///////////////////////////////////////////////////////////
  function f_EPTF_MQTT_publishResponsePacketId(in MQTT_v3_1_1_ReqResp p_msg)
  return integer
  {
    if (ischosen(p_msg.puback)) { return p_msg.puback.packet_identifier }
    else if (ischosen(p_msg.pubrec)) { return p_msg.pubrec.packet_identifier }
    else if (ischosen(p_msg.pubrel)) { return p_msg.pubrel.packet_identifier }
    else if (ischosen(p_msg.pubcomp)) { return p_msg.pubcomp.packet_identifier }
    else {
      return -1;
    }
  }

  template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_pingReq :=
  {
    pingreq := {
      header := {
        flags := '0000'B
      }
    }
  }

  template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_PUBACK(in integer p_packetId) :=
  {
    puback := {
      header := {
        flags := '0000'B
      },
      packet_identifier := p_packetId
    }
  }

  template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_PUBREL(in integer p_packetId) :=
  {
    pubrel := {
      header := {
        flags := '0010'B
      },
      packet_identifier := p_packetId
    }
  }

  template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_PUBREC(in integer p_packetId) :=
  {
    pubrec := {
      header := {
        flags := '0000'B
      },
      packet_identifier := p_packetId
    }
  }

  template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_PUBCOMP(in integer p_packetId) :=
  {
    pubcomp := {
      header := {
        flags := '0000'B
      },
      packet_identifier := p_packetId
    }
  }

}
