// Package ws is the WebSocket fan-out for SDP. Three flows: // // agent --(events)--> /ws/agent (one conn per agent) // dashboard client --(subscribe)-> /ws/deployments/{id} (one conn per viewer) // control plane --(RPC)----> /ws/agent (request/response, sync) // // On agent connect we record the agent's nodeID. On dashboard connect we // register a subscriber for one deployment. Events are best-effort fanned out; // a slow client is dropped, not allowed to backpressure the agent. The RPC // flow is request/response with a per-call channel indexed by a unique id. package ws import ( "context" cryptorand "crypto/rand" "encoding/hex" "encoding/json" "errors" "net/http" "sync" "time" "github.com/gorilla/websocket" "github.com/sdp/protocol" ) var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, // ponytail: internal tool, allow all } // Hub is the central registry of agent connections and dashboard subscribers. type Hub struct { mu sync.RWMutex // one channel per deployment; nil = no subscribers. subs map[string]map[chan protocol.Event]struct{} // one channel per connected agent; keyed by nodeID. agents map[string]chan<- []byte // outbound to agent (deploy requests, etc.) // pending RPCs keyed by request id. Each value is a channel that // receives the agent's reply. The agent's inbound reader delivers // replies here. pending map[string]chan []byte } func New() *Hub { return &Hub{ subs: make(map[string]map[chan protocol.Event]struct{}), agents: make(map[string]chan<- []byte), pending: make(map[string]chan []byte), } } // Publish fans an event out to all subscribers of that deployment. // Non-blocking; drops the event for a subscriber if its buffer is full. func (h *Hub) Publish(e protocol.Event) { h.mu.RLock() subs := h.subs[e.DeploymentID] chans := make([]chan protocol.Event, 0, len(subs)) for c := range subs { chans = append(chans, c) } h.mu.RUnlock() for _, c := range chans { select { case c <- e: default: // ponytail: drop slow subscriber; they'll reconnect. } } } // Subscribe registers a subscriber channel for one deployment. // Returns an unsubscribe func the caller must invoke. func (h *Hub) Subscribe(deploymentID string) (chan protocol.Event, func()) { ch := make(chan protocol.Event, 64) h.mu.Lock() if h.subs[deploymentID] == nil { h.subs[deploymentID] = make(map[chan protocol.Event]struct{}) } h.subs[deploymentID][ch] = struct{}{} h.mu.Unlock() return ch, func() { h.mu.Lock() if subs, ok := h.subs[deploymentID]; ok { delete(subs, ch) if len(subs) == 0 { delete(h.subs, deploymentID) } } h.mu.Unlock() close(ch) } } // RegisterAgent stores the outbound channel for a node. Used when the control // plane wants to send something back to an agent (deploy requests, RPCs, etc.). func (h *Hub) RegisterAgent(nodeID string, out chan<- []byte) { h.mu.Lock() h.agents[nodeID] = out h.mu.Unlock() } func (h *Hub) UnregisterAgent(nodeID string) { h.mu.Lock() delete(h.agents, nodeID) // also fail any pending RPCs. toFail := make([]chan []byte, 0, len(h.pending)) for _, ch := range h.pending { toFail = append(toFail, ch) } h.pending = make(map[string]chan []byte) h.mu.Unlock() for _, ch := range toFail { select { case ch <- nil: default: } } } // SendToAgent best-effort sends a payload to a connected agent. Returns false // if the agent isn't connected or its buffer is full. func (h *Hub) SendToAgent(nodeID string, payload []byte) bool { h.mu.RLock() ch, ok := h.agents[nodeID] h.mu.RUnlock() if !ok { return false } select { case ch <- payload: return true default: return false } } // CallAgent is a request/response round-trip over the agent's WS. It sends // {op, id, data} and waits for a {op:"reply", id, ...} back. The raw // reply JSON is returned; the caller unmarshals into the concrete type. // // Errors: ErrAgentOffline, ErrRPCTimeout, or a delivery failure. func (h *Hub) CallAgent(ctx context.Context, nodeID, op string, data interface{}, timeout time.Duration) ([]byte, error) { id := newRPCID() reply := make(chan []byte, 1) h.mu.Lock() h.pending[id] = reply h.mu.Unlock() defer func() { h.mu.Lock() delete(h.pending, id) h.mu.Unlock() }() frame := map[string]interface{}{"op": op, "id": id, "data": data} payload, err := json.Marshal(frame) if err != nil { return nil, err } if !h.SendToAgent(nodeID, payload) { return nil, ErrAgentOffline } if timeout <= 0 { timeout = 10 * time.Second } timer := time.NewTimer(timeout) defer timer.Stop() select { case raw := <-reply: if raw == nil { return nil, ErrAgentOffline } return raw, nil case <-timer.C: return nil, ErrRPCTimeout case <-ctx.Done(): return nil, ctx.Err() } } // DeliverAgentReply is called from the agent's inbound reader when a // frame with op="reply" arrives. It looks up the pending call by id and // hands the payload over. func (h *Hub) DeliverAgentReply(id string, raw []byte) { h.mu.RLock() ch, ok := h.pending[id] h.mu.RUnlock() if !ok { return } select { case ch <- raw: default: } } var ( ErrAgentOffline = errors.New("agent offline") ErrRPCTimeout = errors.New("rpc timeout") ) func newRPCID() string { var b [8]byte _, _ = cryptorand.Read(b[:]) return hex.EncodeToString(b[:]) }