1use std::{
2 collections::BTreeMap,
3 ops::Deref,
4 sync::{Arc, OnceLock},
5};
6
7use anchor_client::{
8 anchor_lang::{AccountDeserialize, AnchorSerialize, Discriminator},
9 solana_client::{
10 nonblocking::rpc_client::RpcClient,
11 rpc_config::RpcAccountInfoConfig,
12 rpc_filter::{Memcmp, RpcFilterType},
13 },
14 solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signer::Signer},
15};
16
17use gmsol_model::{price::Prices, PnlFactorKind};
18use gmsol_solana_utils::{
19 bundle_builder::{BundleBuilder, BundleOptions, CreateBundleOptions},
20 cluster::Cluster,
21 program::Program,
22 transaction_builder::{Config, TransactionBuilder},
23};
24use gmsol_store::{
25 states::{
26 deposit::find_first_deposit_receiver_pda, market::status::MarketStatus,
27 position::PositionKind, user::ReferralCodeBytes, NonceBytes, PriceProviderKind,
28 },
29 utils::pubkey::optional_address,
30};
31use gmsol_timelock::states::utils::InstructionBuffer;
32use solana_account_decoder::UiAccountEncoding;
33use tokio::sync::OnceCell;
34use typed_builder::TypedBuilder;
35
36use crate::{
37 store::market::MarketOps,
38 types,
39 utils::{
40 account_with_context, accounts_lazy_with_context, workarounds::zero_copy::SharedZeroCopy,
41 ProgramAccountsConfig, PubsubClient, SubscriptionConfig, WithContext, ZeroCopy,
42 },
43};
44
/// Number of bytes added to a field offset to skip the account discriminator
/// when building `memcmp` filters (see [`StoreFilter::store_offset`]).
const DISC_OFFSET: usize = 8;
46
47#[derive(Debug, Clone, TypedBuilder)]
49pub struct ClientOptions {
50 #[builder(default)]
51 store_program_id: Option<Pubkey>,
52 #[builder(default)]
53 treasury_program_id: Option<Pubkey>,
54 #[builder(default)]
55 timelock_program_id: Option<Pubkey>,
56 #[builder(default)]
57 commitment: CommitmentConfig,
58 #[builder(default)]
59 subscription: SubscriptionConfig,
60}
61
62impl Default for ClientOptions {
63 fn default() -> Self {
64 Self::builder().build()
65 }
66}
67
68pub struct Client<C> {
70 cfg: Config<C>,
71 anchor: Arc<anchor_client::Client<C>>,
72 store_program: Program<C>,
73 treasury_program: Program<C>,
74 timelock_program: Program<C>,
75 rpc: OnceLock<RpcClient>,
76 pub_sub: OnceCell<PubsubClient>,
77 subscription_config: SubscriptionConfig,
78}
79
80impl<C: Clone + Deref<Target = impl Signer>> Client<C> {
81 pub fn new_with_options(
83 cluster: Cluster,
84 payer: C,
85 options: ClientOptions,
86 ) -> crate::Result<Self> {
87 let ClientOptions {
88 store_program_id,
89 treasury_program_id,
90 timelock_program_id,
91 commitment,
92 subscription,
93 } = options;
94 let anchor = anchor_client::Client::new_with_options(
95 cluster.clone().into(),
96 payer.clone(),
97 commitment,
98 );
99 let cfg = Config::new(cluster, payer, commitment);
100 Ok(Self {
101 store_program: Program::new(store_program_id.unwrap_or(gmsol_store::id()), cfg.clone()),
102 treasury_program: Program::new(
103 treasury_program_id.unwrap_or(gmsol_treasury::id()),
104 cfg.clone(),
105 ),
106 timelock_program: Program::new(
107 timelock_program_id.unwrap_or(gmsol_timelock::id()),
108 cfg.clone(),
109 ),
110 cfg,
111 anchor: Arc::new(anchor),
112 pub_sub: OnceCell::default(),
113 rpc: Default::default(),
114 subscription_config: subscription,
115 })
116 }
117
118 pub fn new(cluster: Cluster, payer: C) -> crate::Result<Self> {
120 Self::new_with_options(cluster, payer, ClientOptions::default())
121 }
122
123 pub fn try_clone_with_payer<C2: Clone + Deref<Target = impl Signer>>(
125 &self,
126 payer: C2,
127 ) -> crate::Result<Client<C2>> {
128 Client::new_with_options(
129 self.cluster().clone(),
130 payer,
131 ClientOptions {
132 store_program_id: Some(*self.store_program_id()),
133 treasury_program_id: Some(*self.treasury_program_id()),
134 timelock_program_id: Some(*self.timelock_program_id()),
135 commitment: self.commitment(),
136 subscription: self.subscription_config.clone(),
137 },
138 )
139 }
140
141 pub fn try_clone(&self) -> crate::Result<Self> {
143 Ok(Self {
144 cfg: self.cfg.clone(),
145 anchor: self.anchor.clone(),
146 store_program: self.program(*self.store_program_id()),
147 treasury_program: self.program(*self.treasury_program_id()),
148 timelock_program: self.program(*self.timelock_program_id()),
149 pub_sub: OnceCell::default(),
150 rpc: Default::default(),
151 subscription_config: self.subscription_config.clone(),
152 })
153 }
154
155 pub fn set_subscription_config(&mut self, config: SubscriptionConfig) -> &mut Self {
157 self.subscription_config = config;
158 self
159 }
160
161 pub fn anchor(&self) -> &anchor_client::Client<C> {
163 &self.anchor
164 }
165
166 pub fn program(&self, program_id: Pubkey) -> Program<C> {
168 Program::new(program_id, self.cfg.clone())
169 }
170
171 pub fn cluster(&self) -> &Cluster {
173 self.cfg.cluster()
174 }
175
176 pub fn commitment(&self) -> CommitmentConfig {
178 *self.cfg.commitment()
179 }
180
181 pub fn payer(&self) -> Pubkey {
183 self.cfg.payer()
184 }
185
186 pub fn rpc(&self) -> &RpcClient {
188 self.rpc.get_or_init(|| self.cfg.rpc())
189 }
190
191 pub fn store_program(&self) -> &Program<C> {
193 &self.store_program
194 }
195
196 pub fn treasury_program(&self) -> &Program<C> {
198 &self.treasury_program
199 }
200
201 pub fn timelock_program(&self) -> &Program<C> {
203 &self.timelock_program
204 }
205
206 pub fn new_store_program(&self) -> crate::Result<Program<C>> {
208 Ok(self.program(*self.store_program_id()))
209 }
210
211 pub fn new_treasury_program(&self) -> crate::Result<Program<C>> {
213 Ok(self.program(*self.store_program_id()))
214 }
215
216 pub fn store_program_id(&self) -> &Pubkey {
218 self.store_program().id()
219 }
220
221 pub fn treasury_program_id(&self) -> &Pubkey {
223 self.treasury_program().id()
224 }
225
226 pub fn timelock_program_id(&self) -> &Pubkey {
228 self.timelock_program().id()
229 }
230
231 pub fn store_transaction(&self) -> TransactionBuilder<'_, C> {
233 self.store_program().transaction()
234 }
235
236 pub fn treasury_transaction(&self) -> TransactionBuilder<'_, C> {
238 self.treasury_program().transaction()
239 }
240
241 pub fn timelock_transaction(&self) -> TransactionBuilder<'_, C> {
243 self.timelock_program().transaction()
244 }
245
246 pub fn bundle_with_options(&self, options: BundleOptions) -> BundleBuilder<'_, C> {
248 BundleBuilder::new_with_options(CreateBundleOptions {
249 cluster: self.cluster().clone(),
250 commitment: self.commitment(),
251 options,
252 })
253 }
254
255 pub fn bundle(&self) -> BundleBuilder<C> {
257 self.bundle_with_options(Default::default())
258 }
259
260 pub fn find_store_address(&self, key: &str) -> Pubkey {
262 crate::pda::find_store_address(key, self.store_program_id()).0
263 }
264
265 pub fn find_store_wallet_address(&self, store: &Pubkey) -> Pubkey {
267 crate::pda::find_store_wallet_pda(store, self.store_program_id()).0
268 }
269
270 pub fn store_event_authority(&self) -> Pubkey {
272 crate::pda::find_event_authority_address(self.store_program_id()).0
273 }
274
275 pub fn find_market_vault_address(&self, store: &Pubkey, token: &Pubkey) -> Pubkey {
277 crate::pda::find_market_vault_address(store, token, self.store_program_id()).0
278 }
279
280 pub fn find_market_token_address(
282 &self,
283 store: &Pubkey,
284 index_token: &Pubkey,
285 long_token: &Pubkey,
286 short_token: &Pubkey,
287 ) -> Pubkey {
288 crate::pda::find_market_token_address(
289 store,
290 index_token,
291 long_token,
292 short_token,
293 self.store_program_id(),
294 )
295 .0
296 }
297
298 pub fn find_market_address(&self, store: &Pubkey, token: &Pubkey) -> Pubkey {
300 types::Market::find_market_address(store, token, self.store_program_id()).0
301 }
302
303 pub fn find_deposit_address(
305 &self,
306 store: &Pubkey,
307 user: &Pubkey,
308 nonce: &NonceBytes,
309 ) -> Pubkey {
310 crate::pda::find_deposit_address(store, user, nonce, self.store_program_id()).0
311 }
312
313 pub fn find_first_deposit_owner_address(&self) -> Pubkey {
315 find_first_deposit_receiver_pda(self.store_program_id()).0
316 }
317
318 pub fn find_withdrawal_address(
320 &self,
321 store: &Pubkey,
322 user: &Pubkey,
323 nonce: &NonceBytes,
324 ) -> Pubkey {
325 crate::pda::find_withdrawal_address(store, user, nonce, self.store_program_id()).0
326 }
327
328 pub fn find_order_address(&self, store: &Pubkey, user: &Pubkey, nonce: &NonceBytes) -> Pubkey {
330 crate::pda::find_order_address(store, user, nonce, self.store_program_id()).0
331 }
332
333 pub fn find_shift_address(&self, store: &Pubkey, owner: &Pubkey, nonce: &NonceBytes) -> Pubkey {
335 crate::pda::find_shift_address(store, owner, nonce, self.store_program_id()).0
336 }
337
338 pub fn find_position_address(
340 &self,
341 store: &Pubkey,
342 user: &Pubkey,
343 market_token: &Pubkey,
344 collateral_token: &Pubkey,
345 kind: PositionKind,
346 ) -> crate::Result<Pubkey> {
347 Ok(crate::pda::find_position_address(
348 store,
349 user,
350 market_token,
351 collateral_token,
352 kind,
353 self.store_program_id(),
354 )?
355 .0)
356 }
357
358 pub fn find_claimable_account_address(
360 &self,
361 store: &Pubkey,
362 mint: &Pubkey,
363 user: &Pubkey,
364 time_key: &[u8],
365 ) -> Pubkey {
366 crate::pda::find_claimable_account_pda(store, mint, user, time_key, self.store_program_id())
367 .0
368 }
369
370 pub fn find_trade_event_buffer_address(
372 &self,
373 store: &Pubkey,
374 authority: &Pubkey,
375 index: u16,
376 ) -> Pubkey {
377 crate::pda::find_trade_event_buffer_pda(store, authority, index, self.store_program_id()).0
378 }
379
380 pub fn find_user_address(&self, store: &Pubkey, owner: &Pubkey) -> Pubkey {
382 crate::pda::find_user_pda(store, owner, self.store_program_id()).0
383 }
384
385 pub fn find_referral_code_address(&self, store: &Pubkey, code: ReferralCodeBytes) -> Pubkey {
387 crate::pda::find_referral_code_pda(store, code, self.store_program_id()).0
388 }
389
390 pub fn find_glv_token_address(&self, store: &Pubkey, index: u16) -> Pubkey {
392 types::Glv::find_glv_token_pda(store, index, self.store_program_id()).0
393 }
394
395 pub fn find_glv_address(&self, glv_token: &Pubkey) -> Pubkey {
397 types::Glv::find_glv_pda(glv_token, self.store_program_id()).0
398 }
399
400 pub fn find_glv_deposit_address(
402 &self,
403 store: &Pubkey,
404 owner: &Pubkey,
405 nonce: &NonceBytes,
406 ) -> Pubkey {
407 crate::pda::find_glv_deposit_pda(store, owner, nonce, self.store_program_id()).0
408 }
409
410 pub fn find_glv_withdrawal_address(
412 &self,
413 store: &Pubkey,
414 owner: &Pubkey,
415 nonce: &NonceBytes,
416 ) -> Pubkey {
417 crate::pda::find_glv_withdrawal_pda(store, owner, nonce, self.store_program_id()).0
418 }
419
420 pub fn find_gt_exchange_vault_address(
422 &self,
423 store: &Pubkey,
424 time_window_index: i64,
425 time_window: u32,
426 ) -> Pubkey {
427 crate::pda::find_gt_exchange_vault_pda(
428 store,
429 time_window_index,
430 time_window,
431 self.store_program_id(),
432 )
433 .0
434 }
435
436 pub fn find_gt_exchange_address(&self, vault: &Pubkey, owner: &Pubkey) -> Pubkey {
438 crate::pda::find_gt_exchange_pda(vault, owner, self.store_program_id()).0
439 }
440
441 pub fn find_price_feed_address(
443 &self,
444 store: &Pubkey,
445 authority: &Pubkey,
446 index: u16,
447 provider: PriceProviderKind,
448 token: &Pubkey,
449 ) -> Pubkey {
450 crate::pda::find_price_feed_pda(
451 store,
452 authority,
453 index,
454 provider,
455 token,
456 self.store_program_id(),
457 )
458 .0
459 }
460
461 pub fn find_treasury_config_address(&self, store: &Pubkey) -> Pubkey {
463 crate::pda::find_treasury_config_pda(store, self.treasury_program_id()).0
464 }
465
466 pub fn find_treasury_vault_config_address(&self, config: &Pubkey, index: u16) -> Pubkey {
468 crate::pda::find_treasury_vault_config_pda(config, index, self.treasury_program_id()).0
469 }
470
471 pub fn find_gt_bank_address(
473 &self,
474 treasury_vault_config: &Pubkey,
475 gt_exchange_vault: &Pubkey,
476 ) -> Pubkey {
477 crate::pda::find_gt_bank_pda(
478 treasury_vault_config,
479 gt_exchange_vault,
480 self.treasury_program_id(),
481 )
482 .0
483 }
484
485 pub fn find_treasury_receiver_address(&self, config: &Pubkey) -> Pubkey {
487 crate::pda::find_treasury_receiver_pda(config, self.treasury_program_id()).0
488 }
489
490 pub fn find_timelock_config_address(&self, store: &Pubkey) -> Pubkey {
492 crate::pda::find_timelock_config_pda(store, self.timelock_program_id()).0
493 }
494
495 pub fn find_executor_address(&self, store: &Pubkey, role: &str) -> crate::Result<Pubkey> {
497 Ok(crate::pda::find_executor_pda(store, role, self.timelock_program_id())?.0)
498 }
499
500 pub fn find_executor_wallet_address(&self, executor: &Pubkey) -> Pubkey {
502 crate::pda::find_executor_wallet_pda(executor, self.timelock_program_id()).0
503 }
504
505 pub async fn get_slot(&self, commitment: Option<CommitmentConfig>) -> crate::Result<u64> {
507 let slot = self
508 .store_program()
509 .rpc()
510 .get_slot_with_commitment(commitment.unwrap_or(self.commitment()))
511 .await
512 .map_err(anchor_client::ClientError::from)?;
513 Ok(slot)
514 }
515
516 pub async fn store_accounts_with_config<T>(
518 &self,
519 filter_by_store: Option<StoreFilter>,
520 other_filters: impl IntoIterator<Item = RpcFilterType>,
521 config: ProgramAccountsConfig,
522 ) -> crate::Result<WithContext<Vec<(Pubkey, T)>>>
523 where
524 T: AccountDeserialize + Discriminator,
525 {
526 let filters = std::iter::empty()
527 .chain(
528 filter_by_store
529 .inspect(|filter| {
530 let store = &filter.store;
531 tracing::debug!(%store, offset=%filter.store_offset(), "store bytes to filter: {}", hex::encode(store));
532 })
533 .map(RpcFilterType::from),
534 )
535 .chain(other_filters);
536 accounts_lazy_with_context(self.store_program(), filters, config)
537 .await?
538 .map(|iter| iter.collect())
539 .transpose()
540 }
541
542 pub async fn account_with_config<T>(
546 &self,
547 address: &Pubkey,
548 mut config: RpcAccountInfoConfig,
549 ) -> crate::Result<WithContext<Option<T>>>
550 where
551 T: AccountDeserialize,
552 {
553 config.encoding = Some(config.encoding.unwrap_or(UiAccountEncoding::Base64));
554 let client = self.store_program().rpc();
555 account_with_context(&client, address, config).await
556 }
557
558 pub async fn account<T: AccountDeserialize>(
560 &self,
561 address: &Pubkey,
562 ) -> crate::Result<Option<T>> {
563 Ok(self
564 .account_with_config(address, Default::default())
565 .await?
566 .into_value())
567 }
568
569 pub async fn store_accounts<T>(
571 &self,
572 filter_by_store: Option<StoreFilter>,
573 other_filters: impl IntoIterator<Item = RpcFilterType>,
574 ) -> crate::Result<Vec<(Pubkey, T)>>
575 where
576 T: AccountDeserialize + Discriminator,
577 {
578 let res = self
579 .store_accounts_with_config(
580 filter_by_store,
581 other_filters,
582 ProgramAccountsConfig::default(),
583 )
584 .await?;
585 tracing::debug!(slot=%res.slot(), "accounts fetched");
586 Ok(res.into_value())
587 }
588
589 pub async fn store(&self, address: &Pubkey) -> crate::Result<Arc<types::Store>> {
591 Ok(self
592 .account::<SharedZeroCopy<types::Store>>(address)
593 .await?
594 .ok_or(crate::Error::NotFound)?
595 .0)
596 }
597
598 pub async fn user(&self, address: &Pubkey) -> crate::Result<types::user::UserHeader> {
600 Ok(self
601 .account::<ZeroCopy<types::user::UserHeader>>(address)
602 .await?
603 .ok_or(crate::Error::NotFound)?
604 .0)
605 }
606
607 pub async fn authorized_token_map_address(
609 &self,
610 store: &Pubkey,
611 ) -> crate::Result<Option<Pubkey>> {
612 let store = self.store(store).await?;
613 let token_map = store.token_map;
614 Ok(optional_address(&token_map).copied())
615 }
616
617 pub async fn token_map(&self, address: &Pubkey) -> crate::Result<types::TokenMap> {
619 self.account(address).await?.ok_or(crate::Error::NotFound)
620 }
621
622 pub async fn authorized_token_map(&self, store: &Pubkey) -> crate::Result<types::TokenMap> {
624 let address = self
625 .authorized_token_map_address(store)
626 .await?
627 .ok_or(crate::Error::invalid_argument("token map is not set"))?;
628 self.token_map(&address).await
629 }
630
631 pub async fn markets_with_config(
633 &self,
634 store: &Pubkey,
635 config: ProgramAccountsConfig,
636 ) -> crate::Result<WithContext<BTreeMap<Pubkey, types::Market>>> {
637 let markets = self
638 .store_accounts_with_config::<ZeroCopy<types::Market>>(
639 Some(StoreFilter::new(
640 store,
641 bytemuck::offset_of!(types::Market, store),
642 )),
643 None,
644 config,
645 )
646 .await?
647 .map(|accounts| {
648 accounts
649 .into_iter()
650 .map(|(pubkey, m)| (pubkey, m.0))
651 .collect::<BTreeMap<_, _>>()
652 });
653 Ok(markets)
654 }
655
656 pub async fn markets(&self, store: &Pubkey) -> crate::Result<BTreeMap<Pubkey, types::Market>> {
658 let markets = self
659 .markets_with_config(store, ProgramAccountsConfig::default())
660 .await?
661 .into_value();
662 Ok(markets)
663 }
664
665 pub async fn market_with_config<T>(
669 &self,
670 address: &Pubkey,
671 config: RpcAccountInfoConfig,
672 ) -> crate::Result<WithContext<Option<types::Market>>> {
673 let market = self
674 .account_with_config::<ZeroCopy<types::Market>>(address, config)
675 .await?;
676 Ok(market.map(|m| m.map(|m| m.0)))
677 }
678
679 pub async fn market(&self, address: &Pubkey) -> crate::Result<Arc<types::Market>> {
681 Ok(self
682 .account::<SharedZeroCopy<types::Market>>(address)
683 .await?
684 .ok_or(crate::Error::NotFound)?
685 .0)
686 }
687
688 pub async fn glvs_with_config(
690 &self,
691 store: &Pubkey,
692 config: ProgramAccountsConfig,
693 ) -> crate::Result<WithContext<BTreeMap<Pubkey, types::Glv>>> {
694 let glvs = self
695 .store_accounts_with_config::<ZeroCopy<types::Glv>>(
696 Some(StoreFilter::new(
697 store,
698 bytemuck::offset_of!(types::Glv, store),
699 )),
700 None,
701 config,
702 )
703 .await?
704 .map(|accounts| {
705 accounts
706 .into_iter()
707 .map(|(pubkey, m)| (pubkey, m.0))
708 .collect::<BTreeMap<_, _>>()
709 });
710 Ok(glvs)
711 }
712
713 pub async fn glvs(&self, store: &Pubkey) -> crate::Result<BTreeMap<Pubkey, types::Glv>> {
715 let glvs = self
716 .glvs_with_config(store, ProgramAccountsConfig::default())
717 .await?
718 .into_value();
719 Ok(glvs)
720 }
721
722 pub async fn market_status(
724 &self,
725 store: &Pubkey,
726 market_token: &Pubkey,
727 prices: Prices<u128>,
728 maximize_pnl: bool,
729 maximize_pool_value: bool,
730 ) -> crate::Result<MarketStatus> {
731 let req = self.get_market_status(
732 store,
733 market_token,
734 prices,
735 maximize_pnl,
736 maximize_pool_value,
737 );
738 let status = crate::utils::view::<MarketStatus>(
739 &self.store_program().rpc(),
740 &req.signed_transaction_with_options(true, None).await?,
741 )
742 .await?;
743 Ok(status)
744 }
745
746 pub async fn market_token_price(
748 &self,
749 store: &Pubkey,
750 market_token: &Pubkey,
751 prices: Prices<u128>,
752 pnl_factor: PnlFactorKind,
753 maximize: bool,
754 ) -> crate::Result<u128> {
755 let req = self.get_market_token_price(store, market_token, prices, pnl_factor, maximize);
756 let price = crate::utils::view::<u128>(
757 &self.store_program().rpc(),
758 &req.signed_transaction_with_options(true, None).await?,
759 )
760 .await?;
761 Ok(price)
762 }
763
764 pub async fn positions(
766 &self,
767 store: &Pubkey,
768 owner: Option<&Pubkey>,
769 market_token: Option<&Pubkey>,
770 ) -> crate::Result<BTreeMap<Pubkey, types::Position>> {
771 let filter = match owner {
772 Some(owner) => {
773 let mut bytes = owner.as_ref().to_owned();
774 if let Some(market_token) = market_token {
775 bytes.extend_from_slice(market_token.as_ref());
776 }
777 let filter = RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
778 bytemuck::offset_of!(types::Position, owner) + DISC_OFFSET,
779 bytes,
780 ));
781 Some(filter)
782 }
783 None => market_token.and_then(|token| {
784 Some(RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
785 bytemuck::offset_of!(types::Position, market_token) + DISC_OFFSET,
786 token.try_to_vec().ok()?,
787 )))
788 }),
789 };
790
791 let store_filter = StoreFilter::new(store, bytemuck::offset_of!(types::Position, store));
792
793 let positions = self
794 .store_accounts::<ZeroCopy<types::Position>>(Some(store_filter), filter)
795 .await?
796 .into_iter()
797 .map(|(pubkey, p)| (pubkey, p.0))
798 .collect();
799
800 Ok(positions)
801 }
802
803 pub async fn position(&self, address: &Pubkey) -> crate::Result<types::Position> {
805 let position = self
806 .account::<ZeroCopy<types::Position>>(address)
807 .await?
808 .ok_or(crate::Error::NotFound)?;
809 Ok(position.0)
810 }
811
812 pub async fn order(&self, address: &Pubkey) -> crate::Result<types::Order> {
814 Ok(self
815 .account::<ZeroCopy<types::Order>>(address)
816 .await?
817 .ok_or(crate::Error::NotFound)?
818 .0)
819 }
820
821 pub async fn order_with_config(
825 &self,
826 address: &Pubkey,
827 config: RpcAccountInfoConfig,
828 ) -> crate::Result<WithContext<Option<types::Order>>> {
829 Ok(self
830 .account_with_config::<ZeroCopy<types::Order>>(address, config)
831 .await?
832 .map(|a| a.map(|a| a.0)))
833 }
834
835 pub async fn orders(
837 &self,
838 store: &Pubkey,
839 owner: Option<&Pubkey>,
840 market_token: Option<&Pubkey>,
841 ) -> crate::Result<BTreeMap<Pubkey, types::Order>> {
842 let mut filters = Vec::default();
843 if let Some(owner) = owner {
844 filters.push(RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
845 bytemuck::offset_of!(types::common::ActionHeader, owner) + DISC_OFFSET,
846 owner.as_ref().to_owned(),
847 )));
848 }
849 if let Some(market_token) = market_token {
850 let market = self.find_market_address(store, market_token);
851 filters.push(RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
852 bytemuck::offset_of!(types::common::ActionHeader, market) + DISC_OFFSET,
853 market.as_ref().to_owned(),
854 )));
855 }
856 let store_filter = StoreFilter::new(
857 store,
858 bytemuck::offset_of!(types::common::ActionHeader, store),
859 );
860
861 let orders = self
862 .store_accounts::<ZeroCopy<types::Order>>(Some(store_filter), filters)
863 .await?
864 .into_iter()
865 .map(|(addr, order)| (addr, order.0))
866 .collect();
867
868 Ok(orders)
869 }
870
871 pub async fn deposit(&self, address: &Pubkey) -> crate::Result<types::Deposit> {
873 Ok(self
874 .account::<ZeroCopy<_>>(address)
875 .await?
876 .ok_or(crate::Error::NotFound)?
877 .0)
878 }
879
880 pub async fn withdrawal(&self, address: &Pubkey) -> crate::Result<types::Withdrawal> {
882 Ok(self
883 .account::<ZeroCopy<types::Withdrawal>>(address)
884 .await?
885 .ok_or(crate::Error::NotFound)?
886 .0)
887 }
888
889 pub async fn price_feed(&self, address: &Pubkey) -> crate::Result<Option<types::PriceFeed>> {
891 Ok(self
892 .account::<ZeroCopy<types::PriceFeed>>(address)
893 .await?
894 .map(|a| a.0))
895 }
896
897 pub async fn instruction_buffer(
899 &self,
900 address: &Pubkey,
901 ) -> crate::Result<Option<InstructionBuffer>> {
902 self.account::<InstructionBuffer>(address).await
903 }
904
905 pub async fn pub_sub(&self) -> crate::Result<&PubsubClient> {
907 let client = self
908 .pub_sub
909 .get_or_try_init(|| {
910 PubsubClient::new(self.cluster().clone(), self.subscription_config.clone())
911 })
912 .await?;
913 Ok(client)
914 }
915
/// Subscribes to CPI events emitted by the store program.
///
/// Subscribes to logs mentioning the store event authority, parses the
/// signature of each notification, and passes the signatures to
/// `extract_cpi_events`. `commitment` defaults to the subscription config's
/// commitment. A batch that fails to decode is logged at error level and
/// skipped rather than surfaced as a stream error.
#[cfg(feature = "decode")]
pub async fn subscribe_store_cpi_events(
    &self,
    commitment: Option<CommitmentConfig>,
) -> crate::Result<
    impl futures_util::Stream<
        Item = crate::Result<crate::utils::WithSlot<Vec<crate::store::events::StoreCPIEvent>>>,
    >,
> {
    use futures_util::TryStreamExt;
    use gmsol_decode::Decode;

    use crate::{
        store::events::StoreCPIEvent,
        utils::{extract_cpi_events, WithSlot},
    };

    let program_id = self.store_program_id();
    let event_authority = self.store_event_authority();
    let query = Arc::new(self.store_program().rpc());
    let commitment = commitment.unwrap_or(self.subscription_config.commitment);

    let logs = self
        .pub_sub()
        .await?
        .logs_subscribe(&event_authority, Some(commitment))
        .await?;
    let signatures = logs.and_then(|txn| {
        let signature = WithSlot::from(txn)
            .map(|txn| {
                txn.signature
                    .parse()
                    .map_err(crate::Error::invalid_argument)
            })
            .transpose();
        async move { signature }
    });

    let encoded = extract_cpi_events(
        signatures,
        query,
        program_id,
        &event_authority,
        commitment,
        Some(0),
    );
    let events = encoded.try_filter_map(|batch| {
        let decoded = batch
            .map(|batch| {
                batch
                    .events
                    .iter()
                    .map(|event| StoreCPIEvent::decode(event).map_err(crate::Error::from))
                    .collect::<crate::Result<Vec<_>>>()
            })
            .transpose()
            .inspect_err(|err| tracing::error!(%err, "decode error"))
            .ok();
        async move { Ok(decoded) }
    });
    Ok(events)
}
977
/// Streams the store CPI events found in the transaction history of `address`.
///
/// `commitment` defaults to the client's commitment. Transactions without
/// events are skipped; unlike the live subscription, a decode failure is
/// yielded as a stream error.
#[cfg(feature = "decode")]
pub async fn historical_store_cpi_events(
    &self,
    address: &Pubkey,
    commitment: Option<CommitmentConfig>,
) -> crate::Result<
    impl futures_util::Stream<
        Item = crate::Result<crate::utils::WithSlot<Vec<crate::store::events::StoreCPIEvent>>>,
    >,
> {
    use futures_util::TryStreamExt;
    use gmsol_decode::Decode;

    use crate::{
        store::events::StoreCPIEvent,
        utils::{extract_cpi_events, fetch_transaction_history_with_config},
    };

    let commitment = commitment.unwrap_or(self.commitment());
    let rpc = Arc::new(self.store_program().rpc());
    let signatures = fetch_transaction_history_with_config(
        rpc.clone(),
        address,
        commitment,
        None,
        None,
        None,
    )
    .await?;

    let encoded = extract_cpi_events(
        signatures,
        rpc,
        self.store_program_id(),
        &self.store_event_authority(),
        commitment,
        Some(0),
    );
    let events = encoded
        .try_filter(|batch| std::future::ready(!batch.value().events.is_empty()))
        .and_then(|batch| {
            let decoded = batch
                .map(|batch| {
                    batch
                        .events
                        .iter()
                        .map(|event| StoreCPIEvent::decode(event).map_err(crate::Error::from))
                        .collect::<crate::Result<Vec<_>>>()
                })
                .transpose();
            async move { decoded }
        });
    Ok(events)
}
1031
/// Waits for the order at `address` to complete and returns its trade event, if any.
///
/// Calls [`Self::complete_order_with_config`] with the current slot (fetched
/// with the client's commitment) and a polling interval of five seconds.
#[cfg(feature = "decode")]
pub async fn complete_order(
    &self,
    address: &Pubkey,
    commitment: Option<CommitmentConfig>,
) -> crate::Result<Option<crate::types::TradeEvent>> {
    let polling = std::time::Duration::from_secs(5);
    let slot = self.get_slot(None).await?;
    self.complete_order_with_config(address, slot, polling, commitment)
        .await
}
1048
/// Returns the first batch of historical events for `order` whose slot is not
/// greater than `before_slot`.
///
/// # Errors
/// Returns an unknown error when the history yields no such batch, and
/// propagates any error produced by the history stream.
#[cfg(feature = "decode")]
pub async fn last_order_events(
    &self,
    order: &Pubkey,
    before_slot: u64,
    commitment: CommitmentConfig,
) -> crate::Result<Vec<crate::store::events::StoreCPIEvent>> {
    use futures_util::{StreamExt, TryStreamExt};

    let history = self
        .historical_store_cpi_events(order, Some(commitment))
        .await?;
    let candidates = history
        .try_filter(|events| std::future::ready(events.slot() <= before_slot))
        .take(1);
    futures_util::pin_mut!(candidates);

    let Some(events) = candidates.next().await.transpose()? else {
        return Err(crate::Error::unknown(format!(
            "events not found, slot={before_slot}"
        )));
    };
    Ok(events.into_value())
}
1075
/// Waits until the order at `address` is removed and returns the trade event
/// observed for it, if any.
///
/// The event subscription is opened first, then the order account is checked.
/// If the order is already gone, the result comes from its historical events.
/// Otherwise the live stream is followed: the latest `TradeEvent` is
/// remembered and returned once an `OrderRemoved` event arrives. Whenever
/// `polling` elapses without an event, the order account is fetched again
/// (once `slot` has been reached) as a fallback to the history lookup.
///
/// # Errors
/// Propagates RPC, subscription and decode errors, and returns an unknown
/// error if the event stream ends before the order completes.
#[cfg(feature = "decode")]
pub async fn complete_order_with_config(
    &self,
    address: &Pubkey,
    mut slot: u64,
    polling: std::time::Duration,
    commitment: Option<CommitmentConfig>,
) -> crate::Result<Option<crate::types::TradeEvent>> {
    use crate::store::events::StoreCPIEvent;
    use futures_util::{StreamExt, TryStreamExt};
    use solana_account_decoder::UiAccountEncoding;

    // Picks the first trade event out of a batch of decoded events.
    fn first_trade(events: Vec<StoreCPIEvent>) -> Option<crate::types::TradeEvent> {
        events.into_iter().find_map(|event| match event {
            StoreCPIEvent::TradeEvent(trade) => Some(trade),
            _ => None,
        })
    }

    let commitment = commitment.unwrap_or(self.subscription_config.commitment);

    // Subscribe before inspecting the account so that no event is missed in between.
    let events = self.subscribe_store_cpi_events(Some(commitment)).await?;

    let config = RpcAccountInfoConfig {
        encoding: Some(UiAccountEncoding::Base64),
        commitment: Some(commitment),
        min_context_slot: Some(slot),
        ..Default::default()
    };

    let mut slot_reached = self.get_slot(Some(commitment)).await? >= slot;
    if slot_reached {
        let order = self.order_with_config(address, config.clone()).await?;
        // From here on, only events at or after the observed slot are relevant.
        slot = order.slot();
        if order.into_value().is_none() {
            let events = self.last_order_events(address, slot, commitment).await?;
            return Ok(first_trade(events));
        }
    }

    let address = *address;
    let stream = events
        .try_filter_map(|events| async {
            if events.slot() < slot {
                return Ok(None);
            }
            let events = events
                .into_value()
                .into_iter()
                .filter(|event| {
                    matches!(
                        event,
                        StoreCPIEvent::TradeEvent(_) | StoreCPIEvent::OrderRemoved(_)
                    )
                })
                .map(Ok);
            Ok(Some(futures_util::stream::iter(events)))
        })
        .try_flatten();
    let stream =
        tokio_stream::StreamExt::timeout_repeating(stream, tokio::time::interval(polling));
    futures_util::pin_mut!(stream);

    let mut trade = None;
    while let Some(res) = stream.next().await {
        match res {
            Ok(Ok(StoreCPIEvent::TradeEvent(event))) => {
                trade = Some(event);
            }
            Ok(Ok(StoreCPIEvent::OrderRemoved(_remove))) => {
                return Ok(trade);
            }
            // Every other variant was filtered out when the stream was built.
            Ok(Ok(_)) => unreachable!(),
            Ok(Err(err)) => {
                return Err(err);
            }
            Err(_elapsed) => {
                if !slot_reached {
                    slot_reached = self.get_slot(Some(commitment)).await? >= slot;
                    continue;
                }
                let res = self.order_with_config(&address, config.clone()).await?;
                if res.value().is_none() {
                    let events = self
                        .last_order_events(&address, res.slot(), commitment)
                        .await?;
                    return Ok(first_trade(events));
                }
            }
        }
    }
    Err(crate::Error::unknown("the watch stream end"))
}
1181
1182 pub async fn shutdown(&self) -> crate::Result<()> {
1184 self.pub_sub().await?.shutdown().await
1185 }
1186
1187 pub async fn gt_exchanges(
1189 &self,
1190 store: &Pubkey,
1191 owner: &Pubkey,
1192 ) -> crate::Result<BTreeMap<Pubkey, types::gt::GtExchange>> {
1193 use types::gt::GtExchange;
1194
1195 let store_filter = StoreFilter::new(store, bytemuck::offset_of!(GtExchange, store));
1196 let owner_filter = RpcFilterType::Memcmp(Memcmp::new_base58_encoded(
1197 8 + bytemuck::offset_of!(GtExchange, owner),
1198 owner.as_ref(),
1199 ));
1200 let exchanges = self
1201 .store_accounts::<ZeroCopy<GtExchange>>(Some(store_filter), Some(owner_filter))
1202 .await?;
1203 Ok(exchanges
1204 .into_iter()
1205 .map(|(address, exchange)| (address, exchange.0))
1206 .collect())
1207 }
1208}
1209
1210pub trait SystemProgramOps<C> {
1212 fn transfer(&self, to: &Pubkey, lamports: u64) -> crate::Result<TransactionBuilder<C>>;
1214}
1215
1216impl<C: Clone + Deref<Target = impl Signer>> SystemProgramOps<C> for Client<C> {
1217 fn transfer(&self, to: &Pubkey, lamports: u64) -> crate::Result<TransactionBuilder<C>> {
1218 use anchor_client::solana_sdk::system_instruction::transfer;
1219
1220 if lamports == 0 {
1221 return Err(crate::Error::invalid_argument(
1222 "transferring amount is zero",
1223 ));
1224 }
1225 Ok(self
1226 .store_transaction()
1227 .pre_instruction(transfer(&self.payer(), to, lamports)))
1228 }
1229}
1230
1231#[derive(Debug)]
1233pub struct StoreFilter {
1234 store: Pubkey,
1236 store_offset: usize,
1238 ignore_disc_offset: bool,
1240}
1241
1242impl StoreFilter {
1243 pub fn new(store: &Pubkey, store_offset: usize) -> Self {
1245 Self {
1246 store: *store,
1247 store_offset,
1248 ignore_disc_offset: false,
1249 }
1250 }
1251
1252 pub fn ignore_disc_offset(mut self, ignore: bool) -> Self {
1254 self.ignore_disc_offset = ignore;
1255 self
1256 }
1257
1258 pub fn store_offset(&self) -> usize {
1260 if self.ignore_disc_offset {
1261 self.store_offset
1262 } else {
1263 self.store_offset + DISC_OFFSET
1264 }
1265 }
1266}
1267
1268impl From<StoreFilter> for RpcFilterType {
1269 fn from(filter: StoreFilter) -> Self {
1270 let store = filter.store;
1271 let store_offset = filter.store_offset();
1272 RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
1273 store_offset,
1274 store.as_ref().to_owned(),
1275 ))
1276 }
1277}