-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfirmware_update.py
More file actions
482 lines (380 loc) · 19.2 KB
/
firmware_update.py
File metadata and controls
482 lines (380 loc) · 19.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
#!/usr/bin/env python3
"""
Firmware Update Automation Script for IBM Servers
Automates the firmware update process for Everest and Rainier systems
"""
import getpass
import os
import re
import shlex
import sys
import tempfile
import time
from datetime import datetime

import paramiko
class FirmwareUpdater:
    """Drive an interactive firmware update of an Everest or Rainier system.

    The workflow (see ``run``) prompts for credentials and targets, opens an
    SSH session to the gateway host, copies the build's image tarball into
    ``/tmp/images/`` on the target machine and then requests activation of
    the image through ``busctl``.  Every step is logged to the console and
    to a timestamped log file in the current working directory.
    """

    # A firmware image ID must be a whole directory entry made of at least
    # eight lowercase hex digits (not a hex run inside some other file name).
    _IMAGE_ID_RE = re.compile(r'^[a-f0-9]{8,}$')

    def __init__(self):
        """Initialise all inputs and connections to ``None`` and pick a log file name."""
        # Credentials and targets; populated by collect_inputs().
        self.gsa_id = None
        self.gsa_password = None
        self.luci_password = None
        self.build_path = None
        self.system_type = None
        self.machine_address = None
        self.lab_password = None
        # paramiko.SSHClient instances; populated by the connect_* methods.
        self.gateway_client = None
        self.machine_client = None
        # Exit status reported for the most recent execute_command() call;
        # None when the command could not be run, -1 when the server did not
        # report a status.
        self.last_exit_status = None
        self.log_file = f"firmware_update_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _redact(text, secrets):
        """Return *text* with every non-empty string in *secrets* masked as ``***``.

        Both the raw secret and its ``shlex.quote`` form are masked, because
        commands embed the quoted form while remote output may echo the raw one.
        """
        if not text or not secrets:
            return text
        for secret in secrets:
            if not secret:
                continue
            text = text.replace(shlex.quote(secret), '***').replace(secret, '***')
        return text

    @classmethod
    def _find_image_id(cls, listing):
        """Return the first entry of an ``ls`` listing that is an image ID, else ``None``."""
        for line in (listing or '').splitlines():
            entry = line.strip()
            if cls._IMAGE_ID_RE.match(entry):
                return entry
        return None

    @staticmethod
    def _is_access_denied(error):
        """Tell whether stderr text reports an access/permission denial."""
        lowered = (error or '').lower()
        return 'access denied' in lowered or 'permission denied' in lowered

    def _command_failed(self):
        """Tell whether the last executed command reported a non-zero exit status.

        An unknown status (``None`` or ``-1``) is not treated as a failure so
        that callers can still fall back to inspecting stderr.
        """
        status = self.last_exit_status
        return status is not None and status > 0

    def _build_file_name(self):
        """Return the image tarball name for the selected system type."""
        if self.system_type == 'rainier':
            return 'obmc-phosphor-image-rainier.ext4.mmc.tar'
        # Anything else is treated as Everest, as collect_inputs() only
        # accepts 'everest' or 'rainier'.
        return 'obmc-phosphor-image-everest.ext4.mmc.tar'

    def _progress_logger(self, label):
        """Build an SFTP progress callback that logs once per 10% step.

        Args:
            label: Word placed in front of "progress" in the log line
                (for example ``"Download"`` or ``"Upload"``).

        Returns:
            A ``callback(transferred, total)`` function.
        """
        last_step = 0

        def report(transferred, total):
            nonlocal last_step
            if not total:
                # Nothing to measure; also avoids dividing by zero.
                return
            percent = (transferred / total) * 100
            step = int(percent / 10)
            # Only log when a new 10% step is reached to avoid spam.
            if step > last_step:
                mb_transferred = transferred / (1024 * 1024)
                mb_total = total / (1024 * 1024)
                self.log(
                    f"{label} progress: {percent:.1f}% "
                    f"({mb_transferred:.2f}/{mb_total:.2f} MB)"
                )
                last_step = step

        return report

    # ------------------------------------------------------------------
    # Logging and input
    # ------------------------------------------------------------------
    def log(self, message, level="INFO"):
        """Write a timestamped message to both the console and the log file.

        Args:
            message: Text to record.
            level: Label placed in the log line (``INFO``, ``WARNING``, ...).
        """
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        log_message = f"[{timestamp}] [{level}] {message}"
        print(log_message)
        # Explicit UTF-8 so remote output with non-ASCII characters cannot
        # fail to encode under a restrictive locale.
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_message + '\n')

    def collect_inputs(self):
        """Prompt the user for every value the update needs.

        Returns:
            ``True`` when all inputs were collected, ``False`` when the user
            interrupted the prompts or reading input failed.
        """
        self.log("=== Firmware Update Automation Script ===")
        self.log("Collecting user inputs...")
        try:
            self.gsa_id = input("Enter GSA ID: ").strip()
            self.gsa_password = getpass.getpass("Enter GSA Password: ")

            # LUCI password, with a hint that it usually matches the GSA one.
            print("\nInfo: LUCI password is mostly same as GSA password")
            use_same = input("Use same password as GSA password? (y/n): ").strip().lower()
            if use_same == 'y':
                self.luci_password = self.gsa_password
            else:
                self.luci_password = getpass.getpass("Enter LUCI Password: ")

            self.build_path = input("\nEnter Build Path: ").strip()

            # Keep asking until a supported system type is entered; the value
            # is stored lower-cased.
            while True:
                self.system_type = input("Enter System Type (Everest/Rainier): ").strip().lower()
                if self.system_type in ['everest', 'rainier']:
                    break
                print("Invalid system type. Please enter 'Everest' or 'Rainier'")

            self.machine_address = input("Enter Machine Address: ").strip()
            self.lab_password = getpass.getpass("Enter Lab Password: ")

            self.log("All inputs collected successfully")
            return True
        except KeyboardInterrupt:
            self.log("\nInput collection cancelled by user", "ERROR")
            return False
        except Exception as e:
            self.log(f"Error collecting inputs: {str(e)}", "ERROR")
            return False

    # ------------------------------------------------------------------
    # Remote execution
    # ------------------------------------------------------------------
    def execute_command(self, client, command, wait_for_prompt=True, timeout=30, redact=None):
        """Run *command* on an SSH client and return its decoded output.

        Args:
            client: Connected ``paramiko.SSHClient``.
            command: Shell command line to execute.
            wait_for_prompt: Unused; kept so existing callers keep working.
            timeout: Channel timeout in seconds passed to ``exec_command``.
            redact: Optional iterable of secret strings that must be masked
                wherever the command or its stderr is written to the log.

        Returns:
            ``(stdout_text, stderr_text)``; ``(None, error_message)`` when the
            command could not be executed.  The exit status is stored in
            ``self.last_exit_status``.
        """
        self.last_exit_status = None
        shown = self._redact(command, redact)
        try:
            self.log(f"Executing: {shown}")
            stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
            output = stdout.read().decode('utf-8', errors='replace')
            error = stderr.read().decode('utf-8', errors='replace')
            # Both streams are drained, so waiting for the status cannot stall
            # on a full channel buffer.
            self.last_exit_status = stdout.channel.recv_exit_status()
            if error:
                self.log(f"Command stderr: {self._redact(error, redact)}", "WARNING")
            return output, error
        except Exception as e:
            self.log(
                f"Error executing command '{shown}': {self._redact(str(e), redact)}",
                "ERROR",
            )
            return None, str(e)

    def connect_to_gateway(self):
        """Open the SSH connection to the gateway server.

        Returns:
            ``True`` on success, ``False`` when the connection failed.
        """
        try:
            self.log("Connecting to gateway server gfwr701.rchland.ibm.com...")
            self.gateway_client = paramiko.SSHClient()
            # NOTE(review): AutoAddPolicy accepts unknown host keys without
            # verification; confirm this is acceptable for the lab network.
            self.gateway_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.gateway_client.connect(
                hostname='gfwr701.rchland.ibm.com',
                username=self.gsa_id,
                password=self.gsa_password,
                timeout=30
            )
            self.log("Successfully connected to gateway server")
            return True
        except Exception as e:
            self.log(f"Failed to connect to gateway: {str(e)}", "ERROR")
            return False

    def setup_gateway_session(self):
        """Start an interactive shell on the gateway and request a Kerberos token.

        Sends ``bash``, then ``klog -c rchland`` followed by the LUCI password,
        waiting a fixed time between steps, and logs whatever the shell printed.

        Returns:
            ``True`` when the sequence was sent, ``False`` on any error.  The
            shell output is logged but not inspected, so ``True`` does not
            prove that a token was granted.
        """
        try:
            self.log("Setting up gateway session...")
            if self.gateway_client is None:
                raise Exception("Gateway client not initialized")
            # NOTE(review): the shell is left open for the lifetime of the
            # gateway connection; whether the token obtained here applies to
            # later exec_command channels should be confirmed on the gateway.
            shell = self.gateway_client.invoke_shell()
            time.sleep(1)

            self.log("Switching to bash shell...")
            shell.send(b'bash\n')
            time.sleep(1)

            self.log("Getting Kerberos token...")
            shell.send(b'klog -c rchland\n')
            time.sleep(2)

            shell.send(f'{self.luci_password}\n'.encode('utf-8'))
            time.sleep(2)

            output = shell.recv(4096).decode('utf-8', errors='replace')
            # The password is sent on a timer; if klog was not prompting, the
            # terminal may have echoed it, so mask it before logging.
            output = self._redact(output, [self.luci_password])
            self.log(f"Gateway setup output: {output}")
            self.log("Gateway session setup complete")
            return True
        except Exception as e:
            self.log(f"Failed to setup gateway session: {str(e)}", "ERROR")
            return False

    # ------------------------------------------------------------------
    # File transfer
    # ------------------------------------------------------------------
    def transfer_firmware(self):
        """Copy the firmware tarball from the gateway to the target machine.

        Runs ``sshpass`` + ``scp`` on the gateway.  When that is unavailable,
        fails, or reports an error, ``transfer_firmware_alternative`` is used
        instead.

        Returns:
            ``True`` when one of the transfer methods succeeded, else ``False``.
        """
        try:
            build_file = self._build_file_name()
            self.log(f"Using build file: {build_file}")
            full_path = f'{self.build_path}/{build_file}'
            self.log(f"Source file: {full_path}")

            self.log(f"Starting file transfer to {self.machine_address}...")
            self.log("This may take several minutes depending on file size...")

            # Every user-supplied value is shell-quoted so that spaces, quotes
            # or '$' in a password or path cannot break or alter the command.
            destination = f'service@{self.machine_address}:/tmp/images/'
            sshpass_command = (
                f"sshpass -p {shlex.quote(self.lab_password)} "
                f"scp -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null "
                f"{shlex.quote(full_path)} {shlex.quote(destination)}"
            )
            secrets = [self.lab_password]

            self.log("Attempting file transfer using sshpass...")
            output, error = self.execute_command(
                self.gateway_client, sshpass_command, timeout=600, redact=secrets
            )
            lowered = (error or '').lower()

            if error and ('sshpass: command not found' in error or 'not found' in error):
                self.log("sshpass not available, trying alternative method...", "WARNING")
                return self.transfer_firmware_alternative()
            if error and 'permission denied' in lowered:
                self.log("File transfer failed - permission denied", "ERROR")
                self.log("Trying alternative transfer method...", "INFO")
                return self.transfer_firmware_alternative()
            # The command could not be run at all, stderr mentions an error, or
            # scp exited non-zero (for example: source file missing).
            if output is None or 'error' in lowered or self._command_failed():
                self.log(f"Transfer error: {self._redact(error, secrets)}", "WARNING")
                self.log("Trying alternative transfer method...", "INFO")
                return self.transfer_firmware_alternative()

            self.log("File transfer completed successfully")
            return True
        except Exception as e:
            self.log(f"Failed to transfer firmware: {str(e)}", "ERROR")
            self.log("Trying alternative transfer method...", "INFO")
            return self.transfer_firmware_alternative()

    def transfer_firmware_alternative(self):
        """Relay the firmware tarball through this host using SFTP.

        Downloads the file from the gateway into a local temporary file, then
        uploads it to ``/tmp/images/`` on the target over a dedicated SSH
        connection.  SFTP/SSH handles opened here and the temporary file are
        released whether or not the transfer succeeds.

        Returns:
            ``True`` on success, ``False`` on any failure.
        """
        sftp_gateway = None
        sftp_target = None
        target_client = None
        local_temp_path = None
        try:
            self.log("Using alternative transfer method (SFTP through gateway)...")
            build_file = self._build_file_name()
            source_path = f'{self.build_path}/{build_file}'

            # Step 1: gateway -> local temporary file.
            self.log("Step 1: Downloading file from gateway...")
            if self.gateway_client is None:
                raise Exception("Gateway client not initialized")
            sftp_gateway = self.gateway_client.open_sftp()

            # Unique temp file in the platform's temp directory instead of a
            # fixed, predictable /tmp path.
            fd, local_temp_path = tempfile.mkstemp(prefix='firmware_', suffix=f'_{build_file}')
            os.close(fd)

            file_attrs = sftp_gateway.stat(source_path)
            file_size = file_attrs.st_size if file_attrs.st_size else 0
            file_size_mb = file_size / (1024 * 1024)
            self.log(f"Downloading {source_path} to {local_temp_path}...")
            self.log(f"File size: {file_size_mb:.2f} MB")

            sftp_gateway.get(
                source_path, local_temp_path, callback=self._progress_logger("Download")
            )
            self.log("Download from gateway completed (100%)")

            # Step 2: local temporary file -> target machine.
            self.log("Step 2: Uploading file to target machine...")
            if self.machine_address is None or self.lab_password is None:
                raise Exception("Machine address or password not set")

            target_client = paramiko.SSHClient()
            target_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            target_client.connect(
                hostname=self.machine_address,
                username='service',
                password=self.lab_password,
                timeout=30
            )
            sftp_target = target_client.open_sftp()

            # Ensure /tmp/images exists on the target.
            try:
                sftp_target.stat('/tmp/images')
            except FileNotFoundError:
                self.log("Creating /tmp/images directory on target...")
                sftp_target.mkdir('/tmp/images')

            target_path = f'/tmp/images/{build_file}'
            local_file_size = os.path.getsize(local_temp_path)
            local_file_size_mb = local_file_size / (1024 * 1024)
            self.log(f"Uploading {local_temp_path} to {target_path}...")
            self.log(f"File size: {local_file_size_mb:.2f} MB")

            sftp_target.put(
                local_temp_path, target_path, callback=self._progress_logger("Upload")
            )
            self.log("Upload to target machine completed (100%)")

            self.log("Alternative file transfer completed successfully")
            return True
        except Exception as e:
            self.log(f"Alternative transfer method failed: {str(e)}", "ERROR")
            return False
        finally:
            # Release handles opened by this method even when a step failed.
            for handle in (sftp_gateway, sftp_target, target_client):
                if handle is None:
                    continue
                try:
                    handle.close()
                except Exception as close_error:
                    self.log(f"Error closing transfer connection: {close_error}", "WARNING")
            if local_temp_path and os.path.exists(local_temp_path):
                try:
                    os.remove(local_temp_path)
                    self.log("Cleaned up temporary file")
                except OSError as remove_error:
                    self.log(f"Could not remove temporary file: {remove_error}", "WARNING")

    # ------------------------------------------------------------------
    # Target machine
    # ------------------------------------------------------------------
    def connect_to_machine(self):
        """Open the SSH connection to the target machine as user ``service``.

        Returns:
            ``True`` on success, ``False`` when inputs are missing or the
            connection failed.
        """
        try:
            if self.machine_address is None:
                raise Exception("Machine address not set")
            if self.lab_password is None:
                raise Exception("Lab password not set")
            self.log(f"Connecting to target machine {self.machine_address}...")
            self.machine_client = paramiko.SSHClient()
            self.machine_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.machine_client.connect(
                hostname=self.machine_address,
                username='service',
                password=self.lab_password,
                timeout=30
            )
            self.log("Successfully connected to target machine")
            return True
        except Exception as e:
            self.log(f"Failed to connect to target machine: {str(e)}", "ERROR")
            return False

    def activate_firmware(self):
        """Request activation of the transferred image on the target machine.

        Creates ``/tmp/ignore-machine-name``, looks in ``/tmp/images`` for an
        entry that is an image ID, and sets the ``RequestedActivation``
        property for that ID with ``busctl`` (first through ``sudo``, then
        directly if the sudo attempt fails).

        Returns:
            ``True`` when the activation command ran without a reported
            failure, ``False`` otherwise.
        """
        try:
            self.log("Preparing firmware activation...")
            # Each exec_command runs in its own channel, so a separate
            # "cd /tmp/images" would not affect later commands; absolute
            # paths are used instead.
            self.log("Creating ignore-machine-name file...")
            self.execute_command(self.machine_client, 'touch /tmp/ignore-machine-name')

            self.log("Searching for firmware hash...")
            output, error = self.execute_command(self.machine_client, 'ls /tmp/images')
            if not output:
                self.log("Failed to list files in /tmp/images", "ERROR")
                return False
            self.log(f"Directory contents:\n{output}")

            hash_number = self._find_image_id(output)
            if hash_number is None:
                self.log("No firmware hash found in /tmp/images", "ERROR")
                self.log("Please verify the firmware file was transferred correctly", "ERROR")
                return False
            self.log(f"Found firmware hash: {hash_number}")

            activation_command = (
                f'busctl set-property xyz.openbmc_project.Software.BMC.Updater '
                f'/xyz/openbmc_project/software/{hash_number} '
                f'xyz.openbmc_project.Software.Activation RequestedActivation s '
                f'xyz.openbmc_project.Software.Activation.RequestedActivations.Active'
            )

            # First attempt: through sudo, feeding the password on stdin.  The
            # password is shell-quoted and masked in the log.
            self.log("Activating firmware (attempting with sudo)...")
            sudo_command = (
                f"printf '%s\\n' {shlex.quote(self.lab_password)} | "
                f"sudo -S {activation_command}"
            )
            output, error = self.execute_command(
                self.machine_client, sudo_command, timeout=60,
                redact=[self.lab_password],
            )

            sudo_failed = (
                output is None
                or self._is_access_denied(error)
                or self._command_failed()
            )
            if sudo_failed:
                self.log("Sudo attempt failed, trying without sudo...", "WARNING")
                output, error = self.execute_command(
                    self.machine_client, activation_command, timeout=60
                )

            # Evaluate the result of whichever attempt ran last.
            if self._is_access_denied(error):
                self.log(f"Firmware activation failed - Access denied: {error}", "ERROR")
                self.log("The service account may not have sufficient privileges.", "ERROR")
                self.log("You may need to run the busctl command manually with root privileges.", "ERROR")
                return False
            if output is None or self._command_failed():
                self.log(
                    f"Firmware activation command failed "
                    f"(exit status {self.last_exit_status}): {error}",
                    "ERROR",
                )
                return False
            if error and 'error' in error.lower():
                # The command exited cleanly; stderr noise alone is not fatal.
                self.log(f"Firmware activation may have encountered an issue: {error}", "WARNING")
            else:
                self.log("Firmware activation command executed successfully")
            return True
        except Exception as e:
            self.log(f"Failed to activate firmware: {str(e)}", "ERROR")
            return False

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def cleanup(self):
        """Close the machine and gateway connections if they were opened."""
        self.log("Cleaning up connections...")
        if self.machine_client:
            try:
                self.machine_client.close()
                self.log("Machine connection closed")
            except Exception as e:
                # Keep going so the gateway connection is still closed.
                self.log(f"Error closing machine connection: {str(e)}", "WARNING")
        if self.gateway_client:
            try:
                self.gateway_client.close()
                self.log("Gateway connection closed")
            except Exception as e:
                self.log(f"Error closing gateway connection: {str(e)}", "WARNING")

    def run(self):
        """Execute the whole update, stopping at the first failing step.

        Returns:
            ``True`` when every step succeeded, ``False`` otherwise.
            Connections are always closed before returning.
        """
        try:
            steps = (
                self.collect_inputs,
                self.connect_to_gateway,
                self.setup_gateway_session,
                self.transfer_firmware,
                self.connect_to_machine,
                self.activate_firmware,
            )
            for step in steps:
                if not step():
                    return False
            self.log("=== Firmware update process completed successfully ===", "SUCCESS")
            self.log(f"Log file saved to: {self.log_file}")
            return True
        except Exception as e:
            self.log(f"Unexpected error during firmware update: {str(e)}", "ERROR")
            return False
        finally:
            self.cleanup()
def main():
    """Print the banner, run one firmware update and exit.

    The process exit code is 0 when the update succeeded and 1 otherwise.
    """
    rule = "=" * 60
    print(rule)
    print("IBM Server Firmware Update Automation Script")
    print(rule)
    print()

    succeeded = FirmwareUpdater().run()
    if not succeeded:
        print("\n✗ Firmware update failed. Check the log file for details.")
        sys.exit(1)

    print("\n✓ Firmware update completed successfully!")
    sys.exit(0)


if __name__ == "__main__":
    main()