Code examples:

Multi-Agent Marketing Analytics Team: Part 2 — Production Considerations

March 19, 2026 10-11 min read By Jaffar Kazi
AI Development Production AI Marketing Analytics
Python C# Azure

Picking Up Where Part 1 Left Off

In Part 1, I built the core of a six-agent marketing analytics system: the Data Analyst Agent (NL→SQL with schema injection and sanity checking), the Attribution Agent (Shapley approximation via Monte Carlo sampling), the Campaign Diagnostics Agent (threshold-based signal detection with LLM synthesis), and the LangGraph orchestrator that coordinates them. Both Python and C# implementations, complete.

Part 2 is about what happens when you move from "this works on my laptop" to "this runs in production." The two remaining agents: Audience Segmentation and Narrative. The cost reality: token costs broken down by agent and query type. Observability: how to know when the system is confidently wrong. The technology choice: a direct comparison of LangGraph versus Semantic Kernel for this specific architecture. And an honest section on five scenarios where you should not build this.

I'll start with the agents, because the Narrative Agent's citation enforcement pattern is the piece that makes the system trustworthy rather than just impressive.

The Remaining Agents: Segmentation and Narrative

Audience Segmentation Agent

The Audience Segmentation Agent answers questions that mix channel performance with customer behavior: "which customer type is driving the CAC increase?" or "are high-LTV customers being acquired from the same channels as last quarter?" These questions require joining campaign data with customer data, which the Data Analyst Agent can do with SQL — but the Segmentation Agent goes further, computing RFM (Recency, Frequency, Monetary) scores and binning customers into behavioral segments.

RFM segmentation assigns each customer three scores based on: when they last purchased (Recency), how many times they've purchased (Frequency), and how much they've spent (Monetary). Customers who score high on all three are Champions. Customers who scored high historically but haven't bought recently are At Risk. The segments drive the diagnostic interpretation — if Champions are declining as a share of new acquisitions, that's a different problem from a general volume drop.


# audience_segmentation_agent.py
import pandas as pd
import numpy as np
from datetime import date
from langchain_openai import AzureChatOpenAI
from langchain.prompts import ChatPromptTemplate
import psycopg2
import psycopg2.extras
import json

class AudienceSegmentationAgent:
    def __init__(self, llm: AzureChatOpenAI, db_conn_str: str):
        self.llm = llm
        self.db_conn_str = db_conn_str
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a customer analytics specialist.
You will be given RFM segment distributions and channel acquisition data.
Interpret what the segment shifts mean for marketing strategy.
Only reference metrics present in the provided data. Do not invent numbers."""),
            ("human", "RFM analysis:\n{rfm_summary}\n\nQuestion: {question}")
        ])

    def _load_customer_data(self, start_date: str, end_date: str) -> pd.DataFrame:
        sql = """
        SELECT
            c.id,
            c.acquisition_channel,
            c.acquisition_date,
            c.total_orders,
            c.total_revenue,
            c.last_order_date,
            c.ltv_tier
        FROM customers c
        WHERE c.acquisition_date BETWEEN %s AND %s
        """
        conn = psycopg2.connect(self.db_conn_str)
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.execute(sql, (start_date, end_date))
        rows = cur.fetchall()
        cur.close()
        conn.close()
        return pd.DataFrame([dict(r) for r in rows])

    def _compute_rfm(self, df: pd.DataFrame, reference_date: date = None) -> pd.DataFrame:
        """Compute RFM scores and assign segments."""
        if df.empty:
            return df

        ref = reference_date or date.today()
        df = df.copy()
        df['last_order_date'] = pd.to_datetime(df['last_order_date'])
        df['recency_days'] = (pd.Timestamp(ref) - df['last_order_date']).dt.days

        # Quintile scoring (1=worst, 5=best)
        # Recency: lower days = better (inverted)
        df['r_score'] = pd.qcut(
            df['recency_days'].rank(method='first'),
            q=5, labels=[5, 4, 3, 2, 1]
        ).astype(int)

        df['f_score'] = pd.qcut(
            df['total_orders'].rank(method='first'),
            q=5, labels=[1, 2, 3, 4, 5]
        ).astype(int)

        df['m_score'] = pd.qcut(
            df['total_revenue'].rank(method='first'),
            q=5, labels=[1, 2, 3, 4, 5]
        ).astype(int)

        df['rfm_total'] = df['r_score'] + df['f_score'] + df['m_score']

        # Segment assignment
        def assign_segment(row):
            if row['r_score'] >= 4 and row['f_score'] >= 4 and row['m_score'] >= 4:
                return 'champions'
            elif row['r_score'] <= 2 and row['f_score'] >= 3:
                return 'at_risk'
            elif row['r_score'] <= 2 and row['f_score'] <= 2:
                return 'lost'
            elif row['r_score'] >= 4 and row['f_score'] <= 2:
                return 'new'
            else:
                return 'potential'

        df['rfm_segment'] = df.apply(assign_segment, axis=1)
        return df

    async def run(self, question: str, date_range: dict) -> dict:
        df = self._load_customer_data(date_range['start'], date_range['end'])

        if df.empty:
            return {"error": "No customer data found in date range"}

        df_rfm = self._compute_rfm(df)

        # Segment distribution by acquisition channel
        segment_by_channel = (
            df_rfm.groupby(['acquisition_channel', 'rfm_segment'])
            .agg(
                customer_count=('id', 'count'),
                avg_revenue=('total_revenue', 'mean'),
                avg_orders=('total_orders', 'mean')
            )
            .reset_index()
            .round(2)
        )

        # Overall segment summary
        segment_summary = (
            df_rfm.groupby('rfm_segment')
            .agg(
                count=('id', 'count'),
                avg_revenue=('total_revenue', 'mean'),
                pct_of_total=('id', lambda x: len(x) / len(df_rfm) * 100)
            )
            .reset_index()
            .round(2)
            .to_dict(orient='records')
        )

        rfm_summary = {
            "total_customers_analyzed": len(df_rfm),
            "date_range": date_range,
            "segment_distribution": segment_summary,
            "segment_by_channel": segment_by_channel.to_dict(orient='records')
        }

        # LLM interpretation
        chain = self.prompt | self.llm
        response = await chain.ainvoke({
            "rfm_summary": json.dumps(rfm_summary, indent=2, default=str),
            "question": question
        })

        return {
            **rfm_summary,
            "interpretation": response.content,
            "agent": "audience_segmentation"
        }

// AudienceSegmentationAgent.cs
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.ChatCompletion;
using Npgsql;
using System.Text.Json;

public class AudienceSegmentationAgent
{
    private readonly Kernel _kernel;
    private readonly string _dbConnStr;

    public AudienceSegmentationAgent(Kernel kernel, string dbConnStr)
    {
        _kernel = kernel;
        _dbConnStr = dbConnStr;
    }

    private record CustomerRecord(
        string Id, string AcquisitionChannel, DateTime AcquisitionDate,
        int TotalOrders, decimal TotalRevenue, DateTime? LastOrderDate);

    private async Task<List<CustomerRecord>> LoadCustomersAsync(
        DateTime start, DateTime end)
    {
        var records = new List<CustomerRecord>();
        await using var conn = new NpgsqlConnection(_dbConnStr);
        await conn.OpenAsync();
        await using var cmd = new NpgsqlCommand(@"
            SELECT id, acquisition_channel, acquisition_date,
                   total_orders, total_revenue, last_order_date
            FROM customers
            WHERE acquisition_date BETWEEN @s AND @e", conn);
        cmd.Parameters.AddWithValue("s", start);
        cmd.Parameters.AddWithValue("e", end);
        await using var reader = await cmd.ExecuteReaderAsync();

        while (await reader.ReadAsync())
            records.Add(new CustomerRecord(
                reader.GetString(0), reader.GetString(1),
                reader.GetDateTime(2), reader.GetInt32(3),
                reader.GetDecimal(4),
                reader.IsDBNull(5) ? null : reader.GetDateTime(5)));

        return records;
    }

    private string AssignSegment(int rScore, int fScore, int mScore) => (rScore, fScore, mScore) switch
    {
        var (r, f, m) when r >= 4 && f >= 4 && m >= 4 => "champions",
        var (r, f, _) when r <= 2 && f >= 3 => "at_risk",
        var (r, f, _) when r <= 2 && f <= 2 => "lost",
        var (r, f, _) when r >= 4 && f <= 2 => "new",
        _ => "potential"
    };

    private int Quintile(double value, IEnumerable<double> allValues)
    {
        var sorted = allValues.OrderBy(v => v).ToList();
        var rank = sorted.IndexOf(sorted.First(v => v >= value));
        var pct = (rank + 1.0) / sorted.Count;
        return pct switch { <= 0.2 => 1, <= 0.4 => 2, <= 0.6 => 3, <= 0.8 => 4, _ => 5 };
    }

    public async Task<Dictionary<string, object>> RunAsync(
        string question, DateTime start, DateTime end)
    {
        var customers = await LoadCustomersAsync(start, end);
        if (customers.Count == 0)
            return new() { ["error"] = "No customer data found in date range" };

        var reference = DateTime.Today;
        var allRecency = customers.Select(c =>
            c.LastOrderDate.HasValue ? (reference - c.LastOrderDate.Value).TotalDays : 9999.0)
            .ToList();
        var allFrequency = customers.Select(c => (double)c.TotalOrders).ToList();
        var allMonetary = customers.Select(c => (double)c.TotalRevenue).ToList();

        var segmented = customers.Select(c => {
            var recency = c.LastOrderDate.HasValue
                ? (reference - c.LastOrderDate.Value).TotalDays : 9999.0;
            // Recency: lower days = better, so invert the quintile
            var rScore = 6 - Quintile(recency, allRecency);
            var fScore = Quintile((double)c.TotalOrders, allFrequency);
            var mScore = Quintile((double)c.TotalRevenue, allMonetary);
            return new {
                c.Id, c.AcquisitionChannel, c.TotalOrders,
                c.TotalRevenue, rScore, fScore, mScore,
                Segment = AssignSegment(rScore, fScore, mScore)
            };
        }).ToList();

        var segmentSummary = segmented
            .GroupBy(c => c.Segment)
            .Select(g => new Dictionary<string, object> {
                ["rfm_segment"] = g.Key,
                ["count"] = g.Count(),
                ["avg_revenue"] = Math.Round(g.Average(c => (double)c.TotalRevenue), 2),
                ["pct_of_total"] = Math.Round(g.Count() * 100.0 / segmented.Count, 2)
            }).ToList();

        var channelSummary = segmented
            .GroupBy(c => new { c.AcquisitionChannel, c.Segment })
            .Select(g => new Dictionary<string, object> {
                ["acquisition_channel"] = g.Key.AcquisitionChannel,
                ["rfm_segment"] = g.Key.Segment,
                ["customer_count"] = g.Count(),
                ["avg_revenue"] = Math.Round(g.Average(c => (double)c.TotalRevenue), 2)
            }).ToList();

        var rfmSummary = new Dictionary<string, object> {
            ["total_customers_analyzed"] = segmented.Count,
            ["segment_distribution"] = segmentSummary,
            ["segment_by_channel"] = channelSummary
        };

        // LLM interpretation
        var chat = _kernel.GetRequiredService<IChatCompletionService>();
        var history = new ChatHistory();
        history.AddSystemMessage(
            "You are a customer analytics specialist. Interpret the RFM segment data. " +
            "Only reference metrics present in the provided data.");
        history.AddUserMessage(
            $"RFM analysis:\n{JsonSerializer.Serialize(rfmSummary)}\n\nQuestion: {question}");
        var response = await chat.GetChatMessageContentAsync(history);

        return new Dictionary<string, object>(rfmSummary)
        {
            ["interpretation"] = response.Content ?? "",
            ["agent"] = "audience_segmentation"
        };
    }
}

Narrative Agent: The Citation Enforcement Pattern

The Narrative Agent is the one that makes or breaks the system's credibility. Every AI-generated report has the same failure mode: the model confidently states a number that isn't in the underlying data. It sounds plausible. It's close to something that was in the data. But it's wrong, and if a CMO makes a budget decision based on it, you've created a worse outcome than no AI at all.

The citation enforcement pattern solves this by construction. The Narrative Agent receives a structured JSON payload from all other agents and is told explicitly: reference only findings that appear in this JSON. If a metric isn't in the input data, don't mention it. Every claim must trace back to a specific field in a specific agent's output.


# narrative_agent.py
from langchain_openai import AzureChatOpenAI
from langchain.prompts import ChatPromptTemplate
from typing import Literal
import json

NARRATIVE_SYSTEM_PROMPT = """You are a marketing analytics report writer.
You receive structured JSON data from multiple analytics agents and write clear reports.

CRITICAL RULES:
1. Reference ONLY metrics and findings present in the provided JSON data.
2. Do NOT invent numbers, percentages, or channel comparisons not in the input.
3. Do NOT use phrases like "typically" or "usually" — report what the data shows.
4. If a section of data is missing (agent failed), note the gap explicitly.
5. Every metric you cite must be traceable to a specific field in the input JSON.

OUTPUT FORMATS — use the format specified in the request:
- executive_summary: 3 bullet points, each under 30 words
- full_analysis: 400-600 words with section headers
- action_plan: prioritized list of 3-5 actions with estimated effort and impact
"""

class NarrativeAgent:
    def __init__(self, llm: AzureChatOpenAI):
        self.llm = llm
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", NARRATIVE_SYSTEM_PROMPT),
            ("human", """Question: {question}
Output format: {output_format}

Agent outputs (reference ONLY data from this JSON):
{agent_data}""")
        ])

    def _build_payload(self, state: dict) -> dict:
        """Assemble all agent outputs into a single citations-friendly payload."""
        payload = {
            "question": state.get("user_question", ""),
            "date_range": state.get("date_range", {}),
            "agents_executed": [],
            "data": {}
        }

        if state.get("data_analyst_output"):
            payload["agents_executed"].append("data_analyst")
            payload["data"]["query_results"] = state["data_analyst_output"].get("results", [])
            payload["data"]["query_explanation"] = state["data_analyst_output"].get("explanation", "")
            payload["data"]["sanity_flags"] = state["data_analyst_output"].get("sanity_flags", [])

        if state.get("attribution_output"):
            payload["agents_executed"].append("attribution")
            payload["data"]["attribution"] = {
                "journey_count": state["attribution_output"].get("journey_count", 0),
                "channel_comparison": state["attribution_output"].get("comparison", []),
                "attribution_flags": state["attribution_output"].get("attribution_flags", []),
                "recommendation": state["attribution_output"].get("recommendation", "")
            }

        if state.get("diagnostics_output"):
            payload["agents_executed"].append("diagnostics")
            payload["data"]["diagnostics"] = {
                "signal_count": state["diagnostics_output"].get("signal_count", 0),
                "high_severity_count": state["diagnostics_output"].get("high_severity_count", 0),
                "signals": state["diagnostics_output"].get("signals", []),
                "diagnosis": state["diagnostics_output"].get("diagnosis", "")
            }

        if state.get("segmentation_output"):
            payload["agents_executed"].append("segmentation")
            payload["data"]["segmentation"] = {
                "total_customers": state["segmentation_output"].get("total_customers_analyzed", 0),
                "segments": state["segmentation_output"].get("segment_distribution", []),
                "interpretation": state["segmentation_output"].get("interpretation", "")
            }

        if state.get("errors"):
            payload["data"]["errors"] = state["errors"]
            payload["data"]["missing_agent_note"] = (
                "Some agents failed or returned incomplete data. "
                "Do not speculate about missing information."
            )

        return payload

    async def run(
        self,
        state: dict,
        output_format: Literal["executive_summary", "full_analysis", "action_plan"] = "full_analysis"
    ) -> str:
        payload = self._build_payload(state)

        chain = self.prompt | self.llm
        response = await chain.ainvoke({
            "question": state.get("user_question", ""),
            "output_format": output_format,
            "agent_data": json.dumps(payload, indent=2, default=str)
        })

        return response.content

// NarrativeAgent.cs
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.ChatCompletion;
using System.Text.Json;

public class NarrativeAgent
{
    private readonly Kernel _kernel;

    private const string SystemPrompt = @"You are a marketing analytics report writer.
You receive structured JSON data from multiple analytics agents and write clear reports.

CRITICAL RULES:
1. Reference ONLY metrics and findings present in the provided JSON data.
2. Do NOT invent numbers, percentages, or channel comparisons not in the input.
3. Do NOT use phrases like 'typically' or 'usually' — report what the data shows.
4. If a section of data is missing (agent failed), note the gap explicitly.
5. Every metric you cite must be traceable to a specific field in the input JSON.

OUTPUT FORMATS:
- executive_summary: 3 bullet points, each under 30 words
- full_analysis: 400-600 words with section headers
- action_plan: prioritized list of 3-5 actions with estimated effort and impact";

    public NarrativeAgent(Kernel kernel) => _kernel = kernel;

    private Dictionary<string, object> BuildPayload(AnalyticsState state)
    {
        var payload = new Dictionary<string, object>
        {
            ["question"] = state.UserQuestion,
            ["agents_executed"] = new List<string>(),
            ["data"] = new Dictionary<string, object>()
        };

        var executed = (List<string>)payload["agents_executed"];
        var data = (Dictionary<string, object>)payload["data"];

        if (state.DataAnalystOutput != null)
        {
            executed.Add("data_analyst");
            data["query_results"] = state.DataAnalystOutput.GetValueOrDefault("results") ?? new();
            data["query_explanation"] = state.DataAnalystOutput.GetValueOrDefault("explanation") ?? "";
            data["sanity_flags"] = state.DataAnalystOutput.GetValueOrDefault("sanity_flags") ?? new();
        }

        if (state.AttributionOutput != null)
        {
            executed.Add("attribution");
            data["attribution"] = new Dictionary<string, object>
            {
                ["journey_count"] = state.AttributionOutput.GetValueOrDefault("journey_count") ?? 0,
                ["channel_comparison"] = state.AttributionOutput.GetValueOrDefault("comparison") ?? new(),
                ["attribution_flags"] = state.AttributionOutput.GetValueOrDefault("attribution_flags") ?? new(),
            };
        }

        if (state.DiagnosticsOutput != null)
        {
            executed.Add("diagnostics");
            data["diagnostics"] = new Dictionary<string, object>
            {
                ["signal_count"] = state.DiagnosticsOutput.GetValueOrDefault("signal_count") ?? 0,
                ["signals"] = state.DiagnosticsOutput.GetValueOrDefault("signals") ?? new(),
                ["diagnosis"] = state.DiagnosticsOutput.GetValueOrDefault("diagnosis") ?? "",
            };
        }

        if (state.Errors.Count > 0)
        {
            data["errors"] = state.Errors;
            data["missing_agent_note"] =
                "Some agents failed. Do not speculate about missing information.";
        }

        return payload;
    }

    public async Task<string> RunAsync(
        AnalyticsState state,
        string outputFormat = "full_analysis")
    {
        var payload = BuildPayload(state);
        var chat = _kernel.GetRequiredService<IChatCompletionService>();
        var history = new ChatHistory();
        history.AddSystemMessage(SystemPrompt);
        history.AddUserMessage(
            $"Question: {state.UserQuestion}\n" +
            $"Output format: {outputFormat}\n\n" +
            $"Agent outputs (reference ONLY data from this JSON):\n" +
            $"{JsonSerializer.Serialize(payload, new JsonSerializerOptions { WriteIndented = true })}");

        var response = await chat.GetChatMessageContentAsync(history);
        return response.Content ?? "";
    }
}

Why Citation Enforcement Works

The key is that the prompt includes the full structured JSON as input and explicitly forbids referencing anything not in that JSON. This isn't foolproof — a large enough model will occasionally still hallucinate — but combined with the sanity checks in the Data Analyst Agent, it catches the majority of fabrication cases. In my testing, citation errors drop from roughly 1-in-5 unconstrained runs to fewer than 1-in-20 with enforcement. The remaining cases are caught by sanity flags in the output.

Cost Analysis: What Does This Actually Cost?

I ran each query type through the system with token counting enabled. These are real numbers from actual runs, not estimates.

Query Type Input Tokens Output Tokens Cost (GPT-4o) Cost (GPT-4o-mini)
Diagnostic question (4 agents) ~8,500 ~1,200 ~$0.090 ~$0.015
Weekly report (2 agents) ~5,200 ~800 ~$0.055 ~$0.010
Attribution question (2 agents) ~4,800 ~700 ~$0.050 ~$0.009
Campaign question (2 agents) ~4,100 ~600 ~$0.043 ~$0.008

The bulk of tokens come from schema injection (the Data Analyst Agent injects the full schema on every call — about 800 tokens) and the agent outputs passed to the Narrative Agent (which receives all prior agent results as context). Attribution and diagnostic questions are expensive because they run four agents, each with their own system context plus the growing result payload.

The optimization I implemented: use GPT-4o for the Data Analyst and Attribution agents (precision matters — a subtle schema error costs more than the token savings), and GPT-4o-mini for the Narrative Agent (summarization is lower stakes and the smaller model handles it well). This mix saves roughly 35-40% on token costs without measurably degrading narrative quality.

Azure infrastructure at 100 queries/day (mix of query types, roughly diagnostic-weighted):

Service Monthly Cost Notes
Azure OpenAI ~$270 GPT-4o for analyst/attribution, GPT-4o-mini for narrative
Azure Database for PostgreSQL Flexible Server ~$50 B2ms tier, 32GB storage — right-sized for marketing warehouse
Azure Redis Cache ~$20 C1 tier, session state + query result cache
Azure Container Apps ~$40 API + background worker for scheduled reports
Total ~$380/month At 100 queries/day, mixed query types

The Token Cost Scales With Volume — Cache What You Can

At 100 queries/day with the model mix above, you're spending about $9/day on tokens. The infrastructure is the easy part — the token costs are what scale. Two categories of queries are safe to cache: weekly summary reports (same inputs each Monday, same result) and standard attribution runs for fixed date ranges. Caching these in Redis with a 4-hour TTL can cut effective query volume by 30-40% for typical marketing team usage patterns.

Observability: Knowing When the System Is Confidently Wrong

Standard application observability tells you when a request failed. Multi-agent observability needs to tell you when a request succeeded but the answer was wrong. That's a harder problem.

There are three things you must log for every agent chain execution:

1. The SQL query. Not just the question — the actual SQL the Data Analyst Agent generated and executed. This is your audit trail. If a stakeholder questions a number, you can see exactly what query produced it, re-run it, and verify. Without this, you're trusting a black box.

2. The agent chain trace. Which agents ran, in what order, and what each returned. Azure Monitor Application Insights structured logging handles this well. You want to be able to reconstruct the full chain post-hoc — "the attribution agent failed on this run because the touchpoint table had no data in that date range" is something you need to be able to diagnose.

3. Sanity check flags. When the system self-flags as potentially uncertain. These are the signals that a human should review before acting on the output.


# observability.py
import logging
import json
import time
from datetime import datetime

logger = logging.getLogger("marketing_analytics")

def log_agent_chain(state: dict, execution_start: float) -> dict:
    """
    Emit structured log for the complete agent chain execution.
    This is what goes to Azure Monitor / Application Insights.
    """
    execution_time_ms = int((time.time() - execution_start) * 1000)

    # Collect all sanity flags from agents that ran
    all_sanity_flags = []
    if state.get("data_analyst_output"):
        all_sanity_flags.extend(
            state["data_analyst_output"].get("sanity_flags", []))
    if state.get("attribution_output"):
        all_sanity_flags.extend(
            state["attribution_output"].get("attribution_flags", []))

    # Estimate cost based on intent
    cost_by_intent = {
        "diagnostic": 0.090,
        "reporting": 0.055,
        "attribution": 0.050,
        "campaign": 0.043
    }
    estimated_cost = cost_by_intent.get(state.get("intent", "reporting"), 0.055)

    log_record = {
        "session_id": state.get("session_id", ""),
        "question": state.get("user_question", ""),
        "intent": state.get("intent", ""),
        "agents_executed": [
            a for a in ["data_analyst", "attribution", "diagnostics",
                         "segmentation", "narrative"]
            if state.get(f"{a}_output")
        ],
        "execution_time_ms": execution_time_ms,
        "sql_query": (state.get("data_analyst_output") or {}).get("sql", ""),
        "row_count": (state.get("data_analyst_output") or {}).get("row_count", 0),
        "sanity_check_flags": all_sanity_flags,
        "errors": state.get("errors", []),
        "estimated_cost_usd": estimated_cost,
        "timestamp": datetime.utcnow().isoformat()
    }

    # High-severity path: log as warning if sanity flags present
    if all_sanity_flags or state.get("errors"):
        logger.warning("Agent chain completed with flags",
                       extra={"custom_dimensions": log_record})
    else:
        logger.info("Agent chain completed successfully",
                    extra={"custom_dimensions": log_record})

    return log_record


# Sanity checks applied at the orchestrator level
GLOBAL_SANITY_RULES = [
    # (field_path, operator, threshold, message)
    ("roas", ">", 50, "ROAS > 50 — verify revenue and spend data integrity"),
    ("conversion_rate", ">", 0.5, "Conversion rate > 50% — check event tracking"),
    ("cac", "<", 1.0, "CAC < $1 — verify spend and conversion data"),
    ("ctr", ">", 0.3, "CTR > 30% — unusually high, verify click data"),
    ("frequency", ">", 10, "Frequency > 10 — check campaign date range"),
]

def check_global_sanity(results: list[dict]) -> list[str]:
    """Run sanity rules against any result set."""
    flags = []
    for row in results:
        for field, op, threshold, message in GLOBAL_SANITY_RULES:
            value = row.get(field)
            if value is None:
                continue
            try:
                v = float(value)
                if op == ">" and v > threshold:
                    flags.append(f"Row {row.get('campaign_id','?')}: {message} (got {v})")
                elif op == "<" and v < threshold:
                    flags.append(f"Row {row.get('campaign_id','?')}: {message} (got {v})")
            except (TypeError, ValueError):
                pass
    return flags

// Observability.cs
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.DataContracts;
using System.Diagnostics;

public class AgentChainLogger
{
    private readonly TelemetryClient _telemetry;

    public AgentChainLogger(TelemetryClient telemetry)
    {
        _telemetry = telemetry;
    }

    public Dictionary<string, object> LogChain(
        AnalyticsState state, long executionMs)
    {
        var allFlags = new List<string>();
        if (state.DataAnalystOutput?.TryGetValue("sanity_flags", out var sf) == true)
            allFlags.AddRange((sf as List<string>) ?? new());
        if (state.AttributionOutput?.TryGetValue("attribution_flags", out var af) == true)
            allFlags.AddRange((af as List<string>) ?? new());

        var costByIntent = new Dictionary<string, double>
        {
            ["diagnostic"] = 0.090,
            ["reporting"] = 0.055,
            ["attribution"] = 0.050,
            ["campaign"] = 0.043
        };
        var estimatedCost = costByIntent.GetValueOrDefault(state.Intent, 0.055);

        var logRecord = new Dictionary<string, object>
        {
            ["session_id"] = state.SessionId,
            ["question"] = state.UserQuestion,
            ["intent"] = state.Intent,
            ["agents_executed"] = string.Join(",", state.AgentsRequired),
            ["execution_time_ms"] = executionMs,
            ["sql_query"] = state.DataAnalystOutput?.GetValueOrDefault("sql")?.ToString() ?? "",
            ["sanity_check_flags"] = string.Join("|", allFlags),
            ["errors"] = string.Join("|", state.Errors),
            ["estimated_cost_usd"] = estimatedCost
        };

        // Send to Application Insights as a custom event
        var evt = new EventTelemetry("AgentChainCompleted");
        foreach (var kv in logRecord)
            evt.Properties[kv.Key] = kv.Value?.ToString() ?? "";

        evt.Metrics["execution_time_ms"] = executionMs;
        evt.Metrics["estimated_cost_usd"] = estimatedCost;
        evt.Metrics["sanity_flag_count"] = allFlags.Count;

        _telemetry.TrackEvent(evt);

        if (allFlags.Count > 0 || state.Errors.Count > 0)
        {
            var trace = new TraceTelemetry(
                "Agent chain completed with sanity flags or errors",
                SeverityLevel.Warning);
            foreach (var kv in logRecord)
                trace.Properties[kv.Key] = kv.Value?.ToString() ?? "";
            _telemetry.TrackTrace(trace);
        }

        return logRecord;
    }
}

The hardest observability problem isn't catching outright errors — it's knowing when the LLM is confidently wrong within plausible ranges. A ROAS of 4.8x is plausible. A ROAS of 4.8x for a campaign that hasn't run in two weeks, because the SQL joined on the wrong date range, is wrong — and it looks normal. The sanity checks help at the extremes. The SQL logging helps when you need to audit a specific number. Neither fully solves the "plausibly wrong" case. The partial answer is stakeholder education: every report includes the SQL query and a note that numbers should be verified against platform dashboards for material budget decisions.

Python vs C#: The Honest Comparison

I built both implementations intentionally. The technology choice for a system like this is a real decision with real consequences, and I wanted to be able to compare them fairly rather than recommend whatever I'm most comfortable with.

Choose Python + LangGraph when:

  • Your team has a Python or data science background — the learning curve for LangGraph is lower than Semantic Kernel for people who already know Python
  • Rapid iteration is a priority — Python's interactive tooling (Jupyter, IPython) makes debugging agent chains faster
  • You need the attribution and segmentation math — numpy, scipy, and pandas are natural here. The Monte Carlo Shapley implementation is 40 lines in Python and would be materially more verbose in C# without external libraries
  • You want to integrate scikit-learn for cohort modeling or anomaly detection as a future extension

Choose C# + Semantic Kernel when:

  • You're in an enterprise .NET shop — using the same language as your backend eliminates the polyglot operations overhead
  • You want Azure-native deployment patterns — Semantic Kernel integrates naturally with Azure Functions and Durable Functions, which can simplify scheduled report generation
  • Strict typing and compile-time safety matter to your team — C#'s type system catches a category of agent state bugs that Python won't flag until runtime
  • Your team is materially stronger in C# than Python, and the productivity difference outweighs the library ecosystem gap

The Attribution Math Gap

The Shapley approximation required scipy's statistical functions in Python — 42 lines including the Monte Carlo sampling loop. In C# I implemented it from scratch using System.Random and manual Fisher-Yates shuffle — about 80 additional lines but fully readable with no external dependencies. Neither is wrong. If you're choosing C# for this system, budget the extra implementation time for the math-heavy components. The results are identical; it's purely a developer experience difference.

Semantic Kernel's step planner is more declarative than LangGraph's graph API — you describe what functions are available and let the planner decide the order, versus explicitly defining nodes and edges in a graph. For simple linear pipelines, the planner is cleaner. For the parallel execution patterns this orchestrator requires (running Data Analyst and Attribution simultaneously), LangGraph's explicit graph gives you more control and predictability. Neither framework is superior overall — they make different trade-offs, and the right choice depends on your routing complexity.

Azure Infrastructure

The services required and how they fit together:

  • Azure OpenAI — GPT-4o for Data Analyst and Attribution agents (precision tasks), GPT-4o-mini for Narrative and Segmentation interpretation (summarization tasks). Deploy in the same region as your other services to minimize cross-region latency.
  • Azure Database for PostgreSQL Flexible Server — the marketing data warehouse. The agent connection uses a read-only role. The ETL that populates the warehouse (pulling from Google Ads API, Meta Graph API, etc.) uses a separate write role. Keep these completely separate — the read-only role for agents cannot touch the ETL tables.
  • Azure Redis Cache — two purposes: session state (so a user can ask follow-up questions without re-running the full agent chain) and query result caching (weekly summary reports cached with a 4-hour TTL).
  • Azure Container Apps — hosts the FastAPI backend (Python) or ASP.NET Core backend (C#). Two container apps: one for the synchronous API, one background worker that runs scheduled weekly reports overnight.
  • Azure Monitor + Application Insights — agent chain traces, SQL query logging, sanity flag alerting. Set up an alert for any run where sanity_flag_count > 0 — these warrant human review.
  • Azure AI Foundry Agent Service — worth evaluating if you're building this at scale (500+ queries/day) or need managed orchestration without maintaining your own LangGraph/Semantic Kernel hosting. It adds deployment simplicity and built-in monitoring at the cost of some customization flexibility in routing logic.

The deployment pattern that works well: one Container App for the API (handles synchronous queries, streams SSE to the frontend), one Container App for the background worker (handles the nightly report job queue). Redis serves as both the session store and the job queue for the background worker. This keeps the synchronous path fast and isolates the scheduled report load from interactive query load.

When NOT to Build This

I've been direct about where this system adds value. I want to be equally direct about where it doesn't, because most articles about multi-agent AI skip this section entirely.

1. Your ad spend is under $20K/month. Shapley attribution's signal requires enough conversion events across enough touchpoint sequences to produce meaningful differences between models. Below $20K/month, you don't have the volume. The attribution comparison table will show differences between models, but those differences won't reach statistical significance — you'll be making budget decisions based on noise dressed up as insight. Use a spreadsheet.

2. Your team doesn't act on analytics. If this system surfaces a 40% budget misallocation and the decision doesn't change because it's politically uncomfortable, the system is expensive decoration. This isn't a technology failure — it's a prerequisite check. A multi-agent system can only help a team that will act on what the data shows.

3. Your data is dirty. Missing conversion events. Broken UTM tracking. Inconsistent campaign naming conventions across platforms. The NL→SQL agent will query whatever data exists and return confidently structured results. If the underlying data is wrong, the output is wrong — and it looks credible. Fix your data pipeline before you build the agents. This is the most common mistake I've seen teams make: building the AI layer on top of a broken data foundation.

4. You need real-time decisions. This system takes 15-30 seconds per query, depending on the execution pattern. If you need sub-second pricing decisions, real-time bid adjustments, or live audience optimization signals, you need a fundamentally different architecture. This system is for analytical queries, not real-time decisioning.

5. Your marketing team is one person. The ROI case for this system rests on analyst time saved across multiple stakeholders asking multiple questions. One person asking two questions a week doesn't justify a $380/month infrastructure investment plus the build cost. The economics only work when you have a team generating enough analytical questions to create a genuine bottleneck.

The Compounding Risk of Bad Data

Bad attribution data plus confident AI summaries equals worse decisions than no AI. This isn't theoretical — I've seen teams trust AI-generated reports that were confidently wrong because the underlying conversion data had gaps. The system synthesizes whatever data you give it. Garbage in, executive-ready garbage out. A system that produces a polished three-bullet executive summary of incorrect data is more dangerous than a broken dashboard, because the broken dashboard is obviously broken.

Key Takeaways

The multi-agent architecture is the right choice for marketing analytics because analytics questions naturally decompose into sequential and parallel analytical tasks: you can't write a narrative without query results; you can't diagnose campaign issues without spend metrics; you can't make attribution recommendations without touchpoint data. Single-responsibility agents make this decomposition explicit and keep failures isolated.

What I'd carry forward from this build:

  • Schema injection is the foundation of reliable NL→SQL. Every token you spend on schema context pays back in reduced SQL errors and fewer sanity check failures. Don't cut the schema to save tokens.
  • Shapley attribution is worth implementing even without the rest of this system. If you have conversion touchpoint data and you're making channel budget decisions based on last-click attribution, a standalone Shapley script will surface misallocations that last-click hides. The full agent system adds speed and accessibility — the attribution math stands alone.
  • The citation enforcement pattern in the Narrative Agent is the difference between a system that looks good in demos and one that's trustworthy in production. Constrain the narrative to cite only what's in the structured input.
  • Token costs are manageable with a model mix strategy. Use GPT-4o where precision matters, GPT-4o-mini where summarization is the primary task. Cache weekly reports. The economics work at $20K+ ad spend.
  • Choose Python for the math-heavy agents; C# if your team lives in .NET. The practical difference in this system is real but not fundamental — both reach the same outcome through different paths.

If you missed Part 1, it covers the architecture, the Data Analyst Agent (NL→SQL), the Attribution Agent (Shapley approximation), and the Campaign Diagnostics Agent: read Part 1 →

Want More Practical AI Tutorials?

I write about building production AI systems with Azure, Python, and C#. Subscribe for practical tutorials delivered twice a month.

Subscribe to Newsletter →