This guide provides methods to sync users from a CSV file to a FreeIPA server. FreeIPA 4.13.1 includes enhanced API capabilities and security features that improve user management workflows.
requests library installedHere’s an improved Python script that reads a CSV file containing user information and creates or updates those users in a FreeIPA server using the FreeIPA API:
#!/usr/bin/env python3
"""
CSV to FreeIPA User Sync Script
Sync users from a CSV file to a FreeIPA server
"""
import csv
import json
import requests
import argparse
import getpass
import sys
from typing import Dict, List, Optional
class FreeIPASync:
def __init__(self, server_url: str, username: str, password: str):
self.server_url = server_url.rstrip('/')
self.username = username
self.password = password
self.session = None
self.base_url = f"{self.server_url}/ipa/session/json"
def authenticate(self) -> bool:
"""Authenticate with FreeIPA and establish session"""
try:
# First, get the initial session cookie
login_url = f"{self.server_url}/ipa/session/login_password"
headers = {
'Referer': self.server_url,
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'text/plain'
}
data = {
'user': self.username,
'password': self.password
}
response = requests.post(login_url, headers=headers, data=data)
if response.status_code == 200:
self.session = requests.Session()
self.session.cookies.update(response.cookies)
return True
else:
print(f"Authentication failed: {response.status_code}")
return False
except Exception as e:
print(f"Authentication error: {str(e)}")
return False
def user_exists(self, username: str) -> bool:
"""Check if a user already exists in FreeIPA"""
try:
data = {
'method': 'user_show',
'params': [
[username],
{'all': True}
],
'id': 0
}
response = self.session.post(
self.base_url,
json=data,
headers={'Content-Type': 'application/json'}
)
result = response.json()
return 'result' in result and result['result'] is not None
except Exception:
return False
def create_user(self, user_data: Dict[str, str]) -> bool:
"""Create a new user in FreeIPA"""
try:
data = {
'method': 'user_add',
'params': [
[user_data['username']],
{
'givenname': user_data['givenname'],
'sn': user_data['surname'],
'mail': user_data['email'],
'setattr': f'loginshell={user_data.get("loginshell", "/bin/bash")}',
'setattr': f'homedirectory={user_data.get("homedir", f"/home/{user_data["username"]}")}'
}
],
'id': 0
}
# Add additional attributes if provided
params = data['params'][1]
if 'displayname' in user_data:
params['displayname'] = user_data['displayname']
if 'phone' in user_data:
params['telephonenumber'] = user_data['phone']
if 'mobile' in user_data:
params['mobile'] = user_data['mobile']
if 'jobtitle' in user_data:
params['title'] = user_data['jobtitle']
if 'department' in user_data:
params['departmentnumber'] = user_data['department']
if 'manager' in user_data:
params['manager'] = user_data['manager']
response = self.session.post(
self.base_url,
json=data,
headers={'Content-Type': 'application/json'}
)
result = response.json()
if 'error' in result and result['error'] is not None:
print(f"Error creating user {user_data['username']}: {result['error']['message']}")
return False
else:
print(f"Successfully created user {user_data['username']}")
return True
except Exception as e:
print(f"Error creating user {user_data['username']}: {str(e)}")
return False
def update_user(self, user_data: Dict[str, str]) -> bool:
"""Update an existing user in FreeIPA"""
try:
data = {
'method': 'user_mod',
'params': [
[user_data['username']],
{
'givenname': user_data['givenname'],
'sn': user_data['surname'],
'mail': user_data['email']
}
],
'id': 0
}
# Add additional attributes if provided
params = data['params'][1]
if 'displayname' in user_data:
params['displayname'] = user_data['displayname']
if 'phone' in user_data:
params['telephonenumber'] = user_data['phone']
if 'mobile' in user_data:
params['mobile'] = user_data['mobile']
if 'jobtitle' in user_data:
params['title'] = user_data['jobtitle']
if 'department' in user_data:
params['departmentnumber'] = user_data['department']
if 'manager' in user_data:
params['manager'] = user_data['manager']
response = self.session.post(
self.base_url,
json=data,
headers={'Content-Type': 'application/json'}
)
result = response.json()
if 'error' in result and result['error'] is not None:
print(f"Error updating user {user_data['username']}: {result['error']['message']}")
return False
else:
print(f"Successfully updated user {user_data['username']}")
return True
except Exception as e:
print(f"Error updating user {user_data['username']}: {str(e)}")
return False
def sync_users_from_csv(self, csv_file: str, update_existing: bool = True) -> int:
"""Sync users from CSV file to FreeIPA"""
success_count = 0
with open(csv_file, 'r', newline='', encoding='utf-8') as file:
reader = csv.DictReader(file)
# Validate required columns
required_columns = ['username', 'givenname', 'surname', 'email']
missing_columns = [col for col in required_columns if col not in reader.fieldnames]
if missing_columns:
print(f"Error: Missing required columns in CSV: {missing_columns}")
return 0
for row_num, row in enumerate(reader, start=2): # Start at 2 since header is row 1
username = row['username'].strip()
if not username:
print(f"Warning: Empty username at row {row_num}, skipping...")
continue
# Prepare user data
user_data = {
'username': username,
'givenname': row['givenname'].strip(),
'surname': row['surname'].strip(),
'email': row['email'].strip()
}
# Add optional fields if present
optional_fields = ['displayname', 'phone', 'mobile', 'jobtitle', 'department', 'manager', 'loginshell', 'homedir']
for field in optional_fields:
if field in row and row[field].strip():
user_data[field] = row[field].strip()
# Check if user exists
if self.user_exists(username):
if update_existing:
if self.update_user(user_data):
success_count += 1
else:
print(f"User {username} already exists, skipping (use --update to update existing users)")
else:
if self.create_user(user_data):
success_count += 1
return success_count
def main():
parser = argparse.ArgumentParser(description='Sync users from CSV to FreeIPA')
parser.add_argument('--server', required=True, help='FreeIPA server URL (e.g., https://ipa.example.com)')
parser.add_argument('--username', required=True, help='FreeIPA admin username')
parser.add_argument('--csv-file', required=True, help='Path to CSV file with user data')
parser.add_argument('--update', action='store_true', help='Update existing users (default: skip existing)')
parser.add_argument('--password', help='FreeIPA admin password (will prompt if not provided)')
args = parser.parse_args()
# Get password if not provided
if not args.password:
args.password = getpass.getpass(f"Enter password for {args.username}: ")
# Initialize FreeIPA sync
sync_tool = FreeIPASync(args.server, args.username, args.password)
# Authenticate
print("Authenticating with FreeIPA...")
if not sync_tool.authenticate():
print("Authentication failed. Exiting.")
sys.exit(1)
print("Authentication successful. Starting sync...")
# Perform sync
success_count = sync_tool.sync_users_from_csv(args.csv_file, args.update)
print(f"\nSync completed. Successfully processed {success_count} users.")
if __name__ == "__main__":
main()
Your CSV file should have the following format:
username,givenname,surname,email,displayname,phone,mobile,jobtitle,department
jdoe,John,Doe,john.doe@example.com,John Doe,+1-555-1234,+1-555-5678,Engineer,Engineering
asmith,Alice,Smith,alice.smith@example.com,Alice Smith,+1-555-9012,+1-555-3456,Manager,Marketing
# Basic usage
python freeipa_csv_sync.py --server https://ipa.example.com --username admin --csv-file users.csv
# Update existing users
python freeipa_csv_sync.py --server https://ipa.example.com --username admin --csv-file users.csv --update
# Specify password on command line (not recommended for security)
python freeipa_csv_sync.py --server https://ipa.example.com --username admin --password secret123 --csv-file users.csv
With FreeIPA 4.13.x, consider these additional capabilities:
If you need to create system accounts instead of regular users:
def create_system_account(self, name: str, description: str) -> bool:
"""Create a system account (4.13.x+)"""
try:
data = {
'method': 'sysaccount_add',
'params': [
[name],
{
'description': description
}
],
'id': 0
}
response = self.session.post(
self.base_url,
json=data,
headers={'Content-Type': 'application/json'}
)
result = response.json()
if 'error' in result and result['error'] is not None:
print(f"Error creating system account {name}: {result['error']['message']}")
return False
else:
print(f"Successfully created system account {name}")
return True
except Exception as e:
print(f"Error creating system account {name}: {str(e)}")
return False
# Test API connectivity
curl -k -X POST -H "Content-Type: application/json" \
-d '{"method":"user_find","params":[["admin"],{}],"id":0}' \
https://ipa.example.com/ipa/session/json
# Check user existence
ipa user-show admin