Commit c4452efa authored by Hans Moog

Feat: objectStorage uses batch writing

parent 97c6d957
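The diff below replaces the direct badger writes in objectstorage with a background batch writer: released CachedObjects are queued and flushed in badger WriteBatches, and callers mark objects with Store()/Delete() instead of Persist(). The following sketch shows the resulting write path from a caller's point of view; it is not part of the commit, and it assumes it sits next to the existing objectstorage test file so it can reuse the NewTestObject and testObjectFactory helpers defined there.

package objectstorage_test

import (
	"testing"

	"github.com/iotaledger/goshimmer/packages/objectstorage"
)

func TestBatchedWritePath(t *testing.T) {
	// Create a storage instance and wipe any leftovers from earlier runs.
	objects := objectstorage.New("ExampleStorage", testObjectFactory)
	if err := objects.Prune(); err != nil {
		t.Error(err)
	}

	// Store only caches the object and sets its store flag; Release hands it
	// to the batch writer once the last consumer is done (the writer is
	// started lazily by the first queued object).
	objects.Store(NewTestObject("Hans", 1)).Release()

	// Stop the batch writer so the pending WriteBatch is flushed before the
	// test exits, as the updated BenchmarkStore in this commit does.
	objectstorage.StopBatchWriter()
}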
@@ -103,7 +103,9 @@ func (ledgerState *LedgerState) BookTransfer(transfer *Transfer) {
targetReality := ledgerState.getTargetReality(inputs)
targetReality.Get().(*Reality).bookTransfer(transfer.GetHash(), inputs, transfer.GetOutputs())
targetReality.Persist()
if !targetReality.IsStored() {
targetReality.Store()
}
targetReality.Release()
}
...
@@ -20,6 +20,28 @@ var (
conflictingReality = NewRealityId("CONFLICTING")
)
func Benchmark(b *testing.B) {
ledgerState := NewLedgerState("testLedger").Prune().AddTransferOutput(
transferHash1, addressHash1, NewColoredBalance(eth, 1337), NewColoredBalance(iota, 1338),
)
b.ResetTimer()
for i := 0; i < b.N; i++ {
ledgerState.BookTransfer(NewTransfer(transferHash2).AddInput(
NewTransferOutputReference(transferHash1, addressHash1),
).AddOutput(
addressHash3, NewColoredBalance(iota, 338),
).AddOutput(
addressHash3, NewColoredBalance(eth, 337),
).AddOutput(
addressHash4, NewColoredBalance(iota, 1000),
).AddOutput(
addressHash4, NewColoredBalance(eth, 1000),
))
}
}
func Test(t *testing.T) {
ledgerState := NewLedgerState("testLedger").Prune().AddTransferOutput(
transferHash1, addressHash1, NewColoredBalance(eth, 1337), NewColoredBalance(iota, 1338),
...
@@ -2,7 +2,6 @@ package ledgerstate
import (
"encoding/binary"
"fmt"
"github.com/iotaledger/goshimmer/packages/errors"
@@ -89,7 +88,12 @@ func (reality *Reality) checkTransferBalances(inputs []*objectstorage.CachedObje
return errors.New("missing input in transfer")
}
for _, balance := range cachedInput.Get().(*TransferOutput).GetBalances() {
transferOutput := cachedInput.Get().(*TransferOutput)
if !reality.DescendsFromReality(transferOutput.GetRealityId()) {
return errors.New("the referenced funds do not exist in this reality")
}
for _, balance := range transferOutput.GetBalances() {
totalColoredBalances[balance.GetColor()] += balance.GetValue()
}
}
@@ -119,23 +123,20 @@ func (reality *Reality) BookTransfer(transfer *Transfer) error {
}
func (reality *Reality) bookTransfer(transferHash TransferHash, inputs []*objectstorage.CachedObject, outputs map[AddressHash][]*ColoredBalance) error {
// check if transfer is valid within this reality
if err := reality.checkTransferBalances(inputs, outputs); err != nil {
return err
}
// 1. determine target reality
// 2. check if transfer is valid within target reality
// 3. book new transfer outputs into target reality
// 4. mark inputs as spent / trigger double spend detection
// 5. release objects
// book new transfer outputs into target reality
reality.bookTransferOutputs(transferHash, outputs)
// register consumer
// mark inputs as spent / trigger double spend detection
for _, x := range inputs {
x.Get().(*TransferOutput).addConsumer(transferHash)
}
reality.bookTransferOutputs(transferHash, outputs)
fmt.Println("BOOK TO: ", reality)
// 5. release objects
return nil
}
...
package objectstorage
import (
"sync"
"sync/atomic"
"time"
"github.com/dgraph-io/badger"
"github.com/iotaledger/goshimmer/packages/daemon"
)
var writeWg sync.WaitGroup
var startStopMutex sync.Mutex
var running int32 = 0
var batchQueue = make(chan *CachedObject, BATCH_WRITER_QUEUE_SIZE)
func StartBatchWriter() {
startStopMutex.Lock()
if atomic.LoadInt32(&running) == 0 {
atomic.StoreInt32(&running, 1)
go runBatchWriter()
}
startStopMutex.Unlock()
}
func StopBatchWriter() {
startStopMutex.Lock()
if atomic.LoadInt32(&running) != 0 {
atomic.StoreInt32(&running, 0)
writeWg.Wait()
}
startStopMutex.Unlock()
}
func batchWrite(object *CachedObject) {
if atomic.LoadInt32(&running) == 0 {
StartBatchWriter()
}
batchQueue <- object
}
func writeObject(writeBatch *badger.WriteBatch, cachedObject *CachedObject) {
objectStorage := cachedObject.objectStorage
if consumers := atomic.LoadInt32(&(cachedObject.consumers)); consumers == 0 && atomic.AddInt32(&(cachedObject.stored), 1) == 1 {
if cachedObject.value != nil {
if cachedObject.IsDeleted() {
if err := writeBatch.Delete(objectStorage.generatePrefix([][]byte{cachedObject.value.GetStorageKey()})); err != nil {
panic(err)
}
} else if atomic.LoadInt32(&(cachedObject.store)) == 1 {
marshaledObject, _ := cachedObject.value.MarshalBinary()
if err := writeBatch.Set(objectStorage.generatePrefix([][]byte{cachedObject.value.GetStorageKey()}), marshaledObject); err != nil {
panic(err)
}
}
}
} else if consumers < 0 {
panic("too many unregistered consumers of cached object")
}
}
func releaseObject(cachedObject *CachedObject) {
objectStorage := cachedObject.objectStorage
objectStorage.cacheMutex.Lock()
if consumers := atomic.LoadInt32(&(cachedObject.consumers)); consumers == 0 {
delete(objectStorage.cachedObjects, string(cachedObject.value.GetStorageKey()))
}
objectStorage.cacheMutex.Unlock()
}
func runBatchWriter() {
badgerInstance := GetBadgerInstance()
for atomic.LoadInt32(&running) == 1 {
writeWg.Add(1)
wb := badgerInstance.NewWriteBatch()
writtenValues := make([]*CachedObject, BATCH_WRITER_BATCH_SIZE)
writtenValuesCounter := 0
COLLECT_VALUES:
for writtenValuesCounter < BATCH_WRITER_BATCH_SIZE {
select {
case objectToPersist := <-batchQueue:
writeObject(wb, objectToPersist)
writtenValues[writtenValuesCounter] = objectToPersist
writtenValuesCounter++
case <-time.After(BATCH_WRITER_BATCH_TIMEOUT):
break COLLECT_VALUES
case <-daemon.ShutdownSignal:
break COLLECT_VALUES
}
}
if err := wb.Flush(); err != nil {
panic(err)
}
for _, cachedObject := range writtenValues {
if cachedObject != nil {
releaseObject(cachedObject)
}
}
writeWg.Done()
}
}
const (
BATCH_WRITER_QUEUE_SIZE = BATCH_WRITER_BATCH_SIZE
BATCH_WRITER_BATCH_SIZE = 1024
BATCH_WRITER_BATCH_TIMEOUT = 500 * time.Millisecond
)
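For clarity, here is the collect-then-flush pattern that runBatchWriter implements above, isolated into a self-contained sketch with the badger- and CachedObject-specific parts replaced by plain strings and a flush callback (all names in it are illustrative, not part of the commit): items are taken from a channel until the batch is full or a timeout fires, and whatever was collected is then written out in one go.

package main

import (
	"fmt"
	"time"
)

const (
	batchSize    = 4
	batchTimeout = 100 * time.Millisecond
)

// runBatcher drains queue into fixed-size batches, flushing early on timeout.
// Like the batch writer above, the timeout restarts after every received item.
func runBatcher(queue <-chan string, flush func([]string)) {
	for {
		batch := make([]string, 0, batchSize)
		closed := false

	collect:
		for len(batch) < batchSize {
			select {
			case item, ok := <-queue:
				if !ok {
					closed = true
					break collect
				}
				batch = append(batch, item)
			case <-time.After(batchTimeout):
				break collect // flush a partial batch instead of waiting forever
			}
		}

		if len(batch) > 0 {
			flush(batch)
		}
		if closed {
			return
		}
	}
}

func main() {
	queue := make(chan string, batchSize)
	done := make(chan struct{})

	go func() {
		runBatcher(queue, func(batch []string) {
			fmt.Println("flushing", len(batch), "items")
		})
		close(done)
	}()

	for i := 0; i < 10; i++ {
		queue <- fmt.Sprintf("object-%d", i)
	}
	close(queue)
	<-done
}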
@@ -12,9 +12,9 @@ type CachedObject struct {
err error
consumers int32
published int32
persist int32
persisted int32
deleted int32
store int32
stored int32
delete int32
wg sync.WaitGroup
valueMutex sync.RWMutex
}
@@ -29,8 +29,9 @@ func newCachedObject(database *ObjectStorage) (result *CachedObject) {
return
}
// Retrieves the StorableObject that is cached in this container.
func (cachedObject *CachedObject) Get() (result StorableObject) {
if !cachedObject.isDeleted() {
if !cachedObject.IsDeleted() {
cachedObject.valueMutex.RLock()
result = cachedObject.value
cachedObject.valueMutex.RUnlock()
@@ -39,18 +40,24 @@ func (cachedObject *CachedObject) Get() (result StorableObject) {
return
}
// Releases the object, to be picked up by the persistence layer (as soon as all consumers are done).
func (cachedObject *CachedObject) Release() {
if consumers := atomic.AddInt32(&(cachedObject.consumers), -1); consumers == 0 {
if cachedObject.objectStorage.options.cacheTime != 0 {
time.AfterFunc(cachedObject.objectStorage.options.cacheTime, cachedObject.release)
time.AfterFunc(cachedObject.objectStorage.options.cacheTime, func() {
batchWrite(cachedObject)
})
} else {
cachedObject.release()
batchWrite(cachedObject)
}
} else if consumers > 0 {
panic("called Release() too often")
}
}
// Directly consumes the StorableObject. This method automatically Release()s the object when the callback is done.
func (cachedObject *CachedObject) Consume(consumer func(object StorableObject)) {
if cachedObject.isDeleted() {
if cachedObject.IsDeleted() {
consumer(nil)
} else if cachedObject.Exists() {
consumer(cachedObject.Get())
@@ -59,56 +66,38 @@ func (cachedObject *CachedObject) Consume(consumer func(object StorableObject))
cachedObject.Release()
}
// Marks an object for deletion in the persistence layer.
func (cachedObject *CachedObject) Delete() *CachedObject {
cachedObject.setDeleted(true)
atomic.StoreInt32(&(cachedObject.store), 0)
atomic.StoreInt32(&(cachedObject.stored), 0)
atomic.StoreInt32(&(cachedObject.delete), 1)
return cachedObject
}
func (cachedObject *CachedObject) Persist() {
atomic.StoreInt32(&(cachedObject.persist), 1)
// Returns true if this object is supposed to be deleted from the persistence layer (Delete() was called).
func (cachedObject *CachedObject) IsDeleted() bool {
return atomic.LoadInt32(&(cachedObject.delete)) == 1
}
func (cachedObject *CachedObject) RegisterConsumer() {
atomic.AddInt32(&(cachedObject.consumers), 1)
}
func (cachedObject *CachedObject) Exists() bool {
return cachedObject.Get() != nil
}
func (cachedObject *CachedObject) setDeleted(deleted bool) {
if deleted {
atomic.StoreInt32(&(cachedObject.deleted), 1)
} else {
atomic.StoreInt32(&(cachedObject.deleted), 0)
}
// Marks an object for being stored in the persistence layer.
func (cachedObject *CachedObject) Store() {
atomic.StoreInt32(&(cachedObject.delete), 0)
atomic.StoreInt32(&(cachedObject.store), 1)
}
func (cachedObject *CachedObject) isDeleted() bool {
return atomic.LoadInt32(&(cachedObject.deleted)) == 1
// Returns true if the object is either persisted already or is supposed to be persisted (Store() was called).
func (cachedObject *CachedObject) IsStored() bool {
return atomic.LoadInt32(&(cachedObject.stored)) == 1 || atomic.LoadInt32(&(cachedObject.store)) == 1
}
func (cachedObject *CachedObject) release() {
cachedObject.objectStorage.cacheMutex.Lock()
if consumers := atomic.LoadInt32(&(cachedObject.consumers)); consumers == 0 && atomic.AddInt32(&(cachedObject.persisted), 1) == 1 {
if cachedObject.value != nil {
if cachedObject.isDeleted() {
if err := cachedObject.objectStorage.deleteObjectFromBadger(cachedObject.value.GetStorageKey()); err != nil {
panic(err)
}
} else if atomic.LoadInt32(&(cachedObject.persist)) == 1 {
if err := cachedObject.objectStorage.persistObjectToBadger(cachedObject.value.GetStorageKey(), cachedObject.value); err != nil {
panic(err)
}
}
delete(cachedObject.objectStorage.cachedObjects, string(cachedObject.value.GetStorageKey()))
}
} else if consumers < 0 {
panic("too many unregistered consumers of cached object")
}
cachedObject.objectStorage.cacheMutex.Unlock()
}
// Registers a new consumer for this cached object.
func (cachedObject *CachedObject) RegisterConsumer() {
atomic.AddInt32(&(cachedObject.consumers), 1)
}
func (cachedObject *CachedObject) Exists() bool {
return cachedObject.Get() != nil
}
func (cachedObject *CachedObject) updateValue(value StorableObject) {
...
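The CachedObject changes above replace the old persist/persisted/deleted flags with store/stored/delete. The small self-contained sketch below (illustrative only, not the library code) mirrors that flag handshake: store and delete record the caller's intent, stored records that the object already exists in badger, and IsStored() checks both stored and store so that objects loaded from disk and objects still waiting in the batch queue are treated the same, which is what the new `if !targetReality.IsStored()` check in the ledgerstate relies on.

package main

import (
	"fmt"
	"sync/atomic"
)

type flags struct {
	store  int32 // caller asked for the object to be persisted
	stored int32 // object is already present in the persistence layer
	delete int32 // caller asked for the object to be removed
}

func (f *flags) Store() {
	atomic.StoreInt32(&f.delete, 0)
	atomic.StoreInt32(&f.store, 1)
}

func (f *flags) Delete() {
	atomic.StoreInt32(&f.store, 0)
	atomic.StoreInt32(&f.stored, 0)
	atomic.StoreInt32(&f.delete, 1)
}

func (f *flags) IsStored() bool {
	return atomic.LoadInt32(&f.stored) == 1 || atomic.LoadInt32(&f.store) == 1
}

func (f *flags) IsDeleted() bool {
	return atomic.LoadInt32(&f.delete) == 1
}

func main() {
	f := &flags{}

	f.Store()
	fmt.Println(f.IsStored(), f.IsDeleted()) // true false: queued for the batch writer

	f.Delete()
	fmt.Println(f.IsStored(), f.IsDeleted()) // false true: queued for deletion instead

	atomic.StoreInt32(&f.stored, 1)          // what Load() does when the object was found in badger
	fmt.Println(f.IsStored())                // true: no further Store() call is needed
}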
@@ -26,38 +26,21 @@ func New(storageId string, objectFactory StorableObjectFactory, optionalOptions
}
func (objectStorage *ObjectStorage) Prepare(object StorableObject) *CachedObject {
return objectStorage.accessCache(object.GetStorageKey(), func(cachedObject *CachedObject) {
if !cachedObject.publishResult(object, nil) {
if currentValue := cachedObject.Get(); currentValue != nil {
currentValue.Update(object)
} else {
cachedObject.updateValue(object)
}
}
}, func(cachedObject *CachedObject) {
cachedObject.persist = 0
cachedObject.publishResult(object, nil)
})
return objectStorage.storeObjectInCache(object, false)
}
func (objectStorage *ObjectStorage) Store(object StorableObject) *CachedObject {
return objectStorage.accessCache(object.GetStorageKey(), func(cachedObject *CachedObject) {
if !cachedObject.publishResult(object, nil) {
if currentValue := cachedObject.Get(); currentValue != nil {
currentValue.Update(object)
} else {
cachedObject.updateValue(object)
}
}
}, func(cachedObject *CachedObject) {
cachedObject.persist = 1
cachedObject.publishResult(object, nil)
})
return objectStorage.storeObjectInCache(object, true)
}
func (objectStorage *ObjectStorage) Load(key []byte) (*CachedObject, error) {
return objectStorage.accessCache(key, nil, func(cachedObject *CachedObject) {
cachedObject.publishResult(objectStorage.loadObjectFromBadger(key))
loadedObject, err := objectStorage.loadObjectFromBadger(key)
if loadedObject != nil {
cachedObject.stored = 1
}
cachedObject.publishResult(loadedObject, err)
}).waitForResult()
}
@@ -158,21 +141,21 @@ func (objectStorage *ObjectStorage) accessCache(key []byte, onCacheHit func(*Cac
return cachedObject
}
func (objectStorage *ObjectStorage) persistObjectToBadger(key []byte, value StorableObject) error {
if value != nil {
return objectStorage.badgerInstance.Update(func(txn *badger.Txn) error {
marshaledObject, _ := value.MarshalBinary()
return txn.Set(objectStorage.generatePrefix([][]byte{key}), marshaledObject)
})
}
return nil
}
func (objectStorage *ObjectStorage) deleteObjectFromBadger(key []byte) error {
return objectStorage.badgerInstance.Update(func(txn *badger.Txn) error {
return txn.Delete(objectStorage.generatePrefix([][]byte{key}))
})
}
func (objectStorage *ObjectStorage) storeObjectInCache(object StorableObject, persist bool) *CachedObject {
return objectStorage.accessCache(object.GetStorageKey(), func(cachedObject *CachedObject) {
if !cachedObject.publishResult(object, nil) {
if currentValue := cachedObject.Get(); currentValue != nil {
currentValue.Update(object)
} else {
cachedObject.updateValue(object)
}
}
}, func(cachedObject *CachedObject) {
if persist {
cachedObject.store = 1
}
cachedObject.publishResult(object, nil)
})
}
...
@@ -15,12 +15,17 @@ func testObjectFactory(key []byte) objectstorage.StorableObject { return &TestOb
func BenchmarkStore(b *testing.B) {
// create our storage
objects := objectstorage.New("TestObjectStorage", testObjectFactory)
if err := objects.Prune(); err != nil {
b.Error(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
objects.Store(NewTestObject("Hans"+strconv.Itoa(i), uint32(i))).Release()
}
objectstorage.StopBatchWriter()
}
func BenchmarkLoad(b *testing.B) {
...