Files
bri-sandbox-development-pla…/control-plane/internal/ws/hub.go
T
Achmad 55d7705c63 Slice 2: real auth, agent-mediated repo/branch listing, deployment list from SQLite
- protocol: add RepoInfo, RouteOverride; add HostPort, SandboxID to DeployRequest.
- ws hub: add CallAgent for sync request/response RPCs over the agent WS,
  and DeliverAgentReply to route {op:reply} frames back to the caller.
  UnregisterAgent now also fails any pending RPCs so callers don't hang.
- agent-micro: new op handlers list_repos, list_branches, probe.
  Wire protocol.Event frames use json.RawMessage so each op decodes
  its own data shape.
- agent-gateway: same op handlers (list_repos/list_branches/probe) plus
  push_routes, which the gateway uses to rewrite the api-gateway
  config.php. Detailed in a later commit.
- control-plane login: validateViaAgent now calls CallAgent('probe')
  against the gateway agent (git ls-remote), replacing the
  accept-any-creds stub.
- control-plane repos: handleListRepos and handleListBranches forward
  to the agents via list_repos / list_branches RPCs, replacing the
  hardcoded fixtures.
- control-plane deployments: split into its own file. handleListDeployments
  reads from SQLite (was hardcoded []). handleCreateDeployment now
  supports sandbox-scoped deploys with a host port + env merge.
  handleStopDeployment looks up the node from the deployment row.
- store: split into store.go + deployments.go. The Deployment type
  adds sandboxId, containerId, hostPort. StartDeploymentInSandbox,
  SetContainerID, ListDeployments, GetDeployment, LatestDeploymentBySandboxService
  are new.
- store_test.go: round-trips every Slice-2 path (env, sandbox,
  template, clone, routes, deployment).
- .gitignore: track bin/ — the build runs on a separate Linux box
  with the golang:1.24 toolchain, and the binaries are SCPed from
  there to the company VMs (92 / 186). The VMs have no internet.
- Tracked bin/{control-plane,agent-micro,agent-gateway}.
2026-06-24 03:58:53 +00:00

211 lines
5.3 KiB
Go

// Package ws is the WebSocket fan-out for SDP. Three flows:
//
//	agent            --(events)-->    /ws/agent            (one conn per agent)
//	dashboard client --(subscribe)--> /ws/deployments/{id} (one conn per viewer)
//	control plane    --(RPC)-->       /ws/agent            (request/response, sync)
//
// On agent connect we record the agent's nodeID. On dashboard connect we
// register a subscriber for one deployment. Events are fanned out
// best-effort: if a client's buffer is full, that client misses the event
// instead of backpressuring the agent. The RPC flow is request/response,
// with a per-call reply channel indexed by a unique request id.
package ws
import (
"context"
cryptorand "crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/sdp/protocol"
)
// upgrader turns incoming HTTP requests into WebSocket connections.
//
// CheckOrigin accepts every origin unconditionally (ponytail: internal tool,
// allow all). NOTE(review): this permits cross-site WebSocket hijacking from
// any page a browser with access to the control plane visits; tighten it if
// the dashboard is ever reachable beyond the internal network.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool {
		return true
	},
}
// Hub is the central registry of agent connections, dashboard subscribers,
// and in-flight agent RPCs. Create one with New (the zero value has nil maps
// and is not usable); every map below is guarded by mu.
type Hub struct {
	mu sync.RWMutex
	// subs maps a deployment ID to the set of subscriber channels watching
	// it. A deployment with no subscribers has no entry.
	subs map[string]map[chan protocol.Event]struct{}
	// agents maps a nodeID to the outbound channel of that agent's
	// connection (deploy requests, RPC frames, etc.).
	agents map[string]chan<- []byte
	// pending maps an RPC request id to the CallAgent waiting on it.
	// Replies are routed here by DeliverAgentReply.
	pending map[string]pendingCall
}

// pendingCall is one CallAgent waiting for its reply.
type pendingCall struct {
	// nodeID is the agent the request was sent to, so UnregisterAgent can
	// fail only the calls that agent can no longer answer.
	nodeID string
	// reply has capacity 1 and receives the raw reply, or nil if the agent
	// disconnected first.
	reply chan []byte
}

// New returns an empty Hub ready for use.
func New() *Hub {
	return &Hub{
		subs:    make(map[string]map[chan protocol.Event]struct{}),
		agents:  make(map[string]chan<- []byte),
		pending: make(map[string]pendingCall),
	}
}

// Publish fans an event out to every subscriber of e.DeploymentID.
//
// It never blocks: a subscriber whose buffer is full misses this event but
// stays subscribed. The read lock is held for the whole fan-out so that an
// unsubscribe (which removes and closes the channel under the write lock)
// cannot close a channel while we are sending to it. A send on a closed
// channel panics even inside a select with a default case.
func (h *Hub) Publish(e protocol.Event) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for c := range h.subs[e.DeploymentID] {
		select {
		case c <- e:
		default:
			// ponytail: a slow subscriber misses this event rather than
			// backpressuring the publisher.
		}
	}
}

// Subscribe registers a new subscriber for one deployment. It returns the
// subscriber's event channel (buffered, capacity 64) and an unsubscribe func.
//
// The caller must invoke unsubscribe when done. It removes the subscriber
// and closes the channel. Calling it more than once is safe.
func (h *Hub) Subscribe(deploymentID string) (chan protocol.Event, func()) {
	ch := make(chan protocol.Event, 64)

	h.mu.Lock()
	set := h.subs[deploymentID]
	if set == nil {
		set = make(map[chan protocol.Event]struct{})
		h.subs[deploymentID] = set
	}
	set[ch] = struct{}{}
	h.mu.Unlock()

	var once sync.Once
	unsubscribe := func() {
		once.Do(func() {
			h.mu.Lock()
			defer h.mu.Unlock()
			// Re-read the set: it may have been dropped and recreated since
			// we subscribed.
			if cur, ok := h.subs[deploymentID]; ok {
				delete(cur, ch)
				if len(cur) == 0 {
					delete(h.subs, deploymentID)
				}
			}
			// Closing under the write lock guarantees no Publish is mid-send
			// on ch, because Publish sends while holding the read lock.
			close(ch)
		})
	}
	return ch, unsubscribe
}

// RegisterAgent records out as the outbound channel for nodeID, replacing any
// channel previously registered for that node. The control plane uses it to
// send frames back to the agent (deploy requests, RPCs, etc.).
func (h *Hub) RegisterAgent(nodeID string, out chan<- []byte) {
	h.mu.Lock()
	h.agents[nodeID] = out
	h.mu.Unlock()
}

// UnregisterAgent forgets nodeID's outbound channel and fails every RPC that
// is still waiting on that node. Those CallAgent calls return ErrAgentOffline
// immediately instead of waiting for their timeout. RPCs sent to other agents
// are left untouched.
//
// NOTE(review): this removes whatever channel is currently registered for
// nodeID. If an agent reconnects before its old connection is torn down, the
// old connection's UnregisterAgent would drop the new registration. Confirm
// the connection handler cannot interleave that way.
func (h *Hub) UnregisterAgent(nodeID string) {
	h.mu.Lock()
	delete(h.agents, nodeID)
	var orphaned []chan []byte
	for id, call := range h.pending {
		if call.nodeID != nodeID {
			continue
		}
		orphaned = append(orphaned, call.reply)
		delete(h.pending, id) // deleting during range is allowed in Go
	}
	h.mu.Unlock()

	for _, ch := range orphaned {
		// nil is the "agent went away" sentinel. The send is non-blocking: if
		// a real reply already filled the 1-slot buffer, the caller gets it.
		select {
		case ch <- nil:
		default:
		}
	}
}

// SendToAgent hands payload to nodeID's outbound channel without blocking.
// It returns false if the agent isn't registered or its buffer is full.
func (h *Hub) SendToAgent(nodeID string, payload []byte) bool {
	// Hold the read lock across the non-blocking send so the channel cannot
	// be unregistered mid-send. NOTE(review): this only prevents a
	// send-on-closed-channel panic if the connection handler closes its out
	// channel after calling UnregisterAgent. Confirm that ordering.
	h.mu.RLock()
	defer h.mu.RUnlock()
	ch, ok := h.agents[nodeID]
	if !ok {
		return false
	}
	select {
	case ch <- payload:
		return true
	default:
		return false
	}
}

// CallAgent performs a synchronous request/response round-trip over nodeID's
// WebSocket. It sends the frame {"op": op, "id": <id>, "data": data} and waits
// for DeliverAgentReply to hand back the reply carrying the same id. The raw
// reply bytes are returned; the caller unmarshals them into the concrete type.
//
// A timeout <= 0 means 10 seconds. Errors:
//   - the json.Marshal error, if data cannot be encoded;
//   - ErrAgentOffline, if the agent is not registered, its outbound buffer is
//     full, or it disconnects before replying;
//   - ErrRPCTimeout, if no reply arrives within timeout;
//   - ctx.Err(), if ctx is done first.
func (h *Hub) CallAgent(ctx context.Context, nodeID, op string, data any, timeout time.Duration) ([]byte, error) {
	if timeout <= 0 {
		timeout = 10 * time.Second
	}

	id := newRPCID()
	payload, err := json.Marshal(map[string]any{"op": op, "id": id, "data": data})
	if err != nil {
		return nil, err
	}

	// Register before sending so a fast reply always finds its waiter.
	reply := make(chan []byte, 1)
	h.mu.Lock()
	h.pending[id] = pendingCall{nodeID: nodeID, reply: reply}
	h.mu.Unlock()
	defer func() {
		h.mu.Lock()
		delete(h.pending, id)
		h.mu.Unlock()
	}()

	if !h.SendToAgent(nodeID, payload) {
		return nil, ErrAgentOffline
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case raw := <-reply:
		if raw == nil {
			// Sentinel from UnregisterAgent: the agent disconnected.
			return nil, ErrAgentOffline
		}
		return raw, nil
	case <-timer.C:
		return nil, ErrRPCTimeout
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// DeliverAgentReply is called from the agent's inbound reader when a frame
// with op="reply" arrives. It hands raw to the CallAgent waiting on id. A
// reply for an unknown id (the call already returned) is dropped. So is a
// duplicate reply, or a reply arriving after the disconnect sentinel.
//
// NOTE(review): CallAgent treats a nil reply as "agent offline", so the
// reader should never pass raw == nil for a genuine reply.
func (h *Hub) DeliverAgentReply(id string, raw []byte) {
	h.mu.RLock()
	call, ok := h.pending[id]
	h.mu.RUnlock()
	if !ok {
		return
	}
	// The reply channel is never closed, so sending after RUnlock is safe.
	select {
	case call.reply <- raw:
	default:
		// A reply (or the disconnect sentinel) is already buffered.
	}
}
// Sentinel errors returned by Hub.CallAgent; compare them with errors.Is.
var (
	// ErrAgentOffline reports that the target agent is not connected, could
	// not accept the request (outbound buffer full), or disconnected before
	// replying.
	ErrAgentOffline = errors.New("agent offline")

	// ErrRPCTimeout reports that the agent did not reply within the call's
	// timeout.
	ErrRPCTimeout = errors.New("rpc timeout")
)
// newRPCID returns a fresh random request id: 8 bytes from crypto/rand,
// encoded as 16 lowercase hex characters. It correlates a CallAgent request
// with the agent's reply.
func newRPCID() string {
	buf := make([]byte, 8)
	// The error is deliberately ignored: crypto/rand.Read does not fail in
	// practice, and since Go 1.24 it is documented never to return an error.
	_, _ = cryptorand.Read(buf)
	return hex.EncodeToString(buf)
}