Source code for dandan.system

from __future__ import unicode_literals, absolute_import


[docs]def is_win32(): """ Check os platform is win32 Returns: bool: True if is win32 else False """ import sys return "win32" in sys.platform
[docs]def is_linux(): """ Check os platform is linux Returns: bool: True if is linux else False """ import sys return "linux" in sys.platform
[docs]def is_python2(): """ Check current interpreter is python2 Returns: bool: True if is python2 else False """ import sys return sys.version_info.major == 2
[docs]def is_python3(): """ Check current interpreter is python3 Returns: bool: True if is python3 else False """ import sys return sys.version_info.major == 3
[docs]def set_unicode(): """ Set default encoding to utf8 """ if is_python3(): return import sys reload(sys) sys.setdefaultencoding("utf8")
[docs]def execute(command, callback=None, timeout=0): ''' Execute a system command return status and output Args: command (TYPE): execute command command must not bash command callback (None, optional): callback function per output line Returns: string: console output if callback is None int: execute exit code ''' import time import subprocess if not is_win32(): cmd = ["/bin/sh", "-c", command] shell = False else: cmd = command shell = True try: pipe = subprocess.Popen( cmd, shell=shell, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) except Exception as e: return e, -1 if not callback: res = pipe.stdout.read() pipe.communicate() if type(res) == bytes: line = "" for char in res: try: char = chr(char) line += char except Exception: continue res = line return (res, pipe.returncode) start_time = time.time() line = "" while True: char = pipe.stdout.read(1) if not char: break if type(char) == bytes: try: char = bytes.decode(char) except Exception: continue if char not in ["\n", "\r"]: line += char continue callback(line) if timeout and (time.time() - start_time) > timeout: kill(pipe.pid) break # return (None, 9) line = "" pipe.communicate() return (None, pipe.returncode)
[docs]def kill(pid): ''' Kill progress with pid Args: pid (int): progress id ''' import psutil process = psutil.Process(pid) for proc in process.children(recursive=True): try: proc.kill() except Exception: continue process.kill()
[docs]def kill_command(command): ''' Kill progress with command Args: command (string): run command ''' if not command: return import re if is_win32(): tup = command.split(" ", 1) caption = tup[0] if len(tup) > 1: run_para = tup[1] cmd = '''wmic process where name="{}" get commandline,processid'''.format(caption) result, code = execute(cmd) results = result.splitlines() for result in results: match = re.search(r"\w+ +(?P<para>.+) +(?P<pid>\d+)", result) if not match: continue para = match.group("para").strip() if para != run_para: continue pid = match.group("pid") kill(pid) return elif is_linux(): cmd = '''ps -ef | grep "{}" | grep -v grep'''.format(command) result, code = execute(cmd) results = result.splitlines() for result in results: result = re.sub(" +", " ", result) match = re.search(r"\w+ (?P<pid>\d+) .+:\d\d (?P<cmd>.+)", result) if not match: continue cmd = match.group("cmd") if cmd not in [command, '/bin/sh -c ' + command]: continue pid = match.group("pid") kill(pid) pass
[docs]def readable(path): ''' Check if a given path is readable by the current user. Args: path (string): local system path Returns: bool: True if readable else False ''' import os if os.access(path, os.F_OK) and os.access(path, os.R_OK): return True return False
[docs]def writeable(path, check_parent=True): ''' Check if a given path is writeable by the current user. Args: * path (string): local system path * check_parent (bool, optional): If the path to check does not exist, check for the ability to write to the parent directory instead Returns: * bool: True if writeable else False ''' import os if os.access(path, os.F_OK) and os.access(path, os.W_OK): # The path exists and is writeable return True if os.access(path, os.F_OK) and not os.access(path, os.W_OK): # The path exists and is not writeable return False # The path does not exists or is not writeable if check_parent is False: # We're not allowed to check the parent directory of the provided path return False # Lets get the parent directory of the provided path parent_dir = os.path.dirname(path) if not os.access(parent_dir, os.F_OK): # Parent directory does not exit return False # Finally, return if we're allowed to write in the parent directory of the # provided path return os.access(parent_dir, os.W_OK)
def __getch_win32(): import msvcrt return msvcrt.getch() def __getch_linux(): import termios import sys import os fd = sys.stdin.fileno() old_ttyinfo = termios.tcgetattr(fd) new_ttyinfo = old_ttyinfo[:] new_ttyinfo[3] &= ~termios.ICANON new_ttyinfo[3] &= ~termios.ECHO try: termios.tcsetattr(fd, termios.TCSANOW, new_ttyinfo) return os.read(fd, 7) except Exception: return chr(3) finally: termios.tcsetattr(fd, termios.TCSANOW, old_ttyinfo)
[docs]def getch(): """ Get a char pressed on keyboard without press Enter Returns: char(s): return pressed key """ if is_win32(): ch = __getch_win32() elif is_linux(): ch = __getch_linux() else: ch = None if is_python2(): return unicode(ch) elif is_python3(): return ch.decode('utf8')
[docs]def clear(): """ Clear console screen """ import os if is_win32(): os.system("cls") elif is_linux(): os.system("clear")