From f074d70fe78c1672257d32dcc1c9c38e1382d928 Mon Sep 17 00:00:00 2001 From: Andrey Butusov Date: Thu, 4 Jun 2026 17:05:16 +0300 Subject: [PATCH] node: quic prototype Signed-off-by: Andrey Butusov --- cmd/neofs-node/config.go | 3 + cmd/neofs-node/object.go | 2 + cmd/neofs-node/quic.go | 65 ++++ go.mod | 1 + go.sum | 4 + internal/qstream/qstream.go | 77 +++++ pkg/core/client/client.go | 5 + pkg/network/cache/clients.go | 95 ++++++ pkg/services/object/get/remote.go | 8 +- pkg/services/object/put/service_test.go | 5 + pkg/services/object/server.go | 1 + pkg/services/object/server_quic.go | 378 ++++++++++++++++++++++++ pkg/services/object/server_test.go | 5 + 13 files changed, 647 insertions(+), 2 deletions(-) create mode 100644 cmd/neofs-node/quic.go create mode 100644 internal/qstream/qstream.go create mode 100644 pkg/services/object/server_quic.go diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index fc815bace0..a1364d9f85 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -50,6 +50,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/panjf2000/ants/v2" + "github.com/quic-go/quic-go" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/term" @@ -236,6 +237,8 @@ type cfgGRPC struct { listeners []net.Listener servers []*grpc.Server + quicListeners []*quic.Listener + // serviceRegistrators stores functions that register gRPC service // implementations into a gRPC server. serviceRegistrators []func(*grpc.Server) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index f9e948e4c0..4404585e6a 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -368,6 +368,8 @@ func initObjectService(c *cfg) { c.cfgGRPC.registerService(func(srv *grpc.Server) { srv.RegisterService(&svcDesc, server) }) + + initQUIC(c, server) } type reputationClientConstructor struct { diff --git a/cmd/neofs-node/quic.go b/cmd/neofs-node/quic.go new file mode 100644 index 0000000000..6d776ca839 --- /dev/null +++ b/cmd/neofs-node/quic.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "errors" + + "github.com/nspcc-dev/neofs-node/internal/qstream" + objectService "github.com/nspcc-dev/neofs-node/pkg/services/object" + "github.com/quic-go/quic-go" + "go.uber.org/zap" +) + +func initQUIC(c *cfg, server *objectService.Server) { + for _, sc := range c.appCfg.GRPC { + endpoint := sc.Endpoint + + tlsCfg, err := qstream.ServerTLSConfig() + if err != nil { + c.log.Error("QUIC: failed to build TLS config", zap.String("endpoint", endpoint), zap.Error(err)) + continue + } + + ln, err := quic.ListenAddr(endpoint, tlsCfg, qstream.Config()) + if err != nil { + c.log.Error("QUIC: failed to listen", zap.String("endpoint", endpoint), zap.Error(err)) + continue + } + + c.cfgGRPC.quicListeners = append(c.cfgGRPC.quicListeners, ln) + c.log.Info("QUIC: start listening GET-stream endpoint", zap.String("endpoint", endpoint)) + + c.wg.Go(func() { + serveQUICListener(c, ln, server) + }) + } + + c.onShutdown(func() { + for _, ln := range c.cfgGRPC.quicListeners { + _ = ln.Close() + } + }) +} + +func serveQUICListener(c *cfg, ln *quic.Listener, server *objectService.Server) { + for { + conn, err := ln.Accept(context.Background()) + if err != nil { + if !errors.Is(err, quic.ErrServerClosed) { + c.log.Error("QUIC: accept connection failed", zap.Stringer("endpoint", ln.Addr()), zap.Error(err)) + } + return + } + go serveQUICConn(conn, server) + } +} + +func serveQUICConn(conn *quic.Conn, server *objectService.Server) { + for { + st, err := conn.AcceptStream(context.Background()) + if err != nil { + return + } + go server.ServeGetStream(st.Context(), st) + } +} diff --git a/go.mod b/go.mod index 5eddd7c285..f85cfdcfdc 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/nspcc-dev/tzhash v1.8.4 github.com/panjf2000/ants/v2 v2.11.5 github.com/prometheus/client_golang v1.23.2 + github.com/quic-go/quic-go v0.59.1 github.com/spf13/cast v1.10.0 github.com/spf13/cobra v1.10.2 github.com/spf13/pflag v1.0.10 diff --git a/go.sum b/go.sum index 6c6c2cba3c..44d45fb671 100644 --- a/go.sum +++ b/go.sum @@ -230,6 +230,8 @@ github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9Z github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/quic-go/quic-go v0.59.1 h1:0Gmua0HW1Tv7ANR7hUYwRyD0MG5OJfgvYSZasGZzBic= +github.com/quic-go/quic-go v0.59.1/go.mod h1:upnsH4Ju1YkqpLXC305eW3yDZ4NfnNbmQRCMWS58IKU= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= @@ -294,6 +296,8 @@ go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.5.2 h1:LbtPTcP8A5k9WPXj54PPPbjcI4Y6lhyOZXn+VS7wNko= +go.uber.org/mock v0.5.2/go.mod h1:wLlUxC2vVTPTaE3UD51E0BGOAElKrILxhVSDYQLld5o= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= diff --git a/internal/qstream/qstream.go b/internal/qstream/qstream.go new file mode 100644 index 0000000000..1e3285e70a --- /dev/null +++ b/internal/qstream/qstream.go @@ -0,0 +1,77 @@ +package qstream + +import ( + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "fmt" + "math/big" + "time" + + "github.com/quic-go/quic-go" +) + +// ALPN is the TLS application protocol negotiated by both ends of the raw GET +// stream. It must match on the node (listener) and on every client (S3 gateway +// and node-to-node forwarder). +const ALPN = "neofs-get-quic" + +const ( + StatusOK byte = 0 + StatusError byte = 1 +) + +const MaxRequestSize = 64 * 1024 + +func Config() *quic.Config { + return &quic.Config{ + MaxIncomingStreams: 1 << 16, + MaxIdleTimeout: 5 * time.Minute, + KeepAlivePeriod: 30 * time.Second, + } +} + +// ServerTLSConfig builds a server TLS config with an ephemeral self-signed +// certificate. +func ServerTLSConfig() (*tls.Config, error) { + key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return nil, fmt.Errorf("generate key: %w", err) + } + tmpl := &x509.Certificate{ + SerialNumber: big.NewInt(1), + NotBefore: time.Unix(0, 0), + NotAfter: time.Now().AddDate(10, 0, 0), + } + der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key) + if err != nil { + return nil, fmt.Errorf("create certificate: %w", err) + } + keyDER, err := x509.MarshalPKCS8PrivateKey(key) + if err != nil { + return nil, fmt.Errorf("marshal key: %w", err) + } + cert, err := tls.X509KeyPair( + pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der}), + pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: keyDER}), + ) + if err != nil { + return nil, fmt.Errorf("build key pair: %w", err) + } + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{ALPN}, + MinVersion: tls.VersionTLS13, + }, nil +} + +func ClientTLSConfig() *tls.Config { + return &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec // prototype: ephemeral self-signed server cert + NextProtos: []string{ALPN}, + MinVersion: tls.VersionTLS13, + } +} diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index 01785963a5..7828dfe085 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -12,6 +12,7 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation" "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/quic-go/quic-go" "google.golang.org/grpc" ) @@ -50,4 +51,8 @@ type MultiAddressClient interface { // ForAnyGRPCConn continues. If this happens on all endpoints, ForAnyGRPCConn // returns [ErrAllConnectionsSkipped]. ForAnyGRPCConn(ctx context.Context, f func(context.Context, *grpc.ClientConn) error) error + + // ForAnyQUICStream executes op over a QUIC connection to the multi-address + // endpoint-by-endpoint until success. + ForAnyQUICStream(context.Context, func(context.Context, *quic.Conn, string) error) error } diff --git a/pkg/network/cache/clients.go b/pkg/network/cache/clients.go index 06e961cd28..e0c2834c8a 100644 --- a/pkg/network/cache/clients.go +++ b/pkg/network/cache/clients.go @@ -10,13 +10,16 @@ import ( "iter" "maps" "slices" + "strings" "sync" "time" + "github.com/nspcc-dev/neofs-node/internal/qstream" "github.com/nspcc-dev/neofs-node/internal/uriutil" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-sdk-go/client" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/netmap" @@ -24,6 +27,7 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" apireputation "github.com/nspcc-dev/neofs-sdk-go/reputation" "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/quic-go/quic-go" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -197,6 +201,7 @@ func (x *Clients) initConnections(ctx context.Context, pub []byte, addrs iter.Se log: x.log.With(zap.String("SN public key", hexKey)), nodeID: hexKey, m: m, + mq: make(map[string]*quic.Conn), }, nil } @@ -261,6 +266,7 @@ type connections struct { mtx sync.RWMutex m map[string]*client.Client // keys are multiaddrs + mq map[string]*quic.Conn // keys are multiaddrs; lazily dialed for the raw GET-over-QUIC prototype } func (x *connections) closeAll() { @@ -271,6 +277,95 @@ func (x *connections) closeAll() { } x.log.Info("connection successfully closed", zap.String("address", ma)) } + x.mtx.Lock() + for _, qc := range x.mq { + _ = qc.CloseWithError(0, "shutdown") + } + x.mq = make(map[string]*quic.Conn) + x.mtx.Unlock() +} + +func (x *connections) quicConn(ctx context.Context, ma string) (*quic.Conn, error) { + x.mtx.RLock() + qc := x.mq[ma] + x.mtx.RUnlock() + if qc != nil && qc.Context().Err() == nil { + return qc, nil + } + + var a network.Address + if err := a.FromString(ma); err != nil { + return nil, fmt.Errorf("parse network address %q: %w", ma, err) + } + addr := a.URIAddr() + if i := strings.Index(addr, "://"); i >= 0 { + addr = addr[i+3:] + } + + conn, err := quic.DialAddr(ctx, addr, qstream.ClientTLSConfig(), qstream.Config()) + if err != nil { + return nil, fmt.Errorf("dial QUIC %q: %w", addr, err) + } + + x.mtx.Lock() + if existing := x.mq[ma]; existing != nil && existing.Context().Err() == nil { + x.mtx.Unlock() + _ = conn.CloseWithError(0, "duplicate") + return existing, nil + } + x.mq[ma] = conn + x.mtx.Unlock() + return conn, nil +} + +// dropQUICConn removes and closes the cached QUIC connection for the given +// address so the next use re-dials it. +func (x *connections) dropQUICConn(ma string) { + x.mtx.Lock() + if c := x.mq[ma]; c != nil { + delete(x.mq, ma) + _ = c.CloseWithError(0, "drop on error") + } + x.mtx.Unlock() +} + +// quicForwardAttempts bounds how many times a QUIC forward to one endpoint is +// retried (with a fresh dial) on transport failures before moving on. +const quicForwardAttempts = 3 + +func (x *connections) ForAnyQUICStream(ctx context.Context, f func(context.Context, *quic.Conn, string) error) error { + var firstErr error + x.mtx.RLock() + addrs := slices.Collect(maps.Keys(x.m)) + x.mtx.RUnlock() + + for _, ma := range addrs { + var err error + for range quicForwardAttempts { + var conn *quic.Conn + conn, err = x.quicConn(ctx, ma) + if err == nil { + err = f(ctx, conn, ma) + if err == nil { + return nil + } + } + if errors.Is(err, apistatus.Error) { + break + } + x.dropQUICConn(ma) + if ctx.Err() != nil { + break + } + } + if errors.Is(err, apistatus.Error) && !errors.Is(err, apistatus.ErrObjectNotFound) { + return newEndpointError(ma, err) + } + if firstErr == nil { + firstErr = newEndpointError(ma, err) + } + } + return newMultiEndpointError(x.nodeID, firstErr) } func (x *connections) all(f func(ma string, c *client.Client) bool) { diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index fed6b11e18..04b8e7bc51 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -29,9 +29,13 @@ func (exec *execCtx) processNode(info netmap.NodeInfo) bool { switch { default: exec.status = statusUndefined - exec.err = apistatus.ErrObjectNotFound + exec.err = err - exec.log.Debug("remote call failed", nodeLog, zap.Error(err)) + if errors.Is(err, apistatus.ErrObjectNotFound) { + exec.log.Debug("remote node has no object", nodeLog) + } else { + exec.log.Warn("remote call failed", nodeLog, zap.Error(err)) + } case err == nil: exec.status = statusOK exec.err = nil diff --git a/pkg/services/object/put/service_test.go b/pkg/services/object/put/service_test.go index 549cd91b97..3dd042de81 100644 --- a/pkg/services/object/put/service_test.go +++ b/pkg/services/object/put/service_test.go @@ -51,6 +51,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/nspcc-dev/tzhash/tz" "github.com/panjf2000/ants/v2" + "github.com/quic-go/quic-go" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest" @@ -1125,6 +1126,10 @@ func (m *serviceClient) ForAnyGRPCConn(context.Context, func(context.Context, *g panic("unimplemented") } +func (m *serviceClient) ForAnyQUICStream(context.Context, func(context.Context, *quic.Conn, string) error) error { + panic("unimplemented") +} + type testPayloadStream Streamer func (x *testPayloadStream) Write(p []byte) (int, error) { diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 5a94c6e4ae..d12d4dff15 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -858,6 +858,7 @@ func (s *Server) sendStatusGetResponse(stream protoobject.ObjectService_GetServe type getStream struct { base protoobject.ObjectService_GetServer + w io.Writer // set instead of base for the raw QUIC GET stream srv *Server reqInfo aclsvc.RequestInfo diff --git a/pkg/services/object/server_quic.go b/pkg/services/object/server_quic.go new file mode 100644 index 0000000000..f438cd0cde --- /dev/null +++ b/pkg/services/object/server_quic.go @@ -0,0 +1,378 @@ +package object + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" + "sync" + "time" + + icrypto "github.com/nspcc-dev/neofs-node/internal/crypto" + iobject "github.com/nspcc-dev/neofs-node/internal/object" + "github.com/nspcc-dev/neofs-node/internal/qstream" + "github.com/nspcc-dev/neofs-node/pkg/core/client" + aclsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/v2" + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" + objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/services/util" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "github.com/nspcc-dev/neofs-sdk-go/container" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object" + protosession "github.com/nspcc-dev/neofs-sdk-go/proto/session" + protostatus "github.com/nspcc-dev/neofs-sdk-go/proto/status" + "github.com/nspcc-dev/neofs-sdk-go/stat" + "github.com/quic-go/quic-go" + "google.golang.org/protobuf/encoding/protowire" + "google.golang.org/protobuf/proto" +) + +// statusStreamWriter prepends the qstream.StatusOK byte before the first body +// byte. It lets the OK status be committed lazily, so that errors detected +// before any payload is produced are reported via writeStreamError instead. +type statusStreamWriter struct { + w io.Writer + started bool +} + +func (s *statusStreamWriter) Write(p []byte) (int, error) { + if !s.started { + s.started = true + if _, err := s.w.Write([]byte{qstream.StatusOK}); err != nil { + return 0, err + } + } + return s.w.Write(p) +} + +// writeStreamError reports an error before any response body has been written: +// the qstream.StatusError byte followed by a serialized status.Status. +func writeStreamError(w io.Writer, err error) { + _, _ = w.Write([]byte{qstream.StatusError}) + if err == nil { + return + } + if st := util.ToStatus(err); st != nil { + if b, mErr := proto.Marshal(st); mErr == nil { + _, _ = w.Write(b) + } + } +} + +// streamWriter is the getsvc.ObjectWriter used for assembled/forwarded reads. It +// emits the same ad-hoc framing as copyStream: field 3 = object header, field 4 +// = payload. +type streamWriter struct { + io.Writer +} + +func (w streamWriter) WriteHeader(hdr *object.Object) error { + var b [32]byte + + pref := protowire.AppendTag(b[:0], 3, protowire.BytesType) + hdrBin := hdr.CutPayload().Marshal() + pref = protowire.AppendVarint(pref, uint64(len(hdrBin))) + if _, err := w.Write(pref); err != nil { + return err + } + if _, err := w.Write(hdrBin); err != nil { + return err + } + + pref = protowire.AppendTag(b[:0], 4, protowire.BytesType) + pref = protowire.AppendVarint(pref, hdr.PayloadSize()) + _, err := w.Write(pref) + return err +} + +func (w streamWriter) WriteChunk(data []byte) error { + _, err := w.Write(data) + return err +} + +// ServeGetStream handles a single GET over a raw QUIC bidirectional stream. The +// request (serialized object.GetRequest, terminated by the peer's FIN) is read +// from rw; the response is written back as described in the qstream package. +func (s *Server) ServeGetStream(ctx context.Context, rw io.ReadWriteCloser) { + var ( + err error + recheckEACL bool + t = time.Now() + ) + defer func() { s.pushOpExecResult(stat.MethodObjectGet, err, t) }() + defer func() { _ = rw.Close() }() + + sw := &statusStreamWriter{w: rw} + + buf, err := io.ReadAll(io.LimitReader(rw, qstream.MaxRequestSize)) + if err != nil { + writeStreamError(rw, err) + return + } + if len(buf) == 0 { + bad := new(apistatus.BadRequest) + bad.SetMessage("empty request") + err = bad + writeStreamError(rw, err) + return + } + + req := new(protoobject.GetRequest) + if err = proto.Unmarshal(buf, req); err != nil { + bad := new(apistatus.BadRequest) + bad.SetMessage("malformed request: " + err.Error()) + err = bad + writeStreamError(rw, err) + return + } + + if err = icrypto.VerifyRequestSignatures(req); err != nil { + writeStreamError(rw, err) + return + } + + if s.fsChain.LocalNodeUnderMaintenance() { + err = apistatus.ErrNodeUnderMaintenance + writeStreamError(rw, err) + return + } + + reqInfo, err := s.reqInfoProc.GetRequestToInfo(req) + if err != nil { + if !errors.Is(err, apistatus.Error) { + bad := new(apistatus.BadRequest) + bad.SetMessage(err.Error()) + err = bad + } + writeStreamError(rw, err) + return + } + if !s.aclChecker.CheckBasicACL(reqInfo) { + err = basicACLErr(reqInfo) + writeStreamError(rw, err) + return + } + if err = s.aclChecker.CheckEACL(ctx, req, reqInfo); err != nil { + if !errors.Is(err, aclsvc.ErrNotMatched) { + err = eACLErr(reqInfo, err) + writeStreamError(rw, err) + return + } + recheckEACL = true + err = nil + } + + p, err := convertGetPrmStream(s.signer, reqInfo.Container, req, buf, &getStream{ + w: sw, + srv: s, + reqInfo: reqInfo, + recheckEACL: recheckEACL, + signResponse: needSignGetResponse(req), + }) + if err != nil { + if !errors.Is(err, apistatus.Error) { + bad := new(apistatus.BadRequest) + bad.SetMessage(err.Error()) + err = bad + } + writeStreamError(rw, err) + return + } + + hdrRespBuf, hdrBuf := getBufferForHeadResponse() + defer hdrRespBuf.Free() + + hdrLen := -1 + var stream io.ReadCloser + defer func() { + if stream != nil { + _ = stream.Close() + } + }() + p.WithBuffer(hdrBuf, func(ln int, st io.ReadCloser) { hdrLen, stream = ln, st }) + + err = s.handlers.Get(ctx, p) + if err != nil { + if !sw.started { + writeStreamError(rw, err) + } + // otherwise the truncated stream signals the error to the client. + return + } + + if hdrLen < 0 { + // Response already streamed through sw, either by the forwarder + // (continueQUIC) or by streamWriter for an assembled/EC object. + return + } + + idf, sigf, hdrf, err := iobject.GetNonPayloadFieldBounds(hdrBuf[:hdrLen]) + if err != nil { + if !sw.started { + writeStreamError(rw, err) + } + return + } + + if recheckEACL { // previous check didn't match, but we have a header now. + if eErr := s.aclChecker.CheckEACL(ctx, hdrBuf[hdrf.ValueFrom:hdrf.To], reqInfo); eErr != nil && !errors.Is(eErr, aclsvc.ErrNotMatched) { + err = eACLErr(reqInfo, eErr) + if !sw.started { + writeStreamError(rw, err) + } + return + } + } + + pldFldOff := max(idf.To, sigf.To, hdrf.To) + err = copyStream(sw, hdrBuf, hdrLen, stream, pldFldOff) +} + +// copyStream writes the raw object response into w: field 3 (object header) then +// field 4 (payload size + streamed payload). +func copyStream(w io.Writer, hdrBuf []byte, hdrLen int, stream io.Reader, pldFldOff int) error { + var pref [16]byte + p := protowire.AppendTag(pref[:0], 3, protowire.BytesType) + p = protowire.AppendVarint(p, uint64(pldFldOff)) + if _, err := w.Write(p); err != nil { + return err + } + if _, err := w.Write(hdrBuf[:pldFldOff]); err != nil { + return err + } + + if pldFldOff >= hdrLen { + p = protowire.AppendTag(pref[:0], 4, protowire.BytesType) + p = protowire.AppendVarint(p, 0) + _, err := w.Write(p) + return err + } + + num, typ, tagSz := protowire.ConsumeTag(hdrBuf[pldFldOff:hdrLen]) + if tagSz < 0 || num != 4 || typ != protowire.BytesType { + return fmt.Errorf("bad payload field tag at offset %d", pldFldOff) + } + payloadSize, lenSz := protowire.ConsumeVarint(hdrBuf[pldFldOff+tagSz : hdrLen]) + if lenSz < 0 { + return errors.New("bad payload length varint") + } + payloadValueStart := pldFldOff + tagSz + lenSz + + p = protowire.AppendTag(pref[:0], 4, protowire.BytesType) + p = protowire.AppendVarint(p, payloadSize) + if _, err := w.Write(p); err != nil { + return err + } + if payloadValueStart < hdrLen { + if _, err := w.Write(hdrBuf[payloadValueStart:hdrLen]); err != nil { + return err + } + } + if stream != nil { + if _, err := io.Copy(w, stream); err != nil { + return err + } + } + return nil +} + +// convertGetPrmStream builds get-service parameters for the raw QUIC GET path. +func convertGetPrmStream(signer ecdsa.PrivateKey, cnr container.Container, req *protoobject.GetRequest, buf []byte, stream *getStream) (getsvc.Prm, error) { + body := req.GetBody() + ma := body.GetAddress() + if ma == nil { // includes nil body + return getsvc.Prm{}, errors.New("missing object address") + } + + var addr oid.Address + if err := addr.FromProtoMessage(ma); err != nil { + return getsvc.Prm{}, fmt.Errorf("invalid object address: %w", err) + } + + cp, err := objutil.CommonPrmFromRequest(req) + if err != nil { + return getsvc.Prm{}, err + } + + var p getsvc.Prm + p.SetCommonParameters(cp) + p.WithAddress(addr) + p.WithContainer(cnr) + p.WithRawFlag(body.Raw) + p.SetObjectWriter(streamWriter{stream.w}) + if cp.LocalOnly() { + return p, nil + } + + var onceResign sync.Once + meta := req.GetMetaHeader() + if meta == nil { + return getsvc.Prm{}, errors.New("missing meta header") + } + + p.SetRequestForwarder(func(ctx context.Context, c client.MultiAddressClient) error { + var resignErr error + onceResign.Do(func() { + req.MetaHeader = &protosession.RequestMetaHeader{ + Ttl: meta.GetTtl() - 1, + Origin: meta, + } + req.VerifyHeader, resignErr = neofscrypto.SignRequestWithBuffer(neofsecdsa.Signer(signer), req, nil) + if resignErr != nil { + return + } + buf, resignErr = proto.MarshalOptions{}.MarshalAppend(buf[:0], req) + }) + if resignErr != nil { + return resignErr + } + return c.ForAnyQUICStream(ctx, func(ctx context.Context, conn *quic.Conn, _ string) error { + return continueQUIC(ctx, stream.w, buf, conn) + }) + }) + return p, nil +} + +// continueQUIC forwards a GET to a peer node over QUIC and copies its raw +// response body into w. +func continueQUIC(ctx context.Context, w io.Writer, buf []byte, conn *quic.Conn) error { + st, err := conn.OpenStreamSync(ctx) + if err != nil { + return fmt.Errorf("open forward stream: %w", err) + } + defer st.CancelRead(0) + + if dl, ok := ctx.Deadline(); ok { + _ = st.SetDeadline(dl) + } + + if _, err = st.Write(buf); err != nil { + return fmt.Errorf("write forward request: %w", err) + } + if err = st.Close(); err != nil { // FIN the send side + return fmt.Errorf("close forward request: %w", err) + } + + var sb [1]byte + if _, err = io.ReadFull(st, sb[:]); err != nil { + return fmt.Errorf("read forward status: %w", err) + } + if sb[0] != qstream.StatusOK { + body, _ := io.ReadAll(io.LimitReader(st, qstream.MaxRequestSize)) + var stt protostatus.Status + if proto.Unmarshal(body, &stt) == nil { + if e := apistatus.ToError(&stt); e != nil { + return e + } + } + return fmt.Errorf("forwarded peer returned error status %d", sb[0]) + } + + _, err = io.Copy(w, st) + return err +} diff --git a/pkg/services/object/server_test.go b/pkg/services/object/server_test.go index f25bacfcdf..f8ff3f846a 100644 --- a/pkg/services/object/server_test.go +++ b/pkg/services/object/server_test.go @@ -53,6 +53,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/panjf2000/ants/v2" + "github.com/quic-go/quic-go" "github.com/stretchr/testify/require" "go.uber.org/zap" "google.golang.org/grpc" @@ -810,6 +811,10 @@ func (unimplementedConn) ForAnyGRPCConn(context.Context, func(context.Context, * panic("unimplemented") } +func (unimplementedConn) ForAnyQUICStream(context.Context, func(context.Context, *quic.Conn, string) error) error { + panic("unimplemented") +} + type emptyRemoteNode struct { unimplementedConn }