Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 57 additions & 18 deletions cmd/mtc/mirror/posix/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ import (
"encoding/json"
"errors"
"flag"
"fmt"
"log/slog"
"net/http"
"os"
"path/filepath"
"net/url"

fnote "github.com/transparency-dev/formats/note"
"github.com/transparency-dev/tessera"
"github.com/transparency-dev/tessera/cmd/mtc/mirror/internal/config"
"github.com/transparency-dev/tessera/storage/posix"
"github.com/transparency-dev/tessera/cmd/mtc/mirror/internal/handler"
"github.com/transparency-dev/witness/omniwitness"
"github.com/transparency-dev/witness/witness"
Expand All @@ -41,7 +44,8 @@ const (
var (
listenAddr = flag.String("listen_addr", ":8080", "The address to listen on for HTTP requests.")
storageDir = flag.String("storage_dir", "", "Directory to store mirror data.")
witnessSignerPath = flag.String("witness_signer_path", "", "The path to the note-formatted witness signer secret key.")
witnessCosignerPath = flag.String("witness_cosigner_path", "", "The path to the note-formatted witness cosigner secret key.")
mirrorCosignerPath = flag.String("mirror_cosigner_path", "", "The path to the note-formatted mirror cosigner secret key.")
mirrorConfigPath = flag.String("config_path", "", "The path to the mirror configuration file.")
)

Expand All @@ -57,6 +61,8 @@ func main() {
}
}()

mirrorCosigner := mustCreateCosigner(ctx, *mirrorCosignerPath)

mux := handler.NewMirrorMux()
cfg := mirrorConfigFromFlags(ctx)
for _, l := range cfg.Logs {
Expand All @@ -65,11 +71,21 @@ func main() {
slog.ErrorContext(ctx, "Failed to create verifier", slog.String("vkey", l.VKey), slog.Any("error", err))
os.Exit(1)
}

origin := v.Name()
if err := mux.AddTarget(origin, &fakeTarget{}); err != nil {
slog.ErrorContext(ctx, "Failed to add target", slog.String("origin", origin), slog.Any("error", err))

// Create the mirror
t, err := newMirrorTarget(ctx, w, origin, mirrorCosigner)
if err != nil {
slog.ErrorContext(ctx, "Failed to create mirror target", slog.String("origin", origin), slog.Any("error", err))
os.Exit(1)
}
if err := mux.AddTarget(origin, t); err != nil {
slog.ErrorContext(ctx, "Failed to add target to mux", slog.String("origin", origin), slog.Any("error", err))
os.Exit(1)
}

// Ensure log is known by the witness
if err := wp.AddLogs(ctx, []omniwitness.Log{
{Origin: origin, VKey: l.VKey},
}); err != nil {
Expand All @@ -87,6 +103,29 @@ func main() {
}
}

// newMirrorTarget creates a new POSIX driver and MirrorTarget for the given origin.
//
// The target directory for the driver is derived from the storage directory and the origin in accordance
// with the `tlog-mirror` spec, allowing the root of the storage directory to be exported directly to read-only clients.
func newMirrorTarget(ctx context.Context, w *witness.Witness, origin string, mirrorSigner note.Signer) (*tessera.MirrorTarget, error) {
targetDir := filepath.Join(*storageDir, url.PathEscape(origin))
if err := os.MkdirAll(targetDir, 0o755); err != nil {
return nil, fmt.Errorf("mkdir %q: %v", targetDir, err)
}
d, err := posix.New(ctx, posix.Config{Path: targetDir})
if err != nil {
return nil, fmt.Errorf("posix.New: %v", err)
}
mOpts := tessera.NewMirrorOptions().
WithCheckpointSource(func(ctx context.Context) ([]byte, error) {
return w.GetCheckpoint(ctx, origin)
}).
WithSigner(mirrorSigner)
return tessera.NewMirrorTarget(ctx, d, mOpts)
}

// mirrorConfigFromFlags returns a mirror configuration loaded from the provided flags.
// Exits if the mirror configuration could not be loaded.
func mirrorConfigFromFlags(ctx context.Context) config.Config {
if *mirrorConfigPath == "" {
slog.ErrorContext(ctx, "Mirror config path not specified")
Expand Down Expand Up @@ -131,7 +170,7 @@ func witnessFromFlags(ctx context.Context) (*witness.Witness, *sqlite.Persistenc

w, err := witness.New(ctx, witness.Opts{
Persistence: p,
Signers: witnessSignerFromFlags(ctx),
Signers: witnessCosignerFromFlags(ctx),
VerifierForLog: func(ctx context.Context, origin string) (note.Verifier, bool, error) {
log, ok, err := p.Log(ctx, origin)
if err != nil || !ok {
Expand All @@ -147,30 +186,30 @@ func witnessFromFlags(ctx context.Context) (*witness.Witness, *sqlite.Persistenc
return w, p, shutdown
}

// fakeTarget is a temporary mirror target impl, and will be removed in due course.
type fakeTarget struct {}

func (f fakeTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd uint64, ticket []byte, next func() (*tessera.MirrorPackage, error)) (nextIdx uint64, curSize uint64, newTicket []byte, cosigs []byte, err error) {
slog.InfoContext(ctx, "fake target: AddEntries", slog.Uint64("uploadStart", uploadStart), slog.Uint64("uploadEnd", uploadEnd))
return uploadEnd, 0, nil, nil, nil
}

func witnessSignerFromFlags(ctx context.Context) []note.Signer {
if *witnessSignerPath == "" {
func witnessCosignerFromFlags(ctx context.Context) []note.Signer {
if *witnessCosignerPath == "" {
slog.WarnContext(ctx, "Witness cosigner not configured, add-checkpoint will not return cosigs")
return []note.Signer{}
}
r, err := os.ReadFile(*witnessSignerPath)
return []note.Signer{mustCreateCosigner(ctx, *witnessCosignerPath)}
}

func mustCreateCosigner(ctx context.Context, path string) note.Signer {
if path == "" {
slog.ErrorContext(ctx, "Cosigner key path not specified")
os.Exit(1)
}
r, err := os.ReadFile(path)
if err != nil {
slog.ErrorContext(ctx, "Failed to read witness cosigner file", slog.String("path", *witnessSignerPath), slog.Any("error", err))
slog.ErrorContext(ctx, "Failed to read cosigner key", slog.String("path", path), slog.Any("error", err))
os.Exit(1)
}
s, err := fnote.NewSignerForCosignatureV1(string(r))
if err != nil {
slog.ErrorContext(ctx, "Failed to create cosigner", slog.String("path", *witnessSignerPath), slog.Any("error", err))
slog.ErrorContext(ctx, "Failed to create cosigner", slog.String("path", path), slog.Any("error", err))
os.Exit(1)
}
return []note.Signer{s}
return s
}


89 changes: 85 additions & 4 deletions storage/posix/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io/fs"
"iter"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -313,6 +314,7 @@ func (a *appender) sequenceBatch(ctx context.Context, entries []*tessera.Entry)
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
a.s.mu.Lock()
defer a.s.mu.Unlock()
unlock, err := a.s.lockFile(ctx, treeStateLock)
if err != nil {
panic(err)
Expand All @@ -321,7 +323,6 @@ func (a *appender) sequenceBatch(ctx context.Context, entries []*tessera.Entry)
if err := unlock(); err != nil {
panic(err)
}
a.s.mu.Unlock()
}()

size, _, err := a.s.readTreeState(ctx)
Expand Down Expand Up @@ -538,6 +539,7 @@ func (a *appender) initialise(ctx context.Context) error {
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
a.s.mu.Lock()
defer a.s.mu.Unlock()
unlock, err := a.s.lockFile(ctx, treeStateLock)
if err != nil {
panic(err)
Expand All @@ -546,7 +548,6 @@ func (a *appender) initialise(ctx context.Context) error {
if err := unlock(); err != nil {
panic(err)
}
a.s.mu.Unlock()
}()

if err := a.s.ensureVersion(compatibilityVersion); err != nil {
Expand Down Expand Up @@ -996,6 +997,7 @@ func (m *MigrationStorage) initialise(ctx context.Context) error {
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
m.s.mu.Lock()
defer m.s.mu.Unlock()
unlock, err := m.s.lockFile(ctx, treeStateLock)
if err != nil {
panic(err)
Expand All @@ -1004,7 +1006,6 @@ func (m *MigrationStorage) initialise(ctx context.Context) error {
if err := unlock(); err != nil {
panic(err)
}
m.s.mu.Unlock()
}()

if err := m.s.ensureVersion(compatibilityVersion); err != nil {
Expand Down Expand Up @@ -1041,6 +1042,7 @@ func (m *MigrationStorage) buildTree(ctx context.Context, targetSize uint64) err
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
m.s.mu.Lock()
defer m.s.mu.Unlock()
unlock, err := m.s.lockFile(ctx, treeStateLock)
if err != nil {
panic(err)
Expand All @@ -1049,7 +1051,6 @@ func (m *MigrationStorage) buildTree(ctx context.Context, targetSize uint64) err
if err := unlock(); err != nil {
panic(err)
}
m.s.mu.Unlock()
}()

size, _, err := m.s.readTreeState(ctx)
Expand Down Expand Up @@ -1110,3 +1111,83 @@ func (m *MigrationStorage) fetchLeafHashes(ctx context.Context, from, to, source
}
return lh, nil
}


// MirrorWriter creates a new POSIX storage for the MirrorTarget lifecycle mode.
func (s *Storage) MirrorWriter(ctx context.Context, opts *tessera.MirrorOptions) (tessera.MirrorWriter, tessera.LogReader, error) {
r := &MirrorWriter{
s: s,
logStorage: &logResourceStorage{
entriesPath: opts.EntriesPath(),
s: s,
},
bundleHasher: opts.LeafHasher(),
}
if err := r.initialise(ctx); err != nil {
return nil, nil, err
}
return r, r.logStorage, nil
}

// MirrorWriter implements the tessera.MirrorWriter lifecycle contract.
type MirrorWriter struct {
s *Storage
logStorage *logResourceStorage
bundleHasher func(entryBundle []byte) ([][]byte, error)
curSize uint64
}

var _ tessera.MirrorWriter = &MirrorWriter{}

func (m *MirrorWriter) initialise(ctx context.Context) error {
// Idempotent: If folder exists, nothing happens.
if err := mkdirAll(filepath.Join(m.s.cfg.Path, stateDir), dirPerm); err != nil {
return fmt.Errorf("failed to create log directory: %q", err)
}
// Double locking:
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
m.s.mu.Lock()
defer m.s.mu.Unlock()
unlock, err := m.s.lockFile(ctx, treeStateLock)
if err != nil {
// Oh dear, panic Mr Mainwaring!
panic(fmt.Errorf("failed to lock tree state: %v", err))
}
defer func() {
if err := unlock(); err != nil {
// Oh dear, panic Mr Mainwaring!
panic(fmt.Errorf("failed to unlock tree state: %v", err))
}
}()

if err := m.s.ensureVersion(compatibilityVersion); err != nil {
return err
}
curSize, _, err := m.s.readTreeState(ctx)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("failed to load checkpoint for mirror: %v", err)
}
// Create the directory structure and write out an empty checkpoint
slog.InfoContext(ctx, "Initializing directory for POSIX mirror (this should only happen ONCE per mirror!)", slog.String("path", m.s.cfg.Path))
if err := m.s.writeTreeState(ctx, 0, rfc6962.DefaultHasher.EmptyRoot()); err != nil {
return fmt.Errorf("failed to write tree-state checkpoint: %v", err)
}
return nil
}
m.curSize = curSize

return nil
}

// IntegrateBundles integrates a sequence of entry bundles into the tree, starting at the provided bundle index bundleIdx.
// Returns the new size and root hash of the tree if successful.
func (m *MirrorWriter) IntegrateBundles(ctx context.Context, bundleIdx uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error) {
return 0, nil, errors.New("unimplemented")
}

func (m *MirrorWriter) IntegratedSize(ctx context.Context) (uint64, error) {
sz, _, err := m.s.readTreeState(ctx)
return sz, err
}
Loading