1use std::{
2 collections::{HashMap, HashSet},
3 fmt,
4};
5
6use eventsource_stream::Eventsource;
7use futures_util::{Stream, TryStreamExt};
8use gmsol_store::states::{
9 pyth::pyth_price_with_confidence_to_price, HasMarketMeta, PriceProviderKind, TokenMapAccess,
10};
11use reqwest::{Client, IntoUrl, Url};
12
13pub use pyth_sdk::Identifier;
14
15use crate::pyth::pubkey_to_identifier;
16
17pub const DEFAULT_HERMES_BASE: &str = "https://hermes.pyth.network";
19
20pub const PRICE_STREAM: &str = "/v2/updates/price/stream";
22
23pub const PRICE_LATEST: &str = "/v2/updates/price/latest";
25
26#[derive(Debug, Clone)]
28pub struct Hermes {
29 base: Url,
30 client: Client,
31}
32
33impl Hermes {
34 pub fn try_new(base: impl IntoUrl) -> crate::Result<Self> {
36 Ok(Self {
37 base: base.into_url()?,
38 client: Client::new(),
39 })
40 }
41
42 pub async fn price_updates(
44 &self,
45 feed_ids: impl IntoIterator<Item = &Identifier>,
46 encoding: Option<EncodingType>,
47 ) -> crate::Result<impl Stream<Item = crate::Result<PriceUpdate>> + 'static> {
48 let params = get_query(feed_ids, encoding);
49 let stream = self
50 .client
51 .get(self.base.join(PRICE_STREAM)?)
52 .query(¶ms)
53 .send()
54 .await?
55 .bytes_stream()
56 .eventsource()
57 .map_err(crate::Error::from)
58 .try_filter_map(|event| {
59 let update = deserialize_price_update_event(&event)
60 .inspect_err(
61 |err| tracing::warn!(%err, ?event, "deserialize price update error"),
62 )
63 .ok();
64 async { Ok(update) }
65 });
66 Ok(stream)
67 }
68
69 pub async fn latest_price_updates(
71 &self,
72 feed_ids: impl IntoIterator<Item = &Identifier>,
73 encoding: Option<EncodingType>,
74 ) -> crate::Result<PriceUpdate> {
75 let params = get_query(feed_ids, encoding);
76 let update = self
77 .client
78 .get(self.base.join(PRICE_LATEST)?)
79 .query(¶ms)
80 .send()
81 .await?
82 .json()
83 .await?;
84 Ok(update)
85 }
86
87 pub async fn unit_prices_for_market(
89 &self,
90 token_map: &impl TokenMapAccess,
91 market: &impl HasMarketMeta,
92 ) -> crate::Result<gmsol_model::price::Prices<u128>> {
93 let token_configs =
94 token_map
95 .token_configs_for_market(market)
96 .ok_or(crate::Error::invalid_argument(
97 "missing configs for the tokens of the market",
98 ))?;
99 let feeds = token_configs
100 .iter()
101 .map(|config| {
102 config
103 .get_feed(&PriceProviderKind::Pyth)
104 .map(|feed| pubkey_to_identifier(&feed))
105 })
106 .collect::<Result<Vec<_>, _>>()
107 .map_err(crate::Error::invalid_argument)?;
108 let update = self
109 .latest_price_updates(feeds.iter().collect::<HashSet<_>>(), None)
110 .await?;
111 let prices = update
112 .parsed
113 .iter()
114 .map(|price| {
115 Ok((
116 Identifier::from_hex(price.id()).map_err(crate::Error::unknown)?,
117 &price.price,
118 ))
119 })
120 .collect::<crate::Result<HashMap<Identifier, _>>>()?;
121 let [index_token_price, long_token_price, short_token_price] = feeds
122 .iter()
123 .enumerate()
124 .map(|(idx, feed)| {
125 let config = token_configs[idx];
126 let price = prices
127 .get(feed)
128 .ok_or(crate::Error::unknown(format!("missing price for {}", feed)))?;
129 let price = pyth_price_with_confidence_to_price(
130 price.price,
131 price.conf,
132 price.expo,
133 config,
134 )
135 .map_err(crate::Error::unknown)?;
136 Ok(gmsol_model::price::Price {
137 min: price.min.to_unit_price(),
138 max: price.max.to_unit_price(),
139 })
140 })
141 .collect::<crate::Result<Vec<_>>>()?
142 .try_into()
143 .expect("must success");
144 Ok(gmsol_model::price::Prices {
145 index_token_price,
146 long_token_price,
147 short_token_price,
148 })
149 }
150}
151
152impl Default for Hermes {
153 fn default() -> Self {
154 Self {
155 base: DEFAULT_HERMES_BASE.parse().unwrap(),
156 client: Default::default(),
157 }
158 }
159}
160
161fn deserialize_price_update_event(event: &eventsource_stream::Event) -> crate::Result<PriceUpdate> {
162 Ok(serde_json::from_str(&event.data)?)
163}
164
165#[derive(Debug, serde::Serialize, serde::Deserialize)]
167pub struct PriceUpdate {
168 pub(crate) binary: BinaryPriceUpdate,
169 #[serde(default)]
170 parsed: Vec<ParsedPriceUpdate>,
171}
172
173impl PriceUpdate {
174 pub fn parsed(&self) -> &[ParsedPriceUpdate] {
176 &self.parsed
177 }
178
179 pub fn min_timestamp(&self) -> Option<i64> {
181 self.parsed
182 .iter()
183 .map(|update| update.price.publish_time)
184 .min()
185 }
186
187 pub fn binary(&self) -> &BinaryPriceUpdate {
189 &self.binary
190 }
191}
192
193#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
194pub struct BinaryPriceUpdate {
195 pub(crate) encoding: EncodingType,
196 pub(crate) data: Vec<String>,
197}
198
199#[derive(Clone, Copy, Debug, Default, serde::Deserialize, serde::Serialize)]
200pub enum EncodingType {
201 #[default]
203 #[serde(rename = "hex")]
204 Hex,
205 #[serde(rename = "base64")]
207 Base64,
208}
209
210impl fmt::Display for EncodingType {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 match self {
213 Self::Hex => write!(f, "hex"),
214 Self::Base64 => write!(f, "base64"),
215 }
216 }
217}
218
219#[derive(Debug, serde::Serialize, serde::Deserialize)]
220pub struct ParsedPriceUpdate {
221 id: String,
222 price: Price,
223 ema_price: Price,
224 metadata: Metadata,
225}
226
227impl ParsedPriceUpdate {
228 pub fn id(&self) -> &str {
230 self.id.as_str()
231 }
232
233 pub fn price(&self) -> &Price {
235 &self.price
236 }
237
238 pub fn ema_price(&self) -> &Price {
240 &self.ema_price
241 }
242
243 pub fn metadata(&self) -> &Metadata {
245 &self.metadata
246 }
247}
248
249#[derive(Debug, serde::Serialize, serde::Deserialize)]
250pub struct Price {
251 #[serde(with = "pyth_sdk::utils::as_string")]
253 price: i64,
254 #[serde(with = "pyth_sdk::utils::as_string")]
256 conf: u64,
257 expo: i32,
259 publish_time: i64,
261}
262
263impl Price {
264 pub fn price(&self) -> i64 {
266 self.price
267 }
268
269 pub fn conf(&self) -> u64 {
271 self.conf
272 }
273
274 pub fn expo(&self) -> i32 {
276 self.expo
277 }
278
279 pub fn publish_time(&self) -> i64 {
281 self.publish_time
282 }
283}
284
285#[derive(Debug, serde::Serialize, serde::Deserialize)]
286pub struct Metadata {
287 slot: Option<u64>,
288 proof_available_time: Option<i64>,
289 prev_publish_time: Option<i64>,
290}
291
292impl Metadata {
293 pub fn slot(&self) -> Option<u64> {
295 self.slot
296 }
297
298 pub fn proof_available_time(&self) -> Option<i64> {
300 self.proof_available_time
301 }
302
303 pub fn prev_publish_time(&self) -> Option<i64> {
305 self.prev_publish_time
306 }
307}
308
309fn get_query<'a>(
310 feed_ids: impl IntoIterator<Item = &'a Identifier>,
311 encoding: Option<EncodingType>,
312) -> Vec<(&'static str, String)> {
313 let encoding = encoding.or(Some(EncodingType::Base64));
314 feed_ids
315 .into_iter()
316 .map(|id| ("ids[]", id.to_hex()))
317 .chain(encoding.map(|encoding| ("encoding", encoding.to_string())))
318 .collect()
319}