diff --git a/.gitignore b/.gitignore index e61005d..678500a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,11 @@ # Built artifacts -bin/ -dashboard/out/ +# bin/ AND dashboard/out/ are tracked on purpose: the build runs on a +# separate Linux box that has the golang:1.24 toolchain, and BOTH the +# binaries and the static dashboard are SCPed from here to the company +# VMs (92 / 186). The VMs have no internet so they can't regenerate +# either. Do NOT gitignore them again. +# dashboard/.next/ stays ignored — it's a Next.js build cache, not a +# deployable artifact. dashboard/.next/ # Node diff --git a/agent-micro/cmd/agent-micro/main.go b/agent-micro/cmd/agent-micro/main.go index 87d5875..bddddb4 100644 --- a/agent-micro/cmd/agent-micro/main.go +++ b/agent-micro/cmd/agent-micro/main.go @@ -1,6 +1,7 @@ // Command agent-micro runs on the microservices VM (172.18.136.92). It -// maintains a WebSocket to the control plane, accepts deploy/stop frames, -// and runs the build+container pipeline locally for Go microservices. +// maintains a WebSocket to the control plane, accepts deploy/stop/RPC +// frames, and runs the build+container pipeline locally for Go +// microservices. package main import ( @@ -13,8 +14,8 @@ import ( "sync" "time" - docker "github.com/moby/moby/client" "github.com/gorilla/websocket" + docker "github.com/moby/moby/client" "github.com/sdp/agentlib/deployer" "github.com/sdp/agentlib/gitutil" @@ -31,7 +32,7 @@ var repos = map[string]string{ } func main() { - cpURL := flag.String("cp", envOr("SDP_CP_URL", "ws://localhost:8080/ws/agent"), "control plane WS URL") + cpURL := flag.String("cp", envOr("SDP_CP_URL", "ws://localhost:3452/ws/agent"), "control plane WS URL") nodeID := flag.String("node", envOr("SDP_NODE_ID", "micro"), "node id sent in WS query") flag.Parse() @@ -110,9 +111,9 @@ func readLoop(c *websocket.Conn, cli *docker.Client, out chan<- []byte, mu *sync } // Inbound frame: {op, data, id}. Op is the verb. data is op-specific. var frame struct { - Op string `json:"op"` - Data protocol.DeployRequest `json:"data"` - ID string `json:"id"` + Op string `json:"op"` + Data json.RawMessage `json:"data"` + ID string `json:"id"` } if err := json.Unmarshal(raw, &frame); err != nil { log.Printf("bad frame: %v", err) @@ -120,33 +121,96 @@ func readLoop(c *websocket.Conn, cli *docker.Client, out chan<- []byte, mu *sync } switch frame.Op { case "deploy": - repoPath, ok := repos[frame.Data.Repository] + var req protocol.DeployRequest + if err := json.Unmarshal(frame.Data, &req); err != nil { + log.Printf("bad deploy data: %v", err) + continue + } + repoPath, ok := repos[req.Repository] if !ok { emit(out, protocol.Event{ - DeploymentID: frame.Data.DeploymentID, + DeploymentID: req.DeploymentID, Kind: "status", State: "FAILED", At: time.Now().UnixMilli(), }) continue } - d := deployer.NewGo(cli, frame.Data.DeploymentID, - frame.Data.Repository, repoPath, - frame.Data.Branch, frame.Data.Env, - gitutil.Creds{Username: frame.Data.Username, Password: frame.Data.Password}, + d := deployer.NewGo(cli, req.DeploymentID, + req.Repository, repoPath, + req.Branch, req.HostPort, req.Env, + gitutil.Creds{Username: req.Username, Password: req.Password}, ) ctx, cancel := context.WithCancel(context.Background()) - inflight[frame.Data.DeploymentID] = &runState{deployer: d, cancel: cancel} + inflight[req.DeploymentID] = &runState{deployer: d, cancel: cancel} go runDeploy(d, ctx, out) case "stop": if rs, ok := inflight[frame.ID]; ok { _ = rs.deployer.Stop(context.Background()) rs.cancel() // unblock StreamLogs } + case "list_repos": + handleListRepos(out, frame.ID) + case "list_branches": + var body struct { + Repo string `json:"repo"` + } + _ = json.Unmarshal(frame.Data, &body) + handleListBranches(out, frame.ID, body.Repo) + case "probe": + var body struct { + Repo string `json:"repo"` + Username string `json:"username"` + Password string `json:"password"` + } + _ = json.Unmarshal(frame.Data, &body) + handleProbe(out, frame.ID, body.Repo, body.Username, body.Password) } } } +func handleListRepos(out chan<- []byte, id string) { + // Micro agent serves all repos except the api-gateway. The control + // plane calls both nodes for the union; the gateway will return + // only its own repo. + list := make([]protocol.RepoInfo, 0, len(repos)) + for name, path := range repos { + list = append(list, protocol.RepoInfo{Name: name, Path: path}) + } + replyOK(out, id, map[string]any{"repos": list}) +} + +func handleListBranches(out chan<- []byte, id, repo string) { + path, ok := repos[repo] + if !ok { + replyErr(out, id, "unknown repo: "+repo) + return + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + branches, err := gitutil.ListBranches(ctx, path) + if err != nil { + replyErr(out, id, err.Error()) + return + } + replyOK(out, id, map[string]any{"ok": true, "branches": branches}) +} + +func handleProbe(out chan<- []byte, id, repo, user, pass string) { + path, ok := repos[repo] + if !ok { + replyErr(out, id, "unknown repo: "+repo) + return + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := gitutil.Probe(ctx, path, gitutil.Creds{Username: user, Password: pass}); err != nil { + replyErr(out, id, err.Error()) + return + } + replyOK(out, id, map[string]any{"ok": true}) +} + func runDeploy(d *deployer.Deployer, ctx context.Context, out chan<- []byte) { events := make(chan protocol.Event, 64) // producer: Run pipelines, then StreamLogs tails the container. Both @@ -174,6 +238,31 @@ func emit(out chan<- []byte, e protocol.Event) { } } +func replyOK(out chan<- []byte, id string, data interface{}) { + reply(out, id, true, "", data) +} + +func replyErr(out chan<- []byte, id, errMsg string) { + reply(out, id, false, errMsg, nil) +} + +func reply(out chan<- []byte, id string, ok bool, errMsg string, data interface{}) { + frame := map[string]any{ + "op": "reply", + "id": id, + "ok": ok, + "data": data, + } + if errMsg != "" { + frame["error"] = errMsg + } + b, _ := json.Marshal(frame) + select { + case out <- b: + default: + } +} + func envOr(k, def string) string { if v := os.Getenv(k); v != "" { return v diff --git a/bin/agent-gateway b/bin/agent-gateway new file mode 100755 index 0000000..2b90c82 Binary files /dev/null and b/bin/agent-gateway differ diff --git a/bin/agent-micro b/bin/agent-micro new file mode 100755 index 0000000..fdd3270 Binary files /dev/null and b/bin/agent-micro differ diff --git a/bin/control-plane b/bin/control-plane new file mode 100755 index 0000000..1a69269 Binary files /dev/null and b/bin/control-plane differ diff --git a/control-plane/internal/api/api.go b/control-plane/internal/api/api.go index fbd2ffb..20d2caa 100644 --- a/control-plane/internal/api/api.go +++ b/control-plane/internal/api/api.go @@ -1,18 +1,23 @@ // Package api wires the HTTP endpoints. Kept on net/http — no router lib // for a handful of endpoints, stdlib mux is plenty. +// +// Slice 2 adds sandboxes, templates, environments, routes, port +// allocation, and a real agent-mediated auth path. The legacy +// /api/deployments POST is kept for ad-hoc deploys (no sandbox) and a +// new /api/sandboxes/{id}/services/{repo}/deploy adds the +// sandbox-scoped variant that picks the right port and env. package api import ( "crypto/rand" "encoding/hex" - "encoding/json" "net/http" "strings" "sync" + "time" "github.com/sdp/control-plane/internal/store" "github.com/sdp/control-plane/internal/ws" - "github.com/sdp/protocol" ) type Server struct { @@ -20,6 +25,14 @@ type Server struct { hub *ws.Hub agents *AgentRegistry sess *Sessions + // gatewayRepo is the repo the control plane uses to validate + // Bitbucket creds at login. Defaults to "api-gateway". + gatewayRepo string + // microVMIP / gatewayVMIP are the host IPs the dashboard shows + // when a user picks a "local stand-in" for a microservice route. + // Defaults match the company infra (172.18.136.92 / 172.18.139.186). + microVMIP string + gatewayVMIP string } type AgentRegistry struct { @@ -29,20 +42,49 @@ type AgentRegistry struct { func New(st *store.Store, hub *ws.Hub) *Server { return &Server{ - st: st, - hub: hub, - agents: &AgentRegistry{conns: make(map[string]bool)}, - sess: NewSessions(), + st: st, + hub: hub, + agents: &AgentRegistry{conns: make(map[string]bool)}, + sess: NewSessions(), + gatewayRepo: "api-gateway", + microVMIP: "172.18.136.92", + gatewayVMIP: "172.18.139.186", } } func (s *Server) Routes() http.Handler { mux := http.NewServeMux() + + // auth mux.HandleFunc("/api/login", s.handleLogin) + mux.HandleFunc("/api/logout", s.handleLogout) + + // repos & deploy mux.HandleFunc("/api/repos", s.handleListRepos) mux.HandleFunc("/api/repos/branches", s.handleListBranches) - mux.HandleFunc("/api/deployments", s.handleDeployments) // GET list, POST create + mux.HandleFunc("/api/agents", s.handleListAgents) + mux.HandleFunc("/api/deployments", s.handleListDeployments) // GET list + mux.HandleFunc("/api/deployments/new", s.handleCreateDeployment) // POST mux.HandleFunc("/api/deployments/stop", s.handleStopDeployment) // POST + + // environments + mux.HandleFunc("/api/environments", s.handleEnvironments) // GET list, POST create + mux.HandleFunc("/api/environments/", s.handleEnvironmentByID) // GET/PUT/DELETE /{id} + + // sandboxes + mux.HandleFunc("/api/sandboxes", s.handleSandboxes) // GET list, POST create + mux.HandleFunc("/api/sandboxes/", s.handleSandboxByID) // GET/PUT/DELETE /{id} + mux.HandleFunc("/api/sandboxes/clone", s.handleCloneSandbox) // POST: clone from template + + // templates + mux.HandleFunc("/api/templates", s.handleTemplates) // GET list, POST create + mux.HandleFunc("/api/templates/", s.handleTemplateByID) // GET/PUT/DELETE /{id} + + // routes + mux.HandleFunc("/api/routes", s.handleListRoutes) // GET: live routes from gateway agent's config.php + mux.HandleFunc("/api/routes/push", s.handlePushRoutes) // POST: push routes for a sandbox + + // websocket mux.Handle("/ws/agent", s.hub.AgentWS(s.st, func(nodeID string) { s.agents.mu.Lock() @@ -56,6 +98,7 @@ func (s *Server) Routes() http.Handler { }, )) mux.HandleFunc("/ws/deployments/", s.hub.DeploymentWS(s.st)) + return s.withAuth(mux) } @@ -77,176 +120,16 @@ func (s *Server) withAuth(next http.Handler) http.Handler { }) } -// ---- login ---- - -type loginReq struct { - Username string `json:"username"` - Password string `json:"password"` - Repo string `json:"repo"` // optional: validate against this specific repo -} - -func (s *Server) handleLogin(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "POST only", http.StatusMethodNotAllowed) - return +// userFromContext returns the authenticated user from the session cookie. +// The cookie is set on /api/login; on every other /api/* path we look it +// up here so handlers can record who did what (audit trail). +func (s *Server) userFromContext(r *http.Request) string { + c, err := r.Cookie("sdp_session") + if err != nil { + return "" } - var body loginReq - if err := json.NewDecoder(r.Body).Decode(&body); err != nil { - http.Error(w, "bad json", http.StatusBadRequest) - return - } - // ponytail: trust boundary lives in the agent — it does the actual git - // ls-remote. The control plane just hands off credentials per-op. - // For login we ask any connected agent to validate. If none are - // connected, fail. Real impl: pick a known bootstrap repo. - ok := s.validateViaAgent(body.Username, body.Password, body.Repo) - if !ok { - http.Error(w, "login failed", http.StatusUnauthorized) - return - } - tok := s.sess.Issue(body.Username) - http.SetCookie(w, &http.Cookie{ - Name: "sdp_session", - Value: tok, - Path: "/", - HttpOnly: true, - SameSite: http.SameSiteLaxMode, - MaxAge: 12 * 3600, - }) - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(`{"ok":true}`)) -} - -// validateViaAgent does a git ls-remote through one of the connected agents. -// The agent holds the repos; the control plane never touches git directly. -// -// ponytail: MVP stub. Returns true if any agent is connected so the smoke -// flow can run. Real impl will send a "probe" frame over the agent's WS -// and wait for a reply. -func (s *Server) validateViaAgent(user, pass, repo string) bool { - _ = user - _ = pass - _ = repo - s.agents.mu.RLock() - defer s.agents.mu.RUnlock() - return len(s.agents.conns) > 0 -} - -// ---- repos ---- - -type repoInfo struct { - Name string `json:"name"` - Node string `json:"node"` - Path string `json:"path"` -} - -func (s *Server) handleListRepos(w http.ResponseWriter, r *http.Request) { - // ponytail: real impl asks the connected agents for their repo list. - // For MVP smoke, stub with the spec's example. - repos := []repoInfo{ - {Name: "account", Node: "micro", Path: "/home/user/AppGolang/account"}, - {Name: "payment", Node: "micro", Path: "/home/user/AppGolang/payment"}, - {Name: "user", Node: "micro", Path: "/home/user/AppGolang/user"}, - {Name: "notification", Node: "micro", Path: "/home/user/AppGolang/notification"}, - {Name: "api-gateway", Node: "gateway", Path: "/home/user/SDP"}, - } - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(repos) -} - -func (s *Server) handleListBranches(w http.ResponseWriter, r *http.Request) { - repo := r.URL.Query().Get("repo") - if repo == "" { - http.Error(w, "repo required", http.StatusBadRequest) - return - } - // ponytail: real impl forwards to the agent. For MVP, stub. - branches := []string{"main", "develop", "feature/login-error"} - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(branches) -} - -// ---- deployments ---- - -type deployReq struct { - Repository string `json:"repository"` - Branch string `json:"branch"` - Env map[string]string `json:"env,omitempty"` - Username string `json:"username"` - Password string `json:"password"` -} - -func (s *Server) handleDeployments(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case http.MethodGet: - // ponytail: list from SQLite. Real impl: SELECT with filter. - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte(`[]`)) - case http.MethodPost: - var body deployReq - if err := json.NewDecoder(r.Body).Decode(&body); err != nil { - http.Error(w, "bad json", http.StatusBadRequest) - return - } - // resolve repo -> node - node := "micro" - if body.Repository == "api-gateway" { - node = "gateway" - } - // ensure agent connected - s.agents.mu.RLock() - connected := s.agents.conns[node] - s.agents.mu.RUnlock() - if !connected { - http.Error(w, "agent "+node+" not connected", http.StatusServiceUnavailable) - return - } - id := newID() - _ = s.st.StartDeployment(id, body.Repository, body.Branch, body.Username) - - // send deploy request to agent over its WS - req := protocol.DeployRequest{ - DeploymentID: id, - Repository: body.Repository, - Branch: body.Branch, - Env: body.Env, - Username: body.Username, - Password: body.Password, - } - payload, _ := json.Marshal(map[string]any{ - "op": "deploy", - "data": req, - }) - if !s.hub.SendToAgent(node, payload) { - http.Error(w, "agent buffer full", http.StatusServiceUnavailable) - return - } - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(map[string]string{"id": id}) - default: - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - } -} - -func (s *Server) handleStopDeployment(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "POST only", http.StatusMethodNotAllowed) - return - } - var body struct { - ID string `json:"id"` - Node string `json:"node"` - } - if err := json.NewDecoder(r.Body).Decode(&body); err != nil { - http.Error(w, "bad json", http.StatusBadRequest) - return - } - payload, _ := json.Marshal(map[string]any{"op": "stop", "id": body.ID}) - if !s.hub.SendToAgent(body.Node, payload) { - http.Error(w, "agent not reachable", http.StatusServiceUnavailable) - return - } - w.WriteHeader(http.StatusOK) + u, _ := s.sess.User(c.Value) + return u } func newID() string { @@ -254,3 +137,16 @@ func newID() string { _, _ = rand.Read(b) return hex.EncodeToString(b) } + +func writeJSON(w http.ResponseWriter, status int, v interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = encodeJSON(w, v) +} + +func writeErr(w http.ResponseWriter, status int, msg string) { + writeJSON(w, status, map[string]string{"error": msg}) +} + +// nowMs is a small helper so tests can stub it later. +func nowMs() int64 { return time.Now().UnixMilli() } diff --git a/control-plane/internal/api/deployments.go b/control-plane/internal/api/deployments.go new file mode 100644 index 0000000..f5389e1 --- /dev/null +++ b/control-plane/internal/api/deployments.go @@ -0,0 +1,200 @@ +package api + +import ( + "context" + "encoding/json" + "net/http" + "strconv" + "time" + + "github.com/sdp/control-plane/internal/store" + "github.com/sdp/protocol" +) + +// handleListDeployments reads deployment history from SQLite. Filterable +// by sandbox via ?sandbox=ID. Limit via ?limit=N (default 100, max 500). +func (s *Server) handleListDeployments(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "GET only", http.StatusMethodNotAllowed) + return + } + sandboxID := r.URL.Query().Get("sandbox") + limit := 100 + if l := r.URL.Query().Get("limit"); l != "" { + if n, err := strconv.Atoi(l); err == nil { + limit = n + } + } + deps, err := s.st.ListDeployments(sandboxID, limit) + if err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + if deps == nil { + deps = []store.Deployment{} // never null in JSON + } + writeJSON(w, http.StatusOK, deps) +} + +type createDeployReq struct { + Repository string `json:"repository"` + Branch string `json:"branch"` + Env map[string]string `json:"env,omitempty"` + Username string `json:"username"` + Password string `json:"password"` + // SandboxID optionally ties this deploy to a sandbox. When set, the + // sandbox's host_port for the repo is used as the container's + // published port and the env is the union of (sandbox env) and + // (request env). + SandboxID string `json:"sandboxId,omitempty"` + // HostPort overrides the auto-allocated port. Use 0 to allocate. + HostPort int `json:"hostPort,omitempty"` +} + +// handleCreateDeployment is the Slice-2-friendly deploy path. It +// resolves the repo to a node, looks up the sandbox's port (if any), +// and dispatches the deploy frame. +func (s *Server) handleCreateDeployment(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "POST only", http.StatusMethodNotAllowed) + return + } + var body createDeployReq + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "bad json", http.StatusBadRequest) + return + } + if body.Repository == "" || body.Branch == "" { + writeErr(w, http.StatusBadRequest, "repository and branch required") + return + } + node, err := s.resolveNode(body.Repository, body.SandboxID) + if err != nil { + writeErr(w, http.StatusBadRequest, err.Error()) + return + } + s.agents.mu.RLock() + connected := s.agents.conns[node] + s.agents.mu.RUnlock() + if !connected { + writeErr(w, http.StatusServiceUnavailable, "agent "+node+" not connected") + return + } + user := s.userFromContext(r) + id := newID() + + hostPort := body.HostPort + env := body.Env + if body.SandboxID != "" { + sb, err := s.st.GetSandbox(body.SandboxID) + if err == nil { + for _, svc := range sb.Services { + if svc.Repo == body.Repository { + if hostPort == 0 { + hostPort = svc.HostPort + } + if svc.EnvID != "" { + if e, err := s.st.GetEnvironment(svc.EnvID); err == nil { + if env == nil { + env = map[string]string{} + } + for k, v := range e.Values { + env[k] = v + } + } + } + break + } + } + } + } + if err := s.st.StartDeploymentInSandbox(id, body.SandboxID, body.Repository, body.Branch, user, hostPort); err != nil { + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + + req := protocol.DeployRequest{ + DeploymentID: id, + SandboxID: body.SandboxID, + Repository: body.Repository, + Branch: body.Branch, + HostPort: hostPort, + Env: env, + Username: body.Username, + Password: body.Password, + } + payload, _ := json.Marshal(map[string]any{"op": "deploy", "data": req}) + if !s.hub.SendToAgent(node, payload) { + writeErr(w, http.StatusServiceUnavailable, "agent buffer full") + return + } + writeJSON(w, http.StatusOK, map[string]any{"id": id}) +} + +type stopDeployReq struct { + ID string `json:"id"` + Node string `json:"node"` +} + +func (s *Server) handleStopDeployment(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "POST only", http.StatusMethodNotAllowed) + return + } + var body stopDeployReq + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "bad json", http.StatusBadRequest) + return + } + if body.Node == "" { + // try to find the node from the deployment row + if d, err := s.st.GetDeployment(body.ID); err == nil && d != nil { + body.Node, _ = s.resolveNode(d.Repository, d.SandboxID) + } + } + if body.Node == "" { + writeErr(w, http.StatusBadRequest, "node required") + return + } + payload, _ := json.Marshal(map[string]any{"op": "stop", "id": body.ID}) + if !s.hub.SendToAgent(body.Node, payload) { + writeErr(w, http.StatusServiceUnavailable, "agent not reachable") + return + } + _ = s.st.FinishDeployment(body.ID, "STOPPED") + writeJSON(w, http.StatusOK, map[string]bool{"ok": true}) +} + +// resolveNode picks which agent serves a repo. The gateway agent owns +// the api-gateway repo. Everything else is the micro agent. +func (s *Server) resolveNode(repo, sandboxID string) (string, error) { + if repo == "api-gateway" { + return "gateway", nil + } + return "micro", nil +} + +// callAgentPushRoutes pushes a routes table to the gateway agent. +func (s *Server) callAgentPushRoutes(ctx context.Context, sandboxID string, payload []protocol.RouteOverride) error { + ctx, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + data := map[string]any{ + "sandboxId": sandboxID, + "routes": payload, + } + raw, err := s.hub.CallAgent(ctx, "gateway", "push_routes", data, 0) + if err != nil { + return err + } + var reply struct { + OK bool `json:"ok"` + Err string `json:"error,omitempty"` + } + if err := json.Unmarshal(raw, &reply); err != nil { + return err + } + if !reply.OK { + return wsError(reply.Err) + } + return nil +} diff --git a/control-plane/internal/api/errors.go b/control-plane/internal/api/errors.go new file mode 100644 index 0000000..40a0aad --- /dev/null +++ b/control-plane/internal/api/errors.go @@ -0,0 +1,6 @@ +package api + +import "errors" + +// wsError turns a string from an agent reply into an error. +func wsError(s string) error { return errors.New(s) } diff --git a/control-plane/internal/api/json.go b/control-plane/internal/api/json.go new file mode 100644 index 0000000..8636e3b --- /dev/null +++ b/control-plane/internal/api/json.go @@ -0,0 +1,11 @@ +package api + +import ( + "encoding/json" + "io" +) + +func encodeJSON(w io.Writer, v interface{}) error { + enc := json.NewEncoder(w) + return enc.Encode(v) +} diff --git a/control-plane/internal/api/login.go b/control-plane/internal/api/login.go new file mode 100644 index 0000000..296334a --- /dev/null +++ b/control-plane/internal/api/login.go @@ -0,0 +1,96 @@ +package api + +import ( + "context" + "encoding/json" + "net/http" + "time" +) + +type loginReq struct { + Username string `json:"username"` + Password string `json:"password"` + // Repo is an optional override: validate creds against a specific + // repo on a specific agent. If empty, we use the gateway's default + // repo (api-gateway) on the connected gateway agent. + Repo string `json:"repo"` +} + +func (s *Server) handleLogin(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "POST only", http.StatusMethodNotAllowed) + return + } + var body loginReq + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "bad json", http.StatusBadRequest) + return + } + if body.Username == "" || body.Password == "" { + writeErr(w, http.StatusBadRequest, "username and password required") + return + } + ok := s.validateViaAgent(r.Context(), body.Username, body.Password, body.Repo) + if !ok { + writeErr(w, http.StatusUnauthorized, "login failed — git ls-remote rejected the credentials") + return + } + tok := s.sess.Issue(body.Username) + http.SetCookie(w, &http.Cookie{ + Name: "sdp_session", + Value: tok, + Path: "/", + HttpOnly: true, + SameSite: http.SameSiteLaxMode, + MaxAge: 12 * 3600, + }) + writeJSON(w, http.StatusOK, map[string]bool{"ok": true}) +} + +func (s *Server) handleLogout(w http.ResponseWriter, r *http.Request) { + if c, err := r.Cookie("sdp_session"); err == nil { + s.sess.Revoke(c.Value) + } + http.SetCookie(w, &http.Cookie{ + Name: "sdp_session", + Value: "", + Path: "/", + HttpOnly: true, + MaxAge: -1, + }) + writeJSON(w, http.StatusOK, map[string]bool{"ok": true}) +} + +// validateViaAgent asks the connected gateway agent to run +// `git ls-remote` against the api-gateway repo. The agent holds the +// repo and the trust boundary for Bitbucket creds. +func (s *Server) validateViaAgent(ctx context.Context, user, pass, repo string) bool { + s.agents.mu.RLock() + _, connected := s.agents.conns["gateway"] + s.agents.mu.RUnlock() + if !connected { + return false + } + if repo == "" { + repo = s.gatewayRepo + } + data := map[string]string{ + "repo": repo, + "username": user, + "password": pass, + } + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + raw, err := s.hub.CallAgent(ctx, "gateway", "probe", data, 0) + if err != nil { + return false + } + var reply struct { + OK bool `json:"ok"` + Err string `json:"error,omitempty"` + } + if err := json.Unmarshal(raw, &reply); err != nil { + return false + } + return reply.OK +} diff --git a/control-plane/internal/api/repos.go b/control-plane/internal/api/repos.go new file mode 100644 index 0000000..811db04 --- /dev/null +++ b/control-plane/internal/api/repos.go @@ -0,0 +1,157 @@ +package api + +import ( + "context" + "encoding/json" + "net/http" + "sort" + "time" +) + +// repoInfo is the shape the dashboard consumes. It carries the nodeID +// the repo lives on, so the dashboard can pick the right agent when +// building a deploy. +type repoInfo struct { + Name string `json:"name"` + Node string `json:"node"` + Path string `json:"path"` +} + +// nodeRepoList is the reply shape from the agent's "list_repos" RPC. +type nodeRepoList struct { + Repos []repoInfo `json:"repos"` +} + +// handleListRepos queries every connected agent for its repo list and +// merges the results. +func (s *Server) handleListRepos(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "GET only", http.StatusMethodNotAllowed) + return + } + s.agents.mu.RLock() + nodes := make([]string, 0, len(s.agents.conns)) + for n, c := range s.agents.conns { + if c { + nodes = append(nodes, n) + } + } + s.agents.mu.RUnlock() + sort.Strings(nodes) + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + merged := make([]repoInfo, 0) + for _, node := range nodes { + raw, err := s.hub.CallAgent(ctx, node, "list_repos", map[string]any{}, 0) + if err != nil { + continue // agent flaked; skip + } + var reply nodeRepoList + if err := json.Unmarshal(raw, &reply); err != nil { + continue + } + for _, repo := range reply.Repos { + repo.Node = node + merged = append(merged, repo) + } + } + // Stable order: by node then name. + sort.Slice(merged, func(i, j int) bool { + if merged[i].Node != merged[j].Node { + return merged[i].Node < merged[j].Node + } + return merged[i].Name < merged[j].Name + }) + writeJSON(w, http.StatusOK, merged) +} + +// handleListBranches asks the connected agent that owns `repo` for its +// branches. The repo's node is implied by the prefix path; for now we +// try both connected agents and merge. +func (s *Server) handleListBranches(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "GET only", http.StatusMethodNotAllowed) + return + } + repo := r.URL.Query().Get("repo") + if repo == "" { + writeErr(w, http.StatusBadRequest, "repo required") + return + } + + s.agents.mu.RLock() + nodes := make([]string, 0, len(s.agents.conns)) + for n, c := range s.agents.conns { + if c { + nodes = append(nodes, n) + } + } + s.agents.mu.RUnlock() + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + branches := make([]string, 0) + for _, node := range nodes { + raw, err := s.hub.CallAgent(ctx, node, "list_branches", map[string]any{"repo": repo}, 0) + if err != nil { + continue + } + var reply struct { + OK bool `json:"ok"` + Branches []string `json:"branches"` + } + if err := json.Unmarshal(raw, &reply); err != nil || !reply.OK { + continue + } + branches = append(branches, reply.Branches...) + } + sort.Strings(branches) + // dedupe + dedup := make([]string, 0, len(branches)) + seen := map[string]bool{} + for _, b := range branches { + if !seen[b] { + dedup = append(dedup, b) + seen[b] = true + } + } + writeJSON(w, http.StatusOK, dedup) +} + +// handleListAgents returns which agents are currently connected. +func (s *Server) handleListAgents(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "GET only", http.StatusMethodNotAllowed) + return + } + s.agents.mu.RLock() + defer s.agents.mu.RUnlock() + out := make([]map[string]string, 0, len(s.agents.conns)) + for n, c := range s.agents.conns { + if c { + out = append(out, map[string]string{"node": n}) + } + } + sort.Slice(out, func(i, j int) bool { return out[i]["node"] < out[j]["node"] }) + writeJSON(w, http.StatusOK, out) +} + +// handleListRoutes proxies the list_routes RPC to the gateway agent. +// The dashboard uses this to show the live _url map from the +// currently-checked-out branch's config.php. +func (s *Server) handleListRoutes(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "GET only", http.StatusMethodNotAllowed) + return + } + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + raw, err := s.hub.CallAgent(ctx, "gateway", "list_routes", map[string]any{}, 0) + if err != nil { + writeErr(w, http.StatusBadGateway, err.Error()) + return + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(raw) +} diff --git a/control-plane/internal/store/deployments.go b/control-plane/internal/store/deployments.go new file mode 100644 index 0000000..5c4b34d --- /dev/null +++ b/control-plane/internal/store/deployments.go @@ -0,0 +1,243 @@ +package store + +import ( + "database/sql" + "encoding/json" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/sdp/protocol" +) + +// Deployment is a deployment row joined with the current state and +// container info. Used by the dashboard's history list and the +// "active deployment" badge in the sandbox view. +type Deployment struct { + ID string `json:"id"` + SandboxID string `json:"sandboxId,omitempty"` + Repository string `json:"repository"` + Branch string `json:"branch"` + User string `json:"user"` + State string `json:"state"` + ContainerID string `json:"containerId,omitempty"` + HostPort int `json:"hostPort,omitempty"` + StartedAt int64 `json:"startedAt"` + CompletedAt int64 `json:"completedAt,omitempty"` +} + +// StartDeployment records a new deployment row. Idempotent on id. +func (s *Store) StartDeployment(id, repo, branch, user string) error { + _, err := s.db.Exec( + `INSERT OR IGNORE INTO deployments(id, repository, branch, user, state, started_at) VALUES(?,?,?,?,?,?)`, + id, repo, branch, user, "QUEUED", time.Now().UnixMilli(), + ) + return err +} + +// StartDeploymentInSandbox records a new deployment tied to a sandbox. +// sandboxID and hostPort are nullable; empty string / 0 stores NULL. +func (s *Store) StartDeploymentInSandbox(id, sandboxID, repo, branch, user string, hostPort int) error { + var sb interface{} + if sandboxID != "" { + sb = sandboxID + } + var hp interface{} + if hostPort > 0 { + hp = hostPort + } + _, err := s.db.Exec( + `INSERT OR IGNORE INTO deployments(id, sandbox_id, repository, branch, user, state, host_port, started_at) + VALUES(?,?,?,?,?,?,?,?)`, + id, sb, repo, branch, user, "QUEUED", hp, time.Now().UnixMilli(), + ) + return err +} + +// FinishDeployment marks the final state. completed_at is set if state is terminal. +func (s *Store) FinishDeployment(id, state string) error { + _, err := s.db.Exec( + `UPDATE deployments SET state=?, completed_at=? WHERE id=?`, + state, time.Now().UnixMilli(), id, + ) + return err +} + +// SetContainerID records the container id once a deploy is RUNNING. +func (s *Store) SetContainerID(id, containerID string) error { + _, err := s.db.Exec(`UPDATE deployments SET container_id=? WHERE id=?`, containerID, id) + return err +} + +// MarkStage records a stage transition. ok=1 success, 0 failure. +func (s *Store) MarkStage(id, stage string, ok bool) error { + v := 0 + if ok { + v = 1 + } + _, err := s.db.Exec( + `INSERT INTO progress(deployment_id, stage, ok, at) VALUES(?,?,?,?)`, + id, stage, v, time.Now().UnixMilli(), + ) + return err +} + +// AppendEvent writes an event. Log lines go to .log; progress/status hit SQLite. +// The deployment's running state is also updated so /api/deployments/{id} can +// serve a snapshot without replaying the whole log. +func (s *Store) AppendEvent(e protocol.Event) error { + switch e.Kind { + case "log": + return s.appendLog(e) + case "status": + _, err := s.db.Exec(`UPDATE deployments SET state=? WHERE id=?`, e.State, e.DeploymentID) + return err + case "progress": + return s.MarkStage(e.DeploymentID, e.Stage, e.State != "FAILED") + } + return nil +} + +func (s *Store) appendLog(e protocol.Event) error { + s.mu.Lock() + f, ok := s.logs[e.DeploymentID] + if !ok { + path := filepath.Join(s.dir, "logs", e.DeploymentID+".log") + var err error + f, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + s.mu.Unlock() + return err + } + s.logs[e.DeploymentID] = f + } + s.mu.Unlock() + ts := time.UnixMilli(e.At).Format("15:04:05.000") + _, err := fmt.Fprintf(f, "%s %s\n", ts, e.Line) + return err +} + +// TailLogs returns the last n lines of a deployment's log file. Used by the +// dashboard on first connect to backfill. +func (s *Store) TailLogs(id string, n int) ([]string, error) { + path := filepath.Join(s.dir, "logs", id+".log") + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + // ponytail: O(n) scan, fine for tail use. swap to a ring buffer if logs get huge. + var lines []string + start := 0 + for i, b := range data { + if b == '\n' { + lines = append(lines, string(data[start:i])) + start = i + 1 + } + } + if start < len(data) { + lines = append(lines, string(data[start:])) + } + if n > 0 && len(lines) > n { + lines = lines[len(lines)-n:] + } + return lines, nil +} + +// ListDeployments returns deployments, newest first. If sandboxID is set, +// filters to that sandbox. limit caps the result. +func (s *Store) ListDeployments(sandboxID string, limit int) ([]Deployment, error) { + if limit <= 0 || limit > 500 { + limit = 100 + } + var ( + rows *sql.Rows + err error + ) + if sandboxID == "" { + rows, err = s.db.Query( + `SELECT id, COALESCE(sandbox_id,''), repository, branch, COALESCE(user,''), state, + COALESCE(container_id,''), COALESCE(host_port,0), started_at, COALESCE(completed_at,0) + FROM deployments ORDER BY started_at DESC LIMIT ?`, limit) + } else { + rows, err = s.db.Query( + `SELECT id, COALESCE(sandbox_id,''), repository, branch, COALESCE(user,''), state, + COALESCE(container_id,''), COALESCE(host_port,0), started_at, COALESCE(completed_at,0) + FROM deployments WHERE sandbox_id=? ORDER BY started_at DESC LIMIT ?`, sandboxID, limit) + } + if err != nil { + return nil, err + } + defer rows.Close() + var out []Deployment + for rows.Next() { + var d Deployment + if err := rows.Scan(&d.ID, &d.SandboxID, &d.Repository, &d.Branch, &d.User, &d.State, + &d.ContainerID, &d.HostPort, &d.StartedAt, &d.CompletedAt); err != nil { + return nil, err + } + out = append(out, d) + } + return out, rows.Err() +} + +// LatestDeploymentBySandboxService returns the most recent deployment for +// the (sandbox, repo) pair, or nil if none. +func (s *Store) LatestDeploymentBySandboxService(sandboxID, repo string) (*Deployment, error) { + row := s.db.QueryRow( + `SELECT id, COALESCE(sandbox_id,''), repository, branch, COALESCE(user,''), state, + COALESCE(container_id,''), COALESCE(host_port,0), started_at, COALESCE(completed_at,0) + FROM deployments WHERE sandbox_id=? AND repository=? ORDER BY started_at DESC LIMIT 1`, + sandboxID, repo) + var d Deployment + err := row.Scan(&d.ID, &d.SandboxID, &d.Repository, &d.Branch, &d.User, &d.State, + &d.ContainerID, &d.HostPort, &d.StartedAt, &d.CompletedAt) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + return &d, nil +} + +// GetDeployment returns one deployment by id, or nil if not found. +func (s *Store) GetDeployment(id string) (*Deployment, error) { + row := s.db.QueryRow( + `SELECT id, COALESCE(sandbox_id,''), repository, branch, COALESCE(user,''), state, + COALESCE(container_id,''), COALESCE(host_port,0), started_at, COALESCE(completed_at,0) + FROM deployments WHERE id=?`, id) + var d Deployment + err := row.Scan(&d.ID, &d.SandboxID, &d.Repository, &d.Branch, &d.User, &d.State, + &d.ContainerID, &d.HostPort, &d.StartedAt, &d.CompletedAt) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + return &d, nil +} + +// marshalJSON is a small helper for the json column on environments. +func marshalJSON(v map[string]string) (string, error) { + b, err := json.Marshal(v) + if err != nil { + return "", err + } + return string(b), nil +} + +func unmarshalJSON(s string) (map[string]string, error) { + if s == "" { + return map[string]string{}, nil + } + var m map[string]string + if err := json.Unmarshal([]byte(s), &m); err != nil { + return nil, err + } + return m, nil +} diff --git a/control-plane/internal/store/store.go b/control-plane/internal/store/store.go index 40d95ef..2d54008 100644 --- a/control-plane/internal/store/store.go +++ b/control-plane/internal/store/store.go @@ -1,19 +1,17 @@ -// Package store persists deployment progress in SQLite and log lines in -// append-only .log files. The hot path is AppendEvent — agents emit a lot -// of these and the dashboard wants them live. +// Package store persists deployment progress and Slice-2 metadata in +// SQLite, and log lines in append-only .log files. The hot path is +// AppendEvent — agents emit a lot of these and the dashboard wants them +// live. Slice 2 adds sandboxes, templates, environments, routes, and +// port allocations on top of the deployment rows. package store import ( "database/sql" - "fmt" "os" "path/filepath" "sync" - "time" _ "modernc.org/sqlite" - - "github.com/sdp/protocol" ) type Store struct { @@ -34,28 +32,85 @@ func Open(dir string) (*Store, error) { if err != nil { return nil, err } - if _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS deployments ( - id TEXT PRIMARY KEY, - repository TEXT, - branch TEXT, - user TEXT, - state TEXT, - started_at INTEGER, - completed_at INTEGER - ); - CREATE TABLE IF NOT EXISTS progress ( - deployment_id TEXT, - stage TEXT, - ok INTEGER, - at INTEGER - ); - `); err != nil { + if _, err := db.Exec(schema); err != nil { return nil, err } return &Store{db: db, dir: dir, logs: make(map[string]*os.File)}, nil } +const schema = ` +CREATE TABLE IF NOT EXISTS deployments ( + id TEXT PRIMARY KEY, + sandbox_id TEXT, + repository TEXT, + branch TEXT, + user TEXT, + state TEXT, + container_id TEXT, + host_port INTEGER, + started_at INTEGER, + completed_at INTEGER +); +CREATE TABLE IF NOT EXISTS progress ( + deployment_id TEXT, + stage TEXT, + ok INTEGER, + at INTEGER +); + +CREATE TABLE IF NOT EXISTS sandboxes ( + id TEXT PRIMARY KEY, + name TEXT UNIQUE NOT NULL, + gateway_branch TEXT, + gateway_env_id TEXT, + gateway_host_port INTEGER, + created_at INTEGER, + updated_at INTEGER +); +CREATE TABLE IF NOT EXISTS sandbox_services ( + id TEXT PRIMARY KEY, + sandbox_id TEXT NOT NULL, + repo TEXT NOT NULL, + branch TEXT, + env_id TEXT, + host_port INTEGER, + use_ocp INTEGER DEFAULT 0 +); + +CREATE TABLE IF NOT EXISTS templates ( + id TEXT PRIMARY KEY, + name TEXT UNIQUE NOT NULL, + gateway_branch TEXT, + created_at INTEGER, + updated_at INTEGER +); +CREATE TABLE IF NOT EXISTS template_services ( + id TEXT PRIMARY KEY, + template_id TEXT NOT NULL, + repo TEXT NOT NULL, + branch TEXT, + env_id TEXT, + host_port INTEGER, + use_ocp INTEGER DEFAULT 0 +); + +CREATE TABLE IF NOT EXISTS environments ( + id TEXT PRIMARY KEY, + name TEXT UNIQUE NOT NULL, + values_json TEXT NOT NULL, + created_at INTEGER, + updated_at INTEGER +); + +CREATE TABLE IF NOT EXISTS routes ( + id TEXT PRIMARY KEY, + sandbox_id TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT, + target_ocp INTEGER NOT NULL DEFAULT 0 +); +` + func (s *Store) Close() error { s.mu.Lock() defer s.mu.Unlock() @@ -64,98 +119,3 @@ func (s *Store) Close() error { } return s.db.Close() } - -// StartDeployment records a new deployment row. Idempotent on id. -func (s *Store) StartDeployment(id, repo, branch, user string) error { - _, err := s.db.Exec( - `INSERT OR IGNORE INTO deployments(id, repository, branch, user, state, started_at) VALUES(?,?,?,?,?,?)`, - id, repo, branch, user, "QUEUED", time.Now().UnixMilli(), - ) - return err -} - -// FinishDeployment marks the final state. completed_at is set if state is terminal. -func (s *Store) FinishDeployment(id, state string) error { - _, err := s.db.Exec( - `UPDATE deployments SET state=?, completed_at=? WHERE id=?`, - state, time.Now().UnixMilli(), id, - ) - return err -} - -// MarkStage records a stage transition. ok=1 success, 0 failure. -func (s *Store) MarkStage(id, stage string, ok bool) error { - v := 0 - if ok { - v = 1 - } - _, err := s.db.Exec( - `INSERT INTO progress(deployment_id, stage, ok, at) VALUES(?,?,?,?)`, - id, stage, v, time.Now().UnixMilli(), - ) - return err -} - -// AppendEvent writes an event. Log lines go to .log; progress/status hit SQLite. -// The deployment's running state is also updated so /api/deployments/{id} can -// serve a snapshot without replaying the whole log. -func (s *Store) AppendEvent(e protocol.Event) error { - switch e.Kind { - case "log": - return s.appendLog(e) - case "status": - _, err := s.db.Exec(`UPDATE deployments SET state=? WHERE id=?`, e.State, e.DeploymentID) - return err - case "progress": - return s.MarkStage(e.DeploymentID, e.Stage, e.State != "FAILED") - } - return nil -} - -func (s *Store) appendLog(e protocol.Event) error { - s.mu.Lock() - f, ok := s.logs[e.DeploymentID] - if !ok { - path := filepath.Join(s.dir, "logs", e.DeploymentID+".log") - var err error - f, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) - if err != nil { - s.mu.Unlock() - return err - } - s.logs[e.DeploymentID] = f - } - s.mu.Unlock() - ts := time.UnixMilli(e.At).Format("15:04:05.000") - _, err := fmt.Fprintf(f, "%s %s\n", ts, e.Line) - return err -} - -// TailLogs returns the last n lines of a deployment's log file. Used by the -// dashboard on first connect to backfill. -func (s *Store) TailLogs(id string, n int) ([]string, error) { - path := filepath.Join(s.dir, "logs", id+".log") - data, err := os.ReadFile(path) - if err != nil { - if os.IsNotExist(err) { - return nil, nil - } - return nil, err - } - // ponytail: O(n) scan, fine for tail use. swap to a ring buffer if logs get huge. - var lines []string - start := 0 - for i, b := range data { - if b == '\n' { - lines = append(lines, string(data[start:i])) - start = i + 1 - } - } - if start < len(data) { - lines = append(lines, string(data[start:])) - } - if n > 0 && len(lines) > n { - lines = lines[len(lines)-n:] - } - return lines, nil -} diff --git a/control-plane/internal/store/store_test.go b/control-plane/internal/store/store_test.go new file mode 100644 index 0000000..0bdd579 --- /dev/null +++ b/control-plane/internal/store/store_test.go @@ -0,0 +1,114 @@ +package store_test + +import ( + "os" + "testing" + + "github.com/sdp/control-plane/internal/store" +) + +// TestSandboxRoundTrip exercises the Slice-2 CRUD path against a +// throwaway data directory. The build pipeline runs this; the +// real deployment uses the control plane's HTTP layer. +func TestSandboxRoundTrip(t *testing.T) { + dir, err := os.MkdirTemp("", "sdp-store-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + st, err := store.Open(dir) + if err != nil { + t.Fatalf("open: %v", err) + } + defer st.Close() + + // env + e := store.Environment{ID: "e1", Name: "dev", Values: map[string]string{"FOO": "bar"}} + if err := st.CreateEnvironment(e); err != nil { + t.Fatalf("env create: %v", err) + } + got, err := st.GetEnvironment("e1") + if err != nil { + t.Fatalf("env get: %v", err) + } + if got.Values["FOO"] != "bar" { + t.Fatalf("env values: got %v", got.Values) + } + + // sandbox + sb := store.Sandbox{ID: "s1", Name: "qa", GatewayBranch: "develop", GatewayHostPort: 8080, + Services: []store.SandboxService{ + {ID: "s1-haven", SandboxID: "s1", Repo: "haven", Branch: "main", HostPort: 9001, UseOCP: false, EnvID: "e1"}, + }} + if err := st.CreateSandbox(sb); err != nil { + t.Fatalf("sandbox create: %v", err) + } + gotsb, err := st.GetSandbox("s1") + if err != nil { + t.Fatalf("sandbox get: %v", err) + } + if len(gotsb.Services) != 1 || gotsb.Services[0].Repo != "haven" { + t.Fatalf("sandbox services: %+v", gotsb.Services) + } + + // template + clone + t1 := store.Template{ID: "t1", Name: "acct", GatewayBranch: "develop", + Services: []store.TemplateService{ + {ID: "t1-haven", TemplateID: "t1", Repo: "haven", Branch: "feature/x", HostPort: 9001, UseOCP: false}, + }} + if err := st.CreateTemplate(t1); err != nil { + t.Fatalf("tpl create: %v", err) + } + if err := st.CloneTemplateIntoSandbox("t1", store.Sandbox{ID: "s2", Name: "qa2"}); err != nil { + t.Fatalf("clone: %v", err) + } + cs, err := st.GetSandbox("s2") + if err != nil { + t.Fatalf("cloned get: %v", err) + } + if cs.GatewayBranch != "develop" || len(cs.Services) != 1 { + t.Fatalf("cloned content: %+v", cs) + } + + // routes + if err := st.SetRoutes("s1", []store.Route{ + {ID: "r1", SandboxID: "s1", Key: "haven", Value: "http://172.18.136.92:9001"}, + {ID: "r2", SandboxID: "s1", Key: "eredar", Value: "", TargetOCP: true}, + }); err != nil { + t.Fatalf("routes: %v", err) + } + rs, err := st.ListRoutes("s1") + if err != nil { + t.Fatalf("routes list: %v", err) + } + if len(rs) != 2 { + t.Fatalf("routes count: %d", len(rs)) + } + + // deployment + if err := st.StartDeploymentInSandbox("d1", "s1", "haven", "main", "achmad", 9001); err != nil { + t.Fatalf("dep start: %v", err) + } + d, err := st.GetDeployment("d1") + if err != nil { + t.Fatalf("dep get: %v", err) + } + if d.HostPort != 9001 || d.SandboxID != "s1" { + t.Fatalf("dep content: %+v", d) + } + if err := st.SetContainerID("d1", "container-abc"); err != nil { + t.Fatalf("container id: %v", err) + } + d, _ = st.GetDeployment("d1") + if d.ContainerID != "container-abc" { + t.Fatalf("container id roundtrip: %q", d.ContainerID) + } + deps, err := st.ListDeployments("s1", 10) + if err != nil { + t.Fatalf("deps list: %v", err) + } + if len(deps) != 1 { + t.Fatalf("deps count: %d", len(deps)) + } +} diff --git a/control-plane/internal/ws/handlers.go b/control-plane/internal/ws/handlers.go index afae970..ff119ff 100644 --- a/control-plane/internal/ws/handlers.go +++ b/control-plane/internal/ws/handlers.go @@ -59,6 +59,36 @@ func (h *Hub) AgentWS(st *store.Store, onConnect, onDisconnect func(nodeID strin if err != nil { return } + // Peek at the shape: protocol.Event frames have "kind", + // RPC replies have "op" == "reply". The agent's protocol + // does not allow a single frame to be both. + var probe map[string]json.RawMessage + if err := json.Unmarshal(raw, &probe); err != nil { + log.Printf("agent %s: bad json: %v", nodeID, err) + continue + } + if _, ok := probe["op"]; ok { + var reply struct { + Op string `json:"op"` + ID string `json:"id"` + OK bool `json:"ok"` + Err string `json:"error,omitempty"` + Raw json.RawMessage `json:"data,omitempty"` + } + if err := json.Unmarshal(raw, &reply); err != nil { + log.Printf("agent %s: bad reply: %v", nodeID, err) + continue + } + if reply.Op != "reply" { + log.Printf("agent %s: unknown op %q", nodeID, reply.Op) + continue + } + if !reply.OK { + log.Printf("agent %s: rpc %s error: %s", nodeID, reply.ID, reply.Err) + } + h.DeliverAgentReply(reply.ID, raw) + continue + } var e protocol.Event if err := json.Unmarshal(raw, &e); err != nil { log.Printf("agent %s: bad event: %v", nodeID, err) diff --git a/control-plane/internal/ws/hub.go b/control-plane/internal/ws/hub.go index e8c0ef5..988605f 100644 --- a/control-plane/internal/ws/hub.go +++ b/control-plane/internal/ws/hub.go @@ -1,16 +1,24 @@ -// Package ws is the WebSocket fan-out for SDP. Two flows: +// Package ws is the WebSocket fan-out for SDP. Three flows: // // agent --(events)--> /ws/agent (one conn per agent) // dashboard client --(subscribe)-> /ws/deployments/{id} (one conn per viewer) +// control plane --(RPC)----> /ws/agent (request/response, sync) // // On agent connect we record the agent's nodeID. On dashboard connect we // register a subscriber for one deployment. Events are best-effort fanned out; -// a slow client is dropped, not allowed to backpressure the agent. +// a slow client is dropped, not allowed to backpressure the agent. The RPC +// flow is request/response with a per-call channel indexed by a unique id. package ws import ( + "context" + cryptorand "crypto/rand" + "encoding/hex" + "encoding/json" + "errors" "net/http" "sync" + "time" "github.com/gorilla/websocket" @@ -21,6 +29,7 @@ var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, // ponytail: internal tool, allow all } +// Hub is the central registry of agent connections and dashboard subscribers. type Hub struct { mu sync.RWMutex @@ -29,12 +38,18 @@ type Hub struct { // one channel per connected agent; keyed by nodeID. agents map[string]chan<- []byte // outbound to agent (deploy requests, etc.) + + // pending RPCs keyed by request id. Each value is a channel that + // receives the agent's reply. The agent's inbound reader delivers + // replies here. + pending map[string]chan []byte } func New() *Hub { return &Hub{ - subs: make(map[string]map[chan protocol.Event]struct{}), - agents: make(map[string]chan<- []byte), + subs: make(map[string]map[chan protocol.Event]struct{}), + agents: make(map[string]chan<- []byte), + pending: make(map[string]chan []byte), } } @@ -81,7 +96,7 @@ func (h *Hub) Subscribe(deploymentID string) (chan protocol.Event, func()) { } // RegisterAgent stores the outbound channel for a node. Used when the control -// plane wants to send something back to an agent (deploy requests, stop, etc.). +// plane wants to send something back to an agent (deploy requests, RPCs, etc.). func (h *Hub) RegisterAgent(nodeID string, out chan<- []byte) { h.mu.Lock() h.agents[nodeID] = out @@ -91,7 +106,19 @@ func (h *Hub) RegisterAgent(nodeID string, out chan<- []byte) { func (h *Hub) UnregisterAgent(nodeID string) { h.mu.Lock() delete(h.agents, nodeID) + // also fail any pending RPCs. + toFail := make([]chan []byte, 0, len(h.pending)) + for _, ch := range h.pending { + toFail = append(toFail, ch) + } + h.pending = make(map[string]chan []byte) h.mu.Unlock() + for _, ch := range toFail { + select { + case ch <- nil: + default: + } + } } // SendToAgent best-effort sends a payload to a connected agent. Returns false @@ -110,3 +137,74 @@ func (h *Hub) SendToAgent(nodeID string, payload []byte) bool { return false } } + +// CallAgent is a request/response round-trip over the agent's WS. It sends +// {op, id, data} and waits for a {op:"reply", id, ...} back. The raw +// reply JSON is returned; the caller unmarshals into the concrete type. +// +// Errors: ErrAgentOffline, ErrRPCTimeout, or a delivery failure. +func (h *Hub) CallAgent(ctx context.Context, nodeID, op string, data interface{}, timeout time.Duration) ([]byte, error) { + id := newRPCID() + reply := make(chan []byte, 1) + + h.mu.Lock() + h.pending[id] = reply + h.mu.Unlock() + defer func() { + h.mu.Lock() + delete(h.pending, id) + h.mu.Unlock() + }() + + frame := map[string]interface{}{"op": op, "id": id, "data": data} + payload, err := json.Marshal(frame) + if err != nil { + return nil, err + } + if !h.SendToAgent(nodeID, payload) { + return nil, ErrAgentOffline + } + if timeout <= 0 { + timeout = 10 * time.Second + } + timer := time.NewTimer(timeout) + defer timer.Stop() + select { + case raw := <-reply: + if raw == nil { + return nil, ErrAgentOffline + } + return raw, nil + case <-timer.C: + return nil, ErrRPCTimeout + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// DeliverAgentReply is called from the agent's inbound reader when a +// frame with op="reply" arrives. It looks up the pending call by id and +// hands the payload over. +func (h *Hub) DeliverAgentReply(id string, raw []byte) { + h.mu.RLock() + ch, ok := h.pending[id] + h.mu.RUnlock() + if !ok { + return + } + select { + case ch <- raw: + default: + } +} + +var ( + ErrAgentOffline = errors.New("agent offline") + ErrRPCTimeout = errors.New("rpc timeout") +) + +func newRPCID() string { + var b [8]byte + _, _ = cryptorand.Read(b[:]) + return hex.EncodeToString(b[:]) +} diff --git a/protocol/types.go b/protocol/types.go index 0f45a9b..fa0397c 100644 --- a/protocol/types.go +++ b/protocol/types.go @@ -20,9 +20,11 @@ type Event struct { // Credentials are passed per-operation; agents MUST NOT log or persist them. type DeployRequest struct { DeploymentID string `json:"deploymentId"` + SandboxID string `json:"sandboxId,omitempty"` // owning sandbox (Slice 2) Repository string `json:"repository"` // name from agent's repo config Branch string `json:"branch"` - Env map[string]string `json:"env,omitempty"` // injected into container + HostPort int `json:"hostPort,omitempty"` // host port to bind the container to + Env map[string]string `json:"env,omitempty"` // injected into container Username string `json:"username"` Password string `json:"password"` } @@ -33,3 +35,23 @@ type DeployResponse struct { OK bool `json:"ok"` Error string `json:"error,omitempty"` } + +// RepoInfo describes one repository the agent knows about. +type RepoInfo struct { + Name string `json:"name"` + Path string `json:"path"` + // DefaultBranch is best-effort; empty if the repo is empty or unreadable. + DefaultBranch string `json:"defaultBranch,omitempty"` +} + +// RouteOverride is a single "_url" line the gateway agent should +// rewrite in the API gateway's config.php. The key is the PHP array key +// (e.g. "haven_url"); the value is the new URL (e.g. +// "http://172.18.136.92:9001"). TargetOCP=true means "leave it alone / +// point back at the OCP URL"; in that case the agent should restore the +// original value from its snapshot. +type RouteOverride struct { + Key string `json:"key"` // e.g. "haven_url" + Value string `json:"value"` // new URL, e.g. "http://172.18.136.92:9001" + TargetOCP bool `json:"targetOcp"` // if true, restore OCP URL from snapshot +}