This is an automated email from the git hooks/post-receive script.
firstyear pushed a commit to branch master
in repository 389-ds-base.
commit 9e4b381530181025d8b198b05db13c6431155eae
Author: William Brown <firstyear(a)redhat.com>
Date: Fri Nov 10 14:10:44 2017 +1000
Ticket 3 lib389 - python 3 compat for paged results test
Bug Description: python 3 support for paged results test
Fix Description: Rework paged results to use lib389 types. Add
sort controls to lib389. Fix rawaci on entry to return bytes. Add
support for organisation objects. Improve mappingtree to support
parent additions.
https://pagure.io/lib389/issue/3
Author: wibrown
Review by: spichugi (Thanks!)
---
.../suites/paged_results/paged_results_test.py | 557 ++++++++-------------
.../tests/suites/paged_results/sss_control.py | 123 -----
src/lib389/lib389/_controls.py | 107 +++-
src/lib389/lib389/_entry.py | 2 +-
src/lib389/lib389/backend.py | 4 +
src/lib389/lib389/idm/organization.py | 62 +++
src/lib389/lib389/idm/user.py | 5 +-
src/lib389/lib389/mappingTree.py | 14 +
8 files changed, 393 insertions(+), 481 deletions(-)
diff --git a/dirsrvtests/tests/suites/paged_results/paged_results_test.py
b/dirsrvtests/tests/suites/paged_results/paged_results_test.py
index ae2627b..49fb85d 100644
--- a/dirsrvtests/tests/suites/paged_results/paged_results_test.py
+++ b/dirsrvtests/tests/suites/paged_results/paged_results_test.py
@@ -15,7 +15,14 @@ from lib389.utils import *
from lib389.topologies import topology_st
from lib389._constants import DN_LDBM, DN_DM, DEFAULT_SUFFIX, BACKEND_NAME, PASSWORD
-from sss_control import SSSRequestControl
+from lib389._controls import SSSRequestControl
+
+from lib389.idm.user import UserAccounts
+from lib389.idm.organization import Organisation
+from lib389.idm.organisationalunit import OrganisationalUnit
+from lib389.backend import Backends
+
+from lib389._mapped_object import DSLdapObject
DEBUGGING = os.getenv('DEBUGGING', False)
@@ -26,9 +33,8 @@ else:
log = logging.getLogger(__name__)
-TEST_USER_NAME = 'simplepaged_test'
-TEST_USER_DN = 'uid={},{}'.format(TEST_USER_NAME, DEFAULT_SUFFIX)
TEST_USER_PWD = 'simplepaged_test'
+
NEW_SUFFIX_1_NAME = 'test_parent'
NEW_SUFFIX_1 = 'o={}'.format(NEW_SUFFIX_1_NAME)
NEW_SUFFIX_2_NAME = 'child'
@@ -41,29 +47,30 @@ NEW_BACKEND_2 = 'child_base'
def test_user(topology_st, request):
"""User for binding operation"""
- log.info('Adding user {}'.format(TEST_USER_DN))
- try:
- topology_st.standalone.add_s(Entry((TEST_USER_DN, {
- 'objectclass': 'top person'.split(),
- 'objectclass': 'organizationalPerson',
- 'objectclass': 'inetorgperson',
- 'cn': TEST_USER_NAME,
- 'sn': TEST_USER_NAME,
- 'userpassword': TEST_USER_PWD,
- 'mail': '%s(a)redhat.com' % TEST_USER_NAME,
- 'uid': TEST_USER_NAME
- })))
- except ldap.LDAPError as e:
- log.error('Failed to add user (%s): error (%s)' % (TEST_USER_DN,
- e.message['desc']))
- raise e
+ log.info('Adding user simplepaged_test')
+
+ users = UserAccounts(topology_st.standalone, DEFAULT_SUFFIX)
+ user = users.create(properties={
+ 'uid': 'simplepaged_test',
+ 'cn': 'simplepaged_test',
+ 'sn': 'simplepaged_test',
+ 'uidNumber': '1234',
+ 'gidNumber': '1234',
+ 'homeDirectory': '/home/simplepaged_test',
+ 'userPassword': TEST_USER_PWD,
+ })
+
+ # Now add the ACI so simplepage_test can read the users ...
+ ACI_BODY = ensure_bytes('(targetattr= "uid || sn || dn")(version 3.0;
acl "Allow read for user"; allow (read,search,compare) userdn =
"ldap:///all";)')
+ topology_st.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_REPLACE, 'aci',
ACI_BODY)])
def fin():
- log.info('Deleting user {}'.format(TEST_USER_DN))
- topology_st.standalone.delete_s(TEST_USER_DN)
+ log.info('Deleting user simplepaged_test')
+ user.delete()
request.addfinalizer(fin)
+ return user
@pytest.fixture(scope="module")
def new_suffixes(topology_st):
@@ -72,47 +79,40 @@ def new_suffixes(topology_st):
"""
log.info('Adding suffix:{} and backend: {}'.format(NEW_SUFFIX_1,
NEW_BACKEND_1))
- topology_st.standalone.backend.create(NEW_SUFFIX_1,
- {BACKEND_NAME: NEW_BACKEND_1})
- topology_st.standalone.mappingtree.create(NEW_SUFFIX_1,
- bename=NEW_BACKEND_1)
- try:
- topology_st.standalone.add_s(Entry((NEW_SUFFIX_1, {
- 'objectclass': 'top',
- 'objectclass': 'organization',
- 'o': NEW_SUFFIX_1_NAME
- })))
- except ldap.LDAPError as e:
- log.error('Failed to add suffix ({}): error ({})'.format(NEW_SUFFIX_1,
-
e.message['desc']))
- raise
-
- log.info('Adding suffix:{} and backend: {}'.format(NEW_SUFFIX_2,
NEW_BACKEND_2))
- topology_st.standalone.backend.create(NEW_SUFFIX_2,
- {BACKEND_NAME: NEW_BACKEND_2})
- topology_st.standalone.mappingtree.create(NEW_SUFFIX_2,
- bename=NEW_BACKEND_2,
- parent=NEW_SUFFIX_1)
- try:
- topology_st.standalone.add_s(Entry((NEW_SUFFIX_2, {
- 'objectclass': 'top',
- 'objectclass': 'organizationalunit',
- 'ou': NEW_SUFFIX_2_NAME
- })))
- except ldap.LDAPError as e:
- log.error('Failed to add suffix ({}): error ({})'.format(NEW_SUFFIX_2,
-
e.message['desc']))
- raise
+ bes = Backends(topology_st.standalone)
+ be_1 = bes.create(properties={
+ 'cn': 'NEW_BACKEND_1',
+ 'nsslapd-suffix': NEW_SUFFIX_1,
+ })
+ # Create the root objects with their ACI
log.info('Adding ACI to allow our test user to search')
ACI_TARGET = '(targetattr != "userPassword || aci")'
ACI_ALLOW = '(version 3.0; acl "Enable anonymous access";allow (read,
search, compare)'
ACI_SUBJECT = '(userdn = "ldap:///anyone");)'
ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
- mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
- topology_st.standalone.modify_s(NEW_SUFFIX_1, mod)
+ o_1 = Organisation(topology_st.standalone, NEW_SUFFIX_1)
+ o_1.create(properties={
+ 'o': NEW_SUFFIX_1_NAME,
+ 'aci': ACI_BODY,
+ })
+
+ log.info('Adding suffix:{} and backend: {}'.format(NEW_SUFFIX_2,
NEW_BACKEND_2))
+ be_2 = bes.create(properties={
+ 'cn': 'NEW_BACKEND_2',
+ 'nsslapd-suffix': NEW_SUFFIX_2,
+ })
+
+ # We have to adjust the MT to say that BE_1 is a parent.
+ mt = be_2.get_mapping_tree()
+ mt.set_parent(NEW_SUFFIX_1)
+
+ ou_2 = OrganisationalUnit(topology_st.standalone, NEW_SUFFIX_2)
+ ou_2.create(properties={
+ 'ou': NEW_SUFFIX_2_NAME
+ })
def add_users(topology_st, users_num, suffix):
@@ -122,40 +122,33 @@ def add_users(topology_st, users_num, suffix):
"""
users_list = []
+ users = UserAccounts(topology_st.standalone, suffix, rdn=None)
+
log.info('Adding %d users' % users_num)
for num in sample(range(1000), users_num):
num_ran = int(round(num))
USER_NAME = 'test%05d' % num_ran
- USER_DN = 'uid=%s,%s' % (USER_NAME, suffix)
- users_list.append(USER_DN)
- try:
- topology_st.standalone.add_s(Entry((USER_DN, {
- 'objectclass': 'top person'.split(),
- 'objectclass': 'organizationalPerson',
- 'objectclass': 'inetorgperson',
- 'cn': USER_NAME,
- 'sn': USER_NAME,
- 'userpassword': 'pass%s' % num_ran,
- 'mail': '%s(a)redhat.com' % USER_NAME,
- 'uid': USER_NAME})))
- except ldap.LDAPError as e:
- log.error('Failed to add user (%s): error (%s)' % (USER_DN,
-
e.message['desc']))
- raise e
+
+ user = users.create(properties={
+ 'uid': USER_NAME,
+ 'sn': USER_NAME,
+ 'cn': USER_NAME,
+ 'uidNumber': '%s' % num_ran,
+ 'gidNumber': '%s' % num_ran,
+ 'homeDirectory': '/home/%s' % USER_NAME,
+ 'mail': '%s(a)redhat.com' % USER_NAME,
+ 'userpassword': 'pass%s' % num_ran,
+ })
+ users_list.append(user)
return users_list
-def del_users(topology_st, users_list):
+def del_users(users_list):
"""Delete users with DNs from given list"""
log.info('Deleting %d users' % len(users_list))
- for user_dn in users_list:
- try:
- topology_st.standalone.delete_s(user_dn)
- except ldap.LDAPError as e:
- log.error('Failed to delete user (%s): error (%s)' % (user_dn,
-
e.message['desc']))
- raise e
+ for user in users_list:
+ user.delete()
def change_conf_attr(topology_st, suffix, attr_name, attr_value):
@@ -164,30 +157,19 @@ def change_conf_attr(topology_st, suffix, attr_name, attr_value):
Returns previous attribute value.
"""
- try:
- entries = topology_st.standalone.search_s(suffix, ldap.SCOPE_BASE,
- 'objectclass=top',
- [attr_name])
- attr_value_bck = entries[0].data.get(attr_name)
- log.info('Set %s to %s. Previous value - %s. Modified suffix - %s.' % (
- attr_name, attr_value, attr_value_bck, suffix))
- if attr_value is None:
- topology_st.standalone.modify_s(suffix, [(ldap.MOD_DELETE,
- attr_name,
- attr_value)])
- else:
- topology_st.standalone.modify_s(suffix, [(ldap.MOD_REPLACE,
- attr_name,
- attr_value)])
- except ldap.LDAPError as e:
- log.error('Failed to change attr value (%s): error (%s)' % (attr_name,
-
e.message['desc']))
- raise e
+ entry = DSLdapObject(topology_st.standalone, suffix)
+ attr_value_bck = entry.get_attr_val_bytes(attr_name)
+ log.info('Set %s to %s. Previous value - %s. Modified suffix - %s.' % (
+ attr_name, attr_value, attr_value_bck, suffix))
+ if attr_value is None:
+ entry.remove_all(attr_name)
+ else:
+ entry.replace(attr_name, attr_value)
return attr_value_bck
-def paged_search(topology_st, suffix, controls, search_flt, searchreq_attrlist):
+def paged_search(conn, suffix, controls, search_flt, searchreq_attrlist):
"""Search at the DEFAULT_SUFFIX with ldap.SCOPE_SUBTREE
using Simple Paged Control(should the first item in the
list controls.
@@ -206,14 +188,10 @@ def paged_search(topology_st, suffix, controls, search_flt,
searchreq_attrlist):
searchreq_attrlist,
req_pr_ctrl.size,
str(controls)))
- msgid = topology_st.standalone.search_ext(suffix,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=controls)
+ msgid = conn.search_ext(suffix, ldap.SCOPE_SUBTREE, search_flt, searchreq_attrlist,
serverctrls=controls)
while True:
log.info('Getting page %d' % (pages,))
- rtype, rdata, rmsgid, rctrls = topology_st.standalone.result3(msgid)
+ rtype, rdata, rmsgid, rctrls = conn.result3(msgid)
log.debug('Data: {}'.format(rdata))
all_results.extend(rdata)
pages += 1
@@ -228,11 +206,7 @@ def paged_search(topology_st, suffix, controls, search_flt,
searchreq_attrlist):
# Copy cookie from response control to request control
log.debug('Cookie: {}'.format(pctrls[0].cookie))
req_pr_ctrl.cookie = pctrls[0].cookie
- msgid = topology_st.standalone.search_ext(suffix,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=controls)
+ msgid = conn.search_ext(suffix, ldap.SCOPE_SUBTREE, search_flt,
searchreq_attrlist, serverctrls=controls)
else:
break # No more pages available
else:
@@ -242,8 +216,7 @@ def paged_search(topology_st, suffix, controls, search_flt,
searchreq_attrlist):
return all_results
-(a)pytest.mark.parametrize("page_size,users_num",
- [(6, 5), (5, 5), (5, 25)])
+(a)pytest.mark.parametrize("page_size,users_num", [(6, 5), (5, 5), (5, 25)])
def test_search_success(topology_st, test_user, page_size, users_num):
"""Verify that search with a simple paged results control
returns all entries it should without errors.
@@ -264,21 +237,16 @@ def test_search_success(topology_st, test_user, page_size,
users_num):
search_flt = r'(uid=test*)'
searchreq_attrlist = ['dn', 'sn']
- try:
- log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ log.info('Set user bind %s ' % test_user)
+ conn = test_user.bind(TEST_USER_PWD)
- req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
+ req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
+ all_results = paged_search(conn, DEFAULT_SUFFIX, [req_ctrl], search_flt,
searchreq_attrlist)
- all_results = paged_search(topology_st, DEFAULT_SUFFIX, [req_ctrl],
- search_flt, searchreq_attrlist)
+ log.info('%d results' % len(all_results))
+ assert len(all_results) == len(users_list)
- log.info('%d results' % len(all_results))
- assert len(all_results) == len(users_list)
- finally:
- log.info('Set Directory Manager bind back (test_search_success)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
+ del_users(users_list)
@pytest.mark.parametrize("page_size,users_num,suffix,attr_name,attr_value,expected_err",
[
@@ -321,7 +289,7 @@ def test_search_limits_fail(topology_st, test_user, page_size,
users_num,
try:
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
@@ -330,11 +298,8 @@ def test_search_limits_fail(topology_st, test_user, page_size,
users_num,
sort_ctrl = SSSRequestControl(True, ['sn'])
controls.append(sort_ctrl)
log.info('Initiate ldapsearch with created control instance')
- msgid = topology_st.standalone.search_ext(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=controls)
+ msgid = conn.search_ext(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
+ search_flt, searchreq_attrlist, serverctrls=controls)
time_val = conf_param_dict.get('nsslapd-timelimit')
if time_val:
@@ -347,10 +312,10 @@ def test_search_limits_fail(topology_st, test_user, page_size,
users_num,
log.info('Getting page %d' % (pages,))
if pages == 0 and (time_val or attr_name in
('nsslapd-lookthroughlimit',
'nsslapd-pagesizelimit')):
- rtype, rdata, rmsgid, rctrls = topology_st.standalone.result3(msgid)
+ rtype, rdata, rmsgid, rctrls = conn.result3(msgid)
else:
with pytest.raises(expected_err):
- rtype, rdata, rmsgid, rctrls = topology_st.standalone.result3(msgid)
+ rtype, rdata, rmsgid, rctrls = conn.result3(msgid)
all_results.extend(rdata)
pages += 1
pctrls = [
@@ -363,19 +328,14 @@ def test_search_limits_fail(topology_st, test_user, page_size,
users_num,
if pctrls[0].cookie:
# Copy cookie from response control to request control
req_ctrl.cookie = pctrls[0].cookie
- msgid = topology_st.standalone.search_ext(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=controls)
+ msgid = conn.search_ext(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
+ search_flt, searchreq_attrlist,
serverctrls=controls)
else:
break # No more pages available
else:
break
finally:
- log.info('Set Directory Manager bind back (test_search_limits_fail)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
+ del_users(users_list)
change_conf_attr(topology_st, suffix, attr_name, attr_value_bck)
@@ -403,8 +363,7 @@ def test_search_sort_success(topology_st, test_user):
searchreq_attrlist = ['dn', 'sn']
try:
- log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
sort_ctrl = SSSRequestControl(True, ['sn'])
@@ -412,18 +371,17 @@ def test_search_sort_success(topology_st, test_user):
log.info('Initiate ldapsearch with created control instance')
log.info('Collect data with sorting')
controls = [req_ctrl, sort_ctrl]
- results_sorted = paged_search(topology_st, DEFAULT_SUFFIX, controls,
+ results_sorted = paged_search(conn, DEFAULT_SUFFIX, controls,
search_flt, searchreq_attrlist)
log.info('Substring numbers from user DNs')
- r_nums = map(lambda x: int(x[0][8:13]), results_sorted)
+ # r_nums = map(lambda x: int(x[0][8:13]), results_sorted)
+ r_nums = [int(x[0][8:13]) for x in results_sorted]
log.info('Assert that list is sorted')
assert all(r_nums[i] <= r_nums[i + 1] for i in range(len(r_nums) - 1))
finally:
- log.info('Set Directory Manager bind back (test_search_sort_success)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
+ del_users(users_list)
def test_search_abandon(topology_st, test_user):
@@ -452,28 +410,23 @@ def test_search_abandon(topology_st, test_user):
try:
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
controls = [req_ctrl]
log.info('Initiate a search with a paged results control')
- msgid = topology_st.standalone.search_ext(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=controls)
+ msgid = conn.search_ext(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
+ search_flt, searchreq_attrlist, serverctrls=controls)
log.info('Abandon the search')
- topology_st.standalone.abandon(msgid)
+ conn.abandon(msgid)
log.info('Expect an ldap.TIMEOUT exception, while trying to get the search
results')
with pytest.raises(ldap.TIMEOUT):
- topology_st.standalone.result3(msgid, timeout=5)
+ conn.result3(msgid, timeout=5)
finally:
- log.info('Set Directory Manager bind back (test_search_abandon)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
+ del_users(users_list)
def test_search_with_timelimit(topology_st, test_user):
@@ -506,7 +459,7 @@ def test_search_with_timelimit(topology_st, test_user):
try:
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
@@ -514,18 +467,14 @@ def test_search_with_timelimit(topology_st, test_user):
for ii in range(3):
log.info('Iteration %d' % ii)
- msgid = topology_st.standalone.search_ext(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=controls,
- timeout=timelimit)
+ msgid = conn.search_ext(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, search_flt,
+ searchreq_attrlist, serverctrls=controls,
timeout=timelimit)
pages = 0
pctrls = []
while True:
log.info('Getting page %d' % (pages,))
- rtype, rdata, rmsgid, rctrls = topology_st.standalone.result3(msgid)
+ rtype, rdata, rmsgid, rctrls = conn.result3(msgid)
pages += 1
pctrls = [
c
@@ -537,12 +486,8 @@ def test_search_with_timelimit(topology_st, test_user):
if pctrls[0].cookie:
# Copy cookie from response control to request control
req_ctrl.cookie = pctrls[0].cookie
- msgid = topology_st.standalone.search_ext(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=controls,
- timeout=timelimit)
+ msgid = conn.search_ext(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
search_flt,
+ searchreq_attrlist, serverctrls=controls,
timeout=timelimit)
else:
log.info('Done with this search - sleeping %d seconds' %
(
timelimit * 2))
@@ -551,9 +496,7 @@ def test_search_with_timelimit(topology_st, test_user):
else:
break
finally:
- log.info('Set Directory Manager bind back (test_search_with_timelimit)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
+ del_users(users_list)
@pytest.mark.parametrize('aci_subject',
@@ -599,17 +542,10 @@ def test_search_dns_ip_aci(topology_st, test_user, aci_subject):
ACI_TARGET = '(targetattr != "userPassword")'
ACI_ALLOW = '(version 3.0;acl "Anonymous access within domain";
allow (read,compare,search)'
ACI_SUBJECT = '(userdn = "ldap:///anyone") and (%s);)' %
aci_subject
- ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
- try:
- topology_st.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_REPLACE,
- 'aci',
- ACI_BODY)])
- except ldap.LDAPError as e:
- log.fatal('Failed to add ACI: error (%s)' %
(e.message['desc']))
- raise e
-
+ ACI_BODY = ensure_bytes(ACI_TARGET + ACI_ALLOW + ACI_SUBJECT)
+ topology_st.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_REPLACE,
'aci', ACI_BODY)])
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
@@ -618,24 +554,18 @@ def test_search_dns_ip_aci(topology_st, test_user, aci_subject):
log.info('Initiate three searches with a paged results control')
for ii in range(3):
log.info('%d search' % (ii + 1))
- all_results = paged_search(topology_st, DEFAULT_SUFFIX, controls,
+ all_results = paged_search(conn, DEFAULT_SUFFIX, controls,
search_flt, searchreq_attrlist)
log.info('%d results' % len(all_results))
assert len(all_results) == len(users_list)
log.info('If we are here, then no error has happened. We are good.')
finally:
- log.info('Set Directory Manager bind back (test_search_dns_ip_aci)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
log.info('Restore ACI')
- topology_st.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_DELETE,
- 'aci',
- None)])
+ topology_st.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_DELETE, 'aci',
None)])
for aci in acis_bck:
- topology_st.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_ADD,
- 'aci',
- aci.getRawAci())])
- del_users(topology_st, users_list)
+ topology_st.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_ADD,
'aci', aci.getRawAci())])
+ del_users(users_list)
def test_search_multiple_paging(topology_st, test_user):
@@ -665,7 +595,7 @@ def test_search_multiple_paging(topology_st, test_user):
try:
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
@@ -673,12 +603,9 @@ def test_search_multiple_paging(topology_st, test_user):
for ii in range(3):
log.info('Iteration %d' % ii)
- msgid = topology_st.standalone.search_ext(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=controls)
- rtype, rdata, rmsgid, rctrls = topology_st.standalone.result3(msgid)
+ msgid = conn.search_ext(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
+ search_flt, searchreq_attrlist,
serverctrls=controls)
+ rtype, rdata, rmsgid, rctrls = conn.result3(msgid)
pctrls = [
c
for c in rctrls
@@ -687,15 +614,10 @@ def test_search_multiple_paging(topology_st, test_user):
# Copy cookie from response control to request control
req_ctrl.cookie = pctrls[0].cookie
- msgid = topology_st.standalone.search_ext(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=controls)
+ msgid = conn.search_ext(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
+ search_flt, searchreq_attrlist,
serverctrls=controls)
finally:
- log.info('Set Directory Manager bind back
(test_search_multiple_paging)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
+ del_users(users_list)
@pytest.mark.parametrize("invalid_cookie", [1000, -1])
@@ -727,32 +649,24 @@ def test_search_invalid_cookie(topology_st, test_user,
invalid_cookie):
try:
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
controls = [req_ctrl]
- msgid = topology_st.standalone.search_ext(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=controls)
- rtype, rdata, rmsgid, rctrls = topology_st.standalone.result3(msgid)
+ msgid = conn.search_ext(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
+ search_flt, searchreq_attrlist, serverctrls=controls)
+ rtype, rdata, rmsgid, rctrls = conn.result3(msgid)
log.info('Put an invalid cookie (%d) to the control. TypeError is
expected' %
invalid_cookie)
req_ctrl.cookie = invalid_cookie
with pytest.raises(TypeError):
- msgid = topology_st.standalone.search_ext(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=controls)
+ msgid = conn.search_ext(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
+ search_flt, searchreq_attrlist,
serverctrls=controls)
finally:
- log.info('Set Directory Manager bind back (test_search_invalid_cookie)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
+ del_users(users_list)
def test_search_abandon_with_zero_size(topology_st, test_user):
@@ -779,18 +693,15 @@ def test_search_abandon_with_zero_size(topology_st, test_user):
try:
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
controls = [req_ctrl]
- msgid = topology_st.standalone.search_ext(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=controls)
- rtype, rdata, rmsgid, rctrls = topology_st.standalone.result3(msgid)
+ msgid = conn.search_ext(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
+ search_flt, searchreq_attrlist, serverctrls=controls)
+ rtype, rdata, rmsgid, rctrls = conn.result3(msgid)
pctrls = [
c
for c in rctrls
@@ -798,9 +709,7 @@ def test_search_abandon_with_zero_size(topology_st, test_user):
]
assert not pctrls[0].cookie
finally:
- log.info('Set Directory Manager bind back
(test_search_abandon_with_zero_size)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
+ del_users(users_list)
def test_search_pagedsizelimit_success(topology_st, test_user):
@@ -826,31 +735,26 @@ def test_search_pagedsizelimit_success(topology_st, test_user):
page_size = 10
attr_name = 'nsslapd-pagedsizelimit'
attr_value = '20'
- attr_value_bck = change_conf_attr(topology_st, DN_CONFIG,
- attr_name, attr_value)
+ attr_value_bck = change_conf_attr(topology_st, DN_CONFIG, attr_name, attr_value)
users_list = add_users(topology_st, users_num, DEFAULT_SUFFIX)
search_flt = r'(uid=test*)'
searchreq_attrlist = ['dn', 'sn']
try:
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
controls = [req_ctrl]
- all_results = paged_search(topology_st, DEFAULT_SUFFIX, controls,
- search_flt, searchreq_attrlist)
+ all_results = paged_search(conn, DEFAULT_SUFFIX, controls, search_flt,
searchreq_attrlist)
log.info('%d results' % len(all_results))
assert len(all_results) == len(users_list)
finally:
- log.info('Set Directory Manager bind back
(test_search_pagedsizelimit_success)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
- change_conf_attr(topology_st, DN_CONFIG,
- 'nsslapd-pagedsizelimit', attr_value_bck)
+ del_users(users_list)
+ change_conf_attr(topology_st, DN_CONFIG, 'nsslapd-pagedsizelimit',
attr_value_bck)
@pytest.mark.parametrize('conf_attr,user_attr,expected_rs',
@@ -895,14 +799,12 @@ def test_search_nspagedsizelimit(topology_st, test_user,
users_list = add_users(topology_st, users_num, DEFAULT_SUFFIX)
search_flt = r'(uid=test*)'
searchreq_attrlist = ['dn', 'sn']
- conf_attr_bck = change_conf_attr(topology_st, DN_CONFIG,
- 'nsslapd-pagedsizelimit', conf_attr)
- user_attr_bck = change_conf_attr(topology_st, TEST_USER_DN,
- 'nsPagedSizeLimit', user_attr)
+ conf_attr_bck = change_conf_attr(topology_st, DN_CONFIG,
'nsslapd-pagedsizelimit', conf_attr)
+ user_attr_bck = change_conf_attr(topology_st, test_user.dn,
'nsPagedSizeLimit', user_attr)
try:
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
controls = [req_ctrl]
@@ -910,23 +812,17 @@ def test_search_nspagedsizelimit(topology_st, test_user,
if expected_rs == ldap.SIZELIMIT_EXCEEDED:
log.info('Expect to fail with SIZELIMIT_EXCEEDED')
with pytest.raises(expected_rs):
- all_results = paged_search(topology_st, DEFAULT_SUFFIX, controls,
- search_flt, searchreq_attrlist)
+ all_results = paged_search(conn, DEFAULT_SUFFIX, controls, search_flt,
searchreq_attrlist)
elif expected_rs == 'PASS':
log.info('Expect to pass')
- all_results = paged_search(topology_st, DEFAULT_SUFFIX, controls,
- search_flt, searchreq_attrlist)
+ all_results = paged_search(conn, DEFAULT_SUFFIX, controls, search_flt,
searchreq_attrlist)
log.info('%d results' % len(all_results))
assert len(all_results) == len(users_list)
finally:
- log.info('Set Directory Manager bind back
(test_search_nspagedsizelimit)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
- change_conf_attr(topology_st, DN_CONFIG,
- 'nsslapd-pagedsizelimit', conf_attr_bck)
- change_conf_attr(topology_st, TEST_USER_DN,
- 'nsPagedSizeLimit', user_attr_bck)
+ del_users(users_list)
+ change_conf_attr(topology_st, DN_CONFIG, 'nsslapd-pagedsizelimit',
conf_attr_bck)
+ change_conf_attr(topology_st, test_user.dn, 'nsPagedSizeLimit',
user_attr_bck)
@pytest.mark.parametrize('conf_attr_values,expected_rs',
@@ -972,18 +868,14 @@ def test_search_paged_limits(topology_st, test_user,
conf_attr_values, expected_
users_list = add_users(topology_st, users_num, DEFAULT_SUFFIX)
search_flt = r'(uid=test*)'
searchreq_attrlist = ['dn', 'sn']
- size_attr_bck = change_conf_attr(topology_st, DN_CONFIG,
- 'nsslapd-sizelimit', conf_attr_values[0])
- pagedsize_attr_bck = change_conf_attr(topology_st, DN_CONFIG,
- 'nsslapd-pagedsizelimit',
conf_attr_values[0])
- idlistscan_attr_bck = change_conf_attr(topology_st, 'cn=config,%s' %
DN_LDBM,
- 'nsslapd-idlistscanlimit',
conf_attr_values[1])
- lookthrough_attr_bck = change_conf_attr(topology_st, 'cn=config,%s' %
DN_LDBM,
- 'nsslapd-lookthroughlimit',
conf_attr_values[2])
+ size_attr_bck = change_conf_attr(topology_st, DN_CONFIG, 'nsslapd-sizelimit',
conf_attr_values[0])
+ pagedsize_attr_bck = change_conf_attr(topology_st, DN_CONFIG,
'nsslapd-pagedsizelimit', conf_attr_values[0])
+ idlistscan_attr_bck = change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM,
'nsslapd-idlistscanlimit', conf_attr_values[1])
+ lookthrough_attr_bck = change_conf_attr(topology_st, 'cn=config,%s' %
DN_LDBM, 'nsslapd-lookthroughlimit', conf_attr_values[2])
try:
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
controls = [req_ctrl]
@@ -991,26 +883,18 @@ def test_search_paged_limits(topology_st, test_user,
conf_attr_values, expected_
if expected_rs == ldap.ADMINLIMIT_EXCEEDED:
log.info('Expect to fail with ADMINLIMIT_EXCEEDED')
with pytest.raises(expected_rs):
- all_results = paged_search(topology_st, DEFAULT_SUFFIX, controls,
- search_flt, searchreq_attrlist)
+ all_results = paged_search(conn, DEFAULT_SUFFIX, controls, search_flt,
searchreq_attrlist)
elif expected_rs == 'PASS':
log.info('Expect to pass')
- all_results = paged_search(topology_st, DEFAULT_SUFFIX, controls,
- search_flt, searchreq_attrlist)
+ all_results = paged_search(conn, DEFAULT_SUFFIX, controls, search_flt,
searchreq_attrlist)
log.info('%d results' % len(all_results))
assert len(all_results) == len(users_list)
finally:
- log.info('Set Directory Manager bind back (test_search_paged_limits)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
- change_conf_attr(topology_st, DN_CONFIG,
- 'nsslapd-sizelimit', size_attr_bck)
- change_conf_attr(topology_st, DN_CONFIG,
- 'nsslapd-pagedsizelimit', pagedsize_attr_bck)
- change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM,
- 'nsslapd-lookthroughlimit', lookthrough_attr_bck)
- change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM,
- 'nsslapd-idlistscanlimit', idlistscan_attr_bck)
+ del_users(users_list)
+ change_conf_attr(topology_st, DN_CONFIG, 'nsslapd-sizelimit',
size_attr_bck)
+ change_conf_attr(topology_st, DN_CONFIG, 'nsslapd-pagedsizelimit',
pagedsize_attr_bck)
+ change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM,
'nsslapd-lookthroughlimit', lookthrough_attr_bck)
+ change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM,
'nsslapd-idlistscanlimit', idlistscan_attr_bck)
@pytest.mark.parametrize('conf_attr_values,expected_rs',
@@ -1057,18 +941,14 @@ def test_search_paged_user_limits(topology_st, test_user,
conf_attr_values, expe
users_list = add_users(topology_st, users_num, DEFAULT_SUFFIX)
search_flt = r'(uid=test*)'
searchreq_attrlist = ['dn', 'sn']
- lookthrough_attr_bck = change_conf_attr(topology_st, 'cn=config,%s' %
DN_LDBM,
- 'nsslapd-lookthroughlimit',
conf_attr_values[0])
- idlistscan_attr_bck = change_conf_attr(topology_st, 'cn=config,%s' %
DN_LDBM,
- 'nsslapd-idlistscanlimit',
conf_attr_values[0])
- user_idlistscan_attr_bck = change_conf_attr(topology_st, TEST_USER_DN,
- 'nsPagedIDListScanLimit',
conf_attr_values[1])
- user_lookthrough_attr_bck = change_conf_attr(topology_st, TEST_USER_DN,
- 'nsPagedLookthroughLimit',
conf_attr_values[2])
+ lookthrough_attr_bck = change_conf_attr(topology_st, 'cn=config,%s' %
DN_LDBM, 'nsslapd-lookthroughlimit', conf_attr_values[0])
+ idlistscan_attr_bck = change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM,
'nsslapd-idlistscanlimit', conf_attr_values[0])
+ user_idlistscan_attr_bck = change_conf_attr(topology_st, test_user.dn,
'nsPagedIDListScanLimit', conf_attr_values[1])
+ user_lookthrough_attr_bck = change_conf_attr(topology_st, test_user.dn,
'nsPagedLookthroughLimit', conf_attr_values[2])
try:
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
controls = [req_ctrl]
@@ -1076,26 +956,18 @@ def test_search_paged_user_limits(topology_st, test_user,
conf_attr_values, expe
if expected_rs == ldap.ADMINLIMIT_EXCEEDED:
log.info('Expect to fail with ADMINLIMIT_EXCEEDED')
with pytest.raises(expected_rs):
- all_results = paged_search(topology_st, DEFAULT_SUFFIX, controls,
- search_flt, searchreq_attrlist)
+ all_results = paged_search(conn, DEFAULT_SUFFIX, controls, search_flt,
searchreq_attrlist)
elif expected_rs == 'PASS':
log.info('Expect to pass')
- all_results = paged_search(topology_st, DEFAULT_SUFFIX, controls,
- search_flt, searchreq_attrlist)
+ all_results = paged_search(conn, DEFAULT_SUFFIX, controls, search_flt,
searchreq_attrlist)
log.info('%d results' % len(all_results))
assert len(all_results) == len(users_list)
finally:
- log.info('Set Directory Manager bind back
(test_search_paged_user_limits)')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
- change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM,
- 'nsslapd-lookthroughlimit', lookthrough_attr_bck)
- change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM,
- 'nsslapd-idlistscanlimit', idlistscan_attr_bck)
- change_conf_attr(topology_st, TEST_USER_DN,
- 'nsPagedIDListScanLimit', user_idlistscan_attr_bck)
- change_conf_attr(topology_st, TEST_USER_DN,
- 'nsPagedLookthroughLimit', user_lookthrough_attr_bck)
+ del_users(users_list)
+ change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM,
'nsslapd-lookthroughlimit', lookthrough_attr_bck)
+ change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM,
'nsslapd-idlistscanlimit', idlistscan_attr_bck)
+ change_conf_attr(topology_st, test_user.dn, 'nsPagedIDListScanLimit',
user_idlistscan_attr_bck)
+ change_conf_attr(topology_st, test_user.dn, 'nsPagedLookthroughLimit',
user_lookthrough_attr_bck)
def test_ger_basic(topology_st, test_user):
@@ -1120,13 +992,10 @@ def test_ger_basic(topology_st, test_user):
page_size = 4
try:
- log.info('Set bind to directory manager')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
-
spr_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
- ger_ctrl = GetEffectiveRightsControl(True, "dn: " + DN_DM)
+ ger_ctrl = GetEffectiveRightsControl(True, ensure_bytes("dn: " +
DN_DM))
- all_results = paged_search(topology_st, DEFAULT_SUFFIX, [spr_ctrl, ger_ctrl],
+ all_results = paged_search(topology_st.standalone, DEFAULT_SUFFIX, [spr_ctrl,
ger_ctrl],
search_flt, searchreq_attrlist)
log.info('{} results'.format(len(all_results)))
@@ -1135,7 +1004,7 @@ def test_ger_basic(topology_st, test_user):
assert all(attrs['attributeLevelRights'][0] for dn, attrs in
all_results)
finally:
log.info('Remove added users')
- del_users(topology_st, users_list)
+ del_users(users_list)
def test_multi_suffix_search(topology_st, test_user, new_suffixes):
@@ -1168,17 +1037,13 @@ def test_multi_suffix_search(topology_st, test_user,
new_suffixes):
log.info('Clear the access log')
topology_st.standalone.deleteAccessLogs()
- users_list_1 = add_users(topology_st, users_num / 2, NEW_SUFFIX_1)
- users_list_2 = add_users(topology_st, users_num / 2, NEW_SUFFIX_2)
+ users_list_1 = add_users(topology_st, 10, NEW_SUFFIX_1)
+ users_list_2 = add_users(topology_st, 10, NEW_SUFFIX_2)
try:
- log.info('Set DM bind')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
-
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
- all_results = paged_search(topology_st, NEW_SUFFIX_1, [req_ctrl],
- search_flt, searchreq_attrlist)
+ all_results = paged_search(topology_st.standalone, NEW_SUFFIX_1, [req_ctrl],
search_flt, searchreq_attrlist)
log.info('{} results'.format(len(all_results)))
assert len(all_results) == users_num
@@ -1195,8 +1060,8 @@ def test_multi_suffix_search(topology_st, test_user, new_suffixes):
assert pr_cookie_list[-1] == -1
finally:
log.info('Remove added users')
- del_users(topology_st, users_list_1)
- del_users(topology_st, users_list_2)
+ del_users(users_list_1)
+ del_users(users_list_2)
@pytest.mark.parametrize('conf_attr_value', (None, '-1',
'1000'))
@@ -1223,28 +1088,23 @@ def test_maxsimplepaged_per_conn_success(topology_st, test_user,
conf_attr_value
searchreq_attrlist = ['dn', 'sn']
page_size = 4
if conf_attr_value:
- max_per_con_bck = change_conf_attr(topology_st, DN_CONFIG,
- 'nsslapd-maxsimplepaged-per-conn',
- conf_attr_value)
+ max_per_con_bck = change_conf_attr(topology_st, DN_CONFIG,
'nsslapd-maxsimplepaged-per-conn', conf_attr_value)
try:
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
- all_results = paged_search(topology_st, DEFAULT_SUFFIX, [req_ctrl],
- search_flt, searchreq_attrlist)
+ all_results = paged_search(conn, DEFAULT_SUFFIX, [req_ctrl], search_flt,
searchreq_attrlist)
log.info('{} results'.format(len(all_results)))
assert len(all_results) == len(users_list)
finally:
log.info('Remove added users')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
+ del_users(users_list)
if conf_attr_value:
- change_conf_attr(topology_st, DN_CONFIG,
- 'nsslapd-maxsimplepaged-per-conn', max_per_con_bck)
+ change_conf_attr(topology_st, DN_CONFIG,
'nsslapd-maxsimplepaged-per-conn', max_per_con_bck)
@pytest.mark.parametrize('conf_attr_value', ('0', '1'))
@@ -1272,41 +1132,30 @@ def test_maxsimplepaged_per_conn_failure(topology_st, test_user,
conf_attr_value
search_flt = r'(uid=test*)'
searchreq_attrlist = ['dn', 'sn']
page_size = 4
- max_per_con_bck = change_conf_attr(topology_st, DN_CONFIG,
- 'nsslapd-maxsimplepaged-per-conn',
- conf_attr_value)
+ max_per_con_bck = change_conf_attr(topology_st, DN_CONFIG,
'nsslapd-maxsimplepaged-per-conn', conf_attr_value)
try:
log.info('Set user bind')
- topology_st.standalone.simple_bind_s(TEST_USER_DN, TEST_USER_PWD)
+ conn = test_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
- msgid = topology_st.standalone.search_ext(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=[req_ctrl])
- rtype, rdata, rmsgid, rctrls = topology_st.standalone.result3(msgid)
+ msgid = conn.search_ext(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
+ search_flt, searchreq_attrlist,
serverctrls=[req_ctrl])
+ rtype, rdata, rmsgid, rctrls = conn.result3(msgid)
# If nsslapd-maxsimplepaged-per-conn = 1,
# it should pass this point, but failed on the next search
assert conf_attr_value == '1'
- msgid = topology_st.standalone.search_ext(DEFAULT_SUFFIX,
- ldap.SCOPE_SUBTREE,
- search_flt,
- searchreq_attrlist,
- serverctrls=[req_ctrl])
- rtype, rdata, rmsgid, rctrls = topology_st.standalone.result3(msgid)
+ msgid = conn.search_ext(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
+ search_flt, searchreq_attrlist,
serverctrls=[req_ctrl])
+ rtype, rdata, rmsgid, rctrls = conn.result3(msgid)
finally:
log.info('Remove added users')
- topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
- del_users(topology_st, users_list)
- change_conf_attr(topology_st, DN_CONFIG,
- 'nsslapd-maxsimplepaged-per-conn', max_per_con_bck)
-
+ del_users(users_list)
+ change_conf_attr(topology_st, DN_CONFIG,
'nsslapd-maxsimplepaged-per-conn', max_per_con_bck)
if __name__ == '__main__':
# Run isolated
diff --git a/dirsrvtests/tests/suites/paged_results/sss_control.py
b/dirsrvtests/tests/suites/paged_results/sss_control.py
deleted file mode 100644
index 4598b64..0000000
--- a/dirsrvtests/tests/suites/paged_results/sss_control.py
+++ /dev/null
@@ -1,123 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-ldap.controls.sss - classes for Server Side Sorting
-(see RFC 2891)
-See
http://www.python-ldap.org/ for project details.
-$Id: sss.py,v 1.2 2015/10/24 15:52:23 stroeder Exp $
-"""
-
-__all__ = [
- 'SSSRequestControl',
- 'SSSResponseControl',
-]
-
-from ldap.controls import (RequestControl, ResponseControl,
- KNOWN_RESPONSE_CONTROLS)
-from pyasn1.codec.ber import encoder, decoder
-from pyasn1.type import univ, namedtype, tag, namedval, constraint
-
-
-# SortKeyList ::= SEQUENCE OF SEQUENCE {
-# attributeType AttributeDescription,
-# orderingRule [0] MatchingRuleId OPTIONAL,
-# reverseOrder [1] BOOLEAN DEFAULT FALSE }
-
-
-class SortKeyType(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('attributeType', univ.OctetString()),
- namedtype.OptionalNamedType('orderingRule',
- univ.OctetString().subtype(
- implicitTag=tag.Tag(tag.tagClassContext,
tag.tagFormatSimple, 0)
- )
- ),
- namedtype.DefaultedNamedType('reverseOrder',
univ.Boolean(False).subtype(
- implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))))
-
-
-class SortKeyListType(univ.SequenceOf):
- componentType = SortKeyType()
-
-
-class SSSRequestControl(RequestControl):
- '''Order result server side
- >>> s = SSSRequestControl('-cn')
- '''
- controlType = '1.2.840.113556.1.4.473'
-
- def __init__(
- self,
- criticality=False,
- ordering_rules=None,
- ):
- RequestControl.__init__(self, self.controlType, criticality)
- self.ordering_rules = ordering_rules
- if isinstance(ordering_rules, basestring):
- ordering_rules = [ordering_rules]
- for rule in ordering_rules:
- rule = rule.split(':')
- assert len(rule) < 3, 'syntax for ordering rule:
[-]<attribute-type>[:ordering-rule]'
-
- def asn1(self):
- p = SortKeyListType()
- for i, rule in enumerate(self.ordering_rules):
- q = SortKeyType()
- reverse_order = rule.startswith('-')
- if reverse_order:
- rule = rule[1:]
- if ':' in rule:
- attribute_type, ordering_rule = rule.split(':')
- else:
- attribute_type, ordering_rule = rule, None
- q.setComponentByName('attributeType', attribute_type)
- if ordering_rule:
- q.setComponentByName('orderingRule', ordering_rule)
- if reverse_order:
- q.setComponentByName('reverseOrder', 1)
- p.setComponentByPosition(i, q)
- return p
-
- def encodeControlValue(self):
- return encoder.encode(self.asn1())
-
-
-class SortResultType(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('sortResult', univ.Enumerated().subtype(
- namedValues=namedval.NamedValues(
- ('success', 0),
- ('operationsError', 1),
- ('timeLimitExceeded', 3),
- ('strongAuthRequired', 8),
- ('adminLimitExceeded', 11),
- ('noSuchAttribute', 16),
- ('inappropriateMatching', 18),
- ('insufficientAccessRights', 50),
- ('busy', 51),
- ('unwillingToPerform', 53),
- ('other', 80)),
- subtypeSpec=univ.Enumerated.subtypeSpec + constraint.SingleValueConstraint(
- 0, 1, 3, 8, 11, 16, 18, 50, 51, 53, 80))),
- namedtype.OptionalNamedType('attributeType',
- univ.OctetString().subtype(
- implicitTag=tag.Tag(tag.tagClassContext,
tag.tagFormatSimple, 0)
- )
- ))
-
-
-class SSSResponseControl(ResponseControl):
- controlType = '1.2.840.113556.1.4.474'
-
- def __init__(self, criticality=False):
- ResponseControl.__init__(self, self.controlType, criticality)
-
- def decodeControlValue(self, encoded):
- p, rest = decoder.decode(encoded, asn1Spec=SortResultType())
- assert not rest, 'all data could not be decoded'
- self.result = int(p.getComponentByName('sortResult'))
- self.result_code =
p.getComponentByName('sortResult').prettyOut(self.result)
- self.attribute_type_error = p.getComponentByName('attributeType')
-
-
-KNOWN_RESPONSE_CONTROLS[SSSRequestControl.controlType] = SSSRequestControl
-KNOWN_RESPONSE_CONTROLS[SSSResponseControl.controlType] = SSSResponseControl
diff --git a/src/lib389/lib389/_controls.py b/src/lib389/lib389/_controls.py
index 330b64b..7cbe78c 100644
--- a/src/lib389/lib389/_controls.py
+++ b/src/lib389/lib389/_controls.py
@@ -12,8 +12,8 @@ Lib389 python ldap request controls.
These should be upstreamed into python ldap when possible.
"""
-from ldap.controls import LDAPControl
-from pyasn1.type import namedtype, univ, tag
+from ldap.controls import (LDAPControl, RequestControl, ResponseControl)
+from pyasn1.type import namedtype, univ, tag, namedval, constraint
from pyasn1.codec.ber import encoder, decoder
from pyasn1_modules.rfc2251 import AttributeDescription, LDAPDN, AttributeValue
from lib389._constants import *
@@ -135,3 +135,106 @@ class DereferenceControl(LDAPControl):
av['vals'].append(str(val))
result['attrVals'].append(av)
self.entry.append(result)
+
+
+# SortKeyList ::= SEQUENCE OF SEQUENCE {
+# attributeType AttributeDescription,
+# orderingRule [0] MatchingRuleId OPTIONAL,
+# reverseOrder [1] BOOLEAN DEFAULT FALSE }
+
+
+class SortKeyType(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('attributeType', univ.OctetString()),
+ namedtype.OptionalNamedType('orderingRule',
+ univ.OctetString().subtype(
+ implicitTag=tag.Tag(tag.tagClassContext,
tag.tagFormatSimple, 0)
+ )
+ ),
+ namedtype.DefaultedNamedType('reverseOrder',
univ.Boolean(False).subtype(
+ implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))))
+
+
+class SortKeyListType(univ.SequenceOf):
+ componentType = SortKeyType()
+
+
+class SSSRequestControl(RequestControl):
+ '''Order result server side
+ >>> s = SSSRequestControl('-cn')
+ '''
+ controlType = '1.2.840.113556.1.4.473'
+
+ def __init__(
+ self,
+ criticality=False,
+ ordering_rules=None,
+ ):
+ RequestControl.__init__(self, self.controlType, criticality)
+ self.ordering_rules = ordering_rules
+ if isinstance(ordering_rules, str):
+ ordering_rules = [ordering_rules]
+ for rule in ordering_rules:
+ rule = rule.split(':')
+ assert len(rule) < 3, 'syntax for ordering rule:
[-]<attribute-type>[:ordering-rule]'
+
+ def asn1(self):
+ p = SortKeyListType()
+ for i, rule in enumerate(self.ordering_rules):
+ q = SortKeyType()
+ reverse_order = rule.startswith('-')
+ if reverse_order:
+ rule = rule[1:]
+ if ':' in rule:
+ attribute_type, ordering_rule = rule.split(':')
+ else:
+ attribute_type, ordering_rule = rule, None
+ q.setComponentByName('attributeType', attribute_type)
+ if ordering_rule:
+ q.setComponentByName('orderingRule', ordering_rule)
+ if reverse_order:
+ q.setComponentByName('reverseOrder', 1)
+ p.setComponentByPosition(i, q)
+ return p
+
+ def encodeControlValue(self):
+ return encoder.encode(self.asn1())
+
+
+class SortResultType(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('sortResult', univ.Enumerated().subtype(
+ namedValues=namedval.NamedValues(
+ ('success', 0),
+ ('operationsError', 1),
+ ('timeLimitExceeded', 3),
+ ('strongAuthRequired', 8),
+ ('adminLimitExceeded', 11),
+ ('noSuchAttribute', 16),
+ ('inappropriateMatching', 18),
+ ('insufficientAccessRights', 50),
+ ('busy', 51),
+ ('unwillingToPerform', 53),
+ ('other', 80)),
+ subtypeSpec=univ.Enumerated.subtypeSpec + constraint.SingleValueConstraint(
+ 0, 1, 3, 8, 11, 16, 18, 50, 51, 53, 80))),
+ namedtype.OptionalNamedType('attributeType',
+ univ.OctetString().subtype(
+ implicitTag=tag.Tag(tag.tagClassContext,
tag.tagFormatSimple, 0)
+ )
+ ))
+
+
+class SSSResponseControl(ResponseControl):
+ controlType = '1.2.840.113556.1.4.474'
+
+ def __init__(self, criticality=False):
+ ResponseControl.__init__(self, self.controlType, criticality)
+
+ def decodeControlValue(self, encoded):
+ p, rest = decoder.decode(encoded, asn1Spec=SortResultType())
+ assert not rest, 'all data could not be decoded'
+ self.result = int(p.getComponentByName('sortResult'))
+ self.result_code =
p.getComponentByName('sortResult').prettyOut(self.result)
+ self.attribute_type_error = p.getComponentByName('attributeType')
+
diff --git a/src/lib389/lib389/_entry.py b/src/lib389/lib389/_entry.py
index d13f458..8a9e36b 100644
--- a/src/lib389/lib389/_entry.py
+++ b/src/lib389/lib389/_entry.py
@@ -503,7 +503,7 @@ class EntryAci(object):
rawaci += ('(%s);' % self.acidata["%s_raw_bindrules" %
key][0]['values'][-1])
rawaci += ")"
- return rawaci
+ return ensure_bytes(rawaci)
def _find_terms(self, aci):
if self.verbose:
diff --git a/src/lib389/lib389/backend.py b/src/lib389/lib389/backend.py
index 6f4c869..292bf03 100644
--- a/src/lib389/lib389/backend.py
+++ b/src/lib389/lib389/backend.py
@@ -543,6 +543,10 @@ class Backend(DSLdapObject):
return result
return None
+ def get_mapping_tree(self):
+ suffix = self.get_attr_val_utf8('nsslapd-suffix')
+ return self._mts.get(suffix)
+
def get_monitor(self):
"""Get a MonitorBackend(DSLdapObject) for the
backend"""
diff --git a/src/lib389/lib389/idm/organization.py
b/src/lib389/lib389/idm/organization.py
new file mode 100644
index 0000000..e0f0e6b
--- /dev/null
+++ b/src/lib389/lib389/idm/organization.py
@@ -0,0 +1,62 @@
+# --- BEGIN COPYRIGHT BLOCK ---
+# Copyright (C) 2017 Red Hat, Inc.
+# All rights reserved.
+#
+# License: GPL (version 3 or any later version).
+# See LICENSE for details.
+# --- END COPYRIGHT BLOCK ---
+
+from lib389._mapped_object import DSLdapObject, DSLdapObjects
+
+MUST_ATTRIBUTES = [
+ 'o',
+]
+RDN = 'o'
+
+
+class Organization(DSLdapObject):
+ """A single instance of Organization entry
+
+ :param instance: An instance
+ :type instance: lib389.DirSrv
+ :param dn: Entry DN
+ :type dn: str
+ :param batch: Not implemented
+ :type batch: bool
+ """
+
+ def __init__(self, instance, dn=None, batch=False):
+ super(Organization, self).__init__(instance, dn, batch)
+ self._rdn_attribute = RDN
+ self._must_attributes = MUST_ATTRIBUTES
+ self._create_objectclasses = [
+ 'top',
+ 'organization',
+ ]
+ self._protected = False
+
+
+class Organizations(DSLdapObjects):
+ """DSLdapObjects that represents Organizations entry
+
+ :param instance: An instance
+ :type instance: lib389.DirSrv
+ :param basedn: Base DN for all group entries below
+ :type basedn: str
+ :param batch: Not implemented
+ :type batch: bool
+ """
+
+ def __init__(self, instance, basedn, batch=False):
+ super(Organizations, self).__init__(instance, batch)
+ self._objectclasses = [
+ 'organization',
+ ]
+ self._filterattrs = [RDN]
+ self._childobject = Organisation
+ self._basedn = basedn
+
+# Alias for "rest of world" :)
+Organisation = Organization
+Organisations = Organizations
+
diff --git a/src/lib389/lib389/idm/user.py b/src/lib389/lib389/idm/user.py
index ee4e08d..5c3f55f 100644
--- a/src/lib389/lib389/idm/user.py
+++ b/src/lib389/lib389/idm/user.py
@@ -81,5 +81,8 @@ class UserAccounts(DSLdapObjects):
]
self._filterattrs = [RDN]
self._childobject = UserAccount
- self._basedn = '{},{}'.format(rdn, basedn)
+ if rdn is None:
+ self._basedn = basedn
+ else:
+ self._basedn = '{},{}'.format(rdn, basedn)
diff --git a/src/lib389/lib389/mappingTree.py b/src/lib389/lib389/mappingTree.py
index f319276..2796e22 100644
--- a/src/lib389/lib389/mappingTree.py
+++ b/src/lib389/lib389/mappingTree.py
@@ -400,6 +400,20 @@ class MappingTree(DSLdapObject):
self._create_objectclasses = ['top', 'extensibleObject',
MT_OBJECTCLASS_VALUE]
self._protected = False
+ def set_parent(self, parent):
+ """
+ Set the parent suffix to create a tree of backends. For example:
+
+ be_1 = bes.create(properties={...})
+ be_2 = bes.create(properties={...})
+ mt = be_2.get_mapping_tree()
+ mt.set_parent(be_1.get_attr_val_bytes('nsslapd-suffix'))
+
+ :param parent: The parent suffix above this mapping tree.
+ :type parent: str
+ """
+ self.replace('nsslapd-parent-suffix', parent)
+
class MappingTrees(DSLdapObjects):
"""DSLdapObjects that presents Mapping trees
--
To stop receiving notification emails like this one, please contact
the administrator of this repository.