1use std::{
2 collections::{hash_map::Entry, HashMap},
3 ops::Deref,
4};
5
6use anchor_client::solana_sdk::{pubkey::Pubkey, signature::Keypair, signer::Signer};
7use either::Either;
8use gmsol_solana_utils::bundle_builder::BundleOptions;
9use gmsol_store::states::PriceProviderKind;
10use pythnet_sdk::wire::v1::AccumulatorUpdateData;
11use time::OffsetDateTime;
12
13use crate::{
14 pyth::{EncodingType, Hermes},
15 utils::builder::{
16 FeedAddressMap, FeedIds, PostPullOraclePrices, PriceUpdateInstructions, PullOracle,
17 },
18};
19
20use super::{
21 hermes::BinaryPriceUpdate, utils, PythPullOracle, PythPullOracleOps, PythReceiverOps,
22 WormholeOps, VAA_SPLIT_INDEX,
23};
24
25pub struct PythPullOracleWithHermes<'a, C> {
27 gmsol: &'a crate::Client<C>,
28 hermes: &'a Hermes,
29 oracle: &'a PythPullOracle<C>,
30}
31
32pub struct PriceUpdates {
34 num_feeds: Option<usize>,
35 updates: Vec<BinaryPriceUpdate>,
36}
37
38impl From<Vec<BinaryPriceUpdate>> for PriceUpdates {
39 fn from(value: Vec<BinaryPriceUpdate>) -> Self {
40 Self {
41 num_feeds: None,
42 updates: value,
43 }
44 }
45}
46
47impl<'a, C> PythPullOracleWithHermes<'a, C> {
48 pub fn from_parts(
50 gmsol: &'a crate::Client<C>,
51 hermes: &'a Hermes,
52 oracle: &'a PythPullOracle<C>,
53 ) -> Self {
54 Self {
55 gmsol,
56 hermes,
57 oracle,
58 }
59 }
60}
61
62impl<C> PullOracle for PythPullOracleWithHermes<'_, C> {
63 type PriceUpdates = PriceUpdates;
64
65 async fn fetch_price_updates(
66 &self,
67 feed_ids: &FeedIds,
68 after: Option<OffsetDateTime>,
69 ) -> crate::Result<Self::PriceUpdates> {
70 let feed_ids = utils::extract_pyth_feed_ids(feed_ids)?;
71 if feed_ids.is_empty() {
72 return Ok(PriceUpdates {
73 num_feeds: Some(0),
74 updates: vec![],
75 });
76 }
77 let update = self
78 .hermes
79 .latest_price_updates(&feed_ids, Some(EncodingType::Base64))
80 .await?;
81 if let Some(after) = after {
82 let min_ts = update
83 .min_timestamp()
84 .ok_or_else(|| crate::Error::invalid_argument("empty price updates"))?;
85 let min_ts = OffsetDateTime::from_unix_timestamp(min_ts)
86 .map_err(crate::Error::invalid_argument)?;
87 if min_ts < after {
88 return Err(crate::Error::invalid_argument(format!(
89 "price updates are too old, min_ts={min_ts}, required={after}"
90 )));
91 }
92 }
93 Ok(PriceUpdates {
94 num_feeds: Some(feed_ids.len()),
95 updates: vec![update.binary],
96 })
97 }
98}
99
100impl<C> PullOracle for &PythPullOracleWithHermes<'_, C> {
101 type PriceUpdates = PriceUpdates;
102
103 async fn fetch_price_updates(
104 &self,
105 feed_ids: &FeedIds,
106 after: Option<OffsetDateTime>,
107 ) -> crate::Result<Self::PriceUpdates> {
108 (*self).fetch_price_updates(feed_ids, after).await
109 }
110}
111
112impl<'a, C: Deref<Target = impl Signer> + Clone> PostPullOraclePrices<'a, C>
113 for PythPullOracleWithHermes<'a, C>
114{
115 async fn fetch_price_update_instructions(
116 &self,
117 price_updates: &Self::PriceUpdates,
118 options: BundleOptions,
119 ) -> crate::Result<(
120 PriceUpdateInstructions<'a, C>,
121 HashMap<PriceProviderKind, FeedAddressMap>,
122 )> {
123 let mut ixns = PriceUpdateInstructions::new(self.gmsol, options);
124
125 let PriceUpdates { updates, num_feeds } = price_updates;
126
127 if updates.is_empty() {
128 return Ok((ixns, Default::default()));
129 }
130
131 let mut prices = HashMap::with_capacity(num_feeds.unwrap_or(0));
132
133 let wormhole = self.oracle.wormhole();
134 let pyth = self.oracle.pyth();
135
136 let datas = updates
137 .iter()
138 .flat_map(
139 |update| match utils::parse_accumulator_update_datas(update) {
140 Ok(datas) => Either::Left(datas.into_iter().map(Ok)),
141 Err(err) => Either::Right(std::iter::once(Err(err))),
142 },
143 )
144 .collect::<crate::Result<Vec<AccumulatorUpdateData>>>()?;
145
146 let mut updates = HashMap::<_, _>::default();
148 for data in datas.iter() {
149 let proof = &data.proof;
150 for update in utils::get_merkle_price_updates(proof) {
151 let feed_id = utils::parse_feed_id(update)?;
152 updates.insert(feed_id, (proof, update));
153 }
154 }
155
156 let mut encoded_vaas = HashMap::<_, _>::default();
158 let mut vaas = HashMap::<_, _>::default();
159 for (proof, _) in updates.values() {
160 let vaa = utils::get_vaa_buffer(proof);
161 if let Entry::Vacant(entry) = vaas.entry(vaa) {
162 let guardian_set_index = utils::get_guardian_set_index(proof)?;
163
164 let mut pubkey: Pubkey;
165 loop {
166 let keypair = Keypair::new();
167 pubkey = keypair.pubkey();
168 match encoded_vaas.entry(pubkey) {
169 Entry::Vacant(entry) => {
170 entry.insert(keypair);
171 break;
172 }
173 Entry::Occupied(_) => continue,
174 }
175 }
176
177 entry.insert((pubkey, guardian_set_index));
178 }
179 }
180
181 for (vaa, (pubkey, guardian_set_index)) in vaas.iter() {
182 let draft_vaa = encoded_vaas.remove(pubkey).expect("must exist");
183 let create = wormhole
184 .create_encoded_vaa(draft_vaa, vaa.len() as u64)
185 .await?;
186 let draft_vaa = pubkey;
187 let write_1 = wormhole.write_encoded_vaa(draft_vaa, 0, &vaa[0..VAA_SPLIT_INDEX]);
188 let write_2 = wormhole.write_encoded_vaa(
189 draft_vaa,
190 VAA_SPLIT_INDEX as u32,
191 &vaa[VAA_SPLIT_INDEX..],
192 );
193 let verify = wormhole.verify_encoded_vaa_v1(draft_vaa, *guardian_set_index);
194 ixns.try_push_post(create.clear_output())?;
195 ixns.try_push_post(write_1)?;
196 ixns.try_push_post(write_2)?;
197 ixns.try_push_post(verify)?;
198 let close_encoded_vaa = wormhole.close_encoded_vaa(draft_vaa);
199 ixns.try_push_close(close_encoded_vaa)?;
200 }
201
202 for (feed_id, (proof, update)) in updates {
204 let price_update = Keypair::new();
205 let vaa = utils::get_vaa_buffer(proof);
206 let Some((encoded_vaa, _)) = vaas.get(vaa) else {
207 continue;
208 };
209 let (post_price_update, price_update) = pyth
210 .post_price_update(price_update, update, encoded_vaa)?
211 .swap_output(());
212 prices.insert(Pubkey::new_from_array(feed_id.to_bytes()), price_update);
213 ixns.try_push_post(post_price_update)?;
214 ixns.try_push_close(pyth.reclaim_rent(&price_update))?;
215 }
216
217 Ok((ixns, HashMap::from([(PriceProviderKind::Pyth, prices)])))
218 }
219}
220
221impl<'r, 'a, C: Deref<Target = impl Signer> + Clone> PostPullOraclePrices<'a, C>
222 for &'r PythPullOracleWithHermes<'a, C>
223where
224 'r: 'a,
225{
226 async fn fetch_price_update_instructions(
227 &self,
228 price_updates: &Self::PriceUpdates,
229 options: BundleOptions,
230 ) -> crate::Result<(
231 PriceUpdateInstructions<'a, C>,
232 HashMap<PriceProviderKind, FeedAddressMap>,
233 )> {
234 (*self)
235 .fetch_price_update_instructions(price_updates, options)
236 .await
237 }
238}