Skip to content

Commit

Permalink
feat(cli): added cli option for ingestion source
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinkarchacryl committed Nov 27, 2024
1 parent 0ee758c commit 04a9af9
Showing 1 changed file with 51 additions and 0 deletions.
51 changes: 51 additions & 0 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,57 @@ def mcps(path: str) -> None:
sys.exit(ret)


@ingest.command()
@click.argument("page_offset", type=int, default=0)
@click.argument("page_size", type=int, default=100)
@upgrade.check_upgrade
@telemetry.with_telemetry()
def list_sources(page_offset: int, page_size: int) -> None:
"""List ingestion sources with their number of executions"""

query = """
query listIngestionRuns($input: ListIngestionSourcesInput!) {
listIngestionSources(input: $input) {
ingestionSources {
urn
name
executions {
total
}
}
}
}
"""

variables = {
"input": {
"start": page_offset,
"count": page_size
}
}

client = get_default_graph()
session = client._session
gms_host = client.config.server

url = f"{gms_host}/api/graphql"
response = session.post(url, json={"query": query, "variables": variables})

data = response.json()

rows = []
if "data" in data and "listIngestionSources" in data["data"]:
sources = data["data"]["listIngestionSources"]["ingestionSources"]
for source in sources:
urn = source.get("urn", "N/A")
name = source.get("name", "N/A")
executions = source.get("executions", {})
total = executions.get("total", 0)
rows.append([urn, name, total])

click.echo(tabulate(rows, headers=["URN", "Name", "Total Executions"], tablefmt="grid") if rows else "No ingestion sources found.")


@ingest.command()
@click.argument("page_offset", type=int, default=0)
@click.argument("page_size", type=int, default=100)
Expand Down

0 comments on commit 04a9af9

Please sign in to comment.