import ctypes
import locale
import logging
import os
import pymem.ressources.advapi32
import pymem.ressources.kernel32
import pymem.ressources.psapi
import pymem.ressources.structure
logger = logging.getLogger(__name__)
[docs]def get_python_dll(version):
"""Given a python dll version will find its path using the current process as a placeholder
Parameters
----------
version: str
A string representation of python version as a dll (python38.dll)
Returns
-------
str
The full path of dll
"""
current_process_id = os.getpid()
current_process_handle = pymem.process.open(current_process_id)
for module in pymem.process.enum_process_module(current_process_handle):
if module.name == version:
return module.filename
[docs]def inject_dll(handle, filepath):
"""Inject a dll into opened process.
Parameters
----------
handle: int
Handle to an open object
filepath: bytes
Dll to be injected filepath
Returns
-------
DWORD
The address of injected dll
"""
filepath_address = pymem.ressources.kernel32.VirtualAllocEx(
handle,
0,
len(filepath),
pymem.ressources.structure.MEMORY_STATE.MEM_COMMIT.value | pymem.ressources.structure.MEMORY_STATE.MEM_RESERVE.value,
pymem.ressources.structure.MEMORY_PROTECTION.PAGE_EXECUTE_READWRITE.value
)
pymem.ressources.kernel32.WriteProcessMemory(handle, filepath_address, filepath, len(filepath), None)
kernel32_handle = pymem.ressources.kernel32.GetModuleHandleW("kernel32.dll")
load_library_a_address = pymem.ressources.kernel32.GetProcAddress(kernel32_handle, b"LoadLibraryA")
thread_h = pymem.ressources.kernel32.CreateRemoteThread(
handle, None, 0, load_library_a_address, filepath_address, 0, None
)
pymem.ressources.kernel32.WaitForSingleObject(thread_h, -1)
pymem.ressources.kernel32.VirtualFreeEx(
handle, filepath_address, len(filepath), pymem.ressources.structure.MEMORY_STATE.MEM_RELEASE.value
)
dll_name = os.path.basename(filepath)
dll_name = dll_name.decode('ascii')
module_address = pymem.ressources.kernel32.GetModuleHandleW(dll_name)
return module_address
[docs]def get_luid(name):
"""Get the LUID for the SeCreateSymbolicLinkPrivilege
"""
luid = pymem.ressources.structure.LUID()
res = pymem.ressources.advapi32.LookupPrivilegeValue(None, name, luid)
if not res > 0:
raise RuntimeError("Couldn't lookup privilege value")
return luid
[docs]def get_process_token():
"""Get the current process token
"""
token = ctypes.c_void_p()
res = pymem.ressources.advapi32.OpenProcessToken(
ctypes.windll.kernel32.GetCurrentProcess(),
pymem.ressources.structure.TOKEN.TOKEN_ALL_ACCESS,
token
)
if not res > 0:
raise RuntimeError("Couldn't get process token")
return token
[docs]def set_debug_privilege(lpszPrivilege, bEnablePrivilege):
"""Leverage current process privileges.
Parameters
----------
lpszPrivilege: str
Privilege name
bEnablePrivilege: bool
Enable privilege
Returns
-------
bool
If privileges have been leveraged
"""
# create a space in memory for a TOKEN_PRIVILEGES structure
# with one element
size = ctypes.sizeof(pymem.ressources.structure.TOKEN_PRIVILEGES)
size += ctypes.sizeof(pymem.ressources.structure.LUID_AND_ATTRIBUTES)
buffer = ctypes.create_string_buffer(size)
tp = ctypes.cast(buffer, ctypes.POINTER(pymem.ressources.structure.TOKEN_PRIVILEGES)).contents
tp.count = 1
tp.get_array()[0].LUID = get_luid(lpszPrivilege)
tp.get_array()[0].Attributes = (
pymem.ressources.structure.SE_TOKEN_PRIVILEGE.SE_PRIVILEGE_ENABLED if bEnablePrivilege else 0
)
token = get_process_token()
res = pymem.ressources.advapi32.AdjustTokenPrivileges(token, False, tp, 0, None, None)
if res == 0:
raise RuntimeError("AdjustTokenPrivileges error: 0x%08x\n" % ctypes.GetLastError())
ERROR_NOT_ALL_ASSIGNED = 1300
return ctypes.windll.kernel32.GetLastError() != ERROR_NOT_ALL_ASSIGNED
[docs]def base_module(handle):
"""Returns process base module
Parameters
----------
handle: int
A valid handle to an open object
Returns
-------
MODULEINFO
The base module of the process
"""
hModules = (ctypes.c_void_p * 1024)()
process_module_success = pymem.ressources.psapi.EnumProcessModulesEx(
handle,
ctypes.byref(hModules),
ctypes.sizeof(hModules),
ctypes.byref(ctypes.c_ulong()),
pymem.ressources.structure.EnumProcessModuleEX.LIST_MODULES_ALL
)
if not process_module_success:
return # xxx
module_info = pymem.ressources.structure.MODULEINFO(handle)
pymem.ressources.psapi.GetModuleInformation(
handle,
ctypes.c_void_p(hModules[0]),
ctypes.byref(module_info),
ctypes.sizeof(module_info)
)
return module_info
[docs]def open(process_id, debug=True, process_access=None):
"""Open a process given its process_id.
By default, the process is opened with full access and in debug mode.
https://msdn.microsoft.com/en-us/library/windows/desktop/ms684320%28v=vs.85%29.aspx
https://msdn.microsoft.com/en-us/library/windows/desktop/aa379588%28v=vs.85%29.aspx
Parameters
----------
process_id: int
The identifier of the process to be opened
debug: bool
If the process should be opened in debug mode
process_access: pymem.ressources.structure.PROCESS
Desired access level, defaulting to all access
Returns
-------
int
A handle to the opened process
"""
if not process_access:
process_access = pymem.ressources.structure.PROCESS.PROCESS_ALL_ACCESS.value
if debug:
set_debug_privilege('SeDebugPrivilege', True)
process_handle = pymem.ressources.kernel32.OpenProcess(process_access, False, process_id)
return process_handle
[docs]def open_main_thread(process_id):
"""List given process threads and return a handle to first created one.
Parameters
----------
process_id: int
The identifier of the process
Returns
-------
int
A handle to the main thread
"""
threads = enum_process_thread(process_id)
threads = sorted(threads, key=lambda t32: t32.creation_time)
if not threads:
return # todo: raise exception
main_thread = threads[0]
thread_handle = open_thread(main_thread.th32ThreadID)
return thread_handle
# TODO: impl enum for thread access levels
[docs]def open_thread(thread_id, thread_access=None):
"""Opens an existing thread object.
https://msdn.microsoft.com/en-us/library/windows/desktop/ms684335%28v=vs.85%29.aspx
Parameters
----------
thread_id: int
The identifier of the thread to be opened
thread_access: int
Desired access level, defaulting to all access
Returns
-------
int
A handle to the opened thread
"""
if thread_access is None:
thread_access = 0x001F03FF
thread_handle = pymem.ressources.kernel32.OpenThread(thread_access, 0, thread_id)
return thread_handle
[docs]def close_handle(handle):
"""Closes an open object handle.
https://msdn.microsoft.com/en-us/library/windows/desktop/ms724211%28v=vs.85%29.aspx
Parameters
----------
handle: int
A valid handle to an open object
Returns
-------
bool
If the closure succeeded
"""
if not handle:
return
success = pymem.ressources.kernel32.CloseHandle(handle)
return success != 0
[docs]def list_processes():
"""List all processes
https://msdn.microsoft.com/en-us/library/windows/desktop/ms682489%28v=vs.85%29.aspx
https://msdn.microsoft.com/en-us/library/windows/desktop/ms684834%28v=vs.85%29.aspx
Returns
-------
list[ProcessEntry32]
A list of open process entries
"""
SNAPPROCESS = 0x00000002
hSnap = pymem.ressources.kernel32.CreateToolhelp32Snapshot(SNAPPROCESS, 0)
process_entry = pymem.ressources.structure.ProcessEntry32()
process_entry.dwSize = ctypes.sizeof(process_entry)
p32 = pymem.ressources.kernel32.Process32First(hSnap, ctypes.byref(process_entry))
if p32:
yield process_entry
while p32:
yield process_entry
p32 = pymem.ressources.kernel32.Process32Next(hSnap, ctypes.byref(process_entry))
pymem.ressources.kernel32.CloseHandle(hSnap)
[docs]def process_from_name(
name: str,
exact_match: bool = False,
ignore_case: bool = True,
):
"""Open a process given its name.
Parameters
----------
name:
The name of the process to be opened
exact_match:
Defaults to False, is the full name match or just part of it expected?
ignore_case:
Default to True, should ignore process name case?
Returns
-------
ProcessEntry32
The process entry of the opened process
"""
if ignore_case:
name = name.lower()
processes = list_processes()
for process in processes:
process_name = process.szExeFile.decode(locale.getpreferredencoding())
if ignore_case:
process_name = process_name.lower()
if exact_match:
if process_name == name:
return process
else:
if name in process_name:
return process
[docs]def process_from_id(process_id):
"""Open a process given its name.
Parameters
----------
process_id: int
The identifier of the process to be opened
Returns
-------
ProcessEntry32
The process entry of the opened process
"""
processes = list_processes()
for process in processes:
if process_id == process.th32ProcessID:
return process
[docs]def module_from_name(process_handle, module_name):
"""Retrieve a module loaded by given process.
Parameters
----------
process_handle: int
Handle to the process to get the module from
module_name: str
Name of the module to get
Returns
-------
MODULEINFO
The retrieved module
Examples
--------
>>> d3d9 = module_from_name(process_handle, 'd3d9')
"""
module_name = module_name.lower()
modules = enum_process_module(process_handle)
for module in modules:
if module.name.lower() == module_name:
return module
[docs]def enum_process_thread(process_id):
"""List all threads of given processes_id
Parameters
----------
process_id: int
Identifier of the process to enum the threads of
Returns
-------
list[ThreadEntry32]
The process's threads
"""
TH32CS_SNAPTHREAD = 0x00000004
hSnap = pymem.ressources.kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0)
thread_entry = pymem.ressources.structure.ThreadEntry32()
ret = pymem.ressources.kernel32.Thread32First(hSnap, ctypes.byref(thread_entry))
if not ret:
raise pymem.exception.PymemError('Could not get Thread32First')
while ret:
if thread_entry.th32OwnerProcessID == process_id:
yield thread_entry
ret = pymem.ressources.kernel32.Thread32Next(hSnap, ctypes.byref(thread_entry))
pymem.ressources.kernel32.CloseHandle(hSnap)
[docs]def enum_process_module(handle):
"""List and retrieves the base names of the specified loaded module within a process
https://msdn.microsoft.com/en-us/library/windows/desktop/ms682633(v=vs.85).aspx
https://msdn.microsoft.com/en-us/library/windows/desktop/ms683196(v=vs.85).aspx
Parameters
----------
handle: int
Handle of the process to enum the modules of
Returns
-------
list[MODULEINFO]
The process's modules
"""
hModules = (ctypes.c_void_p * 1024)()
process_module_success = pymem.ressources.psapi.EnumProcessModulesEx(
handle,
ctypes.byref(hModules),
ctypes.sizeof(hModules),
ctypes.byref(ctypes.c_ulong()),
pymem.ressources.structure.EnumProcessModuleEX.LIST_MODULES_ALL
)
if process_module_success:
hModules = iter(m for m in hModules if m)
for hModule in hModules:
module_info = pymem.ressources.structure.MODULEINFO(handle)
pymem.ressources.psapi.GetModuleInformation(
handle,
ctypes.c_void_p(hModule),
ctypes.byref(module_info),
ctypes.sizeof(module_info)
)
yield module_info
# TODO: should this be named is_wow64?
[docs]def is_64_bit(handle):
"""Determines whether the specified process is running under WOW64 (emulation).
Parameters
----------
handle: int
Handle of the process to check wow64 status of
Returns
-------
bool
If the process is running under wow64
"""
Wow64Process = ctypes.c_long()
pymem.ressources.kernel32.IsWow64Process(handle, ctypes.byref(Wow64Process))
return bool(Wow64Process.value)