How to build an AI data pipeline agent.

Every data team has the same long tail: a CSV showed up in a new bucket, an upstream schema added a column, a vendor changed an enum value. Each one needs a human to diagnose and patch a staging model. That work is a tool-using loop in disguise (list, sample, EXPLAIN, propose SQL, dry-run, commit), and the limiting factor is whether your agent can reach the SQL database, object storage, and the warehouse through consistently typed tools.

Four drivers, four auth models, four shapes.

Postgres speaks its own wire protocol. S3 wants SigV4-signed requests. Snowflake exposes a JSON-over-HTTPS SQL API, and BigQuery has a different one. Build a generic agent and you’re writing four drivers before you write a single useful prompt, and each driver has its own error envelope the model has to learn to parse.

The pragmatic path: put a thin REST surface in front of each (PostgREST, S3 REST, native Snowflake/BigQuery REST), point a tool gateway at them, and let the agent call uniformly-shaped MCP tools. The bytes that flow between agent and tool are the same shape across systems — the model spends its attention on the data problem, not the SDK problem.

wmcp.sh is that gateway. /api/v1/tools?url=... turns each REST surface into schema-valid MCP tools, scoped to whatever methods you whitelist. wmcp.sh is not affiliated with Snowflake, Google Cloud, Amazon Web Services, or any data vendor.
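A minimal sketch of the scoping idea on the client side, assuming the gateway returns MCP-style tool descriptors (`name`, `description`, `inputSchema`). The response shape, the tool names, and the `scope_tools` helper are assumptions for illustration, not documented wmcp.sh behavior. It keeps only allowlisted tools and renames `inputSchema` to the `input_schema` key that the Anthropic Messages API expects.

```python
from typing import Any


def scope_tools(tools: list[dict[str, Any]], allowed: set[str]) -> list[dict[str, Any]]:
    """Keep only allowlisted tools and convert them to Anthropic's tool format."""
    scoped = []
    for tool in tools:
        if tool["name"] not in allowed:
            continue  # drop anything not explicitly allowed, e.g. execute tools
        # MCP uses camelCase `inputSchema`; the Messages API uses `input_schema`.
        schema = (
            tool.get("inputSchema")
            or tool.get("input_schema")
            or {"type": "object", "properties": {}}
        )
        scoped.append({
            "name": tool["name"],
            "description": tool.get("description", ""),
            "input_schema": schema,
        })
    return scoped


# Hypothetical descriptors, shaped like an MCP tools/list result.
raw = [
    {"name": "list_objects", "description": "List bucket keys",
     "inputSchema": {"type": "object", "properties": {"prefix": {"type": "string"}}}},
    {"name": "execute_sql", "description": "Run SQL",
     "inputSchema": {"type": "object", "properties": {"sql": {"type": "string"}}}},
]
read_only = scope_tools(raw, allowed={"list_objects"})
```

Filtering on the client is a second line of defense; scoping at the gateway keeps the execute tool out of the model's context entirely.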

Schedule → extract → transform → load.

1. Scheduler. A cron or workflow runner (Airflow, Dagster, Cloudflare Cron) kicks the agent for a specific pipeline ID with a bounded turn budget and a bytes-scanned cap.

2. Tool gateway (wmcp.sh). The agent materializes tools for SQL (Postgres via PostgREST, or your warehouse REST endpoint), object storage (S3, R2, or GCS via their REST APIs), and the destination warehouse — Snowflake or BigQuery — through /integration/openapi.

3. Reasoning loop. The agent inspects the source schema, lists new files in the staging bucket, drafts a transform SQL, runs EXPLAIN to check cost, then either dry-runs the load or queues the SQL for human approval depending on policy.

4. Audit + observability. Every tool call lands in your warehouse’s audit table. /managed wires this into a dashboard so you can see which pipelines self-healed and which escalated.
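The bounded turn budget and bytes-scanned cap from step 1, and the policy branch from step 3, can be sketched as below. `RunBudget`, `BudgetExceeded`, `route`, and the `changes_schema` / `auto_dry_run` policy flags are hypothetical names chosen for illustration; a real policy would likely live in pipeline config.

```python
from dataclasses import dataclass


class BudgetExceeded(RuntimeError):
    """Raised when a run would go past its turn or bytes-scanned limit."""


@dataclass
class RunBudget:
    max_turns: int
    max_bytes_scanned: int
    turns: int = 0
    bytes_scanned: int = 0

    def charge(self, bytes_scanned: int = 0) -> None:
        """Record one agent turn and the bytes its tool calls scanned."""
        if self.turns + 1 > self.max_turns:
            raise BudgetExceeded(f"turn budget of {self.max_turns} exhausted")
        if self.bytes_scanned + bytes_scanned > self.max_bytes_scanned:
            raise BudgetExceeded(f"bytes-scanned cap of {self.max_bytes_scanned} exceeded")
        self.turns += 1
        self.bytes_scanned += bytes_scanned


def route(changes_schema: bool, auto_dry_run: bool) -> str:
    """Hypothetical step 3 policy: dry-run automatically, or queue for a human."""
    if changes_schema or not auto_dry_run:
        return "queue_for_approval"
    return "dry_run"


budget = RunBudget(max_turns=3, max_bytes_scanned=10 * 1024**3)  # 3 turns, 10 GiB
budget.charge(bytes_scanned=2 * 1024**3)
decision = route(changes_schema=True, auto_dry_run=True)
```

Checking the limits before mutating the counters means a rejected charge leaves the budget unchanged, so the scheduler can log the exact state at the point of refusal.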

What wmcp.sh provides.

Capability | System | How wmcp.sh wires it
Read source schema | Postgres / MySQL | ✅ PostgREST or Hasura REST via /integration/openapi
List + read bucket objects | S3 / R2 / GCS | ✅ Native REST OpenAPI, scoped to list + get
Run EXPLAIN / dry-run SQL | Snowflake / BigQuery | ✅ Warehouse REST adapter, scoped to dry-run methods
Execute SQL (gated) | Snowflake / BigQuery | ✅ Separate MCP tool, gated behind human approval
Inspect a public spec or vendor docs | Any URL | ✅ Generic /api/v1/tools?url=...
Audit log + replay | Your warehouse | ✅ Included on /managed
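For the audit log + replay row, one possible shape for a single audit-table record is sketched here. The column names and the `audit_row` helper are assumptions, not the schema /managed actually writes. Storing the arguments as canonical JSON alongside a hash makes identical calls easy to deduplicate and lets a call be replayed verbatim.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Any


def audit_row(pipeline_id: str, tool: str, arguments: dict[str, Any], outcome: str) -> dict[str, str]:
    """Build one audit-table row for a single tool call."""
    # Canonical JSON (sorted keys, no whitespace) so identical calls hash identically.
    canonical = json.dumps(arguments, sort_keys=True, separators=(",", ":"))
    return {
        "ts": datetime.now(timezone.utc).isoformat(),
        "pipeline_id": pipeline_id,
        "tool": tool,
        "arguments": canonical,
        "arguments_sha256": hashlib.sha256(canonical.encode("utf-8")).hexdigest(),
        "outcome": outcome,
    }


# Hypothetical pipeline ID and tool name.
row = audit_row("orders_daily", "explain_sql", {"sql": "SELECT 1", "dry_run": True}, "ok")
```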

A schema-drift agent loop.

Python sketch. The agent receives a pipeline ID, inspects source and target schemas, drafts a patch, and dry-runs it. Execute is left to a separate approved runner.

import os

import httpx
from anthropic import Anthropic

client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment
WMCP = "https://wmcp.sh"

def tools_for(url):
    # Ask the gateway to turn one REST surface into tool definitions.
    resp = httpx.get(f"{WMCP}/api/v1/tools", params={"url": url}, timeout=30.0)
    resp.raise_for_status()  # fail loudly instead of parsing an error body
    return resp.json()["tools"]

tools = (
    tools_for("https://postgrest.acme.internal")              # source DB via REST
    + tools_for("https://s3.us-east-1.amazonaws.com")         # raw bucket
    + tools_for("https://acme.snowflakecomputing.com/api/v2") # warehouse
)

pipeline = os.environ["PIPELINE_ID"]

msg = client.messages.create(
    model="claude-sonnet-4-5",
    max_tokens=2048,
    tools=tools,
    messages=[{"role": "user",
        "content": f"Pipeline {pipeline} failed last run. Compare source and target schemas, "
                   "list new files in the staging bucket, draft a transform SQL, run EXPLAIN, "
                   "and emit a dry-run plan. Do NOT call any execute tool."}],
)

# msg.content is a list of content blocks. The first response usually holds
# tool_use requests; a full agent executes them and calls the model again
# until it returns the proposed SQL and EXPLAIN cost as text.
print(msg.content)
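A single `messages.create` call returns as soon as the model asks for a tool, so a working agent has to feed tool results back until the model stops asking. The loop below is a sketch of that cycle under stated assumptions: `run_agent` and `execute_tool` are hypothetical helpers, and how a tool call is actually dispatched to the gateway is not shown because the source does not specify it. The scripted `fake_create` stands in for `client.messages.create` so the sketch runs offline.

```python
from types import SimpleNamespace
from typing import Any, Callable


def run_agent(create: Callable[..., Any], execute_tool: Callable[[str, dict], str],
              messages: list[dict], max_turns: int = 8, **request: Any) -> Any:
    """Call the model until it stops requesting tools or the turn budget runs out."""
    for _ in range(max_turns):
        response = create(messages=messages, **request)
        if response.stop_reason != "tool_use":
            return response  # final answer: proposed SQL and dry-run plan
        # Echo the assistant turn back, then answer every tool_use block.
        messages.append({"role": "assistant", "content": response.content})
        results = [
            {"type": "tool_result", "tool_use_id": block.id,
             "content": execute_tool(block.name, block.input)}
            for block in response.content if block.type == "tool_use"
        ]
        messages.append({"role": "user", "content": results})
    raise RuntimeError(f"no final answer within {max_turns} turns")


# Scripted stand-in for client.messages.create (hypothetical tool name and output).
_script = iter([
    SimpleNamespace(stop_reason="tool_use", content=[
        SimpleNamespace(type="tool_use", id="tu_1", name="explain_sql",
                        input={"sql": "SELECT 1"})]),
    SimpleNamespace(stop_reason="end_turn", content=[
        SimpleNamespace(type="text", text="dry-run plan")]),
])
calls: list[str] = []


def fake_create(**kwargs: Any) -> Any:
    return next(_script)


def fake_execute(name: str, args: dict) -> str:
    calls.append(name)
    return "estimated cost: low"


history = [{"role": "user", "content": "Diagnose pipeline 42."}]
final = run_agent(fake_create, fake_execute, history, max_turns=4)
```

With the real SDK the call would look like `run_agent(client.messages.create, execute_tool, messages, model="claude-sonnet-4-5", max_tokens=2048, tools=tools)`, where `execute_tool` is whatever function forwards a tool call to the gateway.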

Hand-rolled drivers vs MCP gateway.

Four-driver stack:

  • Per-system auth, retries, and error envelopes
  • Model relearns each tool’s quirks
  • Bytes-scanned caps rolled by hand
  • Adding a new source means a new driver

wmcp.sh tool gateway:

  • One /api/v1/tools shape across SQL, storage, warehouse
  • Scope dry-run vs execute as separate MCP tools
  • Edge-cached schemas, sub-50ms tool listing
  • New source = new URL, not new driver

Common questions.

What is an AI data pipeline agent?
A loop that extracts, transforms, and loads, with self-correction on schema drift and plain-English failure explanations.

Should an agent run production ETL?
For triage and drift fixes, yes. For the live load, a deterministic runner is usually still the right answer: the agent proposes, a human or check approves, the runner executes.

What warehouses are supported?
Snowflake, BigQuery, and Redshift (via its Data API): anything with a REST surface and an OpenAPI spec.

How do I stop a 10TB query?
Run EXPLAIN before execute, set bytes-scanned caps at the warehouse, and scope the agent's MCP tools to dry-run only.

Does this replace dbt or Airflow?
No. It sits above them, handling discovery and drift work.
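The 10TB answer above can be made concrete with a BigQuery-flavored sketch. The field names (`dryRun`, `maximumBytesBilled`, `statistics.totalBytesProcessed`) follow BigQuery's REST job resource and should be checked against the current reference; the helper names, the cap, and the sample response are hypothetical. Other warehouses expose the estimate differently.

```python
from typing import Any


def query_job_body(sql: str, cap_bytes: int, dry_run: bool) -> dict[str, Any]:
    """Build a BigQuery-style jobs.insert body with a warehouse-side scan cap."""
    return {
        "configuration": {
            "dryRun": dry_run,  # True: estimate only, the query is not executed
            "query": {
                "query": sql,
                "useLegacySql": False,
                # int64 fields travel as decimal strings in the REST API
                "maximumBytesBilled": str(cap_bytes),
            },
        }
    }


def within_cap(dry_run_job: dict[str, Any], cap_bytes: int) -> bool:
    """Compare the dry-run estimate against the cap before anything executes."""
    estimate = int(dry_run_job["statistics"]["totalBytesProcessed"])
    return estimate <= cap_bytes


CAP = 50 * 1024**3  # hypothetical 50 GiB cap
body = query_job_body("SELECT * FROM raw.events", CAP, dry_run=True)
# Hypothetical dry-run response for a query that would scan 10 TiB.
too_big = {"statistics": {"totalBytesProcessed": str(10 * 1024**4)}}
allowed = within_cap(too_big, CAP)
```

The two checks are deliberately redundant: the client-side comparison stops the agent from proposing the query, and the warehouse-side cap still applies if something bypasses the agent.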

Need this built for you?

Hosted pipeline agent with audit + cost caps.

Custom warehouse adapter + hosted MCP at mcp.yourbrand.com + verified badge. Starter $499 one-time · Managed Retainer $999/mo · Enterprise $4,999+/mo.
