atoti_kafka package
Module contents
Plugin to load real-time Kafka streams into atoti tables.
This package is required to use atoti.table.Table.load_kafka().
- atoti_kafka.JSON_DESERIALIZER = KafkaDeserializer(name='io.atoti.loading.kafka.impl.serialization.JsonDeserializer')
Core JSON deserializer.
Each JSON object corresponds to a row of the table; the keys of the JSON object must match the columns of the table.
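As a minimal sketch of how this deserializer is typically used, assuming an existing table and a reachable Kafka broker (the broker address, topic name, and consumer group id below are placeholder assumptions, not values defined by this package):

```python
import atoti_kafka

# Placeholder broker, topic, and consumer group; each consumed JSON record
# such as {"c_1": "a", "c_2": 1.0, "c_3": 2.0} becomes one row of the table.
table.load_kafka(
    "localhost:9092",
    "sales_topic",
    group_id="atoti-consumers",
    deserializer=atoti_kafka.JSON_DESERIALIZER,
)
```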
- atoti_kafka.create_deserializer(callback, *, batch_size=1)
Return a custom Kafka deserializer.
- Parameters
callback (Callable[[str], Mapping[str, Any]]) – Function taking the record as a string and returning a mapping from table column names to their values.
batch_size (int) – Size of the batch of records. If batch_size > 1, the records will be batched and then deserialized when either the batch size is reached or the batch_duration passed to atoti.table.Table.load_kafka() has elapsed. A bigger batch size means higher latency but fewer table transactions, and therefore often better performance.
Example
Considering a table with columns c_1, c_2 and c_3:
For Kafka records shaped like {"c_1": v_1, "c_2": v_2, "c_3": v_3}, this callback would work:

```python
import json

columns = table.columns

def callback(record: str):
    # Parse the JSON record and map each table column to its value.
    values = json.loads(record)
    return {column: values[column] for column in columns}
```

For records shaped like v_1, v_2, v_3, this callback would work:

```python
columns = table.columns

def callback(record: str):
    # Split the comma-separated record and pair values with columns by position.
    values = record.split(", ")
    return {column: values[index] for index, column in enumerate(columns)}
```
- Return type
KafkaDeserializer
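As a sketch of tying this together, the returned deserializer can be passed to atoti.table.Table.load_kafka(); here batch_duration is the load_kafka() parameter mentioned above, while the broker address, topic name, and consumer group id are placeholder assumptions:

```python
from datetime import timedelta

import atoti_kafka

# Reuse one of the callbacks from the example above; records are batched
# until 100 are collected or batch_duration elapses, then deserialized
# and loaded in a single table transaction.
deserializer = atoti_kafka.create_deserializer(callback, batch_size=100)

table.load_kafka(
    "localhost:9092",
    "sales_topic",
    group_id="atoti-consumers",
    batch_duration=timedelta(seconds=1),
    deserializer=deserializer,
)
```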