本文整理汇总了Python中util.byte2int函数的典型用法代码示例。如果您正苦于以下问题:Python byte2int函数的具体用法?Python byte2int怎么用?Python byte2int使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了byte2int函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _get_server_information
def _get_server_information(self):
    """Parse the server's initial greeting packet and record its fields.

    Reads one packet through ``MysqlPacket(self)`` and stores on ``self``:
    ``protocol_version``, ``server_version``, ``server_thread_id``,
    ``salt``, ``server_capabilities``, ``server_language`` and
    ``server_charset``.
    """
    i = 0
    packet = MysqlPacket(self)
    data = packet.get_all_data()

    if DEBUG:
        dump_packet(data)

    self.protocol_version = byte2int(data[i:i+1])
    i += 1

    # The version string is NUL-terminated.
    server_end = data.find(int2byte(0), i)
    # TODO: is this the correct charset? should it be default_charset?
    self.server_version = data[i:server_end].decode(self.charset)
    i = server_end + 1

    # The thread id occupies 4 bytes (the cursor advances by 4), so read all
    # of them as an unsigned little-endian integer instead of truncating to
    # a signed 16-bit value.  The attribute intentionally stays the 1-tuple
    # returned by struct.unpack, exactly as before.
    self.server_thread_id = struct.unpack('<I', data[i:i+4])
    i += 4

    self.salt = data[i:i+8]
    i += 9  # 8 bytes of salt + 1 filler byte

    # The lower capability flags are the two bytes directly after the
    # filler; they are a bit mask, hence unsigned.
    self.server_capabilities = struct.unpack('<H', data[i:i+2])[0]
    i += 2

    self.server_language = byte2int(data[i:i+1])
    self.server_charset = charset_by_id(self.server_language).name

    # Skip language (1), status (2), upper capabilities (2),
    # auth-data length (1) and the reserved block (10).
    i += 16
    if len(data) >= i+12-1:
        self.salt += data[i:i+12]
开发者ID:Webs961,项目名称:PyMySQL,代码行数:35,代码来源:connections.py
示例2: __recv_packet
def __recv_packet(self, socket):
    """Read one packet from *socket*: the 4-byte header, then the payload.

    Stores the packet number in ``self.__packet_number`` and the payload in
    ``self.__data``.  Raises ``OperationalError(2013, ...)`` when the peer
    stops sending before the header or payload is complete.
    """
    header = socket.recv(4)
    while len(header) < 4:
        piece = socket.recv(4 - len(header))
        if not len(piece):
            raise OperationalError(2013, "Lost connection to MySQL server during query")
        header += piece

    if DEBUG:
        dump_packet(header)

    length_bytes = header[:3]
    self.__packet_number = byte2int(header[3])
    # TODO: check packet_num is correct (+1 from last packet)

    # The length is a 3-byte little-endian number; pad it to 4 bytes so it
    # can be unpacked as an unsigned 32-bit integer.
    (remaining,) = struct.unpack('<I', length_bytes + int2byte(0))

    pieces = []  # collecting chunks and joining once is faster than cStringIO
    while remaining > 0:
        chunk = socket.recv(remaining)
        if not len(chunk):
            raise OperationalError(2013, "Lost connection to MySQL server during query")
        if DEBUG:
            dump_packet(chunk)
        pieces.append(chunk)
        remaining -= len(chunk)
    self.__data = join_bytes(pieces)
开发者ID:flymio,项目名称:hammer,代码行数:26,代码来源:connections.py
示例3: dump_packet
def dump_packet(data):
    """Print a debugging hex/ASCII dump of *data* to stdout.

    First prints the packet length and the names of the five calling
    frames, then up to the first 256 bytes of *data* in rows of 16: hex
    values on the left, printable characters (or ``.``) on the right.
    """
    def is_ascii(data):
        # Only code points 65..122 are shown verbatim; everything else is
        # rendered as a dot.  Elements may be ints or 1-char strings.
        if 65 <= byte2int(data) <= 122:  # data.isalnum():
            if isinstance(data, int):
                return chr(data)
            return data
        return "."

    try:
        print("packet length:", len(data))
        for depth in range_type(1, 6):
            print("method call[%d]:" % depth, sys._getframe(depth).f_code.co_name)
        print("-" * 88)
    except ValueError:
        # sys._getframe raises ValueError when the call stack is shallower
        # than the requested depth; the header is best-effort only.
        pass

    rows = [data[i : i + 16] for i in range_type(0, min(len(data), 256), 16)]
    for row in rows:
        hex_part = " ".join("{:02X}".format(byte2int(x)) for x in row)
        text_part = " ".join("{}".format(is_ascii(x)) for x in row)
        # Each byte takes three columns in the hex part ("XX" plus a
        # separator), so pad short rows by three spaces per missing byte to
        # keep the text column aligned.
        print(hex_part + "   " * (16 - len(row)) + " " * 2 + text_part)
    print("-" * 88)
    print()
开发者ID:roson9527,项目名称:AsyncTorndb,代码行数:28,代码来源:connections.py
示例4: _scramble_323
def _scramble_323(password, message):
    """Build the old-style ("323") scramble of *password* against *message*.

    Both the password and the first ``SCRAMBLE_LENGTH_323`` bytes of the
    message are hashed with ``_hash_password_323``; the XOR of the two
    hashes seeds a ``RandStruct_323`` generator whose output is then masked
    with one extra generated byte.  Returns the scrambled bytes.
    """
    hashed_password = _hash_password_323(password)
    hashed_message = _hash_password_323(message[:SCRAMBLE_LENGTH_323])
    pass_words = struct.unpack(">LL", hashed_password)
    msg_words = struct.unpack(">LL", hashed_message)

    rng = RandStruct_323(pass_words[0] ^ msg_words[0], pass_words[1] ^ msg_words[1])

    length = min(SCRAMBLE_LENGTH_323, len(message))
    # The generator is drained here, before the mask byte is drawn, so the
    # sequence of my_rnd() calls is unchanged.
    raw = b"".join(
        int2byte(int(rng.my_rnd() * 31) + 64) for _ in range_type(length)
    )
    extra = int2byte(int(rng.my_rnd() * 31))
    mask = byte2int(extra)

    return b"".join(int2byte(byte2int(c) ^ mask) for c in raw)
开发者ID:roson9527,项目名称:AsyncTorndb,代码行数:16,代码来源:connections.py
示例5: __parse_field_descriptor
def __parse_field_descriptor(self, encoding):
    """Parse the 'Field Descriptor' (Metadata) packet.

    This is compatible with MySQL 4.1+ (not compatible with MySQL 4.0).

    ``catalog`` and ``db`` are kept as read; the four name fields are
    decoded with *encoding*.  The fixed-size fields that follow are read in
    wire order.
    """
    self.catalog = self.read_length_coded_string()
    self.db = self.read_length_coded_string()

    # Each name is a length-coded string read in this exact order.
    for attr in ("table_name", "org_table", "name", "org_name"):
        setattr(self, attr, self.read_length_coded_string().decode(encoding))

    self.advance(1)  # non-null filler
    (self.charsetnr,) = struct.unpack("<H", self.read(2))
    (self.length,) = struct.unpack("<I", self.read(4))
    self.type_code = byte2int(self.read(1))
    (self.flags,) = struct.unpack("<H", self.read(2))
    self.scale = byte2int(self.read(1))  # "decimals"
    self.advance(2)  # filler (always 0x00)
开发者ID:roson9527,项目名称:AsyncTorndb,代码行数:18,代码来源:connections.py
示例6: init_unbuffered_query
def init_unbuffered_query(self):
    """Read the first response packet and set up unbuffered result state.

    An OK packet is handled immediately and leaves ``unbuffered_active``
    False.  Otherwise the field count and column descriptions are read and
    ``affected_rows`` is set to a sentinel value.
    """
    self.unbuffered_active = True
    first = self.connection.read_packet()
    self.first_packet = first

    if first.is_ok_packet():
        self._read_ok_packet()
        self.unbuffered_active = False
        return

    self.field_count = byte2int(self.first_packet.read(1))
    self._get_descriptions()

    # Apparently, MySQLdb picks this number because it's the maximum
    # value of a 64bit unsigned integer. Since we're emulating MySQLdb,
    # we set it to this instead of None, which would be preferred.
    self.affected_rows = (1 << 64) - 1
开发者ID:Webs961,项目名称:PyMySQL,代码行数:15,代码来源:connections.py
示例7: _hash_password_323
def _hash_password_323(password):
    """Return the 8-byte old-style ("323") hash of *password*.

    Spaces and tabs in the password are ignored.  The result is two
    big-endian unsigned 32-bit words, each with its top bit cleared.
    """
    nr = 1345345333
    add = 7
    nr2 = 0x12345671

    # Iterating Python 3 ``bytes`` yields ints while a Python 2 ``str``
    # yields 1-char strings, so list both spellings of space and tab;
    # otherwise nothing is skipped for bytes input.
    skipped = (" ", "\t", 32, 9)

    for c in (byte2int(x) for x in password if x not in skipped):
        # ``&`` binds more loosely than ``+``: the mask applies to the sum.
        nr ^= ((((nr & 63) + add) * c) + (nr << 8)) & 0xFFFFFFFF
        nr2 = (nr2 + ((nr2 << 8) ^ nr)) & 0xFFFFFFFF
        add = (add + c) & 0xFFFFFFFF

    r1 = nr & ((1 << 31) - 1)  # kill sign bits
    r2 = nr2 & ((1 << 31) - 1)

    return struct.pack(">LL", r1, r2)
开发者ID:roson9527,项目名称:AsyncTorndb,代码行数:15,代码来源:connections.py
示例8: _get_server_information
def _get_server_information(self):
    """Receive the server greeting packet and record its fields on ``self``.

    This is a generator: it yields the result of ``packet.recv_packet()``
    once before parsing.  Always sets ``protocol_version``,
    ``server_version``, ``server_thread_id``, ``salt`` and
    ``server_capabilities``.  When the packet is long enough it also sets
    ``server_language``, ``server_charset`` and ``server_status``, merges in
    the upper capability bits and appends the remaining salt bytes.
    """
    i = 0
    packet = MysqlPacket(self)
    yield packet.recv_packet()
    packet.check_error()
    data = packet.get_all_data()

    if DEBUG:
        dump_packet(data)

    self.protocol_version = byte2int(data[i : i + 1])
    i += 1

    # The version string is NUL-terminated.
    server_end = data.find(int2byte(0), i)
    self.server_version = data[i:server_end].decode("latin1")
    i = server_end + 1

    # Kept as the 1-tuple returned by struct.unpack, as before.
    self.server_thread_id = struct.unpack("<I", data[i : i + 4])
    i += 4

    self.salt = data[i : i + 8]
    i += 9  # 8 + 1(filler)

    self.server_capabilities = struct.unpack("<H", data[i : i + 2])[0]
    i += 2

    if len(data) < i + 6:
        # The packet ends before the extended fields.  Stop here so that
        # ``salt_len`` is never referenced without having been assigned.
        return

    lang, stat, cap_h, salt_len = struct.unpack("<BHHB", data[i : i + 6])
    i += 6
    self.server_language = lang
    self.server_charset = charset_by_id(lang).name

    self.server_status = stat
    if DEBUG:
        print("server_status: %x" % stat)

    self.server_capabilities |= cap_h << 16
    if DEBUG:
        print("salt_len:", salt_len)
    # salt_len includes auth_plugin_data_part_1 and filler
    salt_len = max(12, salt_len - 9)

    # reserved
    i += 10

    if len(data) >= i + salt_len:
        self.salt += data[i : i + salt_len]
开发者ID:roson9527,项目名称:AsyncTorndb,代码行数:45,代码来源:connections.py
示例9: __recv_packet
def __recv_packet(self):
    """Read one packet (4-byte header plus payload) from the connection.

    Reads from ``self.connection.rfile``.  Stores the packet number in
    ``self.__packet_number`` and the payload in ``self.__data``.  Raises
    ``OperationalError(2013, ...)`` on a short read of either part.
    """
    rfile = self.connection.rfile

    header = rfile.read(4)
    if len(header) < 4:
        raise OperationalError(2013, "Lost connection to MySQL server during query")
    if DEBUG:
        dump_packet(header)

    length_bytes = header[:3]
    self.__packet_number = byte2int(header[3])
    # TODO: check packet_num is correct (+1 from last packet)

    # Pad the 3-byte little-endian length to 4 bytes for struct.
    (expected,) = struct.unpack('<I', length_bytes + int2byte(0))

    payload = rfile.read(expected)
    if len(payload) < expected:
        raise OperationalError(2013, "Lost connection to MySQL server during query")
    if DEBUG:
        dump_packet(payload)
    self.__data = payload
开发者ID:Webs961,项目名称:PyMySQL,代码行数:18,代码来源:connections.py
示例10: read_length_coded_binary
def read_length_coded_binary(self):
    """Read a 'Length Coded Binary' number from the data buffer.

    Length coded numbers can be anywhere from 1 to 9 bytes depending
    on the value of the first byte.

    Returns None for the NULL marker, and also for any first byte that
    matches none of the known markers.
    """
    marker = byte2int(self.read(1))

    if marker == NULL_COLUMN:
        return None
    if marker < UNSIGNED_CHAR_COLUMN:
        # Small values are encoded directly in the first byte.
        return marker
    if marker == UNSIGNED_SHORT_COLUMN:
        return unpack_uint16(self.read(UNSIGNED_SHORT_LENGTH))
    if marker == UNSIGNED_INT24_COLUMN:
        return unpack_int24(self.read(UNSIGNED_INT24_LENGTH))
    if marker == UNSIGNED_INT64_COLUMN:
        # TODO: what was 'longlong'? confirm it wasn't used?
        return unpack_int64(self.read(UNSIGNED_INT64_LENGTH))
    return None
开发者ID:Webs961,项目名称:PyMySQL,代码行数:18,代码来源:connections.py
示例11: dump_packet
def dump_packet(data):
    """Print a debugging hex/ASCII dump of *data* to stdout.

    First prints the packet length and the names of the five calling
    frames, then every byte of *data* in rows of 16: hex values on the
    left, printable characters (or ``.``) on the right.

    The single-argument ``print(...)`` form is used so the function parses
    on both Python 2 and Python 3.
    """
    def is_ascii(data):
        # Only code points 65..122 are shown verbatim.  Elements are 1-char
        # strings on Python 2 and ints when iterating Python 3 bytes.
        if 65 <= byte2int(data) <= 122:  # data.isalnum():
            if isinstance(data, int):
                return chr(data)
            return data
        return '.'

    try:
        print("packet length %d" % len(data))
        for depth in range(1, 6):
            print("method call[%d]: %s" % (depth, sys._getframe(depth).f_code.co_name))
        print("-" * 88)
    except ValueError:
        # sys._getframe raises ValueError when the call stack is shallower
        # than the requested depth; a debug helper must not crash on that.
        pass

    # Step by 16 directly instead of testing every index with i % 16.
    dump_data = [data[i:i+16] for i in range(0, len(data), 16)]
    for d in dump_data:
        hex_part = ' '.join(["%02X" % byte2int(x) for x in d])
        text_part = ' '.join(["%s" % is_ascii(x) for x in d])
        # Each byte takes three columns in the hex part, so pad short rows
        # by three spaces per missing byte to keep the text column aligned.
        print(hex_part + '   ' * (16 - len(d)) + ' ' * 2 + text_part)
    print("-" * 88)
    print("")
开发者ID:BillTheBest,项目名称:CloudStack,代码行数:20,代码来源:connections.py
示例12: recv_packet
def recv_packet(self):
    """Read packets from the connection stream and buffer their payloads.

    This is a generator that yields the stream's ``read_bytes`` results.
    It keeps reading header/payload pairs until a payload shorter than
    ``MAX_PACKET_LEN`` arrives, then stores the concatenated payloads in
    ``self._data``.  ``self._packet_number`` holds the number taken from
    the last header read.
    """
    stream = self.connection.stream
    pieces = []
    more = True
    while more:
        header = yield stream.read_bytes(4)
        if DEBUG:
            dump_packet(header)

        length_bytes = header[:3]
        # TODO: check sequence id
        self._packet_number = byte2int(header[3])

        # Pad the 3-byte little-endian length to 4 bytes for struct.
        (size,) = struct.unpack("<I", length_bytes + b"\0")

        payload = yield stream.read_bytes(size)
        if DEBUG:
            dump_packet(payload)
        pieces.append(payload)

        more = not (size < MAX_PACKET_LEN)
    self._data = b"".join(pieces)
开发者ID:roson9527,项目名称:AsyncTorndb,代码行数:21,代码来源:connections.py
示例13: is_ok_packet
def is_ok_packet(self):
    """Return True when the value read via ``get_bytes(0)`` is 0."""
    marker = byte2int(self.get_bytes(0))
    return marker == 0
开发者ID:Webs961,项目名称:PyMySQL,代码行数:2,代码来源:connections.py
示例14: is_ascii
def is_ascii(data):
    """Return a printable form of *data*, or ``"."`` if out of range.

    Values whose code is outside 65..122 become ``"."``.  In-range ints are
    converted with ``chr``; anything else in range is returned unchanged.
    """
    code = byte2int(data)
    if not (65 <= code <= 122):  # data.isalnum():
        return "."
    return chr(data) if isinstance(data, int) else data
开发者ID:roson9527,项目名称:AsyncTorndb,代码行数:6,代码来源:connections.py
示例15: is_eof_packet
def is_eof_packet(self):
    """Return True when the value read via ``get_bytes(0)`` is 0xFE (254)."""
    marker = byte2int(self.get_bytes(0))
    return marker == 0xFE
开发者ID:Webs961,项目名称:PyMySQL,代码行数:2,代码来源:connections.py
示例16: _read_result_packet
def _read_result_packet(self):
    """Read the field count, then the column descriptions and row data."""
    count_byte = self.first_packet.read(1)
    self.field_count = byte2int(count_byte)
    self._get_descriptions()
    self._read_rowdata_packet()
开发者ID:Webs961,项目名称:PyMySQL,代码行数:4,代码来源:connections.py
示例17: is_ascii
def is_ascii(data):
    """Return *data* unchanged if its code is within 65..122, else ``'.'``."""
    code = byte2int(data)
    in_range = 65 <= code <= 122  # data.isalnum():
    return data if in_range else '.'
开发者ID:Webs961,项目名称:PyMySQL,代码行数:4,代码来源:connections.py
示例18: is_error_packet
def is_error_packet(self):
    """Return True when the value read via ``get_bytes(0)`` is 0xFF (255)."""
    marker = byte2int(self.get_bytes(0))
    return marker == 0xFF
开发者ID:Webs961,项目名称:PyMySQL,代码行数:2,代码来源:connections.py
示例19: is_resultset_packet
def is_resultset_packet(self):
    """Return True when the value read via ``get_bytes(0)`` is in 1..250."""
    return 1 <= byte2int(self.get_bytes(0)) <= 250
开发者ID:Webs961,项目名称:PyMySQL,代码行数:3,代码来源:connections.py
注:本文中的util.byte2int函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论