Source code for causalboundingengine.algorithms.zhangbareinboim

from itertools import product
import numpy as np
import pandas as pd
import pulp
from causalboundingengine.algorithms.algorithm import Algorithm

[docs] class ZhangBareinboim(Algorithm): def _compute_ATE(self, X: np.ndarray, Y: np.ndarray, Z: np.ndarray) -> tuple[float, float]: # Compute P(Y | do(x=1)) b1 = ZhangBareinboim._bound_causal_effect(X, Y, Z, target_x=1) # Compute P(Y | do(x=0)) b0 = ZhangBareinboim._bound_causal_effect(X, Y, Z, target_x=0) bound_lower = b1["lower_bound"] - b0["upper_bound"] bound_upper = b1["upper_bound"] - b0["lower_bound"] return bound_lower, bound_upper @staticmethod def _bound_causal_effect(X, Y, Z, target_x): """ Solve the LP to find bounds on E[Y_target_x] (the expected outcome if X is set to target_x). Returns a dictionary with 'lower_bound' and 'upper_bound'. Throws an exception if the LPs are invalid or produce an error. """ df = ZhangBareinboim._prepare_data(X, Y, Z) Z_vals, X_vals, P_x_given_z, EY_given_xz = ZhangBareinboim._compute_observational_stats(df) compliance_types = ZhangBareinboim._enumerate_compliance_types(X_vals, Z_vals) # Define LP problems for min and max lp_min = pulp.LpProblem("Min_E[Y{}_bound]".format(target_x), pulp.LpMinimize) lp_max = pulp.LpProblem("Max_E[Y{}_bound]".format(target_x), pulp.LpMaximize) # Define LP variables: # P[c] for each compliance type c, and Q[c,x] for each type and each treatment value P_vars = {c: pulp.LpVariable(f"P_{c}", lowBound=0, upBound=1) for c in compliance_types} Q_vars = {(c, x): pulp.LpVariable(f"Q_{c}_{x}", lowBound=0, upBound=1) for c in compliance_types for x in X_vals} # Constraint: Q(c,x) <= P(c) for all c, x (ensures 0 <= E[Y_x|c] <= 1) for c in compliance_types: for x in X_vals: lp_min += Q_vars[(c, x)] <= P_vars[c] lp_max += Q_vars[(c, x)] <= P_vars[c] # Constraint: Sum_c P(c) = 1 (all compliance type probabilities sum to 1) lp_min += pulp.lpSum(P_vars[c] for c in compliance_types) == 1 lp_max += pulp.lpSum(P_vars[c] for c in compliance_types) == 1 # Constraints to match observed distribution and outcomes: for z in Z_vals: # index of z in the sorted Z_vals list (to pick the corresponding element of compliance type tuple) z_index = Z_vals.index(z) for x in X_vals: # All compliance types c where c(z_index) == x matching_types = [c for c in compliance_types if c[z_index] == x] # Match P(X=x | Z=z): lp_min += pulp.lpSum(P_vars[c] for c in matching_types) == P_x_given_z[(x, z)] lp_max += pulp.lpSum(P_vars[c] for c in matching_types) == P_x_given_z[(x, z)] # Match E[Y | X=x, Z=z] * P(X=x | Z=z): # The right-hand side is the observed joint proportion of (X=x, Z=z) times the average Y in that cell rhs = P_x_given_z[(x, z)] * EY_given_xz[(x, z)] lp_min += pulp.lpSum(Q_vars[(c, x)] for c in matching_types) == rhs lp_max += pulp.lpSum(Q_vars[(c, x)] for c in matching_types) == rhs # Objective: E[Y_target_x] = sum_c Q(c, target_x) lp_min += pulp.lpSum(Q_vars[(c, target_x)] for c in compliance_types) lp_max += pulp.lpSum(Q_vars[(c, target_x)] for c in compliance_types) # Solve the two LPs min_status = lp_min.solve(pulp.PULP_CBC_CMD(msg=False)) if pulp.LpStatus[lp_min.status] != "Optimal": raise Exception(f"LP for lower bound is invalid or infeasible: status={pulp.LpStatus[lp_min.status]}") lower = pulp.value(lp_min.objective) max_status = lp_max.solve(pulp.PULP_CBC_CMD(msg=False)) if pulp.LpStatus[lp_max.status] != "Optimal": raise Exception(f"LP for upper bound is invalid or infeasible: status={pulp.LpStatus[lp_max.status]}") upper = pulp.value(lp_max.objective) return {"lower_bound": lower, "upper_bound": upper} @staticmethod def _prepare_data(X, Y, Z): # Expand the series of arrays into a DataFrame (one row per observation) df = pd.DataFrame({ "Z": Z, "X": X, "Y": Y }) # Ensure any 0-dim numpy arrays are converted to scalars (just in case) for col in ["Z", "X", "Y"]: df[col] = df[col].apply( lambda v: v.item() if isinstance(v, np.ndarray) and v.shape == () else v ) return df @staticmethod def _compute_observational_stats(df): """ Compute the empirical P(X=x | Z=z) and E[Y | X=x, Z=z] from the data frame. Returns: Z_vals: sorted list of unique Z values X_vals: sorted list of unique X values P_x_given_z: dict mapping (x,z) -> probability P(X=x | Z=z) EY_given_xz: dict mapping (x,z) -> conditional mean E[Y | X=x, Z=z] """ Z_vals = sorted(df["Z"].unique().tolist()) X_vals = sorted(df["X"].unique().tolist()) P_x_given_z = {} EY_given_xz = {} for z in Z_vals: df_z = df[df["Z"] == z] n_z = len(df_z) for x in X_vals: df_xz = df_z[df_z["X"] == x] n_xz = len(df_xz) # P(X=x | Z=z) = count(X=x,Z=z) / count(Z=z) P_x_given_z[(x, z)] = (n_xz / n_z) if n_z > 0 else 0.0 # E[Y | X=x, Z=z] = average Y in the subset where X=x, Z=z EY_given_xz[(x, z)] = df_xz["Y"].mean() if n_xz > 0 else 0.0 return Z_vals, X_vals, P_x_given_z, EY_given_xz @staticmethod def _enumerate_compliance_types(X_vals, Z_vals): """ Enumerate all possible compliance types as tuples. Each compliance type is a |Z_vals|-length tuple (t_z1, t_z2, ..., t_zm) representing the treatment X that would be taken for each value of Z. """ return [tuple(pattern) for pattern in product(X_vals, repeat=len(Z_vals))]