gmsol/pyth/pull_oracle/
pull_oracle_impl.rs

1use std::{
2    collections::{hash_map::Entry, HashMap},
3    ops::Deref,
4};
5
6use anchor_client::solana_sdk::{pubkey::Pubkey, signature::Keypair, signer::Signer};
7use either::Either;
8use gmsol_solana_utils::bundle_builder::BundleOptions;
9use gmsol_store::states::PriceProviderKind;
10use pythnet_sdk::wire::v1::AccumulatorUpdateData;
11use time::OffsetDateTime;
12
13use crate::{
14    pyth::{EncodingType, Hermes},
15    utils::builder::{
16        FeedAddressMap, FeedIds, PostPullOraclePrices, PriceUpdateInstructions, PullOracle,
17    },
18};
19
20use super::{
21    hermes::BinaryPriceUpdate, utils, PythPullOracle, PythPullOracleOps, PythReceiverOps,
22    WormholeOps, VAA_SPLIT_INDEX,
23};
24
25/// Pyth Pull Oracle.
26pub struct PythPullOracleWithHermes<'a, C> {
27    gmsol: &'a crate::Client<C>,
28    hermes: &'a Hermes,
29    oracle: &'a PythPullOracle<C>,
30}
31
32/// Price updates.
33pub struct PriceUpdates {
34    num_feeds: Option<usize>,
35    updates: Vec<BinaryPriceUpdate>,
36}
37
38impl From<Vec<BinaryPriceUpdate>> for PriceUpdates {
39    fn from(value: Vec<BinaryPriceUpdate>) -> Self {
40        Self {
41            num_feeds: None,
42            updates: value,
43        }
44    }
45}
46
47impl<'a, C> PythPullOracleWithHermes<'a, C> {
48    /// Create from parts.
49    pub fn from_parts(
50        gmsol: &'a crate::Client<C>,
51        hermes: &'a Hermes,
52        oracle: &'a PythPullOracle<C>,
53    ) -> Self {
54        Self {
55            gmsol,
56            hermes,
57            oracle,
58        }
59    }
60}
61
62impl<C> PullOracle for PythPullOracleWithHermes<'_, C> {
63    type PriceUpdates = PriceUpdates;
64
65    async fn fetch_price_updates(
66        &self,
67        feed_ids: &FeedIds,
68        after: Option<OffsetDateTime>,
69    ) -> crate::Result<Self::PriceUpdates> {
70        let feed_ids = utils::extract_pyth_feed_ids(feed_ids)?;
71        if feed_ids.is_empty() {
72            return Ok(PriceUpdates {
73                num_feeds: Some(0),
74                updates: vec![],
75            });
76        }
77        let update = self
78            .hermes
79            .latest_price_updates(&feed_ids, Some(EncodingType::Base64))
80            .await?;
81        if let Some(after) = after {
82            let min_ts = update
83                .min_timestamp()
84                .ok_or_else(|| crate::Error::invalid_argument("empty price updates"))?;
85            let min_ts = OffsetDateTime::from_unix_timestamp(min_ts)
86                .map_err(crate::Error::invalid_argument)?;
87            if min_ts < after {
88                return Err(crate::Error::invalid_argument(format!(
89                    "price updates are too old, min_ts={min_ts}, required={after}"
90                )));
91            }
92        }
93        Ok(PriceUpdates {
94            num_feeds: Some(feed_ids.len()),
95            updates: vec![update.binary],
96        })
97    }
98}
99
100impl<C> PullOracle for &PythPullOracleWithHermes<'_, C> {
101    type PriceUpdates = PriceUpdates;
102
103    async fn fetch_price_updates(
104        &self,
105        feed_ids: &FeedIds,
106        after: Option<OffsetDateTime>,
107    ) -> crate::Result<Self::PriceUpdates> {
108        (*self).fetch_price_updates(feed_ids, after).await
109    }
110}
111
112impl<'a, C: Deref<Target = impl Signer> + Clone> PostPullOraclePrices<'a, C>
113    for PythPullOracleWithHermes<'a, C>
114{
115    async fn fetch_price_update_instructions(
116        &self,
117        price_updates: &Self::PriceUpdates,
118        options: BundleOptions,
119    ) -> crate::Result<(
120        PriceUpdateInstructions<'a, C>,
121        HashMap<PriceProviderKind, FeedAddressMap>,
122    )> {
123        let mut ixns = PriceUpdateInstructions::new(self.gmsol, options);
124
125        let PriceUpdates { updates, num_feeds } = price_updates;
126
127        if updates.is_empty() {
128            return Ok((ixns, Default::default()));
129        }
130
131        let mut prices = HashMap::with_capacity(num_feeds.unwrap_or(0));
132
133        let wormhole = self.oracle.wormhole();
134        let pyth = self.oracle.pyth();
135
136        let datas = updates
137            .iter()
138            .flat_map(
139                |update| match utils::parse_accumulator_update_datas(update) {
140                    Ok(datas) => Either::Left(datas.into_iter().map(Ok)),
141                    Err(err) => Either::Right(std::iter::once(Err(err))),
142                },
143            )
144            .collect::<crate::Result<Vec<AccumulatorUpdateData>>>()?;
145
146        // Merge by ids.
147        let mut updates = HashMap::<_, _>::default();
148        for data in datas.iter() {
149            let proof = &data.proof;
150            for update in utils::get_merkle_price_updates(proof) {
151                let feed_id = utils::parse_feed_id(update)?;
152                updates.insert(feed_id, (proof, update));
153            }
154        }
155
156        // Write vaas.
157        let mut encoded_vaas = HashMap::<_, _>::default();
158        let mut vaas = HashMap::<_, _>::default();
159        for (proof, _) in updates.values() {
160            let vaa = utils::get_vaa_buffer(proof);
161            if let Entry::Vacant(entry) = vaas.entry(vaa) {
162                let guardian_set_index = utils::get_guardian_set_index(proof)?;
163
164                let mut pubkey: Pubkey;
165                loop {
166                    let keypair = Keypair::new();
167                    pubkey = keypair.pubkey();
168                    match encoded_vaas.entry(pubkey) {
169                        Entry::Vacant(entry) => {
170                            entry.insert(keypair);
171                            break;
172                        }
173                        Entry::Occupied(_) => continue,
174                    }
175                }
176
177                entry.insert((pubkey, guardian_set_index));
178            }
179        }
180
181        for (vaa, (pubkey, guardian_set_index)) in vaas.iter() {
182            let draft_vaa = encoded_vaas.remove(pubkey).expect("must exist");
183            let create = wormhole
184                .create_encoded_vaa(draft_vaa, vaa.len() as u64)
185                .await?;
186            let draft_vaa = pubkey;
187            let write_1 = wormhole.write_encoded_vaa(draft_vaa, 0, &vaa[0..VAA_SPLIT_INDEX]);
188            let write_2 = wormhole.write_encoded_vaa(
189                draft_vaa,
190                VAA_SPLIT_INDEX as u32,
191                &vaa[VAA_SPLIT_INDEX..],
192            );
193            let verify = wormhole.verify_encoded_vaa_v1(draft_vaa, *guardian_set_index);
194            ixns.try_push_post(create.clear_output())?;
195            ixns.try_push_post(write_1)?;
196            ixns.try_push_post(write_2)?;
197            ixns.try_push_post(verify)?;
198            let close_encoded_vaa = wormhole.close_encoded_vaa(draft_vaa);
199            ixns.try_push_close(close_encoded_vaa)?;
200        }
201
202        // Post price updates.
203        for (feed_id, (proof, update)) in updates {
204            let price_update = Keypair::new();
205            let vaa = utils::get_vaa_buffer(proof);
206            let Some((encoded_vaa, _)) = vaas.get(vaa) else {
207                continue;
208            };
209            let (post_price_update, price_update) = pyth
210                .post_price_update(price_update, update, encoded_vaa)?
211                .swap_output(());
212            prices.insert(Pubkey::new_from_array(feed_id.to_bytes()), price_update);
213            ixns.try_push_post(post_price_update)?;
214            ixns.try_push_close(pyth.reclaim_rent(&price_update))?;
215        }
216
217        Ok((ixns, HashMap::from([(PriceProviderKind::Pyth, prices)])))
218    }
219}
220
221impl<'r, 'a, C: Deref<Target = impl Signer> + Clone> PostPullOraclePrices<'a, C>
222    for &'r PythPullOracleWithHermes<'a, C>
223where
224    'r: 'a,
225{
226    async fn fetch_price_update_instructions(
227        &self,
228        price_updates: &Self::PriceUpdates,
229        options: BundleOptions,
230    ) -> crate::Result<(
231        PriceUpdateInstructions<'a, C>,
232        HashMap<PriceProviderKind, FeedAddressMap>,
233    )> {
234        (*self)
235            .fetch_price_update_instructions(price_updates, options)
236            .await
237    }
238}