From a3db706efb7aff583131c46da941ea931dcc915c Mon Sep 17 00:00:00 2001
From: Alexander Korotkov
Date: Tue, 12 May 2026 17:44:12 +0300
Subject: [PATCH 1/2] tpcc: add PL/pgSQL stored-procedure mode (--stored-procs)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When --stored-procs is set on `go-tpc tpcc run` (postgres driver only),
each of the five TPC-C transactions is dispatched as a single server-side
CALL: tpcc_new_order / tpcc_payment / tpcc_order_status / tpcc_delivery /
tpcc_stock_level. Procedures are installed via CREATE OR REPLACE on the
first Run() invocation (sync.Once), so the same dataset can be benched in
either mode — no separate prepare step. NEW_ORDER's 1%-rollback path
(sentinel item id of -1) is handled server-side via an in-band ROLLBACK
inside the procedure: the engine pays for the district/orders/new_order
writes and then undoes them, matching the original Go transaction's
behaviour exactly. Other procedures run their work and return; the
autocommit transaction commits implicitly.
--- cmd/go-tpc/tpcc.go | 1 + tpcc/delivery.go | 18 ++ tpcc/new_order.go | 69 +++++++ tpcc/order_status.go | 36 ++++ tpcc/payment.go | 51 +++++ tpcc/procs_pg.go | 444 +++++++++++++++++++++++++++++++++++++++++++ tpcc/stock_level.go | 20 ++ tpcc/workload.go | 42 ++++ 8 files changed, 681 insertions(+) create mode 100644 tpcc/procs_pg.go diff --git a/cmd/go-tpc/tpcc.go b/cmd/go-tpc/tpcc.go index 4dd5d80b..fb6a8909 100644 --- a/cmd/go-tpc/tpcc.go +++ b/cmd/go-tpc/tpcc.go @@ -125,6 +125,7 @@ func registerTpcc(root *cobra.Command) { cmdRun.PersistentFlags().DurationVar(&tpccConfig.MaxMeasureLatency, "max-measure-latency", measurement.DefaultMaxLatency, "max measure latency in millisecond") cmdRun.PersistentFlags().IntSliceVar(&tpccConfig.Weight, "weight", []int{45, 43, 4, 4, 4}, "Weight for NewOrder, Payment, OrderStatus, Delivery, StockLevel") cmdRun.Flags().DurationVar(&tpccConfig.ConnRefreshInterval, "conn-refresh-interval", 0, "automatically refresh database connections at specified intervals to balance traffic across new replicas (0 = disabled, e.g., 10s)") + cmdRun.PersistentFlags().BoolVar(&tpccConfig.StoredProcs, "stored-procs", false, "Use PL/pgSQL stored procedures for the 5 TPC-C transactions (postgres driver only)") var cmdCleanup = &cobra.Command{ Use: "cleanup", diff --git a/tpcc/delivery.go b/tpcc/delivery.go index 43cec4c2..ff3febb6 100644 --- a/tpcc/delivery.go +++ b/tpcc/delivery.go @@ -34,6 +34,10 @@ const ( ) func (w *Workloader) runDelivery(ctx context.Context, thread int) error { + if w.cfg.StoredProcs { + return w.runDeliveryProc(ctx, thread) + } + s := getTPCCState(ctx) d := deliveryData{ @@ -171,3 +175,17 @@ func (w *Workloader) runDelivery(ctx context.Context, thread int) error { } return tx.Commit() } + +// runDeliveryProc dispatches DELIVERY as `CALL tpcc_delivery(...)`. 
+func (w *Workloader) runDeliveryProc(ctx context.Context, thread int) error { + s := getTPCCState(ctx) + wID := randInt(s.R, 1, w.cfg.Warehouses) + oCarrierID := randInt(s.R, 1, 10) + + stmt := s.procStmts[tpccCallDelivery] + _, err := stmt.ExecContext(ctx, wID, oCarrierID, time.Now().Format(timeFormat)) + if err != nil { + return fmt.Errorf("CALL tpcc_delivery failed: %v", err) + } + return nil +} diff --git a/tpcc/new_order.go b/tpcc/new_order.go index 39d7aa01..5117e7a3 100644 --- a/tpcc/new_order.go +++ b/tpcc/new_order.go @@ -6,6 +6,8 @@ import ( "database/sql" "fmt" "time" + + "github.com/lib/pq" ) const ( @@ -118,6 +120,10 @@ type newOrderData struct { } func (w *Workloader) runNewOrder(ctx context.Context, thread int) error { + if w.cfg.StoredProcs { + return w.runNewOrderProc(ctx, thread) + } + s := getTPCCState(ctx) // refer 2.4.1 @@ -309,3 +315,66 @@ func (w *Workloader) runNewOrder(ctx context.Context, thread int) error { } return tx.Commit() } + +// runNewOrderProc is the stored-procedure-mode variant of runNewOrder. It +// generates the same random inputs but dispatches the whole transaction as +// a single `CALL tpcc_new_order(...)`. The procedure handles the +// 1%-rollback case internally via in-band ROLLBACK, so the caller just +// runs the CALL outside of an explicit transaction (autocommit) and any +// failure surfaces as a normal SQL error. 
+func (w *Workloader) runNewOrderProc(ctx context.Context, thread int) error { + s := getTPCCState(ctx) + + d := newOrderData{ + wID: randInt(s.R, 1, w.cfg.Warehouses), + dID: randInt(s.R, 1, districtPerWarehouse), + cID: randCustomerID(s.R), + oOlCnt: randInt(s.R, 5, 15), + } + rbk := randInt(s.R, 1, 100) + allLocal := 1 + + supplyW := make([]int32, d.oOlCnt) + itemIDs := make([]int32, d.oOlCnt) + quantities := make([]int32, d.oOlCnt) + + seen := make(map[int]struct{}, d.oOlCnt) + for i := 0; i < d.oOlCnt; i++ { + var iID int + if i == d.oOlCnt-1 && rbk == 1 { + iID = -1 + } else { + for { + id := randItemID(s.R) + if _, ok := seen[id]; ok { + continue + } + seen[id] = struct{}{} + iID = id + break + } + } + itemIDs[i] = int32(iID) + + var supply int + if w.cfg.Warehouses == 1 || randInt(s.R, 1, 100) != 1 { + supply = d.wID + } else { + supply = w.otherWarehouse(ctx, d.wID) + allLocal = 0 + } + supplyW[i] = int32(supply) + quantities[i] = int32(randInt(s.R, 1, 10)) + } + + stmt := s.procStmts[tpccCallNewOrder] + _, err := stmt.ExecContext(ctx, + d.wID, d.dID, d.cID, d.oOlCnt, + pq.Array(supplyW), pq.Array(itemIDs), pq.Array(quantities), + time.Now().Format(timeFormat), allLocal, + ) + if err != nil { + return fmt.Errorf("CALL tpcc_new_order failed: %v", err) + } + return nil +} diff --git a/tpcc/order_status.go b/tpcc/order_status.go index d523d406..055fca67 100644 --- a/tpcc/order_status.go +++ b/tpcc/order_status.go @@ -30,6 +30,10 @@ type orderStatusData struct { } func (w *Workloader) runOrderStatus(ctx context.Context, thread int) error { + if w.cfg.StoredProcs { + return w.runOrderStatusProc(ctx, thread) + } + s := getTPCCState(ctx) d := orderStatusData{ wID: randInt(s.R, 1, w.cfg.Warehouses), @@ -116,3 +120,35 @@ func (w *Workloader) runOrderStatus(ctx context.Context, thread int) error { return tx.Commit() } + +// runOrderStatusProc dispatches ORDER_STATUS as `CALL tpcc_order_status(...)`. 
+func (w *Workloader) runOrderStatusProc(ctx context.Context, thread int) error { + s := getTPCCState(ctx) + d := orderStatusData{ + wID: randInt(s.R, 1, w.cfg.Warehouses), + dID: randInt(s.R, 1, districtPerWarehouse), + } + + if s.R.Intn(100) < 60 { + d.cLast = randCLast(s.R, s.Buf) + } else { + d.cID = randCustomerID(s.R) + } + + var cIDArg interface{} + var cLastArg interface{} + if d.cID == 0 { + cIDArg = nil + cLastArg = d.cLast + } else { + cIDArg = d.cID + cLastArg = nil + } + + stmt := s.procStmts[tpccCallOrderStatus] + _, err := stmt.ExecContext(ctx, d.wID, d.dID, cIDArg, cLastArg) + if err != nil { + return fmt.Errorf("CALL tpcc_order_status failed: %v", err) + } + return nil +} diff --git a/tpcc/payment.go b/tpcc/payment.go index 5767ab69..4bfe29cd 100644 --- a/tpcc/payment.go +++ b/tpcc/payment.go @@ -64,6 +64,10 @@ type paymentData struct { } func (w *Workloader) runPayment(ctx context.Context, thread int) error { + if w.cfg.StoredProcs { + return w.runPaymentProc(ctx, thread) + } + s := getTPCCState(ctx) d := paymentData{ @@ -180,3 +184,50 @@ func (w *Workloader) runPayment(ctx context.Context, thread int) error { return tx.Commit() } + +// runPaymentProc dispatches PAYMENT as `CALL tpcc_payment(...)`. By-last-name +// vs by-id lookup is selected by passing one of c_id / c_last as NULL. +func (w *Workloader) runPaymentProc(ctx context.Context, thread int) error { + s := getTPCCState(ctx) + + d := paymentData{ + wID: randInt(s.R, 1, w.cfg.Warehouses), + dID: randInt(s.R, 1, districtPerWarehouse), + hAmount: float64(randInt(s.R, 100, 500000)) / float64(100.0), + } + + if s.R.Intn(100) < 60 { + d.cLast = randCLast(s.R, s.Buf) + } else { + d.cID = randCustomerID(s.R) + } + + if w.cfg.Warehouses == 1 || s.R.Intn(100) < 85 { + d.cWID = d.wID + d.cDID = d.dID + } else { + d.cWID = w.otherWarehouse(ctx, d.wID) + d.cDID = randInt(s.R, 1, districtPerWarehouse) + } + + // Pass NULL for whichever lookup key wasn't picked. 
+ var cIDArg interface{} + var cLastArg interface{} + if d.cID == 0 { + cIDArg = nil + cLastArg = d.cLast + } else { + cIDArg = d.cID + cLastArg = nil + } + + stmt := s.procStmts[tpccCallPayment] + _, err := stmt.ExecContext(ctx, + d.wID, d.dID, d.cWID, d.cDID, d.hAmount, + cIDArg, cLastArg, time.Now().Format(timeFormat), + ) + if err != nil { + return fmt.Errorf("CALL tpcc_payment failed: %v", err) + } + return nil +} diff --git a/tpcc/procs_pg.go b/tpcc/procs_pg.go new file mode 100644 index 00000000..195b0819 --- /dev/null +++ b/tpcc/procs_pg.go @@ -0,0 +1,444 @@ +package tpcc + +import ( + "context" + "database/sql" + "fmt" +) + +// procs_pg.go — PostgreSQL stored-procedure mode for TPC-C. +// +// When --stored-procs is set (postgres driver only), each TPC-C transaction is +// implemented as a PL/pgSQL PROCEDURE installed via CREATE OR REPLACE at the +// start of Run(). Workers then invoke them with `CALL tpcc_xxx(...)` instead of +// running the multi-statement transaction client-side. That eliminates ~10 +// round-trips per NEW_ORDER and is what HammerDB / DBT-2 call "stored +// procedure mode". +// +// Procedures are installed declaratively (CREATE OR REPLACE) — no Prepare +// step is required and the same dataset can be benchmarked in either mode. +// +// PROCEDURE (not FUNCTION) is used everywhere so NEW_ORDER's 1%-rollback +// case can issue an in-band ROLLBACK, matching the original Go behaviour +// where district/orders/new_order writes are done and then undone. 
+ +// --------------------------------------------------------------------------- +// CALL strings used by the per-transaction stored-proc wrappers +// --------------------------------------------------------------------------- + +const ( + tpccCallNewOrder = `CALL tpcc_new_order($1,$2,$3,$4,$5,$6,$7,$8,$9)` + tpccCallPayment = `CALL tpcc_payment($1,$2,$3,$4,$5,$6,$7,$8)` + tpccCallOrderStatus = `CALL tpcc_order_status($1,$2,$3,$4)` + tpccCallDelivery = `CALL tpcc_delivery($1,$2,$3)` + tpccCallStockLevel = `CALL tpcc_stock_level($1,$2,$3)` +) + +// --------------------------------------------------------------------------- +// PL/pgSQL bodies +// --------------------------------------------------------------------------- + +// tpcc_new_order(p_w_id, p_d_id, p_c_id, p_ol_cnt, +// p_ol_supply_w_id INT[], p_ol_i_id INT[], p_ol_quantity INT[], +// p_entry_d TIMESTAMP, p_all_local INT) +// +// 1% of calls intentionally pass an item id of -1 in the last position to +// drive the spec-mandated rollback path; the procedure performs the +// district/orders/new_order writes first (so the engine pays for them), +// then issues ROLLBACK and returns normally — caller sees success. 
+const procNewOrder = ` +CREATE OR REPLACE PROCEDURE tpcc_new_order( + p_w_id INT, + p_d_id INT, + p_c_id INT, + p_ol_cnt INT, + p_ol_supply_w_id INT[], + p_ol_i_id INT[], + p_ol_quantity INT[], + p_entry_d TIMESTAMP, + p_all_local INT +) LANGUAGE plpgsql AS $$ +DECLARE + v_c_discount NUMERIC; + v_w_tax NUMERIC; + v_d_tax NUMERIC; + v_d_next_o_id INT; + v_o_id INT; + + v_prices NUMERIC[] := ARRAY[]::NUMERIC[]; + v_rollback BOOLEAN := FALSE; + + v_s_quantity INT; + v_s_data VARCHAR; + v_s_dist CHAR(24); + v_remote_cnt INT; + v_amount NUMERIC; + v_i_price NUMERIC; + v_i_name VARCHAR; + v_i_data VARCHAR; + i INT; +BEGIN + SELECT c_discount, w_tax + INTO v_c_discount, v_w_tax + FROM customer, warehouse + WHERE w_id = p_w_id + AND c_w_id = w_id + AND c_d_id = p_d_id + AND c_id = p_c_id; + + SELECT d_next_o_id, d_tax + INTO v_d_next_o_id, v_d_tax + FROM district + WHERE d_id = p_d_id AND d_w_id = p_w_id + FOR UPDATE; + + UPDATE district + SET d_next_o_id = v_d_next_o_id + 1 + WHERE d_id = p_d_id AND d_w_id = p_w_id; + + v_o_id := v_d_next_o_id; + + INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d, o_ol_cnt, o_all_local) + VALUES (v_o_id, p_d_id, p_w_id, p_c_id, p_entry_d, p_ol_cnt, p_all_local); + + INSERT INTO new_order (no_o_id, no_d_id, no_w_id) + VALUES (v_o_id, p_d_id, p_w_id); + + -- First pass over items: lookup prices, detect intentional rollback. + -- An i_id of -1 marks the 1%-rollback case; any other unknown id is a + -- real error and aborts the procedure with an exception. 
+ FOR i IN 1..p_ol_cnt LOOP + IF p_ol_i_id[i] = -1 THEN + v_rollback := TRUE; + v_prices := array_append(v_prices, 0::NUMERIC); + CONTINUE; + END IF; + + SELECT i_price, i_name, i_data + INTO v_i_price, v_i_name, v_i_data + FROM item + WHERE i_id = p_ol_i_id[i]; + IF NOT FOUND THEN + RAISE EXCEPTION 'item % not found', p_ol_i_id[i]; + END IF; + v_prices := array_append(v_prices, v_i_price); + END LOOP; + + IF v_rollback THEN + ROLLBACK; + RETURN; + END IF; + + -- Second pass: stock lookup, stock update, order_line insert. + FOR i IN 1..p_ol_cnt LOOP + -- District-specific stock distribution column (s_dist_01 .. s_dist_10). + EXECUTE format( + 'SELECT s_quantity, s_data, s_dist_%s FROM stock ' + 'WHERE s_w_id = $1 AND s_i_id = $2 FOR UPDATE', + to_char(p_d_id, 'FM00') + ) + INTO v_s_quantity, v_s_data, v_s_dist + USING p_w_id, p_ol_i_id[i]; + + v_s_quantity := v_s_quantity - p_ol_quantity[i]; + IF v_s_quantity < 10 THEN + v_s_quantity := v_s_quantity + 91; + END IF; + + IF p_ol_supply_w_id[i] <> p_w_id THEN + v_remote_cnt := 1; + ELSE + v_remote_cnt := 0; + END IF; + + UPDATE stock + SET s_quantity = v_s_quantity, + s_ytd = s_ytd + p_ol_quantity[i], + s_order_cnt = s_order_cnt + 1, + s_remote_cnt = s_remote_cnt + v_remote_cnt + WHERE s_i_id = p_ol_i_id[i] AND s_w_id = p_w_id; + + v_amount := p_ol_quantity[i]::NUMERIC * v_prices[i] + * (1 + v_w_tax + v_d_tax) * (1 - v_c_discount); + + INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, + ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info) + VALUES (v_o_id, p_d_id, p_w_id, i, p_ol_i_id[i], + p_ol_supply_w_id[i], p_ol_quantity[i], v_amount, v_s_dist); + END LOOP; +END; +$$; +` + +// tpcc_payment(p_w_id, p_d_id, p_c_w_id, p_c_d_id, p_h_amount, +// p_c_id INT (NULL → search by c_last), p_c_last VARCHAR (or NULL), +// p_h_date TIMESTAMP) +const procPayment = ` +CREATE OR REPLACE PROCEDURE tpcc_payment( + p_w_id INT, + p_d_id INT, + p_c_w_id INT, + p_c_d_id INT, + p_h_amount NUMERIC, + p_c_id INT, + 
p_c_last VARCHAR, + p_h_date TIMESTAMP +) LANGUAGE plpgsql AS $$ +DECLARE + v_w_name VARCHAR; + v_d_name VARCHAR; + v_c_id INT; + v_c_first VARCHAR; + v_c_middle VARCHAR; + v_c_last VARCHAR; + v_c_credit CHAR(2); + v_c_balance NUMERIC; + v_c_data VARCHAR; + v_new_data VARCHAR; + v_h_data VARCHAR; + v_namecnt INT; + v_match_pos INT; +BEGIN + UPDATE district SET d_ytd = d_ytd + p_h_amount + WHERE d_w_id = p_w_id AND d_id = p_d_id; + SELECT d_name INTO v_d_name FROM district + WHERE d_w_id = p_w_id AND d_id = p_d_id; + + UPDATE warehouse SET w_ytd = w_ytd + p_h_amount + WHERE w_id = p_w_id; + SELECT w_name INTO v_w_name FROM warehouse + WHERE w_id = p_w_id; + + IF p_c_id IS NULL THEN + SELECT COUNT(c_id) INTO v_namecnt FROM customer + WHERE c_w_id = p_c_w_id AND c_d_id = p_c_d_id AND c_last = p_c_last; + IF v_namecnt = 0 THEN + RAISE EXCEPTION 'customer not found for last=%', p_c_last; + END IF; + IF v_namecnt % 2 = 1 THEN + v_namecnt := v_namecnt + 1; + END IF; + v_match_pos := v_namecnt / 2; + + SELECT c_id INTO v_c_id FROM ( + SELECT c_id, ROW_NUMBER() OVER (ORDER BY c_first) AS rn + FROM customer + WHERE c_w_id = p_c_w_id AND c_d_id = p_c_d_id AND c_last = p_c_last + ) sub WHERE sub.rn = v_match_pos; + ELSE + v_c_id := p_c_id; + END IF; + + SELECT c_first, c_middle, c_last, c_credit, c_balance + INTO v_c_first, v_c_middle, v_c_last, v_c_credit, v_c_balance + FROM customer + WHERE c_w_id = p_c_w_id AND c_d_id = p_c_d_id AND c_id = v_c_id + FOR UPDATE; + + IF v_c_credit = 'BC' THEN + SELECT c_data INTO v_c_data FROM customer + WHERE c_w_id = p_c_w_id AND c_d_id = p_c_d_id AND c_id = v_c_id; + + v_new_data := format('| %s %s %s %s %s $%s %s %s', + v_c_id, p_c_d_id, p_c_w_id, p_d_id, p_w_id, + p_h_amount, + to_char(p_h_date, 'YYYY-MM-DD HH24:MI:SS'), + v_c_data); + IF length(v_new_data) > 500 THEN + v_new_data := substr(v_new_data, 1, 500); + END IF; + + UPDATE customer + SET c_balance = c_balance - p_h_amount, + c_ytd_payment = c_ytd_payment + p_h_amount, + 
c_payment_cnt = c_payment_cnt + 1, + c_data = v_new_data + WHERE c_w_id = p_c_w_id AND c_d_id = p_c_d_id AND c_id = v_c_id; + ELSE + UPDATE customer + SET c_balance = c_balance - p_h_amount, + c_ytd_payment = c_ytd_payment + p_h_amount, + c_payment_cnt = c_payment_cnt + 1 + WHERE c_w_id = p_c_w_id AND c_d_id = p_c_d_id AND c_id = v_c_id; + END IF; + + v_h_data := substr(v_w_name || ' ' || v_d_name, 1, 24); + INSERT INTO history (h_c_d_id, h_c_w_id, h_c_id, h_d_id, h_w_id, + h_date, h_amount, h_data) + VALUES (p_c_d_id, p_c_w_id, v_c_id, p_d_id, p_w_id, + p_h_date, p_h_amount, v_h_data); +END; +$$; +` + +// tpcc_order_status(p_w_id, p_d_id, p_c_id INT (NULL → by-name), p_c_last VARCHAR) +// +// Read-only. Mirrors the Go side: count by last name, pick the (cnt/2)th +// by c_first; or look up by c_id directly. Then latest order and its +// order_line rows are read (results discarded — only the engine work matters). +const procOrderStatus = ` +CREATE OR REPLACE PROCEDURE tpcc_order_status( + p_w_id INT, + p_d_id INT, + p_c_id INT, + p_c_last VARCHAR +) LANGUAGE plpgsql AS $$ +DECLARE + v_c_id INT; + v_c_balance NUMERIC; + v_c_first VARCHAR; + v_c_middle VARCHAR; + v_c_last VARCHAR; + v_o_id INT; + v_o_entry_d TIMESTAMP; + v_o_carrier_id INT; + v_namecnt INT; + v_match_pos INT; + r RECORD; +BEGIN + IF p_c_id IS NULL THEN + SELECT COUNT(c_id) INTO v_namecnt FROM customer + WHERE c_w_id = p_w_id AND c_d_id = p_d_id AND c_last = p_c_last; + IF v_namecnt = 0 THEN + RAISE EXCEPTION 'customer not found for last=%', p_c_last; + END IF; + IF v_namecnt % 2 = 1 THEN + v_namecnt := v_namecnt + 1; + END IF; + v_match_pos := v_namecnt / 2; + + SELECT sub.c_balance, sub.c_first, sub.c_middle, sub.c_id + INTO v_c_balance, v_c_first, v_c_middle, v_c_id + FROM ( + SELECT c_balance, c_first, c_middle, c_id, + ROW_NUMBER() OVER (ORDER BY c_first) AS rn + FROM customer + WHERE c_w_id = p_w_id AND c_d_id = p_d_id AND c_last = p_c_last + ) sub WHERE sub.rn = v_match_pos; + ELSE + SELECT 
c_balance, c_first, c_middle, c_last + INTO v_c_balance, v_c_first, v_c_middle, v_c_last + FROM customer + WHERE c_w_id = p_w_id AND c_d_id = p_d_id AND c_id = p_c_id; + v_c_id := p_c_id; + END IF; + + SELECT o_id, o_carrier_id, o_entry_d + INTO v_o_id, v_o_carrier_id, v_o_entry_d + FROM orders + WHERE o_w_id = p_w_id AND o_d_id = p_d_id AND o_c_id = v_c_id + ORDER BY o_id DESC LIMIT 1; + + -- Drain order_line rows for the latest order so the engine pays the + -- same scan cost as the statement-mode version. + FOR r IN + SELECT ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_delivery_d + FROM order_line + WHERE ol_w_id = p_w_id AND ol_d_id = p_d_id AND ol_o_id = v_o_id + LOOP + NULL; + END LOOP; +END; +$$; +` + +// tpcc_delivery(p_w_id, p_o_carrier_id, p_ol_delivery_d TIMESTAMP) +// +// For each of 10 districts in p_w_id: pick the oldest new_order row, +// delete it, set the order's carrier_id, mark each order_line as delivered +// with p_ol_delivery_d, sum order_line amounts and credit customer balance. 
+const procDelivery = ` +CREATE OR REPLACE PROCEDURE tpcc_delivery( + p_w_id INT, + p_o_carrier_id INT, + p_ol_delivery_d TIMESTAMP +) LANGUAGE plpgsql AS $$ +DECLARE + d_id INT; + v_o_id INT; + v_c_id INT; + v_amount NUMERIC; +BEGIN + FOR d_id IN 1..10 LOOP + SELECT no_o_id INTO v_o_id + FROM new_order + WHERE no_w_id = p_w_id AND no_d_id = d_id + ORDER BY no_o_id ASC LIMIT 1 + FOR UPDATE; + IF NOT FOUND THEN + CONTINUE; + END IF; + + DELETE FROM new_order + WHERE no_w_id = p_w_id AND no_d_id = d_id AND no_o_id = v_o_id; + + UPDATE orders SET o_carrier_id = p_o_carrier_id + WHERE o_w_id = p_w_id AND o_d_id = d_id AND o_id = v_o_id + RETURNING o_c_id INTO v_c_id; + + UPDATE order_line SET ol_delivery_d = p_ol_delivery_d + WHERE ol_w_id = p_w_id AND ol_d_id = d_id AND ol_o_id = v_o_id; + + SELECT SUM(ol_amount) INTO v_amount + FROM order_line + WHERE ol_w_id = p_w_id AND ol_d_id = d_id AND ol_o_id = v_o_id; + + UPDATE customer + SET c_balance = c_balance + v_amount, + c_delivery_cnt = c_delivery_cnt + 1 + WHERE c_w_id = p_w_id AND c_d_id = d_id AND c_id = v_c_id; + END LOOP; +END; +$$; +` + +// tpcc_stock_level(p_w_id, p_d_id, p_threshold) +// +// Pure read. Number of distinct s_i_id with low stock among the last 20 +// order_line rows of the district. +const procStockLevel = ` +CREATE OR REPLACE PROCEDURE tpcc_stock_level( + p_w_id INT, + p_d_id INT, + p_threshold INT +) LANGUAGE plpgsql AS $$ +DECLARE + v_o_id INT; + v_stock_cnt INT; +BEGIN + SELECT d_next_o_id INTO v_o_id + FROM district WHERE d_w_id = p_w_id AND d_id = p_d_id; + + SELECT COUNT(DISTINCT s_i_id) INTO v_stock_cnt + FROM order_line, stock + WHERE ol_w_id = p_w_id + AND ol_d_id = p_d_id + AND ol_o_id < v_o_id + AND ol_o_id >= v_o_id - 20 + AND s_w_id = p_w_id + AND s_i_id = ol_i_id + AND s_quantity < p_threshold; +END; +$$; +` + +// allStoredProcs is consumed by installStoredProcsPG: each element is a +// standalone CREATE OR REPLACE PROCEDURE statement. 
+var allStoredProcs = []string{ + procNewOrder, + procPayment, + procOrderStatus, + procDelivery, + procStockLevel, +} + +// installStoredProcsPG runs all the CREATE OR REPLACE PROCEDURE statements. +// Safe to invoke repeatedly — every one of them is idempotent. +func installStoredProcsPG(ctx context.Context, conn *sql.Conn) error { + for _, ddl := range allStoredProcs { + if _, err := conn.ExecContext(ctx, ddl); err != nil { + return fmt.Errorf("install stored procedure failed: %w\nDDL:\n%s", err, ddl) + } + } + return nil +} diff --git a/tpcc/stock_level.go b/tpcc/stock_level.go index 7a13a4a0..e6b44de2 100644 --- a/tpcc/stock_level.go +++ b/tpcc/stock_level.go @@ -2,6 +2,7 @@ package tpcc import ( "context" + "fmt" ) const stockLevelCount = `SELECT /*+ TIDB_INLJ(order_line,stock) */ COUNT(DISTINCT (s_i_id)) stock_count FROM order_line, stock @@ -9,6 +10,10 @@ WHERE ol_w_id = ? AND ol_d_id = ? AND ol_o_id < ? AND ol_o_id >= ? - 20 AND s_w_ const stockLevelSelectDistrict = `SELECT d_next_o_id FROM district WHERE d_w_id = ? AND d_id = ?` func (w *Workloader) runStockLevel(ctx context.Context, thread int) error { + if w.cfg.StoredProcs { + return w.runStockLevelProc(ctx, thread) + } + s := getTPCCState(ctx) tx, err := w.beginTx(ctx) @@ -38,3 +43,18 @@ func (w *Workloader) runStockLevel(ctx context.Context, thread int) error { return tx.Commit() } + +// runStockLevelProc dispatches STOCK_LEVEL as `CALL tpcc_stock_level(...)`. 
+func (w *Workloader) runStockLevelProc(ctx context.Context, thread int) error { + s := getTPCCState(ctx) + wID := randInt(s.R, 1, w.cfg.Warehouses) + dID := randInt(s.R, 1, 10) + threshold := randInt(s.R, 10, 20) + + stmt := s.procStmts[tpccCallStockLevel] + _, err := stmt.ExecContext(ctx, wID, dID, threshold) + if err != nil { + return fmt.Errorf("CALL tpcc_stock_level failed: %v", err) + } + return nil +} diff --git a/tpcc/workload.go b/tpcc/workload.go index 6c1983fb..0af1a1fe 100644 --- a/tpcc/workload.go +++ b/tpcc/workload.go @@ -44,6 +44,9 @@ type tpccState struct { stockLevelStmt map[string]*sql.Stmt paymentStmts map[string]*sql.Stmt + // Stored-procedure CALL statements, populated when cfg.StoredProcs is on. + procStmts map[string]*sql.Stmt + // for automatic connection refresh lastConnRefresh time.Time } @@ -91,6 +94,12 @@ type Config struct { // automatic connection refresh interval to balance traffic across new replicas ConnRefreshInterval time.Duration + + // StoredProcs enables PL/pgSQL stored-procedure mode (postgres driver only). + // Each of the five TPC-C transactions is then dispatched as a single + // `CALL tpcc_xxx(...)` server-side. Procedures are installed via CREATE + // OR REPLACE on the first Run() call. + StoredProcs bool } // Workloader is TPCC workload @@ -106,6 +115,11 @@ type Workloader struct { txns []txn + // installProcsOnce installs the PL/pgSQL stored procedures lazily on the + // first Run() call when --stored-procs is set. 
+ installProcsOnce sync.Once + installProcsErr error + // stats rtMeasurement *measurement.Measurement waitTimeMeasurement *measurement.Measurement @@ -117,6 +131,10 @@ func NewWorkloader(db *sql.DB, cfg *Config) (workload.Workloader, error) { panic(fmt.Errorf("failed to connect to database when loading data")) } + if cfg.StoredProcs && cfg.Driver != "postgres" { + panic(fmt.Errorf("--stored-procs is only supported on driver=postgres (got %q)", cfg.Driver)) + } + if cfg.Parts > cfg.Warehouses { panic(fmt.Errorf("number warehouses %d must >= partition %d", cfg.Warehouses, cfg.Parts)) } @@ -202,6 +220,7 @@ func (w *Workloader) CleanupThread(ctx context.Context, threadID int) { closeStmts(s.deliveryStmts) closeStmts(s.stockLevelStmt) closeStmts(s.orderStatusStmts) + closeStmts(s.procStmts) // TODO: close stmts for delivery, order status, and stock level if s.Conn != nil { s.Conn.Close() @@ -267,6 +286,18 @@ func (w *Workloader) Run(ctx context.Context, threadID int) (err error) { s.lastConnRefresh = time.Now() refreshConn = true } + // Install PL/pgSQL stored procedures lazily on the first Run() call. + // CREATE OR REPLACE is idempotent, so this is safe even if other threads + // arrived first. + if w.cfg.StoredProcs { + w.installProcsOnce.Do(func() { + w.installProcsErr = installStoredProcsPG(ctx, s.Conn) + }) + if w.installProcsErr != nil { + return fmt.Errorf("install stored procs: %w", w.installProcsErr) + } + } + if s.newOrderStmts == nil || refreshConn { s.newOrderStmts = map[string]*sql.Stmt{ newOrderSelectCustomer: prepareStmt(w.cfg.Driver, ctx, s.Conn, newOrderSelectCustomer), @@ -318,6 +349,17 @@ func (w *Workloader) Run(ctx context.Context, threadID int) (err error) { stockLevelSelectDistrict: prepareStmt(w.cfg.Driver, ctx, s.Conn, stockLevelSelectDistrict), stockLevelCount: prepareStmt(w.cfg.Driver, ctx, s.Conn, stockLevelCount), } + + if w.cfg.StoredProcs { + // CALL strings already use $N placeholders, so we don't translate. 
+ s.procStmts = map[string]*sql.Stmt{ + tpccCallNewOrder: prepareStmt(w.cfg.Driver, ctx, s.Conn, tpccCallNewOrder), + tpccCallPayment: prepareStmt(w.cfg.Driver, ctx, s.Conn, tpccCallPayment), + tpccCallOrderStatus: prepareStmt(w.cfg.Driver, ctx, s.Conn, tpccCallOrderStatus), + tpccCallDelivery: prepareStmt(w.cfg.Driver, ctx, s.Conn, tpccCallDelivery), + tpccCallStockLevel: prepareStmt(w.cfg.Driver, ctx, s.Conn, tpccCallStockLevel), + } + } } // refer 5.2.4.2 From b8309cf254a7468a4639ce8f5e031dc0449ae20b Mon Sep 17 00:00:00 2001 From: Alexander Korotkov Date: Wed, 13 May 2026 22:52:22 +0300 Subject: [PATCH 2/2] tpcc_new_order: replace dynamic SQL for s_dist with CASE so the plan caches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The district-specific stock distribution column (s_dist_01..s_dist_10) was selected via EXECUTE format(...), which forced PL/pgSQL to parse and plan the SELECT from scratch on every iteration — measurable cost on the hot NEW_ORDER path (10 stock rows per call). Switch to a single static SELECT that picks the right column via CASE p_d_id WHEN N THEN s_dist_NN END. PL/pgSQL caches its plan after a few executions and pg_stat_statements collapses the 10 district variants into one entry. Reading all 10 s_dist columns is essentially free — the stock row is already in cache and locked FOR UPDATE. --- tpcc/procs_pg.go | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/tpcc/procs_pg.go b/tpcc/procs_pg.go index 195b0819..38908b42 100644 --- a/tpcc/procs_pg.go +++ b/tpcc/procs_pg.go @@ -131,14 +131,26 @@ BEGIN -- Second pass: stock lookup, stock update, order_line insert. FOR i IN 1..p_ol_cnt LOOP - -- District-specific stock distribution column (s_dist_01 .. s_dist_10). 
- EXECUTE format( - 'SELECT s_quantity, s_data, s_dist_%s FROM stock ' - 'WHERE s_w_id = $1 AND s_i_id = $2 FOR UPDATE', - to_char(p_d_id, 'FM00') - ) - INTO v_s_quantity, v_s_data, v_s_dist - USING p_w_id, p_ol_i_id[i]; + -- District-specific stock distribution column picked via CASE so the + -- statement stays static and PL/pgSQL can cache its plan (an EXECUTE + -- format() variant would re-plan every call). + SELECT s_quantity, s_data, + CASE p_d_id + WHEN 1 THEN s_dist_01 + WHEN 2 THEN s_dist_02 + WHEN 3 THEN s_dist_03 + WHEN 4 THEN s_dist_04 + WHEN 5 THEN s_dist_05 + WHEN 6 THEN s_dist_06 + WHEN 7 THEN s_dist_07 + WHEN 8 THEN s_dist_08 + WHEN 9 THEN s_dist_09 + WHEN 10 THEN s_dist_10 + END + INTO v_s_quantity, v_s_data, v_s_dist + FROM stock + WHERE s_w_id = p_w_id AND s_i_id = p_ol_i_id[i] + FOR UPDATE; v_s_quantity := v_s_quantity - p_ol_quantity[i]; IF v_s_quantity < 10 THEN