Skip to content

Commit 8c1a954

Browse files
authored
Merge pull request #2515 from kubernetes-client/copilot/implement-informer-in-python-client
Add SharedInformer implementation to python-client
2 parents bc5e32e + a858453 commit 8c1a954

7 files changed

Lines changed: 1900 additions & 0 deletions

File tree

examples/informer_example.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Copyright 2026 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Example: use SharedInformer to watch pods in the default namespace.
16+
17+
The informer runs a background daemon thread that keeps a local cache
18+
synchronised with the Kubernetes API server. The main thread is free to
19+
query the cache at any time without worrying about connectivity or retries.
20+
"""
21+
22+
import time
23+
24+
import kubernetes
25+
from kubernetes import config
26+
from kubernetes.client import CoreV1Api
27+
from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer
28+
29+
30+
def on_pod_added(pod):
31+
name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"]
32+
print("[ADDED] ", name)
33+
34+
35+
def on_pod_modified(pod):
36+
name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"]
37+
print("[MODIFIED]", name)
38+
39+
40+
def on_pod_deleted(pod):
41+
name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"]
42+
print("[DELETED] ", name)
43+
44+
45+
def main():
46+
config.load_kube_config()
47+
48+
v1 = CoreV1Api()
49+
informer = SharedInformer(
50+
list_func=v1.list_namespaced_pod,
51+
namespace="default",
52+
resync_period=60,
53+
)
54+
55+
informer.add_event_handler(ADDED, on_pod_added)
56+
informer.add_event_handler(MODIFIED, on_pod_modified)
57+
informer.add_event_handler(DELETED, on_pod_deleted)
58+
59+
informer.start()
60+
print('Informer started. Watching pods in "default" namespace ...')
61+
62+
try:
63+
while True:
64+
cached = informer.cache.list()
65+
print("Cached pods: {}".format(len(cached)))
66+
time.sleep(10)
67+
except KeyboardInterrupt:
68+
pass
69+
finally:
70+
informer.stop()
71+
print("Informer stopped.")
72+
73+
74+
if __name__ == "__main__":
75+
main()

kubernetes/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,4 @@
2323
from . import stream
2424
from . import utils
2525
from . import leaderelection
26+
from . import informer
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
# Copyright 2026 The Kubernetes Authors.
2+
# Licensed under the Apache License, Version 2.0 (the "License").
3+
# End-to-end tests for kubernetes.informer.SharedInformer.
4+
5+
import threading
6+
import time
7+
import unittest
8+
import uuid
9+
10+
from kubernetes.client import api_client
11+
from kubernetes.client.api import core_v1_api
12+
from kubernetes.e2e_test import base
13+
from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer
14+
15+
_TIMEOUT = 30
16+
17+
18+
def _uid():
19+
return str(uuid.uuid4())[-12:]
20+
21+
22+
def _cm(name, payload=None):
23+
return {
24+
"apiVersion": "v1",
25+
"kind": "ConfigMap",
26+
"metadata": {"name": name, "labels": {"inf-e2e": "1"}},
27+
"data": payload or {"k": "v"},
28+
}
29+
30+
31+
def _name_of(obj):
32+
if hasattr(obj, "metadata"):
33+
return obj.metadata.name
34+
return (obj.get("metadata") or {}).get("name")
35+
36+
37+
class TestSharedInformerE2E(unittest.TestCase):
38+
39+
@classmethod
40+
def setUpClass(cls):
41+
cls.cfg = base.get_e2e_configuration()
42+
cls.apiclient = api_client.ApiClient(configuration=cls.cfg)
43+
cls.api = core_v1_api.CoreV1Api(cls.apiclient)
44+
45+
def _drop(self, cm_name):
46+
try:
47+
self.api.delete_namespaced_config_map(name=cm_name, namespace="default")
48+
except Exception:
49+
pass
50+
51+
def _expect(self, ev, label):
52+
if not ev.wait(timeout=_TIMEOUT):
53+
self.fail("Timeout waiting for: " + label)
54+
55+
def _wait_in_cache(self, inf, key):
56+
stop = time.monotonic() + _TIMEOUT
57+
while time.monotonic() < stop:
58+
if inf.cache.get_by_key(key) is not None:
59+
return
60+
time.sleep(0.25)
61+
self.fail("key " + key + " never appeared in cache")
62+
63+
def _wait_listed(self, inf):
64+
stop = time.monotonic() + _TIMEOUT
65+
while inf._resource_version is None and time.monotonic() < stop:
66+
time.sleep(0.1)
67+
self.assertIsNotNone(inf._resource_version, "initial list never completed")
68+
69+
# -------------------------------------------------------
70+
71+
def test_cache_populated_after_start(self):
72+
"""Pre-existing ConfigMaps appear in the cache once the informer starts."""
73+
name = "inf-pre-" + _uid()
74+
self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
75+
self.addCleanup(self._drop, name)
76+
77+
inf = SharedInformer(
78+
list_func=self.api.list_namespaced_config_map,
79+
namespace="default",
80+
label_selector="inf-e2e=1",
81+
)
82+
inf.start()
83+
self.addCleanup(inf.stop)
84+
85+
self._wait_in_cache(inf, "default/" + name)
86+
cached = inf.cache.get_by_key("default/" + name)
87+
self.assertEqual(_name_of(cached), name)
88+
# Verify the cached object actually contains the expected data payload.
89+
data = cached.data if hasattr(cached, "data") else (cached.get("data") or {})
90+
self.assertEqual(data.get("k"), "v")
91+
92+
def test_added_event_and_cache_entry(self):
93+
"""Creating a ConfigMap fires ADDED and the object appears in the cache."""
94+
name = "inf-add-" + _uid()
95+
seen = threading.Event()
96+
97+
inf = SharedInformer(
98+
list_func=self.api.list_namespaced_config_map,
99+
namespace="default",
100+
label_selector="inf-e2e=1",
101+
)
102+
inf.add_event_handler(ADDED, lambda o: seen.set() if _name_of(o) == name else None)
103+
inf.start()
104+
self.addCleanup(inf.stop)
105+
self.addCleanup(self._drop, name)
106+
107+
self._wait_listed(inf)
108+
self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
109+
self._expect(seen, "ADDED/" + name)
110+
self.assertIsNotNone(inf.cache.get_by_key("default/" + name))
111+
112+
def test_modified_event_and_cache_refresh(self):
113+
"""Patching a ConfigMap fires MODIFIED and the cache holds the updated object."""
114+
name = "inf-mod-" + _uid()
115+
seen = threading.Event()
116+
117+
inf = SharedInformer(
118+
list_func=self.api.list_namespaced_config_map,
119+
namespace="default",
120+
label_selector="inf-e2e=1",
121+
)
122+
inf.add_event_handler(MODIFIED, lambda o: seen.set() if _name_of(o) == name else None)
123+
inf.start()
124+
self.addCleanup(inf.stop)
125+
self.addCleanup(self._drop, name)
126+
127+
self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
128+
self._wait_in_cache(inf, "default/" + name)
129+
130+
self.api.patch_namespaced_config_map(
131+
name=name, namespace="default", body={"data": {"k": "updated"}}
132+
)
133+
self._expect(seen, "MODIFIED/" + name)
134+
# Verify that the cache now holds the updated data.
135+
cached = inf.cache.get_by_key("default/" + name)
136+
self.assertIsNotNone(cached)
137+
data = cached.data if hasattr(cached, "data") else (cached.get("data") or {})
138+
self.assertEqual(data.get("k"), "updated")
139+
140+
def test_deleted_event_removes_from_cache(self):
141+
"""Deleting a ConfigMap fires DELETED and removes it from the cache."""
142+
name = "inf-del-" + _uid()
143+
seen = threading.Event()
144+
145+
inf = SharedInformer(
146+
list_func=self.api.list_namespaced_config_map,
147+
namespace="default",
148+
label_selector="inf-e2e=1",
149+
)
150+
inf.add_event_handler(DELETED, lambda o: seen.set() if _name_of(o) == name else None)
151+
inf.start()
152+
self.addCleanup(inf.stop)
153+
154+
self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
155+
self._wait_in_cache(inf, "default/" + name)
156+
157+
self.api.delete_namespaced_config_map(name=name, namespace="default")
158+
self._expect(seen, "DELETED/" + name)
159+
self.assertIsNone(inf.cache.get_by_key("default/" + name))
160+
161+
def test_resource_version_advances(self):
162+
"""The stored resourceVersion advances after watch events are received."""
163+
name = "inf-rv-" + _uid()
164+
seen = threading.Event()
165+
166+
inf = SharedInformer(
167+
list_func=self.api.list_namespaced_config_map,
168+
namespace="default",
169+
label_selector="inf-e2e=1",
170+
)
171+
inf.add_event_handler(ADDED, lambda o: seen.set() if _name_of(o) == name else None)
172+
inf.start()
173+
self.addCleanup(inf.stop)
174+
self.addCleanup(self._drop, name)
175+
176+
self._wait_listed(inf)
177+
rv_before = int(inf._resource_version)
178+
179+
self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
180+
self._expect(seen, "ADDED/" + name)
181+
self.assertGreater(int(inf._resource_version), rv_before)
182+
183+
184+
def test_resync_fires_modified_for_existing_objects(self):
185+
"""Periodic resync re-lists from the API server and fires MODIFIED for cached objects.
186+
187+
A short resync_period (5 s) is used so the test completes quickly.
188+
After the informer has cached the ConfigMap via the initial list, we
189+
wait for a MODIFIED event that is fired by the resync, verifying that
190+
the resync actually contacts the API server and triggers callbacks.
191+
"""
192+
name = "inf-rsync-" + _uid()
193+
self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
194+
self.addCleanup(self._drop, name)
195+
196+
added = threading.Event()
197+
resynced = threading.Event()
198+
199+
inf = SharedInformer(
200+
list_func=self.api.list_namespaced_config_map,
201+
namespace="default",
202+
label_selector="inf-e2e=1",
203+
resync_period=5,
204+
)
205+
inf.add_event_handler(ADDED, lambda o: added.set() if _name_of(o) == name else None)
206+
# The resync fires MODIFIED for existing cached objects; wait for it.
207+
inf.add_event_handler(MODIFIED, lambda o: resynced.set() if _name_of(o) == name else None)
208+
inf.start()
209+
self.addCleanup(inf.stop)
210+
211+
# First, wait for the object to be added to the cache.
212+
self._expect(added, "ADDED/" + name)
213+
# Then wait for the resync to fire MODIFIED (allow up to 3× resync_period).
214+
if not resynced.wait(timeout=15):
215+
self.fail("Timeout waiting for resync MODIFIED/" + name)
216+
217+
# Verify the cached object still holds the expected data.
218+
cached = inf.cache.get_by_key("default/" + name)
219+
self.assertIsNotNone(cached)
220+
data = cached.data if hasattr(cached, "data") else (cached.get("data") or {})
221+
self.assertEqual(data.get("k"), "v")
222+
223+
224+
if __name__ == "__main__":
225+
unittest.main()

kubernetes/informer/__init__.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Copyright 2026 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from .cache import ObjectCache, _meta_namespace_key
16+
from .informer import SharedInformer, ADDED, MODIFIED, DELETED, BOOKMARK, ERROR
17+
18+
__all__ = [
19+
"ObjectCache",
20+
"_meta_namespace_key",
21+
"SharedInformer",
22+
"ADDED",
23+
"MODIFIED",
24+
"DELETED",
25+
"BOOKMARK",
26+
"ERROR",
27+
]

0 commit comments

Comments
 (0)