This guide provides a script to sync users between a MySQL database and a FreeIPA server. The script should run on the FreeIPA server. The MySQL database contains user information (username, email), while password management is handled by FreeIPA. FreeIPA 4.13.1 includes enhanced API capabilities and security features that improve user management workflows.
mysql-connector-python and python-freeipa librariesInstall required Python packages:
pip install mysql-connector-python python-freeipa
Here’s an improved Python script that syncs users from a MySQL database to a FreeIPA server:
#!/usr/bin/env python3
"""
MySQL to FreeIPA User Sync Script
Sync users from a MySQL database to a FreeIPA server
"""
import mysql.connector
import os
import sys
from typing import List, Dict, Any
from python_freeipa import Client, exceptions
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class MySQLFreeIPASync:
def __init__(self):
# MySQL configuration from environment variables
self.mysql_config = {
'user': os.getenv('MYSQL_USER', 'root'),
'password': os.getenv('MYSQL_PASSWORD', ''),
'host': os.getenv('MYSQL_HOST', 'localhost'),
'database': os.getenv('MYSQL_DATABASE', 'users_db'),
'port': int(os.getenv('MYSQL_PORT', 3306))
}
# FreeIPA configuration from environment variables
self.ipa_server = os.getenv('IPA_SERVER', 'ipa.example.com')
self.ipa_username = os.getenv('IPA_USERNAME', 'admin')
self.ipa_password = os.getenv('IPA_PASSWORD', '')
# Initialize FreeIPA client
self.ipa_client = None
def connect_to_mysql(self):
"""Establish connection to MySQL database"""
try:
conn = mysql.connector.connect(**self.mysql_config)
logger.info("Successfully connected to MySQL database")
return conn
except mysql.connector.Error as e:
logger.error(f"MySQL connection failed: {e}")
return None
def connect_to_freeipa(self):
"""Establish connection to FreeIPA server"""
try:
self.ipa_client = Client(self.ipa_server)
self.ipa_client.login(self.ipa_username, self.ipa_password)
logger.info("Successfully connected to FreeIPA server")
return True
except exceptions.AuthenticationError as e:
logger.error(f"FreeIPA authentication failed: {e}")
return False
except Exception as e:
logger.error(f"FreeIPA connection failed: {e}")
return False
def get_users_from_mysql(self) -> List[Dict[str, Any]]:
"""Retrieve users from MySQL database"""
conn = self.connect_to_mysql()
if not conn:
return []
try:
cursor = conn.cursor(dictionary=True)
# Query to fetch users from MySQL
# Adjust the query based on your table structure
query = """
SELECT username, first_name, last_name, email, department, title, phone, mobile,
status, created_date, last_login
FROM users
WHERE status = 'active' -- Only sync active users
ORDER BY username
"""
cursor.execute(query)
users = cursor.fetchall()
cursor.close()
conn.close()
logger.info(f"Retrieved {len(users)} users from MySQL")
return users
except mysql.connector.Error as e:
logger.error(f"Error retrieving users from MySQL: {e}")
return []
def user_exists_in_ipa(self, username: str) -> bool:
"""Check if a user exists in FreeIPA"""
try:
self.ipa_client.user_show(username)
return True
except exceptions.NotFound:
return False
except Exception as e:
logger.error(f"Error checking if user {username} exists in FreeIPA: {e}")
return False
def create_user_in_ipa(self, user_data: Dict[str, Any]) -> bool:
"""Create a user in FreeIPA"""
try:
# Prepare user attributes
first_name = user_data.get('first_name') or user_data['username'].split('.')[0].capitalize()
last_name = user_data.get('last_name') or user_data['username'].split('.')[-1].capitalize()
# Create user in FreeIPA
self.ipa_client.user_add(
user_data['username'],
first_name=first_name,
last_name=last_name,
full_name=f"{first_name} {last_name}",
display_name=user_data.get('first_name', '') + ' ' + user_data.get('last_name', ''),
email=user_data.get('email', ''),
job_title=user_data.get('title', ''),
department_number=user_data.get('department', ''),
telephone_number=user_data.get('phone', ''),
mobile=user_data.get('mobile', ''),
preferred_language='en_US'
)
logger.info(f"Successfully created user {user_data['username']} in FreeIPA")
return True
except exceptions.DuplicateEntry:
logger.warning(f"User {user_data['username']} already exists in FreeIPA")
return True # Return True as this isn't a failure
except Exception as e:
logger.error(f"Failed to create user {user_data['username']} in FreeIPA: {e}")
return False
def update_user_in_ipa(self, user_data: Dict[str, Any]) -> bool:
"""Update an existing user in FreeIPA"""
try:
# Prepare update attributes
attrs = {}
if user_data.get('first_name'):
attrs['first_name'] = user_data['first_name']
if user_data.get('last_name'):
attrs['last_name'] = user_data['last_name']
if user_data.get('email'):
attrs['email'] = user_data['email']
if user_data.get('title'):
attrs['job_title'] = user_data['title']
if user_data.get('department'):
attrs['department_number'] = user_data['department']
if user_data.get('phone'):
attrs['telephone_number'] = user_data['phone']
if user_data.get('mobile'):
attrs['mobile'] = user_data['mobile']
if attrs: # Only update if there are attributes to update
self.ipa_client.user_mod(user_data['username'], **attrs)
logger.info(f"Successfully updated user {user_data['username']} in FreeIPA")
return True
except Exception as e:
logger.error(f"Failed to update user {user_data['username']} in FreeIPA: {e}")
return False
def deactivate_user_in_ipa(self, username: str) -> bool:
"""Deactivate a user in FreeIPA (if needed based on MySQL status)"""
try:
self.ipa_client.user_disable(username)
logger.info(f"Successfully disabled user {username} in FreeIPA")
return True
except exceptions.NotFound:
logger.warning(f"User {username} not found in FreeIPA, cannot disable")
return True # Not an error if user doesn't exist
except Exception as e:
logger.error(f"Failed to disable user {username} in FreeIPA: {e}")
return False
def sync_users(self, update_existing: bool = True, deactivate_missing: bool = False) -> int:
"""Main synchronization function"""
# Connect to FreeIPA
if not self.connect_to_freeipa():
logger.error("Cannot proceed without FreeIPA connection")
return 0
# Get users from MySQL
mysql_users = self.get_users_from_mysql()
if not mysql_users:
logger.error("No users retrieved from MySQL, aborting sync")
return 0
success_count = 0
# Process each user from MySQL
for user_data in mysql_users:
username = user_data['username']
if not username:
logger.warning("Skipping user with empty username")
continue
# Check if user exists in FreeIPA
if self.user_exists_in_ipa(username):
if update_existing:
if self.update_user_in_ipa(user_data):
success_count += 1
else:
logger.info(f"User {username} exists in FreeIPA, skipping (use update mode to modify)")
success_count += 1 # Count as success since user exists
else:
if self.create_user_in_ipa(user_data):
success_count += 1
# Optionally deactivate users in FreeIPA that are not in MySQL
if deactivate_missing:
success_count += self.deactivate_missing_users(mysql_users)
return success_count
def deactivate_missing_users(self, mysql_users: List[Dict[str, Any]]) -> int:
"""Deactivate FreeIPA users that are not in the MySQL database"""
try:
# Get all users from FreeIPA
ipa_users = self.ipa_client.user_find(sizelimit=0)
mysql_usernames = {user['username'] for user in mysql_users}
deactivated_count = 0
for ipa_user in ipa_users['result']:
username = ipa_user['uid'][0]
# Skip admin user and other system users
if username in ['admin'] or username.startswith('ipa-'):
continue
if username not in mysql_usernames:
if self.deactivate_user_in_ipa(username):
deactivated_count += 1
logger.info(f"Deactivated {deactivated_count} users not in MySQL database")
return deactivated_count
except Exception as e:
logger.error(f"Error deactivating missing users: {e}")
return 0
def main():
# Initialize the sync tool
sync_tool = MySQLFreeIPASync()
# Get command-line arguments or use environment variables
import argparse
parser = argparse.ArgumentParser(description='Sync users from MySQL to FreeIPA')
parser.add_argument('--update', action='store_true', help='Update existing users in FreeIPA')
parser.add_argument('--deactivate-missing', action='store_true', help='Deactivate users in FreeIPA not in MySQL')
parser.add_argument('--dry-run', action='store_true', help='Show what would be done without making changes')
args = parser.parse_args()
if args.dry_run:
logger.info("DRY RUN MODE: No changes will be made to FreeIPA")
# In dry run mode, we'll just show what would happen
mysql_users = sync_tool.get_users_from_mysql()
print(f"Would process {len(mysql_users)} users from MySQL")
for user in mysql_users[:5]: # Show first 5 users
print(f" - {user['username']}: {user.get('first_name', '')} {user.get('last_name', '')}")
if len(mysql_users) > 5:
print(f" ... and {len(mysql_users) - 5} more users")
return 0
# Perform the actual sync
logger.info("Starting MySQL to FreeIPA user synchronization")
success_count = sync_tool.sync_users(update_existing=args.update, deactivate_missing=args.deactivate_missing)
logger.info(f"Synchronization completed. Successfully processed {success_count} users.")
return success_count
if __name__ == "__main__":
sys.exit(main())
Set the following environment variables to configure the script:
export MYSQL_USER="your_mysql_user"
export MYSQL_PASSWORD="your_mysql_password"
export MYSQL_HOST="your_mysql_host"
export MYSQL_DATABASE="your_database_name"
export MYSQL_PORT="3306"
export IPA_SERVER="ipa.example.com"
export IPA_USERNAME="admin"
export IPA_PASSWORD="your_admin_password"
# Basic sync (create new users, skip existing)
python mysql_freeipa_sync.py
# Update existing users as well
python mysql_freeipa_sync.py --update
# Deactivate FreeIPA users not in MySQL
python mysql_freeipa_sync.py --update --deactivate-missing
# Dry run to see what would be done
python mysql_freeipa_sync.py --dry-run
With FreeIPA 4.13.x, consider these additional capabilities:
If you need to create system accounts instead of regular users:
def create_system_account(self, name: str, description: str) -> bool:
"""Create a system account (4.13.x+)"""
try:
self.ipa_client._request('sysaccount_add', name, description=description)
logger.info(f"Successfully created system account {name}")
return True
except Exception as e:
logger.error(f"Failed to create system account {name}: {e}")
return False
FreeIPA 4.13.x includes Random Serial Numbers (RSN) for certificates by default, enhancing security for any certificates issued to synchronized users.
# Test MySQL connectivity
mysql -h $MYSQL_HOST -u $MYSQL_USER -p -e "SELECT COUNT(*) FROM users;"
# Test FreeIPA connectivity
ipa user-find --sizelimit=1
# Check specific user
ipa user-show username