Skip to content

Commit 94c0643

Browse files
committed
Several memory related issues fixed
1 parent 3689f14 commit 94c0643

File tree

4 files changed

+123
-60
lines changed

4 files changed

+123
-60
lines changed

src/cloudsync.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2232,6 +2232,7 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
22322232

22332233
uint32_t rc = LZ4_decompress_safe(buffer, clone, blen, header.expanded_size);
22342234
if (rc <= 0 || rc != header.expanded_size) {
2235+
if (clone) cloudsync_memory_free(clone);
22352236
return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to decompress BLOB", DBRES_MISUSE);
22362237
}
22372238

src/cloudsync.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
extern "C" {
1818
#endif
1919

20-
#define CLOUDSYNC_VERSION "0.9.4"
20+
#define CLOUDSYNC_VERSION "0.9.5"
2121
#define CLOUDSYNC_MAX_TABLENAME_LEN 512
2222

2323
#define CLOUDSYNC_VALUE_NOTSET -1

src/postgresql/cloudsync_postgresql.c

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1265,10 +1265,9 @@ Datum cloudsync_update_transfn (PG_FUNCTION_ARGS) {
12651265
} else {
12661266
payload = (cloudsync_update_payload *)PG_GETARG_POINTER(0);
12671267
if (payload->mcxt == NULL || payload->mcxt != allocContext) {
1268-
elog(DEBUG1, "cloudsync_update_transfn repairing payload context payload=%p old_mcxt=%p new_mcxt=%p",
1269-
payload, payload->mcxt, allocContext);
1270-
payload->mcxt = allocContext;
1271-
}
1268+
elog(DEBUG1, "cloudsync_update_transfn repairing payload context payload=%p old_mcxt=%p new_mcxt=%p", payload, payload->mcxt, allocContext);
1269+
payload->mcxt = allocContext;
1270+
}
12721271
}
12731272

12741273
if (!payload) {
@@ -1583,6 +1582,7 @@ static pgvalue_t *cloudsync_decode_bytea_to_pgvalue (bytea *encoded, Oid target_
15831582
Oid argt[1] = {TEXTOID};
15841583
Datum argv[1];
15851584
char argn[1] = {' '};
1585+
bool argv_is_pointer = false; // Track if argv[0] needs pfree on error
15861586

15871587
switch (dv.dbtype) {
15881588
case DBTYPE_INTEGER:
@@ -1596,13 +1596,15 @@ static pgvalue_t *cloudsync_decode_bytea_to_pgvalue (bytea *encoded, Oid target_
15961596
case DBTYPE_TEXT:
15971597
argt[0] = TEXTOID;
15981598
argv[0] = PointerGetDatum(cstring_to_text_with_len(dv.pval ? dv.pval : "", (int)(dv.len)));
1599+
argv_is_pointer = true;
15991600
break;
16001601
case DBTYPE_BLOB: {
16011602
argt[0] = BYTEAOID;
16021603
bytea *ba = (bytea *)palloc(VARHDRSZ + dv.len);
16031604
SET_VARSIZE(ba, VARHDRSZ + dv.len);
16041605
if (dv.len > 0) memcpy(VARDATA(ba), dv.pval, (size_t)dv.len);
16051606
argv[0] = PointerGetDatum(ba);
1607+
argv_is_pointer = true;
16061608
} break;
16071609
case DBTYPE_NULL:
16081610
if (out_isnull) *out_isnull = true;
@@ -1617,26 +1619,31 @@ static pgvalue_t *cloudsync_decode_bytea_to_pgvalue (bytea *encoded, Oid target_
16171619

16181620
// Cast to the target column type from the table schema.
16191621
if (argt[0] == target_typoid) {
1620-
return pgvalue_create(argv[0], target_typoid, -1, InvalidOid, false);
1622+
pgvalue_t *result = pgvalue_create(argv[0], target_typoid, -1, InvalidOid, false);
1623+
if (!result && argv_is_pointer) {
1624+
pfree(DatumGetPointer(argv[0]));
1625+
}
1626+
return result;
16211627
}
16221628

16231629
StringInfoData castq;
16241630
initStringInfo(&castq);
16251631
appendStringInfo(&castq, "SELECT $1::%s", target_typname);
16261632

16271633
int rc = SPI_execute_with_args(castq.data, 1, argt, argv, argn, true, 1);
1628-
if (rc != SPI_OK_SELECT || SPI_processed != 1) {
1629-
if (SPI_tuptable) {
1630-
SPI_freetuptable(SPI_tuptable);
1631-
}
1634+
if (rc != SPI_OK_SELECT || SPI_processed != 1 || !SPI_tuptable) {
1635+
if (SPI_tuptable) SPI_freetuptable(SPI_tuptable);
1636+
pfree(castq.data);
1637+
if (argv_is_pointer) pfree(DatumGetPointer(argv[0]));
16321638
ereport(ERROR, (errmsg("cloudsync: failed to cast value to %s", target_typname)));
16331639
}
16341640
pfree(castq.data);
16351641

16361642
bool typed_isnull = false;
1643+
// SPI_getbinval uses 1-based column indexing, but TupleDescAttr uses 0-based indexing
16371644
Datum typed_value = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &typed_isnull);
1638-
int32 typmod = TupleDescAttr(SPI_tuptable->tupdesc, 1)->atttypmod;
1639-
Oid collation = TupleDescAttr(SPI_tuptable->tupdesc, 1)->attcollation;
1645+
int32 typmod = TupleDescAttr(SPI_tuptable->tupdesc, 0)->atttypmod;
1646+
Oid collation = TupleDescAttr(SPI_tuptable->tupdesc, 0)->attcollation;
16401647
if (!typed_isnull) {
16411648
Form_pg_attribute att = TupleDescAttr(SPI_tuptable->tupdesc, 0);
16421649
typed_value = datumCopy(typed_value, att->attbyval, att->attlen);
@@ -1769,7 +1776,7 @@ static char * build_union_sql (void) {
17691776
"ORDER BY n.nspname, c.relname";
17701777

17711778
int rc = SPI_execute(sql, true, 0);
1772-
if (rc != SPI_OK_SELECT) {
1779+
if (rc != SPI_OK_SELECT || !SPI_tuptable) {
17731780
ereport(ERROR, (errmsg("cloudsync: SPI_execute failed while listing *_cloudsync")));
17741781
}
17751782

@@ -1855,7 +1862,8 @@ static char * build_union_sql (void) {
18551862
);
18561863
int pkrc = SPI_execute(pkq.data, true, 0);
18571864
pfree(pkq.data);
1858-
if (pkrc != SPI_OK_SELECT || SPI_processed == 0) {
1865+
if (pkrc != SPI_OK_SELECT || (SPI_processed == 0) || (!SPI_tuptable)) {
1866+
if (SPI_tuptable) SPI_freetuptable(SPI_tuptable);
18591867
ereport(ERROR, (errmsg("cloudsync: unable to resolve primary key for %s.%s", nsp, base)));
18601868
}
18611869
uint64 npk = SPI_processed;
@@ -1867,7 +1875,13 @@ static char * build_union_sql (void) {
18671875
TupleDesc pkd = SPI_tuptable->tupdesc;
18681876
char *pkname = SPI_getvalue(pkt, pkd, 1);
18691877
char *pktype = SPI_getvalue(pkt, pkd, 2);
1870-
if (!pkname || !pktype) ereport(ERROR, (errmsg("cloudsync: invalid pk metadata for %s.%s", nsp, base)));
1878+
if (!pkname || !pktype) {
1879+
if (pkname) pfree(pkname);
1880+
if (pktype) pfree(pktype);
1881+
pfree(joincond.data);
1882+
SPI_freetuptable(SPI_tuptable);
1883+
ereport(ERROR, (errmsg("cloudsync: invalid pk metadata for %s.%s", nsp, base)));
1884+
}
18711885

18721886
if (k > 0) appendStringInfoString(&joincond, " AND ");
18731887
appendStringInfo(&joincond,
@@ -1896,7 +1910,8 @@ static char * build_union_sql (void) {
18961910
);
18971911
int colrc = SPI_execute(colq.data, true, 0);
18981912
pfree(colq.data);
1899-
if (colrc != SPI_OK_SELECT) {
1913+
if (colrc != SPI_OK_SELECT || !SPI_tuptable) {
1914+
if (SPI_tuptable) SPI_freetuptable(SPI_tuptable);
19001915
ereport(ERROR, (errmsg("cloudsync: unable to resolve columns for %s.%s", nsp, base)));
19011916
}
19021917
uint64 ncols = SPI_processed;

src/postgresql/database_postgresql.c

Lines changed: 91 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -572,11 +572,11 @@ int database_exec (cloudsync_context *data, const char *sql) {
572572
int database_exec_callback (cloudsync_context *data, const char *sql, int (*callback)(void *xdata, int argc, char **values, char **names), void *xdata) {
573573
if (!sql) return cloudsync_set_error(data, "SQL statement is NULL", DBRES_ERROR);
574574
cloudsync_reset_error(data);
575-
575+
576576
int rc;
577577
bool is_error = false;
578578
PG_TRY();
579-
{
579+
{
580580
rc = SPI_execute(sql, true, 0);
581581
}
582582
PG_CATCH();
@@ -595,21 +595,33 @@ int database_exec_callback (cloudsync_context *data, const char *sql, int (*call
595595
// Call callback for each row if provided
596596
if (callback && SPI_tuptable) {
597597
TupleDesc tupdesc = SPI_tuptable->tupdesc;
598-
if (!tupdesc) return cloudsync_set_error(data, "Invalid tuple descriptor", DBRES_ERROR);
599-
598+
if (!tupdesc) {
599+
SPI_freetuptable(SPI_tuptable);
600+
return cloudsync_set_error(data, "Invalid tuple descriptor", DBRES_ERROR);
601+
}
602+
600603
int ncols = tupdesc->natts;
601-
if (ncols <= 0) return DBRES_OK;
604+
if (ncols <= 0) {
605+
SPI_freetuptable(SPI_tuptable);
606+
return DBRES_OK;
607+
}
608+
609+
// IMPORTANT: Save SPI state before any callback can modify it.
610+
// Callbacks may execute SPI queries which overwrite global SPI_tuptable.
611+
// We must copy all data we need BEFORE calling any callbacks.
612+
uint64 nrows = SPI_processed;
613+
SPITupleTable *saved_tuptable = SPI_tuptable;
614+
615+
// No rows to process - free tuptable and return success
616+
if (nrows == 0) {
617+
SPI_freetuptable(saved_tuptable);
618+
return DBRES_OK;
619+
}
602620

603-
// Allocate arrays for column names and values
621+
// Allocate array for column names (shared across all rows)
604622
char **names = cloudsync_memory_alloc(ncols * sizeof(char*));
605623
if (!names) {
606-
if (SPI_tuptable) SPI_freetuptable(SPI_tuptable);
607-
return DBRES_NOMEM;
608-
}
609-
char **values = cloudsync_memory_alloc(ncols * sizeof(char*));
610-
if (!values) {
611-
cloudsync_memory_free(names);
612-
if (SPI_tuptable) SPI_freetuptable(SPI_tuptable);
624+
SPI_freetuptable(saved_tuptable);
613625
return DBRES_NOMEM;
614626
}
615627

@@ -623,50 +635,84 @@ int database_exec_callback (cloudsync_context *data, const char *sql, int (*call
623635
}
624636
}
625637

626-
// Process each row
627-
for (uint64 row = 0; row < SPI_processed; row++) {
628-
HeapTuple tuple = SPI_tuptable->vals[row];
629-
if (!tuple) continue;
630-
631-
// Get values for this row
638+
// Pre-extract ALL row values before calling any callbacks.
639+
// This prevents SPI state corruption when callbacks run queries.
640+
char ***all_values = cloudsync_memory_alloc(nrows * sizeof(char**));
641+
if (!all_values) {
632642
for (int i = 0; i < ncols; i++) {
633-
bool isnull;
634-
SPI_getbinval(tuple, tupdesc, i + 1, &isnull);
635-
values[i] = (isnull) ? NULL : SPI_getvalue(tuple, tupdesc, i + 1);
643+
if (names[i]) cloudsync_memory_free(names[i]);
636644
}
645+
cloudsync_memory_free(names);
646+
SPI_freetuptable(saved_tuptable);
647+
return DBRES_NOMEM;
648+
}
637649

638-
// Call user callback
639-
int cb_rc = callback(xdata, ncols, values, names);
640-
641-
// Cleanup values (SPI_getvalue uses palloc)
642-
for (int i = 0; i < ncols; i++) {
643-
if (values[i]) {
644-
pfree(values[i]);
645-
values[i] = NULL;
650+
// Extract values from all tuples
651+
for (uint64 row = 0; row < nrows; row++) {
652+
HeapTuple tuple = saved_tuptable->vals[row];
653+
all_values[row] = cloudsync_memory_alloc(ncols * sizeof(char*));
654+
if (!all_values[row]) {
655+
// Cleanup already allocated rows
656+
for (uint64 r = 0; r < row; r++) {
657+
for (int c = 0; c < ncols; c++) {
658+
if (all_values[r][c]) pfree(all_values[r][c]);
659+
}
660+
cloudsync_memory_free(all_values[r]);
646661
}
647-
}
648-
649-
if (cb_rc != 0) {
650-
// Free our name copies
662+
cloudsync_memory_free(all_values);
651663
for (int i = 0; i < ncols; i++) {
652664
if (names[i]) cloudsync_memory_free(names[i]);
653665
}
654666
cloudsync_memory_free(names);
655-
cloudsync_memory_free(values);
667+
SPI_freetuptable(saved_tuptable);
668+
return DBRES_NOMEM;
669+
}
670+
671+
if (!tuple) {
672+
for (int i = 0; i < ncols; i++) all_values[row][i] = NULL;
673+
continue;
674+
}
675+
676+
for (int i = 0; i < ncols; i++) {
677+
bool isnull;
678+
SPI_getbinval(tuple, tupdesc, i + 1, &isnull);
679+
all_values[row][i] = (isnull) ? NULL : SPI_getvalue(tuple, tupdesc, i + 1);
680+
}
681+
}
682+
683+
// Free SPI_tuptable BEFORE calling callbacks - we have all data we need
684+
SPI_freetuptable(saved_tuptable);
685+
SPI_tuptable = NULL;
686+
687+
// Now process each row - callbacks can safely run SPI queries
688+
int result = DBRES_OK;
689+
for (uint64 row = 0; row < nrows; row++) {
690+
int cb_rc = callback(xdata, ncols, all_values[row], names);
691+
692+
if (cb_rc != 0) {
656693
char errmsg[1024];
657694
snprintf(errmsg, sizeof(errmsg), "database_exec_callback aborted %d", cb_rc);
658-
rc = cloudsync_set_error(data, errmsg, DBRES_ABORT);
659-
if (SPI_tuptable) SPI_freetuptable(SPI_tuptable);
660-
return rc;
695+
result = cloudsync_set_error(data, errmsg, DBRES_ABORT);
696+
break;
661697
}
662698
}
663699

664-
// Free our name copies
700+
// Cleanup all extracted values
701+
for (uint64 row = 0; row < nrows; row++) {
702+
for (int i = 0; i < ncols; i++) {
703+
if (all_values[row][i]) pfree(all_values[row][i]);
704+
}
705+
cloudsync_memory_free(all_values[row]);
706+
}
707+
cloudsync_memory_free(all_values);
708+
709+
// Free column names
665710
for (int i = 0; i < ncols; i++) {
666711
if (names[i]) cloudsync_memory_free(names[i]);
667712
}
668713
cloudsync_memory_free(names);
669-
cloudsync_memory_free(values);
714+
715+
return result;
670716
}
671717

672718
if (SPI_tuptable) SPI_freetuptable(SPI_tuptable);
@@ -772,7 +818,7 @@ static int64_t database_count_bind (cloudsync_context *data, const char *sql, co
772818

773819
int64_t count = 0;
774820
int rc = SPI_execute_with_args(sql, 1, argtypes, values, nulls, true, 0);
775-
if (rc >= 0 && SPI_processed > 0) {
821+
if (rc >= 0 && SPI_processed > 0 && SPI_tuptable) {
776822
bool isnull;
777823
Datum d = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull);
778824
if (!isnull) count = DatumGetInt64(d);
@@ -1334,7 +1380,7 @@ int database_pk_names (cloudsync_context *data, const char *table_name, char ***
13341380
char **pk_names = cloudsync_memory_zeroalloc(n * sizeof(char*));
13351381
if (!pk_names) return DBRES_NOMEM;
13361382

1337-
for (int i = 0; i < n; i++) {
1383+
for (uint64_t i = 0; i < n; i++) {
13381384
HeapTuple tuple = SPI_tuptable->vals[i];
13391385
bool isnull;
13401386
Datum datum = SPI_getbinval(tuple, SPI_tuptable->tupdesc, 1, &isnull);
@@ -1351,6 +1397,7 @@ int database_pk_names (cloudsync_context *data, const char *table_name, char ***
13511397
if (pk_names[j]) cloudsync_memory_free(pk_names[j]);
13521398
}
13531399
cloudsync_memory_free(pk_names);
1400+
if (SPI_tuptable) SPI_freetuptable(SPI_tuptable);
13541401
return DBRES_NOMEM;
13551402
}
13561403
}
@@ -1843,14 +1890,14 @@ const void *database_column_blob (dbvm_t *vm, int index) {
18431890
return NULL;
18441891
}
18451892

1846-
int len = VARSIZE(ba) - VARHDRSZ;
1893+
Size len = VARSIZE(ba) - VARHDRSZ;
18471894
void *out = palloc(len);
18481895
if (!out) {
18491896
MemoryContextSwitchTo(old);
18501897
return NULL;
18511898
}
18521899

1853-
memcpy(out, VARDATA(ba), len);
1900+
memcpy(out, VARDATA(ba), (size_t)len);
18541901
MemoryContextSwitchTo(old);
18551902

18561903
return out;

0 commit comments

Comments
 (0)