KAFKA-20194: Ensure backward compatibility for Session Stores#21934
KAFKA-20194: Ensure backward compatibility for Session Stores#21934mjsax wants to merge 3 commits intoapache:trunkfrom
Conversation
By default, the DSL should expose it's state-stores as ts-stores, as long as header format is not enabled; otherwise, it would be a backward incompatible change. This PR ensures that the builders are creating the correct state stores, depending on the format, and we insert an adaptor to allow the DSL Processors to work only against headers-store interface.
|
|
||
| public KeyValueStoreMaterializer( | ||
| final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized | ||
| final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized |
There was a problem hiding this comment.
I think indent 8 for breaking line is OK too?
There was a problem hiding this comment.
you have this change in many other files as well. So maybe your changes are intentional!
There was a problem hiding this comment.
Yes, I don't see why we would use indent 8? It very weird IMHO. I believe it's some IntelliJ default setting, which I changes on my end.
| && ((MaterializedStoreFactory<?, ?, ?>) storeFactory).materialized.equals(materialized); | ||
| } | ||
|
|
||
| } |
There was a problem hiding this comment.
What are the changes of this class? just indent change?
|
|
||
| public RocksDbSessionBytesStoreSupplier(final String name, | ||
| final long retentionPeriod, | ||
| final boolean withHeaders) { |
There was a problem hiding this comment.
The same idea that I had in the other prs: #21904 (comment)
No need to answer if we have convinced eachother in other PRs. Thanks
|
|
||
| @Override | ||
| public KeyValueIterator<Windowed<K>, AggregationWithHeaders<V>> fetch(final K key) { | ||
| return null; |
| return KeyValue.pair(next.key, AggregationWithHeaders.make(next.value, new RecordHeaders())); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Do we need to implement managesOffsets and committedOffset too?
There was a problem hiding this comment.
I don't think we -- we implement WrappedStateStore which take care of forwarding the call to the inner store.
There was a problem hiding this comment.
Looking into the code, WrappedStateStore does not implement flush() atm -- flush was just updated to commit(final Map<TopicPartition, Long> changelogOffsets) ... not sure if this is actually correct... Will do a follow up PR to look into it.
\cc @bbejeck (this issue was introduced by a PR from @nicktelford)
| } catch (final ClassCastException swallow) { | ||
| // not plain session store | ||
|
|
||
| // Try headers-aware sessopm store |
There was a problem hiding this comment.
| // Try headers-aware sessopm store | |
| // Try headers-aware session store |
| } catch (final ClassCastException swallow) { | ||
| // not plain session store | ||
|
|
||
| // Try headers-aware sessopm store |
There was a problem hiding this comment.
| // Try headers-aware sessopm store | |
| // Try headers-aware session store |
| @@ -0,0 +1,162 @@ | |||
| /* | |||
There was a problem hiding this comment.
Should we add JavaDoc to this class (and also other adapter classes in the prev 2 PRs) explaining when/why this adapter is used?
There was a problem hiding this comment.
For the first two PRs, it's already there... Will add here, too.
| )) | ||
| : (WindowBytesStoreSupplier) materialized.storeSupplier(); | ||
|
|
||
| final StoreBuilder<TimestampedWindowStoreWithHeaders<K, V>> builder = Stores |
There was a problem hiding this comment.
The main change should happen here?! It's missed!
There was a problem hiding this comment.
This PR is for session-store only -- we need one more PR for sliding window case.
| @SuppressWarnings("resource") | ||
| private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserializer<K> keyDeserializer, | ||
| final Deserializer<V> valueDeserializer, | ||
| final Class<?> innerClass, |
There was a problem hiding this comment.
Should we have integration tests for sliding windows here as well?
There was a problem hiding this comment.
Like other two PRs:
processorShouldAccessKStreamSlidingWindowReducedKTableStoreAsTimestampedStore()
processorShouldAccessKStreamSlidingWindowAggregatedKTableStoreAsTimestampedStore()
There was a problem hiding this comment.
There will be one more PR sliding windows. I'll keep this comment in mind to see what's missing and add on the new PR.
By default, the DSL should expose it's state-stores as ts-stores, as
long as header format is not enabled; otherwise, it would be a backward
incompatible change.
This PR ensures that the builders are creating the correct state stores,
depending on the format, and we insert an adaptor to allow the DSL
Processors to work only against headers-store interface.