[docs]defsubscribe(self,sub_id:str)->None:'''Subscribe to a subscription using its id Args: sub_id (str): the subscription id'''ifself.listeners.get(sub_id,None)isNone:self.listeners[sub_id]=0self.listeners[sub_id]=0
[docs]defunsubscribe(self,sub_id:str)->None:'''Un-Subscribe to a subscription using its id Args: sub_id (str): the subscription id'''ifself.listeners.get(sub_id,None)isNone:returnself.listeners.pop(sub_id)
[docs]asyncdefhandle_event(self,context:Context)->None:'''handles any new events Args: context (Union[web3.utils.PendingTxSubscriptionContext, web3.utils.LogsSubscriptionContext]): the event context'''sub_id=context.subscription.idifsub_idnotinself.events:self.events[sub_id]=[]self.events[sub_id].append(context)
[docs]defpoll(self,sub_id:str,limit:int)->Union[List[Context],None]:'''poll any new events using the subscription id Args: sub_id (str): the subscription id limit (int): the limit of returned results Return: Union[List[Context], None]'''if(index:=self.listeners.get(sub_id,None))isNone:returndata=[]events=self.events.get(sub_id,[])try:events_index=len(events)new_events=events[index:min(events_index,index+limit)]self.listeners[sub_id]=index+len(new_events)exceptIndexError:new_events=[]data.extend(new_events)returndata