- Created by Axel Bonet, last modified on Apr 20, 2020
This example covers a range of alternatives: from one simple task that both produces and consumes "steps", to the fully parallel version with one task per step to process.
Alternatives Expand source
# Producer/consumer suite definition.
#
# The node classes used below (Family, Task, Variables, Event, Meter, Repeat,
# Trigger, Limit, Inlimit, Label, Defcomplete) and the ``ic`` helper are
# expected to be imported elsewhere in the file; they are not defined here.

# Step range shared by every producer/consumer variant:
# first step, last step (inclusive where iterated below) and increment.
beg = 0
fin = 48
by = 3


def not_consumer():
    """Return the variable setting CONSUME to "no"."""
    return Variables(CONSUME="no")


def not_producer():
    """Return the variable setting PRODUCE to "no"."""
    return Variables(PRODUCE="no")


def ev():
    """Return the pair of events ("p" and "c") attached to every task."""
    return (Event("p"), Event("c"))


def call_task(name, start, stop, inc):
    """Build the task *name* handling the steps from *start* to *stop*.

    The task receives the "p"/"c" events and the BEG, FIN and BY variables
    (set to *start*, *stop* and *inc*).  When *start* differs from *stop* a
    "step" meter ranging from -1 to ``int(stop)`` is attached as well;
    otherwise ``None`` is passed to ``add`` in its place.
    """
    meter = None
    if start != stop:
        # Only a task that covers several steps reports progress via a meter.
        meter = Meter("step", -1, int(stop))
    return Task(name).add(
        ev(),
        Variables(BEG=start, FIN=stop, BY=inc),
        meter)


def call_consumer(SELECTION):
    """Return the "consumer" family holding all producer/consumer variants.

    The variants go from a single task that both produces and consumes, to
    one consumer task per step.  *SELECTION* is accepted for interface
    compatibility; the visible code does not use it.
    """
    # defstatus suspended  # avoid early start

    def consume1(leap=1, leap_nb=3):
        """Return one family per leap, each looping over every leap_nb-th step.

        The *leap* argument is shadowed by the loop variable below, so its
        value is never used.
        """
        # NOTE(review): range(1, leap_nb) builds families 1 .. leap_nb - 1
        # only, so with a stride of by * leap_nb the steps starting at
        # beg + by * (leap_nb - 1) are handled by no family -- confirm
        # whether range(1, leap_nb + 1) was intended.
        return [
            Family("%d" % leap).add(
                call_task(
                    "consume", "'%STEP%'", "'%STEP%'", by * leap_nb
                ).add(
                    Repeat("STEP", beg + by * (leap - 1), fin,
                           by * leap_nb, kind="integer"),
                    # A leader-aware trigger, like the one on consume0or1,
                    # could be used here; this one only follows produce1.
                    Trigger("(consume:STEP le %s1/produce:STEP)" % prod),
                ))
            for leap in range(1, leap_nb)
        ]

    def consume2(beg, fin):
        """Return one family (and one consume task) per step in beg..fin."""
        return [
            Family("%03d" % idx).add(
                call_task("consume", idx, idx, by).add(
                    Variables(STEP=idx),
                    # A leader-aware trigger, like the one on consume0or1,
                    # could be used here; this one only follows produce1.
                    Trigger("consume:STEP le %s1/produce:STEP" % prod),
                ))
            for idx in range(beg, fin + 1, by)
        ]

    # Both paths are read by the nested helpers above when they are called
    # further down.  ic.psel() presumably yields the path prefix of the
    # selected suite -- TODO confirm.
    lead = ic.psel() + "/consumer/admin/leader:1"
    prod = ic.psel() + "/consumer/produce"

    if 0:  # FIXME: disabled shortcut returning only an empty, complete family
        return Family("consumer").add(Defcomplete())

    return Family("consumer").add(
        Defcomplete(),
        Variables(SLEEP=10, PRODUCE="no", CONSUME="no"),

        Family("limit").add(
            Defcomplete(),
            Limit("consume", 7),),

        Family("admin").add(
            # Set manually with Xcdp, or alter the event 1, so that
            # producer 1 becomes leader; by default producer0 leads.
            Task("leader").add(
                Event("1"),
                Defcomplete())),  # dummy task, not designed to run

        # Default: a task does both.
        Variables(PRODUCE="yes", CONSUME="yes"),

        # This task will do both, i.e. serially.
        call_task("produce", beg, fin, by).add(
            Label("info", "do both at once"),),

        # This will loop inside the task, reporting its processed step as a
        # meter indication.
        Family("produce0").add(
            not_consumer(),
            call_task("produce", beg, fin, by)),

        # Here, the choice is to push a new task for each step.
        Family("produce1").add(
            not_consumer(),
            call_task("produce", '%STEP%', "%STEP%", by).add(
                Repeat("STEP", beg, fin, by, kind="integer"),
            )),

        # PRB edit FIN '$((%STEP% + %BY%))'
        Family("consume").add(
            not_producer(),
            Inlimit("limit:consume"),
            Variables(
                CALL_WAITER=1,
                # $step will be interpreted in the job!
                TRIGGER="../produce:step -gt consume:$step or ../produce eq complete"),
            call_task("consume", beg, fin, by).add()),

        Family("consume0or1").add(
            not_producer(),
            Inlimit("limit:consume"),
            call_task("consume", "%STEP%", "%STEP%", by),
            Repeat("STEP", beg, fin, by, kind="integer"),
            # Follow produce1's STEP repeat when the leader event is set,
            # otherwise follow produce0's "step" meter.
            Trigger("(%s and (consume0or1:STEP le %s1/produce:STEP)) or " % (lead, prod) +
                    "(not %s and (consume0or1:STEP le %s0/produce:step))" % (lead, prod))),

        Family("consume1").add(
            not_producer(),
            Inlimit("limit:consume"),
            consume1()),

        Family("consume2").add(
            # The loop is "exploded"; the consume limit may be changed
            # manually with Xcdp to reduce or increase the load.
            Inlimit("limit:consume"),
            not_producer(),
            consume2(beg, fin)))