Add posix_to_nfsv4_acl converter.

This commit is contained in:
Darren 'Tadgy' Austin 2026-04-24 15:38:02 +01:00
commit 2f989d0721

239
posix_to_nfsv4_acl Executable file
View file

@ -0,0 +1,239 @@
#!/usr/bin/env python3
# wanlin.wang
# 2024/12/14, version 1.0
# 2024/12/21, version 1.1, add support for Python 3.6.8 and other improvements
import os
import subprocess
import shlex
# --- POSIX => NFSv4 基本权限映射表 ---
# 针对 'r', 'w', 'x' 做粗粒度映射到 NFSv4 权限子集。
# 这里示例借助alias来简化设置。规则是
# - file/dir: r -> R , w -> W , x -> X
# - file: r -> rntcy, w -> watTNcCy, x -> xtcy
# - dir: r -> rntcy, w -> waDtTNcCy, x -> xtcy
PERM_MAP = {
'r': 'R',
'w': 'W',
'x': 'X',
}
def convert_posix_perm_to_nfs4(posix_perm):
"""
将类似 'r-x' 的 POSIX 权限字符串映射为简单的 NFSv4 权限字符串。
"""
perms = []
if 'r' in posix_perm:
perms.append(PERM_MAP['r'])
if 'w' in posix_perm:
perms.append(PERM_MAP['w'])
if 'x' in posix_perm:
perms.append(PERM_MAP['x'])
merged = "".join(perms) # 例如 ['rtncy', 'x'] -> 'rtncyx'
# 去重保持顺序
result = "".join(sorted(set(merged), key=merged.index))
return result
def apply_mask(posix_perm, mask_perm):
"""
把某个条目的 nominal 权限与 mask 做交集,得到实际生效权限 (effective perms)。
mask_perm & posix_perm 都是三字符形式,如 'rwx', 'r-x', '---'。
逻辑:如果 mask 里无 'w',则最终也无 'w'。
"""
effective = ''
for i, c in enumerate(['r','w','x']):
if c in posix_perm and c in mask_perm:
effective += c
else:
effective += '-'
return effective
def parse_getfacl_output(acl_output):
result = {
'owner': None,
'group': None,
'mask': None, # 普通mask
'default_mask': None, # 新增: default:mask
'acl_entries': [],
'default_acl_entries': [],
}
lines = acl_output.strip().split('\n')
for raw_line in lines:
line = raw_line.strip()
# 去掉行内注释
if '#' in line:
line = line.split('#', 1)[0].strip()
if not line:
continue
parts = line.split(':')
if line.startswith('default:'):
entry_type = parts[1] # user / group / mask / other
if entry_type == 'mask':
# default:mask::rwx
result['default_mask'] = parts[-1] # 'rwx'
else:
name = ''
perm = parts[-1]
if len(parts) == 4:
name = parts[2]
result['default_acl_entries'].append({
'type': entry_type,
'name': name,
'perm': perm
})
else:
entry_type = parts[0]
if entry_type == 'mask':
result['mask'] = parts[-1]
else:
name = ''
perm = parts[-1]
if entry_type in ('user','group') and len(parts) == 3:
name = parts[1]
result['acl_entries'].append({
'type': entry_type,
'name': name,
'perm': perm
})
return result
# def build_nfs4_acl_cmd(acl_info, path="/dummy", domain='localdomain'):
def build_nfs4_ace(acl_info, path="/dummy", domain='localdomain'):
ace_list = []
# 普通 mask
normal_mask = acl_info['mask'] if acl_info['mask'] else 'rwx'
# default mask
default_mask = acl_info.get('default_mask', None)
if not default_mask:
default_mask = 'rwx'
# 1) OWNER@
owner_acl = next((e for e in acl_info['acl_entries']
if e['type'] == 'user' and e['name'] == ''), None)
if owner_acl:
# owner 不受普通mask约束
eff = apply_mask(owner_acl['perm'], 'rwx')
else:
eff = 'rwx'
ace_list.append(f"A::OWNER@:{convert_posix_perm_to_nfs4(eff)}")
# 2) user:xxx 或 group:xxx (普通条目)
for entry in acl_info['acl_entries']:
if entry['type'] == 'user' and entry['name']:
eff = apply_mask(entry['perm'], normal_mask)
nfs4_perm = convert_posix_perm_to_nfs4(eff)
if nfs4_perm:
ace_list.append(f"A::{entry['name']}@{domain}:{nfs4_perm}")
if entry['type'] == 'group' and entry['name']:
eff = apply_mask(entry['perm'], normal_mask)
nfs4_perm = convert_posix_perm_to_nfs4(eff)
if nfs4_perm:
ace_list.append(f"A:g:{entry['name']}@{domain}:{nfs4_perm}")
else:
ace_list.append(f"D:g:{entry['name']}@{domain}:RWXdo")
# 3) group:: (空组)
group_acl = next((e for e in acl_info['acl_entries']
if e['type'] == 'group' and e['name'] == ''), None)
if group_acl:
eff = apply_mask(group_acl['perm'], normal_mask)
g_perm = convert_posix_perm_to_nfs4(eff)
if g_perm:
ace_list.append(f"A:g:GROUP@:{g_perm}")
else:
ace_list.append(f"D:g:GROUP@:RWXdo")
# 4) other::
other_acl = next((e for e in acl_info['acl_entries']
if e['type'] == 'other'), None)
if other_acl:
eff = apply_mask(other_acl['perm'], normal_mask)
o_perm = convert_posix_perm_to_nfs4(eff)
if o_perm:
ace_list.append(f"A::EVERYONE@:{o_perm}")
else:
ace_list.append(f"D::EVERYONE@:RWXdo")
# 5) default: ACL (带继承)
for def_entry in acl_info['default_acl_entries']:
entry_type = def_entry['type']
nominal = def_entry['perm']
# 应用 default_mask
eff = apply_mask(nominal, default_mask)
nfs4_perm = convert_posix_perm_to_nfs4(eff)
if not nfs4_perm:
continue
if entry_type == 'user':
if def_entry['name'] == '':
ace_list.append(f"A:fd:OWNER@:{nfs4_perm}")
else:
ace_list.append(f"A:fd:{def_entry['name']}@{domain}:{nfs4_perm}")
elif entry_type == 'group':
if def_entry['name'] == '':
ace_list.append(f"A:fdg:GROUP@:{nfs4_perm}")
else:
ace_list.append(f"A:fdg:{def_entry['name']}@{domain}:{nfs4_perm}")
elif entry_type == 'other':
pass
return ace_list
def convert_acl_for_directory(root_dir, new_root_dir, domain='localdomain', max_depth=3, nfs4_cmd_file='nfs4_acl_cmds.txt'):
"""
遍历 root_dir 下所有文件和目录,
读取POSIX ACL并转换为NFSv4 ACL (包含 default: 继承处理, mask限制)
"""
for dirpath, dirnames, filenames in os.walk(root_dir):
# 计算当前目录的深度
depth = dirpath[len(root_dir):].count(os.sep)
if depth >= max_depth:
# 如果当前目录的深度超过了最大深度,则不再递归遍历子目录。适用于大型目录结构,只在最上的几层控制。
dirnames[:] = []
all_paths = [dirpath] + [os.path.join(dirpath, f) for f in filenames]
for p in all_paths:
try:
# 获取 POSIX ACL。这里调用subprocess时用了universal_newlines=True以便输出是str而不是bytes。text=True也可以用于高于3.7的Python版本。
completed_proc = subprocess.run(["getfacl", p],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True)
acl_output = completed_proc.stdout
except subprocess.CalledProcessError as e:
print(f"Warning: getfacl failed on {p}, error: {e}")
continue
acl_info = parse_getfacl_output(acl_output)
ace_list = build_nfs4_ace(acl_info, p, domain=domain)
if ace_list:
new_dir = p.replace(root_dir, new_root_dir)
nfs4_cmd = f"nfs4_setfacl -s \"{' '.join(shlex.quote(x) for x in ace_list)}\" {new_dir}"
print(f"\n# Got POSIX ACL from {p}, setting NFSv4 ACL for {new_dir} with command:")
print(nfs4_cmd)
with open(nfs4_cmd_file, 'a') as f:
f.write(f"\n# Directory: {dirpath}\n")
f.write(nfs4_cmd)
f.write("\n")
def main():
import sys
if len(sys.argv) < 3:
print(f"Usage: {sys.argv[0]} <directory> <new_directory> [domain]")
sys.exit(1)
root_dir = sys.argv[1]
new_root_dir = sys.argv[2]
domain = sys.argv[3] if len(sys.argv) > 3 else 'localdomain'
if not os.path.isdir(root_dir):
print(f"Error: {root_dir} is not a valid directory.")
sys.exit(1)
convert_acl_for_directory(root_dir, new_root_dir, domain)
if __name__ == "__main__":
main()