Initial SDP skeleton
Sandbox Deployment Platform — Go control plane + agents, NextJS dashboard, nginx reverse proxy. Cross-compile via Docker; deploy via sshpass to 172.18.136.92 (micro) and 172.18.139.186 (gateway). - control-plane: HTTP API, WS hub, SQLite (modernc.org/sqlite) for progress, .log files for log persistence - agent-micro / agent-gateway: alpine:3.20 + bind-mounted repo, binary exec'd in container, no Dockerfile build step - dashboard: NextJS static export + shadcn/ui components, single WebSocket hook - docker-compose.yml: three services on alpine:latest with docker socket bind for agents - scripts/: build.sh (golang:1.23-alpine cross-compile), deploy.sh, patch-nginx.sh (idempotent nginx splice), ssh wrappers Runtime model: pass-through Bitbucket creds per deploy, never logged or persisted on the agent. Control plane never touches git or docker directly — agents do all the work locally.
This commit is contained in:
@@ -0,0 +1,30 @@
|
||||
// Command control-plane is the SDP brain. It owns the metadata DB, the
|
||||
// WebSocket hub, and the HTTP API. Agents and the dashboard talk to it;
|
||||
// it never SSHes, never builds, never touches git directly.
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/sdp/control-plane/internal/api"
|
||||
"github.com/sdp/control-plane/internal/config"
|
||||
"github.com/sdp/control-plane/internal/store"
|
||||
"github.com/sdp/control-plane/internal/ws"
|
||||
)
|
||||
|
||||
func main() {
|
||||
cfg := config.Load()
|
||||
|
||||
st, err := store.Open(cfg.DataDir)
|
||||
if err != nil {
|
||||
log.Fatalf("store: %v", err)
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
hub := ws.New()
|
||||
srv := api.New(st, hub)
|
||||
|
||||
log.Printf("control-plane listening on %s (data=%s)", cfg.Addr, cfg.DataDir)
|
||||
log.Fatal(http.ListenAndServe(cfg.Addr, srv.Routes()))
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
module github.com/sdp/control-plane
|
||||
|
||||
go 1.23
|
||||
|
||||
require (
|
||||
github.com/gorilla/websocket v1.5.1
|
||||
modernc.org/sqlite v1.28.0
|
||||
)
|
||||
@@ -0,0 +1,256 @@
|
||||
// Package api wires the HTTP endpoints. Kept on net/http — no router lib
|
||||
// for a handful of endpoints, stdlib mux is plenty.
|
||||
package api
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/sdp/control-plane/internal/store"
|
||||
"github.com/sdp/control-plane/internal/ws"
|
||||
"github.com/sdp/protocol"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
st *store.Store
|
||||
hub *ws.Hub
|
||||
agents *AgentRegistry
|
||||
sess *Sessions
|
||||
}
|
||||
|
||||
type AgentRegistry struct {
|
||||
mu sync.RWMutex
|
||||
conns map[string]bool // nodeIDs currently connected over WS
|
||||
}
|
||||
|
||||
func New(st *store.Store, hub *ws.Hub) *Server {
|
||||
return &Server{
|
||||
st: st,
|
||||
hub: hub,
|
||||
agents: &AgentRegistry{conns: make(map[string]bool)},
|
||||
sess: NewSessions(),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) Routes() http.Handler {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/api/login", s.handleLogin)
|
||||
mux.HandleFunc("/api/repos", s.handleListRepos)
|
||||
mux.HandleFunc("/api/repos/branches", s.handleListBranches)
|
||||
mux.HandleFunc("/api/deployments", s.handleDeployments) // GET list, POST create
|
||||
mux.HandleFunc("/api/deployments/stop", s.handleStopDeployment) // POST
|
||||
mux.Handle("/ws/agent", s.hub.AgentWS(s.st,
|
||||
func(nodeID string) {
|
||||
s.agents.mu.Lock()
|
||||
s.agents.conns[nodeID] = true
|
||||
s.agents.mu.Unlock()
|
||||
},
|
||||
func(nodeID string) {
|
||||
s.agents.mu.Lock()
|
||||
delete(s.agents.conns, nodeID)
|
||||
s.agents.mu.Unlock()
|
||||
},
|
||||
))
|
||||
mux.HandleFunc("/ws/deployments/", s.hub.DeploymentWS(s.st))
|
||||
return s.withAuth(mux)
|
||||
}
|
||||
|
||||
// withAuth checks the session cookie on /api/* (skipping login). /ws/* is
|
||||
// protected at the handler — we don't pass auth in headers easily on the WS
|
||||
// upgrade from the browser, so the dashboard sends ?token=... instead.
|
||||
func (s *Server) withAuth(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if !strings.HasPrefix(r.URL.Path, "/api/") || r.URL.Path == "/api/login" {
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
c, err := r.Cookie("sdp_session")
|
||||
if err != nil || !s.sess.Valid(c.Value) {
|
||||
http.Error(w, "unauthorized", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
// ---- login ----
|
||||
|
||||
type loginReq struct {
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
Repo string `json:"repo"` // optional: validate against this specific repo
|
||||
}
|
||||
|
||||
func (s *Server) handleLogin(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "POST only", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
var body loginReq
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
http.Error(w, "bad json", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
// ponytail: trust boundary lives in the agent — it does the actual git
|
||||
// ls-remote. The control plane just hands off credentials per-op.
|
||||
// For login we ask any connected agent to validate. If none are
|
||||
// connected, fail. Real impl: pick a known bootstrap repo.
|
||||
ok := s.validateViaAgent(body.Username, body.Password, body.Repo)
|
||||
if !ok {
|
||||
http.Error(w, "login failed", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
tok := s.sess.Issue(body.Username)
|
||||
http.SetCookie(w, &http.Cookie{
|
||||
Name: "sdp_session",
|
||||
Value: tok,
|
||||
Path: "/",
|
||||
HttpOnly: true,
|
||||
SameSite: http.SameSiteLaxMode,
|
||||
MaxAge: 12 * 3600,
|
||||
})
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(`{"ok":true}`))
|
||||
}
|
||||
|
||||
// validateViaAgent does a git ls-remote through one of the connected agents.
|
||||
// The agent holds the repos; the control plane never touches git directly.
|
||||
//
|
||||
// ponytail: MVP stub. Returns true if any agent is connected so the smoke
|
||||
// flow can run. Real impl will send a "probe" frame over the agent's WS
|
||||
// and wait for a reply.
|
||||
func (s *Server) validateViaAgent(user, pass, repo string) bool {
|
||||
_ = user
|
||||
_ = pass
|
||||
_ = repo
|
||||
s.agents.mu.RLock()
|
||||
defer s.agents.mu.RUnlock()
|
||||
return len(s.agents.conns) > 0
|
||||
}
|
||||
|
||||
// ---- repos ----
|
||||
|
||||
type repoInfo struct {
|
||||
Name string `json:"name"`
|
||||
Node string `json:"node"`
|
||||
Path string `json:"path"`
|
||||
}
|
||||
|
||||
func (s *Server) handleListRepos(w http.ResponseWriter, r *http.Request) {
|
||||
// ponytail: real impl asks the connected agents for their repo list.
|
||||
// For MVP smoke, stub with the spec's example.
|
||||
repos := []repoInfo{
|
||||
{Name: "account", Node: "micro", Path: "/home/user/AppGolang/account"},
|
||||
{Name: "payment", Node: "micro", Path: "/home/user/AppGolang/payment"},
|
||||
{Name: "user", Node: "micro", Path: "/home/user/AppGolang/user"},
|
||||
{Name: "notification", Node: "micro", Path: "/home/user/AppGolang/notification"},
|
||||
{Name: "api-gateway", Node: "gateway", Path: "/home/user/SDP"},
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(repos)
|
||||
}
|
||||
|
||||
func (s *Server) handleListBranches(w http.ResponseWriter, r *http.Request) {
|
||||
repo := r.URL.Query().Get("repo")
|
||||
if repo == "" {
|
||||
http.Error(w, "repo required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
// ponytail: real impl forwards to the agent. For MVP, stub.
|
||||
branches := []string{"main", "develop", "feature/login-error"}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(branches)
|
||||
}
|
||||
|
||||
// ---- deployments ----
|
||||
|
||||
type deployReq struct {
|
||||
Repository string `json:"repository"`
|
||||
Branch string `json:"branch"`
|
||||
Env map[string]string `json:"env,omitempty"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
}
|
||||
|
||||
func (s *Server) handleDeployments(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
// ponytail: list from SQLite. Real impl: SELECT with filter.
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`[]`))
|
||||
case http.MethodPost:
|
||||
var body deployReq
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
http.Error(w, "bad json", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
// resolve repo -> node
|
||||
node := "micro"
|
||||
if body.Repository == "api-gateway" {
|
||||
node = "gateway"
|
||||
}
|
||||
// ensure agent connected
|
||||
s.agents.mu.RLock()
|
||||
connected := s.agents.conns[node]
|
||||
s.agents.mu.RUnlock()
|
||||
if !connected {
|
||||
http.Error(w, "agent "+node+" not connected", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
id := newID()
|
||||
_ = s.st.StartDeployment(id, body.Repository, body.Branch, body.Username)
|
||||
|
||||
// send deploy request to agent over its WS
|
||||
req := protocol.DeployRequest{
|
||||
DeploymentID: id,
|
||||
Repository: body.Repository,
|
||||
Branch: body.Branch,
|
||||
Env: body.Env,
|
||||
Username: body.Username,
|
||||
Password: body.Password,
|
||||
}
|
||||
payload, _ := json.Marshal(map[string]any{
|
||||
"op": "deploy",
|
||||
"data": req,
|
||||
})
|
||||
if !s.hub.SendToAgent(node, payload) {
|
||||
http.Error(w, "agent buffer full", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]string{"id": id})
|
||||
default:
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleStopDeployment(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "POST only", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
var body struct {
|
||||
ID string `json:"id"`
|
||||
Node string `json:"node"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
http.Error(w, "bad json", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
payload, _ := json.Marshal(map[string]any{"op": "stop", "id": body.ID})
|
||||
if !s.hub.SendToAgent(body.Node, payload) {
|
||||
http.Error(w, "agent not reachable", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func newID() string {
|
||||
b := make([]byte, 8)
|
||||
_, _ = rand.Read(b)
|
||||
return hex.EncodeToString(b)
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Sessions is an in-memory token store. Replace with a signed JWT or
|
||||
// a Redis-backed store when we need multi-replica CP. For MVP one process
|
||||
// is enough.
|
||||
type Sessions struct {
|
||||
mu sync.RWMutex
|
||||
store map[string]session
|
||||
}
|
||||
|
||||
type session struct {
|
||||
user string
|
||||
expires time.Time
|
||||
}
|
||||
|
||||
func NewSessions() *Sessions {
|
||||
return &Sessions{store: make(map[string]session)}
|
||||
}
|
||||
|
||||
func (s *Sessions) Issue(user string) string {
|
||||
b := make([]byte, 16)
|
||||
_, _ = rand.Read(b)
|
||||
tok := hex.EncodeToString(b)
|
||||
s.mu.Lock()
|
||||
s.store[tok] = session{user: user, expires: time.Now().Add(12 * time.Hour)}
|
||||
s.mu.Unlock()
|
||||
return tok
|
||||
}
|
||||
|
||||
func (s *Sessions) Valid(tok string) bool {
|
||||
s.mu.RLock()
|
||||
sess, ok := s.store[tok]
|
||||
s.mu.RUnlock()
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return time.Now().Before(sess.expires)
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"os"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Addr string // listen addr, e.g. ":8080"
|
||||
DataDir string // SQLite + .log files live here
|
||||
AgentHealth string // map of nodeID -> agent base URL (TODO: real map)
|
||||
}
|
||||
|
||||
// Load reads flags and env. Env wins over defaults; flags win over env.
|
||||
func Load() Config {
|
||||
c := Config{
|
||||
Addr: envOr("SDP_ADDR", ":8080"),
|
||||
DataDir: envOr("SDP_DATA", "./data"),
|
||||
AgentHealth: envOr("SDP_AGENT_HEALTH", ""),
|
||||
}
|
||||
flag.StringVar(&c.Addr, "addr", c.Addr, "control plane listen addr")
|
||||
flag.StringVar(&c.DataDir, "data", c.DataDir, "data directory for sqlite and logs")
|
||||
flag.Parse()
|
||||
return c
|
||||
}
|
||||
|
||||
func envOr(k, def string) string {
|
||||
if v := os.Getenv(k); v != "" {
|
||||
return v
|
||||
}
|
||||
return def
|
||||
}
|
||||
@@ -0,0 +1,161 @@
|
||||
// Package store persists deployment progress in SQLite and log lines in
|
||||
// append-only .log files. The hot path is AppendEvent — agents emit a lot
|
||||
// of these and the dashboard wants them live.
|
||||
package store
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
|
||||
"github.com/sdp/protocol"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
db *sql.DB
|
||||
dir string
|
||||
logs map[string]*os.File // deploymentID -> file
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func Open(dir string) (*Store, error) {
|
||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Join(dir, "logs"), 0o755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
db, err := sql.Open("sqlite3", filepath.Join(dir, "sdp.db"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS deployments (
|
||||
id TEXT PRIMARY KEY,
|
||||
repository TEXT,
|
||||
branch TEXT,
|
||||
user TEXT,
|
||||
state TEXT,
|
||||
started_at INTEGER,
|
||||
completed_at INTEGER
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS progress (
|
||||
deployment_id TEXT,
|
||||
stage TEXT,
|
||||
ok INTEGER,
|
||||
at INTEGER
|
||||
);
|
||||
`); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Store{db: db, dir: dir, logs: make(map[string]*os.File)}, nil
|
||||
}
|
||||
|
||||
func (s *Store) Close() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for _, f := range s.logs {
|
||||
_ = f.Close()
|
||||
}
|
||||
return s.db.Close()
|
||||
}
|
||||
|
||||
// StartDeployment records a new deployment row. Idempotent on id.
|
||||
func (s *Store) StartDeployment(id, repo, branch, user string) error {
|
||||
_, err := s.db.Exec(
|
||||
`INSERT OR IGNORE INTO deployments(id, repository, branch, user, state, started_at) VALUES(?,?,?,?,?,?)`,
|
||||
id, repo, branch, user, "QUEUED", time.Now().UnixMilli(),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// FinishDeployment marks the final state. completed_at is set if state is terminal.
|
||||
func (s *Store) FinishDeployment(id, state string) error {
|
||||
_, err := s.db.Exec(
|
||||
`UPDATE deployments SET state=?, completed_at=? WHERE id=?`,
|
||||
state, time.Now().UnixMilli(), id,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// MarkStage records a stage transition. ok=1 success, 0 failure.
|
||||
func (s *Store) MarkStage(id, stage string, ok bool) error {
|
||||
v := 0
|
||||
if ok {
|
||||
v = 1
|
||||
}
|
||||
_, err := s.db.Exec(
|
||||
`INSERT INTO progress(deployment_id, stage, ok, at) VALUES(?,?,?,?)`,
|
||||
id, stage, v, time.Now().UnixMilli(),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// AppendEvent writes an event. Log lines go to .log; progress/status hit SQLite.
|
||||
// The deployment's running state is also updated so /api/deployments/{id} can
|
||||
// serve a snapshot without replaying the whole log.
|
||||
func (s *Store) AppendEvent(e protocol.Event) error {
|
||||
switch e.Kind {
|
||||
case "log":
|
||||
return s.appendLog(e)
|
||||
case "status":
|
||||
_, err := s.db.Exec(`UPDATE deployments SET state=? WHERE id=?`, e.State, e.DeploymentID)
|
||||
return err
|
||||
case "progress":
|
||||
return s.MarkStage(e.DeploymentID, e.Stage, e.State != "FAILED")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) appendLog(e protocol.Event) error {
|
||||
s.mu.Lock()
|
||||
f, ok := s.logs[e.DeploymentID]
|
||||
if !ok {
|
||||
path := filepath.Join(s.dir, "logs", e.DeploymentID+".log")
|
||||
var err error
|
||||
f, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
|
||||
if err != nil {
|
||||
s.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
s.logs[e.DeploymentID] = f
|
||||
}
|
||||
s.mu.Unlock()
|
||||
ts := time.UnixMilli(e.At).Format("15:04:05.000")
|
||||
_, err := fmt.Fprintf(f, "%s %s\n", ts, e.Line)
|
||||
return err
|
||||
}
|
||||
|
||||
// TailLogs returns the last n lines of a deployment's log file. Used by the
|
||||
// dashboard on first connect to backfill.
|
||||
func (s *Store) TailLogs(id string, n int) ([]string, error) {
|
||||
path := filepath.Join(s.dir, "logs", id+".log")
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
// ponytail: O(n) scan, fine for tail use. swap to a ring buffer if logs get huge.
|
||||
var lines []string
|
||||
start := 0
|
||||
for i, b := range data {
|
||||
if b == '\n' {
|
||||
lines = append(lines, string(data[start:i]))
|
||||
start = i + 1
|
||||
}
|
||||
}
|
||||
if start < len(data) {
|
||||
lines = append(lines, string(data[start:]))
|
||||
}
|
||||
if n > 0 && len(lines) > n {
|
||||
lines = lines[len(lines)-n:]
|
||||
}
|
||||
return lines, nil
|
||||
}
|
||||
@@ -0,0 +1,111 @@
|
||||
package ws
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/sdp/protocol"
|
||||
|
||||
"github.com/sdp/control-plane/internal/store"
|
||||
)
|
||||
|
||||
// AgentWS handles /ws/agent?node=<id>. The agent pushes events; the control
|
||||
// plane can send requests back over the same socket.
|
||||
func (h *Hub) AgentWS(st *store.Store, onConnect, onDisconnect func(nodeID string)) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
nodeID := r.URL.Query().Get("node")
|
||||
if nodeID == "" {
|
||||
http.Error(w, "missing node query", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// outbound to agent: buffered; if the agent stalls we drop rather than block.
|
||||
out := make(chan []byte, 32)
|
||||
h.RegisterAgent(nodeID, out)
|
||||
defer func() {
|
||||
h.UnregisterAgent(nodeID)
|
||||
close(out) // unblock the writer goroutine
|
||||
}()
|
||||
if onConnect != nil {
|
||||
onConnect(nodeID)
|
||||
}
|
||||
defer func() {
|
||||
if onDisconnect != nil {
|
||||
onDisconnect(nodeID)
|
||||
}
|
||||
}()
|
||||
|
||||
// writer goroutine: drains out to the agent
|
||||
go func() {
|
||||
for msg := range out {
|
||||
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// reader loop: agent -> control plane
|
||||
for {
|
||||
_, raw, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var e protocol.Event
|
||||
if err := json.Unmarshal(raw, &e); err != nil {
|
||||
log.Printf("agent %s: bad event: %v", nodeID, err)
|
||||
continue
|
||||
}
|
||||
_ = st.AppendEvent(e) // ponytail: best-effort persist
|
||||
h.Publish(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DeploymentWS handles /ws/deployments/{id}. Dashboard subscribes; we send
|
||||
// a tail of the existing log first, then live events.
|
||||
func (h *Hub) DeploymentWS(st *store.Store) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
id := strings.TrimPrefix(r.URL.Path, "/ws/deployments/")
|
||||
if id == "" {
|
||||
http.Error(w, "missing deployment id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// backfill: send last N log lines as synthetic events.
|
||||
lines, _ := st.TailLogs(id, 200)
|
||||
for _, ln := range lines {
|
||||
msg := map[string]any{
|
||||
"deploymentId": id,
|
||||
"kind": "log",
|
||||
"line": ln,
|
||||
"at": int64(0),
|
||||
"backfill": true,
|
||||
}
|
||||
b, _ := json.Marshal(msg)
|
||||
if err := conn.WriteMessage(websocket.TextMessage, b); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
ch, unsub := h.Subscribe(id)
|
||||
defer unsub()
|
||||
for e := range ch {
|
||||
b, _ := json.Marshal(e)
|
||||
if err := conn.WriteMessage(websocket.TextMessage, b); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
// Package ws is the WebSocket fan-out for SDP. Two flows:
|
||||
//
|
||||
// agent --(events)--> /ws/agent (one conn per agent)
|
||||
// dashboard client --(subscribe)-> /ws/deployments/{id} (one conn per viewer)
|
||||
//
|
||||
// On agent connect we record the agent's nodeID. On dashboard connect we
|
||||
// register a subscriber for one deployment. Events are best-effort fanned out;
|
||||
// a slow client is dropped, not allowed to backpressure the agent.
|
||||
package ws
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
|
||||
"github.com/sdp/protocol"
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool { return true }, // ponytail: internal tool, allow all
|
||||
}
|
||||
|
||||
type Hub struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
// one channel per deployment; nil = no subscribers.
|
||||
subs map[string]map[chan protocol.Event]struct{}
|
||||
|
||||
// one channel per connected agent; keyed by nodeID.
|
||||
agents map[string]chan<- []byte // outbound to agent (deploy requests, etc.)
|
||||
}
|
||||
|
||||
func New() *Hub {
|
||||
return &Hub{
|
||||
subs: make(map[string]map[chan protocol.Event]struct{}),
|
||||
agents: make(map[string]chan<- []byte),
|
||||
}
|
||||
}
|
||||
|
||||
// Publish fans an event out to all subscribers of that deployment.
|
||||
// Non-blocking; drops the event for a subscriber if its buffer is full.
|
||||
func (h *Hub) Publish(e protocol.Event) {
|
||||
h.mu.RLock()
|
||||
subs := h.subs[e.DeploymentID]
|
||||
chans := make([]chan protocol.Event, 0, len(subs))
|
||||
for c := range subs {
|
||||
chans = append(chans, c)
|
||||
}
|
||||
h.mu.RUnlock()
|
||||
for _, c := range chans {
|
||||
select {
|
||||
case c <- e:
|
||||
default:
|
||||
// ponytail: drop slow subscriber; they'll reconnect.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe registers a subscriber channel for one deployment.
|
||||
// Returns an unsubscribe func the caller must invoke.
|
||||
func (h *Hub) Subscribe(deploymentID string) (chan protocol.Event, func()) {
|
||||
ch := make(chan protocol.Event, 64)
|
||||
h.mu.Lock()
|
||||
if h.subs[deploymentID] == nil {
|
||||
h.subs[deploymentID] = make(map[chan protocol.Event]struct{})
|
||||
}
|
||||
h.subs[deploymentID][ch] = struct{}{}
|
||||
h.mu.Unlock()
|
||||
return ch, func() {
|
||||
h.mu.Lock()
|
||||
if subs, ok := h.subs[deploymentID]; ok {
|
||||
delete(subs, ch)
|
||||
if len(subs) == 0 {
|
||||
delete(h.subs, deploymentID)
|
||||
}
|
||||
}
|
||||
h.mu.Unlock()
|
||||
close(ch)
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterAgent stores the outbound channel for a node. Used when the control
|
||||
// plane wants to send something back to an agent (deploy requests, stop, etc.).
|
||||
func (h *Hub) RegisterAgent(nodeID string, out chan<- []byte) {
|
||||
h.mu.Lock()
|
||||
h.agents[nodeID] = out
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *Hub) UnregisterAgent(nodeID string) {
|
||||
h.mu.Lock()
|
||||
delete(h.agents, nodeID)
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
// SendToAgent best-effort sends a payload to a connected agent. Returns false
|
||||
// if the agent isn't connected or its buffer is full.
|
||||
func (h *Hub) SendToAgent(nodeID string, payload []byte) bool {
|
||||
h.mu.RLock()
|
||||
ch, ok := h.agents[nodeID]
|
||||
h.mu.RUnlock()
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case ch <- payload:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user