mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-07 11:57:27 -08:00
290 lines
7.6 KiB
Go
290 lines
7.6 KiB
Go
|
// Copyright (c) 2016, Suryandaru Triandana <syndtr@gmail.com>
|
||
|
// All rights reserved.
|
||
|
//
|
||
|
// Use of this source code is governed by a BSD-style license that can be
|
||
|
// found in the LICENSE file.
|
||
|
|
||
|
package leveldb
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/syndtr/goleveldb/leveldb/iterator"
|
||
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
||
|
"github.com/syndtr/goleveldb/leveldb/util"
|
||
|
)
|
||
|
|
||
|
var errTransactionDone = errors.New("leveldb: transaction already closed")
|
||
|
|
||
|
// Transaction is the transaction handle.
|
||
|
type Transaction struct {
|
||
|
db *DB
|
||
|
lk sync.RWMutex
|
||
|
seq uint64
|
||
|
mem *memDB
|
||
|
tables tFiles
|
||
|
ikScratch []byte
|
||
|
rec sessionRecord
|
||
|
stats cStatStaging
|
||
|
closed bool
|
||
|
}
|
||
|
|
||
|
// Get gets the value for the given key. It returns ErrNotFound if the
|
||
|
// DB does not contains the key.
|
||
|
//
|
||
|
// The returned slice is its own copy, it is safe to modify the contents
|
||
|
// of the returned slice.
|
||
|
// It is safe to modify the contents of the argument after Get returns.
|
||
|
func (tr *Transaction) Get(key []byte, ro *opt.ReadOptions) ([]byte, error) {
|
||
|
tr.lk.RLock()
|
||
|
defer tr.lk.RUnlock()
|
||
|
if tr.closed {
|
||
|
return nil, errTransactionDone
|
||
|
}
|
||
|
return tr.db.get(tr.mem.DB, tr.tables, key, tr.seq, ro)
|
||
|
}
|
||
|
|
||
|
// Has returns true if the DB does contains the given key.
|
||
|
//
|
||
|
// It is safe to modify the contents of the argument after Has returns.
|
||
|
func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
|
||
|
tr.lk.RLock()
|
||
|
defer tr.lk.RUnlock()
|
||
|
if tr.closed {
|
||
|
return false, errTransactionDone
|
||
|
}
|
||
|
return tr.db.has(tr.mem.DB, tr.tables, key, tr.seq, ro)
|
||
|
}
|
||
|
|
||
|
// NewIterator returns an iterator for the latest snapshot of the transaction.
|
||
|
// The returned iterator is not goroutine-safe, but it is safe to use multiple
|
||
|
// iterators concurrently, with each in a dedicated goroutine.
|
||
|
// It is also safe to use an iterator concurrently while writes to the
|
||
|
// transaction. The resultant key/value pairs are guaranteed to be consistent.
|
||
|
//
|
||
|
// Slice allows slicing the iterator to only contains keys in the given
|
||
|
// range. A nil Range.Start is treated as a key before all keys in the
|
||
|
// DB. And a nil Range.Limit is treated as a key after all keys in
|
||
|
// the DB.
|
||
|
//
|
||
|
// The iterator must be released after use, by calling Release method.
|
||
|
//
|
||
|
// Also read Iterator documentation of the leveldb/iterator package.
|
||
|
func (tr *Transaction) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
|
||
|
tr.lk.RLock()
|
||
|
defer tr.lk.RUnlock()
|
||
|
if tr.closed {
|
||
|
return iterator.NewEmptyIterator(errTransactionDone)
|
||
|
}
|
||
|
tr.mem.incref()
|
||
|
return tr.db.newIterator(tr.mem, tr.tables, tr.seq, slice, ro)
|
||
|
}
|
||
|
|
||
|
func (tr *Transaction) flush() error {
|
||
|
// Flush memdb.
|
||
|
if tr.mem.Len() != 0 {
|
||
|
tr.stats.startTimer()
|
||
|
iter := tr.mem.NewIterator(nil)
|
||
|
t, n, err := tr.db.s.tops.createFrom(iter)
|
||
|
iter.Release()
|
||
|
tr.stats.stopTimer()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if tr.mem.getref() == 1 {
|
||
|
tr.mem.Reset()
|
||
|
} else {
|
||
|
tr.mem.decref()
|
||
|
tr.mem = tr.db.mpoolGet(0)
|
||
|
tr.mem.incref()
|
||
|
}
|
||
|
tr.tables = append(tr.tables, t)
|
||
|
tr.rec.addTableFile(0, t)
|
||
|
tr.stats.write += t.size
|
||
|
tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (tr *Transaction) put(kt keyType, key, value []byte) error {
|
||
|
tr.ikScratch = makeInternalKey(tr.ikScratch, key, tr.seq+1, kt)
|
||
|
if tr.mem.Free() < len(tr.ikScratch)+len(value) {
|
||
|
if err := tr.flush(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
if err := tr.mem.Put(tr.ikScratch, value); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
tr.seq++
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Put sets the value for the given key. It overwrites any previous value
|
||
|
// for that key; a DB is not a multi-map.
|
||
|
// Please note that the transaction is not compacted until committed, so if you
|
||
|
// writes 10 same keys, then those 10 same keys are in the transaction.
|
||
|
//
|
||
|
// It is safe to modify the contents of the arguments after Put returns.
|
||
|
func (tr *Transaction) Put(key, value []byte, wo *opt.WriteOptions) error {
|
||
|
tr.lk.Lock()
|
||
|
defer tr.lk.Unlock()
|
||
|
if tr.closed {
|
||
|
return errTransactionDone
|
||
|
}
|
||
|
return tr.put(keyTypeVal, key, value)
|
||
|
}
|
||
|
|
||
|
// Delete deletes the value for the given key.
|
||
|
// Please note that the transaction is not compacted until committed, so if you
|
||
|
// writes 10 same keys, then those 10 same keys are in the transaction.
|
||
|
//
|
||
|
// It is safe to modify the contents of the arguments after Delete returns.
|
||
|
func (tr *Transaction) Delete(key []byte, wo *opt.WriteOptions) error {
|
||
|
tr.lk.Lock()
|
||
|
defer tr.lk.Unlock()
|
||
|
if tr.closed {
|
||
|
return errTransactionDone
|
||
|
}
|
||
|
return tr.put(keyTypeDel, key, nil)
|
||
|
}
|
||
|
|
||
|
// Write apply the given batch to the transaction. The batch will be applied
|
||
|
// sequentially.
|
||
|
// Please note that the transaction is not compacted until committed, so if you
|
||
|
// writes 10 same keys, then those 10 same keys are in the transaction.
|
||
|
//
|
||
|
// It is safe to modify the contents of the arguments after Write returns.
|
||
|
func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
|
||
|
if b == nil || b.Len() == 0 {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
tr.lk.Lock()
|
||
|
defer tr.lk.Unlock()
|
||
|
if tr.closed {
|
||
|
return errTransactionDone
|
||
|
}
|
||
|
return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
|
||
|
return tr.put(kt, key, value)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func (tr *Transaction) setDone() {
|
||
|
tr.closed = true
|
||
|
tr.db.tr = nil
|
||
|
tr.mem.decref()
|
||
|
<-tr.db.writeLockC
|
||
|
}
|
||
|
|
||
|
// Commit commits the transaction.
|
||
|
//
|
||
|
// Other methods should not be called after transaction has been committed.
|
||
|
func (tr *Transaction) Commit() error {
|
||
|
if err := tr.db.ok(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
tr.lk.Lock()
|
||
|
defer tr.lk.Unlock()
|
||
|
if tr.closed {
|
||
|
return errTransactionDone
|
||
|
}
|
||
|
defer tr.setDone()
|
||
|
if err := tr.flush(); err != nil {
|
||
|
tr.discard()
|
||
|
return err
|
||
|
}
|
||
|
if len(tr.tables) != 0 {
|
||
|
// Committing transaction.
|
||
|
tr.rec.setSeqNum(tr.seq)
|
||
|
tr.db.compCommitLk.Lock()
|
||
|
defer tr.db.compCommitLk.Unlock()
|
||
|
for retry := 0; retry < 3; retry++ {
|
||
|
if err := tr.db.s.commit(&tr.rec); err != nil {
|
||
|
tr.db.logf("transaction@commit error R·%d %q", retry, err)
|
||
|
select {
|
||
|
case <-time.After(time.Second):
|
||
|
case _, _ = <-tr.db.closeC:
|
||
|
tr.db.logf("transaction@commit exiting")
|
||
|
return err
|
||
|
}
|
||
|
} else {
|
||
|
// Success. Set db.seq.
|
||
|
tr.db.setSeq(tr.seq)
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
// Trigger table auto-compaction.
|
||
|
tr.db.compTrigger(tr.db.tcompCmdC)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (tr *Transaction) discard() {
|
||
|
// Discard transaction.
|
||
|
for _, t := range tr.tables {
|
||
|
tr.db.logf("transaction@discard @%d", t.fd.Num)
|
||
|
if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil {
|
||
|
tr.db.s.reuseFileNum(t.fd.Num)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Discard discards the transaction.
|
||
|
//
|
||
|
// Other methods should not be called after transaction has been discarded.
|
||
|
func (tr *Transaction) Discard() {
|
||
|
tr.lk.Lock()
|
||
|
if !tr.closed {
|
||
|
tr.discard()
|
||
|
tr.setDone()
|
||
|
}
|
||
|
tr.lk.Unlock()
|
||
|
}
|
||
|
|
||
|
// OpenTransaction opens an atomic DB transaction. Only one transaction can be
|
||
|
// opened at a time. Write will be blocked until the transaction is committed or
|
||
|
// discarded.
|
||
|
// The returned transaction handle is goroutine-safe.
|
||
|
//
|
||
|
// The transaction must be closed once done, either by committing or discarding
|
||
|
// the transaction.
|
||
|
// Closing the DB will discard open transaction.
|
||
|
func (db *DB) OpenTransaction() (*Transaction, error) {
|
||
|
if err := db.ok(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
// The write happen synchronously.
|
||
|
select {
|
||
|
case db.writeLockC <- struct{}{}:
|
||
|
case err := <-db.compPerErrC:
|
||
|
return nil, err
|
||
|
case _, _ = <-db.closeC:
|
||
|
return nil, ErrClosed
|
||
|
}
|
||
|
|
||
|
if db.tr != nil {
|
||
|
panic("leveldb: has open transaction")
|
||
|
}
|
||
|
|
||
|
// Flush current memdb.
|
||
|
if db.mem != nil && db.mem.Len() != 0 {
|
||
|
if _, err := db.rotateMem(0, true); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
tr := &Transaction{
|
||
|
db: db,
|
||
|
seq: db.seq,
|
||
|
mem: db.mpoolGet(0),
|
||
|
}
|
||
|
tr.mem.incref()
|
||
|
db.tr = tr
|
||
|
return tr, nil
|
||
|
}
|