1pub mod wormhole;
3
4pub mod receiver;
6
7pub mod hermes;
9
10pub mod utils;
12
13mod pull_oracle_impl;
14
15use std::{collections::HashMap, future::Future, ops::Deref};
16
17use anchor_client::{
18 solana_client::rpc_config::RpcSendTransactionConfig,
19 solana_sdk::{
20 pubkey::Pubkey,
21 signature::{Keypair, Signature},
22 signer::Signer,
23 },
24};
25use either::Either;
26use gmsol_solana_utils::{
27 bundle_builder::{BundleBuilder, SendBundleOptions},
28 program::Program,
29 transaction_builder::TransactionBuilder,
30};
31use gmsol_store::states::common::TokensWithFeed;
32use hermes::BinaryPriceUpdate;
33use pyth_sdk::Identifier;
34use pythnet_sdk::wire::v1::AccumulatorUpdateData;
35
36use self::wormhole::WORMHOLE_PROGRAM_ID;
37
38pub use self::{
39 pull_oracle_impl::{PriceUpdates, PythPullOracleWithHermes},
40 receiver::PythReceiverOps,
41 wormhole::WormholeOps,
42};
43
44use self::hermes::PriceUpdate;
45
/// Byte offset at which a VAA buffer is split when it is written into an
/// encoded VAA account: the bytes before this offset go into the first write
/// instruction and the remaining bytes into the second one.
const VAA_SPLIT_INDEX: usize = 755;
47
/// Transaction bundles produced by
/// [`PythPullOracleOps::with_pyth_price_updates`].
///
/// The bundles are sent in the order `post`, `consume`, `close` by
/// [`WithPythPrices::send_all`].
pub struct WithPythPrices<'a, C> {
    /// Transactions that create, write and verify the encoded VAA accounts
    /// and post the price updates.
    post: BundleBuilder<'a, C>,
    /// Transactions returned by the caller-supplied `consume` callback.
    consume: BundleBuilder<'a, C>,
    /// Transactions that close the encoded VAA accounts and reclaim the rent
    /// of the posted price updates.
    close: BundleBuilder<'a, C>,
}
54
55impl<S, C> WithPythPrices<'_, C>
56where
57 C: Deref<Target = S> + Clone,
58 S: Signer,
59{
60 pub async fn estimated_execution_fee(
62 &self,
63 compute_unit_price_micro_lamports: Option<u64>,
64 ) -> crate::Result<u64> {
65 let mut execution_fee = self
66 .post
67 .estimate_execution_fee(compute_unit_price_micro_lamports)
68 .await?;
69 execution_fee = execution_fee.saturating_add(
70 self.consume
71 .estimate_execution_fee(compute_unit_price_micro_lamports)
72 .await?,
73 );
74 execution_fee = execution_fee.saturating_add(
75 self.close
76 .estimate_execution_fee(compute_unit_price_micro_lamports)
77 .await?,
78 );
79 Ok(execution_fee)
80 }
81
82 pub async fn send_all(
84 self,
85 compute_unit_price_micro_lamports: Option<u64>,
86 skip_preflight: bool,
87 enable_tracing: bool,
88 ) -> Result<Vec<Signature>, (Vec<Signature>, crate::Error)> {
89 let mut error: Option<crate::Error> = None;
90
91 let mut signatures = match self
92 .post
93 .send_all_with_opts(SendBundleOptions {
94 without_compute_budget: false,
95 compute_unit_price_micro_lamports,
96 update_recent_block_hash_before_send: false,
97 config: RpcSendTransactionConfig {
98 skip_preflight,
99 ..Default::default()
100 },
101 disable_error_tracing: !enable_tracing,
102 ..Default::default()
103 })
104 .await
105 {
106 Ok(signatures) => signatures,
107 Err((signatures, err)) => {
108 error = Some(err.into());
109 signatures
110 }
111 };
112
113 if error.is_none() {
114 let mut consume_signatures = match self
115 .consume
116 .send_all_with_opts(SendBundleOptions {
117 without_compute_budget: false,
118 compute_unit_price_micro_lamports,
119 update_recent_block_hash_before_send: false,
120 config: RpcSendTransactionConfig {
121 skip_preflight,
122 ..Default::default()
123 },
124 disable_error_tracing: !enable_tracing,
125 ..Default::default()
126 })
127 .await
128 {
129 Ok(signatures) => signatures,
130 Err((signatures, err)) => {
131 error = Some(err.into());
132 signatures
133 }
134 };
135
136 signatures.append(&mut consume_signatures);
137 }
138
139 let mut close_signatures = match self
140 .close
141 .send_all_with_opts(SendBundleOptions {
142 without_compute_budget: false,
143 compute_unit_price_micro_lamports,
144 update_recent_block_hash_before_send: false,
145 config: RpcSendTransactionConfig {
146 skip_preflight,
147 ..Default::default()
148 },
149 disable_error_tracing: !enable_tracing,
150 ..Default::default()
151 })
152 .await
153 {
154 Ok(signatures) => signatures,
155 Err((signatures, err)) => {
156 match &error {
157 None => error = Some(err.into()),
158 Some(post_err) => {
159 error = Some(crate::Error::unknown(format!(
160 "post error: {post_err}, close error: {err}"
161 )));
162 }
163 }
164 signatures
165 }
166 };
167
168 signatures.append(&mut close_signatures);
169
170 match error {
171 None => Ok(signatures
172 .into_iter()
173 .map(|with_slot| with_slot.into_value())
174 .collect()),
175 Some(err) => Err((
176 signatures
177 .into_iter()
178 .map(|with_slot| with_slot.into_value())
179 .collect(),
180 err,
181 )),
182 }
183 }
184}
185
/// Mapping from a Pyth feed id to the `Pubkey` produced when the price update
/// for that feed is posted (presumably the address of the price update
/// account; confirm against `PythReceiverOps::post_price_update`).
pub type Prices = HashMap<Identifier, Pubkey>;
188
/// Context holding the feed ids of interest together with keypairs generated
/// for them.
pub struct PythPullOracleContext {
    /// Keypairs appended by [`PythPullOracleContext::add_encoded_vaa`].
    encoded_vaas: Vec<Keypair>,
    /// One freshly generated keypair per feed id.
    feeds: HashMap<Identifier, Keypair>,
    /// The feed ids this context was created with, in their original order.
    feed_ids: Vec<Identifier>,
}
195
196impl PythPullOracleContext {
197 pub fn new(feed_ids: Vec<Identifier>) -> Self {
199 let feeds = feed_ids.iter().map(|id| (*id, Keypair::new())).collect();
200 Self {
201 encoded_vaas: Vec::with_capacity(1),
202 feeds,
203 feed_ids,
204 }
205 }
206
207 pub fn try_from_feeds(feeds: &TokensWithFeed) -> crate::Result<Self> {
209 let feed_ids = utils::extract_pyth_feed_ids(feeds)?;
210 Ok(Self::new(feed_ids))
211 }
212
213 pub fn feed_ids(&self) -> &[Identifier] {
215 &self.feed_ids
216 }
217
218 pub fn add_encoded_vaa(&mut self) -> usize {
222 self.encoded_vaas.push(Keypair::new());
223 self.encoded_vaas.len() - 1
224 }
225
226 pub fn encoded_vaas(&self) -> &[Keypair] {
228 &self.encoded_vaas
229 }
230}
231
232pub trait PythPullOracleOps<C> {
234 fn pyth(&self) -> &Program<C>;
236
237 fn wormhole(&self) -> &Program<C>;
239
240 fn with_pyth_prices<'a, S, It, Fut>(
242 &'a self,
243 ctx: &'a PythPullOracleContext,
244 update: &'a PriceUpdate,
245 consume: impl FnOnce(Prices) -> Fut,
246 ) -> impl Future<Output = crate::Result<WithPythPrices<'a, C>>>
247 where
248 C: Deref<Target = S> + Clone + 'a,
249 S: Signer,
250 It: IntoIterator<Item = TransactionBuilder<'a, C>>,
251 Fut: Future<Output = crate::Result<It>>,
252 {
253 self.with_pyth_price_updates(ctx, [&update.binary], consume)
254 }
255
256 fn with_pyth_price_updates<'a, S, It, Fut>(
258 &'a self,
259 ctx: &'a PythPullOracleContext,
260 updates: impl IntoIterator<Item = &'a BinaryPriceUpdate>,
261 consume: impl FnOnce(Prices) -> Fut,
262 ) -> impl Future<Output = crate::Result<WithPythPrices<'a, C>>>
263 where
264 C: Deref<Target = S> + Clone + 'a,
265 S: Signer,
266 It: IntoIterator<Item = TransactionBuilder<'a, C>>,
267 Fut: Future<Output = crate::Result<It>>,
268 {
269 use std::collections::hash_map::Entry;
270
271 async {
272 let wormhole = self.wormhole();
273 let pyth = self.pyth();
274 let mut prices = HashMap::with_capacity(ctx.feeds.len());
275 let mut post = BundleBuilder::from_rpc_client(pyth.rpc());
276 let mut consume_txns = BundleBuilder::from_rpc_client(pyth.rpc());
277 let mut close = BundleBuilder::from_rpc_client(pyth.rpc());
278
279 let datas = updates
280 .into_iter()
281 .flat_map(
282 |update| match utils::parse_accumulator_update_datas(update) {
283 Ok(datas) => Either::Left(datas.into_iter().map(Ok)),
284 Err(err) => Either::Right(std::iter::once(Err(err))),
285 },
286 )
287 .collect::<crate::Result<Vec<AccumulatorUpdateData>>>()?;
288
289 let mut updates = HashMap::<_, _>::default();
291 for data in datas.iter() {
292 let proof = &data.proof;
293 for update in utils::get_merkle_price_updates(proof) {
294 let feed_id = utils::parse_feed_id(update)?;
295 updates.insert(feed_id, (proof, update));
296 }
297 }
298
299 let mut encoded_vaas = HashMap::<_, _>::default();
301 let mut vaas = HashMap::<_, _>::default();
302 for (proof, _) in updates.values() {
303 let vaa = utils::get_vaa_buffer(proof);
304 if let Entry::Vacant(entry) = vaas.entry(vaa) {
305 let guardian_set_index = utils::get_guardian_set_index(proof)?;
306
307 let mut pubkey: Pubkey;
308 loop {
309 let keypair = Keypair::new();
310 pubkey = keypair.pubkey();
311 match encoded_vaas.entry(pubkey) {
312 Entry::Vacant(entry) => {
313 entry.insert(keypair);
314 break;
315 }
316 Entry::Occupied(_) => continue,
317 }
318 }
319
320 entry.insert((pubkey, guardian_set_index));
321 }
322 }
323
324 for (vaa, (pubkey, guardian_set_index)) in vaas.iter() {
325 let draft_vaa = encoded_vaas.remove(pubkey).expect("must exist");
326 let create = wormhole
327 .create_encoded_vaa(draft_vaa, vaa.len() as u64)
328 .await?;
329 let draft_vaa = pubkey;
330 let write_1 = wormhole.write_encoded_vaa(draft_vaa, 0, &vaa[0..VAA_SPLIT_INDEX]);
331 let write_2 = wormhole.write_encoded_vaa(
332 draft_vaa,
333 VAA_SPLIT_INDEX as u32,
334 &vaa[VAA_SPLIT_INDEX..],
335 );
336 let verify = wormhole.verify_encoded_vaa_v1(draft_vaa, *guardian_set_index);
337 post.try_push(create.clear_output())?
338 .try_push(write_1)?
339 .try_push(write_2)?
340 .try_push(verify)?;
341 let close_encoded_vaa = wormhole.close_encoded_vaa(draft_vaa);
342 close.try_push(close_encoded_vaa)?;
343 }
344
345 for (feed_id, (proof, update)) in updates {
347 let price_update = Keypair::new();
348 let vaa = utils::get_vaa_buffer(proof);
349 let Some((encoded_vaa, _)) = vaas.get(vaa) else {
350 continue;
351 };
352 let (post_price_update, price_update) = pyth
353 .post_price_update(price_update, update, encoded_vaa)?
354 .swap_output(());
355 prices.insert(feed_id, price_update);
356 post.try_push(post_price_update)?;
357 close.try_push(pyth.reclaim_rent(&price_update))?;
358 }
359
360 let consume = (consume)(prices).await?;
361 consume_txns.push_many(consume, false)?;
362 Ok(WithPythPrices {
363 post,
364 consume: consume_txns,
365 close,
366 })
367 }
368 }
369
370 fn execute_with_pyth_price_updates<'a, 'exec, T, S>(
372 &'exec self,
373 updates: impl IntoIterator<Item = &'a BinaryPriceUpdate>,
374 execute: &mut T,
375 compute_unit_price_micro_lamports: Option<u64>,
376 skip_preflight: bool,
377 enable_tracing: bool,
378 ) -> impl Future<Output = crate::Result<()>>
379 where
380 C: Deref<Target = S> + Clone + 'exec,
381 S: Signer,
382 T: ExecuteWithPythPrices<'exec, C>,
383 {
384 async move {
385 let mut execution_fee_estiamted = !execute.should_estiamte_execution_fee();
386 let updates = updates.into_iter().collect::<Vec<_>>();
387 let ctx = execute.context().await?;
388 let mut with_prices;
389 loop {
390 with_prices = self
391 .with_pyth_price_updates(&ctx, updates.clone(), |prices| async {
392 let rpcs = execute.build_rpc_with_price_updates(prices).await?;
393 Ok(rpcs)
394 })
395 .await?;
396 if execution_fee_estiamted {
397 break;
398 } else {
399 let execution_fee = with_prices
400 .estimated_execution_fee(compute_unit_price_micro_lamports)
401 .await?;
402 execute.set_execution_fee(execution_fee);
403 tracing::info!(%execution_fee, "execution fee estimated");
404 execution_fee_estiamted = true;
405 }
406 }
407 execute
408 .execute_with_options(
409 with_prices,
410 compute_unit_price_micro_lamports,
411 skip_preflight,
412 enable_tracing,
413 )
414 .await?;
415 Ok(())
416 }
417 }
418}
419
/// Default [`PythPullOracleOps`] implementation backed by a Wormhole program
/// handle and a Pyth program handle.
pub struct PythPullOracle<C> {
    /// Program handle created for `WORMHOLE_PROGRAM_ID`.
    wormhole: Program<C>,
    /// Program handle created for `pyth_solana_receiver_sdk::ID`.
    pyth: Program<C>,
}
425
426impl<S, C> PythPullOracle<C>
427where
428 C: Deref<Target = S> + Clone,
429 S: Signer,
430{
431 pub fn try_new(client: &crate::Client<C>) -> crate::Result<Self> {
433 Ok(Self {
434 wormhole: client.program(WORMHOLE_PROGRAM_ID),
435 pyth: client.program(pyth_solana_receiver_sdk::ID),
436 })
437 }
438}
439
impl<S, C> PythPullOracleOps<C> for PythPullOracle<C>
where
    C: Deref<Target = S> + Clone,
    S: Signer,
{
    /// Returns the stored Pyth program handle.
    fn pyth(&self) -> &Program<C> {
        &self.pyth
    }

    /// Returns the stored Wormhole program handle.
    fn wormhole(&self) -> &Program<C> {
        &self.wormhole
    }
}
453
454pub trait ExecuteWithPythPrices<'a, C> {
456 fn should_estiamte_execution_fee(&self) -> bool {
458 true
459 }
460
461 fn set_execution_fee(&mut self, lamports: u64);
463
464 fn context(&mut self) -> impl Future<Output = crate::Result<PythPullOracleContext>>;
466
467 fn build_rpc_with_price_updates(
469 &mut self,
470 price_updates: Prices,
471 ) -> impl Future<Output = crate::Result<Vec<TransactionBuilder<'a, C, ()>>>>;
472
473 fn execute_with_options<S>(
475 &mut self,
476 txns: WithPythPrices<C>,
477 compute_unit_price_micro_lamports: Option<u64>,
478 skip_preflight: bool,
479 enable_tracing: bool,
480 ) -> impl Future<Output = crate::Result<()>>
481 where
482 C: Deref<Target = S> + Clone,
483 S: Signer,
484 {
485 async move {
486 match txns
487 .send_all(
488 compute_unit_price_micro_lamports,
489 skip_preflight,
490 enable_tracing,
491 )
492 .await
493 {
494 Ok(signatures) => {
495 if enable_tracing {
496 tracing::info!("executed with txns {signatures:#?}");
497 }
498 Ok(())
499 }
500 Err((signatures, err)) => {
501 if enable_tracing {
502 tracing::error!(%err, "failed to execute, successful txns: {signatures:#?}");
503 }
504 Err(err)
505 }
506 }
507 }
508 }
509
510 fn execute<S>(
512 &mut self,
513 txns: WithPythPrices<C>,
514 skip_preflight: bool,
515 ) -> impl Future<Output = crate::Result<()>>
516 where
517 C: Deref<Target = S> + Clone,
518 S: Signer,
519 {
520 self.execute_with_options(txns, None, skip_preflight, true)
521 }
522}