gmsol/chainlink/pull_oracle/
pull_oracle_impl.rs

1use std::{
2    collections::HashMap,
3    ops::Deref,
4    sync::{Arc, RwLock},
5};
6
7use anchor_client::solana_sdk::{pubkey::Pubkey, signer::Signer};
8use gmsol_solana_utils::bundle_builder::{BundleBuilder, BundleOptions};
9use gmsol_store::states::PriceProviderKind;
10use time::OffsetDateTime;
11
12use crate::{
13    store::{oracle::OracleOps, utils::Feeds},
14    utils::builder::{
15        FeedAddressMap, FeedIds, PostPullOraclePrices, PriceUpdateInstructions, PullOracle,
16    },
17};
18
19use super::{client::ApiReportData, Client, FeedId};
20
21/// Chainlink Pull Oracle Factory.
22pub struct ChainlinkPullOracleFactory {
23    chainlink_program: Pubkey,
24    access_controller: Pubkey,
25    store: Pubkey,
26    feed_index: u16,
27    feeds: RwLock<FeedAddressMap>,
28}
29
30impl ChainlinkPullOracleFactory {
31    /// Create a new [`ChainlinkPullOracleFactory`] with default program ID and access controller address.
32    pub fn new(store: &Pubkey, feed_index: u16) -> Self {
33        use gmsol_chainlink_datastreams::verifier;
34
35        Self::with_program_id_and_access_controller(
36            store,
37            feed_index,
38            &verifier::ID,
39            &super::access_controller_address::ID,
40        )
41    }
42
43    /// Wrap in an [`Arc`].
44    pub fn arced(self) -> Arc<Self> {
45        Arc::new(self)
46    }
47
48    /// Create a new [`ChainlinkPullOracleFactory`] with the given program ID and access controller address.
49    pub fn with_program_id_and_access_controller(
50        store: &Pubkey,
51        feed_index: u16,
52        chainlink_program: &Pubkey,
53        access_controller: &Pubkey,
54    ) -> Self {
55        Self {
56            chainlink_program: *chainlink_program,
57            access_controller: *access_controller,
58            store: *store,
59            feed_index,
60            feeds: Default::default(),
61        }
62    }
63
64    /// Prepare feed accounts but do not send.
65    pub async fn prepare_feeds_bundle<'a, C: Deref<Target = impl Signer> + Clone>(
66        &self,
67        gmsol: &'a crate::Client<C>,
68        feed_ids: HashMap<Pubkey, FeedId>,
69        options: BundleOptions,
70    ) -> crate::Result<BundleBuilder<'a, C>> {
71        let provider = PriceProviderKind::ChainlinkDataStreams;
72        let mut txs = gmsol.bundle_with_options(options);
73        let authority = gmsol.payer();
74        for (token, feed_id) in feed_ids {
75            let address = gmsol.find_price_feed_address(
76                &self.store,
77                &authority,
78                self.feed_index,
79                provider,
80                &token,
81            );
82            let feed_id = Pubkey::new_from_array(feed_id);
83            match gmsol.price_feed(&address).await? {
84                Some(feed) => {
85                    if *feed.feed_id() != feed_id {
86                        return Err(crate::Error::invalid_argument("feed_id mismatched"));
87                    }
88                }
89                None => {
90                    txs.push(
91                        gmsol
92                            .initialize_price_feed(
93                                &self.store,
94                                self.feed_index,
95                                provider,
96                                &token,
97                                &feed_id,
98                            )
99                            .0,
100                    )?;
101                }
102            }
103            self.feeds.write().unwrap().insert(feed_id, address);
104        }
105
106        let feeds = self
107            .feeds
108            .read()
109            .unwrap()
110            .values()
111            .copied()
112            .collect::<Vec<_>>();
113
114        tracing::info!("Using custom feeds: {feeds:#?}");
115
116        Ok(txs)
117    }
118
119    /// Prepare feed accounts for the given tokens and feed_ids.
120    pub async fn prepare_feeds<C: Deref<Target = impl Signer> + Clone>(
121        &self,
122        gmsol: &crate::Client<C>,
123        feed_ids: HashMap<Pubkey, FeedId>,
124    ) -> crate::Result<()> {
125        let txs = self
126            .prepare_feeds_bundle(gmsol, feed_ids, Default::default())
127            .await?;
128
129        if !txs.is_empty() {
130            match txs.send_all(false).await {
131                Ok(signatures) => {
132                    tracing::info!("initialized feeds with txs: {signatures:#?}");
133                }
134                Err((signatures, err)) => {
135                    tracing::error!(%err, "failed to initailize feeds, successful txs: {signatures:#?}");
136                }
137            }
138        }
139
140        Ok(())
141    }
142
143    /// Create [`ChainlinkPullOracle`].
144    pub fn make_oracle<'a, C>(
145        self: Arc<Self>,
146        chainlink: &'a Client,
147        gmsol: &'a crate::Client<C>,
148        skip_feeds_preparation: bool,
149    ) -> ChainlinkPullOracle<'a, C> {
150        ChainlinkPullOracle::new(chainlink, gmsol, self, skip_feeds_preparation)
151    }
152}
153
154/// Chainlink Pull Oracle.
155pub struct ChainlinkPullOracle<'a, C> {
156    chainlink: &'a Client,
157    gmsol: &'a crate::Client<C>,
158    ctx: Arc<ChainlinkPullOracleFactory>,
159    skip_feeds_preparation: bool,
160}
161
162impl<C> Clone for ChainlinkPullOracle<'_, C> {
163    fn clone(&self) -> Self {
164        Self {
165            ctx: self.ctx.clone(),
166            ..*self
167        }
168    }
169}
170
171impl<'a, C> ChainlinkPullOracle<'a, C> {
172    /// Create a new [`ChainlinkPullOracle`] with default program ID and access controller address.
173    pub fn new(
174        chainlink: &'a Client,
175        gmsol: &'a crate::Client<C>,
176        ctx: Arc<ChainlinkPullOracleFactory>,
177        skip_feeds_preparation: bool,
178    ) -> Self {
179        Self {
180            chainlink,
181            gmsol,
182            ctx,
183            skip_feeds_preparation,
184        }
185    }
186}
187
188impl<C: Deref<Target = impl Signer> + Clone> ChainlinkPullOracle<'_, C> {
189    /// Prepare feed accounts but do not send.
190    pub async fn prepare_feeds_bundle(
191        &self,
192        feed_ids: &FeedIds,
193        options: BundleOptions,
194    ) -> crate::Result<BundleBuilder<C>> {
195        self.ctx
196            .prepare_feeds_bundle(self.gmsol, filter_feed_ids(feed_ids)?, options)
197            .await
198    }
199}
200
201impl<C: Deref<Target = impl Signer> + Clone> PullOracle for ChainlinkPullOracle<'_, C> {
202    type PriceUpdates = HashMap<FeedId, ApiReportData>;
203
204    async fn fetch_price_updates(
205        &self,
206        feed_ids: &FeedIds,
207        after: Option<OffsetDateTime>,
208    ) -> crate::Result<Self::PriceUpdates> {
209        let feeds = filter_feed_ids(feed_ids)?;
210
211        let feed_ids = feeds.values().map(hex::encode).collect::<Vec<_>>();
212
213        if !self.skip_feeds_preparation {
214            self.ctx.prepare_feeds(self.gmsol, feeds).await?;
215        }
216
217        let tasks = feed_ids
218            .iter()
219            .map(|feed_id| self.chainlink.latest_report(feed_id));
220        let price_updates = futures_util::future::try_join_all(tasks).await?;
221
222        let updates = price_updates
223            .into_iter()
224            .map(|report| {
225                let feed_id = report.decode_feed_id()?;
226                let ts = report.observations_timestamp;
227
228                if let Some(after) = after {
229                    let ts = OffsetDateTime::from_unix_timestamp(ts)
230                        .map_err(crate::Error::invalid_argument)?;
231                    if after > ts {
232                        return Err(crate::Error::invalid_argument(format!(
233                            "price updates are too old, ts={ts}, required={after}"
234                        )));
235                    }
236                }
237
238                Ok((feed_id, report.into_data()))
239            })
240            .collect::<crate::Result<HashMap<_, _>>>()?;
241
242        Ok(updates)
243    }
244}
245
246impl<'a, C: Deref<Target = impl Signer> + Clone> PostPullOraclePrices<'a, C>
247    for ChainlinkPullOracle<'a, C>
248{
249    async fn fetch_price_update_instructions(
250        &self,
251        price_updates: &Self::PriceUpdates,
252        options: BundleOptions,
253    ) -> crate::Result<(
254        PriceUpdateInstructions<'a, C>,
255        HashMap<PriceProviderKind, FeedAddressMap>,
256    )> {
257        let mut txs = PriceUpdateInstructions::new(self.gmsol, options);
258        let mut map = HashMap::with_capacity(price_updates.len());
259
260        let feeds = self.ctx.feeds.read().unwrap();
261        for (feed_id, update) in price_updates {
262            let feed_id = Pubkey::new_from_array(*feed_id);
263            tracing::info!("adding ix to post price update for {feed_id}");
264            let feed = feeds.get(&feed_id).ok_or_else(|| {
265                crate::Error::invalid_argument(format!(
266                    "feed account for the given `feed_id` is not provided, feed_id = {feed_id}"
267                ))
268            })?;
269            let rpc = self.gmsol.update_price_feed_with_chainlink(
270                &self.ctx.store,
271                feed,
272                &self.ctx.chainlink_program,
273                &self.ctx.access_controller,
274                &update.report_bytes()?,
275            )?;
276            txs.try_push_post(rpc)?;
277            map.insert(feed_id, *feed);
278        }
279
280        Ok((
281            txs,
282            HashMap::from([(PriceProviderKind::ChainlinkDataStreams, map)]),
283        ))
284    }
285}
286
287/// Filter feed ids.
288pub fn filter_feed_ids(feed_ids: &FeedIds) -> crate::Result<HashMap<Pubkey, FeedId>> {
289    Feeds::new(feed_ids)
290        .filter_map(|res| {
291            res.map(|config| {
292                matches!(config.provider, PriceProviderKind::ChainlinkDataStreams)
293                    .then(|| (config.token, config.feed.to_bytes()))
294            })
295            .transpose()
296        })
297        .collect::<crate::Result<HashMap<_, _>>>()
298}