The exact scenario:
1. Both reader and writer connect to a FIFO.
2. Reader disconnects for some reason.
3. Writer writes N bytes of data and flushes it (and in the process gets a BrokenPipeError as expected).
4. Reader re-connects.
5. Writer does a flush (or disconnects, which implicitly invokes flush).
6. Reader reads data from the pipe (if any).
The main question here is:
How many bytes (sent in the 3rd step) did the reader receive in the end?
The answer to this question is quite important, as it tells you whether to attempt to re-send the data (note that the reader can re-connect to the same FIFO and the communication can continue) or to just re-flush the buffer, since the data is already stored somewhere internally. Initially I expected there to be a way to check how many bytes are already sitting in the buffer after the write, but that turned out not to be the case (or at least I couldn't find a way to do it without digging too deep into the object's memory internals).
I started with an experiment like this:
#!/usr/bin/python3
f = open('/tmp/mypipe', 'w')
input() # Wait for reader to connect and disconnect.
N = 4096        # Try various values of N.
f.write("A"*N)
f.flush()
input() # Wait for reader to re-connect.
f.close() # Implicit flush.
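The FIFO itself was created beforehand (e.g. with os.mkfifo or the mkfifo shell command). The reader side isn't shown above; a minimal stand-in for it (illustrative, not the exact reader I used) can be as simple as the script below - running it simulates a "connect", finishing or interrupting it simulates a "disconnect":
#!/usr/bin/python3
# Minimal reader stand-in: connect to the FIFO, read until the writer closes
# its end (EOF) and report how many bytes arrived.
with open('/tmp/mypipe', 'rb') as f:
    data = f.read()
print('Read %d byte(s).' % len(data))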
For N <= 4096 the reader actually received all the sent data, i.e. the data was successfully kept in the internal buffer and re-transmitted by the implicit flush when close() was called. When N was larger than 4K, no data was received at all.
Where does the 4K boundary come from? It's a nice round number, but rather than relying on the value itself, it's always better to know where it is defined.
After digging into Python 3.6 source code and documentation I discovered two clues:
1. In Modules/_io/_iomodule.h (also exported as io.DEFAULT_BUFFER_SIZE):
#define DEFAULT_BUFFER_SIZE (8 * 1024) /* bytes */
Wait, but that's 8K, not 4K...
2. In the documentation for io.DEFAULT_BUFFER_SIZE:
io.DEFAULT_BUFFER_SIZE
An int containing the default buffer size used by the module’s buffered I/O classes. open() uses the file’s blksize (as obtained by os.stat()) if possible.
Checking os.stat('/tmp/mypipe').st_blksize indeed yields 4K as the value.
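Both values are easy to verify interactively (this assumes the FIFO already exists; the numbers are from my system):
>>> import io, os
>>> io.DEFAULT_BUFFER_SIZE
8192
>>> os.stat('/tmp/mypipe').st_blksize
4096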
Knowing the above, I figured it surely must mean that if N is less than or equal to 4K, the BrokenPipeError exception is raised on flush(), but if it's larger than 4K, the exception is raised on write() instead (as the internal buffer would be bypassed and the write would be passed straight to the lower I/O layers).
Of course, that proved not to be the case.
I automated the test and got the following results (on Python 3.5 and 3.6):
--- Testing 1 byte(s)
Writer raised BrokenPipeError on flush().
Reader read 1 byte(s).
--- Testing 4096 byte(s)
Writer raised BrokenPipeError on flush().
Reader read 4096 byte(s).
--- Testing 4097 byte(s)
Writer raised BrokenPipeError on flush().
Reader read 0 byte(s).
--- Testing 8192 byte(s)
Writer raised BrokenPipeError on flush().
Reader read 0 byte(s).
--- Testing 8193 byte(s)
Writer raised BrokenPipeError on write().
Reader read 0 byte(s).
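The automation script itself isn't included in this post, but a minimal reconstruction of such a harness could look like the sketch below (threading-based; the names and structure are mine, so treat it as illustrative - pass mode='wb' to get the binary-mode runs discussed further down):
#!/usr/bin/python3
# Hypothetical harness sketch: replay the disconnect/write/flush/reconnect scenario
# for a given payload size and report where BrokenPipeError was raised and how many
# bytes the reader ultimately received.
import os
import tempfile
import threading

def run_case(n, mode='w'):
    path = os.path.join(tempfile.mkdtemp(), 'mypipe')
    os.mkfifo(path)
    payload = ('A' if mode == 'w' else b'A') * n

    # Steps 1-2: a throwaway reader connects (unblocking the writer's open())
    # and immediately disconnects again.
    first_reader = threading.Thread(target=lambda: open(path, 'rb').close())
    first_reader.start()
    f = open(path, mode)
    first_reader.join()

    # Step 3: write + flush with no reader attached.
    failed_on = None
    try:
        failed_on = 'write()'
        f.write(payload)
        failed_on = 'flush()'
        f.flush()
        failed_on = None
    except BrokenPipeError:
        pass

    # Steps 4-6: a second reader re-connects, then the writer closes (implicit
    # flush) and the reader reads whatever comes out of the pipe.
    received = []
    connected = threading.Event()
    def second_reader():
        r = open(path, 'rb')
        connected.set()
        received.append(r.read())
        r.close()
    t = threading.Thread(target=second_reader)
    t.start()
    connected.wait()  # Don't close the writer before the reader is attached.
    try:
        f.close()
    except BrokenPipeError:
        pass
    t.join()

    print('--- Testing %d byte(s)' % n)
    if failed_on:
        print('Writer raised BrokenPipeError on %s.' % failed_on)
    print('Reader read %d byte(s).' % (len(received[0]) if received else 0))

for n in (1, 4096, 4097, 8192, 8193):
    run_case(n)  # Text mode; use run_case(n, mode='wb') for binary mode.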
There seems to be an area between 4K+1 and 8K bytes where write() still succeeds, yet the reader doesn't receive any data at all. This points to another layer of buffering (a third one, if you count the kernel's as well), which has an 8K buffer (io.DEFAULT_BUFFER_SIZE?) and apparently doesn't make sure the data has actually been written to the lower I/O layers before clearing its buffer in error conditions (for example when encountering a BrokenPipeError).
I traced this back to the way I was opening the file object - my test cases always opened it in text mode (i.e. "w"), but when I switched to binary mode (i.e. "wb"), the behavior suddenly changed to this:
--- Testing 1 byte(s)
Writer raised BrokenPipeError on flush().
Reader read 1 byte(s).
--- Testing 4096 byte(s)
Writer raised BrokenPipeError on flush().
Reader read 4096 byte(s).
--- Testing 4097 byte(s)
Writer raised BrokenPipeError on write().
Reader read 0 byte(s).
--- Testing 8192 byte(s)
Writer raised BrokenPipeError on write().
Reader read 0 byte(s).
--- Testing 8193 byte(s)
Writer raised BrokenPipeError on write().
Reader read 0 byte(s).
Now it behaves as expected! So who's the culprit? Let's compare the file object layers for both the "text" and "binary" cases. First, the "text" one:
>>> p = open('/tmp/mypipe', 'w')
>>> p
<_io.TextIOWrapper name='/tmp/mypipe' mode='w' encoding='UTF-8'>
>>> p.buffer
<_io.BufferedWriter name='/tmp/mypipe'>
>>> p.buffer.raw
<_io.FileIO name='/tmp/mypipe' mode='wb' closefd=True>
>>> p.close()
And now the "binary" case:
>>> p = open('/tmp/mypipe', 'wb')
>>> p
<_io.BufferedWriter name='/tmp/mypipe'>
>>> p.raw
<_io.FileIO name='/tmp/mypipe' mode='wb' closefd=True>
The only additional layer seems to be TextIOWrapper. After looking into its code (Modules/_io/textio.c) I found another buffer, 8K in size as expected, and the following flush function:
/* Flush the internal write buffer. This doesn't explicitly flush the
   underlying buffered object, though. */
static int
_textiowrapper_writeflush(textio *self)
{
    PyObject *pending, *b, *ret;
    if (self->pending_bytes == NULL)
        return 0;
    pending = self->pending_bytes;
    Py_INCREF(pending);
    self->pending_bytes_count = 0;
    Py_CLEAR(self->pending_bytes);
    b = _PyBytes_Join(_PyIO_empty_bytes, pending);
    Py_DECREF(pending);
    if (b == NULL)
        return -1;
    ret = NULL;
    do {
        ret = PyObject_CallMethodObjArgs(self->buffer,
                                         _PyIO_str_write, b, NULL);
    } while (ret == NULL && _PyIO_trap_eintr());
    Py_DECREF(b);
    if (ret == NULL)
        return -1;
    Py_DECREF(ret);
    return 0;
}
The issue with the above code is that it doesn't make sure the data has been successfully written to the underlying layer before removing it from its internal buffer (pending_bytes is cleared before the write to self->buffer is even attempted) - this behavior differs from that of _io.BufferedWriter + _io.FileIO (which do keep the data in the buffer under similar error conditions) and explains the 4K+1...8K data-size weirdness.
Well, knowing all of the above I could finally write some code which correctly re-tries either write()+flush() or only flush() when hit with a BrokenPipeError: if the payload was small enough to survive in the BufferedWriter's buffer, re-flushing is enough; if it was larger, the data is gone and the write itself has to be repeated.
try:
    fd.write(final)
    fd.flush()
    continue  # Done.
except BrokenPipeError:
    pass

redo_write = (len(final) > self._sink_buffer_size)  # 4096
while not self._the_end.is_set():
    try:
        if redo_write:
            fd.write(final)
        fd.flush()
        break  # Done.
    except BrokenPipeError:
        time.sleep(5)  # Wait 5 seconds for a reconnect.
I'm still double-checking whether this is enough to make sure the writer side doesn't lose or duplicate any data, but it seems that as soon as the data reaches the kernel's internal pipe buffer (which seems to be 72KB or so in my case) the ball is in the reader's court.
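As a side note, on Linux the kernel-side pipe capacity doesn't have to be guessed - it can be queried (and even changed) via fcntl. A quick sketch (F_GETPIPE_SZ is Linux-specific; the constant is spelled out here because only newer Python versions expose it as fcntl.F_GETPIPE_SZ):
import fcntl
import os

F_GETPIPE_SZ = 1032  # Linux-specific: F_LINUX_SPECIFIC_BASE (1024) + 8.

# O_NONBLOCK lets the read-side open succeed even if no writer is connected.
fd = os.open('/tmp/mypipe', os.O_RDONLY | os.O_NONBLOCK)
try:
    print('Pipe capacity: %d bytes' % fcntl.fcntl(fd, F_GETPIPE_SZ))  # Commonly 65536.
finally:
    os.close(fd)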
And that's it.