Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ GET /data?start=2024-01-01T00:00:00Z&stop=2024-01-02T00:00:00Z&fields=co2,lat,lo
}
```

---

### `GET /sensors`

Returns the list of sensor IDs known to the network (over roughly the last 30 days, per InfluxDB's `schema.tagValues` default).

Requires an API key passed as `Authorization: Bearer <key>` or `X-API-Key: <key>`.

#### JSON response

```
GET /sensors
```

```json
{
"sensors": ["a3f2...", "b91c...", "..."]
}
```

## Running locally

**Prerequisites:** [Go](https://go.dev/doc/install) 1.25+
Expand Down
88 changes: 88 additions & 0 deletions internal/sensors/sensors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package sensors

import (
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"os"
"strconv"

"github.com/Ribbit-Network/api/internal"
influxquery "github.com/influxdata/influxdb-client-go/v2/api/query"
)

var (
errQueryFailed = errors.New("query failed")
errQueryResult = errors.New("query result error")
)

type recordIterator interface {
Next() bool
Record() *influxquery.FluxRecord
}

var fetchSensors = func() ([]string, error) {
db := internal.NewDB()
defer db.Close()

res, err := db.Query(buildQuery(os.Getenv("INFLUXDB_BUCKET")))
if err != nil {
return nil, fmt.Errorf("%w: %v", errQueryFailed, err)
}
defer res.Close()

ids := collectIDs(res)
if err := res.Err(); err != nil {
return nil, fmt.Errorf("%w: %v", errQueryResult, err)
}
return ids, nil
}

func Handle(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}

ids, err := fetchSensors()
if err != nil {
msg := "query failed"
if errors.Is(err, errQueryResult) {
msg = "query result error"
}
http.Error(w, msg, http.StatusInternalServerError)
return
}

writeJSON(w, ids)
}

func buildQuery(bucket string) string {
return fmt.Sprintf(`import "influxdata/influxdb/schema"
schema.tagValues(bucket: %s, tag: "host")`, strconv.Quote(bucket))
}
Comment thread
keenanjohnson marked this conversation as resolved.

func collectIDs(res recordIterator) []string {
ids := []string{}
for res.Next() {
if v, ok := res.Record().Value().(string); ok {
ids = append(ids, v)
}
}
return ids
}

func writeJSON(w http.ResponseWriter, ids []string) {
w.Header().Set("Content-Type", "application/json")
if ids == nil {
ids = []string{}
}
payload := struct {
Sensors []string `json:"sensors"`
}{Sensors: ids}
if err := json.NewEncoder(w).Encode(payload); err != nil {
log.Println("json encode error:", err)
}
}
145 changes: 145 additions & 0 deletions internal/sensors/sensors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package sensors

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"

influxquery "github.com/influxdata/influxdb-client-go/v2/api/query"
"github.com/stretchr/testify/require"
)

func withFetchSensors(t *testing.T, fn func() ([]string, error)) {
t.Helper()
orig := fetchSensors
fetchSensors = fn
t.Cleanup(func() { fetchSensors = orig })
}

func TestHandle_MethodNotAllowed(t *testing.T) {
withFetchSensors(t, func() ([]string, error) {
t.Fatal("fetchSensors should not be called for non-GET")
return nil, nil
})

req := httptest.NewRequest(http.MethodPost, "/sensors", nil)
rec := httptest.NewRecorder()

Handle(rec, req)

require.Equal(t, http.StatusMethodNotAllowed, rec.Code)
}

func TestHandle_ReturnsJSON(t *testing.T) {
withFetchSensors(t, func() ([]string, error) {
return []string{"frog-01", "frog-02"}, nil
})

req := httptest.NewRequest(http.MethodGet, "/sensors", nil)
rec := httptest.NewRecorder()

Handle(rec, req)

require.Equal(t, http.StatusOK, rec.Code)
require.Equal(t, "application/json", rec.Header().Get("Content-Type"))

var payload struct {
Sensors []string `json:"sensors"`
}
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &payload))
require.Equal(t, []string{"frog-01", "frog-02"}, payload.Sensors)
}

func TestHandle_EmptyResult_ReturnsEmptyArray(t *testing.T) {
withFetchSensors(t, func() ([]string, error) {
return nil, nil
})

req := httptest.NewRequest(http.MethodGet, "/sensors", nil)
rec := httptest.NewRecorder()

Handle(rec, req)

require.Equal(t, http.StatusOK, rec.Code)
require.Contains(t, rec.Body.String(), `"sensors":[]`, "nil result must serialize as [], not null")
}

func TestHandle_FetchError_Returns500_QueryFailedBody(t *testing.T) {
withFetchSensors(t, func() ([]string, error) {
return nil, fmt.Errorf("%w: %v", errQueryFailed, errFake)
})

req := httptest.NewRequest(http.MethodGet, "/sensors", nil)
rec := httptest.NewRecorder()

Handle(rec, req)

require.Equal(t, http.StatusInternalServerError, rec.Code)
require.Equal(t, "query failed", strings.TrimSpace(rec.Body.String()))
}

func TestHandle_IteratorError_Returns500_QueryResultErrorBody(t *testing.T) {
withFetchSensors(t, func() ([]string, error) {
return nil, fmt.Errorf("%w: %v", errQueryResult, errFake)
})

req := httptest.NewRequest(http.MethodGet, "/sensors", nil)
rec := httptest.NewRecorder()

Handle(rec, req)

require.Equal(t, http.StatusInternalServerError, rec.Code)
require.Equal(t, "query result error", strings.TrimSpace(rec.Body.String()))
}

func TestBuildQuery(t *testing.T) {
got := buildQuery("frog_fleet")
require.Contains(t, got, `import "influxdata/influxdb/schema"`)
require.Contains(t, got, `schema.tagValues(bucket: "frog_fleet", tag: "host")`)
}

func TestBuildQuery_EscapesBucketName(t *testing.T) {
got := buildQuery(`weird"name`)
require.Contains(t, got, `schema.tagValues(bucket: "weird\"name", tag: "host")`,
"bucket names with quotes must be escaped, not interpolated raw")
}

type fakeIterator struct {
records []*influxquery.FluxRecord
idx int
}

func (f *fakeIterator) Next() bool {
if f.idx >= len(f.records) {
return false
}
f.idx++
return true
}

func (f *fakeIterator) Record() *influxquery.FluxRecord { return f.records[f.idx-1] }

func TestCollectIDs_PullsStringValues(t *testing.T) {
it := &fakeIterator{records: []*influxquery.FluxRecord{
influxquery.NewFluxRecord(0, map[string]interface{}{"_value": "frog-01"}),
influxquery.NewFluxRecord(0, map[string]interface{}{"_value": "frog-02"}),
}}

require.Equal(t, []string{"frog-01", "frog-02"}, collectIDs(it))
}

func TestCollectIDs_SkipsNonStringValues(t *testing.T) {
it := &fakeIterator{records: []*influxquery.FluxRecord{
influxquery.NewFluxRecord(0, map[string]interface{}{"_value": "frog-01"}),
influxquery.NewFluxRecord(0, map[string]interface{}{"_value": 42}),
influxquery.NewFluxRecord(0, map[string]interface{}{"_value": nil}),
}}

require.Equal(t, []string{"frog-01"}, collectIDs(it))
}

var errFake = errors.New("boom")
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/Ribbit-Network/api/internal/auth"
"github.com/Ribbit-Network/api/internal/data"
"github.com/Ribbit-Network/api/internal/ratelimit"
"github.com/Ribbit-Network/api/internal/sensors"
"github.com/joho/godotenv"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -46,6 +47,7 @@ func runServer() {
mux.HandleFunc("/", handleRoot)
mux.HandleFunc("/healthz", handleHealthz)
mux.Handle("/data", requireKey(limiter.Middleware(http.HandlerFunc(data.Handle))))
mux.Handle("/sensors", requireKey(limiter.Middleware(http.HandlerFunc(sensors.Handle))))

port := os.Getenv("PORT")
if port == "" {
Expand Down
40 changes: 40 additions & 0 deletions scripts/list_sensors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""Fetch the list of sensor IDs from the Ribbit API."""

import argparse
import json
import sys
import urllib.error
import urllib.request

DEFAULT_BASE_URL = "https://ribbit-api.fly.dev"


def list_sensors(base_url, api_key, timeout=30):
req = urllib.request.Request(
f"{base_url.rstrip('/')}/sensors",
headers={"Authorization": f"Bearer {api_key}"},
)
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))["sensors"]


def main():
parser = argparse.ArgumentParser(description="List sensor IDs from the Ribbit API.")
parser.add_argument("--api-key", required=True, help="API key for /sensors")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help=f"API base URL (default: {DEFAULT_BASE_URL})")
args = parser.parse_args()

try:
sensors = list_sensors(args.base_url, args.api_key)
except urllib.error.HTTPError as e:
print(f"HTTP {e.code}: {e.read().decode('utf-8', errors='replace').strip()}", file=sys.stderr)
sys.exit(1)

for sensor_id in sensors:
print(sensor_id)
print(f"\n{len(sensors)} sensor(s)", file=sys.stderr)


if __name__ == "__main__":
main()
Loading