Skip to content

Commit b100edd

Browse files
committed
Updated cloudsync_changes (wp)
1 parent 5c5cc10 commit b100edd

File tree

2 files changed

+162
-35
lines changed

2 files changed

+162
-35
lines changed

src/cloudsync.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
extern "C" {
1818
#endif
1919

20-
#define CLOUDSYNC_VERSION "0.9.0"
20+
#define CLOUDSYNC_VERSION "0.9.1"
2121
#define CLOUDSYNC_MAX_TABLENAME_LEN 512
2222

2323
#define CLOUDSYNC_VALUE_NOTSET -1

src/postgresql/cloudsync_postgresql.c

Lines changed: 161 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
// Created by Claude Code on 18/12/25.
66
//
77

8-
#define CLOUDSYNC_RLS_RESTRICTED_VALUE_BYTEA "E'\\\\x0b095f5f5b524c535d5f5f'::bytea"
9-
108
// Define POSIX feature test macros before any includes
119
#define _POSIX_C_SOURCE 200809L
1210

@@ -49,6 +47,8 @@ PG_MODULE_MAGIC;
4947
#define UNUSED_PARAMETER(X) (void)(X)
5048
#endif
5149

50+
#define CLOUDSYNC_RLS_RESTRICTED_VALUE_BYTEA "E'\\\\x0b095f5f5b524c535d5f5f'::bytea"
51+
5252
// External declaration
5353
Datum database_column_datum (dbvm_t *vm, int index);
5454

@@ -988,8 +988,6 @@ Datum cloudsync_pk_encode (PG_FUNCTION_ARGS) {
988988
PG_RETURN_BYTEA_P(result);
989989
}
990990

991-
// cloudsync_pk_decode - Decode primary key component at given index
992-
PG_FUNCTION_INFO_V1(cloudsync_pk_decode);
993991
typedef struct cloudsync_pk_decode_ctx {
994992
int target_index;
995993
text *result;
@@ -1037,6 +1035,8 @@ static int cloudsync_pk_decode_set_result (void *xdata, int index, int type, int
10371035
return DBRES_OK;
10381036
}
10391037

1038+
// cloudsync_pk_decode - Decode primary key component at given index
1039+
PG_FUNCTION_INFO_V1(cloudsync_pk_decode);
10401040
Datum cloudsync_pk_decode (PG_FUNCTION_ARGS) {
10411041
if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) {
10421042
PG_RETURN_NULL();
@@ -1629,22 +1629,33 @@ static pgvalue_t *cloudsync_decode_bytea_to_pgvalue (bytea *encoded, Oid target_
16291629

16301630
PG_FUNCTION_INFO_V1(cloudsync_encode_value);
16311631
Datum cloudsync_encode_value(PG_FUNCTION_ARGS) {
1632-
bool isnull = PG_ARGISNULL(0);
1633-
int32 typmod = -1;
1634-
Oid collid = InvalidOid;
1632+
if (PG_ARGISNULL(0)) {
1633+
bytea *null_encoded = cloudsync_encode_null_value();
1634+
PG_RETURN_BYTEA_P(null_encoded);
1635+
}
16351636

1636-
Datum value = PG_GETARG_DATUM(0);
1637-
Oid typeoid = get_fn_expr_argtype(fcinfo->flinfo, 0);
1637+
Oid typeoid = get_fn_expr_argtype(fcinfo->flinfo, 0);
1638+
int32 typmod = -1;
1639+
Oid collid = PG_GET_COLLATION();
16381640

1639-
if (fcinfo->flinfo && fcinfo->flinfo->fn_expr && IsA(fcinfo->flinfo->fn_expr, FuncExpr)) {
1640-
FuncExpr *fexpr = (FuncExpr *) fcinfo->flinfo->fn_expr;
1641-
Node *arg = (Node *) linitial(fexpr->args);
1642-
1643-
typmod = exprTypmod(arg);
1644-
collid = exprCollation(arg);
1641+
if (!OidIsValid(typeoid) || typeoid == ANYELEMENTOID) {
1642+
if (fcinfo->flinfo->fn_expr && IsA(fcinfo->flinfo->fn_expr, FuncExpr)) {
1643+
FuncExpr *fexpr = (FuncExpr *) fcinfo->flinfo->fn_expr;
1644+
if (fexpr->args && list_length(fexpr->args) >= 1) {
1645+
Node *arg = (Node *) linitial(fexpr->args);
1646+
typeoid = exprType(arg);
1647+
typmod = exprTypmod(arg);
1648+
collid = exprCollation(arg);
1649+
}
1650+
}
16451651
}
16461652

1647-
bytea *result = cloudsync_encode_value_from_datum(value, typeoid, typmod, collid, isnull);
1653+
if (!OidIsValid(typeoid) || typeoid == ANYELEMENTOID) {
1654+
ereport(ERROR, (errmsg("cloudsync_encode_any: unable to resolve argument type")));
1655+
}
1656+
1657+
Datum val = PG_GETARG_DATUM(0);
1658+
bytea *result = cloudsync_encode_value_from_datum(val, typeoid, typmod, collid, false);
16481659
PG_RETURN_BYTEA_P(result);
16491660
}
16501661

@@ -1739,7 +1750,7 @@ static char * build_union_sql (void) {
17391750

17401751
StringInfoData buf;
17411752
initStringInfo(&buf);
1742-
1753+
17431754
uint64 ntables = SPI_processed;
17441755
bool first = true;
17451756
for (uint64 i = 0; i < ntables; i++) {
@@ -1762,29 +1773,145 @@ static char * build_union_sql (void) {
17621773
char *quoted_base = quote_literal_cstr(base);
17631774
const char *quoted_nsp = quote_identifier(nsp);
17641775
const char *quoted_rel = quote_identifier(rel);
1765-
1776+
17661777
if (!first) appendStringInfoString(&buf, " UNION ALL ");
17671778
first = false;
17681779

1780+
1781+
/*
1782+
* Build a single SELECT per table that:
1783+
* - reads change rows from <table>_cloudsync (t1)
1784+
* - joins the base table (b) using decoded PK components
1785+
* - computes col_value in-SQL with a CASE over col_name
1786+
*
1787+
* This avoids calling cloudsync_col_value() (and therefore avoids
1788+
* executing extra SPI queries per row), while still honoring RLS:
1789+
* if the base row is not visible, the LEFT JOIN yields NULL and we
1790+
* return the restricted sentinel value (then filtered out).
1791+
*/
1792+
1793+
char *nsp_lit = quote_literal_cstr(nsp);
1794+
char *base_lit = quote_literal_cstr(base);
1795+
1796+
/* Collect PK columns (name + SQL type) */
1797+
StringInfoData pkq;
1798+
initStringInfo(&pkq);
1799+
appendStringInfo(&pkq,
1800+
"SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS typ "
1801+
"FROM pg_index i "
1802+
"JOIN pg_class c ON c.oid = i.indrelid "
1803+
"JOIN pg_namespace n ON n.oid = c.relnamespace "
1804+
"JOIN pg_attribute a ON a.attrelid = c.oid AND a.attnum = ANY(i.indkey) "
1805+
"WHERE i.indisprimary AND n.nspname = %s AND c.relname = %s "
1806+
"ORDER BY array_position(i.indkey, a.attnum)",
1807+
nsp_lit, base_lit
1808+
);
1809+
int pkrc = SPI_execute(pkq.data, true, 0);
1810+
pfree(pkq.data);
1811+
if (pkrc != SPI_OK_SELECT || SPI_processed == 0) {
1812+
ereport(ERROR, (errmsg("cloudsync: unable to resolve primary key for %s.%s", nsp, base)));
1813+
}
1814+
uint64 npk = SPI_processed;
1815+
1816+
StringInfoData joincond;
1817+
initStringInfo(&joincond);
1818+
for (uint64 k = 0; k < npk; k++) {
1819+
HeapTuple pkt = SPI_tuptable->vals[k];
1820+
TupleDesc pkd = SPI_tuptable->tupdesc;
1821+
char *pkname = SPI_getvalue(pkt, pkd, 1);
1822+
char *pktype = SPI_getvalue(pkt, pkd, 2);
1823+
if (!pkname || !pktype) ereport(ERROR, (errmsg("cloudsync: invalid pk metadata for %s.%s", nsp, base)));
1824+
1825+
if (k > 0) appendStringInfoString(&joincond, " AND ");
1826+
appendStringInfo(&joincond,
1827+
"b.%s = cloudsync_pk_decode(t1.pk, %llu)::%s",
1828+
quote_identifier(pkname),
1829+
(unsigned long long)(k + 1),
1830+
pktype
1831+
);
1832+
pfree(pkname);
1833+
pfree(pktype);
1834+
}
1835+
SPI_freetuptable(SPI_tuptable);
1836+
1837+
/* Collect all base-table columns to build CASE over t1.col_name */
1838+
StringInfoData colq;
1839+
initStringInfo(&colq);
1840+
appendStringInfo(&colq,
1841+
"SELECT a.attname "
1842+
"FROM pg_attribute a "
1843+
"JOIN pg_class c ON c.oid = a.attrelid "
1844+
"JOIN pg_namespace n ON n.oid = c.relnamespace "
1845+
"WHERE a.attnum > 0 AND NOT a.attisdropped "
1846+
" AND n.nspname = %s AND c.relname = %s "
1847+
"ORDER BY a.attnum",
1848+
nsp_lit, base_lit
1849+
);
1850+
int colrc = SPI_execute(colq.data, true, 0);
1851+
pfree(colq.data);
1852+
if (colrc != SPI_OK_SELECT) {
1853+
ereport(ERROR, (errmsg("cloudsync: unable to resolve columns for %s.%s", nsp, base)));
1854+
}
1855+
uint64 ncols = SPI_processed;
1856+
1857+
StringInfoData caseexpr;
1858+
initStringInfo(&caseexpr);
1859+
appendStringInfoString(&caseexpr,
1860+
"CASE "
1861+
"WHEN t1.col_name = '" CLOUDSYNC_TOMBSTONE_VALUE "' THEN cloudsync_encode_value(NULL::text) "
1862+
"WHEN b.ctid IS NULL THEN " CLOUDSYNC_RLS_RESTRICTED_VALUE_BYTEA " "
1863+
"ELSE CASE t1.col_name "
1864+
);
1865+
1866+
for (uint64 k = 0; k < ncols; k++) {
1867+
HeapTuple ct = SPI_tuptable->vals[k];
1868+
TupleDesc cd = SPI_tuptable->tupdesc;
1869+
char *cname = SPI_getvalue(ct, cd, 1);
1870+
if (!cname) continue;
1871+
1872+
appendStringInfo(&caseexpr,
1873+
"WHEN %s THEN cloudsync_encode_value(b.%s) ",
1874+
quote_literal_cstr(cname),
1875+
quote_identifier(cname)
1876+
);
1877+
pfree(cname);
1878+
}
1879+
SPI_freetuptable(SPI_tuptable);
1880+
1881+
appendStringInfoString(&caseexpr,
1882+
"ELSE " CLOUDSYNC_RLS_RESTRICTED_VALUE_BYTEA " END END"
1883+
);
1884+
1885+
const char *quoted_base_ident = quote_identifier(base);
1886+
17691887
appendStringInfo(&buf,
1770-
"SELECT * FROM ("
1771-
"SELECT %s AS tbl, t1.pk, t1.col_name, "
1772-
"cloudsync_col_value(%s::text, t1.col_name, t1.pk) AS col_value, "
1773-
"t1.col_version, t1.db_version, site_tbl.site_id, "
1774-
"COALESCE(t2.col_version, 1) AS cl, t1.seq "
1775-
"FROM %s.%s t1 "
1776-
"LEFT JOIN cloudsync_site_id site_tbl ON t1.site_id = site_tbl.id "
1777-
"LEFT JOIN %s.%s t2 "
1778-
" ON t1.pk = t2.pk AND t2.col_name = '%s'"
1779-
") s WHERE s.col_value IS DISTINCT FROM %s",
1780-
quoted_base, quoted_base,
1781-
quoted_nsp, quoted_rel,
1782-
quoted_nsp, quoted_rel,
1783-
CLOUDSYNC_TOMBSTONE_VALUE,
1784-
CLOUDSYNC_RLS_RESTRICTED_VALUE_BYTEA
1785-
);
1888+
"SELECT * FROM ("
1889+
"SELECT %s AS tbl, t1.pk, t1.col_name, "
1890+
"%s AS col_value, "
1891+
"t1.col_version, t1.db_version, site_tbl.site_id, "
1892+
"COALESCE(t2.col_version, 1) AS cl, t1.seq "
1893+
"FROM %s.%s t1 "
1894+
"LEFT JOIN cloudsync_site_id site_tbl ON t1.site_id = site_tbl.id "
1895+
"LEFT JOIN %s.%s t2 "
1896+
" ON t1.pk = t2.pk AND t2.col_name = '%s' "
1897+
"LEFT JOIN %s.%s b ON %s "
1898+
") s WHERE s.col_value IS DISTINCT FROM %s",
1899+
quoted_base,
1900+
caseexpr.data,
1901+
quoted_nsp, quoted_rel,
1902+
quoted_nsp, quoted_rel,
1903+
CLOUDSYNC_TOMBSTONE_VALUE,
1904+
quoted_nsp, quoted_base_ident,
1905+
joincond.data,
1906+
CLOUDSYNC_RLS_RESTRICTED_VALUE_BYTEA
1907+
);
1908+
1909+
pfree((void*)quoted_base_ident);
1910+
pfree(joincond.data);
1911+
pfree(caseexpr.data);
17861912

17871913
pfree(base);
1914+
17881915
pfree(quoted_base);
17891916
pfree(nsp);
17901917
pfree((void *)quoted_nsp);

0 commit comments

Comments
 (0)