From ee592efb8c4a90449c9a73fa8f017f9c913af6aa Mon Sep 17 00:00:00 2001 From: machine424 Date: Sun, 27 Oct 2024 18:35:22 +0100 Subject: [PATCH] feat(tsdb): allow using Direct IO for chunks segments writing Signed-off-by: machine424 --- .github/workflows/ci.yml | 1 + cmd/prometheus/main.go | 9 +- docs/command-line/prometheus.md | 2 +- docs/feature_flags.md | 6 + tsdb/chunks/chunks.go | 74 +++-- tsdb/compact.go | 6 +- tsdb/db.go | 4 + tsdb/db_test.go | 2 +- tsdb/fileutil/direct_io.go | 39 +++ tsdb/fileutil/direct_io_force.go | 28 ++ tsdb/fileutil/direct_io_linux.go | 29 ++ tsdb/fileutil/direct_io_unsupported.go | 29 ++ tsdb/fileutil/direct_io_writer.go | 405 +++++++++++++++++++++++++ tsdb/fileutil/direct_io_writer_test.go | 197 ++++++++++++ 14 files changed, 806 insertions(+), 25 deletions(-) create mode 100644 tsdb/fileutil/direct_io.go create mode 100644 tsdb/fileutil/direct_io_force.go create mode 100644 tsdb/fileutil/direct_io_linux.go create mode 100644 tsdb/fileutil/direct_io_unsupported.go create mode 100644 tsdb/fileutil/direct_io_writer.go create mode 100644 tsdb/fileutil/direct_io_writer_test.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 35a2651a0a..317e2d2f2a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,6 +33,7 @@ jobs: - uses: prometheus/promci@c3c93a50d581b928af720f0134b2b2dad32a6c41 # v0.4.6 - uses: ./.github/promci/actions/setup_environment - run: go test --tags=dedupelabels ./... + - run: go test --tags=forcedirectio -race ./tsdb/ - run: GOARCH=386 go test ./... 
- uses: ./.github/promci/actions/check_proto with: diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index d69648d88b..7d69e5c581 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -278,6 +278,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { case "otlp-deltatocumulative": c.web.ConvertOTLPDelta = true logger.Info("Converting delta OTLP metrics to cumulative") + case "use-direct-io": + c.tsdb.UseDirectIO = true + logger.Info("Experimental Direct IO is enabled.") default: logger.Warn("Unknown option for --enable-feature", "option", o) } @@ -519,7 +522,7 @@ func main() { a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates."). Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, use-direct-io. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). Default("").StringsVar(&cfg.featureList) a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode) @@ -1222,6 +1225,8 @@ func main() { if !agentMode { // TSDB. 
opts := cfg.tsdb.ToTSDBOptions() + // Direct IO is enabled only via the use-direct-io feature flag + // (see setFeatureListOptions); no override is applied here. cancel := make(chan struct{}) g.Add( func() error { @@ -1797,6 +1802,7 @@ type tsdbOptions struct { CompactionDelayMaxPercent int EnableOverlappingCompaction bool EnableOOONativeHistograms bool + UseDirectIO bool } func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { @@ -1821,6 +1827,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { EnableDelayedCompaction: opts.EnableDelayedCompaction, CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent, EnableOverlappingCompaction: opts.EnableOverlappingCompaction, + UseDirectIO: opts.UseDirectIO, } } diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 9b4ec8b736..b1c079770f 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -60,7 +60,7 @@ The Prometheus monitoring server | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | | --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. | `20` | | --query.max-samples | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` | -| --enable-feature ... | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | +| --enable-feature ... | Comma separated feature names to enable. 
Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, use-direct-io. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | | --agent | Run Prometheus in 'Agent mode'. | | | --log.level | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` | | --log.format | Output format of log messages. One of: [logfmt, json] | `logfmt` | diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 6973d6d73b..54c163829a 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -184,3 +184,9 @@ Enabling this _can_ have negative impact on performance, because the in-memory state is mutex guarded. Cumulative-only OTLP requests are not affected. [d2c]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/deltatocumulativeprocessor + +## Use Direct IO + +`--enable-feature=use-direct-io` + +When enabled, new block chunk segments are written using Direct I/O, bypassing the page cache. This is experimental and currently only takes effect on Linux. diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index f505d762bb..224c481cba 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -14,7 +14,6 @@ package chunks import ( - "bufio" "encoding/binary" "errors" "fmt" @@ -281,12 +280,13 @@ func checkCRC32(data, sum []byte) error { type Writer struct { dirFile *os.File files []*os.File - wbuf *bufio.Writer + wbuf fileutil.BufWriter n int64 crc32 hash.Hash buf [binary.MaxVarintLen32]byte segmentSize int64 + useDirectIO bool } const ( @@ -294,21 +294,34 @@ const ( DefaultChunkSegmentSize = 512 * 1024 * 1024 ) -// NewWriterWithSegSize returns a new writer against the given directory -// and allows setting a custom size for the segments. 
-func NewWriterWithSegSize(dir string, segmentSize int64) (*Writer, error) { - return newWriter(dir, segmentSize) +type writerOptions struct { + segmentSize int64 + useDirectIO bool } -// NewWriter returns a new writer against the given directory -// using the default segment size. -func NewWriter(dir string) (*Writer, error) { - return newWriter(dir, DefaultChunkSegmentSize) +type WriterOption func(*writerOptions) + +func WithDirectIO(enabled bool) WriterOption { + return func(o *writerOptions) { + o.useDirectIO = enabled + } } -func newWriter(dir string, segmentSize int64) (*Writer, error) { - if segmentSize <= 0 { - segmentSize = DefaultChunkSegmentSize +func WithSegmentSize(segmentSize int64) WriterOption { + return func(o *writerOptions) { + if segmentSize <= 0 { + segmentSize = DefaultChunkSegmentSize + } + o.segmentSize = segmentSize + } +} + +// NewWriter returns a new writer against the given directory. +func NewWriter(dir string, opts ...WriterOption) (*Writer, error) { + options := &writerOptions{} + + for _, opt := range opts { + opt(options) } if err := os.MkdirAll(dir, 0o777); err != nil { @@ -322,7 +335,8 @@ func newWriter(dir string, segmentSize int64) (*Writer, error) { dirFile: dirFile, n: 0, crc32: newCRC32(), - segmentSize: segmentSize, + segmentSize: options.segmentSize, + useDirectIO: options.useDirectIO, }, nil } @@ -333,7 +347,7 @@ func (w *Writer) tail() *os.File { return w.files[len(w.files)-1] } -// finalizeTail writes all pending data to the current tail file, +// finalizeTail writes all pending data to the current tail file if any, // truncates its size, and closes it. 
func (w *Writer) finalizeTail() error { tf := w.tail() @@ -341,8 +355,10 @@ func (w *Writer) finalizeTail() error { return nil } - if err := w.wbuf.Flush(); err != nil { - return err + if w.wbuf != nil { + if err := w.wbuf.Flush(); err != nil { + return err + } } if err := tf.Sync(); err != nil { return err @@ -373,9 +389,24 @@ func (w *Writer) cut() error { w.files = append(w.files, f) if w.wbuf != nil { - w.wbuf.Reset(f) + if err := w.wbuf.Reset(f); err != nil { + return err + } } else { - w.wbuf = bufio.NewWriterSize(f, 8*1024*1024) + var ( + wbuf fileutil.BufWriter + err error + ) + size := 8 * 1024 * 1024 + if w.useDirectIO { + wbuf, err = fileutil.NewDirectIOWriter(f, size) + } else { + wbuf, err = fileutil.NewBufioWriterWithSeek(f, size) + } + if err != nil { + return err + } + w.wbuf = wbuf } return nil @@ -434,8 +465,9 @@ func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, all return 0, nil, 0, fmt.Errorf("open final file: %w", err) } // Skip header for further writes. - if _, err := f.Seek(int64(n), 0); err != nil { - return 0, nil, 0, fmt.Errorf("seek in final file: %w", err) + offset := int64(n) + if _, err := f.Seek(offset, 0); err != nil { + return 0, nil, 0, fmt.Errorf("seek to %d in final file: %w", offset, err) } return n, f, seq, nil } diff --git a/tsdb/compact.go b/tsdb/compact.go index 31b445f227..aa46212765 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -85,6 +85,7 @@ type LeveledCompactor struct { chunkPool chunkenc.Pool ctx context.Context maxBlockChunkSegmentSize int64 + useDirectIO bool mergeFunc storage.VerticalChunkSeriesMergeFunc postingsEncoder index.PostingsEncoder postingsDecoderFactory PostingsDecoderFactory @@ -169,6 +170,8 @@ type LeveledCompactorOptions struct { // EnableOverlappingCompaction enables compaction of overlapping blocks. In Prometheus it is always enabled. // It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component that does compaction. 
EnableOverlappingCompaction bool + // UseDirectIO enables Direct I/O when writing chunk segments. + UseDirectIO bool } type PostingsDecoderFactory func(meta *BlockMeta) index.PostingsDecoder @@ -221,6 +224,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer metrics: NewCompactorMetrics(r), ctx: ctx, maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, + useDirectIO: opts.UseDirectIO, mergeFunc: mergeFunc, postingsEncoder: pe, postingsDecoderFactory: opts.PD, @@ -646,7 +650,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl // data of all blocks. var chunkw ChunkWriter - chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize) + chunkw, err = chunks.NewWriter(chunkDir(tmp), chunks.WithSegmentSize(c.maxBlockChunkSegmentSize), chunks.WithDirectIO(c.useDirectIO)) if err != nil { return fmt.Errorf("open chunk writer: %w", err) } diff --git a/tsdb/db.go b/tsdb/db.go index 9ab150c5b4..13b16e5354 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -224,6 +224,9 @@ type Options struct { // PostingsDecoderFactory allows users to customize postings decoders based on BlockMeta. // By default, DefaultPostingsDecoderFactory will be used to create raw posting decoder. 
PostingsDecoderFactory PostingsDecoderFactory + + // UseDirectIO enables Direct I/O for chunk segment writing during compaction (experimental). + UseDirectIO bool } type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) @@ -908,6 +911,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize, EnableOverlappingCompaction: opts.EnableOverlappingCompaction, PD: opts.PostingsDecoderFactory, + UseDirectIO: opts.UseDirectIO, }) } if err != nil { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 826be61a42..55914b7e20 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -2924,7 +2924,7 @@ func TestChunkWriter_ReadAfterWrite(t *testing.T) { t.Run(strconv.Itoa(i), func(t *testing.T) { tempDir := t.TempDir() - chunkw, err := chunks.NewWriterWithSegSize(tempDir, chunks.SegmentHeaderSize+int64(test.segmentSize)) + chunkw, err := chunks.NewWriter(tempDir, chunks.WithSegmentSize(chunks.SegmentHeaderSize+int64(test.segmentSize))) require.NoError(t, err) for _, chks := range test.chks { diff --git a/tsdb/fileutil/direct_io.go b/tsdb/fileutil/direct_io.go new file mode 100644 index 0000000000..ad306776ca --- /dev/null +++ b/tsdb/fileutil/direct_io.go @@ -0,0 +1,39 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package fileutil + +import ( + "bufio" + "errors" + "os" +) + +var errDirectIOUnsupported = errors.New("direct IO is unsupported") + +type BufWriter interface { + Write([]byte) (int, error) + Flush() error + Reset(f *os.File) error +} + +// writer is a specialized wrapper around bufio.Writer. +// It is used when Direct IO isn't enabled, as using directIOWriter in such cases is impractical. +type writer struct { + *bufio.Writer +} + +func (b *writer) Reset(f *os.File) error { + b.Writer.Reset(f) + return nil +} diff --git a/tsdb/fileutil/direct_io_force.go b/tsdb/fileutil/direct_io_force.go new file mode 100644 index 0000000000..e2f811b9f2 --- /dev/null +++ b/tsdb/fileutil/direct_io_force.go @@ -0,0 +1,28 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This allows seamless testing of the Direct I/O writer across all tsdb tests. 
+ +//go:build linux && forcedirectio + +package fileutil + +import "os" + +func NewDirectIOWriter(f *os.File, size int) (BufWriter, error) { + return newDirectIOWriter(f, size) +} + +func NewBufioWriterWithSeek(f *os.File, size int) (BufWriter, error) { + return NewDirectIOWriter(f, size) +} diff --git a/tsdb/fileutil/direct_io_linux.go b/tsdb/fileutil/direct_io_linux.go new file mode 100644 index 0000000000..7406cc1594 --- /dev/null +++ b/tsdb/fileutil/direct_io_linux.go @@ -0,0 +1,29 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux && !forcedirectio + +package fileutil + +import ( + "bufio" + "os" +) + +func NewBufioWriterWithSeek(f *os.File, size int) (BufWriter, error) { + return &writer{bufio.NewWriterSize(f, size)}, nil +} + +func NewDirectIOWriter(f *os.File, size int) (BufWriter, error) { + return newDirectIOWriter(f, size) +} diff --git a/tsdb/fileutil/direct_io_unsupported.go b/tsdb/fileutil/direct_io_unsupported.go new file mode 100644 index 0000000000..ff64706399 --- /dev/null +++ b/tsdb/fileutil/direct_io_unsupported.go @@ -0,0 +1,29 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !linux + +package fileutil + +import ( + "bufio" + "os" +) + +func NewBufioWriterWithSeek(f *os.File, size int) (BufWriter, error) { + return &writer{bufio.NewWriterSize(f, size)}, nil +} + +func NewDirectIOWriter(f *os.File, size int) (BufWriter, error) { + return nil, errDirectIOUnsupported +} diff --git a/tsdb/fileutil/direct_io_writer.go b/tsdb/fileutil/direct_io_writer.go new file mode 100644 index 0000000000..a9f00eb00c --- /dev/null +++ b/tsdb/fileutil/direct_io_writer.go @@ -0,0 +1,405 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux + +package fileutil + +import ( + "errors" + "fmt" + "io" + "os" + "unsafe" + + "golang.org/x/sys/unix" +) + +const ( + // defaultAlignment is deliberately set higher to cover most setups. We rely on statx for more precise alignment. 
+ defaultAlignment = 4096 + defaultBufSize = 4096 +) + +var ( + errWriterInvalid = errors.New("the last flush resulted in an unaligned offset, the writer can no longer ensure contiguous writes") + errStatxNotSupported = errors.New("the statx syscall with STATX_DIOALIGN is not supported. At least Linux kernel 6.1 is needed") +) + +// directIOWriter is a specialized bufio.Writer that supports Direct IO to a file +// by ensuring all alignment restrictions are satisfied. +// The writer can handle files whose initial offsets are not aligned. +// Once Direct IO is in use, if an explicit call to Flush() results in an unaligned offset, the writer +// should no longer be used, as it can no longer support continuous writes. +type directIOWriter struct { + buf []byte + n int + + f *os.File + // offsetAlignmentGap represents the number of bytes needed to reach the nearest + // offset alignment on the file, making Direct IO possible. + offsetAlignmentGap int + alignmentRqmts *directIORqmts + + err error + invalid bool +} + +func newDirectIOWriter(f *os.File, size int) (*directIOWriter, error) { + alignmentRqmts, err := fileDirectIORqmts(f) + if err != nil { + return nil, err + } + + if size <= 0 { + size = defaultBufSize + } + if size%alignmentRqmts.offsetAlign != 0 { + return nil, fmt.Errorf("size %d should be a multiple of %d", size, alignmentRqmts.offsetAlign) + } + gap, err := checkInitialUnalignedOffset(f, alignmentRqmts) + if err != nil { + return nil, err + } + + return &directIOWriter{ + buf: alignedBlock(size, alignmentRqmts), + f: f, + offsetAlignmentGap: gap, + alignmentRqmts: alignmentRqmts, + }, nil +} + +func (b *directIOWriter) Available() int { return len(b.buf) - b.n } + +func (b *directIOWriter) Buffered() int { return b.n } + +// fillInitialOffsetGap writes the necessary bytes from the buffer without Direct IO +// to fill offsetAlignmentGap and align the file offset, enabling Direct IO usage. +// Once alignment is achieved, Direct IO is enabled. 
+func (b *directIOWriter) fillInitialOffsetGap() { + if b.n == 0 || b.offsetAlignmentGap == 0 { + return + } + + bytesToAlign := min(b.n, b.offsetAlignmentGap) + n, err := b.f.Write(b.buf[:bytesToAlign]) + if n < bytesToAlign && err == nil { + err = io.ErrShortWrite + } + if n > 0 { + copy(b.buf[0:b.n-n], b.buf[n:b.n]) + b.n -= n + } + // If the file offset was aligned, enable Direct IO. + b.offsetAlignmentGap -= n + if b.offsetAlignmentGap == 0 { + err = errors.Join(err, enableDirectIO(b.f.Fd())) + } + b.err = err +} + +func (b *directIOWriter) directIOWrite(p []byte, padding int) (int, error) { + relevant := len(p) - padding + + n, err := b.f.Write(p) + switch { + case n < relevant: + relevant = n + if err == nil { + err = io.ErrShortWrite + } + case n > relevant: + // Adjust the offset to discard the padding that was written. + writtenPadding := int64(n - relevant) + _, err := b.f.Seek(-writtenPadding, io.SeekCurrent) + if err != nil { + b.err = errors.Join(b.err, fmt.Errorf("seek to discard written padding %d: %w", writtenPadding, err)) + } + } + + if relevant%b.alignmentRqmts.offsetAlign != 0 { + b.invalid = true + } + return relevant, err +} + +// canDirectIOWrite returns true when all Direct IO alignment restrictions +// are met for the p block to be written into the file. +func (b *directIOWriter) canDirectIOWrite(p []byte) bool { + return isAligned(p, b.alignmentRqmts) && b.offsetAlignmentGap == 0 +} + +func (b *directIOWriter) Write(p []byte) (nn int, err error) { + if b.invalid { + return 0, errWriterInvalid + } + + for len(p) > b.Available() && b.err == nil { + var n1, n2 int + if b.Buffered() == 0 && b.canDirectIOWrite(p) { + // Large write, empty buffer. + // Write from p via Direct IO as the block and the file offset are aligned, + // to avoid copy. + n1, b.err = b.directIOWrite(p, 0) + } else { + n1 = copy(b.buf[b.n:], p) + b.n += n1 + if b.offsetAlignmentGap != 0 { + b.fillInitialOffsetGap() + // Refill the buffer. 
+ n2 = copy(b.buf[b.n:], p[n1:]) + b.n += n2 + } + if b.Available() == 0 { + // Avoid flushing in case the second refill wasn't complete. + b.flush() + } + } + nn += n1 + n2 + p = p[n1+n2:] + } + + if b.err != nil { + return nn, b.err + } + + n := copy(b.buf[b.n:], p) + b.n += n + nn += n + return nn, nil +} + +func (b *directIOWriter) flush() error { + if b.invalid { + return errWriterInvalid + } + if b.err != nil { + return b.err + } + if b.n == 0 { + return nil + } + + // Ensure the segment length alignment restriction is met. + // If the buffer length isn't a multiple of offsetAlign, round + // it to the nearest upper multiple and add zero padding. + uOffset := b.n + if uOffset%b.alignmentRqmts.offsetAlign != 0 { + uOffset = ((uOffset / b.alignmentRqmts.offsetAlign) + 1) * b.alignmentRqmts.offsetAlign + for i := b.n; i < uOffset; i++ { + b.buf[i] = 0 + } + } + n, err := b.directIOWrite(b.buf[:uOffset], uOffset-b.n) + if err != nil { + if n > 0 && n < b.n { + copy(b.buf[0:b.n-n], b.buf[n:b.n]) + } + b.n -= n + b.err = errors.Join(b.err, err) + return err + } + + b.n = 0 + return nil +} + +func (b *directIOWriter) Flush() error { + if b.offsetAlignmentGap != 0 { + b.fillInitialOffsetGap() + if b.err != nil { + return b.err + } + } + return b.flush() +} + +func (b *directIOWriter) Reset(f *os.File) error { + alignmentRqmts, err := fileDirectIORqmts(f) + if err != nil { + return err + } + b.alignmentRqmts = alignmentRqmts + + if b.buf == nil { + b.buf = alignedBlock(defaultBufSize, b.alignmentRqmts) + } + gap, err := checkInitialUnalignedOffset(f, b.alignmentRqmts) + if err != nil { + return err + } + b.offsetAlignmentGap = gap + b.err = nil + b.invalid = false + b.n = 0 + b.f = f + return nil +} + +func fileDirectIORqmts(f *os.File) (*directIORqmts, error) { + alignmentRqmts, err := fetchDirectIORqmts(f.Fd()) + switch { + case errors.Is(err, errStatxNotSupported): + alignmentRqmts = defaultDirectIORqmts() + case err != nil: + return nil, err + } + + if 
alignmentRqmts.memoryAlign == 0 || alignmentRqmts.offsetAlign == 0 { + // This may require some extra testing. + return nil, fmt.Errorf("zero alignment requirement is not supported %+v", alignmentRqmts) + } + return alignmentRqmts, nil +} + +func alignmentOffset(block []byte, requiredAlignment int) int { + return computeAlignmentOffset(block, requiredAlignment) +} + +func computeAlignmentOffset(block []byte, alignment int) int { + if alignment == 0 { + return 0 + } + if len(block) == 0 { + panic("empty block not supported") + } + return int(uintptr(unsafe.Pointer(&block[0])) & uintptr(alignment-1)) +} + +// isAligned checks if the length of the block is a multiple of offsetAlign +// and if its address is aligned with memoryAlign. +func isAligned(block []byte, alignmentRqmts *directIORqmts) bool { + return alignmentOffset(block, alignmentRqmts.memoryAlign) == 0 && len(block)%alignmentRqmts.offsetAlign == 0 +} + +// alignedBlock returns a block whose address is alignment aligned. +// The size should be a multiple of offsetAlign. +func alignedBlock(size int, alignmentRqmts *directIORqmts) []byte { + if size == 0 || size%alignmentRqmts.offsetAlign != 0 { + panic(fmt.Errorf("size %d should be > 0 and a multiple of offsetAlign=%d", size, alignmentRqmts.offsetAlign)) + } + if alignmentRqmts.memoryAlign == 0 { + return make([]byte, size) + } + + block := make([]byte, size+alignmentRqmts.memoryAlign) + a := alignmentOffset(block, alignmentRqmts.memoryAlign) + if a == 0 { + return block[:size] + } + + offset := alignmentRqmts.memoryAlign - a + block = block[offset : offset+size] + if !isAligned(block, alignmentRqmts) { + // Assuming this to be rare, if not impossible. 
+ panic("cannot create an aligned block") + } + return block +} + +func currentFileOffset(f *os.File) (int, error) { + curOff, err := f.Seek(0, io.SeekCurrent) + if err != nil { + return 0, fmt.Errorf("cannot get the current offset: %w", err) + } + return int(curOff), nil +} + +func fileStatusFlags(fd uintptr) (int, error) { + flag, err := unix.FcntlInt(fd, unix.F_GETFL, 0) + if err != nil { + return 0, fmt.Errorf("cannot get file status flags: %w", err) + } + return flag, err +} + +// enableDirectIO enables Direct IO on the file if needed. +func enableDirectIO(fd uintptr) error { + flag, err := fileStatusFlags(fd) + if err != nil { + return err + } + + if (flag & unix.O_DIRECT) == unix.O_DIRECT { + return nil + } + + _, err = unix.FcntlInt(fd, unix.F_SETFL, flag|unix.O_DIRECT) + if err != nil { + return fmt.Errorf("cannot enable Direct IO: %w", err) + } + return nil +} + +// checkInitialUnalignedOffset returns the gap between the current offset of the file +// and the nearest aligned offset. +// If the current offset is aligned, Direct IO is enabled on the file. +func checkInitialUnalignedOffset(f *os.File, alignmentRqmts *directIORqmts) (int, error) { + offset, err := currentFileOffset(f) + if err != nil { + return 0, err + } + alignment := alignmentRqmts.offsetAlign + gap := (alignment - offset%alignment) % alignment + if gap == 0 { + if err := enableDirectIO(f.Fd()); err != nil { + return 0, err + } + } + return gap, nil +} + +// directIORqmts holds the alignment requirements for direct I/O. +// All fields are in bytes. +type directIORqmts struct { + // The required alignment for memory buffers addresses. + memoryAlign int + // The required alignment for I/O segment lengths and file offsets. 
+ offsetAlign int +} + +func defaultDirectIORqmts() *directIORqmts { + return &directIORqmts{ + memoryAlign: defaultAlignment, + offsetAlign: defaultAlignment, + } +} + +// fetchDirectIORqmts retrieves direct I/O alignment requirements for a file descriptor using statx +// when possible. +func fetchDirectIORqmts(fd uintptr) (*directIORqmts, error) { + var stat unix.Statx_t + flags := unix.AT_SYMLINK_NOFOLLOW | unix.AT_EMPTY_PATH | unix.AT_STATX_DONT_SYNC + mask := unix.STATX_DIOALIGN + + if err := unix.Statx(int(fd), "", flags, unix.STATX_DIOALIGN, &stat); err != nil { + if err == unix.ENOSYS { + return nil, errStatxNotSupported + } + return nil, fmt.Errorf("statx failed on fd %d: %w", fd, err) + } + + if stat.Mask&uint32(mask) == 0 { + return nil, errStatxNotSupported + } + + if stat.Dio_mem_align == 0 || stat.Dio_offset_align == 0 { + return nil, fmt.Errorf("%w: kernel may be old or the file may be on an unsupported FS", errDirectIOUnsupported) + } + + return &directIORqmts{ + memoryAlign: int(stat.Dio_mem_align), + offsetAlign: int(stat.Dio_offset_align), + }, nil +} diff --git a/tsdb/fileutil/direct_io_writer_test.go b/tsdb/fileutil/direct_io_writer_test.go new file mode 100644 index 0000000000..31ea6fda6e --- /dev/null +++ b/tsdb/fileutil/direct_io_writer_test.go @@ -0,0 +1,197 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build linux + +package fileutil + +import ( + "io" + "os" + "path" + "testing" + + "github.com/stretchr/testify/require" +) + +func directIORqmtsForTest(tb testing.TB) *directIORqmts { + f, err := os.OpenFile(path.Join(tb.TempDir(), "foo"), os.O_CREATE|os.O_WRONLY, 0o666) + require.NoError(tb, err) + alignmentRqmts, err := fetchDirectIORqmts(f.Fd()) + require.NoError(tb, err) + return alignmentRqmts +} + +func TestDirectIOFile(t *testing.T) { + tmpDir := t.TempDir() + + f, err := os.OpenFile(path.Join(tmpDir, "test"), os.O_CREATE|os.O_WRONLY, 0o666) + require.NoError(t, err) + + require.NoError(t, enableDirectIO(f.Fd())) +} + +func TestAlignedBlockEarlyPanic(t *testing.T) { + alignRqmts := directIORqmtsForTest(t) + cases := []struct { + desc string + size int + }{ + {"Zero size", 0}, + {"Size not multiple of offset alignment", 9973}, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + require.Panics(t, func() { + alignedBlock(tc.size, alignRqmts) + }) + }) + } +} + +func TestAlignedBlock(t *testing.T) { + alignRqmts := directIORqmtsForTest(t) + block := alignedBlock(5*alignRqmts.offsetAlign, alignRqmts) + require.True(t, isAligned(block, alignRqmts)) + require.Len(t, block, 5*alignRqmts.offsetAlign) + require.False(t, isAligned(block[1:], alignRqmts)) +} + +func TestDirectIOWriter(t *testing.T) { + alignRqmts := directIORqmtsForTest(t) + cases := []struct { + name string + initialOffset int + bufferSize int + dataSize int + // writtenBytes should also consider needed zero padding. 
+ writtenBytes int + shouldInvalidate bool + }{ + { + name: "data equal to buffer", + bufferSize: 8 * alignRqmts.offsetAlign, + dataSize: 8 * alignRqmts.offsetAlign, + writtenBytes: 8 * alignRqmts.offsetAlign, + }, + { + name: "data exceeds buffer", + bufferSize: 4 * alignRqmts.offsetAlign, + dataSize: 64 * alignRqmts.offsetAlign, + writtenBytes: 64 * alignRqmts.offsetAlign, + }, + { + name: "data exceeds buffer + final offset unaligned", + bufferSize: 2 * alignRqmts.offsetAlign, + dataSize: 4*alignRqmts.offsetAlign + 33, + writtenBytes: 4*alignRqmts.offsetAlign + alignRqmts.offsetAlign, + shouldInvalidate: true, + }, + { + name: "data smaller than buffer", + bufferSize: 8 * alignRqmts.offsetAlign, + dataSize: 3 * alignRqmts.offsetAlign, + writtenBytes: 3 * alignRqmts.offsetAlign, + }, + { + name: "data smaller than buffer + final offset unaligned", + bufferSize: 4 * alignRqmts.offsetAlign, + dataSize: alignRqmts.offsetAlign + 70, + writtenBytes: alignRqmts.offsetAlign + alignRqmts.offsetAlign, + shouldInvalidate: true, + }, + { + name: "offset aligned", + initialOffset: alignRqmts.offsetAlign, + bufferSize: 8 * alignRqmts.offsetAlign, + dataSize: alignRqmts.offsetAlign, + writtenBytes: alignRqmts.offsetAlign, + }, + { + name: "initial offset unaligned + final offset unaligned", + initialOffset: 8, + bufferSize: 8 * alignRqmts.offsetAlign, + dataSize: 64 * alignRqmts.offsetAlign, + writtenBytes: 64*alignRqmts.offsetAlign + (alignRqmts.offsetAlign - 8), + shouldInvalidate: true, + }, + { + name: "offset unaligned + final offset aligned", + initialOffset: 8, + bufferSize: 4 * alignRqmts.offsetAlign, + dataSize: 4*alignRqmts.offsetAlign + (alignRqmts.offsetAlign - 8), + writtenBytes: 4*alignRqmts.offsetAlign + (alignRqmts.offsetAlign - 8), + }, + { + name: "empty data", + bufferSize: 4 * alignRqmts.offsetAlign, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + fileName := path.Join(t.TempDir(), "test") + + data := make([]byte, 
tc.dataSize) + for i := 0; i < len(data); i++ { + // Do not use 256 as it may be a divider of requiredAlignment. To avoid patterns. + data[i] = byte(i % 251) + } + + f, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, 0o666) + require.NoError(t, err) + + if tc.initialOffset != 0 { + _, err = f.Seek(int64(tc.initialOffset), io.SeekStart) + require.NoError(t, err) + } + + w, err := newDirectIOWriter(f, tc.bufferSize) + require.NoError(t, err) + + n, err := w.Write(data) + require.NoError(t, err) + require.Equal(t, tc.dataSize, n) + require.NoError(t, w.Flush()) + + // Check the file's final offset. + currOffset, err := currentFileOffset(f) + require.NoError(t, err) + require.Equal(t, tc.dataSize+tc.initialOffset, currOffset) + + // Check the written data. + fileBytes, err := os.ReadFile(fileName) + require.NoError(t, err) + if tc.dataSize > 0 { + require.Len(t, fileBytes, tc.writtenBytes+tc.initialOffset) + require.Equal(t, data, fileBytes[tc.initialOffset:tc.dataSize+tc.initialOffset]) + } else { + require.Empty(t, fileBytes) + } + + // Check the writer state. + if tc.shouldInvalidate { + require.True(t, w.invalid) + require.Error(t, w.Flush()) + _, err = w.Write([]byte{}) + require.Error(t, err) + } else { + require.False(t, w.invalid) + require.NoError(t, w.Flush()) + _, err = w.Write([]byte{}) + require.NoError(t, err) + } + }) + } +}