latest tweaks

Signed-off-by: Alex Greenbank <alex.greenbank@grafana.com>
This commit is contained in:
Alex Greenbank 2024-04-04 13:27:28 +00:00
parent 377fc5a625
commit ae9d2a772a
3 changed files with 17 additions and 20 deletions

View file

@ -78,55 +78,55 @@ func TestContentNegotiation(t *testing.T) {
rwFormat config.RemoteWriteFormat rwFormat config.RemoteWriteFormat
steps []contentNegotiationStep steps []contentNegotiationStep
}{ }{
// Test a simple case where the v2 request we send is processed first time // Test a simple case where the v2 request we send is processed first time.
{ {
success: true, name: "v2 happy path", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: true, name: "v2 happy path", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,1,snappy,ok"}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,1,snappy,ok"},
}, },
}, },
// Test a simple case where the v1 request we send is processed first time // Test a simple case where the v1 request we send is processed first time.
{ {
success: true, name: "v1 happy path", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ success: true, name: "v1 happy path", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
}, },
}, },
// Test a case where the v1 request has a temporary delay but goes through on retry // Test a case where the v1 request has a temporary delay but goes through on retry.
// There is no content re-negotiation between first and retry attempts // There is no content re-negotiation between first and retry attempts.
{ {
success: true, name: "v1 happy path with one 5xx retry", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ success: true, name: "v1 happy path with one 5xx retry", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,0,snappy,Pretend 500"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,0,snappy,Pretend 500"},
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,0,snappy,ok"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,0,snappy,ok"},
}, },
}, },
// Repeat the above test but with v2. The request has a temporary delay but goes through on retry // Repeat the above test but with v2. The request has a temporary delay but goes through on retry.
// There is no content re-negotiation between first and retry attempts // There is no content re-negotiation between first and retry attempts.
{ {
success: true, name: "v2 happy path with one 5xx retry", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: true, name: "v2 happy path with one 5xx retry", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,1,snappy,Pretend 500"}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,1,snappy,Pretend 500"},
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,1,snappy,ok"}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,1,snappy,ok"},
}, },
}, },
// Now test where the server suddenly stops speaking 2.0 and we need to downgrade // Now test where the server suddenly stops speaking 2.0 and we need to downgrade.
{ {
success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"},
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
}, },
}, },
// Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400 // Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400.
{ {
success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusBadRequest, attemptString: "0,1,snappy,HTTP StatusBadRequest"}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusBadRequest, attemptString: "0,1,snappy,HTTP StatusBadRequest"},
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
}, },
}, },
// Now test where the server flip flops between "2.0;snappy" and "0.1.0" only // Now test where the server flip flops between "2.0;snappy" and "0.1.0" only.
{ {
success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"},
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,0,snappy,HTTP StatusNotAcceptable"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,0,snappy,HTTP StatusNotAcceptable"},
{lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"},
// There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries) // There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries).
}, },
}, },
} }
@ -138,8 +138,6 @@ func TestContentNegotiation(t *testing.T) {
// We need to set URL's so that metric creation doesn't panic. // We need to set URL's so that metric creation doesn't panic.
writeConfig := baseRemoteWriteConfig("http://test-storage.com") writeConfig := baseRemoteWriteConfig("http://test-storage.com")
writeConfig.QueueConfig = queueConfig writeConfig.QueueConfig = queueConfig
writeConfig.SendExemplars = true // ALEXG - need?
writeConfig.SendNativeHistograms = true // ALEXG - need?
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
@ -175,7 +173,7 @@ func TestContentNegotiation(t *testing.T) {
qm := s.rws.queues[hash] qm := s.rws.queues[hash]
c := NewTestWriteClient(tc.rwFormat) c := NewTestWriteClient(tc.rwFormat)
c.setSteps(tc.steps) // set expected behaviour c.setSteps(tc.steps) // set expected behaviour.
qm.SetClient(c) qm.SetClient(c)
qm.StoreSeries(series, 0) qm.StoreSeries(series, 0)
@ -188,12 +186,12 @@ func TestContentNegotiation(t *testing.T) {
qm.Append(samples) qm.Append(samples)
if !tc.success { if !tc.success {
// We just need to sleep for a bit to give it time to run // We just need to sleep for a bit to give it time to run.
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
// But we still need to check for data with no delay to avoid race // But we still need to check for data with no delay to avoid race.
c.waitForExpectedData(t, 0*time.Second) c.waitForExpectedData(t, 0*time.Second)
} else { } else {
// We expected data so wait for it // We expected data so wait for it.
c.waitForExpectedData(t, 5*time.Second) c.waitForExpectedData(t, 5*time.Second)
} }
@ -1253,11 +1251,11 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, r
attemptString := fmt.Sprintf("%d,%d,%s", attemptNos, rwFormat, compression) attemptString := fmt.Sprintf("%d,%d,%s", attemptNos, rwFormat, compression)
if attemptNos > 0 { if attemptNos > 0 {
// If this is a second attempt then we need to bump to the next step otherwise we loop // If this is a second attempt then we need to bump to the next step otherwise we loop.
c.currstep++ c.currstep++
} }
// Check if we've been told to return something for this config // Check if we've been told to return something for this config.
if len(c.steps) > 0 { if len(c.steps) > 0 {
if err = c.steps[c.currstep].behaviour; err != nil { if err = c.steps[c.currstep].behaviour; err != nil {
c.sendAttempts = append(c.sendAttempts, attemptString+","+fmt.Sprintf("%s", err)) c.sendAttempts = append(c.sendAttempts, attemptString+","+fmt.Sprintf("%s", err))

View file

@ -152,7 +152,6 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// We know this header, woo. // We know this header, woo.
default: default:
// We have a version in the header but it is not one we recognise. // We have a version in the header but it is not one we recognise.
// TODO(alexg) - make a proper error for this?
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unknown remote write version in headers", "ver", protoVer) level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unknown remote write version in headers", "ver", protoVer)
// Return a 406 so that the client can choose a more appropriate protocol to use. // Return a 406 so that the client can choose a more appropriate protocol to use.
http.Error(w, "Unknown remote write version in headers", http.StatusNotAcceptable) http.Error(w, "Unknown remote write version in headers", http.StatusNotAcceptable)

View file

@ -196,7 +196,7 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
j := 0 j := 0
k := 0 k := 0
// the reduced write request is equivalent to the write request fixture. // the reduced write request is equivalent to the write request fixture.
// we can use it for. // we can use it for
for _, ts := range writeRequestMinimizedFixture.Timeseries { for _, ts := range writeRequestMinimizedFixture.Timeseries {
ls := labelProtosV2ToLabels(ts.LabelsRefs, writeRequestMinimizedFixture.Symbols) ls := labelProtosV2ToLabels(ts.LabelsRefs, writeRequestMinimizedFixture.Symbols)
for _, s := range ts.Samples { for _, s := range ts.Samples {