Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ target/

### Python ###
__pycache__
.uv-python/
.uv-cache/

### Windows ###
Thumbs.db
Expand Down Expand Up @@ -42,6 +44,7 @@ slurm*
node_modules/
bindings
.pre-commit-config.yaml
.claude/

### Data folders ###
/data
Expand Down
21 changes: 12 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions architectures/centralized/client/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use anyhow::{Error, Result};
use bytemuck::Zeroable;
use psyche_centralized_shared::{ClientToServerMessage, ServerToClientMessage};
use psyche_client::{
CheckpointUploader, Client, ClientTUI, ClientTUIState, ModelExtraData, NC, RunInitConfig,
TrainArgs, read_identity_secret_key,
CheckpointUploader, Client, ClientTUI, ClientTUIState, NC, RunInitConfig, TrainArgs,
read_identity_secret_key,
};
use psyche_coordinator::model::{self, CheckpointSource};
use psyche_coordinator::model_extra_data::CheckpointData;
use psyche_coordinator::{Coordinator, HealthChecks};
use psyche_core::NodeIdentity;
use psyche_coordinator::coordinator::{Coordinator, HealthChecks};
use psyche_coordinator::model::{self, CheckpointBytes, CheckpointSource};
use psyche_coordinator::node_identity::NodeIdentity;
use psyche_core::{CheckpointData, ModelExtraData};
use psyche_event_sourcing::event;
use psyche_event_sourcing::events::RpcCallType;
use psyche_metrics::ClientMetrics;
Expand Down Expand Up @@ -69,7 +69,7 @@ impl WatcherBackend for Backend {
Ok(())
}

async fn send_checkpoint(&mut self, checkpoint: model::CheckpointBytes) -> Result<()> {
async fn send_checkpoint(&mut self, checkpoint: CheckpointBytes) -> Result<()> {
self.tx.send(ToSend::Checkpoint(Box::new(checkpoint)))?;
Ok(())
}
Expand Down Expand Up @@ -211,7 +211,7 @@ impl App {

// Validate upload credentials now that we have the coordinator state with checkpoint info.
if !state_options.checkpoint_config.skip_upload {
let model::Model::LLM(ref llm) = first_coordinator_state.model;
let llm = first_coordinator_state.model;
if llm.checkpoint_source != CheckpointSource::Ephemeral {
match CheckpointData::from_fixed_vec(&llm.checkpoint_data) {
Ok(CheckpointData::Hub { ref repo_id, .. }) => {
Expand Down
21 changes: 9 additions & 12 deletions architectures/centralized/server/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use crate::dashboard::{DashboardState, DashboardTui};
use anyhow::{Result, bail};
use async_trait::async_trait;
use psyche_centralized_shared::{ClientToServerMessage, ServerToClientMessage};
use psyche_coordinator::model::{self, CheckpointSource, Model};
use psyche_coordinator::model_extra_data::CheckpointData;
use psyche_coordinator::{
use psyche_coordinator::coordinator::{
Client, ClientState, Coordinator, CoordinatorError, HealthChecks, Round, RunState,
SOLANA_MAX_NUM_CLIENTS, TickResult,
};

use psyche_core::{FixedVec, NodeIdentity, Shuffle, SizedIterator, TokenSize};
use psyche_coordinator::fixed_vec::FixedVec;
use psyche_coordinator::model::{CheckpointBytes, CheckpointSource};
use psyche_coordinator::node_identity::NodeIdentity;
use psyche_core::{CheckpointData, Shuffle, SizedIterator, TokenSize};
use psyche_data_provider::{
DataProviderTcpServer, DataServerTui, LocalDataProvider, download_model_from_gcs_async,
download_model_repo_async,
Expand All @@ -34,8 +35,6 @@ use tokio::{select, time::Interval};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, info, info_span, warn};

use crate::dashboard::{DashboardState, DashboardTui};

pub(super) type TabWidgetTypes = (
DashboardTui,
CoordinatorTui,
Expand Down Expand Up @@ -83,7 +82,7 @@ impl psyche_watcher::Backend for ChannelCoordinatorBackend {
bail!("Server does not send health checks");
}

async fn send_checkpoint(&mut self, _checkpoint: model::CheckpointBytes) -> Result<()> {
async fn send_checkpoint(&mut self, _checkpoint: CheckpointBytes) -> Result<()> {
bail!("Server does not send checkpoints");
}
}
Expand Down Expand Up @@ -137,9 +136,7 @@ impl App {
}

pub fn get_checkpoint(&self) -> CheckpointSource {
match self.coordinator.model {
Model::LLM(llm) => llm.checkpoint_source,
}
self.coordinator.model.checkpoint_source
}

pub fn get_port(&self) -> u16 {
Expand Down Expand Up @@ -191,7 +188,7 @@ impl App {
}) = data_server_config
{
// Download model if needed based on checkpoint type
let Model::LLM(llm) = &coordinator.model;
let llm = &coordinator.model;
if llm.checkpoint_source == CheckpointSource::Ephemeral {
bail!("Can't start up a run with an Ephemeral checkpoint.")
}
Expand Down
2 changes: 1 addition & 1 deletion architectures/centralized/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod dashboard;
use anyhow::{Context, Result, bail};
use app::{App, DataServerInfo};
use clap::{ArgAction, Parser};
use psyche_coordinator::Coordinator;
use psyche_coordinator::coordinator::Coordinator;
use psyche_tui::{
LogOutput, ServiceInfo,
logging::{MetricsDestination, OpenTelemetry, RemoteLogsDestination, TraceDestination},
Expand Down
5 changes: 4 additions & 1 deletion architectures/centralized/shared/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use psyche_coordinator::{Coordinator, HealthChecks, model};
use psyche_coordinator::{
coordinator::{Coordinator, HealthChecks},
model,
};
use psyche_watcher::OpportunisticData;
use serde::{Deserialize, Serialize};

Expand Down
14 changes: 8 additions & 6 deletions architectures/centralized/testing/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use crate::{COOLDOWN_TIME, test_utils::sample_rand_run_id};
use crate::{MAX_ROUND_TRAIN_TIME, ROUND_WITNESS_TIME, WARMUP_TIME};
use bytemuck::Zeroable;
use psyche_centralized_server::app::App as ServerApp;
use psyche_coordinator::{Client, Round};
use psyche_coordinator::{
Coordinator, CoordinatorConfig, CoordinatorEpochState, RunState, SOLANA_MAX_NUM_CLIENTS,
model::{CheckpointSource, LLM, Model},
use psyche_coordinator::coordinator::{
Client, Coordinator, CoordinatorConfig, CoordinatorEpochState, Round, RunState,
SOLANA_MAX_NUM_CLIENTS,
};
use psyche_core::{FixedVec, NodeIdentity};
use psyche_coordinator::fixed_vec::FixedVec;
use psyche_coordinator::model::{CheckpointSource, Model};
use psyche_coordinator::node_identity::NodeIdentity;
use psyche_core::CheckpointData;
use std::{collections::HashSet, ops::ControlFlow};
use tokio::{
select,
Expand Down Expand Up @@ -90,7 +92,7 @@ impl CoordinatorServer {
let run_id = sample_rand_run_id();
let coordinator: Coordinator = Coordinator {
run_id: run_id.as_str().try_into().unwrap(),
model: Model::LLM(LLM::dummy()),
model: Model::dummy(CheckpointData::Dummy.to_fixed_vec()),
config: coordinator_config,
epoch_state,
..Coordinator::zeroed()
Expand Down
5 changes: 2 additions & 3 deletions architectures/centralized/testing/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::client::ClientHandle;
use crate::server::CoordinatorServerHandle;
use clap::Parser;
use psyche_client::TrainArgs;
use psyche_coordinator::coordinator::Coordinator;
use rand::distr::{Alphanumeric, SampleString};
use std::env;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -98,9 +99,7 @@ pub async fn assert_witnesses_healthy_score(
// calculate score
let mut score = 0;
clients.iter().for_each(|client| {
score += psyche_coordinator::Coordinator::trainer_healthy_score_by_witnesses(
&client.id, witnesses,
);
score += Coordinator::trainer_healthy_score_by_witnesses(&client.id, witnesses);
});

assert_eq!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use psyche_centralized_testing::{
spawn_clients_with_training_delay,
},
};
use psyche_coordinator::{RunState, model::CheckpointSource};
use psyche_coordinator::{coordinator::RunState, model::CheckpointSource};
use tracing::info;

#[test_log::test(tokio::test(flavor = "multi_thread"))]
Expand Down
2 changes: 1 addition & 1 deletion architectures/decentralized/solana-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ psyche-metrics.workspace = true
psyche-modeling.workspace = true
psyche-network.workspace = true
psyche-solana-rpc.workspace = true
psyche-solana-coordinator.workspace = true
psyche-solana-coordinator = { workspace = true, features = ["client"] }
psyche-tui.workspace = true
psyche-watcher.workspace = true
rand.workspace = true
Expand Down
23 changes: 12 additions & 11 deletions architectures/decentralized/solana-client/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use psyche_core::{CheckpointData, ModelExtraData};
use psyche_solana_rpc::SolanaBackend;

use anchor_client::{
Expand All @@ -10,15 +11,15 @@ use anchor_client::{
};
use anyhow::{Result, anyhow};
use psyche_client::{
CheckpointUploader, Client, ClientTUI, ClientTUIState, ModelExtraData, NC, RunInitConfig,
TrainArgs, read_identity_secret_key,
CheckpointUploader, Client, ClientTUI, ClientTUIState, NC, RunInitConfig, TrainArgs,
read_identity_secret_key,
};
use psyche_coordinator::{
ClientState, Coordinator, CoordinatorError, RunState,
model::{CheckpointSource, Model},
model_extra_data::CheckpointData,
coordinator::{ClientState, Coordinator, CoordinatorError, RunState},
model::CheckpointSource,
node_identity::NodeIdentity,
sha::sha256,
};
use psyche_core::sha256;
use psyche_metrics::ClientMetrics;

use psyche_network::{DiscoveryMode, NetworkTUIState, NetworkTui, SecretKey, allowlist};
Expand Down Expand Up @@ -89,7 +90,7 @@ pub async fn build_app(
let mut rng = ChaCha20Rng::from_seed(sha256(&seed_preimage));
SecretKey::generate(&mut rng)
});
let identity = psyche_core::NodeIdentity::new(
let identity = NodeIdentity::new(
wallet_keypair.pubkey().to_bytes(),
*identity_secret_key.public().as_bytes(),
);
Expand Down Expand Up @@ -239,7 +240,7 @@ impl App {

// sanity checks — skip credential validation when checkpoint upload is disabled
if !self.state_options.checkpoint_config.skip_upload {
let Model::LLM(ref llm) = start_coordinator_state.model;
let llm = start_coordinator_state.model;
if llm.checkpoint_source != CheckpointSource::Ephemeral {
match CheckpointData::from_fixed_vec(&llm.checkpoint_data) {
Ok(CheckpointData::Hub { ref repo_id, .. }) => {
Expand Down Expand Up @@ -269,7 +270,7 @@ impl App {
.join_run(
coordinator_instance_pubkey,
coordinator_account,
psyche_core::NodeIdentity::new(signer.to_bytes(), *p2p_identity.as_bytes()),
NodeIdentity::new(signer.to_bytes(), *p2p_identity.as_bytes()),
self.authorizer,
)
.await?;
Expand Down Expand Up @@ -301,7 +302,7 @@ impl App {
self.metrics,
);

let id = psyche_core::NodeIdentity::new(signer.to_bytes(), *p2p_identity.as_bytes());
let id = NodeIdentity::new(signer.to_bytes(), *p2p_identity.as_bytes());

loop {
select! {
Expand All @@ -328,7 +329,7 @@ impl App {
None
};

let pending_clients_ids: Option<Vec<psyche_core::NodeIdentity>> = coordinator_state_in_waiting_for_members
let pending_clients_ids: Option<Vec<NodeIdentity>> = coordinator_state_in_waiting_for_members
.as_ref()
.map(|state| state.clients_state.get_active_clients_ids().collect());

Expand Down
9 changes: 3 additions & 6 deletions architectures/decentralized/solana-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use anchor_client::{
use anyhow::{Result, bail};
use clap::{Args, Parser, Subcommand};
use psyche_client::{TrainArgs, print_identity_keys};
use psyche_coordinator::model::{CheckpointSource, Model};
use psyche_coordinator::model_extra_data::CheckpointData;
use psyche_coordinator::model::CheckpointSource;
use psyche_core::CheckpointData;
use psyche_event_sourcing::{EventStore, FileBackend, RunStarted};
use psyche_network::SecretKey;
use psyche_solana_rpc::SolanaBackend;
Expand Down Expand Up @@ -295,10 +295,7 @@ async fn async_main() -> Result<()> {
.coordinator;

if model {
#[allow(irrefutable_let_patterns)]
let Model::LLM(model_config) = coordinator_account_state.model else {
bail!("Model is not an LLM, unsure how to predownload.");
};
let model_config = coordinator_account_state.model;

if model_config.checkpoint_source == CheckpointSource::Ephemeral {
bail!("Can't predownload model with ephemeral checkpoint.")
Expand Down
3 changes: 2 additions & 1 deletion architectures/decentralized/solana-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ psyche-coordinator.workspace = true
psyche-core.workspace = true
psyche-event-sourcing.workspace = true
psyche-solana-authorizer.workspace = true
psyche-solana-coordinator.workspace = true
psyche-solana-coordinator = { workspace = true, features = ["client"] }
psyche-solana-treasurer.workspace = true
psyche-watcher.workspace = true
tokio.workspace = true
tracing.workspace = true
backon = "1.4.1"
postcard.workspace = true
solana-account-decoder-client-types = "=2.1.4"
solana-transaction-status-client-types = "=2.1.4"
Loading
Loading