Maestro

Pipeline

The Pipeline skill is how agents read and write Maestro’s internal Pipeline (the contacts + activities tables that back the Pipelines UI). Every other skill in the v1 hero score talks to the world — Apollo finds leads, Compose drafts, Gmail sends. The Pipeline skill is the one that records what happened so the Pipelines UI shows the operator what the agent did.

Three operations:

| Operation | Purpose | Status |
| --- | --- | --- |
| find_contact | Look up a contact by email within a pipeline. Returns the contact or null. Use before add_contact to avoid duplicates. | Shipped |
| add_contact | Create a new contact in a pipeline. Logs a ‘created’ activity. Auto-links to a company by domain (creates one if missing). | Shipped |
| log_activity | Append an activity to a contact’s timeline. Optionally advances the contact’s stage in the same call. | Shipped |
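
The find-then-add pattern can be sketched as follows. This is a minimal illustration, not the skill's implementation: `FakePipeline` is a hypothetical in-memory stand-in, and the `ct_` id prefix, the `record_outreach` helper, and the Python method signatures are assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakePipeline:
    """Hypothetical in-memory stand-in for the Pipeline skill."""
    contacts: dict = field(default_factory=dict)    # (pipeline_id, email) -> contact
    activities: list = field(default_factory=list)  # (contact_id, kind, payload)

    def find_contact(self, pipeline_id: str, email: str) -> Optional[dict]:
        # Returns the contact, or None when nothing matches.
        return self.contacts.get((pipeline_id, email))

    def add_contact(self, pipeline_id: str, email: str, stage: str = "new") -> dict:
        key = (pipeline_id, email)
        if key in self.contacts:
            # Mirrors the documented rejection of duplicate emails.
            raise ValueError("bad_input: email already exists in this pipeline")
        contact = {"id": f"ct_{len(self.contacts) + 1}", "email": email, "stage": stage}
        self.contacts[key] = contact
        self.log_activity(contact["id"], "created")
        return contact

    def log_activity(self, contact_id, kind, new_stage=None, payload=None):
        self.activities.append((contact_id, kind, payload or {}))
        if new_stage is not None:
            for contact in self.contacts.values():
                if contact["id"] == contact_id:
                    contact["stage"] = new_stage


def record_outreach(pipeline, pipeline_id, email, subject):
    """Create the contact only if it is new, then log the send."""
    if pipeline.find_contact(pipeline_id, email) is not None:
        return None  # already tracked, so skip
    contact = pipeline.add_contact(pipeline_id, email, stage="contacted")
    pipeline.log_activity(contact["id"], "contacted", payload={"subject": subject})
    return contact
```

Calling `record_outreach` twice with the same email creates the contact once and returns `None` the second time, so the duplicate-email error is never triggered.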

How agents typically use it

Cold-leads agent flow:

apollo.find_leads          → list of candidates
for each candidate:
  pipeline.find_contact    → does this email already exist?
  if exists:               → skip, move to next
  apollo.enrich_domain     → company context
  compose.draft_opener     → subject + body
  gmail.send_email         → fire it off
  pipeline.add_contact     → create the contact, stage='contacted'
  pipeline.log_activity    → kind='contacted', payload={subject, preview}

Reply-triage agent flow:

gmail.list_inbox is:unread → recent inbound mail
for each thread:
  gmail.read_thread        → full message contents
  pipeline.find_contact    → match the sender to a known contact
  if no match:             → skip (not from a Maestro-tracked contact)
  compose.classify_reply   → intent, confidence, explanation
  pipeline.log_activity    → kind='triaged', new_stage based on intent,
                              payload={intent, confidence, explanation}
  gmail.label_thread       → apply Maestro/<intent> so we don't reprocess
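
One way to turn a classification into the log_activity arguments is sketched below. The intent labels, the intent-to-stage mapping, and the confidence threshold are all hypothetical; the real intents come from compose.classify_reply, and the real mapping belongs to the score definition.

```python
from typing import Optional

# Hypothetical intent labels and stage mapping, for illustration only.
INTENT_TO_STAGE = {
    "interested": "triaged",
    "not_interested": "disqualified",
    "unsubscribe": "unsubscribed",
}


def triage_update(intent: str, confidence: float, explanation: str,
                  min_confidence: float = 0.7) -> dict:
    """Build the log_activity arguments and the Gmail label for one reply."""
    new_stage: Optional[str] = None
    # Assumption: low-confidence classifications are logged without a stage change.
    if confidence >= min_confidence:
        new_stage = INTENT_TO_STAGE.get(intent)
    return {
        "kind": "triaged",
        "new_stage": new_stage,  # None leaves the contact's stage unchanged
        "payload": {
            "intent": intent,
            "confidence": confidence,
            "explanation": explanation,
        },
        "label": f"Maestro/{intent}",
    }
```

Keeping the mapping in a plain dictionary makes it easy to audit next to the agent's instructions.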

The Pipelines UI streams these writes live via Postgres LISTEN/NOTIFY, so the operator sees each contact appear, advance through stages, and accumulate activities in real time as the agent works.

Setup

The Pipeline skill needs no manifest secrets — it talks directly to the same Postgres database the runtime is already connected to (via DATABASE_URL from .env). It opens a small per-skill connection pool (1–4 connections) lazily on first call.

If the skill ever returns internal: DATABASE_URL is not set, it means your runtime’s .env doesn’t have the database URL. That’s already required for the runtime itself to start — so this should never happen in a normally-configured install.
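
Lazy pool creation can be sketched roughly as below. `LazyPool` and the `open_pool` factory are hypothetical names; a real implementation would pass a wrapper around its Postgres driver's pool constructor. Only the 1–4 pool size and the error text come from this page.

```python
import os
from typing import Callable, Optional


class LazyPool:
    """Opens the pool on first use instead of at import time."""

    def __init__(self, open_pool: Callable[[str, int, int], object]):
        # open_pool(url, min_size, max_size) is a hypothetical factory.
        self._open_pool = open_pool
        self._pool: Optional[object] = None

    def get(self) -> object:
        if self._pool is None:
            url = os.environ.get("DATABASE_URL")
            if not url:
                # Same message the skill is documented to return.
                raise RuntimeError("internal: DATABASE_URL is not set")
            self._pool = self._open_pool(url, 1, 4)
        return self._pool
```

Because the environment is read on the first call to `get()`, importing the skill never fails on a missing variable; the error only surfaces when an operation actually needs the database.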

Pipeline IDs

Every operation takes pipeline_id (or contact_id, which transitively determines the pipeline). Agents pass them explicitly — usually hardcoded in the agent’s instructions:

“Write all contacts to pipeline pl_saas.”

This keeps agent ↔ pipeline relationships visible in plain text, which makes them easy to audit and easy to change without touching code.

What the operations record

add_contact writes:

  • A contacts row at the requested stage (default new).
  • A companies row if company_domain is provided and the company doesn’t exist yet (de-duped by domain within the workspace).
  • A created activity row capturing that the agent created the contact.
  • last_activity_at is set to now().

log_activity writes:

  • An activities row with the given kind and optional JSON payload.
  • contact.last_activity_at = now() and contact.updated_at = now().
  • If new_stage is provided, contact.stage = new_stage in the same transaction.

Both are transactional — if any step fails, the whole write is rolled back. No half-written contacts.
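
The all-or-nothing behaviour can be illustrated with a small runnable sketch. It uses Python's built-in sqlite3 in place of Postgres so it runs anywhere, and the table layout is a simplified assumption rather than Maestro's actual schema.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE contacts (
        id TEXT PRIMARY KEY,
        stage TEXT NOT NULL,
        last_activity_at TEXT
    );
    CREATE TABLE activities (
        id INTEGER PRIMARY KEY,
        contact_id TEXT NOT NULL,
        kind TEXT NOT NULL CHECK (kind IN ('contacted', 'replied', 'note')),
        payload TEXT
    );
    INSERT INTO contacts (id, stage) VALUES ('ct_1', 'new');
""")


def log_activity(conn, contact_id, kind, new_stage=None, payload=None):
    # "with conn" commits on success and rolls back if any statement raises.
    with conn:
        if new_stage is not None:
            conn.execute(
                "UPDATE contacts SET stage = ? WHERE id = ?",
                (new_stage, contact_id),
            )
        conn.execute(
            "UPDATE contacts SET last_activity_at = datetime('now') WHERE id = ?",
            (contact_id,),
        )
        # An invalid kind violates the CHECK constraint and undoes the updates above.
        conn.execute(
            "INSERT INTO activities (contact_id, kind, payload) VALUES (?, ?, ?)",
            (contact_id, kind, json.dumps(payload or {})),
        )
```

If the activity insert fails, the stage update made earlier in the same call is rolled back with it, so the contact never ends up at a stage that has no matching activity.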

Activity kinds

Same enum as the database column:

created, enriched, contacted, replied, triaged, booked,
note, bounced, unsubscribed, excluded

Several kinds (contacted, replied, triaged, booked, unsubscribed) share a name with a stage. Convention: when the agent records a contacted activity, it also passes new_stage='contacted' in the same call. Skipping the stage update is rare but legal; it is useful when an activity is purely informational (for example, a note doesn’t advance the stage).

Stage transitions

The valid stages in v1 (defined by the B2B SaaS Outbound score):

new → enriching → ready → contacted → replied → triaged → booked
                                                disqualified
                                                unsubscribed

disqualified and unsubscribed are terminal. The Pipeline skill doesn’t enforce transition order — an agent could go from new straight to booked if it had reason to. That flexibility is intentional; rule enforcement belongs in the score definition, not in the writes.
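
The two enums and the same-name convention can be sketched in Python as below. The class names and the `stage_for_kind` helper are hypothetical; the values are the ones listed on this page. As in the skill itself, only enum membership is checked, not transition order.

```python
from enum import Enum
from typing import Optional


class Kind(str, Enum):
    CREATED = "created"
    ENRICHED = "enriched"
    CONTACTED = "contacted"
    REPLIED = "replied"
    TRIAGED = "triaged"
    BOOKED = "booked"
    NOTE = "note"
    BOUNCED = "bounced"
    UNSUBSCRIBED = "unsubscribed"
    EXCLUDED = "excluded"


class Stage(str, Enum):
    NEW = "new"
    ENRICHING = "enriching"
    READY = "ready"
    CONTACTED = "contacted"
    REPLIED = "replied"
    TRIAGED = "triaged"
    BOOKED = "booked"
    DISQUALIFIED = "disqualified"
    UNSUBSCRIBED = "unsubscribed"


TERMINAL_STAGES = {Stage.DISQUALIFIED, Stage.UNSUBSCRIBED}


def stage_for_kind(kind: Kind) -> Optional[Stage]:
    """Return the stage that shares the kind's name, or None if there is none."""
    try:
        return Stage(kind.value)
    except ValueError:
        return None  # informational kinds such as 'note' have no matching stage
```

An agent-side helper like this can pick a default new_stage for log_activity while still allowing the caller to pass any valid stage explicitly.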

Failure modes

  • bad_input — duplicate email: add_contact rejects when an email already exists in the pipeline. Always call find_contact first. The error message tells the LLM exactly what to do.
  • bad_input — invalid stage / kind: input outside the allowed enums. Pydantic catches this in the SDK before the SQL runs.
  • not_found — pipeline or contact: pipeline_id doesn’t exist (e.g. typo in the agent’s instructions), or contact_id was deleted between writes.
  • internal: DATABASE_URL is not set. An env config issue; see Setup above.
  • remote_unavailable: Postgres is down or the connection pool can’t open. Retryable.
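
Since only remote_unavailable is described as retryable, a caller might wrap skill calls roughly as follows. `SkillError`, its `code` attribute, and the backoff values are assumptions for illustration; the SDK's actual error type is not shown on this page.

```python
import time


class SkillError(Exception):
    """Hypothetical error type carrying one of the documented error codes."""

    def __init__(self, code, message=""):
        super().__init__(f"{code}: {message}")
        self.code = code


RETRYABLE_CODES = {"remote_unavailable"}


def call_with_retry(op, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Run op(), retrying with exponential backoff on retryable codes only."""
    for attempt in range(attempts):
        try:
            return op()
        except SkillError as err:
            is_last_attempt = attempt == attempts - 1
            if err.code not in RETRYABLE_CODES or is_last_attempt:
                raise  # bad_input, not_found, and internal need a fix, not a retry
            sleep(base_delay * 2 ** attempt)
```

Errors such as bad_input and not_found are re-raised immediately, because repeating the same call with the same arguments would fail the same way.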

Verifying

Open Skills → Pipeline and click Test next to find_contact. The seed data includes two default pipelines:

  • pl_saas — the SaaS pipeline with 8 seeded contacts
  • pl_health — empty Healthcare pipeline (good for safe writes)

Try find_contact { pipeline_id: "pl_saas", email: "sarah@acmecloud.io" }; it should return the seeded Sarah Chen contact. Then try find_contact { pipeline_id: "pl_health", email: "anyone@anywhere.com" }; it should return {contact: null}.

Why Pipeline writes are a skill

Pipeline writes are explicit operations the agent calls — not implicit side-effects of other actions. Three reasons this matters:

  • Predictable composition. Agents combine skills freely; without hidden side-effects, what an agent does is exactly what its instructions say.
  • Intent stays explicit. Not every email is part of a tracked outreach campaign. Pairing a gmail.send_email with a pipeline.add_contact should be a deliberate choice, not an automatic one.
  • Symmetry. Reply-triage reads from the Pipeline (looks up senders, checks stages) just as cold-leads writes to it. Both sides as skills keeps the architecture consistent end-to-end.

Related

  • Pipelines — the conceptual model (workspaces → pipelines → contacts → activities).
  • Apollo — finds the leads the cold-leads agent writes via this skill.
  • Compose — produces drafts and classifications that go into activity payloads.
  • Gmail — the actual send / read side.