From 6f07fdc7c02da6ae93fd2b85b28f6ed723d62d1b Mon Sep 17 00:00:00 2001 From: santh-cpu Date: Wed, 11 Feb 2026 15:51:19 +0530 Subject: [PATCH 1/4] updated readme for dependencies --- README.md | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index ee9ad54..5b779be 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,31 @@ -# VolPE +# VolPE _Volunteer Computing Platform for Evolutionary Algorithms_ This repository houses the primary components of VolPE, i.e. the master and the worker applications. ## Dependencies -1. Install `podman` and the Go toolchain -2. Setup podman appropriately -3. Download this repository -4. Export the environment variable `CONTAINER_HOST`. Value can be determined by running `podman info -f json | jq .host.remoteSocket.path`. The output from the command must be modified as follows.If the output is `"/run/user/1000/podman/podman.sock"`, the following command must be used to export the env. var. +1. Install the required system packages: + + **Ubuntu** + - `git` + - `libgpgme-dev` + - `podman` + - `libbtrfs-dev` + + **MacOS** + - `git` + - `gpgme` + - `podman` + - `btrfs-progs` + + **Windows** + - `git` + - `podman` + - `go` + - `uv` +2. Install the Go toolchain +3. Setup podman appropriately +4. Download this repository 5. Export the environment variable `CONTAINER_HOST`. Value can be determined by running `podman info -f json | jq .host.remoteSocket.path`. The output from the command must be modified as follows. If the output is `"/run/user/1000/podman/podman.sock"`, the following command must be used to export the env. var. 
``` export CONTAINER_HOST=unix:///xyz.sock ``` From d9dee8ee71c8ef361f5d56c28f82d8661f13964c Mon Sep 17 00:00:00 2001 From: santh-cpu Date: Wed, 18 Feb 2026 21:37:08 +0530 Subject: [PATCH 2/4] added docu for comms and scheduler --- comms/volpe/README.md | 18 ++++++++++++ comms/volpe/master.go | 45 +++++++++++++++--------------- comms/volpe/worker.go | 14 +++++----- container_mgr/problem_container.go | 21 +++++++------- scheduler/prelim_scheduler.go | 7 ++++- scheduler/schedule.go | 6 +++- 6 files changed, 69 insertions(+), 42 deletions(-) create mode 100644 comms/volpe/README.md diff --git a/comms/volpe/README.md b/comms/volpe/README.md new file mode 100644 index 0000000..214b209 --- /dev/null +++ b/comms/volpe/README.md @@ -0,0 +1,18 @@ +# comms + +## worker → master + +workers connect to the master using a bidirectional grpc stream. + +when a worker connects, it: +- sends (worker id, cpu count, memory) + +after that, it periodically sends: +- `devicemetricsmessage` +- `population` + +## master → worker + +the master can send: +- `adjustinstancesmessage` +- seed populations for distributed evolution diff --git a/comms/volpe/master.go b/comms/volpe/master.go index 312a8e6..45e0e93 100644 --- a/comms/volpe/master.go +++ b/comms/volpe/master.go @@ -8,16 +8,17 @@ import ( "net" "os" "sync" + "volpe-framework/comms/common" "volpe-framework/types" - "github.com/rs/zerolog/log" "google.golang.org/grpc" ) // TODO: handle stream closing +// MasterComms is wrapper holding grpc server and listener type MasterComms struct { mcs masterCommsServer sr *grpc.Server @@ -28,22 +29,24 @@ type ProblemStore interface { GetMetadata(problemID string, meta *types.Problem) *types.Problem } +// masterCommsServer implements grpc services type masterCommsServer struct { UnimplementedVolpeMasterServer - chans_mut sync.RWMutex - channs map[string]chan *MasterMessage - metricChan chan *DeviceMetricsMessage - immigChan chan *MigrationMessage - sched SchedulerComms - probStore ProblemStore + 
chans_mut sync.RWMutex + channs map[string]chan *MasterMessage + metricChan chan *DeviceMetricsMessage + immigChan chan *MigrationMessage + sched SchedulerComms + probStore ProblemStore eventStream chan string } type SchedulerComms interface { - AddWorker(worker types.Worker); - RemoveWorker(workerID string); + AddWorker(worker types.Worker) + RemoveWorker(workerID string) } +// mcsStreamHandlerThread handles the core bidirectional loop func mcsStreamHandlerThread( workerID string, stream grpc.BidiStreamingServer[WorkerMessage, MasterMessage], @@ -52,7 +55,6 @@ func mcsStreamHandlerThread( immigChan chan *MigrationMessage, eventStream chan string, ) { - log.Info().Caller().Msgf("workerID %s connected", workerID) masterRecvChan := make(chan *WorkerMessage) @@ -102,7 +104,7 @@ func mcsStreamHandlerThread( sumFitness := 0.0 popSize := 0 - for _, ind := range(pop.GetMembers()) { + for _, ind := range pop.GetMembers() { sumFitness += float64(ind.GetFitness()) popSize += 1 } @@ -126,7 +128,7 @@ func mcsStreamHandlerThread( log.Info().Caller().Msgf("send chan to workerID %s closed, exiting", workerID) return } - adjInst := result.GetAdjInst() + adjInst := result.GetAdjInst() emigMsg := result.GetMigration() if adjInst != nil { // jsonMsg, _ := json.Marshal(map[string]any{ @@ -139,7 +141,7 @@ func mcsStreamHandlerThread( } else if emigMsg != nil { fitnessSum := float32(0.0) indCount := 0 - for _, ind := range(emigMsg.GetPopulation().GetMembers()) { + for _, ind := range emigMsg.GetPopulation().GetMembers() { fitnessSum += float32(ind.GetFitness()) indCount += 1 } @@ -156,7 +158,6 @@ func mcsStreamHandlerThread( log.Error().Msgf("message sent from master was neither adjust instances nor send popln. 
") } - err := stream.Send(result) log.Debug().Caller().Msgf("sent master msg to workerID %s", workerID) if err != nil { @@ -168,6 +169,7 @@ func mcsStreamHandlerThread( } } +// initMasterCommsServer initializes master server state and communication channels func initMasterCommsServer(mcs *masterCommsServer, metricChan chan *DeviceMetricsMessage, eventStream chan string) (err error) { mcs.channs = make(map[string]chan *MasterMessage) mcs.metricChan = metricChan @@ -175,6 +177,7 @@ func initMasterCommsServer(mcs *masterCommsServer, metricChan chan *DeviceMetric return nil } +// StartStreams handles initial worker connection func (mcs *masterCommsServer) StartStreams(stream grpc.BidiStreamingServer[WorkerMessage, MasterMessage]) error { protoMsg, err := stream.Recv() if err != nil { @@ -190,7 +193,6 @@ func (mcs *masterCommsServer) StartStreams(stream grpc.BidiStreamingServer[Worke log.Info().Msgf("workerID %s connected to master", workerID) fmt.Println(workerID) - masterSendChan := make(chan *MasterMessage) mcs.chans_mut.Lock() @@ -198,7 +200,7 @@ func (mcs *masterCommsServer) StartStreams(stream grpc.BidiStreamingServer[Worke mcs.chans_mut.Unlock() mcs.sched.AddWorker(types.Worker{ - WorkerID: workerID, + WorkerID: workerID, CpuCount: workerHelloMsg.GetCpuCount(), MemoryGB: workerHelloMsg.GetMemoryGB(), }) @@ -234,7 +236,7 @@ func (mcs *masterCommsServer) GetProblemData(req *ProblemRequest, stream grpc.Se } fname := meta.ImagePath - + file, err := os.Open(fname) if err != nil { log.Err(err).Caller().Msgf("failed to get image") @@ -250,9 +252,9 @@ func (mcs *masterCommsServer) GetProblemData(req *ProblemRequest, stream grpc.Se stream.Send(&common.ImageStreamObject{ Data: &common.ImageStreamObject_Details{ Details: &common.ProblemDetails{ - ProblemID: problemID, - ImageSizeBytes: int32(fileSize), - MigrationSize: meta.MigrationSize, + ProblemID: problemID, + ImageSizeBytes: int32(fileSize), + MigrationSize: meta.MigrationSize, MigrationFrequency: 
meta.MigrationFrequency, }, }, @@ -285,7 +287,6 @@ func NewMasterComms(port uint16, metricChan chan *DeviceMetricsMessage, immigCha return nil, err } - sr := grpc.NewServer() mc.sr = sr lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port)) @@ -294,7 +295,7 @@ func NewMasterComms(port uint16, metricChan chan *DeviceMetricsMessage, immigCha return nil, err } log.Info().Caller().Msgf("master listening on port %d", port) - + mc.lis = lis mc.mcs.sched = sched mc.mcs.immigChan = immigChan diff --git a/comms/volpe/worker.go b/comms/volpe/worker.go index 0ca9fa8..7ff242f 100644 --- a/comms/volpe/worker.go +++ b/comms/volpe/worker.go @@ -12,19 +12,19 @@ import ( "github.com/rs/zerolog/log" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" -) // TODO: handle stream closing type WorkerComms struct { - client VolpeMasterClient - stream grpc.BidiStreamingClient[WorkerMessage, MasterMessage] - cancelFunc context.CancelFunc + client VolpeMasterClient + stream grpc.BidiStreamingClient[WorkerMessage, MasterMessage] + cancelFunc context.CancelFunc cancelMutex sync.Mutex - workerID string + workerID string // TODO: include something for population } +// NewWorkerComms constructs WorkerComms and starts a grpc stream func NewWorkerComms(endpoint string, workerID string, memoryLimit float32, cpuCount int32) (*WorkerComms, error) { // TODO: channel or something for population adjust wc := new(WorkerComms) @@ -52,7 +52,7 @@ func NewWorkerComms(endpoint string, workerID string, memoryLimit float32, cpuCo Message: &WorkerMessage_Hello{ Hello: &WorkerHello{ WorkerID: &WorkerID{Id: workerID}, - // TODO: use system config + // TODO: use system config CpuCount: int32(runtime.NumCPU()), MemoryGB: float32(memoryLimit), }, @@ -94,6 +94,7 @@ func (wc *WorkerComms) HandleStreams(adjInstChannel chan *AdjustInstancesMessage } } +// SendDeviceMetrics sets workerID and sends metrics over stream func (wc *WorkerComms) SendDeviceMetrics(metrics *DeviceMetricsMessage) error { 
metrics.WorkerID = wc.workerID workerMsg := WorkerMessage{Message: &WorkerMessage_Metrics{Metrics: metrics}} @@ -127,7 +128,6 @@ func (wc *WorkerComms) GetProblemData(problemID string, meta *types.Problem) err return err } - detailsMsg, err := stream.Recv() if err != nil { log.Error().Caller().Msg(err.Error()) diff --git a/container_mgr/problem_container.go b/container_mgr/problem_container.go index a87a5fc..2abab17 100644 --- a/container_mgr/problem_container.go +++ b/container_mgr/problem_container.go @@ -24,15 +24,15 @@ type ProblemContainer struct { problemID string containerName string // containerPort uint16 - hostPort uint16 - commsClient ccomms.VolpeContainerClient - resultChannels map[chan *ccomms.ResultPopulation]bool - wEmigChan chan *volpe.MigrationMessage - rcMut sync.Mutex + hostPort uint16 + commsClient ccomms.VolpeContainerClient + resultChannels map[chan *ccomms.ResultPopulation]bool + wEmigChan chan *volpe.MigrationMessage + rcMut sync.Mutex containerContext context.Context - cancel context.CancelFunc - meta *types.Problem - containerID int32 + cancel context.CancelFunc + meta *types.Problem + containerID int32 } // generates random name for every container @@ -161,7 +161,7 @@ func (pc *ProblemContainer) sendResultOnce() { // continuosly sends results while containerContext is valid func (pc *ProblemContainer) sendResults() { for { - time.Sleep(5*time.Second) + time.Sleep(5 * time.Second) if pc.containerContext.Err() != nil { log.Err(pc.containerContext.Err()).Msgf("Stopping sendResults for problem %s", pc.problemID) break @@ -194,7 +194,7 @@ func (pc *ProblemContainer) runGenerations() { popln.ProblemID = &pc.meta.ProblemID mig := volpe.MigrationMessage{ Population: popln, - WorkerID: "", + WorkerID: "", // TODO: set proper containerID ContainerID: pc.containerID, } @@ -226,4 +226,3 @@ func (pc *ProblemContainer) stopContainer() { return } } - diff --git a/scheduler/prelim_scheduler.go b/scheduler/prelim_scheduler.go index 829e637..259145c 100644 
--- a/scheduler/prelim_scheduler.go +++ b/scheduler/prelim_scheduler.go @@ -3,6 +3,7 @@ package scheduler import ( "slices" "sync" + vcomms "volpe-framework/comms/volpe" "volpe-framework/types" @@ -10,6 +11,7 @@ import ( //"github.com/rs/zerolog/log" ) +// INFO type PrelimScheduler struct { problems []types.Problem workers map[string]*Worker @@ -28,6 +30,7 @@ func NewPrelimScheduler() (*PrelimScheduler, error) { func (ss *PrelimScheduler) Init() error { return nil } +// AddWorker checks and adds worker to PrelimScheduler func (ss *PrelimScheduler) AddWorker(worker types.Worker) { ss.mut.Lock() defer ss.mut.Unlock() @@ -44,7 +47,7 @@ func (ss *PrelimScheduler) AddWorker(worker types.Worker) { func (ss *PrelimScheduler) UpdateMetrics(metrics *vcomms.DeviceMetricsMessage) { // TODO: apply metrics update - //log.Warn().Caller().Msgf("skipping metrics update for static scheduler") + // log.Warn().Caller().Msgf("skipping metrics update for static scheduler") ss.mut.Lock() worker := ss.workers[metrics.WorkerID] ss.mut.Unlock() @@ -53,6 +56,7 @@ func (ss *PrelimScheduler) UpdateMetrics(metrics *vcomms.DeviceMetricsMessage) { worker.CpuUtilPerc = metrics.CpuUtilPerc } +// RemoveWorker checks and removes worker from PrelimScheduler func (ss *PrelimScheduler) RemoveWorker(worker string) { ss.mut.Lock() defer ss.mut.Unlock() @@ -60,6 +64,7 @@ func (ss *PrelimScheduler) RemoveWorker(worker string) { delete(ss.workers, worker) } +// INFO: FillSchedule fills schedule by assigning problems to workers based on CpuCount func (ss *PrelimScheduler) FillSchedule(sched Schedule) error { sched.Reset() diff --git a/scheduler/schedule.go b/scheduler/schedule.go index 94b01fa..92a185b 100644 --- a/scheduler/schedule.go +++ b/scheduler/schedule.go @@ -4,6 +4,7 @@ import "strings" type Schedule map[string]int32 +// Get returns assigned value for problemID@workerID func (s Schedule) Get(workerID string, problemID string) int32 { v, ok := s[problemID+"@"+workerID] if ok { @@ -13,11 +14,13 @@ 
func (s Schedule) Get(workerID string, problemID string) int32 { } } +// Set assigns val to problemID@workerID func (s Schedule) Set(workerID string, problemID string, val int32) { s[problemID+"@"+workerID] = val } -func (s Schedule) Apply(applyFunc func (workerID string, problemID string, val int32)) { +// Apply applies given function through all iterated schedule entries +func (s Schedule) Apply(applyFunc func(workerID string, problemID string, val int32)) { for k, val := range s { substrings := strings.Split(k, "@") problemID := substrings[0] @@ -26,6 +29,7 @@ func (s Schedule) Apply(applyFunc func (workerID string, problemID string, val i } } +// Reset clears all schedule entries func (s Schedule) Reset() { for k := range s { delete(s, k) From 9802746b100e4d14d3eda8e81e86b4b07f8d0b5a Mon Sep 17 00:00:00 2001 From: santh-cpu Date: Wed, 18 Feb 2026 21:47:09 +0530 Subject: [PATCH 3/4] added README.md for scheduler --- scheduler/README.md | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 scheduler/README.md diff --git a/scheduler/README.md b/scheduler/README.md new file mode 100644 index 0000000..adab5c2 --- /dev/null +++ b/scheduler/README.md @@ -0,0 +1,31 @@ +# scheduler + +## master → scheduler + +the master: + +- creates a scheduler implementation (`PrelimScheduler` or `StaticScheduler`) +- registers workers via: + - `AddWorker` + - `RemoveWorker` +- registers problems via: + - `AddProblem` + - `RemoveProblem` +- forwards device metrics (optional) +- periodically calls: + - `FillSchedule` +`FillSchedule` produces the desired `problemID@workerID → instance count` mapping for the current cluster state. 
+ +## scheduler → master + +the scheduler returns: + +- a fully rebuilt `Schedule` +- target instance counts per `problemID@workerID` pair + +the master is responsible for: + +- diffing the schedule +- issuing scale up / scale down commands +- enforcing zero assignments for removed problems From e4780cc29ff8ab1b31fb27ea52f5872e7435b5d1 Mon Sep 17 00:00:00 2001 From: santh-cpu Date: Thu, 12 Mar 2026 03:48:58 +0530 Subject: [PATCH 4/4] api documentation --- master/api/controller.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/master/api/controller.go b/master/api/controller.go index 4f627f0..d1cb76f 100644 --- a/master/api/controller.go +++ b/master/api/controller.go @@ -1,4 +1,4 @@ -package api +package api import ( "io" @@ -29,6 +29,7 @@ type VolpeAPI struct { eventOutChannels map[chan string]bool } +// NewVolpeAPI initializes volpe api and starts result distribution func NewVolpeAPI(ps *model.ProblemStore, sched scheduler.Scheduler, contman *contman.ContainerManager, eventStream chan string) (*VolpeAPI, error) { api := &VolpeAPI{ probstore: ps, @@ -41,6 +42,7 @@ func NewVolpeAPI(ps *model.ProblemStore, sched scheduler.Scheduler, contman *con return api, nil } +// RegisterProblem parses multipart form to register new problem and save its image func (va *VolpeAPI) RegisterProblem(c *gin.Context) { problemID := c.Param("id") if problemID == "" { @@ -120,6 +122,7 @@ func (va *VolpeAPI) RegisterProblem(c *gin.Context) { log.Info().Caller().Msgf("registered image %s", problemID) } +// ListProblems returns list of all problems and their running status func (va *VolpeAPI) ListProblems(c *gin.Context) { type Problem struct { ProblemID string `json:"problemID"` @@ -135,6 +138,7 @@ func (va *VolpeAPI) ListProblems(c *gin.Context) { c.JSON(200, map[string]any{"problems": problems}) } +// DeleteProblem removes problem from container manager and scheduler func (va *VolpeAPI) DeleteProblem(c *gin.Context) { problemID := c.Param("id") @@ -146,6 +150,7 
@@ func (va *VolpeAPI) DeleteProblem(c *gin.Context) { } } +// GetProblem fetches and returns detailed metadata for a specific problem func (va *VolpeAPI) GetProblem(c *gin.Context) { problemID := c.Param("id") if len(problemID) == 0 { @@ -180,15 +185,18 @@ func (va *VolpeAPI) GetProblem(c *gin.Context) { c.JSON(200, problemData) } +// GetWorkers returns list of active workers from the scheduler func (va *VolpeAPI) GetWorkers(c *gin.Context) { workers := va.sched.GetWorkers() c.JSON(200, workers) } +// GetWorkerCount returns total count of registered workers func (va *VolpeAPI) GetWorkerCount(c *gin.Context) { c.JSON(200, map[string]any{"count": va.sched.GetWorkerCount()}) } +// StartProblem signals scheduler and container manager to begin running a problem func (va *VolpeAPI) StartProblem(c *gin.Context) { problemID := c.Param("id") if len(problemID) == 0 { @@ -231,6 +239,7 @@ func (va *VolpeAPI) StartProblem(c *gin.Context) { } } +// distributeResults forwards events from the internal stream to all registered output channels func (va *VolpeAPI) distributeResults() { for { // event, ok := <- va.eventStream @@ -244,6 +253,7 @@ func (va *VolpeAPI) distributeResults() { } } +// StreamResults establishes sse connection to stream population results for a problem func (va *VolpeAPI) StreamResults(c *gin.Context) { log.Info().Msg("streaming results") @@ -294,6 +304,7 @@ func (va *VolpeAPI) StreamResults(c *gin.Context) { } } +// AbortProblem stops execution of a problem across the framework func (va *VolpeAPI) AbortProblem(c *gin.Context) { problemID := c.Param("id") @@ -308,6 +319,7 @@ func (va *VolpeAPI) AbortProblem(c *gin.Context) { c.Status(200) } +// EventStream establishes sse connection for general framework event notifications func (va *VolpeAPI) EventStream(c *gin.Context) { c.Header("Content-Type", "text/event-stream") c.Header("Cache-Control", "no-cache")