gmsol/chainlink/pull_oracle/
client.rs1use std::{fmt, ops::Deref, sync::Arc};
2
3use futures_util::{Stream, StreamExt, TryStreamExt};
4use gmsol_chainlink_datastreams::report::{decode, decode_full_report, Report};
5use reqwest::{IntoUrl, Url};
6use reqwest_websocket::{Message, RequestBuilderExt};
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9
/// Name of the environment variable that [`Credential::from_default_envs`]
/// reads the user ID from.
pub const ENV_USER_ID: &str = "CHAINLINK_USER_ID";

/// Name of the environment variable that [`Credential::from_default_envs`]
/// reads the secret from.
pub const ENV_SECRET: &str = "CHAINLINK_SECRET";

/// Base URL used for REST requests by [`Client::with_credential`].
pub const DEFAULT_STREAMS_BASE: &str = "https://api.dataengine.chain.link";

/// Base URL used for REST requests by [`Client::with_testnet_credential`].
pub const TESTNET_STREAMS_BASE: &str = "https://api.testnet-dataengine.chain.link";

/// Base URL used for WebSocket subscriptions by [`Client::with_credential`].
pub const DEFAULT_WS_STREAMS_BASE: &str = "wss://ws.dataengine.chain.link";

/// Base URL used for WebSocket subscriptions by
/// [`Client::with_testnet_credential`].
pub const TESTNET_WS_STREAMS_BASE: &str = "wss://ws.testnet-dataengine.chain.link";
27
/// API endpoints known to the client.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Path {
    /// Latest report of a single feed.
    ReportsLatest,
    /// Reports of several feeds at one timestamp.
    ReportsBulk,
    /// List of feeds.
    Feeds,
    /// WebSocket endpoint.
    Websocket,
}

impl Path {
    /// Returns the absolute URI path of the endpoint.
    ///
    /// Every arm is a string literal, so the result is `'static` and does not
    /// keep `self` borrowed.
    fn to_uri(&self) -> &'static str {
        match self {
            Self::ReportsLatest => "/api/v1/reports/latest",
            Self::ReportsBulk => "/api/v1/reports/bulk",
            Self::Feeds => "/api/v1/feeds",
            Self::Websocket => "/api/v1/ws",
        }
    }
}
45
46#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
48pub struct Credential {
49 user_id: String,
50 secret: String,
51}
52
53impl Credential {
54 pub fn from_default_envs() -> crate::Result<Self> {
56 use std::env;
57
58 let user_id = env::var(ENV_USER_ID).map_err(crate::Error::invalid_argument)?;
59 let secret = env::var(ENV_SECRET).map_err(crate::Error::invalid_argument)?;
60
61 Ok(Self { user_id, secret })
62 }
63
64 fn generate_hmac(&self, timestamp: i128, request: &reqwest::Request) -> crate::Result<String> {
65 use hmac::{Hmac, Mac};
66
67 let body = request
68 .body()
69 .and_then(|body| body.as_bytes())
70 .unwrap_or_default();
71 let body_hash = hex::encode(Sha256::digest(body));
72
73 let url = request.url();
74 let uri = std::iter::once(url.path())
75 .chain(url.query())
76 .collect::<Vec<_>>()
77 .join("?");
78
79 let message = format!(
80 "{} {uri} {body_hash} {} {timestamp}",
81 request.method(),
82 self.user_id
83 );
84
85 let mut mac = Hmac::<Sha256>::new_from_slice(self.secret.as_bytes())
86 .map_err(crate::Error::invalid_argument)?;
87 mac.update(message.as_bytes());
88
89 let signature = hex::encode(mac.finalize().into_bytes());
90
91 Ok(signature)
92 }
93
94 fn sign(&self, request: &mut reqwest::Request) -> crate::Result<()> {
95 let timestamp_nanos = time::OffsetDateTime::now_utc().unix_timestamp_nanos();
96 let timestamp = timestamp_nanos / 1_000_000;
97
98 let signature = self.generate_hmac(timestamp, request)?;
99 let header = request.headers_mut();
100 header.insert(
101 "Authorization",
102 self.user_id
103 .parse()
104 .map_err(crate::Error::invalid_argument)?,
105 );
106 header.insert(
107 "X-Authorization-Timestamp",
108 timestamp
109 .to_string()
110 .parse()
111 .map_err(crate::Error::invalid_argument)?,
112 );
113 header.insert(
114 "X-Authorization-Signature-SHA256",
115 signature.parse().map_err(crate::Error::invalid_argument)?,
116 );
117 Ok(())
118 }
119}
120
121#[derive(Clone)]
123pub struct Client {
124 base: Url,
125 ws_base: Url,
126 client: reqwest::Client,
127 credential: Arc<Credential>,
128}
129
130impl fmt::Debug for Client {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 f.debug_struct("Client")
133 .field("base", &self.base)
134 .field("ws_base", &self.ws_base)
135 .field("client", &self.client)
136 .finish_non_exhaustive()
137 }
138}
139
140impl Client {
141 pub fn try_new(
143 base: impl IntoUrl,
144 ws_base: impl IntoUrl,
145 credential: Credential,
146 ) -> crate::Result<Self> {
147 Ok(Self {
148 base: base.into_url()?,
149 ws_base: ws_base.into_url()?,
150 client: reqwest::Client::new(),
151 credential: Arc::new(credential),
152 })
153 }
154
155 pub fn with_credential(credential: Credential) -> Self {
157 Self::try_new(DEFAULT_STREAMS_BASE, DEFAULT_WS_STREAMS_BASE, credential).unwrap()
158 }
159
160 pub fn with_testnet_credential(credential: Credential) -> Self {
162 Self::try_new(TESTNET_STREAMS_BASE, TESTNET_WS_STREAMS_BASE, credential).unwrap()
163 }
164
165 pub fn from_defaults() -> crate::Result<Self> {
167 Ok(Self::with_credential(Credential::from_default_envs()?))
168 }
169
170 pub fn from_testnet_defaults() -> crate::Result<Self> {
172 Ok(Self::with_testnet_credential(
173 Credential::from_default_envs()?,
174 ))
175 }
176
177 fn get_inner<T>(
178 &self,
179 path: Path,
180 query: &T,
181 sign: bool,
182 ws: bool,
183 ) -> crate::Result<reqwest::RequestBuilder>
184 where
185 T: Serialize,
186 {
187 let base = if ws { &self.ws_base } else { &self.base };
188 let url = base.join(path.to_uri())?;
189 let mut request = self.client.get(url).query(query).build()?;
190 if sign {
191 self.credential.sign(&mut request)?;
192 }
193 Ok(reqwest::RequestBuilder::from_parts(
194 self.client.clone(),
195 request,
196 ))
197 }
198
199 fn get<T>(&self, path: Path, query: &T, sign: bool) -> crate::Result<reqwest::RequestBuilder>
200 where
201 T: Serialize,
202 {
203 self.get_inner(path, query, sign, false)
204 }
205
206 pub async fn feeds(&self) -> crate::Result<Feeds> {
208 let feeds = self
209 .get::<Option<()>>(Path::Feeds, &None, true)?
210 .send()
211 .await?
212 .json()
213 .await?;
214 Ok(feeds)
215 }
216
217 pub async fn latest_report(&self, feed_id: &str) -> crate::Result<ApiReport> {
219 let report = self
220 .get(Path::ReportsLatest, &[("feedID", feed_id)], true)?
221 .send()
222 .await?
223 .json()
224 .await?;
225 Ok(report)
226 }
227
228 pub async fn bulk_report(
230 &self,
231 feed_ids: impl IntoIterator<Item = &str>,
232 ts: time::OffsetDateTime,
233 ) -> crate::Result<ApiReports> {
234 let feed_ids = feed_ids.into_iter().collect::<Vec<_>>().join(",");
235 let timestamp = ts.unix_timestamp();
236 let reports = self
237 .get(
238 Path::ReportsBulk,
239 &[("feedIDs", feed_ids), ("timestamp", timestamp.to_string())],
240 true,
241 )?
242 .send()
243 .await?
244 .json()
245 .await?;
246 Ok(reports)
247 }
248
249 pub async fn subscribe(
251 &self,
252 feed_ids: impl IntoIterator<Item = &str>,
253 ) -> crate::Result<impl Stream<Item = crate::Result<ApiReport>>> {
254 let feed_ids = feed_ids.into_iter().collect::<Vec<_>>().join(",");
255 let ws = self
256 .get_inner(Path::Websocket, &[("feedIDs", feed_ids)], true, true)?
257 .upgrade()
258 .send()
259 .await
260 .map_err(crate::Error::transport)?
261 .into_websocket()
262 .await
263 .map_err(crate::Error::transport)?;
264
265 let stream = ws
266 .map_err(crate::Error::transport)
267 .and_then(|message| async {
268 match message {
269 Message::Binary(data) => Ok(Some(data)),
270 Message::Close { code, reason } => Err(crate::Error::transport(format!(
271 "channel closed: code = {code}, reason = {reason}"
272 ))),
273 _ => Ok(None),
274 }
275 })
276 .filter_map(|message| async { message.transpose() })
277 .and_then(|data| async move {
278 let report = serde_json::from_slice(&data)?;
279 Ok(report)
280 });
281
282 Ok(stream)
283 }
284}
285
286#[derive(Debug, Clone, Serialize, Deserialize)]
288pub struct Feeds {
289 pub feeds: Vec<Feed>,
291}
292
293#[derive(Debug, Clone, Serialize, Deserialize)]
295pub struct Feed {
296 #[serde(rename = "feedID")]
298 pub feed_id: String,
299}
300
301#[derive(Debug, Clone, Serialize, Deserialize)]
303pub struct ApiReport {
304 report: ApiReportData,
305}
306
307impl ApiReport {
308 pub fn into_data(self) -> ApiReportData {
310 self.report
311 }
312}
313
314impl Deref for ApiReport {
315 type Target = ApiReportData;
316
317 fn deref(&self) -> &Self::Target {
318 &self.report
319 }
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct ApiReports {
325 reports: Vec<ApiReportData>,
326}
327
328impl ApiReports {
329 pub fn into_reports(self) -> Vec<ApiReportData> {
331 self.reports
332 }
333}
334
335impl Deref for ApiReports {
336 type Target = Vec<ApiReportData>;
337
338 fn deref(&self) -> &Self::Target {
339 &self.reports
340 }
341}
342
343#[derive(Debug, Clone, Serialize, Deserialize)]
345#[serde(rename_all = "camelCase")]
346pub struct ApiReportData {
347 #[serde(rename = "feedID")]
349 pub feed_id: String,
350 pub full_report: String,
352 pub observations_timestamp: i64,
354 pub valid_from_timestamp: i64,
356}
357
358impl ApiReportData {
359 pub fn decode(&self) -> crate::Result<Report> {
361 let report = self.report_bytes()?;
362 let (_, blob) = decode_full_report(&report).map_err(crate::Error::invalid_argument)?;
363 let report = decode(blob).map_err(crate::Error::invalid_argument)?;
364 Ok(report)
365 }
366
367 pub fn report_bytes(&self) -> crate::Result<Vec<u8>> {
369 hex::decode(
370 self.full_report
371 .strip_prefix("0x")
372 .unwrap_or(&self.full_report),
373 )
374 .map_err(crate::Error::invalid_argument)
375 }
376
377 pub fn decode_feed_id(&self) -> crate::Result<[u8; 32]> {
379 let mut data = [0; 32];
380 hex::decode_to_slice(
381 self.feed_id.strip_prefix("0x").unwrap_or(&self.feed_id),
382 &mut data,
383 )
384 .map_err(crate::Error::unknown)?;
385 Ok(data)
386 }
387}
388
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the signature of a fixed request against a known value.
    #[test]
    fn test_generate_hmac() {
        let url = format!("{DEFAULT_STREAMS_BASE}{}", Path::ReportsBulk.to_uri());
        let request = reqwest::Client::new()
            .post(url)
            .body(r#"{"attr1": "value1","attr2": [1,2,3]}"#)
            .build()
            .unwrap();

        let credential = Credential {
            user_id: "clientId2".to_string(),
            secret: "secret2".to_string(),
        };

        let signature = credential.generate_hmac(1718885772, &request).unwrap();
        assert_eq!(
            signature,
            "37190febe20b6f3662f6abbfa3a7085ad705ac64e88bde8c1a01a635859e6cf7"
        );
    }
}