Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 53 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type clientOptions struct {
maxEdgeTraversal int
cacheSizeMB int
maxRecvMsgSize int
grpcDialOptions []grpc.DialOption
namespace string
logger logr.Logger
validator StructValidator
Expand Down Expand Up @@ -189,6 +190,18 @@ func WithMaxRecvMsgSize(size int) ClientOpt {
}
}

// WithGRPCDialOption appends a custom grpc.DialOption applied when opening a
// remote (dgraph://) connection. It is the general escape hatch for gRPC dial
// settings the dedicated options do not cover — TLS transport credentials,
// interceptors, keepalive parameters, and so on. May be supplied multiple
// times; the options are applied in the order given, after any option implied
// by WithMaxRecvMsgSize. Ignored for embedded (file://) URIs.
func WithGRPCDialOption(opt grpc.DialOption) ClientOpt {
return func(o *clientOptions) {
o.grpcDialOptions = append(o.grpcDialOptions, opt)
}
}

// WithValidator sets a validator instance for struct validation.
// The validator will be used to validate structs before insert, upsert, and update operations.
// If no validator is provided, validation will be skipped.
Expand Down Expand Up @@ -279,16 +292,26 @@ func NewClient(uri string, opts ...ClientOpt) (Client, error) {
client.logger.V(2).Info("Opening new Dgraph connection", "uri", uri)
return dgo.Open(uri)
}
// Assemble any custom gRPC dial options. maxRecvMsgSize is folded
// into the same mechanism as WithGRPCDialOption so the two compose.
var dialOpts []grpc.DialOption
if options.maxRecvMsgSize > 0 {
dialOpts = append(dialOpts,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(options.maxRecvMsgSize)))
}
dialOpts = append(dialOpts, options.grpcDialOptions...)
if len(dialOpts) > 0 {
endpoint, dgoOpts, err := parseDgraphURI(uri)
if err != nil {
return nil, err
}
dgoOpts = append(dgoOpts, dgo.WithGrpcOption(
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(options.maxRecvMsgSize))))
for _, opt := range dialOpts {
dgoOpts = append(dgoOpts, dgo.WithGrpcOption(opt))
}
factory = func() (*dgo.Dgraph, error) {
client.logger.V(2).Info("Opening new Dgraph connection",
"uri", uri, "maxRecvMsgSize", options.maxRecvMsgSize)
"uri", uri, "maxRecvMsgSize", options.maxRecvMsgSize,
"grpcDialOptions", len(options.grpcDialOptions))
return dgo.NewClient(endpoint, dgoOpts...)
}
}
Expand Down Expand Up @@ -430,9 +453,34 @@ func (c client) key() string {
if c.options.embeddingProvider != nil {
embeddingKey = fmt.Sprintf("%p", c.options.embeddingProvider)
}
return fmt.Sprintf("%s:%t:%d:%d:%d:%d:%s:%s:%s", c.uri, c.options.autoSchema, c.options.poolSize,
// Custom gRPC dial options only apply to remote (dgraph://) connections;
// they are ignored for embedded (file://) URIs, so they only contribute to
// the dedup key for remote clients — matching that documented behavior.
dialKey := "0"
if strings.HasPrefix(c.uri, dgraphURIPrefix) {
dialKey = dialOptionsKey(c.options.grpcDialOptions)
}
return fmt.Sprintf("%s:%t:%d:%d:%d:%d:%s:%s:%s:%s", c.uri, c.options.autoSchema, c.options.poolSize,
c.options.maxEdgeTraversal, c.options.cacheSizeMB, c.options.maxRecvMsgSize,
c.options.namespace, validatorKey, embeddingKey)
c.options.namespace, validatorKey, embeddingKey, dialKey)
}

// dialOptionsKey identifies a set of custom gRPC dial options for the client
// dedup cache. grpc.DialOption values are opaque and not comparable, so the key
// uses each option's runtime identity rather than just the count: two clients
// configured with different options get different keys and are never merged.
// Two clients built from separately-constructed but equivalent options also
// differ, which is safe — the cache errs toward keeping them apart rather than
// merging connections that were configured differently.
func dialOptionsKey(opts []grpc.DialOption) string {
if len(opts) == 0 {
return "0"
}
parts := make([]string, len(opts))
for i, opt := range opts {
parts[i] = fmt.Sprintf("%p", opt)
}
return strings.Join(parts, ",")
}

// embeddingProvider implements the embeddingClient interface, exposing the
Expand Down
35 changes: 35 additions & 0 deletions dial_options_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package modusgraph_test

import (
"time"

mg "github.com/matthewmcneely/modusgraph"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)

// ExampleWithGRPCDialOption configures gRPC dial settings the dedicated options
// do not cover — here, transport credentials and keepalive parameters — when
// opening a remote dgraph:// connection. Each WithGRPCDialOption adds one
// grpc.DialOption; they compose with WithMaxRecvMsgSize. The options are ignored
// for embedded (file://) URIs.
func ExampleWithGRPCDialOption() {
client, err := mg.NewClient(
"dgraph://localhost:9080",
mg.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
mg.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 10 * time.Second,
})),
)
if err != nil {
panic(err)
}
defer client.Close()
}
53 changes: 53 additions & 0 deletions dial_options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package modusgraph

import (
"testing"

"google.golang.org/grpc"
)

func TestWithGRPCDialOptionAppends(t *testing.T) {
var o clientOptions
WithGRPCDialOption(grpc.WithUserAgent("a"))(&o)
WithGRPCDialOption(grpc.WithUserAgent("b"))(&o)
if got := len(o.grpcDialOptions); got != 2 {
t.Fatalf("expected 2 dial options, got %d", got)
}
}

func TestKeyDistinguishesGRPCDialOptions(t *testing.T) {
// A client with no dial options must differ from one with a dial option.
base := client{uri: "dgraph://localhost:9080"}
withOpt := client{uri: "dgraph://localhost:9080"}
WithGRPCDialOption(grpc.WithUserAgent("x"))(&withOpt.options)
if base.key() == withOpt.key() {
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
t.Fatal("client.key() must differ when grpcDialOptions differ, else clients dedup incorrectly")
}

// Two clients with the SAME number of dial options but DIFFERENT options
// must also differ. A count-only key would collide here and merge clients
// that were configured differently.
a := client{uri: "dgraph://localhost:9080"}
b := client{uri: "dgraph://localhost:9080"}
WithGRPCDialOption(grpc.WithUserAgent("x"))(&a.options)
WithGRPCDialOption(grpc.WithUserAgent("y"))(&b.options)
if a.key() == b.key() {
t.Fatal("client.key() must differ when dial options differ at the same count")
}
}

func TestKeyIgnoresGRPCDialOptionsForEmbedded(t *testing.T) {
// Dial options are ignored for embedded (file://) URIs, so they must not
// affect the dedup key there.
plain := client{uri: "file:///tmp/db"}
withOpt := client{uri: "file:///tmp/db"}
WithGRPCDialOption(grpc.WithUserAgent("x"))(&withOpt.options)
if plain.key() != withOpt.key() {
t.Fatal("dial options must not affect the cache key for embedded (file://) clients")
}
}
Loading