gmsol/utils/rpc/
pubsub.rs

1use std::{
2    collections::{hash_map::Entry, HashMap},
3    num::NonZeroUsize,
4    ops::DerefMut,
5    sync::Arc,
6    time::Duration,
7};
8
9use anchor_client::{
10    solana_client::{
11        nonblocking::pubsub_client::PubsubClient as SolanaPubsubClient,
12        rpc_config::{RpcTransactionLogsConfig, RpcTransactionLogsFilter},
13        rpc_response::RpcLogsResponse,
14    },
15    solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey},
16};
17use futures_util::{Stream, StreamExt, TryStreamExt};
18use gmsol_solana_utils::cluster::Cluster;
19use tokio::{
20    sync::{broadcast, oneshot, Mutex, RwLock},
21    task::{AbortHandle, JoinSet},
22};
23use tokio_stream::wrappers::BroadcastStream;
24use tracing::Instrument;
25
26use crate::utils::WithContext;
27
28/// A wrapper of [the solana version of pubsub client](SolanaPubsubClient)
29/// with shared subscription support.
30#[derive(Debug)]
31pub struct PubsubClient {
32    inner: RwLock<Option<Inner>>,
33    cluster: Cluster,
34    config: SubscriptionConfig,
35}
36
37impl PubsubClient {
38    /// Create a new [`PubsubClient`] with the given config.
39    pub async fn new(cluster: Cluster, config: SubscriptionConfig) -> crate::Result<Self> {
40        Ok(Self {
41            inner: RwLock::new(None),
42            cluster,
43            config,
44        })
45    }
46
47    async fn prepare(&self) -> crate::Result<()> {
48        if self.inner.read().await.is_some() {
49            return Ok(());
50        }
51        self.reset().await
52    }
53
54    /// Subscribe to transaction logs.
55    pub async fn logs_subscribe(
56        &self,
57        mention: &Pubkey,
58        commitment: Option<CommitmentConfig>,
59    ) -> crate::Result<impl Stream<Item = crate::Result<WithContext<RpcLogsResponse>>>> {
60        self.prepare().await?;
61        let res = self
62            .inner
63            .read()
64            .await
65            .as_ref()
66            .ok_or_else(|| crate::Error::invalid_argument("the pubsub client has been closed"))?
67            .logs_subscribe(mention, commitment, &self.config)
68            .await;
69        match res {
70            Ok(stream) => Ok(stream),
71            Err(crate::Error::PubsubClosed) => {
72                self.reset().await?;
73                Err(crate::Error::PubsubClosed)
74            }
75            Err(err) => Err(err),
76        }
77    }
78
79    /// Reset the client.
80    pub async fn reset(&self) -> crate::Result<()> {
81        let client = SolanaPubsubClient::new(self.cluster.ws_url())
82            .await
83            .map_err(anchor_client::ClientError::from)?;
84        let mut inner = self.inner.write().await;
85        if let Some(previous) = inner.take() {
86            _ = previous.shutdown().await;
87        }
88        *inner = Some(Inner::new(client));
89        Ok(())
90    }
91
92    /// Shutdown gracefully.
93    pub async fn shutdown(&self) -> crate::Result<()> {
94        if let Some(inner) = self.inner.write().await.take() {
95            inner.shutdown().await?;
96        }
97        Ok(())
98    }
99}
100
101#[derive(Debug)]
102struct Inner {
103    tasks: Mutex<JoinSet<()>>,
104    client: Arc<SolanaPubsubClient>,
105    logs: LogsSubscriptions,
106}
107
108impl Inner {
109    fn new(client: SolanaPubsubClient) -> Self {
110        Self {
111            tasks: Default::default(),
112            client: Arc::new(client),
113            logs: Default::default(),
114        }
115    }
116
117    async fn logs_subscribe(
118        &self,
119        mention: &Pubkey,
120        commitment: Option<CommitmentConfig>,
121        config: &SubscriptionConfig,
122    ) -> crate::Result<impl Stream<Item = crate::Result<WithContext<RpcLogsResponse>>>> {
123        let config = SubscriptionConfig {
124            commitment: commitment.unwrap_or(config.commitment),
125            ..*config
126        };
127        let receiver = self
128            .logs
129            .subscribe(
130                self.tasks.lock().await.deref_mut(),
131                &self.client,
132                mention,
133                config,
134            )
135            .await?;
136        Ok(BroadcastStream::new(receiver).map_err(crate::Error::from))
137    }
138
139    async fn shutdown(self) -> crate::Result<()> {
140        self.tasks.lock().await.shutdown().await;
141        Arc::into_inner(self.client)
142            .ok_or_else(|| {
143                crate::Error::unknown("the client should be unique here, but it is not")
144            })?
145            .shutdown()
146            .await
147            .map_err(anchor_client::ClientError::from)?;
148        Ok(())
149    }
150}
151
152/// Config for subscription manager.
153#[derive(Debug, Clone)]
154pub struct SubscriptionConfig {
155    /// Commitment.
156    pub commitment: CommitmentConfig,
157    /// Cleanup interval.
158    pub cleanup_interval: Duration,
159    /// Capacity for the broadcast channel.
160    pub capacity: NonZeroUsize,
161}
162
163impl Default for SubscriptionConfig {
164    fn default() -> Self {
165        Self {
166            commitment: CommitmentConfig::finalized(),
167            cleanup_interval: Duration::from_secs(10),
168            capacity: NonZeroUsize::new(256).unwrap(),
169        }
170    }
171}
172
173#[derive(Debug)]
174struct LogsSubscription {
175    commitment: CommitmentConfig,
176    sender: ClosableSender<WithContext<RpcLogsResponse>>,
177    abort: AbortHandle,
178}
179
180impl Drop for LogsSubscription {
181    fn drop(&mut self) {
182        self.abort.abort();
183    }
184}
185
186impl LogsSubscription {
187    async fn init(
188        join_set: &mut JoinSet<()>,
189        sender: ClosableSender<WithContext<RpcLogsResponse>>,
190        client: &Arc<SolanaPubsubClient>,
191        mention: &Pubkey,
192        commitment: CommitmentConfig,
193        cleanup_interval: Duration,
194    ) -> crate::Result<Self> {
195        let (tx, rx) = oneshot::channel::<crate::Result<_>>();
196        let abort = join_set.spawn({
197            let client = client.clone();
198            let mention = *mention;
199            let sender = sender.clone();
200            async move {
201                let res = client
202                    .logs_subscribe(
203                        RpcTransactionLogsFilter::Mentions(vec![mention.to_string()]),
204                        RpcTransactionLogsConfig { commitment: Some(commitment) },
205                    )
206                    .await
207                    .inspect_err(
208                        |err| tracing::error!(%err, %mention, "failed to subscribe transaction logs"),
209                    );
210                match res {
211                    Ok((mut stream, unsubscribe)) => {
212                        _ = tx.send(Ok(()));
213                        let mut interval = tokio::time::interval(cleanup_interval);
214                        loop {
215                            tokio::select! {
216                                _ = interval.tick() => {
217                                    if sender.receiver_count().unwrap_or(0) == 0 {
218                                        break;
219                                    }
220                                }
221                                res = stream.next() => {
222                                    match res {
223                                        Some(res) => {
224                                            if sender.send(res.into()).unwrap_or(0) == 0 {
225                                                break;
226                                            }
227                                        }
228                                        None => break,
229                                    }
230                                }
231                            }
232                        }
233                        (unsubscribe)().await;
234                    },
235                    Err(err) => {
236                        _ = tx.send(Err(err.into()));
237                    }
238                }
239                tracing::info!(%mention, "logs subscription end");
240            }
241            .in_current_span()
242        });
243        rx.await
244            .map_err(|_| crate::Error::unknown("worker is dead"))??;
245        Ok(Self {
246            commitment,
247            abort,
248            sender,
249        })
250    }
251}
252
253#[derive(Debug, Default)]
254struct LogsSubscriptions(RwLock<HashMap<Pubkey, LogsSubscription>>);
255
256impl LogsSubscriptions {
257    async fn subscribe(
258        &self,
259        join_set: &mut JoinSet<()>,
260        client: &Arc<SolanaPubsubClient>,
261        mention: &Pubkey,
262        config: SubscriptionConfig,
263    ) -> crate::Result<broadcast::Receiver<WithContext<RpcLogsResponse>>> {
264        let mut map = self.0.write().await;
265        loop {
266            match map.entry(*mention) {
267                Entry::Occupied(entry) => {
268                    let subscription = entry.get();
269                    if subscription.abort.is_finished() {
270                        entry.remove();
271                    } else {
272                        if config.commitment != subscription.commitment {
273                            return Err(crate::Error::invalid_argument(format!(
274                                "commitment mismatched, current: {}",
275                                subscription.commitment.commitment
276                            )));
277                        }
278                        if let Some(receiver) = subscription.sender.subscribe() {
279                            return Ok(receiver);
280                        } else {
281                            entry.remove();
282                        }
283                    }
284                }
285                Entry::Vacant(entry) => {
286                    let (sender, receiver) = broadcast::channel(config.capacity.get());
287                    let subscription = LogsSubscription::init(
288                        join_set,
289                        sender.into(),
290                        client,
291                        mention,
292                        config.commitment,
293                        config.cleanup_interval,
294                    )
295                    .await?;
296                    entry.insert(subscription);
297                    return Ok(receiver);
298                }
299            }
300        }
301    }
302}
303
304#[derive(Debug)]
305struct ClosableSender<T>(Arc<std::sync::RwLock<Option<broadcast::Sender<T>>>>);
306
307impl<T> From<broadcast::Sender<T>> for ClosableSender<T> {
308    fn from(sender: broadcast::Sender<T>) -> Self {
309        Self(Arc::new(std::sync::RwLock::new(Some(sender))))
310    }
311}
312
313impl<T> Clone for ClosableSender<T> {
314    fn clone(&self) -> Self {
315        Self(self.0.clone())
316    }
317}
318
319impl<T> ClosableSender<T> {
320    fn send(&self, value: T) -> Result<usize, broadcast::error::SendError<T>> {
321        match self.0.read().unwrap().as_ref() {
322            Some(sender) => sender.send(value),
323            None => Err(broadcast::error::SendError(value)),
324        }
325    }
326
327    fn receiver_count(&self) -> Option<usize> {
328        Some(self.0.read().unwrap().as_ref()?.receiver_count())
329    }
330
331    fn subscribe(&self) -> Option<broadcast::Receiver<T>> {
332        Some(self.0.read().unwrap().as_ref()?.subscribe())
333    }
334
335    fn close(&self) -> bool {
336        self.0.write().unwrap().take().is_some()
337    }
338}
339
340impl<T> Drop for ClosableSender<T> {
341    fn drop(&mut self) {
342        self.close();
343    }
344}