Support Article
Cassandra cluster fails when processing large data flows
SA-54309
Summary
A user runs a set of large data flows that pull customer data from Hadoop into Cassandra for use during campaigns.
The volumes are around 14 million customers and 45 million associated accounts. Intermittently, a data flow fails with the error message shown below.
The Cassandra logs generally show at least one node performing a large amount of garbage collection.
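One quick way to confirm the garbage-collection pressure is to look for GCInspector entries in the Cassandra system log, which report each long collection and its pause time. This is a sketch; the log location /var/lib/cassandra/logs/system.log is a common default and may differ in your environment:

```shell
# List GC pauses reported by Cassandra's GCInspector.
# The log path below is an assumed default; adjust for your installation.
LOG="${CASSANDRA_LOG:-/var/lib/cassandra/logs/system.log}"
grep "GCInspector" "$LOG" || echo "no GCInspector entries (or log not found at $LOG)"
```

Repeating this on each node helps identify which node is pausing when a data flow run fails.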
The infrastructure setup is:
6 Cassandra nodes, organised into 2 data centers.
30 data flow nodes, restricted to 1 thread per node.
Error Messages
Example exception from a failed data flow. It shows that the data flow cannot proceed because it cannot communicate with the Cassandra nodes:
com.pega.dsm.dnode.api.ExceptionWithInputRecord: Could not save record of class [<Your Class>-Data-Customer] to a Data Set with applies to class [<Your Class>-Data-Customer] at
com.pega.dsm.dnode.impl.dataset.cassandra.CassandraSaveWithTTLOperation$4.emit(CassandraSaveWithTTLOperation.java:230) at
com.pega.dsm.dnode.impl.dataset.cassandra.CassandraDataEmitter.emit(CassandraDataEmitter.java:57) at
com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:326) at
com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:53) at
com.pega.dsm.dnode.impl.stream.DataObservableImpl.await(DataObservableImpl.java:99) at
com.pega.dsm.dnode.impl.stream.DataObservableImpl.await(DataObservableImpl.java:88) at
com.pega.dsm.dnode.impl.dataflow.SaveStageProcessor.onNext(SaveStageProcessor.java:107) at
com.pega.dsm.dnode.api.dataflow.DataFlowStageBatchProcessor.commitBatchInternal(DataFlowStageBatchProcessor.java:116) at
com.pega.dsm.dnode.api.dataflow.DataFlowStageBatchProcessor.commitBatch(DataFlowStageBatchProcessor.java:104) at
com.pega.dsm.dnode.api.dataflow.DataFlowStageBatchProcessor.onPulse(DataFlowStageBatchProcessor.java:84) at
com.pega.dsm.dnode.api.dataflow.DataFlowStage$StageInputSubscriber.onPulse(DataFlowStage.java:433) at
com.pega.dsm.dnode.api.dataflow.DataFlowExecutor$QueueBasedDataFlowExecutor.runEventLoop(DataFlowExecutor.java:188) at
com.pega.dsm.dnode.api.dataflow.DataFlow$5.emit(DataFlow.java:366) at
com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:57) at
com.pega.dsm.dnode.impl.dataflow.service.LocalRun.execute(LocalRun.java:268) at
com.pega.dsm.dnode.impl.dataflow.service.LocalRun.lockAndRun(LocalRun.java:178) at
com.pega.dsm.dnode.impl.dataflow.service.LocalRun.run(LocalRun.java:84) at
com.pega.dsm.dnode.impl.dataflow.service.PickUpRun.run(PickUpRun.java:67) at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at
java.util.concurrent.FutureTask.run(FutureTask.java:262) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at
com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:44) at
com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:41) at
com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:52) at
com.pega.dsm.dnode.impl.prpc.PrpcThreadFactory$PrpcThread.run(PrpcThreadFactory.java:84)
Caused by: java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: <your host>.<your domain>/<your ip>:9042 (com.datastax.driver.core.OperationTimedOutException: [<your host>.<your domain>/<your ip>:9042] Operation timed out)) at
com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) at
com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:272) at
com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:96) at
com.pega.dsm.dnode.impl.dataset.cassandra.CassandraSaveWithTTLOperation$SaveFuture.get(CassandraSaveWithTTLOperation.java:367) at
com.pega.dsm.dnode.impl.dataset.cassandra.CassandraSaveWithTTLOperation$4.emit(CassandraSaveWithTTLOperation.java:218) ... 25 more
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: <your host>.<your domain>/<your ip>:9042 (com.datastax.driver.core.OperationTimedOutException: [<your host>.<your domain>/<your ip>:9042] Operation timed out)) at
com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:217) at
com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:44) at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.sendRequest(RequestHandler.java:276) at
com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.run(RequestHandler.java:374) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at
java.lang.Thread.run(Thread.java:745)
Steps to Reproduce
Run a data flow which writes a large amount of data into Cassandra from a large number of data flow nodes.
Root Cause
A defect or configuration issue in the operating environment.
Investigating the Cassandra logs and Cassandra GC logs showed long JVM pauses.
Tests after reducing the number of data flow nodes to 15 always succeed and are stable, which shows that the Cassandra nodes are stressed to their limit.
NMON data showed a large amount of paging occurring; paging should be disabled on DDS nodes.
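The paging observation can be reproduced with standard Linux tools rather than NMON. This is a sketch using /proc/meminfo and vmstat, neither of which is specific to this environment:

```shell
# Is swap configured on this DDS node? A non-zero SwapTotal means paging is possible.
grep -E 'SwapTotal|SwapFree' /proc/meminfo
# Live paging activity: the si (swap-in) and so (swap-out) columns should stay at 0
# on a healthy DDS node. Guarded in case vmstat is not installed.
command -v vmstat >/dev/null && vmstat 1 3 || true
```

Sustained non-zero si/so values during a data flow run indicate the node is paging, which lines up with the long JVM pauses seen in the GC logs.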
Resolution
Make the following change to the operating environment:
Disable paging on DDS nodes to help improve performance and stability.
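On Linux, disabling paging on the DDS nodes amounts to turning swap off and keeping it off across reboots. The following is a sketch of the standard operating-system steps (it requires root and is not Pega-specific); confirm with your system administrators before applying it:

```shell
# Turn swap off immediately on this node (requires root).
sudo swapoff -a
# Keep swap off across reboots by commenting out uncommented swap entries
# in /etc/fstab (a backup is written to /etc/fstab.bak).
sudo sed -i.bak '/^[^#].*\bswap\b/s/^/#/' /etc/fstab
```

Apply the change to every DDS (Cassandra) node in both data centers, then re-run the failing data flow to confirm stability.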
Published July 23, 2018 - Updated October 8, 2020