Source code for conftest

#!/usr/bin/env python2
#   Author(s): Milan Falesnik <mfalesni@redhat.com>
#              James Laska <jlaska@redhat.com>
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#
#   Copyright (c) 2012 Red Hat, Inc. All rights reserved.
#
#   This copyrighted material is made available to anyone wishing
#   to use, modify, copy, or redistribute it subject to the terms
#   and conditions of the GNU General Public License version 2.
#
#   This program is distributed in the hope that it will be
#   useful, but WITHOUT ANY WARRANTY; without even the implied
#   warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
#   PURPOSE. See the GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public
#   License along with this program; if not, write to the Free
#   Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
#   Boston, MA 02110-1301, USA.
#
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

""" This module contains generator functions for variable injecting of py.test framework.

    Some of them are cached, some not. I don't know how to make multi-level dependency yet.
    Update 2013-08-19: Cahnged to newer styled fixtures. Dependency now should work O.K. :)
"""

import pytest
import os
import re
import subprocess
import common.shell
import common.yum
import common.rpm
import common.services

from ConfigParser import ConfigParser

try:
    import json
except ImportError:
    import simplejson as json

@pytest.fixture
[docs]def audreyvars(): """Setups variables for testing :returns: All Audrey-relevant environment variables. :rtype: dict """ result = {} for key in os.environ: if key.startswith("AUDREY_VAR_KATELLO_REGISTER_"): result[re.sub("^AUDREY_VAR_KATELLO_REGISTER_", "", key)] = os.environ[key] return result
@pytest.fixture
[docs]def katello_discoverable(request): """Returns boolean (True of False) to indicate whether the provided katello server is accessible :param request: py.test.request :returns: Accesibility of Katello server :rtype: ``bool`` """ cmd = "ping -q -c5 %s" % request.getfuncargvalue("audreyvars")["KATELLO_HOST"] print "# %s" % cmd return subprocess.call(cmd.split()) == 0
@pytest.fixture
[docs]def tunnel_requested(request): """Determines whether setting up SSH tunnel is requested :param request: py.test request. :returns: Whether was tunnel requested. :rtype: ``bool`` """ audreyvars = request.getfuncargvalue("audreyvars") ec2_deployment = request.getfuncargvalue("ec2_deployment") ssh = audreyvars.get("SSH_TUNNEL_ENABLED", "Auto") # Did we ask for a tunnel to be setup if ssh.lower() == "true": return True # Or if we are 'auto' and running in ec2 if ssh.lower() == "auto" and ec2_deployment: return True # Otherwise, it wasn't requested return False
@pytest.fixture
[docs]def system_groups(): """Determine applicable system groups for the current system :returns: All Audrey-relevant environment variables. :rtype: dict """ group_names = [] # A group for the current system platform (aka $basearch) group_names.append(common.yum.get_yum_variable('basearch')) # A group for the current system release (aka $releasever) # Per katello rules, replace any non-alpha-numeric character with a '_' group_names.append(re.sub(r'\W', '_', common.yum.get_yum_variable('releasever'))) # A group to indicate which provider the instance is deployed to if is_rhev_deployment(): group_names.append('provider_rhev') if is_vsphere_deployment(): group_names.append('provider_vsphere') if ec2_deployment(): group_names.append('provider_ec2') # Add a group name for the ec2 region (buf, rc) = common.shell.command('curl --fail http://169.254.169.254/latest/dynamic/instance-identity/document') if rc == 0: ec2_data = json.loads(buf) if ec2_data.has_key('region'): group_names.append('ec2-%s' % ec2_data.get('region')) return group_names
@pytest.fixture(scope="session")
[docs]def is_rhev_deployment(): """Setups cached variable whether it's RHEV deployment or not. :returns: Whether is this RHEV deployment (cached). :rtype: ``bool`` """ return common.rpm.package_installed('rhev-agent') or \ common.shell.command("grep -qi rhev /sys/class/virtio-ports/*/name")[1] == 0
@pytest.fixture(scope="session")
[docs]def is_vsphere_deployment(): """Setups cached variable whether it's vsphere deployment or not. :returns: Whether is this vsphere deployment (cached). :rtype: ``bool`` """ return common.rpm.package_installed('open-vm-tools') or \ common.shell.command("grep -qi vmware /sys/bus/scsi/devices/*/vendor")[1] == 0
@pytest.fixture(scope="session")
[docs]def ec2_deployment(): """Setups cached variable whether it's ec2 deployment or not. :returns: Whether is this EC2 deployment (cached). :rtype: ``bool`` """ # The --fail curl argument will cause curl to exit with rc=22 if a server # failure occurs (e.g. 403 or 404) cmd = 'curl --fail http://169.254.169.254/latest/dynamic/instance-identity/document' print "# %s" % cmd return subprocess.call(cmd.split()) == 0
@pytest.fixture
[docs]def subscription_manager_version(): """Setups cached variable of version of sub-man :returns: SM version from cache :rtype: 2-tuple """ sm_rpm_ver = common.shell.run("rpm -q --queryformat %{VERSION} subscription-manager") sm_ver_maj, sm_ver_min, sm_ver_rest = sm_rpm_ver.split(".", 2) return int(sm_ver_maj), int(sm_ver_min)
@pytest.fixture
[docs]def system_uuid(): """ Returns system UUID from subscription-manager :returns: System UUID :rtype: ``str`` """ facts = common.shell.run("subscription-manager facts --list") facts = facts.strip().split("\n") for fact in facts: name, value = fact.split(":", 1) if name == "system.uuid": return value.lstrip()
@pytest.fixture
[docs]def selinux_enabled(): """ Detects whether is SElinux enabled or not :returns: SElinux status :rtype: ``bool`` """ try: common.shell.run("selinuxenabled") return True except AssertionError: return False
@pytest.fixture
[docs]def selinux_getenforce(): """ Returns current enforcing mode of SELinux :returns: SElinux enforcing status :rtype: ``str`` """ return common.shell.run("/usr/sbin/getenforce").strip()
@pytest.fixture
[docs]def selinux_getenforce_conf(): """ Returns current enforcing mode of SELinux from config file :returns: SElinux enforcing status :rtype: ``str`` """ f = open("/etc/sysconfig/selinux", "r") lines = [] for line in f.readlines(): if line.startswith("SELINUX="): lines.append(line) f.close() # Check whether is only one assert len(lines) == 1 return lines[0].split("=")[1].strip()
@pytest.fixture
[docs]def selinux_type(): """ Returns current SELINUX type from config file :returns: SElinux type :rtype: ``str`` """ f = open("/etc/sysconfig/selinux", "r") lines = [] for line in f.readlines(): if line.startswith("SELINUXTYPE="): lines.append(line) f.close() # Check whether is only one assert len(lines) == 1 return lines[0].split("=")[1].strip()
@pytest.fixture
[docs]def rpm_package_list(): """ Returns list of all installed packages in computer. :returns: List of all installed packages in computer. :rtype: ``list`` """ raw = common.shell.run("rpm -qa").strip() return [x.strip() for x in raw.split("\n")]
@pytest.fixture
[docs]def rpm_package_list_names(): """ Returns list of all installed packages in computer. :returns: List of all installed packages in computer. :rtype: ``list`` """ raw = common.shell.run("rpm -qa --qf \"%{NAME} \"").strip() return raw.split(" ")
@pytest.fixture
[docs]def rhel_release(): """Returns RHEL version :returns: RHEL version :rtype: ``tuple`` """ redhat_release_content = common.shell.run("cat /etc/redhat-release").strip() redhat_version_field = redhat_release_content.split(" ")[6] class RedhatRelease(object): def __init__(self, major, minor, distro="RHEL"): self.major = int(major) self.minor = int(minor) self.distro = distro # Compatibility layer def __getitem__(self, position): if position == 0: return str(self.major) elif position == 1: return str(self.minor) else: raise KeyError("only 0 and 1 supported") #return tuple(redhat_version_field.split(".", 1)) return RedhatRelease(*redhat_version_field.split(".", 1))
@pytest.fixture
[docs]def PATH(): """ PATH environment variable :returns: List of directories in $PATH :rtype: ``list`` """ return os.environ["PATH"].split(":")
@pytest.fixture
[docs]def gpgcheck_enabled(): """ Whether is GPG check enabled in yum :returns: GPG check status :rtype: ``bool`` """ cfg = ConfigParser() cfg.read(["/etc/yum.conf"]) return int(cfg.get("main", "gpgcheck")) == 1
@pytest.fixture
[docs]def chkconfig_list(): """ Returns list of all services with enablement in each runlevel :returns: All services. :rtype: ``dict`` """ result = {} stdout = common.shell.run("chkconfig --list").strip() for line in stdout.split("\n"): line = re.sub("[[:blank:]]+", "\t", line) fields = line.split("\t") servicename = fields[0].strip() fields = fields[1:] result[servicename] = {} for field in fields: (runlevel, status) = field.split(":") runlevel = int(runlevel) if status.lower() == "on": status = True elif status.lower() == "off": status = False else: pytest.fail(msg="Bad parsing of chkconfig --list") result[servicename][runlevel] = status return result
@pytest.fixture
[docs]def service_check(): """ Produces service-checking fixture """ class ServiceChecker(object): def __init__(self, services): self.services = services def __call__(self, service, runlevel, active=True): return common.services.service_active_in_runlevel(self.services, service, runlevel, active) return ServiceChecker(chkconfig_list())
@pytest.fixture
[docs]def is_systemd(): """ Checks for systemd presence """ try: common.rpm.q("systemd") return True except AssertionError: return False