From 316ff801ecb9e3e77b092ff8e6d2313463c5f784 Mon Sep 17 00:00:00 2001
From: Sudhir Menon <sumenon@redhat.com>
Date: Wed, 23 Sep 2020 13:28:44 +0530
Subject: [PATCH] ipatests: ipa-healthcheck test fixes running on RHEL

1. Added function in tasks.py to get healthcheck version.
2. Added if else condition to certain tests to
check healthcheck version and then assert the expected test output

Signed-off-by: Sudhir Menon <sumenon@redhat.com>
---
 ipatests/pytest_ipa/integration/tasks.py      |  22 +++
 .../test_integration/test_ipahealthcheck.py   | 126 ++++++++++++++----
 2 files changed, 125 insertions(+), 23 deletions(-)

diff --git a/ipatests/pytest_ipa/integration/tasks.py b/ipatests/pytest_ipa/integration/tasks.py
index fceac1b628..0690e7f6cb 100755
--- a/ipatests/pytest_ipa/integration/tasks.py
+++ b/ipatests/pytest_ipa/integration/tasks.py
@@ -2345,6 +2345,28 @@ def get_sssd_version(host):
     return parse_version(version)
 
 
+def get_healthcheck_version(host):
+    """
+    Function to get healthcheck version on fedora and rhel
+    """
+    platform = get_platform(host)
+    if platform in ("rhel", "fedora"):
+        cmd = host.run_command(
+            ["rpm", "-qa", "--qf", "%{VERSION}", "*ipa-healthcheck"]
+        )
+        healthcheck_version = cmd.stdout_text
+        if not healthcheck_version:
+            raise ValueError(
+                "get_healthcheck_version: "
+                "ipa-healthcheck package is not installed"
+            )
+    else:
+        raise ValueError(
+            "get_healthcheck_version: unknown platform %s" % platform
+        )
+    return healthcheck_version
+
+
 def run_ssh_cmd(
     from_host=None, to_host=None, username=None, cmd=None,
     auth_method=None, password=None, private_key_path=None,
diff --git a/ipatests/test_integration/test_ipahealthcheck.py b/ipatests/test_integration/test_ipahealthcheck.py
index f0ac5c978a..561777efee 100644
--- a/ipatests/test_integration/test_ipahealthcheck.py
+++ b/ipatests/test_integration/test_ipahealthcheck.py
@@ -23,6 +23,7 @@
 from ipaplatform.paths import paths
 from ipaplatform.osinfo import osinfo
 from ipatests.test_integration.base import IntegrationTest
+from pkg_resources import parse_version
 from ipatests.test_integration.test_cert import get_certmonger_fs_id
 from ipatests.test_integration.test_external_ca import (
     install_server_external_ca_step1,
@@ -44,6 +45,22 @@
 
 sources = [
     "ipahealthcheck.dogtag.ca",
+    "ipahealthcheck.ds.replication",
+    "ipahealthcheck.ipa.certs",
+    "ipahealthcheck.ipa.dna",
+    "ipahealthcheck.ipa.idns",
+    "ipahealthcheck.ipa.files",
+    "ipahealthcheck.ipa.host",
+    "ipahealthcheck.ipa.roles",
+    "ipahealthcheck.ipa.topology",
+    "ipahealthcheck.ipa.trust",
+    "ipahealthcheck.ipa.meta",
+    "ipahealthcheck.meta.core",
+    "ipahealthcheck.meta.services",
+    "ipahealthcheck.system.filesystemspace",
+]
+
+sources_0_4 = [
     "ipahealthcheck.ds.replication",
     "ipahealthcheck.dogtag.ca",
     "ipahealthcheck.ipa.certs",
@@ -55,6 +72,8 @@
     "ipahealthcheck.ipa.topology",
     "ipahealthcheck.ipa.trust",
     "ipahealthcheck.meta.services",
+    "ipahealthcheck.meta.core",
+    "ipahealthcheck.system.filesystemspace",
 ]
 
 ipa_cert_checks = [
@@ -103,6 +122,7 @@
 dogtag_checks = ["DogtagCertsConfigCheck", "DogtagCertsConnectivityCheck"]
 iparoles_checks = ["IPACRLManagerCheck", "IPARenewalMasterCheck"]
 replication_checks = ["ReplicationCheck"]
+replication_checks_0_4 = ["ReplicationConflictCheck"]
 ruv_checks = ["RUVCheck"]
 dna_checks = ["IPADNARangeCheck"]
 idns_checks = ["IPADNSSystemRecordsCheck"]
@@ -231,8 +251,13 @@ def test_run_ipahealthcheck_list_source(self):
         """
         Testcase to verify sources available in healthcheck tool.
         """
+        version = tasks.get_healthcheck_version(self.master)
         result = self.master.run_command(["ipa-healthcheck", "--list-sources"])
-        for source in sources:
+        if parse_version(version) >= parse_version("0.6"):
+            sources_avail = sources
+        else:
+            sources_avail = sources_0_4
+        for source in sources_avail:
             assert source in result.stdout_text
 
     def test_human_output(self, restart_service):
@@ -271,10 +296,15 @@ def test_replication_check_exists(self):
         Testcase to verify checks available in
         ipahealthcheck.ds.replication source
         """
+        version = tasks.get_healthcheck_version(self.master)
         result = self.master.run_command(
             ["ipa-healthcheck", "--source", "ipahealthcheck.ds.replication"]
         )
-        for check in replication_checks:
+        if parse_version(version) >= parse_version("0.6"):
+            checks = replication_checks
+        else:
+            checks = replication_checks_0_4
+        for check in checks:
             assert check in result.stdout_text
 
     def test_ipa_cert_check_exists(self):
@@ -663,6 +693,9 @@ def test_ipa_healthcheck_revocation(self):
         error_msg = (
             "Certificate tracked by {key} is revoked {revocation_reason}"
         )
+        error_msg_0_4 = (
+            "Certificate is revoked, unspecified"
+        )
 
         result = self.master.run_command(
             ["getcert", "list", "-f", paths.HTTPD_CERT_FILE]
@@ -685,11 +718,18 @@ def test_ipa_healthcheck_revocation(self):
         assert returncode == 1
         assert len(data) == 12
 
+        version = tasks.get_healthcheck_version(self.master)
         for check in data:
             if check["kw"]["key"] == request_id:
                 assert check["result"] == "ERROR"
                 assert check["kw"]["revocation_reason"] == "unspecified"
-                assert check["kw"]["msg"] == error_msg
+                if (parse_version(version) >= parse_version('0.6')):
+                    assert check["kw"]["msg"] == error_msg
+                else:
+                    assert (
+                        check["kw"]["msg"]
+                        == error_msg_0_4
+                    )
             else:
                 assert check["result"] == "SUCCESS"
 
@@ -735,14 +775,24 @@ def test_run_with_stopped_master(self, ipactl):
         contains only errors regarding master being stopped and no other false
         positives.
         """
-        returncode, output = run_healthcheck(
-            self.master,
-            "ipahealthcheck.meta",
-            output_type="human",
-            failures_only=True)
+        version = tasks.get_healthcheck_version(self.master)
+        if (parse_version(version) >= parse_version('0.6')):
+            returncode, output = run_healthcheck(
+                self.master,
+                "ipahealthcheck.meta",
+                output_type="human",
+                failures_only=True,
+            )
+        else:
+            returncode, output = run_healthcheck(
+                self.master,
+                "ipahealthcheck.meta.services",
+                output_type="human",
+                failures_only=True,
+            )
         assert returncode == 1
         errors = re.findall("ERROR: .*: not running", output)
-        assert len(errors) == len(output.split('\n'))
+        assert len(errors) == len(output.split("\n"))
 
     @pytest.fixture
     def move_ipa_ca_crt(self):
@@ -762,6 +812,7 @@ def test_chainexpiration_check_without_cert(self, move_ipa_ca_crt):
         Testcase checks that ERROR message is displayed
         when ipa ca crt file is not renamed
         """
+        version = tasks.get_healthcheck_version(self.master)
         error_text = (
             "[Errno 2] No such file or directory: '{}'"
             .format(paths.IPA_CA_CRT)
@@ -769,6 +820,10 @@ def test_chainexpiration_check_without_cert(self, move_ipa_ca_crt):
         msg_text = (
             "Error opening IPA CA chain at {key}: {error}"
         )
+        error_4_0_text = (
+            "[Errno 2] No such file or directory: '{}'"
+            .format(paths.IPA_CA_CRT)
+        )
         returncode, data = run_healthcheck(
             self.master,
             "ipahealthcheck.ipa.certs",
@@ -778,8 +833,11 @@ def test_chainexpiration_check_without_cert(self, move_ipa_ca_crt):
         for check in data:
             assert check["result"] == "ERROR"
             assert check["kw"]["key"] == paths.IPA_CA_CRT
-            assert check["kw"]["error"] == error_text
-            assert check["kw"]["msg"] == msg_text
+            if parse_version(version) >= parse_version("0.6"):
+                assert check["kw"]["error"] == error_text
+                assert check["kw"]["msg"] == msg_text
+            else:
+                assert error_4_0_text in check["kw"]["msg"]
 
     @pytest.fixture
     def modify_cert_trust_attr(self):
@@ -814,10 +872,15 @@ def test_ipacertnsstrust_check(self, modify_cert_trust_attr):
         Test for IPACertNSSTrust when trust attribute is modified
         for Server-Cert
         """
+        version = tasks.get_healthcheck_version(self.master)
         error_msg = (
             "Incorrect NSS trust for {nickname} in {dbdir}. "
             "Got {got} expected {expected}."
         )
+        error_msg_4_0 = (
+            "Incorrect NSS trust for Server-Cert cert-pki-ca. "
+            "Got CTu,u,u expected u,u,u"
+        )
         returncode, data = run_healthcheck(
             self.master, "ipahealthcheck.ipa.certs", "IPACertNSSTrust",
         )
@@ -828,7 +891,10 @@ def test_ipacertnsstrust_check(self, modify_cert_trust_attr):
                 assert check["kw"]["expected"] == "u,u,u"
                 assert check["kw"]["got"] == "CTu,u,u"
                 assert check["kw"]["dbdir"] == paths.PKI_TOMCAT_ALIAS_DIR
-                assert check["kw"]["msg"] == error_msg
+                if (parse_version(version) >= parse_version('0.6')):
+                    assert check["kw"]["msg"] == error_msg
+                else:
+                    assert check["kw"]["msg"] == error_msg_4_0
 
     def test_ipa_healthcheck_expiring(self, restart_service):
         """
@@ -1383,6 +1449,7 @@ def test_tomcat_filecheck_too_restrictive(self, modify_permissions):
             )
 
     def test_tomcat_filecheck_too_permissive(self, modify_permissions):
+        version = tasks.get_healthcheck_version(self.master)
         modify_permissions(self.master, path=paths.CA_CS_CFG_PATH,
                            mode="0666")
         returncode, data = run_healthcheck(
@@ -1399,13 +1466,20 @@ def test_tomcat_filecheck_too_permissive(self, modify_permissions):
             assert check["kw"]["type"] == 'mode'
             assert check["kw"]["expected"] == '0660'
             assert check["kw"]["got"] == '0666'
-            assert (
-                check["kw"]["msg"]
-                == "Permissions of %s are too permissive: "
-                   "0666 and should be 0660"
-                % check["kw"]["path"]
-            )
-
+            if (parse_version(version) >= parse_version('0.5')):
+                assert (
+                    check["kw"]["msg"]
+                    == "Permissions of %s are too permissive: "
+                       "0666 and should be 0660"
+                    % check["kw"]["path"]
+                )
+            else:
+                assert (
+                    check["kw"]["msg"]
+                    == "Permissions of %s are 0666 and should "
+                       "be 0660"
+                    % check["kw"]["path"]
+                )
 
 class TestIpaHealthCheckFilesystemSpace(IntegrationTest):
     """
@@ -1693,13 +1767,19 @@ def test_ipahealthcheck_certmongerca(self, getcert_ca):
             self.master, "ipahealthcheck.ipa.certs", "IPACertmongerCA",
         )
         assert returncode == 1
+        version = tasks.get_healthcheck_version(self.master)
         for check in data:
             if check["kw"]["key"] == "dogtag-ipa-ca-renew-agent":
                 assert check["result"] == "ERROR"
-                assert (
-                    check["kw"]["msg"]
-                    == "Certmonger CA '{key}' missing"
-                )
+                if (parse_version(version) >= parse_version('0.6')):
+                    assert (
+                        check["kw"]["msg"] == "Certmonger CA '{key}' missing"
+                    )
+                else:
+                    assert (
+                        check["kw"]["msg"]
+                        == "Certmonger CA 'dogtag-ipa-ca-renew-agent' missing"
+                    )
 
     @pytest.fixture()
     def rename_httpd_cert(self):
