gmsol/switchboard/pull_oracle/
pull_oracle_impl.rs

1use crate::{
2    store::utils::Feeds,
3    utils::builder::{
4        FeedAddressMap, FeedIds, PostPullOraclePrices, PriceUpdateInstructions, PullOracle,
5    },
6};
7use anchor_client::solana_client::nonblocking::rpc_client::RpcClient;
8use anchor_client::solana_sdk::{pubkey::Pubkey, signer::Signer};
9use anchor_spl::associated_token::get_associated_token_address;
10use base64::prelude::*;
11use gmsol_solana_utils::bundle_builder::BundleOptions;
12use gmsol_store::states::PriceProviderKind;
13use rand::Rng;
14use solana_sdk::{instruction::AccountMeta, system_program};
15use spl_token::{native_mint::ID as NATIVE_MINT, ID as SPL_TOKEN_PROGRAM_ID};
16use std::{
17    collections::HashMap,
18    num::NonZeroUsize,
19    ops::Deref,
20    sync::{Arc, LazyLock},
21};
22use switchboard_on_demand_client::{
23    fetch_and_cache_luts, oracle_job::OracleJob, prost::Message, CrossbarClient, FeedConfig,
24    FetchSignaturesMultiParams, Gateway, MultiSubmission, OracleAccountData, PullFeed,
25    PullFeedAccountData, PullFeedSubmitResponseMany, PullFeedSubmitResponseManyParams,
26    QueueAccountData, SbContext, SlotHashSysvar, State, SWITCHBOARD_ON_DEMAND_PROGRAM_ID,
27};
28use time::OffsetDateTime;
29use tokio::{join, sync::OnceCell};
30
31const DEFAULT_BATCH_SIZE: usize = 5;
32
33cfg_if::cfg_if! {
34    if #[cfg(feature = "devnet")] {
35        /// Switchboard Default Queue.
36        static QUEUE: LazyLock<Pubkey> =
37        LazyLock::new(|| "EYiAmGSdsQTuCw413V5BzaruWuCCSDgTPtBGvLkXHbe7".parse().unwrap());
38    } else {
39        /// Switchboard Default Queue.
40        static QUEUE: LazyLock<Pubkey> =
41        LazyLock::new(|| "A43DyUGA7s8eXPxqEjJY6EBu1KKbNgfxF8h17VAHn13w".parse().unwrap());
42    }
43}
44
45/// Switchboard Pull Oracle Factory.
46#[derive(Debug)]
47pub struct SwitchcboardPullOracleFactory {
48    switchboard: Pubkey,
49    gateways: Vec<Gateway>,
50    crossbar: Option<CrossbarClient>,
51}
52
53impl SwitchcboardPullOracleFactory {
54    /// Gateway Env.
55    pub const ENV_GATEWAY: &str = "SWITCHBOARD_GATEWAY";
56
57    /// Create with gateways.
58    pub fn from_gateways(gateways: Vec<Gateway>) -> crate::Result<Self> {
59        if gateways.is_empty() {
60            return Err(crate::Error::switchboard_error("empty gateway list"));
61        }
62        Ok(Self {
63            switchboard: Pubkey::new_from_array(SWITCHBOARD_ON_DEMAND_PROGRAM_ID.to_bytes()),
64            gateways,
65            crossbar: None,
66        })
67    }
68
69    /// Create a new factory from the given gateway url.
70    pub fn new(gateway_url: &str) -> Self {
71        Self::from_gateways(vec![Gateway::new(gateway_url.to_string())]).expect("must success")
72    }
73
74    /// Create from env.
75    pub fn from_env() -> crate::Result<Self> {
76        use std::env;
77
78        let gateway_url = env::var(Self::ENV_GATEWAY).map_err(|_| {
79            crate::Error::invalid_argument(format!("{} is not set", Self::ENV_GATEWAY))
80        })?;
81
82        Ok(Self::new(&gateway_url))
83    }
84
85    /// Create from default queue.
86    pub async fn from_default_queue(client: &RpcClient) -> crate::Result<Self> {
87        Self::from_queue(client, &QUEUE).await
88    }
89
90    /// Create from queue.
91    pub async fn from_queue(client: &RpcClient, queue: &Pubkey) -> crate::Result<Self> {
92        let queue = QueueAccountData::load(client, queue).await.map_err(|err| {
93            crate::Error::switchboard_error(format!("loading queue data error: {err}"))
94        })?;
95        let gateways = queue.fetch_gateways(client).await.map_err(|err| {
96            crate::Error::switchboard_error(format!("fetching gateways error: {err}"))
97        })?;
98        tracing::debug!("loaded {} gateways", gateways.len());
99
100        Self::from_gateways(gateways)
101    }
102
103    /// Get the total number of the gateways.
104    pub fn num_gateways(&self) -> usize {
105        self.gateways.len()
106    }
107
108    /// Make an oracle with the gateway index.
109    pub fn make_oracle_with_gateway_index<'a, C: Deref<Target = impl Signer> + Clone>(
110        &'a self,
111        gmsol: &'a crate::Client<C>,
112        gateway_index: usize,
113    ) -> Option<SwitchboardPullOracle<'a, C>> {
114        let gateway = self.gateways.get(gateway_index)?;
115        tracing::debug!("using gateway: {gateway:?}");
116        Some(SwitchboardPullOracle::from_parts(
117            gmsol,
118            self.switchboard,
119            gateway,
120            self.crossbar.clone(),
121        ))
122    }
123
124    /// Make an oracle with the given rng.
125    pub fn make_oracle_with_rng<'a, C: Deref<Target = impl Signer> + Clone>(
126        &'a self,
127        gmsol: &'a crate::Client<C>,
128        rng: &mut impl Rng,
129    ) -> SwitchboardPullOracle<'a, C> {
130        let index = rng.gen_range(0, self.num_gateways());
131        self.make_oracle_with_gateway_index(gmsol, index)
132            .expect("must success")
133    }
134
135    /// Make an oracle.
136    pub fn make_oracle<'a, C: Deref<Target = impl Signer> + Clone>(
137        &'a self,
138        gmsol: &'a crate::Client<C>,
139    ) -> SwitchboardPullOracle<'a, C> {
140        let mut rng = rand::thread_rng();
141        self.make_oracle_with_rng(gmsol, &mut rng)
142    }
143}
144
145/// Switchboard Pull Oracle.
146pub struct SwitchboardPullOracle<'a, C> {
147    gmsol: &'a crate::Client<C>,
148    switchboard: Pubkey,
149    ctx: Arc<SbContext>,
150    client: RpcClient,
151    gateway: &'a Gateway,
152    crossbar: Option<CrossbarClient>,
153    batch_size: usize,
154}
155
156impl<'a, C: Deref<Target = impl Signer> + Clone> SwitchboardPullOracle<'a, C> {
157    /// Create from parts.
158    pub fn from_parts(
159        gmsol: &'a crate::Client<C>,
160        switchboard: Pubkey,
161        gateway: &'a Gateway,
162        crossbar: Option<CrossbarClient>,
163    ) -> Self {
164        Self {
165            gmsol,
166            switchboard,
167            client: gmsol.store_program().rpc(),
168            ctx: SbContext::new(),
169            gateway,
170            crossbar,
171            batch_size: DEFAULT_BATCH_SIZE,
172        }
173    }
174
175    /// Set batch size.
176    pub fn set_batch_size(&mut self, batch_size: NonZeroUsize) -> &mut Self {
177        self.batch_size = batch_size.get();
178        self
179    }
180}
181
182/// Swtichboard Price Updates type.
183pub struct SbPriceUpdates {
184    /// The list of feed pubkeys for which prices were fetched.
185    pub feeds: Vec<Pubkey>,
186    /// The list of price submissions from each oracles.
187    pub price_submissions: Vec<MultiSubmission>,
188    /// The slot number for which the price updates were signed with the slothash.
189    pub slot: u64,
190    /// The queue (network) to which all feeds are owned.
191    /// This will always be the same unless the network of oracles you use is non-standard.
192    pub queue: Pubkey,
193    /// The list of oracle pubkeys that signed the price updates.
194    pub oracle_keys: Vec<Pubkey>,
195}
196
197impl<C: Deref<Target = impl Signer> + Clone> PullOracle for SwitchboardPullOracle<'_, C> {
198    type PriceUpdates = Vec<SbPriceUpdates>;
199
200    async fn fetch_price_updates(
201        &self,
202        feed_ids: &FeedIds,
203        after: Option<OffsetDateTime>,
204    ) -> crate::Result<Self::PriceUpdates> {
205        let feeds = filter_switchboard_feed_ids(feed_ids)?;
206
207        if feeds.is_empty() {
208            return Ok(vec![]);
209        }
210
211        let mut updates = Vec::new();
212
213        for feeds in feeds.chunks(self.batch_size) {
214            let mut num_signatures = 3;
215            let mut feed_configs = Vec::new();
216            let mut queue = Pubkey::default();
217
218            for feed in feeds {
219                tracing::trace!(%feed, "fetching feed data");
220                let data = *self
221                    .ctx
222                    .pull_feed_cache
223                    .entry(*feed)
224                    .or_insert_with(OnceCell::new)
225                    .get_or_try_init(|| PullFeed::load_data(&self.client, feed))
226                    .await
227                    .map_err(|_| crate::Error::switchboard_error("fetching job data failed"))?;
228                tracing::trace!(%feed, ?data, "fechted feed data");
229                let jobs = data
230                    .fetch_jobs(&self.crossbar.clone().unwrap_or_default())
231                    .await
232                    .map_err(|_| crate::Error::switchboard_error("fetching job data failed"))?;
233                tracing::trace!(%feed, ?jobs, "fetched jobs");
234                let encoded_jobs = encode_jobs(&jobs);
235                let max_variance = (data.max_variance / 1_000_000_000) as u32;
236                let min_responses = data.min_responses;
237                if min_responses >= num_signatures {
238                    num_signatures = min_responses + 1;
239                }
240                let feed_config = FeedConfig {
241                    encoded_jobs,
242                    max_variance: Some(max_variance),
243                    min_responses: Some(min_responses),
244                };
245                feed_configs.push(feed_config);
246                queue = data.queue;
247            }
248            let slothash = SlotHashSysvar::get_latest_slothash(&self.client)
249                .await
250                .map_err(|_| crate::Error::switchboard_error("fetching slot hash failed"))?;
251            let price_signatures = self
252                .gateway
253                .fetch_signatures_multi(FetchSignaturesMultiParams {
254                    recent_hash: Some(slothash.to_base58_hash()),
255                    num_signatures: Some(num_signatures),
256                    feed_configs,
257                    use_timestamp: Some(true),
258                })
259                .await
260                .map_err(|_| crate::Error::switchboard_error("fetching signatures failed"))?;
261            tracing::trace!("fetched price signatures: {price_signatures:#?}");
262
263            let mut all_submissions: Vec<MultiSubmission> = Vec::new();
264            let mut oracle_keys = Vec::new();
265            for resp in &price_signatures.oracle_responses {
266                all_submissions.push(MultiSubmission {
267                    values: resp
268                        .feed_responses
269                        .iter()
270                        .map(|x| {
271                            if let Some(after) = after {
272                                let Some(ts) = x.timestamp else {
273                                    return Err(crate::Error::switchboard_error(
274                                        "missing timestamp of the feed result",
275                                    ))?;
276                                };
277                                let ts = OffsetDateTime::from_unix_timestamp(ts)
278                                    .map_err(crate::Error::switchboard_error)?;
279                                if ts < after {
280                                    return Err(crate::Error::switchboard_error(
281                                        "feed result is too old, ts={ts}, required={after}",
282                                    ));
283                                }
284                            }
285                            Ok(x.success_value.parse().unwrap_or(i128::MAX))
286                        })
287                        .collect::<crate::Result<Vec<_>>>()?,
288                    signature: BASE64_STANDARD
289                        .decode(resp.signature.clone())
290                        .map_err(|_| crate::Error::switchboard_error("base64:decode failure"))?
291                        .try_into()
292                        .map_err(|_| crate::Error::switchboard_error("signature:decode failure"))?,
293                    recovery_id: resp.recovery_id as u8,
294                });
295                let oracle_key = hex::decode(
296                    &resp
297                        .feed_responses
298                        .first()
299                        .ok_or_else(|| crate::Error::switchboard_error("empty response"))?
300                        .oracle_pubkey,
301                )
302                .map_err(|_| crate::Error::switchboard_error("hex:decode failure"))?
303                .try_into()
304                .map_err(|_| crate::Error::switchboard_error("pubkey:decode failure"))?;
305                let oracle_key = Pubkey::new_from_array(oracle_key);
306                oracle_keys.push(oracle_key);
307            }
308
309            updates.push(SbPriceUpdates {
310                feeds: feeds.to_vec(),
311                price_submissions: all_submissions,
312                slot: slothash.slot,
313                queue,
314                oracle_keys,
315            });
316        }
317
318        Ok(updates)
319    }
320}
321
322impl<'a, C: Clone + Deref<Target = impl Signer>> PostPullOraclePrices<'a, C>
323    for SwitchboardPullOracle<'a, C>
324{
325    async fn fetch_price_update_instructions(
326        &self,
327        price_updates: &Self::PriceUpdates,
328        options: BundleOptions,
329    ) -> crate::Result<(
330        PriceUpdateInstructions<'a, C>,
331        HashMap<PriceProviderKind, FeedAddressMap>,
332    )> {
333        let mut ixns = PriceUpdateInstructions::new(self.gmsol, options);
334        let mut prices = HashMap::default();
335        for update in price_updates {
336            let feeds = &update.feeds;
337            let price_signatures = &update.price_submissions;
338            let queue = update.queue;
339            let oracle_keys = &update.oracle_keys;
340
341            let queue_key = [queue];
342            let (oracle_luts_result, pull_feed_luts_result, queue_lut_result) = join!(
343                fetch_and_cache_luts::<OracleAccountData>(
344                    &self.client,
345                    self.ctx.clone(),
346                    oracle_keys
347                ),
348                fetch_and_cache_luts::<PullFeedAccountData>(&self.client, self.ctx.clone(), feeds),
349                fetch_and_cache_luts::<QueueAccountData>(
350                    &self.client,
351                    self.ctx.clone(),
352                    &queue_key
353                )
354            );
355
356            let oracle_luts = oracle_luts_result
357                .map_err(|_| crate::Error::switchboard_error("fetching oracle luts failed"))?;
358            let pull_feed_luts = pull_feed_luts_result
359                .map_err(|_| crate::Error::switchboard_error("fetching pull feed luts failed"))?;
360            let queue_lut = queue_lut_result
361                .map_err(|_| crate::Error::switchboard_error("fetching queue lut failed"))?;
362
363            let mut luts = oracle_luts;
364            luts.extend(pull_feed_luts);
365            luts.extend(queue_lut);
366
367            let payer = self.gmsol.payer();
368
369            prices.extend(feeds.iter().map(|feed| (*feed, *feed)));
370
371            let ix_data = PullFeedSubmitResponseManyParams {
372                slot: update.slot,
373                submissions: price_signatures.clone(),
374            };
375
376            let feeds = feeds.iter().map(|pubkey| AccountMeta::new(*pubkey, false));
377            let oracles_and_stats = oracle_keys.iter().flat_map(|oracle| {
378                let stats_key = OracleAccountData::stats_key(oracle);
379                [
380                    AccountMeta::new_readonly(*oracle, false),
381                    AccountMeta::new(stats_key, false),
382                ]
383            });
384            let ix = self
385                .gmsol
386                .store_transaction()
387                .program(self.switchboard)
388                .args(ix_data.data())
389                .accounts(
390                    PullFeedSubmitResponseMany {
391                        queue,
392                        program_state: State::key(),
393                        recent_slothashes: solana_sdk::sysvar::slot_hashes::ID,
394                        payer,
395                        system_program: system_program::ID,
396                        reward_vault: get_associated_token_address(&queue, &NATIVE_MINT),
397                        token_program: SPL_TOKEN_PROGRAM_ID,
398                        token_mint: NATIVE_MINT,
399                    }
400                    .to_account_metas(None),
401                )
402                .accounts(feeds.chain(oracles_and_stats).collect())
403                .lookup_tables(
404                    luts.clone()
405                        .into_iter()
406                        .map(|x| (x.key, x.addresses.clone())),
407                );
408            ixns.try_push_post(ix)?;
409        }
410
411        Ok((
412            ixns,
413            HashMap::from([(PriceProviderKind::Switchboard, prices)]),
414        ))
415    }
416}
417
418fn encode_jobs(job_array: &[OracleJob]) -> Vec<String> {
419    job_array
420        .iter()
421        .map(|job| BASE64_STANDARD.encode(job.encode_length_delimited_to_vec()))
422        .collect()
423}
424
425fn filter_switchboard_feed_ids(feed_ids: &FeedIds) -> crate::Result<Vec<Pubkey>> {
426    Feeds::new(feed_ids)
427        .filter_map(|res| {
428            res.map(|config| {
429                matches!(config.provider, PriceProviderKind::Switchboard).then_some(config.feed)
430            })
431            .transpose()
432        })
433        .collect()
434}