# -*- coding: utf-8 -*-
r"""
    werkzeug.posixemulation
    ~~~~~~~~~~~~~~~~~~~~~~~

    Provides a POSIX emulation for some features that are relevant to
    web applications.  The main purpose is to simplify support for
    systems such as Windows NT that are not 100% POSIX compatible.

    Currently this only implements a :func:`rename` function that
    follows POSIX semantics, e.g. if the target file already exists it
    will be replaced without asking.

    This module was introduced in 0.6.1 and is not a public interface.
    It might become one in later versions of Werkzeug.

    :copyright: 2007 Pallets
    :license: BSD-3-Clause
"""
import errno
import os
import random
import sys
import time

from ._compat import to_unicode
from .filesystem import get_filesystem_encoding

# Stays False on Windows unless the transacted rename API below could be
# resolved; always True on non-Windows platforms.
can_rename_open_file = False

if os.name == "nt":
    try:
        import ctypes

        # Flags passed to MoveFileExW / MoveFileTransactedW.
        _MOVEFILE_REPLACE_EXISTING = 0x1
        _MOVEFILE_WRITE_THROUGH = 0x8
        _MoveFileEx = ctypes.windll.kernel32.MoveFileExW

        def _rename(src, dst):
            """Rename *src* to *dst* through the Win32 API.

            Both paths are converted to unicode using the filesystem
            encoding.  A transacted rename is attempted first; if that
            does not succeed, ``MoveFileExW`` is retried up to 100 times
            with a 1 ms pause between attempts.

            :return: a true value on success, a false value otherwise.
            """
            src = to_unicode(src, get_filesystem_encoding())
            dst = to_unicode(dst, get_filesystem_encoding())
            if _rename_atomic(src, dst):
                return True
            retry = 0
            rv = False
            while not rv and retry < 100:
                rv = _MoveFileEx(
                    src, dst, _MOVEFILE_REPLACE_EXISTING | _MOVEFILE_WRITE_THROUGH
                )
                if not rv:
                    time.sleep(0.001)
                    retry += 1
            return rv

        # new in Vista and Windows Server 2008
        _CreateTransaction = ctypes.windll.ktmw32.CreateTransaction
        _CommitTransaction = ctypes.windll.ktmw32.CommitTransaction
        _MoveFileTransacted = ctypes.windll.kernel32.MoveFileTransactedW
        _CloseHandle = ctypes.windll.kernel32.CloseHandle
        can_rename_open_file = True

        def _rename_atomic(src, dst):
            """Rename *src* to *dst* inside a kernel transaction.

            The move is retried up to 100 times with a 1 ms pause between
            attempts.  Once the move succeeds the transaction is committed
            and the result of the commit is returned.  The transaction
            handle is always closed.

            :return: a true value if the move and commit succeeded, a
                     false value otherwise (including when no transaction
                     could be created).
            """
            ta = _CreateTransaction(None, 0, 0, 0, 0, 1000, "Werkzeug rename")
            if ta == -1:
                # No transaction handle could be obtained.
                return False
            try:
                retry = 0
                rv = False
                while not rv and retry < 100:
                    rv = _MoveFileTransacted(
                        src,
                        dst,
                        None,
                        None,
                        _MOVEFILE_REPLACE_EXISTING | _MOVEFILE_WRITE_THROUGH,
                        ta,
                    )
                    if rv:
                        rv = _CommitTransaction(ta)
                        break
                    else:
                        time.sleep(0.001)
                        retry += 1
                return rv
            finally:
                _CloseHandle(ta)

    except Exception:
        # Anything going wrong while resolving the Win32 entry points above
        # (ctypes missing, DLL or function not available, ...) disables the
        # native code paths; ``rename`` then relies on its fallback only.

        def _rename(src, dst):
            """Stub used when the Win32 API is unavailable; never succeeds."""
            return False

        def _rename_atomic(src, dst):
            """Stub used when the Win32 API is unavailable; never succeeds."""
            return False

    def rename(src, dst):
        """Rename *src* to *dst*, replacing *dst* if it already exists.

        The native (transacted or ``MoveFileExW``) rename is tried first.
        If it does not succeed, a plain :func:`os.rename` is attempted and,
        when that fails because the target exists, the target is moved
        out of the way to a randomly suffixed name, the source is renamed
        into place and the moved-away file is removed on a best-effort
        basis.

        :raises OSError: if the fallback rename fails for a reason other
                         than the target already existing, or if one of
                         the "move away and replace" steps fails.
        """
        # Try atomic or pseudo-atomic rename
        if _rename(src, dst):
            return
        # Fall back to "move away and replace"
        try:
            os.rename(src, dst)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
        else:
            return

        old = "%s-%08x" % (dst, random.randint(0, sys.maxsize))
        os.rename(dst, old)
        replaced = False
        try:
            os.rename(src, dst)
            replaced = True
        finally:
            if not replaced:
                # The source could not be moved into place: put the original
                # destination back so it is not left stranded under the
                # temporary name.  A flag plus ``finally`` is used (rather
                # than a bare ``raise`` after a nested handler) so that the
                # original error is the one that propagates on both
                # Python 2 and Python 3.
                try:
                    os.rename(old, dst)
                except OSError:
                    pass
        # Removing the moved-away file is best effort only.
        try:
            os.unlink(old)
        except OSError:
            pass


else:
    # POSIX platforms: os.rename is used directly.
    rename = os.rename
    can_rename_open_file = True