gmsol/chainlink/pull_oracle/
pull_oracle_impl.rs1use std::{
2 collections::HashMap,
3 ops::Deref,
4 sync::{Arc, RwLock},
5};
6
7use anchor_client::solana_sdk::{pubkey::Pubkey, signer::Signer};
8use gmsol_solana_utils::bundle_builder::{BundleBuilder, BundleOptions};
9use gmsol_store::states::PriceProviderKind;
10use time::OffsetDateTime;
11
12use crate::{
13 store::{oracle::OracleOps, utils::Feeds},
14 utils::builder::{
15 FeedAddressMap, FeedIds, PostPullOraclePrices, PriceUpdateInstructions, PullOracle,
16 },
17};
18
19use super::{client::ApiReportData, Client, FeedId};
20
21pub struct ChainlinkPullOracleFactory {
23 chainlink_program: Pubkey,
24 access_controller: Pubkey,
25 store: Pubkey,
26 feed_index: u16,
27 feeds: RwLock<FeedAddressMap>,
28}
29
30impl ChainlinkPullOracleFactory {
31 pub fn new(store: &Pubkey, feed_index: u16) -> Self {
33 use gmsol_chainlink_datastreams::verifier;
34
35 Self::with_program_id_and_access_controller(
36 store,
37 feed_index,
38 &verifier::ID,
39 &super::access_controller_address::ID,
40 )
41 }
42
43 pub fn arced(self) -> Arc<Self> {
45 Arc::new(self)
46 }
47
48 pub fn with_program_id_and_access_controller(
50 store: &Pubkey,
51 feed_index: u16,
52 chainlink_program: &Pubkey,
53 access_controller: &Pubkey,
54 ) -> Self {
55 Self {
56 chainlink_program: *chainlink_program,
57 access_controller: *access_controller,
58 store: *store,
59 feed_index,
60 feeds: Default::default(),
61 }
62 }
63
64 pub async fn prepare_feeds_bundle<'a, C: Deref<Target = impl Signer> + Clone>(
66 &self,
67 gmsol: &'a crate::Client<C>,
68 feed_ids: HashMap<Pubkey, FeedId>,
69 options: BundleOptions,
70 ) -> crate::Result<BundleBuilder<'a, C>> {
71 let provider = PriceProviderKind::ChainlinkDataStreams;
72 let mut txs = gmsol.bundle_with_options(options);
73 let authority = gmsol.payer();
74 for (token, feed_id) in feed_ids {
75 let address = gmsol.find_price_feed_address(
76 &self.store,
77 &authority,
78 self.feed_index,
79 provider,
80 &token,
81 );
82 let feed_id = Pubkey::new_from_array(feed_id);
83 match gmsol.price_feed(&address).await? {
84 Some(feed) => {
85 if *feed.feed_id() != feed_id {
86 return Err(crate::Error::invalid_argument("feed_id mismatched"));
87 }
88 }
89 None => {
90 txs.push(
91 gmsol
92 .initialize_price_feed(
93 &self.store,
94 self.feed_index,
95 provider,
96 &token,
97 &feed_id,
98 )
99 .0,
100 )?;
101 }
102 }
103 self.feeds.write().unwrap().insert(feed_id, address);
104 }
105
106 let feeds = self
107 .feeds
108 .read()
109 .unwrap()
110 .values()
111 .copied()
112 .collect::<Vec<_>>();
113
114 tracing::info!("Using custom feeds: {feeds:#?}");
115
116 Ok(txs)
117 }
118
119 pub async fn prepare_feeds<C: Deref<Target = impl Signer> + Clone>(
121 &self,
122 gmsol: &crate::Client<C>,
123 feed_ids: HashMap<Pubkey, FeedId>,
124 ) -> crate::Result<()> {
125 let txs = self
126 .prepare_feeds_bundle(gmsol, feed_ids, Default::default())
127 .await?;
128
129 if !txs.is_empty() {
130 match txs.send_all(false).await {
131 Ok(signatures) => {
132 tracing::info!("initialized feeds with txs: {signatures:#?}");
133 }
134 Err((signatures, err)) => {
135 tracing::error!(%err, "failed to initailize feeds, successful txs: {signatures:#?}");
136 }
137 }
138 }
139
140 Ok(())
141 }
142
143 pub fn make_oracle<'a, C>(
145 self: Arc<Self>,
146 chainlink: &'a Client,
147 gmsol: &'a crate::Client<C>,
148 skip_feeds_preparation: bool,
149 ) -> ChainlinkPullOracle<'a, C> {
150 ChainlinkPullOracle::new(chainlink, gmsol, self, skip_feeds_preparation)
151 }
152}
153
154pub struct ChainlinkPullOracle<'a, C> {
156 chainlink: &'a Client,
157 gmsol: &'a crate::Client<C>,
158 ctx: Arc<ChainlinkPullOracleFactory>,
159 skip_feeds_preparation: bool,
160}
161
162impl<C> Clone for ChainlinkPullOracle<'_, C> {
163 fn clone(&self) -> Self {
164 Self {
165 ctx: self.ctx.clone(),
166 ..*self
167 }
168 }
169}
170
171impl<'a, C> ChainlinkPullOracle<'a, C> {
172 pub fn new(
174 chainlink: &'a Client,
175 gmsol: &'a crate::Client<C>,
176 ctx: Arc<ChainlinkPullOracleFactory>,
177 skip_feeds_preparation: bool,
178 ) -> Self {
179 Self {
180 chainlink,
181 gmsol,
182 ctx,
183 skip_feeds_preparation,
184 }
185 }
186}
187
188impl<C: Deref<Target = impl Signer> + Clone> ChainlinkPullOracle<'_, C> {
189 pub async fn prepare_feeds_bundle(
191 &self,
192 feed_ids: &FeedIds,
193 options: BundleOptions,
194 ) -> crate::Result<BundleBuilder<C>> {
195 self.ctx
196 .prepare_feeds_bundle(self.gmsol, filter_feed_ids(feed_ids)?, options)
197 .await
198 }
199}
200
201impl<C: Deref<Target = impl Signer> + Clone> PullOracle for ChainlinkPullOracle<'_, C> {
202 type PriceUpdates = HashMap<FeedId, ApiReportData>;
203
204 async fn fetch_price_updates(
205 &self,
206 feed_ids: &FeedIds,
207 after: Option<OffsetDateTime>,
208 ) -> crate::Result<Self::PriceUpdates> {
209 let feeds = filter_feed_ids(feed_ids)?;
210
211 let feed_ids = feeds.values().map(hex::encode).collect::<Vec<_>>();
212
213 if !self.skip_feeds_preparation {
214 self.ctx.prepare_feeds(self.gmsol, feeds).await?;
215 }
216
217 let tasks = feed_ids
218 .iter()
219 .map(|feed_id| self.chainlink.latest_report(feed_id));
220 let price_updates = futures_util::future::try_join_all(tasks).await?;
221
222 let updates = price_updates
223 .into_iter()
224 .map(|report| {
225 let feed_id = report.decode_feed_id()?;
226 let ts = report.observations_timestamp;
227
228 if let Some(after) = after {
229 let ts = OffsetDateTime::from_unix_timestamp(ts)
230 .map_err(crate::Error::invalid_argument)?;
231 if after > ts {
232 return Err(crate::Error::invalid_argument(format!(
233 "price updates are too old, ts={ts}, required={after}"
234 )));
235 }
236 }
237
238 Ok((feed_id, report.into_data()))
239 })
240 .collect::<crate::Result<HashMap<_, _>>>()?;
241
242 Ok(updates)
243 }
244}
245
246impl<'a, C: Deref<Target = impl Signer> + Clone> PostPullOraclePrices<'a, C>
247 for ChainlinkPullOracle<'a, C>
248{
249 async fn fetch_price_update_instructions(
250 &self,
251 price_updates: &Self::PriceUpdates,
252 options: BundleOptions,
253 ) -> crate::Result<(
254 PriceUpdateInstructions<'a, C>,
255 HashMap<PriceProviderKind, FeedAddressMap>,
256 )> {
257 let mut txs = PriceUpdateInstructions::new(self.gmsol, options);
258 let mut map = HashMap::with_capacity(price_updates.len());
259
260 let feeds = self.ctx.feeds.read().unwrap();
261 for (feed_id, update) in price_updates {
262 let feed_id = Pubkey::new_from_array(*feed_id);
263 tracing::info!("adding ix to post price update for {feed_id}");
264 let feed = feeds.get(&feed_id).ok_or_else(|| {
265 crate::Error::invalid_argument(format!(
266 "feed account for the given `feed_id` is not provided, feed_id = {feed_id}"
267 ))
268 })?;
269 let rpc = self.gmsol.update_price_feed_with_chainlink(
270 &self.ctx.store,
271 feed,
272 &self.ctx.chainlink_program,
273 &self.ctx.access_controller,
274 &update.report_bytes()?,
275 )?;
276 txs.try_push_post(rpc)?;
277 map.insert(feed_id, *feed);
278 }
279
280 Ok((
281 txs,
282 HashMap::from([(PriceProviderKind::ChainlinkDataStreams, map)]),
283 ))
284 }
285}
286
287pub fn filter_feed_ids(feed_ids: &FeedIds) -> crate::Result<HashMap<Pubkey, FeedId>> {
289 Feeds::new(feed_ids)
290 .filter_map(|res| {
291 res.map(|config| {
292 matches!(config.provider, PriceProviderKind::ChainlinkDataStreams)
293 .then(|| (config.token, config.feed.to_bytes()))
294 })
295 .transpose()
296 })
297 .collect::<crate::Result<HashMap<_, _>>>()
298}