blob: 08ffdf10eecabb25a3e26e8b3a0bb60ee208b8c0 [file] [log] [blame]
/******************************************************************************
* Copyright (c) 2005 The Regents of the University of California.
* This material was produced under U.S. Government contract W-7405-ENG-36
* for Los Alamos National Laboratory, which is operated by the University
* of California for the U.S. Department of Energy. The U.S. Government has
* rights to use, reproduce, and distribute this software. NEITHER THE
* GOVERNMENT NOR THE UNIVERSITY MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR
* ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is modified
* to produce derivative works, such modified software should be clearly
* marked, so as not to confuse it with the version available from LANL.
*
* Additionally, this program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* LA-CC 04-115
*
* Copyright (c) 2005, 2007 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
******************************************************************************/
/******************************************************************************
* NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
* This code best developed and formatted on Linux using indent as follows:
* indent -kr -bap -bli2 -br -brs -cbi2 -ci2 -cli2 -d2 -di2 -fc1 -i2 -l1255
* -lc255 -nbad -nbfde -nbfda -nbc -nsob -nut -sc -nfca ptp_ibmll_proxy.c
* Note that the formatter still screws up a few comments but they are easily
* fixed and the code is so much more readable. It is appreciated if you stay
* with this formatting if you change the code in the repository. Thanks.
******************************************************************************/
#include <pthread.h>
#include "config.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>
#ifdef __linux__
#include <getopt.h>
#endif
#include <errno.h>
#include <pwd.h>
#include <grp.h>
#include <proxy_cmd.h>
#include <proxy.h>
#include <proxy_tcp.h>
#include <proxy_event.h>
#include <proxy_cmd.h>
#include <proxy_msg.h>
#include <handler.h>
#include <signal.h>
#include <list.h>
#include <hash.h>
#include <limits.h>
#include <dlfcn.h>
#include <llapi.h>
/*
 * RTEV_ERROR codes are used internally in the ibmll specific plugin.
 * Every expansion is parenthesized so that a code stays a single value when
 * it appears inside a larger expression (e.g. 2 * RTEV_ERROR_LL_INIT or
 * RTEV_ERROR_CANCEL_JOB - RTEV_ERROR_LL_INIT).
 */
#define RTEV_ERROR_LL_INIT (RTEV_OFFSET + 1000)
#define RTEV_ERROR_LL_FINALIZE (RTEV_OFFSET + 1001)
#define RTEV_ERROR_LL_SUBMIT_JOB (RTEV_OFFSET + 1002)
#define RTEV_ERROR_CANCEL_JOB (RTEV_OFFSET + 1003)
#define DEFAULT_PROXY "tcp"
#define DEFAULT_QUEUE_NAME "default"
#define ATTRIB_NODE_NAME_ID 1
#define ATTRIB_QUEUE_NAME_ID 1
/*
 * The following groups of definitions belong in a global PTP header,
 * not in individual proxies
 */
/*
 * RTEV codes must EXACTLY
 * match org.eclipse.ptp.rtsystem.proxy.event.IProxyRuntimeEvent
 *
 * RTEV_OFFSET is defined after the RTEV_ERROR_* macros that mention it;
 * that is fine because macros are expanded where they are used, not where
 * they are defined.
 */
#define RTEV_OFFSET 200
/*
* Forward declarations of file-local functions: the signal handler, server(),
* the command_* proxy command handlers (which all share the
* (gui_transmission_id, nargs, args) signature and are installed in the
* ibmll_cmds table below) and the message printing helpers.
*/
static RETSIGTYPE ptp_signal_handler(int sig);
static int server(char *name, char *host, int port, char *libpath);
static int command_initialize(int gui_transmission_id, int nargs, char *args[]);
static int command_define_model(int gui_transmission_id, int nargs, char *args[]);
static int command_submit_job(int gui_transmission_id, int nargs, char *args[]);
static int command_cancel_job(int gui_transmission_id, int nargs, char *args[]);
static int command_terminate(int gui_transmission_id, int nargs, char *args[]);
static int command_start_events(int gui_transmission_id, int nargs, char *args[]);
static int command_halt_events(int gui_transmission_id, int nargs, char *args[]);
static void print_message_args(int nargs, char *args[]);
static void print_message(int type, const char *, ...); /* INFO, TRACE & WARNING to stdout, ERROR & FATAL to stderr */
/* Message categories; one is passed as the 'type' argument of print_message(). */
#define INFO_MESSAGE 0
#define TRACE_MESSAGE 1
#define WARNING_MESSAGE 2
#define ERROR_MESSAGE 3
#define FATAL_MESSAGE 4
#define ARGS_MESSAGE 5
/* Per-category message switches; every category starts switched off. */
static int state_trace = 0; /* 0=message off, 1=message on */
static int state_info = 0; /* 0=message off, 1=message on */
static int state_warning = 0; /* 0=message off, 1=message on */
static int state_error = 0; /* 0=message off, 1=message on */
static int state_fatal = 0; /* 0=message off, 1=message on */
static int state_args = 0; /* 0=message off, 1=message on */
static int state_events_active = 0; /* events are off initially; command_start_events sets this to 1 */
static int state_debug_loop = 0; /* debug loop for debugger attach is off by default */
static int state_shutdown_requested = 0; /* shutdown not in progress */
static int state_message_timedate = 1; /* want time date stamp in messages */
static int state_message_threadid = 1; /* want thread id in messages */
static int state_template = 1; /* always write default template at startup */
/* Job command file template location; command_define_model reports the override when it is non-empty, otherwise static_template_name. */
static char *static_template_prefix = "/tmp/PTP_IBMLL_TEMPLATE_";
static char *userid = NULL;
static char static_template_name[256]; /* template path and name */
static char *static_template_override = "";
/* Polling intervals in seconds (per the names); presumably set from the node_polling_min/max and job_polling options - confirm in main(). */
static int max_node_sleep_seconds = 300;
static int min_node_sleep_seconds = 30;
static int job_sleep_seconds = 30;
static char hostname[256];
static int debug_level = 0; /* 0 is off */
/*
* One LoadLeveler cluster, which the proxy presents to PTP as a machine.
* The object also carries the id and state of a queue, and the hash of the
* nodes that belong to the cluster.
*/
struct ClusterObject { /* a LoadLeveler cluster (same as a ptp machine) */
int proxy_generated_cluster_id;
char *cluster_name;
Hash *node_hash; /* NodeObject entries keyed by node name (see NodeObject.node_name) */
int proxy_generated_queue_id;
int cluster_state; /* presumably one of the MY_STATE_* values - TODO confirm */
int queue_state; /* presumably one of the MY_STATE_* values - TODO confirm */
int cluster_is_local;
int node_cleanup_required;
int job_cleanup_required;
};
typedef struct ClusterObject ClusterObject;
/*
* One node of a cluster, plus the helpers that look nodes up and store them.
*/
struct NodeObject { /* a LoadLeveler or ptp node in a cluster (machine) */
int proxy_generated_node_id;
int node_found; /* node found indicator */
int node_state;
char *node_name; /* use the name as the key to the node hash table in the cluster object */
};
typedef struct NodeObject NodeObject;
NodeObject *get_node_in_hash(Hash * node_hash, char *node_name);
void add_node_to_hash(Hash * node_hash, NodeObject * node_object);
/* NOTE(review): lookup and insert work on a Hash but this delete takes a List - confirm this prototype is still used. */
void delete_node_from_list(List * node_list, NodeObject * node_object);
/*
* One job known to the proxy, plus the job list helpers. A job can be looked
* up either by its LoadLeveler step id or by an integer job id.
*/
struct JobObject { /* a LoadLeveler or ptp job in a cluster */
int proxy_generated_job_id;
char *gui_assigned_job_id;
int job_found; /* job found indicator */
int job_state; /* 1=submitted, 2=in queue */
time_t job_submit_time; /* time when submitted */
List *task_list; /* processes running for this job */
LL_STEP_ID ll_step_id;
char *cluster_name;
};
typedef struct JobObject JobObject;
JobObject *get_job_in_list(List * job_list, LL_STEP_ID ll_step_ID);
JobObject *get_job_in_list_from_id(List * job_list, int job_id);
void add_job_to_list(List * job_list, JobObject * job_object);
void delete_job_from_list(List * job_list, JobObject * job_object);
/*
* One task of a job, plus the task list helpers. Lookup is by the machine
* name of the task instance together with the LoadLeveler task id.
*/
struct TaskObject { /* a LoadLeveler or ptp task for job */
int proxy_generated_task_id;
int task_found; /* task found indicator */
int ll_task_id;
int task_state;
char *node_name;
char *node_address;
};
typedef struct TaskObject TaskObject;
TaskObject *get_task_in_list(List * task_list, char *task_instance_machine_name, int ll_task_id);
void add_task_to_list(List * task_list, TaskObject * task_object);
void delete_task_from_list(List * task_list, TaskObject * task_object);
/*
* Forward declarations of the helpers that send events to the GUI front end.
* Each takes the GUI transmission id the event belongs to; the add/change/
* remove families exist for machines, queues, nodes, jobs and tasks.
*/
static void sendErrorEvent(int gui_transmission_id, int type, char *msg);
static void sendJobSubmissionErrorEvent(int gui_transmission_id, char *subid, char *msgtext);
static void sendOkEvent(int gui_transmission_id);
static void sendShutdownEvent(int gui_transmission_id);
static int sendAttrDefIntEvent(int gui_transmission_id, char *id, char *shortname, char *description, int show, int default_value, int llimit, int ulimit);
static int sendAttrDefStringEvent(int gui_transmission_id, char *id, char *shortname, char *description, int show, char *value);
static int sendMachineAddEvent(int gui_transmission_id, ClusterObject * cluster_object);
static int sendQueueAddEvent(int gui_transmission_id, ClusterObject * cluster_object);
static int sendNodeAddEvent(int gui_transmission_id, ClusterObject * cluster_object, NodeObject * node_object);
static int sendNodeChangeEvent(int gui_transmission_id, ClusterObject * cluster_object, NodeObject * node_object);
static int sendNodeRemoveEvent(int gui_transmission_id, ClusterObject * cluster_object, NodeObject * node_object);
static int sendJobAddEvent(int gui_transmission_id, ClusterObject * cluster_object, JobObject * job_object);
static int sendJobChangeEvent(int gui_transmission_id, JobObject * job_object);
static int sendJobRemoveEvent(int gui_transmission_id, JobObject * job_object);
static int sendTaskAddEvent(int gui_transmission_id, ClusterObject * cluster_object, JobObject * job_object, TaskObject * task_object);
static int sendTaskChangeEvent(int gui_transmission_id, JobObject * job_object, TaskObject * task_object);
static int sendTaskRemoveEvent(int gui_transmission_id, JobObject * job_object, TaskObject * task_object);
/* Id generation and the single entry point used to queue an event message for the proxy server. */
static int generate_id(void);
static void enqueue_event_to_proxy_server(proxy_msg * event);
/*
* File-wide state: cluster and job lists, the thread id map used in messages,
* node/job/cluster state codes, signal bookkeeping, the two monitor threads
* and the mutexes.
*/
int main(int argc, char *argv[]);
extern char **environ;
void refresh_cluster_list(); /* refresh static list of clusters */
static List *cluster_list = NULL; /* list of clusters if multicluster (we'll set to single local if none) */
static List *job_list = NULL; /* job list for all clusters (since jobs can move) */
int get_multicluster_status(); /* are we running multicluster or single cluster ? */
static int multicluster_status = -1; /* init to not determined yet - we want 0-local 1-multicluster */
static pthread_t thread_map_table[256] = { 0, 0, 0, 0 }; /* to simplify messages */
static void malloc_check(void *p, const char *function, int line);
int is_substitution_required(char *line);
/* register_thread()/find_thread() work with thread handles; command_start_events prints find_thread() results as the "remapped" thread id. */
int register_thread(pthread_t handle);
int find_thread(pthread_t handle);
/* State codes; presumably the values stored in the *_state fields of the cluster, node, job and task objects - TODO confirm. */
#define MY_STATE_UNKNOWN 0
#define MY_STATE_UP 1
#define MY_STATE_DOWN 2
#define MY_STATE_STOPPED 3
#define MY_STATE_RUNNING 4
#define MY_STATE_IDLE 5
#define MY_STATE_TERMINATED 6
static int ptp_signal_exit;
static int ptp_signal_thread;
static List *events = NULL;
static RETSIGTYPE(*saved_signals[NSIG]) (int); /* one saved handler slot per signal number */
static proxy_svr *ll_proxy = NULL;
/* Monitor threads; both are created detached by command_start_events. */
static pthread_t monitor_LoadLeveler_jobs_thread = 0;
static pthread_attr_t monitor_LoadLeveler_jobs_thread_attr;
static void *monitor_LoadLeveler_jobs(void *args);
static pthread_t monitor_LoadLeveler_nodes_thread = 0;
static pthread_attr_t monitor_LoadLeveler_nodes_thread_attr;
static void *monitor_LoadLeveler_nodes(void *args);
/* Mutexes; what each one protects is presumably indicated by its name - verify against the code that locks it. */
static pthread_mutex_t access_LoadLeveler_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t proxy_svr_queue_msg_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t print_message_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t master_lock = PTHREAD_MUTEX_INITIALIZER;
/*
* Long command line options (only compiled on Linux, where <getopt.h> is
* included). Each entry maps a long option name to a single-character code;
* proxy, port and host require an argument, the rest take an optional one.
* The table ends with the all-zero sentinel entry.
*/
#ifdef __linux__
static struct option longopts[] = {
{"proxy", required_argument, NULL, 'P'},
{"port", required_argument, NULL, 'p'},
{"host", required_argument, NULL, 'h'},
{"lib_override", optional_argument, NULL, 'l'},
{"debug_loop", optional_argument, NULL, 'd'},
{"debug", optional_argument, NULL, 'g'},
{"trace_messages", optional_argument, NULL, 't'},
{"info_messages", optional_argument, NULL, 'i'},
{"warning_messages", optional_argument, NULL, 'w'},
{"error_messages", optional_argument, NULL, 'e'},
{"fatal_messages", optional_argument, NULL, 'f'},
{"args_messages", optional_argument, NULL, 'a'},
{"multicluster", optional_argument, NULL, 'm'},
{"template_override", optional_argument, NULL, 'o'},
{"template_write", optional_argument, NULL, 'r'},
{"node_polling_min", optional_argument, NULL, 'x'},
{"node_polling_max", optional_argument, NULL, 'y'},
{"job_polling", optional_argument, NULL, 'z'},
{NULL, 0, NULL, 0}
};
#endif
/* Proxy server helper functions; this proxy supplies none (both slots NULL). */
static proxy_svr_helper_funcs helper_funcs = {
NULL,
NULL
};
/*
* Command handler table handed to the proxy server through command_tab.
* NOTE(review): the order presumably has to match the command numbering used
* by the proxy protocol - do not reorder without checking proxy_cmd.h.
*/
static proxy_cmd ibmll_cmds[] = {
command_terminate,
command_initialize,
command_define_model,
command_start_events,
command_halt_events,
command_submit_job,
command_cancel_job
};
/* Second member is the number of handlers in ibmll_cmds, third is the table itself. */
static proxy_commands command_tab = {
0, sizeof(ibmll_cmds) / sizeof(proxy_cmd), ibmll_cmds
};
/* LoadLeveler shared library path and dlopen handle; the handle is set by command_initialize. */
static char * ibmll_libpath_name = NULL;
static void * ibmll_libpath_handle = 0;
/* Base id received from the GUI in command_initialize, and its decimal string form. */
static int ibmll_proxy_base_id = 0;
static char ibmll_proxy_base_id_string[256];
static int ibmll_last_id = 0;
/* Transmission id of the start-events command, saved by command_start_events. */
static int start_events_gui_transmission_id = 0;
/**********************************************************************
* LoadLeveler symbols from dynamic load of libllapi.a or libllapi.so *
* *
* command_initialize zeroes this table and fills every slot via *
* dlsym. Note that ll_submit_job is resolved from the symbol *
* "llsubmit" and ll_free_job_info from "llfree_job_info"; all other *
* slots use the symbol with the same name as the member. *
**********************************************************************/
static struct {
LL_element *(*ll_query) (enum QueryType);
int (*ll_set_request) (LL_element *, enum QueryFlags, char **, enum DataFilter);
LL_element *(*ll_get_objs) (LL_element *, enum LL_Daemon, char *, int *, int *);
int (*ll_get_data) (LL_element *, enum LLAPI_Specification, void *);
int (*ll_deallocate) (LL_element *);
LL_element *(*ll_next_obj) (LL_element *);
int (*ll_free_objs) (LL_element *);
int (*ll_cluster) (int, LL_element **, LL_cluster_param *);
int (*ll_submit_job) (char *job_cmd_file, char *monitor_program, char *monitor_arg, LL_job * job_info, int job_version);
int (*ll_terminate_job) (LL_terminate_job_info * ptr);
void (*ll_free_job_info) (LL_job * job_info, int job_version);
char *(*ll_error) (LL_element ** errObj, int print_to);
} LL_SYMS;
/* my_ll_* functions: presumably wrappers that call through the LL_SYMS pointers - see their definitions. */
int my_ll_cluster(int, LL_element **, LL_cluster_param *);
int my_ll_get_data(LL_element *, enum LLAPI_Specification, void *);
LL_element *my_ll_query(enum QueryType);
int my_ll_free_objs(LL_element *);
int my_ll_deallocate(LL_element *);
LL_element *my_ll_next_obj(LL_element *);
LL_element *my_ll_get_objs(LL_element *, enum LL_Daemon, char *, int *, int *);
int my_ll_set_request(LL_element *, enum QueryFlags, char **, enum DataFilter);
int my_ll_submit_job(int gui_transmission_id, char *job_sub_id, char *command_file, char *job_env_vars[][3]);
int my_ll_terminate_job(int gui_transmission_id, JobObject * job_object);
void my_ll_free_job_info(LL_job * job_info);
void print_proxy_message(proxy_msg * msg);
/*-----------------------------------------------------------------------*
* attributes to send to front end in response to the *
* command_define_model call to initialize the tabbed panels. *
* This code is set up to send up blanks as defaults and not return any *
* field with blank back from gui so it will be stripped from job *
* command file. *
*-----------------------------------------------------------------------*/
/*-----------------------------------------------------------------------*
* Strings *
* *
* Sent one by one by command_define_model via sendAttrDefStringEvent. *
* The default_value listed for LL_PTP_JOB_COMMAND_FILE_TEMPLATE is not *
* sent as is: command_define_model substitutes static_template_override *
* when it is non-empty, otherwise static_template_name. *
*-----------------------------------------------------------------------*/
typedef struct {
char *id; /* Attribute identifier */
char *short_name; /* Description used as label in GUI */
char *long_name; /* Text used for tooltip text in GUI */
char *default_value; /* Attribute's default value */
} string_launch_attr;
static string_launch_attr string_launch_attrs[] = {
{"LL_PTP_JOB_COMMAND_FILE", "IBMLLLaunch.LL_PTP_JOB_COMMAND_FILE_LABEL", "IBMLLLaunch.LL_PTP_JOB_COMMAND_FILE_TOOLTIP", ""},
{"LL_PTP_JOB_COMMAND_FILE_TEMPLATE", "IBMLLLaunch.LL_PTP_JOB_COMMAND_FILE_TEMPLATE_LABEL", "IBMLLLaunch.LL_PTP_JOB_COMMAND_FILE_TEMPLATE_TOOLTIP", "/tmp/PTP_IBMLL_TEMPLATE_userid"},
{"LL_PTP_CLASS", "IBMLLLaunch.LL_PTP_CLASS_LABEL", "IBMLLLaunch.LL_PTP_CLASS_TOOLTIP", "No_Class"},
{"LL_PTP_COMMENT", "IBMLLLaunch.LL_PTP_COMMENT_LABEL", "IBMLLLaunch.LL_PTP_COMMENT_TOOLTIP", ""},
{"LL_PTP_ERROR", "IBMLLLaunch.LL_PTP_ERROR_LABEL", "IBMLLLaunch.LL_PTP_ERROR_TOOLTIP", "/dev/null"},
{"LL_PTP_EXECUTABLE", "IBMLLLaunch.LL_PTP_EXECUTABLE_LABEL", "IBMLLLaunch.LL_PTP_EXECUTABLE_TOOLTIP", ""},
{"LL_PTP_ENVIRONMENT", "IBMLLLaunch.LL_PTP_ENVIRONMENT_LABEL", "IBMLLLaunch.LL_PTP_ENVIRONMENT_TOOLTIP", "COPY_ALL"},
{"LL_PTP_INPUT", "IBMLLLaunch.LL_PTP_INPUT_LABEL", "IBMLLLaunch.LL_PTP_INPUT_TOOLTIP", "/dev/null"},
{"LL_PTP_OUTPUT", "IBMLLLaunch.LL_PTP_OUTPUT_LABEL", "IBMLLLaunch.LL_PTP_OUTPUT_TOOLTIP", "/dev/null"},
{"LL_PTP_INITIALDIR", "IBMLLLaunch.LL_PTP_INITIALDIR_LABEL", "IBMLLLaunch.LL_PTP_INITIALDIR_TOOLTIP", ""},
{"LL_PTP_JOB_NAME", "IBMLLLaunch.LL_PTP_JOB_NAME_LABEL", "IBMLLLaunch.LL_PTP_JOB_NAME_TOOLTIP", ""},
{"LL_PTP_NETWORK_MPI", "IBMLLLaunch.LL_PTP_NETWORK_MPI_LABEL", "IBMLLLaunch.LL_PTP_NETWORK_MPI_TOOLTIP", ""},
{"LL_PTP_NETWORK_LAPI", "IBMLLLaunch.LL_PTP_NETWORK_LAPI_LABEL", "IBMLLLaunch.LL_PTP_NETWORK_LAPI_TOOLTIP", ""},
{"LL_PTP_NETWORK_MPI_LAPI", "IBMLLLaunch.LL_PTP_NETWORK_MPI_LAPI_LABEL", "IBMLLLaunch.LL_PTP_NETWORK_MPI_LAPI_TOOLTIP", ""},
{"LL_PTP_REQUIREMENTS", "IBMLLLaunch.LL_PTP_REQUIREMENTS_LABEL", "IBMLLLaunch.LL_PTP_REQUIREMENTS_TOOLTIP", ""},
{"LL_PTP_RESOURCES", "IBMLLLaunch.LL_PTP_RESOURCES_LABEL", "IBMLLLaunch.LL_PTP_RESOURCES_TOOLTIP", ""},
{"LL_PTP_SHELL", "IBMLLLaunch.LL_PTP_SHELL_LABEL", "IBMLLLaunch.LL_PTP_SHELL_TOOLTIP", ""},
{"LL_PTP_TASK_GEOMETRY", "IBMLLLaunch.LL_PTP_TASK_GEOMETRY_LABEL", "IBMLLLaunch.LL_PTP_TASK_GEOMETRY_TOOLTIP", ""},
{"LL_PTP_NODE_MIN", "IBMLLLaunch.LL_PTP_NODE_MIN_LABEL", "IBMLLLaunch.LL_PTP_NODE_MIN_TOOLTIP", ""},
{"LL_PTP_NODE_MAX", "IBMLLLaunch.LL_PTP_NODE_MAX_LABEL", "IBMLLLaunch.LL_PTP_NODE_MAX_TOOLTIP", ""},
{"LL_PTP_BLOCKING", "IBMLLLaunch.LL_PTP_BLOCKING_LABEL", "IBMLLLaunch.LL_PTP_BLOCKING_TOOLTIP", ""},
{"LL_PTP_TOTAL_TASKS", "IBMLLLaunch.LL_PTP_TOTAL_TASKS_LABEL", "IBMLLLaunch.LL_PTP_TOTAL_TASKS_TOOLTIP", ""},
{"LL_PTP_WALLCLOCK_HARD", "IBMLLLaunch.LL_PTP_WALLCLOCK_HARD_LABEL", "IBMLLLaunch.LL_PTP_WALLCLOCK_HARD_TOOLTIP", "00:00:00"},
{"LL_PTP_WALLCLOCK_SOFT", "IBMLLLaunch.LL_PTP_WALLCLOCK_SOFT_LABEL", "IBMLLLaunch.LL_PTP_WALLCLOCK_SOFT_TOOLTIP", "00:00:00"}
};
/*-----------------------------------------------------------------------*
* Enums *
* *
* command_define_model splits the 'enums' string on '|' and sends each *
* piece as one enumeration value after the default value. *
*-----------------------------------------------------------------------*/
typedef struct {
char *id; /* Attribute identifier */
char *short_name; /* Description used as label in GUI */
char *long_name; /* Text used for tooltip text in GUI */
char *default_value; /* Attribute's default value */
char *enums; /* Enumeration values '|' delimited */
} enum_launch_attr;
static enum_launch_attr enum_attrs[] = {
{"LL_PTP_JOB_TYPE", "IBMLLLaunch.LL_PTP_JOB_TYPE_LABEL", "IBMLLLaunch.LL_PTP_JOB_TYPE_TOOLTIP", "Parallel", "Serial|Parallel|MPICH"},
{"LL_PTP_BULK_XFER", "IBMLLLaunch.LL_PTP_BULK_XFER_LABEL", "IBMLLLaunch.LL_PTP_BULK_XFER_TOOLTIP", "(LoadLeveler default)", "(LoadLeveler default)|YES|NO"},
{"LL_PTP_LARGE_PAGE", "IBMLLLaunch.LL_PTP_LARGE_PAGE_LABEL", "IBMLLLaunch.LL_PTP_LARGE_PAGE_TOOLTIP", "(LoadLeveler default)", "(LoadLeveler default)|Y|M|N"},
{"LL_PTP_SUBMIT_MODE", "IBMLLLaunch.LL_PTP_SUBMIT_MODE_LABEL", "IBMLLLaunch.LL_PTP_SUBMIT_MODE_TOOLTIP", "Advanced", "Advanced|Basic"}
};
/*-----------------------------------------------------------------------*
* Ints *
* *
* Sent by command_define_model via sendAttrDefIntEvent together with *
* the default value and the lower/upper limits. *
*-----------------------------------------------------------------------*/
typedef struct {
char *id; /* Attribute identifier */
char *short_name; /* Description used as label in GUI */
char *long_name; /* Text used for tooltip text in GUI */
int default_value; /* Attribute's default value */
int llimit; /* Attribute's lower limit (min) */
int ulimit; /* Attribute's upper limit (max) */
} int_launch_attr;
/* NOTE(review): unlike the string and enum tables this one is not static - confirm whether external linkage is intended. */
int_launch_attr int_attrs[] = {
{"LL_PTP_TASKS_PER_NODE", "IBMLLLaunch.LL_PTP_TASKS_PER_NODE_LABEL", "IBMLLLaunch.LL_PTP_TASKS_PER_NODE_TOOLTIP", 0,0,INT_MAX}
};
/*-----------------------------------------------------------------------*
* Longs *
* *
* Sent by command_define_model as "BIGINTEGER" attribute definitions, *
* with default and limits formatted as decimal strings. The table *
* currently holds a single place holder entry. *
*-----------------------------------------------------------------------*/
typedef struct {
char *id; /* Attribute identifier */
char *short_name; /* Description used as label in GUI */
char *long_name; /* Text used for tooltip text in GUI */
long long default_value; /* Attribute's default value */
long long llimit; /* Attribute's lower limit (min) */
long long ulimit; /* Attribute's upper limit (max) */
} long_int_launch_attr;
/* NOTE(review): not static, unlike the string and enum tables - confirm whether external linkage is intended. */
long_int_launch_attr long_int_attrs[] = {
{"PlaceHolder-long", "???", "Place holder:", 0, 0, 0x7fffffffffffffffLL}
};
/*************************************************************************
* Job Command File Template *
* *
* One array element per line of the default LoadLeveler job command *
* file. Text of the form <<<NAME>>> is a placeholder; presumably it is *
* replaced by the value of the attribute NAME when a job is submitted *
* (see is_substitution_required) - TODO confirm in command_submit_job. *
* Keywords marked "#(NOT SUPPORTED)#" carry no placeholder. *
*************************************************************************/
static char *job_command_file_template[] = {
"#!/bin/sh",
"# @ account_no = <<<LL_PTP_ACCOUNT_NO>>>",
"# @ arguments = <<<progArgs>>>",
"#(NOT SUPPORTED)# @ bg_connection",
"#(NOT SUPPORTED)# @ bg_partition",
"#(NOT SUPPORTED)# @ bg_requirements",
"#(NOT SUPPORTED)# @ bg_route",
"#(NOT SUPPORTED)# @ bg_shape",
"#(NOT SUPPORTED)# @ bg_size",
"# @ blocking = <<<LL_PTP_BLOCKING>>>",
/* NOTE(review): the enum attribute defined in this file is LL_PTP_BULK_XFER, but this placeholder is spelled LL_PTP_BULKXFER - confirm which spelling the front end sends. */
"# @ bulkxfer = <<<LL_PTP_BULKXFER>>>",
"# @ checkpoint = <<<LL_PTP_CHECKPOINT>>>",
"# @ ckpt_dir = <<<LL_PTP_CKPT_DIR>>>",
"# @ ckpt_execute_dir = <<<LL_PTP_CKPT_EXECUTE_DIR>>>",
"# @ ckpt_file = <<<LL_PTP_CKPT_FILE>>>",
"# @ ckpt_time_limit = <<<LL_PTP_CKPT_TIME_LIMIT_HARD>>>,<<<LL_PTP_CKPT_TIME_LIMIT_SOFT>>>",
"# @ class = <<<LL_PTP_CLASS>>>",
"# @ cluster_input_file = <<<LL_PTP_CLUSTER_INPUT_FILE_1>>>",
"# @ cluster_input_file = <<<LL_PTP_CLUSTER_INPUT_FILE_2>>>",
"# @ cluster_input_file = <<<LL_PTP_CLUSTER_INPUT_FILE_3>>>",
"# @ cluster_list = <<<LL_PTP_CLUSTER_LIST>>>",
"# @ cluster_output_file = <<<LL_PTP_CLUSTER_OUTPUT_FILE_1>>>",
"# @ cluster_output_file = <<<LL_PTP_CLUSTER_OUTPUT_FILE_2>>>",
"# @ cluster_output_file = <<<LL_PTP_CLUSTER_OUTPUT_FILE_3>>>",
"# @ comment = <<<LL_PTP_COMMENT>>>",
"# @ core_limit = <<<LL_PTP_CORE_LIMIT_HARD>>>,<<<LL_PTP_CORE_LIMIT_SOFT>>>",
"#(NOT SUPPORTED)# @ coschedule",
"# @ cpu_limit = <<<LL_PTP_CPU_LIMIT_HARD>>>,<<<LL_PTP_CPU_LIMIT_SOFT>>>",
"# @ data_limit = <<<LL_PTP_DATA_LIMIT_HARD>>>,<<<LL_PTP_DATA_LIMIT_SOFT>>>",
"#(NOT SUPPORTED)# @ dependency",
"# @ env_copy = <<<LL_PTP_ENV_COPY>>>",
"# @ environment = <<<LL_PTP_ENVIRONMENT>>>",
"# @ error = <<<LL_PTP_ERROR>>>",
/* executable and initialdir each appear twice, once with the generic launch placeholders and once with the LL_PTP_* attribute. */
"# @ executable = <<<execPath>>>/<<<execName>>>",
"# @ executable = <<<LL_PTP_EXECUTABLE>>>",
"# @ file_limit = <<<LL_PTP_FILE_LIMIT_HARD>>>,<<<LL_PTP_FILE_LIMIT_SOFT>>>",
"# @ group = <<<LL_PTP_GROUP>>>",
"#(NOT SUPPORTED)# @ hold",
"# @ image_size = <<<LL_PTP_IMAGE_SIZE>>>",
"# @ initialdir = <<<workingDir>>>",
"# @ initialdir = <<<LL_PTP_INITIALDIR>>>",
"# @ input = <<<LL_PTP_INPUT>>>",
"# @ job_cpu_limit = <<<LL_PTP_JOB_CPU_LIMIT_HARD>>>, <<<LL_PTP_JOB_CPU_LIMIT_SOFT>>>",
"# @ job_name = <<<LL_PTP_JOB_NAME>>>",
"# @ job_type = <<<LL_PTP_JOB_TYPE>>>",
"# @ large_page = <<<LL_PTP_LARGE_PAGE>>>",
"#(NOT SUPPORTED)# @ max_processors",
"# @ mcm_affinity_options = <<<LL_PTP_MCM_AFFINITY_OPTIONS>>>",
"#(NOT SUPPORTED)# @ min_processors",
"# @ network.MPI = <<<LL_PTP_NETWORK_MPI>>>",
"# @ network.LAPI = <<<LL_PTP_NETWORK_LAPI>>>",
"# @ network.MPI_LAPI = <<<LL_PTP_NETWORK_MPI_LAPI>>>",
"# @ node = <<<LL_PTP_NODE_MIN>>>,<<<LL_PTP_NODE_MAX>>>",
"# @ node_usage = <<<LL_PTP_NODE_USAGE>>>",
"# @ notification = <<<LL_PTP_NOTIFICATION>>>",
"# @ notify_user = <<<LL_PTP_NOTIFY_USER>>>",
"# @ output = <<<LL_PTP_OUTPUT>>>",
"# @ preferences = <<<LL_PTP_PREFERENCES>>>",
"# @ requirements = <<<LL_PTP_REQUIREMENTS>>>",
"# @ resources = <<<LL_PTP_RESOURCES>>>",
"# @ restart = <<<LL_PTP_RESTART>>>",
"# @ restart_from_ckpt = <<<LL_PTP_RESTART_FROM_CKPT>>>",
"#(NOT SUPPORTED)# @ restart_on_same_nodes",
"# @ rset = <<<LL_PTP_RSET>>>",
"# @ shell = <<<LL_PTP_SHELL>>>",
"# @ smt = <<<LL_PTP_SMT>>>",
"# @ stack_limit = <<<LL_PTP_STACK_LIMIT_HARD>>>,<<<LL_PTP_STACK_LIMIT_SOFT>>>",
"# @ start_date = <<<LL_PTP_START_DATE>>>",
"#(NOT SUPPORTED)# @ step_name",
"# @ task_geometry = <<<LL_PTP_TASK_GEOMETRY>>>",
"# @ tasks_per_node = <<<LL_PTP_TASKS_PER_NODE>>>",
"# @ total_tasks = <<<LL_PTP_TOTAL_TASKS>>>",
"# @ user_priority = <<<LL_PTP_USER_PRIORITY>>>",
"# @ wall_clock_limit = <<<LL_PTP_WALLCLOCK_HARD>>>,<<<LL_PTP_WALLCLOCK_SOFT>>>",
"# @ queue"
};
/*************************************************************************
* Proxy command handler - command_initialize *
* *
* Dynamically load LoadLeveler lib (libllapi.a for AIX or *
* libllapi.so for Linux) then resolve the functions we need to *
* call to talk to LoadLeveler (submit, cancel or query jobs, *
* query machines, ...) *
* *
* See llapi.h for interfaces to LoadLeveler. *
*************************************************************************/
int command_initialize(int gui_transmission_id, int nargs, char *args[])
{
int i;
int dlopen_mode = 0;
int my_errno = 0;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
print_message_args(nargs, args);
for (i = 0; i < nargs; i++) {
if (proxy_test_attribute(BASE_ID_ATTR, args[i])) {
ibmll_proxy_base_id = proxy_get_attribute_value_int(args[i]); /* the base id established by gui front end */
}
}
sprintf(ibmll_proxy_base_id_string, "%d", ibmll_proxy_base_id);
memset(&LL_SYMS, '\0', sizeof(LL_SYMS)); /* zero the LoadLeveler dlsym symbol table */
print_message(INFO_MESSAGE, "dlopen LoadLeveler shared library %s.\n", ibmll_libpath_name);
dlopen_mode = RTLD_LOCAL | RTLD_NOW;
#ifdef _AIX
dlopen_mode = dlopen_mode | RTLD_MEMBER;
#endif
ibmll_libpath_handle = dlopen(ibmll_libpath_name, dlopen_mode);
my_errno = errno;
if (ibmll_libpath_handle == NULL) {
print_message(ERROR_MESSAGE, "dlopen of %s failed with errno=%d.\n", ibmll_libpath_name, my_errno);
sendErrorEvent(gui_transmission_id, RTEV_ERROR_LL_INIT, "dlopen failed for LoadLeveler shared library");
return PROXY_RES_ERR;
} else {
print_message(INFO_MESSAGE, "dlopen %s successful.\n", ibmll_libpath_name);
}
print_message(INFO_MESSAGE, "Locating LoadLeveler functions via dlsym.\n");
*(void **) (&(LL_SYMS.ll_query)) = dlsym(ibmll_libpath_handle, "ll_query");
*(void **) (&LL_SYMS.ll_set_request) = dlsym(ibmll_libpath_handle, "ll_set_request");
*(void **) (&LL_SYMS.ll_get_objs) = dlsym(ibmll_libpath_handle, "ll_get_objs");
*(void **) (&LL_SYMS.ll_get_data) = dlsym(ibmll_libpath_handle, "ll_get_data");
*(void **) (&LL_SYMS.ll_free_objs) = dlsym(ibmll_libpath_handle, "ll_free_objs");
*(void **) (&LL_SYMS.ll_deallocate) = dlsym(ibmll_libpath_handle, "ll_deallocate");
*(void **) (&LL_SYMS.ll_next_obj) = dlsym(ibmll_libpath_handle, "ll_next_obj");
*(void **) (&LL_SYMS.ll_cluster) = dlsym(ibmll_libpath_handle, "ll_cluster");
*(void **) (&LL_SYMS.ll_submit_job) = dlsym(ibmll_libpath_handle, "llsubmit");
*(void **) (&LL_SYMS.ll_terminate_job) = dlsym(ibmll_libpath_handle, "ll_terminate_job");
*(void **) (&LL_SYMS.ll_free_job_info) = dlsym(ibmll_libpath_handle, "llfree_job_info");
*(void **) (&LL_SYMS.ll_error) = dlsym(ibmll_libpath_handle, "ll_error");
if ((LL_SYMS.ll_query == NULL) || (LL_SYMS.ll_set_request == NULL)
|| (LL_SYMS.ll_get_objs == NULL) || (LL_SYMS.ll_get_data == NULL)
|| (LL_SYMS.ll_free_objs == NULL)
|| (LL_SYMS.ll_deallocate == NULL)
|| (LL_SYMS.ll_cluster == NULL)
|| (LL_SYMS.ll_next_obj == NULL)
|| (LL_SYMS.ll_free_job_info == NULL)
|| (LL_SYMS.ll_terminate_job == NULL)
|| (LL_SYMS.ll_error == NULL)
|| (LL_SYMS.ll_submit_job == NULL)) {
print_message(ERROR_MESSAGE, "One or more LoadLeveler symbols could not be located in %s.\n", ibmll_libpath_name);
sendErrorEvent(gui_transmission_id, RTEV_ERROR_LL_INIT, "LoadLeveler symbols not located");
return PROXY_RES_ERR;
} else {
print_message(INFO_MESSAGE, "Successfully located all of the required LoadLeveler functions via dlsym.\n");
}
sendOkEvent(gui_transmission_id);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return PROXY_RES_OK;
}
/*************************************************************************
* Proxy command handler - command_define_model *
* *
* Send attributes to front end describing the launch *
* configuration (labels, default values, etc). *
*************************************************************************/
int command_define_model(int gui_transmission_id, int nargs, char *args[])
{
int i = 0;
char proxy_generated_attribute_string[256];
proxy_msg *msg;
char *cp;
char *end_cp;
char *cp_save;
int n;
/*
* Send the attribute definitions, launch attribute definitions,
* and element definitions known by this proxy to the GUI.
*/
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
print_message_args(nargs, args);
/*-----------------------------------------------------------------------*
* send strings *
*-----------------------------------------------------------------------*/
for (i = 0; i < (sizeof(string_launch_attrs) / sizeof(string_launch_attr)); i++) {
if (strcmp("LL_PTP_JOB_COMMAND_FILE_TEMPLATE", string_launch_attrs[i].id) == 0) {
if (strlen(static_template_override) > 0) {
sendAttrDefStringEvent(gui_transmission_id, string_launch_attrs[i].id, string_launch_attrs[i].short_name, string_launch_attrs[i].long_name, 0, static_template_override);
} else {
sendAttrDefStringEvent(gui_transmission_id, string_launch_attrs[i].id, string_launch_attrs[i].short_name, string_launch_attrs[i].long_name, 0, static_template_name);
}
} else {
sendAttrDefStringEvent(gui_transmission_id, string_launch_attrs[i].id, string_launch_attrs[i].short_name, string_launch_attrs[i].long_name, 0, string_launch_attrs[i].default_value);
}
}
/*-----------------------------------------------------------------------*
* send ints *
*-----------------------------------------------------------------------*/
for (i = 0; i < (sizeof(int_attrs) / sizeof(int_launch_attr)); i++) {
sendAttrDefIntEvent(gui_transmission_id, int_attrs[i].id, int_attrs[i].short_name, int_attrs[i].long_name, 0, int_attrs[i].default_value, int_attrs[i].llimit, int_attrs[i].ulimit);
}
/*-----------------------------------------------------------------------*
* send longs *
*-----------------------------------------------------------------------*/
for (i = 0; i < (sizeof(long_int_attrs) / sizeof(long_int_launch_attr)); i++) {
msg = new_proxy_msg(PROXY_EV_RT_ATTR_DEF, gui_transmission_id);
proxy_msg_add_int(msg, 1);
proxy_msg_add_int(msg, 8);
proxy_msg_add_string(msg, long_int_attrs[i].id);
proxy_msg_add_string(msg, "BIGINTEGER");
proxy_msg_add_string(msg, long_int_attrs[i].short_name);
proxy_msg_add_string(msg, long_int_attrs[i].long_name);
proxy_msg_add_int(msg, 0);
sprintf(proxy_generated_attribute_string, "%lld", long_int_attrs[i].default_value);
proxy_msg_add_string(msg, proxy_generated_attribute_string);
sprintf(proxy_generated_attribute_string, "%lld", long_int_attrs[i].llimit);
proxy_msg_add_string(msg, proxy_generated_attribute_string);
sprintf(proxy_generated_attribute_string, "%lld", long_int_attrs[i].ulimit);
proxy_msg_add_string(msg, proxy_generated_attribute_string);
enqueue_event_to_proxy_server(msg);
}
/*-----------------------------------------------------------------------*
* send enums *
*-----------------------------------------------------------------------*/
for (i = 0; i < (sizeof(enum_attrs) / sizeof(enum_launch_attr)); i++) {
/*
* Count the number of enumerations in the enum list. Enumerations are
* delimited by '|' so number of enumerations is number of '|' + 1
*/
cp = enum_attrs[i].enums;
n = 1;
while (*cp != '\0') {
if ((*cp) == '|') {
n = n + 1;
}
cp = cp + 1;
}
/*
* Create the enumeration attribute definition header
*/
msg = new_proxy_msg(PROXY_EV_RT_ATTR_DEF, gui_transmission_id);
proxy_msg_add_int(msg, 1);
proxy_msg_add_int(msg, n + 6);
proxy_msg_add_string(msg, enum_attrs[i].id);
proxy_msg_add_string(msg, "ENUMERATED");
proxy_msg_add_string(msg, enum_attrs[i].short_name);
proxy_msg_add_string(msg, enum_attrs[i].long_name);
proxy_msg_add_int(msg, 0);
proxy_msg_add_string(msg, enum_attrs[i].default_value);
/*
* Append enumerations to message. Since enumerations string is
* a string literal that is illegal to modify, create a working copy
*/
cp = strdup(enum_attrs[i].enums);
cp_save = cp;
for (;;) {
end_cp = strchr(cp, '|');
if (end_cp == NULL) {
proxy_msg_add_string(msg, cp);
break;
} else {
*end_cp = '\0';
proxy_msg_add_string(msg, cp);
cp = end_cp + 1;
}
}
free(cp_save);
/*
* Send the attribute definition
*/
enqueue_event_to_proxy_server(msg);
}
/*-----------------------------------------------------------------------*
* done defining the model *
*-----------------------------------------------------------------------*/
sendOkEvent(gui_transmission_id);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return PROXY_RES_OK;
}
/*************************************************************************
* Proxy command handler - command_start_events *
* *
* In the LoadLeveler world a PTP machine is a cluster - so here we *
*************************************************************************/
int command_start_events(int gui_transmission_id, int nargs, char *args[])
{
/*
* Send the complete machine state to the GUI. Query LoadLeveler
* to get the set of nodes that are part of the cluster (machine)
* and send new node events to the GUI for each node.
*/
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
print_message_args(nargs, args);
state_events_active = 1; /* events are now active */
start_events_gui_transmission_id = gui_transmission_id;
/* Create thread to monitor LoadLeveler clusters of machines (these are nodes in a machine in ptp lingo). */
pthread_attr_init(&monitor_LoadLeveler_nodes_thread_attr);
pthread_attr_setdetachstate(&monitor_LoadLeveler_nodes_thread_attr, PTHREAD_CREATE_DETACHED);
pthread_create(&monitor_LoadLeveler_nodes_thread, &monitor_LoadLeveler_nodes_thread_attr, monitor_LoadLeveler_nodes, NULL);
register_thread(monitor_LoadLeveler_nodes_thread);
/* Create thread to monitor LoadLeveler jobs in clusters. */
pthread_attr_init(&monitor_LoadLeveler_jobs_thread_attr);
pthread_attr_setdetachstate(&monitor_LoadLeveler_jobs_thread_attr, PTHREAD_CREATE_DETACHED);
pthread_create(&monitor_LoadLeveler_jobs_thread, &monitor_LoadLeveler_jobs_thread_attr, monitor_LoadLeveler_jobs, NULL);
register_thread(monitor_LoadLeveler_jobs_thread);
print_message(INFO_MESSAGE, "remapped thread id for nodes is %ul->%i and for jobs is %ul->%i\n", monitor_LoadLeveler_nodes_thread, find_thread(monitor_LoadLeveler_nodes_thread), monitor_LoadLeveler_jobs_thread, find_thread(monitor_LoadLeveler_jobs_thread));
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return PROXY_RES_OK;
}
/*************************************************************************
* Proxy command handler - command_submit_job *
* *
* In the proxy we are accepting environment variables to be passed on *
* to submit. These come in as env=VARNAME=VALUE *
* *
* We also accept a finite number of variables to control the cmd file *
* submit. This is the name of the command file and optionally a hex *
* string to be written to the specified command file. *
* Hex string only -> write into cmd file in tmp and submit *
* Cmd file only -> submit *
* Hex string and cmd file -> write into specified cmd file and submit *
* It is expected that all other variables are processed in the gui and *
* substituted into a template job command file. *
*************************************************************************/
/*
 * Split a "keyword=value" string at its first '=' into two newly allocated
 * strings. Returns 0 on success. Returns -1 (with *keyword and *value set to
 * NULL) when there is no '=' or memory could not be allocated.
 */
static int ll_submit_split_pair(const char *pair, char **keyword, char **value)
{
  char *key_copy = NULL;
  char *value_copy = NULL;
  char *separator = NULL;

  *keyword = NULL;
  *value = NULL;
  if (pair == NULL) {
    return -1;
  }
  key_copy = strdup(pair);
  if (key_copy == NULL) {
    return -1;
  }
  separator = strchr(key_copy, '=');
  if (separator == NULL) {
    free(key_copy);
    return -1;
  }
  *separator = '\0';            /* key_copy now holds only the keyword */
  value_copy = strdup(separator + 1);
  if (value_copy == NULL) {
    free(key_copy);
    return -1;
  }
  *keyword = key_copy;
  *value = value_copy;
  return 0;
}

/*
 * Copy line into out, replacing every <<<keyword>>> whose keyword equals
 * vars[k][0] with vars[k][1]. Text that does not match is copied unchanged.
 * Returns 0 on success, -1 when the result does not fit in out_size bytes
 * (out is always left NUL terminated when out_size > 0).
 */
static int ll_submit_expand_line(const char *line, char *out, size_t out_size, char *vars[][2], int var_count)
{
  size_t line_len = strlen(line);
  size_t in_pos = 0;
  size_t out_pos = 0;
  int k = 0;

  if (out_size == 0) {
    return -1;
  }
  while (in_pos < line_len) {
    int replaced = 0;

    /* A token needs at least "<<<" + ">>>" = 6 characters of remaining line. */
    if ((line_len - in_pos >= 6) && (strncmp(&line[in_pos], "<<<", 3) == 0)) {
      for (k = 0; k < var_count; k++) {
        size_t key_len = strlen(vars[k][0]);

        if ((strncmp(&line[in_pos + 3], vars[k][0], key_len) == 0)
            && (strncmp(&line[in_pos + 3 + key_len], ">>>", 3) == 0)) {
          size_t value_len = strlen(vars[k][1]);

          if (value_len > out_size - 1 - out_pos) {
            out[out_pos] = '\0';
            return -1;
          }
          memcpy(&out[out_pos], vars[k][1], value_len);
          in_pos += 6 + key_len;
          out_pos += value_len;
          replaced = 1;
          break;
        }
      }
    }
    if (replaced == 0) {
      if (out_pos >= out_size - 1) {
        out[out_pos] = '\0';
        return -1;
      }
      out[out_pos++] = line[in_pos++];  /* copy 1 byte over to new line */
    }
  }
  out[out_pos] = '\0';
  return 0;
}

/*
 * Write all len bytes of buf to fd, continuing after short writes and
 * EINTR. Returns 0 on success, -1 on failure (errno is left as set by
 * write when write reported an error).
 */
static int ll_submit_write_all(int fd, const char *buf, size_t len)
{
  while (len > 0) {
    ssize_t written = write(fd, buf, len);

    if (written < 0) {
      if (errno == EINTR) {
        continue;
      }
      return -1;
    }
    if (written == 0) {
      return -1;                /* no progress - do not spin */
    }
    buf += written;
    len -= (size_t) written;
  }
  return 0;
}

/*
 * Proxy command handler - submit a job to LoadLeveler.
 *
 * args is scanned until its NULL terminator. Recognized arguments:
 *   LL_PTP_JOB_COMMAND_FILE=<path>           job command file (advanced mode)
 *   LL_PTP_JOB_COMMAND_FILE_TEMPLATE=<path>  template file (basic mode)
 *   JOB_SUB_ID_ATTR=<id>                     job submission id
 *   JOB_ENV_ATTR=<name>=<value>              environment variable for the job
 *   LL_PTP_*, workingDir*, progArgs*, execName*, execPath* (keyword=value)
 *                                            substitution variables
 * LL_PTP_SUBMIT_MODE=Basic selects basic mode; otherwise advanced mode is
 * used.
 *
 * Advanced mode submits the job command file as is. Basic mode copies the
 * template to a temporary file, replacing <<<keyword>>> tokens with the
 * matching values and dropping lines for which is_substitution_required()
 * still reports a pending substitution, then submits the temporary file and
 * deletes it.
 *
 * An OK event is sent when the submit succeeded, otherwise a job submission
 * error event is sent. The function always returns PROXY_RES_OK.
 */
int command_submit_job(int gui_transmission_id, int nargs, char *args[])
{
  int rc = 0;
  char *job_sub_id = NULL;
  char *ll_job_command_file = NULL;
  char *ll_job_command_file_template = NULL;
  char *job_env_vars[1024][3];  /* allow 1024 env vars keyword, new value, old value */
  char *ll_ptp_vars[1024][2];   /* allow 1024 ll vars keyword, value */
  /*
   * One env slot is kept zeroed at the end of the table.
   * NOTE(review): presumably my_ll_submit_job stops at the first NULL
   * keyword - confirm against its implementation.
   */
  const int max_env_vars = (int) (sizeof(job_env_vars) / sizeof(job_env_vars[0])) - 1;
  const int max_ll_vars = (int) (sizeof(ll_ptp_vars) / sizeof(ll_ptp_vars[0]));
  int i = 0;
  int k = 0;
  int env_count = 0;
  int ll_count = 0;
  char incoming[2048];
  char outgoing[2048];
  char submit_temp_file[256];
  int submit_temp_file_fd = -1;
  int submit_temp_file_created = 0;
  int name_len = 0;
  FILE *submit_template_FILE = NULL;
  int advanced_mode = 1;        /* preset to advanced mode */
  int myerrno = 0;
  int submit_failed = 0;        /* preset to all ok on this submit */

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
  print_message_args(nargs, args);
  memset(job_env_vars, '\0', sizeof(job_env_vars));
  memset(ll_ptp_vars, '\0', sizeof(ll_ptp_vars));
  memset(submit_temp_file, '\0', sizeof(submit_temp_file));

  /*-----------------------------------------------------------------------*
   * process args passed into the submit command                           *
   *-----------------------------------------------------------------------*/
  for (i = 0; args[i] != NULL; i++) {
    const char *arg = args[i];
    const char *first_equals = strchr(arg, '=');

    /* LoadLeveler keywords (the prefixes below guarantee first_equals != NULL) */
    if (strncmp(arg, "LL_PTP_JOB_COMMAND_FILE=", strlen("LL_PTP_JOB_COMMAND_FILE=")) == 0) {
      free(ll_job_command_file);        /* keep only the last occurrence */
      ll_job_command_file = strdup(first_equals + 1);
    }
    if (strncmp(arg, "LL_PTP_JOB_COMMAND_FILE_TEMPLATE=", strlen("LL_PTP_JOB_COMMAND_FILE_TEMPLATE=")) == 0) {
      free(ll_job_command_file_template);
      ll_job_command_file_template = strdup(first_equals + 1);
    }

    /* General PTP launch configuration variables and environment vars */
    if (strncmp(arg, JOB_SUB_ID_ATTR "=", strlen(JOB_SUB_ID_ATTR) + 1) == 0) {
      free(job_sub_id);
      job_sub_id = strdup(first_equals + 1);    /* pick up the val part of the job id */
    }
    if (strncmp(arg, JOB_ENV_ATTR "=", strlen(JOB_ENV_ATTR) + 1) == 0) {
      /* the value part of env= is itself a keyword=value pair */
      if (env_count >= max_env_vars) {
        print_message(ERROR_MESSAGE, "Too many environment variables, ignoring %s\n", arg);
      }
      else if (ll_submit_split_pair(first_equals + 1, &job_env_vars[env_count][0], &job_env_vars[env_count][1]) != 0) {
        print_message(ERROR_MESSAGE, "Ignoring malformed environment variable argument %s\n", arg);
      }
      else {
        env_count++;
      }
    }

    /* Substitution variables (like LL_PTP_xxxxxxxxx) */
    if ((strncmp(arg, "LL_PTP_", strlen("LL_PTP_")) == 0)
        || (strncmp(arg, "workingDir", strlen("workingDir")) == 0)
        || (strncmp(arg, "progArgs", strlen("progArgs")) == 0)
        || (strncmp(arg, "execName", strlen("execName")) == 0)
        || (strncmp(arg, "execPath", strlen("execPath")) == 0)) {
      if (ll_count >= max_ll_vars) {
        print_message(ERROR_MESSAGE, "Too many substitution variables, ignoring %s\n", arg);
      }
      else if (ll_submit_split_pair(arg, &ll_ptp_vars[ll_count][0], &ll_ptp_vars[ll_count][1]) != 0) {
        print_message(ERROR_MESSAGE, "Ignoring malformed substitution variable argument %s\n", arg);
      }
      else {
        print_message(INFO_MESSAGE, "Received keyword[%d]=value %s=%s\n", ll_count, ll_ptp_vars[ll_count][0], ll_ptp_vars[ll_count][1]);
        if ((strcmp(ll_ptp_vars[ll_count][0], "LL_PTP_SUBMIT_MODE") == 0)
            && (strcmp(ll_ptp_vars[ll_count][1], "Basic") == 0)) {
          advanced_mode = 0;    /* no longer advanced mode - now we are basic */
        }
        ll_count++;
      }
    }
  }                             /* end loop on args passed into submit command */

  /*-----------------------------------------------------------------------*
   * Handle overrides of LL_PTP_xxxxx variables over PTP variables         *
   *   LL_PTP_INITIALDIR overrides workingDir                              *
   *   LL_PTP_EXECUTABLE overrides execPath/execName                       *
   * An overridden keyword is replaced by " " so it no longer matches.     *
   * Keywords are compared exactly; a keyword that is only a prefix of     *
   * the override name must not trigger the override.                      *
   *-----------------------------------------------------------------------*/
  for (k = 0; k < ll_count; k++) {
    if (strcmp("LL_PTP_EXECUTABLE", ll_ptp_vars[k][0]) == 0) {
      for (i = 0; i < ll_count; i++) {
        if ((strcmp("execPath", ll_ptp_vars[i][0]) == 0) || (strcmp("execName", ll_ptp_vars[i][0]) == 0)) {
          free(ll_ptp_vars[i][0]);
          ll_ptp_vars[i][0] = strdup(" ");
          if (ll_ptp_vars[i][0] == NULL) {
            submit_failed = 1;  /* out of memory - the table is no longer usable */
            break;
          }
        }
      }
    }
    if ((submit_failed == 0) && (strcmp("LL_PTP_INITIALDIR", ll_ptp_vars[k][0]) == 0)) {
      for (i = 0; i < ll_count; i++) {
        if (strcmp("workingDir", ll_ptp_vars[i][0]) == 0) {
          free(ll_ptp_vars[i][0]);
          ll_ptp_vars[i][0] = strdup(" ");
          if (ll_ptp_vars[i][0] == NULL) {
            submit_failed = 1;
            break;
          }
        }
      }
    }
    if (submit_failed != 0) {
      print_message(ERROR_MESSAGE, "Out of memory while processing submit variables.\n");
      break;
    }
  }                             /* end loop on keyword value pairs */

  /*-----------------------------------------------------------------------*
   * Perform substitutions and do submit                                   *
   *-----------------------------------------------------------------------*/
  if (submit_failed != 0) {
    /* nothing to submit - fall through to cleanup and the error event */
  }
  else if (advanced_mode == 1) {        /* advanced mode submit - no substitutions */
    if (ll_job_command_file == NULL) {
      print_message(ERROR_MESSAGE, "No job command file was specified for the submit.\n");
      submit_failed = 1;
    }
    else {
      rc = my_ll_submit_job(gui_transmission_id, job_sub_id, ll_job_command_file, job_env_vars);
      if (rc == 0) {
        print_message(INFO_MESSAGE, "Job %s submitted.\n", ll_job_command_file);
      }
      else {
        print_message(ERROR_MESSAGE, "Job submit failed for job %s.\n", ll_job_command_file);
        submit_failed = 1;      /* report the failure, as basic mode does */
      }
    }
  }
  else {                        /* basic mode submit - substitutions to be performed */
    /* Open a temp file for submitting on /tmp */
    name_len = snprintf(submit_temp_file, sizeof(submit_temp_file), "/tmp/ll_ptp_temp_submit_file_%s_XXXXXX", userid);
    if ((name_len < 0) || ((size_t) name_len >= sizeof(submit_temp_file))) {
      print_message(ERROR_MESSAGE, "Temporary submit file name is too long.\n");
      submit_failed = 1;
    }
    else {
      submit_temp_file_fd = mkstemp(submit_temp_file);
      if (submit_temp_file_fd < 0) {
        myerrno = errno;
        print_message(ERROR_MESSAGE, "Unable to create temporary submit file %s, errno=%d\n", submit_temp_file, myerrno);
        submit_failed = 1;
      }
      else {
        submit_temp_file_created = 1;
      }
    }

    /* Open template file */
    if (submit_failed == 0) {
      if (ll_job_command_file_template == NULL) {
        print_message(ERROR_MESSAGE, "No submit template file was specified for the submit.\n");
        submit_failed = 1;
      }
      else {
        submit_template_FILE = fopen(ll_job_command_file_template, "r");
        if (submit_template_FILE == NULL) {
          myerrno = errno;
          print_message(ERROR_MESSAGE, "Unable to open submit template file %s, errno=%d\n", ll_job_command_file_template, myerrno);
          submit_failed = 1;    /* stop processing the submit */
        }
      }
    }

    /*
     * Read in from template and substitute in key=value. A line that still
     * needs a substitution afterwards is not written back out.
     */
    if (submit_failed == 0) {
      while (fgets(incoming, sizeof(incoming), submit_template_FILE) != NULL) {
        if (ll_submit_expand_line(incoming, outgoing, sizeof(outgoing), ll_ptp_vars, ll_count) != 0) {
          print_message(ERROR_MESSAGE, "Substituted template line exceeds %d bytes.\n", (int) sizeof(outgoing) - 1);
          submit_failed = 1;
          break;
        }
        if (is_substitution_required(outgoing) == 0) {  /* no <<<XXXXXXXX>>> variables left */
          if (ll_submit_write_all(submit_temp_file_fd, outgoing, strlen(outgoing)) != 0) {
            myerrno = errno;
            print_message(ERROR_MESSAGE, "Unable to write temporary submit file %s, errno=%d\n", submit_temp_file, myerrno);
            submit_failed = 1;
            break;
          }
        }
      }
    }
    if (submit_temp_file_fd >= 0) {     /* 0 is a valid descriptor */
      close(submit_temp_file_fd);
      submit_temp_file_fd = -1;
    }
    if (submit_template_FILE != NULL) {
      fclose(submit_template_FILE);
      submit_template_FILE = NULL;
    }

    /* Submit the file */
    if (submit_failed == 0) {
      rc = my_ll_submit_job(gui_transmission_id, job_sub_id, submit_temp_file, job_env_vars);
      if (rc == 0) {
        print_message(INFO_MESSAGE, "Job %s submitted.\n", submit_temp_file);
      }
      else {
        print_message(ERROR_MESSAGE, "Job submit failed for job %s.\n", submit_temp_file);
        submit_failed = 1;      /* flag as failed */
      }
    }

    /* Delete the temporary file, also when the submit was abandoned */
    if (submit_temp_file_created != 0) {
      remove(submit_temp_file);
    }
  }                             /* end if basic mode */

  /*-----------------------------------------------------------------------*
   * Cleanup after submit (both modes)                                     *
   * NOTE(review): job_sub_id and job_env_vars are not freed here because  *
   * it is not visible whether my_ll_submit_job keeps references to them.  *
   *-----------------------------------------------------------------------*/
  for (i = 0; i < ll_count; i++) {
    free(ll_ptp_vars[i][0]);
    ll_ptp_vars[i][0] = NULL;
    free(ll_ptp_vars[i][1]);
    ll_ptp_vars[i][1] = NULL;
  }
  free(ll_job_command_file_template);
  ll_job_command_file_template = NULL;
  free(ll_job_command_file);
  ll_job_command_file = NULL;

  if (submit_failed == 0) {
    sendOkEvent(gui_transmission_id);   /* close out the submit job command */
  }
  else {
    sendJobSubmissionErrorEvent(gui_transmission_id, job_sub_id, "LoadLeveler job submit failed.");
  }
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return PROXY_RES_OK;
}
/*************************************************************************
* Proxy command handler - command_halt_events *
*************************************************************************/
int command_halt_events(int gui_transmission_id, int nargs, char *args[])
{
  /*
   * Stop event reporting. Two commands are acknowledged here: first the
   * earlier start-events command (its transmission id was saved in
   * start_events_gui_transmission_id and it was kept open for asynchronous
   * events), then this halt-events command itself.
   */
  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
  print_message_args(nargs, args);

  /* acknowledge the start events command that was left pending */
  sendOkEvent(start_events_gui_transmission_id);

  /* from here on events are inactive */
  state_events_active = 0;

  /* acknowledge this halt events command */
  sendOkEvent(gui_transmission_id);

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return PROXY_RES_OK;
}
/*************************************************************************
* Proxy command handler - command_cancel_job *
*************************************************************************/
int command_cancel_job(int gui_transmission_id, int nargs, char *args[])
{
  /*
   * Cancel the job whose id is given by the JOB_ID_ATTR argument. Every
   * argument is examined, so when the attribute appears more than once the
   * last one is used. If no job with that id is in job_list nothing is
   * terminated. An OK event is sent in every case and 0 is returned.
   */
  int arg_index = 0;
  int requested_job_id = -1;    /* stays -1 when no job id attribute is supplied */
  JobObject *target_job = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
  print_message_args(nargs, args);

  while (arg_index < nargs) {
    char *candidate = args[arg_index];

    if (proxy_test_attribute(JOB_ID_ATTR, candidate)) {
      requested_job_id = proxy_get_attribute_value_int(candidate);
    }
    arg_index++;
  }

  target_job = get_job_in_list_from_id(job_list, requested_job_id);
  if (target_job != NULL) {
    my_ll_terminate_job(gui_transmission_id, target_job);
  }

  sendOkEvent(gui_transmission_id);
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
* Proxy command handler - command_terminate *
*************************************************************************/
int command_terminate(int gui_transmission_id, int nargs, char *args[])
{
  /*
   * Proxy command handler - shut the proxy down.
   *
   * Sets state_shutdown_requested, takes master_lock (the monitor threads
   * hold this lock while they run a pass, so this waits for them), closes
   * the LoadLeveler shared library and sends the shutdown event.
   *
   * Returns PROXY_RES_OK when dlclose succeeded. Otherwise an error event
   * is sent before the shutdown event and PROXY_RES_ERR is returned.
   */
  int rc = 0;
  int result = PROXY_RES_OK;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
  print_message_args(nargs, args);
  state_shutdown_requested = 1;
  print_message(INFO_MESSAGE, "Waiting for monitor threads to become inactve.\n");
  pthread_mutex_lock(&master_lock);
  print_message(INFO_MESSAGE, "Continuing termination request.\n");
  print_message(INFO_MESSAGE, "dlclose LoadLeveler shared library %s.\n", ibmll_libpath_name);
  rc = dlclose(ibmll_libpath_handle);
  if (rc != 0) {
    /* the format has %s and %d, so both the library name and rc must be passed */
    print_message(ERROR_MESSAGE, "dlclose of %s failed with rc=%d.\n", ibmll_libpath_name, rc);
    sendErrorEvent(gui_transmission_id, RTEV_ERROR_LL_INIT, "dlclose failed for LoadLeveler shared library");
    result = PROXY_RES_ERR;
  }
  else {
    print_message(INFO_MESSAGE, "dlclose %s successful.\n", ibmll_libpath_name);
  }
  /* common exit path: the shutdown event is sent whether or not dlclose worked */
  sendShutdownEvent(gui_transmission_id);
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  pthread_mutex_unlock(&master_lock);
  return result;
}
/*************************************************************************
* Service thread - Loop while allowed to monitor LoadLeveler for nodes *
*************************************************************************/
/*
 * Thread entry point. Until state_shutdown_requested is set, repeatedly:
 *   - take master_lock,
 *   - for every cluster in cluster_list query LoadLeveler for its machines,
 *     add unknown nodes to the cluster's node hash (node add event), mark
 *     known nodes that were not up as up (node change event) and mark
 *     nodes that LoadLeveler no longer reported as unknown (node change
 *     event),
 *   - release master_lock and sleep.
 * The sleep interval drops to min_node_sleep_seconds after a pass with
 * changes and grows by that amount, up to max_node_sleep_seconds, after a
 * pass without changes.
 *
 * job_ident is not used. Always returns NULL.
 */
void *monitor_LoadLeveler_nodes(void *job_ident)
{
  int rc = 0;
  int i = 0;
  char *node_name = NULL;
  LL_element *node = NULL;
  LL_element *query_elem = NULL;
  int node_count = 0;
  int sleep_seconds = 30;
  int sleep_time_reset = 0;     /* 1 when something changed this pass */
  LL_element *errObj = NULL;
  ListElement *cluster_list_element = NULL;
  ClusterObject *cluster_object = NULL;
  NodeObject *node_object = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  /*-----------------------------------------------------------------------*
   * loop forever until we are told we are shutting down.                  *
   *-----------------------------------------------------------------------*/
  while (state_shutdown_requested == 0) {
    LL_cluster_param cluster_parm;
    char *remote_cluster[2];
    Hash *node_hash = NULL;
    HashEntry *hash_element = NULL;
    List *node_list = NULL;
    ListElement *node_list_element = NULL;

    pthread_mutex_lock(&master_lock);
    if (state_shutdown_requested == 1) {        /* shutdown started while we waited for the lock */
      pthread_mutex_unlock(&master_lock);
      break;
    }
    query_elem = NULL;
    node_name = NULL;
    node_count = 0;
    node = NULL;
    sleep_time_reset = 0;       /* preset for this loop that no changes were detected */
    print_message(TRACE_MESSAGE, ">>> %s thread running. line=%d.\n", __FUNCTION__, __LINE__);

    if (cluster_list == NULL) {
      refresh_cluster_list();   /* insure we have latest cluster configuration from LoadLeveler */
    }

    if (cluster_list != NULL) { /* if we have clusters, nodes, etc. */
      /*-----------------------------------------------------------------------*
       * loop on the cluster list we obtained earlier from LoadLeveler.        *
       *-----------------------------------------------------------------------*/
      cluster_list_element = cluster_list->l_head;
      while (cluster_list_element != NULL) {
        cluster_object = cluster_list_element->l_value;
        cluster_list_element = cluster_list_element->l_next;    /* advance now so "continue" is safe */

        /* A cluster without a node hash cannot be tracked; skip it but keep polling on the short interval. */
        if ((cluster_object == NULL) || (cluster_object->node_hash == NULL)) {
          sleep_time_reset = 1;
          continue;
        }
        if (cluster_object->node_hash->count <= 0) {    /* if no nodes in this cluster */
          sleep_time_reset = 1; /* we need to keep looking for nodes, etc on short interval */
        }

        if (multicluster_status == 1) {
          /*-----------------------------------------------------------------------*
           * we are running multicluster - set cluster name into environment       *
           * to influence where LoadLeveler searches for data (what cluster)       *
           *-----------------------------------------------------------------------*/
          remote_cluster[0] = cluster_object->cluster_name;
          remote_cluster[1] = NULL;
          print_message(INFO_MESSAGE, "Setting access for LoadLeveler cluster=%s.\n", cluster_object->cluster_name);
          cluster_parm.action = CLUSTER_SET;
          cluster_parm.cluster_list = remote_cluster;
          rc = my_ll_cluster(LL_API_VERSION, &errObj, &cluster_parm);
        }
        else {
          /*-----------------------------------------------------------------------*
           * not running multicluster                                              *
           *-----------------------------------------------------------------------*/
          print_message(INFO_MESSAGE, "Setting access for LoadLeveler local cluster (single cluster).\n");
          cluster_parm.action = CLUSTER_UNSET;
          cluster_parm.cluster_list = NULL;
          rc = my_ll_cluster(LL_API_VERSION, &errObj, &cluster_parm);
        }

        /*-----------------------------------------------------------------------*
         * build a LoadLeveler query object (for nodes)                          *
         *-----------------------------------------------------------------------*/
        query_elem = my_ll_query(MACHINES);
        if (query_elem == NULL) {
          print_message(ERROR_MESSAGE, "Unable to obtain query element. LoadLeveler may not be active or is not responding.\n");
          continue;
        }

        /*-----------------------------------------------------------------------*
         * set the request type for LoadLeveler (we want nodes)                  *
         *-----------------------------------------------------------------------*/
        print_message(INFO_MESSAGE, "Call LoadLeveler (ll_set_request) for nodes in cluster=%s.\n", cluster_object->cluster_name);
        rc = my_ll_set_request(query_elem, QUERY_ALL, NULL, ALL_DATA);
        if (rc != 0) {
          rc = my_ll_deallocate(query_elem);
          query_elem = NULL;
          continue;
        }

        /*-----------------------------------------------------------------------*
         * get nodes from LoadLeveler for current or local cluster.              *
         *-----------------------------------------------------------------------*/
        print_message(INFO_MESSAGE, "Call LoadLeveler (ll_get_objs) for nodes in cluster=%s.\n", cluster_object->cluster_name);
        node = my_ll_get_objs(query_elem, LL_CM, NULL, &node_count, &rc);
        if (rc != 0) {
          rc = my_ll_deallocate(query_elem);
          query_elem = NULL;
          continue;
        }
        print_message(INFO_MESSAGE, "Number of LoadLeveler Nodes=%d in cluster=%s.\n", node_count, cluster_object->cluster_name);

        /*-----------------------------------------------------------------------*
         * loop on the nodes returned by LoadLeveler                             *
         *-----------------------------------------------------------------------*/
        i = 0;
        while (node != NULL) {
          print_message(INFO_MESSAGE, "LoadLeveler Node %d:\n", i);
          /* NOTE(review): the string returned for LL_MachineName may need to be freed by the caller - confirm against the LoadLeveler API. */
          rc = my_ll_get_data(node, LL_MachineName, &node_name);
          if (rc == 0) {
            print_message(INFO_MESSAGE, "Node name=%s\n", node_name);
            node_object = get_node_in_hash(cluster_object->node_hash, node_name);
            if (node_object != NULL) {
              /*-----------------------------------------------------------------------*
               * node returned by LoadLeveler was found in our ptp node list.          *
               * flag it as found.                                                     *
               *-----------------------------------------------------------------------*/
              node_object->node_found = 1;
              if (node_object->node_state != MY_STATE_UP) {
                node_object->node_state = MY_STATE_UP;
                sleep_time_reset = 1;   /* we have a change this pass */
                print_message(INFO_MESSAGE, "Schedule event notification: node=%s changed for LoadLeveler Cluster=%s.\n", node_name, cluster_object->cluster_name);
                sendNodeChangeEvent(start_events_gui_transmission_id, cluster_object, node_object);
              }
            }
            else {
              /*-----------------------------------------------------------------------*
               * node returned by LoadLeveler was not found in our ptp node list       *
               * add it and generate an event to the gui. flag it as added.            *
               *-----------------------------------------------------------------------*/
              node_object = (NodeObject *) malloc(sizeof(NodeObject));
              malloc_check(node_object, __FUNCTION__, __LINE__);
              /* zero the whole object: sizeof(*node_object), not the size of the pointer */
              memset(node_object, '\0', sizeof(*node_object));
              node_object->proxy_generated_node_id = generate_id();
              node_object->node_name = strdup(node_name);
              node_object->node_found = 2;      /* flag it as added */
              node_object->node_state = MY_STATE_UP;
              add_node_to_hash(cluster_object->node_hash, (void *) node_object);
              sleep_time_reset = 1;     /* we have a change this pass */
              print_message(INFO_MESSAGE, "Schedule event notification: node=%s added for LoadLeveler Cluster=%s.\n", node_name, cluster_object->cluster_name);
              sendNodeAddEvent(start_events_gui_transmission_id, cluster_object, node_object);
            }
          }
          i++;
          node = my_ll_next_obj(query_elem);
        }                       /* end while we have a node */

        /*-----------------------------------------------------------------------*
         * loop on the ptp node list to see if any nodes were not returned       *
         * by LoadLeveler on this pass (maybe they went down).                   *
         * generate an event (changed/gone) to the gui.                          *
         *-----------------------------------------------------------------------*/
        node_hash = cluster_object->node_hash;
        HashSet(node_hash);     /* position to beginning of the hash table */
        hash_element = HashGet(node_hash);
        while (hash_element != NULL) {
          node_list = (List *) hash_element->h_data;    /* the hash entry is a list of node objects */
          hash_element = HashGet(node_hash);    /* fetch the next entry before walking this one */
          node_list_element = node_list->l_head;
          while (node_list_element != NULL) {
            node_object = node_list_element->l_value;
            node_list_element = node_list_element->l_next;
            if (node_object->node_found == 0) { /* not reported this pass - node went away */
              if (node_object->node_state != MY_STATE_UNKNOWN) {
                node_object->node_state = MY_STATE_UNKNOWN;
                /* report the name of the node that went away, not the last name read from LoadLeveler */
                print_message(INFO_MESSAGE, "Schedule event notification: node=%s changed for LoadLeveler Cluster=%s.\n", node_object->node_name, cluster_object->cluster_name);
                sendNodeChangeEvent(start_events_gui_transmission_id, cluster_object, node_object);
                sleep_time_reset = 1;   /* we have a change this pass */
              }
            }
            else {
              node_object->node_found = 0;      /* found or added this pass - reset for next pass */
            }
          }
        }

        if (query_elem != NULL) {
          rc = my_ll_free_objs(query_elem);
          rc = my_ll_deallocate(query_elem);
          query_elem = NULL;
        }
      }                         /* end while we have a cluster to query */
    }
    else {
      sleep_time_reset = 1;     /* no cluster list yet - keep looking on the short interval */
    }
    pthread_mutex_unlock(&master_lock);

    /*-----------------------------------------------------------------------*
     * adjust sleep interval based on changes this pass.                     *
     *-----------------------------------------------------------------------*/
    if (sleep_time_reset == 1) {
      sleep_seconds = min_node_sleep_seconds;
    }
    else {
      sleep_seconds = sleep_seconds + min_node_sleep_seconds;
      if (sleep_seconds > max_node_sleep_seconds) {
        sleep_seconds = max_node_sleep_seconds;
      }
    }

    /*-----------------------------------------------------------------------*
     * sleep and loop again on the LoadLeveler machines. Sleeping is done    *
     * in 5 second slices so a shutdown request is noticed promptly.         *
     *-----------------------------------------------------------------------*/
    if (state_shutdown_requested == 0) {
      int sleep_interval = 0;
      int mini_sleep_interval = (sleep_seconds + 4) / 5;

      print_message(INFO_MESSAGE, "%s Sleeping for (%d seconds) %d intervals of 5 seconds\n", __FUNCTION__, mini_sleep_interval * 5, mini_sleep_interval);
      for (sleep_interval = 0; sleep_interval < mini_sleep_interval; sleep_interval++) {
        if (state_shutdown_requested == 0) {
          sleep(5);
        }
      }
    }
  }                             /* end while we are not shutting down */

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return NULL;
}
/*************************************************************************
 * Service thread - Loop while allowed to monitor LoadLeveler for jobs   *
 *                                                                       *
 * Each pass (performed while holding master_lock):                      *
 *   1. For every cluster in cluster_list that has nodes, select the     *
 *      cluster via my_ll_cluster and query LoadLeveler for JOBS.        *
 *   2. Walk job -> step -> node -> task -> task instance and reconcile  *
 *      what LoadLeveler reports with job_list and each job's task_list, *
 *      sending add/change/remove events to the gui as differences are   *
 *      found.  job_found/task_found are used as per-pass markers        *
 *      (1 = seen again, 2 = newly added, 0 = not seen).                 *
 *   3. Sweep job_list and retire jobs LoadLeveler no longer reports.    *
 * Between passes the thread sleeps job_sleep_seconds (rounded up to a   *
 * multiple of 5) in 5 second slices so shutdown is noticed promptly.    *
 *                                                                       *
 * job_ident - thread argument, not used by this function.               *
 * returns   - always NULL.                                              *
 *************************************************************************/
void *monitor_LoadLeveler_jobs(void *job_ident)
{
  int rc = 0;
  int i = 0;
  char *job_name = NULL;
  char *job_submit_host = NULL;
  char *step_ID = NULL;
  char *cluster_token = NULL;
  char *proc_token = NULL;
  int step_id_ok = 0;
  LL_STEP_ID ll_step_id;
  LL_element *job = NULL;
  LL_element *step = NULL;
  LL_element *query_elem = NULL;
  LL_element *node = NULL;
  LL_element *task = NULL;
  LL_element *task_instance = NULL;
  int job_count = 0;
  LL_element *errObj = NULL;
  LL_cluster_param cluster_parm;
  char *remote_cluster[2];
  ListElement *job_list_element = NULL;
  List *task_list = NULL;
  ListElement *task_list_element = NULL;
  char *task_instance_machine_name = NULL;
  char *task_instance_machine_address = NULL;
  int task_instance_task_ID = 0;
  int step_node_count = 0;
  int node_task_count = 0;
  int task_instance_count = 0;
  time_t my_clock;
  ListElement *cluster_list_element = NULL;
  ClusterObject *cluster_object = NULL;
  JobObject *job_object = NULL;
  TaskObject *task_object = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  /*-----------------------------------------------------------------------*
   * loop forever until we are told we are shutting down.                  *
   *-----------------------------------------------------------------------*/
  while (state_shutdown_requested == 0) {       /* loop while not shutting down */
    pthread_mutex_lock(&master_lock);
    if (state_shutdown_requested == 1) {        /* if main task started shutdown and snuck in at the right time */
      pthread_mutex_unlock(&master_lock);
      break;
    }
    query_elem = NULL;
    job_name = NULL;
    step_ID = NULL;
    job_count = 0;
    job = NULL;
    print_message(TRACE_MESSAGE, ">>> %s thread running. line=%d.\n", __FUNCTION__, __LINE__);

    if (cluster_list != NULL) { /* if we have clusters, nodes, etc. from monitor_LoadLeveler_nodes thread */

      /*-----------------------------------------------------------------------*
       * loop on the cluster list we obtained earlier from LoadLeveler.        *
       *-----------------------------------------------------------------------*/
      cluster_list_element = cluster_list->l_head;
      while (cluster_list_element != NULL) {    /* while we have a cluster to query */
        cluster_object = cluster_list_element->l_value; /* get our cluster object from this list element */
        cluster_list_element = cluster_list_element->l_next;    /* prepare for next pass */

        /* only clusters that have a node hash table with at least one node are queried */
        if ((cluster_object != NULL) && (cluster_object->node_hash != NULL) && (cluster_object->node_hash->count > 0)) {
          if (multicluster_status == 1) {       /* if running multicluster */
            /*-----------------------------------------------------------------------*
             * we are running multicluster - set cluster name into environment       *
             * to influence where LoadLeveler searches for data (what cluster)       *
             *-----------------------------------------------------------------------*/
            remote_cluster[0] = cluster_object->cluster_name;
            remote_cluster[1] = NULL;
            print_message(INFO_MESSAGE, "Setting access for LoadLeveler cluster=%s.\n", cluster_object->cluster_name);
            cluster_parm.action = CLUSTER_SET;  /* we are setting the cluster for remote access */
            cluster_parm.cluster_list = remote_cluster; /* cluster name we want data from */
            rc = my_ll_cluster(LL_API_VERSION, &errObj, &cluster_parm); /* set the cluster name */
          } else {              /* this is a local cluster */
            /*-----------------------------------------------------------------------*
             * not running multicluster                                              *
             *-----------------------------------------------------------------------*/
            print_message(INFO_MESSAGE, "Setting access for LoadLeveler local cluster (single cluster).\n");
            cluster_parm.action = CLUSTER_UNSET;        /* we are unsetting the cluster */
            cluster_parm.cluster_list = NULL;
            rc = my_ll_cluster(LL_API_VERSION, &errObj, &cluster_parm); /* unset the cluster name - back to local */
          }

          /*-----------------------------------------------------------------------*
           * build a LoadLeveler query object (for jobs)                           *
           *-----------------------------------------------------------------------*/
          query_elem = my_ll_query(JOBS);
          if (query_elem == NULL) {
            print_message(ERROR_MESSAGE, "Unable to obtain query element. LoadLeveler may not be active or is not responding.\n");
            continue;
          }

          /*-----------------------------------------------------------------------*
           * set the request type for LoadLeveler (we want all jobs, all data)     *
           *-----------------------------------------------------------------------*/
          print_message(INFO_MESSAGE, "Call LoadLeveler (ll_set_request) for jobs in cluster=%s.\n", cluster_object->cluster_name);
          rc = my_ll_set_request(query_elem, QUERY_ALL, NULL, ALL_DATA);
          if (rc != 0) {
            rc = my_ll_deallocate(query_elem);
            query_elem = NULL;
            continue;
          }

          /*-----------------------------------------------------------------------*
           * get jobs from LoadLeveler for current or local cluster.               *
           *-----------------------------------------------------------------------*/
          print_message(INFO_MESSAGE, "Call LoadLeveler (ll_get_objs) for jobs in cluster=%s.\n", cluster_object->cluster_name);
          job_count = 0;        /* preset to no jobs found in cluster */
          job = my_ll_get_objs(query_elem, LL_CM, NULL, &job_count, &rc);
          if (rc != 0) {
            rc = my_ll_deallocate(query_elem);
            query_elem = NULL;
            job = NULL;         /* the query is gone - nothing can be iterated from it */
          }
          print_message(INFO_MESSAGE, "Number of LoadLeveler Jobs=%d in cluster=%s.\n", job_count, cluster_object->cluster_name);

          /*-----------------------------------------------------------------------*
           * loop on the jobs returned by LoadLeveler                              *
           *-----------------------------------------------------------------------*/
          i = 0;
          while (job != NULL) { /* job points to the first or next job in the list from LoadLeveler */
            print_message(INFO_MESSAGE, "LoadLeveler Job %d:\n", i);
            job_submit_host = NULL;     /* never reuse the submit host of a previous job */
            rc = my_ll_get_data(job, LL_JobSubmitHost, &job_submit_host);
            if (rc != 0) {
              job_submit_host = NULL;
            }
            rc = my_ll_get_data(job, LL_JobName, &job_name);
            if (rc == 0) {
              print_message(INFO_MESSAGE, "Job name=%s\n", job_name);
              rc = my_ll_get_data(job, LL_JobGetFirstStep, &step);      /* get a job step */
              while (step != NULL) {    /* while we have a step */

                /*-----------------------------------------------------------------------*
                 * break the job step name apart into a LoadLeveler LL_STEP_ID.          *
                 * the code expects step_ID to be "<submit host>.<cluster>.<proc>";      *
                 * anything that does not parse that way causes the step to be skipped.  *
                 *-----------------------------------------------------------------------*/
                step_id_ok = 0;
                rc = my_ll_get_data(step, LL_StepID, &step_ID);
                if ((rc == 0) && (step_ID != NULL) && (job_submit_host != NULL)) {
                  size_t host_length = strlen(job_submit_host);
                  if (strlen(step_ID) > host_length) {  /* there is something after the host name */
                    cluster_token = strtok(step_ID + host_length + 1, ".");     /* the next segment is the cluster or job number */
                    proc_token = (cluster_token != NULL) ? strtok(NULL, ".") : NULL;    /* the last token is the proc or step number */
                    if ((cluster_token != NULL) && (proc_token != NULL)) {
                      /* from_host only borrows job_submit_host for the lookup below; */
                      /* a job object that is created takes its own strdup copy.      */
                      ll_step_id.from_host = job_submit_host;
                      ll_step_id.cluster = atoi(cluster_token);
                      ll_step_id.proc = atoi(proc_token);
                      step_id_ok = 1;
                    }
                  }
                }

                if (step_id_ok == 0) {
                  print_message(ERROR_MESSAGE, "Unable to obtain or parse LoadLeveler step ID. Step skipped.\n");
                } else {        /* do something here with the step object */
                  print_message(INFO_MESSAGE, "Job step ID=%s.%d.%d\n", ll_step_id.from_host, ll_step_id.cluster, ll_step_id.proc);
                  if ((job_object = get_job_in_list(job_list, ll_step_id)) != NULL) {   /* existing step */
                    /*-----------------------------------------------------------------------*
                     * step returned by LoadLeveler was found in our ptp job list.           *
                     * flag it as found.                                                     *
                     *-----------------------------------------------------------------------*/
                    job_object->job_found = 1;  /* flag it as found */
                    if (job_object->job_state == MY_STATE_UNKNOWN) {
                      job_object->job_state = MY_STATE_IDLE;
                      print_message(INFO_MESSAGE, "Schedule event notification: Job=%s.%d.%d changed for LoadLeveler Cluster=%s.\n", job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
                      sendJobChangeEvent(start_events_gui_transmission_id, job_object);
                    }
                    if (multicluster_status == 1) {     /* if running multicluster */
                      if (strcmp(job_object->cluster_name, cluster_object->cluster_name) != 0) {        /* if job moved */
                        sendJobRemoveEvent(start_events_gui_transmission_id, job_object);
                        /* NOTE(review): this aliases the cluster object's name while new jobs below */
                        /* get a strdup copy - confirm ownership against delete_job_from_list.       */
                        job_object->cluster_name = cluster_object->cluster_name;        /* pick up new cluster */
                        sendJobAddEvent(start_events_gui_transmission_id, cluster_object, job_object);
                      }
                    }
                  } else {      /* new job (not yet in list) */
                    /*-----------------------------------------------------------------------*
                     * job returned by LoadLeveler was not found in our ptp job list         *
                     * add it and generate an event to the gui. flag it as added.            *
                     *-----------------------------------------------------------------------*/
                    job_object = (JobObject *) malloc(sizeof(JobObject));
                    malloc_check(job_object, __FUNCTION__, __LINE__);
                    memset(job_object, '\0', sizeof(*job_object));      /* zero the whole object, not just pointer-size bytes */
                    job_object->proxy_generated_job_id = generate_id(); /* a unique identifier for this job */
                    job_object->gui_assigned_job_id = "-1";     /* unsolicited job from proxy */
                    job_object->ll_step_id.from_host = strdup(ll_step_id.from_host);
                    job_object->ll_step_id.cluster = ll_step_id.cluster;
                    job_object->ll_step_id.proc = ll_step_id.proc;
                    job_object->job_found = 2;  /* flag it as newly added */
                    job_object->job_state = MY_STATE_IDLE;
                    job_object->task_list = NewList();  /* list to hold tasks for this job step */
                    job_object->cluster_name = strdup(cluster_object->cluster_name);
                    add_job_to_list(job_list, (void *) job_object);     /* add the new job object to the list */
                    print_message(INFO_MESSAGE, "Schedule event notification: Job=%s.%d.%d added for LoadLeveler Cluster=%s.\n", job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
                    sendJobAddEvent(start_events_gui_transmission_id, cluster_object, job_object);
                  }

                  rc = my_ll_get_data(step, LL_StepNodeCount, &step_node_count);        /* nodes running on */
                  print_message(INFO_MESSAGE, "Step=%s.%d.%d. StepNodeCount=%d.\n", job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, step_node_count);

                  /*-----------------------------------------------------------------------*
                   * if this job from LoadLeveler has nodes (is running-like) then loop on *
                   * the nodes to see task status for new or existing tasks.               *
                   *-----------------------------------------------------------------------*/
                  if (step_node_count > 0) {    /* if job step is in running-like state (is using nodes) */
                    rc = my_ll_get_data(step, LL_StepGetFirstNode, &node);      /* node */

                    /*-----------------------------------------------------------------------*
                     * loop on the nodes in the job returned by LoadLeveler                  *
                     *-----------------------------------------------------------------------*/
                    while (node != NULL) {      /* while we have a valid LoadLeveler node */
                      rc = my_ll_get_data(node, LL_NodeTaskCount, &node_task_count);    /* tasks running on node */
                      print_message(INFO_MESSAGE, "NodeTaskCount=%d.\n", node_task_count);
                      rc = my_ll_get_data(node, LL_NodeGetFirstTask, &task);    /* task */

                      /*-----------------------------------------------------------------------*
                       * loop on the tasks in the job returned by LoadLeveler                  *
                       *-----------------------------------------------------------------------*/
                      while (task != NULL) {
                        rc = my_ll_get_data(task, LL_TaskTaskInstanceCount, &task_instance_count);      /* task instances */
                        print_message(INFO_MESSAGE, "TaskInstanceCount=%d.\n", task_instance_count);
                        rc = my_ll_get_data(task, LL_TaskGetFirstTaskInstance, &task_instance); /* task instances */

                        /*-----------------------------------------------------------------------*
                         * loop on the task_instances in the job returned by LoadLeveler         *
                         *-----------------------------------------------------------------------*/
                        while (task_instance != NULL) {
                          rc = my_ll_get_data(task_instance, LL_TaskInstanceMachineName, &task_instance_machine_name);  /* machine name */
                          rc = my_ll_get_data(task_instance, LL_TaskInstanceMachineAddress, &task_instance_machine_address);    /* machine address */
                          rc = my_ll_get_data(task_instance, LL_TaskInstanceTaskID, &task_instance_task_ID);    /* task id */
                          print_message(INFO_MESSAGE, "TaskInstanceMachineName=%s. TaskInstanceMachineAddress=%s. TaskInstanceTaskID=%d.\n", task_instance_machine_name, task_instance_machine_address, task_instance_task_ID);
                          if ((task_object = get_task_in_list(job_object->task_list, task_instance_machine_name, task_instance_task_ID)) != NULL) {     /* existing task */
                            /*-----------------------------------------------------------------------*
                             * task returned by LoadLeveler was found in our ptp job task list.      *
                             * flag it as found.                                                     *
                             *-----------------------------------------------------------------------*/
                            task_object->ll_task_id = task_instance_task_ID;
                            task_object->task_found = 1;        /* flag it as found */
                            if (task_object->task_state != MY_STATE_RUNNING) {
                              task_object->task_state = MY_STATE_RUNNING;
                              print_message(INFO_MESSAGE, "Schedule event notification: Task_ID=%d running on node_name=%s added for LoadLeveler Job=%s.%d.%d for LoadLeveler Cluster=%s.\n", task_object->ll_task_id, task_object->node_name, job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
                              sendTaskChangeEvent(start_events_gui_transmission_id, job_object, task_object);
                            }
                            if (job_object->job_state != MY_STATE_RUNNING) {
                              job_object->job_state = MY_STATE_RUNNING;
                              print_message(INFO_MESSAGE, "Schedule event notification: Job=%s.%d.%d changed for LoadLeveler Cluster=%s.\n", job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
                              sendJobChangeEvent(start_events_gui_transmission_id, job_object);
                            }
                          } else {      /* new task (not yet in list) */
                            /*-----------------------------------------------------------------------*
                             * task returned by LoadLeveler was not found in our ptp job task list   *
                             * add it and generate an event to the gui. flag it as added.            *
                             *-----------------------------------------------------------------------*/
                            task_object = (TaskObject *) malloc(sizeof(TaskObject));
                            malloc_check(task_object, __FUNCTION__, __LINE__);
                            memset(task_object, '\0', sizeof(*task_object));    /* zero the whole object, not just pointer-size bytes */
                            task_object->proxy_generated_task_id = generate_id();       /* a unique identifier for this task */
                            task_object->ll_task_id = task_instance_task_ID;
                            task_object->node_name = strdup(task_instance_machine_name);
                            task_object->node_address = strdup(task_instance_machine_address);
                            task_object->task_found = 2;        /* flag it as added */
                            task_object->task_state = MY_STATE_RUNNING;
                            if (job_object->job_state != MY_STATE_RUNNING) {    /* a task exists so the job must be running */
                              job_object->job_state = MY_STATE_RUNNING;
                              print_message(INFO_MESSAGE, "Schedule event notification: Job=%s.%d.%d changed for LoadLeveler Cluster=%s.\n", job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
                              sendJobChangeEvent(start_events_gui_transmission_id, job_object);
                            }
                            add_task_to_list(job_object->task_list, (void *) task_object);      /* add the new task object to the list */
                            print_message(INFO_MESSAGE, "Schedule event notification: Task_ID=%d running on node_name=%s added for LoadLeveler Job=%s.%d.%d for LoadLeveler Cluster=%s.\n", task_object->ll_task_id, task_object->node_name, job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
                            sendTaskAddEvent(start_events_gui_transmission_id, cluster_object, job_object, task_object);
                          }
                          rc = my_ll_get_data(task, LL_TaskGetNextTaskInstance, &task_instance);        /* next task instance */
                        }
                        rc = my_ll_get_data(node, LL_NodeGetNextTask, &task);   /* next task */
                      }
                      rc = my_ll_get_data(step, LL_StepGetNextNode, &node);     /* next node */
                    }           /* end while we have a valid LoadLeveler node */
                  }             /* end if job step is in running-like state (is using nodes) */

                  /*-----------------------------------------------------------------------*
                   * loop on the tasks in the job object - if any not found that were      *
                   * present before then generate deleted task events.                     *
                   *-----------------------------------------------------------------------*/
                  task_list = (List *) job_object->task_list;   /* the task list in the job */
                  task_list_element = (task_list != NULL) ? task_list->l_head : NULL;   /* first in list */
                  while (task_list_element != NULL) {   /* while we have a task to query */
                    task_object = task_list_element->l_value;   /* get the task object (if any) */
                    task_list_element = task_list_element->l_next;      /* advance first: the task may be deleted below */
                    if (task_object != NULL) {  /* if we have a task object */
                      if (task_object->task_found == 0) {       /* if not found or added this pass */
                        task_object->task_state = MY_STATE_TERMINATED;
                        print_message(INFO_MESSAGE, "Schedule event notification: Task_ID=%d running on node_name=%s deleted for LoadLeveler Job=%s.%d.%d for LoadLeveler Cluster=%s.\n", task_object->ll_task_id, task_object->node_name, job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
                        sendTaskRemoveEvent(start_events_gui_transmission_id, job_object, task_object);
                        delete_task_from_list(task_list, task_object);  /* clean this task object and delete from the job */
                        if (SizeOfList(task_list) == 0) {
                          if (job_object->job_state == MY_STATE_RUNNING) {
                            /* only a state change is sent here; the job remove event is sent by */
                            /* the sweep below once LoadLeveler stops reporting the job.         */
                            job_object->job_state = MY_STATE_TERMINATED;        /* job is done */
                            print_message(INFO_MESSAGE, "Schedule event notification: Job=%s.%d.%d terminated for LoadLeveler Cluster=%s.\n", job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
                            sendJobChangeEvent(start_events_gui_transmission_id, job_object);
                          } else {
                            job_object->job_state = MY_STATE_IDLE;
                            print_message(INFO_MESSAGE, "Schedule event notification: Job=%s.%d.%d changed for LoadLeveler Cluster=%s.\n", job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
                            sendJobChangeEvent(start_events_gui_transmission_id, job_object);
                          }
                        }
                      } else {  /* it was found or added this pass */
                        task_object->task_found = 0;    /* reset for next pass */
                      }
                    }           /* end if we have a task object */
                  }             /* end while we have task list element to look at */
                }               /* end if the step ID was usable */

                /* always advance, even when the step was skipped, so a bad step cannot stall the loop */
                rc = my_ll_get_data(job, LL_JobGetNextStep, &step);     /* get another job step */
              }                 /* end while we have a step */
            }                   /* we got a job name ok */
            i++;
            job = my_ll_next_obj(query_elem);
          }                     /* end while we have a job */
        }                       /* end if we have a cluster object with nodes */

        if (query_elem != NULL) {
          rc = my_ll_free_objs(query_elem);
          rc = my_ll_deallocate(query_elem);
          query_elem = NULL;
        }
      }                         /* end while we have a cluster to query */

      if (query_elem != NULL) { /* defensive - normally already released inside the loop */
        rc = my_ll_free_objs(query_elem);
        rc = my_ll_deallocate(query_elem);
        query_elem = NULL;
      }

      /*-----------------------------------------------------------------------*
       * get the time and see if job has been sitting in submitted state too   *
       * long.                                                                 *
       *-----------------------------------------------------------------------*/
      time(&my_clock);          /* what time is it ? */

      /*-----------------------------------------------------------------------*
       * loop on the ptp job list to see if any jobs were not returned         *
       * by LoadLeveler on this pass (maybe they went down).                   *
       * generate an event (changed/gone) to the gui.                          *
       *-----------------------------------------------------------------------*/
      if (job_list != NULL) {   /* if we have a job list ok */
        job_list_element = job_list->l_head;    /* first in list */
        while (job_list_element != NULL) {      /* while we have a job to query */
          job_object = job_list_element->l_value;       /* get our job object from this list element */
          job_list_element = job_list_element->l_next;  /* advance first: the job may be deleted below */
          if (job_object == NULL) {
            continue;
          }
          if ((job_object->job_found == 0) &&   /* if job not found */
              ((job_object->job_state != MY_STATE_UNKNOWN) ||   /* and was known to LoadLeveler before */
               ((my_clock - job_object->job_submit_time) > 300))) {     /* or never queued after time limit (seconds) */

            /*-----------------------------------------------------------------------*
             * loop on the tasks in the job object - send deleted event and mark     *
             * all deleted.                                                          *
             *-----------------------------------------------------------------------*/
            task_list = (List *) job_object->task_list; /* the task list in the job */
            task_list_element = (task_list != NULL) ? task_list->l_head : NULL; /* first in list */
            while (task_list_element != NULL) { /* while we have a task to delete */
              task_object = task_list_element->l_value; /* get the task object (if any) */
              task_list_element = task_list_element->l_next;    /* prepare for next pass */
              if (task_object != NULL) {        /* if we have a task object */
                print_message(INFO_MESSAGE, "Schedule event notification: Task_ID=%d deleted on node_name=%s deleted for LoadLeveler Job=%s.%d.%d for LoadLeveler Cluster=%s.\n", task_object->ll_task_id, task_object->node_name, job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
                sendTaskRemoveEvent(start_events_gui_transmission_id, job_object, task_object);
                delete_task_from_list(task_list, task_object);  /* clean this task object and delete from the job */
                if (SizeOfList(task_list) == 0) {
                  job_object->job_state = MY_STATE_IDLE;
                  print_message(INFO_MESSAGE, "Schedule event notification: Job=%s.%d.%d changed for LoadLeveler Cluster=%s.\n", job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
                  sendJobChangeEvent(start_events_gui_transmission_id, job_object);
                }
              }                 /* end if we have a task object */
            }                   /* end while we have task list element to look at */

            job_object->job_state = MY_STATE_TERMINATED;        /* job is done */
            print_message(INFO_MESSAGE, "Schedule event notification: Job=%s.%d.%d terminated for LoadLeveler Cluster=%s.\n", job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
            sendJobChangeEvent(start_events_gui_transmission_id, job_object);
            print_message(INFO_MESSAGE, "Schedule event notification: Job=%s.%d.%d deleted for LoadLeveler Cluster=%s.\n", job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
            sendJobRemoveEvent(start_events_gui_transmission_id, job_object);
            delete_job_from_list(job_list, job_object); /* clean this job object and delete from the job list */
          } else {              /* job was found or not found but hasn't made it to queue yet */
            job_object->job_found = 0;  /* reset for next pass */
          }
        }                       /* end while we have job list element to look at */
      }                         /* end if we have a job list */
    }                           /* end if we have clusters, nodes, etc */

    pthread_mutex_unlock(&master_lock);

    /*-----------------------------------------------------------------------*
     * sleep and loop again on the LoadLeveler jobs.                         *
     *-----------------------------------------------------------------------*/
    if (state_shutdown_requested == 0) {
      int sleep_interval = 0;
      int mini_sleep_interval = (job_sleep_seconds + 4) / 5;    /* round up to whole 5 second slices */
      print_message(INFO_MESSAGE, "%s Sleeping for (%d seconds) %d intervals of 5 seconds\n", __FUNCTION__, mini_sleep_interval * 5, mini_sleep_interval);
      for (sleep_interval = 0; sleep_interval < mini_sleep_interval; sleep_interval++) {
        if (state_shutdown_requested == 0) {    /* stop sleeping as soon as shutdown is requested */
          sleep(5);
        }
      }
    }
  }                             /* end while we are not shutting down */

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return NULL;
}
/*************************************************************************
 * Retrieve a list of LoadLeveler clusters.                              *
 * If no multicluster environment then return a list of 1 cluster.       *
 *                                                                       *
 * Does nothing unless events are active and no shutdown was requested.  *
 * On first use multicluster_status is determined through                *
 * get_multicluster_status().  For every cluster a ClusterObject is      *
 * created, added to cluster_list and announced to the gui with a        *
 * machine add event and a queue add event.  When a LoadLeveler call     *
 * fails in the multicluster path, multicluster_status is reset to -1    *
 * so a later call starts over, and the function returns immediately.    *
 *************************************************************************/
void refresh_cluster_list()
{
  int rc = 0;
  int i = 0;
  LL_element *query_elem = NULL;
  LL_element *cluster = NULL;
  int cluster_count = 0;
  char *cluster_name = NULL;
  int cluster_local = 0;
  ClusterObject *cluster_object = NULL;
  char workarea[512];

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  if ((state_shutdown_requested == 0) && (state_events_active == 1)) {  /* if not shutting down */
    if (multicluster_status == -1) {    /* if state not yet determined */
      multicluster_status = get_multicluster_status();  /* are we running multicluster? */
    }

    switch (multicluster_status) {
        /*-----------------------------------------------------------------------*
         * no contact with loadleveler yet                                       *
         *-----------------------------------------------------------------------*/
      case -1:
        break;

        /*-----------------------------------------------------------------------*
         * single cluster                                                        *
         *-----------------------------------------------------------------------*/
      case 0:
        if (cluster_list == NULL) {     /* if no list obtained yet */
          cluster_list = NewList();     /* create a new cluster list */
          job_list = NewList(); /* create a new job list */
        }
        print_message(INFO_MESSAGE, "Number of LoadLeveler Clusters=0 (not running multicluster).\n");
        i = 0;
        cluster_object = (ClusterObject *) malloc(sizeof(ClusterObject));
        malloc_check(cluster_object, __FUNCTION__, __LINE__);
        memset(cluster_object, '\0', sizeof(*cluster_object));  /* zero the whole object, not just pointer-size bytes */
        cluster_object->proxy_generated_cluster_id = generate_id();     /* a unique identifier for this cluster */
        cluster_object->proxy_generated_queue_id = generate_id();       /* a unique identifier for this queue */
        cluster_object->cluster_state = MY_STATE_UP;    /* mark cluster object created */
        cluster_object->queue_state = MY_STATE_UP;      /* mark queue object created */

        /* build "Local@<hostname> (not multicluster)"; the host name is truncated */
        /* if needed so the suffix and terminator always fit in workarea.          */
        memset(workarea, '\0', sizeof(workarea));
        strcpy(workarea, "Local@");
        strncat(workarea, hostname, sizeof(workarea) - strlen(workarea) - strlen(" (not multicluster)") - 1);
        strcat(workarea, " (not multicluster)");
        cluster_object->cluster_name = strdup(workarea);        /* synthetic name for the single local cluster */
        cluster_object->cluster_is_local = 1;   /* this is a local cluster */
        cluster_object->node_hash = HashCreate(1024);

        /* report the name stored in the object; the cluster_name local is never set on this path */
        print_message(INFO_MESSAGE, "Cluster name=%s\n", cluster_object->cluster_name);
        AddToList(cluster_list, (void *) cluster_object);       /* add the new cluster object to the list */
        print_message(INFO_MESSAGE, "Send event notification: PTP Machine added for LoadLeveler Cluster=%s.\n", cluster_object->cluster_name);
        sendMachineAddEvent(start_events_gui_transmission_id, cluster_object);
        print_message(INFO_MESSAGE, "Send event notification: PTP Queue added for LoadLeveler Cluster=%s.\n", cluster_object->cluster_name);
        sendQueueAddEvent(start_events_gui_transmission_id, cluster_object);
        break;

        /*-----------------------------------------------------------------------*
         * multicluster                                                          *
         *-----------------------------------------------------------------------*/
      case 1:
        if (cluster_list == NULL) {     /* if no list obtained yet */
          cluster_list = NewList();     /* create a new cluster list */
          job_list = NewList(); /* create a new job list */
        }
        query_elem = my_ll_query(MCLUSTERS);
        if (query_elem == NULL) {
          print_message(ERROR_MESSAGE, "Unable to obtain query element. LoadLeveler may not be active.\n");
          multicluster_status = -1;     /* we need to come in here again */
          return;
        }
        print_message(INFO_MESSAGE, "Call LoadLeveler (ll_set_request) for clusters.\n");
        rc = my_ll_set_request(query_elem, QUERY_ALL, NULL, ALL_DATA);
        if (rc != 0) {
          rc = my_ll_deallocate(query_elem);
          query_elem = NULL;
          multicluster_status = -1;     /* we need to come in here again */
          return;
        }
        print_message(INFO_MESSAGE, "Call LoadLeveler (ll_get_objs) for clusters.\n");
        cluster = my_ll_get_objs(query_elem, LL_SCHEDD, NULL, &cluster_count, &rc);
        if (rc != 0) {
          rc = my_ll_deallocate(query_elem);
          query_elem = NULL;
          multicluster_status = -1;     /* we need to come in here again */
          return;
        }
        print_message(INFO_MESSAGE, "Number of LoadLeveler Clusters=%d.\n", cluster_count);
        i = 0;
        while ((cluster != NULL) && (query_elem != NULL)) {
          print_message(INFO_MESSAGE, "Cluster %d:\n", i);
          rc = my_ll_get_data(cluster, LL_MClusterName, &cluster_name);
          if (rc != 0) {
            rc = my_ll_free_objs(query_elem);
            rc = my_ll_deallocate(query_elem);
            query_elem = NULL;
            multicluster_status = -1;   /* we need to come in here again */
            return;
          } else {
            cluster_object = (ClusterObject *) malloc(sizeof(ClusterObject));
            malloc_check(cluster_object, __FUNCTION__, __LINE__);
            memset(cluster_object, '\0', sizeof(*cluster_object));      /* zero the whole object, not just pointer-size bytes */
            cluster_object->proxy_generated_cluster_id = generate_id(); /* a unique identifier for this cluster */
            cluster_object->proxy_generated_queue_id = generate_id();   /* a unique identifier for this queue */
            cluster_object->cluster_state = MY_STATE_UP;        /* mark cluster object created */
            cluster_object->queue_state = MY_STATE_UP;  /* mark queue object created */
            cluster_local = 0;  /* do not carry the previous cluster's answer forward */
            rc = my_ll_get_data(cluster, LL_MClusterLocal, &cluster_local);
            cluster_object->cluster_name = strdup(cluster_name);
            if (cluster_local == 1) {
              cluster_object->cluster_is_local = 1;     /* this is a local cluster */
            }
            cluster_object->node_hash = HashCreate(1024);
            print_message(INFO_MESSAGE, "Cluster name=%s\n", cluster_name);
            AddToList(cluster_list, (void *) cluster_object);   /* add the new cluster object to the list */
            print_message(INFO_MESSAGE, "Send event notification: PTP Machine added for LoadLeveler Cluster=%s.\n", cluster_name);
            sendMachineAddEvent(start_events_gui_transmission_id, cluster_object);
            print_message(INFO_MESSAGE, "Send event notification: PTP Queue added for LoadLeveler Cluster=%s.\n", cluster_name);
            sendQueueAddEvent(start_events_gui_transmission_id, cluster_object);
          }
          i++;
          cluster = my_ll_next_obj(query_elem);
        }
        if (query_elem != NULL) {
          rc = my_ll_free_objs(query_elem);
          rc = my_ll_deallocate(query_elem);
        }
        query_elem = NULL;
        break;

      default:                 /* unexpected status value - nothing to refresh */
        break;
    }                           /* end switch */
  }                             /* end if not shutdown */

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return;
}
/*************************************************************************
* Send OK Event to front end GUI *
*************************************************************************/
void sendOkEvent(int gui_transmission_id)
{
proxy_msg *msg;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
msg = proxy_ok_event(gui_transmission_id);
enqueue_event_to_proxy_server(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* Send Shutdown Event to front end GUI *
*************************************************************************/
void sendShutdownEvent(int gui_transmission_id)
{
proxy_msg *msg;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
msg = proxy_shutdown_event(gui_transmission_id);
enqueue_event_to_proxy_server(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* Send ERROR Event to front end GUI (error type and message) *
*************************************************************************/
void sendErrorEvent(int gui_transmission_id, int type, char *msgtext)
{
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. type=%d. message=%s.\n", __FUNCTION__, __LINE__, type, msgtext);
/*
* Send an error message to the front end
*/
proxy_msg *msg;
msg = new_proxy_msg(PROXY_EV_ERROR, gui_transmission_id);
proxy_msg_add_int(msg, type);
proxy_msg_add_string(msg, msgtext);
enqueue_event_to_proxy_server(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return;
}
/*************************************************************************
* Send job submission ERROR Event to front end GUI (error type and *
* message) *
*************************************************************************/
void sendJobSubmissionErrorEvent(int gui_transmission_id, char *subid, char *msgtext)
{
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. id=%s. message=%s.\n", __FUNCTION__, __LINE__, subid, msgtext);
/*
* Send an error message to the front end
*/
proxy_msg *msg;
msg = proxy_submitjob_error_event(gui_transmission_id, subid, 0, msgtext);
enqueue_event_to_proxy_server(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return;
}
/*************************************************************************
 * Send configuration panel integer attribute event to the front end GUI.
 *
 * Builds a PROXY_EV_RT_ATTR_DEF message describing one INTEGER attribute
 * (id, short name, description, show flag, default value and lower/upper
 * limits) and queues it.  Always returns 0.
 *************************************************************************/
static int sendAttrDefIntEvent(int gui_transmission_id, char *attribute_id, char *attribute_shortname, char *attribute_description, int attribute_show, int attribute_default_value, int attribute_llimit, int attribute_ulimit)
{
  proxy_msg *attr_def = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. id=%s. name=%s. description=%s. default=%d. value1=%d. value2=%d.\n", __FUNCTION__, __LINE__, attribute_id, attribute_shortname, attribute_description, attribute_default_value, attribute_llimit, attribute_ulimit);

  attr_def = new_proxy_msg(PROXY_EV_RT_ATTR_DEF, gui_transmission_id);

  /* Header: presumably the number of definitions in this message (1),
   * then 8, which matches the number of fields appended below. */
  proxy_msg_add_int(attr_def, 1);
  proxy_msg_add_int(attr_def, 8);

  /* Identification and type of the attribute. */
  proxy_msg_add_string(attr_def, attribute_id);
  proxy_msg_add_string(attr_def, "INTEGER");
  proxy_msg_add_string(attr_def, attribute_shortname);
  proxy_msg_add_string(attr_def, attribute_description);

  /* Display flag, default value and allowed range. */
  proxy_msg_add_int(attr_def, attribute_show);
  proxy_msg_add_int(attr_def, attribute_default_value);
  proxy_msg_add_int(attr_def, attribute_llimit);
  proxy_msg_add_int(attr_def, attribute_ulimit);

  enqueue_event_to_proxy_server(attr_def);

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Send configuration panel string attribute event to the front end GUI.
 *
 * Delegates message construction to proxy_attr_def_string_event() and
 * queues the result.  Always returns 0.
 *************************************************************************/
static int sendAttrDefStringEvent(int gui_transmission_id, char *attribute_id, char *attribute_shortname, char *attribute_description, int attribute_show, char *attribute_value)
{
  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. id=%s. name=%s. description=%s. value=%s.\n", __FUNCTION__, __LINE__, attribute_id, attribute_shortname, attribute_description, attribute_value);

  enqueue_event_to_proxy_server(proxy_attr_def_string_event(gui_transmission_id, attribute_id, attribute_shortname, attribute_description, attribute_show, attribute_value));

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Determine if LoadLeveler is configured multicluster.
 *
 * The query is only issued when no shutdown has been requested, event
 * processing is active and multicluster_status is still -1 (not yet
 * determined).  Otherwise the current multicluster_status is returned
 * unchanged.
 *
 * Returns -1 if the query element cannot be obtained or the object
 * query reports rc < 0; otherwise returns multicluster_status.  If the
 * LL_ClusterMusterEnvironment lookup fails, multicluster_status is set
 * to 0 (local cluster only).
 *************************************************************************/
int get_multicluster_status()
{
  LL_element *query = NULL;
  LL_element *first_cluster = NULL;
  int n_clusters = 0;
  int ll_rc = 0;
  int must_query = 0;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  /* not shutting down, allowed to process events, state not yet determined */
  must_query = (state_shutdown_requested == 0) && (state_events_active == 1) && (multicluster_status == -1);

  if (must_query) {
    query = my_ll_query(CLUSTERS);
    if (query == NULL) {
      print_message(ERROR_MESSAGE, "Unable to obtain query element. LoadLeveler may not be active.\n");
      return -1;
    }

    /* Request: QUERY_ALL with no filter (NULL) and ALL_DATA. */
    print_message(INFO_MESSAGE, "Call LoadLeveler (ll_set_request) for cluster.\n");
    ll_rc = my_ll_set_request(query, QUERY_ALL, NULL, ALL_DATA);

    print_message(INFO_MESSAGE, "Call LoadLeveler (ll_get_objs) for cluster.\n");
    first_cluster = my_ll_get_objs(query, LL_CM, NULL, &n_clusters, &ll_rc);
    if (ll_rc < 0) {
      my_ll_deallocate(query);
      return -1;
    }

    print_message(INFO_MESSAGE, "Number of LoadLeveler Clusters=%d.\n", n_clusters);

    /* Only the first returned cluster object is examined. */
    if (first_cluster != NULL) {
      print_message(INFO_MESSAGE, "Cluster %d:\n", 0);
      ll_rc = my_ll_get_data(first_cluster, LL_ClusterMusterEnvironment, &multicluster_status);
      if (ll_rc == 0) {
        print_message(INFO_MESSAGE, "Multicluster returned is = %d.\n", multicluster_status);
      } else {
        print_message(ERROR_MESSAGE, "Error rc=%d trying to determine if LoadLeveler is running local or multicluster configuration. Defaulting to local cluster only (no multicluster).\n", ll_rc);
        multicluster_status = 0;        /* default to local */
      }
    }

    /* Release the objects obtained by the query, then the query itself. */
    my_ll_free_objs(query);
    my_ll_deallocate(query);
  }

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return multicluster_status;
}
/*************************************************************************
 * Send cluster added event to gui (a cluster to LoadLeveler is a
 * PTP machine to the gui).
 *
 * The proxy generated cluster id is sent as a decimal string and the
 * cluster state is mapped to one of the MACHINE_STATE_* strings.
 * Always returns 0.
 *************************************************************************/
static int sendMachineAddEvent(int gui_transmission_id, ClusterObject * cluster_object)
{
  char cluster_id_text[256] = { '\0' };
  char *reported_state = NULL;
  proxy_msg *machine_event = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. cluster=%s. state=%d.\n", __FUNCTION__, __LINE__, cluster_object->cluster_name, cluster_object->cluster_state);

  sprintf(cluster_id_text, "%d", cluster_object->proxy_generated_cluster_id);

  /* Anything other than UP or DOWN is reported as UNKNOWN. */
  if (cluster_object->cluster_state == MY_STATE_UP) {
    reported_state = MACHINE_STATE_UP;
  } else if (cluster_object->cluster_state == MY_STATE_DOWN) {
    reported_state = MACHINE_STATE_DOWN;
  } else {
    reported_state = MACHINE_STATE_UNKNOWN;
  }

  machine_event = proxy_new_machine_event(gui_transmission_id, ibmll_proxy_base_id_string, cluster_id_text, cluster_object->cluster_name, reported_state);
  enqueue_event_to_proxy_server(machine_event);

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Send queue added event to gui (one per cluster).
 *
 * The queue is named after the cluster and identified by the proxy
 * generated queue id (sent as a decimal string).  Always returns 0.
 *************************************************************************/
static int sendQueueAddEvent(int gui_transmission_id, ClusterObject * cluster_object)
{
  char queue_id_text[256] = { '\0' };
  char *reported_state = NULL;
  proxy_msg *queue_event = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. queue=%s. state=%d.\n", __FUNCTION__, __LINE__, cluster_object->cluster_name, cluster_object->queue_state);

  sprintf(queue_id_text, "%d", cluster_object->proxy_generated_queue_id);

  /* NOTE(review): the trace above prints queue_state, but the reported
   * state is derived from cluster_state - confirm this is intended. */
  if (cluster_object->cluster_state == MY_STATE_UP) {
    reported_state = QUEUE_STATE_NORMAL;
  } else {
    reported_state = QUEUE_STATE_STOPPED;
  }

  queue_event = proxy_new_queue_event(gui_transmission_id, ibmll_proxy_base_id_string, queue_id_text, cluster_object->cluster_name, reported_state);
  enqueue_event_to_proxy_server(queue_event);

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Send node added event to gui.
 *
 * Creates a new-node event under the cluster's proxy generated id and
 * adds one node entry (id, name, mapped NODE_STATE_* string).
 * Always returns 0.
 *************************************************************************/
static int sendNodeAddEvent(int gui_transmission_id, ClusterObject * cluster_object, NodeObject * node_object)
{
  char cluster_id_text[256] = { '\0' };
  char node_id_text[256] = { '\0' };
  char *reported_state = NULL;
  proxy_msg *node_event = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. cluster=%s. node=%s. state=%d.\n", __FUNCTION__, __LINE__, cluster_object->cluster_name, node_object->node_name, node_object->node_state);

  sprintf(cluster_id_text, "%d", cluster_object->proxy_generated_cluster_id);
  sprintf(node_id_text, "%d", node_object->proxy_generated_node_id);

  /* Anything other than UP or DOWN is reported as UNKNOWN. */
  if (node_object->node_state == MY_STATE_UP) {
    reported_state = NODE_STATE_UP;
  } else if (node_object->node_state == MY_STATE_DOWN) {
    reported_state = NODE_STATE_DOWN;
  } else {
    reported_state = NODE_STATE_UNKNOWN;
  }

  node_event = proxy_new_node_event(gui_transmission_id, cluster_id_text, 1);   /* 1==number new nodes in cluster */
  proxy_add_node(node_event, node_id_text, node_object->node_name, reported_state, 0);  /* 0==extra attributes */
  enqueue_event_to_proxy_server(node_event);

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Send node changed event to gui.
 *
 * Creates a node-change event keyed by the node's proxy generated id
 * and adds the node entry with its current mapped NODE_STATE_* string.
 * cluster_object is only used for the trace message.  Always returns 0.
 *************************************************************************/
static int sendNodeChangeEvent(int gui_transmission_id, ClusterObject * cluster_object, NodeObject * node_object)
{
  char node_id_text[256] = { '\0' };
  char *reported_state = NULL;
  proxy_msg *change_event = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. cluster=%s. node=%s. state=%d.\n", __FUNCTION__, __LINE__, cluster_object->cluster_name, node_object->node_name, node_object->node_state);

  sprintf(node_id_text, "%d", node_object->proxy_generated_node_id);

  /* Anything other than UP or DOWN is reported as UNKNOWN. */
  if (node_object->node_state == MY_STATE_UP) {
    reported_state = NODE_STATE_UP;
  } else if (node_object->node_state == MY_STATE_DOWN) {
    reported_state = NODE_STATE_DOWN;
  } else {
    reported_state = NODE_STATE_UNKNOWN;
  }

  /* NOTE(review): the meaning of the trailing 1 is not visible here;
   * confirm against proxy_node_change_event(). */
  change_event = proxy_node_change_event(gui_transmission_id, node_id_text, 1);
  proxy_add_node(change_event, node_id_text, node_object->node_name, reported_state, 0); /* 0==extra attributes */
  enqueue_event_to_proxy_server(change_event);

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Send node removed event to gui.
 *
 * Builds a remove-node event for the node's proxy generated id (sent as
 * a decimal string) and queues it.  cluster_object is only used for the
 * trace message.  Always returns 0.
 *************************************************************************/
static int sendNodeRemoveEvent(int gui_transmission_id, ClusterObject * cluster_object, NodeObject * node_object)
{
  proxy_msg *msg;
  char proxy_generated_node_id_string[256];

  /* The trace format now has a conversion for node_name; previously the
   * argument was passed but had no matching %s and was never printed. */
  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. cluster=%s. node=%s.\n", __FUNCTION__, __LINE__, cluster_object->cluster_name, node_object->node_name);
  memset(proxy_generated_node_id_string, '\0', sizeof(proxy_generated_node_id_string)); /* zero the area */
  sprintf(proxy_generated_node_id_string, "%d", node_object->proxy_generated_node_id);
  msg = proxy_remove_node_event(gui_transmission_id, proxy_generated_node_id_string);
  enqueue_event_to_proxy_server(msg);
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Send job added event to gui.
 *
 * The job is reported under the cluster's proxy generated queue id with
 * a name of the form "<from_host>.<cluster>.<proc>" taken from the
 * LoadLeveler step id, a mapped JOB_STATE_* string and the gui assigned
 * job id.  Always returns 0.
 *************************************************************************/
static int sendJobAddEvent(int gui_transmission_id, ClusterObject * cluster_object, JobObject * job_object)
{
  proxy_msg *msg;
  char proxy_generated_job_id_string[256];
  char proxy_generated_queue_id_string[256];
  char job_name_string[256];
  char *job_state_to_report = JOB_STATE_INIT;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. job=%s.%d.%d. state=%d.\n", __FUNCTION__, __LINE__, job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->job_state);
  memset(proxy_generated_job_id_string, '\0', sizeof(proxy_generated_job_id_string));   /* zero the area */
  memset(proxy_generated_queue_id_string, '\0', sizeof(proxy_generated_queue_id_string));       /* zero the area */
  memset(job_name_string, '\0', sizeof(job_name_string));       /* zero the area */
  snprintf(proxy_generated_job_id_string, sizeof(proxy_generated_job_id_string), "%d", job_object->proxy_generated_job_id);
  snprintf(proxy_generated_queue_id_string, sizeof(proxy_generated_queue_id_string), "%d", cluster_object->proxy_generated_queue_id);

  /* Idle, stopped and unrecognized states are all reported as INIT. */
  switch (job_object->job_state) {
    case MY_STATE_RUNNING:
      job_state_to_report = JOB_STATE_RUNNING;
      break;
    case MY_STATE_TERMINATED:
      job_state_to_report = JOB_STATE_TERMINATED;
      break;
    case MY_STATE_IDLE:
    case MY_STATE_STOPPED:
    default:
      job_state_to_report = JOB_STATE_INIT;
      break;
  }

  /* Bounded formatting: the length of from_host is not known here, so
   * an over-long host name is truncated instead of overrunning the
   * fixed-size buffer. */
  snprintf(job_name_string, sizeof(job_name_string), "%s.%d.%d", job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc);
  msg = proxy_new_job_event(gui_transmission_id, proxy_generated_queue_id_string, proxy_generated_job_id_string, job_name_string, job_state_to_report, job_object->gui_assigned_job_id);
  enqueue_event_to_proxy_server(msg);
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Send job changed event to gui.
 *
 * Creates a job-change event for the job's proxy generated id and
 * attaches one string attribute, JOB_STATE_ATTR, holding the mapped
 * JOB_STATE_* string.  Always returns 0.
 *************************************************************************/
static int sendJobChangeEvent(int gui_transmission_id, JobObject * job_object)
{
  proxy_msg *msg;
  char proxy_generated_job_id_string[256];
  char *job_state_to_report = JOB_STATE_INIT;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. job=%s.%d.%d. state=%d.\n", __FUNCTION__, __LINE__, job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->job_state);
  memset(proxy_generated_job_id_string, '\0', sizeof(proxy_generated_job_id_string));   /* zero the area */
  sprintf(proxy_generated_job_id_string, "%d", job_object->proxy_generated_job_id);

  /* Idle, stopped and unrecognized states are all reported as INIT. */
  switch (job_object->job_state) {
    case MY_STATE_RUNNING:
      job_state_to_report = JOB_STATE_RUNNING;
      break;
    case MY_STATE_TERMINATED:
      job_state_to_report = JOB_STATE_TERMINATED;
      break;
    case MY_STATE_IDLE:
    case MY_STATE_STOPPED:
    default:
      job_state_to_report = JOB_STATE_INIT;
      break;
  }

  /* The numeric job state used to be formatted into a scratch buffer
   * that nothing read; only the mapped state string is sent. */
  msg = proxy_job_change_event(gui_transmission_id, proxy_generated_job_id_string, 1);
  proxy_add_string_attribute(msg, JOB_STATE_ATTR, job_state_to_report);
  enqueue_event_to_proxy_server(msg);
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Send job removed event to gui.
 *
 * Builds a remove-job event for the job's proxy generated id (sent as a
 * decimal string) and queues it.  Always returns 0.
 *************************************************************************/
static int sendJobRemoveEvent(int gui_transmission_id, JobObject * job_object)
{
  char job_id_text[256] = { '\0' };

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. job=%s.%d.%d. state=%d.\n", __FUNCTION__, __LINE__, job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->job_state);

  sprintf(job_id_text, "%d", job_object->proxy_generated_job_id);
  enqueue_event_to_proxy_server(proxy_remove_job_event(gui_transmission_id, job_id_text));

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Send task added event to gui.
 *
 * Creates a new-process event under the job's proxy generated id with
 * one process entry (proxy task id, LoadLeveler task id, mapped
 * PROC_STATE_* string) plus two extra attributes: the proxy id of the
 * node the task runs on and the LoadLeveler task id.
 *
 * Returns 0 when the event was queued, -1 when the task's node cannot
 * be found in the cluster's node hash (no event is sent in that case).
 *************************************************************************/
static int sendTaskAddEvent(int gui_transmission_id, ClusterObject * cluster_object, JobObject * job_object, TaskObject * task_object)
{
  proxy_msg *msg;
  char proxy_generated_job_id_string[256];
  char proxy_generated_task_id_string[256];
  char ll_task_id_string[256];
  char *task_state_to_report = PROC_STATE_STOPPED;
  NodeObject *node_object = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. job=%s.%d.%d. node=%s. task=%d.\n", __FUNCTION__, __LINE__, job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, task_object->node_name, task_object->ll_task_id);

  /* Resolve the node before any message is allocated so that a failed
   * lookup can bail out without dereferencing NULL or leaving a
   * half-built message behind. */
  node_object = get_node_in_hash(cluster_object->node_hash, task_object->node_name);
  if (node_object == NULL) {
    print_message(ERROR_MESSAGE, "Node %s not found for task %d. Task add event not sent.\n", task_object->node_name, task_object->ll_task_id);
    print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
    return -1;
  }

  memset(proxy_generated_job_id_string, '\0', sizeof(proxy_generated_job_id_string));   /* zero the area */
  memset(proxy_generated_task_id_string, '\0', sizeof(proxy_generated_task_id_string)); /* zero the area */
  memset(ll_task_id_string, '\0', sizeof(ll_task_id_string));   /* zero the area */
  sprintf(proxy_generated_job_id_string, "%d", job_object->proxy_generated_job_id);
  sprintf(proxy_generated_task_id_string, "%d", task_object->proxy_generated_task_id);
  sprintf(ll_task_id_string, "%d", task_object->ll_task_id);
  msg = proxy_new_process_event(gui_transmission_id, proxy_generated_job_id_string, 1); /* 1 == num tasks */

  /* Idle and unrecognized states are both reported as STARTING. */
  switch (task_object->task_state) {
    case MY_STATE_RUNNING:
      task_state_to_report = PROC_STATE_RUNNING;
      break;
    case MY_STATE_STOPPED:
      task_state_to_report = PROC_STATE_STOPPED;
      break;
    case MY_STATE_TERMINATED:
      task_state_to_report = PROC_STATE_EXITED;
      break;
    case MY_STATE_IDLE:
    default:
      task_state_to_report = PROC_STATE_STARTING;
      break;
  }

  proxy_add_process(msg, proxy_generated_task_id_string, ll_task_id_string, task_state_to_report, 2);   /* 2=extra attrs */
  proxy_add_int_attribute(msg, PROC_NODEID_ATTR, node_object->proxy_generated_node_id); /* proxy node id */
  proxy_add_int_attribute(msg, PROC_INDEX_ATTR, task_object->ll_task_id);       /* loadleveler task id */
  enqueue_event_to_proxy_server(msg);
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Send task changed event to gui.
 *
 * Creates a process-change event for the task's proxy generated id and
 * attaches one string attribute, PROC_STATE_ATTR, holding the mapped
 * PROC_STATE_* string.  job_object is not used.  Always returns 0.
 *************************************************************************/
static int sendTaskChangeEvent(int gui_transmission_id, JobObject * job_object, TaskObject * task_object)
{
  char task_id_text[256] = { '\0' };
  char *reported_state = NULL;
  proxy_msg *change_event = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  sprintf(task_id_text, "%d", task_object->proxy_generated_task_id);
  change_event = proxy_process_change_event(gui_transmission_id, task_id_text, 1);

  /* Idle and unrecognized states are both reported as STARTING. */
  if (task_object->task_state == MY_STATE_RUNNING) {
    reported_state = PROC_STATE_RUNNING;
  } else if (task_object->task_state == MY_STATE_STOPPED) {
    reported_state = PROC_STATE_STOPPED;
  } else if (task_object->task_state == MY_STATE_TERMINATED) {
    reported_state = PROC_STATE_EXITED;
  } else {
    reported_state = PROC_STATE_STARTING;
  }

  proxy_add_string_attribute(change_event, PROC_STATE_ATTR, reported_state);
  enqueue_event_to_proxy_server(change_event);

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Send task removed event to gui.
 *
 * Builds a remove-process event for the task's proxy generated id (sent
 * as a decimal string) and queues it.  job_object is not used.
 * Always returns 0.
 *************************************************************************/
static int sendTaskRemoveEvent(int gui_transmission_id, JobObject * job_object, TaskObject * task_object)
{
  char task_id_text[256] = { '\0' };

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  sprintf(task_id_text, "%d", task_object->proxy_generated_task_id);
  enqueue_event_to_proxy_server(proxy_remove_process_event(gui_transmission_id, task_id_text));

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return 0;
}
/*************************************************************************
 * Generate a unique id.
 *
 * Returns ibmll_proxy_base_id plus the current value of ibmll_last_id,
 * then advances ibmll_last_id by one for the next caller.
 *************************************************************************/
static int generate_id()
{
  int next_id = ibmll_proxy_base_id + ibmll_last_id;

  ibmll_last_id++;
  return next_id;
}
/*************************************************************************
 * Send an event message (proxy_msg) to gui
 *
 * Prints the message with print_proxy_message(), then passes it to
 * proxy_svr_queue_msg() for the ll_proxy server while holding
 * proxy_svr_queue_msg_lock.
 *************************************************************************/
static void enqueue_event_to_proxy_server(proxy_msg * msg)
{
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
/* Print the outgoing message before it is queued. */
print_proxy_message(msg);
/* The queue call is bracketed by the mutex; presumably because several
 * threads enqueue events - TODO confirm against the callers. */
pthread_mutex_lock(&proxy_svr_queue_msg_lock);
proxy_svr_queue_msg(ll_proxy, msg);
pthread_mutex_unlock(&proxy_svr_queue_msg_lock);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
 * Print command arguments from gui command calls.
 *
 * Emits one ARGS_MESSAGE per argument, for at most nargs entries of
 * optional_args and stopping early at the first NULL entry, followed by
 * a final ARGS_MESSAGE containing only a newline.  A NULL optional_args
 * prints just the final newline message.
 *************************************************************************/
void print_message_args(int nargs, char *optional_args[])
{
  int i = 0;

  if (optional_args != NULL) {
    /* Test the bound before touching the element: the array is only
     * known to hold nargs entries, so optional_args[nargs] must not be
     * read when there is no NULL terminator. */
    for (i = 0; (i < nargs) && (optional_args[i] != NULL); i++) {
      print_message(ARGS_MESSAGE, " '%s'", optional_args[i]);
    }
  }
  print_message(ARGS_MESSAGE, "\n");
}
/*************************************************************************
* Proxy startup and command loop *
*************************************************************************/
int main(int argc, char *argv[])
{
char *user_libpath = NULL; /* final LoadLeveler lib path and
* shared library name */
char *host = "localhost";
char *proxy_str = DEFAULT_PROXY;
int ch;
int port = PROXY_TCP_PORT;
int rc;
struct passwd *pwd;
int i = 0;
char *cp = NULL;
int n = 0;
n = 0; /* prevent warning on linux for unsed var (used only on aix) */
/* preprocess input args to set debug loop and message processing
* before we do any further processing */
for (i = 0; i < argc; i++) {
if (strstr(argv[i], "debug_loop=y") != NULL) {
state_debug_loop = 1; /* turn on hard loop for attaching debugger */
continue;
}
if (strstr(argv[i], "trace_messages=y") != NULL) {
state_trace = 1; /* turn on tracing messages */
continue;
}
if (strstr(argv[i], "info_messages=y") != NULL) {
state_info = 1; /* turn on info messages */
continue;
}
if (strstr(argv[i], "warning_messages=y") != NULL) {
state_warning = 1; /* turn on warning messages */
continue;
}
if (strstr(argv[i], "error_messages=y") != NULL) {
state_error = 1; /* turn on error messages */
continue;
}
if (strstr(argv[i], "fatal_messages=y") != NULL) {
state_fatal = 1; /* turn on fatal messages */
continue;
}
if (strstr(argv[i], "args_messages=y") != NULL) {
state_args = 1; /* turn on args messages */
continue;
}
}
register_thread(pthread_self());
/* set following directive to "if 1" and recompile to turn on debug, "if 0" and recompile to turn off debug */
if (state_debug_loop) {
int debug_loop = 1;
while (debug_loop > 0) {
if ((debug_loop % 60) == 1) {
print_message(INFO_MESSAGE, "Debug loop - attach debugger, set breakpoints and then change debug_loop to 0 to continue.\n");
}
debug_loop++;
sleep(1);
}
}
gethostname(hostname,sizeof(hostname));
cp = strchr(hostname, '.');
if (cp != NULL) {
*cp = '\0';
}
for (i = 0; i < argc; i++) {
print_message(INFO_MESSAGE, "Main called with arg[%d]=%s.\n", i, argv[i]);
}
pwd = getpwuid(getuid());
userid = strdup(pwd->pw_name);
#ifdef __linux__
while ((ch = getopt_long(argc, argv, "P:p:h:l:d:t:i:w:e:f:a:m:o:r:x:y:z:", longopts, NULL)) != -1) {
switch (ch) {
case 'P':
proxy_str = strdup(optarg);
break;
case 'p':
port = atoi(optarg);
break;
case 'g':
debug_level = atoi(optarg);
break;
case 'h':
host = strdup(optarg);
break;
case 'l':
user_libpath = strdup(optarg); /* user has specified override full path to LoadLeveler shared library */
break;
case 'm':
if (strncmp(optarg, "y", 1) == 0) { /* y - multicluster forced on */
multicluster_status = 1; /* force multicluster */
} else if (strncmp(optarg, "n", 1) == 0) { /* n - multicluster forced off */
multicluster_status = 0; /* force local */
} else { /* d - default to LoadLeveler determine multicluster state */
multicluster_status = -1; /* allow LoadLeveler to determine mode */
}
break;
case 'o':
static_template_override = strdup(optarg); /* user supplied template file name */
break;
case 'r':
if (strncmp(optarg, "y", 1) == 0) { /* write template file always (at main startup) */
state_template = 1; /* write template at every main startup */
} else { /* n - do not write default template file */
state_template = 0; /* never write template file */
}
break;
case 'd': /* already preprocessed */
case 't': /* already preprocessed */
case 'i': /* already preprocessed */
case 'w': /* already preprocessed */
case 'e': /* already preprocessed */
case 'f': /* already preprocessed */
case 'a': /* already preprocessed */
break;
case 'x': /* min node polling */
min_node_sleep_seconds = atoi(optarg);
break;
case 'y': /* max node polling */
max_node_sleep_seconds = atoi(optarg);
break;
case 'z': /* job polling */
job_sleep_seconds = atoi(optarg);
break;
default:
print_message(ERROR_MESSAGE, "%s [--Proxy=proxy] [--host=host_name] [--port=port] [--debug=level] [--lib_override=directory] [--debug_loop=y|n] [--trace_messages=y|n] [--info_messages=y|n] [--warning_messages=y|n] [--error_messages=y|n] [--fatal_messages=y|n] [--args_messages=y|n] [--multicluster=d|n|y] [--template_override=file] [--template_write=y|n] --node_polling_min=value --node_polling_max=value --job_polling=value \n", argv[0]);
fflush(stderr);
return 1;
}
}
#else
/* AIX does not have the getopt_long function. Since the proxy is
* invoked only by the PTP front end, the following simplified options
* parsing is sufficient.
* Make sure that this is maintained in sync with the above
* getopt_long loop.
*/
n = 1;
while (n < argc) {
cp = strchr(argv[n], '=');
if (cp != NULL) {
*cp = '\0';
if (strcmp(argv[n], "--proxy") == 0) {
proxy_str = strdup(cp + 1);
n = n + 1;
}
else if (strcmp(argv[n], "--port") == 0) {
port = atoi(cp + 1);
n = n + 1;
}
else if (strcmp(argv[n], "--host") == 0) {
host = strdup(cp + 1);
n = n + 1;
}
else if (strcmp(argv[n], "--lib_override") == 0) {
user_libpath = strdup(cp + 1); /* user has specified override full path to LoadLeveler shared library */
n = n + 1;
}
else if (strcmp(argv[n], "--multicluster") == 0) {
if (strncmp(cp + 1, "y", 1) == 0) { /* y - multicluster forced on */
multicluster_status = 1; /* force multicluster */
} else if (strncmp(cp + 1, "n", 1) == 0) { /* n - multicluster forced off */
multicluster_status = 0; /* force local */
} else { /* d - default to LoadLeveler determine multicluster state */
multicluster_status = -1; /* allow LoadLeveler to determine mode */
}
n = n + 1;
}
else if (strcmp(argv[n], "--template_override") == 0) {
static_template_override = strdup(cp + 1); /* user supplied template file name */
n = n + 1;
}
else if (strcmp(argv[n], "--template_write") == 0) {
if (strncmp(cp + 1, "y", 1) == 0) { /* write template file always (at main startup) */
state_template = 1; /* write template at every main startup */
} else { /* n - do not write default template file */
state_template = 0; /* never write template file */
}
n = n + 1;
}
else if ((strcmp(argv[n], "--debug_loop") == 0) || /* already preprocessed */
(strcmp(argv[n], "--trace_messages") == 0) || /* already preprocessed */
(strcmp(argv[n], "--info_messages") == 0) || /* already preprocessed */
(strcmp(argv[n], "--warning_messages") == 0) || /* already preprocessed */
(strcmp(argv[n], "--error_messages") == 0) || /* already preprocessed */
(strcmp(argv[n], "--fatal_messages") == 0) || /* already preprocessed */
(strcmp(argv[n], "--args_messages") == 0)) { /* already preprocessed */
n = n + 1;
}
else if (strcmp(argv[n], "--node_polling_min") == 0) {
min_node_sleep_seconds = atoi(cp + 1);
n = n + 1;
}
else if (strcmp(argv[n], "--node_polling_max") == 0) {
max_node_sleep_seconds = atoi(cp + 1);
n = n + 1;
}
else if (strcmp(argv[n], "--debug") == 0) {
debug_level = atoi(cp + 1);
n = n + 1;
}
else if (strcmp(argv[n], "--job_polling") == 0) {
job_sleep_seconds = atoi(cp + 1);
n = n + 1;
}
else {
print_message(ERROR_MESSAGE, "Invalid argument %s (%d)\n", argv[n], n);
print_message(ERROR_MESSAGE, "%s [--Proxy=proxy] [--host=host_name] [--port=port] [--lib_override=directory] [--debug_loop=y|n] [--trace_messages=y|n] [--info_messages=y|n] [--warning_messages=y|n] [--error_messages=y|n] [--fatal_messages=y|n] [--args_messages=y|n] [--multicluster=d|n|y] [--template_override=file] [--template_write=y|n] --node_polling_min=value --node_polling_max=value --job_polling=value \n", argv[0]);
fflush(stderr);
return 1;
}
}
}
#endif
ptp_signal_exit = 0;
ptp_signal_thread = -1;
signal(SIGINT, ptp_signal_handler);
signal(SIGHUP, ptp_signal_handler);
signal(SIGILL, ptp_signal_handler);
signal(SIGSEGV, ptp_signal_handler);
signal(SIGTERM, ptp_signal_handler);
signal(SIGQUIT, ptp_signal_handler);
signal(SIGABRT, ptp_signal_handler);
rc = server(proxy_str, host, port, user_libpath);
state_shutdown_requested = 1;
exit(rc);
}
/*************************************************************************
* Handle a signal *
*************************************************************************/
RETSIGTYPE ptp_signal_handler(int sig)
{
ptp_signal_exit = sig;
ptp_signal_thread = find_thread(pthread_self());
if (sig >= 0 && sig < NSIG) {
RETSIGTYPE(*saved_signal) (int) = saved_signals[sig];
if (saved_signal != SIG_ERR && saved_signal != SIG_IGN && saved_signal != SIG_DFL) {
saved_signal(sig);
}
}
}
/*************************************************************************
 * Print message (optionally with time/date and thread id).
 *
 * Info, Trace, Warning, Args and unrecognized message types go to
 * stdout.  Error and Fatal messages go to stderr.
 *
 * Each known type is printed only when its state_* switch equals 1
 * (this includes Fatal, which is gated by state_fatal); unrecognized
 * types are always printed.  The timestamp prefix is controlled by
 * state_message_timedate and the "T(n)" prefix by
 * state_message_threadid, where n is the index of the calling thread in
 * thread_map_table (or the table size if the thread is not listed).
 * Args messages get an extra newline appended.
 *
 * The whole operation runs with print_message_lock held.
 *************************************************************************/
static void print_message(int type, const char *format, ...)
{
  va_list args;
  char stamp[20];
  time_t now;
  struct tm now_tm;
  int thread_id = 0;
  FILE *stream = stdout;        /* destination for this message type */
  const char *label = ": ";     /* prefix for unrecognized types */
  int enabled = 1;              /* unrecognized types are always printed */
  int extra_newline = 0;        /* set for ARGS_MESSAGE only */

  memset(stamp, '\0', sizeof(stamp));   /* zero the area */
  pthread_mutex_lock(&print_message_lock);

  /* what thread am I ? */
  pthread_t tid = pthread_self();
  for (thread_id = 0; thread_id < (sizeof(thread_map_table) / sizeof(pthread_t)); thread_id++) {
    if (tid == thread_map_table[thread_id]) {
      break;
    }
  }

  /* what time is it ? */
  time(&now);
  localtime_r(&now, &now_tm);
  strftime(&stamp[0], 15, "%m/%d %02H:%02M:%02S", &now_tm);

  /* Pick the gate, the label and the stream for this message type. */
  switch (type) {
    case INFO_MESSAGE:
      enabled = (state_info == 1);
      label = "Info: ";
      break;
    case TRACE_MESSAGE:
      enabled = (state_trace == 1);
      label = "Trace: ";
      break;
    case WARNING_MESSAGE:
      enabled = (state_warning == 1);
      label = "Warning: ";
      break;
    case ERROR_MESSAGE:
      enabled = (state_error == 1);
      label = "Error: ";
      stream = stderr;
      break;
    case FATAL_MESSAGE:
      enabled = (state_fatal == 1);
      label = "Fatal: ";
      stream = stderr;
      break;
    case ARGS_MESSAGE:
      enabled = (state_args == 1);
      label = "Args: ";
      extra_newline = 1;
      break;
    default:                   /* unknown message type - allow it */
      break;
  }

  va_start(args, format);
  if (enabled) {
    if (state_message_timedate != 0) {
      fprintf(stream, "%s ", stamp);
    }
    if (state_message_threadid != 0) {
      fprintf(stream, "T(%d) ", thread_id);
    }
    fprintf(stream, "%s", label);
    vfprintf(stream, format, args);
    if (extra_newline) {
      fprintf(stream, "\n");
    }
    fflush(stream);
  }
  va_end(args);

  pthread_mutex_unlock(&print_message_lock);
}
/*************************************************************************
* main proxy processing *
*************************************************************************/
int server(char *name, char *host, int port, char *user_libpath)
{
  /*
   * Initialize the proxy, connect to front end, then run the main loop
   * until the proxy is requested to shut down.
   *
   * name         - passed through to proxy_svr_init
   * host, port   - front end address passed to proxy_svr_connect
   * user_libpath - optional directory to search for the LoadLeveler
   *                library; when NULL or empty the built-in list is used
   *
   * Returns the number of the signal recorded in ptp_signal_exit when the
   * main loop ended because of a signal, otherwise 0 (this includes a
   * failing proxy_svr_init or proxy_svr_connect). Calls exit(99) when no
   * LoadLeveler library can be found.
   */
  char shared_buffer[1024];
  char *msg1 = NULL;
  char *msg2 = NULL;
  int rc = 0;
  struct timeval timeout = {
    0, 20000
  };
  int lib_found = 0;
  int status = 0;
  struct stat statinfo;
  int i = 0;
  int save_errno = 0;
  int connect_rc = 0;
  int path_len = 0;
#ifdef __linux__
  char *libpath[] = {
    NULL, "/opt/ibmll/LoadL/full/lib/", "/opt/ibmll/LoadL/so/lib/", (char *) -1
  };
  char *libname = "libllapi.so";
#else
  char *libpath[] = {
    NULL, "/usr/lpp/LoadL/full/lib", "/usr/lpp/LoadL/so/lib", "/opt/ibmll/LoadL/full/lib/", "/opt/ibmll/LoadL/so/lib/", (char *) -1
  };
  char *libname = "libllapi.a";
#endif
  int template_write = 1;       /* preset to write */
  FILE *template_FILE = NULL;
  int template_count = 0;

  memset(shared_buffer, '\0', sizeof(shared_buffer));   /* zero the area */
  memset(static_template_name, '\0', sizeof(static_template_name));     /* zero the area */

  /*-----------------------------------------------------------------------*
   * Find the LoadLeveler shared library we are using for AIX or Linux.    *
   * If we cannot find it then LoadLeveler is not installed or we have     *
   * been directed to use the wrong path. If we find it OK then we will    *
   * try to dynamic open the library and compare versions later in the     *
   * command_initialize.                                                   *
   *-----------------------------------------------------------------------*/
  if (user_libpath != NULL) {   /* if user specified lib */
    if (strlen(user_libpath) > 0) {     /* if not a null string */
      libpath[0] = user_libpath;        /* pick up user specified libpath as only one to check */
      libpath[1] = (char *) -1; /* new end of list */
    }
  }

  lib_found = 0;                /* preset to not found */
  print_message(INFO_MESSAGE, "Searching for LoadLeveler shared library.\n");
  while ((libpath[i] != (char *) -1) && (lib_found == 0)) {     /* if not end of list and not yet found */
    if (libpath[i] != NULL) {   /* if valid entry */
      /*
       * Build "<dir>/<libname>" with a bounded write. The directory may be
       * user supplied, so it must not be allowed to overrun shared_buffer.
       * Room is always reserved for the AIX "(shr.o)" member suffix.
       */
      path_len = snprintf(shared_buffer, sizeof(shared_buffer), "%s/%s", libpath[i], libname);
      if ((path_len < 0) || (((size_t) path_len + sizeof("(shr.o)")) > sizeof(shared_buffer))) {
        print_message(ERROR_MESSAGE, "Search failure: LoadLeveler shared library path under %s is too long.\n", libpath[i]);
      } else {
        /* see if this is a valid LoadLeveler shared library */
        print_message(INFO_MESSAGE, "Trying: %s\n", shared_buffer);
        status = stat(shared_buffer, &statinfo);
        save_errno = errno;     /* capture before any other call can change it */
        if (status == 0) {
#ifdef _AIX
          strcat(shared_buffer, "(shr.o)");     /* space was reserved above */
#endif
          ibmll_libpath_name = strdup(shared_buffer);
          print_message(INFO_MESSAGE, "Successful search: Found LoadLeveler shared library %s\n", ibmll_libpath_name);
          lib_found = 1;        /* we found it */
          break;
        } else {
          print_message(ERROR_MESSAGE, "Search failure: \"stat\" of LoadLeveler shared library %s returned errno=%d.\n", shared_buffer, save_errno);
        }
      }
    }
    i++;
  }

  if (lib_found == 0) {
    print_message(FATAL_MESSAGE, "No LoadLeveler shared library found - quitting...\n");
    state_shutdown_requested = 1;
    exit(99);
  }

  /*-----------------------------------------------------------------------*
   * If allowed, write the template file to /tmp                           *
   * Template file will be saved with userid as                            *
   * /tmp/PTP_IBMLL_TEMPLATE_userid                                        *
   *-----------------------------------------------------------------------*/
  switch (state_template) {
    case 0:                    /* do not write template file */
      template_write = 0;
      break;
    case 1:                    /* always write template file */
    default:
      template_write = 1;
      break;
  }

  if (template_write == 1) {
    strcpy(&static_template_name[0], static_template_prefix);
    strcpy(&static_template_name[strlen(static_template_prefix)], userid);
    print_message(INFO_MESSAGE, "Writing job command template file %s.\n", static_template_name);
    template_FILE = fopen(static_template_name, "w");   /* reset file length to 0 or create file */
    if (template_FILE == NULL) {
      /* The template is a convenience only: report the failure and carry on. */
      save_errno = errno;
      print_message(ERROR_MESSAGE, "Unable to open job command template file %s for writing. errno=%d.\n", static_template_name, save_errno);
    } else {
      template_count = sizeof(job_command_file_template) / sizeof(job_command_file_template[0]);
      for (i = 0; i < template_count; i++) {
        fputs(job_command_file_template[i], template_FILE);
        fputs("\n", template_FILE);
      }
      fclose(template_FILE);
    }
  }

  /*-----------------------------------------------------------------------*
   * continue proxy initialization                                         *
   *-----------------------------------------------------------------------*/
  events = NewList();
  if (proxy_svr_init(name, &timeout, &helper_funcs, &command_tab, &ll_proxy) != PROXY_RES_OK) {
    return 0;
  }

  if ((connect_rc = proxy_svr_connect(ll_proxy, host, port)) == PROXY_RES_OK) {
    print_message(INFO_MESSAGE, "Running proxy on port %d.\n", port);
    while (ptp_signal_exit == 0 && state_shutdown_requested == 0) {
      if ((proxy_svr_progress(ll_proxy) != PROXY_RES_OK)) {
        print_message(INFO_MESSAGE, "Ending node monitor loop\n");
        break;
      }
    }
    if (ptp_signal_exit != 0) {
      /* translate the recorded signal number into text for the log */
      switch (ptp_signal_exit) {
        case SIGINT:
          msg1 = "INT";
          msg2 = "Interrupt";
          break;
        case SIGHUP:
          msg1 = "HUP";
          msg2 = "Hangup";
          break;
        case SIGILL:
          msg1 = "ILL";
          msg2 = "Illegal Instruction";
          break;
        case SIGSEGV:
          msg1 = "SEGV";
          msg2 = "Segmentation Violation";
          break;
        case SIGTERM:
          msg1 = "TERM";
          msg2 = "Termination";
          break;
        case SIGQUIT:
          msg1 = "QUIT";
          msg2 = "Quit";
          break;
        case SIGABRT:
          msg1 = "ABRT";
          msg2 = "Process Aborted";
          break;
        default:
          msg1 = "***UNKNOWN SIGNAL***";
          msg2 = "ERROR - UNKNOWN SIGNAL";
          break;
      }
      print_message(FATAL_MESSAGE, "###### SIGNAL: %i (%s) detected in T(%i)\n", ptp_signal_exit, msg1, ptp_signal_thread);
      print_message(FATAL_MESSAGE, "###### Shutting down proxy\n");
      print_message(FATAL_MESSAGE, "ptp_ibmll_proxy received signal %s (%s) and is exiting.\n", msg1, msg2);
      /* our return code = the signal that fired */
      rc = ptp_signal_exit;
    }
  } else {
    print_message(ERROR_MESSAGE, "proxy connection failed. rc=%d\n", connect_rc);
  }

  proxy_svr_finish(ll_proxy);
  print_message(INFO_MESSAGE, "proxy_svr_finish returned.\n");
  state_shutdown_requested = 1;
  return rc;
}
/*************************************************************************
* Call LoadLeveler to retrieve the cluster element *
*************************************************************************/
int my_ll_cluster(int version, LL_element ** errObj, LL_cluster_param * cp)
{
  /*
   * Forward the request to the ll_cluster entry in LL_SYMS while holding
   * access_LoadLeveler_lock, and hand its return code back to the caller.
   * A non-zero return code is logged as an informational message.
   */
  int ll_rc;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. version=%d.\n", __FUNCTION__, __LINE__, version);

  pthread_mutex_lock(&access_LoadLeveler_lock);
  ll_rc = LL_SYMS.ll_cluster(version, errObj, cp);      /* set the cluster name */
  pthread_mutex_unlock(&access_LoadLeveler_lock);

  if (ll_rc) {
    print_message(INFO_MESSAGE, "LoadLeveler ll_cluster rc=%d.\n", ll_rc);
  }

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, ll_rc);
  return ll_rc;
}
/*************************************************************************
* Call LoadLeveler to get data *
*************************************************************************/
int my_ll_get_data(LL_element * request, enum LLAPI_Specification spec, void *result)
{
  /*
   * Forward the request to the ll_get_data entry in LL_SYMS while holding
   * access_LoadLeveler_lock. The return code is passed back unchanged and
   * logged as an informational message when it is non-zero.
   */
  int ll_rc;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. LLAPI_Specification=%d.\n", __FUNCTION__, __LINE__, spec);

  pthread_mutex_lock(&access_LoadLeveler_lock);
  ll_rc = LL_SYMS.ll_get_data(request, spec, result);
  pthread_mutex_unlock(&access_LoadLeveler_lock);

  if (ll_rc) {
    print_message(INFO_MESSAGE, "LoadLeveler ll_get_data rc=%d.\n", ll_rc);
  }

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, ll_rc);
  return ll_rc;
}
/*************************************************************************
* Call LoadLeveler to perform a query *
*************************************************************************/
LL_element *my_ll_query(enum QueryType type)
{
  /*
   * Forward the request to the ll_query entry in LL_SYMS while holding
   * access_LoadLeveler_lock and return the element it produced. A NULL
   * result is logged as an informational message.
   */
  LL_element *element;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. QueryType=%d.\n", __FUNCTION__, __LINE__, type);

  pthread_mutex_lock(&access_LoadLeveler_lock);
  element = LL_SYMS.ll_query(type);
  pthread_mutex_unlock(&access_LoadLeveler_lock);

  if (!element) {
    print_message(INFO_MESSAGE, "LoadLeveler ll_query element=NULL. End of list was probably reached.\n");
  }

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return element;
}
/*************************************************************************
* Call LoadLeveler to free the objects in the query element *
*************************************************************************/
int my_ll_free_objs(LL_element * query_elem)
{
  /*
   * Forward the request to the ll_free_objs entry in LL_SYMS while holding
   * access_LoadLeveler_lock. The return code is passed back unchanged and
   * logged as an error when it is non-zero.
   */
  int ll_rc;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  pthread_mutex_lock(&access_LoadLeveler_lock);
  ll_rc = LL_SYMS.ll_free_objs(query_elem);
  pthread_mutex_unlock(&access_LoadLeveler_lock);

  if (ll_rc) {
    print_message(ERROR_MESSAGE, "LoadLeveler ll_free_objs rc=%d.\n", ll_rc);
  }

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, ll_rc);
  return ll_rc;
}
/*************************************************************************
* Call LoadLeveler to deallocate the query element *
*************************************************************************/
int my_ll_deallocate(LL_element * query_elem)
{
  /*
   * Forward the request to the ll_deallocate entry in LL_SYMS while holding
   * access_LoadLeveler_lock. The return code is passed back unchanged and
   * logged as an error when it is non-zero.
   */
  int ll_rc;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  pthread_mutex_lock(&access_LoadLeveler_lock);
  ll_rc = LL_SYMS.ll_deallocate(query_elem);
  pthread_mutex_unlock(&access_LoadLeveler_lock);

  if (ll_rc) {
    print_message(ERROR_MESSAGE, "LoadLeveler ll_deallocate rc=%d.\n", ll_rc);
  }

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, ll_rc);
  return ll_rc;
}
/*************************************************************************
* Call LoadLeveler to get the next object from the returned list *
*************************************************************************/
LL_element *my_ll_next_obj(LL_element * query_elem)
{
  /*
   * Forward the request to the ll_next_obj entry in LL_SYMS while holding
   * access_LoadLeveler_lock and return the element it produced. A NULL
   * result is logged as an informational message.
   */
  LL_element *element;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  pthread_mutex_lock(&access_LoadLeveler_lock);
  element = LL_SYMS.ll_next_obj(query_elem);
  pthread_mutex_unlock(&access_LoadLeveler_lock);

  if (!element) {
    print_message(INFO_MESSAGE, "LoadLeveler ll_next_obj element=NULL. End of list was probably reached.\n");
  }

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
  return element;
}
/*************************************************************************
* Call LoadLeveler to get objects from the previously returned element *
*************************************************************************/
LL_element *my_ll_get_objs(LL_element * query_elem, enum LL_Daemon daemon, char *ignore, int *value, int *rc)
{
  /*
   * Forward the request to the ll_get_objs entry in LL_SYMS while holding
   * access_LoadLeveler_lock and return the object it produced. *rc is
   * cleared before the call and is then whatever ll_get_objs stored in it.
   * A NULL result is logged as an informational message.
   */
  LL_element *first_object;

  *rc = 0;                      /* preset rc to 0 */
  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. LL_Daemon=%d.\n", __FUNCTION__, __LINE__, daemon);

  pthread_mutex_lock(&access_LoadLeveler_lock);
  first_object = (*LL_SYMS.ll_get_objs) (query_elem, daemon, ignore, value, rc);
  pthread_mutex_unlock(&access_LoadLeveler_lock);

  if (!first_object) {
    print_message(INFO_MESSAGE, "LoadLeveler ll_get_objs element=NULL. End of list was probably reached.\n");
  }

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, *rc);
  return first_object;
}
/*************************************************************************
* Call LoadLeveler to build a request object for a subsequent call *
*************************************************************************/
int my_ll_set_request(LL_element * query_elem, enum QueryFlags qflags, char **ignore, enum DataFilter dfilter)
{
  /*
   * Forward the request to the ll_set_request entry in LL_SYMS while
   * holding access_LoadLeveler_lock. The return code is passed back
   * unchanged and logged as an error when it is non-zero.
   */
  int ll_rc;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. QueryFlags=%d.\n", __FUNCTION__, __LINE__, qflags);

  pthread_mutex_lock(&access_LoadLeveler_lock);
  ll_rc = (*LL_SYMS.ll_set_request) (query_elem, qflags, ignore, dfilter);
  pthread_mutex_unlock(&access_LoadLeveler_lock);

  if (ll_rc) {
    print_message(ERROR_MESSAGE, "LoadLeveler ll_set_request rc=%i.\n", ll_rc);
  }

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, ll_rc);
  return ll_rc;
}
/*************************************************************************
* Call LoadLeveler to submit a job *
* *
* In the proxy we are accepting environment variables to be passed on *
* to submit by forking the submit and making sure we set the env vars *
* into the environment. *
*************************************************************************/
int my_ll_submit_job(int gui_transmission_id, char *job_sub_id, char *command_file, char *job_env_vars[][3])
{
  /*
   * Submit command_file through the ll_submit_job entry in LL_SYMS and
   * create one JobObject per job step that was returned.
   *
   * gui_transmission_id - not used by this function
   * job_sub_id          - id assigned to the first job step; later steps
   *                       get the literal "-1"
   * command_file        - job command file handed to ll_submit_job
   * job_env_vars        - table terminated by a NULL keyword; per row
   *                       [0] = variable name, [1] = value to use for the
   *                       submit (may be NULL), [2] = filled in here with a
   *                       strdup of the current value, if there is one
   *
   * The environment is changed only around the submit call and is put
   * back afterwards. master_lock is held for the whole function and
   * access_LoadLeveler_lock while the environment is modified.
   *
   * Returns the ll_submit_job return code, or -1 when there is no cluster
   * to submit to.
   */
  LL_job job_info;
  LL_job_step *job_step_info;
  JobObject *job_object = NULL;
  int submit_rc = 0;
  LL_cluster_param cluster_parm;
  LL_element *errObj = NULL;
  int i = 0;
  ClusterObject *cluster_object = NULL;
  ListElement *cluster_list_element = NULL;
  time_t my_clock;
  char *tempstring = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
  memset(&job_info, '\0', sizeof(job_info));    /* zero the job info area */

  pthread_mutex_lock(&master_lock);
  cluster_list_element = cluster_list->l_head;  /* always submit to the first cluster */
  if (cluster_list_element == NULL) {
    /* an empty cluster list would otherwise be dereferenced just below */
    submit_rc = -1;
    print_message(ERROR_MESSAGE, "No LoadLeveler cluster is known - unable to submit job.\n");
    pthread_mutex_unlock(&master_lock);
    print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, submit_rc);
    return submit_rc;
  }
  cluster_object = cluster_list_element->l_value;       /* get our cluster object from this list element */

  /*-----------------------------------------------------------------------*
   * always submit local regardless if running multicluster or not         *
   *-----------------------------------------------------------------------*/
  print_message(INFO_MESSAGE, "Setting access for LoadLeveler local cluster (single cluster).\n");
  cluster_parm.action = CLUSTER_UNSET;  /* we are unsetting the cluster */
  cluster_parm.cluster_list = NULL;
  submit_rc = my_ll_cluster(LL_API_VERSION, &errObj, &cluster_parm);    /* unset the cluster name - back to local */

  pthread_mutex_lock(&access_LoadLeveler_lock);

  /*-----------------------------------------------------------------------*
   * retrieve environment variables                                        *
   *-----------------------------------------------------------------------*/
  for (i = 0; job_env_vars[i][0] != NULL; i++) {        /* if keyword */
    tempstring = getenv(job_env_vars[i][0]);
    if (tempstring != NULL) {
      print_message(INFO_MESSAGE, "Retrieving env var %s=%s.\n", job_env_vars[i][0], tempstring);
      job_env_vars[i][2] = strdup(tempstring);  /* if old value */
    }
  }

  /*-----------------------------------------------------------------------*
   * write new environment variables                                       *
   *-----------------------------------------------------------------------*/
  for (i = 0; job_env_vars[i][0] != NULL; i++) {        /* if keyword */
    if (job_env_vars[i][1] == NULL) {   /* no new value for this keyword */
      continue;
    }
    /* set when there was no old value, or the old value differs */
    if ((job_env_vars[i][2] == NULL) || (strcmp(job_env_vars[i][1], job_env_vars[i][2]) != 0)) {
      print_message(INFO_MESSAGE, "Setting env var %s=%s.\n", job_env_vars[i][0], job_env_vars[i][1]);
      setenv(job_env_vars[i][0], job_env_vars[i][1], 1);        /* set the new value */
    }
  }

  /*-----------------------------------------------------------------------*
   * submit job                                                            *
   *-----------------------------------------------------------------------*/
  submit_rc = LL_SYMS.ll_submit_job(command_file, NULL, NULL, &job_info, LL_JOB_VERSION);
  if (submit_rc == -1) {
    char *theError = LL_SYMS.ll_error(NULL, 3); /* print any internal errors */
    print_message(ERROR_MESSAGE, "Internal errors (if any) returned by LoadLeveler submit API:\n%s\n\n", theError);
  }

  /*-----------------------------------------------------------------------*
   * restore environment variables                                         *
   *-----------------------------------------------------------------------*/
  for (i = 0; job_env_vars[i][0] != NULL; i++) {        /* if keyword */
    if (job_env_vars[i][1] == NULL) {   /* nothing was set for this keyword */
      continue;
    }
    if (job_env_vars[i][2] != NULL) {   /* if we had old value */
      if (strcmp(job_env_vars[i][1], job_env_vars[i][2]) != 0) {        /* and if old and new not identical */
        print_message(INFO_MESSAGE, "Restoring env var %s=%s.\n", job_env_vars[i][0], job_env_vars[i][2]);
        setenv(job_env_vars[i][0], job_env_vars[i][2], 1);      /* restore old value back into environment */
      }
    } else {                    /* if new only - no old */
      print_message(INFO_MESSAGE, "Unsetting env var %s.\n", job_env_vars[i][0]);
      unsetenv(job_env_vars[i][0]);     /* unset new value we put in environment */
    }
  }

  pthread_mutex_unlock(&access_LoadLeveler_lock);

  if (submit_rc != 0) {
    print_message(ERROR_MESSAGE, "LoadLeveler ll_submit_job rc=%i.\n", submit_rc);
  } else {
    print_message(INFO_MESSAGE, "Job Submitted: job_name=%s submithost=%s owner=%s steps=%d.\n", job_info.job_name, job_info.submit_host, job_info.owner, job_info.steps);
    time(&my_clock);            /* what time is it ? */
    for (i = 0; i < job_info.steps; i++) {
      job_step_info = job_info.step_list[i];
      print_message(INFO_MESSAGE, "Job step: step_name=%s. step_class=%s. step_id=%s.%d.%d\n", job_step_info->step_name, job_step_info->stepclass, job_step_info->id.from_host, job_step_info->id.cluster, job_step_info->id.proc);

      /*-----------------------------------------------------------------------*
       * create new job object for submitted job                               *
       *-----------------------------------------------------------------------*/
      job_object = (JobObject *) malloc(sizeof(JobObject));
      malloc_check(job_object, __FUNCTION__, __LINE__);
      /* clear the whole object; sizeof(job_object) would only be the pointer size */
      memset(job_object, '\0', sizeof(*job_object));
      job_object->proxy_generated_job_id = generate_id();       /* a unique identifier for this cluster */
      job_object->gui_assigned_job_id = "-1";   /* preset to async id (2 - n) */
      if (i == 0) {             /* if first */
        job_object->gui_assigned_job_id = job_sub_id;   /* pick up the parsed jobid for the first jobstep in job */
      }
      job_object->ll_step_id.from_host = strdup(job_step_info->id.from_host);
      job_object->ll_step_id.cluster = job_step_info->id.cluster;
      job_object->ll_step_id.proc = job_step_info->id.proc;
      job_object->task_list = NewList();        /* list to hold tasks for this job step */
      job_object->cluster_name = strdup(cluster_object->cluster_name);
      job_object->job_state = MY_STATE_IDLE;
      job_object->job_submit_time = my_clock;   /* time since epoch when job submitted */
      add_job_to_list(job_list, (void *) job_object);   /* add the new job object to the list */
      print_message(INFO_MESSAGE, "Schedule event notification: Job=%s.%d.%d added for LoadLeveler Cluster=%s.\n", job_object->ll_step_id.from_host, job_object->ll_step_id.cluster, job_object->ll_step_id.proc, job_object->cluster_name);
      sendJobAddEvent(start_events_gui_transmission_id, cluster_object, job_object);
    }
    my_ll_free_job_info(&job_info);
  }

  pthread_mutex_unlock(&master_lock);
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, submit_rc);
  return submit_rc;
}
/*************************************************************************
* Call LoadLeveler to cancel a job *
*************************************************************************/
int my_ll_terminate_job(int gui_transmission_id, JobObject * job_object)
{
  /*
   * Ask LoadLeveler to terminate the job step described by job_object.
   * The cluster is first selected (multicluster) or unset (single
   * cluster) through my_ll_cluster, then the ll_terminate_job entry in
   * LL_SYMS is called under access_LoadLeveler_lock. master_lock is held
   * for the whole operation.
   *
   * Returns the ll_terminate_job return code; the my_ll_cluster return
   * code is overwritten by it. gui_transmission_id is not used here.
   */
  LL_terminate_job_info cancel_request;
  LL_cluster_param cluster_parm;
  LL_element *errObj = NULL;
  char *remote_cluster[2];
  int rc = 0;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
  memset(&cancel_request, '\0', sizeof(cancel_request));        /* zero the terminate job info area */

  pthread_mutex_lock(&master_lock);

  /*-----------------------------------------------------------------------*
   * choose which cluster LoadLeveler is to act on                         *
   *-----------------------------------------------------------------------*/
  if (multicluster_status != 1) {
    /* not running multicluster - go back to the local cluster */
    print_message(INFO_MESSAGE, "Setting access for LoadLeveler local cluster (single cluster).\n");
    cluster_parm.action = CLUSTER_UNSET;
    cluster_parm.cluster_list = NULL;
  } else {
    /* running multicluster - name the cluster that owns the job */
    remote_cluster[0] = job_object->cluster_name;
    remote_cluster[1] = NULL;
    print_message(INFO_MESSAGE, "Setting access for LoadLeveler cluster %s.\n", job_object->cluster_name);
    cluster_parm.action = CLUSTER_SET;
    cluster_parm.cluster_list = remote_cluster;
  }
  rc = my_ll_cluster(LL_API_VERSION, &errObj, &cluster_parm);

  /*-----------------------------------------------------------------------*
   * send the ll cancel command and hope that it eventually works          *
   *-----------------------------------------------------------------------*/
  cancel_request.version_num = LL_PROC_VERSION;
  cancel_request.StepId.from_host = job_object->ll_step_id.from_host;   /* old notation for originating schedd node name */
  cancel_request.StepId.cluster = job_object->ll_step_id.cluster;       /* old notation for job number */
  cancel_request.StepId.proc = job_object->ll_step_id.proc;     /* old notation for step number */
  cancel_request.msg = NULL;    /* message to use */

  pthread_mutex_lock(&access_LoadLeveler_lock);
  rc = (*LL_SYMS.ll_terminate_job) (&cancel_request);
  pthread_mutex_unlock(&access_LoadLeveler_lock);

  if (rc) {
    print_message(ERROR_MESSAGE, "LoadLeveler ll_terminate_job rc=%i.\n", rc);
  }

  pthread_mutex_unlock(&master_lock);
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, rc);
  return rc;
}
/*************************************************************************
* call LoadLeveler to free job info *
*************************************************************************/
void my_ll_free_job_info(LL_job * job_info)
{
  /*
   * Pass job_info to the ll_free_job_info entry in LL_SYMS, tracing entry
   * and exit. No lock is taken by this function.
   */
  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  (*LL_SYMS.ll_free_job_info) (job_info, LL_JOB_VERSION);

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* find node in my hash table *
*************************************************************************/
NodeObject *get_node_in_hash(Hash * node_hash, char *node_name)
{
  /*
   * Look up the node called node_name. The name is hashed with
   * HashCompute, the list stored under that key is fetched with
   * HashSearch, and the list is walked comparing names with strcmp.
   *
   * Returns the matching NodeObject, or NULL when there is no list for the
   * key or no entry with that name.
   */
  int hash_key = 0;
  List *node_list = NULL;
  ListElement *node_list_element = NULL;
  NodeObject *node_object = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. name=%s.\n", __FUNCTION__, __LINE__, node_name);
  hash_key = HashCompute(node_name, strlen(node_name));
  node_list = HashSearch(node_hash, hash_key);
  if (node_list != NULL) {
    for (node_list_element = node_list->l_head; node_list_element != NULL; node_list_element = node_list_element->l_next) {
      node_object = node_list_element->l_value;
      if (strcmp(node_name, node_object->node_name) == 0) {
        /* pointers are traced as unsigned long: %x takes an unsigned int and cannot carry a pointer */
        print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. node_object=x\'%08lx\'.\n", __FUNCTION__, __LINE__, (unsigned long) node_object);
        return node_object;
      }
    }
  }

  node_object = NULL;           /* not found in list */
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. node_object=x\'%08lx\'.\n", __FUNCTION__, __LINE__, (unsigned long) node_object);
  return node_object;
}
/*************************************************************************
* add node to my hash table *
*************************************************************************/
void add_node_to_hash(Hash * node_hash, NodeObject * node_object)
{
  /*
   * Store node_object under the hash of its node_name. Each key maps to a
   * List; a new List is created and inserted the first time a key is
   * seen, otherwise the object is appended to the existing List.
   */
  int hash_key;
  List *node_list;

  /* pointers are traced as unsigned long: %x takes an unsigned int and cannot carry a pointer */
  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. object=x\'%08lx\'.\n", __FUNCTION__, __LINE__, (unsigned long) node_object);
  hash_key = HashCompute(node_object->node_name, strlen(node_object->node_name));
  node_list = HashSearch(node_hash, hash_key);
  if (node_list == NULL) {
    node_list = NewList();
    AddToList(node_list, node_object);
    HashInsert(node_hash, hash_key, node_list);
  } else {
    AddToList(node_list, node_object);
  }
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* find job in my list *
*************************************************************************/
JobObject *get_job_in_list(List * job_list, LL_STEP_ID ll_step_id)
{
  /*
   * Walk job_list looking for the job whose LoadLeveler step id equals
   * ll_step_id (same from_host string, same cluster and proc numbers).
   *
   * Returns the matching JobObject, or NULL when none matches.
   */
  ListElement *job_list_element = NULL;
  JobObject *job_object = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. name=%s.%d.%d.\n", __FUNCTION__, __LINE__, ll_step_id.from_host, ll_step_id.cluster, ll_step_id.proc);
  for (job_list_element = job_list->l_head; job_list_element != NULL; job_list_element = job_list_element->l_next) {
    job_object = job_list_element->l_value;
    if ((strcmp(ll_step_id.from_host, job_object->ll_step_id.from_host) == 0) && (ll_step_id.cluster == job_object->ll_step_id.cluster) && (ll_step_id.proc == job_object->ll_step_id.proc)) {
      /* pointers are traced as unsigned long: %x takes an unsigned int and cannot carry a pointer */
      print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. job_object=x\'%08lx\'.\n", __FUNCTION__, __LINE__, (unsigned long) job_object);
      return job_object;
    }
  }

  job_object = NULL;            /* not found in list */
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. job_object=x\'%08lx\'.\n", __FUNCTION__, __LINE__, (unsigned long) job_object);
  return job_object;
}
JobObject *get_job_in_list_from_id(List * job_list, int job_id)
{
  /*
   * Walk job_list looking for the job whose proxy_generated_job_id equals
   * job_id.
   *
   * Returns the matching JobObject, or NULL when none matches.
   */
  ListElement *job_list_element = NULL;
  JobObject *job_object = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. id=%d.\n", __FUNCTION__, __LINE__, job_id);
  for (job_list_element = job_list->l_head; job_list_element != NULL; job_list_element = job_list_element->l_next) {
    job_object = job_list_element->l_value;
    if (job_id == job_object->proxy_generated_job_id) {
      /* pointers are traced as unsigned long: %x takes an unsigned int and cannot carry a pointer */
      print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. job_object=x\'%08lx\'.\n", __FUNCTION__, __LINE__, (unsigned long) job_object);
      return job_object;
    }
  }

  job_object = NULL;            /* not found in list */
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. job_object=x\'%08lx\'.\n", __FUNCTION__, __LINE__, (unsigned long) job_object);
  return job_object;
}
/*************************************************************************
* add job to my list *
*************************************************************************/
void add_job_to_list(List * job_list, JobObject * job_object)
{
  /* Append job_object to job_list with AddToList, tracing entry and exit. */
  /* pointers are traced as unsigned long: %x takes an unsigned int and cannot carry a pointer */
  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. job_object=x\'%08lx\'.\n", __FUNCTION__, __LINE__, (unsigned long) job_object);
  AddToList(job_list, job_object);
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* save real thread number for mapping to useable number (0, 1, 2, ...) *
*************************************************************************/
int register_thread(pthread_t handle)
{
  /*
   * Store handle in the first thread_map_table slot that still holds 0.
   *
   * Returns the index of the slot used, or -1 when every slot is taken.
   * (The loop used to fall through and return the table size, an index
   * that is outside the table.)
   */
  int slot_count = (int) (sizeof(thread_map_table) / sizeof(pthread_t));
  int thread_id;

  for (thread_id = 0; thread_id < slot_count; thread_id++) {
    if (thread_map_table[thread_id] == 0) {
      thread_map_table[thread_id] = handle;
      return thread_id;
    }
  }
  return -1;                    /* table is full */
}
/*************************************************************************
* map my threads to useable number (0, 1, 2, ...) *
*************************************************************************/
int find_thread(pthread_t handle)
{
  /*
   * Search thread_map_table for handle.
   *
   * Returns the index of the slot holding handle, or -1 when the handle is
   * not in the table. (The loop used to fall through and return the table
   * size, an index that is outside the table.)
   */
  int slot_count = (int) (sizeof(thread_map_table) / sizeof(pthread_t));
  int thread_id;

  for (thread_id = 0; thread_id < slot_count; thread_id++) {
    if (thread_map_table[thread_id] == handle) {
      return thread_id;
    }
  }
  return -1;                    /* handle was never registered */
}
/*************************************************************************
* verify that malloc didn't fail *
*************************************************************************/
void malloc_check(void *p, const char *function, int line)
{
  /*
   * Do nothing when p is non-NULL. When p is NULL, log the function name
   * and line supplied by the caller, set state_shutdown_requested and end
   * the process with exit status 1.
   */
  if (p != NULL) {
    return;
  }

  print_message(ERROR_MESSAGE, "Memory allocation error in function %s (line %d)\n", function, line);
  state_shutdown_requested = 1;
  exit(1);
}
/*************************************************************************
* find task in my list *
*************************************************************************/
TaskObject *get_task_in_list(List * task_list, char *task_instance_machine_name, int ll_task_id)
{
  /*
   * Walk task_list looking for the task whose ll_task_id equals ll_task_id
   * and whose node_name equals task_instance_machine_name.
   *
   * Returns the matching TaskObject, or NULL when none matches.
   */
  TaskObject *task_object = NULL;
  ListElement *task_list_element = NULL;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. id=%d.\n", __FUNCTION__, __LINE__, ll_task_id);
  for (task_list_element = task_list->l_head; task_list_element != NULL; task_list_element = task_list_element->l_next) {
    task_object = task_list_element->l_value;
    if ((ll_task_id == task_object->ll_task_id) && (strcmp(task_object->node_name, task_instance_machine_name) == 0)) {
      /* pointers are traced as unsigned long: %x takes an unsigned int and cannot carry a pointer */
      print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. task_object=x\'%08lx\'.\n", __FUNCTION__, __LINE__, (unsigned long) task_object);
      return task_object;
    }
  }

  task_object = NULL;           /* not found */
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. task_object=x\'%08lx\'.\n", __FUNCTION__, __LINE__, (unsigned long) task_object);
  return task_object;
}
/*************************************************************************
* add task to my list *
*************************************************************************/
void add_task_to_list(List * task_list, TaskObject * task_object)
{
  /* Append task_object to task_list with AddToList, tracing entry and exit. */
  /* pointers are traced as unsigned long: %x takes an unsigned int and cannot carry a pointer */
  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. task_object=x\'%08lx\'.\n", __FUNCTION__, __LINE__, (unsigned long) task_object);
  AddToList(task_list, task_object);
  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* delete node from my list *
*************************************************************************/
void delete_node_from_list(List * node_list, NodeObject * node_object)
{
  /*
   * Release the node's name string, clear the pointer, then hand the
   * object to RemoveFromList. The original comment states that
   * RemoveFromList also frees memory; that is not visible from here.
   */
  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  free(node_object->node_name); /* free(NULL) is a no-op, so no guard is needed */
  node_object->node_name = NULL;

  RemoveFromList(node_list, node_object);

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* delete job from my list *
*************************************************************************/
void delete_job_from_list(List * job_list, JobObject * job_object)
{
  /*
   * Release the strings and the task list container owned by job_object,
   * clear those pointers, then hand the object to RemoveFromList. The
   * original comment states that RemoveFromList also frees memory; that
   * is not visible from here.
   *
   * NOTE(review): task_list is released with a plain free(); whether its
   * elements have already been removed by the caller cannot be seen from
   * this function - confirm.
   */
  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  /* free(NULL) is a no-op, so the pointers need no guard */
  free(job_object->ll_step_id.from_host);
  job_object->ll_step_id.from_host = NULL;

  free(job_object->cluster_name);
  job_object->cluster_name = NULL;

  free(job_object->task_list);
  job_object->task_list = NULL;

  RemoveFromList(job_list, job_object);

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* delete task from my list *
*************************************************************************/
void delete_task_from_list(List * task_list, TaskObject * task_object)
{
  /*
   * Release the task's node name and node address strings, clear those
   * pointers, then hand the object to RemoveFromList. The original
   * comment states that RemoveFromList also frees memory; that is not
   * visible from here.
   */
  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);

  /* free(NULL) is a no-op, so the pointers need no guard */
  free(task_object->node_name);
  task_object->node_name = NULL;

  free(task_object->node_address);
  task_object->node_address = NULL;

  RemoveFromList(task_list, task_object);

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* dump out all the parms being sent back to the proxy front end *
*************************************************************************/
void print_proxy_message(proxy_msg * msg)
{
  /*
   * Log the header fields of msg (msg_id, trans_id, num_args, arg_size)
   * and then, when there are any, each entry of msg->args as a string.
   */
  int arg_index;

  print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
  print_message(INFO_MESSAGE, "msg_id=%d. trans_id=%d. num_args=%d. arg_size=%d.\n", msg->msg_id, msg->trans_id, msg->num_args, msg->arg_size);

  if (msg->num_args > 0) {      /* only announce the list when it is not empty */
    print_message(INFO_MESSAGE, "proxy msg (event) arguments:\n");
    arg_index = 0;
    while (arg_index < msg->num_args) {
      print_message(INFO_MESSAGE, "arg[%d]=%s.\n", arg_index, msg->args[arg_index]);
      arg_index++;
    }
  }

  print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* see if <<<XXXXXXXX>>> variable in the line *
*************************************************************************/
/* True when c may appear in a <<<NAME>>> variable name: A-Z, a-z, 0-9 or '_'. */
static int substitution_name_char(char c)
{
  return ((c >= 'a') && (c <= 'z')) || ((c >= 'A') && (c <= 'Z')) || ((c >= '0') && (c <= '9')) || (c == '_');
}

int is_substitution_required(char *line)
{
  /*
   * Report whether line contains a substitution variable: "<<<", then zero
   * or more name characters (letters, digits, underscore), then ">>>".
   *
   * Returns 1 when such a variable is present, 0 otherwise. A NULL line
   * is treated as containing no variable.
   *
   * The length is measured once instead of calling strlen() in every loop
   * test as before; the result is the same for every non-NULL string.
   */
  size_t length = 0;
  size_t open_at = 0;
  size_t scan = 0;

  if (line == NULL) {
    return 0;
  }
  length = strlen(line);

  /* the shortest possible match, "<<<>>>", needs six characters */
  for (open_at = 0; open_at + 6 <= length; open_at++) {
    if (strncmp(line + open_at, "<<<", 3) != 0) {
      continue;
    }
    /* skip the name, always leaving room for the closing ">>>" */
    scan = open_at + 3;
    while ((scan + 3 <= length) && substitution_name_char(line[scan])) {
      scan++;
    }
    if ((scan + 3 <= length) && (strncmp(line + scan, ">>>", 3) == 0)) {
      return 1;                 /* found a <<<XXXXXXXX>>> match */
    }
  }
  return 0;
}