Files and I/O

Read a File

In Python 2, the content read from a file is not decoded. That is, the content of the file is a byte string, not a Unicode string.

>>> with open("/etc/passwd") as f:
...    content = f.read()
>>> print(type(content))
<type 'str'>
>>> print(type(content.decode("utf-8")))
<type 'unicode'>

In Python 3, open provides an encoding option. If a file is not opened in binary mode, its content is decoded using the encoding given by the user or, when none is given, the one returned by locale.getpreferredencoding(False).

>>> with open("/etc/hosts", encoding="utf-8") as f:
...     content = f.read()
...
>>> print(type(content))
<class 'str'>

Binary mode

>>> with open("/etc/hosts", "rb") as f:
...     content = f.read()
...
>>> print(type(content))
<class 'bytes'>

Readline

>>> with open("/etc/hosts") as f:
...     for line in f:
...         print(line, end='')
...
127.0.0.1       localhost
255.255.255.255     broadcasthost
::1             localhost

Reading File Chunks

>>> chunk_size = 16
>>> content = ''
>>> with open('/etc/hosts') as f:
...     for c in iter(lambda: f.read(chunk_size), ''):
...         content += c
...
>>> print(content)
127.0.0.1       localhost
255.255.255.255 broadcasthost
::1             localhost

Write a File

>>> content = "Awesome Python!"
>>> with open("foo.txt", "w") as f:
...     f.write(content)

Copy a File

>>> from distutils.file_util import copy_file
>>> copy_file("foo", "bar")
('bar', 1)

Move a File

>>> from distutils.file_util import move_file
>>> move_file("./foo", "./bar")
'./bar'

List a Directory

>>> import os
>>> dirs = os.listdir(".")

Since Python 3.6, we can use os.scandir in a with statement to list a directory (os.scandir itself was added in Python 3.5). It is more convenient because os.scandir returns an iterator of os.DirEntry objects, so we can get file information by accessing the attributes and methods of each os.DirEntry. Further information can be found in the official documentation.

>>> with os.scandir("foo") as it:
...     for entry in it:
...         st = entry.stat()
...

Create Directories

Similar to mkdir -p /path/to/dest

>>> from distutils.dir_util import mkpath
>>> mkpath("foo/bar/baz")
['foo', 'foo/bar', 'foo/bar/baz']

Copy a Directory

>>> from distutils.dir_util import copy_tree
>>> copy_tree("foo", "bar")
['bar/baz']

Remove a Directory

>>> from distutils.dir_util import remove_tree
>>> remove_tree("dir")

Path Join

>>> from pathlib import Path
>>> p = Path("/Users")
>>> p = p / "Guido" / "pysheeet"
>>> p
PosixPath('/Users/Guido/pysheeet')

Get Absolute Path

>>> from pathlib import Path
>>> p = Path("README.rst")
>>> p.absolute()
PosixPath('/Users/Guido/pysheeet/README.rst')

Get Home Directory

>>> from pathlib import Path
>>> Path.home()
PosixPath('/Users/Guido')

Get Current Directory

>>> from pathlib import Path
>>> p = Path("README.rst")
>>> p.cwd()
PosixPath('/Users/Guido/pysheeet')

Get Path Properties

>>> from pathlib import Path
>>> p = Path("README.rst").absolute()
>>> p.root
'/'
>>> p.anchor
'/'
>>> p.parent
PosixPath('/Users/Guido/pysheeet')
>>> p.parent.parent
PosixPath('/Users/Guido')
>>> p.name
'README.rst'
>>> p.suffix
'.rst'
>>> p.stem
'README'
>>> p.as_uri()
'file:///Users/Guido/pysheeet/README.rst'

Read a gzip CSV

import gzip
import csv

# Path of the gzip-compressed CSV file to read.
f = "example.gz"
# 'rt' decompresses to text on the fly; newline='' leaves line-ending
# handling to the csv module.
with gzip.open(f, mode='rt', newline='') as gz:
    # Each row is a dict keyed by the column names from the header line.
    for row in csv.DictReader(gz):
        print(row)

Linux Inotify

import selectors
import struct
import ctypes
import sys
import os

from pathlib import Path
from ctypes.util import find_library

# ref: <sys/inotify.h>
IN_CREATE = 0x00000100  # a file/directory was created inside the watched directory
IN_DELETE = 0x00000200  # a file/directory was deleted from the watched directory

# Fixed-size header of `struct inotify_event`:
#   int wd; uint32_t mask; uint32_t cookie; uint32_t len;
# The header is followed by `len` bytes holding the NUL-padded file name.
INOTIFY_EVENT = "iIII"
INOTIFY_EVENT_LEN = struct.calcsize(INOTIFY_EVENT)

lib = find_library("c")
# Raise explicitly instead of `assert`: asserts are stripped under `python -O`.
if not lib:
    raise OSError("unable to locate the C library (libc)")

# use_errno=True makes ctypes save errno after every foreign call; without it
# ctypes.get_errno() always returns 0 and failures cannot be reported properly.
libc = ctypes.CDLL(lib, use_errno=True)


class Inotify(object):
    """Watch a directory for file creation and deletion via Linux inotify.

    Use the object as a context manager: entering creates the inotify
    instance and adds the watch, leaving removes the watch and closes the
    inotify file descriptor.  Iterate over :meth:`run` to receive
    ``(mask, filename)`` tuples as events arrive.

    Note: ``ctypes.get_errno()`` only reports the real ``errno`` when the
    C library object was created with ``use_errno=True``.
    """

    def __init__(self, path):
        """Remember *path* (``str`` or path-like) as the directory to watch."""
        self._path = path
        self._fd = None    # inotify file descriptor, set in __enter__
        self._wd = None    # watch descriptor, set in __enter__
        self._buf = b""    # bytes read from the fd but not yet parsed
        self._sel = selectors.DefaultSelector()

    def init(self):
        """Create an inotify instance and return its file descriptor.

        Raises:
            OSError: if ``inotify_init`` fails.
        """
        fd = libc.inotify_init()
        if fd < 0:
            errno = ctypes.get_errno()
            raise OSError(errno, os.strerror(errno))
        return fd

    def watch(self, fd, path):
        """Watch *path* for create/delete events on *fd*; return the watch descriptor.

        Raises:
            OSError: if ``inotify_add_watch`` fails.
        """
        # os.fsencode accepts str, bytes and path-like objects and uses the
        # filesystem encoding, so non-UTF-8 paths are passed through intact.
        wd = libc.inotify_add_watch(fd, os.fsencode(path), IN_CREATE | IN_DELETE)
        if wd < 0:
            errno = ctypes.get_errno()
            raise OSError(errno, os.strerror(errno))
        return wd

    def remove(self, fd, wd):
        """Remove watch descriptor *wd* from the inotify instance *fd*."""
        libc.inotify_rm_watch(fd, wd)

    def handle(self, fd, *a):
        """Read pending data from *fd* and yield the events it contains."""
        # The read buffer must hold at least one full event
        # (16-byte header + NAME_MAX + 1 bytes of name); 1024 is enough.
        data = os.read(fd, 1024)
        if not data:
            return
        yield from self.parse(data)

    def parse(self, buf):
        """Append *buf* to the pending data and yield every complete event.

        Yields:
            ``(mask, filename)`` for each complete event.  An incomplete
            trailing event stays buffered until more data arrives.
        """
        self._buf += buf
        while len(self._buf) >= INOTIFY_EVENT_LEN:
            _wd, mask, _cookie, length = struct.unpack_from(INOTIFY_EVENT, self._buf)
            event_length = INOTIFY_EVENT_LEN + length
            if len(self._buf) < event_length:
                break

            name = self._buf[INOTIFY_EVENT_LEN:event_length]
            self._buf = self._buf[event_length:]
            # The name is NUL-padded.  File names are arbitrary bytes, so
            # decode with the filesystem encoding (surrogateescape) rather
            # than strict UTF-8, which would raise on non-UTF-8 names.
            yield mask, os.fsdecode(name.rstrip(b"\0"))

    def __enter__(self):
        """Create the inotify instance, add the watch and register the fd."""
        self._fd = self.init()
        try:
            self._wd = self.watch(self._fd, self._path)
            self._sel.register(self._fd, selectors.EVENT_READ, self.handle)
        except BaseException:
            # Do not leak the descriptor when setup fails half-way; closing
            # it also drops any watch that was already added.
            os.close(self._fd)
            self._fd = self._wd = None
            raise
        return self

    def __exit__(self, *e):
        """Remove the watch and close the inotify fd; exceptions propagate."""
        try:
            self._sel.unregister(self._fd)
            self.remove(self._fd, self._wd)
        finally:
            # The selector itself is left open so the object can be re-entered.
            os.close(self._fd)
            self._fd = self._wd = None
        if len(e) > 0 and e[0]:
            print(e, file=sys.stderr)

    def run(self):
        """Block on the selector forever, yielding ``(mask, filename)`` events."""
        while True:
            ready = self._sel.select()
            for key, io_mask in ready:
                callback = key.data
                yield from callback(key.fileobj, io_mask)


# Watch /tmp and print each (mask, filename) event; run() never returns on
# its own, so this loops until the process is interrupted.
with Inotify(Path("/tmp")) as i:
    for event in i.run():
        print(*event)