Skip to content

Conversation

@sinhaashish
Copy link
Contributor

@sinhaashish sinhaashish commented Jun 18, 2019

Selects and filters out data from stored object.
Client implementation for select feature
Closes #762

@sinhaashish sinhaashish force-pushed the select branch 18 times, most recently from fcae9ef to 44bc707 Compare June 24, 2019 17:35
@sinhaashish sinhaashish changed the title [WIP] Implement select feature Implement select feature Jun 24, 2019
@sinhaashish sinhaashish force-pushed the select branch 2 times, most recently from 74cde71 to 4f2e49b Compare June 25, 2019 02:51
@sinhaashish sinhaashish force-pushed the select branch 4 times, most recently from 295e0a9 to 6a8bddf Compare July 19, 2019 18:36
@sinhaashish sinhaashish force-pushed the select branch 2 times, most recently from 9579def to 0a56f7f Compare July 30, 2019 05:55
@sinhaashish
Copy link
Contributor Author

extract_message, extract_header, decode_message all need unit tests..

Added a functional test test_select_object_content . Could not write unit test for extract_message as it required and object of Select_object_reader which inturn needed a streaming response.

@sinhaashish
Copy link
Contributor Author

@harshavardhana @Praveenrajmani PTAL

Copy link
Member

@harshavardhana harshavardhana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some formatting

@harshavardhana
Copy link
Member

@sinhaashish one more issue

~ python --version
Python 2.7.16

~  python select_object_content.py 
Traceback (most recent call last):
  File "select_object_content.py", line 70, in <module>
    for d in data.stream(32*1024):
  File "/home/harsha/repos/minio-py/minio/select_object_reader.py", line 289, in stream
    x = self.read(num_bytes)
  File "/home/harsha/repos/minio-py/minio/select_object_reader.py", line 269, in read
    res = self.extract_message()
  File "/home/harsha/repos/minio-py/minio/select_object_reader.py", line 130, in extract_message
    if total_byte_parsed + byte_int(total_byte_length) > read_buffer:
  File "/home/harsha/repos/minio-py/minio/select_object_reader.py", line 56, in byte_int
    return int.from_bytes(data_bytes, byteorder='big')
AttributeError: type object 'int' has no attribute 'from_bytes'
~ python3 --version
Python 3.7.3

~ python3 select_object_content.py 
Traceback (most recent call last):
  File "select_object_content.py", line 70, in <module>
    for d in data.stream(32*1024):
  File "/home/harsha/repos/minio-py/minio/select_object_reader.py", line 294, in stream
    yield x.decode("utf-8")
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xf4 in position 6921: invalid continuation byte

@harshavardhana
Copy link
Member

Also, @sinhaashish found another major performance problem in calculate_crc() function CRC32() is being initialized again and again.

Rather than using PyCRC it is better to use the standard library function crc32 from binascii

  • for a 30MiB it takes 10 secs instead with this new change it now takes 2secs.
diff --git a/minio/select_object_reader.py b/minio/select_object_reader.py
index 6c3d2ce..21e675a 100644
--- a/minio/select_object_reader.py
+++ b/minio/select_object_reader.py
@@ -15,8 +15,11 @@
 # limitations under the License.
 
 
+from __future__ import unicode_literals
 import io
-from PyCRC.CRC32 import CRC32
+import codecs
+
+from binascii import crc32
 from xml.etree import cElementTree
 from .error import InvalidXMLError
 from xml.etree.cElementTree import ParseError
@@ -30,13 +33,11 @@ class CRCValidationError(Exception):
     Raised in case of CRC mismatch
     '''
 
-
 def calcuate_crc(value):
     '''
-    Returns the CRC using PyCRC
+    Returns the CRC using crc32
     '''
-    return CRC32().calculate(value)
-
+    return crc32(value) & 0xffffffff
 
 def validate_crc(current_value, expected_value):
     '''
@@ -53,8 +54,7 @@ def byte_int(data_bytes):
     '''
     Convert bytes to big-endian integer
     '''
-    return int.from_bytes(data_bytes, byteorder='big')
-
+    return int(codecs.encode(data_bytes, 'hex'), 16)
 
 class SelectObjectReader(object):
     """
@@ -291,4 +291,4 @@ class SelectObjectReader(object):
                 break
             elif len(x) < num_bytes:
                 x += self.read(num_bytes-len(x))
-            yield x.decode("utf-8")
+            yield str(x) if isinstance(x, bytearray) else x
diff --git a/setup.py b/setup.py
index 3d554ea..739c4ea 100644
--- a/setup.py
+++ b/setup.py
@@ -45,7 +45,6 @@ requires = [
     'pytz',
     'certifi',
     'python-dateutil',
-    'PyCRC',
 ]
 
 tests_requires = [

@sinhaashish sinhaashish force-pushed the select branch 2 times, most recently from eb39c48 to c29e76a Compare August 2, 2019 05:54
harshavardhana
harshavardhana previously approved these changes Aug 2, 2019
Copy link
Member

@harshavardhana harshavardhana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - thanks for your patience.

@sinhaashish
Copy link
Contributor Author

LGTM - thanks for your patience.

Thanks for your much valued assistance.

@Praveenrajmani
Copy link
Collaborator

Praveenrajmani commented Aug 2, 2019

We could remove the unrelated diffs (because of the indents) Never mind. I think these are from older PRs which were left UN-indented with pep8

Copy link
Member

@vadmeste vadmeste left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments

docs/API.md Outdated
|:---|:---|:---|
|``bucket_name`` |_string_ |Name of the bucket. |
|``object_name`` |_string_ |Name of the object. |
|``options`` | _Object_ | Query Options |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Object/SelectObjectOptions/ ?

docs/API.md Outdated

|Param |Type |Description |
|:---|:---|:---|
|``obj``|_Object_ |Select_object_reader object. |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Object/SelectObjectReader/ ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

docs/API.md Outdated
record_data.write(d)

# Get the stats
print(data.stats)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/stats/stats()/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

minio/api.py Outdated
# Select Object Content
def select_object_content(self, bucket_name, object_name, opts):
"""
It filters the contents of an object based on a simple
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a better comment would be Executes SQL requests on objects having data in CSV, JSON or Parquet formats

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

if len(chunked_message) == 0:
self.close()
return b''
else:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can gain an indentation level here since there is a return just before else

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

Praveenrajmani
Praveenrajmani previously approved these changes Aug 2, 2019
Copy link
Collaborator

@Praveenrajmani Praveenrajmani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just few comments. LGTM otherwise

log_output = LogOutput(client.copy_object, 'test_copy_object_no_copy_condition')
test_copy_object_no_copy_condition(client, log_output)

if sys.version_info.major == 3:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

log_output = LogOutput(client.get_bucket_notification, 'test_get_bucket_notification')
test_get_bucket_notification(client, log_output)

if sys.version_info.major == 3:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment on why we running on 3+ only ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@vadmeste
Copy link
Member

vadmeste commented Aug 2, 2019

@sinhaashish not sure what I am missing here, but the example with AWS S3 doesn't work for me:

import sys
sys.path.append('/path/to/minio-py/')

from minio import Minio
from minio.error import ResponseError
from minio.select_object_reader import CRCValidationError
from minio.select_object_options import (SelectObjectOptions, CSVInput,
                                         JSONInput, RequestProgress,
                                         ParquetInput, InputSerialization,
                                         OutputSerialization, CSVOutput,
                                         JsonOutput)

client = Minio('s3.amazonaws.com',
               access_key='xxxx',
               secret_key='xxxx')

options = SelectObjectOptions(
    expression="select * from s3object",
    input_serialization=InputSerialization(
        compression_type="NONE",
        csv=CSVInput(FileHeaderInfo="USE",
                     RecordDelimiter="\n",
                     FieldDelimiter=",",
                     QuoteCharacter='"',
                     QuoteEscapeCharacter='"',
                     Comments="#",
                     AllowQuotedRecordDelimiter="FALSE",
                    ),
        # If input is JSON
        # json=JSONInput(Type="DOCUMENT",)
        ),

    output_serialization=OutputSerialization(
        csv=CSVOutput(QuoteFields="ASNEEDED",
                      RecordDelimiter="\n",
                      FieldDelimiter=",",
                      QuoteCharacter='"',
                      QuoteEscapeCharacter='"',)

        # json = JsonOutput(
        #     RecordDelimiter="\n",
        #     )
        ),
    request_progress=RequestProgress(
        enabled="False"
        )
    )

try:
    data = client.select_object_content('vadmeste-testbucket', 'testdata.csv', options)

    # Get the records
    with open('my-record-file', 'w') as record_data:
        for d in data.stream(10*1024):
            record_data.write(d)

    # Get the stats
    print(data.stats())

except CRCValidationError as err:
    print(err)
except ResponseError as err:
    print(err)

The error is:

12:37 $ python select_object_content.py
Traceback (most recent call last):
  File "select_object_content.py", line 68, in <module>
    data = client.select_object_content('vadmeste-testbucket', 'testdata.csv', options)
  File "/home/vadmeste/work/python/minio-py/minio/api.py", line 274, in select_object_content
    preload_content=False)
  File "/home/vadmeste/work/python/minio-py/minio/api.py", line 1942, in _url_open
    object_name).get_exception()
minio.error.MalformedXML: MalformedXML: message: This happens when the user sends malformed xml (xml that doesn't conform to the published xsd) for the configuration.

@sinhaashish
Copy link
Contributor Author

sinhaashish commented Aug 4, 2019

@vadmeste you can comment AllowQuotedRecordDelimiter="False" and try once, for me it was successful. @harshavardhana Should i be using this field in XML. S3 docs has this field while minio-go sdk doesn't . Please confirm.

@sinhaashish
Copy link
Contributor Author

@vadmeste removing AllowQuotedRecordDelimiter="False" from example. As S3 docs says The AllowQuotedRecordDelimiters property is not supported. If this property is specified, the query fails. . Refer https://docs.amazonaws.cn/en_us/emr/latest/ReleaseGuide/emr-spark-s3select.html

@harshavardhana
Copy link
Member

@sinhaashish that is just spark select implementation limitation, the problem is XML marshalling bug on our end in minio-py, we are sending an incorrect XML to AWS S3

Selects and filters out data from stored object.
Client for select feature
@sinhaashish
Copy link
Contributor Author

@vadmeste the code runs with S3 now. PTAL

@vadmeste
Copy link
Member

vadmeste commented Aug 5, 2019

@sinhaashish s3/vadmeste-testbucket/testdata.csv contains a column called name which has a name with ' in it, but it is not returned correctly (in contrary to mc sql which returns it correcty)

False ascertainment

Copy link
Member

@vadmeste vadmeste left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM & tested

@harshavardhana harshavardhana merged commit 6a6dc7a into minio:master Aug 5, 2019
@sinhaashish sinhaashish deleted the select branch August 6, 2019 05:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement select_object_content API

4 participants