Fix various races

This commit is contained in:
Fabian Reinartz 2017-09-08 08:48:19 +02:00
parent c2916736be
commit 1d5f85817d
2 changed files with 31 additions and 18 deletions

35
head.go
View file

@ -402,9 +402,11 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
if s == nil { if s == nil {
return errors.Wrap(ErrNotFound, "unknown series") return errors.Wrap(ErrNotFound, "unknown series")
} }
s.Lock()
if err := s.appendable(t, v); err != nil { if err := s.appendable(t, v); err != nil {
return err return err
} }
s.Unlock()
if t < a.mint { if t < a.mint {
return ErrOutOfBounds return ErrOutOfBounds
@ -435,7 +437,10 @@ func (a *headAppender) Commit() error {
total := len(a.samples) total := len(a.samples)
for _, s := range a.samples { for _, s := range a.samples {
s.series.Lock()
ok, chunkCreated := s.series.append(s.T, s.V) ok, chunkCreated := s.series.append(s.T, s.V)
s.series.Unlock()
if !ok { if !ok {
total-- total--
} }
@ -672,9 +677,9 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
s := h.head.series.getByID(sid) s := h.head.series.getByID(sid)
s.mtx.RLock() s.Lock()
c := s.chunk(int(cid)) c := s.chunk(int(cid))
s.mtx.RUnlock() s.Unlock()
// Do not expose chunks that are outside of the specified range. // Do not expose chunks that are outside of the specified range.
if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) {
@ -694,9 +699,10 @@ type safeChunk struct {
} }
func (c *safeChunk) Iterator() chunks.Iterator { func (c *safeChunk) Iterator() chunks.Iterator {
c.s.mtx.RLock() c.s.Lock()
defer c.s.mtx.RUnlock() it := c.s.iterator(c.cid)
return c.s.iterator(c.cid) c.s.Unlock()
return it
} }
// func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") } // func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") }
@ -803,8 +809,8 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM
} }
*lbls = append((*lbls)[:0], s.lset...) *lbls = append((*lbls)[:0], s.lset...)
s.mtx.RLock() s.Lock()
defer s.mtx.RUnlock() defer s.Unlock()
*chks = (*chks)[:0] *chks = (*chks)[:0]
@ -956,11 +962,11 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
for hash, all := range s.hashes[i] { for hash, all := range s.hashes[i] {
for _, series := range all { for _, series := range all {
series.mtx.Lock() series.Lock()
rmChunks += series.truncateChunksBefore(mint) rmChunks += series.truncateChunksBefore(mint)
if len(series.chunks) > 0 { if len(series.chunks) > 0 {
series.mtx.Unlock() series.Unlock()
continue continue
} }
@ -983,7 +989,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
s.locks[j].Unlock() s.locks[j].Unlock()
} }
series.mtx.Unlock() series.Unlock()
} }
} }
@ -1040,8 +1046,10 @@ type sample struct {
v float64 v float64
} }
// memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and its the callers responsibility to lock it.
type memSeries struct { type memSeries struct {
mtx sync.RWMutex sync.Mutex
ref uint64 ref uint64
lset labels.Labels lset labels.Labels
@ -1143,8 +1151,6 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
const samplesPerChunk = 120 const samplesPerChunk = 120
s.mtx.Lock()
c := s.head() c := s.head()
if c == nil { if c == nil {
@ -1152,7 +1158,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
chunkCreated = true chunkCreated = true
} }
if c.maxTime >= t { if c.maxTime >= t {
s.mtx.Unlock()
return false, chunkCreated return false, chunkCreated
} }
if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt { if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt {
@ -1175,8 +1180,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
s.sampleBuf[2] = s.sampleBuf[3] s.sampleBuf[2] = s.sampleBuf[3]
s.sampleBuf[3] = sample{t: t, v: v} s.sampleBuf[3] = sample{t: t, v: v}
s.mtx.Unlock()
return true, chunkCreated return true, chunkCreated
} }

14
wal.go
View file

@ -398,6 +398,10 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error {
buf := w.getBuffer() buf := w.getBuffer()
flag := w.encodeSeries(buf, series) flag := w.encodeSeries(buf, series)
w.mtx.Lock()
defer w.mtx.Unlock()
err := w.write(WALEntrySeries, flag, buf.get()) err := w.write(WALEntrySeries, flag, buf.get())
w.putBuffer(buf) w.putBuffer(buf)
@ -425,6 +429,10 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error {
buf := w.getBuffer() buf := w.getBuffer()
flag := w.encodeSamples(buf, samples) flag := w.encodeSamples(buf, samples)
w.mtx.Lock()
defer w.mtx.Unlock()
err := w.write(WALEntrySamples, flag, buf.get()) err := w.write(WALEntrySamples, flag, buf.get())
w.putBuffer(buf) w.putBuffer(buf)
@ -451,6 +459,10 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error {
buf := w.getBuffer() buf := w.getBuffer()
flag := w.encodeDeletes(buf, stones) flag := w.encodeDeletes(buf, stones)
w.mtx.Lock()
defer w.mtx.Unlock()
err := w.write(WALEntryDeletes, flag, buf.get()) err := w.write(WALEntryDeletes, flag, buf.get())
w.putBuffer(buf) w.putBuffer(buf)
@ -661,8 +673,6 @@ const (
) )
func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error { func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
w.mtx.Lock()
defer w.mtx.Unlock()
// Cut to the next segment if the entry exceeds the file size unless it would also // Cut to the next segment if the entry exceeds the file size unless it would also
// exceed the size of a new segment. // exceed the size of a new segment.
// TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize. // TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize.