Files and I/O¶
Read a File¶
In Python 2, content read from a file is not decoded. That is, the content is a byte string (str), not a Unicode string.
>>> with open("/etc/passwd") as f:
... content = f.read()
>>> print(type(content))
<type 'str'>
>>> print(type(content.decode("utf-8")))
<type 'unicode'>
In Python 3, open
provides an encoding
option. If a file is not opened in binary mode, the encoding
is determined by locale.getpreferredencoding(False)
unless the caller passes an explicit encoding.
>>> with open("/etc/hosts", encoding="utf-8") as f:
... content = f.read()
...
>>> print(type(content))
<class 'str'>
Binary mode
>>> with open("/etc/hosts", "rb") as f:
... content = f.read()
...
>>> print(type(content))
<class 'bytes'>
Readline¶
>>> with open("/etc/hosts") as f:
... for line in f:
... print(line, end='')
...
127.0.0.1 localhost
255.255.255.255 broadcasthost
::1 localhost
Reading File Chunks¶
>>> chunk_size = 16
>>> content = ''
>>> with open('/etc/hosts') as f:
... for c in iter(lambda: f.read(chunk_size), ''):
... content += c
...
>>> print(content)
127.0.0.1 localhost
255.255.255.255 broadcasthost
::1 localhost
Write a File¶
>>> content = "Awesome Python!"
>>> with open("foo.txt", "w") as f:
... f.write(content)
Create a Symbolic Link¶
>>> import os
>>> os.symlink("foo", "bar")  # create "bar" as a symbolic link pointing to "foo"
>>> os.readlink("bar")  # returns the target the link points to
'foo'
Copy a File¶
>>> from distutils.file_util import copy_file
>>> copy_file("foo", "bar")
('bar', 1)
Move a File¶
>>> from distutils.file_util import move_file
>>> move_file("./foo", "./bar")
'./bar'
List a Directory¶
>>> >>> import os
>>> dirs = os.listdir(".")
Since Python 3.6, we can use os.scandir
to list a directory. It is more
convenient because os.scandir
returns an iterator of os.DirEntry
objects.
In this case, we can get file information by accessing the attributes of
os.DirEntry
. Further information can be found in the
documentation.
>>> with os.scandir("foo") as it:
... for entry in it:
... st = entry.stat()
...
Create Directories¶
Similar to mkdir -p /path/to/dest
>>> from distutils.dir_util import mkpath
>>> mkpath("foo/bar/baz")
['foo', 'foo/bar', 'foo/bar/baz']
Copy a Directory¶
>>> from distutils.dir_util import copy_tree
>>> copy_tree("foo", "bar")
['bar/baz']
Remove a Directory¶
>>> from distutils.dir_util import remove_tree
>>> remove_tree("dir")
Path Join¶
>>> from pathlib import Path
>>> p = Path("/Users")
>>> p = p / "Guido" / "pysheeet"  # the "/" operator appends path segments
>>> p
PosixPath('/Users/Guido/pysheeet')
Get Absolute Path¶
>>> from pathlib import Path
>>> p = Path("README.rst")
PosixPath('/Users/Guido/pysheeet/README.rst')
Get Home Directory¶
>>> from pathlib import Path
>>> Path.home()  # called on the class itself; no Path instance is needed
PosixPath('/Users/Guido')
Get Current Directory¶
>>> from pathlib import Path
>>> p = Path("README.rst")
>>> p.cwd()
PosixPath('/Users/Guido/pysheeet')
Get Path Properties¶
>>> from pathlib import Path
>>> p = Path("README.rst").absolute()  # absolute path, so root and parents are non-empty
>>> p.root
'/'
>>> p.anchor
'/'
>>> p.parent  # containing directory
PosixPath('/Users/Guido/pysheeet')
>>> p.parent.parent
PosixPath('/Users/Guido')
>>> p.name  # final path component
'README.rst'
>>> p.suffix  # extension, including the leading dot
'.rst'
>>> p.stem  # final component without its suffix
'README'
>>> p.as_uri()
'file:///Users/Guido/pysheeet/README.rst'
Read a gzip CSV¶
import gzip
import csv
f = "example.gz"
# 'rt' decompresses and decodes to text; newline='' leaves line-ending handling
# to the csv module, as the csv documentation recommends.
with gzip.open(f, 'rt', newline='') as gz:
    reader = csv.DictReader(gz)  # the first row supplies the dict keys
    for row in reader:
        print(row)
Linux Inotify¶
import selectors
import struct
import ctypes
import sys
import os
from pathlib import Path
from ctypes.util import find_library
# ref: <sys/inotify.h>
IN_CREATE = 0x00000100
IN_DELETE = 0x00000200
# Layout of the fixed-size header of struct inotify_event:
# int wd; uint32_t mask; uint32_t cookie; uint32_t len;
INOTIFY_EVENT = "iIII"
INOTIFY_EVENT_LEN = struct.calcsize(INOTIFY_EVENT)
lib = find_library("c")
assert lib, "unable to locate the C library"
# use_errno=True makes ctypes save errno after each foreign call; without it
# ctypes.get_errno() always returns 0 and failures are reported as "Success".
libc = ctypes.CDLL(lib, use_errno=True)
class Inotify(object):
    """Watch a directory for file creation and deletion using Linux inotify.

    Use as a context manager and iterate ``run()`` to receive
    ``(mask, filename)`` tuples::

        with Inotify(Path("/tmp")) as i:
            for mask, name in i.run():
                ...
    """

    def __init__(self, path):
        """Remember *path*; nothing is opened until the ``with`` block starts."""
        self._path = path
        self._fd = None
        self._wd = None
        self._buf = b""  # bytes read but not yet parsed into complete events
        self._sel = selectors.DefaultSelector()

    def init(self):
        """Create an inotify instance and return its file descriptor.

        Raises:
            OSError: if ``inotify_init`` fails.
        """
        fd = libc.inotify_init()
        if fd < 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))
        return fd

    def watch(self, fd, path):
        """Watch *path* for create/delete events on *fd*; return the watch descriptor.

        Raises:
            OSError: if ``inotify_add_watch`` fails.
        """
        p = os.fsencode(path)
        wd = libc.inotify_add_watch(fd, p, IN_CREATE | IN_DELETE)
        if wd < 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))
        return wd

    def remove(self, fd, wd):
        """Stop watching the watch descriptor *wd* on inotify descriptor *fd*."""
        libc.inotify_rm_watch(fd, wd)

    def handle(self, fd, *a):
        """Selector callback: read pending bytes from *fd* and yield parsed events."""
        b = os.read(fd, 1024)
        if not b:
            return
        yield from self.parse(b)

    def parse(self, buf):
        """Append *buf* to the pending data and yield every complete event.

        Yields:
            ``(mask, filename)`` tuples. Incomplete trailing data stays
            buffered until a later call supplies the rest.
        """
        self._buf += buf
        while True:
            l = len(self._buf)
            if l < INOTIFY_EVENT_LEN:
                break
            hd = self._buf[:INOTIFY_EVENT_LEN]
            wd, mask, cookie, length = struct.unpack(INOTIFY_EVENT, hd)
            event_length = INOTIFY_EVENT_LEN + length
            if l < event_length:
                break
            filename = self._buf[INOTIFY_EVENT_LEN:event_length]
            self._buf = self._buf[event_length:]
            # The name is NUL-padded. fsdecode copes with names that are not
            # valid UTF-8 instead of raising UnicodeDecodeError.
            yield mask, os.fsdecode(filename.rstrip(b"\0"))

    def __enter__(self):
        self._fd = self.init()
        try:
            self._wd = self.watch(self._fd, self._path)
            self._sel.register(self._fd, selectors.EVENT_READ, self.handle)
        except BaseException:
            # Do not leak the inotify descriptor when setup fails half-way.
            os.close(self._fd)
            self._fd = None
            raise
        return self

    def __exit__(self, *e):
        try:
            self._sel.unregister(self._fd)
            self.remove(self._fd, self._wd)
        finally:
            # Closing the descriptor releases the inotify instance itself.
            os.close(self._fd)
            self._fd = None
            self._wd = None
        if len(e) > 0 and e[0]:
            print(e, file=sys.stderr)

    def run(self):
        """Block on the selector forever, yielding events as they arrive."""
        while True:
            events = self._sel.select()
            for k, mask in events:
                cb = k.data  # the callback registered in __enter__
                yield from cb(k.fileobj, mask)
# Print the event mask and file name for every create/delete under /tmp.
with Inotify(Path("/tmp")) as notifier:
    for mask, name in notifier.run():
        print(mask, name)