tyit
2023-06-20 18:39:55 +08:00
CREATE TABLE ngx_acc_log_entry (
"@timestamp" DateTime,
dissect String,
uri String,
http_code String,
remote_ip String
) ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka:9092',
kafka_topic_list = 'your_topic',
kafka_group_name = 'your_group',
kafka_format = 'JSONEachRow',
kafka_row_format = 'FlatJson';
INSERT INTO ngx_acc_log_entry
SELECT
"@timestamp",
dissect,
JSONExtractString(dissect, 'uri') AS uri,
JSONExtractString(dissect, 'http_code') AS http_code,
JSONExtractString(dissect, 'remote_ip') AS remote_ip
FROM kafka;