diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index cce47d198..860516882 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -48,6 +48,7 @@ var cfg = struct { remote remote.Options prometheusURL string + influxdbURL string }{} func init() { @@ -167,13 +168,17 @@ func init() { "The URL of the remote OpenTSDB server to send samples to. None, if empty.", ) cfg.fs.StringVar( - &cfg.remote.InfluxdbURL, "storage.remote.influxdb-url", "", + &cfg.influxdbURL, "storage.remote.influxdb-url", "", "The URL of the remote InfluxDB server to send samples to. None, if empty.", ) cfg.fs.StringVar( &cfg.remote.InfluxdbRetentionPolicy, "storage.remote.influxdb.retention-policy", "default", "The InfluxDB retention policy to use.", ) + cfg.fs.StringVar( + &cfg.remote.InfluxdbUsername, "storage.remote.influxdb.username", "", + "The username to use when sending samples to InfluxDB.", + ) cfg.fs.StringVar( &cfg.remote.InfluxdbDatabase, "storage.remote.influxdb.database", "prometheus", "The name of the database to use for storing samples in InfluxDB.", @@ -221,6 +226,20 @@ func parse(args []string) error { return err } + if err := parsePrometheusURL(); err != nil { + return err + } + + if err := parseInfluxdbURL(); err != nil { + return err + } + + cfg.remote.InfluxdbPassword = os.Getenv("INFLUXDB_PW") + + return nil +} + +func parsePrometheusURL() error { if cfg.prometheusURL == "" { hostname, err := os.Hostname() if err != nil { @@ -244,7 +263,20 @@ func parse(args []string) error { ppref = "/" + ppref } cfg.web.ExternalURL.Path = ppref + return nil +} +func parseInfluxdbURL() error { + if cfg.influxdbURL == "" { + return nil + } + + url, err := url.Parse(cfg.influxdbURL) + if err != nil { + return err + } + + cfg.remote.InfluxdbURL = url return nil } diff --git a/storage/remote/influxdb/client.go b/storage/remote/influxdb/client.go index 618363ed6..7373ad50b 100644 --- a/storage/remote/influxdb/client.go +++ b/storage/remote/influxdb/client.go @@ -14,148 +14,96 @@ package influxdb import ( - "bytes" - "encoding/json" - "fmt" - "io/ioutil" "math" - "net/http" - "net/url" - "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/log" - "github.com/prometheus/prometheus/util/httputil" -) - -const ( - writeEndpoint = "/write" - contentTypeJSON = "application/json" + influx "github.com/influxdb/influxdb/client" ) // Client allows sending batches of Prometheus samples to InfluxDB. type Client struct { - url string - httpClient *http.Client - retentionPolicy string + client *influx.Client database string + retentionPolicy string + ignoredSamples prometheus.Counter } // NewClient creates a new Client. -func NewClient(url string, timeout time.Duration, database, retentionPolicy string) *Client { +func NewClient(conf influx.Config, db string, rp string) *Client { + c, err := influx.NewClient(conf) + // Currently influx.NewClient() *should* never return an error. + if err != nil { + log.Fatal(err) + } + return &Client{ - url: url, - httpClient: httputil.NewDeadlineClient(timeout, nil), - retentionPolicy: retentionPolicy, - database: database, + client: c, + database: db, + retentionPolicy: rp, + ignoredSamples: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "prometheus_influxdb_ignored_samples_total", + Help: "The total number of samples not sent to InfluxDB due to unsupported float values (Inf, -Inf, NaN).", + }, + ), } } -// StoreSamplesRequest is used for building a JSON request for storing samples -// in InfluxDB. -type StoreSamplesRequest struct { - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Points []point `json:"points"` -} - -// point represents a single InfluxDB measurement. -type point struct { - Timestamp int64 `json:"timestamp"` - Precision string `json:"precision"` - Name model.LabelValue `json:"name"` - Tags model.LabelSet `json:"tags"` - Fields fields `json:"fields"` -} - -// fields represents the fields/columns sent to InfluxDB for a given measurement. -type fields struct { - Value model.SampleValue `json:"value"` -} - // tagsFromMetric extracts InfluxDB tags from a Prometheus metric. -func tagsFromMetric(m model.Metric) model.LabelSet { - tags := make(model.LabelSet, len(m)-1) +func tagsFromMetric(m model.Metric) map[string]string { + tags := make(map[string]string, len(m)-1) for l, v := range m { - if l == model.MetricNameLabel { - continue + if l != model.MetricNameLabel { + tags[string(l)] = string(v) } - tags[l] = v } return tags } // Store sends a batch of samples to InfluxDB via its HTTP API. func (c *Client) Store(samples model.Samples) error { - points := make([]point, 0, len(samples)) + points := make([]influx.Point, 0, len(samples)) for _, s := range samples { v := float64(s.Value) if math.IsNaN(v) || math.IsInf(v, 0) { - // TODO(julius): figure out if it's possible to insert special float - // values into InfluxDB somehow. - log.Warnf("cannot send value %f to InfluxDB, skipping sample %#v", v, s) + log.Debugf("cannot send value %f to InfluxDB, skipping sample %#v", v, s) + c.ignoredSamples.Inc() continue } - metric := s.Metric[model.MetricNameLabel] - points = append(points, point{ - Timestamp: s.Timestamp.UnixNano(), - Precision: "n", - Name: metric, - Tags: tagsFromMetric(s.Metric), - Fields: fields{ - Value: s.Value, + points = append(points, influx.Point{ + Measurement: string(s.Metric[model.MetricNameLabel]), + Tags: tagsFromMetric(s.Metric), + Time: s.Timestamp.Time(), + Precision: "ms", + Fields: map[string]interface{}{ + "value": v, }, }) } - u, err := url.Parse(c.url) - if err != nil { - return err - } - - u.Path = writeEndpoint - - req := StoreSamplesRequest{ + bps := influx.BatchPoints{ + Points: points, Database: c.database, RetentionPolicy: c.retentionPolicy, - Points: points, } - buf, err := json.Marshal(req) - if err != nil { - return err - } - - resp, err := c.httpClient.Post( - u.String(), - contentTypeJSON, - bytes.NewBuffer(buf), - ) - if err != nil { - return err - } - defer resp.Body.Close() - - // API returns status code 200 for successful writes. - // http://influxdb.com/docs/v0.9/concepts/reading_and_writing_data.html#response - if resp.StatusCode == http.StatusOK { - return nil - } - - // API returns error details in the response content in JSON. - buf, err = ioutil.ReadAll(resp.Body) - if err != nil { - return err - } - - var r map[string]string - if err := json.Unmarshal(buf, &r); err != nil { - return err - } - return fmt.Errorf("failed to write samples into InfluxDB. Error: %s", r["error"]) + _, err := c.client.Write(bps) + return err } // Name identifies the client as an InfluxDB client. func (c Client) Name() string { return "influxdb" } + +// Describe implements prometheus.Collector. +func (c *Client) Describe(ch chan<- *prometheus.Desc) { + ch <- c.ignoredSamples.Desc() +} + +// Collect implements prometheus.Collector. +func (c *Client) Collect(ch chan<- prometheus.Metric) { + ch <- c.ignoredSamples +} diff --git a/storage/remote/influxdb/client_test.go b/storage/remote/influxdb/client_test.go index a72321438..38a5e1a68 100644 --- a/storage/remote/influxdb/client_test.go +++ b/storage/remote/influxdb/client_test.go @@ -18,9 +18,12 @@ import ( "math" "net/http" "net/http/httptest" + "net/url" "testing" "time" + influx "github.com/influxdb/influxdb/client" + "github.com/prometheus/common/model" ) @@ -44,43 +47,63 @@ func TestClient(t *testing.T) { }, { Metric: model.Metric{ - model.MetricNameLabel: "special_float_value", + model.MetricNameLabel: "nan_value", }, Timestamp: model.Time(123456789123), Value: model.SampleValue(math.NaN()), }, + { + Metric: model.Metric{ + model.MetricNameLabel: "pos_inf_value", + }, + Timestamp: model.Time(123456789123), + Value: model.SampleValue(math.Inf(1)), + }, + { + Metric: model.Metric{ + model.MetricNameLabel: "neg_inf_value", + }, + Timestamp: model.Time(123456789123), + Value: model.SampleValue(math.Inf(-1)), + }, } - expectedJSON := `{"database":"prometheus","retentionPolicy":"default","points":[{"timestamp":123456789123000000,"precision":"n","name":"testmetric","tags":{"test_label":"test_label_value1"},"fields":{"value":"1.23"}},{"timestamp":123456789123000000,"precision":"n","name":"testmetric","tags":{"test_label":"test_label_value2"},"fields":{"value":"5.1234"}}]}` + expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123000000 +testmetric,test_label=test_label_value2 value=5.1234 123456789123000000 +` server := httptest.NewServer(http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { t.Fatalf("Unexpected method; expected POST, got %s", r.Method) } - if r.URL.Path != writeEndpoint { - t.Fatalf("Unexpected path; expected %s, got %s", writeEndpoint, r.URL.Path) - } - ct := r.Header["Content-Type"] - if len(ct) != 1 { - t.Fatalf("Unexpected number of 'Content-Type' headers; got %d, want 1", len(ct)) - } - if ct[0] != contentTypeJSON { - t.Fatalf("Unexpected 'Content-type'; expected %s, got %s", contentTypeJSON, ct[0]) + if r.URL.Path != "/write" { + t.Fatalf("Unexpected path; expected %s, got %s", "/write", r.URL.Path) } b, err := ioutil.ReadAll(r.Body) if err != nil { t.Fatalf("Error reading body: %s", err) } - if string(b) != expectedJSON { - t.Fatalf("Unexpected request body; expected:\n\n%s\n\ngot:\n\n%s", expectedJSON, string(b)) + if string(b) != expectedBody { + t.Fatalf("Unexpected request body; expected:\n\n%s\n\ngot:\n\n%s", expectedBody, string(b)) } }, )) defer server.Close() - c := NewClient(server.URL, time.Minute, "prometheus", "default") + serverURL, err := url.Parse(server.URL) + if err != nil { + t.Fatalf("Unable to parse server URL %s: %s", server.URL, err) + } + + conf := influx.Config{ + URL: *serverURL, + Username: "testuser", + Password: "testpass", + Timeout: time.Minute, + } + c := NewClient(conf, "test_db", "default") if err := c.Store(samples); err != nil { t.Fatalf("Error sending samples: %s", err) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index fa6b65b67..6e33ee734 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -62,7 +62,8 @@ type StorageQueueManager struct { samplesCount *prometheus.CounterVec sendLatency prometheus.Summary - sendErrors prometheus.Counter + failedBatches prometheus.Counter + failedSamples prometheus.Counter queueLength prometheus.Gauge queueCapacity prometheus.Metric } @@ -92,15 +93,22 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue sendLatency: prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "sent_latency_milliseconds", + Name: "send_latency_seconds", Help: "Latency quantiles for sending sample batches to the remote storage.", ConstLabels: constLabels, }), - sendErrors: prometheus.NewCounter(prometheus.CounterOpts{ + failedBatches: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "sent_errors_total", - Help: "Total number of errors sending sample batches to the remote storage.", + Name: "failed_batches_total", + Help: "Total number of sample batches that encountered an error while being sent to the remote storage.", + ConstLabels: constLabels, + }), + failedSamples: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "failed_samples_total", + Help: "Total number of samples that encountered an error while being sent to the remote storage.", ConstLabels: constLabels, }), queueLength: prometheus.NewGauge(prometheus.GaugeOpts{ @@ -151,6 +159,8 @@ func (t *StorageQueueManager) Stop() { func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) { t.samplesCount.Describe(ch) t.sendLatency.Describe(ch) + ch <- t.failedBatches.Desc() + ch <- t.failedSamples.Desc() ch <- t.queueLength.Desc() ch <- t.queueCapacity.Desc() } @@ -160,6 +170,8 @@ func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) { t.samplesCount.Collect(ch) t.sendLatency.Collect(ch) t.queueLength.Set(float64(len(t.queue))) + ch <- t.failedBatches + ch <- t.failedSamples ch <- t.queueLength ch <- t.queueCapacity } @@ -175,13 +187,14 @@ func (t *StorageQueueManager) sendSamples(s model.Samples) { // floor. begin := time.Now() err := t.tsdb.Store(s) - duration := time.Since(begin) / time.Millisecond + duration := time.Since(begin) / time.Second labelValue := success if err != nil { log.Warnf("error sending %d samples to remote storage: %s", len(s), err) labelValue = failure - t.sendErrors.Inc() + t.failedBatches.Inc() + t.failedSamples.Add(float64(len(s))) } t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s))) t.sendLatency.Observe(float64(duration)) diff --git a/storage/remote/remote.go b/storage/remote/remote.go index aab7f34d3..e570cc57c 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -14,12 +14,15 @@ package remote import ( + "net/url" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + influx "github.com/influxdb/influxdb/client" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage/remote/influxdb" "github.com/prometheus/prometheus/storage/remote/opentsdb" @@ -49,8 +52,15 @@ func New(o *Options) *Storage { c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout) s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) } - if o.InfluxdbURL != "" { - c := influxdb.NewClient(o.InfluxdbURL, o.StorageTimeout, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy) + if o.InfluxdbURL != nil { + conf := influx.Config{ + URL: *o.InfluxdbURL, + Username: o.InfluxdbUsername, + Password: o.InfluxdbPassword, + Timeout: o.StorageTimeout, + } + c := influxdb.NewClient(conf, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy) + prometheus.MustRegister(c) s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) } if len(s.queues) == 0 { @@ -62,8 +72,10 @@ func New(o *Options) *Storage { // Options contains configuration parameters for a remote storage. type Options struct { StorageTimeout time.Duration - InfluxdbURL string + InfluxdbURL *url.URL InfluxdbRetentionPolicy string + InfluxdbUsername string + InfluxdbPassword string InfluxdbDatabase string OpentsdbURL string } diff --git a/vendor/github.com/influxdb/influxdb/client/influxdb.go b/vendor/github.com/influxdb/influxdb/client/influxdb.go new file mode 100644 index 000000000..235beb964 --- /dev/null +++ b/vendor/github.com/influxdb/influxdb/client/influxdb.go @@ -0,0 +1,180 @@ +package client + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "time" + + "github.com/influxdb/influxdb/tsdb" +) + +const ( + // DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance + DefaultTimeout = 0 +) + +// Config is used to specify what server to connect to. +// URL: The URL of the server connecting to. +// Username/Password are optional. They will be passed via basic auth if provided. +// UserAgent: If not provided, will default "InfluxDBClient", +// Timeout: If not provided, will default to 0 (no timeout) +type Config struct { + URL url.URL + Username string + Password string + UserAgent string + Timeout time.Duration + Precision string +} + +// NewConfig will create a config to be used in connecting to the client +func NewConfig() Config { + return Config{ + Timeout: DefaultTimeout, + } +} + +// Client is used to make calls to the server. +type Client struct { + url url.URL + username string + password string + httpClient *http.Client + userAgent string + precision string +} + +const ( + ConsistencyOne = "one" + ConsistencyAll = "all" + ConsistencyQuorum = "quorum" + ConsistencyAny = "any" +) + +// NewClient will instantiate and return a connected client to issue commands to the server. +func NewClient(c Config) (*Client, error) { + client := Client{ + url: c.URL, + username: c.Username, + password: c.Password, + httpClient: &http.Client{Timeout: c.Timeout}, + userAgent: c.UserAgent, + precision: c.Precision, + } + if client.userAgent == "" { + client.userAgent = "InfluxDBClient" + } + return &client, nil +} + +// Write takes BatchPoints and allows for writing of multiple points with defaults +// If successful, error is nil and Response is nil +// If an error occurs, Response may contain additional information if populated. +func (c *Client) Write(bp BatchPoints) (*Response, error) { + u := c.url + u.Path = "write" + + var b bytes.Buffer + for _, p := range bp.Points { + if p.Raw != "" { + if _, err := b.WriteString(p.Raw); err != nil { + return nil, err + } + } else { + for k, v := range bp.Tags { + if p.Tags == nil { + p.Tags = make(map[string]string, len(bp.Tags)) + } + p.Tags[k] = v + } + + if _, err := b.WriteString(p.MarshalString()); err != nil { + return nil, err + } + } + + if err := b.WriteByte('\n'); err != nil { + return nil, err + } + } + + req, err := http.NewRequest("POST", u.String(), &b) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.userAgent) + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + params := req.URL.Query() + params.Set("db", bp.Database) + params.Set("rp", bp.RetentionPolicy) + params.Set("precision", bp.Precision) + params.Set("consistency", bp.WriteConsistency) + req.URL.RawQuery = params.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var response Response + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + var err = fmt.Errorf(string(body)) + response.Err = err + return &response, err + } + + return nil, nil +} + +// Structs + +// Response represents a list of statement results. +type Response struct { + Err error +} + +// Point defines the fields that will be written to the database +// Measurement, Time, and Fields are required +// Precision can be specified if the time is in epoch format (integer). +// Valid values for Precision are n, u, ms, s, m, and h +type Point struct { + Measurement string + Tags map[string]string + Time time.Time + Fields map[string]interface{} + Precision string + Raw string +} + +func (p *Point) MarshalString() string { + return tsdb.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time).String() +} + +// BatchPoints is used to send batched data in a single write. +// Database and Points are required +// If no retention policy is specified, it will use the databases default retention policy. +// If tags are specified, they will be "merged" with all points. If a point already has that tag, it is ignored. +// If time is specified, it will be applied to any point with an empty time. +// Precision can be specified if the time is in epoch format (integer). +// Valid values for Precision are n, u, ms, s, m, and h +type BatchPoints struct { + Points []Point `json:"points,omitempty"` + Database string `json:"database,omitempty"` + RetentionPolicy string `json:"retentionPolicy,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Time time.Time `json:"time,omitempty"` + Precision string `json:"precision,omitempty"` + WriteConsistency string `json:"-"` +} diff --git a/vendor/github.com/influxdb/influxdb/tsdb/points.go b/vendor/github.com/influxdb/influxdb/tsdb/points.go new file mode 100644 index 000000000..dd8dbb644 --- /dev/null +++ b/vendor/github.com/influxdb/influxdb/tsdb/points.go @@ -0,0 +1,1392 @@ +package tsdb + +import ( + "bytes" + "fmt" + "hash/fnv" + "regexp" + "sort" + "strconv" + "strings" + "time" +) + +// Point defines the values that will be written to the database +type Point interface { + Name() string + SetName(string) + + Tags() Tags + AddTag(key, value string) + SetTags(tags Tags) + + Fields() Fields + AddField(name string, value interface{}) + + Time() time.Time + SetTime(t time.Time) + UnixNano() int64 + + HashID() uint64 + Key() []byte + + Data() []byte + SetData(buf []byte) + + String() string +} + +// Points represents a sortable list of points by timestamp. +type Points []Point + +func (a Points) Len() int { return len(a) } +func (a Points) Less(i, j int) bool { return a[i].Time().Before(a[j].Time()) } +func (a Points) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// point is the default implementation of Point. +type point struct { + time time.Time + + // text encoding of measurement and tags + // key must always be stored sorted by tags, if the original line was not sorted, + // we need to resort it + key []byte + + // text encoding of field data + fields []byte + + // text encoding of timestamp + ts []byte + + // binary encoded field data + data []byte + + // cached version of parsed fields from data + cachedFields map[string]interface{} + + // cached version of parsed name from key + cachedName string +} + +const ( + // the number of characters for the largest possible int64 (9223372036854775807) + maxInt64Digits = 19 + + // the number of characters for the smallest possible int64 (-9223372036854775808) + minInt64Digits = 20 + + // the number of characters required for the largest float64 before a range check + // would occur during parsing + maxFloat64Digits = 25 + + // the number of characters required for smallest float64 before a range check occur + // would occur during parsing + minFloat64Digits = 27 +) + +var ( + // Compile the regex that detects unquoted double quote sequences + quoteReplacer = regexp.MustCompile(`([^\\])"`) + + escapeCodes = map[byte][]byte{ + ',': []byte(`\,`), + '"': []byte(`\"`), + ' ': []byte(`\ `), + '=': []byte(`\=`), + } + + escapeCodesStr = map[string]string{} + + measurementEscapeCodes = map[byte][]byte{ + ',': []byte(`\,`), + ' ': []byte(`\ `), + } + + tagEscapeCodes = map[byte][]byte{ + ',': []byte(`\,`), + ' ': []byte(`\ `), + '=': []byte(`\=`), + } +) + +func init() { + for k, v := range escapeCodes { + escapeCodesStr[string(k)] = string(v) + } +} + +func ParsePointsString(buf string) ([]Point, error) { + return ParsePoints([]byte(buf)) +} + +// ParsePoints returns a slice of Points from a text representation of a point +// with each point separated by newlines. +func ParsePoints(buf []byte) ([]Point, error) { + return ParsePointsWithPrecision(buf, time.Now().UTC(), "n") +} + +func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) { + points := []Point{} + var ( + pos int + block []byte + ) + for { + pos, block = scanLine(buf, pos) + pos += 1 + + if len(block) == 0 { + break + } + + // lines which start with '#' are comments + start := skipWhitespace(block, 0) + + // If line is all whitespace, just skip it + if start >= len(block) { + continue + } + + if block[start] == '#' { + continue + } + + // strip the newline if one is present + if block[len(block)-1] == '\n' { + block = block[:len(block)-1] + } + + pt, err := parsePoint(block[start:len(block)], defaultTime, precision) + if err != nil { + return nil, fmt.Errorf("unable to parse '%s': %v", string(block[start:len(block)]), err) + } + points = append(points, pt) + + if pos >= len(buf) { + break + } + + } + return points, nil + +} + +func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, error) { + // scan the first block which is measurement[,tag1=value1,tag2=value=2...] + pos, key, err := scanKey(buf, 0) + if err != nil { + return nil, err + } + + // measurement name is required + if len(key) == 0 { + return nil, fmt.Errorf("missing measurement") + } + + // scan the second block is which is field1=value1[,field2=value2,...] + pos, fields, err := scanFields(buf, pos) + if err != nil { + return nil, err + } + + // at least one field is required + if len(fields) == 0 { + return nil, fmt.Errorf("missing fields") + } + + // scan the last block which is an optional integer timestamp + pos, ts, err := scanTime(buf, pos) + + if err != nil { + return nil, err + } + + pt := &point{ + key: key, + fields: fields, + ts: ts, + } + + if len(ts) == 0 { + pt.time = defaultTime + pt.SetPrecision(precision) + } else { + ts, err := strconv.ParseInt(string(ts), 10, 64) + if err != nil { + return nil, err + } + pt.time = time.Unix(0, ts*pt.GetPrecisionMultiplier(precision)) + } + return pt, nil +} + +// scanKey scans buf starting at i for the measurement and tag portion of the point. +// It returns the ending position and the byte slice of key within buf. If there +// are tags, they will be sorted if they are not already. +func scanKey(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + + i = start + + // Determines whether the tags are sort, assume they are + sorted := true + + // indices holds the indexes within buf of the start of each tag. For example, + // a buf of 'cpu,host=a,region=b,zone=c' would have indices slice of [4,11,20] + // which indicates that the first tag starts at buf[4], seconds at buf[11], and + // last at buf[20] + indices := make([]int, 100) + + // tracks how many commas we've seen so we know how many values are indices. + // Since indices is an arbitrarily large slice, + // we need to know how many values in the buffer are in use. + commas := 0 + + // tracks whether we've see an '=' + equals := 0 + + // loop over each byte in buf + for { + // reached the end of buf? + if i >= len(buf) { + if equals == 0 && commas > 0 { + return i, buf[start:i], fmt.Errorf("missing tag value") + } + + break + } + + // equals is special in the tags section. It must be escaped if part of a tag name or value. + // It does not need to be escaped if part of the measurement. + if buf[i] == '=' && commas > 0 { + if i-1 < 0 || i-2 < 0 { + return i, buf[start:i], fmt.Errorf("missing tag name") + } + + // Check for "cpu,=value" but allow "cpu,a\,=value" + if buf[i-1] == ',' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing tag name") + } + + // Check for "cpu,\ =value" + if buf[i-1] == ' ' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing tag name") + } + + i += 1 + equals += 1 + + // Check for "cpu,a=1,b= value=1" + if i < len(buf) && buf[i] == ' ' { + return i, buf[start:i], fmt.Errorf("missing tag value") + } + continue + } + + // escaped character + if buf[i] == '\\' { + i += 2 + continue + } + + // At a tag separator (comma), track it's location + if buf[i] == ',' { + if equals == 0 && commas > 0 { + return i, buf[start:i], fmt.Errorf("missing tag value") + } + i += 1 + + // grow our indices slice if we have too many tags + if commas >= len(indices) { + newIndics := make([]int, cap(indices)*2) + copy(newIndics, indices) + indices = newIndics + } + indices[commas] = i + commas += 1 + + // Check for "cpu, value=1" + if i < len(buf) && buf[i] == ' ' { + return i, buf[start:i], fmt.Errorf("missing tag key") + } + continue + } + + // reached end of the block? (next block would be fields) + if buf[i] == ' ' { + // check for "cpu,tag value=1" + if equals == 0 && commas > 0 { + return i, buf[start:i], fmt.Errorf("missing tag value") + } + if equals > 0 && commas-1 != equals-1 { + return i, buf[start:i], fmt.Errorf("missing tag value") + } + + // grow our indices slice if we have too many tags + if commas >= len(indices) { + newIndics := make([]int, cap(indices)*2) + copy(newIndics, indices) + indices = newIndics + } + + indices[commas] = i + 1 + break + } + + i += 1 + } + + // check that all field sections had key and values (e.g. prevent "a=1,b" + // We're using commas -1 because there should always be a comma after measurement + if equals > 0 && commas-1 != equals-1 { + return i, buf[start:i], fmt.Errorf("invalid tag format") + } + + // This check makes sure we actually received fields from the user. #3379 + // This will catch invalid syntax such as: `cpu,host=serverA,region=us-west` + if i >= len(buf) { + return i, buf[start:i], fmt.Errorf("missing fields") + } + + // Now we know where the key region is within buf, and the locations of tags, we + // need to deterimine if duplicate tags exist and if the tags are sorted. This iterates + // 1/2 of the list comparing each end with each other, walking towards the center from + // both sides. + for j := 0; j < commas/2; j++ { + // get the left and right tags + _, left := scanTo(buf[indices[j]:indices[j+1]-1], 0, '=') + _, right := scanTo(buf[indices[commas-j-1]:indices[commas-j]-1], 0, '=') + + // If the tags are equal, then there are duplicate tags, and we should abort + if bytes.Equal(left, right) { + return i, buf[start:i], fmt.Errorf("duplicate tags") + } + + // If left is greater than right, the tags are not sorted. We must continue + // since their could be duplicate tags still. + if bytes.Compare(left, right) > 0 { + sorted = false + } + } + + // If the tags are not sorted, then sort them. This sort is inline and + // uses the tag indices we created earlier. The actual buffer is not sorted, the + // indices are using the buffer for value comparison. After the indices are sorted, + // the buffer is reconstructed from the sorted indices. + if !sorted && commas > 0 { + // Get the measurement name for later + measurement := buf[start : indices[0]-1] + + // Sort the indices + indices := indices[:commas] + insertionSort(0, commas, buf, indices) + + // Create a new key using the measurement and sorted indices + b := make([]byte, len(buf[start:i])) + pos := copy(b, measurement) + for _, i := range indices { + b[pos] = ',' + pos += 1 + _, v := scanToSpaceOr(buf, i, ',') + pos += copy(b[pos:], v) + } + + return i, b, nil + } + + return i, buf[start:i], nil +} + +func insertionSort(l, r int, buf []byte, indices []int) { + for i := l + 1; i < r; i++ { + for j := i; j > l && less(buf, indices, j, j-1); j-- { + indices[j], indices[j-1] = indices[j-1], indices[j] + } + } +} + +func less(buf []byte, indices []int, i, j int) bool { + // This grabs the tag names for i & j, it ignores the values + _, a := scanTo(buf, indices[i], '=') + _, b := scanTo(buf, indices[j], '=') + return bytes.Compare(a, b) < 0 +} + +func isFieldEscapeChar(b byte) bool { + for c := range escapeCodes { + if c == b { + return true + } + } + return false +} + +// scanFields scans buf, starting at i for the fields section of a point. It returns +// the ending position and the byte slice of the fields within buf +func scanFields(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + quoted := false + + // tracks how many '=' we've seen + equals := 0 + + // tracks how many commas we've seen + commas := 0 + + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // escaped characters? + if buf[i] == '\\' && i+1 < len(buf) { + + // Is this an escape char within a string field? Only " and \ are allowed. + if quoted && (buf[i+1] == '"' || buf[i+1] == '\\') { + i += 2 + continue + // Non-string field escaped chars + } else if !quoted && isFieldEscapeChar(buf[i+1]) { + i += 2 + continue + } + } + + // If the value is quoted, scan until we get to the end quote + if buf[i] == '"' { + quoted = !quoted + i += 1 + continue + } + + // If we see an =, ensure that there is at least on char before and after it + if buf[i] == '=' && !quoted { + equals += 1 + + // check for "... =123" but allow "a\ =123" + if buf[i-1] == ' ' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing field name") + } + + // check for "...a=123,=456" but allow "a=123,a\,=456" + if buf[i-1] == ',' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing field name") + } + + // check for "... value=" + if i+1 >= len(buf) { + return i, buf[start:i], fmt.Errorf("missing field value") + } + + // check for "... value=,value2=..." + if buf[i+1] == ',' || buf[i+1] == ' ' { + return i, buf[start:i], fmt.Errorf("missing field value") + } + + if isNumeric(buf[i+1]) || buf[i+1] == '-' || buf[i+1] == 'N' || buf[i+1] == 'n' { + var err error + i, err = scanNumber(buf, i+1) + if err != nil { + return i, buf[start:i], err + } + continue + } + // If next byte is not a double-quote, the value must be a boolean + if buf[i+1] != '"' { + var err error + i, _, err = scanBoolean(buf, i+1) + if err != nil { + return i, buf[start:i], err + } + continue + } + } + + if buf[i] == ',' && !quoted { + commas += 1 + } + + // reached end of block? + if buf[i] == ' ' && !quoted { + break + } + i += 1 + } + + if quoted { + return i, buf[start:i], fmt.Errorf("unbalanced quotes") + } + + // check that all field sections had key and values (e.g. prevent "a=1,b" + if equals == 0 || commas != equals-1 { + return i, buf[start:i], fmt.Errorf("invalid field format") + } + + return i, buf[start:i], nil +} + +// scanTime scans buf, starting at i for the time section of a point. It returns +// the ending position and the byte slice of the fields within buf and error if the +// timestamp is not in the correct numeric format +func scanTime(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // Timestamps should integers, make sure they are so we don't need to actually + // parse the timestamp until needed + if buf[i] < '0' || buf[i] > '9' { + return i, buf[start:i], fmt.Errorf("bad timestamp") + } + + // reached end of block? + if buf[i] == '\n' { + break + } + i += 1 + } + return i, buf[start:i], nil +} + +func isNumeric(b byte) bool { + return (b >= '0' && b <= '9') || b == '.' +} + +// scanNumber returns the end position within buf, start at i after +// scanning over buf for an integer, or float. It returns an +// error if a invalid number is scanned. +func scanNumber(buf []byte, i int) (int, error) { + start := i + var isInt bool + + // Is negative number? + if i < len(buf) && buf[i] == '-' { + i += 1 + } + + // how many decimal points we've see + decimals := 0 + + // indicates the number is float in scientific notation + scientific := false + + for { + if i >= len(buf) { + break + } + + if buf[i] == ',' || buf[i] == ' ' { + break + } + + if buf[i] == 'i' && i > start && !isInt { + isInt = true + i += 1 + continue + } + + if buf[i] == '.' { + decimals += 1 + } + + // Can't have more than 1 decimal (e.g. 1.1.1 should fail) + if decimals > 1 { + return i, fmt.Errorf("invalid number") + } + + // `e` is valid for floats but not as the first char + if i > start && (buf[i] == 'e') { + scientific = true + i += 1 + continue + } + + // + and - are only valid at this point if they follow an e (scientific notation) + if (buf[i] == '+' || buf[i] == '-') && buf[i-1] == 'e' { + i += 1 + continue + } + + // NaN is a valid float + if i+2 < len(buf) && (buf[i] == 'N' || buf[i] == 'n') { + if (buf[i+1] == 'a' || buf[i+1] == 'A') && (buf[i+2] == 'N' || buf[i+2] == 'n') { + i += 3 + continue + } + return i, fmt.Errorf("invalid number") + } + if !isNumeric(buf[i]) { + return i, fmt.Errorf("invalid number") + } + i += 1 + } + if isInt && (decimals > 0 || scientific) { + return i, fmt.Errorf("invalid number") + } + + // It's more common that numbers will be within min/max range for their type but we need to prevent + // out or range numbers from being parsed successfully. This uses some simple heuristics to decide + // if we should parse the number to the actual type. It does not do it all the time because it incurs + // extra allocations and we end up converting the type again when writing points to disk. + if isInt { + // Make sure the last char is an 'i' for integers (e.g. 9i10 is not valid) + if buf[i-1] != 'i' { + return i, fmt.Errorf("invalid number") + } + // Parse the int to check bounds the number of digits could be larger than the max range + // We subtract 1 from the index to remove the `i` from our tests + if len(buf[start:i-1]) >= maxInt64Digits || len(buf[start:i-1]) >= minInt64Digits { + if _, err := strconv.ParseInt(string(buf[start:i-1]), 10, 64); err != nil { + return i, fmt.Errorf("unable to parse integer %s: %s", buf[start:i-1], err) + } + } + } else { + // Parse the float to check bounds if it's scientific or the number of digits could be larger than the max range + if scientific || len(buf[start:i]) >= maxFloat64Digits || len(buf[start:i]) >= minFloat64Digits { + if _, err := strconv.ParseFloat(string(buf[start:i]), 10); err != nil { + return i, fmt.Errorf("invalid float") + } + } + } + + return i, nil +} + +// scanBoolean returns the end position within buf, start at i after +// scanning over buf for boolean. Valid values for a boolean are +// t, T, true, TRUE, f, F, false, FALSE. It returns an error if a invalid boolean +// is scanned. +func scanBoolean(buf []byte, i int) (int, []byte, error) { + start := i + + if i < len(buf) && (buf[i] != 't' && buf[i] != 'f' && buf[i] != 'T' && buf[i] != 'F') { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + i += 1 + for { + if i >= len(buf) { + break + } + + if buf[i] == ',' || buf[i] == ' ' { + break + } + i += 1 + } + + // Single char bool (t, T, f, F) is ok + if i-start == 1 { + return i, buf[start:i], nil + } + + // length must be 4 for true or TRUE + if (buf[start] == 't' || buf[start] == 'T') && i-start != 4 { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + // length must be 5 for false or FALSE + if (buf[start] == 'f' || buf[start] == 'F') && i-start != 5 { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + // Otherwise + valid := false + switch buf[start] { + case 't': + valid = bytes.Equal(buf[start:i], []byte("true")) + case 'f': + valid = bytes.Equal(buf[start:i], []byte("false")) + case 'T': + valid = bytes.Equal(buf[start:i], []byte("TRUE")) || bytes.Equal(buf[start:i], []byte("True")) + case 'F': + valid = bytes.Equal(buf[start:i], []byte("FALSE")) || bytes.Equal(buf[start:i], []byte("False")) + } + + if !valid { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + return i, buf[start:i], nil + +} + +// skipWhitespace returns the end position within buf, starting at i after +// scanning over spaces in tags +func skipWhitespace(buf []byte, i int) int { + for { + if i >= len(buf) { + return i + } + + if buf[i] == ' ' || buf[i] == '\t' { + i += 1 + continue + } + break + } + return i +} + +// scanLine returns the end position in buf and the next line found within +// buf. +func scanLine(buf []byte, i int) (int, []byte) { + start := i + quoted := false + fields := false + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + if buf[i] == ' ' { + fields = true + } + + // If we see a double quote, makes sure it is not escaped + if fields && buf[i] == '"' && (i-1 > 0 && buf[i-1] != '\\') { + i += 1 + quoted = !quoted + continue + } + + if buf[i] == '\n' && !quoted { + break + } + + i += 1 + } + + return i, buf[start:i] +} + +// scanTo returns the end position in buf and the next consecutive block +// of bytes, starting from i and ending with stop byte. If there are leading +// spaces or escaped chars, they are skipped. +func scanTo(buf []byte, i int, stop byte) (int, []byte) { + start := i + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + if buf[i] == '\\' { + i += 2 + continue + } + + // reached end of block? + if buf[i] == stop { + break + } + i += 1 + } + + return i, buf[start:i] +} + +// scanTo returns the end position in buf and the next consecutive block +// of bytes, starting from i and ending with stop byte. If there are leading +// spaces, they are skipped. +func scanToSpaceOr(buf []byte, i int, stop byte) (int, []byte) { + start := i + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + if buf[i] == '\\' { + i += 2 + continue + } + // reached end of block? + if buf[i] == stop || buf[i] == ' ' { + break + } + i += 1 + } + + return i, buf[start:i] +} + +func scanTagValue(buf []byte, i int) (int, []byte) { + start := i + for { + if i >= len(buf) { + break + } + + if buf[i] == '\\' { + i += 2 + continue + } + + if buf[i] == ',' { + break + } + i += 1 + } + return i, buf[start:i] +} + +func scanFieldValue(buf []byte, i int) (int, []byte) { + start := i + quoted := false + for { + if i >= len(buf) { + break + } + + // Only escape char for a field value is a double-quote + if buf[i] == '\\' && i+1 < len(buf) && buf[i+1] == '"' { + i += 2 + continue + } + + // Quoted value? (e.g. string) + if buf[i] == '"' { + i += 1 + quoted = !quoted + continue + } + + if buf[i] == ',' && !quoted { + break + } + i += 1 + } + return i, buf[start:i] +} + +func escapeMeasurement(in []byte) []byte { + for b, esc := range measurementEscapeCodes { + in = bytes.Replace(in, []byte{b}, esc, -1) + } + return in +} + +func unescapeMeasurement(in []byte) []byte { + for b, esc := range measurementEscapeCodes { + in = bytes.Replace(in, esc, []byte{b}, -1) + } + return in +} + +func escapeTag(in []byte) []byte { + for b, esc := range tagEscapeCodes { + in = bytes.Replace(in, []byte{b}, esc, -1) + } + return in +} + +func unescapeTag(in []byte) []byte { + for b, esc := range tagEscapeCodes { + in = bytes.Replace(in, esc, []byte{b}, -1) + } + return in +} + +func escape(in []byte) []byte { + for b, esc := range escapeCodes { + in = bytes.Replace(in, []byte{b}, esc, -1) + } + return in +} + +func escapeString(in string) string { + for b, esc := range escapeCodesStr { + in = strings.Replace(in, b, esc, -1) + } + return in +} + +func unescape(in []byte) []byte { + i := 0 + inLen := len(in) + var out []byte + + for { + if i >= inLen { + break + } + if in[i] == '\\' && i+1 < inLen { + switch in[i+1] { + case ',': + out = append(out, ',') + i += 2 + continue + case '"': + out = append(out, '"') + i += 2 + continue + case ' ': + out = append(out, ' ') + i += 2 + continue + case '=': + out = append(out, '=') + i += 2 + continue + } + } + out = append(out, in[i]) + i += 1 + } + return out +} + +func unescapeString(in string) string { + for b, esc := range escapeCodesStr { + in = strings.Replace(in, esc, b, -1) + } + return in +} + +// escapeStringField returns a copy of in with any double quotes or +// backslashes with escaped values +func escapeStringField(in string) string { + var out []byte + i := 0 + for { + if i >= len(in) { + break + } + // escape double-quotes + if in[i] == '\\' { + out = append(out, '\\') + out = append(out, '\\') + i += 1 + continue + } + // escape double-quotes + if in[i] == '"' { + out = append(out, '\\') + out = append(out, '"') + i += 1 + continue + } + out = append(out, in[i]) + i += 1 + + } + return string(out) +} + +// unescapeStringField returns a copy of in with any escaped double-quotes +// or backslashes unescaped +func unescapeStringField(in string) string { + var out []byte + i := 0 + for { + if i >= len(in) { + break + } + // unescape backslashes + if in[i] == '\\' && i+1 < len(in) && in[i+1] == '\\' { + out = append(out, '\\') + i += 2 + continue + } + // unescape double-quotes + if in[i] == '\\' && i+1 < len(in) && in[i+1] == '"' { + out = append(out, '"') + i += 2 + continue + } + out = append(out, in[i]) + i += 1 + + } + return string(out) +} + +// NewPoint returns a new point with the given measurement name, tags, fields and timestamp +func NewPoint(name string, tags Tags, fields Fields, time time.Time) Point { + return &point{ + key: MakeKey([]byte(name), tags), + time: time, + fields: fields.MarshalBinary(), + } +} + +func (p *point) Data() []byte { + return p.data +} + +func (p *point) SetData(b []byte) { + p.data = b +} + +func (p *point) Key() []byte { + return p.key +} + +func (p *point) name() []byte { + _, name := scanTo(p.key, 0, ',') + return name +} + +// Name return the measurement name for the point +func (p *point) Name() string { + if p.cachedName != "" { + return p.cachedName + } + p.cachedName = string(unescape(p.name())) + return p.cachedName +} + +// SetName updates the measurement name for the point +func (p *point) SetName(name string) { + p.cachedName = "" + p.key = MakeKey([]byte(name), p.Tags()) +} + +// Time return the timestamp for the point +func (p *point) Time() time.Time { + return p.time +} + +// SetTime updates the timestamp for the point +func (p *point) SetTime(t time.Time) { + p.time = t +} + +// Tags returns the tag set for the point +func (p *point) Tags() Tags { + tags := map[string]string{} + + if len(p.key) != 0 { + pos, name := scanTo(p.key, 0, ',') + + // it's an empyt key, so there are no tags + if len(name) == 0 { + return tags + } + + i := pos + 1 + var key, value []byte + for { + if i >= len(p.key) { + break + } + i, key = scanTo(p.key, i, '=') + i, value = scanTagValue(p.key, i+1) + + tags[string(unescapeTag(key))] = string(unescapeTag(value)) + + i += 1 + } + } + return tags +} + +func MakeKey(name []byte, tags Tags) []byte { + // unescape the name and then re-escape it to avoid double escaping. + // The key should always be stored in escaped form. + return append(escapeMeasurement(unescapeMeasurement(name)), tags.HashKey()...) +} + +// SetTags replaces the tags for the point +func (p *point) SetTags(tags Tags) { + p.key = MakeKey([]byte(p.Name()), tags) +} + +// AddTag adds or replaces a tag value for a point +func (p *point) AddTag(key, value string) { + tags := p.Tags() + tags[key] = value + p.key = MakeKey([]byte(p.Name()), tags) +} + +// Fields returns the fields for the point +func (p *point) Fields() Fields { + if p.cachedFields != nil { + return p.cachedFields + } + p.cachedFields = p.unmarshalBinary() + return p.cachedFields +} + +// AddField adds or replaces a field value for a point +func (p *point) AddField(name string, value interface{}) { + fields := p.Fields() + fields[name] = value + p.fields = fields.MarshalBinary() + p.cachedFields = nil +} + +// SetPrecision will round a time to the specified precision +func (p *point) SetPrecision(precision string) { + switch precision { + case "n": + case "u": + p.SetTime(p.Time().Truncate(time.Microsecond)) + case "ms": + p.SetTime(p.Time().Truncate(time.Millisecond)) + case "s": + p.SetTime(p.Time().Truncate(time.Second)) + case "m": + p.SetTime(p.Time().Truncate(time.Minute)) + case "h": + p.SetTime(p.Time().Truncate(time.Hour)) + } +} + +// GetPrecisionMultiplier will return a multiplier for the precision specified +func (p *point) GetPrecisionMultiplier(precision string) int64 { + d := time.Nanosecond + switch precision { + case "u": + d = time.Microsecond + case "ms": + d = time.Millisecond + case "s": + d = time.Second + case "m": + d = time.Minute + case "h": + d = time.Hour + } + return int64(d) +} + +func (p *point) String() string { + if p.Time().IsZero() { + return fmt.Sprintf("%s %s", p.Key(), string(p.fields)) + } + return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields), p.UnixNano()) +} + +func (p *point) unmarshalBinary() Fields { + return newFieldsFromBinary(p.fields) +} + +func (p *point) HashID() uint64 { + h := fnv.New64a() + h.Write(p.key) + sum := h.Sum64() + return sum +} + +func (p *point) UnixNano() int64 { + return p.Time().UnixNano() +} + +type Tags map[string]string + +func (t Tags) HashKey() []byte { + // Empty maps marshal to empty bytes. + if len(t) == 0 { + return nil + } + + escaped := Tags{} + for k, v := range t { + ek := escapeTag([]byte(k)) + ev := escapeTag([]byte(v)) + escaped[string(ek)] = string(ev) + } + + // Extract keys and determine final size. + sz := len(escaped) + (len(escaped) * 2) // separators + keys := make([]string, len(escaped)+1) + i := 0 + for k, v := range escaped { + keys[i] = k + i += 1 + sz += len(k) + len(v) + } + keys = keys[:i] + sort.Strings(keys) + // Generate marshaled bytes. + b := make([]byte, sz) + buf := b + idx := 0 + for _, k := range keys { + buf[idx] = ',' + idx += 1 + copy(buf[idx:idx+len(k)], k) + idx += len(k) + buf[idx] = '=' + idx += 1 + v := escaped[k] + copy(buf[idx:idx+len(v)], v) + idx += len(v) + } + return b[:idx] +} + +type Fields map[string]interface{} + +func parseNumber(val []byte) (interface{}, error) { + if val[len(val)-1] == 'i' { + val = val[:len(val)-1] + return strconv.ParseInt(string(val), 10, 64) + } + for i := 0; i < len(val); i++ { + // If there is a decimal or an N (NaN), I (Inf), parse as float + if val[i] == '.' || val[i] == 'N' || val[i] == 'n' || val[i] == 'I' || val[i] == 'i' || val[i] == 'e' { + return strconv.ParseFloat(string(val), 64) + } + if val[i] < '0' && val[i] > '9' { + return string(val), nil + } + } + return strconv.ParseFloat(string(val), 64) +} + +func newFieldsFromBinary(buf []byte) Fields { + fields := Fields{} + var ( + i int + name, valueBuf []byte + value interface{} + err error + ) + for { + if i >= len(buf) { + break + } + + i, name = scanTo(buf, i, '=') + if len(name) == 0 { + continue + } + name = unescape(name) + + i, valueBuf = scanFieldValue(buf, i+1) + if len(valueBuf) == 0 { + fields[string(name)] = nil + continue + } + + // If the first char is a double-quote, then unmarshal as string + if valueBuf[0] == '"' { + value = unescapeStringField(string(valueBuf[1 : len(valueBuf)-1])) + // Check for numeric characters and special NaN or Inf + } else if (valueBuf[0] >= '0' && valueBuf[0] <= '9') || valueBuf[0] == '-' || valueBuf[0] == '+' || valueBuf[0] == '.' || + valueBuf[0] == 'N' || valueBuf[0] == 'n' || // NaN + valueBuf[0] == 'I' || valueBuf[0] == 'i' { // Inf + + value, err = parseNumber(valueBuf) + if err != nil { + panic(fmt.Sprintf("unable to parse number value '%v': %v", string(valueBuf), err)) + } + + // Otherwise parse it as bool + } else { + value, err = strconv.ParseBool(string(valueBuf)) + if err != nil { + panic(fmt.Sprintf("unable to parse bool value '%v': %v\n", string(valueBuf), err)) + } + } + fields[string(name)] = value + i += 1 + } + return fields +} + +// MarshalBinary encodes all the fields to their proper type and returns the binary +// represenation +// NOTE: uint64 is specifically not supported due to potential overflow when we decode +// again later to an int64 +func (p Fields) MarshalBinary() []byte { + b := []byte{} + keys := make([]string, len(p)) + i := 0 + for k, _ := range p { + keys[i] = k + i += 1 + } + sort.Strings(keys) + + for _, k := range keys { + v := p[k] + b = append(b, []byte(escapeString(k))...) + b = append(b, '=') + switch t := v.(type) { + case int: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case int8: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case int16: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case int32: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case int64: + b = append(b, []byte(strconv.FormatInt(t, 10))...) + b = append(b, 'i') + case uint: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case uint8: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case uint16: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case uint32: + b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) + b = append(b, 'i') + case float32: + val := []byte(strconv.FormatFloat(float64(t), 'f', -1, 32)) + b = append(b, val...) + case float64: + val := []byte(strconv.FormatFloat(t, 'f', -1, 64)) + b = append(b, val...) + case bool: + b = append(b, []byte(strconv.FormatBool(t))...) + case []byte: + b = append(b, t...) + case string: + b = append(b, '"') + b = append(b, []byte(escapeStringField(t))...) + b = append(b, '"') + case nil: + // skip + default: + // Can't determine the type, so convert to string + b = append(b, '"') + b = append(b, []byte(escapeStringField(fmt.Sprintf("%v", v)))...) + b = append(b, '"') + + } + b = append(b, ',') + } + if len(b) > 0 { + return b[0 : len(b)-1] + } + return b +} + +type indexedSlice struct { + indices []int + b []byte +} + +func (s *indexedSlice) Less(i, j int) bool { + _, a := scanTo(s.b, s.indices[i], '=') + _, b := scanTo(s.b, s.indices[j], '=') + return bytes.Compare(a, b) < 0 +} + +func (s *indexedSlice) Swap(i, j int) { + s.indices[i], s.indices[j] = s.indices[j], s.indices[i] +} + +func (s *indexedSlice) Len() int { + return len(s.indices) +} diff --git a/vendor/vendor.json b/vendor/vendor.json index fab45f487..2f64f88e9 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -32,6 +32,16 @@ "revision": "34f98f7bdf2eec7517e3aac44691566963152721", "revisionTime": "2015-09-08T11:01:49-04:00" }, + { + "path": "github.com/influxdb/influxdb/client", + "revision": "291aaeb9485b43b16875c238482b2f7d0a22a13b", + "revisionTime": "2015-09-16T14:41:53+02:00" + }, + { + "path": "github.com/influxdb/influxdb/tsdb", + "revision": "291aaeb9485b43b16875c238482b2f7d0a22a13b", + "revisionTime": "2015-09-16T14:41:53+02:00" + }, { "path": "github.com/julienschmidt/httprouter", "revision": "109e267447e95ad1bb48b758e40dd7453eb7b039", @@ -163,4 +173,4 @@ "revisionTime": "2015-06-24T11:29:02+01:00" } ] -} \ No newline at end of file +}