rafaeltuelho
Mission Specialist
Mission Specialist
  • 2,041 Views

AD482: Kafka Streams Comprehensive Lab

Hi folks!

I just finished the AD482 (Developing EDA with Kafka...) and I have one technical question still hanging. The last LAB (Comprehensive Review) on Chapter #7 has the following solution code for the last step #7.1:

 

// Aggregate enriched measurements
enrichedSensorMeasurements
    .groupBy(
        (sensorId, measurement) -> measurement.gardenName,
        Grouped.with(Serdes.String(), sensorMeasurementEnrichedSerde)
    )
    .windowedBy(
        TimeWindows.of(Duration.ofMinutes(1)).advanceBy(Duration.ofMinutes(1))
    )
    .aggregate(
        GardenStatus::new,
        (gardenName, measurement, gardenStatus) ->
            gardenStatus.updateWith(measurement),
        Materialized
            .<String, GardenStatus, WindowStore<Bytes, byte[]>>as(
                    "garden-status-store"
                )
                .withKeySerde(Serdes.String())
                .withValueSerde(gardenStatusSerde))
//... code continues

 

 

My point is related to the use of the Materialized Configuration to create a WindowStore state store for the aggregation result. I just wanna understand why is this step necessary since in CH03S05 the lecture text states that:

"The third argument specifies the name of the table store and the serdes to materialize the table to the store. This argument is only required if you intend to use the interactive queries feature to query the table. If you do not specify a store name, then Kafka Streams assigns an internal, non-queryable store to the table. .."

I tried to remove the Materialized parameter from the aggregate method call just to see the behavior.

 

        // Aggregate enriched measurements
        enrichedSensorMeasurements
        .groupBy(
            (sensorId, measurement) -> measurement.gardenName,
            Grouped.with(Serdes.String(), sensorMeasurementEnrichedSerde)
        )
        .windowedBy(
            TimeWindows.of(Duration.ofMinutes(1)).advanceBy(Duration.ofMinutes(1))
        )
        .aggregate(
            GardenStatus::new, 
            (sensorName, measurement, gardenStatus) -> {
                gardenStatus.updateWith(measurement);
                return gardenStatus;
            }
	    //, // why do I need this if I do not intend to read the store?
            //Materialized.<String, GardenStatus, WindowStore<Bytes, byte[]>>as("garden-status-store")
            //    .withKeySerde(Serdes.String())
            //    .withValueSerde(gardenStatusSerde)
        )
        .toStream()
        .map((windowedGardenName, gardenStatus) -> KeyValue.<Void, GardenStatus>pair(null, gardenStatus))
        .to(GARDEN_STATUS_EVENTS_TOPIC, Produced.with(Serdes.Void(), gardenStatusSerde));
//...

 

 

The app compiles and runs but I get the following error when the stream processing starts:

2022-04-12 20:08:35,148 ERROR [org.apa.kaf.str.KafkaStreams] (garden-streams-comprehensive-review-13909e70-8ea9-4c77-925f-b0d22079cff6-StreamThread-1) stream-client [garden-streams-comprehensive-review-13909e70-8ea9-4c77-925f-b0d22079cff6] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. : 
org.apache.kafka.streams.errors.StreamsException:
A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual value type (value type: com.redhat.training.gardens.model.GardenStatus). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:204)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$3(MeteredWindowStore.java:173)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:172)
at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:168)
at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:147)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: java.lang.ClassCastException: class com.redhat.training.gardens.model.GardenStatus cannot be cast to class [B (com.redhat.training.gardens.model.GardenStatus is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @36deaa6a; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:82)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:73)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:30)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
... 20 more

I would appreciate it if someone could shed a light on this scenario.

Thanks in advance.

Labels (2)
0 Kudos
3 Replies
AykutBulgu
Mission Specialist
Mission Specialist
  • 1,935 Views

Hi @rafaeltuelho,

We examined your issue and decided to reword the part that explains `This argument is only required if you intend to use the interactive queries feature to query the table.` which might be a bit misleading.

In your case commenting out that particular part means it will use ByteArraySerializer, which is the default, rather than StringSerializer which is set in the commented out part. 

For the code itself, there is no need for a change as there is nothing wrong with that part. 

Thanks for raising this, we will work on it soon and reword the misleading part. 

Best,

Aykut

0 Kudos
rafaeltuelho
Mission Specialist
Mission Specialist
  • 1,920 Views

Thanks for commenting on this @AykutBulgu !

No problem. I just wanted to better understand the code. 

I got that it serializes the result using a custom SerDes, but my question was more of "is this the only way to accomplish that?". Since "garden-status-store" is never used later in the code.

0 Kudos
AykutBulgu
Mission Specialist
Mission Specialist
  • 1,918 Views

For the SerDe part its not the only way. You can override the Kafka Streams default Serde (that is too a ByteArray one) to a StringSerDe and remove the following parts. That should work.

 .withKeySerde(Serdes.String())
 .withValueSerde(gardenStatusSerde)

If you are using interactive queries better to have custom SerDe because it might change per query. 

0 Kudos
Join the discussion
You must log in to join this conversation.