Skip to content

Commit f9f3a17

Browse files
harshavardhanaminio-trusted
authored andcommitted
Re-implement select_object_content implementation
This change fixes multiple issues - calculates proper CRC for the entire message as per spec - handles unicode boundaries properly for special delimiters - handle zero payload 'Cont' event messages
1 parent 84e57b6 commit f9f3a17

File tree

3 files changed

+135
-163
lines changed

3 files changed

+135
-163
lines changed

minio/helpers.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,14 @@
5656
DEFAULT_PART_SIZE = MIN_PART_SIZE # Currently its 5MiB
5757

5858

59-
# Select Object Content
60-
READ_SIZE_SELECT = 32 * 1024 # Buffer size
6159
SQL = 'SQL' # Value for ExpressionType
6260
EVENT_RECORDS = 'Records' # Event Type is Records
6361
EVENT_PROGRESS = 'Progress' # Event Type Progress
6462
EVENT_STATS = 'Stats' # Event Type Stats
65-
EVENT = 'event' # Message Type is event
63+
EVENT_CONT = 'Cont' # Event Type continue
6664
EVENT_END = 'End' # Event Type is End
65+
EVENT_CONTENT_TYPE = "text/xml" # Event content xml type
66+
EVENT = 'event' # Message Type is event
6767
ERROR = 'error' # Message Type is error
6868

6969
_VALID_BUCKETNAME_REGEX = re.compile('^[a-z0-9][a-z0-9\\.\\-]+[a-z0-9]$')

minio/select_object_reader.py

Lines changed: 122 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,33 @@
1515
# limitations under the License.
1616

1717

18+
from __future__ import absolute_import
19+
1820
import io
21+
import sys
1922
import codecs
2023

2124
from binascii import crc32
2225
from xml.etree import cElementTree
2326
from .error import InvalidXMLError
2427
from xml.etree.cElementTree import ParseError
2528

26-
from .helpers import (READ_SIZE_SELECT, EVENT_RECORDS,
27-
EVENT_PROGRESS, EVENT_STATS, EVENT, EVENT_END, ERROR)
29+
from .helpers import (EVENT_RECORDS, EVENT_PROGRESS,
30+
EVENT_STATS, EVENT_CONT,
31+
EVENT, EVENT_CONTENT_TYPE,
32+
EVENT_END, ERROR)
33+
2834

35+
class SelectMessageError(Exception):
36+
'''
37+
Raised in case of message type 'error'
38+
'''
2939

3040
class CRCValidationError(Exception):
3141
'''
3242
Raised in case of CRC mismatch
3343
'''
3444

35-
3645
def calculate_crc(value):
3746
'''
3847
Returns the CRC using crc32
@@ -66,7 +75,7 @@ class SelectObjectReader(object):
6675
"""
6776
def __init__(self, response):
6877
self.response = response
69-
self.remaining_bytes = bytearray()
78+
self.remaining_bytes = bytes()
7079
self.stat = {}
7180
self.prog = {}
7281

@@ -76,10 +85,6 @@ def readable(self):
7685
def writeable(self):
7786
return False
7887

79-
@property
80-
def closed(self):
81-
return self.response.isclosed()
82-
8388
def close(self):
8489
self.response.close()
8590

@@ -94,82 +99,118 @@ def __extract_message(self):
9499
Process the response sent from server.
95100
https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html
96101
"""
97-
rec = bytearray()
98-
read_buffer = READ_SIZE_SELECT
99-
# Messages read in chunks of read_buffer bytes
100-
chunked_message = self.response.read(read_buffer)
101-
total_byte_parsed = 0
102-
if len(chunked_message) == 0:
103-
self.close()
102+
103+
crc_bytes = io.BytesIO()
104+
total_bytes_len = self.response.read(4)
105+
if len(total_bytes_len) == 0:
104106
return b''
105107

106-
# The first 4 bytes gives the total_byte_length and then
107-
# complete message is extracted
108-
while total_byte_parsed < read_buffer:
109-
# Case 1 - If the total_byte_length is partially read
110-
# in the chunked message , then complete the total_byte_length
111-
# by reading the required bytes from response and then
112-
# generate the complete message
113-
if read_buffer - total_byte_parsed <= 4:
114-
value = chunked_message[total_byte_parsed:
115-
total_byte_parsed +
116-
(read_buffer - total_byte_parsed) +
117-
1]
118-
rem_bytes = self.response.read(4 - (read_buffer -
119-
total_byte_parsed))
120-
message = value + rem_bytes + \
121-
self.response.read(byte_int(value+rem_bytes)-4)
122-
end_status = self.__decode_message(message, rec)
123-
total_byte_parsed = 0
124-
break
125-
else:
126-
total_byte_length = chunked_message[total_byte_parsed: total_byte_parsed + 4]
127-
# Case 2 - Incomplete message in chunked message ,
128-
# so creating the complete message by reading the
129-
# total_byte_length- len_read from the response message.
130-
if total_byte_parsed + byte_int(total_byte_length) > read_buffer:
131-
len_read = len(chunked_message[total_byte_parsed:])
132-
message = chunked_message[total_byte_parsed:] + \
133-
self.response.read(byte_int(total_byte_length)-len_read)
134-
end_status = self.__decode_message(message, rec)
135-
total_byte_parsed += byte_int(total_byte_length)
136-
# Case 3- the complete message is present in chunked
137-
# messsage.
108+
total_length = byte_int(total_bytes_len)
109+
header_bytes_len = self.response.read(4)
110+
if len(header_bytes_len) == 0:
111+
return b''
112+
113+
header_len = byte_int(header_bytes_len)
114+
115+
crc_bytes.write(total_bytes_len)
116+
crc_bytes.write(header_bytes_len)
117+
118+
prelude_bytes_crc = self.response.read(4)
119+
if not validate_crc(crc_bytes.getvalue(), prelude_bytes_crc):
120+
self.close()
121+
raise CRCValidationError(
122+
{"Checksum Mismatch, PreludeCRC of " +
123+
str(calculate_crc(crc_bytes.getvalue())) +
124+
" does not equal expected CRC of " +
125+
str(byte_int(prelude_bytes_crc))})
126+
127+
crc_bytes.write(prelude_bytes_crc)
128+
129+
header_bytes = self.response.read(header_len)
130+
if len(header_bytes) == 0:
131+
raise SelectMessageError(
132+
"Premature truncation of select message header"+
133+
", server is sending corrupt message?")
134+
135+
crc_bytes.write(header_bytes)
136+
137+
header_map = self.__extract_header(header_bytes)
138+
payload_length = total_length - header_len - int(16)
139+
payload_bytes = b''
140+
return_bytes = b''
141+
if header_map["message-type"] == ERROR:
142+
self.close()
143+
raise SelectMessageError(
144+
header_map["error-code"] + ":\"" + \
145+
header_map["error-message"] + "\"")
146+
elif header_map["message-type"] == EVENT:
147+
event_type = header_map["event-type"]
148+
if event_type == EVENT_END:
149+
self.close()
150+
payload_bytes = b''
151+
elif event_type == EVENT_CONT:
152+
payload_bytes = b''
153+
## This is to indicate continue reading.
154+
return_bytes = b'continue'
155+
elif event_type == EVENT_STATS:
156+
# Will fully ignoring progress events.
157+
content_type = header_map["content-type"]
158+
if content_type == EVENT_CONTENT_TYPE:
159+
payload_bytes = self.response.read(payload_length)
160+
self.__read_stats(payload_bytes)
138161
else:
139-
message = chunked_message[total_byte_parsed:
140-
total_byte_parsed +
141-
byte_int(total_byte_length)]
142-
total_byte_parsed += byte_int(total_byte_length)
143-
end_status = self.__decode_message(message, rec)
144-
if end_status:
145-
break
146-
return rec
162+
self.close()
163+
raise SelectMessageError(
164+
"Unrecognized content-type {0}".format(content_type))
165+
elif event_type == EVENT_RECORDS:
166+
payload_bytes = self.response.read(payload_length)
167+
return_bytes = payload_bytes
168+
else:
169+
raise SelectMessageError(
170+
"Unrecognized message-type {0}".format(header_map["message-type"])
171+
)
172+
173+
crc_bytes.write(payload_bytes)
147174

148-
def __extract_header(self, header, header_length):
175+
message_crc = self.response.read(4)
176+
if len(message_crc) == 0:
177+
return b''
178+
179+
if not validate_crc(crc_bytes.getvalue(),
180+
message_crc):
181+
self.close()
182+
raise CRCValidationError(
183+
{"Checksum Mismatch, MessageCRC of " +
184+
str(calculate_crc(crc_bytes.getvalue())) +
185+
" does not equal expected CRC of " +
186+
str(byte_int(message_crc))})
187+
188+
return return_bytes
189+
190+
def __extract_header(self, header_bytes):
149191
"""
150-
populates the header map after reading the header
192+
populates the header map after reading the header in bytes
151193
"""
152194
header_map = {}
153195
header_byte_parsed = 0
154196
# While loop ends when all the headers present are read
155197
# header contains multipe headers
156-
while header_byte_parsed < header_length:
157-
header_name_byte_length = \
158-
byte_int(header[header_byte_parsed: header_byte_parsed+1])
198+
while header_byte_parsed < len(header_bytes):
199+
header_name_byte_length = byte_int(header_bytes[header_byte_parsed:header_byte_parsed+1])
159200
header_byte_parsed += 1
160201
header_name = \
161-
header[header_byte_parsed:
162-
header_byte_parsed+header_name_byte_length]
202+
header_bytes[header_byte_parsed:
203+
header_byte_parsed+header_name_byte_length]
163204
header_byte_parsed += header_name_byte_length
164205
# Header Value Type is of 1 bytes and is skipped
165206
header_byte_parsed += 1
166207
value_string_byte_length = \
167-
byte_int(header[header_byte_parsed:
168-
header_byte_parsed+2])
208+
byte_int(header_bytes[header_byte_parsed:
209+
header_byte_parsed+2])
169210
header_byte_parsed += 2
170211
header_value = \
171-
header[header_byte_parsed:
172-
header_byte_parsed+value_string_byte_length]
212+
header_bytes[header_byte_parsed:
213+
header_byte_parsed+value_string_byte_length]
173214
header_byte_parsed += value_string_byte_length
174215
header_map[header_name.decode("utf-8").lstrip(":")] = \
175216
header_value.decode("utf-8").lstrip(":")
@@ -188,77 +229,6 @@ def __read_stats(self, stats):
188229
elif attribute.tag == 'BytesReturned':
189230
self.stat['BytesReturned'] = attribute.text
190231

191-
def __parse_message(self, header_map, payload, payload_length, record):
192-
'''
193-
Parses the message
194-
'''
195-
if header_map["message-type"] == ERROR:
196-
error = header_map["error-code"] + ":\"" +\
197-
header_map["error-message"] + "\""
198-
if header_map["message-type"] == EVENT:
199-
# Fetch the content-type
200-
content_type = header_map["content-type"]
201-
# Fetch the event-type
202-
event_type = header_map["event-type"]
203-
if event_type == EVENT_RECORDS:
204-
record += payload[0:payload_length]
205-
elif event_type == EVENT_PROGRESS:
206-
if content_type == "text/xml":
207-
progress = payload[0:payload_length]
208-
elif event_type == EVENT_STATS:
209-
if content_type == "text/xml":
210-
self.__read_stats(payload[0:payload_length])
211-
212-
def __decode_message(self, message, rec):
213-
end_status = False
214-
total_byte_length = message[0:4] # total_byte_length is of 4 bytes
215-
headers_byte_length = message[4: 8] # headers_byte_length is 4 bytes
216-
prelude_crc = message[8:12] # prelude_crc is of 4 bytes
217-
header = message[12:12+byte_int(headers_byte_length)]
218-
payload_length = byte_int(total_byte_length) - \
219-
byte_int(headers_byte_length) - int(16)
220-
payload = message[12 + byte_int(headers_byte_length):
221-
12 + byte_int(headers_byte_length) + payload_length]
222-
message_crc = message[12 + byte_int(headers_byte_length) +
223-
payload_length: 12 +
224-
byte_int(headers_byte_length) +
225-
payload_length + 4]
226-
227-
if not validate_crc(total_byte_length + headers_byte_length,
228-
prelude_crc):
229-
raise CRCValidationError(
230-
{"Checksum Mismatch, MessageCRC of " +
231-
str(calculate_crc(total_byte_length +
232-
headers_byte_length)) +
233-
" does not equal expected CRC of " +
234-
str(byte_int(prelude_crc))})
235-
236-
if not validate_crc(message[0:len(message)-4], message_crc):
237-
raise CRCValidationError(
238-
{"Checksum Mismatch, MessageCRC of " +
239-
str(calculate_crc(message)) +
240-
" does not equal expected CRC of " +
241-
str(byte_int(message_crc))})
242-
243-
header_map = self.__extract_header(header, byte_int(headers_byte_length))
244-
245-
if header_map["message-type"] == EVENT:
246-
# Parse message only when event-type is Records,
247-
# Progress, Stats. Break the loop if event type is End
248-
# Do nothing if event type is Cont
249-
if header_map["event-type"] == EVENT_RECORDS or \
250-
header_map["event-type"] == EVENT_PROGRESS or \
251-
header_map["event-type"] == EVENT_STATS:
252-
self.__parse_message(header_map, payload,
253-
payload_length, rec)
254-
255-
if header_map["event-type"] == EVENT_END:
256-
end_status = True
257-
if header_map["message-type"] == ERROR:
258-
self.__parse_message(header_map, payload, payload_length, rec)
259-
end_status = True
260-
return end_status
261-
262232
def __read(self, num_bytes):
263233
"""
264234
extract each record from the response body ... and buffer it.
@@ -269,26 +239,32 @@ def __read(self, num_bytes):
269239
res = self.__extract_message()
270240
if len(res) == 0:
271241
return b''
242+
elif res == b'continue':
243+
return res
272244
else:
273245
self.remaining_bytes = res
274246

247+
result = self.remaining_bytes
275248
if num_bytes < len(self.remaining_bytes):
276249
result = self.remaining_bytes[:num_bytes]
277-
del self.remaining_bytes[:num_bytes]
278-
return result
279-
else:
280-
left_in_buffer = self.remaining_bytes[:len(self.remaining_bytes)]
281-
del self.remaining_bytes[:len(left_in_buffer)]
282-
return left_in_buffer
283250

284-
def stream(self, num_bytes):
251+
self.remaining_bytes = self.remaining_bytes[len(result):]
252+
return result
253+
254+
def stream(self, num_bytes=32*1024):
285255
"""
286256
streams the response
287257
"""
288-
while True:
258+
while not self.response.isclosed():
289259
x = self.__read(num_bytes)
290260
if x == b'':
291261
break
262+
if x == b'continue':
263+
continue
292264
elif len(x) < num_bytes:
293265
x += self.__read(num_bytes-len(x))
294-
yield x.decode('utf-8') if isinstance(x, bytearray) else x
266+
if sys.version_info.major == 3:
267+
yield x.decode('utf-8', errors='ignore')
268+
else:
269+
# Python 2.x needs explicit conversion.
270+
yield x.decode('utf-8', errors='ignore').encode('utf-8')

0 commit comments

Comments
 (0)