forked from Percona-Lab/clickhousedb_fdw
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclickhousedb_shipable.c
214 lines (186 loc) · 6.85 KB
/
clickhousedb_shipable.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
/*-------------------------------------------------------------------------
*
* clickhousedb_shippable.c
* Determine which database objects are shippable to a remote server.
*
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
*
* IDENTIFICATION
* contrib/clickhousedb_fdw/clickhousedb_shippable.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "clickhousedb_fdw.h"
#include "access/transam.h"
#include "catalog/dependency.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/syscache.h"
/* Hash table for caching the results of shippability lookups */
static HTAB *ShippableCacheHash = NULL;
/*
* Hash key for shippability lookups. We include the FDW server OID because
* decisions may differ per-server. Otherwise, objects are identified by
* their (local!) OID and catalog OID.
*/
typedef struct
{
/* XXX we assume this struct contains no padding bytes */
Oid objid; /* function/operator/type OID */
Oid classid; /* OID of its catalog (pg_proc, etc) */
Oid serverid; /* FDW server we are concerned with */
} ShippableCacheKey;
typedef struct
{
ShippableCacheKey key; /* hash key - must be first */
bool shippable;
} ShippableCacheEntry;
/*
* Flush cache entries when pg_foreign_server is updated.
*
* We do this because of the possibility of ALTER SERVER being used to change
* a server's extensions option. We do not currently bother to check whether
* objects' extension membership changes once a shippability decision has been
* made for them, however.
*/
static void
InvalidateShippableCacheCallback(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS status;
ShippableCacheEntry *entry;
/*
* In principle we could flush only cache entries relating to the
* pg_foreign_server entry being outdated; but that would be more
* complicated, and it's probably not worth the trouble. So for now, just
* flush all entries.
*/
hash_seq_init(&status, ShippableCacheHash);
while ((entry = (ShippableCacheEntry *) hash_seq_search(&status)) != NULL)
{
if (hash_search(ShippableCacheHash,
(void *) &entry->key,
HASH_REMOVE,
NULL) == NULL)
{
elog(ERROR, "hash table corrupted");
}
}
}
/*
* Initialize the backend-lifespan cache of shippability decisions.
*/
static void
InitializeShippableCache(void)
{
HASHCTL ctl;
/* Create the hash table. */
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(ShippableCacheKey);
ctl.entrysize = sizeof(ShippableCacheEntry);
ShippableCacheHash =
hash_create("Shippability cache", 256, &ctl, HASH_ELEM | HASH_BLOBS);
/* Set up invalidation callback on pg_foreign_server. */
CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
InvalidateShippableCacheCallback,
(Datum) 0);
}
/*
* Returns true if given object (operator/function/type) is shippable
* according to the server options.
*
* Right now "shippability" is exclusively a function of whether the object
* belongs to an extension declared by the user. In the future we could
* additionally have a whitelist of functions/operators declared one at a time.
*/
static bool
lookup_shippable(Oid objectId, Oid classId, CHFdwRelationInfo *fpinfo)
{
Oid extensionOid;
/*
* Is object a member of some extension? (Note: this is a fairly
* expensive lookup, which is why we try to cache the results.)
*/
extensionOid = getExtensionOfObject(classId, objectId);
/* If so, is that extension in fpinfo->shippable_extensions? */
if (OidIsValid(extensionOid) &&
list_member_oid(fpinfo->shippable_extensions, extensionOid))
{
return true;
}
return false;
}
/*
* Return true if given object is one of PostgreSQL's built-in objects.
*
* We use FirstBootstrapObjectId as the cutoff, so that we only consider
* objects with hand-assigned OIDs to be "built in", not for instance any
* function or type defined in the information_schema.
*
* Our constraints for dealing with types are tighter than they are for
* functions or operators: we want to accept only types that are in pg_catalog,
* else deparse_type_name might incorrectly fail to schema-qualify their names.
* Thus we must exclude information_schema types.
*
* XXX there is a problem with this, which is that the set of built-in
* objects expands over time. Something that is built-in to us might not
* be known to the remote server, if it's of an older version. But keeping
* track of that would be a huge exercise.
*/
bool
is_builtin(Oid objectId)
{
return (objectId < FirstBootstrapObjectId);
}
/*
* is_shippable
* Is this object (function/operator/type) shippable to foreign server?
*/
bool
is_shippable(Oid objectId, Oid classId, CHFdwRelationInfo *fpinfo)
{
ShippableCacheKey key;
ShippableCacheEntry *entry;
/* Built-in objects are presumed shippable. */
if (is_builtin(objectId))
{
return true;
}
/* Otherwise, give up if user hasn't specified any shippable extensions. */
if (fpinfo->shippable_extensions == NIL)
{
return false;
}
/* Initialize cache if first time through. */
if (!ShippableCacheHash)
{
InitializeShippableCache();
}
/* Set up cache hash key */
key.objid = objectId;
key.classid = classId;
key.serverid = fpinfo->server->serverid;
/* See if we already cached the result. */
entry = (ShippableCacheEntry *)
hash_search(ShippableCacheHash,
(void *) &key,
HASH_FIND,
NULL);
if (!entry)
{
/* Not found in cache, so perform shippability lookup. */
bool shippable = lookup_shippable(objectId, classId, fpinfo);
/*
* Don't create a new hash entry until *after* we have the shippable
* result in hand, as the underlying catalog lookups might trigger a
* cache invalidation.
*/
entry = (ShippableCacheEntry *)
hash_search(ShippableCacheHash,
(void *) &key,
HASH_ENTER,
NULL);
entry->shippable = shippable;
}
return entry->shippable;
}