Skip to content

Commit 3168caa

Browse files
committed
[factory] implement versioned stateDB to support archive mode
1 parent 6ac9264 commit 3168caa

File tree

6 files changed

+144
-40
lines changed

6 files changed

+144
-40
lines changed

blockchain/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ type (
5252
EnableStateDBCaching bool `yaml:"enableStateDBCaching"`
5353
// EnableArchiveMode is only meaningful when EnableTrielessStateDB is false
5454
EnableArchiveMode bool `yaml:"enableArchiveMode"`
55+
// VersionedNamespaces specifies the versioned namespaces for versioned state DB
56+
VersionedNamespaces []string `yaml:"versionedNamespaces"`
57+
// VersionedMetadata specifies the metadata namespace for versioned state DB
58+
VersionedMetadata string `yaml:"versionedMetadata"`
5559
// EnableAsyncIndexWrite enables writing the block actions' and receipts' index asynchronously
5660
EnableAsyncIndexWrite bool `yaml:"enableAsyncIndexWrite"`
5761
// deprecated
@@ -107,6 +111,7 @@ var (
107111
GravityChainAPIs: []string{},
108112
},
109113
EnableTrielessStateDB: true,
114+
VersionedNamespaces: []string{},
110115
EnableStateDBCaching: false,
111116
EnableArchiveMode: false,
112117
EnableAsyncIndexWrite: true,

chainservice/builder.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,10 @@ func (builder *Builder) createFactory(forTest bool) (factory.Factory, error) {
173173
factory.RegistryStateDBOption(builder.cs.registry),
174174
factory.DefaultPatchOption(),
175175
}
176-
if builder.cfg.Chain.EnableStateDBCaching {
176+
if builder.cfg.Chain.EnableArchiveMode {
177+
dao, err = db.CreateKVStoreVersioned(factoryDBCfg, builder.cfg.Chain.TrieDBPath, builder.cfg.Chain.VersionedNamespaces)
178+
opts = append(opts, factory.MetadataNamespaceOption(builder.cfg.Chain.VersionedMetadata))
179+
} else if builder.cfg.Chain.EnableStateDBCaching {
177180
dao, err = db.CreateKVStoreWithCache(factoryDBCfg, builder.cfg.Chain.TrieDBPath, builder.cfg.Chain.StateDBCacheSize)
178181
} else {
179182
dao, err = db.CreateKVStore(factoryDBCfg, builder.cfg.Chain.TrieDBPath)

db/builder.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,12 @@ func CreateKVStoreWithCache(cfg Config, dbPath string, cacheSize int) (KVStore,
3232

3333
return NewKvStoreWithCache(dao, cacheSize), nil
3434
}
35+
36+
// CreateKVStoreVersioned creates versioned db from config and db path
37+
func CreateKVStoreVersioned(cfg Config, dbPath string, vns []string) (KVStore, error) {
38+
if len(dbPath) == 0 {
39+
return nil, ErrEmptyDBPath
40+
}
41+
cfg.DbPath = dbPath
42+
return NewKVStoreWithVersion(cfg, VersionedNamespaceOption(vns...)), nil
43+
}

state/factory/statedb.go

Lines changed: 71 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,13 @@ type stateDB struct {
4040
cfg Config
4141
registry *protocol.Registry
4242
dao db.KVStore // the underlying DB for account/contract storage
43+
daoVersioned db.KvVersioned
4344
timerFactory *prometheustimer.TimerFactory
4445
workingsets cache.LRUCache // lru cache for workingsets
4546
protocolView protocol.View
4647
skipBlockValidationOnPut bool
48+
versioned bool
49+
metaNS string // metadata namespace for versioned DB
4750
ps *patchStore
4851
}
4952

@@ -82,22 +85,39 @@ func DisableWorkingSetCacheOption() StateDBOption {
8285
}
8386
}
8487

88+
// MetadataNamespaceOption specifies the metadat namespace for versioned DB
89+
func MetadataNamespaceOption(ns string) StateDBOption {
90+
return func(sdb *stateDB, cfg *Config) error {
91+
sdb.metaNS = ns
92+
return nil
93+
}
94+
}
95+
8596
// NewStateDB creates a new state db
8697
func NewStateDB(cfg Config, dao db.KVStore, opts ...StateDBOption) (Factory, error) {
8798
sdb := stateDB{
8899
cfg: cfg,
89100
currentChainHeight: 0,
101+
versioned: cfg.Chain.EnableArchiveMode,
90102
registry: protocol.NewRegistry(),
91103
protocolView: protocol.View{},
92104
workingsets: cache.NewThreadSafeLruCache(int(cfg.Chain.WorkingSetCacheSize)),
93-
dao: dao,
94105
}
95106
for _, opt := range opts {
96107
if err := opt(&sdb, &cfg); err != nil {
97108
log.S().Errorf("Failed to execute state factory creation option %p: %v", opt, err)
98109
return nil, err
99110
}
100111
}
112+
if sdb.versioned {
113+
daoVersioned, ok := dao.(db.KvVersioned)
114+
if !ok {
115+
return nil, errors.Wrap(ErrNotSupported, "cannot enable archive mode StateDB with non-versioned DB")
116+
}
117+
sdb.daoVersioned = daoVersioned
118+
} else {
119+
sdb.dao = dao
120+
}
101121
timerFactory, err := prometheustimer.New(
102122
"iotex_statefactory_perf",
103123
"Performance of state factory module",
@@ -111,23 +131,30 @@ func NewStateDB(cfg Config, dao db.KVStore, opts ...StateDBOption) (Factory, err
111131
return &sdb, nil
112132
}
113133

134+
func (sdb *stateDB) DAO(height uint64) db.KVStore {
135+
if sdb.versioned {
136+
return sdb.daoVersioned.SetVersion(height)
137+
}
138+
return sdb.dao
139+
}
140+
114141
func (sdb *stateDB) Start(ctx context.Context) error {
115142
ctx = protocol.WithRegistry(ctx, sdb.registry)
116-
if err := sdb.dao.Start(ctx); err != nil {
143+
if err := sdb.DAO(0).Start(ctx); err != nil {
117144
return err
118145
}
119146
// check factory height
120-
h, err := sdb.dao.Get(AccountKVNamespace, []byte(CurrentHeightKey))
147+
h, err := sdb.getHeight()
121148
switch errors.Cause(err) {
122149
case nil:
123-
sdb.currentChainHeight = byteutil.BytesToUint64(h)
150+
sdb.currentChainHeight = h
124151
// start all protocols
125152
if sdb.protocolView, err = sdb.registry.StartAll(ctx, sdb); err != nil {
126153
return err
127154
}
128155
case db.ErrNotExist:
129156
sdb.currentChainHeight = 0
130-
if err = sdb.dao.Put(AccountKVNamespace, []byte(CurrentHeightKey), byteutil.Uint64ToBytes(0)); err != nil {
157+
if err = sdb.putHeight(0); err != nil {
131158
return errors.Wrap(err, "failed to init statedb's height")
132159
}
133160
// start all protocols
@@ -158,24 +185,46 @@ func (sdb *stateDB) Stop(ctx context.Context) error {
158185
sdb.mutex.Lock()
159186
defer sdb.mutex.Unlock()
160187
sdb.workingsets.Clear()
161-
return sdb.dao.Stop(ctx)
188+
return sdb.DAO(0).Stop(ctx)
162189
}
163190

164191
// Height returns factory's height
165192
func (sdb *stateDB) Height() (uint64, error) {
166193
sdb.mutex.RLock()
167194
defer sdb.mutex.RUnlock()
168-
height, err := sdb.dao.Get(AccountKVNamespace, []byte(CurrentHeightKey))
195+
return sdb.getHeight()
196+
}
197+
198+
func (sdb *stateDB) getHeight() (uint64, error) {
199+
height, err := sdb.DAO(0).Get(sdb.metadataNS(), []byte(CurrentHeightKey))
169200
if err != nil {
170201
return 0, errors.Wrap(err, "failed to get factory's height from underlying DB")
171202
}
172203
return byteutil.BytesToUint64(height), nil
173204
}
174205

206+
func (sdb *stateDB) putHeight(h uint64) error {
207+
return sdb.DAO(h).Put(sdb.metadataNS(), []byte(CurrentHeightKey), byteutil.Uint64ToBytes(h))
208+
}
209+
210+
func (sdb *stateDB) metadataNS() string {
211+
if sdb.versioned {
212+
return sdb.metaNS
213+
}
214+
return AccountKVNamespace
215+
}
216+
217+
func (sdb *stateDB) transCurrentHeight(wi *batch.WriteInfo) *batch.WriteInfo {
218+
if wi.Namespace() == sdb.metaNS && string(wi.Key()) == CurrentHeightKey && wi.WriteType() == batch.Put {
219+
return batch.NewWriteInfo(wi.WriteType(), AccountKVNamespace, wi.Key(), wi.Value(), wi.Error())
220+
}
221+
return wi
222+
}
223+
175224
func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingSet, error) {
176225
g := genesis.MustExtractGenesisContext(ctx)
177226
flusher, err := db.NewKVStoreFlusher(
178-
sdb.dao,
227+
sdb.DAO(height),
179228
batch.NewCachedBatch(),
180229
sdb.flusherOptions(!g.IsEaster(height))...,
181230
)
@@ -189,7 +238,7 @@ func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingS
189238
flusher.KVStoreWithBuffer().MustPut(p.Namespace, p.Key, p.Value)
190239
}
191240
}
192-
store := newStateDBWorkingSetStore(sdb.protocolView, flusher, g.IsNewfoundland(height))
241+
store := newStateDBWorkingSetStore(sdb.protocolView, flusher, g.IsNewfoundland(height), sdb.metadataNS())
193242
if err := store.Start(ctx); err != nil {
194243
return nil, err
195244
}
@@ -267,7 +316,6 @@ func (sdb *stateDB) WorkingSet(ctx context.Context) (protocol.StateManager, erro
267316
}
268317

269318
func (sdb *stateDB) WorkingSetAtHeight(ctx context.Context, height uint64) (protocol.StateManager, error) {
270-
// TODO: implement archive mode
271319
return sdb.newWorkingSet(ctx, height)
272320
}
273321

@@ -327,12 +375,13 @@ func (sdb *stateDB) State(s interface{}, opts ...protocol.StateOption) (uint64,
327375
if err != nil {
328376
return 0, err
329377
}
330-
sdb.mutex.RLock()
331-
defer sdb.mutex.RUnlock()
332378
if cfg.Keys != nil {
333379
return 0, errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
334380
}
335-
return sdb.currentChainHeight, sdb.state(cfg.Namespace, cfg.Key, s)
381+
sdb.mutex.RLock()
382+
height := sdb.currentChainHeight
383+
sdb.mutex.RUnlock()
384+
return height, sdb.state(height, cfg.Namespace, cfg.Key, s)
336385
}
337386

338387
// State returns a set of states in the state factory
@@ -346,7 +395,7 @@ func (sdb *stateDB) States(opts ...protocol.StateOption) (uint64, state.Iterator
346395
if cfg.Key != nil {
347396
return sdb.currentChainHeight, nil, errors.Wrap(ErrNotSupported, "Read states with key option has not been implemented yet")
348397
}
349-
keys, values, err := readStates(sdb.dao, cfg.Namespace, cfg.Keys)
398+
keys, values, err := readStates(sdb.DAO(sdb.currentChainHeight), cfg.Namespace, cfg.Keys)
350399
if err != nil {
351400
return 0, nil, err
352401
}
@@ -358,28 +407,23 @@ func (sdb *stateDB) States(opts ...protocol.StateOption) (uint64, state.Iterator
358407
return sdb.currentChainHeight, iter, nil
359408
}
360409

361-
// StateAtHeight returns a confirmed state at height -- archive mode
362-
func (sdb *stateDB) StateAtHeight(height uint64, s interface{}, opts ...protocol.StateOption) error {
363-
return ErrNotSupported
364-
}
365-
366-
// StatesAtHeight returns a set states in the state factory at height -- archive mode
367-
func (sdb *stateDB) StatesAtHeight(height uint64, opts ...protocol.StateOption) (state.Iterator, error) {
368-
return nil, errors.Wrap(ErrNotSupported, "state db does not support archive mode")
369-
}
370-
371410
// ReadView reads the view
372411
func (sdb *stateDB) ReadView(name string) (interface{}, error) {
373412
return sdb.protocolView.Read(name)
374413
}
375414

376415
//======================================
377-
// private trie constructor functions
416+
// private statedb functions
378417
//======================================
379418

380419
func (sdb *stateDB) flusherOptions(preEaster bool) []db.KVStoreFlusherOption {
381420
opts := []db.KVStoreFlusherOption{
382421
db.SerializeOption(func(wi *batch.WriteInfo) []byte {
422+
if sdb.versioned {
423+
// current height is moved to another namespace
424+
// transform it back for the purpose of calculating digest
425+
wi = sdb.transCurrentHeight(wi)
426+
}
383427
if preEaster {
384428
return wi.SerializeWithoutWriteType()
385429
}
@@ -397,8 +441,8 @@ func (sdb *stateDB) flusherOptions(preEaster bool) []db.KVStoreFlusherOption {
397441
)
398442
}
399443

400-
func (sdb *stateDB) state(ns string, addr []byte, s interface{}) error {
401-
data, err := sdb.dao.Get(ns, addr)
444+
func (sdb *stateDB) state(h uint64, ns string, addr []byte, s interface{}) error {
445+
data, err := sdb.DAO(h).Get(ns, addr)
402446
if err != nil {
403447
if errors.Cause(err) == db.ErrNotExist {
404448
return errors.Wrapf(state.ErrStateNotExist, "state of %x doesn't exist", addr)

state/factory/workingsetstore_statedb.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,17 @@ import (
1919
type stateDBWorkingSetStore struct {
2020
*workingSetStoreCommon
2121
readBuffer bool
22+
metaNS string // metadata namespace for versioned DB
2223
}
2324

24-
func newStateDBWorkingSetStore(view protocol.View, flusher db.KVStoreFlusher, readBuffer bool) workingSetStore {
25+
func newStateDBWorkingSetStore(view protocol.View, flusher db.KVStoreFlusher, readBuffer bool, ns string) workingSetStore {
2526
return &stateDBWorkingSetStore{
2627
workingSetStoreCommon: &workingSetStoreCommon{
2728
flusher: flusher,
2829
view: view,
2930
},
3031
readBuffer: readBuffer,
32+
metaNS: ns,
3133
}
3234
}
3335

@@ -61,7 +63,7 @@ func (store *stateDBWorkingSetStore) States(ns string, keys [][]byte) ([][]byte,
6163
func (store *stateDBWorkingSetStore) Finalize(height uint64) error {
6264
// Persist current chain Height
6365
store.flusher.KVStoreWithBuffer().MustPut(
64-
AccountKVNamespace,
66+
store.metaNS,
6567
[]byte(CurrentHeightKey),
6668
byteutil.Uint64ToBytes(height),
6769
)

state/factory/workingsetstore_test.go

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,35 @@ import (
1212
"testing"
1313

1414
"github.com/iotexproject/iotex-core/v2/action/protocol"
15+
"github.com/iotexproject/iotex-core/v2/action/protocol/execution/evm"
1516
"github.com/iotexproject/iotex-core/v2/db"
1617
"github.com/iotexproject/iotex-core/v2/db/batch"
1718
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
1819
"github.com/stretchr/testify/require"
1920
)
2021

22+
var (
23+
name = "name"
24+
viewValue = "value"
25+
namespace = "namespace"
26+
key1 = []byte("key1")
27+
value1 = []byte("value1")
28+
key2 = []byte("key2")
29+
value2 = []byte("value2")
30+
key3 = []byte("key3")
31+
value3 = []byte("value3")
32+
)
33+
2134
func TestStateDBWorkingSetStore(t *testing.T) {
2235
require := require.New(t)
2336
ctx := context.Background()
2437
view := protocol.View{}
2538
inMemStore := db.NewMemKVStore()
2639
flusher, err := db.NewKVStoreFlusher(inMemStore, batch.NewCachedBatch())
2740
require.NoError(err)
28-
store := newStateDBWorkingSetStore(view, flusher, true)
41+
store := newStateDBWorkingSetStore(view, flusher, true, AccountKVNamespace)
2942
require.NotNil(store)
3043
require.NoError(store.Start(ctx))
31-
name := "name"
32-
viewValue := "value"
33-
namespace := "namespace"
34-
key1 := []byte("key1")
35-
value1 := []byte("value1")
36-
key2 := []byte("key2")
37-
value2 := []byte("value2")
38-
key3 := []byte("key3")
39-
value3 := []byte("value3")
4044
t.Run("test view", func(t *testing.T) {
4145
_, err := store.ReadView(name)
4246
require.Error(err)
@@ -109,6 +113,43 @@ func TestStateDBWorkingSetStore(t *testing.T) {
109113
require.NoError(store.Stop(ctx))
110114
}
111115

116+
func TestVersionedWorkingSetStore(t *testing.T) {
117+
r := require.New(t)
118+
var (
119+
mns = "mta"
120+
stores = []workingSetStore{}
121+
digest = [2]string{
122+
"e6958faedcc37528dad9ac99f5e6613fbefbf403a06fe962535225d42a27b189",
123+
"bb262ac0603e48aa737f5eb42014f481cb54d831c14fe736b8f61b69e5b4924a",
124+
}
125+
)
126+
for _, preEaster := range []bool{false, true} {
127+
for _, versioned := range []bool{true, false} {
128+
for _, ns := range []string{mns, "test1", "test can pass with any string here"} {
129+
sdb := stateDB{
130+
versioned: versioned,
131+
metaNS: ns,
132+
}
133+
flusher, err := db.NewKVStoreFlusher(db.NewMemKVStore(), batch.NewCachedBatch(), sdb.flusherOptions(preEaster)...)
134+
r.NoError(err)
135+
stores = append(stores, newStateDBWorkingSetStore(nil, flusher, true, sdb.metadataNS()))
136+
}
137+
}
138+
}
139+
for i, store := range stores {
140+
r.NotNil(store)
141+
r.NoError(store.Put(namespace, key1, value1))
142+
r.NoError(store.Put(namespace, key2, value2))
143+
r.NoError(store.Put(namespace, []byte(CurrentHeightKey), value3))
144+
r.NoError(store.Put(mns, key1, value1))
145+
r.NoError(store.Delete(namespace, key2))
146+
r.NoError(store.Put(evm.CodeKVNameSpace, key3, value1))
147+
r.NoError(store.Finalize(3))
148+
h := store.Digest()
149+
r.Equal(digest[i/6], hex.EncodeToString(h[:]))
150+
}
151+
}
152+
112153
func TestFactoryWorkingSetStore(t *testing.T) {
113154
// TODO: add unit test for factory working set store
114155
}

0 commit comments

Comments
 (0)