Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

asynchronous - asyncio -- RuntimeWarning: coroutine 'MyStreamConsumer.handle_timesale_equity' was never awaited

When I run the code below, I get this warning:

RuntimeWarning: coroutine 'MyStreamConsumer.handle_timesale_equity' was never awaited

I tried adding await consumer.handle_timesale_equity() inside main(), but I still get the same warning.

Any help would be appreciated. The code comes from this example: https://github.com/alexgolec/tda-api/blob/master/examples/streaming/timesales.py

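import asyncio
import pprint

from tda.auth import easy_client
from tda.streaming import StreamClient

# Placeholders: replace with your own API key and account ID
API_KEY = 'XXXXXX'
ACCOUNT_ID = 'XXXXXX'


class MyStreamConsumer: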
    """
    We use a class to enforce good code organization practices
    """
    def __init__(self, api_key, account_id, queue_size=1,
                 credentials_path='token'):
        """
        We're storing the configuration variables within the class for easy
        access later in the code!
        """
        self.api_key = api_key
        self.account_id = account_id
        self.credentials_path = credentials_path
        self.tda_client = None
        self.stream_client = None
        self.symbols = [
            'GOOG', 'GOOGL']

        # Create a queue so we can queue up work gathered from the client
        self.queue = asyncio.Queue(queue_size)

    def initialize(self):
        """
        Create the clients and log in. Using easy_client, we can get new creds
        from the user via the web browser if necessary
        """
        self.tda_client = easy_client(
            api_key=self.api_key,
            redirect_uri='https://localhost',
            token_path=self.credentials_path)
        self.stream_client = StreamClient(self.tda_client, account_id=self.account_id)

        # The streaming client wants you to add a handler for every service type
        self.stream_client.add_timesale_equity_handler(self.handle_timesale_equity)

    async def stream(self):
        await self.stream_client.login() # Log into the streaming service
        await self.stream_client.quality_of_service(StreamClient.QOSLevel.EXPRESS)
        await self.stream_client.timesale_equity_subs(self.symbols)

        # Kick off our handle_queue function as an independent coroutine
        asyncio.ensure_future(self.handle_queue())

        # Continuously handle inbound messages
        while True:
            await self.stream_client.handle_message()

    async def handle_timesale_equity(self, msg):
        """
        This is where we take msgs from the streaming client and put them on a
        queue for later consumption. We use a queue to prevent us from wasting
        resources processing old data, and falling behind.
        """
        # if the queue is full, make room
        if self.queue.full():
            await self.queue.get()
        await self.queue.put(msg)

    async def handle_queue(self):
        """
        Here we pull messages off the queue and process them.
        """
        while True:
            msg = await self.queue.get()
            pprint.pprint(msg)


async def main():
    """
    Create and instantiate the consumer, and start the stream
    """
    consumer = MyStreamConsumer(API_KEY, ACCOUNT_ID)
    consumer.initialize()
    await consumer.stream()


if __name__ == '__main__':
    asyncio.run(main())
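
A likely cause, offered as a hedged sketch rather than a confirmed diagnosis: this warning appears when something calls an `async def` function like a plain function and throws away the coroutine object it returns. In the code above, the handler is invoked by the streaming client, not by `main()`, which is why adding an `await` in `main()` changes nothing. If the installed tda-api release calls handlers without awaiting them (an assumption, not verified against the asker's environment), the handler body never runs. The example below reproduces that situation without tda-api. `Consumer`, `dispatch_without_await` and `dispatch_awaiting` are hypothetical stand-ins written for this illustration and are not part of the library.

```python
import asyncio
import inspect
import warnings


class Consumer:
    """Stand-in for MyStreamConsumer; only the queue logic is kept."""

    def __init__(self, queue_size=1):
        self.queue = asyncio.Queue(queue_size)

    async def handle_async(self, msg):
        # Same shape as handle_timesale_equity in the question.
        if self.queue.full():
            await self.queue.get()
        await self.queue.put(msg)

    def handle_sync(self, msg):
        # Synchronous variant: drops the oldest message when the queue is
        # full, and works even if the caller never awaits anything.
        if self.queue.full():
            self.queue.get_nowait()
        self.queue.put_nowait(msg)


def dispatch_without_await(handler, msg):
    # Hypothetical dispatcher that treats every handler as a plain function.
    handler(msg)


async def dispatch_awaiting(handler, msg):
    # Hypothetical dispatcher that awaits the result when it is awaitable.
    result = handler(msg)
    if inspect.isawaitable(result):
        await result


async def demo():
    consumer = Consumer(queue_size=1)

    # 1) Async handler, dispatcher that never awaits: the coroutine object
    #    is created and discarded, so nothing is queued.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        dispatch_without_await(consumer.handle_async, {"seq": 1})
    warned = any("never awaited" in str(w.message) for w in caught)
    size_after_unawaited = consumer.queue.qsize()

    # 2) Sync handler, same dispatcher: the message is queued and the
    #    oldest entry is dropped when the queue is full.
    dispatch_without_await(consumer.handle_sync, {"seq": 2})
    dispatch_without_await(consumer.handle_sync, {"seq": 3})
    latest_sync = consumer.queue.get_nowait()

    # 3) Async handler, dispatcher that awaits: the handler body runs.
    await dispatch_awaiting(consumer.handle_async, {"seq": 4})
    latest_async = consumer.queue.get_nowait()

    return warned, size_after_unawaited, latest_sync, latest_async


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If this matches what is happening, two things are worth trying: upgrade tda-api to a release whose streaming client accepts coroutine handlers, or turn `handle_timesale_equity` into a regular `def` that uses `get_nowait()` and `put_nowait()` as in `handle_sync` above. The synchronous form works with either kind of dispatcher, since a plain function returns nothing that needs awaiting.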
