These are the commits from #181 and #182, backported for RHEL7. The differences are minor but _just barely_ enough to make me want another quick review.
The differences from the above commits are:
- 231ac70: also reverts a66cb15
- 99c97dd: dnfpayload vs. yumpayload; the fix is the same, it's just in a different file.
- d844db8: resolve some parse-kickstart conflicts - `commands.network.RHEL7_Network` vs. `commands.network.F22_Network`, etc.
- faf2432: resolve conflict in dracut/module-setup.sh: RHEL7 doesn't have `PYTHONHASHSEED=42`
- 679448a: RHEL7 handles chronyd and ntpd; rather than checking to see if the files exist, just try to modify them - the worst that can happen (AFAICT) is a warning in the log. NBD.
Added label: rhel7-branch.
From: Will Woods <wwoods@redhat.com>
addDriverRepos needs to happen before gatherRepoMetadata, otherwise we don't gather its metadata. --- pyanaconda/packaging/__init__.py | 1 + pyanaconda/packaging/yumpayload.py | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-)
diff --git a/pyanaconda/packaging/__init__.py b/pyanaconda/packaging/__init__.py index 6cc44d2..bf64709 100644 --- a/pyanaconda/packaging/__init__.py +++ b/pyanaconda/packaging/__init__.py @@ -1254,6 +1254,7 @@ def _runThread(self, storage, ksdata, payload, instClass, fallback, checkmount): # Download package metadata try: payload.updateBaseRepo(fallback=fallback, checkmount=checkmount) + payload.addDriverRepos() except (OSError, PayloadError) as e: log.error("PayloadError: %s", e) self._error = self.ERROR_SETUP diff --git a/pyanaconda/packaging/yumpayload.py b/pyanaconda/packaging/yumpayload.py index 12b48f8..4c2a837 100644 --- a/pyanaconda/packaging/yumpayload.py +++ b/pyanaconda/packaging/yumpayload.py @@ -1405,8 +1405,6 @@ def preInstall(self, packages=None, groups=None): self.requiredPackages = packages self.requiredGroups = groups
- self.addDriverRepos() - if self.install_device: self._setupMedia(self.install_device)
From: Will Woods <wwoods@redhat.com>
No substantive code changes here. Just some comments, some pylint directives, and an abstract mixin class so the extra arguments to dracut_args() don't look unused. --- dracut/parse-kickstart | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-)
diff --git a/dracut/parse-kickstart b/dracut/parse-kickstart index 08a26f8..94b4069 100755 --- a/dracut/parse-kickstart +++ b/dracut/parse-kickstart @@ -34,7 +34,7 @@ from pykickstart.parser import KickstartParser, preprocessKickstart from pykickstart.sections import NullSection from pykickstart.version import returnClassForVersion, RHEL7 from pykickstart.errors import KickstartError -# pylint: disable=wildcard-import +# pylint: disable=wildcard-import,unused-wildcard-import from pykickstart.constants import * from pykickstart import commands from collections import OrderedDict @@ -62,7 +62,7 @@ def read_cmdline(f): # pylint: disable=redefined-outer-name for line in lines: for arg in line.split(): - k,_e,v = arg.partition("=") + k,_,v = arg.partition("=") args[k] = v return args
@@ -83,20 +83,25 @@ def setting_only_hostname(net, args):
proc_cmdline = read_cmdline("/proc/cmdline")
+class DracutArgsMixin(object): + """A mixin class to make a Command generate dracut args.""" + def dracut_args(self, args, lineno, obj): + raise NotImplementedError + # Here are the kickstart commands we care about:
-class Cdrom(commands.cdrom.FC3_Cdrom): +class Cdrom(commands.cdrom.FC3_Cdrom, DracutArgsMixin): def dracut_args(self, args, lineno, obj): return "inst.repo=cdrom"
-class HardDrive(commands.harddrive.FC3_HardDrive): +class HardDrive(commands.harddrive.FC3_HardDrive, DracutArgsMixin): def dracut_args(self, args, lineno, obj): if self.biospart: return "inst.repo=bd:%s:%s" % (self.partition, self.dir) else: return "inst.repo=hd:%s:%s" % (self.partition, self.dir)
-class NFS(commands.nfs.FC6_NFS): +class NFS(commands.nfs.FC6_NFS, DracutArgsMixin): def dracut_args(self, args, lineno, obj): if self.opts: method = "nfs:%s:%s:%s" % (self.opts, self.server, self.dir) @@ -107,7 +112,7 @@ class NFS(commands.nfs.FC6_NFS): method = method.replace(" ", "\ ") return "inst.repo=%s" % method
-class URL(commands.url.F18_Url): +class URL(commands.url.F18_Url, DracutArgsMixin): def dracut_args(self, args, lineno, obj): # Spaces in the url need to be %20 if self.url: @@ -124,19 +129,19 @@ class URL(commands.url.F18_Url):
return "\n".join(args)
-class Updates(commands.updates.F7_Updates): +class Updates(commands.updates.F7_Updates, DracutArgsMixin): def dracut_args(self, args, lineno, obj): if self.url == "floppy": return "live.updates=/dev/fd0" elif self.url: return "live.updates=%s" % self.url
-class MediaCheck(commands.mediacheck.FC4_MediaCheck): +class MediaCheck(commands.mediacheck.FC4_MediaCheck, DracutArgsMixin): def dracut_args(self, args, lineno, obj): if self.mediacheck: return "rd.live.check"
-class DriverDisk(commands.driverdisk.F14_DriverDisk): +class DriverDisk(commands.driverdisk.F14_DriverDisk, DracutArgsMixin): def dracut_args(self, args, lineno, obj): dd_net = [] dd_disk = [] @@ -156,7 +161,7 @@ class DriverDisk(commands.driverdisk.F14_DriverDisk): # are processed later. return "\n".join(dd_net)
-class Network(commands.network.RHEL7_Network): +class Network(commands.network.RHEL7_Network, DracutArgsMixin): def dracut_args(self, args, lineno, net): ''' NOTE: The first 'network' line get special treatment: @@ -200,7 +205,7 @@ class Network(commands.network.RHEL7_Network):
return netline
-class DisplayMode(commands.displaymode.FC3_DisplayMode): +class DisplayMode(commands.displaymode.FC3_DisplayMode, DracutArgsMixin): def dracut_args(self, args, lineno, obj): if self.displayMode == DISPLAY_MODE_CMDLINE: return "inst.cmdline" @@ -209,12 +214,12 @@ class DisplayMode(commands.displaymode.FC3_DisplayMode): elif self.displayMode == DISPLAY_MODE_GRAPHICAL: return "inst.graphical"
-class Bootloader(commands.bootloader.RHEL7_Bootloader): +class Bootloader(commands.bootloader.RHEL7_Bootloader, DracutArgsMixin): def dracut_args(self, args, lineno, obj): if self.extlinux: return "extlinux"
-# TODO: keymap, lang... device? selinux? +# FUTURE: keymap, lang... device? selinux?
dracutCmds = { 'cdrom': Cdrom, @@ -354,7 +359,7 @@ def ksnet_to_dracut(args, lineno, net, bootdev=False): open("/tmp/net.ifaces", "a")
if net.essid or net.wepkey or net.wpakey: - # TODO: make dracut support wireless? (do we care?) + # NOTE: does dracut actually support wireless? (do we care?) log.error("'%s': dracut doesn't support wireless networks", " ".join(args))
@@ -434,7 +439,7 @@ def ksnet_to_ifcfg(net, filename=None): if net.nodefroute: ifcfg['DEFROUTE'] = "no"
- # TODO: dhcpclass, ethtool, essid/wepkey/wpakay, etc. + # FUTURE: dhcpclass, ethtool, essid/wepkey/wpakay, etc.
if net.bootProto == 'dhcp': srcpath = "/tmp/dhclient.%s.lease" % dev @@ -502,7 +507,7 @@ def ksnet_to_ifcfg(net, filename=None):
options = {} for opt in net.bridgeopts.split(","): - key, _sep, value = opt.partition("=") + key, _, value = opt.partition("=") if not value: log.error("Invalid bridge option %s", opt) continue
From: Will Woods <wwoods@redhat.com>
The fancy handling here is only needed because of the truly messed-up way we were handling "inst.dd" compared to every other boot arg.
Fixing driverdisk handling makes this unnecessary. --- dracut/parse-kickstart | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-)
diff --git a/dracut/parse-kickstart b/dracut/parse-kickstart index 94b4069..a4f3666 100755 --- a/dracut/parse-kickstart +++ b/dracut/parse-kickstart @@ -143,23 +143,16 @@ class MediaCheck(commands.mediacheck.FC4_MediaCheck, DracutArgsMixin):
class DriverDisk(commands.driverdisk.F14_DriverDisk, DracutArgsMixin): def dracut_args(self, args, lineno, obj): - dd_net = [] - dd_disk = [] + dd_args = [] for dd in self.driverdiskList: if dd.partition: - dd_disk.append(dd.partition) + dd_args.append("inst.dd=hd:%s" % dd.partition) elif dd.source: - dd_net.append("inst.dd=%s" % dd.source) + dd_args.append("inst.dd=%s" % dd.source) + elif dd.biospart: + dd_args.append("inst.dd=bd:%s" % dd.biospart)
- # disk sources cannot be added to cmdline because the initial - # driver-update run has already finished. - if dd_disk: - with open("/tmp/dd_args_ks", "w") as f: - f.write(" ".join(dd_disk)) - - # network sources can be added to the existing cmdline, they - # are processed later. - return "\n".join(dd_net) + return "\n".join(dd_args)
class Network(commands.network.RHEL7_Network, DracutArgsMixin): def dracut_args(self, args, lineno, net):
From: Will Woods <wwoods@redhat.com>
This commit adds a new, rewritten `driver_updates.py`, along with some driverdisk tests.
It also adds `tests/lib/mkdud.py`, which is a helper tool for making driver disk images. --- dracut/driver_updates.py | 576 ++++++++++++++++++++++++++++ dracut/test_driver_updates.py | 624 +++++++++++++++++++++++++++++++ tests/kickstart_tests/driverdisk-disk.ks | 41 ++ tests/kickstart_tests/driverdisk-disk.sh | 33 ++ tests/lib/mkdud.py | 120 ++++++ 5 files changed, 1394 insertions(+) create mode 100755 dracut/driver_updates.py create mode 100644 dracut/test_driver_updates.py create mode 100644 tests/kickstart_tests/driverdisk-disk.ks create mode 100755 tests/kickstart_tests/driverdisk-disk.sh create mode 100755 tests/lib/mkdud.py
diff --git a/dracut/driver_updates.py b/dracut/driver_updates.py new file mode 100755 index 0000000..4c331d5 --- /dev/null +++ b/dracut/driver_updates.py @@ -0,0 +1,576 @@ +#!/usr/bin/python +# +# Copyright (C) 2015 by Red Hat, Inc. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. +# +# Author(s): +# Brian C. Lane bcl@brianlane.com +# Will Woods wwoods@redhat.com +# +""" +Driver Update Disk handler program. + +This will be called once for each requested driverdisk (non-interactive), and +once for interactive mode (if requested). + +Usage is one of: + + driver-updates --disk DISKSTR DEVNODE + + DISKSTR is the string passed by the user ('/dev/sda3', 'LABEL=DD', etc.) + DEVNODE is the actual device node (/dev/sda3, /dev/sr0, etc.) + + driver-updates --net URL LOCALFILE + + URL is the string passed by the user ('http://.../something.iso') + LOCALFILE is the location of the downloaded file + + driver-updates --interactive + + The user will be presented with a menu where they can choose a disk + and pick individual drivers to install. + +/tmp/dd_net contains the list of URLs given by the user. +/tmp/dd_disk contains the list of disk devices given by the user. +/tmp/dd_interactive contains "menu" if interactive mode was requested. 
+ +/tmp/dd.done should be created when all the user-requested stuff above has been +handled; the installer won't start up until this file is created. + +Repositories for installed drivers are copied into /run/install/DD-X where X +starts at 1 and increments for each repository. + +Selected driver package names are saved in /run/install/dd_packages. + +Anaconda uses the repository and package list to install the same set of drivers +to the target system. +""" + +import logging +import sys +import os +import subprocess +import fnmatch +import readline # pylint:disable=unused-import + +from contextlib import contextmanager +from logging.handlers import SysLogHandler + +# py2 compat +try: + from subprocess import DEVNULL +except ImportError: + DEVNULL = open("/dev/null", 'a+') +try: + _input = raw_input # pylint: disable=undefined-variable +except NameError: + _input = input + +log = logging.getLogger("DD") + +arch = os.uname()[4] +kernelver = os.uname()[2] + +def mkdir_seq(stem): + """ + Create sequentially-numbered directories starting with stem. + + For example, mkdir_seq("/tmp/DD-") would create "/tmp/DD-1"; + if that already exists, try "/tmp/DD-2", "/tmp/DD-3", and so on, + until a directory is created. + + Returns the newly-created directory name. + """ + n = 1 + while True: + dirname = str(stem) + str(n) + try: + os.makedirs(dirname) + except OSError as e: + if e.errno != 17: raise + n += 1 + else: + return dirname + +def find_repos(mnt): + """find any valid driverdisk repos that exist under mnt.""" + dd_repos = [] + for root, dirs, files in os.walk(mnt, followlinks=True): + repo = root+"/rpms/"+arch + if "rhdd3" in files and "rpms" in dirs and os.path.isdir(repo): + log.debug("found repo: %s", repo) + dd_repos.append(repo) + return dd_repos + +# NOTE: it's unclear whether or not we're supposed to recurse subdirs looking +# for .iso files, but that seems like a bad idea if you mount some huge disk.. 
+# So I've made a judgement call: we only load .iso files from the toplevel. +def find_isos(mnt): + """find files named '.iso' at the top level of mnt.""" + return [mnt+'/'+f for f in os.listdir(mnt) if f.lower().endswith('.iso')] + +class Driver(object): + """Represents a single driver (rpm), as listed by dd_list""" + def __init__(self, source="", name="", flags="", description="", repo=""): + self.source = source + self.name = name + self.flags = flags + self.description = description + self.repo = repo + +def dd_list(dd_path, anaconda_ver=None, kernel_ver=None): + log.debug("dd_list: listing %s", dd_path) + if not anaconda_ver: + anaconda_ver = '19.0' + if not kernel_ver: + kernel_ver = kernelver + cmd = ["dd_list", '-d', dd_path, '-k', kernel_ver, '-a', anaconda_ver] + out = subprocess.check_output(cmd, stderr=DEVNULL) + out = out.decode('utf-8') + drivers = [Driver(*d.split('\n',3)) for d in out.split('\n---\n') if d] + log.debug("dd_list: found drivers: %s", ' '.join(d.name for d in drivers)) + for d in drivers: d.repo = dd_path + return drivers + +def dd_extract(rpm_path, outdir, kernel_ver=None, flags='-blmf'): + log.debug("dd_extract: extracting %s", rpm_path) + if not kernel_ver: + kernel_ver = kernelver + cmd = ["dd_extract", flags, '-r', rpm_path, '-d', outdir, '-k', kernel_ver] + subprocess.check_output(cmd, stderr=DEVNULL) # discard stdout + +def mount(dev, mnt=None): + """Mount the given dev at the mountpoint given by mnt.""" + # NOTE: dev may be a filesystem image - "-o loop" is not necessary anymore + if not mnt: + mnt = mkdir_seq("/media/DD-") + cmd = ["mount", dev, mnt] + log.debug("mounting %s at %s", dev, mnt) + subprocess.check_call(cmd) + return mnt + +def umount(mnt): + log.debug("unmounting %s", mnt) + subprocess.call(["umount", mnt]) + +@contextmanager +def mounted(dev, mnt=None): + mnt = mount(dev, mnt) + try: + yield mnt + finally: + umount(mnt) + +module_updates_dir = '/lib/modules/%s/updates' % os.uname()[2] +firmware_updates_dir = 
'/lib/firmware/updates' + +def iter_files(topdir, pattern=None): + """iterator; yields full paths to files under topdir that match pattern.""" + for head, _, files in os.walk(topdir): + for f in files: + if pattern is None or fnmatch.fnmatch(f, pattern): + yield os.path.join(head, f) + +def ensure_dir(d): + """make sure the given directory exists.""" + subprocess.check_call(["mkdir", "-p", d]) + +def move_files(files, destdir): + """move files into destdir (iff they're not already under destdir)""" + ensure_dir(destdir) + for f in files: + if f.startswith(destdir): + continue + subprocess.call(["mv", "-f", f, destdir]) + +def copy_files(files, destdir): + """copy files into destdir (iff they're not already under destdir)""" + ensure_dir(destdir) + for f in files: + if f.startswith(destdir): + continue + subprocess.call(["cp", "-a", f, destdir]) + +def append_line(filename, line): + """simple helper to append a line to a file""" + if not line.endswith("\n"): + line += "\n" + with open(filename, 'a') as outf: + outf.write(line) + +def read_lines(filename): + try: + return open(filename).read().splitlines() + except IOError: + return [] + +def save_repo(repo, target="/run/install"): + """copy a repo to the place where the installer will look for it later.""" + newdir = mkdir_seq(os.path.join(target, "DD-")) + log.debug("save_repo: copying %s to %s", repo, newdir) + subprocess.call(["cp", "-arT", repo, newdir]) + return newdir + +def extract_drivers(drivers=None, repos=None, outdir="/updates", + pkglist="/run/install/dd_packages"): + """ + Extract drivers - either a user-selected driver list or full repos. + + drivers should be a list of Drivers to extract, or None. + repos should be a list of repo paths to extract, or None. + (If both are empty, nothing happens..) + + If any packages containing modules or firmware are extracted, also: + * call save_repo for that package's repo + * write the package name(s) to pkglist. 
+ + Returns True if any package containing modules was extracted. + """ + if not drivers: + drivers = [] + if repos: + drivers += [d for repo in repos for d in dd_list(repo)] + + save_repos = set() + new_drivers = False + + ensure_dir(outdir) + + for driver in drivers: + log.info("Extracting: %s", driver.name) + dd_extract(driver.source, outdir) + # Make sure we install modules/firmware into the target system + if 'modules' in driver.flags or 'firmwares' in driver.flags: + append_line(pkglist, driver.name) + save_repos.add(driver.repo) + new_drivers = True + + # save the repos containing those packages + for repo in save_repos: + save_repo(repo) + + return new_drivers + +def grab_driver_files(outdir="/updates"): + """ + copy any modules/firmware we just extracted into the running system. + return a list of the names of any modules we just copied. + """ + modules = list(iter_files(outdir+'/lib/modules',"*.ko*")) + firmware = list(iter_files(outdir+'/lib/firmware')) + copy_files(modules, module_updates_dir) + copy_files(firmware, firmware_updates_dir) + move_files(modules, outdir+module_updates_dir) + move_files(firmware, outdir+firmware_updates_dir) + return [os.path.basename(m).split('.ko')[0] for m in modules] + +def load_drivers(modnames): + """run depmod and try to modprobe all the given module names.""" + log.debug("load_drivers: %s", modnames) + subprocess.call(["depmod", "-a"]) + subprocess.call(["modprobe", "-a"] + modnames) + +def process_driver_disk(dev, interactive=False): + try: + _process_driver_disk(dev, interactive=interactive) + except (subprocess.CalledProcessError, IOError) as e: + log.error("ERROR: %s", e) + +def _process_driver_disk(dev, interactive=False): + """ + Main entry point for processing a single driver disk. + Mount the device/image, find repos, and install drivers from those repos. + + If there are no repos, look for .iso files, and (if present) recursively + process those. 
+ + If interactive, ask the user which driver(s) to install from the repos, + or ask which iso file to process (if no repos). + """ + log.info("Examining %s", dev) + with mounted(dev) as mnt: + repos = find_repos(mnt) + isos = find_isos(mnt) + + if repos: + if interactive: + new_modules = extract_drivers(drivers=repo_menu(repos)) + else: + new_modules = extract_drivers(repos=repos) + if new_modules: + modules = grab_driver_files() + load_drivers(modules) + elif isos: + if interactive: + isos = iso_menu(isos) + for iso in isos: + process_driver_disk(iso, interactive=interactive) + else: + print("=== No driver disks found in %s! ===\n" % dev) + +def mark_finished(user_request, topdir="/tmp"): + log.debug("marking %s complete in %s", user_request, topdir) + append_line(topdir+"/dd_finished", user_request) + +def all_finished(topdir="/tmp"): + finished = read_lines(topdir+"/dd_finished") + todo = read_lines(topdir+"/dd_todo") + return all(r in finished for r in todo) + +def finish(user_request, topdir="/tmp"): + # mark that we've finished processing this request + mark_finished(user_request, topdir) + # if we're done now, let dracut know + if all_finished(topdir): + append_line(topdir+"/dd.done", "true") + +# --- DEVICE LISTING HELPERS FOR THE MENU ----------------------------------- + +class DeviceInfo(object): + def __init__(self, **kwargs): + self.device = kwargs.get("DEVNAME", '') + self.uuid = kwargs.get("UUID", '') + self.fs_type = kwargs.get("TYPE", '') + self.label = kwargs.get("LABEL", '') + + def __repr__(self): + return '<DeviceInfo %s>' % self.device + + @property + def shortdev(self): + if os.path.islink(self.device): + return os.path.basename(os.readlink(self.device)) + elif self.device.startswith('/dev/'): + return self.device[5:] + else: + return self.device + +def blkid(): + out = subprocess.check_output("blkid -o export -s UUID -s TYPE".split()) + out = out.decode('ascii') + return [dict(kv.split('=',1) for kv in block.splitlines()) + for block in 
out.split('\n\n')] + +def get_disk_labels(): + return {os.path.realpath(s):os.path.basename(s) + for s in iter_files("/dev/disk/by-label")} + +def get_deviceinfo(): + disk_labels = get_disk_labels() + deviceinfo = [DeviceInfo(**d) for d in blkid()] + for dev in deviceinfo: + dev.label = disk_labels.get(dev.device, '') + return deviceinfo + +# --- INTERACTIVE MENU JUNK ------------------------------------------------ + +class TextMenu(object): + def __init__(self, items, title=None, formatter=None, headeritem=None, + refresher=None, multi=False, page_height=20): + self.items = items + self.title = title + self.formatter = formatter + self.headeritem = headeritem + self.refresher = refresher + self.multi = multi + self.page_height = page_height + self.pagenum = 1 + self.selected_items = [] + self.is_done = False + if callable(items): + self.refresher = items + self.refresh() + + @property + def num_pages(self): + pages, leftover = divmod(len(self.items), self.page_height) + if leftover: + return pages+1 + else: + return pages + + def next(self): + if self.pagenum < self.num_pages: + self.pagenum += 1 + + def prev(self): + if self.pagenum > 1: + self.pagenum -= 1 + + def refresh(self): + if callable(self.refresher): + self.items = self.refresher() + + def done(self): + self.is_done = True + + def invalid(self, k): + print("Invalid selection %r" % k) + + def toggle_item(self, item): + if item in self.selected_items: + self.selected_items.remove(item) + else: + self.selected_items.append(item) + if not self.multi: + self.done() + + def items_on_page(self): + start_idx = (self.pagenum-1) * self.page_height + if start_idx > len(self.items): + return [] + else: + items = self.items[start_idx:start_idx+self.page_height] + return enumerate(items, start=start_idx) + + def format_item(self, item): + if callable(self.formatter): + return self.formatter(item) + else: + return str(item) + + def format_items(self): + for n, i in self.items_on_page(): + if self.multi: + x = 'x' if 
i in self.selected_items else ' ' + yield "%2d) [%s] %s" % (n+1, x, self.format_item(i)) + else: + yield "%2d) %s" % (n+1, self.format_item(i)) + + def format_header(self): + if self.multi: + return ' '+self.format_item(self.headeritem) + else: + return ' '+self.format_item(self.headeritem) + + def action_dict(self): + actions = { + 'r': self.refresh, + 'n': self.next, + 'p': self.prev, + 'c': self.done, + } + for n, i in self.items_on_page(): + actions[str(n+1)] = lambda item=i: self.toggle_item(item) + return actions + + def format_page(self): + page = '\n(Page {pagenum} of {num_pages}) {title}\n{items}' + items = list(self.format_items()) + if self.headeritem: + items.insert(0, self.format_header()) + return page.format(pagenum=self.pagenum, + num_pages=self.num_pages, + title=self.title or '', + items='\n'.join(items)) + + def format_prompt(self): + options = [ + '# to toggle selection' if self.multi else '# to select', + "'r'-refresh" if callable(self.refresher) else None, + "'n'-next page" if self.pagenum < self.num_pages else None, + "'p'-previous page" if self.pagenum > 1 else None, + "or 'c'-continue" + ] + return ', '.join(o for o in options if o is not None) + ': ' + + def run(self): + while not self.is_done: + print(self.format_page()) + k = _input(self.format_prompt()) + action = self.action_dict().get(k) + if action: + action() + else: + self.invalid(k) + return self.selected_items + +def repo_menu(repos): + drivers = [d for r in repos for d in dd_list(r)] + if not drivers: + log.info("No suitable drivers found.") + return [] + menu = TextMenu(drivers, title="Select drivers to install", + formatter=lambda d: d.source, + multi=True) + result = menu.run() + return result + +def iso_menu(isos): + menu = TextMenu(isos, title="Choose driver disk ISO file") + result = menu.run() + return result + +def device_menu(): + fmt = '{0.shortdev:<8.8} {0.fs_type:<8.8} {0.label:<20.20} {0.uuid:<.36}' + hdr = DeviceInfo(DEVNAME='DEVICE', TYPE='TYPE', LABEL='LABEL', 
UUID='UUID') + menu = TextMenu(get_deviceinfo, title="Driver disk device selection", + formatter=fmt.format, + headeritem=hdr) + result = menu.run() + return result + +# --- COMMANDLINE-TYPE STUFF ------------------------------------------------ + +def setup_log(): + log.setLevel(logging.DEBUG) + handler = SysLogHandler(address="/dev/log") + log.addHandler(handler) + handler = logging.StreamHandler() + handler.setLevel(logging.INFO) + formatter = logging.Formatter("DD: %(message)s") + handler.setFormatter(formatter) + log.addHandler(handler) + +def print_usage(): + print("usage: driver-updates --interactive") + print(" driver-updates --disk DISK KERNELDEV") + print(" driver-updates --net URL LOCALFILE") + +def check_args(args): + if args and args[0] == '--interactive': + return True + elif len(args) == 3 and args[0] in ('--disk', '--net'): + return True + else: + return False + +def main(args): + if not check_args(args): + print_usage() + raise SystemExit(2) + + mode = args.pop(0) + + if mode in ('--disk', '--net'): + request, dev = args + process_driver_disk(dev) + + elif mode == '--interactive': + log.info("starting interactive mode") + request = 'menu' + while True: + dev = device_menu() + if not dev: break + process_driver_disk(dev.pop().device, interactive=True) + + finish(request) + +if __name__ == '__main__': + setup_log() + try: + main(sys.argv[1:]) + except KeyboardInterrupt: + log.info("exiting.") diff --git a/dracut/test_driver_updates.py b/dracut/test_driver_updates.py new file mode 100644 index 0000000..2aaa993 --- /dev/null +++ b/dracut/test_driver_updates.py @@ -0,0 +1,624 @@ +# test_driver_updates.py - unittests for driver_updates.py + +import unittest +try: + import unittest.mock as mock +except ImportError: + import mock + +import os +import tempfile +import shutil + +from driver_updates import copy_files, move_files, iter_files, ensure_dir +from driver_updates import append_line, mkdir_seq + +def touch(path): + try: + open(path, 'a') + except 
IOError as e: + if e.errno != 17: raise + +def makedir(path): + ensure_dir(path) + return path + +def makefile(path): + makedir(os.path.dirname(path)) + touch(path) + return path + +def makefiles(*paths): + return [makefile(p) for p in paths] + +class FileTestCaseBase(unittest.TestCase): + def setUp(self): + self.tmpdir = tempfile.mkdtemp(prefix="test_driver_updates.") + self.srcdir = self.tmpdir+'/src/' + self.destdir = self.tmpdir+'/dest/' + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def makefiles(self, *paths): + return [makefile(os.path.normpath(self.tmpdir+'/'+p)) for p in paths] + +class SelfTestCase(FileTestCaseBase): + def test_makefiles(self): + """check test helpers""" + filepaths = ["sub/dir/test.file", "testfile"] + self.makefiles(*filepaths) + for f in filepaths: + self.assertTrue(os.path.exists(self.tmpdir+'/'+f)) + +class TestCopyFiles(FileTestCaseBase): + def test_basic(self): + """copy_file: copy files into destdir, leaving existing contents""" + files = self.makefiles("src/file1", "src/subdir/file2") + self.makefiles("dest/file3") + copy_files(files, self.destdir) + result = set(os.listdir(self.destdir)) + self.assertEqual(result, set(["file1", "file2", "file3"])) + + def test_overwrite(self): + """copy_file: overwrite files in destdir if they have the same name""" + src, dest = self.makefiles("src/file1", "dest/file1") + with open(src, 'w') as outf: + outf.write("srcfile") + with open(dest, 'w') as outf: + outf.write("destfile") + copy_files([src], self.destdir) + self.assertEqual(os.listdir(self.destdir), ["file1"]) + self.assertEqual(open(dest).read(), "srcfile") + + def test_samefile(self): + """copy_file: skip files already in destdir""" + (dest,) = self.makefiles("dest/file1") + with open(dest, 'w') as outf: + outf.write("destfile") + copy_files([dest], self.destdir) + self.assertEqual(os.listdir(self.destdir), ["file1"]) + self.assertEqual(open(dest).read(), "destfile") + + def test_copy_to_parent(self): + 
"""copy_file: skip files in subdirs of destdir""" + files = self.makefiles("dest/subdir/file1") + copy_files(files, self.destdir) + self.assertEqual(list(iter_files(self.destdir)), files) + +class TestIterFiles(FileTestCaseBase): + def test_basic(self): + """iter_files: iterates over full paths to files under topdir""" + files = set(self.makefiles("src/file1", "dest/file2", "src/sub/file3")) + makedir(self.tmpdir+'/empty/dir') + result = set(iter_files(self.tmpdir)) + self.assertEqual(files, result) + + def test_pattern(self): + """iter_files: match filename against glob pattern""" + self.makefiles("src/file1.so", "src/sub.ko/file2") + goodfiles = set(self.makefiles("src/sub/file1.ko", "src/file2.ko.xz")) + result = set(iter_files(self.tmpdir, pattern="*.ko*")) + self.assertEqual(result, goodfiles) + +class TestMoveFiles(FileTestCaseBase): + def test_basic(self): + """move_files: move files to destdir""" + files = self.makefiles("src/file1", "src/subdir/file2") + move_files(files, self.destdir) + self.assertEqual(set(os.listdir(self.destdir)), set(["file1", "file2"])) + self.assertEqual(list(iter_files(self.srcdir)), []) + + def test_overwrite(self): + """move_files: overwrite files with the same name""" + src, dest = self.makefiles("src/file1", "dest/file1") + with open(src, 'w') as outf: + outf.write("srcfile") + with open(dest, 'w') as outf: + outf.write("destfile") + move_files([src], self.destdir) + self.assertEqual(os.listdir(self.destdir), ["file1"]) + self.assertEqual(open(dest).read(), "srcfile") + self.assertEqual(list(iter_files(self.srcdir)), []) + + def test_samefile(self): + """move_files: leave files alone if they're already in destdir""" + (dest,) = self.makefiles("dest/file1") + with open(dest, 'w') as outf: + outf.write("destfile") + move_files([dest], self.destdir) + self.assertEqual(os.listdir(self.destdir), ["file1"]) + self.assertEqual(open(dest).read(), "destfile") + + def test_move_to_parent(self): + """move_files: leave files alone if 
they're in a subdir of destdir""" + files = set(self.makefiles("dest/subdir/file1", "dest/file2")) + move_files(files, self.destdir) + self.assertEqual(set(iter_files(self.destdir)), files) + +class TestAppendLine(FileTestCaseBase): + def test_empty(self): + """append_line: create file + append \n when needed""" + line = "this is a line of text with no newline" + outfile = self.tmpdir+'/outfile' + append_line(outfile, line) + self.assertEqual(open(outfile).read(), line+'\n') + + def test_append(self): + """append_line: adds a line to the end of an existing file""" + oldlines = ["line one", "line two", "and I'm line three"] + outfile = self.tmpdir+'/outfile' + with open(outfile, 'w') as outf: + for line in oldlines: + outf.write(line+'\n') + line = "this line contains a newline already\n" + append_line(outfile, line) + self.assertEqual(open(outfile).read(), '\n'.join(oldlines+[line])) + +from driver_updates import read_lines +class TestReadLine(FileTestCaseBase): + def test_empty(self): + """read_lines: return [] for empty file""" + [empty] = self.makefiles("emptyfile") + self.assertEqual(read_lines(empty), []) + + def test_missing(self): + """read_lines: return [] for missing file""" + self.assertEqual(read_lines(self.tmpdir+'/no-such-file'),[]) + + def test_readlines(self): + """read_lines: returns a list of lines without trailing newlines""" + filedata = 'line one\nline two\n\nline four\n' + outfile = self.tmpdir+'/outfile' + with open(outfile, 'w') as outf: + outf.write(filedata) + lines = read_lines(outfile) + self.assertEqual(lines, ['line one', 'line two','','line four']) + +class TestMkdirSeq(FileTestCaseBase): + def test_basic(self): + """mkdir_seq: first dir ends with 1""" + newdir = mkdir_seq(self.srcdir+'/DD-') + self.assertEqual(newdir, self.srcdir+'/DD-1') + self.assertTrue(os.path.isdir(newdir)) + + def test_one_exists(self): + """mkdir_seq: increment number if file exists""" + firstdir = mkdir_seq(self.srcdir+'/DD-') + newdir = 
mkdir_seq(self.srcdir+'/DD-') + self.assertEqual(newdir, self.srcdir+'/DD-2') + self.assertTrue(os.path.isdir(newdir)) + self.assertTrue(os.path.isdir(firstdir)) + +from driver_updates import find_repos, save_repo, arch +# As far as we know, this is what makes a valid repo: rhdd3 + rpms/`uname -m`/ +def makerepo(topdir, desc=None): + descfile = makefile(topdir+'/rhdd3') + if not desc: + desc = os.path.basename(topdir) + with open(descfile, "w") as outf: + outf.write(desc+"\n") + makedir(topdir+'/rpms/'+arch) + +class TestFindRepos(FileTestCaseBase): + def test_basic(self): + """find_repos: return RPM dir if a valid repo is found""" + makerepo(self.tmpdir) + repos = find_repos(self.tmpdir) + self.assertEqual(repos, [self.tmpdir+'/rpms/'+arch]) + self.assertTrue(os.path.isdir(repos[0])) + + def test_multiple_subdirs(self): + """find_repos: descend multiple subdirs if needed""" + makerepo(self.tmpdir+'/driver1') + makerepo(self.tmpdir+'/sub/driver1') + makerepo(self.tmpdir+'/sub/driver2') + repos = find_repos(self.tmpdir) + self.assertEqual(len(repos),3) + +class TestSaveRepo(FileTestCaseBase): + def test_basic(self): + """save_repo: copies a directory to /run/install/DD-X""" + makerepo(self.srcdir) + [repo] = find_repos(self.srcdir) + makefile(repo+'/fake-something.rpm') + saved = save_repo(repo, target=self.destdir) + self.assertEqual(set(os.listdir(saved)), set(["fake-something.rpm"])) + self.assertEqual(saved, os.path.join(self.destdir, "DD-1")) + +from driver_updates import mount, umount, mounted +class MountTestCase(unittest.TestCase): + @mock.patch('driver_updates.mkdir_seq') + @mock.patch('driver_updates.subprocess.check_call') + def test_mkdir(self, check_call, mkdir): + """mount: makes mountpoint if needed""" + dev, mnt = '/dev/fake', '/media/DD-1' + mkdir.return_value = mnt + mountpoint = mount(dev) + mkdir.assert_called_once_with('/media/DD-') + check_call.assert_called_once_with(["mount", dev, mnt]) + self.assertEqual(mnt, mountpoint) + + 
@mock.patch('driver_updates.mkdir_seq') + @mock.patch('driver_updates.subprocess.check_call') + def test_basic(self, check_call, mkdir): + """mount: calls mount(8) to mount a device/image""" + dev, mnt = '/dev/fake', '/media/fake' + mount(dev, mnt) + check_call.assert_called_once_with(["mount", dev, mnt]) + self.assertFalse(mkdir.called) + + @mock.patch('driver_updates.subprocess.call') + def test_umount(self, call): + """umount: calls umount(8)""" + mnt = '/mnt/fake' + umount(mnt) + call.assert_called_once_with(["umount", mnt]) + + @mock.patch('driver_updates.mount') + @mock.patch('driver_updates.umount') + def test_mount_manager(self, mock_umount, mock_mount): + """mounted: context manager mounts/umounts as expected""" + dev, mnt = '/dev/fake', '/media/fake' + mock_mount.return_value = mnt + with mounted(dev, mnt) as mountpoint: + mock_mount.assert_called_once_with(dev, mnt) + self.assertFalse(mock_umount.called) + self.assertEqual(mountpoint, mnt) + mock_umount.assert_called_once_with(mnt) + +# NOTE: dd_list and dd_extract get tested pretty thoroughly in tests/dd_tests, +# so this is a slightly higher-level test case +from driver_updates import dd_list, dd_extract, Driver +fake_module = Driver( + source='/repo/path/to/fake-driver-1.0-1.rpm', + name='fake-driver', + flags='modules firmwares', + description='Wow this is totally a fake driver.\nHooray for this', + repo='/repo/path/to' +) +fake_enhancement = Driver( + source='/repo/path/to/fake-enhancement-1.0-1.rpm', + name='fake-enhancement', + flags='binaries libraries', + description='This is enhancing the crap out of the installer.\n\nYeah.', + repo=fake_module.repo +) +def dd_list_output(driver): + out='{0.source}\n{0.name}\n{0.flags}\n{0.description}\n---\n'.format(driver) + return out.encode('utf-8') + +class DDUtilsTestCase(unittest.TestCase): + @mock.patch("driver_updates.subprocess.check_output") + def test_dd_list(self, check_output): + """dd_list: returns a list of Driver objects parsed from output""" + 
output = dd_list_output(fake_module)+dd_list_output(fake_enhancement) + check_output.return_value = output + anaconda, kernel = '19.0', os.uname()[2] + result = dd_list(fake_module.repo) + cmd = check_output.call_args[0][0] + self.assertIn(kernel, cmd) + self.assertIn(anaconda, cmd) + self.assertIn(fake_module.repo, cmd) + self.assertTrue(cmd[0].endswith("dd_list")) + self.assertEqual(len(result), 2) + mod, enh = sorted(result, key=lambda d: d.name) + self.assertEqual(mod.__dict__, fake_module.__dict__) + self.assertEqual(enh.__dict__, fake_enhancement.__dict__) + + @mock.patch("driver_updates.subprocess.check_output") + def test_dd_extract(self, check_output): + """dd_extract: call binary with expected arguments""" + rpm = "/some/kind/of/path.rpm" + outdir = "/output/dir" + dd_extract(rpm, outdir) + cmd = check_output.call_args[0][0] + self.assertIn(os.uname()[2], cmd) + self.assertIn(rpm, cmd) + self.assertIn(outdir, cmd) + self.assertIn("-blmf", cmd) + self.assertTrue(cmd[0].endswith("dd_extract")) + +from driver_updates import extract_drivers, grab_driver_files, load_drivers + +class ExtractDriversTestCase(unittest.TestCase): + @mock.patch("driver_updates.save_repo") + @mock.patch("driver_updates.append_line") + @mock.patch("driver_updates.dd_extract") + def test_drivers(self, mock_extract, mock_append, mock_save): + """extract_drivers: save repo, write pkglist""" + extract_drivers(drivers=[fake_enhancement, fake_module]) + # extracts all listed modules + mock_extract.assert_has_calls([ + mock.call(fake_enhancement.source, "/updates"), + mock.call(fake_module.source, "/updates") + ], any_order=True) + pkglist = "/run/install/dd_packages" + mock_append.assert_called_once_with(pkglist, fake_module.name) + mock_save.assert_called_once_with(fake_module.repo) + + @mock.patch("driver_updates.save_repo") + @mock.patch("driver_updates.append_line") + @mock.patch("driver_updates.dd_extract") + def test_enhancements(self, mock_extract, mock_append, mock_save): + 
"""extract_drivers: extract selected drivers, don't save enhancements""" + extract_drivers(drivers=[fake_enhancement]) + mock_extract.assert_called_once_with( + fake_enhancement.source, "/updates" + ) + self.assertFalse(mock_append.called) + self.assertFalse(mock_save.called) + + @mock.patch("driver_updates.save_repo") + @mock.patch("driver_updates.append_line") + @mock.patch("driver_updates.dd_extract") + def test_repo(self, mock_extract, mock_append, mock_save): + """extract_drivers(repos=[...]) extracts all drivers from named repos""" + with mock.patch("driver_updates.dd_list", side_effect=[ + [fake_enhancement], + [fake_enhancement, fake_module]]): + extract_drivers(repos=['enh_repo', 'mod_repo']) + mock_extract.assert_has_calls([ + mock.call(fake_enhancement.source, "/updates"), + mock.call(fake_enhancement.source, "/updates"), + mock.call(fake_module.source, "/updates") + ]) + pkglist = "/run/install/dd_packages" + mock_append.assert_called_once_with(pkglist, fake_module.name) + mock_save.assert_called_once_with(fake_module.repo) + +class GrabDriverFilesTestCase(FileTestCaseBase): + def test_basic(self): + """grab_driver_files: copy drivers into place, return module list""" + # create a bunch of fake extracted files + outdir = self.tmpdir + '/extract-outdir' + moddir = outdir + "/lib/modules/%s/kernel/" % os.uname()[2] + fwdir = outdir + "/lib/firmware/" + modules = makefiles(moddir+"net/funk.ko", moddir+"fs/lolfs.ko.xz") + firmware = makefiles(fwdir+"funk.fw") + makefiles(outdir+"/usr/bin/monkey", outdir+"/other/dir/blah.ko") + mod_upd_dir = self.tmpdir+'/module-updates' + fw_upd_dir = self.tmpdir+'/fw-updates' + # use our updates dirs instead of the default updates dirs + with mock.patch.multiple("driver_updates", + module_updates_dir=mod_upd_dir, + firmware_updates_dir=fw_upd_dir): + modnames = grab_driver_files(outdir) + self.assertEqual(set(modnames), set(["funk", "lolfs"])) + modfiles = set(['funk.ko', 'lolfs.ko.xz']) + fwfiles = set(['funk.fw']) + # 
modules/firmware are *not* in their old locations + self.assertEqual([f for f in modules+firmware if os.path.exists(f)], []) + # modules are in the system's updates dir + self.assertEqual(set(os.listdir(mod_upd_dir)), modfiles) + # modules are also in outdir's updates dir + self.assertEqual(set(os.listdir(outdir+'/'+mod_upd_dir)), modfiles) + # repeat for firmware + self.assertEqual(set(os.listdir(fw_upd_dir)), fwfiles) + self.assertEqual(set(os.listdir(outdir+'/'+fw_upd_dir)), fwfiles) + +class LoadDriversTestCase(unittest.TestCase): + @mock.patch("driver_updates.subprocess.call") + def test_basic(self, call): + """load_drivers: runs depmod and modprobes all named modules""" + modnames = ['mod1', 'mod2'] + load_drivers(modnames) + call.assert_has_calls([ + mock.call(["depmod", "-a"]), + mock.call(["modprobe", "-a"] + modnames) + ]) + +from driver_updates import process_driver_disk +class ProcessDriverDiskTestCase(unittest.TestCase): + def setUp(self): + # an iterable that returns fake mountpoints, for mocking mount() + self.fakemount = ["/mnt/DD-%i" % n for n in range(1,10)] + # an iterable that returns fake repos, for mocking find_repos() + self.frepo = { + '/mnt/DD-1': ['/mnt/DD-1/repo1'], + '/mnt/DD-2': ['/mnt/DD-2/repo1', '/mnt/DD-2/repo2'], + } + # fake iso listings for iso_dir + self.fiso = { + '/mnt/DD-1': [], + '/mnt/DD-2': [], + '/mnt/DD-3': [], + } + # a context-manager object to be returned by the mock mounted() + mounted_ctx = mock.MagicMock( + __enter__=mock.MagicMock(side_effect=self.fakemount), # mount + __exit__=mock.MagicMock(return_value=None), # umount + ) + self.modlist = [] + # set up our patches + patches = ( + mock.patch("driver_updates.mounted", return_value=mounted_ctx), + mock.patch("driver_updates.find_repos", side_effect=self.frepo.get), + mock.patch("driver_updates.find_isos", side_effect=self.fiso.get), + mock.patch("driver_updates.extract_drivers", return_value=True), + mock.patch("driver_updates.load_drivers"), + 
mock.patch('driver_updates.grab_driver_files', + side_effect=lambda: self.modlist), + ) + self.mocks = {p.attribute:p.start() for p in patches} + for p in patches: self.addCleanup(p.stop) + + def test_basic(self): + """process_driver_disk: mount disk, extract RPMs, grab + load drivers""" + dev = '/dev/fake' + process_driver_disk(dev) + # did we mount the initial device, and then the .iso we find therein? + self.mocks['mounted'].assert_called_once_with(dev) + self.mocks['extract_drivers'].assert_called_once_with(repos=self.frepo['/mnt/DD-1']) + self.mocks['grab_driver_files'].assert_called_once_with() + self.mocks['load_drivers'].assert_called_once_with(self.modlist) + + def test_recursive(self): + """process_driver_disk: recursively process .isos at toplevel""" + dev = '/dev/fake' + # first mount has no repos, but an iso + self.frepo['/mnt/DD-1'] = [] + self.fiso['/mnt/DD-1'].append('magic.iso') + self.fiso['/mnt/DD-2'].append('ignored.iso') + process_driver_disk(dev) + # did we mount the initial device, and the iso therein? + # also: we ignore ignored.iso because magic.iso is a proper DD + self.mocks['mounted'].assert_has_calls([ + mock.call(dev), mock.call('magic.iso') + ]) + # we extracted drivers from the repo(s) in magic.iso + self.mocks['extract_drivers'].assert_called_once_with(repos=self.frepo['/mnt/DD-2']) + self.mocks['grab_driver_files'].assert_called_once_with() + self.mocks['load_drivers'].assert_called_once_with(self.modlist) + + def test_no_drivers(self): + """process_driver_disk: don't run depmod etc. 
if no new drivers""" + dev = '/dev/fake' + self.mocks['extract_drivers'].return_value = False + process_driver_disk(dev) + self.assertFalse(self.mocks['grab_driver_files'].called) + self.assertFalse(self.mocks['load_drivers'].called) + +from driver_updates import finish, mark_finished, all_finished + +class FinishedTestCase(FileTestCaseBase): + def test_mark_finished(self): + """mark_finished: appends a line to /tmp/dd_finished""" + requeststr = "WOW SOMETHING OR OTHER" + mark_finished(requeststr, topdir=self.tmpdir) + finished = self.tmpdir+'/dd_finished' + self.assertTrue(os.path.exists(finished)) + self.assertEqual(read_lines(finished), [requeststr]) + + def test_all_finished(self): + """all_finished: True if all lines from dd_todo are in dd_finished""" + todo = self.tmpdir+'/dd_todo' + requests = ['one', 'two', 'final thingy'] + with open(todo, 'w') as outf: + outf.write(''.join(r+'\n' for r in requests)) + self.assertEqual(set(read_lines(todo)), set(requests)) + for r in reversed(requests): + self.assertFalse(all_finished(topdir=self.tmpdir)) + mark_finished(r, topdir=self.tmpdir) + self.assertTrue(all_finished(topdir=self.tmpdir)) + + def test_extra_finished(self): + """all_finished: True if dd_finished has more items than dd_todo""" + self.test_all_finished() + mark_finished("BONUS", topdir=self.tmpdir) + self.assertTrue(all_finished(topdir=self.tmpdir)) + + def test_finish(self): + """finish: mark request finished, and write dd.done if all complete""" + todo = self.tmpdir+'/dd_todo' + done = self.tmpdir+'/dd.done' + requests = ['one', 'two', 'final thingy'] + with open(todo, 'w') as outf: + outf.write(''.join(r+'\n' for r in requests)) + for r in reversed(requests): + print("marking %s" % r) + self.assertFalse(os.path.exists(done)) + finish(r, topdir=self.tmpdir) + self.assertTrue(os.path.exists(done)) + +from driver_updates import get_deviceinfo, DeviceInfo +blkid_output = b'''\ +DEVNAME=/dev/sda2 +UUID=0f21a3d1-dcd3-4ab4-a292-c5556850d561 +TYPE=ext4 + 
+DEVNAME=/dev/sda1 +UUID=C53C-EE46 +TYPE=vfat + +DEVNAME=/dev/sda3 +UUID=4126dbb6-c7d3-47b4-b1fc-9bb461df0067 +TYPE=btrfs + +DEVNAME=/dev/loop0 +UUID=6f16967e-0388-4276-bd8d-b88e5b217a55 +TYPE=ext4 +''' +disk_labels = { + '/dev/sdb1': 'metroid_srv', + '/dev/loop0': 'I\x20\u262d\x20COMMUNISM', + '/dev/sda3': 'metroid_root' +} +devicelist = [ + DeviceInfo(DEVNAME='/dev/sda2', TYPE='ext4', + UUID='0f21a3d1-dcd3-4ab4-a292-c5556850d561'), + DeviceInfo(DEVNAME='/dev/sda1', TYPE='vfat', + UUID='C53C-EE46'), + DeviceInfo(DEVNAME='/dev/sda3', TYPE='btrfs', LABEL='metroid_root', + UUID='4126dbb6-c7d3-47b4-b1fc-9bb461df0067'), + DeviceInfo(DEVNAME='/dev/loop0', TYPE='ext4', + LABEL='I\x20\u262d\x20COMMUNISM', + UUID='6f16967e-0388-4276-bd8d-b88e5b217a55'), +] +# also covers blkid, get_disk_labels, DeviceInfo +class DeviceInfoTestCase(unittest.TestCase): + @mock.patch('driver_updates.subprocess.check_output') + @mock.patch('driver_updates.get_disk_labels') + def test_basic(self, get_disk_labels, check_output): + """get_deviceinfo: parses DeviceInfo from blkid etc.""" + # configure mock objects + check_output.return_value = blkid_output + get_disk_labels.return_value = disk_labels + # now we're getting mock deviceinfo, whee + disks = get_deviceinfo() + self.assertEqual(len(disks), 4) + disks.sort(key=lambda d: d.device) + loop, efi, boot, root = disks + self.assertEqual(vars(boot), vars(devicelist[0])) + self.assertEqual(vars(efi), vars(devicelist[1])) + self.assertEqual(vars(root), vars(devicelist[2])) + self.assertEqual(vars(loop), vars(devicelist[3])) + +# TODO: test TextMenu itself + +# py2/3 compat +import sys +if sys.version_info.major == 3: + from io import StringIO +else: + from io import BytesIO as StringIO + +from driver_updates import device_menu +class DeviceMenuTestCase(unittest.TestCase): + def setUp(self): + patches = ( + mock.patch('driver_updates.get_deviceinfo',return_value=devicelist), + ) + self.mocks = {p.attribute:p.start() for p in patches} + for p in 
patches: self.addCleanup(p.stop) + + def test_device_menu_exit(self): + """device_menu: 'c' exits the menu""" + with mock.patch('driver_updates._input', side_effect=['c']): + dev = device_menu() + self.assertEqual(dev, []) + self.assertEqual(self.mocks['get_deviceinfo'].call_count, 1) + + def test_device_menu_refresh(self): + """device_menu: 'r' makes the menu refresh""" + with mock.patch('driver_updates._input', side_effect=['r','c']): + device_menu() + self.assertEqual(self.mocks['get_deviceinfo'].call_count, 2) + + @mock.patch("sys.stdout", new_callable=StringIO) + def test_device_menu(self, stdout): + """device_menu: choosing a number returns that Device""" + choose_num='2' + with mock.patch('driver_updates._input', return_value=choose_num): + result = device_menu() + # if you hit '2' you should get the corresponding device from the list + self.assertEqual(len(result), 1) + dev = result[0] + self.assertEqual(vars(dev), vars(devicelist[int(choose_num)-1])) + # find the corresponding line on-screen + screen = [l.strip() for l in stdout.getvalue().splitlines()] + match = [l for l in screen if l.startswith(choose_num+')')] + self.assertEqual(len(match), 1) + line = match.pop(0) + # the device name (at least) should be on this line + self.assertIn(os.path.basename(dev.device), line) diff --git a/tests/kickstart_tests/driverdisk-disk.ks b/tests/kickstart_tests/driverdisk-disk.ks new file mode 100644 index 0000000..e8025cd --- /dev/null +++ b/tests/kickstart_tests/driverdisk-disk.ks @@ -0,0 +1,41 @@ +#version=DEVEL +url --url="http://dl.fedoraproject.org/pub/fedora/linux/development/$releasever/$basear..." 
+install +network --bootproto=dhcp + +keyboard us +lang en_US.UTF-8 +timezone America/New_York --utc +rootpw testcase +shutdown + +bootloader --timeout=1 +zerombr +clearpart --all +autopart + +driverdisk /dev/disk/by-label/TEST_DD + +%packages +@core +%end + +%post --nochroot +SYSROOT=${ANA_INSTALL_PATH:-/mnt/sysimage} +RESULTFILE=$SYSROOT/root/RESULT +fail() { echo "*** $*" >> $RESULTFILE; } + +# check the installer environment +[ -f /lib/modules/`uname -r`/updates/fake-dd.ko ] || fail "kmod not loaded" +[ -f /usr/bin/fake-dd-bin ] || fail "installer-enhancement not loaded" + +# check the installed system +[ -f $SYSROOT/root/fake-dd-2.ko ] || fail "kmod rpm not installed" +[ ! -f $SYSROOT/usr/bin/fake-dd-bin ] || \ + fail "installer-enhancement package installed to target system" + +# write successful result if nothing failed +if [[ ! -e $RESULTFILE ]]; then + echo SUCCESS > $RESULTFILE +fi +%end diff --git a/tests/kickstart_tests/driverdisk-disk.sh b/tests/kickstart_tests/driverdisk-disk.sh new file mode 100755 index 0000000..c510742 --- /dev/null +++ b/tests/kickstart_tests/driverdisk-disk.sh @@ -0,0 +1,33 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program. If not, see http://www.gnu.org/licenses/. +# +# Author: Will Woods wwoods@redhat.com + +. 
${KSTESTDIR}/functions.sh + +prepare_disks() { + local diskdir="$1/disks" + # main disk + qemu-img create -q -f qcow2 ${diskdir}/a.img 10G + echo "${diskdir}/a.img" + + # driverdisk image + ${KSTESTDIR}/../lib/mkdud.py -k -b -L "TEST_DD" ${diskdir}/dd.iso >/dev/null + echo "${diskdir}/dd.iso,device=cdrom,readonly=on" +} + +#kernel_args() { +# echo inst.dd=/dev/disk/by-label/TEST_DD +#} diff --git a/tests/lib/mkdud.py b/tests/lib/mkdud.py new file mode 100755 index 0000000..1fb2b3c --- /dev/null +++ b/tests/lib/mkdud.py @@ -0,0 +1,120 @@ +#!/usr/bin/python +# mkdud.py - test helper that makes driverdisk images +# +# Copyright (c) 2015 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program. If not, see http://www.gnu.org/licenses/. 
+# +# Author: Will Woods wwoods@redhat.com + +import os +import rpmfluff +import subprocess +import argparse +import tempfile +import shutil + +from contextlib import contextmanager + +@contextmanager +def in_tempdir(prefix='tmp'): + oldcwd = os.getcwd() + tmpdir = tempfile.mkdtemp(prefix=prefix) + os.chdir(tmpdir) + yield + os.chdir(oldcwd) + shutil.rmtree(tmpdir) + +def parse_args(): + p = argparse.ArgumentParser( + description="make fake driver disk images for testing", + epilog="ex: %(prog)s dd.iso", + ) + p.add_argument("filename", + help="image filename to write") + p.add_argument("--label", "-L", default="OEMDRV", + help="disk image label (default: %(default)s)") + p.add_argument("--description", "-d", default="fake driverdisk", + help="driverdisk description (default: %(default)r)") + p.add_argument("--arch", "-a", default=rpmfluff.expectedArch, + help="arch to create RPMs for (default: host arch [%(default)s])") + p.add_argument("--kmod", "-k", action="store_true", default=False, + help="add a fake kmod to the driverdisk") + p.add_argument("--binary", "-b", action="store_true", default=False, + help="add a fake binary to the driverdisk") + p.add_argument("--createrepo", "-c", action="store_true", default=False, + help="run createrepo to add repodata to the driverdisk") + + return p.parse_args() + +def write_description(desc): + with open("rhdd3",'w') as rhdd3: + rhdd3.write(desc+'\n') + +def make_rpm(pkg, outdir=".", arch=None): + outdir = os.path.abspath(outdir) + with in_tempdir(prefix='mkdud.rpmfluff.'): + pkg.make() + rpmfile = pkg.get_built_rpm(arch or rpmfluff.expectedArch) + outfile = os.path.join(outdir, os.path.basename(rpmfile)) + shutil.move(rpmfile, outfile) + return outfile + +def write_kmod_rpm(outdir, for_kernel_ver=None, arch=None): + pkg = rpmfluff.SimpleRpmBuild('fake_kmod', '1.0', '1') + pkg.add_provides('kernel-modules >= %s' % for_kernel_ver) + pkg.add_installed_file("/lib/modules/%s/extra/fake-dd.ko" % for_kernel_ver, + 
rpmfluff.SourceFile("fake-dd.ko", "this is a fake kernel module"), + ) + pkg.add_installed_file("/root/fake-dd-2.ko", + rpmfluff.SourceFile("fake-dd-2.ko", "another fake kernel module"), + ) + return make_rpm(pkg, outdir, arch) + +def write_installer_enhancement_rpm(outdir, for_anaconda_ver=None, arch=None): + pkg = rpmfluff.SimpleRpmBuild('fake_bin', '1.0', '1') + pkg.add_provides('installer-enhancement = %s' % for_anaconda_ver) + pkg.add_installed_file("/usr/bin/fake-dd-bin", + rpmfluff.SourceFile("fake-dd-bin", "#!/bin/sh\necho FAKE BINARY OK"), + mode='755', + ) + return make_rpm(pkg, outdir, arch) + +def createrepo(repodir): + return subprocess.check_call(["createrepo", repodir]) + +def mkisofs(outfile, cd_dir, label=None): + cmd = ["mkisofs", "-o", outfile, "-r", "-input-charset", "utf-8"] + if label: + cmd += ["-V", label] + cmd.append(cd_dir) + subprocess.check_call(cmd) + +def main(): + opts = parse_args() + outfile = os.path.abspath(opts.filename) + with in_tempdir(prefix='mkdud.'): + write_description(opts.description) + rpmdir = os.path.join("rpms", opts.arch) + os.makedirs(rpmdir) + if opts.kmod: + write_kmod_rpm(rpmdir, "3.0.0") + if opts.binary: + write_installer_enhancement_rpm(rpmdir, "19.0") + if opts.createrepo: + createrepo(rpmdir) + mkisofs(outfile, cd_dir=".", label=opts.label) + +if __name__ == '__main__': + main()
From: Will Woods <wwoods@redhat.com>
Okay so: commit 4883b96 moved fetch-kickstart-net.sh from the online hook into initqueue (or initqueue/settled) to fix inst.ks.sendmac.
So when you boot with inst.ks=[URL], the order goes:
1. Network comes up
2. Run `online` hook
   * Schedule kickstart fetch in initqueue
   * Other online tasks from boot args (e.g. updates)
3. Run `initqueue`
   * fetch + run kickstart
   * add `anaconda-netroot.sh` (etc.) to `online` hook
   * `udevadm trigger` disk devices
   * `udevadm trigger` net devices
The problem is, triggering network devices is *not* sufficient to re-run the `online` hook - this only happens when the NIC actually gets _configured_, not when the device is triggered.
The fix is pretty straightforward: re-run the `online` hook for active NICs after we run the kickstart, in order to pick up any new tasks that the kickstart might have scheduled.
Resolves: RHBZ#1238987
Reverts: a66cb157dcfeb418b02555cc8baa9ee8910184b5
---
 dracut/anaconda-lib.sh | 20 ++++++++++++++++++--
 dracut/fetch-kickstart-net.sh | 1 -
 2 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/dracut/anaconda-lib.sh b/dracut/anaconda-lib.sh index 9bd13dd..f084ff6 100755 --- a/dracut/anaconda-lib.sh +++ b/dracut/anaconda-lib.sh @@ -183,6 +183,15 @@ parse_kickstart() { [ -e "$parsed_kickstart" ] && cp $parsed_kickstart /run/install/ks.cfg }
+# print a list of net devices that dracut says are set up. +online_netdevs() { + local netif="" + for netif in /tmp/net.*.did-setup; do + netif=${netif#*.}; netif=${netif%.*} + [ -d "/sys/class/net/$netif" ] && echo $netif + done +} + # This is where we actually run the kickstart. Whee! # We can't just add udev rules (we'll miss devices that are already active), # and we can't just run the scripts manually (we'll miss devices that aren't @@ -206,7 +215,7 @@ run_kickstart() { # re-parse new cmdline stuff from the kickstart . $hookdir/cmdline/*parse-anaconda-repo.sh . $hookdir/cmdline/*parse-livenet.sh - # TODO: parse for other stuff ks might set (dd? other stuff?) + . $hookdir/cmdline/*parse-anaconda-dd.sh case "$repotype" in http*|ftp|nfs*) do_net=1 ;; cdrom|hd|bd) do_disk=1 ;; @@ -236,17 +245,24 @@ run_kickstart() { rm /tmp/dd_args_ks fi
- # replay udev events to trigger actions + # disk: replay udev events to trigger actions if [ "$do_disk" ]; then + # set up new rules . $hookdir/pre-trigger/*repo-genrules.sh udevadm control --reload + # trigger the rules for all the block devices we see udevadm trigger --action=change --subsystem-match=block fi + + # net: re-run online hook if [ "$do_net" ]; then # make dracut create the net udev rules (based on the new cmdline) . $hookdir/pre-udev/*-net-genrules.sh udevadm control --reload udevadm trigger --action=add --subsystem-match=net + for netif in $(online_netdevs); do + source_hook initqueue/online $netif + done fi
# and that's it - we're back to the mainloop. diff --git a/dracut/fetch-kickstart-net.sh b/dracut/fetch-kickstart-net.sh index 2051e6d..f26ab25 100755 --- a/dracut/fetch-kickstart-net.sh +++ b/dracut/fetch-kickstart-net.sh @@ -55,7 +55,6 @@ info "anaconda fetching kickstart from $kickstart" if fetch_url "$kickstart" /tmp/ks.cfg; then parse_kickstart /tmp/ks.cfg run_kickstart - $hookdir/initqueue/online/*anaconda-netroot.sh else warn "failed to fetch kickstart from $kickstart" fi
From: Will Woods <wwoods@redhat.com>
Here's where we actually modify stuff to use `driver_updates.py`:
* Parse commandline options to /tmp/dd_{net,disk,interactive,todo}
* Generate udev rules etc. in `driver-updates-genrules.sh`
* `fetch-driver-net.sh`: fetch .iso file and call driver-updates
* Modify `driver-updates@.service`:
  * Use `dmesg -n1` and `/proc/sys/kernel/printk` to hide console messages
  * Make sure we run after vconsole is set up
  * Use `plymouth --hide-splash` rather than `plymouth quit` (this is how dracut handles emergency shells)
* Drop `driver-updates.sh` (which ran driver-updates in `pre-trigger`)
* Drop `driver-updates-net.sh` (`driver_updates.py` handles saving repos)
* Drop old `driver-updates`
* Make necessary changes to `Makefile.am` and `module-setup.sh`
* Add `dracut/test_*.py` to `nosetests.sh`
* `anaconda-lib.sh`: Add `wait_for_dd`, remove `start_driver_update`, fix `run_kickstart` for the new stuff above
---
 dracut/Makefile.am | 6 +-
 dracut/anaconda-lib.sh | 50 +--
 dracut/driver-updates | 830 --------------------------------------
 dracut/driver-updates-genrules.sh | 23 ++
 dracut/driver-updates-net.sh | 10 -
 dracut/driver-updates.sh | 23 --
 dracut/driver-updates@.service | 12 +-
 dracut/fetch-driver-net.sh | 36 +-
 dracut/module-setup.sh | 7 +-
 dracut/parse-anaconda-dd.sh | 32 +-
 tests/nosetests.sh | 2 +-
 11 files changed, 85 insertions(+), 946 deletions(-)
 delete mode 100755 dracut/driver-updates
 create mode 100644 dracut/driver-updates-genrules.sh
 delete mode 100755 dracut/driver-updates-net.sh
 delete mode 100755 dracut/driver-updates.sh
diff --git a/dracut/Makefile.am b/dracut/Makefile.am index ae704e9..44a5b46 100644 --- a/dracut/Makefile.am +++ b/dracut/Makefile.am @@ -44,9 +44,9 @@ dist_dracut_SCRIPTS = module-setup.sh \ parse-anaconda-dd.sh \ fetch-driver-net.sh \ driver-updates@.service \ - driver-updates.sh \ - driver-updates-net.sh \ + driver-updates-genrules.sh \ anaconda-depmod.sh \ - driver-updates + driver_updates.py \ + test_driver_updates.py
MAINTAINERCLEANFILES = Makefile.in diff --git a/dracut/anaconda-lib.sh b/dracut/anaconda-lib.sh index f084ff6..9e74e3d 100755 --- a/dracut/anaconda-lib.sh +++ b/dracut/anaconda-lib.sh @@ -216,39 +216,21 @@ run_kickstart() { . $hookdir/cmdline/*parse-anaconda-repo.sh . $hookdir/cmdline/*parse-livenet.sh . $hookdir/cmdline/*parse-anaconda-dd.sh - case "$repotype" in - http*|ftp|nfs*) do_net=1 ;; - cdrom|hd|bd) do_disk=1 ;; - esac - [ "$root" = "anaconda-auto-cd" ] && do_disk=1 - - # kickstart Driver Disk Handling - # parse-kickstart may have added network inst.dd entries to the cmdline - # Or it may have written devices to /tmp/dd_ks - - # Does network need to be rerun? - dd_args="$(getargs dd= inst.dd=)" - for dd in $dd_args; do - case "${dd%%:*}" in - http|https|ftp|nfs|nfs4) - do_net=1 - rm /tmp/dd_net.done - break - ;; - esac - done
- # Run the driver update UI for disks - if [ -e "/tmp/dd_args_ks" ]; then - # TODO: Seems like this should be a function, a mostly same version is used in 3 places - start_driver_update "Kickstart Driver Update Disk" - rm /tmp/dd_args_ks - fi + # Figure out whether we need to retry disk/net stuff + case "$root" in + anaconda-net:*) do_net=1 ;; + anaconda-disk:*) do_disk=1 ;; + anaconda-auto-cd) do_disk=1 ;; + esac + [ -f /tmp/dd_net ] && do_net=1 + [ -f /tmp/dd_disk ] && do_disk=1
# disk: replay udev events to trigger actions if [ "$do_disk" ]; then # set up new rules . $hookdir/pre-trigger/*repo-genrules.sh + . $hookdir/pre-trigger/*driver-updates-genrules.sh udevadm control --reload # trigger the rules for all the block devices we see udevadm trigger --action=change --subsystem-match=block @@ -277,16 +259,6 @@ wait_for_updates() { echo "[ -e /tmp/liveupdates.done ]" > $hookdir/initqueue/finished/updates.sh }
-start_driver_update() { - local title="$1" - - tty=$(find_tty) - - # save module state - cat /proc/modules > /tmp/dd_modules - - info "Starting $title Service on $tty" - systemctl start driver-updates@$tty.service - status=$(systemctl -p ExecMainStatus show driver-updates@$tty.service) - info "DD status=$status" +wait_for_dd() { + echo "[ -e /tmp/dd.done ]" > $hookdir/initqueue/finished/dd.sh } diff --git a/dracut/driver-updates b/dracut/driver-updates deleted file mode 100755 index 373884f..0000000 --- a/dracut/driver-updates +++ /dev/null @@ -1,830 +0,0 @@ -#!/usr/bin/python -# -# Copyright (C) 2013 by Red Hat, Inc. All rights reserved. -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see http://www.gnu.org/licenses/. -# -# Author(s): Brian C. Lane bcl@brianlane.com -# -""" -Driver Update Disk UI - -/tmp/dd_modules is a copy of /proc/modules at startup time -/tmp/dd_args is a parsed list of the inst.dd= cmdline args, and may include - 'dd' or 'inst.dd' if it was specified without arguments -/tmp/dd_args_ks is the same format, but skips processing existing OEMDRV devices. - -Pass a path and it will install the driver rpms from the path before checking -for new OEMDRV devices. - -Repositories for installed drivers are copied into /run/install/DD-X where X -starts at 1 and increments for each repository. 
- -Selected driver package names are saved in /run/install/dd_packages - -Anaconda uses the repository and package list to install the same set of drivers -to the target system. -""" -import logging -from logging.handlers import SysLogHandler -import sys -import os -import subprocess -import time -import glob - -log = logging.getLogger("DD") - - -class RunCmdError(Exception): - """ Raised when run_cmd gets a non-zero returncode - """ - pass - - -def run_cmd(cmd): - """ Run a command, collect stdout and the returncode. stderr is ignored. - - :param cmd: command and arguments to run - :type cmd: list of strings - :returns: exit code and stdout from the command - :rtype: (int, string) - :raises: OSError if the cmd doesn't exist, RunCmdError if the rc != 0 - """ - try: - with open("/dev/null", "w") as fd_null: - log.debug(" ".join(cmd)) - proc = subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=fd_null) - out = proc.communicate()[0] - if out: - for line in out.splitlines(): - log.debug(line) - except OSError as e: - log.error("Error running %s: %s", cmd[0], e.strerror) - raise - if proc.returncode: - log.debug("%s returned %s", cmd[0], proc.returncode) - raise RunCmdError() - return (proc.returncode, out) - - -def oemdrv_list(): - """ Get a list of devices labeled as OEMDRV - - :returns: list of devices - :rtype: list - """ - try: - outlines = run_cmd(["blkid", "-t", "LABEL=OEMDRV", "-o", "device"])[1] - except (OSError, RunCmdError): - # Nothing with that label - return [] - else: - return outlines.splitlines() - - -def get_dd_args(): - """ Get the dd arguments from /tmp/dd_args or /tmp/dd_args_ks - - :returns: List of arguments - :rtype: list of strings - """ - net_protocols = ["http", "https", "ftp", "nfs", "nfs4"] - args = [] - for dd_args_file in ["/tmp/dd_args", "/tmp/dd_args_ks"]: - if not os.path.exists(dd_args_file): - continue - try: - dd_args = open(dd_args_file, "r").readline().split() - except IOError: - return [] - - # skip dd args that need 
networking - args.extend(filter(lambda x: x.split(":")[0].lower() not in net_protocols, dd_args)) - return args - - -def is_interactive(): - """ Determine if the user requested interactive driver selection - - :returns: True if 'dd' or 'inst.dd' included in /tmp/dd_args False if not - :rtype: bool - """ - dd_args = get_dd_args() - if "dd" in dd_args or "inst.dd" in dd_args: - return True - else: - return False - - -def umount(device): - """ Unmount the device - - :param device: Device or mountpoint to unmount - :type device: string - :returns: None - """ - if not device: - return - - try: - run_cmd(["umount", device]) - except (OSError, RunCmdError): - pass - - -def mount_device(device, mnt="/media/DD/"): - """ Mount a device and check to see if it really is a driver disk - - :param device: path to device to mount - :type device: string - :param mnt: path to mount the device on - :type mnt: string - :returns: True if it is a DD, False if not - :rtype: bool - - It is unmounted if it is not a DD and left mounted if it is. 
- """ - try: - run_cmd(["mount", device, mnt]) - except (OSError, RunCmdError): - return False - return True - - -def copy_repo(dd_path, dest_prefix): - """ Copy the current arch's repository to a unique destination - - :param dd_path: Path to the driver repo directory - :type dd_path: string - :param dest_prefix: Destination directory prefix, a number is added - :type dest_prefix: string - :returns: None - - The destination directory names are in the order that the drivers - were loaded, starting from 1 - """ - suffix = 1 - while os.path.exists(dest_prefix+str(suffix)): - suffix += 1 - dest = dest_prefix+str(suffix) - os.makedirs(dest) - try: - run_cmd(["cp", "-ar", dd_path, dest]) - except (OSError, RunCmdError): - pass - - -def copy_file(src, dest): - """ Copy a file - - :param src: Source file - :type src: string - :param dest: Destination file - :type dest: string - :returns: None - """ - try: - run_cmd(["cp", "-a", src, dest]) - except (OSError, RunCmdError): - pass - - -def move_file(src, dest): - """ Move a file - - :param src: Source file - :type src: string - :param dest: Destination file - :type dest: string - :returns: None - """ - try: - run_cmd(["mv", "-f", src, dest]) - except (OSError, RunCmdError): - pass - - -def find_dd(mnt="/media/DD"): - """ Find all suitable DD repositories under a path - - :param mnt: Top of the directory tree to search - :type mnt: string - :returns: list of DD repositories - :rtype: list - """ - dd_repos = [] - arch = os.uname()[4] - for root, dirs, files in os.walk(mnt, followlinks=True): - if "rhdd3" in files and "rpms" in dirs and \ - os.path.exists(root+"/rpms/"+arch): - dd_repos.append(root+"/rpms/"+arch) - log.debug("Found repos - %s", " ".join(dd_repos)) - return dd_repos - - -def get_module_set(fname): - """ Read a module list and return a set of the names - - :param fname: Full path to filename - :type fname: string - :returns: set of the module names - """ - modules = set() - if os.path.exists(fname): - with 
open(fname, "r") as f: - for line in f: - mod_args = line.strip().split() - if mod_args: - modules.update([mod_args[0]]) - return modules - -def to_modname(modfile): - return os.path.basename(modfile)[:-3].replace('-','_') - -def reload_modules(newfiles): - """ Reload new module versions from /lib/modules/<kernel>/updates/ - """ - try: - run_cmd(["depmod", "-a"]) - except (OSError, RunCmdError): - pass - - # Make a list of modules added since startup - startup_modules = get_module_set("/tmp/dd_modules") - current_modules = get_module_set("/proc/modules") - new_modules = current_modules.difference(startup_modules) - log.debug("new_modules = %s", " ".join(new_modules)) - - # And a list of modules contained in the disk we just extracted - dd_modules = set(to_modname(f) for f in newfiles if f.endswith(".ko")) - # TODO: what modules do we unload when there's new firmware? - log.debug("dd_modules = %s", " ".join(dd_modules)) - new_modules.update(dd_modules) - - # I think we can just iterate once using modprobe -r to remove unused deps - for module in new_modules: - try: - run_cmd(["modprobe", "-r", module]) - except (OSError, RunCmdError): - pass - - time.sleep(2) - - # Reload the modules, using the new versions from /lib/modules/<kernel>/updates/ - try: - run_cmd(["udevadm", "trigger"]) - except (OSError, RunCmdError): - pass - - -class Driver(object): - def __init__(self): - self.source = "" - self.name = "" - self.flags = "" - self.description = [] - self.selected = False - - @property - def args(self): - return ["--%s" % a for a in self.flags.split()] - - @property - def rpm(self): - return self.source - - -def fake_drivers(num): - """ Generate a number of fake drivers for testing - """ - drivers = [] - for i in xrange(0, num): - d = Driver() - d.source = "driver-%d" % i - d.flags = "modules" - drivers.append(d) - return drivers - - -def dd_list(dd_path, kernel_ver=None, anaconda_ver=None): - """ Build a list of the drivers in the directory - - :param dd_path: Path 
to the driver repo - :type dd_path: string - :returns: list of drivers - :rtype: Driver object - - By default none of the drivers are selected - """ - if not kernel_ver: - kernel_ver = os.uname()[2] - if not anaconda_ver: - anaconda_ver = "19.0" - - try: - outlines = run_cmd(["dd_list", "-k", kernel_ver, "-a", anaconda_ver, "-d", dd_path])[1] - except (OSError, RunCmdError): - return [] - - # Output format is: - # source rpm\n - # name\n - # flags\n - # description (multi-line)\n - # ---\n - drivers = [] - new_driver = Driver() - line_idx = 0 - for line in outlines.splitlines(): - log.debug(line) - if line == "---": - drivers.append(new_driver) - new_driver = Driver() - line_idx = 0 - elif line_idx == 0: - new_driver.source = line - line_idx += 1 - elif line_idx == 1: - new_driver.name = line - line_idx += 1 - elif line_idx == 2: - new_driver.flags = line - line_idx += 1 - elif line_idx == 3: - new_driver.description.append(line) - - return drivers - - -def dd_extract(driver, dest_path="/updates/", kernel_ver=None): - """ Extract a driver rpm to a destination path - - :param driver: Driver to extract - :type driver: Driver object - :param dest_path: Top directory of the destination path - :type dest_path: string - :returns: list of paths to extracted firmware and modules - - This extracts the driver's files into 'dest_path' (which defaults - to /updates/ so that the normal live updates handling will overlay - any binary or library updates onto the initrd automatically. 
- """ - if not kernel_ver: - kernel_ver = os.uname()[2] - - cmd = ["dd_extract", "-k", kernel_ver] - cmd += driver.args - cmd += ["--rpm", driver.rpm, "--directory", dest_path] - log.info("Extracting files from %s", driver.rpm) - - # make sure the to be used directory exists - if not os.path.isdir(dest_path): - os.makedirs(dest_path) - - try: - run_cmd(cmd) - except (OSError, RunCmdError): - log.error("dd_extract failed, skipped %s", driver.rpm) - return - - # Create the destination directories - initrd_updates = "/lib/modules/" + os.uname()[2] + "/updates/" - ko_updates = dest_path + initrd_updates - initrd_firmware = "/lib/firmware/updates/" - firmware_updates = dest_path + initrd_firmware - for d in (initrd_updates, ko_updates, initrd_firmware, firmware_updates): - if not os.path.exists(d): - os.makedirs(d) - - filelist = [] - - # Copy *.ko files over to /updates/lib/modules/<kernel>/updates/ - for root, _dirs, files in os.walk(dest_path+"/lib/modules/"): - if root.endswith("/updates") and os.path.isdir(root): - continue - for f in (f for f in files if f.endswith(".ko")): - src = root+"/"+f - filelist.append(src) - copy_file(src, ko_updates) - move_file(src, initrd_updates) - - # Copy the firmware updates - for root, _dirs, files in os.walk(dest_path+"/lib/firmware/"): - if root.endswith("/updates") and os.path.isdir(root): - continue - for f in (f for f in files): - src = root+"/"+f - filelist.append(src) - copy_file(src, firmware_updates) - move_file(src, initrd_firmware) - - # Tell our caller about the newly-extracted stuff - return filelist - - -# an arbitrary value to signal refreshing the menu contents -DoRefresh = True - -def selection_menu(items, title, info_func, multi_choice=True, refresh=False): - """ Display menu and let user select one or more choices. 
- - :param items: list of items - :type items: list of objects (with the 'selected' property/attribute if - multi_choice=True is used) - :param title: title for the menu - :type title: str - :param info_func: function providing info about items - :type info_func: item -> str - :param multi_choice: whether it is a multiple choice menu or not - :type multi_choice: bool - :returns: the selected item in case of multi_choice=False and user did - selection, None otherwise - """ - - page_length = 20 - page = 1 - num_pages = len(items) / page_length - if len(items) % page_length > 0: - num_pages += 1 - - if multi_choice: - choice_format = "[%s]" - else: - choice_format = "" - format_str = "%3d) " + choice_format + " %s" - - while True: - # show a page of items - print("\nPage %d of %d" % (page, num_pages)) - print(title) - if page * page_length <= len(items): - num_items = page_length - else: - num_items = len(items) % page_length - for i in xrange(0, num_items): - item_idx = ((page-1) * page_length) + i - if multi_choice: - if items[item_idx].selected: - selected = "x" - else: - selected = " " - args = (i+1, selected, info_func(items[item_idx])) - else: - args = (i+1, info_func(items[item_idx])) - print(format_str % args) - - # Select an item to toggle, continue or change pages - opts = ["# to select", - "'n'-next page", - "'p'-previous page", - "'c'-continue"] - if multi_choice: - opts[0] = "# to toggle selection" - if refresh: - opts.insert(1,"'r'-refresh") - idx = raw_input(''.join(['\n', - ", ".join(opts[:-1]), - " or ", opts[-1], ": "])) - if idx.isdigit() and not (int(idx) < 1 or int(idx) > num_items): - item_idx = ((page-1) * page_length) + int(idx) - 1 - if multi_choice: - items[item_idx].selected = not items[item_idx].selected - else: - # single choice only, we can return now - return items[item_idx] - elif idx.lower() == 'n': - if page < num_pages: - page += 1 - else: - print("Last page") - elif idx.lower() == 'p': - if page > 1: - page -= 1 - else: - 
print("First page") - elif idx.lower() == 'r' and refresh: - return DoRefresh - elif idx.lower() == 'c': - return - else: - print("Invalid selection") - -def select_drivers(drivers): - """ Display pages of drivers to be loaded. - - :param drivers: Drivers to be selected by the user - :type drivers: list of Driver objects - :returns: None - """ - if not drivers: - return - - selection_menu(drivers, "Select drivers to install", - lambda driver: driver.source) - -def process_dd(dd_path): - """ Handle installing modules, firmware, enhancements from the dd repo - - :param dd_path: Path to the driver repository - :type dd_path: string - :returns: None - """ - drivers = dd_list(dd_path) - log.debug("drivers = %s", " ".join([d.rpm for d in drivers])) - - # If interactive mode or rhdd3.rules pass flag to deselect by default? - if os.path.exists(dd_path+"/rhdd3.rules") or is_interactive(): - select_drivers(drivers) - if not any((d.selected for d in drivers)): - return - else: - map(lambda d: setattr(d, "selected", True), drivers) - - # Copy the repository for Anaconda to use during install - copy_repo(dd_path, "/updates/run/install/DD-") - - extracted = [] - - for driver in filter(lambda d: d.selected, drivers): - extracted += dd_extract(driver, "/updates/") - - # Write the package names for all modules and firmware for Anaconda - if "modules" in driver.flags or "firmwares" in driver.flags: - with open("/run/install/dd_packages", "a") as f: - f.write("%s\n" % driver.name) - - reload_modules(extracted) - - -def select_dd(device): - """ Mount a device and check it for Driver Update repos - - :param device: Path to the device to mount and check - :type device: string - :returns: None - """ - mnt = "/media/DD/" - if not os.path.isdir(mnt): - os.makedirs(mnt) - if not mount_device(device, mnt): - return - - dd_repos = find_dd(mnt) - for repo in dd_repos: - log.info("Processing DD repo %s on %s", repo, device) - process_dd(repo) - - # TODO - does this need to be done before module 
reload? - umount(device) - - -def network_driver(dd_path): - """ Handle network driver download, then scan for new OEMDRV devices. - - :param dd_path: Path to the downloaded driver rpms - :type dd_path: string - :returns: None - """ - skip_dds = set(oemdrv_list()) - - log.info("Processing Network Drivers from %s", dd_path) - isos = glob.glob(os.path.join(dd_path, "*.iso")) - for iso in isos: - select_dd(iso) - - process_dd(dd_path) - - # TODO: May need to add new drivers to /tmp/dd_modules to prevent them from being unloaded - - # Scan for new OEMDRV devices and ignore dd_args - dd_scan(skip_dds, scan_dd_args=False, skip_device_menu=True) - -class DeviceInfo(object): - def __init__(self, **kwargs): - self.device = kwargs.get("device", None) - self.label = kwargs.get("label", None) - self.uuid = kwargs.get("uuid", None) - self.fs_type = kwargs.get("fs_type", None) - - def __str__(self): - return "%-10s %-20s %-15s %s" % (self.device or "", self.fs_type or "", - self.label or "", self.uuid or "") - -def parse_blkid(line): - """ Parse a line of output from blkid - - :param line: line of output from blkid - :param type: string - :returns: {} or dict of NAME=VALUE pairs including "device" - :rtype: dict - - blkid output cannot be trusted. labels may be missing or in a different - order so we parse what we get and return a dict with their values. - """ - import shlex - - device = {"device":None, "label":None, "uuid":None, "fs_type":None} - fields = shlex.split(line) - if len(fields) < 2 or not fields[0].startswith("/dev/"): - return {} - - # device is in [0] and the remainder are NAME=VALUE with possible spaces - # Use the sda1 part of device "/dev/sda1:" - device['device'] = fields[0][5:-1] - for f in fields[1:]: - if "=" in f: - (key, val) = f.split("=", 1) - if key == "TYPE": - key = "fs_type" - device[key.lower()] = val - return device - -def select_iso(): - """ Let user select device and DD ISO on it. 
- - :returns: path to the selected ISO file and mountpoint to be unmounted - or (None, None) if no ISO file is selected - :rtype: (str, str) - """ - header = " %-10s %-20s %-15s %s" % ("DEVICE", "TYPE", "LABEL", "UUID") - - iso_dev = DoRefresh - while iso_dev is DoRefresh: - try: - _ret, out = run_cmd(["blkid"]) - except (OSError, RunCmdError): - return (None, None) - - devices = [] - for line in out.splitlines(): - dev = parse_blkid(line) - if dev: - devices.append(DeviceInfo(**dev)) - - iso_dev = selection_menu(devices, - "Driver disk device selection\n" + header, - str, multi_choice=False, refresh=True) - - if not iso_dev: - return (None, None) - - mnt = "/media/DD-search" - if not os.path.isdir(mnt): - os.makedirs(mnt) - if not mount_device("/dev/" + iso_dev.device, mnt): - print("===Cannot mount the chosen device!===\n") - return select_iso() - - # is this device a Driver Update Disc? - if find_dd(mnt): - umount(mnt) # BLUH. unmount it first so select_dd can mount it OK - return ("/dev/" + iso_dev.device, None) - - # maybe it's a device containing multiple DUDs - let the user pick one - isos = list() - for dir_path, _dirs, files in os.walk(mnt): - # trim the mount point path - rel_dir = dir_path[len(mnt):] - - # and the starting "/" (if any) - if rel_dir.startswith("/"): - rel_dir = rel_dir[1:] - - isos += (os.path.join(rel_dir, iso_file) - for iso_file in files if iso_file.endswith(".iso")) - - if not isos: - print("===No ISO files found on %s!===\n" % iso_dev.device) - umount(mnt) - return select_iso() - else: - # mount writes out some mounting information, add blank line - print - - # let user choose the ISO file - dd_iso = selection_menu(isos, "Choose driver disk ISO file", - lambda iso_file: iso_file, - multi_choice=False) - - if not dd_iso: - return (None, None) - - return (os.path.join(mnt, dd_iso), "/media/DD-search") - -def dd_scan(skip_dds=None, scan_dd_args=True, skip_device_menu=False): - """ Scan the system for OEMDRV devices and and specified by 
dd=/dev/<device> - - :param skip_dds: devices to skip when checking for OEMDRV label - :type skip_dds: set() - :param scan_dd_args: Scan devices passed in /tmp/dd_args or dd_args_ks - :type scan_dd_args: bool - :returns: None - """ - dd_todo = set(oemdrv_list()) - - if skip_dds is None: - skip_dds = set() - - if skip_dds: - dd_todo.difference_update(skip_dds) - if dd_todo: - log.info("Found new OEMDRV device(s) - %s", ", ".join(dd_todo)) - - if scan_dd_args: - # Add the user specified devices - dd_devs = get_dd_args() - dd_devs = [dev for dev in dd_devs if dev not in ("dd", "inst.dd")] - dd_todo.update(dd_devs) - log.info("Checking devices %s", ", ".join(dd_todo)) - - # Process each Driver Disk, checking for new disks after each one - dd_finished = dd_load(dd_todo, skip_dds=skip_dds) - skip_dds.update(dd_finished) - - # Skip interactive selection of an iso if OEMDRV was found - if skip_dds or skip_device_menu or not is_interactive(): - return - - # Handle interactive driver selection - mount_point = None - while True: - iso, mount_point = select_iso() - if iso: - if iso in skip_dds: - skip_dds.remove(iso) - dd_load(set([iso]), skip_dds=skip_dds) - # NOTE: we intentionally do not add the newly-loaded device to - # skip_dds - the user might (e.g.) swap DVDs and use /dev/sr0 twice - umount(mount_point) - else: - break - -def dd_load(dd_todo, skip_dds=None): - """ Process each Driver Disk, checking for new disks after each one. - Return the set of devices that loaded stuff from. 
- - :param dd_todo: devices to load drivers from - :type dd_todo: set - :param skip_dds: devices to skip when checking for OEMDRV label - :type skip_dds: set - :returns: set of devices that have been loaded - """ - if skip_dds is None: - skip_dds = set() - - dd_finished = set() - while dd_todo: - device = dd_todo.pop() - if device in skip_dds: - continue - log.info("Checking device %s", device) - select_dd(device) - dd_finished.add(device) - new_oemdrv = set(oemdrv_list()).difference(dd_finished, dd_todo) - if new_oemdrv: - log.info("Found new OEMDRV device(s) - %s", ", ".join(new_oemdrv)) - dd_todo.update(new_oemdrv) - return dd_finished - -if __name__ == '__main__': - log.setLevel(logging.DEBUG) - handler = SysLogHandler(address="/dev/log") - log.addHandler(handler) - handler = logging.StreamHandler() - handler.setLevel(logging.INFO) - formatter = logging.Formatter("DD: %(message)s") - handler.setFormatter(formatter) - log.addHandler(handler) - - if len(sys.argv) > 1: - # Network driver source - network_driver(sys.argv[1]) - elif os.path.exists("/tmp/DD-net/"): - network_driver("/tmp/DD-net/") - elif os.path.exists("/tmp/dd_args_ks"): - # Kickstart driverdisk command, skip existing OEMDRV devices and - # process cmdline dd entries. This will process any OEMDRV that - # appear after loading the other drivers. - skip_devices = set(oemdrv_list()) - dd_scan(skip_devices, skip_device_menu=True) - else: - # Process /tmp/dd_args and OEMDRV devices - # Show device selection menu when inst.dd passed and no OEMDRV devices - dd_scan() - - sys.exit(0) - diff --git a/dracut/driver-updates-genrules.sh b/dracut/driver-updates-genrules.sh new file mode 100644 index 0000000..b20dbe2 --- /dev/null +++ b/dracut/driver-updates-genrules.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +command -v wait_for_dd >/dev/null || . 
/lib/anaconda-lib.sh + +# Don't leave initqueue until we've finished with the requested dd stuff +[ -f /tmp/dd_todo ] && wait_for_dd + +if [ -f /tmp/dd_interactive ]; then + initqueue --onetime --settled --name zz_dd_interactive \ + systemctl start driver-updates@$(find_tty).service +fi + +# Run driver-updates for LABEL=OEMDRV and any other requested disk +for dd in LABEL=OEMDRV $(cat /tmp/dd_disk); do + when_diskdev_appears "$(disk_to_dev_path $dd)" \ + driver-updates --disk $dd $devnode +done + +# force us to wait at least until we've settled at least once +echo '> /tmp/settle.done' > $hookdir/initqueue/settled/settle_done.sh +echo '[ -f /tmp/settle.done ]' > $hookdir/initqueue/finished/wait_for_settle.sh + +# NOTE: dd_net is handled by fetch-driver-net.sh diff --git a/dracut/driver-updates-net.sh b/dracut/driver-updates-net.sh deleted file mode 100755 index f8bed52..0000000 --- a/dracut/driver-updates-net.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/sh -[ -e /tmp/DD-net ] || return 0 - -command -v getarg >/dev/null || . /lib/dracut-lib.sh -. /lib/anaconda-lib.sh - -if [ -n "$(ls /tmp/DD-net)" ]; then - start_driver_update "Network Driver Update Disk" - rm -rf /tmp/DD-net -fi diff --git a/dracut/driver-updates.sh b/dracut/driver-updates.sh deleted file mode 100755 index fca0b83..0000000 --- a/dracut/driver-updates.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash -# Determine if a Driver Update Disk is present, or inst.dd passed on the cmdline -# and launch the driver update systemd service - -# load all modules -udevadm trigger -udevadm settle - -# Look for devices with the OEMDRV label -blkid -t LABEL=OEMDRV > /dev/null -blkid_rc=$? - -command -v getarg >/dev/null || . /lib/dracut-lib.sh -dd_args="$(getargs dd= inst.dd=)" -if [ -n "$dd_args" -o $blkid_rc -eq 0 ]; then - command -v getarg >/dev/null || . /lib/dracut-lib.sh - . 
/lib/anaconda-lib.sh - - # kludge to let kernel spit out some extra info w/o stomping on our UI - sleep 5 - echo "$dd_args" > /tmp/dd_args - start_driver_update "Driver Update Disk" -fi diff --git a/dracut/driver-updates@.service b/dracut/driver-updates@.service index 210bcd5..e8c876e 100644 --- a/dracut/driver-updates@.service +++ b/dracut/driver-updates@.service @@ -1,15 +1,19 @@ [Unit] Description=Driver Update Disk UI on %I DefaultDependencies=no +After=systemd-vconsole-setup.service +Wants=systemd-vconsole-setup.service Before=shutdown.target
[Service] Type=oneshot RemainAfterExit=no -WorkingDirectory=/tmp -Environment=LANG=en_US.UTF-8 -ExecStartPre=-/bin/plymouth quit -ExecStart=/bin/driver-updates +WorkingDirectory=/ +ExecStartPre=-/bin/plymouth --hide-splash +ExecStartPre=-/bin/cp -n /proc/sys/kernel/printk /tmp/printk +ExecStartPre=-/bin/dmesg -n 1 +ExecStart=/bin/driver-updates --interactive +ExecStopPost=-/bin/cp /tmp/printk /proc/sys/kernel/printk StandardInput=tty-force StandardOutput=inherit StandardError=inherit diff --git a/dracut/fetch-driver-net.sh b/dracut/fetch-driver-net.sh index 5adc39b..1632d00 100755 --- a/dracut/fetch-driver-net.sh +++ b/dracut/fetch-driver-net.sh @@ -5,26 +5,20 @@ # initqueue/online hook passes interface name as $1 netif="$1"
-# We already processed the dd_args - exit -[ -e /tmp/dd_net.done ] && return 0 - -command -v getarg >/dev/null || . /lib/dracut-lib.sh -dd_args="$(getargs dd= inst.dd=)" -[ -n "$dd_args" ] || return 0 +# No dd_net was requested - exit +[ -f /tmp/dd_net ] || return 0
. /lib/url-lib.sh -dd_repo=/tmp/DD-net/ -for dd in $dd_args; do - case "${dd%%:*}" in - http|https|ftp|nfs|nfs4) - [ -e "$dd_repo" ] || mkdir -p $dd_repo - info "Fetching driver from $dd" - if driver=$(fetch_url "$dd"); then - mv "$driver" $dd_repo - else - warn "Failed to fetch driver from $dd" - fi - ;; - esac -done -echo > /tmp/dd_net.done + +while read dd; do + # If we already fetched this URL, skip it + grep -Fqx "$dd" /tmp/dd_net.done && continue + # Otherwise try to fetch it + info "Fetching driverdisk from $dd" + if driver=$(fetch_url "$dd"); then + echo "$dd" >> /tmp/dd_net.done # mark it done so we don't fetch it again + driver-updates --net "$dd" "$driver" + else + warn "Failed to fetch driver from $dd" + fi +done < /tmp/dd_net diff --git a/dracut/module-setup.sh b/dracut/module-setup.sh index 287a61e..7e2df9f 100755 --- a/dracut/module-setup.sh +++ b/dracut/module-setup.sh @@ -46,16 +46,15 @@ install() { inst "$moddir/parse-kickstart" "/sbin/parse-kickstart" # Driver Update Disks inst_hook cmdline 29 "$moddir/parse-anaconda-dd.sh" + inst_hook pre-trigger 55 "$moddir/driver-updates-genrules.sh" inst_hook initqueue/online 20 "$moddir/fetch-driver-net.sh" - inst_hook pre-trigger 40 "$moddir/driver-updates.sh" - inst_hook pre-pivot 10 "$moddir/driver-updates-net.sh" inst_hook pre-pivot 50 "$moddir/anaconda-depmod.sh" - inst "$moddir/driver-updates" "/bin/driver-updates" + inst "$moddir/driver_updates.py" "/bin/driver-updates" inst_simple "$moddir/driver-updates@.service" "/etc/systemd/system/driver-updates@.service" # rpm configuration file (needed by dd_extract) inst "/usr/lib/rpm/rpmrc" # python deps for parse-kickstart. 
DOUBLE WOOOO - $moddir/python-deps $moddir/parse-kickstart $moddir/driver-updates | while read dep; do + $moddir/python-deps $moddir/parse-kickstart $moddir/driver_updates.py | while read dep; do case "$dep" in *.so) inst_library $dep ;; *.py) inst_simple $dep ;; diff --git a/dracut/parse-anaconda-dd.sh b/dracut/parse-anaconda-dd.sh index ebabd07..ee65501 100755 --- a/dracut/parse-anaconda-dd.sh +++ b/dracut/parse-anaconda-dd.sh @@ -1,18 +1,28 @@ #!/bin/bash # parse-anaconda-dd.sh: handle driver update disk settings
-# no need to do this twice -[ -f /tmp/dd_net.done ] && return +# Creates the following files: +# /tmp/dd_net: list of URLs to fetch +# /tmp/dd_disk: list of disk devices to load from +# /tmp/dd_interactive: "menu" if interactive mode requested +# /tmp/dd_todo: concatenation of the above files
-command -v getarg >/dev/null || . /lib/dracut-lib.sh +# clear everything to ensure idempotency +rm -f /tmp/dd_interactive /tmp/dd_net /tmp/dd_disk /tmp/dd_todo
-# inst.dd: May provide a "URI" for the driver rpm (possibly more than one) -dd_args="$(getargs dd= inst.dd=)" -for dd in $dd_args; do - case "${dd%%:*}" in - http|https|ftp|nfs|nfs4) - set_neednet - break - ;; +# parse any dd/inst.dd args found +for dd in $(getargs dd= inst.dd=); do + case "$dd" in + # plain 'dd'/'inst.dd': Engage interactive mode! + dd|inst.dd) echo menu > /tmp/dd_interactive ;; + # network URLs: add to dd_net + http:*|https:*|ftp:*|nfs:*|nfs4:*) echo $dd >> /tmp/dd_net ;; + # disks: strip "cdrom:" or "hd:" and add to dd_disk + cdrom:*|hd:*) echo ${dd#*:} >> /tmp/dd_disk ;; + # anything else is assumed to be a disk + *) echo $dd >> /tmp/dd_disk esac done + +# for convenience's sake, mash 'em all into one list (only files that exist, so dd_todo is only created when something was actually requested) +for _f in /tmp/dd_net /tmp/dd_disk /tmp/dd_interactive; do [ -f "$_f" ] && cat "$_f" >> /tmp/dd_todo; done diff --git a/tests/nosetests.sh b/tests/nosetests.sh index e0b1c1e..6db3c3d 100755 --- a/tests/nosetests.sh +++ b/tests/nosetests.sh @@ -8,7 +8,7 @@ fi
# If no tests were selected, select all of them if [ $# -eq 0 ]; then - set -- "${top_srcdir}"/tests/*_tests + set -- "${top_srcdir}"/tests/*_tests "${top_srcdir}"/dracut/test_*.py fi
exec nosetests -v --exclude=logpicker -a !acceptance,!slow "$@"
From: Will Woods wwoods@redhat.com
If the user specified some NTP servers, write them to the config files, regardless of whether the service(s) are actually enabled.
(Maybe you want to configure it now but turn it on later? I dunno, man.)
Resolves: RHBZ#1197575 --- pyanaconda/kickstart.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pyanaconda/kickstart.py b/pyanaconda/kickstart.py index a8f14be..0b1ed25 100644 --- a/pyanaconda/kickstart.py +++ b/pyanaconda/kickstart.py @@ -1661,7 +1661,7 @@ def execute(self, *args): timezone.write_timezone_config(self, iutil.getSysroot())
# write out NTP configuration (if set) - if not self.nontp and self.ntpservers: + if self.ntpservers: chronyd_out_path = os.path.normpath(iutil.getSysroot() + ntp.CHRONY_CONFIG_FILE) ntpd_out_path = os.path.normpath(iutil.getSysroot() + ntp.NTP_CONFIG_FILE) try:
From: Will Woods wwoods@redhat.com
- use constant for ANACONDAVER
- rename constants to uppercase (ARCH, KERNELVER, MODULE_UPDATES_DIR, FIRMWARE_UPDATES_DIR)
- extract_drivers(): explicitly refuse to be called with drivers=.. and repos=.. so we don't construct a temporary list
- add list_drivers(), use that wherever we were doing [d for r in repos for d in dd_list(r)]
- read_lines(): refactor and add comment to be clearer about what it's doing, add unit test
- DeviceInfo.shortdev: use os.path.realpath instead of manually resolving symlinks, add comment about why we don't use basename(), add unit test
- Add extra explanatory comments
- TextMenu.format_header(): use 4*' ' instead of '    '
- Move test_driver_updates.py to tests/dracut_tests/
- Fix ExtractDriversTestCase trying to create /updates --- dracut/Makefile.am | 3 +- dracut/driver_updates.py | 86 ++-- dracut/test_driver_updates.py | 624 ----------------------------- tests/dracut_tests/test_driver_updates.py | 631 ++++++++++++++++++++++++++++++ tests/nosetests.sh | 2 +- 5 files changed, 689 insertions(+), 657 deletions(-) delete mode 100644 dracut/test_driver_updates.py create mode 100644 tests/dracut_tests/test_driver_updates.py
diff --git a/dracut/Makefile.am b/dracut/Makefile.am index 44a5b46..71d6bed 100644 --- a/dracut/Makefile.am +++ b/dracut/Makefile.am @@ -46,7 +46,6 @@ dist_dracut_SCRIPTS = module-setup.sh \ driver-updates@.service \ driver-updates-genrules.sh \ anaconda-depmod.sh \ - driver_updates.py \ - test_driver_updates.py + driver_updates.py
MAINTAINERCLEANFILES = Makefile.in diff --git a/dracut/driver_updates.py b/dracut/driver_updates.py index 4c331d5..0bf9648 100755 --- a/dracut/driver_updates.py +++ b/dracut/driver_updates.py @@ -49,13 +49,18 @@ /tmp/dd.done should be created when all the user-requested stuff above has been handled; the installer won't start up until this file is created.
-Repositories for installed drivers are copied into /run/install/DD-X where X -starts at 1 and increments for each repository. +Packages will be extracted to /updates, which gets overlaid on top +of the installer's filesystem when we leave the initramfs.
-Selected driver package names are saved in /run/install/dd_packages. +Modules and firmware get moved to /lib/modules/`uname -r`/updates and +/lib/firmware/updates (under /updates, as above). They also get copied into the +corresponding paths in the initramfs, so we can load them immediately.
-Anaconda uses the repository and package list to install the same set of drivers -to the target system. +The repositories get copied into /run/install/DD-1, /run/install/DD-2, etc. +Driver package names are saved in /run/install/dd_packages. + +During system installation, anaconda will install the packages listed in +/run/install/dd_packages to the target system. """
import logging @@ -80,8 +85,19 @@
log = logging.getLogger("DD")
-arch = os.uname()[4] -kernelver = os.uname()[2] +# NOTE: Yes, the version is wrong, but previous versions of this utility also +# hardcoded this value, because changing it will break any driver disk that has +# binary/library packages with "installer-enhancement = 19.0".. +# If we *need* to break compatibility, this should definitely get changed, but +# otherwise we probably shouldn't change this unless/until we're sure that +# everyone is using something like "installer-enhancement >= 19.0" instead.. +ANACONDAVER = "19.0" + +ARCH = os.uname()[4] +KERNELVER = os.uname()[2] + +MODULE_UPDATES_DIR = "/lib/modules/%s/updates" % KERNELVER # keyed by kernel release (uname -r), not machine arch +FIRMWARE_UPDATES_DIR = "/lib/firmware/updates"
def mkdir_seq(stem): """ @@ -108,7 +124,7 @@ def find_repos(mnt): """find any valid driverdisk repos that exist under mnt.""" dd_repos = [] for root, dirs, files in os.walk(mnt, followlinks=True): - repo = root+"/rpms/"+arch + repo = root+"/rpms/"+ARCH if "rhdd3" in files and "rpms" in dirs and os.path.isdir(repo): log.debug("found repo: %s", repo) dd_repos.append(repo) @@ -133,9 +149,9 @@ def __init__(self, source="", name="", flags="", description="", repo=""): def dd_list(dd_path, anaconda_ver=None, kernel_ver=None): log.debug("dd_list: listing %s", dd_path) if not anaconda_ver: - anaconda_ver = '19.0' + anaconda_ver = ANACONDAVER if not kernel_ver: - kernel_ver = kernelver + kernel_ver = KERNELVER cmd = ["dd_list", '-d', dd_path, '-k', kernel_ver, '-a', anaconda_ver] out = subprocess.check_output(cmd, stderr=DEVNULL) out = out.decode('utf-8') @@ -147,10 +163,13 @@ def dd_list(dd_path, anaconda_ver=None, kernel_ver=None): def dd_extract(rpm_path, outdir, kernel_ver=None, flags='-blmf'): log.debug("dd_extract: extracting %s", rpm_path) if not kernel_ver: - kernel_ver = kernelver + kernel_ver = KERNELVER cmd = ["dd_extract", flags, '-r', rpm_path, '-d', outdir, '-k', kernel_ver] subprocess.check_output(cmd, stderr=DEVNULL) # discard stdout
+def list_drivers(repos, anaconda_ver=None, kernel_ver=None): + return [d for r in repos for d in dd_list(r, anaconda_ver, kernel_ver)] + def mount(dev, mnt=None): """Mount the given dev at the mountpoint given by mnt.""" # NOTE: dev may be a filesystem image - "-o loop" is not necessary anymore @@ -173,9 +192,6 @@ def mounted(dev, mnt=None): finally: umount(mnt)
-module_updates_dir = '/lib/modules/%s/updates' % os.uname()[2] -firmware_updates_dir = '/lib/firmware/updates' - def iter_files(topdir, pattern=None): """iterator; yields full paths to files under topdir that match pattern.""" for head, _, files in os.walk(topdir): @@ -210,9 +226,12 @@ def append_line(filename, line): with open(filename, 'a') as outf: outf.write(line)
+# NOTE: items returned by read_lines should match items passed to append_line, +# which is why we remove the newlines def read_lines(filename): + """return a list containing each line in filename, with newlines removed.""" try: - return open(filename).read().splitlines() + return [line.rstrip('\n') for line in open(filename)] except IOError: return []
@@ -230,7 +249,7 @@ def extract_drivers(drivers=None, repos=None, outdir="/updates",
drivers should be a list of Drivers to extract, or None. repos should be a list of repo paths to extract, or None. - (If both are empty, nothing happens..) + Raises ValueError if you pass both.
If any packages containing modules or firmware are extracted, also: * call save_repo for that package's repo @@ -240,8 +259,10 @@ def extract_drivers(drivers=None, repos=None, outdir="/updates", """ if not drivers: drivers = [] + if drivers and repos: + raise ValueError("extract_drivers: drivers or repos, not both") if repos: - drivers += [d for repo in repos for d in dd_list(repo)] + drivers = list_drivers(repos)
save_repos = set() new_drivers = False @@ -270,10 +291,10 @@ def grab_driver_files(outdir="/updates"): """ modules = list(iter_files(outdir+'/lib/modules',"*.ko*")) firmware = list(iter_files(outdir+'/lib/firmware')) - copy_files(modules, module_updates_dir) - copy_files(firmware, firmware_updates_dir) - move_files(modules, outdir+module_updates_dir) - move_files(firmware, outdir+firmware_updates_dir) + copy_files(modules, MODULE_UPDATES_DIR) + copy_files(firmware, FIRMWARE_UPDATES_DIR) + move_files(modules, outdir+MODULE_UPDATES_DIR) + move_files(firmware, outdir+FIRMWARE_UPDATES_DIR) return [os.path.basename(m).split('.ko')[0] for m in modules]
def load_drivers(modnames): @@ -282,6 +303,8 @@ def load_drivers(modnames): subprocess.call(["depmod", "-a"]) subprocess.call(["modprobe", "-a"] + modnames)
+# We *could* pass in "outdir" if we wanted to extract things somewhere else, +# but right now the only use case is running inside the initramfs, so.. def process_driver_disk(dev, interactive=False): try: _process_driver_disk(dev, interactive=interactive) @@ -350,12 +373,13 @@ def __repr__(self):
@property def shortdev(self): - if os.path.islink(self.device): - return os.path.basename(os.readlink(self.device)) - elif self.device.startswith('/dev/'): - return self.device[5:] - else: - return self.device + # resolve any symlinks (/dev/disk/by-label/OEMDRV -> /dev/sr0) + dev = os.path.realpath(self.device) + # NOTE: not os.path.basename 'cuz some devices legitimately have + # a '/' in their name: /dev/cciss/c0d0, /dev/i2o/hda, etc. + if dev.startswith('/dev/'): + dev = dev[5:] + return dev
def blkid(): out = subprocess.check_output("blkid -o export -s UUID -s TYPE".split()) @@ -363,6 +387,8 @@ def blkid(): return [dict(kv.split('=',1) for kv in block.splitlines()) for block in out.split('\n\n')]
+# We use this to get disk labels because blkid's encoding of non-printable and +# non-ascii characters is weird and doesn't match what you'd expect to see. def get_disk_labels(): return {os.path.realpath(s):os.path.basename(s) for s in iter_files("/dev/disk/by-label")} @@ -451,9 +477,9 @@ def format_items(self):
def format_header(self): if self.multi: - return ' '+self.format_item(self.headeritem) + return (8*' ')+self.format_item(self.headeritem) else: - return ' '+self.format_item(self.headeritem) + return (4*' ')+self.format_item(self.headeritem)
def action_dict(self): actions = { @@ -498,7 +524,7 @@ def run(self): return self.selected_items
def repo_menu(repos): - drivers = [d for r in repos for d in dd_list(r)] + drivers = list_drivers(repos) if not drivers: log.info("No suitable drivers found.") return [] diff --git a/dracut/test_driver_updates.py b/dracut/test_driver_updates.py deleted file mode 100644 index 2aaa993..0000000 --- a/dracut/test_driver_updates.py +++ /dev/null @@ -1,624 +0,0 @@ -# test_driver_updates.py - unittests for driver_updates.py - -import unittest -try: - import unittest.mock as mock -except ImportError: - import mock - -import os -import tempfile -import shutil - -from driver_updates import copy_files, move_files, iter_files, ensure_dir -from driver_updates import append_line, mkdir_seq - -def touch(path): - try: - open(path, 'a') - except IOError as e: - if e.errno != 17: raise - -def makedir(path): - ensure_dir(path) - return path - -def makefile(path): - makedir(os.path.dirname(path)) - touch(path) - return path - -def makefiles(*paths): - return [makefile(p) for p in paths] - -class FileTestCaseBase(unittest.TestCase): - def setUp(self): - self.tmpdir = tempfile.mkdtemp(prefix="test_driver_updates.") - self.srcdir = self.tmpdir+'/src/' - self.destdir = self.tmpdir+'/dest/' - - def tearDown(self): - shutil.rmtree(self.tmpdir, ignore_errors=True) - - def makefiles(self, *paths): - return [makefile(os.path.normpath(self.tmpdir+'/'+p)) for p in paths] - -class SelfTestCase(FileTestCaseBase): - def test_makefiles(self): - """check test helpers""" - filepaths = ["sub/dir/test.file", "testfile"] - self.makefiles(*filepaths) - for f in filepaths: - self.assertTrue(os.path.exists(self.tmpdir+'/'+f)) - -class TestCopyFiles(FileTestCaseBase): - def test_basic(self): - """copy_file: copy files into destdir, leaving existing contents""" - files = self.makefiles("src/file1", "src/subdir/file2") - self.makefiles("dest/file3") - copy_files(files, self.destdir) - result = set(os.listdir(self.destdir)) - self.assertEqual(result, set(["file1", "file2", "file3"])) - - def 
test_overwrite(self): - """copy_file: overwrite files in destdir if they have the same name""" - src, dest = self.makefiles("src/file1", "dest/file1") - with open(src, 'w') as outf: - outf.write("srcfile") - with open(dest, 'w') as outf: - outf.write("destfile") - copy_files([src], self.destdir) - self.assertEqual(os.listdir(self.destdir), ["file1"]) - self.assertEqual(open(dest).read(), "srcfile") - - def test_samefile(self): - """copy_file: skip files already in destdir""" - (dest,) = self.makefiles("dest/file1") - with open(dest, 'w') as outf: - outf.write("destfile") - copy_files([dest], self.destdir) - self.assertEqual(os.listdir(self.destdir), ["file1"]) - self.assertEqual(open(dest).read(), "destfile") - - def test_copy_to_parent(self): - """copy_file: skip files in subdirs of destdir""" - files = self.makefiles("dest/subdir/file1") - copy_files(files, self.destdir) - self.assertEqual(list(iter_files(self.destdir)), files) - -class TestIterFiles(FileTestCaseBase): - def test_basic(self): - """iter_files: iterates over full paths to files under topdir""" - files = set(self.makefiles("src/file1", "dest/file2", "src/sub/file3")) - makedir(self.tmpdir+'/empty/dir') - result = set(iter_files(self.tmpdir)) - self.assertEqual(files, result) - - def test_pattern(self): - """iter_files: match filename against glob pattern""" - self.makefiles("src/file1.so", "src/sub.ko/file2") - goodfiles = set(self.makefiles("src/sub/file1.ko", "src/file2.ko.xz")) - result = set(iter_files(self.tmpdir, pattern="*.ko*")) - self.assertEqual(result, goodfiles) - -class TestMoveFiles(FileTestCaseBase): - def test_basic(self): - """move_files: move files to destdir""" - files = self.makefiles("src/file1", "src/subdir/file2") - move_files(files, self.destdir) - self.assertEqual(set(os.listdir(self.destdir)), set(["file1", "file2"])) - self.assertEqual(list(iter_files(self.srcdir)), []) - - def test_overwrite(self): - """move_files: overwrite files with the same name""" - src, dest = 
self.makefiles("src/file1", "dest/file1") - with open(src, 'w') as outf: - outf.write("srcfile") - with open(dest, 'w') as outf: - outf.write("destfile") - move_files([src], self.destdir) - self.assertEqual(os.listdir(self.destdir), ["file1"]) - self.assertEqual(open(dest).read(), "srcfile") - self.assertEqual(list(iter_files(self.srcdir)), []) - - def test_samefile(self): - """move_files: leave files alone if they're already in destdir""" - (dest,) = self.makefiles("dest/file1") - with open(dest, 'w') as outf: - outf.write("destfile") - move_files([dest], self.destdir) - self.assertEqual(os.listdir(self.destdir), ["file1"]) - self.assertEqual(open(dest).read(), "destfile") - - def test_move_to_parent(self): - """move_files: leave files alone if they're in a subdir of destdir""" - files = set(self.makefiles("dest/subdir/file1", "dest/file2")) - move_files(files, self.destdir) - self.assertEqual(set(iter_files(self.destdir)), files) - -class TestAppendLine(FileTestCaseBase): - def test_empty(self): - """append_line: create file + append \n when needed""" - line = "this is a line of text with no newline" - outfile = self.tmpdir+'/outfile' - append_line(outfile, line) - self.assertEqual(open(outfile).read(), line+'\n') - - def test_append(self): - """append_line: adds a line to the end of an existing file""" - oldlines = ["line one", "line two", "and I'm line three"] - outfile = self.tmpdir+'/outfile' - with open(outfile, 'w') as outf: - for line in oldlines: - outf.write(line+'\n') - line = "this line contains a newline already\n" - append_line(outfile, line) - self.assertEqual(open(outfile).read(), '\n'.join(oldlines+[line])) - -from driver_updates import read_lines -class TestReadLine(FileTestCaseBase): - def test_empty(self): - """read_lines: return [] for empty file""" - [empty] = self.makefiles("emptyfile") - self.assertEqual(read_lines(empty), []) - - def test_missing(self): - """read_lines: return [] for missing file""" - 
self.assertEqual(read_lines(self.tmpdir+'/no-such-file'),[]) - - def test_readlines(self): - """read_lines: returns a list of lines without trailing newlines""" - filedata = 'line one\nline two\n\nline four\n' - outfile = self.tmpdir+'/outfile' - with open(outfile, 'w') as outf: - outf.write(filedata) - lines = read_lines(outfile) - self.assertEqual(lines, ['line one', 'line two','','line four']) - -class TestMkdirSeq(FileTestCaseBase): - def test_basic(self): - """mkdir_seq: first dir ends with 1""" - newdir = mkdir_seq(self.srcdir+'/DD-') - self.assertEqual(newdir, self.srcdir+'/DD-1') - self.assertTrue(os.path.isdir(newdir)) - - def test_one_exists(self): - """mkdir_seq: increment number if file exists""" - firstdir = mkdir_seq(self.srcdir+'/DD-') - newdir = mkdir_seq(self.srcdir+'/DD-') - self.assertEqual(newdir, self.srcdir+'/DD-2') - self.assertTrue(os.path.isdir(newdir)) - self.assertTrue(os.path.isdir(firstdir)) - -from driver_updates import find_repos, save_repo, arch -# As far as we know, this is what makes a valid repo: rhdd3 + rpms/`uname -m`/ -def makerepo(topdir, desc=None): - descfile = makefile(topdir+'/rhdd3') - if not desc: - desc = os.path.basename(topdir) - with open(descfile, "w") as outf: - outf.write(desc+"\n") - makedir(topdir+'/rpms/'+arch) - -class TestFindRepos(FileTestCaseBase): - def test_basic(self): - """find_repos: return RPM dir if a valid repo is found""" - makerepo(self.tmpdir) - repos = find_repos(self.tmpdir) - self.assertEqual(repos, [self.tmpdir+'/rpms/'+arch]) - self.assertTrue(os.path.isdir(repos[0])) - - def test_multiple_subdirs(self): - """find_repos: descend multiple subdirs if needed""" - makerepo(self.tmpdir+'/driver1') - makerepo(self.tmpdir+'/sub/driver1') - makerepo(self.tmpdir+'/sub/driver2') - repos = find_repos(self.tmpdir) - self.assertEqual(len(repos),3) - -class TestSaveRepo(FileTestCaseBase): - def test_basic(self): - """save_repo: copies a directory to /run/install/DD-X""" - makerepo(self.srcdir) - [repo] = 
find_repos(self.srcdir) - makefile(repo+'/fake-something.rpm') - saved = save_repo(repo, target=self.destdir) - self.assertEqual(set(os.listdir(saved)), set(["fake-something.rpm"])) - self.assertEqual(saved, os.path.join(self.destdir, "DD-1")) - -from driver_updates import mount, umount, mounted -class MountTestCase(unittest.TestCase): - @mock.patch('driver_updates.mkdir_seq') - @mock.patch('driver_updates.subprocess.check_call') - def test_mkdir(self, check_call, mkdir): - """mount: makes mountpoint if needed""" - dev, mnt = '/dev/fake', '/media/DD-1' - mkdir.return_value = mnt - mountpoint = mount(dev) - mkdir.assert_called_once_with('/media/DD-') - check_call.assert_called_once_with(["mount", dev, mnt]) - self.assertEqual(mnt, mountpoint) - - @mock.patch('driver_updates.mkdir_seq') - @mock.patch('driver_updates.subprocess.check_call') - def test_basic(self, check_call, mkdir): - """mount: calls mount(8) to mount a device/image""" - dev, mnt = '/dev/fake', '/media/fake' - mount(dev, mnt) - check_call.assert_called_once_with(["mount", dev, mnt]) - self.assertFalse(mkdir.called) - - @mock.patch('driver_updates.subprocess.call') - def test_umount(self, call): - """umount: calls umount(8)""" - mnt = '/mnt/fake' - umount(mnt) - call.assert_called_once_with(["umount", mnt]) - - @mock.patch('driver_updates.mount') - @mock.patch('driver_updates.umount') - def test_mount_manager(self, mock_umount, mock_mount): - """mounted: context manager mounts/umounts as expected""" - dev, mnt = '/dev/fake', '/media/fake' - mock_mount.return_value = mnt - with mounted(dev, mnt) as mountpoint: - mock_mount.assert_called_once_with(dev, mnt) - self.assertFalse(mock_umount.called) - self.assertEqual(mountpoint, mnt) - mock_umount.assert_called_once_with(mnt) - -# NOTE: dd_list and dd_extract get tested pretty thoroughly in tests/dd_tests, -# so this is a slightly higher-level test case -from driver_updates import dd_list, dd_extract, Driver -fake_module = Driver( - 
source='/repo/path/to/fake-driver-1.0-1.rpm', - name='fake-driver', - flags='modules firmwares', - description='Wow this is totally a fake driver.\nHooray for this', - repo='/repo/path/to' -) -fake_enhancement = Driver( - source='/repo/path/to/fake-enhancement-1.0-1.rpm', - name='fake-enhancement', - flags='binaries libraries', - description='This is enhancing the crap out of the installer.\n\nYeah.', - repo=fake_module.repo -) -def dd_list_output(driver): - out='{0.source}\n{0.name}\n{0.flags}\n{0.description}\n---\n'.format(driver) - return out.encode('utf-8') - -class DDUtilsTestCase(unittest.TestCase): - @mock.patch("driver_updates.subprocess.check_output") - def test_dd_list(self, check_output): - """dd_list: returns a list of Driver objects parsed from output""" - output = dd_list_output(fake_module)+dd_list_output(fake_enhancement) - check_output.return_value = output - anaconda, kernel = '19.0', os.uname()[2] - result = dd_list(fake_module.repo) - cmd = check_output.call_args[0][0] - self.assertIn(kernel, cmd) - self.assertIn(anaconda, cmd) - self.assertIn(fake_module.repo, cmd) - self.assertTrue(cmd[0].endswith("dd_list")) - self.assertEqual(len(result), 2) - mod, enh = sorted(result, key=lambda d: d.name) - self.assertEqual(mod.__dict__, fake_module.__dict__) - self.assertEqual(enh.__dict__, fake_enhancement.__dict__) - - @mock.patch("driver_updates.subprocess.check_output") - def test_dd_extract(self, check_output): - """dd_extract: call binary with expected arguments""" - rpm = "/some/kind/of/path.rpm" - outdir = "/output/dir" - dd_extract(rpm, outdir) - cmd = check_output.call_args[0][0] - self.assertIn(os.uname()[2], cmd) - self.assertIn(rpm, cmd) - self.assertIn(outdir, cmd) - self.assertIn("-blmf", cmd) - self.assertTrue(cmd[0].endswith("dd_extract")) - -from driver_updates import extract_drivers, grab_driver_files, load_drivers - -class ExtractDriversTestCase(unittest.TestCase): - @mock.patch("driver_updates.save_repo") - 
@mock.patch("driver_updates.append_line") - @mock.patch("driver_updates.dd_extract") - def test_drivers(self, mock_extract, mock_append, mock_save): - """extract_drivers: save repo, write pkglist""" - extract_drivers(drivers=[fake_enhancement, fake_module]) - # extracts all listed modules - mock_extract.assert_has_calls([ - mock.call(fake_enhancement.source, "/updates"), - mock.call(fake_module.source, "/updates") - ], any_order=True) - pkglist = "/run/install/dd_packages" - mock_append.assert_called_once_with(pkglist, fake_module.name) - mock_save.assert_called_once_with(fake_module.repo) - - @mock.patch("driver_updates.save_repo") - @mock.patch("driver_updates.append_line") - @mock.patch("driver_updates.dd_extract") - def test_enhancements(self, mock_extract, mock_append, mock_save): - """extract_drivers: extract selected drivers, don't save enhancements""" - extract_drivers(drivers=[fake_enhancement]) - mock_extract.assert_called_once_with( - fake_enhancement.source, "/updates" - ) - self.assertFalse(mock_append.called) - self.assertFalse(mock_save.called) - - @mock.patch("driver_updates.save_repo") - @mock.patch("driver_updates.append_line") - @mock.patch("driver_updates.dd_extract") - def test_repo(self, mock_extract, mock_append, mock_save): - """extract_drivers(repos=[...]) extracts all drivers from named repos""" - with mock.patch("driver_updates.dd_list", side_effect=[ - [fake_enhancement], - [fake_enhancement, fake_module]]): - extract_drivers(repos=['enh_repo', 'mod_repo']) - mock_extract.assert_has_calls([ - mock.call(fake_enhancement.source, "/updates"), - mock.call(fake_enhancement.source, "/updates"), - mock.call(fake_module.source, "/updates") - ]) - pkglist = "/run/install/dd_packages" - mock_append.assert_called_once_with(pkglist, fake_module.name) - mock_save.assert_called_once_with(fake_module.repo) - -class GrabDriverFilesTestCase(FileTestCaseBase): - def test_basic(self): - """grab_driver_files: copy drivers into place, return module list""" - 
# create a bunch of fake extracted files - outdir = self.tmpdir + '/extract-outdir' - moddir = outdir + "/lib/modules/%s/kernel/" % os.uname()[2] - fwdir = outdir + "/lib/firmware/" - modules = makefiles(moddir+"net/funk.ko", moddir+"fs/lolfs.ko.xz") - firmware = makefiles(fwdir+"funk.fw") - makefiles(outdir+"/usr/bin/monkey", outdir+"/other/dir/blah.ko") - mod_upd_dir = self.tmpdir+'/module-updates' - fw_upd_dir = self.tmpdir+'/fw-updates' - # use our updates dirs instead of the default updates dirs - with mock.patch.multiple("driver_updates", - module_updates_dir=mod_upd_dir, - firmware_updates_dir=fw_upd_dir): - modnames = grab_driver_files(outdir) - self.assertEqual(set(modnames), set(["funk", "lolfs"])) - modfiles = set(['funk.ko', 'lolfs.ko.xz']) - fwfiles = set(['funk.fw']) - # modules/firmware are *not* in their old locations - self.assertEqual([f for f in modules+firmware if os.path.exists(f)], []) - # modules are in the system's updates dir - self.assertEqual(set(os.listdir(mod_upd_dir)), modfiles) - # modules are also in outdir's updates dir - self.assertEqual(set(os.listdir(outdir+'/'+mod_upd_dir)), modfiles) - # repeat for firmware - self.assertEqual(set(os.listdir(fw_upd_dir)), fwfiles) - self.assertEqual(set(os.listdir(outdir+'/'+fw_upd_dir)), fwfiles) - -class LoadDriversTestCase(unittest.TestCase): - @mock.patch("driver_updates.subprocess.call") - def test_basic(self, call): - """load_drivers: runs depmod and modprobes all named modules""" - modnames = ['mod1', 'mod2'] - load_drivers(modnames) - call.assert_has_calls([ - mock.call(["depmod", "-a"]), - mock.call(["modprobe", "-a"] + modnames) - ]) - -from driver_updates import process_driver_disk -class ProcessDriverDiskTestCase(unittest.TestCase): - def setUp(self): - # an iterable that returns fake mountpoints, for mocking mount() - self.fakemount = ["/mnt/DD-%i" % n for n in range(1,10)] - # an iterable that returns fake repos, for mocking find_repos() - self.frepo = { - '/mnt/DD-1': 
['/mnt/DD-1/repo1'], - '/mnt/DD-2': ['/mnt/DD-2/repo1', '/mnt/DD-2/repo2'], - } - # fake iso listings for iso_dir - self.fiso = { - '/mnt/DD-1': [], - '/mnt/DD-2': [], - '/mnt/DD-3': [], - } - # a context-manager object to be returned by the mock mounted() - mounted_ctx = mock.MagicMock( - __enter__=mock.MagicMock(side_effect=self.fakemount), # mount - __exit__=mock.MagicMock(return_value=None), # umount - ) - self.modlist = [] - # set up our patches - patches = ( - mock.patch("driver_updates.mounted", return_value=mounted_ctx), - mock.patch("driver_updates.find_repos", side_effect=self.frepo.get), - mock.patch("driver_updates.find_isos", side_effect=self.fiso.get), - mock.patch("driver_updates.extract_drivers", return_value=True), - mock.patch("driver_updates.load_drivers"), - mock.patch('driver_updates.grab_driver_files', - side_effect=lambda: self.modlist), - ) - self.mocks = {p.attribute:p.start() for p in patches} - for p in patches: self.addCleanup(p.stop) - - def test_basic(self): - """process_driver_disk: mount disk, extract RPMs, grab + load drivers""" - dev = '/dev/fake' - process_driver_disk(dev) - # did we mount the initial device, and then the .iso we find therein? - self.mocks['mounted'].assert_called_once_with(dev) - self.mocks['extract_drivers'].assert_called_once_with(repos=self.frepo['/mnt/DD-1']) - self.mocks['grab_driver_files'].assert_called_once_with() - self.mocks['load_drivers'].assert_called_once_with(self.modlist) - - def test_recursive(self): - """process_driver_disk: recursively process .isos at toplevel""" - dev = '/dev/fake' - # first mount has no repos, but an iso - self.frepo['/mnt/DD-1'] = [] - self.fiso['/mnt/DD-1'].append('magic.iso') - self.fiso['/mnt/DD-2'].append('ignored.iso') - process_driver_disk(dev) - # did we mount the initial device, and the iso therein? 
- # also: we ignore ignored.iso because magic.iso is a proper DD - self.mocks['mounted'].assert_has_calls([ - mock.call(dev), mock.call('magic.iso') - ]) - # we extracted drivers from the repo(s) in magic.iso - self.mocks['extract_drivers'].assert_called_once_with(repos=self.frepo['/mnt/DD-2']) - self.mocks['grab_driver_files'].assert_called_once_with() - self.mocks['load_drivers'].assert_called_once_with(self.modlist) - - def test_no_drivers(self): - """process_driver_disk: don't run depmod etc. if no new drivers""" - dev = '/dev/fake' - self.mocks['extract_drivers'].return_value = False - process_driver_disk(dev) - self.assertFalse(self.mocks['grab_driver_files'].called) - self.assertFalse(self.mocks['load_drivers'].called) - -from driver_updates import finish, mark_finished, all_finished - -class FinishedTestCase(FileTestCaseBase): - def test_mark_finished(self): - """mark_finished: appends a line to /tmp/dd_finished""" - requeststr = "WOW SOMETHING OR OTHER" - mark_finished(requeststr, topdir=self.tmpdir) - finished = self.tmpdir+'/dd_finished' - self.assertTrue(os.path.exists(finished)) - self.assertEqual(read_lines(finished), [requeststr]) - - def test_all_finished(self): - """all_finished: True if all lines from dd_todo are in dd_finished""" - todo = self.tmpdir+'/dd_todo' - requests = ['one', 'two', 'final thingy'] - with open(todo, 'w') as outf: - outf.write(''.join(r+'\n' for r in requests)) - self.assertEqual(set(read_lines(todo)), set(requests)) - for r in reversed(requests): - self.assertFalse(all_finished(topdir=self.tmpdir)) - mark_finished(r, topdir=self.tmpdir) - self.assertTrue(all_finished(topdir=self.tmpdir)) - - def test_extra_finished(self): - """all_finished: True if dd_finished has more items than dd_todo""" - self.test_all_finished() - mark_finished("BONUS", topdir=self.tmpdir) - self.assertTrue(all_finished(topdir=self.tmpdir)) - - def test_finish(self): - """finish: mark request finished, and write dd.done if all complete""" - todo = 
self.tmpdir+'/dd_todo' - done = self.tmpdir+'/dd.done' - requests = ['one', 'two', 'final thingy'] - with open(todo, 'w') as outf: - outf.write(''.join(r+'\n' for r in requests)) - for r in reversed(requests): - print("marking %s" % r) - self.assertFalse(os.path.exists(done)) - finish(r, topdir=self.tmpdir) - self.assertTrue(os.path.exists(done)) - -from driver_updates import get_deviceinfo, DeviceInfo -blkid_output = b'''\ -DEVNAME=/dev/sda2 -UUID=0f21a3d1-dcd3-4ab4-a292-c5556850d561 -TYPE=ext4 - -DEVNAME=/dev/sda1 -UUID=C53C-EE46 -TYPE=vfat - -DEVNAME=/dev/sda3 -UUID=4126dbb6-c7d3-47b4-b1fc-9bb461df0067 -TYPE=btrfs - -DEVNAME=/dev/loop0 -UUID=6f16967e-0388-4276-bd8d-b88e5b217a55 -TYPE=ext4 -''' -disk_labels = { - '/dev/sdb1': 'metroid_srv', - '/dev/loop0': 'I\x20\u262d\x20COMMUNISM', - '/dev/sda3': 'metroid_root' -} -devicelist = [ - DeviceInfo(DEVNAME='/dev/sda2', TYPE='ext4', - UUID='0f21a3d1-dcd3-4ab4-a292-c5556850d561'), - DeviceInfo(DEVNAME='/dev/sda1', TYPE='vfat', - UUID='C53C-EE46'), - DeviceInfo(DEVNAME='/dev/sda3', TYPE='btrfs', LABEL='metroid_root', - UUID='4126dbb6-c7d3-47b4-b1fc-9bb461df0067'), - DeviceInfo(DEVNAME='/dev/loop0', TYPE='ext4', - LABEL='I\x20\u262d\x20COMMUNISM', - UUID='6f16967e-0388-4276-bd8d-b88e5b217a55'), -] -# also covers blkid, get_disk_labels, DeviceInfo -class DeviceInfoTestCase(unittest.TestCase): - @mock.patch('driver_updates.subprocess.check_output') - @mock.patch('driver_updates.get_disk_labels') - def test_basic(self, get_disk_labels, check_output): - """get_deviceinfo: parses DeviceInfo from blkid etc.""" - # configure mock objects - check_output.return_value = blkid_output - get_disk_labels.return_value = disk_labels - # now we're getting mock deviceinfo, whee - disks = get_deviceinfo() - self.assertEqual(len(disks), 4) - disks.sort(key=lambda d: d.device) - loop, efi, boot, root = disks - self.assertEqual(vars(boot), vars(devicelist[0])) - self.assertEqual(vars(efi), vars(devicelist[1])) - self.assertEqual(vars(root), 
vars(devicelist[2])) - self.assertEqual(vars(loop), vars(devicelist[3])) - -# TODO: test TextMenu itself - -# py2/3 compat -import sys -if sys.version_info.major == 3: - from io import StringIO -else: - from io import BytesIO as StringIO - -from driver_updates import device_menu -class DeviceMenuTestCase(unittest.TestCase): - def setUp(self): - patches = ( - mock.patch('driver_updates.get_deviceinfo',return_value=devicelist), - ) - self.mocks = {p.attribute:p.start() for p in patches} - for p in patches: self.addCleanup(p.stop) - - def test_device_menu_exit(self): - """device_menu: 'c' exits the menu""" - with mock.patch('driver_updates._input', side_effect=['c']): - dev = device_menu() - self.assertEqual(dev, []) - self.assertEqual(self.mocks['get_deviceinfo'].call_count, 1) - - def test_device_menu_refresh(self): - """device_menu: 'r' makes the menu refresh""" - with mock.patch('driver_updates._input', side_effect=['r','c']): - device_menu() - self.assertEqual(self.mocks['get_deviceinfo'].call_count, 2) - - @mock.patch("sys.stdout", new_callable=StringIO) - def test_device_menu(self, stdout): - """device_menu: choosing a number returns that Device""" - choose_num='2' - with mock.patch('driver_updates._input', return_value=choose_num): - result = device_menu() - # if you hit '2' you should get the corresponding device from the list - self.assertEqual(len(result), 1) - dev = result[0] - self.assertEqual(vars(dev), vars(devicelist[int(choose_num)-1])) - # find the corresponding line on-screen - screen = [l.strip() for l in stdout.getvalue().splitlines()] - match = [l for l in screen if l.startswith(choose_num+')')] - self.assertEqual(len(match), 1) - line = match.pop(0) - # the device name (at least) should be on this line - self.assertIn(os.path.basename(dev.device), line) diff --git a/tests/dracut_tests/test_driver_updates.py b/tests/dracut_tests/test_driver_updates.py new file mode 100644 index 0000000..9b87c73 --- /dev/null +++ 
b/tests/dracut_tests/test_driver_updates.py @@ -0,0 +1,631 @@ +# test_driver_updates.py - unittests for driver_updates.py + +import unittest +try: + import unittest.mock as mock +except ImportError: + import mock + +import os +import tempfile +import shutil + +import sys +sys.path.append(os.path.normpath(os.path.dirname(__file__)+'/../../dracut')) + +from driver_updates import copy_files, move_files, iter_files, ensure_dir +from driver_updates import append_line, mkdir_seq + +def touch(path): + try: + open(path, 'a') + except IOError as e: + if e.errno != 17: raise + +def makedir(path): + ensure_dir(path) + return path + +def makefile(path): + makedir(os.path.dirname(path)) + touch(path) + return path + +def makefiles(*paths): + return [makefile(p) for p in paths] + +class FileTestCaseBase(unittest.TestCase): + def setUp(self): + self.tmpdir = tempfile.mkdtemp(prefix="test_driver_updates.") + self.srcdir = self.tmpdir+'/src/' + self.destdir = self.tmpdir+'/dest/' + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def makefiles(self, *paths): + return [makefile(os.path.normpath(self.tmpdir+'/'+p)) for p in paths] + +class SelfTestCase(FileTestCaseBase): + def test_makefiles(self): + """check test helpers""" + filepaths = ["sub/dir/test.file", "testfile"] + self.makefiles(*filepaths) + for f in filepaths: + self.assertTrue(os.path.exists(self.tmpdir+'/'+f)) + +class TestCopyFiles(FileTestCaseBase): + def test_basic(self): + """copy_file: copy files into destdir, leaving existing contents""" + files = self.makefiles("src/file1", "src/subdir/file2") + self.makefiles("dest/file3") + copy_files(files, self.destdir) + result = set(os.listdir(self.destdir)) + self.assertEqual(result, set(["file1", "file2", "file3"])) + + def test_overwrite(self): + """copy_file: overwrite files in destdir if they have the same name""" + src, dest = self.makefiles("src/file1", "dest/file1") + with open(src, 'w') as outf: + outf.write("srcfile") + with open(dest, 
'w') as outf: + outf.write("destfile") + copy_files([src], self.destdir) + self.assertEqual(os.listdir(self.destdir), ["file1"]) + self.assertEqual(open(dest).read(), "srcfile") + + def test_samefile(self): + """copy_file: skip files already in destdir""" + (dest,) = self.makefiles("dest/file1") + with open(dest, 'w') as outf: + outf.write("destfile") + copy_files([dest], self.destdir) + self.assertEqual(os.listdir(self.destdir), ["file1"]) + self.assertEqual(open(dest).read(), "destfile") + + def test_copy_to_parent(self): + """copy_file: skip files in subdirs of destdir""" + files = self.makefiles("dest/subdir/file1") + copy_files(files, self.destdir) + self.assertEqual(list(iter_files(self.destdir)), files) + +class TestIterFiles(FileTestCaseBase): + def test_basic(self): + """iter_files: iterates over full paths to files under topdir""" + files = set(self.makefiles("src/file1", "dest/file2", "src/sub/file3")) + makedir(self.tmpdir+'/empty/dir') + result = set(iter_files(self.tmpdir)) + self.assertEqual(files, result) + + def test_pattern(self): + """iter_files: match filename against glob pattern""" + self.makefiles("src/file1.so", "src/sub.ko/file2") + goodfiles = set(self.makefiles("src/sub/file1.ko", "src/file2.ko.xz")) + result = set(iter_files(self.tmpdir, pattern="*.ko*")) + self.assertEqual(result, goodfiles) + +class TestMoveFiles(FileTestCaseBase): + def test_basic(self): + """move_files: move files to destdir""" + files = self.makefiles("src/file1", "src/subdir/file2") + move_files(files, self.destdir) + self.assertEqual(set(os.listdir(self.destdir)), set(["file1", "file2"])) + self.assertEqual(list(iter_files(self.srcdir)), []) + + def test_overwrite(self): + """move_files: overwrite files with the same name""" + src, dest = self.makefiles("src/file1", "dest/file1") + with open(src, 'w') as outf: + outf.write("srcfile") + with open(dest, 'w') as outf: + outf.write("destfile") + move_files([src], self.destdir) + 
self.assertEqual(os.listdir(self.destdir), ["file1"]) + self.assertEqual(open(dest).read(), "srcfile") + self.assertEqual(list(iter_files(self.srcdir)), []) + + def test_samefile(self): + """move_files: leave files alone if they're already in destdir""" + (dest,) = self.makefiles("dest/file1") + with open(dest, 'w') as outf: + outf.write("destfile") + move_files([dest], self.destdir) + self.assertEqual(os.listdir(self.destdir), ["file1"]) + self.assertEqual(open(dest).read(), "destfile") + + def test_move_to_parent(self): + """move_files: leave files alone if they're in a subdir of destdir""" + files = set(self.makefiles("dest/subdir/file1", "dest/file2")) + move_files(files, self.destdir) + self.assertEqual(set(iter_files(self.destdir)), files) + +class TestAppendLine(FileTestCaseBase): + def test_empty(self): + """append_line: create file + append \n when needed""" + line = "this is a line of text with no newline" + outfile = self.tmpdir+'/outfile' + append_line(outfile, line) + self.assertEqual(open(outfile).read(), line+'\n') + + def test_append(self): + """append_line: adds a line to the end of an existing file""" + oldlines = ["line one", "line two", "and I'm line three"] + outfile = self.tmpdir+'/outfile' + with open(outfile, 'w') as outf: + for line in oldlines: + outf.write(line+'\n') + line = "this line contains a newline already\n" + append_line(outfile, line) + self.assertEqual(open(outfile).read(), '\n'.join(oldlines+[line])) + +from driver_updates import read_lines +class TestReadLine(FileTestCaseBase): + def test_empty(self): + """read_lines: return [] for empty file""" + [empty] = self.makefiles("emptyfile") + self.assertEqual(read_lines(empty), []) + + def test_missing(self): + """read_lines: return [] for missing file""" + self.assertEqual(read_lines(self.tmpdir+'/no-such-file'),[]) + + def test_readlines(self): + """read_lines: returns a list of lines without trailing newlines""" + filedata = 'line one\nline two\n\nline four\n' + outfile = 
self.tmpdir+'/outfile' + with open(outfile, 'w') as outf: + outf.write(filedata) + lines = read_lines(outfile) + self.assertEqual(lines, ['line one', 'line two','','line four']) + + def test_readline_and_append_line(self): + """read_lines: returns items as passed to append_line""" + filename = self.tmpdir+'/outfile' + items = ["one", "two", "five"] + for i in items: + append_line(filename, i) + self.assertEqual(items, read_lines(filename)) + +class TestMkdirSeq(FileTestCaseBase): + def test_basic(self): + """mkdir_seq: first dir ends with 1""" + newdir = mkdir_seq(self.srcdir+'/DD-') + self.assertEqual(newdir, self.srcdir+'/DD-1') + self.assertTrue(os.path.isdir(newdir)) + + def test_one_exists(self): + """mkdir_seq: increment number if file exists""" + firstdir = mkdir_seq(self.srcdir+'/DD-') + newdir = mkdir_seq(self.srcdir+'/DD-') + self.assertEqual(newdir, self.srcdir+'/DD-2') + self.assertTrue(os.path.isdir(newdir)) + self.assertTrue(os.path.isdir(firstdir)) + +from driver_updates import find_repos, save_repo, ARCH +# As far as we know, this is what makes a valid repo: rhdd3 + rpms/`uname -m`/ +def makerepo(topdir, desc=None): + descfile = makefile(topdir+'/rhdd3') + if not desc: + desc = os.path.basename(topdir) + with open(descfile, "w") as outf: + outf.write(desc+"\n") + makedir(topdir+'/rpms/'+ARCH) + +class TestFindRepos(FileTestCaseBase): + def test_basic(self): + """find_repos: return RPM dir if a valid repo is found""" + makerepo(self.tmpdir) + repos = find_repos(self.tmpdir) + self.assertEqual(repos, [self.tmpdir+'/rpms/'+ARCH]) + self.assertTrue(os.path.isdir(repos[0])) + + def test_multiple_subdirs(self): + """find_repos: descend multiple subdirs if needed""" + makerepo(self.tmpdir+'/driver1') + makerepo(self.tmpdir+'/sub/driver1') + makerepo(self.tmpdir+'/sub/driver2') + repos = find_repos(self.tmpdir) + self.assertEqual(len(repos),3) + +class TestSaveRepo(FileTestCaseBase): + def test_basic(self): + """save_repo: copies a directory to 
/run/install/DD-X""" + makerepo(self.srcdir) + [repo] = find_repos(self.srcdir) + makefile(repo+'/fake-something.rpm') + saved = save_repo(repo, target=self.destdir) + self.assertEqual(set(os.listdir(saved)), set(["fake-something.rpm"])) + self.assertEqual(saved, os.path.join(self.destdir, "DD-1")) + +from driver_updates import mount, umount, mounted +class MountTestCase(unittest.TestCase): + @mock.patch('driver_updates.mkdir_seq') + @mock.patch('driver_updates.subprocess.check_call') + def test_mkdir(self, check_call, mkdir): + """mount: makes mountpoint if needed""" + dev, mnt = '/dev/fake', '/media/DD-1' + mkdir.return_value = mnt + mountpoint = mount(dev) + mkdir.assert_called_once_with('/media/DD-') + check_call.assert_called_once_with(["mount", dev, mnt]) + self.assertEqual(mnt, mountpoint) + + @mock.patch('driver_updates.mkdir_seq') + @mock.patch('driver_updates.subprocess.check_call') + def test_basic(self, check_call, mkdir): + """mount: calls mount(8) to mount a device/image""" + dev, mnt = '/dev/fake', '/media/fake' + mount(dev, mnt) + check_call.assert_called_once_with(["mount", dev, mnt]) + self.assertFalse(mkdir.called) + + @mock.patch('driver_updates.subprocess.call') + def test_umount(self, call): + """umount: calls umount(8)""" + mnt = '/mnt/fake' + umount(mnt) + call.assert_called_once_with(["umount", mnt]) + + @mock.patch('driver_updates.mount') + @mock.patch('driver_updates.umount') + def test_mount_manager(self, mock_umount, mock_mount): + """mounted: context manager mounts/umounts as expected""" + dev, mnt = '/dev/fake', '/media/fake' + mock_mount.return_value = mnt + with mounted(dev, mnt) as mountpoint: + mock_mount.assert_called_once_with(dev, mnt) + self.assertFalse(mock_umount.called) + self.assertEqual(mountpoint, mnt) + mock_umount.assert_called_once_with(mnt) + +# NOTE: dd_list and dd_extract get tested pretty thoroughly in tests/dd_tests, +# so this is a slightly higher-level test case +from driver_updates import dd_list, dd_extract, 
Driver +fake_module = Driver( + source='/repo/path/to/fake-driver-1.0-1.rpm', + name='fake-driver', + flags='modules firmwares', + description='Wow this is totally a fake driver.\nHooray for this', + repo='/repo/path/to' +) +fake_enhancement = Driver( + source='/repo/path/to/fake-enhancement-1.0-1.rpm', + name='fake-enhancement', + flags='binaries libraries', + description='This is enhancing the crap out of the installer.\n\nYeah.', + repo=fake_module.repo +) +def dd_list_output(driver): + out='{0.source}\n{0.name}\n{0.flags}\n{0.description}\n---\n'.format(driver) + return out.encode('utf-8') + +class DDUtilsTestCase(unittest.TestCase): + @mock.patch("driver_updates.subprocess.check_output") + def test_dd_list(self, check_output): + """dd_list: returns a list of Driver objects parsed from output""" + output = dd_list_output(fake_module)+dd_list_output(fake_enhancement) + check_output.return_value = output + anaconda, kernel = '19.0', os.uname()[2] + result = dd_list(fake_module.repo) + cmd = check_output.call_args[0][0] + self.assertIn(kernel, cmd) + self.assertIn(anaconda, cmd) + self.assertIn(fake_module.repo, cmd) + self.assertTrue(cmd[0].endswith("dd_list")) + self.assertEqual(len(result), 2) + mod, enh = sorted(result, key=lambda d: d.name) + self.assertEqual(mod.__dict__, fake_module.__dict__) + self.assertEqual(enh.__dict__, fake_enhancement.__dict__) + + @mock.patch("driver_updates.subprocess.check_output") + def test_dd_extract(self, check_output): + """dd_extract: call binary with expected arguments""" + rpm = "/some/kind/of/path.rpm" + outdir = "/output/dir" + dd_extract(rpm, outdir) + cmd = check_output.call_args[0][0] + self.assertIn(os.uname()[2], cmd) + self.assertIn(rpm, cmd) + self.assertIn(outdir, cmd) + self.assertIn("-blmf", cmd) + self.assertTrue(cmd[0].endswith("dd_extract")) + +from driver_updates import extract_drivers, grab_driver_files, load_drivers + +@mock.patch("driver_updates.ensure_dir") +@mock.patch("driver_updates.save_repo") 
+@mock.patch("driver_updates.append_line") +@mock.patch("driver_updates.dd_extract") +class ExtractDriversTestCase(unittest.TestCase): + def test_drivers(self, mock_extract, mock_append, mock_save, *args): + """extract_drivers: save repo, write pkglist""" + extract_drivers(drivers=[fake_enhancement, fake_module]) + # extracts all listed modules + mock_extract.assert_has_calls([ + mock.call(fake_enhancement.source, "/updates"), + mock.call(fake_module.source, "/updates") + ], any_order=True) + pkglist = "/run/install/dd_packages" + mock_append.assert_called_once_with(pkglist, fake_module.name) + mock_save.assert_called_once_with(fake_module.repo) + + def test_enhancements(self, mock_extract, mock_append, mock_save, *args): + """extract_drivers: extract selected drivers, don't save enhancements""" + extract_drivers(drivers=[fake_enhancement]) + mock_extract.assert_called_once_with( + fake_enhancement.source, "/updates" + ) + self.assertFalse(mock_append.called) + self.assertFalse(mock_save.called) + + def test_repo(self, mock_extract, mock_append, mock_save, *args): + """extract_drivers(repos=[...]) extracts all drivers from named repos""" + with mock.patch("driver_updates.dd_list", side_effect=[ + [fake_enhancement], + [fake_enhancement, fake_module]]): + extract_drivers(repos=['enh_repo', 'mod_repo']) + mock_extract.assert_has_calls([ + mock.call(fake_enhancement.source, "/updates"), + mock.call(fake_enhancement.source, "/updates"), + mock.call(fake_module.source, "/updates") + ]) + pkglist = "/run/install/dd_packages" + mock_append.assert_called_once_with(pkglist, fake_module.name) + mock_save.assert_called_once_with(fake_module.repo) + +class GrabDriverFilesTestCase(FileTestCaseBase): + def test_basic(self): + """grab_driver_files: copy drivers into place, return module list""" + # create a bunch of fake extracted files + outdir = self.tmpdir + '/extract-outdir' + moddir = outdir + "/lib/modules/%s/kernel/" % os.uname()[2] + fwdir = outdir + "/lib/firmware/" + 
modules = makefiles(moddir+"net/funk.ko", moddir+"fs/lolfs.ko.xz") + firmware = makefiles(fwdir+"funk.fw") + makefiles(outdir+"/usr/bin/monkey", outdir+"/other/dir/blah.ko") + mod_upd_dir = self.tmpdir+'/module-updates' + fw_upd_dir = self.tmpdir+'/fw-updates' + # use our updates dirs instead of the default updates dirs + with mock.patch.multiple("driver_updates", + MODULE_UPDATES_DIR=mod_upd_dir, + FIRMWARE_UPDATES_DIR=fw_upd_dir): + modnames = grab_driver_files(outdir) + self.assertEqual(set(modnames), set(["funk", "lolfs"])) + modfiles = set(['funk.ko', 'lolfs.ko.xz']) + fwfiles = set(['funk.fw']) + # modules/firmware are *not* in their old locations + self.assertEqual([f for f in modules+firmware if os.path.exists(f)], []) + # modules are in the system's updates dir + self.assertEqual(set(os.listdir(mod_upd_dir)), modfiles) + # modules are also in outdir's updates dir + self.assertEqual(set(os.listdir(outdir+'/'+mod_upd_dir)), modfiles) + # repeat for firmware + self.assertEqual(set(os.listdir(fw_upd_dir)), fwfiles) + self.assertEqual(set(os.listdir(outdir+'/'+fw_upd_dir)), fwfiles) + +class LoadDriversTestCase(unittest.TestCase): + @mock.patch("driver_updates.subprocess.call") + def test_basic(self, call): + """load_drivers: runs depmod and modprobes all named modules""" + modnames = ['mod1', 'mod2'] + load_drivers(modnames) + call.assert_has_calls([ + mock.call(["depmod", "-a"]), + mock.call(["modprobe", "-a"] + modnames) + ]) + +from driver_updates import process_driver_disk +class ProcessDriverDiskTestCase(unittest.TestCase): + def setUp(self): + # an iterable that returns fake mountpoints, for mocking mount() + self.fakemount = ["/mnt/DD-%i" % n for n in range(1,10)] + # an iterable that returns fake repos, for mocking find_repos() + self.frepo = { + '/mnt/DD-1': ['/mnt/DD-1/repo1'], + '/mnt/DD-2': ['/mnt/DD-2/repo1', '/mnt/DD-2/repo2'], + } + # fake iso listings for iso_dir + self.fiso = { + '/mnt/DD-1': [], + '/mnt/DD-2': [], + '/mnt/DD-3': [], + } + # a 
context-manager object to be returned by the mock mounted() + mounted_ctx = mock.MagicMock( + __enter__=mock.MagicMock(side_effect=self.fakemount), # mount + __exit__=mock.MagicMock(return_value=None), # umount + ) + self.modlist = [] + # set up our patches + patches = ( + mock.patch("driver_updates.mounted", return_value=mounted_ctx), + mock.patch("driver_updates.find_repos", side_effect=self.frepo.get), + mock.patch("driver_updates.find_isos", side_effect=self.fiso.get), + mock.patch("driver_updates.extract_drivers", return_value=True), + mock.patch("driver_updates.load_drivers"), + mock.patch('driver_updates.grab_driver_files', + side_effect=lambda: self.modlist), + ) + self.mocks = {p.attribute:p.start() for p in patches} + for p in patches: self.addCleanup(p.stop) + + def test_basic(self): + """process_driver_disk: mount disk, extract RPMs, grab + load drivers""" + dev = '/dev/fake' + process_driver_disk(dev) + # did we mount the initial device, and then the .iso we find therein? + self.mocks['mounted'].assert_called_once_with(dev) + self.mocks['extract_drivers'].assert_called_once_with(repos=self.frepo['/mnt/DD-1']) + self.mocks['grab_driver_files'].assert_called_once_with() + self.mocks['load_drivers'].assert_called_once_with(self.modlist) + + def test_recursive(self): + """process_driver_disk: recursively process .isos at toplevel""" + dev = '/dev/fake' + # first mount has no repos, but an iso + self.frepo['/mnt/DD-1'] = [] + self.fiso['/mnt/DD-1'].append('magic.iso') + self.fiso['/mnt/DD-2'].append('ignored.iso') + process_driver_disk(dev) + # did we mount the initial device, and the iso therein? 
+ # also: we ignore ignored.iso because magic.iso is a proper DD + self.mocks['mounted'].assert_has_calls([ + mock.call(dev), mock.call('magic.iso') + ]) + # we extracted drivers from the repo(s) in magic.iso + self.mocks['extract_drivers'].assert_called_once_with(repos=self.frepo['/mnt/DD-2']) + self.mocks['grab_driver_files'].assert_called_once_with() + self.mocks['load_drivers'].assert_called_once_with(self.modlist) + + def test_no_drivers(self): + """process_driver_disk: don't run depmod etc. if no new drivers""" + dev = '/dev/fake' + self.mocks['extract_drivers'].return_value = False + process_driver_disk(dev) + self.assertFalse(self.mocks['grab_driver_files'].called) + self.assertFalse(self.mocks['load_drivers'].called) + +from driver_updates import finish, mark_finished, all_finished + +class FinishedTestCase(FileTestCaseBase): + def test_mark_finished(self): + """mark_finished: appends a line to /tmp/dd_finished""" + requeststr = "WOW SOMETHING OR OTHER" + mark_finished(requeststr, topdir=self.tmpdir) + finished = self.tmpdir+'/dd_finished' + self.assertTrue(os.path.exists(finished)) + self.assertEqual(read_lines(finished), [requeststr]) + + def test_all_finished(self): + """all_finished: True if all lines from dd_todo are in dd_finished""" + todo = self.tmpdir+'/dd_todo' + requests = ['one', 'two', 'final thingy'] + with open(todo, 'w') as outf: + outf.write(''.join(r+'\n' for r in requests)) + self.assertEqual(set(read_lines(todo)), set(requests)) + for r in reversed(requests): + self.assertFalse(all_finished(topdir=self.tmpdir)) + mark_finished(r, topdir=self.tmpdir) + self.assertTrue(all_finished(topdir=self.tmpdir)) + + def test_extra_finished(self): + """all_finished: True if dd_finished has more items than dd_todo""" + self.test_all_finished() + mark_finished("BONUS", topdir=self.tmpdir) + self.assertTrue(all_finished(topdir=self.tmpdir)) + + def test_finish(self): + """finish: mark request finished, and write dd.done if all complete""" + todo = 
self.tmpdir+'/dd_todo' + done = self.tmpdir+'/dd.done' + requests = ['one', 'two', 'final thingy'] + with open(todo, 'w') as outf: + outf.write(''.join(r+'\n' for r in requests)) + for r in reversed(requests): + print("marking %s" % r) + self.assertFalse(os.path.exists(done)) + finish(r, topdir=self.tmpdir) + self.assertTrue(os.path.exists(done)) + +from driver_updates import get_deviceinfo, DeviceInfo +blkid_out = b'''\ +DEVNAME=/dev/sda2 +UUID=0f21a3d1-dcd3-4ab4-a292-c5556850d561 +TYPE=ext4 + +DEVNAME=/dev/sda1 +UUID=C53C-EE46 +TYPE=vfat + +DEVNAME=/dev/sda3 +UUID=4126dbb6-c7d3-47b4-b1fc-9bb461df0067 +TYPE=btrfs + +DEVNAME=/dev/loop0 +UUID=6f16967e-0388-4276-bd8d-b88e5b217a55 +TYPE=ext4 +''' +disk_labels = { + '/dev/sdb1': 'metroid_srv', + '/dev/loop0': 'I\x20\u262d\x20COMMUNISM', + '/dev/sda3': 'metroid_root' +} +devicelist = [ + DeviceInfo(DEVNAME='/dev/sda2', TYPE='ext4', + UUID='0f21a3d1-dcd3-4ab4-a292-c5556850d561'), + DeviceInfo(DEVNAME='/dev/sda1', TYPE='vfat', + UUID='C53C-EE46'), + DeviceInfo(DEVNAME='/dev/sda3', TYPE='btrfs', LABEL='metroid_root', + UUID='4126dbb6-c7d3-47b4-b1fc-9bb461df0067'), + DeviceInfo(DEVNAME='/dev/loop0', TYPE='ext4', + LABEL='I\x20\u262d\x20COMMUNISM', + UUID='6f16967e-0388-4276-bd8d-b88e5b217a55'), +] +# also covers blkid, get_disk_labels, DeviceInfo +class DeviceInfoTestCase(unittest.TestCase): + @mock.patch('driver_updates.subprocess.check_output',return_value=blkid_out) + @mock.patch('driver_updates.get_disk_labels',return_value=disk_labels) + def test_basic(self, get_disk_labels, check_output): + """get_deviceinfo: parses DeviceInfo from blkid etc.""" + disks = get_deviceinfo() + self.assertEqual(len(disks), 4) + disks.sort(key=lambda d: d.device) + loop, efi, boot, root = disks + self.assertEqual(vars(boot), vars(devicelist[0])) + self.assertEqual(vars(efi), vars(devicelist[1])) + self.assertEqual(vars(root), vars(devicelist[2])) + self.assertEqual(vars(loop), vars(devicelist[3])) + + def test_shortdev(self): + d = 
DeviceInfo(DEVNAME="/dev/disk/by-label/OEMDRV") + with mock.patch("os.path.realpath", return_value="/dev/i2o/hdb"): + self.assertEqual(d.shortdev, "i2o/hdb") + +# TODO: test TextMenu itself + +# py2/3 compat +import sys +if sys.version_info.major == 3: + from io import StringIO +else: + from io import BytesIO as StringIO + +from driver_updates import device_menu +class DeviceMenuTestCase(unittest.TestCase): + def setUp(self): + patches = ( + mock.patch('driver_updates.get_deviceinfo',return_value=devicelist), + ) + self.mocks = {p.attribute:p.start() for p in patches} + for p in patches: self.addCleanup(p.stop) + + def test_device_menu_exit(self): + """device_menu: 'c' exits the menu""" + with mock.patch('driver_updates._input', side_effect=['c']): + dev = device_menu() + self.assertEqual(dev, []) + self.assertEqual(self.mocks['get_deviceinfo'].call_count, 1) + + def test_device_menu_refresh(self): + """device_menu: 'r' makes the menu refresh""" + with mock.patch('driver_updates._input', side_effect=['r','c']): + device_menu() + self.assertEqual(self.mocks['get_deviceinfo'].call_count, 2) + + @mock.patch("sys.stdout", new_callable=StringIO) + def test_device_menu(self, stdout): + """device_menu: choosing a number returns that Device""" + choose_num='2' + with mock.patch('driver_updates._input', return_value=choose_num): + result = device_menu() + # if you hit '2' you should get the corresponding device from the list + self.assertEqual(len(result), 1) + dev = result[0] + self.assertEqual(vars(dev), vars(devicelist[int(choose_num)-1])) + # find the corresponding line on-screen + screen = [l.strip() for l in stdout.getvalue().splitlines()] + match = [l for l in screen if l.startswith(choose_num+')')] + self.assertEqual(len(match), 1) + line = match.pop(0) + # the device name (at least) should be on this line + self.assertIn(os.path.basename(dev.device), line) diff --git a/tests/nosetests.sh b/tests/nosetests.sh index 6db3c3d..e0b1c1e 100755 --- a/tests/nosetests.sh 
+++ b/tests/nosetests.sh @@ -8,7 +8,7 @@ fi
# If no tests were selected, select all of them if [ $# -eq 0 ]; then - set -- "${top_srcdir}"/tests/*_tests "${top_srcdir}"/dracut/test_*.py + set -- "${top_srcdir}"/tests/*_tests fi
exec nosetests -v --exclude=logpicker -a !acceptance,!slow "$@"
anaconda-patches@lists.fedorahosted.org