Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions octobot/community/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,9 @@ def create_wallet(self, name: typing.Optional[str], passphrase: str, is_admin: b
def import_wallet(self, private_key: str, passphrase: str, name: typing.Optional[str], is_admin: bool = False):
return self._wallet_backend.import_wallet(private_key, passphrase, name, is_admin)

def import_wallet_from_seed(self, seed: str, passphrase: str, name: typing.Optional[str], is_admin: bool = False):
return self._wallet_backend.import_wallet_from_seed(seed, passphrase, name, is_admin)

def authenticate_wallet(self, address: str, passphrase: str) -> dict:
"""Verify passphrase and return wallet metadata in a single storage read.

Expand All @@ -667,6 +670,9 @@ def verify_wallet_passphrase(self, address: str, passphrase: str) -> bool:
def decrypt_wallet_by_address(self, address: str, passphrase: str):
return self._wallet_backend.decrypt_wallet_by_address(address, passphrase)

def decrypt_wallet_entry_by_address(self, address: str, passphrase: str):
return self._wallet_backend.decrypt_wallet_entry_by_address(address, passphrase)

def remove_wallet(self, address: str) -> None:
return self._wallet_backend.remove_wallet(address)

Expand Down
28 changes: 26 additions & 2 deletions octobot/community/wallet_backend/community_wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class WalletEntry(commons_dataclasses.FlexibleDataclass):
is_admin: bool = False
private_key: str = ""
passphrase_hash: str = ""
seed: typing.Optional[str] = None


def _hash_passphrase(passphrase: str) -> str:
Expand Down Expand Up @@ -123,8 +124,8 @@ def create_wallet(
passphrase: str,
is_admin: bool = False,
) -> sync_chain.Wallet:
wallet = sync_chain.create_evm_wallet()
return self._add_wallet_entry(wallet.private_key, wallet.address, name, passphrase, is_admin)
wallet, mnemonic = sync_chain.create_evm_wallet_with_mnemonic()
return self._add_wallet_entry(wallet.private_key, wallet.address, name, passphrase, is_admin, seed=mnemonic)

def import_wallet(
self,
Expand All @@ -139,13 +140,27 @@ def import_wallet(
raise InvalidPrivateKeyError(f"Invalid EVM private key: {err}") from err
return self._add_wallet_entry(private_key, address, name, passphrase, is_admin)

def import_wallet_from_seed(
self,
seed: str,
passphrase: str,
name: typing.Optional[str],
is_admin: bool = False,
) -> sync_chain.Wallet:
try:
wallet = sync_chain.wallet_from_mnemonic(seed.strip())
except Exception as err:
raise InvalidPrivateKeyError(f"Invalid seed phrase: {err}") from err
return self._add_wallet_entry(wallet.private_key, wallet.address, name, passphrase, is_admin, seed=seed.strip())

def _add_wallet_entry(
self,
private_key: str,
address: str,
name: typing.Optional[str],
passphrase: str,
is_admin: bool,
seed: typing.Optional[str] = None,
) -> sync_chain.Wallet:
if len(passphrase) < 8:
raise PassphraseTooShortError("Passphrase must be at least 8 characters")
Expand All @@ -162,6 +177,7 @@ def _add_wallet_entry(
is_admin=is_admin,
private_key=private_key.removeprefix("0x"),
passphrase_hash=_hash_passphrase(passphrase),
seed=seed,
)
node_wallets.append(entry)
self._save_node_wallets_list(node_wallets)
Expand Down Expand Up @@ -195,6 +211,14 @@ def decrypt_wallet_by_address(self, address: str, passphrase: str) -> sync_chain
raise InvalidPassphraseError("Invalid passphrase")
return self._wallet_from_entry(entry)

def decrypt_wallet_entry_by_address(self, address: str, passphrase: str) -> WalletEntry:
entry = self._find_wallet_entry(address)
if entry is None:
raise WalletNotFoundError(f"Wallet {address} not found")
if not _verify_passphrase_hash(passphrase, entry.passphrase_hash):
raise InvalidPassphraseError("Invalid passphrase")
return entry

def get_wallet_for_bot(self, address: str) -> sync_chain.Wallet:
"""Return wallet without passphrase verification — for bot auto-unlock at startup."""
entry = self._find_wallet_entry(address)
Expand Down
4 changes: 4 additions & 0 deletions packages/sync/octobot_sync/chain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
from octobot_sync.chain.evm import (
Wallet,
create_evm_wallet,
create_evm_wallet_with_mnemonic,
wallet_from_mnemonic,
address_from_evm_key,
)

__all__ = [
"Wallet",
"create_evm_wallet",
"create_evm_wallet_with_mnemonic",
"wallet_from_mnemonic",
"address_from_evm_key",
]
12 changes: 12 additions & 0 deletions packages/sync/octobot_sync/chain/evm.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,17 @@ def create_evm_wallet() -> Wallet:
return Wallet(private_key=account.key.hex(), address=account.address)


def create_evm_wallet_with_mnemonic() -> tuple[Wallet, str]:
web3.Account.enable_unaudited_hdwallet_features() # pylint: disable=no-value-for-parameter
account, mnemonic = web3.Account.create_with_mnemonic() # pylint: disable=no-value-for-parameter
return Wallet(private_key=account.key.hex(), address=account.address), mnemonic


def wallet_from_mnemonic(mnemonic: str) -> Wallet:
web3.Account.enable_unaudited_hdwallet_features() # pylint: disable=no-value-for-parameter
account = web3.Account.from_mnemonic(mnemonic) # pylint: disable=no-value-for-parameter
return Wallet(private_key=account.key.hex(), address=account.address)


def address_from_evm_key(private_key: str) -> str:
return web3.Account.from_key(private_key).address # pylint: disable=no-value-for-parameter
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class SetupResult(pydantic.BaseModel):
class WalletExport(pydantic.BaseModel):
address: str
private_key: str
seed: typing.Optional[str] = None


@router.get("/setup/status", response_model=SetupStatus)
Expand Down Expand Up @@ -105,15 +106,30 @@ def init_setup(body: SetupInit) -> SetupResult:
def export_wallet(
current_user: CurrentUser,
credentials: typing.Annotated[typing.Optional[HTTPBasicCredentials], Depends(security_basic)],
address: typing.Optional[str] = None,
passphrase: typing.Optional[str] = None,
) -> WalletExport:
auth = community_auth.CommunityAuthentication.instance()
if auth is None or credentials is None:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Node not configured",
)
target_address = (address or current_user.email).lower()
is_own_wallet = target_address == current_user.email.lower()
if not is_own_wallet and not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only the admin can export other wallets",
)
target_passphrase = credentials.password if is_own_wallet else passphrase
if not target_passphrase:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Passphrase required",
)
try:
wallet = auth.decrypt_wallet_by_address(current_user.email, credentials.password)
entry = auth.decrypt_wallet_entry_by_address(target_address, target_passphrase)
except wallet_backend.WalletNotFoundError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
Expand All @@ -124,4 +140,4 @@ def export_wallet(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid passphrase",
)
return WalletExport(address=wallet.address, private_key=wallet.private_key)
return WalletExport(address=entry.address, private_key=entry.private_key, seed=entry.seed or None)
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class CreateWalletBody(pydantic.BaseModel):
passphrase: str
name: typing.Optional[str] = None
private_key: typing.Optional[str] = None
seed: typing.Optional[str] = None


class UpdateWalletBody(pydantic.BaseModel):
Expand Down Expand Up @@ -96,6 +97,13 @@ def create_wallet(body: CreateWalletBody, current_user: CurrentUser) -> WalletIn
name=body.name,
is_admin=False,
)
elif body.seed:
wallet = auth.import_wallet_from_seed(
seed=body.seed,
passphrase=body.passphrase,
name=body.name,
is_admin=False,
)
else:
wallet = auth.create_wallet(
name=body.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ def test_setup_init_invalid_passphrase_returns_422(client):


def test_wallet_export_success(admin_client, mock_auth):
mock_auth.decrypt_wallet_by_address.return_value = mock.MagicMock(
mock_auth.decrypt_wallet_entry_by_address.return_value = mock.MagicMock(
address=ADMIN_ADDRESS,
private_key="0xdeadbeef",
seed=None,
)
resp = admin_client.get(
"/api/v1/setup/wallet/export",
Expand Down
Loading
Loading