gmsol/pyth/pull_oracle/
hermes.rs

1use std::{
2    collections::{HashMap, HashSet},
3    fmt,
4};
5
6use eventsource_stream::Eventsource;
7use futures_util::{Stream, TryStreamExt};
8use gmsol_store::states::{
9    pyth::pyth_price_with_confidence_to_price, HasMarketMeta, PriceProviderKind, TokenMapAccess,
10};
11use reqwest::{Client, IntoUrl, Url};
12
13pub use pyth_sdk::Identifier;
14
15use crate::pyth::pubkey_to_identifier;
16
/// Base URL of the public Hermes service, used by [`Hermes::default`].
pub const DEFAULT_HERMES_BASE: &'static str = "https://hermes.pyth.network";

/// Path of the SSE endpoint that streams price updates.
///
/// The path is absolute (it starts with `/`).
pub const PRICE_STREAM: &'static str = "/v2/updates/price/stream";

/// Path of the endpoint that returns the latest price update.
///
/// The path is absolute (it starts with `/`).
pub const PRICE_LATEST: &'static str = "/v2/updates/price/latest";
25
26/// Hermes Client.
27#[derive(Debug, Clone)]
28pub struct Hermes {
29    base: Url,
30    client: Client,
31}
32
33impl Hermes {
34    /// Create a new hermes client with the given base URL.
35    pub fn try_new(base: impl IntoUrl) -> crate::Result<Self> {
36        Ok(Self {
37            base: base.into_url()?,
38            client: Client::new(),
39        })
40    }
41
42    /// Get a stream of price updates.
43    pub async fn price_updates(
44        &self,
45        feed_ids: impl IntoIterator<Item = &Identifier>,
46        encoding: Option<EncodingType>,
47    ) -> crate::Result<impl Stream<Item = crate::Result<PriceUpdate>> + 'static> {
48        let params = get_query(feed_ids, encoding);
49        let stream = self
50            .client
51            .get(self.base.join(PRICE_STREAM)?)
52            .query(&params)
53            .send()
54            .await?
55            .bytes_stream()
56            .eventsource()
57            .map_err(crate::Error::from)
58            .try_filter_map(|event| {
59                let update = deserialize_price_update_event(&event)
60                    .inspect_err(
61                        |err| tracing::warn!(%err, ?event, "deserialize price update error"),
62                    )
63                    .ok();
64                async { Ok(update) }
65            });
66        Ok(stream)
67    }
68
69    /// Get latest price updates.
70    pub async fn latest_price_updates(
71        &self,
72        feed_ids: impl IntoIterator<Item = &Identifier>,
73        encoding: Option<EncodingType>,
74    ) -> crate::Result<PriceUpdate> {
75        let params = get_query(feed_ids, encoding);
76        let update = self
77            .client
78            .get(self.base.join(PRICE_LATEST)?)
79            .query(&params)
80            .send()
81            .await?
82            .json()
83            .await?;
84        Ok(update)
85    }
86
87    /// Get unit prices for the given market.
88    pub async fn unit_prices_for_market(
89        &self,
90        token_map: &impl TokenMapAccess,
91        market: &impl HasMarketMeta,
92    ) -> crate::Result<gmsol_model::price::Prices<u128>> {
93        let token_configs =
94            token_map
95                .token_configs_for_market(market)
96                .ok_or(crate::Error::invalid_argument(
97                    "missing configs for the tokens of the market",
98                ))?;
99        let feeds = token_configs
100            .iter()
101            .map(|config| {
102                config
103                    .get_feed(&PriceProviderKind::Pyth)
104                    .map(|feed| pubkey_to_identifier(&feed))
105            })
106            .collect::<Result<Vec<_>, _>>()
107            .map_err(crate::Error::invalid_argument)?;
108        let update = self
109            .latest_price_updates(feeds.iter().collect::<HashSet<_>>(), None)
110            .await?;
111        let prices = update
112            .parsed
113            .iter()
114            .map(|price| {
115                Ok((
116                    Identifier::from_hex(price.id()).map_err(crate::Error::unknown)?,
117                    &price.price,
118                ))
119            })
120            .collect::<crate::Result<HashMap<Identifier, _>>>()?;
121        let [index_token_price, long_token_price, short_token_price] = feeds
122            .iter()
123            .enumerate()
124            .map(|(idx, feed)| {
125                let config = token_configs[idx];
126                let price = prices
127                    .get(feed)
128                    .ok_or(crate::Error::unknown(format!("missing price for {}", feed)))?;
129                let price = pyth_price_with_confidence_to_price(
130                    price.price,
131                    price.conf,
132                    price.expo,
133                    config,
134                )
135                .map_err(crate::Error::unknown)?;
136                Ok(gmsol_model::price::Price {
137                    min: price.min.to_unit_price(),
138                    max: price.max.to_unit_price(),
139                })
140            })
141            .collect::<crate::Result<Vec<_>>>()?
142            .try_into()
143            .expect("must success");
144        Ok(gmsol_model::price::Prices {
145            index_token_price,
146            long_token_price,
147            short_token_price,
148        })
149    }
150}
151
152impl Default for Hermes {
153    fn default() -> Self {
154        Self {
155            base: DEFAULT_HERMES_BASE.parse().unwrap(),
156            client: Default::default(),
157        }
158    }
159}
160
161fn deserialize_price_update_event(event: &eventsource_stream::Event) -> crate::Result<PriceUpdate> {
162    Ok(serde_json::from_str(&event.data)?)
163}
164
165/// Price Update.
166#[derive(Debug, serde::Serialize, serde::Deserialize)]
167pub struct PriceUpdate {
168    pub(crate) binary: BinaryPriceUpdate,
169    #[serde(default)]
170    parsed: Vec<ParsedPriceUpdate>,
171}
172
173impl PriceUpdate {
174    /// Get the parsed price udpate.
175    pub fn parsed(&self) -> &[ParsedPriceUpdate] {
176        &self.parsed
177    }
178
179    /// Min timestamp.
180    pub fn min_timestamp(&self) -> Option<i64> {
181        self.parsed
182            .iter()
183            .map(|update| update.price.publish_time)
184            .min()
185    }
186
187    /// Get the binary price update.
188    pub fn binary(&self) -> &BinaryPriceUpdate {
189        &self.binary
190    }
191}
192
193#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
194pub struct BinaryPriceUpdate {
195    pub(crate) encoding: EncodingType,
196    pub(crate) data: Vec<String>,
197}
198
199#[derive(Clone, Copy, Debug, Default, serde::Deserialize, serde::Serialize)]
200pub enum EncodingType {
201    /// Hex.
202    #[default]
203    #[serde(rename = "hex")]
204    Hex,
205    /// Base64.
206    #[serde(rename = "base64")]
207    Base64,
208}
209
210impl fmt::Display for EncodingType {
211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212        match self {
213            Self::Hex => write!(f, "hex"),
214            Self::Base64 => write!(f, "base64"),
215        }
216    }
217}
218
219#[derive(Debug, serde::Serialize, serde::Deserialize)]
220pub struct ParsedPriceUpdate {
221    id: String,
222    price: Price,
223    ema_price: Price,
224    metadata: Metadata,
225}
226
227impl ParsedPriceUpdate {
228    /// Get the feed id.
229    pub fn id(&self) -> &str {
230        self.id.as_str()
231    }
232
233    /// Get price.
234    pub fn price(&self) -> &Price {
235        &self.price
236    }
237
238    /// Get EMA Price.
239    pub fn ema_price(&self) -> &Price {
240        &self.ema_price
241    }
242
243    /// Get metadata.
244    pub fn metadata(&self) -> &Metadata {
245        &self.metadata
246    }
247}
248
249#[derive(Debug, serde::Serialize, serde::Deserialize)]
250pub struct Price {
251    /// Price.
252    #[serde(with = "pyth_sdk::utils::as_string")]
253    price: i64,
254    /// Confidence.
255    #[serde(with = "pyth_sdk::utils::as_string")]
256    conf: u64,
257    /// Exponent of the price.
258    expo: i32,
259    /// Publish unix timestamp (secs) of the price.
260    publish_time: i64,
261}
262
263impl Price {
264    /// Get (raw) price.
265    pub fn price(&self) -> i64 {
266        self.price
267    }
268
269    /// Get the confidence of the price.
270    pub fn conf(&self) -> u64 {
271        self.conf
272    }
273
274    /// Get the exponent of the price.
275    pub fn expo(&self) -> i32 {
276        self.expo
277    }
278
279    /// Get the publish time (unix timestamp in secs).
280    pub fn publish_time(&self) -> i64 {
281        self.publish_time
282    }
283}
284
285#[derive(Debug, serde::Serialize, serde::Deserialize)]
286pub struct Metadata {
287    slot: Option<u64>,
288    proof_available_time: Option<i64>,
289    prev_publish_time: Option<i64>,
290}
291
292impl Metadata {
293    /// Get slot.
294    pub fn slot(&self) -> Option<u64> {
295        self.slot
296    }
297
298    /// Get proof available time.
299    pub fn proof_available_time(&self) -> Option<i64> {
300        self.proof_available_time
301    }
302
303    /// Get previous publish time.
304    pub fn prev_publish_time(&self) -> Option<i64> {
305        self.prev_publish_time
306    }
307}
308
309fn get_query<'a>(
310    feed_ids: impl IntoIterator<Item = &'a Identifier>,
311    encoding: Option<EncodingType>,
312) -> Vec<(&'static str, String)> {
313    let encoding = encoding.or(Some(EncodingType::Base64));
314    feed_ids
315        .into_iter()
316        .map(|id| ("ids[]", id.to_hex()))
317        .chain(encoding.map(|encoding| ("encoding", encoding.to_string())))
318        .collect()
319}