-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathsites.py
More file actions
executable file
·213 lines (175 loc) · 7.35 KB
/
sites.py
File metadata and controls
executable file
·213 lines (175 loc) · 7.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#!/usr/bin/env python
'''
Check websites to make sure they are up, are handling www/https redirects
as expected, and have valid not-expiring-soon SSL certificates.
Brian Cantoni
todo:
- move server list from here to an external yaml file
- better controls to allow AWS, Twilio and Slack to be optional
'''
import argparse
import datetime
import os
import re
import requests
import s3data
import socket
import ssl
import sys
import time
from twilio.rest import Client as TwilioClient
def ssl_expiry_datetime(hostname):
    '''Get SSL certificate expire time from a website.

    Opens a TLS connection to ``hostname`` on port 443 (IPv4, 3 second
    timeout), reads the peer certificate and parses its ``notAfter`` field.

    Returns a naive ``datetime.datetime``; the certificate timestamp is
    expressed in GMT.

    Connection, timeout and certificate-verification errors raised by the
    ``socket``/``ssl`` modules are not caught here and propagate to the caller.
    '''
    ssl_date_fmt = r'%b %d %H:%M:%S %Y %Z'

    context = ssl.create_default_context()
    # The context manager guarantees the socket is closed even when the
    # connect or handshake fails; previously it was never closed at all.
    with context.wrap_socket(
        socket.socket(socket.AF_INET),
        server_hostname=hostname,
    ) as conn:
        conn.settimeout(3.0)
        conn.connect((hostname, 443))
        ssl_info = conn.getpeercert()

    return datetime.datetime.strptime(ssl_info['notAfter'], ssl_date_fmt)
def ssl_valid_time_remaining(hostname):
    '''Get the time left in a cert's lifetime.

    Returns a ``datetime.timedelta`` (not a day count); use ``.days`` for
    whole days. The value is negative once the certificate has expired.
    '''
    expires = ssl_expiry_datetime(hostname)
    # `expires` is a naive datetime, so subtract a naive UTC "now".
    # datetime.utcnow() is deprecated since Python 3.12; this is the
    # equivalent spelling that yields the same naive UTC value.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return expires - now_utc
# Seconds to wait for any single HTTP request before counting it as a failure.
_HTTP_TIMEOUT = 10

# Certificates with fewer than this many days of validity left are reported.
_SSL_MIN_DAYS = 3

# Built-in checks, used when check_sites() is called without explicit lists.
_DEFAULT_SERVERS = [
    {'url': 'http://about.readthedocs.com/', 'code': 302, 'redirect': 'https://about.readthedocs.com/'},
    {'url': 'http://readthedocs.com/', 'code': 302, 'redirect': 'https://readthedocs.com/'},
    {'url': 'https://about.readthedocs.com/', 'code': 200, 'contents': 'Documentation simplified'},
    {'url': 'http://www.python.org/', 'code': 301, 'redirect': 'https://www.python.org/'},
    {'url': 'https://www.python.org/', 'code': 200, 'contents': 'official home of the Python Programming Language'},
    {'url': 'http://thunderbird.net/', 'code': 301, 'redirect': 'https://thunderbird.net/'},
    {'url': 'http://www.thunderbird.net/', 'code': 301, 'redirect': 'https://www.thunderbird.net/'},
    {'url': 'https://www.thunderbird.net/en-US/', 'code': 200, 'contents': 'Creative Commons license'},
]
_DEFAULT_SSL_HOSTS = [
    'readthedocs.com',
    'www.python.org',
    'www.thunderbird.net',
]


def _check_server(s):
    '''Run the HTTP check for one server entry; return an error string or None.'''
    url = s['url']
    try:
        r = requests.head(url, timeout=_HTTP_TIMEOUT)
        if r.status_code != s['code']:
            return "Fail: {} expected response code {} received {}".format(
                url, s['code'], r.status_code)

        if r.status_code in (301, 302):
            # .get() so a redirect without a Location header is reported
            # as a mismatch instead of raising KeyError.
            location = r.headers.get('Location')
            if location != s['redirect']:
                return "Fail: {} expected redirect {} received {}".format(
                    url, s['redirect'], location)
        elif r.status_code == 200:
            r2 = requests.get(url, timeout=_HTTP_TIMEOUT)
            if r2.status_code != 200:
                # Report the GET status (r2), not the HEAD status, and record
                # it as a failure rather than aborting the whole run.
                return "Fail: {} unexpected http response code {} from get".format(
                    url, r2.status_code)
            page = r2.content.decode('utf-8', errors='replace')
            # 'contents' is applied as a regular expression.
            if not re.search(s['contents'], page):
                return "Fail: {} expected contents {}".format(url, s['contents'])
    except requests.exceptions.RequestException as e:
        return "Fail: {} exception {}".format(url, e)
    return None


def _check_ssl_host(host):
    '''Check one host's certificate lifetime; return an error string or None.'''
    try:
        remaining = ssl_valid_time_remaining(host)
    except (OSError, ValueError) as e:
        # Socket, timeout and TLS verification errors are OSError subclasses.
        # A certificate that is already expired fails verification during the
        # handshake, so it surfaces here rather than as a negative timedelta.
        return "Fail: SSL check for {} exception {}".format(host, e)

    if remaining < datetime.timedelta(days=0):
        return "Fail: SSL cert for {} already expired!".format(host)
    if remaining < datetime.timedelta(days=_SSL_MIN_DAYS):
        return "Fail: SSL cert for {} expiring in {} days".format(host, remaining.days)
    return None


def check_sites(verbose=False, servers=None, ssl_hosts=None):
    '''
    Check every configured site and SSL certificate; return a list of
    human-readable error strings (empty when everything passes).

    A failure on one site never prevents the remaining sites or the SSL
    checks from running.

    Arguments:
    verbose - print progress and a summary to stdout
    servers - list of server test entries; None uses the built-in list
    ssl_hosts - list of hostnames whose certificates are checked; None uses
        the built-in list

    Server test array fields:
    url - URL to test with HTTP HEAD operation
    code - expected HTTP response code
    redirect - expected redirect URL for 301/302 responses, only used when expected code=301/302
    contents - expected contents in page (HTTP GET), only used when expected code=200
    '''
    # `is None` so that an explicitly empty list means "check nothing".
    if servers is None:
        servers = _DEFAULT_SERVERS
    if ssl_hosts is None:
        ssl_hosts = _DEFAULT_SSL_HOSTS

    errors = []

    for s in servers:
        if verbose:
            print("checking {}".format(s['url']))
        problem = _check_server(s)
        if problem:
            errors.append(problem)

    for host in ssl_hosts:
        if verbose:
            print("checking SSL {}".format(host))
        problem = _check_ssl_host(host)
        if problem:
            errors.append(problem)

    if verbose:
        print("Done. {} server and {} SSL certs checks; found {} error(s)"
              .format(len(servers), len(ssl_hosts), len(errors)))
        if errors:
            # ANSI red for the error list
            print('\033[91m' + "\n".join(errors) + '\033[0m')

    return errors
def send_sms_messages(twilio_sid, twilio_auth_token, from_number, to_number, messages, verbose=False):
    '''Send each entry of ``messages`` as its own SMS through Twilio.

    A Twilio client is created from the account SID and auth token, then one
    message is created per entry, from ``from_number`` to ``to_number``.
    In verbose mode the SID of every created message is printed.
    '''
    twilio = TwilioClient(twilio_sid, twilio_auth_token)
    for text in messages:
        sent = twilio.messages.create(to=to_number, from_=from_number, body=text)
        if verbose:
            print(sent.sid)
def send_slack_messages(slack_webhook_url, messages, verbose=False):
    '''Send Slack messages to a channel using an incoming webhook.

    Each entry of ``messages`` is posted separately as ``{'text': entry}``.
    In verbose mode the status code and body of every response are printed.
    A non-success response is always reported on stderr so that a rejected
    notification is not silently lost.

    Exceptions raised by ``requests`` (including timeouts) propagate.
    '''
    for m in messages:
        # Bound the wait so a stalled webhook cannot hang the whole run.
        resp = requests.post(slack_webhook_url, json={'text': m}, timeout=10)
        if verbose:
            print("Response from Slack webhook: {} {}".format(resp.status_code, resp.content))
        if not resp.ok:
            print("Slack webhook rejected message: {} {}".format(
                resp.status_code, resp.content), file=sys.stderr)
if __name__ == '__main__':
    # Command-line options
    cli = argparse.ArgumentParser(description="Check personal websites online and valid SSL certs")
    cli.add_argument('--ci', action="store_true", help="CI mode including notifications")
    cli.add_argument('--verbose', '-v', action="store_true", help="Verbose mode")
    cli.add_argument('--delete', '-d', action="store_true", help="Delete existing stored data")
    args = cli.parse_args()

    # Run the checks; the exit code is the number of failures found.
    errors = check_sites(args.verbose)
    rc = len(errors)

    # CI mode: persist this run's result in S3, then notify via SMS and Slack.
    if args.ci:
        store = s3data.S3Data(
            os.environ['AWS_ACCESS_KEY_ID'],
            os.environ['AWS_SECRET_ACCESS_KEY'],
            os.environ['S3DATA_BUCKET'],
        )
        s3key = 'sitecheck-example-data'
        if args.delete:
            store.delete(s3key)

        # Result of the previous run, if one was stored.
        previous = store.get(s3key)
        if previous:
            prev_rc = previous['results']
            prev_ts = previous['lastRun']
            prev_when = datetime.datetime.fromtimestamp(prev_ts).strftime('%Y-%m-%d %H:%M:%S')
            print("last run rc {} at {} which is {}".format(prev_rc, prev_ts, prev_when))

        store.put(s3key, {
            "lastRun": int(time.time()),
            "version": 1,
            "results": rc,
            "errors": errors,
        })

        # Notify on the first recorded run, while failing, or on the first
        # passing run after a failure. The last operand is only evaluated
        # when `previous` is set and `errors` is empty.
        if not previous or errors or previous['results'] != 0:
            # One list entry per message keeps each SMS short
            # (Twilio 160 char limit).
            if rc == 0:
                notices = ["Sitecheck PASS"]
            else:
                notices = ["Sitecheck {} Errors:".format(rc)] + list(errors)

            if args.verbose:
                print("CI mode")

            send_sms_messages(
                os.environ['TWILIO_ACCOUNT_SID'],
                os.environ['TWILIO_AUTH_TOKEN'],
                os.environ['TWILIO_FROM_NUMBER'],
                os.environ['TWILIO_TO_NUMBER'],
                notices,
                args.verbose,
            )
            send_slack_messages(
                os.environ['SLACK_WEBHOOK'],
                notices,
                args.verbose,
            )

    sys.exit(rc)