gmsol/pyth/pull_oracle/
mod.rs

1/// Wormhole Ops.
2pub mod wormhole;
3
4/// Pyth Reciever Ops.
5pub mod receiver;
6
7/// Hermes.
8pub mod hermes;
9
10/// Utils.
11pub mod utils;
12
13mod pull_oracle_impl;
14
15use std::{collections::HashMap, future::Future, ops::Deref};
16
17use anchor_client::{
18    solana_client::rpc_config::RpcSendTransactionConfig,
19    solana_sdk::{
20        pubkey::Pubkey,
21        signature::{Keypair, Signature},
22        signer::Signer,
23    },
24};
25use either::Either;
26use gmsol_solana_utils::{
27    bundle_builder::{BundleBuilder, SendBundleOptions},
28    program::Program,
29    transaction_builder::TransactionBuilder,
30};
31use gmsol_store::states::common::TokensWithFeed;
32use hermes::BinaryPriceUpdate;
33use pyth_sdk::Identifier;
34use pythnet_sdk::wire::v1::AccumulatorUpdateData;
35
36use self::wormhole::WORMHOLE_PROGRAM_ID;
37
38pub use self::{
39    pull_oracle_impl::{PriceUpdates, PythPullOracleWithHermes},
40    receiver::PythReceiverOps,
41    wormhole::WormholeOps,
42};
43
44use self::hermes::PriceUpdate;
45
46const VAA_SPLIT_INDEX: usize = 755;
47
48/// With Pyth Prices.
49pub struct WithPythPrices<'a, C> {
50    post: BundleBuilder<'a, C>,
51    consume: BundleBuilder<'a, C>,
52    close: BundleBuilder<'a, C>,
53}
54
55impl<S, C> WithPythPrices<'_, C>
56where
57    C: Deref<Target = S> + Clone,
58    S: Signer,
59{
60    /// Estimate execution fee.
61    pub async fn estimated_execution_fee(
62        &self,
63        compute_unit_price_micro_lamports: Option<u64>,
64    ) -> crate::Result<u64> {
65        let mut execution_fee = self
66            .post
67            .estimate_execution_fee(compute_unit_price_micro_lamports)
68            .await?;
69        execution_fee = execution_fee.saturating_add(
70            self.consume
71                .estimate_execution_fee(compute_unit_price_micro_lamports)
72                .await?,
73        );
74        execution_fee = execution_fee.saturating_add(
75            self.close
76                .estimate_execution_fee(compute_unit_price_micro_lamports)
77                .await?,
78        );
79        Ok(execution_fee)
80    }
81
82    /// Send all transactions.
83    pub async fn send_all(
84        self,
85        compute_unit_price_micro_lamports: Option<u64>,
86        skip_preflight: bool,
87        enable_tracing: bool,
88    ) -> Result<Vec<Signature>, (Vec<Signature>, crate::Error)> {
89        let mut error: Option<crate::Error> = None;
90
91        let mut signatures = match self
92            .post
93            .send_all_with_opts(SendBundleOptions {
94                without_compute_budget: false,
95                compute_unit_price_micro_lamports,
96                update_recent_block_hash_before_send: false,
97                config: RpcSendTransactionConfig {
98                    skip_preflight,
99                    ..Default::default()
100                },
101                disable_error_tracing: !enable_tracing,
102                ..Default::default()
103            })
104            .await
105        {
106            Ok(signatures) => signatures,
107            Err((signatures, err)) => {
108                error = Some(err.into());
109                signatures
110            }
111        };
112
113        if error.is_none() {
114            let mut consume_signatures = match self
115                .consume
116                .send_all_with_opts(SendBundleOptions {
117                    without_compute_budget: false,
118                    compute_unit_price_micro_lamports,
119                    update_recent_block_hash_before_send: false,
120                    config: RpcSendTransactionConfig {
121                        skip_preflight,
122                        ..Default::default()
123                    },
124                    disable_error_tracing: !enable_tracing,
125                    ..Default::default()
126                })
127                .await
128            {
129                Ok(signatures) => signatures,
130                Err((signatures, err)) => {
131                    error = Some(err.into());
132                    signatures
133                }
134            };
135
136            signatures.append(&mut consume_signatures);
137        }
138
139        let mut close_signatures = match self
140            .close
141            .send_all_with_opts(SendBundleOptions {
142                without_compute_budget: false,
143                compute_unit_price_micro_lamports,
144                update_recent_block_hash_before_send: false,
145                config: RpcSendTransactionConfig {
146                    skip_preflight,
147                    ..Default::default()
148                },
149                disable_error_tracing: !enable_tracing,
150                ..Default::default()
151            })
152            .await
153        {
154            Ok(signatures) => signatures,
155            Err((signatures, err)) => {
156                match &error {
157                    None => error = Some(err.into()),
158                    Some(post_err) => {
159                        error = Some(crate::Error::unknown(format!(
160                            "post error: {post_err}, close error: {err}"
161                        )));
162                    }
163                }
164                signatures
165            }
166        };
167
168        signatures.append(&mut close_signatures);
169
170        match error {
171            None => Ok(signatures
172                .into_iter()
173                .map(|with_slot| with_slot.into_value())
174                .collect()),
175            Some(err) => Err((
176                signatures
177                    .into_iter()
178                    .map(|with_slot| with_slot.into_value())
179                    .collect(),
180                err,
181            )),
182        }
183    }
184}
185
186/// Prices.
187pub type Prices = HashMap<Identifier, Pubkey>;
188
189/// Pyth Pull Oracle Context.
190pub struct PythPullOracleContext {
191    encoded_vaas: Vec<Keypair>,
192    feeds: HashMap<Identifier, Keypair>,
193    feed_ids: Vec<Identifier>,
194}
195
196impl PythPullOracleContext {
197    /// Create a new [`PythPullOracleContext`].
198    pub fn new(feed_ids: Vec<Identifier>) -> Self {
199        let feeds = feed_ids.iter().map(|id| (*id, Keypair::new())).collect();
200        Self {
201            encoded_vaas: Vec::with_capacity(1),
202            feeds,
203            feed_ids,
204        }
205    }
206
207    /// Create a new [`PythPullOracleContext`] from [`TokensWithFeed`].
208    pub fn try_from_feeds(feeds: &TokensWithFeed) -> crate::Result<Self> {
209        let feed_ids = utils::extract_pyth_feed_ids(feeds)?;
210        Ok(Self::new(feed_ids))
211    }
212
213    /// Get feed ids.
214    pub fn feed_ids(&self) -> &[Identifier] {
215        &self.feed_ids
216    }
217
218    /// Create a new keypair for encoded vaa account.
219    ///
220    /// Return its index.
221    pub fn add_encoded_vaa(&mut self) -> usize {
222        self.encoded_vaas.push(Keypair::new());
223        self.encoded_vaas.len() - 1
224    }
225
226    /// Get encoded vaas.
227    pub fn encoded_vaas(&self) -> &[Keypair] {
228        &self.encoded_vaas
229    }
230}
231
232/// Pyth Pull Oracle Ops.
233pub trait PythPullOracleOps<C> {
234    /// Get Pyth Program.
235    fn pyth(&self) -> &Program<C>;
236
237    /// Get Wormhole Program.
238    fn wormhole(&self) -> &Program<C>;
239
240    /// Create transactions to post price updates and consume the prices.
241    fn with_pyth_prices<'a, S, It, Fut>(
242        &'a self,
243        ctx: &'a PythPullOracleContext,
244        update: &'a PriceUpdate,
245        consume: impl FnOnce(Prices) -> Fut,
246    ) -> impl Future<Output = crate::Result<WithPythPrices<'a, C>>>
247    where
248        C: Deref<Target = S> + Clone + 'a,
249        S: Signer,
250        It: IntoIterator<Item = TransactionBuilder<'a, C>>,
251        Fut: Future<Output = crate::Result<It>>,
252    {
253        self.with_pyth_price_updates(ctx, [&update.binary], consume)
254    }
255
256    /// Create transactions to post price updates and consume the prices.
257    fn with_pyth_price_updates<'a, S, It, Fut>(
258        &'a self,
259        ctx: &'a PythPullOracleContext,
260        updates: impl IntoIterator<Item = &'a BinaryPriceUpdate>,
261        consume: impl FnOnce(Prices) -> Fut,
262    ) -> impl Future<Output = crate::Result<WithPythPrices<'a, C>>>
263    where
264        C: Deref<Target = S> + Clone + 'a,
265        S: Signer,
266        It: IntoIterator<Item = TransactionBuilder<'a, C>>,
267        Fut: Future<Output = crate::Result<It>>,
268    {
269        use std::collections::hash_map::Entry;
270
271        async {
272            let wormhole = self.wormhole();
273            let pyth = self.pyth();
274            let mut prices = HashMap::with_capacity(ctx.feeds.len());
275            let mut post = BundleBuilder::from_rpc_client(pyth.rpc());
276            let mut consume_txns = BundleBuilder::from_rpc_client(pyth.rpc());
277            let mut close = BundleBuilder::from_rpc_client(pyth.rpc());
278
279            let datas = updates
280                .into_iter()
281                .flat_map(
282                    |update| match utils::parse_accumulator_update_datas(update) {
283                        Ok(datas) => Either::Left(datas.into_iter().map(Ok)),
284                        Err(err) => Either::Right(std::iter::once(Err(err))),
285                    },
286                )
287                .collect::<crate::Result<Vec<AccumulatorUpdateData>>>()?;
288
289            // Merge by ids.
290            let mut updates = HashMap::<_, _>::default();
291            for data in datas.iter() {
292                let proof = &data.proof;
293                for update in utils::get_merkle_price_updates(proof) {
294                    let feed_id = utils::parse_feed_id(update)?;
295                    updates.insert(feed_id, (proof, update));
296                }
297            }
298
299            // Write vaas.
300            let mut encoded_vaas = HashMap::<_, _>::default();
301            let mut vaas = HashMap::<_, _>::default();
302            for (proof, _) in updates.values() {
303                let vaa = utils::get_vaa_buffer(proof);
304                if let Entry::Vacant(entry) = vaas.entry(vaa) {
305                    let guardian_set_index = utils::get_guardian_set_index(proof)?;
306
307                    let mut pubkey: Pubkey;
308                    loop {
309                        let keypair = Keypair::new();
310                        pubkey = keypair.pubkey();
311                        match encoded_vaas.entry(pubkey) {
312                            Entry::Vacant(entry) => {
313                                entry.insert(keypair);
314                                break;
315                            }
316                            Entry::Occupied(_) => continue,
317                        }
318                    }
319
320                    entry.insert((pubkey, guardian_set_index));
321                }
322            }
323
324            for (vaa, (pubkey, guardian_set_index)) in vaas.iter() {
325                let draft_vaa = encoded_vaas.remove(pubkey).expect("must exist");
326                let create = wormhole
327                    .create_encoded_vaa(draft_vaa, vaa.len() as u64)
328                    .await?;
329                let draft_vaa = pubkey;
330                let write_1 = wormhole.write_encoded_vaa(draft_vaa, 0, &vaa[0..VAA_SPLIT_INDEX]);
331                let write_2 = wormhole.write_encoded_vaa(
332                    draft_vaa,
333                    VAA_SPLIT_INDEX as u32,
334                    &vaa[VAA_SPLIT_INDEX..],
335                );
336                let verify = wormhole.verify_encoded_vaa_v1(draft_vaa, *guardian_set_index);
337                post.try_push(create.clear_output())?
338                    .try_push(write_1)?
339                    .try_push(write_2)?
340                    .try_push(verify)?;
341                let close_encoded_vaa = wormhole.close_encoded_vaa(draft_vaa);
342                close.try_push(close_encoded_vaa)?;
343            }
344
345            // Post price updates.
346            for (feed_id, (proof, update)) in updates {
347                let price_update = Keypair::new();
348                let vaa = utils::get_vaa_buffer(proof);
349                let Some((encoded_vaa, _)) = vaas.get(vaa) else {
350                    continue;
351                };
352                let (post_price_update, price_update) = pyth
353                    .post_price_update(price_update, update, encoded_vaa)?
354                    .swap_output(());
355                prices.insert(feed_id, price_update);
356                post.try_push(post_price_update)?;
357                close.try_push(pyth.reclaim_rent(&price_update))?;
358            }
359
360            let consume = (consume)(prices).await?;
361            consume_txns.push_many(consume, false)?;
362            Ok(WithPythPrices {
363                post,
364                consume: consume_txns,
365                close,
366            })
367        }
368    }
369
370    /// Execute with pyth price updates.
371    fn execute_with_pyth_price_updates<'a, 'exec, T, S>(
372        &'exec self,
373        updates: impl IntoIterator<Item = &'a BinaryPriceUpdate>,
374        execute: &mut T,
375        compute_unit_price_micro_lamports: Option<u64>,
376        skip_preflight: bool,
377        enable_tracing: bool,
378    ) -> impl Future<Output = crate::Result<()>>
379    where
380        C: Deref<Target = S> + Clone + 'exec,
381        S: Signer,
382        T: ExecuteWithPythPrices<'exec, C>,
383    {
384        async move {
385            let mut execution_fee_estiamted = !execute.should_estiamte_execution_fee();
386            let updates = updates.into_iter().collect::<Vec<_>>();
387            let ctx = execute.context().await?;
388            let mut with_prices;
389            loop {
390                with_prices = self
391                    .with_pyth_price_updates(&ctx, updates.clone(), |prices| async {
392                        let rpcs = execute.build_rpc_with_price_updates(prices).await?;
393                        Ok(rpcs)
394                    })
395                    .await?;
396                if execution_fee_estiamted {
397                    break;
398                } else {
399                    let execution_fee = with_prices
400                        .estimated_execution_fee(compute_unit_price_micro_lamports)
401                        .await?;
402                    execute.set_execution_fee(execution_fee);
403                    tracing::info!(%execution_fee, "execution fee estimated");
404                    execution_fee_estiamted = true;
405                }
406            }
407            execute
408                .execute_with_options(
409                    with_prices,
410                    compute_unit_price_micro_lamports,
411                    skip_preflight,
412                    enable_tracing,
413                )
414                .await?;
415            Ok(())
416        }
417    }
418}
419
420/// Pyth Pull Oracle.
421pub struct PythPullOracle<C> {
422    wormhole: Program<C>,
423    pyth: Program<C>,
424}
425
426impl<S, C> PythPullOracle<C>
427where
428    C: Deref<Target = S> + Clone,
429    S: Signer,
430{
431    /// Create a new [`PythPullOracle`] client from [`Client`](crate::Client).
432    pub fn try_new(client: &crate::Client<C>) -> crate::Result<Self> {
433        Ok(Self {
434            wormhole: client.program(WORMHOLE_PROGRAM_ID),
435            pyth: client.program(pyth_solana_receiver_sdk::ID),
436        })
437    }
438}
439
440impl<S, C> PythPullOracleOps<C> for PythPullOracle<C>
441where
442    C: Deref<Target = S> + Clone,
443    S: Signer,
444{
445    fn pyth(&self) -> &Program<C> {
446        &self.pyth
447    }
448
449    fn wormhole(&self) -> &Program<C> {
450        &self.wormhole
451    }
452}
453
454/// Execute with pyth prices.
455pub trait ExecuteWithPythPrices<'a, C> {
456    /// Whether to estimate the execution fee.
457    fn should_estiamte_execution_fee(&self) -> bool {
458        true
459    }
460
461    /// Set execution fee.
462    fn set_execution_fee(&mut self, lamports: u64);
463
464    /// Get the oracle context.
465    fn context(&mut self) -> impl Future<Output = crate::Result<PythPullOracleContext>>;
466
467    /// Build RPC requests with price updates.
468    fn build_rpc_with_price_updates(
469        &mut self,
470        price_updates: Prices,
471    ) -> impl Future<Output = crate::Result<Vec<TransactionBuilder<'a, C, ()>>>>;
472
473    /// Execute with options
474    fn execute_with_options<S>(
475        &mut self,
476        txns: WithPythPrices<C>,
477        compute_unit_price_micro_lamports: Option<u64>,
478        skip_preflight: bool,
479        enable_tracing: bool,
480    ) -> impl Future<Output = crate::Result<()>>
481    where
482        C: Deref<Target = S> + Clone,
483        S: Signer,
484    {
485        async move {
486            match txns
487                .send_all(
488                    compute_unit_price_micro_lamports,
489                    skip_preflight,
490                    enable_tracing,
491                )
492                .await
493            {
494                Ok(signatures) => {
495                    if enable_tracing {
496                        tracing::info!("executed with txns {signatures:#?}");
497                    }
498                    Ok(())
499                }
500                Err((signatures, err)) => {
501                    if enable_tracing {
502                        tracing::error!(%err, "failed to execute, successful txns: {signatures:#?}");
503                    }
504                    Err(err)
505                }
506            }
507        }
508    }
509
510    /// Execute.
511    fn execute<S>(
512        &mut self,
513        txns: WithPythPrices<C>,
514        skip_preflight: bool,
515    ) -> impl Future<Output = crate::Result<()>>
516    where
517        C: Deref<Target = S> + Clone,
518        S: Signer,
519    {
520        self.execute_with_options(txns, None, skip_preflight, true)
521    }
522}