1use crate::{
2 store::utils::Feeds,
3 utils::builder::{
4 FeedAddressMap, FeedIds, PostPullOraclePrices, PriceUpdateInstructions, PullOracle,
5 },
6};
7use anchor_client::solana_client::nonblocking::rpc_client::RpcClient;
8use anchor_client::solana_sdk::{pubkey::Pubkey, signer::Signer};
9use anchor_spl::associated_token::get_associated_token_address;
10use base64::prelude::*;
11use gmsol_solana_utils::bundle_builder::BundleOptions;
12use gmsol_store::states::PriceProviderKind;
13use rand::Rng;
14use solana_sdk::{instruction::AccountMeta, system_program};
15use spl_token::{native_mint::ID as NATIVE_MINT, ID as SPL_TOKEN_PROGRAM_ID};
16use std::{
17 collections::HashMap,
18 num::NonZeroUsize,
19 ops::Deref,
20 sync::{Arc, LazyLock},
21};
22use switchboard_on_demand_client::{
23 fetch_and_cache_luts, oracle_job::OracleJob, prost::Message, CrossbarClient, FeedConfig,
24 FetchSignaturesMultiParams, Gateway, MultiSubmission, OracleAccountData, PullFeed,
25 PullFeedAccountData, PullFeedSubmitResponseMany, PullFeedSubmitResponseManyParams,
26 QueueAccountData, SbContext, SlotHashSysvar, State, SWITCHBOARD_ON_DEMAND_PROGRAM_ID,
27};
28use time::OffsetDateTime;
29use tokio::{join, sync::OnceCell};
30
31const DEFAULT_BATCH_SIZE: usize = 5;
32
33cfg_if::cfg_if! {
34 if #[cfg(feature = "devnet")] {
35 static QUEUE: LazyLock<Pubkey> =
37 LazyLock::new(|| "EYiAmGSdsQTuCw413V5BzaruWuCCSDgTPtBGvLkXHbe7".parse().unwrap());
38 } else {
39 static QUEUE: LazyLock<Pubkey> =
41 LazyLock::new(|| "A43DyUGA7s8eXPxqEjJY6EBu1KKbNgfxF8h17VAHn13w".parse().unwrap());
42 }
43}
44
45#[derive(Debug)]
47pub struct SwitchcboardPullOracleFactory {
48 switchboard: Pubkey,
49 gateways: Vec<Gateway>,
50 crossbar: Option<CrossbarClient>,
51}
52
53impl SwitchcboardPullOracleFactory {
54 pub const ENV_GATEWAY: &str = "SWITCHBOARD_GATEWAY";
56
57 pub fn from_gateways(gateways: Vec<Gateway>) -> crate::Result<Self> {
59 if gateways.is_empty() {
60 return Err(crate::Error::switchboard_error("empty gateway list"));
61 }
62 Ok(Self {
63 switchboard: Pubkey::new_from_array(SWITCHBOARD_ON_DEMAND_PROGRAM_ID.to_bytes()),
64 gateways,
65 crossbar: None,
66 })
67 }
68
69 pub fn new(gateway_url: &str) -> Self {
71 Self::from_gateways(vec![Gateway::new(gateway_url.to_string())]).expect("must success")
72 }
73
74 pub fn from_env() -> crate::Result<Self> {
76 use std::env;
77
78 let gateway_url = env::var(Self::ENV_GATEWAY).map_err(|_| {
79 crate::Error::invalid_argument(format!("{} is not set", Self::ENV_GATEWAY))
80 })?;
81
82 Ok(Self::new(&gateway_url))
83 }
84
85 pub async fn from_default_queue(client: &RpcClient) -> crate::Result<Self> {
87 Self::from_queue(client, &QUEUE).await
88 }
89
90 pub async fn from_queue(client: &RpcClient, queue: &Pubkey) -> crate::Result<Self> {
92 let queue = QueueAccountData::load(client, queue).await.map_err(|err| {
93 crate::Error::switchboard_error(format!("loading queue data error: {err}"))
94 })?;
95 let gateways = queue.fetch_gateways(client).await.map_err(|err| {
96 crate::Error::switchboard_error(format!("fetching gateways error: {err}"))
97 })?;
98 tracing::debug!("loaded {} gateways", gateways.len());
99
100 Self::from_gateways(gateways)
101 }
102
103 pub fn num_gateways(&self) -> usize {
105 self.gateways.len()
106 }
107
108 pub fn make_oracle_with_gateway_index<'a, C: Deref<Target = impl Signer> + Clone>(
110 &'a self,
111 gmsol: &'a crate::Client<C>,
112 gateway_index: usize,
113 ) -> Option<SwitchboardPullOracle<'a, C>> {
114 let gateway = self.gateways.get(gateway_index)?;
115 tracing::debug!("using gateway: {gateway:?}");
116 Some(SwitchboardPullOracle::from_parts(
117 gmsol,
118 self.switchboard,
119 gateway,
120 self.crossbar.clone(),
121 ))
122 }
123
124 pub fn make_oracle_with_rng<'a, C: Deref<Target = impl Signer> + Clone>(
126 &'a self,
127 gmsol: &'a crate::Client<C>,
128 rng: &mut impl Rng,
129 ) -> SwitchboardPullOracle<'a, C> {
130 let index = rng.gen_range(0, self.num_gateways());
131 self.make_oracle_with_gateway_index(gmsol, index)
132 .expect("must success")
133 }
134
135 pub fn make_oracle<'a, C: Deref<Target = impl Signer> + Clone>(
137 &'a self,
138 gmsol: &'a crate::Client<C>,
139 ) -> SwitchboardPullOracle<'a, C> {
140 let mut rng = rand::thread_rng();
141 self.make_oracle_with_rng(gmsol, &mut rng)
142 }
143}
144
145pub struct SwitchboardPullOracle<'a, C> {
147 gmsol: &'a crate::Client<C>,
148 switchboard: Pubkey,
149 ctx: Arc<SbContext>,
150 client: RpcClient,
151 gateway: &'a Gateway,
152 crossbar: Option<CrossbarClient>,
153 batch_size: usize,
154}
155
156impl<'a, C: Deref<Target = impl Signer> + Clone> SwitchboardPullOracle<'a, C> {
157 pub fn from_parts(
159 gmsol: &'a crate::Client<C>,
160 switchboard: Pubkey,
161 gateway: &'a Gateway,
162 crossbar: Option<CrossbarClient>,
163 ) -> Self {
164 Self {
165 gmsol,
166 switchboard,
167 client: gmsol.store_program().rpc(),
168 ctx: SbContext::new(),
169 gateway,
170 crossbar,
171 batch_size: DEFAULT_BATCH_SIZE,
172 }
173 }
174
175 pub fn set_batch_size(&mut self, batch_size: NonZeroUsize) -> &mut Self {
177 self.batch_size = batch_size.get();
178 self
179 }
180}
181
182pub struct SbPriceUpdates {
184 pub feeds: Vec<Pubkey>,
186 pub price_submissions: Vec<MultiSubmission>,
188 pub slot: u64,
190 pub queue: Pubkey,
193 pub oracle_keys: Vec<Pubkey>,
195}
196
197impl<C: Deref<Target = impl Signer> + Clone> PullOracle for SwitchboardPullOracle<'_, C> {
198 type PriceUpdates = Vec<SbPriceUpdates>;
199
200 async fn fetch_price_updates(
201 &self,
202 feed_ids: &FeedIds,
203 after: Option<OffsetDateTime>,
204 ) -> crate::Result<Self::PriceUpdates> {
205 let feeds = filter_switchboard_feed_ids(feed_ids)?;
206
207 if feeds.is_empty() {
208 return Ok(vec![]);
209 }
210
211 let mut updates = Vec::new();
212
213 for feeds in feeds.chunks(self.batch_size) {
214 let mut num_signatures = 3;
215 let mut feed_configs = Vec::new();
216 let mut queue = Pubkey::default();
217
218 for feed in feeds {
219 tracing::trace!(%feed, "fetching feed data");
220 let data = *self
221 .ctx
222 .pull_feed_cache
223 .entry(*feed)
224 .or_insert_with(OnceCell::new)
225 .get_or_try_init(|| PullFeed::load_data(&self.client, feed))
226 .await
227 .map_err(|_| crate::Error::switchboard_error("fetching job data failed"))?;
228 tracing::trace!(%feed, ?data, "fechted feed data");
229 let jobs = data
230 .fetch_jobs(&self.crossbar.clone().unwrap_or_default())
231 .await
232 .map_err(|_| crate::Error::switchboard_error("fetching job data failed"))?;
233 tracing::trace!(%feed, ?jobs, "fetched jobs");
234 let encoded_jobs = encode_jobs(&jobs);
235 let max_variance = (data.max_variance / 1_000_000_000) as u32;
236 let min_responses = data.min_responses;
237 if min_responses >= num_signatures {
238 num_signatures = min_responses + 1;
239 }
240 let feed_config = FeedConfig {
241 encoded_jobs,
242 max_variance: Some(max_variance),
243 min_responses: Some(min_responses),
244 };
245 feed_configs.push(feed_config);
246 queue = data.queue;
247 }
248 let slothash = SlotHashSysvar::get_latest_slothash(&self.client)
249 .await
250 .map_err(|_| crate::Error::switchboard_error("fetching slot hash failed"))?;
251 let price_signatures = self
252 .gateway
253 .fetch_signatures_multi(FetchSignaturesMultiParams {
254 recent_hash: Some(slothash.to_base58_hash()),
255 num_signatures: Some(num_signatures),
256 feed_configs,
257 use_timestamp: Some(true),
258 })
259 .await
260 .map_err(|_| crate::Error::switchboard_error("fetching signatures failed"))?;
261 tracing::trace!("fetched price signatures: {price_signatures:#?}");
262
263 let mut all_submissions: Vec<MultiSubmission> = Vec::new();
264 let mut oracle_keys = Vec::new();
265 for resp in &price_signatures.oracle_responses {
266 all_submissions.push(MultiSubmission {
267 values: resp
268 .feed_responses
269 .iter()
270 .map(|x| {
271 if let Some(after) = after {
272 let Some(ts) = x.timestamp else {
273 return Err(crate::Error::switchboard_error(
274 "missing timestamp of the feed result",
275 ))?;
276 };
277 let ts = OffsetDateTime::from_unix_timestamp(ts)
278 .map_err(crate::Error::switchboard_error)?;
279 if ts < after {
280 return Err(crate::Error::switchboard_error(
281 "feed result is too old, ts={ts}, required={after}",
282 ));
283 }
284 }
285 Ok(x.success_value.parse().unwrap_or(i128::MAX))
286 })
287 .collect::<crate::Result<Vec<_>>>()?,
288 signature: BASE64_STANDARD
289 .decode(resp.signature.clone())
290 .map_err(|_| crate::Error::switchboard_error("base64:decode failure"))?
291 .try_into()
292 .map_err(|_| crate::Error::switchboard_error("signature:decode failure"))?,
293 recovery_id: resp.recovery_id as u8,
294 });
295 let oracle_key = hex::decode(
296 &resp
297 .feed_responses
298 .first()
299 .ok_or_else(|| crate::Error::switchboard_error("empty response"))?
300 .oracle_pubkey,
301 )
302 .map_err(|_| crate::Error::switchboard_error("hex:decode failure"))?
303 .try_into()
304 .map_err(|_| crate::Error::switchboard_error("pubkey:decode failure"))?;
305 let oracle_key = Pubkey::new_from_array(oracle_key);
306 oracle_keys.push(oracle_key);
307 }
308
309 updates.push(SbPriceUpdates {
310 feeds: feeds.to_vec(),
311 price_submissions: all_submissions,
312 slot: slothash.slot,
313 queue,
314 oracle_keys,
315 });
316 }
317
318 Ok(updates)
319 }
320}
321
322impl<'a, C: Clone + Deref<Target = impl Signer>> PostPullOraclePrices<'a, C>
323 for SwitchboardPullOracle<'a, C>
324{
325 async fn fetch_price_update_instructions(
326 &self,
327 price_updates: &Self::PriceUpdates,
328 options: BundleOptions,
329 ) -> crate::Result<(
330 PriceUpdateInstructions<'a, C>,
331 HashMap<PriceProviderKind, FeedAddressMap>,
332 )> {
333 let mut ixns = PriceUpdateInstructions::new(self.gmsol, options);
334 let mut prices = HashMap::default();
335 for update in price_updates {
336 let feeds = &update.feeds;
337 let price_signatures = &update.price_submissions;
338 let queue = update.queue;
339 let oracle_keys = &update.oracle_keys;
340
341 let queue_key = [queue];
342 let (oracle_luts_result, pull_feed_luts_result, queue_lut_result) = join!(
343 fetch_and_cache_luts::<OracleAccountData>(
344 &self.client,
345 self.ctx.clone(),
346 oracle_keys
347 ),
348 fetch_and_cache_luts::<PullFeedAccountData>(&self.client, self.ctx.clone(), feeds),
349 fetch_and_cache_luts::<QueueAccountData>(
350 &self.client,
351 self.ctx.clone(),
352 &queue_key
353 )
354 );
355
356 let oracle_luts = oracle_luts_result
357 .map_err(|_| crate::Error::switchboard_error("fetching oracle luts failed"))?;
358 let pull_feed_luts = pull_feed_luts_result
359 .map_err(|_| crate::Error::switchboard_error("fetching pull feed luts failed"))?;
360 let queue_lut = queue_lut_result
361 .map_err(|_| crate::Error::switchboard_error("fetching queue lut failed"))?;
362
363 let mut luts = oracle_luts;
364 luts.extend(pull_feed_luts);
365 luts.extend(queue_lut);
366
367 let payer = self.gmsol.payer();
368
369 prices.extend(feeds.iter().map(|feed| (*feed, *feed)));
370
371 let ix_data = PullFeedSubmitResponseManyParams {
372 slot: update.slot,
373 submissions: price_signatures.clone(),
374 };
375
376 let feeds = feeds.iter().map(|pubkey| AccountMeta::new(*pubkey, false));
377 let oracles_and_stats = oracle_keys.iter().flat_map(|oracle| {
378 let stats_key = OracleAccountData::stats_key(oracle);
379 [
380 AccountMeta::new_readonly(*oracle, false),
381 AccountMeta::new(stats_key, false),
382 ]
383 });
384 let ix = self
385 .gmsol
386 .store_transaction()
387 .program(self.switchboard)
388 .args(ix_data.data())
389 .accounts(
390 PullFeedSubmitResponseMany {
391 queue,
392 program_state: State::key(),
393 recent_slothashes: solana_sdk::sysvar::slot_hashes::ID,
394 payer,
395 system_program: system_program::ID,
396 reward_vault: get_associated_token_address(&queue, &NATIVE_MINT),
397 token_program: SPL_TOKEN_PROGRAM_ID,
398 token_mint: NATIVE_MINT,
399 }
400 .to_account_metas(None),
401 )
402 .accounts(feeds.chain(oracles_and_stats).collect())
403 .lookup_tables(
404 luts.clone()
405 .into_iter()
406 .map(|x| (x.key, x.addresses.clone())),
407 );
408 ixns.try_push_post(ix)?;
409 }
410
411 Ok((
412 ixns,
413 HashMap::from([(PriceProviderKind::Switchboard, prices)]),
414 ))
415 }
416}
417
418fn encode_jobs(job_array: &[OracleJob]) -> Vec<String> {
419 job_array
420 .iter()
421 .map(|job| BASE64_STANDARD.encode(job.encode_length_delimited_to_vec()))
422 .collect()
423}
424
425fn filter_switchboard_feed_ids(feed_ids: &FeedIds) -> crate::Result<Vec<Pubkey>> {
426 Feeds::new(feed_ids)
427 .filter_map(|res| {
428 res.map(|config| {
429 matches!(config.provider, PriceProviderKind::Switchboard).then_some(config.feed)
430 })
431 .transpose()
432 })
433 .collect()
434}