gmsol/chainlink/pull_oracle/
client.rs

1use std::{fmt, ops::Deref, sync::Arc};
2
3use futures_util::{Stream, StreamExt, TryStreamExt};
4use gmsol_chainlink_datastreams::report::{decode, decode_full_report, Report};
5use reqwest::{IntoUrl, Url};
6use reqwest_websocket::{Message, RequestBuilderExt};
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9
10/// ENV for User ID.
11pub const ENV_USER_ID: &str = "CHAINLINK_USER_ID";
12
13/// ENV for Secret.
14pub const ENV_SECRET: &str = "CHAINLINK_SECRET";
15
16/// Default base URL for Chainlink Streams.
17pub const DEFAULT_STREAMS_BASE: &str = "https://api.dataengine.chain.link";
18
19/// Testnet base URL for Chainlink Streams.
20pub const TESTNET_STREAMS_BASE: &str = "https://api.testnet-dataengine.chain.link";
21
/// Default WebSocket base URL for Chainlink Streams.
23pub const DEFAULT_WS_STREAMS_BASE: &str = "wss://ws.dataengine.chain.link";
24
/// Testnet WebSocket base URL for Chainlink Streams.
26pub const TESTNET_WS_STREAMS_BASE: &str = "wss://ws.testnet-dataengine.chain.link";
27
/// Endpoints of the Chainlink Data Streams API used by this client.
#[derive(Debug)]
enum Path {
    /// Endpoint serving the latest report.
    ReportsLatest,
    /// Endpoint serving a bulk of reports.
    ReportsBulk,
    /// Endpoint listing the feeds.
    Feeds,
    /// WebSocket endpoint.
    Websocket,
}

impl Path {
    /// Returns the absolute URI path of this endpoint.
    ///
    /// Every path is a string literal, so the result is `'static` and can
    /// outlive the [`Path`] value it was obtained from.
    fn to_uri(&self) -> &'static str {
        match self {
            Self::ReportsLatest => "/api/v1/reports/latest",
            Self::ReportsBulk => "/api/v1/reports/bulk",
            Self::Feeds => "/api/v1/feeds",
            Self::Websocket => "/api/v1/ws",
        }
    }
}
45
46/// Credential.
47#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
48pub struct Credential {
49    user_id: String,
50    secret: String,
51}
52
53impl Credential {
54    /// Create from Default ENVs.
55    pub fn from_default_envs() -> crate::Result<Self> {
56        use std::env;
57
58        let user_id = env::var(ENV_USER_ID).map_err(crate::Error::invalid_argument)?;
59        let secret = env::var(ENV_SECRET).map_err(crate::Error::invalid_argument)?;
60
61        Ok(Self { user_id, secret })
62    }
63
64    fn generate_hmac(&self, timestamp: i128, request: &reqwest::Request) -> crate::Result<String> {
65        use hmac::{Hmac, Mac};
66
67        let body = request
68            .body()
69            .and_then(|body| body.as_bytes())
70            .unwrap_or_default();
71        let body_hash = hex::encode(Sha256::digest(body));
72
73        let url = request.url();
74        let uri = std::iter::once(url.path())
75            .chain(url.query())
76            .collect::<Vec<_>>()
77            .join("?");
78
79        let message = format!(
80            "{} {uri} {body_hash} {} {timestamp}",
81            request.method(),
82            self.user_id
83        );
84
85        let mut mac = Hmac::<Sha256>::new_from_slice(self.secret.as_bytes())
86            .map_err(crate::Error::invalid_argument)?;
87        mac.update(message.as_bytes());
88
89        let signature = hex::encode(mac.finalize().into_bytes());
90
91        Ok(signature)
92    }
93
94    fn sign(&self, request: &mut reqwest::Request) -> crate::Result<()> {
95        let timestamp_nanos = time::OffsetDateTime::now_utc().unix_timestamp_nanos();
96        let timestamp = timestamp_nanos / 1_000_000;
97
98        let signature = self.generate_hmac(timestamp, request)?;
99        let header = request.headers_mut();
100        header.insert(
101            "Authorization",
102            self.user_id
103                .parse()
104                .map_err(crate::Error::invalid_argument)?,
105        );
106        header.insert(
107            "X-Authorization-Timestamp",
108            timestamp
109                .to_string()
110                .parse()
111                .map_err(crate::Error::invalid_argument)?,
112        );
113        header.insert(
114            "X-Authorization-Signature-SHA256",
115            signature.parse().map_err(crate::Error::invalid_argument)?,
116        );
117        Ok(())
118    }
119}
120
121/// Chainlink Data Streams Client.
122#[derive(Clone)]
123pub struct Client {
124    base: Url,
125    ws_base: Url,
126    client: reqwest::Client,
127    credential: Arc<Credential>,
128}
129
130impl fmt::Debug for Client {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        f.debug_struct("Client")
133            .field("base", &self.base)
134            .field("ws_base", &self.ws_base)
135            .field("client", &self.client)
136            .finish_non_exhaustive()
137    }
138}
139
140impl Client {
141    /// Create a new [`Client`] with the given base URL.
142    pub fn try_new(
143        base: impl IntoUrl,
144        ws_base: impl IntoUrl,
145        credential: Credential,
146    ) -> crate::Result<Self> {
147        Ok(Self {
148            base: base.into_url()?,
149            ws_base: ws_base.into_url()?,
150            client: reqwest::Client::new(),
151            credential: Arc::new(credential),
152        })
153    }
154
155    /// Create a new mainnet [`Client`] with the given credential.
156    pub fn with_credential(credential: Credential) -> Self {
157        Self::try_new(DEFAULT_STREAMS_BASE, DEFAULT_WS_STREAMS_BASE, credential).unwrap()
158    }
159
160    /// Create a new testnet [`Client`] with the given credential.
161    pub fn with_testnet_credential(credential: Credential) -> Self {
162        Self::try_new(TESTNET_STREAMS_BASE, TESTNET_WS_STREAMS_BASE, credential).unwrap()
163    }
164
165    /// Create a new [`Client`] with default base url and default ENVs.
166    pub fn from_defaults() -> crate::Result<Self> {
167        Ok(Self::with_credential(Credential::from_default_envs()?))
168    }
169
170    /// Create a new [`Client`] with testnest base url and default ENVs.
171    pub fn from_testnet_defaults() -> crate::Result<Self> {
172        Ok(Self::with_testnet_credential(
173            Credential::from_default_envs()?,
174        ))
175    }
176
177    fn get_inner<T>(
178        &self,
179        path: Path,
180        query: &T,
181        sign: bool,
182        ws: bool,
183    ) -> crate::Result<reqwest::RequestBuilder>
184    where
185        T: Serialize,
186    {
187        let base = if ws { &self.ws_base } else { &self.base };
188        let url = base.join(path.to_uri())?;
189        let mut request = self.client.get(url).query(query).build()?;
190        if sign {
191            self.credential.sign(&mut request)?;
192        }
193        Ok(reqwest::RequestBuilder::from_parts(
194            self.client.clone(),
195            request,
196        ))
197    }
198
199    fn get<T>(&self, path: Path, query: &T, sign: bool) -> crate::Result<reqwest::RequestBuilder>
200    where
201        T: Serialize,
202    {
203        self.get_inner(path, query, sign, false)
204    }
205
206    /// Get available feeds.
207    pub async fn feeds(&self) -> crate::Result<Feeds> {
208        let feeds = self
209            .get::<Option<()>>(Path::Feeds, &None, true)?
210            .send()
211            .await?
212            .json()
213            .await?;
214        Ok(feeds)
215    }
216
217    /// Get latest report of the given hex-encoded feed ID.
218    pub async fn latest_report(&self, feed_id: &str) -> crate::Result<ApiReport> {
219        let report = self
220            .get(Path::ReportsLatest, &[("feedID", feed_id)], true)?
221            .send()
222            .await?
223            .json()
224            .await?;
225        Ok(report)
226    }
227
228    /// Get bulk of reports with the given feed IDs and timestamp.
229    pub async fn bulk_report(
230        &self,
231        feed_ids: impl IntoIterator<Item = &str>,
232        ts: time::OffsetDateTime,
233    ) -> crate::Result<ApiReports> {
234        let feed_ids = feed_ids.into_iter().collect::<Vec<_>>().join(",");
235        let timestamp = ts.unix_timestamp();
236        let reports = self
237            .get(
238                Path::ReportsBulk,
239                &[("feedIDs", feed_ids), ("timestamp", timestamp.to_string())],
240                true,
241            )?
242            .send()
243            .await?
244            .json()
245            .await?;
246        Ok(reports)
247    }
248
249    /// Subscribe to report updates using websocket.
250    pub async fn subscribe(
251        &self,
252        feed_ids: impl IntoIterator<Item = &str>,
253    ) -> crate::Result<impl Stream<Item = crate::Result<ApiReport>>> {
254        let feed_ids = feed_ids.into_iter().collect::<Vec<_>>().join(",");
255        let ws = self
256            .get_inner(Path::Websocket, &[("feedIDs", feed_ids)], true, true)?
257            .upgrade()
258            .send()
259            .await
260            .map_err(crate::Error::transport)?
261            .into_websocket()
262            .await
263            .map_err(crate::Error::transport)?;
264
265        let stream = ws
266            .map_err(crate::Error::transport)
267            .and_then(|message| async {
268                match message {
269                    Message::Binary(data) => Ok(Some(data)),
270                    Message::Close { code, reason } => Err(crate::Error::transport(format!(
271                        "channel closed: code = {code}, reason = {reason}"
272                    ))),
273                    _ => Ok(None),
274                }
275            })
276            .filter_map(|message| async { message.transpose() })
277            .and_then(|data| async move {
278                let report = serde_json::from_slice(&data)?;
279                Ok(report)
280            });
281
282        Ok(stream)
283    }
284}
285
286/// Feeds.
287#[derive(Debug, Clone, Serialize, Deserialize)]
288pub struct Feeds {
289    /// Feeds.
290    pub feeds: Vec<Feed>,
291}
292
293/// Feed.
294#[derive(Debug, Clone, Serialize, Deserialize)]
295pub struct Feed {
296    /// Hex-encoded Feed ID.
297    #[serde(rename = "feedID")]
298    pub feed_id: String,
299}
300
301/// Raw Report.
302#[derive(Debug, Clone, Serialize, Deserialize)]
303pub struct ApiReport {
304    report: ApiReportData,
305}
306
307impl ApiReport {
308    /// Into report data.
309    pub fn into_data(self) -> ApiReportData {
310        self.report
311    }
312}
313
314impl Deref for ApiReport {
315    type Target = ApiReportData;
316
317    fn deref(&self) -> &Self::Target {
318        &self.report
319    }
320}
321
322/// A bulk of reports.
323#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct ApiReports {
325    reports: Vec<ApiReportData>,
326}
327
328impl ApiReports {
329    /// Into reports.
330    pub fn into_reports(self) -> Vec<ApiReportData> {
331        self.reports
332    }
333}
334
335impl Deref for ApiReports {
336    type Target = Vec<ApiReportData>;
337
338    fn deref(&self) -> &Self::Target {
339        &self.reports
340    }
341}
342
343/// Raw Report Data.
344#[derive(Debug, Clone, Serialize, Deserialize)]
345#[serde(rename_all = "camelCase")]
346pub struct ApiReportData {
347    /// Feed ID.
348    #[serde(rename = "feedID")]
349    pub feed_id: String,
350    /// Full Report.
351    pub full_report: String,
352    /// Observations timestamp (in secs).
353    pub observations_timestamp: i64,
354    /// Valid From Timestamp (in secs).
355    pub valid_from_timestamp: i64,
356}
357
358impl ApiReportData {
359    /// Decode the report.
360    pub fn decode(&self) -> crate::Result<Report> {
361        let report = self.report_bytes()?;
362        let (_, blob) = decode_full_report(&report).map_err(crate::Error::invalid_argument)?;
363        let report = decode(blob).map_err(crate::Error::invalid_argument)?;
364        Ok(report)
365    }
366
367    /// Decode report to bytes.
368    pub fn report_bytes(&self) -> crate::Result<Vec<u8>> {
369        hex::decode(
370            self.full_report
371                .strip_prefix("0x")
372                .unwrap_or(&self.full_report),
373        )
374        .map_err(crate::Error::invalid_argument)
375    }
376
377    /// Feed ID.
378    pub fn decode_feed_id(&self) -> crate::Result<[u8; 32]> {
379        let mut data = [0; 32];
380        hex::decode_to_slice(
381            self.feed_id.strip_prefix("0x").unwrap_or(&self.feed_id),
382            &mut data,
383        )
384        .map_err(crate::Error::unknown)?;
385        Ok(data)
386    }
387}
388
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the HMAC signature of a fixed request, credential and timestamp to
    /// a known value.
    #[test]
    fn test_generate_hmac() {
        let url = format!("{DEFAULT_STREAMS_BASE}{}", Path::ReportsBulk.to_uri());
        let request = reqwest::Client::new()
            .post(url)
            .body(r#"{"attr1": "value1","attr2": [1,2,3]}"#)
            .build()
            .unwrap();

        let credential = Credential {
            user_id: "clientId2".to_string(),
            secret: "secret2".to_string(),
        };

        let expected = "37190febe20b6f3662f6abbfa3a7085ad705ac64e88bde8c1a01a635859e6cf7";
        let actual = credential.generate_hmac(1718885772, &request).unwrap();
        assert_eq!(actual, expected);
    }
}
417}