8
8
"fmt"
9
9
"log/slog"
10
10
"os"
11
+ "sync"
11
12
"time"
12
13
13
14
"github.com/sustainable-computing-io/kepler/internal/k8s/pod"
@@ -162,19 +163,19 @@ func (ri *resourceInformer) Init() error {
162
163
return nil
163
164
}
164
165
165
- func (ri * resourceInformer ) Refresh () error {
166
- started := ri .clock .Now ()
167
-
166
+ // refreshProcesses refreshes the process cache and returns the procs for containers and VMs
167
+ func (ri * resourceInformer ) refreshProcesses () ([]* Process , []* Process , error ) {
168
168
procs , err := ri .fs .AllProcs ()
169
169
if err != nil {
170
- return fmt .Errorf ("failed to get processes: %w" , err )
170
+ return nil , nil , fmt .Errorf ("failed to get processes: %w" , err )
171
171
}
172
172
173
- // construct current running processes and containers
173
+ // construct current running processes
174
174
procsRunning := make (map [int ]* Process , len (procs ))
175
- containersRunning := make (map [string ]* Container )
176
- vmsRunning := make (map [string ]* VirtualMachine )
177
- podsRunning := make (map [string ]* Pod )
175
+
176
+ // collect categorized processes during iteration
177
+ containerProcs := make ([]* Process , 0 )
178
+ vmProcs := make ([]* Process , 0 )
178
179
179
180
// Refresh process cache and update running processes
180
181
var refreshErrs error
@@ -194,133 +195,215 @@ func (ri *resourceInformer) Refresh() error {
194
195
}
195
196
procsRunning [pid ] = proc
196
197
198
+ // categorize processes during iteration
197
199
switch proc .Type {
198
200
case ContainerProcess :
199
- c := proc .Container
200
- _ , seen := containersRunning [c .ID ]
201
- // reset CPU Time of the container if it is getting added to the running list for the first time
202
- // in the subsequent iteration, the CPUTimeDelta should be incremented by process's CPUTimeDelta
203
- resetCPUTime := ! seen
204
- containersRunning [c .ID ] = ri .updateContainerCache (proc , resetCPUTime )
205
-
201
+ containerProcs = append (containerProcs , proc )
206
202
case VMProcess :
207
- vm := proc .VirtualMachine
208
- vmsRunning [vm .ID ] = ri .updateVMCache (proc )
209
- }
210
-
211
- }
212
-
213
- containersNoPod := []string {}
214
- if ri .podInformer != nil {
215
- for _ , container := range containersRunning {
216
- cntrInfo , found , err := ri .podInformer .LookupByContainerID (container .ID )
217
- if err != nil {
218
- ri .logger .Debug ("Failed to get pod for container" , "container" , container .ID , "error" , err )
219
- refreshErrs = errors .Join (refreshErrs , fmt .Errorf ("failed to get pod for container: %w" , err ))
220
- continue
221
- }
222
-
223
- if ! found {
224
- containersNoPod = append (containersNoPod , container .ID )
225
- continue
226
- }
227
-
228
- pod := & Pod {
229
- ID : cntrInfo .PodID ,
230
- Name : cntrInfo .PodName ,
231
- Namespace : cntrInfo .Namespace ,
232
- }
233
- container .Pod = pod
234
- container .Name = cntrInfo .ContainerName
235
-
236
- _ , seen := podsRunning [pod .ID ]
237
- // reset CPU Time of the pod if it is getting added to the running list for the first time
238
- // in the subsequent iteration, the CPUTimeDelta should be incremented by container's CPUTimeDelta
239
- resetCPUTime := ! seen
240
- podsRunning [pod .ID ] = ri .updatePodCache (container , resetCPUTime )
203
+ vmProcs = append (vmProcs , proc )
241
204
}
242
205
}
243
206
244
207
// Find terminated processes
245
- procCPUDeltaTotal := float64 (0 )
246
208
procsTerminated := make (map [int ]* Process )
247
209
for pid , proc := range ri .procCache {
248
- if _ , isRunning := procsRunning [pid ]; isRunning {
249
- procCPUDeltaTotal + = proc . CPUTimeDelta
250
- continue
210
+ if _ , isRunning := procsRunning [pid ]; ! isRunning {
211
+ procsTerminated [ pid ] = proc
212
+ delete ( ri . procCache , pid )
251
213
}
252
- procsTerminated [pid ] = proc
253
- delete (ri .procCache , pid )
254
214
}
255
215
256
- // Node
257
- usage , err := ri .fs .CPUUsageRatio ()
258
- if err != nil {
259
- return fmt .Errorf ("failed to get procfs usage: %w" , err )
260
- }
261
- ri .node .ProcessTotalCPUTimeDelta = procCPUDeltaTotal
262
- ri .node .CPUUsageRatio = usage
263
-
264
216
// Update tracking structures
265
217
ri .processes .Running = procsRunning
266
218
ri .processes .Terminated = procsTerminated
267
219
220
+ return containerProcs , vmProcs , refreshErrs
221
+ }
222
+
223
+ func (ri * resourceInformer ) refreshContainers (containerProcs []* Process ) error {
224
+ containersRunning := make (map [string ]* Container )
225
+
226
+ // Build running containers from pre-categorized container processes
227
+ for _ , proc := range containerProcs {
228
+ c := proc .Container
229
+ _ , seen := containersRunning [c .ID ]
230
+ // reset CPU Time of the container if it is getting added to the running list for the first time
231
+ // in the subsequent iteration, the CPUTimeDelta should be incremented by process's CPUTimeDelta
232
+ resetCPUTime := ! seen
233
+ containersRunning [c .ID ] = ri .updateContainerCache (proc , resetCPUTime )
234
+ }
235
+
268
236
// Find terminated containers
269
- totalContainerDelta := float64 (0 )
270
237
containersTerminated := make (map [string ]* Container )
271
238
for id , container := range ri .containerCache {
272
- if _ , isRunning := containersRunning [id ]; isRunning {
273
- totalContainerDelta + = container . CPUTimeDelta
274
- continue
239
+ if _ , isRunning := containersRunning [id ]; ! isRunning {
240
+ containersTerminated [ id ] = container
241
+ delete ( ri . containerCache , id )
275
242
}
276
- containersTerminated [id ] = container
277
- delete (ri .containerCache , id )
278
243
}
279
244
280
245
ri .containers .Running = containersRunning
281
246
ri .containers .Terminated = containersTerminated
282
247
248
+ return nil
249
+ }
250
+
251
+ func (ri * resourceInformer ) refreshVMs (vmProcs []* Process ) error {
252
+ vmsRunning := make (map [string ]* VirtualMachine )
253
+
254
+ // Build running VMs from pre-categorized VM processes
255
+ for _ , proc := range vmProcs {
256
+ vm := proc .VirtualMachine
257
+ vmsRunning [vm .ID ] = ri .updateVMCache (proc )
258
+ }
259
+
283
260
// Find terminated VMs
284
261
vmsTerminated := make (map [string ]* VirtualMachine )
285
262
for id , vm := range ri .vmCache {
286
- if _ , isRunning := vmsRunning [id ]; isRunning {
287
- continue
263
+ if _ , isRunning := vmsRunning [id ]; ! isRunning {
264
+ vmsTerminated [id ] = vm
265
+ delete (ri .vmCache , id )
288
266
}
289
- vmsTerminated [id ] = vm
290
- delete (ri .vmCache , id )
291
267
}
292
268
293
269
ri .vms .Running = vmsRunning
294
270
ri .vms .Terminated = vmsTerminated
295
271
272
+ return nil
273
+ }
274
+
275
+ func (ri * resourceInformer ) refreshPods () error {
276
+ if ri .podInformer == nil {
277
+ return nil
278
+ }
279
+
280
+ podsRunning := make (map [string ]* Pod )
281
+ containersNoPod := []string {}
282
+ var refreshErrs error
283
+
284
+ for _ , container := range ri .containers .Running {
285
+ cntrInfo , found , err := ri .podInformer .LookupByContainerID (container .ID )
286
+ if err != nil {
287
+ ri .logger .Debug ("Failed to get pod for container" , "container" , container .ID , "error" , err )
288
+ refreshErrs = errors .Join (refreshErrs , fmt .Errorf ("failed to get pod for container: %w" , err ))
289
+ continue
290
+ }
291
+
292
+ if ! found {
293
+ containersNoPod = append (containersNoPod , container .ID )
294
+ continue
295
+ }
296
+
297
+ pod := & Pod {
298
+ ID : cntrInfo .PodID ,
299
+ Name : cntrInfo .PodName ,
300
+ Namespace : cntrInfo .Namespace ,
301
+ }
302
+ container .Pod = pod
303
+ container .Name = cntrInfo .ContainerName
304
+
305
+ _ , seen := podsRunning [pod .ID ]
306
+ // reset CPU Time of the pod if it is getting added to the running list for the first time
307
+ // in the subsequent iteration, the CPUTimeDelta should be incremented by container's CPUTimeDelta
308
+ resetCPUTime := ! seen
309
+ podsRunning [pod .ID ] = ri .updatePodCache (container , resetCPUTime )
310
+ }
311
+
296
312
// Find terminated pods
297
313
podsTerminated := make (map [string ]* Pod )
298
314
for id , pod := range ri .podCache {
299
- if _ , isRunning := podsRunning [id ]; isRunning {
300
- continue
315
+ if _ , isRunning := podsRunning [id ]; ! isRunning {
316
+ podsTerminated [id ] = pod
317
+ delete (ri .podCache , id )
301
318
}
302
- podsTerminated [id ] = pod
303
- delete (ri .podCache , id )
304
319
}
305
320
306
321
ri .pods .Running = podsRunning
307
322
ri .pods .Terminated = podsTerminated
308
323
ri .pods .ContainersNoPod = containersNoPod
309
324
325
+ return refreshErrs
326
+ }
327
+
328
+ func (ri * resourceInformer ) refreshNode () error {
329
+ // Calculate total CPU delta from all running processes
330
+ procCPUDeltaTotal := float64 (0 )
331
+ for _ , proc := range ri .processes .Running {
332
+ procCPUDeltaTotal += proc .CPUTimeDelta
333
+ }
334
+
335
+ // Get current CPU usage ratio
336
+ usage , err := ri .fs .CPUUsageRatio ()
337
+ if err != nil {
338
+ return fmt .Errorf ("failed to get procfs usage: %w" , err )
339
+ }
340
+
341
+ ri .node .ProcessTotalCPUTimeDelta = procCPUDeltaTotal
342
+ ri .node .CPUUsageRatio = usage
343
+
344
+ return nil
345
+ }
346
+
347
+ // Refresh updates the internal state by scanning processes, containers, VMs, and pods.
348
+ // This method is NOT thread-safe and should not be called concurrently.
349
+ func (ri * resourceInformer ) Refresh () error {
350
+ started := ri .clock .Now ()
351
+
352
+ // Refresh workloads in dependency order:
353
+ // processes -> {
354
+ // -> containers -> pod
355
+ // -> VMs
356
+ // -> node
357
+ // }
358
+ var refreshErrs error
359
+
360
+ containerProcs , vmProcs , err := ri .refreshProcesses ()
361
+ if err != nil {
362
+ refreshErrs = errors .Join (refreshErrs , err )
363
+ }
364
+
365
+ // refresh containers and VMs in parallel
366
+ // Note: No locking needed on ri fields since refreshContainers() and refreshVMs()
367
+ // operate on completely disjoint data structures (containers vs VMs)
368
+ wg := sync.WaitGroup {}
369
+ wg .Add (3 )
370
+
371
+ var cntrErrs , podErrs , vmErrs , nodeErrs error
372
+ go func () {
373
+ defer wg .Done ()
374
+ cntrErrs = ri .refreshContainers (containerProcs )
375
+ podErrs = ri .refreshPods ()
376
+ }()
377
+
378
+ go func () {
379
+ defer wg .Done ()
380
+ vmErrs = ri .refreshVMs (vmProcs )
381
+ }()
382
+
383
+ go func () {
384
+ defer wg .Done ()
385
+ nodeErrs = ri .refreshNode ()
386
+ }()
387
+
388
+ wg .Wait ()
389
+
390
+ refreshErrs = errors .Join (refreshErrs , cntrErrs , podErrs , vmErrs , nodeErrs )
391
+
392
+ // Update timing
310
393
now := ri .clock .Now ()
311
394
ri .lastScanTime = now
312
395
duration := now .Sub (started )
313
396
314
397
ri .logger .Debug ("Resource information collected" ,
315
- "process.running" , len (procsRunning ),
316
- "process.terminated" , len (procsTerminated ),
317
- "container.running" , len (containersRunning ),
318
- "container.terminated" , len (containersTerminated ),
319
- "vm.running" , len (vmsRunning ),
320
- "vm.terminated" , len (vmsTerminated ),
321
- "pod.running" , len (podsRunning ),
322
- "pod.terminated" , len (podsTerminated ),
323
- "container.no-pod" , len (containersNoPod ),
398
+ "process.running" , len (ri . processes . Running ),
399
+ "process.terminated" , len (ri . processes . Terminated ),
400
+ "container.running" , len (ri . containers . Running ),
401
+ "container.terminated" , len (ri . containers . Terminated ),
402
+ "vm.running" , len (ri . vms . Running ),
403
+ "vm.terminated" , len (ri . vms . Terminated ),
404
+ "pod.running" , len (ri . pods . Running ),
405
+ "pod.terminated" , len (ri . pods . Terminated ),
406
+ "container.no-pod" , len (ri . pods . ContainersNoPod ),
324
407
"duration" , duration )
325
408
326
409
return refreshErrs
0 commit comments