use core::cmp::Ordering;

use std::cell::Cell;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::bail;
use anyhow::Result;
use log::debug;
use log::trace;
use ordered_float::OrderedFloat;
use scx_utils::ravg::ravg_read;
use scx_utils::LoadAggregator;
use scx_utils::LoadLedger;
use sorted_vec::SortedVec;
// `const_assert_eq!` below comes from the static_assertions crate; imported
// here assuming it is not already `#[macro_use]`d at the crate root.
use static_assertions::const_assert_eq;

use crate::bpf_intf;
use crate::bpf_skel::*;
use crate::stats::DomainStats;
use crate::stats::NodeStats;
use crate::DomainGroup;

use crate::types::dom_ctx;

const DEFAULT_WEIGHT: f64 = bpf_intf::consts_LB_DEFAULT_WEIGHT as f64;
const RAVG_FRAC_BITS: u32 = bpf_intf::ravg_consts_RAVG_FRAC_BITS;

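/// Current CLOCK_MONOTONIC time in nanoseconds, used as the "now" timestamp
/// when decaying the running-average (ravg) load values read from BPF.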
fn now_monotonic() -> u64 {
    let time = nix::time::ClockId::CLOCK_MONOTONIC
        .now()
        .expect("Failed getting current monotonic time");
    time.tv_sec() as u64 * 1_000_000_000 + time.tv_nsec() as u64
}

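/// How far an entity's load is from its fair share: roughly balanced, or in
/// need of pushing load away or pulling load in.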
#[derive(Clone, Copy, Debug, PartialEq)]
enum BalanceState {
    Balanced,
    NeedsPush,
    NeedsPull,
}

impl fmt::Display for BalanceState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BalanceState::Balanced => write!(f, "BALANCED"),
            BalanceState::NeedsPush => write!(f, "OVER-LOADED"),
            BalanceState::NeedsPull => write!(f, "UNDER-LOADED"),
        }
    }
}

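/// Implements PartialEq/Eq/PartialOrd/Ord for LoadOrdered types by
/// delegating to the comparison helpers on `dyn LoadOrdered` below, so that
/// entities are ordered purely by their load sums.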
macro_rules! impl_ord_for_type {
    ($($t:ty),*) => {
        $(
            impl PartialEq for $t {
                fn eq(&self, other: &Self) -> bool {
                    <dyn LoadOrdered>::eq(self, other)
                }
            }

            impl Eq for $t {}

            impl PartialOrd for $t {
                fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                    <dyn LoadOrdered>::partial_cmp(self, other)
                }
            }

            impl Ord for $t {
                fn cmp(&self, other: &Self) -> Ordering {
                    <dyn LoadOrdered>::cmp(self, other)
                }
            }
        )*
    };
}

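/// Anything with a scalar load that can be ordered and kept in a SortedVec.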
trait LoadOrdered {
    fn get_load(&self) -> OrderedFloat<f64>;
}

impl dyn LoadOrdered {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.get_load().eq(&other.get_load())
    }

    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.get_load().partial_cmp(&other.get_load())
    }

    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.get_load().cmp(&other.get_load())
    }
}

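/// Load accounting shared by domains and NUMA nodes: the entity's current
/// load sum, the average it would carry if load were spread evenly, the net
/// delta applied by the balancer so far, and the ratios controlling when
/// rebalancing triggers and how much load may move.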
#[derive(Debug, Clone)]
pub struct LoadEntity {
    cost_ratio: f64,
    push_max_ratio: f64,
    xfer_ratio: f64,
    load_sum: OrderedFloat<f64>,
    load_avg: f64,
    load_delta: f64,
    bal_state: BalanceState,
}

impl LoadEntity {
    fn new(
        cost_ratio: f64,
        push_max_ratio: f64,
        xfer_ratio: f64,
        load_sum: f64,
        load_avg: f64,
    ) -> Self {
        let mut entity = Self {
            cost_ratio,
            push_max_ratio,
            xfer_ratio,
            load_sum: OrderedFloat(load_sum),
            load_avg,
            load_delta: 0.0f64,
            bal_state: BalanceState::Balanced,
        };
        entity.add_load(0.0f64);
        entity
    }

    pub fn load_sum(&self) -> f64 {
        *self.load_sum
    }

    pub fn load_avg(&self) -> f64 {
        self.load_avg
    }

    pub fn imbal(&self) -> f64 {
        self.load_sum() - self.load_avg
    }

    pub fn delta(&self) -> f64 {
        self.load_delta
    }

    fn state(&self) -> BalanceState {
        self.bal_state
    }

    fn rebalance(&mut self, new_load: f64) {
        self.load_sum = OrderedFloat(new_load);

        let imbal = self.imbal();
        let needs_balance = imbal.abs() > self.load_avg * self.cost_ratio;

        self.bal_state = if needs_balance {
            if imbal > 0f64 {
                BalanceState::NeedsPush
            } else {
                BalanceState::NeedsPull
            }
        } else {
            BalanceState::Balanced
        };
    }

    fn add_load(&mut self, delta: f64) {
        self.rebalance(self.load_sum() + delta);
        self.load_delta += delta;
    }

    fn push_cutoff(&self) -> f64 {
        self.imbal().abs() * self.push_max_ratio
    }

    fn xfer_between(&self, other: &LoadEntity) -> f64 {
        self.imbal().abs().min(other.imbal().abs()) * self.xfer_ratio
    }
}

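/// A task candidate for migration, sampled from BPF. `taskc_p` points into
/// the shared task context; `migrated` is a Cell so the task can be marked
/// as moved while stored immutably in a SortedVec.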
#[derive(Debug)]
struct TaskInfo {
    taskc_p: *mut types::task_ctx,
    load: OrderedFloat<f64>,
    dom_mask: u64,
    preferred_dom_mask: u64,
    migrated: Cell<bool>,
    is_kworker: bool,
}

impl LoadOrdered for TaskInfo {
    fn get_load(&self) -> OrderedFloat<f64> {
        self.load
    }
}
impl_ord_for_type!(TaskInfo);

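/// A scheduling domain: its load entity plus the load-sorted tasks sampled
/// from it once populate_tasks_by_load() has run.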
#[derive(Debug)]
struct Domain {
    id: usize,
    queried_tasks: bool,
    load: LoadEntity,
    tasks: SortedVec<TaskInfo>,
}

impl Domain {
    const LOAD_IMBAL_HIGH_RATIO: f64 = 0.05;
    const LOAD_IMBAL_XFER_TARGET_RATIO: f64 = 0.50;
    const LOAD_IMBAL_PUSH_MAX_RATIO: f64 = 0.50;

    fn new(id: usize, load_sum: f64, load_avg: f64) -> Self {
        Self {
            id,
            queried_tasks: false,
            load: LoadEntity::new(
                Domain::LOAD_IMBAL_HIGH_RATIO,
                Domain::LOAD_IMBAL_PUSH_MAX_RATIO,
                Domain::LOAD_IMBAL_XFER_TARGET_RATIO,
                load_sum,
                load_avg,
            ),
            tasks: SortedVec::new(),
        }
    }

    fn transfer_load(&mut self, load: f64, taskc: &mut types::task_ctx, other: &mut Domain) {
        trace!("XFER pid={} dom={}->{}", taskc.pid, self.id, other.id);

        let dom_id: u32 = other.id.try_into().unwrap();
        taskc.target_dom = dom_id;

        self.load.add_load(-load);
        other.load.add_load(load);
    }

    fn xfer_between(&self, other: &Domain) -> f64 {
        self.load.xfer_between(&other.load)
    }
}

impl LoadOrdered for Domain {
    fn get_load(&self) -> OrderedFloat<f64> {
        self.load.load_sum
    }
}
impl_ord_for_type!(Domain);

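/// A NUMA node: its aggregate load entity plus its domains, kept sorted by
/// load so the most loaded domain can be popped from the back and the least
/// loaded removed from the front.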
#[derive(Debug)]
struct NumaNode {
    id: usize,
    load: LoadEntity,
    domains: SortedVec<Domain>,
}

impl NumaNode {
    const LOAD_IMBAL_HIGH_RATIO: f64 = 0.17;
    const LOAD_IMBAL_XFER_TARGET_RATIO: f64 = 0.50;
    const LOAD_IMBAL_PUSH_MAX_RATIO: f64 = 0.50;

    fn new(id: usize, numa_load_avg: f64) -> Self {
        Self {
            id,
            load: LoadEntity::new(
                NumaNode::LOAD_IMBAL_HIGH_RATIO,
                NumaNode::LOAD_IMBAL_PUSH_MAX_RATIO,
                NumaNode::LOAD_IMBAL_XFER_TARGET_RATIO,
                0.0f64,
                numa_load_avg,
            ),
            domains: SortedVec::new(),
        }
    }

    fn allocate_domain(&mut self, id: usize, load: f64, dom_load_avg: f64) {
        let domain = Domain::new(id, load, dom_load_avg);

        self.insert_domain(domain);
        self.load.rebalance(self.load.load_sum() + load);
    }

    fn xfer_between(&self, other: &NumaNode) -> f64 {
        self.load.xfer_between(&other.load)
    }

    fn insert_domain(&mut self, domain: Domain) {
        self.domains.insert(domain);
    }

    fn update_load(&mut self, delta: f64) {
        self.load.add_load(delta);
    }

    fn stats(&self) -> NodeStats {
        let mut stats = NodeStats::new(
            self.load.load_sum(),
            self.load.imbal(),
            self.load.delta(),
            BTreeMap::new(),
        );
        for dom in self.domains.iter() {
            stats.doms.insert(
                dom.id,
                DomainStats::new(dom.load.load_sum(), dom.load.imbal(), dom.load.delta()),
            );
        }
        stats
    }
}

impl LoadOrdered for NumaNode {
    fn get_load(&self) -> OrderedFloat<f64> {
        self.load.load_sum
    }
}
impl_ord_for_type!(NumaNode);

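/// Drives one round of load balancing: reads per-domain loads from BPF,
/// builds the sorted node/domain hierarchy, then moves task load first
/// between NUMA nodes and then between domains within each node.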
pub struct LoadBalancer<'a, 'b> {
    skel: &'a mut BpfSkel<'b>,
    dom_group: Arc<DomainGroup>,
    skip_kworkers: bool,

    infeas_threshold: f64,

    nodes: SortedVec<NumaNode>,

    lb_apply_weight: bool,
    balance_load: bool,
}

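// bucket_range() below tiles the weight range evenly across the load
// buckets, which only works out if LB_LOAD_BUCKETS divides LB_MAX_WEIGHT.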
const_assert_eq!(
    bpf_intf::consts_LB_MAX_WEIGHT % bpf_intf::consts_LB_LOAD_BUCKETS,
    0
);

impl<'a, 'b> LoadBalancer<'a, 'b> {
    pub fn new(
        skel: &'a mut BpfSkel<'b>,
        dom_group: Arc<DomainGroup>,
        skip_kworkers: bool,
        lb_apply_weight: bool,
        balance_load: bool,
    ) -> Self {
        Self {
            skel,
            skip_kworkers,

            infeas_threshold: bpf_intf::consts_LB_MAX_WEIGHT as f64,

            nodes: SortedVec::new(),

            lb_apply_weight,
            balance_load,

            dom_group,
        }
    }

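    /// Entry point for one balancing round: rebuild the node/domain
    /// hierarchy from current loads and, if load balancing is enabled,
    /// perform the actual transfers.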
    pub fn load_balance(&mut self) -> Result<()> {
        self.create_domain_hierarchy()?;

        if self.balance_load {
            self.perform_balancing()?
        }

        Ok(())
    }

    pub fn get_stats(&self) -> BTreeMap<usize, NodeStats> {
        self.nodes
            .iter()
            .map(|node| (node.id, node.stats()))
            .collect()
    }

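    /// Builds the sorted NUMA-node/domain tree from the load ledger. When
    /// task weights are ignored, per-domain duty cycles scaled by
    /// DEFAULT_WEIGHT stand in for load; otherwise the ledger's weighted
    /// load sums are used and the infeasible-weight threshold is updated.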
    fn create_domain_hierarchy(&mut self) -> Result<()> {
        let ledger = self.calculate_load_avgs()?;

        let (dom_loads, total_load) = if !self.lb_apply_weight {
            (
                ledger
                    .dom_dcycle_sums()
                    .into_iter()
                    .map(|d| DEFAULT_WEIGHT * d)
                    .collect(),
                DEFAULT_WEIGHT * ledger.global_dcycle_sum(),
            )
        } else {
            self.infeas_threshold = ledger.effective_max_weight();
            (ledger.dom_load_sums().to_vec(), ledger.global_load_sum())
        };

        let num_numa_nodes = self.dom_group.nr_nodes();
        let numa_load_avg = total_load / num_numa_nodes as f64;

        let mut nodes: Vec<NumaNode> = (0..num_numa_nodes)
            .map(|id| NumaNode::new(id, numa_load_avg))
            .collect();

        let dom_load_avg = total_load / dom_loads.len() as f64;
        for (dom_id, load) in dom_loads.iter().enumerate() {
            let numa_id = self
                .dom_group
                .dom_numa_id(&dom_id)
                .ok_or_else(|| anyhow!("Failed to get NUMA ID for domain {}", dom_id))?;

            if numa_id >= num_numa_nodes {
                bail!("NUMA ID {} exceeds maximum {}", numa_id, num_numa_nodes);
            }

            let node = &mut nodes[numa_id];
            node.allocate_domain(dom_id, *load, dom_load_avg);
        }

        self.nodes = SortedVec::from_unsorted(nodes);

        Ok(())
    }

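    /// Sums each domain's per-weight-bucket duty cycle running averages
    /// (maintained by the BPF side) into a LoadAggregator and returns the
    /// resulting load ledger.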
    fn calculate_load_avgs(&mut self) -> Result<LoadLedger> {
        const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
        let now_mono = now_monotonic();
        let load_half_life = self.skel.maps.rodata_data.as_ref().unwrap().load_half_life;

        let mut aggregator = LoadAggregator::new(self.dom_group.weight(), !self.lb_apply_weight);

        for (dom_id, dom) in self.dom_group.doms() {
            aggregator.init_domain(*dom_id);

            let dom_ctx = dom.ctx().unwrap();

            for bucket in 0..NUM_BUCKETS {
                let bucket_ctx = &dom_ctx.buckets[bucket as usize];
                let rd = &bucket_ctx.rd;
                let duty_cycle = ravg_read(
                    rd.val,
                    rd.val_at,
                    rd.old,
                    rd.cur,
                    now_mono,
                    load_half_life,
                    RAVG_FRAC_BITS,
                );

                if duty_cycle == 0.0f64 {
                    continue;
                }

                let weight = self.bucket_weight(bucket);
                aggregator.record_dom_load(*dom_id, weight, duty_cycle)?;
            }
        }

        Ok(aggregator.calculate())
    }

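    /// Inclusive [min, max] scheduling-weight range covered by a load
    /// bucket; bucket_weight() below takes roughly the midpoint of that
    /// range as the bucket's effective weight.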
    fn bucket_range(&self, bucket: u64) -> (f64, f64) {
        const MAX_WEIGHT: u64 = bpf_intf::consts_LB_MAX_WEIGHT as u64;
        const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
        const WEIGHT_PER_BUCKET: u64 = MAX_WEIGHT / NUM_BUCKETS;

        if bucket >= NUM_BUCKETS {
            panic!("Invalid bucket {}, max {}", bucket, NUM_BUCKETS);
        }

        let min_w = 1 + (MAX_WEIGHT * bucket) / NUM_BUCKETS;
        let max_w = min_w + WEIGHT_PER_BUCKET - 1;

        (min_w as f64, max_w as f64)
    }

    fn bucket_weight(&self, bucket: u64) -> usize {
        const WEIGHT_PER_BUCKET: f64 = bpf_intf::consts_LB_WEIGHT_PER_BUCKET as f64;
        let (min_weight, _) = self.bucket_range(bucket);

        (min_weight + (WEIGHT_PER_BUCKET / 2.0f64)).ceil() as usize
    }

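    /// Consumes the domain's active_tasks ring buffer (filled by the BPF
    /// side), reads each task's duty cycle, scales it by the task's
    /// (possibly clamped) weight, and records the result in dom.tasks
    /// sorted by load. Only runs once per domain per balancing round.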
    fn populate_tasks_by_load(&mut self, dom: &mut Domain) -> Result<()> {
        if dom.queried_tasks {
            return Ok(());
        }
        dom.queried_tasks = true;

        const MAX_TPTRS: u64 = bpf_intf::consts_MAX_DOM_ACTIVE_TPTRS as u64;

        let types::topo_level(index) = types::topo_level::TOPO_LLC;
        let ptr = self.skel.maps.bss_data.as_ref().unwrap().topo_nodes[index as usize][dom.id];
        let dom_ctx = unsafe {
            &mut *std::ptr::with_exposed_provenance_mut::<dom_ctx>(ptr.try_into().unwrap())
        };
        let active_tasks = &mut dom_ctx.active_tasks;

        let (mut ridx, widx) = (active_tasks.read_idx, active_tasks.write_idx);
        active_tasks.read_idx = active_tasks.write_idx;
        active_tasks.gen += 1;

        if widx - ridx > MAX_TPTRS {
            ridx = widx - MAX_TPTRS;
        }

        let load_half_life = self.skel.maps.rodata_data.as_ref().unwrap().load_half_life;
        let now_mono = now_monotonic();

        for idx in ridx..widx {
            let taskc_p = active_tasks.tasks[(idx % MAX_TPTRS) as usize];
            let taskc = unsafe { &mut *taskc_p };

            if taskc.target_dom as usize != dom.id {
                continue;
            }

            let rd = &taskc.dcyc_rd;
            let mut load = ravg_read(
                rd.val,
                rd.val_at,
                rd.old,
                rd.cur,
                now_mono,
                load_half_life,
                RAVG_FRAC_BITS,
            );

            let weight = if self.lb_apply_weight {
                (taskc.weight as f64).min(self.infeas_threshold)
            } else {
                DEFAULT_WEIGHT
            };
            load *= weight;

            dom.tasks.insert(TaskInfo {
                taskc_p,
                load: OrderedFloat(load),
                dom_mask: taskc.dom_mask,
                preferred_dom_mask: taskc.preferred_dom_mask,
                migrated: Cell::new(false),
                is_kworker: unsafe { taskc.is_kworker.assume_init() },
            });
        }

        Ok(())
    }

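    /// First task yielded by an iterator over load-ordered candidates, if
    /// any.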
    fn find_first_candidate<'d, I>(tasks_by_load: I) -> Option<&'d TaskInfo>
    where
        I: IntoIterator<Item = &'d TaskInfo>,
    {
        tasks_by_load.into_iter().next()
    }

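    /// Tries to move a single task from push_dom to pull_dom. Among tasks
    /// eligible for the pull domain, it considers the heaviest task at or
    /// below the transfer target and the lightest task at or above it,
    /// picks whichever leaves the smaller combined imbalance, and bails if
    /// even that would not improve on the current imbalance. Returns the
    /// load moved, if any.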
    fn try_find_move_task(
        &mut self,
        (push_dom, to_push): (&mut Domain, f64),
        (pull_dom, to_pull): (&mut Domain, f64),
        task_filter: impl Fn(&TaskInfo, u32) -> bool,
        to_xfer: f64,
    ) -> Result<Option<f64>> {
        let to_pull = to_pull.abs();
        let calc_new_imbal = |xfer: f64| (to_push - xfer).abs() + (to_pull - xfer).abs();

        self.populate_tasks_by_load(push_dom)?;

        let pull_dom_id: u32 = pull_dom.id.try_into().unwrap();
        let tasks: Vec<TaskInfo> = std::mem::take(&mut push_dom.tasks)
            .into_vec()
            .into_iter()
            .filter(|task| {
                task.dom_mask & (1 << pull_dom_id) != 0
                    && !(self.skip_kworkers && task.is_kworker)
                    && !task.migrated.get()
            })
            .collect();

        let (task, new_imbal) = match (
            Self::find_first_candidate(
                tasks
                    .as_slice()
                    .iter()
                    .filter(|x| x.load <= OrderedFloat(to_xfer) && task_filter(x, pull_dom_id))
                    .rev(),
            ),
            Self::find_first_candidate(
                tasks
                    .as_slice()
                    .iter()
                    .filter(|x| x.load >= OrderedFloat(to_xfer) && task_filter(x, pull_dom_id)),
            ),
        ) {
            (None, None) => {
                push_dom.tasks = SortedVec::from_unsorted(tasks);
                return Ok(None);
            }
            (Some(task), None) | (None, Some(task)) => (task, calc_new_imbal(*task.load)),
            (Some(task0), Some(task1)) => {
                let (new_imbal0, new_imbal1) =
                    (calc_new_imbal(*task0.load), calc_new_imbal(*task1.load));
                if new_imbal0 <= new_imbal1 {
                    (task0, new_imbal0)
                } else {
                    (task1, new_imbal1)
                }
            }
        };

        let old_imbal = to_push + to_pull;
        if old_imbal < new_imbal {
            push_dom.tasks = SortedVec::from_unsorted(tasks);
            return Ok(None);
        }

        let load = *(task.load);
        let taskc_p = task.taskc_p;
        task.migrated.set(true);
        push_dom.tasks = SortedVec::from_unsorted(tasks);

        push_dom.transfer_load(load, unsafe { &mut *taskc_p }, pull_dom);
        Ok(Some(load))
    }

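    /// Moves load from the most loaded pushable domain(s) of push_node
    /// toward the least loaded pullable domains of pull_node, stopping
    /// after the first successful task transfer. Returns the load moved.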
    fn transfer_between_nodes(
        &mut self,
        push_node: &mut NumaNode,
        pull_node: &mut NumaNode,
    ) -> Result<f64> {
        debug!("Inter node {} -> {} started", push_node.id, pull_node.id);

        let push_imbal = push_node.load.imbal();
        let pull_imbal = pull_node.load.imbal();
        let xfer = push_node.xfer_between(pull_node);

        if push_imbal <= 0.0f64 || pull_imbal >= 0.0f64 {
            bail!(
                "push node {}:{}, pull node {}:{}",
                push_node.id,
                push_imbal,
                pull_node.id,
                pull_imbal
            );
        }
        let mut pushers = VecDeque::with_capacity(push_node.domains.len());
        let mut pullers = Vec::with_capacity(pull_node.domains.len());
        let mut pushed = 0f64;

        while push_node.domains.len() > 0 {
            let mut push_dom = push_node.domains.pop().unwrap();
            if push_dom.load.state() != BalanceState::NeedsPush {
                push_node.domains.insert(push_dom);
                break;
            }

            while pull_node.domains.len() > 0 {
                let mut pull_dom = pull_node.domains.remove_index(0);
                if pull_dom.load.state() != BalanceState::NeedsPull {
                    pull_node.domains.insert(pull_dom);
                    break;
                }
                let mut transferred = self.try_find_move_task(
                    (&mut push_dom, push_imbal),
                    (&mut pull_dom, pull_imbal),
                    |task: &TaskInfo, pull_dom: u32| -> bool {
                        (task.preferred_dom_mask & (1 << pull_dom)) > 0
                    },
                    xfer,
                )?;
                if transferred.is_none() {
                    transferred = self.try_find_move_task(
                        (&mut push_dom, push_imbal),
                        (&mut pull_dom, pull_imbal),
                        |_task: &TaskInfo, _pull_dom: u32| -> bool { true },
                        xfer,
                    )?;
                }

                pullers.push(pull_dom);
                if let Some(transferred) = transferred {
                    pushed = transferred;
                    push_node.update_load(-transferred);
                    pull_node.update_load(transferred);
                    break;
                }
            }
            while let Some(puller) = pullers.pop() {
                pull_node.domains.insert(puller);
            }
            pushers.push_back(push_dom);
            if pushed > 0.0f64 {
                break;
            }
        }
        while let Some(pusher) = pushers.pop_front() {
            push_node.domains.insert(pusher);
        }

        Ok(pushed)
    }

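    /// Inter-node pass: repeatedly pairs the most loaded node that needs a
    /// push with the least loaded node that needs a pull, transferring load
    /// until the pusher reaches its push cutoff or no eligible pull node
    /// remains.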
    fn balance_between_nodes(&mut self) -> Result<()> {
        debug!("Node <-> Node LB started");

        let mut pushers = VecDeque::with_capacity(self.nodes.len());
        let mut pullers = Vec::with_capacity(self.nodes.len());

        while self.nodes.len() >= 2 {
            let mut push_node = self.nodes.pop().unwrap();
            if push_node.load.state() != BalanceState::NeedsPush {
                self.nodes.insert(push_node);
                break;
            }

            let push_cutoff = push_node.load.push_cutoff();
            let mut pushed = 0f64;
            while self.nodes.len() > 0 && pushed < push_cutoff {
                let mut pull_node = self.nodes.remove_index(0);
                let pull_id = pull_node.id;
                if pull_node.load.state() != BalanceState::NeedsPull {
                    self.nodes.insert(pull_node);
                    break;
                }
                let migrated = self.transfer_between_nodes(&mut push_node, &mut pull_node)?;
                pullers.push(pull_node);
                if migrated > 0.0f64 {
                    pushed += migrated;
                    debug!(
                        "NODE {} sending {:.06} --> NODE {}",
                        push_node.id, migrated, pull_id
                    );
                }
            }
            while let Some(puller) = pullers.pop() {
                self.nodes.insert(puller);
            }

            if pushed > 0.0f64 {
                debug!("NODE {} pushed {:.06} total load", push_node.id, pushed);
            }
            pushers.push_back(push_node);
        }

        while !pushers.is_empty() {
            self.nodes.insert(pushers.pop_front().unwrap());
        }

        Ok(())
    }

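    /// Intra-node pass: within a single NUMA node, pushes load from
    /// overloaded domains to underloaded ones until each pusher hits its
    /// push cutoff or runs out of pull candidates.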
    fn balance_within_node(&mut self, node: &mut NumaNode) -> Result<()> {
        if node.domains.len() < 2 {
            return Ok(());
        }

        debug!("Intra node {} LB started", node.id);

        let mut pushers = VecDeque::with_capacity(node.domains.len());
        let mut pullers = Vec::new();

        while node.domains.len() >= 2 {
            let mut push_dom = node.domains.pop().unwrap();
            if node.domains.len() == 0 || push_dom.load.state() != BalanceState::NeedsPush {
                node.domains.insert(push_dom);
                break;
            }

            let mut pushed = 0.0f64;
            let push_cutoff = push_dom.load.push_cutoff();
            let push_imbal = push_dom.load.imbal();
            if push_imbal < 0.0f64 {
                bail!(
                    "Node {} push dom {} had imbal {}",
                    node.id,
                    push_dom.id,
                    push_imbal
                );
            }

            while node.domains.len() > 0 && pushed < push_cutoff {
                let mut pull_dom = node.domains.remove_index(0);
                if pull_dom.load.state() != BalanceState::NeedsPull {
                    node.domains.insert(pull_dom);
                    break;
                }
                let pull_imbal = pull_dom.load.imbal();
                if pull_imbal >= 0.0f64 {
                    bail!(
                        "Node {} pull dom {} had imbal {}",
                        node.id,
                        pull_dom.id,
                        pull_imbal
                    );
                }
                let xfer = push_dom.xfer_between(&pull_dom);
                let mut transferred = self.try_find_move_task(
                    (&mut push_dom, push_imbal),
                    (&mut pull_dom, pull_imbal),
                    |task: &TaskInfo, pull_dom: u32| -> bool {
                        (task.preferred_dom_mask & (1 << pull_dom)) > 0
                    },
                    xfer,
                )?;
                if transferred.is_none() {
                    transferred = self.try_find_move_task(
                        (&mut push_dom, push_imbal),
                        (&mut pull_dom, pull_imbal),
                        |_task: &TaskInfo, _pull_dom: u32| -> bool { true },
                        xfer,
                    )?;
                }

                if let Some(transferred) = transferred {
                    if transferred <= 0.0f64 {
                        bail!("Expected nonzero load transfer")
                    }
                    pushed += transferred;
                    node.domains.insert(pull_dom);
                    continue;
                }

                pullers.push(pull_dom);
            }
            while let Some(puller) = pullers.pop() {
                node.domains.insert(puller);
            }

            if pushed > 0.0f64 {
                debug!("DOM {} pushed {:.06} total load", push_dom.id, pushed);
            }
            pushers.push_back(push_dom);
        }
        while let Some(pusher) = pushers.pop_front() {
            node.domains.insert(pusher);
        }

        Ok(())
    }

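    /// Runs the inter-node pass when there is more than one NUMA node, then
    /// balances domains within each node.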
    fn perform_balancing(&mut self) -> Result<()> {
        if self.dom_group.nr_nodes() > 1 {
            self.balance_between_nodes()?;
        }

        debug!("Intra node LBs started");

        let mut nodes = std::mem::take(&mut self.nodes).into_vec();
        for node in nodes.iter_mut() {
            self.balance_within_node(node)?;
        }
        self.nodes = SortedVec::from_unsorted(nodes);

        Ok(())
    }
}