diff --git a/collector/collector.go b/collector/collector.go
index 9768fb6a..f349c6ff 100644
--- a/collector/collector.go
+++ b/collector/collector.go
@@ -15,6 +15,7 @@
 package collector
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"log/slog"
@@ -27,6 +28,9 @@ import (
 
 // Namespace defines the common namespace to be used by all metrics.
 const namespace = "node"
+
+// collectorTimeout is the maximum time execute waits for one collector's Update.
+const collectorTimeout = 20 * time.Second
 
 var (
 	scrapeDurationDesc = prometheus.NewDesc(
@@ -157,12 +161,60 @@ func (n NodeCollector) Collect(ch chan<- prometheus.Metric) {
 
+// execute runs c.Update with a timeout of collectorTimeout, forwards the
+// metrics it produces to ch, and then reports the scrape duration and
+// success of the collector named name on ch.
 func execute(name string, c Collector, ch chan<- prometheus.Metric, logger *slog.Logger) {
 	begin := time.Now()
-	err := c.Update(ch)
+
+	// Bound how long we wait for this collector's Update.
+	ctx, cancel := context.WithTimeout(context.Background(), collectorTimeout)
+	defer cancel()
+
+	// Update writes to a private channel instead of ch, so a collector that
+	// outlives the timeout never sends on ch after execute has returned.
+	collectorCh := make(chan prometheus.Metric, 100)
+
+	// Buffered so the Update goroutine can always hand over its result and
+	// exit, even if nobody reads it after a timeout.
+	errCh := make(chan error, 1)
+
+	go func() {
+		defer close(collectorCh)
+		errCh <- c.Update(collectorCh)
+	}()
+
+	var err error
+forward:
+	for {
+		select {
+		case metric, ok := <-collectorCh:
+			if !ok {
+				// Update has returned; errCh already holds its result.
+				err = <-errCh
+				break forward
+			}
+			// Blocking send: a non-blocking send would silently drop metrics
+			// whenever the reader of ch is momentarily busy.
+			ch <- metric
+		case <-ctx.Done():
+			err = ctx.Err()
+			// Update takes no context and cannot be cancelled, so keep
+			// draining its channel to stop it from blocking on a full
+			// buffer. The drain goroutine ends once Update returns.
+			go func() {
+				for range collectorCh {
+				}
+			}()
+			break forward
+		}
+	}
+
 	duration := time.Since(begin)
 	var success float64
 
 	if err != nil {
-		if IsNoDataError(err) {
+		if errors.Is(err, context.DeadlineExceeded) {
+			logger.Error("collector timed out", "name", name, "duration_seconds", duration.Seconds(), "err", err)
+		} else if IsNoDataError(err) {
 			logger.Debug("collector returned no data", "name", name, "duration_seconds", duration.Seconds(), "err", err)
 		} else {
 			logger.Error("collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err)