52 changes: 27 additions & 25 deletions scheduler/queue/worker.go
@@ -14,6 +14,7 @@ import (
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/rs/zerolog"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -140,37 +141,38 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien
go func() {
defer close(resourcesChan)
var wg sync.WaitGroup
for i := range resourcesSlice {
i := i
chunks := [][]any{resourcesSlice}
if table.PreResourceChunkResolver != nil {
chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize)
}
for i := range chunks {
wg.Add(1)
go func() {
defer wg.Done()
resolvedResource := resolvers.ResolveSingleResource(ctx, w.logger, w.metrics, table, client, parent, resourcesSlice[i], w.caser)
if resolvedResource == nil {
return
}

if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
w.metrics.AddErrors(ctx, 1, selector)
return
}
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
resolvedResources := resolvers.ResolveResourcesChunk(ctx, w.logger, w.metrics, table, client, parent, chunks[i], w.caser)
for _, resolvedResource := range resolvedResources {
if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
w.metrics.AddErrors(ctx, 1, selector)
return
case *schema.PKComponentError:
w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
}
}
select {
case resourcesChan <- resolvedResource:
case <-ctx.Done():
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
w.metrics.AddErrors(ctx, 1, selector)
return
case *schema.PKComponentError:
w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
}
}
select {
case resourcesChan <- resolvedResource:
case <-ctx.Done():
}
}
}()
}
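
A note on the fallback behaviour above: when a table has no PreResourceChunkResolver, the worker wraps the whole resources slice in a single chunk, so the previous semantics are preserved; otherwise lo.Chunk splits the slice into ChunkSize-sized groups. A minimal sketch of that split, with an illustrative chunk size of 2 and placeholder items:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	// Stand-in for the items returned by the table's resolver.
	resourcesSlice := []any{"a", "b", "c", "d", "e"}

	// No PreResourceChunkResolver: keep one chunk with every item (old behaviour).
	chunks := [][]any{resourcesSlice}
	fmt.Println(len(chunks)) // 1

	// PreResourceChunkResolver set (ChunkSize assumed to be 2 here):
	// lo.Chunk yields fixed-size groups; the last group may be smaller.
	chunks = lo.Chunk(resourcesSlice, 2)
	fmt.Println(chunks) // [[a b] [c d] [e]]
}
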
41 changes: 29 additions & 12 deletions scheduler/resolvers/resolvers.go
@@ -41,11 +41,14 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metric
}
}

func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, item any, c *caser.Caser) *schema.Resource {
func ResolveResourcesChunk(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, chunk []any, c *caser.Caser) []*schema.Resource {
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()

resource := schema.NewResourceData(table, parent, item)
resources := make([]*schema.Resource, len(chunk))
for i, item := range chunk {
resources[i] = schema.NewResourceData(table, parent, item)
}
objectStartTime := time.Now()

clientID := client.ID()
@@ -60,25 +63,39 @@ func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metric
m.AddPanics(ctx, 1, selector)
}
}()
if table.PreResourceResolver != nil {
if err := table.PreResourceResolver(ctx, client, resource); err != nil {
tableLogger.Error().Err(err).Msg("pre resource resolver failed")

if table.PreResourceChunkResolver != nil {
if err := table.PreResourceChunkResolver.RowsResolver(ctx, client, resources); err != nil {
tableLogger.Error().Stack().Err(err).Msg("pre resource chunk resolver finished with error")
m.AddErrors(ctx, 1, selector)
return nil
}
}

for _, column := range table.Columns {
resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
if table.PreResourceResolver != nil {
for _, resource := range resources {
if err := table.PreResourceResolver(ctx, client, resource); err != nil {
tableLogger.Error().Err(err).Msg("pre resource resolver failed")
m.AddErrors(ctx, 1, selector)
return nil
}
}
}
for _, resource := range resources {
for _, column := range table.Columns {
resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
}
}

if table.PostResourceResolver != nil {
if err := table.PostResourceResolver(ctx, client, resource); err != nil {
tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
m.AddErrors(ctx, 1, selector)
for _, resource := range resources {
if err := table.PostResourceResolver(ctx, client, resource); err != nil {
tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
m.AddErrors(ctx, 1, selector)
}
}
}

m.AddResources(ctx, 1, selector)
return resource
m.AddResources(ctx, int64(len(resources)), selector)
return resources
}
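
To make the new resolution order easier to follow, here is a minimal, self-contained sketch of the pipeline ResolveResourcesChunk implements: the chunk resolver runs once over the whole chunk, then the per-resource pre resolver, column resolution, and post resolver run per resource; a chunk-resolver or pre-resource error drops the whole chunk, while a post-resolver error is only logged. All type and function names below are stand-ins, not the SDK's own types:

package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the SDK types; names are illustrative only.
type resource struct{ item any }

type chunkTable struct {
	preChunk func(ctx context.Context, rs []*resource) error // one batched call per chunk
	pre      func(ctx context.Context, r *resource) error    // per-resource pre resolver
	post     func(ctx context.Context, r *resource) error    // per-resource post resolver
}

// resolveChunk mirrors the order of operations in ResolveResourcesChunk.
func resolveChunk(ctx context.Context, t chunkTable, items []any) []*resource {
	resources := make([]*resource, len(items))
	for i, item := range items {
		resources[i] = &resource{item: item}
	}
	if t.preChunk != nil {
		if err := t.preChunk(ctx, resources); err != nil {
			return nil // a chunk-resolver error drops the whole chunk
		}
	}
	if t.pre != nil {
		for _, r := range resources {
			if err := t.pre(ctx, r); err != nil {
				return nil // a pre-resource error also drops the whole chunk
			}
		}
	}
	// column resolution for every resource would happen here
	if t.post != nil {
		for _, r := range resources {
			if err := t.post(ctx, r); err != nil {
				fmt.Println("post resolver error (logged, chunk still emitted):", err)
			}
		}
	}
	return resources
}

func main() {
	rs := resolveChunk(context.Background(), chunkTable{
		preChunk: func(_ context.Context, rs []*resource) error {
			fmt.Println("one batched call for", len(rs), "resources")
			return nil
		},
	}, []any{"a", "b", "c"})
	fmt.Println(len(rs), "resources resolved")
}
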
51 changes: 28 additions & 23 deletions scheduler/scheduler_dfs.go
@@ -13,6 +13,7 @@ import (
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -157,8 +158,11 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
go func() {
defer close(resourcesChan)
var wg sync.WaitGroup
for i := range resourcesSlice {
i := i
chunks := [][]any{resourcesSlice}
if table.PreResourceChunkResolver != nil {
chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize)
}
for i := range chunks {
resourceConcurrencyKey := table.Name + "-" + client.ID() + "-" + "resource"
resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleResourceMaxConcurrency))
resourceSem := resourceSemVal.(*semaphore.Weighted)
@@ -183,33 +187,34 @@
defer resourceSem.Release(1)
defer s.scheduler.resourceSem.Release(1)
defer wg.Done()
//nolint:all
resolvedResource := resolvers.ResolveSingleResource(ctx, s.logger, s.metrics, table, client, parent, resourcesSlice[i], s.scheduler.caser)
if resolvedResource == nil {
resolvedResources := resolvers.ResolveResourcesChunk(ctx, s.logger, s.metrics, table, client, parent, chunks[i], s.scheduler.caser)
if len(resolvedResources) == 0 {
return
}

if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
s.metrics.AddErrors(ctx, 1, selector)
return
}
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
for _, resolvedResource := range resolvedResources {
if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
s.metrics.AddErrors(ctx, 1, selector)
return
case *schema.PKComponentError:
s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
}
}
select {
case resourcesChan <- resolvedResource:
case <-ctx.Done():
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
s.metrics.AddErrors(ctx, 1, selector)
return
case *schema.PKComponentError:
s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
}
}
select {
case resourcesChan <- resolvedResource:
case <-ctx.Done():
}
}
}()
}
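
For context on the surrounding lines: the DFS scheduler caps per-resource concurrency with a weighted semaphore stored in a sync.Map keyed by table name, client ID, and a "resource" suffix; this change keeps that mechanism and only alters what each goroutine resolves. A minimal sketch of the LoadOrStore-plus-semaphore pattern, with an illustrative key and limit:

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// perKeySem returns the shared weighted semaphore for a key, creating it on
// first use — the same pattern the scheduler uses per table/client pair.
func perKeySem(m *sync.Map, key string, limit int64) *semaphore.Weighted {
	v, _ := m.LoadOrStore(key, semaphore.NewWeighted(limit))
	return v.(*semaphore.Weighted)
}

func main() {
	ctx := context.Background()
	var singleTableConcurrency sync.Map

	// Illustrative key; the scheduler builds it as table.Name + "-" + client.ID() + "-" + "resource".
	sem := perKeySem(&singleTableConcurrency, "example_table-client1-resource", 2)
	if err := sem.Acquire(ctx, 1); err != nil {
		return
	}
	defer sem.Release(1)
	fmt.Println("acquired one of 2 slots for this table/client")
}
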