@@ -1512,7 +1512,7 @@ func (cc *XController) Run(stopCh <-chan struct{}) {
15121512 go wait .Until (cc .PreemptQueueJobs , 60 * time .Second , stopCh )
15131513
15141514 // This thread is used to update AW that has completionstatus set to Complete or RunningHoldCompletion
1515- go wait .Until (cc .UpdateQueueJobs , 5 * time .Second , stopCh )
1515+ // go wait.Until(cc.UpdateQueueJobs, 5*time.Second, stopCh)
15161516
15171517 if cc .isDispatcher {
15181518 go wait .Until (cc .UpdateAgent , 2 * time .Second , stopCh ) // In the Agent?
@@ -1536,90 +1536,79 @@ func (qjm *XController) UpdateAgent() {
15361536// Move AW from Running to Completed or RunningHoldCompletion
15371537// Do not use event queues! Running AWs move to Completed, from which it will never transition to any other state.
15381538// State transition: Running->RunningHoldCompletion->Completed
1539- func (qjm * XController ) UpdateQueueJobs () {
1540- queueJobs , err := qjm .appWrapperLister .AppWrappers ("" ).List (labels .Everything ())
1541- if err != nil {
1542- klog .Errorf ("[UpdateQueueJobs] Failed to get a list of active appwrappers, err=%+v" , err )
1543- return
1544- }
1545- containsCompletionStatus := false
1546- for _ , newjob := range queueJobs {
1547- for _ , item := range newjob .Spec .AggrResources .GenericItems {
1548- if len (item .CompletionStatus ) > 0 {
1549- containsCompletionStatus = true
1550- }
1539+ func (qjm * XController ) UpdateQueueJobs (newjob * arbv1.AppWrapper ) {
1540+
1541+ if newjob .Status .State == arbv1 .AppWrapperStateActive || newjob .Status .State == arbv1 .AppWrapperStateRunningHoldCompletion {
1542+ err := qjm .UpdateQueueJobStatus (newjob )
1543+ if err != nil {
1544+ klog .Errorf ("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v" , newjob .Name , err )
1545+ //TODO: should we really return?
1546+ return
15511547 }
1552- if (newjob .Status .State == arbv1 .AppWrapperStateActive || newjob .Status .State == arbv1 .AppWrapperStateRunningHoldCompletion ) && containsCompletionStatus {
1553- err := qjm .UpdateQueueJobStatus (newjob )
1554- if err != nil {
1555- klog .Errorf ("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v" , newjob .Name , err )
1556- continue
1557- }
1558- klog .V (6 ).Infof ("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v" , newjob .Name , qjm .qjqueue .IfExist (newjob ), newjob , newjob .ResourceVersion , newjob .Status )
1559- // set appwrapper status to Complete or RunningHoldCompletion
1560- derivedAwStatus := qjm .getAppWrapperCompletionStatus (newjob )
1548+ klog .V (6 ).Infof ("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v" , newjob .Name , qjm .qjqueue .IfExist (newjob ), newjob , newjob .ResourceVersion , newjob .Status )
1549+ // set appwrapper status to Complete or RunningHoldCompletion
1550+ derivedAwStatus := qjm .getAppWrapperCompletionStatus (newjob )
15611551
1562- klog .Infof ("[UpdateQueueJobs] Got completion status '%s' for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]" , derivedAwStatus , newjob .Namespace , newjob .Name , newjob .ResourceVersion ,
1563- newjob .Status .CanRun , newjob .Status .State , newjob .Status .Pending , newjob .Status .Running , newjob .Status .Succeeded , newjob .Status .Failed )
1552+ klog .Infof ("[UpdateQueueJobs] Got completion status '%s' for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]" , derivedAwStatus , newjob .Namespace , newjob .Name , newjob .ResourceVersion ,
1553+ newjob .Status .CanRun , newjob .Status .State , newjob .Status .Pending , newjob .Status .Running , newjob .Status .Succeeded , newjob .Status .Failed )
15641554
1565- // Set Appwrapper state to complete if all items in Appwrapper
1566- // are completed
1567- if derivedAwStatus == arbv1 .AppWrapperStateRunningHoldCompletion {
1568- newjob .Status .State = derivedAwStatus
1569- var updateQj * arbv1.AppWrapper
1570- index := getIndexOfMatchedCondition (newjob , arbv1 .AppWrapperCondRunningHoldCompletion , "SomeItemsCompleted" )
1571- if index < 0 {
1572- newjob .Status .QueueJobState = arbv1 .AppWrapperCondRunningHoldCompletion
1573- cond := GenerateAppWrapperCondition (arbv1 .AppWrapperCondRunningHoldCompletion , v1 .ConditionTrue , "SomeItemsCompleted" , "" )
1574- newjob .Status .Conditions = append (newjob .Status .Conditions , cond )
1575- newjob .Status .FilterIgnore = true // Update AppWrapperCondRunningHoldCompletion
1576- updateQj = newjob .DeepCopy ()
1577- } else {
1578- cond := GenerateAppWrapperCondition (arbv1 .AppWrapperCondRunningHoldCompletion , v1 .ConditionTrue , "SomeItemsCompleted" , "" )
1579- newjob .Status .Conditions [index ] = * cond .DeepCopy ()
1580- updateQj = newjob .DeepCopy ()
1581- }
1582- err := qjm .updateStatusInEtcdWithRetry (context .Background (), updateQj , "[UpdateQueueJobs] setRunningHoldCompletion" )
1583- if err != nil {
1584- // TODO: implement retry
1585- klog .Errorf ("[UpdateQueueJobs] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, err=%+v." , newjob .Namespace , newjob .Name , newjob .Status , err )
1586- }
1555+ // Set Appwrapper state to complete if all items in Appwrapper
1556+ // are completed
1557+ if derivedAwStatus == arbv1 .AppWrapperStateRunningHoldCompletion {
1558+ newjob .Status .State = derivedAwStatus
1559+ var updateQj * arbv1.AppWrapper
1560+ index := getIndexOfMatchedCondition (newjob , arbv1 .AppWrapperCondRunningHoldCompletion , "SomeItemsCompleted" )
1561+ if index < 0 {
1562+ newjob .Status .QueueJobState = arbv1 .AppWrapperCondRunningHoldCompletion
1563+ cond := GenerateAppWrapperCondition (arbv1 .AppWrapperCondRunningHoldCompletion , v1 .ConditionTrue , "SomeItemsCompleted" , "" )
1564+ newjob .Status .Conditions = append (newjob .Status .Conditions , cond )
1565+ newjob .Status .FilterIgnore = true // Update AppWrapperCondRunningHoldCompletion
1566+ updateQj = newjob .DeepCopy ()
1567+ } else {
1568+ cond := GenerateAppWrapperCondition (arbv1 .AppWrapperCondRunningHoldCompletion , v1 .ConditionTrue , "SomeItemsCompleted" , "" )
1569+ newjob .Status .Conditions [index ] = * cond .DeepCopy ()
1570+ updateQj = newjob .DeepCopy ()
15871571 }
1588- // Set appwrapper status to complete
1589- if derivedAwStatus == arbv1 . AppWrapperStateCompleted {
1590- newjob . Status . State = derivedAwStatus
1591- newjob . Status . CanRun = false
1592- var updateQj * arbv1. AppWrapper
1593- index := getIndexOfMatchedCondition ( newjob , arbv1 . AppWrapperCondCompleted , "PodsCompleted" )
1594- if index < 0 {
1595- newjob . Status . QueueJobState = arbv1 .AppWrapperCondCompleted
1596- cond := GenerateAppWrapperCondition ( arbv1 . AppWrapperCondCompleted , v1 . ConditionTrue , "PodsCompleted" , "" )
1597- newjob .Status .Conditions = append ( newjob . Status . Conditions , cond )
1598- newjob . Status . FilterIgnore = true // Update AppWrapperCondCompleted
1599- updateQj = newjob . DeepCopy ( )
1600- } else {
1601- cond := GenerateAppWrapperCondition ( arbv1 .AppWrapperCondCompleted , v1 . ConditionTrue , "PodsCompleted" , "" )
1602- newjob . Status . Conditions [ index ] = * cond . DeepCopy ( )
1603- updateQj = newjob .DeepCopy ( )
1604- }
1605- err := qjm . updateStatusInEtcdWithRetry ( context . Background (), updateQj , "[UpdateQueueJobs] setCompleted" )
1606- if err != nil {
1607- if qjm . quotaManager != nil {
1608- qjm . quotaManager . Release ( updateQj )
1609- }
1610- // TODO: Implement retry
1611- klog . Errorf ( "[UpdateQueueJobs] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v." , newjob . Namespace , newjob . Name , newjob . Status , err )
1612- }
1572+ err := qjm . updateStatusInEtcdWithRetry ( context . Background (), updateQj , "[UpdateQueueJobs] setRunningHoldCompletion" )
1573+ if err != nil {
1574+ // TODO: implement retry
1575+ klog . Errorf ( "[UpdateQueueJobs] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s', Status=%+v, err=%+v." , newjob . Namespace , newjob . Name , newjob . Status , err )
1576+ }
1577+ }
1578+ // Set appwrapper status to complete
1579+ if derivedAwStatus == arbv1 .AppWrapperStateCompleted {
1580+ newjob . Status . State = derivedAwStatus
1581+ newjob .Status .CanRun = false
1582+ var updateQj * arbv1. AppWrapper
1583+ index := getIndexOfMatchedCondition ( newjob , arbv1 . AppWrapperCondCompleted , "PodsCompleted" )
1584+ if index < 0 {
1585+ newjob . Status . QueueJobState = arbv1 .AppWrapperCondCompleted
1586+ cond := GenerateAppWrapperCondition ( arbv1 . AppWrapperCondCompleted , v1 . ConditionTrue , "PodsCompleted" , "" )
1587+ newjob . Status . Conditions = append ( newjob .Status . Conditions , cond )
1588+ newjob . Status . FilterIgnore = true // Update AppWrapperCondCompleted
1589+ updateQj = newjob . DeepCopy ( )
1590+ } else {
1591+ cond := GenerateAppWrapperCondition ( arbv1 . AppWrapperCondCompleted , v1 . ConditionTrue , "PodsCompleted" , "" )
1592+ newjob . Status . Conditions [ index ] = * cond . DeepCopy ( )
1593+ updateQj = newjob . DeepCopy ()
1594+ }
1595+ err := qjm . updateStatusInEtcdWithRetry ( context . Background (), updateQj , "[UpdateQueueJobs] setCompleted" )
1596+ if err != nil {
16131597 if qjm .quotaManager != nil {
16141598 qjm .quotaManager .Release (updateQj )
16151599 }
1616- // Delete AW from both queue's
1617- qjm .eventQueue .Delete (updateQj )
1618- qjm .qjqueue .Delete (updateQj )
1600+ // TODO: Implement retry
1601+ klog .Errorf ("[UpdateQueueJobs] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v." , newjob .Namespace , newjob .Name , newjob .Status , err )
1602+ }
1603+ if qjm .quotaManager != nil {
1604+ qjm .quotaManager .Release (updateQj )
16191605 }
1620- klog .Infof ("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]" , newjob .Namespace , newjob .Name , newjob .ResourceVersion ,
1621- newjob .Status .CanRun , newjob .Status .State , newjob .Status .Pending , newjob .Status .Running , newjob .Status .Succeeded , newjob .Status .Failed )
1606+ // Delete AW from both queue's
1607+ qjm .eventQueue .Delete (updateQj )
1608+ qjm .qjqueue .Delete (updateQj )
16221609 }
1610+ klog .Infof ("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]" , newjob .Namespace , newjob .Name , newjob .ResourceVersion ,
1611+ newjob .Status .CanRun , newjob .Status .State , newjob .Status .Pending , newjob .Status .Running , newjob .Status .Succeeded , newjob .Status .Failed )
16231612 }
16241613}
16251614
@@ -1678,6 +1667,7 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) {
16781667 }
16791668
16801669 klog .V (6 ).Infof ("[Informer-updateQJ] '%s/%s' *Delay=%.6f seconds normal enqueue Version=%s Status=%v" , newQJ .Namespace , newQJ .Name , time .Now ().Sub (newQJ .Status .ControllerFirstTimestamp .Time ).Seconds (), newQJ .ResourceVersion , newQJ .Status )
1670+ notBackedoff := true
16811671 for _ , cond := range newQJ .Status .Conditions {
16821672 if cond .Type == arbv1 .AppWrapperCondBackoff {
16831673 //AWs that have backoff conditions have a delay of 10 seconds before getting added to enqueue.
@@ -1688,12 +1678,45 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) {
16881678 }
16891679 cc .enqueue (newQJ )
16901680 })
1691- return
1681+ notBackedoff = false
16921682 }
16931683 }
16941684
16951685 // cc.eventQueue.Delete(oldObj)
1696- cc .enqueue (newQJ )
1686+ if notBackedoff {
1687+ cc .enqueue (newQJ )
1688+
1689+ // Requeue the item to be processed again in 30 seconds.
1690+ //TODO: tune the frequency of reprocessing an AW
1691+ hasCompletionStatus := false
1692+ for _ , genericItem := range newQJ .Spec .AggrResources .GenericItems {
1693+ if len (genericItem .CompletionStatus ) > 0 {
1694+ hasCompletionStatus = true
1695+ }
1696+ }
1697+ //updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate
1698+ //on slate Aws
1699+ if newQJ .Status .State != arbv1 .AppWrapperStateCompleted && newQJ .Status .State != arbv1 .AppWrapperStateFailed && newQJ .Status .State != "" {
1700+ requeueInterval := 30 * time .Second
1701+ key , err := cache .MetaNamespaceKeyFunc (newQJ )
1702+ if err == nil {
1703+ go func () {
1704+ for {
1705+ time .Sleep (requeueInterval )
1706+ latestAw , exists , err := cc .appwrapperInformer .Informer ().GetStore ().GetByKey (key )
1707+ if err == nil && exists {
1708+ // Enqueue the latest copy of the AW.
1709+ if (newQJ .Status .State != arbv1 .AppWrapperStateCompleted || newQJ .Status .State != arbv1 .AppWrapperStateFailed ) && hasCompletionStatus {
1710+ cc .UpdateQueueJobs (latestAw .(* arbv1.AppWrapper ))
1711+ klog .V (2 ).Infof ("[Informer-updateQJ] Finished requeing AW to determine completion status" )
1712+ }
1713+ }
1714+ }
1715+ }()
1716+ }
1717+ }
1718+ }
1719+
16971720}
16981721
16991722// a, b arbitrary length numerical string. returns true if a larger than b
0 commit comments