3 changes: 3 additions & 0 deletions pkg/scheduler/algorithm/predicates/guest/memory_predicate.go
@@ -15,6 +15,7 @@
package guest

import (
"yunion.io/x/log"
"yunion.io/x/onecloud/pkg/scheduler/algorithm/predicates"
"yunion.io/x/onecloud/pkg/scheduler/core"
)
@@ -58,6 +59,8 @@ func (p *MemoryPredicate) Execute(u *core.Unit, c core.Candidater) (bool, []core
reqMemSize := int64(d.Memory)
if freeMemSize < reqMemSize {
totalMemSize := getter.TotalMemorySize(useRsvd)
log.Infof("[SchedDiag] host_memory insufficient hostId=%s freeMem=%d reqMem=%d totalMem=%d",
getter.Id(), freeMemSize, reqMemSize, totalMemSize)
h.AppendInsufficientResourceError(reqMemSize, totalMemSize, freeMemSize)
}

6 changes: 4 additions & 2 deletions pkg/scheduler/cache/cache.go
@@ -18,6 +18,7 @@ import (
"fmt"
"reflect"
"sync"
"time"

"yunion.io/x/log"
expirationcache "yunion.io/x/pkg/util/cache"
@@ -145,16 +146,17 @@ func (c *schedulerCache) updateAllObjects() {
// if ids is nil and err is normalError then return.
return
} else if len(ids) > 0 {
log.V(10).Debugf("Update host/baremetal status list: %v", ids)
c.loadObjects(ids)
}
}

func (c *schedulerCache) loadObjects(ids []string) ([]interface{}, error) {
startTime := time.Now()
log.Infof("Start load %s, period: %v, ttl: %v", c.Name(), c.item.Period(), c.item.TTL())

defer func() {
log.Infof("End load %s", c.Name())
duration := time.Since(startTime)
log.Infof("End load %s, duration: %v", c.Name(), duration)
}()

var (
8 changes: 7 additions & 1 deletion pkg/scheduler/cache/candidate/hosts.go
@@ -379,7 +379,11 @@ func (h *HostDesc) GetTotalMemSize(useRsvd bool) int64 {
}

func (h *HostDesc) GetFreeMemSize(useRsvd bool) int64 {
return reservedResourceAddCal(h.FreeMemSize, h.GuestReservedMemSizeFree(), useRsvd) - int64(h.GetPendingUsage().Memory)
pending := h.GetPendingUsage()
result := reservedResourceAddCal(h.FreeMemSize, h.GuestReservedMemSizeFree(), useRsvd) - int64(pending.Memory)
log.Infof("[SchedDiag] GetFreeMemSize hostId=%s cacheFreeMem=%d pendingMem=%d result=%d",
h.Id, h.FreeMemSize, pending.Memory, result)
return result
}

func (h *HostDesc) GuestReservedMemSizeFree() int64 {
@@ -924,6 +928,8 @@ func (b *HostBuilder) fillGuestsResourceInfo(desc *HostDesc, host *computemodels
memFreeSize += memSub
}
desc.FreeMemSize = memFreeSize
log.Infof("[SchedDiag] hostDesc hostId=%s freeMem=%d runningMem=%d creatingMem=%d requiredMem=%d guestCount=%d creatingCount=%d ignoreNonrunning=%v",
desc.Id, desc.FreeMemSize, desc.RunningMemSize, desc.CreatingMemSize, desc.RequiredMemSize, desc.GuestCount, desc.CreatingGuestCount, o.Options.IgnoreNonrunningGuests)

// free cpu count calculate
rsvdUseCPU := desc.GuestReservedResourceUsed.CPUCount
5 changes: 5 additions & 0 deletions pkg/scheduler/core/generic_scheduler.go
@@ -395,6 +395,11 @@ completed:
doSelect(unit, sc.Candidate, sc.Count)
selectedCandidates = append(selectedCandidates, sc)
}
for hostID, sc := range selectedMap {
cap := unit.GetCapacity(hostID)
log.Infof("[SchedDiag] SelectHosts sessionId=%s hostId=%s capacity=%d selectedCount=%d",
unit.SchedInfo.SessionId, hostID, cap, sc.Count)
}
// hack: not selected host should also execute OnSelectEnd step to inject result of network and storage candidates
/*for _, nsc := range noSelectedMap {
for _, plugin := range plugins {
9 changes: 9 additions & 0 deletions pkg/scheduler/core/result_helper.go
@@ -78,6 +78,15 @@ func transToRegionSchedResult(result SchedResultItems, count int64, sid string)
}
}

perHost := make(map[string]int)
for _, c := range apiResults {
if c.HostId != "" {
perHost[c.HostId]++
}
}
log.Infof("[SchedDiag] transToRegionSchedResult sessionId=%s requested=%d successCount=%d totalCandidates=%d perHost=%v",
sid, count, succCount, len(apiResults), perHost)

for {
if int64(succCount) >= count {
break
18 changes: 17 additions & 1 deletion pkg/scheduler/data_manager/candidate_manager.go
@@ -18,12 +18,14 @@ import (
"fmt"
"time"

"yunion.io/x/log"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/utils"

"yunion.io/x/onecloud/pkg/scheduler/cache"
candidatecache "yunion.io/x/onecloud/pkg/scheduler/cache/candidate"
"yunion.io/x/onecloud/pkg/scheduler/core"
schedmodels "yunion.io/x/onecloud/pkg/scheduler/models"
)

type CandidateGetArgs struct {
@@ -363,12 +365,26 @@ func (cm *CandidateManager) Reload(resType string, candidateIds []string) (
}

func (cm *CandidateManager) ReloadAll(resType string) ([]interface{}, error) {
// Mark the start of ReloadAll to protect pending usage added during reload
schedmodels.HostPendingUsageManager.SetReloadAllStartTime()
log.Infof("[SchedDiag] ReloadAll start resType=%s", resType)

impl, err := cm.getImpl(resType)
if err != nil {
return nil, err
}

return impl.ReloadAll()
result, err := impl.ReloadAll()
if err == nil {
// Clear pending usage created before ReloadAll started
// This ensures pending usage doesn't leak when cache is fully rebuilt
// but protects pending usage added during reload
schedmodels.HostPendingUsageManager.ClearAllPendingUsage()
log.Infof("[SchedDiag] ReloadAll done resType=%s", resType)
} else {
log.Errorf("[CandidateManager] Failed to reload all %q candidates: %v", resType, err)
}
return result, err
}

//type IDirtyPoolItem interface {
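Note: the SetReloadAllStartTime / ClearAllPendingUsage pair used in ReloadAll above lives in pkg/scheduler/models and is not part of this excerpt. A minimal sketch of the timestamp guard the comments describe might look like the following; everything except those two method names (the type, fields, and map layout) is an assumption for illustration only.

	// Hypothetical sketch only; not the actual HostPendingUsageManager code.
	package models

	import (
		"sync"
		"time"
	)

	type pendingUsage struct {
		createdAt time.Time
		memoryMB  int64
	}

	type pendingUsageManager struct {
		mu             sync.Mutex
		reloadAllStart time.Time
		usages         map[string]*pendingUsage // keyed by session id
	}

	// SetReloadAllStartTime records when ReloadAll began.
	func (m *pendingUsageManager) SetReloadAllStartTime() {
		m.mu.Lock()
		defer m.mu.Unlock()
		m.reloadAllStart = time.Now()
	}

	// ClearAllPendingUsage drops only usages created before ReloadAll started,
	// so pending usage added by schedules that ran during the reload survives.
	func (m *pendingUsageManager) ClearAllPendingUsage() {
		m.mu.Lock()
		defer m.mu.Unlock()
		for sid, u := range m.usages {
			if u.createdAt.Before(m.reloadAllStart) {
				delete(m.usages, sid)
			}
		}
	}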
75 changes: 34 additions & 41 deletions pkg/scheduler/manager/expire_queue.go
@@ -30,14 +30,16 @@ type ExpireManager struct {
expireChannel chan *api.ExpireArgs
stopCh <-chan struct{}

mergeLock *sync.Mutex
mergeLock *sync.Mutex
reloadCancelQueue *ReloadCancelQueue
}

func NewExpireManager(stopCh <-chan struct{}) *ExpireManager {
return &ExpireManager{
expireChannel: make(chan *api.ExpireArgs, o.Options.ExpireQueueMaxLength),
stopCh: stopCh,
mergeLock: new(sync.Mutex),
expireChannel: make(chan *api.ExpireArgs, o.Options.ExpireQueueMaxLength),
stopCh: stopCh,
mergeLock: new(sync.Mutex),
reloadCancelQueue: NewReloadCancelQueue(stopCh),
}
}

@@ -112,46 +114,37 @@ func (e *ExpireManager) batchMergeExpire() {
}
}
log.V(4).Infof("batchMergeExpire dirtyHosts: %v, dirtyBaremetals: %v", dirtyHosts, dirtyBaremetals)
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
//dirtyHosts = notInSession(dirtyHosts, "host")
if len(dirtyHosts) > 0 {
log.V(10).Debugf("CleanDirty Hosts: %v\n", dirtyHosts)
if _, err := schedManager.CandidateManager.Reload("host", dirtyHostSets.List()); err != nil {
log.Errorf("Clean dirty hosts %v: %v", dirtyHosts, err)
}
schedManager.HistoryManager.CancelCandidatesPendingUsage(dirtyHosts)

// Use queue to ensure reload completes before cancel
var hostTask, baremetalTask *ReloadCancelTask

if len(dirtyHosts) > 0 {
hostTask = &ReloadCancelTask{
ResType: "host",
HostIds: dirtyHostSets.List(),
ExpireHosts: dirtyHosts,
}
}()

go func() {
defer wg.Done()
//dirtyBaremetals = notInSession(dirtyBaremetals, "baremetal")
if len(dirtyBaremetals) > 0 {
log.V(10).Debugf("CleanDirty Baremetals: %v\n", dirtyBaremetals)
if _, err := schedManager.CandidateManager.Reload("baremetal", dirtyBaremetalSets.List()); err != nil {
log.Errorf("Clean dirty baremetals %v: %v", dirtyBaremetals, err)
}
schedManager.HistoryManager.CancelCandidatesPendingUsage(dirtyBaremetals)
}

if len(dirtyBaremetals) > 0 {
baremetalTask = &ReloadCancelTask{
ResType: "baremetal",
HostIds: dirtyBaremetalSets.List(),
ExpireHosts: dirtyBaremetals,
}
}()
if ok := e.waitTimeOut(wg, u.ToDuration(o.Options.ExpireQueueConsumptionTimeout)); !ok {
log.Errorln("time out reload data.")
}
}

func (e *ExpireManager) waitTimeOut(wg *sync.WaitGroup, timeout time.Duration) bool {
ch := make(chan struct{})
go func() {
wg.Wait()
close(ch)
}()
select {
case <-ch:
return true
case <-time.After(timeout):
return false
// Add tasks to queue (will be processed asynchronously)
if hostTask != nil || baremetalTask != nil {
tasks := make([]*ReloadCancelTask, 0, 2)
if hostTask != nil {
tasks = append(tasks, hostTask)
}
if baremetalTask != nil {
tasks = append(tasks, baremetalTask)
}
e.reloadCancelQueue.AddBatch(tasks, nil)
log.Infof("Added reload+cancel tasks to queue: hosts=%d, baremetals=%d",
len(dirtyHosts), len(dirtyBaremetals))
}
}
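The ReloadCancelTask and ReloadCancelQueue types referenced above are added by this PR in a file that is not part of this excerpt. As a rough sketch of the ordering guarantee the comment describes (the candidate reload completes before pending usage for the same hosts is cancelled), the queue could be a serially consumed channel; apart from the names already used in this diff, the internals below are assumptions.

	// Hypothetical sketch; assumes it sits in the same package as the manager,
	// where schedManager and log are already in scope.
	type ReloadCancelTask struct {
		ResType     string   // "host" or "baremetal"
		HostIds     []string // candidate ids passed to CandidateManager.Reload
		ExpireHosts []string // hosts whose pending usage is cancelled after the reload
	}

	type reloadCancelBatch struct {
		tasks []*ReloadCancelTask
		done  func() // optional completion callback, may be nil
	}

	type ReloadCancelQueue struct {
		batches chan reloadCancelBatch
		stopCh  <-chan struct{}
	}

	func NewReloadCancelQueue(stopCh <-chan struct{}) *ReloadCancelQueue {
		return &ReloadCancelQueue{
			batches: make(chan reloadCancelBatch, 128),
			stopCh:  stopCh,
		}
	}

	func (q *ReloadCancelQueue) AddBatch(tasks []*ReloadCancelTask, done func()) {
		q.batches <- reloadCancelBatch{tasks: tasks, done: done}
	}

	// Run consumes batches serially, so for each task the reload always
	// finishes before the pending usage of the same hosts is cancelled.
	func (q *ReloadCancelQueue) Run() {
		for {
			select {
			case b := <-q.batches:
				for _, t := range b.tasks {
					if _, err := schedManager.CandidateManager.Reload(t.ResType, t.HostIds); err != nil {
						log.Errorf("Reload dirty %s %v: %v", t.ResType, t.HostIds, err)
					}
					schedManager.HistoryManager.CancelCandidatesPendingUsage(t.ExpireHosts)
				}
				if b.done != nil {
					b.done()
				}
			case <-q.stopCh:
				return
			}
		}
	}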
21 changes: 21 additions & 0 deletions pkg/scheduler/manager/manager.go
@@ -46,6 +46,8 @@ type SchedulerManager struct {
DataManager *data_manager.DataManager
CandidateManager *data_manager.CandidateManager
KubeClusterManager *k8s.SKubeClusterManager

stopCh <-chan struct{}
}

func NewSchedulerManager(stopCh <-chan struct{}) *SchedulerManager {
@@ -57,6 +59,7 @@ func NewSchedulerManager(stopCh <-chan struct{}) *SchedulerManager {
sm.HistoryManager = NewHistoryManager(stopCh)
sm.TaskManager = NewTaskManager(stopCh)
sm.KubeClusterManager = k8s.NewKubeClusterManager(o.Options.Region, 30*time.Second)
sm.stopCh = stopCh

return sm
}
@@ -80,8 +83,26 @@ func InitAndStart(stopCh <-chan struct{}) {
}

func (sm *SchedulerManager) start() {
// Safety net: periodically GC session pending usages older than 30 minutes.
go func() {
t := time.NewTicker(1 * time.Minute)
defer t.Stop()
for {
select {
case <-t.C:
n := schedmodels.HostPendingUsageManager.GCExpiredSessionUsages(30 * time.Minute)
if n > 0 {
log.Warningf("[PendingUsage] GC expired session usages: cleared=%d", n)
}
case <-sm.stopCh:
return
}
}
}()

startFuncs := []func(){
sm.ExpireManager.Run,
sm.ExpireManager.reloadCancelQueue.Run,
sm.CompletedManager.Run,
sm.HistoryManager.Run,
sm.TaskManager.Run,
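GCExpiredSessionUsages is likewise provided by HostPendingUsageManager in pkg/scheduler/models and is not shown here. Continuing the earlier pendingUsageManager sketch, a GC pass matching the ticker call above could look roughly like this; the implementation and the returned count are assumed for illustration.

	// Hypothetical continuation of the pendingUsageManager sketch above:
	// clear session usages older than maxAge and report how many were removed.
	func (m *pendingUsageManager) GCExpiredSessionUsages(maxAge time.Duration) int {
		m.mu.Lock()
		defer m.mu.Unlock()
		cleared := 0
		now := time.Now()
		for sid, u := range m.usages {
			if now.Sub(u.createdAt) > maxAge {
				delete(m.usages, sid)
				cleared++
			}
		}
		return cleared
	}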