forked from citusdata/citus
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstat_counters.c
More file actions
974 lines (823 loc) · 30 KB
/
stat_counters.c
File metadata and controls
974 lines (823 loc) · 30 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
/*-------------------------------------------------------------------------
*
* stat_counters.c
*
* This file contains functions to track various statistic counters for
* Citus.
*
* We create an array of "BackendStatsSlot"s in shared memory, one for
* each backend. Each backend increments its own stat counters in its
* own slot via IncrementStatCounterForMyDb(). And when a backend exits,
* it saves its stat counters from its slot via
* SaveBackendStatsIntoSavedBackendStatsHash() into a hash table in
* shared memory, whose entries are "SavedBackendStatsHashEntry"s and
* the key is the database id. In other words, each entry of the hash
* table is used to aggregate the stat counters for backends that were
* connected to that database and exited since the last server restart.
* Plus, each entry is responsible for keeping track of the reset
* timestamp for both active and exited backends too.
* Note that today we don't evict the entries of the said hash table
* that point to dropped databases because the wrapper view anyway
* filters them out (thanks to LEFT JOIN) and we don't expect a
* performance hit due to that unless users have a lot of databases
* that are dropped and recreated frequently.
*
* The reason why we save the stat counters for exited backends in the
* shared hash table is that we cannot guarantee that the backend slot
* that was used by an exited backend will be reused by another backend
* connected to the same database. For this reason, we need to save the
* stat counters for exited backends into a shared hash table so that we
* can reset the counters within the corresponding backend slots while
* the backends exit.
*
* When citus_stat_counters() is called, we first aggregate the stat
* counters from the backend slots of all the active backends and then
* we add the aggregated stat counters from the exited backends that
* are stored in the shared hash table. Also, we don't persist backend
* stats on server shutdown, but we might want to do that in the future.
*
* Similarly, when citus_stat_counters_reset() is called, we reset the
* stat counters for the active backends and the exited backends that are
* stored in the shared hash table. Then, it also updates the
* resetTimestamp in the shared hash table entry appropriately. So,
* similarly, when citus_stat_counters() is called, we just report
* resetTimestamp as stats_reset column.
*
* Caveats:
*
 * There is a chance that citus_stat_counters_reset() might race with a
* backend that is trying to increment one of the counters in its slot
* and as a result it can effectively fail to reset that counter due to
* the reasons documented in IncrementStatCounterForMyDb() function.
* However, this should be a very rare case and we can live with that
* for now.
*
* Also, citus_stat_counters() might observe the counters for a backend
* twice or perhaps unsee it if it's concurrently exiting, depending on
* the order we call CollectActiveBackendStatsIntoHTAB() and
* CollectSavedBackendStatsIntoHTAB() in citus_stat_counters(). However,
* the next call to citus_stat_counters() will see the correct values
* for the counters, so we can live with that for now.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "common/hashfn.h"
#include "port/atomics.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "utils/hsearch.h"
#include "pg_version_compat.h"
#include "distributed/argutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/stats/stat_counters.h"
#include "distributed/tuplestore.h"
/*
 * saved backend stats - hash table constants
 *
 * Configurations used to create the hash table for saved backend stats.
 * The places where SAVED_BACKEND_STATS_HASH_MAX_DATABASES is used do not
 * impose a hard limit on the number of databases that can be tracked but
 * in ShmemInitHash() it's documented that the access efficiency will degrade
 * if it is exceeded substantially.
 *
 * XXX: Consider using dshash_table instead of (shared) HTAB if that becomes
 * a concern.
 */
#define SAVED_BACKEND_STATS_HASH_INIT_DATABASES 8
#define SAVED_BACKEND_STATS_HASH_MAX_DATABASES 1024

/*
 * Fixed size array types to store the stat counters.
 *
 * AtomicStatCounters is used for the per-backend slots in shared memory,
 * which are incremented lock-free; StatCounters is used where access is
 * already serialized by a lock (saved backend stats hash entries) or the
 * data is local to the calling backend (the local aggregation hash).
 */
typedef pg_atomic_uint64 AtomicStatCounters[N_CITUS_STAT_COUNTERS];
typedef uint64 StatCounters[N_CITUS_STAT_COUNTERS];
/*
 * saved backend stats - hash entry definition
 *
 * This is used to define & access the shared hash table used to aggregate the stat
 * counters for the backends exited so far since last server restart. It's also
 * responsible for keeping track of the reset timestamp.
 */
typedef struct SavedBackendStatsHashEntry
{
	/* hash entry key, must always be the first */
	Oid databaseId;

	/*
	 * Needs to be locked whenever we read / write counters or resetTimestamp
	 * in this struct since we don't use atomic counters for this struct. Plus,
	 * we want to update the stat counters and resetTimestamp atomically.
	 */
	slock_t mutex;

	/*
	 * While "counters" only represents the stat counters for exited backends,
	 * the "resetTimestamp" doesn't only represent the reset timestamp for exited
	 * backends' stat counters but also for the active backends.
	 */
	StatCounters counters;
	TimestampTz resetTimestamp;
} SavedBackendStatsHashEntry;
/*
 * Hash entry definition used for the local hash table created by
 * citus_stat_counters() at the runtime to aggregate the stat counters
 * across all backends.
 *
 * Unlike SavedBackendStatsHashEntry, no mutex is needed here because the
 * hash table is private to the backend running citus_stat_counters().
 */
typedef struct DatabaseStatsHashEntry
{
	/* hash entry key, must always be the first */
	Oid databaseId;

	/* aggregated counters across active and exited backends */
	StatCounters counters;

	/* 0 means "never reset"; reported as NULL in stats_reset column */
	TimestampTz resetTimestamp;
} DatabaseStatsHashEntry;
/*
 * Definition of a one per-backend stat counters slot in shared memory.
 * Each backend increments the counters in its own slot lock-free via
 * IncrementStatCounterForMyDb().
 */
typedef struct BackendStatsSlot
{
	AtomicStatCounters counters;
} BackendStatsSlot;
/*
 * GUC variable
 *
 * This only controls whether we track the stat counters or not, via
 * IncrementStatCounterForMyDb() and
 * SaveBackendStatsIntoSavedBackendStatsHash(). In other words, even
 * when the GUC is disabled, we still allocate the shared memory
 * structures etc. and citus_stat_counters() / citus_stat_counters_reset()
 * will still work.
 */
bool EnableStatCounters = ENABLE_STAT_COUNTERS_DEFAULT;

/* saved backend stats - shared memory variables */
static LWLockId *SharedSavedBackendStatsHashLock = NULL;
static HTAB *SharedSavedBackendStatsHash = NULL;

/* per-backend stat counter slots - shared memory array, sized by MaxBackends */
BackendStatsSlot *SharedBackendStatsSlotArray = NULL;

/*
 * We don't expect the callsites that check this (via
 * EnsureStatCountersShmemInitDone()) to be executed before
 * StatCountersShmemInit() is done. Plus, once StatCountersShmemInit()
 * is done, we also don't expect shared memory variables to be
 * initialized improperly. However, we still set this to true only
 * once StatCountersShmemInit() is done and if all three of the shared
 * memory variables above are initialized properly. And in the callsites
 * where these shared memory variables are accessed, we check this
 * variable first just to be on the safe side.
 */
static bool StatCountersShmemInitDone = false;

/* saved shmem_startup_hook so we can chain to it from StatCountersShmemInit() */
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;

/* shared memory init & management */
static void StatCountersShmemInit(void);
static Size SharedBackendStatsSlotArrayShmemSize(void);

/* helper functions for citus_stat_counters() */
static void CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats);
static void CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats);
static DatabaseStatsHashEntry * DatabaseStatsHashEntryFindOrCreate(Oid databaseId,
																   HTAB *databaseStats);
static void StoreDatabaseStatsIntoTupStore(HTAB *databaseStats,
										   Tuplestorestate *tupleStore,
										   TupleDesc tupleDescriptor);

/* helper functions for citus_stat_counters_reset() */
static bool ResetActiveBackendStats(Oid databaseId);
static void ResetSavedBackendStats(Oid databaseId, bool force);

/* saved backend stats */
static SavedBackendStatsHashEntry * SavedBackendStatsHashEntryCreateIfNotExists(Oid
																				databaseId);

/* sql exports */
PG_FUNCTION_INFO_V1(citus_stat_counters);
PG_FUNCTION_INFO_V1(citus_stat_counters_reset);
/*
 * EnsureStatCountersShmemInitDone returns true if the shared memory
 * data structures used for keeping track of stat counters have been
 * properly initialized, otherwise, returns false and emits a warning.
 */
static inline bool
EnsureStatCountersShmemInitDone(void)
{
	if (StatCountersShmemInitDone)
	{
		return true;
	}

	ereport(WARNING,
			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
			 errmsg("shared memory for stat counters was not properly initialized")));

	return false;
}
/*
* citus_stat_counters returns stats counters for the given database id.
*
* This only returns rows for the databases which have been connected to
* by at least one backend since the last server restart (even if no
* observations have been made for none of the counters or if they were
* reset) and it considers such a database even if it has been dropped later.
*
* When InvalidOid is provided, all such databases are considered; otherwise
* only the database with the given id is considered.
*
* So, as an outcome, when a database id that is different than InvalidOid
* is provided and no backend has connected to it since the last server
* restart, or, if we didn't ever have such a database, then the function
* returns an empty set.
*
* Finally, stats_reset column is set to NULL if the stat counters for the
* database were never reset since the last server restart.
*/
Datum
citus_stat_counters(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
/*
* Function's sql definition allows Postgres to silently
* ignore NULL, but we still check.
*/
PG_ENSURE_ARGNOTNULL(0, "database_id");
Oid databaseId = PG_GETARG_OID(0);
/* just to be on the safe side */
if (!EnsureStatCountersShmemInitDone())
{
PG_RETURN_VOID();
}
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
HASHCTL info;
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION);
memset(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.hash = oid_hash;
info.entrysize = sizeof(DatabaseStatsHashEntry);
HTAB *databaseStats = hash_create("Citus Database Stats Collect Hash", 8, &info,
hashFlags);
CollectActiveBackendStatsIntoHTAB(databaseId, databaseStats);
CollectSavedBackendStatsIntoHTAB(databaseId, databaseStats);
StoreDatabaseStatsIntoTupStore(databaseStats, tupleStore, tupleDescriptor);
hash_destroy(databaseStats);
PG_RETURN_VOID();
}
/*
 * citus_stat_counters_reset resets Citus stat counters for given database
 * id or for the current database if InvalidOid is provided.
 *
 * If a valid database id is provided, stat counters for that database are
 * reset, even if it was dropped later.
 *
 * Otherwise, if the provided database id is not valid, then the function
 * effectively does nothing.
 */
Datum
citus_stat_counters_reset(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	/*
	 * Function's sql definition allows Postgres to silently
	 * ignore NULL, but we still check.
	 */
	PG_ENSURE_ARGNOTNULL(0, "database_id");
	Oid databaseId = PG_GETARG_OID(0);

	/*
	 * If the database id is InvalidOid, then we assume that
	 * the caller wants to reset the stat counters for the
	 * current database.
	 */
	if (databaseId == InvalidOid)
	{
		databaseId = MyDatabaseId;
	}

	/* just to be on the safe side */
	if (!EnsureStatCountersShmemInitDone())
	{
		PG_RETURN_VOID();
	}

	bool foundAnyBackendsForDb = ResetActiveBackendStats(databaseId);

	/*
	 * Even when we don't have an entry for the given database id in the
	 * saved backend stats hash table, we still want to create one for
	 * it to save the resetTimestamp if we currently have at least one
	 * backend connected to it. By providing foundAnyBackendsForDb, we
	 * effectively let the function do that. Since ResetActiveBackendStats()
	 * doesn't filter the active backends, foundAnyBackendsForDb being true
	 * not always means that at least one backend is connected to it right
	 * now, but it means that we had such a backend at some point in time
	 * since the last server restart. If all backends referred to in the
	 * shared array are already exited, then we should already have an
	 * entry for it in the saved backend stats hash table, so providing
	 * a "true" wouldn't do anything in that case. Otherwise, if at least
	 * one backend is still connected to it, providing a "true" will
	 * effectively create a new entry for it if it doesn't exist yet,
	 * which is what we actually want to do.
	 *
	 * That way, we can save the resetTimestamp for the active backends
	 * into the relevant entry of the saved backend stats hash table.
	 * Note that we don't do that for the databases that don't have
	 * any active backends connected to them because we actually don't
	 * reset anything for such databases.
	 */
	ResetSavedBackendStats(databaseId, foundAnyBackendsForDb);

	PG_RETURN_VOID();
}
/*
 * InitializeStatCountersShmem saves the previous shmem_startup_hook and sets
 * up a new shmem_startup_hook for initializing the shared memory data structures
 * used for keeping track of stat counters.
 *
 * The previous hook (if any) is chained to from StatCountersShmemInit().
 */
void
InitializeStatCountersShmem(void)
{
	prev_shmem_startup_hook = shmem_startup_hook;
	shmem_startup_hook = StatCountersShmemInit;
}
/*
* StatCountersShmemSize calculates and returns shared memory size
* required for the shared memory data structures used for keeping track of
* stat counters.
*/
Size
StatCountersShmemSize(void)
{
Size backendStatsSlotArraySize = SharedBackendStatsSlotArrayShmemSize();
Size savedBackendStatsHashLockSize = MAXALIGN(sizeof(LWLockId));
Size savedBackendStatsHashSize = hash_estimate_size(
SAVED_BACKEND_STATS_HASH_MAX_DATABASES, sizeof(SavedBackendStatsHashEntry));
return add_size(add_size(backendStatsSlotArraySize, savedBackendStatsHashLockSize),
savedBackendStatsHashSize);
}
/*
* IncrementStatCounterForMyDb increments the stat counter for the given statId
* for this backend.
*/
void
IncrementStatCounterForMyDb(int statId)
{
if (!EnableStatCounters)
{
return;
}
/* just to be on the safe side */
if (!EnsureStatCountersShmemInitDone())
{
return;
}
int myBackendSlotIdx = getProcNo_compat(MyProc);
BackendStatsSlot *myBackendStatsSlot =
&SharedBackendStatsSlotArray[myBackendSlotIdx];
/*
* When there cannot be any other writers, incrementing an atomic
* counter via pg_atomic_read_u64() and pg_atomic_write_u64() is
* same as incrementing it via pg_atomic_fetch_add_u64(). Plus, the
* former is cheaper than the latter because the latter has to do
* extra work to deal with concurrent writers.
*
* In our case, the only concurrent writer could be the backend that
* is executing citus_stat_counters_reset(). So, there is chance that
* we read the counter value, then it gets reset by a concurrent call
* made to citus_stat_counters_reset() and then we write the
* incremented value back, by effectively overriding the reset value.
* But this should be a rare case and we can live with that, for the
* sake of lock-free implementation of this function.
*/
pg_atomic_uint64 *statPtr = &myBackendStatsSlot->counters[statId];
pg_atomic_write_u64(statPtr, pg_atomic_read_u64(statPtr) + 1);
}
/*
 * SaveBackendStatsIntoSavedBackendStatsHash saves the stat counters
 * for this backend into the saved backend stats hash table.
 *
 * So, this is only supposed to be called when a backend exits.
 *
 * Also, we do our best to avoid throwing errors in this function because
 * this function is called when a backend is exiting and throwing errors
 * at that point will cause the backend to crash.
 */
void
SaveBackendStatsIntoSavedBackendStatsHash(void)
{
	if (!EnableStatCounters)
	{
		return;
	}

	/* just to be on the safe side */
	if (!EnsureStatCountersShmemInitDone())
	{
		return;
	}

	Oid databaseId = MyDatabaseId;

	LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);

	SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
		(SavedBackendStatsHashEntry *) hash_search(
			SharedSavedBackendStatsHash,
			(void *) &databaseId,
			HASH_FIND,
			NULL);

	if (!dbSavedBackendStatsEntry)
	{
		/* promote the lock to exclusive to insert the new entry for this database */
		LWLockRelease(*SharedSavedBackendStatsHashLock);
		LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_EXCLUSIVE);

		/*
		 * Another backend may have inserted the entry between the release
		 * and the exclusive re-acquire above; the helper simply returns the
		 * existing entry in that case.
		 */
		dbSavedBackendStatsEntry =
			SavedBackendStatsHashEntryCreateIfNotExists(databaseId);

		LWLockRelease(*SharedSavedBackendStatsHashLock);

		if (!dbSavedBackendStatsEntry)
		{
			/*
			 * Couldn't allocate a new hash entry because we're out of
			 * (shared) memory. In that case, we just log a warning and
			 * return, instead of throwing an error due to the reasons
			 * mentioned in function's comment.
			 */
			ereport(WARNING,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("failed to allocate saved backend stats hash entry")));
			return;
		}

		/* re-acquire the shared lock */
		LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);
	}

	int myBackendSlotIdx = getProcNo_compat(MyProc);
	BackendStatsSlot *myBackendStatsSlot =
		&SharedBackendStatsSlotArray[myBackendSlotIdx];

	/* the spinlock serializes us with concurrent readers and resets */
	SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);

	for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
	{
		dbSavedBackendStatsEntry->counters[statIdx] +=
			pg_atomic_read_u64(&myBackendStatsSlot->counters[statIdx]);

		/*
		 * Given that this function is only called when a backend exits, later on
		 * another backend might be assigned to the same slot. So, we reset each
		 * stat counter of this slot to 0 after saving it.
		 */
		pg_atomic_write_u64(&myBackendStatsSlot->counters[statIdx], 0);
	}

	SpinLockRelease(&dbSavedBackendStatsEntry->mutex);

	LWLockRelease(*SharedSavedBackendStatsHashLock);
}
/*
 * StatCountersShmemInit initializes the shared memory data structures used
 * for keeping track of stat counters.
 *
 * Runs as the shmem_startup_hook; chains to the previously installed hook
 * first. In EXEC_BACKEND builds this can run in every process, so the
 * "AlreadyInit" flags tell us whether the structures were already set up
 * by another process and only need to be attached to, not re-initialized.
 */
static void
StatCountersShmemInit(void)
{
	if (prev_shmem_startup_hook != NULL)
	{
		prev_shmem_startup_hook();
	}

	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

	bool sharedBackendStatsSlotArrayAlreadyInit = false;
	SharedBackendStatsSlotArray = (BackendStatsSlot *)
		ShmemInitStruct(
			"Citus Shared Backend Stats Slot Array",
			SharedBackendStatsSlotArrayShmemSize(),
			&sharedBackendStatsSlotArrayAlreadyInit);

	bool sharedSavedBackendStatsHashLockAlreadyInit = false;
	SharedSavedBackendStatsHashLock = ShmemInitStruct(
		SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME,
		sizeof(LWLockId),
		&sharedSavedBackendStatsHashLockAlreadyInit);

	HASHCTL hashInfo = {
		.keysize = sizeof(Oid),
		.entrysize = sizeof(SavedBackendStatsHashEntry),
		.hash = oid_hash,
	};
	SharedSavedBackendStatsHash = ShmemInitHash("Citus Shared Saved Backend Stats Hash",
												SAVED_BACKEND_STATS_HASH_INIT_DATABASES,
												SAVED_BACKEND_STATS_HASH_MAX_DATABASES,
												&hashInfo,
												HASH_ELEM | HASH_FUNCTION);

	/* both structs are created together, so their init states must agree */
	Assert(sharedBackendStatsSlotArrayAlreadyInit ==
		   sharedSavedBackendStatsHashLockAlreadyInit);

	if (!sharedBackendStatsSlotArrayAlreadyInit)
	{
		/* first-time initialization: zero all per-backend counters */
		for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx)
		{
			BackendStatsSlot *backendStatsSlot =
				&SharedBackendStatsSlotArray[backendSlotIdx];

			for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
			{
				pg_atomic_init_u64(&backendStatsSlot->counters[statIdx], 0);
			}
		}

		/* point the shared lock pointer at the named LWLock tranche's lock */
		*SharedSavedBackendStatsHashLock = &(
			GetNamedLWLockTranche(
				SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME)
			)->lock;
	}

	LWLockRelease(AddinShmemInitLock);

	/*
	 * At this point, they should have been set to non-null values already,
	 * but we still check them just to be sure.
	 */
	if (SharedBackendStatsSlotArray &&
		SharedSavedBackendStatsHashLock &&
		SharedSavedBackendStatsHash)
	{
		StatCountersShmemInitDone = true;
	}
}
/*
 * SharedBackendStatsSlotArrayShmemSize returns the size of the shared
 * backend stats slot array: one BackendStatsSlot per possible backend.
 */
static Size
SharedBackendStatsSlotArrayShmemSize(void)
{
	return mul_size(MaxBackends, sizeof(BackendStatsSlot));
}
/*
 * CollectActiveBackendStatsIntoHTAB aggregates the stat counters for the
 * given database id from all the active backends into the databaseStats
 * hash table. The function doesn't actually filter the slots of active
 * backends but it's just fine to read the stat counters from all because
 * exited backends anyway zero out their stat counters when they exit.
 *
 * If the database id is InvalidOid, then all the active backends will be
 * considered regardless of the database they are connected to.
 *
 * Otherwise, if the database id is different than InvalidOid, then only
 * the active backends whose PGPROC->databaseId is the same as the given
 * database id will be considered, if any.
 */
static void
CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
{
	for (int slotIdx = 0; slotIdx < MaxBackends; slotIdx++)
	{
		PGPROC *proc = GetPGProcByNumber(slotIdx);

		/* skip unused slots */
		if (proc->pid == 0)
		{
			continue;
		}

		Oid procDatabaseId = proc->databaseId;

		/*
		 * Skip processes not connected to any database, something like
		 * logical replication launcher, autovacuum launcher or such.
		 */
		if (!OidIsValid(procDatabaseId))
		{
			continue;
		}

		/* honor the database filter, if one was given */
		if (OidIsValid(databaseId) && databaseId != procDatabaseId)
		{
			continue;
		}

		DatabaseStatsHashEntry *dbStatsEntry =
			DatabaseStatsHashEntryFindOrCreate(procDatabaseId, databaseStats);

		BackendStatsSlot *slot = &SharedBackendStatsSlotArray[slotIdx];
		for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
		{
			dbStatsEntry->counters[statIdx] +=
				pg_atomic_read_u64(&slot->counters[statIdx]);
		}
	}
}
/*
* CollectSavedBackendStatsIntoHTAB fetches the saved stat counters and
* resetTimestamp for the given database id from the saved backend stats
* hash table and saves them into the databaseStats hash table.
*
* If the database id is InvalidOid, then all the databases that present
* in the saved backend stats hash table will be considered.
*
* Otherwise, if the database id is different than InvalidOid, then only
* the entry that belongs to given database will be considered, if there
* is such an entry.
*/
static void
CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
{
LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);
if (databaseId != InvalidOid)
{
SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
(SavedBackendStatsHashEntry *) hash_search(
SharedSavedBackendStatsHash,
(void *) &databaseId,
HASH_FIND,
NULL);
if (dbSavedBackendStatsEntry)
{
DatabaseStatsHashEntry *dbStatsEntry =
DatabaseStatsHashEntryFindOrCreate(databaseId, databaseStats);
SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
dbStatsEntry->counters[statIdx] +=
dbSavedBackendStatsEntry->counters[statIdx];
}
dbStatsEntry->resetTimestamp =
dbSavedBackendStatsEntry->resetTimestamp;
SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
}
}
else
{
HASH_SEQ_STATUS hashSeqStatus;
hash_seq_init(&hashSeqStatus, SharedSavedBackendStatsHash);
SavedBackendStatsHashEntry *dbSavedBackendStatsEntry = NULL;
while ((dbSavedBackendStatsEntry = hash_seq_search(&hashSeqStatus)) != NULL)
{
DatabaseStatsHashEntry *dbStatsEntry =
DatabaseStatsHashEntryFindOrCreate(dbSavedBackendStatsEntry->databaseId,
databaseStats);
SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
dbStatsEntry->counters[statIdx] +=
dbSavedBackendStatsEntry->counters[statIdx];
}
dbStatsEntry->resetTimestamp =
dbSavedBackendStatsEntry->resetTimestamp;
SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
}
}
LWLockRelease(*SharedSavedBackendStatsHashLock);
}
/*
 * DatabaseStatsHashEntryFindOrCreate creates a new entry in databaseStats
 * hash table for the given database id if it doesn't already exist and
 * initializes it, or just returns the existing entry if it does.
 */
static DatabaseStatsHashEntry *
DatabaseStatsHashEntryFindOrCreate(Oid databaseId, HTAB *databaseStats)
{
	bool found = false;
	DatabaseStatsHashEntry *entry = (DatabaseStatsHashEntry *)
		hash_search(databaseStats, &databaseId, HASH_ENTER, &found);

	if (found)
	{
		return entry;
	}

	/* first time we see this database; start from a clean slate */
	MemSet(entry->counters, 0, sizeof(StatCounters));
	entry->resetTimestamp = 0;

	return entry;
}
/*
 * StoreDatabaseStatsIntoTupStore stores the database stats from the
 * databaseStats hash table into given tuple store, one row per database.
 */
static void
StoreDatabaseStatsIntoTupStore(HTAB *databaseStats, Tuplestorestate *tupleStore,
							   TupleDesc tupleDescriptor)
{
	HASH_SEQ_STATUS seqStatus;
	hash_seq_init(&seqStatus, databaseStats);

	DatabaseStatsHashEntry *entry = NULL;
	while ((entry = hash_seq_search(&seqStatus)) != NULL)
	{
		/* +2 for database_id (first) and the stats_reset (last) column */
		Datum values[N_CITUS_STAT_COUNTERS + 2] = { 0 };
		bool isNulls[N_CITUS_STAT_COUNTERS + 2] = { 0 };

		values[0] = ObjectIdGetDatum(entry->databaseId);

		for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
		{
			values[statIdx + 1] = UInt64GetDatum(entry->counters[statIdx]);
		}

		/* stats_reset is NULL when the counters were never reset */
		bool neverReset = (entry->resetTimestamp == 0);
		isNulls[N_CITUS_STAT_COUNTERS + 1] = neverReset;
		if (!neverReset)
		{
			values[N_CITUS_STAT_COUNTERS + 1] =
				TimestampTzGetDatum(entry->resetTimestamp);
		}

		tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
	}
}
/*
 * ResetActiveBackendStats resets the stat counters for the given database
 * id for all the active backends. The function doesn't actually filter the
 * slots of active backends but it's just fine to reset the stat counters
 * for all because doing so just means resetting the stat counters for
 * exited backends once again, which were already reset when they exited.
 *
 * Only active backends whose PGPROC->databaseId is the same as the given
 * database id will be considered, if any.
 *
 * Returns true if any active backend was found.
 */
static bool
ResetActiveBackendStats(Oid databaseId)
{
	bool foundAny = false;

	for (int slotIdx = 0; slotIdx < MaxBackends; slotIdx++)
	{
		PGPROC *proc = GetPGProcByNumber(slotIdx);

		/* skip unused slots */
		if (proc->pid == 0)
		{
			continue;
		}

		Oid procDatabaseId = proc->databaseId;

		/*
		 * Skip processes not connected to any database (logical replication
		 * launcher, autovacuum launcher, etc.) as well as backends that are
		 * connected to a database other than the one we are interested in.
		 */
		if (!OidIsValid(procDatabaseId) || procDatabaseId != databaseId)
		{
			continue;
		}

		foundAny = true;

		BackendStatsSlot *slot = &SharedBackendStatsSlotArray[slotIdx];
		for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
		{
			pg_atomic_write_u64(&slot->counters[statIdx], 0);
		}
	}

	return foundAny;
}
/*
 * ResetSavedBackendStats resets the saved stat counters for the given
 * database id and sets the resetTimestamp for it to the current timestamp.
 *
 * If force is true, then we first make sure that we have an entry for
 * the given database id in the saved backend stats hash table, creating
 * one if needed; throws an ERROR if shared memory is exhausted.
 */
static void
ResetSavedBackendStats(Oid databaseId, bool force)
{
	LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);

	SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
		(SavedBackendStatsHashEntry *) hash_search(
			SharedSavedBackendStatsHash,
			(void *) &databaseId,
			HASH_FIND,
			NULL);

	if (!dbSavedBackendStatsEntry && force)
	{
		/* promote the lock to exclusive to insert the new entry for this database */
		LWLockRelease(*SharedSavedBackendStatsHashLock);
		LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_EXCLUSIVE);

		dbSavedBackendStatsEntry =
			SavedBackendStatsHashEntryCreateIfNotExists(databaseId);

		LWLockRelease(*SharedSavedBackendStatsHashLock);

		if (!dbSavedBackendStatsEntry)
		{
			/*
			 * Couldn't allocate a new hash entry because we're out of
			 * (shared) memory. Note that ereport(ERROR) does not return,
			 * so no "return" statement is needed after it.
			 */
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("failed to allocate saved backend stats hash entry")));
		}

		/* re-acquire the shared lock */
		LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);
	}

	/*
	 * Actually reset the stat counters for the exited backends and set
	 * the resetTimestamp to the current timestamp if we already had
	 * an entry for it or if we just created it.
	 */
	if (dbSavedBackendStatsEntry)
	{
		SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);

		memset(dbSavedBackendStatsEntry->counters, 0, sizeof(StatCounters));
		dbSavedBackendStatsEntry->resetTimestamp = GetCurrentTimestamp();

		SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
	}

	LWLockRelease(*SharedSavedBackendStatsHashLock);
}
/*
 * SavedBackendStatsHashEntryCreateIfNotExists creates a new entry in the
 * saved backend stats hash table for the given database id if it doesn't
 * already exist and initializes it.
 *
 * Assumes that the caller has exclusive access to the hash table since it
 * performs HASH_ENTER_NULL.
 *
 * Returns NULL if the entry didn't exist and couldn't be allocated since
 * we're out of (shared) memory.
 */
static SavedBackendStatsHashEntry *
SavedBackendStatsHashEntryCreateIfNotExists(Oid databaseId)
{
	bool found = false;
	SavedBackendStatsHashEntry *entry =
		(SavedBackendStatsHashEntry *) hash_search(SharedSavedBackendStatsHash,
												   (void *) &databaseId,
												   HASH_ENTER_NULL,
												   &found);

	/*
	 * As we provided HASH_ENTER_NULL, a NULL result means OOM. We return
	 * it as-is and let the caller decide what to do.
	 */
	if (entry == NULL)
	{
		return NULL;
	}

	if (!found)
	{
		/* freshly inserted entry: zero the counters and init the mutex */
		memset(entry->counters, 0, sizeof(StatCounters));
		entry->resetTimestamp = 0;
		SpinLockInit(&entry->mutex);
	}

	return entry;
}