Skip to content

KAFKA-20194: Ensure backward compatibility for Session Stores#21934

Open
mjsax wants to merge 3 commits intoapache:trunkfrom
mjsax:kafka-20194-ensure-backward-compatiblity-session
Open

KAFKA-20194: Ensure backward compatibility for Session Stores#21934
mjsax wants to merge 3 commits intoapache:trunkfrom
mjsax:kafka-20194-ensure-backward-compatiblity-session

Conversation

@mjsax
Copy link
Copy Markdown
Member

@mjsax mjsax commented Apr 2, 2026

By default, the DSL should expose it's state-stores as ts-stores, as
long as header format is not enabled; otherwise, it would be a backward
incompatible change.

This PR ensures that the builders are creating the correct state stores,
depending on the format, and we insert an adaptor to allow the DSL
Processors to work only against headers-store interface.

By default, the DSL should expose it's state-stores as ts-stores, as
long as header format is not enabled; otherwise, it would be a backward
incompatible change.

This PR ensures that the builders are creating the correct state stores,
depending on the format, and we insert an adaptor to allow the DSL
Processors to work only against headers-store interface.
@mjsax mjsax added streams kip Requires or implements a KIP labels Apr 2, 2026

public KeyValueStoreMaterializer(
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think indent 8 for breaking line is OK too?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you have this change in many other files as well. So maybe your changes are intentional!

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I don't see why we would use indent 8? It very weird IMHO. I believe it's some IntelliJ default setting, which I changes on my end.

&& ((MaterializedStoreFactory<?, ?, ?>) storeFactory).materialized.equals(materialized);
}

}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the changes of this class? just indent change?


public RocksDbSessionBytesStoreSupplier(final String name,
final long retentionPeriod,
final boolean withHeaders) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same idea that I had in the other prs: #21904 (comment)
No need to answer if we have convinced eachother in other PRs. Thanks


@Override
public KeyValueIterator<Windowed<K>, AggregationWithHeaders<V>> fetch(final K key) {
return null;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ups.

return KeyValue.pair(next.key, AggregationWithHeaders.make(next.value, new RecordHeaders()));
}
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to implement managesOffsets and committedOffset too?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we -- we implement WrappedStateStore which take care of forwarding the call to the inner store.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking into the code, WrappedStateStore does not implement flush() atm -- flush was just updated to commit(final Map<TopicPartition, Long> changelogOffsets) ... not sure if this is actually correct... Will do a follow up PR to look into it.

\cc @bbejeck (this issue was introduced by a PR from @nicktelford)

} catch (final ClassCastException swallow) {
// not plain session store

// Try headers-aware sessopm store
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Try headers-aware sessopm store
// Try headers-aware session store

} catch (final ClassCastException swallow) {
// not plain session store

// Try headers-aware sessopm store
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Try headers-aware sessopm store
// Try headers-aware session store

@@ -0,0 +1,162 @@
/*
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add JavaDoc to this class (and also other adapter classes in the prev 2 PRs) explaining when/why this adapter is used?

Copy link
Copy Markdown
Member Author

@mjsax mjsax Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the first two PRs, it's already there... Will add here, too.

))
: (WindowBytesStoreSupplier) materialized.storeSupplier();

final StoreBuilder<TimestampedWindowStoreWithHeaders<K, V>> builder = Stores
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main change should happen here?! It's missed!

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is for session-store only -- we need one more PR for sliding window case.

@SuppressWarnings("resource")
private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Class<?> innerClass,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have integration tests for sliding windows here as well?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like other two PRs:

processorShouldAccessKStreamSlidingWindowReducedKTableStoreAsTimestampedStore()
processorShouldAccessKStreamSlidingWindowAggregatedKTableStoreAsTimestampedStore()

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be one more PR sliding windows. I'll keep this comment in mind to see what's missing and add on the new PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants