mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 22:37:27 -08:00
Improving Performance on the API Gzip Handler (#12363)
Using github.com/klauspost/compress package to replace the current Gzip Handler on the API. We see significant improvements using this handler over the current one as shown in the benchmark added. Also: * move selection of compression from `newCompressedResponseWriter` to `*CompressionHandler.ServeHTTP`. * renaming `compressedResponseWriter` since it now only does one kind of compression. Signed-off-by: Alan Protasio <alanprot@gmail.com>
This commit is contained in:
parent
1fd48bcfa7
commit
dfae954dc1
1
go.mod
1
go.mod
|
@ -32,6 +32,7 @@ require (
|
||||||
github.com/hetznercloud/hcloud-go v1.45.1
|
github.com/hetznercloud/hcloud-go v1.45.1
|
||||||
github.com/ionos-cloud/sdk-go/v6 v6.1.6
|
github.com/ionos-cloud/sdk-go/v6 v6.1.6
|
||||||
github.com/json-iterator/go v1.1.12
|
github.com/json-iterator/go v1.1.12
|
||||||
|
github.com/klauspost/compress v1.13.6
|
||||||
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b
|
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b
|
||||||
github.com/linode/linodego v1.16.1
|
github.com/linode/linodego v1.16.1
|
||||||
github.com/miekg/dns v1.1.53
|
github.com/miekg/dns v1.1.53
|
||||||
|
|
1
go.sum
1
go.sum
|
@ -499,6 +499,7 @@ github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0Lh
|
||||||
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
|
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
|
||||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||||
|
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
|
||||||
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||||
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
|
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
|
||||||
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
|
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
|
||||||
|
|
|
@ -14,11 +14,11 @@
|
||||||
package httputil
|
package httputil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"compress/gzip"
|
|
||||||
"compress/zlib"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/klauspost/compress/gzhttp"
|
||||||
|
"github.com/klauspost/compress/zlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -28,53 +28,27 @@ const (
|
||||||
deflateEncoding = "deflate"
|
deflateEncoding = "deflate"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Wrapper around http.Handler which adds suitable response compression based
|
// Wrapper around http.ResponseWriter which adds deflate compression
|
||||||
// on the client's Accept-Encoding headers.
|
type deflatedResponseWriter struct {
|
||||||
type compressedResponseWriter struct {
|
|
||||||
http.ResponseWriter
|
http.ResponseWriter
|
||||||
writer io.Writer
|
writer *zlib.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
// Writes HTTP response content data.
|
// Writes HTTP response content data.
|
||||||
func (c *compressedResponseWriter) Write(p []byte) (int, error) {
|
func (c *deflatedResponseWriter) Write(p []byte) (int, error) {
|
||||||
return c.writer.Write(p)
|
return c.writer.Write(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Closes the compressedResponseWriter and ensures to flush all data before.
|
// Close Closes the deflatedResponseWriter and ensures to flush all data before.
|
||||||
func (c *compressedResponseWriter) Close() {
|
func (c *deflatedResponseWriter) Close() {
|
||||||
if zlibWriter, ok := c.writer.(*zlib.Writer); ok {
|
c.writer.Close()
|
||||||
zlibWriter.Flush()
|
|
||||||
}
|
|
||||||
if gzipWriter, ok := c.writer.(*gzip.Writer); ok {
|
|
||||||
gzipWriter.Flush()
|
|
||||||
}
|
|
||||||
if closer, ok := c.writer.(io.Closer); ok {
|
|
||||||
defer closer.Close()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Constructs a new compressedResponseWriter based on client request headers.
|
// Constructs a new deflatedResponseWriter to compress the original writer using 'deflate' compression.
|
||||||
func newCompressedResponseWriter(writer http.ResponseWriter, req *http.Request) *compressedResponseWriter {
|
func newDeflateResponseWriter(writer http.ResponseWriter) *deflatedResponseWriter {
|
||||||
encodings := strings.Split(req.Header.Get(acceptEncodingHeader), ",")
|
return &deflatedResponseWriter{
|
||||||
for _, encoding := range encodings {
|
|
||||||
switch strings.TrimSpace(encoding) {
|
|
||||||
case gzipEncoding:
|
|
||||||
writer.Header().Set(contentEncodingHeader, gzipEncoding)
|
|
||||||
return &compressedResponseWriter{
|
|
||||||
ResponseWriter: writer,
|
|
||||||
writer: gzip.NewWriter(writer),
|
|
||||||
}
|
|
||||||
case deflateEncoding:
|
|
||||||
writer.Header().Set(contentEncodingHeader, deflateEncoding)
|
|
||||||
return &compressedResponseWriter{
|
|
||||||
ResponseWriter: writer,
|
|
||||||
writer: zlib.NewWriter(writer),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &compressedResponseWriter{
|
|
||||||
ResponseWriter: writer,
|
ResponseWriter: writer,
|
||||||
writer: writer,
|
writer: zlib.NewWriter(writer),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,7 +60,21 @@ type CompressionHandler struct {
|
||||||
|
|
||||||
// ServeHTTP adds compression to the original http.Handler's ServeHTTP() method.
|
// ServeHTTP adds compression to the original http.Handler's ServeHTTP() method.
|
||||||
func (c CompressionHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
|
func (c CompressionHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
|
||||||
compWriter := newCompressedResponseWriter(writer, req)
|
encodings := strings.Split(req.Header.Get(acceptEncodingHeader), ",")
|
||||||
c.Handler.ServeHTTP(compWriter, req)
|
for _, encoding := range encodings {
|
||||||
compWriter.Close()
|
switch strings.TrimSpace(encoding) {
|
||||||
|
case gzipEncoding:
|
||||||
|
gzhttp.GzipHandler(c.Handler).ServeHTTP(writer, req)
|
||||||
|
return
|
||||||
|
case deflateEncoding:
|
||||||
|
compWriter := newDeflateResponseWriter(writer)
|
||||||
|
writer.Header().Set(contentEncodingHeader, deflateEncoding)
|
||||||
|
c.Handler.ServeHTTP(compWriter, req)
|
||||||
|
compWriter.Close()
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
c.Handler.ServeHTTP(writer, req)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,23 +17,30 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"compress/zlib"
|
"compress/zlib"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
mux *http.ServeMux
|
mux *http.ServeMux
|
||||||
server *httptest.Server
|
server *httptest.Server
|
||||||
|
respBody = strings.Repeat("Hello World!", 500)
|
||||||
)
|
)
|
||||||
|
|
||||||
func setup() func() {
|
func setup() func() {
|
||||||
mux = http.NewServeMux()
|
mux = http.NewServeMux()
|
||||||
server = httptest.NewServer(mux)
|
server = httptest.NewServer(mux)
|
||||||
return func() {
|
return func() {
|
||||||
|
server.CloseClientConnections()
|
||||||
server.Close()
|
server.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,7 +48,7 @@ func setup() func() {
|
||||||
func getCompressionHandlerFunc() CompressionHandler {
|
func getCompressionHandlerFunc() CompressionHandler {
|
||||||
hf := func(w http.ResponseWriter, r *http.Request) {
|
hf := func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
w.Write([]byte("Hello World!"))
|
w.Write([]byte(respBody))
|
||||||
}
|
}
|
||||||
return CompressionHandler{
|
return CompressionHandler{
|
||||||
Handler: http.HandlerFunc(hf),
|
Handler: http.HandlerFunc(hf),
|
||||||
|
@ -67,9 +74,8 @@ func TestCompressionHandler_PlainText(t *testing.T) {
|
||||||
contents, err := io.ReadAll(resp.Body)
|
contents, err := io.ReadAll(resp.Body)
|
||||||
require.NoError(t, err, "unexpected error while creating the response body reader")
|
require.NoError(t, err, "unexpected error while creating the response body reader")
|
||||||
|
|
||||||
expected := "Hello World!"
|
|
||||||
actual := string(contents)
|
actual := string(contents)
|
||||||
require.Equal(t, expected, actual, "expected response with content")
|
require.Equal(t, respBody, actual, "expected response with content")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCompressionHandler_Gzip(t *testing.T) {
|
func TestCompressionHandler_Gzip(t *testing.T) {
|
||||||
|
@ -103,8 +109,7 @@ func TestCompressionHandler_Gzip(t *testing.T) {
|
||||||
require.NoError(t, err, "unexpected error while reading the response body")
|
require.NoError(t, err, "unexpected error while reading the response body")
|
||||||
|
|
||||||
actual := buf.String()
|
actual := buf.String()
|
||||||
expected := "Hello World!"
|
require.Equal(t, respBody, actual, "unexpected response content")
|
||||||
require.Equal(t, expected, actual, "unexpected response content")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCompressionHandler_Deflate(t *testing.T) {
|
func TestCompressionHandler_Deflate(t *testing.T) {
|
||||||
|
@ -138,6 +143,98 @@ func TestCompressionHandler_Deflate(t *testing.T) {
|
||||||
require.NoError(t, err, "unexpected error while reading the response body")
|
require.NoError(t, err, "unexpected error while reading the response body")
|
||||||
|
|
||||||
actual := buf.String()
|
actual := buf.String()
|
||||||
expected := "Hello World!"
|
require.Equal(t, respBody, actual, "expected response with content")
|
||||||
require.Equal(t, expected, actual, "expected response with content")
|
}
|
||||||
|
|
||||||
|
func Benchmark_compression(b *testing.B) {
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
DisableCompression: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := map[string]struct {
|
||||||
|
enc string
|
||||||
|
numberOfLabels int
|
||||||
|
}{
|
||||||
|
"gzip-10-labels": {
|
||||||
|
enc: gzipEncoding,
|
||||||
|
numberOfLabels: 10,
|
||||||
|
},
|
||||||
|
"gzip-100-labels": {
|
||||||
|
enc: gzipEncoding,
|
||||||
|
numberOfLabels: 100,
|
||||||
|
},
|
||||||
|
"gzip-1K-labels": {
|
||||||
|
enc: gzipEncoding,
|
||||||
|
numberOfLabels: 1000,
|
||||||
|
},
|
||||||
|
"gzip-10K-labels": {
|
||||||
|
enc: gzipEncoding,
|
||||||
|
numberOfLabels: 10000,
|
||||||
|
},
|
||||||
|
"gzip-100K-labels": {
|
||||||
|
enc: gzipEncoding,
|
||||||
|
numberOfLabels: 100000,
|
||||||
|
},
|
||||||
|
"gzip-1M-labels": {
|
||||||
|
enc: gzipEncoding,
|
||||||
|
numberOfLabels: 1000000,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range cases {
|
||||||
|
b.Run(name, func(b *testing.B) {
|
||||||
|
tearDown := setup()
|
||||||
|
defer tearDown()
|
||||||
|
labels := labels.ScratchBuilder{}
|
||||||
|
|
||||||
|
for i := 0; i < tc.numberOfLabels; i++ {
|
||||||
|
labels.Add(fmt.Sprintf("Name%v", i), fmt.Sprintf("Value%v", i))
|
||||||
|
}
|
||||||
|
|
||||||
|
respBody, err := json.Marshal(labels.Labels())
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
hf := func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write(respBody)
|
||||||
|
}
|
||||||
|
h := CompressionHandler{
|
||||||
|
Handler: http.HandlerFunc(hf),
|
||||||
|
}
|
||||||
|
|
||||||
|
mux.Handle("/foo_endpoint", h)
|
||||||
|
|
||||||
|
req, _ := http.NewRequest("GET", server.URL+"/foo_endpoint", nil)
|
||||||
|
req.Header.Set(acceptEncodingHeader, tc.enc)
|
||||||
|
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
// Reusing the array to read the body and avoid allocation on the test
|
||||||
|
encRespBody := make([]byte, len(respBody))
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
require.NoError(b, err, "client get failed with unexpected error")
|
||||||
|
responseBodySize := 0
|
||||||
|
for {
|
||||||
|
n, err := resp.Body.Read(encRespBody)
|
||||||
|
responseBodySize += n
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ReportMetric(float64(responseBodySize), "ContentLength")
|
||||||
|
resp.Body.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
client.CloseIdleConnections()
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue