In this tutorial, we design an end-to-end, production-style analytics and modeling pipeline using Vaex to operate efficiently on millions of rows without materializing data in memory. We generate a realistic, large-scale dataset, engineer rich behavioral and city-level features using lazy expressions and approximate statistics, and aggregate insights at scale. We then integrate Vaex with scikit-learn to train and evaluate a predictive model, demonstrating how Vaex can act as the backbone for high-performance exploratory analysis and machine-learning workflows.
!pip -q install "vaex==4.19.0" "vaex-core==4.19.0" "vaex-ml==0.19.0" "vaex-viz==0.6.0" "vaex-hdf5==0.15.0" "pyarrow>=14" "scikit-learn>=1.3"
import os, time, json, numpy as np, pandas as pd
import vaex
import vaex.ml
from vaex.ml.sklearn import Predictor
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, average_precision_score
print("Python:", __import__("sys").version.split()[0])
print("vaex:", vaex.__version__)
print("numpy:", np.__version__)
print("pandas:", pd.__version__)
rng = np.random.default_rng(7)
n = 2_000_000
cities = np.array(["Montreal","Toronto","Vancouver","Calgary","Ottawa","Edmonton","Quebec City","Winnipeg"], dtype=object)
# Draw a city for every row, with fixed sampling probabilities (they sum to 1).
city = rng.choice(cities, size=n, replace=True, p=np.array([0.16,0.18,0.12,0.10,0.10,0.10,0.10,0.14]))
age = rng.integers(18, 75, size=n, endpoint=False).astype("int32")
tenure_m = rng.integers(0, 180, size=n, endpoint=False).astype("int32")
tx = rng.poisson(lam=22, size=n).astype("int32")
base_income = rng.lognormal(mean=10.6, sigma=0.45, size=n).astype("float64")
# Per-city income multiplier, looked up for each row.
city_mult = pd.Series({"Montreal":0.92,"Toronto":1.05,"Vancouver":1.10,"Calgary":1.02,"Ottawa":1.00,"Edmonton":0.98,"Quebec City":0.88,"Winnipeg":0.90}).reindex(city).to_numpy()
income = (base_income * city_mult * (1.0 + 0.004*(age-35)) * (1.0 + 0.0025*np.minimum(tenure_m,120))).astype("float64")
income = np.clip(income, 18_000, 420_000)
noise = rng.normal(0, 1, size=n).astype("float64")
# Latent score that drives the binary target.
score_latent = (
    0.55*np.log1p(income/1000.0)
    + 0.28*np.log1p(tx)
    + 0.18*np.sqrt(np.maximum(tenure_m,0)/12.0 + 1e-9)
    - 0.012*(age-40)
    + 0.22*(city == "Vancouver").astype("float64")
    + 0.15*(city == "Toronto").astype("float64")
    + 0.10*(city == "Ottawa").astype("float64")
    + 0.65*noise
)
p = 1.0/(1.0 + np.exp(-(score_latent - np.quantile(score_latent, 0.70))))
target = (rng.random(n) < p).astype("int8")
df = vaex.from_arrays(city=city, age=age, tenure_m=tenure_m, tx=tx, income=income, target=target)
# Virtual columns: these are lazy expressions, not materialized arrays.
df["income_k"] = df.income / 1000.0
df["tenure_y"] = df.tenure_m / 12.0
df["log_income"] = df.income.log1p()
df["tx_per_year"] = df.tx / (df.tenure_y + 0.25)
df["value_score"] = (0.35*df.log_income + 0.20*df.tx_per_year + 0.10*df.tenure_y - 0.015*df.age)
df["is_new"] = df.tenure_m < 6
df["is_senior"] = df.age >= 60
print("\nRows:", len(df), "Columns:", len(df.get_column_names()))
print(df[["city","age","tenure_m","tx","income","income_k","value_score","target"]].head(5))
We generate a large, realistic synthetic dataset and initialize a Vaex DataFrame to work lazily on millions of rows. We engineer core numerical features directly as expressions so no intermediate data is materialized. We validate the setup by inspecting the row count, the number of columns, and a small sample of computed values.
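To make the feature definitions concrete, the following sketch evaluates the same formulas for a single record in plain Python. It is an illustration only: the function name `engineer_features` and the sample values are hypothetical, and Vaex evaluates these expressions lazily over whole columns rather than row by row.

```python
import math

def engineer_features(age, tenure_m, tx, income):
    """Row-level mirror of the lazy Vaex expressions (illustrative only)."""
    income_k = income / 1000.0
    tenure_y = tenure_m / 12.0
    log_income = math.log1p(income)
    # The 0.25 offset keeps the ratio finite for customers with zero tenure.
    tx_per_year = tx / (tenure_y + 0.25)
    value_score = (0.35 * log_income + 0.20 * tx_per_year
                   + 0.10 * tenure_y - 0.015 * age)
    return {
        "income_k": income_k,
        "tenure_y": tenure_y,
        "log_income": log_income,
        "tx_per_year": tx_per_year,
        "value_score": value_score,
        "is_new": tenure_m < 6,
        "is_senior": age >= 60,
    }

# Hypothetical sample record, not taken from the generated dataset.
row = engineer_features(age=40, tenure_m=24, tx=20, income=60_000.0)
print(row)
```

Because every derived value depends only on the raw columns, the same definitions can be re-applied later to any dataset that carries `age`, `tenure_m`, `tx`, and `income`.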
encoder = vaex.ml.LabelEncoder(features=["city"])
df = encoder.fit_transform(df)
city_map = encoder.labels_["city"]
inv_city_map = {v:k for k,v in city_map.items()}
n_cities = len(city_map)
# One bin per integer city label: limits [-0.5, n_cities-0.5] with shape=n_cities.
p95_income_k_by_city = df.percentile_approx("income_k", 95, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
p50_value_by_city = df.percentile_approx("value_score", 50, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
avg_income_k_by_city = df.mean("income_k", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
target_rate_by_city = df.mean("target", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
n_by_city = df.count(binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
p95_income_k_by_city = np.asarray(p95_income_k_by_city).reshape(-1)
p50_value_by_city = np.asarray(p50_value_by_city).reshape(-1)
avg_income_k_by_city = np.asarray(avg_income_k_by_city).reshape(-1)
target_rate_by_city = np.asarray(target_rate_by_city).reshape(-1)
n_by_city = np.asarray(n_by_city).reshape(-1)
city_table = pd.DataFrame({
"city_id": np.arange(n_cities),
"city": [inv_city_map[i] for i in range(n_cities)],
"n": n_by_city.astype("int64"),
"avg_income_k": avg_income_k_by_city,
"p95_income_k": p95_income_k_by_city,
"median_value_score": p50_value_by_city,
"target_rate": target_rate_by_city
}).sort_values(["target_rate","p95_income_k"], ascending=False)
print("\nCity summary:")
print(city_table.to_string(index=False))
df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df = df.join(df_city_features, on="city", rsuffix="_city")
df["income_vs_city_p95"] = df.income_k / (df.p95_income_k + 1e-9)
df["value_vs_city_median"] = df.value_score - df.median_value_score
We encode categorical city data and compute scalable, approximate per-city statistics using binning-based operations. We assemble these aggregates into a city-level table and join them back to the main dataset. We then derive relative features that compare each record against its city context.
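The `limits=[-0.5, n_cities-0.5]` and `shape=n_cities` arguments are chosen so that each integer city label falls into its own bin. The sketch below reproduces that arithmetic in plain Python, assuming equal-width bins over the given limits. `bin_index` and `binned_mean` are hypothetical helpers written for this illustration, not Vaex functions, and the sample values are made up.

```python
import math

def bin_index(x, lo, hi, n_bins):
    """Index of the equal-width bin over [lo, hi) that contains x."""
    return math.floor((x - lo) / (hi - lo) * n_bins)

def binned_mean(labels, values, n_bins):
    """Mean of `values` per integer label, using one bin per label."""
    lo, hi = -0.5, n_bins - 0.5
    sums = [0.0] * n_bins
    counts = [0] * n_bins
    for label, value in zip(labels, values):
        b = bin_index(label, lo, hi, n_bins)
        sums[b] += value
        counts[b] += 1
    return [s / c if c else float("nan") for s, c in zip(sums, counts)]

# Hypothetical encoded city labels and incomes (in thousands).
labels = [0, 1, 1, 2, 2, 2]
income_k = [50.0, 60.0, 80.0, 30.0, 60.0, 90.0]
means = binned_mean(labels, income_k, n_bins=3)
print(means)
```

With bins of width 1 centered on each integer, label `i` always maps to bin `i`, which is why the binned results can be reshaped into one value per city.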
features_num = [
"age","tenure_y","tx","income_k","log_income","tx_per_year","value_score",
"p95_income_k","avg_income_k","median_value_score","target_rate",
"income_vs_city_p95","value_vs_city_median"
]
scaler = vaex.ml.StandardScaler(features=features_num, with_mean=True, with_std=True, prefix="z_")
df = scaler.fit_transform(df)
features = ["z_"+f for f in features_num] + ["label_encoded_city"]
df_train, df_test = df.split_random([0.80, 0.20], random_state=42)
model = LogisticRegression(max_iter=250, solver="lbfgs", n_jobs=None)
vaex_model = Predictor(model=model, features=features, target="target", prediction_name="pred")
t0 = time.time()
vaex_model.fit(df=df_train)
fit_s = time.time() - t0
df_test = vaex_model.transform(df_test)
y_true = df_test["target"].to_numpy()
y_pred = df_test["pred"].to_numpy()
auc = roc_auc_score(y_true, y_pred)
ap = average_precision_score(y_true, y_pred)
print("\nModel:")
print("fit_seconds:", round(fit_s, 3))
print("test_auc:", round(float(auc), 4))
print("test_avg_precision:", round(float(ap), 4))
We standardize all numeric features using Vaex's ML utilities and prepare a consistent feature vector for modeling. We split the dataset without loading the entire dataset into memory. We train a logistic regression model through Vaex's sklearn wrapper and evaluate its predictive performance.
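As a reminder of what the reported ROC AUC measures, the sketch below computes it directly as the fraction of positive/negative pairs that the score orders correctly, counting ties as half. `pairwise_auc` is a hypothetical helper for illustration and the four sample rows are made up. It is quadratic in the number of rows, so it only suits small samples; scikit-learn's `roc_auc_score` remains the tool to use at scale.

```python
def pairwise_auc(y_true, y_score):
    """ROC AUC as P(score of a positive > score of a negative), ties count 0.5."""
    pos = [s for y, s in zip(y_true, y_score) if y == 1]
    neg = [s for y, s in zip(y_true, y_score) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative")
    wins = 0.0
    for sp in pos:
        for sn in neg:
            if sp > sn:
                wins += 1.0
            elif sp == sn:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Hypothetical labels and scores.
auc_small = pairwise_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
print(auc_small)
```

Three of the four positive/negative pairs are ordered correctly here, so the value is 0.75. A score that ranks at random would sit near 0.5.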
deciles = np.linspace(0, 1, 11)
cuts = np.quantile(y_pred, deciles)
cuts[0] = -np.inf
cuts[-1] = np.inf
bucket = np.digitize(y_pred, cuts[1:-1], right=True).astype("int32")
df_test_local = vaex.from_arrays(y_true=y_true.astype("int8"), y_pred=y_pred.astype("float64"), bucket=bucket)
lift = df_test_local.groupby(by="bucket", agg={"n": vaex.agg.count(), "rate": vaex.agg.mean("y_true"), "avg_pred": vaex.agg.mean("y_pred")}).sort("bucket")
lift_pd = lift.to_pandas_df()
baseline = float(df_test_local["y_true"].mean())
lift_pd["lift"] = lift_pd["rate"] / (baseline + 1e-12)
print("\nDecile lift table:")
print(lift_pd.to_string(index=False))
We analyze model behavior by segmenting predictions into deciles and computing lift metrics. We calculate baseline rates and compare them across score buckets to assess ranking quality. We summarize the results in a compact lift table that reflects real-world model diagnostics.
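The lift calculation itself is simple enough to sketch in plain Python. The version below is a simplified variant, not the tutorial's exact method: it assigns buckets by sorted rank into equal-size groups instead of using quantile cut points, so rows with tied scores may be split differently. `lift_table` and the ten sample rows are hypothetical.

```python
def lift_table(y_true, y_score, n_buckets=10):
    """Per-bucket positive rate divided by the overall positive rate."""
    order = sorted(range(len(y_score)), key=lambda i: y_score[i])
    baseline = sum(y_true) / len(y_true)
    rows = []
    for b in range(n_buckets):
        lo = b * len(order) // n_buckets
        hi = (b + 1) * len(order) // n_buckets
        idx = order[lo:hi]
        if not idx:
            continue  # fewer rows than buckets
        rate = sum(y_true[i] for i in idx) / len(idx)
        lift = rate / baseline if baseline > 0 else float("nan")
        rows.append({"bucket": b, "n": len(idx), "rate": rate, "lift": lift})
    return rows

# Hypothetical scores: higher scores carry all of the positives.
y_score = [0.05, 0.10, 0.20, 0.30, 0.40, 0.50, 0.60, 0.70, 0.80, 0.90]
y_true = [0, 0, 0, 0, 0, 0, 0, 1, 1, 1]
table = lift_table(y_true, y_score, n_buckets=5)
for r in table:
    print(r)
```

A lift above 1 in the top buckets and below 1 in the bottom buckets indicates that the score ranks positives ahead of negatives.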
out_dir = "/content/vaex_artifacts"
os.makedirs(out_dir, exist_ok=True)
parquet_path = os.path.join(out_dir, "customers_vaex.parquet")
state_path = os.path.join(out_dir, "vaex_pipeline.json")
base_cols = ["city","label_encoded_city","age","tenure_m","tenure_y","tx","income","income_k","value_score","target"]
export_cols = base_cols + ["z_"+f for f in features_num]
df_export = df[export_cols].sample(n=500_000, random_state=1)
# Remove any stale file from a previous run before exporting.
if os.path.exists(parquet_path):
    os.remove(parquet_path)
df_export.export_parquet(parquet_path)
pipeline_state = {
"vaex_version": vaex.__version__,
"encoder_labels": {k: {str(kk): int(vv) for kk,vv in v.items()} for k,v in encoder.labels_.items()},
"scaler_mean": [float(x) for x in scaler.mean_],
"scaler_std": [float(x) for x in scaler.std_],
"features_num": features_num,
"export_cols": export_cols,
}
with open(state_path, "w") as f:
    json.dump(pipeline_state, f)
df_reopen = vaex.open(parquet_path)
df_reopen["income_k"] = df_reopen.income / 1000.0
df_reopen["tenure_y"] = df_reopen.tenure_m / 12.0
df_reopen["log_income"] = df_reopen.income.log1p()
df_reopen["tx_per_year"] = df_reopen.tx / (df_reopen.tenure_y + 0.25)
df_reopen["value_score"] = (0.35*df_reopen.log_income + 0.20*df_reopen.tx_per_year + 0.10*df_reopen.tenure_y - 0.015*df_reopen.age)
df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df_reopen = df_reopen.join(df_city_features, on="city", rsuffix="_city")
df_reopen["income_vs_city_p95"] = df_reopen.income_k / (df_reopen.p95_income_k + 1e-9)
df_reopen["value_vs_city_median"] = df_reopen.value_score - df_reopen.median_value_score
with open(state_path, "r") as f:
    st = json.load(f)
labels_city = {k: int(v) for k,v in st["encoder_labels"]["city"].items()}
# Cities missing from the saved mapping are encoded as -1.
df_reopen["label_encoded_city"] = df_reopen.city.map(labels_city, default_value=-1)
# Re-apply the saved standardization, guarding against a zero standard deviation.
for i, feat in enumerate(st["features_num"]):
    mean_i = st["scaler_mean"][i]
    std_i = st["scaler_std"][i] if st["scaler_std"][i] != 0 else 1.0
    df_reopen["z_"+feat] = (df_reopen[feat] - mean_i) / std_i
df_reopen = vaex_model.transform(df_reopen)
print("\nArtifacts written:")
print(parquet_path)
print(state_path)
print("\nReopened parquet predictions (head):")
print(df_reopen[["city","income_k","value_score","pred","target"]].head(8))
print("\nDone.")
We export a large, feature-complete sample to Parquet and persist the full preprocessing state for reproducibility. We reload the data and deterministically rebuild all engineered features from saved metadata. We run inference on the reloaded dataset to confirm that the pipeline remains stable and deployable end-to-end.
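The state round trip can be checked in isolation. The sketch below serializes a small preprocessing state to JSON, restores it, and re-applies the label encoding and z-scoring to individual records. It assumes the same state layout as the tutorial's `pipeline_state`, but the numbers, the reduced feature list, and the `apply_state` helper are hypothetical and chosen only to exercise the unseen-city default and the zero-standard-deviation guard.

```python
import json

# Hypothetical state with the same keys the pipeline persists.
state = {
    "encoder_labels": {"city": {"Montreal": 0, "Toronto": 1}},
    "features_num": ["age", "income_k"],
    "scaler_mean": [40.0, 60.0],
    "scaler_std": [10.0, 0.0],  # zero on purpose, to exercise the guard
}
restored = json.loads(json.dumps(state))

def apply_state(record, st):
    """Encode the city and standardize numeric features from saved state."""
    labels = st["encoder_labels"]["city"]
    out = {"label_encoded_city": labels.get(record["city"], -1)}
    for feat, mean, std in zip(st["features_num"], st["scaler_mean"], st["scaler_std"]):
        std = std if std != 0 else 1.0  # avoid division by zero
        out["z_" + feat] = (record[feat] - mean) / std
    return out

known = apply_state({"city": "Toronto", "age": 50, "income_k": 75.0}, restored)
unknown = apply_state({"city": "Halifax", "age": 30, "income_k": 60.0}, restored)
print(known)
print(unknown)
```

Keeping the feature order in `features_num` aligned with `scaler_mean` and `scaler_std` is what makes the positional lookup safe, so the three lists should always be written and read together.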
In conclusion, we demonstrated how Vaex enables fast, memory-efficient data processing while still supporting advanced feature engineering, aggregation, and model integration. We showed that approximate statistics, lazy computation, and out-of-core execution allow us to scale cleanly from analysis to deployment-ready artifacts. By exporting reproducible features and persisting the full pipeline state, we closed the loop from raw data to inference, illustrating how Vaex fits naturally into modern large-data Python workflows.
