Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
ArchiverNodes []string
Rewards []rewards.Reward
AudiusdURL string
CoreBlockStreamEnabled bool
OpenAudioURLs []string
ChainId string
BirdeyeToken string
Expand Down Expand Up @@ -95,6 +96,7 @@ var Cfg = Config{
AxiomDataset: os.Getenv("axiomDataset"),
NetworkTakeRate: 10,
AudiusdURL: os.Getenv("audiusdUrl"),
CoreBlockStreamEnabled: os.Getenv("coreBlockStreamEnabled") == "true",
OpenAudioURLs: []string{},
BirdeyeToken: os.Getenv("birdeyeToken"),
EthRpcUrl: os.Getenv("ethRpcUrl"),
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
connectrpc.com/connect v1.18.1
github.com/AlecAivazis/survey/v2 v2.3.7
github.com/Doist/unfurlist v0.0.0-20250409100812-515f2735f8e5
github.com/OpenAudio/go-openaudio v1.3.1-0.20260609193221-97ccec32c1c6
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609193221-97ccec32c1c6
github.com/OpenAudio/go-openaudio v1.3.1-0.20260609211151-aee578ec923b
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609211151-aee578ec923b
github.com/aquasecurity/esquery v0.2.0
github.com/axiomhq/axiom-go v0.23.0
github.com/axiomhq/hyperloglog v0.2.5
Expand Down
12 changes: 4 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@ github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63n
github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260609040102-bda0b8b592f5 h1:ncJjkgXNaL7r8LyNGigHONOgFVq+jBpkMKtgyZrju1o=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260609040102-bda0b8b592f5/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260609193221-97ccec32c1c6 h1:lRaOXeltp59DNPe9+btz7HeqxOl5Edf0AK/ePT9khak=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260609193221-97ccec32c1c6/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609040102-bda0b8b592f5 h1:l4KjBR+wR5DpHPV03N0DKFLcPsMhP3Hq82JquBL5Ajo=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609040102-bda0b8b592f5/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609193221-97ccec32c1c6 h1:vuZ8SeGb2tQemF5478KsPl1gPqNif8airkZSaehud2I=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609193221-97ccec32c1c6/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260609211151-aee578ec923b h1:vlK/jCRaB8E/0pKgVVanP7hSrESH4JSYl/+dZfwYQoA=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260609211151-aee578ec923b/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609211151-aee578ec923b h1:z77PTyEzh/CT6n31U5fQ1Pn7AzttGaduzqvBk8xgAqM=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609211151-aee578ec923b/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8=
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=
Expand Down
26 changes: 26 additions & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package indexer
import (
"context"
"fmt"
"net/http"
"strings"
"time"

"api.audius.co/config"
dbv1 "api.audius.co/database"
"api.audius.co/jobs"
"api.audius.co/logging"
"connectrpc.com/connect"
corev1connect "github.com/OpenAudio/go-openaudio/pkg/api/core/v1/v1connect"
etl "github.com/OpenAudio/go-openaudio/pkg/etl"
em "github.com/OpenAudio/go-openaudio/pkg/etl/processors/entity_manager"
"github.com/OpenAudio/go-openaudio/pkg/sdk"
Expand Down Expand Up @@ -68,12 +72,22 @@ func NewIndexer(cfg config.Config) *CoreIndexer {
etlCfg.DisableMaterializedViewRefresh()
etlCfg.DisablePgNotifyListener()
etlCfg.ReadDataTypesEnv() // honors OPENAUDIO_ETL_ENTITY_MANAGER_DATA_TYPES if set
etlCfg.BlockStreamEnabled = cfg.CoreBlockStreamEnabled

etlIndexer := etl.New(openAudioSDK.Core, logger)
etlIndexer.SetConfig(etlCfg)
etlIndexer.SetDBURL(cfg.WriteDbUrl)
etlIndexer.SetCheckReadiness(true)

// When enabled, source blocks from the CoreService.StreamBlocks gRPC stream
// instead of polling GetBlocks. The ETL falls back to polling automatically
// if the endpoint doesn't support it, so this is safe to flip on per-env.
// Kept separate from the SDK's unary Core client (used for status + fallback).
if cfg.CoreBlockStreamEnabled {
etlIndexer.SetBlockStreamClient(newCoreStreamClient(cfg.AudiusdURL))
logger.Info("etl block source: gRPC stream", zap.String("endpoint", cfg.AudiusdURL))
}

// Restore the pre-vendor setPubkeyForUser behavior via the upstream
// post-create hook (go-openaudio #317). Recovers the EIP-712 pubkey
// from each User Create tx and writes it to user_pubkeys in the same
Expand Down Expand Up @@ -108,6 +122,18 @@ func NewIndexer(cfg config.Config) *CoreIndexer {
}
}

// newCoreStreamClient builds a gRPC (HTTP/2) CoreService client for the block
// stream. connect.WithGRPC requires HTTP/2, which the default transport
// negotiates over TLS. This is separate from the SDK's unary Core client, which
// the ETL still uses for status checks and the polling fallback.
func newCoreStreamClient(audiusdURL string) corev1connect.CoreServiceClient {
baseURL := audiusdURL
if !strings.HasPrefix(baseURL, "http://") && !strings.HasPrefix(baseURL, "https://") {
baseURL = "https://" + baseURL
}
return corev1connect.NewCoreServiceClient(http.DefaultClient, baseURL, connect.WithGRPC())
}

// Start runs the ETL indexer alongside the aggregates calculator. Both are
// long-lived; errgroup propagates the first error (and the ctx cancellation
// it triggers) to all members.
Expand Down
Loading