Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket doesn't re-subscribe upon disconnect #24

Open
novicetopython opened this issue Feb 14, 2024 · 3 comments
Open

Websocket doesn't re-subscribe upon disconnect #24

novicetopython opened this issue Feb 14, 2024 · 3 comments

Comments

@novicetopython
Copy link

Thanks for your great work on this.

The websocket re-connects on a disconnect but doesn't seem to re-subscribe to channels.

2024-02-14 04:48:48.092 | ERROR | pybitget.stream:__on_error:235 - Connection to remote host was lost.
2024-02-14 04:48:48.093 | INFO | pybitget.stream:__re_connect:252 - start reconnection ...
2024-02-14 04:48:48.095 | INFO | pybitget.stream:__on_close:241 - ws is closeing ......close_status:None,close_msg:None
2024-02-14 04:48:49.505 | INFO | pybitget.stream:__on_open:179 - connection is success....
2024-02-14 04:48:49.566 | INFO | pybitget.stream:__on_open:179 - connection is success....

I have edited stream.py as below in to close the connection and then wait in case it needs time until properly logged in before trying to re-subscribe

def __re_connect(self):
    if not self.__keyboard_interrupt_flag:
        self.__reconnect_status = True
        logger.info("start reconnection ...")
        # Close the current connection
        self.__close()
        # Rebuild the connection
        self.build()
        # Wait for the connection to be established before resubscribing
        while not self.has_connect():
            time.sleep(1)
        for channel in self.__all_suribe:
            self.subscribe([channel])            
        #pass
        logger.info("Resubscribed to channels: {}".format(self.__all_suribe))

Could ideally do with the script catching KeyboardException (I have tried to implement within stream.py but isn't error free) and a close and reconnect if no messages received for a period of time.

Thanks

@novicetopython
Copy link
Author

Have changed the coding to use Websockets and Asyncio so this is not needed now. If you know how to get the V2 websocket working that would be good - can get it to connect and login but get an error when trying to subscribe even though followed the subscription format exactly.

@erichegit
Copy link

Have changed the coding to use Websockets and Asyncio so this is not needed now. If you know how to get the V2 websocket working that would be good - can get it to connect and login but get an error when trying to subscribe even though followed the subscription format exactly.

It would be nice to show how you changed the Websocket code, since I also have the problem, that once the connection is lost because the webserver of Bitget are overloaded, it is not re-connecting anymore.

This is how it looks when entered the death spiral, not recovering anymore.

2024-03-08 18:25:27.124 | INFO     | pybitget.stream:__on_close:219 - ws is closeing ......close_status:None,close_msg:None
2024-03-08 18:25:27.125 | DEBUG    | pybitget.stream:send_message:131 - {"op": "subscribe", "args": [{"inst_type": "dmcbl", "channel": "ordersAlgo", "inst_id": "default"}]}
2024-03-08 18:25:27.126 | DEBUG    | pybitget.stream:send_message:131 - {"op": "subscribe", "args": [{"inst_type": "dmcbl", "channel": "positions", "inst_id": "default"}]}
2024-03-08 18:25:27.126 | DEBUG    | pybitget.stream:send_message:131 - {"op": "subscribe", "args": [{"inst_type": "dmcbl", "channel": "orders", "inst_id": "default"}]}
2024-03-08 18:25:27.126 | DEBUG    | pybitget.stream:send_message:131 - {"op": "subscribe", "args": [{"inst_type": "dmcbl", "channel": "account", "inst_id": "default"}]}
2024-03-08 18:25:27.127 | INFO     | pybitget.stream:__on_close:219 - ws is closeing ......close_status:None,close_msg:None
2024-03-08 18:25:27.127 | INFO     | pybitget.stream:__re_connect:226 - start reconnection ...
2024-03-08 18:25:27.128 | INFO     | pybitget.stream:build:73 - start connecting...wss://ws.bitget.com/mix/v1/stream
2024-03-08 18:25:27.132 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.205 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.206 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.207 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.208 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.208 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.209 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.209 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.251 | ERROR    | pybitget.stream:__on_error:213 - Handshake status 400 Bad Request -+-+- {'server': 'CloudFront', 'date': 'Fri, 08 Mar 2024 17:25:27 GMT', 'content-type': 'text/html', 'content-length': '915', 'connection': 'close', 'x-cache': 'Error from cloudfront', 'via': '1.1 9a97e41242551c9a56be1311e4d3db70.cloudfront.net (CloudFront)', 'x-amz-cf-pop': 'FRA60-P10', 'x-amz-cf-id': 'CKYJJoFQ0beJIu6F-g_leOeV3Z8aTxEAA=='} -+-+- b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">\n<HTML><HEAD><META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">\n<TITLE>ERROR: The request could not be satisfied</TITLE>\n</HEAD><BODY>\n<H1>400 ERROR</H1>\n<H2>The request could not be satisfied.</H2>\n<HR noshade size="1px">\nBad request.\nWe can\'t connect to the server for this app or website at this time. There might be too much traffic or a configuration error. Try again later, or contact the app or website owner.\n<BR clear="all">\nIf you provide content to customers through CloudFront, you can find steps to troubleshoot and help prevent this error by reviewing the CloudFront documentation.\n<BR clear="all">\n<HR noshade size="1px">\n<PRE>\nGenerated by cloudfront (CloudFront)\nRequest ID: CKYJJoFQ0beJIu6F-g_leOeV3Z8aTxEjvxQ5vf-piwvAA==\n</PRE>\n<ADDRESS>\n</ADDRESS>\n</BODY></HTML>'
2024-03-08 18:25:27.251 | INFO     | pybitget.stream:__on_close:219 - ws is closeing ......close_status:None,close_msg:None
2024-03-08 18:25:27.253 | INFO     | pybitget.stream:__on_close:219 - ws is closeing ......close_status:None,close_msg:None
2024-03-08 18:25:27.389 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.390 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.391 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.626 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.636 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.705 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.726 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.972 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.972 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.976 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.986 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.084 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.133 | INFO     | pybitget.stream:build:73 - start connecting...wss://ws.bitget.com/mix/v1/stream
2024-03-08 18:25:28.151 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.458 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.514 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.517 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.639 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.698 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.704 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.713 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.721 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.724 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.824 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.824 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.864 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:29.064 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:29.068 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:29.136 | INFO     | pybitget.stream:build:73 - start connecting...wss://ws.bitget.com/mix/v1/stream

@novicetopython
Copy link
Author

Hi

Builds on what you had already done but I am not trading using the websocket so not logging in.

Thanks

class BitgetWsClient:
def init(self,
api_key: Optional[str] = None,
api_secret: Optional[str] = None,
passphrase: Optional[str] = None,
channels: Optional[list[SubscribeReq]] = None, # Add channels parameter
ping_interval: Optional[int] = 20,
ws_url: Optional[str] = None,
verbose: Optional[str] = False,
handle=None,
handle_error=None,
):
self.api_key = api_key
self.api_secret = api_secret
self.passphrase = passphrase
self.channels = channels
self.ws_url = ws_url or CONTRACT_WS_URL
self.verbose = verbose
self.connection = False
self.login_status = False
self.subscribe_status = False
self.all_subscribe = set()
self.listener = self.handle if handle is None else handle
self.error_listener = self.handle_error if handle_error is None else handle_error
self.scribe_map = {}
self.ws_client = None
self.keyboard_interrupt_flag = False
self.last_receive_time = self.lastrestarttime = self.last_ping_time = time.time()
self.errormonitor = 'unset'
self.inerror = False
self.reconnect_interval = 60 # 2 minutes in seconds
self.ping_interval = ping_interval
self.logwrites = 0
self.restarts = 0
# Initialize the websocket_lock attribute
self.websocket_lock = asyncio.Lock()

async def connect(self):
    # signal.signal(signal.SIGINT, lambda signal, frame: asyncio.create_task(self.handle_keyboard_interrupt(signal, frame)))
    signal.signal(signal.SIGINT, self.handle_keyboard_interrupt)
    while True:
        if self.keyboard_interrupt_flag:
            await self.unsubscribe(list(self.all_subscribe))
            await asyncio.sleep(2)
            await self.close()
            await asyncio.sleep(2)
            break
        try:
            if self.inerror: await asyncio.sleep(20) # if in error wait 20 seconds before trying to connect again
            logger.info("Trying to connect to WebSocket server.")
            async with websockets.connect(self.ws_url) as websocket:
                logger.info("Connected to WebSocket server.")
                self.websocket = websocket
                self.connection, self.inerror, self.subscribe_status, self.errormonitor = True, False, False, 'unset'
                self.restarts = self.restarts + 1
                if self.restarts > 10: self.update_log("Websocket has restarted 10 times, resetting to zero")
                # don't need to login for public information
                # await self.login()
                await asyncio.sleep(2)
                await self.subscribe(self.channels)
                while True:
                    if self.keyboard_interrupt_flag: break
                    elif time.time() > self.last_receive_time + 600: # if no message received for 10 minutes
                        self.update_log("No data received for 10 minutes")
                        self.connection, self.inerror, self.last_receive_time = False, True, time.time()
                        break
                    elif not self.connection:
                        try:
                            await self.unsubscribe(list(self.all_subscribe))
                            await asyncio.sleep(2)
                            await self.close()
                            await asyncio.sleep(2)
                            break
                        except: break
                    elif time.time() - self.last_ping_time > self.ping_interval: await self.send_ping()
                    else:
                        #async with self.websocket_lock: message = await self.websocket.recv()
                        await self.on_message()
        except websockets.WebSocketException as e:
            logger.error(f"WebSocket error: {e}")
            self.update_log("websockets.WebsocketException in connect, error is "+str(e))
            await asyncio.sleep(20) # wait 20 seconds before trying to reconnect
            continue
        except KeyboardInterrupt:
            print("\nKeyboardInterrupt in main while loop: Closing WebSocket connection.")
            self.keyboard_interrupt_flag = True
            break
        except Exception as e:
            logger.error(f"Error connecting to WebSocket server: {e}")
            self.update_log("Exception in connect, error is "+str(e))
            await asyncio.sleep(20) # wait 20 seconds before trying to reconnect
            continue
    sys.exit()

def update_log(self, msg):
    if self.logwrites < 50:
        updatelogfile(msg,logfile)
        self.logwrites = self.logwrites + 1
        if "restarted" in msg: self.restarts = 0
    return

def handle_keyboard_interrupt(self, signal, frame):
    print("\nKeyboardInterrupt: Closing WebSocket connection.")
    self.keyboard_interrupt_flag = True
    return

async def send_ping(self):
    try:
        self.last_ping_time = time.time()
        await self.websocket.send("ping")
    except Exception as e: 
        logger.error(f"Error sending ping: {e}")
        if self.errormonitor == 'unset': self.errormonitor = time.time()
    return

async def on_message(self):
    try:
        try: message = await asyncio.wait_for(self.websocket.recv(), timeout=120)  # Set timeout to 1 minute
        except asyncio.TimeoutError: return
        #message = await self.websocket.recv()
        if message == 'pong': return
        json_obj = json.loads(message)
        listener = None
        if "data" in str(json_obj):
            if not self.__check_sum(json_obj):
                return
            listener = self.get_listener(json_obj)
            # Update the last receive time whenever data is received
            self.last_receive_time = time.time()
            self.subscribe_status, self.inerror, self.errormonitor = True, False, 'unset'
        elif "connection close" in str(json_obj):
            self.update_log("Connection is closed, error is "+str(json_obj))
            self.connection, self.inerror = False, True
            return
        elif "code" in str(json_obj) and json_obj.get("code") != 0:
            self.update_log("Error message received, error is "+str(json_obj))
            if self.errormonitor == 'unset': self.errormonitor = time.time()
            elif time.time() > self.errormonitor + 300: # if not connected for 5 minutes try to reconnect to server
                self.connection, self.inerror = False, True
            return
        elif "subscribe" in str(json_obj):
            logger.info(f"Subscribed to: {json_obj}")
            return
        elif "login" in str(json_obj):
            if self.verbose: print("login msg: %s" % message)
            self.login_status = True
            return
        else:
            self.update_log("Unknown message received: "+(json_obj))
            print("unknown message is: "+str(json_obj))
            return
        if listener:
            print("in listener")
            listener(message)
            return
        try: self.listener(json_obj)
        except: 
            self.listener(message)
            pass
    except KeyboardInterrupt:
        print("\nKeyboardInterrupt in on_message: Closing WebSocket connection.")
        self.keyboard_interrupt_flag = True
        return
    except websockets.ConnectionClosedOK:
        logger.error("WebSocket connection closed gracefully.")
        self.connection, self.inerror = False, True
    except websockets.ConnectionClosedError as e:
        logger.error(f"WebSocket connection closed unexpectedly: {e}")
        self.update_log("Connection Closeed Error: "+str(e))
        self.connection, self.inerror = False, True
    except Exception as e:
        logger.error(f"Error in on message: {e}")
        self.update_log("Error in on_message: "+str(e))
        if "sent 1000 (OK)" in str(e) or "sent 1011 (unexpected error)" in str(e): self.connection, self.inerror = False, True
        elif self.errormonitor == 'unset': self.errormonitor = time.time()
        elif time.time() > self.errormonitor + 300: # if not connected for 5 minutes try to reconnect to server
            self.connection, self.inerror = False, True
        return

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants