#!/usr/bin/python3 """ Create a composite group. Example: grouper_create_composite_group.py -m dev -f this:is:folder -g newgroup -n 'New Group' -d 'Some description' -t intersection -l the:first:component -r the:second:component """ import argparse import requests from requests.auth import HTTPBasicAuth import json import sys,string def grouperCreateCompositeGroup(grouper_credentials, grouper_ws_uri, folderIdPath, groupId, groupName, groupDescription, compositeType, leftGroupIdPath, rightGroupIdPath): """Create composite group""" groupIdPath = folderIdPath+':'+groupId if compositeType != 'intersection' and compositeType != 'complement': print(f"composite type {compositeType} must be intersection or complement") exit(1) body = { "WsRestGroupSaveRequest":{ "wsGroupToSaves":[ { "wsGroupLookup":{ "groupName": groupIdPath }, "wsGroup":{ "extension": groupId, "displayExtension": groupName, "name": groupIdPath, "description": groupDescription, "detail":{ "hasComposite": "T", "compositeType": compositeType, "leftGroup":{ "name": leftGroupIdPath }, "rightGroup":{ "name": rightGroupIdPath } } } } ] } } return grouperWSRequest(grouper_credentials, grouper_ws_uri+"/groups", "POST", body) def grouperGetFolderUuid(grouper_credentials, grouper_ws_uri, folderName): """Get UUID for the specified folder""" thisuuid = 0 body = { "WsRestFindStemsRequest": { "wsStemQueryFilter": { "stemName": folderName, "stemQueryFilterType": "FIND_BY_STEM_NAME" } } } findStems = grouperWSRequest(grouper_credentials, grouper_ws_uri + "/stems", "POST", body) if findStems and findStems['WsFindStemsResults']['resultMetadata']['success']: stemList = findStems['WsFindStemsResults']['stemResults'] if len(stemList) == 1: thisuuid = findStems['WsFindStemsResults']['stemResults'][0]['uuid'] return thisuuid def grouperGetUuid(grouper_credentials, grouper_ws_uri, groupName): """Get UUID for the specified group""" thisuuid = 0 body = { "WsRestFindGroupsRequest": { "wsQueryFilter": { "groupName": groupName, "queryFilterType": "FIND_BY_GROUP_NAME_EXACT", } } } findGroups = grouperWSRequest(grouper_credentials, grouper_ws_uri+"/groups", "POST", body) if findGroups and findGroups['WsFindGroupsResults']['resultMetadata']['success'] and 'groupResults' in findGroups['WsFindGroupsResults']: thisuuid = findGroups['WsFindGroupsResults']['groupResults'][0]['uuid'] return thisuuid def grouperWSRequest(grouper_credentials, url, method, body=None): """Send a request to the Grouper Web Service.""" try: response = requests.request(method, url, json=body, auth=grouper_credentials) response.raise_for_status() except requests.RequestException as e: raise SystemExit(e) from None return response.json() def create_composite_group(mode, folder, group_id, group_name, group_description, composite_type, left_group, right_group): """Create composite group. Args: mode (str): The environment mode ('dev', 'prod'). folder (str): The group will be created in this folder. group_id (str): The group ID. group_name (str): The group name. group_description (str): The group description. composite_type (str): The composite type ('intersection','complement'). left_group (str): The group ID path of the left group. right_group (str): The group ID path of the right group. """ # set the Grouper Web Service username and password grouper_username = 'abc1234' grouper_password = 'xxxxxxxxxxxxxxxxxxxx' grouper_credentials = HTTPBasicAuth(grouper_username,grouper_password) # the Grouper Web Service URI should point to dev or prod Grouper devGrouperURI = 'https://grouper-dev.cc.columbia.edu/grouper-ws/servicesRest/v2_4_000' prodGrouperURI = 'https://grouper.cc.columbia.edu/grouper-ws/servicesRest/v2_4_000' if mode == 'prod': grouper_ws_uri = prodGrouperURI else: grouper_ws_uri = devGrouperURI # Verify that the folder exists and is accessible by this user if grouperGetFolderUuid(grouper_credentials,grouper_ws_uri,folder) == 0: print(f"{folder} folder not found (does not exist or is not accessible)") return # create a group result = grouperCreateCompositeGroup(grouper_credentials,grouper_ws_uri,folder,group_id,group_name, group_description,composite_type,left_group,right_group) if not result: print(f"Unable to create {group_id} in {folder}") return if not result['WsGroupSaveResults']['resultMetadata']['resultCode'] == 'SUCCESS': rc = result['WsGroupSaveResults']['resultMetadata']['resultCode'] print(f"Unable to create {group_id} in {folder} result code {rc}") return result_code = result['WsGroupSaveResults']['results'][0]['resultMetadata']['resultCode'] if result_code == 'SUCCESS_NO_CHANGES_NEEDED': print(f"{group_id} group already exists") elif result_code == 'SUCCESS_UPDATED': print(f"{group_id} group already exists, it has been updated") elif result_code == 'SUCCESS_INSERTED': print(f"{group_id} group created") else: print(f"{group_id} group created, {result_code}") def main(): """Parses command-line arguments and calls create_composite_group.""" parser = argparse.ArgumentParser( description="Create a composite group.", ) parser.add_argument( "-m", action="store", dest="mode", required=True, choices=["dev", "prod"], type=str.lower, help="the Grouper environment mode", ) parser.add_argument( "-f", action="store", dest="folder", required=True, help="the group will be created in this folder", ) parser.add_argument( "-g", action="store", dest="group_id", required=True, help="the group ID", ) parser.add_argument( "-n", action="store", dest="group_name", required=True, help="the group name", ) parser.add_argument( "-d", action="store", dest="group_description", required=True, help="the group description", ) parser.add_argument( "-t", action="store", dest="composite_type", required=True, choices=["intersection", "complement"], type=str.lower, help="the composite type", ) parser.add_argument( "-l", action="store", dest="left_group", required=True, help="the left group", ) parser.add_argument( "-r", action="store", dest="right_group", required=True, help="the right group", ) args = parser.parse_args() create_composite_group(args.mode, args.folder, args.group_id, args.group_name, args.group_description, args.composite_type, args.left_group, args.right_group) if __name__ == "__main__": main()