ecFlow's documentation is now on readthedocs!

Thanks to Python syntax, and provided there is no need to use include to merge code, a single Python script can be used unchanged both on the command line and as a task wrapper; it may also provide the ability to load the task into the expected server/suite for operation and test.

meter sweeper
#!/usr/bin/env python

"""
python docstring: demonstrate python cli + ecFlow task wrapper + task loader

$manual
DESCRIPTION:

sweeper:

find out min/max among modeleps_nemo tasks

OPERATORS: please, set complete if problematic

ANALYST: example as pure python task - job
$end

$comment
  comments can be added ...
$end

"""
import time
import os.path
import getopt
import sys

PATH = "/usr/local/apps/ecflow/4.0.9/lib/python2.7/site-packages/ecflow"
sys.path.insert(0, PATH)
import ecflow as ec

# micro character, doubled in this source: only its first character is given
# to the ECF_MICRO variable in create_task()
MICRO = "$$" # double dollar to please ecFlow micro character balance
# sub-string expected in the name of the tasks whose meter is read
TARGET_TASK = "modeleps_nemo"             # use option -t to overwrite
# name of the meter whose values are compared to find min/max
TARGET_METER = "step"                     # would this need an option???
# node paths followed by default by Sweep ; loader() also uses them as
# externs and to build the trigger of the sweeper task
MC_STEPS_LIST = ("/mc/main/12/legA/fc",   # use option -p for single path
                 "/mc/main/12/legB/fc",
                 "/mc/main/18bc/legA/fc",
                 "/mc/main/00/legA/fc",
                 "/mc/main/00/legB/fc",
                 "/mc/main/06bc/legA/fc",
                 "/mofc/thu/01/legC/fc",
                 "/mofc/thu/hind/14/back",
                 "/mofc/mon/01/legC/fc",
                 "/mofc/mon/hind/14/back",)

def focus_below(node):
    """ tell whether the walk shall carry on below this node only:
    True when the node name is one of the expected names listed here ;
    please update for external use...
    """
    expected = ("main",
                "00", "12", "18bc", "06bc",
                "legA", "legB",
                "pf", "cf", "fc")
    name = node.name()
    if name in expected:
        return True
    return False

def process(item, path=None, low=999, high=-1, task=None):
    """ track nodes to follow and update min/max

    item: definition file name (str), ec.Client, ec.Defs, ec.Family or
          ec.Task ; the type of anything else is printed and ignored
    path: absolute node path, its first component selects the suite to
          walk ; only read when item is a file name, a Client or a Defs
    low, high: min/max found so far ; (999, -1) means nothing found yet
    task: only tasks whose name contains this string are considered,
          TARGET_TASK is used when not provided

    return the updated (low, high) tuple
    """
    if task is None:
        # the membership test on the task name needs a string
        task = TARGET_TASK

    if isinstance(item, str):
        defs = ec.Defs(item)
    elif isinstance(item, ec.Client):
        item.sync_local()
        defs = item.get_defs()
    elif isinstance(item, ec.Defs):
        defs = item
    else:
        defs = None

    if defs:
        for suite in defs.suites:
            if suite.name() != str(path.split('/')[1]):
                continue
            status = "%s" % suite.get_state()
            if suite.is_suspended() or status == "unknown":
                continue
            for child in suite.nodes:
                low, high = process(child, path, low, high, task)

    elif isinstance(item, ec.Family):
        for node in item.nodes:
            if focus_below(node):
                # NOTE(review): the walk goes on below the first expected
                # node only, following siblings are never visited ;
                # confirm this pruning is intended
                return process(node, path, low, high, task)
            low, high = process(node, path, low, high, task)

    elif isinstance(item, ec.Task):
        # ignore cal-val # ecmwf specific
        if "/cv/" in item.get_abs_node_path() or task not in item.name():
            return low, high

        try:
            for att in item.meters:
                if att.name() != TARGET_METER:
                    continue
                value = att.value()
                # two independent tests: the same value may be both the
                # new max and the new min (first value met, equal values)
                if value > high:
                    high = value
                if value < low:
                    low = value

        except Exception as excpt:
            # best effort: report and carry on with what was found so far
            print("#! problem with line: %s" % excpt)

    else:
        print(type(item))

    return low, high

def create_task():
    """ build the "sweeper" task, to be added into a family...

    the task is returned with its variables (ECF_FILES from the
    environment variable ECF_FDI, ".py" extension, ssh job command,
    micro character), an empty "info" label, a Defcomplete attribute
    and the two meters "min" and "max"
    """
    sys.path.append("/home/ma/emos/o/def")
    import ecf
    top = 360  # given as both third and fourth argument of the meters

    sweeper = ecf.Task("sweeper")
    settings = ecf.Variables(
        ECF_FILES=os.getenv("ECF_FDI", None),
        ECF_EXTN=".py",
        ECF_JOB_CMD="ssh -x $WSHOST$ $ECF_JOB$ > $ECF_JOBOUT$ 2>&1 &",
        ECF_MICRO=MICRO[0],
    )
    info = ecf.Label("info", "")
    done = ecf.Defcomplete()
    lowest = ecf.Meter("min", -1, top, top)
    highest = ecf.Meter("max", -1, top, top)
    return sweeper.add(settings, info, done, lowest, highest)

def loader(test=1, host=os.getenv("ECF_TEST_HOST", None)):
    """ replace sweeper task in test/oper mode """
    sys.path.append("/home/ma/emos/o/def")
    import ecf

    if test:
        env = {"host": os.getenv("ECF_TEST_HOST", None),
               "port": os.getenv("ECF_TEST_PORT", None),
               "path": "/admine/steps", }
    else:
        env = {"host": os.getenv("ECF_OPER_HOST", None),
               "port": os.getenv("ECF_OPER_PORT", None),
               "path": "/admin/steps", }
    defs = ecf.Defs()

    path = env["path"]
    sname = str(path.split('/')[1])
    fname = str(path.split('/')[2])

    suite = ecf.Suite(sname).add(
        ecf.Extern(MC_STEPS_LIST, defs),

        ecf.Family(fname).add(create_task().add(
            ecf.Cron("04:30 23:59 03:00"),
            ecf.Trigger("==active or ".join(MC_STEPS_LIST) + "==active"),
            ecf.Variables(WSHOST=host,
                          QUEUE="test"),
        )))

    defs.add_suite(suite)

    print "#MSG: replacing", path, env["host"], env["port"]
    ecf.Client(env["host"], env["port"]).replace(path, defs, 1, 1)


class Sweep(object):
    """ find min/max meter value below a node

    clt:  ec.Client used to read the suite tree from the server
    cl2:  ec.Client used for the child commands (init, meter, label,
          complete, abort) ; None when ECF_NODE/ECF_NAME are not known,
          nothing is reported to the server then
    path: tuple of node paths to follow
    task: sub-string expected in the name of the tasks to consider
    """

    def __init__(self, host=None, port=None, path=None, task=None):
        """ create the client(s), register as child when run as a job

        host, port: server to poll ; default is ECF_NODE/ECF_PORT from
                    the environment, else the text of the ecFlow
                    variable as found in this file
        path: single node path, or list/tuple of paths ; default is
              MC_STEPS_LIST
        task: task name to look for ; default is TARGET_TASK
        """
        import signal
        if host is None:
            host = os.getenv("ECF_NODE", "$ECF_NODE$")
        if port is None:
            port = int(os.getenv("ECF_PORT", "$ECF_PORT$"))
        self.clt = ec.Client(host, port)
        self.cl2 = None
        if path is None:
            self.path = MC_STEPS_LIST
        elif isinstance(path, (list, tuple)):
            self.path = tuple(path)
        else:
            # update() loops over self.path: a bare string would be
            # walked one character at a time
            self.path = (path, )
        if task is None:
            self.task = TARGET_TASK
        else:
            self.task = task

        # shall make sense when processed into job by ecflow
        # name remains when not processed...
        ecfv = {"ECF_NODE": "$ECF_NODE$",
                "ECF_PASS": "$ECF_PASS$",
                "ECF_NAME": "$ECF_NAME$",
                "ECF_PORT": "$ECF_PORT$",
                "ECF_TRYNO": "$ECF_TRYNO$",        }

        # variable name still in the value: not substituted, ask the
        # environment instead
        for key, val in list(ecfv.items()):
            if key in val:
                ecfv[key] = os.getenv(key, None)
        if ecfv["ECF_NODE"] and ecfv["ECF_NAME"]:
            print("#MSG will communicated with server...")
            print("#kill: ssh %s kill -15 %d" % (ecfv["ECF_NODE"],
                                                 os.getpid()))
            self.cl2 = ec.Client()
            self.cl2.set_host_port(ecfv["ECF_NODE"], ecfv["ECF_PORT"])
            self.cl2.set_child_pid(os.getpid())
            self.cl2.set_child_path(ecfv["ECF_NAME"])
            self.cl2.set_child_password(ecfv["ECF_PASS"])
            self.cl2.set_child_try_no(int(ecfv["ECF_TRYNO"]))
            self.cl2.child_init()
            self.cl2.set_child_timeout(20)

            # the handler needs self.cl2, hence installed here only
            for sig in (signal.SIGINT,
                        signal.SIGHUP,
                        signal.SIGQUIT,
                        signal.SIGILL,
                        signal.SIGTRAP,
                        signal.SIGIOT,
                        signal.SIGBUS,
                        signal.SIGFPE,
                        signal.SIGUSR1,
                        signal.SIGUSR2,
                        signal.SIGPIPE,
                        signal.SIGTERM,
                        signal.SIGXCPU,
                        signal.SIGPWR):
                signal.signal(sig, self.signal_handler)

    def signal_handler(self, signum, frame):
        """ catch signal: print it and send child abort to the server """
        print("Aborting: Signal handler called with signal %s" % signum)
        self.cl2.child_abort("Signal handler called with signal " + str(signum))

    def report(self, msg, meter=None):
        """ communicate with ecFlow server, no-op without child client

        msg, meter given: set the meter named msg to the value meter
        msg "stop", no meter: send child complete and exit with status 0
        any other msg, no meter: set the label "info" to msg
        """
        if self.cl2 is None:
            return
        if meter is not None:
            # explicit None test: 0 is a valid meter value
            self.cl2.child_meter(msg, meter)
        elif msg == "stop":
            self.cl2.child_complete()
            sys.exit(0)
        else:
            self.cl2.child_label("info", msg)

    def update(self):
        """ refresh status tree asking ecFlow server

        each path is polled in turn until its node is found complete,
        is missing, or is queued/aborted with min equal to max ; the
        min/max values and an information message are reported
        meanwhile. Paths containing "/back" are completed with a
        number and a leg, moved on each time a node is found complete
        """
        num = 20
        leg = "legA"
        for path in self.path:
            first = 1
            low = 0
            high = 0
            sleep = 30
            self.clt.ch_register(False, [str(path.split('/')[1])])
            if "/back" in path:
                path += "/%02d/%s" % (num, leg)

            while 1:
                clt = self.clt
                status = ""
                try:
                    if clt.news_local() or first:  # has the server changed
                        clt.sync_local()
                        first = 0
                        defs = clt.get_defs()
                        node = defs.find_abs_node(path)
                        if node:
                            status = "%s" % node.get_state()
                        low, high = process(node, path, 999, -1, self.task)
                        sleep = 30
                    else:
                        sleep = 90
                    msg = "... %ds %s %s %d %d" % (sleep, path, status,
                                                   low, high)
                    print(msg)
                    if type(low) != type(high):
                        print("%s %s" % (type(low), type(high)))

                    if status == "complete":
                        if leg == "legB":
                            num -= 1
                            leg = "legA"
                        else:
                            leg = "legB"
                        break

                    elif (low == high and status in (
                            "queued", "aborted", "complete")) or node is None:
                        if node:
                            msg = "# %s %s" % (node.get_abs_node_path(), status)
                            print(msg)
                            self.report(msg)
                        break
                    else:
                        self.report(msg)
                        self.report("min", low)
                        self.report("max", high)
                    sys.stdout.flush()
                    time.sleep(sleep)
                except RuntimeError as exp:
                    print(str(exp))
                    sys.stdout.flush()
                    # wait before asking again, not to loop at full
                    # speed while the server keeps raising
                    time.sleep(sleep)

def usage():
    """ help: print the command line options on standard output """
    print("""client
  -o: operational node is to be replaced, default is test node,
      provide this option BEFORE -r
  -r: replace task node
  -p: path to look below, by default internal list MC_STEPS_LIST will be used
  -t: task name to look for, by default TARGET_TASK will be used
  -e: sweep at once then exit, provide other options BEFORE this one
  -h: this help

ECF_NODE=localhost ECF_PORT=31415 ./client.py -p /mc/main/18bc/legA/fc -e

""")

if __name__ == '__main__':
    # options are acted upon in command line order: -e and -r run at once
    # with what was given before them ; path and task expect a value,
    # hence the ":" and "=" markers
    try:
        OPTS, ARGS = getopt.getopt(
            sys.argv[1:], "hep:rot:",
            ["help", "ens", "path=", "replace", "oper", "task="])
    except getopt.GetoptError as err:
        print("# what? %s" % err)
        usage()
        sys.exit(2)

    TEST = 1
    PATH = None
    TASK = None
    for o, a in OPTS:
        if o in ("-h", "--help", ):
            usage()
            sys.exit(0)
        elif o in ("-e", "--ens", ):
            Sweep(path=PATH, task=TASK).update()
            sys.exit(0)
        elif o in ("-r", "--replace", ):
            loader(TEST)
            sys.exit(0)
        elif o in ("-p", "--path", ):
            PATH = a
        elif o in ("-o", "--oper", ):
            TEST = 0
        elif o in ("-t", "--task", ):
            TASK = a
    # default: sweep, then tell the server the task is complete
    CLT = Sweep(path=PATH, task=TASK)
    CLT.update()
    CLT.report("stop")

Sweeper is used in operation to start product generation earlier.