本文整理汇总了Python中aspen.sockets.message.Message类的典型用法代码示例。如果您正苦于以下问题:Python Message类的具体用法?Python Message怎么用?Python Message使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Message类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_event_data_decoded
def test_event_data_decoded():
message = Message.from_bytes('''5:::{
"name": "bar", "args": []
}''')
expected = {u'args': [], u'name': 'bar'}
actual = message.data
assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:7,代码来源:test_sockets_message.py
示例2: test_can_put_onto_buffer
def test_can_put_onto_buffer():
mk(("echo.sock", "socket.send(socket.recv())"))
buffer = Buffer(make_socket(), "foo")
expected = [FFFD + "4" + FFFD + "1:::"]
buffer.put(Message.from_bytes("1:::"))
actual = list(buffer.flush())
assert actual == expected, actual
开发者ID:rayleyva,项目名称:aspen,代码行数:7,代码来源:test_sockets_buffer.py
示例3: test_json_data_decoded
def test_json_data_decoded():
message = Message.from_bytes('''4:deadbeef:/cheese.sock:{
"foo": "bar"
}''')
expected = {"foo": "bar"}
actual = message.data
assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:7,代码来源:test_sockets_message.py
示例4: test_can_put_onto_buffer
def test_can_put_onto_buffer():
mk(('echo.sock', 'socket.send(socket.recv())'))
buffer = Buffer(make_socket(), 'foo')
expected = [FFFD+'4'+FFFD+'1:::']
buffer.put(Message.from_bytes('1:::'))
actual = list(buffer.flush())
assert actual == expected, actual
开发者ID:berryp,项目名称:aspen,代码行数:7,代码来源:test_sockets_buffer.py
示例5: test_json_roundtrip
def test_json_roundtrip():
bytes = '''4:deadbeef:/cheese.sock:{
"foo": "bar"
}'''
message = Message.from_bytes(bytes)
expected = bytes
actual = str(message)
assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:8,代码来源:test_sockets_message.py
示例6: __iter__
def __iter__(self):
"""Yield Message objects.
"""
if self.bytes[:3] != FFFD:
yield Message.from_bytes(self.bytes)
else:
frames = self.bytes.split(FFFD)
frames = frames[1:] # discard initial empty string
nframes = len(frames)
if nframes % 2 != 0:
msg = "There are an odd number of frames in this packet: "
msg += self.bytes
raise SyntaxError(msg)
while frames:
# frames == [nbytes, bytes, nbytes, bytes, ...]
# We only care about bytes.
yield Message.from_bytes(frames[1])
frames = frames[2:]
开发者ID:jarpineh,项目名称:aspen,代码行数:18,代码来源:packet.py
示例7: test_channel_passes_send_on_to_four_sockets
def test_channel_passes_send_on_to_four_sockets():
mk(('echo.sock', 'channel.send(channel.recv())'))
channel = Channel('foo', ThreadedBuffer)
sockets = [make_socket(channel=channel) for i in range(4)]
channel.send('foo')
for socket in sockets:
expected = deque([Message.from_bytes('3::/echo.sock:foo')])
actual = socket.outgoing.queue
assert actual == expected, actual
开发者ID:allisonmobley,项目名称:aspen,代码行数:10,代码来源:test_sockets_channel.py
示例8: test_channel_passes_send_on_to_one_socket
def test_channel_passes_send_on_to_one_socket():
mk(('echo.sock', ''))
socket = make_socket()
channel = Channel('foo', ThreadedBuffer)
channel.add(socket)
channel.send('foo')
expected = deque([Message.from_bytes('3::/echo.sock:foo')])
actual = socket.outgoing.queue
assert actual == expected, actual
开发者ID:allisonmobley,项目名称:aspen,代码行数:10,代码来源:test_sockets_channel.py
示例9: test_transport_GET_gets_data_from_socket
def test_transport_GET_gets_data_from_socket():
transport = make_transport(state=1)
message = Message.from_bytes("3:::Greetings, program!")
transport.socket.outgoing.put(message)
request = Request('GET')
response = transport.respond(request)
expected = FFFD+'23'+FFFD+'3:::Greetings, program!'
actual = response.body.next()
assert actual == expected, actual
开发者ID:jarpineh,项目名称:aspen,代码行数:11,代码来源:test_sockets_transport.py
示例10: test_buffer_flush_performance
def test_buffer_flush_performance():
return # This test makes my lap hot.
M = lambda: Message.from_bytes("3::/echo.sock:Greetings, program!")
N = 10000
buffer = Buffer([M() for i in range(N)])
out = list(buffer.flush())
nbuffer = len(buffer)
nout = len(out)
assert nbuffer + nout == N, (nbuffer, nout)
assert nout > 500
开发者ID:berryp,项目名称:aspen,代码行数:12,代码来源:test_sockets_buffer.py
示例11: test_from_bytes_data_part_is_optional
def test_from_bytes_data_part_is_optional():
message = Message.from_bytes('3::')
expected = ""
actual = message.data
assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py
示例12: test_packet_Packetable_with_multiple_frames
def test_packet_Packetable_with_multiple_frames():
expected = [Message.from_bytes(x) for x in ('0:::', '1:::')]
actual = list(Packet(FFFD+'4'+FFFD+'0:::'+FFFD+'4'+FFFD+'1:::'))
assert actual == expected, repr(actual)
开发者ID:buchuki,项目名称:aspen,代码行数:4,代码来源:test_sockets_packet.py
示例13: test_from_bytes_type_lower_bound_instantiable
def test_from_bytes_type_lower_bound_instantiable():
message = Message.from_bytes('0:::')
expected = 0
actual = message.type
assert actual == expected
开发者ID:BigBlueHat,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py
示例14: test_endpoint_passes_through
def test_endpoint_passes_through():
message = Message.from_bytes('3:deadbeef:/cheese.sock:')
expected = '/cheese.sock'
actual = message.endpoint
assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py
示例15: test_data_passes_through
def test_data_passes_through():
message = Message.from_bytes('3:deadbeef:/cheese.sock:Greetings, program!')
expected = 'Greetings, program!'
actual = message.data
assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py
示例16: test_from_bytes_type_upper_bound_instantiable
def test_from_bytes_type_upper_bound_instantiable():
message = Message.from_bytes('8:::')
expected = 8
actual = message.type
assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py
示例17: test_id_passes_through
def test_id_passes_through():
message = Message.from_bytes('3:deadbeef::')
expected = 'deadbeef'
actual = message.id
assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py
示例18: test_message_can_be_instantiated_from_bytes
def test_message_can_be_instantiated_from_bytes():
expected = Message
actual = Message.from_bytes('3:::').__class__
assert actual is expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:4,代码来源:test_sockets_message.py
示例19: test_from_bytes_too_many_colons_and_the_extras_end_up_in_the_data
def test_from_bytes_too_many_colons_and_the_extras_end_up_in_the_data():
message = Message.from_bytes('3::::')
expected = ":"
actual = message.data
assert actual == expected, actual
开发者ID:nejstastnejsistene,项目名称:aspen-python,代码行数:5,代码来源:test_sockets_message.py
示例20: test_packet_Packetable_with_unframed_bytes
def test_packet_Packetable_with_unframed_bytes():
expected = [Message.from_bytes('1:::')]
actual = list(Packet('1:::'))
assert actual == expected, actual
开发者ID:buchuki,项目名称:aspen,代码行数:4,代码来源:test_sockets_packet.py
注:本文中的aspen.sockets.message.Message类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论