- Created by Daniel Varela Santoalla, last modified by Avi Bahra on Jul 17, 2020
Here is an example of a Python Suite Definition, hosting a task that might be helpful for simple SMS suite translation to ecFlow:
course.py Expand source
#!/usr/bin/env python """ suite builder example for ecFlow course """ import sys, os sys.path.append('/home/ma/emos/def/ecflow') import ecf as ec from ecf import * import inc_emos as ie # provides Seed class + system related dependencies # cd ~map/course/201303/ecflow; python course.py # task wrappers underneath # consume: choices for a family matching the producer-consumer pattern # local family + remote family + BEWARE ECF_OUT + log-server example # barber: an example of a "dynamical suite", with a "family producer task" # perl + python: example that task wrapper may not be ksh/bash scripts import time from datetime import date import argparse today = str(date.today()).replace('-', '') ############################################################################ class GenericFamily(object): """ provide structure for derived classes""" def make(self, node): return node def main(self, node): return BaseException # return node def arch(self, node): return node ############################################################################ class NativePerl(GenericFamily): """ ksh is not the only language for task wrappers""" def __init__(self): self.name = "perl" def main(self, node): tsk = Task(self.name).add( Variables(ECF_MICRO= "^", ECF_JOB_CMD= "^ECF_JOB^ > ^ECF_JOBOUT^ 2>&1"), Meter("step", -1, 100), Event("1"), Event("2"), Label("info", "none"), ) node.add(tsk) ############################################################################ class NativePython(NativePerl): """ ksh is not the only language for task wrappers""" def __init__(self): super(NativePerl, self).__init__() self.name = "python" ############################################################################ def _kind(prod=1, cons=1): return Variables(CONSUME= cons, PRODUCE=prod) def _evt(): return (Event("p"), Event("c")) def _leaf(name="produce", init=0, stop=100, step=1): add = None if type(stop) == int: add = Meter("step", -1, int(stop)) return Task("%s" % name).add( _evt(), add, Variables(INIT= init, STOP= stop, STEP= step),) ############################################################################ class FallBack(GenericFamily): """ in some situation, user may want its family to continue, and repeat to increment, even when some tasks abort """ def main(self, node=None): return Family("daily").repeat(name="YMD", start=today, end=DATE_STOP).add( Task("action").add(Time("10:00")), Family("loop").add( Time("11:00"), Task("dummy").add( TriggerImpossible(), Complete("1==1"))), Task("fallback").add( Label("info", "force complete when action is aborted"), Time("10:55"), Trigger("action eq aborted"), Complete("action eq complete"))) class DailyInc(GenericFamily): """ anopther method to have daily repeat increment, with aborted tasks""" def main(self, node=None): return Family("daily_inc").repeat(name="YMD", start=today, end=DATE_STOP).add( Label("info", "requeue will reset repeat attribute!"), Complete("daily_inc/loop/dummy eq complete"), Task("action").add(Time("10:00")), Family("loop").add( Time("11:00"), Task("dummy").add(TriggerImpossible(), Complete("1==1")))) class Consume(GenericFamily): """ producer-consumer pattern can be implemented in many ways""" def __init__(self): self.init = 0 self.stop = 48 self.step = 3 def main(self, node): """ pass the parent node, so that absolute paths can be used with triggers""" path = node.fullname() top = node.family("consume").add( Variables(SLEEP= 10, PRODUCE= 1, # default: tasks will do both CONSUME= 1), Family("limits").add(Defcomplete(), Limit("consume", 7)), Task("leader").add( Label("info", "set event to get produce1 leader"), Event("1"), # set/cleared by user Defcomplete()), # task does both, ie serial ########################### _leaf("produce", self.init, self.stop, self.step).add( Label("info", "both produce and consume in a task")), # meter will report about producer progress ########### Family("produce0").add( Label("info", "only produce"), _kind(1, 0), _leaf("produce", self.init, self.stop, self.step)), # serialy produced, create a new task for each step ### Family("produce1").add( _kind(1, 0), Label("info", "repeat, one job per step"), _leaf("produce", init="%STEP%", stop="%STEP%", step=1).add( Meter("step", -1, 100)))\ .repeat(kind="integer", name="STEP", start=self.init, end =self.stop, step =self.step).add() ) top.defstatus("suspended") fam = Family("produce2").add( # parallel _kind(1, 0), Label("info", "limited, one task per step, step by 3"), Limit("prod", 5), InLimit("produce2:prod")) top.add(fam) for step in xrange(self.init, self.stop, self.step): fam.add(Family("%02d" % step).add( Variables(STEP= step), _leaf("produce", step, step, 1))) ###################### lead = path + "/consume/leader:1" prod = path + "/consume/produce" top.add( ### trigger may be inside a task _leaf("consume", self.init, self.stop, self.step).add( Label("info", "trigger may be inside a task"), _kind(0, 1), InLimit("limits:consume"), Variables(CALL_WAITER= 1, SLEEP= 3, # sleep less than producer TRIGGER_EXPRESSION= prod + ":step ge $step or " + prod + " eq complete",)), Family("consume1").add( Label("info", "explicit trigger, follow faster"), _kind(0, 1), Trigger("(consume1:STEP lt %s1:STEP and %s) or " % (prod, lead) + "(consume1:STEP lt %s0/produce:step and not %s) or " % (prod, lead) + # lt while both are repeat "(%s1 eq complete and %s0 eq complete)" % (prod, prod) ), InLimit("limits:consume"), _leaf("consume", "%STEP%", "%STEP%", 1), ).repeat(kind="integer", name="STEP", start=self.init, end=self.stop, step=self.step)) fam = Family("consume2").add( # parallel Label("info", "one task per step, step by three"), _kind(0, 1), Limit("consume", 5), InLimit("consume2:consume")) top.add(fam) for step in xrange(self.init, self.stop, self.step): fam.add(Family("%02d" % step).add( Variables(STEP = step), Trigger("(%02d:STEP le %s1:STEP and %s) or " % (step, prod, lead) + "(%02d:STEP le %s0/produce:step and not %s)" % (step, prod, lead)), _leaf("consume", init=step, stop=step, step=1))) ############################################################################ class Barber(GenericFamily): """ a 'barber shop' example with families created by a task """ def _passer_by(self): """ generator """ return Task("passby").add( Time("00:00 23:59 00:05"), Variables(ID=0), Label("info", ""), Label("rem", "this task alters its variable ID, " + "aliases won't work natively"), InLimit("limits:passby")) def _client(self, node, position): """ python version of the family created initialy attention: raw definition file is located in passby task wrapper""" path = node.fullname() + "/limits" fam = node.family("list").family("%s" % position).add( AutoCancel(1), Task("cut").inlimit(path + ":barbers"), Task("pay").add( Trigger("cut eq complete"), InLimit(path + ":barbers"), InLimit(path + ":cashiers")), Task("leave").add( Label("info", ""), Trigger(["cut", "pay"]))) fam.defstatus("complete") def _shop(self, node): fam = node.family("shop").defstatus("suspended").add( Variables(NB_CHAIRS= 4), Family("limits").add(Defcomplete(), Limit("passby", 1), Limit("barbers", 2), Limit("cashiers", 1)), self._passer_by(), ) self._client(fam, 1), def main(self, node): self._shop(node) ############################################################################ def user(): return os.getlogin() def locate_scripts(): pwd = os.getcwd() return Variables( ECF_HOME= "/tmp/%s/ecflow/" % user(), # pwd, ECF_FILES= pwd + "/scripts", ECF_INCLUDE= pwd + "/include", ) DATE_STOP = 20300115 class Course(ie.Seed): """ host families together """ def __init__(self): super(Course, self).__init__() self.name = "course" def suite(self): """ define limits (empty) """ node = Suite(user()) node.defstatus("suspended").add( Variables(USER= user()), locate_scripts()) # self.top(node) fp = open("/tmp/%s/" % user() + self.name + ".def", "w") print >> fp, node return node def top(self, node): barber_shop = Barber() perl = NativePerl() python = NativePython() consume = Consume() with node.family(self.name) as node: node.add(FallBack().main(), DailyInc().main()) barber_shop.main(node) perl.main(node) python.main(node) consume.main(node) return node ############################################################################### class Admin(Course): """ host newlog task + logsvr start/stop/check task -- server logs can be renewed with a ecflowview menu command also """ def __init__(self): self.name = "admin" def top(self, node): with node.family("admin") as node: node.add(self.main()).repeat(name="YMD", start=today, end=DATE_STOP) node.defstatus("suspended") return node def main(self): """ return self contained Family/Task, without absolute node references or with relative path triggers""" remote_submit = "rsh -l %USER% %HOST% %ECF_JOB% > %ECF_JOBOUT% 2>&1" logpath = "/home/ma/map/course/201303/ecflow" return ( Task("newlog").add( Label("info", "renew server log-file"), Time("08:00")), Task("logsvr").add( Defcomplete(), Variables(HOST= "pikachu", ECF_LOGPORT=9316, ECF_LOGPATH= logpath, ECF_LOGMAP= logpath + ":" + logpath, ECF_JOB_CMD= remote_submit), Label("info", "(re)start the logsvr on HOST"), Time("08:00")), Family("loop").add( Time("08:30"), Family("dummy").add(# TriggerImpossible(), Complete("1==1")))) ############################################################################### class EcEvents(Admin): """ connecting to third party software as event generator to update a suite variable, and enable daily family run """ def top(self, node): node = node.family("ecevents") node.add( Label("info", "use web... menu"), Defcomplete(), Variables( URL= "http://eccmd.ecmwf.int:8090/#Mainpanel", ECF_URL_CMD= "${BROWSER:=firefox} -remote 'openURL(%URL%")) self.main(node) return node def main(self, node): for mode in ["list", "register", "delete", "register_all", "delete_all"]: added = None if "list" in mode: added = Label("regs", "") fam = Family(mode).add( Variables(MODE= mode), Task("ecjobs").add(Label("info", "none"), added)) node.add(fam) if "_all" in mode: fam.add(Variables(EVENT= "_all_")) elif mode in ("register", "delete"): fam.add(Variables(EVENT= "an12h000"), Label("info", "update EVENT variable")) event = "an00h000" node.family("ref").defstatus("complete").add( Task(event).add(Variables(YMD= today), Label("YMD", today))) node.family("user").family(event).repeat( name="YMD", start=today, end=DATE_STOP).add( Label("info", "extern cannot be used anymore for " + "intra-suite reference triggers"), Variables(SLEEP= 1), Trigger(event + ":YMD le %s/ref/%s:YMD" % (node.fullname(), event)), _leaf("consume")) return node ############################################################################### class SerialTask(object): """ add trigger on the previous task """ def __init__(self): self.prev = None def add(self, name): fam = Family(name).add( Variables(MODE= name), Task("to_ecflow")) if self.prev != None: fam.add(Trigger("./%s eq complete" % self.prev)) self.prev = name return fam class Reload(Admin): """ a simple task to download SMS content and translate it to ecFlow """ def top(self, node): node = node.family("reload") node.add( Label("info", "from sms to ecFlow"), Defcomplete(), Variables( URL= "https://confluence.ecmwf.int/display/ECFLOW/Home", ECF_URL_CMD= "${BROWSER:=firefox} -remote 'openURL(%URL%"), self.main()) return node def main(self, node=None): fam = Family("reload") serial = SerialTask() fam.add( serial.add("get").add( Variables( SMS_SUITE= "eras", # SMS_PROG= 314159, SMS_NODE= "localhost", # SMS_PROG= 314199, SMS_NODE= "marhaus", # eras SMS_PROG= 314197, SMS_NODE= "marhaus", # eras2 ),), serial.add("translate"), serial.add("edit"), serial.add("mail"), serial.add("load"), serial.add("bench").add(Defcomplete())) if node is not None: return node.add(fam) return fam ############################################################################ if __name__ == '__main__': import cli_proc, sys parser = argparse.ArgumentParser( description=DESC, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument("--host", default="localhost", help= "server hostname") parser.add_argument("--port", default="3141", help= "server port number") if len(sys.argv) > 1: suites = { "course": Course(), } argv = cli_proc.process(suites, compare=False) sys.exit(0) args = parser.parse_args() clt = ec.Client(args.host, args.port) try: clt.ping() except: clt = ec.Client("localhost", 1000 + os.geteuid()) try: clt.ping() except: clt = ec.Client("localhost", 31415) try: clt.ping() except: clt = ec.Client("localhost", 3199) defs = ec.ecflow.Defs() course = Course() suite = course.suite().add(Reload().main()) if 1: # enable/disable remote version of 'consume' rem = suite.family("remote").add( Task("mk_remote").add( Defcomplete(), Label("info", "do not forget to create directory structure" + "on remote host for job output creation"), ie.onws(host = "class01")), Variables(ECF_HOME= "/tmp/%s/ecflow" % user()), ie.onws(host = "class02")) course.top(rem) # print clt.stats() Admin().top(suite) EcEvents().top(suite) defs.add_suite(suite) path = "/%s" % user() # path = "/%s/course" % user() # path = "/%s/admin" % user() # path = "/%s/ecevents" % user() # print defs clt.replace(path, defs) sys.exit (0)
A simple tar file can be downloaded to get script wrappers and include files (~map/course/201303/course2013ex.tgz).
Some may benefit from the file ecf.py that shall be distributed with ecflow package to facilitate large suite definition.
ecf.py Expand source
#!/usr/bin/env python # This software is provided under the ECMWF standard software license agreement. """ a layer over raw ecflow api use 'export ECF_DEBUG_LEVEL=10' to remove warning message related to variables overwrite """ import pwd, os, unittest try: from ecflow import TimeSlot from ecflow import JobCreationCtrl as JobCreationCtrl from ecflow import ChildCmdType from ecflow import ZombieType, ZombieAttr, ZombieUserActionType except: print "# ecf.py cannot import few types" import ecflow ecflow.Ecf.set_debug_level(3) DEBUG = 0 # DECORATE = "ONLY_TRIGGER" # DECORATE = "NO_TRIGGER" # DECORATE = "ONLY_EVENT" # DECORATE = "NO_ATTRIBUTE" DECORATE = "ALL" USE_TIME = True USE_LATE = False USE_TRIGGER = True USE_LIMIT = True if DECORATE == "NO_TRIGGER": USE_TRIGGER = False USE_LIMIT = True USE_EVENT = True elif DECORATE == "ONLY_TRIGGER": USE_TRIGGER = True USE_LIMIT = False USE_EVENT = False elif DECORATE == "NO_ATTRIBUTE": USE_TRIGGER = False USE_LIMIT = False USE_EVENT = False elif DECORATE == "ONLY_EVENT": USE_TRIGGER = False USE_LIMIT = False USE_EVENT = True elif DECORATE == "ALL": USE_TRIGGER = True USE_LIMIT = True USE_EVENT = True else: raise BaseException def get_username(): return pwd.getpwuid( os.getuid() )[ 0 ] def get_uid(): return pwd.getpwnam(get_username()).pw_uid def translate(name, value=None): """ Translate from sms to ecflow """ import sms2ecf, sys def is_sms(): if sys.argv[-1] in ( # "localhost" "sms", "od3", "ode","pikachu","ibis","map", "od", "od2", ): sms2ecf.ECF_MODE = "sms" return True return sms2ecf.ECF_MODE == "sms" if is_sms(): sms2ecf.ECF_MODE = "sms" return sms2ecf.translate(name, value) return name, value class State: """ this class aims at affording a user the possibility to add Triggers as t1 = Task("t1") Task("ts").add(Trigger(t1 == COMPLETE)) SUBMITTED, ACTIVE, SUSPENDED, ABORTED, QUEUED, COMPLETE, UNKNOWN are instance of this class. """ def __init__(self, state): """ store the status """ self.state = str(state) def __str__(self): """ translate into string """ return "%s" % self.state def __eq__(self, arg): """ when == is used, we should care about task name starting with 0-9""" if type(arg) == str: add = "" if type(arg[0]) == int: add = "./" return add + arg + " == " + self.state elif isinstance(arg, ecflow.Node): return arg.get_abs_node_path() + " == " + self.state return False def __ne__(self, arg): """ aka != """ if type(arg) == str: add = "" if type(arg[0]) == int: add = "./" return add + arg + " != " + self.state elif isinstance(arg, ecflow.Node): return arg.get_abs_node_path() + " != " + self.state return False def value(self): """ return state """ return self.state def eval(self, node): """ return state """ return self.state == node.get_state() SUBMITTED = State("submitted") ACTIVE = State("active") SUSPENDED = State("suspended") ABORTED = State("aborted") QUEUED = State("queued") COMPLETE = State("complete") UNKNOWN = State("unknown") class Attribute(object): """ generic attribute to be attached to a node""" def add_to(self, node): """ use polymorphism to attach attribute to a node""" pass class Variable(Attribute, ecflow.Variable): """ filter variables to be attached to a node, may alter its name SMS-ECF""" def add_to(self, node): """ add_variable""" keyt, valt = translate(self.name(), self.value()) if "/tc1/emos_es" in valt: raise BaseException # FIXME node.add_variable(keyt, valt) class Label(Attribute, ecflow.Label): """ wrap around label""" def add_to(self, node): """ add_label""" node.add_label(self) class Meter(Attribute, ecflow.Meter): """ wrap around meter""" def add_to(self, node): """ add_meter""" node.add_meter(self) class Event(Attribute, ecflow.Event): """ wrap around event""" def add_to(self, node): """ add_event""" node.add_event(self) class InLimit(Attribute): """ a class to host a path for a limit silently ignore if USE_LIMIT is False, (in debug mode) """ def __init__(self, fullpath): self.data = None if USE_LIMIT: try: path, name = fullpath.split(":") except: name = fullpath; path = "" if name is None: raise BaseException self.data = ecflow.InLimit(name, path) self.path_ = path self.name_ = name def add_to(self, node): """ add_inlimit""" if USE_LIMIT and self.data is None: raise BaseException if USE_LIMIT and self.data is not None: node.add_inlimit(self.data) def value(self): """ get limit fullpath-name """ return self.path_ + ":" + self.name_ def name(self): """ get limit name """ return self.name_ class Inlimit(InLimit): def __init__(self, fullpath): super(Inlimit, self).__init__(fullpath) class Trigger(Attribute): """ add trigger (string, list of task names, or directly expression and: and'ed (True) or or'ed (False) unk: add or [name]==unknown for RD """ def __init__(self, expr, unk=False, anded=True): self.expr = None if expr is None: return if expr == "": return if type(expr) == str: # self.expr = ecflow.Expression(expr) self.expr = expr return if type(expr) == tuple: prep = list(expr) expr = prep if type(expr) == list: for index, name in enumerate(expr): if name == None: continue pre = "" if type(name) in (Node, Task, Family, Suite): fullname = name.fullname() name = fullname elif type(name) in (ecflow.Task, ecflow.Family): name = name.name() else: pass if name[0].isdigit(): pre = "./" # "0123456789": if ':' in name: item = pre + name else: item = pre + "%s == complete" % name if unk: item += " or %s%s==unknown" % (pre, name) else: pass if item is None: self.expr = None elif index == 0 or self.expr is None: self.expr = item elif item is None: return else: self.expr += " and %s" % item elif type(expr) in (ecflow.Expression, ecflow.PartExpression): self.expr = ecflow.Expression(str(item)) else: print type(expr) raise Exception("what? trigger?") def add_to(self, node): if not USE_TRIGGER: return if self.expr is None: return if node.get_trigger() is None: node.add_trigger(self.expr) else: node.add_part_trigger(ecflow.PartExpression(self.expr, True)) class TriggerAnd(Trigger): def __init__(self, expr, unk=False, anded=True): self.expr = expr if (not ":" in expr and not " eq " in expr and not "==" in expr): self.expr += " eq complete" def add_to(self, node): node.add_part_trigger(ecflow.PartExpression(self.expr, True)) class TriggerImpossible(Trigger): """ attribute to be added to node when it is not expected to run any task""" def __init__(self): """ add an 'impossible trigger', for a task not to run """ super(TriggerImpossible, self).__init__("1==0") class TriggerAlways(Trigger): """ attribute to be added to node when it is not expected to run any task""" def __init__(self): """ add an 'impossible trigger', for a task not to run """ super(TriggerAlways, self).__init__("1==1") class Complete(Trigger): """ class to host complete expression, added later to a node""" def __init__(self, expression, unk=False, anded=False): super(Complete, self).__init__(expression, unk, anded) def add_to(self, node): if USE_TRIGGER and self.expr is not None: node.add_complete(self.expr) class Clock(Attribute): """ wrapper to add clock """ def __init__(self, arg): if type(arg) == str: hybrid = "hybrid" in arg hhh, mmm, sss = [0, 0, 0] try: hhh, mmm, sss = arg.split(':') self.data = ecflow.Clock(hhh, mmm, sss, hybrid) except: self.data = ecflow.Clock(hybrid) else: self.data = ecflow.Clock(arg) def add_to(self, node): if type(node) != Suite: print "WAR: clock can only be attached to suite node, " print "WAR: clock is ignored" return node.add_clock(self.data) class AutoCancel(Attribute): """ wrapper to add time """ def __init__(self, arg): if type(arg) == str: hhh, mmm = arg.split(':') rel = '+' in arg self.data = ecflow.Autocancel(int(hhh), int(mmm), rel) else: self.data = ecflow.Autocancel(arg) def add_to(self, node): node.add_autocancel(self.data) class Time(Attribute): """ wrapper to add time """ def __init__(self, arg): self.data = arg def add_to(self, node): if USE_TIME and self.data is not None: node.add_time(self.data) class Today(Time): """ wrapper to add time """ def __init__(self, arg): self.data = arg def add_to(self, node): if USE_TIME and self.data is not None: node.add_today(self.data) class Cron(Time): """ wrapper to add time """ def __init__(self, bes, wdays=None, days=None, months=None): self.data = ecflow.Cron() if not ("-w" in bes or "-m" in bes or "-d" in bes): self.data.set_time_series(bes); return import argparse parser = argparse.ArgumentParser() parser.add_argument("-w", nargs='?', default=0, help= "weekdays") parser.add_argument("-d", nargs='?', default=0, help= "days") parser.add_argument("-m", nargs='?', default=0, help= "months") parser.add_argument("arg", type=str, help= "begin end step") parsed = parser.parse_args(bes.split()) if parsed.w: self.data.set_week_days([int(x) for x in parsed.w.split(',')]) if parsed.d: self.data.set_week_days([int(x) for x in parsed.d.split(',')]) if parsed.m: self.data.set_months([int(x) for x in parsed.m.split(',')]) self.data.set_time_series(parsed.arg) def add_to(self, node): if USE_TIME and self.data is not None: node.add_cron(self.data) else: print "#WAR: ignoring: %s" % self.data class Date(Time): """ wrapper to add date """ def __init__(self, arg, mask=False): super(Date, self).__init__(arg) self.mask = mask def add_to(self, node): if USE_TIME and self.data is not None: ### ??? FIX, emos avoids dates, datasvc would not if self.mask: node.add_variable("DATEMASK", self.data) else: ddd, mmm, yyy = self.data.split('.') if ddd == '*': ddd = 0 if mmm == '*': mmm = 0 if yyy == '*': yyy = 0 node.add_date(int(ddd), int(mmm), int(yyy)) # node.add_date(self.data) class Day(Date): """ wrapper to add day """ def add_to(self, node): if USE_TIME and self.data is not None: if isinstance(self.data, str): days = { "monday": ecflow.Days.monday, "sunday": ecflow.Days.sunday, "tuesday": ecflow.Days.tuesday, "wednesday": ecflow.Days.wednesday, "thursday": ecflow.Days.thursday, "saturday": ecflow.Days.saturday, "friday": ecflow.Days.friday, } # node.add_date(self.data) ### FIX node.add_variable("WEEKDAY", self.data) node.add_day(ecflow.Days(days[self.data])) else: node.add_day(ecflow.Days(self.data)) class Defcomplete(Attribute): """ wrapper to add defstatus complete """ def __init__(self): pass def add_to(self, node): node.defstatus("complete") class Defstatus(Defcomplete): """ add defstatus attribute""" def __init__(self, kind): if type(kind) == str: kinds = {"suspended": ecflow.DState.suspended, "aborted": ecflow.DState.aborted, "complete": ecflow.DState.complete, "active": ecflow.DState.active, "submitted": ecflow.DState.submitted, "unknown": ecflow.DState.unknown, "queued": ecflow.DState.queued, } self.data = kinds[kind] else: self.data = kind def add_to(self, node): node.add_defstatus(self.data) class DefcompleteIf(Defcomplete): """ wrapper to add conditional defstatus complete just change name to make it explicit """ def __init__(self, arg=True): # super(DefcompleteIf, self).__init__() self.data = arg def add_to(self, node): if self.data: node.defstatus("complete") # else: node.defstatus("queued") # in comment to prevent # overwrite when using multiple defcomplete class Limit(Attribute): """ wrapper to add limit """ # name = None; size = 1 def __init__(self, name=None, size=1): self.name = name self.size = size def add_to(self, node): if USE_LIMIT and self.name is not None: if type(self.name) is dict: for name, size in self.name.items(): node.add_limit(name, size) else: node.add_limit(self.name, self.size) class Late(Attribute): """ wrapper around late, to be add'ed to families and tasks """ def __init__(self, arg): self.data = None if not USE_LATE: if DEBUG: print "#MSG: late is disabled" return sub = False act = False com = False rel = False self.data = ecflow.Late() for item in arg.split(" "): if item == "-s": sub = True elif item == "-c": com = True elif item == "-a": act = True else: hour, mins = item.split(":") rel = "+" in hour if "+" in hour: hour= hour[1:] if sub: self._add_sub(hour, mins) elif com: self._add_com(hour, mins, rel) elif act: self._add_act(hour, mins) sub = False act = False com = False def _add_sub(self, hour, mins): """ submitted""" self.data.submitted(ecflow.TimeSlot(int(hour), int(mins))) def _add_com(self, hour, mins, rel): """ complete""" self.data.complete(ecflow.TimeSlot(int(hour), int(mins)), rel) def _add_act(self, hour, mins): """ active""" self.data.active(ecflow.TimeSlot(int(hour), int(mins))) def add_to(self, node): if USE_LATE and self.data is not None: node.add_late(self.data) class Variables(Attribute): """ dedicated class to enable variable addition with different syntax """ def _set_tvar(self, key, val): """ facilitate to load a ecflow suite to SMS, translating variable names""" keyt, valt = translate(str(key), str(val)) if self.data is None: self.data = Variable(keyt, valt) else: next = self.next self.next = Variables(keyt, valt, next) def __init__(self, __a=None, __b=None, __next=None, *args, **kwargs): self.data = None self.next = __next if len(args) > 0: if type(args) == list: for item in args.iteritems(): self._set_tvar(item.name(), item.value()) elif type(args) == tuple: for key, val in args.items(): self._set_tvar(key, val) else: raise BaseException() if len(kwargs) > 0: for key, val in kwargs.items(): self._set_tvar(key, val) if type(__a) == dict: for key, val in __a.items(): self._set_tvar(key, val) elif type(__a) == tuple: raise BaseException() # for key, val in __a.items(): self._set_tvar(key, val) elif type(__a) == list: raise BaseException() elif type(__a) == Variable: self.data = __a elif __a is not None and __b is not None: self._set_tvar(__a, __b) elif __a is None and __b is None: pass else: raise BaseException(__a, __b, __next, args, kwargs) def add_to(self, node): if self.data is not None: node.add_variable(self.data) edit = "%s" % self.data try: # FIXME Christian's request if "ECF_JOB_CMD" in edit: if "%WSHOST%" in edit: node.add_label("infopcmd", "WSHOST") elif "%SCHOST%" in edit: node.add_label("infopcmd", "SCHOST") elif "%HOST%" in edit: node.add_label("infopcmd", "HOST") else: node.add_label("infopcmd", edit) elif "edit WSHOST " in edit: node.add_label("infopws", edit.replace("edit WSHOST ", "")) elif "edit SCHOST " in edit: node.add_label("infopsc", edit.replace("edit SCHOST ", "")) elif "edit HOST " in edit: node.add_label("infophs", edit.replace("edit HOST ", "")) except: pass if self.next is not None: self.next.add_to(node) def add(self, what): raise baseException(what.fullname()) class Limits(Attribute): """ dedicated class to enable limits addition with different syntax """ def _set_tvar(self, key, val): """ append limits """ if self.data is None: self.data = ecflow.Limit(key, val) else: next = self.next self.next = Limits(key, val, next) def __init__(self, __a=None, __b=None, __next=None, *args, **kwargs): self.data = None self.next = __next if len(args) > 0: if type(args) == list: for item in args.iteritems(): self._set_tvar(item.name(), item.value()) elif type(args) == tuple: for key, val in args.items(): self._set_tvar(key, val) elif len(kwargs) > 0: for key, val in kwargs.items(): self._set_tvar(key, val) elif type(__a) == dict: for key in sorted(__a.iterkeys()): self._set_tvar(key, __a[key]) if __a is not None and __b is not None: self._set_tvar(__a, __b) def add_to(self, node): if self.data is not None: node.add_limit(self.data) if self.next is not None: self.next.add_to(node) class Repeat(Attribute): def __init__(self, name="YMD", start=20120101, end=21010101, step=1, kind="date"): if kind == "date": # print "# repeat", start, end, step, name, kind self.data = ecflow.RepeatDate(name, int(start), int(end), int(step)) elif "int" in kind: self.data = ecflow.RepeatInteger(name, int(start), int(end), int(step)) elif kind == "string": self.data = ecflow.RepeatString(name, start) elif "enum" in kind: self.data = ecflow.RepeatEnumerated(name, start) elif kind == "day": self.data = ecflow.RepeatDay(step) else: self.data = None def add_to(self, node): if self.data is not None: node.add_repeat(self.data) def If(test=True, then=None, otow=None): """ enable Task("t1").add(If(test= (1==1), then= Variables(ONE=1), otow= Variables(TWO=2))) appreciate that both branches are evaluated, using this If class ie there is no 'dead code' as it is with python language 'if' structure using If to distinguish od/rd mode request that both users share the variables (parameter.py) and ecf.py otow: on the other way? """ if test: return then return otow class Root(object): # from where Suite and Node derive """ generic tree node """ def __str__(self): if isinstance(self, ecflow.Node): return self.fullname() return str(self) def __eq__(self, node): if isinstance(self, ecflow.Node): return "%s == " % self.fullname() + str(node) return False def __ne__(self, node): if isinstance(self, ecflow.Node): return "%s != " % self.fullname() + str(node) return False def __and__(self, node): if isinstance(self, ecflow.Node): return "%s and " % self.fullname() + str(node) return False def __or__(self, node): if isinstance(self, ecflow.Node): return "%s or " % self.fullname() + str(node) return False def fullname(self): """ simple syntax """ if isinstance(self, ecflow.Node): return self.get_abs_node_path() return str(self) def repeat(self, name="YMD", start=20120101, end=20321212, step=1, kind="date"): """ add repeat attribute""" if kind == "date": self.add_repeat(ecflow.RepeatDate(name, int(start), int(end), int(step))) elif kind == "integer": self.add_repeat(ecflow.RepeatInteger(name, int(start), int(end), int(step))) elif kind == "string": self.add_repeat(ecflow.RepeatString(name, start)) elif kind == "enumerated": self.add_repeat(ecflow.RepeatEnumerated(name, start)) elif kind == "day": self.add_repeat(ecflow.RepeatDay(step)) else: raise BaseException return self def defstatus(self, kind): """ add defstatus attribute""" status = kind if type(kind) == str: kinds = {"suspended": ecflow.DState.suspended, "aborted": ecflow.DState.aborted, "complete": ecflow.DState.complete, "active": ecflow.DState.active, "submitted": ecflow.DState.submitted, "unknown": ecflow.DState.unknown, "queued": ecflow.DState.queued, } status = kinds[kind] self.add_defstatus(status) return self def add(self, item=None, *args): """ add a task, a family or an attribute """ if DEBUG: print self.fullname(), item, args if item is not None: if type(item) == tuple: for val in item: self.add(val) elif type(item) == list: for val in item: self.add(val) else: if DEBUG: if type(item) in (Task, Family): print item.fullname() try: item.add_to(self) except Exception, exc: print item raise BaseException("not yet", self, type(item), exc) if len(args) > 0: if type(args) == tuple: for val in args: self.add(val) elif type(args) == list: for val in args: self.add(val) else: raise BaseException() if not isinstance(self, ecflow.Node): raise BaseException( "you don't want that") return self def limit(self, name, size): """ add limit attribute""" if name is None: raise BaseException self.add_limit(name, size) return self def inlimit(self, full_path): """ add inlimit attribute""" if not USE_LIMIT: return self path, name = full_path.split(":") if name is None: raise BaseException() if path is None: raise BaseException() self.add_inlimit(name, path) return self class Node(Root): # from where Task and Family derive """ Node class is shared by family and task """ def add_limits(self, __a = None, __b = None, **kwargs): """ add limit dependency""" if isinstance(__a, basestring): self.add_limit(__a, __b) elif isinstance(__a, dict): assert __b is None for key, val in __a.items(): self.add_limit(key, val) for key, val in kwargs.items(): self.add_limit(key, val) return self def meter(self, name, start, end, threshold=None): """ add meter attribute""" if threshold == None: threshold = end self.add_meter(name, start, end, threshold) return self def label(self, name, default=""): """ add label attribute""" self.add_label(name, default) return self def event(self, name=1): """ add event attribute""" if USE_EVENT: self.add_event(name) return self def cron(self, time, dom=False, wdays=False, month=False): """ wrapper for add_cron """ cron = ecflow.Cron() cron.set_time_series(time) if wdays is not False: cron.set_week_days(wdays) if month is not False: cron.set_months(month) if dom is not False: cron.set_day_of_month(dom) self.add_cron(cron) return self def today(self, hhmm): """ wrapper around time """ self.time(hhmm) return self # ??? def time(self, hhmm): """ wrapper around time, None argument is silently ignored """ if hhmm is not None: self.add_time(hhmm) return self def trigger(self, arg): """ add trigger attribute""" if USE_TRIGGER and arg is not None: self.add_trigger(arg) return self def trigger_and(self, arg): """ append to existing trigger""" if USE_TRIGGER and arg is not None: self.add_part_trigger(ecflow.PartExpression(arg, True)) return self def trigger_or(self, arg): """ append to existing trigger""" if USE_TRIGGER and arg is not None: self.add_part_trigger(ecflow.PartExpression(arg, False)) return self def complete(self, arg): """ add complete attribute""" if USE_TRIGGER and arg is not None: self.add_complete(arg) return self def complete_and(self, arg): """ append to existing complete""" if USE_TRIGGER and arg is not None: self.add_part_complete(ecflow.PartExpression(arg, True)) return self def complete_or(self, arg): """ append to existing complete""" if USE_TRIGGER and arg is not None: self.add_part_complete(ecflow.PartExpression(arg, False)) return self def up(self): """ get parent, one level up""" return self.get_parent() class Defs(ecflow.Defs): """ wrapper for the definition """ def add(self, suite): """ add suite """ self.add_suite(suite) return suite def suite(self, name): """ add suite providing its name """ suite = Suite(name) self.add(suite) return suite class Client(ecflow.Client): """ wrapper around client """ def __init__(self, host="localhost", port="31415"): if "@" in host: host, port = host.split("@") # super(Client, self).__init__(host, int(port)) super(Client, self).__init__() self.set_host_port(host, int(port)) else: super(Client, self).__init__() self.set_host_port(host, int(port)) # super(Client, self).__init__(host, "%s" % port) self.host = host self.port = port def __str__(self): return "ecflow client %s@%s v%s" % (self.host, self.port, self.version()) class Suite(ecflow.Suite, Root): """ wrapper for a suite """ def family(self, name): """ add family """ fam = Family(name) self.add_family(fam) return fam def task(self, name): """ add family """ tsk = Task(name) self.add_task(tsk) return tsk # def __enter__(self): return self # def __exit__(self, *args): pass class Family(ecflow.Family, Node, Attribute): """ wrapper around family """ def family(self, name): """ add a family """ fam = Family(name) self.add_family(fam) return fam def task(self, name): """ add a task """ tsk = Task(name) self.add_task(tsk) return tsk def add_to(self, node): node.add_family(self) # def __enter__(self): return self # def __exit__(self, *args): pass class Task(ecflow.Task, Node, Attribute): """ wrapper around task """ def __setattr__(self, key, val): # assert key.isupper() if key.isupper(): key, val = translate(key, val) self.add_variable(key, val) def add_to(self, node): # self.add_label("infop", "def") node.add_task(self) def add_family(self, node): raise BaseException() def display(defs, fname=None): """ print defs""" if fname is None: pass # print defs else: fop = open(fname, "w") print >> fop, defs class TestEcf(unittest.TestCase): """ a test case """ def test_xxx(self): """ a test """ suite = Suite ("a_suite") suite.defstatus("suspended") fam = Family("a_family") tsk = Task("a_task") ft2 = Task("a_fam") ft2.add_to(fam) tsk.VAR = "VALUE" # edit VAR "VALUE" tsk.add(Late("-s 00:05 -c 01:00")) fam.add(tsk, (Task("1"), Task("2")), [Task("11"), Task("12")], Task("111"), Task("211"), Task("t2").add(Trigger(tsk == COMPLETE), Time("01:00")) ) fam.add(Task("t3").add( If(test= (1==1), then=Variables(ADD_ONE=1), otow=Variables(ADD_TWO=1)), If(test= (1==0), then=Variables(ADD_ONE=0), otow=Variables(ADD_TWO=0)), Trigger(tsk != ABORTED), Complete(tsk == COMPLETE))) # longer fam.add( Task("2t"), Task("t4").add( Trigger(tsk.name() != COMPLETE)), Late("-s 00:05 -c 01:00"), Variables(VAR="VALUE"), Task("t5").add(Trigger(["t4", "t3", "t2"])), Task("t6").add(Trigger("2t" == COMPLETE)), Task("t7").add(Trigger("2t eq complete")), ) tsk.add(Limit("a_limit", 10), InLimit("a_task:a_limit"), Meter("step", -1, 100), Label("info", "none"), Event(1), Event("a"), Defcomplete()) tsk.add(Variables({"A": "a", "B": "b"})) tsk.add(Variables(D="d", E="e")) tsk.add(Variables("C", "c")) suite.add(fam) fam.family("another").add(DefcompleteIf(True)) defs = Defs() defs.add(suite) another = defs.suite("another") another.defstatus("suspended") another.task("tn") afam = another.family("another_fam") afam.task("t2n") display(defs, fname="test_ecf.tmp") if __name__ == '__main__': unittest.main()
Also, a simple converter from expended definition file to a python file is hereby provided as an example (it is known to face intrinsic python language limitations in large complete suites case).
def2def.py Expand source
#!/usr/bin/env python # This software is provided under the ECMWF standard software license agreement. import sys try: import ecflow except: sys.path.append("/usr/local/apps/ecflow/current/lib/python2.7/site-packages/ecflow") import ecflow import ecflow as ec import sys """ a simple program to convert an expanded definition file to py script using ecf.py""" class Indent: """This class manages indentation, for use with context manager It is used to correctly indent the definition node tree hierarchy """ _pos = 0 _step = 3 def __init__(self): Indent._pos += Indent._step def __del__(self): Indent._pos -= Indent._step @classmethod def indent(cls, loc=''): for i in range(Indent._pos): if loc is None: print ' ' elif type(loc) == str: loc += ' ' else: loc.write(' ') if type(loc) == str: return loc def adds(line=0): if line is None: return "" return Indent.indent() + line + "\n" def add(line, echo=0): if line is None: return elif echo: print Indent.indent() + line else: return Indent.indent() + line + "\n" class DefFormat(object): def __init__(self, defs): self.defs = defs def process_attr(self, node, end=False): res = "" ind = Indent() defstatus = node.get_defstatus() if defstatus: if defstatus != ec.DState.queued: res += add("Defstatus('%s')," % defstatus) item = node.get_autocancel() if item: line = "%s" % item line = line.replace("autocancel ", "") res += add("Autocancel('%s')," % line) item = node.get_repeat() if not item.empty(): line = "%s" % item full = line.split() kind = full[1] name = full[2] try: beg = full[3] try: end = full[4] except: end = beg if "#" in end: end = beg except: beg = 1; end = 1 by = 1 if len(full) == 6: by = full[5] if kind in ("integer", "date"): res += add( "Repeat(kind='%s', name='%s', start=%s, end=%s, step=%s)," % ( kind, name, beg, end, by)) elif kind in ("day", ): res += add( "Repeat(kind='%s', name='%s', step=%s)," % ( kind, name, by)) elif kind in ("string", "enumerated"): line = "%s" % item line = line.replace("repeat %s %s" % (kind, name), "") line.replace('"', '') res += add("Repeat(kind='%s', name='%s', start=%s)," % ( kind, name, line.split())) # FIXME string enum item = node.get_late() if item: line = "%s" % item line = line.replace("late ", "") res += add("Late('%s')," % line) item = node.get_complete() if item: res += add("Complete('%s')," % item) item = node.get_trigger() if item: res += add("Trigger('%s')," % item) for item in node.meters: line = "%s" % item dummy, name, beg, end, thr = line.split(" ") res += add("Meter('%s', %s, %s, %s)," % (name, beg, end, thr)) for item in node.events: line = "%s" % item line = line.replace("event ", "").strip() res += add("Event('%s')," % line) for item in node.labels: res += add("Label('%s', '%s')," % (item.name(), item.value())) for item in node.limits: val = item.value() if val == "0" or val == 0: val = "1" res += add("Limit('%s', %d)," % ( item.name(), int(val))) for item in node.inlimits: line = "%s" % item line = line.replace("inlimit ", "") res += add("InLimit('%s')," % line) for item in node.times: line = "%s" % item line = line.replace("time ", "") res += add("Time('%s')," % line) for item in node.todays: line = "%s" % item line = line.replace("today ", "") res += add("Today('%s')," % line) for item in node.dates: line = "%s" % item line = line.replace("date ", "") res += add("Date('%s')," % line) for item in node.days: line = "%s" % item line = line.replace("day ", "") res += add("Day('%s')," % item) for item in node.crons: line = "%s" % item line = line.replace("cron ", "") res += add("Cron('%s')," % line) var = "Variables(" vind = Indent() num = 0 for item in node.variables: sep="'" if sep in item.value(): sep="\"" var += ind.indent("\n") + item.name() + \ "= %s%s%s," % (sep, item.value(), sep) num += 1 del vind if num: res += add(var + "),") del ind if len(res) == 0: return None return res def process(self, node=None, inc=0): STREAM = 1 if node is None: num = 0 print "# ecf.ECF_MODE = 'sms'" print "suites = []" for suite in self.defs.suites: if STREAM: print "suite%d = Suite('%s').add(" % (num, suite.name()) else: print "s = Suite('%s')"% (suite.name()) + "; suites.append(s);" self.process(suite) print ")" num += 1 if num > 1: if 0: raise BaseException("no more than one suite at a time") print "### WARNING: more than one suite defined" elif isinstance(node, ec.Suite): if not STREAM: print "s.add(\n" add( self.process_attr(node),1 ) if STREAM: ind = Indent() for kid in node.nodes: self.process(kid) if STREAM: del ind elif isinstance(node, ec.Family): many = ""; one = ""; post = "" # circumvent limitation to 255 items if inc % 200 == 0: one += "("; post = ")," if inc > 200: many = ")," if STREAM: add( many + one + "Family('%s').add(" % node.name(), 1) res = self.process_attr(node) else: print ")\ncur = Family('%s')" % node.name() print "fam.add(cur); fam = cur; fam.add(" res = self.process_attr(node) if STREAM: if res is not None: print Indent.indent(res) ind = Indent() num = 0 for kid in node.nodes: self.process(kid, num) ; num += 1 if STREAM: del ind add( Indent.indent(post + "), # endfamily %s" % node.name()), 1 ) print "# ", num elif isinstance(node, ec.Task): res = self.process_attr(node) if res is None: add( "Task('%s')," % node.name(), 1) else: add( "Task('%s').add(\n" % node.name() + "%s%s)," % (res, Indent.indent()), 1) def main(self): self.header() self.process() self.footer() def header(self): print """#!/usr/bin/env python # NOTICE: this file was originally created with def2def.py import sys # sys.path.append('../o/def') # sys.path.append('.') sys.path.append('/home/ma/emos/def/o/def') from ecf import * import ecf as ecf # import inc_emos as ie """ def footer(self): print """ if __name__ == '__main__': defs = Defs() defs.add(suite0); defs.auto_add_externs(True) if 0: import cli_proc, ecf cli_proc.process(ie.Seed(defs), compare=False) else: port = 1500 + int(get_uid()) # when started with ecflow_start.sh # ECMWF if 0: # test job creation job_ctrl = ecflow.JobCreationCtrl() defs.check_job_creation(job_ctrl) print "loading on localhost@%s" % port client = ecf.Client("localhost", port) client.replace("/%s" % suite0.name(), defs) """ if __name__ == '__main__': filename = sys.argv[1] defs = ec.Defs(filename) defs.auto_add_externs(True) if 0: print defs DefFormat(defs).main()