prefect-gcp
is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows.
Requires an installation of Python 3.7+.
We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.
These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.
To use prefect-gcp
and Cloud Run:
pip install prefect-gcp
To use Cloud Storage:
pip install "prefect-gcp[cloud_storage]"
To use BigQuery:
pip install "prefect-gcp[bigquery]"
To use Secret Manager:
pip install "prefect-gcp[secret_manager]"
To use Vertex AI:
pip install "prefect-gcp[aiplatform]"
A list of available blocks in prefect-gcp
and their setup instructions can be found here.
from prefect import flow
from prefect_gcp.cloud_storage import GcsBucket
@flow
def donwload_flow():
gcs_bucket = GcsBucket.load("my-bucket")
path = gcs_bucket.download_object_to_path("my_folder/notes.txt", "notes.txt")
return path
download_flow()
Save the following as prefect_gcp_flow.py
:
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_run import CloudRunJob
@flow
def cloud_run_job_flow():
cloud_run_job = CloudRunJob(
image="us-docker.pkg.dev/cloudrun/container/job:latest",
credentials=GcpCredentials.load("MY_BLOCK_NAME"),
region="us-central1",
command=["echo", "hello world"],
)
return cloud_run_job.run()
Deploy prefect_gcp_flow.py
:
from prefect.deployments import Deployment
from prefect_gcp_flow import cloud_run_job_flow
deployment = Deployment.build_from_flow(
flow=cloud_run_job_flow,
name="cloud_run_job_deployment",
version=1,
work_queue_name="demo",
)
deployment.apply()
Run the deployment either on the UI or through the CLI:
prefect deployment run cloud-run-job-flow/cloud_run_job_deployment
Visit Prefect Deployments for more information about deployments.
To instantiate a Google Cloud client, like bigquery.Client
, GcpCredentials
is not a valid input. Instead, use the get_credentials_from_service_account
method.
import google.cloud.bigquery
from prefect import flow
from prefect_gcp import GcpCredentials
@flow
def create_bigquery_client():
gcp_credentials_block = GcpCredentials.load("BLOCK_NAME")
google_auth_credentials = gcp_credentials_block.get_credentials_from_service_account()
bigquery_client = bigquery.Client(credentials=google_auth_credentials)
Or simply call get_bigquery_client
from GcpCredentials
.
from prefect import flow
from prefect_gcp import GcpCredentials
@flow
def create_bigquery_client():
gcp_credentials_block = GcpCredentials.load("BLOCK_NAME")
bigquery_client = gcp_credentials_block.get_bigquery_client()
Save the following as prefect_gcp_flow.py
:
from prefect import flow
from prefect_gcp.credentials import GcpCredentials
from prefect_gcp.aiplatform import VertexAICustomTrainingJob
@flow
def vertex_ai_job_flow():
gcp_credentials = GcpCredentials.load("MY_BLOCK")
job = VertexAICustomTrainingJob(
command=["echo", "hello world"],
region="us-east1",
image="us-docker.pkg.dev/cloudrun/container/job:latest",
gcp_credentials=gcp_credentials,
)
job.run()
vertex_ai_job_flow()
Deploy prefect_gcp_flow.py
:
from prefect.deployments import Deployment
from prefect_gcp_flow import vertex_ai_job_flow
deployment = Deployment.build_from_flow(
flow=vertex_ai_job_flow,
name="vertex-ai-job-deployment",
version=1,
work_queue_name="demo",
)
deployment.apply()
Run the deployment either on the UI or through the CLI:
prefect deployment run vertex-ai-job-flow/vertex-ai-job-deployment
Visit Prefect Deployments for more information about deployments.
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_download_blob_as_bytes
custom_download = cloud_storage_download_blob_as_bytes.with_options(
name="My custom task name",
retries=2,
retry_delay_seconds=10,
)
@flow
def example_with_options_flow():
gcp_credentials = GcpCredentials(
service_account_file="/path/to/service/account/keyfile.json")
contents = custom_download("bucket", "blob", gcp_credentials)
return contents()
example_with_options_flow()
For more tips on how to use tasks and flows in a Collection, check out Using Collections!
If you encounter any bugs while using prefect-gcp
, feel free to open an issue in the prefect-gcp repository.
If you have any questions or issues while using prefect-gcp
, you can find help in either the Prefect Discourse forum or the Prefect Slack community.
Feel free to ⭐️ or watch prefect-gcp
for updates too!
If you'd like to install a version of prefect-gcp
for development, clone the repository and perform an editable install with pip
:
git clone https://github.com/PrefectHQ/prefect-gcp.git
cd prefect-gcp/
pip install -e ".[dev]"
# Install linting pre-commit hooks
pre-commit install