1use std::{collections::HashSet, ops::Deref};
2
3use futures_util::TryStreamExt;
4use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig};
5use solana_sdk::{
6 commitment_config::CommitmentConfig, packet::PACKET_DATA_SIZE, signature::Signature,
7 signer::Signer, transaction::VersionedTransaction,
8};
9
10use crate::{
11 client::SendAndConfirm,
12 cluster::Cluster,
13 transaction_builder::TransactionBuilder,
14 utils::{inspect_transaction, transaction_size, WithSlot},
15};
16
17const TRANSACTION_SIZE_LIMIT: usize = PACKET_DATA_SIZE;
18const DEFAULT_MAX_INSTRUCTIONS_FOR_ONE_TX: usize = 14;
19
/// Options controlling how transactions are packed into a bundle.
#[derive(Debug, Clone)]
pub struct BundleOptions {
    /// When `true`, the bundle may hold at most one transaction: pushing
    /// fails instead of starting a second transaction.
    pub force_one_transaction: bool,
    /// Optional cap on the packet size used when packing. The effective
    /// limit is the smaller of this value and the built-in transaction
    /// size limit; `None` means the built-in limit is used as is.
    pub max_packet_size: Option<usize>,
    /// Maximum number of instructions a transaction may contain after
    /// another builder has been merged into it.
    pub max_instructions_for_one_tx: usize,
}
30
31impl Default for BundleOptions {
32 fn default() -> Self {
33 Self {
34 force_one_transaction: false,
35 max_packet_size: None,
36 max_instructions_for_one_tx: DEFAULT_MAX_INSTRUCTIONS_FOR_ONE_TX,
37 }
38 }
39}
40
41#[derive(Debug, Clone, Default)]
43pub struct CreateBundleOptions {
44 pub cluster: Cluster,
46 pub commitment: CommitmentConfig,
48 pub options: BundleOptions,
50}
51
52#[derive(Debug, Clone, Default)]
54pub struct SendBundleOptions {
55 pub without_compute_budget: bool,
57 pub compute_unit_price_micro_lamports: Option<u64>,
59 pub compute_unit_min_priority_lamports: Option<u64>,
62 pub update_recent_block_hash_before_send: bool,
64 pub continue_on_error: bool,
66 pub config: RpcSendTransactionConfig,
68 pub disable_error_tracing: bool,
70 pub inspector_cluster: Option<Cluster>,
72}
73
74pub struct BundleBuilder<'a, C> {
76 client: RpcClient,
77 builders: Vec<TransactionBuilder<'a, C>>,
78 options: BundleOptions,
79}
80
81impl<C> BundleBuilder<'_, C> {
82 pub fn new(cluster: Cluster) -> Self {
84 Self::new_with_options(CreateBundleOptions {
85 cluster,
86 ..Default::default()
87 })
88 }
89
90 pub fn new_with_options(options: CreateBundleOptions) -> Self {
92 let rpc = options.cluster.rpc(options.commitment);
93
94 Self::from_rpc_client_with_options(rpc, options.options)
95 }
96
97 pub fn from_rpc_client(client: RpcClient) -> Self {
99 Self::from_rpc_client_with_options(client, Default::default())
100 }
101
102 pub fn from_rpc_client_with_options(client: RpcClient, options: BundleOptions) -> Self {
104 Self {
105 client,
106 builders: Default::default(),
107 options,
108 }
109 }
110
111 pub fn packet_size(&self) -> usize {
113 match self.options.max_packet_size {
114 Some(size) => size.min(TRANSACTION_SIZE_LIMIT),
115 None => TRANSACTION_SIZE_LIMIT,
116 }
117 }
118
119 pub fn client(&self) -> &RpcClient {
121 &self.client
122 }
123
124 pub fn is_empty(&self) -> bool {
126 self.builders.is_empty()
127 }
128
129 pub fn try_clone_empty(&self) -> crate::Result<Self> {
131 let cluster = self.client.url().parse()?;
132 let commitment = self.client.commitment();
133 Ok(Self::new_with_options(CreateBundleOptions {
134 cluster,
135 commitment,
136 options: self.options.clone(),
137 }))
138 }
139
140 pub fn set_options(&mut self, options: BundleOptions) -> &mut Self {
142 self.options = options;
143 self
144 }
145}
146
147impl<'a, C: Deref<Target = impl Signer> + Clone> BundleBuilder<'a, C> {
148 #[allow(clippy::result_large_err)]
150 pub fn try_push_with_opts(
151 &mut self,
152 mut txn: TransactionBuilder<'a, C>,
153 new_transaction: bool,
154 ) -> Result<&mut Self, (TransactionBuilder<'a, C>, crate::Error)> {
155 let packet_size = self.packet_size();
156 let mut ix = txn.instructions_with_options(true, None);
157 let incoming_lookup_table = txn.get_complete_lookup_table();
158 if transaction_size(
159 &ix,
160 true,
161 Some(&incoming_lookup_table),
162 txn.get_luts().len(),
163 ) > packet_size
164 {
165 return Err((
166 txn,
167 crate::Error::AddTransaction("the size of this instruction is too big"),
168 ));
169 }
170 if self.builders.is_empty() || new_transaction {
171 tracing::debug!("adding to a new tx");
172 if !self.builders.is_empty() && self.options.force_one_transaction {
173 return Err((txn, crate::Error::AddTransaction("cannot create more than one transaction because `force_one_transaction` is set")));
174 }
175 self.builders.push(txn);
176 } else {
177 let last = self.builders.last_mut().unwrap();
178
179 let mut ixs_after_merge = last.instructions_with_options(false, None);
180 ixs_after_merge.append(&mut ix);
181
182 let mut lookup_table = last.get_complete_lookup_table();
183 lookup_table.extend(incoming_lookup_table);
184 let mut lookup_table_addresses = last.get_luts().keys().collect::<HashSet<_>>();
185 lookup_table_addresses.extend(txn.get_luts().keys());
186
187 let size_after_merge = transaction_size(
188 &ixs_after_merge,
189 true,
190 Some(&lookup_table),
191 lookup_table_addresses.len(),
192 );
193 if size_after_merge <= packet_size
194 && ixs_after_merge.len() <= self.options.max_instructions_for_one_tx
195 {
196 tracing::debug!(size_after_merge, "adding to the last tx");
197 last.try_merge(&mut txn).map_err(|err| (txn, err))?;
198 } else {
199 tracing::debug!(
200 size_after_merge,
201 "exceed packet data size limit, adding to a new tx"
202 );
203 if self.options.force_one_transaction {
204 return Err((txn, crate::Error::AddTransaction("cannot create more than one transaction because `force_one_transaction` is set")));
205 }
206 self.builders.push(txn);
207 }
208 }
209 Ok(self)
210 }
211
212 #[allow(clippy::result_large_err)]
214 #[inline]
215 pub fn try_push(
216 &mut self,
217 txn: TransactionBuilder<'a, C>,
218 ) -> Result<&mut Self, (TransactionBuilder<'a, C>, crate::Error)> {
219 self.try_push_with_opts(txn, false)
220 }
221
222 pub fn push(&mut self, txn: TransactionBuilder<'a, C>) -> crate::Result<&mut Self> {
224 self.try_push(txn).map_err(|(_, err)| err)
225 }
226
227 pub fn push_many(
229 &mut self,
230 txns: impl IntoIterator<Item = TransactionBuilder<'a, C>>,
231 new_transaction: bool,
232 ) -> crate::Result<&mut Self> {
233 for (idx, txn) in txns.into_iter().enumerate() {
234 self.try_push_with_opts(txn, (idx == 0) && new_transaction)
235 .map_err(|(_, err)| err)?;
236 }
237 Ok(self)
238 }
239
240 pub fn into_builders(self) -> Vec<TransactionBuilder<'a, C>> {
242 self.builders
243 }
244
245 pub async fn send_all(
247 self,
248 skip_preflight: bool,
249 ) -> Result<Vec<Signature>, (Vec<Signature>, crate::Error)> {
250 match self
251 .send_all_with_opts(SendBundleOptions {
252 config: RpcSendTransactionConfig {
253 skip_preflight,
254 ..Default::default()
255 },
256 ..Default::default()
257 })
258 .await
259 {
260 Ok(signatures) => Ok(signatures
261 .into_iter()
262 .map(|with_slot| with_slot.into_value())
263 .collect()),
264 Err((signatures, err)) => Err((
265 signatures
266 .into_iter()
267 .map(|with_slot| with_slot.into_value())
268 .collect(),
269 err,
270 )),
271 }
272 }
273
274 pub async fn send_all_with_opts(
276 self,
277 opts: SendBundleOptions,
278 ) -> Result<Vec<WithSlot<Signature>>, (Vec<WithSlot<Signature>>, crate::Error)> {
279 let SendBundleOptions {
280 without_compute_budget,
281 compute_unit_price_micro_lamports,
282 compute_unit_min_priority_lamports,
283 update_recent_block_hash_before_send,
284 continue_on_error,
285 mut config,
286 disable_error_tracing,
287 inspector_cluster,
288 } = opts;
289 config.preflight_commitment = config
290 .preflight_commitment
291 .or(Some(self.client.commitment().commitment));
292 let latest_hash = self
293 .client
294 .get_latest_blockhash()
295 .await
296 .map_err(|err| (vec![], Box::new(err).into()))?;
297 let txs = self
298 .builders
299 .into_iter()
300 .enumerate()
301 .map(|(idx, mut builder)| {
302 tracing::debug!(
303 size = builder.transaction_size(true),
304 "signing transaction {idx}"
305 );
306
307 if let Some(lamports) = compute_unit_min_priority_lamports {
308 builder
309 .compute_budget_mut()
310 .set_min_priority_lamports(Some(lamports));
311 }
312
313 builder.signed_transaction_with_blockhash_and_options(
314 latest_hash,
315 without_compute_budget,
316 compute_unit_price_micro_lamports,
317 )
318 })
319 .collect::<crate::Result<Vec<_>>>()
320 .map_err(|err| (vec![], err))?;
321 send_all_txs(
322 &self.client,
323 txs,
324 config,
325 update_recent_block_hash_before_send,
326 continue_on_error,
327 !disable_error_tracing,
328 inspector_cluster,
329 )
330 .await
331 }
332
333 pub async fn estimate_execution_fee(
335 &self,
336 compute_unit_price_micro_lamports: Option<u64>,
337 ) -> crate::Result<u64> {
338 self.builders
339 .iter()
340 .map(|txn| txn.estimate_execution_fee(&self.client, compute_unit_price_micro_lamports))
341 .collect::<futures_util::stream::FuturesUnordered<_>>()
342 .try_fold(0, |acc, fee| futures_util::future::ready(Ok(acc + fee)))
343 .await
344 }
345
346 pub fn append(&mut self, other: Self, new_transaction: bool) -> crate::Result<()> {
350 let builders = other.into_builders();
351
352 for (idx, txn) in builders.into_iter().enumerate() {
353 self.try_push_with_opts(txn, new_transaction && idx == 0)
354 .map_err(|(_, err)| err)?;
355 }
356
357 Ok(())
358 }
359}
360
361async fn send_all_txs(
362 client: &RpcClient,
363 txs: impl IntoIterator<Item = VersionedTransaction>,
364 config: RpcSendTransactionConfig,
365 update_recent_block_hash_before_send: bool,
366 continue_on_error: bool,
367 enable_tracing: bool,
368 inspector_cluster: Option<Cluster>,
369) -> Result<Vec<WithSlot<Signature>>, (Vec<WithSlot<Signature>>, crate::Error)> {
370 let txs = txs.into_iter();
371 let (min, max) = txs.size_hint();
372 let mut signatures = Vec::with_capacity(max.unwrap_or(min));
373 let mut error = None;
374 for (idx, mut tx) in txs.into_iter().enumerate() {
375 if update_recent_block_hash_before_send {
376 match client.get_latest_blockhash().await {
377 Ok(latest_blockhash) => {
378 tx.message.set_recent_blockhash(latest_blockhash);
379 }
380 Err(err) => {
381 error = Some(Box::new(err).into());
382 break;
383 }
384 }
385 }
386 tracing::debug!(
387 commitment = ?client.commitment(),
388 ?config,
389 "sending transaction {idx}"
390 );
391 match client
392 .send_and_confirm_transaction_with_config(&tx, config)
393 .await
394 {
395 Ok(signature) => {
396 signatures.push(signature);
397 }
398 Err(err) => {
399 if enable_tracing {
400 let cluster = inspector_cluster
401 .clone()
402 .or_else(|| client.url().parse().ok());
403 let inspector_url = inspect_transaction(&tx.message, cluster.as_ref(), false);
404 let hash = tx.message.recent_blockhash();
405 tracing::error!(%err, %hash, ?config, "transaction {idx} failed: {inspector_url}");
406 }
407
408 error = Some(Box::new(err).into());
409 if !continue_on_error {
410 break;
411 }
412 }
413 }
414 }
415 match error {
416 None => Ok(signatures),
417 Some(err) => Err((signatures, err)),
418 }
419}
420
421impl<'a, C> IntoIterator for BundleBuilder<'a, C> {
422 type Item = TransactionBuilder<'a, C>;
423
424 type IntoIter = <Vec<TransactionBuilder<'a, C>> as IntoIterator>::IntoIter;
425
426 fn into_iter(self) -> Self::IntoIter {
427 self.builders.into_iter()
428 }
429}