44 BucketSource ,
55 BucketSourceType ,
66 RequestedStream ,
7+ RequestJwtPayload ,
78 RequestParameters ,
89 ResolvedBucket ,
910 SqlSyncRules
@@ -28,10 +29,9 @@ export interface BucketChecksumStateOptions {
2829 syncContext : SyncContext ;
2930 bucketStorage : BucketChecksumStateStorage ;
3031 syncRules : SqlSyncRules ;
31- syncParams : RequestParameters ;
32+ tokenPayload : RequestJwtPayload ;
3233 syncRequest : util . StreamingSyncRequest ;
3334 logger ?: Logger ;
34- initialBucketPositions ?: { name : string ; after : util . InternalOpId } [ ] ;
3535}
3636
3737type BucketSyncState = {
@@ -79,14 +79,14 @@ export class BucketChecksumState {
7979 options . syncContext ,
8080 options . bucketStorage ,
8181 options . syncRules ,
82- options . syncParams ,
82+ options . tokenPayload ,
8383 options . syncRequest ,
8484 this . logger
8585 ) ;
8686 this . bucketDataPositions = new Map ( ) ;
8787
88- for ( let { name, after : start } of options . initialBucketPositions ?? [ ] ) {
89- this . bucketDataPositions . set ( name , { start_op_id : start } ) ;
88+ for ( let { name, after : start } of options . syncRequest . buckets ?? [ ] ) {
89+ this . bucketDataPositions . set ( name , { start_op_id : BigInt ( start ) } ) ;
9090 }
9191 }
9292
@@ -199,7 +199,7 @@ export class BucketChecksumState {
199199 } ) ) ;
200200 bucketsToFetch = [ ...generateBucketsToFetch ] . map ( ( b ) => {
201201 return {
202- ... bucketDescriptionMap . get ( b ) ! ,
202+ priority : bucketDescriptionMap . get ( b ) ! . priority ,
203203 bucket : b
204204 } ;
205205 } ) ;
@@ -233,11 +233,11 @@ export class BucketChecksumState {
233233 message += `buckets: ${ allBuckets . length } ${ limitedBuckets ( allBuckets , 20 ) } ` ;
234234 this . logger . info ( message , { checkpoint : base . checkpoint , user_id : user_id , buckets : allBuckets . length } ) ;
235235 } ;
236- bucketsToFetch = allBuckets ;
236+ bucketsToFetch = allBuckets . map ( ( b ) => ( { bucket : b . bucket , priority : b . priority } ) ) ;
237237
238238 const subscriptions : util . StreamDescription [ ] = [ ] ;
239239 for ( const source of this . parameterState . syncRules . bucketSources ) {
240- if ( source . type == BucketSourceType . SYNC_STREAM && this . parameterState . isSubscribedToStream ( source ) ) {
240+ if ( this . parameterState . isSubscribedToStream ( source ) ) {
241241 subscriptions . push ( {
242242 name : source . name ,
243243 is_default : source . subscribedToByDefault
@@ -360,7 +360,11 @@ export class BucketParameterState {
360360 public readonly syncRules : SqlSyncRules ;
361361 public readonly syncParams : RequestParameters ;
362362 private readonly querier : BucketParameterQuerier ;
363- private readonly staticBuckets : Map < string , BucketDescription > ;
363+ /**
364+ * Static buckets. This map is guaranteed not to change during a request, since resolving static buckets can only
365+ * take request parameters into account,
366+ */
367+ private readonly staticBuckets : Map < string , ResolvedBucket > ;
364368 private readonly includeDefaultStreams : boolean ;
365369 // Indexed by the client-side id
366370 private readonly explicitStreamSubscriptions : Record < string , util . RequestedStreamSubscription > ;
@@ -375,22 +379,22 @@ export class BucketParameterState {
375379 context : SyncContext ,
376380 bucketStorage : BucketChecksumStateStorage ,
377381 syncRules : SqlSyncRules ,
378- syncParams : RequestParameters ,
382+ tokenPayload : RequestJwtPayload ,
379383 request : util . StreamingSyncRequest ,
380384 logger : Logger
381385 ) {
382386 this . context = context ;
383387 this . bucketStorage = bucketStorage ;
384388 this . syncRules = syncRules ;
385- this . syncParams = syncParams ;
389+ this . syncParams = new RequestParameters ( tokenPayload , request . parameters ?? { } ) ;
386390 this . logger = logger ;
387391
388392 const idToStreamSubscription : Record < string , util . RequestedStreamSubscription > = { } ;
389393 const streamsByName : Record < string , RequestedStream [ ] > = { } ;
390- const subscriptions = request . subscriptions ;
394+ const subscriptions = request . streams ;
391395 if ( subscriptions ) {
392- for ( const subscription of subscriptions . opened ) {
393- idToStreamSubscription [ subscription . stream ] = subscription ;
396+ for ( const subscription of subscriptions . subscriptions ) {
397+ idToStreamSubscription [ subscription . client_id ] = subscription ;
394398
395399 const syncRuleStream : RequestedStream = {
396400 parameters : subscription . parameters ?? { } ,
@@ -412,7 +416,7 @@ export class BucketParameterState {
412416 streams : streamsByName
413417 } ) ;
414418
415- this . staticBuckets = new Map < string , BucketDescription > (
419+ this . staticBuckets = new Map < string , ResolvedBucket > (
416420 mergeBuckets ( this . querier . staticBuckets ) . map ( ( b ) => [ b . bucket , b ] )
417421 ) ;
418422 this . lookups = new Set < string > ( this . querier . parameterQueryLookups . map ( ( l ) => JSONBig . stringify ( l . values ) ) ) ;
@@ -441,14 +445,13 @@ export class BucketParameterState {
441445 }
442446
443447 return {
444- definition : description . definition ,
445448 bucket : description . bucket ,
446449 priority : priorityOverride ?? description . priority ,
447450 subscriptions : description . inclusion_reasons . map ( ( reason ) => {
448451 if ( reason == 'default' ) {
449- return { def : description . definition } ;
452+ return { default : 0 } ; // TODO
450453 } else {
451- return { sub : reason . subscription } ;
454+ return reason . subscription ;
452455 }
453456 } )
454457 } ;
@@ -489,19 +492,19 @@ export class BucketParameterState {
489492 * For static buckets, we can keep track of which buckets have been updated.
490493 */
491494 private async getCheckpointUpdateStatic ( checkpoint : storage . StorageCheckpointUpdate ) : Promise < CheckpointUpdate > {
492- const querier = this . querier ;
495+ const staticBuckets = [ ... this . staticBuckets . values ( ) ] ;
493496 const update = checkpoint . update ;
494497
495498 if ( update . invalidateDataBuckets ) {
496499 return {
497- buckets : querier . staticBuckets ,
500+ buckets : staticBuckets ,
498501 updatedBuckets : INVALIDATE_ALL_BUCKETS
499502 } ;
500503 }
501504
502505 const updatedBuckets = new Set < string > ( getIntersection ( this . staticBuckets , update . updatedDataBuckets ) ) ;
503506 return {
504- buckets : querier . staticBuckets ,
507+ buckets : staticBuckets ,
505508 updatedBuckets
506509 } ;
507510 }
@@ -512,7 +515,7 @@ export class BucketParameterState {
512515 private async getCheckpointUpdateDynamic ( checkpoint : storage . StorageCheckpointUpdate ) : Promise < CheckpointUpdate > {
513516 const querier = this . querier ;
514517 const storage = this . bucketStorage ;
515- const staticBuckets = querier . staticBuckets ;
518+ const staticBuckets = this . staticBuckets . values ( ) ;
516519 const update = checkpoint . update ;
517520
518521 let hasParameterChange = false ;
@@ -556,7 +559,7 @@ export class BucketParameterState {
556559 }
557560 }
558561 }
559- const allBuckets = [ ...staticBuckets , ...dynamicBuckets ] ;
562+ const allBuckets = [ ...staticBuckets , ...mergeBuckets ( dynamicBuckets ) ] ;
560563
561564 if ( invalidateDataBuckets ) {
562565 return {
@@ -632,15 +635,15 @@ function limitedBuckets(buckets: string[] | { bucket: string }[], limit: number)
632635 * bucket.
633636 */
634637function mergeBuckets ( buckets : ResolvedBucket [ ] ) : ResolvedBucket [ ] {
635- const byDefinition : Record < string , ResolvedBucket > = { } ;
638+ const byBucketId : Record < string , ResolvedBucket > = { } ;
636639
637640 for ( const bucket of buckets ) {
638- if ( Object . hasOwn ( byDefinition , bucket . definition ) ) {
639- byDefinition [ bucket . definition ] . inclusion_reasons . push ( ...bucket . inclusion_reasons ) ;
641+ if ( Object . hasOwn ( byBucketId , bucket . bucket ) ) {
642+ byBucketId [ bucket . bucket ] . inclusion_reasons . push ( ...bucket . inclusion_reasons ) ;
640643 } else {
641- byDefinition [ bucket . definition ] = bucket ;
644+ byBucketId [ bucket . bucket ] = structuredClone ( bucket ) ;
642645 }
643646 }
644647
645- return Object . values ( byDefinition ) ;
648+ return Object . values ( byBucketId ) ;
646649}
0 commit comments