Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/OCAP2/web
go 1.26.3

require (
github.com/OCAP2/extension/v5 v5.0.0-alpha.1.0.20260216221044-4932fc4f0a04
github.com/getkin/kin-openapi v0.138.0
github.com/go-fuego/fuego v0.19.0
github.com/golang-jwt/jwt/v5 v5.3.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/OCAP2/extension/v5 v5.0.0-alpha.1.0.20260216221044-4932fc4f0a04 h1:D+cP4mCigM7dCgPwcS+YcJPbE/tH2BqB6+85N9WbcIs=
github.com/OCAP2/extension/v5 v5.0.0-alpha.1.0.20260216221044-4932fc4f0a04/go.mod h1:vhxiM92vYBjJ/J6zrwbYDiaPrUNwMRyWZ05duCGjHXg=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI=
Expand Down
173 changes: 173 additions & 0 deletions internal/ingestion/chunk_flusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package ingestion

import (
"fmt"
"os"
"path/filepath"
"sort"

"google.golang.org/protobuf/proto"

pbv2 "github.com/OCAP2/web/pkg/schemas/protobuf/v2"
)

// ChunkFlusher writes v2 protobuf chunks incrementally during streaming.
// Every chunkSize frames, accumulated states are flushed to disk.
// All methods are called from a single goroutine (the WebSocket read loop).
type ChunkFlusher struct {
chunksDir string
chunkSize uint32
currentChunkIdx uint32
chunkStartFrame uint32

// Buffered states keyed by frame number.
soldierStates map[uint32][]*pbv2.SoldierState
vehicleStates map[uint32][]*pbv2.VehicleState

flushedChunks uint32
}

// NewChunkFlusher creates a flusher that writes chunks of chunkSize frames
// to the given output directory.
func NewChunkFlusher(outputDir string, chunkSize uint32) (*ChunkFlusher, error) {
chunksDir := filepath.Join(outputDir, "chunks")
if err := os.MkdirAll(chunksDir, 0755); err != nil {
return nil, fmt.Errorf("create chunks dir: %w", err)
}
return &ChunkFlusher{
chunksDir: chunksDir,
chunkSize: chunkSize,
soldierStates: make(map[uint32][]*pbv2.SoldierState),
vehicleStates: make(map[uint32][]*pbv2.VehicleState),
}, nil
}

// AddSoldierState buffers a soldier state and flushes if a chunk boundary is crossed.
func (cf *ChunkFlusher) AddSoldierState(frameNum uint32, state *pbv2.SoldierState) error {
cf.soldierStates[frameNum] = append(cf.soldierStates[frameNum], state)
return cf.maybeFlush(frameNum)
}

// AddVehicleState buffers a vehicle state and flushes if a chunk boundary is crossed.
func (cf *ChunkFlusher) AddVehicleState(frameNum uint32, state *pbv2.VehicleState) error {
cf.vehicleStates[frameNum] = append(cf.vehicleStates[frameNum], state)
return cf.maybeFlush(frameNum)
}

// Flush writes any remaining buffered frames as the final chunk.
func (cf *ChunkFlusher) Flush() error {
if len(cf.soldierStates) == 0 && len(cf.vehicleStates) == 0 {
return nil
}
return cf.writeCurrentChunk()
}

// ChunkCount returns the total number of chunks written (including pending).
func (cf *ChunkFlusher) ChunkCount() uint32 {
return cf.flushedChunks
}

func (cf *ChunkFlusher) maybeFlush(frameNum uint32) error {
// Check if this frame crosses the next chunk boundary.
chunkEnd := cf.chunkStartFrame + cf.chunkSize
if frameNum >= chunkEnd {
return cf.writeCurrentChunk()
}
return nil
}

func (cf *ChunkFlusher) writeCurrentChunk() error {
// Collect all frame numbers in this chunk.
frameSet := make(map[uint32]bool)
for f := range cf.soldierStates {
frameSet[f] = true
}
for f := range cf.vehicleStates {
frameSet[f] = true
}

if len(frameSet) == 0 {
return nil
}

// Sort frame numbers.
frames := make([]uint32, 0, len(frameSet))
for f := range frameSet {
frames = append(frames, f)
}
sort.Slice(frames, func(i, j int) bool { return frames[i] < frames[j] })

// Determine which frames belong to the current chunk vs next.
chunkEnd := cf.chunkStartFrame + cf.chunkSize
var currentFrames, nextFrames []uint32
for _, f := range frames {
if f < chunkEnd {
currentFrames = append(currentFrames, f)
} else {
nextFrames = append(nextFrames, f)
}
}

// Build and write the current chunk from currentFrames.
if len(currentFrames) > 0 {
chunk := cf.buildChunk(cf.currentChunkIdx, cf.chunkStartFrame, currentFrames)
if err := cf.writeChunkFile(chunk); err != nil {
return err
}

// Remove flushed frames from buffers.
for _, f := range currentFrames {
delete(cf.soldierStates, f)
delete(cf.vehicleStates, f)
}

cf.flushedChunks++
cf.currentChunkIdx++
cf.chunkStartFrame = chunkEnd
}

// If we have frames that spill into the next chunk, check again.
// (Typically only one chunk boundary is crossed per state message.)
if len(nextFrames) > 0 {
// The next frames are already in the buffer, check if they cross another boundary.
maxNext := nextFrames[len(nextFrames)-1]
if maxNext >= cf.chunkStartFrame+cf.chunkSize {
return cf.writeCurrentChunk()
}
}

return nil
}

func (cf *ChunkFlusher) buildChunk(idx, startFrame uint32, frameNums []uint32) *pbv2.Chunk {
chunk := &pbv2.Chunk{
Index: idx,
StartFrame: startFrame,
FrameCount: cf.chunkSize,
}

for _, fn := range frameNums {
frame := &pbv2.Frame{
FrameNum: fn,
Soldiers: cf.soldierStates[fn],
Vehicles: cf.vehicleStates[fn],
}
chunk.Frames = append(chunk.Frames, frame)
}

return chunk
}

func (cf *ChunkFlusher) writeChunkFile(chunk *pbv2.Chunk) error {
data, err := proto.Marshal(chunk)
if err != nil {
return fmt.Errorf("marshal chunk %d: %w", chunk.Index, err)
}

path := filepath.Join(cf.chunksDir, fmt.Sprintf("%04d.pb", chunk.Index))
if err := os.WriteFile(path, data, 0644); err != nil {
return fmt.Errorf("write chunk %d: %w", chunk.Index, err)
}

return nil
}
120 changes: 120 additions & 0 deletions internal/ingestion/chunk_flusher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package ingestion

import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

pbv2 "github.com/OCAP2/web/pkg/schemas/protobuf/v2"
)

func TestChunkFlusher_FlushesAtBoundary(t *testing.T) {
dir := t.TempDir()
cf, err := NewChunkFlusher(dir, 3) // chunk size = 3 frames
require.NoError(t, err)

// Add states for frames 0, 1, 2 (fills chunk 0).
for frame := uint32(0); frame < 3; frame++ {
err := cf.AddSoldierState(frame, &pbv2.SoldierState{
Id: 1,
Bearing: frame * 10,
Position: &pbv2.Position3D{X: float32(frame), Y: 0, Z: 0},
})
require.NoError(t, err)
}

// Frame 3 crosses boundary → chunk 0 should be flushed.
err = cf.AddSoldierState(3, &pbv2.SoldierState{Id: 1, Bearing: 30})
require.NoError(t, err)
assert.Equal(t, uint32(1), cf.ChunkCount())

// Verify chunk 0 file exists and is valid.
chunkPath := filepath.Join(dir, "chunks", "0000.pb")
data, err := os.ReadFile(chunkPath)
require.NoError(t, err)

var chunk pbv2.Chunk
require.NoError(t, proto.Unmarshal(data, &chunk))
assert.Equal(t, uint32(0), chunk.Index)
assert.Equal(t, uint32(0), chunk.StartFrame)
assert.Equal(t, uint32(3), chunk.FrameCount)
assert.Len(t, chunk.Frames, 3)

// Flush remaining (frame 3).
require.NoError(t, cf.Flush())
assert.Equal(t, uint32(2), cf.ChunkCount())

chunkPath1 := filepath.Join(dir, "chunks", "0001.pb")
data1, err := os.ReadFile(chunkPath1)
require.NoError(t, err)

var chunk1 pbv2.Chunk
require.NoError(t, proto.Unmarshal(data1, &chunk1))
assert.Equal(t, uint32(1), chunk1.Index)
assert.Len(t, chunk1.Frames, 1)
}

func TestChunkFlusher_VehicleStates(t *testing.T) {
dir := t.TempDir()
cf, err := NewChunkFlusher(dir, 5)
require.NoError(t, err)

for frame := uint32(0); frame < 4; frame++ {
err := cf.AddVehicleState(frame, &pbv2.VehicleState{
Id: 10,
Fuel: 0.8,
})
require.NoError(t, err)
}
assert.Equal(t, uint32(0), cf.ChunkCount()) // Not yet flushed.

require.NoError(t, cf.Flush())
assert.Equal(t, uint32(1), cf.ChunkCount())
}

func TestChunkFlusher_EmptyFlush(t *testing.T) {
dir := t.TempDir()
cf, err := NewChunkFlusher(dir, 10)
require.NoError(t, err)

// Flushing with no data should succeed silently.
require.NoError(t, cf.Flush())
assert.Equal(t, uint32(0), cf.ChunkCount())
}

func TestChunkFlusher_MixedSoldierVehicle(t *testing.T) {
dir := t.TempDir()
cf, err := NewChunkFlusher(dir, 2)
require.NoError(t, err)

// Frame 0: soldier + vehicle.
require.NoError(t, cf.AddSoldierState(0, &pbv2.SoldierState{Id: 1}))
require.NoError(t, cf.AddVehicleState(0, &pbv2.VehicleState{Id: 10}))

// Frame 1: only soldier.
require.NoError(t, cf.AddSoldierState(1, &pbv2.SoldierState{Id: 1}))

// Frame 2 crosses boundary.
require.NoError(t, cf.AddSoldierState(2, &pbv2.SoldierState{Id: 1}))
assert.Equal(t, uint32(1), cf.ChunkCount())

// Verify chunk 0 has both soldiers and vehicles.
data, err := os.ReadFile(filepath.Join(dir, "chunks", "0000.pb"))
require.NoError(t, err)

var chunk pbv2.Chunk
require.NoError(t, proto.Unmarshal(data, &chunk))
assert.Len(t, chunk.Frames, 2) // frames 0 and 1

// Frame 0 has both soldier and vehicle.
assert.Len(t, chunk.Frames[0].Soldiers, 1)
assert.Len(t, chunk.Frames[0].Vehicles, 1)

// Frame 1 has only soldier.
assert.Len(t, chunk.Frames[1].Soldiers, 1)
assert.Len(t, chunk.Frames[1].Vehicles, 0)
}
Loading
Loading