gmsol_solana_utils/
bundle_builder.rs

1use std::{collections::HashSet, ops::Deref};
2
3use futures_util::TryStreamExt;
4use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig};
5use solana_sdk::{
6    commitment_config::CommitmentConfig, packet::PACKET_DATA_SIZE, signature::Signature,
7    signer::Signer, transaction::VersionedTransaction,
8};
9
10use crate::{
11    client::SendAndConfirm,
12    cluster::Cluster,
13    transaction_builder::TransactionBuilder,
14    utils::{inspect_transaction, transaction_size, WithSlot},
15};
16
17const TRANSACTION_SIZE_LIMIT: usize = PACKET_DATA_SIZE;
18const DEFAULT_MAX_INSTRUCTIONS_FOR_ONE_TX: usize = 14;
19
20/// Bundle Options.
21#[derive(Debug, Clone)]
22pub struct BundleOptions {
23    /// Whether to force one transaction.
24    pub force_one_transaction: bool,
25    /// Max packet size.
26    pub max_packet_size: Option<usize>,
27    /// Max number of instructions for one transaction.
28    pub max_instructions_for_one_tx: usize,
29}
30
31impl Default for BundleOptions {
32    fn default() -> Self {
33        Self {
34            force_one_transaction: false,
35            max_packet_size: None,
36            max_instructions_for_one_tx: DEFAULT_MAX_INSTRUCTIONS_FOR_ONE_TX,
37        }
38    }
39}
40
41/// Create Bundle Options.
42#[derive(Debug, Clone, Default)]
43pub struct CreateBundleOptions {
44    /// Cluster.
45    pub cluster: Cluster,
46    /// Commitment config.
47    pub commitment: CommitmentConfig,
48    /// Bundle options.
49    pub options: BundleOptions,
50}
51
52/// Send Bundle Options.
53#[derive(Debug, Clone, Default)]
54pub struct SendBundleOptions {
55    /// Whether to send without compute budget.
56    pub without_compute_budget: bool,
57    /// Set the compute unit price.
58    pub compute_unit_price_micro_lamports: Option<u64>,
59    /// Set the min priority lamports.
60    /// `None` means the value is left unchanged.
61    pub compute_unit_min_priority_lamports: Option<u64>,
62    /// Whether to update recent block hash before send.
63    pub update_recent_block_hash_before_send: bool,
64    /// Whether to continue on error.
65    pub continue_on_error: bool,
66    /// RPC config.
67    pub config: RpcSendTransactionConfig,
68    /// Whether to trace transaction error.
69    pub disable_error_tracing: bool,
70    /// Cluster of the inspector url.
71    pub inspector_cluster: Option<Cluster>,
72}
73
74/// Buidler for transaction bundle.
75pub struct BundleBuilder<'a, C> {
76    client: RpcClient,
77    builders: Vec<TransactionBuilder<'a, C>>,
78    options: BundleOptions,
79}
80
81impl<C> BundleBuilder<'_, C> {
82    /// Create a new [`BundleBuilder`] for the given cluster.
83    pub fn new(cluster: Cluster) -> Self {
84        Self::new_with_options(CreateBundleOptions {
85            cluster,
86            ..Default::default()
87        })
88    }
89
90    /// Create a new [`BundleBuilder`] with the given options.
91    pub fn new_with_options(options: CreateBundleOptions) -> Self {
92        let rpc = options.cluster.rpc(options.commitment);
93
94        Self::from_rpc_client_with_options(rpc, options.options)
95    }
96
97    /// Create a new [`BundleBuilder`] from [`RpcClient`].
98    pub fn from_rpc_client(client: RpcClient) -> Self {
99        Self::from_rpc_client_with_options(client, Default::default())
100    }
101
102    /// Create a new [`BundleBuilder`] from [`RpcClient`] with the given options.
103    pub fn from_rpc_client_with_options(client: RpcClient, options: BundleOptions) -> Self {
104        Self {
105            client,
106            builders: Default::default(),
107            options,
108        }
109    }
110
111    /// Get packet size.
112    pub fn packet_size(&self) -> usize {
113        match self.options.max_packet_size {
114            Some(size) => size.min(TRANSACTION_SIZE_LIMIT),
115            None => TRANSACTION_SIZE_LIMIT,
116        }
117    }
118
119    /// Get the client.
120    pub fn client(&self) -> &RpcClient {
121        &self.client
122    }
123
124    /// Is empty.
125    pub fn is_empty(&self) -> bool {
126        self.builders.is_empty()
127    }
128
129    /// Try clone empty.
130    pub fn try_clone_empty(&self) -> crate::Result<Self> {
131        let cluster = self.client.url().parse()?;
132        let commitment = self.client.commitment();
133        Ok(Self::new_with_options(CreateBundleOptions {
134            cluster,
135            commitment,
136            options: self.options.clone(),
137        }))
138    }
139
140    /// Set options.
141    pub fn set_options(&mut self, options: BundleOptions) -> &mut Self {
142        self.options = options;
143        self
144    }
145}
146
147impl<'a, C: Deref<Target = impl Signer> + Clone> BundleBuilder<'a, C> {
148    /// Push a [`TransactionBuilder`] with options.
149    #[allow(clippy::result_large_err)]
150    pub fn try_push_with_opts(
151        &mut self,
152        mut txn: TransactionBuilder<'a, C>,
153        new_transaction: bool,
154    ) -> Result<&mut Self, (TransactionBuilder<'a, C>, crate::Error)> {
155        let packet_size = self.packet_size();
156        let mut ix = txn.instructions_with_options(true, None);
157        let incoming_lookup_table = txn.get_complete_lookup_table();
158        if transaction_size(
159            &ix,
160            true,
161            Some(&incoming_lookup_table),
162            txn.get_luts().len(),
163        ) > packet_size
164        {
165            return Err((
166                txn,
167                crate::Error::AddTransaction("the size of this instruction is too big"),
168            ));
169        }
170        if self.builders.is_empty() || new_transaction {
171            tracing::debug!("adding to a new tx");
172            if !self.builders.is_empty() && self.options.force_one_transaction {
173                return Err((txn, crate::Error::AddTransaction("cannot create more than one transaction because `force_one_transaction` is set")));
174            }
175            self.builders.push(txn);
176        } else {
177            let last = self.builders.last_mut().unwrap();
178
179            let mut ixs_after_merge = last.instructions_with_options(false, None);
180            ixs_after_merge.append(&mut ix);
181
182            let mut lookup_table = last.get_complete_lookup_table();
183            lookup_table.extend(incoming_lookup_table);
184            let mut lookup_table_addresses = last.get_luts().keys().collect::<HashSet<_>>();
185            lookup_table_addresses.extend(txn.get_luts().keys());
186
187            let size_after_merge = transaction_size(
188                &ixs_after_merge,
189                true,
190                Some(&lookup_table),
191                lookup_table_addresses.len(),
192            );
193            if size_after_merge <= packet_size
194                && ixs_after_merge.len() <= self.options.max_instructions_for_one_tx
195            {
196                tracing::debug!(size_after_merge, "adding to the last tx");
197                last.try_merge(&mut txn).map_err(|err| (txn, err))?;
198            } else {
199                tracing::debug!(
200                    size_after_merge,
201                    "exceed packet data size limit, adding to a new tx"
202                );
203                if self.options.force_one_transaction {
204                    return Err((txn, crate::Error::AddTransaction("cannot create more than one transaction because `force_one_transaction` is set")));
205                }
206                self.builders.push(txn);
207            }
208        }
209        Ok(self)
210    }
211
212    /// Try to push a [`TransactionBuilder`] to the builder.
213    #[allow(clippy::result_large_err)]
214    #[inline]
215    pub fn try_push(
216        &mut self,
217        txn: TransactionBuilder<'a, C>,
218    ) -> Result<&mut Self, (TransactionBuilder<'a, C>, crate::Error)> {
219        self.try_push_with_opts(txn, false)
220    }
221
222    /// Push a [`TransactionBuilder`].
223    pub fn push(&mut self, txn: TransactionBuilder<'a, C>) -> crate::Result<&mut Self> {
224        self.try_push(txn).map_err(|(_, err)| err)
225    }
226
227    /// Push [`TransactionBuilder`]s.
228    pub fn push_many(
229        &mut self,
230        txns: impl IntoIterator<Item = TransactionBuilder<'a, C>>,
231        new_transaction: bool,
232    ) -> crate::Result<&mut Self> {
233        for (idx, txn) in txns.into_iter().enumerate() {
234            self.try_push_with_opts(txn, (idx == 0) && new_transaction)
235                .map_err(|(_, err)| err)?;
236        }
237        Ok(self)
238    }
239
240    /// Get back all collected [`TransactionBuilder`]s.
241    pub fn into_builders(self) -> Vec<TransactionBuilder<'a, C>> {
242        self.builders
243    }
244
245    /// Send all in order and returns the signatures of the success transactions.
246    pub async fn send_all(
247        self,
248        skip_preflight: bool,
249    ) -> Result<Vec<Signature>, (Vec<Signature>, crate::Error)> {
250        match self
251            .send_all_with_opts(SendBundleOptions {
252                config: RpcSendTransactionConfig {
253                    skip_preflight,
254                    ..Default::default()
255                },
256                ..Default::default()
257            })
258            .await
259        {
260            Ok(signatures) => Ok(signatures
261                .into_iter()
262                .map(|with_slot| with_slot.into_value())
263                .collect()),
264            Err((signatures, err)) => Err((
265                signatures
266                    .into_iter()
267                    .map(|with_slot| with_slot.into_value())
268                    .collect(),
269                err,
270            )),
271        }
272    }
273
274    /// Send all in order with the given options and returns the signatures of the success transactions.
275    pub async fn send_all_with_opts(
276        self,
277        opts: SendBundleOptions,
278    ) -> Result<Vec<WithSlot<Signature>>, (Vec<WithSlot<Signature>>, crate::Error)> {
279        let SendBundleOptions {
280            without_compute_budget,
281            compute_unit_price_micro_lamports,
282            compute_unit_min_priority_lamports,
283            update_recent_block_hash_before_send,
284            continue_on_error,
285            mut config,
286            disable_error_tracing,
287            inspector_cluster,
288        } = opts;
289        config.preflight_commitment = config
290            .preflight_commitment
291            .or(Some(self.client.commitment().commitment));
292        let latest_hash = self
293            .client
294            .get_latest_blockhash()
295            .await
296            .map_err(|err| (vec![], Box::new(err).into()))?;
297        let txs = self
298            .builders
299            .into_iter()
300            .enumerate()
301            .map(|(idx, mut builder)| {
302                tracing::debug!(
303                    size = builder.transaction_size(true),
304                    "signing transaction {idx}"
305                );
306
307                if let Some(lamports) = compute_unit_min_priority_lamports {
308                    builder
309                        .compute_budget_mut()
310                        .set_min_priority_lamports(Some(lamports));
311                }
312
313                builder.signed_transaction_with_blockhash_and_options(
314                    latest_hash,
315                    without_compute_budget,
316                    compute_unit_price_micro_lamports,
317                )
318            })
319            .collect::<crate::Result<Vec<_>>>()
320            .map_err(|err| (vec![], err))?;
321        send_all_txs(
322            &self.client,
323            txs,
324            config,
325            update_recent_block_hash_before_send,
326            continue_on_error,
327            !disable_error_tracing,
328            inspector_cluster,
329        )
330        .await
331    }
332
333    /// Estimate execution fee.
334    pub async fn estimate_execution_fee(
335        &self,
336        compute_unit_price_micro_lamports: Option<u64>,
337    ) -> crate::Result<u64> {
338        self.builders
339            .iter()
340            .map(|txn| txn.estimate_execution_fee(&self.client, compute_unit_price_micro_lamports))
341            .collect::<futures_util::stream::FuturesUnordered<_>>()
342            .try_fold(0, |acc, fee| futures_util::future::ready(Ok(acc + fee)))
343            .await
344    }
345
346    /// Insert all the instructions of `other` into `self`.
347    ///
348    /// If `new_transaction` is `true`, then a new transaction will be created before pushing.
349    pub fn append(&mut self, other: Self, new_transaction: bool) -> crate::Result<()> {
350        let builders = other.into_builders();
351
352        for (idx, txn) in builders.into_iter().enumerate() {
353            self.try_push_with_opts(txn, new_transaction && idx == 0)
354                .map_err(|(_, err)| err)?;
355        }
356
357        Ok(())
358    }
359}
360
361async fn send_all_txs(
362    client: &RpcClient,
363    txs: impl IntoIterator<Item = VersionedTransaction>,
364    config: RpcSendTransactionConfig,
365    update_recent_block_hash_before_send: bool,
366    continue_on_error: bool,
367    enable_tracing: bool,
368    inspector_cluster: Option<Cluster>,
369) -> Result<Vec<WithSlot<Signature>>, (Vec<WithSlot<Signature>>, crate::Error)> {
370    let txs = txs.into_iter();
371    let (min, max) = txs.size_hint();
372    let mut signatures = Vec::with_capacity(max.unwrap_or(min));
373    let mut error = None;
374    for (idx, mut tx) in txs.into_iter().enumerate() {
375        if update_recent_block_hash_before_send {
376            match client.get_latest_blockhash().await {
377                Ok(latest_blockhash) => {
378                    tx.message.set_recent_blockhash(latest_blockhash);
379                }
380                Err(err) => {
381                    error = Some(Box::new(err).into());
382                    break;
383                }
384            }
385        }
386        tracing::debug!(
387            commitment = ?client.commitment(),
388            ?config,
389            "sending transaction {idx}"
390        );
391        match client
392            .send_and_confirm_transaction_with_config(&tx, config)
393            .await
394        {
395            Ok(signature) => {
396                signatures.push(signature);
397            }
398            Err(err) => {
399                if enable_tracing {
400                    let cluster = inspector_cluster
401                        .clone()
402                        .or_else(|| client.url().parse().ok());
403                    let inspector_url = inspect_transaction(&tx.message, cluster.as_ref(), false);
404                    let hash = tx.message.recent_blockhash();
405                    tracing::error!(%err, %hash, ?config, "transaction {idx} failed: {inspector_url}");
406                }
407
408                error = Some(Box::new(err).into());
409                if !continue_on_error {
410                    break;
411                }
412            }
413        }
414    }
415    match error {
416        None => Ok(signatures),
417        Some(err) => Err((signatures, err)),
418    }
419}
420
421impl<'a, C> IntoIterator for BundleBuilder<'a, C> {
422    type Item = TransactionBuilder<'a, C>;
423
424    type IntoIter = <Vec<TransactionBuilder<'a, C>> as IntoIterator>::IntoIter;
425
426    fn into_iter(self) -> Self::IntoIter {
427        self.builders.into_iter()
428    }
429}