diff --git a/scheduler/queue/worker.go b/scheduler/queue/worker.go
index afec99ee59..be4864d628 100644
--- a/scheduler/queue/worker.go
+++ b/scheduler/queue/worker.go
@@ -14,6 +14,7 @@ import (
 	"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
 	"github.com/cloudquery/plugin-sdk/v4/schema"
 	"github.com/rs/zerolog"
+	"github.com/samber/lo"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
@@ -140,37 +141,38 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien
 	go func() {
 		defer close(resourcesChan)
 		var wg sync.WaitGroup
-		for i := range resourcesSlice {
-			i := i
+		chunks := [][]any{resourcesSlice}
+		if table.PreResourceChunkResolver != nil {
+			chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize)
+		}
+		for i := range chunks {
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				resolvedResource := resolvers.ResolveSingleResource(ctx, w.logger, w.metrics, table, client, parent, resourcesSlice[i], w.caser)
-				if resolvedResource == nil {
-					return
-				}
-
-				if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
-					w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
-					w.metrics.AddErrors(ctx, 1, selector)
-					return
-				}
-				if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
-					w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
-				}
-				if err := resolvedResource.Validate(); err != nil {
-					switch err.(type) {
-					case *schema.PKError:
-						w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
+				resolvedResources := resolvers.ResolveResourcesChunk(ctx, w.logger, w.metrics, table, client, parent, chunks[i], w.caser)
+				for _, resolvedResource := range resolvedResources {
+					if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
+						w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
 						w.metrics.AddErrors(ctx, 1, selector)
 						return
-					case *schema.PKComponentError:
-						w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
 					}
-				}
-				select {
-				case resourcesChan <- resolvedResource:
-				case <-ctx.Done():
+					if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
+						w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
+					}
+					if err := resolvedResource.Validate(); err != nil {
+						switch err.(type) {
+						case *schema.PKError:
+							w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
+							w.metrics.AddErrors(ctx, 1, selector)
+							return
+						case *schema.PKComponentError:
+							w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
+						}
+					}
+					select {
+					case resourcesChan <- resolvedResource:
+					case <-ctx.Done():
+					}
 				}
 			}()
 		}
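Aside, not part of the patch: lo.Chunk from github.com/samber/lo splits a slice into consecutive batches of at most the given size (the last batch may be shorter), and when a table has no PreResourceChunkResolver the code above keeps the whole slice as a single chunk, so existing tables keep their current behavior. A minimal, self-contained sketch of that dispatch pattern follows; the chunkItems helper is illustrative only and not part of the SDK.

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// chunkItems mirrors the fallback used in worker.go and scheduler_dfs.go:
// everything stays in one chunk unless a positive chunk size is configured.
func chunkItems(items []any, chunkSize int) [][]any {
	if chunkSize <= 0 {
		// no chunk resolver configured: resolve the whole slice as a single chunk
		return [][]any{items}
	}
	return lo.Chunk(items, chunkSize)
}

func main() {
	items := []any{1, 2, 3, 4, 5}
	fmt.Println(len(chunkItems(items, 0))) // 1 chunk holding all 5 items
	fmt.Println(len(chunkItems(items, 2))) // 3 chunks: [1 2], [3 4], [5]
}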
diff --git a/scheduler/resolvers/resolvers.go b/scheduler/resolvers/resolvers.go
index 56f8ead9e6..2d20aee9eb 100644
--- a/scheduler/resolvers/resolvers.go
+++ b/scheduler/resolvers/resolvers.go
@@ -41,11 +41,14 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metric
 	}
 }

-func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, item any, c *caser.Caser) *schema.Resource {
+func ResolveResourcesChunk(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, chunk []any, c *caser.Caser) []*schema.Resource {
 	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
 	defer cancel()

-	resource := schema.NewResourceData(table, parent, item)
+	resources := make([]*schema.Resource, len(chunk))
+	for i, item := range chunk {
+		resources[i] = schema.NewResourceData(table, parent, item)
+	}

 	objectStartTime := time.Now()
 	clientID := client.ID()
@@ -60,25 +63,39 @@ func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metric
 			m.AddPanics(ctx, 1, selector)
 		}
 	}()
-	if table.PreResourceResolver != nil {
-		if err := table.PreResourceResolver(ctx, client, resource); err != nil {
-			tableLogger.Error().Err(err).Msg("pre resource resolver failed")
+
+	if table.PreResourceChunkResolver != nil {
+		if err := table.PreResourceChunkResolver.RowsResolver(ctx, client, resources); err != nil {
+			tableLogger.Error().Stack().Err(err).Msg("pre resource chunk resolver finished with error")
 			m.AddErrors(ctx, 1, selector)
 			return nil
 		}
 	}

-	for _, column := range table.Columns {
-		resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
+	if table.PreResourceResolver != nil {
+		for _, resource := range resources {
+			if err := table.PreResourceResolver(ctx, client, resource); err != nil {
+				tableLogger.Error().Err(err).Msg("pre resource resolver failed")
+				m.AddErrors(ctx, 1, selector)
+				return nil
+			}
+		}
+	}
+	for _, resource := range resources {
+		for _, column := range table.Columns {
+			resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
+		}
 	}

 	if table.PostResourceResolver != nil {
-		if err := table.PostResourceResolver(ctx, client, resource); err != nil {
-			tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
-			m.AddErrors(ctx, 1, selector)
+		for _, resource := range resources {
+			if err := table.PostResourceResolver(ctx, client, resource); err != nil {
+				tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
+				m.AddErrors(ctx, 1, selector)
+			}
 		}
 	}

-	m.AddResources(ctx, 1, selector)
-	return resource
+	m.AddResources(ctx, int64(len(resources)), selector)
+	return resources
 }
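The per-chunk ordering inside ResolveResourcesChunk is: the chunk-level RowsResolver runs once over all freshly created resources in the chunk, then the existing per-resource PreResourceResolver, column resolvers, and PostResourceResolver run for each resource. The point of the chunk hook is to let a plugin make one batched call per chunk instead of one call per resource; a hedged sketch is below, where fetchTagsBatch is a hypothetical batch endpoint invented for illustration, as is the assumption that the table resolver emitted string IDs.

package example

import (
	"context"

	"github.com/cloudquery/plugin-sdk/v4/schema"
)

// fetchTagsBatch stands in for whatever batch API a plugin client exposes:
// one request that returns tags for many IDs at once. It is hypothetical.
func fetchTagsBatch(_ context.Context, ids []string) (map[string][]string, error) {
	tags := make(map[string][]string, len(ids))
	for _, id := range ids {
		tags[id] = []string{"example-tag"}
	}
	return tags, nil
}

// resolveTagsForChunk has the shape of a RowsChunkResolver.RowsResolver:
// collect the items of the chunk, make one batched call, then write a column
// on each resource. It assumes the table resolver emitted string IDs.
func resolveTagsForChunk(ctx context.Context, _ schema.ClientMeta, resourcesChunk []*schema.Resource) error {
	ids := make([]string, len(resourcesChunk))
	for i, r := range resourcesChunk {
		ids[i] = r.Item.(string)
	}
	tags, err := fetchTagsBatch(ctx, ids)
	if err != nil {
		return err
	}
	for _, r := range resourcesChunk {
		if err := r.Set("tags", tags[r.Item.(string)]); err != nil {
			return err
		}
	}
	return nil
}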
diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go
index dfdf4703d7..515cd9bc77 100644
--- a/scheduler/scheduler_dfs.go
+++ b/scheduler/scheduler_dfs.go
@@ -13,6 +13,7 @@ import (
 	"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
 	"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
 	"github.com/cloudquery/plugin-sdk/v4/schema"
+	"github.com/samber/lo"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
@@ -157,8 +158,11 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
 	go func() {
 		defer close(resourcesChan)
 		var wg sync.WaitGroup
-		for i := range resourcesSlice {
-			i := i
+		chunks := [][]any{resourcesSlice}
+		if table.PreResourceChunkResolver != nil {
+			chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize)
+		}
+		for i := range chunks {
 			resourceConcurrencyKey := table.Name + "-" + client.ID() + "-" + "resource"
 			resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleResourceMaxConcurrency))
 			resourceSem := resourceSemVal.(*semaphore.Weighted)
@@ -183,33 +187,34 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
 				defer resourceSem.Release(1)
 				defer s.scheduler.resourceSem.Release(1)
 				defer wg.Done()
-				//nolint:all
-				resolvedResource := resolvers.ResolveSingleResource(ctx, s.logger, s.metrics, table, client, parent, resourcesSlice[i], s.scheduler.caser)
-				if resolvedResource == nil {
+				resolvedResources := resolvers.ResolveResourcesChunk(ctx, s.logger, s.metrics, table, client, parent, chunks[i], s.scheduler.caser)
+				if len(resolvedResources) == 0 {
 					return
 				}

-				if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil {
-					s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
-					s.metrics.AddErrors(ctx, 1, selector)
-					return
-				}
-				if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
-					s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
-				}
-				if err := resolvedResource.Validate(); err != nil {
-					switch err.(type) {
-					case *schema.PKError:
-						s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
+				for _, resolvedResource := range resolvedResources {
+					if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil {
+						s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
 						s.metrics.AddErrors(ctx, 1, selector)
 						return
-					case *schema.PKComponentError:
-						s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
 					}
-				}
-				select {
-				case resourcesChan <- resolvedResource:
-				case <-ctx.Done():
+					if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
+						s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
+					}
+					if err := resolvedResource.Validate(); err != nil {
+						switch err.(type) {
+						case *schema.PKError:
+							s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
+							s.metrics.AddErrors(ctx, 1, selector)
+							return
+						case *schema.PKComponentError:
+							s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
+						}
+					}
+					select {
+					case resourcesChan <- resolvedResource:
+					case <-ctx.Done():
+					}
 				}
 			}()
 		}
diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go
index bd6effca83..7ca9f9901e 100644
--- a/scheduler/scheduler_test.go
+++ b/scheduler/scheduler_test.go
@@ -3,15 +3,18 @@ package scheduler
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"testing"
 	"time"

 	"github.com/apache/arrow-go/v18/arrow"
 	"github.com/apache/arrow-go/v18/arrow/array"
+	"github.com/apache/arrow-go/v18/arrow/memory"
 	"github.com/cloudquery/plugin-sdk/v4/message"
 	"github.com/cloudquery/plugin-sdk/v4/scalar"
 	"github.com/cloudquery/plugin-sdk/v4/schema"
 	"github.com/rs/zerolog"
+	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -184,9 +187,82 @@ func testTableRelationSuccess() *schema.Table {
 	}
 }

+const chunkSize = 200
+
+func testTableSuccessWithRowsChunkResolverSendSingleItemToResChan() *schema.Table {
+	return &schema.Table{
+		Name: "test_table_success_with_rows_chunk_resolver",
+		Resolver: func(_ context.Context, _ schema.ClientMeta, _ *schema.Resource, res chan<- any) error {
+			for i := range chunkSize {
+				res <- i
+			}
+			return nil
+		},
+		PreResourceChunkResolver: &schema.RowsChunkResolver{
+			ChunkSize: chunkSize,
+			RowsResolver: func(_ context.Context, _ schema.ClientMeta, resourcesChunk []*schema.Resource) error {
+				for _, resource := range resourcesChunk {
+					_ = resource.Set("test_column", strconv.Itoa(resource.Item.(int)))
+				}
+				return nil
+			},
+		},
+		Columns: []schema.Column{
+			{
+				Name: "test_column",
+				Type: arrow.BinaryTypes.String,
+			},
+		},
+	}
+}
+
+func testTableSuccessWithRowsChunkResolverSendSliceToResChan() *schema.Table {
+	return &schema.Table{
+		Name: "test_table_success_with_rows_chunk_resolver",
+		Resolver: func(_ context.Context, _ schema.ClientMeta, _ *schema.Resource, res chan<- any) error {
+			data := make([]int, chunkSize)
+			for i := range chunkSize {
+				data[i] = i
+			}
+			res <- data
+			return nil
+		},
+		PreResourceChunkResolver: &schema.RowsChunkResolver{
+			ChunkSize: chunkSize,
+			RowsResolver: func(_ context.Context, _ schema.ClientMeta, resourcesChunk []*schema.Resource) error {
+				for _, resource := range resourcesChunk {
+					_ = resource.Set("test_column", strconv.Itoa(resource.Item.(int)))
+				}
+				return nil
+			},
+		},
+		Columns: []schema.Column{
+			{
+				Name: "test_column",
+				Type: arrow.BinaryTypes.String,
+			},
+		},
+	}
+}
+
+func expectedChunkedResolverData(s *arrow.Schema) []arrow.Record {
+	const rowsPerRecord = 50
+	data := make([]arrow.Record, chunkSize/rowsPerRecord)
+	for i := range data {
+		builder := array.NewRecordBuilder(memory.DefaultAllocator, s)
+		for j := range rowsPerRecord {
+			builder.Field(0).(*array.StringBuilder).Append(strconv.Itoa(i*rowsPerRecord + j))
+		}
+		record := builder.NewRecord()
+		data[i] = record
+	}
+	return data
+}
+
 type syncTestCase struct {
 	table             *schema.Table
 	data              []scalar.Vector
+	dataAsRecords     []arrow.Record
 	deterministicCQID bool
 	err               error
 }
@@ -298,6 +374,14 @@
 			},
 		},
 	},
+	{
+		table:         testTableSuccessWithRowsChunkResolverSendSingleItemToResChan(),
+		dataAsRecords: expectedChunkedResolverData(testTableSuccessWithRowsChunkResolverSendSingleItemToResChan().ToArrowSchema()),
+	},
+	{
+		table:         testTableSuccessWithRowsChunkResolverSendSliceToResChan(),
+		dataAsRecords: expectedChunkedResolverData(testTableSuccessWithRowsChunkResolverSendSliceToResChan().ToArrowSchema()),
+	},
 }

 type optionsTestCase struct {
@@ -348,36 +432,24 @@ func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, determinist
 		WithStrategy(strategy),
 	}, extra...)
 	sc := NewScheduler(opts...)
-	msgs := make(chan message.SyncMessage, 10)
+	// We use a buffered channel to avoid the complexity of testing with a goroutine; the buffer just needs to be big enough to hold all sync messages
+	msgs := make(chan message.SyncMessage, 500)
 	err := sc.Sync(ctx, &c, tables, msgs, WithSyncDeterministicCQID(deterministicCQID))
 	require.ErrorIs(t, err, tc.err)
 	close(msgs)

-	var i int
+	dataAsRecords := tc.dataAsRecords
+	if dataAsRecords == nil {
+		dataAsRecords = lo.Map(tc.data, func(item scalar.Vector, _ int) arrow.Record {
+			return item.ToArrowRecord(tc.table.ToArrowSchema())
+		})
+	}
+
+	gotRecords := make([]arrow.Record, 0)
 	for msg := range msgs {
 		switch v := msg.(type) {
 		case *message.SyncInsert:
-			record := v.Record
-			rec := tc.data[i].ToArrowRecord(record.Schema())
-			if !array.RecordEqual(rec, record) {
-				// For records that include CqIDColumn, we can't verify equality because it is generated by the scheduler, unless deterministicCQID is true
-				onlyCqIDInequality := false
-				for col := range rec.Columns() {
-					if !deterministicCQID && rec.ColumnName(col) == schema.CqIDColumn.Name {
-						onlyCqIDInequality = true
-						continue
-					}
-					lc := rec.Column(col)
-					rc := record.Column(col)
-					if !array.Equal(lc, rc) {
-						onlyCqIDInequality = false
-					}
-				}
-				if !onlyCqIDInequality {
-					t.Fatalf("expected at i=%d: %v. got %v", i, tc.data[i], record)
-				}
-			}
-			i++
+			gotRecords = append(gotRecords, v.Record)
 		case *message.SyncMigrateTable:
 			migratedTable := v.Table
got %d", len(slicedExpectedRecords), len(gotSlicedRecords)) + } + + for _, expectedRecord := range slicedExpectedRecords { + // Records can be returned in any order, so we need to find the matching record + _, found := lo.Find(gotSlicedRecords, func(gotRecord arrow.Record) bool { + if deterministicCQID { + return array.RecordEqual(gotRecord, expectedRecord) + } + for col := range gotRecord.Columns() { + // skip equality check for random CQID values + if gotRecord.ColumnName(col) == schema.CqIDColumn.Name { + continue + } + if !array.Equal(gotRecord.Column(col), expectedRecord.Column(col)) { + return false + } + } + return true + }) + + if !found { + t.Fatalf("expected record %v not found", expectedRecord) + } } } diff --git a/schema/table.go b/schema/table.go index 49c309c465..875d973714 100644 --- a/schema/table.go +++ b/schema/table.go @@ -25,6 +25,12 @@ type TableResolver func(ctx context.Context, meta ClientMeta, parent *Resource, type RowResolver func(ctx context.Context, meta ClientMeta, resource *Resource) error +// EXPERIMENTAL: RowsChunkResolver API might change in future versions of the SDK +type RowsChunkResolver struct { + ChunkSize int + RowsResolver func(ctx context.Context, meta ClientMeta, resourcesChunk []*Resource) error +} + type Multiplexer func(meta ClientMeta) []ClientMeta type Transform func(table *Table) error @@ -86,6 +92,9 @@ type Table struct { // PreResourceResolver is called before all columns are resolved but after Resource is created. The ordering of resolvers is: // (Table) Resolver → PreResourceResolver → ColumnResolvers → PostResourceResolver PreResourceResolver RowResolver `json:"-"` + + PreResourceChunkResolver *RowsChunkResolver `json:"-"` + // IsIncremental is a flag that indicates if the table is incremental or not. This flag mainly affects how the table is // documented. IsIncremental bool `json:"is_incremental"`