///////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000-2018 Ericsson Telecom AB
//
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v2.0
// which accompanies this distribution, and is available at
// https://www.eclipse.org/org/documents/epl-2.0/EPL-2.0.html
///////////////////////////////////////////////////////////////////////////////
//  File:               EPTF_MQTT_LGen_Functions.ttcn
//  Description:
//  Rev:                R1A
//  Prodnr:             CNL 113 860
//  Updated:            2017-09-01
//  Contact:            http://ttcn.ericsson.se
///////////////////////////////////////////////////////////////////////////////
module EPTF_MQTT_LGen_Functions {

  import from EPTF_MQTT_LGen_Definitions all;
  import from EPTF_MQTT_Transport_Definitions all;

  import from EPTF_CLL_Base_Functions all;
  import from EPTF_CLL_Common_Definitions all;
  import from EPTF_CLL_Variable_Definitions all;
  import from EPTF_CLL_Variable_Functions all;
  import from EPTF_CLL_LGenBase_Definitions all;
  import from EPTF_CLL_LGenBase_ConfigFunctions all;
  import from EPTF_CLL_LGenBase_Functions all;
  import from EPTF_CLL_LGenBase_EventHandlingFunctions all;
  import from EPTF_CLL_Logging_Definitions all;
  import from EPTF_CLL_Logging_Functions all;
  import from EPTF_CLL_FBQ_Functions all;
  import from EPTF_CLL_HashMapStr2Int_Functions all;
  import from EPTF_CLL_Scheduler_Definitions all;
  import from EPTF_CLL_RBTScheduler_Functions all;
  import from TCCConversion_Functions all;

  import from MQTT_v3_1_1_Types all;
  import from IPL4asp_Types all;

  // Initializes the MQTT load generator feature on the component.
  //
  // pl_name - name handed over to the LGenBase and Logging initializers
  //
  // The call is a no-op when the component has already been initialized
  // (v_MQTT_initialized is set). Otherwise it initializes the base features,
  // declares the MQTT behavior type, registers the logging masks, sets up the
  // databases, declares the FSM steps and events, and registers f_MQTT_cleanUp
  // as cleanup handler.
  function f_EPTF_MQTT_LGen_init(in charstring pl_name)
  runs on EPTF_MQTT_LGen_CT
  {
    if (not v_MQTT_initialized)
    {
      // Base features this component builds on
      f_EPTF_LGenBase_init(pl_name, 0, pl_name);
      f_EPTF_Logging_init_CT(pl_name);
      f_EPTF_str2int_HashMap_Init();

      // Behavior type with its entity context reset/bind/unbind callbacks
      v_MQTT_bIdx := f_EPTF_LGenBase_declareBehaviorType(
        c_MQTT_behaviorType,
        tsp_EPTF_MQTT_LGen_maxBindableCtx,
        refers(f_MQTT_eCtxReset),
        refers(f_MQTT_eCtxBind),
        refers(f_MQTT_eCtxUnbind)
      );

      // The mask id has to be available before the first f_EPTF_MQTT_Logging_* call below
      v_MQTT_loggingMaskId := f_EPTF_Logging_registerComponentMasks(
        "MQTT_Logging",
        {"WARNING", "DEBUG", "DEBUGV", "ERROR" },
        EPTF_Logging_CLL
      );

      f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId,": my behavior idx is ", v_MQTT_bIdx));

      // Databases
      f_EPTF_MQTT_addressDB_init();
      f_EPTF_MQTT_templateDB_init();
      f_EPTF_MQTT_sessionDB_init();
      f_EPTF_MQTT_subscriptionDB_init();
      f_EPTF_MQTT_publishDB_init();

      // FSM steps and events of the MQTT behavior
      f_EPTF_MQTT_declareSteps();
      f_EPTF_MQTT_declareEvents();

      f_EPTF_Base_registerCleanup(refers(f_MQTT_cleanUp));

      v_MQTT_initialized := true;
    }
  }

  // Initializes the logging feature under the name "MQTT_LGen", registers the
  // MQTT logging masks and switches the DEBUG and DEBUGV masks on or off
  // according to tsp_EPTF_MQTT_LGen_debug and tsp_EPTF_MQTT_LGen_debugVerbose.
  function f_EPTF_MQTT_LGen_initLogging()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_Logging_init_CT("MQTT_LGen");

    v_MQTT_loggingMaskId := f_EPTF_Logging_registerComponentMasks(
      "MQTT_LGen_Logging",
      {"WARNING", "DEBUG", "DEBUGV", "ERROR"},
      EPTF_Logging_CLL
    );

    // DEBUG mask follows tsp_EPTF_MQTT_LGen_debug
    if (not tsp_EPTF_MQTT_LGen_debug) {
      f_EPTF_Logging_disableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUG);
    }
    else {
      f_EPTF_Logging_enableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUG);
    }

    // DEBUGV mask follows tsp_EPTF_MQTT_LGen_debugVerbose
    if (not tsp_EPTF_MQTT_LGen_debugVerbose) {
      f_EPTF_Logging_disableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUGV);
    }
    else {
      f_EPTF_Logging_enableLocalMask(v_MQTT_loggingMaskId, c_MQTT_LGen_Logging_DEBUGV);
    }
  }

  // Cleanup handler of the MQTT load generator (registered via
  // f_EPTF_Base_registerCleanup in f_EPTF_MQTT_LGen_init).
  // Cleans up the address, template, session, subscription and publish
  // databases, clears the message-received callback and marks the component
  // as not initialized.
  function f_MQTT_cleanUp()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_addressDB_cleanUp();
    f_EPTF_MQTT_templateDB_cleanUp();
    f_EPTF_MQTT_sessionDB_cleanUp();
    f_EPTF_MQTT_subscriptionDB_cleanUp();
    f_EPTF_MQTT_publishDB_cleanUp();

    // Drop the registered receive callback so it is not invoked after cleanup
    vf_MQTT_msgReceived := null;

    // Allows f_EPTF_MQTT_LGen_init to run again
    v_MQTT_initialized := false;
  }

  // Entity context bind callback of the MQTT behavior type.
  //
  // pl_eIdx - index of the entity being bound
  //
  // Returns a one-element integer list holding the entity index itself.
  function f_MQTT_eCtxBind(in integer pl_eIdx)
  runs on EPTF_MQTT_LGen_CT
  return EPTF_IntegerList
  {
    var EPTF_IntegerList vl_boundCtx := {pl_eIdx};
    return vl_boundCtx;
  }

  // Entity context unbind callback of the MQTT behavior type.
  //
  // pl_eIdx - index of the entity being unbound (not used)
  //
  // Currently has no effect: it returns immediately when the component is
  // not initialized and has nothing else to do otherwise.
  function f_MQTT_eCtxUnbind(in integer pl_eIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (not v_MQTT_initialized) {return;}
  }

  // Entity context reset callback of the MQTT behavior type.
  //
  // pl_eIdx - index of the entity being reset (not used)
  //
  // Intentionally empty: nothing is reset per entity here.
  function f_MQTT_eCtxReset(in integer pl_eIdx)
  runs on EPTF_MQTT_LGen_CT
  {
  }

  // Declares the FSM events of the MQTT behavior type.
  //
  // Every declaration is expected to return the index stored in the matching
  // c_MQTT_eventIdx_* constant, so the declarations must stay in this order
  // (presumably the framework hands out indexes in declaration order - confirm
  // against EPTF LGenBase before reordering).
  // On any mismatch the LGenBase state is logged and the MTC is stopped.
  function f_EPTF_MQTT_declareEvents()
  runs on EPTF_MQTT_LGen_CT
  {
    if (
      c_MQTT_eventIdx_transportSucc != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportSucc) or
      c_MQTT_eventIdx_transportFail != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportFail) or
      c_MQTT_eventIdx_transportClosed != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_transportClosed) or
      c_MQTT_eventIdx_CONNACK_Accepted != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_CONNACK_Accepted) or
      c_MQTT_eventIdx_CONNACK_Refused != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_CONNACK_Refused) or
      c_MQTT_eventIdx_SUBACK_Accepted != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_SUBACK_Accepted) or
      c_MQTT_eventIdx_SUBACK_Refused != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_SUBACK_Refused) or
      c_MQTT_eventIdx_UNSUBACK != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_UNSUBACK) or
      c_MQTT_eventIdx_PUBLISH != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBLISH) or
      c_MQTT_eventIdx_PING_Request != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PING_Request) or
      c_MQTT_eventIdx_PING_Response != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PING_Response) or
      c_MQTT_eventIdx_PUBACK != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBACK) or
      c_MQTT_eventIdx_PUBREC != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBREC) or
      c_MQTT_eventIdx_PUBREL != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBREL) or
      c_MQTT_eventIdx_PUBCOMP != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBCOMP) or
      c_MQTT_eventIdx_PUBLISH_Timeout != f_EPTF_LGenBase_declareFsmEvent(c_MQTT_behaviorType, c_MQTT_eventName_PUBLISH_Timeout)
    )
    {
      f_EPTF_LGenBase_log();
      // Name the failing feature so the log entry can be traced back to this function
      log("EPTF_MQTT_LGen event declaration error");
      mtc.stop;
    }
  }

  // Declares the FSM test steps of the MQTT behavior type, binding every step
  // name to its implementing f_MQTT_step_* function.
  //
  // Every declaration is expected to return the index stored in the matching
  // c_MQTT_stepIdx_* constant, so the declarations must stay in this order
  // (presumably the framework hands out indexes in declaration order - confirm
  // against EPTF LGenBase before reordering).
  // On any mismatch the LGenBase state is logged and the MTC is stopped.
  function f_EPTF_MQTT_declareSteps()
  runs on EPTF_MQTT_LGen_CT
  {
    if (
      c_MQTT_stepIdx_init != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_init, refers(f_MQTT_step_init)}) or
      c_MQTT_stepIdx_cleanUp != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_cleanUp, refers(f_MQTT_step_cleanUp) }) or
      c_MQTT_stepIdx_setLocalAddress_byVars != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setLocalAddress_byVars, refers(f_MQTT_step_setLocalAddress_byVars)}) or
      c_MQTT_stepIdx_setRemoteAddress_byVars != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setRemoteAddress_byVars, refers(f_MQTT_step_setRemoteAddress_byVars)}) or
      c_MQTT_stepIdx_transportConnect != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_transportConnect, refers(f_MQTT_step_transportConnect)}) or
      c_MQTT_stepIdx_transportClose != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_transportClose, refers(f_MQTT_step_transportClose)}) or
      c_MQTT_stepIdx_startListening != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_startListening, refers(f_MQTT_step_startListening)}) or
      c_MQTT_stepIdx_loadTemplate_byIntIdx != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_loadTemplate_byIntIdx, refers(f_MQTT_step_loadTemplate_byIntIdx)}) or
      c_MQTT_stepIdx_loadTemplate_byStringId != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_loadTemplate_byStringId, refers(f_MQTT_step_loadTemplate_byStringId)}) or
      c_MQTT_stepIdx_send != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_send, refers(f_MQTT_step_send)}) or
      c_MQTT_stepIdx_setTopic_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_stringParam, refers(f_MQTT_step_setTopic_stringParam)}) or
      c_MQTT_stepIdx_setTopic_add_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_add_stringParam, refers(f_MQTT_step_setTopic_add_stringParam)}) or
      c_MQTT_stepIdx_setTopic_add_varParams != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_add_varParams, refers(f_MQTT_step_setTopic_add_varParams)}) or
      c_MQTT_stepIdx_setTopic_add_clientId != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setTopic_add_clientId, refers(f_MQTT_step_setTopic_add_clientId)}) or
      c_MQTT_stepIdx_setQos_intParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setQos_intParam, refers(f_MQTT_step_setQos_intParam)}) or
      c_MQTT_stepIdx_setPublishMessage_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_stringParam, refers(f_MQTT_step_setPublishMessage_stringParam)}) or
      c_MQTT_stepIdx_setPublishMessage_add_stringParam != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_add_stringParam, refers(f_MQTT_step_setPublishMessage_add_stringParam)}) or
      c_MQTT_stepIdx_setPublishMessage_add_varParams != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_add_varParams, refers(f_MQTT_step_setPublishMessage_add_varParams)}) or
      c_MQTT_stepIdx_setPublishMessage_add_clientId != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_setPublishMessage_add_clientId, refers(f_MQTT_step_setPublishMessage_add_clientId)}) or
      c_MQTT_stepIdx_reportPingResponse != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_reportPingResponse, refers(f_MQTT_step_reportPingResponse)}) or
      c_MQTT_stepIdx_reportPublishResponse != f_EPTF_LGenBase_declareStep(c_MQTT_behaviorType,{c_MQTT_stepName_reportPublishResponse, refers(f_MQTT_step_reportPublishResponse)})
    )
    {
      // A returned index differs from its constant: dump the LGenBase state and abort the run
      f_EPTF_LGenBase_log();
      log("EPTF_MQTT_LGen declaration error"); mtc.stop
    }
  }

  // Entry point for an incoming MQTT PDU.
  //
  // pl_message - the received PDU
  //
  // The PDU is stored in v_MQTT_msgToProcess, passed through the MQTT stack
  // (f_EPTF_MQTT_stack_fromEnv) and finally handed to the message-received
  // callback when one is registered.
  function f_EPTF_MQTT_LGen_receiveMessage(in EPTF_MQTT_PDU pl_message)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " ", pl_message));

    v_MQTT_msgToProcess := pl_message;
    f_EPTF_MQTT_stack_fromEnv(v_MQTT_msgToProcess);

    // No callback registered: nothing more to do
    if (vf_MQTT_msgReceived == null) { return; }

    vf_MQTT_msgReceived.apply(v_MQTT_msgToProcess);
  }

  // Entry point for a transport level event.
  //
  // p_event - the received event
  //
  // The event is only logged (verbose level); it does not trigger any
  // processing yet.
  function f_EPTF_MQTT_LGen_receiveEvent(in ASP_Event p_event)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " ", p_event));

    // TODO: connection closed handling!
  }

  // Handles the response of the transport layer to an earlier API request.
  //
  // pl_rsp - the response; its sessionIdx selects the session, succ tells the outcome
  //
  // Dispatches the transportSucc or the transportFail event to the FSM of the
  // session. When the session cannot be fetched, the response is only logged.
  function f_EPTF_MQTT_LGen_transportApiResponse(in EPTF_MQTT_Transport_Response pl_rsp)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str("api response: ", pl_rsp));

    if (not f_EPTF_MQTT_sessionDB_get(pl_rsp.sessionIdx, v_MQTT_session))
    {
      f_EPTF_MQTT_Logging_VERBOSE(log2str("session not found for api response: ", pl_rsp));
      return;
    }

    // Pick the event according to the outcome, then dispatch it once
    var integer vl_eventIdx := c_MQTT_eventIdx_transportFail;
    if (pl_rsp.succ)
    {
      vl_eventIdx := c_MQTT_eventIdx_transportSucc;
    }

    f_EPTF_MQTT_dispatchEvent(vl_eventIdx, v_MQTT_session.eIdx, v_MQTT_session.fsmIdx, {});
  }

  // Sends an MQTT PDU by forwarding it to the registered transport send function.
  //
  // p_msg - the PDU to send
  //
  // NOTE(review): vf_EPTF_MQTT_Transport_send is applied without a null check,
  // unlike vf_MQTT_msgReceived in f_EPTF_MQTT_LGen_receiveMessage - confirm
  // that a transport is always registered before the first send.
  function f_EPTF_MQTT_LGen_send(inout EPTF_MQTT_PDU p_msg)
  runs on EPTF_MQTT_LGen_CT
  {
    vf_EPTF_MQTT_Transport_send.apply(p_msg);
  }

  // Test step: creates the MQTT session of the calling FSM.
  //
  // pl_ptr - test step arguments (entity index and FSM context index are used)
  //
  // Does nothing when f_EPTF_MQTT_isFsmInitialized reports the FSM as
  // initialized. Otherwise a session built from c_MQTT_Session_init is added
  // to the session database and its index is stored in the FSM's application
  // data (c_MQTT_AppData_sessionIdx).
  function f_MQTT_step_init(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    var integer vl_entityIdx := pl_ptr.eIdx;
    var integer vl_fsmCtxIdx := pl_ptr.refContext.fCtxIdx;
    var integer vl_sessionIdx := -1;

    if (f_EPTF_MQTT_isFsmInitialized(vl_entityIdx, vl_fsmCtxIdx, vl_sessionIdx))
    {
      return;
    }

    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId, ":  initializing fsm ", vl_fsmCtxIdx, " for entity ",vl_entityIdx));

    // New session owned by this entity/FSM pair
    var MQTT_Session vl_newSession := c_MQTT_Session_init;
    vl_newSession.eIdx := vl_entityIdx;
    vl_newSession.fsmIdx := vl_fsmCtxIdx;

    vl_sessionIdx := f_EPTF_MQTT_sessionDB_add(vl_newSession);

    f_EPTF_LGenBase_setAppDataItemOfFsmCtx(vl_entityIdx, vl_fsmCtxIdx, v_MQTT_bIdx, c_MQTT_AppData_sessionIdx, vl_sessionIdx);
  }

  // Test step: removes the MQTT session of the calling FSM.
  //
  // pl_ptr - test step arguments (entity index and FSM context index are used)
  //
  // When the FSM has no session the step only logs that. Otherwise the session
  // is removed and the session index stored in the FSM's application data is
  // reset to -1.
  function f_MQTT_step_cleanUp(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    var integer vl_entityIdx := pl_ptr.eIdx;
    var integer vl_fsmCtxIdx := pl_ptr.refContext.fCtxIdx;
    var integer vl_ownSessionIdx := -1;

    if (not f_EPTF_MQTT_isFsmInitialized(vl_entityIdx, vl_fsmCtxIdx, vl_ownSessionIdx))
    {
      f_EPTF_MQTT_Logging_DEBUG(%definitionId&": FSM has not been initialized");
      return;
    }

    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId,": FSM database: ",v_MQTT_sessionDB.data[vl_ownSessionIdx]));

    f_EPTF_MQTT_session_remove(vl_ownSessionIdx);

    // Detach the FSM from the removed session
    f_EPTF_LGenBase_setAppDataItemOfFsmCtx(vl_entityIdx, vl_fsmCtxIdx, v_MQTT_bIdx, c_MQTT_AppData_sessionIdx, -1);
  }

  // Test step: sets the local address of the session from two FSM variables.
  //
  // 1st param: local host: charstring variable
  // 2nd param: local port: integer variable
  //
  // The address is added to the address database and its index is stored in
  // the localAddrIdx field of the session. A warning is logged, and nothing is
  // changed, when the number or the type of the variables does not match.
  function f_MQTT_step_setLocalAddress_byVars(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var EPTF_IntegerList vl_paramVarIds := {};
    f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_paramVarIds);

    if (sizeof(vl_paramVarIds) != 2)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " two variables are needed as params!"));
      return;
    }

    var EPTF_Var_DirectContent vl_hostContent, vl_portContent;
    f_EPTF_Var_getContent(vl_paramVarIds[0], vl_hostContent);
    f_EPTF_Var_getContent(vl_paramVarIds[1], vl_portContent);

    if (not ischosen(vl_hostContent.charstringVal))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " first param is not charstring variable!"));
      return;
    }
    if (not ischosen(vl_portContent.intVal))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " second param is not integer variable!"));
      return;
    }

    f_EPTF_MQTT_addressDB_add(
      {
        hostName := vl_hostContent.charstringVal,
        portNumber := vl_portContent.intVal
      },
      v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx
    );
  }

  // Test step: sets the remote address of the session from two FSM variables.
  //
  // 1st param: remote host: charstring variable
  // 2nd param: remote port: integer variable
  //
  // The address is added to the address database and its index is stored in
  // the remoteAddrIdx field of the session. A warning is logged, and nothing
  // is changed, when the number or the type of the variables does not match.
  function f_MQTT_step_setRemoteAddress_byVars(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var EPTF_IntegerList vl_paramVarIds := {};
    f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_paramVarIds);

    if (sizeof(vl_paramVarIds) != 2)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " two variables are needed as params!"));
      return;
    }

    var EPTF_Var_DirectContent vl_hostContent, vl_portContent;
    f_EPTF_Var_getContent(vl_paramVarIds[0], vl_hostContent);
    f_EPTF_Var_getContent(vl_paramVarIds[1], vl_portContent);

    if (not ischosen(vl_hostContent.charstringVal))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " first param is not charstring variable!"));
      return;
    }
    if (not ischosen(vl_portContent.intVal))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " second param is not integer variable!"));
      return;
    }

    f_EPTF_MQTT_addressDB_add(
      {
        hostName := vl_hostContent.charstringVal,
        portNumber := vl_portContent.intVal
      },
      v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx
    );
  }

  // Test step: asks the transport layer to start listening on the local
  // address of the session.
  //
  // pl_ptr - test step arguments
  function f_MQTT_step_startListening(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var integer vl_sessionIdx := v_MQTT_ctx.sessionIdx;

    var EPTF_MQTT_Transport_Request vl_request := c_EPTF_MQTT_Transport_Request_init;
    vl_request.sessionIdx := vl_sessionIdx;

    // Fill in the local address straight from the address database
    f_EPTF_MQTT_addressDB_get(
      vl_request.params.startListening.localAddress,
      v_MQTT_sessionDB.data[vl_sessionIdx].localAddrIdx
    );

    vf_EPTF_MQTT_Transport_apiRequest.apply(vl_request);
  }

  // Test step: asks the transport layer to connect the local address of the
  // session to its remote address.
  //
  // pl_ptr - test step arguments
  function f_MQTT_step_transportConnect(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var integer vl_sessionIdx := v_MQTT_ctx.sessionIdx;

    var EPTF_MQTT_Transport_Request vl_request := c_EPTF_MQTT_Transport_Request_init;
    vl_request.sessionIdx := vl_sessionIdx;

    // Both endpoints are read straight from the address database
    f_EPTF_MQTT_addressDB_get(
      vl_request.params.connect_.localAddress,
      v_MQTT_sessionDB.data[vl_sessionIdx].localAddrIdx
    );
    f_EPTF_MQTT_addressDB_get(
      vl_request.params.connect_.remoteAddress,
      v_MQTT_sessionDB.data[vl_sessionIdx].remoteAddrIdx
    );

    vf_EPTF_MQTT_Transport_apiRequest.apply(vl_request);
  }

  // Test step: asks the transport layer to close the connection of the session.
  //
  // 1st param int: expectResponse (optional); a value <= 0 switches the
  //                expectResponse flag of the request off, otherwise it is on
  //
  // No request is sent when the session has no local address (localAddrIdx < 0).
  function f_MQTT_step_transportClose(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    // The flag defaults to true; only an explicitly given value <= 0 clears it
    var integer vl_flagValue := 1;
    var boolean vl_flagGiven := f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_flagValue);
    var boolean vl_expectResponse := not (vl_flagGiven and vl_flagValue <= 0);

    var integer vl_sessionIdx := v_MQTT_ctx.sessionIdx;

    if (v_MQTT_sessionDB.data[vl_sessionIdx].localAddrIdx < 0) { return; }

    var EPTF_MQTT_Transport_Request vl_request := c_EPTF_MQTT_Transport_Request_init;
    vl_request.sessionIdx := vl_sessionIdx;
    vl_request.expectResponse := vl_expectResponse;

    f_EPTF_MQTT_addressDB_get(
      vl_request.params.close.localAddress,
      v_MQTT_sessionDB.data[vl_sessionIdx].localAddrIdx
    );

    vf_EPTF_MQTT_Transport_apiRequest.apply(vl_request);
  }

  // Test step: loads a message template into v_MQTT_msgToSend.pdu.
  //
  // 1st param: index of the template in the template database: integer
  //
  // The index defaults to -1 before the parameter is read; whether the lookup
  // succeeded is not checked here, the index is passed on to
  // f_EPTF_MQTT_templateDB_get as it is.
  function f_MQTT_step_loadTemplate_byIntIdx(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx))
    {
      return;
    }

    var integer vl_requestedIdx := -1;
    f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_requestedIdx);

    f_EPTF_MQTT_templateDB_get(vl_requestedIdx, v_MQTT_msgToSend.pdu);
  }

  // Test step: loads the message template with the given id into
  // v_MQTT_msgToSend.pdu.
  //
  // 1st param: template id: charstring
  //
  // A warning is logged when no template is registered under the id.
  function f_MQTT_step_loadTemplate_byStringId(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var charstring vl_templateId := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);
    var integer vl_foundIdx := f_EPTF_MQTT_templateDB_lookUp(vl_templateId);

    if (vl_foundIdx < 0)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find template with id: ", vl_templateId));
      return;
    }

    f_EPTF_MQTT_templateDB_get(vl_foundIdx, v_MQTT_msgToSend.pdu);
  }

  // Test step: sets the topic of the message in v_MQTT_msgToSend.
  //
  // 1st param: topic name: charstring
  //
  // PUBLISH:   the topic name is overwritten.
  // SUBSCRIBE: the topic filter of the first payload entry is overwritten; a
  //            warning is logged when the payload is empty.
  // Any other message type only produces a warning.
  function f_MQTT_step_setTopic_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var charstring vl_topicName := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);

    if (ischosen(v_MQTT_msgToSend.pdu.publish))
    {
      v_MQTT_msgToSend.pdu.publish.topic_name := f_charstr2unichar(vl_topicName);
      return;
    }

    if (not ischosen(v_MQTT_msgToSend.pdu.subscribe))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
      return;
    }

    if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload) > 0)
    {
      v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter := f_charstr2unichar(vl_topicName);
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
    }
  }

  // Test step: appends a string to the topic of the message in v_MQTT_msgToSend.
  //
  // 1st param: string to append to the topic: charstring
  //
  // SUBSCRIBE: appended to the topic filter of the first payload entry; a
  //            warning is logged when the payload is empty.
  // PUBLISH:   appended to the topic name.
  // Any other message type only produces a warning.
  function f_MQTT_step_setTopic_add_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var charstring vl_topic := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);

    if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
      if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
        v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter :=
          v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter &
          f_charstr2unichar(vl_topic);
      }
      else {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
      }
    }
    // Must be "else if": with a plain "if" every SUBSCRIBE fell through to the
    // final else branch and was reported as an invalid message type.
    else if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
      v_MQTT_msgToSend.pdu.publish.topic_name :=
        v_MQTT_msgToSend.pdu.publish.topic_name &
        f_charstr2unichar(vl_topic);
    }
    else {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
    }
  }

  // Test step: appends the values of FSM variables to the topic of the message
  // in v_MQTT_msgToSend.
  //
  // params: variables; charstring values are appended as they are, integer
  //         values are converted with int2str, other kinds are skipped
  //
  // SUBSCRIBE: appended to the topic filter of the first payload entry; a
  //            warning is logged when the payload is empty.
  // PUBLISH:   appended to the topic name.
  // Any other message type only produces a warning.
  function f_MQTT_step_setTopic_add_varParams(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var EPTF_IntegerList vl_paramVarIds := {};
    f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_paramVarIds);

    // Concatenate the textual form of every usable variable
    var charstring vl_suffix := "";
    for (var integer vl_n := 0; vl_n < sizeof(vl_paramVarIds); vl_n := vl_n + 1)
    {
      var EPTF_Var_DirectContent vl_content;
      f_EPTF_Var_getContent(vl_paramVarIds[vl_n], vl_content);

      if (ischosen(vl_content.charstringVal))
      {
        vl_suffix := vl_suffix & vl_content.charstringVal;
      }
      else if (ischosen(vl_content.intVal))
      {
        vl_suffix := vl_suffix & int2str(vl_content.intVal);
      }
    }

    if (ischosen(v_MQTT_msgToSend.pdu.publish))
    {
      v_MQTT_msgToSend.pdu.publish.topic_name :=
        v_MQTT_msgToSend.pdu.publish.topic_name & f_charstr2unichar(vl_suffix);
      return;
    }

    if (not ischosen(v_MQTT_msgToSend.pdu.subscribe))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
      return;
    }

    if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload) > 0)
    {
      v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter :=
        v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter & f_charstr2unichar(vl_suffix);
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
    }
  }

  // Test step: appends the client id of the session to the topic of the
  // message in v_MQTT_msgToSend.
  //
  // pl_ptr - test step arguments
  //
  // SUBSCRIBE: appended to the topic filter of the first payload entry; a
  //            warning is logged when the payload is empty.
  // PUBLISH:   appended to the topic name.
  // Any other message type only produces a warning.
  function f_MQTT_step_setTopic_add_clientId(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    if (ischosen(v_MQTT_msgToSend.pdu.subscribe)) {
      if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload)>0) {
        v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter :=
          v_MQTT_msgToSend.pdu.subscribe.payload[0].topic_filter &
          f_charstr2unichar(v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].clientId);
      }
      else {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
      }
    }
    // Must be "else if": with a plain "if" every SUBSCRIBE fell through to the
    // final else branch and was reported as an invalid message type.
    else if (ischosen(v_MQTT_msgToSend.pdu.publish)) {
      v_MQTT_msgToSend.pdu.publish.topic_name :=
        v_MQTT_msgToSend.pdu.publish.topic_name &
        f_charstr2unichar(v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].clientId);
    }
    else {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
    }
  }

  // Test step: sets the QoS level of the message in v_MQTT_msgToSend.
  //
  // 1st param: qos level (0,1,2): integer
  //
  // SUBSCRIBE: sets the requested QoS of the first payload entry; a warning is
  //            logged when the payload is empty.
  // PUBLISH:   sets the QoS level in the header.
  // A warning is logged for any other message type and for a level outside 0..2.
  function f_MQTT_step_setQos_intParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var integer vl_level := -1;
    f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_level);

    if (vl_level < 0 or vl_level >= 3)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid qos level: ", vl_level));
      return;
    }

    if (ischosen(v_MQTT_msgToSend.pdu.publish))
    {
      v_MQTT_msgToSend.pdu.publish.header.qos_level := f_EPTF_MQTT_qos_int2enum(vl_level);
      return;
    }

    if (not ischosen(v_MQTT_msgToSend.pdu.subscribe))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
      return;
    }

    if (sizeof(v_MQTT_msgToSend.pdu.subscribe.payload) > 0)
    {
      v_MQTT_msgToSend.pdu.subscribe.payload[0].requested_qos := f_EPTF_MQTT_qos_int2enum(vl_level);
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Couldn't find first subscription entry: ", v_MQTT_msgToSend));
    }
  }

  // Test step: sets the payload of the PUBLISH message in v_MQTT_msgToSend.
  //
  // 1st param: publish message: charstring (stored as octetstring via char2oct)
  //
  // A warning is logged when the message to send is not a PUBLISH.
  function f_MQTT_step_setPublishMessage_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var charstring vl_text := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);

    if (not ischosen(v_MQTT_msgToSend.pdu.publish))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
      return;
    }

    v_MQTT_msgToSend.pdu.publish.payload := char2oct(vl_text);
  }

  // Test step: appends a string to the payload of the PUBLISH message in
  // v_MQTT_msgToSend.
  //
  // 1st param: string to append: charstring (converted via char2oct)
  //
  // A warning is logged when the message to send is not a PUBLISH.
  function f_MQTT_step_setPublishMessage_add_stringParam(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var charstring vl_text := f_EPTF_LGenBase_charstringValOfStep(pl_ptr);

    if (not ischosen(v_MQTT_msgToSend.pdu.publish))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
      return;
    }

    v_MQTT_msgToSend.pdu.publish.payload :=
      v_MQTT_msgToSend.pdu.publish.payload & char2oct(vl_text);
  }

  // Test step: appends the values of FSM variables to the payload of the
  // PUBLISH message in v_MQTT_msgToSend.
  //
  // params: variables; charstring values are appended as they are, integer
  //         values are converted with int2str, other kinds are skipped
  //
  // A warning is logged, and the variables are not read, when the message to
  // send is not a PUBLISH.
  function f_MQTT_step_setPublishMessage_add_varParams(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    if (not ischosen(v_MQTT_msgToSend.pdu.publish))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
      return;
    }

    var EPTF_IntegerList vl_paramVarIds := {};
    f_EPTF_LGenBase_fsmVarIdListFromStep(pl_ptr, vl_paramVarIds);

    // Concatenate the textual form of every usable variable
    var charstring vl_suffix := "";
    for (var integer vl_n := 0; vl_n < sizeof(vl_paramVarIds); vl_n := vl_n + 1)
    {
      var EPTF_Var_DirectContent vl_content;
      f_EPTF_Var_getContent(vl_paramVarIds[vl_n], vl_content);

      if (ischosen(vl_content.charstringVal))
      {
        vl_suffix := vl_suffix & vl_content.charstringVal;
      }
      else if (ischosen(vl_content.intVal))
      {
        vl_suffix := vl_suffix & int2str(vl_content.intVal);
      }
    }

    v_MQTT_msgToSend.pdu.publish.payload :=
      v_MQTT_msgToSend.pdu.publish.payload & char2oct(vl_suffix);
  }

  // Test step: appends the client id of the session to the payload of the
  // PUBLISH message in v_MQTT_msgToSend.
  //
  // pl_ptr - test step arguments
  //
  // A warning is logged when the message to send is not a PUBLISH.
  function f_MQTT_step_setPublishMessage_add_clientId(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    if (not ischosen(v_MQTT_msgToSend.pdu.publish))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid message type in msgToSend: ", v_MQTT_msgToSend));
      return;
    }

    v_MQTT_msgToSend.pdu.publish.payload :=
      v_MQTT_msgToSend.pdu.publish.payload &
      char2oct(v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].clientId);
  }

  // Test step: sets the reportPingResponse flag of the session.
  //
  // 1st param: enable (1)/disable (0): integer
  //
  // Any other value leaves the flag untouched and is reported with a warning.
  function f_MQTT_step_reportPingResponse(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var integer vl_switch := -1;
    f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_switch);

    if (vl_switch < 0 or vl_switch >= 2)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid parameter: ", vl_switch));
      return;
    }

    v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].reportPingResponse := (vl_switch == 1);
  }

  // Test step: sets the reportPublishResponse flag of the session.
  //
  // 1st param: enable (1)/disable (0): integer
  //
  // Any other value leaves the flag untouched and is reported with a warning.
  function f_MQTT_step_reportPublishResponse(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    var integer vl_switch := -1;
    f_EPTF_MQTT_getIntValue(pl_ptr.refContext.fRefArgs, 0, vl_switch);

    if (vl_switch < 0 or vl_switch >= 2)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId," Invalid parameter: ", vl_switch));
      return;
    }

    v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].reportPublishResponse := (vl_switch == 1);
  }

  // Test step: sends the message prepared in v_MQTT_msgToSend.
  //
  // pl_ptr - test step arguments
  //
  // The transport parameters of the message are filled in from the session
  // (local and remote address, TCP as protocol), the session index is stored
  // in the message, and the message is handed to the MQTT stack
  // (f_EPTF_MQTT_stack_fromApp).
  function f_MQTT_step_send(in EPTF_LGenBase_TestStepArgs pl_ptr)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId," ",pl_ptr));

    if (not f_EPTF_MQTT_setStepCtx(pl_ptr, v_MQTT_ctx)) { return; }

    // Addresses are stored in the session as indexes into the address database
    f_EPTF_MQTT_addressDB_get(v_MQTT_msgToSend.transportParams.localAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].localAddrIdx);
    f_EPTF_MQTT_addressDB_get(v_MQTT_msgToSend.transportParams.remoteAddress, v_MQTT_sessionDB.data[v_MQTT_ctx.sessionIdx].remoteAddrIdx);

    v_MQTT_msgToSend.transportParams.proto := {tcp := {}};
    v_MQTT_msgToSend.sessionIdx := v_MQTT_ctx.sessionIdx;

    f_EPTF_MQTT_Logging_DEBUG(log2str(%definitionId," msg to send: ",v_MQTT_msgToSend));

    f_EPTF_MQTT_stack_fromApp(v_MQTT_msgToSend, v_MQTT_ctx);

    // NOTE(review): presumably keeps the scheduler's notion of "now" current
    // after the send - confirm against the EPTF scheduler documentation
    f_EPTF_SchedulerComp_refreshSnapshotTime();
  }

  // Initializes the address database: resets its free-busy queue, empties its
  // data list and creates its lookup hash map ("EPTF_MQTT_addressDB_Hash").
  function f_EPTF_MQTT_addressDB_init()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_addressDB.queue);
    v_MQTT_addressDB.data := {};
    v_MQTT_addressDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_addressDB_Hash");
  }

  // Cleans up the address database: resets its free-busy queue, empties its
  // data list and deletes its lookup hash map.
  function f_EPTF_MQTT_addressDB_cleanUp()
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_addressDB.queue);
    v_MQTT_addressDB.data := {};
    // The hash map is deleted by the same name it was created with in f_EPTF_MQTT_addressDB_init
    f_EPTF_str2int_HashMap_Delete("EPTF_MQTT_addressDB_Hash");
  }

  // Adds an address to the address database unless it is already stored.
  //
  // p_addr - the address to store
  // p_idx  - set to the index of the address in the database, whether it was
  //          found or newly added
  function f_EPTF_MQTT_addressDB_add(in Socket p_addr, inout integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  {
    p_idx := f_EPTF_MQTT_addressDB_lookUp(p_addr);

    // Already stored: the looked up index is the result
    if (p_idx != -1) { return; }

    // Take a free slot and mark it busy
    p_idx := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_addressDB.queue);
    f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_addressDB.queue);

    f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding target address ", p_idx, " ", p_addr));

    f_EPTF_str2int_HashMap_Insert(
      v_MQTT_addressDB.hashRef,
      f_EPTF_MQTT_addressDB_Socket2String(p_addr),
      p_idx
    );
    v_MQTT_addressDB.data[p_idx] := p_addr;
  }

  // Fetches an address from the address database.
  //
  // p_addr - receives the stored address; left untouched when p_idx is out of range
  // p_idx  - index of the address in the database
  //
  // An out-of-range index is reported with a warning.
  function f_EPTF_MQTT_addressDB_get(inout Socket p_addr, in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (p_idx < 0 or p_idx >= sizeof(v_MQTT_addressDB.data))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(": "," couldn't get address at ", p_idx));
      return;
    }

    p_addr := v_MQTT_addressDB.data[p_idx];
  }

  // Looks up an address in the address database.
  //
  // p_sock - the address to look for
  //
  // Returns the value the hash map lookup leaves in the result variable, which
  // starts out as -1 (the return value of the lookup itself is not checked).
  function f_EPTF_MQTT_addressDB_lookUp(in Socket p_sock)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer vl_foundIdx := -1;

    f_EPTF_str2int_HashMap_Find(
      v_MQTT_addressDB.hashRef,
      f_EPTF_MQTT_addressDB_Socket2String(p_sock),
      vl_foundIdx
    );

    return vl_foundIdx;
  }

  // Builds the hash map key of an address.
  //
  // p_sock - the address
  //
  // Returns "<hostName>:<portNumber>".
  function f_EPTF_MQTT_addressDB_Socket2String(Socket p_sock)
  return charstring
  {
    var charstring vl_key := p_sock.hostName & ":" & int2str(p_sock.portNumber);
    return vl_key;
  }

  // Initializes the template database: empties its data list, creates its
  // lookup hash map and loads every template configured in
  // tsp_EPTF_MQTT_LGen_templates.
  function f_EPTF_MQTT_templateDB_init()
  runs on EPTF_MQTT_LGen_CT
  {
    v_MQTT_templateDB.data := {};
    v_MQTT_templateDB.hashRef := f_EPTF_str2int_HashMap_New("EPTF_MQTT_templateDB_Hash");

    var integer vl_configured := sizeof(tsp_EPTF_MQTT_LGen_templates);
    for (var integer vl_n := 0; vl_n < vl_configured; vl_n := vl_n + 1)
    {
      // The index returned by the add function is not needed here
      f_EPTF_MQTT_templateDB_add(tsp_EPTF_MQTT_LGen_templates[vl_n]);
    }
  }

  // Appends p_template to the template database and indexes it by its id.
  //
  // Returns:
  //   the index of the new entry, or -1 (with a warning) when a template with
  //   the same id is already stored
  function f_EPTF_MQTT_templateDB_add(in MQTT_Template p_template)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer vl_existing := f_EPTF_MQTT_templateDB_lookUp(p_template.id);
    if (vl_existing != -1)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " template is already added with id: ", p_template.id));
      return -1;
    }

    // New templates always go to the end of the list
    var integer vl_newIdx := sizeof(v_MQTT_templateDB.data);
    v_MQTT_templateDB.data[vl_newIdx] := p_template;
    f_EPTF_str2int_HashMap_Insert(v_MQTT_templateDB.hashRef, p_template.id, vl_newIdx);

    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " template was added with id: ", p_template.id, " at idx: ",vl_newIdx));

    return vl_newIdx;
  }

  // Returns the index of the template registered under p_id, or -1 when the
  // hash map reports that the id is not stored.
  function f_EPTF_MQTT_templateDB_lookUp(in charstring p_id)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer vl_found := -1;

    if (f_EPTF_str2int_HashMap_Find(v_MQTT_templateDB.hashRef, p_id, vl_found))
    {
      return vl_found;
    }
    // Miss: report -1 regardless of what the lookup wrote into vl_found
    return -1;
  }

  // Copies the message of the template stored at p_idx into p_pdu.
  // An out-of-range p_idx is silently ignored and p_pdu is left unchanged.
  function f_EPTF_MQTT_templateDB_get(in integer p_idx, inout MQTT_v3_1_1_ReqResp p_pdu)
  runs on EPTF_MQTT_LGen_CT
  {
    if (p_idx < 0 or p_idx >= sizeof(v_MQTT_templateDB.data)) { return; }

    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " template is fetched with idx: ", p_idx));
    p_pdu := v_MQTT_templateDB.data[p_idx].msg;
  }

  // Empties the template list and deletes the hash map registered under the
  // name "EPTF_MQTT_templateDB_Hash".
  function f_EPTF_MQTT_templateDB_cleanUp()
  runs on EPTF_MQTT_LGen_CT
  {
    const charstring cl_hashName := "EPTF_MQTT_templateDB_Hash";

    v_MQTT_templateDB.data := {};
    f_EPTF_str2int_HashMap_Delete(cl_hashName);
  }

  // Initialises the session database: free-busy queue, empty session list and
  // a new hash map named "EPTF_MQTT_sessionDB_Hash".
  function f_EPTF_MQTT_sessionDB_init()
  runs on EPTF_MQTT_LGen_CT
  {
    const charstring cl_hashName := "EPTF_MQTT_sessionDB_Hash";

    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_sessionDB.queue);
    v_MQTT_sessionDB.data := {};
    v_MQTT_sessionDB.hashRef := f_EPTF_str2int_HashMap_New(cl_hashName);
  }

  // Resets the session database: re-initialises the free-busy queue, empties
  // the session list and deletes the hash map "EPTF_MQTT_sessionDB_Hash".
  function f_EPTF_MQTT_sessionDB_cleanUp()
  runs on EPTF_MQTT_LGen_CT
  {
    const charstring cl_hashName := "EPTF_MQTT_sessionDB_Hash";

    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_sessionDB.queue);
    v_MQTT_sessionDB.data := {};
    f_EPTF_str2int_HashMap_Delete(cl_hashName);
  }

  // Stores p_session in a slot taken from the session free-busy queue and
  // returns the index of that slot. No hash key is registered here; that is
  // done separately by f_EPTF_MQTT_sessionDB_setKey.
  function f_EPTF_MQTT_sessionDB_add(in MQTT_Session p_session)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer vl_slot := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_sessionDB.queue);

    f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_sessionDB.queue);
    f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding session ", vl_slot, " ", p_session));
    v_MQTT_sessionDB.data[vl_slot] := p_session;

    return vl_slot;
  }

  // Registers the session at p_idx in the session hash map under the
  // "host:port" key of its local address, so that incoming messages can be
  // mapped back to the session (see f_EPTF_MQTT_sessionDB_lookUp).
  //
  // Nothing is registered when p_idx is not a valid session index or when the
  // session's localAddrIdx does not refer to a stored address; both cases are
  // reported with a warning instead of failing on an unbound address.
  function f_EPTF_MQTT_sessionDB_setKey(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (f_EPTF_MQTT_sessionDB_get(p_idx, v_MQTT_session))
    {
      var Socket v_addr;
      f_EPTF_MQTT_addressDB_get(v_addr, v_MQTT_session.localAddrIdx);

      // addressDB_get leaves v_addr untouched for an invalid index; hashing an
      // unbound socket would raise a dynamic test case error
      if (not isbound(v_addr))
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " no local address for session idx: ", p_idx, ", key is not set"));
        return;
      }

      var charstring vl_key := f_EPTF_MQTT_sessionDB_addrHash(v_addr);
      f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "setting key for sock: ",vl_key," idx: ",p_idx));
      f_EPTF_str2int_HashMap_Insert(v_MQTT_sessionDB.hashRef, vl_key, p_idx);
    }
  }

  // Looks up the session registered for the local socket p_sock.
  // The result starts at -1; f_EPTF_MQTT_stack_fromEnv treats a negative
  // result as "no session found".
  function f_EPTF_MQTT_sessionDB_lookUp(in Socket p_sock)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer vl_found := -1;
    var charstring vl_key := f_EPTF_MQTT_sessionDB_addrHash(p_sock);

    f_EPTF_str2int_HashMap_Find(v_MQTT_sessionDB.hashRef, vl_key, vl_found);
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up sock: ",vl_key," idx: ",vl_found));

    return vl_found;
  }

  // Copies the session stored at p_idx into p_session.
  //
  // Returns:
  //   true  - p_idx is within the session list and p_session was filled
  //   false - p_idx is out of range; a warning is logged, p_session untouched
  // Note: only the range is checked, not whether the slot is busy.
  function f_EPTF_MQTT_sessionDB_get(in integer p_idx, inout MQTT_Session p_session)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (p_idx < 0 or p_idx >= sizeof(v_MQTT_sessionDB.data))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "couldn't get session with idx: ", p_idx));
      return false;
    }

    p_session := v_MQTT_sessionDB.data[p_idx];
    return true;
  }

  // Tells whether p_idx refers to a live session: the index must be within
  // the session list and its slot must be busy in the free-busy queue.
  //
  // Returns:
  //   true when the session exists, otherwise false (a warning is logged)
  function f_EPTF_MQTT_sessionDB_check(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (p_idx < sizeof(v_MQTT_sessionDB.data) and p_idx >= 0 and f_EPTF_FBQ_itemIsBusy(p_idx, v_MQTT_sessionDB.queue))
    {
      return true;
    }
    else
    {
      // The message used to say "subscription" (copied from the subscription DB)
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "invalid session with idx: ", p_idx));
      return false;
    }
  }

  // Removes the session at p_idx from the database: erases its local-address
  // key from the hash map, resets the stored record to c_MQTT_Session_init
  // and returns the slot to the free chain of the queue.
  //
  // p_idx itself is not validated here; callers are expected to pass the
  // index of an existing session.
  function f_EPTF_MQTT_sessionDB_remove(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "removing session with idx: ", p_idx));

    var Socket v_addr;
    f_EPTF_MQTT_addressDB_get(v_addr, v_MQTT_sessionDB.data[p_idx].localAddrIdx);

    // A session whose localAddrIdx is not a stored address cannot have a key:
    // addressDB_get leaves v_addr unbound then, and hashing it would raise a
    // dynamic error before the slot is released.
    if (isbound(v_addr))
    {
      f_EPTF_str2int_HashMap_Erase(v_MQTT_sessionDB.hashRef, f_EPTF_MQTT_sessionDB_addrHash(v_addr));
    }

    v_MQTT_sessionDB.data[p_idx] := c_MQTT_Session_init;
    f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_sessionDB.queue);
  }

  // Builds the session hash key of a socket: "<hostName>:<portNumber>".
  function f_EPTF_MQTT_sessionDB_addrHash(in Socket p_sock)
  return charstring
  {
    var charstring vl_port := int2str(p_sock.portNumber);
    return p_sock.hostName & ":" & vl_port;
  }

  // Initialises the publish database: free-busy queue, empty publish list and
  // a new hash map named "EPTF_MQTT_publishDB_Hash".
  function f_EPTF_MQTT_publishDB_init()
  runs on EPTF_MQTT_LGen_CT
  {
    const charstring cl_hashName := "EPTF_MQTT_publishDB_Hash";

    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_publishDB.queue);
    v_MQTT_publishDB.data := {};
    v_MQTT_publishDB.hashRef := f_EPTF_str2int_HashMap_New(cl_hashName);
  }

  // Resets the publish database: re-initialises the free-busy queue, empties
  // the publish list and deletes the hash map "EPTF_MQTT_publishDB_Hash".
  function f_EPTF_MQTT_publishDB_cleanUp()
  runs on EPTF_MQTT_LGen_CT
  {
    const charstring cl_hashName := "EPTF_MQTT_publishDB_Hash";

    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_publishDB.queue);
    v_MQTT_publishDB.data := {};
    f_EPTF_str2int_HashMap_Delete(cl_hashName);
  }

  // Stores the publish transaction p_pub in a slot taken from the publish
  // free-busy queue, indexes it by (session index, packet id) and returns the
  // index of the slot.
  function f_EPTF_MQTT_publishDB_add(in MQTT_Publish p_pub)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer vl_slot := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_publishDB.queue);
    f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_publishDB.queue);
    f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding publish ", vl_slot, " ", p_pub));

    // Key used later to match acknowledgements to this transaction
    var charstring vl_key := f_EPTF_MQTT_publishDB_packetIdHash(p_pub.sessionIdx, p_pub.packetId);
    f_EPTF_str2int_HashMap_Insert(v_MQTT_publishDB.hashRef, vl_key, vl_slot);

    v_MQTT_publishDB.data[vl_slot] := p_pub;

    return vl_slot;
  }

  // Looks up the publish transaction registered for the given session index
  // and packet id. The result starts at -1; f_EPTF_MQTT_stack_fromEnv treats
  // a negative result as "no ongoing transaction".
  function f_EPTF_MQTT_publishDB_lookUp(in integer p_sessionIdx, in integer p_packetId)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer vl_found := -1;
    var charstring vl_key := f_EPTF_MQTT_publishDB_packetIdHash(p_sessionIdx, p_packetId);

    f_EPTF_str2int_HashMap_Find(v_MQTT_publishDB.hashRef, vl_key, vl_found);
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up publish idx: ",vl_found));

    return vl_found;
  }

  // Copies the publish transaction stored at p_idx into p_pub.
  //
  // Returns:
  //   true  - p_idx is within the publish list and p_pub was filled
  //   false - p_idx is out of range; a warning is logged, p_pub untouched
  function f_EPTF_MQTT_publishDB_get(in integer p_idx, inout MQTT_Publish p_pub)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (p_idx < 0 or p_idx >= sizeof(v_MQTT_publishDB.data))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "couldn't get publish with idx: ", p_idx));
      return false;
    }

    p_pub := v_MQTT_publishDB.data[p_idx];
    return true;
  }

  // Tells whether p_idx refers to a live publish transaction: the index must
  // be within the publish list and its slot must be busy in the queue.
  // Logs a warning and returns false otherwise.
  function f_EPTF_MQTT_publishDB_check(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    var boolean vl_inRange := p_idx >= 0 and p_idx < sizeof(v_MQTT_publishDB.data);

    // The queue is only consulted for an in-range index
    if (vl_inRange and f_EPTF_FBQ_itemIsBusy(p_idx, v_MQTT_publishDB.queue))
    {
      return true;
    }

    f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "invalid publish with idx: ", p_idx));
    return false;
  }

  // Removes the publish transaction at p_idx: erases its (session, packet id)
  // key, resets the record to c_MQTT_Publish_init and frees the queue slot.
  // Does nothing (apart from the warning logged by publishDB_get) when p_idx
  // is out of range. Uses the component variable v_MQTT_publish as scratch.
  function f_EPTF_MQTT_publishDB_remove(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " removing idx: ", p_idx));

    if (not f_EPTF_MQTT_publishDB_get(p_idx, v_MQTT_publish)) { return; }

    var charstring vl_key :=
      f_EPTF_MQTT_publishDB_packetIdHash(v_MQTT_publish.sessionIdx, v_MQTT_publish.packetId);
    f_EPTF_str2int_HashMap_Erase(v_MQTT_publishDB.hashRef, vl_key);

    v_MQTT_publishDB.data[p_idx] := c_MQTT_Publish_init;
    f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_publishDB.queue);
  }

  // Builds the publish hash key "session_<sessionIdx>:id_<packetId>".
  function f_EPTF_MQTT_publishDB_packetIdHash(in integer p_sessionIdx, in integer p_packetId)
  return charstring
  {
    var charstring vl_sessionPart := "session_" & int2str(p_sessionIdx);
    var charstring vl_idPart := "id_" & int2str(p_packetId);

    return vl_sessionPart & ":" & vl_idPart;
  }

  // Initialises the subscription database: free-busy queue, empty list and a
  // new hash map named "EPTF_MQTT_subscriptionDB_Hash".
  function f_EPTF_MQTT_subscriptionDB_init()
  runs on EPTF_MQTT_LGen_CT
  {
    const charstring cl_hashName := "EPTF_MQTT_subscriptionDB_Hash";

    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_subscriptionDB.queue);
    v_MQTT_subscriptionDB.data := {};
    v_MQTT_subscriptionDB.hashRef := f_EPTF_str2int_HashMap_New(cl_hashName);
  }

  // Resets the subscription database: re-initialises the free-busy queue,
  // empties the list and deletes the hash map "EPTF_MQTT_subscriptionDB_Hash".
  function f_EPTF_MQTT_subscriptionDB_cleanUp()
  runs on EPTF_MQTT_LGen_CT
  {
    const charstring cl_hashName := "EPTF_MQTT_subscriptionDB_Hash";

    f_EPTF_FBQ_initFreeBusyQueue(v_MQTT_subscriptionDB.queue);
    v_MQTT_subscriptionDB.data := {};
    f_EPTF_str2int_HashMap_Delete(cl_hashName);
  }

  // Stores the subscription p_sub in a slot taken from the subscription
  // free-busy queue and returns the index of the slot.
  //
  // When p_sub carries a request, two hash keys are registered for the slot:
  //   - (session index, packet identifier of the request)
  //   - (session index, topic filter of the first payload entry), if any
  function f_EPTF_MQTT_subscriptionDB_add(in MQTT_Subscription p_sub)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer vl_slot := f_EPTF_FBQ_getOrCreateFreeSlot(v_MQTT_subscriptionDB.queue);
    f_EPTF_FBQ_moveFromFreeHeadToBusyTail(v_MQTT_subscriptionDB.queue);
    f_EPTF_MQTT_Logging_DEBUG(log2str(": "," adding subscription ", vl_slot, " ", p_sub));

    if (ispresent(p_sub.request))
    {
      var charstring vl_idKey :=
        f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sub.sessionIdx, p_sub.request.packet_identifier);
      f_EPTF_str2int_HashMap_Insert(v_MQTT_subscriptionDB.hashRef, vl_idKey, vl_slot);

      // TODO: Only the first topic name will be used!
      // TODO: It is not checked if there is already a subscription for this!
      if (sizeof(p_sub.request.payload) > 0)
      {
        var universal charstring vl_filter := p_sub.request.payload[0].topic_filter;
        var charstring vl_topicKey :=
          f_EPTF_MQTT_subscriptionDB_topicHash(p_sub.sessionIdx, f_unichar2charstr(vl_filter));
        f_EPTF_str2int_HashMap_Insert(v_MQTT_subscriptionDB.hashRef, vl_topicKey, vl_slot);
      }
    }

    v_MQTT_subscriptionDB.data[vl_slot] := p_sub;

    return vl_slot;
  }

  // Registers the subscription index p_idx in the subscription hash map under
  // the key built from (p_sessionIdx, p_packetId).
  function f_EPTF_MQTT_subscriptionDB_setKey_packetId(in integer p_idx, in integer p_sessionIdx, in integer p_packetId)
  runs on EPTF_MQTT_LGen_CT
  {
    var charstring vl_key := f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sessionIdx, p_packetId);
    f_EPTF_str2int_HashMap_Insert(v_MQTT_subscriptionDB.hashRef, vl_key, p_idx);
  }

  // Erases the key built from (p_sessionIdx, p_packetId) from the
  // subscription hash map. p_idx is not used by the implementation.
  function f_EPTF_MQTT_subscriptionDB_removeKey_packetId(in integer p_idx, in integer p_sessionIdx, in integer p_packetId)
  runs on EPTF_MQTT_LGen_CT
  {
    var charstring vl_key := f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sessionIdx, p_packetId);
    f_EPTF_str2int_HashMap_Erase(v_MQTT_subscriptionDB.hashRef, vl_key);
  }

  // Looks up the subscription registered under (p_sessionIdx, p_packetId).
  // The result variable starts at -1 (presumably what is returned on a miss -
  // confirm against f_EPTF_str2int_HashMap_Find).
  function f_EPTF_MQTT_subscriptionDB_lookUp_packetId(in integer p_sessionIdx, in integer p_packetId)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer vl_found := -1;
    var charstring vl_key := f_EPTF_MQTT_subscriptionDB_packetIdHash(p_sessionIdx, p_packetId);

    f_EPTF_str2int_HashMap_Find(v_MQTT_subscriptionDB.hashRef, vl_key, vl_found);
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up subscription idx: ",vl_found));

    return vl_found;
  }

  // Looks up the subscription registered under (p_sessionIdx, p_topicName).
  // The result variable starts at -1 (presumably what is returned on a miss -
  // confirm against f_EPTF_str2int_HashMap_Find).
  function f_EPTF_MQTT_subscriptionDB_lookUp_topicName(in integer p_sessionIdx, in charstring p_topicName)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer vl_found := -1;
    var charstring vl_key := f_EPTF_MQTT_subscriptionDB_topicHash(p_sessionIdx, p_topicName);

    f_EPTF_str2int_HashMap_Find(v_MQTT_subscriptionDB.hashRef, vl_key, vl_found);
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, "looked up subscription idx: ",vl_found));

    return vl_found;
  }

  // Copies the subscription stored at p_idx into p_sub.
  //
  // Returns:
  //   true  - p_idx is within the subscription list and p_sub was filled
  //   false - p_idx is out of range; a warning is logged, p_sub untouched
  function f_EPTF_MQTT_subscriptionDB_get(in integer p_idx, inout MQTT_Subscription p_sub)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (p_idx < 0 or p_idx >= sizeof(v_MQTT_subscriptionDB.data))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "couldn't get subscription with idx: ", p_idx));
      return false;
    }

    p_sub := v_MQTT_subscriptionDB.data[p_idx];
    return true;
  }

  // Tells whether p_idx refers to a live subscription: the index must be
  // within the subscription list and its slot must be busy in the queue.
  // Logs a warning and returns false otherwise.
  function f_EPTF_MQTT_subscriptionDB_check(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    var boolean vl_inRange := p_idx >= 0 and p_idx < sizeof(v_MQTT_subscriptionDB.data);

    // The queue is only consulted for an in-range index
    if (vl_inRange and f_EPTF_FBQ_itemIsBusy(p_idx, v_MQTT_subscriptionDB.queue))
    {
      return true;
    }

    f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, "invalid subscription with idx: ", p_idx));
    return false;
  }

  // Removes the subscription at p_idx: erases the hash keys that
  // f_EPTF_MQTT_subscriptionDB_add registered for it, resets the record to
  // c_MQTT_Subscription_init and frees the queue slot.
  //
  // Does nothing (apart from the warning logged by subscriptionDB_get) when
  // p_idx is out of range. Uses the component variable v_MQTT_subscription
  // as scratch.
  function f_EPTF_MQTT_subscriptionDB_remove(in integer p_idx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " removing idx: ", p_idx));

    if (f_EPTF_MQTT_subscriptionDB_get(p_idx, v_MQTT_subscription))
    {
      // Both keys are derived from the optional request, so the request must
      // be present before any of its fields (including payload) is read.
      if (ispresent(v_MQTT_subscription.request))
      {
        f_EPTF_str2int_HashMap_Erase(
          v_MQTT_subscriptionDB.hashRef,
          f_EPTF_MQTT_subscriptionDB_packetIdHash(
            v_MQTT_subscription.sessionIdx,
            v_MQTT_subscription.request.packet_identifier)
        );

        // Only the first topic filter is indexed when the subscription is added
        if (sizeof(v_MQTT_subscription.request.payload)>0)
        {
          var universal charstring vl_topicName := v_MQTT_subscription.request.payload[0].topic_filter;
          f_EPTF_str2int_HashMap_Erase(
            v_MQTT_subscriptionDB.hashRef,
            f_EPTF_MQTT_subscriptionDB_topicHash(v_MQTT_subscription.sessionIdx, f_unichar2charstr(vl_topicName))
          );
        }
      }

      v_MQTT_subscriptionDB.data[p_idx] := c_MQTT_Subscription_init;
      f_EPTF_FBQ_moveFromBusyToFreeTail(p_idx, v_MQTT_subscriptionDB.queue);
    }
  }

  // Builds the subscription hash key "session_<sessionIdx>:id_<packetId>".
  function f_EPTF_MQTT_subscriptionDB_packetIdHash(in integer p_sessionIdx, in integer p_packetId)
  return charstring
  {
    var charstring vl_sessionPart := "session_" & int2str(p_sessionIdx);
    var charstring vl_idPart := "id_" & int2str(p_packetId);

    return vl_sessionPart & ":" & vl_idPart;
  }

  // Builds the subscription hash key "session_<sessionIdx>:topic_<topic>".
  function f_EPTF_MQTT_subscriptionDB_topicHash(in integer p_sessionIdx, in charstring p_topic)
  return charstring
  {
    var charstring vl_sessionPart := "session_" & int2str(p_sessionIdx);
    var charstring vl_topicPart := "topic_" & p_topic;

    return vl_sessionPart & ":" & vl_topicPart;
  }

  // Entry point for messages coming from the application: forwards p_msg to
  // the session referred to by p_ctx.sessionIdx. A session index outside the
  // session list is reported with a warning and the message is dropped.
  function f_EPTF_MQTT_stack_fromApp(inout EPTF_MQTT_PDU p_msg, in MQTT_StepCtx p_ctx)
  runs on EPTF_MQTT_LGen_CT
  {
    if (p_ctx.sessionIdx < 0 or p_ctx.sessionIdx >= sizeof(v_MQTT_sessionDB.data))
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " sessionIdx is not valid [",p_ctx.sessionIdx,"]. Dropping message"));
      return;
    }

    f_EPTF_MQTT_session_fromApp(p_msg, p_ctx.sessionIdx);
  }

  // Entry point for messages received from the environment (transport).
  // The owning session is found through the local address of the message.
  // Publish responses (as decided by f_EPTF_MQTT_publishResponseType) are
  // routed to the matching publish transaction, everything else to the
  // session. Messages without a session or transaction are logged and ignored.
  function f_EPTF_MQTT_stack_fromEnv(inout EPTF_MQTT_PDU p_msg)
  runs on EPTF_MQTT_LGen_CT
  {
    var integer vl_sessionIdx := f_EPTF_MQTT_sessionDB_lookUp(p_msg.transportParams.localAddress);

    if (vl_sessionIdx < 0)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " no session found for incoming message (ignoring): ", p_msg));
      return;
    }

    if (not f_EPTF_MQTT_publishResponseType(p_msg.pdu))
    {
      f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " session found for incoming message: ",vl_sessionIdx));
      f_EPTF_MQTT_session_fromEnv(p_msg, vl_sessionIdx);
      return;
    }

    // In case it's a publish response, there should be an ongoing publish transaction
    var integer vl_packetId := f_EPTF_MQTT_publishResponsePacketId(p_msg.pdu);
    var integer vl_transIdx := f_EPTF_MQTT_publishDB_lookUp(vl_sessionIdx, vl_packetId);

    if (vl_transIdx < 0)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " no ongoing publish transaction found for incoming message (ignoring): ", p_msg));
      return;
    }

    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " publish transaction found for incoming message: ", vl_transIdx));
    f_EPTF_MQTT_publish_fromEnv(p_msg, vl_transIdx);
  }

  // Handles a message sent by the application for the session at p_sIdx and
  // drives the session state machine.
  //
  //   DISCONNECTED: only CONNECT is accepted; it registers the session key,
  //                 fills in the client id, sends the message and moves the
  //                 session to CONNECTING.
  //   CONNECTED:    DISCONNECT, PUBLISH (QoS 0/1/2), SUBSCRIBE and UNSUBSCRIBE
  //                 are handled; exactly one of them is processed per call.
  //   Any other state or message is logged with a warning and ignored.
  //
  // Parameters:
  //   p_msg  - message from the application; address, session index and
  //            packet identifier fields are filled in here before sending
  //   p_sIdx - index into v_MQTT_sessionDB.data; not range checked here
  //            (f_EPTF_MQTT_stack_fromApp validates it before calling)
  function f_EPTF_MQTT_session_fromApp(inout EPTF_MQTT_PDU p_msg, in integer p_sIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " session [",p_sIdx,":",v_MQTT_sessionDB.data[p_sIdx].clientId,"] current state: ",v_MQTT_sessionDB.data[p_sIdx].state));

    f_EPTF_MQTT_setCtx(v_MQTT_sessionDB.data[p_sIdx].eIdx, v_MQTT_sessionDB.data[p_sIdx].fsmIdx, v_MQTT_ctx);

    // State: DISCONNECTED
    if (v_MQTT_sessionDB.data[p_sIdx].state == DISCONNECTED)
    {
      // appIn: connect
      if (ischosen(p_msg.pdu.connect_msg))
      {
        // At this point, the local address should be filled in correctly
        f_EPTF_MQTT_sessionDB_setKey(p_sIdx);

        p_msg.pdu.connect_msg.payload.client_identifier.stringItem := v_MQTT_sessionDB.data[p_sIdx].clientId;
        p_msg.pdu.connect_msg.payload.client_identifier.stringLength := lengthof(v_MQTT_sessionDB.data[p_sIdx].clientId);
        // Keep-alive timer period is the CONNECT keep_alive minus one
        // (presumably so the ping goes out before the deadline - confirm)
        v_MQTT_sessionDB.data[p_sIdx].keepAliveTime := int2float(p_msg.pdu.connect_msg.keep_alive-1);

        f_EPTF_MQTT_session_send(p_sIdx, p_msg);

        f_EPTF_MQTT_session_setState(p_sIdx, CONNECTING);
      }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
    }
    // State: CONNECTED
    else if (v_MQTT_sessionDB.data[p_sIdx].state == CONNECTED)
    {
      // The message kinds are mutually exclusive, so they form one else-if
      // chain; the final else only fires for a message that none of the
      // branches handled.

      // appIn: disconnect
      if (ischosen(p_msg.pdu.disconnect_msg))
      {
        // cancel T keepalive
        f_EPTF_MQTT_session_cancelT_keepalive(p_sIdx);

        f_EPTF_MQTT_session_send(p_sIdx, p_msg);

        f_EPTF_MQTT_session_setState(p_sIdx, DISCONNECTED);
      }
      // appIn: publish
      else if (ischosen(p_msg.pdu.publish))
      {
        if (p_msg.pdu.publish.header.qos_level == AT_MOST_ONCE_DELIVERY)
        {
          p_msg.pdu.publish.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);

          f_EPTF_MQTT_session_send(p_sIdx, p_msg);

          f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
        }
        else if (p_msg.pdu.publish.header.qos_level == AT_LEAST_ONCE_DELIVERY)
        {
          p_msg.sessionIdx := p_sIdx;
          f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
          f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);

          p_msg.pdu.publish.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);

          // create new Publish QoS1 Orig
          var MQTT_Publish vl_publish := c_MQTT_Publish_init;
          vl_publish.sessionIdx := p_sIdx;
          vl_publish.side := ORIG;
          vl_publish.state.qos1 := CREATED;
          vl_publish.packetId := p_msg.pdu.publish.packet_identifier;
          var integer vl_pubIdx := f_EPTF_MQTT_publishDB_add(vl_publish);

          f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);

          // pubOut.send
          f_EPTF_MQTT_publish_fromSession(p_msg, vl_pubIdx);
        }
        else if (p_msg.pdu.publish.header.qos_level == EXACTLY_ONE_DELIVERY)
        {
          p_msg.sessionIdx := p_sIdx;
          f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
          f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);

          p_msg.pdu.publish.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);

          // create new Publish QoS2 Orig
          var MQTT_Publish vl_publish := c_MQTT_Publish_init;
          vl_publish.sessionIdx := p_sIdx;
          vl_publish.side := ORIG;
          vl_publish.state.qos2 := CREATED;
          vl_publish.packetId := p_msg.pdu.publish.packet_identifier;
          var integer vl_pubIdx := f_EPTF_MQTT_publishDB_add(vl_publish);

          f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);

          // pubOut.send
          f_EPTF_MQTT_publish_fromSession(p_msg, vl_pubIdx);
        }
        else
        {
          f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Publish qos level not handled. Ignoring:", p_msg));
        }
      }
      // appIn: subscribe
      else if (ischosen(p_msg.pdu.subscribe))
      {
        // Fill in message
        p_msg.sessionIdx := p_sIdx;
        f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
        f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);
        p_msg.pdu.subscribe.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);

        // Create new Subscription
        var MQTT_Subscription vl_sub := c_MQTT_Subscription_init;
        vl_sub.sessionIdx := p_sIdx;
        vl_sub.request := p_msg.pdu.subscribe;
        vl_sub.state := UNSUBSCRIBED;

        var integer vl_subIdx := f_EPTF_MQTT_subscriptionDB_add(vl_sub);
        f_EPTF_MQTT_session_registerSubscription(p_sIdx, vl_subIdx);

        // subscription.send
        f_EPTF_MQTT_subscription_fromSession(p_msg, vl_subIdx);

        f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);
      }
      // appIn: unsubscribe
      else if (ischosen(p_msg.pdu.unsubscribe))
      {
        // Fill in message
        p_msg.sessionIdx := p_sIdx;
        f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sIdx].localAddrIdx);
        f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sIdx].remoteAddrIdx);
        // The packet id must go into the unsubscribe alternative: writing it
        // into pdu.subscribe would switch the union to the subscribe
        // alternative and lose the UNSUBSCRIBE content.
        p_msg.pdu.unsubscribe.packet_identifier := f_EPTF_MQTT_session_getNextPacketId(v_MQTT_ctx);

        // Look up corresponding subscription (only the first topic filter is
        // indexed, see f_EPTF_MQTT_subscriptionDB_add)
        if (sizeof(p_msg.pdu.unsubscribe.payload)>0)
        {
          var charstring vl_topicName := f_unichar2charstr(p_msg.pdu.unsubscribe.payload[0].topic_filter);
          var integer vl_subIdx := f_EPTF_MQTT_subscriptionDB_lookUp_topicName(p_sIdx, vl_topicName);

          // subOut.send
          if (f_EPTF_MQTT_subscriptionDB_check(vl_subIdx)) {
            f_EPTF_MQTT_subscription_fromSession(p_msg, vl_subIdx);
          }
        }
      }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_sessionDB.data[p_sIdx].state));
    }
  }

  // Handles a message received from the environment for the session at
  // p_sIdx and drives the session state machine.
  //
  //   CONNECTING: CONNACK moves the session to CONNECTED (return code 0,
  //               keep-alive timer started) or back to DISCONNECTED, and the
  //               matching CONNACK event is dispatched.
  //   CONNECTED:  SUBACK/UNSUBACK are forwarded to the subscription that is
  //               registered under the packet identifier; PUBLISH is handled
  //               per QoS level; PINGRESP is optionally reported as an event.
  //   Any other state or message is logged with a warning and ignored.
  //
  // Parameters:
  //   p_msg  - received message
  //   p_sIdx - index into v_MQTT_sessionDB.data; not range checked here
  function f_EPTF_MQTT_session_fromEnv(inout EPTF_MQTT_PDU p_msg, in integer p_sIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " session [",v_MQTT_sessionDB.data[p_sIdx].clientId,"] current state: ",v_MQTT_sessionDB.data[p_sIdx].state));

    f_EPTF_MQTT_setCtx(v_MQTT_sessionDB.data[p_sIdx].eIdx, v_MQTT_sessionDB.data[p_sIdx].fsmIdx, v_MQTT_ctx);

    // State: CONNECTING
    if (v_MQTT_sessionDB.data[p_sIdx].state == CONNECTING)
    {
      // envIn: connack
      if (ischosen(p_msg.pdu.connack))
      {
        if (p_msg.pdu.connack.connect_return_code == 0)
        {
          // startT keepalive
          f_EPTF_MQTT_session_startT_keepalive(p_sIdx, v_MQTT_sessionDB.data[p_sIdx].keepAliveTime);

          f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_CONNACK_Accepted,
            v_MQTT_sessionDB.data[p_sIdx].eIdx,
            v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
            {}
          );
        }
        else
        {
          f_EPTF_MQTT_session_setState(p_sIdx, DISCONNECTED);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_CONNACK_Refused,
            v_MQTT_sessionDB.data[p_sIdx].eIdx,
            v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
            {}
          );
        }
      }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
    }
    // State: CONNECTED
    else if (v_MQTT_sessionDB.data[p_sIdx].state == CONNECTED)
    {
      // envIn: suback
      if (ischosen(p_msg.pdu.suback))
      {
        var integer vl_subIdx :=
          f_EPTF_MQTT_subscriptionDB_lookUp_packetId(p_sIdx, p_msg.pdu.suback.packet_identifier);

        // An acknowledgement with an unknown packet id has no subscription;
        // the check logs a warning and the message is dropped instead of
        // indexing the subscription DB with an invalid index.
        if (f_EPTF_MQTT_subscriptionDB_check(vl_subIdx))
        {
          f_EPTF_MQTT_subscription_fromEnv(p_msg, vl_subIdx);
        }
      }
      // envIn: unsuback
      else if (ischosen(p_msg.pdu.unsuback))
      {
        var integer vl_subIdx :=
          f_EPTF_MQTT_subscriptionDB_lookUp_packetId(p_sIdx, p_msg.pdu.unsuback.packet_identifier);

        // Same guard as for SUBACK
        if (f_EPTF_MQTT_subscriptionDB_check(vl_subIdx))
        {
          f_EPTF_MQTT_subscription_fromEnv(p_msg, vl_subIdx);
        }
      }
      // envIn: publish
      else if (ischosen(p_msg.pdu.publish))
      {
        if (p_msg.pdu.publish.header.qos_level == AT_MOST_ONCE_DELIVERY)
        {
          f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_PUBLISH,
            v_MQTT_sessionDB.data[p_sIdx].eIdx,
            v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
            {}
          );
        }
        else if (p_msg.pdu.publish.header.qos_level == AT_LEAST_ONCE_DELIVERY)
        {
          // Send PUBACK

          v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBACK(p_msg.pdu.publish.packet_identifier));
          f_EPTF_MQTT_session_send(p_sIdx, v_MQTT_msgToSend);

          f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_PUBLISH,
            v_MQTT_sessionDB.data[p_sIdx].eIdx,
            v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
            {}
          );
        }
        else if (p_msg.pdu.publish.header.qos_level == EXACTLY_ONE_DELIVERY)
        {
          // create new Publish QoS2 TERM
          var MQTT_Publish vl_publish := c_MQTT_Publish_init;
          vl_publish.sessionIdx := p_sIdx;
          vl_publish.side := TERM;
          vl_publish.state.qos2 := CREATED;
          vl_publish.packetId := p_msg.pdu.publish.packet_identifier;
          var integer vl_pubIdx := f_EPTF_MQTT_publishDB_add(vl_publish);

          f_EPTF_MQTT_session_setState(p_sIdx, CONNECTED);

          // pubOut.send
          f_EPTF_MQTT_publish_fromEnv(p_msg, vl_pubIdx);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_PUBLISH,
            v_MQTT_sessionDB.data[p_sIdx].eIdx,
            v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
            {}
          );
        }
        else
        {
          f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Publish qos level not handled. Ignoring:", p_msg));
        }
      }
      // envIn: ping response
      else if (ischosen(p_msg.pdu.pingresp))
      {
        if (v_MQTT_sessionDB.data[p_sIdx].reportPingResponse)
        {
          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_PING_Response,
            v_MQTT_sessionDB.data[p_sIdx].eIdx,
            v_MQTT_sessionDB.data[p_sIdx].fsmIdx,
            {}
          );
        }
      }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_sessionDB.data[p_sIdx].state));
    }
  }

  // Scheduler callback of the keep-alive timer. The session index is taken
  // from pl_action.actionId[0]. The stored timer handle is cleared, and if
  // the session is still CONNECTED a ping request is sent and the timer is
  // started again. Always returns true.
  function f_EPTF_MQTT_session_keepalive(in EPTF_ScheduledAction pl_action, in integer pl_eventIndex)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    var integer vl_sessionIdx := pl_action.actionId[0];

    // The timer that fired is no longer pending
    v_MQTT_sessionDB.data[vl_sessionIdx].keepaliveTimer := -1;

    if (v_MQTT_sessionDB.data[vl_sessionIdx].state != CONNECTED)
    {
      return true;
    }

    // Send ping request
    v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_pingReq);
    f_EPTF_MQTT_session_send(vl_sessionIdx, v_MQTT_msgToSend);

    // startT keepAlive
    f_EPTF_MQTT_session_startT_keepalive(vl_sessionIdx, v_MQTT_sessionDB.data[vl_sessionIdx].keepAliveTime);

    f_EPTF_MQTT_session_setState(vl_sessionIdx, CONNECTED);

    return true;
  }

  // Schedules the keep-alive action of session pl_sIdx to run pl_time seconds
  // after the scheduler's snapshot time; the timer handle is stored in the
  // session record.
  //
  // Returns:
  //   false when pl_time is not positive (nothing is scheduled), otherwise
  //   the result of f_EPTF_SchedulerComp_scheduleAction
  function f_EPTF_MQTT_session_startT_keepalive(in integer pl_sIdx, in float pl_time)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (pl_time <= 0.0) { return false; }

    // The session index travels to the callback as the first action id item
    var EPTF_ActionId vl_actionId;
    vl_actionId[0] := pl_sIdx;

    var float vl_when := f_EPTF_SchedulerComp_snapshotTime() + pl_time;

    return f_EPTF_SchedulerComp_scheduleAction(
      vl_when,
      refers(f_EPTF_MQTT_session_keepalive),
      vl_actionId,
      v_MQTT_sessionDB.data[pl_sIdx].keepaliveTimer
    );
  }

  // Cancels the pending keep-alive timer of the session, if there is one
  // (handle >= 0), and clears the stored handle. A failed cancellation is
  // only reported with a warning.
  function f_EPTF_MQTT_session_cancelT_keepalive(in integer pl_sessionIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    // No timer pending
    if (v_MQTT_sessionDB.data[pl_sessionIdx].keepaliveTimer < 0) { return; }

    var boolean vl_cancelled :=
      f_EPTF_SchedulerComp_CancelEvent(v_MQTT_sessionDB.data[pl_sessionIdx].keepaliveTimer);
    if (not vl_cancelled)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId,": could not cancel timer"));
    }

    v_MQTT_sessionDB.data[pl_sessionIdx].keepaliveTimer := -1;
  }

  // Advances and returns the packet identifier counter of the session
  // referred to by p_ctx.sessionIdx.
  //
  // The returned value is always in the range 1..65535: MQTT requires packet
  // identifiers to be non-zero 16-bit values, so the counter wraps from 65535
  // back to 1 (not 0).
  function f_EPTF_MQTT_session_getNextPacketId(in MQTT_StepCtx p_ctx)
  runs on EPTF_MQTT_LGen_CT
  return integer
  {
    var integer vl_next := v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId + 1;

    // Wrap around; this also repairs a counter that is out of range
    if (vl_next < 1 or vl_next > 65535)
    {
      vl_next := 1;
    }

    v_MQTT_sessionDB.data[p_ctx.sessionIdx].nextPacketId := vl_next;
    return vl_next;
  }

  // Stores p_nextState as the state of the session at p_sessionIdx and logs
  // the new state together with the session's client id.
  function f_EPTF_MQTT_session_setState(in integer p_sessionIdx, in MQTT_Session_State p_nextState)
  runs on EPTF_MQTT_LGen_CT
  {
    v_MQTT_sessionDB.data[p_sessionIdx].state := p_nextState;

    f_EPTF_MQTT_Logging_VERBOSE(
      log2str(
        "session [", p_sessionIdx, ":", v_MQTT_sessionDB.data[p_sessionIdx].clientId,
        "] next state: ", v_MQTT_sessionDB.data[p_sessionIdx].state
      )
    );
  }

  // Sends p_msg on behalf of the session at p_sessionIdx: fills in the local
  // and remote address from the address database, marks the transport as TCP,
  // stamps the session index and hands the message to f_EPTF_MQTT_LGen_send.
  function f_EPTF_MQTT_session_send(in integer p_sessionIdx, inout EPTF_MQTT_PDU p_msg)
  runs on EPTF_MQTT_LGen_CT
  {
    // Addresses of the session (left untouched if an index is not stored)
    f_EPTF_MQTT_addressDB_get(p_msg.transportParams.localAddress, v_MQTT_sessionDB.data[p_sessionIdx].localAddrIdx);
    f_EPTF_MQTT_addressDB_get(p_msg.transportParams.remoteAddress, v_MQTT_sessionDB.data[p_sessionIdx].remoteAddrIdx);

    p_msg.sessionIdx := p_sessionIdx;
    p_msg.transportParams.proto := {tcp := {}};

    // envOut.send
    f_EPTF_MQTT_LGen_send(p_msg);
  }

  // Appends p_subIdx to the subscription references of the session at
  // p_sessionIdx. Duplicates are not detected.
  function f_EPTF_MQTT_session_registerSubscription(in integer p_sessionIdx, in integer p_subIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    // Note, we don't check if it is already there
    var integer vl_end := sizeof(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs);
    v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[vl_end] := p_subIdx;
  }

  // Removes every occurrence of p_subIdx from the subscription references of
  // the session at p_sessionIdx by rebuilding the list without it.
  function f_EPTF_MQTT_session_deregisterSubscription(in integer p_sessionIdx, in integer p_subIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    // This is slow, it's only acceptable under the assumption that a session
    // has only a small number of subscriptions
    var EPTF_IntegerList vl_kept := {};
    var integer vl_keptCount := 0;
    var integer vl_total := sizeof(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs);

    for (var integer k := 0; k < vl_total; k := k + 1)
    {
      var integer vl_ref := v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[k];
      if (vl_ref != p_subIdx)
      {
        vl_kept[vl_keptCount] := vl_ref;
        vl_keptCount := vl_keptCount + 1;
      }
    }

    v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs := vl_kept;
  }

  // Appends p_pubIdx to the publish references of the session at
  // p_sessionIdx. Duplicates are not detected.
  function f_EPTF_MQTT_session_registerPublish(in integer p_sessionIdx, in integer p_pubIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    // Note, we don't check if it is already there
    var integer vl_end := sizeof(v_MQTT_sessionDB.data[p_sessionIdx].publishRefs);
    v_MQTT_sessionDB.data[p_sessionIdx].publishRefs[vl_end] := p_pubIdx;
  }

  // Removes every occurrence of p_pubIdx from the publish references of the
  // session at p_sessionIdx by rebuilding the list without it.
  function f_EPTF_MQTT_session_deregisterPublish(in integer p_sessionIdx, in integer p_pubIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    // This is slow, it's only acceptable under the assumption that a session
    // has only a small number of concurrent publishes
    var EPTF_IntegerList vl_kept := {};
    var integer vl_keptCount := 0;
    var integer vl_total := sizeof(v_MQTT_sessionDB.data[p_sessionIdx].publishRefs);

    for (var integer k := 0; k < vl_total; k := k + 1)
    {
      var integer vl_ref := v_MQTT_sessionDB.data[p_sessionIdx].publishRefs[k];
      if (vl_ref != p_pubIdx)
      {
        vl_kept[vl_keptCount] := vl_ref;
        vl_keptCount := vl_keptCount + 1;
      }
    }

    v_MQTT_sessionDB.data[p_sessionIdx].publishRefs := vl_kept;
  }

  // Tears down the session at p_sessionIdx: marks it REMOVING, cancels its
  // keep-alive timer, removes the subscriptions it references, clears the
  // session index stored in the FSM context and finally removes the session
  // from the database.
  // NOTE(review): publishRefs are not walked here - confirm whether pending
  // publish transactions are released elsewhere.
  function f_EPTF_MQTT_session_remove(in integer p_sessionIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_session_setState(p_sessionIdx, REMOVING);

    f_EPTF_MQTT_session_cancelT_keepalive(p_sessionIdx);

    // Remove subscriptions. The list length is re-read on every pass, exactly
    // like a for loop would do, because the callee is not visible here and
    // may modify the list.
    var integer vl_pos := 0;
    while (vl_pos < sizeof(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs))
    {
      f_EPTF_MQTT_subscription_remove(v_MQTT_sessionDB.data[p_sessionIdx].subscriptionRefs[vl_pos]);
      vl_pos := vl_pos + 1;
    }

    // Derefer FSM pointers
    f_EPTF_LGenBase_setAppDataItemOfFsmCtx(
      v_MQTT_sessionDB.data[p_sessionIdx].eIdx,
      v_MQTT_sessionDB.data[p_sessionIdx].fsmIdx,
      v_MQTT_bIdx,
      c_MQTT_AppData_sessionIdx,
      -1
    );

    // Clean up DB
    f_EPTF_MQTT_sessionDB_remove(p_sessionIdx);
  }

  // Handles a message handed over by the session for the subscription at
  // p_subIdx and drives the subscription state machine.
  //
  //   UNSUBSCRIBED: SUBSCRIBE is sent, state becomes SUBSCRIBING.
  //   SUBSCRIBED:   UNSUBSCRIBE registers its packet id as a lookup key, the
  //                 state becomes UNSUBSCRIBING and the message is sent.
  //   Any other state or message is logged with a warning and ignored.
  //
  // p_subIdx is not validated here.
  function f_EPTF_MQTT_subscription_fromSession(inout EPTF_MQTT_PDU p_msg, in integer p_subIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " subscription current state: ",v_MQTT_subscriptionDB.data[p_subIdx].state));

    // State: UNSUBSCRIBED
    if (v_MQTT_subscriptionDB.data[p_subIdx].state == UNSUBSCRIBED)
    {
      if (not ischosen(p_msg.pdu.subscribe))
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
        return;
      }

      // sessionIn: subscribe -> envOut.send
      f_EPTF_MQTT_LGen_send(p_msg);
      f_EPTF_MQTT_subscription_setState(p_subIdx, SUBSCRIBING);
      return;
    }

    // State: SUBSCRIBED
    if (v_MQTT_subscriptionDB.data[p_subIdx].state == SUBSCRIBED)
    {
      if (not ischosen(p_msg.pdu.unsubscribe))
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
        return;
      }

      // sessionIn: unsubscribe
      // Make the subscription reachable through the UNSUBSCRIBE packet id
      f_EPTF_MQTT_subscriptionDB_setKey_packetId(
        p_subIdx,
        v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx,
        p_msg.pdu.unsubscribe.packet_identifier
      );

      f_EPTF_MQTT_subscription_setState(p_subIdx, UNSUBSCRIBING);

      // envOut.send
      f_EPTF_MQTT_LGen_send(p_msg);
      return;
    }

    f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_subscriptionDB.data[p_subIdx].state));
  }

  // Subscription state machine, input coming from the environment (incoming direction).
  //  SUBSCRIBING   + suback (first return code == 0) : next state SUBSCRIBED,
  //                                                    SUBACK_Accepted event dispatched
  //  SUBSCRIBING   + suback (first return code != 0) : next state UNSUBSCRIBED,
  //                                                    SUBACK_Refused event dispatched,
  //                                                    subscription removed
  //  UNSUBSCRIBING + unsuback                        : next state UNSUBSCRIBED,
  //                                                    UNSUBACK event dispatched,
  //                                                    packet id key and subscription removed
  // Any other message or state is only logged as a warning.
  //
  // UNSUBSCRIBING is the state f_EPTF_MQTT_subscription_fromSession enters when
  // the unsubscribe is sent, therefore the unsuback is expected in that state.
  function f_EPTF_MQTT_subscription_fromEnv(inout EPTF_MQTT_PDU p_msg, in integer p_subIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " subscription current state: ",v_MQTT_subscriptionDB.data[p_subIdx].state));

    // State: SUBSCRIBING
    if (v_MQTT_subscriptionDB.data[p_subIdx].state == SUBSCRIBING)
    {
      // envIn: suback (a suback with an empty return code list is not handled)
      if (ischosen(p_msg.pdu.suback) and sizeof(p_msg.pdu.suback.payload.return_code)>0)
      {
        // Only the first return code is evaluated
        if (p_msg.pdu.suback.payload.return_code[0] == 0)
        {
          f_EPTF_MQTT_subscription_setState(p_subIdx, SUBSCRIBED);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_SUBACK_Accepted,
            v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].eIdx,
            v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].fsmIdx,
            {}
          );
        }
        else
        {
          f_EPTF_MQTT_subscription_setState(p_subIdx, UNSUBSCRIBED);

          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_SUBACK_Refused,
            v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].eIdx,
            v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].fsmIdx,
            {}
          );

          // Removing subscription
          f_EPTF_MQTT_subscription_remove(p_subIdx);
        }
      }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
    }
    // State: UNSUBSCRIBING
    else if (v_MQTT_subscriptionDB.data[p_subIdx].state == UNSUBSCRIBING)
    {
      // envIn: unsuback
      if (ischosen(p_msg.pdu.unsuback))
      {
        f_EPTF_MQTT_subscription_setState(p_subIdx, UNSUBSCRIBED);

        f_EPTF_MQTT_dispatchEvent(
          c_MQTT_eventIdx_UNSUBACK,
          v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].eIdx,
          v_MQTT_sessionDB.data[v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx].fsmIdx,
          {}
        );

        // Removing unsuback key
        f_EPTF_MQTT_subscriptionDB_removeKey_packetId(
          p_subIdx,
          v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx,
          p_msg.pdu.unsuback.packet_identifier
        );

        // Removing subscription
        f_EPTF_MQTT_subscription_remove(p_subIdx);
      }
      else
      {
        f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg));
      }
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_subscriptionDB.data[p_subIdx].state));
    }
  }

  // Stores p_nextState as the state of subscription p_subIdx and writes the
  // new state to the verbose log.
  function f_EPTF_MQTT_subscription_setState(in integer p_subIdx, in MQTT_Subscription_State p_nextState)
  runs on EPTF_MQTT_LGen_CT
  {
    v_MQTT_subscriptionDB.data[p_subIdx].state := p_nextState;

    // The value just stored is the one that gets logged
    f_EPTF_MQTT_Logging_VERBOSE(log2str("subscription next state: ", p_nextState));
  }

  // Removes subscription p_subIdx: marks it REMOVING, deregisters it from its
  // owning session, then releases the subscription DB entry.
  function f_EPTF_MQTT_subscription_remove(in integer p_subIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_subscription_setState(p_subIdx, REMOVING);

    // Drop the back reference held by the owning session
    var integer vl_ownerSessionIdx := v_MQTT_subscriptionDB.data[p_subIdx].sessionIdx;
    f_EPTF_MQTT_session_deregisterSubscription(vl_ownerSessionIdx, p_subIdx);

    // Release the DB entry
    f_EPTF_MQTT_subscriptionDB_remove(p_subIdx);
  }

  // Publish state machine, input coming from the session (outgoing direction).
  // The behaviour depends on the chosen state alternative (qos1/qos2) and on
  // the side (ORIG/TERM) of publish transaction p_pubIdx:
  //  QoS 1 ORIG, CREATED + publish : start response watchdog, send, next state PUBLISHED
  //  QoS 1 TERM                    : nothing to do, a "NOP" warning is logged
  //  QoS 2 ORIG, CREATED + publish : start response watchdog, send, next state PUBLISHED
  //  QoS 2 TERM                    : nothing to do
  // Any other message, state or publish type is only logged as a warning.
  function f_EPTF_MQTT_publish_fromSession(inout EPTF_MQTT_PDU p_msg, in integer p_pubIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " publish current state: ",v_MQTT_publishDB.data[p_pubIdx].state));

    // QoS 1 Originating
    if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos1) and v_MQTT_publishDB.data[p_pubIdx].side == ORIG)
    {
      // State: CREATED
      if (v_MQTT_publishDB.data[p_pubIdx].state.qos1 == CREATED)
      {
        // sessionIn: publish
        if (ischosen(p_msg.pdu.publish))
        {
          // start T response watchdog
          f_EPTF_MQTT_publish_startT_watchdog(p_pubIdx, tsp_EPTF_MQTT_PUBLISH_responseWatchdog);

          // envOut.send
          f_EPTF_MQTT_LGen_send(p_msg);

          f_EPTF_MQTT_publish_setState(p_pubIdx, {qos1 := PUBLISHED});
        }
        else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
      }
      else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos1)); }
    }
    // QoS 1 Terminating
    else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos1) and v_MQTT_publishDB.data[p_pubIdx].side == TERM)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " NOP"));
    }
    // QoS 2 Originating
    else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos2) and v_MQTT_publishDB.data[p_pubIdx].side == ORIG)
    {
      // State: CREATED
      if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == CREATED)
      {
        // sessionIn: publish
        if (ischosen(p_msg.pdu.publish))
        {
          // start T response watchdog
          f_EPTF_MQTT_publish_startT_watchdog(p_pubIdx, tsp_EPTF_MQTT_PUBLISH_responseWatchdog);

          // envOut.send
          f_EPTF_MQTT_LGen_send(p_msg);

          f_EPTF_MQTT_publish_setState(p_pubIdx, {qos2 := PUBLISHED});
        }
        else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
      }
      // qos2 is the chosen alternative in this branch, so qos2 is the field to log;
      // referencing the non-selected qos1 field here would be an error
      else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos2)); }
    }
    // QoS 2 Terminating
    else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos2) and v_MQTT_publishDB.data[p_pubIdx].side == TERM)
    {
      // Intentionally empty: nothing is done here for a terminating QoS 2 publish
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown publish type ", v_MQTT_publishDB.data[p_pubIdx]));
    }
  }

  // Publish state machine, input coming from the environment (incoming direction).
  // The behaviour depends on the chosen state alternative (qos1/qos2) and on
  // the side (ORIG/TERM) of publish transaction p_pubIdx:
  //  QoS 1 ORIG, PUBLISHED + puback  : cancel watchdog, report PUBACK (if enabled),
  //                                    next state ACKNOWLEDGED, remove transaction
  //  QoS 1 TERM                      : not handled here, only a warning is logged
  //  QoS 2 ORIG, PUBLISHED + pubrec  : cancel watchdog, send PUBREL, restart watchdog,
  //                                    next state RELEASED, report PUBREC (if enabled)
  //  QoS 2 ORIG, RELEASED  + pubcomp : cancel watchdog, next state COMPLETE,
  //                                    report PUBCOMP (if enabled), remove transaction
  //  QoS 2 TERM, CREATED   + publish : send PUBREC, start watchdog, next state RECEIVED,
  //                                    report PUBLISH (always)
  //  QoS 2 TERM, RECEIVED  + pubrel  : cancel watchdog, send PUBCOMP, next state COMPLETE,
  //                                    report PUBREL (if enabled), remove transaction
  // "If enabled" refers to the reportPublishResponse flag of the owning session.
  // Any other message, state or publish type is only logged as a warning.
  function f_EPTF_MQTT_publish_fromEnv(inout EPTF_MQTT_PDU p_msg, in integer p_pubIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_Logging_VERBOSE(log2str(%definitionId, " publish current state: ",v_MQTT_publishDB.data[p_pubIdx].state));

    // QoS 1 Originating
    if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos1) and v_MQTT_publishDB.data[p_pubIdx].side == ORIG)
    {
      // State: PUBLISHED
      if (v_MQTT_publishDB.data[p_pubIdx].state.qos1 == PUBLISHED)
      {
        // envIn: PUBACK
        if (ischosen(p_msg.pdu.puback))
        {
          // cancel T response watchdog
          f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);

          // report event if enabled
          if (v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].reportPublishResponse) {
            f_EPTF_MQTT_dispatchEvent(
              c_MQTT_eventIdx_PUBACK,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
              {}
            );
          }

          // next state: ACKNOWLEDGED
          f_EPTF_MQTT_publish_setState(p_pubIdx, { qos1:=ACKNOWLEDGED });

          // remove publish transaction
          f_EPTF_MQTT_publish_remove(p_pubIdx);
        }
        else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
      }
      else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos1)); }
    }
    // QoS 1 Terminating
    else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos1) and v_MQTT_publishDB.data[p_pubIdx].side == TERM)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled, QoS1 TERM should be handled on session level: Ignoring:", p_msg));
    }
    // QoS 2 Originating
    else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos2) and v_MQTT_publishDB.data[p_pubIdx].side == ORIG)
    {
      // State: PUBLISHED
      if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == PUBLISHED)
      {
        // envIn: PUBREC
        if (ischosen(p_msg.pdu.pubrec))
        {
          // cancel T response watchdog
          f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);

          // envOut.send PUBREL
          v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBREL(p_msg.pdu.pubrec.packet_identifier));
          f_EPTF_MQTT_session_send(v_MQTT_publishDB.data[p_pubIdx].sessionIdx, v_MQTT_msgToSend);

          // start publish watchdog
          f_EPTF_MQTT_publish_startT_watchdog(p_pubIdx, tsp_EPTF_MQTT_PUBLISH_responseWatchdog);

          // next state: RELEASED
          f_EPTF_MQTT_publish_setState(p_pubIdx, { qos2:=RELEASED });

          // report event if enabled
          if (v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].reportPublishResponse) {
            f_EPTF_MQTT_dispatchEvent(
              c_MQTT_eventIdx_PUBREC,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
              {}
            );
          }
        }
        // Unexpected messages are logged like in every other state
        else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
      }
      // State: RELEASED
      else if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == RELEASED)
      {
        // envIn: PUBCOMP
        if (ischosen(p_msg.pdu.pubcomp))
        {
          // cancel T response watchdog
          f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);

          // next state: COMPLETE
          f_EPTF_MQTT_publish_setState(p_pubIdx, { qos2:=COMPLETE });

          // report event if enabled; the received message is a PUBCOMP,
          // so the PUBCOMP event is the one to report
          if (v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].reportPublishResponse) {
            f_EPTF_MQTT_dispatchEvent(
              c_MQTT_eventIdx_PUBCOMP,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
              {}
            );
          }

          // remove publish transaction
          f_EPTF_MQTT_publish_remove(p_pubIdx);
        }
        else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
      }
      else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos2)); }
    }
    // QoS 2 Terminating
    else if (ischosen(v_MQTT_publishDB.data[p_pubIdx].state.qos2) and v_MQTT_publishDB.data[p_pubIdx].side == TERM)
    {
      // State: CREATED
      if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == CREATED)
      {
        // envIn: PUBLISH
        if (ischosen(p_msg.pdu.publish))
        {
          // envOut.send PUBREC
          v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBREC(p_msg.pdu.publish.packet_identifier));
          f_EPTF_MQTT_session_send(v_MQTT_publishDB.data[p_pubIdx].sessionIdx, v_MQTT_msgToSend);

          // start publish watchdog
          f_EPTF_MQTT_publish_startT_watchdog(p_pubIdx, tsp_EPTF_MQTT_PUBLISH_responseWatchdog);

          // next state: RECEIVED
          f_EPTF_MQTT_publish_setState(p_pubIdx, { qos2:=RECEIVED });

          // The incoming PUBLISH is always reported (not gated by reportPublishResponse)
          f_EPTF_MQTT_dispatchEvent(
            c_MQTT_eventIdx_PUBLISH,
            v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
            v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
            {}
          );
        }
        // Unexpected messages are logged like in every other state
        else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
      }
      // State: RECEIVED
      else if (v_MQTT_publishDB.data[p_pubIdx].state.qos2 == RECEIVED)
      {
        // envIn: PUBREL
        if (ischosen(p_msg.pdu.pubrel))
        {
          // cancel T response watchdog
          f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);

          // envOut.send PUBCOMP
          v_MQTT_msgToSend.pdu := valueof(t_EPTF_MQTT_PUBCOMP(p_msg.pdu.pubrel.packet_identifier));
          f_EPTF_MQTT_session_send(v_MQTT_publishDB.data[p_pubIdx].sessionIdx, v_MQTT_msgToSend);

          // next state: COMPLETE
          f_EPTF_MQTT_publish_setState(p_pubIdx, { qos2:=COMPLETE });

          // report event if enabled
          if (v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].reportPublishResponse) {
            f_EPTF_MQTT_dispatchEvent(
              c_MQTT_eventIdx_PUBREL,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].eIdx,
              v_MQTT_sessionDB.data[v_MQTT_publishDB.data[p_pubIdx].sessionIdx].fsmIdx,
              {}
            );
          }

          // remove publish transaction
          f_EPTF_MQTT_publish_remove(p_pubIdx);
        }
        else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " Message not handled. Ignoring:", p_msg)); }
      }
      else { f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown state ", v_MQTT_publishDB.data[p_pubIdx].state.qos2)); }
    }
    else
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId, " unknown publish type ", v_MQTT_publishDB.data[p_pubIdx]));
    }
  }

  // Scheduler callback registered by f_EPTF_MQTT_publish_startT_watchdog.
  // Runs when no response arrived for a publish transaction before the
  // watchdog expired.
  //  pl_action     - the scheduled action; actionId[0] carries the publish index
  //  pl_eventIndex - not used by this handler
  // Marks the watchdog as not running, reports the timeout to the FSM of the
  // owning session and removes the publish transaction. Always returns true.
  function f_EPTF_MQTT_publish_watchdog(in EPTF_ScheduledAction pl_action, in integer pl_eventIndex)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    var integer vl_pIdx := pl_action.actionId[0];

    // The timer has already fired: reset the handle so that
    // f_EPTF_MQTT_publish_remove does not try to cancel it
    v_MQTT_publishDB.data[vl_pIdx].watchdogTimer := -1;

    // Report the timeout; reporting PUBACK here would tell the FSM that the
    // publish was acknowledged although no response was received
    f_EPTF_MQTT_dispatchEvent(
      c_MQTT_eventIdx_PUBLISH_Timeout,
      v_MQTT_sessionDB.data[v_MQTT_publishDB.data[vl_pIdx].sessionIdx].eIdx,
      v_MQTT_sessionDB.data[v_MQTT_publishDB.data[vl_pIdx].sessionIdx].fsmIdx,
      {}
    );

    f_EPTF_MQTT_publish_remove(vl_pIdx);

    return true;
  }

  // Schedules the response watchdog of publish transaction pl_pIdx to fire
  // pl_time after the current scheduler snapshot time.
  //  pl_pIdx - index of the publish transaction; passed to the handler in actionId[0]
  //  pl_time - watchdog duration; a value <= 0.0 disables the watchdog
  // Returns false when the watchdog is disabled, otherwise the result of
  // f_EPTF_SchedulerComp_scheduleAction. The scheduler writes the timer handle
  // into the watchdogTimer field of the publish entry.
  function f_EPTF_MQTT_publish_startT_watchdog(in integer pl_pIdx, in float pl_time)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (pl_time <= 0.0)
    {
      return false;
    }

    var EPTF_ActionId vl_watchdogArgs := { pl_pIdx };
    var float vl_expiry := f_EPTF_SchedulerComp_snapshotTime() + pl_time;

    return f_EPTF_SchedulerComp_scheduleAction(
      vl_expiry,
      refers(f_EPTF_MQTT_publish_watchdog),
      vl_watchdogArgs,
      v_MQTT_publishDB.data[pl_pIdx].watchdogTimer
    );
  }

  // Stores p_nextState as the state of publish transaction p_pubIdx and writes
  // the new state to the verbose log.
  function f_EPTF_MQTT_publish_setState(in integer p_pubIdx, in MQTT_Publish_State p_nextState)
  runs on EPTF_MQTT_LGen_CT
  {
    v_MQTT_publishDB.data[p_pubIdx].state := p_nextState;

    // The value just stored is the one that gets logged
    f_EPTF_MQTT_Logging_VERBOSE(log2str("publish next state: ", p_nextState));
  }

  // Cancels the response watchdog of publish transaction pl_publishIdx if one
  // is running. A negative watchdogTimer value means that no watchdog is
  // running; the handle is reset to -1 after the cancel attempt, whether or
  // not the scheduler reported success.
  function f_EPTF_MQTT_publish_cancelT_watchdog(in integer pl_publishIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    // Nothing to cancel
    if (v_MQTT_publishDB.data[pl_publishIdx].watchdogTimer < 0)
    {
      return;
    }

    var boolean vl_cancelled :=
      f_EPTF_SchedulerComp_CancelEvent(v_MQTT_publishDB.data[pl_publishIdx].watchdogTimer);
    if (not vl_cancelled)
    {
      f_EPTF_MQTT_Logging_WARNING(log2str(%definitionId,": could not cancel timer"));
    }

    v_MQTT_publishDB.data[pl_publishIdx].watchdogTimer := -1;
  }

  // Removes publish transaction p_pubIdx: marks it as being removed, cancels
  // its watchdog if running, deregisters it from the owning session and
  // releases the publish DB entry.
  function f_EPTF_MQTT_publish_remove(in integer p_pubIdx)
  runs on EPTF_MQTT_LGen_CT
  {
    f_EPTF_MQTT_publish_setState(p_pubIdx, { removing := true });

    // Stop the response watchdog, if any
    f_EPTF_MQTT_publish_cancelT_watchdog(p_pubIdx);

    // Drop the back reference held by the owning session
    var integer vl_ownerSessionIdx := v_MQTT_publishDB.data[p_pubIdx].sessionIdx;
    f_EPTF_MQTT_session_deregisterPublish(vl_ownerSessionIdx, p_pubIdx);

    // Release the DB entry
    f_EPTF_MQTT_publishDB_remove(p_pubIdx);
  }

  // Fills p_ctx from the test step arguments pl_ptr.
  // p_ctx.sessionIdx is always written by f_EPTF_MQTT_isFsmInitialized;
  // p_ctx.eIdx and p_ctx.fsmIdx are only written when the FSM is initialized.
  // Returns true on success, false (after logging a warning) when the FSM
  // context has no valid MQTT session.
  function f_EPTF_MQTT_setStepCtx(in EPTF_LGenBase_TestStepArgs pl_ptr, inout MQTT_StepCtx p_ctx)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (f_EPTF_MQTT_isFsmInitialized(pl_ptr.eIdx, pl_ptr.refContext.fCtxIdx, p_ctx.sessionIdx))
    {
      p_ctx.eIdx := pl_ptr.eIdx;
      p_ctx.fsmIdx := pl_ptr.refContext.fCtxIdx;
      return true;
    }

    f_EPTF_MQTT_Logging_WARNING(%definitionId &
      ": FSM has not been initialized. The f_MQTT_step_init function must be called as first step in the FSMs using MQTT.");
    return false;
  }

  // Fills p_ctx for entity p_eIdx and FSM context p_fsmIdx.
  // p_ctx.sessionIdx is always written by f_EPTF_MQTT_isFsmInitialized;
  // p_ctx.eIdx and p_ctx.fsmIdx are only written when the FSM is initialized.
  // Returns true on success, false (after logging a warning) when the FSM
  // context has no valid MQTT session.
  function f_EPTF_MQTT_setCtx(in integer p_eIdx, in integer p_fsmIdx, inout MQTT_StepCtx p_ctx)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    if (f_EPTF_MQTT_isFsmInitialized(p_eIdx, p_fsmIdx, p_ctx.sessionIdx))
    {
      p_ctx.eIdx := p_eIdx;
      p_ctx.fsmIdx := p_fsmIdx;
      return true;
    }

    f_EPTF_MQTT_Logging_WARNING(%definitionId &
      ": FSM has not been initialized. The f_MQTT_step_init function must be called as first step in the FSMs using MQTT.");
    return false;
  }

  // Looks up the MQTT session index stored in the app data of FSM context
  // pl_fsmIdx of entity pl_eIdx.
  //  pl_sessionIdx - output: the stored session index, or -1 when the app data
  //                  list has no item at position c_MQTT_AppData_sessionIdx
  // Returns true when the session index is within the bounds of the session
  // DB data list (0 <= index < size).
  function f_EPTF_MQTT_isFsmInitialized(in integer pl_eIdx, in integer pl_fsmIdx, inout integer pl_sessionIdx)
  runs on EPTF_MQTT_LGen_CT
  return boolean
  {
    pl_sessionIdx := -1;

    var EPTF_IntegerList vl_fsmAppData := f_EPTF_LGenBase_getAppDataListOfFsmCtx(pl_eIdx, pl_fsmIdx, v_MQTT_bIdx);
    var boolean vl_hasSessionItem := sizeof(vl_fsmAppData) > c_MQTT_AppData_sessionIdx;

    if (vl_hasSessionItem)
    {
      pl_sessionIdx := vl_fsmAppData[c_MQTT_AppData_sessionIdx];
    }

    return pl_sessionIdx >= 0 and pl_sessionIdx < sizeof(v_MQTT_sessionDB.data);
  }

  // Reads element pl_number of pl_intList into pl_value.
  //  pl_intList - the list to read from
  //  pl_number  - zero based position of the requested element
  //  pl_value   - output: the element; left untouched when false is returned
  // Returns true when pl_number is a valid position in pl_intList, i.e.
  // 0 <= pl_number < sizeof(pl_intList), false otherwise.
  function f_EPTF_MQTT_getIntValue(
    in EPTF_IntegerList pl_intList,
    in integer pl_number,
    inout integer pl_value)
  return boolean
  {
    // A negative position must be rejected as well: it would pass the size
    // check alone and then be used as a list index
    if (pl_number >= 0 and sizeof(pl_intList) > pl_number)
    {
      pl_value := pl_intList[pl_number];
      return true;
    }
    return false;
  }

  // Logs pl_message with log class c_MQTT_LGen_Logging_DEBUGV under the MQTT
  // logging mask. Nothing is logged when c_EPTF_Common_debugSwitch is off.
  // The message is a lazy parameter.
  function f_EPTF_MQTT_Logging_VERBOSE(in @lazy charstring pl_message)
  runs on EPTF_MQTT_LGen_CT
  {
    if (not c_EPTF_Common_debugSwitch) { return; }

    f_EPTF_Logging_debugV2(pl_message, v_MQTT_loggingMaskId, {c_MQTT_LGen_Logging_DEBUGV});
  }

  // Logs pl_message with log class c_MQTT_LGen_Logging_DEBUG under the MQTT
  // logging mask. Nothing is logged when c_EPTF_Common_debugSwitch is off.
  // The message is a lazy parameter.
  function f_EPTF_MQTT_Logging_DEBUG(in @lazy charstring pl_message)
  runs on EPTF_MQTT_LGen_CT
  {
    if (not c_EPTF_Common_debugSwitch) { return; }

    f_EPTF_Logging_debugV2(pl_message, v_MQTT_loggingMaskId, {c_MQTT_LGen_Logging_DEBUG});
  }

  // Logs pl_message with log class c_MQTT_LGen_Logging_WARNING under the MQTT
  // logging mask. The message is a lazy parameter.
  // NOTE(review): warnings go through f_EPTF_Logging_debugV2 and are gated by
  // c_EPTF_Common_debugSwitch, so no warning is emitted when the debug switch
  // is off - confirm that this is intended.
  function f_EPTF_MQTT_Logging_WARNING(in @lazy charstring pl_message)
  runs on EPTF_MQTT_LGen_CT
  {
    if (not c_EPTF_Common_debugSwitch) { return; }

    f_EPTF_Logging_debugV2(pl_message, v_MQTT_loggingMaskId, {c_MQTT_LGen_Logging_WARNING});
  }

  // Logs pl_message with log class c_MQTT_LGen_Logging_ERROR under the MQTT
  // logging mask. The message is a lazy parameter.
  // NOTE(review): errors go through f_EPTF_Logging_debugV2 and are gated by
  // c_EPTF_Common_debugSwitch, so no error is emitted when the debug switch
  // is off - confirm that this is intended.
  function f_EPTF_MQTT_Logging_ERROR(in @lazy charstring pl_message)
  runs on EPTF_MQTT_LGen_CT
  {
    if (not c_EPTF_Common_debugSwitch) { return; }

    f_EPTF_Logging_debugV2(pl_message, v_MQTT_loggingMaskId, {c_MQTT_LGen_Logging_ERROR});
  }

  // Posts MQTT behavior event pl_eventIdx through f_EPTF_LGenBase_postEvent.
  // The target part of the event depends on the indices given:
  //  pl_eIdx < 0                    : target omitted
  //  pl_eIdx >= 0 and pl_fsmCtx < 0 : target is entity pl_eIdx, FSM context omitted
  //  pl_eIdx >= 0 and pl_fsmCtx >= 0: target is FSM context pl_fsmCtx of entity pl_eIdx
  // pl_reportedArgs is passed on as the reported arguments of the event.
  function f_EPTF_MQTT_dispatchEvent(in integer pl_eventIdx, in integer pl_eIdx, in integer pl_fsmCtx, in EPTF_IntegerList pl_reportedArgs)
  runs on EPTF_MQTT_LGen_CT
  {
    if (pl_eIdx < 0)
    {
      // No entity given
      f_EPTF_LGenBase_postEvent(
        {
          { v_MQTT_bIdx, pl_eventIdx, omit, omit },
          pl_reportedArgs
        });
    }
    else if (pl_fsmCtx < 0)
    {
      // Entity given, FSM context not given
      f_EPTF_LGenBase_postEvent(
        {
          { v_MQTT_bIdx, pl_eventIdx, { pl_eIdx, omit }, omit },
          pl_reportedArgs
        });
    }
    else
    {
      // Both entity and FSM context given
      f_EPTF_LGenBase_postEvent(
        {
          { v_MQTT_bIdx, pl_eventIdx, { pl_eIdx, pl_fsmCtx }, omit },
          pl_reportedArgs
        });
    }
  }

  // Maps an integer QoS level to the QoS enumeration:
  //  0 -> AT_MOST_ONCE_DELIVERY, 1 -> AT_LEAST_ONCE_DELIVERY,
  //  2 -> EXACTLY_ONE_DELIVERY, anything else -> RESERVED
  function f_EPTF_MQTT_qos_int2enum(in integer p_qos)
  return QoS
  {
    var QoS vl_qos := RESERVED;

    select (p_qos)
    {
      case (0) { vl_qos := AT_MOST_ONCE_DELIVERY; }
      case (1) { vl_qos := AT_LEAST_ONCE_DELIVERY; }
      case (2) { vl_qos := EXACTLY_ONE_DELIVERY; }
    }

    return vl_qos;
  }

  // Returns true when p_msg is one of the publish response messages:
  // puback, pubrec, pubrel or pubcomp.
  function f_EPTF_MQTT_publishResponseType(in MQTT_v3_1_1_ReqResp p_msg)
  return boolean
  {
    if (ischosen(p_msg.puback)) { return true; }
    if (ischosen(p_msg.pubrec)) { return true; }
    if (ischosen(p_msg.pubrel)) { return true; }
    return ischosen(p_msg.pubcomp);
  }

  // Returns the packet identifier of a publish response message (puback,
  // pubrec, pubrel or pubcomp), or -1 when p_msg is none of these.
  function f_EPTF_MQTT_publishResponsePacketId(in MQTT_v3_1_1_ReqResp p_msg)
  return integer
  {
    var integer vl_packetId := -1;

    if (ischosen(p_msg.puback))       { vl_packetId := p_msg.puback.packet_identifier; }
    else if (ischosen(p_msg.pubrec))  { vl_packetId := p_msg.pubrec.packet_identifier; }
    else if (ischosen(p_msg.pubrel))  { vl_packetId := p_msg.pubrel.packet_identifier; }
    else if (ischosen(p_msg.pubcomp)) { vl_packetId := p_msg.pubcomp.packet_identifier; }

    return vl_packetId;
  }

  // PINGREQ message with header flags '0000'B
  template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_pingReq :=
  {
    pingreq := {
      header := { flags := '0000'B }
    }
  }

  // PUBACK message with header flags '0000'B for packet identifier p_packetId
  template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_PUBACK(in integer p_packetId) :=
  {
    puback := {
      header := { flags := '0000'B },
      packet_identifier := p_packetId
    }
  }

  // PUBREL message for packet identifier p_packetId.
  // Unlike the other publish responses here, its header flags are '0010'B.
  template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_PUBREL(in integer p_packetId) :=
  {
    pubrel := {
      header := { flags := '0010'B },
      packet_identifier := p_packetId
    }
  }

  // PUBREC message with header flags '0000'B for packet identifier p_packetId
  template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_PUBREC(in integer p_packetId) :=
  {
    pubrec := {
      header := { flags := '0000'B },
      packet_identifier := p_packetId
    }
  }

  // PUBCOMP message with header flags '0000'B for packet identifier p_packetId
  template MQTT_v3_1_1_ReqResp t_EPTF_MQTT_PUBCOMP(in integer p_packetId) :=
  {
    pubcomp := {
      header := { flags := '0000'B },
      packet_identifier := p_packetId
    }
  }

}
