Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 33 additions & 44 deletions client/src/block/extrinsic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
};
use avail_rust_core::{
Extrinsic, ExtrinsicDecodable, H256, HasHeader, HashNumber, MultiAddress, RpcError, avail,
rpc::{self, AllowedExtrinsic, DataFormat},
rpc::{self, AllowedExtrinsic, DataFormat, Query},
substrate::extrinsic::Preamble,
types::HashStringNumber,
};
Expand Down Expand Up @@ -52,15 +52,14 @@ impl ExtrinsicsQuery {
pub async fn first(
&self,
allow_list: Option<Vec<AllowedExtrinsic>>,
sig_filter: rpc::SignatureFilter,
mut query: rpc::Query,
) -> Result<Option<UntypedExtrinsic>, Error> {
let at = self.ctx.hash_number()?;
let chain = self.ctx.chain();
query.max_items = Some(1);
query.reverse = false;

let mut result = chain
.extrinsics(at, allow_list, sig_filter, DataFormat::Extrinsic)
.await?;

let mut result = chain.extrinsics(at, allow_list, query, DataFormat::Extrinsic).await?;
let Some(info) = result.first_mut() else {
return Ok(None);
};
Expand All @@ -72,14 +71,14 @@ impl ExtrinsicsQuery {
pub async fn last(
&self,
allow_list: Option<Vec<AllowedExtrinsic>>,
sig_filter: rpc::SignatureFilter,
mut query: rpc::Query,
) -> Result<Option<UntypedExtrinsic>, Error> {
let at = self.ctx.hash_number()?;
let chain = self.ctx.chain();
query.max_items = Some(1);
query.reverse = true;

let mut result = chain
.extrinsics(at, allow_list, sig_filter, DataFormat::Extrinsic)
.await?;
let mut result = chain.extrinsics(at, allow_list, query, DataFormat::Extrinsic).await?;
let Some(info) = result.last_mut() else {
return Ok(None);
};
Expand All @@ -91,14 +90,12 @@ impl ExtrinsicsQuery {
pub async fn all(
&self,
allow_list: Option<Vec<AllowedExtrinsic>>,
sig_filter: rpc::SignatureFilter,
query: rpc::Query,
) -> Result<Vec<UntypedExtrinsic>, Error> {
let at = self.ctx.hash_number()?;
let chain = self.ctx.chain();

let extrinsics = chain
.extrinsics(at, allow_list, sig_filter, DataFormat::Extrinsic)
.await?;
let extrinsics = chain.extrinsics(at, allow_list, query, DataFormat::Extrinsic).await?;

let mut result = Vec::with_capacity(extrinsics.len());
for info in extrinsics {
Expand All @@ -109,24 +106,21 @@ impl ExtrinsicsQuery {
Ok(result)
}

pub async fn count(
&self,
allow_list: Option<Vec<AllowedExtrinsic>>,
sig_filter: rpc::SignatureFilter,
) -> Result<usize, Error> {
pub async fn count(&self, allow_list: Option<Vec<AllowedExtrinsic>>, query: rpc::Query) -> Result<usize, Error> {
let at = self.ctx.at.clone();
let chain = self.ctx.chain();
let result = chain.extrinsics(at, allow_list, sig_filter, DataFormat::None).await?;
let result = chain.extrinsics(at, allow_list, query, DataFormat::None).await?;

Ok(result.len())
}

pub async fn exists(
&self,
allow_list: Option<Vec<AllowedExtrinsic>>,
sig_filter: rpc::SignatureFilter,
mut query: rpc::Query,
) -> Result<bool, Error> {
self.count(allow_list, sig_filter).await.map(|x| x > 0)
query.max_items = Some(1);
self.count(allow_list, query).await.map(|x| x > 0)
}

// ── Typed (_as) methods ─────────────────────────────────────────────
Expand Down Expand Up @@ -159,41 +153,32 @@ impl ExtrinsicsQuery {
inner::<T>(self, extrinsic_id.into()).await
}

pub async fn first_as<T: HasHeader + Decode>(
&self,
sig_filter: rpc::SignatureFilter,
) -> Result<Option<TypedExtrinsic<T>>, Error> {
pub async fn first_as<T: HasHeader + Decode>(&self, query: rpc::Query) -> Result<Option<TypedExtrinsic<T>>, Error> {
let allow_list = Some(vec![T::HEADER_INDEX.into()]);

let encoded = self.first(allow_list, sig_filter).await?;
let encoded = self.first(allow_list, query).await?;
let Some(encoded) = encoded else {
return Ok(None);
};

Ok(Some(encoded.as_typed::<T>()?))
}

pub async fn last_as<T: HasHeader + Decode>(
&self,
sig_filter: rpc::SignatureFilter,
) -> Result<Option<TypedExtrinsic<T>>, Error> {
pub async fn last_as<T: HasHeader + Decode>(&self, query: rpc::Query) -> Result<Option<TypedExtrinsic<T>>, Error> {
let allow_list = Some(vec![T::HEADER_INDEX.into()]);

let encoded = self.last(allow_list, sig_filter).await?;
let encoded = self.last(allow_list, query).await?;
let Some(encoded) = encoded else {
return Ok(None);
};

Ok(Some(encoded.as_typed::<T>()?))
}

pub async fn all_as<T: HasHeader + Decode>(
&self,
sig_filter: rpc::SignatureFilter,
) -> Result<Vec<TypedExtrinsic<T>>, Error> {
pub async fn all_as<T: HasHeader + Decode>(&self, query: rpc::Query) -> Result<Vec<TypedExtrinsic<T>>, Error> {
let allow_list = Some(vec![T::HEADER_INDEX.into()]);

let all = self.all(allow_list, sig_filter).await?;
let all = self.all(allow_list, query).await?;
let mut result = Vec::with_capacity(all.len());
for encoded in all {
result.push(encoded.as_typed::<T>()?);
Expand All @@ -206,7 +191,9 @@ impl ExtrinsicsQuery {

/// Block 0 is the only block that does not have this extrinsic in it.
pub async fn ext_timestamp(&self) -> Result<TypedExtrinsic<avail::timestamp::tx::Set>, Error> {
let ext = self.first_as::<avail::timestamp::tx::Set>(Default::default()).await?;
let mut query = Query::default();
query.max_items = Some(1);
let ext = self.first_as::<avail::timestamp::tx::Set>(query).await?;
let Some(ext) = ext else {
return Err(Error::NotFound(String::from("Timestamp Set extrinsic not found")));
};
Expand All @@ -217,8 +204,10 @@ impl ExtrinsicsQuery {
pub async fn ext_submit_blob_txs_summary(
&self,
) -> Result<TypedExtrinsic<avail::data_availability::tx::SubmitBlobTxsSummary>, Error> {
let mut query = Query::default();
query.max_items = Some(1);
let ext = self
.first_as::<avail::data_availability::tx::SubmitBlobTxsSummary>(Default::default())
.first_as::<avail::data_availability::tx::SubmitBlobTxsSummary>(query)
.await?;
let Some(ext) = ext else {
return Err(Error::NotFound(String::from("Data Availability Submit Blob Tx Summary extrinsic not found")));
Expand All @@ -230,9 +219,9 @@ impl ExtrinsicsQuery {
pub async fn ext_failed_send_message_txs(
&self,
) -> Result<TypedExtrinsic<avail::vector::tx::FailedSendMessageTxs>, Error> {
let ext = self
.first_as::<avail::vector::tx::FailedSendMessageTxs>(Default::default())
.await?;
let mut query = Query::default();
query.max_items = Some(1);
let ext = self.first_as::<avail::vector::tx::FailedSendMessageTxs>(query).await?;
let Some(ext) = ext else {
return Err(Error::NotFound(String::from("Vector Failed Send Message Txs extrinsic not found")));
};
Expand All @@ -244,12 +233,12 @@ impl ExtrinsicsQuery {
pub async fn rpc(
&self,
allow_list: Option<Vec<AllowedExtrinsic>>,
sig_filter: rpc::SignatureFilter,
query: rpc::Query,
data_format: rpc::DataFormat,
) -> Result<Vec<rpc::Extrinsic>, Error> {
self.ctx
.chain()
.extrinsics(self.ctx.at.clone(), allow_list, sig_filter, data_format)
.extrinsics(self.ctx.at.clone(), allow_list, query, data_format)
.await
}

Expand Down
8 changes: 4 additions & 4 deletions client/src/chain/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,22 +630,22 @@ impl Chain {
&self,
at: impl Into<HashStringNumber>,
allow_list: Option<Vec<rpc::AllowedExtrinsic>>,
sig_filter: rpc::SignatureFilter,
query: rpc::Query,
data_format: rpc::DataFormat,
) -> Result<Vec<rpc::Extrinsic>, Error> {
async fn inner(
c: &Chain,
at: HashNumber,
allow_list: Option<Vec<rpc::AllowedExtrinsic>>,
sig_filter: rpc::SignatureFilter,
query: rpc::Query,
data_format: rpc::DataFormat,
) -> Result<Vec<rpc::Extrinsic>, Error> {
retry!(c.should_retry_on_error(), {
rpc::custom::fetch_extrinsics(
&c.client.rpc_client,
at.into(),
allow_list.clone(),
sig_filter.clone(),
query.clone(),
data_format,
)
.await
Expand All @@ -655,7 +655,7 @@ impl Chain {

let at = HashNumber::try_from(at.into())
.map_err(|e| Error::validation_with_op(error_ops::ErrorOperation::ChainFetchExtrinsics, e))?;
inner(self, at, allow_list, sig_filter, data_format).await
inner(self, at, allow_list, query, data_format).await
}

/// Pulls events for a block with optional filtering.
Expand Down
10 changes: 5 additions & 5 deletions client/src/subscription/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use async_trait::async_trait;
use avail_rust_core::{
BlockInfo, HasHeader,
grandpa::GrandpaJustification,
rpc::{AllowedEvents, AllowedExtrinsic, LegacyBlock, PhaseEvents, SignatureFilter},
rpc::{AllowedEvents, AllowedExtrinsic, LegacyBlock, PhaseEvents, Query},
};
use codec::Decode;
use std::marker::PhantomData;
Expand Down Expand Up @@ -143,7 +143,7 @@ impl Fetcher for BlockEventsFetcher {
/// Yields decoded extrinsics of type `T` for each block.
#[derive(Clone)]
pub struct ExtrinsicFetcher<T: HasHeader + Decode> {
pub(crate) sig_filter: SignatureFilter,
pub(crate) query: Query,
pub(crate) _phantom: PhantomData<T>,
}

Expand All @@ -154,7 +154,7 @@ impl<T: HasHeader + Decode + Clone + Sync> Fetcher for ExtrinsicFetcher<T> {
async fn fetch(&self, client: &Client, info: BlockInfo, retry: RetryPolicy) -> Result<Self::Output, Error> {
let mut block = Block::new(client.clone(), info.hash).extrinsics();
block.set_retry_policy(retry);
block.all_as::<T>(self.sig_filter.clone()).await
block.all_as::<T>(self.query.clone()).await
}

fn is_empty(&self, value: &Self::Output) -> bool {
Expand All @@ -170,7 +170,7 @@ impl<T: HasHeader + Decode + Clone + Sync> Fetcher for ExtrinsicFetcher<T> {
#[derive(Debug, Clone)]
pub struct UntypedExtrinsicFetcher {
pub(crate) allow_list: Option<Vec<AllowedExtrinsic>>,
pub(crate) sig_filter: SignatureFilter,
pub(crate) query: Query,
}

#[async_trait]
Expand All @@ -180,7 +180,7 @@ impl Fetcher for UntypedExtrinsicFetcher {
async fn fetch(&self, client: &Client, info: BlockInfo, retry: RetryPolicy) -> Result<Self::Output, Error> {
let mut block = Block::new(client.clone(), info.hash).extrinsics();
block.set_retry_policy(retry);
block.all(self.allow_list.clone(), self.sig_filter.clone()).await
block.all(self.allow_list.clone(), self.query.clone()).await
}

fn is_empty(&self, value: &Self::Output) -> bool {
Expand Down
10 changes: 5 additions & 5 deletions client/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub use sub::{BlockQueryMode, Subscription, SubscriptionItem};
use crate::Client;
use avail_rust_core::{
HasHeader,
rpc::{AllowedEvents, AllowedExtrinsic, SignatureFilter},
rpc::{AllowedEvents, AllowedExtrinsic, Query},
};
use codec::Decode;
use std::marker::PhantomData;
Expand Down Expand Up @@ -42,17 +42,17 @@ impl SubscribeApi {

pub fn extrinsics<T: HasHeader + Decode + Clone + Sync>(
&self,
sig_filter: SignatureFilter,
query: Query,
) -> SubscriptionBuilder<ExtrinsicFetcher<T>> {
SubscriptionBuilder::new(self.0.clone(), ExtrinsicFetcher { sig_filter, _phantom: PhantomData })
SubscriptionBuilder::new(self.0.clone(), ExtrinsicFetcher { query, _phantom: PhantomData })
}

pub fn untyped_extrinsics(
&self,
allow_list: Option<Vec<AllowedExtrinsic>>,
sig_filter: SignatureFilter,
query: Query,
) -> SubscriptionBuilder<UntypedExtrinsicFetcher> {
SubscriptionBuilder::new(self.0.clone(), UntypedExtrinsicFetcher { allow_list, sig_filter })
SubscriptionBuilder::new(self.0.clone(), UntypedExtrinsicFetcher { allow_list, query })
}

pub fn justification(&self) -> SubscriptionBuilder<GrandpaJustificationFetcher> {
Expand Down
10 changes: 6 additions & 4 deletions core/src/rpc/custom/extrinsics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ pub async fn fetch_extrinsics(
client: &RpcClient,
at: BlockId,
allow_list: Option<Vec<AllowedExtrinsic>>,
sig_filter: SignatureFilter,
query: Query,
data_format: DataFormat,
) -> Result<Vec<Extrinsic>, Error> {
let params = rpc_params![at, allow_list, sig_filter, data_format];
let params = rpc_params![at, allow_list, query, data_format];
let value: Vec<Extrinsic> = client.request("custom_extrinsics", params).await?;
Ok(value)
}
Expand Down Expand Up @@ -70,9 +70,11 @@ impl From<(u8, u8)> for AllowedExtrinsic {
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct SignatureFilter {
pub account_id: Option<String>,
pub struct Query {
pub address: Option<String>,
pub nonce: Option<u32>,
pub max_items: Option<u32>,
pub reverse: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
Expand Down
2 changes: 1 addition & 1 deletion core/src/rpc/custom/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use primitive_types::H256;
use subxt_rpcs::{RpcClient, rpc_params};

pub use events::{AllowedEvents, PhaseEvents, RuntimeEvent, fetch_events};
pub use extrinsics::{AllowedExtrinsic, DataFormat, Extrinsic, SignatureFilter, fetch_extrinsics};
pub use extrinsics::{AllowedExtrinsic, DataFormat, Extrinsic, Query, fetch_extrinsics};

pub async fn get_block_number(client: &RpcClient, at: H256) -> Result<Option<u32>, Error> {
let params = rpc_params![at];
Expand Down
2 changes: 1 addition & 1 deletion core/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub use error::Error;

pub use super::AvailHeader;
pub use chain::{Block, BlockJustification, LegacyBlock};
pub use custom::{AllowedEvents, AllowedExtrinsic, DataFormat, Extrinsic, PhaseEvents, RuntimeEvent, SignatureFilter};
pub use custom::{AllowedEvents, AllowedExtrinsic, DataFormat, Extrinsic, PhaseEvents, Query, RuntimeEvent};
use subxt_rpcs::{RpcClient, client::RpcParams};

pub async fn raw_call<T: serde::de::DeserializeOwned>(
Expand Down