Skip to content

Commit f5704d7

Browse files
committed
[fix] fixed merge conflicts. rebased
2 parents c64cc9a + eeb2e42 commit f5704d7

File tree

13 files changed

+810
-12
lines changed

13 files changed

+810
-12
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
!**/*.go
44
!.gitignore
55
!.circleci/config.yml
6+
!/tests/*.bz2

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88

99
# RediSearch Go Client
10+
[![Mailing List](https://img.shields.io/badge/Mailing%20List-RediSearch-blue)](https://groups.google.com/forum/#!forum/redisearch)
11+
[![Gitter](https://badges.gitter.im/RedisLabs/RediSearch.svg)](https://gitter.im/RedisLabs/RediSearch?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge)
1012

1113
Go client for [RediSearch](http://redisearch.io), based on redigo.
1214

redisearch/aggregate.go

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
package redisearch
2+
3+
import (
4+
"fmt"
5+
"github.com/garyburd/redigo/redis"
6+
"log"
7+
"reflect"
8+
)
9+
10+
// Projection
11+
type Projection struct {
12+
Expression string
13+
Alias string
14+
}
15+
16+
func NewProjection(expression string, alias string) *Projection {
17+
return &Projection{
18+
Expression: expression,
19+
Alias: alias,
20+
}
21+
}
22+
23+
func (p Projection) Serialize() redis.Args {
24+
args := redis.Args{"APPLY", p.Expression, "AS", p.Alias}
25+
return args
26+
}
27+
28+
// Cursor
29+
type Cursor struct {
30+
Id int
31+
Count int
32+
MaxIdle int
33+
}
34+
35+
func NewCursor() *Cursor {
36+
return &Cursor{
37+
Id: 0,
38+
Count: 0,
39+
MaxIdle: 0,
40+
}
41+
}
42+
43+
func (c *Cursor) SetId(id int) *Cursor {
44+
c.Id = id
45+
return c
46+
}
47+
48+
func (c *Cursor) SetCount(count int) *Cursor {
49+
c.Count = count
50+
return c
51+
}
52+
53+
func (c *Cursor) SetMaxIdle(maxIdle int) *Cursor {
54+
c.MaxIdle = maxIdle
55+
return c
56+
}
57+
58+
func (c Cursor) Serialize() redis.Args {
59+
args := redis.Args{"WITHCURSOR"}
60+
if c.Count > 0 {
61+
args = args.Add("COUNT", c.Count)
62+
}
63+
if c.MaxIdle > 0 {
64+
args = args.Add("MAXIDLE", c.MaxIdle)
65+
}
66+
return args
67+
}
68+
69+
// GroupBy
70+
type GroupBy struct {
71+
Fields []string
72+
Reducers []Reducer
73+
Paging *Paging
74+
}
75+
76+
func NewGroupBy() *GroupBy {
77+
return &GroupBy{
78+
Fields: make([]string, 0),
79+
Reducers: make([]Reducer, 0),
80+
Paging: nil,
81+
}
82+
}
83+
84+
func (g *GroupBy) AddFields(fields interface{}) *GroupBy {
85+
switch fields.(type) {
86+
case string:
87+
g.Fields = append(g.Fields, fields.(string))
88+
case []string:
89+
g.Fields = append(g.Fields, fields.([]string)...)
90+
default:
91+
return g
92+
}
93+
return g
94+
}
95+
96+
func (g *GroupBy) Reduce(reducer Reducer) *GroupBy {
97+
g.Reducers = append(g.Reducers, reducer)
98+
return g
99+
}
100+
101+
func (g *GroupBy) Limit(offset int, num int) *GroupBy {
102+
g.Paging = NewPaging(offset, num)
103+
return g
104+
}
105+
106+
func (g GroupBy) Serialize() redis.Args {
107+
ret := len(g.Fields)
108+
args := redis.Args{"GROUPBY", ret}.AddFlat(g.Fields)
109+
for _, reducer := range g.Reducers {
110+
args = args.AddFlat(reducer.Serialize())
111+
}
112+
if g.Paging != nil {
113+
args = args.AddFlat(g.Paging.serialize())
114+
}
115+
return args
116+
}
117+
118+
// AggregateQuery
119+
type AggregateQuery struct {
120+
Query *Query
121+
AggregatePlan redis.Args
122+
Paging *Paging
123+
Max int
124+
WithSchema bool
125+
Verbatim bool
126+
// TODO: add cursor
127+
WithCursor bool
128+
Cursor *Cursor
129+
// TODO: add load fields
130+
131+
}
132+
133+
func NewAggregateQuery() *AggregateQuery {
134+
return &AggregateQuery{
135+
Query: nil,
136+
Paging: nil,
137+
Max: 0,
138+
WithSchema: false,
139+
Verbatim: false,
140+
WithCursor: false,
141+
}
142+
}
143+
144+
func (a *AggregateQuery) SetQuery(query *Query) *AggregateQuery {
145+
a.Query = query
146+
return a
147+
}
148+
149+
func (a *AggregateQuery) SetWithSchema(value bool) *AggregateQuery {
150+
a.WithSchema = value
151+
return a
152+
}
153+
154+
func (a *AggregateQuery) SetVerbatim(value bool) *AggregateQuery {
155+
a.Verbatim = value
156+
return a
157+
}
158+
159+
func (a *AggregateQuery) SetMax(value int) *AggregateQuery {
160+
a.Max = value
161+
return a
162+
}
163+
164+
func (a *AggregateQuery) SetCursor(cursor *Cursor) *AggregateQuery {
165+
a.WithCursor = true
166+
a.Cursor = cursor
167+
return a
168+
}
169+
170+
func (a *AggregateQuery) CursorHasResults() (res bool) {
171+
res = false
172+
if !reflect.ValueOf(a.Cursor).IsNil() {
173+
res = a.Cursor.Id > 0
174+
}
175+
return
176+
}
177+
178+
//Adds a APPLY clause to the aggregate plan
179+
func (a *AggregateQuery) Apply(expression Projection) *AggregateQuery {
180+
a.AggregatePlan = a.AggregatePlan.AddFlat(expression.Serialize())
181+
return a
182+
}
183+
184+
//Sets the limit for the initial pool of results from the query.
185+
func (a *AggregateQuery) Limit(offset int, num int) *AggregateQuery {
186+
a.Paging = NewPaging(offset, num)
187+
return a
188+
}
189+
190+
//Adds a GROUPBY clause to the aggregate plan
191+
func (a *AggregateQuery) GroupBy(group GroupBy) *AggregateQuery {
192+
a.AggregatePlan = a.AggregatePlan.AddFlat(group.Serialize())
193+
return a
194+
}
195+
196+
//Adds a SORTBY clause to the aggregate plan
197+
func (a *AggregateQuery) SortBy(SortByProperties []SortingKey) *AggregateQuery {
198+
nsort := len(SortByProperties)
199+
if nsort > 0 {
200+
a.AggregatePlan = a.AggregatePlan.Add("SORTBY", nsort*2)
201+
for _, sortby := range SortByProperties {
202+
a.AggregatePlan = a.AggregatePlan.AddFlat(sortby.Serialize())
203+
}
204+
if a.Max > 0 {
205+
a.AggregatePlan = a.AggregatePlan.Add("MAX", a.Max)
206+
}
207+
}
208+
return a
209+
}
210+
211+
//Specify filters to filter the results using predicates relating to values in the result set.
212+
func (a *AggregateQuery) Filter(expression string) *AggregateQuery {
213+
a.AggregatePlan = a.AggregatePlan.Add("FILTER", expression)
214+
//a.Filters = append(a.Filters, expression)
215+
return a
216+
}
217+
218+
func (q AggregateQuery) Serialize() redis.Args {
219+
args := redis.Args{}
220+
if q.Query != nil {
221+
args = args.AddFlat(q.Query.serialize())
222+
} else {
223+
args = args.Add("*")
224+
}
225+
// WITHSCHEMA
226+
if q.WithSchema {
227+
args = args.AddFlat("WITHSCHEMA")
228+
}
229+
// VERBATIM
230+
if q.Verbatim {
231+
args = args.Add("VERBATIM")
232+
}
233+
234+
// WITHCURSOR
235+
if q.WithCursor {
236+
args = args.AddFlat(q.Cursor.Serialize())
237+
}
238+
239+
// TODO: add load fields
240+
241+
//Add the aggregation plan with ( GROUPBY and REDUCE | SORTBY | APPLY | FILTER ).+ clauses
242+
args = args.AddFlat(q.AggregatePlan)
243+
244+
// LIMIT
245+
if !reflect.ValueOf(q.Paging).IsNil() {
246+
args = args.Add("LIMIT", q.Paging.Offset, q.Paging.Num)
247+
}
248+
249+
return args
250+
}
251+
252+
func ProcessAggResponse(res []interface{}) [][]string {
253+
aggregateReply := make([][]string, len(res), len(res))
254+
for i := 0; i < len(res); i++ {
255+
if d, e := redis.Strings(res[i], nil); e == nil {
256+
aggregateReply[i] = d
257+
} else {
258+
log.Print("Error parsing Aggregate Reply: ", e)
259+
aggregateReply[i] = nil
260+
}
261+
}
262+
return aggregateReply
263+
}
264+
265+
func ProcessAggResponseSS(res []interface{}) [][]string {
266+
var lout = len(res)
267+
aggregateReply := make([][]string, lout, lout)
268+
for i := 0; i < lout; i++ {
269+
reply := res[i].([]interface{})
270+
linner := len(reply)
271+
aggregateReply[i] = make([]string, linner, linner)
272+
for j := 0; j < linner; j++ {
273+
if reply[j] == nil {
274+
log.Print(fmt.Sprintf("Error parsing Aggregate Reply on position (%d,%d)", i, j))
275+
} else {
276+
aggregateReply[i][j] = reply[j].(string)
277+
}
278+
279+
}
280+
}
281+
return aggregateReply
282+
}

0 commit comments

Comments
 (0)