atoti_kafka package¶
Module contents¶
Plugin to load real-time Kafka streams into atoti stores.
This package is required to use atoti.store.Store.load_kafka().
atoti_kafka.JSON_DESERIALIZER = KafkaDeserializer(name='io.atoti.loading.kafka.impl.serialization.JsonDeserializer')¶

    Core JSON deserializer.

    Each JSON object corresponds to a row of the store; the keys of the JSON object must match the columns of the store.
atoti_kafka.create_deserializer(callback, *, batch_size=1)¶

    Return a custom Kafka deserializer.

    Parameters:
        - callback (Callable[[str], Mapping[str, Any]]) – Function taking the record as a string and returning a mapping from a store column name to its value.
        - batch_size (int) – Size of the batch of records. If batch_size > 1, the records will be batched and then deserialized when either the batch size is reached or the batch_duration passed in atoti.store.Store.load_kafka() has elapsed. A bigger batch size means higher latency but fewer store transactions, so often better performance.
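The batching trade-off described above can be illustrated with a small, self-contained sketch. This is a toy model of record batching in general, not atoti's actual implementation; the `batch_records` helper is an assumption made for illustration, and flushing on `batch_duration` is simplified to flushing at the end of the stream:

```python
from typing import Iterable, Iterator, List

def batch_records(records: Iterable[str], batch_size: int) -> Iterator[List[str]]:
    """Toy illustration: group records into batches of at most batch_size.

    Each yielded batch would be deserialized and loaded in a single store
    transaction; a partial batch is flushed when batch_duration elapses
    (simplified here to the end of the stream).
    """
    batch: List[str] = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

# With batch_size=2, five records become three transactions instead of five.
print(list(batch_records(["r1", "r2", "r3", "r4", "r5"], 2)))
# [['r1', 'r2'], ['r3', 'r4'], ['r5']]
```

Fewer, larger batches mean fewer store transactions, at the cost of each record waiting longer before it becomes visible in the store.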
Example

    Considering a store with columns c_1, c_2 and c_3:

    For Kafka records shaped like {"c_1": v_1, "c_2": v_2, "c_3": v_3}, this callback would work:

        columns = store.columns

        def callback(record: str):
            values = json.loads(record)
            return {column: values[column] for column in columns}

    For Kafka records shaped like v_1, v_2, v_3, this callback would work:

        columns = store.columns

        def callback(record: str):
            values = record.split(", ")
            return {column: values[index] for index, column in enumerate(columns)}
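As a quick sanity check, both callback shapes above can be exercised on sample record strings without a Kafka broker or a store. Here `columns` is hard-coded as a stand-in for `store.columns`, which is an assumption made so the snippet runs on its own:

```python
import json

columns = ["c_1", "c_2", "c_3"]  # stand-in for store.columns

def json_callback(record: str):
    # Parse a JSON record; its keys must match the store columns.
    values = json.loads(record)
    return {column: values[column] for column in columns}

def csv_callback(record: str):
    # Split a comma-separated record; values map to columns by position.
    values = record.split(", ")
    return {column: values[index] for index, column in enumerate(columns)}

print(json_callback('{"c_1": 1, "c_2": 2, "c_3": 3}'))
# {'c_1': 1, 'c_2': 2, 'c_3': 3}
print(csv_callback("a, b, c"))
# {'c_1': 'a', 'c_2': 'b', 'c_3': 'c'}
```

Either callback can then be passed to create_deserializer and the result used as the deserializer for atoti.store.Store.load_kafka().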
    Return type:
        KafkaDeserializer