Skip to content
This repository was archived by the owner on Apr 14, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions internal/daemon/notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package daemon

import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/exec"
"strings"
"time"
)

// NotifyPayload is the JSON body sent to DALCENTER_NOTIFY_URL.
type NotifyPayload struct {
Event string `json:"event"` // "task_done", "task_failed"
Dal string `json:"dal"`
TaskID string `json:"task_id"`
Task string `json:"task"`
Status string `json:"status"`
PRUrl string `json:"pr_url,omitempty"`
Error string `json:"error,omitempty"`
Output string `json:"output,omitempty"`
Changes int `json:"git_changes"`
Verified string `json:"verified,omitempty"`
Timestamp string `json:"timestamp"`
}

// notifyTaskComplete sends a notification when a task finishes.
// It tries three channels in order:
// 1. DALCENTER_NOTIFY_URL — HTTP POST with JSON payload
// 2. notify-dalroot CLI — if CallbackPane is set
// 3. Neither — log only
func notifyTaskComplete(dalName string, tr *taskResult, repo string) {
payload := buildNotifyPayload(dalName, tr)

// 1. HTTP notification via DALCENTER_NOTIFY_URL
if url := os.Getenv("DALCENTER_NOTIFY_URL"); url != "" {
go sendNotifyHTTP(url, payload)
}

// 2. CLI notification via notify-dalroot (backward compat)
if tr.CallbackPane != "" {
go sendNotifyCLI(dalName, tr, repo)
}
}

// buildNotifyPayload constructs the notification payload from a task result.
func buildNotifyPayload(dalName string, tr *taskResult) NotifyPayload {
event := "task_done"
if tr.Status == "failed" || tr.Status == "blocked" {
event = "task_failed"
}

p := NotifyPayload{
Event: event,
Dal: dalName,
TaskID: tr.ID,
Task: truncateStr(tr.Task, 200),
Status: tr.Status,
Changes: tr.GitChanges,
Verified: tr.Verified,
Timestamp: time.Now().UTC().Format(time.RFC3339),
}

if tr.Status == "failed" || tr.Status == "blocked" {
p.Error = truncateStr(tr.Error, 500)
}

// Extract PR URL from output if present
if prURL := extractPRUrl(tr.Output); prURL != "" {
p.PRUrl = prURL
}

return p
}

// sendNotifyHTTP posts the payload to the given URL.
func sendNotifyHTTP(url string, payload NotifyPayload) {
data, err := json.Marshal(payload)
if err != nil {
log.Printf("[notify] marshal error: %v", err)
return
}

client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Post(url, "application/json", bytes.NewReader(data))
if err != nil {
log.Printf("[notify] HTTP POST to %s failed: %v", url, err)
return
}
resp.Body.Close()
log.Printf("[notify] HTTP POST %s → %d (%s %s)", url, resp.StatusCode, payload.Event, payload.Dal)
}

// sendNotifyCLI calls the notify-dalroot CLI tool for pane-based notification.
func sendNotifyCLI(dalName string, tr *taskResult, repo string) {
msg := fmt.Sprintf("[%s] task %s: %s", dalName, tr.Status, truncateStr(tr.Task, 80))
if prURL := extractPRUrl(tr.Output); prURL != "" {
msg += " → " + prURL
}
if tr.Status == "failed" && tr.Error != "" {
msg += " | error: " + truncateStr(tr.Error, 100)
}
cmd := exec.Command("notify-dalroot", repo, msg, tr.CallbackPane)
if err := cmd.Run(); err != nil {
log.Printf("[notify] dalroot CLI failed: %v", err)
}
}

// extractPRUrl scans output for a GitHub PR URL.
func extractPRUrl(output string) string {
for _, line := range strings.Split(output, "\n") {
line = strings.TrimSpace(line)
if strings.Contains(line, "github.com/") && strings.Contains(line, "/pull/") {
// Find the URL within the line
for _, word := range strings.Fields(line) {
if strings.Contains(word, "github.com/") && strings.Contains(word, "/pull/") {
return strings.TrimRight(word, ".,;:!?\"'`)")
}
}
}
}
return ""
}
155 changes: 155 additions & 0 deletions internal/daemon/notify_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package daemon

import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)

func TestBuildNotifyPayload_Done(t *testing.T) {
tr := &taskResult{
ID: "task-001",
Dal: "leader",
Task: "go test ./...",
Status: "done",
Output: "all tests passed\nhttps://github.com/dalsoop/dalcenter/pull/42\n",
GitChanges: 3,
Verified: "yes",
}
p := buildNotifyPayload("leader", tr)
if p.Event != "task_done" {
t.Errorf("expected event=task_done, got %s", p.Event)
}
if p.PRUrl != "https://github.com/dalsoop/dalcenter/pull/42" {
t.Errorf("expected PR URL extracted, got %q", p.PRUrl)
}
if p.Error != "" {
t.Errorf("expected no error for done task, got %q", p.Error)
}
if p.Changes != 3 {
t.Errorf("expected 3 changes, got %d", p.Changes)
}
}

func TestBuildNotifyPayload_Failed(t *testing.T) {
tr := &taskResult{
ID: "task-002",
Dal: "dev",
Task: "implement feature X",
Status: "failed",
Error: "compilation error: undefined variable",
}
p := buildNotifyPayload("dev", tr)
if p.Event != "task_failed" {
t.Errorf("expected event=task_failed, got %s", p.Event)
}
if p.Error == "" {
t.Error("expected error content in payload")
}
}

func TestBuildNotifyPayload_Blocked(t *testing.T) {
tr := &taskResult{
ID: "task-003",
Status: "blocked",
Error: "need approval",
}
p := buildNotifyPayload("dev", tr)
if p.Event != "task_failed" {
t.Errorf("blocked should map to task_failed event, got %s", p.Event)
}
}

func TestExtractPRUrl(t *testing.T) {
tests := []struct {
name string
output string
want string
}{
{
name: "github PR URL in output",
output: "Created PR: https://github.com/dalsoop/dalcenter/pull/42",
want: "https://github.com/dalsoop/dalcenter/pull/42",
},
{
name: "no PR URL",
output: "all tests passed\nno changes",
want: "",
},
{
name: "PR URL with trailing punctuation",
output: "see https://github.com/dalsoop/dalcenter/pull/99.",
want: "https://github.com/dalsoop/dalcenter/pull/99",
},
{
name: "multiple lines with PR",
output: "line1\nline2\nhttps://github.com/org/repo/pull/123\nline4",
want: "https://github.com/org/repo/pull/123",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := extractPRUrl(tt.output)
if got != tt.want {
t.Errorf("extractPRUrl() = %q, want %q", got, tt.want)
}
})
}
}

func TestSendNotifyHTTP(t *testing.T) {
var received NotifyPayload
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if ct := r.Header.Get("Content-Type"); ct != "application/json" {
t.Errorf("expected application/json, got %s", ct)
}
json.NewDecoder(r.Body).Decode(&received)
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()

payload := NotifyPayload{
Event: "task_done",
Dal: "leader",
TaskID: "task-001",
Status: "done",
PRUrl: "https://github.com/dalsoop/dalcenter/pull/42",
}
sendNotifyHTTP(srv.URL, payload)

if received.Event != "task_done" {
t.Errorf("expected task_done, got %s", received.Event)
}
if received.PRUrl != "https://github.com/dalsoop/dalcenter/pull/42" {
t.Errorf("expected PR URL, got %s", received.PRUrl)
}
}

func TestNotifyPayload_JSONSerialization(t *testing.T) {
p := NotifyPayload{
Event: "task_done",
Dal: "leader",
TaskID: "task-001",
Task: "run tests",
Status: "done",
PRUrl: "https://github.com/org/repo/pull/1",
}
data, err := json.Marshal(p)
if err != nil {
t.Fatal(err)
}
var decoded NotifyPayload
if err := json.Unmarshal(data, &decoded); err != nil {
t.Fatal(err)
}
if decoded.PRUrl != p.PRUrl {
t.Errorf("PR URL lost in round-trip: got %q", decoded.PRUrl)
}
if decoded.Event != "task_done" {
t.Errorf("event lost in round-trip: got %q", decoded.Event)
}
}
25 changes: 8 additions & 17 deletions internal/daemon/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ func (d *Daemon) handleTaskFinish(w http.ResponseWriter, r *http.Request) {
http.Error(w, "task not found", http.StatusNotFound)
return
}

// Notify dalroot on external task finish
notifyTaskComplete(tr.Dal, tr, d.serviceRepo)

respondJSON(w, http.StatusOK, tr)
}

Expand Down Expand Up @@ -449,10 +453,8 @@ func (d *Daemon) execTaskInContainer(c *Container, tr *taskResult) {
Timestamp: now.Format(time.RFC3339),
})

// Notify dalroot if callback pane was specified
if tr.CallbackPane != "" {
notifyDalroot(c.DalName, tr, d.serviceRepo)
}
// Notify dalroot on failure
notifyTaskComplete(c.DalName, tr, d.serviceRepo)
} else {
tr.Status = "done"
tr.Output = stdout.String()
Expand Down Expand Up @@ -484,10 +486,8 @@ func (d *Daemon) execTaskInContainer(c *Container, tr *taskResult) {
Timestamp: now.Format(time.RFC3339),
})

// Notify dalroot if callback pane was specified
if tr.CallbackPane != "" {
notifyDalroot(c.DalName, tr, d.serviceRepo)
}
// Notify dalroot on completion
notifyTaskComplete(c.DalName, tr, d.serviceRepo)
}
}

Expand Down Expand Up @@ -519,15 +519,6 @@ func verifyTaskChanges(containerID string, tr *taskResult) {
}
}

// notifyDalroot calls notify-dalroot to send a notification to the requesting pane.
func notifyDalroot(dalName string, tr *taskResult, repo string) {
msg := fmt.Sprintf("[%s] task %s: %s", dalName, tr.Status, truncateStr(tr.Task, 80))
cmd := exec.Command("notify-dalroot", repo, msg, tr.CallbackPane)
if err := cmd.Run(); err != nil {
log.Printf("[notify] dalroot notification failed: %v", err)
}
}

func truncateStr(s string, n int) string {
if len(s) <= n {
return s
Expand Down
Loading