diff --git a/config/config.go b/config/config.go index e14802a9..de32deae 100644 --- a/config/config.go +++ b/config/config.go @@ -34,6 +34,7 @@ type Config struct { ArchiverNodes []string Rewards []rewards.Reward AudiusdURL string + CoreBlockStreamEnabled bool OpenAudioURLs []string ChainId string BirdeyeToken string @@ -95,6 +96,7 @@ var Cfg = Config{ AxiomDataset: os.Getenv("axiomDataset"), NetworkTakeRate: 10, AudiusdURL: os.Getenv("audiusdUrl"), + CoreBlockStreamEnabled: os.Getenv("coreBlockStreamEnabled") == "true", OpenAudioURLs: []string{}, BirdeyeToken: os.Getenv("birdeyeToken"), EthRpcUrl: os.Getenv("ethRpcUrl"), diff --git a/go.mod b/go.mod index a3b8098f..53aba344 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( connectrpc.com/connect v1.18.1 github.com/AlecAivazis/survey/v2 v2.3.7 github.com/Doist/unfurlist v0.0.0-20250409100812-515f2735f8e5 - github.com/OpenAudio/go-openaudio v1.3.1-0.20260609193221-97ccec32c1c6 - github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609193221-97ccec32c1c6 + github.com/OpenAudio/go-openaudio v1.3.1-0.20260609211151-aee578ec923b + github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609211151-aee578ec923b github.com/aquasecurity/esquery v0.2.0 github.com/axiomhq/axiom-go v0.23.0 github.com/axiomhq/hyperloglog v0.2.5 diff --git a/go.sum b/go.sum index 13a06135..75b15854 100644 --- a/go.sum +++ b/go.sum @@ -20,14 +20,10 @@ github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63n github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= -github.com/OpenAudio/go-openaudio v1.3.1-0.20260609040102-bda0b8b592f5 h1:ncJjkgXNaL7r8LyNGigHONOgFVq+jBpkMKtgyZrju1o= -github.com/OpenAudio/go-openaudio v1.3.1-0.20260609040102-bda0b8b592f5/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs= -github.com/OpenAudio/go-openaudio v1.3.1-0.20260609193221-97ccec32c1c6 h1:lRaOXeltp59DNPe9+btz7HeqxOl5Edf0AK/ePT9khak= -github.com/OpenAudio/go-openaudio v1.3.1-0.20260609193221-97ccec32c1c6/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs= -github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609040102-bda0b8b592f5 h1:l4KjBR+wR5DpHPV03N0DKFLcPsMhP3Hq82JquBL5Ajo= -github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609040102-bda0b8b592f5/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= -github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609193221-97ccec32c1c6 h1:vuZ8SeGb2tQemF5478KsPl1gPqNif8airkZSaehud2I= -github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609193221-97ccec32c1c6/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= +github.com/OpenAudio/go-openaudio v1.3.1-0.20260609211151-aee578ec923b h1:vlK/jCRaB8E/0pKgVVanP7hSrESH4JSYl/+dZfwYQoA= +github.com/OpenAudio/go-openaudio v1.3.1-0.20260609211151-aee578ec923b/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs= +github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609211151-aee578ec923b h1:z77PTyEzh/CT6n31U5fQ1Pn7AzttGaduzqvBk8xgAqM= +github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609211151-aee578ec923b/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= diff --git a/indexer/indexer.go b/indexer/indexer.go index 0216c471..4e458296 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -3,12 +3,16 @@ package indexer import ( "context" "fmt" + "net/http" + "strings" "time" "api.audius.co/config" dbv1 "api.audius.co/database" "api.audius.co/jobs" "api.audius.co/logging" + "connectrpc.com/connect" + corev1connect "github.com/OpenAudio/go-openaudio/pkg/api/core/v1/v1connect" etl "github.com/OpenAudio/go-openaudio/pkg/etl" em "github.com/OpenAudio/go-openaudio/pkg/etl/processors/entity_manager" "github.com/OpenAudio/go-openaudio/pkg/sdk" @@ -68,12 +72,22 @@ func NewIndexer(cfg config.Config) *CoreIndexer { etlCfg.DisableMaterializedViewRefresh() etlCfg.DisablePgNotifyListener() etlCfg.ReadDataTypesEnv() // honors OPENAUDIO_ETL_ENTITY_MANAGER_DATA_TYPES if set + etlCfg.BlockStreamEnabled = cfg.CoreBlockStreamEnabled etlIndexer := etl.New(openAudioSDK.Core, logger) etlIndexer.SetConfig(etlCfg) etlIndexer.SetDBURL(cfg.WriteDbUrl) etlIndexer.SetCheckReadiness(true) + // When enabled, source blocks from the CoreService.StreamBlocks gRPC stream + // instead of polling GetBlocks. The ETL falls back to polling automatically + // if the endpoint doesn't support it, so this is safe to flip on per-env. + // Kept separate from the SDK's unary Core client (used for status + fallback). + if cfg.CoreBlockStreamEnabled { + etlIndexer.SetBlockStreamClient(newCoreStreamClient(cfg.AudiusdURL)) + logger.Info("etl block source: gRPC stream", zap.String("endpoint", cfg.AudiusdURL)) + } + // Restore the pre-vendor setPubkeyForUser behavior via the upstream // post-create hook (go-openaudio #317). Recovers the EIP-712 pubkey // from each User Create tx and writes it to user_pubkeys in the same @@ -108,6 +122,18 @@ func NewIndexer(cfg config.Config) *CoreIndexer { } } +// newCoreStreamClient builds a gRPC (HTTP/2) CoreService client for the block +// stream. connect.WithGRPC requires HTTP/2, which the default transport +// negotiates over TLS. This is separate from the SDK's unary Core client, which +// the ETL still uses for status checks and the polling fallback. +func newCoreStreamClient(audiusdURL string) corev1connect.CoreServiceClient { + baseURL := audiusdURL + if !strings.HasPrefix(baseURL, "http://") && !strings.HasPrefix(baseURL, "https://") { + baseURL = "https://" + baseURL + } + return corev1connect.NewCoreServiceClient(http.DefaultClient, baseURL, connect.WithGRPC()) +} + // Start runs the ETL indexer alongside the aggregates calculator. Both are // long-lived; errgroup propagates the first error (and the ctx cancellation // it triggers) to all members.