class SubscriptionScanner(Scanner):
    '''a Fast Subscription-Based Scanner that uses `eth_subscribe` to monitor events (**MAY NOT COVER SOME USE CASES**)

    Args:
        config (:class:`houndcore.config.Config`): the config; its ``RPC``
            value must start with ``"ws"``.
        dispatcher: optional dispatcher that receives subscription events;
            a new ``Dispatcher()`` is created when ``None`` is passed.

    Raises:
        IncorrectRPC: if ``config.RPC`` does not start with ``"ws"``.
    '''

    def __init__(self, config: Config, dispatcher: Dispatcher | None = None):
        if not config.RPC.startswith("ws"):
            raise IncorrectRPC(scanner=SubscriptionScanner, rpc=config.RPC)
        self.w3: AsyncWeb3 = AsyncWeb3(WebSocketProvider(endpoint_uri=config.RPC))
        # Explicit None check: a caller-supplied dispatcher must never be
        # replaced just because it happens to evaluate as falsy.
        self.dispatcher = Dispatcher() if dispatcher is None else dispatcher
        # True only while ``run`` is awaiting the subscription handler loop.
        self.is_running: bool = False

    @property
    async def is_connected(self) -> bool:
        '''awaitable property: whether the underlying provider is connected

        Must be awaited (``await scanner.is_connected``) because the property
        getter is a coroutine.
        '''
        return await self.w3.provider.is_connected()

    async def connect(self) -> None:
        '''connects the WebSocketProvider, if not connected'''
        if await self.is_connected:
            return
        await self.w3.provider.connect()

    async def subscribe(
        self,
        subs: Union[List[Subscription], Subscription],
    ) -> List[str]:
        '''subscribes to events

        Each subscription without a handler gets the dispatcher's
        ``handle_event`` assigned as its handler (the subscription object is
        modified in place). Every resulting subscription ID is also
        registered with the dispatcher.

        Args:
            subs (Union[List[Subscription], Subscription]): The subscriptions

        Returns:
            List[str]: the subscription IDs, in the same order as ``subs``
        '''
        if isinstance(subs, Subscription):
            subs = [subs]

        subscriptions = []
        for sub in subs:
            if not sub.handler:
                sub.handler = self.dispatcher.handle_event
            sub_id = await self.w3.subscription_manager.subscribe(sub.to_web3())
            subscriptions.append(sub_id)
            self.dispatcher.subscribe(sub_id=sub_id)
        return subscriptions

    async def unsubscribe(self, sub_id: str) -> None:
        '''unsubscribe from web3 and dispatcher

        Best-effort: any exception is printed instead of being propagated.
        If the web3 unsubscribe call raises, the dispatcher unsubscribe is
        not attempted.

        Args:
            sub_id (str): Subscription ID

        Returns:
            None
        '''
        try:
            await self.w3.subscription_manager.unsubscribe(sub_id)
            self.dispatcher.unsubscribe(sub_id=sub_id)
        except Exception as e:
            # NOTE(review): deliberately swallowed; consider logging instead
            # of printing once the project has a logger.
            print(e)

    async def run(self) -> None:
        '''running the handlers

        Does nothing if the scanner is already running. Otherwise awaits the
        subscription manager's handler loop. ``is_running`` is cleared again
        when that loop returns or raises, so the scanner can be restarted.
        '''
        if self.is_running:
            return
        self.is_running = True
        try:
            await self.w3.subscription_manager.handle_subscriptions()
        finally:
            # Without this reset a finished or failed run would leave the
            # flag stuck at True and every later ``run()`` would be a no-op.
            self.is_running = False