Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
# VolPE
# VolPE
_Volunteer Computing Platform for Evolutionary Algorithms_

This repository houses the primary components of VolPE, i.e. the master and the worker applications.

## Dependencies
1. Install `podman` and the Go toolchain
2. Setup podman appropriately
3. Download this repository
4. Export the environment variable `CONTAINER_HOST`. Value can be determined by running `podman info -f json | jq .host.remoteSocket.path`. The output from the command must be modified as follows. If the output is `"/run/user/1000/podman/podman.sock"`, the following command must be used to export the env. var.
1. Install the required system packages:

**Ubuntu**
- `git`
- `libgpgme-dev`
- `podman`
- `libbtrfs-dev`

**MacOS**
- `git`
- `gpgme`
- `podman`
- `btrfs-progs`

**Windows**
- `git`
- `podman`
- `go`
- `uv`
2. Install the Go toolchain
3. Setup podman appropriately
4. Download this repository
5. Export the environment variable `CONTAINER_HOST`. Value can be determined by running `podman info -f json | jq .host.remoteSocket.path`. The output from the command must be modified as follows. If the output is `"/run/user/1000/podman/podman.sock"`, the following command must be used to export the env. var.
```
export CONTAINER_HOST=unix:///xyz.sock
```
Expand Down
18 changes: 18 additions & 0 deletions comms/volpe/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# comms

## worker → master

Workers connect to the master using a bidirectional gRPC stream.

when a worker connects, it:
- sends (worker id, cpu count, memory)

after that, it periodically sends:
- `devicemetricsmessage`
- `population`

## master → worker

the master can send:
- `adjustinstancesmessage`
- seed populations for distributed evolution
45 changes: 23 additions & 22 deletions comms/volpe/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import (
"net"
"os"
"sync"

"volpe-framework/comms/common"
"volpe-framework/types"


"github.com/rs/zerolog/log"
"google.golang.org/grpc"
)

// TODO: handle stream closing

// MasterComms is wrapper holding grpc server and listener
type MasterComms struct {
mcs masterCommsServer
sr *grpc.Server
Expand All @@ -28,22 +29,24 @@ type ProblemStore interface {
GetMetadata(problemID string, meta *types.Problem) *types.Problem
}

// masterCommsServer implements grpc services
type masterCommsServer struct {
UnimplementedVolpeMasterServer
chans_mut sync.RWMutex
channs map[string]chan *MasterMessage
metricChan chan *DeviceMetricsMessage
immigChan chan *MigrationMessage
sched SchedulerComms
probStore ProblemStore
chans_mut sync.RWMutex
channs map[string]chan *MasterMessage
metricChan chan *DeviceMetricsMessage
immigChan chan *MigrationMessage
sched SchedulerComms
probStore ProblemStore
eventStream chan string
}

type SchedulerComms interface {
AddWorker(worker types.Worker);
RemoveWorker(workerID string);
AddWorker(worker types.Worker)
RemoveWorker(workerID string)
}

// mcsStreamHandlerThread handles the core bidirectional loop
func mcsStreamHandlerThread(
workerID string,
stream grpc.BidiStreamingServer[WorkerMessage, MasterMessage],
Expand All @@ -52,7 +55,6 @@ func mcsStreamHandlerThread(
immigChan chan *MigrationMessage,
eventStream chan string,
) {

log.Info().Caller().Msgf("workerID %s connected", workerID)

masterRecvChan := make(chan *WorkerMessage)
Expand Down Expand Up @@ -102,7 +104,7 @@ func mcsStreamHandlerThread(

sumFitness := 0.0
popSize := 0
for _, ind := range(pop.GetMembers()) {
for _, ind := range pop.GetMembers() {
sumFitness += float64(ind.GetFitness())
popSize += 1
}
Expand All @@ -126,7 +128,7 @@ func mcsStreamHandlerThread(
log.Info().Caller().Msgf("send chan to workerID %s closed, exiting", workerID)
return
}
adjInst := result.GetAdjInst()
adjInst := result.GetAdjInst()
emigMsg := result.GetMigration()
if adjInst != nil {
// jsonMsg, _ := json.Marshal(map[string]any{
Expand All @@ -139,7 +141,7 @@ func mcsStreamHandlerThread(
} else if emigMsg != nil {
fitnessSum := float32(0.0)
indCount := 0
for _, ind := range(emigMsg.GetPopulation().GetMembers()) {
for _, ind := range emigMsg.GetPopulation().GetMembers() {
fitnessSum += float32(ind.GetFitness())
indCount += 1
}
Expand All @@ -156,7 +158,6 @@ func mcsStreamHandlerThread(
log.Error().Msgf("message sent from master was neither adjust instances nor send popln. ")
}


err := stream.Send(result)
log.Debug().Caller().Msgf("sent master msg to workerID %s", workerID)
if err != nil {
Expand All @@ -168,13 +169,15 @@ func mcsStreamHandlerThread(
}
}

// initMasterCommsServer initializes the master server's per-worker send-channel
// map and wires in the device-metrics and event channels. The immigration
// channel, scheduler, and problem store are assigned separately by the caller
// (see NewMasterComms), so they are intentionally left untouched here.
func initMasterCommsServer(mcs *masterCommsServer, metricChan chan *DeviceMetricsMessage, eventStream chan string) error {
	mcs.channs = make(map[string]chan *MasterMessage)
	mcs.metricChan = metricChan
	mcs.eventStream = eventStream
	return nil
}

// StartStreams handles initial worker connection
func (mcs *masterCommsServer) StartStreams(stream grpc.BidiStreamingServer[WorkerMessage, MasterMessage]) error {
protoMsg, err := stream.Recv()
if err != nil {
Expand All @@ -190,15 +193,14 @@ func (mcs *masterCommsServer) StartStreams(stream grpc.BidiStreamingServer[Worke
log.Info().Msgf("workerID %s connected to master", workerID)
fmt.Println(workerID)


masterSendChan := make(chan *MasterMessage)

mcs.chans_mut.Lock()
mcs.channs[workerID] = masterSendChan
mcs.chans_mut.Unlock()

mcs.sched.AddWorker(types.Worker{
WorkerID: workerID,
WorkerID: workerID,
CpuCount: workerHelloMsg.GetCpuCount(),
MemoryGB: workerHelloMsg.GetMemoryGB(),
})
Expand Down Expand Up @@ -234,7 +236,7 @@ func (mcs *masterCommsServer) GetProblemData(req *ProblemRequest, stream grpc.Se
}

fname := meta.ImagePath

file, err := os.Open(fname)
if err != nil {
log.Err(err).Caller().Msgf("failed to get image")
Expand All @@ -250,9 +252,9 @@ func (mcs *masterCommsServer) GetProblemData(req *ProblemRequest, stream grpc.Se
stream.Send(&common.ImageStreamObject{
Data: &common.ImageStreamObject_Details{
Details: &common.ProblemDetails{
ProblemID: problemID,
ImageSizeBytes: int32(fileSize),
MigrationSize: meta.MigrationSize,
ProblemID: problemID,
ImageSizeBytes: int32(fileSize),
MigrationSize: meta.MigrationSize,
MigrationFrequency: meta.MigrationFrequency,
},
},
Expand Down Expand Up @@ -285,7 +287,6 @@ func NewMasterComms(port uint16, metricChan chan *DeviceMetricsMessage, immigCha
return nil, err
}


sr := grpc.NewServer()
mc.sr = sr
lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
Expand All @@ -294,7 +295,7 @@ func NewMasterComms(port uint16, metricChan chan *DeviceMetricsMessage, immigCha
return nil, err
}
log.Info().Caller().Msgf("master listening on port %d", port)

mc.lis = lis
mc.mcs.sched = sched
mc.mcs.immigChan = immigChan
Expand Down
14 changes: 7 additions & 7 deletions comms/volpe/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ import (
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// TODO: handle stream closing

type WorkerComms struct {
client VolpeMasterClient
stream grpc.BidiStreamingClient[WorkerMessage, MasterMessage]
cancelFunc context.CancelFunc
client VolpeMasterClient
stream grpc.BidiStreamingClient[WorkerMessage, MasterMessage]
cancelFunc context.CancelFunc
cancelMutex sync.Mutex
workerID string
workerID string
// TODO: include something for population
}

// NewWorkerComms constructs WorkerComms and starts a grpc stream
func NewWorkerComms(endpoint string, workerID string, memoryLimit float32, cpuCount int32) (*WorkerComms, error) {
// TODO: channel or something for population adjust
wc := new(WorkerComms)
Expand Down Expand Up @@ -52,7 +52,7 @@ func NewWorkerComms(endpoint string, workerID string, memoryLimit float32, cpuCo
Message: &WorkerMessage_Hello{
Hello: &WorkerHello{
WorkerID: &WorkerID{Id: workerID},
// TODO: use system config
// TODO: use system config
CpuCount: int32(runtime.NumCPU()),
MemoryGB: float32(memoryLimit),
},
Expand Down Expand Up @@ -94,6 +94,7 @@ func (wc *WorkerComms) HandleStreams(adjInstChannel chan *AdjustInstancesMessage
}
}

// SendDeviceMetrics sets workerID and sends metrics over stream
func (wc *WorkerComms) SendDeviceMetrics(metrics *DeviceMetricsMessage) error {
metrics.WorkerID = wc.workerID
workerMsg := WorkerMessage{Message: &WorkerMessage_Metrics{Metrics: metrics}}
Expand Down Expand Up @@ -127,7 +128,6 @@ func (wc *WorkerComms) GetProblemData(problemID string, meta *types.Problem) err
return err
}


detailsMsg, err := stream.Recv()
if err != nil {
log.Error().Caller().Msg(err.Error())
Expand Down
21 changes: 10 additions & 11 deletions container_mgr/problem_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ type ProblemContainer struct {
problemID string
containerName string
// containerPort uint16
hostPort uint16
commsClient ccomms.VolpeContainerClient
resultChannels map[chan *ccomms.ResultPopulation]bool
wEmigChan chan *volpe.MigrationMessage
rcMut sync.Mutex
hostPort uint16
commsClient ccomms.VolpeContainerClient
resultChannels map[chan *ccomms.ResultPopulation]bool
wEmigChan chan *volpe.MigrationMessage
rcMut sync.Mutex
containerContext context.Context
cancel context.CancelFunc
meta *types.Problem
containerID int32
cancel context.CancelFunc
meta *types.Problem
containerID int32
}

// generates random name for every container
Expand Down Expand Up @@ -161,7 +161,7 @@ func (pc *ProblemContainer) sendResultOnce() {
// continuously sends results while containerContext is valid
func (pc *ProblemContainer) sendResults() {
for {
time.Sleep(5*time.Second)
time.Sleep(5 * time.Second)
if pc.containerContext.Err() != nil {
log.Err(pc.containerContext.Err()).Msgf("Stopping sendResults for problem %s", pc.problemID)
break
Expand Down Expand Up @@ -194,7 +194,7 @@ func (pc *ProblemContainer) runGenerations() {
popln.ProblemID = &pc.meta.ProblemID
mig := volpe.MigrationMessage{
Population: popln,
WorkerID: "",
WorkerID: "",
// TODO: set proper containerID
ContainerID: pc.containerID,
}
Expand Down Expand Up @@ -226,4 +226,3 @@ func (pc *ProblemContainer) stopContainer() {
return
}
}

Loading