
Commit 4934d7e

refactor(resource): split Refresh into focused functions with parallel execution
This change breaks down the Refresh method into smaller functions following the Single Responsibility Principle:

- refreshProcesses: categorizes processes during the initial scan
- refreshContainers/refreshVMs: build workload-specific collections
- refreshPods: handles container-to-pod mapping (depends on containers)
- refreshNode: calculates node-level aggregations

It also introduces parallel execution using goroutines for the independent workloads (containers+pods, VMs, and node calculations) while maintaining proper dependency ordering, and adds test coverage for concurrency, previously uncovered paths, and error-handling scenarios.

Signed-off-by: Sunil Thaha <sthaha@redhat.com>
1 parent bec7b86 commit 4934d7e
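
For orientation, here is a minimal, self-contained sketch of the fan-out/fan-in pattern this commit introduces: the process scan runs first, then the independent workloads run concurrently and their errors are joined. The stub functions below are illustrative stand-ins, not the actual informer code; see the diff to internal/resource/informer.go below for the real implementation.

package main

import (
    "errors"
    "fmt"
    "sync"
)

// Illustrative stand-ins for the informer's per-workload refresh helpers.
func refreshProcesses() (containerProcs, vmProcs []int, err error) { return []int{1}, []int{2}, nil }
func refreshContainers(procs []int) error                          { return nil }
func refreshPods() error                                           { return nil }
func refreshVMs(procs []int) error                                 { return nil }
func refreshNode() error                                           { return nil }

func refresh() error {
    // Stage 1: the process scan feeds everything else, so it runs first.
    containerProcs, vmProcs, scanErr := refreshProcesses()

    // Stage 2: containers+pods, VMs, and node touch disjoint state, so they run
    // concurrently; pods depend on containers and stay on the same goroutine.
    var wg sync.WaitGroup
    var cntrErr, podErr, vmErr, nodeErr error
    wg.Add(3)
    go func() { defer wg.Done(); cntrErr = refreshContainers(containerProcs); podErr = refreshPods() }()
    go func() { defer wg.Done(); vmErr = refreshVMs(vmProcs) }()
    go func() { defer wg.Done(); nodeErr = refreshNode() }()
    wg.Wait()

    // Fan-in: all errors are aggregated rather than failing fast.
    return errors.Join(scanErr, cntrErr, podErr, vmErr, nodeErr)
}

func main() { fmt.Println(refresh()) }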

4 files changed, +502 -88 lines changed

internal/resource/informer.go

Lines changed: 169 additions & 86 deletions
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"log/slog"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/sustainable-computing-io/kepler/internal/k8s/pod"
@@ -162,19 +163,19 @@ func (ri *resourceInformer) Init() error {
 	return nil
 }
 
-func (ri *resourceInformer) Refresh() error {
-	started := ri.clock.Now()
-
+// refreshProcesses refreshes the process cache and returns the procs for containers and VMs
+func (ri *resourceInformer) refreshProcesses() ([]*Process, []*Process, error) {
 	procs, err := ri.fs.AllProcs()
 	if err != nil {
-		return fmt.Errorf("failed to get processes: %w", err)
+		return nil, nil, fmt.Errorf("failed to get processes: %w", err)
 	}
 
-	// construct current running processes and containers
+	// construct current running processes
 	procsRunning := make(map[int]*Process, len(procs))
-	containersRunning := make(map[string]*Container)
-	vmsRunning := make(map[string]*VirtualMachine)
-	podsRunning := make(map[string]*Pod)
+
+	// collect categorized processes during iteration
+	containerProcs := make([]*Process, 0)
+	vmProcs := make([]*Process, 0)
 
 	// Refresh process cache and update running processes
 	var refreshErrs error
@@ -194,133 +195,215 @@ func (ri *resourceInformer) Refresh() error {
 		}
 		procsRunning[pid] = proc
 
+		// categorize processes during iteration
 		switch proc.Type {
 		case ContainerProcess:
-			c := proc.Container
-			_, seen := containersRunning[c.ID]
-			// reset CPU Time of the container if it is getting added to the running list for the first time
-			// in the subsequent iteration, the CPUTimeDelta should be incremented by process's CPUTimeDelta
-			resetCPUTime := !seen
-			containersRunning[c.ID] = ri.updateContainerCache(proc, resetCPUTime)
-
+			containerProcs = append(containerProcs, proc)
 		case VMProcess:
-			vm := proc.VirtualMachine
-			vmsRunning[vm.ID] = ri.updateVMCache(proc)
-		}
-
-	}
-
-	containersNoPod := []string{}
-	if ri.podInformer != nil {
-		for _, container := range containersRunning {
-			cntrInfo, found, err := ri.podInformer.LookupByContainerID(container.ID)
-			if err != nil {
-				ri.logger.Debug("Failed to get pod for container", "container", container.ID, "error", err)
-				refreshErrs = errors.Join(refreshErrs, fmt.Errorf("failed to get pod for container: %w", err))
-				continue
-			}
-
-			if !found {
-				containersNoPod = append(containersNoPod, container.ID)
-				continue
-			}
-
-			pod := &Pod{
-				ID:        cntrInfo.PodID,
-				Name:      cntrInfo.PodName,
-				Namespace: cntrInfo.Namespace,
-			}
-			container.Pod = pod
-			container.Name = cntrInfo.ContainerName
-
-			_, seen := podsRunning[pod.ID]
-			// reset CPU Time of the pod if it is getting added to the running list for the first time
-			// in the subsequent iteration, the CPUTimeDelta should be incremented by container's CPUTimeDelta
-			resetCPUTime := !seen
-			podsRunning[pod.ID] = ri.updatePodCache(container, resetCPUTime)
+			vmProcs = append(vmProcs, proc)
 		}
 	}
 
 	// Find terminated processes
-	procCPUDeltaTotal := float64(0)
 	procsTerminated := make(map[int]*Process)
 	for pid, proc := range ri.procCache {
-		if _, isRunning := procsRunning[pid]; isRunning {
-			procCPUDeltaTotal += proc.CPUTimeDelta
-			continue
+		if _, isRunning := procsRunning[pid]; !isRunning {
+			procsTerminated[pid] = proc
+			delete(ri.procCache, pid)
 		}
-		procsTerminated[pid] = proc
-		delete(ri.procCache, pid)
 	}
 
-	// Node
-	usage, err := ri.fs.CPUUsageRatio()
-	if err != nil {
-		return fmt.Errorf("failed to get procfs usage: %w", err)
-	}
-	ri.node.ProcessTotalCPUTimeDelta = procCPUDeltaTotal
-	ri.node.CPUUsageRatio = usage
-
 	// Update tracking structures
 	ri.processes.Running = procsRunning
 	ri.processes.Terminated = procsTerminated
 
+	return containerProcs, vmProcs, refreshErrs
+}
+
+func (ri *resourceInformer) refreshContainers(containerProcs []*Process) error {
+	containersRunning := make(map[string]*Container)
+
+	// Build running containers from pre-categorized container processes
+	for _, proc := range containerProcs {
+		c := proc.Container
+		_, seen := containersRunning[c.ID]
+		// reset CPU Time of the container if it is getting added to the running list for the first time
+		// in the subsequent iteration, the CPUTimeDelta should be incremented by process's CPUTimeDelta
+		resetCPUTime := !seen
+		containersRunning[c.ID] = ri.updateContainerCache(proc, resetCPUTime)
+	}
+
 	// Find terminated containers
-	totalContainerDelta := float64(0)
 	containersTerminated := make(map[string]*Container)
 	for id, container := range ri.containerCache {
-		if _, isRunning := containersRunning[id]; isRunning {
-			totalContainerDelta += container.CPUTimeDelta
-			continue
+		if _, isRunning := containersRunning[id]; !isRunning {
+			containersTerminated[id] = container
+			delete(ri.containerCache, id)
 		}
-		containersTerminated[id] = container
-		delete(ri.containerCache, id)
 	}
 
 	ri.containers.Running = containersRunning
 	ri.containers.Terminated = containersTerminated
 
+	return nil
+}
+
+func (ri *resourceInformer) refreshVMs(vmProcs []*Process) error {
+	vmsRunning := make(map[string]*VirtualMachine)
+
+	// Build running VMs from pre-categorized VM processes
+	for _, proc := range vmProcs {
+		vm := proc.VirtualMachine
+		vmsRunning[vm.ID] = ri.updateVMCache(proc)
+	}
+
 	// Find terminated VMs
 	vmsTerminated := make(map[string]*VirtualMachine)
 	for id, vm := range ri.vmCache {
-		if _, isRunning := vmsRunning[id]; isRunning {
-			continue
+		if _, isRunning := vmsRunning[id]; !isRunning {
+			vmsTerminated[id] = vm
+			delete(ri.vmCache, id)
 		}
-		vmsTerminated[id] = vm
-		delete(ri.vmCache, id)
 	}
 
 	ri.vms.Running = vmsRunning
 	ri.vms.Terminated = vmsTerminated
 
+	return nil
+}
+
+func (ri *resourceInformer) refreshPods() error {
+	if ri.podInformer == nil {
+		return nil
+	}
+
+	podsRunning := make(map[string]*Pod)
+	containersNoPod := []string{}
+	var refreshErrs error
+
+	for _, container := range ri.containers.Running {
+		cntrInfo, found, err := ri.podInformer.LookupByContainerID(container.ID)
+		if err != nil {
+			ri.logger.Debug("Failed to get pod for container", "container", container.ID, "error", err)
+			refreshErrs = errors.Join(refreshErrs, fmt.Errorf("failed to get pod for container: %w", err))
+			continue
+		}
+
+		if !found {
+			containersNoPod = append(containersNoPod, container.ID)
+			continue
+		}
+
+		pod := &Pod{
+			ID:        cntrInfo.PodID,
+			Name:      cntrInfo.PodName,
+			Namespace: cntrInfo.Namespace,
+		}
+		container.Pod = pod
+		container.Name = cntrInfo.ContainerName
+
+		_, seen := podsRunning[pod.ID]
+		// reset CPU Time of the pod if it is getting added to the running list for the first time
+		// in the subsequent iteration, the CPUTimeDelta should be incremented by container's CPUTimeDelta
+		resetCPUTime := !seen
+		podsRunning[pod.ID] = ri.updatePodCache(container, resetCPUTime)
+	}
+
 	// Find terminated pods
 	podsTerminated := make(map[string]*Pod)
 	for id, pod := range ri.podCache {
-		if _, isRunning := podsRunning[id]; isRunning {
-			continue
+		if _, isRunning := podsRunning[id]; !isRunning {
+			podsTerminated[id] = pod
+			delete(ri.podCache, id)
 		}
-		podsTerminated[id] = pod
-		delete(ri.podCache, id)
 	}
 
 	ri.pods.Running = podsRunning
 	ri.pods.Terminated = podsTerminated
 	ri.pods.ContainersNoPod = containersNoPod
 
+	return refreshErrs
+}
+
+func (ri *resourceInformer) refreshNode() error {
+	// Calculate total CPU delta from all running processes
+	procCPUDeltaTotal := float64(0)
+	for _, proc := range ri.processes.Running {
+		procCPUDeltaTotal += proc.CPUTimeDelta
+	}
+
+	// Get current CPU usage ratio
+	usage, err := ri.fs.CPUUsageRatio()
+	if err != nil {
+		return fmt.Errorf("failed to get procfs usage: %w", err)
+	}
+
+	ri.node.ProcessTotalCPUTimeDelta = procCPUDeltaTotal
+	ri.node.CPUUsageRatio = usage
+
+	return nil
+}
+
+// Refresh updates the internal state by scanning processes, containers, VMs, and pods.
+// This method is NOT thread-safe and should not be called concurrently.
+func (ri *resourceInformer) Refresh() error {
+	started := ri.clock.Now()
+
+	// Refresh workloads in dependency order:
+	// processes -> {
+	//    -> containers -> pod
+	//    -> VMs
+	//    -> node
+	// }
+	var refreshErrs error
+
+	containerProcs, vmProcs, err := ri.refreshProcesses()
+	if err != nil {
+		refreshErrs = errors.Join(refreshErrs, err)
+	}
+
+	// refresh containers and VMs in parallel
+	// Note: No locking needed on ri fields since refreshContainers() and refreshVMs()
+	// operate on completely disjoint data structures (containers vs VMs)
+	wg := sync.WaitGroup{}
+	wg.Add(3)
+
+	var cntrErrs, podErrs, vmErrs, nodeErrs error
+	go func() {
+		defer wg.Done()
+		cntrErrs = ri.refreshContainers(containerProcs)
+		podErrs = ri.refreshPods()
+	}()
+
+	go func() {
+		defer wg.Done()
+		vmErrs = ri.refreshVMs(vmProcs)
+	}()
+
+	go func() {
+		defer wg.Done()
+		nodeErrs = ri.refreshNode()
+	}()
+
+	wg.Wait()
+
+	refreshErrs = errors.Join(refreshErrs, cntrErrs, podErrs, vmErrs, nodeErrs)
+
+	// Update timing
 	now := ri.clock.Now()
 	ri.lastScanTime = now
 	duration := now.Sub(started)
 
 	ri.logger.Debug("Resource information collected",
-		"process.running", len(procsRunning),
-		"process.terminated", len(procsTerminated),
-		"container.running", len(containersRunning),
-		"container.terminated", len(containersTerminated),
-		"vm.running", len(vmsRunning),
-		"vm.terminated", len(vmsTerminated),
-		"pod.running", len(podsRunning),
-		"pod.terminated", len(podsTerminated),
-		"container.no-pod", len(containersNoPod),
+		"process.running", len(ri.processes.Running),
+		"process.terminated", len(ri.processes.Terminated),
+		"container.running", len(ri.containers.Running),
+		"container.terminated", len(ri.containers.Terminated),
+		"vm.running", len(ri.vms.Running),
+		"vm.terminated", len(ri.vms.Terminated),
+		"pod.running", len(ri.pods.Running),
+		"pod.terminated", len(ri.pods.Terminated),
+		"container.no-pod", len(ri.pods.ContainersNoPod),
 		"duration", duration)
 
 	return refreshErrs
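
Since the new doc comment states that Refresh is not thread-safe, a caller is expected to drive it from a single goroutine. The loop below is a hedged usage sketch assuming it lives in the same package (which already imports time and log/slog); the runRefreshLoop name, interval, stop channel, and logging are illustrative, not part of this commit.

func runRefreshLoop(ri *resourceInformer, interval time.Duration, stop <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-stop:
            return
        case <-ticker.C:
            // Refresh joins per-workload errors; log and continue with the next tick.
            if err := ri.Refresh(); err != nil {
                slog.Error("resource refresh failed", "error", err)
            }
        }
    }
}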

internal/resource/pod_test.go

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+// SPDX-FileCopyrightText: 2025 The Kepler Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package resource
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestPodClone(t *testing.T) {
+	t.Run("Clone full Pod with all fields", func(t *testing.T) {
+		original := &Pod{
+			ID:           "pod-123",
+			Name:         "test-pod",
+			Namespace:    "default",
+			CPUTotalTime: 42.5,
+			CPUTimeDelta: 10.2,
+		}
+
+		clone := original.Clone()
+		require.NotNil(t, clone)
+		assert.Equal(t, original.ID, clone.ID)
+		assert.Equal(t, original.Name, clone.Name)
+		assert.Equal(t, original.Namespace, clone.Namespace)
+		// CPU times should not be copied in Clone
+		assert.Equal(t, float64(0), clone.CPUTotalTime)
+		assert.Equal(t, float64(0), clone.CPUTimeDelta)
+
+		// Verify they are separate objects
+		assert.NotSame(t, original, clone)
+	})
+
+	t.Run("Clone nil Pod", func(t *testing.T) {
+		var nilPod *Pod
+		nilClone := nilPod.Clone()
+		assert.Nil(t, nilClone, "Cloning nil Pod should return nil")
+	})
+}
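
For context, a minimal sketch of a Clone implementation that would satisfy the assertions above: identity fields are copied, the CPU accounting fields start at zero, and a nil receiver yields nil. The actual method in the resource package may differ.

func (p *Pod) Clone() *Pod {
    if p == nil {
        return nil
    }
    return &Pod{
        ID:        p.ID,
        Name:      p.Name,
        Namespace: p.Namespace,
        // CPUTotalTime and CPUTimeDelta are intentionally left at their zero values.
    }
}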
