use core::cmp::Ordering;

use std::cell::Cell;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::bail;
use anyhow::Result;
use log::debug;
use log::trace;
use ordered_float::OrderedFloat;
use scx_utils::ravg::ravg_read;
use scx_utils::LoadAggregator;
use scx_utils::LoadLedger;
use sorted_vec::SortedVec;
use static_assertions::const_assert_eq;

use crate::bpf_intf;
use crate::bpf_skel::*;
use crate::stats::DomainStats;
use crate::stats::NodeStats;
use crate::DomainGroup;

const DEFAULT_WEIGHT: f64 = bpf_intf::consts_LB_DEFAULT_WEIGHT as f64;
const RAVG_FRAC_BITS: u32 = bpf_intf::ravg_consts_RAVG_FRAC_BITS;

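/// Read CLOCK_MONOTONIC and return the current time in nanoseconds.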
fn now_monotonic() -> u64 {
    let mut time = libc::timespec {
        tv_sec: 0,
        tv_nsec: 0,
    };
    let ret = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut time) };
    assert!(ret == 0);
    time.tv_sec as u64 * 1_000_000_000 + time.tv_nsec as u64
}

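/// Whether a load entity (a domain or a NUMA node) is close enough to the
/// average load, or far enough above or below it that load should be pushed
/// off of it or pulled onto it.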
#[derive(Clone, Copy, Debug, PartialEq)]
enum BalanceState {
    Balanced,
    NeedsPush,
    NeedsPull,
}

impl fmt::Display for BalanceState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BalanceState::Balanced => write!(f, "BALANCED"),
            BalanceState::NeedsPush => write!(f, "OVER-LOADED"),
            BalanceState::NeedsPull => write!(f, "UNDER-LOADED"),
        }
    }
}

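// Implement PartialEq/Eq/PartialOrd/Ord for any type that implements
// LoadOrdered, comparing by the load returned from get_load(). This is what
// lets TaskInfo, Domain and NumaNode be kept in load-sorted SortedVecs.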
macro_rules! impl_ord_for_type {
    ($($t:ty),*) => {
        $(
            impl PartialEq for $t {
                fn eq(&self, other: &Self) -> bool {
                    <dyn LoadOrdered>::eq(self, other)
                }
            }

            impl Eq for $t {}

            impl PartialOrd for $t {
                fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                    <dyn LoadOrdered>::partial_cmp(self, other)
                }
            }

            impl Ord for $t {
                fn cmp(&self, other: &Self) -> Ordering {
                    <dyn LoadOrdered>::cmp(self, other)
                }
            }
        )*
    };
}

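/// Types that can be ordered by their current load.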
trait LoadOrdered {
    fn get_load(&self) -> OrderedFloat<f64>;
}

impl dyn LoadOrdered {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.get_load().eq(&other.get_load())
    }

    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.get_load().partial_cmp(&other.get_load())
    }

    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.get_load().cmp(&other.get_load())
    }
}

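/// Load state shared by every balancing entity (domain or NUMA node): the
/// entity's current load sum, the average load it would carry if load were
/// spread evenly, and how it should participate in balancing. cost_ratio
/// sets how far from the average an entity may drift before balancing kicks
/// in, push_max_ratio caps how much of the imbalance may be pushed off in
/// total, and xfer_ratio scales how much to move in a single transfer.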
#[derive(Debug, Clone)]
pub struct LoadEntity {
    cost_ratio: f64,
    push_max_ratio: f64,
    xfer_ratio: f64,
    load_sum: OrderedFloat<f64>,
    load_avg: f64,
    load_delta: f64,
    bal_state: BalanceState,
}

impl LoadEntity {
    fn new(
        cost_ratio: f64,
        push_max_ratio: f64,
        xfer_ratio: f64,
        load_sum: f64,
        load_avg: f64,
    ) -> Self {
        let mut entity = Self {
            cost_ratio,
            push_max_ratio,
            xfer_ratio,
            load_sum: OrderedFloat(load_sum),
            load_avg,
            load_delta: 0.0f64,
            bal_state: BalanceState::Balanced,
        };
        entity.add_load(0.0f64);
        entity
    }

    pub fn load_sum(&self) -> f64 {
        *self.load_sum
    }

    pub fn load_avg(&self) -> f64 {
        self.load_avg
    }

    pub fn imbal(&self) -> f64 {
        self.load_sum() - self.load_avg
    }

    pub fn delta(&self) -> f64 {
        self.load_delta
    }

    fn state(&self) -> BalanceState {
        self.bal_state
    }

    fn rebalance(&mut self, new_load: f64) {
        self.load_sum = OrderedFloat(new_load);

        let imbal = self.imbal();
        let needs_balance = imbal.abs() > self.load_avg * self.cost_ratio;

        self.bal_state = if needs_balance {
            if imbal > 0f64 {
                BalanceState::NeedsPush
            } else {
                BalanceState::NeedsPull
            }
        } else {
            BalanceState::Balanced
        };
    }

    fn add_load(&mut self, delta: f64) {
        self.rebalance(self.load_sum() + delta);
        self.load_delta += delta;
    }

    fn push_cutoff(&self) -> f64 {
        self.imbal().abs() * self.push_max_ratio
    }

    fn xfer_between(&self, other: &LoadEntity) -> f64 {
        self.imbal().abs().min(other.imbal().abs()) * self.xfer_ratio
    }
}

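/// Userspace view of a runnable task that is a candidate for migration: a
/// raw pointer back into the task's BPF task_ctx, the task's inferred load,
/// and the domains it is allowed (and prefers) to run on. migrated is a Cell
/// so a task can be marked moved through a shared reference.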
#[derive(Debug)]
struct TaskInfo {
    taskc_p: *mut types::task_ctx,
    load: OrderedFloat<f64>,
    dom_mask: u64,
    preferred_dom_mask: u64,
    migrated: Cell<bool>,
    is_kworker: bool,
}

impl LoadOrdered for TaskInfo {
    fn get_load(&self) -> OrderedFloat<f64> {
        self.load
    }
}
impl_ord_for_type!(TaskInfo);

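/// A scheduling domain and the load-sorted set of tasks that were recently
/// runnable on it. Domains are themselves kept sorted by load inside their
/// owning NumaNode.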
#[derive(Debug)]
struct Domain {
    id: usize,
    queried_tasks: bool,
    load: LoadEntity,
    tasks: SortedVec<TaskInfo>,
}

impl Domain {
    const LOAD_IMBAL_HIGH_RATIO: f64 = 0.05;
    const LOAD_IMBAL_XFER_TARGET_RATIO: f64 = 0.50;
    const LOAD_IMBAL_PUSH_MAX_RATIO: f64 = 0.50;

    fn new(id: usize, load_sum: f64, load_avg: f64) -> Self {
        Self {
            id,
            queried_tasks: false,
            load: LoadEntity::new(
                Domain::LOAD_IMBAL_HIGH_RATIO,
                Domain::LOAD_IMBAL_PUSH_MAX_RATIO,
                Domain::LOAD_IMBAL_XFER_TARGET_RATIO,
                load_sum,
                load_avg,
            ),
            tasks: SortedVec::new(),
        }
    }

    fn transfer_load(&mut self, load: f64, taskc: &mut types::task_ctx, other: &mut Domain) {
        trace!("XFER pid={} dom={}->{}", taskc.pid, self.id, other.id);

        let dom_id: u32 = other.id.try_into().unwrap();
        taskc.target_dom = dom_id;

        self.load.add_load(-load);
        other.load.add_load(load);
    }

    fn xfer_between(&self, other: &Domain) -> f64 {
        self.load.xfer_between(&other.load)
    }
}

impl LoadOrdered for Domain {
    fn get_load(&self) -> OrderedFloat<f64> {
        self.load.load_sum
    }
}
impl_ord_for_type!(Domain);

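/// A NUMA node, tracking its aggregate load and the load-sorted domains it
/// contains. Nodes tolerate a larger relative imbalance (17%) than domains
/// (5%) before migrations are attempted, since cross-node migrations are
/// more expensive.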
#[derive(Debug)]
struct NumaNode {
    id: usize,
    load: LoadEntity,
    domains: SortedVec<Domain>,
}

impl NumaNode {
    const LOAD_IMBAL_HIGH_RATIO: f64 = 0.17;
    const LOAD_IMBAL_XFER_TARGET_RATIO: f64 = 0.50;
    const LOAD_IMBAL_PUSH_MAX_RATIO: f64 = 0.50;

    fn new(id: usize, numa_load_avg: f64) -> Self {
        Self {
            id,
            load: LoadEntity::new(
                NumaNode::LOAD_IMBAL_HIGH_RATIO,
                NumaNode::LOAD_IMBAL_PUSH_MAX_RATIO,
                NumaNode::LOAD_IMBAL_XFER_TARGET_RATIO,
                0.0f64,
                numa_load_avg,
            ),
            domains: SortedVec::new(),
        }
    }

    fn allocate_domain(&mut self, id: usize, load: f64, dom_load_avg: f64) {
        let domain = Domain::new(id, load, dom_load_avg);

        self.insert_domain(domain);
        self.load.rebalance(self.load.load_sum() + load);
    }

    fn xfer_between(&self, other: &NumaNode) -> f64 {
        self.load.xfer_between(&other.load)
    }

    fn insert_domain(&mut self, domain: Domain) {
        self.domains.insert(domain);
    }

    fn update_load(&mut self, delta: f64) {
        self.load.add_load(delta);
    }

    fn stats(&self) -> NodeStats {
        let mut stats = NodeStats::new(
            self.load.load_sum(),
            self.load.imbal(),
            self.load.delta(),
            BTreeMap::new(),
        );
        for dom in self.domains.iter() {
            stats.doms.insert(
                dom.id,
                DomainStats::new(dom.load.load_sum(), dom.load.imbal(), dom.load.delta()),
            );
        }
        stats
    }
}

impl LoadOrdered for NumaNode {
    fn get_load(&self) -> OrderedFloat<f64> {
        self.load.load_sum
    }
}
impl_ord_for_type!(NumaNode);

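/// The main load balancer. On each invocation it rebuilds the NUMA node /
/// domain hierarchy from the loads recorded in BPF, then (optionally) moves
/// load first between NUMA nodes and then between the domains within each
/// node.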
pub struct LoadBalancer<'a, 'b> {
    skel: &'a mut BpfSkel<'b>,
    dom_group: Arc<DomainGroup>,
    skip_kworkers: bool,

    infeas_threshold: f64,

    nodes: SortedVec<NumaNode>,

    lb_apply_weight: bool,
    balance_load: bool,
}

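// The BPF side tracks load in LB_LOAD_BUCKETS weight buckets; the bucket
// math in bucket_range() below assumes the maximum weight divides evenly
// into the number of buckets.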
const_assert_eq!(
    bpf_intf::consts_LB_MAX_WEIGHT % bpf_intf::consts_LB_LOAD_BUCKETS,
    0
);

impl<'a, 'b> LoadBalancer<'a, 'b> {
    pub fn new(
        skel: &'a mut BpfSkel<'b>,
        dom_group: Arc<DomainGroup>,
        skip_kworkers: bool,
        lb_apply_weight: bool,
        balance_load: bool,
    ) -> Self {
        Self {
            skel,
            skip_kworkers,

            infeas_threshold: bpf_intf::consts_LB_MAX_WEIGHT as f64,

            nodes: SortedVec::new(),

            lb_apply_weight,
            balance_load,

            dom_group,
        }
    }

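    /// Entry point: rebuild the node/domain hierarchy from current loads,
    /// then balance if load balancing is enabled.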
    pub fn load_balance(&mut self) -> Result<()> {
        self.create_domain_hierarchy()?;

        if self.balance_load {
            self.perform_balancing()?
        }

        Ok(())
    }

    pub fn get_stats(&self) -> BTreeMap<usize, NodeStats> {
        self.nodes
            .iter()
            .map(|node| (node.id, node.stats()))
            .collect()
    }

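    // Build the NumaNode -> Domain hierarchy from the per-domain load sums,
    // either weighted (duty cycle scaled by task weight) or unweighted (duty
    // cycle only), depending on whether weights are being applied this round.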
    fn create_domain_hierarchy(&mut self) -> Result<()> {
        let ledger = self.calculate_load_avgs()?;

        let (dom_loads, total_load) = if !self.lb_apply_weight {
            (
                ledger
                    .dom_dcycle_sums()
                    .iter()
                    .copied()
                    .map(|d| DEFAULT_WEIGHT * d)
                    .collect(),
                DEFAULT_WEIGHT * ledger.global_dcycle_sum(),
            )
        } else {
            self.infeas_threshold = ledger.effective_max_weight();
            (ledger.dom_load_sums().to_vec(), ledger.global_load_sum())
        };

        let num_numa_nodes = self.dom_group.nr_nodes();
        let numa_load_avg = total_load / num_numa_nodes as f64;

        let mut nodes: Vec<NumaNode> = (0..num_numa_nodes)
            .map(|id| NumaNode::new(id, numa_load_avg))
            .collect();

        let dom_load_avg = total_load / dom_loads.len() as f64;
        for (dom_id, load) in dom_loads.iter().enumerate() {
            let numa_id = self
                .dom_group
                .dom_numa_id(&dom_id)
                .ok_or_else(|| anyhow!("Failed to get NUMA ID for domain {}", dom_id))?;

            if numa_id >= num_numa_nodes {
                bail!("NUMA ID {} exceeds maximum {}", numa_id, num_numa_nodes);
            }

            let node = &mut nodes[numa_id];
            node.allocate_domain(dom_id, *load, dom_load_avg);
        }

        self.nodes = SortedVec::from_unsorted(nodes);

        Ok(())
    }

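    // Read the per-bucket running-average duty cycles out of BPF for every
    // domain and feed them to a LoadAggregator, which produces the
    // per-domain and global load sums consumed above.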
    fn calculate_load_avgs(&mut self) -> Result<LoadLedger> {
        const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
        let now_mono = now_monotonic();
        let load_half_life = self.skel.maps.rodata_data.as_ref().unwrap().load_half_life;

        let mut aggregator =
            LoadAggregator::new(self.dom_group.weight(), !self.lb_apply_weight);

        for (dom_id, dom) in self.dom_group.doms() {
            aggregator.init_domain(*dom_id);

            let dom_ctx = dom.ctx().unwrap();

            for bucket in 0..NUM_BUCKETS {
                let bucket_ctx = &dom_ctx.buckets[bucket as usize];
                let rd = &bucket_ctx.rd;
                let duty_cycle = ravg_read(
                    rd.val,
                    rd.val_at,
                    rd.old,
                    rd.cur,
                    now_mono,
                    load_half_life,
                    RAVG_FRAC_BITS,
                );

                if duty_cycle == 0.0f64 {
                    continue;
                }

                let weight = self.bucket_weight(bucket);
                aggregator.record_dom_load(*dom_id, weight, duty_cycle)?;
            }
        }

        Ok(aggregator.calculate())
    }

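    // Return the inclusive [min, max] weight range covered by a load bucket.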
    fn bucket_range(&self, bucket: u64) -> (f64, f64) {
        const MAX_WEIGHT: u64 = bpf_intf::consts_LB_MAX_WEIGHT as u64;
        const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
        const WEIGHT_PER_BUCKET: u64 = MAX_WEIGHT / NUM_BUCKETS;

        if bucket >= NUM_BUCKETS {
            panic!("Invalid bucket {}, max {}", bucket, NUM_BUCKETS);
        }

        let min_w = 1 + (MAX_WEIGHT * bucket) / NUM_BUCKETS;
        let max_w = min_w + WEIGHT_PER_BUCKET - 1;

        (min_w as f64, max_w as f64)
    }

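    // Use the mid-point of the bucket's weight range as the representative
    // weight for all tasks in the bucket.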
    fn bucket_weight(&self, bucket: u64) -> usize {
        const WEIGHT_PER_BUCKET: f64 = bpf_intf::consts_LB_WEIGHT_PER_BUCKET as f64;
        let (min_weight, _) = self.bucket_range(bucket);

        (min_weight + (WEIGHT_PER_BUCKET / 2.0f64)).ceil() as usize
    }

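    // Drain the domain's active_tasks ring buffer from BPF and build the
    // load-sorted task list used to pick migration candidates. Each task's
    // load is its duty cycle scaled by its (possibly clamped) weight.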
    fn populate_tasks_by_load(&mut self, dom: &mut Domain) -> Result<()> {
        if dom.queried_tasks {
            return Ok(());
        }
        dom.queried_tasks = true;

        const MAX_TPTRS: u64 = bpf_intf::consts_MAX_DOM_ACTIVE_TPTRS as u64;
        let dom_ctx = unsafe { &mut *self.skel.maps.bss_data.as_mut().unwrap().dom_ctxs[dom.id] };
        let active_tasks = &mut dom_ctx.active_tasks;

        let (mut ridx, widx) = (active_tasks.read_idx, active_tasks.write_idx);
        active_tasks.read_idx = active_tasks.write_idx;
        active_tasks.genn += 1;

        if widx - ridx > MAX_TPTRS {
            ridx = widx - MAX_TPTRS;
        }

        let load_half_life = self.skel.maps.rodata_data.as_ref().unwrap().load_half_life;
        let now_mono = now_monotonic();

        for idx in ridx..widx {
            let taskc_p = active_tasks.tasks[(idx % MAX_TPTRS) as usize];
            let taskc = unsafe { &mut *taskc_p };

            if taskc.target_dom as usize != dom.id {
                continue;
            }

            let rd = &taskc.dcyc_rd;
            let mut load = ravg_read(
                rd.val,
                rd.val_at,
                rd.old,
                rd.cur,
                now_mono,
                load_half_life,
                RAVG_FRAC_BITS,
            );

            let weight = if self.lb_apply_weight {
                (taskc.weight as f64).min(self.infeas_threshold)
            } else {
                DEFAULT_WEIGHT
            };
            load *= weight;

            dom.tasks.insert(TaskInfo {
                taskc_p,
                load: OrderedFloat(load),
                dom_mask: taskc.dom_mask,
                preferred_dom_mask: taskc.preferred_dom_mask,
                migrated: Cell::new(false),
                is_kworker: unsafe { taskc.is_kworker.assume_init() },
            });
        }

        Ok(())
    }

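    // The candidate iterators passed in below are already ordered from most
    // to least preferable, so the first task yielded is the best candidate.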
    fn find_first_candidate<'d, I>(tasks_by_load: I) -> Option<&'d TaskInfo>
    where
        I: IntoIterator<Item = &'d TaskInfo>,
    {
        tasks_by_load.into_iter().next()
    }

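    // Try to find and move one task from push_dom to pull_dom such that the
    // combined imbalance of the two domains shrinks. Returns the amount of
    // load moved, or None if no single task movement would improve things.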
    fn try_find_move_task(
        &mut self,
        (push_dom, to_push): (&mut Domain, f64),
        (pull_dom, to_pull): (&mut Domain, f64),
        task_filter: impl Fn(&TaskInfo, u32) -> bool,
        to_xfer: f64,
    ) -> Result<Option<f64>> {
        let to_pull = to_pull.abs();
        let calc_new_imbal = |xfer: f64| (to_push - xfer).abs() + (to_pull - xfer).abs();

        self.populate_tasks_by_load(push_dom)?;

        let pull_dom_id: u32 = pull_dom.id.try_into().unwrap();
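        // Consider only tasks that are allowed on the pull domain, that
        // haven't already been migrated this round, and that aren't kworkers
        // when kworkers are being skipped. The tasks are taken out of the
        // SortedVec so they can be filtered and borrowed freely; every exit
        // path below swaps the filtered set back into push_dom.tasks.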
        let tasks: Vec<TaskInfo> = std::mem::take(&mut push_dom.tasks)
            .into_vec()
            .into_iter()
            .filter(|task| {
                task.dom_mask & (1 << pull_dom_id) != 0
                    && !(self.skip_kworkers && task.is_kworker)
                    && !task.migrated.get()
            })
            .collect();

        let (task, new_imbal) = match (
            Self::find_first_candidate(
                tasks
                    .as_slice()
                    .iter()
                    .filter(|x| x.load <= OrderedFloat(to_xfer) && task_filter(x, pull_dom_id))
                    .rev(),
            ),
            Self::find_first_candidate(
                tasks
                    .as_slice()
                    .iter()
                    .filter(|x| x.load >= OrderedFloat(to_xfer) && task_filter(x, pull_dom_id)),
            ),
        ) {
            (None, None) => {
                std::mem::swap(&mut push_dom.tasks, &mut SortedVec::from_unsorted(tasks));
                return Ok(None);
            }
            (Some(task), None) | (None, Some(task)) => (task, calc_new_imbal(*task.load)),
            (Some(task0), Some(task1)) => {
                let (new_imbal0, new_imbal1) =
                    (calc_new_imbal(*task0.load), calc_new_imbal(*task1.load));
                if new_imbal0 <= new_imbal1 {
                    (task0, new_imbal0)
                } else {
                    (task1, new_imbal1)
                }
            }
        };

        let old_imbal = to_push + to_pull;
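        // Migrating the task must actually reduce the combined imbalance of
        // the two domains; otherwise leave things as they are.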
        if old_imbal < new_imbal {
            std::mem::swap(&mut push_dom.tasks, &mut SortedVec::from_unsorted(tasks));
            return Ok(None);
        }

        let load = *(task.load);
        let taskc_p = task.taskc_p;
        task.migrated.set(true);
        std::mem::swap(&mut push_dom.tasks, &mut SortedVec::from_unsorted(tasks));

        push_dom.transfer_load(load, unsafe { &mut *taskc_p }, pull_dom);
        Ok(Some(load))
    }

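    // Move load between two NUMA nodes by walking the most-loaded domains of
    // the push node against the least-loaded domains of the pull node until
    // one transfer succeeds. Returns the amount of load transferred.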
    fn transfer_between_nodes(
        &mut self,
        push_node: &mut NumaNode,
        pull_node: &mut NumaNode,
    ) -> Result<f64> {
        debug!("Inter node {} -> {} started", push_node.id, pull_node.id);

        let push_imbal = push_node.load.imbal();
        let pull_imbal = pull_node.load.imbal();
        let xfer = push_node.xfer_between(pull_node);

        if push_imbal <= 0.0f64 || pull_imbal >= 0.0f64 {
            bail!(
                "push node {}:{}, pull node {}:{}",
                push_node.id,
                push_imbal,
                pull_node.id,
                pull_imbal
            );
        }
        let mut pushers = VecDeque::with_capacity(push_node.domains.len());
        let mut pullers = Vec::with_capacity(pull_node.domains.len());
        let mut pushed = 0f64;

        while push_node.domains.len() > 0 {
            let mut push_dom = push_node.domains.pop().unwrap();
            if push_dom.load.state() != BalanceState::NeedsPush {
                push_node.domains.insert(push_dom);
                break;
            }

            while pull_node.domains.len() > 0 {
                let mut pull_dom = pull_node.domains.remove_index(0);
                if pull_dom.load.state() != BalanceState::NeedsPull {
                    pull_node.domains.insert(pull_dom);
                    break;
                }
                let mut transferred = self.try_find_move_task(
                    (&mut push_dom, push_imbal),
                    (&mut pull_dom, pull_imbal),
                    |task: &TaskInfo, pull_dom: u32| -> bool {
                        (task.preferred_dom_mask & (1 << pull_dom)) > 0
                    },
                    xfer,
                )?;
                if transferred.is_none() {
                    transferred = self.try_find_move_task(
                        (&mut push_dom, push_imbal),
                        (&mut pull_dom, pull_imbal),
                        |_task: &TaskInfo, _pull_dom: u32| -> bool { true },
                        xfer,
                    )?;
                }

                pullers.push(pull_dom);
                if let Some(transferred) = transferred {
                    pushed = transferred;
                    push_node.update_load(-transferred);
                    pull_node.update_load(transferred);
                    break;
                }
            }
            while let Some(puller) = pullers.pop() {
                pull_node.domains.insert(puller);
            }
            pushers.push_back(push_dom);
            if pushed > 0.0f64 {
                break;
            }
        }
        while let Some(pusher) = pushers.pop_front() {
            push_node.domains.insert(pusher);
        }

        Ok(pushed)
    }

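    // Balance load across NUMA nodes, from nodes that need to push load to
    // nodes that need to pull it.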
    fn balance_between_nodes(&mut self) -> Result<()> {
        if self.nodes.len() < 2 {
            return Ok(());
        }

        debug!("Node <-> Node LB started");

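        // Greedily pair the most-loaded node (taken from the back of the
        // sorted list) with the least-loaded nodes (taken from the front),
        // transferring load until the pusher reaches its push cutoff or no
        // eligible pullers remain. Nodes visited in a pass are parked in
        // pushers/pullers and re-inserted afterwards so the SortedVec is
        // re-sorted by their updated loads. Note this is a greedy
        // approximation, not a globally optimal assignment of load.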
        let mut pushers = VecDeque::with_capacity(self.nodes.len());
        let mut pullers = Vec::with_capacity(self.nodes.len());

        while self.nodes.len() >= 2 {
            let mut push_node = self.nodes.pop().unwrap();
            if push_node.load.state() != BalanceState::NeedsPush {
                self.nodes.insert(push_node);
                break;
            }

            let push_cutoff = push_node.load.push_cutoff();
            let mut pushed = 0f64;
            while self.nodes.len() > 0 && pushed < push_cutoff {
                let mut pull_node = self.nodes.remove_index(0);
                let pull_id = pull_node.id;
                if pull_node.load.state() != BalanceState::NeedsPull {
                    self.nodes.insert(pull_node);
                    break;
                }
                let migrated = self.transfer_between_nodes(&mut push_node, &mut pull_node)?;
                pullers.push(pull_node);
                if migrated > 0.0f64 {
                    pushed += migrated;
                    debug!(
                        "NODE {} sending {:.06} --> NODE {}",
                        push_node.id, migrated, pull_id
                    );
                }
            }
            while let Some(puller) = pullers.pop() {
                self.nodes.insert(puller);
            }

            if pushed > 0.0f64 {
                debug!("NODE {} pushed {:.06} total load", push_node.id, pushed);
            }
            pushers.push_back(push_node);
        }

        while let Some(pusher) = pushers.pop_front() {
            self.nodes.insert(pusher);
        }

        Ok(())
    }

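    // Balance load between the domains of a single NUMA node, mirroring the
    // inter-node logic: most-loaded domains push to least-loaded ones until
    // each pusher reaches its cutoff or runs out of eligible pullers.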
    fn balance_within_node(&mut self, node: &mut NumaNode) -> Result<()> {
        if node.domains.len() < 2 {
            return Ok(());
        }

        debug!("Intra node {} LB started", node.id);

        let mut pushers = VecDeque::with_capacity(node.domains.len());
        let mut pullers = Vec::new();

        while node.domains.len() >= 2 {
            let mut push_dom = node.domains.pop().unwrap();
            if node.domains.len() == 0 || push_dom.load.state() != BalanceState::NeedsPush {
                node.domains.insert(push_dom);
                break;
            }

            let mut pushed = 0.0f64;
            let push_cutoff = push_dom.load.push_cutoff();
            let push_imbal = push_dom.load.imbal();
            if push_imbal < 0.0f64 {
                bail!(
                    "Node {} push dom {} had imbal {}",
                    node.id,
                    push_dom.id,
                    push_imbal
                );
            }

            while node.domains.len() > 0 && pushed < push_cutoff {
                let mut pull_dom = node.domains.remove_index(0);
                if pull_dom.load.state() != BalanceState::NeedsPull {
                    node.domains.insert(pull_dom);
                    break;
                }
                let pull_imbal = pull_dom.load.imbal();
                if pull_imbal >= 0.0f64 {
                    bail!(
                        "Node {} pull dom {} had imbal {}",
                        node.id,
                        pull_dom.id,
                        pull_imbal
                    );
                }
                let xfer = push_dom.xfer_between(&pull_dom);
                let mut transferred = self.try_find_move_task(
                    (&mut push_dom, push_imbal),
                    (&mut pull_dom, pull_imbal),
                    |task: &TaskInfo, pull_dom: u32| -> bool {
                        (task.preferred_dom_mask & (1 << pull_dom)) > 0
                    },
                    xfer,
                )?;
                if transferred.is_none() {
                    transferred = self.try_find_move_task(
                        (&mut push_dom, push_imbal),
                        (&mut pull_dom, pull_imbal),
                        |_task: &TaskInfo, _pull_dom: u32| -> bool { true },
                        xfer,
                    )?;
                }

                if let Some(transferred) = transferred {
                    if transferred <= 0.0f64 {
                        bail!("Expected nonzero load transfer")
                    }
                    pushed += transferred;
                    node.domains.insert(pull_dom);
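                    // Re-insert the pull domain so it is re-sorted according
                    // to its new load, and keep pushing from this domain
                    // until we hit the cutoff or run out of candidates.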
                    continue;
                }

                pullers.push(pull_dom);
            }
            while let Some(puller) = pullers.pop() {
                node.domains.insert(puller);
            }

            if pushed > 0.0f64 {
                debug!("DOM {} pushed {:.06} total load", push_dom.id, pushed);
            }
            pushers.push_back(push_dom);
        }
        while let Some(pusher) = pushers.pop_front() {
            node.domains.insert(pusher);
        }

        Ok(())
    }

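    // Top level of the balancing pass: first balance across NUMA nodes if
    // there is more than one, then balance the domains within each node.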
    fn perform_balancing(&mut self) -> Result<()> {
        if self.dom_group.nr_nodes() > 1 {
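            // Start by moving load between the NUMA nodes; the per-node
            // pass below then spreads it between each node's domains.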
            self.balance_between_nodes()?;
        }

        debug!("Intra node LBs started");

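        // Pull the nodes out of self so that each node can be mutably
        // borrowed while still invoking methods on self, then rebuild the
        // sorted list once every node has been balanced.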
        let mut nodes = std::mem::take(&mut self.nodes).into_vec();
        for node in nodes.iter_mut() {
            self.balance_within_node(node)?;
        }
        std::mem::swap(&mut self.nodes, &mut SortedVec::from_unsorted(nodes));

        Ok(())
    }
}