Skip to content

Commit 295e0a9

Browse files
committed
Implement select object
Selects and filters out data from stored object. Client for select feature
1 parent 7393d66 commit 295e0a9

File tree

9 files changed

+715
-4
lines changed

9 files changed

+715
-4
lines changed

docs/API.md

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ s3Client = Minio('s3.amazonaws.com',
4040
| | [`fput_object`](#fput_object) | | |
4141
| | [`fget_object`](#fget_object) | | |
4242
| | [`get_partial_object`](#get_partial_object) | | |
43+
| | [`select_object_content`](#select_object_content) | | |
4344

4445
## 1. Constructor
4546

@@ -668,6 +669,70 @@ except ResponseError as err:
668669
print(err)
669670
```
670671

672+
<a name="select_object_content"></a>
673+
### select_object_content(self, bucket_name, object_name, obj)
674+
Select object content filters the contents of object based on a simple structured query language (SQL).
675+
676+
__Parameters__
677+
678+
|Param |Type |Description |
679+
|:---|:---|:---|
680+
|``bucket_name`` |_string_ |Name of the bucket. |
681+
|``object_name`` |_string_ |Name of the object. |
682+
|``obj`` | _Object_ | Query Options |
683+
684+
685+
__Return Value__
686+
687+
|Param |Type |Description |
688+
|:---|:---|:---|
689+
|``object`` | _urllib3.response.HTTPResponse_ |Represents http streaming reader. |
690+
691+
__Example__
692+
693+
694+
```py
695+
client = Minio('s3.amazonaws.com',
696+
access_key='YOUR-ACCESSKEY',
697+
secret_key='YOUR-SECRETKEY')
698+
699+
obj = SelectObjectOptions(
700+
expression="select * from s3object",
701+
input_serialization = InputSerialization(compression_type="NONE",
702+
csv=CSVInput(FileHeaderInfo="USE",
703+
RecordDelimiter="\n",
704+
FieldDelimiter=",",
705+
QuoteCharacter='"',
706+
QuoteEscapeCharacter='"',
707+
Comments="#",
708+
AllowQuotedRecordDelimiter="FALSE"
709+
),
710+
),
711+
output_serialization=OutputSerialization(
712+
csv = CSVOutput(
713+
QuoteFields="ASNEEDED",
714+
RecordDelimiter="\n",
715+
FieldDelimiter=",",
716+
QuoteCharacter='"',
717+
QuoteEscapeCharacter='"',
718+
)
719+
),
720+
request_progress=RequestProgress(
721+
enabled="TRUE"
722+
)
723+
)
724+
725+
726+
try:
727+
records, stats = client.select_object_content('sinha', 'test.csv', obj)
728+
with open('my-record', 'wb') as record_data:
729+
record_data.write(records)
730+
with open('my-stat', 'wb') as stat_data:
731+
stat_data.write(stats)
732+
except ResponseError as err:
733+
print(err)
734+
```
735+
671736
<a name="fget_object"></a>
672737
### fget_object(bucket_name, object_name, file_path, request_headers=None)
673738
Downloads and saves the object as a file in the local filesystem.

examples/select_object_content.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# -*- coding: utf-8 -*-
2+
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C) 2019 MinIO, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
17+
from minio import Minio
18+
from minio.error import ResponseError
19+
from minio import SelectObjectReader
20+
from minio.select_object_reader import CRCValidationError
21+
from minio.select_object_options import (SelectObjectOptions, CSVInput, JSONInput, RequestProgress, ParquetInput, InputSerialization, OutputSerialization, CSVOutput, JsonOutput)
22+
23+
client = Minio('s3.amazonaws.com',
24+
access_key='YOUR-ACCESSKEY',
25+
secret_key='YOUR-SECRETKEY')
26+
27+
obj = SelectObjectOptions(
28+
expression=" select * from s3object",
29+
input_serialization = InputSerialization(compression_type="NONE",
30+
csv=CSVInput(FileHeaderInfo="USE",
31+
RecordDelimiter="\n",
32+
FieldDelimiter=",",
33+
QuoteCharacter='"',
34+
QuoteEscapeCharacter='"',
35+
Comments="#",
36+
AllowQuotedRecordDelimiter="FALSE"
37+
),
38+
),
39+
# If input is JSON
40+
# json = JSONInput(Type="DOCUMENT"
41+
# )
42+
# ),
43+
44+
output_serialization=OutputSerialization(
45+
csv = CSVOutput(
46+
QuoteFields="ASNEEDED",
47+
RecordDelimiter="\n",
48+
FieldDelimiter=",",
49+
QuoteCharacter='"',
50+
QuoteEscapeCharacter='"',
51+
)
52+
),
53+
54+
# If output is JSON
55+
# output_serialization=OutputSerialization(
56+
# json = JsonOutput(
57+
# RecordDelimiter="\n"
58+
# )
59+
# ),
60+
request_progress=RequestProgress(
61+
enabled="TRUE"
62+
)
63+
)
64+
65+
try:
66+
response_stream = client.select_object_content('<YOUR_BUCKET>', '<YOUR_OBJECT>', obj)
67+
data = SelectObjectReader(response_stream)
68+
file = open('my-record', 'w')
69+
while True :
70+
records = data.read(500)
71+
file.write(records)
72+
if records == "" :
73+
file.close()
74+
break
75+
76+
# Get the stats of message
77+
file = open('my-stat', 'w')
78+
stat = ""
79+
for k, v in data.stats_message().items():
80+
stat += k + " : " + v + "\n"
81+
file.write(stat)
82+
file.close()
83+
84+
except CRCValidationError as err:
85+
print(err)
86+
except ResponseError as err:
87+
print(err)

minio/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@
3131
__author__ = 'MinIO, Inc.'
3232
__version__ = '4.0.19'
3333
__license__ = 'Apache 2.0'
34-
__copyright__ = 'Copyright 2015, 2016, 2017 MinIO, Inc.'
34+
__copyright__ = 'Copyright 2015, 2016, 2017, 2018, 2019 MinIO, Inc.'
3535

3636
from .api import Minio
3737
from .error import ResponseError
3838
from .post_policy import PostPolicy
3939
from .copy_conditions import CopyConditions
4040
from .definitions import Bucket, Object
41+
from .select_object_reader import SelectObjectReader

minio/api.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
is_valid_source_sse_object,
7777
is_valid_bucket_notification_config, is_valid_policy_type,
7878
mkdir_p, dump_http, amzprefix_user_metadata,
79-
is_supported_header,is_amz_header)
79+
is_supported_header,is_amz_header,is_valid_input_type)
8080
from .helpers import (MAX_MULTIPART_OBJECT_SIZE,
8181
MAX_PART_SIZE,
8282
MAX_POOL_SIZE,
@@ -90,7 +90,8 @@
9090
from .xml_marshal import (xml_marshal_bucket_constraint,
9191
xml_marshal_complete_multipart_upload,
9292
xml_marshal_bucket_notifications,
93-
xml_marshal_delete_objects)
93+
xml_marshal_delete_objects,
94+
xml_marshal_select)
9495
from .fold_case_dict import FoldCaseDict
9596
from .thread_pool import ThreadPool
9697

@@ -235,6 +236,40 @@ def trace_off(self):
235236
"""
236237
self._trace_output_stream = None
237238

239+
240+
def select_object_content(self, bucket_name,
241+
object_name,
242+
obj):
243+
244+
'''
245+
Select from object
246+
'''
247+
is_valid_bucket_name(bucket_name)
248+
is_non_empty_string(object_name)
249+
is_valid_input_type(object_name)
250+
251+
content = xml_marshal_select(obj)
252+
url_values = dict()
253+
url_values["select"] = ""
254+
url_values["select-type"] = "2"
255+
256+
headers = {
257+
'Content-Length': str(len(content)),
258+
'Content-Md5': get_md5_base64digest(content)
259+
}
260+
content_sha256_hex = get_sha256_hexdigest(content)
261+
return self._url_open(
262+
'POST',
263+
bucket_name=bucket_name,
264+
object_name=object_name,
265+
query=url_values,
266+
headers=headers,
267+
body=content,
268+
content_sha256=content_sha256_hex,
269+
preload_content=False)
270+
271+
272+
238273
# Bucket level
239274
def make_bucket(self, bucket_name, location='us-east-1'):
240275
"""

minio/helpers.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@
5555
MIN_PART_SIZE = 5 * 1024 * 1024 # 5MiB
5656
DEFAULT_PART_SIZE = MIN_PART_SIZE # Currently its 5MiB
5757

58+
READ_SIZE_SELECT = 32 * 1024 # Buffer size for select object content
59+
SQL = 'SQL'
60+
RECORDS = 'Records'
61+
PROGRESS = 'Progress'
62+
STATS ='Stats'
63+
EVENT = 'event'
64+
END = 'End'
65+
ERROR = 'error'
66+
5867
_VALID_BUCKETNAME_REGEX = re.compile('^[a-z0-9][a-z0-9\\.\\-]+[a-z0-9]$')
5968
_ALLOWED_HOSTNAME_REGEX = re.compile(
6069
'^((?!-)(?!_)[A-Z_\\d-]{1,63}(?<!-)(?<!_)\\.)*((?!_)(?!-)[A-Z_\\d-]{1,63}(?<!-)(?<!_))$',
@@ -696,3 +705,18 @@ def is_supported_header(key):
696705
# returns true if header is a storage class header
697706
def is_storageclass_header(key):
698707
return key.lower() == "x-amz-storage-class"
708+
709+
# return the input type if valid i.e. csv, json, parquet, par
710+
# or raise an InvalidArgumentError
711+
def is_valid_input_type(object_name):
712+
if object_name.__contains__('.csv') | object_name.__contains__('.CSV') :
713+
return 'csv'
714+
elif object_name.__contains__('.json') | object_name.__contains__('.JSON') :
715+
return 'json'
716+
elif object_name.__contains__('.parquet') | object_name.__contains__('.par') :
717+
return 'parquet'
718+
else:
719+
raise InvalidArgumentError('Input type not supported '
720+
' Only csv, json and parquet.')
721+
722+

minio/select_object_options.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
# -*- coding: utf-8 -*-
2+
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C) 2019 MinIO, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""
17+
18+
This module cretaes the request for Select
19+
20+
:copyright: (c) 2019 by MinIO, Inc.
21+
:license: Apache 2.0, see LICENSE for more details.
22+
23+
"""
24+
from .helpers import (SQL)
25+
26+
class CSVInput:
27+
"""CSVInput.
28+
29+
"""
30+
def __init__(self, FileHeaderInfo=None, RecordDelimiter="\n",
31+
FieldDelimiter=",", QuoteCharacter='"',
32+
QuoteEscapeCharacter='"', Comments="#",
33+
AllowQuotedRecordDelimiter=False):
34+
self.FileHeaderInfo = FileHeaderInfo
35+
self.RecordDelimiter = RecordDelimiter
36+
self.FieldDelimiter = FieldDelimiter
37+
self.QuoteCharacter = QuoteCharacter
38+
self.QuoteEscapeCharacter = QuoteEscapeCharacter
39+
self.Comments = Comments
40+
self.AllowQuotedRecordDelimiter = AllowQuotedRecordDelimiter
41+
42+
class JSONInput:
43+
"""JSONInput.
44+
45+
"""
46+
def __init__(self, Type=None):
47+
self.Type = Type
48+
49+
class ParquetInput:
50+
"""ParquetInput.
51+
52+
"""
53+
class InputSerialization:
54+
"""InputSerialization.
55+
56+
"""
57+
def __init__(self, compression_type="NONE", csv=None, json=None, par=None ):
58+
self.compression_type = compression_type
59+
self.csv_input = csv
60+
self.json_input = json
61+
self.parquet_input = par
62+
63+
64+
65+
class CSVOutput:
66+
"""CSVOutput.
67+
68+
"""
69+
def __init__(self, QuoteFields="ASNEEDED", RecordDelimiter="\n",
70+
FieldDelimiter=",", QuoteCharacter='"',
71+
QuoteEscapeCharacter='"'):
72+
self.QuoteFields = QuoteFields
73+
self.RecordDelimiter = RecordDelimiter
74+
self.FieldDelimiter = FieldDelimiter
75+
self.QuoteCharacter = QuoteCharacter
76+
self.QuoteEscapeCharacter = QuoteEscapeCharacter
77+
78+
class JsonOutput:
79+
"""
80+
JsonOutput.
81+
82+
"""
83+
def __init__(self, RecordDelimiter="\n"):
84+
self.RecordDelimiter = RecordDelimiter
85+
86+
87+
88+
class OutputSerialization:
89+
"""OutputSerialization.
90+
91+
"""
92+
def __init__(self, csv=None, json=None ):
93+
self.csv_output = csv
94+
self.json_output = json
95+
96+
class RequestProgress:
97+
"""RequestProgress.
98+
99+
"""
100+
def __init__(self, enabled=False):
101+
self.enabled = enabled
102+
103+
class SelectObjectOptions:
104+
"""SelectObjectOptions.
105+
106+
"""
107+
expression_type = SQL
108+
def __init__(self, expression, input_serialization, output_serialization, request_progress):
109+
self.expression = expression
110+
self.in_ser = input_serialization
111+
self.out_ser = output_serialization
112+
self.req_progress = request_progress

0 commit comments

Comments
 (0)