package ws

import (
	"encoding/json"
	"log"
	"net/http"
	"strings"

	"github.com/gorilla/websocket"
	"github.com/sdp/control-plane/internal/store"
	"github.com/sdp/protocol"
)

// AgentWS returns the handler for /ws/agent?node=<id>. The agent pushes
// events over the socket; the control plane can send requests back over the
// same socket through the outbound channel registered with the Hub.
//
// Each frame that decodes as a protocol.Event is appended to st (best
// effort) and then published on the Hub. Frames that fail to decode are
// logged and skipped. onConnect and onDisconnect may be nil; when set they
// are called with the node ID after the agent is registered and when the
// handler exits, respectively.
func (h *Hub) AgentWS(st *store.Store, onConnect, onDisconnect func(nodeID string)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		nodeID := r.URL.Query().Get("node")
		if nodeID == "" {
			http.Error(w, "missing node query", http.StatusBadRequest)
			return
		}

		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			// Upgrade replies to the client with an HTTP error itself.
			return
		}
		defer conn.Close()

		// Outbound to agent: buffered; if the agent stalls we drop rather
		// than block.
		out := make(chan []byte, 32)

		// done stops the writer goroutine. out is intentionally never closed
		// here: this handler is the receiver, and closing a channel the Hub
		// may still be sending on would panic the sender.
		done := make(chan struct{})

		h.RegisterAgent(nodeID, out)
		defer func() {
			h.UnregisterAgent(nodeID)
			close(done)
		}()

		if onConnect != nil {
			onConnect(nodeID)
		}
		defer func() {
			if onDisconnect != nil {
				onDisconnect(nodeID)
			}
		}()

		// Writer goroutine: drains out to the agent until the handler exits
		// or a write fails.
		go func() {
			for {
				select {
				case <-done:
					return
				case msg := <-out:
					if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
						log.Printf("agent %s: write failed: %v", nodeID, err)
						// Close the socket so the blocked ReadMessage below
						// returns and the handler unregisters the agent
						// instead of lingering with a dead writer.
						conn.Close()
						return
					}
				}
			}
		}()

		// Reader loop: agent -> control plane.
		for {
			_, raw, err := conn.ReadMessage()
			if err != nil {
				return
			}

			var e protocol.Event
			if err := json.Unmarshal(raw, &e); err != nil {
				log.Printf("agent %s: bad event: %v", nodeID, err)
				continue
			}

			// Persisting is best effort: a store failure is logged but must
			// not stop the live fan-out.
			if err := st.AppendEvent(e); err != nil {
				log.Printf("agent %s: persist event: %v", nodeID, err)
			}
			h.Publish(e)
		}
	}
}

// DeploymentWS returns the handler for /ws/deployments/{id}. A dashboard
// subscribes; the handler first sends a tail of the existing log as synthetic
// "log" events flagged with "backfill": true, then streams live events from
// the Hub until the subscription channel closes, a write fails, or the client
// goes away.
func (h *Hub) DeploymentWS(st *store.Store) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		id := strings.TrimPrefix(r.URL.Path, "/ws/deployments/")
		if id == "" {
			http.Error(w, "missing deployment id", http.StatusBadRequest)
			return
		}

		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			// Upgrade replies to the client with an HTTP error itself.
			return
		}
		defer conn.Close()

		// The dashboard never sends application data, but the connection
		// must still be read so close/ping/pong frames are processed.
		// Without this, a client that disconnects while the deployment is
		// quiet is never noticed and the handler blocks on the subscription
		// forever. The goroutine ends when the deferred conn.Close runs.
		gone := make(chan struct{})
		go func() {
			defer close(gone)
			for {
				if _, _, err := conn.ReadMessage(); err != nil {
					return
				}
			}
		}()

		// Backfill: send the last N log lines as synthetic events.
		// NOTE(review): events published between TailLogs and Subscribe below
		// are not delivered to this client; confirm whether that gap matters.
		lines, tailErr := st.TailLogs(id, 200)
		if tailErr != nil {
			log.Printf("deployment %q: tail logs: %v", id, tailErr)
		}
		for _, ln := range lines {
			msg := map[string]any{
				"deploymentId": id,
				"kind":         "log",
				"line":         ln,
				"at":           int64(0),
				"backfill":     true,
			}
			b, err := json.Marshal(msg)
			if err != nil {
				log.Printf("deployment %q: encode backfill line: %v", id, err)
				continue
			}
			if err := conn.WriteMessage(websocket.TextMessage, b); err != nil {
				return
			}
		}

		ch, unsub := h.Subscribe(id)
		defer unsub()

		for {
			select {
			case <-gone:
				// Client closed the socket or the read side failed.
				return
			case e, ok := <-ch:
				if !ok {
					return
				}
				b, err := json.Marshal(e)
				if err != nil {
					log.Printf("deployment %q: encode event: %v", id, err)
					continue
				}
				if err := conn.WriteMessage(websocket.TextMessage, b); err != nil {
					return
				}
			}
		}
	}
}