mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Switch back to protos over HTTP, instead of GRPC.
My aim is to support the new grpc generic write path in Frankenstein. On the surface this seems easy - however I've hit a number of problems that make me think it might be better to not use grpc just yet. The explanation of the problems requires a little background. At weave, traffic to frankenstein need to go through a couple of services first, for SSL and to be authenticated. So traffic goes: internet -> frontend -> authfe -> frankenstein - The frontend is Nginx, and adds/removes SSL. Its done this way for legacy reasons, so the certs can be managed in one place, although eventually we imagine we'll merge it with authfe. All traffic from frontend is sent to authfe. - Authfe checks the auth tokens / cookie etc and then picks the service to forward the RPC to. - Frankenstein accepts the reads and does the right thing with them. First problem I hit was Nginx won't proxy http2 requests - it can accept them, but all calls downstream are http1 (see https://trac.nginx.org/nginx/ticket/923). This wasn't such a big deal, so it now looks like: internet --(grpc/http2)--> frontend --(grpc/http1)--> authfe --(grpc/http1)--> frankenstein Next problem was golang grpc server won't accept http1 requests (see https://groups.google.com/forum/#!topic/grpc-io/JnjCYGPMUms). It is possible to link a grpc server in with a normal go http mux, as long as the mux server is serving over SSL, as the golang http client & server won't do http2 over anything other than an SSL connection. This would require making all our service to service comms SSL. So I had a go a writing a grpc http1 server, and got pretty far. But is was a bit of a mess. So finally I thought I'd make a separate grpc frontend for this, running in parallel with the frontend/authfe combo on a different port - and first up I'd need a grpc reverse proxy. Ideally we'd have some nice, generic reverse proxy that only knew about a map from service names -> downstream service, and didn't need to decode & re-encode every request as it went through. It seems like this can't be done with golang's grpc library - see https://github.com/mwitkow/grpc-proxy/issues/1. And then I was surprised to find you can't do grpc from browsers! See http://www.grpc.io/faq/ - not important to us, but I'm starting to question why we decided to use grpc in the first place? It would seem we could have most of the benefits of grpc with protos over HTTP, and this wouldn't preclude moving to grpc when its a bit more mature? In fact, the grcp FAQ even admits as much: > Why is gRPC better than any binary blob over HTTP/2? > This is largely what gRPC is on the wire.
This commit is contained in:
parent
e0989fde89
commit
d83879210c
|
@ -207,8 +207,8 @@ func init() {
|
|||
"The name of the database to use for storing samples in InfluxDB.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.Address, "experimental.storage.remote.address", "",
|
||||
"The address of the remote server to send samples to. None, if empty. EXPERIMENTAL.",
|
||||
&cfg.remote.URL, "experimental.storage.remote.url", "",
|
||||
"The URL of the remote endpoint to send samples to. None, if empty. EXPERIMENTAL.",
|
||||
)
|
||||
|
||||
cfg.fs.DurationVar(
|
||||
|
|
|
@ -15,16 +15,14 @@ package main
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"net/http"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/prometheus/common/model"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
)
|
||||
|
||||
|
@ -46,23 +44,34 @@ func (server *server) Write(ctx context.Context, req *remote.WriteRequest) (*rem
|
|||
return &remote.WriteResponse{}, nil
|
||||
}
|
||||
|
||||
type snappyDecompressor struct{}
|
||||
|
||||
func (d *snappyDecompressor) Do(r io.Reader) ([]byte, error) {
|
||||
sr := snappy.NewReader(r)
|
||||
return ioutil.ReadAll(sr)
|
||||
}
|
||||
|
||||
func (d *snappyDecompressor) Type() string {
|
||||
return "snappy"
|
||||
}
|
||||
|
||||
func main() {
|
||||
lis, err := net.Listen("tcp", ":1234")
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to listen: %v", err)
|
||||
}
|
||||
s := grpc.NewServer(grpc.RPCDecompressor(&snappyDecompressor{}))
|
||||
remote.RegisterWriteServer(s, &server{})
|
||||
s.Serve(lis)
|
||||
http.Handle("/push", AppenderHandler(&server{}))
|
||||
http.ListenAndServe(":1234", nil)
|
||||
}
|
||||
|
||||
type WriteServer interface {
|
||||
Write(context.Context, *remote.WriteRequest) (*remote.WriteResponse, error)
|
||||
}
|
||||
|
||||
// AppenderHandler returns a http.Handler that accepts proto encoded samples.
|
||||
func AppenderHandler(s WriteServer) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var req remote.WriteRequest
|
||||
if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := s.Write(context.Background(), &req); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -14,38 +14,29 @@
|
|||
package remote
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
// Client allows sending batches of Prometheus samples to an HTTP endpoint.
|
||||
type Client struct {
|
||||
client WriteClient
|
||||
timeout time.Duration
|
||||
url string
|
||||
client http.Client
|
||||
}
|
||||
|
||||
// NewClient creates a new Client.
|
||||
func NewClient(address string, timeout time.Duration) (*Client, error) {
|
||||
conn, err := grpc.Dial(
|
||||
address,
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithTimeout(timeout),
|
||||
grpc.WithCompressor(&snappyCompressor{}),
|
||||
)
|
||||
if err != nil {
|
||||
// grpc.Dial() returns immediately and doesn't error when the server is
|
||||
// unreachable when not passing in the WithBlock() option. The client then
|
||||
// will continuously try to (re)establish the connection in the background.
|
||||
// So this will only return here if some other uncommon error occurred.
|
||||
return nil, err
|
||||
}
|
||||
func NewClient(url string, timeout time.Duration) (*Client, error) {
|
||||
return &Client{
|
||||
client: NewWriteClient(conn),
|
||||
timeout: timeout,
|
||||
url: url,
|
||||
client: http.Client{
|
||||
Timeout: timeout,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -74,13 +65,29 @@ func (c *Client) Store(samples model.Samples) error {
|
|||
req.Timeseries = append(req.Timeseries, ts)
|
||||
}
|
||||
|
||||
ctxt, cancel := context.WithTimeout(context.TODO(), c.timeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := c.client.Write(ctxt, req)
|
||||
data, err := proto.Marshal(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
buf := bytes.Buffer{}
|
||||
if _, err := snappy.NewWriter(&buf).Write(data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
httpReq, err := http.NewRequest("POST", c.url, &buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
httpReq.Header.Add("Content-Encoding", "snappy")
|
||||
httpResp, err := c.client.Do(httpReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer httpResp.Body.Close()
|
||||
if httpResp.StatusCode/100 != 2 {
|
||||
return fmt.Errorf("server returned HTTP status %s", httpResp.Status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -69,8 +69,8 @@ func New(o *Options) (*Storage, error) {
|
|||
prometheus.MustRegister(c)
|
||||
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
|
||||
}
|
||||
if o.Address != "" {
|
||||
c, err := NewClient(o.Address, o.StorageTimeout)
|
||||
if o.URL != "" {
|
||||
c, err := NewClient(o.URL, o.StorageTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -94,9 +94,9 @@ type Options struct {
|
|||
GraphiteAddress string
|
||||
GraphiteTransport string
|
||||
GraphitePrefix string
|
||||
// TODO: This just being called "Address" will make more sense once the
|
||||
// TODO: This just being called "URL" will make more sense once the
|
||||
// other remote storage mechanisms are removed.
|
||||
Address string
|
||||
URL string
|
||||
}
|
||||
|
||||
// Run starts the background processing of the storage queues.
|
||||
|
|
|
@ -21,11 +21,6 @@ import proto "github.com/golang/protobuf/proto"
|
|||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
import (
|
||||
context "golang.org/x/net/context"
|
||||
grpc "google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
|
@ -114,78 +109,6 @@ func init() {
|
|||
proto.RegisterType((*WriteResponse)(nil), "remote.WriteResponse")
|
||||
}
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ context.Context
|
||||
var _ grpc.ClientConn
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
const _ = grpc.SupportPackageIsVersion3
|
||||
|
||||
// Client API for Write service
|
||||
|
||||
type WriteClient interface {
|
||||
Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error)
|
||||
}
|
||||
|
||||
type writeClient struct {
|
||||
cc *grpc.ClientConn
|
||||
}
|
||||
|
||||
func NewWriteClient(cc *grpc.ClientConn) WriteClient {
|
||||
return &writeClient{cc}
|
||||
}
|
||||
|
||||
func (c *writeClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) {
|
||||
out := new(WriteResponse)
|
||||
err := grpc.Invoke(ctx, "/remote.Write/Write", in, out, c.cc, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// Server API for Write service
|
||||
|
||||
type WriteServer interface {
|
||||
Write(context.Context, *WriteRequest) (*WriteResponse, error)
|
||||
}
|
||||
|
||||
func RegisterWriteServer(s *grpc.Server, srv WriteServer) {
|
||||
s.RegisterService(&_Write_serviceDesc, srv)
|
||||
}
|
||||
|
||||
func _Write_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(WriteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(WriteServer).Write(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/remote.Write/Write",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(WriteServer).Write(ctx, req.(*WriteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
var _Write_serviceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "remote.Write",
|
||||
HandlerType: (*WriteServer)(nil),
|
||||
Methods: []grpc.MethodDesc{
|
||||
{
|
||||
MethodName: "Write",
|
||||
Handler: _Write_Write_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{},
|
||||
Metadata: fileDescriptor0,
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("remote.proto", fileDescriptor0) }
|
||||
|
||||
var fileDescriptor0 = []byte{
|
||||
|
|
|
@ -1,34 +0,0 @@
|
|||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package remote
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
type snappyCompressor struct{}
|
||||
|
||||
func (c *snappyCompressor) Do(w io.Writer, p []byte) error {
|
||||
sw := snappy.NewWriter(w)
|
||||
if _, err := sw.Write(p); err != nil {
|
||||
return err
|
||||
}
|
||||
return sw.Close()
|
||||
}
|
||||
|
||||
func (c *snappyCompressor) Type() string {
|
||||
return "snappy"
|
||||
}
|
Loading…
Reference in a new issue