- Created by Axel Bonet, last modified on Apr 13, 2020
Thanks to python syntax, provided there is no need to use include to merge code, one single python script can be used on the command line, but also as a task wrapper with no change, and it may provide the ability to load the task into the expected server/suite for operation and test.
meter sweeper Expand source
#!/usr/bin/env python """ python docstring: demonstrate python cli + ecFlow task wrapper + task loader $manual DESCRIPTION: sweeper: find out min/max among modeleps_nemo tasks OPERATORS: please, set complete if problematic ANALYST: example as pure python task - job $end $comment comments can be added ... $end """ import time import os.path import getopt import sys PATH = "/usr/local/apps/ecflow/4.0.9/lib/python2.7/site-packages/ecflow" sys.path.insert(0, PATH) import ecflow as ec MICRO = "$$" # double dollar to please ecFlow micro character balance TARGET_TASK = "modeleps_nemo" # use option -t to overwrite TARGET_METER = "step" # would this need an option??? MC_STEPS_LIST = ("/mc/main/12/legA/fc", # use option -p for single path "/mc/main/12/legB/fc", "/mc/main/18bc/legA/fc", "/mc/main/00/legA/fc", "/mc/main/00/legB/fc", "/mc/main/06bc/legA/fc", "/mofc/thu/01/legC/fc", "/mofc/thu/hind/14/back", "/mofc/mon/01/legC/fc", "/mofc/mon/hind/14/back",) def focus_below(node): """ continue down below nodes which name is expected ; please update for external use... """ return (node.name() in ("00", "12", "18bc", "06bc", "legA", "legB", "pf", "cf", "fc") or "main" == node.name()) def process(item, path=None, low=999, high=-1, task=None): """ track nodes to follow and update min/max """ if type(item) == str: defs = ec.Defs(item) elif isinstance(item, ec.Client): item.sync_local() defs = item.get_defs() elif isinstance(item, ec.Defs): defs = item else: defs = None if defs: for node in defs.suites: if node.name() != str(path.split('/')[1]): continue status = "%s" % node.get_state() if node.is_suspended() or status == "unknown": continue for item in node.nodes: low, high = process(item, path, low, high, task) elif isinstance(item, ec.Family): for node in item.nodes: if focus_below(node): return process(node, path, low, high, task) low, high = process(node, path, low, high, task) elif isinstance(item, ec.Task): node = item path2node = item.get_abs_node_path() status = "%s" % node.get_state() # ignore cal-val # ecmwf specific if ("/cv/" in path2node or not task in item.name()): return low, high if "/cv/" in path2node and (high > 240 or low < -1): high = 240 return low, high stamp = "" evt = "" meter = "" if node.is_suspended(): status = "suspended" try: for att in node.meters: meter = "%s" % att.value() if att.name() != TARGET_METER: continue if att.value() > high: high = att.value() elif att.value() < low: low = att.value() except Exception as excpt: print("#! problem with line:", excpt) # name = "%20s %s %5s %5s" % (path2node, stamp, evt, meter) else: print type(item) return low, high def create_task(): """ add ecFlow task into a family...""" sys.path.append("/home/ma/emos/o/def") import ecf steps = 360 return ecf.Task("sweeper").add( ecf.Variables( ECF_FILES=os.getenv("ECF_FDI", None), ECF_EXTN=".py", ECF_JOB_CMD="ssh -x $WSHOST$ $ECF_JOB$ > $ECF_JOBOUT$ 2>&1 &", ECF_MICRO=MICRO[0], ), ecf.Label("info", ""), ecf.Defcomplete(), ecf.Meter("min", -1, steps, steps), ecf.Meter("max", -1, steps, steps),) def loader(test=1, host=os.getenv("ECF_TEST_HOST", None)): """ replace sweeper task in test/oper mode """ sys.path.append("/home/ma/emos/o/def") import ecf if test: env = {"host": os.getenv("ECF_TEST_HOST", None), "port": os.getenv("ECF_TEST_PORT", None), "path": "/admine/steps", } else: env = {"host": os.getenv("ECF_OPER_HOST", None), "port": os.getenv("ECF_OPER_PORT", None), "path": "/admin/steps", } defs = ecf.Defs() path = env["path"] sname = str(path.split('/')[1]) fname = str(path.split('/')[2]) suite = ecf.Suite(sname).add( ecf.Extern(MC_STEPS_LIST, defs), ecf.Family(fname).add(create_task().add( ecf.Cron("04:30 23:59 03:00"), ecf.Trigger("==active or ".join(MC_STEPS_LIST) + "==active"), ecf.Variables(WSHOST=host, QUEUE="test"), ))) defs.add_suite(suite) print "#MSG: replacing", path, env["host"], env["port"] ecf.Client(env["host"], env["port"]).replace(path, defs, 1, 1) class Sweep(object): """ find min/max meter value below a node """ def __init__(self, host=None, port=None, path=None, task=None): import signal if host is None: host = os.getenv("ECF_NODE", "$ECF_NODE$") if port is None: port = int(os.getenv("ECF_PORT", "$ECF_PORT$")) self.clt = ec.Client(host, port) self.cl2 = None if path is None: self.path = MC_STEPS_LIST else: self.path = (path) if task is None: self.task = TARGET_TASK else: self.task = task # shall make sense when processed into job by ecflow # name remains when not processed... ecfv = {"ECF_NODE": "$ECF_NODE$", "ECF_PASS": "$ECF_PASS$", "ECF_NAME": "$ECF_NAME$", "ECF_PORT": "$ECF_PORT$", "ECF_TRYNO": "$ECF_TRYNO$", } for key, val in ecfv.items(): if key in val: ecfv[key] = os.getenv(key, None) if ecfv["ECF_NODE"] and ecfv["ECF_NAME"]: print "#MSG will communicated with server..." print "#kill: ssh %s kill -15 %d" % (ecfv["ECF_NODE"], os.getpid()) self.cl2 = ec.Client() self.cl2.set_host_port(ecfv["ECF_NODE"], ecfv["ECF_PORT"]) self.cl2.set_child_pid(os.getpid()) self.cl2.set_child_path(ecfv["ECF_NAME"]) self.cl2.set_child_password(ecfv["ECF_PASS"]) self.cl2.set_child_try_no(int(ecfv["ECF_TRYNO"])) self.cl2.child_init() self.cl2.set_child_timeout(20) for sig in (signal.SIGINT, signal.SIGHUP, signal.SIGQUIT, signal.SIGILL, signal.SIGTRAP, signal.SIGIOT, signal.SIGBUS, signal.SIGFPE, signal.SIGUSR1, signal.SIGUSR2, signal.SIGPIPE, signal.SIGTERM, signal.SIGXCPU, signal.SIGPWR): signal.signal(sig, self.signal_handler) def signal_handler(self, signum, frame): """ catch signal """ print 'Aborting: Signal handler called with signal ', signum self.cl2.child_abort("Signal handler called with signal " + str(signum)) def report(self, msg, meter=None): """ communicate with ecFlow server """ if not self.cl2: return if meter: self.cl2.child_meter(msg, meter) elif msg == "stop": self.cl2.child_complete() sys.exit(0) else: self.cl2.child_label("info", msg) def update(self): """ refresh status tree asking ecFlow server """ num = 20 leg = "legA" for path in self.path: first = 1 low = 0 high = 0 self.clt.ch_register(False, [str(path.split('/')[1])]) if "/back" in path: path += "/%02d/%s" % (num, leg) while 1: clt = self.clt status = "" try: if clt.news_local() or first: # has the server changed clt.sync_local() first = 0 defs = clt.get_defs() node = defs.find_abs_node(path) if node: status = "%s" % node.get_state() low, high = process(node, self.path, 999, -1, self.task) sleep = 30 else: sleep = 90 msg = "... %ds %s %s %d %d"% (sleep, path, status, low, high) print msg if type(low) != type(high): print type(low), type(high) if status == "complete": if leg == "legB": num -= 1 leg = "legA" else: leg = "legB" break elif low == high and status in ( "queued", "aborted", "complete") or node is None: if node: msg = "# %s %s" % (node.get_abs_node_path(), status) print msg, self.report(msg) break else: self.report(msg) self.report("min", low) self.report("max", high) sys.stdout.flush() time.sleep(sleep) except RuntimeError, exp: print str(exp) def usage(): """ help """ print """client -o: operational node is to be replaced, default is test node, provide this option BEFORE -r -r: replace task node -p: path to look below, by default internal list MC_STEPS_LIST will be used -h: this help ECF_NODE=localhost ECF_PORT=31415 ./client.py --path /mc/main/18bc/legA/fc -e """ if __name__ == '__main__': try: OPTS, ARGS = getopt.getopt( sys.argv[1:], "hep:rot:", ["help", "ens", "path", "replace", "oper", "task"]) except getopt.GetoptError as err: print "# what?", usage() sys.exit(2) TEST = 1 PATH = None TASK = None for o, a in OPTS: if o in ("-e", "--ens", ): Sweep(path=PATH).update() sys.exit(0) elif o in ("-r", "--replace", ): loader(TEST) sys.exit(0) elif o in ("-p", "--path", ): PATH = a elif o in ("-o", "--oper", ): TEST = 0 elif o in ("-t", "--task", ): TASK = a CLT = Sweep(path=PATH, task=TASK) CLT.update() CLT.report("stop")
Sweeper is used in operation to start product generation earlier.