Source code for streaming.base.shared.memory

# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Improved quiet implementation of shared memory in pure python."""

import atexit
from multiprocessing import resource_tracker  # pyright: ignore
from multiprocessing.shared_memory import SharedMemory as BuiltinSharedMemory
from time import sleep
from typing import Any, Optional

from streaming.base.constant import TICK


[docs]class SharedMemory: """Improved quiet implementation of shared memory. Args: name (str, optional): A unique shared memory block name. Defaults to ``None``. create (bool, optional): Creates a new shared memory block or attaches to an existing shared memory block. Defaults to ``None``. size (int, optional): A size of a shared memory block. Defaults to ``0``. auto_cleanup (bool, optional): Register atexit handler for cleanup or not. Defaults to ``True``. """ def __init__(self, name: Optional[str] = None, create: Optional[bool] = None, size: int = 0, auto_cleanup: bool = True): self.created_shms = [] self.opened_shms = [] shm = None # save the original register tracker function original_rtracker_reg = resource_tracker.register try: if create is True: # Creates a new shared memory block shm = BuiltinSharedMemory(name, create, size) self.created_shms.append(shm) elif create is False: # Avoid tracking shared memory resources in a process who attaches to an existing # shared memory block because the process who created the shared memory is # responsible for destroying the shared memory block. resource_tracker.register = self.fix_register # Attaches to an existing shared memory block shm = BuiltinSharedMemory(name, create, size) self.opened_shms.append(shm) else: try: # Creates a new shared memory block shm = BuiltinSharedMemory(name, True, size) self.created_shms.append(shm) except FileExistsError: sleep(TICK) resource_tracker.register = self.fix_register # Attaches to an existing shared memory block shm = BuiltinSharedMemory(name, False, size) self.opened_shms.append(shm) self.shm = shm finally: resource_tracker.register = original_rtracker_reg if auto_cleanup: # atexit handler doesn't get called if the program is killed by a signal not # handled by python or when os.exit() is called or for any python internal fatal error. atexit.register(self.cleanup) @property def buf(self) -> memoryview: """Internal buffer accessor. Returns: memoryview: Internal buffer. """ return self.shm.buf # Monkey-patched "multiprocessing.resource_tracker" to skip unwanted resource tracker warnings. # PR to remove resource tracker unlinking: https://github.com/python/cpython/pull/15989
[docs] def fix_register(self, name: str, rtype: str) -> Any: """Skip registering resource tracking for shared memory. Args: name (str): Name of a shared memory rtype (str): Name of a resource type Returns: Any: resource tracker or None """ if rtype == 'shared_memory': return return resource_tracker._resource_tracker.register(self, name, rtype)
[docs] def fix_unregister(self, name: str, rtype: str) -> Any: """Skip un-registering resource tracking for shared memory. Args: name (str): Name of a shared memory rtype (str): Name of a resource type Returns: Any: resource tracker or None """ if rtype == 'shared_memory': return return resource_tracker._resource_tracker.unregister(self, name, rtype)
[docs] def cleanup(self): """Clean up SharedMemory resources.""" # save the original unregister tracker function original_rtracker_unreg = resource_tracker.unregister # Close each SharedMemory instance try: for shm in self.created_shms: shm.close() # Destroy the shared memory block shm.unlink() for shm in self.opened_shms: resource_tracker.unregister = self.fix_unregister shm.close() # skip the error if a child process already cleaned up the shared memory except FileNotFoundError: pass finally: resource_tracker.unregister = original_rtracker_unreg