Skip to content

Commit 087ef9a

Browse files
authored
feat: Support chunks in resource resolvers (#2287)
#### Summary Fixes cloudquery/cloudquery#14601 ~~Still untested, opening for reference~~ Tested this in cloudquery/cloudquery-private#9520 (internal link) ---
1 parent 8080660 commit 087ef9a

File tree

5 files changed

+229
-85
lines changed

5 files changed

+229
-85
lines changed

scheduler/queue/worker.go

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
1515
"github.com/cloudquery/plugin-sdk/v4/schema"
1616
"github.com/rs/zerolog"
17+
"github.com/samber/lo"
1718
"go.opentelemetry.io/otel"
1819
"go.opentelemetry.io/otel/attribute"
1920
"go.opentelemetry.io/otel/trace"
@@ -140,37 +141,38 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien
140141
go func() {
141142
defer close(resourcesChan)
142143
var wg sync.WaitGroup
143-
for i := range resourcesSlice {
144-
i := i
144+
chunks := [][]any{resourcesSlice}
145+
if table.PreResourceChunkResolver != nil {
146+
chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize)
147+
}
148+
for i := range chunks {
145149
wg.Add(1)
146150
go func() {
147151
defer wg.Done()
148-
resolvedResource := resolvers.ResolveSingleResource(ctx, w.logger, w.metrics, table, client, parent, resourcesSlice[i], w.caser)
149-
if resolvedResource == nil {
150-
return
151-
}
152-
153-
if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
154-
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
155-
w.metrics.AddErrors(ctx, 1, selector)
156-
return
157-
}
158-
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
159-
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
160-
}
161-
if err := resolvedResource.Validate(); err != nil {
162-
switch err.(type) {
163-
case *schema.PKError:
164-
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
152+
resolvedResources := resolvers.ResolveResourcesChunk(ctx, w.logger, w.metrics, table, client, parent, chunks[i], w.caser)
153+
for _, resolvedResource := range resolvedResources {
154+
if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
155+
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
165156
w.metrics.AddErrors(ctx, 1, selector)
166157
return
167-
case *schema.PKComponentError:
168-
w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
169158
}
170-
}
171-
select {
172-
case resourcesChan <- resolvedResource:
173-
case <-ctx.Done():
159+
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
160+
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
161+
}
162+
if err := resolvedResource.Validate(); err != nil {
163+
switch err.(type) {
164+
case *schema.PKError:
165+
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
166+
w.metrics.AddErrors(ctx, 1, selector)
167+
return
168+
case *schema.PKComponentError:
169+
w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
170+
}
171+
}
172+
select {
173+
case resourcesChan <- resolvedResource:
174+
case <-ctx.Done():
175+
}
174176
}
175177
}()
176178
}

scheduler/resolvers/resolvers.go

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,14 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metric
4141
}
4242
}
4343

44-
func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, item any, c *caser.Caser) *schema.Resource {
44+
func ResolveResourcesChunk(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, chunk []any, c *caser.Caser) []*schema.Resource {
4545
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
4646
defer cancel()
4747

48-
resource := schema.NewResourceData(table, parent, item)
48+
resources := make([]*schema.Resource, len(chunk))
49+
for i, item := range chunk {
50+
resources[i] = schema.NewResourceData(table, parent, item)
51+
}
4952
objectStartTime := time.Now()
5053

5154
clientID := client.ID()
@@ -60,25 +63,39 @@ func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metric
6063
m.AddPanics(ctx, 1, selector)
6164
}
6265
}()
63-
if table.PreResourceResolver != nil {
64-
if err := table.PreResourceResolver(ctx, client, resource); err != nil {
65-
tableLogger.Error().Err(err).Msg("pre resource resolver failed")
66+
67+
if table.PreResourceChunkResolver != nil {
68+
if err := table.PreResourceChunkResolver.RowsResolver(ctx, client, resources); err != nil {
69+
tableLogger.Error().Stack().Err(err).Msg("pre resource chunk resolver finished with error")
6670
m.AddErrors(ctx, 1, selector)
6771
return nil
6872
}
6973
}
7074

71-
for _, column := range table.Columns {
72-
resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
75+
if table.PreResourceResolver != nil {
76+
for _, resource := range resources {
77+
if err := table.PreResourceResolver(ctx, client, resource); err != nil {
78+
tableLogger.Error().Err(err).Msg("pre resource resolver failed")
79+
m.AddErrors(ctx, 1, selector)
80+
return nil
81+
}
82+
}
83+
}
84+
for _, resource := range resources {
85+
for _, column := range table.Columns {
86+
resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
87+
}
7388
}
7489

7590
if table.PostResourceResolver != nil {
76-
if err := table.PostResourceResolver(ctx, client, resource); err != nil {
77-
tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
78-
m.AddErrors(ctx, 1, selector)
91+
for _, resource := range resources {
92+
if err := table.PostResourceResolver(ctx, client, resource); err != nil {
93+
tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
94+
m.AddErrors(ctx, 1, selector)
95+
}
7996
}
8097
}
8198

82-
m.AddResources(ctx, 1, selector)
83-
return resource
99+
m.AddResources(ctx, int64(len(resources)), selector)
100+
return resources
84101
}

scheduler/scheduler_dfs.go

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
1414
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
1515
"github.com/cloudquery/plugin-sdk/v4/schema"
16+
"github.com/samber/lo"
1617
"go.opentelemetry.io/otel"
1718
"go.opentelemetry.io/otel/attribute"
1819
"go.opentelemetry.io/otel/trace"
@@ -157,8 +158,11 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
157158
go func() {
158159
defer close(resourcesChan)
159160
var wg sync.WaitGroup
160-
for i := range resourcesSlice {
161-
i := i
161+
chunks := [][]any{resourcesSlice}
162+
if table.PreResourceChunkResolver != nil {
163+
chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize)
164+
}
165+
for i := range chunks {
162166
resourceConcurrencyKey := table.Name + "-" + client.ID() + "-" + "resource"
163167
resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleResourceMaxConcurrency))
164168
resourceSem := resourceSemVal.(*semaphore.Weighted)
@@ -183,33 +187,34 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
183187
defer resourceSem.Release(1)
184188
defer s.scheduler.resourceSem.Release(1)
185189
defer wg.Done()
186-
//nolint:all
187-
resolvedResource := resolvers.ResolveSingleResource(ctx, s.logger, s.metrics, table, client, parent, resourcesSlice[i], s.scheduler.caser)
188-
if resolvedResource == nil {
190+
resolvedResources := resolvers.ResolveResourcesChunk(ctx, s.logger, s.metrics, table, client, parent, chunks[i], s.scheduler.caser)
191+
if len(resolvedResources) == 0 {
189192
return
190193
}
191194

192-
if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil {
193-
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
194-
s.metrics.AddErrors(ctx, 1, selector)
195-
return
196-
}
197-
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
198-
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
199-
}
200-
if err := resolvedResource.Validate(); err != nil {
201-
switch err.(type) {
202-
case *schema.PKError:
203-
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
195+
for _, resolvedResource := range resolvedResources {
196+
if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil {
197+
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
204198
s.metrics.AddErrors(ctx, 1, selector)
205199
return
206-
case *schema.PKComponentError:
207-
s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
208200
}
209-
}
210-
select {
211-
case resourcesChan <- resolvedResource:
212-
case <-ctx.Done():
201+
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
202+
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
203+
}
204+
if err := resolvedResource.Validate(); err != nil {
205+
switch err.(type) {
206+
case *schema.PKError:
207+
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
208+
s.metrics.AddErrors(ctx, 1, selector)
209+
return
210+
case *schema.PKComponentError:
211+
s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
212+
}
213+
}
214+
select {
215+
case resourcesChan <- resolvedResource:
216+
case <-ctx.Done():
217+
}
213218
}
214219
}()
215220
}

0 commit comments

Comments
 (0)