Add generic write path using grpc.

This uses a new proto format, with scope for multiple samples per
timeseries in future. This will allow users to pump samples out to
whatever they like without having to change the core Prometheus code.

There's also an example receiver to save users figuring out the
boilerplate themselves.
This commit is contained in:
Brian Brazil 2016-08-29 19:15:08 +02:00 committed by Julius Volz
parent 72475cfa84
commit 36d2c4bd0b
7 changed files with 414 additions and 0 deletions

View file

@ -206,6 +206,11 @@ func init() {
&cfg.remote.InfluxdbDatabase, "storage.remote.influxdb.database", "prometheus",
"The name of the database to use for storing samples in InfluxDB.",
)
cfg.fs.StringVar(
&cfg.remote.GenericAddress, "storage.remote.generic-address", "",
"The address of the generic remote server to send samples to via gRPC. None, if empty.",
)
cfg.fs.DurationVar(
&cfg.remote.StorageTimeout, "storage.remote.timeout", 30*time.Second,
"The timeout to use when sending samples to the remote storage.",

View file

@ -0,0 +1,17 @@
## Generic Remote Storage Example
This is a simple example of how to write a server to
recieve samples from the generic remote storage output.
To use it:
```
go build
remote_storage_generic
```
and then run Prometheus as:
```
./prometheus -storage.remote.generic-url http://localhost:1234/remote
```

View file

@ -0,0 +1,53 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"log"
"net"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/prometheus/prometheus/storage/remote/generic"
)
type server struct{}
func (server *server) Write(ctx context.Context, req *generic.GenericWriteRequest) (*generic.GenericWriteResponse, error) {
for _, ts := range req.Timeseries {
fmt.Printf("%s", ts.Name)
for _, l := range ts.Labels {
fmt.Printf(" %s=%s", l.Name, l.Value)
}
fmt.Printf("\n")
for _, s := range ts.Samples {
fmt.Printf(" %f %d\n", s.Value, s.TimestampMs)
}
}
return &generic.GenericWriteResponse{}, nil
}
func main() {
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
generic.RegisterGenericWriteServer(s, &server{})
s.Serve(lis)
}

View file

@ -0,0 +1,77 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package generic
import (
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/prometheus/common/model"
)
// Client allows sending batches of Prometheus samples to a http endpoint.
type Client struct {
conn *grpc.ClientConn
timeout time.Duration
}
// NewClient creates a new Client.
func NewClient(address string, timeout time.Duration) *Client {
// TODO: Do something with this error.
conn, _ := grpc.Dial(address, grpc.WithInsecure())
return &Client{
conn: conn,
timeout: timeout,
}
}
// Store sends a batch of samples to the http endpoint.
func (c *Client) Store(samples model.Samples) error {
req := &GenericWriteRequest{}
for _, s := range samples {
ts := &TimeSeries{
Name: string(s.Metric[model.MetricNameLabel]),
}
for k, v := range s.Metric {
if k != model.MetricNameLabel {
ts.Labels = append(ts.Labels,
&LabelPair{
Name: string(k),
Value: string(v),
})
}
}
ts.Samples = []*Sample{
&Sample{
Value: float64(s.Value),
TimestampMs: int64(s.Timestamp),
},
}
req.Timeseries = append(req.Timeseries, ts)
}
client := NewGenericWriteClient(c.conn)
ctxt, _ := context.WithTimeout(context.Background(), c.timeout)
_, err := client.Write(ctxt, req)
if err != nil {
return err
}
return nil
}
// Name identifies the client as a genric client.
func (c Client) Name() string {
return "generic"
}

View file

@ -0,0 +1,211 @@
// Code generated by protoc-gen-go.
// source: generic.proto
// DO NOT EDIT!
/*
Package generic is a generated protocol buffer package.
It is generated from these files:
generic.proto
It has these top-level messages:
Sample
LabelPair
TimeSeries
GenericWriteRequest
GenericWriteResponse
*/
package generic
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Sample struct {
Value float64 `protobuf:"fixed64,1,opt,name=value" json:"value,omitempty"`
TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs" json:"timestamp_ms,omitempty"`
}
func (m *Sample) Reset() { *m = Sample{} }
func (m *Sample) String() string { return proto.CompactTextString(m) }
func (*Sample) ProtoMessage() {}
func (*Sample) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type LabelPair struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
}
func (m *LabelPair) Reset() { *m = LabelPair{} }
func (m *LabelPair) String() string { return proto.CompactTextString(m) }
func (*LabelPair) ProtoMessage() {}
func (*LabelPair) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
type TimeSeries struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
Labels []*LabelPair `protobuf:"bytes,2,rep,name=labels" json:"labels,omitempty"`
// Sorted by time, oldest sample first.
Samples []*Sample `protobuf:"bytes,3,rep,name=samples" json:"samples,omitempty"`
}
func (m *TimeSeries) Reset() { *m = TimeSeries{} }
func (m *TimeSeries) String() string { return proto.CompactTextString(m) }
func (*TimeSeries) ProtoMessage() {}
func (*TimeSeries) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *TimeSeries) GetLabels() []*LabelPair {
if m != nil {
return m.Labels
}
return nil
}
func (m *TimeSeries) GetSamples() []*Sample {
if m != nil {
return m.Samples
}
return nil
}
type GenericWriteRequest struct {
Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"`
}
func (m *GenericWriteRequest) Reset() { *m = GenericWriteRequest{} }
func (m *GenericWriteRequest) String() string { return proto.CompactTextString(m) }
func (*GenericWriteRequest) ProtoMessage() {}
func (*GenericWriteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *GenericWriteRequest) GetTimeseries() []*TimeSeries {
if m != nil {
return m.Timeseries
}
return nil
}
type GenericWriteResponse struct {
}
func (m *GenericWriteResponse) Reset() { *m = GenericWriteResponse{} }
func (m *GenericWriteResponse) String() string { return proto.CompactTextString(m) }
func (*GenericWriteResponse) ProtoMessage() {}
func (*GenericWriteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func init() {
proto.RegisterType((*Sample)(nil), "generic.Sample")
proto.RegisterType((*LabelPair)(nil), "generic.LabelPair")
proto.RegisterType((*TimeSeries)(nil), "generic.TimeSeries")
proto.RegisterType((*GenericWriteRequest)(nil), "generic.GenericWriteRequest")
proto.RegisterType((*GenericWriteResponse)(nil), "generic.GenericWriteResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion3
// Client API for GenericWrite service
type GenericWriteClient interface {
Write(ctx context.Context, in *GenericWriteRequest, opts ...grpc.CallOption) (*GenericWriteResponse, error)
}
type genericWriteClient struct {
cc *grpc.ClientConn
}
func NewGenericWriteClient(cc *grpc.ClientConn) GenericWriteClient {
return &genericWriteClient{cc}
}
func (c *genericWriteClient) Write(ctx context.Context, in *GenericWriteRequest, opts ...grpc.CallOption) (*GenericWriteResponse, error) {
out := new(GenericWriteResponse)
err := grpc.Invoke(ctx, "/generic.GenericWrite/Write", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for GenericWrite service
type GenericWriteServer interface {
Write(context.Context, *GenericWriteRequest) (*GenericWriteResponse, error)
}
func RegisterGenericWriteServer(s *grpc.Server, srv GenericWriteServer) {
s.RegisterService(&_GenericWrite_serviceDesc, srv)
}
func _GenericWrite_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GenericWriteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GenericWriteServer).Write(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/generic.GenericWrite/Write",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GenericWriteServer).Write(ctx, req.(*GenericWriteRequest))
}
return interceptor(ctx, in, info, handler)
}
var _GenericWrite_serviceDesc = grpc.ServiceDesc{
ServiceName: "generic.GenericWrite",
HandlerType: (*GenericWriteServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Write",
Handler: _GenericWrite_Write_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: fileDescriptor0,
}
func init() { proto.RegisterFile("generic.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 264 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x91, 0x4f, 0x4b, 0xc3, 0x40,
0x10, 0xc5, 0x4d, 0x63, 0x53, 0xfa, 0x5a, 0x11, 0xa6, 0x45, 0x82, 0x28, 0xe8, 0x9e, 0xaa, 0x87,
0x1e, 0x5a, 0xfc, 0x00, 0x5e, 0x14, 0x44, 0x41, 0xb6, 0xa2, 0x47, 0xd9, 0xca, 0x20, 0x81, 0xfc,
0x33, 0xbb, 0xd5, 0xaf, 0xef, 0x66, 0xb7, 0xdd, 0x56, 0xa8, 0xb7, 0x9d, 0x79, 0x2f, 0xbf, 0x79,
0x8f, 0xe0, 0xe8, 0x93, 0x4b, 0x6e, 0xb2, 0x8f, 0x69, 0xdd, 0x54, 0xa6, 0xa2, 0xde, 0x7a, 0x14,
0xb7, 0x48, 0x16, 0xaa, 0xa8, 0x73, 0xa6, 0x31, 0xba, 0xdf, 0x2a, 0x5f, 0x71, 0x1a, 0x5d, 0x44,
0x93, 0x48, 0xfa, 0x81, 0x2e, 0x31, 0x34, 0x59, 0xc1, 0xda, 0x58, 0xd3, 0x7b, 0xa1, 0xd3, 0x8e,
0x15, 0x63, 0x39, 0x08, 0xbb, 0x27, 0x2d, 0x6e, 0xd0, 0x7f, 0x54, 0x4b, 0xce, 0x9f, 0x55, 0xd6,
0x10, 0xe1, 0xb0, 0x54, 0x85, 0x87, 0xf4, 0xa5, 0x7b, 0x6f, 0xc9, 0x1d, 0xb7, 0xf4, 0x83, 0xf8,
0x01, 0x5e, 0x2c, 0x65, 0x61, 0x63, 0xb0, 0xde, 0xfb, 0xdd, 0x35, 0x92, 0xbc, 0x05, 0xb7, 0x57,
0xe3, 0xc9, 0x60, 0x46, 0xd3, 0x4d, 0x89, 0x70, 0x4f, 0xae, 0x1d, 0x74, 0x85, 0x9e, 0x76, 0x3d,
0x74, 0x1a, 0x3b, 0xf3, 0x71, 0x30, 0xfb, 0x7e, 0x72, 0xa3, 0x8b, 0x07, 0x8c, 0xee, 0xbd, 0xf4,
0xd6, 0x64, 0x86, 0x25, 0x7f, 0xad, 0x6c, 0x17, 0x9a, 0x03, 0xae, 0x95, 0xcb, 0x63, 0x73, 0xb4,
0x90, 0x51, 0x80, 0x6c, 0xa3, 0xca, 0x1d, 0x9b, 0x38, 0xc1, 0xf8, 0x2f, 0x4b, 0xd7, 0x55, 0xa9,
0x79, 0xf6, 0x8a, 0xe1, 0xee, 0x9e, 0xee, 0xd0, 0xf5, 0x8f, 0xb3, 0x40, 0xdc, 0x93, 0xe1, 0xf4,
0xfc, 0x1f, 0xd5, 0x53, 0xc5, 0xc1, 0x32, 0x71, 0xbf, 0x6f, 0xfe, 0x1b, 0x00, 0x00, 0xff, 0xff,
0x40, 0x1e, 0x0f, 0xf8, 0xcf, 0x01, 0x00, 0x00,
}

View file

@ -0,0 +1,45 @@
// Copyright 2016 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package generic;
message Sample {
double value = 1;
int64 timestamp_ms = 2;
}
message LabelPair {
string name = 1;
string value = 2;
}
message TimeSeries {
string name = 1;
repeated LabelPair labels = 2;
// Sorted by time, oldest sample first.
repeated Sample samples = 3;
}
message GenericWriteRequest {
repeated TimeSeries timeseries = 1;
}
message GenericWriteResponse {
}
service GenericWrite {
rpc Write(GenericWriteRequest) returns (GenericWriteResponse) {}
}

View file

@ -24,6 +24,7 @@ import (
influx "github.com/influxdb/influxdb/client"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage/remote/generic"
"github.com/prometheus/prometheus/storage/remote/graphite"
"github.com/prometheus/prometheus/storage/remote/influxdb"
"github.com/prometheus/prometheus/storage/remote/opentsdb"
@ -69,6 +70,10 @@ func New(o *Options) *Storage {
prometheus.MustRegister(c)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
}
if o.GenericAddress != "" {
c := generic.NewClient(o.GenericAddress, o.StorageTimeout)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
}
if len(s.queues) == 0 {
return nil
}
@ -87,6 +92,7 @@ type Options struct {
GraphiteAddress string
GraphiteTransport string
GraphitePrefix string
GenericAddress string
}
// Run starts the background processing of the storage queues.