gmsol/utils/rpc/
transaction_history.rs1use std::borrow::Borrow;
2
3use anchor_client::{
4 solana_client::{
5 nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
6 },
7 solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature},
8 ClientError,
9};
10use async_stream::try_stream;
11use futures_util::Stream;
12
13use crate::utils::WithSlot;
14
15#[cfg(feature = "decode")]
16use gmsol_decode::decoder::{CPIEvents, TransactionDecoder};
17
18pub async fn fetch_transaction_history_with_config(
20 client: impl Borrow<RpcClient>,
21 address: &Pubkey,
22 commitment: CommitmentConfig,
23 until: Option<Signature>,
24 mut before: Option<Signature>,
25 batch: Option<usize>,
26) -> crate::Result<impl Stream<Item = crate::Result<WithSlot<Signature>>>> {
27 let limit = batch;
28 let commitment = Some(commitment);
29 let address = *address;
30
31 let stream = try_stream! {
32 loop {
33 let txns = client.borrow().get_signatures_for_address_with_config(&address, GetConfirmedSignaturesForAddress2Config {
34 before,
35 until,
36 limit,
37 commitment,
38 }).await.map_err(ClientError::from)?;
39 match txns.last() {
40 Some(next) => {
41 let next = next.signature.parse().map_err(crate::Error::unknown)?;
42 for txn in txns {
43 let slot = txn.slot;
44 let signature = txn.signature.parse().map_err(crate::Error::unknown)?;
45 yield WithSlot::new(slot, signature);
46 }
47 before = Some(next);
48 },
49 None => {
50 break;
51 }
52 }
53 }
54 };
55 Ok(stream)
56}
57
/// Fetches each transaction named by `stream` and extracts its CPI events.
///
/// For every `Ok` signature received, the transaction is requested with
/// `get_transaction_with_config` (Base64 encoding, the given `commitment` and
/// `max_supported_transaction_version`) and decoded with a
/// [`TransactionDecoder`] configured with `event_authority` and `program_id`.
/// The extracted events are yielded in place of the signature, inside the
/// incoming [`WithSlot`] wrapper.
///
/// # Errors
/// - `Err` items of the input stream are forwarded unchanged and iteration continues.
/// - Decoding failures are yielded as `Err` items and iteration continues.
/// - A failed transaction fetch is propagated with `?`.
#[cfg(feature = "decode")]
pub fn extract_cpi_events(
    stream: impl Stream<Item = crate::Result<WithSlot<Signature>>>,
    client: impl Borrow<RpcClient>,
    program_id: &Pubkey,
    event_authority: &Pubkey,
    commitment: CommitmentConfig,
    max_supported_transaction_version: Option<u8>,
) -> impl Stream<Item = crate::Result<WithSlot<CPIEvents>>> {
    use anchor_client::solana_client::rpc_config::RpcTransactionConfig;
    use solana_transaction_status::UiTransactionEncoding;

    // Copy the keys so the returned stream does not borrow from the caller.
    let event_authority = *event_authority;
    let program_id = *program_id;

    async_stream::stream! {
        for await item in stream {
            // Upstream failures are passed through without touching the RPC.
            let ctx = match item {
                Ok(ctx) => ctx,
                Err(err) => {
                    yield Err(err);
                    continue;
                }
            };

            let signature = *ctx.value();
            tracing::debug!(%signature, "fetching transaction");

            let config = RpcTransactionConfig {
                encoding: Some(UiTransactionEncoding::Base64),
                commitment: Some(commitment),
                max_supported_transaction_version,
            };
            let tx = client
                .borrow()
                .get_transaction_with_config(&signature, config)
                .await
                .map_err(ClientError::from)?;

            let mut decoder = TransactionDecoder::new(tx.slot, signature, &tx.transaction);
            let decoded = decoder
                .add_cpi_event_authority_and_program_id(event_authority, program_id)
                .and_then(|decoder| decoder.extract_cpi_events());

            // Swap the signature inside the wrapper for the decoded events.
            yield decoded
                .map(|events| ctx.map(|_| events))
                .map_err(Into::into);
        }
    }
}
111
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;

    use super::*;
    use crate::test::{default_cluster, setup_fmt_tracing};

    /// Fetches at most five signatures for the default store program from the
    /// RPC endpoint of the default cluster and logs each of them.
    ///
    /// The test fails if the stream reports an error.
    #[tokio::test]
    async fn test_transaction_history_fetching() -> eyre::Result<()> {
        let _guard = setup_fmt_tracing("info");
        let cluster = default_cluster();
        let client = RpcClient::new(cluster.url().to_string());
        let stream = fetch_transaction_history_with_config(
            client,
            &crate::program_ids::DEFAULT_GMSOL_STORE_ID,
            CommitmentConfig::confirmed(),
            None,
            None,
            Some(5),
        )
        .await?
        .take(5);
        futures_util::pin_mut!(stream);
        // Propagate `Err` items instead of matching only on `Some(Ok(_))`,
        // which would end the loop silently and let a failing fetch pass.
        while let Some(res) = stream.next().await {
            let signature = res?;
            tracing::info!("{signature:?}");
        }
        Ok(())
    }
}