• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python message.Message类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中aspen.sockets.message.Message的典型用法代码示例。如果您正苦于以下问题:Python Message类的具体用法?Python Message怎么用?Python Message使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Message类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_event_data_decoded

def test_event_data_decoded():
    message = Message.from_bytes('''5:::{
    "name": "bar", "args": []
}''')
    expected = {u'args': [], u'name': 'bar'}
    actual = message.data
    assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:7,代码来源:test_sockets_message.py


示例2: test_can_put_onto_buffer

def test_can_put_onto_buffer():
    mk(("echo.sock", "socket.send(socket.recv())"))
    buffer = Buffer(make_socket(), "foo")
    expected = [FFFD + "4" + FFFD + "1:::"]
    buffer.put(Message.from_bytes("1:::"))
    actual = list(buffer.flush())
    assert actual == expected, actual
开发者ID:rayleyva,项目名称:aspen,代码行数:7,代码来源:test_sockets_buffer.py


示例3: test_json_data_decoded

def test_json_data_decoded():
    message = Message.from_bytes('''4:deadbeef:/cheese.sock:{
    "foo": "bar"
}''')
    expected = {"foo": "bar"}
    actual = message.data
    assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:7,代码来源:test_sockets_message.py


示例4: test_can_put_onto_buffer

def test_can_put_onto_buffer():
    mk(('echo.sock', 'socket.send(socket.recv())'))
    buffer = Buffer(make_socket(), 'foo')
    expected = [FFFD+'4'+FFFD+'1:::']
    buffer.put(Message.from_bytes('1:::'))
    actual = list(buffer.flush())
    assert actual == expected, actual
开发者ID:berryp,项目名称:aspen,代码行数:7,代码来源:test_sockets_buffer.py


示例5: test_json_roundtrip

def test_json_roundtrip():
    bytes = '''4:deadbeef:/cheese.sock:{
    "foo": "bar"
}'''
    message = Message.from_bytes(bytes)
    expected = bytes
    actual = str(message)
    assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:8,代码来源:test_sockets_message.py


示例6: __iter__

 def __iter__(self):
     """Yield Message objects.
     """
     if self.bytes[:3] != FFFD:
         yield Message.from_bytes(self.bytes)
     else:
         frames = self.bytes.split(FFFD)
         frames = frames[1:]  # discard initial empty string
         nframes = len(frames)
         if nframes % 2 != 0:
             msg = "There are an odd number of frames in this packet: "
             msg += self.bytes
             raise SyntaxError(msg)
         while frames:
             # frames == [nbytes, bytes, nbytes, bytes, ...]
             # We only care about bytes.
             yield Message.from_bytes(frames[1])
             frames = frames[2:]
开发者ID:jarpineh,项目名称:aspen,代码行数:18,代码来源:packet.py


示例7: test_channel_passes_send_on_to_four_sockets

def test_channel_passes_send_on_to_four_sockets():
    mk(('echo.sock', 'channel.send(channel.recv())'))
    channel = Channel('foo', ThreadedBuffer)
    sockets = [make_socket(channel=channel) for i in range(4)]
    channel.send('foo')

    for socket in sockets:
        expected = deque([Message.from_bytes('3::/echo.sock:foo')])
        actual = socket.outgoing.queue
        assert actual == expected, actual
开发者ID:allisonmobley,项目名称:aspen,代码行数:10,代码来源:test_sockets_channel.py


示例8: test_channel_passes_send_on_to_one_socket

def test_channel_passes_send_on_to_one_socket():
    mk(('echo.sock', ''))
    socket = make_socket()
    channel = Channel('foo', ThreadedBuffer)
    channel.add(socket)
    channel.send('foo')

    expected = deque([Message.from_bytes('3::/echo.sock:foo')])
    actual = socket.outgoing.queue
    assert actual == expected, actual
开发者ID:allisonmobley,项目名称:aspen,代码行数:10,代码来源:test_sockets_channel.py


示例9: test_transport_GET_gets_data_from_socket

def test_transport_GET_gets_data_from_socket():
    transport = make_transport(state=1)
    message = Message.from_bytes("3:::Greetings, program!")
    transport.socket.outgoing.put(message)
    
    request = Request('GET')
    response = transport.respond(request)
   
    expected = FFFD+'23'+FFFD+'3:::Greetings, program!'
    actual = response.body.next()
    assert actual == expected, actual
开发者ID:jarpineh,项目名称:aspen,代码行数:11,代码来源:test_sockets_transport.py


示例10: test_buffer_flush_performance

def test_buffer_flush_performance():

    return # This test makes my lap hot.

    M = lambda: Message.from_bytes("3::/echo.sock:Greetings, program!")
    N = 10000
    buffer = Buffer([M() for i in range(N)])
    out = list(buffer.flush())
    nbuffer = len(buffer)
    nout = len(out)
    assert nbuffer + nout == N, (nbuffer, nout)
    assert nout > 500
开发者ID:berryp,项目名称:aspen,代码行数:12,代码来源:test_sockets_buffer.py


示例11: test_from_bytes_data_part_is_optional

def test_from_bytes_data_part_is_optional():
    message = Message.from_bytes('3::')
    expected = ""
    actual = message.data
    assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py


示例12: test_packet_Packetable_with_multiple_frames

def test_packet_Packetable_with_multiple_frames():
    expected = [Message.from_bytes(x) for x in ('0:::', '1:::')]
    actual = list(Packet(FFFD+'4'+FFFD+'0:::'+FFFD+'4'+FFFD+'1:::'))
    assert actual == expected, repr(actual)
开发者ID:buchuki,项目名称:aspen,代码行数:4,代码来源:test_sockets_packet.py


示例13: test_from_bytes_type_lower_bound_instantiable

def test_from_bytes_type_lower_bound_instantiable():
    message = Message.from_bytes('0:::')
    expected = 0
    actual = message.type
    assert actual == expected
开发者ID:BigBlueHat,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py


示例14: test_endpoint_passes_through

def test_endpoint_passes_through():
    message = Message.from_bytes('3:deadbeef:/cheese.sock:')
    expected = '/cheese.sock'
    actual = message.endpoint
    assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py


示例15: test_data_passes_through

def test_data_passes_through():
    message = Message.from_bytes('3:deadbeef:/cheese.sock:Greetings, program!')
    expected = 'Greetings, program!'
    actual = message.data
    assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py


示例16: test_from_bytes_type_upper_bound_instantiable

def test_from_bytes_type_upper_bound_instantiable():
    message = Message.from_bytes('8:::')
    expected = 8
    actual = message.type
    assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py


示例17: test_id_passes_through

def test_id_passes_through():
    message = Message.from_bytes('3:deadbeef::')
    expected = 'deadbeef'
    actual = message.id
    assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py


示例18: test_message_can_be_instantiated_from_bytes

def test_message_can_be_instantiated_from_bytes():
    expected = Message
    actual = Message.from_bytes('3:::').__class__
    assert actual is expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:4,代码来源:test_sockets_message.py


示例19: test_from_bytes_too_many_colons_and_the_extras_end_up_in_the_data

def test_from_bytes_too_many_colons_and_the_extras_end_up_in_the_data():
    message = Message.from_bytes('3::::')
    expected = ":"
    actual = message.data
    assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py


示例20: test_packet_Packetable_with_unframed_bytes

def test_packet_Packetable_with_unframed_bytes():
    expected = [Message.from_bytes('1:::')]
    actual = list(Packet('1:::'))
    assert actual == expected, actual
开发者ID:buchuki,项目名称:aspen,代码行数:4,代码来源:test_sockets_packet.py



注:本文中的aspen.sockets.message.Message类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python testing.assert_raises函数代码示例发布时间:2022-05-24
下一篇:
Python resources.get函数代码示例发布时间:2022-05-24
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap