1use std::{
2 collections::{hash_map::Entry, HashMap},
3 num::NonZeroUsize,
4 ops::DerefMut,
5 sync::Arc,
6 time::Duration,
7};
8
9use anchor_client::{
10 solana_client::{
11 nonblocking::pubsub_client::PubsubClient as SolanaPubsubClient,
12 rpc_config::{RpcTransactionLogsConfig, RpcTransactionLogsFilter},
13 rpc_response::RpcLogsResponse,
14 },
15 solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey},
16};
17use futures_util::{Stream, StreamExt, TryStreamExt};
18use gmsol_solana_utils::cluster::Cluster;
19use tokio::{
20 sync::{broadcast, oneshot, Mutex, RwLock},
21 task::{AbortHandle, JoinSet},
22};
23use tokio_stream::wrappers::BroadcastStream;
24use tracing::Instrument;
25
26use crate::utils::WithContext;
27
/// A pubsub client that connects lazily and can be reset or shut down.
#[derive(Debug)]
pub struct PubsubClient {
    /// Current connection state: `None` until a connection has been made,
    /// and `None` again after [`PubsubClient::shutdown`].
    inner: RwLock<Option<Inner>>,
    /// Cluster whose `ws_url()` is used when opening a connection.
    cluster: Cluster,
    /// Subscription config passed along with every subscription request.
    config: SubscriptionConfig,
}
36
37impl PubsubClient {
38 pub async fn new(cluster: Cluster, config: SubscriptionConfig) -> crate::Result<Self> {
40 Ok(Self {
41 inner: RwLock::new(None),
42 cluster,
43 config,
44 })
45 }
46
47 async fn prepare(&self) -> crate::Result<()> {
48 if self.inner.read().await.is_some() {
49 return Ok(());
50 }
51 self.reset().await
52 }
53
54 pub async fn logs_subscribe(
56 &self,
57 mention: &Pubkey,
58 commitment: Option<CommitmentConfig>,
59 ) -> crate::Result<impl Stream<Item = crate::Result<WithContext<RpcLogsResponse>>>> {
60 self.prepare().await?;
61 let res = self
62 .inner
63 .read()
64 .await
65 .as_ref()
66 .ok_or_else(|| crate::Error::invalid_argument("the pubsub client has been closed"))?
67 .logs_subscribe(mention, commitment, &self.config)
68 .await;
69 match res {
70 Ok(stream) => Ok(stream),
71 Err(crate::Error::PubsubClosed) => {
72 self.reset().await?;
73 Err(crate::Error::PubsubClosed)
74 }
75 Err(err) => Err(err),
76 }
77 }
78
79 pub async fn reset(&self) -> crate::Result<()> {
81 let client = SolanaPubsubClient::new(self.cluster.ws_url())
82 .await
83 .map_err(anchor_client::ClientError::from)?;
84 let mut inner = self.inner.write().await;
85 if let Some(previous) = inner.take() {
86 _ = previous.shutdown().await;
87 }
88 *inner = Some(Inner::new(client));
89 Ok(())
90 }
91
92 pub async fn shutdown(&self) -> crate::Result<()> {
94 if let Some(inner) = self.inner.write().await.take() {
95 inner.shutdown().await?;
96 }
97 Ok(())
98 }
99}
100
/// State tied to a single underlying pubsub connection.
#[derive(Debug)]
struct Inner {
    /// Set of the worker tasks spawned for subscriptions.
    tasks: Mutex<JoinSet<()>>,
    /// The underlying client, shared with the worker tasks.
    client: Arc<SolanaPubsubClient>,
    /// Active logs subscriptions.
    logs: LogsSubscriptions,
}
107
108impl Inner {
109 fn new(client: SolanaPubsubClient) -> Self {
110 Self {
111 tasks: Default::default(),
112 client: Arc::new(client),
113 logs: Default::default(),
114 }
115 }
116
117 async fn logs_subscribe(
118 &self,
119 mention: &Pubkey,
120 commitment: Option<CommitmentConfig>,
121 config: &SubscriptionConfig,
122 ) -> crate::Result<impl Stream<Item = crate::Result<WithContext<RpcLogsResponse>>>> {
123 let config = SubscriptionConfig {
124 commitment: commitment.unwrap_or(config.commitment),
125 ..*config
126 };
127 let receiver = self
128 .logs
129 .subscribe(
130 self.tasks.lock().await.deref_mut(),
131 &self.client,
132 mention,
133 config,
134 )
135 .await?;
136 Ok(BroadcastStream::new(receiver).map_err(crate::Error::from))
137 }
138
139 async fn shutdown(self) -> crate::Result<()> {
140 self.tasks.lock().await.shutdown().await;
141 Arc::into_inner(self.client)
142 .ok_or_else(|| {
143 crate::Error::unknown("the client should be unique here, but it is not")
144 })?
145 .shutdown()
146 .await
147 .map_err(anchor_client::ClientError::from)?;
148 Ok(())
149 }
150}
151
/// Settings used when creating subscriptions.
#[derive(Debug, Clone)]
pub struct SubscriptionConfig {
    /// Commitment used when a subscriber does not specify one.
    pub commitment: CommitmentConfig,
    /// Period at which a subscription worker checks whether it still has
    /// any receivers.
    pub cleanup_interval: Duration,
    /// Capacity of the broadcast channel created for a new subscription.
    pub capacity: NonZeroUsize,
}
162
163impl Default for SubscriptionConfig {
164 fn default() -> Self {
165 Self {
166 commitment: CommitmentConfig::finalized(),
167 cleanup_interval: Duration::from_secs(10),
168 capacity: NonZeroUsize::new(256).unwrap(),
169 }
170 }
171}
172
/// A logs subscription together with the handle of its worker task.
#[derive(Debug)]
struct LogsSubscription {
    /// Commitment the subscription was created with.
    commitment: CommitmentConfig,
    /// Sender used to hand out new receivers for this subscription.
    sender: ClosableSender<WithContext<RpcLogsResponse>>,
    /// Handle used to abort the worker task and to check whether it has finished.
    abort: AbortHandle,
}
179
180impl Drop for LogsSubscription {
181 fn drop(&mut self) {
182 self.abort.abort();
183 }
184}
185
186impl LogsSubscription {
187 async fn init(
188 join_set: &mut JoinSet<()>,
189 sender: ClosableSender<WithContext<RpcLogsResponse>>,
190 client: &Arc<SolanaPubsubClient>,
191 mention: &Pubkey,
192 commitment: CommitmentConfig,
193 cleanup_interval: Duration,
194 ) -> crate::Result<Self> {
195 let (tx, rx) = oneshot::channel::<crate::Result<_>>();
196 let abort = join_set.spawn({
197 let client = client.clone();
198 let mention = *mention;
199 let sender = sender.clone();
200 async move {
201 let res = client
202 .logs_subscribe(
203 RpcTransactionLogsFilter::Mentions(vec![mention.to_string()]),
204 RpcTransactionLogsConfig { commitment: Some(commitment) },
205 )
206 .await
207 .inspect_err(
208 |err| tracing::error!(%err, %mention, "failed to subscribe transaction logs"),
209 );
210 match res {
211 Ok((mut stream, unsubscribe)) => {
212 _ = tx.send(Ok(()));
213 let mut interval = tokio::time::interval(cleanup_interval);
214 loop {
215 tokio::select! {
216 _ = interval.tick() => {
217 if sender.receiver_count().unwrap_or(0) == 0 {
218 break;
219 }
220 }
221 res = stream.next() => {
222 match res {
223 Some(res) => {
224 if sender.send(res.into()).unwrap_or(0) == 0 {
225 break;
226 }
227 }
228 None => break,
229 }
230 }
231 }
232 }
233 (unsubscribe)().await;
234 },
235 Err(err) => {
236 _ = tx.send(Err(err.into()));
237 }
238 }
239 tracing::info!(%mention, "logs subscription end");
240 }
241 .in_current_span()
242 });
243 rx.await
244 .map_err(|_| crate::Error::unknown("worker is dead"))??;
245 Ok(Self {
246 commitment,
247 abort,
248 sender,
249 })
250 }
251}
252
/// Logs subscriptions keyed by the mentioned address.
#[derive(Debug, Default)]
struct LogsSubscriptions(RwLock<HashMap<Pubkey, LogsSubscription>>);
255
256impl LogsSubscriptions {
257 async fn subscribe(
258 &self,
259 join_set: &mut JoinSet<()>,
260 client: &Arc<SolanaPubsubClient>,
261 mention: &Pubkey,
262 config: SubscriptionConfig,
263 ) -> crate::Result<broadcast::Receiver<WithContext<RpcLogsResponse>>> {
264 let mut map = self.0.write().await;
265 loop {
266 match map.entry(*mention) {
267 Entry::Occupied(entry) => {
268 let subscription = entry.get();
269 if subscription.abort.is_finished() {
270 entry.remove();
271 } else {
272 if config.commitment != subscription.commitment {
273 return Err(crate::Error::invalid_argument(format!(
274 "commitment mismatched, current: {}",
275 subscription.commitment.commitment
276 )));
277 }
278 if let Some(receiver) = subscription.sender.subscribe() {
279 return Ok(receiver);
280 } else {
281 entry.remove();
282 }
283 }
284 }
285 Entry::Vacant(entry) => {
286 let (sender, receiver) = broadcast::channel(config.capacity.get());
287 let subscription = LogsSubscription::init(
288 join_set,
289 sender.into(),
290 client,
291 mention,
292 config.commitment,
293 config.cleanup_interval,
294 )
295 .await?;
296 entry.insert(subscription);
297 return Ok(receiver);
298 }
299 }
300 }
301 }
302}
303
/// A broadcast sender that can be closed; the inner sender is `None` once closed.
#[derive(Debug)]
struct ClosableSender<T>(Arc<std::sync::RwLock<Option<broadcast::Sender<T>>>>);
306
307impl<T> From<broadcast::Sender<T>> for ClosableSender<T> {
308 fn from(sender: broadcast::Sender<T>) -> Self {
309 Self(Arc::new(std::sync::RwLock::new(Some(sender))))
310 }
311}
312
313impl<T> Clone for ClosableSender<T> {
314 fn clone(&self) -> Self {
315 Self(self.0.clone())
316 }
317}
318
319impl<T> ClosableSender<T> {
320 fn send(&self, value: T) -> Result<usize, broadcast::error::SendError<T>> {
321 match self.0.read().unwrap().as_ref() {
322 Some(sender) => sender.send(value),
323 None => Err(broadcast::error::SendError(value)),
324 }
325 }
326
327 fn receiver_count(&self) -> Option<usize> {
328 Some(self.0.read().unwrap().as_ref()?.receiver_count())
329 }
330
331 fn subscribe(&self) -> Option<broadcast::Receiver<T>> {
332 Some(self.0.read().unwrap().as_ref()?.subscribe())
333 }
334
335 fn close(&self) -> bool {
336 self.0.write().unwrap().take().is_some()
337 }
338}
339
340impl<T> Drop for ClosableSender<T> {
341 fn drop(&mut self) {
342 self.close();
343 }
344}