blob: 75c869a4656eabfdc392c279c8452488dfebfb22 [file] [log] [blame]
/*****************************************************************************
* Copyright (c) 2018-2020 CEA LIST.
*
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* Contributors:
* Ansgar Radermacher ansgar.radermacher@cea.fr
* Matteo MORELLI matteo.morelli@cea.fr - Bug #566899
*
*****************************************************************************/
package org.eclipse.papyrus.robotics.ros2.codegen.common.launch
import java.util.ArrayList
import java.util.Collections
import java.util.List
import org.eclipse.papyrus.designer.deployment.tools.DepUtils
import org.eclipse.papyrus.designer.languages.common.base.file.IPFileSystemAccess
import org.eclipse.papyrus.robotics.core.utils.PortUtils
import org.eclipse.papyrus.robotics.ros2.preferences.Ros2Distributions
import org.eclipse.papyrus.robotics.ros2.preferences.Ros2PreferenceUtils
import org.eclipse.papyrus.uml.tools.utils.ConnectorUtil
import org.eclipse.uml2.uml.Class
import org.eclipse.uml2.uml.InstanceValue
import org.eclipse.uml2.uml.Port
import org.eclipse.uml2.uml.Property
import static extension org.eclipse.papyrus.robotics.codegen.common.utils.ComponentUtils.isRegistered
import static extension org.eclipse.papyrus.robotics.codegen.common.utils.PackageTools.pkgName
import static extension org.eclipse.papyrus.robotics.codegen.common.utils.TopicUtils.getTopic
import static extension org.eclipse.papyrus.robotics.core.utils.InstanceUtils.*
import static extension org.eclipse.papyrus.robotics.core.utils.InteractionUtils.*
import static extension org.eclipse.papyrus.robotics.ros2.codegen.common.utils.SequencerUtils.*
import static extension org.eclipse.papyrus.robotics.ros2.codegen.common.utils.SkillUtils.*
import org.eclipse.papyrus.designer.languages.common.base.file.ProtSection
/**
* Create a set of launch scripts
*/
class LaunchScript {
/**
* create a launch script for a single component instance (part)
*/
static def createLaunchScriptSingle(Class system, Property part, boolean addSequencer) '''
from launch import LaunchDescription
from launch_ros.actions import «IF part.isLifecycle»Lifecycle«ENDIF»Node
from ament_index_python.packages import get_package_share_directory
share_dir = get_package_share_directory('«system.nearestPackage.pkgName»')
# «ProtSection.protSection("imports")»
# «ProtSection.protSection»
def generate_entity():
# define empty user parameter (eventually redefined in the protected section afterwards)
«IF !addSequencer»
«part.name»_custom_params = {}
«ELSE»
«system.sequencerName»
«ENDIF»
# «ProtSection.protSection("parameters")»
# «ProtSection.protSection»
# Add the actions to the launch description.
«val component = part.type as Class»
«IF !addSequencer»
return «IF part.isLifecycle»Lifecycle«ENDIF»Node(
«IF eloquent»node_«ENDIF»name='«part.name»',
package='«component.nearestPackage.pkgName»', «IF eloquent»node_«ENDIF»executable='«component.name»«IF component.isRegistered»_main«ENDIF»',
namespace='',
remappings=[
«FOR remap : system.remaps(part) SEPARATOR(", ")»«remap»«ENDFOR»
],
parameters=[share_dir+'/launch/cfg/param.yaml', «part.name»_custom_params],
output='screen',
emulate_tty=True # assure that RCLCPP output gets flushed
)
«ELSE»
return LifecycleNode(
«IF eloquent»node_«ENDIF»name='«system.sequencerName»',
package='bt_sequencer', «IF eloquent»node_«ENDIF»executable='bt_sequencer',
namespace='',
parameters=[share_dir+'/launch/cfg/param.yaml'],
output='screen',
emulate_tty=True # assure that RCLCPP output gets flushed
)
«ENDIF»
def generate_launch_description():
# Launch Description
ld = LaunchDescription()
ld.add_entity(generate_entity())
# «ProtSection.protSection("post-launch")»
# «ProtSection.protSection»
return ld
'''
/**
* Create a launch script based on the launch script for individual instances. This script is used for
* individual activation scripts as well as for launh and activation of the whole system
*/
static def createLaunchScript(Class system, List<Property> parts, boolean addSequencer, boolean activate) '''
import launch.actions
import launch_ros
import lifecycle_msgs.msg
import sys
import os.path
from launch_ros.events.lifecycle import ChangeState
from launch import LaunchDescription
from ament_index_python.packages import get_package_share_directory
share_dir = get_package_share_directory('«system.nearestPackage.pkgName»')
# generated scripts are in launch folder
sys.path.append(os.path.join(share_dir, 'launch'))
# load launch scripts for each instance
«FOR part : parts»
import launch_«part.name»
«ENDFOR»
«IF addSequencer»
import launch_«system.sequencerName»
«ENDIF»
# «ProtSection.protSection("imports")»
# «ProtSection.protSection»
def generate_launch_description():
# Launch Description
ld = launch.LaunchDescription()
# call entity creation for each instance
«FOR part : parts»
«part.name»_node = launch_«part.name».generate_entity()
«ENDFOR»
«IF addSequencer»
«system.sequencerName»_node = launch_«system.sequencerName».generate_entity()
«ENDIF»
# now add all obtained entities to the launch description
«FOR part : parts»
ld.add_entity(«part.name»_node)
«ENDFOR»
«IF addSequencer»
ld.add_entity(«system.sequencerName»_node)
«ENDIF»
«IF activate»
# transition to configure after startup
«FOR part : parts»
«IF part.isLifecycle»
«generateConfig(part.name)»
«ENDIF»
«ENDFOR»
# transition to activate, once inactive
«FOR part : parts»
«IF part.isLifecycle»
«generateActivation(part.name)»
«ENDIF»
«ENDFOR»
«ENDIF»
# «ProtSection.protSection("post-launch")»
# «ProtSection.protSection»
return ld
'''
/**
* Generate the commands for node configuration given node name
*/
static def generateConfig(String name) '''
configure_«name» = launch.actions.RegisterEventHandler(
launch.event_handlers.on_process_start.OnProcessStart(
target_action=«name»_node,
on_start=[
launch.actions.EmitEvent(
event=launch_ros.events.lifecycle.ChangeState(
lifecycle_node_matcher=launch.events.matches_action(«name»_node),
transition_id=lifecycle_msgs.msg.Transition.TRANSITION_CONFIGURE
)
)
]
)
)
ld.add_entity(configure_«name»)
'''
/**
* Generate the commands for node activation given node name
*/
static def generateActivation(String name) '''
activate_«name» = launch.actions.RegisterEventHandler(
launch_ros.event_handlers.OnStateTransition(
target_lifecycle_node=«name»_node,
start_state='configuring', goal_state='inactive',
entities=[
launch.actions.EmitEvent(
event=launch_ros.events.lifecycle.ChangeState(
lifecycle_node_matcher=launch.events.matches_action(«name»_node),
transition_id=lifecycle_msgs.msg.Transition.TRANSITION_ACTIVATE
)
)
]
)
)
ld.add_entity(activate_«name»)
'''
/**
* initialize parameter values from IS and the (implicit) system sequencer model
* (In the current implementation, the default skill semantics realization is always selected)
*/
static def createParameterFile(Class system) '''
«FOR part : system.compInstanceList»
«IF part.defaultValue instanceof InstanceValue»
«val is = (part.defaultValue as InstanceValue).instance»
«IF is !== null»
«part.name»:
ros__parameters:
«FOR slot : is.slots»
«IF slot.definingFeature !== null && DepUtils.firstValue(slot) !== null»
«slot.definingFeature.name» : «DepUtils.firstValue(slot).stringValue»
«ENDIF»
«ENDFOR»
«ENDIF»
«ENDIF»
«ENDFOR»
«IF !system.uniqueSkills.nullOrEmpty»
«system.sequencerName»:
ros__parameters:
plugin_lib_names: [
«FOR skill : system.getUniqueSkills SEPARATOR ',
«" \""+skill.realizationFileName»_bt_node"
«ENDFOR»
]
«ENDIF»
'''
/**
* an XML based aunch script (XML)
*/
static def createLaunchScriptXML(Class system) '''
<launch>
«FOR part : system.compInstanceList»
«val component = part.type as Class»
<node pkg="«component.nearestPackage.pkgName»" exec="«component.name»" name="«part.name»" respawn="false" output="screen">
«FOR remap : system.remapsXML(part)»
«remap»
«ENDFOR»
<param from="$(find-pkg-share «component.nearestPackage.pkgName»)/launch/cfg/param.yaml"/>
</node>
«ENDFOR»
</launch>
'''
/**
* Remappings for XML
*/
static def remapsXML(Class system, Property part) {
val remaps = new ArrayList<String>
for (port : PortUtils.getAllPorts(part.type as Class)) {
val oppositeEnd = system.getOpposite(part, port)
if (oppositeEnd !== null) {
val from = port.topic
val to = part.getTopic(port, oppositeEnd)
if (from != to) {
remaps.add('''<remap from="«from»" to="«to»"/>''')
}
}
}
return remaps
}
/**
* Re-mappings for command line (used for debugging a binary)
*/
static def remapsCmd(Class system, Property part) '''
«FOR port : PortUtils.getAllPorts(part.type as Class) SEPARATOR " "»
«val oppositeEnd = system.getOpposite(part, port)»
«IF oppositeEnd !== null»
«val from = port.topic»
«val to = part.getTopic(port, oppositeEnd)»
«IF from != to»
-r «port.topic»:=«part.getTopic(port, oppositeEnd)»
«ENDIF»
«ENDIF»
«ENDFOR»
'''
static def remaps(Class system, Property part) {
val remaps = new ArrayList<String>
for (port : PortUtils.getAllPorts(part.type as Class)) {
val oppositeEnd = system.getOpposite(part, port)
if (oppositeEnd !== null && port.commObject !== null) {
val from = port.topic
val to = part.getTopic(port, oppositeEnd)
if (!from.equals(to)) {
remaps.add('''('«from»', '«to»')''')
}
}
}
return remaps
}
/**
* Return opposite part, based on the assumption that there is a single port
* targeting a port
*/
static def getOpposite(Class system, Property part, Port port) {
for (connector: system.ownedConnectors) {
if (ConnectorUtil.connectsPort(connector, port)) {
val end = ConnectorUtil.connEndNotPart(connector, part)
if (end !== null) {
return end
}
}
}
return null
}
static def generateLaunch(IPFileSystemAccess fileAccess, Class system) {
val parts = system.compInstanceList;
val addSequencer = !system.uniqueSkills.nullOrEmpty
fileAccess.generateFile("launch/launch.py", system.createLaunchScript(parts, addSequencer, false).toString)
fileAccess.generateFile("launch/activate.py", system.createLaunchScript(parts, addSequencer, true).toString)
// create a separate launch file for each part
for (Property part : parts) {
fileAccess.generateFile('''launch/launch_«part.name».py''', system.createLaunchScriptSingle(part, false).toString)
if (part.isLifecycle) {
fileAccess.generateFile('''launch/activate_«part.name».py''', system.createLaunchScript(Collections.singletonList(part), false, true).toString)
}
}
if (isGalactic) {
// generate XML based file
fileAccess.generateFile('''launch/«system.name».launch''', system.createLaunchScriptXML.toString)
}
if (addSequencer) {
// create a separate launch file for the sequencer
fileAccess.generateFile('''launch/launch_«system.sequencerName».py''', system.createLaunchScriptSingle(null, true).toString)
// do not activate the sequencer directly - the user must first specify which behavior wants to be executed
// fileAccess.generateFile('''launch/activate_«system.sequencerName».py''', system.createLaunchScript(Collections.emptyList(), true, true).toString)
}
// create the parameters file
fileAccess.generateFile("launch/cfg/param.yaml", system.createParameterFile().toString)
}
static def isEloquent() {
Ros2PreferenceUtils.rosDistro === Ros2Distributions.ELOQUENT;
}
static def isGalactic() {
Ros2PreferenceUtils.rosDistro === Ros2Distributions.GALACTIC;
}
}