mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Capitalizing first letter of all log lines (#7043)
Signed-off-by: Marek Slabicki <thaniri@gmail.com>
This commit is contained in:
parent
b3cf6ef332
commit
8224ddec23
|
@ -295,7 +295,7 @@ func main() {
|
||||||
|
|
||||||
if cfg.tsdb.RetentionDuration == 0 && cfg.tsdb.MaxBytes == 0 {
|
if cfg.tsdb.RetentionDuration == 0 && cfg.tsdb.MaxBytes == 0 {
|
||||||
cfg.tsdb.RetentionDuration = defaultRetentionDuration
|
cfg.tsdb.RetentionDuration = defaultRetentionDuration
|
||||||
level.Info(logger).Log("msg", "no time or size retention was set so using the default time retention", "duration", defaultRetentionDuration)
|
level.Info(logger).Log("msg", "No time or size retention was set so using the default time retention", "duration", defaultRetentionDuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for overflows. This limits our max retention to 100y.
|
// Check for overflows. This limits our max retention to 100y.
|
||||||
|
@ -305,7 +305,7 @@ func main() {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
cfg.tsdb.RetentionDuration = y
|
cfg.tsdb.RetentionDuration = y
|
||||||
level.Warn(logger).Log("msg", "time retention value is too high. Limiting to: "+y.String())
|
level.Warn(logger).Log("msg", "Time retention value is too high. Limiting to: "+y.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -239,7 +239,7 @@ func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targ
|
||||||
case tgs, ok := <-updates:
|
case tgs, ok := <-updates:
|
||||||
receivedUpdates.WithLabelValues(m.name).Inc()
|
receivedUpdates.WithLabelValues(m.name).Inc()
|
||||||
if !ok {
|
if !ok {
|
||||||
level.Debug(m.logger).Log("msg", "discoverer channel closed", "provider", p.name)
|
level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -271,7 +271,7 @@ func (m *Manager) sender() {
|
||||||
case m.syncCh <- m.allGroups():
|
case m.syncCh <- m.allGroups():
|
||||||
default:
|
default:
|
||||||
delayedUpdates.WithLabelValues(m.name).Inc()
|
delayedUpdates.WithLabelValues(m.name).Inc()
|
||||||
level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle")
|
level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
|
||||||
select {
|
select {
|
||||||
case m.triggerSend <- struct{}{}:
|
case m.triggerSend <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -93,7 +93,7 @@ func (c *Client) Write(samples model.Samples) error {
|
||||||
t := float64(s.Timestamp.UnixNano()) / 1e9
|
t := float64(s.Timestamp.UnixNano()) / 1e9
|
||||||
v := float64(s.Value)
|
v := float64(s.Value)
|
||||||
if math.IsNaN(v) || math.IsInf(v, 0) {
|
if math.IsNaN(v) || math.IsInf(v, 0) {
|
||||||
level.Debug(c.logger).Log("msg", "cannot send value to Graphite, skipping sample", "value", v, "sample", s)
|
level.Debug(c.logger).Log("msg", "Cannot send value to Graphite, skipping sample", "value", v, "sample", s)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fmt.Fprintf(&buf, "%s %f %f\n", k, v, t)
|
fmt.Fprintf(&buf, "%s %f %f\n", k, v, t)
|
||||||
|
|
|
@ -84,7 +84,7 @@ func (c *Client) Write(samples model.Samples) error {
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
v := float64(s.Value)
|
v := float64(s.Value)
|
||||||
if math.IsNaN(v) || math.IsInf(v, 0) {
|
if math.IsNaN(v) || math.IsInf(v, 0) {
|
||||||
level.Debug(c.logger).Log("msg", "cannot send to InfluxDB, skipping sample", "value", v, "sample", s)
|
level.Debug(c.logger).Log("msg", "Cannot send to InfluxDB, skipping sample", "value", v, "sample", s)
|
||||||
c.ignoredSamples.Inc()
|
c.ignoredSamples.Inc()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,7 +79,7 @@ func (c *Client) Write(samples model.Samples) error {
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
v := float64(s.Value)
|
v := float64(s.Value)
|
||||||
if math.IsNaN(v) || math.IsInf(v, 0) {
|
if math.IsNaN(v) || math.IsInf(v, 0) {
|
||||||
level.Debug(c.logger).Log("msg", "cannot send value to OpenTSDB, skipping sample", "value", v, "sample", s)
|
level.Debug(c.logger).Log("msg", "Cannot send value to OpenTSDB, skipping sample", "value", v, "sample", s)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
metric := TagValue(s.Metric[model.MetricNameLabel])
|
metric := TagValue(s.Metric[model.MetricNameLabel])
|
||||||
|
|
|
@ -308,7 +308,7 @@ func NewEngine(opts EngineOpts) *Engine {
|
||||||
if opts.LookbackDelta == 0 {
|
if opts.LookbackDelta == 0 {
|
||||||
opts.LookbackDelta = defaultLookbackDelta
|
opts.LookbackDelta = defaultLookbackDelta
|
||||||
if l := opts.Logger; l != nil {
|
if l := opts.Logger; l != nil {
|
||||||
level.Debug(l).Log("msg", "lookback delta is zero, setting to default value", "value", defaultLookbackDelta)
|
level.Debug(l).Log("msg", "Lookback delta is zero, setting to default value", "value", defaultLookbackDelta)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,7 +345,7 @@ func (ng *Engine) SetQueryLogger(l QueryLogger) {
|
||||||
// not make reload fail; only log a warning.
|
// not make reload fail; only log a warning.
|
||||||
err := ng.queryLogger.Close()
|
err := ng.queryLogger.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
level.Warn(ng.logger).Log("msg", "error while closing the previous query log file", "err", err)
|
level.Warn(ng.logger).Log("msg", "Error while closing the previous query log file", "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -599,7 +599,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
||||||
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
|
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := app.Commit(); err != nil {
|
if err := app.Commit(); err != nil {
|
||||||
level.Warn(g.logger).Log("msg", "rule sample appending failed", "err", err)
|
level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
g.seriesInPreviousEval[i] = seriesReturned
|
g.seriesInPreviousEval[i] = seriesReturned
|
||||||
|
@ -637,7 +637,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
||||||
// Do not count these in logging, as this is expected if series
|
// Do not count these in logging, as this is expected if series
|
||||||
// is exposed from a different rule.
|
// is exposed from a different rule.
|
||||||
default:
|
default:
|
||||||
level.Warn(g.logger).Log("msg", "adding stale sample failed", "sample", metric, "err", err)
|
level.Warn(g.logger).Log("msg", "Adding stale sample failed", "sample", metric, "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -660,11 +660,11 @@ func (g *Group) cleanupStaleSeries(ts time.Time) {
|
||||||
// Do not count these in logging, as this is expected if series
|
// Do not count these in logging, as this is expected if series
|
||||||
// is exposed from a different rule.
|
// is exposed from a different rule.
|
||||||
default:
|
default:
|
||||||
level.Warn(g.logger).Log("msg", "adding stale sample for previous configuration failed", "sample", s, "err", err)
|
level.Warn(g.logger).Log("msg", "Adding stale sample for previous configuration failed", "sample", s, "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := app.Commit(); err != nil {
|
if err := app.Commit(); err != nil {
|
||||||
level.Warn(g.logger).Log("msg", "stale sample appending for previous configuration failed", "err", err)
|
level.Warn(g.logger).Log("msg", "Stale sample appending for previous configuration failed", "err", err)
|
||||||
} else {
|
} else {
|
||||||
g.staleSeries = nil
|
g.staleSeries = nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -969,11 +969,11 @@ mainLoop:
|
||||||
// we still call sl.append to trigger stale markers.
|
// we still call sl.append to trigger stale markers.
|
||||||
total, added, seriesAdded, appErr := sl.append(b, contentType, start)
|
total, added, seriesAdded, appErr := sl.append(b, contentType, start)
|
||||||
if appErr != nil {
|
if appErr != nil {
|
||||||
level.Debug(sl.l).Log("msg", "append failed", "err", appErr)
|
level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
|
||||||
// The append failed, probably due to a parse error or sample limit.
|
// The append failed, probably due to a parse error or sample limit.
|
||||||
// Call sl.append again with an empty scrape to trigger stale markers.
|
// Call sl.append again with an empty scrape to trigger stale markers.
|
||||||
if _, _, _, err := sl.append([]byte{}, "", start); err != nil {
|
if _, _, _, err := sl.append([]byte{}, "", start); err != nil {
|
||||||
level.Warn(sl.l).Log("msg", "append failed", "err", err)
|
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -984,7 +984,7 @@ mainLoop:
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
|
if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
|
||||||
level.Warn(sl.l).Log("msg", "appending scrape report failed", "err", err)
|
level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
|
||||||
}
|
}
|
||||||
last = start
|
last = start
|
||||||
|
|
||||||
|
@ -1172,7 +1172,7 @@ loop:
|
||||||
sampleAdded, err = sl.checkAddError(nil, met, tp, err, &sampleLimitErr, appErrs)
|
sampleAdded, err = sl.checkAddError(nil, met, tp, err, &sampleLimitErr, appErrs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != storage.ErrNotFound {
|
if err != storage.ErrNotFound {
|
||||||
level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
|
level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
|
||||||
}
|
}
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
|
|
|
@ -318,7 +318,7 @@ outer:
|
||||||
t.droppedSamplesTotal.Inc()
|
t.droppedSamplesTotal.Inc()
|
||||||
t.samplesDropped.incr(1)
|
t.samplesDropped.incr(1)
|
||||||
if _, ok := t.droppedSeries[s.Ref]; !ok {
|
if _, ok := t.droppedSeries[s.Ref]; !ok {
|
||||||
level.Info(t.logger).Log("msg", "dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
|
level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
|
||||||
}
|
}
|
||||||
t.seriesMtx.Unlock()
|
t.seriesMtx.Unlock()
|
||||||
continue
|
continue
|
||||||
|
@ -881,7 +881,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.qm.retriedSamplesTotal.Add(float64(len(samples)))
|
s.qm.retriedSamplesTotal.Add(float64(len(samples)))
|
||||||
level.Debug(s.qm.logger).Log("msg", "failed to send batch, retrying", "err", err)
|
level.Debug(s.qm.logger).Log("msg", "Failed to send batch, retrying", "err", err)
|
||||||
|
|
||||||
time.Sleep(time.Duration(backoff))
|
time.Sleep(time.Duration(backoff))
|
||||||
backoff = backoff * 2
|
backoff = backoff * 2
|
||||||
|
|
|
@ -107,7 +107,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
||||||
// external labels change.
|
// external labels change.
|
||||||
externalLabelUnchanged := externalLabelHash == rws.externalLabelHash
|
externalLabelUnchanged := externalLabelHash == rws.externalLabelHash
|
||||||
if configHash == rws.configHash && externalLabelUnchanged {
|
if configHash == rws.configHash && externalLabelUnchanged {
|
||||||
level.Debug(rws.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers")
|
level.Debug(rws.logger).Log("msg", "Remote write config has not changed, no need to restart QueueManagers")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -674,7 +674,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||||
if i > 0 && b.Meta().MinTime < globalMaxt {
|
if i > 0 && b.Meta().MinTime < globalMaxt {
|
||||||
c.metrics.overlappingBlocks.Inc()
|
c.metrics.overlappingBlocks.Inc()
|
||||||
overlapping = true
|
overlapping = true
|
||||||
level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID)
|
level.Warn(c.logger).Log("msg", "Found overlapping blocks during compaction", "ulid", meta.ULID)
|
||||||
}
|
}
|
||||||
if b.Meta().MaxTime > globalMaxt {
|
if b.Meta().MaxTime > globalMaxt {
|
||||||
globalMaxt = b.Meta().MaxTime
|
globalMaxt = b.Meta().MaxTime
|
||||||
|
|
16
tsdb/db.go
16
tsdb/db.go
|
@ -433,7 +433,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
|
||||||
if len(corrupted) > 0 {
|
if len(corrupted) > 0 {
|
||||||
for _, b := range loadable {
|
for _, b := range loadable {
|
||||||
if err := b.Close(); err != nil {
|
if err := b.Close(); err != nil {
|
||||||
level.Warn(db.logger).Log("msg", "closing a block", err)
|
level.Warn(db.logger).Log("msg", "Closing a block", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, errors.Errorf("unexpected corrupted block:%v", corrupted)
|
return nil, errors.Errorf("unexpected corrupted block:%v", corrupted)
|
||||||
|
@ -452,7 +452,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
|
||||||
blockMetas = append(blockMetas, b.Meta())
|
blockMetas = append(blockMetas, b.Meta())
|
||||||
}
|
}
|
||||||
if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
|
if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
|
||||||
level.Warn(db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String())
|
level.Warn(db.logger).Log("msg", "Overlapping blocks found during opening", "detail", overlaps.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close all previously open readers and add the new ones to the cache.
|
// Close all previously open readers and add the new ones to the cache.
|
||||||
|
@ -612,7 +612,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
||||||
|
|
||||||
if initErr := db.head.Init(minValidTime); initErr != nil {
|
if initErr := db.head.Init(minValidTime); initErr != nil {
|
||||||
db.head.metrics.walCorruptionsTotal.Inc()
|
db.head.metrics.walCorruptionsTotal.Inc()
|
||||||
level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", initErr)
|
level.Warn(db.logger).Log("msg", "Encountered WAL read error, attempting repair", "err", initErr)
|
||||||
if err := wlog.Repair(initErr); err != nil {
|
if err := wlog.Repair(initErr); err != nil {
|
||||||
return nil, errors.Wrap(err, "repair corrupted WAL")
|
return nil, errors.Wrap(err, "repair corrupted WAL")
|
||||||
}
|
}
|
||||||
|
@ -908,7 +908,7 @@ func (db *DB) reload() (err error) {
|
||||||
blockMetas = append(blockMetas, b.Meta())
|
blockMetas = append(blockMetas, b.Meta())
|
||||||
}
|
}
|
||||||
if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
|
if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
|
||||||
level.Warn(db.logger).Log("msg", "overlapping blocks found during reload", "detail", overlaps.String())
|
level.Warn(db.logger).Log("msg", "Overlapping blocks found during reload", "detail", overlaps.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, b := range oldBlocks {
|
for _, b := range oldBlocks {
|
||||||
|
@ -1041,7 +1041,7 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
|
||||||
for ulid, block := range blocks {
|
for ulid, block := range blocks {
|
||||||
if block != nil {
|
if block != nil {
|
||||||
if err := block.Close(); err != nil {
|
if err := block.Close(); err != nil {
|
||||||
level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
|
level.Warn(db.logger).Log("msg", "Closing block failed", "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {
|
if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {
|
||||||
|
@ -1220,7 +1220,7 @@ func (db *DB) DisableCompactions() {
|
||||||
defer db.autoCompactMtx.Unlock()
|
defer db.autoCompactMtx.Unlock()
|
||||||
|
|
||||||
db.autoCompact = false
|
db.autoCompact = false
|
||||||
level.Info(db.logger).Log("msg", "compactions disabled")
|
level.Info(db.logger).Log("msg", "Compactions disabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnableCompactions enables auto compactions.
|
// EnableCompactions enables auto compactions.
|
||||||
|
@ -1229,7 +1229,7 @@ func (db *DB) EnableCompactions() {
|
||||||
defer db.autoCompactMtx.Unlock()
|
defer db.autoCompactMtx.Unlock()
|
||||||
|
|
||||||
db.autoCompact = true
|
db.autoCompact = true
|
||||||
level.Info(db.logger).Log("msg", "compactions enabled")
|
level.Info(db.logger).Log("msg", "Compactions enabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshot writes the current data to the directory. If withHead is set to true it
|
// Snapshot writes the current data to the directory. If withHead is set to true it
|
||||||
|
@ -1249,7 +1249,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
|
||||||
defer db.mtx.RUnlock()
|
defer db.mtx.RUnlock()
|
||||||
|
|
||||||
for _, b := range db.blocks {
|
for _, b := range db.blocks {
|
||||||
level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
|
level.Info(db.logger).Log("msg", "Snapshotting block", "block", b)
|
||||||
|
|
||||||
if err := b.Snapshot(dir); err != nil {
|
if err := b.Snapshot(dir); err != nil {
|
||||||
return errors.Wrapf(err, "error snapshotting block: %s", b.Dir())
|
return errors.Wrapf(err, "error snapshotting block: %s", b.Dir())
|
||||||
|
|
12
tsdb/head.go
12
tsdb/head.go
|
@ -555,7 +555,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if unknownRefs > 0 {
|
if unknownRefs > 0 {
|
||||||
level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs)
|
level.Warn(h.logger).Log("msg", "Unknown series references", "count", unknownRefs)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -572,7 +572,7 @@ func (h *Head) Init(minValidTime int64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
level.Info(h.logger).Log("msg", "replaying WAL, this may take awhile")
|
level.Info(h.logger).Log("msg", "Replaying WAL, this may take awhile")
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
// Backfill the checkpoint first if it exists.
|
// Backfill the checkpoint first if it exists.
|
||||||
dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir())
|
dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir())
|
||||||
|
@ -587,7 +587,7 @@ func (h *Head) Init(minValidTime int64) error {
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := sr.Close(); err != nil {
|
if err := sr.Close(); err != nil {
|
||||||
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
|
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -616,7 +616,7 @@ func (h *Head) Init(minValidTime int64) error {
|
||||||
sr := wal.NewSegmentBufReader(s)
|
sr := wal.NewSegmentBufReader(s)
|
||||||
err = h.loadWAL(wal.NewReader(sr), multiRef)
|
err = h.loadWAL(wal.NewReader(sr), multiRef)
|
||||||
if err := sr.Close(); err != nil {
|
if err := sr.Close(); err != nil {
|
||||||
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
|
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -659,7 +659,7 @@ func (h *Head) Truncate(mint int64) (err error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
h.gc()
|
h.gc()
|
||||||
level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start))
|
level.Info(h.logger).Log("msg", "Head GC completed", "duration", time.Since(start))
|
||||||
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
||||||
|
|
||||||
if h.wal == nil {
|
if h.wal == nil {
|
||||||
|
@ -1399,7 +1399,7 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
|
||||||
for p.Next() {
|
for p.Next() {
|
||||||
s := h.head.series.getByID(p.At())
|
s := h.head.series.getByID(p.At())
|
||||||
if s == nil {
|
if s == nil {
|
||||||
level.Debug(h.head.logger).Log("msg", "looked up series not found")
|
level.Debug(h.head.logger).Log("msg", "Looked up series not found")
|
||||||
} else {
|
} else {
|
||||||
series = append(series, s)
|
series = append(series, s)
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,7 +57,7 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
|
||||||
}
|
}
|
||||||
if meta.Version == metaVersion1 {
|
if meta.Version == metaVersion1 {
|
||||||
level.Info(logger).Log(
|
level.Info(logger).Log(
|
||||||
"msg", "found healthy block",
|
"msg", "Found healthy block",
|
||||||
"mint", meta.MinTime,
|
"mint", meta.MinTime,
|
||||||
"maxt", meta.MaxTime,
|
"maxt", meta.MaxTime,
|
||||||
"ulid", meta.ULID,
|
"ulid", meta.ULID,
|
||||||
|
@ -65,7 +65,7 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
level.Info(logger).Log(
|
level.Info(logger).Log(
|
||||||
"msg", "fixing broken block",
|
"msg", "Fixing broken block",
|
||||||
"mint", meta.MinTime,
|
"mint", meta.MinTime,
|
||||||
"maxt", meta.MaxTime,
|
"maxt", meta.MaxTime,
|
||||||
"ulid", meta.ULID,
|
"ulid", meta.ULID,
|
||||||
|
|
|
@ -204,7 +204,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration,
|
||||||
w.files = append(w.files, newSegmentFile(f))
|
w.files = append(w.files, newSegmentFile(f))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
level.Warn(logger).Log("msg", "invalid segment file detected, truncating WAL", "err", err, "file", fn)
|
level.Warn(logger).Log("msg", "Invalid segment file detected, truncating WAL", "err", err, "file", fn)
|
||||||
|
|
||||||
for _, fn := range fns[i:] {
|
for _, fn := range fns[i:] {
|
||||||
if err := os.Remove(fn); err != nil {
|
if err := os.Remove(fn); err != nil {
|
||||||
|
@ -1233,7 +1233,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
|
||||||
if exists, err := deprecatedWALExists(logger, dir); err != nil || !exists {
|
if exists, err := deprecatedWALExists(logger, dir); err != nil || !exists {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
level.Info(logger).Log("msg", "migrating WAL format")
|
level.Info(logger).Log("msg", "Migrating WAL format")
|
||||||
|
|
||||||
tmpdir := dir + ".tmp"
|
tmpdir := dir + ".tmp"
|
||||||
if err := os.RemoveAll(tmpdir); err != nil {
|
if err := os.RemoveAll(tmpdir); err != nil {
|
||||||
|
|
|
@ -297,7 +297,7 @@ func (r *LiveReader) readRecord() ([]byte, int, error) {
|
||||||
return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize)
|
return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize)
|
||||||
}
|
}
|
||||||
r.metrics.readerCorruptionErrors.WithLabelValues("record_span_page").Inc()
|
r.metrics.readerCorruptionErrors.WithLabelValues("record_span_page").Inc()
|
||||||
level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize)
|
level.Warn(r.logger).Log("msg", "Record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize)
|
||||||
}
|
}
|
||||||
if recordHeaderSize+length > pageSize {
|
if recordHeaderSize+length > pageSize {
|
||||||
return nil, 0, fmt.Errorf("record length greater than a single page: %d > %d", recordHeaderSize+length, pageSize)
|
return nil, 0, fmt.Errorf("record length greater than a single page: %d > %d", recordHeaderSize+length, pageSize)
|
||||||
|
|
|
@ -123,7 +123,7 @@ func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error) {
|
||||||
// If it was torn mid-record, a full read (which the caller should do anyway
|
// If it was torn mid-record, a full read (which the caller should do anyway
|
||||||
// to ensure integrity) will detect it as a corruption by the end.
|
// to ensure integrity) will detect it as a corruption by the end.
|
||||||
if d := stat.Size() % pageSize; d != 0 {
|
if d := stat.Size() % pageSize; d != 0 {
|
||||||
level.Warn(logger).Log("msg", "last page of the wal is torn, filling it with zeros", "segment", segName)
|
level.Warn(logger).Log("msg", "Last page of the wal is torn, filling it with zeros", "segment", segName)
|
||||||
if _, err := f.Write(make([]byte, pageSize-d)); err != nil {
|
if _, err := f.Write(make([]byte, pageSize-d)); err != nil {
|
||||||
f.Close()
|
f.Close()
|
||||||
return nil, errors.Wrap(err, "zero-pad torn page")
|
return nil, errors.Wrap(err, "zero-pad torn page")
|
||||||
|
@ -351,7 +351,7 @@ func (w *WAL) Repair(origErr error) error {
|
||||||
if cerr.Segment < 0 {
|
if cerr.Segment < 0 {
|
||||||
return errors.New("corruption error does not specify position")
|
return errors.New("corruption error does not specify position")
|
||||||
}
|
}
|
||||||
level.Warn(w.logger).Log("msg", "starting corruption repair",
|
level.Warn(w.logger).Log("msg", "Starting corruption repair",
|
||||||
"segment", cerr.Segment, "offset", cerr.Offset)
|
"segment", cerr.Segment, "offset", cerr.Offset)
|
||||||
|
|
||||||
// All segments behind the corruption can no longer be used.
|
// All segments behind the corruption can no longer be used.
|
||||||
|
@ -359,7 +359,7 @@ func (w *WAL) Repair(origErr error) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "list segments")
|
return errors.Wrap(err, "list segments")
|
||||||
}
|
}
|
||||||
level.Warn(w.logger).Log("msg", "deleting all segments newer than corrupted segment", "segment", cerr.Segment)
|
level.Warn(w.logger).Log("msg", "Deleting all segments newer than corrupted segment", "segment", cerr.Segment)
|
||||||
|
|
||||||
for _, s := range segs {
|
for _, s := range segs {
|
||||||
if w.segment.i == s.index {
|
if w.segment.i == s.index {
|
||||||
|
@ -381,7 +381,7 @@ func (w *WAL) Repair(origErr error) error {
|
||||||
// Regardless of the corruption offset, no record reaches into the previous segment.
|
// Regardless of the corruption offset, no record reaches into the previous segment.
|
||||||
// So we can safely repair the WAL by removing the segment and re-inserting all
|
// So we can safely repair the WAL by removing the segment and re-inserting all
|
||||||
// its records up to the corruption.
|
// its records up to the corruption.
|
||||||
level.Warn(w.logger).Log("msg", "rewrite corrupted segment", "segment", cerr.Segment)
|
level.Warn(w.logger).Log("msg", "Rewrite corrupted segment", "segment", cerr.Segment)
|
||||||
|
|
||||||
fn := SegmentName(w.dir, cerr.Segment)
|
fn := SegmentName(w.dir, cerr.Segment)
|
||||||
tmpfn := fn + ".repair"
|
tmpfn := fn + ".repair"
|
||||||
|
|
|
@ -168,7 +168,7 @@ func (w *Watcher) setMetrics() {
|
||||||
// Start the Watcher.
|
// Start the Watcher.
|
||||||
func (w *Watcher) Start() {
|
func (w *Watcher) Start() {
|
||||||
w.setMetrics()
|
w.setMetrics()
|
||||||
level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name)
|
level.Info(w.logger).Log("msg", "Starting WAL watcher", "queue", w.name)
|
||||||
|
|
||||||
go w.loop()
|
go w.loop()
|
||||||
}
|
}
|
||||||
|
@ -220,7 +220,7 @@ func (w *Watcher) Run() error {
|
||||||
// Run will be called again if there was a failure to read the WAL.
|
// Run will be called again if there was a failure to read the WAL.
|
||||||
w.sendSamples = false
|
w.sendSamples = false
|
||||||
|
|
||||||
level.Info(w.logger).Log("msg", "replaying WAL", "queue", w.name)
|
level.Info(w.logger).Log("msg", "Replaying WAL", "queue", w.name)
|
||||||
|
|
||||||
// Backfill from the checkpoint first if it exists.
|
// Backfill from the checkpoint first if it exists.
|
||||||
lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
|
lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
|
||||||
|
@ -240,10 +240,10 @@ func (w *Watcher) Run() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
level.Debug(w.logger).Log("msg", "tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment)
|
level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment)
|
||||||
for !isClosed(w.quit) {
|
for !isClosed(w.quit) {
|
||||||
w.currentSegmentMetric.Set(float64(currentSegment))
|
w.currentSegmentMetric.Set(float64(currentSegment))
|
||||||
level.Debug(w.logger).Log("msg", "processing segment", "currentSegment", currentSegment)
|
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)
|
||||||
|
|
||||||
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
|
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
|
||||||
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
|
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
|
||||||
|
@ -369,7 +369,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
|
||||||
<-gcSem
|
<-gcSem
|
||||||
}()
|
}()
|
||||||
if err := w.garbageCollectSeries(segmentNum); err != nil {
|
if err := w.garbageCollectSeries(segmentNum); err != nil {
|
||||||
level.Warn(w.logger).Log("msg", "error process checkpoint", "err", err)
|
level.Warn(w.logger).Log("msg", "Error process checkpoint", "err", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
default:
|
default:
|
||||||
|
@ -392,9 +392,9 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
|
||||||
// Ignore errors reading to end of segment whilst replaying the WAL.
|
// Ignore errors reading to end of segment whilst replaying the WAL.
|
||||||
if !tail {
|
if !tail {
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
level.Warn(w.logger).Log("msg", "ignoring error reading to end of segment, may have dropped data", "err", err)
|
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "err", err)
|
||||||
} else if reader.Offset() != size {
|
} else if reader.Offset() != size {
|
||||||
level.Warn(w.logger).Log("msg", "expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
|
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -412,9 +412,9 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
|
||||||
// Ignore all errors reading to end of segment whilst replaying the WAL.
|
// Ignore all errors reading to end of segment whilst replaying the WAL.
|
||||||
if !tail {
|
if !tail {
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
level.Warn(w.logger).Log("msg", "ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
|
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
|
||||||
} else if reader.Offset() != size {
|
} else if reader.Offset() != size {
|
||||||
level.Warn(w.logger).Log("msg", "expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
|
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -444,11 +444,11 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if index >= segmentNum {
|
if index >= segmentNum {
|
||||||
level.Debug(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir)
|
level.Debug(w.logger).Log("msg", "Current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
level.Debug(w.logger).Log("msg", "new checkpoint detected", "new", dir, "currentSegment", segmentNum)
|
level.Debug(w.logger).Log("msg", "New checkpoint detected", "new", dir, "currentSegment", segmentNum)
|
||||||
|
|
||||||
if err = w.readCheckpoint(dir); err != nil {
|
if err = w.readCheckpoint(dir); err != nil {
|
||||||
return errors.Wrap(err, "readCheckpoint")
|
return errors.Wrap(err, "readCheckpoint")
|
||||||
|
@ -495,7 +495,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
||||||
if !w.sendSamples {
|
if !w.sendSamples {
|
||||||
w.sendSamples = true
|
w.sendSamples = true
|
||||||
duration := time.Since(w.startTime)
|
duration := time.Since(w.startTime)
|
||||||
level.Info(w.logger).Log("msg", "done replaying WAL", "duration", duration)
|
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
|
||||||
}
|
}
|
||||||
send = append(send, s)
|
send = append(send, s)
|
||||||
}
|
}
|
||||||
|
@ -541,7 +541,7 @@ func recordType(rt record.Type) string {
|
||||||
|
|
||||||
// Read all the series records from a Checkpoint directory.
|
// Read all the series records from a Checkpoint directory.
|
||||||
func (w *Watcher) readCheckpoint(checkpointDir string) error {
|
func (w *Watcher) readCheckpoint(checkpointDir string) error {
|
||||||
level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir)
|
level.Debug(w.logger).Log("msg", "Reading checkpoint", "dir", checkpointDir)
|
||||||
index, err := checkpointNum(checkpointDir)
|
index, err := checkpointNum(checkpointDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "checkpointNum")
|
return errors.Wrap(err, "checkpointNum")
|
||||||
|
@ -574,7 +574,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir)
|
level.Debug(w.logger).Log("msg", "Read series references from checkpoint", "checkpoint", checkpointDir)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1265,7 +1265,7 @@ func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, extern
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := querier.Close(); err != nil {
|
if err := querier.Close(); err != nil {
|
||||||
level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error())
|
level.Warn(api.logger).Log("msg", "Error on querier close", "err", err.Error())
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
|
@ -91,7 +91,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
||||||
for _, mset := range matcherSets {
|
for _, mset := range matcherSets {
|
||||||
s, wrns, err := q.Select(false, hints, mset...)
|
s, wrns, err := q.Select(false, hints, mset...)
|
||||||
if wrns != nil {
|
if wrns != nil {
|
||||||
level.Debug(h.logger).Log("msg", "federation select returned warnings", "warnings", wrns)
|
level.Debug(h.logger).Log("msg", "Federation select returned warnings", "warnings", wrns)
|
||||||
federationWarnings.Add(float64(len(wrns)))
|
federationWarnings.Add(float64(len(wrns)))
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -556,7 +556,7 @@ func (h *Handler) Run(ctx context.Context) error {
|
||||||
apiPath := "/api"
|
apiPath := "/api"
|
||||||
if h.options.RoutePrefix != "/" {
|
if h.options.RoutePrefix != "/" {
|
||||||
apiPath = h.options.RoutePrefix + apiPath
|
apiPath = h.options.RoutePrefix + apiPath
|
||||||
level.Info(h.logger).Log("msg", "router prefix", "prefix", h.options.RoutePrefix)
|
level.Info(h.logger).Log("msg", "Router prefix", "prefix", h.options.RoutePrefix)
|
||||||
}
|
}
|
||||||
av1 := route.New().
|
av1 := route.New().
|
||||||
WithInstrumentation(h.metrics.instrumentHandlerWithPrefix("/api/v1")).
|
WithInstrumentation(h.metrics.instrumentHandlerWithPrefix("/api/v1")).
|
||||||
|
|
Loading…
Reference in a new issue