Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions src/pkg/ingress/v1/network_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@ package v1
import (
"log"
"net"
"sync"

metrics "code.cloudfoundry.org/go-metric-registry"

gendiodes "code.cloudfoundry.org/go-diodes"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/diodes"
)

const bufferSize int = 65535

var packetPool = sync.Pool{
New: func() any {
return new([bufferSize]byte)
},
}

type ByteArrayWriter interface {
Write(message []byte)
}
Expand Down Expand Up @@ -58,17 +67,16 @@ func NewNetworkReader(
}

func (nr *NetworkReader) StartReading() {
readBuffer := make([]byte, 65535) //buffer with size = max theoretical UDP size
for {
readCount, _, err := nr.connection.ReadFrom(readBuffer)
bufPtr := packetPool.Get().(*[bufferSize]byte)
readCount, _, err := nr.connection.ReadFrom(bufPtr[:])
if err != nil {
log.Printf("Error while reading: %s", err)
packetPool.Put(bufPtr)
return
}
readData := make([]byte, readCount)
copy(readData, readBuffer[:readCount])

nr.buffer.Set(readData)
nr.buffer.Set(bufPtr[:readCount])
}
}

Expand All @@ -77,6 +85,11 @@ func (nr *NetworkReader) StartWriting() {
data := nr.buffer.Next()
nr.rxMsgCount(1)
nr.writer.Write(data)

if cap(data) == bufferSize {
fullSlice := data[:bufferSize]
packetPool.Put((*[bufferSize]byte)(fullSlice))
}
}
}

Expand Down
99 changes: 99 additions & 0 deletions src/pkg/ingress/v1/network_reader_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package v1_test

import (
"net"
"strconv"
"testing"

metricsHelpers "code.cloudfoundry.org/go-metric-registry/testhelpers"
ingress "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/v1"
)

type benchmarkWriter struct {
ch chan struct{}
}

func (w *benchmarkWriter) Write(p []byte) {
w.ch <- struct{}{}
}

func getFreePort(b *testing.B) int {
addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
if err != nil {
b.Fatal(err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
b.Fatal(err)
}
defer conn.Close()
_, addrPort, err := net.SplitHostPort(conn.LocalAddr().String())
if err != nil {
b.Fatal(err)
}
port, err := strconv.Atoi(addrPort)
if err != nil {
b.Fatal(err)
}
return port
}

func benchmarkNetworkReaderWithSize(b *testing.B, size int) {
port := getFreePort(b)
address := net.JoinHostPort("127.0.0.1", strconv.Itoa(port))

metricClient := metricsHelpers.NewMetricsRegistry()
writer := &benchmarkWriter{
ch: make(chan struct{}, 1),
}

reader, err := ingress.NewNetworkReader(address, writer, metricClient)
if err != nil {
b.Fatalf("failed to create network reader: %s", err)
}
defer reader.Stop()

go reader.StartReading()
go reader.StartWriting()

connection, err := net.Dial("udp", address)
if err != nil {
b.Fatalf("failed to dial: %s", err)
}
defer connection.Close()

packet := make([]byte, size)
for i := range packet {
packet[i] = 'a'
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
_, err := connection.Write(packet)
if err != nil {
b.Fatalf("failed to write: %s", err)
}
<-writer.ch
}
}

func BenchmarkNetworkReader_8KB(b *testing.B) {
benchmarkNetworkReaderWithSize(b, 8192)
}

func BenchmarkNetworkReader_16KB(b *testing.B) {
benchmarkNetworkReaderWithSize(b, 16384)
}

func BenchmarkNetworkReader_32KB(b *testing.B) {
benchmarkNetworkReaderWithSize(b, 32768)
}

func BenchmarkNetworkReader_48KB(b *testing.B) {
benchmarkNetworkReaderWithSize(b, 49152)
}

func BenchmarkNetworkReader_64KB(b *testing.B) {
benchmarkNetworkReaderWithSize(b, 65507) //65535 is the max UDP size, but we need to leave room for the headers: 20 for IPv4 and 8 for UDP

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch ;)

}
Loading