Skip to content
Snippets Groups Projects
Commit 10669f50 authored by Hans Moog's avatar Hans Moog
Browse files

Feat: fixed caching bug in objectstorage

parent 8981b8c0
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,7 @@ package ledgerstate
import (
"reflect"
"sort"
"time"
"golang.org/x/crypto/blake2b"
......@@ -20,9 +21,9 @@ type LedgerState struct {
func NewLedgerState(storageId string) *LedgerState {
result := &LedgerState{
storageId: []byte(storageId),
transferOutputs: objectstorage.New(storageId+"TRANSFER_OUTPUTS", transferOutputFactory),
transferOutputBookings: objectstorage.New(storageId+"TRANSFER_OUTPUT_BOOKING", transferOutputBookingFactory),
realities: objectstorage.New(storageId+"REALITIES", realityFactory),
transferOutputs: objectstorage.New(storageId+"TRANSFER_OUTPUTS", transferOutputFactory, objectstorage.CacheTime(1*time.Second)),
transferOutputBookings: objectstorage.New(storageId+"TRANSFER_OUTPUT_BOOKING", transferOutputBookingFactory, objectstorage.CacheTime(1*time.Second)),
realities: objectstorage.New(storageId+"REALITIES", realityFactory, objectstorage.CacheTime(1*time.Second)),
}
mainReality := newReality(MAIN_REALITY_ID)
......
......@@ -4,7 +4,6 @@ import (
"fmt"
"strconv"
"testing"
"time"
"github.com/iotaledger/goshimmer/packages/objectstorage"
)
......@@ -29,11 +28,13 @@ func Benchmark(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
ledgerState.BookTransfer(NewTransfer(NewTransferHash(strconv.Itoa(i))).AddInput(
if err := ledgerState.BookTransfer(NewTransfer(NewTransferHash(strconv.Itoa(i))).AddInput(
NewTransferOutputReference(transferHash1, addressHash1),
).AddOutput(
addressHash3, NewColoredBalance(eth, 1337),
))
)); err != nil {
b.Error(err)
}
}
}
......@@ -56,8 +57,11 @@ func Test(t *testing.T) {
addressHash4, NewColoredBalance(eth, 1000),
)
ledgerState.BookTransfer(transfer)
ledgerState.BookTransfer(NewTransfer(transferHash3).AddInput(
if err := ledgerState.BookTransfer(transfer); err != nil {
t.Error(err)
}
if err := ledgerState.BookTransfer(NewTransfer(transferHash3).AddInput(
NewTransferOutputReference(transferHash1, addressHash1),
).AddOutput(
addressHash3, NewColoredBalance(iota, 338),
......@@ -67,9 +71,9 @@ func Test(t *testing.T) {
addressHash4, NewColoredBalance(iota, 1000),
).AddOutput(
addressHash4, NewColoredBalance(eth, 1000),
))
time.Sleep(100 * time.Millisecond)
)); err != nil {
t.Error(err)
}
objectstorage.WaitForWritesToFlush()
......
......@@ -57,11 +57,12 @@ func (reality *Reality) GetParentRealities() map[RealityId]*objectstorage.Cached
parentRealities := make(map[RealityId]*objectstorage.CachedObject)
for _, parentRealityId := range reality.parentRealities {
if loadedParentReality := reality.ledgerState.GetReality(parentRealityId); !loadedParentReality.Exists() {
loadedParentReality := reality.ledgerState.GetReality(parentRealityId)
if !loadedParentReality.Exists() {
panic("could not load parent reality with id \"" + string(parentRealityId[:]) + "\"")
} else {
parentRealities[loadedParentReality.Get().(*Reality).GetId()] = loadedParentReality
}
parentRealities[loadedParentReality.Get().(*Reality).GetId()] = loadedParentReality
}
return parentRealities
......@@ -132,18 +133,26 @@ func (reality *Reality) BookTransfer(transfer *Transfer) error {
func (reality *Reality) elevateTransferOutput(transferOutputReference *TransferOutputReference, newRealityId RealityId) error {
cachedTransferOutputToElevate := reality.ledgerState.GetTransferOutput(transferOutputReference)
defer cachedTransferOutputToElevate.Release()
if !cachedTransferOutputToElevate.Exists() {
return errors.New("could not find TransferOutput to elevate")
}
transferOutputToElevate := cachedTransferOutputToElevate.Get().(*TransferOutput)
if transferOutputToElevate.GetRealityId() == reality.id {
if err := transferOutputToElevate.moveToReality(newRealityId); err != nil {
panic(err)
return err
}
cachedTransferOutputToElevate.Store()
cachedTransferOutputToElevate.Release()
for transferHash, addresses := range transferOutputToElevate.GetConsumers() {
for _, addressHash := range addresses {
if err := reality.elevateTransferOutput(NewTransferOutputReference(transferHash, addressHash), newRealityId); err != nil {
return err
}
}
}
} else {
fmt.Println("ALREADY ELEVATED")
}
......
......@@ -64,6 +64,19 @@ func (transferOutput *TransferOutput) GetBalances() []*ColoredBalance {
return transferOutput.balances
}
func (transferOutput *TransferOutput) GetConsumers() (consumers map[TransferHash][]AddressHash) {
consumers = make(map[TransferHash][]AddressHash)
transferOutput.consumersMutex.RLock()
for transferHash, addresses := range transferOutput.consumers {
consumers[transferHash] = make([]AddressHash, len(addresses))
copy(consumers[transferHash], addresses)
}
transferOutput.consumersMutex.RUnlock()
return
}
func (transferOutput *TransferOutput) addConsumer(consumer TransferHash, outputs map[AddressHash][]*ColoredBalance) (isConflicting bool, consumersToElevate map[TransferHash][]AddressHash, err error) {
transferOutput.consumersMutex.RLock()
if _, exist := transferOutput.consumers[consumer]; exist {
......@@ -165,6 +178,9 @@ func (transferOutput *TransferOutput) GetStorageKey() []byte {
func (transferOutput *TransferOutput) Update(other objectstorage.StorableObject) {}
func (transferOutput *TransferOutput) MarshalBinary() ([]byte, error) {
transferOutput.realityIdMutex.RLock()
transferOutput.consumersMutex.RLock()
balanceCount := len(transferOutput.balances)
consumerCount := len(transferOutput.consumers)
......@@ -203,6 +219,9 @@ func (transferOutput *TransferOutput) MarshalBinary() ([]byte, error) {
}
}
transferOutput.consumersMutex.RUnlock()
transferOutput.realityIdMutex.RUnlock()
return result, nil
}
......
......@@ -80,7 +80,7 @@ func releaseObject(cachedObject *CachedObject) {
objectStorage := cachedObject.objectStorage
objectStorage.cacheMutex.Lock()
if consumers := atomic.LoadInt32(&(cachedObject.consumers)); consumers == 0 {
if consumers := atomic.LoadInt32(&(cachedObject.consumers)); consumers == 0 && cachedObject.value != nil {
delete(objectStorage.cachedObjects, string(cachedObject.value.GetStorageKey()))
}
objectStorage.cacheMutex.Unlock()
......
......@@ -4,6 +4,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"
)
type CachedObject struct {
......@@ -17,6 +18,7 @@ type CachedObject struct {
delete int32
wg sync.WaitGroup
valueMutex sync.RWMutex
releaseTimer unsafe.Pointer
}
func newCachedObject(database *ObjectStorage) (result *CachedObject) {
......@@ -44,14 +46,18 @@ func (cachedObject *CachedObject) Get() (result StorableObject) {
func (cachedObject *CachedObject) Release() {
if consumers := atomic.AddInt32(&(cachedObject.consumers), -1); consumers == 0 {
if cachedObject.objectStorage.options.cacheTime != 0 {
time.AfterFunc(cachedObject.objectStorage.options.cacheTime, func() {
atomic.StorePointer(&cachedObject.releaseTimer, unsafe.Pointer(time.AfterFunc(cachedObject.objectStorage.options.cacheTime, func() {
atomic.StorePointer(&cachedObject.releaseTimer, nil)
if consumers := atomic.LoadInt32(&(cachedObject.consumers)); consumers == 0 {
batchWrite(cachedObject)
})
} else if consumers < 0 {
panic("called Release() too often")
}
})))
} else {
batchWrite(cachedObject)
}
} else if consumers < 0 {
panic("called Release() too often")
}
}
......@@ -81,6 +87,7 @@ func (cachedObject *CachedObject) IsDeleted() bool {
// Marks an object for being stored in the persistence layer.
func (cachedObject *CachedObject) Store() {
atomic.StoreInt32(&(cachedObject.delete), 0)
atomic.StoreInt32(&(cachedObject.stored), 0)
atomic.StoreInt32(&(cachedObject.store), 1)
}
......@@ -92,6 +99,10 @@ func (cachedObject *CachedObject) IsStored() bool {
// Registers a new consumer for this cached object.
func (cachedObject *CachedObject) RegisterConsumer() {
atomic.AddInt32(&(cachedObject.consumers), 1)
if timer := atomic.LoadPointer(&cachedObject.releaseTimer); timer != nil {
(*(*time.Timer)(timer)).Stop()
}
}
func (cachedObject *CachedObject) Exists() bool {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment