- Created by Axel Bonet, last modified on Apr 20, 2020
An ecFlow Python client can be used to implement a fault-tolerance requirement: for example, when it is enough for three out of five families or tasks to complete before carrying on submitting new jobs.
ecflow3of5.py Expand source
#!/usr/bin/env python
"""
./ecflow3of5.py 63 /e_41r2/main/12/prod

Poll an ecFlow node until enough of its children are complete.

Arguments: <num> <node_path>.  The script exits with
  0  as soon as at least <num> children of <node_path> are complete,
  1  when the node does not exist, has fewer than <num> children, or every
     child is complete or aborted without reaching <num>,
  2  when the command line is wrong.
The server is read from ECF_HOST / ECF_PORT (default localhost / 31415).
Runs unchanged under Python 2 and Python 3.
"""
import os
import sys
import time

INTERVAL = 30  # seconds between two polls of the server
WAIT = True    # False: evaluate once and exit with -1 when still undecided


def stop(msg, num):
    """Print msg and leave the process with exit status num."""
    print(msg)
    sys.exit(num)


def assess(states, outof5):
    """Decide from the children's states whether the check is settled.

    states  -- list of state names, one per child (e.g. "complete").
    outof5  -- number of complete children required (expected positive).

    Returns (status, message).  status is the exit status to use (0 or 1)
    once the outcome is known, or None while it can still go either way; in
    that case message is the progress text "... <complete> <needed> <total>".
    """
    tot = states.count("complete")
    count = len(states)
    if tot >= outof5:
        return 0, "# OK"
    if count < outof5:
        return 1, "# Impossible: %d < %d" % (count, outof5)
    # Only complete/aborted children count as finished; anything else
    # (queued, submitted, active, ...) may still turn complete.
    if all(state in ("complete", "aborted") for state in states):
        return 1, "# KO %d" % tot
    return None, "... %d %d %d" % (tot, outof5, count)


def main():
    """Parse the command line, then poll the server until assess() settles."""
    usage = "usage: %s <num> <node_path>" % sys.argv[0]
    if len(sys.argv) != 3:
        stop(usage, 2)
    try:
        outof5 = int(sys.argv[1])
    except ValueError:
        stop(usage, 2)
    node_path = sys.argv[2]

    # Imported here rather than at the top so that assess() stays importable
    # (and testable) on a machine without the ecFlow Python bindings.
    import ecflow

    host = os.getenv("ECF_HOST", "localhost")
    port = os.getenv("ECF_PORT", 31415)
    client = ecflow.Client(host, port)

    while True:
        client.sync_local()
        node = client.get_defs().find_abs_node(node_path)
        if node is None:
            stop("node not found!!!", 1)
        states = ["%s" % item.get_state() for item in node.nodes]
        status, msg = assess(states, outof5)
        if status is not None:
            stop(msg, status)
        if not WAIT:
            sys.stdout.write("# still possible ")
            stop("", -1)
        print("# still possible " + msg)
        time.sleep(INTERVAL)


if __name__ == "__main__":
    main()
Such a client can also be used as an ecFlow task wrapper, with a few more lines.
tasked client Expand source
#!/usr/bin/env python
"""
an example for a simple python client to help ecFlow scheduling
+ unit test
+ task loading in the suite
$manual
example
$end
$comment
a comment in a comment
$end
"""
# This file is both a command-line tool and an ecFlow task script: the server
# pre-processes it with the micro character defined in MICRO below, so every
# line must keep an even number of that character.  Do not add any in new
# comments or strings.
#
# The ecflow and ecf modules are imported inside the functions that use them
# (like signal already was), so that the usage text and the pure helpers work
# on a machine without the ecFlow Python bindings.
import getopt
import os
import sys
import time
import unittest

sys.path.append("/home/ma/emos/def/o/def")  # ecf can be imported ...
MICRO = "$$"  # keep ecFlow pleased with micro character balance


def usage():
    """Print the option summary and the received arguments, then exit(2)."""
    print(sys.argv[0] + " " + """
  -n <num>
  -p <path_to_node>
  -h # help
  -i <number> # sleep interval for sleep mode
  -t # unit test
  -r # load the task definition in a suite, associated with -p option
  -w # activate sleep mode
""")
    for num, val in enumerate(sys.argv):
        print("%d %s" % (num, val))
    sys.exit(2)


def stop(msg, num, child=None):
    """Print msg, forward it to the server through child, and exit with num.

    When child is given, msg is sent as the task label and, for num == 0
    only, the task is then reported complete.  Nothing is reported to the
    server for a non-zero num beyond the label.
    """
    print(msg)
    if child:
        child.report(msg)
        if num == 0:
            child.report("stop")
    sys.stdout.flush()
    sys.stderr.flush()
    sys.exit(num)


class Child(object):
    """
    /var/tmp/map/work/p4/metapps/suites/o/admin/ecflow3of5_prod.py -i 30 -w -p /test3outof5/fam -n 3

    Task-side (child command) connection to the ecFlow server.

    When the file has not been pre-processed by ecFlow (the placeholders
    below still contain the micro character) the instance stays in "cli
    mode": self.client is None and report() does nothing.
    """

    # Looked up by name because not every platform defines all of them
    # (SIGPWR and SIGIOT are missing on some systems).
    SIGNAL_NAMES = (
        "SIGINT", "SIGHUP", "SIGQUIT", "SIGILL", "SIGTRAP", "SIGIOT",
        "SIGBUS", "SIGFPE", "SIGUSR1", "SIGUSR2", "SIGPIPE", "SIGTERM",
        "SIGXCPU", "SIGPWR",
    )

    def __init__(self):
        import signal

        env = {
            "ECF_NODE": "$ECF_NODE$",  # check can be on a server, child on another...
            "ECF_PASS": "$ECF_PASS$",
            "ECF_NAME": "$ECF_NAME$",
            "ECF_PORT": "$ECF_PORT$",
            "ECF_TRYNO": "$ECF_TRYNO$",
        }
        self.client = None
        if MICRO[0] in env["ECF_PORT"]:
            # Placeholder not substituted: started by hand, not as a job.
            print("# cli mode")
            return

        import ecflow

        print("#MSG: will communicate with server...")
        print("#MSG: kill: ssh %s kill -15 %d" % (os.getenv("HOST", "localhost"),
                                                  os.getpid()))
        self.client = ecflow.Client()
        self.client.set_host_port(env["ECF_NODE"], int(env["ECF_PORT"]))
        self.client.set_child_pid(os.getpid())
        self.client.set_child_path(env["ECF_NAME"])
        self.client.set_child_password(env["ECF_PASS"])
        self.client.set_child_try_no(int(env["ECF_TRYNO"]))
        self.client.child_init()
        self.client.set_child_timeout(20)
        for name in self.SIGNAL_NAMES:
            signum = getattr(signal, name, None)
            if signum is not None:
                signal.signal(signum, self.signal_handler)

    def signal_handler(self, signum, frame):
        """ catch signal: report an abort to the server, then exit """
        print("%s %s" % ('Aborting: Signal handler called with signal ', signum))
        if self.client:
            self.client.child_abort("Signal handler called with signal " + str(signum))
        sys.exit(0)

    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): no __enter__ is defined and nothing in this file uses
        # the class in a with-statement, so this is not called as written.
        if self.client:
            self.client.child_abort()
        sys.exit(0)

    def report(self, msg, meter=None):
        """ communicate with ecFlow server

        meter set (and non-zero) -> update the meter named msg;
        msg == "stop"            -> report the task complete and disconnect;
        anything else            -> set the "info" label to msg.
        Does nothing in cli mode.
        """
        if not self.client:
            return
        elif meter:
            self.client.child_meter(msg, meter)
        elif msg == "stop":
            self.client.child_complete()
            self.client = None
        else:
            self.client.child_label("info", msg)


def assess_states(states, outof5):
    """Decide from the children's states whether the check is settled.

    states  -- list of state names, one per child (e.g. "complete").
    outof5  -- number of complete children required (expected positive).

    Returns (status, message).  status is the exit status to use (0 or 1)
    once the outcome is known, or None while it can still go either way; in
    that case message is the progress text "... <complete> <needed> <total>".
    """
    tot = states.count("complete")
    count = len(states)
    if tot >= outof5:
        return 0, "# OK"
    if count < outof5:
        return 1, "# Impossible %d %d" % (count, outof5)
    # Only complete/aborted children count as finished.
    if all(state in ("complete", "aborted") for state in states):
        return 1, "# KO %d %d %d" % (tot, outof5, count)
    return None, "... %d %d %d" % (tot, outof5, count)


def check3of5(outof5, node_path, wait=True, interval=30):
    """Poll node_path until outof5 of its children are complete.

    Never returns: the process exits through stop() with 0 (enough children
    complete), 1 (node missing, too few children, or all children finished
    without reaching outof5) or, when wait is false, -1 after a single
    undecided evaluation.  With wait true it sleeps interval seconds between
    polls and publishes the progress through the Child label.
    """
    import ecflow

    host = os.getenv("ECF_HOST", "localhost")
    port = os.getenv("ECF_PORT", 31415)
    client = ecflow.Client(host, port)
    # Register a handle on the suite that owns node_path (first path element).
    client.ch_register(False, [str(node_path.split('/')[1])])
    child = Child()  # choice? might be global variable ???
    # raise BaseException()
    while True:
        client.sync_local()
        node = client.get_defs().find_abs_node(node_path)
        if node is None:
            stop("node not found!!!", 1, child)
        states = ["%s" % item.get_state() for item in node.nodes]
        status, msg = assess_states(states, outof5)
        if status is not None:
            stop(msg, status, child)
        if not wait:
            sys.stdout.write("# still possible ")
            stop("", -1, child)
        print("# still possible " + msg)
        child.report(msg)
        time.sleep(interval)


def task_check():
    """Return the ecf Task definition that runs this very script as a job.

    The task is named after this file (directory and extension stripped) and
    points ECF_FILES / ECF_HOME at the current working directory.
    """
    from ecf import Task, Label, Variables

    pwd = os.getcwd()
    name = sys.argv[0]
    if '/' in name:
        name = name.split('/')[-1]
    if '.' in name:
        name = name.split('.')[0]
    return Task(name).add(
        Label("info", "this task uses a python-script directly, no job..."),
        Variables(
            ECF_MICRO=MICRO[0],
            ECF_FILES=pwd,
            ECF_HOME=pwd,
            ECF_EXTN=".py",
            ECF_PASS="FREE",
            # ECF_JOB_CMD= pwd + "/%s -i $INTERVAL$ -w" % name
            # ECF_JOB_CMD= "python $ECF_JOB$ -i $INTERVAL$ -w"
            ECF_JOB_CMD="python $ECF_JOB$ -w"
            + " -n 3 -p /$SUITE$/fam"
            + " > $ECF_JOBOUT$ 2>&1",
            INTERVAL=30,
        ),
    )


class Test3outof5(unittest.TestCase):
    """ a test case: loads a small suite on the server and forces states """

    def test_1(self, test_ok=1):
        """ a test

        Needs the ecflow and ecf modules and a reachable server; skipped when
        the modules cannot be imported.
        """
        try:
            import ecflow
            import ecf
            from ecf import Task, Suite, Defs, Family, Trigger
        except ImportError as err:
            self.skipTest("ecFlow python modules not available: %s" % err)

        defs = Defs()
        sname = "test3outof5"
        cargo = task_check()
        defs.add_suite(
            Suite(sname).add(
                # ecf.Defstatus("suspended"),
                Family("fam").add(
                    ecf.Defstatus("suspended"),
                    [Task(name) for name in ("a", "b", "c", "d", "e")]),
                cargo.add(Trigger("fam != queued")),  # trigger as string
                Family("user").add(
                    # trigger with python == and obj:
                    Trigger(cargo.name() == ecf.COMPLETE),
                    Task("dummy").add(ecf.Complete("1==1")),
                ),
            )
        )
        client = replace("/%s" % sname, defs)
        client.begin_suite(sname)
        client.force_state("/%s/fam/a" % sname, ecflow.State.complete)
        client.force_state("/%s/fam/b" % sname, ecflow.State.aborted)
        client.force_state("/%s/fam/c" % sname, ecflow.State.complete)
        if test_ok:
            client.force_state("/%s/fam/d" % sname, ecflow.State.active)
        else:
            client.force_state("/%s/fam/d" % sname, ecflow.State.aborted)
        if test_ok:
            client.force_state("/%s/fam/e" % sname, ecflow.State.complete)
        else:
            client.force_state("/%s/fam/e" % sname, ecflow.State.aborted)

    # def test_2(self): self.test_1(0)


def replace(path, defs=None):
    """Replace node path on the server with its version in defs.

    The server is read from ECF_HOST / ECF_PORT.  Returns the ecf client so
    the caller can go on issuing commands.
    """
    import ecf

    host = os.getenv("ECF_HOST", "localhost")
    port = os.getenv("ECF_PORT", 31415)
    client = ecf.Client(host, port)
    # ecf.Client(host, port).replace(path, defs, 1, 1)
    client.replace(path, defs, 1, 1)
    return client


def parse_options(argv):
    """Parse the command-line arguments (program name excluded).

    Returns a dict with the keys interval, num, path, wait, replace and test.
    Prints the problem and the usage text, then exits with 2, on an unknown
    option or a non-integer value for -i / -n; -h exits the same way.
    """
    try:
        # Long options that take a value need the trailing "=".
        opts, _ = getopt.getopt(
            argv, "hi:n:p:rtw",
            ["help", "interval=", "number=", "path=", "replace", "test", "wait"])
    except getopt.GetoptError as err:
        print("#what? %s" % err)
        usage()

    settings = {
        "interval": 30,
        "num": None,
        "path": None,
        "wait": False,
        "replace": False,
        "test": False,
    }
    try:
        for opt, arg in opts:
            if opt in ("-h", "--help"):
                usage()
            elif opt in ("-i", "--interval"):
                settings["interval"] = int(arg)
            elif opt in ("-n", "--number"):
                settings["num"] = int(arg)
            elif opt in ("-p", "--path"):
                settings["path"] = arg
            elif opt in ("-r", "--replace"):
                settings["replace"] = True
            elif opt in ("-t", "--test"):
                settings["test"] = True
            elif opt in ("-w", "--wait"):
                settings["wait"] = True
    except ValueError as err:
        print("#what? %s" % err)
        usage()
    return settings


def main():
    """Entry point: run the unit test, the check, or the suite replacement."""
    settings = parse_options(sys.argv[1:])
    if settings["test"]:
        unittest.main(argv=[sys.argv[0]])
        sys.exit(0)
    if settings["num"] and settings["path"]:
        check3of5(settings["num"], settings["path"],
                  settings["wait"], settings["interval"])
    elif settings["path"] and settings["replace"]:
        replace(settings["path"])
    else:
        usage()


if __name__ == '__main__':
    main()