tests: add a basic test that uses Iceberg and Redpanda Connect
Signed-off-by: Michał Maślanka <[email protected]>
mmaslankaprv committed Nov 27, 2024
1 parent e524a0d commit 8da1843
1 changed file with 180 additions and 0 deletions: tests/rptest/tests/datalake/simple_connect_test.py
@@ -0,0 +1,180 @@
import json
import os
import tempfile

from ducktape.mark import matrix

from rptest.clients.rpk import RpkTool
from rptest.services.cluster import cluster
from rptest.services.redpanda import SISettings, SchemaRegistryConfig
from rptest.services.redpanda_connect import RedpandaConnectService
from rptest.tests.datalake.datalake_services import DatalakeServices
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.tests.datalake.utils import supported_storage_types
from rptest.tests.redpanda_test import RedpandaTest


class RedpandaConnectIcebergTest(RedpandaTest):
    """
    Verifies that Avro-serialized records produced by Redpanda Connect
    are translated, exactly once, into the Iceberg table backing an
    iceberg-enabled topic.
    """

    verifier_schema_avro = """
{
    "type": "record",
    "name": "VerifierRecord",
    "fields": [
        {
            "name": "verifier_string",
            "type": "string"
        },
        {
            "name": "ordinal",
            "type": "long"
        },
        {
            "name": "timestamp",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        }
    ]
}
"""

    def __init__(self, test_context):
        self._topic = None
        super(RedpandaConnectIcebergTest, self).__init__(
            test_context=test_context,
            num_brokers=3,
            si_settings=SISettings(test_context,
                                   cloud_storage_enable_remote_read=False,
                                   cloud_storage_enable_remote_write=False),
            extra_rp_conf={
                "iceberg_enabled": True,
                "iceberg_catalog_commit_interval_ms": 10000
            },
            schema_registry_config=SchemaRegistryConfig())
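    # The settings above enable Iceberg translation cluster-wide and set the
    # catalog commit interval to 10s (presumably well below the production
    # default) so newly translated data becomes visible quickly in the test.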

    def avro_stream_config(self, topic, subject, out_path):
        return {
            "input": {
                "generate": {
                    "mapping": "root = counter()",
                    "interval": "",
                    "count": 1000,
                    "batch_size": 1
                }
            },
            "pipeline": {
                "processors": [
                    {
                        "mapping": """
                            root.ordinal = this
                            root.timestamp = timestamp_unix_milli()
                            root.verifier_string = uuid_v4()
                        """
                    },
                ]
            },
            "output": {
                "broker": {
                    "pattern": "fan_out",
                    "outputs": [{
                        "redpanda": {
                            "seed_brokers": self.redpanda.brokers_list(),
                            "topic": topic,
                        },
                        "processors": [{
                            "schema_registry_encode": {
                                "url": self.redpanda.schema_reg().split(",")[0],
                                "subject": subject,
                                "refresh_period": "10s"
                            }
                        }]
                    }, {
                        "file": {
                            "path": out_path,
                            "codec": "lines"
                        }
                    }]
                }
            }
        }
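    # The config above is a Redpanda Connect (Benthos-style) stream: a
    # `generate` input emits 1000 counter values, the bloblang mapping turns
    # each one into a VerifierRecord, and the fan_out broker duplicates every
    # record to two outputs: the Redpanda topic (Avro-encoded against the
    # Schema Registry) and a local JSON-lines file that the test later uses
    # as its expected-output oracle.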

    def setUp(self):
        # Intentionally empty: startup is deferred to the test body.
        pass

    def _create_schema(self, subject: str, schema: str, schema_type="avro"):
        # rpk reads schemas from disk, so write the schema to a temporary
        # file whose suffix tells rpk the schema type.
        rpk = RpkTool(self.redpanda)
        with tempfile.NamedTemporaryFile(suffix=f".{schema_type}") as tf:
            tf.write(bytes(schema, 'UTF-8'))
            tf.seek(0)
            rpk.create_schema(subject, tf.name)
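    # End-to-end flow: create an iceberg-enabled topic, register the Avro
    # schema, stream 1000 records through Redpanda Connect, wait until they
    # are translated into the Iceberg table, then compare Trino query results
    # row-by-row against the JSON-lines oracle. The final assertion checks
    # exactly-once semantics: the row count of partition 0 must equal its
    # distinct-offset count.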

    @cluster(num_nodes=6)
    @matrix(cloud_storage_type=supported_storage_types())
    def test_translating_avro_serialized_records(self, cloud_storage_type):
        topic_name = "ducky-topic"
        with DatalakeServices(
                self.test_context,
                redpanda=self.redpanda,
                filesystem_catalog_mode=False,
                include_query_engines=[QueryEngineType.TRINO]) as dl:

            dl.create_iceberg_enabled_topic(
                topic_name,
                partitions=1,
                replicas=3,
                iceberg_mode="value_schema_id_prefix")

            self._create_schema("verifier_schema", self.verifier_schema_avro)
            connect = RedpandaConnectService(self.test_context, self.redpanda)
            connect.start()

            output_file = os.path.join(RedpandaConnectService.PERSISTENT_ROOT,
                                       "verifier.log")
            # start a Redpanda Connect stream producing the Avro records
            connect.start_stream(name="ducky_stream",
                                 config=self.avro_stream_config(
                                     topic_name,
                                     "verifier_schema",
                                     out_path=output_file))

            connect.wait_for_stream_to_finish("ducky_stream")

            # log catalog contents to make failures easier to debug
            cl = dl.catalog_service.client()
            for n in cl.list_namespaces():
                self.logger.info(f">>> namespace: {n}")
            for t in cl.list_tables("redpanda"):
                self.logger.info(f">>> table: {t}")

            dl.wait_for_translation(topic_name, msg_count=1000)

            # fetch the ground-truth file written by the stream's file output
            connect.nodes[0].account.copy_from(output_file,
                                               "/tmp/verifier.log")
            with open("/tmp/verifier.log", "r") as f:
                lines = f.readlines()
            self.logger.info(f"Lines: {lines}")

            r = dl.trino().run_query_fetch_all(
                f"SELECT * FROM redpanda.{dl.trino().escape_table_name(topic_name)} ORDER BY ordinal"
            )
            expected = [json.loads(l) for l in lines]
            expected.sort(key=lambda x: x["ordinal"])
            for query, ex in zip(r, expected):
                assert query[1] == ex[
                    "verifier_string"], f"{query[1]} != {ex['verifier_string']}"

            assert dl.trino().partition_count(
                "redpanda", topic_name,
                0) == dl.trino().partition_distinct_offset_count(
                    "redpanda", topic_name,
                    0), "Exactly once translation semantics violated"
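
A sketch of how such a case is typically invoked when running ducktape directly (the file path and class name come from the diff above; Redpanda's CI normally drives this through its own test runner):

    ducktape tests/rptest/tests/datalake/simple_connect_test.py::RedpandaConnectIcebergTest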
