1use core::cmp::Ordering;
134use std::cell::Cell;
135use std::collections::BTreeMap;
136use std::collections::VecDeque;
137use std::fmt;
138use std::sync::Arc;
139
140use anyhow::anyhow;
141use anyhow::bail;
142use anyhow::Result;
143use log::debug;
144use log::trace;
145use ordered_float::OrderedFloat;
146use scx_utils::ravg::ravg_read;
147use scx_utils::LoadAggregator;
148use scx_utils::LoadLedger;
149use sorted_vec::SortedVec;
150
151use crate::bpf_intf;
152use crate::bpf_skel::*;
153use crate::stats::DomainStats;
154use crate::stats::NodeStats;
155use crate::DomainGroup;
156
/// Weight applied to every load value when weight-based balancing is turned
/// off; mirrors the BPF-side `LB_DEFAULT_WEIGHT` constant.
const DEFAULT_WEIGHT: f64 = bpf_intf::consts_LB_DEFAULT_WEIGHT as f64;
/// Fractional-bit count handed to `ravg_read()` when decoding running
/// averages; mirrors the BPF-side `RAVG_FRAC_BITS` constant.
const RAVG_FRAC_BITS: u32 = bpf_intf::ravg_consts_RAVG_FRAC_BITS;
159
160fn now_monotonic() -> u64 {
161    let mut time = libc::timespec {
162        tv_sec: 0,
163        tv_nsec: 0,
164    };
165    let ret = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut time) };
166    assert!(ret == 0);
167    time.tv_sec as u64 * 1_000_000_000 + time.tv_nsec as u64
168}
169
/// How a load entity's current load compares with its target average.
#[derive(Clone, Copy, Debug, PartialEq)]
enum BalanceState {
    /// The load is within tolerance of the average.
    Balanced,
    /// The load exceeds the average by more than the tolerance.
    NeedsPush,
    /// The load falls short of the average by more than the tolerance.
    NeedsPull,
}

impl fmt::Display for BalanceState {
    /// Writes the fixed upper-case label associated with the state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Balanced => "BALANCED",
            Self::NeedsPush => "OVER-LOADED",
            Self::NeedsPull => "UNDER-LOADED",
        };
        f.write_str(label)
    }
}
186
/// Implements `PartialEq`, `Eq`, `PartialOrd` and `Ord` for every listed type
/// by forwarding to the comparison helpers defined on `dyn LoadOrdered`, so
/// values compare solely by what `get_load()` returns.
macro_rules! impl_ord_for_type {
    ($($entity:ty),*) => {
        $(
            impl Ord for $entity {
                fn cmp(&self, rhs: &Self) -> Ordering {
                    <dyn LoadOrdered>::cmp(self, rhs)
                }
            }

            impl PartialOrd for $entity {
                fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
                    <dyn LoadOrdered>::partial_cmp(self, rhs)
                }
            }

            impl Eq for $entity {}

            impl PartialEq for $entity {
                fn eq(&self, rhs: &Self) -> bool {
                    <dyn LoadOrdered>::eq(self, rhs)
                }
            }
        )*
    };
}
212
/// Implemented by anything that can be ordered by a single load value.
trait LoadOrdered {
    /// Returns the load value used as this entity's sort key.
    fn get_load(&self) -> OrderedFloat<f64>;
}
216
217impl dyn LoadOrdered {
218    #[inline]
219    fn eq(&self, other: &Self) -> bool {
220        self.get_load().eq(&other.get_load())
221    }
222
223    #[inline]
224    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
225        self.get_load().partial_cmp(&other.get_load())
226    }
227
228    #[inline]
229    fn cmp(&self, other: &Self) -> Ordering {
230        self.get_load().cmp(&other.get_load())
231    }
232}
233
/// Load bookkeeping for one balancing unit: its current load, the average it
/// is measured against, and the resulting balance classification.
#[derive(Debug, Clone)]
pub struct LoadEntity {
    // Fraction of `load_avg` the imbalance must exceed before the entity is
    // classified as needing a push or pull.
    cost_ratio: f64,
    // Fraction of the absolute imbalance used as the push cutoff.
    push_max_ratio: f64,
    // Fraction of the smaller absolute imbalance targeted by one transfer.
    xfer_ratio: f64,
    // Current load of the entity.
    load_sum: OrderedFloat<f64>,
    // Average load the entity is compared against.
    load_avg: f64,
    // Running total of every delta applied through `add_load()`.
    load_delta: f64,
    // Classification computed by the most recent `rebalance()`.
    bal_state: BalanceState,
}
244
245impl LoadEntity {
246    fn new(
247        cost_ratio: f64,
248        push_max_ratio: f64,
249        xfer_ratio: f64,
250        load_sum: f64,
251        load_avg: f64,
252    ) -> Self {
253        let mut entity = Self {
254            cost_ratio,
255            push_max_ratio,
256            xfer_ratio,
257            load_sum: OrderedFloat(load_sum),
258            load_avg,
259            load_delta: 0.0f64,
260            bal_state: BalanceState::Balanced,
261        };
262        entity.add_load(0.0f64);
263        entity
264    }
265
266    pub fn load_sum(&self) -> f64 {
267        *self.load_sum
268    }
269
270    pub fn load_avg(&self) -> f64 {
271        self.load_avg
272    }
273
274    pub fn imbal(&self) -> f64 {
275        self.load_sum() - self.load_avg
276    }
277
278    pub fn delta(&self) -> f64 {
279        self.load_delta
280    }
281
282    fn state(&self) -> BalanceState {
283        self.bal_state
284    }
285
286    fn rebalance(&mut self, new_load: f64) {
287        self.load_sum = OrderedFloat(new_load);
288
289        let imbal = self.imbal();
290        let needs_balance = imbal.abs() > self.load_avg * self.cost_ratio;
291
292        self.bal_state = if needs_balance {
293            if imbal > 0f64 {
294                BalanceState::NeedsPush
295            } else {
296                BalanceState::NeedsPull
297            }
298        } else {
299            BalanceState::Balanced
300        };
301    }
302
303    fn add_load(&mut self, delta: f64) {
304        self.rebalance(self.load_sum() + delta);
305        self.load_delta += delta;
306    }
307
308    fn push_cutoff(&self) -> f64 {
309        self.imbal().abs() * self.push_max_ratio
310    }
311
312    fn xfer_between(&self, other: &LoadEntity) -> f64 {
313        self.imbal().abs().min(other.imbal().abs()) * self.xfer_ratio
314    }
315}
316
/// Snapshot of one task considered for migration.
#[derive(Debug)]
struct TaskInfo {
    // Raw pointer to the task's context; dereferenced when the task is moved.
    taskc_p: *mut types::task_ctx,
    // Load attributed to the task; also its sort key.
    load: OrderedFloat<f64>,
    // Bitmask of domains the task may be moved to (bit N <=> domain N).
    dom_mask: u64,
    // Bitmask of domains tried first when looking for a migration target.
    preferred_dom_mask: u64,
    // Set once the task has been picked for migration so it is not picked again.
    migrated: Cell<bool>,
    // Copied from the task context; such tasks are skipped when the balancer
    // is configured with `skip_kworkers`.
    is_kworker: bool,
}
326
impl LoadOrdered for TaskInfo {
    /// Tasks are ordered by their individual load.
    fn get_load(&self) -> OrderedFloat<f64> {
        self.load
    }
}
// Generate the load-based `PartialEq`/`Eq`/`PartialOrd`/`Ord` impls.
impl_ord_for_type!(TaskInfo);
333
/// A scheduling domain together with its load state and candidate tasks.
#[derive(Debug)]
struct Domain {
    // Domain index; also used to index the BPF-side domain context array.
    id: usize,
    // True once `tasks` has been populated, so the lookup happens only once.
    queried_tasks: bool,
    // Load bookkeeping for the domain.
    load: LoadEntity,
    // Tasks seen on this domain, kept sorted by load.
    tasks: SortedVec<TaskInfo>,
}
341
342impl Domain {
343    const LOAD_IMBAL_HIGH_RATIO: f64 = 0.05;
344    const LOAD_IMBAL_XFER_TARGET_RATIO: f64 = 0.50;
345    const LOAD_IMBAL_PUSH_MAX_RATIO: f64 = 0.50;
346
347    fn new(id: usize, load_sum: f64, load_avg: f64) -> Self {
348        Self {
349            id,
350            queried_tasks: false,
351            load: LoadEntity::new(
352                Domain::LOAD_IMBAL_HIGH_RATIO,
353                Domain::LOAD_IMBAL_PUSH_MAX_RATIO,
354                Domain::LOAD_IMBAL_XFER_TARGET_RATIO,
355                load_sum,
356                load_avg,
357            ),
358            tasks: SortedVec::new(),
359        }
360    }
361
362    fn transfer_load(&mut self, load: f64, taskc: &mut types::task_ctx, other: &mut Domain) {
363        trace!("XFER pid={} dom={}->{}", taskc.pid, self.id, other.id);
364
365        let dom_id: u32 = other.id.try_into().unwrap();
366        taskc.target_dom = dom_id;
367
368        self.load.add_load(-load);
369        other.load.add_load(load);
370    }
371
372    fn xfer_between(&self, other: &Domain) -> f64 {
373        self.load.xfer_between(&other.load)
374    }
375}
376
impl LoadOrdered for Domain {
    /// Domains are ordered by their current total load.
    fn get_load(&self) -> OrderedFloat<f64> {
        self.load.load_sum
    }
}
// Generate the load-based `PartialEq`/`Eq`/`PartialOrd`/`Ord` impls.
impl_ord_for_type!(Domain);
383
/// A NUMA node: its aggregate load state and the domains assigned to it.
#[derive(Debug)]
struct NumaNode {
    // NUMA node index.
    id: usize,
    // Load bookkeeping for the whole node.
    load: LoadEntity,
    // Domains belonging to this node, kept sorted by load.
    domains: SortedVec<Domain>,
}
390
391impl NumaNode {
392    const LOAD_IMBAL_HIGH_RATIO: f64 = 0.17;
393    const LOAD_IMBAL_XFER_TARGET_RATIO: f64 = 0.50;
394    const LOAD_IMBAL_PUSH_MAX_RATIO: f64 = 0.50;
395
396    fn new(id: usize, numa_load_avg: f64) -> Self {
397        Self {
398            id,
399            load: LoadEntity::new(
400                NumaNode::LOAD_IMBAL_HIGH_RATIO,
401                NumaNode::LOAD_IMBAL_PUSH_MAX_RATIO,
402                NumaNode::LOAD_IMBAL_XFER_TARGET_RATIO,
403                0.0f64,
404                numa_load_avg,
405            ),
406            domains: SortedVec::new(),
407        }
408    }
409
410    fn allocate_domain(&mut self, id: usize, load: f64, dom_load_avg: f64) {
411        let domain = Domain::new(id, load, dom_load_avg);
412
413        self.insert_domain(domain);
414        self.load.rebalance(self.load.load_sum() + load);
415    }
416
417    fn xfer_between(&self, other: &NumaNode) -> f64 {
418        self.load.xfer_between(&other.load)
419    }
420
421    fn insert_domain(&mut self, domain: Domain) {
422        self.domains.insert(domain);
423    }
424
425    fn update_load(&mut self, delta: f64) {
426        self.load.add_load(delta);
427    }
428
429    fn stats(&self) -> NodeStats {
430        let mut stats = NodeStats::new(
431            self.load.load_sum(),
432            self.load.imbal(),
433            self.load.delta(),
434            BTreeMap::new(),
435        );
436        for dom in self.domains.iter() {
437            stats.doms.insert(
438                dom.id,
439                DomainStats::new(dom.load.load_sum(), dom.load.imbal(), dom.load.delta()),
440            );
441        }
442        stats
443    }
444}
445
impl LoadOrdered for NumaNode {
    /// Nodes are ordered by their current total load.
    fn get_load(&self) -> OrderedFloat<f64> {
        self.load.load_sum
    }
}
// Generate the load-based `PartialEq`/`Eq`/`PartialOrd`/`Ord` impls.
impl_ord_for_type!(NumaNode);
452
/// Builds a node/domain load hierarchy from BPF-side statistics and, when
/// enabled, migrates tasks to even out the load.
pub struct LoadBalancer<'a, 'b> {
    // BPF skeleton giving access to the scheduler's read-only and bss data.
    skel: &'a mut BpfSkel<'b>,
    // Topology: which domains exist and which NUMA node each belongs to.
    dom_group: Arc<DomainGroup>,
    // When set, tasks flagged as kworkers are never picked for migration.
    skip_kworkers: bool,

    // Upper bound applied to task weights when weights are in use.
    infeas_threshold: f64,

    // NUMA nodes, kept sorted by load.
    nodes: SortedVec<NumaNode>,

    // Whether task weights are taken into account when computing load.
    lb_apply_weight: bool,
    // Whether `load_balance()` actually migrates tasks after building the
    // hierarchy.
    balance_load: bool,
}
465
// Assert that `LB_MAX_WEIGHT` is an exact multiple of `LB_LOAD_BUCKETS`, so
// the weight range divides into equal-width buckets.
const_assert_eq!(
    bpf_intf::consts_LB_MAX_WEIGHT % bpf_intf::consts_LB_LOAD_BUCKETS,
    0
);
472
473impl<'a, 'b> LoadBalancer<'a, 'b> {
474    pub fn new(
475        skel: &'a mut BpfSkel<'b>,
476        dom_group: Arc<DomainGroup>,
477        skip_kworkers: bool,
478        lb_apply_weight: bool,
479        balance_load: bool,
480    ) -> Self {
481        Self {
482            skel,
483            skip_kworkers,
484
485            infeas_threshold: bpf_intf::consts_LB_MAX_WEIGHT as f64,
486
487            nodes: SortedVec::new(),
488
489            lb_apply_weight,
490            balance_load,
491
492            dom_group,
493        }
494    }
495
496    pub fn load_balance(&mut self) -> Result<()> {
500        self.create_domain_hierarchy()?;
501
502        if self.balance_load {
503            self.perform_balancing()?
504        }
505
506        Ok(())
507    }
508
509    pub fn get_stats(&self) -> BTreeMap<usize, NodeStats> {
510        self.nodes
511            .iter()
512            .map(|node| (node.id, node.stats()))
513            .collect()
514    }
515
516    fn create_domain_hierarchy(&mut self) -> Result<()> {
517        let ledger = self.calculate_load_avgs()?;
518
519        let (dom_loads, total_load) = if !self.lb_apply_weight {
520            (
521                ledger
522                    .dom_dcycle_sums()
523                    .iter()
524                    .copied()
525                    .map(|d| DEFAULT_WEIGHT * d)
526                    .collect(),
527                DEFAULT_WEIGHT * ledger.global_dcycle_sum(),
528            )
529        } else {
530            self.infeas_threshold = ledger.effective_max_weight();
531            (ledger.dom_load_sums().to_vec(), ledger.global_load_sum())
532        };
533
534        let num_numa_nodes = self.dom_group.nr_nodes();
535        let numa_load_avg = total_load / num_numa_nodes as f64;
536
537        let mut nodes: Vec<NumaNode> = (0..num_numa_nodes)
538            .map(|id| NumaNode::new(id, numa_load_avg))
539            .collect();
540
541        let dom_load_avg = total_load / dom_loads.len() as f64;
542        for (dom_id, load) in dom_loads.iter().enumerate() {
543            let numa_id = self
544                .dom_group
545                .dom_numa_id(&dom_id)
546                .ok_or_else(|| anyhow!("Failed to get NUMA ID for domain {}", dom_id))?;
547
548            if numa_id >= num_numa_nodes {
549                bail!("NUMA ID {} exceeds maximum {}", numa_id, num_numa_nodes);
550            }
551
552            let node = &mut nodes[numa_id];
553            node.allocate_domain(dom_id, *load, dom_load_avg);
554        }
555
556        self.nodes = SortedVec::from_unsorted(nodes);
557
558        Ok(())
559    }
560
561    fn calculate_load_avgs(&mut self) -> Result<LoadLedger> {
562        const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
563        let now_mono = now_monotonic();
564        let load_half_life = self.skel.maps.rodata_data.as_ref().unwrap().load_half_life;
565
566        let mut aggregator =
567            LoadAggregator::new(self.dom_group.weight(), !self.lb_apply_weight.clone());
568
569        for (dom_id, dom) in self.dom_group.doms() {
570            aggregator.init_domain(*dom_id);
571
572            let dom_ctx = dom.ctx().unwrap();
573
574            for bucket in 0..NUM_BUCKETS {
575                let bucket_ctx = &dom_ctx.buckets[bucket as usize];
576                let rd = &bucket_ctx.rd;
577                let duty_cycle = ravg_read(
578                    rd.val,
579                    rd.val_at,
580                    rd.old,
581                    rd.cur,
582                    now_mono,
583                    load_half_life,
584                    RAVG_FRAC_BITS,
585                );
586
587                if duty_cycle == 0.0f64 {
588                    continue;
589                }
590
591                let weight = self.bucket_weight(bucket);
592                aggregator.record_dom_load(*dom_id, weight, duty_cycle)?;
593            }
594        }
595
596        Ok(aggregator.calculate())
597    }
598
599    fn bucket_range(&self, bucket: u64) -> (f64, f64) {
600        const MAX_WEIGHT: u64 = bpf_intf::consts_LB_MAX_WEIGHT as u64;
601        const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
602        const WEIGHT_PER_BUCKET: u64 = MAX_WEIGHT / NUM_BUCKETS;
603
604        if bucket >= NUM_BUCKETS {
605            panic!("Invalid bucket {}, max {}", bucket, NUM_BUCKETS);
606        }
607
608        let min_w = 1 + (MAX_WEIGHT * bucket) / NUM_BUCKETS;
610        let max_w = min_w + WEIGHT_PER_BUCKET - 1;
611
612        (min_w as f64, max_w as f64)
613    }
614
615    fn bucket_weight(&self, bucket: u64) -> usize {
616        const WEIGHT_PER_BUCKET: f64 = bpf_intf::consts_LB_WEIGHT_PER_BUCKET as f64;
617        let (min_weight, _) = self.bucket_range(bucket);
618
619        (min_weight + (WEIGHT_PER_BUCKET / 2.0f64)).ceil() as usize
621    }
622
    /// Fills `dom.tasks` with the tasks recorded in the domain's active-task
    /// ring, each tagged with its current load.
    ///
    /// The lookup runs at most once per `Domain` value; later calls return
    /// immediately. Always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if the skeleton's bss or read-only data is unavailable.
    fn populate_tasks_by_load(&mut self, dom: &mut Domain) -> Result<()> {
        // Only query each domain once.
        if dom.queried_tasks {
            return Ok(());
        }
        dom.queried_tasks = true;

        const MAX_TPTRS: u64 = bpf_intf::consts_MAX_DOM_ACTIVE_TPTRS as u64;
        // NOTE(review): assumes `dom_ctxs[dom.id]` is a valid, non-null pointer
        // to this domain's context — confirm against the BPF side.
        let dom_ctx = unsafe { &mut *self.skel.maps.bss_data.as_mut().unwrap().dom_ctxs[dom.id] };
        let active_tasks = &mut dom_ctx.active_tasks;

        // Snapshot the ring indices, then mark everything up to the write index
        // as consumed and bump the generation counter.
        let (mut ridx, widx) = (active_tasks.read_idx, active_tasks.write_idx);
        active_tasks.read_idx = active_tasks.write_idx;
        active_tasks.genn += 1;

        // The ring holds at most MAX_TPTRS entries; if more were written since
        // the last read, only the most recent MAX_TPTRS are considered.
        if widx - ridx > MAX_TPTRS {
            ridx = widx - MAX_TPTRS;
        }

        let load_half_life = self.skel.maps.rodata_data.as_ref().unwrap().load_half_life;
        let now_mono = now_monotonic();

        for idx in ridx..widx {
            let taskc_p = active_tasks.tasks[(idx % MAX_TPTRS) as usize];
            // NOTE(review): assumes every stored task pointer is still valid —
            // confirm the lifetime guarantee provided by the BPF side.
            let taskc = unsafe { &mut *taskc_p };

            // Skip tasks that are already targeted at another domain.
            if taskc.target_dom as usize != dom.id {
                continue;
            }

            let rd = &taskc.dcyc_rd;
            let mut load = ravg_read(
                rd.val,
                rd.val_at,
                rd.old,
                rd.cur,
                now_mono,
                load_half_life,
                RAVG_FRAC_BITS,
            );

            // Scale the duty cycle by the task weight (capped at the
            // infeasibility threshold) or by the default weight.
            let weight = if self.lb_apply_weight {
                (taskc.weight as f64).min(self.infeas_threshold)
            } else {
                DEFAULT_WEIGHT
            };
            load *= weight;

            dom.tasks.insert(TaskInfo {
                taskc_p,
                load: OrderedFloat(load),
                dom_mask: taskc.dom_mask,
                preferred_dom_mask: taskc.preferred_dom_mask,
                migrated: Cell::new(false),
                // NOTE(review): `assume_init()` presumes the BPF side always
                // initialises `is_kworker` — confirm.
                is_kworker: unsafe { taskc.is_kworker.assume_init() },
            });
        }

        Ok(())
    }
686
687    fn find_first_candidate<'d, I>(tasks_by_load: I) -> Option<&'d TaskInfo>
690    where
691        I: IntoIterator<Item = &'d TaskInfo>,
692    {
693        tasks_by_load.into_iter().next()
694    }
695
696    fn try_find_move_task(
700        &mut self,
701        (push_dom, to_push): (&mut Domain, f64),
702        (pull_dom, to_pull): (&mut Domain, f64),
703        task_filter: impl Fn(&TaskInfo, u32) -> bool,
704        to_xfer: f64,
705    ) -> Result<Option<f64>> {
706        let to_pull = to_pull.abs();
707        let calc_new_imbal = |xfer: f64| (to_push - xfer).abs() + (to_pull - xfer).abs();
708
709        self.populate_tasks_by_load(push_dom)?;
710
711        let pull_dom_id: u32 = pull_dom.id.try_into().unwrap();
719        let tasks: Vec<TaskInfo> = std::mem::take(&mut push_dom.tasks)
720            .into_vec()
721            .into_iter()
722            .filter(|task| {
723                task.dom_mask & (1 << pull_dom_id) != 0
724                    && !(self.skip_kworkers && task.is_kworker)
725                    && !task.migrated.get()
726            })
727            .collect();
728
729        let (task, new_imbal) = match (
730            Self::find_first_candidate(
731                tasks
732                    .as_slice()
733                    .iter()
734                    .filter(|x| x.load <= OrderedFloat(to_xfer) && task_filter(x, pull_dom_id))
735                    .rev(),
736            ),
737            Self::find_first_candidate(
738                tasks
739                    .as_slice()
740                    .iter()
741                    .filter(|x| x.load >= OrderedFloat(to_xfer) && task_filter(x, pull_dom_id)),
742            ),
743        ) {
744            (None, None) => {
745                std::mem::swap(&mut push_dom.tasks, &mut SortedVec::from_unsorted(tasks));
746                return Ok(None);
747            }
748            (Some(task), None) | (None, Some(task)) => (task, calc_new_imbal(*task.load)),
749            (Some(task0), Some(task1)) => {
750                let (new_imbal0, new_imbal1) =
751                    (calc_new_imbal(*task0.load), calc_new_imbal(*task1.load));
752                if new_imbal0 <= new_imbal1 {
753                    (task0, new_imbal0)
754                } else {
755                    (task1, new_imbal1)
756                }
757            }
758        };
759
760        let old_imbal = to_push + to_pull;
763        if old_imbal < new_imbal {
764            std::mem::swap(&mut push_dom.tasks, &mut SortedVec::from_unsorted(tasks));
765            return Ok(None);
766        }
767
768        let load = *(task.load);
769        let taskc_p = task.taskc_p;
770        task.migrated.set(true);
771        std::mem::swap(&mut push_dom.tasks, &mut SortedVec::from_unsorted(tasks));
772
773        push_dom.transfer_load(load, unsafe { &mut *taskc_p }, pull_dom);
774        Ok(Some(load))
775    }
776
777    fn transfer_between_nodes(
778        &mut self,
779        push_node: &mut NumaNode,
780        pull_node: &mut NumaNode,
781    ) -> Result<f64> {
782        debug!("Inter node {} -> {} started", push_node.id, pull_node.id);
783
784        let push_imbal = push_node.load.imbal();
785        let pull_imbal = pull_node.load.imbal();
786        let xfer = push_node.xfer_between(pull_node);
787
788        if push_imbal <= 0.0f64 || pull_imbal >= 0.0f64 {
789            bail!(
790                "push node {}:{}, pull node {}:{}",
791                push_node.id,
792                push_imbal,
793                pull_node.id,
794                pull_imbal
795            );
796        }
797        let mut pushers = VecDeque::with_capacity(push_node.domains.len());
798        let mut pullers = Vec::with_capacity(pull_node.domains.len());
799        let mut pushed = 0f64;
800
801        while push_node.domains.len() > 0 {
802            let mut push_dom = push_node.domains.pop().unwrap();
804            if push_dom.load.state() != BalanceState::NeedsPush {
805                push_node.domains.insert(push_dom);
806                break;
807            }
808
809            while pull_node.domains.len() > 0 {
810                let mut pull_dom = pull_node.domains.remove_index(0);
811                if pull_dom.load.state() != BalanceState::NeedsPull {
812                    pull_node.domains.insert(pull_dom);
813                    break;
814                }
815                let mut transferred = self.try_find_move_task(
816                    (&mut push_dom, push_imbal),
817                    (&mut pull_dom, pull_imbal),
818                    |task: &TaskInfo, pull_dom: u32| -> bool {
819                        (task.preferred_dom_mask & (1 << pull_dom)) > 0
820                    },
821                    xfer,
822                )?;
823                if transferred.is_none() {
824                    transferred = self.try_find_move_task(
825                        (&mut push_dom, push_imbal),
826                        (&mut pull_dom, pull_imbal),
827                        |_task: &TaskInfo, _pull_dom: u32| -> bool { true },
828                        xfer,
829                    )?;
830                }
831
832                pullers.push(pull_dom);
833                if let Some(transferred) = transferred {
834                    pushed = transferred;
835                    push_node.update_load(-transferred);
836                    pull_node.update_load(transferred);
837                    break;
838                }
839            }
840            while let Some(puller) = pullers.pop() {
841                pull_node.domains.insert(puller);
842            }
843            pushers.push_back(push_dom);
844            if pushed > 0.0f64 {
845                break;
846            }
847        }
848        while let Some(pusher) = pushers.pop_front() {
849            push_node.domains.insert(pusher);
850        }
851
852        Ok(pushed)
853    }
854
855    fn balance_between_nodes(&mut self) -> Result<()> {
856        debug!("Node <-> Node LB started");
857
858        let mut pushers = VecDeque::with_capacity(self.nodes.len());
900        let mut pullers = Vec::with_capacity(self.nodes.len());
901
902        while self.nodes.len() >= 2 {
903            let mut push_node = self.nodes.pop().unwrap();
905            if push_node.load.state() != BalanceState::NeedsPush {
906                self.nodes.insert(push_node);
907                break;
908            }
909
910            let push_cutoff = push_node.load.push_cutoff();
911            let mut pushed = 0f64;
912            while self.nodes.len() > 0 && pushed < push_cutoff {
913                let mut pull_node = self.nodes.remove_index(0);
915                let pull_id = pull_node.id;
916                if pull_node.load.state() != BalanceState::NeedsPull {
917                    self.nodes.insert(pull_node);
918                    break;
919                }
920                let migrated = self.transfer_between_nodes(&mut push_node, &mut pull_node)?;
921                pullers.push(pull_node);
922                if migrated > 0.0f64 {
923                    pushed += migrated;
928                    debug!(
929                        "NODE {} sending {:.06} --> NODE {}",
930                        push_node.id, migrated, pull_id
931                    );
932                }
933            }
934            while let Some(puller) = pullers.pop() {
935                self.nodes.insert(puller);
936            }
937
938            if pushed > 0.0f64 {
939                debug!("NODE {} pushed {:.06} total load", push_node.id, pushed);
940            }
941            pushers.push_back(push_node);
942        }
943
944        while let Some(pusher) = pushers.pop_front() {
945            self.nodes.insert(pusher);
946        }
947
948        Ok(())
949    }
950
951    fn balance_within_node(&mut self, node: &mut NumaNode) -> Result<()> {
952        if node.domains.len() < 2 {
953            return Ok(());
954        }
955
956        debug!("Intra node {} LB started", node.id);
957
958        let mut pushers = VecDeque::with_capacity(node.domains.len());
963        let mut pullers = Vec::new();
964
965        while node.domains.len() >= 2 {
966            let mut push_dom = node.domains.pop().unwrap();
967            if node.domains.len() == 0 || push_dom.load.state() != BalanceState::NeedsPush {
968                node.domains.insert(push_dom);
969                break;
970            }
971
972            let mut pushed = 0.0f64;
973            let push_cutoff = push_dom.load.push_cutoff();
974            let push_imbal = push_dom.load.imbal();
975            if push_imbal < 0.0f64 {
976                bail!(
977                    "Node {} push dom {} had imbal {}",
978                    node.id,
979                    push_dom.id,
980                    push_imbal
981                );
982            }
983
984            while node.domains.len() > 0 && pushed < push_cutoff {
985                let mut pull_dom = node.domains.remove_index(0);
986                if pull_dom.load.state() != BalanceState::NeedsPull {
987                    node.domains.push(pull_dom);
988                    break;
989                }
990                let pull_imbal = pull_dom.load.imbal();
991                if pull_imbal >= 0.0f64 {
992                    bail!(
993                        "Node {} pull dom {} had imbal {}",
994                        node.id,
995                        pull_dom.id,
996                        pull_imbal
997                    );
998                }
999                let xfer = push_dom.xfer_between(&pull_dom);
1000                let mut transferred = self.try_find_move_task(
1001                    (&mut push_dom, push_imbal),
1002                    (&mut pull_dom, pull_imbal),
1003                    |task: &TaskInfo, pull_dom: u32| -> bool {
1004                        (task.preferred_dom_mask & (1 << pull_dom)) > 0
1005                    },
1006                    xfer,
1007                )?;
1008                if transferred.is_none() {
1009                    transferred = self.try_find_move_task(
1010                        (&mut push_dom, push_imbal),
1011                        (&mut pull_dom, pull_imbal),
1012                        |_task: &TaskInfo, _pull_dom: u32| -> bool { true },
1013                        xfer,
1014                    )?;
1015                }
1016
1017                if let Some(transferred) = transferred {
1018                    if transferred <= 0.0f64 {
1019                        bail!("Expected nonzero load transfer")
1020                    }
1021                    pushed += transferred;
1022                    node.domains.insert(pull_dom);
1028                    continue;
1029                }
1030
1031                pullers.push(pull_dom);
1033            }
1034            while let Some(puller) = pullers.pop() {
1035                node.domains.insert(puller);
1036            }
1037
1038            if pushed > 0.0f64 {
1039                debug!("DOM {} pushed {:.06} total load", push_dom.id, pushed);
1040            }
1041            pushers.push_back(push_dom);
1042        }
1043        while let Some(pusher) = pushers.pop_front() {
1044            node.domains.insert(pusher);
1045        }
1046
1047        Ok(())
1048    }
1049
1050    fn perform_balancing(&mut self) -> Result<()> {
1051        if self.dom_group.nr_nodes() > 1 {
1056            self.balance_between_nodes()?;
1057        }
1058
1059        debug!("Intra node LBs started");
1063
1064        let mut nodes = std::mem::take(&mut self.nodes).into_vec();
1067        for node in nodes.iter_mut() {
1068            self.balance_within_node(node)?;
1069        }
1070        std::mem::swap(&mut self.nodes, &mut SortedVec::from_unsorted(nodes));
1071
1072        Ok(())
1073    }
1074}