Files and I/O#
Read a File#
In Python 2, the content read from a file is not decoded. That is, the content of the file is a byte string, not a Unicode string.
>>> with open("/etc/passwd") as f:
... content = f.read()
>>> print(type(content))
<type 'str'>
>>> print(type(content.decode("utf-8")))
<type 'unicode'>
In Python 3, open
provides an encoding option. If a file is not opened in binary mode, the encoding
is determined by locale.getpreferredencoding(False) unless the user specifies one.
>>> with open("/etc/hosts", encoding="utf-8") as f:
... content = f.read()
...
>>> print(type(content))
<class 'str'>
Binary mode
>>> with open("/etc/hosts", "rb") as f:
... content = f.read()
...
>>> print(type(content))
<class 'bytes'>
Readline#
>>> with open("/etc/hosts") as f:
... for line in f:
... print(line, end='')
...
127.0.0.1 localhost
255.255.255.255 broadcasthost
::1 localhost
Reading File Chunks#
>>> chunk_size = 16
>>> content = ''
>>> with open('/etc/hosts') as f:
... for c in iter(lambda: f.read(chunk_size), ''):
... content += c
...
>>> print(content)
127.0.0.1 localhost
255.255.255.255 broadcasthost
::1 localhost
Write a File#
>>> content = "Awesome Python!"
>>> with open("foo.txt", "w") as f:
... f.write(content)
Create a Symbolic Link#
>>> import os
>>> os.symlink("foo", "bar")
>>> os.readlink("bar")
'foo'
Copy a File#
>>> # distutils was removed in Python 3.12; shutil.copy2 also preserves mode and timestamps
>>> import shutil
>>> shutil.copy2("foo", "bar")
'bar'
Move a File#
>>> # distutils was removed in Python 3.12; shutil.move returns the destination path
>>> import shutil
>>> shutil.move("./foo", "./bar")
'./bar'
List a Directory#
>>> import os
>>> dirs = os.listdir(".")
Since Python 3.5, we can use os.scandir to list a directory (since Python 3.6
it can also be used as a context manager). It is more convenient because
os.scandir returns an iterator of os.DirEntry objects. In this case, we can
get file information by accessing the attributes of os.DirEntry. Further
information can be found in the documentation.
>>> with os.scandir("foo") as it:
... for entry in it:
... st = entry.stat()
...
Create Directories#
Similar to mkdir -p /path/to/dest
>>> # distutils was removed in Python 3.12; exist_ok=True gives ``mkdir -p`` semantics
>>> import os
>>> os.makedirs("foo/bar/baz", exist_ok=True)
Copy a Directory#
>>> # distutils was removed in Python 3.12; the destination must not exist yet
>>> import shutil
>>> shutil.copytree("foo", "bar")
'bar'
Remove a Directory#
>>> # distutils was removed in Python 3.12; shutil.rmtree deletes the whole tree
>>> import shutil
>>> shutil.rmtree("dir")
Path Join#
>>> from pathlib import Path
>>> p = Path("/Users")
>>> p = p / "Guido" / "pysheeet"
>>> p
PosixPath('/Users/Guido/pysheeet')
Get Absolute Path#
>>> from pathlib import Path
>>> p = Path("README.rst")
>>> # absolute() prefixes the current working directory to a relative path
>>> p.absolute()
PosixPath('/Users/Guido/pysheeet/README.rst')
Get Home Directory#
>>> from pathlib import Path
>>> Path.home()
PosixPath('/Users/Guido')
Get Current Directory#
>>> from pathlib import Path
>>> p = Path("README.rst")
>>> p.cwd()
PosixPath('/Users/Guido/pysheeet')
Get Path Properties#
>>> from pathlib import Path
>>> p = Path("README.rst").absolute()
>>> p.root
'/'
>>> p.anchor
'/'
>>> p.parent
PosixPath('/Users/Guido/pysheeet')
>>> p.parent.parent
PosixPath('/Users/Guido')
>>> p.name
'README.rst'
>>> p.suffix
'.rst'
>>> p.stem
'README'
>>> p.as_uri()
'file:///Users/Guido/pysheeet/README.rst'
Read a gzip CSV#
import gzip
import csv
# Path of the gzip-compressed CSV file to read.
f = "example.gz"

# 'rt' opens the archive in text mode; newline='' leaves newline handling to
# the csv module, as the csv documentation recommends for file objects.
with gzip.open(f, 'rt', newline='') as gz:
    # DictReader takes the field names from the first row of the file.
    reader = csv.DictReader(gz)
    for row in reader:
        print(row)
Linux Inotify#
import selectors
import struct
import ctypes
import sys
import os
from pathlib import Path
from ctypes.util import find_library
# ref: <sys/inotify.h>
IN_CREATE = 0x00000100
IN_DELETE = 0x00000200

# struct format of the fixed-size event header, in the order it is unpacked
# in Inotify.parse(): wd (int), then mask, cookie and len (unsigned ints).
INOTIFY_EVENT = "iIII"
INOTIFY_EVENT_LEN = struct.calcsize(INOTIFY_EVENT)

lib = find_library("c")
if not lib:
    # An explicit raise (unlike ``assert``) still fires under ``python -O``.
    raise OSError("unable to locate the C library")

# use_errno=True makes ctypes keep its own copy of errno around each foreign
# call; without it ctypes.get_errno() does not reflect a failed libc call.
libc = ctypes.CDLL(lib, use_errno=True)


class Inotify(object):
    """Context manager that watches one path for IN_CREATE / IN_DELETE.

    Entering the context creates an inotify descriptor, adds a watch for
    ``path`` and registers the descriptor with a selector.  ``run()`` then
    yields ``(mask, filename)`` tuples as events are read.  Leaving the
    context removes the watch and closes the descriptor.
    """

    def __init__(self, path):
        """Remember ``path``; no descriptor is opened until ``__enter__``."""
        self._path = path
        self._fd = None
        self._wd = None
        # Bytes read from the descriptor that do not yet form a whole event.
        self._buf = b""
        self._sel = selectors.DefaultSelector()

    def init(self):
        """Create an inotify descriptor and return it.

        Raises:
            OSError: if ``inotify_init`` returns a negative value.
        """
        fd = libc.inotify_init()
        if fd < 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))
        return fd

    def watch(self, fd, path):
        """Add a create/delete watch for ``path`` on ``fd``; return its wd.

        Raises:
            OSError: if ``inotify_add_watch`` returns a negative value.
        """
        encoded = str(path).encode("utf8")
        wd = libc.inotify_add_watch(fd, encoded, IN_CREATE | IN_DELETE)
        if wd < 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))
        return wd

    def remove(self, fd, wd):
        """Remove watch ``wd`` from descriptor ``fd``.

        The return value of ``inotify_rm_watch`` is deliberately ignored so
        that cleanup in ``__exit__`` stays best-effort.
        """
        # Operate on the arguments, not on self._fd / self._wd.
        libc.inotify_rm_watch(fd, wd)

    def handle(self, fd, *a):
        """Read up to 1024 bytes from ``fd`` and yield the parsed events."""
        data = os.read(fd, 1024)
        if not data:
            return
        yield from self.parse(data)

    def parse(self, buf):
        """Append ``buf`` to the pending bytes and yield complete events.

        Yields:
            ``(mask, filename)`` for every complete event; the filename is
            the NUL-stripped, UTF-8 decoded name field.  Trailing bytes that
            do not form a whole event stay buffered for the next call.
        """
        self._buf += buf
        while len(self._buf) >= INOTIFY_EVENT_LEN:
            wd, mask, cookie, length = struct.unpack_from(
                INOTIFY_EVENT, self._buf
            )
            event_end = INOTIFY_EVENT_LEN + length
            if len(self._buf) < event_end:
                # Header is complete but the name is not; wait for more data.
                break
            filename = self._buf[INOTIFY_EVENT_LEN:event_end]
            self._buf = self._buf[event_end:]
            yield mask, filename.rstrip(b"\0").decode("utf8")

    def __enter__(self):
        """Open the descriptor, add the watch and register with the selector."""
        fd = self.init()
        try:
            wd = self.watch(fd, self._path)
            self._sel.register(fd, selectors.EVENT_READ, self.handle)
        except BaseException:
            # Do not leak the descriptor when setup fails half-way.
            os.close(fd)
            raise
        self._fd, self._wd = fd, wd
        return self

    def __exit__(self, *e):
        """Release the watch and descriptor; report any exception on stderr.

        Returns None, so an exception raised inside the ``with`` body still
        propagates after being printed.
        """
        fd, wd = self._fd, self._wd
        self._fd = self._wd = None
        self._buf = b""
        if fd is not None:
            try:
                self.remove(fd, wd)
                # Unregister before closing so the selector sees a valid fd.
                self._sel.unregister(fd)
            finally:
                os.close(fd)
        if e and e[0]:
            print(e, file=sys.stderr)

    def run(self):
        """Loop forever, yielding events from every ready descriptor."""
        while True:
            for key, mask in self._sel.select():
                callback = key.data
                yield from callback(key.fileobj, mask)
# Guarded so that importing this module does not start the endless watch loop.
if __name__ == "__main__":
    # Print the event mask and file name for every event read from /tmp.
    with Inotify(Path("/tmp")) as i:
        for mask, name in i.run():
            print(mask, name)