diff --git a/README.md b/README.md index 15e5c78..5b9e8e3 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,26 @@ GET /data?start=2024-01-01T00:00:00Z&stop=2024-01-02T00:00:00Z&fields=co2,lat,lo } ``` +--- + +### `GET /sensors` + +Returns the list of sensor IDs known to the network (over roughly the last 30 days, per InfluxDB's `schema.tagValues` default). + +Requires an API key passed as `Authorization: Bearer ` or `X-API-Key: `. + +#### JSON response + +``` +GET /sensors +``` + +```json +{ + "sensors": ["a3f2...", "b91c...", "..."] +} +``` + ## Running locally **Prerequisites:** [Go](https://go.dev/doc/install) 1.25+ diff --git a/internal/sensors/sensors.go b/internal/sensors/sensors.go new file mode 100644 index 0000000..317a7bb --- /dev/null +++ b/internal/sensors/sensors.go @@ -0,0 +1,88 @@ +package sensors + +import ( + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "os" + "strconv" + + "github.com/Ribbit-Network/api/internal" + influxquery "github.com/influxdata/influxdb-client-go/v2/api/query" +) + +var ( + errQueryFailed = errors.New("query failed") + errQueryResult = errors.New("query result error") +) + +type recordIterator interface { + Next() bool + Record() *influxquery.FluxRecord +} + +var fetchSensors = func() ([]string, error) { + db := internal.NewDB() + defer db.Close() + + res, err := db.Query(buildQuery(os.Getenv("INFLUXDB_BUCKET"))) + if err != nil { + return nil, fmt.Errorf("%w: %v", errQueryFailed, err) + } + defer res.Close() + + ids := collectIDs(res) + if err := res.Err(); err != nil { + return nil, fmt.Errorf("%w: %v", errQueryResult, err) + } + return ids, nil +} + +func Handle(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + ids, err := fetchSensors() + if err != nil { + msg := "query failed" + if errors.Is(err, errQueryResult) { + msg = "query result error" + } + http.Error(w, msg, http.StatusInternalServerError) + return + } + + writeJSON(w, ids) +} + +func buildQuery(bucket string) string { + return fmt.Sprintf(`import "influxdata/influxdb/schema" +schema.tagValues(bucket: %s, tag: "host")`, strconv.Quote(bucket)) +} + +func collectIDs(res recordIterator) []string { + ids := []string{} + for res.Next() { + if v, ok := res.Record().Value().(string); ok { + ids = append(ids, v) + } + } + return ids +} + +func writeJSON(w http.ResponseWriter, ids []string) { + w.Header().Set("Content-Type", "application/json") + if ids == nil { + ids = []string{} + } + payload := struct { + Sensors []string `json:"sensors"` + }{Sensors: ids} + if err := json.NewEncoder(w).Encode(payload); err != nil { + log.Println("json encode error:", err) + } +} diff --git a/internal/sensors/sensors_test.go b/internal/sensors/sensors_test.go new file mode 100644 index 0000000..dcb74dd --- /dev/null +++ b/internal/sensors/sensors_test.go @@ -0,0 +1,145 @@ +package sensors + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + + influxquery "github.com/influxdata/influxdb-client-go/v2/api/query" + "github.com/stretchr/testify/require" +) + +func withFetchSensors(t *testing.T, fn func() ([]string, error)) { + t.Helper() + orig := fetchSensors + fetchSensors = fn + t.Cleanup(func() { fetchSensors = orig }) +} + +func TestHandle_MethodNotAllowed(t *testing.T) { + withFetchSensors(t, func() ([]string, error) { + t.Fatal("fetchSensors should not be called for non-GET") + return nil, nil + }) + + req := httptest.NewRequest(http.MethodPost, "/sensors", nil) + rec := httptest.NewRecorder() + + Handle(rec, req) + + require.Equal(t, http.StatusMethodNotAllowed, rec.Code) +} + +func TestHandle_ReturnsJSON(t *testing.T) { + withFetchSensors(t, func() ([]string, error) { + return []string{"frog-01", "frog-02"}, nil + }) + + req := httptest.NewRequest(http.MethodGet, "/sensors", nil) + rec := httptest.NewRecorder() + + Handle(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + require.Equal(t, "application/json", rec.Header().Get("Content-Type")) + + var payload struct { + Sensors []string `json:"sensors"` + } + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &payload)) + require.Equal(t, []string{"frog-01", "frog-02"}, payload.Sensors) +} + +func TestHandle_EmptyResult_ReturnsEmptyArray(t *testing.T) { + withFetchSensors(t, func() ([]string, error) { + return nil, nil + }) + + req := httptest.NewRequest(http.MethodGet, "/sensors", nil) + rec := httptest.NewRecorder() + + Handle(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + require.Contains(t, rec.Body.String(), `"sensors":[]`, "nil result must serialize as [], not null") +} + +func TestHandle_FetchError_Returns500_QueryFailedBody(t *testing.T) { + withFetchSensors(t, func() ([]string, error) { + return nil, fmt.Errorf("%w: %v", errQueryFailed, errFake) + }) + + req := httptest.NewRequest(http.MethodGet, "/sensors", nil) + rec := httptest.NewRecorder() + + Handle(rec, req) + + require.Equal(t, http.StatusInternalServerError, rec.Code) + require.Equal(t, "query failed", strings.TrimSpace(rec.Body.String())) +} + +func TestHandle_IteratorError_Returns500_QueryResultErrorBody(t *testing.T) { + withFetchSensors(t, func() ([]string, error) { + return nil, fmt.Errorf("%w: %v", errQueryResult, errFake) + }) + + req := httptest.NewRequest(http.MethodGet, "/sensors", nil) + rec := httptest.NewRecorder() + + Handle(rec, req) + + require.Equal(t, http.StatusInternalServerError, rec.Code) + require.Equal(t, "query result error", strings.TrimSpace(rec.Body.String())) +} + +func TestBuildQuery(t *testing.T) { + got := buildQuery("frog_fleet") + require.Contains(t, got, `import "influxdata/influxdb/schema"`) + require.Contains(t, got, `schema.tagValues(bucket: "frog_fleet", tag: "host")`) +} + +func TestBuildQuery_EscapesBucketName(t *testing.T) { + got := buildQuery(`weird"name`) + require.Contains(t, got, `schema.tagValues(bucket: "weird\"name", tag: "host")`, + "bucket names with quotes must be escaped, not interpolated raw") +} + +type fakeIterator struct { + records []*influxquery.FluxRecord + idx int +} + +func (f *fakeIterator) Next() bool { + if f.idx >= len(f.records) { + return false + } + f.idx++ + return true +} + +func (f *fakeIterator) Record() *influxquery.FluxRecord { return f.records[f.idx-1] } + +func TestCollectIDs_PullsStringValues(t *testing.T) { + it := &fakeIterator{records: []*influxquery.FluxRecord{ + influxquery.NewFluxRecord(0, map[string]interface{}{"_value": "frog-01"}), + influxquery.NewFluxRecord(0, map[string]interface{}{"_value": "frog-02"}), + }} + + require.Equal(t, []string{"frog-01", "frog-02"}, collectIDs(it)) +} + +func TestCollectIDs_SkipsNonStringValues(t *testing.T) { + it := &fakeIterator{records: []*influxquery.FluxRecord{ + influxquery.NewFluxRecord(0, map[string]interface{}{"_value": "frog-01"}), + influxquery.NewFluxRecord(0, map[string]interface{}{"_value": 42}), + influxquery.NewFluxRecord(0, map[string]interface{}{"_value": nil}), + }} + + require.Equal(t, []string{"frog-01"}, collectIDs(it)) +} + +var errFake = errors.New("boom") diff --git a/main.go b/main.go index f1a9a3b..d59e7c7 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,7 @@ import ( "github.com/Ribbit-Network/api/internal/auth" "github.com/Ribbit-Network/api/internal/data" "github.com/Ribbit-Network/api/internal/ratelimit" + "github.com/Ribbit-Network/api/internal/sensors" "github.com/joho/godotenv" "golang.org/x/time/rate" ) @@ -46,6 +47,7 @@ func runServer() { mux.HandleFunc("/", handleRoot) mux.HandleFunc("/healthz", handleHealthz) mux.Handle("/data", requireKey(limiter.Middleware(http.HandlerFunc(data.Handle)))) + mux.Handle("/sensors", requireKey(limiter.Middleware(http.HandlerFunc(sensors.Handle)))) port := os.Getenv("PORT") if port == "" { diff --git a/scripts/list_sensors.py b/scripts/list_sensors.py new file mode 100755 index 0000000..1af95f5 --- /dev/null +++ b/scripts/list_sensors.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +"""Fetch the list of sensor IDs from the Ribbit API.""" + +import argparse +import json +import sys +import urllib.error +import urllib.request + +DEFAULT_BASE_URL = "https://ribbit-api.fly.dev" + + +def list_sensors(base_url, api_key, timeout=30): + req = urllib.request.Request( + f"{base_url.rstrip('/')}/sensors", + headers={"Authorization": f"Bearer {api_key}"}, + ) + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read().decode("utf-8"))["sensors"] + + +def main(): + parser = argparse.ArgumentParser(description="List sensor IDs from the Ribbit API.") + parser.add_argument("--api-key", required=True, help="API key for /sensors") + parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help=f"API base URL (default: {DEFAULT_BASE_URL})") + args = parser.parse_args() + + try: + sensors = list_sensors(args.base_url, args.api_key) + except urllib.error.HTTPError as e: + print(f"HTTP {e.code}: {e.read().decode('utf-8', errors='replace').strip()}", file=sys.stderr) + sys.exit(1) + + for sensor_id in sensors: + print(sensor_id) + print(f"\n{len(sensors)} sensor(s)", file=sys.stderr) + + +if __name__ == "__main__": + main()