Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/neofs-node/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/storage/dbconfig"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
blobstorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor"
containercore "github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/metachain/gas"
netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
Expand Down Expand Up @@ -254,6 +255,10 @@ func initMeta(c *cfg) {
c.sidechain.Stop()
})

var batchFlushInterval = blobstorconfig.DefaultFlushInterval
if c.appCfg.Storage.Default.Blobstor.FlushInterval > 0 {
batchFlushInterval = c.appCfg.Storage.Default.Blobstor.FlushInterval
}
c.metaService, err = meta.New(meta.Parameters{
Logger: c.log.With(zap.String("component", "metadata service")),
Chain: c.sidechain,
Expand All @@ -265,6 +270,7 @@ func initMeta(c *cfg) {
network: c.netMapSource,
header: c.cfgObject.getSvc,
},
FlushBatchInterval: batchFlushInterval,
})
fatalOnErr(err)

Expand Down
34 changes: 22 additions & 12 deletions pkg/services/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ type Meta struct {
bCh chan *block.Header
evsCh chan *state.ContainedNotificationEvent

taskQueue chan storageTask
batchFlushInterval time.Duration
taskQueue chan storageTask

metabase *metabase.DB
notifier *objectNotifier
Expand All @@ -174,10 +175,11 @@ const blockBuffSize = 10000
// Parameters groups arguments for [New] call. Logger, Chain and Network
// must not be nil, path must not be empty.
type Parameters struct {
Logger *zap.Logger
Chain MetaChain
Path string
Network NeoFSNetwork
Logger *zap.Logger
Chain MetaChain
Path string
Network NeoFSNetwork
FlushBatchInterval time.Duration
}

func validatePrm(p Parameters) error {
Expand All @@ -193,6 +195,9 @@ func validatePrm(p Parameters) error {
if p.Network == nil {
return errors.New("missing NeoFS network")
}
if p.FlushBatchInterval < 0 {
return fmt.Errorf("invalid flush batch threshold: %d, must be non-negative", p.FlushBatchInterval)
}

return nil
}
Expand Down Expand Up @@ -236,14 +241,19 @@ func New(p Parameters) (*Meta, error) {
return nil, fmt.Errorf("failed to init metabase: %w", err)
}

var flushInterval = p.FlushBatchInterval
if flushInterval == 0 {
flushInterval = 10 * time.Millisecond
}
m := &Meta{
l: p.Logger,
chain: p.Chain,
net: p.Network,
bCh: make(chan *block.Header, blockBuffSize),
evsCh: make(chan *state.ContainedNotificationEvent, notificationBuffSize),
taskQueue: make(chan storageTask, notificationBuffSize),
metabase: metaDB,
l: p.Logger,
chain: p.Chain,
net: p.Network,
bCh: make(chan *block.Header, blockBuffSize),
evsCh: make(chan *state.ContainedNotificationEvent, notificationBuffSize),
taskQueue: make(chan storageTask, notificationBuffSize),
metabase: metaDB,
batchFlushInterval: flushInterval,
}
notifier := newNotifier(m)
m.notifier = notifier
Expand Down
13 changes: 6 additions & 7 deletions pkg/services/meta/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (m *Meta) PutObjects(o []*object.Object) error {

func (m *Meta) storager(ctx context.Context, buff <-chan storageTask) {
ch := make(chan *object.Object, cap(buff))
go batchWriter(ctx, m.l, m.metabase, ch)
go batchWriter(ctx, m.l, m.batchFlushInterval, m.metabase, ch)

for {
if len(buff) >= notificationBuffSize-1 {
Expand Down Expand Up @@ -52,21 +52,20 @@ func (m *Meta) storager(ctx context.Context, buff <-chan storageTask) {
}
}

func batchWriter(ctx context.Context, l *zap.Logger, m *metabase.DB, buff <-chan *object.Object) {
func batchWriter(ctx context.Context, l *zap.Logger, batchFlushThreshold time.Duration, m *metabase.DB, buff <-chan *object.Object) {
const (
maxBuffSize = 100
maxBatchDelay = time.Second
maxBuffSize = 100
)
var (
t = time.NewTicker(maxBatchDelay)
t = time.NewTicker(batchFlushThreshold)
batch = make([]*object.Object, 0, maxBuffSize)
writeBatch = func() {
err := m.PutBatch(batch)
if err != nil {
l.Error("failed to put objects batch", zap.Int("batchSize", len(batch)), zap.Error(err))
}
batch = batch[:0]
t.Reset(maxBatchDelay)
t.Reset(batchFlushThreshold)
}
)

Expand All @@ -82,7 +81,7 @@ func batchWriter(ctx context.Context, l *zap.Logger, m *metabase.DB, buff <-chan
writeBatch()
case o := <-buff:
if len(batch) == 0 {
t.Reset(maxBatchDelay)
t.Reset(batchFlushThreshold)
}
batch = append(batch, o)
if len(batch) == maxBuffSize {
Expand Down
3 changes: 1 addition & 2 deletions pkg/services/object/put/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (t *distributedTarget) Close() (oid.ID, error) {
defer func() {
putPayload(t.encodedObject.b)
t.encodedObject.b = nil
t.resetMetaCollection()
}()

t.obj.SetPayload(t.encodedObject.b[t.encodedObject.pldOff:])
Expand Down Expand Up @@ -297,8 +298,6 @@ func (t *distributedTarget) saveObject(obj object.Object, encObj encodedObject)
}
}

t.resetMetaCollection()

getRuleIdx := func(i int) int {
if ruleOrder == nil {
return i
Expand Down
Loading