diff --git a/cmd/mtc/mirror/posix/main.go b/cmd/mtc/mirror/posix/main.go index d21ac8694..8ddbef532 100644 --- a/cmd/mtc/mirror/posix/main.go +++ b/cmd/mtc/mirror/posix/main.go @@ -19,14 +19,17 @@ import ( "encoding/json" "errors" "flag" + "fmt" "log/slog" "net/http" "os" "path/filepath" + "net/url" fnote "github.com/transparency-dev/formats/note" "github.com/transparency-dev/tessera" "github.com/transparency-dev/tessera/cmd/mtc/mirror/internal/config" + "github.com/transparency-dev/tessera/storage/posix" "github.com/transparency-dev/tessera/cmd/mtc/mirror/internal/handler" "github.com/transparency-dev/witness/omniwitness" "github.com/transparency-dev/witness/witness" @@ -41,7 +44,8 @@ const ( var ( listenAddr = flag.String("listen_addr", ":8080", "The address to listen on for HTTP requests.") storageDir = flag.String("storage_dir", "", "Directory to store mirror data.") - witnessSignerPath = flag.String("witness_signer_path", "", "The path to the note-formatted witness signer secret key.") + witnessCosignerPath = flag.String("witness_cosigner_path", "", "The path to the note-formatted witness cosigner secret key.") + mirrorCosignerPath = flag.String("mirror_cosigner_path", "", "The path to the note-formatted mirror cosigner secret key.") mirrorConfigPath = flag.String("config_path", "", "The path to the mirror configuration file.") ) @@ -57,6 +61,8 @@ func main() { } }() + mirrorCosigner := mustCreateCosigner(ctx, *mirrorCosignerPath) + mux := handler.NewMirrorMux() cfg := mirrorConfigFromFlags(ctx) for _, l := range cfg.Logs { @@ -65,11 +71,21 @@ func main() { slog.ErrorContext(ctx, "Failed to create verifier", slog.String("vkey", l.VKey), slog.Any("error", err)) os.Exit(1) } + origin := v.Name() - if err := mux.AddTarget(origin, &fakeTarget{}); err != nil { - slog.ErrorContext(ctx, "Failed to add target", slog.String("origin", origin), slog.Any("error", err)) + + // Create the mirror + t, err := newMirrorTarget(ctx, w, origin, mirrorCosigner) + if err != nil { + slog.ErrorContext(ctx, "Failed to create mirror target", slog.String("origin", origin), slog.Any("error", err)) + os.Exit(1) + } + if err := mux.AddTarget(origin, t); err != nil { + slog.ErrorContext(ctx, "Failed to add target to mux", slog.String("origin", origin), slog.Any("error", err)) os.Exit(1) } + + // Ensure log is known by the witness if err := wp.AddLogs(ctx, []omniwitness.Log{ {Origin: origin, VKey: l.VKey}, }); err != nil { @@ -87,6 +103,29 @@ func main() { } } +// newMirrorTarget creates a new POSIX driver and MirrorTarget for the given origin. +// +// The target directory for the driver is derived from the storage directory and the origin in accordance +// with the `tlog-mirror` spec, allowing the root of the storage directory to be exported directly to read-only clients. +func newMirrorTarget(ctx context.Context, w *witness.Witness, origin string, mirrorSigner note.Signer) (*tessera.MirrorTarget, error) { + targetDir := filepath.Join(*storageDir, url.PathEscape(origin)) + if err := os.MkdirAll(targetDir, 0o755); err != nil { + return nil, fmt.Errorf("mkdir %q: %v", targetDir, err) + } + d, err := posix.New(ctx, posix.Config{Path: targetDir}) + if err != nil { + return nil, fmt.Errorf("posix.New: %v", err) + } + mOpts := tessera.NewMirrorOptions(). + WithCheckpointSource(func(ctx context.Context) ([]byte, error) { + return w.GetCheckpoint(ctx, origin) + }). + WithSigner(mirrorSigner) + return tessera.NewMirrorTarget(ctx, d, mOpts) +} + +// mirrorConfigFromFlags returns a mirror configuration loaded from the provided flags. +// Exits if the mirror configuration could not be loaded. func mirrorConfigFromFlags(ctx context.Context) config.Config { if *mirrorConfigPath == "" { slog.ErrorContext(ctx, "Mirror config path not specified") @@ -131,7 +170,7 @@ func witnessFromFlags(ctx context.Context) (*witness.Witness, *sqlite.Persistenc w, err := witness.New(ctx, witness.Opts{ Persistence: p, - Signers: witnessSignerFromFlags(ctx), + Signers: witnessCosignerFromFlags(ctx), VerifierForLog: func(ctx context.Context, origin string) (note.Verifier, bool, error) { log, ok, err := p.Log(ctx, origin) if err != nil || !ok { @@ -147,30 +186,30 @@ func witnessFromFlags(ctx context.Context) (*witness.Witness, *sqlite.Persistenc return w, p, shutdown } -// fakeTarget is a temporary mirror target impl, and will be removed in due course. -type fakeTarget struct {} - -func (f fakeTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd uint64, ticket []byte, next func() (*tessera.MirrorPackage, error)) (nextIdx uint64, curSize uint64, newTicket []byte, cosigs []byte, err error) { - slog.InfoContext(ctx, "fake target: AddEntries", slog.Uint64("uploadStart", uploadStart), slog.Uint64("uploadEnd", uploadEnd)) - return uploadEnd, 0, nil, nil, nil -} - -func witnessSignerFromFlags(ctx context.Context) []note.Signer { - if *witnessSignerPath == "" { +func witnessCosignerFromFlags(ctx context.Context) []note.Signer { + if *witnessCosignerPath == "" { slog.WarnContext(ctx, "Witness cosigner not configured, add-checkpoint will not return cosigs") return []note.Signer{} } - r, err := os.ReadFile(*witnessSignerPath) + return []note.Signer{mustCreateCosigner(ctx, *witnessCosignerPath)} +} + +func mustCreateCosigner(ctx context.Context, path string) note.Signer { + if path == "" { + slog.ErrorContext(ctx, "Cosigner key path not specified") + os.Exit(1) + } + r, err := os.ReadFile(path) if err != nil { - slog.ErrorContext(ctx, "Failed to read witness cosigner file", slog.String("path", *witnessSignerPath), slog.Any("error", err)) + slog.ErrorContext(ctx, "Failed to read cosigner key", slog.String("path", path), slog.Any("error", err)) os.Exit(1) } s, err := fnote.NewSignerForCosignatureV1(string(r)) if err != nil { - slog.ErrorContext(ctx, "Failed to create cosigner", slog.String("path", *witnessSignerPath), slog.Any("error", err)) + slog.ErrorContext(ctx, "Failed to create cosigner", slog.String("path", path), slog.Any("error", err)) os.Exit(1) } - return []note.Signer{s} + return s } diff --git a/storage/posix/files.go b/storage/posix/files.go index d76ff96cf..bf4c42bf8 100644 --- a/storage/posix/files.go +++ b/storage/posix/files.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io/fs" + "iter" "net/http" "os" "path/filepath" @@ -313,6 +314,7 @@ func (a *appender) sequenceBatch(ctx context.Context, entries []*tessera.Entry) // - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised. // - The POSIX `lockFile()` ensures that distinct tasks are serialised. a.s.mu.Lock() + defer a.s.mu.Unlock() unlock, err := a.s.lockFile(ctx, treeStateLock) if err != nil { panic(err) @@ -321,7 +323,6 @@ func (a *appender) sequenceBatch(ctx context.Context, entries []*tessera.Entry) if err := unlock(); err != nil { panic(err) } - a.s.mu.Unlock() }() size, _, err := a.s.readTreeState(ctx) @@ -538,6 +539,7 @@ func (a *appender) initialise(ctx context.Context) error { // - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised. // - The POSIX `lockFile()` ensures that distinct tasks are serialised. a.s.mu.Lock() + defer a.s.mu.Unlock() unlock, err := a.s.lockFile(ctx, treeStateLock) if err != nil { panic(err) @@ -546,7 +548,6 @@ func (a *appender) initialise(ctx context.Context) error { if err := unlock(); err != nil { panic(err) } - a.s.mu.Unlock() }() if err := a.s.ensureVersion(compatibilityVersion); err != nil { @@ -996,6 +997,7 @@ func (m *MigrationStorage) initialise(ctx context.Context) error { // - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised. // - The POSIX `lockFile()` ensures that distinct tasks are serialised. m.s.mu.Lock() + defer m.s.mu.Unlock() unlock, err := m.s.lockFile(ctx, treeStateLock) if err != nil { panic(err) @@ -1004,7 +1006,6 @@ func (m *MigrationStorage) initialise(ctx context.Context) error { if err := unlock(); err != nil { panic(err) } - m.s.mu.Unlock() }() if err := m.s.ensureVersion(compatibilityVersion); err != nil { @@ -1041,6 +1042,7 @@ func (m *MigrationStorage) buildTree(ctx context.Context, targetSize uint64) err // - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised. // - The POSIX `lockFile()` ensures that distinct tasks are serialised. m.s.mu.Lock() + defer m.s.mu.Unlock() unlock, err := m.s.lockFile(ctx, treeStateLock) if err != nil { panic(err) @@ -1049,7 +1051,6 @@ func (m *MigrationStorage) buildTree(ctx context.Context, targetSize uint64) err if err := unlock(); err != nil { panic(err) } - m.s.mu.Unlock() }() size, _, err := m.s.readTreeState(ctx) @@ -1110,3 +1111,83 @@ func (m *MigrationStorage) fetchLeafHashes(ctx context.Context, from, to, source } return lh, nil } + + +// MirrorWriter creates a new POSIX storage for the MirrorTarget lifecycle mode. +func (s *Storage) MirrorWriter(ctx context.Context, opts *tessera.MirrorOptions) (tessera.MirrorWriter, tessera.LogReader, error) { + r := &MirrorWriter{ + s: s, + logStorage: &logResourceStorage{ + entriesPath: opts.EntriesPath(), + s: s, + }, + bundleHasher: opts.LeafHasher(), + } + if err := r.initialise(ctx); err != nil { + return nil, nil, err + } + return r, r.logStorage, nil +} + +// MirrorWriter implements the tessera.MirrorWriter lifecycle contract. +type MirrorWriter struct { + s *Storage + logStorage *logResourceStorage + bundleHasher func(entryBundle []byte) ([][]byte, error) + curSize uint64 +} + +var _ tessera.MirrorWriter = &MirrorWriter{} + +func (m *MirrorWriter) initialise(ctx context.Context) error { + // Idempotent: If folder exists, nothing happens. + if err := mkdirAll(filepath.Join(m.s.cfg.Path, stateDir), dirPerm); err != nil { + return fmt.Errorf("failed to create log directory: %q", err) + } + // Double locking: + // - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised. + // - The POSIX `lockFile()` ensures that distinct tasks are serialised. + m.s.mu.Lock() + defer m.s.mu.Unlock() + unlock, err := m.s.lockFile(ctx, treeStateLock) + if err != nil { + // Oh dear, panic Mr Mainwaring! + panic(fmt.Errorf("failed to lock tree state: %v", err)) + } + defer func() { + if err := unlock(); err != nil { + // Oh dear, panic Mr Mainwaring! + panic(fmt.Errorf("failed to unlock tree state: %v", err)) + } + }() + + if err := m.s.ensureVersion(compatibilityVersion); err != nil { + return err + } + curSize, _, err := m.s.readTreeState(ctx) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("failed to load checkpoint for mirror: %v", err) + } + // Create the directory structure and write out an empty checkpoint + slog.InfoContext(ctx, "Initializing directory for POSIX mirror (this should only happen ONCE per mirror!)", slog.String("path", m.s.cfg.Path)) + if err := m.s.writeTreeState(ctx, 0, rfc6962.DefaultHasher.EmptyRoot()); err != nil { + return fmt.Errorf("failed to write tree-state checkpoint: %v", err) + } + return nil + } + m.curSize = curSize + + return nil +} + +// IntegrateBundles integrates a sequence of entry bundles into the tree, starting at the provided bundle index bundleIdx. +// Returns the new size and root hash of the tree if successful. +func (m *MirrorWriter) IntegrateBundles(ctx context.Context, bundleIdx uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error) { + return 0, nil, errors.New("unimplemented") +} + +func (m *MirrorWriter) IntegratedSize(ctx context.Context) (uint64, error) { + sz, _, err := m.s.readTreeState(ctx) + return sz, err +}