diff --git a/cmd/kepler/main.go b/cmd/kepler/main.go index 4898be1579..c6c49f1a92 100644 --- a/cmd/kepler/main.go +++ b/cmd/kepler/main.go @@ -194,11 +194,15 @@ func createServices(logger *slog.Logger, cfg *config.Config) ([]service.Service, func createPrometheusExporter(logger *slog.Logger, cfg *config.Config, apiServer *server.APIServer, pm *monitor.PowerMonitor) (*prometheus.Exporter, error) { logger.Debug("Creating Prometheus exporter") + // Use metrics level from configuration (already parsed) + metricsLevel := cfg.Exporter.Prometheus.MetricsLevel + collectors, err := prometheus.CreateCollectors( pm, prometheus.WithLogger(logger), prometheus.WithProcFSPath(cfg.Host.ProcFS), prometheus.WithNodeName(cfg.Kube.Node), + prometheus.WithMetricsLevel(metricsLevel), ) if err != nil { return nil, fmt.Errorf("failed to create Prometheus collectors: %w", err) diff --git a/compose/dev/kepler-dev/etc/kepler/config.yaml b/compose/dev/kepler-dev/etc/kepler/config.yaml index 5c3763f41b..4bdf7cbcfc 100644 --- a/compose/dev/kepler-dev/etc/kepler/config.yaml +++ b/compose/dev/kepler-dev/etc/kepler/config.yaml @@ -36,6 +36,10 @@ exporter: debugCollectors: - go - process + metricsLevel: + - node + - container + - process debug: # debug related config pprof: # pprof related config diff --git a/config/config.go b/config/config.go index 2bfceb7785..f67a6fc89b 100644 --- a/config/config.go +++ b/config/config.go @@ -11,6 +11,7 @@ import ( "time" "github.com/alecthomas/kingpin/v2" + "github.com/sustainable-computing-io/kepler/internal/exporter/prometheus/metrics" "gopkg.in/yaml.v3" "k8s.io/utils/ptr" ) @@ -53,8 +54,9 @@ type ( } PrometheusExporter struct { - Enabled *bool `yaml:"enabled"` - DebugCollectors []string `yaml:"debugCollectors"` + Enabled *bool `yaml:"enabled"` + DebugCollectors []string `yaml:"debugCollectors"` + MetricsLevel metrics.Level `yaml:"metricsLevel"` } Exporter struct { @@ -91,6 +93,45 @@ type ( } ) +// MetricsLevelValue is a custom kingpin.Value that parses 
metrics levels directly into metrics.Level +type MetricsLevelValue struct { + level *metrics.Level +} + +// NewMetricsLevelValue creates a new MetricsLevelValue with the given target +func NewMetricsLevelValue(target *metrics.Level) *MetricsLevelValue { + return &MetricsLevelValue{level: target} +} + +// Set implements kingpin.Value interface - parses and accumulates metrics levels +func (m *MetricsLevelValue) Set(value string) error { + // Parse the single value into a level + level, err := metrics.ParseLevel([]string{value}) + if err != nil { + return err + } + + // If this is the first value, initialize to 0 first to clear any default + allLevels := metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod + if *m.level == allLevels { + *m.level = 0 + } + + // Accumulate the level using bitwise OR + *m.level |= level + return nil +} + +// String implements kingpin.Value interface +func (m *MetricsLevelValue) String() string { + return m.level.String() +} + +// IsCumulative implements kingpin.Value interface to support multiple values +func (m *MetricsLevelValue) IsCumulative() bool { + return true +} + type SkipValidation int const ( @@ -122,6 +163,7 @@ const ( ExporterPrometheusEnabledFlag = "exporter.prometheus" // NOTE: not a flag ExporterPrometheusDebugCollectors = "exporter.prometheus.debug-collectors" + ExporterPrometheusMetricsFlag = "metrics" // kubernetes flags KubernetesFlag = "kube.enable" @@ -156,6 +198,7 @@ func DefaultConfig() *Config { Prometheus: PrometheusExporter{ Enabled: ptr.To(true), DebugCollectors: []string{"go"}, + MetricsLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, }, }, Debug: Debug{ @@ -252,6 +295,9 @@ func RegisterFlags(app *kingpin.Application) ConfigUpdaterFn { prometheusExporterEnabled := app.Flag(ExporterPrometheusEnabledFlag, "Enable Prometheus 
exporter").Default("true").Bool() + var metricsLevel = metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod + app.Flag(ExporterPrometheusMetricsFlag, "Metrics levels to export (node,process,container,vm,pod)").SetValue(NewMetricsLevelValue(&metricsLevel)) + kubernetes := app.Flag(KubernetesFlag, "Monitor kubernetes").Default("false").Bool() kubeconfig := app.Flag(KubeConfigFlag, "Path to a kubeconfig. Only required if out-of-cluster.").ExistingFile() nodeName := app.Flag(KubeNodeNameFlag, "Name of kubernetes node on which kepler is running.").String() @@ -295,6 +341,10 @@ func RegisterFlags(app *kingpin.Application) ConfigUpdaterFn { cfg.Exporter.Prometheus.Enabled = prometheusExporterEnabled } + if flagsSet[ExporterPrometheusMetricsFlag] { + cfg.Exporter.Prometheus.MetricsLevel = metricsLevel + } + if flagsSet[KubernetesFlag] { cfg.Kube.Enabled = kubernetes } @@ -468,6 +518,7 @@ func (c *Config) manualString() string { {ExporterStdoutEnabledFlag, fmt.Sprintf("%v", c.Exporter.Stdout.Enabled)}, {ExporterPrometheusEnabledFlag, fmt.Sprintf("%v", c.Exporter.Prometheus.Enabled)}, {ExporterPrometheusDebugCollectors, strings.Join(c.Exporter.Prometheus.DebugCollectors, ", ")}, + {ExporterPrometheusMetricsFlag, c.Exporter.Prometheus.MetricsLevel.String()}, {pprofEnabledFlag, fmt.Sprintf("%v", c.Debug.Pprof.Enabled)}, {KubeConfigFlag, fmt.Sprintf("%v", c.Kube.Config)}, } diff --git a/config/config_test.go b/config/config_test.go index 30b6abd0e9..068d7ce412 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -12,6 +12,8 @@ import ( "github.com/alecthomas/kingpin/v2" "github.com/stretchr/testify/assert" + "github.com/sustainable-computing-io/kepler/internal/exporter/prometheus/metrics" + "gopkg.in/yaml.v3" "k8s.io/utils/ptr" ) @@ -805,3 +807,410 @@ exporter: assert.Equal(t, exp.String(), cfg.String()) }) } + +func TestMetricsLevelValue_Set(t *testing.T) { + tests := []struct { + 
name string + initialLevel metrics.Level + setValue string + expectedLevel metrics.Level + expectError bool + errorMessage string + }{ + { + name: "Set node from default all", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: "node", + expectedLevel: metrics.MetricsLevelNode, + expectError: false, + }, + { + name: "Set process from default all", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: "process", + expectedLevel: metrics.MetricsLevelProcess, + expectError: false, + }, + { + name: "Set container from default all", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: "container", + expectedLevel: metrics.MetricsLevelContainer, + expectError: false, + }, + { + name: "Set vm from default all", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: "vm", + expectedLevel: metrics.MetricsLevelVM, + expectError: false, + }, + { + name: "Set pod from default all", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: "pod", + expectedLevel: metrics.MetricsLevelPod, + expectError: false, + }, + { + name: "Accumulate node to existing process", + initialLevel: metrics.MetricsLevelProcess, + setValue: "node", + expectedLevel: metrics.MetricsLevelProcess | metrics.MetricsLevelNode, + expectError: false, + }, + { + name: "Accumulate container to existing node+process", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess, + setValue: "container", + expectedLevel: metrics.MetricsLevelNode | 
metrics.MetricsLevelProcess | metrics.MetricsLevelContainer, + expectError: false, + }, + { + name: "Invalid level returns error", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: "invalid", + expectedLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, // Should remain unchanged + expectError: true, + errorMessage: "unknown metrics level: invalid", + }, + { + name: "Empty string returns error", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: "", + expectedLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, // Should remain unchanged + expectError: true, + errorMessage: "unknown metrics level: ", + }, + { + name: "Case insensitive - NODE", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: "NODE", + expectedLevel: metrics.MetricsLevelNode, + expectError: false, + }, + { + name: "Case insensitive - Process", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: "Process", + expectedLevel: metrics.MetricsLevelProcess, + expectError: false, + }, + { + name: "Whitespace handling - node with spaces", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: " node ", + expectedLevel: metrics.MetricsLevelNode, + expectError: false, + }, + { + name: "Set same level twice (idempotent)", + initialLevel: 
metrics.MetricsLevelNode, + setValue: "node", + expectedLevel: metrics.MetricsLevelNode, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + level := tt.initialLevel + mlv := NewMetricsLevelValue(&level) + + err := mlv.Set(tt.setValue) + + if tt.expectError { + assert.Error(t, err) + if tt.errorMessage != "" { + assert.Contains(t, err.Error(), tt.errorMessage) + } + // Level should remain unchanged on error + assert.Equal(t, tt.initialLevel, level) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectedLevel, level) + } + }) + } +} + +func TestMetricsLevelValue_AccumulativeBehavior(t *testing.T) { + // Test the cumulative behavior when multiple Set calls are made + tests := []struct { + name string + initialLevel metrics.Level + setValues []string + expectedLevel metrics.Level + expectError bool + }{ + { + name: "Accumulate multiple levels from all", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValues: []string{"node", "process"}, + expectedLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess, + expectError: false, + }, + { + name: "Accumulate multiple levels from none", + initialLevel: metrics.Level(0), + setValues: []string{"node", "process", "container"}, + expectedLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer, + expectError: false, + }, + { + name: "Error in middle stops processing", + initialLevel: metrics.Level(0), + setValues: []string{"node", "invalid", "process"}, + expectedLevel: metrics.MetricsLevelNode, // Should have node from first call + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + level := tt.initialLevel + mlv := NewMetricsLevelValue(&level) + + var lastErr error + for _, setValue := range tt.setValues { + err := mlv.Set(setValue) + if err != nil { + lastErr = err + 
break // Stop on first error + } + } + + if tt.expectError { + assert.Error(t, lastErr) + } else { + assert.NoError(t, lastErr) + } + + assert.Equal(t, tt.expectedLevel, level) + }) + } +} + +func TestMetricsLevelValue_String(t *testing.T) { + tests := []struct { + name string + level metrics.Level + expected string + }{ + { + name: "No levels (empty)", + level: metrics.Level(0), + expected: "", + }, + { + name: "All individual levels", + level: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + expected: "node,process,container,vm,pod", + }, + { + name: "Single level - node", + level: metrics.MetricsLevelNode, + expected: "node", + }, + { + name: "Multiple levels - node and process", + level: metrics.MetricsLevelNode | metrics.MetricsLevelProcess, + expected: "node,process", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mlv := NewMetricsLevelValue(&tt.level) + result := mlv.String() + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestMetricsLevelValue_IsCumulative(t *testing.T) { + level := metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod + mlv := NewMetricsLevelValue(&level) + assert.True(t, mlv.IsCumulative(), "MetricsLevelValue should be cumulative") +} + +func TestNewMetricsLevelValue(t *testing.T) { + t.Run("Creates valid MetricsLevelValue", func(t *testing.T) { + level := metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod + mlv := NewMetricsLevelValue(&level) + + assert.NotNil(t, mlv) + assert.Equal(t, level, *mlv.level) + }) + + t.Run("Modifying target level affects MetricsLevelValue", func(t *testing.T) { + level := metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod 
+ mlv := NewMetricsLevelValue(&level) + + // Modify the original level + level = metrics.MetricsLevelNode + + // MetricsLevelValue should reflect the change + assert.Equal(t, metrics.MetricsLevelNode, *mlv.level) + }) +} + +func TestMetricsLevelValue_CommandLineIntegration(t *testing.T) { + // Test integration with kingpin command line parsing + tests := []struct { + name string + args []string + expectedLevel metrics.Level + expectError bool + }{ + { + name: "Single flag value - node", + args: []string{"--metrics", "node"}, + expectedLevel: metrics.MetricsLevelNode, + expectError: false, + }, + { + name: "Multiple flag values accumulate", + args: []string{"--metrics", "node", "--metrics", "process"}, + expectedLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess, + expectError: false, + }, + { + name: "All flag values", + args: []string{"--metrics", "node", "--metrics", "process", "--metrics", "container", "--metrics", "vm", "--metrics", "pod"}, + expectedLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + expectError: false, + }, + { + name: "Invalid flag value", + args: []string{"--metrics", "invalid"}, + expectedLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, // Should remain at default + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a kingpin application for testing + app := kingpin.New("test", "test application") + var metricsLevel = metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod + app.Flag("metrics", "Metrics levels to export").SetValue(NewMetricsLevelValue(&metricsLevel)) + + // Parse the arguments + _, err := app.Parse(tt.args) + + if tt.expectError { + assert.Error(t, err) + // On error, the level should 
remain unchanged (default) + assert.Equal(t, metrics.MetricsLevelNode|metrics.MetricsLevelProcess|metrics.MetricsLevelContainer|metrics.MetricsLevelVM|metrics.MetricsLevelPod, metricsLevel) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectedLevel, metricsLevel) + } + }) + } +} + +func TestMetricsLevelValue_EdgeCases(t *testing.T) { + tests := []struct { + name string + initialLevel metrics.Level + setValue string + expectedLevel metrics.Level + expectError bool + }{ + { + name: "Special characters in value", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: "node!@#", + expectedLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + expectError: true, + }, + { + name: "Numeric value", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: "123", + expectedLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + expectError: true, + }, + { + name: "Tab and newline whitespace", + initialLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + setValue: "\t\nnode\t\n", + expectedLevel: metrics.MetricsLevelNode, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + level := tt.initialLevel + mlv := NewMetricsLevelValue(&level) + + err := mlv.Set(tt.setValue) + + if tt.expectError { + assert.Error(t, err) + assert.Equal(t, tt.initialLevel, level) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectedLevel, level) + } + }) + } +} + +func TestMetricsLevelYAMLMarshalling(t *testing.T) { + tests := []struct { 
+ name string + metricsLevel metrics.Level + expectedYAML string + }{ + { + name: "All individual levels", + metricsLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, + expectedYAML: "metricsLevel:\n - node\n - process\n - container\n - vm\n - pod", + }, + { + name: "Node only", + metricsLevel: metrics.MetricsLevelNode, + expectedYAML: "node", + }, + { + name: "Pod and Node", + metricsLevel: metrics.MetricsLevelPod | metrics.MetricsLevelNode, + expectedYAML: "metricsLevel:\n - node\n - pod", + }, + { + name: "Node and Process", + metricsLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess, + expectedYAML: "metricsLevel:\n - node\n - process", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := DefaultConfig() + cfg.Exporter.Prometheus.MetricsLevel = tt.metricsLevel + + // Marshal the prometheus exporter section + data, err := yaml.Marshal(cfg.Exporter.Prometheus) + assert.NoError(t, err) + + yamlStr := string(data) + + // Check that the YAML contains the expected metrics level representation + assert.Contains(t, yamlStr, tt.expectedYAML, "YAML should contain expected metrics level representation") + + // Importantly, it should NOT contain the integer representation + integerStr := fmt.Sprintf("metricsLevel: %d", tt.metricsLevel) + assert.NotContains(t, yamlStr, integerStr, "YAML should not contain integer representation") + + // Test round-trip: unmarshal back and verify it's the same + var unmarshaled PrometheusExporter + err = yaml.Unmarshal(data, &unmarshaled) + assert.NoError(t, err) + assert.Equal(t, tt.metricsLevel, unmarshaled.MetricsLevel) + }) + } +} diff --git a/hack/gen-metric-docs/main.go b/hack/gen-metric-docs/main.go index 5c29be5ac9..b44d0eef88 100644 --- a/hack/gen-metric-docs/main.go +++ b/hack/gen-metric-docs/main.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus" 
"github.com/sustainable-computing-io/kepler/internal/exporter/prometheus/collector" + "github.com/sustainable-computing-io/kepler/internal/exporter/prometheus/metrics" "github.com/sustainable-computing-io/kepler/internal/monitor" ) @@ -260,7 +261,7 @@ func main() { fmt.Println("Creating collectors...") // Create a logger for the collectors logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) - powerCollector := collector.NewPowerCollector(mockMonitor, "test-node", logger) + powerCollector := collector.NewPowerCollector(mockMonitor, "test-node", logger, metrics.MetricsLevelNode|metrics.MetricsLevelProcess|metrics.MetricsLevelContainer|metrics.MetricsLevelVM|metrics.MetricsLevelPod) fmt.Println("Created power collector") buildInfoCollector := collector.NewKeplerBuildInfoCollector() fmt.Println("Created build info collector") diff --git a/internal/exporter/prometheus/collector/power_collector.go b/internal/exporter/prometheus/collector/power_collector.go index 42f3106fd5..66d3c6aaa1 100644 --- a/internal/exporter/prometheus/collector/power_collector.go +++ b/internal/exporter/prometheus/collector/power_collector.go @@ -10,6 +10,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/sustainable-computing-io/kepler/internal/exporter/prometheus/metrics" "github.com/sustainable-computing-io/kepler/internal/monitor" ) @@ -20,8 +21,9 @@ type PowerDataProvider = monitor.PowerDataProvider // PowerCollector combines Node, Process, and Container collectors to ensure data consistency // by fetching all data in a single atomic operation during collection type PowerCollector struct { - pm PowerDataProvider - logger *slog.Logger + pm PowerDataProvider + logger *slog.Logger + metricsLevel metrics.Level // Lock to ensure thread safety during collection mutex sync.RWMutex @@ -95,7 +97,7 @@ func timeDesc(level, device, nodeName string, labels []string) *prometheus.Desc // NewPowerCollector creates a collector that provides consistent metrics // by 
fetching all data in a single snapshot during collection -func NewPowerCollector(monitor PowerDataProvider, nodeName string, logger *slog.Logger) *PowerCollector { +func NewPowerCollector(monitor PowerDataProvider, nodeName string, logger *slog.Logger, metricsLevel metrics.Level) *PowerCollector { const ( // these labels should remain the same across all descriptors to ease querying zone = "zone" @@ -105,8 +107,9 @@ func NewPowerCollector(monitor PowerDataProvider, nodeName string, logger *slog. ) c := &PowerCollector{ - pm: monitor, - logger: logger.With("collector", "power"), + pm: monitor, + logger: logger.With("collector", "power"), + metricsLevel: metricsLevel, nodeCPUJoulesDescriptor: joulesDesc("node", "cpu", nodeName, []string{zone, "path"}), nodeCPUWattsDescriptor: wattsDesc("node", "cpu", nodeName, []string{zone, "path"}), @@ -151,33 +154,43 @@ func (c *PowerCollector) waitForData() { // Describe implements the prometheus.Collector interface func (c *PowerCollector) Describe(ch chan<- *prometheus.Desc) { // node - ch <- c.nodeCPUJoulesDescriptor - ch <- c.nodeCPUWattsDescriptor - ch <- c.nodeCPUUsageRatioDescriptor - // node cpu active - ch <- c.nodeCPUActiveJoulesDesc - ch <- c.nodeCPUActiveWattsDesc - // node cpu idle - ch <- c.nodeCPUIdleJoulesDesc - ch <- c.nodeCPUIdleWattsDesc + if c.metricsLevel.IsNodeEnabled() { + ch <- c.nodeCPUJoulesDescriptor + ch <- c.nodeCPUWattsDescriptor + ch <- c.nodeCPUUsageRatioDescriptor + // node cpu active + ch <- c.nodeCPUActiveJoulesDesc + ch <- c.nodeCPUActiveWattsDesc + // node cpu idle + ch <- c.nodeCPUIdleJoulesDesc + ch <- c.nodeCPUIdleWattsDesc + } // process - ch <- c.processCPUJoulesDescriptor - ch <- c.processCPUWattsDescriptor - ch <- c.processCPUTimeDescriptor + if c.metricsLevel.IsProcessEnabled() { + ch <- c.processCPUJoulesDescriptor + ch <- c.processCPUWattsDescriptor + ch <- c.processCPUTimeDescriptor + } // container - ch <- c.containerCPUJoulesDescriptor - ch <- c.containerCPUWattsDescriptor - // ch 
<- c.containerCPUTimeDescriptor // TODO: add conntainerCPUTimeDescriptor + if c.metricsLevel.IsContainerEnabled() { + ch <- c.containerCPUJoulesDescriptor + ch <- c.containerCPUWattsDescriptor + // ch <- c.containerCPUTimeDescriptor // TODO: add conntainerCPUTimeDescriptor + } // vm - ch <- c.vmCPUJoulesDescriptor - ch <- c.vmCPUWattsDescriptor + if c.metricsLevel.IsVMEnabled() { + ch <- c.vmCPUJoulesDescriptor + ch <- c.vmCPUWattsDescriptor + } // pod - ch <- c.podCPUJoulesDescriptor - ch <- c.podCPUWattsDescriptor + if c.metricsLevel.IsPodEnabled() { + ch <- c.podCPUJoulesDescriptor + ch <- c.podCPUWattsDescriptor + } } func (c *PowerCollector) isReady() bool { @@ -205,13 +218,26 @@ func (c *PowerCollector) Collect(ch chan<- prometheus.Metric) { return } - c.collectNodeMetrics(ch, snapshot.Node) - c.collectProcessMetrics(ch, "running", snapshot.Processes) - c.collectProcessMetrics(ch, "terminated", snapshot.TerminatedProcesses) + if c.metricsLevel.IsNodeEnabled() { + c.collectNodeMetrics(ch, snapshot.Node) + } - c.collectContainerMetrics(ch, snapshot.Containers) - c.collectVMMetrics(ch, snapshot.VirtualMachines) - c.collectPodMetrics(ch, snapshot.Pods) + if c.metricsLevel.IsProcessEnabled() { + c.collectProcessMetrics(ch, "running", snapshot.Processes) + c.collectProcessMetrics(ch, "terminated", snapshot.TerminatedProcesses) + } + + if c.metricsLevel.IsContainerEnabled() { + c.collectContainerMetrics(ch, snapshot.Containers) + } + + if c.metricsLevel.IsVMEnabled() { + c.collectVMMetrics(ch, snapshot.VirtualMachines) + } + + if c.metricsLevel.IsPodEnabled() { + c.collectPodMetrics(ch, snapshot.Pods) + } } // collectNodeMetrics collects node-level power metrics diff --git a/internal/exporter/prometheus/collector/power_collector_concurrency_test.go b/internal/exporter/prometheus/collector/power_collector_concurrency_test.go index 2d00f19d81..6a7935b272 100644 --- a/internal/exporter/prometheus/collector/power_collector_concurrency_test.go +++ 
b/internal/exporter/prometheus/collector/power_collector_concurrency_test.go @@ -19,6 +19,7 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/sustainable-computing-io/kepler/internal/device" + "github.com/sustainable-computing-io/kepler/internal/exporter/prometheus/metrics" "github.com/sustainable-computing-io/kepler/internal/monitor" ) @@ -40,7 +41,7 @@ func TestPowerCollectorConcurrency(t *testing.T) { musT(device.NewFakeCPUMeter(nil)), monitor.WithResourceInformer(ri), ) - collector := NewPowerCollector(fakeMonitor, "test-node", newLogger()) + collector := NewPowerCollector(fakeMonitor, "test-node", newLogger(), metrics.MetricsLevelNode|metrics.MetricsLevelProcess|metrics.MetricsLevelContainer|metrics.MetricsLevelVM|metrics.MetricsLevelPod) assert.NoError(t, fakeMonitor.Init()) @@ -155,7 +156,7 @@ func TestPowerCollectorWithRegistry(t *testing.T) { } mockMonitor.On("Snapshot").Return(snapshot, nil) - collector := NewPowerCollector(mockMonitor, "test-node", newLogger()) + collector := NewPowerCollector(mockMonitor, "test-node", newLogger(), metrics.MetricsLevelNode|metrics.MetricsLevelProcess|metrics.MetricsLevelContainer|metrics.MetricsLevelVM|metrics.MetricsLevelPod) mockMonitor.TriggerUpdate() time.Sleep(10 * time.Millisecond) @@ -261,7 +262,7 @@ func TestUpdateDuringCollection(t *testing.T) { }, }, nil) - collector := NewPowerCollector(mockMonitor, "test-node", newLogger()) + collector := NewPowerCollector(mockMonitor, "test-node", newLogger(), metrics.MetricsLevelNode|metrics.MetricsLevelProcess|metrics.MetricsLevelContainer|metrics.MetricsLevelVM|metrics.MetricsLevelPod) mockMonitor.TriggerUpdate() // collector should now start building descriptors time.Sleep(10 * time.Millisecond) @@ -332,7 +333,7 @@ func TestConcurrentRegistration(t *testing.T) { monitor.WithResourceInformer(ri), ) - collector := NewPowerCollector(fakeMonitor, "test-node", newLogger()) + collector := NewPowerCollector(fakeMonitor, 
"test-node", newLogger(), metrics.MetricsLevelNode|metrics.MetricsLevelProcess|metrics.MetricsLevelContainer|metrics.MetricsLevelVM|metrics.MetricsLevelPod) assert.NoError(t, fakeMonitor.Init()) go func() { @@ -388,7 +389,7 @@ func TestFastCollectAndDescribe(t *testing.T) { musT(device.NewFakeCPUMeter(nil)), monitor.WithResourceInformer(ri), ) - collector := NewPowerCollector(fakeMonitor, "test-node", newLogger()) + collector := NewPowerCollector(fakeMonitor, "test-node", newLogger(), metrics.MetricsLevelNode|metrics.MetricsLevelProcess|metrics.MetricsLevelContainer|metrics.MetricsLevelVM|metrics.MetricsLevelPod) assert.NoError(t, fakeMonitor.Init()) diff --git a/internal/exporter/prometheus/collector/power_collector_test.go b/internal/exporter/prometheus/collector/power_collector_test.go index 1a616f519e..87e85645e5 100644 --- a/internal/exporter/prometheus/collector/power_collector_test.go +++ b/internal/exporter/prometheus/collector/power_collector_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/sustainable-computing-io/kepler/internal/device" + "github.com/sustainable-computing-io/kepler/internal/exporter/prometheus/metrics" "github.com/sustainable-computing-io/kepler/internal/monitor" "github.com/sustainable-computing-io/kepler/internal/resource" ) @@ -284,7 +285,7 @@ func TestPowerCollector(t *testing.T) { mockMonitor.On("Snapshot").Return(testData, nil) // Create collector - collector := NewPowerCollector(mockMonitor, "test-node", logger) + collector := NewPowerCollector(mockMonitor, "test-node", logger, metrics.MetricsLevelNode|metrics.MetricsLevelProcess|metrics.MetricsLevelContainer|metrics.MetricsLevelVM|metrics.MetricsLevelPod) // Trigger update to ensure descriptors are created mockMonitor.TriggerUpdate() @@ -565,7 +566,7 @@ func TestTerminatedProcessExport(t *testing.T) { mockMonitor.On("Snapshot").Return(testSnapshot, nil) - collector := NewPowerCollector(mockMonitor, "test-node", 
logger) + collector := NewPowerCollector(mockMonitor, "test-node", logger, metrics.MetricsLevelNode|metrics.MetricsLevelProcess|metrics.MetricsLevelContainer|metrics.MetricsLevelVM|metrics.MetricsLevelPod) registry := prometheus.NewRegistry() registry.MustRegister(collector) @@ -670,7 +671,7 @@ func TestEnhancedErrorReporting(t *testing.T) { } mockMonitor.On("Snapshot").Return(testSnapshot, nil) - collector := NewPowerCollector(mockMonitor, "test-node", logger) + collector := NewPowerCollector(mockMonitor, "test-node", logger, metrics.MetricsLevelNode|metrics.MetricsLevelProcess|metrics.MetricsLevelContainer|metrics.MetricsLevelVM|metrics.MetricsLevelPod) registry := prometheus.NewRegistry() registry.MustRegister(collector) mockMonitor.TriggerUpdate() @@ -682,3 +683,180 @@ func TestEnhancedErrorReporting(t *testing.T) { mockMonitor.AssertExpectations(t) } + +func TestPowerCollector_MetricsLevelFiltering(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + + tests := []struct { + name string + metricsLevel metrics.Level + expectedMetrics map[string]bool + }{ + { + name: "Only Node metrics", + metricsLevel: metrics.MetricsLevelNode, + expectedMetrics: map[string]bool{ + "kepler_node_cpu_joules_total": true, + "kepler_node_cpu_watts": true, + "kepler_node_cpu_usage_ratio": true, + "kepler_node_cpu_active_joules_total": true, + "kepler_node_cpu_active_watts": true, + "kepler_node_cpu_idle_joules_total": true, + "kepler_node_cpu_idle_watts": true, + "kepler_process_cpu_joules_total": false, + "kepler_container_cpu_joules_total": false, + "kepler_vm_cpu_joules_total": false, + "kepler_pod_cpu_joules_total": false, + }, + }, + { + name: "Only Process metrics", + metricsLevel: metrics.MetricsLevelProcess, + expectedMetrics: map[string]bool{ + "kepler_node_cpu_joules_total": false, + "kepler_process_cpu_joules_total": true, + "kepler_process_cpu_watts": true, + "kepler_process_cpu_seconds_total": true, + 
"kepler_container_cpu_joules_total": false, + "kepler_vm_cpu_joules_total": false, + "kepler_pod_cpu_joules_total": false, + }, + }, + { + name: "Node and Container metrics", + metricsLevel: metrics.MetricsLevelNode | metrics.MetricsLevelContainer, + expectedMetrics: map[string]bool{ + "kepler_node_cpu_joules_total": true, + "kepler_node_cpu_watts": true, + "kepler_process_cpu_joules_total": false, + "kepler_container_cpu_joules_total": true, + "kepler_container_cpu_watts": true, + "kepler_vm_cpu_joules_total": false, + "kepler_pod_cpu_joules_total": false, + }, + }, + { + name: "No metrics", + metricsLevel: metrics.Level(0), + expectedMetrics: map[string]bool{ + "kepler_node_cpu_joules_total": false, + "kepler_process_cpu_joules_total": false, + "kepler_container_cpu_joules_total": false, + "kepler_vm_cpu_joules_total": false, + "kepler_pod_cpu_joules_total": false, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockMonitor := NewMockPowerMonitor() + + // Create test data with all types of metrics + packageZone := device.NewMockRaplZone("package", 0, "/sys/class/powercap/intel-rapl/intel-rapl:0", 1000) + testData := &monitor.Snapshot{ + Timestamp: time.Now(), + Node: &monitor.Node{ + Zones: monitor.NodeZoneUsageMap{ + packageZone: monitor.NodeUsage{ + EnergyTotal: 1000 * device.Joule, + Power: 10 * device.Watt, + ActiveEnergyTotal: 500 * device.Joule, + IdleEnergyTotal: 500 * device.Joule, + ActivePower: 5 * device.Watt, + IdlePower: 5 * device.Watt, + }, + }, + UsageRatio: 0.5, + }, + Processes: monitor.Processes{ + 123: &monitor.Process{ + PID: 123, + Comm: "test-process", + Exe: "/usr/bin/test-process", + Type: resource.RegularProcess, + CPUTotalTime: 5.0, + Zones: monitor.ZoneUsageMap{ + packageZone: monitor.Usage{ + EnergyTotal: 100 * device.Joule, + Power: 5 * device.Watt, + }, + }, + ContainerID: "test-container", + VirtualMachineID: "test-vm", + }, + }, + Containers: monitor.Containers{ + "test-container": 
&monitor.Container{ + ID: "test-container", + Name: "test-container", + Runtime: resource.PodmanRuntime, + PodID: "test-pod", + Zones: monitor.ZoneUsageMap{ + packageZone: monitor.Usage{ + EnergyTotal: 100 * device.Joule, + Power: 5 * device.Watt, + }, + }, + }, + }, + VirtualMachines: monitor.VirtualMachines{ + "test-vm": &monitor.VirtualMachine{ + ID: "test-vm", + Name: "test-vm", + Hypervisor: resource.KVMHypervisor, + Zones: monitor.ZoneUsageMap{ + packageZone: monitor.Usage{ + EnergyTotal: 100 * device.Joule, + Power: 5 * device.Watt, + }, + }, + }, + }, + Pods: monitor.Pods{ + "test-pod": &monitor.Pod{ + ID: "test-pod", + Name: "test-pod", + Namespace: "default", + Zones: monitor.ZoneUsageMap{ + packageZone: monitor.Usage{ + EnergyTotal: 100 * device.Joule, + Power: 5 * device.Watt, + }, + }, + }, + }, + } + + mockMonitor.On("Snapshot").Return(testData, nil) + + collector := NewPowerCollector(mockMonitor, "test-node", logger, tt.metricsLevel) + registry := prometheus.NewRegistry() + registry.MustRegister(collector) + + mockMonitor.TriggerUpdate() + time.Sleep(10 * time.Millisecond) + + // Gather metrics + metricFamilies, err := registry.Gather() + assert.NoError(t, err) + + // Create a map of existing metrics + existingMetrics := make(map[string]bool) + for _, mf := range metricFamilies { + existingMetrics[mf.GetName()] = true + } + + // Check expected metrics + for metricName, shouldExist := range tt.expectedMetrics { + if shouldExist { + assert.True(t, existingMetrics[metricName], "Expected metric %s to exist", metricName) + } else { + assert.False(t, existingMetrics[metricName], "Expected metric %s to not exist", metricName) + } + } + + mockMonitor.AssertExpectations(t) + }) + } +} diff --git a/internal/exporter/prometheus/metrics/level.go b/internal/exporter/prometheus/metrics/level.go new file mode 100644 index 0000000000..80314a24d5 --- /dev/null +++ b/internal/exporter/prometheus/metrics/level.go @@ -0,0 +1,152 @@ +// SPDX-FileCopyrightText: 2025 The 
Kepler Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "fmt" + "strings" +) + +// Level represents the metrics level configuration using bit patterns +type Level uint32 + +const ( + // Individual metric levels using bit patterns + MetricsLevelNode Level = 1 << iota // 1 + MetricsLevelProcess // 2 + MetricsLevelContainer // 4 + MetricsLevelVM // 8 + MetricsLevelPod // 16 +) + +// String returns the string representation of the level +func (l Level) String() string { + var levels []string + if l.IsNodeEnabled() { + levels = append(levels, "node") + } + if l.IsProcessEnabled() { + levels = append(levels, "process") + } + if l.IsContainerEnabled() { + levels = append(levels, "container") + } + if l.IsVMEnabled() { + levels = append(levels, "vm") + } + if l.IsPodEnabled() { + levels = append(levels, "pod") + } + return strings.Join(levels, ",") +} + +// IsNodeEnabled checks if node metrics are enabled +func (l Level) IsNodeEnabled() bool { + return l&MetricsLevelNode != 0 +} + +// IsProcessEnabled checks if process metrics are enabled +func (l Level) IsProcessEnabled() bool { + return l&MetricsLevelProcess != 0 +} + +// IsContainerEnabled checks if container metrics are enabled +func (l Level) IsContainerEnabled() bool { + return l&MetricsLevelContainer != 0 +} + +// IsVMEnabled checks if VM metrics are enabled +func (l Level) IsVMEnabled() bool { + return l&MetricsLevelVM != 0 +} + +// IsPodEnabled checks if pod metrics are enabled +func (l Level) IsPodEnabled() bool { + return l&MetricsLevelPod != 0 +} + +// ParseLevel parses a slice of strings into a Level +func ParseLevel(levels []string) (Level, error) { + if len(levels) == 0 { + return MetricsLevelNode | MetricsLevelProcess | MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod, nil + } + + var result Level + for _, level := range levels { + switch strings.ToLower(strings.TrimSpace(level)) { + case "node": + result |= MetricsLevelNode + case "process": + result |= 
MetricsLevelProcess + case "container": + result |= MetricsLevelContainer + case "vm": + result |= MetricsLevelVM + case "pod": + result |= MetricsLevelPod + default: + return 0, fmt.Errorf("unknown metrics level: %s", level) + } + } + + return result, nil +} + +// ValidLevels returns the list of valid metrics levels +func ValidLevels() []string { + return []string{"node", "process", "container", "vm", "pod"} +} + +// MarshalYAML implements yaml.Marshaler interface +func (l Level) MarshalYAML() (interface{}, error) { + var levels []string + if l.IsNodeEnabled() { + levels = append(levels, "node") + } + if l.IsProcessEnabled() { + levels = append(levels, "process") + } + if l.IsContainerEnabled() { + levels = append(levels, "container") + } + if l.IsVMEnabled() { + levels = append(levels, "vm") + } + if l.IsPodEnabled() { + levels = append(levels, "pod") + } + + // Return as slice for multiple levels, single string for one level + if len(levels) == 1 { + return levels[0], nil + } + return levels, nil +} + +// UnmarshalYAML implements yaml.Unmarshaler interface +func (l *Level) UnmarshalYAML(unmarshal func(interface{}) error) error { + // Try to unmarshal as a string first + var single string + if err := unmarshal(&single); err == nil { + parsed, parseErr := ParseLevel([]string{single}) + if parseErr != nil { + return parseErr + } + *l = parsed + return nil + } + + // Try to unmarshal as a slice of strings + var multiple []string + if err := unmarshal(&multiple); err == nil { + parsed, parseErr := ParseLevel(multiple) + if parseErr != nil { + return parseErr + } + *l = parsed + return nil + } + + return fmt.Errorf("cannot unmarshal metrics level: must be a string or array of strings") +} diff --git a/internal/exporter/prometheus/metrics/level_test.go b/internal/exporter/prometheus/metrics/level_test.go new file mode 100644 index 0000000000..7cdbdf694d --- /dev/null +++ b/internal/exporter/prometheus/metrics/level_test.go @@ -0,0 +1,348 @@ +// SPDX-FileCopyrightText: 
2025 The Kepler Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v3" +) + +func TestLevel_IsEnabled(t *testing.T) { + tests := []struct { + name string + level Level + expected map[string]bool + }{ + { + name: "All levels", + level: MetricsLevelNode | MetricsLevelProcess | MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod, + expected: map[string]bool{ + "node": true, + "process": true, + "container": true, + "vm": true, + "pod": true, + }, + }, + { + name: "Node only", + level: MetricsLevelNode, + expected: map[string]bool{ + "node": true, + "process": false, + "container": false, + "vm": false, + "pod": false, + }, + }, + { + name: "Node and Process", + level: MetricsLevelNode | MetricsLevelProcess, + expected: map[string]bool{ + "node": true, + "process": true, + "container": false, + "vm": false, + "pod": false, + }, + }, + { + name: "Container, VM, and Pod", + level: MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod, + expected: map[string]bool{ + "node": false, + "process": false, + "container": true, + "vm": true, + "pod": true, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected["node"], tt.level.IsNodeEnabled()) + assert.Equal(t, tt.expected["process"], tt.level.IsProcessEnabled()) + assert.Equal(t, tt.expected["container"], tt.level.IsContainerEnabled()) + assert.Equal(t, tt.expected["vm"], tt.level.IsVMEnabled()) + assert.Equal(t, tt.expected["pod"], tt.level.IsPodEnabled()) + }) + } +} + +func TestLevel_String(t *testing.T) { + tests := []struct { + name string + level Level + expected string + }{ + { + name: "All levels", + level: MetricsLevelNode | MetricsLevelProcess | MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod, + expected: "node,process,container,vm,pod", + }, + { + name: "Node only", + level: MetricsLevelNode, + expected: "node", + }, + { + name: "Process only", + 
level: MetricsLevelProcess, + expected: "process", + }, + { + name: "Node and Process", + level: MetricsLevelNode | MetricsLevelProcess, + expected: "node,process", + }, + { + name: "Container, VM, and Pod", + level: MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod, + expected: "container,vm,pod", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.level.String()) + }) + } +} + +func TestParseLevel(t *testing.T) { + tests := []struct { + name string + levels []string + expected Level + expectError bool + }{ + { + name: "Empty slice", + levels: []string{}, + expected: MetricsLevelNode | MetricsLevelProcess | MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod, + expectError: false, + }, + { + name: "Single level", + levels: []string{"node"}, + expected: MetricsLevelNode, + expectError: false, + }, + { + name: "Multiple levels", + levels: []string{"node", "process"}, + expected: MetricsLevelNode | MetricsLevelProcess, + expectError: false, + }, + { + name: "All levels", + levels: []string{"node", "process", "container", "vm", "pod"}, + expected: MetricsLevelNode | MetricsLevelProcess | MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod, + expectError: false, + }, + { + name: "Case insensitive", + levels: []string{"NODE", "Process", "CONTAINER"}, + expected: MetricsLevelNode | MetricsLevelProcess | MetricsLevelContainer, + expectError: false, + }, + { + name: "With whitespace", + levels: []string{" node ", " process "}, + expected: MetricsLevelNode | MetricsLevelProcess, + expectError: false, + }, + { + name: "Invalid level", + levels: []string{"invalid"}, + expected: 0, + expectError: true, + }, + { + name: "Mixed valid and invalid", + levels: []string{"node", "invalid"}, + expected: 0, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := ParseLevel(tt.levels) + if tt.expectError { + assert.Error(t, err) + } else { + 
assert.NoError(t, err) + assert.Equal(t, tt.expected, result) + } + }) + } +} + +func TestValidLevels(t *testing.T) { + expected := []string{"node", "process", "container", "vm", "pod"} + result := ValidLevels() + assert.Equal(t, expected, result) +} + +func TestBitPatterns(t *testing.T) { + // Test that bit patterns are unique powers of 2 + assert.Equal(t, Level(1), MetricsLevelNode) // 1 << 0 = 1 + assert.Equal(t, Level(2), MetricsLevelProcess) // 1 << 1 = 2 + assert.Equal(t, Level(4), MetricsLevelContainer) // 1 << 2 = 4 + assert.Equal(t, Level(8), MetricsLevelVM) // 1 << 3 = 8 + assert.Equal(t, Level(16), MetricsLevelPod) // 1 << 4 = 16 + + // Test that combined levels work correctly + expected := MetricsLevelNode | MetricsLevelProcess | MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod + assert.Equal(t, expected, MetricsLevelNode|MetricsLevelProcess|MetricsLevelContainer|MetricsLevelVM|MetricsLevelPod) +} + +func TestLevel_MarshalYAML(t *testing.T) { + tests := []struct { + name string + level Level + expected string + }{ + { + name: "All levels", + level: MetricsLevelNode | MetricsLevelProcess | MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod, + expected: "- node\n- process\n- container\n- vm\n- pod\n", + }, + { + name: "Node only", + level: MetricsLevelNode, + expected: "node\n", + }, + { + name: "Process only", + level: MetricsLevelProcess, + expected: "process\n", + }, + { + name: "Node and Process", + level: MetricsLevelNode | MetricsLevelProcess, + expected: "- node\n- process\n", + }, + { + name: "Container, VM, and Pod", + level: MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod, + expected: "- container\n- vm\n- pod\n", + }, + { + name: "Pod and Node (17)", + level: MetricsLevelPod | MetricsLevelNode, // 16 + 1 = 17 + expected: "- node\n- pod\n", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + data, err := yaml.Marshal(tt.level) + assert.NoError(t, err) + assert.Equal(t, 
tt.expected, string(data)) + }) + } +} + +func TestLevel_UnmarshalYAML(t *testing.T) { + tests := []struct { + name string + yamlData string + expected Level + expectError bool + }{ + { + name: "Node string", + yamlData: "node", + expected: MetricsLevelNode, + expectError: false, + }, + { + name: "Process string", + yamlData: "process", + expected: MetricsLevelProcess, + expectError: false, + }, + { + name: "Array of levels", + yamlData: "- node\n- process", + expected: MetricsLevelNode | MetricsLevelProcess, + expectError: false, + }, + { + name: "Array with all levels", + yamlData: "- node\n- process\n- container\n- vm\n- pod", + expected: MetricsLevelNode | MetricsLevelProcess | MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod, + expectError: false, + }, + { + name: "Pod and Node array (should be 17)", + yamlData: "- node\n- pod", + expected: MetricsLevelPod | MetricsLevelNode, // 16 + 1 = 17 + expectError: false, + }, + { + name: "Case insensitive", + yamlData: "- NODE\n- Process", + expected: MetricsLevelNode | MetricsLevelProcess, + expectError: false, + }, + { + name: "Invalid level string", + yamlData: "invalid", + expected: 0, + expectError: true, + }, + { + name: "Invalid level in array", + yamlData: "- node\n- invalid", + expected: 0, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var level Level + err := yaml.Unmarshal([]byte(tt.yamlData), &level) + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expected, level) + } + }) + } +} + +func TestLevel_YAMLRoundTrip(t *testing.T) { + tests := []Level{ + MetricsLevelNode, + MetricsLevelProcess, + MetricsLevelNode | MetricsLevelProcess, + MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod, + MetricsLevelPod | MetricsLevelNode, // 17 + MetricsLevelNode | MetricsLevelProcess | MetricsLevelContainer | MetricsLevelVM | MetricsLevelPod, + } + + for _, original := range tests { + 
t.Run(original.String(), func(t *testing.T) { + // Marshal to YAML + data, err := yaml.Marshal(original) + assert.NoError(t, err) + + // Unmarshal back + var roundTrip Level + err = yaml.Unmarshal(data, &roundTrip) + assert.NoError(t, err) + + // Should be equal + assert.Equal(t, original, roundTrip) + }) + } +} diff --git a/internal/exporter/prometheus/prometheus.go b/internal/exporter/prometheus/prometheus.go index c6f61b1bff..93e0514755 100644 --- a/internal/exporter/prometheus/prometheus.go +++ b/internal/exporter/prometheus/prometheus.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" collector "github.com/sustainable-computing-io/kepler/internal/exporter/prometheus/collector" + "github.com/sustainable-computing-io/kepler/internal/exporter/prometheus/metrics" "github.com/sustainable-computing-io/kepler/internal/monitor" "github.com/sustainable-computing-io/kepler/internal/service" ) @@ -31,6 +32,7 @@ type Opts struct { collectors map[string]prom.Collector procfs string nodeName string + metricsLevel metrics.Level } // DefaultOpts() returns a new Opts with defaults set @@ -40,7 +42,8 @@ func DefaultOpts() Opts { debugCollectors: map[string]bool{ "go": true, }, - collectors: map[string]prom.Collector{}, + collectors: map[string]prom.Collector{}, + metricsLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, } } @@ -85,6 +88,12 @@ func WithNodeName(nodeName string) OptionFn { } } +func WithMetricsLevel(level metrics.Level) OptionFn { + return func(o *Opts) { + o.metricsLevel = level + } +} + // Exporter exports power data to Prometheus type Exporter struct { logger *slog.Logger @@ -129,15 +138,16 @@ func collectorForName(name string) (prom.Collector, error) { func CreateCollectors(pm Monitor, applyOpts ...OptionFn) (map[string]prom.Collector, error) { opts := Opts{ - logger: 
slog.Default(), - procfs: "/proc", + logger: slog.Default(), + procfs: "/proc", + metricsLevel: metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod, } for _, apply := range applyOpts { apply(&opts) } collectors := map[string]prom.Collector{ "build_info": collector.NewKeplerBuildInfoCollector(), - "power": collector.NewPowerCollector(pm, opts.nodeName, opts.logger), + "power": collector.NewPowerCollector(pm, opts.nodeName, opts.logger, opts.metricsLevel), } cpuInfoCollector, err := collector.NewCPUInfoCollector(opts.procfs) if err != nil {