55d7705c63
- protocol: add RepoInfo, RouteOverride; add HostPort, SandboxID to DeployRequest.
- ws hub: add CallAgent for sync request/response RPCs over the agent WS,
and DeliverAgentReply to route {op:reply} frames back to the caller.
UnregisterAgent now also fails any pending RPCs so callers don't hang.
- agent-micro: new op handlers list_repos, list_branches, probe.
Wire protocol.Event frames use json.RawMessage so each op decodes
its own data shape.
- agent-gateway: same op handlers (list_repos/list_branches/probe) plus
push_routes, which the gateway uses to rewrite the api-gateway
config.php. Detailed in a later commit.
- control-plane login: validateViaAgent now calls CallAgent('probe')
against the gateway agent (git ls-remote), replacing the
accept-any-creds stub.
- control-plane repos: handleListRepos and handleListBranches forward
to the agents via list_repos / list_branches RPCs, replacing the
hardcoded fixtures.
- control-plane deployments: split into its own file. handleListDeployments
reads from SQLite (was hardcoded []). handleCreateDeployment now
supports sandbox-scoped deploys with a host port + env merge.
handleStopDeployment looks up the node from the deployment row.
- store: split into store.go + deployments.go. The Deployment type
adds sandboxId, containerId, hostPort. StartDeploymentInSandbox,
SetContainerID, ListDeployments, GetDeployment, LatestDeploymentBySandboxService
are new.
- store_test.go: round-trips every Slice-2 path (env, sandbox,
template, clone, routes, deployment).
- .gitignore: track bin/ — the build runs on a separate Linux box
with the golang:1.24 toolchain, and the binaries are SCPed from
there to the company VMs (92 / 186). The VMs have no internet.
- Tracked bin/{control-plane,agent-micro,agent-gateway}.
211 lines
5.3 KiB
Go
211 lines
5.3 KiB
Go
// Package ws is the WebSocket fan-out for SDP. Three flows:
//
//	agent            --(events)----> /ws/agent            (one conn per agent)
//	dashboard client --(subscribe)-> /ws/deployments/{id} (one conn per viewer)
//	control plane    --(RPC)-------> /ws/agent            (request/response, sync)
//
// On agent connect we record the agent's nodeID. On dashboard connect we
// register a subscriber for one deployment. Events are fanned out
// best-effort: when a subscriber's buffer is full the event is dropped for
// that subscriber, so a slow client can never backpressure the agent. The
// RPC flow is request/response, with a per-call reply channel indexed by a
// unique request id.
|
|
package ws
|
|
|
|
import (
|
|
"context"
|
|
cryptorand "crypto/rand"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
"github.com/sdp/protocol"
|
|
)
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool { return true }, // ponytail: internal tool, allow all
|
|
}
|
|
|
|
// Hub is the central registry of agent connections and dashboard subscribers.
|
|
type Hub struct {
|
|
mu sync.RWMutex
|
|
|
|
// one channel per deployment; nil = no subscribers.
|
|
subs map[string]map[chan protocol.Event]struct{}
|
|
|
|
// one channel per connected agent; keyed by nodeID.
|
|
agents map[string]chan<- []byte // outbound to agent (deploy requests, etc.)
|
|
|
|
// pending RPCs keyed by request id. Each value is a channel that
|
|
// receives the agent's reply. The agent's inbound reader delivers
|
|
// replies here.
|
|
pending map[string]chan []byte
|
|
}
|
|
|
|
func New() *Hub {
|
|
return &Hub{
|
|
subs: make(map[string]map[chan protocol.Event]struct{}),
|
|
agents: make(map[string]chan<- []byte),
|
|
pending: make(map[string]chan []byte),
|
|
}
|
|
}
|
|
|
|
// Publish fans an event out to all subscribers of that deployment.
|
|
// Non-blocking; drops the event for a subscriber if its buffer is full.
|
|
func (h *Hub) Publish(e protocol.Event) {
|
|
h.mu.RLock()
|
|
subs := h.subs[e.DeploymentID]
|
|
chans := make([]chan protocol.Event, 0, len(subs))
|
|
for c := range subs {
|
|
chans = append(chans, c)
|
|
}
|
|
h.mu.RUnlock()
|
|
for _, c := range chans {
|
|
select {
|
|
case c <- e:
|
|
default:
|
|
// ponytail: drop slow subscriber; they'll reconnect.
|
|
}
|
|
}
|
|
}
|
|
|
|
// Subscribe registers a subscriber channel for one deployment.
|
|
// Returns an unsubscribe func the caller must invoke.
|
|
func (h *Hub) Subscribe(deploymentID string) (chan protocol.Event, func()) {
|
|
ch := make(chan protocol.Event, 64)
|
|
h.mu.Lock()
|
|
if h.subs[deploymentID] == nil {
|
|
h.subs[deploymentID] = make(map[chan protocol.Event]struct{})
|
|
}
|
|
h.subs[deploymentID][ch] = struct{}{}
|
|
h.mu.Unlock()
|
|
return ch, func() {
|
|
h.mu.Lock()
|
|
if subs, ok := h.subs[deploymentID]; ok {
|
|
delete(subs, ch)
|
|
if len(subs) == 0 {
|
|
delete(h.subs, deploymentID)
|
|
}
|
|
}
|
|
h.mu.Unlock()
|
|
close(ch)
|
|
}
|
|
}
|
|
|
|
// RegisterAgent stores the outbound channel for a node. Used when the control
|
|
// plane wants to send something back to an agent (deploy requests, RPCs, etc.).
|
|
func (h *Hub) RegisterAgent(nodeID string, out chan<- []byte) {
|
|
h.mu.Lock()
|
|
h.agents[nodeID] = out
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
func (h *Hub) UnregisterAgent(nodeID string) {
|
|
h.mu.Lock()
|
|
delete(h.agents, nodeID)
|
|
// also fail any pending RPCs.
|
|
toFail := make([]chan []byte, 0, len(h.pending))
|
|
for _, ch := range h.pending {
|
|
toFail = append(toFail, ch)
|
|
}
|
|
h.pending = make(map[string]chan []byte)
|
|
h.mu.Unlock()
|
|
for _, ch := range toFail {
|
|
select {
|
|
case ch <- nil:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// SendToAgent best-effort sends a payload to a connected agent. Returns false
|
|
// if the agent isn't connected or its buffer is full.
|
|
func (h *Hub) SendToAgent(nodeID string, payload []byte) bool {
|
|
h.mu.RLock()
|
|
ch, ok := h.agents[nodeID]
|
|
h.mu.RUnlock()
|
|
if !ok {
|
|
return false
|
|
}
|
|
select {
|
|
case ch <- payload:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// CallAgent is a request/response round-trip over the agent's WS. It sends
|
|
// {op, id, data} and waits for a {op:"reply", id, ...} back. The raw
|
|
// reply JSON is returned; the caller unmarshals into the concrete type.
|
|
//
|
|
// Errors: ErrAgentOffline, ErrRPCTimeout, or a delivery failure.
|
|
func (h *Hub) CallAgent(ctx context.Context, nodeID, op string, data interface{}, timeout time.Duration) ([]byte, error) {
|
|
id := newRPCID()
|
|
reply := make(chan []byte, 1)
|
|
|
|
h.mu.Lock()
|
|
h.pending[id] = reply
|
|
h.mu.Unlock()
|
|
defer func() {
|
|
h.mu.Lock()
|
|
delete(h.pending, id)
|
|
h.mu.Unlock()
|
|
}()
|
|
|
|
frame := map[string]interface{}{"op": op, "id": id, "data": data}
|
|
payload, err := json.Marshal(frame)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !h.SendToAgent(nodeID, payload) {
|
|
return nil, ErrAgentOffline
|
|
}
|
|
if timeout <= 0 {
|
|
timeout = 10 * time.Second
|
|
}
|
|
timer := time.NewTimer(timeout)
|
|
defer timer.Stop()
|
|
select {
|
|
case raw := <-reply:
|
|
if raw == nil {
|
|
return nil, ErrAgentOffline
|
|
}
|
|
return raw, nil
|
|
case <-timer.C:
|
|
return nil, ErrRPCTimeout
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// DeliverAgentReply is called from the agent's inbound reader when a
|
|
// frame with op="reply" arrives. It looks up the pending call by id and
|
|
// hands the payload over.
|
|
func (h *Hub) DeliverAgentReply(id string, raw []byte) {
|
|
h.mu.RLock()
|
|
ch, ok := h.pending[id]
|
|
h.mu.RUnlock()
|
|
if !ok {
|
|
return
|
|
}
|
|
select {
|
|
case ch <- raw:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// Sentinel errors returned by CallAgent; compare them with errors.Is.
var (
	// ErrAgentOffline means the target agent is not connected, its outbound
	// buffer was full, or the call was failed by UnregisterAgent before a
	// reply arrived.
	ErrAgentOffline = errors.New("agent offline")

	// ErrRPCTimeout means no reply arrived within the call's timeout.
	ErrRPCTimeout = errors.New("rpc timeout")
)
|
|
|
|
// newRPCID returns a fresh 16-character lowercase hex id (8 random bytes from
// crypto/rand). CallAgent uses it to match an agent's reply to the pending
// call.
func newRPCID() string {
	id := make([]byte, 8)
	// The error is deliberately ignored: since Go 1.24 crypto/rand.Read never
	// returns one (it crashes the program instead). On older toolchains a
	// failure would leave id all zeros.
	// NOTE(review): confirm the build targets Go >= 1.24.
	_, _ = cryptorand.Read(id)
	return hex.EncodeToString(id)
}
|