Handle cluster token overlap issue more gracefully and provide clear failure reasons during migration #386

@msmygit

Description

When a Cassandra cluster is built with overlapping tokens (most often the result of an incorrect build-up, e.g. starting multiple nodes at the same time), the CDM Migrate job fails with errors such as the one below,

Exception in thread "main" java.util.NoSuchElementException: No value present
      at java.base/java.util.Optional.get(Optional.java:148)
      at com.datastax.cdm.schema.CqlTable.setCqlMetadata(CqlTable.java:435)
      at com.datastax.cdm.schema.CqlTable.<init>(CqlTable.java:100)
      at com.datastax.cdm.cql.EnhancedSession.<init>(EnhancedSession.java:55)
      at com.datastax.cdm.job.AbstractJobSession.<init>(AbstractJobSession.java:71)
      at com.datastax.cdm.job.AbstractJobSession.<init>(AbstractJobSession.java:47)
      at com.datastax.cdm.job.CopyJobSession.<init>(CopyJobSession.java:53)
      at com.datastax.cdm.job.CopyJobSessionFactory.getInstance(CopyJobSessionFactory.java:32)
      at com.datastax.cdm.job.Migrate$.$anonfun$execute$2(Migrate.scala:34)
      at com.datastax.cdm.job.Migrate$.$anonfun$execute$2$adapted(Migrate.scala:33)
      at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
      at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
      at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
      at com.datastax.cdm.job.Migrate$.$anonfun$execute$1(Migrate.scala:33)
      at com.datastax.cdm.job.Migrate$.$anonfun$execute$1$adapted(Migrate.scala:32)
      at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
      at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
      at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
      at com.datastax.cdm.job.Migrate$.execute(Migrate.scala:32)
      at com.datastax.cdm.job.Migrate$.delayedEndpoint$com$datastax$cdm$job$Migrate$1(Migrate.scala:27)
      at com.datastax.cdm.job.Migrate$delayedInit$body.apply(Migrate.scala:24)
      at scala.Function0.apply$mcV$sp(Function0.scala:39)
      at scala.Function0.apply$mcV$sp$(Function0.scala:39)
      at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
      at scala.App.$anonfun$main$1(App.scala:76)
      at scala.App.$anonfun$main$1$adapted(App.scala:76)
      at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
      at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:926)
      at scala.App.main(App.scala:76)
      at scala.App.main$(App.scala:74)
      at com.datastax.cdm.job.BaseJob.main(BaseJob.scala:34)
      at com.datastax.cdm.job.Migrate.main(Migrate.scala)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
      at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:1029)
      at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
      at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
      at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
      at org.apache.spark.deploy.SparkSubmit$anon$2.doSubmit(SparkSubmit.scala:1120)
      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  25/08/07 13:46:55 INFO SparkContext: Invoking stop() from shutdown hook
  25/08/07 13:46:55 INFO SparkContext: SparkContext is stopping with exitCode 0.

and / or with a stack similar to the example below (when min and max token filters are provided),

25/09/07 13:18:27 INFO SplitPartitions: ThreadID: 1 Splitting min: -3955596820872449124 max: -3074457345618258604
   25/09/07 13:18:31 INFO Migrate$: PARAM Calculated -- Total Partitions: 1000000
   25/09/07 13:18:31 INFO Migrate$: Spark parallelize created : 1000000 slices!
   25/09/07 13:18:31 INFO CopyJobSession: PARAM -- Origin Rate Limit: 50000.0
   25/09/07 13:18:31 INFO CopyJobSession: PARAM -- Target Rate Limit: 50000.0
   25/09/07 13:18:31 WARN CachingCodecRegistry: [s0] Ignoring codec com.datastax.cdm.cql.codec.BigInteger_BIGINTCodec@442310fd because it collides with built-in primitive codec com.datastax.oss.driver.internal.core.type.codec.IntCodec@23263ba
   25/09/07 13:18:31 WARN CachingCodecRegistry: [s1] Ignoring codec com.datastax.cdm.cql.codec.BigInteger_BIGINTCodec@7836c79 because it collides with built-in primitive codec com.datastax.oss.driver.internal.core.type.codec.IntCodec@23263ba
   Exception in thread "main" java.util.NoSuchElementException: No value present
         at java.base/java.util.Optional.get(Optional.java:148)
         at com.datastax.cdm.schema.CqlTable.setCqlMetadata(CqlTable.java:435)
         at com.datastax.cdm.schema.CqlTable.<init>(CqlTable.java:100)
         at com.datastax.cdm.cql.EnhancedSession.<init>(EnhancedSession.java:55)
         at com.datastax.cdm.job.AbstractJobSession.<init>(AbstractJobSession.java:71)
         at com.datastax.cdm.job.AbstractJobSession.<init>(AbstractJobSession.java:47)
         at com.datastax.cdm.job.CopyJobSession.<init>(CopyJobSession.java:53)
         at com.datastax.cdm.job.CopyJobSessionFactory.getInstance(CopyJobSessionFactory.java:32)
         at com.datastax.cdm.job.Migrate$.$anonfun$execute$2(Migrate.scala:34)
         at com.datastax.cdm.job.Migrate$.$anonfun$execute$2$adapted(Migrate.scala:33)
         at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
         at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
         at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
         at com.datastax.cdm.job.Migrate$.$anonfun$execute$1(Migrate.scala:33)
         at com.datastax.cdm.job.Migrate$.$anonfun$execute$1$adapted(Migrate.scala:32)
         at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
         at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
         at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
         at com.datastax.cdm.job.Migrate$.execute(Migrate.scala:32)
         at com.datastax.cdm.job.Migrate$.delayedEndpoint$com$datastax$cdm$job$Migrate$1(Migrate.scala:27)
         at com.datastax.cdm.job.Migrate$delayedInit$body.apply(Migrate.scala:24)
         at scala.Function0.apply$mcV$sp(Function0.scala:39)
         at scala.Function0.apply$mcV$sp$(Function0.scala:39)
         at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
         at scala.App.$anonfun$main$1(App.scala:76)
         at scala.App.$anonfun$main$1$adapted(App.scala:76)
         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
         at scala.collection.AbstractIterable.foreach(Iterable.scala:926)
         at scala.App.main(App.scala:76)
         at scala.App.main$(App.scala:74)
         at com.datastax.cdm.job.BaseJob.main(BaseJob.scala:34)
         at com.datastax.cdm.job.Migrate.main(Migrate.scala)
         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
         at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
         at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:1034)
         at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:199)
         at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:222)
         at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
         at org.apache.spark.deploy.SparkSubmit$anon$2.doSubmit(SparkSubmit.scala:1125)
         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1134)
         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   25/09/07 13:18:31 INFO SparkContext: Invoking stop() from shutdown hook
   25/09/07 13:18:31 INFO SparkContext: SparkContext is stopping with exitCode 0.
   25/09/07 13:18:31 INFO SparkUI: Stopped Spark web UI at http://10.186.86.30:4040
   25/09/07 13:18:31 INFO CassandraConnector: Disconnected from Cassandra cluster.
   25/09/07 13:18:31 INFO StandaloneSchedulerBackend: Shutting down all executors
   25/09/07 13:18:31 INFO StandaloneSchedulerBackend$StandaloneDriverEndpoint: Asking each executor to shut down
   25/09/07 13:18:31 INFO CassandraConnector: Disconnected from Cassandra cluster.
   25/09/07 13:18:31 INFO SerialShutdownHooks: Successfully executed shutdown hook: Clearing session cache for C* connector
   25/09/07 13:18:31 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
   25/09/07 13:18:31 INFO MemoryStore: MemoryStore cleared
   25/09/07 13:18:31 INFO BlockManager: BlockManager stopped
   25/09/07 13:18:32 INFO BlockManagerMaster: BlockManagerMaster stopped
   25/09/07 13:18:32 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
   25/09/07 13:18:32 INFO SparkContext: Successfully stopped SparkContext
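The request here is to fail fast with an actionable message instead of surfacing a bare `NoSuchElementException` from `Optional.get()` deep inside `CqlTable.setCqlMetadata`. Below is a minimal, self-contained sketch of the idea: before computing partition ranges, check the ring for tokens claimed by more than one node and report exactly which tokens and nodes collide. The node names, token values, and the `findOverlaps` helper are all illustrative assumptions, not CDM or driver API; a real implementation would read the token map from the driver's cluster metadata instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TokenOverlapCheck {

    // Maps each token to the list of nodes claiming it, keeping only
    // tokens owned by more than one node (i.e. ring overlaps).
    static Map<Long, List<String>> findOverlaps(Map<String, List<Long>> nodeTokens) {
        Map<Long, List<String>> owners = new TreeMap<>();
        for (Map.Entry<String, List<Long>> e : nodeTokens.entrySet()) {
            for (Long token : e.getValue()) {
                owners.computeIfAbsent(token, t -> new ArrayList<>()).add(e.getKey());
            }
        }
        owners.values().removeIf(nodes -> nodes.size() < 2);
        return owners;
    }

    public static void main(String[] args) {
        // Hypothetical ring: node1 and node2 both claim token 10.
        Map<String, List<Long>> ring = new LinkedHashMap<>();
        ring.put("node1", Arrays.asList(-9000000000000000000L, 10L));
        ring.put("node2", Arrays.asList(10L, 5000L));

        Map<Long, List<String>> overlaps = findOverlaps(ring);
        if (!overlaps.isEmpty()) {
            // A clear, actionable failure reason instead of an opaque
            // NoSuchElementException from Optional.get().
            System.out.println("Token overlap detected; cannot compute "
                    + "partition ranges for migration: " + overlaps);
        }
    }
}
```

Running this prints the colliding token together with the nodes that own it, which is the kind of diagnostic the stack traces above lack.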

Metadata

Assignees

No one assigned

    Labels

    enhancement (New feature or request)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
