gmsol/utils/rpc/
transaction_history.rs

1use std::borrow::Borrow;
2
3use anchor_client::{
4    solana_client::{
5        nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
6    },
7    solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature},
8    ClientError,
9};
10use async_stream::try_stream;
11use futures_util::Stream;
12
13use crate::utils::WithSlot;
14
15#[cfg(feature = "decode")]
16use gmsol_decode::decoder::{CPIEvents, TransactionDecoder};
17
18/// Fetch transaction history for an address.
19pub async fn fetch_transaction_history_with_config(
20    client: impl Borrow<RpcClient>,
21    address: &Pubkey,
22    commitment: CommitmentConfig,
23    until: Option<Signature>,
24    mut before: Option<Signature>,
25    batch: Option<usize>,
26) -> crate::Result<impl Stream<Item = crate::Result<WithSlot<Signature>>>> {
27    let limit = batch;
28    let commitment = Some(commitment);
29    let address = *address;
30
31    let stream = try_stream! {
32        loop {
33            let txns = client.borrow().get_signatures_for_address_with_config(&address, GetConfirmedSignaturesForAddress2Config {
34                before,
35                until,
36                limit,
37                commitment,
38            }).await.map_err(ClientError::from)?;
39            match txns.last() {
40                Some(next) => {
41                    let next = next.signature.parse().map_err(crate::Error::unknown)?;
42                    for txn in txns {
43                        let slot = txn.slot;
44                        let signature = txn.signature.parse().map_err(crate::Error::unknown)?;
45                        yield WithSlot::new(slot, signature);
46                    }
47                    before = Some(next);
48                },
49                None => {
50                    break;
51                }
52            }
53        }
54    };
55    Ok(stream)
56}
57
/// Turn a stream of transaction signatures into a stream of their CPI events.
///
/// For every signature received from `stream`, the transaction is fetched
/// (Base64 encoded, at the given `commitment`) and handed to a
/// [`TransactionDecoder`] configured with `event_authority` and `program_id`.
/// The extracted events are yielded with the slot context of the incoming item.
///
/// # Errors
///
/// - Errors already present in `stream` are passed through unchanged.
/// - Decoding failures are yielded as `Err` items and processing continues
///   with the next signature.
/// - A failed transaction fetch is propagated with `?`.
///   NOTE(review): inside `stream!` this appears to yield the error and then
///   end the stream; confirm against the `async-stream` documentation.
#[cfg(feature = "decode")]
pub fn extract_cpi_events(
    stream: impl Stream<Item = crate::Result<WithSlot<Signature>>>,
    client: impl Borrow<RpcClient>,
    program_id: &Pubkey,
    event_authority: &Pubkey,
    commitment: CommitmentConfig,
    max_supported_transaction_version: Option<u8>,
) -> impl Stream<Item = crate::Result<WithSlot<CPIEvents>>> {
    use anchor_client::solana_client::rpc_config::RpcTransactionConfig;
    use solana_transaction_status::UiTransactionEncoding;

    // Copy the borrowed keys so the returned stream owns everything it needs.
    let event_authority = *event_authority;
    let program_id = *program_id;

    async_stream::stream! {
        for await item in stream {
            // Forward upstream failures as-is and move on to the next item.
            let ctx = match item {
                Ok(ctx) => ctx,
                Err(err) => {
                    yield Err(err);
                    continue;
                }
            };

            let signature = *ctx.value();
            tracing::debug!(%signature, "fetching transaction");

            let config = RpcTransactionConfig {
                encoding: Some(UiTransactionEncoding::Base64),
                commitment: Some(commitment),
                max_supported_transaction_version,
            };
            let tx = client
                .borrow()
                .get_transaction_with_config(&signature, config)
                .await
                .map_err(ClientError::from)?;

            let mut decoder = TransactionDecoder::new(tx.slot, signature, &tx.transaction);
            let decoded = decoder
                .add_cpi_event_authority_and_program_id(event_authority, program_id)
                .and_then(|decoder| decoder.extract_cpi_events());

            match decoded {
                Ok(events) => {
                    // Keep the slot of the incoming item, swapping the signature for its events.
                    yield Ok(ctx.map(|_| events));
                }
                Err(err) => {
                    yield Err(err.into());
                }
            }
        }
    }
}
111
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;

    use super::*;
    use crate::test::{default_cluster, setup_fmt_tracing};

    /// Fetches up to five signatures for the default store program from the
    /// default cluster and logs them.
    ///
    /// Any error produced by the stream fails the test instead of silently
    /// ending the loop.
    #[tokio::test]
    async fn test_transaction_history_fetching() -> eyre::Result<()> {
        let _guard = setup_fmt_tracing("info");
        let cluster = default_cluster();
        let client = RpcClient::new(cluster.url().to_string());
        let stream = fetch_transaction_history_with_config(
            client,
            &crate::program_ids::DEFAULT_GMSOL_STORE_ID,
            CommitmentConfig::confirmed(),
            None,
            None,
            Some(5),
        )
        .await?
        .take(5);
        futures_util::pin_mut!(stream);
        while let Some(res) = stream.next().await {
            // Propagate fetch/parse failures; matching only on `Some(Ok(_))`
            // would stop at the first error and still report success.
            let signature = res?;
            tracing::info!("{signature:?}");
        }
        Ok(())
    }
}